Parsing CSV logs with Lua
Parse structured application logs in CSV format using Lua transform
- You understand basic Lua concepts.
- You understand basic Vector concepts and know how to set up a pipeline.
Vector has many built-in parsers for structured log formats. However, when you need to ship logs in a custom or application-specific format, programmable transforms have got you covered.
This guide walks through reading CSV logs using the file source and parsing them using the lua transform with a loadable Lua module.
Getting Started
To be concrete, it is assumed in the following that the logs to be read are produced by csvlog in PostgreSQL. For example, there might be the following log file:
2020-04-09 12:48:49.661 UTC,,,1,,localhost.1,1,,2020-04-09 12:48:49 UTC,,0,LOG,00000,"ending log output to stderr",,"Future log output will go to log destination ""csvlog"".",,,,,,,""
2020-04-09 12:48:49.669 UTC,,,27,,localhost.1b,1,,2020-04-09 12:48:49 UTC,,0,LOG,00000,"database system was shut down at 2020-04-09 12:48:25 UTC",,,,,,,,,""
2020-04-09 12:48:49.683 UTC,,,1,,localhost.1,2,,2020-04-09 12:48:49 UTC,,0,LOG,00000,"database system is ready to accept connections",,,,,,,,,""
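Files like this are produced when the csvlog destination is enabled in postgresql.conf. A rough sketch (the exact settings depend on the installation):

# postgresql.conf (excerpt) -- enables CSV-formatted log files
logging_collector = on        # run the background process that writes log files
log_destination = 'csvlog'    # write log entries in CSV format (files get a .csv suffix)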
Let us draft an initial version of the Vector configuration file:
data_dir = "."
[sources.file]
type = "file"
include = ["*.csv"]
start_at_beginning = true
[transforms.lua]
inputs = ["file"]
type = "lua"
version = "2"
hooks.process = """
function (event, emit)
  -- to be expanded
  emit(event)
end
"""
[sinks.console]
inputs = ["lua"]
type = "console"
encoding.codec = "json"
This config sets up a pipeline that reads log files, pipes them through the parsing transform (which currently is configured to just pass the events through), and displays the produced log events using the console sink.
At this point, running vector --config vector.toml results in the following output:
{"file":"log.csv","host":"localhost","message":"2020-04-09 12:48:49.661 UTC,,,1,,localhost.1,1,,2020-04-09 12:48:49 UTC,,0,LOG,00000,\"ending log output to stderr\",,\"Future log output will go to log destination \"\"csvlog\"\".\",,,,,,,\"\"","timestamp":"2020-04-09T14:33:28Z"}
{"file":"log.csv","host":"localhost","message":"2020-04-09 12:48:49.669 UTC,,,27,,localhost.1b,1,,2020-04-09 12:48:49 UTC,,0,LOG,00000,\"database system was shut down at 2020-04-09 12:48:25 UTC\",,,,,,,,,\"\"","timestamp":"2020-04-09T14:33:28Z"}
{"file":"log.csv","host":"localhost","message":"2020-04-09 12:48:49.683 UTC,,,1,,localhost.1,2,,2020-04-09 12:48:49 UTC,,0,LOG,00000,\"database system is ready to accept connections\",,,,,,,,,\"\"","timestamp":"2020-04-09T14:33:28Z"}
Adding the CSV Module
In order to perform actual parsing, it is possible to leverage lua-csv. Because it consists of a single file, it can simply be downloaded to the same directory where vector.toml is stored:
curl -o csv.lua https://raw.githubusercontent.com/geoffleyland/lua-csv/d20cd42d61dc52e7f6bcb13b596ac7a7d4282fbf/lua/csv.lua
It can then be loaded by calling the require Lua function in the source configuration section:
source = """
csv = require("csv")
"""
With this source, the csv module is loaded when Vector starts up (or when the lua transform is added later and the config is automatically reloaded) and can be used through the global variable csv.
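The require call above finds csv.lua when Vector is started from the directory containing it. If the module is kept elsewhere, the lua transform's search_dirs option can add directories to the module search path. A possible sketch, assuming the module is stored in /etc/vector/lua:

[transforms.lua]
inputs = ["file"]
type = "lua"
version = "2"
# directories added to the Lua module search path; /etc/vector/lua is only an
# example location, point it at wherever csv.lua actually lives
search_dirs = ["/etc/vector/lua"]
source = """
csv = require("csv")
"""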
Implementing Custom Parsing
With the csv module available, hooks.process can be changed to the following:
hooks.process = """
function (event, emit)
  fields = csv.openstring(event.log.message):lines()() -- parse the `message` field
  event.log.message = nil -- drop the `message` field
  column_names = { -- a sequence containing CSV column names
    -- ...
  }
  for column, value in ipairs(fields) do -- iterate over CSV columns
    column_name = column_names[column] -- get column name
    event.log[column_name] = value -- set the corresponding field in the event
  end
  emit(event) -- emit the transformed event
end
"""
Note that column_names can instead be created just once, in the source section, to speed up processing. Putting it there and using the column names from the PostgreSQL documentation results in the following definition of the whole transform:
# ...
[transforms.lua]
inputs = ["file"]
type = "lua"
version = "2"
source = """
csv = require("csv") -- load external module for parsing CSV
column_names = { -- a sequence containing CSV column names
  "log_time",
  "user_name",
  "database_name",
  "process_id",
  "connection_from",
  "session_id",
  "session_line_num",
  "command_tag",
  "session_start_time",
  "virtual_transaction_id",
  "transaction_id",
  "error_severity",
  "sql_state_code",
  "message",
  "detail",
  "hint",
  "internal_query",
  "internal_query_pos",
  "context",
  "query",
  "query_pos",
  "location",
  "application_name",
  -- available only in postgres > 13; remove for postgres <= 13
  "backend_type",
  "leader_pid",
  "query_id"
}
"""
hooks.process = """
function (event, emit)
  fields = csv.openstring(event.log.message):lines()() -- parse the `message` field
  event.log.message = nil -- drop the `message` field
  for column, column_name in ipairs(column_names) do -- iterate over column names
    value = fields[column] -- get field value
    event.log[column_name] = value -- set the corresponding field in the event
  end
  emit(event) -- emit the transformed event
end
"""
# ...
Running vector --config vector.toml with the same input file now results in structured events being output:
{"application_name":"","backend_type":"not initialized","command_tag":"","connection_from":"","context":"","database_name":"","detail":"","error_severity":"LOG","file":"log.csv","hint":"Future log output will go to log destination \"csvlog\".","host":"localhost","internal_query":"","internal_query_pos":"","leader_pid":"","location":"","log_time":"2020-04-09 12:48:49.661 UTC","message":"ending log output to stderr","process_id":"1","query":"","query_id":"0","query_pos":"","session_id":"localhost.1","session_line_num":"1","session_start_time":"2020-04-09 12:48:49 UTC","sql_state_code":"00000","timestamp":"2020-04-09T19:49:07Z","transaction_id":"0","user_name":"","virtual_transaction_id":""}
{"application_name":"","backend_type":"not initialized","command_tag":"","connection_from":"","context":"","database_name":"","detail":"","error_severity":"LOG","file":"log.csv","hint":"","host":"localhost","internal_query":"","internal_query_pos":"","leader_pid":"","location":"","log_time":"2020-04-09 12:48:49.669 UTC","message":"database system was shut down at 2020-04-09 12:48:25 UTC","process_id":"27","query":"","query_id":"0","query_pos":"","session_id":"localhost.1b","session_line_num":"1","session_start_time":"2020-04-09 12:48:49 UTC","sql_state_code":"00000","timestamp":"2020-04-09T19:49:07Z","transaction_id":"0","user_name":"","virtual_transaction_id":""}
{"application_name":"","backend_type":"not initialized","command_tag":"","connection_from":"","context":"","database_name":"","detail":"","error_severity":"LOG","file":"log.csv","hint":"","host":"localhost","internal_query":"","internal_query_pos":"","leader_pid":"","location":"","log_time":"2020-04-09 12:48:49.683 UTC","message":"database system is ready to accept connections","process_id":"1","query":"","query_id":"0","query_pos":"","session_id":"localhost.1","session_line_num":"2","session_start_time":"2020-04-09 12:48:49 UTC","sql_state_code":"00000","timestamp":"2020-04-09T19:49:07Z","transaction_id":"0","user_name":"","virtual_transaction_id":""}
Or, applying pretty formatting to one of the output events:
{
"application_name": "",
"backend_type":"not initialized",
"command_tag": "",
"connection_from": "",
"context": "",
"database_name": "",
"detail": "",
"error_severity": "LOG",
"file": "log.csv",
"hint": "Future log output will go to log destination \"csvlog\".",
"host": "localhost",
"internal_query": "",
"internal_query_pos": "",
"leader_pid":"",
"location": "",
"log_time": "2020-04-09 12:48:49.661 UTC",
"message": "ending log output to stderr",
"process_id": "1",
"query": "",
"query_id":"0",
"query_pos": "",
"session_id": "localhost.1",
"session_line_num": "1",
"session_start_time": "2020-04-09 12:48:49 UTC",
"sql_state_code": "00000",
"timestamp": "2020-04-09T19:49:07Z",
"transaction_id": "0",
"user_name": "",
"virtual_transaction_id": ""
}
Further Improvements
With the task of parsing the CSV logs accomplished, the following improvements could be made.
Support for Multi-line Strings
CSV supports line breaks inside quoted strings. However, by default the file source creates a separate event from each line. There are two options to deal with this:
- For simple cases it might be possible to use the multiline configuration option in the file source, as shown in the sketch after this list.
- For more complex cases the messages from multiple events can be conditionally concatenated in the Lua code. See the aggregations guide for more details on this.
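For the first option, the key observation is that every csvlog record starts with a timestamp, so a line that does not is a continuation of the previous record. A possible sketch of the multiline settings for the file source (the pattern and timeout are assumptions to adjust for the actual log format):

[sources.file]
type = "file"
include = ["*.csv"]
start_at_beginning = true
# a record starts with a timestamp; aggregation halts right before the next
# line that also starts with a timestamp, which then begins a new record
multiline.start_pattern = '^\d{4}-\d{2}-\d{2} '
multiline.condition_pattern = '^\d{4}-\d{2}-\d{2} '
multiline.mode = "halt_before"
multiline.timeout_ms = 1000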
Change Field Types
By default, all columns are parsed as strings. It is possible to convert them to other data types right in the Lua code using built-in functions, such as tonumber. Alternatively, it is possible to add the coercer transform after the lua transform, for example, to [parse timestamps][docs.transforms.coercer#timestamps].
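For instance, converting a few numeric columns directly in the hook could look like the following sketch (which columns to convert is an assumption; note that tonumber applied to an empty string returns nil, so empty values would be dropped from the event rather than kept as empty strings):

hooks.process = """
function (event, emit)
  fields = csv.openstring(event.log.message):lines()() -- parse the `message` field
  event.log.message = nil -- drop the `message` field
  for column, column_name in ipairs(column_names) do -- iterate over column names
    event.log[column_name] = fields[column] -- set the corresponding field in the event
  end

  -- convert selected columns to numbers in place; the selection is an example
  for _, name in ipairs({"process_id", "session_line_num", "transaction_id"}) do
    event.log[name] = tonumber(event.log[name])
  end

  emit(event) -- emit the transformed event
end
"""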