1use std::{
2 collections::{BTreeMap, VecDeque},
3 convert::TryFrom,
4 io::{self, Read},
5 net::SocketAddr,
6 num::NonZeroUsize,
7 time::Duration,
8};
9
10use bytes::{Buf, Bytes, BytesMut};
11use flate2::read::ZlibDecoder;
12use smallvec::{SmallVec, smallvec};
13use snafu::{ResultExt, Snafu};
14use tokio_util::codec::Decoder;
15use vector_lib::{
16 codecs::{BytesDeserializerConfig, StreamDecodingError},
17 config::{LegacyKey, LogNamespace},
18 configurable::configurable_component,
19 ipallowlist::IpAllowlistConfig,
20 lookup::{OwnedValuePath, event_path, metadata_path, owned_value_path, path},
21 schema::Definition,
22};
23use vrl::value::{KeyString, Kind, kind::Collection};
24
25use super::util::net::{SocketListenAddr, TcpSource, TcpSourceAck, TcpSourceAcker};
26use crate::{
27 config::{
28 DataType, GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig,
29 SourceContext, SourceOutput, log_schema,
30 },
31 event::{Event, LogEvent, Value},
32 serde::bool_or_struct,
33 tcp::TcpKeepaliveConfig,
34 tls::{MaybeTlsSettings, TlsSourceConfig},
35 types,
36};
37
/// Configuration for the `logstash` source.
#[configurable_component(source("logstash", "Collect logs from a Logstash agent."))]
#[derive(Clone, Debug)]
pub struct LogstashConfig {
    // Address the source listens on; `Default` uses `0.0.0.0:5044`.
    #[configurable(derived)]
    address: SocketListenAddr,

    // Optional TCP keepalive settings, passed through to the TCP source.
    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    keepalive: Option<TcpKeepaliveConfig>,

    // Optional IP allowlist, passed through to the TCP source.
    #[configurable(derived)]
    pub permit_origin: Option<IpAllowlistConfig>,

    // Optional TLS settings; also carries the client metadata key used by the schema definition.
    #[configurable(derived)]
    tls: Option<TlsSourceConfig>,

    /// The size of the receive buffer, in bytes.
    #[configurable(metadata(docs::type_unit = "bytes"))]
    #[configurable(metadata(docs::examples = 65536))]
    #[configurable(metadata(docs::advanced))]
    receive_buffer_bytes: Option<usize>,

    /// The maximum number of connections that are allowed.
    #[configurable(metadata(docs::type_unit = "connections"))]
    #[configurable(metadata(docs::advanced))]
    connection_limit: Option<u32>,

