# synthetic_individual_details_ddb.py
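
# Third-party dependencies (assumed to be installed separately, e.g.):
#   pip install boto3 Faker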
import argparse
import logging
import os
import time
import uuid

import boto3
from boto3.dynamodb.types import TypeSerializer
from botocore.exceptions import ClientError
from faker import Faker

DEFAULT_LOG_LEVEL = logging.INFO
LOGGER = logging.getLogger(__name__)
LOGGING_FORMAT = "%(asctime)s %(levelname)-5.5s " \
                 "[%(name)s]:[%(threadName)s] " \
                 "%(message)s"

AWS_ACCESS_KEY_ID = "AWS_ACCESS_KEY_ID"
AWS_SECRET_ACCESS_KEY = "AWS_SECRET_ACCESS_KEY"
AWS_SESSION_TOKEN = "AWS_SESSION_TOKEN"

TABLE_NAME = "IndividualDetails"
MAX_RECORDS = 1000


def _check_missing_field(validation_dict, extraction_key):
    """Check if a field exists in a dictionary

    :param validation_dict: Dictionary
    :param extraction_key: String

    :raises: Exception
    """
    extracted_value = validation_dict.get(extraction_key)
    if extracted_value is None:
        LOGGER.error(f"Missing '{extraction_key}' key in the dict")
        raise Exception(f"Missing '{extraction_key}' key in the dict")


def _validate_field(validation_dict, extraction_key, expected_value):
    """Validate the passed in field

    :param validation_dict: Dictionary
    :param extraction_key: String
    :param expected_value: String

    :raises: ValueError
    """
    _check_missing_field(validation_dict, extraction_key)
    extracted_value = validation_dict.get(extraction_key)
    if extracted_value != expected_value:
        LOGGER.error(f"Incorrect value found for '{extraction_key}' key")
        raise ValueError(f"Incorrect value found for '{extraction_key}' key")
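

# These two helpers are used below to sanity-check boto3 responses, e.g.:
#   _check_missing_field(resp, "ResponseMetadata")
#   _validate_field(resp["ResponseMetadata"], "HTTPStatusCode", 200)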


def _cli_args():
    """Parse CLI Args

    :rtype: argparse.Namespace
    """
    parser = argparse.ArgumentParser(description="synthetic-data-gen-ddb")
    parser.add_argument("-e",
                        "--env",
                        action="store_true",
                        help="Use environment variables for AWS credentials")
    parser.add_argument("-p",
                        "--aws-profile",
                        type=str,
                        default="default",
                        help="AWS profile to be used for the API calls")
    parser.add_argument("-r",
                        "--aws-region",
                        type=str,
                        help="AWS region for API calls")
    parser.add_argument("-m",
                        "--max-records",
                        type=int,
                        default=MAX_RECORDS,
                        help="Maximum records to be inserted")
    parser.add_argument("-v",
                        "--verbose",
                        action="store_true",
                        help="debug log output")
    return parser.parse_args()
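

# Example invocations (profile/region values are illustrative, not taken
# from the repository):
#   python synthetic_individual_details_ddb.py -p my-profile -r us-east-1 -m 100
#   python synthetic_individual_details_ddb.py --env --aws-region us-east-1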


def _silence_noisy_loggers():
    """Silence chatty libraries for better logging"""
    for logger in ['boto3', 'botocore',
                   'botocore.vendored.requests.packages.urllib3']:
        logging.getLogger(logger).setLevel(logging.WARNING)


def main():
    """What executes when the script is run"""
    start = time.time()  # to capture elapsed time
    args = _cli_args()

    # logging configuration
    log_level = DEFAULT_LOG_LEVEL
    if args.verbose:
        log_level = logging.DEBUG
    logging.basicConfig(level=log_level, format=LOGGING_FORMAT)
    # silence chatty libraries
    _silence_noisy_loggers()

    if args.env:
        LOGGER.info(
            "Attempting to fetch AWS credentials via environment variables")
        aws_access_key_id = os.environ.get(AWS_ACCESS_KEY_ID)
        aws_secret_access_key = os.environ.get(AWS_SECRET_ACCESS_KEY)
        aws_session_token = os.environ.get(AWS_SESSION_TOKEN)
        # boto3 reads these from the environment on its own; they are only
        # fetched here to fail fast if any of them is missing
        if not aws_access_key_id or not aws_secret_access_key \
                or not aws_session_token:
            raise Exception(
                f"Missing one or more environment variables - "
                f"'{AWS_ACCESS_KEY_ID}', '{AWS_SECRET_ACCESS_KEY}', "
                f"'{AWS_SESSION_TOKEN}'"
            )
    else:
        LOGGER.info(f"AWS Profile being used: {args.aws_profile}")
        boto3.setup_default_session(profile_name=args.aws_profile)

    # Resolve the AWS region, in order of precedence
    region_check = [
        # explicit cli argument
        args.aws_region,
        # check if set through ENV vars
        os.environ.get('AWS_REGION'),
        os.environ.get('AWS_DEFAULT_REGION'),
        # else check if set in config or in boto already
        boto3.DEFAULT_SESSION.region_name if boto3.DEFAULT_SESSION else None,
        boto3.Session().region_name,
    ]
    aws_region = None
    for aws_region in region_check:
        if aws_region:
            LOGGER.info(f"AWS Region: {aws_region}")
            break
    if not aws_region:
        raise Exception("Need to have a valid AWS Region")

    ddb_client = boto3.client("dynamodb", region_name=aws_region)

    table_name = os.environ.get(
        "INDIVIDUAL_TABLE_NAME", TABLE_NAME)

    # Check if table exists
    # NOTE: list_tables returns at most 100 table names per call; accounts
    # with more tables would need to paginate via LastEvaluatedTableName
    list_table_resp = ddb_client.list_tables()
    _check_missing_field(list_table_resp, "ResponseMetadata")
    _validate_field(
        list_table_resp["ResponseMetadata"],
        "HTTPStatusCode",
        200)
    _check_missing_field(list_table_resp, "TableNames")

    tables = list_table_resp["TableNames"]
    if table_name not in tables:
        raise Exception(f"Table {table_name} does not exist")

    serializer = TypeSerializer()
    fake = Faker()

    # Generate and insert data into the table
    for _ in range(args.max_records):
        python_obj = {
            'id': str(uuid.uuid4()),
            'Individual_Gender': fake.random_element(elements=('Male', 'Female')),
            'Individual_Location': fake.city(),
        }
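        # For reference, TypeSerializer maps native Python values into the
        # DynamoDB wire format, so the dict above becomes roughly:
        #   {'id': {'S': '<uuid>'}, 'Individual_Gender': {'S': 'Male'},
        #    'Individual_Location': {'S': 'Springfield'}}
        # (example values are illustrative)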
        try:
            resp = ddb_client.put_item(
                TableName=table_name,
                Item={
                    k: serializer.serialize(v) for k, v in python_obj.items()
                },
                # only insert if an item with this id does not already exist
                ConditionExpression="attribute_not_exists(id)",
            )
            _check_missing_field(resp, "ResponseMetadata")
            _validate_field(resp["ResponseMetadata"], "HTTPStatusCode", 200)
        except ClientError as e:
            if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
                LOGGER.error("Primary Key violation")
                # the item was not written, so skip the success log below
                continue
            else:
                LOGGER.error("Unable to insert data into DynamoDB")
                raise Exception("Unable to insert data into DynamoDB") from e
        LOGGER.info(f"Inserted item with id: {python_obj['id']}.")

    LOGGER.info(f"Data insertion complete for table: {table_name}.")
    ddb_client.close()
    LOGGER.info(f"Total time elapsed: {time.time() - start} seconds")


if __name__ == "__main__":
    main()
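

# A quick way to sanity-check the generated data afterwards (a sketch, assuming
# the same credentials/region and a table small enough to scan in one call):
#
#   import boto3
#   ddb = boto3.client("dynamodb", region_name="us-east-1")  # region is illustrative
#   resp = ddb.scan(TableName="IndividualDetails", Select="COUNT")
#   print(resp["Count"])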