This page provides a complete reference for all configuration parameters used when connecting RisingWave to Apache Iceberg tables via the WITH clause of a CREATE SOURCE statement.

Syntax

CREATE SOURCE [IF NOT EXISTS] source_name
WITH (
   connector='iceberg',
   connector_parameter='value', ...
);

You don’t need to specify the column names for the Iceberg source, as RisingWave can derive them from the Iceberg table metadata directly. Use the DESCRIBE statement to view the column names and data types.

Parameters

ParameterDescriptionRequired (Conditional)
connectorMust be set to 'iceberg'.Yes
typeThe type of the Iceberg source. Allowed values:
  • 'append-only': For Iceberg tables where data is only appended (no updates or deletes). This is the most common and efficient option.
  • 'upsert': For Iceberg tables that support updates and deletes (using the “merge-on-read” approach). This option has performance implications, as it requires more complex processing.
Yes
database.nameThe name of the Iceberg database containing the table.Yes
table.nameThe name of the Iceberg table to ingest data from.Yes

Storage configuration (S3-compatible)

To integrate RisingWave with Iceberg tables stored in S3-compatible object storage, you must provide the necessary connection details. These configurations tell RisingWave exactly where your data warehouse resides and how to authenticate and access it.

Basic connection

These parameters configure the connection to the underlying S3-compatible storage system where the Iceberg data files are stored. This includes AWS S3, MinIO, and other compatible services.

ParameterDescriptionRequired (Conditional)
warehouse.pathThe base path to your Iceberg warehouse in the S3-compatible storage. Example: 's3://my-bucket/iceberg-warehouse'Conditional
s3.endpointThe endpoint URL for your S3-compatible storage service.
  • Example (MinIO): 'http://minio-host:9000'
  • Example (AWS S3): See AWS S3 documentation for your region’s endpoint.
Conditional
s3.regionThe AWS region where your S3 bucket is located (e.g., 'us-east-1', 'eu-west-2'). Required if using AWS S3 and s3.endpoint is not specified.Conditional
s3.access.keyYour S3 access key ID.Conditional
s3.secret.keyYour S3 secret access key.Conditional
s3.path.style.accessDetermines the access style for S3. If true, use path-style; if false, use virtual-hosted–style.No

Use Amazon S3 Tables with the Iceberg source

You can configure the RisingWave Iceberg source connector to read data from Iceberg tables managed by Amazon S3 Tables. This allows you to ingest data into RisingWave directly from tables registered in your S3 Tables catalog.

To set this up, specify connector = 'iceberg' and catalog.type = 'rest' within your CREATE SOURCE statement. You must also include the necessary parameters for SigV4 authentication against the S3 Tables REST API.

Required REST Catalog parameters for S3 Tables:

While these parameters might be optional in general Iceberg source configurations, they are required when catalog.type = 'rest' is used to connect to Amazon S3 Tables:

Parameter NameDescriptionValue for S3 TablesRequired (for S3 Tables)
catalog.rest.signing_regionThe AWS region for signing REST catalog requests.e.g., us-east-1Yes
catalog.rest.signing_nameThe service name for signing REST catalog requests.s3tablesYes
catalog.rest.sigv4_enabledEnables SigV4 signing for REST catalog requests. Must be true.trueYes

For details on general source parameters, see Basic connection.

Example CREATE SOURCE Statement:

This example creates a RisingWave source named my_s3_tables_source that reads from an Iceberg table named <your-table-name> within the <your-database-name> database managed by Amazon S3 Tables.

