To persist the data ingested from a source in RisingWave, use CREATE TABLE instead of CREATE SOURCE, and specify the connection settings and data format.

Syntax

CREATE {TABLE | SOURCE} [ IF NOT EXISTS ] source_name
[ schema_definition ]
[INCLUDE { header | key | offset | partition | timestamp | payload } [AS <column_name>]]
WITH (
   connector='kinesis',
   connector_parameter='value', ...
)
FORMAT data_format ENCODE data_encode (
   message = 'message',
   schema.location = 'location'
);

schema_definition:

(
   column_name data_type [ PRIMARY KEY ], ...
   [ PRIMARY KEY ( column_name, ... ) ]
)
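
As a minimal sketch of where the INCLUDE clause sits in the statement, the following creates a source that exposes the Kinesis offset as an extra column. The source name, column names, stream name, region, and credentials are hypothetical placeholders, and which INCLUDE fields are available for Kinesis is an assumption to verify against your RisingWave version.

```sql
-- Hypothetical names and values; replace them with your own.
CREATE SOURCE IF NOT EXISTS kinesis_events (
   event_id VARCHAR,
   event_type VARCHAR
)
-- Expose the record offset as an additional column named record_offset.
INCLUDE offset AS record_offset
WITH (
   connector='kinesis',
   stream='your_stream_name',
   aws.region='us-east-1',
   aws.credentials.access_key_id='your_access_key',
   aws.credentials.secret_access_key='your_secret_key'
) FORMAT PLAIN ENCODE JSON;
```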

INFO

For Avro and Protobuf data, do not specify schema_definition in the CREATE SOURCE or CREATE TABLE statement. Instead, host the schema file at a web location and pass that location in the schema.location option of the ENCODE section.

NOTE

RisingWave performs primary key constraint checks on tables with connector settings but not on regular sources. If you need the checks to be performed, please create a table with connector settings.

For a table with primary key constraints, if a new data record with an existing key comes in, the new record will overwrite the existing record.
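
The overwrite behavior described above applies to a table such as the one sketched here. The table name, columns, stream name, region, and credentials are hypothetical placeholders chosen for illustration.

```sql
-- Hypothetical table; order_id is the primary key.
-- Per the note above, a new record whose order_id already exists
-- overwrites the stored record instead of adding a second row.
CREATE TABLE IF NOT EXISTS kinesis_orders (
   order_id INT PRIMARY KEY,
   status VARCHAR
)
WITH (
   connector='kinesis',
   stream='your_stream_name',
   aws.region='us-east-1',
   aws.credentials.access_key_id='your_access_key',
   aws.credentials.secret_access_key='your_secret_key'
) FORMAT PLAIN ENCODE JSON;
```

Creating the same object with CREATE SOURCE would skip the primary key check, so use CREATE TABLE when the check matters.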

Connector parameters

Field | Notes
stream | Required. Name of the stream.
aws.region | Required. AWS service region. For example, us-east-1 for US East (N. Virginia).
endpoint | Optional. URL of the entry point for the AWS Kinesis service.
aws.credentials.access_key_id | Required. The AWS access key ID.
aws.credentials.secret_access_key | Required. The AWS secret access key.
aws.credentials.session_token | Optional. The session token associated with the temporary security credentials. Using this field is not recommended because RisingWave runs long-running jobs and the token may expire. Creating a new role is preferred.
aws.credentials.role.arn | Optional. The Amazon Resource Name (ARN) of the role to assume.
aws.credentials.role.external_id | Optional. The external ID used to authorize access to third-party resources.
scan.startup.mode | Optional. The startup mode for the Kinesis consumer. Supported modes: earliest (starting position TRIM_HORIZON), latest (starting position LATEST), and timestamp (starting position AT_TIMESTAMP, beginning at the timestamp given in scan.startup.timestamp.millis). The default mode is earliest.
scan.startup.timestamp.millis | Optional. The timestamp to start consuming from, given as an i64 value in milliseconds. Used when scan.startup.mode is timestamp.
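
As a rough illustration of the startup parameters, the following sketch starts consuming from a given timestamp instead of the default earliest position. The source name, columns, stream name, region, credentials, and timestamp value are hypothetical placeholders. Passing the timestamp as a quoted string is an assumption.

```sql
-- Hypothetical names and values; replace them with your own.
CREATE SOURCE IF NOT EXISTS kinesis_events_from_timestamp (
   event_id VARCHAR,
   event_type VARCHAR
)
WITH (
   connector='kinesis',
   stream='your_stream_name',
   aws.region='us-east-1',
   aws.credentials.access_key_id='your_access_key',
   aws.credentials.secret_access_key='your_secret_key',
   -- Corresponds to the Kinesis starting position AT_TIMESTAMP.
   scan.startup.mode='timestamp',
   -- Placeholder timestamp in milliseconds.
   scan.startup.timestamp.millis='1700000000000'
) FORMAT PLAIN ENCODE JSON;
```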

Other parameters

Field | Notes
data_format | Supported formats: DEBEZIUM, MAXWELL, CANAL, UPSERT, PLAIN.
data_encode | Supported encodes: JSON, AVRO, PROTOBUF, CSV, BYTES.
message | Message name of the main Message in the schema definition. Required when data_encode is PROTOBUF.
location | Web location of the schema file in http://…, https://…, or s3://… format. Required when data_encode is AVRO or PROTOBUF. Examples: https://<example_host>/risingwave/proto-simple-schema.proto and s3://risingwave-demo/schema-location
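
For Protobuf data, both message and schema.location are set in the ENCODE section and no schema_definition is given. The sketch below assumes a hypothetical message name (your_package.YourMessage) and reuses the example schema URL from the table above. The source name, stream name, region, and credentials are placeholders.

```sql
-- No column list: for PROTOBUF the schema comes from schema.location.
CREATE SOURCE IF NOT EXISTS kinesis_proto_source
WITH (
   connector='kinesis',
   stream='your_stream_name',
   aws.region='us-east-1',
   aws.credentials.access_key_id='your_access_key',
   aws.credentials.secret_access_key='your_secret_key'
) FORMAT PLAIN ENCODE PROTOBUF (
   -- Hypothetical message name; use the main Message in your .proto file.
   message = 'your_package.YourMessage',
   schema.location = 'https://<example_host>/risingwave/proto-simple-schema.proto'
);
```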

Example

Here is an example of connecting RisingWave to Kinesis Data Streams to read data from individual streams.

CREATE SOURCE IF NOT EXISTS source_name
WITH (
   connector='kinesis',
   stream='your_stream_name',
   aws.region='us-west-2',
   endpoint='https://kinesis.us-west-2.amazonaws.com',
   aws.credentials.session_token='AQoEXAMPLEH4aoAH0gNCAPyJxz4BlCFFxWNE1OPTgk5TthT+FvwqnKwRcOIfrRh3c/LTo6UDdyJwOOvEVPvLXCrrrUtdnniCEXAMPLE/IvU1dYUg2RVAJBanLiHb4IgRmpRV3zrkuWJOgQs8IZZaIv2BXIa2R4OlgkBN9bkUDNCJiBeb/AXlzBBko7b15fjrBs2+cTQtpZ3CYWFXG8C5zqx37wnOE49mRl/+OtkIKGO7fAE',
   aws.credentials.role.arn='arn:aws-cn:iam::602389639824:role/demo_role',
   aws.credentials.role.external_id='demo_external_id',
   aws.credentials.access_key_id = 'your_access_key',
   aws.credentials.secret_access_key = 'your_secret_key'
) FORMAT PLAIN ENCODE AVRO (
    schema.location = 'https://demo_bucket_name.s3-us-west-2.amazonaws.com/demo.avsc'
);
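
Because a session token may expire during a long-running job, a variant that relies on an assumed role and omits aws.credentials.session_token may be preferable. The following is a sketch under that assumption. The source name, stream name, region, role ARN, external ID, and credentials are hypothetical placeholders.

```sql
-- Variant without a session token; the role to assume is given by its ARN.
CREATE SOURCE IF NOT EXISTS kinesis_role_source
WITH (
   connector='kinesis',
   stream='your_stream_name',
   aws.region='us-west-2',
   aws.credentials.access_key_id='your_access_key',
   aws.credentials.secret_access_key='your_secret_key',
   -- Hypothetical role ARN and external ID.
   aws.credentials.role.arn='arn:aws:iam::123456789012:role/your_role',
   aws.credentials.role.external_id='your_external_id'
) FORMAT PLAIN ENCODE AVRO (
   schema.location = 'https://demo_bucket_name.s3-us-west-2.amazonaws.com/demo.avsc'
);
```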