Skip to content

Commit

Permalink
Segment reference and reporting overhaul (#50)
Browse files Browse the repository at this point in the history
  • Loading branch information
tom-pytel authored May 14, 2021
1 parent 4011a53 commit 090072e
Show file tree
Hide file tree
Showing 20 changed files with 225 additions and 201 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ agent.start({
});
```

note that all options given (including empty/null values) will override the corresponding default values, e.g. `agent.start({ collectorAddress: '' })` will override the default value of `collectorAddress` to empty string, causing errors like `DNS resolution failed`.
Note that all options given (including empty/null values) will override the corresponding default values, e.g. `agent.start({ collectorAddress: '' })` will override the default value of `collectorAddress` to empty string, causing errors like `DNS resolution failed`.

- Use environment variables.

Expand All @@ -66,6 +66,8 @@ Environment Variable | Description | Default
| `SW_MONGO_PARAMETERS_MAX_LENGTH` | The maximum string length of mongodb parameters to log | `512` |
| `SW_AGENT_MAX_BUFFER_SIZE` | The maximum buffer size before sending the segment data to backend | `'1000'` |

Note that the various ignore options like `SW_IGNORE_SUFFIX`, `SW_TRACE_IGNORE_PATH` and `SW_HTTP_IGNORE_METHOD` as well as endpoints which are not recorded due to exceeding `SW_AGENT_MAX_BUFFER_SIZE` all propagate their ignored status downstream to any other endpoints they may call. If that endpoint is running the Node Skywalking agent then regardless of its ignore settings it will not be recorded since its upstream parent was not recorded. This allows elimination of entire trees of endpoints you are not interested in as well as eliminating partial traces if a span in the chain is ignored but calls out to other endpopints which are recorded as children of ROOT instead of the actual parent.

## Supported Libraries

There are some built-in plugins that support automatic instrumentation of NodeJS libraries, the complete lists are as follows:
Expand Down
51 changes: 0 additions & 51 deletions src/agent/Buffer.ts

This file was deleted.

28 changes: 15 additions & 13 deletions src/agent/protocol/grpc/clients/TraceReportClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import { createLogger } from '../../../../logging';
import Client from './Client';
import { TraceSegmentReportServiceClient } from '../../../../proto/language-agent/Tracing_grpc_pb';
import AuthInterceptor from '../AuthInterceptor';
import Buffer from '../../../../agent/Buffer';
import SegmentObjectAdapter from '../SegmentObjectAdapter';
import { emitter } from '../../../../lib/EventEmitter';
import Segment from '../../../../trace/context/Segment';
Expand All @@ -33,20 +32,18 @@ const logger = createLogger(__filename);

export default class TraceReportClient implements Client {
private readonly reporterClient: TraceSegmentReportServiceClient;
private readonly buffer: Buffer<Segment>;
private readonly buffer: Segment[] = [];
private timeout?: NodeJS.Timeout;

constructor() {
this.buffer = new Buffer();
this.reporterClient = new TraceSegmentReportServiceClient(
config.collectorAddress,
config.secure ? grpc.credentials.createSsl() : grpc.credentials.createInsecure(),
{ interceptors: [AuthInterceptor] },
);
emitter.on('segment-finished', (segment) => {
if (this.buffer.put(segment)) {
this.timeout?.ref();
}
this.buffer.push(segment);
this.timeout?.ref();
});
}

Expand All @@ -56,6 +53,8 @@ export default class TraceReportClient implements Client {

start() {
const reportFunction = () => {
emitter.emit('segments-sent'); // reset limiter in SpanContext

try {
if (this.buffer.length === 0) {
return;
Expand All @@ -67,15 +66,18 @@ export default class TraceReportClient implements Client {
}
});

while (this.buffer.length > 0) {
const segment = this.buffer.take();
if (segment) {
if (logger.isDebugEnabled()) {
logger.debug('Sending segment ', { segment });
}
try {
for (const segment of this.buffer) {
if (segment) {
if (logger._isDebugEnabled) {
logger.debug('Sending segment ', { segment });
}

stream.write(new SegmentObjectAdapter(segment));
stream.write(new SegmentObjectAdapter(segment));
}
}
} finally {
this.buffer.length = 0;
}

stream.end();
Expand Down
7 changes: 6 additions & 1 deletion src/core/PluginInstaller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@ const logger = createLogger(__filename);

let topModule = module;
while (topModule.parent) {
const filename = topModule.filename;

topModule = topModule.parent;

if (filename.endsWith('/skywalking-nodejs/lib/index.js')) // stop at the appropriate level in case app is being run by some other framework
break;
}

export default class PluginInstaller {
Expand Down Expand Up @@ -88,7 +93,7 @@ export default class PluginInstaller {
const pluginFile = path.join(this.pluginDir, file);

try {
plugin = require(pluginFile).default as SwPlugin;
plugin = this.require(pluginFile).default as SwPlugin;
const { isSupported, version } = this.checkModuleVersion(plugin);

if (!isSupported) {
Expand Down
1 change: 1 addition & 0 deletions src/lib/EventEmitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import Segment from '../trace/context/Segment';

declare interface SkyWalkingEventEmitter {
on(event: 'segment-finished', listener: (segment: Segment) => void): this;
on(event: 'segments-sent', listener: () => void): this;
}

class SkyWalkingEventEmitter extends EventEmitter {
Expand Down
33 changes: 24 additions & 9 deletions src/logging/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ import * as winston from 'winston';
import { Logger } from 'winston';

type LoggerLevelAware = Logger & {
isDebugEnabled(): boolean;
isInfoEnabled(): boolean;
_isDebugEnabled: boolean;
_isInfoEnabled: boolean;
};

export function createLogger(name: string): LoggerLevelAware {
const loggingLevel = process.env.SW_AGENT_LOGGING_LEVEL || (process.env.NODE_ENV !== 'production' ? 'debug' : 'info');
const loggingLevel = (process.env.SW_AGENT_LOGGING_LEVEL || 'error').toLowerCase();

const logger = winston.createLogger({
level: loggingLevel,
Expand All @@ -35,6 +35,7 @@ export function createLogger(name: string): LoggerLevelAware {
file: name,
},
});

if (process.env.NODE_ENV !== 'production' || process.env.SW_LOGGING_TARGET === 'console') {
logger.add(
new winston.transports.Console({
Expand All @@ -49,11 +50,25 @@ export function createLogger(name: string): LoggerLevelAware {
);
}

const isDebugEnabled = (): boolean => logger.levels[logger.level] >= logger.levels.debug;
const isInfoEnabled = (): boolean => logger.levels[logger.level] >= logger.levels.info;
const loggerLevel = logger.levels[logger.level];
const _isDebugEnabled = loggerLevel >= logger.levels.debug;
const _isInfoEnabled = loggerLevel >= logger.levels.info;

Object.assign(logger, {
_isDebugEnabled,
_isInfoEnabled,
});

const nop = (): void => { /* a cookie for the linter */ };

if (loggerLevel < logger.levels.debug) // we do this because logger still seems to stringify anything sent to it even if it is below the logging level, costing performance
(logger as any).debug = nop;

if (loggerLevel < logger.levels.info)
(logger as any).info = nop;

if (loggerLevel < logger.levels.warn)
(logger as any).warn = nop;

return Object.assign(logger, {
isDebugEnabled,
isInfoEnabled,
} as LoggerLevelAware);
return logger as LoggerLevelAware;
}
2 changes: 1 addition & 1 deletion src/plugins/AMQPLibPlugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class AMQPLibPlugin implements SwPlugin {
const topic = fields.exchange || '';
const queue = fields.routingKey || '';
const peer = `${this.connection.stream.remoteAddress}:${this.connection.stream.remotePort}`;
const span = ContextManager.current.newExitSpan('RabbitMQ/' + topic + '/' + queue + '/Producer', peer, Component.RABBITMQ_PRODUCER);
const span = ContextManager.current.newExitSpan('RabbitMQ/' + topic + '/' + queue + '/Producer', Component.RABBITMQ_PRODUCER);

span.start();

Expand Down
2 changes: 1 addition & 1 deletion src/plugins/AxiosPlugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class AxiosPlugin implements SwPlugin {
const method = (config.method || 'GET').toUpperCase();
const span = ignoreHttpMethodCheck(method)
? DummySpan.create()
: ContextManager.current.newExitSpan(operation, host, Component.AXIOS, Component.HTTP);
: ContextManager.current.newExitSpan(operation, Component.AXIOS, Component.HTTP);

span.start();

Expand Down
2 changes: 1 addition & 1 deletion src/plugins/HttpPlugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class HttpPlugin implements SwPlugin {
const method = arguments[url instanceof URL || typeof url === 'string' ? 1 : 0]?.method || 'GET';
const span = ignoreHttpMethodCheck(method)
? DummySpan.create()
: ContextManager.current.newExitSpan(operation, host, Component.HTTP);
: ContextManager.current.newExitSpan(operation, Component.HTTP);

if (span.depth) // if we inherited from a higher level plugin then do nothing, higher level should do all the work and we don't duplicate here
return _request.apply(this, arguments);
Expand Down
2 changes: 1 addition & 1 deletion src/plugins/MongoDBPlugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ class MongoDBPlugin implements SwPlugin {
host = db.serverConfig.s.options.servers.map((s: any) => `${s.host}:${s.port}`).join(','); // will this work for non-NativeTopology?
} catch { /* nop */ }

span = ContextManager.current.newExitSpan('MongoDB/' + operation, host, Component.MONGODB);
span = ContextManager.current.newExitSpan('MongoDB/' + operation, Component.MONGODB);

span.start();

Expand Down
2 changes: 1 addition & 1 deletion src/plugins/MongoosePlugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class MongoosePlugin implements SwPlugin {
return _original.apply(this, arguments);

const host = `${this.db.host}:${this.db.port}`;
span = ContextManager.current.newExitSpan('Mongoose/' + operation, host, Component.MONGOOSE, Component.MONGODB);
span = ContextManager.current.newExitSpan('Mongoose/' + operation, Component.MONGOOSE, Component.MONGODB);

span.start();

Expand Down
2 changes: 1 addition & 1 deletion src/plugins/MySQLPlugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class MySQLPlugin implements SwPlugin {
let query: any;

const host = `${this.config.host}:${this.config.port}`;
const span = ContextManager.current.newExitSpan('mysql/query', host, Component.MYSQL);
const span = ContextManager.current.newExitSpan('mysql/query', Component.MYSQL);

span.start();

Expand Down
2 changes: 1 addition & 1 deletion src/plugins/PgPlugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class MySQLPlugin implements SwPlugin {
let query: any;

const host = `${this.host}:${this.port}`;
const span = ContextManager.current.newExitSpan('pg/query', host, Component.POSTGRESQL);
const span = ContextManager.current.newExitSpan('pg/query', Component.POSTGRESQL);

span.start();

Expand Down
3 changes: 2 additions & 1 deletion src/trace/context/Context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import { ContextCarrier } from './ContextCarrier';
export default interface Context {
segment: Segment;
nSpans: number;
finished: boolean;

newLocalSpan(operation: string): Span;

Expand All @@ -36,7 +37,7 @@ export default interface Context {
/* if 'inherit' is specified then the span returned is marked for inheritance by an Exit span component which is
created later and calls this function with a matching 'component' value. For example Axios using an Http exit
connection will be merged into a single exit span, see those plugins for how this is done. */
newExitSpan(operation: string, peer: string, component: Component, inherit?: Component): Span;
newExitSpan(operation: string, component: Component, inherit?: Component): Span;

start(span: Span): Context;

Expand Down
16 changes: 10 additions & 6 deletions src/trace/context/ContextCarrier.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,21 @@ export class ContextCarrier extends CarrierItem {
}

isValid(): boolean {
return (
this.traceId !== undefined &&
this.segmentId !== undefined &&
return Boolean(
this.traceId?.rawId &&
this.segmentId?.rawId &&
this.spanId !== undefined &&
this.service !== undefined &&
this.endpoint !== undefined &&
!isNaN(this.spanId) &&
this.service &&
this.endpoint &&
this.clientAddress !== undefined
);
}

public static from(map: { [key: string]: string }): ContextCarrier {
public static from(map: { [key: string]: string }): ContextCarrier | undefined {
if (!map.hasOwnProperty('sw8'))
return;

const carrier = new ContextCarrier();

carrier.items.filter((item) => map.hasOwnProperty(item.key)).forEach((item) => (item.value = map[item.key]));
Expand Down
Loading

0 comments on commit 090072e

Please sign in to comment.