Prerequisites
Before ingesting data from MQTT into RisingWave, please ensure the following:
- The MQTT server is running and accessible from your RisingWave cluster.
- If authentication is required for the MQTT broker, make sure you have the client username and password. The client user must have the subscribe permission for the topic.
- Create the MQTT topic from which you want to ingest data.
- Ensure that your RisingWave cluster is running.
For example, consider a topic named iot_data that carries data from various IoT devices at a given timestamp, including temperature and humidity readings and a status field indicating whether the device is in a normal or abnormal state. To learn more about MQTT and get started with it, refer to the MQTT guide.
Ingest data into RisingWave
When creating a source, we can either persist the data in RisingWave using CREATE TABLE or use CREATE SOURCE, specifying the connection settings and data format in either case.
Syntax
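The general shape of the statement can be sketched as follows. This is a minimal illustration rather than the full grammar: a column list, a WITH clause carrying the connector parameters, and a FORMAT ... ENCODE clause for the data format. The source name mqtt_source_sketch, its columns, and the broker URL are placeholders chosen for illustration.

```sql
-- Sketch only: mqtt_source_sketch, its columns, and the URL are hypothetical.
-- Replace SOURCE with TABLE to persist the ingested data in RisingWave.
CREATE SOURCE IF NOT EXISTS mqtt_source_sketch (
    device_id VARCHAR,
    reading   DOUBLE PRECISION
)
WITH (
    connector = 'mqtt',            -- selects the MQTT connector
    url       = 'tcp://localhost', -- required; tcp://, mqtt://, ssl://, or mqtts://
    topic     = 'iot_data',        -- required; topic(s) to subscribe to
    qos       = 'at_most_once'     -- optional; this is the default
)
FORMAT PLAIN ENCODE JSON;
```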
Parameters
Field | Notes |
---|---|
url | Required. The URL of the broker to connect to, e.g., tcp://localhost. Must be prefixed with tcp://, mqtt://, ssl://, or mqtts:// to denote the protocol. mqtts:// and ssl:// use native certificates if no CA is specified. |
qos | Optional. The quality of service for publishing messages. Defaults to at_most_once. Options include at_most_once, at_least_once, or exactly_once. |
username | Optional. Username for the MQTT broker. |
password | Optional. Password for the MQTT broker. |
client_prefix | Optional. Prefix for the MQTT client ID. Defaults to “risingwave”. |
clean_start | Optional. Determines if all states from queues are removed when the client disconnects. |
inflight_messages | Optional. Maximum number of inflight messages. Defaults to 100. |
max_packet_size | Optional. The maximum message size for the MQTT client. |
tls.client_cert | Optional. Path to the client’s certificate file (PEM) or a string with the certificate content. Required for client authentication. Can use fs:// prefix for file paths. |
tls.client_key | Optional. Path to the client’s private key file (PEM) or a string with the private key content. Required for client authentication. Can use fs:// prefix for file paths. |
topic | Required. The topic name(s) to subscribe or publish to. Can include wildcard topics (e.g., /topic/#). Multiple topics can be specified as a comma-separated list (e.g., /topic/1,/topic/2). |
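
To illustrate how the optional parameters combine, here is a hedged sketch of a source that connects over TLS with a username and password and subscribes to a wildcard topic. The broker host, credentials, topic, source name, and columns are all invented for illustration, and the option values are quoted as strings on the assumption that the connector parses them from text.

```sql
-- Hypothetical example: host, credentials, topic, and names are placeholders.
CREATE SOURCE IF NOT EXISTS secured_mqtt_sketch (
    device_id   VARCHAR,
    temperature DECIMAL
)
WITH (
    connector         = 'mqtt',
    url               = 'mqtts://broker.example.com', -- TLS; native certificates apply when no CA is given
    topic             = '/sensors/#',                 -- wildcard subscription
    qos               = 'at_least_once',
    username          = 'rw_client',                  -- user needs subscribe permission on the topic
    password          = 'change-me',
    client_prefix     = 'risingwave',                 -- the default, shown for clarity
    inflight_messages = '100'                         -- the default, shown for clarity
)
FORMAT PLAIN ENCODE JSON;
```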
As an example, a table named iot_sensor_data can be created with columns for device ID, timestamp, temperature, humidity, and device status. The table connects to an MQTT broker through the MQTT connector, with a specific URL, topic, and quality of service (QoS) setting, and the data is encoded as JSON.
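
A statement matching this description might look like the sketch below. The column names and types, the broker URL tcp://localhost, the topic iot_data, the QoS level at_least_once, and the status value 'abnormal' are assumptions chosen to fit the description, not values confirmed by this page.

```sql
-- Sketch under the assumptions stated above.
CREATE TABLE iot_sensor_data (
    device_id     VARCHAR,
    ts            TIMESTAMP,
    temperature   DECIMAL,
    humidity      DECIMAL,
    device_status VARCHAR
)
WITH (
    connector = 'mqtt',
    url       = 'tcp://localhost',
    topic     = 'iot_data',
    qos       = 'at_least_once'
)
FORMAT PLAIN ENCODE JSON;

-- Once messages arrive, the table can be queried like any other.
SELECT device_id, ts, temperature, humidity
FROM iot_sensor_data
WHERE device_status = 'abnormal';
```

Because this uses CREATE TABLE, the ingested messages are persisted in RisingWave; swapping in CREATE SOURCE would keep the same connection settings without persisting the data.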