Skip to content

Commit

Permalink
Merge pull request #85 from Avivsalem/staging
Browse files Browse the repository at this point in the history
Staging
  • Loading branch information
Avivsalem authored Aug 23, 2023
2 parents 4f07aa3 + 1ca9369 commit 988df41
Show file tree
Hide file tree
Showing 14 changed files with 399 additions and 58 deletions.
15 changes: 10 additions & 5 deletions messageflux/iodevices/objectstorage/s3_message_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from hashlib import md5
from typing import Optional, BinaryIO, Dict, Any, Tuple, TYPE_CHECKING

import boto3

from messageflux.iodevices.base.common import MessageBundle, Message
from messageflux.iodevices.message_store_device_wrapper.message_store_base import MessageStoreException, \
MessageStoreBase
Expand Down Expand Up @@ -67,8 +69,8 @@ class _S3MessageStoreBase(MessageStoreBase, metaclass=ABCMeta):
_ORIGINAL_HEADERS_KEY = "originalheaders"

def __init__(self,
s3_resource: 'S3ServiceResource',
magic: bytes,
s3_resource: Optional['S3ServiceResource'] = None,
auto_create_bucket: bool = False,
bucket_name_formatter: Optional[BucketNameFormatterBase] = None):
"""
Expand All @@ -83,6 +85,9 @@ def __init__(self,
self.bucket_name_formatter = bucket_name_formatter or BucketNameFormatterBase()
self._magic = magic

if s3_resource is None:
s3_resource = boto3.resource('s3')

self._s3_resource = s3_resource
self._auto_create_bucket = auto_create_bucket
self._bucket_cache: Dict[str, S3Bucket] = {}
Expand Down Expand Up @@ -199,15 +204,15 @@ class S3MessageStore(_S3MessageStoreBase):
"""

def __init__(self,
s3_resource: 'S3ServiceResource',
s3_resource: Optional['S3ServiceResource'] = None,
magic: bytes = b"__S3_MSGSTORE__",
auto_create_bucket: bool = False,
bucket_name_formatter: Optional[BucketNameFormatterBase] = None,
put_object_extra_args: Optional[Dict[str, Any]] = None):
"""
An S3 based message store
:param s3_resource: the s3 resource from boto
:param s3_resource: the s3 resource from boto (or None, to create it from env vars)
:param auto_create_bucket: Whether or not a bucket will be created
when a message is being put in a nonexistent one.
:param bucket_name_formatter: a formatter to use to manipulate the bucket name.
Expand Down Expand Up @@ -238,15 +243,15 @@ class S3UploadMessageStore(_S3MessageStoreBase):
"""

