From ba30e691b59267b84ae70e231a3da84711237716 Mon Sep 17 00:00:00 2001 From: Mihai Mitrea Date: Tue, 10 Mar 2026 11:55:51 +0000 Subject: [PATCH 1/4] Track token staleness with staleAfter and preserve legacy staleDuration behavior Replace the previous staleness tracking with a cached staleAfter timestamp that is computed when a token is stored. This makes the cache state easier to reason about because callers now classify tokens by comparing the current time against staleAfter and the expiry buffer, instead of deriving the stale window ad hoc at read time. Preserve backward compatibility for callers that set staleDuration through the builder by keeping that configuration on the legacy fixed-window path. This ensures that existing integrations which explicitly pass staleDuration continue to get the behavior they configured, both for the initial cached token and for tokens obtained through later refreshes. Update async refresh handling so failed async refreshes push staleAfter forward by a one-minute backoff instead of repeatedly retrying on every stale read. Also prevent an older async refresh result from overwriting a newer token that is already in the cache, and expand the tests to cover staleAfter computation as well as retry behavior before and after the backoff window. --- NEXT_CHANGELOG.md | 1 + .../sdk/core/oauth/CachedTokenSource.java | 176 +++++++---- .../sdk/core/oauth/CachedTokenSourceTest.java | 293 ++++++++++++------ 3 files changed, 313 insertions(+), 157 deletions(-) diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index 9c54a0919..08ba51f1e 100755 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -11,6 +11,7 @@ ### Documentation ### Internal Changes +* Added retry with backoff to `CachedTokenSource` async refresh so that a failed background refresh no longer disables async until a blocking call succeeds. ### API Changes * Add `com.databricks.sdk.service.dataclassification` and `com.databricks.sdk.service.knowledgeassistants` packages. diff --git a/databricks-sdk-java/src/main/java/com/databricks/sdk/core/oauth/CachedTokenSource.java b/databricks-sdk-java/src/main/java/com/databricks/sdk/core/oauth/CachedTokenSource.java index f5024640c..b325a85d8 100644 --- a/databricks-sdk-java/src/main/java/com/databricks/sdk/core/oauth/CachedTokenSource.java +++ b/databricks-sdk-java/src/main/java/com/databricks/sdk/core/oauth/CachedTokenSource.java @@ -38,21 +38,23 @@ private enum TokenState { // monthly downtime allowed by a 99.99% uptime SLA (~4.38 minutes) while increasing the likelihood // that the token is refreshed asynchronously if the auth server is down. private static final Duration MAX_STALE_DURATION = Duration.ofMinutes(20); + // Delay before another async refresh may be attempted after an async refresh failure. + private static final Duration ASYNC_REFRESH_RETRY_BACKOFF = Duration.ofMinutes(1); // Default additional buffer before expiry to consider a token as expired. // This is 40 seconds by default since Azure Databricks rejects tokens that are within 30 seconds // of expiry. private static final Duration DEFAULT_EXPIRY_BUFFER = Duration.ofSeconds(40); - // The token source to use for refreshing the token. + // Underlying token source used to fetch replacement tokens. private final TokenSource tokenSource; // Whether asynchronous refresh is enabled. private boolean asyncDisabled = false; // The legacy duration before expiry to consider a token as 'stale'. private final Duration staticStaleDuration; // Whether to use the dynamic stale duration computation or defer to the legacy duration. - private final boolean useDynamicStaleDuration; + private final boolean useLegacyStaleDuration; // The dynamically computed duration before expiry to consider a token as 'stale'. - private volatile Duration dynamicStaleDuration; + private volatile Instant staleAfter; // Additional buffer before expiry to consider a token as expired. private final Duration expiryBuffer; // Clock supplier for current time. @@ -62,23 +64,16 @@ private enum TokenState { protected volatile Token token; // Whether a refresh is currently in progress (for async refresh). private boolean refreshInProgress = false; - // Whether the last refresh attempt succeeded. - private boolean lastRefreshSucceeded = true; private CachedTokenSource(Builder builder) { this.tokenSource = builder.tokenSource; this.asyncDisabled = builder.asyncDisabled; this.staticStaleDuration = builder.staleDuration; - this.useDynamicStaleDuration = builder.useDynamicStaleDuration; + this.useLegacyStaleDuration = builder.useLegacyStaleDuration; this.expiryBuffer = builder.expiryBuffer; this.clockSupplier = builder.clockSupplier; - this.token = builder.token; - if (this.useDynamicStaleDuration && this.token != null) { - this.dynamicStaleDuration = computeStaleDuration(this.token); - } else { - this.dynamicStaleDuration = Duration.ofMinutes(0); - } + this.updateToken(builder.token); } /** @@ -91,7 +86,7 @@ public static class Builder { private final TokenSource tokenSource; private boolean asyncDisabled = false; private Duration staleDuration = DEFAULT_STALE_DURATION; - private boolean useDynamicStaleDuration = true; + private boolean useLegacyStaleDuration = false; private Duration expiryBuffer = DEFAULT_EXPIRY_BUFFER; private ClockSupplier clockSupplier = new UtcClockSupplier(); private Token token; @@ -139,7 +134,10 @@ public Builder setAsyncDisabled(boolean asyncDisabled) { * Sets the duration before token expiry at which the token is considered stale. * *

