Conversation
Define the directory layout, metadata.json schema, and per-entity Parquet file format for dumping and restoring subgraph data.
Add `OidValue::Int4Range(Bound<i32>, Bound<i32>)` variant to properly deserialize PostgreSQL int4range columns (OID 3904). Update `select_cols` in dsl.rs to bind block_range as `Range<Integer>` instead of using the Bytes placeholder, resolving the existing TODO.
Add block$ (immutable), block_range (mutable), and causality_region to the column lookup so callers that need these columns by name can find them. Also add CAUSALITY_REGION_COL pseudo-column static and tighten with_nsp visibility to pub(crate).
Create the foundation for Parquet-based dump/restore: a schema mapping module that converts relational Table definitions to Arrow Schema objects. - arrow_schema() maps Table -> Arrow Schema with system columns (vid, block tracking, causality_region) followed by data columns - data_sources_arrow_schema() provides fixed schema for data_sources$ table - All ColumnType variants mapped (TSVector skipped, Enum -> Utf8) - List columns wrapped in Arrow List type based on Column.is_list()
Implement rows_to_record_batch() which converts Vec<DynamicRow<OidValue>> into Arrow RecordBatch objects. Uses a type-erased ColumnBuilder enum that dispatches on Arrow DataType to create the appropriate array builder. Supports all OidValue scalar variants (Bool, Int, Int8, Bytes, String, BigDecimal, Timestamp) and all array variants (BoolArray, Ints, Int8Array, BytesArray, StringArray, BigDecimalArray, TimestampArray). Block range columns arrive as separate Int32 values from the dump query, keeping the converter as a clean 1:1 mapping.
Add ParquetChunkWriter that wraps Arrow's ArrowWriter to stream RecordBatches into a ZSTD-compressed Parquet file while tracking row count and vid range. Returns ChunkInfo metadata on finish for inclusion in metadata.json.
Wire dump queries and Parquet writing into Layout::dump(). This is the core of the dump feature, covering both entity tables and the special data_sources$ table. For entity tables: build a DynamicSelectClause that selects vid, block columns (split into lower/upper for mutable tables), causality_region, and data columns. Use VidBatcher for adaptive batching, convert rows to Arrow RecordBatch, and write via ParquetChunkWriter. For data_sources$: use a concrete QueryableByName struct with raw SQL (fixed schema, no DynamicSelectClause needed). Check table existence via catalog::table_exists before attempting dump. Metadata includes version, network, block pointers, entity count, graft info, health, indexes, and per-table chunk tracking. Written atomically via tmp+rename so its presence signals a complete dump. Add entity_count() helper in detail.rs. Wire dump() through DeploymentStore and SubgraphStore.
Wire the dump subcommand into graphman CLI. Takes a deployment search argument and an output directory, resolves the deployment, and delegates to SubgraphStore::dump().
Add `parquet/reader.rs` with `read_batches()` to read Parquet files back into Arrow RecordBatches. This is the foundation for the restore pipeline (step 1 of the restore plan).
Add `RestoreRow`, `DataSourceRestoreRow` structs and conversion functions `record_batch_to_restore_rows()` and `record_batch_to_data_source_rows()` that convert Arrow RecordBatches back into typed data suitable for database insertion. This is step 2 of the restore plan.
Add a new constructor to InsertQuery that accepts RestoreRow data from Parquet files, bypassing the WriteChunk/EntityWrite pipeline. This is the insertion foundation for the restore path. Also add From<i32> impl for CausalityRegion to allow constructing values from deserialized Parquet data.
Add a method to read and validate metadata.json from a dump directory. Make fields of Metadata, Manifest, BlockPtr, Health, and Error structs pub(crate) so they are accessible from the restore module.
Add import_data() to the restore module that reads Parquet chunks and inserts entity data into PostgreSQL tables. Supports resumability by checking max(vid) in each table and skipping already-imported rows. Entity tables use InsertQuery::for_restore() for efficient batch inserts. The data_sources$ table uses raw SQL with bind parameters since it has a fixed schema outside the Layout.
Wire up the restore pipeline through SubgraphStore::restore() and DeploymentStore::restore(). Uses plan_restore() to determine whether to create or replace the deployment site, validates the target shard exists, resolves the subgraph name for deployment rule matching, and assigns the restored deployment to a node. Changes: - DeploymentStore::restore() coordinates schema creation, data import, and finalization - Inner::restore() handles conflict resolution, site allocation, and node assignment via deployment rules - Expose create_site() and find_active_site() on primary::Connection - Make create_site() accept an `active` parameter
Add `graphman restore` CLI with options: --directory Path to dump directory --shard Target shard (default: primary) --name Subgraph name for deployment rule matching --replace Drop and recreate if exists in target shard --add Create copy in a different shard --force Restore regardless of current state
Wrap the dump's data-reading operations in a REPEATABLE READ READ ONLY transaction to get a consistent MVCC snapshot. This prevents head block vs entity data mismatches, cross-table inconsistency, and missing or phantom rows caused by concurrent indexing or pruning. Add a TransactionBuilder for PermittedConnection since diesel-async's TransactionBuilder requires TransactionManager = AnsiTransactionManager, which pool-wrapped connection types don't satisfy.
There was a problem hiding this comment.
Didn't know where is the best place to put those:
- The spec/implementation suggest that the
dumpcommand supports incremental dumps, but I couldn't spot such path and the file name is hardcoded tochunk_000000.parquet. import_datasays it is resumable, but it seems there's no path to utilize this, not sure if intentional at this stage. All Actions seem destructive of previous restore attempts.
graph-node/store/postgres/src/relational/restore.rs
Lines 318 to 320 in 8f6b0dc
| /// Determine the target node for a deployment using the configured | ||
| /// deployment rules, ignoring the shard selection. Returns an error | ||
| /// if no rule matches. | ||
| async fn node_for_deployment( |
There was a problem hiding this comment.
The documentation says that this function ignores the shard from place(). If --shard is not passed, the shard will default to the primary one, but wouldn't this break the deployment rule for that node if a different shard is defined for subgraph deployments in the config?
| /// Opens the file, reads all row groups, and returns them as a vector | ||
| /// of `RecordBatch`es. The batches retain the schema embedded in the | ||
| /// Parquet file. | ||
| pub fn read_batches(path: &Path) -> Result<Vec<RecordBatch>, StoreError> { |
There was a problem hiding this comment.
Wouldn't this function load the whole table into the memory? I know it says it's not production ready and mostly for local dev, where probably tables aren't big, but could be a problem for large tables.
| ColumnType::String => add_typed::<Text>(select, table, name, is_list, is_nullable), | ||
| ColumnType::Enum(_) => { | ||
| // Cast enum to text for dump | ||
| let alias = table.meta.name.as_str(); |
There was a problem hiding this comment.
This alias is never used, but then the c alias is hardcoded into the query (also on row 334 and 335). Probably this alias definition should be moved somewhere above and then used in all 3 places.
| let relative_path = format!("{}/chunk_000000.parquet", table_dir_name); | ||
| let abs_path = table_dir.join("chunk_000000.parquet"); | ||
| let mut writer = ParquetChunkWriter::new(abs_path, relative_path, &arrow_schema)?; | ||
|
|
There was a problem hiding this comment.
It would be nice to have some kind of progression logging, something like
| println!("Dumping table {} into {}", table_name, relative_path) |
| use super::Layout; | ||
|
|
||
| #[derive(Serialize, Deserialize)] | ||
| pub(crate) struct Manifest { |
There was a problem hiding this comment.
Just a thought, but this Manifest structs and SubgraphManifestEntity seem pretty similar, is it possible to use the SubgraphManifestEntity direclty?
| deployment: DeploymentSearch, | ||
| directory: String, | ||
| ) -> Result<()> { | ||
| let directory = fs::canonicalize(&directory)?; |
There was a problem hiding this comment.
Just a suggestion: dump does not create the output directory and expects it to already exists or will throw an error, but maybe the command should create it instead.
dimitrovmaksim
left a comment
There was a problem hiding this comment.
Looks good as dev-only/experimental feature. Left some comments on things that I think are worth addressing, but nothing blocking for now.
| } | ||
|
|
||
| /// Import the `data_sources$` table from Parquet chunks. | ||
| async fn import_data_sources( |
There was a problem hiding this comment.
Not sure if there's something specific about data_sources$ that requires it, but import_data_sources seems to insert rows one at a time rather than in batches like the entity tables do. Spec file suggest it has to be batched as well
graph-node/docs/specs/dump-restore.md
Line 207 in 8f6b0dc
| /// Subgraph name for deployment rule matching and node assignment. | ||
| /// If omitted, uses an existing name from the database; errors if none found. | ||
| #[clap(long)] | ||
| name: Option<String>, |
There was a problem hiding this comment.
Ooops, just discovered this one.
If --name points to a subgraph that doesn't exist in the DB, restore still runs to completion but the deployment ends up orphaned — no name association, only reachable by hash, and is hard to clean up as unused record and unused remove does not see it. This should be validated, before restore is executed.
Also, docs should probably mention that in a fresh environment you need to run graphman create <NAME> first.
This PR implements
graphman dumpandgraphman restorecommands that do what it says on the tin.Dumps are consistent, but they are taken using a single transaction; that may have very bad effects on the overall system if dumps take a very long time. Restore is a little nicer in that it splits the data import into multiple jobs.
There are lots of ways in which this could be improved, but I feel this is a useful starting point, at the very least for development and test systems. The help text for the dump and restore command have scary warnings about not using them in production - and they shouldn't, even though I think at least for smaller subgraphs they might work ok.
Here's the help text for
graphman dump:and
grpahman restore: