-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ServiceBus] Cache UpdateDispositionWorkItem Mono #22317
Conversation
@@ -250,7 +250,7 @@ protected Message decodeDelivery(Delivery delivery) { | |||
sink.error(new AmqpException(false, "updateDisposition failed while dispatching to Reactor.", | |||
error, handler.getErrorContext(receiver))); | |||
} | |||
}); | |||
}).cache().then(Mono.empty()); // cache because closeAsync use `when` to subscribe this Mono again. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need .then(Mono.empty())
? If so, then use then()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch. Mono.empty() is not required here.
@@ -362,7 +362,7 @@ private void cleanupWorkItems() { | |||
}); | |||
} | |||
|
|||
private void completeWorkItem(String lockToken, Delivery delivery, MonoSink<Void> sink, Throwable error) { | |||
private void completeWorkItem(String lockToken, Delivery delivery, MonoSink<Object> sink, Throwable error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need it to be a MonoSink of Object? I thought you could cache to .<Void>cache()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the .cache()
call turns the sink into a MonoSink<Object>
from MonoSink<Void>
.
final Mono<Void> result = Mono.create(sink -> {
workItem.start(sink);
try {
provider.getReactorDispatcher().invoke(() -> {
unsettled.disposition(deliveryState);
pendingUpdates.put(lockToken, workItem);
});
} catch (IOException error) {
sink.error(new AmqpException(false, "updateDisposition failed while dispatching to Reactor.",
error, handler.getErrorContext(receiver)));
}
}).cache().then();
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can cast the create operator to Void
as shown below.
final Mono<Void> result = Mono.<Void>create(sink -> {
workItem.start(sink);
try {
provider.getReactorDispatcher().invoke(() -> {
unsettled.disposition(deliveryState);
pendingUpdates.put(lockToken, workItem);
});
} catch (IOException error) {
sink.error(new AmqpException(false, "updateDisposition failed while dispatching to Reactor.",
error, handler.getErrorContext(receiver)));
}
}).cache();
Machinelearningservices microsoft.machine learning services 2022 12 01 preview (Azure#21761) * Adds base for updating Microsoft.MachineLearningServices from version preview/2022-10-01-preview to version 2022-12-01-preview * Updates readme * Updates API version in new specs and examples * Add Dec API Registries Swagger (Azure#21419) * add december registries swagger + examples * add status code 202 in examples * fix 202 examples * fixes * fixes * fix * add 202 back in for put/patch Co-authored-by: Komal Yadav <komalyadav@microsoft.com> * remove location (Azure#21430) Co-authored-by: Komal Yadav <komalyadav@microsoft.com> * remove readonly flag on schedules property for CI (Azure#21653) Co-authored-by: Naman Agarwal <naagarw@microsoft.com> * add missing workspace properties (Azure#21725) * December preview updating mfe.json specs (Azure#21510) * December preview updating mfe.json specs * MFE Dec 2022 Preview API - Adding logbase * MFE 2022-12-01-preview swagger spec model validation fix * MFE 2022-12-01-preview swagger spec model validation fix, add missing location * MFE 2022-12-01-preview swagger spec model validation - typo fix * MFE 2022-12-01-preview swagger spec model validation - fix api version in automljob example * MFE 2022-12-01-preview swagger spec model validation - fix for multiselectenabled error * MFE 2022-12-01-preview swagger spec model validation - fix for multiselectenabled error * Fix for 1006 - RemovedDefinition (RecurrenceTrigger,CronTrigger) (Azure#21822) * fix ReadonlyPropertyChanged of MLC (Azure#21814) Co-authored-by: Bingchen Li <bingchenli@microsoft.com> * fixed custom-words conflict (Azure#21829) * fix custom-words conflict merge (Azure#21830) * example fix (INVALID_REQUEST_PARAMETER) (Azure#21832) Co-authored-by: Ivaliy Ivanov <ivaliyivanov@Ivaliys-MacBook-Air.local> * example fix, use correct api preview version - (INVALID_REQUEST_PARAMETER) (Azure#21833) Co-authored-by: Ivaliy Ivanov <ivaliyivanov@Ivaliys-MacBook-Air.local> * Revert breaking change for MLC swagger 2022-12-01-preview (Azure#21885) Co-authored-by: Bingchen Li <bingchenli@microsoft.com> * Revert Connection Category back to enum. (Azure#21939) * revert provisioning state change (Azure#21940) * remove body (Azure#21978) Co-authored-by: Komal Yadav <komalyadav@microsoft.com> * Addressed comments, added x-ms-long-running-operation to a patch call (Azure#22005) * Addressed comments, added x-ms-long-running-operation to a patch call * fix examples for patch - remove body * fixed formatting * Ivalbert fix patch2 (Azure#22006) * Addressed comments, added x-ms-long-running-operation to a patch call * fix examples for patch - remove body * fixed formatting * fixed formatting * Updated custom words (Azure#22262) * Fixed prettier errors (Azure#22237) * fixed examples for LRO_RESPONSE_HEADER check (Azure#22293) * fixed examples for LRO_RESPONSE_HEADER check (Azure#22294) * Example fix - OBJECT_MISSING_REQUIRED_PROPERTY - Missing required property: triggerType (Azure#22317) --------- Co-authored-by: Komal Yadav <23komal.yadav23@gmail.com> Co-authored-by: Komal Yadav <komalyadav@microsoft.com> Co-authored-by: Naman Agarwal <namanag16@gmail.com> Co-authored-by: Naman Agarwal <naagarw@microsoft.com> Co-authored-by: ZhidaLiu <zhili@microsoft.com> Co-authored-by: libc16 <88697960+libc16@users.noreply.github.com> Co-authored-by: Bingchen Li <bingchenli@microsoft.com> Co-authored-by: Ivaliy Ivanov <ivaliyivanov@Ivaliys-MacBook-Air.local>
fixes #22299
ServiceBusReactorReceiver.updateDisposition
does not return or throw error if try timeout is long (from about 50 seconds but no exact number, usually 10 seconds is safe) and network is down.This is because when try timeout is long enough,
ServiceBusReactorReceiver.updateDisposition()
hasn't timed out while another thread closes theServiceBusReactorReceiver
. In thecloseAsync
method, the Mono object of work items of typeUpdateDispositionWorkItem
is subscribed again (usingwhen
). The dual subscription causes the first Mono's sink doesn't complete. Adding a.cache()
to the Mono will ensure the sink object isn't created for a second time.