When asynchronous refresh is enabled, tokens that are stale but not yet expired will - * trigger a background refresh while continuing to serve the current token. + * trigger a background refresh while continuing to serve the current token. Calling this method + * opts into the legacy fixed stale-duration behavior instead of the default dynamic stale + * computation, preserving backward compatibility for callers that already provide a custom + * stale duration. * * @param staleDuration The duration before expiry to consider a token stale. Must be greater * than the expiry buffer duration. @@ -147,7 +145,7 @@ public Builder setAsyncDisabled(boolean asyncDisabled) { */ public Builder setStaleDuration(Duration staleDuration) { this.staleDuration = staleDuration; - this.useDynamicStaleDuration = false; + this.useLegacyStaleDuration = true; return this; } @@ -190,6 +188,77 @@ public CachedTokenSource build() { } } + /** + * Replaces the cached token and recomputes the time after which it should be treated as stale. + * + *

Legacy mode uses the configured fixed stale duration. Dynamic mode derives the stale window + * from the token's remaining TTL and caps it at {@link #MAX_STALE_DURATION}. The stale threshold + * is written before the volatile token write so readers that observe the new token also observe + * the matching {@code staleAfter} value. + * + * @param t The token to cache. May be null. + */ + private void updateToken(Token t) { + if (t == null) { + this.staleAfter = null; + this.token = t; + return; + } + + if (t.getExpiry() == null) { + this.staleAfter = null; + this.token = t; + return; + } + + if (this.useLegacyStaleDuration) { + this.staleAfter = t.getExpiry().minus(staticStaleDuration); + } else { + Duration ttl = Duration.between(Instant.now(clockSupplier.getClock()), t.getExpiry()); + Duration staleDuration = ttl.dividedBy(2); + if (staleDuration.compareTo(MAX_STALE_DURATION) > 0) { + staleDuration = MAX_STALE_DURATION; + } + if (staleDuration.compareTo(Duration.ZERO) <= 0) { + staleDuration = Duration.ZERO; + } + + this.staleAfter = t.getExpiry().minus(staleDuration); + } + + // Publish the token after staleAfter so readers that observe the new token also observe the + // stale threshold computed for that token. + this.token = t; + } + + /** + * Delays the next async refresh attempt after an async refresh failure. + * + *

The cached token remains usable until it becomes expired. Moving {@code staleAfter} into the + * future prevents callers from immediately retrying async refresh on every stale read while the + * auth service is unhealthy. + */ + private void handleFailedAsyncRefresh() { + synchronized (this) { + if (this.staleAfter != null) { + Instant now = Instant.now(clockSupplier.getClock()); + this.staleAfter = now.plus(ASYNC_REFRESH_RETRY_BACKOFF); + } + } + } + + /** + * Returns {@code true} when the currently cached token has a later expiry than {@code candidate}, + * meaning the candidate should be discarded. This prevents an async refresh that was started + * before a blocking refresh from overwriting the newer token obtained by the blocking path. + */ + private boolean cachedTokenIsNewer(Token candidate) { + return token != null + && token.getExpiry() != null + && candidate.getExpiry() != null + && token.getExpiry().isAfter(candidate.getExpiry()); + } + /** * Gets the current token, refreshing if necessary. If async refresh is enabled, may return a * stale token while a refresh is in progress. @@ -206,21 +275,6 @@ public Token getToken() { return getTokenAsync(); } - private Duration computeStaleDuration(Token t) { - if (t.getExpiry() == null) { - return Duration.ZERO; // Tokens with no expiry are considered permanent. - } - - Duration ttl = Duration.between(Instant.now(clockSupplier.getClock()), t.getExpiry()); - - if (ttl.compareTo(Duration.ZERO) <= 0) { - return Duration.ZERO; - } - - Duration halfTtl = ttl.dividedBy(2); - return halfTtl.compareTo(MAX_STALE_DURATION) > 0 ? MAX_STALE_DURATION : halfTtl; - } - /** * Determine the state of the current token (fresh, stale, or expired). * @@ -234,12 +288,11 @@ protected TokenState getTokenState(Token t) { return TokenState.FRESH; // Tokens with no expiry are considered permanent. } - Duration lifeTime = Duration.between(Instant.now(clockSupplier.getClock()), t.getExpiry()); - if (lifeTime.compareTo(expiryBuffer) <= 0) { + Instant now = Instant.now(clockSupplier.getClock()); + if (now.isAfter(t.getExpiry().minus(expiryBuffer))) { return TokenState.EXPIRED; } - Duration staleDuration = useDynamicStaleDuration ? dynamicStaleDuration : staticStaleDuration; - if (lifeTime.compareTo(staleDuration) <= 0) { + if (now.isAfter(staleAfter)) { return TokenState.STALE; } return TokenState.FRESH; @@ -265,7 +318,6 @@ protected Token getTokenBlocking() { if (getTokenState(token) != TokenState.EXPIRED) { return token; } - lastRefreshSucceeded = false; Token newToken; try { newToken = tokenSource.getToken(); @@ -273,15 +325,8 @@ protected Token getTokenBlocking() { logger.error("Failed to refresh token synchronously", e); throw e; } - lastRefreshSucceeded = true; - // Write dynamicStaleDuration before publishing the new token via the volatile write, - // so unsynchronized readers that see the new token are guaranteed to also see the - // updated dynamicStaleDuration. - if (useDynamicStaleDuration && newToken != null) { - dynamicStaleDuration = computeStaleDuration(newToken); - } - token = newToken; + updateToken(newToken); return token; } } @@ -318,31 +363,28 @@ protected Token getTokenAsync() { private synchronized void triggerAsyncRefresh() { // Check token state again inside the synchronized block to avoid triggering a refresh if // another thread updated the token in the meantime. - if (!refreshInProgress && lastRefreshSucceeded && getTokenState(token) != TokenState.FRESH) { - refreshInProgress = true; - CompletableFuture.runAsync( - () -> { - try { - // Attempt to refresh the token in the background. - Token newToken = tokenSource.getToken(); - synchronized (this) { - // Write dynamicStaleDuration before publishing the new token via the volatile - // write, so unsynchronized readers that see the new token are guaranteed to also - // see the updated dynamicStaleDuration. - if (useDynamicStaleDuration && newToken != null) { - dynamicStaleDuration = computeStaleDuration(newToken); - } - token = newToken; - refreshInProgress = false; - } - } catch (Exception e) { - synchronized (this) { - lastRefreshSucceeded = false; - refreshInProgress = false; - logger.error("Asynchronous token refresh failed", e); + if (refreshInProgress || getTokenState(token) != TokenState.STALE) { + return; + } + + refreshInProgress = true; + CompletableFuture.runAsync( + () -> { + try { + Token newToken = tokenSource.getToken(); + synchronized (this) { + if (!cachedTokenIsNewer(newToken)) { + updateToken(newToken); } + refreshInProgress = false; } - }); - } + } catch (Exception e) { + synchronized (this) { + handleFailedAsyncRefresh(); + refreshInProgress = false; + logger.error("Asynchronous token refresh failed", e); + } + } + }); } } diff --git a/databricks-sdk-java/src/test/java/com/databricks/sdk/core/oauth/CachedTokenSourceTest.java b/databricks-sdk-java/src/test/java/com/databricks/sdk/core/oauth/CachedTokenSourceTest.java index ecb531f03..c45066142 100644 --- a/databricks-sdk-java/src/test/java/com/databricks/sdk/core/oauth/CachedTokenSourceTest.java +++ b/databricks-sdk-java/src/test/java/com/databricks/sdk/core/oauth/CachedTokenSourceTest.java @@ -3,10 +3,13 @@ import static org.junit.jupiter.api.Assertions.*; import com.databricks.sdk.core.utils.TestClockSupplier; +import java.lang.reflect.Field; import java.time.Duration; import java.time.Instant; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BooleanSupplier; import java.util.stream.Stream; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -17,6 +20,7 @@ public class CachedTokenSourceTest { private static final String TOKEN_TYPE = "Bearer"; private static final String INITIAL_TOKEN = "initial-token"; private static final String REFRESH_TOKEN = "refreshed-token"; + private static final Instant BASE_TIME = Instant.parse("2026-03-09T00:00:00Z"); private static final long FRESH_MINUTES = 10; @@ -36,6 +40,35 @@ public class CachedTokenSourceTest { private static final long EXPIRED_MINUTES = -1; + private static Token tokenExpiringAt(String accessToken, Instant expiry) { + return new Token(accessToken, TOKEN_TYPE, null, expiry); + } + + /** + * Reads the private {@code staleAfter} field so the tests can verify the exact threshold computed + * by {@link CachedTokenSource} without changing the production API just for test visibility. + */ + private static Instant getStaleAfter(CachedTokenSource source) throws Exception { + Field staleAfterField = CachedTokenSource.class.getDeclaredField("staleAfter"); + staleAfterField.setAccessible(true); + return (Instant) staleAfterField.get(source); + } + + /** + * Polls a condition until it becomes true or the timeout expires. This keeps the async-refresh + * tests deterministic without relying on long fixed sleeps. + */ + private static void awaitCondition(String message, BooleanSupplier condition) throws Exception { + long deadlineNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(1); + while (System.nanoTime() < deadlineNanos) { + if (condition.getAsBoolean()) { + return; + } + Thread.sleep(10); + } + fail(message); + } + private static Stream provideAsyncRefreshScenarios() { return Stream.of( Arguments.of("Fresh token, async enabled", FRESH_MINUTES, 0L, false, false, INITIAL_TOKEN), @@ -126,105 +159,185 @@ public Token getToken() { } /** - * This test verifies that if an asynchronous token refresh fails, the next refresh attempt is - * forced to be synchronous. It ensures that after an async failure, the system does not - * repeatedly attempt async refreshes while the token is stale, and only performs a synchronous - * refresh when the token is expired. After a successful sync refresh, async refreshes resume as - * normal. + * Supplies coverage for the two stale-threshold strategies supported by the cache: + * + *

Dynamic mode computes {@code staleAfter} from the token TTL and caps the stale window at + * twenty minutes. Legacy mode uses the stale duration explicitly provided through the builder. + * The scenarios also cover null initial tokens and already-expired tokens to document how those + * edge cases initialize the cached threshold. */ - @Test - void testAsyncRefreshFailureFallback() throws Exception { - // Create a mutable clock supplier that we can control - TestClockSupplier clockSupplier = new TestClockSupplier(Instant.now()); - - // Create a token with a TTL of 4 minutes that will be stale in 3 minutes. - Token staleToken = - new Token( - INITIAL_TOKEN, - TOKEN_TYPE, + private static Stream provideStaleAfterScenarios() { + return Stream.of( + Arguments.of( + "Dynamic staleAfter uses half the TTL", + tokenExpiringAt(INITIAL_TOKEN, BASE_TIME.plus(Duration.ofMinutes(4))), null, - Instant.now(clockSupplier.getClock()).plus(Duration.ofMinutes(4))); - - class TestSource implements TokenSource { - int refreshCallCount = 0; - boolean isFirstRefresh = true; - - @Override - public Token getToken() { - refreshCallCount++; - try { - // Sleep to simulate token fetch delay - Thread.sleep(500); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - if (isFirstRefresh) { - isFirstRefresh = false; - throw new RuntimeException("Simulated async failure"); - } - // Return a token that expires in 10 minutes from current time - return new Token( - REFRESH_TOKEN + "-" + refreshCallCount, - TOKEN_TYPE, + BASE_TIME.plus(Duration.ofMinutes(2))), + Arguments.of( + "Dynamic staleAfter is capped at twenty minutes", + tokenExpiringAt(INITIAL_TOKEN, BASE_TIME.plus(Duration.ofMinutes(60))), null, - Instant.now(clockSupplier.getClock()).plus(Duration.ofMinutes(10))); - } - } + BASE_TIME.plus(Duration.ofMinutes(40))), + Arguments.of( + "Dynamic staleAfter clamps expired tokens to expiry", + tokenExpiringAt(INITIAL_TOKEN, BASE_TIME.minus(Duration.ofMinutes(1))), + null, + BASE_TIME.minus(Duration.ofMinutes(1))), + Arguments.of( + "Legacy staleAfter uses the builder stale duration", + tokenExpiringAt(INITIAL_TOKEN, BASE_TIME.plus(Duration.ofMinutes(10))), + Duration.ofMinutes(3), + BASE_TIME.plus(Duration.ofMinutes(7))), + Arguments.of( + "Legacy staleAfter honors a stale duration longer than the remaining TTL", + tokenExpiringAt(INITIAL_TOKEN, BASE_TIME.plus(Duration.ofMinutes(4))), + Duration.ofMinutes(5), + BASE_TIME.minus(Duration.ofMinutes(1))), + Arguments.of("No initial token leaves staleAfter unset in dynamic mode", null, null, null), + Arguments.of( + "No initial token leaves staleAfter unset in legacy mode", + null, + Duration.ofMinutes(5), + null)); + } - TestSource testSource = new TestSource(); - CachedTokenSource source = - new CachedTokenSource.Builder(testSource) - .setAsyncDisabled(false) - .setToken(staleToken) - .setClockSupplier(clockSupplier) - .build(); + /** + * Verifies that {@link CachedTokenSource} computes {@code staleAfter} correctly for both the + * default dynamic behavior and the legacy builder-provided stale duration path. + * + *

Each scenario builds the cache with a deterministic test clock, inspects the private {@code + * staleAfter} field via reflection, and compares it with the expected threshold for the + * configured token and builder options. The token source is a stub because this test only + * validates initialization logic and should never trigger a refresh. + */ + @ParameterizedTest(name = "{0}") + @MethodSource("provideStaleAfterScenarios") + void testStaleAfterComputationParametrized( + String testName, Token initialToken, Duration staleDuration, Instant expectedStaleAfter) + throws Exception { + TestClockSupplier clockSupplier = new TestClockSupplier(BASE_TIME); - // Advance clock to put the token in the stale window. - clockSupplier.advanceTime(Duration.ofMinutes(3)); + CachedTokenSource.Builder builder = + new CachedTokenSource.Builder( + () -> { + throw new UnsupportedOperationException( + "Token refresh should not be invoked when computing staleAfter"); + }) + .setClockSupplier(clockSupplier); - // First call triggers async refresh, which fails - // Should return stale token immediately (async refresh) - Token token = source.getToken(); - assertEquals(INITIAL_TOKEN, token.getAccessToken(), "Should return stale token immediately"); - Thread.sleep(600); - assertEquals( - 1, testSource.refreshCallCount, "refresh() should have been called once (async, failed)"); + if (initialToken != null) { + builder.setToken(initialToken); + } + if (staleDuration != null) { + builder.setStaleDuration(staleDuration); + } + + CachedTokenSource source = builder.build(); - // Token is still stale, so next call should NOT trigger another refresh since the last refresh - // failed - token = source.getToken(); - assertEquals(INITIAL_TOKEN, token.getAccessToken(), "Should still return stale token"); - Thread.sleep(600); assertEquals( - 1, - testSource.refreshCallCount, - "refresh() should NOT be called again while stale after async failure"); + expectedStaleAfter, + getStaleAfter(source), + "Computed staleAfter did not match the expected value"); + } - // Advance time by 3 minutes to make the token expired - clockSupplier.advanceTime(Duration.ofMinutes(3)); + /** + * Verifies that once an async refresh fails, repeated reads before the one-minute backoff has + * elapsed keep serving the cached token and do not trigger another refresh attempt. + */ + @Test + void testGetTokenDoesNotRetryBeforeAsyncBackoffElapses() throws Exception { + TestClockSupplier clockSupplier = new TestClockSupplier(BASE_TIME); + AtomicInteger refreshCallCount = new AtomicInteger(); + CountDownLatch secondAttemptStarted = new CountDownLatch(1); - // Now getToken() should call refresh synchronously and return the refreshed token - token = source.getToken(); - assertEquals( - REFRESH_TOKEN + "-2", - token.getAccessToken(), - "Should return the refreshed token after sync refresh"); - Thread.sleep(600); - assertEquals( - 2, - testSource.refreshCallCount, - "refresh() should have been called synchronously after expiry"); - - // Advance time by 8 minutes to make the token stale again - clockSupplier.advanceTime(Duration.ofMinutes(8)); - // Should return stale token immediately (async refresh) - token = source.getToken(); - assertEquals( - REFRESH_TOKEN + "-2", token.getAccessToken(), "Should return stale token immediately"); - Thread.sleep(600); - assertEquals( - 3, - testSource.refreshCallCount, - "refresh() should have been called again asynchronously after making token stale"); + TokenSource tokenSource = + () -> { + if (refreshCallCount.incrementAndGet() == 1) { + throw new RuntimeException("Simulated async failure"); + } + secondAttemptStarted.countDown(); + return tokenExpiringAt( + REFRESH_TOKEN, Instant.now(clockSupplier.getClock()).plus(Duration.ofMinutes(10))); + }; + + CachedTokenSource source = buildStaleSource(tokenSource, clockSupplier); + + assertEquals(INITIAL_TOKEN, source.getToken().getAccessToken()); + + Instant backoffThreshold = BASE_TIME.plus(Duration.ofMinutes(7)); + awaitCondition( + "staleAfter should be pushed forward by the async refresh backoff", + () -> backoffThreshold.equals(getStaleAfterUnchecked(source))); + + assertEquals(INITIAL_TOKEN, source.getToken().getAccessToken()); + assertFalse( + secondAttemptStarted.await(200, TimeUnit.MILLISECONDS), + "No second refresh should start before the backoff has elapsed"); + assertEquals(1, refreshCallCount.get(), "Only the failed async refresh should have run"); + } + + /** + * Verifies that once the async-refresh backoff has elapsed, the next stale read starts another + * async refresh and the refreshed token is eventually published to the cache. + */ + @Test + void testGetTokenRetriesAfterAsyncBackoffElapsesAndUpdatesToken() throws Exception { + TestClockSupplier clockSupplier = new TestClockSupplier(BASE_TIME); + AtomicInteger refreshCallCount = new AtomicInteger(); + CountDownLatch secondAttemptStarted = new CountDownLatch(1); + + TokenSource tokenSource = + () -> { + if (refreshCallCount.incrementAndGet() == 1) { + throw new RuntimeException("Simulated async failure"); + } + secondAttemptStarted.countDown(); + return tokenExpiringAt( + REFRESH_TOKEN, Instant.now(clockSupplier.getClock()).plus(Duration.ofMinutes(10))); + }; + + CachedTokenSource source = buildStaleSource(tokenSource, clockSupplier); + + assertEquals(INITIAL_TOKEN, source.getToken().getAccessToken()); + + Instant backoffThreshold = BASE_TIME.plus(Duration.ofMinutes(7)); + awaitCondition( + "staleAfter should be pushed forward by the async refresh backoff", + () -> backoffThreshold.equals(getStaleAfterUnchecked(source))); + + clockSupplier.advanceTime(Duration.ofMinutes(2)); + + assertEquals(INITIAL_TOKEN, source.getToken().getAccessToken()); + assertTrue( + secondAttemptStarted.await(1, TimeUnit.SECONDS), + "A new async refresh should start once the backoff has elapsed"); + awaitCondition( + "The refreshed token should be published after the retry succeeds", + () -> REFRESH_TOKEN.equals(source.getToken().getAccessToken())); + assertEquals(2, refreshCallCount.get(), "The cache should retry exactly once after backoff"); + } + + /** + * Builds a CachedTokenSource whose initial token is already stale. The clock is advanced past the + * dynamic staleAfter threshold so the very first getToken call triggers an async refresh. + */ + private static CachedTokenSource buildStaleSource( + TokenSource tokenSource, TestClockSupplier clockSupplier) { + CachedTokenSource source = + new CachedTokenSource.Builder(tokenSource) + .setToken(tokenExpiringAt(INITIAL_TOKEN, BASE_TIME.plus(Duration.ofMinutes(10)))) + .setClockSupplier(clockSupplier) + .build(); + clockSupplier.advanceTime(Duration.ofMinutes(6)); + return source; + } + + /** Wraps the checked reflection helper so it can be used inside polling lambdas. */ + private static Instant getStaleAfterUnchecked(CachedTokenSource source) { + try { + return getStaleAfter(source); + } catch (Exception e) { + throw new RuntimeException(e); + } } } From 393b548d5ca5ddd04c507684eb60a0b5f6f21ff2 Mon Sep 17 00:00:00 2001 From: Mihai Mitrea Date: Tue, 10 Mar 2026 12:27:38 +0000 Subject: [PATCH 2/4] Fix stale comment, null guard, and minor cleanups in CachedTokenSource - Update field comment on staleAfter to reflect its Instant type - Guard against null newToken in async refresh before calling cachedTokenIsNewer - Remove redundant synchronized block in handleFailedAsyncRefresh - Collapse duplicate null-check branches in updateToken - Use imperative tense in changelog entry Co-Authored-By: Claude Opus 4.6 (1M context) --- NEXT_CHANGELOG.md | 2 +- .../sdk/core/oauth/CachedTokenSource.java | 20 ++++++------------- 2 files changed, 7 insertions(+), 15 deletions(-) diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index 08ba51f1e..94b02fcdb 100755 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -11,7 +11,7 @@ ### Documentation ### Internal Changes -* Added retry with backoff to `CachedTokenSource` async refresh so that a failed background refresh no longer disables async until a blocking call succeeds. +* Add retry with backoff to `CachedTokenSource` async refresh so that a failed background refresh no longer disables async until a blocking call succeeds. ### API Changes * Add `com.databricks.sdk.service.dataclassification` and `com.databricks.sdk.service.knowledgeassistants` packages. diff --git a/databricks-sdk-java/src/main/java/com/databricks/sdk/core/oauth/CachedTokenSource.java b/databricks-sdk-java/src/main/java/com/databricks/sdk/core/oauth/CachedTokenSource.java index b325a85d8..03bfd8c1b 100644 --- a/databricks-sdk-java/src/main/java/com/databricks/sdk/core/oauth/CachedTokenSource.java +++ b/databricks-sdk-java/src/main/java/com/databricks/sdk/core/oauth/CachedTokenSource.java @@ -53,7 +53,7 @@ private enum TokenState { private final Duration staticStaleDuration; // Whether to use the dynamic stale duration computation or defer to the legacy duration. private final boolean useLegacyStaleDuration; - // The dynamically computed duration before expiry to consider a token as 'stale'. + // The earliest time at which the cached token should be considered stale. private volatile Instant staleAfter; // Additional buffer before expiry to consider a token as expired. private final Duration expiryBuffer; @@ -199,13 +199,7 @@ public CachedTokenSource build() { * @param t The token to cache. May be null. */ private void updateToken(Token t) { - if (t == null) { - this.staleAfter = null; - this.token = t; - return; - } - - if (t.getExpiry() == null) { + if (t == null || t.getExpiry() == null) { this.staleAfter = null; this.token = t; return; @@ -239,11 +233,9 @@ private void updateToken(Token t) { * auth service is unhealthy. */ private void handleFailedAsyncRefresh() { - synchronized (this) { - if (this.staleAfter != null) { - Instant now = Instant.now(clockSupplier.getClock()); - this.staleAfter = now.plus(ASYNC_REFRESH_RETRY_BACKOFF); - } + if (this.staleAfter != null) { + Instant now = Instant.now(clockSupplier.getClock()); + this.staleAfter = now.plus(ASYNC_REFRESH_RETRY_BACKOFF); } } @@ -373,7 +365,7 @@ private synchronized void triggerAsyncRefresh() { try { Token newToken = tokenSource.getToken(); synchronized (this) { - if (!cachedTokenIsNewer(newToken)) { + if (newToken != null && !cachedTokenIsNewer(newToken)) { updateToken(newToken); } refreshInProgress = false; From 1b86228a08164cd87e57ee96bf831ec69a153a8f Mon Sep 17 00:00:00 2001 From: Mihai Mitrea Date: Wed, 11 Mar 2026 09:18:25 +0000 Subject: [PATCH 3/4] Address PR review feedback on async refresh handling - Move refreshInProgress reset to a finally block so a thrown exception in updateToken or cachedTokenIsNewer cannot permanently deadlock future async refreshes. - Update the memory-ordering comment in updateToken to acknowledge that handleFailedAsyncRefresh writes staleAfter without a subsequent volatile token write (consequence is at most one extra async trigger). - Add a comment in triggerAsyncRefresh explaining why only STALE (not EXPIRED) triggers an async attempt. - Add testAsyncRefreshDiscardsOlderToken to cover the cachedTokenIsNewer discard path where a blocking refresh installs a newer token while an async refresh is in flight. --- .../sdk/core/oauth/CachedTokenSource.java | 15 ++-- .../sdk/core/oauth/CachedTokenSourceTest.java | 73 +++++++++++++++++++ 2 files changed, 83 insertions(+), 5 deletions(-) diff --git a/databricks-sdk-java/src/main/java/com/databricks/sdk/core/oauth/CachedTokenSource.java b/databricks-sdk-java/src/main/java/com/databricks/sdk/core/oauth/CachedTokenSource.java index 03bfd8c1b..6f3d63ec5 100644 --- a/databricks-sdk-java/src/main/java/com/databricks/sdk/core/oauth/CachedTokenSource.java +++ b/databricks-sdk-java/src/main/java/com/databricks/sdk/core/oauth/CachedTokenSource.java @@ -221,7 +221,9 @@ private void updateToken(Token t) { } // Publish the token after staleAfter so readers that observe the new token also observe the - // stale threshold computed for that token. + // stale threshold computed for that token. Note: handleFailedAsyncRefresh writes staleAfter + // without a subsequent volatile token write, so a concurrent reader may briefly see a stale + // staleAfter value; the only consequence is one extra async trigger, which is harmless. this.token = t; } @@ -353,8 +355,9 @@ protected Token getTokenAsync() { * succeeded. */ private synchronized void triggerAsyncRefresh() { - // Check token state again inside the synchronized block to avoid triggering a refresh if - // another thread updated the token in the meantime. + // Re-check inside the synchronized block: another thread may have updated the token. + // Only STALE triggers async refresh; EXPIRED tokens are handled by getTokenBlocking, so + // an async attempt is unnecessary and would race with the blocking path. if (refreshInProgress || getTokenState(token) != TokenState.STALE) { return; } @@ -368,14 +371,16 @@ private synchronized void triggerAsyncRefresh() { if (newToken != null && !cachedTokenIsNewer(newToken)) { updateToken(newToken); } - refreshInProgress = false; } } catch (Exception e) { synchronized (this) { handleFailedAsyncRefresh(); - refreshInProgress = false; logger.error("Asynchronous token refresh failed", e); } + } finally { + synchronized (this) { + refreshInProgress = false; + } } }); } diff --git a/databricks-sdk-java/src/test/java/com/databricks/sdk/core/oauth/CachedTokenSourceTest.java b/databricks-sdk-java/src/test/java/com/databricks/sdk/core/oauth/CachedTokenSourceTest.java index c45066142..9083159fa 100644 --- a/databricks-sdk-java/src/test/java/com/databricks/sdk/core/oauth/CachedTokenSourceTest.java +++ b/databricks-sdk-java/src/test/java/com/databricks/sdk/core/oauth/CachedTokenSourceTest.java @@ -317,6 +317,79 @@ void testGetTokenRetriesAfterAsyncBackoffElapsesAndUpdatesToken() throws Excepti assertEquals(2, refreshCallCount.get(), "The cache should retry exactly once after backoff"); } + /** + * Verifies that an async refresh result is discarded when the cache already holds a token with a + * later expiry. This covers the concurrent scenario where a blocking refresh runs while an async + * refresh is in flight: the async result is older and should not overwrite the newer cached token. + */ + @Test + void testAsyncRefreshDiscardsOlderToken() throws Exception { + TestClockSupplier clockSupplier = new TestClockSupplier(BASE_TIME); + + Token olderRefreshToken = + tokenExpiringAt("older-async-token", BASE_TIME.plus(Duration.ofMinutes(8))); + Token newerBlockingToken = + tokenExpiringAt("newer-blocking-token", BASE_TIME.plus(Duration.ofMinutes(20))); + + CountDownLatch asyncRefreshStarted = new CountDownLatch(1); + CountDownLatch allowAsyncToFinish = new CountDownLatch(1); + AtomicInteger refreshCallCount = new AtomicInteger(); + + TokenSource tokenSource = + () -> { + int call = refreshCallCount.incrementAndGet(); + if (call == 1) { + asyncRefreshStarted.countDown(); + try { + allowAsyncToFinish.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return olderRefreshToken; + } + return newerBlockingToken; + }; + + CachedTokenSource source = + new CachedTokenSource.Builder(tokenSource) + .setToken(tokenExpiringAt(INITIAL_TOKEN, BASE_TIME.plus(Duration.ofMinutes(10)))) + .setClockSupplier(clockSupplier) + .build(); + + // Advance clock so the token is stale, triggering an async refresh. + clockSupplier.advanceTime(Duration.ofMinutes(6)); + Token staleResult = source.getToken(); + assertEquals(INITIAL_TOKEN, staleResult.getAccessToken()); + assertTrue( + asyncRefreshStarted.await(1, TimeUnit.SECONDS), + "Async refresh should have started"); + + // While async refresh is blocked, advance time so the token expires and force a blocking + // refresh that installs a newer token. + clockSupplier.advanceTime(Duration.ofMinutes(4)); + Token blockingResult = source.getToken(); + assertEquals("newer-blocking-token", blockingResult.getAccessToken()); + + // Let the async refresh finish — its older token should be discarded. + allowAsyncToFinish.countDown(); + awaitCondition( + "refreshInProgress should be reset after the async refresh completes", + () -> { + try { + Field f = CachedTokenSource.class.getDeclaredField("refreshInProgress"); + f.setAccessible(true); + return !(boolean) f.get(source); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + assertEquals( + "newer-blocking-token", + source.getToken().getAccessToken(), + "The newer blocking token should still be cached after the older async result is discarded"); + } + /** * Builds a CachedTokenSource whose initial token is already stale. The clock is advanced past the * dynamic staleAfter threshold so the very first getToken call triggers an async refresh. From edacf5d0c73ea0f607ac59917b9af64e21940ebf Mon Sep 17 00:00:00 2001 From: Mihai Mitrea Date: Wed, 11 Mar 2026 11:18:42 +0000 Subject: [PATCH 4/4] Fix spotless formatting and async backoff test race condition Fix two CI failures introduced by the previous commit: - Apply spotless-expected formatting to the new test (Javadoc line wrap, assertTrue collapse). - Wait for both staleAfter update and refreshInProgress reset before proceeding in backoff tests. Moving refreshInProgress = false to a finally block created a window where staleAfter was already updated but refreshInProgress was still true, causing triggerAsyncRefresh to bail out on Java 8 macOS. - Extract getRefreshInProgress/getRefreshInProgressUnchecked helpers and reuse them in testAsyncRefreshDiscardsOlderToken. --- .../sdk/core/oauth/CachedTokenSourceTest.java | 43 +++++++++++-------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/databricks-sdk-java/src/test/java/com/databricks/sdk/core/oauth/CachedTokenSourceTest.java b/databricks-sdk-java/src/test/java/com/databricks/sdk/core/oauth/CachedTokenSourceTest.java index 9083159fa..5cc94c6ef 100644 --- a/databricks-sdk-java/src/test/java/com/databricks/sdk/core/oauth/CachedTokenSourceTest.java +++ b/databricks-sdk-java/src/test/java/com/databricks/sdk/core/oauth/CachedTokenSourceTest.java @@ -54,6 +54,12 @@ private static Instant getStaleAfter(CachedTokenSource source) throws Exception return (Instant) staleAfterField.get(source); } + private static boolean getRefreshInProgress(CachedTokenSource source) throws Exception { + Field f = CachedTokenSource.class.getDeclaredField("refreshInProgress"); + f.setAccessible(true); + return (boolean) f.get(source); + } + /** * Polls a condition until it becomes true or the timeout expires. This keeps the async-refresh * tests deterministic without relying on long fixed sleeps. @@ -266,8 +272,10 @@ void testGetTokenDoesNotRetryBeforeAsyncBackoffElapses() throws Exception { Instant backoffThreshold = BASE_TIME.plus(Duration.ofMinutes(7)); awaitCondition( - "staleAfter should be pushed forward by the async refresh backoff", - () -> backoffThreshold.equals(getStaleAfterUnchecked(source))); + "async refresh cycle should complete (staleAfter pushed forward, refreshInProgress reset)", + () -> + backoffThreshold.equals(getStaleAfterUnchecked(source)) + && !getRefreshInProgressUnchecked(source)); assertEquals(INITIAL_TOKEN, source.getToken().getAccessToken()); assertFalse( @@ -302,8 +310,10 @@ void testGetTokenRetriesAfterAsyncBackoffElapsesAndUpdatesToken() throws Excepti Instant backoffThreshold = BASE_TIME.plus(Duration.ofMinutes(7)); awaitCondition( - "staleAfter should be pushed forward by the async refresh backoff", - () -> backoffThreshold.equals(getStaleAfterUnchecked(source))); + "async refresh cycle should complete (staleAfter pushed forward, refreshInProgress reset)", + () -> + backoffThreshold.equals(getStaleAfterUnchecked(source)) + && !getRefreshInProgressUnchecked(source)); clockSupplier.advanceTime(Duration.ofMinutes(2)); @@ -320,7 +330,8 @@ void testGetTokenRetriesAfterAsyncBackoffElapsesAndUpdatesToken() throws Excepti /** * Verifies that an async refresh result is discarded when the cache already holds a token with a * later expiry. This covers the concurrent scenario where a blocking refresh runs while an async - * refresh is in flight: the async result is older and should not overwrite the newer cached token. + * refresh is in flight: the async result is older and should not overwrite the newer cached + * token. */ @Test void testAsyncRefreshDiscardsOlderToken() throws Exception { @@ -360,9 +371,7 @@ void testAsyncRefreshDiscardsOlderToken() throws Exception { clockSupplier.advanceTime(Duration.ofMinutes(6)); Token staleResult = source.getToken(); assertEquals(INITIAL_TOKEN, staleResult.getAccessToken()); - assertTrue( - asyncRefreshStarted.await(1, TimeUnit.SECONDS), - "Async refresh should have started"); + assertTrue(asyncRefreshStarted.await(1, TimeUnit.SECONDS), "Async refresh should have started"); // While async refresh is blocked, advance time so the token expires and force a blocking // refresh that installs a newer token. @@ -374,15 +383,7 @@ void testAsyncRefreshDiscardsOlderToken() throws Exception { allowAsyncToFinish.countDown(); awaitCondition( "refreshInProgress should be reset after the async refresh completes", - () -> { - try { - Field f = CachedTokenSource.class.getDeclaredField("refreshInProgress"); - f.setAccessible(true); - return !(boolean) f.get(source); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); + () -> !getRefreshInProgressUnchecked(source)); assertEquals( "newer-blocking-token", @@ -413,4 +414,12 @@ private static Instant getStaleAfterUnchecked(CachedTokenSource source) { throw new RuntimeException(e); } } + + private static boolean getRefreshInProgressUnchecked(CachedTokenSource source) { + try { + return getRefreshInProgress(source); + } catch (Exception e) { + throw new RuntimeException(e); + } + } }