Kafka
Publish observability event data to Apache Kafka topics
Configuration
Example configurations
Common
{
"sinks": {
"my_sink_id": {
"type": "kafka",
"inputs": [
"my-source-or-transform-id"
],
"bootstrap_servers": "10.14.22.123:9092,10.14.23.332:9092",
"key_field": "user_id",
"topic": "topic-1234",
"acknowledgements": null,
"batch": null,
"compression": "none",
"encoding": {
"codec": "json"
},
"healthcheck": null
}
}
}
[sinks.my_sink_id]
type = "kafka"
inputs = [ "my-source-or-transform-id" ]
bootstrap_servers = "10.14.22.123:9092,10.14.23.332:9092"
key_field = "user_id"
topic = "topic-1234"
compression = "none"
[sinks.my_sink_id.encoding]
codec = "json"
---
sinks:
my_sink_id:
type: kafka
inputs:
- my-source-or-transform-id
bootstrap_servers: 10.14.22.123:9092,10.14.23.332:9092
key_field: user_id
topic: topic-1234
acknowledgements: null
batch: null
compression: none
encoding:
codec: json
healthcheck: null
Advanced
{
"sinks": {
"my_sink_id": {
"type": "kafka",
"inputs": [
"my-source-or-transform-id"
],
"bootstrap_servers": "10.14.22.123:9092,10.14.23.332:9092",
"key_field": "user_id",
"librdkafka_options": {
"client.id": "${ENV_VAR}",
"fetch.error.backoff.ms": "1000",
"socket.send.buffer.bytes": "100"
},
"message_timeout_ms": 300000,
"sasl": null,
"socket_timeout_ms": 60000,
"topic": "topic-1234",
"buffer": null,
"acknowledgements": null,
"batch": null,
"compression": "none",
"encoding": {
"codec": "json"
},
"healthcheck": null,
"tls": null,
"headers_key": "headers"
}
}
}
[sinks.my_sink_id]
type = "kafka"
inputs = [ "my-source-or-transform-id" ]
bootstrap_servers = "10.14.22.123:9092,10.14.23.332:9092"
key_field = "user_id"
message_timeout_ms = 300_000
socket_timeout_ms = 60_000
topic = "topic-1234"
compression = "none"
headers_key = "headers"
[sinks.my_sink_id.librdkafka_options]
"client.id" = "${ENV_VAR}"
"fetch.error.backoff.ms" = "1000"
"socket.send.buffer.bytes" = "100"
[sinks.my_sink_id.encoding]
codec = "json"
---
sinks:
my_sink_id:
type: kafka
inputs:
- my-source-or-transform-id
bootstrap_servers: 10.14.22.123:9092,10.14.23.332:9092
key_field: user_id
librdkafka_options:
client.id: ${ENV_VAR}
fetch.error.backoff.ms: "1000"
socket.send.buffer.bytes: "100"
message_timeout_ms: 300000
sasl: null
socket_timeout_ms: 60000
topic: topic-1234
buffer: null
acknowledgements: null
batch: null
compression: none
encoding:
codec: json
healthcheck: null
tls: null
headers_key: headers
acknowledgements
  common, optional, object. Acknowledgement settings.
acknowledgements.enabled
  common, optional, bool. Default: false
batch
  common, optional, object
batch.max_bytes
  common, optional, uint
batch.max_events
  common, optional, uint
batch.timeout_secs
  common, optional, float
bootstrap_servers
  required, string literal
buffer
  optional, object
buffer.max_events
  common, optional, uint. Relevant when type = "memory". Default: 500 (events)
buffer.max_size
  required, uint. Relevant when type = "disk"
buffer.type
  common, optional, string literal enum. Default: memory
  Option | Description
  ---|---
  disk | Stores the sink’s buffer on disk. This is less performant, but durable. Data will not be lost between restarts. Will also hold data in memory to enhance performance. WARNING: This may stall the sink if disk performance isn’t on par with the throughput. For comparison, AWS gp2 volumes are usually too slow for common cases.
  memory | Stores the sink’s buffer in memory. This is more performant, but less durable. Data will be lost if Vector is restarted forcefully.
buffer.when_full
  optional, string literal enum. Default: block
  Option | Description
  ---|---
  block | Applies back pressure when the buffer is full. This prevents data loss, but will cause data to pile up on the edge.
  drop_newest | Drops new data as it’s received. This data is lost. This should be used when performance is the highest priority.
compression
  common, optional, string literal enum. Default: none
  The compression strategy used to compress the encoded event data before transmission. Some cloud storage API clients and browsers will handle decompression transparently, so files may not always appear to be compressed depending how they are accessed.
encoding
  required, object
encoding.codec
  required, string literal enum
  Option | Description
  ---|---
  json | JSON encoded event.
  text | The message field from the event.
encoding.except_fields
  optional, [string]
encoding.only_fields
  optional, [string]
encoding.timestamp_format
  optional, string literal enum. Default: rfc3339
  Option | Description
  ---|---
  rfc3339 | Formats as a RFC3339 string
  unix | Formats as a unix timestamp
headers_key
  optional, string literal
healthcheck
  common, optional, object
healthcheck.enabled
  common, optional, bool. Default: true
inputs
  required, [string]. A list of upstream source or transform IDs. Wildcards (*) are supported. See configuration for more info.
key_field
  common, optional, string literal
sasl
  optional, object
sasl.enabled
  common, optional, bool
sasl.mechanism
  common, optional, string literal
sasl.password
  common, optional, string literal
sasl.username
  common, optional, string literal
tls
  optional, object
tls.ca_file
  optional, string literal
tls.crt_file
  common, optional, string literal. If this is set, key_file must also be set.
tls.key_file
  common, optional, string literal. If this is set, crt_file must also be set.
tls.key_pass
  optional, string literal. Only relevant when key_file is set.
topic
  required, string template
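For illustration only, the sketch below combines the sasl.*, tls.*, and templated topic options in TOML. The SASL mechanism, credentials, certificate paths, and the application_id field referenced by the topic template are placeholders invented for this sketch, not values taken from this page:

[sinks.my_sink_id]
type = "kafka"
inputs = [ "my-source-or-transform-id" ]
bootstrap_servers = "10.14.22.123:9092"
# `topic` accepts a template, so it can reference event fields;
# `application_id` is a hypothetical field name.
topic = "logs-{{ application_id }}"

[sinks.my_sink_id.encoding]
codec = "json"

[sinks.my_sink_id.sasl]
enabled = true
mechanism = "SCRAM-SHA-512"
username = "${KAFKA_USERNAME}"
password = "${KAFKA_PASSWORD}"

[sinks.my_sink_id.tls]
ca_file = "/etc/ssl/certs/ca.crt"
# If crt_file is set, key_file must also be set (and vice versa).
crt_file = "/etc/vector/kafka.crt"
key_file = "/etc/vector/kafka.key"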
Telemetry
Metrics
The metrics below are tagged with the component_id of this sink; where present, the deprecated component_name tag carries the same value as component_id.
Metric | Type | Notes
---|---|---
buffer_byte_size | gauge |
buffer_discarded_events_total | counter |
buffer_events | gauge |
buffer_received_event_bytes_total | counter |
buffer_received_events_total | counter |
buffer_sent_event_bytes_total | counter |
buffer_sent_events_total | counter |
component_received_event_bytes_total | counter |
component_received_events_count | histogram |
component_received_events_total | counter |
component_sent_bytes_total | counter |
component_sent_event_bytes_total | counter |
component_sent_events_total | counter |
events_discarded_total | counter |
events_in_total | counter | Deprecated; use component_received_events_total instead.
kafka_consumed_messages_bytes_total | counter |
kafka_consumed_messages_total | counter |
kafka_produced_messages_bytes_total | counter |
kafka_produced_messages_total | counter |
kafka_queue_messages | gauge |
kafka_queue_messages_bytes | gauge |
kafka_requests_bytes_total | counter |
kafka_requests_total | counter |
kafka_responses_bytes_total | counter |
kafka_responses_total | counter |
processing_errors_total | counter | Deprecated; use component_errors_total instead.
utilization | gauge |
How it works
Buffers and batches
This component buffers and batches data. Rather than treating buffering and batching as global concerns, Vector configures them per sink. This isolates sinks, ensuring service disruptions are contained and delivery guarantees are honored.
Batches are flushed when one of two conditions is met:
- The batch age meets or exceeds the configured timeout_secs.
- The batch size meets or exceeds the configured max_bytes or max_events.
Buffers are controlled via the buffer.* options.
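As a minimal tuning sketch (the numbers are arbitrary illustrations, not recommended values), a kafka sink can pair explicit batch limits with a durable disk buffer:

[sinks.my_sink_id]
type = "kafka"
inputs = [ "my-source-or-transform-id" ]
bootstrap_servers = "10.14.22.123:9092"
topic = "topic-1234"

[sinks.my_sink_id.encoding]
codec = "json"

[sinks.my_sink_id.batch]
# Flush after 1,000 events or 1 second, whichever comes first.
max_events = 1000
timeout_secs = 1

[sinks.my_sink_id.buffer]
# Durable on-disk buffer; apply back pressure rather than drop events when full.
type = "disk"
max_size = 268435456
when_full = "block"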
Health checks
Require health checks
If you’d like to exit immediately upon a health check failure, you can pass the --require-healthy flag:
vector --config /etc/vector/vector.toml --require-healthy
Disable health checks
If you’d like to disable health checks for this sink, set the healthcheck option to false.
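For example, for the my_sink_id sink defined above, a minimal sketch:

[sinks.my_sink_id.healthcheck]
enabled = false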
librdkafka
The kafka sink uses librdkafka under the hood. This is a battle-tested, high-performance, and reliable library that facilitates communication with Kafka. As Vector produces static MUSL builds, this dependency is packaged with Vector, meaning you do not need to install it.
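If you need to tune librdkafka directly, its producer properties can be passed through via librdkafka_options, as in the advanced example above. A minimal sketch reusing the property names shown there (the client.id value is a placeholder; librdkafka property values are given as strings):

[sinks.my_sink_id.librdkafka_options]
"client.id" = "vector-prod"
"fetch.error.backoff.ms" = "1000"
"socket.send.buffer.bytes" = "100"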