Skip to content

Commit

Permalink
Merge pull request #2602 from murgatroid99/grpc-js_pick_first_sticky_…
Browse files Browse the repository at this point in the history
…tf_reresolve

grpc-js: pick_first: fix happy eyeballs and reresolution in sticky TF mode
  • Loading branch information
murgatroid99 authored Oct 19, 2023
2 parents ebc2c3e + d465f83 commit 845abca
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 7 deletions.
2 changes: 1 addition & 1 deletion packages/grpc-js/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@grpc/grpc-js",
"version": "1.9.6",
"version": "1.9.7",
"description": "gRPC Library for Node - pure JS implementation",
"homepage": "https://grpc.io/",
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",
Expand Down
30 changes: 25 additions & 5 deletions packages/grpc-js/src/load-balancer-pick-first.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,12 @@ export class PickFirstLoadBalancer implements LoadBalancer {
*/
private stickyTransientFailureMode = false;

/**
* Indicates whether we called channelControlHelper.requestReresolution since
* the last call to updateAddressList
*/
private requestedResolutionSinceLastUpdate = false;

/**
* The most recent error reported by any subchannel as it transitioned to
* TRANSIENT_FAILURE.
Expand Down Expand Up @@ -216,15 +222,28 @@ export class PickFirstLoadBalancer implements LoadBalancer {
}
}

private requestReresolution() {
this.requestedResolutionSinceLastUpdate = true;
this.channelControlHelper.requestReresolution();
}

private maybeEnterStickyTransientFailureMode() {
if (this.stickyTransientFailureMode) {
if (!this.allChildrenHaveReportedTF()) {
return;
}
if (!this.allChildrenHaveReportedTF()) {
if (!this.requestedResolutionSinceLastUpdate) {
/* Each time we get an update we reset each subchannel's
* hasReportedTransientFailure flag, so the next time we get to this
* point after that, each subchannel has reported TRANSIENT_FAILURE
* at least once since then. That is the trigger for requesting
* reresolution, whether or not the LB policy is already in sticky TF
* mode. */
this.requestReresolution();
}
if (this.stickyTransientFailureMode) {
return;
}
this.stickyTransientFailureMode = true;
this.channelControlHelper.requestReresolution();
for (const { subchannel } of this.children) {
subchannel.startConnecting();
}
Expand Down Expand Up @@ -256,7 +275,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
if (newState !== ConnectivityState.READY) {
this.removeCurrentPick();
this.calculateAndReportNewState();
this.channelControlHelper.requestReresolution();
this.requestReresolution();
}
return;
}
Expand All @@ -283,7 +302,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {

private startNextSubchannelConnecting(startIndex: number) {
clearTimeout(this.connectionDelayTimeout);
if (this.triedAllSubchannels || this.stickyTransientFailureMode) {
if (this.triedAllSubchannels) {
return;
}
for (const [index, child] of this.children.entries()) {
Expand Down Expand Up @@ -382,6 +401,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
this.currentSubchannelIndex = 0;
this.children = [];
this.triedAllSubchannels = false;
this.requestedResolutionSinceLastUpdate = false;
}

updateAddressList(
Expand Down
90 changes: 89 additions & 1 deletion packages/grpc-js/test/test-pick-first.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ function updateStateCallBackForExpectedStateSequence(
) {
const actualStateSequence: ConnectivityState[] = [];
let lastPicker: Picker | null = null;
let finished = false;
return (connectivityState: ConnectivityState, picker: Picker) => {
if (finished) {
return;
}
// Ignore duplicate state transitions
if (
connectivityState === actualStateSequence[actualStateSequence.length - 1]
Expand All @@ -60,6 +64,7 @@ function updateStateCallBackForExpectedStateSequence(
if (
expectedStateSequence[actualStateSequence.length] !== connectivityState
) {
finished = true;
done(
new Error(
`Unexpected state ${
Expand All @@ -69,10 +74,12 @@ function updateStateCallBackForExpectedStateSequence(
)}]`
)
);
return;
}
actualStateSequence.push(connectivityState);
lastPicker = picker;
if (actualStateSequence.length === expectedStateSequence.length) {
finished = true;
done();
}
};
Expand All @@ -90,7 +97,7 @@ describe('Shuffler', () => {
});
});

describe('pick_first load balancing policy', () => {
describe.only('pick_first load balancing policy', () => {
const config = new PickFirstLoadBalancingConfig(false);
let subchannels: MockSubchannel[] = [];
const baseChannelControlHelper: ChannelControlHelper = {
Expand Down Expand Up @@ -462,6 +469,87 @@ describe('pick_first load balancing policy', () => {
});
});
});
it('Should request reresolution every time each child reports TF', done => {
let reresolutionRequestCount = 0;
const targetReresolutionRequestCount = 3;
const currentStartState = ConnectivityState.IDLE;
const channelControlHelper = createChildChannelControlHelper(
baseChannelControlHelper,
{
createSubchannel: (subchannelAddress, subchannelArgs) => {
const subchannel = new MockSubchannel(
subchannelAddressToString(subchannelAddress),
currentStartState
);
subchannels.push(subchannel);
return subchannel;
},
updateState: updateStateCallBackForExpectedStateSequence(
[ConnectivityState.CONNECTING, ConnectivityState.TRANSIENT_FAILURE],
err => setImmediate(() => {
assert.strictEqual(reresolutionRequestCount, targetReresolutionRequestCount);
done(err);
})
),
requestReresolution: () => {
reresolutionRequestCount += 1;
}
}
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList([{ host: 'localhost', port: 1 }], config);
process.nextTick(() => {
subchannels[0].transitionToState(ConnectivityState.TRANSIENT_FAILURE);
process.nextTick(() => {
pickFirst.updateAddressList([{ host: 'localhost', port: 2 }], config);
process.nextTick(() => {
subchannels[1].transitionToState(ConnectivityState.TRANSIENT_FAILURE);
process.nextTick(() => {
pickFirst.updateAddressList([{ host: 'localhost', port: 3 }], config);
process.nextTick(() => {
subchannels[2].transitionToState(ConnectivityState.TRANSIENT_FAILURE);
});
});
});
});
});
});
it('Should request reresolution if the new subchannels are already in TF', done => {
let reresolutionRequestCount = 0;
const targetReresolutionRequestCount = 3;
const currentStartState = ConnectivityState.TRANSIENT_FAILURE;
const channelControlHelper = createChildChannelControlHelper(
baseChannelControlHelper,
{
createSubchannel: (subchannelAddress, subchannelArgs) => {
const subchannel = new MockSubchannel(
subchannelAddressToString(subchannelAddress),
currentStartState
);
subchannels.push(subchannel);
return subchannel;
},
updateState: updateStateCallBackForExpectedStateSequence(
[ConnectivityState.TRANSIENT_FAILURE],
err => setImmediate(() => {
assert.strictEqual(reresolutionRequestCount, targetReresolutionRequestCount);
done(err);
})
),
requestReresolution: () => {
reresolutionRequestCount += 1;
}
}
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList([{ host: 'localhost', port: 1 }], config);
process.nextTick(() => {
pickFirst.updateAddressList([{ host: 'localhost', port: 2 }], config);
process.nextTick(() => {
pickFirst.updateAddressList([{ host: 'localhost', port: 2 }], config);
});
});
});
describe('Address list randomization', () => {
const shuffleConfig = new PickFirstLoadBalancingConfig(true);
it('Should pick different subchannels after multiple updates', done => {
Expand Down

0 comments on commit 845abca

Please sign in to comment.