What this does: Creates a complete pipeline from a Kafka topic through a materialized view to low-latency query serving.
When to use this: You have event data in Kafka and need agents or applications to query continuously computed aggregates at low latency.
Setup
1. Create a Kafka source
CREATE SOURCE transactions (
    user_id VARCHAR,
    amount DOUBLE PRECISION,
    status VARCHAR,
    event_time TIMESTAMPTZ,
    WATERMARK FOR event_time AS event_time - INTERVAL '5 SECONDS'
) WITH (
    connector = 'kafka',
    topic = 'transactions',
    properties.bootstrap.server = 'localhost:9092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
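The source decodes JSON records whose keys match the column names above. As a quick sanity check, you can build a matching payload in Python and send it with any Kafka client; `make_transaction` is a hypothetical helper for illustration, not part of RisingWave:

```python
import json
from datetime import datetime, timezone

def make_transaction(user_id: str, amount: float, status: str) -> str:
    """Build a JSON payload matching the `transactions` source schema."""
    event = {
        "user_id": user_id,
        "amount": amount,
        "status": status,
        # TIMESTAMPTZ columns accept ISO-8601 timestamp strings
        "event_time": datetime.now(timezone.utc).isoformat(),
    }
    return json.dumps(event)

payload = make_transaction("u42", 129.99, "completed")
# With kafka-python, for example:
# producer.send('transactions', payload.encode('utf-8'))
```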
2. Create a materialized view
CREATE MATERIALIZED VIEW suspicious_activity AS
SELECT
    user_id,
    COUNT(*) AS tx_count,
    SUM(amount) AS total_amount,
    window_start,
    window_end
FROM TUMBLE(transactions, event_time, INTERVAL '5 MINUTES')
GROUP BY user_id, window_start, window_end
HAVING COUNT(*) > 5 AND SUM(amount) > 5000;
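To build intuition for what `TUMBLE(..., INTERVAL '5 MINUTES')` plus the `HAVING` clause computes, here is a plain-Python simulation of the same windowed aggregation. This is a sketch for understanding only; inside the materialized view, RisingWave maintains these aggregates incrementally rather than rescanning events:

```python
from collections import defaultdict
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)

def window_start(ts: datetime) -> datetime:
    # Align the timestamp down to the start of its 5-minute tumbling window.
    epoch = datetime(1970, 1, 1)
    return ts - (ts - epoch) % WINDOW

def suspicious_activity(events):
    # (user_id, window_start) -> [tx_count, total_amount]
    groups = defaultdict(lambda: [0, 0.0])
    for user_id, amount, ts in events:
        key = (user_id, window_start(ts))
        groups[key][0] += 1
        groups[key][1] += amount
    # HAVING COUNT(*) > 5 AND SUM(amount) > 5000
    return {k: v for k, v in groups.items() if v[0] > 5 and v[1] > 5000}

# Six 1000-unit transactions by one user inside one 5-minute window:
events = [("alice", 1000.0, datetime(2024, 1, 1, 12, 0, s)) for s in range(6)]
flagged = suspicious_activity(events)
```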
3. Query the results
-- Always fresh — no REFRESH needed
SELECT user_id, tx_count, total_amount
FROM suspicious_activity
ORDER BY window_end DESC
LIMIT 20;
import psycopg2

# RisingWave speaks the PostgreSQL wire protocol, so any Postgres driver works.
conn = psycopg2.connect(host="127.0.0.1", port=4566, user="root", dbname="dev")
conn.autocommit = True
cur = conn.cursor()
cur.execute("""
    SELECT user_id, tx_count, total_amount
    FROM suspicious_activity
    WHERE window_end > NOW() - INTERVAL '10 MINUTES'
    ORDER BY total_amount DESC
    LIMIT 10
""")
results = cur.fetchall()
4. Deliver results downstream (optional)
CREATE SINK suspicious_activity_alerts FROM suspicious_activity
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'localhost:9092',
    topic = 'alerts'
) FORMAT PLAIN ENCODE JSON (force_append_only='true');
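With `force_append_only='true'`, changes to the MV are emitted as plain inserts, and each message on the `alerts` topic is a JSON object carrying the MV's columns. A downstream consumer only needs to decode that JSON; the sample message below is an assumed illustration shaped like a `suspicious_activity` row, and the kafka-python wiring in the comment is likewise an assumption:

```python
import json

def decode_alert(raw: bytes) -> dict:
    """Decode one message from the `alerts` topic (FORMAT PLAIN ENCODE JSON)."""
    return json.loads(raw.decode("utf-8"))

# A message shaped like a suspicious_activity row (illustrative sample):
sample = (
    b'{"user_id": "u42", "tx_count": 7, "total_amount": 8200.5, '
    b'"window_start": "2024-01-01 12:00:00+00:00", '
    b'"window_end": "2024-01-01 12:05:00+00:00"}'
)
alert = decode_alert(sample)
# With kafka-python, for example:
# for msg in KafkaConsumer('alerts', ...): handle(decode_alert(msg.value))
```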
Key points
- The WATERMARK declaration on the source bounds event-time disorder and lets RisingWave clean up window state; it is required if you add EMIT ON WINDOW CLOSE to the materialized view.
- Without EMIT ON WINDOW CLOSE, the MV emits results incrementally as data arrives (the default behavior).
- scan.startup.mode = 'earliest' replays all historical Kafka data on the first run; use 'latest' to start from now.
- The materialized view is incrementally updated: only changed rows are recomputed when new Kafka messages arrive.
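The incremental-update point can be illustrated with a toy delta computation: when a new event arrives, only the affected user's aggregate changes, and every other row of state is left untouched. This is a conceptual sketch, not RisingWave's actual internals:

```python
def apply_event(state: dict, user_id: str, amount: float) -> dict:
    """Update one user's (tx_count, total_amount) aggregate in place."""
    count, total = state.get(user_id, (0, 0.0))
    state[user_id] = (count + 1, total + amount)
    return state

# Existing aggregate state; a new event for alice touches only alice's row.
state = {"alice": (3, 1200.0), "bob": (1, 50.0)}
apply_event(state, "alice", 300.0)
```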
Next steps