RisingWave provides a Python SDK, risingwave-py (currently in public preview), to help users develop event-driven applications. The SDK offers a simple way to run ad-hoc queries, subscribe to changes, and define event handlers for tables and materialized views, which makes it easier to integrate real-time data into applications.

Use risingwave-py to connect to RisingWave

risingwave-py is a RisingWave Python SDK that provides the following capabilities:

  • Interact with RisingWave via Pandas DataFrame.
  • Subscribe to and process changes from RisingWave tables or materialized views.
  • Run SQL commands supported in RisingWave.

Run RisingWave

To learn how to run RisingWave, see Run RisingWave.

Connect to RisingWave

To connect to RisingWave via risingwave-py:

from risingwave import RisingWave, RisingWaveConnOptions

# Connect to RisingWave instance on localhost with named parameters
rw = RisingWave(
    RisingWaveConnOptions.from_connection_info(
        host="localhost", port=4566, user="root", password="root", database="dev"
    )
)

# Connect to RisingWave instance on localhost with connection string
rw = RisingWave(RisingWaveConnOptions("postgresql://root:root@localhost:4566/dev"))

# You can create a new SQL connection and execute operations inside a with statement.
# This is the recommended way to use the Python SDK.
with rw.getconn() as conn:
    conn.insert(...)
    conn.fetch(...)
    conn.execute(...)
    conn.mv(...)
    conn.on_change(...)


# You can also use the existing connection created by the RisingWave object to execute operations.
# The later sections use this form for simplicity.
rw.insert(...)
rw.fetch(...)
rw.execute(...)
rw.mv(...)
rw.on_change(...)
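
The connection string above carries the same five values as the named parameters. As a rough illustration of that mapping, the sketch below splits such a URL with Python's standard urllib.parse. It does not show how the SDK parses the string, and split_connection_string is a hypothetical helper introduced only for this example.

```python
from urllib.parse import urlsplit


def split_connection_string(url: str) -> dict:
    """Break a postgresql:// URL into named connection parameters.

    Hypothetical helper for illustration; risingwave-py does its own parsing.
    """
    parts = urlsplit(url)
    return {
        "host": parts.hostname,
        "port": parts.port,
        "user": parts.username,
        "password": parts.password,
        # The path is "/dev"; drop the leading slash to get the database name.
        "database": parts.path.lstrip("/"),
    }


info = split_connection_string("postgresql://root:root@localhost:4566/dev")
print(info)
```

The keys were chosen to match the keyword names used with from_connection_info in the example above, so the two connection styles can be compared side by side.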

Ingestion into RisingWave

Load a Pandas DataFrame into RisingWave:

from datetime import datetime
import pandas as pd

df = pd.DataFrame(
    {
        "product": ["foo", "bar"],
        "price": [123.4, 456.7],
        "ts": [datetime.strptime("2023-10-05 14:30:00", "%Y-%m-%d %H:%M:%S"), 
               datetime.strptime("2023-10-05 14:31:20", "%Y-%m-%d %H:%M:%S")],
    }
)

# If a table named test does not exist in RisingWave, it will be created with a matching schema
rw.insert(table_name="test", data=df)

# You can provide an optional force_flush parameter and set it to True
# if you would like the inserted data to be visible to fetch queries immediately.
# Otherwise, data is inserted in batches asynchronously for better performance.
# rw.insert(table_name="test", data=df, force_flush=True)

Load data into RisingWave from external systems:

# Create a table and load data from upstream kafka
rw.execute("""
    CREATE TABLE IF NOT EXISTS source_abc
    WITH (
        connector='kafka',
        properties.bootstrap.server='localhost:9092',
        topic='test_topic'
    ) 
    FORMAT UPSERT ENCODE AVRO (
        schema.registry = 'http://127.0.0.1:8081',
        schema.registry.username='your_schema_registry_username',
        schema.registry.password='your_schema_registry_password'
)""")

For supported sources and the SQL syntax, see this topic.

Query from RisingWave

from risingwave import OutputFormat

result: pd.DataFrame = rw.fetch("""
        SELECT window_start, window_end, product, ROUND(avg(price)) as avg_price
        FROM tumble(test, ts, interval '10 seconds') 
        GROUP BY window_start, window_end, product""", 
        format=OutputFormat.DATAFRAME)

print(result)
# Output:
#          window_start          window_end product  avg_price
# 0 2023-10-05 14:31:20 2023-10-05 14:31:30     bar      457.0
# 1 2023-10-05 14:30:00 2023-10-05 14:30:10     foo      123.0

# You can also use OutputFormat.RAW to get the query results back as a list of tuples
# rw.fetch("...",  format=OutputFormat.RAW)
# [(datetime.datetime(2023, 10, 5, 14, 31, 20), datetime.datetime(2023, 10, 5, 14, 31, 30), 'bar', 457.0), 
#  (datetime.datetime(2023, 10, 5, 14, 30), datetime.datetime(2023, 10, 5, 14, 30, 10), 'foo', 123.0)]
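
To make the windowing in the query above more concrete, here is a minimal plain-Python sketch of what a 10-second tumbling window plus GROUP BY computes for the two sample rows. It assumes windows are aligned to the Unix epoch and is only a conceptual model, not RisingWave's implementation; tumble_start is a hypothetical helper name.

```python
from collections import defaultdict
from datetime import datetime, timedelta

WINDOW = timedelta(seconds=10)
EPOCH = datetime(1970, 1, 1)


def tumble_start(ts: datetime, size: timedelta = WINDOW) -> datetime:
    """Return the start of the fixed-size window that contains ts."""
    return ts - (ts - EPOCH) % size


# The same rows that were loaded into the test table earlier.
rows = [
    ("foo", 123.4, datetime(2023, 10, 5, 14, 30, 0)),
    ("bar", 456.7, datetime(2023, 10, 5, 14, 31, 20)),
]

# Group prices by (window_start, window_end, product), as the GROUP BY does.
groups = defaultdict(list)
for product, price, ts in rows:
    start = tumble_start(ts)
    groups[(start, start + WINDOW, product)].append(price)

# Average each group and round to a whole number.
# Python's round() resolves exact .5 ties differently from SQL ROUND;
# the sample data contains no ties, so the results agree here.
result = {
    key: float(round(sum(prices) / len(prices)))
    for key, prices in groups.items()
}

for (start, end, product), avg_price in result.items():
    print(start, end, product, avg_price)
```

Each row falls into exactly one window, which is why the two sample rows produce two result rows.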

Event-driven processing with RisingWave

Event-driven applications depend on real-time data processing to react to events as they occur. With risingwave-py, you can define materialized views using SQL and run them in RisingWave. Behind the scenes, events are processed continuously, and the results are incrementally maintained.

In the following example, test_mv is created to incrementally maintain the result of the defined SQL as events are ingested into the test table.

mv = rw.mv(name="test_mv",
           stmt="""SELECT window_start, window_end, product, ROUND(avg(price)) as avg_price
                   FROM tumble(test, ts, interval '10 seconds') 
                   GROUP BY window_start, window_end, product""")
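
To give a feel for what "incrementally maintained" means, the sketch below keeps a running sum and count per group and touches only the affected group when a new event arrives, instead of rescanning all rows. This is a conceptual illustration in plain Python, not a description of RisingWave internals; IncrementalAvg and its method names are hypothetical.

```python
from collections import defaultdict


class IncrementalAvg:
    """Keep a per-key average that is updated one event at a time."""

    def __init__(self):
        # key -> [running sum, row count]
        self._state = defaultdict(lambda: [0.0, 0])

    def apply(self, key, price: float) -> float:
        """Fold one new event into the state and return the key's new average."""
        acc = self._state[key]
        acc[0] += price
        acc[1] += 1
        return acc[0] / acc[1]

    def snapshot(self) -> dict:
        """Return the current average for every key seen so far."""
        return {key: total / count for key, (total, count) in self._state.items()}


view = IncrementalAvg()
view.apply("foo", 100.0)
latest = view.apply("foo", 200.0)  # only the "foo" group is recomputed
view.apply("bar", 50.0)
print(latest, view.snapshot())
```

The cost of each update depends on the size of the change rather than on the size of the table, which is the property a materialized view relies on.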

Besides running ad-hoc SQL queries on tables and materialized views, risingwave-py lets you subscribe to changes from a table or materialized view and define a handler that processes those change events in your application.

# Write your own handler.
# The event contains all fields of the subscribed table or materialized view, plus two additional columns:
# - op: varchar. The change operation. Valid values: [Insert, UpdateInsert, Delete, UpdateDelete].
#                UpdateInsert and UpdateDelete exist because RisingWave treats an UPDATE
#                as a delete of the old value followed by an insert of the new value.
# - rw_timestamp: bigint. The Unix timestamp in milliseconds when the data was written.
def simple_event_handler(event: pd.DataFrame):
    for _, row in event.iterrows():
        # Trigger an action (e.g. place an order via a REST API) when avg_price is 300 or higher
        if row["op"] in ("Insert", "UpdateInsert") and row["avg_price"] >= 300:
            print(
                f"{row['window_start']} - {row['window_end']}: "
                f"{row['product']} avg price {row['avg_price']} is at or above 300"
            )
            # ...


import threading

# Subscribe to changes of a materialized view and feed them to your own handler.
# Run on_change in a separate thread so that it does not block the main thread.
threading.Thread(
    target=lambda: mv.on_change(
        # Pass your handler here
        handler=simple_event_handler,
        # Both DATAFRAME and RAW (tuples) are supported here
        output_format=OutputFormat.DATAFRAME,
        # If set to True, the subscription progress is saved and can be recovered after the Python application crashes
        persist_progress=True,
        # Maximum number of rows returned each time
        max_batch_size=10,
    )
).start()

# Subscribe to changes of a table and print them to the console.
# Run on_change in a separate thread so that it does not block the main thread.
threading.Thread(
    target=lambda: rw.on_change(
        subscribe_from="test",
        handler=lambda data: print(data),
        output_format=OutputFormat.RAW,
        persist_progress=False,
        max_batch_size=5,
    )
).start()


# Data inserted into the base table from now on will be reflected in the subscriptions
# df = pd.DataFrame(
#     {
#         "product": ["foo", "bar"],
#         "price": [1000.2, 5000.4],
#         "ts": [datetime.strptime("2023-10-05 17:30:00", "%Y-%m-%d %H:%M:%S"), 
#                datetime.strptime("2023-10-05 17:31:20", "%Y-%m-%d %H:%M:%S")],
#     }
# )
# rw.insert(table_name="test", data=df)
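
Because an UPDATE arrives as an UpdateDelete followed by an UpdateInsert, a consumer that wants to mirror the current contents of a table or materialized view can treat the two delete operations alike and the two insert operations alike. The sketch below replays a batch of change events into a local dictionary. It rests on several assumptions: events are modelled as plain dicts rather than the DataFrame or tuples the SDK delivers, and apply_changes, the key column, and the sample rows are all hypothetical.

```python
def apply_changes(state: dict, events: list, key_cols: tuple) -> dict:
    """Replay change events, in order, into a key -> row mapping."""
    for event in events:
        key = tuple(event[col] for col in key_cols)
        # Strip the two bookkeeping columns so only the row data is stored.
        row = {k: v for k, v in event.items() if k not in ("op", "rw_timestamp")}
        if event["op"] in ("Insert", "UpdateInsert"):
            state[key] = row
        elif event["op"] in ("Delete", "UpdateDelete"):
            state.pop(key, None)
        else:
            raise ValueError(f"unexpected op: {event['op']!r}")
    return state


# Made-up events for illustration only.
events = [
    {"product": "foo", "avg_price": 123.0, "op": "Insert", "rw_timestamp": 1},
    # An UPDATE of foo's average shows up as a delete/insert pair.
    {"product": "foo", "avg_price": 123.0, "op": "UpdateDelete", "rw_timestamp": 2},
    {"product": "foo", "avg_price": 562.0, "op": "UpdateInsert", "rw_timestamp": 2},
    {"product": "bar", "avg_price": 457.0, "op": "Insert", "rw_timestamp": 3},
    {"product": "bar", "avg_price": 457.0, "op": "Delete", "rw_timestamp": 4},
]

mirror = apply_changes({}, events, key_cols=("product",))
print(mirror)
```

Applying events strictly in the order received matters here: swapping the UpdateDelete and UpdateInsert for the same key would leave the mirror without the updated row.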

For more details, please refer to the risingwave-py GitHub repo.