8
8
import re
9
9
import tempfile
10
10
import time
11
+ from typing import List , Optional , Dict , Tuple , Any , Callable , Union
11
12
12
13
import attr
13
14
from pexpect import TIMEOUT
@@ -95,18 +96,18 @@ class QEMUDriver(ConsoleExpectMixin, Driver, PowerProtocol, ConsoleProtocol):
95
96
default = None ,
96
97
validator = attr .validators .optional (attr .validators .instance_of (str )))
97
98
98
- def __attrs_post_init__ (self ):
99
+ def __attrs_post_init__ (self ) -> None :
99
100
super ().__attrs_post_init__ ()
100
- self .status = 0
101
- self .txdelay = None
102
- self ._child = None
103
- self ._tempdir = None
104
- self ._socket = None
105
- self ._clientsocket = None
106
- self ._forwarded_ports = {}
101
+ self .status : int = 0
102
+ self .txdelay : Optional [ float ] = None
103
+ self ._child : Optional [ subprocess . Popen ] = None
104
+ self ._tempdir : Optional [ str ] = None
105
+ self ._socket : Optional [ socket . socket ] = None
106
+ self ._clientsocket : Optional [ socket . socket ] = None
107
+ self ._forwarded_ports : Dict [ Tuple [ str , str , int ], Tuple [ str , str , int , str , int ]] = {}
107
108
atexit .register (self ._atexit )
108
109
109
- def _atexit (self ):
110
+ def _atexit (self ) -> None :
110
111
if not self ._child :
111
112
return
112
113
self ._child .terminate ()
@@ -116,7 +117,7 @@ def _atexit(self):
116
117
self ._child .kill ()
117
118
self ._child .communicate (timeout = 1 )
118
119
119
- def get_qemu_version (self , qemu_bin ) :
120
+ def get_qemu_version (self , qemu_bin : str ) -> Tuple [ int , int , int ] :
120
121
p = subprocess .run ([qemu_bin , "-version" ], stdout = subprocess .PIPE , encoding = "utf-8" )
121
122
if p .returncode != 0 :
122
123
raise ExecutionError (f"Unable to get QEMU version. QEMU exited with: { p .returncode } " )
@@ -127,7 +128,7 @@ def get_qemu_version(self, qemu_bin):
127
128
128
129
return (int (m .group ('major' )), int (m .group ('minor' )), int (m .group ('micro' )))
129
130
130
- def get_qemu_base_args (self ):
131
+ def get_qemu_base_args (self ) -> List [ str ] :
131
132
"""Returns the base command line used for Qemu without the options
132
133
related to QMP. These options can be used to start an interactive
133
134
Qemu manually for debugging tests
@@ -230,7 +231,7 @@ def get_qemu_base_args(self):
230
231
231
232
return cmd
232
233
233
- def on_activate (self ):
234
+ def on_activate (self ) -> None :
234
235
self ._tempdir = tempfile .mkdtemp (prefix = "labgrid-qemu-tmp-" )
235
236
sockpath = f"{ self ._tempdir } /serialrw"
236
237
self ._socket = socket .socket (socket .AF_UNIX , socket .SOCK_STREAM )
@@ -248,7 +249,7 @@ def on_activate(self):
248
249
self ._cmd .append ("-serial" )
249
250
self ._cmd .append ("chardev:serialsocket" )
250
251
251
- def on_deactivate (self ):
252
+ def on_deactivate (self ) -> None :
252
253
if self .status :
253
254
self .off ()
254
255
if self ._clientsocket :
@@ -259,7 +260,7 @@ def on_deactivate(self):
259
260
shutil .rmtree (self ._tempdir )
260
261
261
262
@step ()
262
- def on (self , pre_start_hook = None ):
263
+ def on (self , pre_start_hook : Optional [ Callable [[], None ]] = None ) -> None :
263
264
"""Start the QEMU subprocess, accept the unix socket connection, run the
264
265
pre_start_hook if applicable and start the emulator using a QMP Command"""
265
266
if self .status :
@@ -295,7 +296,7 @@ def on(self, pre_start_hook=None):
295
296
self .monitor_command ("cont" )
296
297
297
298
@step ()
298
- def off (self ):
299
+ def off (self ) -> None :
299
300
"""Stop the emulator using a monitor command and await the exitcode"""
300
301
if not self .status :
301
302
return
@@ -306,37 +307,38 @@ def off(self):
306
307
self ._child = None
307
308
self .status = 0
308
309
309
- def cycle (self ):
310
+ def cycle (self ) -> None :
310
311
"""Cycle the emulator by restarting it"""
311
312
self .off ()
312
313
self .on ()
313
314
314
315
@step (result = True , args = ['command' , 'arguments' ])
315
- def monitor_command (self , command , arguments = {}):
316
+ def monitor_command (self , command : str , arguments : Dict [ str , Any ] = {}) -> Any :
316
317
"""Execute a monitor_command via the QMP"""
317
318
if not self .status :
318
319
raise ExecutionError (
319
320
"Can't use monitor command on non-running target" )
320
321
return self .qmp .execute (command , arguments )
321
322
322
- def _add_port_forward (self , proto , local_address , local_port , remote_address , remote_port ) :
323
+ def _add_port_forward (self , proto : str , local_address : str , local_port : int , remote_address : str , remote_port : int ) -> None :
323
324
self .monitor_command (
324
325
"human-monitor-command" ,
325
326
{"command-line" : f"hostfwd_add { proto } :{ local_address } :{ local_port } -{ remote_address } :{ remote_port } " },
326
327
)
327
328
328
- def add_port_forward (self , proto , local_address , local_port , remote_address , remote_port ) :
329
+ def add_port_forward (self , proto : str , local_address : str , local_port : int , remote_address : str , remote_port : int ) -> None :
329
330
self ._add_port_forward (proto , local_address , local_port , remote_address , remote_port )
330
- self ._forwarded_ports [(proto , local_address , local_port )] = (proto , local_address , local_port , remote_address , remote_port )
331
+ self ._forwarded_ports [(proto , local_address , local_port )] = (
332
+ proto , local_address , local_port , remote_address , remote_port )
331
333
332
- def remove_port_forward (self , proto , local_address , local_port ) :
334
+ def remove_port_forward (self , proto : str , local_address : str , local_port : int ) -> None :
333
335
del self ._forwarded_ports [(proto , local_address , local_port )]
334
336
self .monitor_command (
335
337
"human-monitor-command" ,
336
338
{"command-line" : f"hostfwd_remove { proto } :{ local_address } :{ local_port } " },
337
339
)
338
340
339
- def _read (self , size = 1 , timeout = 10 , max_size = None ):
341
+ def _read (self , size : int = 1 , timeout : float = 10 , max_size : Optional [ int ] = None ) -> bytes :
340
342
ready , _ , _ = select .select ([self ._clientsocket ], [], [], timeout )
341
343
if ready :
342
344
# Collect some more data
@@ -349,8 +351,8 @@ def _read(self, size=1, timeout=10, max_size=None):
349
351
raise TIMEOUT (f"Timeout of { timeout :.2f} seconds exceeded" )
350
352
return res
351
353
352
- def _write (self , data ) :
354
+ def _write (self , data : bytes ) -> int :
353
355
return self ._clientsocket .send (data )
354
356
355
- def __str__ (self ):
357
+ def __str__ (self ) -> str :
356
358
return f"QemuDriver({ self .target .name } )"
0 commit comments