Syntax

ALTER SOURCE current_source_name
    alter_option;
alter_option depends on the operation you want to perform on the source. For all supported clauses, see the sections below.

Clause

ADD COLUMN

ALTER SOURCE source_name
    ADD COLUMN col_name data_type;
| Parameter or clause | Description |
| --- | --- |
| ADD COLUMN | This clause adds a column to the specified source. |
| col_name | The name of the new column you want to add to the source. |
| data_type | The data type of the newly added column. With the struct data type, you can create a nested table. Elements in a nested table need to be enclosed with angle brackets (<>). |
-- Add a column named "v3" to a source named "src1"
ALTER SOURCE src1
    ADD COLUMN v3 int;
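The data_type description above mentions struct columns with angle brackets. A minimal sketch of what that could look like follows; the column name "address" and its fields are illustrative assumptions, not taken from this page.

```sql
-- Add a struct column named "address" to the source "src1".
-- The column name and its fields (city, zip_code) are hypothetical.
ALTER SOURCE src1
    ADD COLUMN address struct<city varchar, zip_code varchar>;
```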
  • To alter columns in a source created with a schema registry, see REFRESH SCHEMA.
  • You cannot add a primary key column to a source or table in RisingWave. To modify the primary key of a source or table, you need to drop and recreate it.
  • You cannot remove a column from a source in RisingWave. If you intend to remove a column from a source, you’ll need to drop the source and create the source again.

CONNECTOR WITH

Added in v2.6.0.
ALTER SOURCE source_name CONNECTOR WITH (
  'property_name' = 'value',
  ...
);
| Parameter | Description |
| --- | --- |
| CONNECTOR WITH | Updates connector-specific properties of an existing source without recreating it. Properties defined in a CONNECTION must be altered with ALTER CONNECTION instead. |
| property_name | Currently, only Kafka connectors can be altered. Support for more sources and properties will be added in future releases. For Kafka, supported parameters include: properties.client.id, properties.sync.call.timeout, properties.enable.auto.commit, properties.enable.ssl.certificate.verification, properties.fetch.max.bytes, properties.fetch.queue.backoff.ms, properties.fetch.wait.max.ms, properties.message.max.bytes, properties.queued.max.messages.kbytes, properties.queued.min.messages, properties.receive.message.max.bytes, properties.statistics.interval.ms, properties.ssl.endpoint.identification.algorithm, properties.security.protocol, properties.sasl.mechanism, properties.sasl.username, properties.sasl.password |
Example
ALTER SOURCE user_events_src
CONNECTOR WITH (
  'properties.security.protocol' = 'SASL_SSL'
);
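Because the syntax accepts a comma-separated list, several properties can presumably be updated in one statement. The sketch below rotates SASL credentials using property names from the supported list above; the mechanism and credential values are placeholders, not real settings.

```sql
-- Update several Kafka properties in a single statement.
-- 'PLAIN', 'new_user', and 'new_password' are placeholder values.
ALTER SOURCE user_events_src
CONNECTOR WITH (
  'properties.sasl.mechanism' = 'PLAIN',
  'properties.sasl.username' = 'new_user',
  'properties.sasl.password' = 'new_password'
);
```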
For routine, supported property updates, prefer using ALTER SOURCE ... CONNECTOR WITH. For advanced operational scenarios (such as updating properties not supported by ALTER SOURCE), you can use low-level risectl commands. These are intended for cluster operators and require direct access to the RisingWave binary. See Update source properties with risectl for details.

REFRESH SCHEMA

Fetch the latest schema from the schema registry and update the source schema.
ALTER SOURCE source_name REFRESH SCHEMA;
Currently, a schema refresh is not allowed to drop columns or change column types.
For example, assume we have a source as follows:
Create a source
CREATE SOURCE src_user WITH (
    connector = 'kafka',
    topic = 'sr_pb_test',
    properties.bootstrap.server = 'message_queue:29092',
    scan.startup.mode = 'earliest'
)
FORMAT PLAIN ENCODE PROTOBUF(
    schema.registry = 'http://message_queue:8081',
    message = 'test.User'
);
Then we can refresh its schema with the following statement:
Refresh schema
ALTER SOURCE src_user REFRESH SCHEMA;
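One way to confirm what a refresh changed is to inspect the source's columns before and after. This is a sketch that assumes the DESCRIBE command is available for sources in your RisingWave version.

```sql
-- Inspect the columns before the refresh.
DESCRIBE src_user;

-- Pull the latest schema from the schema registry.
ALTER SOURCE src_user REFRESH SCHEMA;

-- Inspect the columns again; newly registered fields should now appear.
DESCRIBE src_user;
```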

RENAME TO

ALTER SOURCE source_name
    RENAME TO new_source_name;
| Parameter or clause | Description |
| --- | --- |
| RENAME TO | This clause changes the name of the source. |
| new_source_name | The new name of the source. |
-- Change the name of a source named "src" to "src1"
ALTER SOURCE src
   RENAME TO src1;

OWNER TO

ALTER SOURCE current_source_name
    OWNER TO new_user;
| Parameter or clause | Description |
| --- | --- |
| OWNER TO | This clause changes the owner of the source. |
| new_user | The new owner you want to assign to the source. |
-- Change the owner of the source named "src" to user "user1"
ALTER SOURCE src OWNER TO user1;

SET SCHEMA

ALTER SOURCE current_source_name
    SET SCHEMA schema_name;
| Parameter or clause | Description |
| --- | --- |
| SET SCHEMA | This clause moves the source to a different schema. |
| schema_name | The name of the schema to which the source will be moved. |
-- Move the source named "test_source" to the schema named "test_schema"
ALTER SOURCE test_source SET SCHEMA test_schema;

SET PARALLELISM

Added in v2.3.0.
ALTER SOURCE source_name
SET PARALLELISM { TO | = } parallelism_number;
| Parameter or clause | Description |
| --- | --- |
| SET PARALLELISM | This clause controls the degree of parallelism for the targeted streaming job. |
| source_name | Should be a shared source. |
| parallelism_number | Can be ADAPTIVE or a fixed number (e.g., 1, 2, 3). Setting it to ADAPTIVE expands the job’s parallelism to all available units, while a fixed number locks it at that value. Setting it to 0 is equivalent to ADAPTIVE. The maximum allowed value is determined by the max_parallelism of the job. For more information, see Configuring maximum parallelism. |
ALTER SOURCE s_kafka SET PARALLELISM = 2;
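The other forms described in the table can be sketched the same way, reusing the s_kafka source from the example above. Both statements below should request adaptive parallelism, per the description of ADAPTIVE and 0.

```sql
-- Let the job scale to all available units, using the TO form.
ALTER SOURCE s_kafka SET PARALLELISM TO ADAPTIVE;

-- Equivalent according to the table above: 0 is treated as ADAPTIVE.
ALTER SOURCE s_kafka SET PARALLELISM = 0;
```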

SET BACKFILL_PARALLELISM

ALTER SOURCE source_name
    SET BACKFILL_PARALLELISM { TO | = } parallelism_value [ DEFERRED ];
This statement controls the degree of parallelism used during the backfill phase of a source. It lets you tune resource usage for backfilling independently from the source’s steady-state parallelism.
| Parameter or clause | Description |
| --- | --- |
| SET BACKFILL_PARALLELISM | Sets the parallelism for the backfill phase of the streaming job. |
| parallelism_value | Can be ADAPTIVE or a fixed number (e.g., 1, 2, 3). Setting it to ADAPTIVE or 0 expands backfill parallelism to all available units. A fixed number locks it at that value. |
| DEFERRED | Optional. When specified, the change takes effect after the current backfill completes, without interrupting an ongoing backfill. If omitted, the change is applied immediately. |
-- Set backfill parallelism to a fixed value
ALTER SOURCE kafka_source SET BACKFILL_PARALLELISM = 4;

-- Set backfill parallelism to adaptive
ALTER SOURCE kafka_source SET BACKFILL_PARALLELISM = ADAPTIVE;

-- Defer the parallelism change until the current backfill completes
ALTER SOURCE kafka_source SET BACKFILL_PARALLELISM = 4 DEFERRED;

SET SOURCE_RATE_LIMIT

ALTER SOURCE source_name
    SET SOURCE_RATE_LIMIT { TO | = } { default | rate_limit_number };
Use this statement to modify the rate limit of a source. For the specific value of SOURCE_RATE_LIMIT, refer to How to view runtime parameters.
When you create a materialized view on a source with historical data (for example, a Kafka source), the view backfills from the source. The backfilling process is not affected by the SOURCE_RATE_LIMIT of the source. To modify the rate limit of the backfilling process, refer to SET BACKFILL_RATE_LIMIT.
Example
-- Alter the rate limit of a source to default
ALTER SOURCE kafka_source SET source_rate_limit TO default;
Example
-- Alter the rate limit of a source to 1000
ALTER SOURCE kafka_source SET source_rate_limit TO 1000;

SWAP WITH

ALTER SOURCE name
SWAP WITH target_name;
| Parameter | Description |
| --- | --- |
| name | The current name of the source to swap. |
| target_name | The target name of the source you want to swap with. |
-- Swap the names of the api_data source and the file_data source.
ALTER SOURCE api_data
SWAP WITH file_data;