1
1
# vim: set fileencoding=utf-8 :
2
2
3
3
import tempfile
4
+ import pytest
4
5
5
6
import pyvips
6
7
from helpers import JPEG_FILE , temp_filename , skip_if_no
7
8
9
+ if pyvips .at_least_libvips (8 , 9 ):
10
+ class Mystreami (pyvips .Streamiu ):
11
+ def __init__ (self , pointer ):
12
+ super (Mystreami , self ).__init__ (pointer )
8
13
9
- class Mystreami ( pyvips . Streamiu ):
10
- def __init__ ( self , pointer ):
11
- super ( Mystreami , self ). __init__ ( pointer )
14
+ # these must be attached before we build the streamiu, see `new`
15
+ self . signal_connect ( 'read' , self . read_cb )
16
+ self . signal_connect ( 'seek' , self . seek_cb )
12
17
13
- # these must be attached before we build the streamiu, see `new`
14
- self . signal_connect ( 'read' , self . read_cb )
15
- self . signal_connect ( 'seek' , self . seek_cb )
18
+ @ staticmethod
19
+ def new ( name , pipe_mode = False ):
20
+ """Make a new input stream from a filename.
16
21
17
- @staticmethod
18
- def new (name , pipe_mode = False ):
19
- """Make a new input stream from a filename.
22
+ """
20
23
21
- """
24
+ gtype = pyvips .type_from_name ('VipsStreamiu' )
25
+ pointer = pyvips .GObject .new_pointer_from_gtype (gtype )
26
+ self = Mystreami (pointer )
22
27
23
- gtype = pyvips .type_from_name ('VipsStreamiu' )
24
- pointer = pyvips .GObject .new_pointer_from_gtype (gtype )
25
- self = Mystreami (pointer )
28
+ self .name = name
29
+ self .pipe_mode = pipe_mode
30
+ self .loaded_bytes = open (name , 'rb' ).read ()
31
+ self .memory = memoryview (self .loaded_bytes )
32
+ self .length = len (self .loaded_bytes )
33
+ self .read_point = 0
26
34
27
- self .name = name
28
- self .pipe_mode = pipe_mode
29
- self .loaded_bytes = open (name , 'rb' ).read ()
30
- self .memory = memoryview (self .loaded_bytes )
31
- self .length = len (self .loaded_bytes )
32
- self .read_point = 0
35
+ return self .build ()
33
36
34
- return self .build ()
37
+ def read_cb (self , buf ):
38
+ # print('read: {0} bytes ...'.format(len(buf)))
39
+ p = self .read_point
40
+ bytes_available = self .length - p
41
+ bytes_to_copy = min (bytes_available , len (buf ))
42
+ buf [:bytes_to_copy ] = self .memory [p :p + bytes_to_copy ]
43
+ self .read_point += bytes_to_copy
44
+ # print(' copied from position {0}'.format(p))
35
45
36
- def read_cb (self , buf ):
37
- # print('read: {0} bytes ...'.format(len(buf)))
38
- p = self .read_point
39
- bytes_available = self .length - p
40
- bytes_to_copy = min (bytes_available , len (buf ))
41
- buf [:bytes_to_copy ] = self .memory [p :p + bytes_to_copy ]
42
- self .read_point += bytes_to_copy
43
- # print(' copied from position {0}'.format(p))
46
+ return bytes_to_copy
44
47
45
- return bytes_to_copy
48
+ def seek_cb (self , offset , whence ):
49
+ # print('seek: offset = {0}, whence = {1} ...'
50
+ # .format(offset, whence))
46
51
47
- def seek_cb (self , offset , whence ):
48
- # print('seek: offset = {0}, whence = {1} ...'.format(offset, whence))
52
+ if self .pipe_mode :
53
+ # print(' -1 (pipe mode)')
54
+ return - 1
49
55
50
- if self .pipe_mode :
51
- # print(' -1 (pipe mode)')
52
- return - 1
56
+ if whence == 0 :
57
+ # SEEK_SET
58
+ new_read_point = offset
59
+ elif whence == 1 :
60
+ # SEEK_CUR
61
+ new_read_point = self .read_point + offset
62
+ elif whence == 2 :
63
+ # SEEK_END
64
+ new_read_point = self .length + offset
65
+ else :
66
+ raise Exception ('bad whence {0}' .format (whence ))
53
67
54
- if whence == 0 :
55
- # SEEK_SET
56
- new_read_point = offset
57
- elif whence == 1 :
58
- # SEEK_CUR
59
- new_read_point = self .read_point + offset
60
- elif whence == 2 :
61
- # SEEK_END
62
- new_read_point = self .length + offset
63
- else :
64
- raise Exception ('bad whence {0}' .format (whence ))
68
+ self .read_point = max (0 , min (self .length , new_read_point ))
69
+ # print(' new read_point = {0}'.format(self.read_point))
65
70
66
- self .read_point = max (0 , min (self .length , new_read_point ))
67
- # print(' new read_point = {0}'.format(self.read_point))
71
+ return self .read_point
68
72
69
- return self .read_point
70
73
74
+ class Mystreamo (pyvips .Streamou ):
75
+ def __init__ (self , pointer ):
76
+ super (Mystreamo , self ).__init__ (pointer )
71
77
72
- class Mystreamo ( pyvips . Streamou ):
73
- def __init__ ( self , pointer ):
74
- super ( Mystreamo , self ). __init__ ( pointer )
78
+ # these must be attached before we build the streamou, see `new`
79
+ self . signal_connect ( 'write' , self . write_cb )
80
+ self . signal_connect ( 'finish' , self . finish_cb )
75
81
76
- # these must be attached before we build the streamou, see `new`
77
- self . signal_connect ( 'write' , self . write_cb )
78
- self . signal_connect ( 'finish' , self . finish_cb )
82
+ @ staticmethod
83
+ def new ( name ):
84
+ """Make a new output stream from a filename.
79
85
80
- @staticmethod
81
- def new (name ):
82
- """Make a new output stream from a filename.
86
+ """
83
87
84
- """
88
+ gtype = pyvips .type_from_name ('VipsStreamou' )
89
+ pointer = pyvips .GObject .new_pointer_from_gtype (gtype )
90
+ self = Mystreamo (pointer )
85
91
86
- gtype = pyvips .type_from_name ('VipsStreamou' )
87
- pointer = pyvips .GObject .new_pointer_from_gtype (gtype )
88
- self = Mystreamo (pointer )
92
+ self .name = name
93
+ self .f = open (name , 'wb' )
89
94
90
- self .name = name
91
- self .f = open (name , 'wb' )
95
+ return self .build ()
92
96
93
- return self .build ()
97
+ def write_cb (self , buf ):
98
+ # print('write: {0} bytes ...'.format(len(buf)))
99
+ # py2 write does not return number of bytes written
100
+ self .f .write (buf )
94
101
95
- def write_cb (self , buf ):
96
- # print('write: {0} bytes ...'.format(len(buf)))
97
- # py2 write does not return number of bytes written
98
- self .f .write (buf )
102
+ return len (buf )
99
103
100
- return len (buf )
104
+ def finish_cb (self ):
105
+ # print('finish: ...')
106
+ self .f .close ()
101
107
102
- def finish_cb (self ):
103
- # print('finish: ...')
104
- self .f .close ()
105
108
106
-
107
- class TestSignals :
109
+ class TestStreams :
108
110
@classmethod
109
111
def setup_class (cls ):
110
112
cls .tempdir = tempfile .mkdtemp ()
111
113
112
114
@skip_if_no ('jpegload' )
115
+ @pytest .mark .skipif (not pyvips .at_least_libvips (8 , 9 ),
116
+ reason = "requires libvips >= 8.9" )
113
117
def test_stream (self ):
114
118
streami = pyvips .Streami .new_from_file (JPEG_FILE )
115
119
image = pyvips .Image .new_from_stream (streami , '' , access = 'sequential' )
@@ -123,6 +127,8 @@ def test_stream(self):
123
127
assert abs (image - image2 ).abs ().max () < 10
124
128
125
129
@skip_if_no ('jpegload' )
130
+ @pytest .mark .skipif (not pyvips .at_least_libvips (8 , 9 ),
131
+ reason = "requires libvips >= 8.9" )
126
132
def test_streamu (self ):
127
133
streamiu = Mystreami .new (JPEG_FILE )
128
134
image = pyvips .Image .new_from_stream (streamiu , '' , access = 'sequential' )
@@ -137,6 +143,8 @@ def test_streamu(self):
137
143
assert abs (image - image2 ).abs ().max () < 10
138
144
139
145
@skip_if_no ('jpegload' )
146
+ @pytest .mark .skipif (not pyvips .at_least_libvips (8 , 9 ),
147
+ reason = "requires libvips >= 8.9" )
140
148
def test_streamu_pipe (self ):
141
149
streamiu = Mystreami .new (JPEG_FILE , True )
142
150
image = pyvips .Image .new_from_stream (streamiu , '' , access = 'sequential' )
0 commit comments