Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/mcp/client/streamable_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,9 @@ async def streamable_http_client(
http_client: Optional pre-configured httpx.AsyncClient. If None, a default
client with recommended MCP timeouts will be created. To configure headers,
authentication, or other HTTP settings, create an httpx.AsyncClient and pass it here.
Note: User-provided clients do not receive SSRF redirect protection.
If redirect validation is required, use ``create_mcp_http_client()``
or configure redirect hooks manually.
terminate_on_close: If True, send a DELETE request to terminate the session when the context exits.

Yields:
Expand All @@ -543,6 +546,8 @@ async def streamable_http_client(
if client is None:
# Create default client with recommended MCP timeouts
client = create_mcp_http_client()
else:
logger.debug("Using user-provided HTTP client; SSRF redirect protection is not applied")

transport = StreamableHTTPTransport(url)

Expand Down
85 changes: 84 additions & 1 deletion src/mcp/shared/_httpx_utils.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,84 @@
"""Utilities for creating standardized httpx AsyncClient instances."""

import logging
from enum import Enum
from typing import Any, Protocol

import httpx

__all__ = ["create_mcp_http_client", "MCP_DEFAULT_TIMEOUT", "MCP_DEFAULT_SSE_READ_TIMEOUT"]
logger = logging.getLogger(__name__)

__all__ = [
"MCP_DEFAULT_SSE_READ_TIMEOUT",
"MCP_DEFAULT_TIMEOUT",
"RedirectPolicy",
"create_mcp_http_client",
]

# Default MCP timeout configuration
MCP_DEFAULT_TIMEOUT = 30.0 # General operations (seconds)
MCP_DEFAULT_SSE_READ_TIMEOUT = 300.0 # SSE streams - 5 minutes (seconds)


class RedirectPolicy(Enum):
"""Policy for validating HTTP redirects to protect against SSRF attacks.

Attributes:
ALLOW_ALL: No restrictions on redirects (legacy behavior).
BLOCK_SCHEME_DOWNGRADE: Block HTTPS-to-HTTP downgrades on redirect (default).
ENFORCE_HTTPS: Only allow HTTPS redirect destinations.
"""

ALLOW_ALL = "allow_all"
BLOCK_SCHEME_DOWNGRADE = "block_scheme_downgrade"
ENFORCE_HTTPS = "enforce_https"


async def _check_redirect(response: httpx.Response, policy: RedirectPolicy) -> None:
"""Validate redirect responses against the configured policy.

This is installed as an httpx response event hook. It inspects redirect
responses (3xx with a ``Location`` header) and raises
:class:`httpx.HTTPStatusError` when the redirect violates *policy*.

Args:
response: The httpx response to check.
policy: The redirect policy to enforce.
"""
if not response.has_redirect_location:
return

original_url = response.request.url
redirect_url = httpx.URL(response.headers["Location"])
if redirect_url.is_relative_url:
redirect_url = original_url.join(redirect_url)

if policy == RedirectPolicy.BLOCK_SCHEME_DOWNGRADE:
if original_url.scheme == "https" and redirect_url.scheme == "http":
logger.warning(
"Blocked HTTPS-to-HTTP redirect from %s to %s",
original_url,
redirect_url,
)
raise httpx.HTTPStatusError(
f"HTTPS-to-HTTP redirect blocked: {original_url} -> {redirect_url}",
request=response.request,
response=response,
)
elif policy == RedirectPolicy.ENFORCE_HTTPS:
if redirect_url.scheme != "https":
logger.warning(
"Blocked non-HTTPS redirect from %s to %s",
original_url,
redirect_url,
)
raise httpx.HTTPStatusError(
f"Non-HTTPS redirect blocked: {original_url} -> {redirect_url}",
request=response.request,
response=response,
)


class McpHttpClientFactory(Protocol): # pragma: no branch
def __call__( # pragma: no branch
self,
Expand All @@ -24,18 +92,25 @@ def create_mcp_http_client(
headers: dict[str, str] | None = None,
timeout: httpx.Timeout | None = None,
auth: httpx.Auth | None = None,
redirect_policy: RedirectPolicy = RedirectPolicy.BLOCK_SCHEME_DOWNGRADE,
) -> httpx.AsyncClient:
"""Create a standardized httpx AsyncClient with MCP defaults.

This function provides common defaults used throughout the MCP codebase:
- follow_redirects=True (always enabled)
- Default timeout of 30 seconds if not specified
- SSRF redirect protection via *redirect_policy*

Args:
headers: Optional headers to include with all requests.
timeout: Request timeout as httpx.Timeout object.
Defaults to 30 seconds if not specified.
auth: Optional authentication handler.
redirect_policy: Policy controlling which redirects are allowed.
Defaults to ``RedirectPolicy.BLOCK_SCHEME_DOWNGRADE`` which blocks
HTTPS-to-HTTP downgrades. Use ``RedirectPolicy.ENFORCE_HTTPS`` to
only allow HTTPS destinations, or ``RedirectPolicy.ALLOW_ALL`` to
disable redirect validation entirely (legacy behavior).

Returns:
Configured httpx.AsyncClient instance with MCP defaults.
Expand Down Expand Up @@ -94,4 +169,12 @@ def create_mcp_http_client(
if auth is not None: # pragma: no cover
kwargs["auth"] = auth

# Install redirect validation hook
if redirect_policy != RedirectPolicy.ALLOW_ALL:

async def check_redirect_hook(response: httpx.Response) -> None:
await _check_redirect(response, redirect_policy)

kwargs["event_hooks"] = {"response": [check_redirect_hook]}

return httpx.AsyncClient(**kwargs)
178 changes: 177 additions & 1 deletion tests/shared/test_httpx_utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
"""Tests for httpx utility functions."""

import httpx
import pytest

from mcp.shared._httpx_utils import create_mcp_http_client
from mcp.shared._httpx_utils import RedirectPolicy, _check_redirect, create_mcp_http_client

pytestmark = pytest.mark.anyio


def test_default_settings():
Expand All @@ -22,3 +25,176 @@ def test_custom_parameters():

assert client.headers["Authorization"] == "Bearer token"
assert client.timeout.connect == 60.0


def test_default_redirect_policy():
"""Test that the default redirect policy is BLOCK_SCHEME_DOWNGRADE."""
client = create_mcp_http_client()
# Event hooks should be installed for the default policy
assert len(client.event_hooks["response"]) == 1


def test_allow_all_policy_no_hooks():
"""Test that ALLOW_ALL does not install event hooks."""
client = create_mcp_http_client(redirect_policy=RedirectPolicy.ALLOW_ALL)
assert len(client.event_hooks["response"]) == 0


# --- _check_redirect unit tests ---


async def test_check_redirect_ignores_non_redirect():
"""Test that non-redirect responses are ignored."""
response = httpx.Response(200, request=httpx.Request("GET", "https://example.com"))
# Should not raise
await _check_redirect(response, RedirectPolicy.BLOCK_SCHEME_DOWNGRADE)
await _check_redirect(response, RedirectPolicy.ENFORCE_HTTPS)


async def test_check_redirect_ignores_redirect_without_location_header():
"""Test that redirect responses without a Location header are ignored."""
response = httpx.Response(
302,
request=httpx.Request("GET", "https://example.com"),
)
# No Location header → has_redirect_location is False
assert not response.has_redirect_location
await _check_redirect(response, RedirectPolicy.BLOCK_SCHEME_DOWNGRADE)


# --- BLOCK_SCHEME_DOWNGRADE tests ---


async def test_block_scheme_downgrade_blocks_https_to_http():
"""Test BLOCK_SCHEME_DOWNGRADE blocks HTTPS->HTTP redirect."""
response = httpx.Response(
302,
headers={"Location": "http://evil.com"},
request=httpx.Request("GET", "https://example.com"),
)

with pytest.raises(httpx.HTTPStatusError, match="HTTPS-to-HTTP redirect blocked"):
await _check_redirect(response, RedirectPolicy.BLOCK_SCHEME_DOWNGRADE)


async def test_block_scheme_downgrade_allows_https_to_https():
"""Test BLOCK_SCHEME_DOWNGRADE allows HTTPS->HTTPS redirect."""
response = httpx.Response(
302,
headers={"Location": "https://other.com"},
request=httpx.Request("GET", "https://example.com"),
)
await _check_redirect(response, RedirectPolicy.BLOCK_SCHEME_DOWNGRADE)


async def test_block_scheme_downgrade_allows_http_to_http():
"""Test BLOCK_SCHEME_DOWNGRADE allows HTTP->HTTP redirect."""
response = httpx.Response(
302,
headers={"Location": "http://other.com"},
request=httpx.Request("GET", "http://example.com"),
)
await _check_redirect(response, RedirectPolicy.BLOCK_SCHEME_DOWNGRADE)


async def test_block_scheme_downgrade_allows_http_to_https():
"""Test BLOCK_SCHEME_DOWNGRADE allows HTTP->HTTPS upgrade."""
response = httpx.Response(
302,
headers={"Location": "https://other.com"},
request=httpx.Request("GET", "http://example.com"),
)
await _check_redirect(response, RedirectPolicy.BLOCK_SCHEME_DOWNGRADE)


async def test_block_scheme_downgrade_allows_relative_redirect():
"""Test BLOCK_SCHEME_DOWNGRADE allows relative Location headers."""
response = httpx.Response(
302,
headers={"Location": "/other-path"},
request=httpx.Request("GET", "https://example.com/start"),
)
await _check_redirect(response, RedirectPolicy.BLOCK_SCHEME_DOWNGRADE)


# --- ENFORCE_HTTPS tests ---


async def test_enforce_https_blocks_http_target():
"""Test ENFORCE_HTTPS blocks any HTTP redirect target."""
response = httpx.Response(
302,
headers={"Location": "http://evil.com"},
request=httpx.Request("GET", "https://example.com"),
)

with pytest.raises(httpx.HTTPStatusError, match="Non-HTTPS redirect blocked"):
await _check_redirect(response, RedirectPolicy.ENFORCE_HTTPS)


async def test_enforce_https_blocks_http_to_http():
"""Test ENFORCE_HTTPS blocks HTTP->HTTP redirect."""
response = httpx.Response(
302,
headers={"Location": "http://other.com"},
request=httpx.Request("GET", "http://example.com"),
)

with pytest.raises(httpx.HTTPStatusError, match="Non-HTTPS redirect blocked"):
await _check_redirect(response, RedirectPolicy.ENFORCE_HTTPS)


async def test_enforce_https_allows_https_target():
"""Test ENFORCE_HTTPS allows HTTPS redirect target."""
response = httpx.Response(
302,
headers={"Location": "https://other.com"},
request=httpx.Request("GET", "https://example.com"),
)
await _check_redirect(response, RedirectPolicy.ENFORCE_HTTPS)


# --- ALLOW_ALL tests ---


async def test_allow_all_permits_https_to_http():
"""Test ALLOW_ALL permits HTTPS->HTTP redirect."""
response = httpx.Response(
302,
headers={"Location": "http://evil.com"},
request=httpx.Request("GET", "https://example.com"),
)
await _check_redirect(response, RedirectPolicy.ALLOW_ALL)


# --- Integration tests (exercise the event hook wiring end-to-end) ---


async def test_redirect_hook_blocks_scheme_downgrade_via_transport():
"""Test that the event hook installed by create_mcp_http_client blocks HTTPS->HTTP."""

def mock_handler(request: httpx.Request) -> httpx.Response:
if str(request.url) == "https://example.com/start":
return httpx.Response(302, headers={"Location": "http://evil.com/stolen"})
return httpx.Response(200, text="OK") # pragma: no cover

async with create_mcp_http_client() as client:
client._transport = httpx.MockTransport(mock_handler)

with pytest.raises(httpx.HTTPStatusError, match="HTTPS-to-HTTP redirect blocked"):
await client.get("https://example.com/start")


async def test_redirect_hook_allows_safe_redirect_via_transport():
"""Test that the event hook allows HTTPS->HTTPS redirects through the client."""

def mock_handler(request: httpx.Request) -> httpx.Response:
if str(request.url) == "https://example.com/start":
return httpx.Response(302, headers={"Location": "https://example.com/final"})
return httpx.Response(200, text="OK")

async with create_mcp_http_client() as client:
client._transport = httpx.MockTransport(mock_handler)

response = await client.get("https://example.com/start")
assert response.status_code == 200
Loading