cypher

Writes a batch of messages to any graph database that supports the Neo4j and Bolt URI schemes. For each incoming message, the connector can perform operations to store or delete data within the database using the Cypher query language.

Introduced in version 4.37.0.

  • Common

  • Advanced

# Common configuration fields, showing default values
output:
  label: ""
  cypher:
    uri: neo4j://demo.neo4jlabs.com # No default (required)
    cypher: 'MERGE (p:Person {name: $name})' # No default (required)
    database_name: ""
    args_mapping: root.name = this.displayName # No default (optional)
    basic_auth:
      enabled: false
      username: ""
      password: ""
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
    max_in_flight: 64
# All configuration fields, showing default values
output:
  label: ""
  cypher:
    uri: neo4j://demo.neo4jlabs.com # No default (required)
    cypher: 'MERGE (p:Person {name: $name})' # No default (required)
    database_name: ""
    args_mapping: root.name = this.displayName # No default (optional)
    basic_auth:
      enabled: false
      username: ""
      password: ""
      realm: ""
    tls:
      skip_cert_verify: false
      enable_renegotiation: false
      root_cas: ""
      root_cas_file: ""
      client_certs: []
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
    max_in_flight: 64

Examples

Write to Neo4j Aura

This is an example of how to write to Neo4j Aura

output:
  cypher:
    uri: neo4j+s://example.databases.neo4j.io
    cypher: |
      MERGE (product:Product {id: $id})
        ON CREATE SET product.name = $product,
                       product.title = $title,
                       product.description = $description,
    args_mapping: |
      root = {}
      root.id = this.product.id
      root.product = this.product.summary.name
      root.title = this.product.summary.displayName
      root.description = this.product.fullDescription
    basic_auth:
      enabled: true
      username: "${NEO4J_USER}"
      password: "${NEO4J_PASSWORD}"

Fields

args_mapping

Mappings from incoming messages to the data, which are passed into the cypher expression as parameters. All mappings must be objects. By default, this field processes the entire payload.

Type: string

# Examples:
args_mapping: root.name = this.displayName
args_mapping: root = {"orgId": this.org.id, "name": this.user.name}

basic_auth

Configure basic authentication for requests to your graphing database.

Type: object

basic_auth.enabled

Whether to use basic authentication in requests.

Type: bool

Default: false

basic_auth.password

The password of the account credentials to authenticate with.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string

Default: ""

basic_auth.realm

The realm or process for authentication challenges.

Type: string

Default: ""

basic_auth.username

The username of the account credentials to authenticate as.

Type: string

Default: ""

batching

Configure a batching policy.

Type: object

# Examples:
batching:
  byte_size: 5000
  count: 0
  period: 1s
batching:
  count: 10
  period: 1s
batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m

batching.byte_size

The amount of bytes at which the batch is flushed. Set to 0 to disable size-based batching.

Type: int

Default: 0

batching.check

A Bloblang query that returns a boolean value indicating whether a message should end a batch.

Type: string

Default: ""

# Examples:
check: this.type == "end_of_transaction"

batching.count

The number of messages after which the batch is flushed. Set to 0 to disable count-based batching.

Type: int

Default: 0

batching.period

The period of time after which an incomplete batch is flushed regardless of its size.

Type: string

Default: ""

# Examples:
period: 1s
period: 1m
period: 500ms

batching.processors[]

For aggregating and archiving message batches, you can add a list of processors to apply to a batch as it is flushed. All resulting messages are flushed as a single batch even when you configure processors to split the batch into smaller batches.

Type: processor

# Examples:
processors:
  - archive:
      format: concatenate

  - archive:
      format: lines

  - archive:
      format: json_array

cypher

The cypher expression to execute against the graph database.

Type: string

# Examples:
cypher: MERGE (p:Person {name: $name})
cypher: |-
  MATCH (o:Organization {id: $orgId})
  MATCH (p:Person {name: $name})
  MERGE (p)-[:WORKS_FOR]->(o)

database_name

Set the target database against which expressions are evaluated.

Type: string

Default: ""

max_in_flight

The maximum number of message batches to have in flight at a given time. Increase this value to improve throughput.

Type: int

Default: 64

tls

Override system defaults with custom TLS settings.

Type: object

tls.client_certs[]

A list of client certificates to use. For each certificate specify values for either the cert and key fields, or cert_file and key_file fields.

Type: object

Default: []

# Examples:
client_certs:
  - cert: foo
    key: bar

  - cert_file: ./example.pem
    key_file: ./example.key

tls.client_certs[].cert

A plain text certificate to use.

Type: string

Default: ""

tls.client_certs[].cert_file

The path of a certificate to use.

Type: string

Default: ""

tls.client_certs[].key

A plain text certificate key to use.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string

Default: ""

tls.client_certs[].key_file

The path of a certificate key to use.

Type: string

Default: ""

tls.client_certs[].password

A plain text password for when the private key is password encrypted in PKCS#1 or PKCS#8 format. The obsolete pbeWithMD5AndDES-CBC algorithm is not supported for the PKCS#8 format.

Because the obsolete pbeWithMD5AndDES-CBC algorithm does not authenticate the ciphertext, it is vulnerable to padding oracle attacks that can let an attacker recover the plaintext.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string

Default: ""

# Examples:
password: foo
password: ${KEY_PASSWORD}

tls.enable_renegotiation

Whether to allow the remote server to request renegotiation. Enable this option if you’re seeing the error message local error: tls: no renegotiation.

Requires version 3.45.0 or later.

Type: bool

Default: false

tls.root_cas

Specify a certificate authority to use (optional). This is a string that represents a certificate chain from the parent trusted root certificate, through possible intermediate signing certificates, to the host certificate.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string

Default: ""

# Examples:
root_cas: |-
  -----BEGIN CERTIFICATE-----
  ...
  -----END CERTIFICATE-----

tls.root_cas_file

Specify the path to a root certificate authority file (optional). This is a file, often with a .pem extension, which contains a certificate chain from the parent trusted root certificate, through possible intermediate signing certificates, to the host certificate.certificate.

Type: string

Default: ""

# Examples:
root_cas_file: ./root_cas.pem

tls.skip_cert_verify

Whether to skip server-side certificate verification.

Type: bool

Default: false

uri

The connection URI for your graphing database. For more information, see Neo4j’s documentation.

Type: string

# Examples:
uri: neo4j://demo.neo4jlabs.com
uri: neo4j+s://aura.databases.neo4j.io
uri: neo4j+ssc://self-signed.demo.neo4jlabs.com
uri: bolt://127.0.0.1:7687
uri: bolt+s://core.db.server:7687
uri: bolt+ssc://10.0.0.43