1use std::{path::PathBuf, sync::Arc, time::Duration};
2
3use serde_with::serde_as;
4use snafu::{ResultExt, Snafu};
5pub use vector_lib::event::lua;
6use vector_lib::{
7 codecs::MetricTagValues,
8 configurable::configurable_component,
9 transform::runtime_transform::{RuntimeTransform, Timer},
10};
11
12use crate::{
13 config::{self, CONFIG_PATHS, ComponentKey, DataType, Input, OutputId, TransformOutput},
14 event::{Event, lua::event::LuaEvent},
15 internal_events::{LuaBuildError, LuaGcTriggered},
16 schema,
17 schema::Definition,
18 transforms::Transform,
19};
20
/// Errors raised while building or running the Lua transform.
///
/// Every variant carries the underlying [`mlua::Error`] as its `source`.
#[derive(Debug, Snafu)]
pub enum BuildError {
    /// Failure attributed to the `search_dirs` option.
    #[snafu(display("Invalid \"search_dirs\": {}", source))]
    InvalidSearchDirs { source: mlua::Error },
    /// The Lua program given in `source` could not be evaluated.
    #[snafu(display("Cannot evaluate Lua code in \"source\": {}", source))]
    InvalidSource { source: mlua::Error },

    /// The code given for `hooks.init` did not evaluate to a function.
    #[snafu(display("Cannot evaluate Lua code defining \"hooks.init\": {}", source))]
    InvalidHooksInit { source: mlua::Error },
    /// The code given for `hooks.process` did not evaluate to a function.
    #[snafu(display("Cannot evaluate Lua code defining \"hooks.process\": {}", source))]
    InvalidHooksProcess { source: mlua::Error },
    /// The code given for `hooks.shutdown` did not evaluate to a function.
    #[snafu(display("Cannot evaluate Lua code defining \"hooks.shutdown\": {}", source))]
    InvalidHooksShutdown { source: mlua::Error },
    /// The code given for a timer handler did not evaluate to a function.
    #[snafu(display("Cannot evaluate Lua code defining timer handler: {}", source))]
    InvalidTimerHandler { source: mlua::Error },

    /// Calling the `hooks.init` function failed at runtime.
    #[snafu(display("Runtime error in \"hooks.init\" function: {}", source))]
    RuntimeErrorHooksInit { source: mlua::Error },
    /// Calling the `hooks.process` function failed at runtime.
    #[snafu(display("Runtime error in \"hooks.process\" function: {}", source))]
    RuntimeErrorHooksProcess { source: mlua::Error },
    /// Calling the `hooks.shutdown` function failed at runtime.
    #[snafu(display("Runtime error in \"hooks.shutdown\" function: {}", source))]
    RuntimeErrorHooksShutdown { source: mlua::Error },
    /// Calling a timer handler failed at runtime.
    #[snafu(display("Runtime error in timer handler: {}", source))]
    RuntimeErrorTimerHandler { source: mlua::Error },

