diff --git a/.github/workflows/triage-issues.yml b/.github/workflows/triage-issues.yml index 85ccb72aa..cf13d3afc 100644 --- a/.github/workflows/triage-issues.yml +++ b/.github/workflows/triage-issues.yml @@ -16,7 +16,7 @@ jobs: issues: write pull-requests: write steps: - - uses: actions/stale@5f858e3efba33a5ca4407a664cc011ad407f2008 # v10.1.0 + - uses: actions/stale@997185467fa4f803885201cee163a9f38240193d # v10.1.1 with: days-before-issue-stale: 30 days-before-issue-close: 10 diff --git a/.github/workflows/unit-tests-jdk-14.yml b/.github/workflows/unit-tests-jdk-14.yml index d45537c4d..241dc5e17 100644 --- a/.github/workflows/unit-tests-jdk-14.yml +++ b/.github/workflows/unit-tests-jdk-14.yml @@ -16,11 +16,11 @@ jobs: permissions: contents: read steps: - - uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6.0.0 + - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1 with: persist-credentials: false - name: Install JDK - uses: actions/setup-java@dded0888837ed1f317902acf8a20df0ad188d165 # v5.0.0 + uses: actions/setup-java@f2beeb24e141e01a676f977032f5a29d81c9e27e # v5.1.0 with: java-version: ${{ matrix.java-version }} distribution: "adopt" @@ -30,6 +30,6 @@ jobs: env: SKIP_UNSTABLE_TESTS: 1 - name: Upload coverage to Codecov - uses: codecov/codecov-action@5a1091511ad55cbe89839c7260b706298ca349f7 # v5.5.1 + uses: codecov/codecov-action@671740ac38dd9b0130fbe1cec585b89eea48d3de # v5.5.2 with: token: ${{ secrets.CODECOV_TOKEN }} diff --git a/.github/workflows/unit-tests-jdk-17.yml b/.github/workflows/unit-tests-jdk-17.yml index 047c85fff..72c2296f5 100644 --- a/.github/workflows/unit-tests-jdk-17.yml +++ b/.github/workflows/unit-tests-jdk-17.yml @@ -16,11 +16,11 @@ jobs: permissions: contents: read steps: - - uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6.0.0 + - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1 with: persist-credentials: false - name: Install JDK - uses: actions/setup-java@dded0888837ed1f317902acf8a20df0ad188d165 # v5.0.0 + uses: actions/setup-java@f2beeb24e141e01a676f977032f5a29d81c9e27e # v5.1.0 with: java-version: ${{ matrix.java-version }} distribution: "adopt" diff --git a/.github/workflows/unit-tests-jdk-8.yml b/.github/workflows/unit-tests-jdk-8.yml index 8df5a23c5..522a79ec8 100644 --- a/.github/workflows/unit-tests-jdk-8.yml +++ b/.github/workflows/unit-tests-jdk-8.yml @@ -16,11 +16,11 @@ jobs: permissions: contents: read steps: - - uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6.0.0 + - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1 with: persist-credentials: false - name: Install JDK - uses: actions/setup-java@dded0888837ed1f317902acf8a20df0ad188d165 # v5.0.0 + uses: actions/setup-java@f2beeb24e141e01a676f977032f5a29d81c9e27e # v5.1.0 with: java-version: ${{ matrix.java-version }} distribution: "adopt" diff --git a/slack-api-client/src/main/java/com/slack/api/methods/AsyncChatStreamHelper.java b/slack-api-client/src/main/java/com/slack/api/methods/AsyncChatStreamHelper.java new file mode 100644 index 000000000..5408cc11b --- /dev/null +++ b/slack-api-client/src/main/java/com/slack/api/methods/AsyncChatStreamHelper.java @@ -0,0 +1,185 @@ +package com.slack.api.methods; + +import com.slack.api.methods.request.chat.ChatAppendStreamRequest; +import com.slack.api.methods.request.chat.ChatStartStreamRequest; +import com.slack.api.methods.request.chat.ChatStopStreamRequest; +import com.slack.api.methods.response.chat.ChatAppendStreamResponse; +import com.slack.api.methods.response.chat.ChatStartStreamResponse; +import com.slack.api.methods.response.chat.ChatStopStreamResponse; +import com.slack.api.model.Message; +import com.slack.api.model.block.LayoutBlock; +import lombok.Builder; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * Async variant of {@link ChatStreamHelper} for {@link AsyncMethodsClient}. + *

+ * This helper buffers markdown text and flushes via chat.startStream / chat.appendStream, then finalizes via + * chat.stopStream. + *

+ * + */ +@Data +@Slf4j +@Builder +public class AsyncChatStreamHelper { + + public enum State { + STARTING, + IN_PROGRESS, + COMPLETED + } + + private final AsyncMethodsClient client; + private final String channel; + private final String threadTs; + private final String recipientTeamId; + private final String recipientUserId; + + @Builder.Default + private final int bufferSize = 256; + + @Builder.Default + private StringBuilder buffer = new StringBuilder(); + @Builder.Default + private State state = State.STARTING; + private String streamTs; + + /** + * Append text to the stream. + * + * @param markdownText markdown text to append + * @return a future that completes with a response if the buffer was flushed; completes with null if buffering + */ + public CompletableFuture append(String markdownText) { + if (state == State.COMPLETED) { + CompletableFuture f = new CompletableFuture<>(); + f.completeExceptionally(new SlackChatStreamException("Cannot append to stream: stream state is " + state)); + return f; + } + + buffer.append(markdownText); + + if (buffer.length() >= bufferSize) { + return flushBuffer(); + } + + if (log.isDebugEnabled()) { + log.debug("AsyncChatStream appended to buffer: bufferLength={}, bufferSize={}, channel={}, " + + "recipientTeamId={}, recipientUserId={}, threadTs={}", + buffer.length(), bufferSize, channel, recipientTeamId, recipientUserId, threadTs); + } + return CompletableFuture.completedFuture(null); + } + + public CompletableFuture stop() { + return stop(null, null, null); + } + + public CompletableFuture stop(String markdownText) { + return stop(markdownText, null, null); + } + + public CompletableFuture stop( + String markdownText, + List blocks, + Message.Metadata metadata + ) { + if (state == State.COMPLETED) { + CompletableFuture f = new CompletableFuture<>(); + f.completeExceptionally(new SlackChatStreamException("Cannot stop stream: stream state is " + state)); + return f; + } + + if (markdownText != null) { + buffer.append(markdownText); + } + + CompletableFuture ensureStarted; + if (streamTs == null) { + ensureStarted = client.chatStartStream(ChatStartStreamRequest.builder() + .channel(channel) + .threadTs(threadTs) + .recipientTeamId(recipientTeamId) + .recipientUserId(recipientUserId) + .build()) + .thenApply(startResponse -> { + if (!startResponse.isOk() || startResponse.getTs() == null) { + SlackChatStreamException ex = new SlackChatStreamException( + "Failed to stop stream: stream not started - " + startResponse.getError()); + ex.setStartResponse(startResponse); + throw ex; + } + streamTs = startResponse.getTs(); + state = State.IN_PROGRESS; + return null; + }); + } else { + ensureStarted = CompletableFuture.completedFuture(null); + } + + return ensureStarted.thenCompose(ignored -> client.chatStopStream(ChatStopStreamRequest.builder() + .channel(channel) + .ts(streamTs) + .markdownText(buffer.toString()) + .blocks(blocks) + .metadata(metadata) + .build()) + .thenApply(resp -> { + state = State.COMPLETED; + return resp; + })); + } + + private CompletableFuture flushBuffer() { + if (streamTs == null) { + return client.chatStartStream(ChatStartStreamRequest.builder() + .channel(channel) + .threadTs(threadTs) + .recipientTeamId(recipientTeamId) + .recipientUserId(recipientUserId) + .markdownText(buffer.toString()) + .build()) + .thenApply(startResponse -> { + if (!startResponse.isOk()) { + SlackChatStreamException ex = new SlackChatStreamException( + "Failed to start stream: " + startResponse.getError()); + ex.setStartResponse(startResponse); + throw ex; + } + streamTs = startResponse.getTs(); + state = State.IN_PROGRESS; + ChatAppendStreamResponse synth = new ChatAppendStreamResponse(); + synth.setOk(startResponse.isOk()); + synth.setChannel(startResponse.getChannel()); + synth.setTs(startResponse.getTs()); + synth.setWarning(startResponse.getWarning()); + synth.setError(startResponse.getError()); + buffer.setLength(0); + return synth; + }); + } else { + return client.chatAppendStream(ChatAppendStreamRequest.builder() + .channel(channel) + .ts(streamTs) + .markdownText(buffer.toString()) + .build()) + .thenApply(resp -> { + if (!resp.isOk()) { + SlackChatStreamException ex = new SlackChatStreamException( + "Failed to append to stream: " + resp.getError()); + ex.getAppendResponses().add(resp); + throw ex; + } + buffer.setLength(0); + return resp; + }); + } + } +} + + diff --git a/slack-api-client/src/main/java/com/slack/api/methods/AsyncMethodsClient.java b/slack-api-client/src/main/java/com/slack/api/methods/AsyncMethodsClient.java index 7566d7643..d94648044 100644 --- a/slack-api-client/src/main/java/com/slack/api/methods/AsyncMethodsClient.java +++ b/slack-api-client/src/main/java/com/slack/api/methods/AsyncMethodsClient.java @@ -1016,7 +1016,9 @@ CompletableFuture CompletableFuture chatStopStream(ChatStopStreamRequest req); CompletableFuture chatStopStream(RequestConfigurator req); - + + AsyncChatStreamHelper asyncChatStreamHelper(RequestConfigurator req); + CompletableFuture chatUpdate(ChatUpdateRequest req); CompletableFuture chatUpdate(RequestConfigurator req); diff --git a/slack-api-client/src/main/java/com/slack/api/methods/ChatStreamHelper.java b/slack-api-client/src/main/java/com/slack/api/methods/ChatStreamHelper.java new file mode 100644 index 000000000..9ef290661 --- /dev/null +++ b/slack-api-client/src/main/java/com/slack/api/methods/ChatStreamHelper.java @@ -0,0 +1,250 @@ +package com.slack.api.methods; + +import com.slack.api.methods.request.chat.ChatAppendStreamRequest; +import com.slack.api.methods.request.chat.ChatStartStreamRequest; +import com.slack.api.methods.request.chat.ChatStopStreamRequest; +import com.slack.api.methods.response.chat.ChatAppendStreamResponse; +import com.slack.api.methods.response.chat.ChatStartStreamResponse; +import com.slack.api.methods.response.chat.ChatStopStreamResponse; +import com.slack.api.model.Message; +import com.slack.api.model.block.LayoutBlock; +import lombok.Builder; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.List; + +/** + * A helper class for streaming markdown text into a conversation using the chat streaming APIs. + *

+ * This class provides a convenient interface for the chat.startStream, chat.appendStream, and chat.stopStream API + * methods, with automatic buffering and state management. + *

+ * Typical usage is to use the {@link MethodsClient#chatStreamHelper} method: + * + *

+ * {@code
+ * MethodsClient client = Slack.getInstance().methods(token);
+ * ChatStreamHelper stream = client.chatStreamHelper(req -> req
+ *     .channel("C0123456789")
+ *     .threadTs("1700000001.123456")
+ *     .recipientTeamId("T0123456789")
+ *     .recipientUserId("U0123456789")
+ *     .bufferSize(100));
+ *
+ * stream.append("**hello wo");
+ * stream.append("rld!**");
+ * ChatStopStreamResponse response = stream.stop();
+ * }
+ * 
+ */ +@Data +@Slf4j +@Builder +public class ChatStreamHelper { + + /** + * The state of the chat stream. + */ + public enum State { + STARTING, + IN_PROGRESS, + COMPLETED + } + + private final MethodsClient client; + private final String channel; + private final String threadTs; + private final String recipientTeamId; + private final String recipientUserId; + + /** + * The length of markdown_text to buffer in-memory before calling a method. + * Increasing this value decreases the number of method calls made for the same amount of text, + * which is useful to avoid rate limits. + * Default is 100. + */ + @Builder.Default + private final int bufferSize = 256; + + // Mutable state (not thread-safe) + @Builder.Default + private StringBuilder buffer = new StringBuilder(); + @Builder.Default + private State state = State.STARTING; + private String streamTs; + + /** + * Append text to the stream. + *

+ * This method can be called multiple times. After the stream is stopped, this method cannot be called. + * + * @param markdownText Accepts message text formatted in markdown. Limit this field to 12,000 characters. + * This text is what will be appended to the message received so far. + * @return ChatAppendStreamResponse if the buffer was flushed, null if buffering + * @throws SlackChatStreamException if the stream is already completed or an API error occurs + * @throws IOException if a network error occurs + * @throws SlackApiException if a Slack API error occurs + */ + public ChatAppendStreamResponse append(String markdownText) throws IOException, SlackApiException { + if (state == State.COMPLETED) { + throw new SlackChatStreamException("Cannot append to stream: stream state is " + state); + } + + buffer.append(markdownText); + + if (buffer.length() >= bufferSize) { + return flushBuffer(); + } + + if (log.isDebugEnabled()) { + log.debug("ChatStream appended to buffer: bufferLength={}, bufferSize={}, channel={}, " + + "recipientTeamId={}, recipientUserId={}, threadTs={}", + buffer.length(), bufferSize, channel, recipientTeamId, recipientUserId, threadTs); + } + + return null; + } + + /** + * Stop the stream and finalize the message. + * + * @return ChatStopStreamResponse from the chat.stopStream API call + * @throws SlackChatStreamException if the stream is already completed or an API error occurs + * @throws IOException if a network error occurs + * @throws SlackApiException if a Slack API error occurs + */ + public ChatStopStreamResponse stop() throws IOException, SlackApiException { + return stop(null, null, null); + } + + /** + * Stop the stream and finalize the message. + * + * @param markdownText Additional text to append before stopping + * @return ChatStopStreamResponse from the chat.stopStream API call + * @throws SlackChatStreamException if the stream is already completed or an API error occurs + * @throws IOException if a network error occurs + * @throws SlackApiException if a Slack API error occurs + */ + public ChatStopStreamResponse stop(String markdownText) throws IOException, SlackApiException { + return stop(markdownText, null, null); + } + + /** + * Stop the stream and finalize the message. + * + * @param markdownText Additional text to append before stopping (can be null) + * @param blocks A list of blocks that will be rendered at the bottom of the finalized message (can be null) + * @param metadata Metadata to attach to the message (can be null) + * @return ChatStopStreamResponse from the chat.stopStream API call + * @throws SlackChatStreamException if the stream is already completed or an API error occurs + * @throws IOException if a network error occurs + * @throws SlackApiException if a Slack API error occurs + */ + public ChatStopStreamResponse stop( + String markdownText, + List blocks, + Message.Metadata metadata + ) throws IOException, SlackApiException { + if (state == State.COMPLETED) { + throw new SlackChatStreamException("Cannot stop stream: stream state is " + state); + } + + if (markdownText != null) { + buffer.append(markdownText); + } + + // If the stream hasn't started yet, start it first + if (streamTs == null) { + ChatStartStreamResponse startResponse = client.chatStartStream(ChatStartStreamRequest.builder() + .channel(channel) + .threadTs(threadTs) + .recipientTeamId(recipientTeamId) + .recipientUserId(recipientUserId) + .build()); + + if (!startResponse.isOk() || startResponse.getTs() == null) { + SlackChatStreamException ex = new SlackChatStreamException( + "Failed to stop stream: stream not started - " + startResponse.getError()); + ex.setStartResponse(startResponse); + throw ex; + } + + streamTs = startResponse.getTs(); + state = State.IN_PROGRESS; + } + + ChatStopStreamResponse response = client.chatStopStream(ChatStopStreamRequest.builder() + .channel(channel) + .ts(streamTs) + .markdownText(buffer.toString()) + .blocks(blocks) + .metadata(metadata) + .build()); + + state = State.COMPLETED; + return response; + } + + /** + * Flush the internal buffer by making appropriate API calls. + * + * @return ChatAppendStreamResponse from the API call (or a synthesized response for the first call) + * @throws IOException if a network error occurs + * @throws SlackApiException if a Slack API error occurs + */ + private ChatAppendStreamResponse flushBuffer() throws IOException, SlackApiException { + ChatAppendStreamResponse response; + + if (streamTs == null) { + // First flush - start the stream + ChatStartStreamResponse startResponse = client.chatStartStream(ChatStartStreamRequest.builder() + .channel(channel) + .threadTs(threadTs) + .recipientTeamId(recipientTeamId) + .recipientUserId(recipientUserId) + .markdownText(buffer.toString()) + .build()); + + if (!startResponse.isOk()) { + SlackChatStreamException ex = new SlackChatStreamException( + "Failed to start stream: " + startResponse.getError()); + ex.setStartResponse(startResponse); + throw ex; + } + + streamTs = startResponse.getTs(); + state = State.IN_PROGRESS; + + // Create a response object to return (mimicking the append response structure) + response = new ChatAppendStreamResponse(); + response.setOk(startResponse.isOk()); + response.setChannel(startResponse.getChannel()); + response.setTs(startResponse.getTs()); + response.setWarning(startResponse.getWarning()); + response.setError(startResponse.getError()); + } else { + // Subsequent flush - append to stream + response = client.chatAppendStream(ChatAppendStreamRequest.builder() + .channel(channel) + .ts(streamTs) + .markdownText(buffer.toString()) + .build()); + + if (!response.isOk()) { + SlackChatStreamException ex = new SlackChatStreamException( + "Failed to append to stream: " + response.getError()); + ex.getAppendResponses().add(response); + throw ex; + } + } + + // Clear the buffer + buffer.setLength(0); + return response; + } +} + + diff --git a/slack-api-client/src/main/java/com/slack/api/methods/MethodsClient.java b/slack-api-client/src/main/java/com/slack/api/methods/MethodsClient.java index 784d2e969..a6694c709 100644 --- a/slack-api-client/src/main/java/com/slack/api/methods/MethodsClient.java +++ b/slack-api-client/src/main/java/com/slack/api/methods/MethodsClient.java @@ -1660,6 +1660,8 @@ ChatScheduleMessageResponse chatScheduleMessage( ChatStopStreamResponse chatStopStream(RequestConfigurator req) throws IOException, SlackApiException; + ChatStreamHelper chatStreamHelper(RequestConfigurator req); + ChatUpdateResponse chatUpdate(ChatUpdateRequest req) throws IOException, SlackApiException; ChatUpdateResponse chatUpdate(RequestConfigurator req) diff --git a/slack-api-client/src/main/java/com/slack/api/methods/SlackChatStreamException.java b/slack-api-client/src/main/java/com/slack/api/methods/SlackChatStreamException.java new file mode 100644 index 000000000..fe7c9a181 --- /dev/null +++ b/slack-api-client/src/main/java/com/slack/api/methods/SlackChatStreamException.java @@ -0,0 +1,33 @@ +package com.slack.api.methods; + +import com.slack.api.methods.response.chat.ChatAppendStreamResponse; +import com.slack.api.methods.response.chat.ChatStartStreamResponse; +import com.slack.api.methods.response.chat.ChatStopStreamResponse; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.List; + +/** + * Represents an error that occurred during chat streaming operations. + */ +@Data +@Slf4j +@EqualsAndHashCode(callSuper = false) +public class SlackChatStreamException extends RuntimeException { + + private ChatStartStreamResponse startResponse; + private final List appendResponses = new ArrayList<>(); + private ChatStopStreamResponse stopResponse; + + public SlackChatStreamException(String message) { + super(message); + } + + public SlackChatStreamException(String message, Throwable cause) { + super(message, cause); + } +} + diff --git a/slack-api-client/src/main/java/com/slack/api/methods/impl/AsyncMethodsClientImpl.java b/slack-api-client/src/main/java/com/slack/api/methods/impl/AsyncMethodsClientImpl.java index a4d0b64a2..457fab752 100644 --- a/slack-api-client/src/main/java/com/slack/api/methods/impl/AsyncMethodsClientImpl.java +++ b/slack-api-client/src/main/java/com/slack/api/methods/impl/AsyncMethodsClientImpl.java @@ -2,6 +2,7 @@ import com.slack.api.RequestConfigurator; import com.slack.api.SlackConfig; +import com.slack.api.methods.AsyncChatStreamHelper; import com.slack.api.methods.AsyncMethodsClient; import com.slack.api.methods.MethodsClient; import com.slack.api.methods.SlackApiRequest; @@ -1741,6 +1742,11 @@ public CompletableFuture chatStopStream(RequestConfigura return chatStopStream(req.configure(ChatStopStreamRequest.builder()).build()); } + @Override + public AsyncChatStreamHelper asyncChatStreamHelper(RequestConfigurator req) { + return req.configure(AsyncChatStreamHelper.builder().client(this)).build(); + } + @Override public CompletableFuture chatUpdate(ChatUpdateRequest req) { return executor.execute(CHAT_UPDATE, toMap(req), () -> methods.chatUpdate(req)); diff --git a/slack-api-client/src/main/java/com/slack/api/methods/impl/MethodsClientImpl.java b/slack-api-client/src/main/java/com/slack/api/methods/impl/MethodsClientImpl.java index ac9fcf215..f1ded831c 100644 --- a/slack-api-client/src/main/java/com/slack/api/methods/impl/MethodsClientImpl.java +++ b/slack-api-client/src/main/java/com/slack/api/methods/impl/MethodsClientImpl.java @@ -1976,6 +1976,11 @@ public ChatStopStreamResponse chatStopStream(RequestConfigurator req){ + return req.configure(ChatStreamHelper.builder().client(this)).build(); + } + @Override public ChatUpdateResponse chatUpdate(ChatUpdateRequest req) throws IOException, SlackApiException { return postFormWithTokenAndParseResponse(toForm(req), Methods.CHAT_UPDATE, getToken(req), ChatUpdateResponse.class); diff --git a/slack-api-client/src/test/java/test_locally/api/methods/AsyncChatStreamHelperTest.java b/slack-api-client/src/test/java/test_locally/api/methods/AsyncChatStreamHelperTest.java new file mode 100644 index 000000000..fc36feec3 --- /dev/null +++ b/slack-api-client/src/test/java/test_locally/api/methods/AsyncChatStreamHelperTest.java @@ -0,0 +1,193 @@ +package test_locally.api.methods; + +import com.slack.api.Slack; +import com.slack.api.SlackConfig; +import com.slack.api.methods.AsyncChatStreamHelper; +import com.slack.api.methods.SlackChatStreamException; +import com.slack.api.methods.response.chat.ChatAppendStreamResponse; +import com.slack.api.methods.response.chat.ChatStopStreamResponse; +import com.slack.api.model.Message; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import util.MockSlackApiServer; + +import java.util.Collections; +import java.util.concurrent.ExecutionException; + +import static com.slack.api.model.block.Blocks.section; +import static com.slack.api.model.block.composition.BlockCompositions.plainText; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static util.MockSlackApi.ValidToken; + +public class AsyncChatStreamHelperTest { + + private final MockSlackApiServer server = new MockSlackApiServer(); + private final SlackConfig config = new SlackConfig(); + private final Slack slack = Slack.getInstance(config); + + @Before + public void setup() throws Exception { + server.start(); + config.setMethodsEndpointUrlPrefix(server.getMethodsEndpointPrefix()); + } + + @After + public void tearDown() throws Exception { + server.stop(); + } + + @Test + public void append_buffers_when_under_bufferSize() throws Exception { + AsyncChatStreamHelper stream = slack.methodsAsync(ValidToken).asyncChatStreamHelper(req -> req + .channel("C123") + .threadTs("123.123") + .bufferSize(256)); + + ChatAppendStreamResponse resp = stream.append("hello").get(); + assertThat(resp, is(nullValue())); + assertThat(stream.getState(), is(AsyncChatStreamHelper.State.STARTING)); + assertThat(stream.getStreamTs(), is(nullValue())); + assertThat(stream.getBuffer().toString(), is("hello")); + } + + @Test + public void append_flushes_and_starts_stream_on_first_flush() throws Exception { + AsyncChatStreamHelper stream = slack.methodsAsync(ValidToken).asyncChatStreamHelper(req -> req + .channel("C123") + .threadTs("123.123") + .bufferSize(3)); + + ChatAppendStreamResponse resp = stream.append("hey").get(); // triggers flush + assertThat(resp.isOk(), is(true)); + assertThat(stream.getState(), is(AsyncChatStreamHelper.State.IN_PROGRESS)); + assertThat(stream.getStreamTs(), is("0000000000.000000")); + assertThat(stream.getBuffer().toString(), is("")); + } + + @Test + public void stop_completes() throws Exception { + AsyncChatStreamHelper stream = slack.methodsAsync(ValidToken).asyncChatStreamHelper(req -> req + .channel("C123") + .threadTs("123.123") + .bufferSize(1000)); + + stream.append("hello").get(); // buffered only + ChatStopStreamResponse stop = stream.stop().get(); + assertThat(stop.isOk(), is(true)); + assertThat(stream.getState(), is(AsyncChatStreamHelper.State.COMPLETED)); + assertThat(stream.getStreamTs(), is("0000000000.000000")); + } + + @Test(expected = SlackChatStreamException.class) + public void append_throws_after_completed() throws Throwable { + AsyncChatStreamHelper stream = slack.methodsAsync(ValidToken).asyncChatStreamHelper(req -> req + .channel("C123") + .threadTs("123.123") + .bufferSize(1000)); + + stream.stop().get(); + try { + stream.append("nope").get(); + } catch (Exception e) { + // unwrap ExecutionException / CompletionException + Throwable cause = e.getCause() != null ? e.getCause() : e; + throw cause; + } + } + + @Test(expected = SlackChatStreamException.class) + public void stop_throws_after_completed() throws Throwable { + AsyncChatStreamHelper stream = slack.methodsAsync(ValidToken).asyncChatStreamHelper(req -> req + .channel("C123") + .threadTs("123.123") + .bufferSize(1000)); + + stream.stop().get(); + try { + stream.stop().get(); // second stop should throw + } catch (ExecutionException e) { + throw e.getCause(); + } + } + + @Test + public void stop_with_additional_markdown_text() throws Exception { + AsyncChatStreamHelper stream = slack.methodsAsync(ValidToken).asyncChatStreamHelper(req -> req + .channel("C123") + .threadTs("123.123") + .bufferSize(1000)); + + stream.append("hello ").get(); + ChatStopStreamResponse stop = stream.stop("world!").get(); + assertThat(stop.isOk(), is(true)); + assertThat(stream.getState(), is(AsyncChatStreamHelper.State.COMPLETED)); + assertThat(stream.getBuffer().toString(), is("hello world!")); + } + + @Test + public void stop_with_blocks_and_metadata() throws Exception { + AsyncChatStreamHelper stream = slack.methodsAsync(ValidToken).asyncChatStreamHelper(req -> req + .channel("C123") + .threadTs("123.123") + .bufferSize(1000)); + + Message.Metadata metadata = new Message.Metadata(); + metadata.setEventType("test_event"); + metadata.setEventPayload(Collections.singletonMap("key", "value")); + + ChatStopStreamResponse stop = stream.stop( + "final text", + Collections.singletonList(section(s -> s.text(plainText("Block text")))), + metadata + ).get(); + assertThat(stop.isOk(), is(true)); + assertThat(stream.getState(), is(AsyncChatStreamHelper.State.COMPLETED)); + } + + @Test + public void stop_after_stream_already_started() throws Exception { + AsyncChatStreamHelper stream = slack.methodsAsync(ValidToken).asyncChatStreamHelper(req -> req + .channel("C123") + .threadTs("123.123") + .bufferSize(1)); // force immediate flush + + // Start the stream via append + ChatAppendStreamResponse appendResp = stream.append("a").get(); + assertThat(appendResp.isOk(), is(true)); + assertThat(stream.getState(), is(AsyncChatStreamHelper.State.IN_PROGRESS)); + assertThat(stream.getStreamTs(), is(notNullValue())); + + // Now stop - should not call startStream again + ChatStopStreamResponse stop = stream.stop().get(); + assertThat(stop.isOk(), is(true)); + assertThat(stream.getState(), is(AsyncChatStreamHelper.State.COMPLETED)); + } + + @Test + public void default_buffer_size_is_256() { + AsyncChatStreamHelper stream = slack.methodsAsync(ValidToken).asyncChatStreamHelper(req -> req + .channel("C123")); + + assertThat(stream.getBufferSize(), is(256)); + } + + @Test + public void multiple_appends_accumulate_in_buffer() throws Exception { + AsyncChatStreamHelper stream = slack.methodsAsync(ValidToken).asyncChatStreamHelper(req -> req + .channel("C123") + .bufferSize(256)); + + stream.append("hello").get(); + stream.append(" ").get(); + stream.append("world").get(); + + assertThat(stream.getBuffer().toString(), is("hello world")); + assertThat(stream.getState(), is(AsyncChatStreamHelper.State.STARTING)); + } +} + + diff --git a/slack-api-client/src/test/java/test_locally/api/methods/ChatStreamHelperTest.java b/slack-api-client/src/test/java/test_locally/api/methods/ChatStreamHelperTest.java new file mode 100644 index 000000000..d58ff7e2f --- /dev/null +++ b/slack-api-client/src/test/java/test_locally/api/methods/ChatStreamHelperTest.java @@ -0,0 +1,202 @@ +package test_locally.api.methods; + +import com.slack.api.Slack; +import com.slack.api.SlackConfig; +import com.slack.api.methods.ChatStreamHelper; +import com.slack.api.methods.SlackChatStreamException; +import com.slack.api.methods.response.chat.ChatAppendStreamResponse; +import com.slack.api.methods.response.chat.ChatStopStreamResponse; +import com.slack.api.model.Message; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import util.MockSlackApiServer; + +import java.util.Collections; + +import static com.slack.api.model.block.Blocks.section; +import static com.slack.api.model.block.composition.BlockCompositions.plainText; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static util.MockSlackApi.ValidToken; + +public class ChatStreamHelperTest { + + private final MockSlackApiServer server = new MockSlackApiServer(); + private final SlackConfig config = new SlackConfig(); + private final Slack slack = Slack.getInstance(config); + + @Before + public void setup() throws Exception { + server.start(); + config.setMethodsEndpointUrlPrefix(server.getMethodsEndpointPrefix()); + } + + @After + public void tearDown() throws Exception { + server.stop(); + } + + @Test + public void append_buffers_when_under_bufferSize() throws Exception { + ChatStreamHelper stream = slack.methods(ValidToken).chatStreamHelper(req -> req + .channel("C123") + .threadTs("123.123") + .bufferSize(256)); + + ChatAppendStreamResponse resp = stream.append("hello"); + assertThat(resp, is(nullValue())); + assertThat(stream.getState(), is(ChatStreamHelper.State.STARTING)); + assertThat(stream.getStreamTs(), is(nullValue())); + assertThat(stream.getBuffer().toString(), is("hello")); + } + + @Test + public void append_flushes_and_starts_stream_on_first_flush() throws Exception { + ChatStreamHelper stream = slack.methods(ValidToken).chatStreamHelper(req -> req + .channel("C123") + .threadTs("123.123") + .bufferSize(3)); + + ChatAppendStreamResponse resp = stream.append("hey"); // triggers flush + assertThat(resp.isOk(), is(true)); + assertThat(stream.getState(), is(ChatStreamHelper.State.IN_PROGRESS)); + assertThat(stream.getStreamTs(), is("0000000000.000000")); + assertThat(stream.getBuffer().toString(), is("")); + } + + @Test + public void append_flushes_with_appendStream_after_started() throws Exception { + ChatStreamHelper stream = slack.methods(ValidToken).chatStreamHelper(req -> req + .channel("C123") + .threadTs("123.123") + .bufferSize(1)); + + // first flush uses chat.startStream + ChatAppendStreamResponse first = stream.append("a"); + assertThat(first.isOk(), is(true)); + assertThat(stream.getStreamTs(), is("0000000000.000000")); + assertThat(stream.getState(), is(ChatStreamHelper.State.IN_PROGRESS)); + + // second flush uses chat.appendStream + ChatAppendStreamResponse second = stream.append("b"); + assertThat(second.isOk(), is(true)); + assertThat(stream.getStreamTs(), is("0000000000.000000")); + assertThat(stream.getState(), is(ChatStreamHelper.State.IN_PROGRESS)); + } + + @Test + public void stop_starts_stream_if_needed_and_completes() throws Exception { + ChatStreamHelper stream = slack.methods(ValidToken).chatStreamHelper(req -> req + .channel("C123") + .threadTs("123.123") + .bufferSize(1000)); + + stream.append("hello"); // buffered only + ChatStopStreamResponse stop = stream.stop(); + assertThat(stop.isOk(), is(true)); + assertThat(stream.getState(), is(ChatStreamHelper.State.COMPLETED)); + assertThat(stream.getStreamTs(), is("0000000000.000000")); + } + + @Test(expected = SlackChatStreamException.class) + public void append_throws_after_completed() throws Exception { + ChatStreamHelper stream = slack.methods(ValidToken).chatStreamHelper(req -> req + .channel("C123") + .threadTs("123.123") + .bufferSize(1000)); + + stream.stop(); + stream.append("nope"); + } + + @Test(expected = SlackChatStreamException.class) + public void stop_throws_after_completed() throws Exception { + ChatStreamHelper stream = slack.methods(ValidToken).chatStreamHelper(req -> req + .channel("C123") + .threadTs("123.123") + .bufferSize(1000)); + + stream.stop(); + stream.stop(); // second stop should throw + } + + @Test + public void stop_with_additional_markdown_text() throws Exception { + ChatStreamHelper stream = slack.methods(ValidToken).chatStreamHelper(req -> req + .channel("C123") + .threadTs("123.123") + .bufferSize(1000)); + + stream.append("hello "); + ChatStopStreamResponse stop = stream.stop("world!"); + assertThat(stop.isOk(), is(true)); + assertThat(stream.getState(), is(ChatStreamHelper.State.COMPLETED)); + assertThat(stream.getBuffer().toString(), is("hello world!")); + } + + @Test + public void stop_with_blocks_and_metadata() throws Exception { + ChatStreamHelper stream = slack.methods(ValidToken).chatStreamHelper(req -> req + .channel("C123") + .threadTs("123.123") + .bufferSize(1000)); + + Message.Metadata metadata = new Message.Metadata(); + metadata.setEventType("test_event"); + metadata.setEventPayload(Collections.singletonMap("key", "value")); + + ChatStopStreamResponse stop = stream.stop( + "final text", + Collections.singletonList(section(s -> s.text(plainText("Block text")))), + metadata + ); + assertThat(stop.isOk(), is(true)); + assertThat(stream.getState(), is(ChatStreamHelper.State.COMPLETED)); + } + + @Test + public void stop_after_stream_already_started() throws Exception { + ChatStreamHelper stream = slack.methods(ValidToken).chatStreamHelper(req -> req + .channel("C123") + .threadTs("123.123") + .bufferSize(1)); // force immediate flush + + // Start the stream via append + ChatAppendStreamResponse appendResp = stream.append("a"); + assertThat(appendResp.isOk(), is(true)); + assertThat(stream.getState(), is(ChatStreamHelper.State.IN_PROGRESS)); + assertThat(stream.getStreamTs(), is(notNullValue())); + + // Now stop - should not call startStream again + ChatStopStreamResponse stop = stream.stop(); + assertThat(stop.isOk(), is(true)); + assertThat(stream.getState(), is(ChatStreamHelper.State.COMPLETED)); + } + + @Test + public void default_buffer_size_is_256() { + ChatStreamHelper stream = slack.methods(ValidToken).chatStreamHelper(req -> req + .channel("C123")); + + assertThat(stream.getBufferSize(), is(256)); + } + + @Test + public void multiple_appends_accumulate_in_buffer() throws Exception { + ChatStreamHelper stream = slack.methods(ValidToken).chatStreamHelper(req -> req + .channel("C123") + .bufferSize(256)); + + stream.append("hello"); + stream.append(" "); + stream.append("world"); + + assertThat(stream.getBuffer().toString(), is("hello world")); + assertThat(stream.getState(), is(ChatStreamHelper.State.STARTING)); + } +} + + diff --git a/slack-api-client/src/test/java/test_with_remote_apis/methods/chat_Test.java b/slack-api-client/src/test/java/test_with_remote_apis/methods/chat_Test.java index f6e96f2f3..442dec153 100644 --- a/slack-api-client/src/test/java/test_with_remote_apis/methods/chat_Test.java +++ b/slack-api-client/src/test/java/test_with_remote_apis/methods/chat_Test.java @@ -2,6 +2,7 @@ import com.slack.api.Slack; import com.slack.api.methods.SlackApiException; +import com.slack.api.methods.AsyncChatStreamHelper; import com.slack.api.methods.request.chat.ChatPostMessageRequest; import com.slack.api.methods.request.chat.ChatUnfurlRequest; import com.slack.api.methods.request.chat.ChatUnfurlRequest.UnfurlMetadata; @@ -32,6 +33,7 @@ import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; +import org.junit.Assume; import java.io.IOException; import java.util.*; @@ -1160,6 +1162,41 @@ public void streamMessages() throws IOException, SlackApiException { assertThat(stops.getError(), is(nullValue())); } + @Test + public void streamMessages_async_helper() throws Exception { + AuthTestResponse auth = slack.methods(botToken).authTest(req -> req); + assertThat(auth.getError(), is(nullValue())); + loadRandomChannelId(); + String userId = findUser(); + + ChatPostMessageResponse topMessage = slack.methods(botToken).chatPostMessage(req -> req + .channel(randomChannelId) + .text("Get ready to stream a response in thread! (async helper)")); + assertThat(topMessage.getError(), is(nullValue())); + + AsyncChatStreamHelper stream = slack.methodsAsync(botToken).asyncChatStreamHelper(req -> req + .channel(randomChannelId) + .threadTs(topMessage.getTs()) + .recipientUserId(userId) + .recipientTeamId(auth.getTeamId()) + .bufferSize(1)); + + // first append -> starts stream + ChatAppendStreamResponse first = stream.append("hello").get(); + assertThat(first.isOk(), is(true)); + assertThat(first.getError(), is(nullValue())); + + // second append -> appendStream + ChatAppendStreamResponse second = stream.append(" world").get(); + assertThat(second.isOk(), is(true)); + assertThat(second.getError(), is(nullValue())); + + // stop -> stopStream + ChatStopStreamResponse stops = stream.stop().get(); + assertThat(stops.isOk(), is(true)); + assertThat(stops.getError(), is(nullValue())); + } + // https://github.com/slackapi/java-slack-sdk/issues/415 @Test public void attachmentsWithBlocks_issue_415() throws IOException, SlackApiException {