Skip to content

Commit 02f16cb

Browse files
RD-3862 - feat: support eventBridge (#158)
* feat: support eventBridge
1 parent 560af98 commit 02f16cb

File tree

4 files changed

+135
-1
lines changed

4 files changed

+135
-1
lines changed

src/lumigo_tracer/event/event_trigger.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ def parse_triggered_by(event: dict):
3737
return _parse_cw(event)
3838
elif _is_step_function(event):
3939
return _parse_step_function(event)
40+
elif _is_event_bridge(event):
41+
return _parse_event_bridge(event)
4042

4143
return _parse_unknown(event)
4244

@@ -118,6 +120,23 @@ def _parse_sns(event: dict):
118120
}
119121

120122

123+
def _is_event_bridge(event: dict):
124+
return (
125+
isinstance(event.get("version"), str)
126+
and isinstance(event.get("id"), str) # noqa: W503
127+
and isinstance(event.get("detail-type"), str) # noqa: W503
128+
and isinstance(event.get("source"), str) # noqa: W503
129+
and isinstance(event.get("time"), str) # noqa: W503
130+
and isinstance(event.get("region"), str) # noqa: W503
131+
and isinstance(event.get("resources"), list) # noqa: W503
132+
and isinstance(event.get("detail"), dict) # noqa: W503
133+
)
134+
135+
136+
def _parse_event_bridge(event: dict):
137+
return {"triggeredBy": "eventBridge", "messageId": event["id"]}
138+
139+
121140
def _is_supported_cw(event: dict):
122141
return event.get("detail-type") == "Scheduled Event" and "source" in event and "time" in event
123142

src/lumigo_tracer/wrappers/http/http_parser.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,40 @@ def parse_response(self, url: str, status_code: int, headers, body: bytes) -> di
236236
)
237237

238238

239+
class EventBridgeParser(Parser):
240+
def parse_request(self, parse_params: HttpRequest) -> dict:
241+
try:
242+
parsed_body = json.loads(parse_params.body)
243+
except json.JSONDecodeError as e:
244+
get_logger().exception(
245+
"Error while trying to parse eventBridge request body", exc_info=e
246+
)
247+
parsed_body = {}
248+
resource_names = set()
249+
if isinstance(parsed_body.get("Entries"), list):
250+
resource_names = {
251+
e["EventBusName"] for e in parsed_body["Entries"] if e.get("EventBusName")
252+
}
253+
return recursive_json_join(
254+
{"info": {"resourceNames": list(resource_names) or None}},
255+
super().parse_request(parse_params),
256+
)
257+
258+
def parse_response(self, url: str, status_code: int, headers, body: bytes) -> dict:
259+
try:
260+
parsed_body = json.loads(body)
261+
except json.JSONDecodeError as e:
262+
get_logger().debug("Error while trying to parse eventBridge request body", exc_info=e)
263+
parsed_body = {}
264+
message_ids = []
265+
if isinstance(parsed_body.get("Entries"), list):
266+
message_ids = [e["EventId"] for e in parsed_body["Entries"] if e.get("EventId")]
267+
return recursive_json_join(
268+
{"info": {"messageIds": message_ids}},
269+
super().parse_response(url, status_code, headers, body),
270+
)
271+
272+
239273
class ApiGatewayV2Parser(ServerlessAWSParser):
240274
# API-GW V1 covered by ServerlessAWSParser
241275

@@ -259,6 +293,8 @@ def get_parser(url: str, headers: Optional[dict] = None) -> Type[Parser]:
259293
return LambdaParser
260294
elif service == "kinesis":
261295
return KinesisParser
296+
elif service == "events":
297+
return EventBridgeParser
262298
elif safe_split_get(url, ".", 1) == "s3":
263299
return S3Parser
264300
# SQS Legacy Endpoints: https://docs.aws.amazon.com/general/latest/gr/rande.html

