
feat: intermediate tx committing in state sync #361

Merged
merged 5 commits into develop from feat/intermediate_transaction_commits on Mar 4, 2025

Conversation

ogabrielides
Collaborator

@ogabrielides ogabrielides commented Mar 3, 2025

Issue being fixed or feature implemented

Because tx sessions in GroveDB are held in memory, single-tx state sync consumes excessive memory and carries a very high failure risk (connection lost, timeout, crash, etc.), especially for mainnet syncing.

What was done?

This PR allows state sync to be conducted with intermediate transaction commits instead of a single one.
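The overall loop shape this enables can be sketched with simplified stand-in types (MockSession and sync are illustrative, not GroveDB's API): chunks are applied one at a time, and whenever a batch of subtrees fills up, the current transaction is committed and a fresh one takes its place before syncing resumes.

```rust
// Hedged sketch of a sync driver with intermediate commits.
// MockSession is a toy stand-in for MultiStateSyncSession; the real
// apply_chunk also returns newly discovered chunk IDs.
use std::collections::VecDeque;

struct MockSession {
    batch_size: usize,
    processed_in_batch: usize,
    commits: usize,
}

impl MockSession {
    /// Apply one chunk; report whether an intermediate commit is due.
    fn apply_chunk(&mut self) -> bool {
        self.processed_in_batch += 1;
        self.processed_in_batch >= self.batch_size
    }

    /// In the real code: swap in a new transaction, commit the old one,
    /// then resume_sync() re-schedules any pending discovered subtrees.
    fn commit_and_resume(&mut self) {
        self.commits += 1;
        self.processed_in_batch = 0;
    }
}

/// Drive a sync of `total_chunks` chunks; return how many intermediate
/// commits were performed along the way.
fn sync(total_chunks: usize, batch_size: usize) -> usize {
    let mut session = MockSession { batch_size, processed_in_batch: 0, commits: 0 };
    let mut queue: VecDeque<usize> = (0..total_chunks).collect();
    while let Some(_chunk) = queue.pop_front() {
        if session.apply_chunk() {
            session.commit_and_resume();
        }
    }
    session.commits
}

fn main() {
    // 7 chunks with batch size 2 → intermediate commits after chunks 2, 4, 6.
    assert_eq!(sync(7, 2), 3);
    // If the batch never fills, no intermediate commit happens.
    assert_eq!(sync(4, 10), 0);
}
```

The point of the shape is that memory held by the in-flight transaction is bounded by the batch size rather than by the full state.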

How Has This Been Tested?

Replication tutorial

Breaking Changes

Checklist:

  • I have performed a self-review of my own code
  • I have commented my code, particularly in hard-to-understand areas
  • I have added or updated relevant unit/integration/functional/e2e tests
  • I have made corresponding changes to the documentation

For repository code-owners and collaborators only

  • I have assigned this pull request to a milestone

Summary by CodeRabbit

  • New Features

    • Enhanced the replication synchronization process with flexible batch processing for smoother and more efficient state updates.
    • Improved workflow for handling intermediate commits, ensuring robust and uninterrupted synchronization.
    • Introduced a new method to resume synchronization, allowing for better management of discovered subtrees.
    • Added a constant for subtree batch size in the synchronization demo, refining session initiation.
  • Documentation

    • Updated guides to clearly explain the new synchronization options and improved processing flow.

Contributor

coderabbitai bot commented Mar 3, 2025

Walkthrough

This pull request updates the synchronization process in GroveDB. The changes introduce a new subtrees_batch_size parameter to the start_syncing_session and start_snapshot_syncing methods, allowing control over the number of subtrees processed per batch. Modifications in the MultiStateSyncSession struct add new fields and methods for batch tracking and pending subtree management, and update the apply_chunk method to indicate when an intermediate commit is needed. The synchronization demo in the tutorials is updated to reflect these changes in the control flow.
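The batch-tracking decision described here can be reduced to a single threshold check. The sketch below uses the PR's field names (subtrees_batch_size, num_processed_subtrees_in_batch) but simplified types; in the real apply_chunk, "defer" means parking discoveries in pending_discovered_subtrees until resume_sync runs.

```rust
// Minimal sketch of the batch-threshold decision in apply_chunk.
// Types here are illustrative stand-ins, not GroveDB's.

/// What the session signals once a chunk has been applied.
#[derive(Debug, PartialEq)]
enum BatchAction {
    /// Keep processing: schedule newly discovered subtrees now.
    ProcessNow,
    /// Batch limit reached: park discoveries as pending and ask the
    /// caller to perform an intermediate commit.
    DeferAndCommit,
}

fn batch_action(
    num_processed_subtrees_in_batch: usize,
    subtrees_batch_size: usize,
) -> BatchAction {
    if num_processed_subtrees_in_batch >= subtrees_batch_size {
        BatchAction::DeferAndCommit
    } else {
        BatchAction::ProcessNow
    }
}

fn main() {
    // With a batch size of 2, the counter reaching 2 triggers a commit.
    assert_eq!(batch_action(0, 2), BatchAction::ProcessNow);
    assert_eq!(batch_action(2, 2), BatchAction::DeferAndCommit);
}
```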

Changes

File(s) Change Summary
grovedb/src/replication.rs Updated start_syncing_session & start_snapshot_syncing signatures to include subtrees_batch_size; documentation updated to specify new parameter.
grovedb/src/replication/state_sync_session.rs Updated MultiStateSyncSession constructor to accept subtrees_batch_size; added new fields (num_processed_subtrees_in_batch, pending_discovered_subtrees); introduced unsafe set_new_transaction; added commit and resume_sync methods; modified apply_chunk to return a tuple with commit flag.
tutorials/src/bin/replication.rs Modified sync_db_demo to pass a literal subtrees batch size (2) when starting snapshot syncing; implemented intermediate commit handling; replaced the while loop with enhanced control flow that resumes synchronization with new transactions; added print statements and streamlined error handling.

Possibly related PRs

  • feat: state sync optimization #346: Introduces similar changes to replication methods by adding the subtrees_batch_size parameter, indicating a direct connection with modifications in session initiation.
  • feat: chunk packing #359: Updates the MultiStateSyncSession struct and the apply_chunk method for handling batch processes, directly relating to the batch tracking and synchronization logic changes in this PR.

Suggested reviewers

  • QuantumExplorer
  • fominok

Poem

I'm just a rabbit, hopping through the code,
With subtrees in batches along the sync road.
Each commit a little bounce, each patch a joyful leap,
Together in the system, our promises we keep.
🐇✨ Hooray for clean changes—up the data mountain we go!


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1a125e2 and 19a572f.

📒 Files selected for processing (2)
  • grovedb/src/replication/state_sync_session.rs (11 hunks)
  • tutorials/src/bin/replication.rs (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • tutorials/src/bin/replication.rs
  • grovedb/src/replication/state_sync_session.rs

@@ -204,6 +221,14 @@ impl<'db> MultiStateSyncSession<'db> {
unsafe { Pin::into_inner_unchecked(self) }.transaction
}

pub unsafe fn set_new_transaction(
Collaborator


pub unsafe is too much, and no safety docs even

Collaborator Author


@fominok reduced the scope of unsafe.
Is there a better way to set a new transaction without consuming the session?

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (3)
tutorials/src/bin/replication.rs (2)

272-272: Use a configurable batch size instead of a hardcoded value
The value "2" may be too small for larger sync operations or could be suboptimal in other scenarios. Consider exposing this as a configuration parameter to allow better tuning.


303-303: Enhance logging messages
A single "is_sync_completed()" message may not be sufficiently descriptive. Consider including information about elapsed time or next steps, if applicable.

grovedb/src/replication/state_sync_session.rs (1)

224-230: Exercise caution using unsafe
The set_new_transaction method requires an unsafe block. Double-check that no references to the old transaction remain active. If possible, explain the invariants in comments for future maintainers.
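The pattern at the heart of set_new_transaction can be shown with plain types: std::mem::replace swaps a fresh value in and hands the old one back without consuming the owner, which is what lets the old transaction be committed while the session keeps living. Session here is a toy stand-in, not GroveDB's type; the real method additionally deals with a pinned, self-referential struct, which is where the unsafe comes from.

```rust
// Illustrative mem::replace pattern behind transaction rotation.
use std::mem;

struct Session {
    transaction: String,        // stand-in for Transaction<'db>
    committed: Vec<String>,     // record of committed transactions
}

impl Session {
    /// Swap a fresh transaction in and return the old one so the
    /// caller can commit it — without consuming the session.
    fn rotate_transaction(&mut self, fresh: String) -> String {
        mem::replace(&mut self.transaction, fresh)
    }
}

fn main() {
    let mut s = Session { transaction: "tx1".into(), committed: vec![] };
    let old = s.rotate_transaction("tx2".into());
    s.committed.push(old);
    assert_eq!(s.transaction, "tx2");
    assert_eq!(s.committed, vec!["tx1".to_string()]);
}
```

With unpinned data this needs no unsafe at all; the invariant the real code must uphold is that no live references to the old transaction remain when the swap happens.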

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between cf999d7 and 07d1b28.

📒 Files selected for processing (3)
  • grovedb/src/replication.rs (4 hunks)
  • grovedb/src/replication/state_sync_session.rs (7 hunks)
  • tutorials/src/bin/replication.rs (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: Tests
  • GitHub Check: Linting
  • GitHub Check: Compilation errors
  • GitHub Check: Code Coverage
🔇 Additional comments (10)
tutorials/src/bin/replication.rs (2)

280-289: Intermediate commit logic is conceptually sound
The usage of intermediate_commit helps avoid large monolithic transactions; this is beneficial for memory and stability. Verify that partial commits won’t cause unexpected inconsistencies if a crash/error occurs immediately after.


291-297: Check usage of unsafe block for transaction replacement
Replacing the transaction via unsafe is likely required, but carefully confirm that no references to the old transaction remain in use. If possible, provide a safer abstraction or thoroughly document why unsafe is necessary here.

grovedb/src/replication.rs (3)

35-41: Validate subtrees_batch_size before usage
When starting a syncing session, check if the user-provided subtrees_batch_size is zero or extremely large. This might lead to edge cases or potential panics in subsequent code.
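One defensive option (a sketch, not what the PR implements) is to clamp a zero batch size to one at construction time, so the "commit every N subtrees" condition can never degenerate into committing on every chunk or looping:

```rust
// Hypothetical validation helper for subtrees_batch_size; the
// function name and the clamp-to-1 policy are assumptions, not
// part of the PR.
fn clamp_batch_size(requested: usize) -> usize {
    if requested == 0 { 1 } else { requested }
}

fn main() {
    assert_eq!(clamp_batch_size(0), 1);   // zero would break the batching condition
    assert_eq!(clamp_batch_size(16), 16); // sane values pass through unchanged
}
```

Returning an Err for zero instead of clamping would be an equally reasonable policy; the important part is rejecting the degenerate value before the sync loop starts.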


191-192: Documentation looks consistent
The new documentation for subtrees_batch_size properly reflects its usage. Nice job keeping comments updated.


223-225: Ensure subtrees_batch_size behaves as intended
The updated method signature now includes subtrees_batch_size. Consider adding unit tests to confirm that subtrees are correctly batched and that boundary cases (like 0 or extremely large sizes) are handled.

Also applies to: 245-245

grovedb/src/replication/state_sync_session.rs (5)

5-5: Confirm necessity of std::mem import
This import is presumably for mem::replace in set_new_transaction. Ensure there are no inadvertent side effects or unintentional usage.


171-179: Batch-related fields introduced
The fields subtrees_batch_size, num_processed_subtrees_in_batch, and pending_discovered_subtrees effectively manage partial synchronization. Be sure to prevent negative values (or zero) for subtrees_batch_size from causing infinite loops or skipping commits.


185-190: Constructor correctness
The new constructor properly sets up the batch fields, but consider exposing version as a parameter in case multiple protocols are used in the future. For now, it’s fine that it defaults to CURRENT_STATE_SYNC_VERSION.

Also applies to: 197-200


323-327: Field access is straightforward and safe
These helper methods for returning mutable references to session fields are simple and used internally. They appear correct as long as the unsafe block is limited to safe usage of these references.

Also applies to: 329-335


353-357: Return value documentation is aligned
The docstring and function signature now clearly convey that apply_chunk returns a boolean indicating the need for an intermediate commit. This is helpful for orchestrating partial commits.

Also applies to: 380-380

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (5)
tutorials/src/bin/replication.rs (5)

272-272: Replace hardcoded batch size with a named constant

The hardcoded value 2 for the subtrees batch size should be replaced with a named constant to improve code readability and maintainability.

-    let mut session = target_db.start_snapshot_syncing(app_hash, 2, CURRENT_STATE_SYNC_VERSION, grove_version)?;
+    const SUBTREES_BATCH_SIZE: u32 = 2; // Small value for demo purposes
+    let mut session = target_db.start_snapshot_syncing(app_hash, SUBTREES_BATCH_SIZE, CURRENT_STATE_SYNC_VERSION, grove_version)?;

283-294: Add error handling for intermediate transaction operations

While the code commits the old transaction, it doesn't handle potential errors that might occur during this process or during the resumption of sync. Consider adding specific error handling for these operations.

-        if intermediate_commit {
-            let old_tx = unsafe {
-                session.as_mut().set_new_transaction(target_db.start_transaction())
-            };
-            target_db.commit_transaction(old_tx).value?;
-            let more_new_chunks = session.resume_sync(&target_db, CURRENT_STATE_SYNC_VERSION, grove_version)?;
-            chunk_queue.extend(more_new_chunks);
-        }
-        else {
-            chunk_queue.extend(more_chunks);
-        }
+        if intermediate_commit {
+            // Perform intermediate commit and resume synchronization
+            let old_tx = unsafe {
+                session.as_mut().set_new_transaction(target_db.start_transaction())
+            };
+            
+            match target_db.commit_transaction(old_tx) {
+                Ok(result) => {
+                    if let Err(e) = result {
+                        println!("Error committing intermediate transaction: {:?}", e);
+                        return Err(e);
+                    }
+                    
+                    // Resume sync after successful commit
+                    match session.resume_sync(&target_db, CURRENT_STATE_SYNC_VERSION, grove_version) {
+                        Ok(more_new_chunks) => chunk_queue.extend(more_new_chunks),
+                        Err(e) => {
+                            println!("Error resuming sync after intermediate commit: {:?}", e);
+                            return Err(e);
+                        }
+                    }
+                },
+                Err(e) => {
+                    println!("Failed to commit intermediate transaction: {:?}", e);
+                    return Err(e);
+                }
+            }
+        } else {
+            chunk_queue.extend(more_chunks);
+        }

283-294: Add logging and metrics for intermediate commits

For production use, it would be beneficial to add logging and metrics to track the number of intermediate commits performed during synchronization. This would help monitor the effectiveness of this solution and identify potential issues.

+    let mut intermediate_commit_count = 0;
...
         if intermediate_commit {
+            intermediate_commit_count += 1;
+            println!("Performing intermediate commit #{}", intermediate_commit_count);
             let old_tx = unsafe {
                 session.as_mut().set_new_transaction(target_db.start_transaction())
             };
...
     }
+    println!("Total intermediate commits: {}", intermediate_commit_count);
     println!("num_chunks: {}", num_chunks);

299-301: Improve error handling in commit session

The error handling for committing the session has been simplified, but consider adding more detailed error messages to help debugging if a failure occurs.

     if session.is_sync_completed() {
         println!("state_sync completed");
-        target_db.commit_session(session)?;
+        match target_db.commit_session(session) {
+            Ok(_) => println!("Successfully committed sync session"),
+            Err(e) => {
+                println!("Failed to commit sync session: {:?}", e);
+                return Err(e);
+            }
+        }
     }

265-307: Add comments explaining the intermediate commit mechanism

The tutorial should include comments that explain the purpose and behavior of the intermediate commit mechanism, especially since this is a demonstration of a new feature.

 fn sync_db_demo(
     source_db: &GroveDb,
     target_db: &GroveDb,
     grove_version: &GroveVersion,
 ) -> Result<(), grovedb::Error> {
+    // This function demonstrates state synchronization with intermediate commits
+    // to reduce memory usage and minimize the risk of failures during large state transfers.
+    // 
+    // The synchronization process works as follows:
+    // 1. Start a snapshot syncing session with a specified batch size
+    // 2. Process chunks one by one
+    // 3. When a batch is complete (indicated by intermediate_commit=true):
+    //    - Commit the current transaction
+    //    - Start a new transaction
+    //    - Resume synchronization
+    // 4. When all chunks are processed, commit the final session
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a4666eb and 33a05d5.

📒 Files selected for processing (3)
  • grovedb/src/replication.rs (4 hunks)
  • grovedb/src/replication/state_sync_session.rs (8 hunks)
  • tutorials/src/bin/replication.rs (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • grovedb/src/replication.rs
  • grovedb/src/replication/state_sync_session.rs
⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: Tests
  • GitHub Check: Compilation errors
  • GitHub Check: Code Coverage
  • GitHub Check: Linting
🔇 Additional comments (1)
tutorials/src/bin/replication.rs (1)

283-294:

❓ Verification inconclusive

Review the unsafe block and transaction handling

The current implementation uses an unsafe block to modify the session state and set a new transaction. The use of unsafe is a potential source of bugs if not properly handled.


🏁 Script executed:

#!/bin/bash
# Check the implementation of the set_new_transaction method
# to understand why unsafe is needed and ensure it's used correctly

rg -A 10 "fn set_new_transaction" --type rust

Length of output: 987


Confirm Correct Use of Unsafe in Transaction Handling

The call to set_new_transaction in the unsafe block (located in grovedb/src/replication/state_sync_session.rs) is designed to replace the current transaction while dealing with a pinned reference. Please verify that:

  • Using unsafe via get_unchecked_mut() is fully justified in this context and that all required invariants for the pinned state are maintained.
  • The transaction lifecycle—starting a new transaction before the old one is committed—remains sound and free from race conditions or inconsistent states.

A brief inline comment documenting these invariants and the rationale for using unsafe would help future maintainers understand the design intent.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

♻️ Duplicate comments (1)
grovedb/src/replication/state_sync_session.rs (1)

170-172: ⚠️ Potential issue

Validate that subtrees_batch_size is not zero

The subtrees_batch_size field is used for determining when to commit intermediate transactions, but there's no validation to ensure it's not zero. A zero value would cause issues with the condition on line 534 always being true, leading to excessive transaction commits.

Add validation in the constructor to prevent zero values:

pub fn new(db: &'db GroveDb, app_hash: [u8; 32], subtrees_batch_size: usize) -> Pin<Box<Self>> {
+    // Ensure subtrees_batch_size is not zero to prevent infinite transaction commits
+    let subtrees_batch_size = if subtrees_batch_size == 0 { 1 } else { subtrees_batch_size };
    Box::pin(MultiStateSyncSession {
        db,
        transaction: db.start_transaction(),
        current_prefixes: Default::default(),
        processed_prefixes: Default::default(),
        app_hash,
        version: CURRENT_STATE_SYNC_VERSION,
        subtrees_batch_size,
        num_processed_subtrees_in_batch: 0,
        pending_discovered_subtrees: None,
        _pin: PhantomPinned,
    })
}

This aligns with the comment in the past review addressing the zero-check issue.

🧹 Nitpick comments (4)
grovedb/src/replication/state_sync_session.rs (4)

229-235: Improve safety documentation for set_new_transaction method

The safety comment is brief and doesn't fully explain what makes this operation unsafe and what invariants must be maintained. The method replaces the transaction without proper safety guarantees beyond checking if current_prefixes is empty.

Enhance the safety documentation to clarify the invariants and constraints:

// SAFETY: This is unsafe as it requires `self.current_prefixes` to be empty
// and no references to the transaction should exist elsewhere. The operation
// commits the current transaction and creates a new one, potentially
// invalidating any existing references to the old transaction.
// Calling this method when references to the transaction still exist could
// lead to use-after-free bugs and undefined behavior.
unsafe fn set_new_transaction(
    self: &mut Pin<Box<MultiStateSyncSession<'db>>>,
) -> Result<(), Error> {
    let this = Pin::as_mut(self).get_unchecked_mut();
    let old_tx = mem::replace(&mut this.transaction, this.db.start_transaction());
    self.db.commit_transaction(old_tx).unwrap()
}

501-524: Refactor batch processing logic for better clarity

The conditional block for determining whether to store discovered subtrees as pending or process them immediately is complex and nested. Restructuring this logic would improve readability and maintainability.

- if self.num_processed_subtrees_in_batch >= self.subtrees_batch_size {
-     match self.as_mut().pending_discovered_subtrees() {
-         None => {
-             *self.as_mut().pending_discovered_subtrees() =
-                 Some(new_subtrees_metadata);
-         }
-         Some(existing_subtrees_metadata) => {
-             existing_subtrees_metadata
-                 .data
-                 .extend(new_subtrees_metadata.data);
-         }
-     }
- } else {
-     if let Ok(res) =
-         self.prepare_sync_state_sessions(new_subtrees_metadata, grove_version)
-     {
-         next_chunk_ids.extend(res);
-         next_global_chunk_ids.extend(next_chunk_ids);
-     } else {
-         return Err(Error::InternalError(
-             "Unable to discover Subtrees".to_string(),
-         ));
-     }
- }
+ // Check if we've reached the batch size limit
+ if self.num_processed_subtrees_in_batch >= self.subtrees_batch_size {
+     // Store subtrees for later processing after intermediate commit
+     if let Some(existing_subtrees_metadata) = self.as_mut().pending_discovered_subtrees() {
+         // Extend existing metadata with the new discoveries
+         existing_subtrees_metadata
+             .data
+             .extend(new_subtrees_metadata.data);
+     } else {
+         // No existing metadata, create new
+         *self.as_mut().pending_discovered_subtrees() = Some(new_subtrees_metadata);
+     }
+     return Ok(next_global_chunk_ids);
+ }
+ 
+ // Process newly discovered subtrees immediately
+ match self.prepare_sync_state_sessions(new_subtrees_metadata, grove_version) {
+     Ok(res) => {
+         next_chunk_ids.extend(res);
+         next_global_chunk_ids.extend(next_chunk_ids);
+     }
+     Err(_) => {
+         return Err(Error::InternalError(
+             "Unable to discover Subtrees".to_string(),
+         ));
+     }
+ }

544-544: Consider adding transaction metrics for monitoring

The implementation now supports intermediate transaction committing to reduce memory usage, but lacks monitoring capabilities to track transaction sizes, frequency, and performance.

Adding metrics collection would help validate the effectiveness of this change and assist with tuning the subtrees_batch_size parameter:

+ /// Metrics for tracking transaction behavior during state sync
+ struct StateSyncMetrics {
+     /// Total number of transactions committed
+     transactions_committed: usize,
+     /// Average number of subtrees per transaction
+     avg_subtrees_per_transaction: f64,
+     /// Maximum number of subtrees seen in a single transaction
+     max_subtrees_in_transaction: usize,
+ }
+
+ impl StateSyncMetrics {
+     fn new() -> Self {
+         StateSyncMetrics {
+             transactions_committed: 0,
+             avg_subtrees_per_transaction: 0.0,
+             max_subtrees_in_transaction: 0,
+         }
+     }
+
+     fn record_transaction(&mut self, subtrees_count: usize) {
+         self.transactions_committed += 1;
+         self.max_subtrees_in_transaction = self.max_subtrees_in_transaction.max(subtrees_count);
+         
+         // Update running average
+         self.avg_subtrees_per_transaction = 
+             ((self.avg_subtrees_per_transaction * (self.transactions_committed - 1) as f64) 
+             + subtrees_count as f64) / self.transactions_committed as f64;
+     }
+ }

This would integrate with the set_new_transaction method to record metrics when transactions are committed.


187-202: Store the original subtrees_batch_size configuration for future reference

The code doesn't retain the original subtrees_batch_size parameter, making it difficult to reset or adjust the batch size dynamically if needed during long-running operations.

pub struct MultiStateSyncSession<'db> {
    /// GroveDb instance to apply changes to
    db: &'db GroveDb,

    /// Map of currently processing subtrees.
    /// Keys are `SubtreePrefix` (path digests), and values are
    /// `SubtreeStateSyncInfo` for each subtree.
    current_prefixes: BTreeMap<SubtreePrefix, SubtreeStateSyncInfo<'db>>,

    /// Set of processed prefixes, represented as `SubtreePrefix` (path
    /// digests).
    processed_prefixes: BTreeSet<SubtreePrefix>,

    /// Root application hash (`app_hash`).
    app_hash: [u8; 32],

    /// Version of the state synchronization protocol.
    pub(crate) version: u16,

    /// Maximum number of subtrees that can be processed in a single batch.
    subtrees_batch_size: usize,
+    
+    /// The original configured batch size, useful for resetting after adjustments
+    original_subtrees_batch_size: usize,

    /// Counter tracking the number of subtrees processed in the current batch.
    num_processed_subtrees_in_batch: usize,

    /// Metadata for newly discovered subtrees that are pending processing.
    pending_discovered_subtrees: Option<SubtreesMetadata>,

    /// Transaction used for the synchronization process.
    /// This is placed last to ensure it is dropped last.
    transaction: Transaction<'db>,

    /// Marker to ensure this struct is not moved in memory.
    _pin: PhantomPinned,
}

impl<'db> MultiStateSyncSession<'db> {
    /// Initializes a new state sync session.
    pub fn new(db: &'db GroveDb, app_hash: [u8; 32], subtrees_batch_size: usize) -> Pin<Box<Self>> {
        // Ensure subtrees_batch_size is not zero to prevent infinite transaction commits
        let subtrees_batch_size = if subtrees_batch_size == 0 { 1 } else { subtrees_batch_size };
        
        Box::pin(MultiStateSyncSession {
            db,
            transaction: db.start_transaction(),
            current_prefixes: Default::default(),
            processed_prefixes: Default::default(),
            app_hash,
            version: CURRENT_STATE_SYNC_VERSION,
            subtrees_batch_size,
+            original_subtrees_batch_size: subtrees_batch_size,
            num_processed_subtrees_in_batch: 0,
            pending_discovered_subtrees: None,
            _pin: PhantomPinned,
        })
    }
    
+    /// Resets the batch size to the original configured value
+    pub fn reset_batch_size(&mut self) {
+        self.subtrees_batch_size = self.original_subtrees_batch_size;
+    }
+    
+    /// Adjusts the batch size dynamically based on system conditions
+    pub fn adjust_batch_size(&mut self, new_size: usize) {
+        self.subtrees_batch_size = if new_size == 0 { 1 } else { new_size };
+    }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 33a05d5 and 1a125e2.

📒 Files selected for processing (2)
  • grovedb/src/replication.rs (4 hunks)
  • grovedb/src/replication/state_sync_session.rs (10 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • grovedb/src/replication.rs
⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: Tests
  • GitHub Check: Linting
  • GitHub Check: Compilation errors
  • GitHub Check: Code Coverage
🔇 Additional comments (5)
grovedb/src/replication/state_sync_session.rs (5)

599-610: Improve error message clarity in resume_sync method

The error message when no pending subtrees are available or when the protocol version is unsupported is not specific enough, which might make debugging more difficult.

if version != self.version {
    return Err(Error::CorruptedData(
-        "Unsupported state sync protocol version".to_string(),
+        format!("Protocol version mismatch: expected {}, got {}", self.version, version),
    ));
}

let new_subtrees_metadata =
    self.as_mut()
        .pending_discovered_subtrees()
        .take()
        .ok_or(Error::CorruptedData(
-            "No pending subtrees available for resume_sync".to_string(),
+            "No pending subtrees available for resume_sync. This may indicate an incorrect sync flow.".to_string(),
        ))?;

This aligns with the comment from a previous review about clarifying error messages.


222-226: Good transaction handling approach

The commit method is cleanly implemented, properly using the Pin API to safely consume the session and commit the transaction. The unsafe block is used appropriately to convert from a pinned box to a regular value.


173-178: Well-designed fields for batch tracking

The addition of the num_processed_subtrees_in_batch and pending_discovered_subtrees fields effectively supports the intermediate transaction committing functionality, allowing the system to track progress and resume synchronization after commits.


612-625: Well-implemented resume_sync function

The resume_sync method correctly handles the continuation of synchronization after intermediate commits. It validates the protocol version, retrieves and processes pending subtrees, and resets the batch counter.


534-543:

❓ Verification inconclusive

Ensure transaction safety with comprehensive checking

The transaction replacement logic checks only if current_prefixes is empty, but doesn't verify or handle other potential references to the transaction. Additionally, there's no error handling for the transaction commit.


🏁 Script executed:

#!/bin/bash
# Find all places where transaction or its references are used
echo "Searching for transaction references in the codebase..."
rg -A 2 -B 2 "transaction" --glob "*.rs" | grep -v "start_transaction" | head -n 20

# Check specifically for uses of the transaction field
echo "Looking for direct access to the transaction field..."
ast-grep --pattern 'self.transaction' | head -n 10

Length of output: 3193


Transaction Safety: Verify Additional References & Error Handling

The segment in grovedb/src/replication/state_sync_session.rs currently relies solely on checking that self.current_prefixes is empty before replacing the transaction. Our repository search did not reveal additional transaction references within this module—references to self.transaction were only found in other parts of the code (e.g., in the RocksDB storage context). This suggests that the design intentionally limits the lifetime of transaction references within this context.

That said, please review the following:

  • Reference Assurance: Confirm that no other indirect references to the transaction exist in this workflow beyond current_prefixes. If any are used elsewhere (even in related modules), ensure they are properly dropped or managed before calling set_new_transaction.
  • Error Handling: Currently, any error from unsafe { self.set_new_transaction()?; } propagates upward using the ? operator. Consider if additional context, logging, or a recovery/rollback mechanism is needed when the transaction commit (or replacement) fails.

If the design intentionally restricts transaction references to what’s tracked by current_prefixes and upstream error handling is sufficient, then this code can remain as is. However, it’s worthwhile to reassess both the reference tracking and the error-reporting strategy during the next review.

/// - `Ok(Vec<Vec<u8>>)`: A vector of global chunk IDs (each represented as
/// a vector of bytes) that can be fetched from sources for further
/// synchronization.
/// - `Ok(Vec<Vec<u8>>)`: A tuple of: vector of global chunk IDs (each
Collaborator


there is no tuple anymore

if let Ok(res) =
self.prepare_sync_state_sessions(db, new_subtrees_metadata, grove_version)
if self.num_processed_subtrees_in_batch >= self.subtrees_batch_size {
match self.as_mut().pending_discovered_subtrees() {
Collaborator


this kind of match is preferred as if let Some(existing_subtrees_metadata) = .. {} else {}

@ogabrielides ogabrielides merged commit 8c0dbea into develop Mar 4, 2025
8 checks passed
@ogabrielides ogabrielides deleted the feat/intermediate_transaction_commits branch March 4, 2025 14:29