sql_raw

Runs an arbitrary SQL query against a database and (optionally) returns the result as an array of objects, one for each row returned.

# Common configuration fields, showing default values
label: ""
sql_raw:
  driver: "" # No default (required)
  dsn: "clickhouse://username:password@host1:9000,host2:9000/database?dial_timeout=200ms&max_execution_time=60" # No default (required)
  query: INSERT INTO mytable (column1, column2, column3) VALUES (?, ?, ?); # No default (optional)
  args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ] # No default (optional)
  exec_only: false # No default (optional)
  queries: [] # No default (optional)

# All configuration fields, showing default values
label: ""
sql_raw:
  driver: "" # No default (required)
  dsn: "clickhouse://username:password@host1:9000,host2:9000/database?dial_timeout=200ms&max_execution_time=60" # No default (required)
  query: INSERT INTO mytable (column1, column2, column3) VALUES (?, ?, ?); # No default (optional)
  unsafe_dynamic_query: false
  args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ] # No default (optional)
  exec_only: false # No default (optional)
  queries: [] # No default (optional)
  init_files: [] # No default (optional)
  init_statement: | # No default (optional)
    CREATE TABLE IF NOT EXISTS some_table (
      foo varchar(50) not null,
      bar integer,
      baz varchar(50),
      primary key (foo)
    ) WITHOUT ROWID;
  conn_max_idle_time: "" # No default (optional)
  conn_max_life_time: "" # No default (optional)
  conn_max_idle: 2
  conn_max_open: 0 # No default (optional)

If the query fails to execute then the message will remain unchanged and the error can be caught using error handling methods.
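
As a minimal sketch of that error handling, the fragment below follows sql_raw with a catch processor that logs the failure. It assumes the catch and log processors and the error() interpolation function are available in your build; the DSN, table name, and log message are illustrative only.

```yaml
pipeline:
  processors:
    - sql_raw:
        driver: postgres
        dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
        query: "SELECT * FROM footable WHERE user_id = $1;"
        args_mapping: root = [ this.user.id ]
    # Runs only for messages that failed a previous processor.
    - catch:
        - log:
            level: ERROR
            message: "sql_raw failed: ${! error() }"
```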

For some scenarios where you might use this processor, see Examples.

Fields

args_mapping

An optional Bloblang mapping that includes the same number of values in an array as the placeholder arguments in the query field.

Type: string

# Examples:
args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ]
args_mapping: root = [ meta("user.id") ]

conn_max_idle

An optional maximum number of connections in the idle connection pool. If conn_max_open is greater than 0 but less than the new conn_max_idle, then the new conn_max_idle will be reduced to match the conn_max_open limit. If the value is <= 0, no idle connections are retained. The default is currently 2 and may change in a future release.

Type: int

Default: 2

conn_max_idle_time

An optional maximum amount of time a connection may be idle. Expired connections may be closed lazily before reuse. If the value is <= 0, connections are not closed due to a connection's idle time.

Type: string

conn_max_life_time

An optional maximum amount of time a connection may be reused. Expired connections may be closed lazily before reuse. If the value is <= 0, connections are not closed due to a connection's age.

Type: string

conn_max_open

An optional maximum number of open connections to the database. If conn_max_idle is greater than 0 and the new conn_max_open is less than conn_max_idle, then conn_max_idle will be reduced to match the new conn_max_open limit. If the value is <= 0, there is no limit on the number of open connections. The default is 0 (unlimited).

Type: int
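
The four connection pool fields can be combined as in the sketch below. The numbers are arbitrary, and the duration strings (5m, 1h) are an assumption about the accepted format, since this page only states that those fields are strings.

```yaml
sql_raw:
  driver: postgres
  dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
  query: "SELECT * FROM footable WHERE user_id = $1;"
  args_mapping: root = [ this.user.id ]
  conn_max_open: 10       # at most 10 connections open at once
  conn_max_idle: 5        # keep up to 5 idle connections for reuse
  conn_max_idle_time: 5m  # assumed duration-string format
  conn_max_life_time: 1h  # assumed duration-string format
```

Keeping conn_max_idle at or below conn_max_open avoids the automatic reduction described above.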

driver

A database driver to use.

Type: string

Options: mysql, postgres, clickhouse, mssql, sqlite, oracle, snowflake, trino, gocosmos, spanner

dsn

A Data Source Name to identify the target database.

Type: string

# Examples:
dsn: clickhouse://username:password@host1:9000,host2:9000/database?dial_timeout=200ms&max_execution_time=60
dsn: foouser:foopassword@tcp(localhost:3306)/foodb
dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
dsn: oracle://foouser:foopass@localhost:1521/service_name

exec_only

Whether to discard the query result. Set to true to leave the message contents unchanged, which is useful when you are executing inserts, updates, and so on. By default, the message contents are kept for the last query executed, and previous queries don’t change the results.

Type: bool

init_files[]

An optional list of file paths containing SQL statements to execute immediately upon the first connection to the target database. This is a useful way to initialise tables before processing data. Glob patterns are supported, including super globs (double star).

Care should be taken to ensure that the statements are idempotent, and therefore would not cause issues when run multiple times after service restarts. If both init_statement and init_files are specified the init_statement is executed after the init_files.

If a statement fails for any reason a warning log will be emitted but the operation of this component will not be stopped.

Type: array

# Examples:
init_files:
  - ./init/*.sql

init_files:
  - ./foo.sql
  - ./bar.sql

init_statement

An optional SQL statement to execute immediately upon the first connection to the target database. This is a useful way to initialise tables before processing data. Care should be taken to ensure that the statement is idempotent, and therefore would not cause issues when run multiple times after service restarts.

If both init_statement and init_files are specified the init_statement is executed after the init_files.

If the statement fails for any reason a warning log will be emitted but the operation of this component will not be stopped.

Type: string

# Examples:
init_statement: |-
  CREATE TABLE IF NOT EXISTS some_table (
    foo varchar(50) not null,
    bar integer,
    baz varchar(50),
    primary key (foo)
  ) WITHOUT ROWID;
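
To illustrate the ordering when both fields are set, here is a hedged sketch. The database file path and the contents of ./init/*.sql are hypothetical; the comments only restate the execution order described above.

```yaml
sql_raw:
  driver: sqlite
  dsn: "file:./foo.db" # hypothetical database file
  # Executed first, on the first connection.
  init_files:
    - ./init/*.sql
  # Executed after the init_files.
  init_statement: |
    CREATE TABLE IF NOT EXISTS some_table (
      foo varchar(50) not null,
      bar integer,
      baz varchar(50),
      primary key (foo)
    ) WITHOUT ROWID;
  query: "INSERT INTO some_table (foo, bar, baz) VALUES (?, ?, ?);"
  args_mapping: root = [ this.foo, this.bar, this.baz ]
  exec_only: true
```

Because both the files and the statement may run again after a restart, each should be idempotent, as with CREATE TABLE IF NOT EXISTS here.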

queries[]

A list of database statements to run in addition to your main query. If you specify multiple queries, they are executed within a single transaction. For more information, see Examples.

Type: object

queries[].args_mapping

An optional Bloblang mapping which should evaluate to an array of values matching in size to the number of placeholder arguments in the field query.

Type: string

# Examples:
args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ]
args_mapping: root = [ meta("user.id") ]

queries[].exec_only

Whether the query result should be discarded. When set to true the message contents will remain unchanged, which is useful in cases where you are executing inserts, updates, etc. By default this is true for the last query, and previous queries don’t change the results. If set to true for any query but the last one, the subsequent args_mappings input is overwritten.

Type: bool

queries[].query

The query to execute. The placeholder style depends on the driver: some drivers require question marks (?), whereas others expect incrementing dollar signs ($1, $2, and so on) or colons (:1, :2, and so on). The style for each driver is outlined in this table:

| Driver | Placeholder Style |
|---|---|
| clickhouse | Dollar sign |
| mysql | Question mark |
| postgres | Dollar sign |
| mssql | Question mark |
| sqlite | Question mark |
| oracle | Colon |
| snowflake | Question mark |
| trino | Question mark |
| gocosmos | Colon |

Type: string

query

The query to execute.

You must include the correct placeholders for the specified database driver. Some drivers use question marks (?), whereas others expect incrementing dollar signs ($1, $2, and so on) or colons (:1, :2, and so on).

| Driver | Placeholder Style |
|---|---|
| clickhouse | Dollar sign ($) |
| gocosmos | Colon (:) |
| mysql | Question mark (?) |
| mssql | Question mark (?) |
| oracle | Colon (:) |
| postgres | Dollar sign ($) |
| snowflake | Question mark (?) |
| spanner | Question mark (?) |
| sqlite | Question mark (?) |
| trino | Question mark (?) |

Type: string

# Examples:
query: INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?);
query: SELECT * FROM footable WHERE user_id = $1;
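
For comparison, the sketch below writes one two-argument query in each of the three placeholder styles. Each YAML document shows only the relevant fields of a sql_raw block, and the status column and message field are hypothetical. In every case the values in the args_mapping array are bound to the placeholders in order.

```yaml
# Question mark style (for example, mysql)
driver: mysql
query: "SELECT * FROM footable WHERE user_id = ? AND status = ?"
args_mapping: root = [ this.user.id, this.status ]
---
# Dollar sign style (for example, postgres)
driver: postgres
query: "SELECT * FROM footable WHERE user_id = $1 AND status = $2"
args_mapping: root = [ this.user.id, this.status ]
---
# Colon style (for example, oracle)
driver: oracle
query: "SELECT * FROM footable WHERE user_id = :1 AND status = :2"
args_mapping: root = [ this.user.id, this.status ]
```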

unsafe_dynamic_query

Whether to enable interpolation functions in the query. Great care should be taken to ensure your queries are defended against injection attacks.

Type: bool

Default: false

Examples

Table Insert (MySQL)

The following example inserts rows into the table footable with the columns foo, bar and baz populated with values extracted from messages.

pipeline:
  processors:
    - sql_raw:
        driver: mysql
        dsn: foouser:foopassword@tcp(localhost:3306)/foodb
        query: "INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?);"
        args_mapping: 'root = [ this.document.foo, this.document.bar, meta("kafka_topic") ]'
        exec_only: true

Table Query (PostgreSQL)

Here we query a database for rows of footable that share a user_id with the message field user.id. A branch processor is used in order to insert the resulting array into the original message at the path foo_rows.

pipeline:
  processors:
    - branch:
        processors:
          - sql_raw:
              driver: postgres
              dsn: postgres://foouser:foopass@localhost:5432/testdb?sslmode=disable
              query: "SELECT * FROM footable WHERE user_id = $1;"
              args_mapping: '[ this.user.id ]'
        result_map: 'root.foo_rows = this'

Dynamically Creating Tables (PostgreSQL)

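Here we create the table named by the table_name metadata field if it does not already exist, then insert (or update) a record in it. Because both statements are listed under queries, they run within a single transaction. The mapping processor first quotes the table name, since unsafe_dynamic_query interpolates it directly into the query text.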

pipeline:
  processors:
    - mapping: |
        root = this
        # Prevent SQL injection when using unsafe_dynamic_query
        meta table_name = "\"" + metadata("table_name").replace_all("\"", "\"\"") + "\""
    - sql_raw:
        driver: postgres
        dsn: postgres://localhost/postgres
        unsafe_dynamic_query: true
        queries:
          - query: |
              CREATE TABLE IF NOT EXISTS ${!metadata("table_name")} (id varchar primary key, document jsonb);
          - query: |
              INSERT INTO ${!metadata("table_name")} (id, document) VALUES ($1, $2)
              ON CONFLICT (id) DO UPDATE SET document = EXCLUDED.document;
            args_mapping: |
              root = [ this.id, this.document.string() ]