Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add AMQP 0-9-1 support #456

Closed
wants to merge 16 commits into from
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
"@types/jest": "^27.4.0",
"@types/qs": "^6.9.7",
"ajv": "^6.12.6",
"amqplib": "^0.10.3",
"async": "^3.2.0",
"better-ajv-errors": "^0.7.0",
"bufferutil": "^4.0.3",
Expand Down
185 changes: 185 additions & 0 deletions src/adapters/amqp/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
import amqplib from "amqplib"
import Adapter from "../../lib/adapter.js"
import { AMQPAdapterConfig, AMQPAuthConfig } from "../../lib/index.js"
import GleeMessage from "../../lib/message.js"

interface ClientData {
auth?: AMQPAuthConfig
url?: URL
vhost?: string,
protocolVersion?: number
}

class AMQPAdapter extends Adapter {
private client: amqplib

name(): string {
return "AMQP adapter"
}
async connect(): Promise<any> {
return this._connect()
}

async send(message: GleeMessage){
return this._send(message)
}

private async initializeConnection(data: ClientData) {
const { url, auth, vhost, protocolVersion } = data

return amqplib.connect({
KhudaDad414 marked this conversation as resolved.
Show resolved Hide resolved
host: url.hostname,
port: url.port || 5672,
protocol: url.protocol.slice(0, url.protocol.length - 1),
username: auth.username,
password: auth.password,
keepalive: 0,
vhost: vhost || '/',
heartbeat: 0,
protocolVersion,
})
}

_fnConsumer(msg, callback) {
const newMsg = this._createMessage(msg)
callback(true)
this.emit("message", newMsg, this.client.connection)
}

_createMessage(msg) {
const headers = {
...msg.fields,
...msg.properties,
}
return new GleeMessage({
channel: msg.topic,
headers,
payload: msg.content.toString(),
})
}

_subscribe() {
const topics = Object.keys(this.parsedAsyncAPI.channels())
return Promise.all(
topics.map((topic) => {

Check warning on line 64 in src/adapters/amqp/index.ts

View workflow job for this annotation

GitHub Actions / Test NodeJS PR - ubuntu-latest

Refactor this function to reduce its Cognitive Complexity from 17 to the 15 allowed

Check warning on line 64 in src/adapters/amqp/index.ts

View workflow job for this annotation

GitHub Actions / Test NodeJS PR - macos-latest

Refactor this function to reduce its Cognitive Complexity from 17 to the 15 allowed
const operation: any = this.parsedAsyncAPI.channel(topic).publish()
const binding = operation ? operation.binding('amqp') : undefined
this.client
.createChannel()
.then((ch) => {
let connect = ch.assertExchange(
// eslint-disable-next-line sonarjs/no-duplicate-string
binding?.exchange.name || "amqp.topic",
binding?.exchange.type || "topic",
binding?.exchange || {}
)
connect = connect.then(() => {
return ch.assertQueue(
binding?.queue.name || '',
binding?.queue|| {}
)
})
.catch((error) => this.emit("error", error))

connect
.then((conQueue) => {
const queue = conQueue.queue
const channel = ch
.bindQueue(queue, binding?.exchange.name || 'amqp.topic', topic)
.then(() => {
return queue
})
.catch((error) => this.emit("error", error))
channel.then((queue) => {
ch.consume(queue, processMsg)
})
const processMsg = (msg) => {
msg.topic = topic
// Process incoming messages and send them to fnConsumer
// Here we need to send a callback(true) for acknowledge the message or callback(false) for reject them
this._fnConsumer(msg, function (ok) {
try {
ok ? ch.ack(msg) : ch.reject(msg, true)
} catch (e) {
this.closeOnErr(e)
}
})
}
})
.catch((error) => this.emit("error", error))
}).catch((error) => this.emit("error", error))
})
)
}

_send(message: GleeMessage): Promise<void> {
return new Promise((resolve, reject) => {
const operation = this.parsedAsyncAPI
.channel(message.channel)
.subscribe()
const binding = operation ? operation.binding('amqp') : undefined
const newMessage = Buffer.from(message.payload, "utf-8")
this.client.createChannel().then((ch) => {
const ok = ch.assertExchange(binding ? binding.exchange.name : 'amqp.topic' , binding ? binding.exchange.type : 'topic' , binding ? binding.exchange : {})
return ok.then(() => {
ch.publish(binding.exchange.name, message.channel, newMessage, {}, (err) => {
if (err) {
reject(err)
this.emit("error", err)
this.client.connection.close()
}
resolve()
return ch.close()
})
})
}).finally(() => this.client.connection.close())
})
}


async _connect() {
const amqpOptions: AMQPAdapterConfig = await this.resolveProtocolConfig(
"amqp"
)
const auth: AMQPAuthConfig = await this.getAuthConfig(amqpOptions.auth)
const url = new URL(this.AsyncAPIServer.url())

const protocolVersion = parseInt(
this.AsyncAPIServer.protocolVersion() || "0.9.1"
)
const channels = this.parsedAsyncAPI.channels()
const vhosts = []
for (const channel in channels) {
const operation = this.parsedAsyncAPI.channel(channel).subscribe().binding('amqp')
const vhost = operation?.queue?.vhost
if (vhosts.includes(vhost)) {
continue
} else {
vhosts.push(vhost)
this.client = await this.initializeConnection({
url,
auth,
vhost,
protocolVersion,
})
}
this._subscribe()

}
const connectClient = (): Promise<this> => {
return new Promise((resolve) => {
if (resolve) {
this.emit("connect", {
name: this.name(),
adapter: this,
connection: this.client.connection,
channels: this.getSubscribedChannels(),
})
}
})
}
connectClient()
}
}

