From abdfd6e50c686b57f98744d372039cc54e12c8c1 Mon Sep 17 00:00:00 2001 From: Eray Date: Thu, 16 Oct 2025 12:39:10 +0300 Subject: [PATCH 1/3] node: Fix duplicate key constraint when reverting chain shard changes - Implement robust backup naming system to avoid conflicts - Add support for reusing existing backups when appropriate - Preserve previous backups with unique names - Add comprehensive logging for backup operations Fixes #6196 --- node/src/manager/commands/chain.rs | 147 +++++++++++++++++++++++++---- 1 file changed, 131 insertions(+), 16 deletions(-) diff --git a/node/src/manager/commands/chain.rs b/node/src/manager/commands/chain.rs index 2f344bdeb7b..4b885f84b7c 100644 --- a/node/src/manager/commands/chain.rs +++ b/node/src/manager/commands/chain.rs @@ -22,6 +22,7 @@ use graph::{ use graph_chain_ethereum::EthereumAdapter; use graph_chain_ethereum::EthereumAdapterTrait as _; use graph_chain_ethereum::chain::BlockFinality; +use graph_store_postgres::AsyncPgConnection; use graph_store_postgres::BlockStore; use graph_store_postgres::ChainStore; use graph_store_postgres::PoolCoordinator; @@ -236,6 +237,43 @@ pub async fn update_chain_genesis( Ok(()) } +struct ChainSwapOutcome { + latest_backup_name: String, + previous_backup_final_name: Option, + reused_previous_backup: bool, + allocated_chain: bool, +} + +fn backup_name(chain: &str, base: &str) -> String { + format!("{chain}-{base}") +} + +fn suffixed_backup_name(backup_name: &str, suffix: usize) -> String { + format!("{backup_name}-{suffix}") +} + +async fn next_backup_name( + conn: &mut AsyncPgConnection, + chain: &str, + base: &str, +) -> Result { + let backup_name = backup_name(chain, base); + if find_chain(conn, &backup_name).await?.is_none() { + return Ok(backup_name); + } + + let mut suffix = 1usize; + + loop { + let candidate = suffixed_backup_name(&backup_name, suffix); + if find_chain(conn, &candidate).await?.is_none() { + return Ok(candidate); + } + + suffix += 1; + } +} + pub async fn change_block_cache_shard( primary_store: ConnectionPool, store: BlockStore, @@ -250,6 +288,10 @@ pub async fn change_block_cache_shard( .await? .ok_or_else(|| anyhow!("unknown chain: {}", chain_name))?; let old_shard = chain.shard; + let canonical_backup_name = format!("{chain_name}-old"); + + let existing_backup = find_chain(&mut conn, &canonical_backup_name).await?; + let existing_backup_store = store.chain_store(&canonical_backup_name).await; println!("Current shard: {}", old_shard); @@ -257,44 +299,101 @@ pub async fn change_block_cache_shard( .chain_store(&chain_name) .await .ok_or_else(|| anyhow!("unknown chain: {}", &chain_name))?; - let new_name = format!("{}-old", &chain_name); let ident = chain_store.chain_identifier().await?; + let target_shard = Shard::new(shard.clone())?; + let reuse_existing_backup = existing_backup + .as_ref() + .map(|backup| backup.shard.as_str() == target_shard.as_str()) + .unwrap_or(false); + + let allocated_chain = if reuse_existing_backup { + None + } else { + let chain = + BlockStore::allocate_chain(&mut conn, &chain_name, &target_shard, &ident).await?; + store.add_chain_store(&chain, true).await?; + Some(chain) + }; - conn.transaction::<(), StoreError, _>(|conn| { + let outcome = conn.transaction::(|conn| { async { - let shard = Shard::new(shard.to_string())?; - - let chain = BlockStore::allocate_chain(conn, &chain_name, &shard, &ident).await?; - - store.add_chain_store(&chain, true).await?; - - // Drop the foreign key constraint on deployment_schemas sql_query( "alter table deployment_schemas drop constraint deployment_schemas_network_fkey;", ) .execute(conn).await?; - // Update the current chain name to chain-old - update_chain_name(conn, &chain_name, &new_name).await?; + let previous_backup_final_name = if let Some(backup) = existing_backup.as_ref() { + let temp_name = next_backup_name(conn, &chain_name, "old").await?; + update_chain_name(conn, &backup.name, &temp_name).await?; + + if reuse_existing_backup { + update_chain_name(conn, &temp_name, &chain_name).await?; + Some(chain_name.clone()) + } else { + Some(temp_name) + } + } else { + None + }; - // Create a new chain with the name in the destination shard - let _ = add_chain(conn, &chain_name, &shard, ident).await?; + let latest_backup_name = next_backup_name(conn, &chain_name, "old").await?; + update_chain_name(conn, &chain_name, &latest_backup_name).await?; + + if !reuse_existing_backup { + add_chain(conn, &chain_name, &target_shard, ident.clone()).await?; + } - // Re-add the foreign key constraint sql_query( "alter table deployment_schemas add constraint deployment_schemas_network_fkey foreign key (network) references chains(name);", ) .execute(conn).await?; - Ok(()) + + Ok(ChainSwapOutcome { + latest_backup_name, + previous_backup_final_name, + reused_previous_backup: reuse_existing_backup, + allocated_chain: allocated_chain.is_some(), + }) }.scope_boxed() }).await?; - chain_store.update_name(&new_name).await?; + let ChainSwapOutcome { + latest_backup_name, + previous_backup_final_name, + reused_previous_backup, + allocated_chain, + } = outcome; + + chain_store.update_name(&latest_backup_name).await?; + + if let (Some(backup_store), Some(final_name)) = ( + existing_backup_store.as_ref(), + previous_backup_final_name.as_ref(), + ) { + backup_store.update_name(final_name).await?; + } println!( "Changed block cache shard for {} from {} to {}", chain_name, old_shard, shard ); + println!("Latest backup recorded as `{}`", latest_backup_name); + + if reused_previous_backup { + println!( + "Reused existing backup `{}` as the active `{}` chain", + canonical_backup_name, chain_name + ); + } else if let Some(ref preserved) = previous_backup_final_name { + println!("Preserved earlier backup as `{}`", preserved); + } + + if allocated_chain { + println!( + "Allocated new chain state for `{}` on shard {}", + chain_name, shard + ); + } Ok(()) } @@ -329,3 +428,19 @@ pub async fn ingest( } Ok(()) } + +#[cfg(test)] +mod tests { + use super::{backup_name, suffixed_backup_name}; + + #[test] + fn backup_name_uses_plain_name_first() { + assert_eq!(backup_name("mainnet", "old"), "mainnet-old"); + } + + #[test] + fn suffixed_backup_name_adds_numeric_suffixes() { + assert_eq!(suffixed_backup_name("mainnet-old", 1), "mainnet-old-1"); + assert_eq!(suffixed_backup_name("mainnet-old", 42), "mainnet-old-42"); + } +} From afefd27b381f5e59cda767bc42591a3ef3e8829b Mon Sep 17 00:00:00 2001 From: Eray Date: Wed, 29 Apr 2026 09:44:17 +0300 Subject: [PATCH 2/3] node: Prompt before reusing change-shard backup --- node/src/manager/commands/chain.rs | 135 +++++++++++++++++++++-------- 1 file changed, 100 insertions(+), 35 deletions(-) diff --git a/node/src/manager/commands/chain.rs b/node/src/manager/commands/chain.rs index 4b885f84b7c..660469e04c7 100644 --- a/node/src/manager/commands/chain.rs +++ b/node/src/manager/commands/chain.rs @@ -33,6 +33,7 @@ use graph_store_postgres::find_chain; use graph_store_postgres::update_chain_name; use graph_store_postgres::{ConnectionPool, command_support::catalog::block_store}; +use crate::manager::prompt::prompt_for_confirmation; use crate::network_setup::Networks; pub async fn list(primary: ConnectionPool, store: BlockStore) -> Result<(), Error> { @@ -238,12 +239,17 @@ pub async fn update_chain_genesis( } struct ChainSwapOutcome { - latest_backup_name: String, - previous_backup_final_name: Option, reused_previous_backup: bool, allocated_chain: bool, } +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +enum ExistingBackupDisposition<'a> { + ProceedFresh, + PromptReuse, + AbortWrongShard(&'a str), +} + fn backup_name(chain: &str, base: &str) -> String { format!("{chain}-{base}") } @@ -252,7 +258,18 @@ fn suffixed_backup_name(backup_name: &str, suffix: usize) -> String { format!("{backup_name}-{suffix}") } -async fn next_backup_name( +fn existing_backup_disposition<'a>( + existing_backup_shard: Option<&'a str>, + target_shard: &str, +) -> ExistingBackupDisposition<'a> { + match existing_backup_shard { + None => ExistingBackupDisposition::ProceedFresh, + Some(shard) if shard == target_shard => ExistingBackupDisposition::PromptReuse, + Some(shard) => ExistingBackupDisposition::AbortWrongShard(shard), + } +} + +async fn next_temporary_backup_name( conn: &mut AsyncPgConnection, chain: &str, base: &str, @@ -289,9 +306,7 @@ pub async fn change_block_cache_shard( .ok_or_else(|| anyhow!("unknown chain: {}", chain_name))?; let old_shard = chain.shard; let canonical_backup_name = format!("{chain_name}-old"); - let existing_backup = find_chain(&mut conn, &canonical_backup_name).await?; - let existing_backup_store = store.chain_store(&canonical_backup_name).await; println!("Current shard: {}", old_shard); @@ -301,10 +316,47 @@ pub async fn change_block_cache_shard( .ok_or_else(|| anyhow!("unknown chain: {}", &chain_name))?; let ident = chain_store.chain_identifier().await?; let target_shard = Shard::new(shard.clone())?; - let reuse_existing_backup = existing_backup - .as_ref() - .map(|backup| backup.shard.as_str() == target_shard.as_str()) - .unwrap_or(false); + let existing_backup_disposition = existing_backup_disposition( + existing_backup.as_ref().map(|backup| backup.shard.as_str()), + target_shard.as_str(), + ); + let reuse_existing_backup = matches!( + existing_backup_disposition, + ExistingBackupDisposition::PromptReuse + ); + + match existing_backup_disposition { + ExistingBackupDisposition::ProceedFresh => {} + ExistingBackupDisposition::AbortWrongShard(backup_shard) => { + bail!( + "`{}` already exists on shard `{}`. Remove it with `graphman chain remove {}` before changing `{}` to shard `{}`", + canonical_backup_name, + backup_shard, + canonical_backup_name, + chain_name, + target_shard, + ); + } + ExistingBackupDisposition::PromptReuse => { + let prompt = format!( + "`{}` already exists on shard `{}` and will be reused as the active `{}` chain.\nProceed?", + canonical_backup_name, target_shard, chain_name + ); + if !prompt_for_confirmation(&prompt)? { + println!( + "Aborting. Remove `{}` with `graphman chain remove {}` if you want to create a fresh cache on shard `{}`.", + canonical_backup_name, canonical_backup_name, target_shard + ); + return Ok(()); + } + } + } + + let existing_backup_store = if reuse_existing_backup { + store.chain_store(&canonical_backup_name).await + } else { + None + }; let allocated_chain = if reuse_existing_backup { None @@ -322,24 +374,20 @@ pub async fn change_block_cache_shard( ) .execute(conn).await?; - let previous_backup_final_name = if let Some(backup) = existing_backup.as_ref() { - let temp_name = next_backup_name(conn, &chain_name, "old").await?; + let temp_backup_name = if let Some(backup) = existing_backup.as_ref() { + let temp_name = next_temporary_backup_name(conn, &chain_name, "old").await?; update_chain_name(conn, &backup.name, &temp_name).await?; - - if reuse_existing_backup { - update_chain_name(conn, &temp_name, &chain_name).await?; - Some(chain_name.clone()) - } else { - Some(temp_name) - } + Some(temp_name) } else { None }; - let latest_backup_name = next_backup_name(conn, &chain_name, "old").await?; - update_chain_name(conn, &chain_name, &latest_backup_name).await?; + update_chain_name(conn, &chain_name, &canonical_backup_name).await?; - if !reuse_existing_backup { + if reuse_existing_backup { + debug_assert!(temp_backup_name.is_some()); + update_chain_name(conn, temp_backup_name.as_ref().unwrap(), &chain_name).await?; + } else { add_chain(conn, &chain_name, &target_shard, ident.clone()).await?; } @@ -349,8 +397,6 @@ pub async fn change_block_cache_shard( .execute(conn).await?; Ok(ChainSwapOutcome { - latest_backup_name, - previous_backup_final_name, reused_previous_backup: reuse_existing_backup, allocated_chain: allocated_chain.is_some(), }) @@ -358,34 +404,27 @@ pub async fn change_block_cache_shard( }).await?; let ChainSwapOutcome { - latest_backup_name, - previous_backup_final_name, reused_previous_backup, allocated_chain, } = outcome; - chain_store.update_name(&latest_backup_name).await?; + chain_store.update_name(&canonical_backup_name).await?; - if let (Some(backup_store), Some(final_name)) = ( - existing_backup_store.as_ref(), - previous_backup_final_name.as_ref(), - ) { - backup_store.update_name(final_name).await?; + if reused_previous_backup && let Some(backup_store) = existing_backup_store.as_ref() { + backup_store.update_name(&chain_name).await?; } println!( "Changed block cache shard for {} from {} to {}", chain_name, old_shard, shard ); - println!("Latest backup recorded as `{}`", latest_backup_name); + println!("Latest backup recorded as `{}`", canonical_backup_name); if reused_previous_backup { println!( "Reused existing backup `{}` as the active `{}` chain", canonical_backup_name, chain_name ); - } else if let Some(ref preserved) = previous_backup_final_name { - println!("Preserved earlier backup as `{}`", preserved); } if allocated_chain { @@ -431,7 +470,9 @@ pub async fn ingest( #[cfg(test)] mod tests { - use super::{backup_name, suffixed_backup_name}; + use super::{ + ExistingBackupDisposition, backup_name, existing_backup_disposition, suffixed_backup_name, + }; #[test] fn backup_name_uses_plain_name_first() { @@ -443,4 +484,28 @@ mod tests { assert_eq!(suffixed_backup_name("mainnet-old", 1), "mainnet-old-1"); assert_eq!(suffixed_backup_name("mainnet-old", 42), "mainnet-old-42"); } + + #[test] + fn existing_backup_disposition_proceeds_when_backup_missing() { + assert_eq!( + existing_backup_disposition(None, "shard_b"), + ExistingBackupDisposition::ProceedFresh + ); + } + + #[test] + fn existing_backup_disposition_prompts_when_backup_matches_target_shard() { + assert_eq!( + existing_backup_disposition(Some("shard_b"), "shard_b"), + ExistingBackupDisposition::PromptReuse + ); + } + + #[test] + fn existing_backup_disposition_aborts_when_backup_is_on_another_shard() { + assert_eq!( + existing_backup_disposition(Some("shard_a"), "shard_b"), + ExistingBackupDisposition::AbortWrongShard("shard_a") + ); + } } From 704e10d0b7cfc6319800ba58c39fb6519faa0171 Mon Sep 17 00:00:00 2001 From: Eray Date: Sat, 2 May 2026 01:02:51 +0300 Subject: [PATCH 3/3] node: Address change-shard review feedback --- node/src/manager/commands/chain.rs | 172 +++++++---------------------- 1 file changed, 40 insertions(+), 132 deletions(-) diff --git a/node/src/manager/commands/chain.rs b/node/src/manager/commands/chain.rs index 660469e04c7..a32d86f1fce 100644 --- a/node/src/manager/commands/chain.rs +++ b/node/src/manager/commands/chain.rs @@ -22,7 +22,6 @@ use graph::{ use graph_chain_ethereum::EthereumAdapter; use graph_chain_ethereum::EthereumAdapterTrait as _; use graph_chain_ethereum::chain::BlockFinality; -use graph_store_postgres::AsyncPgConnection; use graph_store_postgres::BlockStore; use graph_store_postgres::ChainStore; use graph_store_postgres::PoolCoordinator; @@ -238,59 +237,6 @@ pub async fn update_chain_genesis( Ok(()) } -struct ChainSwapOutcome { - reused_previous_backup: bool, - allocated_chain: bool, -} - -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -enum ExistingBackupDisposition<'a> { - ProceedFresh, - PromptReuse, - AbortWrongShard(&'a str), -} - -fn backup_name(chain: &str, base: &str) -> String { - format!("{chain}-{base}") -} - -fn suffixed_backup_name(backup_name: &str, suffix: usize) -> String { - format!("{backup_name}-{suffix}") -} - -fn existing_backup_disposition<'a>( - existing_backup_shard: Option<&'a str>, - target_shard: &str, -) -> ExistingBackupDisposition<'a> { - match existing_backup_shard { - None => ExistingBackupDisposition::ProceedFresh, - Some(shard) if shard == target_shard => ExistingBackupDisposition::PromptReuse, - Some(shard) => ExistingBackupDisposition::AbortWrongShard(shard), - } -} - -async fn next_temporary_backup_name( - conn: &mut AsyncPgConnection, - chain: &str, - base: &str, -) -> Result { - let backup_name = backup_name(chain, base); - if find_chain(conn, &backup_name).await?.is_none() { - return Ok(backup_name); - } - - let mut suffix = 1usize; - - loop { - let candidate = suffixed_backup_name(&backup_name, suffix); - if find_chain(conn, &candidate).await?.is_none() { - return Ok(candidate); - } - - suffix += 1; - } -} - pub async fn change_block_cache_shard( primary_store: ConnectionPool, store: BlockStore, @@ -316,28 +262,33 @@ pub async fn change_block_cache_shard( .ok_or_else(|| anyhow!("unknown chain: {}", &chain_name))?; let ident = chain_store.chain_identifier().await?; let target_shard = Shard::new(shard.clone())?; - let existing_backup_disposition = existing_backup_disposition( - existing_backup.as_ref().map(|backup| backup.shard.as_str()), - target_shard.as_str(), - ); - let reuse_existing_backup = matches!( - existing_backup_disposition, - ExistingBackupDisposition::PromptReuse - ); - match existing_backup_disposition { - ExistingBackupDisposition::ProceedFresh => {} - ExistingBackupDisposition::AbortWrongShard(backup_shard) => { + let reuse_existing_backup = match existing_backup.as_ref() { + None => false, + Some(backup) if backup.shard != target_shard => { bail!( "`{}` already exists on shard `{}`. Remove it with `graphman chain remove {}` before changing `{}` to shard `{}`", canonical_backup_name, - backup_shard, + backup.shard, canonical_backup_name, chain_name, target_shard, ); } - ExistingBackupDisposition::PromptReuse => { + Some(backup) => { + let backup_ident = backup.network_identifier()?; + if backup_ident != ident { + bail!( + "`{}` has a different chain identifier ({}) than `{}` ({}). Remove it with `graphman chain remove {}` before changing `{}` to shard `{}`", + canonical_backup_name, + backup_ident, + chain_name, + ident, + canonical_backup_name, + chain_name, + target_shard, + ); + } let prompt = format!( "`{}` already exists on shard `{}` and will be reused as the active `{}` chain.\nProceed?", canonical_backup_name, target_shard, chain_name @@ -349,8 +300,9 @@ pub async fn change_block_cache_shard( ); return Ok(()); } + true } - } + }; let existing_backup_store = if reuse_existing_backup { store.chain_store(&canonical_backup_name).await @@ -367,26 +319,32 @@ pub async fn change_block_cache_shard( Some(chain) }; - let outcome = conn.transaction::(|conn| { + let temp_backup_name = format!("{chain_name}-old-temp"); + if reuse_existing_backup && find_chain(&mut conn, &temp_backup_name).await?.is_some() { + bail!( + "`{}` already exists. Remove it with `graphman chain remove {}` before changing `{}` to shard `{}`", + temp_backup_name, + temp_backup_name, + chain_name, + target_shard, + ); + } + + conn.transaction::<(), StoreError, _>(|conn| { async { sql_query( "alter table deployment_schemas drop constraint deployment_schemas_network_fkey;", ) .execute(conn).await?; - let temp_backup_name = if let Some(backup) = existing_backup.as_ref() { - let temp_name = next_temporary_backup_name(conn, &chain_name, "old").await?; - update_chain_name(conn, &backup.name, &temp_name).await?; - Some(temp_name) - } else { - None - }; + if let Some(backup) = existing_backup.as_ref() { + update_chain_name(conn, &backup.name, &temp_backup_name).await?; + } update_chain_name(conn, &chain_name, &canonical_backup_name).await?; if reuse_existing_backup { - debug_assert!(temp_backup_name.is_some()); - update_chain_name(conn, temp_backup_name.as_ref().unwrap(), &chain_name).await?; + update_chain_name(conn, &temp_backup_name, &chain_name).await?; } else { add_chain(conn, &chain_name, &target_shard, ident.clone()).await?; } @@ -396,21 +354,13 @@ pub async fn change_block_cache_shard( ) .execute(conn).await?; - Ok(ChainSwapOutcome { - reused_previous_backup: reuse_existing_backup, - allocated_chain: allocated_chain.is_some(), - }) + Ok(()) }.scope_boxed() }).await?; - let ChainSwapOutcome { - reused_previous_backup, - allocated_chain, - } = outcome; - chain_store.update_name(&canonical_backup_name).await?; - if reused_previous_backup && let Some(backup_store) = existing_backup_store.as_ref() { + if reuse_existing_backup && let Some(backup_store) = existing_backup_store.as_ref() { backup_store.update_name(&chain_name).await?; } @@ -420,14 +370,14 @@ pub async fn change_block_cache_shard( ); println!("Latest backup recorded as `{}`", canonical_backup_name); - if reused_previous_backup { + if reuse_existing_backup { println!( "Reused existing backup `{}` as the active `{}` chain", canonical_backup_name, chain_name ); } - if allocated_chain { + if allocated_chain.is_some() { println!( "Allocated new chain state for `{}` on shard {}", chain_name, shard @@ -467,45 +417,3 @@ pub async fn ingest( } Ok(()) } - -#[cfg(test)] -mod tests { - use super::{ - ExistingBackupDisposition, backup_name, existing_backup_disposition, suffixed_backup_name, - }; - - #[test] - fn backup_name_uses_plain_name_first() { - assert_eq!(backup_name("mainnet", "old"), "mainnet-old"); - } - - #[test] - fn suffixed_backup_name_adds_numeric_suffixes() { - assert_eq!(suffixed_backup_name("mainnet-old", 1), "mainnet-old-1"); - assert_eq!(suffixed_backup_name("mainnet-old", 42), "mainnet-old-42"); - } - - #[test] - fn existing_backup_disposition_proceeds_when_backup_missing() { - assert_eq!( - existing_backup_disposition(None, "shard_b"), - ExistingBackupDisposition::ProceedFresh - ); - } - - #[test] - fn existing_backup_disposition_prompts_when_backup_matches_target_shard() { - assert_eq!( - existing_backup_disposition(Some("shard_b"), "shard_b"), - ExistingBackupDisposition::PromptReuse - ); - } - - #[test] - fn existing_backup_disposition_aborts_when_backup_is_on_another_shard() { - assert_eq!( - existing_backup_disposition(Some("shard_a"), "shard_b"), - ExistingBackupDisposition::AbortWrongShard("shard_a") - ); - } -}