diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/config/H2Config.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/config/H2Config.java
index a801cc51f..e8ef0dd4b 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/config/H2Config.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/config/H2Config.java
@@ -51,10 +51,11 @@ public class H2Config {
private final int maxHeaderListSize;
private final boolean compressionEnabled;
private final int maxContinuations;
+ private final H2PingPolicy pingPolicy;
H2Config(final int headerTableSize, final boolean pushEnabled, final int maxConcurrentStreams,
final int initialWindowSize, final int maxFrameSize, final int maxHeaderListSize,
- final boolean compressionEnabled, final int maxContinuations) {
+ final boolean compressionEnabled, final int maxContinuations, final H2PingPolicy pingPolicy) {
super();
this.headerTableSize = headerTableSize;
this.pushEnabled = pushEnabled;
@@ -64,6 +65,7 @@ public class H2Config {
this.maxHeaderListSize = maxHeaderListSize;
this.compressionEnabled = compressionEnabled;
this.maxContinuations = maxContinuations;
+ this.pingPolicy = pingPolicy;
}
public int getHeaderTableSize() {
@@ -98,6 +100,15 @@ public int getMaxContinuations() {
return maxContinuations;
}
+ /**
+ * Optional keep-alive PING policy.
+ *
+ * @since 5.5
+ */
+ public H2PingPolicy getPingPolicy() {
+ return pingPolicy;
+ }
+
@Override
public String toString() {
final StringBuilder builder = new StringBuilder();
@@ -109,6 +120,7 @@ public String toString() {
.append(", maxHeaderListSize=").append(this.maxHeaderListSize)
.append(", compressionEnabled=").append(this.compressionEnabled)
.append(", maxContinuations=").append(this.maxContinuations)
+ .append(", pingPolicy=").append(this.pingPolicy)
.append("]");
return builder.toString();
}
@@ -142,7 +154,9 @@ public static H2Config.Builder copy(final H2Config config) {
.setInitialWindowSize(config.getInitialWindowSize())
.setMaxFrameSize(config.getMaxFrameSize())
.setMaxHeaderListSize(config.getMaxHeaderListSize())
- .setCompressionEnabled(config.isCompressionEnabled());
+ .setCompressionEnabled(config.isCompressionEnabled())
+ .setMaxContinuations(config.getMaxContinuations())
+ .setPingPolicy(config.getPingPolicy());
}
public static class Builder {
@@ -155,6 +169,7 @@ public static class Builder {
private int maxHeaderListSize;
private boolean compressionEnabled;
private int maxContinuations;
+ private H2PingPolicy pingPolicy;
Builder() {
this.headerTableSize = INIT_HEADER_TABLE_SIZE * 2;
@@ -165,6 +180,7 @@ public static class Builder {
this.maxHeaderListSize = FrameConsts.MAX_FRAME_SIZE;
this.compressionEnabled = true;
this.maxContinuations = 100;
+ this.pingPolicy = null;
}
public Builder setHeaderTableSize(final int headerTableSize) {
@@ -211,7 +227,7 @@ public Builder setCompressionEnabled(final boolean compressionEnabled) {
* Sets max limit on number of continuations.
*
value zero represents no limit
*
- * @since 5,4
+ * @since 5.4
*/
public Builder setMaxContinuations(final int maxContinuations) {
Args.positive(maxContinuations, "Max continuations");
@@ -219,6 +235,16 @@ public Builder setMaxContinuations(final int maxContinuations) {
return this;
}
+ /**
+ * Sets optional keep-alive PING policy.
+ *
+ * @since 5.5
+ */
+ public Builder setPingPolicy(final H2PingPolicy pingPolicy) {
+ this.pingPolicy = pingPolicy;
+ return this;
+ }
+
public H2Config build() {
return new H2Config(
headerTableSize,
@@ -228,7 +254,8 @@ public H2Config build() {
maxFrameSize,
maxHeaderListSize,
compressionEnabled,
- maxContinuations);
+ maxContinuations,
+ pingPolicy);
}
}
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/config/H2PingPolicy.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/config/H2PingPolicy.java
new file mode 100644
index 000000000..fc81908fd
--- /dev/null
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/config/H2PingPolicy.java
@@ -0,0 +1,103 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+
+package org.apache.hc.core5.http2.config;
+
+import org.apache.hc.core5.util.Args;
+import org.apache.hc.core5.util.TimeValue;
+import org.apache.hc.core5.util.Timeout;
+
+/**
+ * HTTP/2 keep-alive ping policy.
+ *
+ * @since 5.5
+ */
+public final class H2PingPolicy {
+
+ private static final H2PingPolicy DISABLED = new H2PingPolicy(Timeout.DISABLED, Timeout.DISABLED);
+
+ private final Timeout idleTime;
+ private final Timeout ackTimeout;
+
+ private H2PingPolicy(final Timeout idleTime, final Timeout ackTimeout) {
+ this.idleTime = idleTime;
+ this.ackTimeout = ackTimeout;
+ }
+
+ public static H2PingPolicy disabled() {
+ return DISABLED;
+ }
+
+ public static Builder custom() {
+ return new Builder();
+ }
+
+ public Timeout getIdleTime() {
+ return idleTime;
+ }
+
+ public Timeout getAckTimeout() {
+ return ackTimeout;
+ }
+
+ public boolean isEnabled() {
+ return isActive(idleTime) && isActive(ackTimeout);
+ }
+
+ private static boolean isActive(final Timeout timeout) {
+ return timeout != null && timeout.isEnabled() && TimeValue.isPositive(timeout);
+ }
+
+ public static final class Builder {
+
+ private Timeout idleTime;
+ private Timeout ackTimeout;
+
+ private Builder() {
+ this.idleTime = Timeout.DISABLED;
+ this.ackTimeout = Timeout.DISABLED;
+ }
+
+ public Builder setIdleTime(final Timeout idleTime) {
+ this.idleTime = Args.notNull(idleTime, "idleTime");
+ return this;
+ }
+
+ public Builder setAckTimeout(final Timeout ackTimeout) {
+ this.ackTimeout = Args.notNull(ackTimeout, "ackTimeout");
+ return this;
+ }
+
+ public H2PingPolicy build() {
+ if (isActive(idleTime)) {
+ Args.check(isActive(ackTimeout), "ackTimeout must be positive when idleTime is enabled");
+ }
+ return new H2PingPolicy(idleTime, ackTimeout);
+ }
+ }
+
+}
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java
index 5c4b5b397..5cf88ecf4 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java
@@ -77,6 +77,7 @@
import org.apache.hc.core5.http2.H2StreamTimeoutException;
import org.apache.hc.core5.http2.config.H2Config;
import org.apache.hc.core5.http2.config.H2Param;
+import org.apache.hc.core5.http2.config.H2PingPolicy;
import org.apache.hc.core5.http2.config.H2Setting;
import org.apache.hc.core5.http2.frame.FrameFactory;
import org.apache.hc.core5.http2.frame.FrameFlag;
@@ -125,6 +126,7 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED }
private final AtomicInteger connOutputWindow;
private final AtomicInteger outputRequests;
private final H2StreamListener streamListener;
+ private final KeepAlivePingSupport keepAlivePingSupport;
private ConnectionHandshake connState = ConnectionHandshake.READY;
private SettingsHandshake localSettingState = SettingsHandshake.READY;
@@ -183,6 +185,11 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED }
this.lowMark = H2Config.INIT.getInitialWindowSize() / 2;
this.streamListener = streamListener;
+
+ final H2PingPolicy pingPolicy = this.localConfig.getPingPolicy();
+ this.keepAlivePingSupport = pingPolicy != null && pingPolicy.isEnabled()
+ ? new KeepAlivePingSupport(pingPolicy)
+ : null;
}
@Override
@@ -444,6 +451,9 @@ public final void onInput(final ByteBuffer src) throws HttpException, IOExceptio
for (;;) {
final RawFrame frame = inputBuffer.read(src, ioSession);
if (frame != null) {
+ if (keepAlivePingSupport != null) {
+ keepAlivePingSupport.onFrameInput(frame);
+ }
if (streamListener != null) {
streamListener.onFrameInput(this, frame.getStreamId(), frame);
}
@@ -487,6 +497,10 @@ public final void onOutput() throws HttpException, IOException {
ioSession.getLock().unlock();
}
+ if (keepAlivePingSupport != null) {
+ keepAlivePingSupport.activateIfReady();
+ }
+
if (connState.compareTo(ConnectionHandshake.SHUTDOWN) < 0) {
if (connOutputWindow.get() > 0 && remoteSettingState == SettingsHandshake.ACKED) {
@@ -592,6 +606,15 @@ public final void onOutput() throws HttpException, IOException {
}
public final void onTimeout(final Timeout timeout) throws HttpException, IOException {
+ if (keepAlivePingSupport != null
+ && connState.compareTo(ConnectionHandshake.ACTIVE) <= 0
+ && localSettingState == SettingsHandshake.ACKED
+ && remoteSettingState == SettingsHandshake.ACKED) {
+ if (keepAlivePingSupport.onTimeout(timeout)) {
+ return;
+ }
+ }
+
connState = ConnectionHandshake.SHUTDOWN;
final RawFrame goAway;
@@ -888,6 +911,9 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio
throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid PING frame payload");
}
if (frame.isFlagSet(FrameFlag.ACK)) {
+ if (keepAlivePingSupport != null && keepAlivePingSupport.consumePingAck(ping)) {
+ break;
+ }
final AsyncPingHandler pingHandler = pingHandlers.poll();
if (pingHandler != null) {
pingHandler.consumeResponse(ping);
@@ -910,6 +936,9 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio
localSettingState = SettingsHandshake.ACKED;
ioSession.setEvent(SelectionKey.OP_WRITE);
applyLocalSettings();
+ if (keepAlivePingSupport != null) {
+ keepAlivePingSupport.activateIfReady();
+ }
}
} else {
final ByteBuffer payload = frame.getPayload();
@@ -923,6 +952,9 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio
final RawFrame response = frameFactory.createSettingsAck();
commitFrame(response);
remoteSettingState = SettingsHandshake.ACKED;
+ if (keepAlivePingSupport != null) {
+ keepAlivePingSupport.activateIfReady();
+ }
}
}
break;
@@ -1328,6 +1360,141 @@ void appendState(final StringBuilder buf) {
.append(", streams.lastRemote=").append(streams.getLastRemoteId());
}
+ private final class KeepAlivePingSupport {
+
+ private static final int PING_DATA_LEN = 8;
+
+ private final Timeout idleTime;
+ private final Timeout ackTimeout;
+
+ private boolean active;
+ private boolean awaitingAck;
+
+ private long lastActivityNanos;
+ private long pingSeq;
+ private long expectedAckSeq;
+
+ KeepAlivePingSupport(final H2PingPolicy policy) {
+ Args.notNull(policy, "PING policy");
+ this.idleTime = policy.getIdleTime();
+ this.ackTimeout = policy.getAckTimeout();
+ this.active = false;
+ this.awaitingAck = false;
+ this.lastActivityNanos = System.nanoTime();
+ this.pingSeq = 0L;
+ this.expectedAckSeq = 0L;
+ }
+
+ void activateIfReady() {
+ if (active) {
+ return;
+ }
+ if (localSettingState == SettingsHandshake.ACKED && remoteSettingState == SettingsHandshake.ACKED) {
+ active = true;
+ awaitingAck = false;
+ lastActivityNanos = System.nanoTime();
+ ioSession.setSocketTimeout(idleTime);
+ }
+ }
+
+ void onFrameInput(final RawFrame frame) {
+ if (!active) {
+ return;
+ }
+ lastActivityNanos = System.nanoTime();
+ if (awaitingAck) {
+ if (!(frame.getType() == FrameType.PING.getValue() && frame.isFlagSet(FrameFlag.ACK))) {
+ awaitingAck = false;
+ }
+ }
+ ioSession.setSocketTimeout(idleTime);
+ }
+
+ void onActivity() {
+ if (!active) {
+ return;
+ }
+ lastActivityNanos = System.nanoTime();
+ if (awaitingAck) {
+ awaitingAck = false;
+ }
+ ioSession.setSocketTimeout(idleTime);
+ }
+
+ boolean consumePingAck(final ByteBuffer payload) {
+ if (!active || !awaitingAck) {
+ return false;
+ }
+ if (payload == null || payload.remaining() != PING_DATA_LEN) {
+ return false;
+ }
+ final long ack = payload.getLong(payload.position());
+ if (ack != expectedAckSeq) {
+ return false;
+ }
+ onActivity();
+ return true;
+ }
+
+ boolean onTimeout(final Timeout timeout) throws IOException {
+ activateIfReady();
+ if (!active) {
+ return false;
+ }
+
+ if (awaitingAck) {
+ shutdownKeepAlive(timeout);
+ return true;
+ }
+
+ final long idleNanos = idleTime.toMilliseconds() * 1_000_000L;
+ if (idleNanos > 0L) {
+ final long elapsed = System.nanoTime() - lastActivityNanos;
+ if (elapsed < idleNanos) {
+ final long remainingMs = Math.max(1L, (idleNanos - elapsed) / 1_000_000L);
+ ioSession.setSocketTimeout(Timeout.ofMilliseconds(remainingMs));
+ return true;
+ }
+ }
+
+ awaitingAck = true;
+ sendPing();
+ ioSession.setSocketTimeout(ackTimeout);
+ return true;
+ }
+
+ private void sendPing() throws IOException {
+ final long v = ++pingSeq;
+ expectedAckSeq = v;
+
+ final ByteBuffer payload = ByteBuffer.allocate(PING_DATA_LEN);
+ payload.putLong(v);
+ payload.flip();
+
+ final RawFrame ping = frameFactory.createPing(payload);
+ commitFrame(ping);
+ }
+
+ private void shutdownKeepAlive(final Timeout timeout) throws IOException {
+ connState = ConnectionHandshake.SHUTDOWN;
+
+ final RawFrame goAway = frameFactory.createGoAway(
+ streams.getLastRemoteId(),
+ H2Error.NO_ERROR,
+ "Ping response timeout (" + timeout + ")");
+ commitFrame(goAway);
+
+ for (final Iterator it = streams.iterator(); it.hasNext(); ) {
+ final H2Stream stream = it.next();
+ stream.fail(new H2StreamResetException(
+ H2Error.NO_ERROR,
+ "Ping response timeout (" + timeout + ")"));
+ }
+ streams.shutdownAndReleaseAll();
+ }
+
+ }
+
private static class Continuation {
final int streamId;
diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2KeepAlivePingClientExample.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2KeepAlivePingClientExample.java
new file mode 100644
index 000000000..6fb7505d8
--- /dev/null
+++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2KeepAlivePingClientExample.java
@@ -0,0 +1,257 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.core5.http2.examples;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpConnection;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
+import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.AsyncRequestProducer;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.RequestChannel;
+import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
+import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder;
+import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.http.protocol.HttpCoreContext;
+import org.apache.hc.core5.http2.HttpVersionPolicy;
+import org.apache.hc.core5.http2.config.H2Config;
+import org.apache.hc.core5.http2.config.H2PingPolicy;
+import org.apache.hc.core5.http2.frame.FrameFlag;
+import org.apache.hc.core5.http2.frame.FrameType;
+import org.apache.hc.core5.http2.frame.RawFrame;
+import org.apache.hc.core5.http2.impl.nio.H2StreamListener;
+import org.apache.hc.core5.http2.impl.nio.bootstrap.H2RequesterBootstrap;
+import org.apache.hc.core5.io.CloseMode;
+import org.apache.hc.core5.reactor.IOReactorConfig;
+import org.apache.hc.core5.util.Timeout;
+
+/**
+ * Minimal example demonstrating HTTP/2 connection keepalive using {@link H2PingPolicy}.
+ *
+ * The client configures an idle timeout and an ACK timeout. When the underlying HTTP/2
+ * connection becomes idle, the I/O reactor triggers a keepalive {@code PING}. If the
+ * peer responds with {@code PING[ACK]} within the configured ACK timeout, the connection
+ * remains usable; otherwise the connection is considered dead and is terminated by the
+ * transport.
+ *
+ *
+ * This example performs a single request to establish the connection and then waits
+ * long enough for one keepalive round-trip. It prints:
+ *
+ *
+ * - the remote endpoint once,
+ * - {@code >> PING} when a keepalive PING is emitted,
+ * - {@code << PING[ACK]} when the ACK is received,
+ * - a final counter line {@code keepalive: pingsOut=..., pingAcksIn=...}.
+ *
+ *
+ * Notes:
+ *
+ *
+ * - This is intentionally not a unit test; it is a runnable sanity-check and usage example.
+ * - Keepalive requires HTTP/2 settings negotiation to complete; PINGs may not be emitted
+ * immediately on startup.
+ * - Timing is inherently environment-dependent; adjust {@code idleTime}/{@code ackTimeout}
+ * if running on a slow or heavily loaded machine.
+ *
+ * @since 5.5
+ */
+
+public class H2KeepAlivePingClientExample {
+
+ public static void main(final String[] args) throws Exception {
+
+ final Timeout idleTime = Timeout.ofSeconds(1);
+ final Timeout ackTimeout = Timeout.ofSeconds(2);
+
+ final IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
+ .setSoTimeout(5, TimeUnit.SECONDS)
+ .build();
+
+ final H2PingPolicy pingPolicy = H2PingPolicy.custom()
+ .setIdleTime(idleTime)
+ .setAckTimeout(ackTimeout)
+ .build();
+
+ final H2Config h2Config = H2Config.custom()
+ .setPushEnabled(false)
+ .setMaxConcurrentStreams(100)
+ .setPingPolicy(pingPolicy)
+ .build();
+
+ final AtomicBoolean remotePrinted = new AtomicBoolean(false);
+ final AtomicInteger pingsOut = new AtomicInteger(0);
+ final AtomicInteger pingAcksIn = new AtomicInteger(0);
+ final CountDownLatch pingAckLatch = new CountDownLatch(1);
+
+ final HttpAsyncRequester requester = H2RequesterBootstrap.bootstrap()
+ .setIOReactorConfig(ioReactorConfig)
+ .setH2Config(h2Config)
+ .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2)
+ .setStreamListener(new H2StreamListener() {
+
+ private void printRemoteOnce(final HttpConnection connection) {
+ if (remotePrinted.compareAndSet(false, true)) {
+ System.out.println("remote=" + connection.getRemoteAddress());
+ }
+ }
+
+ @Override
+ public void onHeaderInput(final HttpConnection connection, final int streamId, final List extends Header> headers) {
+ }
+
+ @Override
+ public void onHeaderOutput(final HttpConnection connection, final int streamId, final List extends Header> headers) {
+ }
+
+ @Override
+ public void onFrameInput(final HttpConnection connection, final int streamId, final RawFrame frame) {
+ printRemoteOnce(connection);
+ if (FrameType.valueOf(frame.getType()) == FrameType.PING && frame.isFlagSet(FrameFlag.ACK)) {
+ System.out.println("<< PING[ACK]");
+ pingAcksIn.incrementAndGet();
+ pingAckLatch.countDown();
+ }
+ }
+
+ @Override
+ public void onFrameOutput(final HttpConnection connection, final int streamId, final RawFrame frame) {
+ printRemoteOnce(connection);
+ if (FrameType.valueOf(frame.getType()) == FrameType.PING && !frame.isFlagSet(FrameFlag.ACK)) {
+ System.out.println(">> PING");
+ pingsOut.incrementAndGet();
+ }
+ }
+
+ @Override
+ public void onInputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) {
+ }
+
+ @Override
+ public void onOutputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) {
+ }
+
+ })
+ .create();
+
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> requester.close(CloseMode.GRACEFUL)));
+
+ requester.start();
+
+ final URI requestUri = new URI("http://nghttp2.org/httpbin/post");
+ final AsyncRequestProducer requestProducer = AsyncRequestBuilder.post(requestUri)
+ .setEntity("stuff")
+ .build();
+ final BasicResponseConsumer responseConsumer = new BasicResponseConsumer<>(new StringAsyncEntityConsumer());
+
+ final CountDownLatch exchangeLatch = new CountDownLatch(1);
+
+ requester.execute(new AsyncClientExchangeHandler() {
+
+ @Override
+ public void releaseResources() {
+ requestProducer.releaseResources();
+ responseConsumer.releaseResources();
+ exchangeLatch.countDown();
+ }
+
+ @Override
+ public void cancel() {
+ exchangeLatch.countDown();
+ }
+
+ @Override
+ public void failed(final Exception cause) {
+ exchangeLatch.countDown();
+ }
+
+ @Override
+ public void produceRequest(final RequestChannel channel, final HttpContext httpContext) throws HttpException, IOException {
+ requestProducer.sendRequest(channel, httpContext);
+ }
+
+ @Override
+ public int available() {
+ return requestProducer.available();
+ }
+
+ @Override
+ public void produce(final DataStreamChannel channel) throws IOException {
+ requestProducer.produce(channel);
+ }
+
+ @Override
+ public void consumeInformation(final HttpResponse response, final HttpContext httpContext) {
+ }
+
+ @Override
+ public void consumeResponse(final HttpResponse response, final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException {
+ responseConsumer.consumeResponse(response, entityDetails, httpContext, null);
+ }
+
+ @Override
+ public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+ responseConsumer.updateCapacity(capacityChannel);
+ }
+
+ @Override
+ public void consume(final ByteBuffer src) throws IOException {
+ responseConsumer.consume(src);
+ }
+
+ @Override
+ public void streamEnd(final List extends Header> trailers) throws HttpException, IOException {
+ responseConsumer.streamEnd(trailers);
+ }
+
+ }, Timeout.ofSeconds(30), HttpCoreContext.create());
+
+ exchangeLatch.await();
+
+ final long waitMs = idleTime.toMilliseconds() + ackTimeout.toMilliseconds() + 500L;
+ pingAckLatch.await(waitMs, TimeUnit.MILLISECONDS);
+
+ System.out.println("keepalive: pingsOut=" + pingsOut.get() + ", pingAcksIn=" + pingAcksIn.get());
+
+ requester.close(CloseMode.GRACEFUL);
+ }
+
+}
diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java
index c85d10aa9..150382440 100644
--- a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java
+++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java
@@ -57,10 +57,12 @@
import org.apache.hc.core5.http2.WritableByteChannelMock;
import org.apache.hc.core5.http2.config.H2Config;
import org.apache.hc.core5.http2.config.H2Param;
+import org.apache.hc.core5.http2.config.H2PingPolicy;
import org.apache.hc.core5.http2.config.H2Setting;
import org.apache.hc.core5.http2.frame.DefaultFrameFactory;
import org.apache.hc.core5.http2.frame.FrameConsts;
import org.apache.hc.core5.http2.frame.FrameFactory;
+import org.apache.hc.core5.http2.frame.FrameFlag;
import org.apache.hc.core5.http2.frame.FrameType;
import org.apache.hc.core5.http2.frame.RawFrame;
import org.apache.hc.core5.http2.frame.StreamIdGenerator;
@@ -846,28 +848,6 @@ static final class PriorityHeaderSender implements H2StreamHandler {
@Override public void releaseResources() { }
}
- // Small struct + parser to decode the frames we capture from writes
- private static final class FrameStub {
- final int type;
- final int streamId;
- FrameStub(final int type, final int streamId) { this.type = type; this.streamId = streamId; }
- }
- private static List parseFrames(final byte[] all) {
- final List out = new ArrayList<>();
- int p = 0;
- while (p + 9 <= all.length) {
- final int len = ((all[p] & 0xff) << 16) | ((all[p + 1] & 0xff) << 8) | (all[p + 2] & 0xff);
- final int type = all[p + 3] & 0xff;
- final int sid = ((all[p + 5] & 0x7f) << 24) | ((all[p + 6] & 0xff) << 16)
- | ((all[p + 7] & 0xff) << 8) | (all[p + 8] & 0xff);
- p += 9;
- if (p + len > all.length) break;
- out.add(new FrameStub(type, sid));
- p += len;
- }
- return out;
- }
-
// 2) Client emits PRIORITY_UPDATE BEFORE HEADERS when Priority header present
@Test
void testSubmitWithPriorityHeaderEmitsPriorityUpdateBeforeHeaders() throws Exception {
@@ -1083,5 +1063,326 @@ void testStreamIdleTimeoutTriggersH2StreamTimeoutException() throws Exception {
Assertions.assertEquals(1, timeoutEx.getStreamId());
}
+ private static byte[] encodeFrame(final RawFrame frame) throws IOException {
+ final WritableByteChannelMock writableChannel = new WritableByteChannelMock(256);
+ final FrameOutputBuffer outBuffer = new FrameOutputBuffer(16 * 1024);
+ outBuffer.write(frame, writableChannel);
+ return writableChannel.toByteArray();
+ }
+
+ private static void feedFrame(final AbstractH2StreamMultiplexer mux, final RawFrame frame) throws Exception {
+ mux.onInput(ByteBuffer.wrap(encodeFrame(frame)));
+ }
+
+ private static void completeSettingsHandshake(final AbstractH2StreamMultiplexer mux) throws Exception {
+ // Remote SETTINGS (non-ACK) -> mux replies with SETTINGS ACK and marks remoteSettingState ACKED
+ final RawFrame remoteSettings = FRAME_FACTORY.createSettings(new H2Setting[] {
+ new H2Setting(H2Param.MAX_FRAME_SIZE, FrameConsts.MIN_FRAME_SIZE)
+ });
+ feedFrame(mux, remoteSettings);
+
+ // Remote ACK of our SETTINGS -> localSettingState ACKED
+ feedFrame(mux, new RawFrame(FrameType.SETTINGS.getValue(), FrameFlag.ACK.getValue(), 0, null));
+ }
+
+ private static final class FrameStub {
+ final int type;
+ final int flags;
+ final int streamId;
+ final byte[] payload;
+
+ FrameStub(final int type, final int flags, final int streamId, final byte[] payload) {
+ this.type = type;
+ this.flags = flags;
+ this.streamId = streamId;
+ this.payload = payload;
+ }
+
+ boolean isPing() {
+ return type == FrameType.PING.getValue();
+ }
+
+ boolean isGoAway() {
+ return type == FrameType.GOAWAY.getValue();
+ }
+
+ boolean isAck() {
+ return (flags & FrameFlag.ACK.getValue()) != 0;
+ }
+ }
+
+ private static List parseFrames(final byte[] all) {
+ final List out = new ArrayList<>();
+ int p = 0;
+ while (p + 9 <= all.length) {
+ final int len = ((all[p] & 0xff) << 16) | ((all[p + 1] & 0xff) << 8) | (all[p + 2] & 0xff);
+ final int type = all[p + 3] & 0xff;
+ final int flags = all[p + 4] & 0xff;
+ final int sid = ((all[p + 5] & 0x7f) << 24) | ((all[p + 6] & 0xff) << 16)
+ | ((all[p + 7] & 0xff) << 8) | (all[p + 8] & 0xff);
+ p += 9;
+ if (p + len > all.length) {
+ break;
+ }
+ final byte[] payload = new byte[len];
+ System.arraycopy(all, p, payload, 0, len);
+ out.add(new FrameStub(type, flags, sid, payload));
+ p += len;
+ }
+ return out;
+ }
+
+ private static byte[] concat(final List writes) {
+ final int total = writes.stream().mapToInt(a -> a.length).sum();
+ final byte[] all = new byte[total];
+ int p = 0;
+ for (final byte[] a : writes) {
+ System.arraycopy(a, 0, all, p, a.length);
+ p += a.length;
+ }
+ return all;
+ }
+
+
+ @Test
+ void testKeepAliveNotActiveBeforeSettingsHandshake() throws Exception {
+ final List writes = new ArrayList<>();
+ Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class)))
+ .thenAnswer(inv -> {
+ final ByteBuffer b = inv.getArgument(0, ByteBuffer.class);
+ final byte[] copy = new byte[b.remaining()];
+ b.get(copy);
+ writes.add(copy);
+ return copy.length;
+ });
+ Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt());
+ Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt());
+
+ final Timeout idleTime = Timeout.ofMilliseconds(5);
+ final Timeout ackTimeout = Timeout.ofMilliseconds(5);
+
+ final H2Config h2Config = H2Config.custom()
+ .setPingPolicy(H2PingPolicy.custom()
+ .setIdleTime(idleTime)
+ .setAckTimeout(ackTimeout)
+ .build())
+ .build();
+
+ final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl(
+ protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD,
+ httpProcessor, CharCodingConfig.DEFAULT, h2Config, h2StreamListener, () -> streamHandler);
+
+ mux.onConnect();
+ writes.clear();
+
+ // BEFORE SETTINGS handshake is fully ACKed, keepalive must NOT run
+ mux.onTimeout(idleTime);
+
+ final List frames = parseFrames(concat(writes));
+ Assertions.assertTrue(frames.stream().noneMatch(FrameStub::isPing), "Must not emit PING before handshake");
+ Assertions.assertTrue(frames.stream().anyMatch(FrameStub::isGoAway), "Default timeout path must emit GOAWAY");
+ }
+
+ @Test
+ void testKeepAliveActivatesAfterSettingsAckedSetsIdleTimeout() throws Exception {
+ Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt());
+ Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt());
+ Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class))).thenReturn(0);
+
+ final Timeout idleTime = Timeout.ofMilliseconds(50);
+ final Timeout ackTimeout = Timeout.ofMilliseconds(20);
+
+ final H2Config h2Config = H2Config.custom()
+ .setPingPolicy(H2PingPolicy.custom()
+ .setIdleTime(idleTime)
+ .setAckTimeout(ackTimeout)
+ .build())
+ .build();
+
+ final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl(
+ protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD,
+ httpProcessor, CharCodingConfig.DEFAULT, h2Config, h2StreamListener, () -> streamHandler);
+
+ mux.onConnect();
+ completeSettingsHandshake(mux);
+
+ Mockito.verify(protocolIOSession, Mockito.atLeastOnce()).setSocketTimeout(ArgumentMatchers.eq(idleTime));
+ Mockito.verify(protocolIOSession, Mockito.never()).setSocketTimeout(ArgumentMatchers.eq(ackTimeout));
+ }
+
+ @Test
+ void testKeepAliveIdleTimeoutSendsPingAndSetsAckTimeout() throws Exception {
+ final List writes = new ArrayList<>();
+ Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class)))
+ .thenAnswer(inv -> {
+ final ByteBuffer b = inv.getArgument(0, ByteBuffer.class);
+ final byte[] copy = new byte[b.remaining()];
+ b.get(copy);
+ writes.add(copy);
+ return copy.length;
+ });
+ Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt());
+ Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt());
+
+ final Timeout idleTime = Timeout.ofMilliseconds(5);
+ final Timeout ackTimeout = Timeout.ofMilliseconds(50);
+
+ final H2Config h2Config = H2Config.custom()
+ .setPingPolicy(H2PingPolicy.custom()
+ .setIdleTime(idleTime)
+ .setAckTimeout(ackTimeout)
+ .build())
+ .build();
+
+ final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl(
+ protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD,
+ httpProcessor, CharCodingConfig.DEFAULT, h2Config, h2StreamListener, () -> streamHandler);
+
+ mux.onConnect();
+ completeSettingsHandshake(mux);
+
+ writes.clear();
+ Thread.sleep(idleTime.toMilliseconds() + 10);
+
+ mux.onTimeout(idleTime);
+
+ Mockito.verify(protocolIOSession, Mockito.atLeastOnce()).setSocketTimeout(ArgumentMatchers.eq(ackTimeout));
+
+ final List frames = parseFrames(concat(writes));
+ Assertions.assertTrue(frames.stream().anyMatch(f -> f.isPing() && !f.isAck()), "Must emit keepalive PING");
+ Assertions.assertTrue(mux.isOpen(), "Connection should still be open after sending PING");
+ }
+
+ @Test
+ void testKeepAlivePingAckReturnsToIdleTimeout() throws Exception {
+ final List writes = new ArrayList<>();
+ Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class)))
+ .thenAnswer(inv -> {
+ final ByteBuffer b = inv.getArgument(0, ByteBuffer.class);
+ final byte[] copy = new byte[b.remaining()];
+ b.get(copy);
+ writes.add(copy);
+ return copy.length;
+ });
+ Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt());
+ Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt());
+
+ final Timeout idleTime = Timeout.ofMilliseconds(5);
+ final Timeout ackTimeout = Timeout.ofMilliseconds(50);
+
+ final H2Config h2Config = H2Config.custom()
+ .setPingPolicy(H2PingPolicy.custom()
+ .setIdleTime(idleTime)
+ .setAckTimeout(ackTimeout)
+ .build())
+ .build();
+
+ final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl(
+ protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD,
+ httpProcessor, CharCodingConfig.DEFAULT, h2Config, h2StreamListener, () -> streamHandler);
+
+ mux.onConnect();
+ completeSettingsHandshake(mux);
+
+ writes.clear();
+ Thread.sleep(idleTime.toMilliseconds() + 10);
+ mux.onTimeout(idleTime);
+
+ final List frames = parseFrames(concat(writes));
+ final FrameStub ping = frames.stream().filter(f -> f.isPing() && !f.isAck()).findFirst().orElse(null);
+ Assertions.assertNotNull(ping, "Expected a keepalive PING frame");
+ Assertions.assertEquals(8, ping.payload.length, "PING payload must be 8 bytes");
+
+ // Feed an ACK with the same 8 bytes
+ final RawFrame pingAck = new RawFrame(FrameType.PING.getValue(), FrameFlag.ACK.getValue(), 0, ByteBuffer.wrap(ping.payload));
+ feedFrame(mux, pingAck);
+
+ Mockito.verify(protocolIOSession, Mockito.atLeastOnce()).setSocketTimeout(ArgumentMatchers.eq(idleTime));
+ }
+
+ @Test
+ void testKeepAliveAckTimeoutShutsDownAndFailsStreams() throws Exception {
+ final List writes = new ArrayList<>();
+ Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class)))
+ .thenAnswer(inv -> {
+ final ByteBuffer b = inv.getArgument(0, ByteBuffer.class);
+ final byte[] copy = new byte[b.remaining()];
+ b.get(copy);
+ writes.add(copy);
+ return copy.length;
+ });
+ Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt());
+ Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt());
+
+ final Timeout idleTime = Timeout.ofMilliseconds(5);
+ final Timeout ackTimeout = Timeout.ofMilliseconds(20);
+
+ final H2Config h2Config = H2Config.custom()
+ .setPingPolicy(H2PingPolicy.custom()
+ .setIdleTime(idleTime)
+ .setAckTimeout(ackTimeout)
+ .build())
+ .build();
+
+ final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl(
+ protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD,
+ httpProcessor, CharCodingConfig.DEFAULT, h2Config, h2StreamListener, () -> streamHandler);
+
+ mux.onConnect();
+ completeSettingsHandshake(mux);
+
+ // Ensure at least one live stream to be failed
+ final H2StreamChannel channel = mux.createChannel(1);
+ mux.createStream(channel, streamHandler);
+
+ writes.clear();
+ Thread.sleep(idleTime.toMilliseconds() + 10);
+ mux.onTimeout(idleTime); // send PING, awaiting ACK
+ writes.clear();
+
+ // No ACK arrives -> next timeout closes via keepalive path (GOAWAY + fail streams)
+ mux.onTimeout(ackTimeout);
+
+ final List frames = parseFrames(concat(writes));
+ Assertions.assertTrue(frames.stream().anyMatch(FrameStub::isGoAway), "Must emit GOAWAY on ping ACK timeout");
+
+ Mockito.verify(streamHandler, Mockito.atLeastOnce()).failed(exceptionCaptor.capture());
+ Assertions.assertInstanceOf(H2StreamResetException.class, exceptionCaptor.getValue());
+
+ Assertions.assertFalse(mux.isOpen(), "Connection must not be open after keepalive shutdown");
+ }
+
+ @Test
+ void testKeepAliveDisabledNeverEmitsPing() throws Exception {
+ final List writes = new ArrayList<>();
+ Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class)))
+ .thenAnswer(inv -> {
+ final ByteBuffer b = inv.getArgument(0, ByteBuffer.class);
+ final byte[] copy = new byte[b.remaining()];
+ b.get(copy);
+ writes.add(copy);
+ return copy.length;
+ });
+ Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt());
+ Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt());
+
+ final H2Config h2Config = H2Config.custom()
+ .setPingPolicy(H2PingPolicy.disabled())
+ .build();
+
+ final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl(
+ protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD,
+ httpProcessor, CharCodingConfig.DEFAULT, h2Config, h2StreamListener, () -> streamHandler);
+
+ mux.onConnect();
+ writes.clear();
+
+ mux.onTimeout(Timeout.ofMilliseconds(1));
+
+ final List frames = parseFrames(concat(writes));
+ Assertions.assertTrue(frames.stream().noneMatch(FrameStub::isPing), "Disabled policy must never emit PING");
+ }
+
+
}
diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestH2KeepAlivePingPolicyIT.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestH2KeepAlivePingPolicyIT.java
new file mode 100644
index 000000000..3968c42c4
--- /dev/null
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestH2KeepAlivePingPolicyIT.java
@@ -0,0 +1,357 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+
+package org.apache.hc.core5.testing.nio;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.LockSupport;
+
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.Message;
+import org.apache.hc.core5.http.URIScheme;
+import org.apache.hc.core5.http.nio.AsyncServerRequestHandler;
+import org.apache.hc.core5.http.nio.entity.DiscardingEntityConsumer;
+import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
+import org.apache.hc.core5.http.nio.support.AsyncResponseBuilder;
+import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
+import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.http.support.BasicRequestBuilder;
+import org.apache.hc.core5.http2.config.H2Config;
+import org.apache.hc.core5.http2.config.H2PingPolicy;
+import org.apache.hc.core5.http2.nio.AsyncPingHandler;
+import org.apache.hc.core5.http2.nio.command.PingCommand;
+import org.apache.hc.core5.reactor.Command;
+import org.apache.hc.core5.reactor.IOSession;
+import org.apache.hc.core5.testing.extension.nio.H2TestResources;
+import org.apache.hc.core5.util.Timeout;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+public class TestH2KeepAlivePingPolicyIT {
+
+ private static final Timeout TIMEOUT = Timeout.ofSeconds(30);
+
+ @RegisterExtension
+ private final H2TestResources resources = new H2TestResources(URIScheme.HTTP, TIMEOUT);
+
+ @Test
+ void keepAlivePing_keepsConnectionOpenPastIdleTimeout() throws Exception {
+ final H2TestServer server = resources.server();
+ final H2TestClient client = resources.client();
+
+ server.register("/hello", () -> new MessageExchangeHandler(new DiscardingEntityConsumer()) {
+ @Override
+ protected void handle(
+ final Message request,
+ final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
+ final HttpContext context) throws IOException, HttpException {
+ responseTrigger.submitResponse(
+ AsyncResponseBuilder.create(HttpStatus.SC_OK)
+ .setEntity("OK", ContentType.TEXT_PLAIN)
+ .build(),
+ context);
+ }
+ });
+
+ final Timeout idleTime = Timeout.ofMilliseconds(200);
+ final Timeout ackTimeout = Timeout.ofSeconds(2);
+
+ final H2PingPolicy pingPolicy = H2PingPolicy.custom()
+ .setIdleTime(idleTime)
+ .setAckTimeout(ackTimeout)
+ .build();
+
+ final H2Config h2Config = H2Config.custom()
+ .setPushEnabled(false)
+ .setPingPolicy(pingPolicy)
+ .build();
+
+ server.configure(h2Config);
+ final InetSocketAddress serverEndpoint = server.start();
+
+ client.configure(h2Config);
+ client.start();
+
+ final IOSession ioSession = client.requestSession(
+ new HttpHost("localhost", serverEndpoint.getPort()),
+ TIMEOUT,
+ null).get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+
+ // Make the inactivity timeout aggressive; keep-alive must prevent it from killing the session.
+ ioSession.setSocketTimeout(idleTime);
+
+ try (final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(ioSession)) {
+ final HttpHost target = new HttpHost(URIScheme.HTTP.id, "localhost", serverEndpoint.getPort());
+
+ final Message r1 = executeHello(streamEndpoint, target);
+ Assertions.assertEquals(200, r1.getHead().getCode());
+ Assertions.assertEquals("OK", r1.getBody());
+
+ parkAtLeast(idleTime.toMilliseconds() * 6L);
+
+ Assertions.assertTrue(ioSession.isOpen(), "Expected session to stay open with keep-alive enabled");
+
+ final Message r2 = executeHello(streamEndpoint, target);
+ Assertions.assertEquals(200, r2.getHead().getCode());
+ Assertions.assertEquals("OK", r2.getBody());
+ }
+ }
+
+ @Test
+ void keepAlivePing_disabled_connectionClosesOnIdleTimeout() throws Exception {
+ final H2TestServer server = resources.server();
+ final H2TestClient client = resources.client();
+
+ server.register("/hello", () -> new MessageExchangeHandler(new DiscardingEntityConsumer()) {
+ @Override
+ protected void handle(
+ final Message request,
+ final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
+ final HttpContext context) throws IOException, HttpException {
+ responseTrigger.submitResponse(
+ AsyncResponseBuilder.create(HttpStatus.SC_OK)
+ .setEntity("OK", ContentType.TEXT_PLAIN)
+ .build(),
+ context);
+ }
+ });
+
+ final H2Config h2Config = H2Config.custom()
+ .setPushEnabled(false)
+ // pingPolicy is intentionally not set (disabled)
+ .build();
+
+ server.configure(h2Config);
+ final InetSocketAddress serverEndpoint = server.start();
+
+ client.configure(h2Config);
+ client.start();
+
+ final IOSession ioSession = client.requestSession(
+ new HttpHost("localhost", serverEndpoint.getPort()),
+ TIMEOUT,
+ null).get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+
+ final Timeout idleTimeout = Timeout.ofMilliseconds(200);
+ ioSession.setSocketTimeout(idleTimeout);
+
+ try (final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(ioSession)) {
+ final HttpHost target = new HttpHost(URIScheme.HTTP.id, "localhost", serverEndpoint.getPort());
+
+ final Message r1 = executeHello(streamEndpoint, target);
+ Assertions.assertEquals(200, r1.getHead().getCode());
+ Assertions.assertEquals("OK", r1.getBody());
+
+ awaitTrue(() -> !ioSession.isOpen(), Timeout.ofSeconds(5), "Expected session to close without keep-alive");
+
+ Assertions.assertFalse(ioSession.isOpen(), "Expected session to close without keep-alive");
+
+ final Future> f = executeHelloAsync(streamEndpoint, target);
+ Assertions.assertThrows(ExecutionException.class, () -> f.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
+ }
+ }
+
+ @Test
+ void keepAlivePing_enabled_doesNotStealAckFromExplicitPingCommand() throws Exception {
+ final H2TestServer server = resources.server();
+ final H2TestClient client = resources.client();
+
+ server.register("/hello", () -> new MessageExchangeHandler(new DiscardingEntityConsumer()) {
+ @Override
+ protected void handle(
+ final Message request,
+ final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
+ final HttpContext context) throws IOException, HttpException {
+ responseTrigger.submitResponse(
+ AsyncResponseBuilder.create(HttpStatus.SC_OK)
+ .setEntity("OK", ContentType.TEXT_PLAIN)
+ .build(),
+ context);
+ }
+ });
+
+ final Timeout idleTime = Timeout.ofMilliseconds(100);
+ final Timeout ackTimeout = Timeout.ofSeconds(2);
+
+ final H2PingPolicy pingPolicy = H2PingPolicy.custom()
+ .setIdleTime(idleTime)
+ .setAckTimeout(ackTimeout)
+ .build();
+
+ final H2Config h2Config = H2Config.custom()
+ .setPushEnabled(false)
+ .setPingPolicy(pingPolicy)
+ .build();
+
+ server.configure(h2Config);
+ final InetSocketAddress serverEndpoint = server.start();
+
+ client.configure(h2Config);
+ client.start();
+
+ final IOSession ioSession = client.requestSession(
+ new HttpHost("localhost", serverEndpoint.getPort()),
+ TIMEOUT,
+ null).get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+
+ ioSession.setSocketTimeout(idleTime);
+
+ try (final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(ioSession)) {
+ final HttpHost target = new HttpHost(URIScheme.HTTP.id, "localhost", serverEndpoint.getPort());
+
+ // Warm-up to complete the HTTP/2 session & SETTINGS handshake.
+ final Message r1 = executeHello(streamEndpoint, target);
+ Assertions.assertEquals(200, r1.getHead().getCode());
+ Assertions.assertEquals("OK", r1.getBody());
+
+ // Give the keep-alive logic a chance to become active (no hard assumptions about socket timeout changes).
+ parkAtLeast(idleTime.toMilliseconds() * 3L);
+
+ final byte[] expected = new byte[]{0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55};
+ final CompletableFuture acked = new CompletableFuture<>();
+
+ final AsyncPingHandler handler = new AsyncPingHandler() {
+
+ @Override
+ public ByteBuffer getData() {
+ return ByteBuffer.wrap(expected).asReadOnlyBuffer();
+ }
+
+ @Override
+ public void consumeResponse(final ByteBuffer feedback) throws IOException, HttpException {
+ if (feedback == null || feedback.remaining() != expected.length) {
+ acked.completeExceptionally(new AssertionError("Unexpected ping ACK payload"));
+ return;
+ }
+ final ByteBuffer dup = feedback.slice();
+ final byte[] actual = new byte[expected.length];
+ dup.get(actual);
+ for (int i = 0; i < expected.length; i++) {
+ if (actual[i] != expected[i]) {
+ acked.completeExceptionally(new AssertionError("Ping ACK payload mismatch"));
+ return;
+ }
+ }
+ acked.complete(null);
+ }
+
+ @Override
+ public void failed(final Exception cause) {
+ acked.completeExceptionally(cause);
+ }
+
+ @Override
+ public void cancel() {
+ acked.cancel(false);
+ }
+ };
+
+ ioSession.enqueue(new PingCommand(handler), Command.Priority.NORMAL);
+ ioSession.setEvent(SelectionKey.OP_WRITE);
+
+ try {
+ acked.get(5, TimeUnit.SECONDS);
+ } catch (final TimeoutException ex) {
+ Assertions.fail("Timed out waiting for explicit PING ACK");
+ }
+
+ // Still usable.
+ Assertions.assertTrue(ioSession.isOpen(), "Expected session to stay open");
+ final Message r2 = executeHello(streamEndpoint, target);
+ Assertions.assertEquals(200, r2.getHead().getCode());
+ Assertions.assertEquals("OK", r2.getBody());
+ }
+ }
+
+ @Test
+ void keepAlivePingPolicy_rejectsDisabledAckTimeoutWhenIdleEnabled() {
+ Assertions.assertThrows(IllegalArgumentException.class, () -> H2PingPolicy.custom()
+ .setIdleTime(Timeout.ofSeconds(1))
+ .setAckTimeout(Timeout.DISABLED)
+ .build());
+ }
+
+ private static Message executeHello(
+ final ClientSessionEndpoint endpoint,
+ final HttpHost target) throws Exception {
+ final Future> f = executeHelloAsync(endpoint, target);
+ return f.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+ }
+
+ private static Future> executeHelloAsync(
+ final ClientSessionEndpoint endpoint,
+ final HttpHost target) {
+
+ final org.apache.hc.core5.http.message.BasicHttpRequest request = BasicRequestBuilder.get()
+ .setHttpHost(target)
+ .setPath("/hello")
+ .build();
+
+ return endpoint.execute(
+ new BasicRequestProducer(request, null),
+ new BasicResponseConsumer(new StringAsyncEntityConsumer()),
+ null);
+ }
+
+ private static void parkAtLeast(final long millis) {
+ final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(millis);
+ while (System.nanoTime() < deadlineNanos) {
+ LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10));
+ }
+ }
+
+ private interface Condition {
+ boolean get();
+ }
+
+ private static void awaitTrue(final Condition condition, final Timeout timeout, final String message) {
+ final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout.toMilliseconds());
+ while (System.nanoTime() < deadlineNanos) {
+ if (condition.get()) {
+ return;
+ }
+ LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1));
+ }
+ Assertions.fail(message);
+ }
+
+}