Singer taps using state and data lake target (#75)
* Addl TZ codes: EST, UTC
* Singer-Taps: default to data lake target, generic config templates
* ECS: permitted s3 buckets
* Bump minor version
aaronsteers authored Mar 15, 2020
1 parent 1ac36cd commit f759028
Showing 11 changed files with 182 additions and 55 deletions.
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
- 0.9.0
+ 0.9.1
5 changes: 4 additions & 1 deletion catalog/aws/dbt/main.tf
@@ -12,7 +12,10 @@ locals {
admin_cidr = var.admin_cidr
admin_ports = ["8080", "10000"]
tz_hour_offset = (
contains(["PST", "Pacific"], var.scheduled_timezone) ? -8 : 0
contains(["PST"], var.scheduled_timezone) ? -8 :
contains(["EST"], var.scheduled_timezone) ? -5 :
contains(["UTC", "GMT"], var.scheduled_timezone) ? 0 :
1 / 0 # ERROR: currently supported timezone code are: "UTC", "GMT", "EST", and "PST"
)
}

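Note on the pattern above: the chained conditionals act as a lookup table with built-in validation. Terraform 0.12 has no native assert function, so the unreachable `1 / 0` branch forces a plan-time error whenever an unsupported code is passed. A minimal standalone sketch (hypothetical values, not part of this commit):

    locals {
      # With a timezone code of "EST", evaluation falls through the first
      # test and stops at the second, yielding -5. An unsupported code such
      # as "CET" would reach the final branch, where the division by zero
      # aborts the plan with an error pointing at this expression.
      example_offset = (
        contains(["PST"], "EST") ? -8 :
        contains(["EST"], "EST") ? -5 :
        contains(["UTC", "GMT"], "EST") ? 0 :
        1 / 0
      )
    }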
48 changes: 40 additions & 8 deletions catalog/aws/singer-taps/main.tf
@@ -7,15 +7,18 @@ data "aws_availability_zones" "az_list" {}

locals {
tz_hour_offset = (
contains(["PST", "Pacific"], var.scheduled_timezone) ? -8 : 0
contains(["PST"], var.scheduled_timezone) ? -8 :
contains(["EST"], var.scheduled_timezone) ? -5 :
contains(["UTC", "GMT"], var.scheduled_timezone) ? 0 :
1 / 0 # ERROR: currently supported timezone code are: "UTC", "GMT", "EST", and "PST"
)
name_prefix = "${var.name_prefix}Tap-"
container_image = coalesce(
var.container_image, "slalomggp/singer:${var.taps[0].id}-to-${var.target.id}"
var.container_image, "slalomggp/singer:${var.taps[0].id}-to-${local.target.id}"
)
sync_commands = [
for tap in var.taps :
"s-tap sync ${tap.id} ${var.target.id}"
"s-tap sync ${tap.id} ${local.target.id}"
]
container_command = (
length(local.sync_commands) == 1 ? local.sync_commands[0] :
@@ -25,6 +28,34 @@ locals {
EOF
))
)
+ target = (
+   var.data_lake_type == "S3" ?
+   {
+     id = "s3-csv"
+     settings = {
+       # https://gist.github.com/aaronsteers/19eb4d6cba926327f8b25089cb79259b
+       # Parse the S3 path into 'bucket' and 'key' values:
+       s3_bucket = split("/", split("//", var.data_lake_storage_path)[1])[0]
+       s3_key_prefix = join("/",
+         [
+           join("/", slice(
+             split("/", split("//", var.data_lake_storage_path)[1]),
+             1,
+             length(split("/", split("//", var.data_lake_storage_path)[1]))
+           )),
+           replace(var.data_file_naming_scheme, "{file}", "")
+         ]
+       )
+     }
+     secrets = {
+       # AWS credentials are parsed from local env variables, provided by the ECS Task Role
+       # aws_access_key_id     = "../.secrets/aws-secrets-manager-secrets.yml:S3_CSV_aws_access_key_id"
+       # aws_secret_access_key = "../.secrets/aws-secrets-manager-secrets.yml:S3_CSV_aws_secret_access_key"
+     }
+   } :
+   var.target
+ )
+
}

module "ecs_cluster" {
@@ -49,15 +80,16 @@ module "ecs_tap_sync_task" {
use_fargate = true
environment_vars = merge(
{
"TAP_CONFIG_DIR" : "s3://${var.source_code_s3_bucket}/${var.source_code_s3_path}/tap-snapshot-${local.unique_hash}"
"TAP_CONFIG_DIR" : "${var.data_lake_metadata_path}/tap-snapshot-${local.unique_hash}",
"TAP_STATE_FILE" : "${var.data_lake_storage_path}/${var.state_file_naming_scheme}",
},
{
for k, v in var.taps[0].settings :
"TAP_${upper(replace(var.taps[0].id, "-", "_"))}_${k}" => v
},
{
- for k, v in var.target.settings :
- "TARGET_${upper(replace(var.target.id, "-", "_"))}_${k}" => v
+ for k, v in local.target.settings :
+ "TARGET_${upper(replace(local.target.id, "-", "_"))}_${k}" => v
}
)
environment_secrets = merge(
@@ -66,8 +98,8 @@ module "ecs_tap_sync_task" {
"TAP_${upper(replace(var.taps[0].id, "-", "_"))}_${k}" => v
},
{
- for k, v in var.target.secrets :
- "TARGET_${upper(replace(var.target.id, "-", "_"))}_${k}" => v
+ for k, v in local.target.secrets :
+ "TARGET_${upper(replace(local.target.id, "-", "_"))}_${k}" => v
}
)
schedules = [
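For illustration: the new `local.target` falls back to `var.target` unless `data_lake_type == "S3"`, and the `merge()` of for-expressions renders environment variable names by upper-casing the plugin id and swapping `-` for `_`. A sketch under assumed inputs (tap id `pardot`, target id `s3-csv`; names and values are hypothetical, not part of this commit):

    locals {
      example_tap = { id = "pardot", settings = { start_date = "2020-02-28T00:00:00Z" } }
      # Same naming rule as environment_vars above:
      example_env = {
        for k, v in local.example_tap.settings :
        "TAP_${upper(replace(local.example_tap.id, "-", "_"))}_${k}" => v
      }
      # => { "TAP_PARDOT_start_date" = "2020-02-28T00:00:00Z" }
      # Target settings map the same way, e.g. TARGET_S3_CSV_s3_bucket.
    }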
23 changes: 17 additions & 6 deletions catalog/aws/singer-taps/s3-upload.tf
@@ -1,16 +1,27 @@
locals {
- source_files = fileset(var.source_code_folder, "*")
+ source_files = fileset(var.local_metadata_path, "*")
source_files_hash = join(",", [
for filepath in local.source_files :
filebase64sha256("${var.source_code_folder}/${filepath}")
filebase64sha256("${var.local_metadata_path}/${filepath}")
])
unique_hash = md5(local.source_files_hash)
}

resource "aws_s3_bucket_object" "s3_source_uploads" {
for_each = local.source_files
- bucket = var.source_code_s3_bucket
- key    = "${var.source_code_s3_path}/tap-snapshot-${local.unique_hash}/${each.value}"
- source = "${var.source_code_folder}/${each.value}"
- # etag = filebase64sha256("${var.source_code_folder}/${each.value}")
+ # https://gist.github.com/aaronsteers/19eb4d6cba926327f8b25089cb79259b
+ # Parse the S3 path into 'bucket' and 'key' values:
+ bucket = split("/", split("//", var.data_lake_metadata_path)[1])[0]
+ key = join("/",
+   [
+     join("/", slice(
+       split("/", split("//", var.data_lake_metadata_path)[1]),
+       1,
+       length(split("/", split("//", var.data_lake_metadata_path)[1]))
+     )),
+     "tap-snapshot-${local.unique_hash}/${each.value}"
+   ]
+ )
+ source = "${var.local_metadata_path}/${each.value}"
+ # etag = filebase64sha256("${var.local_metadata_path}/${each.value}")
}
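The bucket/key parsing above (and in main.tf) follows the pattern from the linked gist. A sketch of how the expressions decompose a hypothetical path (example names only):

    locals {
      example_path = "s3://my-metadata-bucket/meta/taps" # hypothetical input

      # split("//", ...)[1] -> "my-metadata-bucket/meta/taps"
      # split("/", ...)[0]  -> "my-metadata-bucket" (the bucket name)
      example_bucket = split("/", split("//", local.example_path)[1])[0]

      # slice(..., 1, length(...)) drops the bucket element -> ["meta", "taps"],
      # and the re-join yields "meta/taps" (the key prefix).
      example_key_prefix = join("/", slice(
        split("/", split("//", local.example_path)[1]),
        1,
        length(split("/", split("//", local.example_path)[1]))
      ))
    }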
55 changes: 42 additions & 13 deletions catalog/aws/singer-taps/variables.tf
@@ -24,11 +24,38 @@ variable "resource_tags" {
### Custom variables for this module ###
########################################

variable "source_code_folder" { type = string }
variable "source_code_s3_bucket" { type = string }
variable "source_code_s3_path" {
variable "local_metadata_path" {
description = "The local folder which countains tap definitions files: `data.select` and `plan-*.yml`"
type = string
}
variable "data_lake_type" {
description = "Specify `S3` if loading to an S3 data lake, otherwise leave blank."
default = null
}
variable "data_lake_metadata_path" {
description = <<EOF
The remote folder for storing tap definitions files.
Currently only S3 paths (s3://...) are supported.
EOF
type = string
}
variable "data_lake_storage_path" {
description = <<EOF
The path to where files should be stored in the data lake.
Note:
- currently only S3 paths (S3://...) are supported.data
- You must specify `target` or `data_lake_storage_path` but not both.
EOF
type = string
default = null
}
variable "data_file_naming_scheme" {
type = string
default = "{tap}/{table}/v{version}/{file}"
}
variable "state_file_naming_scheme" {
type = string
default = "code/taps"
default = "{tap}/{table}/state/{tap}-{table}-v{version}-state.json"
}
variable "taps" {
type = list(object({
@@ -38,26 +65,28 @@ variable "taps" {
}))
}
variable "target" {
+ description = <<EOF
+ The definition of which target to load data into.
+ Note: You must specify `target` or `data_lake_storage_path` but not both.
+ EOF
type = object({
id = string
settings = map(string)
secrets = map(string)
})
- default = {
-   id = "s3-csv"
-   settings = {
-     s3_key_prefix = "data/raw/{tap}/{table}/{version}/"
-   }
-   secrets = {}
- }
+ default = null
}
variable "scheduled_sync_times" {
description = "A list of one or more daily sync times in `HHMM` format. E.g.: `0400` for 4am, `1600` for 4pm"
type = list(string)
description = "A list of schedule strings in 4 digit format: HHMM"
default = []
}
variable "scheduled_timezone" {
default = "PT"
description = <<EOF
The timezone used in scheduling.
Currently the following codes are supported: PST, EST, UTC
EOF
default = "PT"
}
variable "container_image" { default = null }
variable "container_entrypoint" { default = null }
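For illustration (placeholder substitution happens inside the Singer container, not in Terraform), the default naming schemes for a hypothetical tap `pardot`, table `campaigns`, version `1` would yield paths under `data_lake_storage_path` like:

    pardot/campaigns/v1/<file>
    pardot/campaigns/state/pardot-campaigns-v1-state.json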
35 changes: 35 additions & 0 deletions components/aws/ecs-task/iam.tf
@@ -152,3 +152,38 @@ resource "aws_iam_role_policy_attachment" "ecs_role_policy-S3" {
role = aws_iam_role.ecs_task_role.name
policy_arn = data.aws_iam_policy.AmazonS3FullAccess.arn
}


resource "aws_iam_policy" "permitted_s3_buckets_policy" {
count = var.permitted_s3_buckets == null ? 0 : length(var.permitted_s3_buckets) > 0 ? 1 : 0
name = "${var.name_prefix}ecs_task-permitted_s3_bucket_access"
path = "/"
description = "IAM policy for accessing S3 from a lambda"
policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["s3:ListBucket"],
"Resource": ["arn:aws:s3:::${join("\", \"arn:aws:s3:::", var.permitted_s3_buckets)}"]
},
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:GetObject",
"s3:DeleteObject"
],
"Resource": ["arn:aws:s3:::${join("/*\", \"arn:aws:s3:::", var.permitted_s3_buckets)}/*"]
}
]
}
EOF
}

resource "aws_iam_role_policy_attachment" "permitted_s3_buckets_policy_attachment" {
count = var.permitted_s3_buckets == null ? 0 : length(var.permitted_s3_buckets) > 0 ? 1 : 0
role = aws_iam_role.ecs_task_role.name
policy_arn = aws_iam_policy.permitted_s3_buckets_policy[0].arn
}
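The `join()` calls above splice the bucket list directly into the policy JSON. A sketch of the rendered document, assuming a hypothetical `permitted_s3_buckets = ["bucket-a", "bucket-b"]`:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": ["arn:aws:s3:::bucket-a", "arn:aws:s3:::bucket-b"]
        },
        {
          "Effect": "Allow",
          "Action": ["s3:PutObject", "s3:GetObject", "s3:DeleteObject"],
          "Resource": ["arn:aws:s3:::bucket-a/*", "arn:aws:s3:::bucket-b/*"]
        }
      ]
    }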
5 changes: 5 additions & 0 deletions components/aws/ecs-task/variables.tf
@@ -52,6 +52,11 @@ variable "container_entrypoint" { default = null }
variable "container_command" { default = null }
variable "container_ram_gb" { default = "8" }
variable "container_num_cores" { default = "4" }
variable "permitted_s3_buckets" {
description = "A list of bucket names, to which the ECS task will be granted read/write access."
type = list(string)
default = null
}
resource "null_resource" "validate_is_fargate_config_valid" {
count = (
var.use_fargate == false ? 0 :
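A usage sketch for the new variable (hypothetical caller and paths, not from this commit; when left at the default `null`, or set to an empty list, the `count` expressions above create no policy):

    module "ecs_task_example" {
      source = "../../components/aws/ecs-task" # hypothetical relative path
      # ... other required arguments omitted ...
      permitted_s3_buckets = ["my-data-bucket", "my-metadata-bucket"]
    }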
4 changes: 3 additions & 1 deletion components/aws/lambda-python/python-zip.tf
@@ -69,7 +69,9 @@ data "archive_file" "lambda_zip" {
}

resource "aws_s3_bucket_object" "s3_lambda_zip" {
- count = local.is_disabled ? 0 : 1
+ count  = local.is_disabled ? 0 : 1
+ # https://gist.github.com/aaronsteers/19eb4d6cba926327f8b25089cb79259b
+ # Parse the S3 path into 'bucket' and 'key' values:
bucket = split("/", split("//", var.s3_path_to_lambda_zip)[1])[0]
key = join("/", slice(
split("/", split("//", var.s3_path_to_lambda_zip)[1]),
54 changes: 29 additions & 25 deletions samples/dbt-and-singer-on-aws/02_singer-taps.tf
@@ -8,43 +8,47 @@ module "singer_taps_on_aws" {

# ADD OR MODIFY CONFIGURATION HERE:

- source_code_folder    = "../data/taps"
- source_code_s3_bucket = module.data_lake_on_aws.s3_metadata_bucket
- scheduled_timezone    = "PST"
- scheduled_sync_times  = ["0600"]
+ local_metadata_path     = "./sample-taps" # For most projects, this will be: "../../data/taps"
+ data_lake_type          = "S3"
+ data_lake_metadata_path = "s3://${module.data_lake_on_aws.s3_metadata_bucket}"
+ data_lake_storage_path  = "s3://${module.data_lake_on_aws.s3_data_bucket}/data/raw"
+ scheduled_timezone      = "PST"
+ scheduled_sync_times    = ["0600"]

taps = [
{
- # Update with correct source information
- id = "sample"
+ # For 'id', enter any plugin name or alias from the index below, excluding the `tap-` prefix:
+ # https://github.com/slalom-ggp/dataops-tools/blob/master/containers/singer/singer_index.yml
+ id = "pardot"
settings = {
- # Update with any extract settings
+ start_date = "2020-02-28T00:00:00Z"
}
secrets = {
- # Update with names of secrets keys
- SAMPLE_username = "./sample-taps/sample-creds-config.json:username",
- SAMPLE_password = "./sample-taps/sample-creds-config.json:password",
+ email    = "./sample-taps/.secrets/tap-pardot-config.json:email",
+ password = "./sample-taps/.secrets/tap-pardot-config.json:password",
+ user_key = "./sample-taps/.secrets/tap-pardot-config.json:user_key",
}
}
]

- /* OPTIONALLY, COPY-PASTE ADDITIONAL SETTINGS FROM BELOW:
+ # Target is not needed when data_lake_storage_path is provided:
+ # target = {
+ #   id = "s3-csv"
+ #   settings = {
+ #     s3_bucket     = module.data_lake_on_aws.s3_data_bucket
+ #     s3_key_prefix = "data/raw/{tap}/{table}/{version}/"
+ #   }
+ #   secrets = {
+ #     aws_access_key_id     = "../.secrets/aws-secrets-manager-secrets.yml:S3_CSV_aws_access_key_id"
+ #     aws_secret_access_key = "../.secrets/aws-secrets-manager-secrets.yml:S3_CSV_aws_secret_access_key"
+ #   }
+ # }
+
- path_pattern = "{tap}/{table}/{version}/{file}"
- target = {
-   id = "s3-csv"
-   settings = {
-     s3_bucket     = module.data_lake_on_aws.s3_data_bucket
-     s3_key_prefix = "data/raw/{tap}/{table}/{version}/"
-   }
-   secrets = {
-     aws_access_key_id     = "../.secrets/aws-secrets-manager-secrets.yml:S3_CSV_aws_access_key_id"
-     aws_secret_access_key = "../.secrets/aws-secrets-manager-secrets.yml:S3_CSV_aws_secret_access_key"
-   }
- }
+ /* OPTIONALLY, COPY-PASTE ADDITIONAL SETTINGS FROM BELOW:
- s3_data_bucket = module.data_lake_on_aws.s3_data_bucket
- s3_data_root   = "data/raw"
  container_image = "slalomggp/singer:pardot-to-s3-csv--pre"
+ data_file_naming_scheme  = "{tap}/{table}/{version}/{file}"
+ state_file_naming_scheme = "{tap}/{table}/state/{tap}-{table}-v{version}-state.json"
*/
}
@@ -0,0 +1,6 @@
+ {
+   "start_date": "2019-02-24",
+   "email": "******",
+   "password": "******",
+   "user_key": "******"
+ }
