From b908aed2a17580748023d9cc691d0b4fde932d03 Mon Sep 17 00:00:00 2001 From: Jeremy Vriens Date: Tue, 23 Aug 2022 17:00:49 +0200 Subject: [PATCH 1/2] Fixed AzureLogHandler with multiple processes. --- .../opencensus/ext/azure/log_exporter/__init__.py | 9 ++++++--- opencensus/common/schedule/__init__.py | 3 ++- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/contrib/opencensus-ext-azure/opencensus/ext/azure/log_exporter/__init__.py b/contrib/opencensus-ext-azure/opencensus/ext/azure/log_exporter/__init__.py index 7a1d5824a..a9588dfe7 100644 --- a/contrib/opencensus-ext-azure/opencensus/ext/azure/log_exporter/__init__.py +++ b/contrib/opencensus-ext-azure/opencensus/ext/azure/log_exporter/__init__.py @@ -75,8 +75,7 @@ def __init__(self, **options): def _export(self, batch, event=None): # pragma: NO COVER try: if batch: - envelopes = [self.log_record_to_envelope(x) for x in batch] - envelopes = self.apply_telemetry_processors(envelopes) + envelopes = self.apply_telemetry_processors(batch) result = self._transmit(envelopes) # Only store files if local storage enabled if self.storage: @@ -110,7 +109,11 @@ def createLock(self): self.lock = None def emit(self, record): - self._queue.put(record, block=False) + # Convert the raw LogRecord to an envelope before putting it on the + # queue as a LogRecord object is not serializable, while an Envelope + # object is. + envelope = self.log_record_to_envelope(record) + self._queue.put(envelope, block=False) def log_record_to_envelope(self, record): raise NotImplementedError # pragma: NO COVER diff --git a/opencensus/common/schedule/__init__.py b/opencensus/common/schedule/__init__.py index ed997a620..93cb4b9b7 100644 --- a/opencensus/common/schedule/__init__.py +++ b/opencensus/common/schedule/__init__.py @@ -15,6 +15,7 @@ from six.moves import queue import logging +import multiprocessing import threading import time @@ -82,7 +83,7 @@ class QueueExitEvent(QueueEvent): class Queue(object): def __init__(self, capacity): self.EXIT_EVENT = QueueExitEvent('EXIT') - self._queue = queue.Queue(maxsize=capacity) + self._queue = multiprocessing.Queue(maxsize=capacity) def _gets(self, count, timeout): start_time = time.time() From bfb12b2c07f5900ceb0bb2e562e14896ff94e1cb Mon Sep 17 00:00:00 2001 From: JeremyVriens Date: Tue, 23 Aug 2022 17:00:49 +0200 Subject: [PATCH 2/2] Fixed AzureLogHandler with multiple processes. --- .../opencensus/ext/azure/log_exporter/__init__.py | 9 ++++++--- opencensus/common/schedule/__init__.py | 3 ++- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/contrib/opencensus-ext-azure/opencensus/ext/azure/log_exporter/__init__.py b/contrib/opencensus-ext-azure/opencensus/ext/azure/log_exporter/__init__.py index 7a1d5824a..a9588dfe7 100644 --- a/contrib/opencensus-ext-azure/opencensus/ext/azure/log_exporter/__init__.py +++ b/contrib/opencensus-ext-azure/opencensus/ext/azure/log_exporter/__init__.py @@ -75,8 +75,7 @@ def __init__(self, **options): def _export(self, batch, event=None): # pragma: NO COVER try: if batch: - envelopes = [self.log_record_to_envelope(x) for x in batch] - envelopes = self.apply_telemetry_processors(envelopes) + envelopes = self.apply_telemetry_processors(batch) result = self._transmit(envelopes) # Only store files if local storage enabled if self.storage: @@ -110,7 +109,11 @@ def createLock(self): self.lock = None def emit(self, record): - self._queue.put(record, block=False) + # Convert the raw LogRecord to an envelope before putting it on the + # queue as a LogRecord object is not serializable, while an Envelope + # object is. + envelope = self.log_record_to_envelope(record) + self._queue.put(envelope, block=False) def log_record_to_envelope(self, record): raise NotImplementedError # pragma: NO COVER diff --git a/opencensus/common/schedule/__init__.py b/opencensus/common/schedule/__init__.py index ed997a620..93cb4b9b7 100644 --- a/opencensus/common/schedule/__init__.py +++ b/opencensus/common/schedule/__init__.py @@ -15,6 +15,7 @@ from six.moves import queue import logging +import multiprocessing import threading import time @@ -82,7 +83,7 @@ class QueueExitEvent(QueueEvent): class Queue(object): def __init__(self, capacity): self.EXIT_EVENT = QueueExitEvent('EXIT') - self._queue = queue.Queue(maxsize=capacity) + self._queue = multiprocessing.Queue(maxsize=capacity) def _gets(self, count, timeout): start_time = time.time()