Skip to content

Commit 8370ffb

Browse files
authored
Support China region (#164)
1 parent 3adec93 commit 8370ffb

File tree

4 files changed

+199
-20
lines changed

4 files changed

+199
-20
lines changed

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,4 @@ requests==2.24.0
99
pymongo==3.11.0
1010
redispy==3.0.0
1111
sqlalchemy==1.3.20
12+
moto==1.3.16

src/lumigo_tracer/lumigo_utils.py

Lines changed: 98 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,17 @@
99
import time
1010
import http.client
1111
from collections import OrderedDict
12+
import random
1213
from typing import Union, List, Optional, Dict, Any, Tuple, Pattern
1314
from contextlib import contextmanager
1415
from base64 import b64encode
1516
import inspect
1617

18+
try:
19+
import boto3
20+
except Exception:
21+
boto3 = None
22+
1723
EXECUTION_TAGS_KEY = "lumigo_execution_tags_no_scrub"
1824
EDGE_HOST = "{region}.lumigo-tracer-edge.golumigo.com"
1925
EDGE_PATH = "/api/spans"
@@ -56,6 +62,8 @@
5662
LUMIGO_TOKEN_KEY = "LUMIGO_TRACER_TOKEN"
5763
KILL_SWITCH = "LUMIGO_SWITCH_OFF"
5864
ERROR_SIZE_LIMIT_MULTIPLIER = 2
65+
CHINA_REGION = "cn-northwest-1"
66+
EDGE_KINESIS_STREAM_NAME = "prod_trc-inges-edge_edge-kinesis-stream"
5967

6068
_logger: Dict[str, logging.Logger] = {}
6169

@@ -84,6 +92,9 @@ class Configuration:
8492
domains_scrubber: Optional[List] = None
8593
max_entry_size: int = DEFAULT_MAX_ENTRY_SIZE
8694
get_key_depth: int = DEFAULT_KEY_DEPTH
95+
edge_kinesis_stream_name: str = EDGE_KINESIS_STREAM_NAME
96+
edge_kinesis_aws_access_key_id: Optional[str] = None
97+
edge_kinesis_aws_secret_access_key: Optional[str] = None
8798

8899
@staticmethod
89100
def get_max_entry_size(has_error: bool = False) -> int:
@@ -104,6 +115,9 @@ def config(
104115
domains_scrubber: Optional[List[str]] = None,
105116
max_entry_size: int = DEFAULT_MAX_ENTRY_SIZE,
106117
get_key_depth: int = None,
118+
edge_kinesis_stream_name: Optional[str] = None,
119+
edge_kinesis_aws_access_key_id: Optional[str] = None,
120+
edge_kinesis_aws_secret_access_key: Optional[str] = None,
107121
) -> None:
108122
"""
109123
This function configure the lumigo wrapper.
@@ -120,6 +134,9 @@ def config(
120134
:param domains_scrubber: List of regexes. We will not collect data of requests with hosts that match it.
121135
:param max_entry_size: The maximum size of each entry when sending back the events.
122136
:param get_key_depth: Max depth to search the lumigo key in the event (relevant to step functions). default 4.
137+
:param edge_kinesis_stream_name: The name of the Kinesis to push the spans in China region
138+
:param edge_kinesis_aws_access_key_id: The credentials to push to the Kinesis in China region
139+
:param edge_kinesis_aws_secret_access_key: The credentials to push to the Kinesis in China region
123140
"""
124141
if should_report is not None:
125142
Configuration.should_report = should_report
@@ -162,6 +179,18 @@ def config(
162179
domains_scrubber_regex = DOMAIN_SCRUBBER_REGEXES
163180
Configuration.domains_scrubber = [re.compile(r, re.IGNORECASE) for r in domains_scrubber_regex]
164181
Configuration.max_entry_size = int(os.environ.get("LUMIGO_MAX_ENTRY_SIZE", max_entry_size))
182+
Configuration.edge_kinesis_stream_name = (
183+
edge_kinesis_stream_name
184+
or os.environ.get("LUMIGO_EDGE_KINESIS_STREAM_NAME") # noqa`
185+
or EDGE_KINESIS_STREAM_NAME # noqa
186+
)
187+
Configuration.edge_kinesis_aws_access_key_id = edge_kinesis_aws_access_key_id or os.environ.get(
188+
"LUMIGO_EDGE_KINESIS_AWS_ACCESS_KEY_ID"
189+
)
190+
Configuration.edge_kinesis_aws_secret_access_key = (
191+
edge_kinesis_aws_secret_access_key
192+
or os.environ.get("LUMIGO_EDGE_KINESIS_AWS_SECRET_ACCESS_KEY") # noqa
193+
)
165194

166195

167196
def _is_span_has_error(span: dict) -> bool:
@@ -235,9 +264,19 @@ def report_json(region: Union[None, str], msgs: List[dict], should_retry: bool =
235264
:return: The duration of reporting (in milliseconds),
236265
or 0 if we didn't send (due to configuration or fail).
237266
"""
238-
global edge_connection
267+
if not Configuration.should_report:
268+
return 0
239269
get_logger().info(f"reporting the messages: {msgs[:10]}")
270+
try:
271+
prune_trace: bool = not os.environ.get("LUMIGO_PRUNE_TRACE_OFF", "").lower() == "true"
272+
to_send = _create_request_body(msgs, prune_trace).encode()
273+
except Exception as e:
274+
get_logger().exception("Failed to create request: A span was lost.", exc_info=e)
275+
return 0
276+
if region == CHINA_REGION:
277+
return _publish_spans_to_kinesis(to_send, CHINA_REGION)
240278
host = None
279+
global edge_connection
241280
with lumigo_safe_execute("report json: establish connection"):
242281
host = prepare_host(Configuration.host or EDGE_HOST.format(region=region))
243282
duration = 0
@@ -246,28 +285,67 @@ def report_json(region: Union[None, str], msgs: List[dict], should_retry: bool =
246285
if not edge_connection:
247286
get_logger().warning("Can not establish connection. Skip sending span.")
248287
return duration
249-
if Configuration.should_report:
250-
try:
251-
prune_trace: bool = not os.environ.get("LUMIGO_PRUNE_TRACE_OFF", "").lower() == "true"
252-
to_send = _create_request_body(msgs, prune_trace).encode()
253-
start_time = time.time()
254-
edge_connection.request(
255-
"POST", EDGE_PATH, to_send, headers={"Content-Type": "application/json"}
256-
)
257-
response = edge_connection.getresponse()
258-
response.read() # We most read the response to keep the connection available
259-
duration = int((time.time() - start_time) * 1000)
260-
get_logger().info(f"successful reporting, code: {getattr(response, 'code', 'unknown')}")
261-
except Exception as e:
262-
if should_retry:
263-
get_logger().exception(f"Could not report to {host}. Retrying.", exc_info=e)
264-
edge_connection = establish_connection(host)
265-
report_json(region, msgs, should_retry=False)
266-
else:
267-
get_logger().exception("Could not report: A span was lost.", exc_info=e)
288+
try:
289+
start_time = time.time()
290+
edge_connection.request(
291+
"POST", EDGE_PATH, to_send, headers={"Content-Type": "application/json"}
292+
)
293+
response = edge_connection.getresponse()
294+
response.read() # We most read the response to keep the connection available
295+
duration = int((time.time() - start_time) * 1000)
296+
get_logger().info(f"successful reporting, code: {getattr(response, 'code', 'unknown')}")
297+
except Exception as e:
298+
if should_retry:
299+
get_logger().exception(f"Could not report to {host}. Retrying.", exc_info=e)
300+
edge_connection = establish_connection(host)
301+
report_json(region, msgs, should_retry=False)
302+
else:
303+
get_logger().exception("Could not report: A span was lost.", exc_info=e)
268304
return duration
269305

270306

307+
def _publish_spans_to_kinesis(to_send: bytes, region: str) -> int:
308+
start_time = time.time()
309+
with lumigo_safe_execute("report json: publish to kinesis"):
310+
get_logger().info("Sending spans to Kinesis")
311+
if not Configuration.edge_kinesis_aws_access_key_id:
312+
get_logger().error("Missing edge_kinesis_aws_access_key_id, can't publish the spans")
313+
return 0
314+
if not Configuration.edge_kinesis_aws_secret_access_key:
315+
get_logger().error(
316+
"Missing edge_kinesis_aws_secret_access_key, can't publish the spans"
317+
)
318+
return 0
319+
_send_data_to_kinesis(
320+
stream_name=Configuration.edge_kinesis_stream_name,
321+
to_send=to_send,
322+
region=region,
323+
aws_access_key_id=Configuration.edge_kinesis_aws_access_key_id,
324+
aws_secret_access_key=Configuration.edge_kinesis_aws_secret_access_key,
325+
)
326+
return int((time.time() - start_time) * 1000)
327+
328+
329+
def _send_data_to_kinesis(
330+
stream_name: str,
331+
to_send: bytes,
332+
region: str,
333+
aws_access_key_id: str,
334+
aws_secret_access_key: str,
335+
):
336+
if not boto3:
337+
get_logger().error("boto3 is missing. Unable to send to Kinesis.")
338+
return None
339+
client = boto3.client(
340+
"kinesis",
341+
region_name=region,
342+
aws_access_key_id=aws_access_key_id,
343+
aws_secret_access_key=aws_secret_access_key,
344+
)
345+
client.put_record(Data=to_send, StreamName=stream_name, PartitionKey=str(random.random()))
346+
get_logger().info("Successful sending to Kinesis")
347+
348+
271349
def get_logger(logger_name="lumigo"):
272350
"""
273351
This function returns lumigo's logger.

src/test/unit/test_lumigo_utils.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from mock import Mock
77

88
import pytest
9+
from lumigo_tracer import lumigo_utils
910
from lumigo_tracer.lumigo_utils import (
1011
_create_request_body,
1112
_is_span_has_error,
@@ -34,6 +35,7 @@
3435
is_error_code,
3536
get_size_upper_bound,
3637
is_aws_arn,
38+
CHINA_REGION,
3739
)
3840
import json
3941

@@ -387,6 +389,43 @@ def test_report_json_retry(monkeypatch, reporter_mock, caplog, errors, final_log
387389
assert caplog.records[-1].levelname == final_log
388390

389391

392+
def test_report_json_china_missing_access_key_id(monkeypatch, reporter_mock, caplog):
393+
monkeypatch.setattr(Configuration, "should_report", True)
394+
reporter_mock.side_effect = report_json
395+
assert report_json(CHINA_REGION, [{"a": "b"}]) == 0
396+
assert any(
397+
"edge_kinesis_aws_access_key_id" in record.message and record.levelname == "ERROR"
398+
for record in caplog.records
399+
)
400+
401+
402+
def test_report_json_china_missing_secret_access_key(monkeypatch, reporter_mock, caplog):
403+
monkeypatch.setattr(Configuration, "should_report", True)
404+
monkeypatch.setattr(Configuration, "edge_kinesis_aws_access_key_id", "my_value")
405+
reporter_mock.side_effect = report_json
406+
assert report_json(CHINA_REGION, [{"a": "b"}]) == 0
407+
assert any(
408+
"edge_kinesis_aws_secret_access_key" in record.message and record.levelname == "ERROR"
409+
for record in caplog.records
410+
)
411+
412+
413+
def test_report_json_china_no_boto(monkeypatch, reporter_mock, caplog):
414+
reporter_mock.side_effect = report_json
415+
monkeypatch.setattr(Configuration, "should_report", True)
416+
monkeypatch.setattr(Configuration, "edge_kinesis_aws_access_key_id", "my_value")
417+
monkeypatch.setattr(Configuration, "edge_kinesis_aws_secret_access_key", "my_value")
418+
monkeypatch.setattr(lumigo_utils, "boto3", None)
419+
420+
report_json(CHINA_REGION, [{"a": "b"}])
421+
422+
assert any(
423+
"boto3 is missing. Unable to send to Kinesis" in record.message
424+
and record.levelname == "ERROR" # noqa
425+
for record in caplog.records
426+
)
427+
428+
390429
@pytest.mark.parametrize("env, expected", [("True", True), ("other", False), ("123", False)])
391430
def test_is_kill_switch_on(monkeypatch, env, expected):
392431
monkeypatch.setenv(KILL_SWITCH, env)

src/test/unit/test_tracer.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,26 @@
66
import sys
77
from functools import wraps
88
import logging
9+
from unittest.mock import MagicMock
910

11+
import boto3
1012
import pytest
1113
from capturer import CaptureOutput
1214

1315
from lumigo_tracer import lumigo_tracer, LumigoChalice, add_execution_tag
16+
from lumigo_tracer import lumigo_utils
1417
from lumigo_tracer.lumigo_utils import (
1518
Configuration,
1619
STEP_FUNCTION_UID_KEY,
1720
LUMIGO_EVENT_KEY,
1821
_create_request_body,
1922
EXECUTION_TAGS_KEY,
23+
report_json,
24+
EDGE_KINESIS_STREAM_NAME,
2025
)
2126

2227
from lumigo_tracer.spans_container import SpansContainer
28+
from moto import mock_kinesis
2329

2430

2531
def test_lambda_wrapper_basic_events(reporter_mock, context):
@@ -364,3 +370,58 @@ def lambda_test_function(event, context):
364370
# following python's runtime: runtime/lambda_runtime_marshaller.py:27
365371
expected_message = 'The lambda will probably fail due to bad return value. Original message: "Object of type datetime is not JSON serializable"'
366372
assert function_span["error"]["message"] == expected_message
373+
374+
375+
@mock_kinesis
376+
def test_china(context, reporter_mock, monkeypatch):
377+
china_region_for_test = "ap-east-1" # Moto doesn't work for China
378+
monkeypatch.setattr(lumigo_utils, "CHINA_REGION", china_region_for_test)
379+
monkeypatch.setenv("AWS_REGION", china_region_for_test)
380+
reporter_mock.side_effect = report_json # Override the conftest's monkeypatch
381+
access_key_id = "my_access_key_id"
382+
secret_access_key = "my_secret_access_key"
383+
# Create edge Kinesis
384+
client = boto3.client(
385+
"kinesis",
386+
region_name=china_region_for_test,
387+
aws_access_key_id=access_key_id,
388+
aws_secret_access_key=secret_access_key,
389+
)
390+
client.create_stream(StreamName=EDGE_KINESIS_STREAM_NAME, ShardCount=1)
391+
shard_id = client.describe_stream(StreamName=EDGE_KINESIS_STREAM_NAME)["StreamDescription"][
392+
"Shards"
393+
][0]["ShardId"]
394+
shard_iterator = client.get_shard_iterator(
395+
StreamName=EDGE_KINESIS_STREAM_NAME,
396+
ShardId=shard_id,
397+
ShardIteratorType="AT_TIMESTAMP",
398+
Timestamp=datetime.datetime.utcnow(),
399+
)["ShardIterator"]
400+
401+
original_get_boto_client = boto3.client
402+
monkeypatch.setattr(boto3, "client", MagicMock(side_effect=original_get_boto_client))
403+
404+
@lumigo_tracer(
405+
edge_kinesis_aws_access_key_id=access_key_id,
406+
edge_kinesis_aws_secret_access_key=secret_access_key,
407+
should_report=True,
408+
)
409+
def lambda_test_function(event, context):
410+
return "ret_value"
411+
412+
event = {"k": "v"}
413+
result = lambda_test_function(event, context)
414+
415+
assert result == "ret_value"
416+
# Spans sent to Kinesis
417+
records = client.get_records(ShardIterator=shard_iterator)["Records"]
418+
assert len(records) == 2 # Start span and end span
419+
span_sent = json.loads(records[1]["Data"].decode())[0]
420+
assert span_sent["event"] == json.dumps(event)
421+
# Used the client from the decorator params
422+
boto3.client.assert_called_with(
423+
"kinesis",
424+
region_name=china_region_for_test,
425+
aws_access_key_id=access_key_id,
426+
aws_secret_access_key=secret_access_key,
427+
)

0 commit comments

Comments
 (0)