feat: intermediate tx committing in state sync #361

Merged · 5 commits · Mar 4, 2025
13 changes: 10 additions & 3 deletions grovedb/src/replication.rs
@@ -32,8 +32,12 @@

#[cfg(feature = "minimal")]
impl GroveDb {
-    pub fn start_syncing_session(&self, app_hash: [u8; 32]) -> Pin<Box<MultiStateSyncSession>> {
-        MultiStateSyncSession::new(self.start_transaction(), app_hash)
+    pub fn start_syncing_session(
+        &self,
+        app_hash: [u8; 32],
+        subtrees_batch_size: usize,
+    ) -> Pin<Box<MultiStateSyncSession>> {
+        MultiStateSyncSession::new(self.start_transaction(), app_hash, subtrees_batch_size)
}

pub fn commit_session(&self, session: Pin<Box<MultiStateSyncSession>>) -> Result<(), Error> {
@@ -149,7 +153,7 @@
})?;
for chunk_id in nested_chunk_ids
.is_empty()
            .then(|| Vec::new())
            // Clippy warning (GitHub Actions, replication.rs:156): redundant
            // closure — help: replace the closure with the function itself:
            // `Vec::new` (`#[warn(clippy::redundant_closure)]` on by default)
.into_iter()
.chain(nested_chunk_ids.into_iter())
{
@@ -184,6 +188,8 @@
///
/// # Parameters
/// - `app_hash`: The root hash of the application state to synchronize.
/// - `subtrees_batch_size`: Maximum number of subtrees that can be
/// processed in a single batch.
/// - `version`: The version of the state sync protocol to use.
/// - `grove_version`: The version of GroveDB being used.
///
@@ -216,6 +222,7 @@
pub fn start_snapshot_syncing(
&self,
app_hash: CryptoHash,
subtrees_batch_size: usize,
version: u16,
grove_version: &GroveVersion,
) -> Result<Pin<Box<MultiStateSyncSession>>, Error> {
@@ -235,7 +242,7 @@

let root_prefix = [0u8; 32];

-        let mut session = self.start_syncing_session(app_hash);
+        let mut session = self.start_syncing_session(app_hash, subtrees_batch_size);

session.add_subtree_sync_info(
self,
175 changes: 162 additions & 13 deletions grovedb/src/replication/state_sync_session.rs
@@ -2,6 +2,7 @@
collections::{BTreeMap, BTreeSet},
fmt,
marker::PhantomPinned,
mem,
pin::Pin,
};

@@ -167,19 +168,35 @@
/// This is placed last to ensure it is dropped last.
transaction: Transaction<'db>,

/// Maximum number of subtrees that can be processed in a single batch.
subtrees_batch_size: usize,

/// Counter tracking the number of subtrees processed in the current batch.
num_processed_subtrees_in_batch: usize,

/// Metadata for newly discovered subtrees that are pending processing.
pending_discovered_subtrees: Option<SubtreesMetadata>,

/// Marker to ensure this struct is not moved in memory.
_pin: PhantomPinned,
}

impl<'db> MultiStateSyncSession<'db> {
/// Initializes a new state sync session.
-    pub fn new(transaction: Transaction<'db>, app_hash: [u8; 32]) -> Pin<Box<Self>> {
+    pub fn new(
+        transaction: Transaction<'db>,
+        app_hash: [u8; 32],
+        subtrees_batch_size: usize,
+    ) -> Pin<Box<Self>> {
Box::pin(MultiStateSyncSession {
transaction,
current_prefixes: Default::default(),
processed_prefixes: Default::default(),
app_hash,
version: CURRENT_STATE_SYNC_VERSION,
subtrees_batch_size,
num_processed_subtrees_in_batch: 0,
pending_discovered_subtrees: None,
_pin: PhantomPinned,
})
}
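The interplay of the three new fields can be sketched in isolation. Below is a hypothetical miniature (stand-in names and types, not GroveDB's real API) of the batch accounting that `apply_chunk` and `resume_sync` perform: a counter capped by `subtrees_batch_size`, with newly discovered subtrees parked until the caller commits and resumes.

```rust
// Hypothetical miniature of the batch accounting (stand-in types; not
// GroveDB's real API). `subtrees_batch_size` caps how many subtrees are
// processed per transaction; at the cap, newly discovered subtrees are
// parked in `pending` until the caller commits and resumes.
struct BatchState {
    subtrees_batch_size: usize,
    num_processed_subtrees_in_batch: usize,
    pending: Option<Vec<String>>, // stand-in for SubtreesMetadata
}

impl BatchState {
    fn new(subtrees_batch_size: usize) -> Self {
        Self {
            subtrees_batch_size,
            num_processed_subtrees_in_batch: 0,
            pending: None,
        }
    }

    /// Returns true when the caller should commit the intermediate tx.
    fn on_subtree_processed(&mut self, discovered: Vec<String>) -> bool {
        self.num_processed_subtrees_in_batch += 1;
        if self.num_processed_subtrees_in_batch >= self.subtrees_batch_size {
            // Park discoveries; they are scheduled after the commit.
            self.pending.get_or_insert_with(Vec::new).extend(discovered);
            true
        } else {
            // Below the cap, the real code schedules `discovered` right away.
            false
        }
    }

    /// Mirrors `resume_sync`: hand back pending work, reset the counter.
    fn resume(&mut self) -> Option<Vec<String>> {
        self.num_processed_subtrees_in_batch = 0;
        self.pending.take()
    }
}
```

Note how `resume` both drains the pending set and resets the counter, matching the documented behavior of `resume_sync` further down in this diff.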
@@ -204,6 +221,14 @@
unsafe { Pin::into_inner_unchecked(self) }.transaction
}

    pub unsafe fn set_new_transaction(
        self: Pin<&mut Self>,
        new_tx: Transaction<'db>,
    ) -> Transaction<'db> {
        let this = self.get_unchecked_mut();
        mem::replace(&mut this.transaction, new_tx)
    }

Review comment (fominok, Collaborator): `pub unsafe` is too much, and no safety docs even

Reply (Author): @fominok reduced the scope of unsafe. Is there a better way to set a new transaction without consuming the session?

Clippy warning (GitHub Actions, state_sync_session.rs:224): unsafe function's docs are missing a `# Safety` section (`#[warn(clippy::missing_safety_doc)]` on by default).
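One way to address the review concern is to state the invariant the `unsafe` relies on. A minimal, self-contained sketch of the same pattern (a hypothetical `Session` type with a `String` standing in for the transaction, not the real `MultiStateSyncSession`) shows why replacing a field that is not structurally pinned is sound, and how the `unsafe` scope can be kept inside a safe method:

```rust
use std::{marker::PhantomPinned, mem, pin::Pin};

// Hypothetical stand-in for the session: `payload` plays the role of the
// transaction field, which nothing treats as structurally pinned.
struct Session {
    payload: String,
    _pin: PhantomPinned,
}

impl Session {
    fn new(payload: &str) -> Pin<Box<Self>> {
        Box::pin(Session {
            payload: payload.to_string(),
            _pin: PhantomPinned,
        })
    }

    /// Swap in a new payload and return the old one without consuming
    /// the pinned session.
    ///
    /// The `# Safety` contract a real method would document: only the
    /// `payload` field is moved, no pointer into it is held elsewhere,
    /// and the struct itself is never relocated.
    fn replace_payload(self: Pin<&mut Self>, new: String) -> String {
        // SAFETY: the field is not structurally pinned (see above), so
        // moving its contents does not invalidate the pinning guarantee.
        let this = unsafe { self.get_unchecked_mut() };
        mem::replace(&mut this.payload, new)
    }
}
```

Because the invariant is local to the method, the function itself can be safe here, which is one possible answer to the author's question about narrowing the `unsafe` scope.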

/// Adds synchronization information for a subtree into the current
/// synchronization session.
///
@@ -295,6 +320,20 @@
&mut unsafe { self.get_unchecked_mut() }.processed_prefixes
}

fn num_processed_subtrees_in_batch(self: Pin<&mut MultiStateSyncSession<'db>>) -> &mut usize {
        // SAFETY: no memory-sensitive assumptions are made about fields except the
        // `transaction`, so it is safe to modify them
&mut unsafe { self.get_unchecked_mut() }.num_processed_subtrees_in_batch
}

fn pending_discovered_subtrees(
self: Pin<&mut MultiStateSyncSession<'db>>,
) -> &mut Option<SubtreesMetadata> {
        // SAFETY: no memory-sensitive assumptions are made about fields except the
        // `transaction`, so it is safe to modify them
&mut unsafe { self.get_unchecked_mut() }.pending_discovered_subtrees
}

/// Applies a chunk during the state synchronization process.
/// This method should be called by ABCI when the `ApplySnapshotChunk`
/// method is invoked.
@@ -311,9 +350,10 @@
/// GroveDB version.
///
/// # Returns
-    /// - `Ok(Vec<Vec<u8>>)`: A vector of global chunk IDs (each represented as
-    ///   a vector of bytes) that can be fetched from sources for further
-    ///   synchronization.
+    /// - `Ok((Vec<Vec<u8>>, bool))`: A tuple of: a vector of global chunk IDs
+    ///   (each represented as a vector of bytes) that can be fetched from
+    ///   sources for further synchronization, and a boolean indicating whether
+    ///   the intermediate transaction should be committed.
/// - `Err(Error)`: An error if the chunk application fails or if the chunk
/// proof is invalid.
///
@@ -337,7 +377,7 @@
packed_global_chunks: &[u8],
version: u16,
grove_version: &GroveVersion,
-    ) -> Result<Vec<Vec<u8>>, Error> {
+    ) -> Result<(Vec<Vec<u8>>, bool), Error> {
// For now, only CURRENT_STATE_SYNC_VERSION is supported
if version != CURRENT_STATE_SYNC_VERSION {
return Err(Error::CorruptedData(
@@ -449,22 +489,131 @@

self.as_mut().processed_prefixes().insert(chunk_prefix);

*self.as_mut().num_processed_subtrees_in_batch() += 1;

let new_subtrees_metadata =
self.discover_new_subtrees_metadata(db, &completed_path, grove_version)?;

-                if let Ok(res) =
-                    self.prepare_sync_state_sessions(db, new_subtrees_metadata, grove_version)
-                {
-                    next_chunk_ids.extend(res);
-                    next_global_chunk_ids.extend(next_chunk_ids);
-                } else {
-                    return Err(Error::InternalError(
-                        "Unable to discover Subtrees".to_string(),
-                    ));
-                }
+                if self.num_processed_subtrees_in_batch >= self.subtrees_batch_size {
+                    match self.as_mut().pending_discovered_subtrees() {
+                        None => {
+                            *self.as_mut().pending_discovered_subtrees() =
+                                Some(new_subtrees_metadata);
+                        }
+                        Some(existing_subtrees_metadata) => {
+                            existing_subtrees_metadata
+                                .data
+                                .extend(new_subtrees_metadata.data);
+                        }
+                    }
+                } else {
+                    if let Ok(res) =
+                        self.prepare_sync_state_sessions(db, new_subtrees_metadata, grove_version)
+                    {
+                        next_chunk_ids.extend(res);
+                        next_global_chunk_ids.extend(next_chunk_ids);
+                    } else {
+                        return Err(Error::InternalError(
+                            "Unable to discover Subtrees".to_string(),
+                        ));
+                    }
+                }
            }
        }

Review comment (fominok, Collaborator): this kind of match is preferred as `if let Some(existing_subtrees_metadata) = .. {} else {}`

Clippy warning (GitHub Actions, state_sync_session.rs:509): this `else { if .. }` block can be collapsed — help: collapse the nested `if` into `} else if let Ok(res) = ...` (`#[warn(clippy::collapsible_else_if)]` on by default).

let mut res: Vec<Vec<u8>> = vec![];
for grouped_next_global_chunk_ids in next_global_chunk_ids.chunks(CONST_GROUP_PACKING_SIZE)
{
res.push(pack_nested_bytes(grouped_next_global_chunk_ids.to_vec()));
}
Ok((
res,
self.num_processed_subtrees_in_batch >= self.subtrees_batch_size,
))
}
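The final step above groups the accumulated chunk IDs into fixed-size packs before returning them alongside the commit flag. A toy version of that grouping (the group size is an assumed value, and `pack_nested` is a stand-in for GroveDB's `pack_nested_bytes` and `CONST_GROUP_PACKING_SIZE`) illustrates the shape of the output:

```rust
// Toy version of the grouping step. GROUP_PACKING_SIZE is an assumed
// value, and `pack_nested` a stand-in for GroveDB's `pack_nested_bytes`.
const GROUP_PACKING_SIZE: usize = 32;

// Length-prefix each ID so a receiver could split the pack again.
fn pack_nested(ids: Vec<Vec<u8>>) -> Vec<u8> {
    let mut out = Vec::new();
    for id in ids {
        out.push(id.len() as u8); // toy prefix: IDs under 256 bytes only
        out.extend(id);
    }
    out
}

// Mirrors the loop over `next_global_chunk_ids.chunks(CONST_GROUP_PACKING_SIZE)`.
fn group_chunk_ids(next_global_chunk_ids: Vec<Vec<u8>>) -> Vec<Vec<u8>> {
    next_global_chunk_ids
        .chunks(GROUP_PACKING_SIZE)
        .map(|group| pack_nested(group.to_vec()))
        .collect()
}
```

With 70 one-byte IDs and a group size of 32, this yields three packs (32, 32, and 6 IDs), i.e. the caller fetches at most one pack's worth of chunks per request.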

/// Resumes the state synchronization process.
///
/// This function attempts to continue the state synchronization by
/// processing any pending discovered subtrees and preparing new
/// synchronization sessions.
///
/// # Parameters
///
/// - `self`: A pinned, boxed reference to `MultiStateSyncSession`, ensuring
/// the struct remains in place during processing.
/// - `db`: A reference to the `GroveDb` instance used for querying the
/// database.
/// - `version`: The state synchronization protocol version being used.
/// - `grove_version`: A reference to the `GroveVersion`, which represents
/// the version of the underlying database structure.
///
/// # Returns
///
/// - `Ok(Vec<Vec<u8>>)` if the synchronization process successfully
/// discovers and prepares new subtree chunks.
/// - `Err(Error)`: An error if the synchronization process encounters
/// issues, such as:
/// - The provided `version` does not match the expected protocol version.
/// - No pending discovered subtrees are available.
/// - Failure to discover new subtrees in the database.
///
/// # Errors
///
/// - Returns `Error::CorruptedData` if an unsupported state sync protocol
/// version is provided or if there are no pending discovered subtrees.
/// - Returns `Error::InternalError` if it fails to discover new subtrees.
///
/// # Behavior
///
/// - Validates that the provided protocol `version` matches the expected
/// `CURRENT_STATE_SYNC_VERSION`.
/// - Retrieves and clears any pending discovered subtrees.
/// - Resets the processed subtrees counter for the current batch.
/// - Attempts to prepare new synchronization state sessions.
/// - Packs the discovered chunk IDs into nested byte vectors before
/// returning them.
pub fn resume_sync(
self: &mut Pin<Box<MultiStateSyncSession<'db>>>,
db: &'db GroveDb,
version: u16,
grove_version: &GroveVersion,
) -> Result<Vec<Vec<u8>>, Error> {
// For now, only CURRENT_STATE_SYNC_VERSION is supported
if version != CURRENT_STATE_SYNC_VERSION {
return Err(Error::CorruptedData(
"Unsupported state sync protocol version".to_string(),
));
}
if version != self.version {
return Err(Error::CorruptedData(
"Unsupported state sync protocol version".to_string(),
));
}

let new_subtrees_metadata =
self.as_mut()
.pending_discovered_subtrees()
.take()
                .ok_or(Error::CorruptedData(
                    "No pending discovered subtrees to resume".to_string(),
                ))?;
*self.as_mut().num_processed_subtrees_in_batch() = 0;

let mut next_chunk_ids = vec![];
let mut next_global_chunk_ids: Vec<Vec<u8>> = vec![];

if let Ok(discovered_chunk_ids) =
self.prepare_sync_state_sessions(db, new_subtrees_metadata, grove_version)
{
next_chunk_ids.extend(discovered_chunk_ids);
next_global_chunk_ids.extend(next_chunk_ids);
} else {
return Err(Error::InternalError(
"Unable to discover Subtrees".to_string(),
));
}

let mut res: Vec<Vec<u8>> = vec![];
for grouped_next_global_chunk_ids in next_global_chunk_ids.chunks(CONST_GROUP_PACKING_SIZE)
{
30 changes: 22 additions & 8 deletions tutorials/src/bin/replication.rs
@@ -269,25 +269,39 @@ fn sync_db_demo(
) -> Result<(), grovedb::Error> {
let start_time = Instant::now();
let app_hash = source_db.root_hash(None, grove_version).value.unwrap();
-    let mut session = target_db.start_snapshot_syncing(app_hash, CURRENT_STATE_SYNC_VERSION, grove_version)?;
+    let mut session = target_db.start_snapshot_syncing(app_hash, 2, CURRENT_STATE_SYNC_VERSION, grove_version)?;

let mut chunk_queue : VecDeque<Vec<u8>> = VecDeque::new();

// The very first chunk to fetch is always identified by the root app_hash
chunk_queue.push_back(app_hash.to_vec());

let mut num_chunks = 0;
-    while let Some(chunk_id) = chunk_queue.pop_front() {
-        num_chunks += 1;
-        let ops = source_db.fetch_chunk(chunk_id.as_slice(), None, CURRENT_STATE_SYNC_VERSION, grove_version)?;
-
-        let more_chunks = session.apply_chunk(&target_db, chunk_id.as_slice(), &ops, CURRENT_STATE_SYNC_VERSION, grove_version)?;
-        chunk_queue.extend(more_chunks);
+    let mut intermediate_commit = false;
+    while chunk_queue.front().is_some() || intermediate_commit {
+        if let Some(chunk_id) = chunk_queue.pop_front() {
+            num_chunks += 1;
+            let ops = source_db.fetch_chunk(chunk_id.as_slice(), None, CURRENT_STATE_SYNC_VERSION, grove_version)?;
+            let (more_chunks, need_to_commit) = session.apply_chunk(&target_db, chunk_id.as_slice(), &ops, CURRENT_STATE_SYNC_VERSION, grove_version)?;
+            if need_to_commit {
+                intermediate_commit = true;
+            }
+            chunk_queue.extend(more_chunks);
+        } else {
+            let old_tx = unsafe {
+                session.as_mut().set_new_transaction(target_db.start_transaction())
+            };
+            target_db.commit_transaction(old_tx).value?;
+            intermediate_commit = false;
+            let more_new_chunks = session.resume_sync(&target_db, CURRENT_STATE_SYNC_VERSION, grove_version)?;
+            chunk_queue.extend(more_new_chunks);
+        }
}
println!("num_chunks: {}", num_chunks);

if session.is_sync_completed() {
-        target_db.commit_session(session).expect("failed to commit session");
+        println!("is_sync_completed()");
+        target_db.commit_session(session)?;
}
let elapsed = start_time.elapsed();
println!("state_synced in {:.2?}", elapsed);