Skip to content

Dump/restore for subgraphs#6397

Open
lutter wants to merge 20 commits intomasterfrom
lutter/dump
Open

Dump/restore for subgraphs#6397
lutter wants to merge 20 commits intomasterfrom
lutter/dump

Conversation

@lutter
Copy link
Collaborator

@lutter lutter commented Feb 24, 2026

This PR implements graphman dump and graphman restore commands that do what it says on the tin.

Dumps are consistent, but they are taken using a single transaction; that may have very bad effects on the overall system if dumps take a very long time. Restore is a little nicer in that it splits the data import into multiple jobs.

There are lots of ways in which this could be improved, but I feel this is a useful starting point, at the very least for development and test systems. The help text for the dump and restore command have scary warnings about not using them in production - and they shouldn't, even though I think at least for smaller subgraphs they might work ok.

Here's the help text for graphman dump:

Dump a subgraph deployment into a directory

EXPERIMENTAL - NOT FOR PRODUCTION USE

This will create a dump of the subgraph deployment in the specified directory. The dump includes
the subgraph manifest, the mapping, and the data in the database as parquet files. The dump can
be used to restore the subgraph deployment later with the `restore` command.

Usage: graphman --config <CONFIG> dump <DEPLOYMENT> <DIRECTORY>

Arguments:
  <DEPLOYMENT>
          The deployment (see `help info`)

  <DIRECTORY>
          The name of the directory to dump to

and grpahman restore:

Restore a subgraph deployment from a dump directory

EXPERIMENTAL - NOT FOR PRODUCTION USE

Restore a subgraph deployment from a dump created with the `dump` command.

Usage: graphman --config <CONFIG> restore [OPTIONS] <DIRECTORY>

Arguments:
  <DIRECTORY>
          Path to the dump directory

Options:
      --shard <SHARD>
          The database shard to restore into (default: primary)

      --name <NAME>
          Subgraph name for deployment rule matching and node assignment. If omitted, uses an
          existing name from the database; errors if none found

      --replace
          Drop and recreate if the deployment already exists in the target shard

      --add
          Create a copy in a shard that doesn't have this deployment (requires --shard)

      --force
          Restore no matter what: replace if exists in target shard, add if not

Define the directory layout, metadata.json schema, and per-entity
Parquet file format for dumping and restoring subgraph data.
Add `OidValue::Int4Range(Bound<i32>, Bound<i32>)` variant to properly
deserialize PostgreSQL int4range columns (OID 3904). Update `select_cols`
in dsl.rs to bind block_range as `Range<Integer>` instead of using the
Bytes placeholder, resolving the existing TODO.
Add block$ (immutable), block_range (mutable), and causality_region
to the column lookup so callers that need these columns by name can
find them. Also add CAUSALITY_REGION_COL pseudo-column static and
tighten with_nsp visibility to pub(crate).
Create the foundation for Parquet-based dump/restore: a schema mapping
module that converts relational Table definitions to Arrow Schema objects.

- arrow_schema() maps Table -> Arrow Schema with system columns (vid,
  block tracking, causality_region) followed by data columns
- data_sources_arrow_schema() provides fixed schema for data_sources$ table
- All ColumnType variants mapped (TSVector skipped, Enum -> Utf8)
- List columns wrapped in Arrow List type based on Column.is_list()
Implement rows_to_record_batch() which converts Vec<DynamicRow<OidValue>>
into Arrow RecordBatch objects. Uses a type-erased ColumnBuilder enum that
dispatches on Arrow DataType to create the appropriate array builder.

