Skip to content

Commit d2edf68

Browse files
committed
add lambda code
1 parent bdbf8a8 commit d2edf68

File tree

17 files changed

+4673
-0
lines changed

17 files changed

+4673
-0
lines changed

lambda/catalog-creator-ddb/app.py

+234
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
import logging
2+
import os
3+
4+
import boto3
5+
from boto3.dynamodb.types import TypeDeserializer
6+
7+
8+
# Root logger shared by every function in this module; its level is set by
# _configure_logger().
LOGGER = logging.getLogger()

# PartiQL statement selecting the DynamoDB data sources that have neither a
# Glue job nor a data catalog entry yet. The table name placeholder is wrapped
# in double quotes: DynamoDB PartiQL requires quoting for table names that
# contain characters such as "-" or ".", and quoting is harmless otherwise.
DDB_PARTIQL = (
    'SELECT * FROM "{}" '
    "WHERE glue_job_created = False "
    "AND data_catalog_entry = False "
    "AND data_source_type = 'dynamodb'"
)

# Names of the environment variables read by lambda_handler.
GLUE_TRACKER_DDB_TABLE_NAME_ENV_VAR = "DDB_GLUE_TRACKER_TABLE_NAME"
DATA_CATALOG_DB_NAME_ENV_VAR = "DATA_CATALOG_DB_NAME"

GLUE_ROLE_ARN_ENV_VAR = "GLUE_ROLE_ARN"
16+
17+
18+
class MalformedEvent(Exception):
    """Signals that a required field is absent from an event or response."""
20+
21+
22+
class MissingEnvironmentVariable(Exception):
    """Signals that an environment variable the handler needs is not set."""
24+
25+
26+
def _silence_noisy_loggers():
    """Raise the level of chatty library loggers to WARNING."""
    noisy_names = (
        'boto3',
        'botocore',
        'botocore.vendored.requests.packages.urllib3',
    )
    for name in noisy_names:
        chatty = logging.getLogger(name)
        chatty.setLevel(logging.WARNING)
31+
32+
33+
def _configure_logger():
    """Set the root logger level from the VERBOSE environment variable.

    The level is DEBUG when VERBOSE equals "true" (case-insensitive) and
    INFO otherwise.
    """
    wants_debug = os.environ.get("VERBOSE", "").lower() == "true"
    if wants_debug:
        print("Will set the logging output to DEBUG")
    level = logging.DEBUG if wants_debug else logging.INFO

    root = logging.getLogger()
    if root.handlers:
        # The Lambda environment pre-configures a handler logging to stderr,
        # and `.basicConfig` is a no-op once a handler exists, so the level
        # has to be applied to the root logger directly.
        root.setLevel(level)
        return

    logging.basicConfig(level=level)
48+
49+
50+
def _check_missing_field(validation_dict, extraction_key):
    """Check if a field exists in a dictionary

    A field counts as missing when the key is absent or maps to ``None``.
    Present-but-falsy values such as ``0`` or ``False`` are accepted, so a
    legitimate value is never reported as missing.

    :param validation_dict: Dictionary
    :param extraction_key: String

    :raises: MalformedEvent
    """
    if validation_dict.get(extraction_key) is not None:
        return

    message = f"Missing '{extraction_key}' field in the event"
    LOGGER.error(message)
    # Carry the message on the exception so it is visible in tracebacks too
    raise MalformedEvent(message)
63+
64+
65+
def _validate_field(validation_dict, extraction_key, expected_value):
    """Confirm that a dictionary field is present and holds the expected value

    :param validation_dict: Dictionary
    :param extraction_key: String
    :param expected_value: String

    :raises: MalformedEvent (via _check_missing_field), ValueError
    """
    # Presence check comes first; it raises if the field is missing
    _check_missing_field(validation_dict, extraction_key)

    actual_value = validation_dict.get(extraction_key)
    if actual_value != expected_value:
        LOGGER.error(f"Incorrect value found for '{extraction_key}' field")
        raise ValueError
81+
82+
83+
def _fetch_ddb_results(client, query):
    """Fetch all results from a PartiQL DynamoDB query

    ``execute_statement`` returns results one page at a time. This follows
    ``NextToken`` until no further page is reported and merges the ``Items``
    of every page, so results beyond the first page are not dropped. Each
    page response is validated before its items are used.

    :param client: Boto3 client (DynamoDB)
    :param query: String

    :raises: MalformedEvent, ValueError

    :rtype: Dictionary (the final page's response with "Items" replaced by
        the items collected from all pages)
    """
    request = {"Statement": query}
    collected_items = []

    while True:
        page = client.execute_statement(**request)

        _check_missing_field(page, "ResponseMetadata")
        _validate_field(page["ResponseMetadata"], "HTTPStatusCode", 200)

        collected_items.extend(page.get("Items") or [])

        next_token = page.get("NextToken")
        if not next_token:
            break
        request["NextToken"] = next_token

    # Copy so the client's response object is not mutated
    result = dict(page)
    result["Items"] = collected_items
    return result
98+
99+
100+
def unmarshall_ddb_items(ddb_items):
    """Convert DynamoDB-typed items into plain Python dictionaries

    Every attribute value of every item is passed through boto3's
    ``TypeDeserializer``; attribute names are kept as they are.

    :param ddb_items: List

    :rtype: List
    """
    to_python = TypeDeserializer().deserialize
    return [
        {attr: to_python(raw) for attr, raw in item.items()}
        for item in ddb_items
    ]
