feat: intermediate tx committing in state sync #361
Conversation
Walkthrough: This pull request updates the synchronization process in GroveDB.
```diff
@@ -204,6 +221,14 @@ impl<'db> MultiStateSyncSession<'db> {
         unsafe { Pin::into_inner_unchecked(self) }.transaction
     }

+    pub unsafe fn set_new_transaction(
```
`pub unsafe` is too much, and no safety docs even
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@fominok reduced the scope of unsafe.
Is there a better way to set a new transaction without consuming the session?
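One pattern that answers this question is to swap the transaction field in place with `mem::replace` through a pinned mutable reference, returning the old value instead of consuming the session. Below is a minimal, self-contained sketch of that pattern; the `Transaction` and `Session` types are toy stand-ins, not GroveDB's real API:

```rust
// Hypothetical sketch: replacing a field of a pinned struct without
// consuming it. `Transaction` and `Session` are illustrative only.
use std::marker::PhantomPinned;
use std::mem;
use std::pin::Pin;

struct Transaction(u32);

struct Session {
    transaction: Transaction,
    // Marker making the struct address-sensitive, as in the real code.
    _pin: PhantomPinned,
}

impl Session {
    fn new(id: u32) -> Pin<Box<Self>> {
        Box::pin(Session {
            transaction: Transaction(id),
            _pin: PhantomPinned,
        })
    }

    /// Replaces the transaction in place and returns the old one.
    /// SAFETY contract for callers: no borrows of the old transaction
    /// may still be live when this is called.
    unsafe fn set_new_transaction(self: Pin<&mut Self>, new_tx: Transaction) -> Transaction {
        // Swapping a field does not move the struct itself, so pinning is
        // not violated; `unsafe` is only needed to get `&mut` through Pin.
        let this = self.get_unchecked_mut();
        mem::replace(&mut this.transaction, new_tx)
    }
}

fn main() {
    let mut session = Session::new(1);
    let old = unsafe { session.as_mut().set_new_transaction(Transaction(2)) };
    assert_eq!(old.0, 1);
}
```

The session stays pinned and usable afterwards; only the caller must uphold the documented invariant that no references to the old transaction remain.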
Actionable comments posted: 2

🧹 Nitpick comments (3)

tutorials/src/bin/replication.rs (2)

272-272: Use a configurable batch size instead of a hardcoded value

The value "2" may be too small for larger sync operations or could be suboptimal in other scenarios. Consider exposing this as a configuration parameter to allow better tuning.

303-303: Enhance logging messages

A single "is_sync_completed()" message may not be sufficiently descriptive. Consider including information about elapsed time or next steps, if applicable.

grovedb/src/replication/state_sync_session.rs (1)

224-230: Exercise caution using `unsafe`

The `set_new_transaction` method requires an `unsafe` block. Double-check that no references to the old transaction remain active. If possible, explain the invariants in comments for future maintainers.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)

- grovedb/src/replication.rs (4 hunks)
- grovedb/src/replication/state_sync_session.rs (7 hunks)
- tutorials/src/bin/replication.rs (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: Tests
- GitHub Check: Linting
- GitHub Check: Compilation errors
- GitHub Check: Code Coverage
🔇 Additional comments (10)
tutorials/src/bin/replication.rs (2)

280-289: Intermediate commit logic is conceptually sound

The usage of `intermediate_commit` helps avoid large monolithic transactions; this is beneficial for memory and stability. Verify that partial commits won't cause unexpected inconsistencies if a crash/error occurs immediately after.
291-297: Check usage of unsafe block for transaction replacement

Replacing the transaction via `unsafe` is likely required, but carefully confirm that no references to the old transaction remain in use. If possible, provide a safer abstraction or thoroughly document why unsafe is necessary here.

grovedb/src/replication.rs (3)
35-41: Validate `subtrees_batch_size` before usage

When starting a syncing session, check if the user-provided `subtrees_batch_size` is zero or extremely large. This might lead to edge cases or potential panics in subsequent code.
191-192: Documentation looks consistent

The new documentation for `subtrees_batch_size` properly reflects its usage. Nice job keeping comments updated.
223-225: Ensure subtrees_batch_size behaves as intended

The updated method signature now includes `subtrees_batch_size`. Consider adding unit tests to confirm that subtrees are correctly batched and that boundary cases (like 0 or extremely large sizes) are handled.

Also applies to: 245-245
grovedb/src/replication/state_sync_session.rs (5)

5-5: Confirm necessity of `std::mem` import

This import is presumably for `mem::replace` in `set_new_transaction`. Ensure there are no inadvertent side effects or unintentional usage.
171-179: Batch-related fields introduced

The fields `subtrees_batch_size`, `num_processed_subtrees_in_batch`, and `pending_discovered_subtrees` effectively manage partial synchronization. Be sure to prevent negative values (or zero) for `subtrees_batch_size` from causing infinite loops or skipping commits.
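The zero-value concern above can be sketched as a small guard that clamps the batch size before it is used in the "batch full" check; the function names here are illustrative, not GroveDB's API:

```rust
// Hypothetical guard: clamp a zero batch size to 1 so the "batch full"
// check cannot fire on every subtree and force a commit per chunk.
fn normalized_batch_size(requested: usize) -> usize {
    if requested == 0 { 1 } else { requested }
}

// Decide whether an intermediate commit is due for the current batch.
fn should_commit(num_processed_in_batch: usize, batch_size: usize) -> bool {
    num_processed_in_batch >= normalized_batch_size(batch_size)
}

fn main() {
    assert!(should_commit(1, 0));   // zero clamps to 1, commit after one subtree
    assert!(!should_commit(0, 0));  // nothing processed yet, no commit
    assert!(!should_commit(1, 2));  // batch not yet full
    assert!(should_commit(2, 2));   // batch full
}
```

Clamping in the constructor (rather than at every check site) would keep the invariant in one place.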
185-190: Constructor correctness

The new constructor properly sets up the batch fields, but consider exposing `version` as a parameter in case multiple protocols are used in the future. For now, it's fine that it defaults to `CURRENT_STATE_SYNC_VERSION`.

Also applies to: 197-200
323-327: Field access is straightforward and safe

These helper methods for returning mutable references to session fields are simple and used internally. They appear correct as long as the unsafe block is limited to safe usage of these references.

Also applies to: 329-335
353-357: Return value documentation is aligned

The docstring and function signature now clearly convey that `apply_chunk` returns a boolean indicating the need for an intermediate commit. This is helpful for orchestrating partial commits.

Also applies to: 380-380
Actionable comments posted: 0
🧹 Nitpick comments (5)
tutorials/src/bin/replication.rs (5)
272-272: Replace hardcoded batch size with a named constant

The hardcoded value `2` for the subtrees batch size should be replaced with a named constant to improve code readability and maintainability.

```diff
-    let mut session = target_db.start_snapshot_syncing(app_hash, 2, CURRENT_STATE_SYNC_VERSION, grove_version)?;
+    const SUBTREES_BATCH_SIZE: u32 = 2; // Small value for demo purposes
+    let mut session = target_db.start_snapshot_syncing(app_hash, SUBTREES_BATCH_SIZE, CURRENT_STATE_SYNC_VERSION, grove_version)?;
```
283-294: Add error handling for intermediate transaction operations

While the code commits the old transaction, it doesn't handle potential errors that might occur during this process or during the resumption of sync. Consider adding specific error handling for these operations.

```diff
-            if intermediate_commit {
-                let old_tx = unsafe {
-                    session.as_mut().set_new_transaction(target_db.start_transaction())
-                };
-                target_db.commit_transaction(old_tx).value?;
-                let more_new_chunks = session.resume_sync(&target_db, CURRENT_STATE_SYNC_VERSION, grove_version)?;
-                chunk_queue.extend(more_new_chunks);
-            }
-            else {
-                chunk_queue.extend(more_chunks);
-            }
+            if intermediate_commit {
+                // Perform intermediate commit and resume synchronization
+                let old_tx = unsafe {
+                    session.as_mut().set_new_transaction(target_db.start_transaction())
+                };
+
+                match target_db.commit_transaction(old_tx) {
+                    Ok(result) => {
+                        if let Err(e) = result {
+                            println!("Error committing intermediate transaction: {:?}", e);
+                            return Err(e);
+                        }
+
+                        // Resume sync after successful commit
+                        match session.resume_sync(&target_db, CURRENT_STATE_SYNC_VERSION, grove_version) {
+                            Ok(more_new_chunks) => chunk_queue.extend(more_new_chunks),
+                            Err(e) => {
+                                println!("Error resuming sync after intermediate commit: {:?}", e);
+                                return Err(e);
+                            }
+                        }
+                    },
+                    Err(e) => {
+                        println!("Failed to commit intermediate transaction: {:?}", e);
+                        return Err(e);
+                    }
+                }
+            } else {
+                chunk_queue.extend(more_chunks);
+            }
```
283-294: Add logging and metrics for intermediate commits

For production use, it would be beneficial to add logging and metrics to track the number of intermediate commits performed during synchronization. This would help monitor the effectiveness of this solution and identify potential issues.

```diff
+    let mut intermediate_commit_count = 0;
 ...
     if intermediate_commit {
+        intermediate_commit_count += 1;
+        println!("Performing intermediate commit #{}", intermediate_commit_count);
         let old_tx = unsafe {
             session.as_mut().set_new_transaction(target_db.start_transaction())
         };
 ...
     }
+    println!("Total intermediate commits: {}", intermediate_commit_count);
     println!("num_chunks: {}", num_chunks);
```
299-301: Improve error handling in commit session

The error handling for committing the session has been simplified, but consider adding more detailed error messages to help debugging if a failure occurs.

```diff
     if session.is_sync_completed() {
         println!("state_sync completed");
-        target_db.commit_session(session)?;
+        match target_db.commit_session(session) {
+            Ok(_) => println!("Successfully committed sync session"),
+            Err(e) => {
+                println!("Failed to commit sync session: {:?}", e);
+                return Err(e);
+            }
+        }
     }
```
265-307: Add comments explaining the intermediate commit mechanism

The tutorial should include comments that explain the purpose and behavior of the intermediate commit mechanism, especially since this is a demonstration of a new feature.

```diff
 fn sync_db_demo(
     source_db: &GroveDb,
     target_db: &GroveDb,
     grove_version: &GroveVersion,
 ) -> Result<(), grovedb::Error> {
+    // This function demonstrates state synchronization with intermediate commits
+    // to reduce memory usage and minimize the risk of failures during large state transfers.
+    //
+    // The synchronization process works as follows:
+    // 1. Start a snapshot syncing session with a specified batch size
+    // 2. Process chunks one by one
+    // 3. When a batch is complete (indicated by intermediate_commit=true):
+    //    - Commit the current transaction
+    //    - Start a new transaction
+    //    - Resume synchronization
+    // 4. When all chunks are processed, commit the final session
```
🔇 Additional comments (1)
tutorials/src/bin/replication.rs (1)
283-294: ❓ Verification inconclusive

Review the unsafe block and transaction handling

The current implementation uses an unsafe block to modify the session state and set a new transaction. The use of `unsafe` is a potential source of bugs if not properly handled.
🏁 Script executed:

```shell
#!/bin/bash
# Check the implementation of the set_new_transaction method
# to understand why unsafe is needed and ensure it's used correctly
rg -A 10 "fn set_new_transaction" --type rust
```

Length of output: 987
Confirm Correct Use of Unsafe in Transaction Handling

The call to `set_new_transaction` in the unsafe block (located in grovedb/src/replication/state_sync_session.rs) is designed to replace the current transaction while dealing with a pinned reference. Please verify that:

- Using `unsafe` via `get_unchecked_mut()` is fully justified in this context and that all required invariants for the pinned state are maintained.
- The transaction lifecycle (starting a new transaction before the old one is committed) remains sound and free from race conditions or inconsistent states.

A brief inline comment documenting these invariants and the rationale for using `unsafe` would help future maintainers understand the design intent.
Actionable comments posted: 0
♻️ Duplicate comments (1)

grovedb/src/replication/state_sync_session.rs (1)

170-172: ⚠️ Potential issue

Validate that subtrees_batch_size is not zero

The `subtrees_batch_size` field is used for determining when to commit intermediate transactions, but there's no validation to ensure it's not zero. A zero value would cause issues with the condition on line 534 always being true, leading to excessive transaction commits.

Add validation in the constructor to prevent zero values:

```rust
pub fn new(db: &'db GroveDb, app_hash: [u8; 32], subtrees_batch_size: usize) -> Pin<Box<Self>> {
    // Ensure subtrees_batch_size is not zero to prevent infinite transaction commits
    let subtrees_batch_size = if subtrees_batch_size == 0 { 1 } else { subtrees_batch_size };
    Box::pin(MultiStateSyncSession {
        db,
        transaction: db.start_transaction(),
        current_prefixes: Default::default(),
        processed_prefixes: Default::default(),
        app_hash,
        version: CURRENT_STATE_SYNC_VERSION,
        subtrees_batch_size,
        num_processed_subtrees_in_batch: 0,
        pending_discovered_subtrees: None,
        _pin: PhantomPinned,
    })
}
```

This aligns with the comment in the past review addressing the zero-check issue.
🧹 Nitpick comments (4)
grovedb/src/replication/state_sync_session.rs (4)
229-235: Improve safety documentation for set_new_transaction method

The safety comment is brief and doesn't fully explain what makes this operation unsafe and what invariants must be maintained. The method replaces the transaction without proper safety guarantees beyond checking if current_prefixes is empty.

Enhance the safety documentation to clarify the invariants and constraints:

```rust
// SAFETY: This is unsafe as it requires `self.current_prefixes` to be empty
// and no references to the transaction should exist elsewhere. The operation
// commits the current transaction and creates a new one, potentially
// invalidating any existing references to the old transaction.
// Calling this method when references to the transaction still exist could
// lead to use-after-free bugs and undefined behavior.
unsafe fn set_new_transaction(
    self: &mut Pin<Box<MultiStateSyncSession<'db>>>,
) -> Result<(), Error> {
    let this = Pin::as_mut(self).get_unchecked_mut();
    let old_tx = mem::replace(&mut this.transaction, this.db.start_transaction());
    self.db.commit_transaction(old_tx).unwrap()
}
```
501-524: Refactor batch processing logic for better clarity

The conditional block for determining whether to store discovered subtrees as pending or process them immediately is complex and nested. Restructuring this logic would improve readability and maintainability.

```diff
-        if self.num_processed_subtrees_in_batch >= self.subtrees_batch_size {
-            match self.as_mut().pending_discovered_subtrees() {
-                None => {
-                    *self.as_mut().pending_discovered_subtrees() =
-                        Some(new_subtrees_metadata);
-                }
-                Some(existing_subtrees_metadata) => {
-                    existing_subtrees_metadata
-                        .data
-                        .extend(new_subtrees_metadata.data);
-                }
-            }
-        } else {
-            if let Ok(res) =
-                self.prepare_sync_state_sessions(new_subtrees_metadata, grove_version)
-            {
-                next_chunk_ids.extend(res);
-                next_global_chunk_ids.extend(next_chunk_ids);
-            } else {
-                return Err(Error::InternalError(
-                    "Unable to discover Subtrees".to_string(),
-                ));
-            }
-        }
+        // Check if we've reached the batch size limit
+        if self.num_processed_subtrees_in_batch >= self.subtrees_batch_size {
+            // Store subtrees for later processing after intermediate commit
+            if let Some(existing_subtrees_metadata) = self.as_mut().pending_discovered_subtrees() {
+                // Extend existing metadata with the new discoveries
+                existing_subtrees_metadata
+                    .data
+                    .extend(new_subtrees_metadata.data);
+            } else {
+                // No existing metadata, create new
+                *self.as_mut().pending_discovered_subtrees() = Some(new_subtrees_metadata);
+            }
+            return Ok(next_global_chunk_ids);
+        }
+
+        // Process newly discovered subtrees immediately
+        match self.prepare_sync_state_sessions(new_subtrees_metadata, grove_version) {
+            Ok(res) => {
+                next_chunk_ids.extend(res);
+                next_global_chunk_ids.extend(next_chunk_ids);
+            }
+            Err(_) => {
+                return Err(Error::InternalError(
+                    "Unable to discover Subtrees".to_string(),
+                ));
+            }
+        }
```
544-544: Consider adding transaction metrics for monitoring

The implementation now supports intermediate transaction committing to reduce memory usage, but lacks monitoring capabilities to track transaction sizes, frequency, and performance.

Adding metrics collection would help validate the effectiveness of this change and assist with tuning the `subtrees_batch_size` parameter:

```rust
/// Metrics for tracking transaction behavior during state sync
struct StateSyncMetrics {
    /// Total number of transactions committed
    transactions_committed: usize,
    /// Average number of subtrees per transaction
    avg_subtrees_per_transaction: f64,
    /// Maximum number of subtrees seen in a single transaction
    max_subtrees_in_transaction: usize,
}

impl StateSyncMetrics {
    fn new() -> Self {
        StateSyncMetrics {
            transactions_committed: 0,
            avg_subtrees_per_transaction: 0.0,
            max_subtrees_in_transaction: 0,
        }
    }

    fn record_transaction(&mut self, subtrees_count: usize) {
        self.transactions_committed += 1;
        self.max_subtrees_in_transaction = self.max_subtrees_in_transaction.max(subtrees_count);

        // Update running average
        self.avg_subtrees_per_transaction =
            ((self.avg_subtrees_per_transaction * (self.transactions_committed - 1) as f64)
                + subtrees_count as f64) / self.transactions_committed as f64;
    }
}
```

This would integrate with the `set_new_transaction` method to record metrics when transactions are committed.
187-202: Store the original subtrees_batch_size configuration for future reference

The code doesn't retain the original `subtrees_batch_size` parameter, making it difficult to reset or adjust the batch size dynamically if needed during long-running operations.

```diff
 pub struct MultiStateSyncSession<'db> {
     /// GroveDb instance to apply changes to
     db: &'db GroveDb,

     /// Map of currently processing subtrees.
     /// Keys are `SubtreePrefix` (path digests), and values are
     /// `SubtreeStateSyncInfo` for each subtree.
     current_prefixes: BTreeMap<SubtreePrefix, SubtreeStateSyncInfo<'db>>,

     /// Set of processed prefixes, represented as `SubtreePrefix` (path
     /// digests).
     processed_prefixes: BTreeSet<SubtreePrefix>,

     /// Root application hash (`app_hash`).
     app_hash: [u8; 32],

     /// Version of the state synchronization protocol.
     pub(crate) version: u16,

     /// Maximum number of subtrees that can be processed in a single batch.
     subtrees_batch_size: usize,
+
+    /// The original configured batch size, useful for resetting after adjustments
+    original_subtrees_batch_size: usize,

     /// Counter tracking the number of subtrees processed in the current batch.
     num_processed_subtrees_in_batch: usize,

     /// Metadata for newly discovered subtrees that are pending processing.
     pending_discovered_subtrees: Option<SubtreesMetadata>,

     /// Transaction used for the synchronization process.
     /// This is placed last to ensure it is dropped last.
     transaction: Transaction<'db>,

     /// Marker to ensure this struct is not moved in memory.
     _pin: PhantomPinned,
 }

 impl<'db> MultiStateSyncSession<'db> {
     /// Initializes a new state sync session.
     pub fn new(db: &'db GroveDb, app_hash: [u8; 32], subtrees_batch_size: usize) -> Pin<Box<Self>> {
         // Ensure subtrees_batch_size is not zero to prevent infinite transaction commits
         let subtrees_batch_size = if subtrees_batch_size == 0 { 1 } else { subtrees_batch_size };
         Box::pin(MultiStateSyncSession {
             db,
             transaction: db.start_transaction(),
             current_prefixes: Default::default(),
             processed_prefixes: Default::default(),
             app_hash,
             version: CURRENT_STATE_SYNC_VERSION,
             subtrees_batch_size,
+            original_subtrees_batch_size: subtrees_batch_size,
             num_processed_subtrees_in_batch: 0,
             pending_discovered_subtrees: None,
             _pin: PhantomPinned,
         })
     }

+    /// Resets the batch size to the original configured value
+    pub fn reset_batch_size(&mut self) {
+        self.subtrees_batch_size = self.original_subtrees_batch_size;
+    }
+
+    /// Adjusts the batch size dynamically based on system conditions
+    pub fn adjust_batch_size(&mut self, new_size: usize) {
+        self.subtrees_batch_size = if new_size == 0 { 1 } else { new_size };
+    }
```
🔇 Additional comments (5)
grovedb/src/replication/state_sync_session.rs (5)
599-610: Improve error message clarity in resume_sync method

The error message when no pending subtrees are available or when the protocol version is unsupported is not specific enough, which might make debugging more difficult.

```diff
     if version != self.version {
         return Err(Error::CorruptedData(
-            "Unsupported state sync protocol version".to_string(),
+            format!("Protocol version mismatch: expected {}, got {}", self.version, version),
         ));
     }

     let new_subtrees_metadata = self
         .as_mut()
         .pending_discovered_subtrees()
         .take()
         .ok_or(Error::CorruptedData(
-            "No pending subtrees available for resume_sync".to_string(),
+            "No pending subtrees available for resume_sync. This may indicate an incorrect sync flow.".to_string(),
         ))?;
```

This aligns with the comment from a previous review about clarifying error messages.
222-226: Good transaction handling approach

The `commit` method is cleanly implemented, properly using the `Pin` API to safely consume the session and commit the transaction. The unsafe block is used appropriately to convert from a pinned box to a regular value.
173-178: Well-designed fields for batch tracking

The addition of the `num_processed_subtrees_in_batch` and `pending_discovered_subtrees` fields effectively supports the intermediate transaction committing functionality, allowing the system to track progress and resume synchronization after commits.
612-625: Well-implemented resume_sync function

The `resume_sync` method correctly handles the continuation of synchronization after intermediate commits. It validates the protocol version, retrieves and processes pending subtrees, and resets the batch counter.
534-543: ❓ Verification inconclusive

Ensure transaction safety with comprehensive checking

The transaction replacement logic checks only if `current_prefixes` is empty, but doesn't verify or handle other potential references to the transaction. Additionally, there's no error handling for the transaction commit.

🏁 Script executed:

```shell
#!/bin/bash
# Find all places where transaction or its references are used
echo "Searching for transaction references in the codebase..."
rg -A 2 -B 2 "transaction" --glob "*.rs" | grep -v "start_transaction" | head -n 20

# Check specifically for uses of the transaction field
echo "Looking for direct access to the transaction field..."
ast-grep --pattern 'self.transaction' | head -n 10
```

Length of output: 3193
Transaction Safety: Verify Additional References & Error Handling

The segment in grovedb/src/replication/state_sync_session.rs currently relies solely on checking that `self.current_prefixes` is empty before replacing the transaction. Our repository search did not reveal additional transaction references within this module; references to `self.transaction` were only found in other parts of the code (e.g., in the RocksDB storage context). This suggests that the design intentionally limits the lifetime of transaction references within this context.

That said, please review the following:

- Reference Assurance: Confirm that no other indirect references to the transaction exist in this workflow beyond `current_prefixes`. If any are used elsewhere (even in related modules), ensure they are properly dropped or managed before calling `set_new_transaction`.
- Error Handling: Currently, any error from `unsafe { self.set_new_transaction()?; }` propagates upward using the `?` operator. Consider if additional context, logging, or a recovery/rollback mechanism is needed when the transaction commit (or replacement) fails.

If the design intentionally restricts transaction references to what's tracked by `current_prefixes` and upstream error handling is sufficient, then this code can remain as is. However, it's worthwhile to reassess both the reference tracking and the error-reporting strategy during the next review.
```rust
/// - `Ok(Vec<Vec<u8>>)`: A vector of global chunk IDs (each represented as
///   a vector of bytes) that can be fetched from sources for further
///   synchronization.
/// - `Ok(Vec<Vec<u8>>)`: A tuple of: vector of global chunk IDs (each
```
there is no tuple anymore
```rust
if let Ok(res) =
    self.prepare_sync_state_sessions(db, new_subtrees_metadata, grove_version)
if self.num_processed_subtrees_in_batch >= self.subtrees_batch_size {
    match self.as_mut().pending_discovered_subtrees() {
```
this kind of match is preferred as `if let Some(existing_subtrees_metadata) = .. {} else {}`
Issue being fixed or feature implemented
Because tx sessions on GroveDB are held in memory, single-tx state sync consumes excessive memory and carries a very high failure risk (connection lost, timeout, crash, etc.), especially for mainnet syncing.
What was done?
This PR allows state sync to be conducted with intermediate transaction commits instead of a single one.
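The batched flow this PR introduces (apply chunks, commit the transaction when a batch fills, then resume) can be sketched with toy types; none of these names are GroveDB's actual API, and the counters stand in for real transactions:

```rust
// Minimal sketch of intermediate-commit batching. `MockDb` is a toy stand-in;
// `apply_chunk` returning true corresponds to the session signaling that an
// intermediate commit is due.
struct MockDb {
    committed_batches: usize,
}

impl MockDb {
    fn apply_chunk(&mut self, processed: &mut usize, batch_size: usize) -> bool {
        *processed += 1;
        *processed >= batch_size // true => caller should commit now
    }

    fn intermediate_commit(&mut self, processed: &mut usize) {
        self.committed_batches += 1; // commit current tx, start a fresh one
        *processed = 0;              // reset the per-batch counter
    }
}

fn sync(chunks: usize, batch_size: usize) -> usize {
    let mut db = MockDb { committed_batches: 0 };
    let mut processed = 0;
    for _ in 0..chunks {
        if db.apply_chunk(&mut processed, batch_size) {
            db.intermediate_commit(&mut processed);
        }
    }
    if processed > 0 {
        db.committed_batches += 1; // final commit for the tail batch
    }
    db.committed_batches
}

fn main() {
    // 10 chunks with batch size 2: five small commits instead of one huge tx.
    assert_eq!(sync(10, 2), 5);
    // 5 chunks with batch size 2: two full batches plus one tail commit.
    assert_eq!(sync(5, 2), 3);
}
```

Each commit bounds the memory held by the in-flight transaction, and a crash loses at most one batch of work instead of the whole sync.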
How Has This Been Tested?
Replication tutorial
Breaking Changes
Checklist:
For repository code-owners and collaborators only
Summary by CodeRabbit
New Features
Documentation