Skip to content

Commit

Permalink
[#206] Support grpc built-in retry header
Browse files Browse the repository at this point in the history
  • Loading branch information
feelform committed Aug 14, 2024
1 parent 4b1ca89 commit 690612e
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 25 deletions.
11 changes: 3 additions & 8 deletions lib/client/grpc-data-sender.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,13 @@ const GrpcClientSideStream = require('./grpc-client-side-stream')
const Scheduler = require('../utils/scheduler')
const makeAgentInformationMetadataInterceptor = require('./grpc/make-agent-information-metadata-interceptor')
const socketIdInterceptor = require('./grpc/socketid-interceptor')
const grpcBuiltInRetryHeaderInterceptor = require('./grpc/grpc-built-in-retry-header-interceptor')
const CallArgumentsBuilder = require('./call-arguments-builder')
const OptionsBuilder = require('./grpc/options-builder')

// AgentInfoSender.java
// refresh daily
const DEFAULT_AGENT_INFO_REFRESH_INTERVAL_MS = 24 * 60 * 60 * 1000
// retry every 3 seconds
const DEFAULT_AGENT_INFO_SEND_INTERVAL_MS = 3 * 1000
// retry 3 times per attempt
const DEFAULT_MAX_TRY_COUNT_PER_ATTEMPT = 3

// in GrpcTransportConfig.java
const DEFAULT_METADATA_RETRY_MAX_COUNT = 3
const DEFAULT_METADATA_RETRY_DELAY_MILLIS = 1000

class GrpcDataSender {
constructor(collectorIp, collectorTcpPort, collectorStatPort, collectorSpanPort, agentInfo, config) {
Expand Down Expand Up @@ -70,6 +63,7 @@ class GrpcDataSender {
const agentBuilder = new OptionsBuilder()
.addInterceptor(makeAgentInformationMetadataInterceptor(this.agentInfo))
.addInterceptor(socketIdInterceptor)
.addInterceptor(grpcBuiltInRetryHeaderInterceptor)

if (config && config.grpcServiceConfig && typeof config.grpcServiceConfig.getAgentServiceConfig === 'function') {
agentBuilder.setGrpcServiceConfig(config.grpcServiceConfig.getAgentServiceConfig())
Expand All @@ -80,6 +74,7 @@ class GrpcDataSender {
initializeMetadataClients(collectorIp, collectorTcpPort, config) {
const metadataBuilder = new OptionsBuilder()
.addInterceptor(makeAgentInformationMetadataInterceptor(this.agentInfo))
.addInterceptor(grpcBuiltInRetryHeaderInterceptor)

if (config && config.grpcServiceConfig && typeof config.grpcServiceConfig.getMetadataServiceConfig === 'function') {
metadataBuilder.setGrpcServiceConfig(config.grpcServiceConfig.getMetadataServiceConfig())
Expand Down
21 changes: 21 additions & 0 deletions lib/client/grpc/grpc-built-in-retry-header-interceptor.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/**
* Pinpoint Node.js Agent
* Copyright 2020-present NAVER Corp.
* Apache License v2.0
*/

'use strict'

const grpc = require('@grpc/grpc-js')
const InterceptingCall = grpc.InterceptingCall

const grpcBuiltInRetryHeaderInterceptor = function (options, nextCall) {
return new InterceptingCall(nextCall(options), {
start: function (metadata, listener, next) {
metadata.add('grpc.built-in.retry', 'true')
next(metadata, listener, next)
},
})
}

module.exports = grpcBuiltInRetryHeaderInterceptor
79 changes: 62 additions & 17 deletions test/client/grpc-unary-rpc.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const SqlUidMetaData = require('../../lib/client/sql-uid-meta-data')
let callCount = 0
let afterCount = 0
let callRequests = []
let callMetadata = []
// https://github.com/agreatfool/grpc_tools_node_protoc_ts/blob/v5.0.0/examples/src/grpcjs/client.ts
const service = (call, callback) => {
const succeedOnRetryAttempt = call.metadata.get('succeed-on-retry-attempt')
Expand All @@ -35,6 +36,7 @@ const service = (call, callback) => {
result.setSuccess(true)
result.setMessage(`succeed-on-retry-attempt: ${succeedOnRetryAttempt[0]}, grpc-previous-rpc-attempts: ${previousAttempts[0]}`)
callRequests.push(call.request)
callMetadata.push(call.metadata)
callback(null, result)
} else {
const statusCode = call.metadata.get('respond-with-status')
Expand All @@ -58,6 +60,7 @@ function beforeSpecificOne(port, one, serviceConfig) {
afterCount = 0
config.clear()
callRequests = []
callMetadata = []
const actualConfig = config.getConfig({ 'grpc.service_config': serviceConfig })
actualConfig.collectorIp = 'localhost'
actualConfig.collectorTcpPort = port
Expand Down Expand Up @@ -149,9 +152,6 @@ test('AgentInfo with retries enabled but not configured', (t) => {
dataSender = beforeSpecificOne(port, AgentInfoOnlyDataSource)

let callArguments = new CallArgumentsBuilder(function (error, response) {
if (error) {
t.fail(error)
}
t.true(response.getSuccess(), '1st PResult.success is true')
}).build()
dataSender.sendAgentInfo(agentInfo(), callArguments)
Expand All @@ -163,9 +163,6 @@ test('AgentInfo with retries enabled but not configured', (t) => {
dataSender.sendAgentInfo(agentInfo(), callArguments)

callArguments = new CallArgumentsBuilder(function (error, response) {
if (error) {
t.fail(error)
}
t.true(response.getSuccess(), '3st PResult.success is true')
t.end()
}).build()
Expand All @@ -190,9 +187,6 @@ test('AgentInfo with retries enabled and configured', (t) => {
dataSender = beforeSpecificOne(port, AgentInfoOnlyDataSource)

let callArguments = new CallArgumentsBuilder(function (error, response) {
if (error) {
t.fail(error)
}
t.true(response.getSuccess(), '1st PResult.success is true')
t.equal(response.getMessage(), 'succeed-on-retry-attempt: undefined, grpc-previous-rpc-attempts: undefined', '1st PResult.message is "succeed-on-retry-attempt: undefined, grpc-previous-rpc-attempts: undefined"')
afterOne(t)
Expand All @@ -209,9 +203,6 @@ test('AgentInfo with retries enabled and configured', (t) => {
dataSender.sendAgentInfo(agentInfo(), callArguments)

callArguments = new CallArgumentsBuilder(function (error, response) {
if (error) {
t.fail(error)
}
t.true(response.getSuccess(), '3st PResult.success is true')
t.equal(response.getMessage(), 'succeed-on-retry-attempt: undefined, grpc-previous-rpc-attempts: undefined', '3st PResult.message is "succeed-on-retry-attempt: undefined, grpc-previous-rpc-attempts: undefined"')
afterOne(t)
Expand All @@ -237,10 +228,10 @@ test('sendApiMetaInfo retry', (t) => {

let actual = new ApiMetaInfo(1, 'ApiDescriptor', MethodType.DEFAULT)
let callArguments = new CallArgumentsBuilder(function (error, response) {
if (error) {
t.fail(error)
}
t.true(response.getSuccess(), '1st PResult.success is true')

const metadata = callMetadata[0]
t.deepEqual(metadata.get('grpc.built-in.retry'), ['true'], '1st metadata.get("grpc.built-in.retry") is "true"')
afterOne(t)
}).build()
dataSender.sendApiMetaInfo(actual, callArguments)
Expand All @@ -254,6 +245,9 @@ test('sendApiMetaInfo retry', (t) => {
t.equal(callRequests[1].getApiid(), 2, '2nd callRequests[1].apiId is 2')
t.equal(callRequests[1].getApiinfo(), 'ApiDescriptor2', '2nd callRequests[1].apiInfo is "ApiDescriptor2"')
t.equal(callRequests[1].getType(), MethodType.DEFAULT, '2nd callRequests[1].type is MethodType.DEFAULT')

const metadata = callMetadata[1]
t.deepEqual(metadata.get('grpc.built-in.retry'), ['true'], '2nd metadata.get("grpc.built-in.retry") is "true"')
afterOne(t)
}).setMetadata('succeed-on-retry-attempt', '2')
.setMetadata('respond-with-status', '14')
Expand All @@ -275,7 +269,8 @@ test('sendApiMetaInfo lineNumber and location', (t) => {
let dataSender
server.bindAsync('localhost:0', grpc.ServerCredentials.createInsecure(), (error, port) => {
dataSender = beforeSpecificOne(port, MetaInfoOnlyDataSource)
const apiMetaInfoActual = ApiMetaInfo.create(new MethodDescriptorBuilder()

let apiMetaInfoActual = ApiMetaInfo.create(new MethodDescriptorBuilder()
.setApiId(1)
.setClassName('Router')
.setMethodName('get')
Expand All @@ -284,7 +279,6 @@ test('sendApiMetaInfo lineNumber and location', (t) => {
.setLocation('node_modules/express/lib/application.js')
.build()
)

let callArguments = new CallArgumentsBuilder(function (error, response) {
t.true(response.getSuccess(), '1st PResult.success is true')

Expand All @@ -294,10 +288,59 @@ test('sendApiMetaInfo lineNumber and location', (t) => {
t.equal(data.getType(), 1400, 'type')
t.equal(data.getLine(), 481, 'line')
t.equal(data.getLocation(), 'node_modules/express/lib/application.js', 'location')

const metadata = callMetadata[0]
t.deepEqual(metadata.get('grpc.built-in.retry'), ['true'], '1st metadata.get("grpc.built-in.retry") is "true"')
afterOne(t)
}).build()
dataSender.sendApiMetaInfo(apiMetaInfoActual, callArguments)

apiMetaInfoActual = ApiMetaInfo.create(new MethodDescriptorBuilder()
.setApiId(2)
.setClassName('Router')
.setMethodName('post')
.setType(1400)
.setLineNumber(482)
.setLocation('node_modules/express/lib/application.js')
.build()
)
callArguments = new CallArgumentsBuilder(function (error, response) {
t.true(response.getSuccess(), '2nd PResult.success is true')

const data = callRequests[1]
t.equal(data.getApiid(), 2, 'apiId')
t.equal(data.getApiinfo(), 'Router.post', 'Apiinfo')
t.equal(data.getType(), 1400, 'type')
t.equal(data.getLine(), 482, 'line')
t.equal(data.getLocation(), 'node_modules/express/lib/application.js', 'location')

const metadata = callMetadata[1]
t.deepEqual(metadata.get('grpc.built-in.retry'), ['true'], '2nd metadata.get("grpc.built-in.retry") is "true"')
afterOne(t)
}).setMetadata('succeed-on-retry-attempt', '2')
.setMetadata('respond-with-status', '14')
.build()
dataSender.sendApiMetaInfo(apiMetaInfoActual, callArguments)

apiMetaInfoActual = ApiMetaInfo.create(new MethodDescriptorBuilder()
.setApiId(3)
.setClassName('Router')
.setMethodName('put')
.setType(1400)
.setLineNumber(483)
.setLocation('node_modules/express/lib/application.js')
.build()
)
callArguments = new CallArgumentsBuilder(function (error, response) {
t.equal(error.code, 14, `3rd error.code is 14`)
t.equal(error.details, 'Failed on retry 2', `3rd error.details is "Failed on retry 2"`)
t.equal(error.message, '14 UNAVAILABLE: Failed on retry 2', `3rd error.message is "14 UNAVAILABLE: Failed on retry 2"`)
afterOne(t)
}).setMetadata('succeed-on-retry-attempt', '3')
.setMetadata('respond-with-status', '14')
.build()
dataSender.sendApiMetaInfo(apiMetaInfoActual, callArguments)

t.teardown(() => {
dataSender.close()
server.forceShutdown()
Expand Down Expand Up @@ -392,6 +435,7 @@ test('sendSqlUidMetaData retry', (t) => {
t.true(response.getSuccess(), '1st PResult.success is true')
t.deepEqual(callRequests[0].getSqluid(), parsingResult.getId(), '1st callRequests[0].getSqlid() is parsingResult.getId()')
t.equal(callRequests[0].getSql(), parsingResult.getSql(), '1st callRequests[0].getSql() is parsingResult.getSql()')
t.deepEqual(callMetadata[0].get('grpc.built-in.retry'), ['true'], '1st metadata.get("grpc.built-in.retry") is "true"')
afterOne(t)
}).build()
dataSender.sendSqlUidMetaData(actual, callArguments)
Expand All @@ -402,6 +446,7 @@ test('sendSqlUidMetaData retry', (t) => {
t.true(response.getSuccess(), '2nd PResult.success is true')
t.deepEqual(callRequests[1].getSqluid(), parsingResult2.getId(), '2nd callRequests[1].getSqlid() is parsingResult.getId()')
t.equal(callRequests[1].getSql(), parsingResult2.getSql(), '2nd callRequests[1].getSql() is parsingResult.getSql()')
t.deepEqual(callMetadata[1].get('grpc.built-in.retry'), ['true'], '2nd metadata.get("grpc.built-in.retry") is "true"')
afterOne(t)
}).setMetadata('succeed-on-retry-attempt', '2')
.setMetadata('respond-with-status', '14')
Expand Down

0 comments on commit 690612e

Please sign in to comment.