Skip to main content

vector/sinks/gcp/
pubsub.rs

1use base64::prelude::{BASE64_STANDARD, Engine as _};
2use bytes::{Bytes, BytesMut};
3use futures::{FutureExt, SinkExt};
4use http::{Request, Uri};
5use hyper::Body;
6use indoc::indoc;
7use serde_json::{Value, json};
8use snafu::{ResultExt, Snafu};
9use tokio_util::codec::Encoder as _;
10use vector_lib::configurable::configurable_component;
11
12use crate::{
13    codecs::{Encoder, EncodingConfig, Transformer},
14    config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
15    event::Event,
16    gcp::{GcpAuthConfig, GcpAuthenticator, PUBSUB_URL, Scope},
17    http::HttpClient,
18    sinks::{
19        Healthcheck, UriParseSnafu, VectorSink,
20        gcs_common::config::healthcheck_response,
21        util::{
22            BatchConfig, BoxedRawValue, JsonArrayBuffer, SinkBatchSettings, TowerRequestConfig,
23            http::{BatchedHttpSink, HttpEventEncoder, HttpSink},
24        },
25    },
26    tls::{TlsConfig, TlsSettings},
27};
28
/// Errors produced by the `gcp_pubsub` sink healthcheck.
#[derive(Debug, Snafu)]
enum HealthcheckError {
    // Converted with `.into()` and handed to `healthcheck_response` by `healthcheck` below;
    // presumably reported when the topic lookup fails — confirm in `gcs_common::config`.
    #[snafu(display("Configured topic not found"))]
    TopicNotFound,
}
34
35// 10MB maximum message size: https://cloud.google.com/pubsub/quotas#resource_limits
36const MAX_BATCH_PAYLOAD_SIZE: usize = 10_000_000;
37
38#[derive(Clone, Copy, Debug, Default)]
39pub struct PubsubDefaultBatchSettings;
40
41impl SinkBatchSettings for PubsubDefaultBatchSettings {
42    const MAX_EVENTS: Option<usize> = Some(1000);
43    const MAX_BYTES: Option<usize> = Some(10_000_000);
44    const TIMEOUT_SECS: f64 = 1.0;
45}
46
// Reviewer note: the `///` doc comments in this struct are (presumably) consumed by
// `configurable_component` to generate user-facing documentation, so implementation notes
// below are written as plain `//` comments to keep them out of that output.
/// Configuration for the `gcp_pubsub` sink.
#[configurable_component(sink(
    "gcp_pubsub",
    "Publish observability events to GCP's Pub/Sub messaging system."
))]
#[derive(Clone, Debug)]
pub struct PubsubConfig {
    /// The project name to which to publish events.
    #[configurable(metadata(docs::examples = "vector-123456"))]
    pub project: String,

    /// The topic within the project to which to publish events.
    #[configurable(metadata(docs::examples = "this-is-a-topic"))]
    pub topic: String,

    // Defaults to `default_endpoint()`. `PubsubSink::from_config` combines it with `project`
    // and `topic` into `{endpoint}/v1/projects/{project}/topics/{topic}`.
    /// The endpoint to which to publish events.
    ///
    /// The scheme (`http` or `https`) must be specified. No path should be included since the paths defined
    /// by the [`GCP Pub/Sub`][pubsub_api] API are used.
    ///
    /// The trailing slash `/` must not be included.
    ///
    /// [pubsub_api]: https://cloud.google.com/pubsub/docs/reference/rest
    #[serde(default = "default_endpoint")]
    #[configurable(metadata(docs::examples = "https://us-central1-pubsub.googleapis.com"))]
    pub endpoint: String,

    // `flatten`: the GCP auth options are read from the top level of this sink's config
    // rather than from a nested `auth` table. Built with `Scope::PubSub` in `from_config`.
    #[serde(default, flatten)]
    pub auth: GcpAuthConfig,

    // Validated and capped at `MAX_BATCH_PAYLOAD_SIZE` bytes in `SinkConfig::build`.
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<PubsubDefaultBatchSettings>,

    // Turned into request settings via `into_settings()` in `SinkConfig::build`.
    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    // No `#[serde(default)]`, so this appears to be required in configuration (the generated
    // example and the tests always set `codec`). It also determines the accepted input type.
    #[configurable(derived)]
    encoding: EncodingConfig,

    // `None` when absent; converted with `TlsSettings::from_options` in `SinkConfig::build`.
    #[configurable(derived)]
    #[serde(default)]
    pub tls: Option<TlsConfig>,

