Skip to content

Commit

Permalink
Merge pull request #103 from dynatrace-oss/PCLOUDS-3547_add_support_for_multiple_filters_with_pipe
Browse files Browse the repository at this point in the history

PCLOUDS-3547 Rebased from fork
  • Loading branch information
NematulloKozimov authored Mar 27, 2024
2 parents c3bdcda + d994d6e commit 7efa443
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 8 deletions.
50 changes: 45 additions & 5 deletions logs_ingest/filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,19 @@ def _prepare_filters_dict(self) -> Dict:
filters_to_apply.append(log_level_filter)
parsed_filters_to_log.append(filter_name)
if "contains_pattern" in filter_name:
contains_pattern_filter = self._create_contains_pattern_filter(filter_value)
filters_to_apply.append(contains_pattern_filter)
parsed_filters_to_log.append(filter_name)
if isinstance(filter_value, list):
for filter_pattern in filter_value:
contains_pattern_filter = (
self._create_contains_pattern_filter(filter_pattern)
)
filters_to_apply.append(contains_pattern_filter)
parsed_filters_to_log.append(filter_name)
if not isinstance(filter_value, list):
contains_pattern_filter = self._create_contains_pattern_filter(
filter_value
)
filters_to_apply.append(contains_pattern_filter)
parsed_filters_to_log.append(filter_name)
if filters_to_apply:
filters_to_apply_dict[k] = filters_to_apply
logging.info(f"Successfully parsed filters: {parsed_filters_to_log}")
Expand All @@ -76,7 +86,21 @@ def _prepare_filters_dict(self) -> Dict:
def _group_filters(self) -> Dict:
filters_dict = {}
for key, filter_name_value in self._filters_tuples:
filters_dict.setdefault(key, {}).update({filter_name_value[0]: filter_name_value[1]})
if "|" in filter_name_value[1]:
filter_patterns = filter_name_value[1].split("|")
if filter_patterns:
filter_patterns = [
f"{pattern.strip()}" for pattern in filter_patterns
]
filters_dict.setdefault(key, {}).update(
{filter_name_value[0]: filter_patterns}
)
continue

filters_dict.setdefault(key, {}).update(
{filter_name_value[0]: filter_name_value[1]}
)

return filters_dict

@staticmethod
Expand All @@ -97,7 +121,23 @@ def should_filter_out_record(self, parsed_record: Dict) -> bool:
content = parsed_record.get("content", "")

log_filters = self._get_filters(resource_id, resource_type)
return not all(log_filter(severity, content) for log_filter in log_filters)

filter_patterns = []

for log_filter in log_filters:
if 'contains_pattern' in str(log_filter):
filter_patterns.append(log_filter)

pipe_separated_filters_result = True

if len(filter_patterns) > 1:
log_filters = set(log_filters) - set(filter_patterns)
pipe_separated_filters_result = any(log_filter(severity, str(content)) for log_filter in filter_patterns)

return (not all(log_filter(severity, str(content)) for log_filter in log_filters)
or not pipe_separated_filters_result)



def _get_filters(self, resource_id, resource_type):
filters = self.filters_dict.get(resource_id, [])
Expand Down
7 changes: 4 additions & 3 deletions logs_ingest/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,6 @@ def parse_record(record: Dict, self_monitoring: SelfMonitoring):
if "resourceId" in record:
extract_resource_id_attributes(parsed_record, record["resourceId"])

if log_filter.should_filter_out_record(parsed_record):
return None

metadata_engine.apply(record, parsed_record)
convert_date_format(parsed_record)
category = record.get("category", "").lower()
Expand All @@ -173,6 +170,10 @@ def parse_record(record: Dict, self_monitoring: SelfMonitoring):
parsed_record[attribute_key] = string_attribute_value[: attribute_value_length_limit]

content = parsed_record.get("content", None)

if log_filter.should_filter_out_record(parsed_record):
return None

if content:
if not isinstance(content, str):
parsed_record["content"] = json.dumps(parsed_record["content"])
Expand Down

0 comments on commit 7efa443

Please sign in to comment.