enhancement(topology): Add source id to metadata #17369

Merged · 8 commits · May 15, 2023
25 changes: 23 additions & 2 deletions lib/vector-core/src/event/array.rs
@@ -2,7 +2,7 @@
//! This module contains the definitions and wrapper types for handling
//! arrays of type `Event`, in the various forms they may appear.
use std::{iter, slice, vec};
use std::{iter, slice, sync::Arc, vec};

use futures::{stream, Stream};
#[cfg(test)]
@@ -14,7 +14,7 @@ use super::{
EstimatedJsonEncodedSizeOf, Event, EventDataEq, EventFinalizer, EventMutRef, EventRef,
LogEvent, Metric, TraceEvent,
};
use crate::ByteSizeOf;
use crate::{config::OutputId, ByteSizeOf};

/// The type alias for an array of `LogEvent` elements.
pub type LogArray = Vec<LogEvent>;
@@ -138,6 +138,27 @@ pub enum EventArray {
}

impl EventArray {
/// Sets the `OutputId` in the metadata for all the events in this array.
pub fn set_output_id(&mut self, output_id: &Arc<OutputId>) {
match self {
EventArray::Logs(logs) => {
for log in logs {
log.metadata_mut().set_source_id(Arc::clone(output_id));
}
}
EventArray::Metrics(metrics) => {
for metric in metrics {
metric.metadata_mut().set_source_id(Arc::clone(output_id));
}
}
EventArray::Traces(traces) => {
for trace in traces {
trace.metadata_mut().set_source_id(Arc::clone(output_id));
}
}
}
}

/// Iterate over references to this array's events.
pub fn iter_events(&self) -> impl Iterator<Item = EventRef> {
match self {
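The new `EventArray::set_output_id` helper stamps one shared `OutputId` onto every event in the array, cloning only the `Arc`. Below is a minimal usage sketch (not part of this PR), assuming `vector_core` is available as a dependency, that `"my_source"` stands in for a real component id, and that `OutputId` implements `PartialEq` (as the compliance-test assertions later in this diff suggest):

use std::sync::Arc;

use vector_core::{
    config::OutputId,
    event::{EventArray, EventContainer, LogEvent},
};

fn main() {
    // An array of two log events with default metadata.
    let mut array = EventArray::Logs(vec![
        LogEvent::from("first event"),
        LogEvent::from("second event"),
    ]);

    // set_output_id clones the Arc (not the OutputId itself) into each
    // event's metadata.
    let source = Arc::new(OutputId::from("my_source"));
    array.set_output_id(&source);

    // Every event in the array now reports the same source id.
    for event in array.into_events() {
        assert_eq!(event.source_id(), Some(source.as_ref()));
    }
}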
17 changes: 16 additions & 1 deletion lib/vector-core/src/event/metadata.rs
@@ -8,7 +8,7 @@ use vector_common::EventDataEq;
use vrl::value::{Kind, Secrets, Value};

use super::{BatchNotifier, EventFinalizer, EventFinalizers, EventStatus};
use crate::config::LogNamespace;
use crate::config::{LogNamespace, OutputId};
use crate::{schema, ByteSizeOf};

const DATADOG_API_KEY: &str = "datadog_api_key";
@@ -29,6 +29,9 @@ pub struct EventMetadata {
#[serde(default, skip)]
finalizers: EventFinalizers,

/// The id of the source
source_id: Option<Arc<OutputId>>,

/// An identifier for a globally registered schema definition which provides information about
/// the event shape (type information, and semantic meaning of fields).
///
@@ -70,6 +73,17 @@ impl EventMetadata {
&mut self.secrets
}

/// Returns a reference to the metadata source.
#[must_use]
pub fn source_id(&self) -> Option<&OutputId> {
self.source_id.as_deref()
}

/// Sets the `source_id` in the metadata to the provided value.
pub fn set_source_id(&mut self, source_id: Arc<OutputId>) {
self.source_id = Some(source_id);
}

/// Return the datadog API key, if it exists
pub fn datadog_api_key(&self) -> Option<Arc<str>> {
self.secrets.get(DATADOG_API_KEY).cloned()
@@ -98,6 +112,7 @@ impl Default for EventMetadata {
secrets: Secrets::new(),
finalizers: Default::default(),
schema_definition: default_schema_definition(),
source_id: None,
}
}
}
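At the metadata level, `source_id` defaults to `None` (per the `Default` impl above) and is populated via `set_source_id`. A small sketch under the same assumptions as before (`vector_core` as a dependency, `EventMetadata` re-exported from `vector_core::event`, `OutputId: PartialEq`, placeholder component name):

use std::sync::Arc;

use vector_core::{config::OutputId, event::EventMetadata};

fn main() {
    // Freshly constructed metadata carries no source id.
    let mut metadata = EventMetadata::default();
    assert!(metadata.source_id().is_none());

    // After setting it, the getter returns a reference to the stored id.
    let id = Arc::new(OutputId::from("my_source"));
    metadata.set_source_id(Arc::clone(&id));
    assert_eq!(metadata.source_id(), Some(id.as_ref()));
}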
21 changes: 20 additions & 1 deletion lib/vector-core/src/event/mod.rs
@@ -2,9 +2,10 @@ use std::{
collections::BTreeMap,
convert::{TryFrom, TryInto},
fmt::Debug,
sync::Arc,
};

use crate::ByteSizeOf;
use crate::{config::OutputId, ByteSizeOf};
pub use array::{into_event_stream, EventArray, EventContainer, LogArray, MetricArray, TraceArray};
pub use estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf;
pub use finalization::{
@@ -280,6 +281,24 @@ impl Event {
Self::Trace(trace) => trace.with_batch_notifier_option(batch).into(),
}
}

/// Returns a reference to the event metadata source.
#[must_use]
pub fn source_id(&self) -> Option<&OutputId> {
self.metadata().source_id()
}

/// Sets the `source_id` in the event metadata to the provided value.
pub fn set_source_id(&mut self, source_id: Arc<OutputId>) {
self.metadata_mut().set_source_id(source_id);
}

/// Sets the `source_id` in the event metadata to the provided value.
#[must_use]
pub fn with_source_id(mut self, source_id: Arc<OutputId>) -> Self {
self.metadata_mut().set_source_id(source_id);
self
}
}

impl EventDataEq for Event {
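On `Event` itself, `source_id`, `set_source_id`, and the builder-style `with_source_id` delegate to the metadata. A brief sketch, again with a placeholder component name and assuming `OutputId: PartialEq`:

use std::sync::Arc;

use vector_core::{
    config::OutputId,
    event::{Event, LogEvent},
};

fn main() {
    let id = Arc::new(OutputId::from("my_source"));

    // Builder style: attach the source id while constructing the event.
    let event = Event::Log(LogEvent::from("hello")).with_source_id(Arc::clone(&id));
    assert_eq!(event.source_id(), Some(id.as_ref()));

    // In-place style: mutate an existing event's metadata.
    let mut other = Event::Log(LogEvent::from("world"));
    other.set_source_id(Arc::clone(&id));
    assert_eq!(other.source_id(), Some(id.as_ref()));
}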
8 changes: 7 additions & 1 deletion src/topology/builder.rs
@@ -246,10 +246,16 @@ impl<'a> Builder<'a> {
let mut rx = builder.add_source_output(output.clone());

let (mut fanout, control) = Fanout::new();
let source = Arc::new(OutputId {
component: key.clone(),
port: output.port.clone(),
Review comment thread:

Member:
Is the output port actually used/useful? What is the envisioned use case?

Contributor (Author):
That's a good question. The primary use case is the telemetry that the sinks will emit: adding the source as a tag to those metrics will allow an event to be tracked from source to sink.

A secondary benefit will be to give VRL access to the field, something that has been asked for a number of times.

As it stands at the minute, the port is probably not that useful, since the only sources that currently specify a port use it to distinguish between log, trace, and metric outputs, so the port can be determined through other means. I'm not sure whether this will always be the case, though.

Member:
I'm 50/50 on this one. If there's no known use, I don't think we should include it, per the basic YAGNI principle. However, since it's in an Arc, cloning it costs no more than cloning the base component name, so it's pretty much free.

Contributor (Author):
I will shortly be adding another field to the metadata to indicate the previous component in the pipeline. That field will need the port, since the port's output determines the schema definition the component produces, so keeping it like this makes the two consistent.

But I am still 50/50 on this as well.

});

let pump = async move {
debug!("Source pump starting.");

while let Some(array) = rx.next().await {
while let Some(mut array) = rx.next().await {
array.set_output_id(&source);
fanout.send(array).await.map_err(|e| {
debug!("Source pump finished with an error.");
TaskError::wrapped(e)
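The review thread above spells out the intended consumer: sink-side telemetry that tags emitted metrics with the originating source. A hypothetical sketch of that pattern follows; `emit_event_processed` is illustrative only and not part of this PR, and it assumes the `component` and `port` fields of `OutputId` are public (as the struct literal in this diff implies) and implement `Debug`:

use std::sync::Arc;

use vector_core::{
    config::OutputId,
    event::{Event, LogEvent},
};

// Hypothetical telemetry hook: record that an event was processed,
// tagged by the source it originated from (if one was attached).
fn emit_event_processed(event: &Event) {
    match event.source_id() {
        Some(OutputId { component, port }) => {
            // `component` identifies the source; `port` is the named output,
            // if the source declared one.
            println!("processed event from {component:?} (port {port:?})");
        }
        None => println!("processed event with no source attached"),
    }
}

fn main() {
    let event = Event::Log(LogEvent::from("payload"))
        .with_source_id(Arc::new(OutputId::from("my_source")));
    emit_event_processed(&event);
}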
19 changes: 15 additions & 4 deletions src/topology/test/compliance.rs
@@ -1,5 +1,10 @@
use std::sync::Arc;

use tokio::sync::oneshot::{channel, Receiver};
use vector_core::event::{Event, EventArray, EventContainer, LogEvent};
use vector_core::{
config::OutputId,
event::{Event, EventArray, EventContainer, LogEvent},
};

use crate::{
config::{unit_test::UnitTestSourceConfig, ConfigBuilder},
@@ -44,7 +49,7 @@ async fn create_topology(
#[tokio::test]
async fn test_function_transform_single_event() {
assert_transform_compliance(async {
let original_event = Event::Log(LogEvent::from("function transform being tested"));
let mut original_event = Event::Log(LogEvent::from("function transform being tested"));

let (topology, rx) = create_topology(original_event.clone(), TransformType::Function).await;
topology.stop().await;
@@ -53,6 +58,8 @@ async fn test_function_transform_single_event() {
let mut events = events.into_events().collect::<Vec<_>>();
assert_eq!(events.len(), 1);

original_event.set_source_id(Arc::new(OutputId::from("in")));

let event = events.remove(0);
assert_eq!(original_event, event);
})
@@ -62,7 +69,7 @@
#[tokio::test]
async fn test_sync_transform_single_event() {
assert_transform_compliance(async {
let original_event = Event::Log(LogEvent::from("function transform being tested"));
let mut original_event = Event::Log(LogEvent::from("function transform being tested"));

let (topology, rx) =
create_topology(original_event.clone(), TransformType::Synchronous).await;
@@ -72,6 +79,8 @@ async fn test_sync_transform_single_event() {
let mut events = events.into_events().collect::<Vec<_>>();
assert_eq!(events.len(), 1);

original_event.set_source_id(Arc::new(OutputId::from("in")));

let event = events.remove(0);
assert_eq!(original_event, event);
})
@@ -81,7 +90,7 @@ async fn test_task_transform_single_event() {
#[tokio::test]
async fn test_task_transform_single_event() {
assert_transform_compliance(async {
let original_event = Event::Log(LogEvent::from("function transform being tested"));
let mut original_event = Event::Log(LogEvent::from("function transform being tested"));

let (topology, rx) = create_topology(original_event.clone(), TransformType::Task).await;
topology.stop().await;
@@ -90,6 +99,8 @@ async fn test_task_transform_single_event() {
let mut events = events.into_events().collect::<Vec<_>>();
assert_eq!(events.len(), 1);

original_event.set_source_id(Arc::new(OutputId::from("in")));

let event = events.remove(0);
assert_eq!(original_event, event);
})