    // Deserialized through `crate::serde::bool_or_struct` (per its name, accepting either a
    // bool or a full struct) and skipped on serialization when it equals the default.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,
}
100
101fn default_endpoint() -> String {
102    PUBSUB_URL.to_string()
103}
104
105impl GenerateConfig for PubsubConfig {
106    fn generate_config() -> toml::Value {
107        toml::from_str(indoc! {r#"
108            project = "my-project"
109            topic = "my-topic"
110            encoding.codec = "json"
111        "#})
112        .unwrap()
113    }
114}
115
116#[async_trait::async_trait]
117#[typetag::serde(name = "gcp_pubsub")]
118impl SinkConfig for PubsubConfig {
119    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
120        let sink = PubsubSink::from_config(self).await?;
121        let batch_settings = self
122            .batch
123            .validate()?
124            .limit_max_bytes(MAX_BATCH_PAYLOAD_SIZE)?
125            .into_batch_settings()?;
126        let request_settings = self.request.into_settings();
127        let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
128        let client = HttpClient::new(tls_settings, cx.proxy())?;
129
130        let healthcheck = healthcheck(client.clone(), sink.uri("")?, sink.auth.clone()).boxed();
131        sink.auth.spawn_regenerate_token();
132
133        let sink = BatchedHttpSink::new(
134            sink,
135            JsonArrayBuffer::new(batch_settings.size),
136            request_settings,
137            batch_settings.timeout,
138            client,
139        )
140        .sink_map_err(|error| error!(message = "Fatal gcp_pubsub sink error.", %error, internal_log_rate_limit = false));
141
142        #[allow(deprecated)]
143        Ok((VectorSink::from_event_sink(sink), healthcheck))
144    }
145
146    fn input(&self) -> Input {
147        Input::new(self.encoding.config().input_type())
148    }
149
150    fn acknowledgements(&self) -> &AcknowledgementsConfig {
151        &self.acknowledgements
152    }
153}
154
/// State needed to build Pub/Sub publish requests and the healthcheck.
struct PubsubSink {
    /// Applied to URIs (`apply_uri`) and requests (`apply`); cloned for the healthcheck.
    auth: GcpAuthenticator,
    /// `{endpoint}/v1/projects/{project}/topics/{topic}`, built in `from_config`.
    uri_base: String,
    /// Applied to each event before encoding; cloned into each `PubSubSinkEventEncoder`.
    transformer: Transformer,
    /// Serializes each event; its output becomes a base64 `data` field.
    encoder: Encoder<()>,
}
161
162impl PubsubSink {
163    async fn from_config(config: &PubsubConfig) -> crate::Result<Self> {
164        // We only need to load the credentials if we are not targeting an emulator.
165        let auth = config.auth.build(Scope::PubSub).await?;
166
167        let uri_base = format!(
168            "{}/v1/projects/{}/topics/{}",
169            config.endpoint, config.project, config.topic,
170        );
171
172        let transformer = config.encoding.transformer();
173        let serializer = config.encoding.build()?;
174        let encoder = Encoder::<()>::new(serializer);
175
176        Ok(Self {
177            auth,
178            uri_base,
179            transformer,
180            encoder,
181        })
182    }
183
184    fn uri(&self, suffix: &str) -> crate::Result<Uri> {
185        let uri = format!("{}{}", self.uri_base, suffix);
186        let mut uri = uri.parse::<Uri>().context(UriParseSnafu)?;
187        self.auth.apply_uri(&mut uri);
188        Ok(uri)
189    }
190}
191
/// Event encoder used by `PubsubSink` (its `HttpSink::Encoder`); turns one event into one
/// Pub/Sub message object.
struct PubSubSinkEventEncoder {
    /// Applied to each event before serialization.
    transformer: Transformer,
    /// Serializes the transformed event into bytes.
    encoder: Encoder<()>,
}
196
197impl HttpEventEncoder<Value> for PubSubSinkEventEncoder {
198    fn encode_event(&mut self, mut event: Event) -> Option<Value> {
199        self.transformer.transform(&mut event);
200        let mut bytes = BytesMut::new();
201        // Errors are handled by `Encoder`.
202        self.encoder.encode(event, &mut bytes).ok()?;
203        // Each event needs to be base64 encoded, and put into a JSON object
204        // as the `data` item.
205        Some(json!({ "data": BASE64_STANDARD.encode(&bytes) }))
206    }
207}
208
209impl HttpSink for PubsubSink {
210    type Input = Value;
211    type Output = Vec<BoxedRawValue>;
212    type Encoder = PubSubSinkEventEncoder;
213
214    fn build_encoder(&self) -> Self::Encoder {
215        PubSubSinkEventEncoder {
216            transformer: self.transformer.clone(),
217            encoder: self.encoder.clone(),
218        }
219    }
220
221    async fn build_request(&self, events: Self::Output) -> crate::Result<Request<Bytes>> {
222        let body = json!({ "messages": events });
223        let body = crate::serde::json::to_bytes(&body).unwrap().freeze();
224
225        let uri = self.uri(":publish").unwrap();
226        let builder = Request::post(uri).header("Content-Type", "application/json");
227
228        let mut request = builder.body(body).unwrap();
229        self.auth.apply(&mut request);
230
231        Ok(request)
232    }
233}
234
235async fn healthcheck(client: HttpClient, uri: Uri, auth: GcpAuthenticator) -> crate::Result<()> {
236    let mut request = Request::get(uri).body(Body::empty()).unwrap();
237    auth.apply(&mut request);
238
239    let response = client.send(request).await?;
240    healthcheck_response(response, HealthcheckError::TopicNotFound.into())
241}
242
#[cfg(test)]
mod tests {
    use indoc::indoc;

    use super::*;

    /// Exercises the example config through `test_util::test_generate_config`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<PubsubConfig>();
    }

    /// Per its name, building the sink must fail when no GCP credentials are available
    /// (authentication is not skipped in this config).
    #[tokio::test]
    async fn fails_missing_creds() {
        let yaml = indoc! {r#"
                project: project
                topic: topic
                encoding:
                  codec: json
            "#};
        let config: PubsubConfig = serde_yaml::from_str(yaml).unwrap();
        let outcome = config.build(SinkContext::default()).await;
        assert!(outcome.is_err(), "config.build failed to error");
    }
}
268
#[cfg(all(test, feature = "gcp-integration-tests"))]
mod integration_tests {
    use reqwest::{Client, Method, Response};
    use serde::{Deserialize, Serialize, de::DeserializeOwned};
    use serde_json::{Value, json};
    use vector_lib::{
        codecs::JsonSerializerConfig,
        event::{BatchNotifier, BatchStatus},
    };

    use super::*;
    use crate::{
        gcp,
        test_util::{
            components::{
                COMPONENT_ERROR_TAGS, HTTP_SINK_TAGS, run_and_assert_sink_compliance,
                run_and_assert_sink_error,
            },
            random_events_with_stream, random_metrics_with_stream, random_string, trace_init,
        },
    };

    /// Project under which every test topic and subscription is created.
    const PROJECT: &str = "testproject";

    /// Sink config pointed at `gcp::PUBSUB_ADDRESS` (presumably an emulator) with
    /// authentication skipped and the JSON codec.
    fn config(topic: &str) -> PubsubConfig {
        PubsubConfig {
            project: PROJECT.into(),
            topic: topic.into(),
            endpoint: gcp::PUBSUB_ADDRESS.clone(),
            auth: GcpAuthConfig {
                skip_authentication: true,
                ..Default::default()
            },
            batch: Default::default(),
            request: Default::default(),
            encoding: JsonSerializerConfig::default().into(),
            tls: Default::default(),
            acknowledgements: Default::default(),
        }
    }

    /// Builds the sink and healthcheck for `topic`, panicking on failure.
    async fn config_build(topic: &str) -> (VectorSink, crate::sinks::Healthcheck) {
        let cx = SinkContext::default();
        config(topic).build(cx).await.expect("Building sink failed")
    }

    #[tokio::test]
    async fn publish_metrics() {
        trace_init();

        let (topic, subscription) = create_topic_subscription().await;
        let (sink, healthcheck) = config_build(&topic).await;

        healthcheck.await.expect("Health check failed");

        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
        let (input, events) = random_metrics_with_stream(100, Some(batch), None);
        run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await;
        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

        let response = pull_messages(&subscription, 1000).await;
        let messages = response
            .receivedMessages
            .as_ref()
            .expect("Response is missing messages");
        assert_eq!(input.len(), messages.len());
        for (event, received) in input.iter().zip(messages) {
            // Already a `serde_json::Value`; no further conversion needed.
            let data = received.message.decode_data_as_value();
            let expected = serde_json::to_value(event.as_metric()).unwrap();
            assert_eq!(data, expected);
        }
    }

    #[tokio::test]
    async fn publish_events() {
        trace_init();

        let (topic, subscription) = create_topic_subscription().await;
        let (sink, healthcheck) = config_build(&topic).await;

        healthcheck.await.expect("Health check failed");

        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
        let (input, events) = random_events_with_stream(100, 100, Some(batch));
        run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await;
        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

        let response = pull_messages(&subscription, 1000).await;
        let messages = response
            .receivedMessages
            .as_ref()
            .expect("Response is missing messages");
        assert_eq!(input.len(), messages.len());
        for (event, received) in input.iter().zip(messages) {
            let data = serde_json::to_value(received.message.decode_data()).unwrap();
            let expected =
                serde_json::to_value(event.as_log().all_event_fields().unwrap()).unwrap();
            assert_eq!(data, expected);
        }
    }

    #[tokio::test]
    async fn publish_events_broken_topic() {
        trace_init();

        let (topic, _subscription) = create_topic_subscription().await;
        let (sink, _healthcheck) = config_build(&format!("BREAK{topic}BREAK")).await;
        // Explicitly skip healthcheck

        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
        let (_input, events) = random_events_with_stream(100, 100, Some(batch));
        run_and_assert_sink_error(sink, events, &COMPONENT_ERROR_TAGS).await;
        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Rejected));
    }

    #[tokio::test]
    async fn checks_for_valid_topic() {
        trace_init();

        let (topic, _subscription) = create_topic_subscription().await;
        let topic = format!("BAD{topic}");
        let (_sink, healthcheck) = config_build(&topic).await;
        healthcheck.await.expect_err("Health check did not fail");
    }

    /// Creates a randomly named topic plus a subscription on it, returning both names.
    async fn create_topic_subscription() -> (String, String) {
        let topic = format!("topic-{}", random_string(10));
        let subscription = format!("subscription-{}", random_string(10));
        request(Method::PUT, &format!("topics/{topic}"), json!({}))
            .await
            .json::<Value>()
            .await
            .expect("Creating new topic failed");
        request(
            Method::PUT,
            &format!("subscriptions/{subscription}"),
            json!({ "topic": format!("projects/{}/topics/{}", PROJECT, topic) }),
        )
        .await
        .json::<Value>()
        .await
        .expect("Creating new subscription failed");
        (topic, subscription)
    }

    /// Sends `json` to `{PUBSUB_ADDRESS}/v1/projects/{PROJECT}/{path}` with `method`.
    ///
    /// Panics, including the transport error, if the request cannot be sent.
    async fn request(method: Method, path: &str, json: Value) -> Response {
        let url = format!("{}/v1/projects/{}/{}", *gcp::PUBSUB_ADDRESS, PROJECT, path);
        Client::new()
            .request(method.clone(), &url)
            .json(&json)
            .send()
            .await
            .unwrap_or_else(|error| panic!("Sending {method} request to {url} failed: {error}"))
    }

    /// Pulls up to `count` messages from `subscription` without waiting for new ones.
    async fn pull_messages(subscription: &str, count: usize) -> PullResponse {
        request(
            Method::POST,
            &format!("subscriptions/{subscription}:pull"),
            json!({
                "returnImmediately": true,
                "maxMessages": count
            }),
        )
        .await
        .json::<PullResponse>()
        .await
        .expect("Extracting pull data failed")
    }

    #[derive(Debug, Deserialize)]
    #[allow(non_snake_case)] // field names mirror the camelCase JSON of the Pub/Sub REST API
    struct PullResponse {
        receivedMessages: Option<Vec<PullMessageOuter>>,
    }

    #[derive(Debug, Deserialize)]
    #[allow(non_snake_case)] // field names mirror the camelCase JSON of the Pub/Sub REST API
    #[allow(dead_code)] // deserialize all fields
    struct PullMessageOuter {
        ackId: String,
        message: PullMessage,
    }

    #[derive(Debug, Deserialize)]
    #[allow(non_snake_case)] // field names mirror the camelCase JSON of the Pub/Sub REST API
    #[allow(dead_code)] // deserialize all fields
    struct PullMessage {
        data: String,
        messageId: String,
        publishTime: String,
    }

    impl PullMessage {
        /// Base64-decodes `data` and parses it (lossily as UTF-8) as JSON into `T`.
        ///
        /// Panics with `parse_error` if the JSON does not match `T`.
        fn decode_json<T: DeserializeOwned>(&self, parse_error: &str) -> T {
            let bytes = BASE64_STANDARD
                .decode(&self.data)
                .expect("Invalid base64 data");
            let text = String::from_utf8_lossy(&bytes);
            serde_json::from_str(&text).expect(parse_error)
        }

        /// Decodes the payload as a log-shaped `TestMessage`.
        fn decode_data(&self) -> TestMessage {
            self.decode_json("Invalid message structure")
        }

        /// Decodes the payload as an arbitrary JSON value.
        fn decode_data_as_value(&self) -> Value {
            self.decode_json("Invalid json")
        }
    }

    #[derive(Debug, Deserialize, Serialize)]
    struct TestMessage {
        timestamp: String,
        message: String,
    }
}