Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core-amqp][event-hubs] Fix "too much pending tasks" by making acquire lock calls cancellable #14844

Merged
merged 22 commits into from
Apr 19, 2021

Conversation

chradek
Copy link
Contributor

@chradek chradek commented Apr 12, 2021

Fixes Event Hubs "Too much pending tasks" error: #14606

Summary

This PR fixes the "too much pending tasks" error by updating the acquire('lock') operations to support timeouts and cancellation. Previously, we were essentially wrapping our acquire lock calls in timeouts, but were unable to actually clear the pending task. So while the overall operation would fail with a timeout error, the lock was not released and over time tasks would queue up faster than the lock could be released.

Detailed scenario

  1. EventHubConsumerClient.subscribe() called.
  2. The EventProcessor._runLoopWithLoadBalancing method is invoked, which starts a loop that continues until the subscription or EventHubConsumerClient is closed.
  3. At the start of each loop iteration, we attempt to get the partitionIds.
  4. If there isn't already a $management request/response link open on the connection, we wait to acquire a lock to initialize the request/response link:
    return defaultLock.acquire(this.managementLock, () => {
    return this._init();
    });
  5. Once a lock is acquired, if the request/response link is still not created, attempt to create it.
  6. One of these steps is to negotiate a claim (this is for auth):
    await this._context.readyToOpenLink();
    await this._negotiateClaim();
  7. At this point, we'll wait to acquire another lock to create the CBS session (for auth):
    await defaultLock.acquire(this._context.cbsSession.cbsLock, () => {
    return this._context.cbsSession.init();
    });

Now, we have a timeout around acquiring the managementLock (used in step 4), and if we timeout, we will try again until our retries are exhausted. However, our timeout wraps the defaultLock.acquire call, so it doesn't actually release the lock.

Under normal "transient" conditions, we'd expect to see the negotiateClaim call timeout when trying to create a connection to the service, the managementLock gets released, and assuming the network conditions have returned to normal, the next attempt to initialize the management request/response link would succeed. However in the scenario we're testing, we're unable to recreate the connection for an extended period of time. What's interesting is that we're not seeing ECONNREFUSED errors as one might expect. Instead it appears from the client-side that it's attempting to connect to a blackhole that never responds.

So, another wrinkle is that eventually once the getPartitionIds call throws a timeout error, the EventProcessor loop will sleep for some duration and then move on to the next iteration, starting with calling getPartitionIds again. This leads to more attempts to acquire the management lock, which means overtime we eventually hit the limit.

New lock type

@azure/core-amqp currently uses the AsyncLock from async-lock to implement locking. At first glance, it does support setting a timeout for acquiring a lock. However there are 2 issues:

  1. The error thrown is a generic Error so we'd have to inspect the error message and convert it to an OperationTimeoutError so that it could be retryable. Since error messages aren't guaranteed to be stable, this poses some risk.
  2. When a pending task reaches its timeout, it isn't actually removed from the queue. Instead, once it acquires the lock it immediately yields to the next item in the queue. This means we would still see the "too much pending tasks" even using the timeout.

AsyncLock also doesn't support cancellation via something like abortSignal, which is something we need to support in all our async operations.

Thus, I've added CancellableAsyncLock to @azure/core-amqp. It supports both cancellation (which throws an AbortError) and timeouts (which throws an OperationTimeoutError). We should actually be able to get rid of AsyncLock now, but since that's technically a breaking change I currently export both types.

@ghost ghost added the Azure.Core label Apr 12, 2021
@chradek chradek force-pushed the eh-fix-management-pending branch from 8e1055e to 17a2b84 Compare April 16, 2021 02:04
@chradek chradek changed the title [core-amqp] adds defaultCancellableLock [core-amqp][event-hubs] Fix "too much pending tasks" by making acquire lock calls cancellable Apr 16, 2021
@chradek chradek marked this pull request as ready for review April 16, 2021 17:05
@chradek
Copy link
Contributor Author

chradek commented Apr 16, 2021

Looks like core-rest is failing and will fail until #14899 is merged.

Copy link
Member

@richardpark-msft richardpark-msft left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you disagree, let's chat, but I think we should be very aggressive about making the abortSignal and timeout values be non-optional. They can be undefined, but I want to make sure it's a compile time issue to not have passed it since (in our stacks) it's probably an error.

sdk/core/core-amqp/review/core-amqp.api.md Outdated Show resolved Hide resolved
@@ -351,6 +362,9 @@ export function createSasTokenProvider(data: {
sharedAccessSignature: string;
} | NamedKeyCredential | SASCredential): SasTokenProvider;

