Skip to content

Commit 5401428

Browse files
authored
add support for Kinesis targets in EventBridge (localstack#3925)
1 parent 1330d5c commit 5401428

File tree

14 files changed

+312
-47
lines changed

14 files changed

+312
-47
lines changed

.dockerignore

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
.venv*
22
localstack/dashboard/web/node_modules/
3-
localstack/ext/java/target
43
localstack/infra/
54
!localstack/infra/stepfunctions
65
!localstack/infra/localstack-*.jar

.env-exporter

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
# ~/.env-exporter
2+
# about zlib
3+
export CFLAGS="-I$(xcrun --show-sdk-path)/usr/include"
4+
# about readline
5+
export CFLAGS="-I$(brew --prefix readline)/include $CFLAGS"
6+
export LDFLAGS="-L$(brew --prefix readline)/lib $LDFLAGS"
7+
# about openssl
8+
export CFLAGS="-I$(brew --prefix openssl)/include $CFLAGS"
9+
export LDFLAGS="-L$(brew --prefix openssl)/lib $LDFLAGS"
10+
# about SQLite (maybe not necessary)
11+
export CFLAGS="-I$(brew --prefix sqlite)/include $CFLAGS"
12+
export LDFLAGS="-L$(brew --prefix sqlite)/lib $LDFLAGS"

.gitignore

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
/infra/
88
localstack/infra/
9-
localstack/ext/java/.factorypath
109

1110
/node_modules/
1211
package-lock.json

Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ ADD localstack/utils/common.py localstack/utils/bootstrap.py localstack/utils/
3434
ADD localstack/utils/aws/ localstack/utils/aws/
3535
ADD localstack/utils/kinesis/ localstack/utils/kinesis/
3636
ADD localstack/utils/analytics/ localstack/utils/analytics/
37+
ADD localstack/utils/generic/ localstack/utils/generic/
3738
ADD localstack/package.json localstack/package.json
3839
ADD localstack/services/__init__.py localstack/services/install.py localstack/services/
3940

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ In addition to the above, the [**Pro version** of LocalStack](https://localstack
9494

9595
## Requirements
9696

97-
* `python` (both Python 2.x and 3.x supported)
97+
* `python` (Python 2.x up to 3.8 supported)
9898
* `pip` (python package manager)
9999
* `Docker`
100100

bin/mvn_release.sh

Lines changed: 0 additions & 34 deletions
This file was deleted.

localstack/services/events/events_starter.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,18 +66,19 @@ def filter_event(event_pattern, event):
6666
return True
6767

6868
rule_information = self.events_backend.describe_rule(rule)
69-
if rule_information.event_pattern:
70-
event_pattern = rule_information.event_pattern._filter
71-
if event_pattern and not filter_event(event_pattern, event):
72-
return False
69+
rule_event_pattern = json.loads(str(rule_information.event_pattern))
70+
71+
if rule_event_pattern and not filter_event(rule_event_pattern, event):
72+
return False
73+
7374
return True
7475

7576

7677
def process_events(event, targets):
7778
for target in targets:
7879
arn = target['Arn']
7980
changed_event = filter_event_with_target_input_path(target, event)
80-
aws_stack.send_event_to_target(arn, changed_event)
81+
aws_stack.send_event_to_target(arn, changed_event, aws_stack.get_events_target_attributes(target))
8182

8283

8384
def apply_patches():

localstack/utils/aws/aws_stack.py

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
APPLICATION_AMZ_JSON_1_0, APPLICATION_X_WWW_FORM_URLENCODED, TEST_AWS_ACCOUNT_ID,
1313
MAX_POOL_CONNECTIONS, TEST_AWS_ACCESS_KEY_ID, TEST_AWS_SECRET_ACCESS_KEY, S3_VIRTUAL_HOSTNAME)
1414
from localstack.utils.aws import templating
15+
from localstack.utils.generic import dict_utils
1516
from localstack.utils.common import (
1617
run_safe, to_str, is_string, is_string_or_bytes, make_http_request,
1718
is_port_open, get_service_protocol, retry, to_bytes)
@@ -47,6 +48,9 @@
4748
# maps SQS queue ARNs to queue URLs
4849
SQS_ARN_TO_URL_CACHE = {}
4950

51+
# List of parameters with additional event target parameters
52+
EVENT_TARGET_PARAMETERS = ['$.SqsParameters', '$.KinesisParameters']
53+
5054

5155
class Environment(object):
5256
def __init__(self, region=None, prefix=None):
@@ -573,7 +577,7 @@ def send_event_to_target(arn, event, target_attributes=None, asynchronous=True):
573577
elif ':sqs:' in arn:
574578
sqs_client = connect_to_service('sqs', region_name=region)
575579
queue_url = get_sqs_queue_url(arn)
576-
msg_group_id = (target_attributes or {}).get('MessageGroupId')
580+
msg_group_id = dict_utils.get_safe(target_attributes, '$.SqsParameters.MessageGroupId')
577581
kwargs = {'MessageGroupId': msg_group_id} if msg_group_id else {}
578582
sqs_client.send_message(QueueUrl=queue_url, MessageBody=json.dumps(event), **kwargs)
579583

@@ -600,13 +604,29 @@ def send_event_to_target(arn, event, target_attributes=None, asynchronous=True):
600604
}]
601605
)
602606

