Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Add jaeger tracing #96

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ Configuration is provided via environment variables:
|K8S| service running as kubernetes service |no|
|K8S_RTPENGINE_SERVICE_NAME| rtpengine service name(required for K8S) |no|
|K8S_FEATURE_SERVER_SERVICE_NAME| feature server service name(required for K8S) |no|
|JAMBONES_OTEL_ENABLED| set to 1 to enable otel tracing |no|
|JAMBONES_OTEL_SERVICE_NAME| app name |no|
|OTEL_EXPORTER_JAEGER_ENDPOINT| jaeger endpoint url 'http://127.0.0.1:14268/api/traces' |no|

##### drachtio server location
```
Expand Down
21 changes: 21 additions & 0 deletions app.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ assert.ok(process.env.DRACHTIO_SECRET, 'missing DRACHTIO_SECRET env var');
assert.ok(process.env.JAMBONES_TIME_SERIES_HOST, 'missing JAMBONES_TIME_SERIES_HOST env var');
assert.ok(process.env.JAMBONES_NETWORK_CIDR || process.env.K8S, 'missing JAMBONES_NETWORK_CIDR env var');


const Srf = require('drachtio-srf');
const srf = new Srf('sbc-inbound');
const opts = Object.assign({
Expand Down Expand Up @@ -77,6 +78,21 @@ const {getRtpEngine, setRtpEngines} = require('@jambonz/rtpengine-utils')([], lo
dtmfListenPort: process.env.DTMF_LISTEN_PORT || 22224,
protocol: ngProtocol
});

const {version} = require('./package.json');
const {JambonzTracer} = require('@jambonz/tracing');
const {tracer} = new JambonzTracer({
version,
name: process.env.JAMBONES_OTEL_SERVICE_NAME || 'jambonz-sbc-inbound',
enabled: process.env.JAMBONES_OTEL_ENABLED,
jaegerHost: process.env.OTEL_EXPORTER_JAEGER_AGENT_HOST,
jaegerEndpoint: process.env.OTEL_EXPORTER_JAEGER_ENDPOINT,
zipkinUrl: process.env.OTEL_EXPORTER_ZIPKIN_URL,
collectorUrl: process.env.OTEL_EXPORTER_COLLECTOR_URL,
logLevel: process.env.JAMBONES_LOGLEVEL
});


srf.locals = {...srf.locals,
stats,
writeCallCount,
Expand Down Expand Up @@ -105,6 +121,9 @@ srf.locals = {...srf.locals,
incrKey,
decrKey,
retrieveSet
},
otel: {
tracer
}
};
const {
Expand All @@ -125,6 +144,7 @@ const activeCallIds = srf.locals.activeCallIds;

const {
initLocals,
createRootSpan,
handleSipRec,
identifyAccount,
checkLimits,
Expand Down Expand Up @@ -194,6 +214,7 @@ if (process.env.NODE_ENV === 'test') {
/* install middleware */
srf.use('invite', [
initLocals,
createRootSpan,
handleSipRec,
identifyAccount,
checkLimits,
Expand Down
22 changes: 18 additions & 4 deletions lib/call-session.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const {
SdpWantsSDES,
nudgeCallCounts,
roundTripTime,
parseConnectionIp
parseConnectionIp,
} = require('./utils');

const {forwardInDialogRequests} = require('drachtio-fn-b2b-sugar');
Expand Down Expand Up @@ -125,7 +125,7 @@ class CallSession extends Emitter {
unsubscribeDTMF,
subscribeRequest,
subscribeAnswer,
unsubscribe
unsubscribe,
} = engine;
this.offer = offer;
this.answer = answer;
Expand All @@ -141,8 +141,12 @@ class CallSession extends Emitter {
this.subscribeAnswer = subscribeAnswer;
this.unsubscribe = unsubscribe;

const {rootSpan} = this.req.locals;
const childSpan = rootSpan.startChildSpan('sbc:inbound-connect', {});

const featureServer = await this.getFeatureServer();
if (!featureServer) {
childSpan.endWithError('No available feature servers');
this.logger.info('No available feature servers, rejecting call!');
const tags = ['accepted:no', 'sipStatus:480', `originator:${this.req.locals.originator}`];
this.stats.increment('sbc.terminations', tags);
Expand Down Expand Up @@ -187,6 +191,7 @@ class CallSession extends Emitter {
'direction:inbound', 'command:offer', `rtpengine:${this.rtpengineIp}`]);
this.logger.debug({opts, response, rtt, rtpengine: this.rtpengineIp}, 'response from rtpengine to offer');
if ('ok' !== response.result) {
childSpan.endWithError('rtpengine offer failed');
this.logger.error({}, `rtpengine offer failed with ${JSON.stringify(response)}`);
throw new Error('rtpengine failed: answer');
}
Expand All @@ -196,7 +201,8 @@ class CallSession extends Emitter {
'To': this.req.get('To'),
'X-Account-Sid': this.req.locals.account_sid,
'X-CID': this.req.get('Call-ID'),
'X-Forwarded-For': `${this.req.source_address}`
'X-Forwarded-For': `${this.req.source_address}`,
...childSpan.getSIPTracingPropagationHeaders()
};
if (this.privateSipAddress) headers = {...headers, Contact: `<sip:${this.privateSipAddress}>`};

Expand Down Expand Up @@ -233,7 +239,10 @@ class CallSession extends Emitter {
}
}

if (this.req.canceled) throw new Error('call canceled');
if (this.req.canceled) {
childSpan.endWithError('call canceled');
throw new Error('call canceled');
}

// now send the INVITE in towards the feature servers
debug(`sending INVITE to ${proxy} with ${uri}`);
Expand Down Expand Up @@ -289,18 +298,23 @@ class CallSession extends Emitter {
this.logger.info('call connected successfully to feature server');
debug('call connected successfully to feature server');
this._setHandlers({uas, uac});
childSpan.end();
return;
} catch (err) {
this.rtpEngineResource.destroy().catch((err) => this.logger.info({err}, 'Error destroying rtpe after failure'));
this.activeCallIds.delete(this.req.get('Call-ID'));
if (err instanceof SipError) {
childSpan.endWithError(`sipStatus:${err.status}`);
const tags = ['accepted:no', `sipStatus:${err.status}`, `originator:${this.req.locals.originator}`];
this.stats.increment('sbc.terminations', tags);
this.logger.info(`call failed to connect to feature server with ${err.status}`);
this.emit('failed');
}
else if (err.message !== 'call canceled') {
childSpan.endWithError('unexpected error routing inbound call');
this.logger.error(err, 'unexpected error routing inbound call');
} else {
childSpan.endWithError('call error');
}
this.srf.endSession(this.req);
}
Expand Down
Loading