gcp_spanner_cdc

Creates an input that consumes from a Spanner change stream.

Introduced in version 4.56.0.

# Common configuration fields, showing default values
input:
  label: ""
  gcp_spanner_cdc:
    credentials_json: ""
    project_id: "" # No default (required)
    instance_id: "" # No default (required)
    database_id: "" # No default (required)
    stream_id: "" # No default (required)
    start_timestamp: ""
    end_timestamp: ""
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
    auto_replay_nacks: true

# Advanced configuration fields, showing default values
input:
  label: ""
  gcp_spanner_cdc:
    credentials_json: ""
    project_id: "" # No default (required)
    instance_id: "" # No default (required)
    database_id: "" # No default (required)
    stream_id: "" # No default (required)
    start_timestamp: ""
    end_timestamp: ""
    heartbeat_interval: 10s
    metadata_table: ""
    min_watermark_cache_ttl: 5s
    allowed_mod_types: [] # No default (optional)
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
    auto_replay_nacks: true

Consumes change records from a Google Cloud Spanner change stream. This input lets you track and process database changes in real time, which is useful for data replication, event-driven architectures, and maintaining derived data stores.

The input reads from a specified change stream within a Spanner database and converts each change record into a message. The message payload contains the change records in JSON format, and metadata is added with details about the Spanner instance, database, and stream.

Change streams provide a way to track mutations to your Spanner database tables. For more information about Spanner change streams, refer to the Google Cloud documentation.
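
As a rough illustration of how the required fields fit together, the sketch below reads from a change stream and prints each change record to stdout. All IDs (my-project, my-instance, my-database, my_stream) are hypothetical placeholders, and the stdout output is chosen only for demonstration. Because credentials_json is omitted, this sketch assumes Application Default Credentials are available.

# Example (hypothetical IDs): print change records to stdout
input:
  gcp_spanner_cdc:
    project_id: my-project     # placeholder
    instance_id: my-instance   # placeholder
    database_id: my-database   # placeholder
    stream_id: my_stream       # placeholder

output:
  stdout: {}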

Fields

allowed_mod_types[]

List of modification types to process. If not specified, all modification types are processed. Allowed values: INSERT, UPDATE, and DELETE.

Type: array

# Examples:
allowed_mod_types:
  - INSERT
  - UPDATE
  - DELETE
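
To process only a subset of changes, list just the types you need. A minimal sketch that skips updates:

# Example: process inserts and deletes only
allowed_mod_types:
  - INSERT
  - DELETE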

auto_replay_nacks

Whether to automatically replay messages that are rejected (nacked) at the output level. If the cause of rejections is persistent, leaving this option enabled can result in back pressure.

Set auto_replay_nacks to false to delete rejected messages. Disabling auto replays can greatly improve memory efficiency of high throughput streams, as the original shape of the data is discarded immediately upon consumption and mutation.

Type: bool

Default: true
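
For a high-throughput stream where dropping rejected messages is acceptable, the option can be switched off. A minimal sketch:

# Example: delete rejected messages instead of replaying them
auto_replay_nacks: false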

batching

Allows you to configure a batching policy.

Type: object

# Examples:
batching:
  byte_size: 5000
  count: 0
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m

batching.byte_size

The maximum total size (in bytes) that a batch can reach before it is passed on for processing or delivery (flushed). When the combined size of all messages in the batch exceeds this limit, the batch is immediately sent to the next stage (such as a processor or output).

Set to 0 to disable size-based batching. When disabled, messages are flushed based on other conditions (such as batching.count or batching.period).

Type: int

Default: 0

batching.check

A Bloblang query that returns a boolean value indicating whether a message should end a batch.

Type: string

Default: ""

# Examples:
check: this.type == "end_of_transaction"

batching.count

The number of messages at which the batch should be flushed. Set the value to 0 to disable count-based batching.

Type: int

Default: 0

batching.period

The length of time after which an incomplete batch should be flushed regardless of its size. Supported time units are ns, us, ms, s, m, and h. For example, 1s flushes a batch after one second.

Type: string

Default: ""

# Examples:
period: 1s

period: 1m

period: 500ms

batching.processors[]

A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. All resulting messages are flushed as a single batch, so any attempt to split it into smaller batches with these processors will be ignored.

Type: processor

# Examples:
processors:
  - archive:
      format: concatenate

  - archive:
      format: lines

  - archive:
      format: json_array
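
To show how the batching fields combine, the sketch below flushes a batch once it holds 100 messages or once 5 seconds have passed, whichever happens first, and then archives the batch into a single JSON array message. The thresholds are illustrative values, not recommendations.

# Example: count- and period-based batching with an archive processor
batching:
  count: 100   # illustrative threshold
  period: 5s   # illustrative threshold
  processors:
    - archive:
        format: json_array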

credentials_json

Base64-encoded JSON credentials file for authenticating to GCP with a service account. If not provided, Application Default Credentials (ADC) are used.

For more information about how to create a service account and obtain the credentials JSON, see the Google Cloud documentation.

Type: string

Default: ""
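
To keep the key out of the configuration file, one option is to supply the encoded credentials through environment variable interpolation. The sketch below assumes a hypothetical variable named GCP_CREDENTIALS_B64 that already holds the Base64-encoded contents of the service account JSON file.

# Example: read Base64-encoded credentials from an environment variable.
# GCP_CREDENTIALS_B64 is a hypothetical name. One way to set it in a shell:
#   export GCP_CREDENTIALS_B64="$(base64 < service-account.json | tr -d '\n')"
credentials_json: "${GCP_CREDENTIALS_B64}"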

database_id

The ID of the Spanner database to read from. This is the name of the database as it appears in the Spanner console or API.

For more information about how to create a Spanner database, see the Google Cloud documentation.

Type: string

end_timestamp

The timestamp at which to stop reading change records from the change stream. This is an optional field that allows you to limit the range of change records processed by the input.

The timestamp should be in RFC3339 format, such as 2023-10-01T00:00:00Z. If not provided, the input reads all available change records up to the current time.

Type: string

Default: ""

# Examples:
end_timestamp: 2022-01-01T00:00:00Z
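
Combined with start_timestamp, this field bounds the input to a fixed window. The sketch below uses illustrative timestamps covering one day. The values are quoted here so that they are read as plain strings.

# Example: process only changes from a one-day window (illustrative timestamps)
start_timestamp: "2023-10-01T00:00:00Z"
end_timestamp: "2023-10-02T00:00:00Z"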

heartbeat_interval

The interval at which to send heartbeat messages to the output. Heartbeat messages are sent to indicate that the input is still active and processing changes. This can help prevent timeouts in downstream systems.

Supported time units are ns, us, ms, s, m, and h. For example, 1s sends a heartbeat every second.

Type: string

Default: 10s

instance_id

The ID of the Spanner instance to read from. This is the name of the instance as it appears in the Spanner console or API.

For more information about how to create a Spanner instance, see the Google Cloud documentation.

Type: string

metadata_table

The name of the table in which to store metadata. If left empty, the table name defaults to cdc_metadata_<stream_id>.

Type: string

Default: ""
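
A minimal sketch that overrides the default table name. The name my_cdc_metadata is a hypothetical placeholder.

# Example: store metadata in a custom table (hypothetical name)
metadata_table: my_cdc_metadata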

min_watermark_cache_ttl

Sets how frequently to query Spanner for the minimum watermark.

Type: string

Default: 5s

project_id

The ID of the GCP project that contains the Spanner instance and database. This is the name of the project as it appears in the GCP console or API.

For more information about how to create a GCP project, see the Google Cloud documentation.

Type: string

start_timestamp

The timestamp at which to start reading change records from the change stream. This is an optional field that allows you to limit the range of change records processed by the input.

The timestamp should be in RFC3339 format, such as 2023-10-01T00:00:00Z. If not provided, the input starts reading from the current time.

Type: string

Default: ""

# Examples:
start_timestamp: 2022-01-01T00:00:00Z

stream_id

The name of the change stream to track. The stream must exist in the Spanner database. To create a change stream, follow the Google Cloud documentation.

Type: string