    // Acknowledgement settings; deserialized from either a bool or a struct (`bool_or_struct`).
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    /// The namespace to use for logs. When unset, it is merged with the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,
}
75
76impl LogstashConfig {
77 fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
79 let host_key = log_schema()
81 .host_key()
82 .cloned()
83 .map(LegacyKey::InsertIfEmpty);
84
85 let tls_client_metadata_path = self
86 .tls
87 .as_ref()
88 .and_then(|tls| tls.client_metadata_key.as_ref())
89 .and_then(|k| k.path.clone())
90 .map(LegacyKey::Overwrite);
91
92 BytesDeserializerConfig
93 .schema_definition(log_namespace)
94 .with_standard_vector_source_metadata()
95 .with_source_metadata(
96 LogstashConfig::NAME,
97 None,
98 &owned_value_path!("timestamp"),
99 Kind::timestamp().or_undefined(),
100 Some("timestamp"),
101 )
102 .with_source_metadata(
103 LogstashConfig::NAME,
104 host_key,
105 &owned_value_path!("host"),
106 Kind::bytes(),
107 Some("host"),
108 )
109 .with_source_metadata(
110 Self::NAME,
111 tls_client_metadata_path,
112 &owned_value_path!("tls_client_metadata"),
113 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
114 None,
115 )
116 }
117}
118
119impl Default for LogstashConfig {
120 fn default() -> Self {
121 Self {
122 address: SocketListenAddr::SocketAddr("0.0.0.0:5044".parse().unwrap()),
123 keepalive: None,
124 permit_origin: None,
125 tls: None,
126 receive_buffer_bytes: None,
127 acknowledgements: Default::default(),
128 connection_limit: None,
129 log_namespace: None,
130 }
131 }
132}
133
134impl GenerateConfig for LogstashConfig {
135 fn generate_config() -> toml::Value {
136 toml::Value::try_from(LogstashConfig::default()).unwrap()
137 }
138}
139
140#[async_trait::async_trait]
141#[typetag::serde(name = "logstash")]
142impl SourceConfig for LogstashConfig {
143 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
144 let log_namespace = cx.log_namespace(self.log_namespace);
145 let source = LogstashSource {
146 timestamp_converter: types::Conversion::Timestamp(cx.globals.timezone()),
147 legacy_host_key_path: log_schema().host_key().cloned(),
148 log_namespace,
149 };
150 let shutdown_secs = Duration::from_secs(30);
151 let tls_config = self.tls.as_ref().map(|tls| tls.tls_config.clone());
152 let tls_client_metadata_key = self
153 .tls
154 .as_ref()
155 .and_then(|tls| tls.client_metadata_key.clone())
156 .and_then(|k| k.path);
157
158 let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?;
159 source.run(
160 self.address,
161 self.keepalive,
162 shutdown_secs,
163 tls,
164 tls_client_metadata_key,
165 self.receive_buffer_bytes,
166 None,
167 cx,
168 self.acknowledgements,
169 self.connection_limit,
170 self.permit_origin.clone().map(Into::into),
171 LogstashConfig::NAME,
172 log_namespace,
173 )
174 }
175
176 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
177 vec![SourceOutput::new_maybe_logs(
180 DataType::Log,
181 self.schema_definition(global_log_namespace.merge(self.log_namespace)),
182 )]
183 }
184
185 fn resources(&self) -> Vec<Resource> {
186 vec![self.address.as_tcp_resource()]
187 }
188
189 fn can_acknowledge(&self) -> bool {
190 true
191 }
192}
193
/// Runtime state of the `logstash` source, shared by all connections.
#[derive(Debug, Clone)]
struct LogstashSource {
    // Converter used to parse the event's `@timestamp` field into a timestamp value.
    timestamp_converter: types::Conversion,
    // Namespace that decides where metadata (source type, timestamp, host) is written.
    log_namespace: LogNamespace,
    // Host key from the global log schema, used for the legacy namespace host field.
    legacy_host_key_path: Option<OwnedValuePath>,
}
200
201impl TcpSource for LogstashSource {
202 type Error = DecodeError;
203 type Item = LogstashEventFrame;
204 type Decoder = LogstashDecoder;
205 type Acker = LogstashAcker;
206
207 fn decoder(&self) -> Self::Decoder {
208 LogstashDecoder::new()
209 }
210
211 fn handle_events(&self, events: &mut [Event], host: SocketAddr) {
212 let now = chrono::Utc::now();
213 for event in events {
214 let log = event.as_mut_log();
215
216 self.log_namespace.insert_vector_metadata(
217 log,
218 log_schema().source_type_key(),
219 path!("source_type"),
220 Bytes::from_static(LogstashConfig::NAME.as_bytes()),
221 );
222
223 let log_timestamp = log.get(event_path!("@timestamp")).and_then(|timestamp| {
224 self.timestamp_converter
225 .convert::<Value>(timestamp.coerce_to_bytes())
226 .ok()
227 });
228
229 match self.log_namespace {
234 LogNamespace::Vector => {
235 if let Some(timestamp) = log_timestamp {
236 log.insert(metadata_path!(LogstashConfig::NAME, "timestamp"), timestamp);
237 }
238 log.insert(metadata_path!("vector", "ingest_timestamp"), now);
239 }
240 LogNamespace::Legacy => {
241 if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
242 log.insert(
243 timestamp_key,
244 log_timestamp.unwrap_or_else(|| Value::from(now)),
245 );
246 }
247 }
248 }
249
250 self.log_namespace.insert_source_metadata(
251 LogstashConfig::NAME,
252 log,
253 self.legacy_host_key_path
254 .as_ref()
255 .map(LegacyKey::InsertIfEmpty),
256 path!("host"),
257 host.ip().to_string(),
258 );
259 }
260 }
261
262 fn build_acker(&self, frames: &[Self::Item]) -> Self::Acker {
263 LogstashAcker::new(frames)
264 }
265}
266
/// Holds the acknowledgements to send back for one batch of decoded frames.
struct LogstashAcker {
    // One `(protocol version, sequence number)` entry per ACK frame to emit, in batch order.
    acknowledgements: SmallVec<[(LogstashProtocolVersion, u32); 1]>,
}
282
283impl LogstashAcker {
284 fn new(frames: &[LogstashEventFrame]) -> Self {
285 let acknowledgements = frames
286 .iter()
287 .enumerate()
288 .filter(|(index, frame)| frame.window_end || index + 1 == frames.len())
291 .map(|(_, frame)| (frame.protocol, frame.sequence_number))
292 .collect();
293
294 Self { acknowledgements }
295 }
296}
297
298impl TcpSourceAcker for LogstashAcker {
299 fn build_ack(self, ack: TcpSourceAck) -> Option<Bytes> {
301 match ack {
302 TcpSourceAck::Ack if !self.acknowledgements.is_empty() => {
303 let mut bytes: Vec<u8> = Vec::with_capacity(self.acknowledgements.len() * 6);
304 for (protocol_version, sequence_number) in self.acknowledgements {
305 bytes.push(protocol_version.into());
306 bytes.push(LogstashFrameType::Ack.into());
307 bytes.extend(sequence_number.to_be_bytes().iter());
308 }
309 Some(Bytes::from(bytes))
310 }
311 _ => None,
312 }
313 }
314}
315
/// State machine positions of `LogstashDecoder`.
#[derive(Debug)]
enum LogstashDecoderReadState {
    // Waiting for the 1-byte protocol version.
    ReadProtocol,
    // Version known; waiting for the 1-byte frame type.
    ReadType(LogstashProtocolVersion),
    // Version and type known; waiting for the frame body.
    ReadFrame(LogstashProtocolVersion, LogstashFrameType),
    // Already-decoded frames (with their byte sizes) waiting to be handed out one at a time.
    PendingFrames(VecDeque<(LogstashEventFrame, usize)>),
}
323
/// Streaming decoder for the Logstash wire protocol handled by this source.
#[derive(Debug)]
struct LogstashDecoder {
    // Current position in the frame-reading state machine.
    state: LogstashDecoderReadState,
    // Events still expected in the current window; `None` when no window is being tracked.
    window_events_remaining: Option<NonZeroUsize>,
}
332
333impl LogstashDecoder {
334 const fn new() -> Self {
335 Self::new_with_window_events_remaining(None)
336 }
337
338 const fn new_with_window_events_remaining(
339 window_events_remaining: Option<NonZeroUsize>,
340 ) -> Self {
341 Self {
342 state: LogstashDecoderReadState::ReadProtocol,
343 window_events_remaining,
344 }
345 }
346
347 const fn annotate_frame(&mut self, frame: &mut LogstashEventFrame) {
358 match self.window_events_remaining {
359 Some(remaining) if remaining.get() == 1 => {
360 frame.window_end = true;
361 self.window_events_remaining = None;
362 }
363 Some(remaining) => {
364 frame.window_end = false;
365 self.window_events_remaining = NonZeroUsize::new(remaining.get() - 1); }
367 None => {
368 frame.window_end = true;
371 }
372 }
373 }
374}
375
/// Errors produced while decoding the Logstash wire protocol.
#[derive(Debug, Snafu)]
pub enum DecodeError {
    /// An underlying I/O error.
    #[snafu(display("i/o error: {}", source))]
    IO { source: io::Error },
    /// The protocol version byte was neither `1` nor `2`.
    #[snafu(display("Unknown logstash protocol version: {}", version))]
    UnknownProtocolVersion { version: char },
    /// The frame type byte was not one of `A`, `W`, `D`, `J`, or `C`.
    #[snafu(display("Unknown logstash protocol message type: {}", frame_type))]
    UnknownFrameType { frame_type: char },
    /// The payload of a JSON frame could not be parsed.
    #[snafu(display("Failed to decode JSON frame: {}", source))]
    JsonFrameFailedDecode { source: serde_json::Error },
    /// The payload of a compressed frame could not be inflated.
    #[snafu(display("Failed to decompress compressed frame: {}", source))]
    DecompressionFailed { source: io::Error },
}
389
impl StreamDecodingError for DecodeError {
    // Every decode error is reported as non-continuable, regardless of variant.
    fn can_continue(&self) -> bool {
        false
    }
}
400
401impl From<io::Error> for DecodeError {
402 fn from(source: io::Error) -> Self {
403 DecodeError::IO { source }
404 }
405}
406
/// Protocol version carried in the first byte of every frame.
#[derive(Debug, Clone, Copy)]
enum LogstashProtocolVersion {
    // Encoded on the wire as ASCII `1`.
    V1,
    // Encoded on the wire as ASCII `2`.
    V2,
}
412
413impl From<LogstashProtocolVersion> for u8 {
414 fn from(frame_type: LogstashProtocolVersion) -> u8 {
415 use LogstashProtocolVersion::*;
416
417 match frame_type {
418 V1 => b'1',
419 V2 => b'2',
420 }
421 }
422}
423
424impl TryFrom<u8> for LogstashProtocolVersion {
425 type Error = DecodeError;
426
427 fn try_from(frame_type: u8) -> Result<LogstashProtocolVersion, DecodeError> {
428 use LogstashProtocolVersion::*;
429
430 match frame_type {
431 b'1' => Ok(V1),
432 b'2' => Ok(V2),
433 version => Err(DecodeError::UnknownProtocolVersion {
434 version: version as char,
435 }),
436 }
437 }
438}
439
/// Frame type carried in the second byte of every frame.
#[derive(Debug, Clone, Copy)]
enum LogstashFrameType {
    // `A`: acknowledgement carrying a sequence number.
    Ack,
    // `W`: announces how many events the following window contains.
    WindowSize,
    // `D`: event encoded as length-prefixed key/value pairs.
    Data,
    // `J`: event encoded as a JSON object.
    Json,
    // `C`: zlib-compressed payload containing further frames.
    Compressed,
}
448
449impl From<LogstashFrameType> for u8 {
450 fn from(frame_type: LogstashFrameType) -> u8 {
451 use LogstashFrameType::*;
452
453 match frame_type {
454 Ack => b'A',
455 WindowSize => b'W',
456 Data => b'D',
457 Json => b'J',
458 Compressed => b'C',
459 }
460 }
461}
462
463impl TryFrom<u8> for LogstashFrameType {
464 type Error = DecodeError;
465
466 fn try_from(frame_type: u8) -> Result<LogstashFrameType, DecodeError> {
467 use LogstashFrameType::*;
468
469 match frame_type {
470 b'A' => Ok(Ack),
471 b'W' => Ok(WindowSize),
472 b'D' => Ok(Data),
473 b'J' => Ok(Json),
474 b'C' => Ok(Compressed),
475 frame_type => Err(DecodeError::UnknownFrameType {
476 frame_type: frame_type as char,
477 }),
478 }
479 }
480}
481
/// A single decoded event frame.
#[derive(Debug)]
struct LogstashEventFrame {
    // Protocol version the frame arrived with; echoed back in its ACK.
    protocol: LogstashProtocolVersion,
    // Sequence number read from the frame; echoed back in its ACK.
    sequence_number: u32,
    // Event fields; data frames store every value as a JSON string.
    fields: BTreeMap<KeyString, serde_json::Value>,
    // Whether the decoder marked this frame as closing its window.
    window_end: bool,
}
490
/// Result of decoding one compressed frame.
struct DecodedCompressedFrames {
    // Frames found inside the decompressed payload, paired with their byte sizes.
    frames: VecDeque<(LogstashEventFrame, usize)>,
    // Window counter left over after the inner frames were decoded.
    window_events_remaining: Option<NonZeroUsize>,
}
495
/// Streaming decoder implementation: yields one `(frame, byte_size)` pair per event.
impl Decoder for LogstashDecoder {
    type Item = (LogstashEventFrame, usize);
    type Error = DecodeError;