src/test/unit/event/test_event_trigger.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,19 @@
165165
},
166166
{"triggeredBy": "unknown"},
167167
),
168+
( # EventBridge - happy flow
169+
{
170+
"version": "0",
171+
"id": "f0f73aaa-e64f-a550-5be2-850898090583",
172+
"detail-type": "string",
173+
"source": "source_lambda",
174+
"time": "2020-10-19T13:34:29Z",
175+
"region": "us-west-2",
176+
"resources": [],
177+
"detail": {"a": 0.024995371455989845},
178+
},
179+
{"triggeredBy": "eventBridge", "messageId": "f0f73aaa-e64f-a550-5be2-850898090583"},
180+
),
168181
( # cloudwatch
169182
{
170183
"id": "cdc73f9d-aea9-11e3-9d5a-835b769c0d9c",

src/test/unit/wrappers/http/test_http_parser.py

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
get_parser,
1111
ApiGatewayV2Parser,
1212
DynamoParser,
13+
EventBridgeParser,
1314
)
1415

1516

@@ -24,7 +25,7 @@ def test_serverless_aws_parser_fallback_doesnt_change():
2425

2526

2627
def test_get_parser_check_headers():
27-
url = "api.rti.dev.toyota.com"
28+
url = "api.dev.com"
2829
headers = {"x-amzn-requestid": "1234"}
2930
assert get_parser(url, headers) == ServerlessAWSParser
3031

@@ -164,3 +165,68 @@ def test_double_response_size_limit_on_error_status_code():
164165
assert response_with_error["headers"] == json.dumps(d)
165166
assert len(response_with_error["body"]) > len(response_no_error["body"])
166167
assert response_with_error["body"] == json.dumps(d)
168+
169+
170+
def test_event_bridge_parser_request_happy_flow():
171+
parser = EventBridgeParser()
172+
params = HttpRequest(
173+
host="",
174+
method="POST",
175+
uri="",
176+
headers={},
177+
body=json.dumps(
178+
{
179+
"Entries": [
180+
{
181+
"Source": "source_lambda",
182+
"Resources": [],
183+
"DetailType": "string",
184+
"Detail": '{"a": 1}',
185+
"EventBusName": "name1",
186+
},
187+
{
188+
"Source": "source_lambda",
189+
"Resources": [],
190+
"DetailType": "string",
191+
"Detail": '{"a": 2}',
192+
"EventBusName": "name1",
193+
},
194+
{
195+
"Source": "source_lambda",
196+
"Resources": [],
197+
"DetailType": "string",
198+
"Detail": '{"a": 3}',
199+
"EventBusName": "name2",
200+
},
201+
]
202+
}
203+
),
204+
)
205+
response = parser.parse_request(params)
206+
assert set(response["info"]["resourceNames"]) == {"name2", "name1"}
207+
208+
209+
def test_event_bridge_parser_request_sad_flow():
210+
parser = EventBridgeParser()
211+
params = HttpRequest(host="", method="POST", uri="", headers={}, body="not a json")
212+
response = parser.parse_request(params)
213+
assert response["info"]["resourceNames"] is None
214+
215+
216+
def test_event_bridge_parser_response_happy_flow():
217+
parser = EventBridgeParser()
218+
response = parser.parse_response(
219+
"",
220+
200,
221+
{},
222+
body=json.dumps(
223+
{"Entries": [{"EventId": "1-2-3-4"}, {"EventId": "6-7-8-9"}], "FailedEntryCount": 0}
224+
).encode(),
225+
)
226+
assert response["info"]["messageIds"] == ["1-2-3-4", "6-7-8-9"]
227+
228+
229+
def test_event_bridge_parser_response_sad_flow():
230+
parser = EventBridgeParser()
231+
response = parser.parse_response("", 200, {}, body=b"not a json")
232+
assert not response["info"]["messageIds"]

0 commit comments

Comments
 (0)