// @public
export const defaultCancellableLock: CancellableAsyncLock;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is one case where we should just leave the instantiation and usage to the calling package, rather than declaring a default global variable.

I know it's made to work cleanly, etc... but is there a specific reason we'd need to do this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it's just a matter of having 1 of these in core-amqp, or 1 in core-amqp, service-bus, and event-hubs. I don't have a strong opinion either way since core-amqp is kind of a weird 'public but not really but it is' package.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arent we replacing the use of defaultLock with this defaultCancellableLock? If so, I would prefer to maintain the same pattern for now. We can always investigate the caller instantiating in a separate task.

sdk/core/core-amqp/src/util/lock.ts Outdated Show resolved Hide resolved
sdk/core/core-amqp/test/lock.spec.ts Show resolved Hide resolved
sdk/core/core-amqp/test/utils/utils.ts Outdated Show resolved Hide resolved
sdk/eventhub/event-hubs/src/eventHubReceiver.ts Outdated Show resolved Hide resolved
{
connectionId: {
enumerable: true,
get: () => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is very cool and makes me think we should always make these properties dynamic (ie, Service Bus too).

In a separate PR I was thinking it'd be nice to just pass in a logger (and maybe it's time to make it so the logger properly prints out the header).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't follow how the change here helps us... Was there something in the PR description I missed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah sorry, I didn't mention this in the PR. Basically, before this change, sometimes in our logs we'd see the wrong connection-id when referring to a receiver because we copied the value of the id. With this change, we're always referencing the id from our context object. Now our logs are accurate.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed this while trying to troubleshoot the too many pending tasks issue.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wowza, should we follow this format everywhere else we log the connection id? Or only certain scenarios need this? If yes for either question, can you log an issue to investigate all other places where we may have to change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should look into following this method anywhere we would copy the connectionId value today. Issue created:
#14923

sdk/core/core-amqp/review/core-amqp.api.md Outdated Show resolved Hide resolved
sdk/core/core-amqp/src/util/lock.ts Outdated Show resolved Hide resolved
sdk/eventhub/event-hubs/src/linkEntity.ts Outdated Show resolved Hide resolved
@richardpark-msft
Copy link
Member

(approved after discussion with @chradek and he's handling all the contentious discussions)

@@ -407,16 +407,13 @@ export class ManagementClient extends LinkEntity {
const initOperationStartTime = Date.now();

try {
await waitForTimeoutOrAbortOrResolve({
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Now that we are not using waitForTimeoutOrAbortOrResolve, you can delete the file timeoutAbortSignalUtils.ts file. Or do that in a separate PR

sdk/core/core-amqp/src/cbs.ts Outdated Show resolved Hide resolved
sdk/eventhub/event-hubs/src/eventHubReceiver.ts Outdated Show resolved Hide resolved
sdk/eventhub/event-hubs/src/linkEntity.ts Outdated Show resolved Hide resolved
sdk/eventhub/event-hubs/src/linkEntity.ts Outdated Show resolved Hide resolved
@chradek
Copy link
Contributor Author

chradek commented Apr 19, 2021

/azp run js - event-hubs - tests

@azure-pipelines
Copy link

Azure Pipelines successfully started running 1 pipeline(s).

@chradek chradek merged commit 4027106 into Azure:master Apr 19, 2021
jay-most pushed a commit to jay-most/azure-sdk-for-js that referenced this pull request Apr 26, 2021
…e lock calls cancellable (Azure#14844)

* [core-amqp] adds defaultCancellableLock

* [core-amqp] make cbs acquireLock call cancellable

* [core-amqp] update CancellableAsyncLock to throw OperationTimeoutError if acquireTimeoutInMs is reached

* [core-amqp] fix eslint errors

* [event-hubs] add timeouts to acquire calls

* pass abortSignal to init/negotiateClaim methods

* update pnpm-lock.yaml

* [core-amqp] make flaky tests not flaky

* [core-amqp] make fields required

* [core-amqp] AcquireOptions -> AcquireLockProperties, add defaultCancellableLock back, remove unneeded code from allSettled helper util

* [core-amqp] parameter rename cleanup

* [event-hubs] add timeout to link initialization calls

* update pnpm-lock.yaml

* [event-hubs] improve timeout to cbsSession.negotiateClaimLock

* [core-amqp] add isOpen() to CbsClient

* [event-hubs] remove unneeded AbortError branch from event hub receiver

* [core-amqp] fix flaky test in node 15

* [event-hubs] use cbs isOpen()

* [core-amqp] add timeout to CbsClient init and negotiateClaim methods

* [event-hubs] pass timeout through to CbsClient init and negotiateClaim methods
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants