Skip to content
2 changes: 1 addition & 1 deletion .github/workflows/triage-issues.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
issues: write
pull-requests: write
steps:
- uses: actions/stale@5f858e3efba33a5ca4407a664cc011ad407f2008 # v10.1.0
- uses: actions/stale@997185467fa4f803885201cee163a9f38240193d # v10.1.1
with:
days-before-issue-stale: 30
days-before-issue-close: 10
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/unit-tests-jdk-14.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ jobs:
permissions:
contents: read
steps:
- uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6.0.0
- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
persist-credentials: false
- name: Install JDK
uses: actions/setup-java@dded0888837ed1f317902acf8a20df0ad188d165 # v5.0.0
uses: actions/setup-java@f2beeb24e141e01a676f977032f5a29d81c9e27e # v5.1.0
with:
java-version: ${{ matrix.java-version }}
distribution: "adopt"
Expand All @@ -30,6 +30,6 @@ jobs:
env:
SKIP_UNSTABLE_TESTS: 1
- name: Upload coverage to Codecov
uses: codecov/codecov-action@5a1091511ad55cbe89839c7260b706298ca349f7 # v5.5.1
uses: codecov/codecov-action@671740ac38dd9b0130fbe1cec585b89eea48d3de # v5.5.2
with:
token: ${{ secrets.CODECOV_TOKEN }}
4 changes: 2 additions & 2 deletions .github/workflows/unit-tests-jdk-17.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ jobs:
permissions:
contents: read
steps:
- uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6.0.0
- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
persist-credentials: false
- name: Install JDK
uses: actions/setup-java@dded0888837ed1f317902acf8a20df0ad188d165 # v5.0.0
uses: actions/setup-java@f2beeb24e141e01a676f977032f5a29d81c9e27e # v5.1.0
with:
java-version: ${{ matrix.java-version }}
distribution: "adopt"
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/unit-tests-jdk-8.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ jobs:
permissions:
contents: read
steps:
- uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6.0.0
- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
persist-credentials: false
- name: Install JDK
uses: actions/setup-java@dded0888837ed1f317902acf8a20df0ad188d165 # v5.0.0
uses: actions/setup-java@f2beeb24e141e01a676f977032f5a29d81c9e27e # v5.1.0
with:
java-version: ${{ matrix.java-version }}
distribution: "adopt"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package com.slack.api.methods;

