-
Notifications
You must be signed in to change notification settings - Fork 63
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
6 changed files
with
319 additions
and
7 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,297 @@ | ||
/*! | ||
* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
* | ||
*/ | ||
|
||
import SwPlugin from '../core/SwPlugin'; | ||
import ContextManager from '../trace/context/ContextManager'; | ||
import { Component } from '../trace/Component'; | ||
import Tag from '../Tag'; | ||
import { SpanLayer } from '../proto/language-agent/Tracing_pb'; | ||
import PluginInstaller from '../core/PluginInstaller'; | ||
import agentConfig from '../config/AgentConfig'; | ||
|
||
class MongoDBPlugin implements SwPlugin { | ||
readonly module = 'mongodb'; | ||
readonly versions = '*'; | ||
|
||
Cursor: any; | ||
|
||
// Experimental method to determine proper end time of cursor DB operation, we stop the span when the cursor is closed. | ||
// Problematic because other exit spans may be created during processing, for this reason we do not .resync() this | ||
// span to the span list until it is closed. If the cursor is never closed then the span will not be sent. | ||
|
||
maybeHookCursor(span: any, cursor: any): boolean { | ||
if (!(cursor instanceof this.Cursor)) | ||
return false; | ||
|
||
cursor.on('error', (err: any) => { | ||
span.resync(); // this may precede 'close' .resync() but its fine | ||
span.error(err); | ||
span.stop(); | ||
}); | ||
|
||
cursor.on('close', () => { | ||
span.resync(); // cursor does not .resync() until it is closed because maybe other exit spans will be opened during processing | ||
span.stop(); | ||
}); | ||
|
||
return true; | ||
} | ||
|
||
install(installer: PluginInstaller): void { | ||
const plugin = this; | ||
const Collection = installer.require('mongodb/lib/collection'); | ||
this.Cursor = installer.require('mongodb/lib/cursor'); | ||
|
||
const wrapCallback = (span: any, args: any[], idx: number): boolean => { | ||
const callback = args.length > idx && typeof args[idx = args.length - 1] === 'function' ? args[idx] : null; | ||
|
||
if (!callback) | ||
return false; | ||
|
||
args[idx] = function(this: any, error: any, result: any) { | ||
if (error || !plugin.maybeHookCursor(span, result)) { | ||
span.resync(); | ||
|
||
if (error) | ||
span.error(error); | ||
|
||
span.stop(); | ||
} | ||
|
||
return callback.call(this, error, result); | ||
} | ||
|
||
return true; | ||
}; | ||
|
||
const stringify = (params: any) => { | ||
if (params === undefined) | ||
return ''; | ||
|
||
let str = JSON.stringify(params); | ||
|
||
if (str.length > agentConfig.mongo_parameters_max_length) | ||
str = str.slice(0, agentConfig.mongo_parameters_max_length) + ' ...'; | ||
|
||
return str; | ||
} | ||
|
||
const insertFunc = function(this: any, operation: string, span: any, args: any[]): boolean { // args = [doc(s), options, callback] | ||
span.tag(Tag.dbStatement(`${this.s.namespace.collection}.${operation}()`)); | ||
|
||
if (agentConfig.mongo_trace_parameters) | ||
span.tag(Tag.dbMongoParameters(stringify(args[0]))); | ||
|
||
return wrapCallback(span, args, 1); | ||
}; | ||
|
||
const deleteFunc = function(this: any, operation: string, span: any, args: any[]): boolean { // args = [filter, options, callback] | ||
span.tag(Tag.dbStatement(`${this.s.namespace.collection}.${operation}(${stringify(args[0])})`)); | ||
|
||
return wrapCallback(span, args, 1); | ||
}; | ||
|
||
const updateFunc = function(this: any, operation: string, span: any, args: any[]): boolean { // args = [filter, update, options, callback] | ||
span.tag(Tag.dbStatement(`${this.s.namespace.collection}.${operation}(${stringify(args[0])})`)); | ||
|
||
if (agentConfig.mongo_trace_parameters) | ||
span.tag(Tag.dbMongoParameters(stringify(args[1]))); | ||
|
||
return wrapCallback(span, args, 2); | ||
}; | ||
|
||
const findOneFunc = function(this: any, operation: string, span: any, args: any[]): boolean { // args = [query, options, callback] | ||
span.tag(Tag.dbStatement(`${this.s.namespace.collection}.${operation}(${typeof args[0] !== 'function' ? stringify(args[0]) : ''})`)); | ||
|
||
return wrapCallback(span, args, 0); | ||
}; | ||
|
||
const findAndRemoveFunc = function(this: any, operation: string, span: any, args: any[]): boolean { // args = [query, sort, options, callback] | ||
span.tag(Tag.dbStatement(`${this.s.namespace.collection}.${operation}(${stringify(args[0])}${typeof args[1] !== 'function' && args[1] !== undefined ? ', ' + stringify(args[1]) : ''})`)); | ||
|
||
return wrapCallback(span, args, 1); | ||
}; | ||
|
||
const findAndModifyFunc = function(this: any, operation: string, span: any, args: any[]): boolean { // args = [query, sort, doc, options, callback] | ||
let params = stringify(args[0]); | ||
|
||
if (typeof args[1] !== 'function' && args[1] !== undefined) { | ||
params += ', ' + stringify(args[1]); | ||
|
||
if (typeof args[2] !== 'function' && args[2] !== undefined) { | ||
if (agentConfig.mongo_trace_parameters) | ||
span.tag(Tag.dbMongoParameters(stringify(args[2]))); | ||
} | ||
} | ||
|
||
span.tag(Tag.dbStatement(`${this.s.namespace.collection}.${operation}(${params})`)); | ||
|
||
return wrapCallback(span, args, 1); | ||
}; | ||
|
||
const mapReduceFunc = function(this: any, operation: string, span: any, args: any[]): boolean { // args = [map, reduce, options, callback] | ||
span.tag(Tag.dbStatement(`${this.s.namespace.collection}.${operation}(${args[0]}, ${args[1]})`)); | ||
|
||
return wrapCallback(span, args, 2); | ||
}; | ||
|
||
const dropFunc = function(this: any, operation: string, span: any, args: any[]): boolean { // args = [options, callback] | ||
span.tag(Tag.dbStatement(`${this.s.namespace.collection}.${operation}()`)); | ||
|
||
return wrapCallback(span, args, 0); | ||
}; | ||
|
||
this.interceptOperation(Collection, 'insert', insertFunc); | ||
this.interceptOperation(Collection, 'insertOne', insertFunc); | ||
this.interceptOperation(Collection, 'insertMany', insertFunc); | ||
this.interceptOperation(Collection, 'save', insertFunc); | ||
this.interceptOperation(Collection, 'deleteOne', deleteFunc); | ||
this.interceptOperation(Collection, 'deleteMany', deleteFunc); | ||
this.interceptOperation(Collection, 'remove', deleteFunc); | ||
this.interceptOperation(Collection, 'removeOne', deleteFunc); | ||
this.interceptOperation(Collection, 'removeMany', deleteFunc); | ||
this.interceptOperation(Collection, 'update', updateFunc); | ||
this.interceptOperation(Collection, 'updateOne', updateFunc); | ||
this.interceptOperation(Collection, 'updateMany', updateFunc); | ||
this.interceptOperation(Collection, 'replaceOne', updateFunc); | ||
this.interceptOperation(Collection, 'find', findOneFunc); // cursor | ||
this.interceptOperation(Collection, 'findOne', findOneFunc); | ||
this.interceptOperation(Collection, 'findOneAndDelete', deleteFunc); | ||
this.interceptOperation(Collection, 'findOneAndReplace', updateFunc); | ||
this.interceptOperation(Collection, 'findOneAndUpdate', updateFunc); | ||
this.interceptOperation(Collection, 'findAndRemove', findAndRemoveFunc); | ||
this.interceptOperation(Collection, 'findAndModify', findAndModifyFunc); | ||
|
||
this.interceptOperation(Collection, 'bulkWrite', insertFunc); | ||
this.interceptOperation(Collection, 'mapReduce', mapReduceFunc); | ||
this.interceptOperation(Collection, 'aggregate', deleteFunc); // cursor | ||
this.interceptOperation(Collection, 'distinct', findAndRemoveFunc); | ||
this.interceptOperation(Collection, 'count', findOneFunc); | ||
this.interceptOperation(Collection, 'estimatedDocumentCount', dropFunc); | ||
this.interceptOperation(Collection, 'countDocuments', findOneFunc); | ||
|
||
this.interceptOperation(Collection, 'rename', deleteFunc); | ||
this.interceptOperation(Collection, 'drop', dropFunc); | ||
|
||
|
||
// TODO? | ||
|
||
// createIndex | ||
// createIndexes | ||
// dropIndex | ||
// dropIndexes | ||
// dropAllIndexes | ||
// ensureIndex | ||
// indexExists | ||
// indexInformation | ||
// indexes | ||
// listIndexes | ||
// reIndex | ||
|
||
// stats | ||
// options | ||
// isCapped | ||
// initializeUnorderedBulkOp | ||
// initializeOrderedBulkOp | ||
// watch | ||
|
||
|
||
// NODO: | ||
|
||
// group | ||
// parallelCollectionScan | ||
// geoHaystackSearch | ||
} | ||
|
||
interceptOperation(Collection: any, operation: string, operationFunc: any): void { | ||
const plugin = this; | ||
const _original = Collection.prototype[operation]; | ||
|
||
if (!_original) | ||
return; | ||
|
||
Collection.prototype[operation] = function(...args: any[]) { | ||
let ret: any; | ||
let host: string; | ||
|
||
try { | ||
host = this.s.db.serverConfig.s.options.servers.map((s: any) => `${s.host}:${s.port}`).join(','); // will this work for non-NativeTopology? | ||
} catch { | ||
host = '???'; | ||
} | ||
|
||
const span = ContextManager.current.newExitSpan('/' + this.s.namespace.db, host).start(); // or this.s.db.databaseName | ||
|
||
try { | ||
span.component = Component.MONGODB; | ||
span.layer = SpanLayer.DATABASE; | ||
span.peer = host; | ||
|
||
span.tag(Tag.dbType('MongoDB')); | ||
span.tag(Tag.dbInstance(`${this.s.namespace.db}`)); | ||
|
||
const hasCB = operationFunc.call(this, operation, span, args); | ||
|
||
ret = _original.apply(this, args); | ||
|
||
if (!hasCB) { | ||
if (plugin.maybeHookCursor(span, ret)) { | ||
// NOOP | ||
|
||
} else if (!ret || typeof ret.then !== 'function') { // generic Promise check | ||
span.stop(); // no callback passed in and no Promise or Cursor returned, play it safe | ||
|
||
return ret; | ||
|
||
} else { | ||
ret = ret.then( | ||
(res: any) => { | ||
span.resync(); | ||
span.stop(); | ||
|
||
return res; | ||
}, | ||
|
||
(err: any) => { | ||
span.resync(); | ||
span.error(err); | ||
span.stop(); | ||
|
||
return Promise.reject(err); | ||
} | ||
); | ||
} | ||
} | ||
|
||
} catch (e) { | ||
span.error(e); | ||
span.stop(); | ||
|
||
throw e; | ||
} | ||
|
||
span.async(); | ||
|
||
return ret; | ||
}; | ||
} | ||
} | ||
|
||
// noinspection JSUnusedGlobalSymbols | ||
export default new MongoDBPlugin(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters