Ingest data from Kinesis
Use the SQL statement below to connect RisingWave to Kinesis Data Streams.
When creating a source, you can choose to persist the data from the source in RisingWave by using CREATE TABLE instead of CREATE SOURCE and specifying the connection settings and data format.
Syntax
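As a rough template of the statement's overall shape, assembled from the parameters documented on this page (angle-bracketed names are placeholders, and the exact grammar accepted by a given RisingWave version may differ):

```sql
-- Template only: items in <angle brackets> are placeholders, not literal values.
-- [ ... ] marks optional parts; { A | B } means choose one.
CREATE {TABLE | SOURCE} [ IF NOT EXISTS ] <source_name>
[ schema_definition ]
WITH (
   connector = 'kinesis',
   <connector_parameter> = '<value>', ...
)
FORMAT <data_format> ENCODE <data_encode> (
   message = '<message>',             -- PROTOBUF only
   schema.location = '<location>'     -- AVRO or PROTOBUF
);
```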
schema_definition:
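A sketch of what the column list typically looks like, assuming the usual form of a column list with an optional primary key (placeholders, not literal names):

```sql
-- Template only: column_name and data_type are placeholders.
(
   column_name data_type [ PRIMARY KEY ], ...
   [ PRIMARY KEY ( column_name, ... ) ]
)
```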
INFO
For Avro and Protobuf data, do not specify schema_definition in the CREATE SOURCE or CREATE TABLE statement. Instead, provide the schema at a web location through the schema.location option in the ENCODE section.
NOTE
RisingWave performs primary key constraint checks on tables with connector settings but not on regular sources. If you need the checks to be performed, please create a table with connector settings.
For a table with primary key constraints, if a new data record with an existing key comes in, the new record will overwrite the existing record.
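To illustrate the overwrite behavior described above, here is a minimal sketch of a table with connector settings and a primary key. The table name, columns, stream name, region, and credential placeholders are all hypothetical.

```sql
-- Hypothetical table: every name and value below is a placeholder.
CREATE TABLE IF NOT EXISTS orders_kinesis (
    order_id BIGINT PRIMARY KEY,  -- a later record with the same order_id overwrites the earlier one
    status VARCHAR,
    updated_at TIMESTAMP
)
WITH (
    connector = 'kinesis',
    stream = 'orders-stream',
    aws.region = 'us-east-1',
    aws.credentials.access_key_id = '<your_access_key_id>',
    aws.credentials.secret_access_key = '<your_secret_access_key>'
)
FORMAT PLAIN ENCODE JSON;
```

Declaring the same object with CREATE SOURCE instead would skip the primary key checks, as the note above explains.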
Connector parameters
Field | Notes |
---|---|
stream | Required. Name of the stream. |
aws.region | Required. AWS service region. For example, us-east-1 for US East (N. Virginia). |
endpoint | Optional. URL of the entry point for the AWS Kinesis service. |
aws.credentials.access_key_id | Required. The AWS access key ID. |
aws.credentials.secret_access_key | Required. The AWS secret access key. |
aws.credentials.session_token | Optional. The session token associated with the temporary security credentials. Using this field is not recommended because RisingWave runs long-lived jobs and the token may expire. Creating a new role is preferred. |
aws.credentials.role.arn | Optional. The Amazon Resource Name (ARN) of the role to assume. |
aws.credentials.role.external_id | Optional. The external ID used to authorize access to third-party resources. |
scan.startup.mode | Optional. The startup mode for the Kinesis consumer. Supported modes: earliest (corresponding to starting position TRIM_HORIZON), latest (corresponding to starting position LATEST), and timestamp (starts from the timestamp specified by scan.startup.timestamp.millis, corresponding to starting position AT_TIMESTAMP). The default mode is earliest. |
scan.startup.timestamp.millis | Optional. The timestamp, given as an i64 value, to start consuming from. Used when scan.startup.mode is timestamp. |
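As a sketch of how the startup parameters in the table above fit together, the following source starts consuming from a given timestamp. The source name, columns, stream name, and credentials are hypothetical, and the timestamp is an arbitrary placeholder assumed to be in milliseconds, as the parameter name suggests.

```sql
-- Hypothetical source: every name and value below is a placeholder.
CREATE SOURCE IF NOT EXISTS clicks_kinesis (
    user_id VARCHAR,
    url VARCHAR
)
WITH (
    connector = 'kinesis',
    stream = 'clicks-stream',
    aws.region = 'us-east-1',
    aws.credentials.access_key_id = '<your_access_key_id>',
    aws.credentials.secret_access_key = '<your_secret_access_key>',
    scan.startup.mode = 'timestamp',                 -- maps to AT_TIMESTAMP
    scan.startup.timestamp.millis = '1700000000000'  -- assumed unit: milliseconds
)
FORMAT PLAIN ENCODE JSON;
```

Omitting both scan.startup options falls back to the default mode, earliest.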
Other parameters
Field | Notes |
---|---|
data_format | Supported formats: DEBEZIUM, MAXWELL, CANAL, UPSERT, PLAIN. |
data_encode | Supported encodes: JSON, AVRO, PROTOBUF, CSV, BYTES. |
message | Message name of the main Message in schema definition. Required when data_encode is PROTOBUF. |
location | Web location of the schema file in http://…, https://…, or s3://… format. Required when data_encode is AVRO or PROTOBUF. Examples: https://<example_host>/risingwave/proto-simple-schema.proto and s3://risingwave-demo/schema-location |
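A minimal sketch of how message and location are used for Protobuf data, assuming they are passed as message and schema.location in the ENCODE section. The message name example.Event, the source name, the stream name, and the credentials are hypothetical; the schema URL is the placeholder from the table above.

```sql
-- Hypothetical source: no schema_definition is given because the
-- columns are derived from the Protobuf schema at schema.location.
CREATE SOURCE IF NOT EXISTS events_kinesis
WITH (
    connector = 'kinesis',
    stream = 'events-stream',
    aws.region = 'us-east-1',
    aws.credentials.access_key_id = '<your_access_key_id>',
    aws.credentials.secret_access_key = '<your_secret_access_key>'
)
FORMAT PLAIN ENCODE PROTOBUF (
    message = 'example.Event',  -- hypothetical main message name
    schema.location = 'https://<example_host>/risingwave/proto-simple-schema.proto'
);
```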
Example
Here is an example of connecting RisingWave to Kinesis Data Streams to read data from individual streams.
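A minimal sketch of such a statement, assuming Avro-encoded data and access through an assumed role. The source name, stream name, role ARN, external ID, and schema URL are all hypothetical placeholders.

```sql
-- Hypothetical example: replace every value with your own settings.
CREATE SOURCE IF NOT EXISTS src_kinesis
WITH (
    connector = 'kinesis',
    stream = 'demo-stream',
    aws.region = 'us-east-1',
    aws.credentials.access_key_id = '<your_access_key_id>',
    aws.credentials.secret_access_key = '<your_secret_access_key>',
    aws.credentials.role.arn = 'arn:aws:iam::123456789012:role/demo_role',
    aws.credentials.role.external_id = 'demo_external_id'
)
FORMAT PLAIN ENCODE AVRO (
    schema.location = 'https://<example_host>/demo.avsc'
);
```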