
Commit 1a2c152

implement a Kafka loader
1 parent 7c66375 commit 1a2c152

File tree

9 files changed, +3156 -17 lines


.test.env

Whitespace-only changes.

notebooks/kafka_streaming.py

Lines changed: 122 additions & 0 deletions
@@ -0,0 +1,122 @@
import marimo

__generated_with = '0.17.0'
app = marimo.App(width='medium')


@app.cell
def _():
    import marimo as mo

    from amp.client import Client

    return Client, mo


@app.cell(hide_code=True)
def _(mo):
    mo.md(
        r"""
    # Kafka Streaming Example

    This notebook demonstrates continuous streaming from Flight SQL to Kafka with reorg detection.
    """
    )
    return


@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""## Setup""")
    return


@app.cell
def _(Client):
    client = Client('grpc://127.0.0.1:1602')
    return (client,)


@app.cell
def _(client):
    client.configure_connection(
        'my_kafka',
        'kafka',
        {'bootstrap_servers': 'localhost:9092', 'client_id': 'amp-streaming-client', 'key_field': 'block_num'},
    )
    return


@app.cell(hide_code=True)
def _(mo):
    mo.md(
        r"""
    ## Streaming Query

    This query uses `SETTINGS stream = true` to continuously stream new blocks as they arrive.
    The loader will automatically handle blockchain reorganizations.
    """
    )
    return


@app.cell
def _(client):
    streaming_results = client.sql(
        """
        SELECT
            block_num,
            log_index
        FROM anvil.logs
        """
    ).load(
        'my_kafka',
        'eth_logs_stream',
        stream=True,
        create_table=True,
    )
    return (streaming_results,)


@app.cell(hide_code=True)
def _(mo):
    mo.md(
        r"""
    ## Monitor Stream

    This cell will continuously print results as they arrive. It starts a Kafka consumer to print
    the results as they come in.
    """
    )
    return


@app.cell
def _(streaming_results):
    import json
    import threading

    from kafka import KafkaConsumer

    def consume_kafka():
        consumer = KafkaConsumer(
            'eth_logs_stream',
            bootstrap_servers='localhost:9092',
            auto_offset_reset='latest',
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
        )
        print('Kafka Consumer started')
        for message in consumer:
            print(f'Consumed: {message.value}')

    consumer_thread = threading.Thread(target=consume_kafka, daemon=True)
    consumer_thread.start()

    print('Kafka Producer started')
    for result in streaming_results:
        print(f'Produced: {result}')
    return


if __name__ == '__main__':
    app.run()
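
The connection above sets 'key_field': 'block_num', which suggests the loader keys each Kafka record by block number. A minimal sketch for inspecting those keys on the streaming topic with kafka-python (hypothetical, not part of this commit; it assumes record keys are written as UTF-8 strings):

import json

from kafka import KafkaConsumer

# Read records from the streaming topic and print the key next to payload fields.
consumer = KafkaConsumer(
    'eth_logs_stream',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    consumer_timeout_ms=5000,  # stop iterating after 5 seconds of inactivity
    key_deserializer=lambda k: k.decode('utf-8') if k else None,
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
)
for message in consumer:
    # If the key_field assumption holds, message.key should match the payload's block_num.
    print(message.key, message.value.get('block_num'), message.value.get('log_index'))
consumer.close()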

notebooks/test_loaders.py

Lines changed: 75 additions & 8 deletions
@@ -1,6 +1,6 @@
 import marimo
 
-__generated_with = "0.14.16"
+__generated_with = "0.17.0"
 app = marimo.App(width="full")
 
 
@@ -89,15 +89,15 @@ def _(mo):
         create_table=True,
     )
     """,
-    name='_',
+    name="_"
 )
 
 
 @app.cell
 def _(psql_load_results):
     for p_result in psql_load_results:
         print(p_result)
-    return (p_result,)
+    return
 
 
 @app.cell(hide_code=True)
@@ -120,7 +120,7 @@ def _(client):
 def _(redis_load_results):
     for r_result in redis_load_results:
         print(r_result)
-    return (r_result,)
+    return
 
 
 @app.cell(hide_code=True)
@@ -149,7 +149,7 @@ def _(client):
     else:
         # Single result
        print(f'Total: {result.rows_loaded} rows')
-    return batch_result, result
+    return (batch_result,)
 
 
 @app.cell
@@ -291,7 +291,7 @@ def _(lmdb_load_result):
 def _(batch_result, lmdb_load_result):
     for lmdb_batch_result in lmdb_load_result:
         print(f'Batch: {batch_result.rows_loaded} rows')
-    return (lmdb_batch_result,)
+    return
 
 
 @app.cell
@@ -325,7 +325,7 @@ def _(env):
     myList = [ key for key, _ in txn.cursor() ]
     print(myList)
     print(len(myList))
-    return myList, txn
+    return
 
 
 @app.cell
@@ -340,7 +340,74 @@ def _(env, pa):
     batch = reader.read_next_batch()
 
     print(batch)
-    return batch, key, open_txn, reader, value
+    return
+
+
+@app.cell(hide_code=True)
+def _(mo):
+    mo.md(r"""# Kafka""")
+    return
+
+
+@app.cell
+def _(client):
+    # Configure Kafka connection
+    client.configure_connection(
+        'my_kafka',
+        'kafka',
+        {
+            'bootstrap_servers': 'localhost:9092',
+            'client_id': 'amp-test-client',
+            'key_field': 'id'
+        }
+    )
+    return
+
+
+@app.cell
+def _(client):
+    # Load data to Kafka topic
+    kafka_load_results = client.sql('select * from eth_firehose.logs limit 100').load(
+        'my_kafka',
+        'test_logs',
+        create_table=True,
+    )
+    return (kafka_load_results,)
+
+
+@app.cell
+def _(kafka_load_results):
+    # Check results
+    for k_result in kafka_load_results:
+        print(f'Kafka batch: {k_result.rows_loaded} rows loaded, duration: {k_result.duration:.2f}s')
+    return (k_result,)
+
+
+@app.cell
+def _():
+    from kafka import KafkaConsumer
+    import json
+
+    consumer = KafkaConsumer(
+        'test_logs',
+        bootstrap_servers='localhost:9092',
+        auto_offset_reset='earliest',
+        consumer_timeout_ms=3000,
+        value_deserializer=lambda m: json.loads(m.decode('utf-8'))
+    )
+
+    messages = list(consumer)
+    consumer.close()
+
+    print(f"Total messages in Kafka: {len(messages)}")
+    print(f"\nFirst message:")
+    if messages:
+        msg = messages[0].value
+        print(f"  Block: {msg.get('block_num')}")
+        print(f"  Timestamp: {msg.get('timestamp')}")
+        print(f"  Address: {msg.get('address')}")
+
+    return
 
 
 @app.cell
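
The final cell of the diff above verifies delivery by draining test_logs with a 3-second consumer timeout. An alternative sketch (hypothetical, not from this commit) that counts records via partition offsets instead of deserializing every message:

from kafka import KafkaConsumer, TopicPartition

# Compare beginning and end offsets per partition to count records on the topic.
consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
partitions = [TopicPartition('test_logs', p) for p in consumer.partitions_for_topic('test_logs') or []]
start = consumer.beginning_offsets(partitions)
end = consumer.end_offsets(partitions)
consumer.close()

# The notebook loads `select * from eth_firehose.logs limit 100`, so a fresh topic should hold 100 records.
print(f'Records on test_logs: {sum(end[tp] - start[tp] for tp in partitions)}')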

pyproject.toml

Lines changed: 8 additions & 3 deletions
@@ -62,15 +62,20 @@ lmdb = [
     "lmdb>=1.4.0",
 ]
 
+kafka = [
+    "kafka-python>=2.2.15",
+]
+
 all_loaders = [
-    "psycopg2-binary>=2.9.0",  # PostgreSQL
-    "redis>=4.5.0",  # Redis
-    "deltalake>=1.0.2",  # Delta Lake (consistent version)
+    "psycopg2-binary>=2.9.0",             # PostgreSQL
+    "redis>=4.5.0",                       # Redis
+    "deltalake>=1.0.2",                   # Delta Lake (consistent version)
     "pyiceberg[sql-sqlite]>=0.10.0",      # Apache Iceberg
     "pydantic>=2.0,<2.12",                # PyIceberg 0.10.0 compatibility
     "snowflake-connector-python>=4.0.0",  # Snowflake
     "snowpipe-streaming>=1.0.0",          # Snowpipe Streaming API
     "lmdb>=1.4.0",                        # LMDB
+    "kafka-python>=2.2.15",
 ]
 
 test = [
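
The new kafka dependency group pins kafka-python>=2.2.15. A quick sketch (assuming the distribution name is kafka-python, as listed in the extra above) to confirm the dependency is present before running the notebooks:

from importlib.metadata import PackageNotFoundError, version

# kafka-python is optional; the Kafka loader is skipped at import time when it is missing.
try:
    print('kafka-python', version('kafka-python'))
except PackageNotFoundError:
    print('kafka-python not installed; the kafka extra (kafka-python>=2.2.15) is required for the Kafka loader')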

src/amp/loaders/implementations/__init__.py

Lines changed: 7 additions & 6 deletions
@@ -21,7 +21,7 @@
 
 try:
     from .iceberg_loader import IcebergLoader
-except ImportError:
+except Exception:
     IcebergLoader = None
 
 try:
@@ -34,11 +34,10 @@
 except ImportError:
     LMDBLoader = None
 
-# Add any other loaders here
-# try:
-#     from .snowflake_loader import SnowflakeLoader
-# except ImportError:
-#     SnowflakeLoader = None
+try:
+    from .kafka_loader import KafkaLoader
+except ImportError:
+    KafkaLoader = None
 
 __all__ = []
 
@@ -55,3 +54,5 @@
     __all__.append('SnowflakeLoader')
 if LMDBLoader:
     __all__.append('LMDBLoader')
+if KafkaLoader:
+    __all__.append('KafkaLoader')
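
Because every loader import is wrapped in try/except, __all__ ends up listing only the loaders whose optional dependencies imported cleanly. A minimal sketch (assuming the amp package is importable, as in the notebooks above) to check which loaders, including the new KafkaLoader, are available at runtime:

from amp.loaders import implementations

# __all__ is built conditionally above, so it reflects the extras installed in this environment.
print(sorted(implementations.__all__))

if 'KafkaLoader' in implementations.__all__:
    print('kafka-python is installed; KafkaLoader can be used')
else:
    print('KafkaLoader unavailable; install the kafka extra (kafka-python>=2.2.15)')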

0 commit comments
