Skip to main content

vector/transforms/lua/v2/
mod.rs

1use std::{path::PathBuf, sync::Arc, time::Duration};
2
3use serde_with::serde_as;
4use snafu::{ResultExt, Snafu};
5pub use vector_lib::event::lua;
6use vector_lib::{
7    codecs::MetricTagValues,
8    configurable::configurable_component,
9    transform::runtime_transform::{RuntimeTransform, Timer},
10};
11
12use crate::{
13    config::{self, CONFIG_PATHS, ComponentKey, DataType, Input, OutputId, TransformOutput},
14    event::{Event, lua::event::LuaEvent},
15    internal_events::{LuaBuildError, LuaGcTriggered},
16    schema,
17    schema::Definition,
18    transforms::Transform,
19};
20
21#[derive(Debug, Snafu)]
22pub enum BuildError {
23    #[snafu(display("Invalid \"search_dirs\": {}", source))]
24    InvalidSearchDirs { source: mlua::Error },
25    #[snafu(display("Cannot evaluate Lua code in \"source\": {}", source))]
26    InvalidSource { source: mlua::Error },
27
28    #[snafu(display("Cannot evaluate Lua code defining \"hooks.init\": {}", source))]
29    InvalidHooksInit { source: mlua::Error },
30    #[snafu(display("Cannot evaluate Lua code defining \"hooks.process\": {}", source))]
31    InvalidHooksProcess { source: mlua::Error },
32    #[snafu(display("Cannot evaluate Lua code defining \"hooks.shutdown\": {}", source))]
33    InvalidHooksShutdown { source: mlua::Error },
34    #[snafu(display("Cannot evaluate Lua code defining timer handler: {}", source))]
35    InvalidTimerHandler { source: mlua::Error },
36
37    #[snafu(display("Runtime error in \"hooks.init\" function: {}", source))]
38    RuntimeErrorHooksInit { source: mlua::Error },
39    #[snafu(display("Runtime error in \"hooks.process\" function: {}", source))]
40    RuntimeErrorHooksProcess { source: mlua::Error },
41    #[snafu(display("Runtime error in \"hooks.shutdown\" function: {}", source))]
42    RuntimeErrorHooksShutdown { source: mlua::Error },
43    #[snafu(display("Runtime error in timer handler: {}", source))]
44    RuntimeErrorTimerHandler { source: mlua::Error },
45
46    #[snafu(display("Cannot call GC in Lua runtime: {}", source))]
47    RuntimeErrorGc { source: mlua::Error },
48}
49
50/// Configuration for the version two of the `lua` transform.
51#[configurable_component]
52#[derive(Clone, Debug)]
53#[serde(deny_unknown_fields)]
54pub struct LuaConfig {
55    /// The Lua program to initialize the transform with.
56    ///
57    /// The program can be used to import external dependencies, as well as define the functions
58    /// used for the various lifecycle hooks. However, it's not strictly required, as the lifecycle
59    /// hooks can be configured directly with inline Lua source for each respective hook.
60    #[configurable(metadata(
61        docs::examples = "function init()\n\tcount = 0\nend\n\nfunction process()\n\tcount = count + 1\nend\n\nfunction timer_handler(emit)\n\temit(make_counter(counter))\n\tcounter = 0\nend\n\nfunction shutdown(emit)\n\temit(make_counter(counter))\nend\n\nfunction make_counter(value)\n\treturn metric = {\n\t\tname = \"event_counter\",\n\t\tkind = \"incremental\",\n\t\ttimestamp = os.date(\"!*t\"),\n\t\tcounter = {\n\t\t\tvalue = value\n\t\t}\n \t}\nend",
62        docs::examples = "-- external file with hooks and timers defined\nrequire('custom_module')",
63    ))]
64    source: Option<String>,
65
66    /// A list of directories to search when loading a Lua file via the `require` function.
67    ///
68    /// If not specified, the modules are looked up in the configuration directories.
69    #[serde(default = "default_config_paths")]
70    #[configurable(metadata(docs::examples = "/etc/vector/lua"))]
71    #[configurable(metadata(docs::human_name = "Search Directories"))]
72    search_dirs: Vec<PathBuf>,
73
74    #[configurable(derived)]
75    hooks: HooksConfig,
76
77    /// A list of timers which should be configured and executed periodically.
78    #[serde(default)]
79    timers: Vec<TimerConfig>,
80
81    /// When set to `single`, metric tag values are exposed as single strings, the
82    /// same as they were before this config option. Tags with multiple values show the last assigned value, and null values
83    /// are ignored.
84    ///
85    /// When set to `full`, all metric tags are exposed as arrays of either string or null
86    /// values.
87    #[serde(default)]
88    metric_tag_values: MetricTagValues,
89}
90
91fn default_config_paths() -> Vec<PathBuf> {
92    match CONFIG_PATHS.lock().ok() {
93        Some(config_paths) => config_paths
94            .clone()
95            .into_iter()
96            .map(|config_path| match config_path {
97                config::ConfigPath::File(mut path, _format) => {
98                    path.pop();
99                    path
100                }
101                config::ConfigPath::Dir(path) => path,
102            })
103            .collect(),
104        None => vec![],
105    }
106}
107
108/// Lifecycle hooks.
109///
110/// These hooks can be set to perform additional processing during the lifecycle of the transform.
111#[configurable_component]
112#[derive(Clone, Debug)]
113#[serde(deny_unknown_fields)]
114struct HooksConfig {
115    /// The function called when the first event comes in, before `hooks.process` is called.
116    ///
117    /// It can produce new events using the `emit` function.
118    ///
119    /// This can either be inline Lua that defines a closure to use, or the name of the Lua function to call. In both
120    /// cases, the closure/function takes a single parameter, `emit`, which is a reference to a function for emitting events.
121    #[configurable(metadata(
122        docs::examples = "function (emit)\n\t-- Custom Lua code here\nend",
123        docs::examples = "init",
124    ))]
125    init: Option<String>,
126
127    /// The function called for each incoming event.
128    ///
129    /// It can produce new events using the `emit` function.
130    ///
131    /// This can either be inline Lua that defines a closure to use, or the name of the Lua function to call. In both
132    /// cases, the closure/function takes two parameters. The first parameter, `event`, is the event being processed,
133    /// while the second parameter, `emit`, is a reference to a function for emitting events.
134    #[configurable(metadata(
135        docs::examples = "function (event, emit)\n\tevent.log.field = \"value\" -- set value of a field\n\tevent.log.another_field = nil -- remove field\n\tevent.log.first, event.log.second = nil, event.log.first -- rename field\n\t-- Very important! Emit the processed event.\n\temit(event)\nend",
136        docs::examples = "process",
137    ))]
138    process: String,
139
140    /// The function called when the transform is stopped.
141    ///
142    /// It can produce new events using the `emit` function.
143    ///
144    /// This can either be inline Lua that defines a closure to use, or the name of the Lua function to call. In both
145    /// cases, the closure/function takes a single parameter, `emit`, which is a reference to a function for emitting events.
146    #[configurable(metadata(
147        docs::examples = "function (emit)\n\t-- Custom Lua code here\nend",
148        docs::examples = "shutdown",
149    ))]
150    shutdown: Option<String>,
151}
152
153/// A Lua timer.
154#[serde_as]
155#[configurable_component]
156#[derive(Clone, Debug)]
157struct TimerConfig {
158    /// The interval to execute the handler, in seconds.
159    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
160    #[configurable(metadata(docs::human_name = "Interval"))]
161    interval_seconds: Duration,
162
163    /// The handler function which is called when the timer ticks.
164    ///
165    /// It can produce new events using the `emit` function.
166    ///
167    /// This can either be inline Lua that defines a closure to use, or the name of the Lua function
168    /// to call. In both cases, the closure/function takes a single parameter, `emit`, which is a
169    /// reference to a function for emitting events.
170    #[configurable(metadata(docs::examples = "timer_handler"))]
171    handler: String,
172}
173
174impl LuaConfig {
175    pub fn build(&self, key: ComponentKey) -> crate::Result<Transform> {
176        Lua::new(self, key).map(Transform::event_task)
177    }
178
179    pub fn input(&self) -> Input {
180        Input::new(DataType::Metric | DataType::Log)
181    }
182
183    pub fn outputs(
184        &self,
185        input_definitions: &[(OutputId, schema::Definition)],
186    ) -> Vec<TransformOutput> {
187        // Lua causes the type definition to be reset
188        let namespaces = input_definitions
189            .iter()
190            .flat_map(|(_output, definition)| definition.log_namespaces().clone())
191            .collect();
192
193        let definition = input_definitions
194            .iter()
195            .map(|(output, _definition)| {
196                (
197                    output.clone(),
198                    Definition::default_for_namespace(&namespaces),
199                )
200            })
201            .collect();
202
203        vec![TransformOutput::new(
204            DataType::Metric | DataType::Log,
205            definition,
206        )]
207    }
208}
209
210// Lua's garbage collector sometimes seems to be not executed automatically on high event rates,
211// which leads to leak-like RAM consumption pattern. This constant sets the number of invocations of
212// the Lua transform after which GC would be called, thus ensuring that the RAM usage is not too high.
213//
214// This constant is larger than 1 because calling GC is an expensive operation, so doing it
215// after each transform would have significant footprint on the performance.
216const GC_INTERVAL: usize = 16;
217
218pub struct Lua {
219    lua: mlua::Lua,
220    invocations_after_gc: usize,
221    hook_init: Option<mlua::RegistryKey>,
222    hook_process: mlua::RegistryKey,
223    hook_shutdown: Option<mlua::RegistryKey>,
224    timers: Vec<(Timer, mlua::RegistryKey)>,
225    multi_value_tags: bool,
226    source_id: Arc<ComponentKey>,
227}
228
229// Helper to create `RegistryKey` from Lua function code
230fn make_registry_value(lua: &mlua::Lua, source: &str) -> mlua::Result<mlua::RegistryKey> {
231    lua.load(source)
232        .eval::<mlua::Function>()
233        .and_then(|f| lua.create_registry_value(f))
234}
235
236impl Lua {
237    pub fn new(config: &LuaConfig, key: ComponentKey) -> crate::Result<Self> {
238        // In order to support loading C modules in Lua, we need to create unsafe instance
239        // without debug library.
240        let lua = unsafe {
241            mlua::Lua::unsafe_new_with(mlua::StdLib::ALL_SAFE, mlua::LuaOptions::default())
242        };
243
244        let additional_paths = config
245            .search_dirs
246            .iter()
247            .map(|d| format!("{}/?.lua", d.to_string_lossy()))
248            .collect::<Vec<_>>()
249            .join(";");
250
251        let mut timers = Vec::new();
252
253        if !additional_paths.is_empty() {
254            let package = lua.globals().get::<mlua::Table>("package")?;
255            let current_paths = package
256                .get::<String>("path")
257                .unwrap_or_else(|_| ";".to_string());
258            let paths = format!("{additional_paths};{current_paths}");
259            package.set("path", paths)?;
260        }
261
262        if let Some(source) = &config.source {
263            lua.load(source).eval::<()>().context(InvalidSourceSnafu)?;
264        }
265
266        let hook_init_code = config.hooks.init.as_ref();
267        let hook_init = hook_init_code
268            .map(|code| make_registry_value(&lua, code))
269            .transpose()
270            .context(InvalidHooksInitSnafu)?;
271
272        let hook_process =
273            make_registry_value(&lua, &config.hooks.process).context(InvalidHooksProcessSnafu)?;
274
275        let hook_shutdown_code = config.hooks.shutdown.as_ref();
276        let hook_shutdown = hook_shutdown_code
277            .map(|code| make_registry_value(&lua, code))
278            .transpose()
279            .context(InvalidHooksShutdownSnafu)?;
280
281        for (id, timer) in config.timers.iter().enumerate() {
282            let handler_key = lua
283                .load(&timer.handler)
284                .eval::<mlua::Function>()
285                .and_then(|f| lua.create_registry_value(f))
286                .context(InvalidTimerHandlerSnafu)?;
287
288            let timer = Timer {
289                id: id as u32,
290                interval: timer.interval_seconds,
291            };
292            timers.push((timer, handler_key));
293        }
294
295        let multi_value_tags = config.metric_tag_values == MetricTagValues::Full;
296
297        Ok(Self {
298            lua,
299            invocations_after_gc: 0,
300            timers,
301            hook_init,
302            hook_process,
303            hook_shutdown,
304            multi_value_tags,
305            source_id: Arc::new(key),
306        })
307    }
308
309    #[cfg(test)]
310    fn process(&mut self, event: Event, output: &mut Vec<Event>) -> Result<(), mlua::Error> {
311        let source_id = event.source_id().cloned();
312        let lua = &self.lua;
313        let result = lua.scope(|scope| {
314            let emit = scope.create_function_mut(|_, mut event: Event| {
315                if let Some(source_id) = &source_id {
316                    event.set_source_id(Arc::clone(source_id));
317                }
318                output.push(event);
319                Ok(())
320            })?;
321
322            lua.registry_value::<mlua::Function>(&self.hook_process)?
323                .call((
324                    LuaEvent {
325                        event,
326                        metric_multi_value_tags: self.multi_value_tags,
327                    },
328                    emit,
329                ))
330        });
331
332        self.attempt_gc();
333        result
334    }
335
336    #[cfg(test)]
337    fn process_single(&mut self, event: Event) -> Result<Option<Event>, mlua::Error> {
338        let mut out = Vec::new();
339        self.process(event, &mut out)?;
340        assert!(out.len() <= 1);
341        Ok(out.into_iter().next())
342    }
343
344    fn attempt_gc(&mut self) {
345        self.invocations_after_gc += 1;
346        if self.invocations_after_gc.is_multiple_of(GC_INTERVAL) {
347            emit!(LuaGcTriggered {
348                used_memory: self.lua.used_memory()
349            });
350            _ = self
351                .lua
352                .gc_collect()
353                .context(RuntimeErrorGcSnafu)
354                .map_err(|error| error!(%error, rate_limit = 30));
355            self.invocations_after_gc = 0;
356        }
357    }
358}
359
360// A helper that reduces code duplication.
361fn wrap_emit_fn<'scope, 'env, F: 'scope + FnMut(Event)>(
362    scope: &'scope mlua::Scope<'scope, 'env>,
363    mut emit_fn: F,
364    source_id: Arc<ComponentKey>,
365) -> mlua::Result<mlua::Function> {
366    scope.create_function_mut(move |_, mut event: Event| -> mlua::Result<()> {
367        event.set_source_id(Arc::clone(&source_id));
368        emit_fn(event);
369        Ok(())
370    })
371}
372
373impl RuntimeTransform for Lua {
374    fn hook_process<F>(&mut self, event: Event, emit_fn: F)
375    where
376        F: FnMut(Event),
377    {
378        let lua = &self.lua;
379        let source_id = Arc::clone(event.source_id().unwrap_or(&self.source_id));
380        _ = lua
381            .scope(|scope| -> mlua::Result<()> {
382                lua.registry_value::<mlua::Function>(&self.hook_process)?
383                    .call((
384                        LuaEvent {
385                            event,
386                            metric_multi_value_tags: self.multi_value_tags,
387                        },
388                        wrap_emit_fn(scope, emit_fn, source_id)?,
389                    ))
390            })
391            .context(RuntimeErrorHooksProcessSnafu)
392            .map_err(|e| emit!(LuaBuildError { error: e }));
393
394        self.attempt_gc();
395    }
396
397    fn hook_init<F>(&mut self, emit_fn: F)
398    where
399        F: FnMut(Event),
400    {
401        let lua = &self.lua;
402        _ = lua
403            .scope(|scope| -> mlua::Result<()> {
404                match &self.hook_init {
405                    Some(key) => lua
406                        .registry_value::<mlua::Function>(key)?
407                        .call(wrap_emit_fn(scope, emit_fn, Arc::clone(&self.source_id))?),
408                    None => Ok(()),
409                }
410            })
411            .context(RuntimeErrorHooksInitSnafu)
412            .map_err(|error| error!(%error, rate_limit = 30));
413
414        self.attempt_gc();
415    }
416
417    fn hook_shutdown<F>(&mut self, emit_fn: F)
418    where
419        F: FnMut(Event),
420    {
421        let lua = &self.lua;
422        _ = lua
423            .scope(|scope| -> mlua::Result<()> {
424                match &self.hook_shutdown {
425                    Some(key) => lua
426                        .registry_value::<mlua::Function>(key)?
427                        .call(wrap_emit_fn(scope, emit_fn, Arc::clone(&self.source_id))?),
428                    None => Ok(()),
429                }
430            })
431            .context(RuntimeErrorHooksShutdownSnafu)
432            .map_err(|error| error!(%error, rate_limit = 30));
433
434        self.attempt_gc();
435    }
436
437    fn timer_handler<F>(&mut self, timer: Timer, emit_fn: F)
438    where
439        F: FnMut(Event),
440    {
441        let lua = &self.lua;
442        _ = lua
443            .scope(|scope| -> mlua::Result<()> {
444                let handler_key = &self.timers[timer.id as usize].1;
445                lua.registry_value::<mlua::Function>(handler_key)?
446                    .call(wrap_emit_fn(scope, emit_fn, Arc::clone(&self.source_id))?)
447            })
448            .context(RuntimeErrorTimerHandlerSnafu)
449            .map_err(|error| error!(%error, rate_limit = 30));
450
451        self.attempt_gc();
452    }
453
454    fn timers(&self) -> Vec<Timer> {
455        self.timers.iter().map(|(timer, _)| *timer).collect()
456    }
457}
458
459#[cfg(test)]
460mod tests {
461    use std::{future::Future, sync::Arc};
462
463    use indoc::indoc;
464    use similar_asserts::assert_eq;
465    use tokio::sync::{
466        Mutex,
467        mpsc::{self, Receiver, Sender},
468    };
469    use tokio_stream::wrappers::ReceiverStream;
470
471    use super::*;
472    use crate::{
473        event::{
474            Event, LogEvent, Value,
475            metric::{Metric, MetricKind, MetricValue},
476        },
477        test_util,
478        test_util::{components::assert_transform_compliance, random_string},
479        transforms::test::create_topology,
480    };
481
482    fn format_error(error: &mlua::Error) -> String {
483        match error {
484            mlua::Error::CallbackError { traceback, cause } => {
485                format_error(cause) + "\n" + traceback
486            }
487            err => err.to_string(),
488        }
489    }
490
491    fn from_config(config: &str) -> crate::Result<Box<Lua>> {
492        Lua::new(&serde_yaml::from_str(config).unwrap(), "transform".into()).map(Box::new)
493    }
494
495    async fn run_transform<T: Future>(
496        config: &str,
497        func: impl FnOnce(Sender<Event>, Arc<Mutex<Receiver<Event>>>) -> T,
498    ) -> T::Output {
499        test_util::trace_init();
500        assert_transform_compliance(async move {
501            let config = super::super::LuaConfig::V2(serde_yaml::from_str(config).unwrap());
502            let (tx, rx) = mpsc::channel(1);
503            let (topology, out) = create_topology(ReceiverStream::new(rx), config).await;
504
505            let out = Arc::new(tokio::sync::Mutex::new(out));
506
507            let result = func(tx, Arc::clone(&out)).await;
508
509            topology.stop().await;
510            assert_eq!(out.lock().await.recv().await, None);
511
512            result
513        })
514        .await
515    }
516
517    async fn next_event(out: &Arc<Mutex<Receiver<Event>>>, source: &str) -> Event {
518        let event = out
519            .lock()
520            .await
521            .recv()
522            .await
523            .expect("Event was not received");
524        assert_eq!(
525            event.source_id(),
526            Some(&Arc::new(ComponentKey::from(source)))
527        );
528        event
529    }
530
531    #[tokio::test]
532    async fn lua_runs_init_hook() {
533        let line1 = random_string(9);
534        run_transform(
535            &indoc::formatdoc! {r#"
536                version: "2"
537                hooks:
538                  init: |
539                    function (emit)
540                      event = {{log={{message="{line1}"}}}}
541                      emit(event)
542                    end
543                  process: |
544                    function (event, emit)
545                      emit(event)
546                    end
547            "#},
548            |tx, out| async move {
549                let line2 = random_string(9);
550                tx.send(Event::Log(LogEvent::from(line2.as_str())))
551                    .await
552                    .unwrap();
553                drop(tx);
554                assert_eq!(
555                    next_event(&out, "transform").await.as_log()["message"],
556                    line1.into()
557                );
558                assert_eq!(
559                    next_event(&out, "in").await.as_log()["message"],
560                    line2.into(),
561                );
562            },
563        )
564        .await;
565    }
566
567    #[tokio::test]
568    async fn lua_add_field() {
569        run_transform(
570            indoc! {r#"
571            version: "2"
572            hooks:
573              process: |
574                function (event, emit)
575                  event["log"]["hello"] = "goodbye"
576                  emit(event)
577                end
578            "#},
579            |tx, out| async move {
580                let event = Event::Log(LogEvent::from("program me"));
581                tx.send(event).await.unwrap();
582
583                assert_eq!(
584                    next_event(&out, "in").await.as_log()["hello"],
585                    "goodbye".into()
586                );
587            },
588        )
589        .await;
590    }
591
592    #[tokio::test]
593    async fn lua_read_field() {
594        run_transform(
595            indoc! {r#"
596            version: "2"
597            hooks:
598              process: |
599                function (event, emit)
600                  _, _, name = string.find(event.log.message, "Hello, my name is (%a+).")
601                  event.log.name = name
602                  emit(event)
603                end
604            "#},
605            |tx, out| async move {
606                let event = Event::Log(LogEvent::from("Hello, my name is Bob."));
607                tx.send(event).await.unwrap();
608
609                assert_eq!(next_event(&out, "in").await.as_log()["name"], "Bob".into());
610            },
611        )
612        .await;
613    }
614
615    #[tokio::test]
616    async fn lua_remove_field() {
617        run_transform(
618            indoc! {r#"
619            version: "2"
620            hooks:
621              process: |
622                function (event, emit)
623                  event.log.name = nil
624                  emit(event)
625                end
626            "#},
627            |tx, out| async move {
628                let mut event = LogEvent::default();
629                event.insert("name", "Bob");
630
631                tx.send(event.into()).await.unwrap();
632
633                assert_eq!(next_event(&out, "in").await.as_log().get("name"), None);
634            },
635        )
636        .await;
637    }
638
639    #[tokio::test]
640    async fn lua_drop_event() {
641        run_transform(
642            indoc! {r#"
643            version: "2"
644            hooks:
645              process: |
646                function (event, emit)
647                  -- emit nothing
648                end
649            "#},
650            |tx, _out| async move {
651                let event = LogEvent::default().into();
652                tx.send(event).await.unwrap();
653
654                // "run_transform" will assert that the output stream is empty
655            },
656        )
657        .await;
658    }
659
660    #[tokio::test]
661    async fn lua_duplicate_event() {
662        run_transform(
663            indoc! {r#"
664            version: "2"
665            hooks:
666              process: |
667                function (event, emit)
668                  emit(event)
669                  emit(event)
670                end
671            "#},
672            |tx, out| async move {
673                let mut event = LogEvent::default();
674                event.insert("host", "127.0.0.1");
675                tx.send(event.into()).await.unwrap();
676
677                assert!(out.lock().await.recv().await.is_some());
678                assert!(out.lock().await.recv().await.is_some());
679            },
680        )
681        .await;
682    }
683
684    #[tokio::test]
685    async fn lua_read_empty_field() {
686        run_transform(
687            indoc! {r#"
688            version: "2"
689            hooks:
690              process: |
691                function (event, emit)
692                  if event["log"]["non-existent"] == nil then
693                    event["log"]["result"] = "empty"
694                  else
695                    event["log"]["result"] = "found"
696                  end
697                  emit(event)
698                end
699            "#},
700            |tx, out| async move {
701                let event = LogEvent::default();
702                tx.send(event.into()).await.unwrap();
703
704                assert_eq!(
705                    next_event(&out, "in").await.as_log()["result"],
706                    "empty".into()
707                );
708            },
709        )
710        .await;
711    }
712
713    #[tokio::test]
714    async fn lua_integer_value() {
715        run_transform(
716            indoc! {r#"
717            version: "2"
718            hooks:
719              process: |
720                function (event, emit)
721                  event["log"]["number"] = 3
722                  emit(event)
723                end
724            "#},
725            |tx, out| async move {
726                let event = LogEvent::default();
727                tx.send(event.into()).await.unwrap();
728
729                assert_eq!(
730                    next_event(&out, "in").await.as_log()["number"],
731                    Value::Integer(3)
732                );
733            },
734        )
735        .await;
736    }
737
738    #[tokio::test]
739    async fn lua_numeric_value() {
740        run_transform(
741            indoc! {r#"
742            version: "2"
743            hooks:
744              process: |
745                function (event, emit)
746                  event["log"]["number"] = 3.14159
747                  emit(event)
748                end
749            "#},
750            |tx, out| async move {
751                let event = LogEvent::default();
752                tx.send(event.into()).await.unwrap();
753
754                assert_eq!(
755                    next_event(&out, "in").await.as_log()["number"],
756                    Value::from(3.14159)
757                );
758            },
759        )
760        .await;
761    }
762
763    #[tokio::test]
764    async fn lua_boolean_value() {
765        run_transform(
766            indoc! {r#"
767            version: "2"
768            hooks:
769              process: |
770                function (event, emit)
771                  event["log"]["bool"] = true
772                  emit(event)
773                end
774            "#},
775            |tx, out| async move {
776                let event = LogEvent::default();
777                tx.send(event.into()).await.unwrap();
778
779                assert_eq!(
780                    next_event(&out, "in").await.as_log()["bool"],
781                    Value::Boolean(true)
782                );
783            },
784        )
785        .await;
786    }
787
788    #[tokio::test]
789    async fn lua_non_coercible_value() {
790        run_transform(
791            indoc! {r#"
792            version: "2"
793            hooks:
794              process: |
795                function (event, emit)
796                  event["log"]["junk"] = nil
797                  emit(event)
798                end
799            "#},
800            |tx, out| async move {
801                let event = LogEvent::default();
802                tx.send(event.into()).await.unwrap();
803
804                assert_eq!(next_event(&out, "in").await.as_log().get("junk"), None);
805            },
806        )
807        .await;
808    }
809
810    #[tokio::test]
811    async fn lua_non_string_key_write() -> crate::Result<()> {
812        let mut transform = from_config(indoc! {r#"
813            hooks:
814              process: |
815                function (event, emit)
816                  event["log"][false] = "hello"
817                  emit(event)
818                end
819            "#})
820        .unwrap();
821
822        let err = transform
823            .process_single(LogEvent::default().into())
824            .unwrap_err();
825        let err = format_error(&err);
826        assert!(
827            err.contains("error converting Lua boolean to String"),
828            "{}",
829            err
830        );
831        Ok(())
832    }
833
834    #[tokio::test]
835    async fn lua_non_string_key_read() {
836        run_transform(
837            indoc! {r#"
838            version: "2"
839            hooks:
840              process: |
841                function (event, emit)
842                  event.log.result = event.log[false]
843                  emit(event)
844                end
845            "#},
846            |tx, out| async move {
847                let event = LogEvent::default();
848                tx.send(event.into()).await.unwrap();
849
850                assert_eq!(next_event(&out, "in").await.as_log().get("result"), None);
851            },
852        )
853        .await;
854    }
855
856    #[tokio::test]
857    async fn lua_script_error() -> crate::Result<()> {
858        let mut transform = from_config(indoc! {r#"
859            hooks:
860              process: |
861                function (event, emit)
862                  error("this is an error")
863                end
864            "#})
865        .unwrap();
866
867        let err = transform
868            .process_single(LogEvent::default().into())
869            .unwrap_err();
870        let err = format_error(&err);
871        assert!(err.contains("this is an error"), "{}", err);
872        Ok(())
873    }
874
875    #[tokio::test]
876    async fn lua_syntax_error() -> crate::Result<()> {
877        let err = from_config(indoc! {r#"
878            hooks:
879              process: |
880                function (event, emit)
881                  1234 = sadf <>&*!#@
882                end
883            "#})
884        .map(|_| ())
885        .unwrap_err()
886        .to_string();
887
888        assert!(err.contains("syntax error:"), "{}", err);
889        Ok(())
890    }
891
892    #[tokio::test]
893    async fn lua_load_file() {
894        use std::{fs::File, io::Write};
895
896        let dir = tempfile::tempdir().unwrap();
897        let mut file = File::create(dir.path().join("script2.lua")).unwrap();
898        write!(
899            &mut file,
900            r#"
901            local M = {{}}
902
903            local function modify(event2)
904              event2["log"]["new field"] = "new value"
905            end
906            M.modify = modify
907
908            return M
909            "#
910        )
911        .unwrap();
912
913        run_transform(
914            &indoc::formatdoc! {r#"
915                version: "2"
916                hooks:
917                  process: |
918                    function (event, emit)
919                      local script2 = require("script2")
920                      script2.modify(event)
921                      emit(event)
922                    end
923                search_dirs:
924                  - {dir}
925            "#,
926                dir = dir.path().as_os_str().to_string_lossy(), // recall we also support windows
927            },
928            |tx, out| async move {
929                let event = LogEvent::default();
930                tx.send(event.into()).await.unwrap();
931
932                assert_eq!(
933                    next_event(&out, "in").await.as_log()["\"new field\""],
934                    "new value".into()
935                );
936            },
937        )
938        .await;
939    }
940
941    #[tokio::test]
942    async fn lua_pairs() {
943        run_transform(
944            indoc! {r#"
945            version: "2"
946            hooks:
947              process: |
948                function (event, emit)
949                  for k,v in pairs(event.log) do
950                    event.log[k] = k .. v
951                  end
952                  emit(event)
953                end
954            "#},
955            |tx, out| async move {
956                let mut event = LogEvent::default();
957                event.insert("name", "Bob");
958                event.insert("friend", "Alice");
959                tx.send(event.into()).await.unwrap();
960
961                let output = next_event(&out, "in").await;
962
963                assert_eq!(output.as_log()["name"], "nameBob".into());
964                assert_eq!(output.as_log()["friend"], "friendAlice".into());
965            },
966        )
967        .await;
968    }
969
970    #[tokio::test]
971    async fn lua_metric() {
972        run_transform(
973            indoc! {r#"
974            version: "2"
975            hooks:
976              process: |
977                function (event, emit)
978                  event.metric.counter.value = event.metric.counter.value + 1
979                  emit(event)
980                end
981            "#},
982            |tx, out| async move {
983                let metric = Metric::new(
984                    "example counter",
985                    MetricKind::Absolute,
986                    MetricValue::Counter { value: 1.0 },
987                );
988
989                let mut expected = metric
990                    .clone()
991                    .with_value(MetricValue::Counter { value: 2.0 });
992                let metadata = expected.metadata_mut();
993                metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
994                metadata.set_source_id(Arc::new(ComponentKey::from("in")));
995
996                tx.send(metric.into()).await.unwrap();
997
998                assert_eq!(next_event(&out, "in").await.as_metric(), &expected);
999            },
1000        )
1001        .await;
1002    }
1003
1004    #[tokio::test]
1005    async fn lua_multiple_events() {
1006        run_transform(
1007            indoc! {r#"
1008            version: "2"
1009            hooks:
1010              process: |
1011                function (event, emit)
1012                  event["log"]["hello"] = "goodbye"
1013                  emit(event)
1014                end
1015            "#},
1016            |tx, out| async move {
1017                let n: usize = 10;
1018                let events = (0..n).map(|i| Event::Log(LogEvent::from(format!("program me {i}"))));
1019                for event in events {
1020                    tx.send(event).await.unwrap();
1021                    assert!(out.lock().await.recv().await.is_some());
1022                }
1023            },
1024        )
1025        .await;
1026    }
1027}