    /// Advances the state machine over `src` until a frame is ready or more bytes are needed.
    ///
    /// Returns `Ok(None)` when `src` does not yet hold enough bytes for the current state; the
    /// state is left untouched in that case so the next call resumes at the same position.
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        loop {
            // Each arm either returns early or evaluates to the next state.
            self.state = match self.state {
                LogstashDecoderReadState::PendingFrames(ref mut frames) => {
                    // Hand out queued frames one at a time before reading any new input.
                    match frames.pop_front() {
                        Some(frame) => return Ok(Some(frame)),
                        None => LogstashDecoderReadState::ReadProtocol,
                    }
                }
                LogstashDecoderReadState::ReadProtocol => {
                    if src.remaining() < 1 {
                        return Ok(None);
                    }

                    use LogstashProtocolVersion::*;

                    match LogstashProtocolVersion::try_from(src.get_u8())? {
                        V1 => LogstashDecoderReadState::ReadType(V1),
                        V2 => LogstashDecoderReadState::ReadType(V2),
                    }
                }
                LogstashDecoderReadState::ReadType(protocol) => {
                    if src.remaining() < 1 {
                        return Ok(None);
                    }

                    use LogstashFrameType::*;

                    match LogstashFrameType::try_from(src.get_u8())? {
                        WindowSize => LogstashDecoderReadState::ReadFrame(protocol, WindowSize),
                        Data => LogstashDecoderReadState::ReadFrame(protocol, Data),
                        Json => LogstashDecoderReadState::ReadFrame(protocol, Json),
                        Compressed => LogstashDecoderReadState::ReadFrame(protocol, Compressed),
                        Ack => LogstashDecoderReadState::ReadFrame(protocol, Ack),
                    }
                }
                LogstashDecoderReadState::ReadFrame(_protocol, LogstashFrameType::WindowSize) => {
                    if src.remaining() < 4 {
                        return Ok(None);
                    }

                    // A window size of zero maps to `None`, i.e. no window is tracked.
                    let window_size = src.get_u32() as usize;
                    self.window_events_remaining = NonZeroUsize::new(window_size);

                    LogstashDecoderReadState::ReadProtocol
                }
                LogstashDecoderReadState::ReadFrame(_protocol, LogstashFrameType::Ack) => {
                    if src.remaining() < 4 {
                        return Ok(None);
                    }

                    // An incoming ACK frame is consumed and otherwise ignored.
                    let _sequence_number = src.get_u32();

                    LogstashDecoderReadState::ReadProtocol
                }
                LogstashDecoderReadState::ReadFrame(protocol, LogstashFrameType::Data) => {
                    let Some((mut frame, byte_size)) = decode_data_frame(protocol, src) else {
                        return Ok(None);
                    };
                    self.annotate_frame(&mut frame);

                    LogstashDecoderReadState::PendingFrames([(frame, byte_size)].into())
                }
                LogstashDecoderReadState::ReadFrame(protocol, LogstashFrameType::Json) => {
                    let Some((mut frame, byte_size)) = decode_json_frame(protocol, src)? else {
                        return Ok(None);
                    };
                    self.annotate_frame(&mut frame);

                    LogstashDecoderReadState::PendingFrames([(frame, byte_size)].into())
                }
                LogstashDecoderReadState::ReadFrame(_protocol, LogstashFrameType::Compressed) => {
                    // Inner frames are decoded with the current window counter, and the counter
                    // they leave behind is carried forward.
                    let Some(decoded) = decode_compressed_frame(src, self.window_events_remaining)?
                    else {
                        return Ok(None);
                    };
                    self.window_events_remaining = decoded.window_events_remaining;

                    LogstashDecoderReadState::PendingFrames(decoded.frames)
                }
            };
        }
    }
}
617
618fn decode_data_frame(
620 protocol: LogstashProtocolVersion,
621 src: &mut BytesMut,
622) -> Option<(LogstashEventFrame, usize)> {
623 let mut rest = src.as_ref();
624
625 if rest.remaining() < 8 {
626 return None;
627 }
628 let sequence_number = rest.get_u32();
629 let pair_count = rest.get_u32();
630 if pair_count == 0 {
631 return None; }
633
634 let mut fields = BTreeMap::<KeyString, serde_json::Value>::new();
635 for _ in 0..pair_count {
636 let (key, value, right) = decode_pair(rest)?;
637 rest = right;
638
639 fields.insert(
640 String::from_utf8_lossy(key).into(),
641 String::from_utf8_lossy(value).into(),
642 );
643 }
644
645 let byte_size = bytes_remaining(src, rest);
646 src.advance(byte_size);
647
648 Some((
649 LogstashEventFrame {
650 protocol,
651 sequence_number,
652 fields,
653 window_end: false,
654 },
655 byte_size,
656 ))
657}
658
659fn decode_pair(mut rest: &[u8]) -> Option<(&[u8], &[u8], &[u8])> {
660 if rest.remaining() < 4 {
661 return None;
662 }
663 let key_length = rest.get_u32() as usize;
664
665 if rest.remaining() < key_length {
666 return None;
667 }
668 let (key, right) = rest.split_at(key_length);
669 rest = right;
670
671 if rest.remaining() < 4 {
672 return None;
673 }
674 let value_length = rest.get_u32() as usize;
675 if rest.remaining() < value_length {
676 return None;
677 }
678 let (value, right) = rest.split_at(value_length);
679 Some((key, value, right))
680}
681
682fn decode_json_frame(
683 protocol: LogstashProtocolVersion,
684 src: &mut BytesMut,
685) -> Result<Option<(LogstashEventFrame, usize)>, DecodeError> {
686 let mut rest = src.as_ref();
687
688 if rest.remaining() < 8 {
689 return Ok(None);
690 }
691 let sequence_number = rest.get_u32();
692 let payload_size = rest.get_u32() as usize;
693
694 if rest.remaining() < payload_size {
695 return Ok(None);
696 }
697
698 let (slice, right) = rest.split_at(payload_size);
699 rest = right;
700
701 let fields: BTreeMap<KeyString, serde_json::Value> =
702 serde_json::from_slice(slice).context(JsonFrameFailedDecodeSnafu {})?;
703
704 let byte_size = bytes_remaining(src, rest);
705 src.advance(byte_size);
706
707 Ok(Some((
708 LogstashEventFrame {
709 protocol,
710 sequence_number,
711 fields,
712 window_end: false,
713 },
714 byte_size,
715 )))
716}
717
718fn decode_compressed_frame(
719 src: &mut BytesMut,
720 window_events_remaining: Option<NonZeroUsize>,
721) -> Result<Option<DecodedCompressedFrames>, DecodeError> {
722 let mut rest = src.as_ref();
723
724 if rest.remaining() < 4 {
725 return Ok(None);
726 }
727 let payload_size = rest.get_u32() as usize;
728
729 if rest.remaining() < payload_size {
730 src.reserve(payload_size);
731 return Ok(None);
732 }
733
734 let (slice, right) = rest.split_at(payload_size);
735 rest = right;
736
737 let mut buf = Vec::new();
738
739 let res = ZlibDecoder::new(io::Cursor::new(slice))
740 .read_to_end(&mut buf)
741 .context(DecompressionFailedSnafu)
742 .map(|_| BytesMut::from(&buf[..]));
743
744 let byte_size = bytes_remaining(src, rest);
745 src.advance(byte_size);
746
747 let mut buf = res?;
748
749 let mut decoder = LogstashDecoder::new_with_window_events_remaining(window_events_remaining);
750
751 let mut frames = VecDeque::new();
752
753 while let Some(s) = decoder.decode(&mut buf)? {
754 frames.push_back(s);
755 }
756 Ok(Some(DecodedCompressedFrames {
757 frames,
758 window_events_remaining: decoder.window_events_remaining,
759 }))
760}
761
762fn bytes_remaining(src: &BytesMut, rest: &[u8]) -> usize {
763 let remaining = rest.remaining();
764 src.remaining() - remaining
765}
766
767impl From<LogstashEventFrame> for Event {
768 fn from(frame: LogstashEventFrame) -> Self {
769 Event::Log(LogEvent::from(
770 frame
771 .fields
772 .into_iter()
773 .map(|(key, value)| (key, Value::from(value)))
774 .collect::<BTreeMap<_, _>>(),
775 ))
776 }
777}
778
779impl From<LogstashEventFrame> for SmallVec<[Event; 1]> {
780 fn from(frame: LogstashEventFrame) -> Self {
781 smallvec![frame.into()]
782 }
783}
784
785#[cfg(test)]
786mod test {
787 use std::io::Write;
788
789 use bytes::BufMut;
790 use flate2::{Compression, write::ZlibEncoder};
791 use futures::{Stream, StreamExt, stream};
792 use rand::{Rng, rng};
793 use tokio::io::{AsyncReadExt, AsyncWriteExt};
794 use vector_lib::codecs::ReadyFrames;
795 use vector_lib::lookup::OwnedTargetPath;
796 use vrl::value::kind::Collection;
797
798 use super::*;
799 use crate::{
800 SourceSender,
801 event::EventStatus,
802 test_util::{
803 addr::next_addr,
804 components::{SOCKET_PUSH_SOURCE_TAGS, assert_source_compliance},
805 spawn_collect_n, wait_for_tcp,
806 },
807 };
808
    // Checks the generated default configuration via the shared test helper.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<LogstashConfig>();
    }
