diff --git a/contrib/opencensus-ext-azure/CHANGELOG.md b/contrib/opencensus-ext-azure/CHANGELOG.md index 38556d9d3..0e49f85e3 100644 --- a/contrib/opencensus-ext-azure/CHANGELOG.md +++ b/contrib/opencensus-ext-azure/CHANGELOG.md @@ -4,6 +4,8 @@ - Fix `logger.exception` with no exception info throwing error ([#1006](https://github.com/census-instrumentation/opencensus-python/pull/1006)) +- Add `enable_local_storage` to turn on/off local storage + retry + flushing logic +([#1006](https://github.com/census-instrumentation/opencensus-python/pull/1006)) ## 1.0.7 Released 2021-01-25 diff --git a/contrib/opencensus-ext-azure/opencensus/ext/azure/common/__init__.py b/contrib/opencensus-ext-azure/opencensus/ext/azure/common/__init__.py index d6c099619..dd14acec4 100644 --- a/contrib/opencensus-ext-azure/opencensus/ext/azure/common/__init__.py +++ b/contrib/opencensus-ext-azure/opencensus/ext/azure/common/__init__.py @@ -101,6 +101,7 @@ def __init__(self, *args, **kwargs): _default = BaseObject( connection_string=None, + enable_local_storage=True, enable_standard_metrics=True, endpoint='https://dc.services.visualstudio.com/v2/track', export_interval=15.0, diff --git a/contrib/opencensus-ext-azure/opencensus/ext/azure/common/transport.py b/contrib/opencensus-ext-azure/opencensus/ext/azure/common/transport.py index feca72979..49136a172 100644 --- a/contrib/opencensus-ext-azure/opencensus/ext/azure/common/transport.py +++ b/contrib/opencensus-ext-azure/opencensus/ext/azure/common/transport.py @@ -22,16 +22,17 @@ class TransportMixin(object): def _transmit_from_storage(self): - for blob in self.storage.gets(): - # give a few more seconds for blob lease operation - # to reduce the chance of race (for perf consideration) - if blob.lease(self.options.timeout + 5): - envelopes = blob.get() - result = self._transmit(envelopes) - if result > 0: - blob.lease(result) - else: - blob.delete() + if self.storage: + for blob in self.storage.gets(): + # give a few more seconds for blob lease operation + # to reduce the chance of race (for perf consideration) + if blob.lease(self.options.timeout + 5): + envelopes = blob.get() + result = self._transmit(envelopes) + if result > 0: + blob.lease(result) + else: + blob.delete() def _transmit(self, envelopes): """ diff --git a/contrib/opencensus-ext-azure/opencensus/ext/azure/log_exporter/__init__.py b/contrib/opencensus-ext-azure/opencensus/ext/azure/log_exporter/__init__.py index ca7a18dd8..7aad98904 100644 --- a/contrib/opencensus-ext-azure/opencensus/ext/azure/log_exporter/__init__.py +++ b/contrib/opencensus-ext-azure/opencensus/ext/azure/log_exporter/__init__.py @@ -47,13 +47,15 @@ def __init__(self, **options): raise ValueError('Sampling must be in the range: [0,1]') self.export_interval = self.options.export_interval self.max_batch_size = self.options.max_batch_size - self.storage = LocalFileStorage( - path=self.options.storage_path, - max_size=self.options.storage_max_size, - maintenance_period=self.options.storage_maintenance_period, - retention_period=self.options.storage_retention_period, - source=self.__class__.__name__, - ) + self.storage = None + if self.options.enable_local_storage: + self.storage = LocalFileStorage( + path=self.options.storage_path, + max_size=self.options.storage_max_size, + maintenance_period=self.options.storage_maintenance_period, + retention_period=self.options.storage_retention_period, + source=self.__class__.__name__, + ) self._telemetry_processors = [] self.addFilter(SamplingFilter(self.options.logging_sampling_rate)) self._queue = Queue(capacity=self.options.queue_capacity) @@ -66,7 +68,8 @@ def _export(self, batch, event=None): # pragma: NO COVER envelopes = [self.log_record_to_envelope(x) for x in batch] envelopes = self.apply_telemetry_processors(envelopes) result = self._transmit(envelopes) - if result > 0: + # Only store files if local storage enabled + if self.storage and result > 0: self.storage.put(envelopes, result) if event: if isinstance(event, QueueExitEvent): @@ -79,8 +82,10 @@ def _export(self, batch, event=None): # pragma: NO COVER event.set() def close(self): - self.storage.close() - self._worker.stop() + if self.storage: + self.storage.close() + if self._worker: + self._worker.stop() def createLock(self): self.lock = None diff --git a/contrib/opencensus-ext-azure/opencensus/ext/azure/metrics_exporter/__init__.py b/contrib/opencensus-ext-azure/opencensus/ext/azure/metrics_exporter/__init__.py index 17ee88c50..9937b9d53 100644 --- a/contrib/opencensus-ext-azure/opencensus/ext/azure/metrics_exporter/__init__.py +++ b/contrib/opencensus-ext-azure/opencensus/ext/azure/metrics_exporter/__init__.py @@ -47,13 +47,15 @@ def __init__(self, **options): self.export_interval = self.options.export_interval self.max_batch_size = self.options.max_batch_size self._telemetry_processors = [] - self.storage = LocalFileStorage( - path=self.options.storage_path, - max_size=self.options.storage_max_size, - maintenance_period=self.options.storage_maintenance_period, - retention_period=self.options.storage_retention_period, - source=self.__class__.__name__, - ) + self.storage = None + if self.options.enable_local_storage: + self.storage = LocalFileStorage( + path=self.options.storage_path, + max_size=self.options.storage_max_size, + maintenance_period=self.options.storage_maintenance_period, + retention_period=self.options.storage_retention_period, + source=self.__class__.__name__, + ) self._atexit_handler = atexit.register(self.shutdown) self.exporter_thread = None super(MetricsExporter, self).__init__() @@ -68,7 +70,8 @@ def export_metrics(self, metrics): for batch in batched_envelopes: batch = self.apply_telemetry_processors(batch) result = self._transmit(batch) - if result > 0: + # Only store files if local storage enabled + if self.storage and result > 0: self.storage.put(batch, result) # If there is still room to transmit envelopes, transmit from storage @@ -141,7 +144,8 @@ def shutdown(self): if self.exporter_thread: self.exporter_thread.close() # Shutsdown storage worker - self.storage.close() + if self.storage: + self.storage.close() def new_metrics_exporter(**options): diff --git a/contrib/opencensus-ext-azure/opencensus/ext/azure/trace_exporter/__init__.py b/contrib/opencensus-ext-azure/opencensus/ext/azure/trace_exporter/__init__.py index 42ffe6ffe..89a6a0d02 100644 --- a/contrib/opencensus-ext-azure/opencensus/ext/azure/trace_exporter/__init__.py +++ b/contrib/opencensus-ext-azure/opencensus/ext/azure/trace_exporter/__init__.py @@ -49,13 +49,15 @@ class AzureExporter(BaseExporter, ProcessorMixin, TransportMixin): def __init__(self, **options): self.options = Options(**options) utils.validate_instrumentation_key(self.options.instrumentation_key) - self.storage = LocalFileStorage( - path=self.options.storage_path, - max_size=self.options.storage_max_size, - maintenance_period=self.options.storage_maintenance_period, - retention_period=self.options.storage_retention_period, - source=self.__class__.__name__, - ) + self.storage = None + if self.options.enable_local_storage: + self.storage = LocalFileStorage( + path=self.options.storage_path, + max_size=self.options.storage_max_size, + maintenance_period=self.options.storage_maintenance_period, + retention_period=self.options.storage_retention_period, + source=self.__class__.__name__, + ) self._telemetry_processors = [] super(AzureExporter, self).__init__(**options) atexit.register(self._stop, self.options.grace_period) @@ -166,7 +168,8 @@ def emit(self, batch, event=None): envelopes = [self.span_data_to_envelope(sd) for sd in batch] envelopes = self.apply_telemetry_processors(envelopes) result = self._transmit(envelopes) - if result > 0: + # Only store files if local storage enabled + if self.storage and result > 0: self.storage.put(envelopes, result) if event: if isinstance(event, QueueExitEvent): @@ -179,5 +182,7 @@ def emit(self, batch, event=None): logger.exception('Exception occurred while exporting the data.') def _stop(self, timeout=None): - self.storage.close() - self._worker.stop(timeout) + if self.storage: + self.storage.close() + if self._worker: + self._worker.stop(timeout)