diff --git a/lib/loki-logproto/src/lib.rs b/lib/loki-logproto/src/lib.rs index ec7fee42c7613..6a31918224154 100644 --- a/lib/loki-logproto/src/lib.rs +++ b/lib/loki-logproto/src/lib.rs @@ -54,9 +54,7 @@ pub mod util { let streams: Vec = self.0.into_iter().map(|stream| stream.into()).collect(); let push_request = logproto::PushRequest { streams }; - let buf = push_request.encode_to_vec(); - let mut encoder = snap::raw::Encoder::new(); - encoder.compress_vec(&buf).expect("out of memory") + push_request.encode_to_vec() } } @@ -121,10 +119,10 @@ mod tests { let batch = Batch(vec![Stream(labels, vec![entry1, entry2])]); // generated by test codes from promtail let expect = vec![ - 62, 176, 10, 60, 10, 24, 123, 115, 111, 117, 114, 99, 101, 61, 34, 112, 114, 111, 116, - 111, 98, 117, 102, 45, 116, 101, 115, 116, 34, 125, 18, 15, 10, 6, 8, 182, 204, 144, - 142, 6, 18, 5, 104, 101, 108, 108, 111, 5, 17, 44, 183, 204, 144, 142, 6, 18, 5, 119, - 111, 114, 108, 100, + 10, 60, 10, 24, 123, 115, 111, 117, 114, 99, 101, 61, 34, 112, 114, 111, 116, 111, 98, + 117, 102, 45, 116, 101, 115, 116, 34, 125, 18, 15, 10, 6, 8, 182, 204, 144, 142, 6, 18, + 5, 104, 101, 108, 108, 111, 18, 15, 10, 6, 8, 183, 204, 144, 142, 6, 18, 5, 119, 111, + 114, 108, 100, ]; let buf = batch.encode(); assert_eq!(expect, buf); diff --git a/src/sinks/loki/config.rs b/src/sinks/loki/config.rs index f72c68c964007..74ce254793ece 100644 --- a/src/sinks/loki/config.rs +++ b/src/sinks/loki/config.rs @@ -9,42 +9,8 @@ use crate::{ sinks::{prelude::*, util::UriSerde}, }; -/// Loki-specific compression. -#[configurable_component] -#[derive(Debug, Copy, Clone, Eq, PartialEq)] -pub enum ExtendedCompression { - /// Snappy compression. - /// - /// This implies sending push requests as Protocol Buffers. - #[serde(rename = "snappy")] - Snappy, -} - -/// Compression configuration. -#[configurable_component] -#[derive(Debug, Copy, Clone, Eq, PartialEq)] -#[serde(untagged)] -pub enum CompressionConfigAdapter { - /// Basic compression. - Original(Compression), - - /// Loki-specific compression. - Extended(ExtendedCompression), -} - -impl CompressionConfigAdapter { - pub const fn content_encoding(self) -> Option<&'static str> { - match self { - CompressionConfigAdapter::Original(compression) => compression.content_encoding(), - CompressionConfigAdapter::Extended(_) => Some("snappy"), - } - } -} - -impl Default for CompressionConfigAdapter { - fn default() -> Self { - CompressionConfigAdapter::Extended(ExtendedCompression::Snappy) - } +const fn default_compression() -> Compression { + Compression::Snappy } fn default_loki_path() -> String { @@ -106,9 +72,10 @@ pub struct LokiConfig { #[serde(default = "crate::serde::default_true")] pub remove_timestamp: bool, - #[configurable(derived)] - #[serde(default)] - pub compression: CompressionConfigAdapter, + /// Compression configuration. + /// Snappy compression implies sending push requests as Protocol Buffers. + #[serde(default = "default_compression")] + pub compression: Compression, #[configurable(derived)] #[serde(default)] diff --git a/src/sinks/loki/service.rs b/src/sinks/loki/service.rs index edcc762042fba..2a4c33280732f 100644 --- a/src/sinks/loki/service.rs +++ b/src/sinks/loki/service.rs @@ -5,7 +5,6 @@ use http::StatusCode; use snafu::Snafu; use tracing::Instrument; -use crate::sinks::loki::config::{CompressionConfigAdapter, ExtendedCompression}; use crate::{ http::{Auth, HttpClient}, sinks::{prelude::*, util::UriSerde}, @@ -60,7 +59,7 @@ impl DriverResponse for LokiResponse { #[derive(Clone)] pub struct LokiRequest { - pub compression: CompressionConfigAdapter, + pub compression: Compression, pub finalizers: EventFinalizers, pub payload: Bytes, pub tenant_id: Option, @@ -113,10 +112,8 @@ impl Service for LokiService { fn call(&mut self, request: LokiRequest) -> Self::Future { let content_type = match request.compression { - CompressionConfigAdapter::Original(_) => "application/json", - CompressionConfigAdapter::Extended(ExtendedCompression::Snappy) => { - "application/x-protobuf" - } + Compression::Snappy => "application/x-protobuf", + _ => "application/json", }; let mut req = http::Request::post(&self.endpoint.uri).header("Content-Type", content_type); diff --git a/src/sinks/loki/sink.rs b/src/sinks/loki/sink.rs index 1410a7c26430e..cc3127415bf62 100644 --- a/src/sinks/loki/sink.rs +++ b/src/sinks/loki/sink.rs @@ -12,7 +12,6 @@ use super::{ event::{LokiBatchEncoder, LokiEvent, LokiRecord, PartitionKey}, service::{LokiRequest, LokiRetryLogic, LokiService}, }; -use crate::sinks::loki::config::{CompressionConfigAdapter, ExtendedCompression}; use crate::sinks::loki::event::LokiBatchEncoding; use crate::{ http::{get_http_scheme_from_uri, HttpClient}, @@ -65,7 +64,7 @@ impl Partitioner for RecordPartitioner { #[derive(Clone)] pub struct LokiRequestBuilder { - compression: CompressionConfigAdapter, + compression: Compression, encoder: LokiBatchEncoder, } @@ -92,10 +91,7 @@ impl RequestBuilder<(PartitionKey, Vec)> for LokiRequestBuilder { type Error = RequestBuildError; fn compression(&self) -> Compression { - match self.compression { - CompressionConfigAdapter::Original(compression) => compression, - CompressionConfigAdapter::Extended(_) => Compression::None, - } + self.compression } fn encoder(&self) -> &Self::Encoder { @@ -415,10 +411,8 @@ impl LokiSink { let serializer = config.encoding.build()?; let encoder = Encoder::<()>::new(serializer); let batch_encoder = match config.compression { - CompressionConfigAdapter::Original(_) => LokiBatchEncoder(LokiBatchEncoding::Json), - CompressionConfigAdapter::Extended(ExtendedCompression::Snappy) => { - LokiBatchEncoder(LokiBatchEncoding::Protobuf) - } + Compression::Snappy => LokiBatchEncoder(LokiBatchEncoding::Protobuf), + _ => LokiBatchEncoder(LokiBatchEncoding::Json), }; Ok(Self { diff --git a/website/cue/reference/components/sinks/base/loki.cue b/website/cue/reference/components/sinks/base/loki.cue index c44bc390a0c1a..6b381045937ed 100644 --- a/website/cue/reference/components/sinks/base/loki.cue +++ b/website/cue/reference/components/sinks/base/loki.cue @@ -110,8 +110,11 @@ base: components: sinks: loki: configuration: { } } compression: { - description: "Compression configuration." - required: false + description: """ + Compression configuration. + Snappy compression implies sending push requests as Protocol Buffers. + """ + required: false type: string: { default: "snappy" enum: { @@ -122,9 +125,9 @@ base: components: sinks: loki: configuration: { """ none: "No compression." snappy: """ - Snappy compression. + [Snappy][snappy] compression. - This implies sending push requests as Protocol Buffers. + [snappy]: https://github.com/google/snappy/blob/main/docs/README.md """ zlib: """ [Zlib][zlib] compression.