Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug(lua transform): Emit events with the source_id set #17870

Merged
merged 5 commits into from
Jul 6, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 1 addition & 8 deletions build.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,4 @@
use std::{
collections::HashSet,
env,
fs::File,
io::Write,
path::{Path, PathBuf},
process::Command,
};
use std::{collections::HashSet, env, fs::File, io::Write, path::Path, process::Command};

struct TrackedEnv {
tracked: HashSet<String>,
Expand Down
12 changes: 11 additions & 1 deletion src/transforms/lua/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,25 @@
pub mod v1;
pub mod v2;

use std::sync::{Arc, OnceLock};

use vector_config::configurable_component;
use vector_core::config::LogNamespace;

use crate::{
config::{GenerateConfig, Input, OutputId, TransformConfig, TransformContext, TransformOutput},
config::{
ComponentKey, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
TransformOutput,
},
schema,
transforms::Transform,
};

pub(self) fn global_source_id() -> Arc<ComponentKey> {
static GLOBAL: OnceLock<Arc<ComponentKey>> = OnceLock::new();
Arc::clone(GLOBAL.get_or_init(|| Arc::new(ComponentKey::from("lua"))))
}

/// Marker type for the version one of the configuration for the `lua` transform.
#[configurable_component]
#[derive(Clone, Debug)]
Expand Down
11 changes: 10 additions & 1 deletion src/transforms/lua/v1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ impl Lua {
}

fn process(&mut self, event: Event) -> Result<Option<Event>, mlua::Error> {
let source_id = event.source_id().cloned();
let lua = &self.lua;
let globals = lua.globals();

Expand All @@ -156,7 +157,15 @@ impl Lua {

let result = globals
.raw_get::<_, Option<LuaEvent>>("event")
.map(|option| option.map(|lua_event| lua_event.inner));
.map(|option| {
option.map(|lua_event| {
let mut event = lua_event.inner;
if let Some(source_id) = source_id {
event.set_source_id(source_id);
}
event
})
});

self.invocations_after_gc += 1;
if self.invocations_after_gc % GC_INTERVAL == 0 {
Expand Down
24 changes: 16 additions & 8 deletions src/transforms/lua/v2/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{path::PathBuf, time::Duration};
use std::{path::PathBuf, sync::Arc, time::Duration};

use codecs::MetricTagValues;
use serde_with::serde_as;
Expand All @@ -7,7 +7,8 @@ use vector_config::configurable_component;
pub use vector_core::event::lua;
use vector_core::transform::runtime_transform::{RuntimeTransform, Timer};

use crate::config::OutputId;
use super::global_source_id;
use crate::config::{ComponentKey, OutputId};
use crate::event::lua::event::LuaEvent;
use crate::schema::Definition;
use crate::{
Expand Down Expand Up @@ -306,9 +307,13 @@ impl Lua {

#[cfg(test)]
fn process(&mut self, event: Event, output: &mut Vec<Event>) -> Result<(), mlua::Error> {
let source_id = event.source_id().cloned();
let lua = &self.lua;
let result = lua.scope(|scope| {
let emit = scope.create_function_mut(|_, event: Event| {
let emit = scope.create_function_mut(|_, mut event: Event| {
if let Some(source_id) = &source_id {
event.set_source_id(Arc::clone(source_id));
}
output.push(event);
Ok(())
})?;
Expand Down Expand Up @@ -355,11 +360,13 @@ impl Lua {
fn wrap_emit_fn<'lua, 'scope, F: 'scope>(
scope: &mlua::Scope<'lua, 'scope>,
mut emit_fn: F,
source_id: Arc<ComponentKey>,
) -> mlua::Result<mlua::Function<'lua>>
where
F: FnMut(Event),
{
scope.create_function_mut(move |_, event: Event| -> mlua::Result<()> {
scope.create_function_mut(move |_, mut event: Event| -> mlua::Result<()> {
event.set_source_id(Arc::clone(&source_id));
emit_fn(event);
Ok(())
})
Expand All @@ -371,6 +378,7 @@ impl RuntimeTransform for Lua {
F: FnMut(Event),
{
let lua = &self.lua;
let source_id = event.source_id().map_or_else(global_source_id, Arc::clone);
_ = lua
.scope(|scope| -> mlua::Result<()> {
lua.registry_value::<mlua::Function>(&self.hook_process)?
Expand All @@ -379,7 +387,7 @@ impl RuntimeTransform for Lua {
event,
metric_multi_value_tags: self.multi_value_tags,
},
wrap_emit_fn(scope, emit_fn)?,
wrap_emit_fn(scope, emit_fn, source_id)?,
))
})
.context(RuntimeErrorHooksProcessSnafu)
Expand All @@ -398,7 +406,7 @@ impl RuntimeTransform for Lua {
match &self.hook_init {
Some(key) => lua
.registry_value::<mlua::Function>(key)?
.call(wrap_emit_fn(scope, emit_fn)?),
.call(wrap_emit_fn(scope, emit_fn, global_source_id())?),
None => Ok(()),
}
})
Expand All @@ -418,7 +426,7 @@ impl RuntimeTransform for Lua {
match &self.hook_shutdown {
Some(key) => lua
.registry_value::<mlua::Function>(key)?
.call(wrap_emit_fn(scope, emit_fn)?),
.call(wrap_emit_fn(scope, emit_fn, global_source_id())?),
None => Ok(()),
}
})
Expand All @@ -437,7 +445,7 @@ impl RuntimeTransform for Lua {
.scope(|scope| -> mlua::Result<()> {
let handler_key = &self.timers[timer.id as usize].1;
lua.registry_value::<mlua::Function>(handler_key)?
.call(wrap_emit_fn(scope, emit_fn)?)
.call(wrap_emit_fn(scope, emit_fn, global_source_id())?)
})
.context(RuntimeErrorTimerHandlerSnafu)
.map_err(|error| error!(%error, rate_limit = 30));
Expand Down