import amqp from 'amqplib' import autoBind from 'auto-bind2' import createError from 'http-errors' export class MS { constructor(exchangeName, options, log) { this.exchangeName = exchangeName this.options = options || {} this.isProduction = (process.env.NODE_ENV === 'production') this.log = log autoBind(this) } async connect(amqpUri) { this.connection = await amqp.connect(amqpUri) this.connection.on('error', (err) => { this.log.error(`${err}. Shutting down service.`) process.exit(-1) }) this.channel = await this.connection.createChannel() this.channel.prefetch(1) // Only process one message at a time return this } async listen(obj) { let handlers = {} let typeNames = '' for (const key of Object.getOwnPropertyNames(obj.constructor.prototype)) { const val = obj[key] if (key !== 'constructor' && typeof val === 'function') { handlers[key] = val if (!typeNames) { typeNames = `'${key}'` } else { typeNames += ', ' + `'${key}'` } } } this.handlers = handlers let ok = await this.channel.assertExchange(this.exchangeName, 'fanout', { durable: !!this.options.durable }) const q = await this.channel.assertQueue('', {exclusive: true}) this.log.info(`Waiting for '${this.exchangeName}' exchange ${typeNames} messages in queue '${q.queue}'`) await this.channel.bindQueue(q.queue, this.exchangeName, '') this.channel.consume(q.queue, this.consumeMessage) } consumeMessage(msg) { const { type, appId, replyTo, correlationId } = msg.properties const s = msg.content.toString() const content = JSON.parse(s) this.log.info(`Received '${type}' from '${appId}', ${s}`) const sendReply = (replyContent) => { if (content.passback) { replyContent = { ...replyContent, passback: content.passback } } this.channel.sendToQueue(replyTo, new Buffer(JSON.stringify(replyContent)), { correlationId, appId, contentType: 'application/json', timestamp: Date.now(), type: `replyTo.${type}` }) } this.dispatchMessage(type, content).then((res) => { sendReply({ data: res }) this.channel.ack(msg) this.log.info(`Processed '${type}' (correlation id '${correlationId}')`) }).catch((err) => { this.log.error(`Failed to process '${type}' (correlation id '${correlationId}')`) if (!this.isProduction) { // So we can see what happened console.error(err) } sendReply({ error: { name: err.name, message: err.message, problems: err.problems } }) this.channel.ack(msg) }) } dispatchMessage(type, content) { const handler = this.handlers[type] if (handler) { return handler(content) } else { return Promise.reject(createError.BadRequest(`Unknown message type '${type}'`)) } } // Used for intra-service requests async request(exchangeName, msgType, msg, correlationId) { const channel = await this.connection.createChannel() await channel.checkExchange(exchangeName) await channel.publish(exchangeName, '', new Buffer(JSON.stringify(msg)), { type: msgType, contentType: 'application/json', timestamp: Date.now(), correlationId, appId: this.appId, replyTo: this.replyQueueName }) await channel.close() return correlationId } }