Watermarks are markers that track the progress of event time, allowing you to process events within their corresponding time windows. A watermark is an estimate of event-time progress, typically derived from the maximum event time observed so far. It can also be read as a threshold: events arriving from now on are expected to have timestamps later than or equal to the current watermark. Events that arrive with a timestamp earlier than the current watermark are considered late and are not processed within their time window.

How watermarks work

The following example illustrates how watermarks are generated and how they influence window computation. Assume the following events arrive, in this order, with their associated event-time timestamps:
| Event | Timestamp |
| --- | --- |
| Event F | 11:59:30 AM |
| Event G | 12:00:00 PM |
| Event H | 12:00:11 PM |
| Event I | 11:59:50 AM |
Suppose the watermark is defined as the maximum event time observed so far minus 10 seconds. This produces the following watermarks:
| Event | Timestamp | Watermark |
| --- | --- | --- |
| Event F | 11:59:30 AM | 11:59:20 AM |
| Event G | 12:00:00 PM | 11:59:50 AM |
| Event H | 12:00:11 PM | 12:00:01 PM |
| Event I | 11:59:50 AM | 12:00:01 PM |
Now consider a window that counts events for the hour ending at 12:00 PM. The window does not emit results until the watermark reaches at least 12:00:00 PM, which first happens when Event H raises it to 12:00:01 PM. As a result, Events F and G are considered on time and are included in the window computation. Events H and I are excluded from the window ending at 12:00 PM: Event H's timestamp falls after the end of the window, and Event I is treated as late because its timestamp (11:59:50 AM) is earlier than the current watermark (12:00:01 PM). Event I also leaves the watermark unchanged, because the watermark only moves forward.
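To make the bookkeeping concrete, the following Python sketch replays the four events above with the same strategy (maximum event time observed so far minus 10 seconds). It illustrates the rules described here and is not RisingWave's implementation; the function names `simulate_watermarks` and `t`, the calendar date, and the lateness test (timestamp strictly earlier than the watermark in effect when the event arrives) are assumptions made for the example.

```python
from datetime import datetime, timedelta


def simulate_watermarks(events, delay):
    """Replay (name, event_time) pairs in arrival order.

    Returns (name, event_time, watermark_after_event, is_late) tuples, where
    the watermark is max(event_time seen so far) - delay.
    """
    watermark = None  # no watermark exists before the first event
    results = []
    for name, event_time in events:
        # Late: earlier than the watermark in effect when the event arrives.
        is_late = watermark is not None and event_time < watermark
        candidate = event_time - delay
        # The watermark only moves forward; an older event cannot pull it back.
        if watermark is None or candidate > watermark:
            watermark = candidate
        results.append((name, event_time, watermark, is_late))
    return results


def t(hour, minute, second):
    # Arbitrary date; only the time of day matters in this example.
    return datetime(2024, 1, 1, hour, minute, second)


events = [("F", t(11, 59, 30)), ("G", t(12, 0, 0)),
          ("H", t(12, 0, 11)), ("I", t(11, 59, 50))]
rows = simulate_watermarks(events, timedelta(seconds=10))
for name, ts, wm, late in rows:
    print(f"Event {name}: {ts:%H:%M:%S}  watermark {wm:%H:%M:%S}  {'late' if late else 'not late'}")

# The window ending at 12:00 PM can emit once the watermark reaches 12:00:00 PM.
window_end = t(12, 0, 0)
closes_after = next(name for name, _, wm, _ in rows if wm >= window_end)
print(f"Window ending {window_end:%H:%M} emits after Event {closes_after}")
```

The printed watermarks follow the watermark table, only Event I is flagged as late, and the window ending at 12:00 PM becomes ready to emit after Event H.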

Syntax

Watermarks can be defined directly on sources and tables. The syntax of the WATERMARK clause in RisingWave is as follows:
WATERMARK FOR column_name AS expr [WITH TTL]
column_name is a column defined in the source or table, usually the event-time column. expr specifies the watermark generation strategy and must return a value of type timestamp. The watermark is updated only when expr returns a value greater than the current watermark, so it never moves backward.
When WITH TTL is specified on a table, RisingWave treats rows whose event time is at or below the watermark as expired. RisingWave ignores late changes (INSERT, UPDATE, or DELETE) to those rows and cleans up table state based on event time. This cleanup affects only the table itself; it does not delete previously computed results in downstream materialized views.
Common watermark generation strategies for expr include:
  • Maximum observed timestamp
WATERMARK FOR time_col as time_col
  • Maximum observed timestamp with a delay
WATERMARK FOR time_col as time_col - INTERVAL 'string' time_unit
Supported time_unit values are second, minute, hour, day, month, and year. For more details, see the interval data type in Overview of data types.
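The choice between the two strategies listed above is a trade-off between lateness and latency. The Python sketch below counts which events each strategy would classify as late on a hypothetical out-of-order stream; the stream, the function name `late_events`, and the strict "earlier than the watermark" lateness test are assumptions for illustration, not RisingWave behavior captured from a running system.

```python
from datetime import datetime, timedelta


def late_events(timestamps, delay):
    """Return the timestamps that arrive earlier than the current watermark,
    where watermark = max(timestamp seen so far) - delay."""
    watermark = None
    late = []
    for ts in timestamps:
        if watermark is not None and ts < watermark:
            late.append(ts)
        candidate = ts - delay
        # Only a larger candidate advances the watermark.
        if watermark is None or candidate > watermark:
            watermark = candidate
    return late


# Hypothetical out-of-order stream: offsets in seconds from 12:00:00.
base = datetime(2024, 1, 1, 12, 0, 0)
stream = [base + timedelta(seconds=s) for s in (0, 5, 3, 10, 8, 2)]

strategies = {
    "time_col": timedelta(0),
    "time_col - INTERVAL '5' SECOND": timedelta(seconds=5),
}
for label, delay in strategies.items():
    offsets = [int((ts - base).total_seconds()) for ts in late_events(stream, delay)]
    print(f"{label}: late offsets {offsets}")
```

With no delay, every out-of-order event (offsets 3, 8, and 2) is late. With a 5-second delay, events up to 5 seconds behind the maximum are still accepted, so only offset 2 (8 seconds behind) is late. The cost is that windows wait correspondingly longer before the watermark lets them emit.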
Currently, the watermark column must be one of the columns defined in the source or table. To derive it from a nested field (for example, a field in a STRUCT) or from an expression over the input row (for example, a type cast), define a generated column and use it as the watermark column. For details, see generated columns.

Example

The following statement defines the watermark as the latest timestamp observed in order_time minus 5 seconds.
CREATE SOURCE s1 (
    product VARCHAR,
    user VARCHAR,
    price DOUBLE PRECISION,
    order_time TIMESTAMP,
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
    connector = 'kafka',
    topic = 'test_topic',
    properties.bootstrap.server = 'message_queue:29092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
The following statement first uses a generated column to extract order_time from the nested detail struct, and then defines the watermark on that column.
CREATE SOURCE s2 (
    order_id BIGINT,
    detail STRUCT<
        product VARCHAR,
        user VARCHAR,
        price DOUBLE PRECISION,
        order_time TIMESTAMP
    >,
    order_time AS (detail).order_time,
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ( ... );
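In s2 the watermark is defined on the generated column, so each row's nested order_time has to be extracted before it can advance the watermark. The Python sketch below mimics that order of operations on hypothetical JSON messages. The message layout, the ISO 8601 timestamp strings, and the `process` function are assumptions for illustration; this is not how RisingWave parses Kafka data.

```python
import json
from datetime import datetime, timedelta

DELAY = timedelta(seconds=5)  # mirrors INTERVAL '5' SECOND


def process(messages):
    """Return (order_id, watermark_after_row) for each raw JSON message."""
    watermark = None
    history = []
    for raw in messages:
        row = json.loads(raw)
        # Generated column: order_time AS (detail).order_time
        order_time = datetime.fromisoformat(row["detail"]["order_time"])
        candidate = order_time - DELAY
        # Watermark = max(order_time) - 5 seconds; it never moves backward.
        if watermark is None or candidate > watermark:
            watermark = candidate
        history.append((row["order_id"], watermark))
    return history


# Hypothetical messages; the nested detail.order_time arrives out of order.
messages = [
    '{"order_id": 1, "detail": {"product": "pen", "order_time": "2024-01-01T12:00:10"}}',
    '{"order_id": 2, "detail": {"product": "ink", "order_time": "2024-01-01T12:00:04"}}',
    '{"order_id": 3, "detail": {"product": "pad", "order_time": "2024-01-01T12:00:20"}}',
]
for order_id, wm in process(messages):
    print(order_id, wm.time())
```

Order 2 carries an older order_time, so the watermark stays at 12:00:05 until order 3 moves it to 12:00:15.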
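The following statement enables event-time TTL for a table with a primary key by adding WITH TTL to the watermark definition. Rows whose created_at is at or below the watermark (the latest created_at observed minus 5 days) are treated as expired.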
CREATE TABLE deposits_src (
    trx_id VARCHAR PRIMARY KEY,
    created_at TIMESTAMP,
    deposit_status VARCHAR,
    WATERMARK FOR created_at AS created_at - INTERVAL '5' DAY WITH TTL
) WITH (
    connector = 'kafka',
    topic = 'deposits',
    properties.bootstrap.server = 'broker:29092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
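The Python sketch below is a simplified mental model of the WITH TTL behavior described for deposits_src, not RisingWave's storage implementation. The class `TtlTable`, its `apply` method, the trx_id values `t1` to `t3`, and the rule that every accepted change advances the watermark from its created_at are all hypothetical. Downstream materialized views are not modeled, consistent with TTL cleanup affecting only the table itself.

```python
from datetime import datetime, timedelta


class TtlTable:
    """Toy model of a primary-keyed table with WATERMARK ... WITH TTL.

    Assumptions: the watermark is max(created_at) - delay over accepted
    changes, and a row whose created_at is at or below the watermark is
    expired. Downstream materialized views are not modeled.
    """

    def __init__(self, delay):
        self.delay = delay
        self.watermark = None
        self.rows = {}  # primary key -> (created_at, status)

    def _expired(self, created_at):
        return self.watermark is not None and created_at <= self.watermark

    def apply(self, op, key, created_at, status=None):
        """Apply 'insert', 'update', or 'delete'; return False if ignored."""
        if self._expired(created_at):
            return False  # late change to an expired row: ignored
        if op == "delete":
            self.rows.pop(key, None)
        else:
            self.rows[key] = (created_at, status)
        candidate = created_at - self.delay
        if self.watermark is None or candidate > self.watermark:
            self.watermark = candidate
            # Clean up state: evict rows now at or below the watermark.
            self.rows = {k: v for k, v in self.rows.items()
                         if not self._expired(v[0])}
        return True


day = timedelta(days=1)
base = datetime(2024, 1, 1)
table = TtlTable(delay=5 * day)  # mirrors INTERVAL '5' DAY
table.apply("insert", "t1", base, "pending")
table.apply("insert", "t2", base + 3 * day, "pending")
table.apply("insert", "t3", base + 10 * day, "pending")  # watermark -> base + 5 days
accepted = table.apply("update", "t1", base, "settled")  # t1 expired: ignored
print(sorted(table.rows), table.watermark, accepted)
# -> ['t3'] 2024-01-06 00:00:00 False
```

Inserting t3 moves the watermark to five days after t1, which expires both t1 and t2; the later update to t1 is then ignored rather than re-creating the row.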