Kafka
Publish observability data to Apache Kafka topics
Configuration
Example configurations
{
  "sinks": {
    "my_sink_id": {
      "type": "kafka",
      "inputs": [
        "my-source-or-transform-id"
      ],
      "bootstrap_servers": "10.14.22.123:9092,10.14.23.332:9092",
      "topic": "topic-1234"
    }
  }
}
[sinks.my_sink_id]
type = "kafka"
inputs = [ "my-source-or-transform-id" ]
bootstrap_servers = "10.14.22.123:9092,10.14.23.332:9092"
topic = "topic-1234"
---
sinks:
  my_sink_id:
    type: kafka
    inputs:
      - my-source-or-transform-id
    bootstrap_servers: 10.14.22.123:9092,10.14.23.332:9092
    topic: topic-1234
{
  "sinks": {
    "my_sink_id": {
      "type": "kafka",
      "inputs": [
        "my-source-or-transform-id"
      ],
      "bootstrap_servers": "10.14.22.123:9092,10.14.23.332:9092",
      "compression": "none",
      "headers_key": "headers",
      "key_field": "user_id",
      "librdkafka_options": {
        "client.id": "${ENV_VAR}",
        "fetch.error.backoff.ms": "1000",
        "socket.send.buffer.bytes": "100"
      },
      "message_timeout_ms": 300000,
      "socket_timeout_ms": 60000,
      "topic": "topic-1234"
    }
  }
}
[sinks.my_sink_id]
type = "kafka"
inputs = [ "my-source-or-transform-id" ]
bootstrap_servers = "10.14.22.123:9092,10.14.23.332:9092"
compression = "none"
headers_key = "headers"
key_field = "user_id"
message_timeout_ms = 300_000
socket_timeout_ms = 60_000
topic = "topic-1234"
[sinks.my_sink_id.librdkafka_options]
"client.id" = "${ENV_VAR}"
"fetch.error.backoff.ms" = "1000"
"socket.send.buffer.bytes" = "100"
---
sinks:
  my_sink_id:
    type: kafka
    inputs:
      - my-source-or-transform-id
    bootstrap_servers: 10.14.22.123:9092,10.14.23.332:9092
    compression: none
    headers_key: headers
    key_field: user_id
    librdkafka_options:
      client.id: ${ENV_VAR}
      fetch.error.backoff.ms: "1000"
      socket.send.buffer.bytes: "100"
    message_timeout_ms: 300000
    socket_timeout_ms: 60000
    topic: topic-1234
acknowledgements
optional object
Controls how acknowledgements are handled for this sink.
See End-to-end Acknowledgements for more information on how event acknowledgement is handled.
acknowledgements.enabled
optional bool
Whether or not end-to-end acknowledgements are enabled.
When enabled for a sink, any connected source that also supports end-to-end acknowledgements waits for events to be acknowledged by the sink before acknowledging them at the source.
Enabling or disabling acknowledgements at the sink level takes precedence over any global acknowledgements configuration.
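For example, end-to-end acknowledgements can be enabled for just this sink with a snippet like the following (the sink ID and servers are illustrative):

[sinks.my_sink_id]
type = "kafka"
inputs = [ "my-source-or-transform-id" ]
bootstrap_servers = "10.14.22.123:9092"
topic = "topic-1234"

[sinks.my_sink_id.acknowledgements]
enabled = true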
batch
optional object
batch.max_bytes
optional uint
The maximum size of a batch that is processed by a sink.
This is based on the uncompressed size of the batched events, before they are serialized/compressed.
batch.max_events
optional uint
batch.timeout_secs
optional float
Default: 1 (seconds)
bootstrap_servers
required string literal
A comma-separated list of Kafka bootstrap servers.
These are the servers in a Kafka cluster that a client should use to bootstrap its connection to the cluster, allowing discovery of all the other hosts in the cluster.
Must be in the form of host:port, and comma-separated.
buffer
optional object
Configures the buffering behavior for this sink.
More information about the individual buffer types, and buffer behavior, can be found in the Buffering Model section.
buffer.max_events
optional uint
Relevant when: type = "memory"
Default: 500
buffer.max_size
required uint
The maximum size of the buffer on disk.
Must be at least ~256 megabytes (268435488 bytes).
Relevant when: type = "disk"
buffer.type
optional string literal enum
Option | Description |
---|---|
disk | Events are buffered on disk. This is less performant, but more durable. Data that has been synchronized to disk will not be lost if Vector is restarted forcefully or crashes. Data is synchronized to disk every 500ms. |
memory | Events are buffered in memory. This is more performant, but less durable. Data will be lost if Vector is restarted forcefully or crashes. |
Default: memory
buffer.when_full
optional string literal enum
Option | Description |
---|---|
block | Wait for free space in the buffer. This applies backpressure up the topology, signalling that sources should slow down the acceptance/consumption of events. This means that while no data is lost, data will pile up at the edge. |
drop_newest | Drops the event instead of waiting for free space in buffer. The event will be intentionally dropped. This mode is typically used when performance is the highest priority, and it is preferable to temporarily lose events rather than cause a slowdown in the acceptance/consumption of events. |
Default: block
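As a sketch, a durable disk buffer that applies backpressure when full could be configured like this (the size shown is only an example and must be at least ~256 megabytes):

[sinks.my_sink_id.buffer]
type = "disk"
max_size = 268435488
when_full = "block"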
compression
optional string literal enum
Option | Description |
---|---|
gzip | Gzip. |
lz4 | LZ4. |
none | No compression. |
snappy | Snappy. |
zstd | Zstandard. |
Default: none
encoding
required object
encoding.avro
required object
Relevant when: codec = "avro"
encoding.avro.schema
required string literal
encoding.codec
required string literal enum
Option | Description |
required string literal enumOption | Description |
---|---|
avro | Encodes an event as an Apache Avro message. |
csv | Encodes an event as a CSV message. This codec must be configured with fields to encode. |
gelf | Encodes an event as a GELF message. |
json | Encodes an event as JSON. |
logfmt | Encodes an event as a logfmt message. |
native | Encodes an event in the native Protocol Buffers format. This codec is experimental. |
native_json | Encodes an event in the native JSON format. This codec is experimental. |
raw_message | No encoding. This encoding uses the message field of a log event. Be careful if you are modifying your log events (for example, by using a remap transform) and removing the message field while doing additional parsing on it. |
text | Plain text encoding. This encoding uses the message field of a log event. Be careful if you are modifying your log events (for example, by using a remap transform) and removing the message field while doing additional parsing on it. |
encoding.csv
required object
Relevant when: codec = "csv"
encoding.csv.fields
required [string]
Configures the fields that will be encoded, as well as the order in which they appear in the output.
If a field is not present in the event, the output will be an empty string.
Values of type Array, Object, and Regex are not supported and the output will be an empty string.
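For instance, a CSV encoding that writes two fields in a fixed order might look like the following (the field names are illustrative):

[sinks.my_sink_id.encoding]
codec = "csv"

[sinks.my_sink_id.encoding.csv]
fields = [ "timestamp", "message" ]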
encoding.except_fields
optional [string]
encoding.metric_tag_values
optional string literal enum
Controls how metric tag values are encoded.
When set to single, only the last non-bare value of tags is displayed with the metric. When set to full, all metric tags are exposed as separate assignments.
codec = "json" or codec = "text"
Option | Description |
---|---|
full | All tags are exposed as arrays of either string or null values. |
single | Tag values are exposed as single strings, the same as they were before this config option. Tags with multiple values show the last assigned value, and null values are ignored. |
Default: single
encoding.only_fields
optional [string]
encoding.timestamp_format
optional string literal enum
Option | Description |
---|---|
rfc3339 | Represent the timestamp as an RFC 3339 timestamp. |
unix | Represent the timestamp as a Unix timestamp. |
headers_key
optional string literal
The log field name to use for the Kafka headers.
If omitted, no headers are written.
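As an illustration, and assuming each log event carries an object field whose keys and values should become Kafka message headers, the configuration might look like this (the field name and header values are hypothetical):

[sinks.my_sink_id]
type = "kafka"
inputs = [ "my-source-or-transform-id" ]
bootstrap_servers = "10.14.22.123:9092"
topic = "topic-1234"
# Suppose each event has an object field such as:
#   "headers": { "trace-id": "abc123", "origin": "billing" }
# Pointing headers_key at that field writes its keys and values as Kafka headers.
headers_key = "headers"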
healthcheck
optional object
healthcheck.enabled
optional bool
Default: true
inputs
required [string]
A list of upstream source or transform IDs.
Wildcards (*) are supported.
See configuration for more info.
key_field
optional string literal
The log field name or tag key to use for the topic key.
If the field does not exist in the log or in the tags, a blank value is used. If unspecified, the key is not sent.
Kafka uses a hash of the key to choose the partition or uses round-robin if the record has no key.
librdkafka_options
optional object
A map of advanced options to pass directly to the underlying librdkafka client.
For more information on configuration options, see Configuration properties.
librdkafka_options.*
required string literal
message_timeout_ms
optional uint
Default: 300000 (milliseconds)
sasl
optional object
sasl.enabled
optional bool
Enables SASL authentication.
Only PLAIN- and SCRAM-based mechanisms are supported when configuring SASL authentication using sasl.*. For other mechanisms, librdkafka_options.* must be used directly to configure other librdkafka-specific values.
If using sasl.kerberos.* as an example, where * is service.name, principal, kinit.cmd, etc., then librdkafka_options.* as a result becomes librdkafka_options.sasl.kerberos.service.name, librdkafka_options.sasl.kerberos.principal, etc.
See the librdkafka documentation for details.
SASL authentication is not supported on Windows.
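Following the mapping described above, Kerberos-related properties would be passed through librdkafka_options rather than sasl.*. A hypothetical sketch (service name and principal are placeholders):

[sinks.my_sink_id.librdkafka_options]
"sasl.kerberos.service.name" = "kafka"
"sasl.kerberos.principal" = "vector@EXAMPLE.COM"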
sasl.mechanism
optional string literal
socket_timeout_ms
optional uint
Default: 60000 (milliseconds)
tls
optional object
tls.alpn_protocols
optional [string]
Sets the list of supported ALPN protocols.
Declare the supported ALPN protocols, which are used during negotiation with peer. They are prioritized in the order that they are defined.
tls.ca_file
optional string literal
Absolute path to an additional CA certificate file.
The certificate must be in the DER or PEM (X.509) format. Additionally, the certificate can be provided as an inline string in PEM format.
tls.crt_file
optional string literal
Absolute path to a certificate file used to identify this server.
The certificate must be in DER, PEM (X.509), or PKCS#12 format. Additionally, the certificate can be provided as an inline string in PEM format.
If this is set and is not a PKCS#12 archive, key_file must also be set.
tls.enabled
optional bool
Whether or not to require TLS for incoming or outgoing connections.
When enabled and used for incoming connections, an identity certificate is also required. See tls.crt_file for more information.
tls.key_file
optional string literal
Absolute path to a private key file used to identify this server.
The key must be in DER or PEM (PKCS#8) format. Additionally, the key can be provided as an inline string in PEM format.
tls.key_pass
optional string literal
Passphrase used to unlock the encrypted key file.
This has no effect unless key_file is set.
tls.verify_certificate
optional bool
Enables certificate verification.
If enabled, certificates must not be expired and must be issued by a trusted issuer. This verification operates in a hierarchical manner, checking that the leaf certificate (the certificate presented by the client/server) is not only valid, but that the issuer of that certificate is also valid, and so on until the verification process reaches a root certificate.
Relevant for both incoming and outgoing connections.
Do NOT set this to false unless you understand the risks of not verifying the validity of certificates.
tls.verify_hostname
optional bool
Enables hostname verification.
If enabled, the hostname used to connect to the remote host must be present in the TLS certificate presented by the remote host, either as the Common Name or as an entry in the Subject Alternative Name extension.
Only relevant for outgoing connections.
Do NOT set this to false unless you understand the risks of not verifying the remote hostname.
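Putting the SASL and TLS options together, a minimal sketch assuming PLAIN authentication over TLS might look like the following (credentials and file paths are placeholders; sasl.username and sasl.password are the options referenced in the Azure Event Hubs notes below):

[sinks.my_sink_id.sasl]
enabled = true
mechanism = "PLAIN"
username = "vector-producer"
password = "${KAFKA_PASSWORD}"

[sinks.my_sink_id.tls]
enabled = true
ca_file = "/etc/ssl/certs/ca.crt"
verify_certificate = true
verify_hostname = true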
topic
required string template
The Kafka topic name to write events to.
Telemetry
Metrics
buffer_byte_size (gauge)
buffer_discarded_events_total (counter)
buffer_events (gauge)
buffer_received_event_bytes_total (counter)
buffer_received_events_total (counter)
buffer_sent_event_bytes_total (counter)
buffer_sent_events_total (counter)
component_discarded_events_total (counter)
component_errors_total (counter)
component_received_event_bytes_total (counter)
component_received_events_count (histogram)
A histogram of the number of events passed in each internal batch in Vector’s internal topology.
Note that this is separate from sink-level batching. It is mostly useful for low-level debugging of performance issues in Vector due to small internal batches.
component_received_events_total (counter)
component_sent_bytes_total (counter)
component_sent_event_bytes_total (counter)
component_sent_events_total (counter)
kafka_consumed_messages_bytes_total (counter)
kafka_consumed_messages_total (counter)
kafka_produced_messages_bytes_total (counter)
kafka_produced_messages_total (counter)
kafka_queue_messages (gauge)
kafka_queue_messages_bytes (gauge)
kafka_requests_bytes_total (counter)
kafka_requests_total (counter)
kafka_responses_bytes_total (counter)
kafka_responses_total (counter)
utilization (gauge)
How it works
Azure Event Hubs
It is possible to use the kafka source and sink with Azure Event Hubs for all tiers other than the Basic tier. More details can be found here. To configure the source and sink to connect to Azure Event Hubs, set the following options (a sketch of a matching sink configuration follows this list):
- bootstrap_servers - <namespace name>.servicebus.windows.net:9093
- group_id - The consumer group. Note that if the default group ($Default) is used, it must be specified as $$Default to escape the $ used for environment variables.
- topics - The event hub name.
- sasl.enabled - Set to true.
- sasl.mechanism - Set to PLAIN.
- sasl.username - Set to $$ConnectionString (note the double $$).
- sasl.password - Set to the connection string. See here.
- tls.enabled - Set to true.
- tls.ca_file - The certificate authority file.
- tls.verify_certificate - Set to true.
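A minimal sink sketch along these lines (the namespace, event hub name, and the environment variable holding the connection string are placeholders; group_id and topics apply to the kafka source rather than the sink):

[sinks.my_sink_id]
type = "kafka"
inputs = [ "my-source-or-transform-id" ]
bootstrap_servers = "my-namespace.servicebus.windows.net:9093"
topic = "my-event-hub"

[sinks.my_sink_id.sasl]
enabled = true
mechanism = "PLAIN"
username = "$$ConnectionString"
password = "${EVENTHUB_CONNECTION_STRING}"

[sinks.my_sink_id.tls]
enabled = true
ca_file = "/etc/ssl/certs/ca.crt"
verify_certificate = true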
Buffers and batches
This component buffers & batches data as shown in the diagram above. You’ll notice that Vector treats these concepts differently, instead of treating them as global concepts, Vector treats them as sink specific concepts. This isolates sinks, ensuring services disruptions are contained and delivery guarantees are honored.
Batches are flushed when 1 of 2 conditions are met:
- The batch age meets or exceeds the configured
timeout_secs
. - The batch size meets or exceeds the configured
max_bytes
ormax_events
.
Buffers are controlled via the buffer.*
options.
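As an illustrative sketch, batching and buffering could be tuned together like this (the thresholds are arbitrary examples):

[sinks.my_sink_id.batch]
max_bytes = 1000000
max_events = 1000
timeout_secs = 1.0

[sinks.my_sink_id.buffer]
type = "memory"
max_events = 500
when_full = "block"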
Health checks
Require health checks
If you’d like to exit immediately upon a health check failure, you can pass the --require-healthy flag:
vector --config /etc/vector/vector.toml --require-healthy
Disable health checks
If you’d like to disable health checks for this sink, set the healthcheck option to false.
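For example, using the healthcheck.enabled option documented above:

[sinks.my_sink_id.healthcheck]
enabled = false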
librdkafka
The kafka sink uses librdkafka under the hood. This is a battle-tested, high-performance, and reliable library that facilitates communication with Kafka. As Vector produces static MUSL builds, this dependency is packaged with Vector, meaning you do not need to install it.