diff --git a/.github/workflows/triage-issues.yml b/.github/workflows/triage-issues.yml
index 85ccb72aa..cf13d3afc 100644
--- a/.github/workflows/triage-issues.yml
+++ b/.github/workflows/triage-issues.yml
@@ -16,7 +16,7 @@ jobs:
issues: write
pull-requests: write
steps:
- - uses: actions/stale@5f858e3efba33a5ca4407a664cc011ad407f2008 # v10.1.0
+ - uses: actions/stale@997185467fa4f803885201cee163a9f38240193d # v10.1.1
with:
days-before-issue-stale: 30
days-before-issue-close: 10
diff --git a/.github/workflows/unit-tests-jdk-14.yml b/.github/workflows/unit-tests-jdk-14.yml
index d45537c4d..241dc5e17 100644
--- a/.github/workflows/unit-tests-jdk-14.yml
+++ b/.github/workflows/unit-tests-jdk-14.yml
@@ -16,11 +16,11 @@ jobs:
permissions:
contents: read
steps:
- - uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6.0.0
+ - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
persist-credentials: false
- name: Install JDK
- uses: actions/setup-java@dded0888837ed1f317902acf8a20df0ad188d165 # v5.0.0
+ uses: actions/setup-java@f2beeb24e141e01a676f977032f5a29d81c9e27e # v5.1.0
with:
java-version: ${{ matrix.java-version }}
distribution: "adopt"
@@ -30,6 +30,6 @@ jobs:
env:
SKIP_UNSTABLE_TESTS: 1
- name: Upload coverage to Codecov
- uses: codecov/codecov-action@5a1091511ad55cbe89839c7260b706298ca349f7 # v5.5.1
+ uses: codecov/codecov-action@671740ac38dd9b0130fbe1cec585b89eea48d3de # v5.5.2
with:
token: ${{ secrets.CODECOV_TOKEN }}
diff --git a/.github/workflows/unit-tests-jdk-17.yml b/.github/workflows/unit-tests-jdk-17.yml
index 047c85fff..72c2296f5 100644
--- a/.github/workflows/unit-tests-jdk-17.yml
+++ b/.github/workflows/unit-tests-jdk-17.yml
@@ -16,11 +16,11 @@ jobs:
permissions:
contents: read
steps:
- - uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6.0.0
+ - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
persist-credentials: false
- name: Install JDK
- uses: actions/setup-java@dded0888837ed1f317902acf8a20df0ad188d165 # v5.0.0
+ uses: actions/setup-java@f2beeb24e141e01a676f977032f5a29d81c9e27e # v5.1.0
with:
java-version: ${{ matrix.java-version }}
distribution: "adopt"
diff --git a/.github/workflows/unit-tests-jdk-8.yml b/.github/workflows/unit-tests-jdk-8.yml
index 8df5a23c5..522a79ec8 100644
--- a/.github/workflows/unit-tests-jdk-8.yml
+++ b/.github/workflows/unit-tests-jdk-8.yml
@@ -16,11 +16,11 @@ jobs:
permissions:
contents: read
steps:
- - uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6.0.0
+ - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
persist-credentials: false
- name: Install JDK
- uses: actions/setup-java@dded0888837ed1f317902acf8a20df0ad188d165 # v5.0.0
+ uses: actions/setup-java@f2beeb24e141e01a676f977032f5a29d81c9e27e # v5.1.0
with:
java-version: ${{ matrix.java-version }}
distribution: "adopt"
diff --git a/slack-api-client/src/main/java/com/slack/api/methods/AsyncChatStreamHelper.java b/slack-api-client/src/main/java/com/slack/api/methods/AsyncChatStreamHelper.java
new file mode 100644
index 000000000..5408cc11b
--- /dev/null
+++ b/slack-api-client/src/main/java/com/slack/api/methods/AsyncChatStreamHelper.java
@@ -0,0 +1,185 @@
+package com.slack.api.methods;
+
+import com.slack.api.methods.request.chat.ChatAppendStreamRequest;
+import com.slack.api.methods.request.chat.ChatStartStreamRequest;
+import com.slack.api.methods.request.chat.ChatStopStreamRequest;
+import com.slack.api.methods.response.chat.ChatAppendStreamResponse;
+import com.slack.api.methods.response.chat.ChatStartStreamResponse;
+import com.slack.api.methods.response.chat.ChatStopStreamResponse;
+import com.slack.api.model.Message;
+import com.slack.api.model.block.LayoutBlock;
+import lombok.Builder;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Async variant of {@link ChatStreamHelper} for {@link AsyncMethodsClient}.
+ *
+ * This helper buffers markdown text and flushes via chat.startStream / chat.appendStream, then finalizes via
+ * chat.stopStream.
+ *
+ *
+ */
+@Data
+@Slf4j
+@Builder
+public class AsyncChatStreamHelper {
+
+ public enum State {
+ STARTING,
+ IN_PROGRESS,
+ COMPLETED
+ }
+
+ private final AsyncMethodsClient client;
+ private final String channel;
+ private final String threadTs;
+ private final String recipientTeamId;
+ private final String recipientUserId;
+
+ @Builder.Default
+ private final int bufferSize = 256;
+
+ @Builder.Default
+ private StringBuilder buffer = new StringBuilder();
+ @Builder.Default
+ private State state = State.STARTING;
+ private String streamTs;
+
+ /**
+ * Append text to the stream.
+ *
+ * @param markdownText markdown text to append
+ * @return a future that completes with a response if the buffer was flushed; completes with null if buffering
+ */
+ public CompletableFuture append(String markdownText) {
+ if (state == State.COMPLETED) {
+ CompletableFuture f = new CompletableFuture<>();
+ f.completeExceptionally(new SlackChatStreamException("Cannot append to stream: stream state is " + state));
+ return f;
+ }
+
+ buffer.append(markdownText);
+
+ if (buffer.length() >= bufferSize) {
+ return flushBuffer();
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("AsyncChatStream appended to buffer: bufferLength={}, bufferSize={}, channel={}, " +
+ "recipientTeamId={}, recipientUserId={}, threadTs={}",
+ buffer.length(), bufferSize, channel, recipientTeamId, recipientUserId, threadTs);
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+
+ public CompletableFuture stop() {
+ return stop(null, null, null);
+ }
+
+ public CompletableFuture stop(String markdownText) {
+ return stop(markdownText, null, null);
+ }
+
+ public CompletableFuture stop(
+ String markdownText,
+ List blocks,
+ Message.Metadata metadata
+ ) {
+ if (state == State.COMPLETED) {
+ CompletableFuture f = new CompletableFuture<>();
+ f.completeExceptionally(new SlackChatStreamException("Cannot stop stream: stream state is " + state));
+ return f;
+ }
+
+ if (markdownText != null) {
+ buffer.append(markdownText);
+ }
+
+ CompletableFuture ensureStarted;
+ if (streamTs == null) {
+ ensureStarted = client.chatStartStream(ChatStartStreamRequest.builder()
+ .channel(channel)
+ .threadTs(threadTs)
+ .recipientTeamId(recipientTeamId)
+ .recipientUserId(recipientUserId)
+ .build())
+ .thenApply(startResponse -> {
+ if (!startResponse.isOk() || startResponse.getTs() == null) {
+ SlackChatStreamException ex = new SlackChatStreamException(
+ "Failed to stop stream: stream not started - " + startResponse.getError());
+ ex.setStartResponse(startResponse);
+ throw ex;
+ }
+ streamTs = startResponse.getTs();
+ state = State.IN_PROGRESS;
+ return null;
+ });
+ } else {
+ ensureStarted = CompletableFuture.completedFuture(null);
+ }
+
+ return ensureStarted.thenCompose(ignored -> client.chatStopStream(ChatStopStreamRequest.builder()
+ .channel(channel)
+ .ts(streamTs)
+ .markdownText(buffer.toString())
+ .blocks(blocks)
+ .metadata(metadata)
+ .build())
+ .thenApply(resp -> {
+ state = State.COMPLETED;
+ return resp;
+ }));
+ }
+
+ private CompletableFuture flushBuffer() {
+ if (streamTs == null) {
+ return client.chatStartStream(ChatStartStreamRequest.builder()
+ .channel(channel)
+ .threadTs(threadTs)
+ .recipientTeamId(recipientTeamId)
+ .recipientUserId(recipientUserId)
+ .markdownText(buffer.toString())
+ .build())
+ .thenApply(startResponse -> {
+ if (!startResponse.isOk()) {
+ SlackChatStreamException ex = new SlackChatStreamException(
+ "Failed to start stream: " + startResponse.getError());
+ ex.setStartResponse(startResponse);
+ throw ex;
+ }
+ streamTs = startResponse.getTs();
+ state = State.IN_PROGRESS;
+ ChatAppendStreamResponse synth = new ChatAppendStreamResponse();
+ synth.setOk(startResponse.isOk());
+ synth.setChannel(startResponse.getChannel());
+ synth.setTs(startResponse.getTs());
+ synth.setWarning(startResponse.getWarning());
+ synth.setError(startResponse.getError());
+ buffer.setLength(0);
+ return synth;
+ });
+ } else {
+ return client.chatAppendStream(ChatAppendStreamRequest.builder()
+ .channel(channel)
+ .ts(streamTs)
+ .markdownText(buffer.toString())
+ .build())
+ .thenApply(resp -> {
+ if (!resp.isOk()) {
+ SlackChatStreamException ex = new SlackChatStreamException(
+ "Failed to append to stream: " + resp.getError());
+ ex.getAppendResponses().add(resp);
+ throw ex;
+ }
+ buffer.setLength(0);
+ return resp;
+ });
+ }
+ }
+}
+
+
diff --git a/slack-api-client/src/main/java/com/slack/api/methods/AsyncMethodsClient.java b/slack-api-client/src/main/java/com/slack/api/methods/AsyncMethodsClient.java
index 7566d7643..d94648044 100644
--- a/slack-api-client/src/main/java/com/slack/api/methods/AsyncMethodsClient.java
+++ b/slack-api-client/src/main/java/com/slack/api/methods/AsyncMethodsClient.java
@@ -1016,7 +1016,9 @@ CompletableFuture
CompletableFuture chatStopStream(ChatStopStreamRequest req);
CompletableFuture chatStopStream(RequestConfigurator req);
-
+
+ AsyncChatStreamHelper asyncChatStreamHelper(RequestConfigurator req);
+
CompletableFuture chatUpdate(ChatUpdateRequest req);
CompletableFuture chatUpdate(RequestConfigurator req);
diff --git a/slack-api-client/src/main/java/com/slack/api/methods/ChatStreamHelper.java b/slack-api-client/src/main/java/com/slack/api/methods/ChatStreamHelper.java
new file mode 100644
index 000000000..9ef290661
--- /dev/null
+++ b/slack-api-client/src/main/java/com/slack/api/methods/ChatStreamHelper.java
@@ -0,0 +1,250 @@
+package com.slack.api.methods;
+
+import com.slack.api.methods.request.chat.ChatAppendStreamRequest;
+import com.slack.api.methods.request.chat.ChatStartStreamRequest;
+import com.slack.api.methods.request.chat.ChatStopStreamRequest;
+import com.slack.api.methods.response.chat.ChatAppendStreamResponse;
+import com.slack.api.methods.response.chat.ChatStartStreamResponse;
+import com.slack.api.methods.response.chat.ChatStopStreamResponse;
+import com.slack.api.model.Message;
+import com.slack.api.model.block.LayoutBlock;
+import lombok.Builder;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A helper class for streaming markdown text into a conversation using the chat streaming APIs.
+ *
+ * This class provides a convenient interface for the chat.startStream, chat.appendStream, and chat.stopStream API
+ * methods, with automatic buffering and state management.
+ *
+ * Typical usage is to use the {@link MethodsClient#chatStreamHelper} method:
+ *
+ *
+ * {@code
+ * MethodsClient client = Slack.getInstance().methods(token);
+ * ChatStreamHelper stream = client.chatStreamHelper(req -> req
+ * .channel("C0123456789")
+ * .threadTs("1700000001.123456")
+ * .recipientTeamId("T0123456789")
+ * .recipientUserId("U0123456789")
+ * .bufferSize(100));
+ *
+ * stream.append("**hello wo");
+ * stream.append("rld!**");
+ * ChatStopStreamResponse response = stream.stop();
+ * }
+ *
+ */
+@Data
+@Slf4j
+@Builder
+public class ChatStreamHelper {
+
+ /**
+ * The state of the chat stream.
+ */
+ public enum State {
+ STARTING,
+ IN_PROGRESS,
+ COMPLETED
+ }
+
+ private final MethodsClient client;
+ private final String channel;
+ private final String threadTs;
+ private final String recipientTeamId;
+ private final String recipientUserId;
+
+ /**
+ * The length of markdown_text to buffer in-memory before calling a method.
+ * Increasing this value decreases the number of method calls made for the same amount of text,
+ * which is useful to avoid rate limits.
+ * Default is 100.
+ */
+ @Builder.Default
+ private final int bufferSize = 256;
+
+ // Mutable state (not thread-safe)
+ @Builder.Default
+ private StringBuilder buffer = new StringBuilder();
+ @Builder.Default
+ private State state = State.STARTING;
+ private String streamTs;
+
+ /**
+ * Append text to the stream.
+ *
+ * This method can be called multiple times. After the stream is stopped, this method cannot be called.
+ *
+ * @param markdownText Accepts message text formatted in markdown. Limit this field to 12,000 characters.
+ * This text is what will be appended to the message received so far.
+ * @return ChatAppendStreamResponse if the buffer was flushed, null if buffering
+ * @throws SlackChatStreamException if the stream is already completed or an API error occurs
+ * @throws IOException if a network error occurs
+ * @throws SlackApiException if a Slack API error occurs
+ */
+ public ChatAppendStreamResponse append(String markdownText) throws IOException, SlackApiException {
+ if (state == State.COMPLETED) {
+ throw new SlackChatStreamException("Cannot append to stream: stream state is " + state);
+ }
+
+ buffer.append(markdownText);
+
+ if (buffer.length() >= bufferSize) {
+ return flushBuffer();
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("ChatStream appended to buffer: bufferLength={}, bufferSize={}, channel={}, " +
+ "recipientTeamId={}, recipientUserId={}, threadTs={}",
+ buffer.length(), bufferSize, channel, recipientTeamId, recipientUserId, threadTs);
+ }
+
+ return null;
+ }
+
+ /**
+ * Stop the stream and finalize the message.
+ *
+ * @return ChatStopStreamResponse from the chat.stopStream API call
+ * @throws SlackChatStreamException if the stream is already completed or an API error occurs
+ * @throws IOException if a network error occurs
+ * @throws SlackApiException if a Slack API error occurs
+ */
+ public ChatStopStreamResponse stop() throws IOException, SlackApiException {
+ return stop(null, null, null);
+ }
+
+ /**
+ * Stop the stream and finalize the message.
+ *
+ * @param markdownText Additional text to append before stopping
+ * @return ChatStopStreamResponse from the chat.stopStream API call
+ * @throws SlackChatStreamException if the stream is already completed or an API error occurs
+ * @throws IOException if a network error occurs
+ * @throws SlackApiException if a Slack API error occurs
+ */
+ public ChatStopStreamResponse stop(String markdownText) throws IOException, SlackApiException {
+ return stop(markdownText, null, null);
+ }
+
+ /**
+ * Stop the stream and finalize the message.
+ *
+ * @param markdownText Additional text to append before stopping (can be null)
+ * @param blocks A list of blocks that will be rendered at the bottom of the finalized message (can be null)
+ * @param metadata Metadata to attach to the message (can be null)
+ * @return ChatStopStreamResponse from the chat.stopStream API call
+ * @throws SlackChatStreamException if the stream is already completed or an API error occurs
+ * @throws IOException if a network error occurs
+ * @throws SlackApiException if a Slack API error occurs
+ */
+ public ChatStopStreamResponse stop(
+ String markdownText,
+ List blocks,
+ Message.Metadata metadata
+ ) throws IOException, SlackApiException {
+ if (state == State.COMPLETED) {
+ throw new SlackChatStreamException("Cannot stop stream: stream state is " + state);
+ }
+
+ if (markdownText != null) {
+ buffer.append(markdownText);
+ }
+
+ // If the stream hasn't started yet, start it first
+ if (streamTs == null) {
+ ChatStartStreamResponse startResponse = client.chatStartStream(ChatStartStreamRequest.builder()
+ .channel(channel)
+ .threadTs(threadTs)
+ .recipientTeamId(recipientTeamId)
+ .recipientUserId(recipientUserId)
+ .build());
+
+ if (!startResponse.isOk() || startResponse.getTs() == null) {
+ SlackChatStreamException ex = new SlackChatStreamException(
+ "Failed to stop stream: stream not started - " + startResponse.getError());
+ ex.setStartResponse(startResponse);
+ throw ex;
+ }
+
+ streamTs = startResponse.getTs();
+ state = State.IN_PROGRESS;
+ }
+
+ ChatStopStreamResponse response = client.chatStopStream(ChatStopStreamRequest.builder()
+ .channel(channel)
+ .ts(streamTs)
+ .markdownText(buffer.toString())
+ .blocks(blocks)
+ .metadata(metadata)
+ .build());
+
+ state = State.COMPLETED;
+ return response;
+ }
+
+ /**
+ * Flush the internal buffer by making appropriate API calls.
+ *
+ * @return ChatAppendStreamResponse from the API call (or a synthesized response for the first call)
+ * @throws IOException if a network error occurs
+ * @throws SlackApiException if a Slack API error occurs
+ */
+ private ChatAppendStreamResponse flushBuffer() throws IOException, SlackApiException {
+ ChatAppendStreamResponse response;
+
+ if (streamTs == null) {
+ // First flush - start the stream
+ ChatStartStreamResponse startResponse = client.chatStartStream(ChatStartStreamRequest.builder()
+ .channel(channel)
+ .threadTs(threadTs)
+ .recipientTeamId(recipientTeamId)
+ .recipientUserId(recipientUserId)
+ .markdownText(buffer.toString())
+ .build());
+
+ if (!startResponse.isOk()) {
+ SlackChatStreamException ex = new SlackChatStreamException(
+ "Failed to start stream: " + startResponse.getError());
+ ex.setStartResponse(startResponse);
+ throw ex;
+ }
+
+ streamTs = startResponse.getTs();
+ state = State.IN_PROGRESS;
+
+ // Create a response object to return (mimicking the append response structure)
+ response = new ChatAppendStreamResponse();
+ response.setOk(startResponse.isOk());
+ response.setChannel(startResponse.getChannel());
+ response.setTs(startResponse.getTs());
+ response.setWarning(startResponse.getWarning());
+ response.setError(startResponse.getError());
+ } else {
+ // Subsequent flush - append to stream
+ response = client.chatAppendStream(ChatAppendStreamRequest.builder()
+ .channel(channel)
+ .ts(streamTs)
+ .markdownText(buffer.toString())
+ .build());
+
+ if (!response.isOk()) {
+ SlackChatStreamException ex = new SlackChatStreamException(
+ "Failed to append to stream: " + response.getError());
+ ex.getAppendResponses().add(response);
+ throw ex;
+ }
+ }
+
+ // Clear the buffer
+ buffer.setLength(0);
+ return response;
+ }
+}
+
+
diff --git a/slack-api-client/src/main/java/com/slack/api/methods/MethodsClient.java b/slack-api-client/src/main/java/com/slack/api/methods/MethodsClient.java
index 784d2e969..a6694c709 100644
--- a/slack-api-client/src/main/java/com/slack/api/methods/MethodsClient.java
+++ b/slack-api-client/src/main/java/com/slack/api/methods/MethodsClient.java
@@ -1660,6 +1660,8 @@ ChatScheduleMessageResponse chatScheduleMessage(
ChatStopStreamResponse chatStopStream(RequestConfigurator req) throws IOException, SlackApiException;
+ ChatStreamHelper chatStreamHelper(RequestConfigurator req);
+
ChatUpdateResponse chatUpdate(ChatUpdateRequest req) throws IOException, SlackApiException;
ChatUpdateResponse chatUpdate(RequestConfigurator req)
diff --git a/slack-api-client/src/main/java/com/slack/api/methods/SlackChatStreamException.java b/slack-api-client/src/main/java/com/slack/api/methods/SlackChatStreamException.java
new file mode 100644
index 000000000..fe7c9a181
--- /dev/null
+++ b/slack-api-client/src/main/java/com/slack/api/methods/SlackChatStreamException.java
@@ -0,0 +1,33 @@
+package com.slack.api.methods;
+
+import com.slack.api.methods.response.chat.ChatAppendStreamResponse;
+import com.slack.api.methods.response.chat.ChatStartStreamResponse;
+import com.slack.api.methods.response.chat.ChatStopStreamResponse;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Represents an error that occurred during chat streaming operations.
+ */
+@Data
+@Slf4j
+@EqualsAndHashCode(callSuper = false)
+public class SlackChatStreamException extends RuntimeException {
+
+ private ChatStartStreamResponse startResponse;
+ private final List appendResponses = new ArrayList<>();
+ private ChatStopStreamResponse stopResponse;
+
+ public SlackChatStreamException(String message) {
+ super(message);
+ }
+
+ public SlackChatStreamException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
+
diff --git a/slack-api-client/src/main/java/com/slack/api/methods/impl/AsyncMethodsClientImpl.java b/slack-api-client/src/main/java/com/slack/api/methods/impl/AsyncMethodsClientImpl.java
index a4d0b64a2..457fab752 100644
--- a/slack-api-client/src/main/java/com/slack/api/methods/impl/AsyncMethodsClientImpl.java
+++ b/slack-api-client/src/main/java/com/slack/api/methods/impl/AsyncMethodsClientImpl.java
@@ -2,6 +2,7 @@
import com.slack.api.RequestConfigurator;
import com.slack.api.SlackConfig;
+import com.slack.api.methods.AsyncChatStreamHelper;
import com.slack.api.methods.AsyncMethodsClient;
import com.slack.api.methods.MethodsClient;
import com.slack.api.methods.SlackApiRequest;
@@ -1741,6 +1742,11 @@ public CompletableFuture chatStopStream(RequestConfigura
return chatStopStream(req.configure(ChatStopStreamRequest.builder()).build());
}
+ @Override
+ public AsyncChatStreamHelper asyncChatStreamHelper(RequestConfigurator req) {
+ return req.configure(AsyncChatStreamHelper.builder().client(this)).build();
+ }
+
@Override
public CompletableFuture chatUpdate(ChatUpdateRequest req) {
return executor.execute(CHAT_UPDATE, toMap(req), () -> methods.chatUpdate(req));
diff --git a/slack-api-client/src/main/java/com/slack/api/methods/impl/MethodsClientImpl.java b/slack-api-client/src/main/java/com/slack/api/methods/impl/MethodsClientImpl.java
index ac9fcf215..f1ded831c 100644
--- a/slack-api-client/src/main/java/com/slack/api/methods/impl/MethodsClientImpl.java
+++ b/slack-api-client/src/main/java/com/slack/api/methods/impl/MethodsClientImpl.java
@@ -1976,6 +1976,11 @@ public ChatStopStreamResponse chatStopStream(RequestConfigurator req){
+ return req.configure(ChatStreamHelper.builder().client(this)).build();
+ }
+
@Override
public ChatUpdateResponse chatUpdate(ChatUpdateRequest req) throws IOException, SlackApiException {
return postFormWithTokenAndParseResponse(toForm(req), Methods.CHAT_UPDATE, getToken(req), ChatUpdateResponse.class);
diff --git a/slack-api-client/src/test/java/test_locally/api/methods/AsyncChatStreamHelperTest.java b/slack-api-client/src/test/java/test_locally/api/methods/AsyncChatStreamHelperTest.java
new file mode 100644
index 000000000..fc36feec3
--- /dev/null
+++ b/slack-api-client/src/test/java/test_locally/api/methods/AsyncChatStreamHelperTest.java
@@ -0,0 +1,193 @@
+package test_locally.api.methods;
+
+import com.slack.api.Slack;
+import com.slack.api.SlackConfig;
+import com.slack.api.methods.AsyncChatStreamHelper;
+import com.slack.api.methods.SlackChatStreamException;
+import com.slack.api.methods.response.chat.ChatAppendStreamResponse;
+import com.slack.api.methods.response.chat.ChatStopStreamResponse;
+import com.slack.api.model.Message;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import util.MockSlackApiServer;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutionException;
+
+import static com.slack.api.model.block.Blocks.section;
+import static com.slack.api.model.block.composition.BlockCompositions.plainText;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static util.MockSlackApi.ValidToken;
+
+public class AsyncChatStreamHelperTest {
+
+ private final MockSlackApiServer server = new MockSlackApiServer();
+ private final SlackConfig config = new SlackConfig();
+ private final Slack slack = Slack.getInstance(config);
+
+ @Before
+ public void setup() throws Exception {
+ server.start();
+ config.setMethodsEndpointUrlPrefix(server.getMethodsEndpointPrefix());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ server.stop();
+ }
+
+ @Test
+ public void append_buffers_when_under_bufferSize() throws Exception {
+ AsyncChatStreamHelper stream = slack.methodsAsync(ValidToken).asyncChatStreamHelper(req -> req
+ .channel("C123")
+ .threadTs("123.123")
+ .bufferSize(256));
+
+ ChatAppendStreamResponse resp = stream.append("hello").get();
+ assertThat(resp, is(nullValue()));
+ assertThat(stream.getState(), is(AsyncChatStreamHelper.State.STARTING));
+ assertThat(stream.getStreamTs(), is(nullValue()));
+ assertThat(stream.getBuffer().toString(), is("hello"));
+ }
+
+ @Test
+ public void append_flushes_and_starts_stream_on_first_flush() throws Exception {
+ AsyncChatStreamHelper stream = slack.methodsAsync(ValidToken).asyncChatStreamHelper(req -> req
+ .channel("C123")
+ .threadTs("123.123")
+ .bufferSize(3));
+
+ ChatAppendStreamResponse resp = stream.append("hey").get(); // triggers flush
+ assertThat(resp.isOk(), is(true));
+ assertThat(stream.getState(), is(AsyncChatStreamHelper.State.IN_PROGRESS));
+ assertThat(stream.getStreamTs(), is("0000000000.000000"));
+ assertThat(stream.getBuffer().toString(), is(""));
+ }
+
+ @Test
+ public void stop_completes() throws Exception {
+ AsyncChatStreamHelper stream = slack.methodsAsync(ValidToken).asyncChatStreamHelper(req -> req
+ .channel("C123")
+ .threadTs("123.123")
+ .bufferSize(1000));
+
+ stream.append("hello").get(); // buffered only
+ ChatStopStreamResponse stop = stream.stop().get();
+ assertThat(stop.isOk(), is(true));
+ assertThat(stream.getState(), is(AsyncChatStreamHelper.State.COMPLETED));
+ assertThat(stream.getStreamTs(), is("0000000000.000000"));
+ }
+
+ @Test(expected = SlackChatStreamException.class)
+ public void append_throws_after_completed() throws Throwable {
+ AsyncChatStreamHelper stream = slack.methodsAsync(ValidToken).asyncChatStreamHelper(req -> req
+ .channel("C123")
+ .threadTs("123.123")
+ .bufferSize(1000));
+
+ stream.stop().get();
+ try {
+ stream.append("nope").get();
+ } catch (Exception e) {
+ // unwrap ExecutionException / CompletionException
+ Throwable cause = e.getCause() != null ? e.getCause() : e;
+ throw cause;
+ }
+ }
+
+ @Test(expected = SlackChatStreamException.class)
+ public void stop_throws_after_completed() throws Throwable {
+ AsyncChatStreamHelper stream = slack.methodsAsync(ValidToken).asyncChatStreamHelper(req -> req
+ .channel("C123")
+ .threadTs("123.123")
+ .bufferSize(1000));
+
+ stream.stop().get();
+ try {
+ stream.stop().get(); // second stop should throw
+ } catch (ExecutionException e) {
+ throw e.getCause();
+ }
+ }
+
+ @Test
+ public void stop_with_additional_markdown_text() throws Exception {
+ AsyncChatStreamHelper stream = slack.methodsAsync(ValidToken).asyncChatStreamHelper(req -> req
+ .channel("C123")
+ .threadTs("123.123")
+ .bufferSize(1000));
+
+ stream.append("hello ").get();
+ ChatStopStreamResponse stop = stream.stop("world!").get();
+ assertThat(stop.isOk(), is(true));
+ assertThat(stream.getState(), is(AsyncChatStreamHelper.State.COMPLETED));
+ assertThat(stream.getBuffer().toString(), is("hello world!"));
+ }
+
+ @Test
+ public void stop_with_blocks_and_metadata() throws Exception {
+ AsyncChatStreamHelper stream = slack.methodsAsync(ValidToken).asyncChatStreamHelper(req -> req
+ .channel("C123")
+ .threadTs("123.123")
+ .bufferSize(1000));
+
+ Message.Metadata metadata = new Message.Metadata();
+ metadata.setEventType("test_event");
+ metadata.setEventPayload(Collections.singletonMap("key", "value"));
+
+ ChatStopStreamResponse stop = stream.stop(
+ "final text",
+ Collections.singletonList(section(s -> s.text(plainText("Block text")))),
+ metadata
+ ).get();
+ assertThat(stop.isOk(), is(true));
+ assertThat(stream.getState(), is(AsyncChatStreamHelper.State.COMPLETED));
+ }
+
+ @Test
+ public void stop_after_stream_already_started() throws Exception {
+ AsyncChatStreamHelper stream = slack.methodsAsync(ValidToken).asyncChatStreamHelper(req -> req
+ .channel("C123")
+ .threadTs("123.123")
+ .bufferSize(1)); // force immediate flush
+
+ // Start the stream via append
+ ChatAppendStreamResponse appendResp = stream.append("a").get();
+ assertThat(appendResp.isOk(), is(true));
+ assertThat(stream.getState(), is(AsyncChatStreamHelper.State.IN_PROGRESS));
+ assertThat(stream.getStreamTs(), is(notNullValue()));
+
+ // Now stop - should not call startStream again
+ ChatStopStreamResponse stop = stream.stop().get();
+ assertThat(stop.isOk(), is(true));
+ assertThat(stream.getState(), is(AsyncChatStreamHelper.State.COMPLETED));
+ }
+
+ @Test
+ public void default_buffer_size_is_256() {
+ AsyncChatStreamHelper stream = slack.methodsAsync(ValidToken).asyncChatStreamHelper(req -> req
+ .channel("C123"));
+
+ assertThat(stream.getBufferSize(), is(256));
+ }
+
+ @Test
+ public void multiple_appends_accumulate_in_buffer() throws Exception {
+ AsyncChatStreamHelper stream = slack.methodsAsync(ValidToken).asyncChatStreamHelper(req -> req
+ .channel("C123")
+ .bufferSize(256));
+
+ stream.append("hello").get();
+ stream.append(" ").get();
+ stream.append("world").get();
+
+ assertThat(stream.getBuffer().toString(), is("hello world"));
+ assertThat(stream.getState(), is(AsyncChatStreamHelper.State.STARTING));
+ }
+}
+
+
diff --git a/slack-api-client/src/test/java/test_locally/api/methods/ChatStreamHelperTest.java b/slack-api-client/src/test/java/test_locally/api/methods/ChatStreamHelperTest.java
new file mode 100644
index 000000000..d58ff7e2f
--- /dev/null
+++ b/slack-api-client/src/test/java/test_locally/api/methods/ChatStreamHelperTest.java
@@ -0,0 +1,202 @@
+package test_locally.api.methods;
+
+import com.slack.api.Slack;
+import com.slack.api.SlackConfig;
+import com.slack.api.methods.ChatStreamHelper;
+import com.slack.api.methods.SlackChatStreamException;
+import com.slack.api.methods.response.chat.ChatAppendStreamResponse;
+import com.slack.api.methods.response.chat.ChatStopStreamResponse;
+import com.slack.api.model.Message;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import util.MockSlackApiServer;
+
+import java.util.Collections;
+
+import static com.slack.api.model.block.Blocks.section;
+import static com.slack.api.model.block.composition.BlockCompositions.plainText;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static util.MockSlackApi.ValidToken;
+
+public class ChatStreamHelperTest {
+
+ private final MockSlackApiServer server = new MockSlackApiServer();
+ private final SlackConfig config = new SlackConfig();
+ private final Slack slack = Slack.getInstance(config);
+
+ @Before
+ public void setup() throws Exception {
+ server.start();
+ config.setMethodsEndpointUrlPrefix(server.getMethodsEndpointPrefix());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ server.stop();
+ }
+
+ @Test
+ public void append_buffers_when_under_bufferSize() throws Exception {
+ ChatStreamHelper stream = slack.methods(ValidToken).chatStreamHelper(req -> req
+ .channel("C123")
+ .threadTs("123.123")
+ .bufferSize(256));
+
+ ChatAppendStreamResponse resp = stream.append("hello");
+ assertThat(resp, is(nullValue()));
+ assertThat(stream.getState(), is(ChatStreamHelper.State.STARTING));
+ assertThat(stream.getStreamTs(), is(nullValue()));
+ assertThat(stream.getBuffer().toString(), is("hello"));
+ }
+
+ @Test
+ public void append_flushes_and_starts_stream_on_first_flush() throws Exception {
+ ChatStreamHelper stream = slack.methods(ValidToken).chatStreamHelper(req -> req
+ .channel("C123")
+ .threadTs("123.123")
+ .bufferSize(3));
+
+ ChatAppendStreamResponse resp = stream.append("hey"); // triggers flush
+ assertThat(resp.isOk(), is(true));
+ assertThat(stream.getState(), is(ChatStreamHelper.State.IN_PROGRESS));
+ assertThat(stream.getStreamTs(), is("0000000000.000000"));
+ assertThat(stream.getBuffer().toString(), is(""));
+ }
+
+ @Test
+ public void append_flushes_with_appendStream_after_started() throws Exception {
+ ChatStreamHelper stream = slack.methods(ValidToken).chatStreamHelper(req -> req
+ .channel("C123")
+ .threadTs("123.123")
+ .bufferSize(1));
+
+ // first flush uses chat.startStream
+ ChatAppendStreamResponse first = stream.append("a");
+ assertThat(first.isOk(), is(true));
+ assertThat(stream.getStreamTs(), is("0000000000.000000"));
+ assertThat(stream.getState(), is(ChatStreamHelper.State.IN_PROGRESS));
+
+ // second flush uses chat.appendStream
+ ChatAppendStreamResponse second = stream.append("b");
+ assertThat(second.isOk(), is(true));
+ assertThat(stream.getStreamTs(), is("0000000000.000000"));
+ assertThat(stream.getState(), is(ChatStreamHelper.State.IN_PROGRESS));
+ }
+
+ @Test
+ public void stop_starts_stream_if_needed_and_completes() throws Exception {
+ ChatStreamHelper stream = slack.methods(ValidToken).chatStreamHelper(req -> req
+ .channel("C123")
+ .threadTs("123.123")
+ .bufferSize(1000));
+
+ stream.append("hello"); // buffered only
+ ChatStopStreamResponse stop = stream.stop();
+ assertThat(stop.isOk(), is(true));
+ assertThat(stream.getState(), is(ChatStreamHelper.State.COMPLETED));
+ assertThat(stream.getStreamTs(), is("0000000000.000000"));
+ }
+
+ @Test(expected = SlackChatStreamException.class)
+ public void append_throws_after_completed() throws Exception {
+ ChatStreamHelper stream = slack.methods(ValidToken).chatStreamHelper(req -> req
+ .channel("C123")
+ .threadTs("123.123")
+ .bufferSize(1000));
+
+ stream.stop();
+ stream.append("nope");
+ }
+
+ @Test(expected = SlackChatStreamException.class)
+ public void stop_throws_after_completed() throws Exception {
+ ChatStreamHelper stream = slack.methods(ValidToken).chatStreamHelper(req -> req
+ .channel("C123")
+ .threadTs("123.123")
+ .bufferSize(1000));
+
+ stream.stop();
+ stream.stop(); // second stop should throw
+ }
+
+ @Test
+ public void stop_with_additional_markdown_text() throws Exception {
+ ChatStreamHelper stream = slack.methods(ValidToken).chatStreamHelper(req -> req
+ .channel("C123")
+ .threadTs("123.123")
+ .bufferSize(1000));
+
+ stream.append("hello ");
+ ChatStopStreamResponse stop = stream.stop("world!");
+ assertThat(stop.isOk(), is(true));
+ assertThat(stream.getState(), is(ChatStreamHelper.State.COMPLETED));
+ assertThat(stream.getBuffer().toString(), is("hello world!"));
+ }
+
+ @Test
+ public void stop_with_blocks_and_metadata() throws Exception {
+ ChatStreamHelper stream = slack.methods(ValidToken).chatStreamHelper(req -> req
+ .channel("C123")
+ .threadTs("123.123")
+ .bufferSize(1000));
+
+ Message.Metadata metadata = new Message.Metadata();
+ metadata.setEventType("test_event");
+ metadata.setEventPayload(Collections.singletonMap("key", "value"));
+
+ ChatStopStreamResponse stop = stream.stop(
+ "final text",
+ Collections.singletonList(section(s -> s.text(plainText("Block text")))),
+ metadata
+ );
+ assertThat(stop.isOk(), is(true));
+ assertThat(stream.getState(), is(ChatStreamHelper.State.COMPLETED));
+ }
+
+ @Test
+ public void stop_after_stream_already_started() throws Exception {
+ ChatStreamHelper stream = slack.methods(ValidToken).chatStreamHelper(req -> req
+ .channel("C123")
+ .threadTs("123.123")
+ .bufferSize(1)); // force immediate flush
+
+ // Start the stream via append
+ ChatAppendStreamResponse appendResp = stream.append("a");
+ assertThat(appendResp.isOk(), is(true));
+ assertThat(stream.getState(), is(ChatStreamHelper.State.IN_PROGRESS));
+ assertThat(stream.getStreamTs(), is(notNullValue()));
+
+ // Now stop - should not call startStream again
+ ChatStopStreamResponse stop = stream.stop();
+ assertThat(stop.isOk(), is(true));
+ assertThat(stream.getState(), is(ChatStreamHelper.State.COMPLETED));
+ }
+
+ @Test
+ public void default_buffer_size_is_256() {
+ ChatStreamHelper stream = slack.methods(ValidToken).chatStreamHelper(req -> req
+ .channel("C123"));
+
+ assertThat(stream.getBufferSize(), is(256));
+ }
+
+ @Test
+ public void multiple_appends_accumulate_in_buffer() throws Exception {
+ ChatStreamHelper stream = slack.methods(ValidToken).chatStreamHelper(req -> req
+ .channel("C123")
+ .bufferSize(256));
+
+ stream.append("hello");
+ stream.append(" ");
+ stream.append("world");
+
+ assertThat(stream.getBuffer().toString(), is("hello world"));
+ assertThat(stream.getState(), is(ChatStreamHelper.State.STARTING));
+ }
+}
+
+
diff --git a/slack-api-client/src/test/java/test_with_remote_apis/methods/chat_Test.java b/slack-api-client/src/test/java/test_with_remote_apis/methods/chat_Test.java
index f6e96f2f3..442dec153 100644
--- a/slack-api-client/src/test/java/test_with_remote_apis/methods/chat_Test.java
+++ b/slack-api-client/src/test/java/test_with_remote_apis/methods/chat_Test.java
@@ -2,6 +2,7 @@
import com.slack.api.Slack;
import com.slack.api.methods.SlackApiException;
+import com.slack.api.methods.AsyncChatStreamHelper;
import com.slack.api.methods.request.chat.ChatPostMessageRequest;
import com.slack.api.methods.request.chat.ChatUnfurlRequest;
import com.slack.api.methods.request.chat.ChatUnfurlRequest.UnfurlMetadata;
@@ -32,6 +33,7 @@
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
+import org.junit.Assume;
import java.io.IOException;
import java.util.*;
@@ -1160,6 +1162,41 @@ public void streamMessages() throws IOException, SlackApiException {
assertThat(stops.getError(), is(nullValue()));
}
+ @Test
+ public void streamMessages_async_helper() throws Exception {
+ AuthTestResponse auth = slack.methods(botToken).authTest(req -> req);
+ assertThat(auth.getError(), is(nullValue()));
+ loadRandomChannelId();
+ String userId = findUser();
+
+ ChatPostMessageResponse topMessage = slack.methods(botToken).chatPostMessage(req -> req
+ .channel(randomChannelId)
+ .text("Get ready to stream a response in thread! (async helper)"));
+ assertThat(topMessage.getError(), is(nullValue()));
+
+ AsyncChatStreamHelper stream = slack.methodsAsync(botToken).asyncChatStreamHelper(req -> req
+ .channel(randomChannelId)
+ .threadTs(topMessage.getTs())
+ .recipientUserId(userId)
+ .recipientTeamId(auth.getTeamId())
+ .bufferSize(1));
+
+ // first append -> starts stream
+ ChatAppendStreamResponse first = stream.append("hello").get();
+ assertThat(first.isOk(), is(true));
+ assertThat(first.getError(), is(nullValue()));
+
+ // second append -> appendStream
+ ChatAppendStreamResponse second = stream.append(" world").get();
+ assertThat(second.isOk(), is(true));
+ assertThat(second.getError(), is(nullValue()));
+
+ // stop -> stopStream
+ ChatStopStreamResponse stops = stream.stop().get();
+ assertThat(stops.isOk(), is(true));
+ assertThat(stops.getError(), is(nullValue()));
+ }
+
// https://github.com/slackapi/java-slack-sdk/issues/415
@Test
public void attachmentsWithBlocks_issue_415() throws IOException, SlackApiException {