Initial commit
This commit is contained in:
114
server/src/message-service/MS.js
Normal file
114
server/src/message-service/MS.js
Normal file
@@ -0,0 +1,114 @@
|
||||
import amqp from 'amqplib'
|
||||
import autoBind from 'auto-bind2'
|
||||
import createError from 'http-errors'
|
||||
|
||||
export class MS {
|
||||
constructor(exchangeName, options, log) {
|
||||
this.exchangeName = exchangeName
|
||||
this.options = options || {}
|
||||
this.isProduction = (process.env.NODE_ENV === 'production')
|
||||
this.log = log
|
||||
autoBind(this)
|
||||
}
|
||||
|
||||
async connect(amqpUri) {
|
||||
this.connection = await amqp.connect(amqpUri)
|
||||
this.connection.on('error', () => {
|
||||
this.log.error(`RabbitMQ has gone, shutting down service`)
|
||||
process.exit(-1)
|
||||
})
|
||||
|
||||
this.channel = await this.connection.createChannel()
|
||||
this.channel.prefetch(1) // Only process one message at a time
|
||||
return this
|
||||
}
|
||||
|
||||
async listen(obj) {
|
||||
let handlers = {}
|
||||
let typeNames = ''
|
||||
|
||||
for (const key of Object.getOwnPropertyNames(obj.constructor.prototype)) {
|
||||
const val = obj[key]
|
||||
|
||||
if (key !== 'constructor' && typeof val === 'function') {
|
||||
handlers[key] = val
|
||||
|
||||
if (!typeNames) {
|
||||
typeNames = `'${key}'`
|
||||
} else {
|
||||
typeNames += ', ' + `'${key}'`
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
this.handlers = handlers
|
||||
|
||||
let ok = await this.channel.assertExchange(this.exchangeName, 'fanout', { durable: !!this.options.durable })
|
||||
const q = await this.channel.assertQueue('', {exclusive: true})
|
||||
|
||||
this.log.info(`Waiting for '${this.exchangeName}' exchange ${typeNames} messages in queue '${q.queue}'`)
|
||||
await this.channel.bindQueue(q.queue, this.exchangeName, '')
|
||||
this.channel.consume(q.queue, this.consumeMessage)
|
||||
}
|
||||
|
||||
consumeMessage(msg) {
|
||||
const { type, appId, replyTo, correlationId } = msg.properties
|
||||
const s = msg.content.toString()
|
||||
const content = JSON.parse(s)
|
||||
|
||||
this.log.info(`Received '${type}' from '${appId}', ${s}`)
|
||||
const sendReply = (replyContent) => {
|
||||
if (content.passback) {
|
||||
replyContent = { ...replyContent, passback: content.passback }
|
||||
}
|
||||
this.channel.sendToQueue(replyTo, new Buffer(JSON.stringify(replyContent)), {
|
||||
correlationId,
|
||||
appId,
|
||||
contentType: 'application/json',
|
||||
timestamp: Date.now(),
|
||||
type: `replyTo.${type}`
|
||||
})
|
||||
}
|
||||
|
||||
this.dispatchMessage(type, content).then((res) => {
|
||||
sendReply({ data: res })
|
||||
this.channel.ack(msg)
|
||||
this.log.info(`Processed '${type}' (correlation id '${correlationId}')`)
|
||||
}).catch((err) => {
|
||||
this.log.error(`Failed to process '${type}' (correlation id '${correlationId}')`)
|
||||
if (!this.isProduction) {
|
||||
// So we can see what happened
|
||||
console.error(err)
|
||||
}
|
||||
sendReply({ error: { name: err.name, message: err.message, problems: err.problems } })
|
||||
this.channel.ack(msg)
|
||||
})
|
||||
}
|
||||
|
||||
dispatchMessage(type, content) {
|
||||
const handler = this.handlers[type]
|
||||
|
||||
if (handler) {
|
||||
return handler(content)
|
||||
} else {
|
||||
return Promise.reject(createError.BadRequest(`Unknown message type '${type}'`))
|
||||
}
|
||||
}
|
||||
|
||||
// Used for intra-service requests, such as when generating packages
|
||||
async request(exchangeName, msgType, msg, correlationId) {
|
||||
const channel = await this.connection.createChannel()
|
||||
await channel.checkExchange(exchangeName)
|
||||
await channel.publish(exchangeName, '', new Buffer(JSON.stringify(msg)), {
|
||||
type: msgType,
|
||||
contentType: 'application/json',
|
||||
timestamp: Date.now(),
|
||||
correlationId,
|
||||
appId: this.appId,
|
||||
replyTo: this.replyQueueName
|
||||
})
|
||||
await channel.close()
|
||||
|
||||
return correlationId
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user