813
    // A delivered event must be acknowledged back to the client.
    #[tokio::test]
    async fn test_delivered() {
        test_protocol(EventStatus::Delivered, true).await;
    }
818
    // A rejected event must not be acknowledged back to the client.
    #[tokio::test]
    async fn test_failed() {
        test_protocol(EventStatus::Rejected, false).await;
    }
823
824 async fn start_logstash(
825 status: EventStatus,
826 ) -> (SocketAddr, impl Stream<Item = Event> + Unpin) {
827 let (sender, recv) = SourceSender::new_test_finalize(status);
828 let (_guard, address) = next_addr();
829 let source = LogstashConfig {
830 address: address.into(),
831 tls: None,
832 permit_origin: None,
833 keepalive: None,
834 receive_buffer_bytes: None,
835 acknowledgements: true.into(),
836 connection_limit: None,
837 log_namespace: None,
838 }
839 .build(SourceContext::new_test(sender, None))
840 .await
841 .unwrap();
842 tokio::spawn(source);
843 wait_for_tcp(address).await;
844 (address, recv)
845 }
846
847 async fn test_protocol(status: EventStatus, sends_ack: bool) {
848 let events = assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
849 let (address, recv) = start_logstash(status).await;
850 spawn_collect_n(
851 send_req(address, &[("message", "Hello, world!")], sends_ack),
852 recv,
853 1,
854 )
855 .await
856 })
857 .await;
858
859 assert_eq!(events.len(), 1);
860 let log = events[0].as_log();
861 assert_eq!(
862 log.get("message").unwrap().to_string_lossy(),
863 "Hello, world!".to_string()
864 );
865 assert_eq!(
866 log.get("source_type").unwrap().to_string_lossy(),
867 "logstash".to_string()
868 );
869 assert!(log.get("host").is_some());
870 assert!(log.get("timestamp").is_some());
871 }
872
873 fn push_req(req: &mut BytesMut, seq: u32, pairs: &[(&str, &str)]) {
874 req.put_u8(b'2');
875 req.put_u8(b'D');
876 req.put_u32(seq);
877 req.put_u32(pairs.len() as u32);
878 for (key, value) in pairs {
879 req.put_u32(key.len() as u32);
880 req.put(key.as_bytes());
881 req.put_u32(value.len() as u32);
882 req.put(value.as_bytes());
883 }
884 }
885
886 fn encode_req(seq: u32, pairs: &[(&str, &str)]) -> Bytes {
887 let mut req = BytesMut::new();
888 push_req(&mut req, seq, pairs);
889 req.into()
890 }
891
892 fn push_window_size(req: &mut BytesMut, size: u32) {
893 req.put_u8(b'2');
894 req.put_u8(b'W');
895 req.put_u32(size);
896 }
897
898 fn push_compressed(req: &mut BytesMut, inner: &[u8]) {
899 let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
900 encoder.write_all(inner).unwrap();
901 let compressed = encoder.finish().unwrap();
902
903 req.put_u8(b'2');
904 req.put_u8(b'C');
905 req.put_u32(compressed.len() as u32);
906 req.put(compressed.as_slice());
907 }
908
909 fn decode_frames(mut src: BytesMut) -> Vec<(LogstashEventFrame, usize)> {
910 let mut decoder = LogstashDecoder::new();
911 let mut frames = Vec::new();
912
913 while let Some(frame) = decoder.decode(&mut src).unwrap() {
914 frames.push(frame);
915 }
916
917 assert_eq!(src.len(), 0);
918 frames
919 }
920
921 fn decode_acknowledgements(mut ack: Bytes) -> Vec<u32> {
922 let mut acknowledgements = Vec::new();
923
924 while !ack.is_empty() {
925 assert!(
926 ack.len() >= 6,
927 "ack stream ended with {} trailing bytes",
928 ack.len()
929 );
930 assert_eq!(ack.get_u8(), b'2');
931 assert_eq!(ack.get_u8(), b'A');
932 acknowledgements.push(ack.get_u32());
933 }
934
935 acknowledgements
936 }
937
938 fn decoded_sequence_numbers(decoded: &[(LogstashEventFrame, usize)]) -> Vec<u32> {
939 decoded
940 .iter()
941 .map(|(frame, _)| frame.sequence_number)
942 .collect::<Vec<_>>()
943 }
944
945 fn assert_decoded_sequences(
946 decoded: &[(LogstashEventFrame, usize)],
947 expected_sequences: &[u32],
948 ) {
949 assert_eq!(decoded_sequence_numbers(decoded), expected_sequences);
950 }
951
952 async fn assert_acknowledgements_for_ready_frames(
953 decoded: Vec<(LogstashEventFrame, usize)>,
954 expected_sequences: &[u32],
955 expected_acknowledgements: &[u32],
956 ) {
957 assert_decoded_sequences(&decoded, expected_sequences);
958
959 let stream = stream::iter(decoded.into_iter().map(Ok::<_, DecodeError>));
960 let mut ready = ReadyFrames::with_capacity(stream, 16);
961 let (frames, _) = ready.next().await.unwrap().unwrap();
962
963 let ack = LogstashAcker::new(&frames)
964 .build_ack(TcpSourceAck::Ack)
965 .unwrap();
966 let acknowledgements = decode_acknowledgements(ack);
967
968 assert!(ready.next().await.is_none());
969 assert_eq!(acknowledgements, expected_acknowledgements);
970 }
971
972 fn decode_frames_and_assert_sequences(
973 src: BytesMut,
974 expected_sequences: &[u32],
975 ) -> Vec<(LogstashEventFrame, usize)> {
976 let decoded = decode_frames(src);
977 assert_decoded_sequences(&decoded, expected_sequences);
978 decoded
979 }
980
981 fn decode_frames_with_decoder(
982 decoder: &mut LogstashDecoder,
983 mut src: BytesMut,
984 ) -> Vec<(LogstashEventFrame, usize)> {
985 let mut frames = Vec::new();
986
987 while let Some(frame) = decoder.decode(&mut src).unwrap() {
988 frames.push(frame);
989 }
990
991 assert_eq!(src.len(), 0);
992 frames
993 }
994
995 fn decode_frames_with_decoder_and_assert_sequences(
996 decoder: &mut LogstashDecoder,
997 src: BytesMut,
998 expected_sequences: &[u32],
999 ) -> Vec<(LogstashEventFrame, usize)> {
1000 let decoded = decode_frames_with_decoder(decoder, src);
1001 assert_decoded_sequences(&decoded, expected_sequences);
1002 decoded
1003 }
1004
1005 #[test]
1006 fn v1_decoder_does_not_panic() {
1007 let seq = rng().random_range(1..u32::MAX);
1008 let req = encode_req(seq, &[("message", "Hello, World!")]);
1009 for i in 0..req.len() - 1 {
1010 assert!(
1011 decode_data_frame(LogstashProtocolVersion::V1, &mut BytesMut::from(&req[..i]))
1012 .is_none()
1013 );
1014 }
1015 }
1016
1017 #[test]
1024 fn malformed_json_frame_is_a_fatal_decode_error() {
1025 let mut decoder = LogstashDecoder::new();
1026 let mut src = BytesMut::new();
1027 src.put_u8(b'2');
1028 src.put_u8(b'J');
1029 src.put_u32(1); let bad = b"{ not valid json ";
1031 src.put_u32(bad.len() as u32); src.put(&bad[..]);
1033
1034 let err = decoder.decode(&mut src).unwrap_err();
1035 assert!(matches!(err, DecodeError::JsonFrameFailedDecode { .. }));
1036 assert!(
1037 !err.can_continue(),
1038 "a malformed JSON frame must be fatal so the connection closes",
1039 );
1040 }
1041
1042 #[test]
1043 fn malformed_compressed_frame_is_a_fatal_decode_error() {
1044 let mut decoder = LogstashDecoder::new();
1045 let mut src = BytesMut::new();
1046 src.put_u8(b'2');
1047 src.put_u8(b'C');
1048 let garbage = b"this is not a zlib stream";
1049 src.put_u32(garbage.len() as u32); src.put(&garbage[..]);
1051
1052 let err = decoder.decode(&mut src).unwrap_err();
1053 assert!(matches!(err, DecodeError::DecompressionFailed { .. }));
1054 assert!(!err.can_continue());
1055 }
1056
1057 #[tokio::test]
1058 async fn malformed_frame_closes_connection_without_ack() {
1059 let (address, _recv) = start_logstash(EventStatus::Delivered).await;
1060
1061 let mut socket = tokio::net::TcpStream::connect(address).await.unwrap();
1062
1063 let mut req = BytesMut::new();
1065 req.put_u8(b'2');
1066 req.put_u8(b'J');
1067 req.put_u32(1); let bad = b"{ not valid json ";
1069 req.put_u32(bad.len() as u32); req.put(&bad[..]);
1071 socket.write_all(&req).await.unwrap();
1072
1073 let mut output = BytesMut::new();
1076 let result = socket.read_buf(&mut output).await;
1077 assert!(
1078 matches!(result, Ok(0)) || result.is_err(),
1079 "expected the connection to close; read returned {result:?} with {output:?}",
1080 );
1081 assert!(
1082 output.is_empty(),
1083 "no ACK should be sent for a malformed frame, got {output:?}",
1084 );
1085 }
1086
1087 #[tokio::test]
1088 async fn distinct_windows_do_not_share_an_ack_domain() {
1089 let mut req = BytesMut::new();
1090 push_window_size(&mut req, 1);
1091 push_req(&mut req, 1, &[("message", "first window")]);
1092 push_window_size(&mut req, 2);
1093 push_req(&mut req, 1, &[("message", "second window first")]);
1094 push_req(&mut req, 2, &[("message", "second window second")]);
1095
1096 let decoded = decode_frames_and_assert_sequences(req, &[1, 1, 2]);
1097 assert_acknowledgements_for_ready_frames(decoded, &[1, 1, 2], &[1, 2]).await;
1098 }
1099
1100 #[tokio::test]
1101 async fn distinct_windows_with_monotonic_sequences_ack_the_first_window() {
1102 let mut req = BytesMut::new();
1103 push_window_size(&mut req, 2);
1104 push_req(&mut req, 1, &[("message", "first window first")]);
1105 push_req(&mut req, 2, &[("message", "first window second")]);
1106 push_window_size(&mut req, 2);
1107 push_req(&mut req, 3, &[("message", "second window first")]);
1108 push_req(&mut req, 4, &[("message", "second window second")]);
1109
1110 let decoded = decode_frames_and_assert_sequences(req, &[1, 2, 3, 4]);
1111 assert_acknowledgements_for_ready_frames(decoded, &[1, 2, 3, 4], &[2, 4]).await;
1112 }
1113
1114 #[tokio::test]
1115 async fn incomplete_final_window_is_acked_to_the_last_received_event() {
1116 let mut req = BytesMut::new();
1117 push_window_size(&mut req, 4);
1118 push_req(&mut req, 1, &[("message", "only event in partial window")]);
1119
1120 let decoded = decode_frames_and_assert_sequences(req, &[1]);
1121 assert_acknowledgements_for_ready_frames(decoded, &[1], &[1]).await;
1122 }
1123
1124 #[tokio::test]
1125 async fn compressed_frames_preserve_inner_window_boundaries() {
1126 let mut inner = BytesMut::new();
1127 push_window_size(&mut inner, 2);
1128 push_req(&mut inner, 1, &[("message", "compressed first")]);
1129 push_req(&mut inner, 2, &[("message", "compressed second")]);
1130
1131 let mut req = BytesMut::new();
1132 push_compressed(&mut req, &inner);
1133
1134 let decoded = decode_frames_and_assert_sequences(req, &[1, 2]);
1135 assert_acknowledgements_for_ready_frames(decoded, &[1, 2], &[2]).await;
1136 }
1137
1138 #[tokio::test]
1139 async fn single_window_split_across_ready_frames_keeps_progressive_acks() {
1140 let mut req = BytesMut::new();
1141 push_window_size(&mut req, 4);
1142 push_req(&mut req, 1, &[("message", "first")]);
1143 push_req(&mut req, 2, &[("message", "second")]);
1144 push_req(&mut req, 3, &[("message", "third")]);
1145 push_req(&mut req, 4, &[("message", "fourth")]);
1146
1147 let decoded = decode_frames_and_assert_sequences(req, &[1, 2, 3, 4]);
1148
1149 let stream = stream::iter(decoded.into_iter().map(Ok::<_, DecodeError>));
1150 let mut ready = ReadyFrames::with_capacity(stream, 2);
1151 let mut acknowledgements = Vec::new();
1152
1153 while let Some(result) = ready.next().await {
1154 let (frames, _byte_size) = result.unwrap();
1155 let ack = LogstashAcker::new(&frames)
1156 .build_ack(TcpSourceAck::Ack)
1157 .unwrap();
1158 acknowledgements.push(decode_acknowledgements(ack));
1159 }
1160
1161 assert_eq!(acknowledgements, vec![vec![2], vec![4]]);
1162 }
1163
1164 #[tokio::test]
1165 async fn fresh_window_after_acked_partial_tail_is_accepted() {
1166 let mut decoder = LogstashDecoder::new();
1167
1168 let mut first_batch = BytesMut::new();
1169 push_window_size(&mut first_batch, 2);
1170 push_req(&mut first_batch, 1, &[("message", "first partial tail")]);
1171 let decoded =
1172 decode_frames_with_decoder_and_assert_sequences(&mut decoder, first_batch, &[1]);
1173 assert_acknowledgements_for_ready_frames(decoded, &[1], &[1]).await;
1174
1175 let mut second_batch = BytesMut::new();
1176 push_window_size(&mut second_batch, 1);
1177 push_req(
1178 &mut second_batch,
1179 1,
1180 &[("message", "fresh window after ack")],
1181 );
1182 let decoded =
1183 decode_frames_with_decoder_and_assert_sequences(&mut decoder, second_batch, &[1]);
1184 assert_acknowledgements_for_ready_frames(decoded, &[1], &[1]).await;
1185 }
1186
1187 async fn send_req(address: SocketAddr, pairs: &[(&str, &str)], sends_ack: bool) {
1188 let seq = rng().random_range(1..u32::MAX);
1189 let mut socket = tokio::net::TcpStream::connect(address).await.unwrap();
1190
1191 let req = encode_req(seq, pairs);
1192 socket.write_all(&req).await.unwrap();
1193
1194 let mut output = BytesMut::new();
1195 socket.read_buf(&mut output).await.unwrap();
1196
1197 if sends_ack {
1198 assert_eq!(output.get_u8(), b'2');
1199 assert_eq!(output.get_u8(), b'A');
1200 assert_eq!(output.get_u32(), seq);
1201 }
1202 assert_eq!(output.len(), 0);
1203 }
1204
/// Compares the schema definition of the first output under the `Vector` log
/// namespace with a hand-built expectation: a bytes event whose root carries
/// the "message" meaning, plus the metadata fields listed below.
#[test]
fn output_schema_definition_vector_namespace() {
    let source_config = LogstashConfig {
        log_namespace: Some(true),
        ..Default::default()
    };
    let actual = source_config
        .outputs(LogNamespace::Vector)
        .remove(0)
        .schema_definition(true);

    // (path, kind, meaning) triples, applied to the base definition in order.
    let metadata_fields = [
        (
            owned_value_path!("vector", "source_type"),
            Kind::bytes(),
            None,
        ),
        (
            owned_value_path!("vector", "ingest_timestamp"),
            Kind::timestamp(),
            None,
        ),
        (
            owned_value_path!(LogstashConfig::NAME, "timestamp"),
            Kind::timestamp().or_undefined(),
            Some("timestamp"),
        ),
        (
            owned_value_path!(LogstashConfig::NAME, "host"),
            Kind::bytes(),
            Some("host"),
        ),
        (
            owned_value_path!(LogstashConfig::NAME, "tls_client_metadata"),
            Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
            None,
        ),
    ];

    let base = Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
        .with_meaning(OwnedTargetPath::event_root(), "message");
    let expected = metadata_fields
        .into_iter()
        .fold(base, |definition, (path, kind, meaning)| {
            definition.with_metadata_field(&path, kind, meaning)
        });

    assert_eq!(actual, Some(expected));
}
1248
/// Compares the schema definition of the first output of a default
/// configuration under the `Legacy` log namespace with a hand-built
/// expectation: an object event containing the event fields listed below.
#[test]
fn output_schema_definition_legacy_namespace() {
    let source_config = LogstashConfig::default();
    let actual = source_config
        .outputs(LogNamespace::Legacy)
        .remove(0)
        .schema_definition(true);

    // (path, kind, meaning) triples, applied to the base definition in order.
    let event_fields = [
        (owned_value_path!("message"), Kind::bytes(), Some("message")),
        (owned_value_path!("source_type"), Kind::bytes(), None),
        (owned_value_path!("timestamp"), Kind::timestamp(), None),
        (owned_value_path!("host"), Kind::bytes(), Some("host")),
    ];

    let base = Definition::new_with_default_metadata(
        Kind::object(Collection::empty()),
        [LogNamespace::Legacy],
    );
    let expected = event_fields
        .into_iter()
        .fold(base, |definition, (path, kind, meaning)| {
            definition.with_event_field(&path, kind, meaning)
        });

    assert_eq!(actual, Some(expected));
}
1273}
1274
/// Integration tests that start a real `logstash` source and wait for an
/// external agent to deliver at least one event. They are only compiled with
/// the `logstash-integration-tests` feature and read their listen addresses
/// from the environment.
#[cfg(all(test, feature = "logstash-integration-tests"))]
mod integration_tests {
    use std::time::Duration;

