Skip to content

Commit

Permalink
Add AMQPLib plugin (RabbitMQ) (#34)
Browse files Browse the repository at this point in the history
  • Loading branch information
tom-pytel authored Mar 4, 2021
1 parent 0f946b9 commit 25944ed
Show file tree
Hide file tree
Showing 11 changed files with 181 additions and 8,576 deletions.
11 changes: 6 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,13 @@ There are some built-in plugins that support automatic instrumentation of NodeJS
Library | Plugin Name
| :--- | :--- |
| built-in `http` and `https` module | `http` / `https` |
| [`express`](https://expressjs.com) | `express` |
| [`axios`](https://github.com/axios/axios) | `axios` |
| [`mysql`](https://github.com/mysqljs/mysql) | `mysql` |
| [`pg`](https://github.com/brianc/node-postgres) | `pg` |
| [`Express`](https://expressjs.com) | `express` |
| [`Axios`](https://github.com/axios/axios) | `axios` |
| [`MySQL`](https://github.com/mysqljs/mysql) | `mysql` |
| [`PostgreSQL`](https://github.com/brianc/node-postgres) | `pg` |
| [`pg-cursor`](https://github.com/brianc/node-postgres) | `pg-cursor` |
| [`mongodb`](https://github.com/mongodb/node-mongodb-native) | `mongodb` |
| [`MongoDB`](https://github.com/mongodb/node-mongodb-native) | `mongodb` |
| [`RabbitMQ`](https://github.com/squaremo/amqp.node) | `amqplib` |

### Compatible Libraries

Expand Down
8,543 changes: 0 additions & 8,543 deletions package-lock.json

This file was deleted.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
"@types/node": "^14.0.11",
"@types/semver": "^7.2.0",
"@types/uuid": "^8.0.0",
"amqplib": "^0.7.0",
"axios": "^0.21.0",
"express": "^4.17.1",
"grpc-tools": "^1.10.0",
Expand Down
24 changes: 24 additions & 0 deletions src/Tag.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ export default {
dbStatementKey: 'db.statement',
dbSqlParametersKey: 'db.sql.parameters',
dbMongoParametersKey: 'db.mongo.parameters',
mqBrokerKey: 'mq.broker',
mqTopicKey: 'mq.topic',
mqQueueKey: 'mq.queue',

httpStatusCode(val: string | number | undefined): Tag {
return {
Expand Down Expand Up @@ -97,4 +100,25 @@ export default {
val: `${val}`,
} as Tag;
},
mqBroker(val: string | undefined): Tag {
return {
key: this.mqBrokerKey,
overridable: true,
val: `${val}`,
} as Tag;
},
mqTopic(val: string | undefined): Tag {
return {
key: this.mqTopicKey,
overridable: true,
val: `${val}`,
} as Tag;
},
mqQueue(val: string | undefined): Tag {
return {
key: this.mqQueueKey,
overridable: true,
val: `${val}`,
} as Tag;
},
};
123 changes: 123 additions & 0 deletions src/plugins/AMQPLibPlugin.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*!
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

import SwPlugin from '../core/SwPlugin';
import ContextManager from '../trace/context/ContextManager';
import { Component } from '../trace/Component';
import Tag from '../Tag';
import { SpanLayer } from '../proto/language-agent/Tracing_pb';
import { ContextCarrier } from '../trace/context/ContextCarrier';
import PluginInstaller from '../core/PluginInstaller';

class AMQPLibPlugin implements SwPlugin {
readonly module = 'amqplib';
readonly versions = '*';

install(installer: PluginInstaller): void {
const {BaseChannel} = installer.require('amqplib/lib/channel');

this.interceptProducer(BaseChannel);
this.interceptConsumer(BaseChannel);
}

interceptProducer(BaseChannel: any): void {
const _sendMessage = BaseChannel.prototype.sendMessage;

BaseChannel.prototype.sendMessage = function(fields: any, properties: any, content: any) {
const topic = fields.exchange || '';
const queue = fields.routingKey || '';
const peer = `${this.connection.stream.remoteAddress}:${this.connection.stream.remotePort}`;

const span = ContextManager.current.newExitSpan('RabbitMQ/' + topic + '/' + queue + '/Producer', peer).start();

try {
span.inject().items.forEach((item) => {
fields.headers[item.key] = item.value;
});

span.component = Component.RABBITMQ_PRODUCER;
span.layer = SpanLayer.MQ;
span.peer = peer;

span.tag(Tag.mqBroker((this.connection.stream.constructor.name === 'Socket' ? 'amqp://' : 'amqps://') + peer));

if (topic)
span.tag(Tag.mqTopic(topic));

if (queue)
span.tag(Tag.mqQueue(queue));

const ret = _sendMessage.call(this, fields, properties, content);

span.stop();

return ret;

} catch (e) {
span.error(e);
span.stop();

throw e;
}
}
}

interceptConsumer(BaseChannel: any): void {
const _dispatchMessage = BaseChannel.prototype.dispatchMessage;

BaseChannel.prototype.dispatchMessage = function(fields: any, message: any) {
const topic = message?.fields?.exchange || '';
const queue = message?.fields?.routingKey || '';
const carrier = ContextCarrier.from(message?.properties?.headers || {});
const span = ContextManager.current.newEntrySpan('RabbitMQ/' + topic + '/' + queue + '/Consumer', carrier).start();

try {
span.component = Component.RABBITMQ_CONSUMER;
span.layer = SpanLayer.MQ;
span.peer = `${this.connection.stream.remoteAddress}:${this.connection.stream.remotePort}`;

span.tag(Tag.mqBroker((this.connection.stream.constructor.name === 'Socket' ? 'amqp://' : 'amqps://') + span.peer));

if (topic)
span.tag(Tag.mqTopic(topic));

if (queue)
span.tag(Tag.mqQueue(queue));

if (message === null)
span.log('Cancel', true);

const ret = _dispatchMessage.call(this, fields, message);

span.stop();

return ret;

} catch (e) {
span.error(e);
span.stop();

throw e;
}
}
}
}

// noinspection JSUnusedGlobalSymbols
export default new AMQPLibPlugin();
29 changes: 14 additions & 15 deletions src/plugins/MongoDBPlugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,32 +187,31 @@ class MongoDBPlugin implements SwPlugin {
this.interceptOperation(Collection, 'estimatedDocumentCount', dropFunc);
this.interceptOperation(Collection, 'countDocuments', findOneFunc);

this.interceptOperation(Collection, 'createIndex', deleteFunc);
this.interceptOperation(Collection, 'createIndexes', deleteFunc);
this.interceptOperation(Collection, 'ensureIndex', deleteFunc);
this.interceptOperation(Collection, 'dropIndex', deleteFunc);
this.interceptOperation(Collection, 'dropIndexes', dropFunc);
this.interceptOperation(Collection, 'dropAllIndexes', dropFunc);
this.interceptOperation(Collection, 'reIndex', dropFunc);

this.interceptOperation(Collection, 'indexes', dropFunc);
this.interceptOperation(Collection, 'indexExists', deleteFunc);
this.interceptOperation(Collection, 'indexInformation', dropFunc);
this.interceptOperation(Collection, 'listIndexes', dropFunc); // cursor

this.interceptOperation(Collection, 'rename', deleteFunc);
this.interceptOperation(Collection, 'drop', dropFunc);


// TODO?

// createIndex
// createIndexes
// dropIndex
// dropIndexes
// dropAllIndexes
// ensureIndex
// indexExists
// indexInformation
// indexes
// listIndexes
// reIndex

// stats
// options
// isCapped
// initializeUnorderedBulkOp
// initializeOrderedBulkOp
// watch


// NODO:

// group
Expand Down Expand Up @@ -243,7 +242,7 @@ class MongoDBPlugin implements SwPlugin {
host = '???';
}

span = ContextManager.current.newExitSpan('/' + this.s.namespace.db, host).start(); // or this.s.db.databaseName
span = ContextManager.current.newExitSpan('MongoDB/' + operation, host).start();

try {
span.component = Component.MONGODB;
Expand Down
2 changes: 2 additions & 0 deletions src/trace/Component.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ export class Component {
static readonly MONGODB = new Component(9);
static readonly POSTGRESQL = new Component(22);
static readonly HTTP_SERVER = new Component(49);
static readonly RABBITMQ_PRODUCER = new Component(52);
static readonly RABBITMQ_CONSUMER = new Component(53);
static readonly EXPRESS = new Component(4002);
static readonly AXIOS = new Component(4005);

Expand Down
18 changes: 10 additions & 8 deletions src/trace/span/Span.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,17 +132,19 @@ export default abstract class Span {
return this;
}

error(error: Error): this {
this.errored = true;
log(key: string, val: any): this {
this.logs.push({
timestamp: new Date().getTime(),
items: [
{
key: 'Stack',
val: error.stack,
} as LogItem,
],
items: [{key, val: `${val}`}]
} as Log);

return this;
}

error(error: Error): this {
this.errored = true;
this.log('Stack', error.stack || '')

return this;
}

Expand Down
2 changes: 1 addition & 1 deletion tests/plugins/mongodb/expected.data.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ segmentItems:
segments:
- segmentId: not null
spans:
- operationName: /admin
- operationName: MongoDB/findOne
operationId: 0
parentSpanId: 0
spanId: 1
Expand Down
2 changes: 0 additions & 2 deletions tests/plugins/pg/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import * as http from 'http';
import agent from '../../../src';

process.env.SW_AGENT_LOGGING_LEVEL = 'ERROR';

agent.start({
serviceName: 'client',
maxBufferSize: 1000,
Expand Down
2 changes: 0 additions & 2 deletions tests/plugins/pg/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import * as http from 'http';
import {Client} from 'pg';
import agent from '../../../src';

process.env.SW_AGENT_LOGGING_LEVEL = 'ERROR';

agent.start({
serviceName: 'server',
maxBufferSize: 1000,
Expand Down

0 comments on commit 25944ed

Please sign in to comment.