GCP Pub/Sub
Fetch observability events from GCP’s Pub/Sub messaging system
Requirements
The GCP Pub/Sub source requires a Pub/Sub subscription.
Configuration
Example configurations
Common
```json
{
  "sources": {
    "my_source_id": {
      "type": "gcp_pubsub",
      "acknowledgements": null,
      "credentials_path": "/path/to/credentials.json",
      "project": "vector-123456",
      "subscription": "vector-123456"
    }
  }
}
```
```toml
[sources.my_source_id]
type = "gcp_pubsub"
credentials_path = "/path/to/credentials.json"
project = "vector-123456"
subscription = "vector-123456"
```
```yaml
---
sources:
  my_source_id:
    type: gcp_pubsub
    acknowledgements: null
    credentials_path: /path/to/credentials.json
    project: vector-123456
    subscription: vector-123456
```
Advanced
```json
{
  "sources": {
    "my_source_id": {
      "type": "gcp_pubsub",
      "acknowledgements": null,
      "ack_deadline_seconds": 600,
      "ack_deadline_secs": 600,
      "api_key": "${GCP_API_KEY}",
      "credentials_path": "/path/to/credentials.json",
      "endpoint": "https://pubsub.googleapis.com",
      "full_response_size": 100,
      "keepalive_secs": 60,
      "max_concurrency": 5,
      "poll_time_seconds": 2,
      "project": "vector-123456",
      "retry_delay_seconds": 1,
      "retry_delay_secs": 1,
      "framing": null,
      "proxy": null,
      "tls": null,
      "subscription": "vector-123456",
      "decoding": null
    }
  }
}
```
```toml
[sources.my_source_id]
type = "gcp_pubsub"
ack_deadline_seconds = 600
ack_deadline_secs = 600
api_key = "${GCP_API_KEY}"
credentials_path = "/path/to/credentials.json"
endpoint = "https://pubsub.googleapis.com"
full_response_size = 100
keepalive_secs = 60
max_concurrency = 5
poll_time_seconds = 2
project = "vector-123456"
retry_delay_seconds = 1
retry_delay_secs = 1
subscription = "vector-123456"
```
```yaml
---
sources:
  my_source_id:
    type: gcp_pubsub
    acknowledgements: null
    ack_deadline_seconds: 600
    ack_deadline_secs: 600
    api_key: ${GCP_API_KEY}
    credentials_path: /path/to/credentials.json
    endpoint: https://pubsub.googleapis.com
    full_response_size: 100
    keepalive_secs: 60
    max_concurrency: 5
    poll_time_seconds: 2
    project: vector-123456
    retry_delay_seconds: 1
    retry_delay_secs: 1
    framing: null
    proxy: null
    tls: null
    subscription: vector-123456
    decoding: null
```
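ack_deadline_seconds
optional uint, default: 600 (seconds)
Deprecated; use `ack_deadline_secs` instead.
ack_deadline_secs
optional uint, default: 600 (seconds)
The acknowledgement deadline, in seconds, for this stream. Messages that are not acknowledged before the deadline expires may be redelivered.
acknowledgements
common optional object
Acknowledgement settings. This setting is deprecated in favor of enabling acknowledgements in the destination sink.
acknowledgements.enabled
common optional bool, default: false
Whether end-to-end acknowledgements are enabled for this source.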
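Because the source-level `acknowledgements` setting is deprecated in favor of the sink setting, end-to-end acknowledgements would instead be enabled on the sink that consumes this source. The sketch below assumes a `console` sink with the hypothetical ID `my_sink_id` standing in for your real destination; treat it as illustrative rather than a recommended setup.

```toml
# Source: no source-level acknowledgements setting (deprecated).
[sources.my_source_id]
type = "gcp_pubsub"
project = "vector-123456"
subscription = "vector-123456"
credentials_path = "/path/to/credentials.json"

# Hypothetical sink ID. Enabling acknowledgements here means Pub/Sub
# messages are acknowledged only after this sink has processed them.
[sinks.my_sink_id]
type = "console"
inputs = ["my_source_id"]
encoding.codec = "json"
acknowledgements.enabled = true
```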
api_key
optional string literal
A Google Cloud API key used to authenticate access to the Pub/Sub project. Either `api_key` or `credentials_path` must be set.
credentials_path
common optional string literal
The filename for a Google Cloud service account credentials JSON file used to authenticate access to the Pub/Sub project and topic. If this is unset, Vector checks the `GOOGLE_APPLICATION_CREDENTIALS` environment variable for a filename.
If no filename is set there either, Vector attempts to fetch an instance service account for the compute instance it is running on. If Vector is not running on a GCE instance, you must define a credentials file as described above.
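As a sketch of the fallback described above, the configuration below leaves both `credentials_path` and `api_key` unset. It assumes that `GOOGLE_APPLICATION_CREDENTIALS` is exported in Vector's environment, or that Vector runs on a GCE instance with a service account attached.

```toml
# No credentials_path or api_key: Vector falls back to the
# GOOGLE_APPLICATION_CREDENTIALS environment variable, then to the
# instance service account when running on GCE.
[sources.my_source_id]
type = "gcp_pubsub"
project = "vector-123456"
subscription = "vector-123456"
```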
decoding
optional object
Configures how events are decoded from raw bytes.
decoding.codec
common optional string literal enum, default: `bytes`
Option | Description |
---|---|
bytes | Events containing the byte frame as-is. |
json | Events being parsed from a JSON string. |
native | Events being parsed from Vector’s native protobuf format (EXPERIMENTAL). |
native_json | Events being parsed from Vector’s native JSON format (EXPERIMENTAL). |
syslog | Events being parsed from a Syslog message. |
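For example, if the messages published to the topic are JSON documents, a sketch like the following would parse each message body into structured fields instead of keeping it as raw bytes. It assumes every message is valid JSON.

```toml
[sources.my_source_id]
type = "gcp_pubsub"
project = "vector-123456"
subscription = "vector-123456"
credentials_path = "/path/to/credentials.json"

# Parse each message body as JSON instead of passing the bytes through.
decoding.codec = "json"
```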
endpoint
optional string literal, default: `https://pubsub.googleapis.com`
The endpoint from which to pull data.
framing
optional object
framing.character_delimited
required object (only relevant when `framing.method` = `character_delimited`)
framing.character_delimited.delimiter
required ascii_char
The character that delimits byte sequences.
framing.character_delimited.max_length
optional uint
The maximum length of a frame, in bytes. Frames longer than `max_length` bytes are discarded entirely.
framing.method
common optional string literal enum, default: `bytes`
Option | Description |
---|---|
bytes | Byte frames are passed through as-is according to the underlying I/O boundaries (e.g. split between messages or stream segments). |
character_delimited | Byte frames which are delimited by a chosen character. |
length_delimited | Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length. |
newline_delimited | Byte frames which are delimited by a newline character. |
octet_counting | Byte frames according to the octet counting format. |
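If a single Pub/Sub message may carry several records separated by a character, character-delimited framing could split it into separate events. A sketch, assuming records are separated by `|` and that 65536 bytes is an acceptable frame limit for your data (both values are illustrative):

```toml
[sources.my_source_id]
type = "gcp_pubsub"
project = "vector-123456"
subscription = "vector-123456"
credentials_path = "/path/to/credentials.json"

# Split each message body on "|" into separate frames.
framing.method = "character_delimited"
framing.character_delimited.delimiter = "|"
# Frames longer than 65536 bytes are discarded entirely.
framing.character_delimited.max_length = 65536
```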
framing.newline_delimited
optional object (only relevant when `framing.method` = `newline_delimited`)
framing.newline_delimited.max_length
optional uint
The maximum length of a frame, in bytes. Frames longer than `max_length` bytes are discarded entirely.
framing.octet_counting
optional object (only relevant when `framing.method` = `octet_counting`)
framing.octet_counting.max_length
optional uint
The maximum length of a frame, in bytes. Frames longer than `max_length` bytes are discarded entirely.
full_response_size
optional uint, default: 100
The number of messages in a response that marks a stream as "busy".
This is used to determine whether more streams should be started.
The GCP Pub/Sub servers send responses with 100 or more messages when the subscription is busy.
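keepalive_secs
optional float, default: 60 (seconds)
The amount of time, in seconds, with no received activity before a keepalive request is sent. If this is set larger than `60`, you may see periodic errors sent from the server.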
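The timing options could be set together using the `_secs` names, as in this sketch. The values are illustrative only; `keepalive_secs` is kept below 60 to avoid the server errors mentioned above.

```toml
[sources.my_source_id]
type = "gcp_pubsub"
project = "vector-123456"
subscription = "vector-123456"
credentials_path = "/path/to/credentials.json"

# Messages not acknowledged within 300 seconds may be redelivered.
ack_deadline_secs = 300
# Send a keepalive after 30 seconds without activity (keep this <= 60).
keepalive_secs = 30
# Wait 5 seconds between retry attempts after an error.
retry_delay_secs = 5
```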
max_concurrency
optional uint, default: 5
The maximum number of concurrent stream connections to open at once.
poll_time_seconds
optional float, default: 2 (seconds)
How often to poll the currently active streams to check whether they are all busy and a new stream should be opened.
proxy
optional object
proxy.http
optional string literal
The URL to proxy HTTP requests through.
proxy.https
optional string literal
The URL to proxy HTTPS requests through.
proxy.no_proxy
optional [string]
A list of hosts to avoid proxying. Allowed patterns include:
Pattern | Example match |
---|---|
Domain names | example.com matches requests to example.com |
Wildcard domains | .example.com matches requests to example.com and its subdomains |
IP addresses | 127.0.0.1 matches requests to 127.0.0.1 |
CIDR blocks | 192.168.0.0/16 matches requests to any IP address in this range |
Splat | * matches all hosts |
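A component-level proxy takes precedence over the global proxy environment variables listed under Environment variables. A sketch, assuming a hypothetical proxy at `http://proxy.example.internal:3128` and some internal hosts that should bypass it:

```toml
[sources.my_source_id]
type = "gcp_pubsub"
project = "vector-123456"
subscription = "vector-123456"
credentials_path = "/path/to/credentials.json"

# Hypothetical proxy address; replace with your own.
proxy.http = "http://proxy.example.internal:3128"
proxy.https = "http://proxy.example.internal:3128"
# Bypass the proxy for localhost and the 10.0.0.0/8 range.
proxy.no_proxy = ["localhost", "127.0.0.1", "10.0.0.0/8"]
```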
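retry_delay_seconds
optional float, default: 1 (seconds)
Deprecated; use `retry_delay_secs` instead.
retry_delay_secs
optional float, default: 1 (seconds)
The amount of time, in seconds, to wait between retry attempts after an error.
subscription
required string literal
The subscription within the project from which to pull messages.
tls
optional object
tls.ca_file
optional string literal
Absolute path to an additional CA certificate file.
tls.crt_file
common optional string literal
Absolute path to a certificate file used to identify this connection. If this is set, `key_file` must also be set.
tls.key_file
common optional string literal
Absolute path to a private key file used to identify this connection. If this is set, `crt_file` must also be set.
tls.key_pass
optional string literal
Passphrase used to unlock the encrypted key file. This has no effect unless `key_file` is set.
tls.verify_certificate
optional bool, default: true
If `true` (the default), Vector validates the TLS certificate of the remote host. Specifically, the issuer is checked, but not CRLs (Certificate Revocation Lists).
tls.verify_hostname
optional bool, default: true
If `true` (the default), Vector validates the configured remote host name against the remote host’s TLS certificate. Do NOT set this to `false` unless you understand the risks of not verifying the remote hostname.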
Environment variables
HTTPS_PROXY
common optional string literal
The global URL to proxy HTTPS requests through.
If another HTTPS proxy is set in the configuration file or at a component level, this one is overridden.
The lowercase variant has priority over the uppercase one.
HTTP_PROXY
common optional string literal
The global URL to proxy HTTP requests through.
If another HTTP proxy is set in the configuration file or at a component level, this one is overridden.
The lowercase variant has priority over the uppercase one.
NO_PROXY
common optional string literal
List of hosts to avoid proxying globally. Allowed patterns include:
Pattern | Example match |
---|---|
Domain names | example.com matches requests to example.com |
Wildcard domains | .example.com matches requests to example.com and its subdomains |
IP addresses | 127.0.0.1 matches requests to 127.0.0.1 |
CIDR blocks | 192.168.0.0/16 matches requests to any IP address in this range |
Splat | * matches all hosts |
If another `no_proxy` value is set in the configuration file or at a component level, this one is overridden.
The lowercase variant has priority over the uppercase one.
http_proxy
common optional string literal
The global URL to proxy HTTP requests through.
If another HTTP proxy is set in the configuration file or at a component level, this one is overridden.
The lowercase variant has priority over the uppercase one.
https_proxy
common optional string literal
The global URL to proxy HTTPS requests through.
If another HTTPS proxy is set in the configuration file or at a component level, this one is overridden.
The lowercase variant has priority over the uppercase one.
no_proxy
common optional string literal
List of hosts to avoid proxying globally. Allowed patterns include:
Pattern | Example match |
---|---|
Domain names | example.com matches requests to example.com |
Wildcard domains | .example.com matches requests to example.com and its subdomains |
IP addresses | 127.0.0.1 matches requests to 127.0.0.1 |
CIDR blocks | 192.168.0.0/16 matches requests to any IP address in this range |
Splat | * matches all hosts |
If another `no_proxy` value is set in the configuration file or at a component level, this one is overridden.
The lowercase variant has priority over the uppercase one.
Outputs
<component_id>
Output Data
Logs
Record
Field | Example |
---|---|
attributes | {"key": "value"} |
message | 53.126.150.246 - - [01/Oct/2020:11:25:58 -0400] "GET /disintermediate HTTP/2.0" 401 20308 |
message_id | 2345 |
source_type | gcp_pubsub |
timestamp | 2020-10-10T17:07:36.452332Z |
Telemetry
Metrics
On all of the metrics below, the `component_name` tag is deprecated; use `component_id` instead. The value is the same as `component_id`.
component_received_bytes_total
counter
component_received_event_bytes_total
counter
component_received_events_total
counter
component_sent_event_bytes_total
counter
component_sent_events_total
counter
events_out_total
counter
Deprecated; use `component_sent_events_total` instead.
lag_time_seconds
histogram
How it works
Automatic Concurrency Management
The `gcp_pubsub` source automatically manages the number of concurrent active streams by
monitoring the traffic flowing over the streams.
When a stream receives full responses (as determined by the `full_response_size` setting),
it marks itself as being "busy".
Periodically, the source will poll all the active connections and will start a new stream
if all the active streams are marked as busy and fewer than `max_concurrency` streams are
active.
Conversely, when a stream passes an idle interval (configured by the
`idle_timeout_seconds` setting) with no traffic and no outstanding acknowledgements,
it drops its connection, unless it is the only active stream.
This combination of actions allows this source to respond dynamically to high load levels
without opening up extra connections at startup.
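The settings that drive this behavior are `full_response_size`, `max_concurrency`, and `poll_time_seconds`. A sketch for a high-volume subscription, with illustrative values that allow more streams and check for busy streams more often than the defaults:

```toml
[sources.my_source_id]
type = "gcp_pubsub"
project = "vector-123456"
subscription = "vector-123456"
credentials_path = "/path/to/credentials.json"

# Responses with this many messages mark their stream as busy (default 100).
full_response_size = 100
# Allow up to 10 concurrent streams instead of the default 5.
max_concurrency = 10
# Poll the active streams every second instead of every 2 seconds.
poll_time_seconds = 1
```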
GCP Pub/Sub
The `gcp_pubsub` source streams messages from GCP Pub/Sub, a highly scalable, durable queueing system with at-least-once delivery semantics.
Messages are received in a stream and are acknowledged either immediately after they are received or after they have been fully processed by the sink(s), depending on whether any of the sinks have the `acknowledgements` setting enabled.
Transport Layer Security (TLS)
TLS behavior can be adjusted via the `tls.*` options.
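For example, if outbound TLS traffic passes through an inspecting proxy whose certificate is signed by a private CA, `tls.ca_file` could point Vector at that CA. A sketch, assuming the hypothetical path `/etc/vector/corp-ca.pem`; certificate and hostname verification stay enabled:

```toml
[sources.my_source_id]
type = "gcp_pubsub"
project = "vector-123456"
subscription = "vector-123456"
credentials_path = "/path/to/credentials.json"

# Hypothetical CA file for a TLS-inspecting proxy.
tls.ca_file = "/etc/vector/corp-ca.pem"
# Both default to true; set explicitly here only to show the intent.
tls.verify_certificate = true
tls.verify_hostname = true
```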