From 1f246c77ae8ac2636606f8b7940475dbac0937a6 Mon Sep 17 00:00:00 2001 From: Hubert Gruszecki Date: Fri, 21 Feb 2025 15:44:09 +0100 Subject: [PATCH] fix(server): release lock before async save to prevent deadlock (#1567) - In `consumer_offsets.rs`, clone the consumer offset's path and drop the mutable guard before awaiting the async save. - Refactor `save_consumer_offset` to accept a new offset value and path instead of a ConsumerOffset reference. - Update error messages and trace logs accordingly. - Bump version in Cargo.toml from 0.4.210 to 0.4.211. --- Cargo.lock | 2 +- integration/tests/streaming/consumer_offset.rs | 11 +++++------ server/Cargo.toml | 2 +- .../streaming/partitions/consumer_offsets.rs | 9 +++++---- server/src/streaming/partitions/partition.rs | 4 ++-- server/src/streaming/partitions/storage.rs | 18 ++++++------------ server/src/streaming/storage.rs | 5 +++-- 7 files changed, 23 insertions(+), 28 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a1f82560f..e729fb88b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4623,7 +4623,7 @@ dependencies = [ [[package]] name = "server" -version = "0.4.210" +version = "0.4.211" dependencies = [ "ahash 0.8.11", "anyhow", diff --git a/integration/tests/streaming/consumer_offset.rs b/integration/tests/streaming/consumer_offset.rs index b0f2aa665..813500596 100644 --- a/integration/tests/streaming/consumer_offset.rs +++ b/integration/tests/streaming/consumer_offset.rs @@ -42,7 +42,10 @@ async fn assert_persisted_offset( consumer_offset: &ConsumerOffset, expected_offsets_count: u32, ) { - storage.save_consumer_offset(consumer_offset).await.unwrap(); + storage + .save_consumer_offset(consumer_offset.offset, &consumer_offset.path) + .await + .unwrap(); let consumer_offsets = storage .load_consumer_offsets(consumer_offset.kind, path) .await @@ -51,11 +54,7 @@ async fn assert_persisted_offset( assert_eq!(consumer_offsets.len(), expected_offsets_count); let loaded_consumer_offset = consumer_offsets.get(expected_offsets_count - 1).unwrap(); - // TODO(hubcio): This is a workaround: sometimes offset is 4, sometimes 5 - let offset_ok = loaded_consumer_offset.offset == consumer_offset.offset - || loaded_consumer_offset.offset == consumer_offset.offset + 1 - || loaded_consumer_offset.offset == consumer_offset.offset - 1; - assert!(offset_ok); + assert!(loaded_consumer_offset.offset == consumer_offset.offset); assert_eq!(loaded_consumer_offset.kind, consumer_offset.kind); assert_eq!( diff --git a/server/Cargo.toml b/server/Cargo.toml index 76a896a67..9ac0d4cda 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "server" -version = "0.4.210" +version = "0.4.211" edition = "2021" build = "src/build.rs" license = "Apache-2.0" diff --git a/server/src/streaming/partitions/consumer_offsets.rs b/server/src/streaming/partitions/consumer_offsets.rs index 2ad4a48b1..8c90fa904 100644 --- a/server/src/streaming/partitions/consumer_offsets.rs +++ b/server/src/streaming/partitions/consumer_offsets.rs @@ -78,14 +78,15 @@ impl Partition { let consumer_offsets = self.get_consumer_offsets(kind); if let Some(mut consumer_offset) = consumer_offsets.get_mut(&consumer_id) { consumer_offset.offset = offset; + let path = consumer_offset.path.clone(); + drop(consumer_offset); self.storage .partition - .save_consumer_offset(&consumer_offset) + .save_consumer_offset(offset, &path) .await .with_error_context(|error| { format!( - "{COMPONENT} (error: {error}) - failed to save consumer offset, consumer ID: {}, offset: {}", - consumer_id, offset + "{COMPONENT} (error: {error}) - failed to save consumer offset, consumer ID: {consumer_id}, offset: {offset}, path: {path}", ) })?; return Ok(()); @@ -98,7 +99,7 @@ impl Partition { let consumer_offset = ConsumerOffset::new(kind, consumer_id, offset, path); self.storage .partition - .save_consumer_offset(&consumer_offset) + .save_consumer_offset(offset, &consumer_offset.path) .await .with_error_context(|error| { format!( diff --git a/server/src/streaming/partitions/partition.rs b/server/src/streaming/partitions/partition.rs index 6fceddf27..940b5f2b1 100644 --- a/server/src/streaming/partitions/partition.rs +++ b/server/src/streaming/partitions/partition.rs @@ -54,7 +54,7 @@ pub struct ConsumerOffset { pub kind: ConsumerKind, pub consumer_id: u32, pub offset: u64, - pub path: String, + pub path: Arc, } impl ConsumerOffset { @@ -63,7 +63,7 @@ impl ConsumerOffset { kind, consumer_id, offset, - path: format!("{path}/{consumer_id}"), + path: Arc::new(format!("{path}/{consumer_id}")), } } } diff --git a/server/src/streaming/partitions/storage.rs b/server/src/streaming/partitions/storage.rs index 80c8dfe36..9e35dc6f5 100644 --- a/server/src/streaming/partitions/storage.rs +++ b/server/src/streaming/partitions/storage.rs @@ -335,21 +335,15 @@ impl PartitionStorage for FilePartitionStorage { Ok(()) } - async fn save_consumer_offset(&self, offset: &ConsumerOffset) -> Result<(), IggyError> { + async fn save_consumer_offset(&self, offset: u64, path: &str) -> Result<(), IggyError> { self.persister - .overwrite(&offset.path, &offset.offset.to_le_bytes()) + .overwrite(path, &offset.to_le_bytes()) .await .with_error_context(|error| format!( - "{COMPONENT} (error: {error}) - failed to overwrite consumer offset with value: {}, kind: {}, consumer ID: {}, path: {}", - offset.offset, offset.kind, offset.consumer_id, offset.path, + "{COMPONENT} (error: {error}) - failed to overwrite consumer offset with value: {}, path: {}", + offset, path, ))?; - trace!( - "Stored consumer offset value: {} for {} with ID: {}, path: {}", - offset.offset, - offset.kind, - offset.consumer_id, - offset.path - ); + trace!("Stored consumer offset value: {}, path: {}", offset, path); Ok(()) } @@ -390,7 +384,7 @@ impl PartitionStorage for FilePartitionStorage { continue; } - let path = path.unwrap().to_string(); + let path = Arc::new(path.unwrap().to_string()); let consumer_id = consumer_id.unwrap(); let mut file = file::open(&path) .await diff --git a/server/src/streaming/storage.rs b/server/src/streaming/storage.rs index 3e7ae46d6..4bbb1f57e 100644 --- a/server/src/streaming/storage.rs +++ b/server/src/streaming/storage.rs @@ -105,7 +105,8 @@ pub trait PartitionStorage: Send { fn delete(&self, partition: &Partition) -> impl Future> + Send; fn save_consumer_offset( &self, - offset: &ConsumerOffset, + offset: u64, + path: &str, ) -> impl Future> + Send; fn load_consumer_offsets( &self, @@ -177,7 +178,7 @@ impl PartitionStorageKind { -> Result<(), IggyError>; async fn save(&self, partition: &mut Partition) -> Result<(), IggyError>; async fn delete(&self, partition: &Partition) -> Result<(), IggyError>; - async fn save_consumer_offset(&self, offset: &ConsumerOffset) -> Result<(), IggyError>; + async fn save_consumer_offset(&self, offset: u64, path: &str) -> Result<(), IggyError>; async fn load_consumer_offsets( &self, kind: ConsumerKind,