117+
118+
119+
def update_ddb(client, table_name, obj, dc_table_name, dc_db_name):
    """Record the data catalog details on an item of the tracker table

    Sets ``data_catalog_entry`` to true and stores the catalog table and
    database names on the item whose ``id`` equals ``obj["id"]``.

    :param client: Boto3 Client Object
    :param table_name: String
    :param obj: Dictionary
    :param dc_table_name: String
    :param dc_db_name: String

    :raises: MalformedEvent, ValueError
    """
    item_key = {"id": {"S": obj["id"]}}
    update_expression = "SET #data_catalog_entry = :true, #data_catalog_table_name =:t, #data_catalog_db_name =:d"
    attribute_names = {
        "#data_catalog_entry": "data_catalog_entry",
        "#data_catalog_table_name": "data_catalog_table_name",
        "#data_catalog_db_name": "data_catalog_db_name",
    }
    attribute_values = {
        ":true": {"BOOL": True},
        ":t": {"S": dc_table_name},
        ":d": {"S": dc_db_name},
    }

    resp = client.update_item(
        TableName=table_name,
        Key=item_key,
        UpdateExpression=update_expression,
        ExpressionAttributeNames=attribute_names,
        ExpressionAttributeValues=attribute_values,
    )

    # Anything other than an HTTP 200 response raises
    _check_missing_field(resp, "ResponseMetadata")
    _validate_field(resp["ResponseMetadata"], "HTTPStatusCode", 200)
    LOGGER.info("Successfully updated DynamoDB item")
146+
147+
148+
def create_crawler(client, data_catalog_db_name, table_name, role_arn):
    """Create crawler for the DynamoDB source

    The crawler is named ``<table_name>_crawler``, targets the DynamoDB table
    ``table_name``, writes to ``data_catalog_db_name`` and is created with
    the schedule ``cron(0 2 * * ? *)``.

    :param client: Boto3 client obj
    :param data_catalog_db_name: String
    :param table_name: String
    :param role_arn: String

    :raises: MalformedEvent, ValueError
    """
    crawler_name = f"{table_name}_crawler"
    # Log the crawler's own name (not the table name) so the message matches
    # the resource that is actually created.
    LOGGER.info(f"Attempting to create crawler: {crawler_name}")
    resp = client.create_crawler(
        Name=crawler_name,
        Role=role_arn,
        DatabaseName=data_catalog_db_name,
        Targets={
            'DynamoDBTargets': [
                {
                    'Path': table_name,
                },
            ],
        },
        Schedule='cron(0 2 * * ? *)',
    )
    _check_missing_field(resp, "ResponseMetadata")
    _validate_field(resp["ResponseMetadata"], "HTTPStatusCode", 200)
173+
174+
175+
def lambda_handler(event, context):
    """Create a Glue crawler for every untracked DynamoDB data source

    Queries the tracker table for DynamoDB data sources without a Glue job or
    data catalog entry, creates one crawler per source, then marks the source
    as catalogued in the tracker table. The boto3 clients are closed on every
    exit path, including the early return and any raised exception.

    :param event: Lambda event (not used)
    :param context: Lambda context (not used)

    :raises: MissingEnvironmentVariable, MalformedEvent, ValueError
    """

    # configure python logger
    _configure_logger()
    # silence chatty libraries
    _silence_noisy_loggers()

    ddb_table_name = os.environ.get(GLUE_TRACKER_DDB_TABLE_NAME_ENV_VAR)
    if not ddb_table_name:
        raise MissingEnvironmentVariable(f"{GLUE_TRACKER_DDB_TABLE_NAME_ENV_VAR} is missing")

    ddb_client = boto3.client("dynamodb")
    # Created only once there is work to do; stays None until then
    glue_client = None

    try:
        ddb_resp = _fetch_ddb_results(
            ddb_client,
            DDB_PARTIQL.format(ddb_table_name),
        )
        ddb_results = ddb_resp.get("Items")
        if not ddb_results:
            LOGGER.warning("No data sources returned. Exiting.")
            return

        pythonic_results = unmarshall_ddb_items(ddb_results)

        role_arn = os.environ.get(GLUE_ROLE_ARN_ENV_VAR)
        if not role_arn:
            raise MissingEnvironmentVariable(f"{GLUE_ROLE_ARN_ENV_VAR} is missing")

        data_catalog_db_name = os.environ.get(DATA_CATALOG_DB_NAME_ENV_VAR)
        if not data_catalog_db_name:
            raise MissingEnvironmentVariable(f"{DATA_CATALOG_DB_NAME_ENV_VAR} is missing")

        glue_client = boto3.client("glue")

        for ddb_obj in pythonic_results:
            tb_name = ddb_obj["data_source_attrs"]["tableDescription"]["tableName"]

            create_crawler(
                client=glue_client,
                data_catalog_db_name=data_catalog_db_name,
                table_name=tb_name,
                role_arn=role_arn
            )
            LOGGER.info("Successfully created crawler")

            LOGGER.info("Attempting to update glue job tracker table")
            update_ddb(
                client=ddb_client,
                obj=ddb_obj,
                table_name=ddb_table_name,
                dc_table_name=tb_name.lower(),
                dc_db_name=data_catalog_db_name
            )
    finally:
        # Release client resources however the handler exits
        LOGGER.debug("Closing DynamoDB Boto3 client")
        ddb_client.close()

        if glue_client is not None:
            LOGGER.debug("Closing Glue boto3 client")
            glue_client.close()

0 commit comments

Comments
 (0)