Skip to content

Commit 00f6fe7

Browse files
committed
Add rabbit channel
Signed-off-by: Piotr Persona <[email protected]>
1 parent 7dd9eb9 commit 00f6fe7

File tree

2 files changed

+62
-3
lines changed

2 files changed

+62
-3
lines changed

api/channel/rabbit.py

Lines changed: 61 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,67 @@
1+
import json
2+
import logging
3+
4+
import pika.exceptions
5+
16
from api.channel.channel import Channel, LocationMessage, ChannelResponse
27

8+
LOG = logging.getLogger(__name__)
9+
10+
11+
def create_connection(host, port, connection_attempts, retry_delay):
12+
connection = pika.BlockingConnection(pika.ConnectionParameters(
13+
host=host, port=port, connection_attempts=connection_attempts, retry_delay=retry_delay))
14+
channel = connection.channel()
15+
return channel
16+
17+
18+
def create_rabbit_channel(channel, exchange, topic):
19+
rabbit_channel = RabbitChannel(channel, exchange=exchange, topic=topic)
20+
return rabbit_channel
21+
322

423
class RabbitChannel(Channel):
5-
def __init__(self):
6-
pass
24+
__exchange_type = 'topic'
25+
26+
def __init__(self, channel, exchange, topic):
27+
self.channel = channel
28+
self.exchange = exchange
29+
self.topic = topic
30+
31+
@staticmethod
32+
def __serialize_message(message: LocationMessage):
33+
location = message.location
34+
body = json.dumps(dict(
35+
longitude=location.longitude,
36+
latitude=location.latitude,
37+
))
38+
return body
739

840
def send(self, message: LocationMessage) -> ChannelResponse:
9-
pass
41+
try:
42+
self.channel.exchange_declare(exchange=self.exchange, exchange_type=self.__exchange_type)
43+
except pika.exceptions.AMQPError as amqp_error:
44+
LOG.error(amqp_error)
45+
return ChannelResponse(
46+
message=f"Cannot declare exchange {self.exchange} of type {self.__exchange_type}",
47+
status=ChannelResponse.Status.ERROR,
48+
)
49+
user_id = message.user_id
50+
routing_key = f'{self.topic}.{user_id}'
51+
body = self.__serialize_message(message)
52+
try:
53+
self.channel.basic_publish(
54+
exchange=self.exchange,
55+
routing_key=routing_key,
56+
body=body,
57+
)
58+
except pika.exceptions.AMQPError as amqp_error:
59+
LOG.error(amqp_error)
60+
return ChannelResponse(
61+
message=f"Cannot publish message to {routing_key}",
62+
status=ChannelResponse.Status.ERROR,
63+
)
64+
return ChannelResponse(
65+
message=f"Location uploaded for user {user_id}",
66+
status=ChannelResponse.Status.OK,
67+
)

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
connexion[swagger-ui]
2+
pika==1.1.0

0 commit comments

Comments
 (0)