feat(audit): add UserOp archiver to connect Kafka reader to S3 storage#117
feat(audit): add UserOp archiver to connect Kafka reader to S3 storage#117DeStefaniAndrei wants to merge 6 commits intomasterfrom
Conversation
- Add KafkaUserOpAuditArchiver struct - Implements read → write → commit loop for UserOp events - Add integration test (ignored due to MinIO etag limitation)
crates/audit/src/archiver.rs
Outdated
| .unwrap_or_default() | ||
| .as_millis() as i64; | ||
| let event_age_ms = now_ms.saturating_sub(event.timestamp); | ||
| self.metrics.event_age.record(event_age_ms as f64); |
There was a problem hiding this comment.
WDYT of having the audit metrics separated by tags?
Similar to how we record metrics for RPC calls here: https://github.com/base/tips/blob/master/crates/ingress-rpc/src/metrics.rs#L5-L8 , and instead we can call it type (or a better name 😅)
There was a problem hiding this comment.
Hope you had a great Christmas William. I added a type tag ("bundle" / "userop")
|
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions. |
wlawt
left a comment
There was a problem hiding this comment.
re: our convo from Friday
also, i'm not sure if there's a scenario where you might need to deduplicate user_op hashes because of spam, but this was something i also added: https://github.com/base/tips/blob/master/crates/ingress-rpc/src/service.rs#L115-L118
|
|
||
| let writer = self.writer.clone(); | ||
| increment_in_flight_archive_tasks(EventType::UserOp); | ||
| tokio::spawn(async move { |
There was a problem hiding this comment.
this is one of the changes i needed to make for the KafkaAuditArchiver https://github.com/base/tips/blob/master/crates/audit/src/archiver.rs#L143 to avoid spawning unbounded number of tokio routines
|
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions. |
Generic struct over
UserOpEventReaderandUserOpEventWritertraitsRuns an infinite loop that:
End-to-End Integration Tests (
integration_tests.rs)Added 7 end-to-end tests using Docker (Kafka + MinIO) covering single events, multiple events, deduplication, all event types, and full lifecycle.