Pluggable Transport Abstractions #1591
Conversation
The session objects aren't transport specific. What problem are you trying to solve? Why does this PR have so many "👍"?
Hey @Kludex, thanks for your reply. Since the gRPC transport does not require read/write streams, this PR creates abstract classes following the interface segregation principle, so that the abstract classes can be inherited by the current transports in this SDK as well as any future transport like

This is part of the modelcontextprotocol/modelcontextprotocol#1352 initiative. Please let me know if I need to explain further. Thanks.
Yeah, I don't think there's any problem in that. Again, the session classes do not depend on any transport.
The minimum is having the streams, check the
Are you talking about what is implemented in #1936? Streams are the same as queues; you do require queues in that PR. Replace the
I've added a PR to merge in here - it's a full implementation of MCP using gRPC. I've been using gRPC/thrift/avro for a while, and this PR implements full backward compatibility with the proto I created. I'm totally open to changes and hope I can contribute to the effort. Most important: it implements true streaming calls. I've added three documents in the proto directory that cover the following:
If you want to make a point, please make it here. I don't think it's reasonable to tell me to read documents somewhere else.
Use

I'll be closing this, since a new interface or abstraction is not needed.
Fair point on the documentation - I will put them in a separate reply. I didn't want to inundate you - apologies for that, as I'd love to have a discussion about the tunneling. I've refactored to use

I see Google Cloud just pushed their mcp-grpc-transport-proto today. This validates the typed RPC approach - they're not wrapping JSON-RPC in protobuf, they have typed RPCs for each MCP operation:

```proto
service Mcp {
  rpc ListResources(ListResourcesRequest) returns (ListResourcesResponse);
  rpc ListTools(ListToolsRequest) returns (ListToolsResponse);
  rpc CallTool(CallToolRequest) returns (stream CallToolResponse);
  // ...
}
```

This aligns with what I implemented. However, looking at their proto, I think there's room for improvement: Google's proto is mostly unary RPCs. Only

They use a

Here's what I propose: I've drafted an extension that adds true streaming while staying compatible with Google's base proto:

```proto
service McpStreaming {
  // Bidirectional session for multiplexed operations
  rpc Session(stream SessionRequest) returns (stream SessionResponse);

  // Push notifications for resource changes
  rpc WatchResources(WatchResourcesRequest) returns (stream WatchResourcesResponse);

  // Stream large resources in chunks
  rpc ReadResourceChunked(ReadResourceChunkedRequest) returns (stream ResourceChunk);

  // Parallel tool execution
  rpc StreamToolCalls(stream StreamToolCallsRequest) returns (stream StreamToolCallsResponse);
}
```

Another point I'd like to discuss: schema registry integration. There's also an opportunity here for dynamic schema management. MCP tools declare JSON schemas for inputs, but with gRPC we could integrate with schema registries (like Confluent Schema Registry, Apicurio, or AWS Glue). This would allow us to:
I've forked Google's proto repo and drafted a proposal: https://github.com/ai-pipestream/mcp-grpc-transport-proto/tree/streaming-extensions (I'll push more changes in a moment). Questions I'd like feedback on:

Happy to open PRs to either Google's repo or the MCP SDK to continue the discussion with concrete code.
I think you are anticipating yourself a bit here. I have no interest nor opinion in how the specific gRPC transport implementation should look.

How do you see that happening, and why does the user need to define the chunks the server will send themselves? I'm asking those questions based on the snippet I saw in your branch:

```python
import asyncio

from mcp import StreamPromptCompletionChunk
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Streaming Prompt Completion")


@mcp.stream_prompt_completion()
async def stream_prompt_completion(name: str, arguments: dict[str, str] | None):
    query = (arguments or {}).get("q", "")
    tokens = [f"Prompt {name}: ", query, " ...done"]
    for token in tokens[:-1]:
        yield StreamPromptCompletionChunk(token=token)
        await asyncio.sleep(0.05)
    yield StreamPromptCompletionChunk(
        token=tokens[-1],
        isFinal=True,
        finishReason="stop",
    )
```

I don't think the above is how we want to do it at any level - we have a lower level server and the high level (more user-friendly
I don't have an opinion nor do I care about gRPC, but it should not live in this repository, given that it's an extension. Going back to the original intent of this PR: the gRPC transport implemented can't be compliant, given that MCP is tightly coupled with JSON-RPC, which is reflected by the schemas in https://github.com/modelcontextprotocol/modelcontextprotocol/blob/main/schema/2025-11-25/schema.ts and the documentation. So... now I understand why the

If we want to have an abstraction that will make sense for transport implementers, the definition of the MCP types itself needs to change.
Totally understand - I should've explained my motivation, and apologies for trying it here. I initially saw the gRPC CTA: I submitted an email and it was suggested I join the discussion here for the gRPC work being done - my mistake, though; I didn't realize this was the wrong place for it. To answer your question about the

I meant to show what native gRPC streaming could enable, not a thought-out UX. The chunking abstraction definitely shouldn't be exposed to users like that at the high level. I'll go ahead and focus on the gRPC proto definition in Google's repo. Thanks for the clarification on where this discussion belongs.
Just to be clear, it's the specifics of gRPC that I don't care about. I'm happy to discuss ways to create an interface to make it easier for people to work on their own transport implementations. :)
That's actually what I was exploring with the tunneling proposal - a way to create an interface that works for both streaming transports (like gRPC) and cursor-based transports (like JSON-RPC) without changing either wire protocol.

The Problem

Right now, if I want to implement a gRPC transport, I'm forced into "fake streaming" - the server has to buffer everything into a

No memory or latency benefits - just extra steps.

The Idea: StreamingAdapter

What if streaming was the internal abstraction, and cursor-based transports just emulated it?

```python
class StreamingAdapter:
    """Unified streaming interface over any transport."""

    def __init__(self, transport: ClientTransportSession):
        self._transport = transport

    async def stream_list_tools(self) -> AsyncIterator[types.Tool]:
        if isinstance(self._transport, GrpcClientTransport):
            # Native streaming - zero overhead
            async for tool in self._transport._stream_list_tools_native():
                yield tool
        else:
            # Cursor-based - iterate pages internally
            cursor = None
            while True:
                result = await self._transport.list_tools(cursor=cursor)
                for tool in result.tools:
                    yield tool
                cursor = result.nextCursor
                if cursor is None:
                    break

    async def list_tools(self) -> types.ListToolsResult:
        """Backward compatible - collects stream into result."""
        tools = [t async for t in self.stream_list_tools()]
        return types.ListToolsResult(tools=tools, nextCursor=None)
```

The key points:
What Changes, What Doesn't
The JSON-RPC wire protocol stays identical. Existing servers and clients don't need updates. Streaming is purely additive.

Backpressure Reality

One thing I didn't want to hide - transports have different backpressure characteristics:
The adapter preserves gRPC's backpressure. For cursor-based transports, "backpressure" is implicit in when the client requests the next page. I think adapters should preserve these realities rather than pretending they don't exist.

Why I like this approach

Simply put, I think this gives us the best of both worlds for transport implementers. This approach:
I'm not saying this is the right answer - there are probably many ways to do this. I was inspired by how IPv6 tunneling works - the same concept of letting the new protocol work natively where supported while transparently bridging over the old one. I can write this up in Java and Python. The real win is that MCP implementations could react instantly to requests and maintain two-way communication between agents without blocking on replies.
The types in the spec itself need to be decoupled from JSON-RPC. Once they are, we can look for ways to make the BaseSession more "pluggable".
@Kludex that's exactly the direction I'd like to work on. I'd suggest starting with the gRPC proto definition in that repo, with the goal that the JSON-RPC interface stays unchanged and works alongside it - not a separate spec maintained in parallel. The current spec can do this, but it needs true streaming added to the design.

The biggest win from the gRPC spec would be introducing true streaming without cursors - this is where we see significant performance (memory and network) gains, as well as better integration with AI workloads in data mesh scenarios and client chatbot services. Once we have a solid spec, a tunneling approach will emerge because the gRPC service would be streaming out of the box.

Also, it's best to design the gRPC definition using gRPC conventions and avoid a 100% 1:1 mapping of the JSON-RPC API, since the streaming aspect already deviates from it. The reason is that gRPC specs prioritize backward compatibility and cross-language design with an emphasis on schemas. Working through the gRPC definition first will surface the right design questions for the transport layer.

I noticed https://github.com/GoogleCloudPlatform/mcp-grpc-transport-proto was pushed - if we address some of the spec issues there (I opened 2 issues and an initial PR that surfaces some of the gRPC concerns), it will make it easier for the individual SDK conversations to work out how a transport layer can be defined.
Motivation and Context
Add support for Pluggable Transport Abstractions in the MCP Python SDK.

This PR mainly adds two abstract classes defining the APIs that every transport must implement:

- `src/mcp/client/transport_session.py` -> `ClientTransportSession`
- `src/mcp/server/transport_session.py` -> `ServerTransportSession`

Both classes have the minimal API that every transport must implement in order to achieve the features defined in the MCP Specification.
Additionally, the existing transport classes, which are based on `JSONRPC`, inherit from these two new classes:

- `src/mcp/client/session.py` -> `ClientSession` inherits from `src/mcp/client/transport_session.py` -> `ClientTransportSession`
- `src/mcp/server/session.py` -> `ServerSession` inherits from `src/mcp/server/transport_session.py` -> `ServerTransportSession`

Type Hints Fixes
Since `ClientSession` and `ServerSession` now sit under a higher-level abstraction, this PR also updates type hints to the parent classes. Precisely: places where we use `ClientSession` are updated to use `ClientTransportSession`, and similarly `ServerSession` type hints are updated to `ServerTransportSession`.

How Has This Been Tested?
Tested using `pyright` and `uv run pytest`. Changes are also validated via `CI` runs.

Breaking Changes
No.
Types of changes
Checklist
Additional context
If we want to add more transports in the future, they can implement the abstract classes introduced in this PR: `ClientTransportSession` and `ServerTransportSession`.
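As an illustration of the type-hint widening described above, here's a toy sketch. The class bodies and the `count_tools` helper are invented stand-ins, not the SDK's real implementations; the point is only that a function previously annotated with `ClientSession` accepts any transport once the hint is widened to the parent class:

```python
import asyncio
from abc import ABC, abstractmethod


class ClientTransportSession(ABC):
    """Minimal stand-in for the abstract base added by this PR."""

    @abstractmethod
    async def list_tools(self) -> list[str]: ...


class ClientSession(ClientTransportSession):
    """JSON-RPC-backed session (toy)."""

    async def list_tools(self) -> list[str]:
        return ["echo"]


class GrpcClientSession(ClientTransportSession):
    """A hypothetical future transport that benefits from the widened hint."""

    async def list_tools(self) -> list[str]:
        return ["echo", "search"]


# Before the PR: def count_tools(session: ClientSession) -> int
# After: the hint is widened to the parent class, so any transport works.
async def count_tools(session: ClientTransportSession) -> int:
    return len(await session.list_tools())


print(asyncio.run(count_tools(ClientSession())))      # 1
print(asyncio.run(count_tools(GrpcClientSession())))  # 2
```

Because the helper only depends on the abstract interface, a future transport needs no changes anywhere type hints already use `ClientTransportSession`.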