Use the INCLUDE clause to extract fields that are not part of the main message payload (the data defined by your schema) as separate columns. This is useful for accessing metadata such as Kafka message offsets, timestamps, or keys.
Syntax
- { header | key | offset | partition | timestamp | payload | subject | file | database_name | collection_name }: The field you want to include. The available fields depend on the connector (see Supported connectors below).
- [AS column_name]: (Optional) The name you want to give to the new column in RisingWave. If you omit AS column_name, RisingWave uses a default name in the format _rw_{connector}_{col}, where:
  - {connector} is the connector name (e.g., kafka, pulsar).
  - {col} is the type of column (e.g., key, offset, timestamp).
  - Example: For an offset column from Kafka, the default name would be _rw_kafka_offset.
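To illustrate the naming rule, the sketch below includes one field without an alias and one with an alias. The table name, topic, and broker address are placeholders chosen for illustration, and the statement assumes a Kafka source that carries JSON messages.

```sql
-- Hypothetical table; the topic and broker address are placeholders.
CREATE TABLE include_naming_demo (
  a INT
)
-- No alias: the column takes the default name _rw_kafka_offset.
INCLUDE offset
-- Alias given: the column is named kafka_partition.
INCLUDE partition AS kafka_partition
WITH (
  connector = 'kafka',
  topic = 'demo_topic',
  properties.bootstrap.server = 'localhost:9092'
)
FORMAT PLAIN ENCODE JSON;
```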
When using UPSERT formats (which handle updates and deletes based on a key), you must use INCLUDE KEY.
You cannot define a multi-column primary key in this case; the included key becomes the primary key.
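As a rough sketch of the UPSERT case, the statement below includes the message key and declares it as the primary key. The table name, column names, topic, and broker address are assumptions made for illustration, and the exact requirements can vary by encoding and RisingWave version, so check the connector documentation before relying on it.

```sql
-- Hypothetical upsert table; the topic and broker address are placeholders.
CREATE TABLE upsert_demo (
  id INT,
  name VARCHAR,
  -- The included key is the only primary key column.
  PRIMARY KEY (rw_key)
)
INCLUDE KEY AS rw_key
WITH (
  connector = 'kafka',
  topic = 'demo_upsert_topic',
  properties.bootstrap.server = 'localhost:9092'
)
FORMAT UPSERT ENCODE JSON;
```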
Supported connectors
The INCLUDE clause is supported by several RisingWave connectors. For the specific fields supported by each connector, and their default data types, see the documentation for the individual connector. For example:
- Connect to Kafka
- Connect to Kinesis
- Connect to Pulsar
- Connect to MongoDB CDC
- Connect to NATS JetStream
- Connect to MQTT
- Connect to S3, GCS, or Azure Blob
Examples
Include all supported fields
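The following is a minimal sketch of such a statement. The column aliases, topic name, and broker address are placeholders, not values confirmed by this page.

```sql
-- Sketch only: aliases, topic, and broker address are placeholders.
CREATE TABLE additional_columns (
  a INT
)
INCLUDE key AS key_col
INCLUDE partition AS partition_col
INCLUDE offset AS offset_col
INCLUDE timestamp AS timestamp_col
INCLUDE header AS header_col
INCLUDE payload AS payload_col
WITH (
  connector = 'kafka',
  topic = 'kafka_additional_columns',
  properties.bootstrap.server = 'localhost:9092'
)
FORMAT PLAIN ENCODE JSON;
```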
Here we create a table, additional_columns, that ingests data from a Kafka broker. In addition to the a column, which is part of the message payload, the fields key, partition, offset, timestamp, header, and payload are added to the table as extra columns.
Include a timestamp column
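A timestamp column can be included on its own in the same way. The sketch below assumes a Kafka source with JSON messages; the table name, alias, topic, and broker address are hypothetical.

```sql
-- Hypothetical table that exposes the Kafka message timestamp as event_ts.
CREATE TABLE include_timestamp_demo (
  a INT
)
INCLUDE timestamp AS event_ts
WITH (
  connector = 'kafka',
  topic = 'demo_topic',
  properties.bootstrap.server = 'localhost:9092'
)
FORMAT PLAIN ENCODE JSON;
```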
Include header columns
There are two ways to define header columns. You can generate a single column that holds all headers, with type List[Struct<Varchar, Bytea>]. Alternatively, you can name a specific header with the header_col field to extract it into its own column. The header_col field can only be defined when including a header. In this case, the generated column name will have the format _rw_kafka_header_{header col name}_{col type}, where col type is the data type of the header column.
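Both forms might be combined as in the sketch below. The header keys trace_id and origin, the aliases, the topic, and the broker address are made up for illustration. The optional type keyword after the header key is an assumption about the syntax and should be checked against the Kafka connector documentation.

```sql
-- Hypothetical table; header keys, aliases, and connection values are placeholders.
CREATE TABLE include_header_demo (
  a INT
)
-- All headers in one column of type List[Struct<Varchar, Bytea>].
INCLUDE header AS all_headers
-- A single header selected by its key.
INCLUDE header 'trace_id' AS trace_id_col
-- A single header with an explicit column type (assumed syntax).
INCLUDE header 'origin' varchar AS origin_col
WITH (
  connector = 'kafka',
  topic = 'demo_topic',
  properties.bootstrap.server = 'localhost:9092'
)
FORMAT PLAIN ENCODE JSON;
```

If the AS alias is omitted for a single-header column, the default name described above applies.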