Skip to content

Commit e81d5da

Browse files
committed
Integrate large payload support for SQS
This adds support for handling large payloads in SQS. The 'sqs_extended_client' is imported and utilized for fetching file from S3 as payload when necessary. As Kombu asynchronously fetches new messages from the queue, not using the standard boto3 APIs, we have to manually fetch the s3 file, rather than rely on the sqs_extended_client to perform that action Relates to: #279
1 parent 7c90d25 commit e81d5da

File tree

4 files changed

+59
-2
lines changed

4 files changed

+59
-2
lines changed

kombu/asynchronous/aws/ext.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@
44

55
try:
66
import boto3
7+
import sqs_extended_client
78
from botocore import exceptions
89
from botocore.awsrequest import AWSRequest
910
from botocore.httpsession import get_cert_path
1011
from botocore.response import get_response
1112
except ImportError:
1213
boto3 = None
14+
sqs_extended_client = None
1315

1416
class _void:
1517
pass

kombu/asynchronous/aws/sqs/ext.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,7 @@
55

66
try:
77
import boto3
8+
import sqs_extended_client
89
except ImportError:
910
boto3 = None
11+
sqs_extended_client = None

kombu/transport/SQS.py

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@
133133
from __future__ import annotations
134134

135135
import base64
136+
import json
136137
import socket
137138
import string
138139
import uuid
@@ -144,7 +145,7 @@
144145
from vine import ensure_promise, promise, transform
145146

146147
from kombu.asynchronous import get_event_loop
147-
from kombu.asynchronous.aws.ext import boto3, exceptions
148+
from kombu.asynchronous.aws.ext import boto3, exceptions, sqs_extended_client
148149
from kombu.asynchronous.aws.sqs.connection import AsyncSQSConnection
149150
from kombu.asynchronous.aws.sqs.message import AsyncMessage
150151
from kombu.log import get_logger
@@ -498,6 +499,25 @@ def _message_to_python(self, message, queue_name, q_url):
498499
message['ReceiptHandle'],
499500
)
500501
else:
502+
503+
if (
504+
sqs_extended_client and
505+
isinstance(payload, list)
506+
and payload[0] == sqs_extended_client.client.MESSAGE_POINTER_CLASS
507+
):
508+
# Used the sqs_extended_client, so we need to fetch the file from S3 and use that as the payload
509+
s3_details = payload[1]
510+
s3_bucket_name, s3_key = s3_details["s3BucketName"], s3_details["s3Key"]
511+
512+
s3_client = self.s3()
513+
response = s3_client.get_object(Bucket=s3_bucket_name, Key=s3_key)
514+
515+
# The message body is under a wrapper class called StreamingBody
516+
streaming_body = response["Body"]
517+
payload = json.loads(
518+
self._optional_b64_decode(streaming_body.read())
519+
)
520+
501521
try:
502522
properties = payload['properties']
503523
delivery_info = payload['properties']['delivery_info']
@@ -713,6 +733,32 @@ def close(self):
713733
# if "can't set attribute" not in str(exc):
714734
# raise
715735

736+
def new_s3_client(
737+
self, region, access_key_id, secret_access_key, session_token=None
738+
):
739+
session = boto3.session.Session(
740+
region_name=region,
741+
aws_access_key_id=access_key_id,
742+
aws_secret_access_key=secret_access_key,
743+
aws_session_token=session_token,
744+
)
745+
is_secure = self.is_secure if self.is_secure is not None else True
746+
client_kwargs = {"use_ssl": is_secure}
747+
748+
if self.endpoint_url is not None:
749+
client_kwargs["endpoint_url"] = self.endpoint_url
750+
751+
client = session.client("s3", **client_kwargs)
752+
753+
return client
754+
755+
def s3(self):
756+
return self.new_s3_client(
757+
region=self.region,
758+
access_key_id=self.conninfo.userid,
759+
secret_access_key=self.conninfo.password,
760+
)
761+
716762
def new_sqs_client(self, region, access_key_id,
717763
secret_access_key, session_token=None):
718764
session = boto3.session.Session(
@@ -729,7 +775,13 @@ def new_sqs_client(self, region, access_key_id,
729775
client_kwargs['endpoint_url'] = self.endpoint_url
730776
client_config = self.transport_options.get('client-config') or {}
731777
config = Config(**client_config)
732-
return session.client('sqs', config=config, **client_kwargs)
778+
client = session.client('sqs', config=config, **client_kwargs)
779+
780+
if self.transport_options.get('large_payload_bucket') and sqs_extended_client:
781+
client.large_payload_support = self.transport_options.get('large_payload_bucket')
782+
client.use_legacy_attribute = False
783+
784+
return client
733785

734786
def sqs(self, queue=None):
735787
if queue is not None and self.predefined_queues:

requirements/extras/sqs.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
boto3>=1.26.143
22
urllib3>=1.26.16
3+
amazon-sqs-extended-client>=1.0.1

0 commit comments

Comments
 (0)