607+
elif ':kinesis:' in arn:
608+
partition_key_path = dict_utils.get_safe(
609+
target_attributes,
610+
'$.KinesisParameters.PartitionKeyPath',
611+
default_value='$.id'
612+
)
613+
614+
stream_name = arn.split('/')[-1]
615+
partition_key = dict_utils.get_safe(event, partition_key_path, event['id'])
616+
kinesis_client = connect_to_service('kinesis', region_name=region)
617+
618+
kinesis_client.put_record(
619+
StreamName=stream_name,
620+
Data=to_bytes(json.dumps(event)),
621+
PartitionKey=partition_key
622+
)
623+
603624
else:
604625
LOG.warning('Unsupported Events rule target ARN: "%s"' % arn)
605626

606627

607628
def get_events_target_attributes(target):
608-
# TODO: add support for other target types
609-
return target.get('SqsParameters')
629+
return dict_utils.pick_attributes(target, EVENT_TARGET_PARAMETERS)
610630

611631

612632
def get_or_create_bucket(bucket_name):

localstack/utils/generic/__init__.py

Whitespace-only changes.
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
def get_safe(dictionary, path, default_value=None):
2+
"""
3+
Performs a safe navigation on a Dictionary object and
4+
returns the result or default value (if specified).
5+
The function follows a common AWS path resolution pattern "$.a.b.c".
6+
7+
:type dictionary: dict
8+
:param dictionary: Dict to perform safe navigation.
9+
10+
:type path: list|str
11+
:param path: List or dot-separated string containing the path of an attribute,
12+
starting from the root node "$".
13+
14+
:type default_value: any
15+
:param default_value: Default value to return in case resolved value is None.
16+
17+
:rtype: any
18+
:return: Resolved value or default_value.
19+
"""
20+
if not isinstance(dictionary, dict) or len(dictionary) == 0:
21+
return default_value
22+
23+
attribute_path = path if isinstance(path, list) else path.split('.')
24+
if len(attribute_path) == 0 or attribute_path[0] != '$':
25+
raise AttributeError('Safe navigation must begin with a root node "$"')
26+
27+
current_value = dictionary
28+
for path_node in attribute_path:
29+
if path_node == '$':
30+
continue
31+
elif isinstance(current_value, dict) and path_node in current_value:
32+
current_value = current_value[path_node]
33+
else:
34+
current_value = None
35+
36+
return current_value or default_value
37+
38+
39+
def set_safe_mutable(dictionary, path, value):
40+
"""
41+
Mutates original dict and sets the specified value under provided path.
42+
43+
:type dictionary: dict
44+
:param dictionary: Dict to mutate.
45+
46+
:type path: list|str
47+
:param path: List or dot-separated string containing the path of an attribute,
48+
starting from the root node "$".
49+
50+
:type value: any
51+
:param value: Value to set under specified path.
52+
53+
:rtype: dict
54+
:return: Returns mutated dictionary.
55+
"""
56+
if not isinstance(dictionary, dict):
57+
raise AttributeError('"dictionary" must be of type "dict"')
58+
59+
attribute_path = path if isinstance(path, list) else path.split('.')
60+
attribute_path_len = len(attribute_path)
61+
62+
if attribute_path_len == 0 or attribute_path[0] != '$':
63+
raise AttributeError('Dict navigation must begin with a root node "$"')
64+
65+
current_pointer = dictionary
66+
for i in range(attribute_path_len):
67+
path_node = attribute_path[i]
68+
69+
if path_node == '$':
70+
continue
71+
72+
if i < attribute_path_len - 1:
73+
if path_node not in current_pointer:
74+
current_pointer[path_node] = {}
75+
if not isinstance(current_pointer, dict):
76+
raise RuntimeError('Error while deeply setting a dict value. Supplied path is not of type "dict"')
77+
else:
78+
current_pointer[path_node] = value
79+
80+
current_pointer = current_pointer[path_node]
81+
82+
return dictionary
83+
84+
85+
def pick_attributes(dictionary, paths):
86+
"""
87+
Picks selected attributes a returns them as a new dictionary.
88+
This function works as a whitelist of attributes to keep in a new dictionary.
89+
90+
:type dictionary: dict
91+
:param dictionary: Dict to pick attributes from.
92+
93+
:type paths: list of (list or str)
94+
:param paths: List of lists or strings with dot-separated paths, starting from the root node "$".
95+
96+
:rtype: dict
97+
:return: Returns whitelisted dictionary.
98+
"""
99+
new_dictionary = {}
100+
101+
for path in paths:
102+
value = get_safe(dictionary, path)
103+
104+
if value is not None:
105+
set_safe_mutable(new_dictionary, path, value)
106+
107+
return new_dictionary

0 commit comments

Comments
 (0)