    use futures::Stream;
    use tokio::time::timeout;

    use super::*;
    use crate::{
        SourceSender,
        config::SourceContext,
        event::EventStatus,
        test_util::{
            collect_n,
            components::{SOCKET_PUSH_SOURCE_TAGS, assert_source_compliance},
            wait_for_tcp,
        },
        tls::{TlsConfig, TlsEnableableConfig},
    };

    /// Longest time a test waits for the first event before failing.
    const FIRST_EVENT_TIMEOUT: Duration = Duration::from_secs(60);

    /// Reads the listen address for the Heartbeat test from `HEARTBEAT_ADDRESS`.
    ///
    /// Panics when the variable is not set.
    fn heartbeat_address() -> String {
        let address = std::env::var("HEARTBEAT_ADDRESS");
        address.expect("Address of Beats Heartbeat service must be specified.")
    }

    /// Reads the listen address for the Logstash test from `LOGSTASH_ADDRESS`.
    ///
    /// Panics when the variable is not set.
    fn logstash_address() -> String {
        let address = std::env::var("LOGSTASH_ADDRESS");
        address.expect("Listen address for `logstash` source must be specified.")
    }

    /// Starts a `logstash` source listening on `address` and returns the
    /// stream of events it emits.
    ///
    /// `tls` falls back to the default TLS settings when `None`. The source is
    /// driven by a spawned task; this function returns once the TCP listener
    /// accepts connections.
    async fn source(
        address: String,
        tls: Option<TlsEnableableConfig>,
    ) -> impl Stream<Item = Event> + Unpin {
        let (sender, events) = SourceSender::new_test_finalize(EventStatus::Delivered);
        let address: SocketAddr = address.parse().unwrap();
        let tls_settings = TlsSourceConfig {
            client_metadata_key: None,
            tls_config: tls.unwrap_or_default(),
        };

        tokio::spawn(async move {
            let config = LogstashConfig {
                address: address.into(),
                tls: Some(tls_settings),
                keepalive: None,
                permit_origin: None,
                receive_buffer_bytes: None,
                acknowledgements: false.into(),
                connection_limit: None,
                log_namespace: None,
            };
            // Building yields the source future; awaiting it runs the source.
            let running = config
                .build(SourceContext::new_test(sender, None))
                .await
                .unwrap();
            running.await.unwrap()
        });

        wait_for_tcp(address).await;
        events
    }

