Skip to main content

vector/sinks/gcp_chronicle/
chronicle_unstructured.rs

1//! This sink sends data to Google Chronicles unstructured log entries endpoint.
2//! See <https://cloud.google.com/chronicle/docs/reference/ingestion-api#unstructuredlogentries>
3//! for more information.
4use std::{collections::HashMap, io};
5
6use bytes::{Bytes, BytesMut};
7use futures_util::{future::BoxFuture, task::Poll};
8use goauth::scopes::Scope;
9use http::{
10    Request, StatusCode, Uri,
11    header::{self, HeaderName, HeaderValue},
12};
13use hyper::Body;
14use indoc::indoc;
15use serde::Serialize;
16use serde_json::json;
17use snafu::Snafu;
18use tokio_util::codec::Encoder as _;
19use tower::{Service, ServiceBuilder};
20use vector_lib::{
21    EstimatedJsonEncodedSizeOf,
22    config::{AcknowledgementsConfig, Input, telemetry},
23    configurable::configurable_component,
24    event::{Event, EventFinalizers, Finalizable},
25    request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata},
26    sink::VectorSink,
27};
28use vrl::value::Kind;
29
30use crate::{
31    codecs::{self, EncodingConfig},
32    config::{GenerateConfig, SinkConfig, SinkContext},
33    gcp::{GcpAuthConfig, GcpAuthenticator},
34    http::HttpClient,
35    schema,
36    sinks::{
37        Healthcheck,
38        gcp_chronicle::{
39            compression::ChronicleCompression,
40            partitioner::{ChroniclePartitionKey, ChroniclePartitioner},
41            sink::ChronicleSink,
42        },
43        gcs_common::{
44            config::{GcsRetryLogic, healthcheck_response},
45            service::GcsResponse,
46        },
47        util::{
48            BatchConfig, Compression, RequestBuilder, SinkBatchSettings, TowerRequestConfig,
49            encoding::{Encoder, as_tracked_write},
50            metadata::RequestMetadataBuilder,
51            request_builder::EncodeResult,
52            service::TowerRequestConfigDefaults,
53        },
54    },
55    template::{Template, TemplateParseError},
56    tls::{TlsConfig, TlsSettings},
57};
58
59#[derive(Debug, Snafu)]
60#[snafu(visibility(pub))]
61pub enum GcsHealthcheckError {
62    #[snafu(display("log_type template parse error: {}", source))]
63    LogTypeTemplate { source: TemplateParseError },
64
65    #[snafu(display("Endpoint not found"))]
66    NotFound,
67}
68
69/// Google Chronicle regions.
70#[configurable_component]
71#[derive(Clone, Copy, Debug, Eq, PartialEq)]
72#[serde(rename_all = "snake_case")]
73pub enum Region {
74    /// European Multi region
75    Eu,
76
77    /// US Multi region
78    Us,
79
80    /// APAC region (this is the same as the Singapore region endpoint retained for backwards compatibility)
81    Asia,
82
83    /// SãoPaulo Region
84    SãoPaulo,
85
86    /// Canada Region
87    Canada,
88
89    /// Dammam Region
90    Dammam,
91
92    /// Doha Region
93    Doha,
94
95    /// Frankfurt Region
96    Frankfurt,
97
98    /// London Region
99    London,
100
101    /// Mumbai Region
102    Mumbai,
103
104    /// Paris Region
105    Paris,
106
107    /// Singapore Region
108    Singapore,
109
110    /// Sydney Region
111    Sydney,
112
113    /// TelAviv Region
114    TelAviv,
115
116    /// Tokyo Region
117    Tokyo,
118
119    /// Turin Region
120    Turin,
121
122    /// Zurich Region
123    Zurich,
124}
125
126impl Region {
127    /// Each region has a its own endpoint.
128    const fn endpoint(self) -> &'static str {
129        match self {
130            Region::Eu => "https://europe-malachiteingestion-pa.googleapis.com",
131            Region::Us => "https://malachiteingestion-pa.googleapis.com",
132            Region::Asia => "https://asia-southeast1-malachiteingestion-pa.googleapis.com",
133            Region::SãoPaulo => "https://southamerica-east1-malachiteingestion-pa.googleapis.com",
134            Region::Canada => {
135                "https://northamerica-northeast2-malachiteingestion-pa.googleapis.com"
136            }
137            Region::Dammam => "https://me-central2-malachiteingestion-pa.googleapis.com",
138            Region::Doha => "https://me-central1-malachiteingestion-pa.googleapis.com",
139            Region::Frankfurt => "https://europe-west3-malachiteingestion-pa.googleapis.com",
140            Region::London => "https://europe-west2-malachiteingestion-pa.googleapis.com",
141            Region::Mumbai => "https://asia-south1-malachiteingestion-pa.googleapis.com",
142            Region::Paris => "https://europe-west9-malachiteingestion-pa.googleapis.com",
143            Region::Singapore => "https://asia-southeast1-malachiteingestion-pa.googleapis.com",
144            Region::Sydney => "https://australia-southeast1-malachiteingestion-pa.googleapis.com",
145            Region::TelAviv => "https://me-west1-malachiteingestion-pa.googleapis.com",
146            Region::Tokyo => "https://asia-northeast1-malachiteingestion-pa.googleapis.com",
147            Region::Turin => "https://europe-west12-malachiteingestion-pa.googleapis.com",
148            Region::Zurich => "https://europe-west6-malachiteingestion-pa.googleapis.com",
149        }
150    }
151}
152
153#[derive(Clone, Copy, Debug, Default)]
154pub struct ChronicleUnstructuredDefaultBatchSettings;
155
156// Chronicle Ingestion API has a 1MB limit[1] for unstructured log entries. We're also using a
157// conservatively low batch timeout to ensure events make it to Chronicle in a timely fashion, but
158// high enough that it allows for reasonable batching.
159//
160// [1]: https://cloud.google.com/chronicle/docs/reference/ingestion-api#unstructuredlogentries
161impl SinkBatchSettings for ChronicleUnstructuredDefaultBatchSettings {
162    const MAX_EVENTS: Option<usize> = None;
163    const MAX_BYTES: Option<usize> = Some(1_000_000);
164    const TIMEOUT_SECS: f64 = 15.0;
165}
166
167#[derive(Clone, Copy, Debug)]
168pub struct ChronicleUnstructuredTowerRequestConfigDefaults;
169
170impl TowerRequestConfigDefaults for ChronicleUnstructuredTowerRequestConfigDefaults {
171    const RATE_LIMIT_NUM: u64 = 1_000;
172}
173
174/// Configuration for the `gcp_chronicle_unstructured` sink.
175#[configurable_component(sink(
176    "gcp_chronicle_unstructured",
177    "Store unstructured log events in Google Chronicle."
178))]
179#[derive(Clone, Debug)]
180pub struct ChronicleUnstructuredConfig {
181    /// The endpoint to send data to.
182    #[configurable(metadata(
183        docs::examples = "127.0.0.1:8080",
184        docs::examples = "example.com:12345"
185    ))]
186    pub endpoint: Option<String>,
187
188    /// The GCP region to use.
189    #[configurable(derived)]
190    pub region: Option<Region>,
191
192    /// The Unique identifier (UUID) corresponding to the Chronicle instance.
193    #[configurable(validation(format = "uuid"))]
194    #[configurable(metadata(docs::examples = "c8c65bfa-5f2c-42d4-9189-64bb7b939f2c"))]
195    pub customer_id: String,
196
197    /// User-configured environment namespace to identify the data domain the logs originated from.
198    #[configurable(metadata(docs::templateable))]
199    #[configurable(metadata(
200        docs::examples = "production",
201        docs::examples = "production-{{ namespace }}",
202    ))]
203    #[configurable(metadata(docs::advanced))]
204    pub namespace: Option<Template>,
205
206    /// A set of labels that are attached to each batch of events.
207    #[configurable(metadata(docs::examples = "chronicle_labels_examples()"))]
208    #[configurable(metadata(docs::additional_props_description = "A Chronicle label."))]
209    pub labels: Option<HashMap<String, String>>,
210
211    #[serde(flatten)]
212    pub auth: GcpAuthConfig,
213
214    #[configurable(derived)]
215    #[serde(default)]
216    pub batch: BatchConfig<ChronicleUnstructuredDefaultBatchSettings>,
217
218    #[configurable(derived)]
219    pub encoding: EncodingConfig,
220
221    #[serde(default)]
222    #[configurable(derived)]
223    pub compression: ChronicleCompression,
224
225    #[configurable(derived)]
226    #[serde(default)]
227    pub request: TowerRequestConfig<ChronicleUnstructuredTowerRequestConfigDefaults>,
228
229    #[configurable(derived)]
230    pub tls: Option<TlsConfig>,
231
232    /// The type of log entries in a request.
233    ///
234    /// This must be one of the [supported log types][unstructured_log_types_doc], otherwise
235    /// Chronicle rejects the entry with an error.
236    ///
237    /// [unstructured_log_types_doc]: https://cloud.google.com/chronicle/docs/ingestion/parser-list/supported-default-parsers
238    #[configurable(metadata(docs::examples = "WINDOWS_DNS", docs::examples = "{{ log_type }}"))]
239    pub log_type: Template,
240
241    /// The default `log_type` to attach to events if the template in `log_type` cannot be resolved.
242    #[configurable(metadata(docs::examples = "VECTOR_DEV"))]
243    pub fallback_log_type: Option<String>,
244
245    #[configurable(derived)]
246    #[serde(
247        default,
248        deserialize_with = "crate::serde::bool_or_struct",
249        skip_serializing_if = "crate::serde::is_default"
250    )]
251    acknowledgements: AcknowledgementsConfig,
252}
253
254fn chronicle_labels_examples() -> HashMap<String, String> {
255    let mut examples = HashMap::new();
256    examples.insert("source".to_string(), "vector".to_string());
257    examples.insert("tenant".to_string(), "marketing".to_string());
258    examples
259}
260
261impl GenerateConfig for ChronicleUnstructuredConfig {
262    fn generate_config() -> toml::Value {
263        toml::from_str(indoc! {r#"
264            credentials_path = "/path/to/credentials.json"
265            customer_id = "customer_id"
266            namespace = "namespace"
267            compression = "gzip"
268            log_type = "log_type"
269            fallback_log_type = "VECTOR_DEV"
270            encoding.codec = "text"
271        "#})
272        .unwrap()
273    }
274}
275
276pub fn build_healthcheck(
277    client: HttpClient,
278    base_url: &str,
279    auth: GcpAuthenticator,
280) -> crate::Result<Healthcheck> {
281    let uri = base_url.parse::<Uri>()?;
282
283    let healthcheck = async move {
284        let mut request = http::Request::get(&uri).body(Body::empty())?;
285        auth.apply(&mut request);
286
287        let response = client.send(request).await?;
288        healthcheck_response(response, GcsHealthcheckError::NotFound.into())
289    };
290
291    Ok(Box::pin(healthcheck))
292}
293
294#[derive(Debug, Snafu)]
295pub enum ChronicleError {
296    #[snafu(display("Region or endpoint not defined"))]
297    RegionOrEndpoint,
298    #[snafu(display("You can only specify one of region or endpoint"))]
299    BothRegionAndEndpoint,
300}
301
302#[async_trait::async_trait]
303#[typetag::serde(name = "gcp_chronicle_unstructured")]
304impl SinkConfig for ChronicleUnstructuredConfig {
305    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
306        let creds = self.auth.build(Scope::MalachiteIngestion).await?;
307
308        let tls = TlsSettings::from_options(self.tls.as_ref())?;
309        let client = HttpClient::new(tls, cx.proxy())?;
310
311        let endpoint = self.create_endpoint("v2/unstructuredlogentries:batchCreate")?;
312
313        // For the healthcheck we see if we can fetch the list of available log types.
314        let healthcheck_endpoint = self.create_endpoint("v2/logtypes")?;
315
316        let healthcheck = build_healthcheck(client.clone(), &healthcheck_endpoint, creds.clone())?;
317        creds.spawn_regenerate_token();
318        let sink = self.build_sink(client, endpoint, creds)?;
319
320        Ok((sink, healthcheck))
321    }
322
323    fn input(&self) -> Input {
324        let requirement =
325            schema::Requirement::empty().required_meaning("timestamp", Kind::timestamp());
326
327        Input::log().with_schema_requirement(requirement)
328    }
329
330    fn acknowledgements(&self) -> &AcknowledgementsConfig {
331        &self.acknowledgements
332    }
333}
334
335impl ChronicleUnstructuredConfig {
336    fn build_sink(
337        &self,
338        client: HttpClient,
339        base_url: String,
340        creds: GcpAuthenticator,
341    ) -> crate::Result<VectorSink> {
342        use crate::sinks::util::service::ServiceBuilderExt;
343
344        let request = self.request.into_settings();
345
346        let batch_settings = self.batch.into_batcher_settings()?;
347
348        let partitioner = self.partitioner()?;
349
350        let svc = ServiceBuilder::new()
351            .settings(request, GcsRetryLogic::default())
352            .service(ChronicleService::new(client, base_url, creds));
353
354        let request_settings = ChronicleRequestBuilder::new(self)?;
355
356        let sink = ChronicleSink::new(svc, request_settings, partitioner, batch_settings, "http");
357
358        Ok(VectorSink::from_event_streamsink(sink))
359    }
360
361    fn partitioner(&self) -> crate::Result<ChroniclePartitioner> {
362        Ok(ChroniclePartitioner::new(
363            self.log_type.clone(),
364            self.fallback_log_type.clone(),
365            self.namespace.clone(),
366        ))
367    }
368
369    fn create_endpoint(&self, path: &str) -> Result<String, ChronicleError> {
370        Ok(format!(
371            "{}/{}",
372            match (&self.endpoint, self.region) {
373                (Some(endpoint), None) => endpoint.trim_end_matches('/'),
374                (None, Some(region)) => region.endpoint(),
375                (Some(_), Some(_)) => return Err(ChronicleError::BothRegionAndEndpoint),
376                (None, None) => return Err(ChronicleError::RegionOrEndpoint),
377            },
378            path
379        ))
380    }
381}
382
383#[derive(Clone, Debug)]
384pub struct ChronicleRequest {
385    pub body: Bytes,
386    pub finalizers: EventFinalizers,
387    pub headers: HashMap<HeaderName, HeaderValue>,
388    metadata: RequestMetadata,
389}
390
391impl Finalizable for ChronicleRequest {
392    fn take_finalizers(&mut self) -> EventFinalizers {
393        std::mem::take(&mut self.finalizers)
394    }
395}
396
397impl MetaDescriptive for ChronicleRequest {
398    fn get_metadata(&self) -> &RequestMetadata {
399        &self.metadata
400    }
401
402    fn metadata_mut(&mut self) -> &mut RequestMetadata {
403        &mut self.metadata
404    }
405}
406
407#[derive(Clone, Debug, Serialize)]
408struct ChronicleRequestBody {
409    customer_id: String,
410    #[serde(skip_serializing_if = "Option::is_none")]
411    namespace: Option<String>,
412    #[serde(skip_serializing_if = "Option::is_none")]
413    labels: Option<Vec<Label>>,
414    log_type: String,
415    entries: Vec<serde_json::Value>,
416}
417
418#[derive(Clone, Debug)]
419struct ChronicleEncoder {
420    customer_id: String,
421    labels: Option<Vec<Label>>,
422    encoder: codecs::Encoder<()>,
423    transformer: codecs::Transformer,
424}
425
426impl Encoder<(ChroniclePartitionKey, Vec<Event>)> for ChronicleEncoder {
427    fn encode_input(
428        &self,
429        input: (ChroniclePartitionKey, Vec<Event>),
430        writer: &mut dyn io::Write,
431    ) -> io::Result<(usize, GroupedCountByteSize)> {
432        let (key, events) = input;
433        let mut encoder = self.encoder.clone();
434        let mut byte_size = telemetry().create_request_count_byte_size();
435        let events = events
436            .into_iter()
437            .filter_map(|mut event| {
438                let timestamp = event
439                    .as_log()
440                    .get_timestamp()
441                    .and_then(|ts| ts.as_timestamp())
442                    .cloned();
443                let mut bytes = BytesMut::new();
444                self.transformer.transform(&mut event);
445
446                byte_size.add_event(&event, event.estimated_json_encoded_size_of());
447
448                encoder.encode(event, &mut bytes).ok()?;
449
450                let mut value = json!({
451                    "log_text": String::from_utf8_lossy(&bytes),
452                });
453
454                if let Some(ts) = timestamp {
455                    value.as_object_mut().unwrap().insert(
456                        "ts_rfc3339".to_string(),
457                        ts.to_rfc3339_opts(chrono::SecondsFormat::AutoSi, true)
458                            .into(),
459                    );
460                }
461
462                Some(value)
463            })
464            .collect::<Vec<_>>();
465
466        let json = json!(ChronicleRequestBody {
467            customer_id: self.customer_id.clone(),
468            namespace: key.namespace,
469            labels: self.labels.clone(),
470            log_type: key.log_type,
471            entries: events,
472        });
473
474        let size = as_tracked_write::<_, _, io::Error>(writer, &json, |writer, json| {
475            serde_json::to_writer(writer, json)?;
476            Ok(())
477        })?;
478
479        Ok((size, byte_size))
480    }
481}
482
483// Settings required to produce a request that do not change per
484// request. All possible values are pre-computed for direct use in
485// producing a request.
486#[derive(Clone, Debug)]
487struct ChronicleRequestBuilder {
488    encoder: ChronicleEncoder,
489    compression: Compression,
490}
491
492struct ChronicleRequestPayload {
493    bytes: Bytes,
494}
495
496impl From<Bytes> for ChronicleRequestPayload {
497    fn from(bytes: Bytes) -> Self {
498        Self { bytes }
499    }
500}
501
502impl AsRef<[u8]> for ChronicleRequestPayload {
503    fn as_ref(&self) -> &[u8] {
504        self.bytes.as_ref()
505    }
506}
507
508impl RequestBuilder<(ChroniclePartitionKey, Vec<Event>)> for ChronicleRequestBuilder {
509    type Metadata = EventFinalizers;
510    type Events = (ChroniclePartitionKey, Vec<Event>);
511    type Encoder = ChronicleEncoder;
512    type Payload = ChronicleRequestPayload;
513    type Request = ChronicleRequest;
514    type Error = io::Error;
515
516    fn compression(&self) -> Compression {
517        self.compression
518    }
519
520    fn encoder(&self) -> &Self::Encoder {
521        &self.encoder
522    }
523
524    fn split_input(
525        &self,
526        input: (ChroniclePartitionKey, Vec<Event>),
527    ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
528        let (partition_key, mut events) = input;
529        let finalizers = events.take_finalizers();
530
531        let builder = RequestMetadataBuilder::from_events(&events);
532        (finalizers, builder, (partition_key, events))
533    }
534
535    fn build_request(
536        &self,
537        finalizers: Self::Metadata,
538        metadata: RequestMetadata,
539        payload: EncodeResult<Self::Payload>,
540    ) -> Self::Request {
541        let mut headers = HashMap::new();
542        headers.insert(
543            header::CONTENT_TYPE,
544            HeaderValue::from_static("application/json"),
545        );
546
547        match payload.compressed_byte_size {
548            Some(compressed_byte_size) => {
549                headers.insert(
550                    header::CONTENT_LENGTH,
551                    HeaderValue::from_str(&compressed_byte_size.to_string()).unwrap(),
552                );
553                headers.insert(
554                    header::CONTENT_ENCODING,
555                    HeaderValue::from_str(self.compression.content_encoding().unwrap()).unwrap(),
556                );
557            }
558            None => {
559                headers.insert(
560                    header::CONTENT_LENGTH,
561                    HeaderValue::from_str(&payload.uncompressed_byte_size.to_string()).unwrap(),
562                );
563            }
564        }
565
566        ChronicleRequest {
567            headers,
568            body: payload.into_payload().bytes,
569            finalizers,
570            metadata,
571        }
572    }
573}
574
575#[derive(Clone, Debug, Serialize)]
576struct Label {
577    key: String,
578    value: String,
579}
580
581impl ChronicleRequestBuilder {
582    fn new(config: &ChronicleUnstructuredConfig) -> crate::Result<Self> {
583        let transformer = config.encoding.transformer();
584        let serializer = config.encoding.config().build()?;
585        let compression = Compression::from(config.compression);
586        let encoder = vector_lib::codecs::Encoder::<()>::new(serializer);
587        let encoder = ChronicleEncoder {
588            customer_id: config.customer_id.clone(),
589            labels: config.labels.as_ref().map(|labs| {
590                labs.iter()
591                    .map(|(k, v)| Label {
592                        key: k.to_string(),
593                        value: v.to_string(),
594                    })
595                    .collect::<Vec<_>>()
596            }),
597            encoder,
598            transformer,
599        };
600        Ok(Self {
601            encoder,
602            compression,
603        })
604    }
605}
606
607#[derive(Debug, Clone)]
608pub struct ChronicleService {
609    client: HttpClient,
610    base_url: String,
611    creds: GcpAuthenticator,
612}
613
614impl ChronicleService {
615    pub const fn new(client: HttpClient, base_url: String, creds: GcpAuthenticator) -> Self {
616        Self {
617            client,
618            base_url,
619            creds,
620        }
621    }
622}
623
624#[derive(Debug, Snafu)]
625pub enum ChronicleResponseError {
626    #[snafu(display("Server responded with an error: {}", code))]
627    ServerError { code: StatusCode },
628    #[snafu(display("Failed to make HTTP(S) request: {}", error))]
629    HttpError { error: crate::http::HttpError },
630}
631
632impl Service<ChronicleRequest> for ChronicleService {
633    type Response = GcsResponse;
634    type Error = ChronicleResponseError;
635    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
636
637    fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
638        Poll::Ready(Ok(()))
639    }
640
641    fn call(&mut self, request: ChronicleRequest) -> Self::Future {
642        let mut builder = Request::post(&self.base_url);
643        let metadata = request.get_metadata().clone();
644
645        let headers = builder.headers_mut().unwrap();
646        for (name, value) in request.headers {
647            headers.insert(name, value);
648        }
649
650        let mut http_request = builder.body(Body::from(request.body)).unwrap();
651        self.creds.apply(&mut http_request);
652
653        let mut client = self.client.clone();
654        Box::pin(async move {
655            match client.call(http_request).await {
656                Ok(response) => {
657                    let status = response.status();
658                    if status.is_success() {
659                        Ok(GcsResponse {
660                            inner: response,
661                            metadata,
662                        })
663                    } else {
664                        Err(ChronicleResponseError::ServerError { code: status })
665                    }
666                }
667                Err(error) => Err(ChronicleResponseError::HttpError { error }),
668            }
669        })
670    }
671}
672
673#[cfg(test)]
674mod unit_tests {
675    use std::io::Write;
676
677    use tempfile::NamedTempFile;
678    use wiremock::{Mock, MockServer, ResponseTemplate, matchers::method};
679
680    use super::*;
681    use crate::test_util::{random_string, trace_init};
682
683    #[tokio::test]
684    async fn invalid_credentials_rejected_by_oauth_server() {
685        trace_init();
686
687        let mock_server = MockServer::start().await;
688        Mock::given(method("POST"))
689            .respond_with(ResponseTemplate::new(401))
690            .mount(&mock_server)
691            .await;
692
693        let base_url = mock_server.uri();
694        let creds = serde_json::json!({
695            "type": "service_account",
696            "project_id": "test",
697            "private_key_id": "1",
698            "private_key": "-----BEGIN RSA PRIVATE KEY-----\nMIICXgIBAAKBgQDouHdVDVz0/M6PGe60Kf/g0nyOxCvbZgiUAZNzFimXDU+RpZ54\n6/oETl6VpRkbp8a4Xb8avll2lsamdHvGcsgnjJXdpp7LfWYLqHEpn0/XFM+womXg\nvglWCDwAsXmrmwpZKEC82mmyFigheyPA/sfuN6z+wa7P5B65xzIdDQX7nQIDAQAB\nAoGBANID/rUDrTrtll8v8Oon6OH0MjIIuOdzKhSfY3h9rKTDf2YaB2xq0KLoMpVr\ne8AoZb5l45t34naR1M3M2xKY7SSDAVJFfg/3Vxeot86DQ23IGLXj7LnNxXnvklXa\nEXaD8LNz/MXxS7/Lu0R+lEtjEkf23+BRb11fL6Q/EDToNHnhAkEA/FnwHhKMc/Bm\nXsS8bENuZP3SV2v7TU6MFTtXJFmsoZBxHnsM8UUi0gq9gBnApmdhy7v2N/Mv9gFI\nviSdr7vm1QJBAOwV3cHAciRHVK71TweOWIJKZBM9ZVut0VDs5GrBYZxGMBiOr3BI\ns7+0ugTKxVimuei6c0KNXw1kg3Vtc5+utakCQQDklAbXBpAomJHxt5zBKBc/7VXx\nEANyk/p5ZOXbLEsdkXuVU3p2tNwEi+v4s9r4H97Kr3goV+SSnbkpWntm6fn9AkBn\nFnE7rlXpA4C12QYGTaDWW7dxM0j0DGUvChH/j6uYuok73+o5hHWAy2DCwOwFduAN\nAIVd1S9hQLeqaf2oB3jpAkEAnRT+bAlMjtUOBO6XPNO4IbYwWJvGMcIEO7zu6AdB\nPJy3/U+bLimxFuYdrs6SnIHIUVdl35AlckHqzT54a5YKqQ==\n-----END RSA PRIVATE KEY-----",
699            "client_email": "test@test.com",
700            "client_id": "1",
701            "auth_uri": format!("{base_url}/o/oauth2/auth"),
702            "token_uri": format!("{base_url}/token"),
703            "auth_provider_x509_cert_url": format!("{base_url}/oauth2/v1/certs"),
704            "client_x509_cert_url": "https://example.com"
705        });
706
707        let mut tmp = NamedTempFile::new().unwrap();
708        write!(tmp, "{creds}").unwrap();
709
710        let log_type = random_string(10);
711        let cx = SinkContext::default();
712        // Normalize to forward slashes so YAML doesn't interpret Windows path separators as escapes.
713        let creds_path = tmp.path().to_str().unwrap().replace('\\', "/");
714        let config: ChronicleUnstructuredConfig = serde_yaml::from_str(&format!(
715            indoc! { r#"
716                endpoint: "http://127.0.0.1:1"
717                customer_id: test-customer
718                credentials_path: "{}"
719                log_type: "{}"
720                encoding:
721                  codec: text
722            "# },
723            creds_path, log_type
724        ))
725        .unwrap();
726        assert!(config.build(cx).await.is_err());
727    }
728}