Documentation Index
Fetch the complete documentation index at: https://docs.risingwave.com/llms.txt
Use this file to discover all available pages before exploring further.
What this does: Builds a full streaming lakehouse pipeline — CDC from PostgreSQL and events from Kafka flow through RisingWave transformations and land in Iceberg tables with automatic compaction.
When to use this: You want to build or extend a data lakehouse with fresh data, without managing Debezium, Kafka connectors, Flink jobs, and Iceberg maintenance separately.
Setup
1. Ingest database changes via CDC
CREATE SOURCE pg_source WITH (
    connector = 'postgres-cdc',
    hostname = '127.0.0.1',
    port = '5432',
    username = 'postgres',
    password = 'your_password',
    database.name = 'mydb',
    slot.name = 'rw_lakehouse_slot'
);

CREATE TABLE customers (
    id INT PRIMARY KEY,
    name VARCHAR,
    email VARCHAR,
    created_at TIMESTAMPTZ
) FROM pg_source TABLE 'public.customers';

CREATE TABLE orders (
    id INT PRIMARY KEY,
    customer_id INT,
    amount DOUBLE PRECISION,
    status VARCHAR,
    created_at TIMESTAMPTZ
) FROM pg_source TABLE 'public.orders';
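Each CDC table first backfills the existing rows from PostgreSQL, then applies ongoing changes. A quick sanity check from the same RisingWave session (illustrative queries; they assume the upstream tables already contain data):

SELECT count(*) FROM customers;
SELECT id, status, amount FROM orders ORDER BY created_at DESC LIMIT 5;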
2. Ingest events from Kafka
CREATE SOURCE clickstream (
    session_id VARCHAR,
    customer_id INT,
    page VARCHAR,
    action VARCHAR,
    event_time TIMESTAMPTZ,
    WATERMARK FOR event_time AS event_time - INTERVAL '10 SECONDS'
) WITH (
    connector = 'kafka',
    topic = 'clickstream',
    properties.bootstrap.server = 'localhost:9092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
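The watermark declares how late events may arrive (here, up to 10 seconds), which lets windowed queries over this source finalize and emit closed windows. As a minimal sketch of what that enables (the clicks_per_minute name and one-minute window are illustrative, not part of this pipeline):

-- Count clicks per one-minute tumbling window; EMIT ON WINDOW CLOSE
-- emits each window once the watermark passes its end.
CREATE MATERIALIZED VIEW clicks_per_minute AS
SELECT
    window_start,
    COUNT(*) AS clicks
FROM TUMBLE(clickstream, event_time, INTERVAL '1 MINUTE')
GROUP BY window_start
EMIT ON WINDOW CLOSE;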
3. Create enriched views
-- Enrich orders with customer data
CREATE MATERIALIZED VIEW enriched_orders AS
SELECT
    o.id AS order_id,
    o.customer_id,
    c.name AS customer_name,
    c.email AS customer_email,
    o.amount,
    o.status,
    o.created_at
FROM orders o
JOIN customers c ON o.customer_id = c.id;

-- Aggregate clickstream by session
CREATE MATERIALIZED VIEW session_summary AS
SELECT
    session_id,
    customer_id,
    COUNT(*) AS page_views,
    COUNT(DISTINCT page) AS unique_pages,
    MIN(event_time) AS session_start,
    MAX(event_time) AS session_end
FROM clickstream
GROUP BY session_id, customer_id;
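Both materialized views are regular relations: you can query them with ordinary SQL at any time, and results reflect the latest ingested changes. For example (an illustrative query; the 'completed' status value is an assumption about the upstream data):

SELECT customer_name, SUM(amount) AS total_spent
FROM enriched_orders
WHERE status = 'completed'
GROUP BY customer_name
ORDER BY total_spent DESC
LIMIT 10;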
4. Sink to Iceberg
-- Sink enriched orders (upsert mode — reflects updates and deletes)
CREATE SINK orders_iceberg FROM enriched_orders
WITH (
    connector = 'iceberg',
    type = 'upsert',
    primary_key = 'order_id',
    catalog.type = 'storage',
    warehouse.path = 's3://my-lakehouse/warehouse',
    database.name = 'analytics',
    table.name = 'enriched_orders',
    create_table_if_not_exists = 'true',
    s3.region = 'us-east-1',
    s3.access.key = 'your-access-key',
    s3.secret.key = 'your-secret-key'
);
-- Sink session summaries (upsert — session state updates as new events arrive)
CREATE SINK sessions_iceberg FROM session_summary
WITH (
    connector = 'iceberg',
    type = 'upsert',
    primary_key = 'session_id,customer_id',  -- matches the view's GROUP BY key
    catalog.type = 'storage',
    warehouse.path = 's3://my-lakehouse/warehouse',
    database.name = 'analytics',
    table.name = 'session_summary',
    create_table_if_not_exists = 'true',
    s3.region = 'us-east-1',
    s3.access.key = 'your-access-key',
    s3.secret.key = 'your-secret-key'
);
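Once created, the sinks stream changes into Iceberg continuously. To confirm they are registered and running, list them from the same session (exact output columns vary by RisingWave version):

SHOW SINKS;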
Key points
- Use type = 'upsert' for tables with changing state (CDC tables, aggregating MVs); use type = 'append-only' only for truly append-only streams with no retractions
- RisingWave hosts the Iceberg catalog and runs compaction automatically — no separate Spark compaction jobs needed
- The JOIN between the CDC tables is maintained incrementally — when a customer's data changes, enriched_orders updates automatically
- For production, use a proper Iceberg catalog (Glue, REST, Hive) instead of catalog.type = 'storage'; a sketch follows this list
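For instance, a Glue-backed variant of the orders sink could look like the sketch below. This is a sketch under assumptions, not a verified configuration: it assumes the AWS credentials above also carry Glue permissions, and catalog options can differ across RisingWave versions, so check the Iceberg sink documentation for your release.

CREATE SINK orders_iceberg_glue FROM enriched_orders
WITH (
    connector = 'iceberg',
    type = 'upsert',
    primary_key = 'order_id',
    catalog.type = 'glue',  -- AWS Glue Data Catalog instead of the storage catalog
    warehouse.path = 's3://my-lakehouse/warehouse',
    database.name = 'analytics',
    table.name = 'enriched_orders',
    create_table_if_not_exists = 'true',
    s3.region = 'us-east-1',
    s3.access.key = 'your-access-key',
    s3.secret.key = 'your-secret-key'
);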
Next steps