-
Notifications
You must be signed in to change notification settings - Fork 850
feat: support and flush chunks in the chat stream helper #1809
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: zimeg-feat-ai-apps-chunks
Are you sure you want to change the base?
Changes from all commits
5de794b
61d6d53
85081e1
a6bb951
92c93e0
7c32814
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,9 +1,10 @@ | ||
| import json | ||
| import logging | ||
| from typing import TYPE_CHECKING, Dict, Optional, Sequence, Union | ||
| from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Union | ||
|
|
||
| import slack_sdk.errors as e | ||
| from slack_sdk.models.blocks.blocks import Block | ||
| from slack_sdk.models.messages.chunk import Chunk, MarkdownTextChunk | ||
| from slack_sdk.models.metadata import Metadata | ||
| from slack_sdk.web.slack_response import SlackResponse | ||
|
|
||
|
|
@@ -65,7 +66,8 @@ def __init__( | |
| def append( | ||
| self, | ||
| *, | ||
| markdown_text: str, | ||
| markdown_text: Optional[str] = None, | ||
| chunks: Optional[Sequence[Union[Dict, Chunk]]] = None, | ||
|
Comment on lines
+69
to
+70
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Since both arguments are now optional, what happens if a developer does not pass any of these arguments when using the method? |
||
| **kwargs, | ||
| ) -> Optional[SlackResponse]: | ||
| """Append to the stream. | ||
|
|
@@ -74,6 +76,7 @@ def append( | |
| is stopped this method cannot be called. | ||
|
|
||
| Args: | ||
| chunks: An array of streaming chunks that can contain either markdown text or task updates. | ||
| markdown_text: Accepts message text formatted in markdown. Limit this field to 12,000 characters. This text is | ||
| what will be appended to the message received so far. | ||
| **kwargs: Additional arguments passed to the underlying API calls. | ||
|
|
@@ -101,9 +104,10 @@ def append( | |
| raise e.SlackRequestError(f"Cannot append to stream: stream state is {self._state}") | ||
| if kwargs.get("token"): | ||
| self._token = kwargs.pop("token") | ||
| self._buffer += markdown_text | ||
| if len(self._buffer) >= self._buffer_size: | ||
| return self._flush_buffer(**kwargs) | ||
| if markdown_text is not None: | ||
| self._buffer += markdown_text | ||
| if len(self._buffer) >= self._buffer_size or chunks is not None: | ||
| return self._flush_buffer(chunks=chunks, **kwargs) | ||
|
Comment on lines
+107
to
+110
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If I'm understanding this correctly, if I do streamer = client.chat_stream(
channel="C0123456789",
recipient_team_id="T0123456789",
recipient_user_id="U0123456789",
thread_ts="10101010101.010101",
)
streamer.append(markdown_text="\n")
streamer.append(
chunks=[
MarkdownTextChunk(
text="Hello.\nI have received the task. ",
),
],
)
|
||
| details = { | ||
| "buffer_length": len(self._buffer), | ||
| "buffer_size": self._buffer_size, | ||
|
|
@@ -119,6 +123,7 @@ def stop( | |
| self, | ||
| *, | ||
| markdown_text: Optional[str] = None, | ||
| chunks: Optional[Sequence[Union[Dict, Chunk]]] = None, | ||
| blocks: Optional[Union[str, Sequence[Union[Dict, Block]]]] = None, | ||
| metadata: Optional[Union[Dict, Metadata]] = None, | ||
| **kwargs, | ||
|
|
@@ -127,6 +132,7 @@ def stop( | |
|
|
||
| Args: | ||
| blocks: A list of blocks that will be rendered at the bottom of the finalized message. | ||
| chunks: An array of streaming chunks that can contain either markdown text or task updates. | ||
| markdown_text: Accepts message text formatted in markdown. Limit this field to 12,000 characters. This text is | ||
| what will be appended to the message received so far. | ||
| metadata: JSON object with event_type and event_payload fields, presented as a URL-encoded string. Metadata you | ||
|
|
@@ -167,26 +173,36 @@ def stop( | |
| raise e.SlackRequestError("Failed to stop stream: stream not started") | ||
| self._stream_ts = str(response["ts"]) | ||
| self._state = "in_progress" | ||
| flushings: List[Union[Dict, Chunk]] = [] | ||
| if len(self._buffer) != 0: | ||
| flushings.append(MarkdownTextChunk(text=self._buffer)) | ||
| if chunks is not None: | ||
| flushings.extend(chunks) | ||
|
Comment on lines
+176
to
+180
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nice! This allows the |
||
| response = self._client.chat_stopStream( | ||
| token=self._token, | ||
| channel=self._stream_args["channel"], | ||
| ts=self._stream_ts, | ||
| blocks=blocks, | ||
| markdown_text=self._buffer, | ||
| chunks=flushings, | ||
| metadata=metadata, | ||
| **kwargs, | ||
| ) | ||
| self._state = "completed" | ||
| return response | ||
|
|
||
| def _flush_buffer(self, **kwargs) -> SlackResponse: | ||
| """Flush the internal buffer by making appropriate API calls.""" | ||
| def _flush_buffer(self, chunks: Optional[Sequence[Union[Dict, Chunk]]] = None, **kwargs) -> SlackResponse: | ||
| """Flush the internal buffer with chunks by making appropriate API calls.""" | ||
| flushings: List[Union[Dict, Chunk]] = [] | ||
| if len(self._buffer) != 0: | ||
| flushings.append(MarkdownTextChunk(text=self._buffer)) | ||
| if chunks is not None: | ||
| flushings.extend(chunks) | ||
| if not self._stream_ts: | ||
| response = self._client.chat_startStream( | ||
| **self._stream_args, | ||
| token=self._token, | ||
| **kwargs, | ||
| markdown_text=self._buffer, | ||
| chunks=flushings, | ||
| ) | ||
| self._stream_ts = response.get("ts") | ||
| self._state = "in_progress" | ||
|
|
@@ -196,7 +212,7 @@ def _flush_buffer(self, **kwargs) -> SlackResponse: | |
| channel=self._stream_args["channel"], | ||
| ts=self._stream_ts, | ||
| **kwargs, | ||
| markdown_text=self._buffer, | ||
| chunks=flushings, | ||
| ) | ||
| self._buffer = "" | ||
| return response | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2621,7 +2621,7 @@ def chat_appendStream( | |
| *, | ||
| channel: str, | ||
| ts: str, | ||
| markdown_text: str, | ||
| markdown_text: Optional[str] = None, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🌟 It is much easier to have both markdown and chunks optional! |
||
| chunks: Optional[Sequence[Union[Dict, Chunk]]] = None, | ||
| **kwargs, | ||
| ) -> SlackResponse: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,6 +7,7 @@ | |
| from slack_sdk.models.blocks.basic_components import FeedbackButtonObject | ||
| from slack_sdk.models.blocks.block_elements import FeedbackButtonsElement, IconButtonElement | ||
| from slack_sdk.models.blocks.blocks import ContextActionsBlock | ||
| from slack_sdk.models.messages.chunk import MarkdownTextChunk, TaskUpdateChunk | ||
| from tests.mock_web_api_server import cleanup_mock_web_api_server, setup_mock_web_api_server | ||
| from tests.slack_sdk.web.mock_web_api_handler import MockHandler | ||
|
|
||
|
|
@@ -105,7 +106,10 @@ def test_streams_a_short_message(self): | |
| stop_request = self.thread.server.chat_stream_requests.get("/chat.stopStream", {}) | ||
| self.assertEqual(stop_request.get("channel"), "C0123456789") | ||
| self.assertEqual(stop_request.get("ts"), "123.123") | ||
| self.assertEqual(stop_request.get("markdown_text"), "nice!") | ||
| self.assertEqual( | ||
| json.dumps(stop_request.get("chunks")), | ||
| '[{"text": "nice!", "type": "markdown_text"}]', | ||
| ) | ||
|
|
||
| def test_streams_a_long_message(self): | ||
| streamer = self.client.chat_stream( | ||
|
|
@@ -146,13 +150,19 @@ def test_streams_a_long_message(self): | |
| start_request = self.thread.server.chat_stream_requests.get("/chat.startStream", {}) | ||
| self.assertEqual(start_request.get("channel"), "C0123456789") | ||
| self.assertEqual(start_request.get("thread_ts"), "123.000") | ||
| self.assertEqual(start_request.get("markdown_text"), "**this messag") | ||
| self.assertEqual( | ||
| json.dumps(start_request.get("chunks")), | ||
| '[{"text": "**this messag", "type": "markdown_text"}]', | ||
| ) | ||
| self.assertEqual(start_request.get("recipient_team_id"), "T0123456789") | ||
| self.assertEqual(start_request.get("recipient_user_id"), "U0123456789") | ||
|
|
||
| append_request = self.thread.server.chat_stream_requests.get("/chat.appendStream", {}) | ||
| self.assertEqual(append_request.get("channel"), "C0123456789") | ||
| self.assertEqual(append_request.get("markdown_text"), "e is bold!") | ||
| self.assertEqual( | ||
| json.dumps(append_request.get("chunks")), | ||
| '[{"text": "e is bold!", "type": "markdown_text"}]', | ||
| ) | ||
| self.assertEqual(append_request.get("token"), "xoxb-chat_stream_test_token1") | ||
| self.assertEqual(append_request.get("ts"), "123.123") | ||
|
|
||
|
|
@@ -162,10 +172,74 @@ def test_streams_a_long_message(self): | |
| '[{"elements": [{"negative_button": {"text": {"emoji": true, "text": "bad", "type": "plain_text"}, "value": "-1"}, "positive_button": {"text": {"emoji": true, "text": "good", "type": "plain_text"}, "value": "+1"}, "type": "feedback_buttons"}, {"icon": "trash", "text": {"emoji": true, "text": "delete", "type": "plain_text"}, "type": "icon_button"}], "type": "context_actions"}]', | ||
| ) | ||
| self.assertEqual(stop_request.get("channel"), "C0123456789") | ||
| self.assertEqual(stop_request.get("markdown_text"), "**") | ||
| self.assertEqual( | ||
| json.dumps(stop_request.get("chunks")), | ||
| '[{"text": "**", "type": "markdown_text"}]', | ||
| ) | ||
| self.assertEqual(stop_request.get("token"), "xoxb-chat_stream_test_token2") | ||
| self.assertEqual(stop_request.get("ts"), "123.123") | ||
|
|
||
| def test_streams_a_chunk_message(self): | ||
| streamer = self.client.chat_stream( | ||
| channel="C0123456789", | ||
| recipient_team_id="T0123456789", | ||
| recipient_user_id="U0123456789", | ||
| thread_ts="123.000", | ||
| ) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I tested this with an empty call to
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. A warning is a nice idea. I wouldn't consider it a blocker for the PR, if we want to move forward and add the warning later. |
||
| streamer.append(markdown_text="**this is ") | ||
| streamer.append(markdown_text="buffered**") | ||
| streamer.append( | ||
| chunks=[ | ||
| TaskUpdateChunk( | ||
| id="001", | ||
| title="Counting...", | ||
| status="pending", | ||
| ), | ||
| ], | ||
| ) | ||
| streamer.append( | ||
| chunks=[ | ||
| MarkdownTextChunk(text="**this is unbuffered**"), | ||
| ], | ||
| ) | ||
| streamer.append(markdown_text="\n") | ||
| streamer.stop( | ||
| chunks=[ | ||
| MarkdownTextChunk(text=":space_invader:"), | ||
| ], | ||
| ) | ||
|
|
||
| self.assertEqual(self.received_requests.get("/chat.startStream", 0), 1) | ||
| self.assertEqual(self.received_requests.get("/chat.appendStream", 0), 1) | ||
| self.assertEqual(self.received_requests.get("/chat.stopStream", 0), 1) | ||
|
|
||
| if hasattr(self.thread.server, "chat_stream_requests"): | ||
| start_request = self.thread.server.chat_stream_requests.get("/chat.startStream", {}) | ||
| self.assertEqual(start_request.get("channel"), "C0123456789") | ||
| self.assertEqual(start_request.get("thread_ts"), "123.000") | ||
| self.assertEqual( | ||
| json.dumps(start_request.get("chunks")), | ||
| '[{"text": "**this is buffered**", "type": "markdown_text"}, {"id": "001", "status": "pending", "title": "Counting...", "type": "task_update"}]', | ||
| ) | ||
| self.assertEqual(start_request.get("recipient_team_id"), "T0123456789") | ||
| self.assertEqual(start_request.get("recipient_user_id"), "U0123456789") | ||
|
|
||
| append_request = self.thread.server.chat_stream_requests.get("/chat.appendStream", {}) | ||
| self.assertEqual(append_request.get("channel"), "C0123456789") | ||
| self.assertEqual(append_request.get("ts"), "123.123") | ||
| self.assertEqual( | ||
| json.dumps(append_request.get("chunks")), | ||
| '[{"text": "**this is unbuffered**", "type": "markdown_text"}]', | ||
| ) | ||
|
|
||
| stop_request = self.thread.server.chat_stream_requests.get("/chat.stopStream", {}) | ||
| self.assertEqual(stop_request.get("channel"), "C0123456789") | ||
| self.assertEqual(stop_request.get("ts"), "123.123") | ||
| self.assertEqual( | ||
| json.dumps(stop_request.get("chunks")), | ||
| '[{"text": "\\n", "type": "markdown_text"}, {"text": ":space_invader:", "type": "markdown_text"}]', | ||
| ) | ||
|
|
||
| def test_streams_errors_when_appending_to_an_unstarted_stream(self): | ||
| streamer = self.client.chat_stream( | ||
| channel="C0123456789", | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
😉 quite the visual name! 💩 Would
chunks_to_flush also work? Perhaps too long.