vector/transforms/lua/v1/
mod.rs1#![allow(clippy::let_underscore_must_use)]
3
4use std::{future::ready, pin::Pin};
5
6use futures::{Stream, StreamExt, stream};
7use mlua::{ExternalError, FromLua};
8use ordered_float::NotNan;
9use snafu::{ResultExt, Snafu};
10use vector_lib::configurable::configurable_component;
11use vrl::path::parse_target_path;
12
13use crate::{
14 config::{DataType, Input, OutputId, TransformOutput},
15 event::{Event, Value},
16 internal_events::{LuaGcTriggered, LuaScriptError},
17 schema,
18 schema::Definition,
19 transforms::{TaskTransform, Transform},
20};
21
/// Errors that can occur while constructing the [`Lua`] transform.
#[derive(Debug, Snafu)]
enum BuildError {
    /// The Lua runtime reported an error; `source` is the underlying `mlua::Error`.
    #[snafu(display("Lua error: {}", source))]
    InvalidLua { source: mlua::Error },
}
27
/// Configuration for version 1 of the `lua` transform.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct LuaConfig {
    /// The Lua program to execute for each event.
    source: String,

    /// Directories to search when the script loads a Lua module.
    ///
    /// Each directory is added to Lua's `package.path` as `<dir>/?.lua`, ahead of the
    /// paths that are already configured.
    #[serde(default)]
    search_dirs: Vec<String>,
}
42
43impl LuaConfig {
44 pub fn build(&self) -> crate::Result<Transform> {
45 warn!(
46 "DEPRECATED The `lua` transform API version 1 is deprecated. Please convert your script to version 2."
47 );
48 Lua::new(self.source.clone(), self.search_dirs.clone()).map(Transform::event_task)
49 }
50
51 pub fn input(&self) -> Input {
52 Input::log()
53 }
54
55 pub fn outputs(
56 &self,
57 input_definitions: &[(OutputId, schema::Definition)],
58 ) -> Vec<TransformOutput> {
59 let namespaces = input_definitions
61 .iter()
62 .flat_map(|(_output, definition)| definition.log_namespaces().clone())
63 .collect();
64
65 let definition = input_definitions
66 .iter()
67 .map(|(output, _definition)| {
68 (
69 output.clone(),
70 Definition::default_for_namespace(&namespaces),
71 )
72 })
73 .collect();
74
75 vec![TransformOutput::new(DataType::Log, definition)]
76 }
77}
78
/// `Lua::process` triggers an explicit Lua garbage collection each time its invocation
/// counter reaches a multiple of this value.
const GC_INTERVAL: usize = 16;
86
/// Runtime state of the version 1 `lua` transform.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Lua {
    /// Script text; kept so that `Clone` can rebuild the transform from scratch.
    #[derivative(Debug = "ignore")]
    source: String,
    /// Module search directories; kept for the same reason as `source`.
    #[derivative(Debug = "ignore")]
    search_dirs: Vec<String>,
    /// The Lua state the script runs in.
    #[derivative(Debug = "ignore")]
    lua: mlua::Lua,
    /// Registry handle to the function compiled from `source`.
    vector_func: mlua::RegistryKey,
    /// Counter of `process` calls, reset to zero after each explicit garbage collection.
    invocations_after_gc: usize,
}
99
100impl Clone for Lua {
101 fn clone(&self) -> Self {
102 Lua::new(self.source.clone(), self.search_dirs.clone())
103 .expect("Tried to clone existing valid lua transform. This is an invariant.")
104 }
105}
106
/// Wrapper around an [`Event`] that is exposed to Lua scripts as userdata (the global
/// `event`).
#[derive(Clone, FromLua)]
struct LuaEvent {
    /// The wrapped event.
    inner: Event,
}
113
114impl Lua {
115 pub fn new(source: String, search_dirs: Vec<String>) -> crate::Result<Self> {
116 let lua = unsafe {
119 mlua::Lua::unsafe_new_with(mlua::StdLib::ALL_SAFE, mlua::LuaOptions::default())
120 };
121
122 let additional_paths = search_dirs
123 .iter()
124 .map(|d| format!("{d}/?.lua"))
125 .collect::<Vec<_>>()
126 .join(";");
127
128 if !additional_paths.is_empty() {
129 let package = lua
130 .globals()
131 .get::<mlua::Table>("package")
132 .context(InvalidLuaSnafu)?;
133 let current_paths = package
134 .get::<String>("path")
135 .unwrap_or_else(|_| ";".to_string());
136 let paths = format!("{additional_paths};{current_paths}");
137 package.set("path", paths).context(InvalidLuaSnafu)?;
138 }
139
140 let func = lua.load(&source).into_function().context(InvalidLuaSnafu)?;
141 let vector_func = lua.create_registry_value(func).context(InvalidLuaSnafu)?;
142
143 Ok(Self {
144 source,
145 search_dirs,
146 lua,
147 vector_func,
148 invocations_after_gc: 0,
149 })
150 }
151
152 fn process(&mut self, event: Event) -> Result<Option<Event>, mlua::Error> {
153 let source_id = event.source_id().cloned();
154 let lua = &self.lua;
155 let globals = lua.globals();
156
157 globals.raw_set("event", LuaEvent { inner: event })?;
158
159 let func = lua.registry_value::<mlua::Function>(&self.vector_func)?;
160 func.call::<()>(())?;
161
162 let result = globals.raw_get::<Option<LuaEvent>>("event").map(|option| {
163 option.map(|lua_event| {
164 let mut event = lua_event.inner;
165 if let Some(source_id) = source_id {
166 event.set_source_id(source_id);
167 }
168 event
169 })
170 });
171
172 self.invocations_after_gc += 1;
173 if self.invocations_after_gc.is_multiple_of(GC_INTERVAL) {
174 emit!(LuaGcTriggered {
175 used_memory: self.lua.used_memory()
176 });
177 self.lua.gc_collect()?;
178 self.invocations_after_gc = 0;
179 }
180
181 result
182 }
183
184 pub fn transform_one(&mut self, event: Event) -> Option<Event> {
185 match self.process(event) {
186 Ok(event) => event,
187 Err(error) => {
188 emit!(LuaScriptError { error });
189 None
190 }
191 }
192 }
193}
194
195impl TaskTransform<Event> for Lua {
196 fn transform(
197 self: Box<Self>,
198 task: Pin<Box<dyn Stream<Item = Event> + Send>>,
199 ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
200 where
201 Self: 'static,
202 {
203 let mut inner = self;
204 Box::pin(
205 task.filter_map(move |event| {
206 let mut output = Vec::with_capacity(1);
207 ready(match inner.process(event) {
208 Ok(event) => {
209 output.extend(event);
210 Some(stream::iter(output))
211 }
212 Err(error) => {
213 emit!(LuaScriptError { error });
214 None
215 }
216 })
217 })
218 .flatten(),
219 )
220 }
221}
222
223impl mlua::UserData for LuaEvent {
224 fn add_methods<M: mlua::UserDataMethods<Self>>(methods: &mut M) {
225 methods.add_meta_method_mut(
226 mlua::MetaMethod::NewIndex,
227 |_lua, this, (key, value): (String, Option<mlua::Value>)| {
228 let key_path = parse_target_path(key.as_str()).map_err(|e| e.into_lua_err())?;
229 match value {
230 Some(mlua::Value::String(string)) => {
231 this.inner.as_mut_log().insert(
232 &key_path,
233 Value::from(string.to_str().expect("Expected UTF-8.").to_owned()),
234 );
235 }
236 Some(mlua::Value::Integer(integer)) => {
237 this.inner
238 .as_mut_log()
239 .insert(&key_path, Value::Integer(integer));
240 }
241 Some(mlua::Value::Number(number)) if !number.is_nan() => {
242 this.inner
243 .as_mut_log()
244 .insert(&key_path, Value::Float(NotNan::new(number).unwrap()));
245 }
246 Some(mlua::Value::Boolean(boolean)) => {
247 this.inner
248 .as_mut_log()
249 .insert(&key_path, Value::Boolean(boolean));
250 }
251 Some(mlua::Value::Nil) | None => {
252 this.inner.as_mut_log().remove(&key_path);
253 }
254 _ => {
255 info!(
256 message =
257 "Could not set field to Lua value of invalid type, dropping field.",
258 field = key.as_str()
259 );
260 this.inner.as_mut_log().remove(&key_path);
261 }
262 }
263
264 Ok(())
265 },
266 );
267
268 methods.add_meta_method(mlua::MetaMethod::Index, |lua, this, key: String| {
269 if let Some(value) = this
270 .inner
271 .as_log()
272 .parse_path_and_get_value(key.as_str())
273 .ok()
274 .flatten()
275 {
276 let string = lua.create_string(value.coerce_to_bytes())?;
277 Ok(Some(string))
278 } else {
279 Ok(None)
280 }
281 });
282
283 methods.add_meta_function(mlua::MetaMethod::Pairs, |lua, event: LuaEvent| {
284 let state = lua.create_table()?;
285 {
286 if let Some(keys) = event.inner.as_log().keys() {
287 let keys = lua.create_table_from(keys.map(|k| (k, true)))?;
288 state.raw_set("keys", keys)?;
289 }
290 state.raw_set("event", event)?;
291 }
292 let function =
293 lua.create_function(|lua, (state, prev): (mlua::Table, Option<String>)| {
294 let event: LuaEvent = state.raw_get("event")?;
295 let keys: mlua::Table = state.raw_get("keys")?;
296 let next: mlua::Function = lua.globals().raw_get("next")?;
297 let key: Option<String> = next.call((keys, prev))?;
298 let value = key.clone().and_then(|k| {
299 event
300 .inner
301 .as_log()
302 .parse_path_and_get_value(k.as_str())
303 .ok()
304 .flatten()
305 });
306 match value {
307 Some(value) => Ok((key, Some(lua.create_string(value.coerce_to_bytes())?))),
308 None => Ok((None, None)),
309 }
310 })?;
311 Ok((function, state))
312 });
313 }
314}
315
316pub fn format_error(error: &mlua::Error) -> String {
317 match error {
318 mlua::Error::CallbackError { traceback, cause } => format_error(cause) + "\n" + traceback,
319 err => err.to_string(),
320 }
321}
322
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::{
        config::ComponentKey,
        event::{Event, LogEvent, Value},
        test_util,
    };

    /// Creates a random component key used to tag test events.
    fn source_id() -> Arc<ComponentKey> {
        let name = test_util::random_string(16);
        Arc::new(ComponentKey::from(name))
    }

    /// Runs the script `transform` over `event` and checks that a surviving event still
    /// carries the source id it was given.
    fn transform_one(transform: &str, event: impl Into<Event>) -> Option<Event> {
        test_util::trace_init();

        let expected_source = source_id();
        let mut input: Event = event.into();
        input.set_source_id(Arc::clone(&expected_source));

        let mut lua = Lua::new(transform.to_string(), vec![]).unwrap();
        let output = lua.transform_one(input);

        if let Some(emitted) = output.as_ref() {
            assert_eq!(emitted.source_id(), Some(&expected_source));
        }

        output
    }

    /// Runs `script` over an empty log event, expects it to fail, and returns the
    /// formatted error.
    fn script_error(script: &str) -> String {
        test_util::trace_init();

        let mut lua = Lua::new(script.to_string(), vec![]).unwrap();
        let error = lua.process(LogEvent::default().into()).unwrap_err();
        format_error(&error)
    }

    #[test]
    fn lua_add_field() {
        let script = r#"
            event["hello"] = "goodbye"
        "#;
        let output = transform_one(script, LogEvent::from("program me")).unwrap();

        assert_eq!(output.as_log()["hello"], "goodbye".into());
    }

    #[test]
    fn lua_read_field() {
        let script = r#"
            _, _, name = string.find(event["message"], "Hello, my name is (%a+).")
            event["name"] = name
        "#;
        let output = transform_one(script, LogEvent::from("Hello, my name is Bob.")).unwrap();

        assert_eq!(output.as_log()["name"], "Bob".into());
    }

    #[test]
    fn lua_remove_field() {
        let mut input = LogEvent::default();
        input.insert("name", "Bob");

        let script = r#"
            event["name"] = nil
        "#;
        let output = transform_one(script, input).unwrap();

        assert!(output.as_log().get("name").is_none());
    }

    #[test]
    fn lua_drop_event() {
        let mut input = LogEvent::default();
        input.insert("name", "Bob");

        let script = r"
            event = nil
        ";
        let output = transform_one(script, input);

        assert!(output.is_none());
    }

    #[test]
    fn lua_read_empty_field() {
        let script = r#"
            if event["non-existent"] == nil then
                event["result"] = "empty"
            else
                event["result"] = "found"
            end
        "#;
        let output = transform_one(script, LogEvent::default()).unwrap();

        assert_eq!(output.as_log()["result"], "empty".into());
    }

    #[test]
    fn lua_integer_value() {
        let script = r#"
            event["number"] = 3
        "#;
        let output = transform_one(script, LogEvent::default()).unwrap();

        assert_eq!(output.as_log()["number"], Value::Integer(3));
    }

    #[test]
    fn lua_numeric_value() {
        let script = r#"
            event["number"] = 3.14159
        "#;
        let output = transform_one(script, LogEvent::default()).unwrap();

        assert_eq!(output.as_log()["number"], Value::from(3.14159));
    }

    #[test]
    fn lua_boolean_value() {
        let script = r#"
            event["bool"] = true
        "#;
        let output = transform_one(script, LogEvent::default()).unwrap();

        assert_eq!(output.as_log()["bool"], Value::Boolean(true));
    }

    #[test]
    fn lua_non_coercible_value() {
        let script = r#"
            event["junk"] = {"asdf"}
        "#;
        let output = transform_one(script, LogEvent::default()).unwrap();

        assert_eq!(output.as_log().get("junk"), None);
    }

    #[test]
    fn lua_non_string_key_write() {
        let err = script_error(
            r#"
            event[false] = "hello"
            "#,
        );

        assert!(
            err.contains("error converting Lua boolean to String"),
            "{}",
            err
        );
    }

    #[test]
    fn lua_non_string_key_read() {
        let err = script_error(
            r"
            print(event[false])
            ",
        );

        assert!(
            err.contains("error converting Lua boolean to String"),
            "{}",
            err
        );
    }

    #[test]
    fn lua_script_error() {
        let err = script_error(
            r#"
            error("this is an error")
            "#,
        );

        assert!(err.contains("this is an error"), "{}", err);
    }

    #[test]
    fn lua_syntax_error() {
        test_util::trace_init();

        let script = r"
            1234 = sadf <>&*!#@
        ";
        let err = Lua::new(script.to_string(), vec![])
            .map(drop)
            .unwrap_err()
            .to_string();

        assert!(err.contains("syntax error:"), "{}", err);
    }

    #[test]
    fn lua_load_file() {
        use std::{fs::File, io::Write};
        test_util::trace_init();

        let dir = tempfile::tempdir().unwrap();

        // Module that the main script pulls in through `require`.
        let module_path = dir.path().join("script2.lua");
        let mut module_file = File::create(module_path).unwrap();
        write!(
            &mut module_file,
            r#"
            local M = {{}}

            local function modify(event2)
              event2["\"new field\""] = "new value"
            end
            M.modify = modify

            return M
            "#
        )
        .unwrap();

        let source = r#"
            local script2 = require("script2")
            script2.modify(event)
        "#
        .to_string();

        let search_dirs = vec![dir.path().to_string_lossy().into_owned()];
        let mut lua = Lua::new(source, search_dirs).unwrap();
        let output = lua.transform_one(LogEvent::default().into()).unwrap();

        assert_eq!(output.as_log()["\"new field\""], "new value".into());
    }

    #[test]
    fn lua_pairs() {
        let mut input = LogEvent::default();
        input.insert("name", "Bob");
        input.insert("friend", "Alice");

        let script = r"
            for k,v in pairs(event) do
              event[k] = k .. v
            end
        ";
        let output = transform_one(script, input).unwrap();

        assert_eq!(output.as_log()["name"], "nameBob".into());
        assert_eq!(output.as_log()["friend"], "friendAlice".into());
    }
}