Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add AMQP 0-9-1 support #456

Closed
wants to merge 16 commits into from
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
"@types/jest": "^27.4.0",
"@types/qs": "^6.9.7",
"ajv": "^6.12.6",
"amqplib": "^0.10.3",
"async": "^3.2.0",
"better-ajv-errors": "^0.7.0",
"bufferutil": "^4.0.3",
Expand Down
184 changes: 184 additions & 0 deletions src/adapters/amqp/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
import amqplib from "amqplib"
import Adapter from "../../lib/adapter.js"
import { AMQPAdapterConfig, AMQPAuthConfig } from "../../lib/index.js"
import GleeMessage from "../../lib/message.js"

interface ClientData {
auth?: AMQPAuthConfig
url?: URL
serverBindings?: any
protocolVersion?: number
}

class AMQPAdapter extends Adapter {
private client: amqplib

name(): string {
return "AMQP adapter"
}
async connect(): Promise<any> {
return this._connect()
}

async send(message: GleeMessage){
return this._send(message)
}

private async initializeConnection(data: ClientData) {
const { url, auth, serverBindings, protocolVersion } = data

return amqplib.connect({
KhudaDad414 marked this conversation as resolved.
Show resolved Hide resolved
host: url.hostname,
port: url.port || 5672,
protocol: url.protocol.slice(0, url.protocol.length - 1),
username: auth.username,
password: auth.password,
keepalive: serverBindings?.keepAlive || 0,
vhost: serverBindings?.vhost || '/',
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we have these properties in server binding(or any server bindings for that matter). 🤔
https://github.com/asyncapi/bindings/tree/master/amqp

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, we don't. I was playing around trying a default connection. I'll definitely get rid of it

heartbeat: serverBindings?.heartbeat || 0,
protocolVersion,
} as any)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason we supressing typescript typechecking here?

}

_fnConsumer(msg, callback) {
const newMsg = this._createMessage(msg)
callback(true)
this.emit("message", newMsg, this.client.connection)
}

_createMessage(msg) {
const headers = {
...msg.fields,
...msg.properties,
}
return new GleeMessage({
channel: msg.topic,
headers,
payload: msg.content.toString(),
})
}

_subscribe() {
const topics = Object.keys(this.parsedAsyncAPI.channels())
return Promise.all(
topics.map((topic) => {
const operation: any = this.parsedAsyncAPI.channel(topic).publish()
const binding = operation ? operation.binding('amqp') : undefined
this.client
.createChannel()
.then((ch) => {
let connect = ch.assertExchange(
// eslint-disable-next-line sonarjs/no-duplicate-string
binding?.exchange.name || "amqp.topic",
binding?.exchange.type || "topic",
binding?.exchange || {}
)
connect = connect.then(() => {
return ch.assertQueue(
binding?.queue.name || '',
binding?.queue|| {}
)
})
.catch((error) => this.emit("error", error))

connect
.then((conQueue) => {
const queue = conQueue.queue
const channel = ch
.bindQueue(queue, binding?.exchange.name || 'amqp.topic', topic)
.then(() => {
return queue
})
.catch((error) => console.log(error))
channel.then((queue) => {
ch.consume(queue, processMsg)
})
const processMsg = (msg) => {
msg.topic = topic
// Process incoming messages and send them to fnConsumer
// Here we need to send a callback(true) for acknowledge the message or callback(false) for reject them
this._fnConsumer(msg, function (ok) {
try {
ok ? ch.ack(msg) : ch.reject(msg, true)
} catch (e) {
this.closeOnErr(e)
}
})
}
})
.catch((error) => this.emit("error", error))
})
})
)
}

_send(message: GleeMessage): Promise<void> {
return new Promise((resolve, reject) => {
const operation = this.parsedAsyncAPI
.channel(message.channel)
.subscribe()
const binding = operation ? operation.binding('amqp') : undefined
const newMessage = Buffer.from(message.payload, "utf-8")
this.client.createChannel().then((ch) => {
const ok = ch.assertExchange(binding ? binding.exchange.name : 'amqp.topic' , binding ? binding.exchange.type : 'topic' , binding ? binding.exchange : {})
return ok.then(() => {
ch.publish(binding.exchange.name, message.channel, newMessage, {}, (err) => {
if (err) {
reject(err)
this.emit("error", err)
this.client.connection.close()
}
resolve()
return ch.close()
})
})
}).finally(() => this.client.connection.close())
})
}


async _connect(): Promise<this> {
const resolved = false
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see the point of this variable.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right!

const amqpOptions: AMQPAdapterConfig = await this.resolveProtocolConfig(
"amqp"
)
const auth: AMQPAuthConfig = await this.getAuthConfig(amqpOptions.auth)
const url = new URL(this.AsyncAPIServer.url())

const protocolVersion = parseInt(
this.AsyncAPIServer.protocolVersion() || "0.9.1"
)
const serverBindings = this.AsyncAPIServer.binding('amqp')


this.client = await this.initializeConnection({
url,
auth,
serverBindings,
protocolVersion,
})

const connectClient = (): Promise<this> => {
return new Promise((resolve, reject) => {
const catchError = (error) => {
if (!resolved) return reject(error)
this.emit("error", error)
}
if (resolve) {
this.emit("connect", {
name: this.name(),
adapter: this,
connection: this.client.connection,
channels: this.getSubscribedChannels(),
})
}
this._subscribe()
.catch(catchError)
})
}

return connectClient()
}
}

export default AMQPAdapter
12 changes: 12 additions & 0 deletions src/lib/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ export interface MqttAuthConfig {
clientId?: string
}

export interface AMQPAuthConfig {
cert?: string
key?: string
username?: string
password?: string
}

export interface WsAuthConfig {
token?: string
}
Expand Down Expand Up @@ -67,6 +74,10 @@ export type MqttAdapterConfig = {
auth?: MqttAuthConfig | AuthFunction<MqttAuthConfig>
}

export type AMQPAdapterConfig = {
auth?: AMQPAuthConfig | AuthFunction<AMQPAuthConfig>
}

export type KafkaAdapterConfig = {
auth?: KafkaAuthConfig | AuthFunction<KafkaAuthConfig>
}
Expand All @@ -85,6 +96,7 @@ export type GleeConfig = {
mqtt?: MqttAdapterConfig,
http?: HttpAdapterConfig
kafka?: KafkaAdapterConfig
amqp?: AMQPAdapterConfig
}

export type GleeFunctionReturn = {
Expand Down
6 changes: 6 additions & 0 deletions src/registerAdapters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { GleeConfig, GleeClusterAdapterConfig } from './lib/index.js'
import HttpServerAdapter from './adapters/http/server.js'
import HttpClientAdapter from './adapters/http/client.js'
import KafkaAdapter from './adapters/kafka/index.js'
import AMQPAdapter from './adapters/amqp/index.js'

export default async (app: Glee, parsedAsyncAPI: AsyncAPIDocument, config: GleeConfig) => {
const serverNames = await getSelectedServerNames()
Expand Down Expand Up @@ -44,6 +45,11 @@ function registerAdapterForServer(serverName: string, server: Server, app: Glee,
})
} else if (['amqp', 'amqps'].includes(protocol)) {
// TODO: Implement AMQP support
app.addAdapter(AMQPAdapter, {
serverName,
server,
parsedAsyncAPI
});
} else if (['ws', 'wss'].includes(protocol)) {
const configWsAdapter = config?.ws?.server?.adapter
if (remoteServers && remoteServers.includes(serverName)) {
Expand Down