Commit 53798f6

Setup auto reconciliation of S3 objects in Dynamo

1 parent f36d1aa commit 53798f6

8 files changed: +289 −9 lines changed

src/api/functions/s3.ts
Lines changed: 3 additions & 0 deletions

@@ -14,6 +14,7 @@ export type CreatePresignedPutInputs = {
   mimeType: string;
   md5hash?: string; // Must be a base64-encoded MD5 hash
   urlExpiresIn?: number;
+  metadata?: Record<string, string>;
 };

 export async function createPresignedPut({
@@ -24,13 +25,15 @@ export async function createPresignedPut({
   mimeType,
   md5hash,
   urlExpiresIn,
+  metadata,
 }: CreatePresignedPutInputs) {
   const command = new PutObjectCommand({
     Bucket: bucketName,
     Key: key,
     ContentLength: length,
     ContentType: mimeType,
     ContentMD5: md5hash,
+    Metadata: metadata,
   });

   const expiresIn = urlExpiresIn || 900;
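A minimal usage sketch of the extended helper (the caller, bucket, and key below are illustrative placeholders, not from this commit). S3 stores each `metadata` entry as an `x-amz-meta-<key>` header on the object and lowercases key names when they are read back:

import { S3Client } from "@aws-sdk/client-s3";

// Hypothetical caller; all values are placeholders.
const s3client = new S3Client({ region: "us-east-1" });
const uploadUrl = await createPresignedPut({
  s3client,
  bucketName: "example-assets-bucket",
  key: "reconciled/roomRequests/req-123/upload-1/agenda.pdf",
  length: 2048,
  mimeType: "application/pdf",
  metadata: { dynamoTable: "ExampleTable" },
});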

src/api/routes/roomRequests.ts
Lines changed: 15 additions & 3 deletions

@@ -128,10 +128,11 @@ const roomRequestRoutes: FastifyPluginAsync = async (fastify, _options) => {
           message: "Could not get username from request.",
         });
       }
+      const createdAt = new Date().toISOString();
       const requestId = request.params.requestId;
       const semesterId = request.params.semesterId;
       const attachmentS3key = request.body.attachmentInfo
-        ? `roomRequests/${requestId}/${request.body.status}/${request.id}/${request.body.attachmentInfo.filename}`
+        ? `reconciled/roomRequests/${requestId}/${request.id}/${request.body.attachmentInfo.filename}`
         : undefined;
       const getReservationData = new QueryCommand({
         TableName: genericConfig.RoomRequestsStatusTableName,
@@ -157,12 +158,24 @@ const roomRequestRoutes: FastifyPluginAsync = async (fastify, _options) => {
            region: genericConfig.AwsRegion,
          });
        }
+       if (!attachmentS3key) {
+         throw new InternalServerError({ message: "Failed to handle file." });
+       }
        uploadUrl = await createPresignedPut({
          s3client: fastify.s3Client,
-         key: attachmentS3key!,
+         key: attachmentS3key,
          bucketName: fastify.environmentConfig.AssetsBucketId,
          length: fileSizeBytes,
          mimeType: contentType,
+         metadata: {
+           dynamoTable: genericConfig.RoomRequestsStatusTableName,
+           dynamoPrimaryKey: JSON.stringify({
+             requestId,
+             "createdAt#status": `${createdAt}#${request.body.status}`,
+           }),
+           dynamoAttribute: "attachmentS3key",
+           dynamoPendingAttributeName: "pendingAttachmentS3key",
+         },
        });
      }
      const createdNotified =
@@ -178,7 +191,6 @@ const roomRequestRoutes: FastifyPluginAsync = async (fastify, _options) => {
          message: "Could not find original reservation requestor",
        });
      }
-     const createdAt = new Date().toISOString();
      const itemPut = {
        TableName: genericConfig.RoomRequestsStatusTableName,
        Item: marshall(
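Because the `PutObjectCommand` above is signed with `Metadata`, the reconciliation hints travel with the upload itself: depending on presigner configuration, the `x-amz-meta-*` values are either hoisted into the signed query string or must be sent as request headers by the uploader. A hedged client-side sketch, assuming the header-based variant (helper name and all values are placeholders mirroring the route above):

// Hypothetical browser-side helper; if the presigner hoists x-amz-* values
// into the signed query string, these headers can be omitted.
async function uploadAttachment(uploadUrl: string, file: File, contentType: string) {
  const res = await fetch(uploadUrl, {
    method: "PUT",
    headers: {
      "Content-Type": contentType,
      "x-amz-meta-dynamotable": "RoomRequestsStatus",
      "x-amz-meta-dynamoprimarykey": JSON.stringify({
        requestId: "req-123",
        "createdAt#status": "2024-01-01T00:00:00.000Z#APPROVED",
      }),
      "x-amz-meta-dynamoattribute": "attachmentS3key",
      "x-amz-meta-dynamopendingattributename": "pendingAttachmentS3key",
    },
    body: file,
  });
  if (!res.ok) throw new Error(`Upload failed: ${res.status}`);
}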

src/s3UploadConfirmer/main.py
Lines changed: 167 additions & 0 deletions

@@ -0,0 +1,167 @@
import json
import logging
import urllib.parse
from typing import Any, Dict, Optional

import boto3

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Initialize AWS clients
dynamodb = boto3.client("dynamodb")
s3 = boto3.client("s3")


def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """
    Lambda function to handle S3 upload events and update DynamoDB.

    Expects S3 object metadata:
    - dynamoTable: DynamoDB table name
    - dynamoPrimaryKey: JSON string of the primary key
    - dynamoAttribute: Target attribute name to set with the value from the pending attribute
    - dynamoPendingAttributeName: Source pending attribute name to remove
    """
    try:
        # Process each S3 event record
        for record in event["Records"]:
            process_s3_record(record)

        return {
            "statusCode": 200,
            "body": json.dumps("Successfully processed S3 events"),
        }
    except Exception as e:
        logger.error(f"Error processing S3 event: {str(e)}", exc_info=True)
        raise


def process_s3_record(record: Dict[str, Any]) -> None:
    """Process a single S3 event record."""
    # Extract S3 event details
    bucket = record["s3"]["bucket"]["name"]
    key = urllib.parse.unquote_plus(record["s3"]["object"]["key"])

    logger.info(f"Processing upload for bucket={bucket}, key={key}")

    # Get object metadata
    metadata = get_object_metadata(bucket, key)

    if not metadata:
        logger.warning(f"No metadata found for object {key}. Skipping DynamoDB update.")
        return

    # Extract required metadata fields (S3 lowercases metadata key names)
    dynamo_table = metadata.get("dynamotable")
    dynamo_primary_key_json = metadata.get("dynamoprimarykey")
    dynamo_attribute = metadata.get("dynamoattribute")
    dynamo_pending_attribute = metadata.get("dynamopendingattributename")

    # Validate required metadata - exit early if any are missing
    if not dynamo_table:
        logger.warning(f"Missing dynamoTable metadata for {key}")
        return

    if not dynamo_primary_key_json:
        logger.warning(f"Missing dynamoPrimaryKey metadata for {key}")
        return

    if not dynamo_attribute:
        logger.warning(f"Missing dynamoAttribute metadata for {key}")
        return

    if not dynamo_pending_attribute:
        logger.warning(f"Missing dynamoPendingAttributeName metadata for {key}")
        return

    # Parse primary key
    try:
        primary_key = json.loads(dynamo_primary_key_json)
    except json.JSONDecodeError as e:
        logger.error(f"Failed to parse dynamoPrimaryKey JSON: {e}")
        return

    # Update DynamoDB - all variables are guaranteed to be strings now
    update_dynamodb(
        table_name=dynamo_table,
        primary_key=primary_key,
        target_attribute=dynamo_attribute,
        pending_attribute=dynamo_pending_attribute,
        s3_key=key,
    )

    logger.info(f"Successfully updated DynamoDB for {key}")


def get_object_metadata(bucket: str, key: str) -> Optional[Dict[str, str]]:
    """Retrieve metadata from an S3 object."""
    try:
        response = s3.head_object(Bucket=bucket, Key=key)
        return response.get("Metadata", {})
    except Exception as e:
        logger.error(f"Error getting metadata for {bucket}/{key}: {str(e)}")
        return None


def update_dynamodb(
    table_name: str,
    primary_key: Dict[str, str],
    target_attribute: str,
    pending_attribute: str,
    s3_key: str,
) -> None:
    """
    Update a DynamoDB item, moving the value from the pending attribute to the target attribute.

    Args:
        table_name: DynamoDB table name
        primary_key: Primary key as a dict (e.g., {"requestId": "123", "createdAt#status": "..."})
        target_attribute: The confirmed attribute name (e.g., "attachmentS3key")
        pending_attribute: The pending attribute name (e.g., "pendingAttachmentS3key")
        s3_key: The uploaded object key; the update only fires if it matches the pending value
    """
    # Convert primary key to DynamoDB attribute-value format
    dynamo_key = {k: {"S": v} for k, v in primary_key.items()}

    try:
        # Build update expression to move the pending attribute's value to the
        # target attribute: SET target = pending, REMOVE pending
        update_expression = "SET #target = #pending REMOVE #pending"

        expression_attribute_names = {
            "#target": target_attribute,
            "#pending": pending_attribute,
        }

        # Condition: the pending attribute must exist and equal the uploaded S3 key
        condition_expression = (
            "attribute_exists(#pending) AND #pending = :expected_s3key"
        )

        dynamodb.update_item(
            TableName=table_name,
            Key=dynamo_key,
            UpdateExpression=update_expression,
            ExpressionAttributeNames=expression_attribute_names,
            ExpressionAttributeValues={":expected_s3key": {"S": s3_key}},
            ConditionExpression=condition_expression,
            ReturnValues="ALL_NEW",
        )

        logger.info(
            f"Updated DynamoDB table={table_name}, "
            f"key={primary_key}, "
            f"moved value from {pending_attribute} to {target_attribute}"
        )

    except dynamodb.exceptions.ConditionalCheckFailedException:
        logger.info(
            f"Skipping update for {table_name} with key {primary_key}. "
            f"This is expected if the file was already confirmed or uploaded without metadata."
        )
    except Exception as e:
        logger.error(
            f"Error updating DynamoDB table={table_name}, key={primary_key}: {str(e)}",
            exc_info=True,
        )
        raise
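For reference, a sketch (TypeScript types, assumed for illustration) of the slice of the S3 notification event the handler actually reads, plus the item-level effect of the conditional update:

// Minimal shape consumed by lambda_handler; real S3 events carry many more
// fields (eventName, awsRegion, object size, etc.).
type S3CreatedEvent = {
  Records: Array<{
    s3: {
      bucket: { name: string };
      object: { key: string }; // URL-encoded; the handler unquotes it
    };
  }>;
};

// Illustrative item-level effect of the conditional update, using the
// attribute names written by the API route:
//   before: { requestId, "createdAt#status", pendingAttachmentS3key: "reconciled/..." }
//   after:  { requestId, "createdAt#status", attachmentS3key: "reconciled/..." }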

terraform/envs/prod/main.tf
Lines changed: 5 additions & 3 deletions

@@ -129,9 +129,11 @@ module "frontend" {
 }

 module "assets" {
-  source                   = "../../modules/assets"
-  ProjectId                = var.ProjectId
-  BucketAllowedCorsOrigins = ["https://${var.CorePublicDomain}"]
+  source                      = "../../modules/assets"
+  ProjectId                   = var.ProjectId
+  BucketAllowedCorsOrigins    = ["https://${var.CorePublicDomain}"]
+  ConfirmerLambdaArnPrimary   = module.lambdas.s3_confirmer_function_arn
+  ConfirmerLambdaArnSecondary = module.lambdas_usw2.s3_confirmer_function_arn
 }

 resource "aws_lambda_event_source_mapping" "queue_consumer" {

terraform/envs/qa/main.tf
Lines changed: 5 additions & 3 deletions

@@ -132,9 +132,11 @@ module "frontend" {
 }

 module "assets" {
-  source                   = "../../modules/assets"
-  ProjectId                = var.ProjectId
-  BucketAllowedCorsOrigins = ["https://${var.CorePublicDomain}", "http://localhost:5173"]
+  source                      = "../../modules/assets"
+  ProjectId                   = var.ProjectId
+  BucketAllowedCorsOrigins    = ["https://${var.CorePublicDomain}", "http://localhost:5173"]
+  ConfirmerLambdaArnPrimary   = module.lambdas.s3_confirmer_function_arn
+  ConfirmerLambdaArnSecondary = module.lambdas_usw2.s3_confirmer_function_arn
 }

 // Multi-Region Failover: US-West-2

terraform/modules/assets/main.tf
Lines changed: 42 additions & 0 deletions

@@ -103,6 +103,48 @@ resource "aws_iam_policy" "bucket_access" {
   policy = data.aws_iam_policy_document.bucket_access.json
 }

+resource "aws_lambda_permission" "allow_bucket_primary" {
+  statement_id  = "AllowExecutionFromS3Bucket"
+  action        = "lambda:InvokeFunction"
+  function_name = var.ConfirmerLambdaArnPrimary
+  principal     = "s3.amazonaws.com"
+  source_arn    = module.buckets.bucket_info[var.PrimaryRegion].arn
+}
+
+resource "aws_lambda_permission" "allow_bucket_secondary" {
+  statement_id  = "AllowExecutionFromS3Bucket"
+  action        = "lambda:InvokeFunction"
+  function_name = var.ConfirmerLambdaArnSecondary
+  principal     = "s3.amazonaws.com"
+  source_arn    = module.buckets.bucket_info[var.SecondaryRegion].arn
+}
+
+resource "aws_s3_bucket_notification" "primary_bucket_notification" {
+  bucket = module.buckets.bucket_info[var.PrimaryRegion].id
+
+  lambda_function {
+    lambda_function_arn = var.ConfirmerLambdaArnPrimary
+    events              = ["s3:ObjectCreated:*"]
+    filter_prefix       = "reconciled/"
+  }
+
+  depends_on = [aws_lambda_permission.allow_bucket_primary]
+}
+
+resource "aws_s3_bucket_notification" "secondary_bucket_notification" {
+  bucket = module.buckets.bucket_info[var.SecondaryRegion].id
+
+  lambda_function {
+    lambda_function_arn = var.ConfirmerLambdaArnSecondary
+    events              = ["s3:ObjectCreated:*"]
+    filter_prefix       = "reconciled/"
+  }
+
+  depends_on = [aws_lambda_permission.allow_bucket_secondary]
+}
+
 output "access_policy_arn" {
   description = "ARN of the IAM policy for bucket access"
   value       = aws_iam_policy.bucket_access.arn

terraform/modules/assets/variables.tf
Lines changed: 9 additions & 0 deletions

@@ -17,3 +17,12 @@ variable "BucketAllowedCorsOrigins" {
   type        = list(string)
   description = "List of URLs that bucket can be read/written from."
 }
+
+variable "ConfirmerLambdaArnPrimary" {
+  type        = string
+  description = "ARN of the S3 upload confirmer Lambda in the primary region."
+}
+
+variable "ConfirmerLambdaArnSecondary" {
+  type        = string
+  description = "ARN of the S3 upload confirmer Lambda in the secondary region."
+}
