Syntax
CREATE SINK [ IF NOT EXISTS ] sink_name
[FROM sink_from | AS select_query]
WITH (
connector='mongodb',
connector_parameter = 'value', ...
);
Parameters
Parameter Name | Description |
---|
mongodb.url | The URL of MongoDB. |
type | Defines the type of the sink. Options include append-only or upsert . |
collection.name | The collection name where data should be written to or read from. For sinks, the format is db_name.collection_name . Data can also be written to dynamic collections, see collection.name.field below for more information. |
collection.name.field | Optional. The dynamic collection name where data should be sunk to. - If specified, the field value will be used as the collection name. The collection name format is the same as
collection.name . - If the field value is null or an empty string, then the
collection.name will be used as a fallback destination.
|
collection.name.field.drop | Optional. Controls whether the field value of collection.name.field should be dropped when sinking. Set this option to true to avoid the duplicate values of collection.name.field being written to the result collection. |
Data type mapping
MongoDB Type | RisingWave Type |
---|
Boolean | BOOLEAN |
32-bit integer | SMALLINT |
32-bit integer | INTEGER |
64-bit integer | BIGINT |
Double | REAL |
Double | DOUBLE |
Decimal128 | DECIMAL |
String | DATE |
String | VARCHAR |
String | TIME |
Date | TIMESTAMP WITHOUT TIME ZONE |
Date | TIMESTAMP WITH TIME ZONE |
String | INTERVAL |
Object | STRUCT |
Array | ARRAY |
Binary data | BYTEA |
Object | JSONB |
64-bit integer | SERIAL |
Examples
Below are some use cases for your reference.
Sink data with append-only
To create a sink with the append-only type:
CREATE sink t1_sink FROM t1
WITH (
connector='mongodb',
type = 'append-only',
mongodb.url = 'mongodb://mongodb:27017/?replicaSet=rs0',
collection.name = 'demo.t1'
);
In append-only mode, MongoDB will automatically generate an _id
field for each record, typically with a value of the ObjectId type. This is necessary because _id
is the primary key in MongoDB.
Sink data with upsert
To create a sink with the upsert type for a table with a single key:
CREATE sink t2_sink FROM t2
WITH (
connector='mongodb',
type = 'upsert',
mongodb.url = 'mongodb://mongodb:27017/?replicaSet=rs0',
collection.name = 'demo.t2',
primary_key='id'
);
Assuming the schema of t2
is:
name | type | pk |
---|
id | int | ✔ |
value | text | |
Given the record:
id | value |
---|
1 | ’example of record’ |
The record written to MongoDB will be:
{ "_id": 1, "id": 1, "value": "example of record" }
No redundant id
field will exist if the primary key of t2
is _id
.
CREATE TABLE t3(
a int,
b int,
value text,
primary key (a, b)
);
insert into t3 values(1, 2, 'abc');
CREATE sink t3_sink FROM t3
WITH (
connector='mongodb',
type = 'upsert',
mongodb.url = 'mongodb://mongodb:27017/?replicaSet=rs0',
collection.name = 'demo.t3',
primary_key='a,b'
);
The record written to MongoDB will be:
{ "_id": { "a": 1, "b": 2 }, "a": 1, "b": 2, "value": "abc" }
Dynamic collection name
Dynamic collection names are useful in certain scenarios. For example, a multi-tenant application may store its data using sharding, where tenant_id
is included as a prefix in the collection name, such as sharding_2024_01.tenant1_order
. This approach offers more flexibility and enables efficient data organization and retrieval based on specific tenant requirements.
To use a dynamic collection name:
CREATE sink t2_sink FROM t2
WITH (
connector='mongodb',
type = 'upsert',
mongodb.url = 'mongodb://mongodb:27017/?replicaSet=rs0',
collection.name = 'demo.t2',
collection.name.field = 'collection_name',
collection.name.field.drop = 'true',
primary_key='_id'
);
collection.name
: Serve as a fallback collection name if the value of collection.name.field
is empty or null. In this case, it defaults to demo.t2
.
collection.name.field
: Specify the field used for the collection name. This field must be of type varchar
.
collection.name.field.drop
: When set to true
, it avoids duplicate values of collection.name.field
in the result collection.