Skip to main content

vector/sources/
logstash.rs

1use std::{
2    collections::{BTreeMap, VecDeque},
3    convert::TryFrom,
4    io::{self, Read},
5    net::SocketAddr,
6    num::NonZeroUsize,
7    time::Duration,
8};
9
10use bytes::{Buf, Bytes, BytesMut};
11use flate2::read::ZlibDecoder;
12use smallvec::{SmallVec, smallvec};
13use snafu::{ResultExt, Snafu};
14use tokio_util::codec::Decoder;
15use vector_lib::{
16    codecs::{BytesDeserializerConfig, StreamDecodingError},
17    config::{LegacyKey, LogNamespace},
18    configurable::configurable_component,
19    ipallowlist::IpAllowlistConfig,
20    lookup::{OwnedValuePath, event_path, metadata_path, owned_value_path, path},
21    schema::Definition,
22};
23use vrl::value::{KeyString, Kind, kind::Collection};
24
25use super::util::net::{SocketListenAddr, TcpSource, TcpSourceAck, TcpSourceAcker};
26use crate::{
27    config::{
28        DataType, GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig,
29        SourceContext, SourceOutput, log_schema,
30    },
31    event::{Event, LogEvent, Value},
32    serde::bool_or_struct,
33    tcp::TcpKeepaliveConfig,
34    tls::{MaybeTlsSettings, TlsSourceConfig},
35    types,
36};
37
38/// Configuration for the `logstash` source.
39#[configurable_component(source("logstash", "Collect logs from a Logstash agent."))]
40#[derive(Clone, Debug)]
41pub struct LogstashConfig {
42    #[configurable(derived)]
43    address: SocketListenAddr,
44
45    #[configurable(derived)]
46    #[configurable(metadata(docs::advanced))]
47    keepalive: Option<TcpKeepaliveConfig>,
48
49    #[configurable(derived)]
50    pub permit_origin: Option<IpAllowlistConfig>,
51
52    #[configurable(derived)]
53    tls: Option<TlsSourceConfig>,
54
55    /// The size of the receive buffer used for each connection.
56    #[configurable(metadata(docs::type_unit = "bytes"))]
57    #[configurable(metadata(docs::examples = 65536))]
58    #[configurable(metadata(docs::advanced))]
59    receive_buffer_bytes: Option<usize>,
60
61    /// The maximum number of TCP connections that are allowed at any given time.
62    #[configurable(metadata(docs::type_unit = "connections"))]
63    #[configurable(metadata(docs::advanced))]
64    connection_limit: Option<u32>,
65
66    #[configurable(derived)]
67    #[serde(default, deserialize_with = "bool_or_struct")]
68    acknowledgements: SourceAcknowledgementsConfig,
69
70    /// The namespace to use for logs. This overrides the global setting.
71    #[configurable(metadata(docs::hidden))]
72    #[serde(default)]
73    log_namespace: Option<bool>,
74}
75
76impl LogstashConfig {
77    /// Builds the `schema::Definition` for this source using the provided `LogNamespace`.
78    fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
79        // `host_key` is only inserted if not present already.
80        let host_key = log_schema()
81            .host_key()
82            .cloned()
83            .map(LegacyKey::InsertIfEmpty);
84
85        let tls_client_metadata_path = self
86            .tls
87            .as_ref()
88            .and_then(|tls| tls.client_metadata_key.as_ref())
89            .and_then(|k| k.path.clone())
90            .map(LegacyKey::Overwrite);
91
92        BytesDeserializerConfig
93            .schema_definition(log_namespace)
94            .with_standard_vector_source_metadata()
95            .with_source_metadata(
96                LogstashConfig::NAME,
97                None,
98                &owned_value_path!("timestamp"),
99                Kind::timestamp().or_undefined(),
100                Some("timestamp"),
101            )
102            .with_source_metadata(
103                LogstashConfig::NAME,
104                host_key,
105                &owned_value_path!("host"),
106                Kind::bytes(),
107                Some("host"),
108            )
109            .with_source_metadata(
110                Self::NAME,
111                tls_client_metadata_path,
112                &owned_value_path!("tls_client_metadata"),
113                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
114                None,
115            )
116    }
117}
118
119impl Default for LogstashConfig {
120    fn default() -> Self {
121        Self {
122            address: SocketListenAddr::SocketAddr("0.0.0.0:5044".parse().unwrap()),
123            keepalive: None,
124            permit_origin: None,
125            tls: None,
126            receive_buffer_bytes: None,
127            acknowledgements: Default::default(),
128            connection_limit: None,
129            log_namespace: None,
130        }
131    }
132}
133
134impl GenerateConfig for LogstashConfig {
135    fn generate_config() -> toml::Value {
136        toml::Value::try_from(LogstashConfig::default()).unwrap()
137    }
138}
139
140#[async_trait::async_trait]
141#[typetag::serde(name = "logstash")]
142impl SourceConfig for LogstashConfig {
143    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
144        let log_namespace = cx.log_namespace(self.log_namespace);
145        let source = LogstashSource {
146            timestamp_converter: types::Conversion::Timestamp(cx.globals.timezone()),
147            legacy_host_key_path: log_schema().host_key().cloned(),
148            log_namespace,
149        };
150        let shutdown_secs = Duration::from_secs(30);
151        let tls_config = self.tls.as_ref().map(|tls| tls.tls_config.clone());
152        let tls_client_metadata_key = self
153            .tls
154            .as_ref()
155            .and_then(|tls| tls.client_metadata_key.clone())
156            .and_then(|k| k.path);
157
158        let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?;
159        source.run(
160            self.address,
161            self.keepalive,
162            shutdown_secs,
163            tls,
164            tls_client_metadata_key,
165            self.receive_buffer_bytes,
166            None,
167            cx,
168            self.acknowledgements,
169            self.connection_limit,
170            self.permit_origin.clone().map(Into::into),
171            LogstashConfig::NAME,
172            log_namespace,
173        )
174    }
175
176    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
177        // There is a global and per-source `log_namespace` config.
178        // The source config overrides the global setting and is merged here.
179        vec![SourceOutput::new_maybe_logs(
180            DataType::Log,
181            self.schema_definition(global_log_namespace.merge(self.log_namespace)),
182        )]
183    }
184
185    fn resources(&self) -> Vec<Resource> {
186        vec![self.address.as_tcp_resource()]
187    }
188
189    fn can_acknowledge(&self) -> bool {
190        true
191    }
192}
193
194#[derive(Debug, Clone)]
195struct LogstashSource {
196    timestamp_converter: types::Conversion,
197    log_namespace: LogNamespace,
198    legacy_host_key_path: Option<OwnedValuePath>,
199}
200
201impl TcpSource for LogstashSource {
202    type Error = DecodeError;
203    type Item = LogstashEventFrame;
204    type Decoder = LogstashDecoder;
205    type Acker = LogstashAcker;
206
207    fn decoder(&self) -> Self::Decoder {
208        LogstashDecoder::new()
209    }
210
211    fn handle_events(&self, events: &mut [Event], host: SocketAddr) {
212        let now = chrono::Utc::now();
213        for event in events {
214            let log = event.as_mut_log();
215
216            self.log_namespace.insert_vector_metadata(
217                log,
218                log_schema().source_type_key(),
219                path!("source_type"),
220                Bytes::from_static(LogstashConfig::NAME.as_bytes()),
221            );
222
223            let log_timestamp = log.get(event_path!("@timestamp")).and_then(|timestamp| {
224                self.timestamp_converter
225                    .convert::<Value>(timestamp.coerce_to_bytes())
226                    .ok()
227            });
228
229            // Vector: always insert `ingest_timestamp`. Insert `timestamp` if found in event.
230            //
231            // Legacy: always insert the global log schema timestamp key- use timestamp from
232            //         event if present, otherwise use ingest.
233            match self.log_namespace {
234                LogNamespace::Vector => {
235                    if let Some(timestamp) = log_timestamp {
236                        log.insert(metadata_path!(LogstashConfig::NAME, "timestamp"), timestamp);
237                    }
238                    log.insert(metadata_path!("vector", "ingest_timestamp"), now);
239                }
240                LogNamespace::Legacy => {
241                    if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
242                        log.insert(
243                            timestamp_key,
244                            log_timestamp.unwrap_or_else(|| Value::from(now)),
245                        );
246                    }
247                }
248            }
249
250            self.log_namespace.insert_source_metadata(
251                LogstashConfig::NAME,
252                log,
253                self.legacy_host_key_path
254                    .as_ref()
255                    .map(LegacyKey::InsertIfEmpty),
256                path!("host"),
257                host.ip().to_string(),
258            );
259        }
260    }
261
262    fn build_acker(&self, frames: &[Self::Item]) -> Self::Acker {
263        LogstashAcker::new(frames)
264    }
265}
266
267struct LogstashAcker {
268    // Batched reads can contain multiple writer windows. Preserve a separate
269    // ACK point for each completed window so Filebeat never sees an ACK that
270    // advances past the current window it is waiting on. If the batch ends in
271    // the middle of a window, ACK the last received event in that final ACK
272    // domain so clients are not forced to wait for the advertised window size.
273    // Lumberjack defines WindowSize as a maximum unacked count, so a sender can
274    // legitimately advertise a fresh window after a previously ACKed partial
275    // tail. Within a single ReadyFrames batch, the only incomplete ACK domain
276    // we can represent independently is the final tail we have actually seen.
277    // We expect most batches to need only one ACK point, either for a single
278    // completed window or for one partial tail. Multiple ACKs are only needed
279    // when ReadyFrames coalesces multiple logical windows into one batch.
280    acknowledgements: SmallVec<[(LogstashProtocolVersion, u32); 1]>,
281}
282
283impl LogstashAcker {
284    fn new(frames: &[LogstashEventFrame]) -> Self {
285        let acknowledgements = frames
286            .iter()
287            .enumerate()
288            // ACK each completed writer window and the last frame in a partial batch if ReadyFrames
289            // flushes before the current window is complete.
290            .filter(|(index, frame)| frame.window_end || index + 1 == frames.len())
291            .map(|(_, frame)| (frame.protocol, frame.sequence_number))
292            .collect();
293
294        Self { acknowledgements }
295    }
296}
297
298impl TcpSourceAcker for LogstashAcker {
299    // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#ack-frame-type
300    fn build_ack(self, ack: TcpSourceAck) -> Option<Bytes> {
301        match ack {
302            TcpSourceAck::Ack if !self.acknowledgements.is_empty() => {
303                let mut bytes: Vec<u8> = Vec::with_capacity(self.acknowledgements.len() * 6);
304                for (protocol_version, sequence_number) in self.acknowledgements {
305                    bytes.push(protocol_version.into());
306                    bytes.push(LogstashFrameType::Ack.into());
307                    bytes.extend(sequence_number.to_be_bytes().iter());
308                }
309                Some(Bytes::from(bytes))
310            }
311            _ => None,
312        }
313    }
314}
315
316#[derive(Debug)]
317enum LogstashDecoderReadState {
318    ReadProtocol,
319    ReadType(LogstashProtocolVersion),
320    ReadFrame(LogstashProtocolVersion, LogstashFrameType),
321    PendingFrames(VecDeque<(LogstashEventFrame, usize)>),
322}
323
324#[derive(Debug)]
325struct LogstashDecoder {
326    state: LogstashDecoderReadState,
327    // Tracks how many events remain in the current writer window. This lets us
328    // preserve sender window boundaries even if ReadyFrames later batches
329    // multiple decoded windows together before ACKing.
330    window_events_remaining: Option<NonZeroUsize>,
331}
332
333impl LogstashDecoder {
334    const fn new() -> Self {
335        Self::new_with_window_events_remaining(None)
336    }
337
338    const fn new_with_window_events_remaining(
339        window_events_remaining: Option<NonZeroUsize>,
340    ) -> Self {
341        Self {
342            state: LogstashDecoderReadState::ReadProtocol,
343            window_events_remaining,
344        }
345    }
346
347    /// Marks whether a decoded frame closes the current writer window.
348    ///
349    /// Filebeat expects ACKs to stay within the current window announced by the
350    /// most recent `WindowSize` frame. The generic TCP batching layer can merge
351    /// frames from multiple windows before we build an ACK, so we record the
352    /// per-frame window boundary here and let the acker emit one ACK frame per
353    /// completed window later.
354    ///
355    /// If a sender omits `WindowSize`, we keep the previous behavior and treat
356    /// each standalone frame as ACKable on its own.
357    const fn annotate_frame(&mut self, frame: &mut LogstashEventFrame) {
358        match self.window_events_remaining {
359            Some(remaining) if remaining.get() == 1 => {
360                frame.window_end = true;
361                self.window_events_remaining = None;
362            }
363            Some(remaining) => {
364                frame.window_end = false;
365                self.window_events_remaining = NonZeroUsize::new(remaining.get() - 1); // safe because we know remaining is greater than 1
366            }
367            None => {
368                // Preserve existing behavior for inputs that send standalone data frames
369                // without an explicit WindowSize frame.
370                frame.window_end = true;
371            }
372        }
373    }
374}
375
376#[derive(Debug, Snafu)]
377pub enum DecodeError {
378    #[snafu(display("i/o error: {}", source))]
379    IO { source: io::Error },
380    #[snafu(display("Unknown logstash protocol version: {}", version))]
381    UnknownProtocolVersion { version: char },
382    #[snafu(display("Unknown logstash protocol message type: {}", frame_type))]
383    UnknownFrameType { frame_type: char },
384    #[snafu(display("Failed to decode JSON frame: {}", source))]
385    JsonFrameFailedDecode { source: serde_json::Error },
386    #[snafu(display("Failed to decompress compressed frame: {}", source))]
387    DecompressionFailed { source: io::Error },
388}
389
390impl StreamDecodingError for DecodeError {
391    fn can_continue(&self) -> bool {
392        // No decode error is recoverable on this stream. Lumberjack is a
393        // length-prefixed binary protocol with no resync marker, so once a
394        // frame fails to decode the stream position is no longer trustworthy:
395        // continuing would misframe subsequent bytes and emit ACKs for bogus
396        // sequence numbers.
397        false
398    }
399}
400
401impl From<io::Error> for DecodeError {
402    fn from(source: io::Error) -> Self {
403        DecodeError::IO { source }
404    }
405}
406
407#[derive(Debug, Clone, Copy)]
408enum LogstashProtocolVersion {
409    V1, // 1
410    V2, // 2
411}
412
413impl From<LogstashProtocolVersion> for u8 {
414    fn from(frame_type: LogstashProtocolVersion) -> u8 {
415        use LogstashProtocolVersion::*;
416
417        match frame_type {
418            V1 => b'1',
419            V2 => b'2',
420        }
421    }
422}
423
424impl TryFrom<u8> for LogstashProtocolVersion {
425    type Error = DecodeError;
426
427    fn try_from(frame_type: u8) -> Result<LogstashProtocolVersion, DecodeError> {
428        use LogstashProtocolVersion::*;
429
430        match frame_type {
431            b'1' => Ok(V1),
432            b'2' => Ok(V2),
433            version => Err(DecodeError::UnknownProtocolVersion {
434                version: version as char,
435            }),
436        }
437    }
438}
439
440#[derive(Debug, Clone, Copy)]
441enum LogstashFrameType {
442    Ack,        // A
443    WindowSize, // W
444    Data,       // D
445    Json,       // J
446    Compressed, // C
447}
448
449impl From<LogstashFrameType> for u8 {
450    fn from(frame_type: LogstashFrameType) -> u8 {
451        use LogstashFrameType::*;
452
453        match frame_type {
454            Ack => b'A',
455            WindowSize => b'W',
456            Data => b'D',
457            Json => b'J',
458            Compressed => b'C',
459        }
460    }
461}
462
463impl TryFrom<u8> for LogstashFrameType {
464    type Error = DecodeError;
465
466    fn try_from(frame_type: u8) -> Result<LogstashFrameType, DecodeError> {
467        use LogstashFrameType::*;
468
469        match frame_type {
470            b'A' => Ok(Ack),
471            b'W' => Ok(WindowSize),
472            b'D' => Ok(Data),
473            b'J' => Ok(Json),
474            b'C' => Ok(Compressed),
475            frame_type => Err(DecodeError::UnknownFrameType {
476                frame_type: frame_type as char,
477            }),
478        }
479    }
480}
481
482/// Normalized event from logstash frame
483#[derive(Debug)]
484struct LogstashEventFrame {
485    protocol: LogstashProtocolVersion,
486    sequence_number: u32,
487    fields: BTreeMap<KeyString, serde_json::Value>,
488    window_end: bool,
489}
490
491struct DecodedCompressedFrames {
492    frames: VecDeque<(LogstashEventFrame, usize)>,
493    window_events_remaining: Option<NonZeroUsize>,
494}
495
496// Based on spec at: https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md
497// And implementation from logstash: https://github.com/logstash-plugins/logstash-input-beats/blob/27bad62a26a81fc000a9d21495b8dc7174ab63e9/src/main/java/org/logstash/beats/BeatsParser.java
498impl Decoder for LogstashDecoder {
499    type Item = (LogstashEventFrame, usize);
500    type Error = DecodeError;
501
502    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
503        // This implements a sort of simple state machine to read the frames from the wire
504        //
505        // Each matched arm with either:
506        // * Return that there is not enough data
507        // * Return an error
508        // * Read some bytes and advance the state
509        loop {
510            self.state = match self.state {
511                // if we have any unsent frames, send them before reading new logstash frame
512                LogstashDecoderReadState::PendingFrames(ref mut frames) => {
513                    match frames.pop_front() {
514                        Some(frame) => return Ok(Some(frame)),
515                        None => LogstashDecoderReadState::ReadProtocol,
516                    }
517                }
518                LogstashDecoderReadState::ReadProtocol => {
519                    if src.remaining() < 1 {
520                        return Ok(None);
521                    }
522
523                    use LogstashProtocolVersion::*;
524
525                    match LogstashProtocolVersion::try_from(src.get_u8())? {
526                        V1 => LogstashDecoderReadState::ReadType(V1),
527                        V2 => LogstashDecoderReadState::ReadType(V2),
528                    }
529                }
530                LogstashDecoderReadState::ReadType(protocol) => {
531                    if src.remaining() < 1 {
532                        return Ok(None);
533                    }
534
535                    use LogstashFrameType::*;
536
537                    match LogstashFrameType::try_from(src.get_u8())? {
538                        WindowSize => LogstashDecoderReadState::ReadFrame(protocol, WindowSize),
539                        Data => LogstashDecoderReadState::ReadFrame(protocol, Data),
540                        Json => LogstashDecoderReadState::ReadFrame(protocol, Json),
541                        Compressed => LogstashDecoderReadState::ReadFrame(protocol, Compressed),
542                        Ack => LogstashDecoderReadState::ReadFrame(protocol, Ack),
543                    }
544                }
545                // The window size indicates how many events the writer will send before waiting
546                // for acks. We preserve this boundary so the acker can emit one ACK per
547                // completed window, even if multiple windows are batched together later.
548                // Filebeat accepts cumulative ACKs, but not ACKs that advance past the
549                // current writer window it is waiting on. WindowSize is a maximum unacked
550                // count, not necessarily an exact count of immediately following frames, so a
551                // sender can legitimately advertise a new window after a previously ACKed
552                // partial tail. If a malformed sender does this before that earlier tail has
553                // actually been ACKed, we tolerate the reset here even though it can collapse
554                // the older incomplete domain into the new one.
555                //
556                // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#window-size-frame-type
557                LogstashDecoderReadState::ReadFrame(_protocol, LogstashFrameType::WindowSize) => {
558                    if src.remaining() < 4 {
559                        return Ok(None);
560                    }
561
562                    let window_size = src.get_u32() as usize;
563                    self.window_events_remaining = NonZeroUsize::new(window_size);
564
565                    LogstashDecoderReadState::ReadProtocol
566                }
567                // we shouldn't receive acks from the writer, just skip
568                //
569                // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#ack-frame-type
570                LogstashDecoderReadState::ReadFrame(_protocol, LogstashFrameType::Ack) => {
571                    if src.remaining() < 4 {
572                        return Ok(None);
573                    }
574
575                    let _sequence_number = src.get_u32();
576
577                    LogstashDecoderReadState::ReadProtocol
578                }
579                // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#data-frame-type
580                LogstashDecoderReadState::ReadFrame(protocol, LogstashFrameType::Data) => {
581                    let Some((mut frame, byte_size)) = decode_data_frame(protocol, src) else {
582                        return Ok(None);
583                    };
584                    self.annotate_frame(&mut frame);
585
586                    LogstashDecoderReadState::PendingFrames([(frame, byte_size)].into())
587                }
588                // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#json-frame-type
589                LogstashDecoderReadState::ReadFrame(protocol, LogstashFrameType::Json) => {
590                    let Some((mut frame, byte_size)) = decode_json_frame(protocol, src)? else {
591                        return Ok(None);
592                    };
593                    self.annotate_frame(&mut frame);
594
595                    LogstashDecoderReadState::PendingFrames([(frame, byte_size)].into())
596                }
597                // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#compressed-frame-type
598                //
599                // The compressed payload is still part of the same logical Lumberjack stream, so
600                // the nested decoder must inherit the current window state and return the updated
601                // state after expanding the payload. Re-annotating the emitted frames here would
602                // overwrite any WindowSize boundaries that were established inside the compressed
603                // payload and can also lose progress from a partially consumed outer window.
604                LogstashDecoderReadState::ReadFrame(_protocol, LogstashFrameType::Compressed) => {
605                    let Some(decoded) = decode_compressed_frame(src, self.window_events_remaining)?
606                    else {
607                        return Ok(None);
608                    };
609                    self.window_events_remaining = decoded.window_events_remaining;
610
611                    LogstashDecoderReadState::PendingFrames(decoded.frames)
612                }
613            };
614        }
615    }
616}
617
618/// Decode the Lumberjack version 1 protocol, which use the Key:Value format.
619fn decode_data_frame(
620    protocol: LogstashProtocolVersion,
621    src: &mut BytesMut,
622) -> Option<(LogstashEventFrame, usize)> {
623    let mut rest = src.as_ref();
624
625    if rest.remaining() < 8 {
626        return None;
627    }
628    let sequence_number = rest.get_u32();
629    let pair_count = rest.get_u32();
630    if pair_count == 0 {
631        return None; // Invalid number of fields
632    }
633
634    let mut fields = BTreeMap::<KeyString, serde_json::Value>::new();
635    for _ in 0..pair_count {
636        let (key, value, right) = decode_pair(rest)?;
637        rest = right;
638
639        fields.insert(
640            String::from_utf8_lossy(key).into(),
641            String::from_utf8_lossy(value).into(),
642        );
643    }
644
645    let byte_size = bytes_remaining(src, rest);
646    src.advance(byte_size);
647
648    Some((
649        LogstashEventFrame {
650            protocol,
651            sequence_number,
652            fields,
653            window_end: false,
654        },
655        byte_size,
656    ))
657}
658
659fn decode_pair(mut rest: &[u8]) -> Option<(&[u8], &[u8], &[u8])> {
660    if rest.remaining() < 4 {
661        return None;
662    }
663    let key_length = rest.get_u32() as usize;
664
665    if rest.remaining() < key_length {
666        return None;
667    }
668    let (key, right) = rest.split_at(key_length);
669    rest = right;
670
671    if rest.remaining() < 4 {
672        return None;
673    }
674    let value_length = rest.get_u32() as usize;
675    if rest.remaining() < value_length {
676        return None;
677    }
678    let (value, right) = rest.split_at(value_length);
679    Some((key, value, right))
680}
681
682fn decode_json_frame(
683    protocol: LogstashProtocolVersion,
684    src: &mut BytesMut,
685) -> Result<Option<(LogstashEventFrame, usize)>, DecodeError> {
686    let mut rest = src.as_ref();
687
688    if rest.remaining() < 8 {
689        return Ok(None);
690    }
691    let sequence_number = rest.get_u32();
692    let payload_size = rest.get_u32() as usize;
693
694    if rest.remaining() < payload_size {
695        return Ok(None);
696    }
697
698    let (slice, right) = rest.split_at(payload_size);
699    rest = right;
700
701    let fields: BTreeMap<KeyString, serde_json::Value> =
702        serde_json::from_slice(slice).context(JsonFrameFailedDecodeSnafu {})?;
703
704    let byte_size = bytes_remaining(src, rest);
705    src.advance(byte_size);
706
707    Ok(Some((
708        LogstashEventFrame {
709            protocol,
710            sequence_number,
711            fields,
712            window_end: false,
713        },
714        byte_size,
715    )))
716}
717
718fn decode_compressed_frame(
719    src: &mut BytesMut,
720    window_events_remaining: Option<NonZeroUsize>,
721) -> Result<Option<DecodedCompressedFrames>, DecodeError> {
722    let mut rest = src.as_ref();
723
724    if rest.remaining() < 4 {
725        return Ok(None);
726    }
727    let payload_size = rest.get_u32() as usize;
728
729    if rest.remaining() < payload_size {
730        src.reserve(payload_size);
731        return Ok(None);
732    }
733
734    let (slice, right) = rest.split_at(payload_size);
735    rest = right;
736
737    let mut buf = Vec::new();
738
739    let res = ZlibDecoder::new(io::Cursor::new(slice))
740        .read_to_end(&mut buf)
741        .context(DecompressionFailedSnafu)
742        .map(|_| BytesMut::from(&buf[..]));
743
744    let byte_size = bytes_remaining(src, rest);
745    src.advance(byte_size);
746
747    let mut buf = res?;
748
749    let mut decoder = LogstashDecoder::new_with_window_events_remaining(window_events_remaining);
750
751    let mut frames = VecDeque::new();
752
753    while let Some(s) = decoder.decode(&mut buf)? {
754        frames.push_back(s);
755    }
756    Ok(Some(DecodedCompressedFrames {
757        frames,
758        window_events_remaining: decoder.window_events_remaining,
759    }))
760}
761
762fn bytes_remaining(src: &BytesMut, rest: &[u8]) -> usize {
763    let remaining = rest.remaining();
764    src.remaining() - remaining
765}
766
767impl From<LogstashEventFrame> for Event {
768    fn from(frame: LogstashEventFrame) -> Self {
769        Event::Log(LogEvent::from(
770            frame
771                .fields
772                .into_iter()
773                .map(|(key, value)| (key, Value::from(value)))
774                .collect::<BTreeMap<_, _>>(),
775        ))
776    }
777}
778
779impl From<LogstashEventFrame> for SmallVec<[Event; 1]> {
780    fn from(frame: LogstashEventFrame) -> Self {
781        smallvec![frame.into()]
782    }
783}
784
785#[cfg(test)]
786mod test {
787    use std::io::Write;
788
789    use bytes::BufMut;
790    use flate2::{Compression, write::ZlibEncoder};
791    use futures::{Stream, StreamExt, stream};
792    use rand::{Rng, rng};
793    use tokio::io::{AsyncReadExt, AsyncWriteExt};
794    use vector_lib::codecs::ReadyFrames;
795    use vector_lib::lookup::OwnedTargetPath;
796    use vrl::value::kind::Collection;
797
798    use super::*;
799    use crate::{
800        SourceSender,
801        event::EventStatus,
802        test_util::{
803            addr::next_addr,
804            components::{SOCKET_PUSH_SOURCE_TAGS, assert_source_compliance},
805            spawn_collect_n, wait_for_tcp,
806        },
807    };
808
809    #[test]
810    fn generate_config() {
811        crate::test_util::test_generate_config::<LogstashConfig>();
812    }
813
814    #[tokio::test]
815    async fn test_delivered() {
816        test_protocol(EventStatus::Delivered, true).await;
817    }
818
819    #[tokio::test]
820    async fn test_failed() {
821        test_protocol(EventStatus::Rejected, false).await;
822    }
823
824    async fn start_logstash(
825        status: EventStatus,
826    ) -> (SocketAddr, impl Stream<Item = Event> + Unpin) {
827        let (sender, recv) = SourceSender::new_test_finalize(status);
828        let (_guard, address) = next_addr();
829        let source = LogstashConfig {
830            address: address.into(),
831            tls: None,
832            permit_origin: None,
833            keepalive: None,
834            receive_buffer_bytes: None,
835            acknowledgements: true.into(),
836            connection_limit: None,
837            log_namespace: None,
838        }
839        .build(SourceContext::new_test(sender, None))
840        .await
841        .unwrap();
842        tokio::spawn(source);
843        wait_for_tcp(address).await;
844        (address, recv)
845    }
846
847    async fn test_protocol(status: EventStatus, sends_ack: bool) {
848        let events = assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
849            let (address, recv) = start_logstash(status).await;
850            spawn_collect_n(
851                send_req(address, &[("message", "Hello, world!")], sends_ack),
852                recv,
853                1,
854            )
855            .await
856        })
857        .await;
858
859        assert_eq!(events.len(), 1);
860        let log = events[0].as_log();
861        assert_eq!(
862            log.get("message").unwrap().to_string_lossy(),
863            "Hello, world!".to_string()
864        );
865        assert_eq!(
866            log.get("source_type").unwrap().to_string_lossy(),
867            "logstash".to_string()
868        );
869        assert!(log.get("host").is_some());
870        assert!(log.get("timestamp").is_some());
871    }
872
873    fn push_req(req: &mut BytesMut, seq: u32, pairs: &[(&str, &str)]) {
874        req.put_u8(b'2');
875        req.put_u8(b'D');
876        req.put_u32(seq);
877        req.put_u32(pairs.len() as u32);
878        for (key, value) in pairs {
879            req.put_u32(key.len() as u32);
880            req.put(key.as_bytes());
881            req.put_u32(value.len() as u32);
882            req.put(value.as_bytes());
883        }
884    }
885
886    fn encode_req(seq: u32, pairs: &[(&str, &str)]) -> Bytes {
887        let mut req = BytesMut::new();
888        push_req(&mut req, seq, pairs);
889        req.into()
890    }
891
892    fn push_window_size(req: &mut BytesMut, size: u32) {
893        req.put_u8(b'2');
894        req.put_u8(b'W');
895        req.put_u32(size);
896    }
897
898    fn push_compressed(req: &mut BytesMut, inner: &[u8]) {
899        let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
900        encoder.write_all(inner).unwrap();
901        let compressed = encoder.finish().unwrap();
902
903        req.put_u8(b'2');
904        req.put_u8(b'C');
905        req.put_u32(compressed.len() as u32);
906        req.put(compressed.as_slice());
907    }
908
909    fn decode_frames(mut src: BytesMut) -> Vec<(LogstashEventFrame, usize)> {
910        let mut decoder = LogstashDecoder::new();
911        let mut frames = Vec::new();
912
913        while let Some(frame) = decoder.decode(&mut src).unwrap() {
914            frames.push(frame);
915        }
916
917        assert_eq!(src.len(), 0);
918        frames
919    }
920
921    fn decode_acknowledgements(mut ack: Bytes) -> Vec<u32> {
922        let mut acknowledgements = Vec::new();
923
924        while !ack.is_empty() {
925            assert!(
926                ack.len() >= 6,
927                "ack stream ended with {} trailing bytes",
928                ack.len()
929            );
930            assert_eq!(ack.get_u8(), b'2');
931            assert_eq!(ack.get_u8(), b'A');
932            acknowledgements.push(ack.get_u32());
933        }
934
935        acknowledgements
936    }
937
938    fn decoded_sequence_numbers(decoded: &[(LogstashEventFrame, usize)]) -> Vec<u32> {
939        decoded
940            .iter()
941            .map(|(frame, _)| frame.sequence_number)
942            .collect::<Vec<_>>()
943    }
944
945    fn assert_decoded_sequences(
946        decoded: &[(LogstashEventFrame, usize)],
947        expected_sequences: &[u32],
948    ) {
949        assert_eq!(decoded_sequence_numbers(decoded), expected_sequences);
950    }
951
952    async fn assert_acknowledgements_for_ready_frames(
953        decoded: Vec<(LogstashEventFrame, usize)>,
954        expected_sequences: &[u32],
955        expected_acknowledgements: &[u32],
956    ) {
957        assert_decoded_sequences(&decoded, expected_sequences);
958
959        let stream = stream::iter(decoded.into_iter().map(Ok::<_, DecodeError>));
960        let mut ready = ReadyFrames::with_capacity(stream, 16);
961        let (frames, _) = ready.next().await.unwrap().unwrap();
962
963        let ack = LogstashAcker::new(&frames)
964            .build_ack(TcpSourceAck::Ack)
965            .unwrap();
966        let acknowledgements = decode_acknowledgements(ack);
967
968        assert!(ready.next().await.is_none());
969        assert_eq!(acknowledgements, expected_acknowledgements);
970    }
971
972    fn decode_frames_and_assert_sequences(
973        src: BytesMut,
974        expected_sequences: &[u32],
975    ) -> Vec<(LogstashEventFrame, usize)> {
976        let decoded = decode_frames(src);
977        assert_decoded_sequences(&decoded, expected_sequences);
978        decoded
979    }
980
981    fn decode_frames_with_decoder(
982        decoder: &mut LogstashDecoder,
983        mut src: BytesMut,
984    ) -> Vec<(LogstashEventFrame, usize)> {
985        let mut frames = Vec::new();
986
987        while let Some(frame) = decoder.decode(&mut src).unwrap() {
988            frames.push(frame);
989        }
990
991        assert_eq!(src.len(), 0);
992        frames
993    }
994
995    fn decode_frames_with_decoder_and_assert_sequences(
996        decoder: &mut LogstashDecoder,
997        src: BytesMut,
998        expected_sequences: &[u32],
999    ) -> Vec<(LogstashEventFrame, usize)> {
1000        let decoded = decode_frames_with_decoder(decoder, src);
1001        assert_decoded_sequences(&decoded, expected_sequences);
1002        decoded
1003    }
1004
1005    #[test]
1006    fn v1_decoder_does_not_panic() {
1007        let seq = rng().random_range(1..u32::MAX);
1008        let req = encode_req(seq, &[("message", "Hello, World!")]);
1009        for i in 0..req.len() - 1 {
1010            assert!(
1011                decode_data_frame(LogstashProtocolVersion::V1, &mut BytesMut::from(&req[..i]))
1012                    .is_none()
1013            );
1014        }
1015    }
1016
1017    // A malformed frame must be a fatal (non-continuable) decode error: the
1018    // Lumberjack stream can't be resynced, so the connection is closed rather
1019    // than continuing with a desynced decoder (which would emit bogus ACKs).
1020    // This matches upstream logstash-input-beats, which closes the channel on
1021    // any decode exception.
1022
1023    #[test]
1024    fn malformed_json_frame_is_a_fatal_decode_error() {
1025        let mut decoder = LogstashDecoder::new();
1026        let mut src = BytesMut::new();
1027        src.put_u8(b'2');
1028        src.put_u8(b'J');
1029        src.put_u32(1); // sequence number
1030        let bad = b"{ not valid json ";
1031        src.put_u32(bad.len() as u32); // payload size
1032        src.put(&bad[..]);
1033
1034        let err = decoder.decode(&mut src).unwrap_err();
1035        assert!(matches!(err, DecodeError::JsonFrameFailedDecode { .. }));
1036        assert!(
1037            !err.can_continue(),
1038            "a malformed JSON frame must be fatal so the connection closes",
1039        );
1040    }
1041
1042    #[test]
1043    fn malformed_compressed_frame_is_a_fatal_decode_error() {
1044        let mut decoder = LogstashDecoder::new();
1045        let mut src = BytesMut::new();
1046        src.put_u8(b'2');
1047        src.put_u8(b'C');
1048        let garbage = b"this is not a zlib stream";
1049        src.put_u32(garbage.len() as u32); // payload size
1050        src.put(&garbage[..]);
1051
1052        let err = decoder.decode(&mut src).unwrap_err();
1053        assert!(matches!(err, DecodeError::DecompressionFailed { .. }));
1054        assert!(!err.can_continue());
1055    }
1056
1057    #[tokio::test]
1058    async fn malformed_frame_closes_connection_without_ack() {
1059        let (address, _recv) = start_logstash(EventStatus::Delivered).await;
1060
1061        let mut socket = tokio::net::TcpStream::connect(address).await.unwrap();
1062
1063        // A '2' 'J' frame whose payload is not valid JSON.
1064        let mut req = BytesMut::new();
1065        req.put_u8(b'2');
1066        req.put_u8(b'J');
1067        req.put_u32(1); // sequence number
1068        let bad = b"{ not valid json ";
1069        req.put_u32(bad.len() as u32); // payload size
1070        req.put(&bad[..]);
1071        socket.write_all(&req).await.unwrap();
1072
1073        // The source must close the connection on the decode error and send no
1074        // ACK; the client will reconnect and retransmit.
1075        let mut output = BytesMut::new();
1076        let result = socket.read_buf(&mut output).await;
1077        assert!(
1078            matches!(result, Ok(0)) || result.is_err(),
1079            "expected the connection to close; read returned {result:?} with {output:?}",
1080        );
1081        assert!(
1082            output.is_empty(),
1083            "no ACK should be sent for a malformed frame, got {output:?}",
1084        );
1085    }
1086
1087    #[tokio::test]
1088    async fn distinct_windows_do_not_share_an_ack_domain() {
1089        let mut req = BytesMut::new();
1090        push_window_size(&mut req, 1);
1091        push_req(&mut req, 1, &[("message", "first window")]);
1092        push_window_size(&mut req, 2);
1093        push_req(&mut req, 1, &[("message", "second window first")]);
1094        push_req(&mut req, 2, &[("message", "second window second")]);
1095
1096        let decoded = decode_frames_and_assert_sequences(req, &[1, 1, 2]);
1097        assert_acknowledgements_for_ready_frames(decoded, &[1, 1, 2], &[1, 2]).await;
1098    }
1099
1100    #[tokio::test]
1101    async fn distinct_windows_with_monotonic_sequences_ack_the_first_window() {
1102        let mut req = BytesMut::new();
1103        push_window_size(&mut req, 2);
1104        push_req(&mut req, 1, &[("message", "first window first")]);
1105        push_req(&mut req, 2, &[("message", "first window second")]);
1106        push_window_size(&mut req, 2);
1107        push_req(&mut req, 3, &[("message", "second window first")]);
1108        push_req(&mut req, 4, &[("message", "second window second")]);
1109
1110        let decoded = decode_frames_and_assert_sequences(req, &[1, 2, 3, 4]);
1111        assert_acknowledgements_for_ready_frames(decoded, &[1, 2, 3, 4], &[2, 4]).await;
1112    }
1113
1114    #[tokio::test]
1115    async fn incomplete_final_window_is_acked_to_the_last_received_event() {
1116        let mut req = BytesMut::new();
1117        push_window_size(&mut req, 4);
1118        push_req(&mut req, 1, &[("message", "only event in partial window")]);
1119
1120        let decoded = decode_frames_and_assert_sequences(req, &[1]);
1121        assert_acknowledgements_for_ready_frames(decoded, &[1], &[1]).await;
1122    }
1123
1124    #[tokio::test]
1125    async fn compressed_frames_preserve_inner_window_boundaries() {
1126        let mut inner = BytesMut::new();
1127        push_window_size(&mut inner, 2);
1128        push_req(&mut inner, 1, &[("message", "compressed first")]);
1129        push_req(&mut inner, 2, &[("message", "compressed second")]);
1130
1131        let mut req = BytesMut::new();
1132        push_compressed(&mut req, &inner);
1133
1134        let decoded = decode_frames_and_assert_sequences(req, &[1, 2]);
1135        assert_acknowledgements_for_ready_frames(decoded, &[1, 2], &[2]).await;
1136    }
1137
1138    #[tokio::test]
1139    async fn single_window_split_across_ready_frames_keeps_progressive_acks() {
1140        let mut req = BytesMut::new();
1141        push_window_size(&mut req, 4);
1142        push_req(&mut req, 1, &[("message", "first")]);
1143        push_req(&mut req, 2, &[("message", "second")]);
1144        push_req(&mut req, 3, &[("message", "third")]);
1145        push_req(&mut req, 4, &[("message", "fourth")]);
1146
1147        let decoded = decode_frames_and_assert_sequences(req, &[1, 2, 3, 4]);
1148
1149        let stream = stream::iter(decoded.into_iter().map(Ok::<_, DecodeError>));
1150        let mut ready = ReadyFrames::with_capacity(stream, 2);
1151        let mut acknowledgements = Vec::new();
1152
1153        while let Some(result) = ready.next().await {
1154            let (frames, _byte_size) = result.unwrap();
1155            let ack = LogstashAcker::new(&frames)
1156                .build_ack(TcpSourceAck::Ack)
1157                .unwrap();
1158            acknowledgements.push(decode_acknowledgements(ack));
1159        }
1160
1161        assert_eq!(acknowledgements, vec![vec![2], vec![4]]);
1162    }
1163
1164    #[tokio::test]
1165    async fn fresh_window_after_acked_partial_tail_is_accepted() {
1166        let mut decoder = LogstashDecoder::new();
1167
1168        let mut first_batch = BytesMut::new();
1169        push_window_size(&mut first_batch, 2);
1170        push_req(&mut first_batch, 1, &[("message", "first partial tail")]);
1171        let decoded =
1172            decode_frames_with_decoder_and_assert_sequences(&mut decoder, first_batch, &[1]);
1173        assert_acknowledgements_for_ready_frames(decoded, &[1], &[1]).await;
1174
1175        let mut second_batch = BytesMut::new();
1176        push_window_size(&mut second_batch, 1);
1177        push_req(
1178            &mut second_batch,
1179            1,
1180            &[("message", "fresh window after ack")],
1181        );
1182        let decoded =
1183            decode_frames_with_decoder_and_assert_sequences(&mut decoder, second_batch, &[1]);
1184        assert_acknowledgements_for_ready_frames(decoded, &[1], &[1]).await;
1185    }
1186
1187    async fn send_req(address: SocketAddr, pairs: &[(&str, &str)], sends_ack: bool) {
1188        let seq = rng().random_range(1..u32::MAX);
1189        let mut socket = tokio::net::TcpStream::connect(address).await.unwrap();
1190
1191        let req = encode_req(seq, pairs);
1192        socket.write_all(&req).await.unwrap();
1193
1194        let mut output = BytesMut::new();
1195        socket.read_buf(&mut output).await.unwrap();
1196
1197        if sends_ack {
1198            assert_eq!(output.get_u8(), b'2');
1199            assert_eq!(output.get_u8(), b'A');
1200            assert_eq!(output.get_u32(), seq);
1201        }
1202        assert_eq!(output.len(), 0);
1203    }
1204
1205    #[test]
1206    fn output_schema_definition_vector_namespace() {
1207        let config = LogstashConfig {
1208            log_namespace: Some(true),
1209            ..Default::default()
1210        };
1211
1212        let definitions = config
1213            .outputs(LogNamespace::Vector)
1214            .remove(0)
1215            .schema_definition(true);
1216
1217        let expected_definition =
1218            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
1219                .with_meaning(OwnedTargetPath::event_root(), "message")
1220                .with_metadata_field(
1221                    &owned_value_path!("vector", "source_type"),
1222                    Kind::bytes(),
1223                    None,
1224                )
1225                .with_metadata_field(
1226                    &owned_value_path!("vector", "ingest_timestamp"),
1227                    Kind::timestamp(),
1228                    None,
1229                )
1230                .with_metadata_field(
1231                    &owned_value_path!(LogstashConfig::NAME, "timestamp"),
1232                    Kind::timestamp().or_undefined(),
1233                    Some("timestamp"),
1234                )
1235                .with_metadata_field(
1236                    &owned_value_path!(LogstashConfig::NAME, "host"),
1237                    Kind::bytes(),
1238                    Some("host"),
1239                )
1240                .with_metadata_field(
1241                    &owned_value_path!(LogstashConfig::NAME, "tls_client_metadata"),
1242                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
1243                    None,
1244                );
1245
1246        assert_eq!(definitions, Some(expected_definition))
1247    }
1248
1249    #[test]
1250    fn output_schema_definition_legacy_namespace() {
1251        let config = LogstashConfig::default();
1252
1253        let definitions = config
1254            .outputs(LogNamespace::Legacy)
1255            .remove(0)
1256            .schema_definition(true);
1257
1258        let expected_definition = Definition::new_with_default_metadata(
1259            Kind::object(Collection::empty()),
1260            [LogNamespace::Legacy],
1261        )
1262        .with_event_field(
1263            &owned_value_path!("message"),
1264            Kind::bytes(),
1265            Some("message"),
1266        )
1267        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
1268        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
1269        .with_event_field(&owned_value_path!("host"), Kind::bytes(), Some("host"));
1270
1271        assert_eq!(definitions, Some(expected_definition))
1272    }
1273}
1274
1275#[cfg(all(test, feature = "logstash-integration-tests"))]
1276mod integration_tests {
1277    use std::time::Duration;
1278
1279    use futures::Stream;
1280    use tokio::time::timeout;
1281
1282    use super::*;
1283    use crate::{
1284        SourceSender,
1285        config::SourceContext,
1286        event::EventStatus,
1287        test_util::{
1288            collect_n,
1289            components::{SOCKET_PUSH_SOURCE_TAGS, assert_source_compliance},
1290            wait_for_tcp,
1291        },
1292        tls::{TlsConfig, TlsEnableableConfig},
1293    };
1294
1295    fn heartbeat_address() -> String {
1296        std::env::var("HEARTBEAT_ADDRESS")
1297            .expect("Address of Beats Heartbeat service must be specified.")
1298    }
1299
1300    #[tokio::test]
1301    async fn beats_heartbeat() {
1302        let events = assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1303            let out = source(heartbeat_address(), None).await;
1304
1305            timeout(Duration::from_secs(60), collect_n(out, 1))
1306                .await
1307                .unwrap()
1308        })
1309        .await;
1310
1311        assert!(!events.is_empty());
1312
1313        let log = events[0].as_log();
1314        assert_eq!(
1315            log.get("@metadata.beat"),
1316            Some(String::from("heartbeat").into()).as_ref()
1317        );
1318        assert_eq!(log.get("summary.up"), Some(1.into()).as_ref());
1319        assert!(log.get("timestamp").is_some());
1320        assert!(log.get("host").is_some());
1321    }
1322
1323    fn logstash_address() -> String {
1324        std::env::var("LOGSTASH_ADDRESS")
1325            .expect("Listen address for `logstash` source must be specified.")
1326    }
1327
1328    #[tokio::test]
1329    async fn logstash() {
1330        let events = assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1331            let out = source(
1332                logstash_address(),
1333                Some(TlsEnableableConfig {
1334                    enabled: Some(true),
1335                    options: TlsConfig {
1336                        crt_file: Some(
1337                            "tests/integration/shared/data/host.docker.internal.crt".into(),
1338                        ),
1339                        key_file: Some(
1340                            "tests/integration/shared/data/host.docker.internal.key".into(),
1341                        ),
1342                        ..Default::default()
1343                    },
1344                }),
1345            )
1346            .await;
1347
1348            timeout(Duration::from_secs(60), collect_n(out, 1))
1349                .await
1350                .unwrap()
1351        })
1352        .await;
1353
1354        assert!(!events.is_empty());
1355
1356        let log = events[0].as_log();
1357        assert!(
1358            log.get("line")
1359                .unwrap()
1360                .to_string_lossy()
1361                .contains("Hello World")
1362        );
1363        assert!(log.get("host").is_some());
1364    }
1365
1366    async fn source(
1367        address: String,
1368        tls: Option<TlsEnableableConfig>,
1369    ) -> impl Stream<Item = Event> + Unpin {
1370        let (sender, recv) = SourceSender::new_test_finalize(EventStatus::Delivered);
1371        let address: SocketAddr = address.parse().unwrap();
1372        let tls_config = TlsSourceConfig {
1373            client_metadata_key: None,
1374            tls_config: tls.unwrap_or_default(),
1375        };
1376        tokio::spawn(async move {
1377            LogstashConfig {
1378                address: address.into(),
1379                tls: Some(tls_config),
1380                keepalive: None,
1381                permit_origin: None,
1382                receive_buffer_bytes: None,
1383                acknowledgements: false.into(),
1384                connection_limit: None,
1385                log_namespace: None,
1386            }
1387            .build(SourceContext::new_test(sender, None))
1388            .await
1389            .unwrap()
1390            .await
1391            .unwrap()
1392        });
1393        wait_for_tcp(address).await;
1394        recv
1395    }
1396}