diff --git a/grovedb/src/replication.rs b/grovedb/src/replication.rs index 9505b6a6..11705b5c 100644 --- a/grovedb/src/replication.rs +++ b/grovedb/src/replication.rs @@ -32,19 +32,18 @@ pub const CURRENT_STATE_SYNC_VERSION: u16 = 1; #[cfg(feature = "minimal")] impl GroveDb { - pub fn start_syncing_session(&self, app_hash: [u8; 32]) -> Pin> { - MultiStateSyncSession::new(self.start_transaction(), app_hash) + pub fn start_syncing_session( + &self, + app_hash: [u8; 32], + subtrees_batch_size: usize, + ) -> Pin> { + MultiStateSyncSession::new(self, app_hash, subtrees_batch_size) } pub fn commit_session(&self, session: Pin>) -> Result<(), Error> { - match self.commit_transaction(session.into_transaction()).value { - Ok(_) => Ok(()), - Err(e) => { - // Log the error or handle it as needed - eprintln!("Failed to commit session: {:?}", e); - Err(e) - } - } + session + .commit() + .inspect_err(|e| eprintln!("Failed to commit session: {:?}", e)) } /// Fetches a chunk of data from the database based on the given global @@ -184,6 +183,8 @@ impl GroveDb { /// /// # Parameters /// - `app_hash`: The root hash of the application state to synchronize. + /// - `subtrees_batch_size`: Maximum number of subtrees that can be + /// processed in a single batch. /// - `version`: The version of the state sync protocol to use. /// - `grove_version`: The version of GroveDB being used. /// @@ -216,6 +217,7 @@ impl GroveDb { pub fn start_snapshot_syncing( &self, app_hash: CryptoHash, + subtrees_batch_size: usize, version: u16, grove_version: &GroveVersion, ) -> Result>, Error> { @@ -233,12 +235,17 @@ impl GroveDb { )); } + if subtrees_batch_size == 0 { + return Err(Error::InternalError( + "subtrees_batch_size cannot be zero".to_string(), + )); + } + let root_prefix = [0u8; 32]; - let mut session = self.start_syncing_session(app_hash); + let mut session = self.start_syncing_session(app_hash, subtrees_batch_size); session.add_subtree_sync_info( - self, SubtreePath::empty(), app_hash, None, diff --git a/grovedb/src/replication/state_sync_session.rs b/grovedb/src/replication/state_sync_session.rs index aabc8664..3e5eb36d 100644 --- a/grovedb/src/replication/state_sync_session.rs +++ b/grovedb/src/replication/state_sync_session.rs @@ -2,6 +2,7 @@ use std::{ collections::{BTreeMap, BTreeSet}, fmt, marker::PhantomPinned, + mem, pin::Pin, }; @@ -148,6 +149,9 @@ impl<'tx> SubtreeStateSyncInfo<'tx> { /// Struct governing the state synchronization process. pub struct MultiStateSyncSession<'db> { + /// GroveDb instance to apply changes to + db: &'db GroveDb, + /// Map of currently processing subtrees. /// Keys are `SubtreePrefix` (path digests), and values are /// `SubtreeStateSyncInfo` for each subtree. @@ -163,6 +167,15 @@ pub struct MultiStateSyncSession<'db> { /// Version of the state synchronization protocol. pub(crate) version: u16, + /// Maximum number of subtrees that can be processed in a single batch. + subtrees_batch_size: usize, + + /// Counter tracking the number of subtrees processed in the current batch. + num_processed_subtrees_in_batch: usize, + + /// Metadata for newly discovered subtrees that are pending processing. + pending_discovered_subtrees: Option, + /// Transaction used for the synchronization process. /// This is placed last to ensure it is dropped last. transaction: Transaction<'db>, @@ -173,13 +186,17 @@ pub struct MultiStateSyncSession<'db> { impl<'db> MultiStateSyncSession<'db> { /// Initializes a new state sync session. - pub fn new(transaction: Transaction<'db>, app_hash: [u8; 32]) -> Pin> { + pub fn new(db: &'db GroveDb, app_hash: [u8; 32], subtrees_batch_size: usize) -> Pin> { Box::pin(MultiStateSyncSession { - transaction, + db, + transaction: db.start_transaction(), current_prefixes: Default::default(), processed_prefixes: Default::default(), app_hash, version: CURRENT_STATE_SYNC_VERSION, + subtrees_batch_size, + num_processed_subtrees_in_batch: 0, + pending_discovered_subtrees: None, _pin: PhantomPinned, }) } @@ -195,13 +212,27 @@ impl<'db> MultiStateSyncSession<'db> { } } + if self.pending_discovered_subtrees.is_some() { + return false; + } + true } - pub fn into_transaction(self: Pin>) -> Transaction<'db> { - // SAFETY: the struct isn't used anymore and no one will refer to transaction - // address again - unsafe { Pin::into_inner_unchecked(self) }.transaction + pub fn commit(self: Pin>) -> Result<(), Error> { + // SAFETY: the struct isn't used anymore and no storage contexts would acccess + // transaction + let session = unsafe { Pin::into_inner_unchecked(self) }; + session.db.commit_transaction(session.transaction).unwrap() + } + + // SAFETY: This is unsafe as it requires `self.current_prefixes` to be empty + unsafe fn set_new_transaction( + self: &mut Pin>>, + ) -> Result<(), Error> { + let this = Pin::as_mut(self).get_unchecked_mut(); + let old_tx = mem::replace(&mut this.transaction, this.db.start_transaction()); + self.db.commit_transaction(old_tx).unwrap() } /// Adds synchronization information for a subtree into the current @@ -242,7 +273,6 @@ impl<'db> MultiStateSyncSession<'db> { /// lifetime guarantees are respected. pub fn add_subtree_sync_info<'b, B: AsRef<[u8]>>( self: &mut Pin>>, - db: &'db GroveDb, path: SubtreePath<'b, B>, hash: CryptoHash, actual_hash: Option, @@ -255,7 +285,8 @@ impl<'db> MultiStateSyncSession<'db> { }; if let Ok((merk, root_key, tree_type)) = - db.open_merk_for_replication(path.clone(), transaction_ref, grove_version) + self.db + .open_merk_for_replication(path.clone(), transaction_ref, grove_version) { let restorer = Restorer::new(merk, hash, actual_hash); let mut sync_info = SubtreeStateSyncInfo::new(restorer); @@ -295,6 +326,20 @@ impl<'db> MultiStateSyncSession<'db> { &mut unsafe { self.get_unchecked_mut() }.processed_prefixes } + fn num_processed_subtrees_in_batch(self: Pin<&mut MultiStateSyncSession<'db>>) -> &mut usize { + // SAFETY: no memory-sensitive assumptions are made about fields except the + // `transaciton` so it will be safe to modify them + &mut unsafe { self.get_unchecked_mut() }.num_processed_subtrees_in_batch + } + + fn pending_discovered_subtrees( + self: Pin<&mut MultiStateSyncSession<'db>>, + ) -> &mut Option { + // SAFETY: no memory-sensitive assumptions are made about fields except the + // `transaciton` so it will be safe to modify them + &mut unsafe { self.get_unchecked_mut() }.pending_discovered_subtrees + } + /// Applies a chunk during the state synchronization process. /// This method should be called by ABCI when the `ApplySnapshotChunk` /// method is invoked. @@ -311,9 +356,9 @@ impl<'db> MultiStateSyncSession<'db> { /// GroveDB version. /// /// # Returns - /// - `Ok(Vec>)`: A vector of global chunk IDs (each represented as - /// a vector of bytes) that can be fetched from sources for further - /// synchronization. + /// - `Ok(Vec>)`: A tuple of: vector of global chunk IDs (each + /// represented as a vector of bytes) that can be fetched from sources for + /// further synchronization. /// - `Err(Error)`: An error if the chunk application fails or if the chunk /// proof is invalid. /// @@ -332,7 +377,6 @@ impl<'db> MultiStateSyncSession<'db> { /// preserving consistency during the synchronization process. pub fn apply_chunk( self: &mut Pin>>, - db: &'db GroveDb, packed_global_chunk_ids: &[u8], packed_global_chunks: &[u8], version: u16, @@ -431,29 +475,44 @@ impl<'db> MultiStateSyncSession<'db> { let completed_path = subtree_state_sync.current_path.clone(); // Subtree is finished. We can save it. - if subtree_state_sync.num_processed_chunks > 0 { - if let Some(prefix_data) = current_prefixes.remove(&chunk_prefix) { + let is_subtree_empty = subtree_state_sync.num_processed_chunks == 0; + if let Some(prefix_data) = current_prefixes.remove(&chunk_prefix) { + if !is_subtree_empty { if let Err(err) = prefix_data.restorer.finalize(grove_version) { return Err(Error::InternalError(format!( "Unable to finalize Merk: {:?}", err ))); } - } else { - return Err(Error::InternalError(format!( - "Prefix {:?} does not exist in current_prefixes", - chunk_prefix - ))); } + } else { + return Err(Error::InternalError(format!( + "Prefix {:?} does not exist in current_prefixes", + chunk_prefix + ))); } self.as_mut().processed_prefixes().insert(chunk_prefix); + *self.as_mut().num_processed_subtrees_in_batch() += 1; + let new_subtrees_metadata = - self.discover_new_subtrees_metadata(db, &completed_path, grove_version)?; + self.discover_new_subtrees_metadata(&completed_path, grove_version)?; - if let Ok(res) = - self.prepare_sync_state_sessions(db, new_subtrees_metadata, grove_version) + if self.num_processed_subtrees_in_batch >= self.subtrees_batch_size { + match self.as_mut().pending_discovered_subtrees() { + None => { + *self.as_mut().pending_discovered_subtrees() = + Some(new_subtrees_metadata); + } + Some(existing_subtrees_metadata) => { + existing_subtrees_metadata + .data + .extend(new_subtrees_metadata.data); + } + } + } else if let Ok(res) = + self.prepare_sync_state_sessions(new_subtrees_metadata, grove_version) { next_chunk_ids.extend(res); next_global_chunk_ids.extend(next_chunk_ids); @@ -465,11 +524,44 @@ impl<'db> MultiStateSyncSession<'db> { } } + if self.num_processed_subtrees_in_batch >= self.subtrees_batch_size + && self.current_prefixes.is_empty() + { + // SAFETY: we made sure `self.current_prefixes` is empty so there are no + // references to the transaction we're about to replace + unsafe { + self.set_new_transaction()?; + } + + let new_subtrees_metadata = + self.as_mut() + .pending_discovered_subtrees() + .take() + .ok_or(Error::CorruptedData( + "No pending subtrees available for resume_sync".to_string(), + ))?; + *self.as_mut().num_processed_subtrees_in_batch() = 0; + + let mut next_chunk_ids = vec![]; + + if let Ok(discovered_chunk_ids) = + self.prepare_sync_state_sessions(new_subtrees_metadata, grove_version) + { + next_chunk_ids.extend(discovered_chunk_ids); + next_global_chunk_ids.extend(next_chunk_ids); + } else { + return Err(Error::InternalError( + "Unable to discover Subtrees".to_string(), + )); + } + } + let mut res: Vec> = vec![]; for grouped_next_global_chunk_ids in next_global_chunk_ids.chunks(CONST_GROUP_PACKING_SIZE) { res.push(pack_nested_bytes(grouped_next_global_chunk_ids.to_vec())); } + Ok(res) } @@ -505,7 +597,6 @@ impl<'db> MultiStateSyncSession<'db> { /// should be used carefully to maintain session integrity. fn discover_new_subtrees_metadata( self: &mut Pin>>, - db: &'db GroveDb, path_vec: &[Vec], grove_version: &GroveVersion, ) -> Result { @@ -515,7 +606,8 @@ impl<'db> MultiStateSyncSession<'db> { }; let subtree_path: Vec<&[u8]> = path_vec.iter().map(|vec| vec.as_slice()).collect(); let path: &[&[u8]] = &subtree_path; - let merk = db + let merk = self + .db .open_transactional_merk_at_path(path.into(), transaction_ref, None, grove_version) .value .map_err(|e| Error::CorruptedData(format!("failed to open merk by path-tx:{}", e)))?; @@ -595,7 +687,6 @@ impl<'db> MultiStateSyncSession<'db> { /// ensure seamless state synchronization. fn prepare_sync_state_sessions( self: &mut Pin>>, - db: &'db GroveDb, subtrees_metadata: SubtreesMetadata, grove_version: &GroveVersion, ) -> Result>, Error> { @@ -612,7 +703,6 @@ impl<'db> MultiStateSyncSession<'db> { let path: &[&[u8]] = &subtree_path; let next_chunks_ids = self.add_subtree_sync_info( - db, path.into(), *elem_value_hash, Some(*actual_value_hash), diff --git a/tutorials/src/bin/replication.rs b/tutorials/src/bin/replication.rs index d277e27d..045b6819 100644 --- a/tutorials/src/bin/replication.rs +++ b/tutorials/src/bin/replication.rs @@ -269,7 +269,8 @@ fn sync_db_demo( ) -> Result<(), grovedb::Error> { let start_time = Instant::now(); let app_hash = source_db.root_hash(None, grove_version).value.unwrap(); - let mut session = target_db.start_snapshot_syncing(app_hash, CURRENT_STATE_SYNC_VERSION, grove_version)?; + const SUBTREES_BATCH_SIZE: u32 = 2; // Small value for demo purposes + let mut session = target_db.start_snapshot_syncing(app_hash, SUBTREES_BATCH_SIZE, CURRENT_STATE_SYNC_VERSION, grove_version)?; let mut chunk_queue : VecDeque> = VecDeque::new(); @@ -280,14 +281,14 @@ fn sync_db_demo( while let Some(chunk_id) = chunk_queue.pop_front() { num_chunks += 1; let ops = source_db.fetch_chunk(chunk_id.as_slice(), None, CURRENT_STATE_SYNC_VERSION, grove_version)?; - - let more_chunks = session.apply_chunk(&target_db, chunk_id.as_slice(), &ops, CURRENT_STATE_SYNC_VERSION, grove_version)?; + let more_chunks = session.apply_chunk(chunk_id.as_slice(), &ops, CURRENT_STATE_SYNC_VERSION, grove_version)?; chunk_queue.extend(more_chunks); } println!("num_chunks: {}", num_chunks); if session.is_sync_completed() { - target_db.commit_session(session).expect("failed to commit session"); + println!("state_sync completed"); + target_db.commit_session(session)?; } let elapsed = start_time.elapsed(); println!("state_synced in {:.2?}", elapsed);