106 changes: 98 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
@@ -7,14 +7,17 @@

## Overview

-Python client for Amp - a high-performance data infrastructure for blockchain data.
+Python client for Amp - a database for blockchain data.

**Features:**
- **Query Client**: Issue Flight SQL queries to Amp servers
- **Admin Client**: Manage datasets, deployments, and jobs programmatically
- **Registry Client**: Discover, search, and publish datasets to the Registry
- **Dataset Inspection**: Explore dataset schemas with `inspect()` and `describe()` methods
- **Data Loaders**: Zero-copy loading into PostgreSQL, Redis, Snowflake, Delta Lake, Iceberg, and more
- **Parallel Streaming**: High-throughput parallel data ingestion with automatic resume
- **Manifest Generation**: Fluent API for creating and deploying datasets from SQL queries
- **Auto-Refreshing Auth**: Seamless authentication with automatic token refresh

## Dependencies
1. Rust
@@ -45,7 +48,7 @@ from amp import Client
client = Client(url="grpc://localhost:8815")

# Execute query and convert to pandas
-df = client.query("SELECT * FROM eth.blocks LIMIT 10").to_pandas()
+df = client.sql("SELECT * FROM eth.blocks LIMIT 10").to_arrow().to_pandas()
print(df)
```

@@ -63,7 +66,7 @@ client = Client(

# Register and deploy a dataset
job = (
-    client.query("SELECT block_num, hash FROM eth.blocks")
+    client.sql("SELECT block_num, hash FROM eth.blocks")
    .with_dependency('eth', '_/[email protected]')
    .register_as('_', 'my_dataset', '1.0.0', 'blocks', 'mainnet')
    .deploy(parallelism=4, end_block='latest', wait=True)
@@ -76,12 +79,97 @@ print(f"Deployment completed: {job.status}")

```python
# Load query results into PostgreSQL
-loader = client.query("SELECT * FROM eth.blocks").load(
-    loader_type='postgresql',
+result = client.sql("SELECT * FROM eth.blocks").load(
    connection='my_pg_connection',
-    table_name='eth_blocks'
+    destination='eth_blocks'
)
-print(f"Loaded {loader.rows_written} rows")
+print(f"Loaded {result.rows_loaded} rows")
```

### Authentication

The client supports three authentication methods (in priority order):

```python
from amp import Client

# 1. Explicit token (highest priority)
client = Client(
    url="grpc://localhost:8815",
    auth_token="your-token"
)

# 2. Environment variable
# export AMP_AUTH_TOKEN="your-token"
client = Client(url="grpc://localhost:8815")

# 3. Shared auth file (auto-refresh, recommended)
# Uses ~/.amp/cache/amp_cli_auth (shared with TypeScript CLI)
client = Client(
    url="grpc://localhost:8815",
    auth=True  # Automatically refreshes expired tokens
)
```
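The priority order can be modeled as a small resolution function. This is a hypothetical sketch of the documented lookup order, not the client's actual code; `auth_file_token` stands in for whatever token the shared `~/.amp/cache/amp_cli_auth` file holds:

```python
import os

def resolve_auth_token(explicit_token=None, auth=False, auth_file_token=None):
    """Sketch of the documented priority: explicit token > env var > shared auth file."""
    if explicit_token:  # 1. Explicit token (highest priority)
        return explicit_token
    env_token = os.environ.get("AMP_AUTH_TOKEN")
    if env_token:  # 2. AMP_AUTH_TOKEN environment variable
        return env_token
    if auth and auth_file_token:  # 3. Shared auth file, used when auth=True
        return auth_file_token
    return None
```

One practical consequence of this order: a stale `AMP_AUTH_TOKEN` left in the environment shadows the auto-refreshing auth file, which is worth checking when authentication fails unexpectedly.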

### Registry - Discovering Datasets

```python
from amp import Client

# Connect with registry support
client = Client(
    query_url="grpc://localhost:8815",
    registry_url="https://api.registry.amp.staging.thegraph.com",
    auth=True
)

# Search for datasets
results = client.registry.datasets.search('ethereum blocks')
for dataset in results.datasets[:5]:
    print(f"{dataset.namespace}/{dataset.name} - {dataset.description}")

# Get dataset details
dataset = client.registry.datasets.get('edgeandnode', 'ethereum-mainnet')
print(f"Latest version: {dataset.latest_version}")

# Inspect dataset schema
client.registry.datasets.inspect('edgeandnode', 'ethereum-mainnet')
```

### Dataset Inspection

Explore dataset schemas before querying:

```python
from amp.registry import RegistryClient

client = RegistryClient()

# Pretty-print dataset structure (interactive)
client.datasets.inspect('edgeandnode', 'ethereum-mainnet')
# Output:
# Dataset: edgeandnode/ethereum-mainnet@latest
#
#   blocks (21 columns)
#     block_num    UInt64                   NOT NULL
#     timestamp    Timestamp(Nanosecond)    NOT NULL
#     hash         FixedSizeBinary(32)      NOT NULL
#     ...

# Get structured schema data (programmatic)
schema = client.datasets.describe('edgeandnode', 'ethereum-mainnet')

# Find tables with specific columns
for table_name, columns in schema.items():
    col_names = [col['name'] for col in columns]
    if 'block_num' in col_names:
        print(f"Table '{table_name}' has block_num column")

# Find all address columns (20-byte binary)
for table_name, columns in schema.items():
    addresses = [col['name'] for col in columns if col['type'] == 'FixedSizeBinary(20)']
    if addresses:
        print(f"{table_name}: {', '.join(addresses)}")
```
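Because `describe()` returns plain Python data (a dict mapping each table name to a list of column dicts with `name` and `type` keys), ad-hoc schema reports are easy to build. A small sketch, using a hand-written sample in place of a live `describe()` call — the table and column names here are illustrative, not real schema data:

```python
# Sample shaped like the describe() output above; contents are illustrative.
sample_schema = {
    'blocks': [
        {'name': 'block_num', 'type': 'UInt64'},
        {'name': 'hash', 'type': 'FixedSizeBinary(32)'},
        {'name': 'miner', 'type': 'FixedSizeBinary(20)'},
    ],
    'transactions': [
        {'name': 'block_num', 'type': 'UInt64'},
        {'name': 'from_address', 'type': 'FixedSizeBinary(20)'},
        {'name': 'to_address', 'type': 'FixedSizeBinary(20)'},
    ],
}

def columns_of_type(schema, type_name):
    """Map each table to its column names of the given Arrow type."""
    return {
        table: [col['name'] for col in cols if col['type'] == type_name]
        for table, cols in schema.items()
        if any(col['type'] == type_name for col in cols)
    }

print(columns_of_type(sample_schema, 'FixedSizeBinary(20)'))
# {'blocks': ['miner'], 'transactions': ['from_address', 'to_address']}
```

The same helper works unchanged on a real `describe()` result, since it only relies on the documented dict shape.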

## Usage
@@ -108,7 +196,9 @@ uv run apps/execute_query.py

### Getting Started
- **[Admin Client Guide](docs/admin_client_guide.md)** - Complete guide for dataset management and deployment
-- **[Admin API Reference](docs/api/admin_api.md)** - Full API documentation for admin operations
+- **[Registry Guide](docs/registry-guide.md)** - Discover and search datasets in the Registry
+- **[Dataset Inspection](docs/inspecting_datasets.md)** - Explore dataset schemas with `inspect()` and `describe()`
+- **[Admin API Reference](docs/api/client_api.md)** - Full API documentation for admin operations

### Features
- **[Parallel Streaming Usage Guide](docs/parallel_streaming_usage.md)** - User guide for high-throughput parallel data loading
19 changes: 9 additions & 10 deletions docs/admin_client_guide.md
@@ -140,7 +140,7 @@ client = Client(
)

# Query operations (Flight SQL)
-df = client.query("SELECT * FROM eth.blocks LIMIT 10").to_pandas()
+df = client.sql("SELECT * FROM eth.blocks LIMIT 10").to_pandas()

# Admin operations (HTTP API)
datasets = client.datasets.list_all()
@@ -170,7 +170,7 @@ The legacy `url` parameter still works for Flight SQL:
```python
# This still works
client = Client(url="grpc://localhost:8815")
-client.query("SELECT * FROM eth.blocks")
+client.sql("SELECT * FROM eth.blocks")
```

### Environment Variables
@@ -376,7 +376,7 @@ The QueryBuilder provides a fluent API for generating manifests from SQL queries

```python
# Build a query
-query = client.query("SELECT block_num, hash FROM eth.blocks")
+query = client.sql("SELECT block_num, hash FROM eth.blocks")

# Add dependencies
query = query.with_dependency('eth', '_/[email protected]')
@@ -409,7 +409,7 @@ The most powerful pattern combines query building, manifest generation, registra
```python
# Build, register, and deploy in one chain
job = (
-    client.query("SELECT block_num, hash FROM eth.blocks")
+    client.sql("SELECT block_num, hash FROM eth.blocks")
    .with_dependency('eth', '_/[email protected]')
    .register_as(
        namespace='_',
@@ -432,7 +432,7 @@ print(f"Deployment completed: {job.status}")

```python
manifest = (
-    client.query("""
+    client.sql("""
        SELECT
            t.token_address,
            t.amount,
@@ -453,8 +453,7 @@ manifest = (

```python
# 1. Develop query locally
-# REVIEW: IS THIS CORRECT??
-query = client.query("""
+query = client.sql("""
    SELECT
        block_num,
        COUNT(*) as tx_count
@@ -506,7 +505,7 @@ if job.status == 'Completed':
```python
# Register production version
context = (
-    client.query("SELECT * FROM processed_data")
+    client.sql("SELECT * FROM processed_data")
    .with_dependency('raw', '_/[email protected]')
    .register_as('_', 'processed_data', '2.0.0', 'data', 'mainnet')
)
@@ -691,7 +690,7 @@ thread.start()
```python
# Always specify full dependency references
query = (
-    client.query("SELECT * FROM base.data")
+    client.sql("SELECT * FROM base.data")
    .with_dependency('base', '_/[email protected]')  # Include version!
)

@@ -700,6 +699,6 @@

## Next Steps

-- See [API Reference](api/admin_api.md) for complete API documentation
+- See [API Reference](api/client_api.md) for complete API documentation
- Check [examples/admin/](../examples/admin/) for more code samples
- Review the [Admin API OpenAPI spec](../specs/admin.spec.json) for endpoint details
16 changes: 8 additions & 8 deletions docs/api/admin_api.md → docs/api/client_api.md
@@ -1,4 +1,4 @@
-# Admin API Reference
+# Client API Reference

Complete API reference for the Amp Admin Client.

@@ -90,7 +90,7 @@ Access the SchemaClient for schema operations.

#### Methods

-##### `query(sql: str) -> QueryBuilder`
+##### `sql(sql: str) -> QueryBuilder`

Create a QueryBuilder for the given SQL query.

@@ -103,7 +103,7 @@ Create a QueryBuilder for the given SQL query.
**Example:**

```python
-qb = client.query("SELECT * FROM eth.blocks LIMIT 10")
+qb = client.sql("SELECT * FROM eth.blocks LIMIT 10")
df = qb.to_pandas()
```

@@ -832,7 +832,7 @@ with_dependency(alias: str, reference: str) -> QueryBuilder

```python
qb = (
-    client.query("SELECT * FROM eth.blocks")
+    client.sql("SELECT * FROM eth.blocks")
    .with_dependency('eth', '_/[email protected]')
)
```
@@ -856,7 +856,7 @@ to_manifest(table_name: str, network: str = 'mainnet') -> dict

```python
manifest = (
-    client.query("SELECT * FROM eth.blocks")
+    client.sql("SELECT * FROM eth.blocks")
    .with_dependency('eth', '_/[email protected]')
    .to_manifest('blocks', 'mainnet')
)
@@ -890,7 +890,7 @@ register_as(

```python
job = (
-    client.query("SELECT * FROM eth.blocks")
+    client.sql("SELECT * FROM eth.blocks")
    .with_dependency('eth', '_/[email protected]')
    .register_as('_', 'my_dataset', '1.0.0', 'blocks')
    .deploy(parallelism=4, wait=True)
@@ -937,7 +937,7 @@ deploy(

```python
# Deploy and return immediately
-context = client.query(...).register_as(...)
+context = client.sql(...).register_as(...)
job = context.deploy(parallelism=4)
print(f"Started job {job.id}")

@@ -965,7 +965,7 @@ client = Client(

try:
    # Build and test query
-    query = client.query("""
+    query = client.sql("""
        SELECT block_num, hash, timestamp
        FROM eth.blocks
        WHERE block_num > 1000000
4 changes: 2 additions & 2 deletions docs/inspecting_datasets.md
@@ -183,10 +183,10 @@ for ds in results.datasets[:5]:

# Step 2: Inspect a dataset
print("\nInspecting dataset structure:")
-registry.datasets.inspect('graphops', 'ethereum-mainnet')
+registry.datasets.inspect('edgeandnode', 'ethereum-mainnet')

# Step 3: Get schema programmatically
-schema = registry.datasets.describe('graphops', 'ethereum-mainnet')
+schema = registry.datasets.describe('edgeandnode', 'ethereum-mainnet')

# Step 4: Query based on discovered schema
client = Client(query_url='grpc://your-server:1602', auth=True)
20 changes: 10 additions & 10 deletions docs/registry-guide.md
@@ -29,7 +29,7 @@ client = Client(
    query_url='grpc://localhost:1602',  # Flight SQL queries
    admin_url='http://localhost:8080',  # Admin operations
    registry_url='https://api.registry.amp.staging.thegraph.com',  # Registry (default)
-    auth=True  # Use ~/.amp-cli-config for authentication
+    auth=True  # Use ~/.amp/cache/amp_cli_auth for authentication
)

# Search registry
@@ -43,7 +43,7 @@ manifest = client.registry.datasets.get_manifest(
    dataset.latest_version.version_tag
)

-client.admin.datasets.register(
+client.datasets.register(
    namespace=dataset.namespace,
    name=dataset.name,
    revision=dataset.latest_version.version_tag,
@@ -165,7 +165,7 @@ print(f'Dependencies: {list(manifest.get("dependencies", {}).keys())}')
Publishing requires authentication. Set up your auth token:

```python
-# Option 1: Use existing auth from ~/.amp-cli-config
+# Option 1: Use existing auth from ~/.amp/cache/amp_cli_auth
from amp import Client
client = Client(auth=True)

@@ -328,21 +328,21 @@ manifest = client.registry.datasets.get_manifest(

# 4. Deploy dependency to local node
print(f'Deploying {dataset.namespace}/{dataset.name}...')
-client.admin.datasets.register(
+client.datasets.register(
    namespace=dataset.namespace,
    name=dataset.name,
    revision=full_dataset.latest_version.version_tag,
    manifest=manifest
)

-deploy_response = client.admin.datasets.deploy(
+deploy_response = client.datasets.deploy(
    dataset.namespace,
    dataset.name,
    full_dataset.latest_version.version_tag
)

# Wait for deployment
-client.admin.jobs.wait_for_completion(deploy_response.job_id)
+client.jobs.wait_for_completion(deploy_response.job_id)
print('Dependency deployed!')

# 5. Create derived dataset
@@ -371,15 +371,15 @@ derived_manifest = {
}

# 6. Deploy derived dataset
-client.admin.datasets.register(
+client.datasets.register(
    namespace='_',
    name='my_sample',
    revision='1.0.0',
    manifest=derived_manifest
)

-deploy_response = client.admin.datasets.deploy('_', 'my_sample', '1.0.0')
-client.admin.jobs.wait_for_completion(deploy_response.job_id)
+deploy_response = client.datasets.deploy('_', 'my_sample', '1.0.0')
+client.jobs.wait_for_completion(deploy_response.job_id)
print('Derived dataset deployed!')

# 7. Query the data
@@ -480,6 +480,6 @@ registry = RegistryClient(

The Registry client uses the same authentication as the Admin API:

-1. Interactive login: `~/.amp-cli-config`
+1. Interactive login: `~/.amp/cache/amp_cli_auth`
2. Direct token: Pass `auth_token='your-token'`
3. Unified client: Set `auth=True` to use saved credentials