Skip to content

Conversation

@dsfaccini
Copy link
Collaborator

@dsfaccini dsfaccini commented Dec 11, 2025

Problem

Sync history processors fail inside Temporal workflows because anyio.to_thread.run_sync tries to create threads, which Temporal's sandboxed event loop doesn't support. This causes NotImplementedError.

Solution

Use a ContextVar to control when run_in_executor() should execute sync functions directly (blocking) vs using threading. This follows the approach discussed in the issue comments and approved by @DouweM.

Changes

  • _utils.py: Added _prefer_blocking_execution ContextVar, removed Temporal detection code
  • temporal/_agent.py: Set ContextVar in _temporal_overrides() context manager
  • test_utils.py: Updated test to use ContextVar instead of mocking Temporal internals
  • test_temporal.py: Added integration test for sync history processors in workflows

Testing

  • Unit test verifies ContextVar controls blocking behavior
  • Integration test validates sync history processors work in Temporal workflows

@github-actions
Copy link

github-actions bot commented Dec 11, 2025

Docs Preview

commit: ac164f9
Preview URL: https://45b5d94b-pydantic-ai-previews.pydantic.workers.dev

with super().override(model=self._model, toolsets=self._toolsets, tools=[]):
token = self._temporal_overrides_active.set(True)
temporal_active_token = self._temporal_overrides_active.set(True)
blocking_token = _prefer_blocking_execution.set(True)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make this a contextmanager exposed by the utils module, so that we don't deal with the contextvar directly here. See the current_run_context feature in #3537

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then we can add that contextmanager to the with statement above

return messages[1:] if len(messages) > 1 else messages


agent_with_sync_history_processor = Agent(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this to just ahead of the workflow that uses it, to keep the test file consistently ordered

simple_temporal_agent = TemporalAgent(simple_agent, activity_config=BASE_ACTIVITY_CONFIG)


def drop_first_message_sync(messages: list[ModelMessage]) -> list[ModelMessage]:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make this an inline lambda as it's very simple

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pyright always complains about lambdas, and the length causes a line break, so I'd prefer to keep the extra 1-2 lines with proper types and a nice name



async def test_run_in_executor_with_blocking_execution_enabled() -> None:
from pydantic_ai._utils import _prefer_blocking_execution # pyright: ignore[reportPrivateUsage]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above; let's not use the private var, but a new public contextmanager


Other than that, any agent and toolset will just work!

### Instructions Functions, Output Functions, and History Processors
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd just remove the entire section as they will "just work" now, so they don't really need to be mentioned under Temporal Integration Considerations.

The "use async when possible" recommendation is already in https://ai.pydantic.dev/tools-advanced/#parallel-tool-calls-concurrency

_P = ParamSpec('_P')
_R = TypeVar('_R')

_prefer_blocking_execution: ContextVar[bool] = ContextVar('_prefer_blocking_execution', default=False)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"prefer" makes it sound like this expresses a preference, that may or may not be respected. So "enable" or no prefix at all would be better



@contextmanager
def blocking_execution() -> Iterator[None]:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with _utils.blocking_execution() makes it seems like all execution will be blocking, but that's not true because of async etc. So maybe we can rename this to something like with disable_threads()?


@contextmanager
def _temporal_overrides(self) -> Iterator[None]:
from pydantic_ai._utils import blocking_execution
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To the top please! Imports inside methods are exclusively for circular import issues, and files that depend on optional packages

return result.output


async def test_temporal_agent_with_sync_history_processor(allow_model_requests: None, client: Client):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we test sync instructions as well?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

3 participants