What this does: Creates a subscription on a materialized view and consumes change events (insert, update, delete) in your application as they happen.
When to use this: Your app or agent needs to react to data changes immediately, without polling a materialized view in a loop.
Setup
1. Create a materialized view to subscribe to
CREATE SOURCE transactions (
    user_id VARCHAR,
    amount DOUBLE PRECISION,
    event_time TIMESTAMPTZ,
    WATERMARK FOR event_time AS event_time - INTERVAL '5 SECONDS'
) WITH (
    connector = 'kafka',
    topic = 'transactions',
    properties.bootstrap.server = 'localhost:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

CREATE MATERIALIZED VIEW high_value_tx AS
SELECT user_id, amount, event_time
FROM transactions
WHERE amount > 10000;
2. Create a subscription
CREATE SUBSCRIPTION high_value_alerts
FROM high_value_tx
WITH (retention = '1D');
3. Consume changes in Python
import psycopg2

conn = psycopg2.connect(host="127.0.0.1", port=4566, user="root", dbname="dev")
conn.autocommit = True
cur = conn.cursor()

# Declare cursor on the subscription
cur.execute("DECLARE alert_cursor SUBSCRIPTION CURSOR FOR high_value_alerts")
print("Listening for high-value transactions...")

while True:
    # FETCH NEXT blocks for up to 5s waiting for data, then returns empty
    cur.execute("FETCH NEXT FROM alert_cursor WITH (timeout = '5s')")
    row = cur.fetchone()
    if row:
        # Columns: user_id, amount, event_time, op, rw_timestamp
        user_id = row[0]
        amount = row[1]
        event_time = row[2]
        op = row[3]  # 'Insert', 'Delete', 'UpdateInsert', 'UpdateDelete'
        if op == 'Insert':  # New row inserted
            print(f"ALERT: user {user_id}, ${amount:.2f} at {event_time}")
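The loop above reacts only to 'Insert'. As a minimal sketch of handling all four change types, the helper below maps each op string to a label. It assumes the column order noted in the loop (user_id, amount, event_time, op, rw_timestamp); `OP_LABELS` and `describe_change` are hypothetical names for illustration, not part of RisingWave or psycopg2.

```python
# Hypothetical labels for the four op values listed in this guide.
OP_LABELS = {
    "Insert": "NEW",
    "Delete": "REMOVED",
    "UpdateDelete": "OLD VALUE",   # row as it was before an update
    "UpdateInsert": "NEW VALUE",   # row as it is after an update
}


def describe_change(row):
    """Return a one-line description of a fetched subscription row.

    Assumes the first four columns are user_id, amount, event_time, op.
    """
    user_id, amount, event_time, op = row[:4]
    # Fall back to a visible marker rather than silently dropping unknown ops.
    label = OP_LABELS.get(op, f"UNKNOWN({op})")
    return f"{label}: user {user_id} ${amount:.2f} at {event_time}"
```

Inside the fetch loop, `print(describe_change(row))` could replace the `if op == 'Insert'` branch when the application needs to see updates and deletes as well.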
4. Consume with a start timestamp (resume from checkpoint)
-- Start consuming from a specific point in time (Unix timestamp in milliseconds)
DECLARE alert_cursor SUBSCRIPTION CURSOR FOR high_value_alerts
SINCE 1714000000000;
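One way an application might produce the SINCE value is to remember the largest rw_timestamp it has processed and rebuild the DECLARE statement from it after a restart. The sketch below rests on assumptions not stated in this guide: that rw_timestamp is the last column, that it is a Unix timestamp in milliseconds, and that it may be empty (None) for some rows. `declare_cursor_sql` and `next_checkpoint` are hypothetical helpers.

```python
def declare_cursor_sql(cursor_name, subscription, since_ms=None):
    """Build a DECLARE statement, optionally resuming from a Unix ms timestamp.

    Names are interpolated directly, so pass only trusted identifiers.
    """
    sql = f"DECLARE {cursor_name} SUBSCRIPTION CURSOR FOR {subscription}"
    if since_ms is not None:
        sql += f" SINCE {int(since_ms)}"
    return sql


def next_checkpoint(current_ms, row):
    """Return the updated checkpoint after processing one fetched row.

    Assumes rw_timestamp is the last column; rows without one leave the
    checkpoint unchanged.
    """
    ts = row[-1]
    if ts is None:
        return current_ms
    ts = int(ts)
    return ts if current_ms is None else max(current_ms, ts)
```

The checkpoint would need to be persisted somewhere durable (a file or a table) for a restart to find it. Resuming from a saved timestamp may replay events the application has already seen at that timestamp, so the handler should tolerate duplicates.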
Key points
- Change types: 'Insert', 'Delete', 'UpdateInsert' (new value after update), 'UpdateDelete' (old value before update)
- retention controls how far back you can start consuming (default: 24 hours)
- FETCH NEXT FROM cur WITH (timeout = 'Ns') blocks for up to N seconds waiting for data, then returns empty, which eliminates the need for a polling sleep loop
- A single subscription can have multiple cursors consuming independently
- Drop a subscription when no longer needed: DROP SUBSCRIPTION high_value_alerts;