Amazon Redshift is a fully managed, petabyte-scale data warehouse service in the cloud. The RisingWave Redshift sink connector provides efficient data ingestion with support for automatic schema changes and both S3-based and direct loading methods.
Syntax
CREATE SINK [ IF NOT EXISTS ] sink_name
[ FROM sink_from | AS select_query ]
WITH (
    connector = 'redshift',
    connector_parameter = 'value', ...
);
Parameters
| Parameter | Description |
| --- | --- |
| `jdbc.url` | JDBC URL used to connect to Redshift. |
| `user` | Redshift user name. |
| `password` | Redshift password. |
| `schema` | Redshift schema name. |
| `table.name` | Name of the target table. |
| `intermediate.table.name` | Name of the intermediate table used in upsert mode. Not needed in append-only mode. |
| `auto_schema_change` | Enables automatic schema change for upsert sinks; new columns are added to the target table as needed. |
| `create_table_if_not_exists` | Creates the target table if it does not exist. |
| `write.target.interval.seconds` | Interval, in seconds, at which data is merged from the intermediate table into the target table (default: 3600). |
| `batch.insert.rows` | Number of rows per batch insert (default: 4096). Required when `with_s3` is false; the value must be a power of 2, such as 1024 or 4096. |
S3 parameters
These options only need to be set when `with_s3` is `true`:
| Parameter | Description |
| --- | --- |
| `with_s3` | Enables writing via S3 (default: `true`). |
| `s3.region_name` | AWS region of the S3 bucket. |
| `s3.bucket_name` | S3 bucket name. |
| `s3.path` | S3 folder path for sink files. |
| `enable_config_load` | Loads S3 credentials from the environment (self-hosted deployments only). |
| `s3.credentials.access` | AWS access key ID. |
| `s3.credentials.secret` | AWS secret access key. |
| `s3.endpoint_url` | Custom S3 endpoint URL (for self-hosted setups). |
| `s3.assume_role` | ARN of the IAM role to assume for S3 access. |
In RisingWave Cloud, the default AWS credential provider chain is disabled, so you must provide `s3.credentials.access` and `s3.credentials.secret` (or use a supported assume-role setup); these credentials cannot be omitted. The `enable_config_load` option is supported only in self-hosted deployments.
Auto schema change
Premium feature: This is a premium feature. For a comprehensive overview of all premium features and their usage, see RisingWave premium features.
Redshift sinks support auto schema change to automatically adapt their output schema according to changes in the upstream table. This feature is supported only when `sink_decoupling` is disabled.
Once auto schema change is enabled, if you add new columns to the source table, the sink will automatically update to match the new schema. This reduces manual intervention and makes your data pipelines more robust to schema evolution.
To enable it, set the following option when creating the sink:
auto_schema_change = true
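For illustration, here is a minimal sketch of schema evolution with this option enabled. All table, column, and connection values below are placeholders, and the S3 parameters are omitted for brevity:

CREATE SINK orders_sink FROM orders WITH (
    connector = 'redshift',
    type = 'upsert',
    jdbc.url = 'jdbc:redshift://...',
    user = '...',
    password = '...',
    schema = 'rw_schema',
    table.name = 'orders',
    intermediate.table.name = 'rw_orders_stage',
    auto_schema_change = 'true'
);

-- Later, a column added to the upstream RisingWave table...
ALTER TABLE orders ADD COLUMN discount DOUBLE PRECISION;
-- ...is added to the Redshift target table automatically; no sink changes are needed.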
S3 authentication
You need to configure how RisingWave authenticates with AWS S3. There are two primary methods:
- Access Key / Secret Key (AK/SK) authentication
This is the default method. Provide your AWS Access Key ID and Secret Access Key directly in the CREATE SINK statement.
s3.credentials.access = 'YOUR_AWS_ACCESS_KEY_ID',
s3.credentials.secret = 'YOUR_AWS_SECRET_ACCESS_KEY',
- Assume role authentication
For enhanced security, RisingWave can assume an IAM Role in your AWS account to gain temporary credentials for S3 access.
s3.assume_role = 'arn:aws:iam::123456789012:role/YourRisingWaveS3Role',
enable_config_load = 'true',
To use this method, you need to configure an IAM Role in AWS that RisingWave can assume. This involves:
- Obtaining the RisingWave service ARN from our support team.
- Creating an IAM policy with the necessary S3 read/write permissions for your bucket and prefix.
- Configuring the IAM Role’s trust policy to allow the RisingWave service ARN to assume it.
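For illustration, the trust policy on that role could look like the following. The principal ARN below is a placeholder for the RisingWave service ARN obtained from support:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::000000000000:role/risingwave-service-role"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}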
Append-only and upsert modes
The Amazon Redshift sink connector supports both append-only and upsert modes for flexible data handling.
In upsert mode, performance is optimized through the use of an intermediate table:
- An intermediate table is created to stage data before merging it into the target table. If `create_table_if_not_exists` is set to `true`, the table is automatically named `rw_<target_table_name>_<uuid>`.
- Data is periodically merged from the intermediate table into the target table according to the `write.target.interval.seconds` setting (see the sketch after this list).
- By default, an S3 bucket is required to achieve optimal ingestion performance into the intermediate table.
- Alternatively, you can use INSERT SQL statements to load data directly into the intermediate table, though this approach is not recommended due to performance drawbacks.
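The periodic merge is conceptually similar to the following Redshift statement. This is a sketch only: the target table, staging table, and key columns are hypothetical, and the connector’s actual statements may differ (for example, in how deletes are applied):

MERGE INTO rw_schema.test_table
USING rw_schema.rw_test_table_stage AS stage
ON test_table.id = stage.id
WHEN MATCHED THEN UPDATE SET value = stage.value
WHEN NOT MATCHED THEN INSERT (id, value) VALUES (stage.id, stage.value);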
Examples
- Redshift sink with S3 writer (Append-only mode)
CREATE SINK redshift_sink FROM test_table WITH (
    connector = 'redshift',
    type = 'append-only',
    table.name = 'test_table',
    schema = 'RW_SCHEMA',
    -- JDBC configs are always required for Redshift
    jdbc.url = 'jdbc:redshift://...',
    user = '...',
    password = '...',
    -- `with_s3` defaults to true. If `with_s3` is set to false (not recommended),
    -- you need to specify `batch.insert.rows`, and the value must be a power of 2,
    -- such as 1024 or 4096.
    with_s3 = true,
    s3.bucket_name = '...',
    s3.region_name = '...',
    s3.path = '...',
    -- Authentication: access key / secret (default)
    s3.credentials.access = '...',
    s3.credentials.secret = '...',
    -- Or assume role (requires cloud support)
    s3.assume_role = '...',
    enable_config_load = 'true',
    -- Defaults (can be overridden)
    auto_schema_change = 'false',
    create_table_if_not_exists = 'false'
);
- Redshift sink with S3 writer (Upsert mode)
CREATE SINK redshift_sink FROM test_table WITH (
    connector = 'redshift',
    type = 'upsert',
    table.name = 'test_table',
    schema = 'RW_SCHEMA',
    -- Intermediate table required for upsert
    intermediate.table.name = '...',
    -- Default: 3600 seconds (1 hour)
    write.target.interval.seconds = '...',
    -- JDBC configs are always required for Redshift
    jdbc.url = 'jdbc:redshift://...',
    user = '...',
    password = '...',
    -- `with_s3` defaults to true. If `with_s3` is set to false (not recommended),
    -- you need to specify `batch.insert.rows`, and the value must be a power of 2,
    -- such as 1024 or 4096.
    with_s3 = true,
    s3.bucket_name = '...',
    s3.region_name = '...',
    s3.path = '...',
    -- Authentication: access key / secret (default)
    s3.credentials.access = '...',
    s3.credentials.secret = '...',
    -- Or assume role (requires cloud support)
    s3.assume_role = '...',
    enable_config_load = 'true',
    -- Defaults (can be overridden)
    auto_schema_change = 'false',
    create_table_if_not_exists = 'false'
);
Set up Redshift
When configuring RisingWave to write to Redshift, the JDBC user must have appropriate permissions depending on whether the create_table_if_not_exists option is enabled.
- If `create_table_if_not_exists` is not enabled, the user must have permissions on the target table:
  GRANT SELECT, INSERT, UPDATE, DELETE, ALTER ON [table_name] TO [username];
  These permissions allow RisingWave to read, insert, update, delete, and alter the existing table.
- If `create_table_if_not_exists` is enabled, the user needs the table permissions above plus schema-level permissions to create new tables (see the combined example after this list):
  GRANT CREATE ON SCHEMA [schema_name] TO [username];
  This ensures that RisingWave can create tables in the specified schema when they do not already exist.
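For instance, with a hypothetical schema rw_schema, target table test_table, and user rw_user, the grants would be:

GRANT SELECT, INSERT, UPDATE, DELETE, ALTER ON rw_schema.test_table TO rw_user;
-- Only needed when create_table_if_not_exists is enabled:
GRANT CREATE ON SCHEMA rw_schema TO rw_user;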
Set up Redshift S3 integration
When using S3 as intermediate storage for Redshift sinks, you need to configure permissions so that RisingWave can write to your S3 bucket and Redshift can read from it.
- Permissions for the S3 account configured in RisingWave
  The S3 account credentials used in RisingWave must have the following permissions on the target bucket/path:
  - s3:GetObject
  - s3:ListBucket
  - s3:PutObject
  - s3:DeleteObject
  These permissions allow RisingWave to read, list, write, and delete files in the staging S3 location.
- Permissions for Redshift to access the S3 account
  Redshift itself also needs permissions to read from the same S3 bucket. Grant the following permissions to Redshift’s IAM role:
  - s3:GetObject
  - s3:ListBucket
  This allows Redshift to copy data from S3 into the Redshift table (see the sketches after this list).
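For the RisingWave side, an IAM policy granting these permissions on a hypothetical staging bucket and prefix might look like this:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
            "Resource": "arn:aws:s3:::your-staging-bucket/your-prefix/*"
        },
        {
            "Effect": "Allow",
            "Action": "s3:ListBucket",
            "Resource": "arn:aws:s3:::your-staging-bucket"
        }
    ]
}

Redshift’s own IAM role needs only the s3:GetObject and s3:ListBucket portion, since it only reads the staged files, conceptually through a COPY statement such as the following (an illustration only; the connector issues the actual statement, and the file format shown is an assumption):

COPY rw_schema.rw_test_table_stage
FROM 's3://your-staging-bucket/your-prefix/'
IAM_ROLE 'arn:aws:iam::123456789012:role/YourRedshiftRole'
FORMAT AS PARQUET;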
Set up S3 IAM and role
To guarantee that the IAM role has sufficient permissions to connect to Redshift and access AWS resources:
- Attach a policy to the IAM role
  Attach the policy AmazonRedshiftAllCommandsFullAccess to the role.
- Configure trusted entities
  In the IAM role’s trust relationship, allow the Redshift service to assume the role:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "redshift.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
- Attach the IAM role to the Redshift cluster
  - Navigate to the Amazon Redshift service in the AWS Management Console.
  - In the left navigation pane, select Clusters and choose the cluster you want to configure.
  - From the Actions menu, select Manage IAM Roles, and attach the configured IAM role.