Skip to content
4 changes: 3 additions & 1 deletion docs/input.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ Some model APIs do not support file URLs at all or for specific file types. In t
- [`OpenAIResponsesModel`][pydantic_ai.models.openai.OpenAIResponsesModel]: All URLs
- [`AnthropicModel`][pydantic_ai.models.anthropic.AnthropicModel]: `DocumentUrl` with media type `text/plain`
- [`GoogleModel`][pydantic_ai.models.google.GoogleModel] using GLA (Gemini Developer API): All URLs except YouTube video URLs and files uploaded to the [Files API](https://ai.google.dev/gemini-api/docs/files).
- [`BedrockConverseModel`][pydantic_ai.models.bedrock.BedrockConverseModel]: All URLs
- [`BedrockConverseModel`][pydantic_ai.models.bedrock.BedrockConverseModel]: All URLs except S3 URIs, i.e. URLs starting with `s3://`.

If the model API supports file URLs but may not be able to download a file because of crawling or access restrictions, you can instruct Pydantic AI to download the file content and send that instead of the URL by enabling the `force_download` flag on the URL object. For example, [`GoogleModel`][pydantic_ai.models.google.GoogleModel] on Vertex AI limits YouTube video URLs to one URL per request.

Expand All @@ -138,3 +138,5 @@ result = agent.run_sync(
)
print(result.output)
```

`BedrockConverseModel` supports `s3://<bucket-name>/<object-key>` URIs, provided that the assumed role has the `s3:GetObject` permission. If the bucket is not owned by the account making the request, the `bucketOwner` query parameter must be specified, for example: `s3://my-bucket/my-file.png?bucketOwner=123456789012`.
2 changes: 2 additions & 0 deletions pydantic_ai_slim/pydantic_ai/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1172,6 +1172,8 @@ async def download_item(
"""
if item.url.startswith('gs://'):
raise UserError('Downloading from protocol "gs://" is not supported.')
elif item.url.startswith('s3://'):
raise UserError('Downloading from protocol "s3://" is not supported.')
elif isinstance(item, VideoUrl) and item.is_youtube:
raise UserError('Downloading YouTube videos is not supported.')

Expand Down
22 changes: 17 additions & 5 deletions pydantic_ai_slim/pydantic_ai/models/bedrock.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from datetime import datetime
from itertools import count
from typing import TYPE_CHECKING, Any, Generic, Literal, cast, overload
from urllib.parse import parse_qs, urlparse

import anyio.to_thread
from botocore.exceptions import ClientError
Expand Down Expand Up @@ -62,13 +63,15 @@
ConverseStreamResponseTypeDef,
CountTokensRequestTypeDef,
DocumentBlockTypeDef,
DocumentSourceTypeDef,
GuardrailConfigurationTypeDef,
ImageBlockTypeDef,
InferenceConfigurationTypeDef,
MessageUnionTypeDef,
PerformanceConfigurationTypeDef,
PromptVariableValuesTypeDef,
ReasoningContentBlockOutputTypeDef,
S3LocationTypeDef,
SystemContentBlockTypeDef,
ToolChoiceTypeDef,
ToolConfigurationTypeDef,
Expand Down Expand Up @@ -733,20 +736,29 @@ async def _map_user_prompt( # noqa: C901
else:
raise NotImplementedError('Binary content is not supported yet.')
elif isinstance(item, ImageUrl | DocumentUrl | VideoUrl):
downloaded_item = await download_item(item, data_format='bytes', type_format='extension')
format = downloaded_item['data_type']
source: DocumentSourceTypeDef
if item.url.startswith('s3://'):
parsed = urlparse(item.url)
s3_location: S3LocationTypeDef = {'uri': f'{parsed.scheme}://{parsed.netloc}{parsed.path}'}
if bucket_owner := parse_qs(parsed.query).get('bucketOwner', [None])[0]:
s3_location['bucketOwner'] = bucket_owner
source = {'s3Location': s3_location}
else:
downloaded_item = await download_item(item, data_format='bytes', type_format='extension')
source = {'bytes': downloaded_item['data']}

if item.kind == 'image-url':
format = item.media_type.split('/')[1]
assert format in ('jpeg', 'png', 'gif', 'webp'), f'Unsupported image format: {format}'
image: ImageBlockTypeDef = {'format': format, 'source': {'bytes': downloaded_item['data']}}
image: ImageBlockTypeDef = {'format': format, 'source': source}
content.append({'image': image})

elif item.kind == 'document-url':
name = f'Document {next(document_count)}'
document: DocumentBlockTypeDef = {
'name': name,
'format': item.format,
'source': {'bytes': downloaded_item['data']},
'source': source,
}
content.append({'document': document})

Expand All @@ -763,7 +775,7 @@ async def _map_user_prompt( # noqa: C901
'wmv',
'three_gp',
), f'Unsupported video format: {format}'
video: VideoBlockTypeDef = {'format': format, 'source': {'bytes': downloaded_item['data']}}
video: VideoBlockTypeDef = {'format': format, 'source': source}
content.append({'video': video})
elif isinstance(item, AudioUrl): # pragma: no cover
raise NotImplementedError('Audio is not supported yet.')
Expand Down
122 changes: 122 additions & 0 deletions tests/models/test_bedrock.py
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,128 @@ async def test_text_document_url_input(allow_model_requests: None, bedrock_provi
)


async def test_s3_image_url_input(bedrock_provider: BedrockProvider):
    """An `s3://` image URL is mapped to an image block whose source is an `s3Location`."""
    s3_image = ImageUrl(url='s3://my-bucket/images/test-image.jpg', media_type='image/jpeg')
    prompt_part = UserPromptPart(content=['What is in this image?', s3_image])
    request_history = [ModelRequest(parts=[prompt_part])]

    bedrock_model = BedrockConverseModel('us.amazon.nova-pro-v1:0', provider=bedrock_provider)

    # Only the mapped message list (second element of the returned pair) is checked here.
    _, mapped = await bedrock_model._map_messages(request_history, ModelRequestParameters(), None)  # type: ignore[reportPrivateUsage]

    assert mapped == snapshot(
        [
            {
                'role': 'user',
                'content': [
                    {'text': 'What is in this image?'},
                    {
                        'image': {
                            'format': 'jpeg',
                            'source': {'s3Location': {'uri': 's3://my-bucket/images/test-image.jpg'}},
                        }
                    },
                ],
            }
        ]
    )


async def test_s3_video_url_input(bedrock_provider: BedrockProvider):
    """An `s3://` video URL is mapped to a video block whose source is an `s3Location`."""
    s3_video = VideoUrl(url='s3://my-bucket/videos/test-video.mp4', media_type='video/mp4')
    prompt_part = UserPromptPart(content=['Describe this video', s3_video])
    request_history = [ModelRequest(parts=[prompt_part])]

    bedrock_model = BedrockConverseModel('us.amazon.nova-pro-v1:0', provider=bedrock_provider)

    # Only the mapped message list (second element of the returned pair) is checked here.
    _, mapped = await bedrock_model._map_messages(request_history, ModelRequestParameters(), None)  # type: ignore[reportPrivateUsage]

    assert mapped == snapshot(
        [
            {
                'role': 'user',
                'content': [
                    {'text': 'Describe this video'},
                    {
                        'video': {
                            'format': 'mp4',
                            'source': {'s3Location': {'uri': 's3://my-bucket/videos/test-video.mp4'}},
                        }
                    },
                ],
            }
        ]
    )


async def test_s3_document_url_input(bedrock_provider: BedrockProvider):
    """An `s3://` document URL is mapped to a named document block whose source is an `s3Location`."""
    s3_document = DocumentUrl(url='s3://my-bucket/documents/test-doc.pdf', media_type='application/pdf')
    prompt_part = UserPromptPart(content=['What is the main content on this document?', s3_document])
    request_history = [ModelRequest(parts=[prompt_part])]

    bedrock_model = BedrockConverseModel('anthropic.claude-v2', provider=bedrock_provider)

    # Only the mapped message list (second element of the returned pair) is checked here.
    _, mapped = await bedrock_model._map_messages(request_history, ModelRequestParameters(), None)  # type: ignore[reportPrivateUsage]

    assert mapped == snapshot(
        [
            {
                'role': 'user',
                'content': [
                    {'text': 'What is the main content on this document?'},
                    {
                        'document': {
                            'format': 'pdf',
                            'name': 'Document 1',
                            'source': {'s3Location': {'uri': 's3://my-bucket/documents/test-doc.pdf'}},
                        }
                    },
                ],
            }
        ]
    )


async def test_s3_url_with_bucket_owner(bedrock_provider: BedrockProvider):
    """A `bucketOwner` query parameter on an `s3://` URL ends up as `s3Location.bucketOwner`.

    The expected `uri` in the mapped message no longer carries the query string.
    """
    s3_image = ImageUrl(
        url='s3://my-bucket/images/test-image.jpg?bucketOwner=123456789012',
        media_type='image/jpeg',
    )
    prompt_part = UserPromptPart(content=['What is in this image?', s3_image])
    request_history = [ModelRequest(parts=[prompt_part])]

    bedrock_model = BedrockConverseModel('us.amazon.nova-pro-v1:0', provider=bedrock_provider)

    # Only the mapped message list (second element of the returned pair) is checked here.
    _, mapped = await bedrock_model._map_messages(request_history, ModelRequestParameters(), None)  # type: ignore[reportPrivateUsage]

    assert mapped == snapshot(
        [
            {
                'role': 'user',
                'content': [
                    {'text': 'What is in this image?'},
                    {
                        'image': {
                            'format': 'jpeg',
                            'source': {
                                's3Location': {
                                    'uri': 's3://my-bucket/images/test-image.jpg',
                                    'bucketOwner': '123456789012',
                                }
                            },
                        }
                    },
                ],
            }
        ]
    )


@pytest.mark.vcr()
async def test_text_as_binary_content_input(allow_model_requests: None, bedrock_provider: BedrockProvider):
m = BedrockConverseModel('us.amazon.nova-pro-v1:0', provider=bedrock_provider)
Expand Down
16 changes: 16 additions & 0 deletions tests/models/test_download_item.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,22 @@ async def test_download_item_raises_user_error_with_gs_uri(
_ = await download_item(url, data_format='bytes')


@pytest.mark.parametrize(
    'url',
    [
        pytest.param(AudioUrl(url='s3://my-bucket/audio.wav')),
        pytest.param(DocumentUrl(url='s3://my-bucket/document.pdf')),
        pytest.param(ImageUrl(url='s3://my-bucket/image.png')),
        pytest.param(VideoUrl(url='s3://my-bucket/video.mp4')),
    ],
)
async def test_download_item_raises_user_error_with_s3_uri(
    url: AudioUrl | DocumentUrl | ImageUrl | VideoUrl,
) -> None:
    """`download_item` raises `UserError` for every URL type when given an `s3://` URI."""
    expected_message = 'Downloading from protocol "s3://" is not supported.'
    with pytest.raises(UserError, match=expected_message):
        _ = await download_item(url, data_format='bytes')


async def test_download_item_raises_user_error_with_youtube_url() -> None:
with pytest.raises(UserError, match='Downloading YouTube videos is not supported.'):
_ = await download_item(VideoUrl(url='https://youtu.be/lCdaVNyHtjU'), data_format='bytes')
Expand Down