export default AMQPAdapter
12 changes: 12 additions & 0 deletions src/lib/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ export interface MqttAuthConfig {
clientId?: string
}

export interface AMQPAuthConfig {
cert?: string
key?: string
username?: string
password?: string
}

export interface WsAuthConfig {
token?: string
}
Expand Down Expand Up @@ -67,6 +74,10 @@ export type MqttAdapterConfig = {
auth?: MqttAuthConfig | AuthFunction<MqttAuthConfig>
}

export type AMQPAdapterConfig = {
auth?: AMQPAuthConfig | AuthFunction<AMQPAuthConfig>
}

export type KafkaAdapterConfig = {
auth?: KafkaAuthConfig | AuthFunction<KafkaAuthConfig>
}
Expand All @@ -85,6 +96,7 @@ export type GleeConfig = {
mqtt?: MqttAdapterConfig,
http?: HttpAdapterConfig
kafka?: KafkaAdapterConfig
amqp?: AMQPAdapterConfig
}

export type GleeFunctionReturn = {
Expand Down
6 changes: 6 additions & 0 deletions src/registerAdapters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import HttpServerAdapter from './adapters/http/server.js'
import HttpClientAdapter from './adapters/http/client.js'
import KafkaAdapter from './adapters/kafka/index.js'
import AMQPAdapter from './adapters/amqp/index.js'

export default async (app: Glee, parsedAsyncAPI: AsyncAPIDocument, config: GleeConfig) => {
const serverNames = await getSelectedServerNames()
Expand All @@ -27,7 +28,7 @@
if (config.cluster) registerAdapterForCluster(app, config.cluster)
}

function registerAdapterForServer(serverName: string, server: Server, app: Glee, parsedAsyncAPI: AsyncAPIDocument, config: GleeConfig) {

Check warning on line 31 in src/registerAdapters.ts

View workflow job for this annotation

GitHub Actions / Test NodeJS PR - ubuntu-latest

Refactor this function to reduce its Cognitive Complexity from 21 to the 15 allowed

Check warning on line 31 in src/registerAdapters.ts

View workflow job for this annotation

GitHub Actions / Test NodeJS PR - macos-latest

Refactor this function to reduce its Cognitive Complexity from 21 to the 15 allowed
const protocol = server.protocol()
const remoteServers = parsedAsyncAPI.extension('x-remoteServers')
if (['mqtt', 'mqtts', 'secure-mqtt'].includes(protocol)) {
Expand All @@ -44,6 +45,11 @@
})
} else if (['amqp', 'amqps'].includes(protocol)) {
// TODO: Implement AMQP support
app.addAdapter(AMQPAdapter, {
serverName,
server,
parsedAsyncAPI
});

Check warning on line 52 in src/registerAdapters.ts

View workflow job for this annotation

GitHub Actions / Test NodeJS PR - ubuntu-latest

Extra semicolon

Check warning on line 52 in src/registerAdapters.ts

View workflow job for this annotation

GitHub Actions / Test NodeJS PR - macos-latest

Extra semicolon
} else if (['ws', 'wss'].includes(protocol)) {
const configWsAdapter = config?.ws?.server?.adapter
if (remoteServers && remoteServers.includes(serverName)) {
Expand Down
Loading