def __init__(self,
s3_resource: 'S3ServiceResource',
s3_resource: Optional['S3ServiceResource'] = None,
magic: bytes = b"__S3_UPLOAD_MSGSTORE__",
auto_create_bucket: bool = False,
bucket_name_formatter: Optional[BucketNameFormatterBase] = None,
upload_extra_args: Optional[Dict[str, Any]] = None):
"""
An S3 based message store
:param s3_resource: the s3 resource from boto
:param s3_resource: the s3 resource from boto (or None, to create it from env vars)
:param auto_create_bucket: Whether or not a bucket will be created
when a message is being put in a nonexistent one.
:param bucket_name_formatter: a formatter to use to manipulate the bucket name.
Expand Down
2 changes: 2 additions & 0 deletions messageflux/iodevices/sqs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .sqs_input_device import SQSInputDeviceManager
from .sqs_output_device import SQSOutputDeviceManager
29 changes: 29 additions & 0 deletions messageflux/iodevices/sqs/message_attributes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import json

from typing import Any, Dict, TYPE_CHECKING

if TYPE_CHECKING:
from mypy_boto3_sqs.type_defs import MessageAttributeValueQueueTypeDef


def get_aws_data_type(value: Any) -> str:
if isinstance(value, (list, set, frozenset, tuple)):
return "String.Array"
elif isinstance(value, bool):
return "String"
elif isinstance(value, (int, float)):
return "Number"
elif isinstance(value, bytes):
return "Binary"
else:
return "String"


def generate_message_attributes(attributes: Dict[str, Any]) -> Dict[str, 'MessageAttributeValueQueueTypeDef']:
return {
key: {
"DataType": get_aws_data_type(value),
"StringValue": value if isinstance(value, str) else json.dumps(value) # to avoid double encoding
}
for key, value in attributes.items()
}
146 changes: 146 additions & 0 deletions messageflux/iodevices/sqs/sqs_input_device.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import logging
import threading
from io import BytesIO
from typing import Optional, Union, TYPE_CHECKING

from messageflux.iodevices.base import (
InputDevice,
InputTransaction,
ReadResult,
InputDeviceException,
Message,
InputDeviceManager,
)
from messageflux.iodevices.base.input_transaction import NULLTransaction
from messageflux.iodevices.sqs.sqs_manager_base import SQSManagerBase

if TYPE_CHECKING:
from mypy_boto3_sqs.service_resource import Message as SQSMessage


class SQSInputTransaction(InputTransaction):
"""
represents a InputTransaction for SQS
"""

_device: "SQSInputDevice"

def __init__(self, device: "SQSInputDevice", message: 'SQSMessage'):
"""
:param device: the device that returned this transaction
:param message: the received message
"""
super(SQSInputTransaction, self).__init__(device=device)
self._message = message
self._logger = logging.getLogger(__name__)

def _commit(self):
try:
self._message.delete()
except Exception:
self._logger.exception("commit failed")

def _rollback(self):
try:
self._message.change_visibility(VisibilityTimeout=0)
except Exception:
self._logger.warning("rollback failed", exc_info=True)


class SQSInputDevice(InputDevice["SQSInputDeviceManager"]):
"""
represents an SQS input device
"""

def __init__(
self,
device_manager: "SQSInputDeviceManager",
queue_name: str,
included_message_attributes: Optional[Union[str, list]] = None, # TODO: what's this?
):
"""
constructs a new input SQS device
:param device_manager: the SQS device Manager that holds this device
:param queue_name: the name for the queue
"""
super().__init__(device_manager, queue_name)

if included_message_attributes is None:
included_message_attributes = ["All"]

self._included_message_attributes = included_message_attributes
self._max_messages_per_request = 1 # TODO: get this in manager
self._queue = self.manager.get_queue(queue_name)

def _read_message(
self,
cancellation_token: threading.Event,
timeout: Optional[float] = None,
with_transaction: bool = True,
) -> Optional["ReadResult"]:
"""
reads a stream from InputDevice (tries getting a message. if it fails, reconnects and tries again once)
:param timeout: the timeout in seconds to block. negative number means no blocking
:return: a tuple of stream and metadata from InputDevice, or (None, None) if no message is available
"""
if timeout is None:
sqs_messages = self._queue.receive_messages(
MessageAttributeNames=self._included_message_attributes,
MaxNumberOfMessages=self._max_messages_per_request,
) # TODO: what's the visibility timeout? should we extend it?
else:
sqs_messages = self._queue.receive_messages(
MessageAttributeNames=self._included_message_attributes,
MaxNumberOfMessages=self._max_messages_per_request,
WaitTimeSeconds=int(timeout)
) # TODO: what's the visibility timeout? should we extend it?

if not sqs_messages:
return None

assert (len(sqs_messages) == 1), "SQSInputDevice should only return one message at a time"

sqs_message = sqs_messages[0]

transaction: InputTransaction
if with_transaction:
transaction = SQSInputTransaction(device=self,
message=sqs_message)
else:
transaction = NULLTransaction(self)
sqs_message.delete()

return ReadResult(
message=Message(
headers={
key: value["BinaryValue"] if value['DataType'] == "Binary" else value['StringValue']
for key, value in sqs_message.message_attributes.items()
},
data=BytesIO(sqs_message.body.encode()),
),
transaction=transaction
)


class SQSInputDeviceManager(SQSManagerBase, InputDeviceManager[SQSInputDevice]):
"""
SQS input device manager
"""

def get_input_device(self, device_name: str) -> SQSInputDevice:
"""
Returns an incoming device by name
:param device_name: the name of the device to read from
:return: an input device for 'device_name'
"""
try:
return SQSInputDevice(self, device_name)
except Exception as e:
message = f"Couldn't create input device '{device_name}'"
self._logger.exception(message)
raise InputDeviceException(message) from e
49 changes: 49 additions & 0 deletions messageflux/iodevices/sqs/sqs_manager_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import logging

from typing import Dict, TYPE_CHECKING, Optional

try:
import boto3
except ImportError as ex:
raise ImportError("Please Install the required extra: messageflux[sqs]") from ex

if TYPE_CHECKING:
from mypy_boto3_sqs.service_resource import Queue, SQSServiceResource


class SQSManagerBase:
"""
base class for sqs device managers
"""

def __init__(self, sqs_resource: Optional['SQSServiceResource'] = None) -> None:
"""
:param sqs_resource: the boto sqs service resource. Defaults to creating from env vars
"""
if sqs_resource is None:
sqs_resource = boto3.resource('sqs')

self._sqs_resource = sqs_resource
self._queue_cache: Dict[str, 'Queue'] = {}
self._logger = logging.getLogger(__name__)

def get_queue(self, queue_name: str) -> 'Queue':
"""
gets the queue from cache
"""
queue = self._queue_cache.get(queue_name, None)
if queue is None:
queue = self._sqs_resource.get_queue_by_name(QueueName=queue_name)
self._queue_cache[queue_name] = queue

return queue

def create_queue(self, queue_name: str, **kwargs) -> 'Queue':
"""
creates a queue
:param queue_name: the queue name to create
:return: the newly created queue
"""

return self._sqs_resource.create_queue(QueueName=queue_name, **kwargs)
71 changes: 71 additions & 0 deletions messageflux/iodevices/sqs/sqs_output_device.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import logging

from messageflux.iodevices.base import (
OutputDevice,
OutputDeviceException,
OutputDeviceManager,
)
from messageflux.iodevices.base.common import MessageBundle
from messageflux.iodevices.sqs.message_attributes import generate_message_attributes
from messageflux.iodevices.sqs.sqs_manager_base import SQSManagerBase
from messageflux.utils import get_random_id


class SQSOutputDevice(OutputDevice["SQSOutputDeviceManager"]):
"""
represents an SQS output devices
"""

def __init__(self, device_manager: "SQSOutputDeviceManager", queue_name: str):
"""
constructs a new output SQS device
:param device_manager: the SQS device Manager that holds this device
:param queue_name: the name of the queue
"""
super(SQSOutputDevice, self).__init__(device_manager, queue_name)
self._sqs_queue = self.manager.get_queue(queue_name)

# https://awscli.amazonaws.com/v2/documentation/api/latest/reference/sqs/get-queue-attributes.html#get-queue-attributes
self._is_fifo = queue_name.endswith(".fifo")
self._logger = logging.getLogger(__name__)

def _send_message(self, message_bundle: MessageBundle):
if self._is_fifo:
response = self._sqs_queue.send_message(
MessageBody=message_bundle.message.bytes.decode(),
MessageAttributes=generate_message_attributes(
message_bundle.message.headers
),
MessageGroupId=get_random_id(),
)
else:
response = self._sqs_queue.send_message(
MessageBody=message_bundle.message.bytes.decode(),
MessageAttributes=generate_message_attributes(
message_bundle.message.headers
),
)

if "MessageId" not in response:
raise OutputDeviceException("Couldn't send message to SQS")


class SQSOutputDeviceManager(SQSManagerBase, OutputDeviceManager[SQSOutputDevice]):
"""
this manager is used to create SQS devices
"""

def get_output_device(self, queue_name: str) -> SQSOutputDevice:
"""
Returns and outgoing device by name
:param queue_name: the name of the queue
:return: an output device for 'queue_name'
"""
try:
return SQSOutputDevice(self, queue_name)
except Exception as e:
message = f"Couldn't create output device '{queue_name}'"
self._logger.exception(message)
raise OutputDeviceException(message) from e
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ version = { file = "VERSION" }
dev = { file = "requirements-dev.txt" }
objectstorage = { file = "requirements-objectstorage.txt" }
objectstorage_mypy = { file = "requirements-objectstorage_mypy.txt" }
sqs = { file = "requirements-sqs.txt" }
sqs_mypy = { file = "requirements-sqs_mypy.txt" }
rabbitmq = { file = "requirements-rabbitmq.txt" }
rabbitmq_mypy = { file = "requirements-rabbitmq_mypy.txt" }
all = { file = "requirements-all.txt" }
Expand Down
1 change: 1 addition & 0 deletions requirements-sqs.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
boto3>=1.25,<2
2 changes: 2 additions & 0 deletions requirements-sqs_mypy.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
boto3>=1.25,<2
boto3-stubs[sqs]>=1.25,<2
2 changes: 1 addition & 1 deletion tests/devices/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def rollback_test(input_device_manager: InputDeviceManager,
_assert_messages_equal(org_message=test_message_2, new_message=read_result2.message)
read_result1.rollback()
read_result2.rollback()

time.sleep(sleep_between_sends)
read_result = input_device.read_message(cancellation_token=cancellation_token)
assert read_result is not None
_assert_messages_equal(org_message=test_message_1, new_message=read_result.message)
Expand Down
Empty file.
Loading

0 comments on commit 988df41

Please sign in to comment.