This topic describes how to sink data from RisingWave to a Kafka broker and how to specify security (encryption and authentication) settings.
A sink is an external target that you can send data to. To stream data out of RisingWave, you need to create a sink. Use the CREATE SINK statement to create a sink. You can create a sink with data from a materialized view or a table. RisingWave only supports writing messages in non-transactional mode.
```sql
CREATE SINK [ IF NOT EXISTS ] sink_name
[FROM sink_from | AS select_query]
WITH (
   connector='kafka',
   connector_parameter = 'value', ...
)
FORMAT data_format ENCODE data_encode [ (
   key = 'value'
) ]
[KEY ENCODE key_encode [(...)]];
```
Names and unquoted identifiers are case-insensitive. Therefore, you must double-quote any of these fields for them to be case-sensitive. See also Identifiers.
All WITH options are required unless explicitly mentioned as optional.
| Parameter or clause | Description |
| --- | --- |
| sink_name | Name of the sink to be created. |
| sink_from | A clause that specifies the direct source from which data will be output. `sink_from` can be a materialized view or a table. Either this clause or a SELECT query must be specified. |
| AS select_query | A SELECT query that specifies the data to be output to the sink. Either this query or a FROM clause must be specified. See SELECT for the syntax and examples of the SELECT command. |
| connector | Sink connector type. Must be `'kafka'` for a Kafka sink. |
| properties.bootstrap.server | Address of the Kafka broker. Format: `ip:port`. If there are multiple brokers, separate them with commas. |
| topic | Name of the Kafka topic to sink data to. One sink can only correspond to one topic. |
| primary_key | Conditional. The primary keys of the sink. Use `,` to delimit the primary key columns. This field is optional if creating a PLAIN sink, but required if creating a DEBEZIUM or UPSERT sink. |
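For example, a minimal append-only sink from a materialized view could look like the sketch below. The sink name, materialized view name, broker address, and topic are placeholders for illustration:

```sql
-- Stream all rows of mv_orders (a placeholder materialized view)
-- to the Kafka topic 'orders' as plain JSON messages.
CREATE SINK IF NOT EXISTS sink_orders
FROM mv_orders
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'localhost:9092',
    topic = 'orders'
) FORMAT PLAIN ENCODE JSON;
```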
When creating a Kafka sink in RisingWave, you can specify the following Kafka-specific parameters. To set the parameter, add the RisingWave equivalent of the Kafka parameter as a WITH option. For additional details on these parameters, see the Configuration properties.
| Kafka parameter name | RisingWave parameter name | Type |
| --- | --- | --- |
| allow.auto.create.topics | properties.allow.auto.create.topics | bool |
| batch.num.messages | properties.batch.num.messages | int |
| batch.size | properties.batch.size | int |
| client.id | properties.client.id | string |
| enable.idempotence | properties.enable.idempotence | bool |
| enable.ssl.certificate.verification | properties.enable.ssl.certificate.verification | bool |
| max.in.flight.requests.per.connection | properties.max.in.flight.requests.per.connection | int |
| message.max.bytes | properties.message.max.bytes | int |
| message.send.max.retries | properties.message.send.max.retries | int |
| message.timeout.ms | properties.message.timeout.ms | int |
| queue.buffering.max.kbytes | properties.queue.buffering.max.kbytes | int |
| queue.buffering.max.messages | properties.queue.buffering.max.messages | int |
| queue.buffering.max.ms | properties.queue.buffering.max.ms | float |
| request.required.acks | properties.request.required.acks | int |
| retry.backoff.ms | properties.retry.backoff.ms | int |
| receive.message.max.bytes | properties.receive.message.max.bytes | int |
| ssl.endpoint.identification.algorithm | properties.ssl.endpoint.identification.algorithm | string |
| statistics.interval.ms | properties.statistics.interval.ms | int |
Set `properties.ssl.endpoint.identification.algorithm` to `none` to bypass the verification of CA certificates and resolve SSL handshake failures. This parameter can be set to either `https` or `none`. By default, it is `https`.

To monitor Kafka metrics in Grafana, set `properties.statistics.interval.ms` to a non-zero value. The granularity is 1000 ms.

Starting with version 2.0, the default value of `properties.message.timeout.ms` has changed from 5 seconds to 5 minutes, aligning with the default setting in the official Kafka library.
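For instance, the WITH clause below passes two of these producer options through to the Kafka client. The object names and broker address are placeholders:

```sql
CREATE SINK sink_orders
FROM mv_orders
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'localhost:9092',
    topic = 'orders',
    -- Pass-through librdkafka producer option:
    properties.enable.idempotence = 'true',
    -- Emit statistics every second so Kafka metrics show up in Grafana.
    properties.statistics.interval.ms = '1000'
) FORMAT PLAIN ENCODE JSON;
```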
These options should be set in FORMAT data_format ENCODE data_encode (key = 'value'), instead of the WITH clause.
- `data_format`: Data format. Allowed formats:
  - `PLAIN`: Output data with insert operations.
  - `DEBEZIUM`: Output change data capture (CDC) log in Debezium format.
  - `UPSERT`: Output data as a changelog stream. `primary_key` must be specified in this case. To learn about when to define the primary key if creating an UPSERT sink, see the Overview.
- `data_encode`: Data encode. Allowed encodes:
  - `JSON`: Supports PLAIN JSON, UPSERT JSON and DEBEZIUM JSON sinks.
  - `AVRO`: Supports UPSERT AVRO and PLAIN AVRO sinks.
  - `PROTOBUF`: Supports PLAIN PROTOBUF and UPSERT PROTOBUF sinks. For UPSERT PROTOBUF sinks, you must specify `KEY ENCODE TEXT`, while it remains optional for other format/encode combinations.
- `force_append_only`: If `true`, forces the sink to be PLAIN (also known as append-only), even if it cannot be.
- `timestamptz.handling.mode`: Controls the timestamptz output format. This parameter applies only to append-only or upsert sinks using JSON encoding. If omitted, the output format of timestamptz is `2023-11-11T18:30:09.453000Z`, which includes the UTC suffix `Z`. When `utc_without_suffix` is specified, the format is changed to `2023-11-11 18:30:09.453000`.
- `schemas.enable`: Only configurable for upsert JSON sinks. By default, this value is `false` for upsert JSON sinks and `true` for debezium JSON sinks. If `true`, RisingWave sinks the data together with the schema to the Kafka topic. This does not refer to a schema registry containing a JSON schema, but to the schema format defined by Kafka Connect.
- `key_encode`: Optional. When specified, the key encode can only be `TEXT` or `BYTES`. If set to `TEXT`, the primary key must be exactly one column of type varchar, bool, smallint, int, or bigint. If set to `BYTES`, the primary key must be exactly one column of type bytea. When absent, both key and value use the same setting of `ENCODE data_encode ( ... )`. See the sketch after this list.
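The sketch below shows an UPSERT JSON sink keyed on a single varchar column, with the key encoded as text. All object names and the broker address are placeholders:

```sql
-- mv_users is a placeholder materialized view whose user_id column is varchar.
CREATE SINK sink_users
FROM mv_users
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'localhost:9092',
    topic = 'users',
    primary_key = 'user_id'
) FORMAT UPSERT ENCODE JSON
KEY ENCODE TEXT;
```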
**PREMIUM FEATURE**

This is a premium feature. For a comprehensive overview of all premium features and their usage, please see RisingWave premium features.
AWS Glue Schema Registry is a serverless feature of AWS Glue that allows you to centrally manage and enforce schemas for data streams, enabling data validation and compatibility checks. It helps improve the quality of data streams by providing a central repository for managing and enforcing schemas across various AWS services and custom applications.

You can specify the following configurations in the FORMAT PLAIN ENCODE AVRO (...) or FORMAT UPSERT ENCODE AVRO (...) clause. This allows RisingWave to load schemas from and encode metadata for AWS Glue Schema Registry, in addition to Confluent Schema Registry.

Auth-related configurations:
| Parameter | Description |
| --- | --- |
| aws.region | The region of the AWS Glue Schema Registry. For example, `us-west-2`. |
| aws.credentials.access_key_id | Your AWS access key ID. |
| aws.credentials.secret_access_key | Your AWS secret access key. |
| aws.credentials.role.arn | The Amazon Resource Name (ARN) of the role to assume. For example, `arn:aws:iam::123456123456:role/MyGlueRole`. This IAM role must be granted permissions for the action `glue:GetSchemaVersion`. |
ARN of the schema:

| Parameter | Description |
| --- | --- |
| aws.glue.schema_arn | The ARN of the schema in AWS Glue Schema Registry. For example, `'arn:aws:glue:ap-southeast-1:123456123456:schema/default-registry/MyEvent'`. |
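Putting these together, an UPSERT Avro sink that loads its schema from AWS Glue Schema Registry might be declared as in the sketch below. The sink, source, broker, topic, role ARN, and schema ARN are illustrative placeholders:

```sql
CREATE SINK sink_events
FROM mv_events
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'localhost:9092',
    topic = 'events',
    primary_key = 'event_id'
) FORMAT UPSERT ENCODE AVRO (
    -- AWS Glue Schema Registry configuration (placeholder values):
    aws.region = 'ap-southeast-1',
    aws.credentials.role.arn = 'arn:aws:iam::123456123456:role/MyGlueRole',
    aws.glue.schema_arn = 'arn:aws:glue:ap-southeast-1:123456123456:schema/default-registry/MyEvent'
);
```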
When creating a Protobuf sink, the following options can be used following FORMAT PLAIN ENCODE PROTOBUF or FORMAT UPSERT ENCODE PROTOBUF.
| Field | Notes |
| --- | --- |
| message | Required. Package-qualified message name of the main Message in the schema definition. |
| schema.location | Required if `schema.registry` is not specified. Only one of `schema.location` or `schema.registry` can be defined. The schema location, in `file://`, `http://`, or `https://` format. |
| schema.registry | Required if `schema.location` is not specified. Only one of `schema.location` or `schema.registry` can be defined. The address of the schema registry. |
| schema.registry.username | Optional. The user name used to access the schema registry. |
| schema.registry.password | Optional. The password associated with the user name. |
| schema.registry.name.strategy | Optional. Accepted options include `topic_name_strategy` (default), `record_name_strategy`, and `topic_record_name_strategy`. |
The `file://` format is not recommended for production use. If it is used, the schema file needs to be available to both the meta node and the compute nodes.
Syntax:
FORMAT as PLAIN
```sql
FORMAT PLAIN
ENCODE PROTOBUF (
    message = 'com.example.MyMessage',
    schema.location = 'location'
)
```
FORMAT as UPSERT
```sql
FORMAT UPSERT
ENCODE PROTOBUF (
    message = 'com.example.MyMessage',
    schema.location = 'location'
) KEY ENCODE TEXT
```
For data type mapping, the serial type is supported; it is mapped to a 64-bit signed integer.
The `jsonb.handling.mode` option determines how `jsonb` data types are encoded. This parameter has two possible values:

- `string`: Encodes the `jsonb` type to a string. For example, if you set this parameter, `{"k": 2}` will be converted to `"{\"k\": 2}"`.
- `dynamic`: Dynamically encodes a `jsonb` type value to a JSON type value. For example, if you set this parameter, `{"k": 2}` will be converted to `{"k": 2}`. Here the `jsonb` value is encoded to a JSON object type value.

You can set this parameter in the WITH clause of ENCODE JSON.
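For instance, the sketch below sets `jsonb.handling.mode` in the options of ENCODE JSON for a plain sink; the object names and broker address are placeholders:

```sql
CREATE SINK sink_events
FROM mv_events
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'localhost:9092',
    topic = 'events'
) FORMAT PLAIN ENCODE JSON (
    -- Encode jsonb columns as escaped JSON strings rather than JSON objects.
    jsonb.handling.mode = 'string'
);
```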
For data type mapping, the serial type is supported. However, note that it is mapped into a JSON string like `"0x05fb93d677c4e000"` instead of a JSON number `431100738685689856`. This string form avoids JSON number precision issues with large int64 values, and you can still order by the fixed-length hexadecimal string to obtain the same order as the serial number (whereas the variable-length string `"12"` sorts before `"7"`).
If your Kafka sink service is located in a different VPC from RisingWave, use AWS PrivateLink or GCP Private Service Connect to establish a secure and direct connection. For details on how to set up an AWS PrivateLink connection, see Create an AWS PrivateLink connection.

To create a Kafka sink with a PrivateLink connection, specify the following parameters in the WITH section of your CREATE SINK statement.
| Parameter | Notes |
| --- | --- |
| privatelink.targets | The PrivateLink targets that correspond to the Kafka brokers. The targets should be in JSON format. Note that each target listed corresponds to each broker specified in the `properties.bootstrap.server` field. If the order is incorrect, there will be connectivity issues. |
| privatelink.endpoint | The DNS name of the VPC endpoint. If you're using RisingWave Cloud, you can find the auto-generated endpoint after you create a connection. See details in Create a VPC connection. |
Here is an example of creating a Kafka sink using a PrivateLink connection. Notice that {"port": 8001} corresponds to the broker ip1:9092, and {"port": 8002} corresponds to the broker ip2:9092.
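A sketch of such a statement follows, assuming two brokers ip1:9092 and ip2:9092 reachable through ports 8001 and 8002 on the endpoint; all names, addresses, and the endpoint DNS name are placeholders:

```sql
CREATE SINK sink_orders
FROM mv_orders
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'ip1:9092,ip2:9092',
    topic = 'orders',
    -- Targets are matched to brokers by position:
    -- port 8001 -> ip1:9092, port 8002 -> ip2:9092.
    privatelink.targets = '[{"port": 8001}, {"port": 8002}]',
    privatelink.endpoint = 'vpce-0123abcd.vpce-svc-0123abcd.us-east-1.vpce.amazonaws.com'
) FORMAT PLAIN ENCODE JSON;
```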
RisingWave can sink data to Kafka that is encrypted with Transport Layer Security (TLS) and/or authenticated with SASL.

Secure Sockets Layer (SSL) was the predecessor of Transport Layer Security (TLS), and has been deprecated since June 2015. For historical reasons, SSL is used in configuration and code instead of TLS.

Simple Authentication and Security Layer (SASL) is a framework for authentication and data security in Internet protocols.

RisingWave supports these SASL authentication mechanisms:
SASL/PLAIN
SASL/SCRAM
SASL/GSSAPI
SASL/OAUTHBEARER
SSL encryption can be used concurrently with SASL authentication mechanisms.

To learn about how to enable SSL encryption and SASL authentication in Kafka, including how to generate the keys and certificates, see the Security Tutorial from Confluent.

You need to specify encryption and authentication parameters in the WITH section of a CREATE SINK statement.
To sink data encrypted with SSL without SASL authentication, specify these parameters in the WITH section of your CREATE SINK statement.
| Parameter | Notes |
| --- | --- |
| properties.security.protocol | Set to `SSL`. |
| properties.ssl.ca.location | See the note below. |
| properties.ssl.certificate.location | See the note below. |
| properties.ssl.key.location | See the note below. |
| properties.ssl.key.password | See the note below. |
For the definitions of these parameters, see the librdkafka properties list. Note that the parameter names in that list do not include the `properties.` prefix, so drop the prefix when looking them up there.
Here is an example of creating a sink encrypted with SSL without using SASL authentication.
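A minimal sketch, assuming the CA certificate, client certificate, and client key sit at the placeholder paths shown (the object names, broker address, and password are also placeholders):

```sql
CREATE SINK sink_orders
FROM mv_orders
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'localhost:9092',
    topic = 'orders',
    -- SSL encryption without SASL authentication (placeholder paths):
    properties.security.protocol = 'SSL',
    properties.ssl.ca.location = '/path/to/ca-cert',
    properties.ssl.certificate.location = '/path/to/client.pem',
    properties.ssl.key.location = '/path/to/client.key',
    properties.ssl.key.password = 'key_password'
) FORMAT PLAIN ENCODE JSON;
```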