Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 26 additions & 10 deletions slack_sdk/web/async_chat_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@

import json
import logging
from typing import TYPE_CHECKING, Dict, Optional, Sequence, Union
from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Union

import slack_sdk.errors as e
from slack_sdk.models.blocks.blocks import Block
from slack_sdk.models.messages.chunk import Chunk, MarkdownTextChunk
from slack_sdk.models.metadata import Metadata
from slack_sdk.web.async_slack_response import AsyncSlackResponse

Expand Down Expand Up @@ -75,7 +76,8 @@ def __init__(
async def append(
self,
*,
markdown_text: str,
markdown_text: Optional[str] = None,
chunks: Optional[Sequence[Union[Dict, Chunk]]] = None,
**kwargs,
) -> Optional[AsyncSlackResponse]:
"""Append to the stream.
Expand All @@ -84,6 +86,7 @@ async def append(
is stopped this method cannot be called.

Args:
chunks: An array of streaming chunks that can contain either markdown text or task updates.
markdown_text: Accepts message text formatted in markdown. Limit this field to 12,000 characters. This text is
what will be appended to the message received so far.
**kwargs: Additional arguments passed to the underlying API calls.
Expand Down Expand Up @@ -111,9 +114,10 @@ async def append(
raise e.SlackRequestError(f"Cannot append to stream: stream state is {self._state}")
if kwargs.get("token"):
self._token = kwargs.pop("token")
self._buffer += markdown_text
if len(self._buffer) >= self._buffer_size:
return await self._flush_buffer(**kwargs)
if markdown_text is not None:
self._buffer += markdown_text
if len(self._buffer) >= self._buffer_size or chunks is not None:
return await self._flush_buffer(chunks=chunks, **kwargs)
details = {
"buffer_length": len(self._buffer),
"buffer_size": self._buffer_size,
Expand All @@ -129,6 +133,7 @@ async def stop(
self,
*,
markdown_text: Optional[str] = None,
chunks: Optional[Sequence[Union[Dict, Chunk]]] = None,
blocks: Optional[Union[str, Sequence[Union[Dict, Block]]]] = None,
metadata: Optional[Union[Dict, Metadata]] = None,
**kwargs,
Expand All @@ -137,6 +142,7 @@ async def stop(

Args:
blocks: A list of blocks that will be rendered at the bottom of the finalized message.
chunks: An array of streaming chunks that can contain either markdown text or task updates.
markdown_text: Accepts message text formatted in markdown. Limit this field to 12,000 characters. This text is
what will be appended to the message received so far.
metadata: JSON object with event_type and event_payload fields, presented as a URL-encoded string. Metadata you
Expand Down Expand Up @@ -177,26 +183,36 @@ async def stop(
raise e.SlackRequestError("Failed to stop stream: stream not started")
self._stream_ts = str(response["ts"])
self._state = "in_progress"
flushings: List[Union[Dict, Chunk]] = []
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😉 quite the visual name! 💩 Would chunks_to_flush also work? Perhaps too long.

if len(self._buffer) != 0:
flushings.append(MarkdownTextChunk(text=self._buffer))
if chunks is not None:
flushings.extend(chunks)
response = await self._client.chat_stopStream(
token=self._token,
channel=self._stream_args["channel"],
ts=self._stream_ts,
blocks=blocks,
markdown_text=self._buffer,
chunks=flushings,
metadata=metadata,
**kwargs,
)
self._state = "completed"
return response

async def _flush_buffer(self, **kwargs) -> AsyncSlackResponse:
"""Flush the internal buffer by making appropriate API calls."""
async def _flush_buffer(self, chunks: Optional[Sequence[Union[Dict, Chunk]]] = None, **kwargs) -> AsyncSlackResponse:
"""Flush the internal buffer with chunks by making appropriate API calls."""
flushings: List[Union[Dict, Chunk]] = []
if len(self._buffer) != 0:
flushings.append(MarkdownTextChunk(text=self._buffer))
if chunks is not None:
flushings.extend(chunks)
if not self._stream_ts:
response = await self._client.chat_startStream(
**self._stream_args,
token=self._token,
**kwargs,
markdown_text=self._buffer,
chunks=flushings,
)
self._stream_ts = response.get("ts")
self._state = "in_progress"
Expand All @@ -206,7 +222,7 @@ async def _flush_buffer(self, **kwargs) -> AsyncSlackResponse:
channel=self._stream_args["channel"],
ts=self._stream_ts,
**kwargs,
markdown_text=self._buffer,
chunks=flushings,
)
self._buffer = ""
return response
2 changes: 1 addition & 1 deletion slack_sdk/web/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2631,7 +2631,7 @@ async def chat_appendStream(
*,
channel: str,
ts: str,
markdown_text: str,
markdown_text: Optional[str] = None,
chunks: Optional[Sequence[Union[Dict, Chunk]]] = None,
**kwargs,
) -> AsyncSlackResponse:
Expand Down
36 changes: 26 additions & 10 deletions slack_sdk/web/chat_stream.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import json
import logging
from typing import TYPE_CHECKING, Dict, Optional, Sequence, Union
from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Union

import slack_sdk.errors as e
from slack_sdk.models.blocks.blocks import Block
from slack_sdk.models.messages.chunk import Chunk, MarkdownTextChunk
from slack_sdk.models.metadata import Metadata
from slack_sdk.web.slack_response import SlackResponse

Expand Down Expand Up @@ -65,7 +66,8 @@ def __init__(
def append(
self,
*,
markdown_text: str,
markdown_text: Optional[str] = None,
chunks: Optional[Sequence[Union[Dict, Chunk]]] = None,
Comment on lines +69 to +70
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since both arguments are now optional what happens if a developer does not pass any of these arguments when using the method?

**kwargs,
) -> Optional[SlackResponse]:
"""Append to the stream.
Expand All @@ -74,6 +76,7 @@ def append(
is stopped this method cannot be called.

Args:
chunks: An array of streaming chunks that can contain either markdown text or task updates.
markdown_text: Accepts message text formatted in markdown. Limit this field to 12,000 characters. This text is
what will be appended to the message received so far.
**kwargs: Additional arguments passed to the underlying API calls.
Expand Down Expand Up @@ -101,9 +104,10 @@ def append(
raise e.SlackRequestError(f"Cannot append to stream: stream state is {self._state}")
if kwargs.get("token"):
self._token = kwargs.pop("token")
self._buffer += markdown_text
if len(self._buffer) >= self._buffer_size:
return self._flush_buffer(**kwargs)
if markdown_text is not None:
self._buffer += markdown_text
if len(self._buffer) >= self._buffer_size or chunks is not None:
return self._flush_buffer(chunks=chunks, **kwargs)
Comment on lines +107 to +110
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm understanding this correctly, if I do

streamer = client.chat_stream(
    channel="C0123456789",
    recipient_team_id="T0123456789",
    recipient_user_id="U0123456789",
    thread_ts="10101010101.010101",
)

streamer.append(markdown_text="\n")

streamer.append(
    chunks=[
        MarkdownTextChunk(
            text="Hello.\nI have received the task. ",
        ),
    ],
)
  • The first append with markdown will not be sent to Slack but rather added to the buffer
  • The second append with the chunks will be sent to Slack along with whatever may be in the buffer, regardless of whether the buffer has exceeded the buffer_size?

details = {
"buffer_length": len(self._buffer),
"buffer_size": self._buffer_size,
Expand All @@ -119,6 +123,7 @@ def stop(
self,
*,
markdown_text: Optional[str] = None,
chunks: Optional[Sequence[Union[Dict, Chunk]]] = None,
blocks: Optional[Union[str, Sequence[Union[Dict, Block]]]] = None,
metadata: Optional[Union[Dict, Metadata]] = None,
**kwargs,
Expand All @@ -127,6 +132,7 @@ def stop(

Args:
blocks: A list of blocks that will be rendered at the bottom of the finalized message.
chunks: An array of streaming chunks that can contain either markdown text or task updates.
markdown_text: Accepts message text formatted in markdown. Limit this field to 12,000 characters. This text is
what will be appended to the message received so far.
metadata: JSON object with event_type and event_payload fields, presented as a URL-encoded string. Metadata you
Expand Down Expand Up @@ -167,26 +173,36 @@ def stop(
raise e.SlackRequestError("Failed to stop stream: stream not started")
self._stream_ts = str(response["ts"])
self._state = "in_progress"
flushings: List[Union[Dict, Chunk]] = []
if len(self._buffer) != 0:
flushings.append(MarkdownTextChunk(text=self._buffer))
if chunks is not None:
flushings.extend(chunks)
Comment on lines +176 to +180
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! This allows the markdown_text and chunks to be accepted and passed to the method using the chunks param 💯

response = self._client.chat_stopStream(
token=self._token,
channel=self._stream_args["channel"],
ts=self._stream_ts,
blocks=blocks,
markdown_text=self._buffer,
chunks=flushings,
metadata=metadata,
**kwargs,
)
self._state = "completed"
return response

def _flush_buffer(self, **kwargs) -> SlackResponse:
"""Flush the internal buffer by making appropriate API calls."""
def _flush_buffer(self, chunks: Optional[Sequence[Union[Dict, Chunk]]] = None, **kwargs) -> SlackResponse:
"""Flush the internal buffer with chunks by making appropriate API calls."""
flushings: List[Union[Dict, Chunk]] = []
if len(self._buffer) != 0:
flushings.append(MarkdownTextChunk(text=self._buffer))
if chunks is not None:
flushings.extend(chunks)
if not self._stream_ts:
response = self._client.chat_startStream(
**self._stream_args,
token=self._token,
**kwargs,
markdown_text=self._buffer,
chunks=flushings,
)
self._stream_ts = response.get("ts")
self._state = "in_progress"
Expand All @@ -196,7 +212,7 @@ def _flush_buffer(self, **kwargs) -> SlackResponse:
channel=self._stream_args["channel"],
ts=self._stream_ts,
**kwargs,
markdown_text=self._buffer,
chunks=flushings,
)
self._buffer = ""
return response
2 changes: 1 addition & 1 deletion slack_sdk/web/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2621,7 +2621,7 @@ def chat_appendStream(
*,
channel: str,
ts: str,
markdown_text: str,
markdown_text: Optional[str] = None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🌟 it is much easier to have both markdown and chunks optional!

chunks: Optional[Sequence[Union[Dict, Chunk]]] = None,
**kwargs,
) -> SlackResponse:
Expand Down
2 changes: 1 addition & 1 deletion slack_sdk/web/legacy_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2632,7 +2632,7 @@ def chat_appendStream(
*,
channel: str,
ts: str,
markdown_text: str,
markdown_text: Optional[str] = None,
chunks: Optional[Sequence[Union[Dict, Chunk]]] = None,
**kwargs,
) -> Union[Future, SlackResponse]:
Expand Down
82 changes: 78 additions & 4 deletions tests/slack_sdk/web/test_chat_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from slack_sdk.models.blocks.basic_components import FeedbackButtonObject
from slack_sdk.models.blocks.block_elements import FeedbackButtonsElement, IconButtonElement
from slack_sdk.models.blocks.blocks import ContextActionsBlock
from slack_sdk.models.messages.chunk import MarkdownTextChunk, TaskUpdateChunk
from tests.mock_web_api_server import cleanup_mock_web_api_server, setup_mock_web_api_server
from tests.slack_sdk.web.mock_web_api_handler import MockHandler

Expand Down Expand Up @@ -105,7 +106,10 @@ def test_streams_a_short_message(self):
stop_request = self.thread.server.chat_stream_requests.get("/chat.stopStream", {})
self.assertEqual(stop_request.get("channel"), "C0123456789")
self.assertEqual(stop_request.get("ts"), "123.123")
self.assertEqual(stop_request.get("markdown_text"), "nice!")
self.assertEqual(
json.dumps(stop_request.get("chunks")),
'[{"text": "nice!", "type": "markdown_text"}]',
)

def test_streams_a_long_message(self):
streamer = self.client.chat_stream(
Expand Down Expand Up @@ -146,13 +150,19 @@ def test_streams_a_long_message(self):
start_request = self.thread.server.chat_stream_requests.get("/chat.startStream", {})
self.assertEqual(start_request.get("channel"), "C0123456789")
self.assertEqual(start_request.get("thread_ts"), "123.000")
self.assertEqual(start_request.get("markdown_text"), "**this messag")
self.assertEqual(
json.dumps(start_request.get("chunks")),
'[{"text": "**this messag", "type": "markdown_text"}]',
)
self.assertEqual(start_request.get("recipient_team_id"), "T0123456789")
self.assertEqual(start_request.get("recipient_user_id"), "U0123456789")

append_request = self.thread.server.chat_stream_requests.get("/chat.appendStream", {})
self.assertEqual(append_request.get("channel"), "C0123456789")
self.assertEqual(append_request.get("markdown_text"), "e is bold!")
self.assertEqual(
json.dumps(append_request.get("chunks")),
'[{"text": "e is bold!", "type": "markdown_text"}]',
)
self.assertEqual(append_request.get("token"), "xoxb-chat_stream_test_token1")
self.assertEqual(append_request.get("ts"), "123.123")

Expand All @@ -162,10 +172,74 @@ def test_streams_a_long_message(self):
'[{"elements": [{"negative_button": {"text": {"emoji": true, "text": "bad", "type": "plain_text"}, "value": "-1"}, "positive_button": {"text": {"emoji": true, "text": "good", "type": "plain_text"}, "value": "+1"}, "type": "feedback_buttons"}, {"icon": "trash", "text": {"emoji": true, "text": "delete", "type": "plain_text"}, "type": "icon_button"}], "type": "context_actions"}]',
)
self.assertEqual(stop_request.get("channel"), "C0123456789")
self.assertEqual(stop_request.get("markdown_text"), "**")
self.assertEqual(
json.dumps(stop_request.get("chunks")),
'[{"text": "**", "type": "markdown_text"}]',
)
self.assertEqual(stop_request.get("token"), "xoxb-chat_stream_test_token2")
self.assertEqual(stop_request.get("ts"), "123.123")

def test_streams_a_chunk_message(self):
streamer = self.client.chat_stream(
channel="C0123456789",
recipient_team_id="T0123456789",
recipient_user_id="U0123456789",
thread_ts="123.000",
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tested this with an empty call to .append() and I think we should raise an error/warning for when append is sent empty 🤔

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A warning is a nice idea. I wouldn't consider it a blocker for the PR, if we want to move forward and add the warning later.

streamer.append(markdown_text="**this is ")
streamer.append(markdown_text="buffered**")
streamer.append(
chunks=[
TaskUpdateChunk(
id="001",
title="Counting...",
status="pending",
),
],
)
streamer.append(
chunks=[
MarkdownTextChunk(text="**this is unbuffered**"),
],
)
streamer.append(markdown_text="\n")
streamer.stop(
chunks=[
MarkdownTextChunk(text=":space_invader:"),
],
)

self.assertEqual(self.received_requests.get("/chat.startStream", 0), 1)
self.assertEqual(self.received_requests.get("/chat.appendStream", 0), 1)
self.assertEqual(self.received_requests.get("/chat.stopStream", 0), 1)

if hasattr(self.thread.server, "chat_stream_requests"):
start_request = self.thread.server.chat_stream_requests.get("/chat.startStream", {})
self.assertEqual(start_request.get("channel"), "C0123456789")
self.assertEqual(start_request.get("thread_ts"), "123.000")
self.assertEqual(
json.dumps(start_request.get("chunks")),
'[{"text": "**this is buffered**", "type": "markdown_text"}, {"id": "001", "status": "pending", "title": "Counting...", "type": "task_update"}]',
)
self.assertEqual(start_request.get("recipient_team_id"), "T0123456789")
self.assertEqual(start_request.get("recipient_user_id"), "U0123456789")

append_request = self.thread.server.chat_stream_requests.get("/chat.appendStream", {})
self.assertEqual(append_request.get("channel"), "C0123456789")
self.assertEqual(append_request.get("ts"), "123.123")
self.assertEqual(
json.dumps(append_request.get("chunks")),
'[{"text": "**this is unbuffered**", "type": "markdown_text"}]',
)

stop_request = self.thread.server.chat_stream_requests.get("/chat.stopStream", {})
self.assertEqual(stop_request.get("channel"), "C0123456789")
self.assertEqual(stop_request.get("ts"), "123.123")
self.assertEqual(
json.dumps(stop_request.get("chunks")),
'[{"text": "\\n", "type": "markdown_text"}, {"text": ":space_invader:", "type": "markdown_text"}]',
)

def test_streams_errors_when_appending_to_an_unstarted_stream(self):
streamer = self.client.chat_stream(
channel="C0123456789",
Expand Down
Loading