    /// An explicit garbage collection request to the Lua runtime failed.
    #[snafu(display("Cannot call GC in Lua runtime: {}", source))]
    RuntimeErrorGc { source: mlua::Error },
}
49
50#[configurable_component]
52#[derive(Clone, Debug)]
53#[serde(deny_unknown_fields)]
54pub struct LuaConfig {
55 #[configurable(metadata(
61 docs::examples = "function init()\n\tcount = 0\nend\n\nfunction process()\n\tcount = count + 1\nend\n\nfunction timer_handler(emit)\n\temit(make_counter(counter))\n\tcounter = 0\nend\n\nfunction shutdown(emit)\n\temit(make_counter(counter))\nend\n\nfunction make_counter(value)\n\treturn metric = {\n\t\tname = \"event_counter\",\n\t\tkind = \"incremental\",\n\t\ttimestamp = os.date(\"!*t\"),\n\t\tcounter = {\n\t\t\tvalue = value\n\t\t}\n \t}\nend",
62 docs::examples = "-- external file with hooks and timers defined\nrequire('custom_module')",
63 ))]
64 source: Option<String>,
65
66 #[serde(default = "default_config_paths")]
70 #[configurable(metadata(docs::examples = "/etc/vector/lua"))]
71 #[configurable(metadata(docs::human_name = "Search Directories"))]
72 search_dirs: Vec<PathBuf>,
73
74 #[configurable(derived)]
75 hooks: HooksConfig,
76
77 #[serde(default)]
79 timers: Vec<TimerConfig>,
80
81 #[serde(default)]
88 metric_tag_values: MetricTagValues,
89}
90
91fn default_config_paths() -> Vec<PathBuf> {
92 match CONFIG_PATHS.lock().ok() {
93 Some(config_paths) => config_paths
94 .clone()
95 .into_iter()
96 .map(|config_path| match config_path {
97 config::ConfigPath::File(mut path, _format) => {
98 path.pop();
99 path
100 }
101 config::ConfigPath::Dir(path) => path,
102 })
103 .collect(),
104 None => vec![],
105 }
106}
107
/// Lifecycle hooks of the transform.
///
/// Each hook is a piece of Lua code that must evaluate to a function: either
/// an inline function expression or the name of an already defined function.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
struct HooksConfig {
    /// Optional initialization hook.
    ///
    /// The resulting function is called with a single `emit` argument, a
    /// function that emits events.
    #[configurable(metadata(
        docs::examples = "function (emit)\n\t-- Custom Lua code here\nend",
        docs::examples = "init",
    ))]
    init: Option<String>,

    /// Mandatory per-event hook.
    ///
    /// The resulting function is called with the incoming `event` and an
    /// `emit` function; nothing is forwarded unless the hook calls `emit`.
    #[configurable(metadata(
        docs::examples = "function (event, emit)\n\tevent.log.field = \"value\" -- set value of a field\n\tevent.log.another_field = nil -- remove field\n\tevent.log.first, event.log.second = nil, event.log.first -- rename field\n\t-- Very important! Emit the processed event.\n\temit(event)\nend",
        docs::examples = "process",
    ))]
    process: String,

    /// Optional shutdown hook.
    ///
    /// The resulting function is called with a single `emit` argument, a
    /// function that emits events.
    #[configurable(metadata(
        docs::examples = "function (emit)\n\t-- Custom Lua code here\nend",
        docs::examples = "shutdown",
    ))]
    shutdown: Option<String>,
}
152
/// A timer that periodically invokes a Lua handler.
#[serde_as]
#[configurable_component]
#[derive(Clone, Debug)]
struct TimerConfig {
    /// Interval between handler invocations, deserialized from a whole number
    /// of seconds.
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[configurable(metadata(docs::human_name = "Interval"))]
    interval_seconds: Duration,

