Skip to content

Commit a7660b5

Browse files
absentio and Thecave3 authored
fix: improve graceful shutdown and timed execution handling.
* fix: improve graceful shutdown and timed execution handling.
* Removed unused exception and improved exception management for some cases by avoiding blind
* fix: improve error logging in inbound and outbound connection threads
* fix: update string formatting in stop_program function for consistency
* fix: increase timeout for queue retrieval and improve stop method for graceful exit

---------

Co-authored-by: Andrea Lacava <a.lacava@northeastern.edu>
1 parent 844f57e commit a7660b5

5 files changed

Lines changed: 80 additions & 44 deletions

File tree

examples/spectrum_dapp.py

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
"""
55

66
import argparse
7-
import multiprocessing
7+
import threading
88
import time
99

1010
from e3interface.e3_connector import E3LinkLayer, E3TransportLayer
@@ -14,12 +14,12 @@
1414

1515
def stop_program(time_to_wait, dapp: SpectrumSharingDApp):
1616
time.sleep(time_to_wait)
17-
print("Stop is called")
18-
dapp.stop()
17+
print(f"[INFO] Timer elapsed after {time_to_wait} seconds")
18+
dapp.stop_event.set()
1919
time.sleep(0.5) # to allow proper closure of the dApp threads, irrelevant to profiling
20-
print("Test completed")
20+
print("[INFO] Stopping of the dApp completed")
2121

22-
def main(args, time_to_wait: float = 60.0):
22+
def main(args):
2323
# with open(f"{LOG_DIR}/busy.txt", "w") as f:
2424
# f.close()
2525

@@ -70,24 +70,28 @@ def main(args, time_to_wait: float = 60.0):
7070
response, ranFunctionList = dapp.setup_connection()
7171

7272
if not response:
73-
raise ValueError("RAN refused Setup")
73+
raise ValueError("[WARNING] RAN refused Setup")
7474

75-
print(f"Setup Complete - RAN function available: {ranFunctionList}")
75+
print(f"[INFO] Setup Complete - RAN function available: {ranFunctionList}")
7676

7777
# atm we subscribe to all
7878
dapp.send_subscription_request(ranFunctionList)
7979

8080
if args.timed:
81-
timer = multiprocessing.Process(target=stop_program, args=(time_to_wait, dapp))
81+
timer = threading.Thread(target=stop_program, args=(args.timed, dapp), daemon=False)
8282
timer.start()
8383
else:
8484
timer = None
8585

8686
try:
8787
dapp.control_loop()
8888
finally:
89-
if args.timed:
90-
timer.kill()
89+
dapp.stop()
90+
if args.timed and timer is not None:
91+
if timer.is_alive():
92+
timer.join(timeout=2)
93+
if timer.is_alive():
94+
print("[ERROR] Timer thread did not terminate in time")
9195

9296

9397
if __name__ == "__main__":
@@ -105,7 +109,7 @@ def main(args, time_to_wait: float = 60.0):
105109
parser.add_argument('--num-subcarrier-spacing', type=int, default=30, help="Subcarrier spacing in kHz (FR1 is 30)")
106110
parser.add_argument('--e', action='store_true', default=False, help="Set if 3/4 sampling for FFT size is set on the gNB (-E option on OAI)")
107111
parser.add_argument('--center-freq', type=float, default=3.6192e9, help="Center frequency in Hz")
108-
parser.add_argument('--timed', action='store_true', default=False, help="Run with a 5-minute time limit")
112+
parser.add_argument('--timed', type=int, default=0, metavar='SECONDS', help="Run with a time limit (in seconds). 0 means no limit.")
109113
parser.add_argument('--model', type=str, default='', help="Path to the CNN model file to be used")
110114
parser.add_argument('--time-window', type=int, default=5, help="Number of input vectors to pass to the CNN model.")
111115
parser.add_argument('--moving-avg-window', type=int, default=30, help="Window size (in samples) for the moving average used to detect energy peaks in the spectrum.")

src/e3interface/e3_connector.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -125,13 +125,19 @@ def send_setup_request(self, payload):
125125
while request_retries > 0:
126126
setup_socket = self.setup_context.socket(zmq.REQ)
127127
setup_socket.connect(self.setup_endpoint)
128-
e3_logger.debug("Send E3 Setup request")
129-
setup_socket.send(payload)
130-
131-
if (setup_socket.poll(request_timeout) & zmq.POLLIN) != 0:
132-
reply = setup_socket.recv()
133-
e3_logger.debug('ZMQ setup socket replied')
134-
return reply
128+
try:
129+
e3_logger.debug("Send E3 Setup request")
130+
setup_socket.send(payload)
131+
if (setup_socket.poll(request_timeout) & zmq.POLLIN) != 0:
132+
reply = setup_socket.recv()
133+
e3_logger.debug('ZMQ setup socket replied')
134+
setup_socket.close()
135+
return reply
136+
except KeyboardInterrupt:
137+
e3_logger.debug("Keyboard interrupt, closing E3 Setup Socket")
138+
setup_socket.setsockopt(zmq.LINGER, 0)
139+
setup_socket.close()
140+
raise
135141

136142
request_retries -= 1
137143
e3_logger.error("ZMQ setup did not reply")

src/e3interface/e3_interface.py

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -149,9 +149,11 @@ def _inbound_connection(self):
149149

150150
case _:
151151
raise ValueError("Unrecognized PDU type ", pdu[0])
152-
153-
except Exception as e:
154-
e3_logger.error(f"Error in inbound thread: {e}")
152+
except KeyboardInterrupt:
153+
e3_logger.debug("Inbound thread received SIGINT, stopping")
154+
self.stop_event.set()
155+
except Exception:
156+
e3_logger.exception(f"Error in inbound thread")
155157
self.stop_event.set()
156158
finally:
157159
e3_logger.info("Close inbound connection")
@@ -188,9 +190,11 @@ def _outbound_connection(self):
188190

189191
e3_logger.debug(f"Send the pdu encoded {payload}")
190192
self.e3_connector.send(payload)
191-
192-
except Exception as e:
193-
e3_logger.error(f"Error outbound thread: {e}")
193+
except KeyboardInterrupt:
194+
e3_logger.debug("Outbound thread received SIGINT, stopping")
195+
self.stop_event.set()
196+
except Exception:
197+
e3_logger.exception(f"Error in outbound thread")
194198
self.stop_event.set()
195199
finally:
196200
e3_logger.info("Close outbound connection")
@@ -259,10 +263,19 @@ def terminate_connections(self):
259263
e3_logger.info("Stop event")
260264
self.stop_event.set()
261265

262-
if hasattr(self, "inbound_process"):
263-
self.inbound_process.join()
264-
if hasattr(self, "outbound_process"):
265-
self.outbound_process.join()
266+
if hasattr(self, "inbound_process") and self.inbound_process.is_alive():
267+
self.inbound_process.join(timeout=2)
268+
if self.inbound_process.is_alive():
269+
e3_logger.warning("Inbound process did not terminate gracefully, forcing termination")
270+
self.inbound_process.terminate()
271+
self.inbound_process.join(timeout=1)
272+
273+
if hasattr(self, "outbound_process") and self.outbound_process.is_alive():
274+
self.outbound_process.join(timeout=2)
275+
if self.outbound_process.is_alive():
276+
e3_logger.warning("Outbound process did not terminate gracefully, forcing termination")
277+
self.outbound_process.terminate()
278+
self.outbound_process.join(timeout=1)
266279

267280
self.e3_connector.dispose()
268281

src/spectrum/spectrum_dapp.py

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
__author__ = "Andrea Lacava"
77

88
import multiprocessing
9+
import queue
910
import time
1011
import os
1112
import numpy as np
@@ -299,8 +300,8 @@ def get_iqs_from_ran(self, dapp_identifier, data):
299300
if update_sampling:
300301
self.sampling_threshold = new_sampling_threshold
301302
dapp_logger.info(f"Custom logic updated sampling threshold to {self.sampling_threshold}")
302-
except Exception as e:
303-
dapp_logger.error(f"Error in custom control callback: {e}")
303+
except Exception:
304+
dapp_logger.exception(f"Error in custom control callback")
304305
update_sampling = False
305306

306307
prb_new = prb_blk_list.view(prb_blk_list.dtype.newbyteorder('>'))
@@ -324,17 +325,27 @@ def get_iqs_from_ran(self, dapp_identifier, data):
324325
self.control_count = 1
325326

326327
def _control_loop(self):
327-
if self.energyGui:
328-
abs_iq_av_db = self.sig_queue.get()
329-
self.energyPlotter.process_iq_data(abs_iq_av_db)
330-
331-
if self.iqPlotterGui:
332-
iq_data = self.iq_queue.get()
333-
self.iqPlotter.process_iq_data(iq_data)
334-
335-
if self.dashboard:
336-
message = self.demo_queue.get()
337-
self.demo.process_iq_data(message)
328+
# If no GUIs are enabled, just sleep to avoid busy-waiting
329+
if not (self.energyGui or self.iqPlotterGui or self.dashboard):
330+
time.sleep(1)
331+
return
332+
333+
try:
334+
if self.energyGui:
335+
abs_iq_av_db = self.sig_queue.get(timeout=0.1)
336+
self.energyPlotter.process_iq_data(abs_iq_av_db)
337+
338+
if self.iqPlotterGui:
339+
iq_data = self.iq_queue.get(timeout=0.1)
340+
self.iqPlotter.process_iq_data(iq_data)
341+
342+
if self.dashboard:
343+
message = self.demo_queue.get(timeout=0.1)
344+
self.demo.process_iq_data(message)
345+
except queue.Empty:
346+
pass # This is allowed
347+
except Exception:
348+
dapp_logger.exception("[SPECTRUM] Error in the control loop")
338349

339350
def _stop(self):
340351
if self.save_iqs:

src/visualization/dashboard.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,10 @@ def index(self):
4747
return render_template("index.html")
4848

4949
def run(self):
50-
self.socketio.run(self.app, host="0.0.0.0", port=7778)
50+
self.socketio.run(self.app, host="0.0.0.0", port=7778, debug=False, use_reloader=False)
5151

5252
def _initialize_plot(self):
53-
self.run_thread = threading.Thread(target=self.run)
53+
self.run_thread = threading.Thread(target=self.run, daemon=True)
5454
self.run_thread.start()
5555

5656
def handle_initial_connection(self):
@@ -97,8 +97,10 @@ def process_iq_data(self, message):
9797

9898
def stop(self):
9999
"""Stops the server and kills the thread."""
100+
# This is probably overkill since the run thread is now a daemon,
101+
# still with timeout it does not impact the graceful exit
100102
if self.run_thread and self.run_thread.is_alive():
101-
self.run_thread.join()
103+
self.run_thread.join(timeout=1)
102104

103105
if __name__ == "__main__":
104106
server_app = Dashboard()

0 commit comments

Comments
 (0)