Systemic Azure Bus is a systemic component for the Azure Service Bus SDK. Its goal is to help you deal with azure bus topics and queues subscriptions and publications.
This library:
- enforces the client to use a particular, sensible configuration
- provides safe defaults for configuration
- Exposes an easy interface for publication/subscription
- Solves error handling
- Allows clients to easily retry, retry with exponential backoff or dead letter a failed message
- Opens/closes the connections
A typical, simple configuration looks like this:
{
connection: {
connectionString: process.env.AZURE_SERVICEBUS_CONNECTION_STRING,
},
subscriptions: {
topicSubscriptionName: {
topic: 'myTopic',
subscription: 'myTopic.action'
},
},
publications: {
topicPublicationName: {
topic: 'myDestinationTopic',
contentType: 'application/json', // optional - default is json
},
},
}
const initBus = require('systemic-azure-bus');
const { start, stop } = initBus();
...
const api = await start({ config }); // configuration similar to the one above
const publicationId = 'topicPublicationName'; // declared in config
const publishInMyPublication = api.publish(publicationId);
await publishInMyPublication({ foo: 'bar' });
We provide a streaming API to subscribe to a topic and process messages flowing in.
const subscriptionId = 'topicSubscriptionName'; // declared in config
const subscribe = api.subscribe(console.error); // how to handle error
const handler = ({ body, userProperties }) => {
// do something with message...
};
subscribe(subscriptionId, handler);
In the case we want to retrieve the rules applied to a subscription, we can use this.
let subscriptionRules = await bus.getSubscriptionRules('topicSubscriptionName');
When a message goes to DLQ (Dead Letter Queue) we could peek those messages with this operation.
const subscriptionId = 'topicSubscriptionName'; // declared in config
const deadMessage = await api.peekDlq(subscriptionId); // retrieves only one
Sometimes we need to process messages in DLQ, i.e. to purge it or to republish and reprocess them. We provide a streaming API to process them.
const handler = ({ body, userProperties }) => {
// do something with message...
};
const subscriptionId = 'topicSubscriptionName'; // declared in config
api.processDlq(subscriptionId, handler);