Kafka

Publish observability data to Apache Kafka topics

Status: stable | Delivery: at-least-once | Acknowledgements: yes | Egress: dynamic | State: stateless

Configuration

Example configurations

JSON:

{
  "sinks": {
    "my_sink_id": {
      "type": "kafka",
      "inputs": [
        "my-source-or-transform-id"
      ],
      "bootstrap_servers": "10.14.22.123:9092,10.14.23.332:9092",
      "topic": "topic-1234"
    }
  }
}

TOML:

[sinks.my_sink_id]
type = "kafka"
inputs = [ "my-source-or-transform-id" ]
bootstrap_servers = "10.14.22.123:9092,10.14.23.332:9092"
topic = "topic-1234"

YAML:

sinks:
  my_sink_id:
    type: kafka
    inputs:
      - my-source-or-transform-id
    bootstrap_servers: 10.14.22.123:9092,10.14.23.332:9092
    topic: topic-1234
Advanced (JSON):

{
  "sinks": {
    "my_sink_id": {
      "type": "kafka",
      "inputs": [
        "my-source-or-transform-id"
      ],
      "bootstrap_servers": "10.14.22.123:9092,10.14.23.332:9092",
      "compression": "none",
      "headers_key": "headers",
      "key_field": "user_id",
      "librdkafka_options": {
        "client.id": "${ENV_VAR}",
        "fetch.error.backoff.ms": "1000",
        "socket.send.buffer.bytes": "100"
      },
      "message_timeout_ms": 300000,
      "socket_timeout_ms": 60000,
      "topic": "topic-1234"
    }
  }
}

Advanced (TOML):

[sinks.my_sink_id]
type = "kafka"
inputs = [ "my-source-or-transform-id" ]
bootstrap_servers = "10.14.22.123:9092,10.14.23.332:9092"
compression = "none"
headers_key = "headers"
key_field = "user_id"
message_timeout_ms = 300_000
socket_timeout_ms = 60_000
topic = "topic-1234"

  [sinks.my_sink_id.librdkafka_options]
  "client.id" = "${ENV_VAR}"
  "fetch.error.backoff.ms" = "1000"
  "socket.send.buffer.bytes" = "100"

Advanced (YAML):

sinks:
  my_sink_id:
    type: kafka
    inputs:
      - my-source-or-transform-id
    bootstrap_servers: 10.14.22.123:9092,10.14.23.332:9092
    compression: none
    headers_key: headers
    key_field: user_id
    librdkafka_options:
      client.id: ${ENV_VAR}
      fetch.error.backoff.ms: "1000"
      socket.send.buffer.bytes: "100"
    message_timeout_ms: 300000
    socket_timeout_ms: 60000
    topic: topic-1234

acknowledgements

optional object

Controls how acknowledgements are handled for this sink.

See End-to-end Acknowledgements for more information on how event acknowledgement is handled.

acknowledgements.enabled

optional bool

Whether or not end-to-end acknowledgements are enabled.

When enabled for a sink, any source connected to that sink, where the source supports end-to-end acknowledgements as well, waits for events to be acknowledged by all connected sinks before acknowledging them at the source.

Enabling or disabling acknowledgements at the sink level takes precedence over any global acknowledgements configuration.
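
For example, a minimal sketch enabling end-to-end acknowledgements on this sink (the IDs and addresses are illustrative):

sinks:
  my_sink_id:
    type: kafka
    inputs:
      - my-source-or-transform-id
    bootstrap_servers: 10.14.22.123:9092
    topic: topic-1234
    acknowledgements:
      enabled: true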

batch

optional object
Event batching behavior.

batch.max_bytes

optional uint

The maximum size of a batch that is processed by a sink.

This is based on the uncompressed size of the batched events, before they are serialized/compressed.

batch.max_events

optional uint
The maximum size of a batch before it is flushed.

batch.timeout_secs

optional float
The maximum age of a batch before it is flushed.
default: 1 (seconds)
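
A sketch combining the batch options (the values are illustrative, not recommendations):

sinks:
  my_sink_id:
    type: kafka
    # ... other required options ...
    batch:
      max_bytes: 1048576  # flush when the uncompressed batch reaches ~1 MiB
      max_events: 1000    # or when 1000 events have accumulated
      timeout_secs: 1     # or when the batch is 1 second old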

bootstrap_servers

required string literal

A comma-separated list of Kafka bootstrap servers.

These are the servers in a Kafka cluster that a client should use to bootstrap its connection to the cluster, allowing discovery of all the other hosts in the cluster.

Must be in the form of host:port, and comma-separated.

Examples
"10.14.22.123:9092,10.14.23.332:9092"

buffer

optional object

Configures the buffering behavior for this sink.

More information about the individual buffer types, and buffer behavior, can be found in the Buffering Model section.

buffer.max_events

optional uint
The maximum number of events allowed in the buffer.
Relevant when: type = "memory"
default: 500

buffer.max_size

required uint

The maximum size of the buffer on disk.

Must be at least ~256 megabytes (268435488 bytes).

Relevant when: type = "disk"

buffer.type

optional string literal enum
The type of buffer to use.
Enum options
disk

Events are buffered on disk.

This is less performant, but more durable. Data that has been synchronized to disk will not be lost if Vector is restarted forcefully or crashes.

Data is synchronized to disk every 500ms.

memory

Events are buffered in memory.

This is more performant, but less durable. Data will be lost if Vector is restarted forcefully or crashes.

default: memory

buffer.when_full

optional string literal enum
Event handling behavior when a buffer is full.
Enum options
block

Wait for free space in the buffer.

This applies backpressure up the topology, signalling that sources should slow down the acceptance/consumption of events. This means that while no data is lost, data will pile up at the edge.

drop_newest

Drops the event instead of waiting for free space in buffer.

The event will be intentionally dropped. This mode is typically used when performance is the highest priority, and it is preferable to temporarily lose events rather than cause a slowdown in the acceptance/consumption of events.

default: block
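
A sketch of a durable disk buffer that applies backpressure when full (the size shown is the documented minimum):

sinks:
  my_sink_id:
    type: kafka
    # ... other required options ...
    buffer:
      type: disk
      max_size: 268435488  # ~256 megabytes, the documented minimum
      when_full: block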

compression

optional string literal enum
Supported compression types for Kafka.
Enum options
gzip: Gzip.
lz4: LZ4.
none: No compression.
snappy: Snappy.
zstd: Zstandard.
default: none

encoding

required object
Configures how events are encoded into raw bytes.

encoding.avro

required object
Apache Avro-specific encoder options.
Relevant when: codec = "avro"
encoding.avro.schema
required string literal
The Avro schema.
Examples
"{ \"type\": \"record\", \"name\": \"log\", \"fields\": [{ \"name\": \"message\", \"type\": \"string\" }] }"

encoding.cef

required object
The CEF Serializer Options.
Relevant when: codec = "cef"
encoding.cef.device_event_class_id
required string literal
Unique identifier for each event type. Identifies the type of event reported. The value length must be less than or equal to 1023.
encoding.cef.device_product
required string literal
Identifies the product of a vendor. It forms part of a unique device identifier. No two products can use the same combination of device vendor and device product. The value length must be less than or equal to 63.
encoding.cef.device_vendor
required string literal
Identifies the vendor of the product. It forms part of a unique device identifier. No two products can use the same combination of device vendor and device product. The value length must be less than or equal to 63.
encoding.cef.device_version
required string literal
Identifies the version of the product. In combination with device product and vendor, it composes the unique ID of the device that sends messages. The value length must be less than or equal to 31.
encoding.cef.extensions

optional object

The collection of key-value pairs. Keys are the keys of the extensions, and values are paths that point to the extension values of a log event. The event can have any number of key-value pairs in any order.
encoding.cef.extensions.*
required string literal
This is a path that points to the extension value of a log event.
encoding.cef.name
required string literal
A path that points to the human-readable description of a log event. The value length must be less than or equal to 512. Defaults to cef.name.
encoding.cef.severity
required string literal

A path that points to the field of a log event that reflects the importance of the event.

It must point to a number from 0 to 10, where 0 is the lowest and 10 is the highest. Defaults to cef.severity.

encoding.cef.version
required string literal enum
The CEF version. Can be either 0 or 1. Defaults to V0.
Enum options
V0: CEF specification version 0.1.
V1: CEF specification version 1.x.
Examples
"V0"
"V1"

encoding.codec

required string literal enum
The codec to use for encoding events.
Enum options
avro: Encodes an event as an Apache Avro message.
cef: Encodes an event as a CEF (Common Event Format) formatted message.
csv

Encodes an event as a CSV message.

This codec must be configured with fields to encode.

gelf

Encodes an event as a GELF message.

This codec is experimental for the following reason:

The GELF specification is more strict than the actual Graylog receiver. Vector’s encoder currently adheres more strictly to the GELF spec, with the exception that some characters such as @ are allowed in field names.

Other GELF codecs, such as Loki's, use a Go SDK that is maintained by Graylog and is much more relaxed than the GELF spec.

Going forward, Vector will use that Go SDK as the reference implementation, which means the codec may continue to relax its enforcement of the specification.

json: Encodes an event as JSON.
logfmt: Encodes an event as a logfmt message.
native

Encodes an event in the native Protocol Buffers format.

This codec is experimental.

native_json

Encodes an event in the native JSON format.

This codec is experimental.

protobuf: Encodes an event as a Protobuf message.
raw_message

No encoding.

This encoding uses the message field of a log event.

Be careful if you are modifying your log events (for example, by using a remap transform) and removing the message field while doing additional parsing on it, as this could lead to the encoding emitting empty strings for the given event.

text

Plain text encoding.

This encoding uses the message field of a log event. For metrics, it uses an encoding that resembles the Prometheus export format.

Be careful if you are modifying your log events (for example, by using a remap transform) and removing the message field while doing additional parsing on it, as this could lead to the encoding emitting empty strings for the given event.

Examples
"avro"
"cef"
"csv"
"gelf"
"json"
"logfmt"
"native"
"native_json"
"protobuf"
"raw_message"
"text"

encoding.csv

required object
The CSV Serializer Options.
Relevant when: codec = "csv"
encoding.csv.capacity

optional uint

Set the capacity (in bytes) of the internal buffer used in the CSV writer. This defaults to a reasonable setting.

default: 8192
encoding.csv.delimiter
optional ascii_char
The field delimiter to use when writing CSV.
default: ,

encoding.csv.double_quote

optional bool

Enable double quote escapes.

This is enabled by default, but it may be disabled. When disabled, quotes in field data are escaped instead of doubled.

default: true
encoding.csv.escape
optional ascii_char

The escape character to use when writing CSV.

In some variants of CSV, quotes are escaped using a special escape character like \ (instead of escaping quotes by doubling them).

To use this, double_quote must be disabled; otherwise, this option is ignored.

default: "
encoding.csv.fields
required [string]

Configures the fields that will be encoded, as well as the order in which they appear in the output.

If a field is not present in the event, the output will be an empty string.

Values of type Array, Object, and Regex are not supported and the output will be an empty string.

encoding.csv.quote
optional ascii_char
The quote character to use when writing CSV.
default: "
encoding.csv.quote_style
optional string literal enum
The quoting style to use when writing CSV data.
Enum options
always: Always puts quotes around every field.
necessary: Puts quotes around fields only when necessary. They are necessary when fields contain a quote, delimiter, or record terminator. Quotes are also necessary when writing an empty record (which is indistinguishable from a record with one empty field).
never: Never writes quotes, even if it produces invalid CSV data.
non_numeric: Puts quotes around all fields that are non-numeric. Namely, when writing a field that does not parse as a valid float or integer, quotes are used even if they aren't strictly necessary.
default: necessary
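
A sketch of a CSV encoding configuration (the field list is illustrative):

sinks:
  my_sink_id:
    type: kafka
    # ... other required options ...
    encoding:
      codec: csv
      csv:
        fields:  # encoded in this order; missing fields become empty strings
          - timestamp
          - host
          - message
        delimiter: ","
        quote_style: necessary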

encoding.except_fields

optional [string]
List of fields that are excluded from the encoded event.

encoding.json

optional object
Options for the JsonSerializer.
Relevant when: codec = "json"
encoding.json.pretty

optional bool

Whether to use pretty JSON formatting.

default: false

encoding.metric_tag_values

optional string literal enum

Controls how metric tag values are encoded.

When set to single, only the last non-bare value of tags is displayed with the metric. When set to full, all metric tags are exposed as separate assignments.

Relevant when: codec = "json" or codec = "text"
Enum options
full: All tags are exposed as arrays of either string or null values.
single: Tag values are exposed as single strings, the same as they were before this config option. Tags with multiple values show the last assigned value, and null values are ignored.
default: single

encoding.only_fields

optional [string]
List of fields that are included in the encoded event.

encoding.protobuf

required object
Options for the Protobuf serializer.
Relevant when: codec = "protobuf"
encoding.protobuf.desc_file
required string literal

The path to the protobuf descriptor set file.

This file is the output of protoc -o <path> ...

Examples
"/etc/vector/protobuf_descriptor_set.desc"
encoding.protobuf.message_type
required string literal
The name of the message type to use for serializing.
Examples
"package.Message"

encoding.timestamp_format

optional string literal enum
Format used for timestamp fields.
Enum options
rfc3339: Represent the timestamp as an RFC 3339 timestamp.
unix: Represent the timestamp as a Unix timestamp.
unix_float: Represent the timestamp as a Unix timestamp in floating point.
unix_ms: Represent the timestamp as a Unix timestamp in milliseconds.
unix_ns: Represent the timestamp as a Unix timestamp in nanoseconds.
unix_us: Represent the timestamp as a Unix timestamp in microseconds.

headers_key

optional string literal

The log field name to use for the Kafka headers.

If omitted, no headers are written.

Examples
"headers"

healthcheck

optional object
Healthcheck configuration.

healthcheck.enabled

optional bool
Whether or not to check the health of the sink when Vector starts up.
default: true

healthcheck_topic

optional string literal

The topic name to use for the healthcheck. If omitted, topic is used. This option helps prevent healthcheck warnings when topic is templated.

It is ignored when healthcheck is disabled.
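
For example, a sketch pairing a templated topic with a static healthcheck topic (the field and topic names are illustrative):

sinks:
  my_sink_id:
    type: kafka
    # ... other required options ...
    topic: "logs-{{ application }}"  # resolved per event
    healthcheck_topic: logs-default  # static topic used only for the healthcheck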

inputs

required [string]

A list of upstream source or transform IDs.

Wildcards (*) are supported.

See configuration for more info.

Array string literal
Examples
[
  "my-source-or-transform-id",
  "prefix-*"
]

key_field

optional string literal

The log field name or tag key to use for the topic key.

If the field does not exist in the log or in the tags, a blank value is used. If unspecified, the key is not sent.

Kafka uses a hash of the key to choose the partition or uses round-robin if the record has no key.

Examples
"user_id"
".my_topic"
"%my_topic"

librdkafka_options

optional object

A map of advanced options to pass directly to the underlying librdkafka client.

For more information on configuration options, see Configuration properties.

librdkafka_options.*

required string literal
A librdkafka configuration option.

message_timeout_ms

optional uint
Local message timeout, in milliseconds.
Examples
150000
450000
default: 300000 (milliseconds)

sasl

optional object
Configuration for SASL authentication when interacting with Kafka.

sasl.enabled

optional bool

Enables SASL authentication.

Only PLAIN- and SCRAM-based mechanisms are supported when configuring SASL authentication via sasl.*. For other mechanisms, the corresponding librdkafka-specific values must be configured directly via librdkafka_options.*. For example, to configure sasl.kerberos.* options, where * is service.name, principal, kinit.cmd, and so on, use librdkafka_options."sasl.kerberos.service.name", librdkafka_options."sasl.kerberos.principal", and so on.

See the librdkafka documentation for details.

SASL authentication is not supported on Windows.

sasl.mechanism

optional string literal
The SASL mechanism to use.
Examples
"SCRAM-SHA-256"
"SCRAM-SHA-512"

sasl.password

optional string literal
The SASL password.
Examples
"password"

sasl.username

optional string literal
The SASL username.
Examples
"username"

socket_timeout_ms

optional uint
Default timeout, in milliseconds, for network requests.
Examples
30000
60000
default: 60000 (milliseconds)

tls

optional object
Configures the TLS options for incoming/outgoing connections.

tls.alpn_protocols

optional [string]

Sets the list of supported ALPN protocols.

Declare the supported ALPN protocols, which are used during negotiation with peer. They are prioritized in the order that they are defined.

tls.ca_file

optional string literal

Absolute path to an additional CA certificate file.

The certificate must be in the DER or PEM (X.509) format. Additionally, the certificate can be provided as an inline string in PEM format.

Examples
"/path/to/certificate_authority.crt"

tls.crt_file

optional string literal

Absolute path to a certificate file used to identify this server.

The certificate must be in DER, PEM (X.509), or PKCS#12 format. Additionally, the certificate can be provided as an inline string in PEM format.

If this is set, and is not a PKCS#12 archive, key_file must also be set.

Examples
"/path/to/host_certificate.crt"

tls.enabled

optional bool

Whether or not to require TLS for incoming or outgoing connections.

When enabled and used for incoming connections, an identity certificate is also required. See tls.crt_file for more information.

tls.key_file

optional string literal

Absolute path to a private key file used to identify this server.

The key must be in DER or PEM (PKCS#8) format. Additionally, the key can be provided as an inline string in PEM format.

Examples
"/path/to/host_certificate.key"

tls.key_pass

optional string literal

Passphrase used to unlock the encrypted key file.

This has no effect unless key_file is set.

Examples
"${KEY_PASS_ENV_VAR}"
"PassWord1"

tls.server_name

optional string literal

Server name to use when using Server Name Indication (SNI).

Only relevant for outgoing connections.

Examples
"www.example.com"

tls.verify_certificate

optional bool

Enables certificate verification. For components that create a server, this requires that the client connections have a valid client certificate. For components that initiate requests, this validates that the upstream has a valid certificate.

If enabled, certificates must not be expired and must be issued by a trusted issuer. This verification operates in a hierarchical manner, checking that the leaf certificate (the certificate presented by the client/server) is not only valid, but that the issuer of that certificate is also valid, and so on until the verification process reaches a root certificate.

Do NOT set this to false unless you understand the risks of not verifying the validity of certificates.

tls.verify_hostname

optional bool

Enables hostname verification.

If enabled, the hostname used to connect to the remote host must be present in the TLS certificate presented by the remote host, either as the Common Name or as an entry in the Subject Alternative Name extension.

Only relevant for outgoing connections.

Do NOT set this to false unless you understand the risks of not verifying the remote hostname.
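
A sketch of a TLS configuration for outgoing connections (the paths are illustrative):

sinks:
  my_sink_id:
    type: kafka
    # ... other required options ...
    tls:
      enabled: true
      ca_file: /path/to/certificate_authority.crt
      crt_file: /path/to/host_certificate.crt
      key_file: /path/to/host_certificate.key
      key_pass: "${KEY_PASS_ENV_VAR}"
      verify_certificate: true
      verify_hostname: true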

topic

required string template
The Kafka topic name to write events to.
Note: This parameter supports Vector's template syntax, which enables you to use dynamic per-event values.
Examples
"topic-1234"
"logs-{{unit}}-%Y-%m-%d"

Telemetry

Metrics

link

buffer_byte_size

gauge
The number of bytes currently in the buffer.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

buffer_discarded_events_total

counter
The number of events dropped by this non-blocking buffer.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

buffer_events

gauge
The number of events currently in the buffer.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

buffer_received_event_bytes_total

counter
The number of bytes received by this buffer.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

buffer_received_events_total

counter
The number of events received by this buffer.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

buffer_sent_event_bytes_total

counter
The number of bytes sent by this buffer.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

buffer_sent_events_total

counter
The number of events sent by this buffer.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

component_discarded_events_total

counter
The number of events dropped by this component.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
intentional
True if the events were discarded intentionally, like a filter transform, or false if due to an error.
pid optional
The process ID of the Vector instance.

component_errors_total

counter
The total number of errors encountered by this component.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
error_type
The type of the error
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.
stage
The stage within the component at which the error occurred.

component_received_event_bytes_total

counter
The number of event bytes accepted by this component either from tagged origins like file and uri, or cumulatively from other origins.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
container_name optional
The name of the container from which the data originated.
file optional
The file from which the data originated.
host optional
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the data originated.
peer_path optional
The pathname from which the data originated.
pid optional
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the data originated.
uri optional
The sanitized URI from which the data originated.

component_received_events_count

histogram

A histogram of the number of events passed in each internal batch in Vector’s internal topology.

Note that this is separate from sink-level batching. It is mostly useful for low-level debugging of performance issues in Vector caused by small internal batches.

component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
container_name optional
The name of the container from which the data originated.
file optional
The file from which the data originated.
host optional
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the data originated.
peer_path optional
The pathname from which the data originated.
pid optional
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the data originated.
uri optional
The sanitized URI from which the data originated.

component_received_events_total

counter
The number of events accepted by this component either from tagged origins like file and uri, or cumulatively from other origins.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
container_name optional
The name of the container from which the data originated.
file optional
The file from which the data originated.
host optional
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the data originated.
peer_path optional
The pathname from which the data originated.
pid optional
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the data originated.
uri optional
The sanitized URI from which the data originated.

component_sent_bytes_total

counter
The number of raw bytes sent by this component to destination sinks.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
endpoint optional
The endpoint to which the bytes were sent. For HTTP, this will be the host and path only, excluding the query string.
file optional
The absolute path of the destination file.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.
protocol
The protocol used to send the bytes.
region optional
The AWS region name to which the bytes were sent. In some configurations, this may be a literal hostname.

component_sent_event_bytes_total

counter
The total number of event bytes emitted by this component.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
output optional
The specific output of the component.
pid optional
The process ID of the Vector instance.

component_sent_events_total

counter
The total number of events emitted by this component.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
output optional
The specific output of the component.
pid optional
The process ID of the Vector instance.

kafka_consumed_messages_bytes_total

counter
Total number of message bytes (including framing) received from Kafka brokers.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

kafka_consumed_messages_total

counter
Total number of messages consumed, not including ignored messages (due to offset, etc), from Kafka brokers.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

kafka_produced_messages_bytes_total

counter
Total number of message bytes (including framing, such as per-Message framing and MessageSet/batch framing) transmitted to Kafka brokers.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

kafka_produced_messages_total

counter
Total number of messages transmitted (produced) to Kafka brokers.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

kafka_queue_messages

gauge
Current number of messages in producer queues.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

kafka_queue_messages_bytes

gauge
Current total size of messages in producer queues.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

kafka_requests_bytes_total

counter
Total number of bytes transmitted to Kafka brokers.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

kafka_requests_total

counter
Total number of requests sent to Kafka brokers.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

kafka_responses_bytes_total

counter
Total number of bytes received from Kafka brokers.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

kafka_responses_total

counter
Total number of responses received from Kafka brokers.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

utilization

gauge
A ratio from 0 to 1 of the load on a component. A value of 0 would indicate a completely idle component that is simply waiting for input. A value of 1 would indicate a component that is never idle. This value is updated every 5 seconds.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

How it works

Azure Event Hubs

It is possible to use the kafka source and sink with Azure Event Hubs for all tiers other than the Basic tier. To configure the source and sink to connect to Azure Event Hubs, set the following options (a combined sketch for the sink follows the list):

  • bootstrap_servers - <namespace name>.servicebus.windows.net:9093
  • group_id - The consumer group. Note that if the default group ($Default) is used it must be specified as $$Default to escape the $ used for environment variables.
  • topics - The event hub name.
  • sasl.enabled - Set to true.
  • sasl.mechanism - Set to PLAIN.
  • sasl.username - Set to $$ConnectionString (note the double $$).
  • sasl.password - Set to the Event Hubs connection string.
  • tls.enabled - Set to true.
  • tls.ca_file - The certificate authority file.
  • tls.verify_certificate - Set to true.
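
A combined sketch for the sink (the namespace, event hub name, and connection string are placeholders; group_id and topics apply to the kafka source, while the sink uses topic):

sinks:
  my_sink_id:
    type: kafka
    inputs:
      - my-source-or-transform-id
    bootstrap_servers: mynamespace.servicebus.windows.net:9093
    topic: my-event-hub
    sasl:
      enabled: true
      mechanism: PLAIN
      username: "$$ConnectionString"
      password: "${EVENT_HUBS_CONNECTION_STRING}"
    tls:
      enabled: true
      ca_file: /path/to/certificate_authority.crt
      verify_certificate: true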

Buffers and batches

This component buffers & batches data as shown in the diagram above. You’ll notice that Vector treats these concepts differently, instead of treating them as global concepts, Vector treats them as sink specific concepts. This isolates sinks, ensuring services disruptions are contained and delivery guarantees are honored.

Batches are flushed when one of two conditions is met:

  1. The batch age meets or exceeds the configured timeout_secs.
  2. The batch size meets or exceeds the configured max_bytes or max_events.

Buffers are controlled via the buffer.* options.

Health checks

Health checks ensure that the downstream service is accessible and ready to accept data. This check is performed upon sink initialization. If the health check fails, an error is logged and Vector proceeds to start.

Require health checks

If you’d like to exit immediately upon a health check failure, you can pass the --require-healthy flag:

vector --config /etc/vector/vector.yaml --require-healthy

Disable health checks

If you’d like to disable health checks for this sink you can set the healthcheck option to false.

librdkafka

The kafka source and sink use librdkafka under the hood. This is a battle-tested, high-performance, and reliable library that facilitates communication with Kafka. As Vector produces static MUSL builds, this dependency is packaged with Vector, meaning you do not need to install it.

State

This component is stateless, meaning its behavior is consistent across each input.

Transport Layer Security (TLS)

Vector uses OpenSSL for TLS protocols due to OpenSSL’s maturity. You can enable and adjust TLS behavior via the tls.* options and/or via an OpenSSL configuration file. The file location defaults to /usr/local/ssl/openssl.cnf or can be specified with the OPENSSL_CONF environment variable.