diff --git a/corehq/apps/api/odata/utils.py b/corehq/apps/api/odata/utils.py index f09608e12274..cee2b6447997 100644 --- a/corehq/apps/api/odata/utils.py +++ b/corehq/apps/api/odata/utils.py @@ -2,8 +2,7 @@ from collections import namedtuple from corehq.apps.export.models import ExportInstance -from corehq.util.datadog.gauges import datadog_counter -from corehq.util.datadog.utils import bucket_value +from corehq.util.metrics import metrics_histogram FieldMetadata = namedtuple('FieldMetadata', ['name', 'odata_type']) @@ -64,14 +63,16 @@ def record_feed_access_in_datadog(request, config_id, duration, response): column_count = len(rows[0]) except IndexError: column_count = 0 - datadog_counter('commcare.odata_feed.test_v3', tags=[ - 'domain:{}'.format(request.domain), - 'feed_id:{}'.format(config_id), - 'feed_type:{}'.format(config.type), - 'username:{}'.format(username), - 'row_count:{}'.format(row_count), - 'column_count:{}'.format(column_count), - 'size:{}'.format(len(response.content)), - 'duration:{}'.format(duration), - 'duration_bucket:{}'.format(bucket_value(duration, (1, 5, 20, 60, 120, 300, 600), 's')), - ]) + metrics_histogram( + 'commcare.odata_feed.test_v3', duration, + bucket_tag='duration_bucket', buckets=(1, 5, 20, 60, 120, 300, 600), bucket_unit='s', + tags={ + 'domain': request.domain, + 'feed_id': config_id, + 'feed_type': config.type, + 'username': username, + 'row_count': row_count, + 'column_count': column_count, + 'size': len(response.content) + } + ) diff --git a/corehq/apps/case_importer/tasks.py b/corehq/apps/case_importer/tasks.py index 14d63356b5c6..087ba77a8e30 100644 --- a/corehq/apps/case_importer/tasks.py +++ b/corehq/apps/case_importer/tasks.py @@ -1,21 +1,16 @@ -from celery import states -from celery.exceptions import Ignore from celery.schedules import crontab from celery.task import task -from soil.progress import update_task_state - from corehq.apps.hqadmin.tasks import ( AbnormalUsageAlert, send_abnormal_usage_alert, ) -from corehq.util.datadog.gauges import datadog_gauge_task - from .do_import import do_import from .exceptions import ImporterError from .tracking.analytics import get_case_upload_files_total_bytes from .tracking.case_upload_tracker import CaseUpload from .util import get_importer_error_message, exit_celery_with_error_message +from ...util.metrics import metrics_gauge_task @task(serializer='pickle', queue='case_import_queue') @@ -64,7 +59,7 @@ def _alert_on_result(result, domain): send_abnormal_usage_alert.delay(alert) -total_bytes = datadog_gauge_task( +total_bytes = metrics_gauge_task( 'commcare.case_importer.files.total_bytes', get_case_upload_files_total_bytes, run_every=crontab(minute=0) diff --git a/corehq/apps/hqwebapp/tasks.py b/corehq/apps/hqwebapp/tasks.py index f08cb646ae0d..35fdef85381f 100644 --- a/corehq/apps/hqwebapp/tasks.py +++ b/corehq/apps/hqwebapp/tasks.py @@ -5,9 +5,10 @@ from celery.task import task, periodic_task from corehq.util.bounced_email_manager import BouncedEmailManager +from corehq.util.metrics import metrics_gauge_task from dimagi.utils.logging import notify_exception -from corehq.util.datadog.gauges import datadog_gauge_task, datadog_track_errors +from corehq.util.datadog.gauges import datadog_track_errors from corehq.util.log import send_HTML_email @@ -129,5 +130,5 @@ def get_maintenance_alert_active(): return 1 if MaintenanceAlert.get_latest_alert() else 0 -datadog_gauge_task('commcare.maintenance_alerts.active', get_maintenance_alert_active, +metrics_gauge_task('commcare.maintenance_alerts.active', 
get_maintenance_alert_active, run_every=crontab(minute=1)) diff --git a/corehq/apps/receiverwrapper/views.py b/corehq/apps/receiverwrapper/views.py index 10922e0b4e45..6fe383b8465b 100644 --- a/corehq/apps/receiverwrapper/views.py +++ b/corehq/apps/receiverwrapper/views.py @@ -1,13 +1,9 @@ -import logging import os from django.http import HttpResponseBadRequest, HttpResponseForbidden from django.views.decorators.csrf import csrf_exempt from django.views.decorators.http import require_POST -from couchdbkit import ResourceNotFound -from tastypie.http import HttpTooManyRequests - import couchforms from casexml.apps.case.xform import get_case_updates, is_device_report from couchforms import openrosa_response @@ -50,13 +46,10 @@ convert_xform_to_json, should_use_sql_backend, ) -from corehq.util.datadog.gauges import datadog_counter, datadog_gauge -from corehq.util.datadog.metrics import ( - MULTIMEDIA_SUBMISSION_ERROR_COUNT, - XFORM_LOCKED_COUNT, -) -from corehq.util.datadog.utils import bucket_value +from corehq.util.metrics import metrics_counter, metrics_histogram from corehq.util.timer import TimingContext +from couchdbkit import ResourceNotFound +from tastypie.http import HttpTooManyRequests PROFILE_PROBABILITY = float(os.getenv('COMMCARE_PROFILE_SUBMISSION_PROBABILITY', 0)) PROFILE_LIMIT = os.getenv('COMMCARE_PROFILE_SUBMISSION_LIMIT') @@ -70,10 +63,10 @@ def _process_form(request, domain, app_id, user_id, authenticated, if rate_limit_submission(domain): return HttpTooManyRequests() - metric_tags = [ - 'backend:sql' if should_use_sql_backend(domain) else 'backend:couch', - 'domain:{}'.format(domain), - ] + metric_tags = { + 'backend': 'sql' if should_use_sql_backend(domain) else 'couch', + 'domain': domain + } try: instance, attachments = couchforms.get_instance_and_attachment(request) @@ -85,9 +78,11 @@ def _process_form(request, domain, app_id, user_id, authenticated, except: meta = {} + metrics_counter('commcare.corrupt_multimedia_submissions', tags={ + 'domain': domain, 'authenticated': authenticated + }) return _submission_error( - request, "Received a submission with POST.keys()", - MULTIMEDIA_SUBMISSION_ERROR_COUNT, metric_tags, + request, "Received a submission with POST.keys()", metric_tags, domain, app_id, user_id, authenticated, meta, ) @@ -133,8 +128,11 @@ def _process_form(request, domain, app_id, user_id, authenticated, try: result = submission_post.run() except XFormLockError as err: + metrics_counter('commcare.xformlocked.count', tags={ + 'domain': domain, 'authenticated': authenticated + }) return _submission_error( - request, "XFormLockError: %s" % err, XFORM_LOCKED_COUNT, + request, "XFormLockError: %s" % err, metric_tags, domain, app_id, user_id, authenticated, status=423, notify=False, ) @@ -145,7 +143,7 @@ def _process_form(request, domain, app_id, user_id, authenticated, return response -def _submission_error(request, message, count_metric, metric_tags, +def _submission_error(request, message, metric_tags, domain, app_id, user_id, authenticated, meta=None, status=400, notify=True): """Notify exception, datadog count, record metrics, construct response @@ -157,7 +155,6 @@ def _submission_error(request, message, count_metric, metric_tags, "domain:{}".format(domain), "authenticated:{}".format(authenticated), ] - datadog_counter(count_metric, tags=details) if notify: details.extend([ "user_id:{}".format(user_id), @@ -172,24 +169,28 @@ def _submission_error(request, message, count_metric, metric_tags, def _record_metrics(tags, submission_type, response, timer=None, 
xform=None): + tags.update({ + 'submission_type': submission_type, + 'status_code': response.status_code + }) + if xform and xform.metadata and xform.metadata.timeEnd and xform.received_on: lag = xform.received_on - xform.metadata.timeEnd lag_days = lag.total_seconds() / 86400 - tags += [ - 'lag:%s' % bucket_value(lag_days, (1, 2, 4, 7, 14, 31, 90), 'd') - ] - - tags += [ - 'submission_type:{}'.format(submission_type), - 'status_code:{}'.format(response.status_code) - ] + metrics_histogram( + 'commcare.xform_submissions.lag.days', lag_days, + bucket_tag='lag', buckets=(1, 2, 4, 7, 14, 31, 90), bucket_unit='d', + tags=tags + ) if timer: - tags += [ - 'duration:%s' % bucket_value(timer.duration, (1, 5, 20, 60, 120, 300, 600), 's'), - ] + metrics_histogram( + 'commcare.xform_submissions.duration.seconds', timer.duration, + bucket_tag='duration', buckets=(1, 5, 20, 60, 120, 300, 600), bucket_unit='s', + tags=tags + ) - datadog_counter('commcare.xform_submissions.count', tags=tags) + metrics_counter('commcare.xform_submissions.count', tags=tags) @location_safe diff --git a/corehq/apps/sms/tasks.py b/corehq/apps/sms/tasks.py index 968551e66eeb..254579db4273 100644 --- a/corehq/apps/sms/tasks.py +++ b/corehq/apps/sms/tasks.py @@ -7,6 +7,7 @@ from celery.schedules import crontab +from corehq.util.metrics import metrics_gauge_task from dimagi.utils.couch import ( CriticalSection, get_redis_client, @@ -50,7 +51,7 @@ from corehq.apps.users.models import CommCareUser, CouchUser from corehq.messaging.util import use_phone_entries from corehq.util.celery_utils import no_result_task -from corehq.util.datadog.gauges import datadog_counter, datadog_gauge_task +from corehq.util.datadog.gauges import datadog_counter from corehq.util.timezones.conversions import ServerTime MAX_TRIAL_SMS = 50 @@ -588,4 +589,4 @@ def queued_sms(): return QueuedSMS.objects.count() -datadog_gauge_task('commcare.sms.queued', queued_sms, run_every=crontab()) +metrics_gauge_task('commcare.sms.queued', queued_sms, run_every=crontab()) diff --git a/corehq/motech/repeaters/tasks.py b/corehq/motech/repeaters/tasks.py index 5382fbdca840..1a6c1fc12431 100644 --- a/corehq/motech/repeaters/tasks.py +++ b/corehq/motech/repeaters/tasks.py @@ -6,6 +6,7 @@ from celery.task import periodic_task, task from celery.utils.log import get_task_logger +from corehq.util.metrics import metrics_gauge_task from dimagi.utils.couch import get_redis_lock from dimagi.utils.couch.undo import DELETED_SUFFIX @@ -25,7 +26,6 @@ from corehq.util.datadog.gauges import ( datadog_bucket_timer, datadog_counter, - datadog_gauge_task, ) from corehq.util.datadog.utils import make_buckets_from_timedeltas from corehq.util.soft_assert import soft_assert @@ -137,7 +137,7 @@ def process_repeat_record(repeat_record): logging.exception('Failed to process repeat record: {}'.format(repeat_record._id)) -repeaters_overdue = datadog_gauge_task( +repeaters_overdue = metrics_gauge_task( 'commcare.repeaters.overdue', get_overdue_repeat_record_count, run_every=crontab() # every minute diff --git a/corehq/util/datadog/gauges.py b/corehq/util/datadog/gauges.py index bb4788491192..744fa9ebc057 100644 --- a/corehq/util/datadog/gauges.py +++ b/corehq/util/datadog/gauges.py @@ -9,23 +9,6 @@ from corehq.util.timer import TimingContext -def datadog_gauge_task(name, fn, run_every, enforce_prefix='commcare'): - """ - helper for easily registering datadog gauges to run periodically - - To update a datadog gauge on a schedule based on the result of a function - just add to your app's tasks.py: - - 
my_calculation = datadog_gauge_task('my.datadog.metric', my_calculation_function,
-                                            run_every=crontab(minute=0))
-
-    """
-    _enforce_prefix(name, enforce_prefix)
-
-    datadog_gauge = _DatadogGauge(name, fn, run_every)
-    return datadog_gauge.periodic_task()
-
-
 def datadog_histogram(name, value, enforce_prefix='commcare', tags=None):
     """
     Usage: Used to track the statistical distribution of a set of values over a statsd flush period.
@@ -100,23 +83,6 @@ def new_stop(name=None):
         return timer
 
 
-class _DatadogGauge(object):
-
-    def __init__(self, name, fn, run_every):
-        self.name = name
-        self.fn = fn
-        self.run_every = run_every
-
-    def periodic_task(self):
-        @periodic_task(serializer='pickle', queue='background_queue', run_every=self.run_every,
-                       acks_late=True, ignore_result=True)
-        @wraps(self.fn)
-        def inner(*args, **kwargs):
-            statsd.gauge(self.name, self.fn(*args, **kwargs))
-
-        return inner
-
-
 def _enforce_prefix(name, prefix):
     soft_assert(fail_if_debug=True).call(
         not prefix or name.split('.')[0] == prefix,
diff --git a/corehq/util/datadog/metrics.py b/corehq/util/datadog/metrics.py
index 545be07a5a85..d56dea5154db 100644
--- a/corehq/util/datadog/metrics.py
+++ b/corehq/util/datadog/metrics.py
@@ -2,6 +2,4 @@
 ERROR_COUNT = 'commcare.error.count'
 REPEATER_ERROR_COUNT = 'commcare.repeaters.error'
 REPEATER_SUCCESS_COUNT = 'commcare.repeaters.success'
-MULTIMEDIA_SUBMISSION_ERROR_COUNT = 'commcare.corrupt-multimedia-submission.error.count'
 DATE_OPENED_CASEBLOCK_ERROR_COUNT = 'commcare.date-opened-caseblock-bug.error.count'
-XFORM_LOCKED_COUNT = 'commcare.xformlocked.count'
diff --git a/corehq/util/metrics/__init__.py b/corehq/util/metrics/__init__.py
new file mode 100644
index 000000000000..5421865d9b53
--- /dev/null
+++ b/corehq/util/metrics/__init__.py
@@ -0,0 +1,178 @@
+"""
+Metrics collection
+******************
+
+.. contents::
+    :local:
+
+This package exposes functions and utilities to record metrics in CommCare. These metrics
+are exported / exposed to the configured metrics providers. Supported providers are:
+
+    * Datadog
+    * Prometheus
+
+Providers are enabled using the `METRICS_PROVIDERS` setting. Multiple providers can be
+enabled concurrently:
+
+::
+
+    METRICS_PROVIDERS = [
+        'corehq.util.metrics.prometheus.PrometheusMetrics',
+        'corehq.util.metrics.datadog.DatadogMetrics',
+    ]
+
+If no metrics providers are configured, CommCare will log all metrics to the `commcare.metrics` logger
+at the DEBUG level.
+
+Metric tagging
+==============
+Metrics may be tagged by passing a dictionary of tag names and values. Tags should be used
+to add dimensions to a metric, e.g. request type, response status.
+
+Tags should not originate from unbounded sources or sources with high dimensionality such as
+timestamps, user IDs, request IDs, etc. Ideally a tag should not have more than 10 possible values.
+
+Read more about tagging:
+
+* https://prometheus.io/docs/practices/naming/#labels
+* https://docs.datadoghq.com/tagging/
+
+Metric Types
+============
+
+Counter metric
+''''''''''''''
+
+A counter is a cumulative metric that represents a single monotonically increasing counter
+whose value can only increase or be reset to zero on restart. For example, you can use a
+counter to represent the number of requests served, tasks completed, or errors.
+
+Do not use a counter to expose a value that can decrease. For example, do not use a counter
+for the number of currently running processes; instead use a gauge.
+ +:: + + metrics_counter('commcare.case_import.count', 1, tags={'domain': domain}) + + +Gauge metric +'''''''''''' + +A gauge is a metric that represents a single numerical value that can arbitrarily go up and down. + +Gauges are typically used for measured values like temperatures or current memory usage, +but also "counts" that can go up and down, like the number of concurrent requests. + +:: + + metrics_gauge('commcare.case_import.queue_length', queue_length) + +For regular reporting of a gauge metric there is the `metrics_gauge_task` function: + +.. autofunction:: corehq.util.metrics.metrics_gauge_task + +Histogram metric +'''''''''''''''' + +A histogram samples observations (usually things like request durations or response sizes) +and counts them in configurable buckets. + +:: + + metrics_histogram( + 'commcare.case_import.duration', timer_duration, + bucket_tag='size', buckets=[10, 50, 200, 1000], bucket_unit='s', + tags={'domain': domain} + ) + +Histograms are recorded differently in the different providers. + +.. automethod:: corehq.util.metrics.datadog.DatadogMetrics._histogram + +.. automethod:: corehq.util.metrics.prometheus.PrometheusMetrics._histogram + + +Other Notes +=========== + +* All metrics must use the prefix 'commcare.' +""" +from functools import wraps +from typing import Iterable + +from celery.task import periodic_task + +import settings +from corehq.util.metrics.metrics import DebugMetrics, DelegatedMetrics, DEFAULT_BUCKETS, _enforce_prefix +from dimagi.utils.modules import to_function + +__all__ = [ + 'metrics_counter', + 'metrics_gauge', + 'metrics_histogram', + 'metrics_gauge_task', +] + +_metrics = None + + +def _get_metrics_provider(): + global _metrics + if not _metrics: + providers = [] + for provider_path in settings.METRICS_PROVIDERS: + provider = to_function(provider_path)() + providers.append(provider) + + if not providers: + _metrics = DebugMetrics() + elif len(providers) > 1: + _metrics = DelegatedMetrics(providers) + else: + _metrics = providers[0] + return _metrics + + +def metrics_counter(name: str, value: float = 1, tags: dict = None, documentation: str = ''): + provider = _get_metrics_provider() + provider.counter(name, value, tags, documentation) + + +def metrics_gauge(name: str, value: float, tags: dict = None, documentation: str = ''): + provider = _get_metrics_provider() + provider.gauge(name, value, tags, documentation) + + +def metrics_histogram( + name: str, value: float, + bucket_tag: str, buckets: Iterable[int] = DEFAULT_BUCKETS, bucket_unit: str = '', + tags: dict = None, documentation: str = ''): + provider = _get_metrics_provider() + provider.histogram(name, value, bucket_tag, buckets, bucket_unit, tags, documentation) + + +def metrics_gauge_task(name, fn, run_every): + """ + Helper for easily registering gauges to run periodically + + To update a gauge on a schedule based on the result of a function + just add to your app's tasks.py: + + :: + + my_calculation = metrics_gauge_task( + 'commcare.my.metric', my_calculation_function, run_every=crontab(minute=0) + ) + + """ + _enforce_prefix(name, 'commcare') + + @periodic_task(serializer='pickle', queue='background_queue', run_every=run_every, + acks_late=True, ignore_result=True) + @wraps(fn) + def inner(*args, **kwargs): + from corehq.util.metrics import metrics_gauge + # TODO: make this use prometheus push gateway + metrics_gauge(name, fn(*args, **kwargs)) + + return inner diff --git a/corehq/util/metrics/datadog.py b/corehq/util/metrics/datadog.py new file mode 100644 index 
000000000000..461c94b4048d --- /dev/null +++ b/corehq/util/metrics/datadog.py @@ -0,0 +1,110 @@ +import logging +from typing import List + +from django.conf import settings + +from corehq.util.datadog.utils import bucket_value +from corehq.util.metrics.metrics import HqMetrics +from datadog.dogstatsd.base import DogStatsd + +datadog_logger = logging.getLogger('datadog') + +COMMON_TAGS = ['environment:{}'.format(settings.SERVER_ENVIRONMENT)] + +statsd = DogStatsd(constant_tags=COMMON_TAGS) + + +class DatadogMetrics(HqMetrics): + """Datadog Metrics Provider + + Settings: + * DATADOG_API_KEY + * DATADOG_APP_KEY + """ + + def __init__(self): + if settings.UNIT_TESTING or settings.DEBUG or 'ddtrace.contrib.django' not in settings.INSTALLED_APPS: + try: + from ddtrace import tracer + tracer.enabled = False + except ImportError: + pass + + if settings.UNIT_TESTING: + return + + if not settings.DATADOG_API_KEY or not settings.DATADOG_APP_KEY: + raise Exception( + "Datadog not configured. " + "Set DATADOG_API_KEY and DATADOG_APP_KEY in settings or update METRICS_PROVIDERS " + "to remove the Datadog provider." + ) + + try: + from datadog import initialize + except ImportError: + pass + else: + initialize(settings.DATADOG_API_KEY, settings.DATADOG_APP_KEY) + + def _counter(self, name: str, value: float, tags: dict = None, documentation: str = ''): + """Although this is submitted as a COUNT the Datadog app represents these as a RATE. + See https://docs.datadoghq.com/developers/metrics/types/?tab=rate#definition""" + dd_tags = _format_tags(tags) + _datadog_record(statsd.increment, name, value, dd_tags) + + def _gauge(self, name: str, value: float, tags: dict = None, documentation: str = ''): + """See https://docs.datadoghq.com/developers/metrics/types/?tab=gauge#definition""" + dd_tags = _format_tags(tags) + _datadog_record(statsd.gauge, name, value, dd_tags) + + def _histogram(self, name: str, value: float, + bucket_tag: str, buckets: List[int], bucket_unit: str = '', + tags: dict = None, documentation: str = ''): + """ + This implementation of histogram uses tagging to record the buckets. + It does not use the Datadog Histogram metric type. + + The metric itself will be incremented by 1 on each call. The value + passed to `metrics_histogram` will be used to create the bucket tag. 
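+
+        The bucket tag value comes from the existing `bucket_value` helper; a sketch of
+        the tag values it produces (zero-padded, matching this change's tests)::
+
+            bucket_value(1.4, [1,2,3], 'ms')  # 'lt_002ms'
+            bucket_value(5, [1,2,3], 'ms')    # 'over_003ms'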
+
+        For example:
+
+        ::
+
+            metrics_histogram(
+                'commcare.request.duration', 1.4,
+                bucket_tag='duration', buckets=[1,2,3], bucket_unit='ms',
+                tags=tags
+            )
+
+            # resulting metrics
+            # commcare.request.duration:1|c|#duration:lt_002ms
+
+        For more explanation about why this implementation was chosen, see:
+
+        * https://github.com/dimagi/commcare-hq/pull/17080
+        * https://github.com/dimagi/commcare-hq/pull/17030#issuecomment-315794700
+        """
+        tags = _format_tags(tags)
+        if not tags:
+            tags = []
+        bucket = bucket_value(value, buckets, bucket_unit)
+        tags.append(f'{bucket_tag}:{bucket}')
+        _datadog_record(statsd.increment, name, 1, tags)
+
+
+def _format_tags(tag_values: dict):
+    if not tag_values:
+        return None
+
+    return [
+        f'{name}:{value}' for name, value in tag_values.items()
+    ]
+
+
+def _datadog_record(fn, name, value, tags=None):
+    try:
+        fn(name, value, tags=tags)
+    except Exception:
+        datadog_logger.exception('Unable to record Datadog stats')
diff --git a/corehq/util/metrics/metrics.py b/corehq/util/metrics/metrics.py
new file mode 100644
index 000000000000..d181927f8fb4
--- /dev/null
+++ b/corehq/util/metrics/metrics.py
@@ -0,0 +1,99 @@
+import abc
+import logging
+import re
+from abc import abstractmethod
+from typing import List
+
+from corehq.util.soft_assert import soft_assert
+from prometheus_client.utils import INF
+
+METRIC_NAME_RE = re.compile(r'^[a-zA-Z_:.][a-zA-Z0-9_:.]*$')
+METRIC_TAG_NAME_RE = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]*$')
+RESERVED_METRIC_TAG_NAME_RE = re.compile(r'^__.*$')
+RESERVED_METRIC_TAG_NAMES = ['quantile', 'le']
+
+
+logger = logging.getLogger('commcare.metrics')
+
+
+def _enforce_prefix(name, prefix):
+    soft_assert(fail_if_debug=True).call(
+        not prefix or name.startswith(prefix),
+        "Did you mean to call your metric 'commcare.{}'? ".format(name))
+
+
+def _validate_tag_names(tag_names):
+    # tags may legitimately be absent, e.g. gauges recorded via metrics_gauge_task
+    tag_names = set(tag_names or [])
+    for tag in tag_names:
+        if not METRIC_TAG_NAME_RE.match(tag):
+            raise ValueError('Invalid metric tag name: ' + tag)
+        if RESERVED_METRIC_TAG_NAME_RE.match(tag):
+            raise ValueError('Reserved metric tag name: ' + tag)
+        if tag in RESERVED_METRIC_TAG_NAMES:
+            raise ValueError('Reserved metric tag name: ' + tag)
+    return tag_names
+
+
+DEFAULT_BUCKETS = (.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, INF)
+
+
+class HqMetrics(metaclass=abc.ABCMeta):
+    def initialize(self):
+        pass
+
+    def counter(self, name: str, value: float = 1, tags: dict = None, documentation: str = ''):
+        _enforce_prefix(name, 'commcare')
+        _validate_tag_names(tags)
+        self._counter(name, value, tags, documentation)
+
+    def gauge(self, name: str, value: float, tags: dict = None, documentation: str = ''):
+        _enforce_prefix(name, 'commcare')
+        _validate_tag_names(tags)
+        self._gauge(name, value, tags, documentation)
+
+    def histogram(self, name: str, value: float,
+                  bucket_tag: str, buckets: List[int] = DEFAULT_BUCKETS, bucket_unit: str = '',
+                  tags: dict = None, documentation: str = ''):
+        """Create a histogram metric. Histogram implementations differ between providers. See provider
+        implementations for details.
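+
+        For example (mirroring the module-level docs; `timer_duration` and `domain` are
+        placeholders assumed to be in scope)::
+
+            metrics_histogram(
+                'commcare.case_import.duration', timer_duration,
+                bucket_tag='size', buckets=[10, 50, 200, 1000], bucket_unit='s',
+                tags={'domain': domain}
+            )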
+        """
+        _enforce_prefix(name, 'commcare')
+        _validate_tag_names(tags)
+        self._histogram(name, value, bucket_tag, buckets, bucket_unit, tags, documentation)
+
+    @abstractmethod
+    def _counter(self, name, value, tags, documentation):
+        raise NotImplementedError
+
+    @abstractmethod
+    def _gauge(self, name, value, tags, documentation):
+        raise NotImplementedError
+
+    @abstractmethod
+    def _histogram(self, name, value, bucket_tag, buckets, bucket_unit, tags, documentation):
+        raise NotImplementedError
+
+
+class DebugMetrics:
+    def __getattr__(self, item):
+        if item in ('counter', 'gauge', 'histogram'):
+            def _check(name, value, *args, **kwargs):
+                # tags may arrive positionally (second-to-last argument) or as a keyword
+                tags = kwargs.get('tags') or (args[-2] if len(args) >= 2 else {}) or {}
+                _enforce_prefix(name, 'commcare')
+                _validate_tag_names(tags)
+                logger.debug("[%s] %s %s %s", item, name, tags, value)
+            return _check
+        raise AttributeError(item)
+
+
+class DelegatedMetrics:
+    def __init__(self, delegates):
+        self.delegates = delegates
+
+    def __getattr__(self, item):
+        if item in ('counter', 'gauge', 'histogram'):
+            def _record_metric(*args, **kwargs):
+                for delegate in self.delegates:
+                    getattr(delegate, item)(*args, **kwargs)
+            return _record_metric
+        raise AttributeError(item)
diff --git a/corehq/util/metrics/prometheus.py b/corehq/util/metrics/prometheus.py
new file mode 100644
index 000000000000..00d66b24711a
--- /dev/null
+++ b/corehq/util/metrics/prometheus.py
@@ -0,0 +1,65 @@
+from typing import List
+
+from prometheus_client import Counter as PCounter
+from prometheus_client import Gauge as PGauge
+from prometheus_client import Histogram as PHistogram
+
+from corehq.util.metrics.metrics import HqMetrics
+
+
+class PrometheusMetrics(HqMetrics):
+    """Prometheus Metrics Provider"""
+
+    def __init__(self):
+        self._metrics = {}
+
+    def _counter(self, name: str, value: float = 1, tags: dict = None, documentation: str = ''):
+        """See https://prometheus.io/docs/concepts/metric_types/#counter"""
+        self._get_metric(PCounter, name, tags, documentation).inc(value)
+
+    def _gauge(self, name: str, value: float, tags: dict = None, documentation: str = ''):
+        """See https://prometheus.io/docs/concepts/metric_types/#gauge"""
+        self._get_metric(PGauge, name, tags, documentation).set(value)
+
+    def _histogram(self, name: str, value: float, bucket_tag: str, buckets: List[int], bucket_unit: str = '',
+                   tags: dict = None, documentation: str = ''):
+        """
+        A cumulative histogram with a base metric name of `<name>` exposes multiple time series
+        during a scrape:
+
+        * cumulative counters for the observation buckets, exposed as
+          `<name>_bucket{le="<upper inclusive bound>"}`
+        * the total sum of all observed values, exposed as `<name>_sum`
+        * the count of events that have been observed, exposed as `<name>_count`
+          (identical to `<name>_bucket{le="+Inf"}` above)
+
+        For example:
+        ::
+
+            metrics_histogram(
+                'commcare.request_duration', 1.4,
+                bucket_tag='duration', buckets=[1,2,3], bucket_unit='ms',
+                tags=tags
+            )
+
+            # resulting metrics
+            # commcare_request_duration_bucket{...tags..., le="1.0"} 0.0
+            # commcare_request_duration_bucket{...tags..., le="2.0"} 1.0
+            # commcare_request_duration_bucket{...tags..., le="3.0"} 1.0
+            # commcare_request_duration_bucket{...tags..., le="+Inf"} 1.0
+            # commcare_request_duration_sum{...tags...} 1.4
+            # commcare_request_duration_count{...tags...} 1.0
+
+        See https://prometheus.io/docs/concepts/metric_types/#histogram"""
+        self._get_metric(PHistogram, name, tags, documentation, buckets=buckets).observe(value)
+
+    def _get_metric(self, metric_type, name, tags, documentation, **kwargs):
+        name = name.replace('.', '_')
+        metric
= self._metrics.get(name) + if not metric: + tags = tags or {} + metric = metric_type(name, documentation, labelnames=tags.keys(), **kwargs) + self._metrics[name] = metric + else: + assert metric.__class__ == metric_type + return metric.labels(**tags) if tags else metric diff --git a/corehq/util/metrics/tests/__init__.py b/corehq/util/metrics/tests/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/corehq/util/metrics/tests/test_metrics.py b/corehq/util/metrics/tests/test_metrics.py new file mode 100644 index 000000000000..50b5ad514b51 --- /dev/null +++ b/corehq/util/metrics/tests/test_metrics.py @@ -0,0 +1,179 @@ +from typing import Dict, Tuple + +from django.test import SimpleTestCase + +from corehq.util.metrics.datadog import DatadogMetrics +from corehq.util.metrics.prometheus import PrometheusMetrics +from corehq.util.metrics.tests.utils import patch_datadog +from prometheus_client.samples import Sample +from prometheus_client.utils import INF + + +class _TestMetrics(SimpleTestCase): + provider_class = None + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.provider = cls.provider_class() + + def test_counter(self): + self.provider.counter('commcare.test.counter', tags={'t1': 'a', 't2': 'b'}) + self.provider.counter('commcare.test.counter', 2, tags={'t1': 'c', 't2': 'b'}) + self.provider.counter('commcare.test.counter', tags={'t1': 'c', 't2': 'b'}) + self.assertCounterMetric('commcare.test.counter', { + (('t1', 'a'), ('t2', 'b')): 1, + (('t1', 'c'), ('t2', 'b')): 3, + }) + + def test_gauge(self): + self.provider.gauge('commcare.test.gauge', 4.2, tags={'t1': 'a', 't2': 'b'}) + self.provider.gauge('commcare.test.gauge', 2, tags={'t1': 'c', 't2': 'b'}) + self.provider.gauge('commcare.test.gauge', 5, tags={'t1': 'c', 't2': 'b'}) + self.assertGaugeMetric('commcare.test.gauge', { + (('t1', 'a'), ('t2', 'b')): 4.2, + (('t1', 'c'), ('t2', 'b')): 5, + }) + + def assertCounterMetric(self, metric: str, expected: Dict[Tuple[Tuple[str, str], ...], float]): + """ + :param metric: metric class + :param expected: dict mapping tag tuples to metric values + """ + raise NotImplementedError + + def assertGaugeMetric(self, metric: str, expected: Dict[Tuple[Tuple[str, str], ...], float]): + """ + :param metric: metric class + :param expected: dict mapping tag tuples to metric values + """ + raise NotImplementedError + + +class TestDatadogMetrics(_TestMetrics): + provider_class = DatadogMetrics + + def setUp(self) -> None: + super().setUp() + self.patch = patch_datadog() + self.recorded_metrics = self.patch.__enter__() + + def tearDown(self) -> None: + self.patch.__exit__(None, None, None) + super().tearDown() + + def test_histogram(self): + for value in (0.2, 0.7, 2.5): + self.provider.histogram( + 'commcare.test.histogram', value, 'duration', + buckets=[1, 2, 3], bucket_unit='ms', tags={'t1': 'a', 't2': 'b'} + ) + for value in (2, 5): + self.provider.histogram( + 'commcare.test.histogram', value, 'duration', + buckets=[1, 2, 3], bucket_unit='ms', tags={'t1': 'c', 't2': 'b'} + ) + self.assertHistogramMetric('commcare.test.histogram', { + (('t1', 'a'), ('t2', 'b')): {1: 2, 3: 1}, + (('t1', 'c'), ('t2', 'b')): {3: 1, INF: 1} + }, 'duration', [1, 2, 3], 'ms') + + def assertCounterMetric(self, metric_name, expected): + self.assertEqual({key[0] for key in self.recorded_metrics}, {metric_name}) + actual = { + key[1]: sum(val) for key, val in self.recorded_metrics.items() + } + self.assertDictEqual(actual, expected) + + def assertGaugeMetric(self, metric_name, 
expected): + self.assertEqual({key[0] for key in self.recorded_metrics}, {metric_name}) + actual = { + key[1]: val[-1] for key, val in self.recorded_metrics.items() + } + self.assertDictEqual(actual, expected) + + def assertHistogramMetric(self, metric_name, expected, bucket_tag, buckets, bucket_unit): + self.assertEqual({key[0] for key in self.recorded_metrics}, {metric_name}) + expected_samples = {} + for tags, expected_buckets in expected.items(): + for bucket, val in expected_buckets.items(): + prefix = 'lt' + if bucket == INF: + bucket = buckets[-1] + prefix = 'over' + dd_bucket_tag = (bucket_tag, f'{prefix}_{bucket:03d}{bucket_unit}') + expected_samples[tuple(sorted(tags + (dd_bucket_tag,)))] = val + + actual = { + key[1]: sum(val) for key, val in self.recorded_metrics.items() + } + self.assertDictEqual(actual, expected_samples) + + +class TestPrometheusMetrics(_TestMetrics): + provider_class = PrometheusMetrics + + def test_histogram(self): + for value in (0.2, 0.7, 2.5): + self.provider.histogram( + 'commcare_test_histogram', value, 'duration', + buckets=[1, 2, 3], bucket_unit='ms', tags={'t1': 'a', 't2': 'b'} + ) + for value in (2, 5): + self.provider.histogram( + 'commcare_test_histogram', value, 'duration', + buckets=[1, 2, 3], bucket_unit='ms', tags={'t1': 'c', 't2': 'b'} + ) + self.assertHistogramMetric('commcare_test_histogram', { + (('t1', 'a'), ('t2', 'b')): {1: 2, 3: 1}, + (('t1', 'c'), ('t2', 'b')): {2: 1, INF: 1} + }, [1, 2, 3]) + + def _samples_to_dics(self, samples, filter_name=None): + """Convert a Sample tuple into a dict((name, (labels tuple)) -> value)""" + return { + tuple(sorted(sample.labels.items())): sample.value + for sample in samples + if not filter_name or sample.name == filter_name + } + + def assertGaugeMetric(self, metric_name, expected): + metric_name = metric_name.replace('.', '_') + metric = self.provider._metrics[metric_name] + [collected] = metric.collect() + actual = self._samples_to_dics(collected.samples) + self.assertDictEqual(actual, expected) + + def assertCounterMetric(self, metric_name, expected): + metric_name = metric_name.replace('.', '_') + metric = self.provider._metrics[metric_name] + total_name = f'{metric_name}_total' + [collected] = metric.collect() + actual = self._samples_to_dics(collected.samples, total_name) + self.assertDictEqual(actual, expected) + + def assertHistogramMetric(self, metric_name, expected, buckets): + # Note that Prometheus histograms are cumulative so we must sum up the successive bucket values + # https://en.wikipedia.org/wiki/Histogram#Cumulative_histogram + metric = self.provider._metrics[metric_name] + [collected] = metric.collect() + + sample_name = f'{metric_name}_bucket' + expected_samples = [] + for key, value in expected.items(): + cumulative_value = 0 + for bucket in buckets: + val = value.get(bucket, 0) + cumulative_value += val + labels = dict(key + (('le', str(float(bucket))),)) + expected_samples.append(Sample(sample_name, labels, float(cumulative_value), None, None)) + + labels = dict(key + (('le', '+Inf'),)) + cumulative_value += value.get(INF, 0) + expected_samples.append(Sample(sample_name, labels, float(cumulative_value), None, None)) + + actual = [ + s for s in collected.samples + if s.name.endswith('bucket') + ] + self.assertListEqual(actual, expected_samples) diff --git a/corehq/util/metrics/tests/utils.py b/corehq/util/metrics/tests/utils.py new file mode 100644 index 000000000000..6c4c3edcc9ee --- /dev/null +++ b/corehq/util/metrics/tests/utils.py @@ -0,0 +1,17 @@ +from collections 
import defaultdict +from contextlib import contextmanager + +import mock + + +@contextmanager +def patch_datadog(): + def record(fn, name, value, tags=None): + def get_tag_pairs(tags: list): + return tuple(sorted(tuple(t.split(':', 1)) for t in tags)) + key = (name, get_tag_pairs(tags or [])) + stats[key].append(value) + + stats = defaultdict(list) + with mock.patch("corehq.util.metrics.datadog._datadog_record", new=record): + yield stats diff --git a/corehq/util/metrics/urls.py b/corehq/util/metrics/urls.py new file mode 100644 index 000000000000..b6df3985ac4f --- /dev/null +++ b/corehq/util/metrics/urls.py @@ -0,0 +1,7 @@ +from django.conf.urls import url + +from corehq.util.metrics.views import prometheus_metrics + +urlpatterns = [ + url(r"^metrics$", prometheus_metrics, name="prometheus-django-metrics") +] diff --git a/corehq/util/metrics/views.py b/corehq/util/metrics/views.py new file mode 100644 index 000000000000..d55d55934b7b --- /dev/null +++ b/corehq/util/metrics/views.py @@ -0,0 +1,19 @@ +import os + +import prometheus_client +from django.http import HttpResponse +from prometheus_client import multiprocess + + +def prometheus_metrics(request): + """Exports /metrics as a Django view. + """ + if "prometheus_multiproc_dir" in os.environ: + registry = prometheus_client.CollectorRegistry() + multiprocess.MultiProcessCollector(registry) + else: + registry = prometheus_client.REGISTRY + metrics_page = prometheus_client.generate_latest(registry) + return HttpResponse( + metrics_page, content_type=prometheus_client.CONTENT_TYPE_LATEST + ) diff --git a/deployment/gunicorn/gunicorn_conf.py b/deployment/gunicorn/gunicorn_conf.py index 668c9e68a008..aa8399de7ad6 100644 --- a/deployment/gunicorn/gunicorn_conf.py +++ b/deployment/gunicorn/gunicorn_conf.py @@ -1,3 +1,6 @@ +import glob +import os + preload_app = True worker_class = 'gevent' keepalive = 60 @@ -18,3 +21,15 @@ def post_fork(server, worker): # see: https://github.com/benoitc/gunicorn/issues/527#issuecomment-19601046 from django.urls import resolve resolve('/') + + +def on_starting(server): + """Wipe the metrics from previous processes""" + path = os.environ.get('prometheus_multiproc_dir') + for f in glob.glob(os.path.join(path, '*.db')): + os.remove(f) + + +def child_exit(server, worker): + from prometheus_client import multiprocess + multiprocess.mark_process_dead(worker.pid) diff --git a/deployment/prometheus_server.py b/deployment/prometheus_server.py new file mode 100644 index 000000000000..7e0a7da74e8a --- /dev/null +++ b/deployment/prometheus_server.py @@ -0,0 +1,24 @@ +""" +Simple WSGI server that exposes Prometheus metrics. + +Environment variable `prometheus_multiproc_dir` must be set and match +the value used by Django. +""" +import os +from wsgiref.simple_server import make_server + +from prometheus_client import CollectorRegistry, make_wsgi_app, multiprocess +from prometheus_client.exposition import _SilentHandler + +multiproc_dir = os.environ.get("prometheus_multiproc_dir") +if not multiproc_dir: + raise Exception("Environment variable 'prometheus_multiproc_dir' is not set") + +print(f"Exposing metrics from '{multiproc_dir}'") + +registry = CollectorRegistry() +multiprocess.MultiProcessCollector(registry) + +app = make_wsgi_app(registry) +httpd = make_server('', 9011, app, handler_class=_SilentHandler) +httpd.serve_forever() diff --git a/docs/index.rst b/docs/index.rst index 697abee9a1b6..47845d2f3d93 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -48,6 +48,7 @@ Welcome to CommCareHQ's documentation! 
openmrs js-guide/README databases + metrics Tips for documenting -------------------- diff --git a/docs/metrics.rst b/docs/metrics.rst new file mode 100644 index 000000000000..fdfaf57dc3b1 --- /dev/null +++ b/docs/metrics.rst @@ -0,0 +1 @@ +.. automodule:: corehq.util.metrics diff --git a/requirements-python3/dev-requirements.txt b/requirements-python3/dev-requirements.txt index e8ddb5311179..d8f390f9b4af 100644 --- a/requirements-python3/dev-requirements.txt +++ b/requirements-python3/dev-requirements.txt @@ -130,6 +130,7 @@ pillow==6.2.1 pip-tools==4.4.0 ply==3.11 # via eulxml, jsonpath-rw polib==1.1.0 +prometheus-client==0.7.1 prompt-toolkit==1.0.18 # via ipython psutil==5.1.3 psycogreen==1.0.1 diff --git a/requirements-python3/prod-requirements.txt b/requirements-python3/prod-requirements.txt index 2215c01047ed..39794d0cf3f3 100644 --- a/requirements-python3/prod-requirements.txt +++ b/requirements-python3/prod-requirements.txt @@ -109,6 +109,7 @@ pickleshare==0.7.5 # via ipython pillow==6.2.1 ply==3.11 # via eulxml, jsonpath-rw polib==1.1.0 +prometheus-client==0.7.1 prompt-toolkit==1.0.18 # via ipython psycogreen==1.0.1 psycopg2==2.7.7 diff --git a/requirements-python3/requirements.txt b/requirements-python3/requirements.txt index b7e4f2fb9c4b..313cdf9fe778 100644 --- a/requirements-python3/requirements.txt +++ b/requirements-python3/requirements.txt @@ -101,6 +101,7 @@ phonenumberslite==8.10.22 pillow==6.2.1 ply==3.11 # via eulxml, jsonpath-rw polib==1.1.0 +prometheus-client==0.7.1 psycogreen==1.0.1 psycopg2==2.7.7 py-kissmetrics==1.0.1 diff --git a/requirements-python3/test-requirements.txt b/requirements-python3/test-requirements.txt index 7daa4ecef200..0c55274d867c 100644 --- a/requirements-python3/test-requirements.txt +++ b/requirements-python3/test-requirements.txt @@ -113,6 +113,7 @@ pillow==6.2.1 pip-tools==4.4.0 ply==3.11 # via eulxml, jsonpath-rw polib==1.1.0 +prometheus-client==0.7.1 psycogreen==1.0.1 psycopg2==2.7.7 py-kissmetrics==1.0.1 diff --git a/requirements/dev-requirements.txt b/requirements/dev-requirements.txt index e8ddb5311179..d8f390f9b4af 100644 --- a/requirements/dev-requirements.txt +++ b/requirements/dev-requirements.txt @@ -130,6 +130,7 @@ pillow==6.2.1 pip-tools==4.4.0 ply==3.11 # via eulxml, jsonpath-rw polib==1.1.0 +prometheus-client==0.7.1 prompt-toolkit==1.0.18 # via ipython psutil==5.1.3 psycogreen==1.0.1 diff --git a/requirements/prod-requirements.txt b/requirements/prod-requirements.txt index 2215c01047ed..39794d0cf3f3 100644 --- a/requirements/prod-requirements.txt +++ b/requirements/prod-requirements.txt @@ -109,6 +109,7 @@ pickleshare==0.7.5 # via ipython pillow==6.2.1 ply==3.11 # via eulxml, jsonpath-rw polib==1.1.0 +prometheus-client==0.7.1 prompt-toolkit==1.0.18 # via ipython psycogreen==1.0.1 psycopg2==2.7.7 diff --git a/requirements/requirements.in b/requirements/requirements.in index 1e555dd475c3..96b3a9eb92db 100644 --- a/requirements/requirements.in +++ b/requirements/requirements.in @@ -114,3 +114,4 @@ werkzeug==0.11.15 CommcareTranslationChecker==0.9.3.5 WeasyPrint==0.42.3 architect==0.5.6 +prometheus-client==0.7.1 diff --git a/requirements/requirements.txt b/requirements/requirements.txt index b7e4f2fb9c4b..313cdf9fe778 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -101,6 +101,7 @@ phonenumberslite==8.10.22 pillow==6.2.1 ply==3.11 # via eulxml, jsonpath-rw polib==1.1.0 +prometheus-client==0.7.1 psycogreen==1.0.1 psycopg2==2.7.7 py-kissmetrics==1.0.1 diff --git a/requirements/test-requirements.txt 
b/requirements/test-requirements.txt index 7daa4ecef200..0c55274d867c 100644 --- a/requirements/test-requirements.txt +++ b/requirements/test-requirements.txt @@ -113,6 +113,7 @@ pillow==6.2.1 pip-tools==4.4.0 ply==3.11 # via eulxml, jsonpath-rw polib==1.1.0 +prometheus-client==0.7.1 psycogreen==1.0.1 psycopg2==2.7.7 py-kissmetrics==1.0.1 diff --git a/settings.py b/settings.py index eaa6db1f5837..8541637ad31c 100755 --- a/settings.py +++ b/settings.py @@ -835,6 +835,11 @@ SUBSCRIPTION_USERNAME = None SUBSCRIPTION_PASSWORD = None +# List of metrics providers to use. Available providers: +# * 'corehq.util.metrics.datadog.DatadogMetrics' +# * 'corehq.util.metrics.prometheus.PrometheusMetrics' +METRICS_PROVIDERS = [] + DATADOG_API_KEY = None DATADOG_APP_KEY = None @@ -2073,19 +2078,6 @@ if 'dummy' not in CACHES: CACHES['dummy'] = {'BACKEND': 'django.core.cache.backends.dummy.DummyCache'} -try: - from datadog import initialize -except ImportError: - pass -else: - initialize(DATADOG_API_KEY, DATADOG_APP_KEY) - -if UNIT_TESTING or DEBUG or 'ddtrace.contrib.django' not in INSTALLED_APPS: - try: - from ddtrace import tracer - tracer.enabled = False - except ImportError: - pass REST_FRAMEWORK = { 'DATETIME_FORMAT': '%Y-%m-%dT%H:%M:%S.%fZ', diff --git a/urls.py b/urls.py index 6d2b35fc2004..d0c98c0a1564 100644 --- a/urls.py +++ b/urls.py @@ -104,6 +104,7 @@ url(r'^hq/sms/', include(sms_admin_interface_urls)), url(r'^hq/multimedia/', include('corehq.apps.hqmedia.urls')), url(r'^hq/admin/', include('corehq.apps.hqadmin.urls')), + url(r'^hq/admin/', include('corehq.util.metrics.urls')), url(r'^hq/flags/', include('corehq.apps.toggle_ui.urls')), url(r'^hq/notifications/', include('corehq.apps.notifications.urls')), url(r'^unicel/', include('corehq.messaging.smsbackends.unicel.urls')), @@ -141,7 +142,7 @@ url(r'^unsubscribe_report/(?P[\w-]+)/' r'(?P[\w.%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,})/(?P[\w-]+)/', ReportNotificationUnsubscribeView.as_view(), name=ReportNotificationUnsubscribeView.urlname), - url(r'^phone/list_apps', list_apps, name="list_accessible_apps") + url(r'^phone/list_apps', list_apps, name="list_accessible_apps"), ] + LOCAL_APP_URLS if settings.ENABLE_PRELOGIN_SITE:
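
Taken together, a minimal usage sketch of the API this change introduces (the provider list and the `domain` tag value are illustrative; with `METRICS_PROVIDERS = []` the calls fall back to DEBUG logging on the `commcare.metrics` logger):

::

    # settings.py (illustrative)
    METRICS_PROVIDERS = ['corehq.util.metrics.datadog.DatadogMetrics']

    # application code
    from corehq.util.metrics import metrics_counter, metrics_histogram

    metrics_counter('commcare.case_import.count', tags={'domain': 'example-domain'})
    metrics_histogram(
        'commcare.case_import.duration', 12.3,
        bucket_tag='duration', buckets=[1, 5, 20, 60], bucket_unit='s',
        tags={'domain': 'example-domain'},
    )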