Manage subscription
Use the syntax below to create, drop or alter subscription.Create subscription
To create a subscription, use the syntax below:FROM
clause must specify either a table or a materialized view (mv).
The retention
parameter should be provided as a string in the format of an interval. It represents the duration for which incremental data will be retained. Any incremental data that exceeds the specified retention duration will be automatically deleted and will no longer be accessible.
Drop subscription
To drop a subscription, use the syntax below:Alter subscription
To rename a subscription, change the owner, or set a new schema, use the syntax below:Subscription cursor
A subscription cursor is a unit used to consume data from a subscription. In RisingWave, it’s a tool specifically designed to work in conjunction with a subscription, differing from the general cursor. In RisingWave, the subscription cursor allows you to specify a specific starting point within the subscription data. After creating the subscription cursor, you can use a loop to fetch and consume data starting from that point onwards. Results returned by the cursor are sorted by primary key. A subscription can have multiple subscription cursors, each consuming different ranges or intervals of data from the subscription.Syntax
The syntax of creating a subscription cursor is as follows:since_clause
is used to specify the starting point for reading data. By setting this clause, you can control the range of data that is returned, allowing you to retrieve only the incremental data or data starting from a specific time or event.
Below are the available choices for since_clause
. If you don’t specify the since_clause
, the returned data will just include the incremental data after declaration, which equals to the first choice below.
since now()/proctime()
: The returned data will include only the incremental data starting from the time of declaration.since begin()
: The returned data will include the oldest incremental data available, typically starting from the beginning of the subscription’s retention period.since unix_ms
: Starts reading from the first time point greater than or equal to the specifiedunix_ms
value. It’s important to note that theunix_ms
value should fall within the range ofnow() - subscription's retention
andnow
.
FULL
instead of the since_clause
, the subscription cursor starts consuming data from stock.
Fetch from cursor
FETCH from cursor function is supported in the PSQL simple query mode and extended mode.
Non-blocking data fetch
op
column in the result indicates the type of change operations. There are four options: Insert
, UpdateInsert
, Delete
, and UpdateDelete
. For a single UPDATE statement, the subscription log will contain two separate rows: one with UpdateInsert
and another with UpdateDelete
. This is because RisingWave treats an UPDATE as a delete of the old value followed by an insert of the new value. As for rw_timestamp
, it corresponds to the Unix timestamp in milliseconds when the data was written.
Blocking data fetch
timeout
value should be a string in the interval format. In this case, the fetch statement will return when either N rows have been fetched or the timeout occurs. If the timeout occurs, whatever has been read up to that point will be returned. Here are two scenarios to trigger the timeout:
- The cursor has reached the latest data and has been waiting too long for new data to arrive.
- At least N rows are available for the cursor to read, but retrieving all of them takes an extended period.
FETCH
, you can set a longer timeout to simulate a scenario where you want the FETCH
to block until new data arrives.
Order of the fetched data
- For data with different
rw_timestamp
, values are returned in the order the events occurred. - For data with the same
rw_timestamp
, the order matches the event sequence if the data belongs to the same primary key in the subscribed materialized view or table. - For data with the same
rw_timestamp
but different primary keys, the order may not reflect the exact event sequence.
Show subscription cursors
To show all subscription cursors in the current session, use the syntax below:Examples
Let’s create a tablet1
and subscribe this table, then create a cursor for this subscription.
FETCH NEXT FROM cursor_name
statement to fetch data from this cursor:
t1
and fetch again to view the changes:
since_clause
. Let’s use since unix_ms
to rebuild the cursor:
Subscribing via Postgres driver
For this feature, you only need to use the Postgres driver, and no extra dependencies are required. Here’s an example using Python and psycopg2.Exactly-once delivery
The persistent nature of subscriptions allows the subscriber to resume from a specific point in time (rw_timestamp
) without data loss after a failure recovery. We also guarantee no duplicates in subscriptions, thus ensuring exactly-once delivery.
Persisting the consumption progress
To achieve exactly-once delivery, it’s required to periodically persist the timestamp in storage. We recommend using RisingWave as the store, as no extra component is needed. First, we need to create a table for storing the progress.Use case
Potential use cases for subscriptions are as follows. If you have explored more use cases, feel free to share them with us in our Slack channel.- Real-time alerting/notification: Subscribers can employ sophisticated alerting rules to detect abnormal events and notify downstream applications.
- Event-driven architectures: Develop event-driven systems that react to changes based on specific business logic, such as synchronizing data to microservices.