Skip to main content
You can create and manage Apache Iceberg tables directly in RisingWave. When you create an internal Iceberg table (that is, a RisingWave-managed Iceberg table), RisingWave handles its lifecycle, while the underlying data is stored in the open Apache Iceberg format in an object store you configure.

Create an internal Iceberg table

Creating and using an internal Iceberg table is a two-step process: first, you define the storage and catalog details in a CONNECTION object, and then you create the table itself.

Step 1: Create an Iceberg Connection

An Iceberg CONNECTION defines the catalog and object storage configuration. You must specify the type and warehouse.path parameters, along with the required parameters for your catalog and object storage. To use the JDBC-based hosted catalog, set hosted_catalog to true. You can also set the optional commit_checkpoint_interval parameter to control the commit frequency. For example, setting it to 10 means RisingWave will commit data every 10 checkpoints. The following tabs show examples for different catalog types. For a complete list of parameters, refer to Catalog configuration.
  • Hosted catalog - JDBC
  • JDBC catalog
  • Glue catalog
  • REST catalog
  • S3 Tables catalog
For the simplest setup, use RisingWave’s built-in JDBC-based hosted catalog. This requires no external dependencies.
CREATE CONNECTION my_iceberg_conn WITH (
    type = 'iceberg',
    warehouse.path = 's3://my-bucket/warehouse/',
    s3.region = 'us-west-2',
    s3.access.key = 'your-key',
    s3.secret.key = 'your-secret',
    hosted_catalog = true
);
For more details, see Hosted Iceberg catalog.

Step 2: Create an internal Iceberg table

Create an internal Iceberg table using the ENGINE = iceberg clause and associate it with your connection. To simplify creation, you can set a default connection for your session.
-- Option 1: Set a default connection for the session
SET iceberg_engine_connection = 'my_iceberg_conn';

CREATE TABLE user_events (
    user_id INT,
    event_type VARCHAR,
    timestamp TIMESTAMPTZ,
    PRIMARY KEY (user_id, timestamp)
) ENGINE = iceberg;

-- Option 2: Specify the connection explicitly
CREATE TABLE user_events (
    user_id INT,
    event_type VARCHAR,
    timestamp TIMESTAMPTZ,
    PRIMARY KEY (user_id, timestamp)
) ENGINE = iceberg
  WITH (connection = 'my_iceberg_conn');
You can also define a partition strategy in the WITH clause to optimize query performance.
CREATE TABLE partitioned_events (
    user_id INT,
    event_type VARCHAR,
    event_date DATE,
    PRIMARY KEY (event_date, user_id)
) WITH (
    partition_by = 'event_date'
) ENGINE = iceberg;
Supported partitioning strategies include by column, by multiple columns, and by applying transforms like bucket(n, column) or truncate(n, column). The partition key must be a prefix of the primary key.

Work with internal tables

Once created, an internal Iceberg table behaves like any other table in RisingWave.

Ingest data

You can ingest data using standard INSERT statements or by streaming data from a source using CREATE SINK ... INTO.
-- Manual inserts
INSERT INTO user_events VALUES (1, 'login', '2024-01-01 10:00:00Z');

-- Stream data from a Kafka source into the table
CREATE SOURCE sales_src (
  item_id INT,
  customer_id INT,
  price DOUBLE,
  ts TIMESTAMP
) WITH (
  connector = 'kafka',
  topic = 'sales_events',
  properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

CREATE SINK to_sales_events INTO sales_events AS
SELECT item_id, customer_id, price, ts
FROM sales_src;

Query data

Query the table directly with SELECT or use it as a source for a materialized view.
-- Ad hoc query
SELECT * FROM user_events WHERE event_type = 'login';

-- Create a materialized view
CREATE MATERIALIZED VIEW user_login_count AS
SELECT user_id, COUNT(*) as login_count
FROM user_events 
WHERE event_type = 'login'
GROUP BY user_id;

Time travel

Query historical snapshots of the table using FOR SYSTEM_TIME AS OF or FOR SYSTEM_VERSION AS OF.
-- Query a snapshot by timestamp
SELECT * FROM user_events FOR SYSTEM_TIME AS OF TIMESTAMPTZ '2024-01-01 12:00:00Z';

-- Query a snapshot by ID
SELECT * FROM user_events FOR SYSTEM_VERSION AS OF 1234567890;

Table maintenance

To maintain good performance and manage storage costs, internal Iceberg tables require periodic maintenance, including compaction and snapshot expiration. RisingWave provides both automatic and manual maintenance options. For complete details, see the Iceberg table maintenance guide.

External access

Because internal tables are standard Iceberg tables, they can be read by external query engines like Spark or Trino using the same catalog and storage configuration. Spark Example:
spark.sql("SELECT * FROM iceberg_catalog.your_database.user_events")

Limitations

  • Advanced schema evolution operations are not yet supported.
  • To ensure data consistency, only RisingWave should write to internal Iceberg tables.
I