Documentation Index
Fetch the complete documentation index at: https://docs.risingwave.com/llms.txt
Use this file to discover all available pages before exploring further.
What this does: Reads the PostgreSQL transaction log (WAL) via CDC, syncs table changes into RisingWave in real time, and enables downstream transformations and delivery.
When to use this: You need to react to database changes (inserts, updates, deletes) in real time, or build a streaming lakehouse from an operational PostgreSQL database.
Prerequisites
Enable logical replication on your PostgreSQL instance:
-- Run on PostgreSQL (not RisingWave)
ALTER SYSTEM SET wal_level = logical;
-- Restart PostgreSQL after this change
-- Create a replication slot
SELECT pg_create_logical_replication_slot('risingwave_slot', 'pgoutput');
Setup
1. Create the CDC source in RisingWave
CREATE SOURCE pg_source WITH (
connector = 'postgres-cdc',
hostname = '127.0.0.1',
port = '5432',
username = 'postgres',
password = 'your_password',
database.name = 'mydb',
slot.name = 'risingwave_slot'
);
2. Create a table from the CDC source
CREATE TABLE orders (
id INT PRIMARY KEY,
user_id INT,
product_id INT,
amount DOUBLE PRECISION,
status VARCHAR,
created_at TIMESTAMPTZ
) FROM pg_source TABLE 'public.orders';
3. Build a materialized view on top
CREATE MATERIALIZED VIEW order_summary AS
SELECT
user_id,
COUNT(*) AS order_count,
SUM(amount) AS total_spent,
MAX(created_at) AS last_order_at
FROM orders
WHERE status = 'completed'
GROUP BY user_id;
4. Query results
SELECT user_id, order_count, total_spent
FROM order_summary
WHERE user_id = 42;
5. Deliver to downstream (optional)
-- Sink to Kafka
CREATE SINK order_summary_sink FROM order_summary
WITH (
connector = 'kafka',
properties.bootstrap.server = 'localhost:9092',
topic = 'order_summaries'
) FORMAT PLAIN ENCODE JSON (force_append_only='true');
-- Or sink to Iceberg
CREATE SINK order_summary_iceberg FROM order_summary
WITH (
connector = 'iceberg',
type = 'upsert',
primary_key = 'user_id',
catalog.type = 'storage',
warehouse.path = 's3://my-bucket/warehouse',
database.name = 'mydb',
table.name = 'order_summary',
create_table_if_not_exists = 'true'
);
Key points
- RisingWave reads from the PostgreSQL WAL — no triggers or application changes needed
- Changes (INSERT, UPDATE, DELETE) are all reflected in the RisingWave table
- The
slot.name must match the replication slot you created on PostgreSQL
- For AWS RDS, Supabase, and Neon, see platform-specific setup guides
Next steps