Supports all OidValue scalar variants (Bool, Int, Int8, Bytes, String,
BigDecimal, Timestamp) and all array variants (BoolArray, Ints, Int8Array,
BytesArray, StringArray, BigDecimalArray, TimestampArray). Block range
columns arrive as separate Int32 values from the dump query, keeping the
converter as a clean 1:1 mapping.
Add ParquetChunkWriter that wraps Arrow's ArrowWriter to stream
RecordBatches into a ZSTD-compressed Parquet file while tracking
row count and vid range. Returns ChunkInfo metadata on finish for
inclusion in metadata.json.
Wire dump queries and Parquet writing into Layout::dump(). This is the
core of the dump feature, covering both entity tables and the special
data_sources$ table.

For entity tables: build a DynamicSelectClause that selects vid, block
columns (split into lower/upper for mutable tables), causality_region,
and data columns. Use VidBatcher for adaptive batching, convert rows
to Arrow RecordBatch, and write via ParquetChunkWriter.

For data_sources$: use a concrete QueryableByName struct with raw SQL
(fixed schema, no DynamicSelectClause needed). Check table existence
via catalog::table_exists before attempting dump.

Metadata includes version, network, block pointers, entity count,
graft info, health, indexes, and per-table chunk tracking. Written
atomically via tmp+rename so its presence signals a complete dump.

Add entity_count() helper in detail.rs. Wire dump() through
DeploymentStore and SubgraphStore.
Wire the dump subcommand into graphman CLI. Takes a deployment search
argument and an output directory, resolves the deployment, and delegates
to SubgraphStore::dump().
Add `parquet/reader.rs` with `read_batches()` to read Parquet files
back into Arrow RecordBatches. This is the foundation for the restore
pipeline (step 1 of the restore plan).
Add `RestoreRow`, `DataSourceRestoreRow` structs and conversion
functions `record_batch_to_restore_rows()` and
`record_batch_to_data_source_rows()` that convert Arrow RecordBatches
back into typed data suitable for database insertion. This is step 2
of the restore plan.
Add a new constructor to InsertQuery that accepts RestoreRow data
from Parquet files, bypassing the WriteChunk/EntityWrite pipeline.
This is the insertion foundation for the restore path.

Also add From<i32> impl for CausalityRegion to allow constructing
values from deserialized Parquet data.
Add a method to read and validate metadata.json from a dump
directory. Make fields of Metadata, Manifest, BlockPtr, Health,
and Error structs pub(crate) so they are accessible from the
restore module.
Add import_data() to the restore module that reads Parquet chunks
and inserts entity data into PostgreSQL tables. Supports resumability
by checking max(vid) in each table and skipping already-imported rows.

Entity tables use InsertQuery::for_restore() for efficient batch
inserts. The data_sources$ table uses raw SQL with bind parameters
since it has a fixed schema outside the Layout.
Wire up the restore pipeline through SubgraphStore::restore() and
DeploymentStore::restore(). Uses plan_restore() to determine whether to
create or replace the deployment site, validates the target shard exists,
resolves the subgraph name for deployment rule matching, and assigns the
restored deployment to a node.

Changes:
- DeploymentStore::restore() coordinates schema creation, data import,
  and finalization
- Inner::restore() handles conflict resolution, site allocation, and
  node assignment via deployment rules
- Expose create_site() and find_active_site() on primary::Connection
- Make create_site() accept an `active` parameter
Add `graphman restore` CLI with options:
  --directory  Path to dump directory
  --shard      Target shard (default: primary)
  --name       Subgraph name for deployment rule matching
  --replace    Drop and recreate if exists in target shard
  --add        Create copy in a different shard
  --force      Restore regardless of current state
Wrap the dump's data-reading operations in a REPEATABLE READ READ ONLY
transaction to get a consistent MVCC snapshot. This prevents head block
vs entity data mismatches, cross-table inconsistency, and missing or
phantom rows caused by concurrent indexing or pruning.

Add a TransactionBuilder for PermittedConnection since diesel-async's
TransactionBuilder requires TransactionManager = AnsiTransactionManager,
which pool-wrapped connection types don't satisfy.
Copy link
Member

