// vector/transforms/lua/v1/mod.rs

1// Derivative's Debug impl generates `let _ = field.fmt(f)` which triggers this lint.
2#![allow(clippy::let_underscore_must_use)]
3
4use std::{future::ready, pin::Pin};
5
6use futures::{Stream, StreamExt, stream};
7use mlua::{ExternalError, FromLua};
8use ordered_float::NotNan;
9use snafu::{ResultExt, Snafu};
10use vector_lib::configurable::configurable_component;
11use vrl::path::parse_target_path;
12
13use crate::{
14    config::{DataType, Input, OutputId, TransformOutput},
15    event::{Event, Value},
16    internal_events::{LuaGcTriggered, LuaScriptError},
17    schema,
18    schema::Definition,
19    transforms::{TaskTransform, Transform},
20};
21
/// Errors that can occur while constructing the [`Lua`] transform.
#[derive(Debug, Snafu)]
enum BuildError {
    // Wraps an `mlua::Error` raised while preparing the Lua state or compiling the script;
    // the underlying error is rendered after the "Lua error: " prefix.
    #[snafu(display("Lua error: {}", source))]
    InvalidLua { source: mlua::Error },
}
27
/// Configuration for version one of the `lua` transform.
// `deny_unknown_fields` makes deserialization reject any key other than the fields below.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct LuaConfig {
    /// The Lua program to execute for each event.
    source: String,

    /// A list of directories to search when loading a Lua file via the `require` function.
    ///
    /// If not specified, the modules are looked up in the configuration directories.
    // `#[serde(default)]` turns an omitted key into an empty list.
    #[serde(default)]
    search_dirs: Vec<String>,
}
42
43impl LuaConfig {
44    pub fn build(&self) -> crate::Result<Transform> {
45        warn!(
46            "DEPRECATED The `lua` transform API version 1 is deprecated. Please convert your script to version 2."
47        );
48        Lua::new(self.source.clone(), self.search_dirs.clone()).map(Transform::event_task)
49    }
50
51    pub fn input(&self) -> Input {
52        Input::log()
53    }
54
55    pub fn outputs(
56        &self,
57        input_definitions: &[(OutputId, schema::Definition)],
58    ) -> Vec<TransformOutput> {
59        // Lua causes the type definition to be reset
60        let namespaces = input_definitions
61            .iter()
62            .flat_map(|(_output, definition)| definition.log_namespaces().clone())
63            .collect();
64
65        let definition = input_definitions
66            .iter()
67            .map(|(output, _definition)| {
68                (
69                    output.clone(),
70                    Definition::default_for_namespace(&namespaces),
71                )
72            })
73            .collect();
74
75        vec![TransformOutput::new(DataType::Log, definition)]
76    }
77}
78
// Lua's garbage collector does not always seem to run automatically at high event rates,
// which leads to a leak-like RAM consumption pattern. This constant sets the number of
// invocations of the Lua transform after which GC is triggered explicitly, ensuring that
// RAM usage does not grow too high.
//
// This constant is larger than 1 because calling GC is an expensive operation, so doing it
// after each transform would have a significant impact on performance.
const GC_INTERVAL: usize = 16;
86
/// Runtime state of the version 1 `lua` transform: a Lua interpreter plus the compiled
/// user script.
///
/// `source` and `search_dirs` are kept so that the `Clone` implementation can rebuild an
/// instance through `Lua::new`.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Lua {
    /// Text of the Lua script this instance was built from.
    #[derivative(Debug = "ignore")]
    source: String,
    /// Directories that were added to Lua's `package.path` when the instance was built.
    #[derivative(Debug = "ignore")]
    search_dirs: Vec<String>,
    /// The Lua interpreter that executes the script.
    #[derivative(Debug = "ignore")]
    lua: mlua::Lua,
    /// Registry key under which the compiled script function is stored in `lua`.
    vector_func: mlua::RegistryKey,
    /// Invocations counted since the last explicit garbage collection.
    invocations_after_gc: usize,
}
99
100impl Clone for Lua {
101    fn clone(&self) -> Self {
102        Lua::new(self.source.clone(), self.search_dirs.clone())
103            .expect("Tried to clone existing valid lua transform. This is an invariant.")
104    }
105}
106
// This wrapper exists so that version 1 and version 2 of the transform can each have their
// own, independent implementation of the `mlua::UserData` trait for events.
//
// Deriving `FromLua` allows the wrapper to be read back out of Lua values, which is how the
// transform retrieves the `event` global after running the script.
#[derive(Clone, FromLua)]
struct LuaEvent {
    // The wrapped event that the script reads and modifies.
    inner: Event,
}
113
114impl Lua {
115    pub fn new(source: String, search_dirs: Vec<String>) -> crate::Result<Self> {
116        // In order to support loading C modules in Lua, we need to create unsafe instance
117        // without debug library.
118        let lua = unsafe {
119            mlua::Lua::unsafe_new_with(mlua::StdLib::ALL_SAFE, mlua::LuaOptions::default())
120        };
121
122        let additional_paths = search_dirs
123            .iter()
124            .map(|d| format!("{d}/?.lua"))
125            .collect::<Vec<_>>()
126            .join(";");
127
128        if !additional_paths.is_empty() {
129            let package = lua
130                .globals()
131                .get::<mlua::Table>("package")
132                .context(InvalidLuaSnafu)?;
133            let current_paths = package
134                .get::<String>("path")
135                .unwrap_or_else(|_| ";".to_string());
136            let paths = format!("{additional_paths};{current_paths}");
137            package.set("path", paths).context(InvalidLuaSnafu)?;
138        }
139
140        let func = lua.load(&source).into_function().context(InvalidLuaSnafu)?;
141        let vector_func = lua.create_registry_value(func).context(InvalidLuaSnafu)?;
142
143        Ok(Self {
144            source,
145            search_dirs,
146            lua,
147            vector_func,
148            invocations_after_gc: 0,
149        })
150    }
151
152    fn process(&mut self, event: Event) -> Result<Option<Event>, mlua::Error> {
153        let source_id = event.source_id().cloned();
154        let lua = &self.lua;
155        let globals = lua.globals();
156
157        globals.raw_set("event", LuaEvent { inner: event })?;
158
159        let func = lua.registry_value::<mlua::Function>(&self.vector_func)?;
160        func.call::<()>(())?;
161
162        let result = globals.raw_get::<Option<LuaEvent>>("event").map(|option| {
163            option.map(|lua_event| {
164                let mut event = lua_event.inner;
165                if let Some(source_id) = source_id {
166                    event.set_source_id(source_id);
167                }
168                event
169            })
170        });
171
172        self.invocations_after_gc += 1;
173        if self.invocations_after_gc.is_multiple_of(GC_INTERVAL) {
174            emit!(LuaGcTriggered {
175                used_memory: self.lua.used_memory()
176            });
177            self.lua.gc_collect()?;
178            self.invocations_after_gc = 0;
179        }
180
181        result
182    }
183
184    pub fn transform_one(&mut self, event: Event) -> Option<Event> {
185        match self.process(event) {
186            Ok(event) => event,
187            Err(error) => {
188                emit!(LuaScriptError { error });
189                None
190            }
191        }
192    }
193}
194
195impl TaskTransform<Event> for Lua {
196    fn transform(
197        self: Box<Self>,
198        task: Pin<Box<dyn Stream<Item = Event> + Send>>,
199    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
200    where
201        Self: 'static,
202    {
203        let mut inner = self;
204        Box::pin(
205            task.filter_map(move |event| {
206                let mut output = Vec::with_capacity(1);
207                ready(match inner.process(event) {
208                    Ok(event) => {
209                        output.extend(event);
210                        Some(stream::iter(output))
211                    }
212                    Err(error) => {
213                        emit!(LuaScriptError { error });
214                        None
215                    }
216                })
217            })
218            .flatten(),
219        )
220    }
221}
222
223impl mlua::UserData for LuaEvent {
224    fn add_methods<M: mlua::UserDataMethods<Self>>(methods: &mut M) {
225        methods.add_meta_method_mut(
226            mlua::MetaMethod::NewIndex,
227            |_lua, this, (key, value): (String, Option<mlua::Value>)| {
228                let key_path = parse_target_path(key.as_str()).map_err(|e| e.into_lua_err())?;
229                match value {
230                    Some(mlua::Value::String(string)) => {
231                        this.inner.as_mut_log().insert(
232                            &key_path,
233                            Value::from(string.to_str().expect("Expected UTF-8.").to_owned()),
234                        );
235                    }
236                    Some(mlua::Value::Integer(integer)) => {
237                        this.inner
238                            .as_mut_log()
239                            .insert(&key_path, Value::Integer(integer));
240                    }
241                    Some(mlua::Value::Number(number)) if !number.is_nan() => {
242                        this.inner
243                            .as_mut_log()
244                            .insert(&key_path, Value::Float(NotNan::new(number).unwrap()));
245                    }
246                    Some(mlua::Value::Boolean(boolean)) => {
247                        this.inner
248                            .as_mut_log()
249                            .insert(&key_path, Value::Boolean(boolean));
250                    }
251                    Some(mlua::Value::Nil) | None => {
252                        this.inner.as_mut_log().remove(&key_path);
253                    }
254                    _ => {
255                        info!(
256                            message =
257                                "Could not set field to Lua value of invalid type, dropping field.",
258                            field = key.as_str()
259                        );
260                        this.inner.as_mut_log().remove(&key_path);
261                    }
262                }
263
264                Ok(())
265            },
266        );
267
268        methods.add_meta_method(mlua::MetaMethod::Index, |lua, this, key: String| {
269            if let Some(value) = this
270                .inner
271                .as_log()
272                .parse_path_and_get_value(key.as_str())
273                .ok()
274                .flatten()
275            {
276                let string = lua.create_string(value.coerce_to_bytes())?;
277                Ok(Some(string))
278            } else {
279                Ok(None)
280            }
281        });
282
283        methods.add_meta_function(mlua::MetaMethod::Pairs, |lua, event: LuaEvent| {
284            let state = lua.create_table()?;
285            {
286                if let Some(keys) = event.inner.as_log().keys() {
287                    let keys = lua.create_table_from(keys.map(|k| (k, true)))?;
288                    state.raw_set("keys", keys)?;
289                }
290                state.raw_set("event", event)?;
291            }
292            let function =
293                lua.create_function(|lua, (state, prev): (mlua::Table, Option<String>)| {
294                    let event: LuaEvent = state.raw_get("event")?;
295                    let keys: mlua::Table = state.raw_get("keys")?;
296                    let next: mlua::Function = lua.globals().raw_get("next")?;
297                    let key: Option<String> = next.call((keys, prev))?;
298                    let value = key.clone().and_then(|k| {
299                        event
300                            .inner
301                            .as_log()
302                            .parse_path_and_get_value(k.as_str())
303                            .ok()
304                            .flatten()
305                    });
306                    match value {
307                        Some(value) => Ok((key, Some(lua.create_string(value.coerce_to_bytes())?))),
308                        None => Ok((None, None)),
309                    }
310                })?;
311            Ok((function, state))
312        });
313    }
314}
315
316pub fn format_error(error: &mlua::Error) -> String {
317    match error {
318        mlua::Error::CallbackError { traceback, cause } => format_error(cause) + "\n" + traceback,
319        err => err.to_string(),
320    }
321}
322
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::{
        config::ComponentKey,
        event::{Event, LogEvent, Value},
        test_util,
    };

    /// Assigning a string to a new key adds that field to the event.
    #[test]
    fn lua_add_field() {
        let event = transform_one(
            r#"
              event["hello"] = "goodbye"
            "#,
            LogEvent::from("program me"),
        )
        .unwrap();

        assert_eq!(event.as_log()["hello"], "goodbye".into());
    }

    /// Existing fields can be read from the script.
    #[test]
    fn lua_read_field() {
        let event = transform_one(
            r#"
              _, _, name = string.find(event["message"], "Hello, my name is (%a+).")
              event["name"] = name
            "#,
            LogEvent::from("Hello, my name is Bob."),
        )
        .unwrap();

        assert_eq!(event.as_log()["name"], "Bob".into());
    }

    /// Assigning `nil` to a key removes the field.
    #[test]
    fn lua_remove_field() {
        let mut log = LogEvent::default();
        log.insert("name", "Bob");
        let event = transform_one(
            r#"
              event["name"] = nil
            "#,
            log,
        )
        .unwrap();

        assert!(event.as_log().get("name").is_none());
    }

    /// Setting the `event` global to `nil` drops the event.
    #[test]
    fn lua_drop_event() {
        let mut log = LogEvent::default();
        log.insert("name", "Bob");
        let event = transform_one(
            r"
              event = nil
            ",
            log,
        );

        assert!(event.is_none());
    }

    /// Reading a missing field yields `nil`.
    #[test]
    fn lua_read_empty_field() {
        let event = transform_one(
            r#"
              if event["non-existent"] == nil then
                event["result"] = "empty"
              else
                event["result"] = "found"
              end
            "#,
            LogEvent::default(),
        )
        .unwrap();

        assert_eq!(event.as_log()["result"], "empty".into());
    }

    /// Lua integers are stored as integer values.
    #[test]
    fn lua_integer_value() {
        let event = transform_one(
            r#"
              event["number"] = 3
            "#,
            LogEvent::default(),
        )
        .unwrap();
        assert_eq!(event.as_log()["number"], Value::Integer(3));
    }

    /// Lua floating-point numbers are stored as float values.
    #[test]
    fn lua_numeric_value() {
        let event = transform_one(
            r#"
              event["number"] = 3.14159
            "#,
            LogEvent::default(),
        )
        .unwrap();
        assert_eq!(event.as_log()["number"], Value::from(3.14159));
    }

    /// Lua booleans are stored as boolean values.
    #[test]
    fn lua_boolean_value() {
        let event = transform_one(
            r#"
              event["bool"] = true
            "#,
            LogEvent::default(),
        )
        .unwrap();
        assert_eq!(event.as_log()["bool"], Value::Boolean(true));
    }

    /// Assigning an unsupported Lua value (a table) leaves the field unset.
    #[test]
    fn lua_non_coercible_value() {
        let event = transform_one(
            r#"
              event["junk"] = {"asdf"}
            "#,
            LogEvent::default(),
        )
        .unwrap();
        assert_eq!(event.as_log().get("junk"), None);
    }

    /// Writing through a non-string key is reported as a conversion error.
    #[test]
    fn lua_non_string_key_write() {
        test_util::trace_init();
        let mut transform = Lua::new(
            r#"
              event[false] = "hello"
            "#
            .to_string(),
            vec![],
        )
        .unwrap();

        let err = transform.process(LogEvent::default().into()).unwrap_err();
        let err = format_error(&err);
        assert!(
            err.contains("error converting Lua boolean to String"),
            "{err}"
        );
    }

    /// Reading through a non-string key is reported as a conversion error.
    #[test]
    fn lua_non_string_key_read() {
        test_util::trace_init();
        let mut transform = Lua::new(
            r"
              print(event[false])
            "
            .to_string(),
            vec![],
        )
        .unwrap();

        let err = transform.process(LogEvent::default().into()).unwrap_err();
        let err = format_error(&err);
        assert!(
            err.contains("error converting Lua boolean to String"),
            "{err}"
        );
    }

    /// An error raised by the script surfaces in the formatted error text.
    #[test]
    fn lua_script_error() {
        test_util::trace_init();
        let mut transform = Lua::new(
            r#"
              error("this is an error")
            "#
            .to_string(),
            vec![],
        )
        .unwrap();

        let err = transform.process(LogEvent::default().into()).unwrap_err();
        let err = format_error(&err);
        assert!(err.contains("this is an error"), "{err}");
    }

    /// A script that does not compile is rejected when the transform is built.
    #[test]
    fn lua_syntax_error() {
        test_util::trace_init();
        let err = Lua::new(
            r"
              1234 = sadf <>&*!#@
            "
            .to_string(),
            vec![],
        )
        .map(|_| ())
        .unwrap_err()
        .to_string();

        assert!(err.contains("syntax error:"), "{err}");
    }

    /// Modules located in a configured search directory can be loaded with `require`.
    #[test]
    fn lua_load_file() {
        use std::{fs::File, io::Write};
        test_util::trace_init();

        let dir = tempfile::tempdir().unwrap();

        let mut file = File::create(dir.path().join("script2.lua")).unwrap();
        write!(
            &mut file,
            r#"
              local M = {{}}

              local function modify(event2)
                event2["\"new field\""] = "new value"
              end
              M.modify = modify

              return M
            "#
        )
        .unwrap();

        let source = r#"
          local script2 = require("script2")
          script2.modify(event)
        "#
        .to_string();

        let mut transform =
            Lua::new(source, vec![dir.path().to_string_lossy().into_owned()]).unwrap();
        let event = transform.transform_one(LogEvent::default().into()).unwrap();
        assert_eq!(event.as_log()["\"new field\""], "new value".into());
    }

    /// `pairs(event)` iterates over the event's fields.
    #[test]
    fn lua_pairs() {
        let mut event = LogEvent::default();
        event.insert("name", "Bob");
        event.insert("friend", "Alice");

        let event = transform_one(
            r"
              for k,v in pairs(event) do
                event[k] = k .. v
              end
            ",
            event,
        )
        .unwrap();

        assert_eq!(event.as_log()["name"], "nameBob".into());
        assert_eq!(event.as_log()["friend"], "friendAlice".into());
    }

    /// Runs `transform` (a Lua script) once against `event` and returns the result.
    ///
    /// A random source id is attached to the event beforehand, and the helper asserts that
    /// any event coming out of the transform still carries it.
    fn transform_one(transform: &str, event: impl Into<Event>) -> Option<Event> {
        test_util::trace_init();

        let source = source_id();
        let mut event = event.into();
        event.set_source_id(Arc::clone(&source));

        let mut transform = Lua::new(transform.to_string(), vec![]).unwrap();
        let event = transform.transform_one(event);

        if let Some(event) = &event {
            assert_eq!(event.source_id(), Some(&source));
        }

        event
    }

    /// Creates a component key from a random 16-character string.
    fn source_id() -> Arc<ComponentKey> {
        Arc::new(ComponentKey::from(test_util::random_string(16)))
    }
}