diff --git a/metrics_utility/automation_controller_billing/collector.py b/metrics_utility/automation_controller_billing/collector.py deleted file mode 100644 index 421ec458..00000000 --- a/metrics_utility/automation_controller_billing/collector.py +++ /dev/null @@ -1,151 +0,0 @@ -import contextlib -import json -import os - -from awx.main.utils import datetime_hook -from django.conf import settings -from django.core.serializers.json import DjangoJSONEncoder -from django.db import connection - -import metrics_utility.base as base - -from metrics_utility.automation_controller_billing.package.factory import Factory as PackageFactory -from metrics_utility.logger import logger - - -# work around https://github.com/ansible/awx/pull/15676 -try: - # 2.4, early 2.5 - from awx.main.utils.pglock import advisory_lock -except ImportError: - # later 2.5, 2.6 - from ansible_base.lib.utils.db import advisory_lock - - -class Collector(base.Collector): - def __init__(self, collection_type=base.Collector.SCHEDULED_COLLECTION, collector_module=None, ship_target=None, billing_provider_params=None): - if collector_module is None: - from metrics_utility.automation_controller_billing import collectors - - collector_module = collectors - - self.ship_target = ship_target - self.billing_provider_params = billing_provider_params - - super(Collector, self).__init__(collection_type=collection_type, collector_module=collector_module) - - # TODO: extract advisory lock name in the superclass and log message, so we can change it here and then use - # this method from superclass - # TODO: extract to superclass ability to push extra params into config.json - def gather(self, dest=None, subset=None, since=None, until=None, billing_provider_params=None): - """Entry point for gathering - - :param dest: (default: /tmp/awx-analytics-*) - directory for temp files - :param subset: (list) collector_module's function names if only subset is required (typically tests) - :param since: (datetime) - low threshold of data changes (max. and default - 28 days ago) - :param until: (datetime) - high threshold of data changes (defaults to now) - :return: None or list of paths to tarballs (.tar.gz) - """ - if not self.is_enabled(): - return None - - key = 'gather_automation_controller_billing_lock' - suffix = os.getenv('METRICS_UTILITY_COLLECTOR_LOCK_SUFFIX') - if suffix: - key = f'gather_automation_controller_billing_{suffix}_lock' - - with self._pg_advisory_lock(key, wait=False) as acquired: - if not acquired: - logger.log(self.log_level, 'Not gathering Automation Controller billing data, another task holds lock') - return None - - self._gather_initialize(dest, subset, since, until) - - if not self._gather_config(): - return None - - self._gather_json_collections() - - self._gather_csv_collections() - - self._process_packages() - - self._gather_finalize() - - self._gather_cleanup() - - return self.all_tar_paths() - - def _gather_config(self): - if not super()._gather_config(): - return False - - # Extend the config collection to contain billing specific info: - config_collection = self.collections['config'] - data = json.loads(config_collection.data) - data['billing_provider_params'] = self.billing_provider_params - config_collection._save_gathering(data) - - return True - - def _is_valid_license(self): - # TODO: which license to check? Any license will do? 
- return True - - def _is_shipping_configured(self): - # This check is already done in each Package class - return True - - @staticmethod - def db_connection(): - return connection - - @classmethod - def registered_collectors(cls, module=None): - from metrics_utility.automation_controller_billing import collectors - - return base.Collector.registered_collectors(collectors) - - @contextlib.contextmanager - def _pg_advisory_lock(self, key, wait=False): - """Use awx specific implementation to pass tests with sqlite3""" - with advisory_lock(key, wait=wait) as lock: - yield lock - - def _last_gathering(self): - # Not needed in this implementation, but we need to define an abstract method - pass - - def _load_last_gathered_entries(self): - # We are reusing Settings used by Analytics, so we don't have to backport changes into analytics - # We can safely do this, by making sure we use the same lock as Analytics, before we persist - # these settings. - from awx.conf.models import Setting - - last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first() - last_gathered_entries = json.loads((last_entries.value if last_entries is not None else '') or '{}', object_hook=datetime_hook) - return last_gathered_entries - - def _gather_finalize(self): - """Persisting timestamps (manual/schedule mode only)""" - - disabled_str = os.getenv('METRICS_UTILITY_DISABLE_SAVE_LAST_GATHERED_ENTRIES', 'false') - disabled = False - if disabled_str and (disabled_str.lower() == 'true'): - disabled = True - - if self.is_shipping_enabled() and not disabled: - # We need to wait on analytics lock, to update the last collected timestamp settings - # so we don't clash with analytics job collection. - with self._pg_advisory_lock('gather_analytics_lock', wait=True): - # We need to load fresh settings again as we're obtaning the lock, since - # Analytics job could have changed this on the background and we'd be resetting - # the Analytics values here. - self._load_last_gathered_entries() - self._update_last_gathered_entries() - - def _save_last_gathered_entries(self, last_gathered_entries): - settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = json.dumps(last_gathered_entries, cls=DjangoJSONEncoder) - - def _package_class(self): - return PackageFactory(ship_target=self.ship_target).create() diff --git a/metrics_utility/automation_controller_billing/collectors.py b/metrics_utility/automation_controller_billing/collectors.py index c8daa942..e92a9ce2 100644 --- a/metrics_utility/automation_controller_billing/collectors.py +++ b/metrics_utility/automation_controller_billing/collectors.py @@ -49,6 +49,7 @@ def daily_slicing(key, last_gather, **kwargs): else: from awx.conf.models import Setting + # FIXME: merge with collector _calculate_collection_interval? 
horizon = until - timedelta(days=get_max_gather_period_days()) last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first() last_entries = json.loads((last_entries.value if last_entries is not None else '') or '{}', object_hook=datetime_hook) @@ -173,6 +174,7 @@ def _copy_table_aap_2_5_and_above(cursor, query, file): with cursor.copy(query) as copy: while data := copy.read(): byte_data = bytes(data) + # byte_data is individual CSV lines here file.write(byte_data.decode()) diff --git a/metrics_utility/automation_controller_billing/package/package_crc.py b/metrics_utility/automation_controller_billing/package/package_crc.py index 5c539040..836baba9 100644 --- a/metrics_utility/automation_controller_billing/package/package_crc.py +++ b/metrics_utility/automation_controller_billing/package/package_crc.py @@ -1,3 +1,4 @@ +import base64 import json import os @@ -8,7 +9,7 @@ import metrics_utility.base as base -from metrics_utility.exceptions import FailedToUploadPayload +from metrics_utility.exceptions import FailedToUploadPayload, MetricsException from metrics_utility.logger import logger @@ -16,19 +17,17 @@ class PackageCRC(base.Package): CERT_PATH = '/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem' PAYLOAD_CONTENT_TYPE = 'application/vnd.redhat.aap-billing-controller.aap_billing_controller_payload+tgz' - SHIPPING_AUTH_SERVICE_ACCOUNT = 'service-account' - def _tarname_base(self): timestamp = self.collector.gather_until return f'{settings.SYSTEM_UUID}-{timestamp.strftime("%Y-%m-%d-%H%M%S%z")}' - def get_sso_url(self): + def _get_sso_url(self): return os.getenv('METRICS_UTILITY_CRC_SSO_URL', 'https://sso.redhat.com/auth/realms/redhat-external/protocol/openid-connect/token') - def get_ingress_url(self): + def _get_ingress_url(self): return os.getenv('METRICS_UTILITY_CRC_INGRESS_URL', 'https://console.redhat.com/api/ingress/v1/upload') - def get_proxy_url(self): + def _get_proxy_url(self): return os.getenv('METRICS_UTILITY_PROXY_URL') def _get_rh_user(self): @@ -40,28 +39,33 @@ def _get_rh_password(self): def _get_http_request_headers(self): return get_awx_http_client_headers() - def shipping_auth_mode(self): - # TODO make this as a configuration so we can use this for local testing, - # for now, uncomment when testin locally in docker - # return self.SHIPPING_AUTH_IDENTITY - - return self.SHIPPING_AUTH_SERVICE_ACCOUNT + # only service_account was reachable in 0.6.0 without changing code; identity still isn't + # FIXME: only use for service if needed, remove if not + def _shipping_auth(self): + return os.getenv('METRICS_UTILITY_SHIP_AUTH', 'service_account') # identity | mutual_tls | service_account | user_pass def is_shipping_configured(self): - # TODO: move to base, or children - ret = super() - if ret is False: + if not self.tar_path: + logger.error('Insights for Ansible Automation Platform TAR not found') return False - if self.shipping_auth_mode() == self.SHIPPING_AUTH_SERVICE_ACCOUNT: - if not self.get_ingress_url(): - logger.error('METRICS_UTILITY_CRC_INGRESS_URL is not set') - return False + if not os.path.exists(self.tar_path): + logger.error(f'Insights for Ansible Automation Platform TAR {self.tar_path} not found') + return False + + if 'Error:' in str(self.tar_path): + return False + + if not self._get_ingress_url(): + logger.error('METRICS_UTILITY_CRC_INGRESS_URL is not set') + return False - if not self.get_sso_url(): + if self._shipping_auth() == 'service_account': + if not self._get_sso_url(): logger.error('METRICS_UTILITY_CRC_SSO_URL is not set') return 
False + if self._shipping_auth() in ('service_account', 'user_pass'): if not self._get_rh_user(): logger.error('METRICS_UTILITY_SERVICE_ACCOUNT_ID is not set') return False @@ -69,52 +73,134 @@ def is_shipping_configured(self): if not self._get_rh_password(): logger.error('METRICS_UTILITY_SERVICE_ACCOUNT_SECRET is not set') return False - return True - def _send_data(self, url, files, session): - # TODO: move to base - if self.shipping_auth_mode() == self.SHIPPING_AUTH_SERVICE_ACCOUNT: - sso_url = self.get_sso_url() - headers = {'Content-Type': 'application/x-www-form-urlencoded'} - - data = {'client_id': self._get_rh_user(), 'client_secret': self._get_rh_password(), 'grant_type': 'client_credentials'} - - r = requests.post(sso_url, headers=headers, data=data, verify=self.CERT_PATH, timeout=(31, 31)) - access_token = json.loads(r.content)['access_token'] - - ################################# - ## Query crc with bearer token - headers = session.headers - headers['authorization'] = f'Bearer {access_token}' - - proxies = {} - if self.get_proxy_url(): - proxies = {'https': self.get_proxy_url()} - - response = session.post( - url, - files=files, - verify=self.CERT_PATH, - proxies=proxies, - headers=headers, - timeout=(31, 31), - ) - - elif self.shipping_auth_mode() == self.SHIPPING_AUTH_USERPASS: - response = session.post( - url, - files=files, - verify=self.CERT_PATH, - auth=(self._get_rh_user(), self._get_rh_password()), - headers=session.headers, - timeout=(31, 31), - ) + # _get_proxy_url is optional - else: - response = session.post(url, files=files, headers=session.headers, timeout=(31, 31)) + return True - # Accept 2XX status_codes - if response.status_code >= 300: - raise FailedToUploadPayload(f'Upload failed with status {response.status_code}, {response.text}') + def ship(self): + """ + Ship gathered metrics to the Insights API + """ + if not self.is_shipping_configured(): + self.shipping_successful = False + return False - return True + logger.debug(f'shipping analytics file: {self.tar_path}') + + with open(self.tar_path, 'rb') as f: + files = { + 'file': ( + os.path.basename(self.tar_path), + f, + self.PAYLOAD_CONTENT_TYPE, + ) + } + + response = self._request(files) + + # Accept 2XX status_codes + if response.status_code >= 300: + raise FailedToUploadPayload(f'Upload failed with status {response.status_code}, {response.text}') + + self.shipping_successful = True + return True + + def _session(self): + session = requests.Session() + session.headers = self._get_http_request_headers() + session.headers.pop('Content-Type') + + session.verify = self.CERT_PATH + session.timeout = (31, 31) + + return session + + def _bearer(self): + response = requests.post( + self._get_sso_url(), + data={'client_id': self._get_rh_user(), 'client_secret': self._get_rh_password(), 'grant_type': 'client_credentials'}, + headers={'Content-Type': 'application/x-www-form-urlencoded'}, + timeout=(31, 31), + verify=self.CERT_PATH, + ) + + return json.loads(response.content)['access_token'] + + def _proxies(self): + if not self._get_proxy_url(): + return {} + + return {'https': self._get_proxy_url()} + + def _identity(self, url, files): + session = self._session() + + # FIXME: make parametrizable, if used + identity = { + 'identity': { + 'type': 'User', + 'account_number': '0000001', + 'user': {'is_org_admin': True}, + 'internal': {'org_id': '000001'}, + } + } + session.headers['x-rh-identity'] = base64.b64encode(json.dumps(identity).encode('utf8')) + + return session.post( + url, + files=files, + 
proxies=self._proxies(), + ) + + def _mutual_tls(self, url, files): + session = self._session() + + # a single file (containing the private key and the certificate) + # or a tuple of both file paths (cert_file, keyfile) + session.cert = ( + '/etc/pki/consumer/cert.pem', + '/etc/pki/consumer/key.pem', + ) + + return session.post( + url, + files=files, + proxies=self._proxies(), + ) + + def _service_account(self, url, files): + session = self._session() + + access_token = self._bearer() + session.headers['authorization'] = f'Bearer {access_token}' + + return session.post( + url, + files=files, + proxies=self._proxies(), + ) + + def _user_pass(self, url, files): + session = self._session() + session.auth = (self._get_rh_user(), self._get_rh_password()) + + return session.post( + url, + files=files, + proxies=self._proxies(), + ) + + def _request(self, files): + url = self._get_ingress_url() + mode = self._shipping_auth() + if mode == 'identity': + return self._identity(url, files) + elif mode == 'mutual_tls': + return self._mutual_tls(url, files) + elif mode == 'service_account': + return self._service_account(url, files) + elif mode == 'user_pass': + return self._user_pass(url, files) + else: + raise MetricsException(f'Invalid METRICS_UTILITY_SHIP_AUTH {mode}: identity | mutual_tls | service_account (default) | user_pass') diff --git a/metrics_utility/automation_controller_billing/package/package_directory.py b/metrics_utility/automation_controller_billing/package/package_directory.py index 68cd3f5c..5fcfb0e6 100644 --- a/metrics_utility/automation_controller_billing/package/package_directory.py +++ b/metrics_utility/automation_controller_billing/package/package_directory.py @@ -10,12 +10,7 @@ class PackageDirectory(base.Package): def _batch_since_and_until(self): - # TODO: how to verify this is the daily batch of job_host_summary? - # self.collection_keys is: ['job_host_summary', 'manifest'] - # So we can take this and acess cherrypicked collections, if we need to - # collect more collections in the future - # But the main collection should always be first. - + # FIXME: get from config return self.collections[0].since, self.collections[0].until def _tarname_base(self): diff --git a/metrics_utility/automation_controller_billing/package/package_s3.py b/metrics_utility/automation_controller_billing/package/package_s3.py index 384a25d0..9a2485dc 100644 --- a/metrics_utility/automation_controller_billing/package/package_s3.py +++ b/metrics_utility/automation_controller_billing/package/package_s3.py @@ -10,12 +10,7 @@ class PackageS3(base.Package): def _batch_since_and_until(self): - # TODO: how to verify this is the daily batch of job_host_summary? - # self.collection_keys is: ['job_host_summary', 'manifest'] - # So we can take this and acess cherrypicked collections, if we need to - # collect more collections in the future - # But the main collection should always be first. - + # FIXME: get from config return self.collections[0].since, self.collections[0].until def _tarname_base(self): @@ -47,7 +42,7 @@ def _destination_path(self, base_path, timestamp, filename): def ship(self): """ - Ship gathered metrics to the Directory + Ship gathered metrics to S3 """ if not self.is_shipping_configured(): self.shipping_successful = False diff --git a/metrics_utility/base/README.md b/metrics_utility/base/README.md index a25be0d4..b5de4b3b 100644 --- a/metrics_utility/base/README.md +++ b/metrics_utility/base/README.md @@ -24,10 +24,6 @@ Entrypoint with "gather()" method.
Collector is an Abstract class, implement abstract methods. - `_package_class`: Returns class of your implementation of Package -- `_is_valid_license`: Check for valid license specific to your service -- `_is_shipping_configured`: Check if shipping to cloud is configured -- `_last_gathering`: returns datetime. Loading last successful run from some persistent storage -- `_save_last_gather`: Persisting last successful run - `_load_last_gathered_entries`: Has to fill dictionary `self.last_gathered_entries`. Load from persistent storage Dict contains keys equal to collector's registered functions' keys (with @register decorator) - `_save_last_gathered_entries`: Persisting `self.last_gathered_entries` @@ -72,14 +68,8 @@ See the [test_gathering.py](tests/functional/test_gathering.py) for details Package is also abstract class. You have to implement basically info for POST request to cloud. -- `PAYLOAD_CONTENT_TYPE`: contains registered content type for cloud's ingress service - `MAX_DATA_SIZE`: maximum size in bytes of **uncompressed** data for one tarball. Ingress limits uploads to 100MB. Defaults to 200MB. -- `get_ingress_url`: Cloud's ingress service URL -- `_get_rh_user`: User for POST request -- `_get_rh_password`: Password for POST request -- `_get_x_rh_identity`: X-RH Identity Used for local testing instead of user and password -- `_get_http_request_headers`: Dict with any custom headers for POST request An example can be found in [Test package](tests/classes/package.py) diff --git a/metrics_utility/base/collection.py b/metrics_utility/base/collection.py index 4cd29744..051c22d9 100644 --- a/metrics_utility/base/collection.py +++ b/metrics_utility/base/collection.py @@ -82,20 +82,28 @@ def slices(self): since = self.collector.gather_since until = self.collector.gather_until last_gather = self.collector.last_gather + # These slicer functions may return a generator. The `since` parameter is # allowed to be None, and will fall back to LAST_ENTRIES[key] or to # LAST_GATHER (truncated appropriately to match the 28-day limit). # # Or it can force full table sync if interval is given + if self.fnc_slicing and self.full_sync_enabled: + return self.fnc_slicing(self.key, last_gather, full_sync_enabled=True) + if self.fnc_slicing: - if self.full_sync_enabled: - slices = self.fnc_slicing(self.key, last_gather, full_sync_enabled=True) - else: - slices = self.fnc_slicing(self.key, last_gather, since=since, until=until) - else: - slices = [(self._gather_since(), self._gather_until())] + # TODO: accepts since=None? or just never used that way? + return self.fnc_slicing(self.key, last_gather, since=since, until=until) + + # Start/end of gathering based on settings excluding slices + # FIXME: merge with collector _calculate_collection_interval? + last_entry = max( + self.last_gathered_entry or self.collector.last_gather, + self.collector.gather_until - timedelta(days=get_max_gather_period_days()), + ) - return slices + ## TODO: gather_since OR last_entry is the answer to Final since-until .. 
except both ^returns too + return [(self.collector.gather_since or last_entry, self.collector.gather_until)] def ship_immediately(self): """ @@ -142,19 +150,6 @@ def _is_full_sync_enabled(self, interval_days): last_full_sync = self.collector.last_gathered_entry_for(f'{self.key}_full') return not last_full_sync or last_full_sync < now() - timedelta(days=interval_days) - def _gather_since(self): - """Start of gathering based on settings excluding slices""" - - last_entry = max( - self.last_gathered_entry or self.collector.last_gather, - self.collector.gather_until - timedelta(days=get_max_gather_period_days()), - ) - return self.collector.gather_since or last_entry - - def _gather_until(self): - """End of gathering based on settings excluding slices""" - return self.collector.gather_until - @abstractmethod def _save_gathering(self, data): pass diff --git a/metrics_utility/base/collection_manifest.py b/metrics_utility/base/collection_manifest.py index 6217a78d..f59afedf 100644 --- a/metrics_utility/base/collection_manifest.py +++ b/metrics_utility/base/collection_manifest.py @@ -5,10 +5,7 @@ class CollectionManifest(CollectionJSON): def __init__(self, collector): super().__init__(collector, self.collecting) - self.data = {} - self.since = collector.gather_since - self.until = collector.gather_until @register('manifest', '1.0', format='json', description='Manifest file') def collecting(self, **kwargs): diff --git a/metrics_utility/base/collector.py b/metrics_utility/base/collector.py index 5624946c..93c8ea57 100644 --- a/metrics_utility/base/collector.py +++ b/metrics_utility/base/collector.py @@ -1,14 +1,15 @@ import contextlib -import hashlib import inspect -import logging +import json import os import pathlib import shutil import tempfile -from abc import abstractmethod - +from awx.main.utils import datetime_hook +from django.conf import settings +from django.core.serializers.json import DjangoJSONEncoder +from django.db import connection from django.utils.timezone import now, timedelta from metrics_utility.logger import logger @@ -16,57 +17,79 @@ from .collection import Collection from .collection_csv import CollectionCSV from .collection_json import CollectionJSON -from .package import Package from .utils import get_max_gather_period_days, get_optional_collectors +# work around https://github.com/ansible/awx/pull/15676 +try: + # 2.4, early 2.5 + from awx.main.utils.pglock import advisory_lock +except ImportError: + # later 2.5, 2.6 + from ansible_base.lib.utils.db import advisory_lock + + +def _last_gathered_entries(): + # We are reusing Settings used by Analytics, so we don't have to backport changes into analytics + # We can safely do this, by making sure we use the same lock as Analytics, before we persist + # these settings. + from awx.conf.models import Setting + + last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first() + last_gathered_entries = json.loads((last_entries.value if last_entries is not None else '') or '{}', object_hook=datetime_hook) + + return last_gathered_entries + + class Collector: - """Abstract class. The Collector is an entry-point for gathering data + """The Collector is an entry-point for gathering data from awx to cloud. 
- Abstract and following methods has to be implemented: - - _package_class() - reference to your implementation of Package There are several params: - collection_type: - - manual/scheduled - data are gathered and shipped, local timestamps about gathering are updated + - manual - data are gathered and shipped, local timestamps about gathering are updated - dry-run - data are gathered, but not shipped, tarballs from /tmp not deleted (testing mode) - collector_module: module with functions with decorator `@register` - they define what data are collected - collector functions are wrapped by kind of Collection object - Collections are grouped by Package, and Packages are creating tarballs and shipping them. - Collector is an abstract class, example of implementation is in tests/classes - Data are gathered maximally 28 days ago and can be set to less (see gather(since, until,..)) """ MANUAL_COLLECTION = 'manual' DRY_RUN = 'dry-run' - SCHEDULED_COLLECTION = 'scheduled' - def __init__(self, collection_type=DRY_RUN, collector_module=None, licensed=True): - self.licensed = licensed + def __init__(self, collection_type, collector_module=None, ship_target=None, billing_provider_params=None): + if collector_module is None: + from metrics_utility.automation_controller_billing import collectors + + collector_module = collectors + self.collector_module = collector_module self.collection_type = collection_type self.collections = {} self.packages = {} - self.last_gathered_entries = None - self.log_level = logging.ERROR if self.collection_type != self.SCHEDULED_COLLECTION else logging.DEBUG + self.ship_target = ship_target + self.billing_provider_params = billing_provider_params self.tmp_dir = None self.gather_dir = None self.gather_since = None self.gather_until = None self.last_gather = None + self.last_gathered_entries = None - # - # Class methods ----------------------------- - # @classmethod - def registered_collectors(cls, module): + def registered_collectors(cls, module=None): """ Returns all functions in 'module' defined with "@register" decorator """ + if module is None: + from metrics_utility.automation_controller_billing import collectors + + module = collectors + return { func.__insights_analytics_key__: { 'name': func.__insights_analytics_key__, @@ -77,9 +100,6 @@ def registered_collectors(cls, module): if inspect.isfunction(func) and hasattr(func, '__insights_analytics_key__') } - # - # Public methods ---------------------------- - # def config_present(self): """ Checks if collector_module contains 'config' method (required) @@ -89,7 +109,6 @@ def config_present(self): return self.collections.get('config') is not None @staticmethod - @abstractmethod def db_connection(): """ DB connection for advisory lock. Can be @@ -97,9 +116,9 @@ def db_connection(): - sqlalchemy.engine.base.Engine.raw_connection() - etc. 
""" - pass + return connection - def gather(self, dest=None, subset=None, since=None, until=None): + def gather(self, dest=None, subset=None, since=None, until=None, billing_provider_params=None): """Entry point for gathering :param dest: (default: /tmp/awx-analytics-*) - directory for temp files @@ -108,12 +127,15 @@ def gather(self, dest=None, subset=None, since=None, until=None): :param until: (datetime) - high threshold of data changes (defaults to now) :return: None or list of paths to tarballs (.tar.gz) """ - if not self.is_enabled(): - return None - with self._pg_advisory_lock('gather_analytics_lock', wait=False) as acquired: + key = 'gather_automation_controller_billing_lock' + suffix = os.getenv('METRICS_UTILITY_COLLECTOR_LOCK_SUFFIX') + if suffix: + key = f'gather_automation_controller_billing_{suffix}_lock' + + with self._pg_advisory_lock(key, wait=False) as acquired: if not acquired: - logger.log(self.log_level, 'Not gathering analytics, another task holds lock') + logger.error('Not gathering Automation Controller billing data, another task holds lock') return None self._gather_initialize(dest, subset, since, until) @@ -136,21 +158,6 @@ def gather(self, dest=None, subset=None, since=None, until=None): def is_dry_run(self): return self.collection_type == self.DRY_RUN - def is_enabled(self): - """Checks for license and shipping data (like credentials)""" - if not self._is_valid_license() and self.licensed: - logger.log(self.log_level, 'Invalid License provided, or No License Provided') - return False - - if self.is_shipping_enabled(): - return self._is_shipping_configured() - - return True - - def is_shipping_enabled(self): - """Shipping is enabled in manual/scheduled mode""" - return not self.is_dry_run() - def last_gathered_entry_for(self, key): return self.last_gathered_entries.get(key) @@ -165,14 +172,14 @@ def delete_tarballs(self): for path in self.all_tar_paths(): os.remove(path) - # - # Private methods --------------------------- - # def _calculate_collection_interval(self, since, until): _now = now() + _max = get_max_gather_period_days() + _timedelta = timedelta(days=_max) + original_since = since original_until = until - logger.warning(f'Original since-until: {original_since} to {original_until}') + logger.info(f'Original since-until: {original_since} to {original_until}') # Make sure that the endpoints are not in the future. if until is not None and until > _now: @@ -187,13 +194,11 @@ def _calculate_collection_interval(self, since, until): # `since` parameter. if since is not None: if until is not None: - if until > since + timedelta(days=get_max_gather_period_days()): - until = since + timedelta(days=get_max_gather_period_days()) - logger.warning( - f'End of the collection interval is greater than {get_max_gather_period_days()} days from start, setting end to {until}.' - ) + if until > since + _timedelta: + until = since + _timedelta + logger.warning(f'End of the collection interval is greater than {_max} days from start, setting end to {until}.') else: # until is None - until = min(since + timedelta(days=get_max_gather_period_days()), _now) + until = min(since + _timedelta, _now) elif until is None: until = _now @@ -206,17 +211,16 @@ def _calculate_collection_interval(self, since, until): # `until`, but we want to keep `since` empty if it wasn't passed in because we use that # case to know whether to use the bookkeeping settings variables to decide the start of # the interval. 
- horizon = until - timedelta(days=get_max_gather_period_days()) + # FIXME: but we also want to log the real since... + horizon = until - _timedelta if since is not None and since < horizon: since = horizon - logger.warning( - f'Start of the collection interval is more than {get_max_gather_period_days()} days prior to {until}, setting to {horizon}.' - ) + logger.warning(f'Start of the collection interval is more than {_max} days prior to {until}, setting to {horizon}.') - last_gather = self._last_gathering() or horizon + last_gather = horizon if last_gather < horizon: last_gather = horizon - logger.warning(f'Last analytics run was more than {get_max_gather_period_days()} days prior to {until}, using {horizon} instead.') + logger.warning(f'Last analytics run was more than {_max} days prior to {until}, using {horizon} instead.') self.gather_since = since self.gather_until = until @@ -248,8 +252,6 @@ def _find_available_package(self, group, key, requested_size=None): return available_package def _gather_initialize(self, tmp_root_dir, collectors_subset, since, until): - logger.debug(f'Last analytics run was: {self._last_gathering()}') - self._init_tmp_dir(tmp_root_dir) self.last_gathered_entries = self._load_last_gathered_entries() @@ -265,11 +267,18 @@ def _gather_config(self): TODO: add "always" flag to @register decorator """ if not self.config_present(): - logger.log(self.log_level, "'config' collector data is missing") + logger.error("'config' collector data is missing") return False - else: - self.collections['config'].gather(self._package_class().max_data_size()) - return True + + self.collections['config'].gather(self._package_class().max_data_size()) + + # Extend the config collection to contain billing specific info: + config_collection = self.collections['config'] + data = json.loads(config_collection.data) + data['billing_provider_params'] = self.billing_provider_params + config_collection._save_gathering(data) + + return True def _gather_json_collections(self): """JSON collections are simpler, they're just gathered and added to the Package""" @@ -334,27 +343,9 @@ def _add_collection_to_package(self, collection): @contextlib.contextmanager def _pg_advisory_lock(self, key, wait=False): - """Postgres db lock""" - connection = self.db_connection() - - if connection is None: - yield True - else: - # Build 64-bit integer out of the resource id - resource_key = int(hashlib.sha512(key.encode()).hexdigest(), 16) % 2**63 - - cursor = connection.cursor() - - try: - if wait: - cursor.execute('SELECT pg_advisory_lock(%s);', (resource_key,)) - else: - cursor.execute('SELECT pg_try_advisory_lock(%s);', (resource_key,)) - acquired = cursor.fetchall()[0][0] - yield acquired - finally: - cursor.execute('SELECT pg_advisory_unlock(%s);', (resource_key,)) - cursor.close() + """Use awx specific implementation to pass tests with sqlite3""" + with advisory_lock(key, wait=wait) as lock: + yield lock def _process_packages(self): for group, packages in self.packages.items(): @@ -372,18 +363,30 @@ def _process_package(self, package): """ if not package.processed: package.make_tgz() - if self.is_shipping_enabled(): + if not self.is_dry_run(): package.ship() package.delete_collected_files() package.processed = True def _gather_finalize(self): - """Persisting timestamps (manual/schedule mode only)""" - if self.is_shipping_enabled(): + """Persisting timestamps (manual mode only)""" + if self.is_dry_run(): + return + + # FIXME via billing_provider_params + disabled = 
os.getenv('METRICS_UTILITY_DISABLE_SAVE_LAST_GATHERED_ENTRIES', 'false').lower() == 'true' + if disabled: + return + + # We need to wait on analytics lock, to update the last collected timestamp settings + # so we don't clash with analytics job collection. + with self._pg_advisory_lock('gather_analytics_lock', wait=True): + # We need to load fresh settings again as we're obtaining the lock, since + # Analytics job could have changed this in the background and we'd be resetting + # the Analytics values here. + self._load_last_gathered_entries() self._update_last_gathered_entries() - self._save_last_gather() - def _gather_cleanup(self): """Deleting temp files""" shutil.rmtree(self.tmp_dir, ignore_errors=True) # clean up individual artifact files @@ -395,36 +398,12 @@ def _init_tmp_dir(self, tmp_root_dir=None): self.gather_dir = self.tmp_dir.joinpath('stage') self.gather_dir.mkdir(mode=0o700) - @abstractmethod - def _is_shipping_configured(self): - """Custom check for shipping availability should contain: - 1) Is Insights for Ansible Automation Platform enabled? - 2) Is URL and credentials present? - :return: bool - """ - pass - - @abstractmethod - def _is_valid_license(self): - """License check - :return: bool - """ - pass - - @abstractmethod - def _last_gathering(self): - """Returns timestamp of last successful gathering - Complement to _save_last_gathering() - """ - pass - - @abstractmethod def _load_last_gathered_entries(self): """Loads persisted timestamps named by keys from collector_module Complement to the _save_last_gathered_entries() :return dict """ - pass + return _last_gathered_entries() def _update_last_gathered_entries(self): last_gathered_updates = {'keys': {}, 'locked': set()} @@ -442,20 +421,12 @@ def _update_last_gathered_entries(self): self._save_last_gathered_entries(self.last_gathered_entries) - @abstractmethod def _save_last_gathered_entries(self, last_gathered_entries): """Saves dictionary with timestamps to persistent storage Complement to the _load_last_gathered_entries() :param last_gathered_entries: dict """ - pass - - @abstractmethod - def _save_last_gather(self): - """Persists timestamp of last successful gathering - Complement to _last_gathering() - """ - pass + settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = json.dumps(last_gathered_entries, cls=DjangoJSONEncoder) def _create_collections(self, subset=None): """Creates Collections from decorated functions (by @register) from self.collector_module @@ -504,12 +475,12 @@ def _create_collection(self, fnc_collecting): def _create_package(self): package_class = self._package_class() - return package_class(self) + return package_class(collector=self) - @staticmethod - def _package_class(): - """Has to be redefined by your Package implementation""" - return Package + def _package_class(self): + from metrics_utility.automation_controller_billing.package.factory import Factory as PackageFactory + + return PackageFactory(ship_target=self.ship_target).create() def _reset_collections_and_packages(self): self.collections = { diff --git a/metrics_utility/base/csv_file_splitter.py b/metrics_utility/base/csv_file_splitter.py index b947e8c4..b44021fa 100644 --- a/metrics_utility/base/csv_file_splitter.py +++ b/metrics_utility/base/csv_file_splitter.py @@ -55,6 +55,7 @@ def write(self, s): """Writes to file and creates new one if file exceedes threshold""" if not self.header: self.header = s[: s.index('\n')] + # this can go over the limit before cycling, but ensures we write at least once, and this is before gzip anyway self.counter +=
self.currentfile.write(s) if self.counter >= self.max_file_size: self.cycle_file() diff --git a/metrics_utility/base/package.py b/metrics_utility/base/package.py index d82e2a0f..f831461b 100644 --- a/metrics_utility/base/package.py +++ b/metrics_utility/base/package.py @@ -1,13 +1,6 @@ -import base64 -import json -import os import pathlib import tarfile -from abc import abstractmethod - -import requests - from metrics_utility.logger import logger from .collection_data_status import CollectionDataStatus @@ -16,33 +9,14 @@ class Package: """ - Abstract class - Package serves for managing one tarball and shipping it to the cloud. - Abstract methods has to be implemented, as well as: - - CERT_PATH - path to auth certificate (for POST request to cloud), if not development mode - - PAYLOAD_CONTENT_TYPE - registered in ingress-service in cloud - - MAX_DATA_SIZE - defaults to 200MB (upload limit is 100MB, so it expects 50% compression rate) - + Package serves for managing one tarball and shipping it somewhere See the README.md and tests/functional/test_gathering.py to see how are packages used - """ - - CERT_PATH = '/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem' - # i.e. "application/vnd.redhat.tower.tower_payload+tgz" - PAYLOAD_CONTENT_TYPE = 'application/vnd.redhat.TODO+tgz' - - SHIPPING_AUTH_USERPASS = 'user-pass' - SHIPPING_AUTH_S3_USERPASS = 'user-pass-s3' - SHIPPING_AUTH_IDENTITY = 'x-rh-identity' # Development mode only - SHIPPING_AUTH_CERTIFICATES = 'mutual-tls' # Mutual TLS - - DEFAULT_RHSM_CERT_FILE = '/etc/pki/consumer/cert.pem' - DEFAULT_RHSM_KEY_FILE = '/etc/pki/consumer/key.pem' - """ Some tables can be *very* large, and we have a 100MB upload limit. - + Default to 200MB, expecting 50% compression rate Split large table dumps at dump time into a series of files. 
""" + MAX_DATA_SIZE = 200 * 1048576 def __init__(self, collector): @@ -72,67 +46,9 @@ def delete_collected_files(self): for collection in self.collections: collection.cleanup() - @abstractmethod - def get_ingress_url(self): - """URL of cloud's upload URL""" - pass - - @abstractmethod - def get_s3_configured(self): - """URL of cloud's upload URL""" - pass - def has_free_space(self, requested_size): return self.total_data_size + requested_size <= self.max_data_size() - def is_shipping_configured(self): - if not self.tar_path: - logger.error('Insights for Ansible Automation Platform TAR not found') - return False - - if not os.path.exists(self.tar_path): - logger.error(f'Insights for Ansible Automation Platform TAR {self.tar_path} not found') - return False - - if 'Error:' in str(self.tar_path): - return False - - if self.shipping_auth_mode() == self.SHIPPING_AUTH_USERPASS: - if not self.get_ingress_url(): - logger.error('AUTOMATION_ANALYTICS_URL is not set') - return False - - if not self._get_rh_user(): - logger.error('REDHAT_USERNAME is not set') - return False - - if not self._get_rh_password(): - logger.error('REDHAT_PASSWORD is not set') - return False - - if self.shipping_auth_mode() == self.SHIPPING_AUTH_S3_USERPASS: - if not self.get_s3_configured(): - logger.error('S3 configuration is not set') - return False - - if not self._get_rh_user(): - logger.error('aws_access_key_id is not set') - return False - - if not self._get_rh_password(): - logger.error('aws_secret_access_key is not set') - return False - - if not self._get_rh_region(): - logger.error('aws_region is not set') - return False - - if not self._get_rh_bucket(): - logger.error('aws_bucket is not set') - return False - - return True - def make_tgz(self): target = self.collector.tmp_dir.parent try: @@ -157,45 +73,8 @@ def make_tgz(self): logger.exception(f'Failed to write analytics archive file: {e}') return False - def ship(self): - """ - Ship gathered metrics to the Insights API - """ - if not self.is_shipping_configured(): - self.shipping_successful = False - return False - - logger.debug(f'shipping analytics file: {self.tar_path}') - - with open(self.tar_path, 'rb') as f: - files = { - 'file': ( - os.path.basename(self.tar_path), - f, - self._payload_content_type(), - ) - } - s = requests.Session() - if self.shipping_auth_mode() == self.SHIPPING_AUTH_CERTIFICATES: - # as a single file (containing the private key and the certificate) or - # as a tuple of both files paths (cert_file, keyfile) - s.cert = self._get_client_certificates() - - s.headers = self._get_http_request_headers() - s.headers.pop('Content-Type') - - if self.shipping_auth_mode() == self.SHIPPING_AUTH_IDENTITY: - s.headers['x-rh-identity'] = self._get_x_rh_identity() - - url = self.get_ingress_url() - self.shipping_successful = self._send_data(url, files, s) - - return self.shipping_successful - - def shipping_auth_mode(self): - return self.SHIPPING_AUTH_USERPASS - def update_last_gathered_entries(self, updates_dict): + # set by subclass.ship() if self.shipping_successful: for collection in self.collections: collection.update_last_gathered_entries(updates_dict) @@ -213,26 +92,6 @@ def _collection_to_tar(self, tar, collection): logger.exception(f'Could not generate metric {collection.filename}: {e}') return None - def _send_data(self, url, files, session): - if self.shipping_auth_mode() == self.SHIPPING_AUTH_USERPASS: - response = session.post( - url, - files=files, - verify=self.CERT_PATH, - auth=(self._get_rh_user(), self._get_rh_password()), - 
headers=session.headers, - timeout=(31, 31), - ) - else: - response = session.post(url, files=files, headers=session.headers, timeout=(31, 31)) - - # Accept 2XX status_codes - if response.status_code >= 300: - logger.error('Upload failed with status {}, {}'.format(response.status_code, response.text)) - return False - - return True - def _config_to_tar(self, tar): if self.collector.collections['config'] is None: logger.error("'config' collector data is missing, and is required to ship.") @@ -242,58 +101,6 @@ def _config_to_tar(self, tar): return True - @abstractmethod - def _get_http_request_headers(self): - """Optional HTTP headers for POST request to get_ingress_url() URL - :return: dict() - """ - pass - - @abstractmethod - def _get_rh_user(self): - """Auth: username for HTTP POST request to cloud. - shipping_auth_mode() must return SHIPPING_AUTH_USERPASS (default) - """ - pass - - @abstractmethod - def _get_rh_password(self): - """Auth: password for HTTP POST request to cloud. - shipping_auth_mode() must return SHIPPING_AUTH_USERPASS (default) - """ - pass - - @abstractmethod - def _get_rh_region(self): - """s3: The region that boto3 will connect to""" - pass - - @abstractmethod - def _get_rh_bucket(self): - """s3: The bucket that boto3 will use""" - pass - - def _get_client_certificates(self): - """Auth: get client certificate and key, by default we use the RHSM certs - :return: string or tuple of 2 strings - """ - return (self.DEFAULT_RHSM_CERT_FILE, self.DEFAULT_RHSM_KEY_FILE) - - def _get_x_rh_identity(self): - """Auth: x-rh-identity header for HTTP POST request to cloud - Optional, if shipping_auth_mode() redefined to SHIPPING_AUTH_IDENTITY - """ - identity = { - 'identity': { - 'type': 'User', - 'account_number': '0000001', - 'user': {'is_org_admin': True}, - 'internal': {'org_id': '000001'}, - } - } - identity = base64.b64encode(json.dumps(identity).encode('utf8')) - return identity - def _data_collection_status_to_tar(self, tar): try: self.data_collection_status.gather(None) @@ -309,10 +116,3 @@ def _manifest_to_tar(self, tar): self.add_collection(self.manifest) except Exception as e: logger.exception(f'Could not generate {self.manifest.filename}: {e}') - - def _payload_content_type(self): - return self.PAYLOAD_CONTENT_TYPE - - def _tarname_base(self): - timestamp = self.collector.gather_until - return f'analytics-{timestamp.strftime("%Y-%m-%d-%H%M%S%z")}' diff --git a/metrics_utility/management/commands/gather_automation_controller_billing_data.py b/metrics_utility/management/commands/gather_automation_controller_billing_data.py index 0cf6632b..c169236f 100644 --- a/metrics_utility/management/commands/gather_automation_controller_billing_data.py +++ b/metrics_utility/management/commands/gather_automation_controller_billing_data.py @@ -4,7 +4,7 @@ from django.core.management.base import BaseCommand -from metrics_utility.automation_controller_billing.collector import Collector +from metrics_utility.base.collector import Collector from metrics_utility.exceptions import ( BadShipTarget, NoAnalyticsCollected, diff --git a/metrics_utility/test/base/classes/analytics_collector.py b/metrics_utility/test/base/classes/analytics_collector.py index bacf4b33..31ffb35a 100644 --- a/metrics_utility/test/base/classes/analytics_collector.py +++ b/metrics_utility/test/base/classes/analytics_collector.py @@ -12,20 +12,8 @@ def db_connection(): def _package_class(): return Package - def _is_shipping_configured(self): - return False - - def _is_valid_license(self): - return True - - def 
_last_gathering(self): - return None - def _load_last_gathered_entries(self): return {} def _save_last_gathered_entries(self, last_gathered_entries): return None - - def _save_last_gather(self): - return None diff --git a/metrics_utility/test/base/classes/package.py b/metrics_utility/test/base/classes/package.py index b08f0808..1db3f743 100644 --- a/metrics_utility/test/base/classes/package.py +++ b/metrics_utility/test/base/classes/package.py @@ -2,24 +2,8 @@ class Package(InsightsAnalyticsPackage): - PAYLOAD_CONTENT_TYPE = 'application/vnd.redhat.test.test_payload+tgz' - MAX_DATA_SIZE = 1000 + MAX_DATA_SIZE = 1000 # bytes def _tarname_base(self): timestamp = self.collector.gather_until return f'test-{timestamp.strftime("%Y-%m-%d-%H%M%S%z")}' - - def get_ingress_url(self): - return None - - def _get_rh_user(self): - return '' - - def _get_rh_password(self): - return '' - - def _get_x_rh_identity(self): - return '' - - def _get_http_request_headers(self): - return {} diff --git a/metrics_utility/test/base/functional/test_gathering.py b/metrics_utility/test/base/functional/test_gathering.py index a7283e5b..bc16e578 100644 --- a/metrics_utility/test/base/functional/test_gathering.py +++ b/metrics_utility/test/base/functional/test_gathering.py @@ -1,5 +1,4 @@ import json -import logging import tarfile from unittest.mock import patch @@ -20,7 +19,6 @@ def collector(mocker): collector_module=base.functional.collector_module, collection_type=AnalyticsCollector.DRY_RUN, ) - mocker.patch.object(collector, '_is_valid_license', return_value=True) return collector @@ -30,7 +28,7 @@ def test_missing_config(mocker, collector): tgz_files = collector.gather(subset=['json_collection_1', 'json_collection_2']) assert tgz_files is None - mock_logger.log.assert_called_with(logging.ERROR, "'config' collector data is missing") + mock_logger.error.assert_called_with("'config' collector data is missing") def test_json_collections(collector): @@ -47,7 +45,10 @@ def test_json_collections(collector): assert './json_collection_1.json' in files.keys() assert './json_collection_2.json' in files.keys() - assert json.loads(files['./config.json'].read()) == {'version': '1.0'} + assert json.loads(files['./config.json'].read()) == { + 'version': '1.0', + 'billing_provider_params': None, + } assert json.loads(files['./json_collection_1.json'].read()) == {'json1': 'True'} assert json.loads(files['./json_collection_2.json'].read()) == {'json2': 'True'} diff --git a/metrics_utility/test/base/functional/test_slicing.py b/metrics_utility/test/base/functional/test_slicing.py index 6b00e83b..f37e9e6f 100644 --- a/metrics_utility/test/base/functional/test_slicing.py +++ b/metrics_utility/test/base/functional/test_slicing.py @@ -21,7 +21,6 @@ def collector(mocker): collector_module=base.functional.collector_module4_slicing, collection_type=AnalyticsCollector.DRY_RUN, ) - mocker.patch.object(collector, '_is_valid_license', return_value=True) return collector diff --git a/metrics_utility/test/gather/test_gather_ranges.py b/metrics_utility/test/gather/test_gather_ranges.py index 0ca5f1a0..8e179a77 100644 --- a/metrics_utility/test/gather/test_gather_ranges.py +++ b/metrics_utility/test/gather/test_gather_ranges.py @@ -3,10 +3,11 @@ import tarfile from datetime import datetime +from unittest.mock import patch import pytest -from metrics_utility.test.util import run_gather_ext +from metrics_utility.test.util import run_gather_ext, run_gather_int env_vars = { @@ -95,3 +96,23 @@ def test_only_host_scope(cleanup_glob): print(tar.getnames()) 
# ensure main_host.csv is present # assert './main_host.csv' in tar.getnames() + + +@pytest.mark.filterwarnings('ignore::ResourceWarning') +def test_no_since(cleanup_glob, caplog): + result = run_gather_ext(env_vars, ['--ship', '--until=2024-01-04']) + validate_exists(file_glob) + + text = result.stderr + '\n' + result.stdout + assert 'Original since-until: None to 2024-01-04 00:00:00+00:00' in text + assert 'Final since-until: 2024-01-01 00:00:00+00:00 to 2024-01-04 00:00:00+00:00' in text + + # mock last gather to 2024-01-04 + with patch('metrics_utility.base.collector._last_gathered_entries') as mock: + mock.return_value = {'job_host_summary': '2024-01-04T00:00:00Z'} + run_gather_int(env_vars, {'ship': True, 'until': '2024-01-05'}) + + validate_exists(file_glob) + + assert 'Original since-until: None to 2024-01-05 00:00:00+00:00' in caplog.messages + assert 'Final since-until: 2024-01-04 00:00:00+00:00 to 2024-01-05 00:00:00+00:00' in caplog.messages diff --git a/metrics_utility/test/gather/test_jobhostsummary_gather.py b/metrics_utility/test/gather/test_jobhostsummary_gather.py index 920576e5..09e15213 100644 --- a/metrics_utility/test/gather/test_jobhostsummary_gather.py +++ b/metrics_utility/test/gather/test_jobhostsummary_gather.py @@ -366,14 +366,10 @@ def mock_collection_gather(self, path): ) # Check collection statuses - print('\nCollection statuses:') expected_collections = {'job_host_summary.csv', 'main_jobevent.csv', 'main_host.csv'} errors_found = [] for collection_name, status in collection_statuses.items(): - status_str = 'ok' if status else 'failed' - print(f' {collection_name}: {status_str}') - if not status: errors_found.append(f"Collection '{collection_name}' failed")
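
For reference, a minimal sketch of how the consolidated `metrics_utility.base.collector.Collector` (now imported directly by the management command) can be driven, following the `gather()` docstring in this patch. It assumes a configured AWX/Django environment; the `ship_target` value is illustrative, not taken from the diff:

```python
from datetime import datetime, timezone

from metrics_utility.base.collector import Collector

# DRY_RUN gathers without shipping and leaves the tarballs in /tmp for inspection;
# MANUAL_COLLECTION also ships and persists AUTOMATION_ANALYTICS_LAST_ENTRIES.
collector = Collector(
    collection_type=Collector.DRY_RUN,
    ship_target='directory',  # hypothetical value; the PackageFactory maps it to a Package class
)

# since=None falls back to the stored last-gathered entries, capped at the 28-day horizon.
tarballs = collector.gather(until=datetime(2024, 1, 4, tzinfo=timezone.utc))
print(tarballs)  # None if another task holds the advisory lock or the 'config' collector is missing
```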
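
A second sketch covers the environment-driven auth selection that replaces the removed `SHIPPING_AUTH_*` constants in `PackageCRC`. The variable names and defaults come from the diff above; the credential values are placeholders:

```python
import os

# Picks the upload path in PackageCRC._request(); 'service_account' is the default.
os.environ['METRICS_UTILITY_SHIP_AUTH'] = 'service_account'  # identity | mutual_tls | service_account | user_pass

# Required by is_shipping_configured() for the service_account and user_pass modes.
os.environ['METRICS_UTILITY_SERVICE_ACCOUNT_ID'] = 'my-client-id'          # placeholder
os.environ['METRICS_UTILITY_SERVICE_ACCOUNT_SECRET'] = 'my-client-secret'  # placeholder

# Optional overrides; _get_sso_url() and _get_ingress_url() default to sso.redhat.com
# and console.redhat.com, and _get_proxy_url() is optional.
# os.environ['METRICS_UTILITY_CRC_SSO_URL'] = '...'
# os.environ['METRICS_UTILITY_CRC_INGRESS_URL'] = '...'
# os.environ['METRICS_UTILITY_PROXY_URL'] = '...'
```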