88 changes: 0 additions & 88 deletions metrics_utility/automation_controller_billing/base/s3_handler.py

This file was deleted.

26 changes: 11 additions & 15 deletions metrics_utility/automation_controller_billing/extract/base.py
@@ -56,7 +56,7 @@ def load_config(self, file_path):
with open(file_path) as f:
return json.loads(f.read())
except FileNotFoundError:
logger.warning(f'{self.LOG_PREFIX} missing required file under path: {file_path} and date: {self.date}')
logger.warning(f'{self.LOG_PREFIX} missing required file under path {file_path}')

def process_tarballs(self, path, temp_dir, enabled_set=None):
_safe_extract(path, temp_dir, enabled_set=enabled_set)
@@ -115,16 +115,6 @@ def csv_enabled(self, name):

return name in self.enabled_set

def get_path_prefix(self, date):
"""Return the data/Y/m/d path"""
ship_path = self.extra_params['ship_path']

year = date.strftime('%Y')
month = date.strftime('%m')
day = date.strftime('%d')

return f'{ship_path}/data/{year}/{month}/{day}'

def sheet_enabled(self, sheets_required):
"""
Checks if any sheets_required item is in METRICS_UTILITY_OPTIONAL_CCSP_REPORT_SHEETS
@@ -143,17 +133,23 @@ def filter_tarball_paths(self, paths, collections):
if collections is None:
return paths

# these are in *every* tarball, thus not a valid thing to filter by
if 'data_collection_status' in collections:
raise MetricsException('data_collection_status is not a valid tarball name filter')

if 'config' in collections:
raise MetricsException('config is not a valid tarball name filter')

def match(s):
# include all files produced by 0.6.0 and lower, and anything with an unexpected name
if re.search(r'-\d+.tar.gz$', s):
# require at least .tar.gz
if not re.search(r'\.tar\.gz$', s):
return False

# include all files produced by 0.6.0 and lower
if re.search(r'-\d+\.tar\.gz$', s):
return True
if re.search(r'-\d+-\w+.tar.gz$', s) is None:

# include anything with an unexpected name
if not re.search(r'-\d+-\w+\.tar\.gz$', s):
return True

# should not happen, but make sure we're not ignoring data if it does
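
Reviewer note, not part of the diff: the new match() rules are easy to exercise in isolation. A minimal sketch of the logic shown above; the final branch of the real function is collapsed in this view, so it is stubbed with None here, and the sample filenames are invented:

```python
import re

def match(s):
    # require at least .tar.gz
    if not re.search(r'\.tar\.gz$', s):
        return False
    # legacy naming produced by 0.6.0 and lower: always kept
    if re.search(r'-\d+\.tar\.gz$', s):
        return True
    # unexpected names are kept rather than silently dropped
    if not re.search(r'-\d+-\w+\.tar\.gz$', s):
        return True
    return None  # placeholder: the real code continues in the collapsed hunk

assert match('billing-20240101-0.tar.gz') is True       # legacy naming
assert match('report.csv') is False                      # not a tarball
assert match('oddly_named.tar.gz') is True               # unexpected name
assert match('billing-20240101-0-jobs.tar.gz') is None   # goes on to the collections check
```
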
@@ -1,35 +1,38 @@
import os
import tempfile

from metrics_utility.automation_controller_billing.extract.base import Base
from metrics_utility.library.storage import StorageDirectory
from metrics_utility.logger import logger


class ExtractorDirectory(Base):
LOG_PREFIX = '[ExtractorDirectory]'

def __init__(self, extra_params):
super().__init__(extra_params)

self.storage = StorageDirectory(base_path=extra_params.get('ship_path'))

def iter_batches(self, date, collections, optional):
# Read tarball in memory in batches
logger.debug(f'{self.LOG_PREFIX} Processing {date}')
paths = self.fetch_partition_paths(date, collections)

for path in paths:
if not path.endswith('.tar.gz'):
continue

with tempfile.TemporaryDirectory(prefix='automation_controller_billing_data_') as temp_dir:
try:
yield self.process_tarballs(path, temp_dir, enabled_set=(collections or []) + (optional or []))
enabled_set = (collections or []) + (optional or [])

except Exception as e:
logger.exception(f'{self.LOG_PREFIX} ERROR: Extracting {path} failed with {e}')
for path in self._fetch_partition_paths(date, collections):
try:
with self.storage.get(path) as local_path:
with tempfile.TemporaryDirectory(prefix='automation_controller_billing_data_') as temp_dir:
yield self.process_tarballs(local_path, temp_dir, enabled_set=enabled_set)
except Exception as e:
logger.exception(f'{self.LOG_PREFIX} ERROR: Extracting {path} failed with {e}')

def fetch_partition_paths(self, date, collections):
prefix = self.get_path_prefix(date)
def _fetch_partition_paths(self, date, collections):
year = date.strftime('%Y')
month = date.strftime('%m')
day = date.strftime('%d')

try:
paths = [os.path.join(prefix, f) for f in os.listdir(prefix) if os.path.isfile(os.path.join(prefix, f))]
except FileNotFoundError:
paths = []
# relative to storage base_path (=ship_path)
prefix = f'data/{year}/{month}/{day}'

paths = self.storage.list_files(prefix)
return self.filter_tarball_paths(paths, collections)
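
Reviewer note: the extractor now delegates all filesystem access to StorageDirectory. The diff doesn't show metrics_utility/library/storage.py, but the call sites imply roughly this shape. A sketch inferred purely from usage (list_files(), get() as a context manager yielding a local path, put() used by the packer below); the exact path handling in the real module is a guess:

```python
import contextlib
import os
import shutil


class StorageDirectory:
    """Sketch inferred from call sites in this PR, not the actual module."""

    def __init__(self, base_path):
        self.base_path = base_path

    def list_files(self, prefix):
        # mirrors the old fetch_partition_paths(): a missing partition yields []
        full = os.path.join(self.base_path, prefix)
        try:
            return [os.path.join(prefix, f) for f in os.listdir(full)
                    if os.path.isfile(os.path.join(full, f))]
        except FileNotFoundError:
            return []

    @contextlib.contextmanager
    def get(self, path):
        # local backend: nothing to download, just resolve against base_path
        yield os.path.join(self.base_path, path)

    def put(self, path, filename):
        # the makedirs + copyfile the directory packer used to do inline
        dest = os.path.join(self.base_path, path)
        os.makedirs(os.path.dirname(dest), exist_ok=True)
        shutil.copyfile(filename, dest)
```
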
@@ -1,8 +1,7 @@
import os
import tempfile

from metrics_utility.automation_controller_billing.base.s3_handler import S3Handler
from metrics_utility.automation_controller_billing.extract.base import Base
from metrics_utility.library.storage import StorageS3
from metrics_utility.logger import logger


@@ -12,27 +11,34 @@ class ExtractorS3(Base):
def __init__(self, extra_params):
super().__init__(extra_params)

self.s3_handler = S3Handler(params=self.extra_params)
self.storage = StorageS3(
bucket=extra_params.get('bucket_name'),
endpoint=extra_params.get('bucket_endpoint'),
region=extra_params.get('bucket_region'),
access_key=extra_params.get('bucket_access_key'),
secret_key=extra_params.get('bucket_secret_key'),
)

def iter_batches(self, date, collections, optional):
# Read tarball in memory in batches
logger.debug(f'{self.LOG_PREFIX} Processing {date}')
s3_paths = self.fetch_partition_paths(date, collections)
enabled_set = (collections or []) + (optional or [])

for s3_path in s3_paths:
with tempfile.TemporaryDirectory(prefix='automation_controller_billing_data_') as temp_dir:
try:
local_path = os.path.join(temp_dir, 'source_tarball')
self.s3_handler.download_file(s3_path, local_path)
for s3_path in self._fetch_partition_paths(date, collections):
try:
with self.storage.get(s3_path) as local_path:
with tempfile.TemporaryDirectory(prefix='automation_controller_billing_data_') as temp_dir:
yield self.process_tarballs(local_path, temp_dir, enabled_set=enabled_set)
except Exception as e:
logger.exception(f'{self.LOG_PREFIX} ERROR: Extracting {s3_path} failed with {e}')

yield self.process_tarballs(local_path, temp_dir, enabled_set=(collections or []) + (optional or []))
def _fetch_partition_paths(self, date, collections):
year = date.strftime('%Y')
month = date.strftime('%m')
day = date.strftime('%d')

except Exception as e:
logger.exception(f'{self.LOG_PREFIX} ERROR: Extracting {s3_path} failed with {e}')

def fetch_partition_paths(self, date, collections):
# FIXME: apply collections= filtering, so we don't download files from S3 if we know they don't have the right thing
prefix = self.get_path_prefix(date)
paths = self.s3_handler.list_files(prefix)
ship_path = self.extra_params.get('ship_path')
prefix = f'{ship_path}/data/{year}/{month}/{day}'

paths = self.storage.list_files(prefix)
return self.filter_tarball_paths(paths, collections)
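
Reviewer note: the S3 counterpart, again inferred only from the constructor call and usage in this diff. A sketch assuming a boto3-backed implementation (the real module may differ, and list_files() here skips pagination for brevity):

```python
import contextlib
import os
import tempfile

import boto3  # assumption: the real StorageS3 may use a different client


class StorageS3:
    """Sketch inferred from call sites in this PR, not the actual module."""

    def __init__(self, bucket, endpoint=None, region=None, access_key=None, secret_key=None):
        self.bucket = bucket
        self.client = boto3.client(
            's3',
            endpoint_url=endpoint,
            region_name=region,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
        )

    def list_files(self, prefix):
        # no pagination: fine for a sketch, not for >1000 keys
        response = self.client.list_objects_v2(Bucket=self.bucket, Prefix=prefix)
        return [obj['Key'] for obj in response.get('Contents', [])]

    @contextlib.contextmanager
    def get(self, path):
        # download to a temp file and yield the local path, matching how
        # the extractors consume it above
        with tempfile.TemporaryDirectory() as temp_dir:
            local_path = os.path.join(temp_dir, os.path.basename(path))
            self.client.download_file(self.bucket, path, local_path)
            yield local_path

    def put(self, path, filename):
        self.client.upload_file(filename, self.bucket, path)

    def exists(self, path):
        # one plausible reading; see the note at ReportSaverS3 below
        return len(self.list_files(path)) > 0
```
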
@@ -1,10 +1,10 @@
import os
import shutil

from django.conf import settings

import metrics_utility.base as base

from metrics_utility.library.storage import StorageDirectory
from metrics_utility.logger import logger


@@ -58,8 +58,11 @@ def ship(self):
since, _ = self._batch_since_and_until()
destination_path = self._destination_path(self.collector.billing_provider_params['ship_path'], since, os.path.basename(self.tar_path))

os.makedirs(os.path.dirname(destination_path), exist_ok=True)
shutil.copyfile(self.tar_path, destination_path)
params = self.collector.billing_provider_params
storage = StorageDirectory(
base_path=params.get('ship_path'),
)
storage.put(destination_path, filename=self.tar_path)

logger.debug(f'tarball saved to: {destination_path}')

@@ -4,7 +4,7 @@

import metrics_utility.base as base

from metrics_utility.automation_controller_billing.base.s3_handler import S3Handler
from metrics_utility.library.storage import StorageS3
from metrics_utility.logger import logger


@@ -58,8 +58,15 @@ def ship(self):
since, _ = self._batch_since_and_until()
destination_path = self._destination_path(self.collector.billing_provider_params['ship_path'], since, os.path.basename(self.tar_path))

s3_handler = S3Handler(params=self.collector.billing_provider_params)
s3_handler.upload_file(self.tar_path, object_name=destination_path)
params = self.collector.billing_provider_params
storage = StorageS3(
bucket=params.get('bucket_name'),
endpoint=params.get('bucket_endpoint'),
region=params.get('bucket_region'),
access_key=params.get('bucket_access_key'),
secret_key=params.get('bucket_secret_key'),
)
storage.put(destination_path, filename=self.tar_path)

logger.debug(f'tarball saved to: {destination_path}')

@@ -1,6 +1,10 @@
import os


# Not wrapping StorageDirectory here because there's no real benefit, but:
# FIXME: replace ReportSaver directly with storage


class ReportSaverDirectory:
def __init__(self, extra_params):
self.extra_params = extra_params
@@ -1,7 +1,7 @@
import os
import tempfile

from metrics_utility.automation_controller_billing.base.s3_handler import S3Handler
from metrics_utility.library.storage import StorageS3
from metrics_utility.logger import logger


@@ -11,22 +11,28 @@ class ReportSaverS3:
def __init__(self, extra_params):
self.extra_params = extra_params

# FIXME: remove once build_report no longer uses it
self.report_spreadsheet_destination_path = self.extra_params['report_spreadsheet_destination_path']

self.s3_handler = S3Handler(params=self.extra_params)
self.dest_path = extra_params['report_spreadsheet_destination_path']
self.storage = StorageS3(
bucket=extra_params.get('bucket_name'),
endpoint=extra_params.get('bucket_endpoint'),
region=extra_params.get('bucket_region'),
access_key=extra_params.get('bucket_access_key'),
secret_key=extra_params.get('bucket_secret_key'),
)

def report_exist(self):
return len([file for file in self.s3_handler.list_files(self.report_spreadsheet_destination_path)]) > 0
return self.storage.exists(self.dest_path)

def save(self, report_spreadsheet):
with tempfile.TemporaryDirectory(prefix='report_saver_billing_data_') as temp_dir:
try:
local_report_path = os.path.join(temp_dir, 'report')
report_spreadsheet.save(local_report_path)

self.s3_handler.upload_file(local_report_path, self.report_spreadsheet_destination_path)

self.storage.put(self.dest_path, filename=local_report_path)
except Exception as e:
logger.exception(f'{self.LOG_PREFIX} ERROR: Saving report to S3 into path {self.report_spreadsheet_destination_path} failed with {e}')
logger.exception(f'{self.LOG_PREFIX} ERROR: Saving report to S3 into path {self.dest_path} failed with {e}')

logger.info(f'Report sent into S3 bucket into path: {self.report_spreadsheet_destination_path}')
logger.info(f'Report sent into S3 bucket into path: {self.dest_path}')
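
Reviewer note: one behavioral detail worth confirming. The old report_exist() counted any object under the destination prefix, while storage.exists() may well check the exact key. Both readings, sketched against a boto3-style client (illustrative only; helper names are invented):

```python
def exists_by_prefix(client, bucket, dest_path):
    # old ReportSaverS3 semantics: any key starting with dest_path counts
    response = client.list_objects_v2(Bucket=bucket, Prefix=dest_path)
    return len(response.get('Contents', [])) > 0


def exists_exact(client, bucket, dest_path):
    # stricter reading: only the exact key counts
    try:
        client.head_object(Bucket=bucket, Key=dest_path)
        return True
    except client.exceptions.ClientError:
        return False
```
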
2 changes: 1 addition & 1 deletion metrics_utility/management/validation.py
@@ -101,7 +101,7 @@ def handle_s3_ship_target():
if missing:
raise MissingRequiredEnvVar(f'Missing some required env variables for S3 configuration, namely: {", ".join(missing)}.')

# S3Handler params
# S3Handler params - passed to StorageS3 as bucket, endpoint, region, access_key, secret_key (no ship_path)
return {
'ship_path': ship_path,
'bucket_name': bucket_name,