diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index 8bd3f4cdc..c9e82bee0 100644 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -11,5 +11,6 @@ ### Documentation ### Internal Changes +* Add retry with backoff to `CachedTokenSource` async refresh so that a failed background refresh no longer disables async until a blocking call succeeds. ### API Changes diff --git a/databricks-sdk-java/src/main/java/com/databricks/sdk/core/oauth/CachedTokenSource.java b/databricks-sdk-java/src/main/java/com/databricks/sdk/core/oauth/CachedTokenSource.java index f5024640c..6f3d63ec5 100644 --- a/databricks-sdk-java/src/main/java/com/databricks/sdk/core/oauth/CachedTokenSource.java +++ b/databricks-sdk-java/src/main/java/com/databricks/sdk/core/oauth/CachedTokenSource.java @@ -38,21 +38,23 @@ private enum TokenState { // monthly downtime allowed by a 99.99% uptime SLA (~4.38 minutes) while increasing the likelihood // that the token is refreshed asynchronously if the auth server is down. private static final Duration MAX_STALE_DURATION = Duration.ofMinutes(20); + // Delay before another async refresh may be attempted after an async refresh failure. + private static final Duration ASYNC_REFRESH_RETRY_BACKOFF = Duration.ofMinutes(1); // Default additional buffer before expiry to consider a token as expired. // This is 40 seconds by default since Azure Databricks rejects tokens that are within 30 seconds // of expiry. private static final Duration DEFAULT_EXPIRY_BUFFER = Duration.ofSeconds(40); - // The token source to use for refreshing the token. + // Underlying token source used to fetch replacement tokens. private final TokenSource tokenSource; // Whether asynchronous refresh is enabled. private boolean asyncDisabled = false; // The legacy duration before expiry to consider a token as 'stale'. private final Duration staticStaleDuration; // Whether to use the dynamic stale duration computation or defer to the legacy duration. - private final boolean useDynamicStaleDuration; - // The dynamically computed duration before expiry to consider a token as 'stale'. - private volatile Duration dynamicStaleDuration; + private final boolean useLegacyStaleDuration; + // The earliest time at which the cached token should be considered stale. + private volatile Instant staleAfter; // Additional buffer before expiry to consider a token as expired. private final Duration expiryBuffer; // Clock supplier for current time. @@ -62,23 +64,16 @@ private enum TokenState { protected volatile Token token; // Whether a refresh is currently in progress (for async refresh). private boolean refreshInProgress = false; - // Whether the last refresh attempt succeeded. - private boolean lastRefreshSucceeded = true; private CachedTokenSource(Builder builder) { this.tokenSource = builder.tokenSource; this.asyncDisabled = builder.asyncDisabled; this.staticStaleDuration = builder.staleDuration; - this.useDynamicStaleDuration = builder.useDynamicStaleDuration; + this.useLegacyStaleDuration = builder.useLegacyStaleDuration; this.expiryBuffer = builder.expiryBuffer; this.clockSupplier = builder.clockSupplier; - this.token = builder.token; - if (this.useDynamicStaleDuration && this.token != null) { - this.dynamicStaleDuration = computeStaleDuration(this.token); - } else { - this.dynamicStaleDuration = Duration.ofMinutes(0); - } + this.updateToken(builder.token); } /** @@ -91,7 +86,7 @@ public static class Builder { private final TokenSource tokenSource; private boolean asyncDisabled = false; private Duration staleDuration = DEFAULT_STALE_DURATION; - private boolean useDynamicStaleDuration = true; + private boolean useLegacyStaleDuration = false; private Duration expiryBuffer = DEFAULT_EXPIRY_BUFFER; private ClockSupplier clockSupplier = new UtcClockSupplier(); private Token token; @@ -139,7 +134,10 @@ public Builder setAsyncDisabled(boolean asyncDisabled) { * Sets the duration before token expiry at which the token is considered stale. * *

