# Sink data from RisingWave to MQTT

> This guide describes how to sink data from RisingWave to an MQTT topic using the MQTT sink connector.

The [Message Queuing Telemetry Transport](https://mqtt.org/) (MQTT) protocol is designed for connecting remote devices with a small code footprint and minimal network bandwidth. Today, MQTT is used across many industries, including automotive, manufacturing, telecommunications, and oil and gas.

<Note>
  MQTT sink is currently in the **[technical preview](/changelog/product-lifecycle#product-release-lifecycle)** stage.
</Note>

## Prerequisites

Before sinking data from RisingWave to an MQTT topic, ensure the following:

* The RisingWave cluster is running.
* An MQTT broker is running and accessible from your RisingWave cluster.
* The MQTT topic that you want to sink data to has been created.
* You have permission to publish data to the MQTT topic.

To learn more about MQTT and how to get started with it, see the [MQTT guide](https://mqtt.org/getting-started/). The examples in this guide use an `iot_sensor_data` table in RisingWave that stores readings from various IoT devices at a given timestamp: temperature and humidity values, along with a status field indicating whether the device is in a normal or abnormal state.
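
The `iot_sensor_data` table described above could be defined along the following lines. This is a minimal sketch: the column names and types (`device_id`, `ts`, `temperature`, `humidity`, `device_status`) are assumptions chosen to match the description, not a schema required by the connector.

```sql theme={null}
-- Hypothetical schema for the example table; adjust names and types to your data.
CREATE TABLE iot_sensor_data (
    device_id VARCHAR,             -- identifier of the reporting device
    ts TIMESTAMP,                  -- time of the reading
    temperature DOUBLE PRECISION,  -- temperature reading
    humidity DOUBLE PRECISION,     -- humidity reading
    device_status VARCHAR          -- 'normal' or 'abnormal'
);

-- Insert a couple of sample rows so the sink has data to forward.
INSERT INTO iot_sensor_data VALUES
    ('device_001', '2024-01-01 12:00:00', 21.5, 40.2, 'normal'),
    ('device_002', '2024-01-01 12:00:05', 48.9, 12.7, 'abnormal');
```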

### Syntax

To sink data from RisingWave to an MQTT topic, create a sink using the syntax below:

```sql theme={null}
CREATE SINK [ IF NOT EXISTS ] sink_name
[FROM sink_from | AS select_query]
WITH (
   connector = 'mqtt',
   url = '<your MQTT server>:<port>',
   topic = '<topic>',
   qos = '<qos_level>',
   type = 'append-only',
   username = '<your user name>',
   password = '<your password>'
)
FORMAT PLAIN ENCODE data_encode ( -- data_encode: BYTES or JSON
    force_append_only = 'true'
);
```

The following query creates an MQTT sink named `mqtt_sink` that forwards data from `iot_sensor_data` to an MQTT server. It specifies the MQTT connector, the server URL, the target topic, the sink type, message retention, the quality of service, and JSON encoding.

```sql theme={null}
CREATE SINK mqtt_sink
FROM iot_sensor_data
WITH
(
    connector='mqtt',
    url='tcp://mqtt-server',
    topic= 'sink_iot_data',
    type = 'append-only',
    retain = 'true',
    qos = 'at_least_once',
) FORMAT PLAIN ENCODE JSON (
    force_append_only='true',
);
```

After the sink is created, RisingWave continuously publishes data to the MQTT topic in append-only mode.
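
The syntax also accepts `AS select_query` in place of `FROM sink_from`. As a sketch under stated assumptions, the query below sinks only abnormal readings and uses the `topic.field` parameter to route each row to a per-device topic. The sink name, the `alerts/` topic prefix, the `mqtt_topic` column alias, and the table columns (`device_id`, `ts`, `temperature`, `humidity`, `device_status`) are all hypothetical.

```sql theme={null}
-- Hypothetical example: forward only abnormal readings, one topic per device.
CREATE SINK mqtt_abnormal_sink AS
SELECT
    device_id,
    ts,
    temperature,
    humidity,
    'alerts/' || device_id AS mqtt_topic  -- computed per-row target topic
FROM iot_sensor_data
WHERE device_status = 'abnormal'
WITH (
    connector = 'mqtt',
    url = 'tcp://mqtt-server',
    type = 'append-only',
    topic.field = 'mqtt_topic',  -- column whose value is used as the topic
    qos = 'at_least_once'
) FORMAT PLAIN ENCODE JSON (
    force_append_only = 'true'
);
```

Filtering in the sink query keeps the broker free of messages that subscribers would discard anyway.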

### Parameters

| Field              | Notes                                                                                                                                                                                                                                                                              |
| :----------------- | :--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| url                | **Required**. The URL of the broker to connect to, e.g., tcp\://localhost. Must be prefixed with `tcp://`, `mqtt://`, `ssl://`, or `mqtts://` to denote the protocol. `mqtts://` and `ssl://` use native certificates if no CA is specified.                                       |
| qos                | **Optional**. The quality of service for publishing messages. Defaults to at\_most\_once. Options include at\_most\_once, at\_least\_once, or exactly\_once.                                                                                                                       |
| username           | **Optional**. Username for the MQTT broker.                                                                                                                                                                                                                                        |
| password           | **Optional**. Password for the MQTT broker.                                                                                                                                                                                                                                        |
| client\_prefix     | **Optional**. Prefix for the MQTT client ID. Defaults to "risingwave".                                                                                                                                                                                                             |
| clean\_start       | **Optional**. Determines if all states from queues are removed when the client disconnects.<ul><li>If true, the broker clears all client states upon disconnect;</li><li>If false, the broker retains the client state and resumes pending operations upon reconnection.</li></ul> |
| inflight\_messages | **Optional**. Maximum number of inflight messages. Defaults to 100.                                                                                                                                                                                                                |
| tls.client\_cert   | **Optional**. Path to the client's certificate file (PEM) or a string with the certificate content. Required for client authentication. Can use `fs://` prefix for file paths.                                                                                                     |
| tls.client\_key    | **Optional**. Path to the client's private key file (PEM) or a string with the private key content. Required for client authentication. Can use `fs://` prefix for file paths.                                                                                                     |
| topic              | **Optional**. The name of the topic to publish to. MQTT wildcards such as `#` are valid only when subscribing, so use a concrete topic name for a sink.                                                                                                                            |
| topic.field        | **Optional**. Enables dynamic writing to multiple topics based on the value of a specified column. If set, the target topic is the value of this column.                                                                                                                           |
| retain             | **Optional**. Whether the message should be retained by the broker.                                                                                                                                                                                                                |
| r#type             | **Required**. Type identifier.                                                                                                                                                                                                                                                     |
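
To illustrate how the authentication and TLS parameters fit together, here is a hedged sketch of a sink that connects over `ssl://` with a username, password, and client certificate. The sink name, host, certificate paths, and port are placeholders; port 8883 is the conventional MQTT-over-TLS port and is an assumption about your broker.

```sql theme={null}
-- Hypothetical example: TLS connection with client certificate authentication.
CREATE SINK mqtt_tls_sink
FROM iot_sensor_data
WITH (
    connector = 'mqtt',
    url = 'ssl://mqtt-server:8883',                 -- assumed host and port
    topic = 'sink_iot_data',
    type = 'append-only',
    qos = 'at_least_once',
    username = '<your user name>',
    password = '<your password>',
    tls.client_cert = 'fs:///path/to/client.pem',   -- placeholder path
    tls.client_key = 'fs:///path/to/client.key'     -- placeholder path
) FORMAT PLAIN ENCODE JSON (
    force_append_only = 'true'
);
```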
