1515#
1616
1717"""Event Gate Lambda function implementation."""
18- import base64
1918import json
2019import logging
2120import os
2423
2524import boto3
2625import jwt
27- import requests
2826import urllib3
29- from cryptography .exceptions import UnsupportedAlgorithm
30- from cryptography .hazmat .primitives import serialization
3127from jsonschema import validate
3228from jsonschema .exceptions import ValidationError
3329
30+ from src .handlers .handler_token import HandlerToken
3431from src .writers import writer_eventbridge , writer_kafka , writer_postgres
3532from src .utils .conf_path import CONF_DIR , INVALID_CONF_ENV
3633
6461logger .debug ("Loaded TOPICS" )
6562
6663with open (os .path .join (_CONF_DIR , "config.json" ), "r" , encoding = "utf-8" ) as file :
67- CONFIG = json .load (file )
64+ config = json .load (file )
6865logger .debug ("Loaded main CONFIG" )
6966
7067aws_s3 = boto3 .Session ().resource ("s3" , verify = False ) # nosec Boto verify disabled intentionally
7168logger .debug ("Initialized AWS S3 Client" )
7269
73- if CONFIG ["access_config" ].startswith ("s3://" ):
74- name_parts = CONFIG ["access_config" ].split ("/" )
70+ if config ["access_config" ].startswith ("s3://" ):
71+ name_parts = config ["access_config" ].split ("/" )
7572 BUCKET_NAME = name_parts [2 ]
7673 BUCKET_OBJECT_KEY = "/" .join (name_parts [3 :])
7774 ACCESS = json .loads (aws_s3 .Bucket (BUCKET_NAME ).Object (BUCKET_OBJECT_KEY ).get ()["Body" ].read ().decode ("utf-8" ))
7875else :
79- with open (CONFIG ["access_config" ], "r" , encoding = "utf-8" ) as file :
76+ with open (config ["access_config" ], "r" , encoding = "utf-8" ) as file :
8077 ACCESS = json .load (file )
8178logger .debug ("Loaded ACCESS definitions" )
8279
83- TOKEN_PROVIDER_URL = CONFIG ["token_provider_url" ]
84- # Add timeout to avoid hanging requests; wrap in robust error handling so failures are explicit
85- try :
86- response_json = requests .get (CONFIG ["token_public_key_url" ], verify = False , timeout = 5 ).json () # nosec external
87- token_public_key_encoded = response_json ["key" ]
88- TOKEN_PUBLIC_KEY : Any = serialization .load_der_public_key (base64 .b64decode (token_public_key_encoded ))
89- logger .debug ("Loaded TOKEN_PUBLIC_KEY" )
90- except (requests .RequestException , ValueError , KeyError , UnsupportedAlgorithm ) as exc :
91- logger .exception ("Failed to fetch or deserialize token public key from %s" , CONFIG .get ("token_public_key_url" ))
92- raise RuntimeError ("Token public key initialization failed" ) from exc
93-
94- writer_eventbridge .init (logger , CONFIG )
95- writer_kafka .init (logger , CONFIG )
80+ # Initialize token handler and load token public keys
81+ handler_token = HandlerToken (config ).load_public_keys ()
82+
83+ # Initialize EventGate writers
84+ writer_eventbridge .init (logger , config )
85+ writer_kafka .init (logger , config )
9686writer_postgres .init (logger )
9787
9888
@@ -124,12 +114,6 @@ def get_api() -> Dict[str, Any]:
124114 return {"statusCode" : 200 , "body" : API }
125115
126116
127- def get_token () -> Dict [str , Any ]:
128- """Return 303 redirect to token provider endpoint."""
129- logger .debug ("Handling GET Token" )
130- return {"statusCode" : 303 , "headers" : {"Location" : TOKEN_PROVIDER_URL }}
131-
132-
133117def get_topics () -> Dict [str , Any ]:
134118 """Return list of available topic names."""
135119 logger .debug ("Handling GET Topics" )
@@ -163,7 +147,7 @@ def post_topic_message(topic_name: str, topic_message: Dict[str, Any], token_enc
163147 """
164148 logger .debug ("Handling POST %s" , topic_name )
165149 try :
166- token = jwt . decode (token_encoded , TOKEN_PUBLIC_KEY , algorithms = [ "RS256" ]) # type: ignore[arg-type]
150+ token : Dict [ str , Any ] = handler_token . decode_jwt (token_encoded )
167151 except jwt .PyJWTError : # type: ignore[attr-defined]
168152 return _error_response (401 , "auth" , "Invalid or missing token" )
169153
@@ -205,41 +189,6 @@ def post_topic_message(topic_name: str, topic_message: Dict[str, Any], token_enc
205189 }
206190
207191
208- def extract_token (event_headers : Dict [str , str ]) -> str :
209- """Extract bearer token from headers (case-insensitive).
210-
211- Supports:
212- - Custom 'bearer' header (any casing) whose value is the raw token
213- - Standard 'Authorization: Bearer <token>' header (case-insensitive scheme & key)
214- Returns empty string if token not found or malformed.
215- """
216- if not event_headers :
217- return ""
218-
219- # Normalize keys to lowercase for case-insensitive lookup
220- lowered = {str (k ).lower (): v for k , v in event_headers .items ()}
221-
222- # Direct bearer header (raw token)
223- if "bearer" in lowered and isinstance (lowered ["bearer" ], str ):
224- token_candidate = lowered ["bearer" ].strip ()
225- if token_candidate :
226- return token_candidate
227-
228- # Authorization header with Bearer scheme
229- auth_val = lowered .get ("authorization" , "" )
230- if not isinstance (auth_val , str ): # defensive
231- return ""
232- auth_val = auth_val .strip ()
233- if not auth_val :
234- return ""
235-
236- # Case-insensitive match for 'Bearer ' prefix
237- if not auth_val .lower ().startswith ("bearer " ):
238- return ""
239- token_part = auth_val [7 :].strip () # len('Bearer ')==7
240- return token_part
241-
242-
243192def lambda_handler (event : Dict [str , Any ], context : Any ): # pylint: disable=unused-argument,too-many-return-statements
244193 """AWS Lambda entry point.
245194
@@ -250,7 +199,7 @@ def lambda_handler(event: Dict[str, Any], context: Any): # pylint: disable=unus
250199 if resource == "/api" :
251200 return get_api ()
252201 if resource == "/token" :
253- return get_token ()
202+ return handler_token . get_token_provider_info ()
254203 if resource == "/topics" :
255204 return get_topics ()
256205 if resource == "/topics/{topic_name}" :
@@ -261,7 +210,7 @@ def lambda_handler(event: Dict[str, Any], context: Any): # pylint: disable=unus
261210 return post_topic_message (
262211 event ["pathParameters" ]["topic_name" ].lower (),
263212 json .loads (event ["body" ]),
264- extract_token (event .get ("headers" , {})),
213+ handler_token . extract_token (event .get ("headers" , {})),
265214 )
266215 if resource == "/terminate" :
267216 sys .exit ("TERMINATING" ) # pragma: no cover - deliberate termination path
0 commit comments