1
- def __del__ (self ):
1
+ def __del__ (self ) -> None :
2
2
try :
3
3
if self .cb :
4
4
libvirtmod .virStreamEventRemoveCallback (self ._o )
@@ -9,7 +9,7 @@ def __del__(self):
9
9
libvirtmod .virStreamFree (self ._o )
10
10
self ._o = None
11
11
12
- def _dispatchStreamEventCallback (self , events , cbData ) :
12
+ def _dispatchStreamEventCallback (self , events : int , cbData : Dict [ str , Any ]) -> int :
13
13
"""
14
14
Dispatches events to python user's stream event callbacks
15
15
"""
@@ -19,14 +19,14 @@ def _dispatchStreamEventCallback(self, events, cbData):
19
19
cb (self , events , opaque )
20
20
return 0
21
21
22
- def eventAddCallback (self , events , cb , opaque ) :
22
+ def eventAddCallback (self , events : int , cb : Callable [[ 'virStream' , int , _T ], None ], opaque : _T ) -> None :
23
23
self .cb = cb
24
24
cbData = {"stream" : self , "cb" : cb , "opaque" : opaque }
25
25
ret = libvirtmod .virStreamEventAddCallback (self ._o , events , cbData )
26
26
if ret == - 1 :
27
27
raise libvirtError ('virStreamEventAddCallback() failed' )
28
28
29
- def recvAll (self , handler , opaque ) :
29
+ def recvAll (self , handler : Callable [[ 'virStream' , bytes , _T ], int ], opaque : _T ) -> None :
30
30
"""Receive the entire data stream, sending the data to the
31
31
requested data sink. This is simply a convenient alternative
32
32
to virStreamRecv, for apps that do blocking-I/O.
@@ -59,7 +59,7 @@ def handler(stream, # virStream instance
59
59
pass
60
60
raise e
61
61
62
- def sendAll (self , handler , opaque ) :
62
+ def sendAll (self , handler : Callable [[ 'virStream' , int , _T ], bytes ], opaque : _T ) -> None :
63
63
"""
64
64
Send the entire data stream, reading the data from the
65
65
requested data source. This is simply a convenient alternative
@@ -92,7 +92,7 @@ def handler(stream, # virStream instance
92
92
raise libvirtError ("cannot use sendAll with "
93
93
"nonblocking stream" )
94
94
95
- def recv (self , nbytes ) :
95
+ def recv (self , nbytes : int ) -> bytes :
96
96
"""Reads a series of bytes from the stream. This method may
97
97
block the calling application for an arbitrary amount
98
98
of time.
@@ -110,7 +110,7 @@ def recv(self, nbytes):
110
110
raise libvirtError ('virStreamRecv() failed' )
111
111
return ret
112
112
113
- def send (self , data ) :
113
+ def send (self , data : bytes ) -> int :
114
114
"""Write a series of bytes to the stream. This method may
115
115
block the calling application for an arbitrary amount
116
116
of time. Once an application has finished sending data
@@ -129,7 +129,7 @@ def send(self, data):
129
129
raise libvirtError ('virStreamSend() failed' )
130
130
return ret
131
131
132
- def recvHole (self , flags = 0 ) :
132
+ def recvHole (self , flags : int = 0 ) -> int :
133
133
"""This method is used to determine the length in bytes
134
134
of the empty space to be created in a stream's target
135
135
file when uploading or downloading sparsely populated
@@ -140,7 +140,7 @@ def recvHole(self, flags=0):
140
140
raise libvirtError ('virStreamRecvHole() failed' )
141
141
return ret
142
142
143
- def sendHole (self , length , flags = 0 ) :
143
+ def sendHole (self , length : int , flags : int = 0 ) -> int :
144
144
"""Rather than transmitting empty file space, this method
145
145
directs the stream target to create length bytes of empty
146
146
space. This method would be used when uploading or
@@ -152,7 +152,7 @@ def sendHole(self, length, flags=0):
152
152
raise libvirtError ('virStreamSendHole() failed' )
153
153
return ret
154
154
155
- def recvFlags (self , nbytes , flags = 0 ) :
155
+ def recvFlags (self , nbytes : int , flags : int = 0 ) -> Union [ bytes , int ] :
156
156
"""Reads a series of bytes from the stream. This method may
157
157
block the calling application for an arbitrary amount
158
158
of time. This is just like recv except it has flags
@@ -171,7 +171,7 @@ def recvFlags(self, nbytes, flags=0):
171
171
raise libvirtError ('virStreamRecvFlags() failed' )
172
172
return ret
173
173
174
- def sparseRecvAll (self , handler , holeHandler , opaque ) :
174
+ def sparseRecvAll (self , handler : Callable [[ 'virStream' , bytes , _T ], Union [ bytes , int ]], holeHandler : Callable [[ 'virStream' , int , _T ], Optional [ int ]], opaque : _T ) -> None :
175
175
"""Receive the entire data stream, sending the data to
176
176
the requested data sink handler and calling the skip
177
177
holeHandler to generate holes for sparse stream targets.
@@ -219,7 +219,7 @@ def holeHandler(stream, # virStream instance
219
219
self .abort ()
220
220
raise RuntimeError ("sparseRecvAll handler returned %d" % ret )
221
221
222
- def sparseSendAll (self , handler , holeHandler , skipHandler , opaque ) :
222
+ def sparseSendAll (self , handler : Callable [[ 'virStream' , int , _T ], Union [ bytes , int ]], holeHandler : Callable [[ 'virStream' , _T ], Tuple [ bool , int ]], skipHandler : Callable [[ 'virStream' , int , _T ], int ], opaque : _T ) -> None :
223
223
"""Send the entire data stream, reading the data from the
224
224
requested data source. This is simply a convenient
225
225
alternative to virStreamSend, for apps that do
@@ -269,6 +269,7 @@ def skipHandler(stream, # virStream instance
269
269
if not got :
270
270
break
271
271
272
+ assert isinstance (got , bytes )
272
273
ret = self .send (got )
273
274
if ret == - 2 :
274
275
raise libvirtError ("cannot use sparseSendAll with "
0 commit comments