Synchronous Responses

In a regular Redpanda Connect pipeline, messages flow in one direction and acknowledgements in the other:

    ----------- Message ------------->

Input (AMQP) -> Processors -> Output (AMQP)

    <------- Acknowledgement ---------

However, Redpanda Connect supports bidirectional protocols like HTTP and WebSocket, which allow responses to be returned directly from the pipeline.

For example, HTTP is a request/response protocol, and inputs like http_server (Self-Managed) or gateway (Redpanda Cloud) support returning response payloads to the requester.

           --------- Request Body -------->

Input (HTTP) -> Processors -> Output (Sync Response)

           <--- Response Body (and ack) ---

Routing processed messages back

To return a processed response, use the sync_response output.

Use the http_server input in Self-Managed deployments:

input:
  http_server:
    path: /weather

pipeline:
  processors:
    - mapping: |
        root = {
          "city": json("location"),
          "forecast": "Clear skies with light winds",
          "temperature_c": 22
        }

output:
  sync_response: {}

Sending this request:

{ "location": "Berlin" }

Returns:

{
  "city": "Berlin",
  "forecast": "Clear skies with light winds",
  "temperature_c": 22
}
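The status code and headers of the response are configured on the input rather than on the sync_response output. As a minimal sketch, assuming the http_server input's sync_response block with its status and headers fields, the following variant of the config above labels the response body as JSON:

```yaml
input:
  http_server:
    path: /weather
    sync_response:
      # Status code returned with the synchronous response.
      status: "200"
      headers:
        # Assumption: the default Content-Type is a generic binary type,
        # so it is overridden here to describe the JSON body built below.
        Content-Type: application/json

pipeline:
  processors:
    - mapping: |
        root = {
          "city": json("location"),
          "forecast": "Clear skies with light winds",
          "temperature_c": 22
        }

output:
  sync_response: {}
```

Check the input reference for your version before relying on these field names, because the gateway input in Redpanda Cloud may expose different options.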

Combine with other outputs

To store a message and also return a response, combine the sync_response output with other outputs inside a broker output that uses the fan_out pattern.

input:
  http_server:
    path: /weather

output:
  broker:
    pattern: fan_out
    outputs:
      - kafka:
          addresses: [localhost:9092]
          topic: weather.requests
      - sync_response:
          processors:
            - mapping: |
                root = {
                  "status": "received",
                  "received_at": now()
                }
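If the response should only be set once the message has been written to storage, one option is the broker's fan_out_sequential pattern, which writes to the outputs in order and only moves to the next output after the previous one confirms receipt. A sketch under that assumption, reusing the topic from the example above:

```yaml
input:
  http_server:
    path: /weather

output:
  broker:
    # Outputs are written one after another, in the order listed.
    pattern: fan_out_sequential
    outputs:
      # Written first: the request is stored in Kafka.
      - kafka:
          addresses: [localhost:9092]
          topic: weather.requests
      # Written second, only after the Kafka write is confirmed.
      - sync_response:
          processors:
            - mapping: |
                root = {
                  "status": "stored",
                  "stored_at": now()
                }
```

The trade-off is latency: the outputs are no longer written in parallel.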

Returning partially processed messages

You can capture the response payload partway through a pipeline, before the message is fully processed, by using the sync_response processor.

This allows continued processing after the response is set.

pipeline:
  processors:
    - mapping: root = "Received weather report for %s".format(json("location"))
    - sync_response: {}
    - mapping: root.reported_at = now()

This returns "Received weather report for Berlin" to the client, but continues modifying the message before storing or forwarding it.

Due to delivery guarantees, the response is not sent until all downstream processing and acknowledgements are complete.
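For context, the processors above could sit in a full config like the following sketch. The /reports path and the weather.reports topic are hypothetical names chosen for illustration, and the final mapping is one possible way to keep the summary text while adding a timestamp:

```yaml
input:
  http_server:
    path: /reports  # hypothetical path for this sketch

pipeline:
  processors:
    # Build the text that the client will receive.
    - mapping: root = "Received weather report for %s".format(json("location"))
    # Capture the current payload as the synchronous response.
    - sync_response: {}
    # Keep processing: wrap the summary text in a document with a timestamp.
    - mapping: |
        root = {
          "summary": content().string(),
          "reported_at": now()
        }

output:
  kafka:
    addresses: [localhost:9092]
    topic: weather.reports  # hypothetical topic
```

In this sketch the client receives the plain summary text, while the Kafka topic receives the wrapped document. The response is still held back until the Kafka write is acknowledged.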

Routing output responses back

Some outputs, such as http_client, can return the response they receive from the downstream service back to the input. For http_client, enable this with the propagate_response field.

input:
  http_server:
    path: /proxy-weather

output:
  http_client:
    url: https://api.example.com/forecast
    verb: POST
    propagate_response: true

This forwards the request to an external weather API and returns its response to the original requester.

You can combine this with other outputs:

input:
  http_server:
    path: /proxy-weather

output:
  broker:
    pattern: fan_out
    outputs:
      - kafka:
          addresses: [localhost:9092]
          topic: weather.proxy
      - http_client:
          url: https://api.example.com/forecast
          verb: POST
          propagate_response: true