Citus is a PostgreSQL extension that turns PostgreSQL into a distributed database. For more details, see Citus Data.

PUBLIC PREVIEW

This feature is in the public preview stage, meaning it’s nearing the final product but is not yet fully stable. If you encounter any issues or have feedback, please contact us through our Slack channel. Your input is valuable in helping us improve the feature. For more information, see our Public preview feature list.

Set up Citus

  1. Create a superuser role in the Citus cluster.
-- Create the role on the coordinator node
CREATE ROLE dbz SUPERUSER LOGIN;
-- Create the role on the worker nodes
SELECT run_command_on_workers($cmd$
CREATE ROLE dbz SUPERUSER LOGIN;
$cmd$);
  2. Ensure that wal_level is set to logical on each worker node by modifying postgresql.conf.
  3. Set the replica identity to FULL for the table you want to ingest data from.
-- Execute on the coordinator node
ALTER TABLE github_events REPLICA IDENTITY FULL;
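
The setup steps above can be checked before creating the source. The following is a minimal sketch, assuming it is run on the coordinator node as a superuser and that the table is github_events, as in the step above. run_command_on_workers is the same Citus function used when creating the role; the catalog lookup is standard PostgreSQL. In postgresql.conf the relevant setting is wal_level = logical, and changing it requires a server restart.

```sql
-- Report the wal_level of every worker node; each result row covers one worker.
-- The expected value on each worker is "logical".
SELECT run_command_on_workers($cmd$
SHOW wal_level;
$cmd$);

-- Confirm the replica identity of the table.
-- relreplident = 'f' means FULL ('d' is the default, 'n' is nothing, 'i' is index).
SELECT relreplident
FROM pg_class
WHERE oid = 'github_events'::regclass;
```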

Limitations

There are a few limitations when ingesting CDC data from Citus in RisingWave.

  • A PostgreSQL superuser role is required.
  • Only distributed tables are supported.
  • Newly added worker nodes are not detected.
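
Two of these limitations can be checked from the coordinator node. This is a minimal sketch, assuming Citus 10 or later, where the citus_tables view and the citus_get_active_worker_nodes() function are available (older releases may expose different names). github_events is the example table from the setup steps.

```sql
-- Show how Citus classifies the table. Only tables whose
-- citus_table_type is 'distributed' are supported by the connector.
SELECT table_name, citus_table_type
FROM citus_tables
WHERE table_name = 'github_events'::regclass;

-- List the currently active worker nodes. Because newly added workers
-- are not detected, this list should match the workers passed to the
-- connector in database.servers.
SELECT node_name, node_port
FROM citus_get_active_worker_nodes();
```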

Notes about running RisingWave from binaries

If you run RisingWave locally from binaries and intend to use the native CDC source connectors or the JDBC sink connector, make sure that JDK 11 or later is installed in your environment.

Create a table in RisingWave using the native CDC connector

Syntax

Note that a primary key must be specified.

CREATE TABLE [ IF NOT EXISTS ] source_name (
    column_name data_type PRIMARY KEY , ...
    [PRIMARY KEY ( column_name, ... )]
)
WITH (
    connector='citus-cdc',
    connector_parameter='value', ...
)
[ FORMAT DEBEZIUM ENCODE JSON ];

Connector parameters

Unless specified otherwise, the fields listed are required. Note that parameter values must be enclosed in single quotation marks.

| Field | Notes |
|---|---|
| hostname | Hostname of the coordinator node. |
| port | Port number of the coordinator node. |
| username | Username of the database. |
| password | Password of the database. |
| database.servers | The host and port of Citus worker nodes. |
| database.name | Name of the database. |
| schema.name | Optional. Name of the schema. By default, the value is public. |
| table.name | Name of the table that you want to ingest data from. |
| slot.name | Optional. The slot name for each source. Each source should have a unique slot name. |

Examples

CREATE TABLE github_events_rw (
    event_id bigint,
    event_type text,
    event_public boolean,
    repo_id bigint,
    payload jsonb,
    repo jsonb,
    user_id bigint,
    org jsonb,
    created_at timestamp,
    PRIMARY KEY (event_id, user_id)
) WITH (
    connector = 'citus-cdc',
    hostname = '127.0.0.1',
    port = '5432',
    username = 'dbz',
    password = '123456',
    database.servers = '172.31.29.245:5432,172.31.31.177:5432',
    database.name = 'postgres',
    schema.name = 'public',
    table.name = 'github_events',
    slot.name = 'github_events_dbz_slot1'
);
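
Once the table is created, it can be queried like any other RisingWave table. The following is a minimal sketch, assuming the github_events_rw table from the example above. The view name events_per_type is hypothetical and chosen only for illustration.

```sql
-- Maintain a running count of ingested events per type.
-- events_per_type is a hypothetical name, not part of the connector.
CREATE MATERIALIZED VIEW events_per_type AS
SELECT event_type, COUNT(*) AS event_count
FROM github_events_rw
GROUP BY event_type;

-- Read the current counts, most frequent event type first.
SELECT event_type, event_count
FROM events_per_type
ORDER BY event_count DESC;
```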