Pulsar
Publish observability events to Apache Pulsar topics
Configuration
Example configurations
{
  "sinks": {
    "my_sink_id": {
      "type": "pulsar",
      "inputs": [
        "my-source-or-transform-id"
      ],
      "endpoint": "pulsar://127.0.0.1:6650",
      "topic": "topic-1234"
    }
  }
}
[sinks.my_sink_id]
type = "pulsar"
inputs = [ "my-source-or-transform-id" ]
endpoint = "pulsar://127.0.0.1:6650"
topic = "topic-1234"
---
sinks:
  my_sink_id:
    type: pulsar
    inputs:
      - my-source-or-transform-id
    endpoint: pulsar://127.0.0.1:6650
    topic: topic-1234
{
  "sinks": {
    "my_sink_id": {
      "type": "pulsar",
      "inputs": [
        "my-source-or-transform-id"
      ],
      "compression": "none",
      "endpoint": "pulsar://127.0.0.1:6650",
      "partition_key_field": "message",
      "producer_name": "producer-name",
      "topic": "topic-1234"
    }
  }
}
[sinks.my_sink_id]
type = "pulsar"
inputs = [ "my-source-or-transform-id" ]
compression = "none"
endpoint = "pulsar://127.0.0.1:6650"
partition_key_field = "message"
producer_name = "producer-name"
topic = "topic-1234"
---
sinks:
  my_sink_id:
    type: pulsar
    inputs:
      - my-source-or-transform-id
    compression: none
    endpoint: pulsar://127.0.0.1:6650
    partition_key_field: message
    producer_name: producer-name
    topic: topic-1234
acknowledgements
optional object
Controls how acknowledgements are handled for this sink.
See End-to-end Acknowledgements for more information on how event acknowledgement is handled.
acknowledgements.enabled
optional bool
Whether or not end-to-end acknowledgements are enabled.
When enabled for a sink, any source connected to that sink that also supports end-to-end acknowledgements waits for events to be acknowledged by the sink before acknowledging them at the source.
Enabling or disabling acknowledgements at the sink level takes precedence over any global acknowledgements configuration.
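As a sketch of the setting described above, the following TOML fragment enables end-to-end acknowledgements for one sink. It reuses the placeholder sink ID, endpoint, and topic from the example configurations; the json codec is chosen only so the fragment includes the required encoding option.

```toml
[sinks.my_sink_id]
type = "pulsar"
inputs = [ "my-source-or-transform-id" ]
endpoint = "pulsar://127.0.0.1:6650"
topic = "topic-1234"
encoding.codec = "json"

# Applies to this sink only and takes precedence over any global
# acknowledgements configuration.
acknowledgements.enabled = true
```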
auth
optional object
auth.name
optional string literal
Basic authentication name/username.
This can be used either for basic authentication (username/password) or JWT authentication.
When used for JWT, the value should be token.
auth.oauth2
optional object
auth.oauth2.audience
optional string literal
auth.oauth2.credentials_url
required string literal
The credentials URL.
A data URL is also supported.
auth.oauth2.issuer_url
required string literal
auth.token
optional string literal
Basic authentication password/token.
This can be used either for basic authentication (username/password) or JWT authentication. When used for JWT, the value should be the signed JWT, in the compact representation.
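The two authentication styles described above might be configured as in the sketch below. The sink IDs, the token placeholder, and every URL are hypothetical values chosen for illustration; only the option names come from this reference.

```toml
# JWT authentication: name is the literal string "token",
# token is the signed JWT in compact representation.
[sinks.my_jwt_sink]
type = "pulsar"
inputs = [ "my-source-or-transform-id" ]
endpoint = "pulsar://127.0.0.1:6650"
topic = "topic-1234"
encoding.codec = "json"
auth.name = "token"
auth.token = "<signed-jwt-in-compact-form>"  # placeholder, not a real token

# OAuth2 authentication: issuer_url and credentials_url are required,
# audience is optional. All three values here are placeholders.
[sinks.my_oauth2_sink]
type = "pulsar"
inputs = [ "my-source-or-transform-id" ]
endpoint = "pulsar://127.0.0.1:6650"
topic = "topic-1234"
encoding.codec = "json"
auth.oauth2.issuer_url = "https://issuer.example.com"
auth.oauth2.credentials_url = "file:///etc/vector/pulsar-credentials.json"
auth.oauth2.audience = "urn:example:pulsar"
```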
buffer
optional object
Configures the buffering behavior for this sink.
More information about the individual buffer types, and buffer behavior, can be found in the Buffering Model section.
buffer.max_events
optional uint
Relevant when: type = "memory"
Default: 500
buffer.max_size
required uint
The maximum size of the buffer on disk.
Must be at least ~256 megabytes (268435488 bytes).
Relevant when: type = "disk"
buffer.type
optional string literal enum
Option | Description |
---|---|
disk | Events are buffered on disk. This is less performant, but more durable. Data that has been synchronized to disk will not be lost if Vector is restarted forcefully or crashes. Data is synchronized to disk every 500ms. |
memory | Events are buffered in memory. This is more performant, but less durable. Data will be lost if Vector is restarted forcefully or crashes. |
Default: memory
buffer.when_full
optional string literal enum
Option | Description |
---|---|
block | Wait for free space in the buffer. This applies backpressure up the topology, signalling that sources should slow down the acceptance/consumption of events. This means that while no data is lost, data will pile up at the edge. |
drop_newest | Drops the event instead of waiting for free space in buffer. The event will be intentionally dropped. This mode is typically used when performance is the highest priority, and it is preferable to temporarily lose events rather than cause a slowdown in the acceptance/consumption of events. |
Default: block
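To make the trade-off concrete, here is a minimal sketch of a durable disk buffer that applies backpressure when full. The size shown is simply the documented minimum; a real deployment would likely choose a larger value.

```toml
[sinks.my_sink_id]
type = "pulsar"
inputs = [ "my-source-or-transform-id" ]
endpoint = "pulsar://127.0.0.1:6650"
topic = "topic-1234"
encoding.codec = "json"

# Disk buffering: slower than memory, but synchronized data survives
# a crash or forced restart.
buffer.type = "disk"
buffer.max_size = 268435488   # bytes; the documented minimum
buffer.when_full = "block"    # backpressure instead of dropping events
```

Switching to buffer.type = "memory" would replace buffer.max_size with buffer.max_events, and buffer.when_full = "drop_newest" would trade data loss for throughput.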
compression
optional string literal enum
Option | Description |
---|---|
lz4 | LZ4. |
none | No compression. |
snappy | Snappy. |
zlib | Zlib. |
zstd | Zstandard. |
Default: none
encoding
required object
encoding.avro
required object
Relevant when: codec = "avro"
encoding.avro.schema
required string literal
encoding.codec
required string literal enum
Option | Description |
---|---|
avro | Encodes an event as an Apache Avro message. |
csv | Encodes an event as a CSV message. This codec must be configured with fields to encode. |
gelf | Encodes an event as a GELF message. |
json | Encodes an event as JSON. |
logfmt | Encodes an event as a logfmt message. |
native | Encodes an event in Vector’s native Protocol Buffers format. This codec is experimental. |
native_json | Encodes an event in Vector’s native JSON format. This codec is experimental. |
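raw_message | No encoding. This “encoding” simply uses the message field of a log event. Users should take care if they’re modifying their log events (such as by using a remap transform) and removing the message field while doing additional parsing on it, as this could lead to the encoding emitting empty strings for the given event. |
text | Plain text encoding. This “encoding” simply uses the message field of a log event. Users should take care if they’re modifying their log events (such as by using a remap transform) and removing the message field while doing additional parsing on it, as this could lead to the encoding emitting empty strings for the given event. |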
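Because the avro codec requires encoding.avro.schema, a configuration using it might look like the sketch below. The record name and its single message field are hypothetical and would need to match the actual shape of the events being sent.

```toml
[sinks.my_sink_id]
type = "pulsar"
inputs = [ "my-source-or-transform-id" ]
endpoint = "pulsar://127.0.0.1:6650"
topic = "topic-1234"

encoding.codec = "avro"
# The schema is passed as a string; a TOML multi-line literal string
# avoids escaping the JSON quotes. The schema itself is illustrative.
encoding.avro.schema = '''
{
  "type": "record",
  "name": "log",
  "fields": [ { "name": "message", "type": "string" } ]
}
'''
```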
encoding.csv
required object
Relevant when: codec = "csv"
encoding.csv.fields
required [string]
Configures the fields that will be encoded, as well as the order in which they appear in the output.
If a field is not present in the event, the output will be an empty string.
Values of type Array, Object, and Regex are not supported and the output will be an empty string.
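As an illustration of the csv codec, the fragment below lists the fields to emit in column order. The field names timestamp, host, and message are assumptions about the event shape, not values defined by this reference.

```toml
[sinks.my_sink_id]
type = "pulsar"
inputs = [ "my-source-or-transform-id" ]
endpoint = "pulsar://127.0.0.1:6650"
topic = "topic-1234"

encoding.codec = "csv"
# Columns appear in this order. A field missing from an event, or one
# holding an Array, Object, or Regex value, is written as an empty string.
encoding.csv.fields = [ "timestamp", "host", "message" ]
```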
encoding.except_fields
optional [string]
encoding.metric_tag_values
optional string literal enum
Controls how metric tag values are encoded.
When set to single, only the last non-bare value of tags will be displayed with the metric. When set to full, all metric tags will be exposed as separate assignments.
Relevant when: codec = "json" or codec = "text"
Option | Description |
---|---|
full | All tags will be exposed as arrays of either string or null values. |
single | Tag values will be exposed as single strings, the same as they were before this config option. Tags with multiple values will show the last assigned value, and null values will be ignored. |
Default: single
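A short sketch of opting in to full tag values, assuming the sink receives metric events and uses the json codec (one of the two codecs this option applies to):

```toml
[sinks.my_sink_id]
type = "pulsar"
inputs = [ "my-source-or-transform-id" ]
endpoint = "pulsar://127.0.0.1:6650"
topic = "topic-1234"

encoding.codec = "json"
# "full" exposes every tag as an array of string or null values;
# the default, "single", keeps only the last assigned value per tag.
encoding.metric_tag_values = "full"
```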
encoding.only_fields
optional [string]
encoding.timestamp_format
optional string literal enum
Option | Description |
---|---|
rfc3339 | Represent the timestamp as an RFC 3339 timestamp. |
unix | Represent the timestamp as a Unix timestamp. |
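For example, a JSON-encoded sink that writes timestamps in RFC 3339 form could be sketched as follows; the other keys repeat the placeholders from the example configurations.

```toml
[sinks.my_sink_id]
type = "pulsar"
inputs = [ "my-source-or-transform-id" ]
endpoint = "pulsar://127.0.0.1:6650"
topic = "topic-1234"

encoding.codec = "json"
# Use "unix" instead to emit Unix timestamps.
encoding.timestamp_format = "rfc3339"
```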
endpoint
required string literal
The endpoint to which the Pulsar client should connect.
The endpoint should specify the pulsar protocol and port.
healthcheck
optional object
healthcheck.enabled
optional bool
Default: true
inputs
required [string]
A list of upstream source or transform IDs.
Wildcards (*) are supported.
See configuration for more info.
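A wildcard can stand in for part of a component ID. The sketch below assumes hypothetical upstream components whose IDs all begin with app- (for instance app-nginx and app-api); those names are illustrative only.

```toml
[sinks.my_sink_id]
type = "pulsar"
# Matches every upstream source or transform whose ID starts with "app-".
inputs = [ "app-*" ]
endpoint = "pulsar://127.0.0.1:6650"
topic = "topic-1234"
encoding.codec = "json"
```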
partition_key_field
optional string literal
producer_name
optional string literal
Telemetry
Metrics
buffer_byte_size
gauge
Use component_id instead. The value is the same as component_id.
buffer_discarded_events_total
counter
Use component_id instead. The value is the same as component_id.
buffer_events
gauge
Use component_id instead. The value is the same as component_id.
buffer_received_event_bytes_total
counter
Use component_id instead. The value is the same as component_id.
buffer_received_events_total
counter
Use component_id instead. The value is the same as component_id.
buffer_sent_event_bytes_total
counter
Use component_id instead. The value is the same as component_id.
buffer_sent_events_total
counter
Use component_id instead. The value is the same as component_id.
component_discarded_events_total
counter
Use component_id instead. The value is the same as component_id.
component_errors_total
counter
Use component_id instead. The value is the same as component_id.
component_received_event_bytes_total
counter
Use component_id instead. The value is the same as component_id.
component_received_events_count
histogram
A histogram of the number of events passed in each internal batch in Vector’s internal topology.
Note that this is separate from sink-level batching. It is mostly useful for low-level debugging of performance issues in Vector caused by small internal batches.
Use component_id instead. The value is the same as component_id.
component_received_events_total
counter
Use component_id instead. The value is the same as component_id.
encode_errors_total
counter
events_in_total
counter
Use component_received_events_total instead.
Use component_id instead. The value is the same as component_id.
utilization
gauge
Use component_id instead. The value is the same as component_id.
How it works
Health checks
Require health checks
If you’d like to exit immediately upon a health check failure, you can pass the --require-healthy flag:
vector --config /etc/vector/vector.toml --require-healthy
Disable health checks
To disable health checks for this sink, set the healthcheck option to false.
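In configuration terms, disabling the check might look like the sketch below, which uses the healthcheck.enabled key listed in the option reference for this sink.

```toml
[sinks.my_sink_id]
type = "pulsar"
inputs = [ "my-source-or-transform-id" ]
endpoint = "pulsar://127.0.0.1:6650"
topic = "topic-1234"
encoding.codec = "json"

# Skip the startup health check for this sink (enabled by default).
healthcheck.enabled = false
```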