-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Fix sync history processors in Temporal workflows using ContextVar #3704
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
DouweM marked this conversation as resolved.
Show resolved
Hide resolved
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,12 +7,23 @@ | |
| import time | ||
| import uuid | ||
| from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Callable, Iterable, Iterator | ||
| from contextlib import asynccontextmanager, suppress | ||
| from contextlib import asynccontextmanager, contextmanager, suppress | ||
| from contextvars import ContextVar | ||
| from dataclasses import dataclass, fields, is_dataclass | ||
| from datetime import datetime, timezone | ||
| from functools import partial | ||
| from types import GenericAlias | ||
| from typing import TYPE_CHECKING, Any, Generic, TypeAlias, TypeGuard, TypeVar, get_args, get_origin, overload | ||
| from typing import ( | ||
| TYPE_CHECKING, | ||
| Any, | ||
| Generic, | ||
| TypeAlias, | ||
| TypeGuard, | ||
| TypeVar, | ||
| get_args, | ||
| get_origin, | ||
| overload, | ||
| ) | ||
|
|
||
| from anyio.to_thread import run_sync | ||
| from pydantic import BaseModel, TypeAdapter | ||
|
|
@@ -41,8 +52,33 @@ | |
| _P = ParamSpec('_P') | ||
| _R = TypeVar('_R') | ||
|
|
||
| _prefer_blocking_execution: ContextVar[bool] = ContextVar('_prefer_blocking_execution', default=False) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "prefer" makes it sound like this expresses a preference, that may or may not be respected. So "enable" or no prefix at all would be better |
||
|
|
||
|
|
||
| @contextmanager | ||
| def blocking_execution() -> Iterator[None]: | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| """Context manager to enable blocking execution mode. | ||
| Inside this context, sync functions will execute inline rather than | ||
| being sent to a thread pool via [`anyio.to_thread.run_sync`][anyio.to_thread.run_sync]. | ||
| This is useful in environments where threading is restricted, such as | ||
| Temporal workflows which use a sandboxed event loop. | ||
| Yields: | ||
| None | ||
| """ | ||
| token = _prefer_blocking_execution.set(True) | ||
| try: | ||
| yield | ||
| finally: | ||
| _prefer_blocking_execution.reset(token) | ||
|
|
||
|
|
||
| async def run_in_executor(func: Callable[_P, _R], *args: _P.args, **kwargs: _P.kwargs) -> _R: | ||
| if _prefer_blocking_execution.get(): | ||
| return func(*args, **kwargs) | ||
|
|
||
| wrapped_func = partial(func, *args, **kwargs) | ||
| return await run_sync(wrapped_func) | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -243,17 +243,22 @@ def temporal_activities(self) -> list[Callable[..., Any]]: | |
|
|
||
| @contextmanager | ||
| def _temporal_overrides(self) -> Iterator[None]: | ||
| from pydantic_ai._utils import blocking_execution | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To the top please! Imports inside methods are exclusively for circular import issues, and files that depend on optional packages |
||
|
|
||
| # We reset tools here as the temporalized function toolset is already in self._toolsets. | ||
| with super().override(model=self._model, toolsets=self._toolsets, tools=[]): | ||
| token = self._temporal_overrides_active.set(True) | ||
| with ( | ||
| super().override(model=self._model, toolsets=self._toolsets, tools=[]), | ||
| blocking_execution(), | ||
| ): | ||
| temporal_active_token = self._temporal_overrides_active.set(True) | ||
| try: | ||
| yield | ||
| except PydanticSerializationError as e: | ||
| raise UserError( | ||
| "The `deps` object failed to be serialized. Temporal requires all objects that are passed to activities to be serializable using Pydantic's `TypeAdapter`." | ||
| ) from e | ||
| finally: | ||
| self._temporal_overrides_active.reset(token) | ||
| self._temporal_overrides_active.reset(temporal_active_token) | ||
|
|
||
| @overload | ||
| async def run( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,79 @@ | ||
| interactions: | ||
| - request: | ||
| headers: | ||
| accept: | ||
| - application/json | ||
| accept-encoding: | ||
| - gzip, deflate | ||
| connection: | ||
| - keep-alive | ||
| content-length: | ||
| - '105' | ||
| content-type: | ||
| - application/json | ||
| host: | ||
| - api.openai.com | ||
| method: POST | ||
| parsed_body: | ||
| messages: | ||
| - content: What is the capital of Mexico? | ||
| role: user | ||
| model: gpt-4o | ||
| stream: false | ||
| uri: https://api.openai.com/v1/chat/completions | ||
| response: | ||
| headers: | ||
| access-control-expose-headers: | ||
| - X-Request-ID | ||
| alt-svc: | ||
| - h3=":443"; ma=86400 | ||
| connection: | ||
| - keep-alive | ||
| content-length: | ||
| - '838' | ||
| content-type: | ||
| - application/json | ||
| openai-organization: | ||
| - user-grnwlxd1653lxdzp921aoihz | ||
| openai-processing-ms: | ||
| - '324' | ||
| openai-project: | ||
| - proj_FYsIItHHgnSPdHBVMzhNBWGa | ||
| openai-version: | ||
| - '2020-10-01' | ||
| strict-transport-security: | ||
| - max-age=31536000; includeSubDomains; preload | ||
| transfer-encoding: | ||
| - chunked | ||
| parsed_body: | ||
| choices: | ||
| - finish_reason: stop | ||
| index: 0 | ||
| logprobs: null | ||
| message: | ||
| annotations: [] | ||
| content: The capital of Mexico is Mexico City. | ||
| refusal: null | ||
| role: assistant | ||
| created: 1765424931 | ||
| id: chatcmpl-ClRxbbqMv20jQuYMqU1BaFBftlZWS | ||
| model: gpt-4o-2024-08-06 | ||
| object: chat.completion | ||
| service_tier: default | ||
| system_fingerprint: fp_83554c687e | ||
| usage: | ||
| completion_tokens: 8 | ||
| completion_tokens_details: | ||
| accepted_prediction_tokens: 0 | ||
| audio_tokens: 0 | ||
| reasoning_tokens: 0 | ||
| rejected_prediction_tokens: 0 | ||
| prompt_tokens: 14 | ||
| prompt_tokens_details: | ||
| audio_tokens: 0 | ||
| cached_tokens: 0 | ||
| total_tokens: 22 | ||
| status: | ||
| code: 200 | ||
| message: OK | ||
| version: 1 |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1249,6 +1249,48 @@ async def test_temporal_agent_run_sync_in_workflow(allow_model_requests: None, c | |
| ) | ||
|
|
||
|
|
||
| def drop_first_message(msgs: list[ModelMessage]) -> list[ModelMessage]: | ||
| return msgs[1:] if len(msgs) > 1 else msgs | ||
|
|
||
|
|
||
| agent_with_sync_history_processor = Agent( | ||
| model, name='agent_with_sync_history_processor', history_processors=[drop_first_message] | ||
| ) | ||
| temporal_agent_with_sync_history_processor = TemporalAgent( | ||
| agent_with_sync_history_processor, activity_config=BASE_ACTIVITY_CONFIG | ||
| ) | ||
|
|
||
|
|
||
| @workflow.defn | ||
| class AgentWithSyncHistoryProcessorWorkflow: | ||
| @workflow.run | ||
| async def run(self, prompt: str) -> str: | ||
| result = await temporal_agent_with_sync_history_processor.run(prompt) | ||
| return result.output | ||
|
|
||
|
|
||
| async def test_temporal_agent_with_sync_history_processor(allow_model_requests: None, client: Client): | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we test sync instructions as well? |
||
| """Test that sync history processors work inside Temporal workflows. | ||
|
|
||
| This validates that the _prefer_blocking_execution ContextVar is properly set | ||
| by TemporalAgent._temporal_overrides(), allowing sync history processors to | ||
| execute without triggering NotImplementedError from anyio.to_thread.run_sync. | ||
| """ | ||
| async with Worker( | ||
| client, | ||
| task_queue=TASK_QUEUE, | ||
| workflows=[AgentWithSyncHistoryProcessorWorkflow], | ||
| plugins=[AgentPlugin(temporal_agent_with_sync_history_processor)], | ||
| ): | ||
| output = await client.execute_workflow( | ||
| AgentWithSyncHistoryProcessorWorkflow.run, | ||
| args=['What is the capital of Mexico?'], | ||
| id=AgentWithSyncHistoryProcessorWorkflow.__name__, | ||
| task_queue=TASK_QUEUE, | ||
| ) | ||
| assert output == snapshot('The capital of Mexico is Mexico City.') | ||
|
|
||
|
|
||
| @workflow.defn | ||
| class SimpleAgentWorkflowWithRunStream: | ||
| @workflow.run | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd just remove the entire section as they will "just work" now, so they don't really need to be mentioned under
Temporal Integration Considerations.The "use async when possible" recommendation is already in https://ai.pydantic.dev/tools-advanced/#parallel-tool-calls-concurrency