Skip to content

Commit 4b98916

Browse files
authored
Merge pull request #684 from cmu-delphi/krivard/meta-logging
Add pairwise logging for meta
2 parents bc5aa3e + 447f0c3 commit 4b98916

File tree

1 file changed

+6
-3
lines changed

1 file changed

+6
-3
lines changed

src/acquisition/covidcast/database.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,7 @@ def compute_covidcast_meta(self, table_name='covidcast', use_index=True):
257257
n_threads = max(1, cpu_count()*9//10) # aka number of concurrent db connections, which [sh|c]ould be ~<= 90% of the #cores available to SQL server
258258
# NOTE: this may present a small problem if this job runs on different hardware than the db,
259259
# but we should not run into that issue in prod.
260+
logger.info(f"using {n_threads} workers")
260261

261262
srcsigs = Queue() # multi-consumer threadsafe!
262263

@@ -305,22 +306,24 @@ def compute_covidcast_meta(self, table_name='covidcast', use_index=True):
305306
meta_lock = threading.Lock()
306307

307308
def worker():
308-
logger.info("starting thread: " + threading.current_thread().name)
309+
name = threading.current_thread().name
310+
logger.info("starting thread", thread=name)
309311
# set up new db connection for thread
310312
worker_dbc = Database()
311313
worker_dbc.connect(connector_impl=self._connector_impl)
312314
w_cursor = worker_dbc._cursor
313315
try:
314316
while True:
315317
(source, signal) = srcsigs.get_nowait() # this will throw the Empty caught below
318+
logger.info("starting pair", thread=name, pair=f"({source}, {signal})")
316319
w_cursor.execute(inner_sql, (source, signal))
317320
with meta_lock:
318321
meta.extend(list(
319322
dict(zip(w_cursor.column_names, x)) for x in w_cursor
320323
))
321324
srcsigs.task_done()
322325
except Empty:
323-
logger.info("no jobs left, thread terminating: " + threading.current_thread().name)
326+
logger.info("no jobs left, thread terminating", thread=name)
324327
finally:
325328
worker_dbc.disconnect(False) # cleanup
326329

@@ -334,7 +337,7 @@ def worker():
334337
logger.info("jobs complete")
335338
for t in threads:
336339
t.join()
337-
logger.error("threads terminated")
340+
logger.info("all threads terminated")
338341

339342
# sort the metadata because threaded workers dgaf
340343
sorting_fields = "data_source signal time_type geo_type".split()

0 commit comments

Comments
 (0)