When asynchronous refresh is enabled, tokens that are stale but not yet expired will - * trigger a background refresh while continuing to serve the current token. + * trigger a background refresh while continuing to serve the current token. Calling this method + * opts into the legacy fixed stale-duration behavior instead of the default dynamic stale + * computation, preserving backward compatibility for callers that already provide a custom + * stale duration. * * @param staleDuration The duration before expiry to consider a token stale. Must be greater * than the expiry buffer duration. @@ -147,7 +145,7 @@ public Builder setAsyncDisabled(boolean asyncDisabled) { */ public Builder setStaleDuration(Duration staleDuration) { this.staleDuration = staleDuration; - this.useDynamicStaleDuration = false; + this.useLegacyStaleDuration = true; return this; } @@ -190,6 +188,71 @@ public CachedTokenSource build() { } } + /** + * Replaces the cached token and recomputes the time after which it should be treated as stale. + * + *

Legacy mode uses the configured fixed stale duration. Dynamic mode derives the stale window + * from the token's remaining TTL and caps it at {@link #MAX_STALE_DURATION}. The stale threshold + * is written before the volatile token write so readers that observe the new token also observe + * the matching {@code staleAfter} value. + * + * @param t The token to cache. May be null. + */ + private void updateToken(Token t) { + if (t == null || t.getExpiry() == null) { + this.staleAfter = null; + this.token = t; + return; + } + + if (this.useLegacyStaleDuration) { + this.staleAfter = t.getExpiry().minus(staticStaleDuration); + } else { + Duration ttl = Duration.between(Instant.now(clockSupplier.getClock()), t.getExpiry()); + Duration staleDuration = ttl.dividedBy(2); + if (staleDuration.compareTo(MAX_STALE_DURATION) > 0) { + staleDuration = MAX_STALE_DURATION; + } + if (staleDuration.compareTo(Duration.ZERO) <= 0) { + staleDuration = Duration.ZERO; + } + + this.staleAfter = t.getExpiry().minus(staleDuration); + } + + // Publish the token after staleAfter so readers that observe the new token also observe the + // stale threshold computed for that token. Note: handleFailedAsyncRefresh writes staleAfter + // without a subsequent volatile token write, so a concurrent reader may briefly see a stale + // staleAfter value; the only consequence is one extra async trigger, which is harmless. + this.token = t; + } + + /** + * Delays the next async refresh attempt after an async refresh failure. + * + *

The cached token remains usable until it becomes expired. Moving {@code staleAfter} into the + * future prevents callers from immediately retrying async refresh on every stale read while the + * auth service is unhealthy. + */ + private void handleFailedAsyncRefresh() { + if (this.staleAfter != null) { + Instant now = Instant.now(clockSupplier.getClock()); + this.staleAfter = now.plus(ASYNC_REFRESH_RETRY_BACKOFF); + } + } + + /** + * Returns {@code true} when the currently cached token has a later expiry than {@code candidate}, + * meaning the candidate should be discarded. This prevents an async refresh that was started + * before a blocking refresh from overwriting the newer token obtained by the blocking path. + */ + private boolean cachedTokenIsNewer(Token candidate) { + return token != null + && token.getExpiry() != null + && candidate.getExpiry() != null + && token.getExpiry().isAfter(candidate.getExpiry()); + } + /** * Gets the current token, refreshing if necessary. If async refresh is enabled, may return a * stale token while a refresh is in progress. @@ -206,21 +269,6 @@ public Token getToken() { return getTokenAsync(); } - private Duration computeStaleDuration(Token t) { - if (t.getExpiry() == null) { - return Duration.ZERO; // Tokens with no expiry are considered permanent. - } - - Duration ttl = Duration.between(Instant.now(clockSupplier.getClock()), t.getExpiry()); - - if (ttl.compareTo(Duration.ZERO) <= 0) { - return Duration.ZERO; - } - - Duration halfTtl = ttl.dividedBy(2); - return halfTtl.compareTo(MAX_STALE_DURATION) > 0 ? MAX_STALE_DURATION : halfTtl; - } - /** * Determine the state of the current token (fresh, stale, or expired). * @@ -234,12 +282,11 @@ protected TokenState getTokenState(Token t) { return TokenState.FRESH; // Tokens with no expiry are considered permanent. } - Duration lifeTime = Duration.between(Instant.now(clockSupplier.getClock()), t.getExpiry()); - if (lifeTime.compareTo(expiryBuffer) <= 0) { + Instant now = Instant.now(clockSupplier.getClock()); + if (now.isAfter(t.getExpiry().minus(expiryBuffer))) { return TokenState.EXPIRED; } - Duration staleDuration = useDynamicStaleDuration ? dynamicStaleDuration : staticStaleDuration; - if (lifeTime.compareTo(staleDuration) <= 0) { + if (now.isAfter(staleAfter)) { return TokenState.STALE; } return TokenState.FRESH; @@ -265,7 +312,6 @@ protected Token getTokenBlocking() { if (getTokenState(token) != TokenState.EXPIRED) { return token; } - lastRefreshSucceeded = false; Token newToken; try { newToken = tokenSource.getToken(); @@ -273,15 +319,8 @@ protected Token getTokenBlocking() { logger.error("Failed to refresh token synchronously", e); throw e; } - lastRefreshSucceeded = true; - // Write dynamicStaleDuration before publishing the new token via the volatile write, - // so unsynchronized readers that see the new token are guaranteed to also see the - // updated dynamicStaleDuration. - if (useDynamicStaleDuration && newToken != null) { - dynamicStaleDuration = computeStaleDuration(newToken); - } - token = newToken; + updateToken(newToken); return token; } } @@ -316,33 +355,33 @@ protected Token getTokenAsync() { * succeeded. */ private synchronized void triggerAsyncRefresh() { - // Check token state again inside the synchronized block to avoid triggering a refresh if - // another thread updated the token in the meantime. - if (!refreshInProgress && lastRefreshSucceeded && getTokenState(token) != TokenState.FRESH) { - refreshInProgress = true; - CompletableFuture.runAsync( - () -> { - try { - // Attempt to refresh the token in the background. - Token newToken = tokenSource.getToken(); - synchronized (this) { - // Write dynamicStaleDuration before publishing the new token via the volatile - // write, so unsynchronized readers that see the new token are guaranteed to also - // see the updated dynamicStaleDuration. - if (useDynamicStaleDuration && newToken != null) { - dynamicStaleDuration = computeStaleDuration(newToken); - } - token = newToken; - refreshInProgress = false; - } - } catch (Exception e) { - synchronized (this) { - lastRefreshSucceeded = false; - refreshInProgress = false; - logger.error("Asynchronous token refresh failed", e); + // Re-check inside the synchronized block: another thread may have updated the token. + // Only STALE triggers async refresh; EXPIRED tokens are handled by getTokenBlocking, so + // an async attempt is unnecessary and would race with the blocking path. + if (refreshInProgress || getTokenState(token) != TokenState.STALE) { + return; + } + + refreshInProgress = true; + CompletableFuture.runAsync( + () -> { + try { + Token newToken = tokenSource.getToken(); + synchronized (this) { + if (newToken != null && !cachedTokenIsNewer(newToken)) { + updateToken(newToken); } } - }); - } + } catch (Exception e) { + synchronized (this) { + handleFailedAsyncRefresh(); + logger.error("Asynchronous token refresh failed", e); + } + } finally { + synchronized (this) { + refreshInProgress = false; + } + } + }); } } diff --git a/databricks-sdk-java/src/test/java/com/databricks/sdk/core/oauth/CachedTokenSourceTest.java b/databricks-sdk-java/src/test/java/com/databricks/sdk/core/oauth/CachedTokenSourceTest.java index ecb531f03..5cc94c6ef 100644 --- a/databricks-sdk-java/src/test/java/com/databricks/sdk/core/oauth/CachedTokenSourceTest.java +++ b/databricks-sdk-java/src/test/java/com/databricks/sdk/core/oauth/CachedTokenSourceTest.java @@ -3,10 +3,13 @@ import static org.junit.jupiter.api.Assertions.*; import com.databricks.sdk.core.utils.TestClockSupplier; +import java.lang.reflect.Field; import java.time.Duration; import java.time.Instant; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BooleanSupplier; import java.util.stream.Stream; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -17,6 +20,7 @@ public class CachedTokenSourceTest { private static final String TOKEN_TYPE = "Bearer"; private static final String INITIAL_TOKEN = "initial-token"; private static final String REFRESH_TOKEN = "refreshed-token"; + private static final Instant BASE_TIME = Instant.parse("2026-03-09T00:00:00Z"); private static final long FRESH_MINUTES = 10; @@ -36,6 +40,41 @@ public class CachedTokenSourceTest { private static final long EXPIRED_MINUTES = -1; + private static Token tokenExpiringAt(String accessToken, Instant expiry) { + return new Token(accessToken, TOKEN_TYPE, null, expiry); + } + + /** + * Reads the private {@code staleAfter} field so the tests can verify the exact threshold computed + * by {@link CachedTokenSource} without changing the production API just for test visibility. + */ + private static Instant getStaleAfter(CachedTokenSource source) throws Exception { + Field staleAfterField = CachedTokenSource.class.getDeclaredField("staleAfter"); + staleAfterField.setAccessible(true); + return (Instant) staleAfterField.get(source); + } + + private static boolean getRefreshInProgress(CachedTokenSource source) throws Exception { + Field f = CachedTokenSource.class.getDeclaredField("refreshInProgress"); + f.setAccessible(true); + return (boolean) f.get(source); + } + + /** + * Polls a condition until it becomes true or the timeout expires. This keeps the async-refresh + * tests deterministic without relying on long fixed sleeps. + */ + private static void awaitCondition(String message, BooleanSupplier condition) throws Exception { + long deadlineNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(1); + while (System.nanoTime() < deadlineNanos) { + if (condition.getAsBoolean()) { + return; + } + Thread.sleep(10); + } + fail(message); + } + private static Stream provideAsyncRefreshScenarios() { return Stream.of( Arguments.of("Fresh token, async enabled", FRESH_MINUTES, 0L, false, false, INITIAL_TOKEN), @@ -126,105 +165,261 @@ public Token getToken() { } /** - * This test verifies that if an asynchronous token refresh fails, the next refresh attempt is - * forced to be synchronous. It ensures that after an async failure, the system does not - * repeatedly attempt async refreshes while the token is stale, and only performs a synchronous - * refresh when the token is expired. After a successful sync refresh, async refreshes resume as - * normal. + * Supplies coverage for the two stale-threshold strategies supported by the cache: + * + *

Dynamic mode computes {@code staleAfter} from the token TTL and caps the stale window at + * twenty minutes. Legacy mode uses the stale duration explicitly provided through the builder. + * The scenarios also cover null initial tokens and already-expired tokens to document how those + * edge cases initialize the cached threshold. */ - @Test - void testAsyncRefreshFailureFallback() throws Exception { - // Create a mutable clock supplier that we can control - TestClockSupplier clockSupplier = new TestClockSupplier(Instant.now()); - - // Create a token with a TTL of 4 minutes that will be stale in 3 minutes. - Token staleToken = - new Token( - INITIAL_TOKEN, - TOKEN_TYPE, + private static Stream provideStaleAfterScenarios() { + return Stream.of( + Arguments.of( + "Dynamic staleAfter uses half the TTL", + tokenExpiringAt(INITIAL_TOKEN, BASE_TIME.plus(Duration.ofMinutes(4))), null, - Instant.now(clockSupplier.getClock()).plus(Duration.ofMinutes(4))); - - class TestSource implements TokenSource { - int refreshCallCount = 0; - boolean isFirstRefresh = true; - - @Override - public Token getToken() { - refreshCallCount++; - try { - // Sleep to simulate token fetch delay - Thread.sleep(500); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - if (isFirstRefresh) { - isFirstRefresh = false; - throw new RuntimeException("Simulated async failure"); - } - // Return a token that expires in 10 minutes from current time - return new Token( - REFRESH_TOKEN + "-" + refreshCallCount, - TOKEN_TYPE, + BASE_TIME.plus(Duration.ofMinutes(2))), + Arguments.of( + "Dynamic staleAfter is capped at twenty minutes", + tokenExpiringAt(INITIAL_TOKEN, BASE_TIME.plus(Duration.ofMinutes(60))), null, - Instant.now(clockSupplier.getClock()).plus(Duration.ofMinutes(10))); - } + BASE_TIME.plus(Duration.ofMinutes(40))), + Arguments.of( + "Dynamic staleAfter clamps expired tokens to expiry", + tokenExpiringAt(INITIAL_TOKEN, BASE_TIME.minus(Duration.ofMinutes(1))), + null, + BASE_TIME.minus(Duration.ofMinutes(1))), + Arguments.of( + "Legacy staleAfter uses the builder stale duration", + tokenExpiringAt(INITIAL_TOKEN, BASE_TIME.plus(Duration.ofMinutes(10))), + Duration.ofMinutes(3), + BASE_TIME.plus(Duration.ofMinutes(7))), + Arguments.of( + "Legacy staleAfter honors a stale duration longer than the remaining TTL", + tokenExpiringAt(INITIAL_TOKEN, BASE_TIME.plus(Duration.ofMinutes(4))), + Duration.ofMinutes(5), + BASE_TIME.minus(Duration.ofMinutes(1))), + Arguments.of("No initial token leaves staleAfter unset in dynamic mode", null, null, null), + Arguments.of( + "No initial token leaves staleAfter unset in legacy mode", + null, + Duration.ofMinutes(5), + null)); + } + + /** + * Verifies that {@link CachedTokenSource} computes {@code staleAfter} correctly for both the + * default dynamic behavior and the legacy builder-provided stale duration path. + * + *

Each scenario builds the cache with a deterministic test clock, inspects the private {@code + * staleAfter} field via reflection, and compares it with the expected threshold for the + * configured token and builder options. The token source is a stub because this test only + * validates initialization logic and should never trigger a refresh. + */ + @ParameterizedTest(name = "{0}") + @MethodSource("provideStaleAfterScenarios") + void testStaleAfterComputationParametrized( + String testName, Token initialToken, Duration staleDuration, Instant expectedStaleAfter) + throws Exception { + TestClockSupplier clockSupplier = new TestClockSupplier(BASE_TIME); + + CachedTokenSource.Builder builder = + new CachedTokenSource.Builder( + () -> { + throw new UnsupportedOperationException( + "Token refresh should not be invoked when computing staleAfter"); + }) + .setClockSupplier(clockSupplier); + + if (initialToken != null) { + builder.setToken(initialToken); } + if (staleDuration != null) { + builder.setStaleDuration(staleDuration); + } + + CachedTokenSource source = builder.build(); + + assertEquals( + expectedStaleAfter, + getStaleAfter(source), + "Computed staleAfter did not match the expected value"); + } + + /** + * Verifies that once an async refresh fails, repeated reads before the one-minute backoff has + * elapsed keep serving the cached token and do not trigger another refresh attempt. + */ + @Test + void testGetTokenDoesNotRetryBeforeAsyncBackoffElapses() throws Exception { + TestClockSupplier clockSupplier = new TestClockSupplier(BASE_TIME); + AtomicInteger refreshCallCount = new AtomicInteger(); + CountDownLatch secondAttemptStarted = new CountDownLatch(1); + + TokenSource tokenSource = + () -> { + if (refreshCallCount.incrementAndGet() == 1) { + throw new RuntimeException("Simulated async failure"); + } + secondAttemptStarted.countDown(); + return tokenExpiringAt( + REFRESH_TOKEN, Instant.now(clockSupplier.getClock()).plus(Duration.ofMinutes(10))); + }; + + CachedTokenSource source = buildStaleSource(tokenSource, clockSupplier); + + assertEquals(INITIAL_TOKEN, source.getToken().getAccessToken()); + + Instant backoffThreshold = BASE_TIME.plus(Duration.ofMinutes(7)); + awaitCondition( + "async refresh cycle should complete (staleAfter pushed forward, refreshInProgress reset)", + () -> + backoffThreshold.equals(getStaleAfterUnchecked(source)) + && !getRefreshInProgressUnchecked(source)); + + assertEquals(INITIAL_TOKEN, source.getToken().getAccessToken()); + assertFalse( + secondAttemptStarted.await(200, TimeUnit.MILLISECONDS), + "No second refresh should start before the backoff has elapsed"); + assertEquals(1, refreshCallCount.get(), "Only the failed async refresh should have run"); + } + + /** + * Verifies that once the async-refresh backoff has elapsed, the next stale read starts another + * async refresh and the refreshed token is eventually published to the cache. + */ + @Test + void testGetTokenRetriesAfterAsyncBackoffElapsesAndUpdatesToken() throws Exception { + TestClockSupplier clockSupplier = new TestClockSupplier(BASE_TIME); + AtomicInteger refreshCallCount = new AtomicInteger(); + CountDownLatch secondAttemptStarted = new CountDownLatch(1); + + TokenSource tokenSource = + () -> { + if (refreshCallCount.incrementAndGet() == 1) { + throw new RuntimeException("Simulated async failure"); + } + secondAttemptStarted.countDown(); + return tokenExpiringAt( + REFRESH_TOKEN, Instant.now(clockSupplier.getClock()).plus(Duration.ofMinutes(10))); + }; + + CachedTokenSource source = buildStaleSource(tokenSource, clockSupplier); + + assertEquals(INITIAL_TOKEN, source.getToken().getAccessToken()); + + Instant backoffThreshold = BASE_TIME.plus(Duration.ofMinutes(7)); + awaitCondition( + "async refresh cycle should complete (staleAfter pushed forward, refreshInProgress reset)", + () -> + backoffThreshold.equals(getStaleAfterUnchecked(source)) + && !getRefreshInProgressUnchecked(source)); + + clockSupplier.advanceTime(Duration.ofMinutes(2)); + + assertEquals(INITIAL_TOKEN, source.getToken().getAccessToken()); + assertTrue( + secondAttemptStarted.await(1, TimeUnit.SECONDS), + "A new async refresh should start once the backoff has elapsed"); + awaitCondition( + "The refreshed token should be published after the retry succeeds", + () -> REFRESH_TOKEN.equals(source.getToken().getAccessToken())); + assertEquals(2, refreshCallCount.get(), "The cache should retry exactly once after backoff"); + } + + /** + * Verifies that an async refresh result is discarded when the cache already holds a token with a + * later expiry. This covers the concurrent scenario where a blocking refresh runs while an async + * refresh is in flight: the async result is older and should not overwrite the newer cached + * token. + */ + @Test + void testAsyncRefreshDiscardsOlderToken() throws Exception { + TestClockSupplier clockSupplier = new TestClockSupplier(BASE_TIME); + + Token olderRefreshToken = + tokenExpiringAt("older-async-token", BASE_TIME.plus(Duration.ofMinutes(8))); + Token newerBlockingToken = + tokenExpiringAt("newer-blocking-token", BASE_TIME.plus(Duration.ofMinutes(20))); + + CountDownLatch asyncRefreshStarted = new CountDownLatch(1); + CountDownLatch allowAsyncToFinish = new CountDownLatch(1); + AtomicInteger refreshCallCount = new AtomicInteger(); + + TokenSource tokenSource = + () -> { + int call = refreshCallCount.incrementAndGet(); + if (call == 1) { + asyncRefreshStarted.countDown(); + try { + allowAsyncToFinish.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return olderRefreshToken; + } + return newerBlockingToken; + }; - TestSource testSource = new TestSource(); CachedTokenSource source = - new CachedTokenSource.Builder(testSource) - .setAsyncDisabled(false) - .setToken(staleToken) + new CachedTokenSource.Builder(tokenSource) + .setToken(tokenExpiringAt(INITIAL_TOKEN, BASE_TIME.plus(Duration.ofMinutes(10)))) .setClockSupplier(clockSupplier) .build(); - // Advance clock to put the token in the stale window. - clockSupplier.advanceTime(Duration.ofMinutes(3)); + // Advance clock so the token is stale, triggering an async refresh. + clockSupplier.advanceTime(Duration.ofMinutes(6)); + Token staleResult = source.getToken(); + assertEquals(INITIAL_TOKEN, staleResult.getAccessToken()); + assertTrue(asyncRefreshStarted.await(1, TimeUnit.SECONDS), "Async refresh should have started"); - // First call triggers async refresh, which fails - // Should return stale token immediately (async refresh) - Token token = source.getToken(); - assertEquals(INITIAL_TOKEN, token.getAccessToken(), "Should return stale token immediately"); - Thread.sleep(600); - assertEquals( - 1, testSource.refreshCallCount, "refresh() should have been called once (async, failed)"); + // While async refresh is blocked, advance time so the token expires and force a blocking + // refresh that installs a newer token. + clockSupplier.advanceTime(Duration.ofMinutes(4)); + Token blockingResult = source.getToken(); + assertEquals("newer-blocking-token", blockingResult.getAccessToken()); + + // Let the async refresh finish — its older token should be discarded. + allowAsyncToFinish.countDown(); + awaitCondition( + "refreshInProgress should be reset after the async refresh completes", + () -> !getRefreshInProgressUnchecked(source)); - // Token is still stale, so next call should NOT trigger another refresh since the last refresh - // failed - token = source.getToken(); - assertEquals(INITIAL_TOKEN, token.getAccessToken(), "Should still return stale token"); - Thread.sleep(600); assertEquals( - 1, - testSource.refreshCallCount, - "refresh() should NOT be called again while stale after async failure"); + "newer-blocking-token", + source.getToken().getAccessToken(), + "The newer blocking token should still be cached after the older async result is discarded"); + } + + /** + * Builds a CachedTokenSource whose initial token is already stale. The clock is advanced past the + * dynamic staleAfter threshold so the very first getToken call triggers an async refresh. + */ + private static CachedTokenSource buildStaleSource( + TokenSource tokenSource, TestClockSupplier clockSupplier) { + CachedTokenSource source = + new CachedTokenSource.Builder(tokenSource) + .setToken(tokenExpiringAt(INITIAL_TOKEN, BASE_TIME.plus(Duration.ofMinutes(10)))) + .setClockSupplier(clockSupplier) + .build(); + clockSupplier.advanceTime(Duration.ofMinutes(6)); + return source; + } - // Advance time by 3 minutes to make the token expired - clockSupplier.advanceTime(Duration.ofMinutes(3)); + /** Wraps the checked reflection helper so it can be used inside polling lambdas. */ + private static Instant getStaleAfterUnchecked(CachedTokenSource source) { + try { + return getStaleAfter(source); + } catch (Exception e) { + throw new RuntimeException(e); + } + } - // Now getToken() should call refresh synchronously and return the refreshed token - token = source.getToken(); - assertEquals( - REFRESH_TOKEN + "-2", - token.getAccessToken(), - "Should return the refreshed token after sync refresh"); - Thread.sleep(600); - assertEquals( - 2, - testSource.refreshCallCount, - "refresh() should have been called synchronously after expiry"); - - // Advance time by 8 minutes to make the token stale again - clockSupplier.advanceTime(Duration.ofMinutes(8)); - // Should return stale token immediately (async refresh) - token = source.getToken(); - assertEquals( - REFRESH_TOKEN + "-2", token.getAccessToken(), "Should return stale token immediately"); - Thread.sleep(600); - assertEquals( - 3, - testSource.refreshCallCount, - "refresh() should have been called again asynchronously after making token stale"); + private static boolean getRefreshInProgressUnchecked(CachedTokenSource source) { + try { + return getRefreshInProgress(source); + } catch (Exception e) { + throw new RuntimeException(e); + } } }