    /// Lua code that must evaluate to the handler function.
    ///
    /// The handler is called with a single `emit` argument, a function that
    /// emits events.
    #[configurable(metadata(docs::examples = "timer_handler"))]
    handler: String,
}
173
174impl LuaConfig {
175 pub fn build(&self, key: ComponentKey) -> crate::Result<Transform> {
176 Lua::new(self, key).map(Transform::event_task)
177 }
178
179 pub fn input(&self) -> Input {
180 Input::new(DataType::Metric | DataType::Log)
181 }
182
183 pub fn outputs(
184 &self,
185 input_definitions: &[(OutputId, schema::Definition)],
186 ) -> Vec<TransformOutput> {
187 let namespaces = input_definitions
189 .iter()
190 .flat_map(|(_output, definition)| definition.log_namespaces().clone())
191 .collect();
192
193 let definition = input_definitions
194 .iter()
195 .map(|(output, _definition)| {
196 (
197 output.clone(),
198 Definition::default_for_namespace(&namespaces),
199 )
200 })
201 .collect();
202
203 vec![TransformOutput::new(
204 DataType::Metric | DataType::Log,
205 definition,
206 )]
207 }
208}
209
/// Number of hook or timer invocations between two explicitly requested Lua
/// garbage collections (see `Lua::attempt_gc`).
const GC_INTERVAL: usize = 16;
217
/// Runtime state of a single Lua transform instance.
pub struct Lua {
    /// The embedded Lua state.
    lua: mlua::Lua,
    /// Invocations counted since the last explicit garbage collection.
    invocations_after_gc: usize,
    /// Registry key of the optional `init` hook function.
    hook_init: Option<mlua::RegistryKey>,
    /// Registry key of the mandatory `process` hook function.
    hook_process: mlua::RegistryKey,
    /// Registry key of the optional `shutdown` hook function.
    hook_shutdown: Option<mlua::RegistryKey>,
    /// Timers paired with the registry key of their handler; a timer's `id`
    /// is its index in this list.
    timers: Vec<(Timer, mlua::RegistryKey)>,
    /// Whether metrics handed to Lua expose multi-value tags.
    multi_value_tags: bool,
    /// Key of this component, used as the source id of emitted events that
    /// have no other source.
    source_id: Arc<ComponentKey>,
}
228
229fn make_registry_value(lua: &mlua::Lua, source: &str) -> mlua::Result<mlua::RegistryKey> {
231 lua.load(source)
232 .eval::<mlua::Function>()
233 .and_then(|f| lua.create_registry_value(f))
234}
235
236impl Lua {
237 pub fn new(config: &LuaConfig, key: ComponentKey) -> crate::Result<Self> {
238 let lua = unsafe {
241 mlua::Lua::unsafe_new_with(mlua::StdLib::ALL_SAFE, mlua::LuaOptions::default())
242 };
243
244 let additional_paths = config
245 .search_dirs
246 .iter()
247 .map(|d| format!("{}/?.lua", d.to_string_lossy()))
248 .collect::<Vec<_>>()
249 .join(";");
250
251 let mut timers = Vec::new();
252
253 if !additional_paths.is_empty() {
254 let package = lua.globals().get::<mlua::Table>("package")?;
255 let current_paths = package
256 .get::<String>("path")
257 .unwrap_or_else(|_| ";".to_string());
258 let paths = format!("{additional_paths};{current_paths}");
259 package.set("path", paths)?;
260 }
261
262 if let Some(source) = &config.source {
263 lua.load(source).eval::<()>().context(InvalidSourceSnafu)?;
264 }
265
266 let hook_init_code = config.hooks.init.as_ref();
267 let hook_init = hook_init_code
268 .map(|code| make_registry_value(&lua, code))
269 .transpose()
270 .context(InvalidHooksInitSnafu)?;
271
272 let hook_process =
273 make_registry_value(&lua, &config.hooks.process).context(InvalidHooksProcessSnafu)?;
274
275 let hook_shutdown_code = config.hooks.shutdown.as_ref();
276 let hook_shutdown = hook_shutdown_code
277 .map(|code| make_registry_value(&lua, code))
278 .transpose()
279 .context(InvalidHooksShutdownSnafu)?;
280
281 for (id, timer) in config.timers.iter().enumerate() {
282 let handler_key = lua
283 .load(&timer.handler)
284 .eval::<mlua::Function>()
285 .and_then(|f| lua.create_registry_value(f))
286 .context(InvalidTimerHandlerSnafu)?;
287
288 let timer = Timer {
289 id: id as u32,
290 interval: timer.interval_seconds,
291 };
292 timers.push((timer, handler_key));
293 }
294
295 let multi_value_tags = config.metric_tag_values == MetricTagValues::Full;
296
297 Ok(Self {
298 lua,
299 invocations_after_gc: 0,
300 timers,
301 hook_init,
302 hook_process,
303 hook_shutdown,
304 multi_value_tags,
305 source_id: Arc::new(key),
306 })
307 }
308
309 #[cfg(test)]
310 fn process(&mut self, event: Event, output: &mut Vec<Event>) -> Result<(), mlua::Error> {
311 let source_id = event.source_id().cloned();
312 let lua = &self.lua;
313 let result = lua.scope(|scope| {
314 let emit = scope.create_function_mut(|_, mut event: Event| {
315 if let Some(source_id) = &source_id {
316 event.set_source_id(Arc::clone(source_id));
317 }
318 output.push(event);
319 Ok(())
320 })?;
321
322 lua.registry_value::<mlua::Function>(&self.hook_process)?
323 .call((
324 LuaEvent {
325 event,
326 metric_multi_value_tags: self.multi_value_tags,
327 },
328 emit,
329 ))
330 });
331
332 self.attempt_gc();
333 result
334 }
335
336 #[cfg(test)]
337 fn process_single(&mut self, event: Event) -> Result<Option<Event>, mlua::Error> {
338 let mut out = Vec::new();
339 self.process(event, &mut out)?;
340 assert!(out.len() <= 1);
341 Ok(out.into_iter().next())
342 }
343
344 fn attempt_gc(&mut self) {
345 self.invocations_after_gc += 1;
346 if self.invocations_after_gc.is_multiple_of(GC_INTERVAL) {
347 emit!(LuaGcTriggered {
348 used_memory: self.lua.used_memory()
349 });
350 _ = self
351 .lua
352 .gc_collect()
353 .context(RuntimeErrorGcSnafu)
354 .map_err(|error| error!(%error, rate_limit = 30));
355 self.invocations_after_gc = 0;
356 }
357 }
358}
359
360fn wrap_emit_fn<'scope, 'env, F: 'scope + FnMut(Event)>(
362 scope: &'scope mlua::Scope<'scope, 'env>,
363 mut emit_fn: F,
364 source_id: Arc<ComponentKey>,
365) -> mlua::Result<mlua::Function> {
366 scope.create_function_mut(move |_, mut event: Event| -> mlua::Result<()> {
367 event.set_source_id(Arc::clone(&source_id));
368 emit_fn(event);
369 Ok(())
370 })
371}
372
373impl RuntimeTransform for Lua {
374 fn hook_process<F>(&mut self, event: Event, emit_fn: F)
375 where
376 F: FnMut(Event),
377 {
378 let lua = &self.lua;
379 let source_id = Arc::clone(event.source_id().unwrap_or(&self.source_id));
380 _ = lua
381 .scope(|scope| -> mlua::Result<()> {
382 lua.registry_value::<mlua::Function>(&self.hook_process)?
383 .call((
384 LuaEvent {
385 event,
386 metric_multi_value_tags: self.multi_value_tags,
387 },
388 wrap_emit_fn(scope, emit_fn, source_id)?,
389 ))
390 })
391 .context(RuntimeErrorHooksProcessSnafu)
392 .map_err(|e| emit!(LuaBuildError { error: e }));
393
394 self.attempt_gc();
395 }
396
397 fn hook_init<F>(&mut self, emit_fn: F)
398 where
399 F: FnMut(Event),
400 {
401 let lua = &self.lua;
402 _ = lua
403 .scope(|scope| -> mlua::Result<()> {
404 match &self.hook_init {
405 Some(key) => lua
406 .registry_value::<mlua::Function>(key)?
407 .call(wrap_emit_fn(scope, emit_fn, Arc::clone(&self.source_id))?),
408 None => Ok(()),
409 }
410 })
411 .context(RuntimeErrorHooksInitSnafu)
412 .map_err(|error| error!(%error, rate_limit = 30));
413
414 self.attempt_gc();
415 }
416
417 fn hook_shutdown<F>(&mut self, emit_fn: F)
418 where
419 F: FnMut(Event),
420 {
421 let lua = &self.lua;
422 _ = lua
423 .scope(|scope| -> mlua::Result<()> {
424 match &self.hook_shutdown {
425 Some(key) => lua
426 .registry_value::<mlua::Function>(key)?
427 .call(wrap_emit_fn(scope, emit_fn, Arc::clone(&self.source_id))?),
428 None => Ok(()),
429 }
430 })
431 .context(RuntimeErrorHooksShutdownSnafu)
432 .map_err(|error| error!(%error, rate_limit = 30));
433
434 self.attempt_gc();
435 }
436
437 fn timer_handler<F>(&mut self, timer: Timer, emit_fn: F)
438 where
439 F: FnMut(Event),
440 {
441 let lua = &self.lua;
442 _ = lua
443 .scope(|scope| -> mlua::Result<()> {
444 let handler_key = &self.timers[timer.id as usize].1;
445 lua.registry_value::<mlua::Function>(handler_key)?
446 .call(wrap_emit_fn(scope, emit_fn, Arc::clone(&self.source_id))?)
447 })
448 .context(RuntimeErrorTimerHandlerSnafu)
449 .map_err(|error| error!(%error, rate_limit = 30));
450
451 self.attempt_gc();
452 }
453
454 fn timers(&self) -> Vec<Timer> {
455 self.timers.iter().map(|(timer, _)| *timer).collect()
456 }
457}
458
459#[cfg(test)]
460mod tests {
461 use std::{future::Future, sync::Arc};
462
463 use indoc::indoc;
464 use similar_asserts::assert_eq;
465 use tokio::sync::{
466 Mutex,
467 mpsc::{self, Receiver, Sender},
468 };
469 use tokio_stream::wrappers::ReceiverStream;
470
471 use super::*;
472 use crate::{
473 event::{
474 Event, LogEvent, Value,
475 metric::{Metric, MetricKind, MetricValue},
476 },
477 test_util,
478 test_util::{components::assert_transform_compliance, random_string},
479 transforms::test::create_topology,
480 };
481
482 fn format_error(error: &mlua::Error) -> String {
483 match error {
484 mlua::Error::CallbackError { traceback, cause } => {
485 format_error(cause) + "\n" + traceback
486 }
487 err => err.to_string(),
488 }
489 }
490
491 fn from_config(config: &str) -> crate::Result<Box<Lua>> {
492 Lua::new(&serde_yaml::from_str(config).unwrap(), "transform".into()).map(Box::new)
493 }
494
495 async fn run_transform<T: Future>(
496 config: &str,
497 func: impl FnOnce(Sender<Event>, Arc<Mutex<Receiver<Event>>>) -> T,
498 ) -> T::Output {
499 test_util::trace_init();
500 assert_transform_compliance(async move {
501 let config = super::super::LuaConfig::V2(serde_yaml::from_str(config).unwrap());
502 let (tx, rx) = mpsc::channel(1);
503 let (topology, out) = create_topology(ReceiverStream::new(rx), config).await;
504
505 let out = Arc::new(tokio::sync::Mutex::new(out));
506
507 let result = func(tx, Arc::clone(&out)).await;
508
509 topology.stop().await;
510 assert_eq!(out.lock().await.recv().await, None);
511
512 result
513 })
514 .await
515 }
516
517 async fn next_event(out: &Arc<Mutex<Receiver<Event>>>, source: &str) -> Event {
518 let event = out
519 .lock()
520 .await
521 .recv()
522 .await
523 .expect("Event was not received");
524 assert_eq!(
525 event.source_id(),
526 Some(&Arc::new(ComponentKey::from(source)))
527 );
528 event
529 }
530
531 #[tokio::test]
532 async fn lua_runs_init_hook() {
533 let line1 = random_string(9);
534 run_transform(
535 &indoc::formatdoc! {r#"
536 version: "2"
537 hooks:
538 init: |
539 function (emit)
540 event = {{log={{message="{line1}"}}}}
541 emit(event)
542 end
543 process: |
544 function (event, emit)
545 emit(event)
546 end
547 "#},
548 |tx, out| async move {
549 let line2 = random_string(9);
550 tx.send(Event::Log(LogEvent::from(line2.as_str())))
551 .await
552 .unwrap();
553 drop(tx);
554 assert_eq!(
555 next_event(&out, "transform").await.as_log()["message"],
556 line1.into()
557 );
558 assert_eq!(
559 next_event(&out, "in").await.as_log()["message"],
560 line2.into(),
561 );
562 },
563 )
564 .await;
565 }
566
567 #[tokio::test]
568 async fn lua_add_field() {
569 run_transform(
570 indoc! {r#"
571 version: "2"
572 hooks:
573 process: |
574 function (event, emit)
575 event["log"]["hello"] = "goodbye"
576 emit(event)
577 end
578 "#},
579 |tx, out| async move {
580 let event = Event::Log(LogEvent::from("program me"));
581 tx.send(event).await.unwrap();
582
583 assert_eq!(
584 next_event(&out, "in").await.as_log()["hello"],
585 "goodbye".into()
586 );
587 },
588 )
589 .await;
590 }
591
592 #[tokio::test]
593 async fn lua_read_field() {
594 run_transform(
595 indoc! {r#"
596 version: "2"
597 hooks:
598 process: |
599 function (event, emit)
600 _, _, name = string.find(event.log.message, "Hello, my name is (%a+).")
601 event.log.name = name
602 emit(event)
603 end
604 "#},
605 |tx, out| async move {
606 let event = Event::Log(LogEvent::from("Hello, my name is Bob."));
607 tx.send(event).await.unwrap();
608
609 assert_eq!(next_event(&out, "in").await.as_log()["name"], "Bob".into());
610 },
611 )
612 .await;
613 }
614
615 #[tokio::test]
616 async fn lua_remove_field() {
617 run_transform(
618 indoc! {r#"
619 version: "2"
620 hooks:
621 process: |
622 function (event, emit)
623 event.log.name = nil
624 emit(event)
625 end
626 "#},
627 |tx, out| async move {
628 let mut event = LogEvent::default();
629 event.insert("name", "Bob");
630
631 tx.send(event.into()).await.unwrap();
632
633 assert_eq!(next_event(&out, "in").await.as_log().get("name"), None);
634 },
635 )
636 .await;
637 }
638
639 #[tokio::test]
640 async fn lua_drop_event() {
641 run_transform(
642 indoc! {r#"
643 version: "2"
644 hooks:
645 process: |
646 function (event, emit)
647 -- emit nothing
648 end
649 "#},
650 |tx, _out| async move {
651 let event = LogEvent::default().into();
652 tx.send(event).await.unwrap();
653
654 },
656 )
657 .await;
658 }
659
660 #[tokio::test]
661 async fn lua_duplicate_event() {
662 run_transform(
663 indoc! {r#"
664 version: "2"
665 hooks:
666 process: |
667 function (event, emit)
668 emit(event)
669 emit(event)
670 end
671 "#},
672 |tx, out| async move {
673 let mut event = LogEvent::default();
674 event.insert("host", "127.0.0.1");
675 tx.send(event.into()).await.unwrap();
676
677 assert!(out.lock().await.recv().await.is_some());
678 assert!(out.lock().await.recv().await.is_some());
679 },
680 )
681 .await;
682 }
683
684 #[tokio::test]
685 async fn lua_read_empty_field() {
686 run_transform(
687 indoc! {r#"
688 version: "2"
689 hooks:
690 process: |
691 function (event, emit)
692 if event["log"]["non-existent"] == nil then
693 event["log"]["result"] = "empty"
694 else
695 event["log"]["result"] = "found"
696 end
697 emit(event)
698 end
699 "#},
700 |tx, out| async move {
701 let event = LogEvent::default();
702 tx.send(event.into()).await.unwrap();
703
704 assert_eq!(
705 next_event(&out, "in").await.as_log()["result"],
706 "empty".into()
707 );
708 },
709 )
710 .await;
711 }
712
713 #[tokio::test]
714 async fn lua_integer_value() {
715 run_transform(
716 indoc! {r#"
717 version: "2"
718 hooks:
719 process: |
720 function (event, emit)
721 event["log"]["number"] = 3
722 emit(event)
723 end
724 "#},
725 |tx, out| async move {
726 let event = LogEvent::default();
727 tx.send(event.into()).await.unwrap();
728
729 assert_eq!(
730 next_event(&out, "in").await.as_log()["number"],
731 Value::Integer(3)
732 );
733 },
734 )
735 .await;
736 }
737
738 #[tokio::test]
739 async fn lua_numeric_value() {
740 run_transform(
741 indoc! {r#"
742 version: "2"
743 hooks:
744 process: |
745 function (event, emit)
746 event["log"]["number"] = 3.14159
747 emit(event)
748 end
749 "#},
750 |tx, out| async move {
751 let event = LogEvent::default();
752 tx.send(event.into()).await.unwrap();
753
754 assert_eq!(
755 next_event(&out, "in").await.as_log()["number"],
756 Value::from(3.14159)
757 );
758 },
759 )
760 .await;
761 }
762
763 #[tokio::test]
764 async fn lua_boolean_value() {
765 run_transform(
766 indoc! {r#"
767 version: "2"
768 hooks:
769 process: |
770 function (event, emit)
771 event["log"]["bool"] = true
772 emit(event)
773 end
774 "#},
775 |tx, out| async move {
776 let event = LogEvent::default();
777 tx.send(event.into()).await.unwrap();
778
779 assert_eq!(
780 next_event(&out, "in").await.as_log()["bool"],
781 Value::Boolean(true)
782 );
783 },
784 )
785 .await;
786 }
787
788 #[tokio::test]
789 async fn lua_non_coercible_value() {
790 run_transform(
791 indoc! {r#"
792 version: "2"
793 hooks:
794 process: |
795 function (event, emit)
796 event["log"]["junk"] = nil
797 emit(event)
798 end
799 "#},
800 |tx, out| async move {
801 let event = LogEvent::default();
802 tx.send(event.into()).await.unwrap();
803
804 assert_eq!(next_event(&out, "in").await.as_log().get("junk"), None);
805 },
806 )
807 .await;
808 }
809
810 #[tokio::test]
811 async fn lua_non_string_key_write() -> crate::Result<()> {
812 let mut transform = from_config(indoc! {r#"
813 hooks:
814 process: |
815 function (event, emit)
816 event["log"][false] = "hello"
817 emit(event)
818 end
819 "#})
820 .unwrap();
821
822 let err = transform
823 .process_single(LogEvent::default().into())
824 .unwrap_err();
825 let err = format_error(&err);
826 assert!(
827 err.contains("error converting Lua boolean to String"),
828 "{}",
829 err
830 );
831 Ok(())
832 }
833
834 #[tokio::test]
835 async fn lua_non_string_key_read() {
836 run_transform(
837 indoc! {r#"
838 version: "2"
839 hooks:
840 process: |
841 function (event, emit)
842 event.log.result = event.log[false]
843 emit(event)
844 end
845 "#},
846 |tx, out| async move {
847 let event = LogEvent::default();
848 tx.send(event.into()).await.unwrap();
849
850 assert_eq!(next_event(&out, "in").await.as_log().get("result"), None);
851 },
852 )
853 .await;
854 }
855
856 #[tokio::test]
857 async fn lua_script_error() -> crate::Result<()> {
858 let mut transform = from_config(indoc! {r#"
859 hooks:
860 process: |
861 function (event, emit)
862 error("this is an error")
863 end
864 "#})
865 .unwrap();
866
867 let err = transform
868 .process_single(LogEvent::default().into())
869 .unwrap_err();
870 let err = format_error(&err);
871 assert!(err.contains("this is an error"), "{}", err);
872 Ok(())
873 }
874
875 #[tokio::test]
876 async fn lua_syntax_error() -> crate::Result<()> {
877 let err = from_config(indoc! {r#"
878 hooks:
879 process: |
880 function (event, emit)
881 1234 = sadf <>&*!#@
882 end
883 "#})
884 .map(|_| ())
885 .unwrap_err()
886 .to_string();
887
888 assert!(err.contains("syntax error:"), "{}", err);
889 Ok(())
890 }
891
892 #[tokio::test]
893 async fn lua_load_file() {
894 use std::{fs::File, io::Write};
895
896 let dir = tempfile::tempdir().unwrap();
897 let mut file = File::create(dir.path().join("script2.lua")).unwrap();
898 write!(
899 &mut file,
900 r#"
901 local M = {{}}
902
903 local function modify(event2)
904 event2["log"]["new field"] = "new value"
905 end
906 M.modify = modify
907
908 return M
909 "#
910 )
911 .unwrap();
912
913 run_transform(
914 &indoc::formatdoc! {r#"
915 version: "2"
916 hooks:
917 process: |
918 function (event, emit)
919 local script2 = require("script2")
920 script2.modify(event)
921 emit(event)
922 end
923 search_dirs:
924 - {dir}
925 "#,
926 dir = dir.path().as_os_str().to_string_lossy(), },
928 |tx, out| async move {
929 let event = LogEvent::default();
930 tx.send(event.into()).await.unwrap();
931
932 assert_eq!(
933 next_event(&out, "in").await.as_log()["\"new field\""],
934 "new value".into()
935 );
936 },
937 )
938 .await;
939 }
940
941 #[tokio::test]
942 async fn lua_pairs() {
943 run_transform(
944 indoc! {r#"
945 version: "2"
946 hooks:
947 process: |
948 function (event, emit)
949 for k,v in pairs(event.log) do
950 event.log[k] = k .. v
951 end
952 emit(event)
953 end
954 "#},
955 |tx, out| async move {
956 let mut event = LogEvent::default();
957 event.insert("name", "Bob");
958 event.insert("friend", "Alice");
959 tx.send(event.into()).await.unwrap();
960
961 let output = next_event(&out, "in").await;
962
963 assert_eq!(output.as_log()["name"], "nameBob".into());
964 assert_eq!(output.as_log()["friend"], "friendAlice".into());
965 },
966 )
967 .await;
968 }
969
970 #[tokio::test]
971 async fn lua_metric() {
972 run_transform(
973 indoc! {r#"
974 version: "2"
975 hooks:
976 process: |
977 function (event, emit)
978 event.metric.counter.value = event.metric.counter.value + 1
979 emit(event)
980 end
981 "#},
982 |tx, out| async move {
983 let metric = Metric::new(
984 "example counter",
985 MetricKind::Absolute,
986 MetricValue::Counter { value: 1.0 },
987 );
988
989 let mut expected = metric
990 .clone()
991 .with_value(MetricValue::Counter { value: 2.0 });
992 let metadata = expected.metadata_mut();
993 metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
994 metadata.set_source_id(Arc::new(ComponentKey::from("in")));
995
996 tx.send(metric.into()).await.unwrap();
997
998 assert_eq!(next_event(&out, "in").await.as_metric(), &expected);
999 },
1000 )
1001 .await;
1002 }
1003
1004 #[tokio::test]
1005 async fn lua_multiple_events() {
1006 run_transform(
1007 indoc! {r#"
1008 version: "2"
1009 hooks:
1010 process: |
1011 function (event, emit)
1012 event["log"]["hello"] = "goodbye"
1013 emit(event)
1014 end
1015 "#},
1016 |tx, out| async move {
1017 let n: usize = 10;
1018 let events = (0..n).map(|i| Event::Log(LogEvent::from(format!("program me {i}"))));
1019 for event in events {
1020 tx.send(event).await.unwrap();
1021 assert!(out.lock().await.recv().await.is_some());
1022 }
1023 },
1024 )
1025 .await;
1026 }
1027}