# PostgreSQL CDC

> Replicate data from PostgreSQL to RisingWave in real time using native CDC. Supports shared sources for multi-table ingestion, AWS RDS, Neon, and Supabase. No Kafka or Debezium required.

Need help generating SQL? Use [Claude Code](https://claude.ai/claude-code) or [Cursor](https://cursor.com) with the [RisingWave MCP server](https://github.com/risingwavelabs/risingwave-mcp) to generate and run SQL interactively.

This guide explains how to connect RisingWave to a PostgreSQL database to ingest data changes in real time using the native PostgreSQL CDC source connector.

RisingWave's PostgreSQL CDC connector is compatible with any PostgreSQL-compliant database that supports logical replication, and supports **PostgreSQL versions 10 through 17**.

## Prerequisites

Before using the native PostgreSQL CDC connector in RisingWave, configure your PostgreSQL database by following the guide for your deployment:

* **[Set up a self-hosted PostgreSQL database](/ingestion/sources/postgresql/self-hosted)**
* **[Set up AWS RDS or Aurora PostgreSQL](/ingestion/sources/postgresql/aws-rds)**
* **[Set up a Neon serverless PostgreSQL database](/ingestion/sources/postgresql/neon)**
* **[Set up a Supabase project](/ingestion/sources/postgresql/supabase)**

## Connect to PostgreSQL

To ingest CDC data from PostgreSQL, you first create a shared source using the `CREATE SOURCE` statement. This source establishes the connection to the PostgreSQL database. Then, for each upstream table you want to ingest, you define a corresponding table in RisingWave using the `CREATE TABLE FROM SOURCE` statement.

### Create a shared source

Use the `CREATE SOURCE` statement to create a shared source.

```sql theme={null}
CREATE SOURCE [ IF NOT EXISTS ] <shared_source_name> WITH (
    connector='postgres-cdc',
    <field>=<value>, ...
);
```

### Create a table from the shared source

Next, create a table from the shared source to ingest data from a specific upstream PostgreSQL table. When defining this table in RisingWave, you must specify a primary key that matches the primary key of the upstream table. You also need to provide the name of the upstream table.

```sql theme={null}
CREATE TABLE [ IF NOT EXISTS ] <rw_table_name> (
    <column_name> <data_type> [ PRIMARY KEY ], ...
    [ PRIMARY KEY ( <column_name>, ... ) ]
)
[ INCLUDE timestamp AS <column_name> ]
[ WITH (
    snapshot='true'
) ]
FROM <shared_source_name> TABLE <upstream_table_name>;
```

### Basic connection example

```sql theme={null}
-- Create a shared CDC source
CREATE SOURCE shared_source WITH (
    connector='postgres-cdc',
    hostname='localhost',
    port='5432',
    username='your_user',
    password='your_password',
    database.name='your_database',
    schema.name='public' -- Optional, defaults to 'public'
);

-- Create a table from the source, representing a specific PostgreSQL table
CREATE TABLE my_table (
    id INT PRIMARY KEY,
    name VARCHAR
)
FROM shared_source TABLE 'public.my_upstream_table';
```
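
The optional clauses from the syntax above can be combined in one statement. The following is a minimal sketch that reuses `shared_source` from the basic example; the table name `orders_rw`, its columns, the upstream table `public.orders`, and the column name `commit_ts` are hypothetical. It declares the primary key as a table-level constraint, adds a timestamp column with `INCLUDE timestamp AS`, and sets `snapshot='false'` so that only new changes are ingested.

```sql theme={null}
-- Hypothetical table; adjust the names and columns to match your upstream table.
CREATE TABLE IF NOT EXISTS orders_rw (
    order_id INT,
    status VARCHAR,
    PRIMARY KEY (order_id)        -- must match the upstream primary key
)
INCLUDE timestamp AS commit_ts    -- adds the included timestamp as column commit_ts
WITH (
    snapshot='false'              -- skip the initial backfill
)
FROM shared_source TABLE 'public.orders';
```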

## Parameters

These parameters are used in the `WITH` clause of a `CREATE SOURCE` statement.

| Parameter                   | Description                                                                                                                                                                                                  | Required |
| :-------------------------- | :----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | :------- |
| `connector`                 | Must be set to `'postgres-cdc'`.                                                                                                                                                                             | Yes      |
| `hostname`                  | The hostname or IP address of your PostgreSQL database server.                                                                                                                                               | Yes      |
| `port`                      | The port number of your PostgreSQL database server. The default is 5432.                                                                                                                                     | Yes      |
| `username`                  | The username for connecting to your PostgreSQL database.                                                                                                                                                     | Yes      |
| `password`                  | The password for the PostgreSQL user.                                                                                                                                                                        | Yes      |
| `database.name`             | The name of the PostgreSQL database to connect to.                                                                                                                                                           | Yes      |
| `schema.name`               | The name of the PostgreSQL schema to capture changes from. Defaults to `'public'`.                                                                                                                           | No       |
| `slot.name`                 | The name of the PostgreSQL replication slot to use. If not specified, a unique name is generated. Each source needs a unique slot. Valid characters: lowercase letters, numbers, underscore. Max length: 63. | No       |
| `publication.name`          | The name of the PostgreSQL publication to use. Defaults to `'rw_publication'`.                                                                                                                               | No       |
| `publication.create.enable` | Whether to automatically create the publication if it doesn't exist. Defaults to `true`.                                                                                                                     | No       |
| `auto.schema.change`        | Set to `true` to enable automatic replication of DDL changes from Postgres. Defaults to `false`.                                                                                                             | No       |
| `ssl.mode`                  | SSL/TLS encryption mode. Accepted values: `disabled`, `preferred`, `required`, `verify-ca`, `verify-full`. Defaults to `disabled`.                                                                           | No       |
| `ssl.root.cert`             | The PEM-encoded root certificate for `verify-ca` or `verify-full` mode. Use a [secret](/operate/manage-secrets).                                                                                             | No       |
| `postgres.is.aws.rds`       | Set to `true` to specify that the upstream database is AWS RDS. This enhances RDS detection and supports scenarios that use a custom DNS endpoint. The default value is `false`.                             | No       |
| `transactional`             | Ensures that changes from a single upstream transaction are processed atomically. Defaults to `true` for shared sources.                                                                                     | No       |
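
As an illustration of the TLS-related parameters, the sketch below stores a root certificate as a secret and references it from a source that uses `verify-full`. The secret name `pg_root_cert`, the source name `pg_tls_source`, the hostname, and the certificate body are placeholders. The `CREATE SECRET` statement and the `secret <name>` reference are assumptions based on the linked secrets page, so verify them against your RisingWave version.

```sql theme={null}
-- Hypothetical secret holding the PEM-encoded root certificate (body elided).
CREATE SECRET pg_root_cert WITH (backend = 'meta') AS '-----BEGIN CERTIFICATE-----
...
-----END CERTIFICATE-----';

-- Source that verifies the server certificate and hostname.
CREATE SOURCE pg_tls_source WITH (
    connector='postgres-cdc',
    hostname='db.example.com',
    port='5432',
    username='your_user',
    password='your_password',
    database.name='your_database',
    ssl.mode='verify-full',
    ssl.root.cert=secret pg_root_cert
);
```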

<Note>
  For PostgreSQL CDC sources, RisingWave manages the publication and replication slot automatically. By default, RisingWave will create the publication and slot if they don't exist (`publication.create.enable = 'true'`). If you want to use an existing publication, set `publication.create.enable = 'false'`. This is useful when the RisingWave database user doesn't have `CREATE PUBLICATION` permissions.
</Note>
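
As a sketch of the existing-publication case described in the note, the statement below points the source at a publication that was created in advance and names the replication slot explicitly. The publication name `rw_existing_pub`, the slot name `rw_orders_slot`, and the source name `pg_existing_pub_source` are hypothetical; the commented-out `CREATE PUBLICATION` statement is standard PostgreSQL and would be run upstream by a user with sufficient privileges.

```sql theme={null}
-- Run on the upstream PostgreSQL database, not in RisingWave:
-- CREATE PUBLICATION rw_existing_pub FOR TABLE public.my_upstream_table;

-- Run in RisingWave: reuse the publication instead of creating one.
CREATE SOURCE pg_existing_pub_source WITH (
    connector='postgres-cdc',
    hostname='localhost',
    port='5432',
    username='your_user',
    password='your_password',
    database.name='your_database',
    slot.name='rw_orders_slot',            -- lowercase letters, numbers, underscore only
    publication.name='rw_existing_pub',
    publication.create.enable='false'      -- do not attempt CREATE PUBLICATION
);
```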

These parameters are used in the `WITH` clause of a `CREATE TABLE ... FROM source` statement.

| Parameter                       | Description                                                                                                                                                               |
| :------------------------------ | :------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| `snapshot`                      | **Optional**. If `false`, disables the initial snapshot (backfill) of the table. Only new changes will be ingested. The default value is `true`.                          |
| `snapshot.interval`             | **Optional**. Specifies the barrier interval for buffering upstream events. The default value is 1.                                                                       |
| `snapshot.batch_size`           | **Optional**. Specifies the batch size of a snapshot read query from the upstream table. The default value is 1000.                                                       |
| `backfill.parallelism`          | **Optional**. Controls the parallelism of CDC table backfill. When set to a value greater than 0, enables parallelized CDC backfill. The default value is `0` (disabled). |
| `backfill.num_rows_per_split`   | **Optional**. Specifies number of rows per split for parallelized CDC backfill. Only effective when `backfill.parallelism > 0`. The default value is `100000`.            |
| `backfill.as_even_splits`       | **Optional**. Whether to create even splits for parallelized CDC backfill. Only effective when `backfill.parallelism > 0`. The default value is `true`.                   |

For large tables, you can significantly speed up the initial data load by enabling parallelized backfill. Configure this feature using the `backfill.parallelism`, `backfill.num_rows_per_split`, and `backfill.as_even_splits` parameters.

```sql theme={null}
-- Assumes a shared source named pg_mydb already exists.
CREATE TABLE large_table (
  id integer primary key,
  data varchar
)
WITH (
  backfill.parallelism = '4',
  backfill.num_rows_per_split = '50000',
  backfill.as_even_splits = 'true'
)
FROM pg_mydb TABLE 'public.large_table';
```
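
The regular (non-parallel) snapshot read can be tuned in the same way. The following is a minimal sketch, assuming the `shared_source` from the basic example and a hypothetical upstream table `public.events`; the batch size of `5000` is an arbitrary illustration rather than a recommended value.

```sql theme={null}
-- Hypothetical table; only the WITH options are the point of this example.
CREATE TABLE events_rw (
    id BIGINT PRIMARY KEY,
    payload JSONB
)
WITH (
    snapshot='true',               -- keep the initial backfill enabled (default)
    snapshot.interval='1',         -- barrier interval for buffering upstream events (default)
    snapshot.batch_size='5000'     -- rows per snapshot read query (default is 1000)
)
FROM shared_source TABLE 'public.events';
```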

### Debezium parameters

You can also specify any valid [Debezium PostgreSQL connector configuration property](https://debezium.io/documentation/reference/2.6/connectors/postgresql.html#postgresql-advanced-configuration-properties) in the `WITH` clause. Prefix the Debezium parameter name with `debezium.`.

For example, to skip unknown DDL statements, use:

```sql theme={null}
debezium.schema.history.internal.skip.unparseable.ddl = 'true'
```
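
In a complete statement, the prefixed property sits alongside the other connection parameters. This is a sketch with a hypothetical source name `pg_debezium_source` and placeholder connection values:

```sql theme={null}
CREATE SOURCE pg_debezium_source WITH (
    connector='postgres-cdc',
    hostname='localhost',
    port='5432',
    username='your_user',
    password='your_password',
    database.name='your_database',
    -- Debezium property, passed through via the debezium. prefix
    debezium.schema.history.internal.skip.unparseable.ddl='true'
);
```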

## Features and reference

### Data format

The PostgreSQL CDC connector uses the Debezium JSON format for data.

### Supported data types

The following table shows the data type mapping from PostgreSQL to RisingWave.

<Note>
  RisingWave does not support directly creating tables from PostgreSQL composite types. To read composite type data, use a source and create a materialized view based on that source.
</Note>

| PostgreSQL type                                      | RisingWave type                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
| :--------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| BOOLEAN                                              | BOOLEAN                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
| BIT(1)                                               | BOOLEAN                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
| BIT( > 1)                                            | No support                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
| BIT VARYING\[(M)]                                    | No support                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
| SMALLINT, SMALLSERIAL                                | SMALLINT                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
| INTEGER, SERIAL                                      | INTEGER                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
| BIGINT, BIGSERIAL, OID                               | BIGINT                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
| REAL                                                 | REAL                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
| DOUBLE PRECISION                                     | DOUBLE PRECISION                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
| CHAR\[(M)]                                           | CHARACTER VARYING                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
| VARCHAR\[(M)]                                        | CHARACTER VARYING                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
| CHARACTER\[(M)]                                      | CHARACTER VARYING                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
| CHARACTER VARYING\[(M)]                              | CHARACTER VARYING                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
| TIMESTAMPTZ, TIMESTAMP WITH TIME ZONE                | TIMESTAMP WITH TIME ZONE                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
| TIMETZ, TIME WITH TIME ZONE                          | TIME WITHOUT TIME ZONE (assume UTC time zone)                                                                                                                                                                                                                                                                                                                                                                                                                                    |
| INTERVAL \[P]                                        | INTERVAL                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
| BYTEA                                                | BYTEA                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
| JSON, JSONB                                          | JSONB                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
| XML                                                  | CHARACTER VARYING                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
| UUID                                                 | CHARACTER VARYING                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
| POINT                                                | STRUCT (with form `<x REAL, y REAL>`)                                                                                                                                                                                                                                                                                                                                                                                                                                            |
| LTREE                                                | No support                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
| CITEXT                                               | CHARACTER VARYING\*                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
| INET                                                 | CHARACTER VARYING\*                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
| INT4RANGE                                            | CHARACTER VARYING\*                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
| INT8RANGE                                            | CHARACTER VARYING\*                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
| NUMRANGE                                             | CHARACTER VARYING\*                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
| TSRANGE                                              | CHARACTER VARYING\*                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
| TSTZRANGE                                            | CHARACTER VARYING\*                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
| DATERANGE                                            | CHARACTER VARYING\*                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
| ENUM                                                 | CHARACTER VARYING\*                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
| DATE                                                 | DATE                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
| TIME(1), TIME(2), TIME(3), TIME(4), TIME(5), TIME(6) | TIME WITHOUT TIME ZONE (limited to \[1973-03-03 09:46:40, 5138-11-16 09:46:40))                                                                                                                                                                                                                                                                                                                                                                                                  |
| TIMESTAMP(1), TIMESTAMP(2), TIMESTAMP(3)             | TIMESTAMP WITHOUT TIME ZONE (limited to \[1973-03-03 09:46:40, 5138-11-16 09:46:40))                                                                                                                                                                                                                                                                                                                                                                                             |
| TIMESTAMP(4), TIMESTAMP(5), TIMESTAMP(6), TIMESTAMP  | TIMESTAMP WITHOUT TIME ZONE                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
| NUMERIC\[(M\[,D])], DECIMAL\[(M\[,D])]               | numeric, [rw\_int256](/sql/data-types/rw-int256), or varchar. numeric supports values with a precision of up to 28 digits, and any values beyond this precision will be treated as NULL. To process values exceeding 28 digits, use rw\_int256 or varchar instead. When creating a table, make sure to specify the data type of the column corresponding to numeric as rw\_int256 or varchar. Note that rw\_int256 treats inf, -inf, nan, or numeric with decimal parts as NULL. |
| MONEY\[(M\[,D])]                                     | NUMERIC                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
| HSTORE                                               | No support                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
| INET                                                 | CHARACTER VARYING\*                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
| CIDR                                                 | CHARACTER VARYING\*                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
| MACADDR                                              | CHARACTER VARYING\*                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
| MACADDR8                                             | CHARACTER VARYING\*                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
| VECTOR(N)                                            | [VECTOR(N)](/sql/data-types/vector)                                                                                                                                                                                                                                                                                                                                                                                                                                              |

`pgvector` columns are supported when the upstream column is declared as `vector(n)` with a defined dimension. Dimension-less `vector`, `halfvec`, and `sparsevec` are not supported.
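
As a rough sketch of the `NUMERIC` and `VECTOR(N)` rows above, the statements below declare wide upstream numeric columns as `rw_int256` or `varchar` and map a fixed-dimension `pgvector` column. The table `public.ledger`, its columns, and the source name `pg_source` are hypothetical and only serve as illustration.

```sql theme={null}
-- In PostgreSQL (hypothetical upstream table; requires the pgvector extension):
CREATE TABLE ledger (
    id int PRIMARY KEY,
    big_total numeric(40, 0),      -- integer values that can exceed 28 digits
    precise_rate numeric(40, 10),  -- fractional values that can exceed 28 digits
    embedding vector(3)            -- pgvector column with a declared dimension
);

-- In RisingWave (assumes a shared source named pg_source already exists):
CREATE TABLE rw_ledger (
    id int PRIMARY KEY,
    big_total rw_int256,     -- values with decimal parts, inf, -inf, or nan become NULL
    precise_rate varchar,    -- kept as text so that no digits are lost
    embedding vector(3)
) FROM pg_source TABLE 'public.ledger';
```

Choosing `varchar` keeps every digit but leaves numeric parsing to downstream queries, while `rw_int256` keeps the column numeric at the cost of dropping fractional values.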

### Support for PostgreSQL TOAST

<Note>
  Added in v2.6.0.
</Note>

RisingWave supports [TOASTed (The Oversized-Attribute Storage Technique) data](https://www.postgresql.org/docs/current/storage-toast.html) from PostgreSQL when using the CDC connector. This ensures that even columns with very large values, such as long text or large JSON objects, are ingested completely and accurately during both the initial backfill and incremental changes.

Supported TOAST-able data types in Postgres:

* Standard types: `varchar`, `text`, `xml`, `jsonb`, `bytea`.
* One-dimensional arrays of the above types: `varchar[]`, `text[]`, `xml[]`, `jsonb[]`, `bytea[]`.

<Note>
  RisingWave currently supports only the TOAST-able data types listed above. Other types that can trigger TOAST, though rarely in practice (mainly one-dimensional arrays of simple types), are not yet supported. For more details, see the [issue](https://github.com/risingwavelabs/risingwave/issues/22916).
</Note>

The example below demonstrates how RisingWave ingests large data that triggers PostgreSQL's TOAST mechanism and ensures that large fields are not lost even when non-TOAST columns are updated.

1. Create a table with TOAST-able columns in PostgreSQL.

```sql theme={null}
CREATE TABLE test_toast (
    id int PRIMARY KEY,
    v_text text,
    v_json jsonb,
    v_bytea bytea
);
```

2. Insert large TOAST data in PostgreSQL.

```sql theme={null}
INSERT INTO test_toast (id, v_text, v_json, v_bytea)
VALUES (
    1,
    repeat('long_text_', 10000),  -- large text triggers TOAST
    jsonb_build_object('data', repeat('x', 50000)),  -- large JSON triggers TOAST
    repeat('a', 2000000)::bytea  -- large bytea triggers TOAST
);
```

3. Create a RisingWave source from PostgreSQL CDC.

```sql theme={null}
CREATE SOURCE src_test_toast WITH (
    connector = 'postgres-cdc',
    hostname = 'localhost',
    port = 5432,
    username = 'pguser',
    password = 'pgpassword',
    database.name = 'mydb',
    schema.name = 'public',
    slot.name = 'rw_slot_toast_example'
);
```

4. Create a RisingWave table from the source.

```sql theme={null}
CREATE TABLE rw_test_toast(*) FROM src_test_toast TABLE 'public.test_toast';
```

5. Verify in RisingWave that the data is ingested with the TOASTed values preserved.

```sql theme={null}
SELECT
    id,
    CASE WHEN octet_length(v_text) > 15000 THEN 'toast-triggered' ELSE 'small' END AS v_text_check,
    CASE WHEN octet_length(v_json::text) > 50000 THEN 'toast-triggered' ELSE 'small' END AS v_json_check,
    CASE WHEN octet_length(v_bytea) > 1500000 THEN 'toast-triggered' ELSE 'small' END AS v_bytea_check
FROM rw_test_toast
WHERE id = 1;
```

6. In PostgreSQL, update a non-TOAST column to test placeholder handling.

```sql theme={null}
ALTER TABLE test_toast ADD COLUMN v_counter int DEFAULT 0;
UPDATE test_toast SET v_counter = 1 WHERE id = 1;
```

Then verify in RisingWave that the TOAST columns remain intact.

```sql theme={null}
SELECT
    id,
    octet_length(v_text) AS text_len,
    octet_length(v_json::text) AS json_len,
    octet_length(v_bytea) AS bytea_len,
    v_counter
FROM rw_test_toast
WHERE id = 1;
```

RisingWave can process changes to rows with TOASTed data even if the upstream table's `REPLICA IDENTITY` is set to `default`. It achieves this by leveraging its own materialized state of the source data. When an `UPDATE` or `DELETE` event occurs, RisingWave uses the record stored within its materialized view to construct the full change event, rather than relying on the `before` field in the CDC message.
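
To check which replica identity the upstream table currently uses, a query along these lines can be run in PostgreSQL. It reads `relreplident` from the `pg_class` system catalog, where `d` means default, `f` full, `i` index, and `n` nothing. The table `public.test_toast` is the one from the example above.

```sql theme={null}
-- In PostgreSQL: inspect the replica identity of the upstream table.
SELECT relname, relreplident
FROM pg_class
WHERE oid = 'public.test_toast'::regclass;
```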

### Use dbt to ingest data from PostgreSQL CDC

Here is an example of how to use dbt to ingest data from PostgreSQL CDC. In this dbt example, `source` and `table_with_connector` models will be used. For more details about these two models, please refer to [Use dbt for data transformations](/integrations/other/dbt#define-dbt-models).

First, we create a `source` model `pg_mydb.sql`.

```sql theme={null}
{{ config(materialized='source') }}
CREATE SOURCE {{ this }} WITH (
    connector = 'postgres-cdc',
    hostname = '127.0.0.1',
    port = '8306',
    username = 'root',
    password = '123456',
    database.name = 'mydb',
    slot.name = 'mydb_slot'
);
```

And then we create a `table_with_connector` model `tt3.sql`.

```sql theme={null}
{{ config(materialized='table_with_connector') }}
CREATE TABLE {{ this }} (
    v1 integer primary key,
    v2 timestamp with time zone
) FROM {{ ref('pg_mydb') }} TABLE 'public.tt3';
```

### Extract metadata from sources

The `INCLUDE` clause allows you to ingest fields *not* included in the main Debezium payload (such as metadata). See [Extracting metadata from sources](/ingestion/extract-metadata-from-sources) for details. The available fields are:

* `timestamp`
* `partition`
* `offset`
* `database_name`
* `collection_name`
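
A minimal sketch of the `INCLUDE` clause on a CDC table, assuming a shared source named `pg_source` and an upstream table `public.person` (both hypothetical). The aliases `commit_ts` and `db_name` are arbitrary column names chosen for this example; refer to the linked page for the authoritative syntax.

```sql theme={null}
CREATE TABLE person (
    id int,
    name varchar,
    PRIMARY KEY (id)
)
INCLUDE timestamp AS commit_ts      -- metadata field exposed as an extra column
INCLUDE database_name AS db_name    -- upstream database name
FROM pg_source TABLE 'public.person';
```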

### Automatically map upstream table schema

RisingWave supports automatically mapping the upstream table schema when creating a CDC table from a PostgreSQL CDC source.
Instead of defining columns individually, you can use `*` when creating a table to ingest all columns from the source table. Note that `*` cannot be used if other columns are specified in the table creation process.

```sql theme={null}
CREATE TABLE <table_name> (*) FROM <source_name> TABLE '<schema_name>.<table_name>';
```

### Auto schema change

<Tip>
  **PREMIUM FEATURE**

  This is a premium feature. For a comprehensive overview of all premium features and their usage, please see [RisingWave premium features](/get-started/premium-features).
</Tip>

RisingWave supports auto schema changes in Postgres CDC. It ensures that your RisingWave pipeline stays synchronized with any schema changes in the source database, reducing the need for manual updates and preventing inconsistencies.

Currently, RisingWave supports the `ALTER TABLE` command with the following operations, and we plan to add support for additional DDL operations in future releases.

* `ADD COLUMN [DEFAULT expr]`: Allows you to add a new column to an existing table. Only constant value expressions are supported for the default value.
* `DROP COLUMN`: Allows you to remove an existing column from a table.

To enable this feature, set `auto.schema.change = 'true'` in your PostgreSQL CDC source configuration:

```sql theme={null}
CREATE SOURCE pg_source WITH (
    connector = 'postgres-cdc',
    hostname = 'localhost',
    port = '5432',
    username = 'your_user',
    password = 'your_password',
    database.name = 'your_database',
    schema.name = 'public',
    auto.schema.change = 'true'
);
```

Create a RisingWave table from the PostgreSQL source:

```sql theme={null}
CREATE TABLE my_table (
    id INT PRIMARY KEY,
    name VARCHAR
)
FROM pg_source TABLE 'public.my_upstream_table';
```

Add columns to the PostgreSQL table and observe the changes in RisingWave:

```sql theme={null}
-- In your PostgreSQL database:
ALTER TABLE my_upstream_table ADD COLUMN v1 VARCHAR(255);
ALTER TABLE my_upstream_table ADD COLUMN v2 NUMERIC(5,2);
```

After the changes in the upstream table, the schema of the table in RisingWave will also be changed. You can verify this by running `DESCRIBE my_table;` in RisingWave.

```sql theme={null}
-- In RisingWave:
DESCRIBE my_table;
```
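
The other supported operations follow the same pattern. As a sketch that continues the example above, dropping the `v2` column or adding a column with a constant default upstream should be reflected in RisingWave in the same way. The column name `status` and its default value are illustrative.

```sql theme={null}
-- In your PostgreSQL database:
ALTER TABLE my_upstream_table DROP COLUMN v2;
ALTER TABLE my_upstream_table ADD COLUMN status VARCHAR DEFAULT 'new';  -- constant default only

-- In RisingWave:
DESCRIBE my_table;
```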

### Ingest data from a partitioned table

RisingWave supports ingesting data from a partitioned table. By default, PostgreSQL creates publications with `publish_via_partition_root = false`, so changes are published under the identity of each individual partition rather than the root partitioned table.

To read from the root partitioned table, explicitly set this property to `true` when creating the publication. Execute the following command in your upstream PostgreSQL database:

```sql theme={null}
CREATE PUBLICATION publication_name FOR TABLE table_name WITH (publish_via_partition_root = true);
```

If you let RisingWave create the publication, it will automatically set `publish_via_partition_root = true`.

Note that PostgreSQL does not support adding both a partitioned table and its individual partitions to the same publication, although it does not raise an error if you try. To ingest data from both the root table and its partitions, create a separate publication for each; otherwise, you will not be able to read from the table partitions. In RisingWave, likewise create separate sources, each with its own publication name, for the partitioned table and for its partitions.
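
The setup described above might look like the following sketch. The partitioned table `orders`, its partitions `orders_2024` and `orders_2025`, the publication names, the slot names, and the connection values are all hypothetical. It also assumes the `publication.name` and `slot.name` connector parameters and an upstream PostgreSQL version that supports `publish_via_partition_root` (13 or later).

```sql theme={null}
-- In PostgreSQL: one publication for the root table, one for the partitions.
CREATE PUBLICATION pub_orders_root FOR TABLE orders
    WITH (publish_via_partition_root = true);
CREATE PUBLICATION pub_orders_parts FOR TABLE orders_2024, orders_2025;

-- In RisingWave: one source per publication, each with its own replication slot.
CREATE SOURCE pg_orders_root WITH (
    connector = 'postgres-cdc',
    hostname = 'localhost',
    port = '5432',
    username = 'your_user',
    password = 'your_password',
    database.name = 'your_database',
    schema.name = 'public',
    slot.name = 'rw_slot_orders_root',
    publication.name = 'pub_orders_root'
);

CREATE SOURCE pg_orders_parts WITH (
    connector = 'postgres-cdc',
    hostname = 'localhost',
    port = '5432',
    username = 'your_user',
    password = 'your_password',
    database.name = 'your_database',
    schema.name = 'public',
    slot.name = 'rw_slot_orders_parts',
    publication.name = 'pub_orders_parts'
);
```

Tables for the root and for each partition can then be created from the matching source with `CREATE TABLE ... FROM <source_name> TABLE '...'`.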

### Expression as a column

RisingWave allows users to define expressions as table columns. For example, in the SQL statement below, `next_id` is not a column from the source PostgreSQL table. Instead, it is a generated column that RisingWave computes dynamically while ingesting data. The value of `next_id` for each row is always equal to `id + 1`:

```sql theme={null}
CREATE TABLE person (
  id integer,
  name varchar,
  next_id int AS id + 1,
  PRIMARY KEY (id)
) FROM pg_mydb TABLE 'public.person';
```

Currently, generated columns must appear at the end of the schema definition. If a column from the upstream source appears after a generated column, RisingWave will return an error. For example, the following statement will fail because `name`, an upstream column, is placed after the generated column `next_id`:

```sql theme={null}
CREATE TABLE person (
  id integer,
  next_id int AS id + 1,
  name varchar,
  PRIMARY KEY (id)
) FROM pg_mydb TABLE 'public.person';
```

To avoid errors, ensure that all generated columns are positioned at the end of the schema definition.

### Time travel

RisingWave does not support time travel for the native PostgreSQL CDC connector.

## What's next?

* **Supported data formats:** [Data formats and encoding options](/ingestion/formats-and-encoding-options)
* [Monitor CDC ingestion progress](/ingestion/monitor-cdc-progress)
