sql_raw

Type: Output
Available in: Cloud, Self-Managed

Executes an arbitrary SQL query for each message.

Common

    # Common configuration fields, showing default values
    output:
      label: ""
      sql_raw:
        driver: "" # No default (required)
        dsn: "clickhouse://username:password@host1:9000,host2:9000/database?dial_timeout=200ms&max_execution_time=60" # No default (required)
        query: INSERT INTO mytable (column1, column2, column3) VALUES (?, ?, ?); # No default (optional)
        args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ] # No default (optional)
        queries: [] # No default (optional)
        max_in_flight: 64
        batching:
          count: 0
          byte_size: 0
          period: ""
          check: ""

Advanced

    # All configuration fields, showing default values
    output:
      label: ""
      sql_raw:
        driver: "" # No default (required)
        dsn: "clickhouse://username:password@host1:9000,host2:9000/database?dial_timeout=200ms&max_execution_time=60" # No default (required)
        query: INSERT INTO mytable (column1, column2, column3) VALUES (?, ?, ?); # No default (optional)
        unsafe_dynamic_query: false
        args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ] # No default (optional)
        queries: [] # No default (optional)
        max_in_flight: 64
        init_files: [] # No default (optional)
        init_statement: | # No default (optional)
          CREATE TABLE IF NOT EXISTS some_table (
            column1 varchar(50) not null,
            column2 integer,
            column3 varchar(50),
            primary key (column1)
          ) WITHOUT ROWID;
        conn_max_idle_time: "" # No default (optional)
        conn_max_life_time: "" # No default (optional)
        conn_max_idle: 2
        conn_max_open: 0 # No default (optional)
        batching:
          count: 0
          byte_size: 0
          period: ""
          check: ""
          processors: [] # No default (optional)

For some scenarios where you might use this output, see Examples.

Fields

args_mapping

An optional Bloblang mapping that includes the same number of values in an array as the placeholder arguments in the query field.
Type: string

    # Examples:
    args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ]

    args_mapping: root = [ meta("user.id") ]

batching

Allows you to configure a batching policy.

Type: object

    # Examples:
    batching:
      byte_size: 5000
      count: 0
      period: 1s

    batching:
      count: 10
      period: 1s

    batching:
      check: this.contains("END BATCH")
      count: 0
      period: 1m

batching.byte_size

The number of bytes at which the batch should be flushed. If 0, size-based batching is disabled.

Type: int
Default: 0

batching.check

A Bloblang query that should return a boolean value indicating whether a message should end a batch.

Type: string
Default: ""

    # Examples:
    check: this.type == "end_of_transaction"

batching.count

The number of messages at which the batch should be flushed. If 0, count-based batching is disabled.

Type: int
Default: 0

batching.period

A period after which an incomplete batch should be flushed regardless of its size.

Type: string
Default: ""

    # Examples:
    period: 1s

    period: 1m

    period: 500ms

batching.processors[]

A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Note that all resulting messages are flushed as a single batch, so splitting the batch into smaller batches using these processors is a no-op.

Type: processor

    # Examples:
    processors:
      - archive:
          format: concatenate
      - archive:
          format: lines
      - archive:
          format: json_array

conn_max_idle

An optional maximum number of connections in the idle connection pool. If conn_max_open is greater than 0 but less than the new conn_max_idle, then the new conn_max_idle will be reduced to match the conn_max_open limit. If value <= 0, no idle connections are retained. The default max idle connections is currently 2. This may change in a future release.

Type: int
Default: 2

conn_max_idle_time

An optional maximum amount of time a connection may be idle. Expired connections may be closed lazily before reuse.
If value <= 0, connections are not closed due to a connection's idle time.

Type: string

conn_max_life_time

An optional maximum amount of time a connection may be reused. Expired connections may be closed lazily before reuse. If value <= 0, connections are not closed due to a connection's age.

Type: string

conn_max_open

An optional maximum number of open connections to the database. If conn_max_idle is greater than 0 and the new conn_max_open is less than conn_max_idle, then conn_max_idle will be reduced to match the new conn_max_open limit. If value <= 0, then there is no limit on the number of open connections. The default is 0 (unlimited).

Type: int

driver

A database driver to use.

Type: string
Options: mysql, postgres, clickhouse, mssql, sqlite, oracle, snowflake, trino, gocosmos, spanner

dsn

A Data Source Name to identify the target database.

Type: string

    # Examples:
    dsn: clickhouse://username:password@host1:9000,host2:9000/database?dial_timeout=200ms&max_execution_time=60

    dsn: foouser:foopassword@tcp(localhost:3306)/foodb

    dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable

    dsn: oracle://foouser:foopass@localhost:1521/service_name

init_files[]

An optional list of file paths containing SQL statements to execute immediately upon the first connection to the target database. This is a useful way to initialise tables before processing data. Glob patterns are supported, including super globs (double star).

Take care to ensure that the statements are idempotent, so that they do not cause issues when run multiple times after service restarts.

If both init_statement and init_files are specified, the init_statement is executed after the init_files.

If a statement fails for any reason, a warning log is emitted but the operation of this component is not stopped.
Type: array

    # Examples:
    init_files:
      - ./init/*.sql
      - ./foo.sql
      - ./bar.sql

init_statement

An optional SQL statement to execute immediately upon the first connection to the target database. This is a useful way to initialise tables before processing data.

Take care to ensure that the statement is idempotent, so that it does not cause issues when run multiple times after service restarts.

If both init_statement and init_files are specified, the init_statement is executed after the init_files.

If the statement fails for any reason, a warning log is emitted but the operation of this component is not stopped.

Type: string

    # Examples:
    init_statement: |-
      CREATE TABLE IF NOT EXISTS some_table (
        foo varchar(50) not null,
        bar integer,
        baz varchar(50),
        primary key (foo)
      ) WITHOUT ROWID;

max_in_flight

The maximum number of database statements to execute in parallel.

Type: int
Default: 64

queries[]

A list of database statements to run in addition to your main query. If you specify multiple queries, they are executed within a single transaction. For more information, see Examples.

Type: object

queries[].args_mapping

An optional Bloblang mapping that should evaluate to an array of values matching in size the number of placeholder arguments in the query field.

Type: string

    # Examples:
    args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ]

    args_mapping: root = [ meta("user.id") ]

queries[].query

The query to execute. The style of placeholder to use depends on the driver: some drivers require question marks (?), whereas others expect incrementing dollar signs ($1, $2, and so on) or colons (:1, :2, and so on).
The style to use is outlined in this table:

| Driver | Placeholder Style |
|---|---|
| clickhouse | Dollar sign |
| mysql | Question mark |
| postgres | Dollar sign |
| mssql | Question mark |
| sqlite | Question mark |
| oracle | Colon |
| snowflake | Question mark |
| trino | Question mark |
| gocosmos | Colon |

Type: string

query

The query to execute. You must include the correct placeholders for the specified database driver. Some drivers use question marks (?), whereas others expect incrementing dollar signs ($1, $2, and so on) or colons (:1, :2, and so on).

| Driver | Placeholder Style |
|---|---|
| clickhouse | Dollar sign ($) |
| gocosmos | Colon (:) |
| mysql | Question mark (?) |
| mssql | Question mark (?) |
| oracle | Colon (:) |
| postgres | Dollar sign ($) |
| snowflake | Question mark (?) |
| spanner | Question mark (?) |
| sqlite | Question mark (?) |
| trino | Question mark (?) |

Type: string

    # Examples:
    query: INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?);

unsafe_dynamic_query

Whether to enable interpolation functions in the query. Great care should be taken to ensure your queries are defended against injection attacks.

Type: bool
Default: false

Examples

Table Insert (MySQL)

Here we insert rows into a database by populating the columns id, name and topic with values extracted from messages and metadata:

    output:
      sql_raw:
        driver: mysql
        dsn: foouser:foopassword@tcp(localhost:3306)/foodb
        query: "INSERT INTO footable (id, name, topic) VALUES (?, ?, ?);"
        args_mapping: |
          root = [
            this.user.id,
            this.user.name,
            meta("kafka_topic"),
          ]

Dynamically Creating Tables (PostgreSQL)

Here we dynamically create an output table in the same transaction that inserts a record into the newly created table.
    output:
      processors:
        - mapping: |
            root = this
            # Prevent SQL injection when using unsafe_dynamic_query
            meta table_name = "\"" + metadata("table_name").replace_all("\"", "\"\"") + "\""
      sql_raw:
        driver: postgres
        dsn: postgres://localhost/postgres
        unsafe_dynamic_query: true
        queries:
          - query: |
              CREATE TABLE IF NOT EXISTS ${!metadata("table_name")} (id varchar primary key, document jsonb);
          - query: |
              INSERT INTO ${!metadata("table_name")} (id, document) VALUES ($1, $2)
              ON CONFLICT (id) DO UPDATE SET document = EXCLUDED.document;
            args_mapping: |
              root = [ this.id, this.document.string() ]
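
Multiple Statements with Batching and Pool Limits (MySQL)

The fields described above can be combined in one output. The following is a minimal sketch rather than one of the component's reference examples: the tables orders and order_audit, the message fields this.order.id, this.order.total and this.source, and every numeric value are hypothetical and chosen only for illustration. It assumes the mysql driver, so it uses question mark placeholders, and it relies on the documented behaviour that multiple entries in queries are executed within a single transaction.

```yaml
output:
  sql_raw:
    driver: mysql
    dsn: foouser:foopassword@tcp(localhost:3306)/foodb
    # Illustrative limits only, not recommendations.
    max_in_flight: 16
    conn_max_open: 10
    conn_max_idle: 2
    queries:
      # Hypothetical table: orders
      - query: "INSERT INTO orders (id, total) VALUES (?, ?);"
        args_mapping: |
          root = [ this.order.id, this.order.total ]
      # Hypothetical table: order_audit
      - query: "INSERT INTO order_audit (order_id, source) VALUES (?, ?);"
        args_mapping: |
          root = [ this.order.id, this.source ]
    batching:
      # Flush after 100 messages or after 1s, whichever comes first.
      count: 100
      period: 1s
```

Each args_mapping array has the same number of values as the placeholders in its query. Because conn_max_idle (2) is below conn_max_open (10), neither value is adjusted by the reduction rules described in the conn_max_idle and conn_max_open fields.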