Skip to content

Commit 45dd976

Browse files
saartochner-lumigo and CircleCI authored
RD-9485 - recursive parse triggeredBy (#265)
* Recursively parse triggeredBy. Co-authored-by: CircleCI <[email protected]>
1 parent 7435f93 commit 45dd976

22 files changed: +992 additions, −486 deletions

.pre-commit-config.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ repos:
1414
entry: venv/bin/mypy
1515
args: ['--show-error-codes', '--check-untyped-defs', '--warn-redundant-casts', '--strict-equality', '--warn-unused-ignores', '--warn-return-any', '--disallow-untyped-defs', '--disallow-incomplete-defs', '--disallow-any-generics', '--ignore-missing-imports']
1616

17-
- repo: https://gitlab.com/pycqa/flake8
18-
rev: 3.8.1
17+
- repo: https://github.com/pre-commit/pre-commit-hooks
18+
rev: v2.4.0
1919
hooks:
2020
- id: flake8
2121
args: ["--ignore","E501,W503","--exclude","src/lumigo_tracer/libs/*"]

.secrets.baseline

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -168,14 +168,14 @@
168168
"filename": "src/test/unit/event/test_event_trigger.py",
169169
"hashed_secret": "885bb9903f72e004ff2974807b70e7c970d3e6d5",
170170
"is_verified": false,
171-
"line_number": 476
171+
"line_number": 567
172172
},
173173
{
174174
"type": "Hex High Entropy String",
175175
"filename": "src/test/unit/event/test_event_trigger.py",
176176
"hashed_secret": "3fae06dc55a618caed1d794dcd512bfe7e76c9f1",
177177
"is_verified": false,
178-
"line_number": 510
178+
"line_number": 612
179179
}
180180
],
181181
"src/test/unit/test_lumigo_utils.py": [
@@ -220,5 +220,5 @@
220220
}
221221
]
222222
},
223-
"generated_at": "2022-10-12T08:37:22Z"
223+
"generated_at": "2022-11-08T16:26:37Z"
224224
}
Lines changed: 39 additions & 311 deletions
Original file line numberDiff line numberDiff line change
@@ -1,311 +1,39 @@
1-
import re
2-
from typing import Union, List, Dict
3-
4-
from lumigo_tracer.parsing_utils import recursive_get_key, safe_get, safe_split_get
5-
from lumigo_tracer.lumigo_utils import (
6-
lumigo_safe_execute,
7-
Configuration,
8-
STEP_FUNCTION_UID_KEY,
9-
LUMIGO_EVENT_KEY,
10-
md5hash,
11-
)
12-
13-
TRIGGER_CREATION_TIME_KEY = "approxEventCreationTime"
14-
MESSAGE_ID_KEY = "messageId"
15-
MESSAGE_IDS_KEY = "messageIds"
16-
TOTAL_SIZE_BYTES = "totalSizeBytes"
17-
RECORDS_NUM = "recordsNum"
18-
MESSAGE_ID_TO_CHAINED_RESOURCE = "messageIdToChainResource"
19-
20-
21-
def parse_triggered_by(event: dict): # type: ignore[no-untyped-def,type-arg]
22-
"""
23-
This function parses the event and build the dictionary that describes the given event.
24-
25-
The current possible values are:
26-
* {triggeredBy: unknown}
27-
* {triggeredBy: apigw, api: <host>, resource: <>, httpMethod: <>, stage: <>, identity: <>, referer: <>}
28-
"""
29-
with lumigo_safe_execute("triggered by"):
30-
if not isinstance(event, dict):
31-
if _is_step_function(event):
32-
return _parse_step_function(event)
33-
return None
34-
if _is_supported_http_method(event):
35-
return _parse_http_method(event)
36-
if _is_load_balancer_method(event):
37-
return _parse_load_balancer_method(event)
38-
elif _is_supported_sns(event):
39-
return _parse_sns(event)
40-
elif _is_supported_streams(event):
41-
return _parse_streams(event)
42-
elif _is_supported_cw(event):
43-
return _parse_cw(event)
44-
elif _is_step_function(event):
45-
return _parse_step_function(event)
46-
elif _is_event_bridge(event):
47-
return _parse_event_bridge(event)
48-
elif _is_appsync(event):
49-
return _parse_appsync(event)
50-
51-
return _parse_unknown(event)
52-
53-
54-
def _parse_unknown(event: dict): # type: ignore[no-untyped-def,type-arg]
55-
result = {"triggeredBy": "unknown"}
56-
return result
57-
58-
59-
def _is_step_function(event: Union[List, Dict]): # type: ignore[no-untyped-def,type-arg,type-arg]
60-
return (
61-
Configuration.is_step_function
62-
and isinstance(event, (list, dict)) # noqa
63-
and STEP_FUNCTION_UID_KEY in recursive_get_key(event, LUMIGO_EVENT_KEY, default={}) # noqa
64-
)
65-
66-
67-
def _parse_step_function(event: dict): # type: ignore[no-untyped-def,type-arg]
68-
result = {
69-
"triggeredBy": "stepFunction",
70-
"messageId": recursive_get_key(event, LUMIGO_EVENT_KEY)[STEP_FUNCTION_UID_KEY],
71-
}
72-
return result
73-
74-
75-
def _is_supported_http_method(event: dict): # type: ignore[no-untyped-def,type-arg]
76-
return (
77-
"httpMethod" in event # noqa
78-
and "headers" in event # noqa
79-
and "requestContext" in event # noqa
80-
and event.get("requestContext", {}).get("elb") is None # noqa
81-
and event.get("requestContext", {}).get("stage") is not None # noqa
82-
) or ( # noqa
83-
event.get("version", "") == "2.0" and "headers" in event # noqa
84-
) # noqa
85-
86-
87-
def _is_load_balancer_method(event: dict): # type: ignore[no-untyped-def,type-arg]
88-
return (
89-
"httpMethod" in event # noqa
90-
and "headers" in event # noqa
91-
and event["headers"].get("host") # noqa
92-
and "requestContext" in event # noqa
93-
and ( # noqa
94-
event.get("requestContext", {}).get("elb") is not None # noqa
95-
or event.get("requestContext", {}).get("alb") is not None # noqa
96-
) # noqa
97-
)
98-
99-
100-
def _parse_http_method(event: dict): # type: ignore[no-untyped-def,type-arg]
101-
version = event.get("version")
102-
if version and version.startswith("2.0"):
103-
return _parse_http_method_v2(event)
104-
return _parse_http_method_v1(event)
105-
106-
107-
def _parse_load_balancer_method(event: dict): # type: ignore[no-untyped-def,type-arg]
108-
result = {
109-
"triggeredBy": "load_balancer",
110-
"httpMethod": event.get("httpMethod", ""),
111-
}
112-
if isinstance(event.get("headers"), dict):
113-
result["api"] = event["headers"].get("host")
114-
return result
115-
116-
117-
def _parse_http_method_v1(event: dict): # type: ignore[no-untyped-def,type-arg]
118-
result = {
119-
"triggeredBy": "apigw",
120-
"httpMethod": event.get("httpMethod", ""),
121-
"resource": event.get("resource", ""),
122-
"messageId": event.get("requestContext", {}).get("requestId", ""),
123-
}
124-
if isinstance(event.get("headers"), dict):
125-
result["api"] = event["headers"].get("Host", "unknown.unknown.unknown")
126-
if isinstance(event.get("requestContext"), dict):
127-
result["stage"] = event["requestContext"].get("stage", "unknown")
128-
return result
129-
130-
131-
def _parse_http_method_v2(event: dict): # type: ignore[no-untyped-def,type-arg]
132-
result = {
133-
"triggeredBy": "apigw",
134-
"httpMethod": event.get("requestContext", {}).get("http", {}).get("method"),
135-
"resource": event.get("requestContext", {}).get("http", {}).get("path"),
136-
"messageId": event.get("requestContext", {}).get("requestId", ""),
137-
"api": event.get("requestContext", {}).get("domainName", ""),
138-
"stage": event.get("requestContext", {}).get("stage", "unknown"),
139-
}
140-
return result
141-
142-
143-
def _is_supported_sns(event: dict): # type: ignore[no-untyped-def,type-arg]
144-
return event.get("Records", [{}])[0].get("EventSource") == "aws:sns"
145-
146-
147-
def _parse_sns(event: dict): # type: ignore[no-untyped-def,type-arg]
148-
return {
149-
"triggeredBy": "sns",
150-
"arn": event["Records"][0]["Sns"]["TopicArn"],
151-
"messageId": event["Records"][0]["Sns"].get("MessageId"),
152-
RECORDS_NUM: len(event["Records"]),
153-
}
154-
155-
156-
def _is_event_bridge(event: dict): # type: ignore[no-untyped-def,type-arg]
157-
return (
158-
isinstance(event.get("version"), str)
159-
and isinstance(event.get("id"), str) # noqa: W503
160-
and isinstance(event.get("detail-type"), str) # noqa: W503
161-
and isinstance(event.get("source"), str) # noqa: W503
162-
and isinstance(event.get("time"), str) # noqa: W503
163-
and isinstance(event.get("region"), str) # noqa: W503
164-
and isinstance(event.get("resources"), list) # noqa: W503
165-
and isinstance(event.get("detail"), dict) # noqa: W503
166-
)
167-
168-
169-
def _is_appsync(event: dict) -> bool: # type: ignore[type-arg]
170-
host = safe_get(event, ["context", "request", "headers", "host"])
171-
if not host:
172-
host = safe_get(event, ["request", "headers", "host"])
173-
return isinstance(host, str) and "appsync-api" in host
174-
175-
176-
def _parse_event_bridge(event: dict): # type: ignore[no-untyped-def,type-arg]
177-
return {"triggeredBy": "eventBridge", "messageId": event["id"]}
178-
179-
180-
def _parse_appsync(event: dict) -> dict: # type: ignore[type-arg]
181-
headers = safe_get(event, ["context", "request", "headers"])
182-
if not headers:
183-
headers = safe_get(event, ["request", "headers"])
184-
host = headers.get("host")
185-
trace_id = headers.get("x-amzn-trace-id")
186-
message_id = safe_split_get(trace_id, "=", -1)
187-
return {"triggeredBy": "appsync", "api": host, "messageId": message_id}
188-
189-
190-
def _is_supported_cw(event: dict): # type: ignore[no-untyped-def,type-arg]
191-
return event.get("detail-type") == "Scheduled Event" and "source" in event and "time" in event
192-
193-
194-
def _parse_cw(event: dict): # type: ignore[no-untyped-def,type-arg]
195-
resource = event.get("resources", ["/unknown"])[0].split("/")[1]
196-
return {
197-
"triggeredBy": "cloudwatch",
198-
"resource": resource,
199-
"region": event.get("region"),
200-
"detailType": event.get("detail-type"),
201-
}
202-
203-
204-
def _is_supported_streams(event: dict): # type: ignore[no-untyped-def,type-arg]
205-
return event.get("Records", [{}])[0].get("eventSource") in [
206-
"aws:kinesis",
207-
"aws:dynamodb",
208-
"aws:sqs",
209-
"aws:s3",
210-
]
211-
212-
213-
def _parse_streams(event: dict) -> Dict[str, str]: # type: ignore[type-arg]
214-
"""
215-
:return: {"triggeredBy": str, "arn": str}
216-
If has messageId, return also: {"messageId": str}
217-
"""
218-
triggered_by = event["Records"][0]["eventSource"].split(":")[1]
219-
result = {"triggeredBy": triggered_by, RECORDS_NUM: len(event["Records"])}
220-
if triggered_by == "s3":
221-
result["arn"] = event["Records"][0]["s3"]["bucket"]["arn"]
222-
result["messageId"] = (
223-
event["Records"][0].get("responseElements", {}).get("x-amz-request-id")
224-
)
225-
else:
226-
result["arn"] = event["Records"][0]["eventSourceARN"]
227-
if triggered_by == "sqs":
228-
result.update(_parse_sqs_event(event))
229-
elif triggered_by == "kinesis":
230-
result.update(_parse_kinesis_event(event))
231-
elif triggered_by == "dynamodb":
232-
result.update(_parse_dynamomdb_event(event))
233-
return result
234-
235-
236-
def _get_ddb_approx_creation_time_ms(event) -> int: # type: ignore[no-untyped-def]
237-
return event["Records"][0].get("dynamodb", {}).get("ApproximateCreationDateTime", 0) * 1000 # type: ignore[no-any-return]
238-
239-
240-
def _parse_dynamomdb_event(event) -> Dict[str, Union[int, List[str]]]: # type: ignore[no-untyped-def]
241-
creation_time = _get_ddb_approx_creation_time_ms(event)
242-
mids = []
243-
total_size_bytes: int = 0
244-
for record in event["Records"]:
245-
total_size_bytes += record["dynamodb"].get("SizeBytes", 0)
246-
event_name = record.get("eventName")
247-
if event_name in ("MODIFY", "REMOVE") and record.get("dynamodb", {}).get("Keys"):
248-
mids.append(md5hash(record["dynamodb"]["Keys"]))
249-
elif event_name == "INSERT" and record.get("dynamodb", {}).get("NewImage"):
250-
mids.append(md5hash(record["dynamodb"]["NewImage"]))
251-
return {
252-
MESSAGE_IDS_KEY: mids,
253-
TRIGGER_CREATION_TIME_KEY: creation_time,
254-
TOTAL_SIZE_BYTES: total_size_bytes,
255-
}
256-
257-
258-
def _parse_kinesis_event(event) -> Dict[str, Union[int, str, List[str], List[Dict[str, str]]]]: # type: ignore[no-untyped-def]
259-
result = {}
260-
message_ids = []
261-
records = safe_get(event, ["Records"], default=[])
262-
for record in records:
263-
message_id = safe_get(record, ["kinesis", "sequenceNumber"])
264-
if message_id:
265-
message_ids.append(message_id)
266-
if message_ids:
267-
if len(message_ids) == 1:
268-
result[MESSAGE_ID_KEY] = message_ids[0]
269-
else:
270-
result[MESSAGE_IDS_KEY] = message_ids
271-
event_id = safe_get(event, ["Records", 0, "eventID"])
272-
if isinstance(event_id, str):
273-
result["shardId"] = event_id.split(":", 1)[0]
274-
return result
275-
276-
277-
def _parse_sqs_event(event) -> Dict[str, Union[int, str, List[str], List[Dict[str, str]]]]: # type: ignore[no-untyped-def]
278-
message_ids = []
279-
chained_resources: List[Dict[str, str]] = []
280-
for record in event.get("Records", []):
281-
record_message_id = record.get("messageId")
282-
if not record_message_id:
283-
continue
284-
message_ids.append(record_message_id)
285-
if _is_sns_inside_sqs_record(record):
286-
body = record.get("body", "")
287-
message_id = re.search(r'"MessageId" : "(\w{8}-\w{4}-\w{4}-\w{4}-\w{12})"', body)
288-
topic_arn = re.search(r'"TopicArn" : "(arn:aws:sns:[\w\-:]+)"', body)
289-
if message_id and topic_arn:
290-
message_ids.append(message_id.group(1))
291-
chained_resources.append(
292-
{
293-
"resourceType": "sns",
294-
"TopicArn": topic_arn.group(1),
295-
"childMessageId": record_message_id,
296-
"parentMessageId": message_id.group(1),
297-
}
298-
)
299-
result: Dict[str, Union[int, str, List[str], List[Dict[str, str]]]] = {}
300-
if len(message_ids) > 1:
301-
result[MESSAGE_IDS_KEY] = message_ids
302-
else:
303-
result[MESSAGE_ID_KEY] = message_ids[0]
304-
if chained_resources:
305-
result[MESSAGE_ID_TO_CHAINED_RESOURCE] = chained_resources
306-
return result
307-
308-
309-
def _is_sns_inside_sqs_record(record: dict): # type: ignore[no-untyped-def,type-arg]
310-
body = record.get("body")
311-
return isinstance(body, str) and "SimpleNotificationService" in body and "TopicArn" in body
1+
import json
2+
from typing import List, Optional, Dict, Any
3+
4+
from lumigo_tracer.event.trigger_parsing import EVENT_TRIGGER_PARSERS, INNER_MESSAGES_MAGIC_PATTERN
5+
from lumigo_tracer.event.trigger_parsing.event_trigger_base import TriggerType
6+
from lumigo_tracer.lumigo_utils import Configuration, get_logger
7+
8+
9+
def _recursive_parse_trigger_by(
    message: Dict[Any, Any], parent_id: Optional[str], level: int
) -> List[TriggerType]:
    """Build the list of triggers that (transitively) produced ``message``.

    The first parser in ``EVENT_TRIGGER_PARSERS`` whose ``should_handle``
    accepts the event handles it; any raw messages embedded inside the event
    are then parsed recursively, chained to the parent trigger via its id.
    Recursion depth is bounded by ``Configuration.chained_services_max_depth``
    and fan-out by ``Configuration.chained_services_max_width``.

    :param message: the (possibly nested) event payload to classify.
    :param parent_id: id of the trigger that wrapped this message, or None
        at the top level.
    :param level: current recursion depth, starting at 0.
    :return: flat list of trigger descriptions discovered at this level and
        below.
    """
    # Depth guard: stop chasing nested messages past the configured depth.
    if level >= Configuration.chained_services_max_depth:
        get_logger().info("Chained services parsing has stopped due to depth")
        return []

    collected: List[TriggerType] = []
    for candidate in EVENT_TRIGGER_PARSERS:
        if not candidate.should_handle(message):
            continue

        trigger = candidate.handle(event=message, target_id=parent_id)
        collected.append(trigger)
        trigger_id: str = trigger["id"]  # type: ignore

        nested = candidate.extract_inner(event=message)
        if len(nested) >= Configuration.chained_services_max_width:
            get_logger().info("Chained services parsing has stopped due to width")
            nested = nested[: Configuration.chained_services_max_width]
        for raw_message in nested:
            # Quick regex scan first, so we json.loads only relevant payloads.
            if INNER_MESSAGES_MAGIC_PATTERN.search(raw_message):
                sub_triggers = _recursive_parse_trigger_by(
                    json.loads(raw_message), parent_id=trigger_id, level=level + 1
                )
                collected.extend(sub_triggers)
        # Only the first matching parser handles the event.
        break
    return collected
36+
37+
38+
def parse_triggers(event: Dict[Any, Any]) -> List[Dict[Any, Any]]:
    """Parse a Lambda invocation event into a flat list of trigger dicts.

    Public entry point: starts the recursive trigger walk at depth 0 with no
    parent trigger id.
    """
    return _recursive_parse_trigger_by(event, parent_id=None, level=0)

0 commit comments

Comments
 (0)