Sink Connector | Connector Parameter |
---|---|
Apache Doris | connector = 'doris' |
Apache Iceberg | connector = 'iceberg' |
AWS Kinesis | connector = 'kinesis' |
Cassandra and ScyllaDB | connector = 'cassandra' |
ClickHouse | connector = 'clickhouse' |
CockroachDB | connector = 'jdbc' |
Delta Lake | connector = 'deltalake' |
Elasticsearch | connector = 'elasticsearch' |
Google BigQuery | connector = 'bigquery' |
Google Pub/Sub | connector = 'google_pubsub' |
JDBC (MySQL, PostgreSQL, TiDB) | connector = 'jdbc' |
Kafka | connector = 'kafka' |
MQTT | connector = 'mqtt' |
NATS | connector = 'nats' |
Pulsar | connector = 'pulsar' |
Redis | connector = 'redis' |
Snowflake | connector = 'snowflake' |
StarRocks | connector = 'starrocks' |
Microsoft SQL Server | connector = 'sqlserver' |
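The `connector` parameter goes in the `WITH` clause of a `CREATE SINK` statement. Below is a minimal sketch using the Kafka connector; the materialized view `mv1`, the broker address, and the topic name are placeholders, and each connector's guide lists its full set of required parameters.

```sql
-- Minimal sketch: deliver changes from a materialized view to Kafka.
-- 'mv1', the broker address, and the topic are placeholders.
CREATE SINK kafka_sink FROM mv1
WITH (
    connector = 'kafka',                          -- selects the sink connector from the table above
    properties.bootstrap.server = 'broker1:9092',
    topic = 'example_topic'
) FORMAT PLAIN ENCODE JSON;
-- If mv1 is not append-only, use an upsert format with a primary key,
-- or force append-only output instead.
```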
Sink decoupling
Typically, sinks in RisingWave operate in a blocking manner. This means that if the downstream target system experiences performance fluctuations or becomes unavailable, it can potentially impact the stability of the RisingWave instance. However, sink decoupling can be implemented to address this issue. Sink decoupling introduces a buffering queue between a RisingWave sink and the downstream system. This buffering mechanism helps maintain the stability and performance of the RisingWave instance, even when the downstream system is temporarily slow or unavailable.

The `sink_decouple` session variable can be specified to enable or disable sink decoupling. The default value for the session variable is `default`.
To enable sink decoupling for all sinks created in the session, set `sink_decouple` as `true` or `enable`.

To disable sink decoupling, set `sink_decouple` as `false` or `disable`, regardless of the default setting.
`rw_sink_decouple` is provided to query whether a created sink has sink decoupling enabled.
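For example, a sketch of enabling decoupling for the current session and then inspecting existing sinks (the exact schema and columns of `rw_sink_decouple` may vary by version):

```sql
-- Enable sink decoupling for all sinks created in this session.
SET sink_decouple = true;

-- Check whether existing sinks have sink decoupling enabled.
SELECT * FROM rw_catalog.rw_sink_decouple;
```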
Upsert sinks and primary keys
For each sink, you can specify the data format. The available data formats are `upsert`, `append-only`, and `debezium`. To determine which data format is supported by each sink connector, please refer to the detailed guide listed above.
In the `upsert` sink, a non-null value updates the last value for the same key or inserts a new value if the key doesn’t exist. A `NULL` value indicates the deletion of the corresponding key.
When creating an `upsert` sink, note whether or not you need to specify the primary key in the following situations.
- If the downstream system supports primary keys and the table in the downstream system has a primary key, you must specify the primary key with the `primary_key` field when creating an upsert JDBC sink (see the example after this list).
- If the downstream system supports primary keys but the table in the downstream system has no primary key, then RisingWave does not allow users to create an upsert sink. A primary key must be defined in the table in the downstream system.
- If the downstream system does not support primary keys, then users must define the primary key when creating an upsert sink.
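As an illustration of the first case, here is a sketch of an upsert JDBC sink that names the downstream primary key explicitly. The connection URL, table name, and column name are placeholders; see the JDBC sink guide for the authoritative option list.

```sql
-- Sketch: upsert JDBC sink into a PostgreSQL table whose primary key is `id`.
CREATE SINK pg_upsert_sink FROM mv1
WITH (
    connector = 'jdbc',
    jdbc.url = 'jdbc:postgresql://localhost:5432/mydb?user=myuser&password=secret',
    table.name = 'target_table',
    type = 'upsert',
    primary_key = 'id'    -- must match the downstream table's primary key
);
```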
Sink buffering behavior
A sink will buffer incoming data within each barrier interval if the stream’s internal primary key (stream key) differs from the user-defined sink primary key and the sink is not `append-only`. This mismatch can cause update events for the same sink key to be split across multiple upstream fragments in a distributed execution, leading to out-of-order operations if sent directly. Buffering allows the sink to compact and reorder updates so that delete events are emitted before insert events, ensuring correct semantics.
If the stream key matches the sink primary key exactly, or if the sink is configured as `append-only` or `force_append_only`, no buffering is performed and the sink emits changes immediately as they arrive.
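For instance, a sink can be forced into append-only output so that no buffering occurs. This is only a sketch: `force_append_only` discards update and delete events, so use it only when that is acceptable for the downstream system.

```sql
-- Sketch: force append-only output. Updates and deletes from mv1 are dropped,
-- and the sink emits insert events without buffering.
CREATE SINK events_sink FROM mv1
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'broker1:9092',
    topic = 'events'
) FORMAT PLAIN ENCODE JSON (
    force_append_only = 'true'
);
```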
Sink data in Parquet or JSON format
RisingWave supports sinking data in Parquet or JSON formats to cloud storage services, including S3, Google Cloud Storage (GCS), Azure Blob Storage, and WebHDFS. This eliminates the need for complex data lake setups. Once the data is saved, the files can be queried using RisingWave’s batch processing engine through the `file_scan` API. You can also leverage third-party OLAP query engines to enhance data processing capabilities.
Below is an example to sink data to S3:
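The sketch assumes a placeholder bucket, path, region, and credentials; refer to the S3 sink guide for the authoritative option names.

```sql
-- Sketch: write mv1 as Parquet files to S3. All bucket and credential
-- values below are placeholders.
CREATE SINK s3_parquet_sink AS
SELECT * FROM mv1
WITH (
    connector = 's3',
    s3.region_name = 'us-east-1',
    s3.bucket_name = 'example-bucket',
    s3.path = 'sink-output/',
    s3.credentials.access = '<access-key>',
    s3.credentials.secret = '<secret-key>',
    type = 'append-only'
) FORMAT PLAIN ENCODE PARQUET(force_append_only = 'true');
```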
File sink currently supports only append-only mode, so please change the query to `append-only` and specify this explicitly after the `FORMAT ... ENCODE ...` statement.

Batching strategy for file sink
Added in v2.1.0.
The following batching strategies are available (see the example after the note below):

- **Batching based on row numbers**: RisingWave monitors the number of rows written and completes the file once the maximum row count threshold is reached. Specify the `max_row_count` option in the `WITH` clause to configure this behavior.
- **Batching based on rollover interval**: RisingWave checks the threshold each time a chunk is about to be written and when a barrier is encountered. Specify the `rollover_seconds` option in the `WITH` clause to configure this behavior.
- If no batching strategy is specified, RisingWave defaults to writing a new file every 10 seconds.
The condition for batching is relatively coarse-grained. The actual number of rows or exact timing of file completion may vary from the specified thresholds, as this function is intentionally flexible to prioritize efficient file management.
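A sketch of both options in the `WITH` clause; the values are illustrative, the two options can be used independently, and the bucket and credential values are placeholders.

```sql
-- Sketch: batch output files by row count and by rollover interval.
CREATE SINK s3_batched_sink AS
SELECT * FROM mv1
WITH (
    connector = 's3',
    s3.region_name = 'us-east-1',
    s3.bucket_name = 'example-bucket',
    s3.path = 'sink-output/',
    s3.credentials.access = '<access-key>',
    s3.credentials.secret = '<secret-key>',
    type = 'append-only',
    max_row_count = '10000',     -- complete a file after roughly 10,000 rows
    rollover_seconds = '60'      -- or after roughly 60 seconds
) FORMAT PLAIN ENCODE PARQUET(force_append_only = 'true');
```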
File organization
You can specify the `path_partition_prefix` option in the `WITH` clause to organize files into subdirectories based on their creation time. The available options are `month`, `day`, or `hour`. If not specified, files will be stored directly in the root directory without any time-based subdirectories.
Regarding file naming rules, files currently follow the naming pattern `/Option<path_partition_prefix>/executor_id + timestamp.suffix`. The timestamp differentiates files batched by the rollover interval.
The output files look like below: