Need help generating SQL? Use Claude Code or Cursor with the RisingWave MCP server to generate and run SQL interactively.

You can create and manage Apache Iceberg tables directly in RisingWave. When you create an internal Iceberg table (that is, a RisingWave-managed Iceberg table), RisingWave handles its lifecycle, while the underlying data is stored in the open Apache Iceberg format in an object store you configure.

Create an internal Iceberg table

Creating and using an internal Iceberg table is a two-step process: first, you define the storage and catalog details in a CONNECTION object, and then you create the table itself.

Step 1: Create an Iceberg Connection

An Iceberg CONNECTION defines the catalog and object storage configuration. When you create a CONNECTION, you specify the object storage backend where the table data will be stored, as well as the catalog that will manage the table’s metadata. You must specify the type and warehouse.path parameters, along with the required parameters for your catalog and object storage. To use the JDBC-based built-in catalog, set hosted_catalog to true.

Note that commit_checkpoint_interval and commit_checkpoint_size_threshold_mb are configured in the table or sink WITH (...) properties (for example, CREATE TABLE ... ENGINE = iceberg WITH (...)), not on the CONNECTION object itself. By default, Iceberg commits happen about every 60 seconds (commit_checkpoint_interval = 60). You can also set commit_checkpoint_size_threshold_mb to trigger earlier commits based on buffered data size: when uncommitted write data exceeds this threshold (in MB), RisingWave commits at the next checkpoint barrier, regardless of commit_checkpoint_interval. This value must be greater than 0; the default is 128 MB. Early commits are especially useful during large backfills, where they prevent too much uncommitted data from accumulating.
For S3 credentials (applies to all catalogs):
  • If enable_config_load = false: you must provide s3.access.key and s3.secret.key (you may also set s3.iam_role_arn).
  • If enable_config_load = true: don’t provide s3.access.key/s3.secret.key (you may set s3.iam_role_arn, or rely on the role already available in your environment/config).
See Object storage configuration.
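For example, a connection that relies on environment-provided credentials (enable_config_load = true) might look like the following sketch; the bucket, region, and role ARN are placeholders:
CREATE CONNECTION env_iceberg_conn WITH (
    type = 'iceberg',
    warehouse.path = 's3://example-bucket/warehouse/',
    s3.region = 'us-west-2',
    enable_config_load = true,
    s3.iam_role_arn = 'arn:aws:iam::123456789012:role/example-role',
    hosted_catalog = true
);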
For more details on the available catalog options, see Iceberg catalog configuration.
CREATE CONNECTION my_iceberg_conn WITH (
    type = 'iceberg',
    warehouse.path = 's3://my-bucket/warehouse/',
    s3.region = 'us-west-2',
    s3.access.key = 'your-key',
    s3.secret.key = 'your-secret',
    hosted_catalog = true
);
For more details, see Built-in catalog.

Step 2: Create an internal Iceberg table

Create an internal Iceberg table using the ENGINE = iceberg clause. To create Iceberg tables, RisingWave needs to know which Iceberg CONNECTION to use (this connection contains both the object storage settings and the catalog settings). Choose one option below.
-- Option 1: Set a default connection for the session
SET iceberg_engine_connection = 'public.my_iceberg_conn';

CREATE TABLE user_events (
    user_id INT,
    event_type VARCHAR,
    timestamp TIMESTAMPTZ,
    PRIMARY KEY (user_id, timestamp)
) ENGINE = iceberg;
-- Option 2: Specify the connection explicitly
CREATE TABLE user_events (
    user_id INT,
    event_type VARCHAR,
    timestamp TIMESTAMPTZ,
    PRIMARY KEY (user_id, timestamp)
) ENGINE = iceberg
  WITH (connection = 'public.my_iceberg_conn');
You can also define a partition strategy in the WITH clause to optimize query performance.
CREATE TABLE partitioned_events (
    user_id INT,
    event_type VARCHAR,
    event_date DATE,
    PRIMARY KEY (event_date, user_id)
) WITH (
    partition_by = 'event_date'
) ENGINE = iceberg;
You can partition by a single column, by multiple columns, or by applying transforms such as bucket(n, column) or truncate(n, column). The partition key must be a prefix of the primary key.
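For instance, a table whose partition key covers a prefix of its primary key could be sketched as follows (table and column names are illustrative):
CREATE TABLE events_by_day (
    event_date DATE,
    user_id INT,
    payload VARCHAR,
    PRIMARY KEY (event_date, user_id)
) WITH (
    partition_by = 'event_date,bucket(16,user_id)'
) ENGINE = iceberg;
Here the partition columns event_date and user_id form a prefix of (in this case, all of) the primary key, satisfying the constraint.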

Work with internal tables

Once created, you can work with an internal Iceberg table using familiar SQL (insert, query, materialized views). One important difference: new writes become queryable only after an Iceberg commit. By default, Iceberg commits happen about every 60 seconds (controlled by commit_checkpoint_interval), or earlier when the buffered write size exceeds commit_checkpoint_size_threshold_mb (default: 128 MB).
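If you need recent writes to become visible sooner, both commit settings can be tuned in the table's WITH clause. A sketch with illustrative values (smaller values commit more often, at the cost of more frequent small Iceberg snapshots):
CREATE TABLE fast_commit_events (
    user_id INT,
    event_type VARCHAR,
    PRIMARY KEY (user_id)
) WITH (
    commit_checkpoint_interval = 10,
    commit_checkpoint_size_threshold_mb = 64
) ENGINE = iceberg;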

Ingest data

You can ingest data using standard INSERT statements or by streaming data from a source using CREATE SINK ... INTO.
-- Manual inserts
INSERT INTO user_events VALUES (1, 'login', '2024-01-01 10:00:00Z');

-- Stream data from a Kafka source into the same table
CREATE SOURCE user_events_src (
  user_id INT,
  event_type VARCHAR,
  timestamp TIMESTAMPTZ
) WITH (
  connector = 'kafka',
  topic = 'user_events',
  properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

CREATE SINK to_user_events INTO user_events AS
SELECT user_id, event_type, timestamp
FROM user_events_src;

Query data

Query the table directly with SELECT or use it as a source for a materialized view.
-- Ad hoc query
-- Note: you may need to wait for the next Iceberg commit (default ~60s) to see recent writes.
SELECT * FROM user_events WHERE event_type = 'login';

-- Create a materialized view
CREATE MATERIALIZED VIEW user_login_count AS
SELECT user_id, COUNT(*) as login_count
FROM user_events 
WHERE event_type = 'login'
GROUP BY user_id;

Time travel

Time travel queries work on committed Iceberg snapshots. Make sure at least one Iceberg commit has happened before using these queries.
-- Query a snapshot by timestamp
SELECT * FROM user_events FOR SYSTEM_TIME AS OF TIMESTAMPTZ '2024-01-01 12:00:00Z';

-- Query a snapshot by ID
SELECT * FROM user_events FOR SYSTEM_VERSION AS OF 1234567890;
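Time travel clauses compose with ordinary SQL, so you can, for example, compare row counts before and after a batch of writes (the timestamp below is illustrative):
-- Row count as of an earlier snapshot
SELECT COUNT(*) FROM user_events FOR SYSTEM_TIME AS OF TIMESTAMPTZ '2024-01-01 12:00:00Z';

-- Current row count
SELECT COUNT(*) FROM user_events;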

Partition strategy

RisingWave’s Iceberg table engine supports table partitioning using the partition_by option. Partitioning helps organize data for efficient storage and query performance. You can partition by one or multiple columns, separated by commas, and optionally apply a Transform function to each column to customize partitioning. Supported transformations include identity, truncate(n), bucket(n), year, month, day, hour, and void. For more details on Iceberg partitioning, see Partition transforms.
CREATE TABLE t_partition (
    v1 INT,
    v2 INT,
    v3 TIMESTAMP,
    v4 TIMESTAMP,
    PRIMARY KEY (v1, v2, v3, v4)
)
WITH (
    -- `commit_checkpoint_interval` controls Iceberg commit frequency. Default: about every 60 seconds; set to 1 for faster commits and visibility.
    commit_checkpoint_interval = 1,
    partition_by = 'truncate(4,v2),bucket(5,v1)'
)
ENGINE = ICEBERG;

Query storage selection

Iceberg engine tables have two storage backends for batch reads:
  • Iceberg columnar storage – better for wide scans and analytical reads.
  • Hummock row storage – better for point reads and highly selective access.
You can control which backend a batch SELECT uses with the iceberg_query_storage_mode session variable.
  • iceberg (default) – Always read from Iceberg columnar storage.
  • hummock – Always read from Hummock row storage.
  • auto – Let the optimizer decide. Currently, the optimizer prefers Hummock for point lookups on the table primary key.
-- Use Iceberg columnar storage (default)
SET iceberg_query_storage_mode = 'iceberg';

-- Use Hummock row storage (good for primary-key point lookups)
SET iceberg_query_storage_mode = 'hummock';

-- Let the optimizer decide based on the query shape
SET iceberg_query_storage_mode = 'auto';
iceberg_query_storage_mode only affects batch SELECT on tables created with ENGINE = ICEBERG. Streaming queries are not affected.
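Putting this together, a primary-key point lookup can be steered to row storage while analytical scans stay on columnar storage (the queries below are illustrative):
-- Point lookup on the primary key: Hummock row storage is typically faster here
SET iceberg_query_storage_mode = 'hummock';
SELECT * FROM user_events WHERE user_id = 1 AND timestamp = '2024-01-01 10:00:00Z';

-- Wide analytical scan: switch back to Iceberg columnar storage
SET iceberg_query_storage_mode = 'iceberg';
SELECT event_type, COUNT(*) FROM user_events GROUP BY event_type;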

Table maintenance

To maintain good performance and manage storage costs, internal Iceberg tables require periodic maintenance, including compaction and snapshot expiration. RisingWave provides both automatic and manual maintenance options. For complete details, see the Iceberg table maintenance guide.

External access

Because internal tables are standard Iceberg tables, they can be read by external query engines like Spark or Trino using the same catalog and storage configuration.

Spark example:
spark.sql("SELECT * FROM iceberg_catalog.your_database.user_events")

Limitations

  • Advanced schema evolution operations are not yet supported.
  • To ensure data consistency, only RisingWave should write to internal Iceberg tables.