In RisingWave, the Iceberg table engine allows you to create and manage tables directly within the system, while storing their underlying data in the Apache Iceberg format on external object storage. This offers an alternative way to persist data compared to RisingWave’s default row-based internal storage format (which also typically uses object storage).

Using the Iceberg table engine provides several benefits:

  • Native management: RisingWave manages the table’s lifecycle (creation, schema, writes). You interact with it like any other RisingWave table (querying, inserting, using in materialized views).
  • Iceberg format storage: Data is physically stored according to the Iceberg specification, using a configured Iceberg catalog and object storage path. This ensures compatibility with the Iceberg ecosystem.
  • Simplified pipelines: When RisingWave manages the Iceberg table itself, you don’t need a separate CREATE SINK step to export data in Iceberg format. Data ingested or computed can land directly in these Iceberg tables.
  • Interoperability: Tables created with the Iceberg engine are standard Iceberg tables and can be read by external Iceberg-compatible query engines (like Spark, Trino, Flink, Dremio) using the same catalog and storage configuration.

This guide details how to set up and use the Iceberg table engine.

Setup and usage

1. Create an Iceberg connection

An Iceberg connection holds the catalog and object storage settings. For syntax and properties, see CREATE CONNECTION.

The example below creates an Iceberg connection named conn that uses the storage catalog, with MinIO as the S3-compatible object store.

Storage catalog
CREATE CONNECTION public.conn WITH (
    type = 'iceberg',
    catalog.name = 'demo',
    catalog.type = 'storage',
    warehouse.path = 's3://hummock001/iceberg-data',
    s3.endpoint = 'http://127.0.0.1:9301',
    s3.region = 'ap-southeast-2',
    s3.access.key = 'hummockadmin',
    s3.secret.key = 'hummockadmin'
);

The following examples show how to create an Iceberg connection using different catalog types.

JDBC catalog
CREATE CONNECTION public.conn WITH (
    type = 'iceberg',
    warehouse.path = 's3://hummock001/iceberg-data',
    s3.access.key = secret secret_1,
    s3.secret.key = secret secret_1,
    s3.endpoint = 'http://127.0.0.1:9301',
    s3.region = 'ap-southeast-2',
    catalog.type = 'jdbc',
    catalog.uri = 'jdbc:postgresql://127.0.0.1:8432/metadata',
    catalog.jdbc.user = 'postgres',
    catalog.jdbc.password = '123',
    catalog.name = 'dev'
);

Glue catalog
CREATE CONNECTION public.conn WITH (
    type = 'iceberg',
    catalog.type = 'glue',
    warehouse.path = 's3://my-iceberg-bucket/test',
    s3.endpoint = 'https://s3.ap-southeast-2.amazonaws.com',
    s3.region = 'ap-southeast-2',
    s3.access.key = secret secret_1,
    s3.secret.key = secret secret_1
);

REST catalog
CREATE CONNECTION public.conn WITH (
    type = 'iceberg',
    catalog.type = 'rest',
    catalog.uri = 'http://localhost:8181/catalog',
    warehouse.path = 'test',
    s3.endpoint = 'https://s3.ap-southeast-2.amazonaws.com',
    s3.region = 'ap-southeast-2',
    s3.access.key = secret secret_1,
    s3.secret.key = secret secret_1
);
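
The JDBC, Glue, and REST examples above pass credentials as secret secret_1 instead of string literals. The following is a minimal sketch of creating such a secret beforehand, assuming RisingWave's CREATE SECRET statement with the meta backend. The values are placeholders, and the second secret name is hypothetical: in practice the access key and the secret key would usually be stored as two separate secrets.

```sql
-- Placeholder value; replace with your real access key ID.
CREATE SECRET secret_1 WITH (backend = 'meta') AS 'your-access-key-id';

-- Hypothetical second secret (not in the examples above) for the secret access key.
CREATE SECRET secret_2 WITH (backend = 'meta') AS 'your-secret-access-key';
```

Check the CREATE SECRET reference for the backends available in your deployment.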

2. Configure the engine to use the connection

Configure the Iceberg table engine to use the connection you just created. All tables created with the Iceberg table engine then use this connection by default.

-- Apply to the current session only
SET iceberg_engine_connection = 'public.conn';
-- Or set it as the system-wide default
ALTER SYSTEM SET iceberg_engine_connection = 'public.conn';
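
To confirm which connection the engine will use, you can inspect the setting from the same session. This is a sketch that assumes iceberg_engine_connection can be read with SHOW like other session variables.

```sql
-- List existing connections; public.conn from step 1 should appear here.
SHOW CONNECTIONS;

-- Assumption: the variable is readable with SHOW, like other session variables.
SHOW iceberg_engine_connection;
```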

3. Create a table with the Iceberg engine

Create a table with the standard CREATE TABLE syntax and add the ENGINE = iceberg clause.

The commit_checkpoint_interval parameter controls how frequently (every N checkpoints) RisingWave commits changes to the Iceberg table, creating a new Iceberg snapshot. The default value is 60. With the typical checkpoint interval of 1 second, RisingWave commits changes to the Iceberg table about every 60 seconds. The example below sets the value to 1, so changes are committed at every checkpoint.

CREATE TABLE users_iceberg (
    user_id INT PRIMARY KEY,
    user_name VARCHAR,
    signup_ts TIMESTAMPTZ
) WITH (commit_checkpoint_interval = 1)
ENGINE = iceberg;
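
The commit interval described above depends on how long a checkpoint takes in your cluster. As a rough sketch, assuming the checkpoint time is determined by the system parameters barrier_interval_ms and checkpoint_frequency (the values in the comments are assumed, not read from your cluster), you can inspect them and work out the interval:

```sql
-- Lists system parameters; look for barrier_interval_ms and checkpoint_frequency.
SHOW PARAMETERS;

-- Worked example, assuming barrier_interval_ms = 1000 and checkpoint_frequency = 1:
--   checkpoint time = 1000 ms * 1 = 1 s
--   commit interval = checkpoint time * commit_checkpoint_interval
--     commit_checkpoint_interval = 60 (default) -> about 60 s between Iceberg snapshots
--     commit_checkpoint_interval = 1            -> about 1 s between Iceberg snapshots
```

Lower values reduce the delay before data becomes visible to external readers, at the cost of more snapshots and smaller files.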

4. Basic operations (insert and select)

For tables created with the Iceberg table engine, you can insert data using standard INSERT statements and query using SELECT. You can use them as base tables to create materialized views or join them with regular tables. However, RisingWave doesn’t support renaming or changing the schemas of natively managed Iceberg tables.

INSERT INTO users_iceberg VALUES (1, 'Alice', NOW());
INSERT INTO users_iceberg VALUES (2, 'Bob', NOW());

SELECT * FROM users_iceberg WHERE user_id = 1;
--  user_id | user_name |         signup_ts
-- ---------+-----------+---------------------------
--        1 | Alice     | 2023-10-27 10:30:00.123...
-- (1 row)
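
Because these tables behave like other RisingWave tables, they can also serve as the base of a materialized view. A minimal sketch, where the view name user_count is hypothetical:

```sql
-- Hypothetical view that keeps a running count of rows in the Iceberg-backed table.
CREATE MATERIALIZED VIEW user_count AS
SELECT COUNT(*) AS total_users
FROM users_iceberg;

-- Query the view like any other relation.
SELECT * FROM user_count;
```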

5. Streaming ingestion into Iceberg tables

You can define a source connector directly within the CREATE TABLE statement that uses the Iceberg engine. Data flows from the external source (e.g., Kafka, Pulsar) directly into the Iceberg table format managed by RisingWave.

-- Example: Stream Kafka topic directly into an Iceberg table
CREATE TABLE page_views_iceberg (
    view_id BIGINT,
    url VARCHAR,
    user_id INT,
    view_ts TIMESTAMP
) WITH (
    connector = 'kafka',
    topic = 'page_views_topic',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'earliest',
    commit_checkpoint_interval = 120 -- Commit every 120 checkpoints (e.g., ~2 mins)
)
FORMAT PLAIN ENCODE JSON  -- Specify the format of the incoming Kafka messages
ENGINE = iceberg;        -- Store data natively in Iceberg format

You can also sink data from another RisingWave source, table, or materialized view into a table created with the Iceberg engine.

-- Assume users_iceberg table exists (created with ENGINE = iceberg)
-- Assume raw_users_stream source exists

CREATE SINK user_sink INTO users_iceberg FROM raw_users_stream;
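
For context, the raw_users_stream source assumed above might be defined along these lines. This is only a sketch: the topic name is hypothetical, and the columns are assumptions chosen to match the users_iceberg columns seen in the earlier query output (user_id, user_name, signup_ts).

```sql
-- Hypothetical Kafka source feeding the sink above.
CREATE SOURCE raw_users_stream (
    user_id INT,
    user_name VARCHAR,
    signup_ts TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'raw_users_topic',                 -- assumed topic name
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
```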

Features and considerations

Time travel

Iceberg’s snapshotting mechanism enables time travel queries. You can query the state of the table as of a specific timestamp or snapshot ID using the FOR SYSTEM_TIME AS OF or FOR SYSTEM_VERSION AS OF clauses.

-- Query state based on timestamp
SELECT * FROM users_iceberg FOR SYSTEM_TIME AS OF '2023-10-27 10:00:00';

-- Query state based on a specific Iceberg snapshot ID
SELECT * FROM users_iceberg FOR SYSTEM_VERSION AS OF 876543219876543210; -- Replace with actual snapshot ID

External access

Since the data is stored in the standard Iceberg format, external systems (like Spark, Trino, Dremio) can directly query the tables created by RisingWave’s Iceberg engine. To do this, configure the external system with the same Iceberg connection details used in RisingWave:

  • Catalog type (storage, jdbc, glue, rest)
  • Catalog configuration (URI, warehouse path, credentials)
  • Object storage configuration (endpoint, credentials)

The namespace and table name in the external catalog will typically match the schema and table name in RisingWave. For example, the table public.users_iceberg in RisingWave would be accessed as table users_iceberg within the public namespace/database in the configured Iceberg catalog by an external tool.
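
As an illustration, once an external engine is configured, the table can be addressed by catalog, namespace, and table name. The sketch below is Spark SQL and assumes the Iceberg catalog has been registered in Spark under the name demo; the catalog name and the Spark setup are assumptions, not part of the RisingWave configuration.

```sql
-- Spark SQL: <catalog>.<namespace>.<table>
-- "demo" is an assumed Spark catalog name pointing at the same catalog and warehouse.
SELECT * FROM demo.public.users_iceberg LIMIT 10;
```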

Limitations

RisingWave does not currently have a built-in automatic compaction service for tables created with the Iceberg engine. Streaming ingestion, especially with frequent commits (low commit_checkpoint_interval), can lead to many small files. You may need to run compaction procedures manually using external tools that operate on Iceberg tables to optimize read performance. We are actively working on integrating compaction features.
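
One possible way to compact files manually is Iceberg's rewrite_data_files procedure, run from Spark with the Iceberg SQL extensions enabled. The sketch below assumes the catalog is registered in Spark as demo (an assumed name) and that the table is public.users_iceberg. Treat it as a starting point and not a recommended schedule.

```sql
-- Spark SQL: rewrite small data files into larger ones for the given table.
-- "demo" is an assumed Spark catalog name.
CALL demo.system.rewrite_data_files(table => 'public.users_iceberg');
```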