Add tiered send/receive recovery to azure-core-amqp and azure-messaging-servicebus #48460
EldertGrootenboer wants to merge 21 commits into Azure:main
Add RecoveryKind error classification and recovery-aware retry to azure-core-amqp. Apply tiered recovery to all Service Bus sender, receiver, and session paths. On LINK errors: dispose stale link/session, retry with fresh resources. On CONNECTION errors: force-close the cached connection, retry with fresh connection. Includes quick-retry optimization and didQuickRetry deduplication. Fixes Azure#44688
Add azure-core-amqp to AdditionalModules in ci.yml and trigger paths so CI builds it from source alongside servicebus. Update pom.xml to reference 2.12.0-beta.1 with current tag (cross-module PR -- to be revisited after azure-core-amqp is released to Maven Central). Note: uses current tag temporarily; reviewer to confirm release sequencing.
Copilot reviewed 13 out of 13 changed files in this pull request and generated 6 comments.
...ing-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java (outdated)
...ing-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java (outdated)
...azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnectionCache.java
sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java (outdated)
...ing-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java (outdated)
...g-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java
Address 6 comments from Copilot PR review on Azure#48460:
- sendFluxInternal: wrap only batchList (link acquisition) with withRetryAndRecovery, not the full sendOperation. Wrapping the outer operation caused the user-provided Flux to be re-subscribed on each retry and nested retries with sendBatchInternal.
- scheduleMessageInternal: change getSendLink to getSendLinkWithRetry so schedule operations get the same tiered recovery as other send paths.
- ReactorConnectionCache.forceCloseConnection: use connection.dispose() instead of closeAsync() so isDisposed() returns true synchronously. This ensures cacheInvalidateIf invalidates the cached reference immediately on the next get() call.
- RetryUtil: replace Math.random() with ThreadLocalRandom.current() to eliminate shared RNG contention and improve testability.
- performRecovery comment: remove contradictory 'happens organically' comment that conflicted with the explicit forceCloseConnection() call.
- ServiceBusReceiverAsyncClient: add error handler to the subscribe() call inside the LINK recovery callback so failures are logged and do not silently leak.
Copilot reviewed 13 out of 13 changed files in this pull request and generated 7 comments.
...aging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionAcquirer.java
...saging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java
...ing-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java (outdated)
sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RecoveryKind.java
sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RecoveryKind.java
sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java
sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RecoveryKindTest.java
- T1: ServiceBusSessionAcquirer - return delay after forceCloseConnection for RecoveryKind.CONNECTION so session acquisition retries after connection recovery
- T2: ServiceBusSessionManager - merge LINK and CONNECTION into a single delay branch so CONNECTION errors retry instead of falling through to publishError
- T3: ServiceBusSenderAsyncClient - narrow withRetryAndRecovery to wrap only link acquisition (getSendLink); messages.collect() moved outside retry boundary to avoid re-subscribing user Flux on retry
- T4: RecoveryKind - reclassify OPERATION_CANCELLED from NONE to LINK because core-amqp raises it when AMQP layer unexpectedly aborts or disconnects, which requires link recovery (e.g. ReceiverUnsettledDeliveries remote Released outcome)
- T5: RecoveryKind - reclassify RESOURCE_LIMIT_EXCEEDED from FATAL to NONE to match ReactorSender.isGeneralSendError() which treats it as retriable alongside SERVER_BUSY and TIMEOUT
- T6: RetryUtilTest - add four tests for createRetryWithRecovery covering FATAL no-retry, LINK recovery callback, CONNECTION recovery callback, and retry budget exhaustion; use virtual time for backoff-delay scenarios
- T7: RecoveryKindTest - rename operationCancelled test to expect LINK result, add new test asserting RESOURCE_LIMIT_EXCEEDED classifies as NONE
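The reclassifications in T4 and T5 can be pictured with a small stand-alone sketch. RecoveryKindSketch and its string-based classify are hypothetical stand-ins for illustration; the real RecoveryKind.classify(Throwable) works on AmqpErrorCondition and covers many more conditions than the two modelled here.

```java
import java.util.Locale;

/** Hypothetical, simplified stand-in for the PR's RecoveryKind (not the azure-core-amqp class). */
enum RecoveryKindSketch {
    NONE, LINK, CONNECTION, FATAL;

    /**
     * Classifies an illustrative condition name. Only the two conditions discussed
     * in T4 and T5 are modelled; anything else is rejected rather than guessed.
     */
    static RecoveryKindSketch classify(String condition) {
        if (condition == null) {
            throw new IllegalArgumentException("condition must not be null");
        }
        switch (condition.toUpperCase(Locale.ROOT)) {
            case "OPERATION_CANCELLED":
                // T4: the AMQP layer aborted or disconnected, so the link needs recreating.
                return LINK;
            case "RESOURCE_LIMIT_EXCEEDED":
                // T5: retriable without tearing down any resources.
                return NONE;
            default:
                throw new IllegalArgumentException("Not modelled in this sketch: " + condition);
        }
    }
}
```

A caller would then branch on the result, for example disposing the link only when classify returns LINK.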
Copilot reviewed 14 out of 14 changed files in this pull request and generated 2 comments.
sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java (outdated)
...g-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java (outdated)
- T8: RetryUtil - clamp final jittered delay to maxDelay so retryOptions are
consistently respected. Previously jitter was applied after the pre-jitter cap
which could produce a delay exceeding retryOptions.getMaxDelay(). Also cap
baseDelay to maxDelay in FIXED mode (FIXED previously used baseDelay without
checking against maxDelay, unlike the EXPONENTIAL path).
- T9: ServiceBusReceiverAsyncClient - fix misleading log message in the LINK/
CONNECTION recovery callback. The error handler on connectionProcessor.subscribe
fires only when obtaining the connection fails (not when removeSession fails,
since it returns a boolean). Renamed to "Error obtaining connection during {}
recovery." Also log the boolean result of removeSession at VERBOSE level to
confirm whether a stale session was actually evicted.
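The ordering issue described in T8 can be sketched as follows: cap the base delay first, apply backoff and jitter, then clamp once more so the final value never exceeds the maximum. BackoffSketch, clampedDelay, and the symmetric jitter formula are assumptions made for illustration; the actual jitter calculation in RetryUtil is not shown in this conversation.

```java
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical helper illustrating the "clamp after jitter" ordering from T8. */
final class BackoffSketch {
    private BackoffSketch() {
    }

    static Duration clampedDelay(Duration baseDelay, Duration maxDelay, int attempt,
        double jitterFactor, boolean exponential) {
        final long maxMs = maxDelay.toMillis();
        // Cap the base delay up front; this also covers the FIXED mode case.
        final long baseMs = Math.min(baseDelay.toMillis(), maxMs);

        long delayMs = baseMs;
        if (exponential) {
            // Double per attempt, saturating at maxMs instead of overflowing.
            final long factor = 1L << Math.min(Math.max(attempt, 0), 30);
            delayMs = baseMs > maxMs / factor ? maxMs : baseMs * factor;
        }

        if (jitterFactor > 0) {
            // Assumed jitter model: scale by a random value in [1 - jitterFactor, 1 + jitterFactor).
            final double jitter = ThreadLocalRandom.current().nextDouble(-jitterFactor, jitterFactor);
            delayMs = (long) (delayMs * (1.0 + jitter));
        }

        // Final clamp: jitter was applied after the first cap, so cap again.
        return Duration.ofMillis(Math.max(0L, Math.min(delayMs, maxMs)));
    }
}
```

Without the final clamp, a delay already at the maximum could be pushed above it by positive jitter, which is the behaviour T8 removes.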
Copilot reviewed 14 out of 14 changed files in this pull request and generated 3 comments.
...ing-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java (outdated)
...azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnectionCache.java
sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java
…overy
- T10: In ServiceBusSenderAsyncClient.performRecovery(), replace link.dispose() with link.closeAsync().subscribe(...). ReactorSender.dispose() calls closeAsync().block(tryTimeout), which blocks the Reactor thread when invoked from a recovery callback on a non-blocking scheduler.
- T11: In ReactorConnectionCache.forceCloseConnection(), replace connection.dispose() with a non-blocking equivalent: set a new forceInvalidate AtomicBoolean flag before starting connection.closeAsync().subscribe(...). The cacheInvalidateIf predicate now checks forceInvalidate in addition to isDisposed(), ensuring the cache is invalidated synchronously (by the flag) while the close handshake completes asynchronously. ReactorConnection.dispose() has the same blocking pattern.
- T12: Update comment in RetryUtil.createRetryWithRecovery() to remove the misleading claim that the quick-retry path matches Go's ResetAttempts(). The Java implementation uses a didQuickRetry flag only (no attempt counter reset); subsequent retries continue standard exponential backoff from the running count.
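The flag-before-close idea in T11 can be illustrated without Reactor. The sketch below is a simplified, hypothetical cache (ConnectionCacheSketch, asyncCloser, and forceClose are invented names); it only shows that setting the flag first makes the next get() hand out a fresh resource while the close of the old one proceeds elsewhere.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical single-entry cache showing synchronous invalidation with a non-blocking close. */
final class ConnectionCacheSketch<T> {
    private final Supplier<T> factory;
    private final Consumer<T> asyncCloser;
    private final AtomicBoolean forceInvalidate = new AtomicBoolean();
    private T cached;

    ConnectionCacheSketch(Supplier<T> factory, Consumer<T> asyncCloser) {
        this.factory = factory;
        this.asyncCloser = asyncCloser;
    }

    /** Returns the cached resource, replacing it when missing or force-invalidated. */
    synchronized T get() {
        if (cached == null || forceInvalidate.compareAndSet(true, false)) {
            cached = factory.get();
        }
        return cached;
    }

    /** Marks the cached resource stale, then starts closing it without waiting for completion. */
    void forceClose() {
        final T stale;
        synchronized (this) {
            stale = cached;
        }
        if (stale == null) {
            return;
        }
        forceInvalidate.set(true);  // Flag first, so the next get() sees the invalidation immediately.
        asyncCloser.accept(stale);  // Assumed to return quickly, like closeAsync().subscribe(...).
    }
}
```

The design choice mirrors the commit text: invalidation is decided by the flag, so the caller never has to block until the close handshake finishes.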
Copilot reviewed 14 out of 14 changed files in this pull request and generated 3 comments.
sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java (outdated)
...g-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java
sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RecoveryKind.java
Copilot reviewed 14 out of 14 changed files in this pull request and generated 9 comments.
...ing-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java (outdated)
...ing-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java (outdated)
...ing-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java (outdated)
...ing-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java (outdated)
...ing-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java (outdated)
...g-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java
...saging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java
...saging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java (outdated)
sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java (outdated)
… guard session disposal, improve recovery log context
Copilot reviewed 14 out of 14 changed files in this pull request and generated 2 comments.
...saging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java (outdated)
Azure Pipelines successfully started running 1 pipeline(s).
    // Perform recovery before retry.
    if (kind != RecoveryKind.NONE && recoveryAction != null) {
        try {
            recoveryAction.accept(kind);
This worries me because these should be handled by downstream subscribers rather than injecting this in the middle of the reactor operation pipeline. Thoughts, @anuchandy ?
    }

    // Standard backoff delay.
    final Duration delay;
I feel like we have this delay logic in another location, so we should consolidate it so users don't inadvertently end up with two different retry durations if they decide to use the normal retry rather than this method.
What this PR does
Adds tiered send/receive recovery to azure-core-amqp and azure-messaging-servicebus, matching the recovery pattern used by the Go, .NET, Python, and JS SDKs. When an AMQP operation fails, the error is classified into a recovery tier (NONE, LINK, CONNECTION, or FATAL) and the appropriate resources are closed before retrying.
This resolves a long-standing issue where a single stale AMQP connection or link could cause a Service Bus sender or receiver to become permanently stuck, requiring a JVM restart.
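As a rough illustration of the flow described above, here is a blocking, Reactor-free sketch of a retry loop that classifies each failure, runs a recovery callback for LINK/CONNECTION errors, and skips the backoff once for the first such error. RecoveringRetrySketch, its Kind enum, and the call signature are hypothetical; the PR itself implements this as a Reactor retry in RetryUtil, and details such as when the retry budget is consumed are assumptions here.

```java
import java.util.concurrent.Callable;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical blocking analogue of a recovery-aware retry; not the PR's Reactor implementation. */
final class RecoveringRetrySketch {
    enum Kind { NONE, LINK, CONNECTION, FATAL }

    private RecoveringRetrySketch() {
    }

    static <T> T call(Callable<T> operation, Function<Exception, Kind> classifier,
        Consumer<Kind> recoveryAction, int maxRetries, long backoffMillis) throws Exception {
        boolean didQuickRetry = false;
        for (int attempt = 0;; attempt++) {
            try {
                return operation.call();
            } catch (Exception error) {
                final Kind kind = classifier.apply(error);
                // FATAL errors and an exhausted retry budget surface the original error.
                if (kind == Kind.FATAL || attempt >= maxRetries) {
                    throw error;
                }
                // Close stale resources before the next attempt.
                if (kind != Kind.NONE) {
                    recoveryAction.accept(kind);
                }
                // The first LINK/CONNECTION error is retried immediately; later ones back off.
                if (kind != Kind.NONE && !didQuickRetry) {
                    didQuickRetry = true;
                } else {
                    Thread.sleep(backoffMillis);
                }
            }
        }
    }
}
```

In this sketch the attempt counter keeps running after the quick retry, in line with the T12 note that no attempt counter reset takes place.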
Changes
azure-core-amqp (shared, benefits Event Hubs too)
- RecoveryKind (new): Error classification enum with classify(Throwable) that maps every AmqpErrorCondition to NONE/LINK/CONNECTION/FATAL
- RetryUtil.withRetryAndRecovery() (new): Retry wrapper that invokes a recovery callback between attempts. Includes quick-retry optimization for the first LINK/CONNECTION error (matching Go SDK's didQuickRetry)
- ReactorConnectionCache.forceCloseConnection() (new): Closes the cached connection so the next get() creates a fresh one, for stale connections where AMQP heartbeats are echoed by intermediate infrastructure
- AmqpChannelProcessor.forceCloseChannel() (new): Same for the v1 connection cache
azure-messaging-servicebus
- …sendBatchInternal, sendFluxInternal, scheduleMessageInternal, getSendLinkWithRetry) now use withRetryAndRecovery with active link disposal + forceCloseConnection on CONNECTION errors
- …withRetryAndRecovery with session removal on LINK errors + forceCloseConnection on CONNECTION errors
- ConnectionCacheWrapper.forceCloseConnection(): Delegates to v1/v2 cache
Tests
- …failedSendMessageReportsMetrics to use FATAL error (matching broadened retry filter)
Cross-SDK parity
GetRecoveryKind() | HasLinkCommunicationError | shutdown_handler | RecoveryKind.classify()
FaultTolerantAmqpObject | closeLink() | link.dispose() + removeSession()
Namespace.Recover() | ActiveConnection.GetOrCreateAsync | refreshConnection() | forceCloseConnection()
didQuickRetry + ResetAttempts | AtomicBoolean didQuickRetry
…doBeforeRetry)
Related issues
Directly fixes or would have prevented:
operation-cancelled errors