import com.slack.api.methods.request.chat.ChatAppendStreamRequest;
import com.slack.api.methods.request.chat.ChatStartStreamRequest;
import com.slack.api.methods.request.chat.ChatStopStreamRequest;
import com.slack.api.methods.response.chat.ChatAppendStreamResponse;
import com.slack.api.methods.response.chat.ChatStartStreamResponse;
import com.slack.api.methods.response.chat.ChatStopStreamResponse;
import com.slack.api.model.Message;
import com.slack.api.model.block.LayoutBlock;
import lombok.Builder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
* Async variant of {@link ChatStreamHelper} for {@link AsyncMethodsClient}.
* <p>
* This helper buffers markdown text and flushes via chat.startStream / chat.appendStream, then finalizes via
* chat.stopStream.
* <p>
*
*/
@Data
@Slf4j
@Builder
public class AsyncChatStreamHelper {

public enum State {
STARTING,
IN_PROGRESS,
COMPLETED
}

private final AsyncMethodsClient client;
private final String channel;
private final String threadTs;
private final String recipientTeamId;
private final String recipientUserId;

@Builder.Default
private final int bufferSize = 256;

@Builder.Default
private StringBuilder buffer = new StringBuilder();
@Builder.Default
private State state = State.STARTING;
private String streamTs;

/**
* Append text to the stream.
*
* @param markdownText markdown text to append
* @return a future that completes with a response if the buffer was flushed; completes with null if buffering
*/
public CompletableFuture<ChatAppendStreamResponse> append(String markdownText) {
if (state == State.COMPLETED) {
CompletableFuture<ChatAppendStreamResponse> f = new CompletableFuture<>();
f.completeExceptionally(new SlackChatStreamException("Cannot append to stream: stream state is " + state));
return f;
}

buffer.append(markdownText);

if (buffer.length() >= bufferSize) {
return flushBuffer();
}

if (log.isDebugEnabled()) {
log.debug("AsyncChatStream appended to buffer: bufferLength={}, bufferSize={}, channel={}, " +
"recipientTeamId={}, recipientUserId={}, threadTs={}",
buffer.length(), bufferSize, channel, recipientTeamId, recipientUserId, threadTs);
}
return CompletableFuture.completedFuture(null);
}

public CompletableFuture<ChatStopStreamResponse> stop() {
return stop(null, null, null);
}

public CompletableFuture<ChatStopStreamResponse> stop(String markdownText) {
return stop(markdownText, null, null);
}

public CompletableFuture<ChatStopStreamResponse> stop(
String markdownText,
List<LayoutBlock> blocks,
Message.Metadata metadata
) {
if (state == State.COMPLETED) {
CompletableFuture<ChatStopStreamResponse> f = new CompletableFuture<>();
f.completeExceptionally(new SlackChatStreamException("Cannot stop stream: stream state is " + state));
return f;
}

if (markdownText != null) {
buffer.append(markdownText);
}

CompletableFuture<Void> ensureStarted;
if (streamTs == null) {
ensureStarted = client.chatStartStream(ChatStartStreamRequest.builder()
.channel(channel)
.threadTs(threadTs)
.recipientTeamId(recipientTeamId)
.recipientUserId(recipientUserId)
.build())
.thenApply(startResponse -> {
if (!startResponse.isOk() || startResponse.getTs() == null) {
SlackChatStreamException ex = new SlackChatStreamException(
"Failed to stop stream: stream not started - " + startResponse.getError());
ex.setStartResponse(startResponse);
throw ex;
}
streamTs = startResponse.getTs();
state = State.IN_PROGRESS;
return null;
});
} else {
ensureStarted = CompletableFuture.completedFuture(null);
}

return ensureStarted.thenCompose(ignored -> client.chatStopStream(ChatStopStreamRequest.builder()
.channel(channel)
.ts(streamTs)
.markdownText(buffer.toString())
.blocks(blocks)
.metadata(metadata)
.build())
.thenApply(resp -> {
state = State.COMPLETED;
return resp;
}));
}

private CompletableFuture<ChatAppendStreamResponse> flushBuffer() {
if (streamTs == null) {
return client.chatStartStream(ChatStartStreamRequest.builder()
.channel(channel)
.threadTs(threadTs)
.recipientTeamId(recipientTeamId)
.recipientUserId(recipientUserId)
.markdownText(buffer.toString())
.build())
.thenApply(startResponse -> {
if (!startResponse.isOk()) {
SlackChatStreamException ex = new SlackChatStreamException(
"Failed to start stream: " + startResponse.getError());
ex.setStartResponse(startResponse);
throw ex;
}
streamTs = startResponse.getTs();
state = State.IN_PROGRESS;
ChatAppendStreamResponse synth = new ChatAppendStreamResponse();
synth.setOk(startResponse.isOk());
synth.setChannel(startResponse.getChannel());
synth.setTs(startResponse.getTs());
synth.setWarning(startResponse.getWarning());
synth.setError(startResponse.getError());
buffer.setLength(0);
return synth;
});
} else {
return client.chatAppendStream(ChatAppendStreamRequest.builder()
.channel(channel)
.ts(streamTs)
.markdownText(buffer.toString())
.build())
.thenApply(resp -> {
if (!resp.isOk()) {
SlackChatStreamException ex = new SlackChatStreamException(
"Failed to append to stream: " + resp.getError());
ex.getAppendResponses().add(resp);
throw ex;
}
buffer.setLength(0);
return resp;
});
}
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -1016,7 +1016,9 @@ CompletableFuture<AdminConversationsWhitelistListGroupsLinkedToChannelResponse>
CompletableFuture<ChatStopStreamResponse> chatStopStream(ChatStopStreamRequest req);

CompletableFuture<ChatStopStreamResponse> chatStopStream(RequestConfigurator<ChatStopStreamRequest.ChatStopStreamRequestBuilder> req);


AsyncChatStreamHelper asyncChatStreamHelper(RequestConfigurator<AsyncChatStreamHelper.AsyncChatStreamHelperBuilder> req);

CompletableFuture<ChatUpdateResponse> chatUpdate(ChatUpdateRequest req);

CompletableFuture<ChatUpdateResponse> chatUpdate(RequestConfigurator<ChatUpdateRequest.ChatUpdateRequestBuilder> req);
Expand Down
Loading