diff --git a/contrib/opencensus-ext-azure/opencensus/ext/azure/log_exporter/__init__.py b/contrib/opencensus-ext-azure/opencensus/ext/azure/log_exporter/__init__.py index 7a1d5824a..a9588dfe7 100644 --- a/contrib/opencensus-ext-azure/opencensus/ext/azure/log_exporter/__init__.py +++ b/contrib/opencensus-ext-azure/opencensus/ext/azure/log_exporter/__init__.py @@ -75,8 +75,7 @@ def __init__(self, **options): def _export(self, batch, event=None): # pragma: NO COVER try: if batch: - envelopes = [self.log_record_to_envelope(x) for x in batch] - envelopes = self.apply_telemetry_processors(envelopes) + envelopes = self.apply_telemetry_processors(batch) result = self._transmit(envelopes) # Only store files if local storage enabled if self.storage: @@ -110,7 +109,11 @@ def createLock(self): self.lock = None def emit(self, record): - self._queue.put(record, block=False) + # Convert the raw LogRecord to an envelope before putting it on the + # queue as a LogRecord object is not serializable, while an Envelope + # object is. + envelope = self.log_record_to_envelope(record) + self._queue.put(envelope, block=False) def log_record_to_envelope(self, record): raise NotImplementedError # pragma: NO COVER diff --git a/opencensus/common/schedule/__init__.py b/opencensus/common/schedule/__init__.py index ed997a620..93cb4b9b7 100644 --- a/opencensus/common/schedule/__init__.py +++ b/opencensus/common/schedule/__init__.py @@ -15,6 +15,7 @@ from six.moves import queue import logging +import multiprocessing import threading import time @@ -82,7 +83,7 @@ class QueueExitEvent(QueueEvent): class Queue(object): def __init__(self, capacity): self.EXIT_EVENT = QueueExitEvent('EXIT') - self._queue = queue.Queue(maxsize=capacity) + self._queue = multiprocessing.Queue(maxsize=capacity) def _gets(self, count, timeout): start_time = time.time()