Skip to content

Commit

Permalink
enhancement(topology): Add source id to metadata (#17369)
Browse files Browse the repository at this point in the history
* Add source to metadata

Signed-off-by: Stephen Wakely <[email protected]>

* Added test for source

Signed-off-by: Stephen Wakely <[email protected]>

* Fixed existing tests

Signed-off-by: Stephen Wakely <[email protected]>

* Remove dbg

Signed-off-by: Stephen Wakely <[email protected]>

* Rename source to source_id

Signed-off-by: Stephen Wakely <[email protected]>

* Feedback from Bruce

Signed-off-by: Stephen Wakely <[email protected]>

* Fix remaining tests

Signed-off-by: Stephen Wakely <[email protected]>

---------

Signed-off-by: Stephen Wakely <[email protected]>
  • Loading branch information
StephenWakely authored May 15, 2023
1 parent a2b8903 commit c683999
Show file tree
Hide file tree
Showing 12 changed files with 258 additions and 53 deletions.
25 changes: 23 additions & 2 deletions lib/vector-core/src/event/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//! This module contains the definitions and wrapper types for handling
//! arrays of type `Event`, in the various forms they may appear.
use std::{iter, slice, vec};
use std::{iter, slice, sync::Arc, vec};

use futures::{stream, Stream};
#[cfg(test)]
Expand All @@ -14,7 +14,7 @@ use super::{
EstimatedJsonEncodedSizeOf, Event, EventDataEq, EventFinalizer, EventMutRef, EventRef,
LogEvent, Metric, TraceEvent,
};
use crate::ByteSizeOf;
use crate::{config::OutputId, ByteSizeOf};

/// The type alias for an array of `LogEvent` elements.
pub type LogArray = Vec<LogEvent>;
Expand Down Expand Up @@ -138,6 +138,27 @@ pub enum EventArray {
}

impl EventArray {
/// Sets the `OutputId` in the metadata for all the events in this array.
pub fn set_output_id(&mut self, output_id: &Arc<OutputId>) {
match self {
EventArray::Logs(logs) => {
for log in logs {
log.metadata_mut().set_source_id(Arc::clone(output_id));
}
}
EventArray::Metrics(metrics) => {
for metric in metrics {
metric.metadata_mut().set_source_id(Arc::clone(output_id));
}
}
EventArray::Traces(traces) => {
for trace in traces {
trace.metadata_mut().set_source_id(Arc::clone(output_id));
}
}
}
}

/// Iterate over references to this array's events.
pub fn iter_events(&self) -> impl Iterator<Item = EventRef> {
match self {
Expand Down
17 changes: 16 additions & 1 deletion lib/vector-core/src/event/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use vector_common::EventDataEq;
use vrl::value::{Kind, Secrets, Value};

use super::{BatchNotifier, EventFinalizer, EventFinalizers, EventStatus};
use crate::config::LogNamespace;
use crate::config::{LogNamespace, OutputId};
use crate::{schema, ByteSizeOf};

const DATADOG_API_KEY: &str = "datadog_api_key";
Expand All @@ -29,6 +29,9 @@ pub struct EventMetadata {
#[serde(default, skip)]
finalizers: EventFinalizers,

/// The id of the source
source_id: Option<Arc<OutputId>>,

/// An identifier for a globally registered schema definition which provides information about
/// the event shape (type information, and semantic meaning of fields).
///
Expand Down Expand Up @@ -70,6 +73,17 @@ impl EventMetadata {
&mut self.secrets
}

/// Returns a reference to the metadata source.
#[must_use]
pub fn source_id(&self) -> Option<&OutputId> {
self.source_id.as_deref()
}

/// Sets the `source_id` in the metadata to the provided value.
pub fn set_source_id(&mut self, source_id: Arc<OutputId>) {
self.source_id = Some(source_id);
}

/// Return the datadog API key, if it exists
pub fn datadog_api_key(&self) -> Option<Arc<str>> {
self.secrets.get(DATADOG_API_KEY).cloned()
Expand Down Expand Up @@ -98,6 +112,7 @@ impl Default for EventMetadata {
secrets: Secrets::new(),
finalizers: Default::default(),
schema_definition: default_schema_definition(),
source_id: None,
}
}
}
Expand Down
21 changes: 20 additions & 1 deletion lib/vector-core/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ use std::{
collections::BTreeMap,
convert::{TryFrom, TryInto},
fmt::Debug,
sync::Arc,
};

use crate::ByteSizeOf;
use crate::{config::OutputId, ByteSizeOf};
pub use array::{into_event_stream, EventArray, EventContainer, LogArray, MetricArray, TraceArray};
pub use estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf;
pub use finalization::{
Expand Down Expand Up @@ -280,6 +281,24 @@ impl Event {
Self::Trace(trace) => trace.with_batch_notifier_option(batch).into(),
}
}

/// Returns a reference to the event metadata source.
#[must_use]
pub fn source_id(&self) -> Option<&OutputId> {
self.metadata().source_id()
}

/// Sets the `source_id` in the event metadata to the provided value.
pub fn set_source_id(&mut self, source_id: Arc<OutputId>) {
self.metadata_mut().set_source_id(source_id);
}

/// Sets the `source_id` in the event metadata to the provided value.
#[must_use]
pub fn with_source_id(mut self, source_id: Arc<OutputId>) -> Self {
self.metadata_mut().set_source_id(source_id);
self
}
}

impl EventDataEq for Event {
Expand Down
8 changes: 7 additions & 1 deletion src/topology/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,10 +246,16 @@ impl<'a> Builder<'a> {
let mut rx = builder.add_source_output(output.clone());

let (mut fanout, control) = Fanout::new();
let source = Arc::new(OutputId {
component: key.clone(),
port: output.port.clone(),
});

let pump = async move {
debug!("Source pump starting.");

while let Some(array) = rx.next().await {
while let Some(mut array) = rx.next().await {
array.set_output_id(&source);
fanout.send(array).await.map_err(|e| {
debug!("Source pump finished with an error.");
TaskError::wrapped(e)
Expand Down
19 changes: 15 additions & 4 deletions src/topology/test/compliance.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
use std::sync::Arc;

use tokio::sync::oneshot::{channel, Receiver};
use vector_core::event::{Event, EventArray, EventContainer, LogEvent};
use vector_core::{
config::OutputId,
event::{Event, EventArray, EventContainer, LogEvent},
};

use crate::{
config::{unit_test::UnitTestSourceConfig, ConfigBuilder},
Expand Down Expand Up @@ -44,7 +49,7 @@ async fn create_topology(
#[tokio::test]
async fn test_function_transform_single_event() {
assert_transform_compliance(async {
let original_event = Event::Log(LogEvent::from("function transform being tested"));
let mut original_event = Event::Log(LogEvent::from("function transform being tested"));

let (topology, rx) = create_topology(original_event.clone(), TransformType::Function).await;
topology.stop().await;
Expand All @@ -53,6 +58,8 @@ async fn test_function_transform_single_event() {
let mut events = events.into_events().collect::<Vec<_>>();
assert_eq!(events.len(), 1);

original_event.set_source_id(Arc::new(OutputId::from("in")));

let event = events.remove(0);
assert_eq!(original_event, event);
})
Expand All @@ -62,7 +69,7 @@ async fn test_function_transform_single_event() {
#[tokio::test]
async fn test_sync_transform_single_event() {
assert_transform_compliance(async {
let original_event = Event::Log(LogEvent::from("function transform being tested"));
let mut original_event = Event::Log(LogEvent::from("function transform being tested"));

let (topology, rx) =
create_topology(original_event.clone(), TransformType::Synchronous).await;
Expand All @@ -72,6 +79,8 @@ async fn test_sync_transform_single_event() {
let mut events = events.into_events().collect::<Vec<_>>();
assert_eq!(events.len(), 1);

original_event.set_source_id(Arc::new(OutputId::from("in")));

let event = events.remove(0);
assert_eq!(original_event, event);
})
Expand All @@ -81,7 +90,7 @@ async fn test_sync_transform_single_event() {
#[tokio::test]
async fn test_task_transform_single_event() {
assert_transform_compliance(async {
let original_event = Event::Log(LogEvent::from("function transform being tested"));
let mut original_event = Event::Log(LogEvent::from("function transform being tested"));

let (topology, rx) = create_topology(original_event.clone(), TransformType::Task).await;
topology.stop().await;
Expand All @@ -90,6 +99,8 @@ async fn test_task_transform_single_event() {
let mut events = events.into_events().collect::<Vec<_>>();
assert_eq!(events.len(), 1);

original_event.set_source_id(Arc::new(OutputId::from("in")));

let event = events.remove(0);
assert_eq!(original_event, event);
})
Expand Down
Loading

0 comments on commit c683999

Please sign in to comment.