CREATE SOURCE my_s3_tables_source
WITH (
    connector = 'iceberg',

    -- Specify the S3 Tables warehouse ARN
    warehouse.path = 'arn:aws:s3tables:<your-region>:<your-account-id>:bucket/<your-bucket-name>',
    -- AWS Credentials
    s3.access.key = '<your-aws-access-key-id>',
    s.secret.key = '<your-aws-secret-access-key>',
    s3.region = '<your-region>', -- e.g., 'us-east-1'

    -- S3 Tables REST catalog endpoint
    catalog.uri = 'https://s3tables.<your-region>.amazonaws.com/iceberg',
    -- REST catalog signing configurations (Required for S3 Tables)
    catalog.rest.signing_region = '<your-region>', -- e.g., 'us-east-1'
    catalog.rest.sigv4_enabled = true,
    catalog.rest.signing_name = 's3tables',
    -- Specify REST catalog type
    catalog.type = 'rest',

    -- Target Iceberg table details within S3 Tables catalog
    database.name = '<your-database-name>', -- Database in S3 Tables
    table.name = '<your-table-name>'       -- Table name in S3 Tables
;
Replace placeholder values (<...>) with your specific AWS account, region, bucket, database, table names, and credentials.

Once created, RisingWave will use this source to read data from the specified Iceberg table, leveraging Amazon S3 Tables for metadata discovery.

Storage configuration (GCS)

These parameters configure the connection to the underlying GCS storage system where the Iceberg data files are stored.

ParameterDescriptionRequired (Conditional)
warehouse.pathThe base path to your Iceberg warehouse in GCS. Example: 'gs://bucket/path'Conditional
gcs.credentialThe credential to access GCS.Conditional

Catalog configuration

These parameters configure the Iceberg catalog. The catalog is responsible for managing table metadata (schema, partitioning, location). RisingWave supports several catalog types.

ParameterDescriptionRequired (Conditional)
catalog.nameThe name of the Iceberg catalog. This is a user-defined identifier. Optional for the storage catalog, but required for all other catalog types.Conditional
catalog.typeThe type of Iceberg catalog to use. Supported values:
  • 'storage': Uses the underlying file system (e.g., S3) directly for metadata.
  • 'rest': Uses the Iceberg REST catalog.
  • 'hive': Uses a Hive Metastore.
  • 'jdbc': Uses a JDBC catalog.
  • 'glue': Uses AWS Glue Data Catalog
If not specified, defaults to 'storage'.
No (defaults to storage)
catalog.uriThe URI of the catalog. The required format depends on the catalog.type:
  • rest: The base URL of the REST catalog server (e.g., 'http://rest-catalog:8181').
  • hive: The Hive Metastore URI (e.g., 'thrift://hive-metastore:9083').
  • jdbc: The JDBC connection string (e.g., 'jdbc:postgresql://postgres:5432/iceberg').
Conditional
warehouse.pathThe path of the Iceberg warehouse. Currently, only S3-compatible object storage systems and GCS, are supported. It is required if the catalog.type is not rest.Conditional
catalog.credentialCredential for accessing the Iceberg catalog, used to exchange for a token in the OAuth2 client credentials flow. Applicable only in the rest catalog.No
catalog.tokenA Bearer token for accessing the Iceberg catalog, used for interaction with the server. Applicable only in the rest catalog.No
catalog.oauth2_server_uriThe oauth2_server_uri for accessing the Iceberg catalog, serving as the token endpoint URI to fetch a token if the rest catalog is not the authorization server. Applicable only in the rest catalog.No
catalog.scopeScope for accessing the Iceberg catalog, providing additional scope for OAuth2. Applicable only in the rest catalog.No
catalog.jdbc.userUsername for JDBC catalog.No
catalog.jdbc.passwordPassword for JDBC catalog.No

Other parameters

ParameterDescriptionRequiredDefault
commit_checkpoint_intervalCommit every N checkpoints (N > 0).No60

Time travel

RisingWave supports querying historical data from Iceberg tables (time travel). You can query data as of a specific timestamp or snapshot ID.

Syntax

-- Query as of a timestamp (STRING format)
SELECT * FROM source_name FOR SYSTEM_TIME AS OF 'YYYY-MM-DD HH:MM:SS[+-HH:MM]';

-- Query as of a timestamp (UNIX timestamp in seconds)
SELECT * FROM source_name FOR SYSTEM_TIME AS OF <unix_timestamp_seconds>;

-- Query as of a snapshot ID
SELECT * FROM source_name FOR SYSTEM_VERSION AS OF <snapshot_id>;

Examples

SELECT * FROM s FOR SYSTEM_TIME AS OF '2100-01-01 00:00:00+00:00'; -- Example with timezone offset
SELECT * FROM s FOR SYSTEM_TIME AS OF '2024-04-10 12:34:56'; -- Example without timezone (local time)
SELECT * FROM s FOR SYSTEM_TIME AS OF 4102444800; -- Example with Unix timestamp
SELECT * FROM s FOR SYSTEM_VERSION AS OF 3023402865675048688; -- Example with Snapshot ID

Note: When the timestamp includes timezone, you should use the timestamp with time zone type.

System tables

RisingWave offers system tables for querying Iceberg metadata:

  • rw_iceberg_files: Contains information about the current data files of the Iceberg table.
  • rw_iceberg_snapshots: Contains information about all snapshots of the Iceberg table.
-- Query Iceberg data files
SELECT * FROM rw_iceberg_files;

-- Query Iceberg snapshots
SELECT * FROM rw_iceberg_snapshots;

Data Type Mapping

For information on how RisingWave maps Iceberg data types to RisingWave data types, see the Iceberg RisingWave data type mapping table.