-
Notifications
You must be signed in to change notification settings - Fork 19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: intermediate tx commiting in state sync #361
Changes from 1 commit
07d1b28
a4666eb
33a05d5
1a125e2
19a572f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ | |
collections::{BTreeMap, BTreeSet}, | ||
fmt, | ||
marker::PhantomPinned, | ||
mem, | ||
pin::Pin, | ||
}; | ||
|
||
|
@@ -167,19 +168,35 @@ | |
/// This is placed last to ensure it is dropped last. | ||
transaction: Transaction<'db>, | ||
|
||
/// Maximum number of subtrees that can be processed in a single batch. | ||
subtrees_batch_size: usize, | ||
|
||
/// Counter tracking the number of subtrees processed in the current batch. | ||
num_processed_subtrees_in_batch: usize, | ||
|
||
/// Metadata for newly discovered subtrees that are pending processing. | ||
pending_discovered_subtrees: Option<SubtreesMetadata>, | ||
|
||
/// Marker to ensure this struct is not moved in memory. | ||
_pin: PhantomPinned, | ||
} | ||
|
||
impl<'db> MultiStateSyncSession<'db> { | ||
/// Initializes a new state sync session. | ||
pub fn new(transaction: Transaction<'db>, app_hash: [u8; 32]) -> Pin<Box<Self>> { | ||
pub fn new( | ||
transaction: Transaction<'db>, | ||
app_hash: [u8; 32], | ||
subtrees_batch_size: usize, | ||
) -> Pin<Box<Self>> { | ||
Box::pin(MultiStateSyncSession { | ||
transaction, | ||
current_prefixes: Default::default(), | ||
processed_prefixes: Default::default(), | ||
app_hash, | ||
version: CURRENT_STATE_SYNC_VERSION, | ||
subtrees_batch_size, | ||
num_processed_subtrees_in_batch: 0, | ||
pending_discovered_subtrees: None, | ||
_pin: PhantomPinned, | ||
}) | ||
} | ||
|
@@ -204,6 +221,14 @@ | |
unsafe { Pin::into_inner_unchecked(self) }.transaction | ||
} | ||
|
||
pub unsafe fn set_new_transaction( | ||
self: Pin<&mut Self>, | ||
new_tx: Transaction<'db>, | ||
) -> Transaction<'db> { | ||
Check warning on line 227 in grovedb/src/replication/state_sync_session.rs
|
||
let this = self.get_unchecked_mut(); | ||
mem::replace(&mut this.transaction, new_tx) | ||
} | ||
|
||
/// Adds synchronization information for a subtree into the current | ||
/// synchronization session. | ||
/// | ||
|
@@ -295,6 +320,20 @@ | |
&mut unsafe { self.get_unchecked_mut() }.processed_prefixes | ||
} | ||
|
||
fn num_processed_subtrees_in_batch(self: Pin<&mut MultiStateSyncSession<'db>>) -> &mut usize { | ||
// SAFETY: no memory-sensitive assumptions are made about fields except the | ||
// `transaction`, so it is safe to modify them | ||
&mut unsafe { self.get_unchecked_mut() }.num_processed_subtrees_in_batch | ||
} | ||
|
||
fn pending_discovered_subtrees( | ||
self: Pin<&mut MultiStateSyncSession<'db>>, | ||
) -> &mut Option<SubtreesMetadata> { | ||
// SAFETY: no memory-sensitive assumptions are made about fields except the | ||
// `transaction`, so it is safe to modify them | ||
&mut unsafe { self.get_unchecked_mut() }.pending_discovered_subtrees | ||
} | ||
|
||
/// Applies a chunk during the state synchronization process. | ||
/// This method should be called by ABCI when the `ApplySnapshotChunk` | ||
/// method is invoked. | ||
|
@@ -311,9 +350,10 @@ | |
/// GroveDB version. | ||
/// | ||
/// # Returns | ||
/// - `Ok(Vec<Vec<u8>>)`: A vector of global chunk IDs (each represented as | ||
/// a vector of bytes) that can be fetched from sources for further | ||
/// synchronization. | ||
/// - `Ok((Vec<Vec<u8>>, bool))`: A tuple of a vector of global chunk IDs | ||
/// (each represented as a vector of bytes) that can be fetched from | ||
/// sources for further synchronization, and a boolean indicating whether | ||
/// the intermediate tx should be committed. | ||
/// - `Err(Error)`: An error if the chunk application fails or if the chunk | ||
/// proof is invalid. | ||
/// | ||
|
@@ -337,7 +377,7 @@ | |
packed_global_chunks: &[u8], | ||
version: u16, | ||
grove_version: &GroveVersion, | ||
) -> Result<Vec<Vec<u8>>, Error> { | ||
) -> Result<(Vec<Vec<u8>>, bool), Error> { | ||
// For now, only CURRENT_STATE_SYNC_VERSION is supported | ||
if version != CURRENT_STATE_SYNC_VERSION { | ||
return Err(Error::CorruptedData( | ||
|
@@ -449,22 +489,131 @@ | |
|
||
self.as_mut().processed_prefixes().insert(chunk_prefix); | ||
|
||
*self.as_mut().num_processed_subtrees_in_batch() += 1; | ||
|
||
let new_subtrees_metadata = | ||
self.discover_new_subtrees_metadata(db, &completed_path, grove_version)?; | ||
|
||
if let Ok(res) = | ||
self.prepare_sync_state_sessions(db, new_subtrees_metadata, grove_version) | ||
{ | ||
next_chunk_ids.extend(res); | ||
next_global_chunk_ids.extend(next_chunk_ids); | ||
if self.num_processed_subtrees_in_batch >= self.subtrees_batch_size { | ||
match self.as_mut().pending_discovered_subtrees() { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. this kind of match is preferred as |
||
None => { | ||
*self.as_mut().pending_discovered_subtrees() = | ||
Some(new_subtrees_metadata); | ||
} | ||
Some(existing_subtrees_metadata) => { | ||
existing_subtrees_metadata | ||
.data | ||
.extend(new_subtrees_metadata.data); | ||
} | ||
} | ||
} else { | ||
return Err(Error::InternalError( | ||
"Unable to discover Subtrees".to_string(), | ||
)); | ||
if let Ok(res) = | ||
self.prepare_sync_state_sessions(db, new_subtrees_metadata, grove_version) | ||
{ | ||
next_chunk_ids.extend(res); | ||
next_global_chunk_ids.extend(next_chunk_ids); | ||
} else { | ||
return Err(Error::InternalError( | ||
"Unable to discover Subtrees".to_string(), | ||
)); | ||
} | ||
} | ||
Check warning on line 520 in grovedb/src/replication/state_sync_session.rs
|
||
ogabrielides marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
|
||
let mut res: Vec<Vec<u8>> = vec![]; | ||
for grouped_next_global_chunk_ids in next_global_chunk_ids.chunks(CONST_GROUP_PACKING_SIZE) | ||
{ | ||
res.push(pack_nested_bytes(grouped_next_global_chunk_ids.to_vec())); | ||
} | ||
Ok(( | ||
res, | ||
self.num_processed_subtrees_in_batch >= self.subtrees_batch_size, | ||
)) | ||
} | ||
|
||
/// Resumes the state synchronization process. | ||
/// | ||
/// This function attempts to continue the state synchronization by | ||
/// processing any pending discovered subtrees and preparing new | ||
/// synchronization sessions. | ||
/// | ||
/// # Parameters | ||
/// | ||
/// - `self`: A pinned, boxed reference to `MultiStateSyncSession`, ensuring | ||
/// the struct remains in place during processing. | ||
/// - `db`: A reference to the `GroveDb` instance used for querying the | ||
/// database. | ||
/// - `version`: The state synchronization protocol version being used. | ||
/// - `grove_version`: A reference to the `GroveVersion`, which represents | ||
/// the version of the underlying database structure. | ||
/// | ||
/// # Returns | ||
/// | ||
/// - `Ok(Vec<Vec<u8>>)` if the synchronization process successfully | ||
/// discovers and prepares new subtree chunks. | ||
/// - `Err(Error)`: An error if the synchronization process encounters | ||
/// issues, such as: | ||
/// - The provided `version` does not match the expected protocol version. | ||
/// - No pending discovered subtrees are available. | ||
/// - Failure to discover new subtrees in the database. | ||
/// | ||
/// # Errors | ||
/// | ||
/// - Returns `Error::CorruptedData` if an unsupported state sync protocol | ||
/// version is provided or if there are no pending discovered subtrees. | ||
/// - Returns `Error::InternalError` if it fails to discover new subtrees. | ||
/// | ||
/// # Behavior | ||
/// | ||
/// - Validates that the provided protocol `version` matches the expected | ||
/// `CURRENT_STATE_SYNC_VERSION`. | ||
/// - Retrieves and clears any pending discovered subtrees. | ||
/// - Resets the processed subtrees counter for the current batch. | ||
/// - Attempts to prepare new synchronization state sessions. | ||
/// - Packs the discovered chunk IDs into nested byte vectors before | ||
/// returning them. | ||
pub fn resume_sync( | ||
self: &mut Pin<Box<MultiStateSyncSession<'db>>>, | ||
db: &'db GroveDb, | ||
version: u16, | ||
grove_version: &GroveVersion, | ||
) -> Result<Vec<Vec<u8>>, Error> { | ||
// For now, only CURRENT_STATE_SYNC_VERSION is supported | ||
if version != CURRENT_STATE_SYNC_VERSION { | ||
return Err(Error::CorruptedData( | ||
"Unsupported state sync protocol version".to_string(), | ||
)); | ||
} | ||
if version != self.version { | ||
return Err(Error::CorruptedData( | ||
"Unsupported state sync protocol version".to_string(), | ||
)); | ||
} | ||
|
||
let new_subtrees_metadata = | ||
self.as_mut() | ||
.pending_discovered_subtrees() | ||
.take() | ||
.ok_or(Error::CorruptedData( | ||
"Unsupported state sync protocol version".to_string(), | ||
))?; | ||
*self.as_mut().num_processed_subtrees_in_batch() = 0; | ||
|
||
let mut next_chunk_ids = vec![]; | ||
let mut next_global_chunk_ids: Vec<Vec<u8>> = vec![]; | ||
|
||
if let Ok(discovered_chunk_ids) = | ||
self.prepare_sync_state_sessions(db, new_subtrees_metadata, grove_version) | ||
{ | ||
next_chunk_ids.extend(discovered_chunk_ids); | ||
next_global_chunk_ids.extend(next_chunk_ids); | ||
} else { | ||
return Err(Error::InternalError( | ||
"Unable to discover Subtrees".to_string(), | ||
)); | ||
} | ||
|
||
ogabrielides marked this conversation as resolved.
Show resolved
Hide resolved
|
||
let mut res: Vec<Vec<u8>> = vec![]; | ||
for grouped_next_global_chunk_ids in next_global_chunk_ids.chunks(CONST_GROUP_PACKING_SIZE) | ||
{ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pub unsafe
is too much, and no safety docs even
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@fominok reduced the scope of unsafe.
Is there a better way to set a new transaction without consuming the session?