Initial support for routing failed events released
Routing failed events from the remap transform
We’ve released a new feature that enables users to route failed events through separate pipelines.
Previously, when Vector encountered an event that failed in a given transform component, the event was either dropped or was forwarded to the next step of the process.
With this new release, you can configure Vector to send events that fail to
process down a different pipeline, without any manual workaround, so that you
can catch errors, tag, fan out, or filter them. In other words, you can now
handle failed events however you see fit, such as routing them to another sink
for storage, inspection, and replay. We are piloting this feature with the remap
transform but plan to roll it out to other transforms in the future, so keep
your eyes peeled for announcements down the line. Note: unit tests do not
currently support assertions on failed events, but this is also in the works.
For the remap transform, there is now a new .dropped output that can be used
to catch and route events that would have otherwise been dropped. To use this,
you need to set drop_on_error to true and reroute_dropped to true.
The latter lets you opt into this new feature. Once enabled, you can use the
component id of the remap transform suffixed with .dropped as an input to
another component to handle failed events differently.
The example below shows how this works.
Let’s start with this configuration:
[sources.in]
type = "demo_logs"
format = "shuffle"
interval = 1.0
lines = [
'{ "message": "valid message", "foo": "bar"}',
'{ "message": "valid message", "foo": "baz"}',
'invalid message',
]
[transforms.my_remap]
type = "remap"
inputs = ["in"]
drop_on_error = true
drop_on_abort = true
reroute_dropped = true
source = """
. |= object!(parse_json!(.message))
if .foo == "baz" {
  abort
}
.processed = true
"""
[sinks.foo]
type = "console"
inputs = ["my_remap"]
encoding.codec = "json"
[sinks.bar]
type = "console"
inputs = ["my_remap.dropped"] # note the new `.dropped` here!
encoding.codec = "json"
If run, this would emit the following output:
{
"foo": "bar",
"message": "valid message",
"processed": true,
"timestamp": "2021-11-09T16:11:47.330713806Z"
}
{
"foo": "bar",
"message": "valid message",
"processed": true,
"timestamp": "2021-11-09T16:11:48.330756592Z"
}
{
"message": "invalid message",
"metadata": {
"dropped": {
"component_id": "my_remap",
"component_type": "remap",
"component_kind": "transform",
"message": "function call error for \"object\" at (9:39): function call error for \"parse_json\" at (17:38): unable to parse json: expected value at line 1 column 1",
"reason": "error"
}
},
"timestamp": "2021-11-09T16:11:49.330157298Z"
}
{
"message": "{ \"message\": \"valid message\", \"foo\": \"baz\"}",
"metadata": {
"dropped": {
"component_id": "my_remap",
"component_type": "remap",
"component_kind": "transform",
"message": "aborted",
"reason": "abort"
}
},
"timestamp": "2021-11-09T16:11:50.329966720Z"
}
Events that either caused an error or were aborted are written out by the bar
console sink with the relevant metadata added; on the other hand, events that
were processed successfully are output by the foo console sink. More
information is available on the remap docs page. As a side note, the metadata
key (shown above in the "invalid message" event) is configurable via
log_schema.metadata_key.
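Because the failure details travel with the event, a downstream component can act on them. The fragment below is a minimal sketch of the "tag" case, not a recommended setup: it assumes the my_remap transform from the configuration above, the default metadata key, and that the failure details are readable from VRL at the same path they appear under in the JSON output. The names tag_failed and failed_out, and the fields .failed and .failure_reason, are hypothetical and chosen only for illustration.

```toml
# Hypothetical follow-up transform: consumes only the events that my_remap
# rejected (errors and aborts) and tags them for later inspection.
[transforms.tag_failed]
type = "remap"
inputs = ["my_remap.dropped"]
source = """
# Mark the event as failed (illustrative field name).
.failed = true
# Copy the drop reason ("error" or "abort" in the output above) to a
# top-level field. Assumes the default metadata key; adjust the path if
# log_schema.metadata_key has been changed.
.failure_reason = .metadata.dropped.reason
"""

# Hypothetical sink: prints the tagged failures. This would take the place
# of the bar sink from the example above.
[sinks.failed_out]
type = "console"
inputs = ["tag_failed"]
encoding.codec = "json"
```

With this in place, the two failed events shown earlier would be expected to carry the extra fields when printed, while successfully processed events would still flow only to the foo sink.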
We will continue to expand support for routing failed events to other
transforms, like filter. In the meantime, if you have any feedback for us, let
us know on our Discord chat or on Twitter!