diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessorTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessorTest.java index 5bbb982688aa8..e4ae928777fcc 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessorTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessorTest.java @@ -695,4 +695,34 @@ void updateDispositionDoesNotAddCredit() { verify(link1).addCredits(eq(PREFETCH)); verify(link1).updateDisposition(eq(lockToken), eq(deliveryState)); } + + @Test + void updateDispositionClosesLinkOnTimeout() { + // Arrange + final ServiceBusReceiveLinkProcessor processor = Flux.create(sink -> sink.next(link1)) + .subscribeWith(linkProcessor); + + final AmqpException amqpException = new AmqpException(true, AmqpErrorCondition.TIMEOUT_ERROR, + "Test-timeout-error", new AmqpErrorContext("test-namespace")); + when(retryPolicy.calculateRetryDelay(amqpException, 1)).thenReturn(Duration.ofSeconds(1)); + + final String lockToken = "lockToken"; + final DeliveryState deliveryState = mock(DeliveryState.class); + + when(link1.updateDisposition(eq(lockToken), eq(deliveryState))).thenReturn(Mono.error(amqpException)); + when(link1.closeAsync()).thenReturn(Mono.empty()); + + // Act & Assert + StepVerifier.create(processor.updateDisposition(lockToken, deliveryState)) + .expectErrorSatisfies(error -> assertSame(amqpException, error)) + .verify(); + processor.cancel(); + + verify(link1).updateDisposition(eq(lockToken), eq(deliveryState)); + verify(link1, times(1)).closeAsync(); + + assertTrue(processor.isTerminated()); + assertFalse(processor.hasError()); + assertNull(processor.getError()); + } }