@dimitrovmaksim dimitrovmaksim left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't know where is the best place to put those:

  1. The spec/implementation suggest that the dump command supports incremental dumps, but I couldn't spot such path and the file name is hardcoded to chunk_000000.parquet.
  2. import_data says it is resumable, but it seems there's no path to utilize this, not sure if intentional at this stage. All Actions seem destructive of previous restore attempts.
    /// This is resumable: if interrupted, it can be called again and will
    /// skip already-imported rows by checking the current max(vid) in each
    /// table.

/// Determine the target node for a deployment using the configured
/// deployment rules, ignoring the shard selection. Returns an error
/// if no rule matches.
async fn node_for_deployment(
Copy link
Member

@dimitrovmaksim dimitrovmaksim Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The documentation says that this function ignores the shard from place(). If --shard is not passed, the shard will default to the primary one, but wouldn't this break the deployment rule for that node if a different shard is defined for subgraph deployments in the config?

/// Opens the file, reads all row groups, and returns them as a vector
/// of `RecordBatch`es. The batches retain the schema embedded in the
/// Parquet file.
pub fn read_batches(path: &Path) -> Result<Vec<RecordBatch>, StoreError> {
Copy link
Member

@dimitrovmaksim dimitrovmaksim Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this function load the whole table into the memory? I know it says it's not production ready and mostly for local dev, where probably tables aren't big, but could be a problem for large tables.

ColumnType::String => add_typed::<Text>(select, table, name, is_list, is_nullable),
ColumnType::Enum(_) => {
// Cast enum to text for dump
let alias = table.meta.name.as_str();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This alias is never used, but then the c alias is hardcoded into the query (also on row 334 and 335). Probably this alias definition should be moved somewhere above and then used in all 3 places.

let relative_path = format!("{}/chunk_000000.parquet", table_dir_name);
let abs_path = table_dir.join("chunk_000000.parquet");
let mut writer = ParquetChunkWriter::new(abs_path, relative_path, &arrow_schema)?;

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to have some kind of progression logging, something like

Suggested change
println!("Dumping table {} into {}", table_name, relative_path)

use super::Layout;

#[derive(Serialize, Deserialize)]
pub(crate) struct Manifest {
Copy link
Member

@dimitrovmaksim dimitrovmaksim Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a thought, but this Manifest structs and SubgraphManifestEntity seem pretty similar, is it possible to use the SubgraphManifestEntity direclty?

deployment: DeploymentSearch,
directory: String,
) -> Result<()> {
let directory = fs::canonicalize(&directory)?;
Copy link
Member

@dimitrovmaksim dimitrovmaksim Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a suggestion: dump does not create the output directory and expects it to already exists or will throw an error, but maybe the command should create it instead.

Copy link
Member

@dimitrovmaksim dimitrovmaksim left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good as dev-only/experimental feature. Left some comments on things that I think are worth addressing, but nothing blocking for now.

}

/// Import the `data_sources$` table from Parquet chunks.
async fn import_data_sources(
Copy link
Member

@dimitrovmaksim dimitrovmaksim Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if there's something specific about data_sources$ that requires it, but import_data_sources seems to insert rows one at a time rather than in batches like the entity tables do. Spec file suggest it has to be batched as well

6. Restore `data_sources$` table via DDL from `DataSourcesTable::new().as_ddl()` + batch-insert

/// Subgraph name for deployment rule matching and node assignment.
/// If omitted, uses an existing name from the database; errors if none found.
#[clap(long)]
name: Option<String>,
Copy link
Member

@dimitrovmaksim dimitrovmaksim Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ooops, just discovered this one.
If --name points to a subgraph that doesn't exist in the DB, restore still runs to completion but the deployment ends up orphaned — no name association, only reachable by hash, and is hard to clean up as unused record and unused remove does not see it. This should be validated, before restore is executed.
Also, docs should probably mention that in a fresh environment you need to run graphman create <NAME> first.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants