Added in v2.8.0.
RisingWave supports dynamic updates to source connector properties using risectl commands. This allows you to modify source configurations such as broker addresses, security credentials, and performance settings without dropping and recreating sources.

Overview

Three risectl commands are available for managing source properties:
  • alter-source-properties-safe: Update connector properties with automatic pause/resume workflow
  • reset-source-splits: Force re-discovery of source splits
  • inject-source-offsets: Manually inject specific offsets (advanced use only)

Prerequisites

Before using these commands, you need:
  • Access to the RisingWave cluster with appropriate permissions
  • The source ID from the rw_sources catalog table
  • risectl included in the RisingWave binary
Get the source ID:
SELECT id FROM rw_sources WHERE name = 'your_source_name';
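When scripting these commands, it can help to look up the ID and confirm it is a single integer before passing it to --source-id. The following is a minimal sketch, not an official helper: the function name get_source_id and the RW_* variables are hypothetical, and the connection defaults (localhost, port 4566, database dev, user root) are assumptions that you should adjust for your cluster.

```shell
# Connection settings are assumptions; override them via the environment.
RW_HOST="${RW_HOST:-localhost}"
RW_PORT="${RW_PORT:-4566}"
RW_DB="${RW_DB:-dev}"
RW_USER="${RW_USER:-root}"

# Print the numeric ID of the named source, or fail if it cannot be resolved.
get_source_id() {
    local name="$1" id
    # Reject names containing single quotes instead of attempting SQL escaping.
    case $name in
        *"'"*) echo "invalid source name: $name" >&2; return 1 ;;
    esac
    # -A (unaligned) and -t (tuples only) make psql print just the value.
    id=$(psql -h "$RW_HOST" -p "$RW_PORT" -d "$RW_DB" -U "$RW_USER" -At \
        -c "SELECT id FROM rw_sources WHERE name = '$name';") || return 1
    # Empty output (no match) or anything non-numeric (e.g. several rows) fails.
    case $id in
        ''|*[!0-9]*) echo "no unique source ID for: $name" >&2; return 1 ;;
    esac
    printf '%s\n' "$id"
}
```

For example, SOURCE_ID=$(get_source_id your_source_name) stores the ID for use in the commands below.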

alter-source-properties-safe

Updates source connector properties with an orchestrated pause/resume workflow.

Syntax

risectl meta alter-source-properties-safe \
    --source-id <SOURCE_ID> \
    --props '<JSON_PROPERTIES>' \
    [--reset-splits]
risectl is included in the pre-built RisingWave binary. If you are running that binary, use the following command instead:
./risingwave ctl meta alter-source-properties-safe \
    --source-id <SOURCE_ID> \
    --props '<JSON_PROPERTIES>' \
    [--reset-splits]
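Because the same command can be invoked in two ways, a script may want to pick whichever form is available. The wrapper below is a rough sketch under the assumption that either a standalone risectl is on the PATH or the pre-built binary sits at ./risingwave; the function name run_risectl is hypothetical.

```shell
# Run a risectl subcommand using whichever invocation form is available.
run_risectl() {
    if command -v risectl >/dev/null 2>&1; then
        # Standalone risectl found on PATH.
        risectl "$@"
    elif [ -x ./risingwave ]; then
        # Fall back to the ctl subcommand of the pre-built binary.
        ./risingwave ctl "$@"
    else
        echo "neither risectl nor ./risingwave was found" >&2
        return 127
    fi
}
```

With this in place, run_risectl meta alter-source-properties-safe --source-id 123 --props '<JSON_PROPERTIES>' works with either installation.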

Parameters

Parameter | Type | Required | Description
--source-id | integer | Yes | Source ID from rw_sources table
--props | JSON string | Yes | Properties to update as key-value pairs
--reset-splits | flag | No | Reset split assignments after update

Supported properties

Unlike ALTER SOURCE ... CONNECTOR WITH, which restricts updates to a predefined set of runtime-alterable fields, this risectl command bypasses that allowlist and can update any valid source connector property that passes connector validation. Kafka property updates are the primary documented and tested workflow. Use this command with caution. Below are common properties grouped by connector type.

Kafka

Connection and security:
  • properties.bootstrap.server
  • properties.security.protocol
  • properties.ssl.endpoint.identification.algorithm
  • properties.sasl.mechanism
  • properties.sasl.username
  • properties.sasl.password
  • properties.client.id
  • properties.enable.ssl.certificate.verification
Consumer performance:
  • properties.fetch.max.bytes
  • properties.fetch.wait.max.ms
  • properties.fetch.queue.backoff.ms
  • properties.queued.min.messages
  • properties.queued.max.messages.kbytes
  • properties.enable.auto.commit
Message size and monitoring:
  • properties.message.max.bytes
  • properties.receive.message.max.bytes
  • properties.statistics.interval.ms
Other:
  • properties.sync.call.timeout
  • group.id.prefix

CDC (MySQL, PostgreSQL, SQL Server, MongoDB)

  • cdc.source.wait.streaming.start.timeout
  • debezium.max.queue.size
  • debezium.queue.memory.ratio
The connector type (e.g., connector = 'kafka') cannot be changed. Only connector-specific properties can be updated.

Examples

Update Kafka broker address:
risectl meta alter-source-properties-safe \
    --source-id 123 \
    --props '{"properties.bootstrap.server": "new-kafka-cluster:9092"}' \
    --reset-splits
Update security settings:
risectl meta alter-source-properties-safe \
    --source-id 123 \
    --props '{
        "properties.security.protocol": "SASL_SSL",
        "properties.sasl.mechanism": "PLAIN",
        "properties.sasl.username": "new-user"
    }'
Update performance settings:
risectl meta alter-source-properties-safe \
    --source-id 123 \
    --props '{"properties.receive.message.max.bytes": "5000000"}'
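Hand-writing JSON inside shell quotes is easy to get wrong, especially for credentials that contain quotes or backslashes. As an illustration only, the hypothetical helpers below build the --props object from key=value arguments. They emit every value as a JSON string, matching the examples above, and do not handle control characters such as newlines.

```shell
# Escape backslashes and double quotes for use inside a JSON string.
# Control characters (newlines, tabs) are NOT handled in this sketch.
json_escape() {
    printf '%s' "$1" | sed 's/\\/\\\\/g; s/"/\\"/g'
}

# Build a JSON object from key=value arguments; all values become strings.
build_props_json() {
    local out="{" sep="" pair key value
    for pair in "$@"; do
        case $pair in
            *=*) ;;
            *) echo "expected key=value, got: $pair" >&2; return 1 ;;
        esac
        key=${pair%%=*}      # text before the first '='
        value=${pair#*=}     # text after the first '='
        out="$out$sep\"$(json_escape "$key")\": \"$(json_escape "$value")\""
        sep=", "
    done
    printf '%s}\n' "$out"
}
```

The result can then be passed as --props "$(build_props_json 'properties.sasl.username=new-user')".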

How it works

The command follows this workflow:
  1. Pause: Pauses the streaming graph to stop source consumption
  2. Catalog update: Updates the source properties in the rw_sources catalog
  3. Validation: Validates the updated source configuration against the upstream connector
  4. Property propagation: Applies property changes to all running source readers
  5. Split reset (if --reset-splits is specified): Clears cached splits
  6. Resume: Resumes the streaming graph to restart consumption
All changes persist in the catalog and survive cluster restarts.

When to use

Use this command when you need to:
  • Migrate to a different Kafka cluster
  • Update security credentials
  • Tune performance parameters
  • Change any supported connector property

reset-source-splits

Forces rediscovery of source splits by clearing cached state.

Syntax

risectl meta reset-source-splits \
    --source-id <SOURCE_ID>
risectl is included in the pre-built RisingWave binary. If you are running that binary, use the following command instead:
./risingwave ctl meta reset-source-splits \
    --source-id <SOURCE_ID>

Parameters

Parameter | Type | Required | Description
--source-id | integer | Yes | Source ID to reset splits for

Example

risectl meta reset-source-splits --source-id 123

How it works

This command:
  1. Clears the cached split state for the source
  2. Triggers an immediate re-discovery of splits from upstream
  3. Reassigns discovered splits on the next scheduling cycle
Resetting splits may cause data duplication or loss. When splits are reset, RisingWave clears the cached split state and re-discovers partitions/splits from the source. Depending on the connector’s offset tracking mechanism, this may result in re-consuming some messages. Monitor the source after resetting to detect any unexpected behavior.

When to use

Use this command when:
  • Upstream topology changes significantly (e.g., partition count changes)
  • Splits appear stale or incorrect
  • Broker address updates lead to different partition assignments

inject-source-offsets

Manually injects specific offsets into source splits. This is an advanced command for operational troubleshooting.
This is an UNSAFE operation. Incorrect offsets will cause data inconsistency. Use only when you know the exact required offsets.

Syntax

risectl meta inject-source-offsets \
    --source-id <SOURCE_ID> \
    --offsets '<JSON_OFFSETS>'
risectl is included in the pre-built RisingWave binary. If you are running that binary, use the following command instead:
./risingwave ctl meta inject-source-offsets \
    --source-id <SOURCE_ID> \
    --offsets '<JSON_OFFSETS>'

Parameters

Parameter | Type | Required | Description
--source-id | integer | Yes | Source ID to inject offsets for
--offsets | JSON string | Yes | Split ID to offset mapping

Offset format

The --offsets parameter is a JSON object mapping split IDs to offset values. The format varies by connector type:
  • Kafka: Split ID is the partition number (e.g., "0", "1"). The offset value is the last consumed Kafka offset as a string. RisingWave resumes from the next offset (value + 1). Example: {"0": "1000", "1": "2000"} resumes partition 0 from offset 1001 and partition 1 from offset 2001.
  • Pulsar: Split ID is the full topic URL (e.g., persistent://public/default/my-topic). Offset is in the format "{ledger_id}:{entry_id}:{partition}:{batch_index}" (e.g., "1234:56:0:-1").
For CDC sources, the split ID and offset format are complex and connector-specific. The offset follows the Debezium envelope format. Refer to Monitor CDC progress for the actual offset structure before using this command with CDC sources.
All three commands work with multiple connector types. The properties listed above are the most commonly updated ones for Kafka and CDC connectors. For other connectors, refer to the connector-specific documentation for available property names.

Examples

Inject Kafka offsets (keys are partition IDs, values are last consumed offsets):
risectl meta inject-source-offsets \
    --source-id 123 \
    --offsets '{"0": "1000", "1": "2000", "2": "1500"}'
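Since an off-by-one mistake here changes which message is consumed next, it may be worth generating the Kafka offsets JSON and checking the resume position before running the command. The sketch below uses hypothetical helper names and assumes partition=offset arguments written as plain decimal integers. It applies the "last consumed offset, resume at value + 1" rule described in the offset format section.

```shell
# Succeed only for a plain decimal integer with no sign or leading zeros.
is_decimal() {
    case $1 in
        ''|*[!0-9]*|0?*) return 1 ;;
    esac
}

# Build the --offsets JSON from "partition=last_consumed_offset" arguments.
build_kafka_offsets_json() {
    local out="{" sep="" pair partition offset
    for pair in "$@"; do
        case $pair in
            *=*) ;;
            *) echo "expected partition=offset, got: $pair" >&2; return 1 ;;
        esac
        partition=${pair%%=*}
        offset=${pair#*=}
        if ! is_decimal "$partition" || ! is_decimal "$offset"; then
            echo "partition and offset must be integers: $pair" >&2
            return 1
        fi
        # Offsets are emitted as strings, as in the example above.
        out="$out$sep\"$partition\": \"$offset\""
        sep=", "
    done
    printf '%s}\n' "$out"
}

# Print the offset consumption resumes from (injected value + 1).
kafka_resume_offset() {
    is_decimal "$1" || { echo "bad offset: $1" >&2; return 1; }
    echo $(( $1 + 1 ))
}
```

For instance, build_kafka_offsets_json 0=1000 1=2000 2=1500 reproduces the JSON used in the example, and kafka_resume_offset 1000 prints 1001.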

How it works

The command:
  1. Offset injection: Delivers the specified offsets to source executors via a barrier (without pausing the streaming graph)
  2. State update: Updates the split state for the specified splits. Splits not listed in --offsets are left untouched.
  3. Reader rebuild: Source readers are rebuilt automatically and consumption resumes from the connector-specific position derived from the injected offsets. For Kafka, consumption resumes from the next offset after the injected value.

When to use

Use this command only for:
  • Disaster recovery scenarios
  • Skipping corrupted data sections
  • Advanced operational troubleshooting when you know the exact offsets needed

Monitoring and verification

Check updated properties

Verify that properties were updated:
SELECT definition FROM rw_sources WHERE name = 'your_source_name';

Monitor source status

Monitor the source during updates. Use rw_fragments for this query because rw_fragment_parallelism does not include source objects.
SELECT
    fragment_id,
    distribution_type,
    state_table_ids,
    upstream_fragment_ids,
    flags,
    parallelism,
    max_parallelism,
    parallelism_policy
FROM rw_fragments
WHERE table_id = (
    SELECT id FROM rw_sources WHERE name = 'your_source_name'
);

Rollback

Properties can be reverted using the same alter-source-properties-safe command:
risectl meta alter-source-properties-safe \
    --source-id 123 \
    --props '{"properties.bootstrap.server": "old-broker:9092"}'
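A rollback is only possible if the previous values are known. One option, sketched below, is to save the source definition to a file before making a change. This is an assumption-laden example rather than a documented procedure: the function name save_source_definition and the RW_* variables are hypothetical, the connection defaults should be adjusted for your cluster, and it assumes the definition column shows the property values you would need to restore.

```shell
# Connection settings are assumptions; override them via the environment.
RW_HOST="${RW_HOST:-localhost}"
RW_PORT="${RW_PORT:-4566}"
RW_DB="${RW_DB:-dev}"
RW_USER="${RW_USER:-root}"

# Write the current definition of the named source to a file.
save_source_definition() {
    local name="$1" out_file="$2"
    # Reject names containing single quotes instead of attempting SQL escaping.
    case $name in
        *"'"*) echo "invalid source name: $name" >&2; return 1 ;;
    esac
    psql -h "$RW_HOST" -p "$RW_PORT" -d "$RW_DB" -U "$RW_USER" -At \
        -c "SELECT definition FROM rw_sources WHERE name = '$name';" \
        > "$out_file" || return 1
    # An empty file means no source matched the name.
    [ -s "$out_file" ] || { echo "no definition found for: $name" >&2; return 1; }
}
```

Running save_source_definition your_source_name before-change.sql ahead of each update leaves a record to consult when composing the rollback --props.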

Best practices

  1. Get the source ID first: Always query the source ID before running commands
  2. Test in non-production: Test property updates in a development environment first
  3. Use --reset-splits when needed: Include --reset-splits when changing broker addresses or making significant topology changes
  4. Monitor after changes: Verify source behavior after property updates
  5. Avoid inject-source-offsets: Use offset injection only as a last resort for operational issues

Limitations

  • All three commands work with multiple connector types (Kafka, CDC, Pulsar, etc.).
  • The connector type cannot be changed (e.g., cannot change from Kafka to Pulsar).
  • Some properties are validated; invalid values will cause the command to fail.

See also

  • ALTER SOURCE: SQL command to alter source schema and settings
  • risectl: Overview of the risectl tool
  • Meta backup: Other risectl commands for metadata management