    /// Waits for one event over a plain connection and checks the Heartbeat
    /// fields it is expected to carry.
    #[tokio::test]
    async fn beats_heartbeat() {
        let events = assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let stream = source(heartbeat_address(), None).await;
            let first_event = collect_n(stream, 1);

            timeout(FIRST_EVENT_TIMEOUT, first_event).await.unwrap()
        })
        .await;

        assert!(!events.is_empty());

        let log = events[0].as_log();
        let expected_beat: Value = String::from("heartbeat").into();
        let expected_up: Value = 1.into();
        assert_eq!(log.get("@metadata.beat"), Some(&expected_beat));
        assert_eq!(log.get("summary.up"), Some(&expected_up));
        assert!(log.get("timestamp").is_some());
        assert!(log.get("host").is_some());
    }

    /// Waits for one event over a TLS-enabled listener and checks that its
    /// `line` field contains the expected greeting.
    #[tokio::test]
    async fn logstash() {
        let events = assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let address = logstash_address();
            let tls = TlsEnableableConfig {
                enabled: Some(true),
                options: TlsConfig {
                    crt_file: Some(
                        "tests/integration/shared/data/host.docker.internal.crt".into(),
                    ),
                    key_file: Some(
                        "tests/integration/shared/data/host.docker.internal.key".into(),
                    ),
                    ..Default::default()
                },
            };
            let stream = source(address, Some(tls)).await;
            let first_event = collect_n(stream, 1);

            timeout(FIRST_EVENT_TIMEOUT, first_event).await.unwrap()
        })
        .await;

        assert!(!events.is_empty());

        let log = events[0].as_log();
        assert!(
            log.get("line")
                .unwrap()
                .to_string_lossy()
                .contains("Hello World")
        );
        assert!(log.get("host").is_some());
    }
}