Syntax

CREATE SINK [ IF NOT EXISTS ] sink_name
[FROM sink_from | AS select_query]
WITH (
   connector='google_pubsub',
   connector_parameter = 'value', ...
)
FORMAT data_format ENCODE data_encode [ (
    key = 'value'
) ]
[KEY ENCODE key_encode [(...)]]
;

Basic parameters

| Parameter | Description |
| --- | --- |
| pubsub.project_id | Required. The Pub/Sub project ID. |
| pubsub.topic | Required. The Pub/Sub topic to publish messages to. |
| pubsub.endpoint | Required. The Pub/Sub endpoint. For real Google Cloud usage, set this to pubsub.googleapis.com. For a local emulator, set this to the emulator host and port (e.g., localhost:8900). Do not include a scheme prefix (https://) or angle brackets. |
| pubsub.emulator_host | Optional. The Pub/Sub emulator host. Set this only when testing locally against the Pub/Sub emulator. When this parameter is set, RisingWave skips TLS and authentication. Do not set this parameter for real GCP usage. |
| pubsub.credentials | Optional. The service account credentials used for authorization. Accepts either a raw JSON string or a secret reference in the form SECRET <secret_name>. The secret must contain the raw JSON content of the service account key file, not a file path. See Create credentials for a service account. The service account must have the roles/pubsub.publisher and roles/pubsub.viewer roles on the target topic. |

FORMAT and ENCODE options

These options should be set in FORMAT data_format ENCODE data_encode (key = 'value'), instead of the WITH clause.
| Field | Note |
| --- | --- |
| data_format | Data format. Allowed format: PLAIN. |
| data_encode | Data encode. Supported encode: JSON. |
| force_append_only | Required by default and must be true, which forces the sink to be PLAIN (also known as append-only). |
| key_encode | Optional. When specified, the key encode can only be TEXT, and the primary key must be a single column of one of the following types: varchar, bool, smallint, int, or bigint. When absent, both key and value use the same setting of ENCODE data_encode ( ... ). |

IAM requirements

The service account used by the connector must have the following IAM roles on the target Pub/Sub topic:
| Role | Purpose |
| --- | --- |
| roles/pubsub.publisher | Publish messages to the topic. |
| roles/pubsub.viewer | Read topic metadata (required for the connector to verify the topic exists and is reachable). |
Grant both roles using the Google Cloud Console or the gcloud CLI:
Example
gcloud pubsub topics add-iam-policy-binding TOPIC_NAME \
  --member="serviceAccount:SA_EMAIL" \
  --role="roles/pubsub.publisher"

gcloud pubsub topics add-iam-policy-binding TOPIC_NAME \
  --member="serviceAccount:SA_EMAIL" \
  --role="roles/pubsub.viewer"
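
To confirm that both bindings took effect, the topic's IAM policy can be inspected. The following is a minimal sketch, assuming the policy has been exported as JSON with `gcloud pubsub topics get-iam-policy TOPIC_NAME --format=json`. The helper name `missing_roles` is hypothetical. It only looks at topic-level bindings, so roles granted at the project level would not show up here.

```python
import json

# The two roles this page requires on the target topic.
REQUIRED_ROLES = ("roles/pubsub.publisher", "roles/pubsub.viewer")


def missing_roles(policy: dict, sa_email: str) -> list:
    """Return the required roles that the service account lacks in a topic IAM policy.

    `policy` is the parsed JSON policy document: a dict with a "bindings" list,
    where each binding has a "role" and a list of "members".
    """
    member = f"serviceAccount:{sa_email}"
    granted = {
        binding.get("role")
        for binding in policy.get("bindings", [])
        if member in binding.get("members", [])
    }
    return [role for role in REQUIRED_ROLES if role not in granted]


def missing_roles_from_text(policy_json: str, sa_email: str) -> list:
    """Same check, starting from the raw JSON text printed by gcloud."""
    return missing_roles(json.loads(policy_json), sa_email)
```

An empty list means both roles are bound to the service account on the topic.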

Using Google Cloud Pub/Sub (non-emulator)

This section describes how to connect RisingWave to a real Google Cloud Pub/Sub topic.

Step 1: Store the service account key as a secret

CREATE SECRET stores a literal string value in RisingWave metadata. For Pub/Sub, pass the raw JSON content of the service account key file — not a file path.
Example
-- Paste the full JSON from your downloaded key file.
-- Key fields include: type, project_id, private_key_id, private_key, client_email.
CREATE SECRET pubsub_creds WITH (backend = 'meta') AS
'{
  "type": "service_account",
  "project_id": "my-project",
  "private_key_id": "KEY_ID",
  "private_key": "-----BEGIN RSA PRIVATE KEY-----\nMIIE...\n-----END RSA PRIVATE KEY-----\n",
  "client_email": "my-sa@my-project.iam.gserviceaccount.com",
  "client_id": "CLIENT_ID",
  "auth_uri": "https://accounts.google.com/o/oauth2/auth",
  "token_uri": "https://oauth2.googleapis.com/token",
  ...
}';
Paste the raw JSON content directly. Do not pass a file path such as /home/user/key.json. Passing a file path instead of the JSON content will cause authentication to fail.
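
Before pasting the key into CREATE SECRET, it can help to sanity-check the text. The sketch below is illustrative only and is not part of RisingWave: the function name `check_service_account_json` is hypothetical, and the field list is taken from the key fields named in the example above. It flags a file path passed in place of JSON, missing fields, and a double-escaped private_key.

```python
import json

# Key fields named in the CREATE SECRET example.
REQUIRED_FIELDS = ("type", "project_id", "private_key_id", "private_key", "client_email")


def check_service_account_json(raw: str) -> list:
    """Return a list of problems found in the text intended for CREATE SECRET."""
    try:
        key = json.loads(raw)
    except json.JSONDecodeError:
        # A file path such as /home/user/key.json is not JSON and lands here.
        return ["not valid JSON; if this is a file path, paste the file's content instead"]
    if not isinstance(key, dict):
        return ["top-level JSON value is not an object"]

    problems = [f"missing field: {name}" for name in REQUIRED_FIELDS if name not in key]

    pem = key.get("private_key", "")
    if isinstance(pem, str) and pem:
        # After JSON decoding, a correct key contains real newline characters.
        # A backslash followed by 'n' means the escapes were doubled (\\n in the JSON text).
        if "\\n" in pem:
            problems.append("private_key newlines were double-escaped")
        elif "\n" not in pem:
            problems.append("private_key has no line breaks")
        if "PRIVATE KEY-----" not in pem:
            problems.append("private_key does not look like a PEM block")
    return problems
```

An empty result does not prove the key is valid for authentication; it only rules out the formatting mistakes described in this section.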

Step 2: Create the sink

Example
CREATE TABLE IF NOT EXISTS personnel (id integer, name varchar);

CREATE SINK pubsub_sink
FROM personnel
WITH (
    connector = 'google_pubsub',
    pubsub.endpoint = 'pubsub.googleapis.com',
    pubsub.project_id = 'my-project',
    pubsub.topic = 'my-topic',
    pubsub.credentials = SECRET pubsub_creds,
) FORMAT PLAIN ENCODE JSON (
    force_append_only = 'true',
);

Example: local emulator

You can test the connector locally before deploying to Google Cloud. See Test locally with the Pub/Sub emulator. Configure the emulator in docker-compose.yaml:
services:
  pubsub-emulator:
    image: google/cloud-sdk:latest
    command: >
      gcloud beta emulators pubsub start --project=demo --host-port=0.0.0.0:8900
    ports:
      - "8900:8900"
Create the sink pointing at the emulator:
CREATE TABLE IF NOT EXISTS personnel (id integer, name varchar);

CREATE SINK pubsub_sink
FROM personnel
WITH (
    connector = 'google_pubsub',
    pubsub.endpoint = 'localhost:8900',
    pubsub.emulator_host = 'localhost:8900',
    pubsub.project_id = 'demo',
    pubsub.topic = 'test',
) FORMAT PLAIN ENCODE JSON (
    force_append_only = 'true',
);
Do not set pubsub.emulator_host when connecting to real Google Cloud. This parameter disables TLS and authentication, which are required for GCP.
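
A freshly started emulator contains no topics. Assuming the sink expects the topic to exist already (as the validation checklist states for real GCP), the topic from the example can be created first. This is a minimal sketch that assumes the emulator serves the standard Pub/Sub REST path `/v1/projects/{project}/topics/{topic}` over plain HTTP on the published port. The function names are hypothetical.

```python
import urllib.request


def topic_url(emulator_host: str, project_id: str, topic: str) -> str:
    """Build the REST URL of a topic on the emulator (plain HTTP, no TLS)."""
    return f"http://{emulator_host}/v1/projects/{project_id}/topics/{topic}"


def create_topic(emulator_host: str, project_id: str, topic: str) -> int:
    """Create the topic with an HTTP PUT and return the response status code.

    urlopen raises urllib.error.HTTPError on a non-2xx response, for example
    when the topic already exists.
    """
    request = urllib.request.Request(
        topic_url(emulator_host, project_id, topic),
        data=b"{}",  # empty topic definition
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.status
```

With the docker-compose setup above running, `create_topic("localhost:8900", "demo", "test")` would target the same project and topic as the sink example.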

Validation checklist

Before creating the sink, verify the following:
  • The Pub/Sub topic exists in the specified project.
  • The service account has both roles/pubsub.publisher and roles/pubsub.viewer on the topic.
  • The secret content is the raw JSON of the service account key, not a file path.
  • pubsub.endpoint is set to pubsub.googleapis.com for real GCP (no https:// prefix, no port suffix, no angle brackets).
  • pubsub.emulator_host is not set when connecting to real GCP.

After creating the sink, monitor throughput, latency, and successful publish metrics to confirm that messages are flowing.
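
The configuration items in the checklist can be checked mechanically before running CREATE SINK. The following is a sketch under stated assumptions, not a RisingWave tool: `lint_sink_options` is a hypothetical helper that takes the WITH-clause options as a plain dictionary and applies the endpoint and emulator rules from this page.

```python
# Parameters marked "Required" in the basic parameters table.
REQUIRED = ("pubsub.project_id", "pubsub.topic", "pubsub.endpoint")
GCP_HOST = "pubsub.googleapis.com"


def lint_sink_options(options: dict) -> list:
    """Return a list of problems found in the WITH-clause options of the sink."""
    problems = [f"{name} is required" for name in REQUIRED if not options.get(name)]

    endpoint = options.get("pubsub.endpoint", "")
    if "://" in endpoint:
        problems.append("pubsub.endpoint must not include a scheme prefix")
    if "<" in endpoint or ">" in endpoint:
        problems.append("pubsub.endpoint must not include angle brackets")

    # Strip the scheme and brackets already reported, then look at what is left.
    host = endpoint.split("://", 1)[-1].strip("<>")
    if GCP_HOST in host:
        if host != GCP_HOST:
            problems.append("for real GCP, pubsub.endpoint must be exactly " + GCP_HOST)
        if options.get("pubsub.emulator_host"):
            problems.append("pubsub.emulator_host must not be set for real GCP")
    return problems
```

For example, passing an endpoint of `https://pubsub.googleapis.com` yields a single scheme-prefix problem, while the emulator configuration from the local example passes cleanly.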

Common misconfigurations

| Issue | Cause | Fix |
| --- | --- | --- |
| InvalidKeyFormat | The private_key field in the service account JSON has been corrupted or double-escaped during copy-paste (for example, \\n instead of \n for newlines within the PEM block). The downloaded key file from the Google Cloud Console already uses the correct \n escape sequences; do not modify them. | Re-download the service account key from the Google Cloud Console and store it directly in the secret without reformatting. |
| PermissionDenied | The service account is missing a required IAM role. | Grant roles/pubsub.publisher and roles/pubsub.viewer on the target topic. |
| Transport error / connection refused | pubsub.endpoint contains an https:// prefix, angle brackets, or another unsupported format. | Set pubsub.endpoint to the plain hostname pubsub.googleapis.com. |
| Authentication silently ignored | pubsub.emulator_host is set while connecting to real GCP. | Remove pubsub.emulator_host for real GCP connections. |
| Credentials not found | pubsub.credentials contains a file path instead of JSON content. | Paste the raw JSON content of the key file into the secret. |