Redis is an open-source, in-memory data structure store, often referred to as a data structure server. RisingWave sinks data to Redis in the form of strings storing key-value pairs in the specified format (JSON
or TEMPLATE
), so a primary key must always be provided.
You can test out this process on your own device by using the redis-sink
demo in the integration_test directory of the RisingWave repository.
Prerequisites
Before sinking data from RisingWave to Redis, please ensure the following:
- The Redis database you want to sink to is accessible from RisingWave.
- Ensure you have an upstream materialized view or source in RisingWave that you can sink data from.
Syntax
CREATE SINK [ IF NOT EXISTS ] sink_name
[FROM sink_from | AS select_query]
WITH (
connector='redis',
connector_parameter = 'value', ...
)
FORMAT data_format ENCODE data_encode [ (
key = 'value' ) ]
[KEY ENCODE key_encode [(...)]]
;
Parameters
Name | Description |
---|
redis.url | Required. Choose either the Redis cluster address or a non-cluster Redis address. - If the address is a cluster address, it should be in the form of a JSON array, like
redis.url= '["redis://redis-server:6379/"]' . - If the address is a non-cluster address, it should be in the form of a string, like
redis.url= 'redis://redis-server:6379/' .
|
primary_key | Required. The primary keys of the sink. If necessary, use , to delimit the primary key columns. |
These options should be set in FORMAT data_format ENCODE data_encode (key = 'value')
, instead of the WITH
clause.
Field | Notes |
---|
data_format | Data format. Allowed formats:-
PLAIN : Output data with insert operations. -
UPSERT : Output data as a changelog stream.
|
data_encode | Data encoding. Supported encodings: JSON date : number of days since the Common Era (CE).
interval : P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S format string.
time without time zone : number of milliseconds past the last midnight.
timestamp : number of milliseconds since the Epoch.
TEMPLATE : converts data to the string specified by key_format /value_format .- Use
{column_name} to insert a column’s value.
- To include a literal
{ or } character, escape it with a backslash: \{ or \} . Backslash not followed by brace preserves its original meaning, like the first character in \\{ and this template outputs \{ .
|
force_append_only | If true, forces the sink to be PLAIN (also known as append-only), even if it cannot be. |
key_format | Specify the format for the key as a string. |
value_format | Specify the format for the value as a string. |
key_encode | Optional. - When specified, the key encode can only be
TEXT , and the primary key should be one and only one of the following types: varchar , bool , smallint , int , and bigint ; - When absent, both key and value will use the same setting of
ENCODE data_encode ( ... ) .
|
redis_value_type | Optional. Controls how data is written to Redis. For details, see examples below.string (Default): Key-value storage. key_format and value_format are required. pubsub : Publishes messages to Redis Pub/Sub channels. value_format is required. geospatial : Stores data using Redis’ geospatial index. key_format is required.
|
channel | Optional. Required if redis_value_type = 'pubsub' and channel_column is not set. The name of the Redis Pub/Sub channel to publish messages to. |
channel_column | Optional. Required if redis_value_type = 'pubsub' and channel is not set. The values from this column will be used as the names of the Redis Pub/Sub channels to publish messages to. Must be type of VARCHAR . |
longitude | Optional. Required if redis_value_type = 'geospatial' . Contains the longitude value. Must be of type FLOAT , REAL , or VARCHAR . |
latitude | Optional. Required if redis_value_type = 'geospatial' . Contains the latitude value. Must be of type FLOAT , REAL , or VARCHAR . |
member | Optional. Required when redis_value_type = 'geospatial' . Contains the member names for the geospatial set. Must be of type VARCHAR and part of the primary key. |
Example
This section provides examples for sinking data from RisingWave to Redis, covering key-value storage, geospatial data storage, and Pub/Sub messaging.
Key-value storage
This approach is suitable when you want to use Redis as a fast data store or cache, storing data as key-value pairs. Specify redis_value_type = 'string'
or omit this parameter, as string
is the default.
Assume we create a materialized view, bhv_mv
, from a source.
CREATE MATERIALIZED VIEW bhv_mv AS
SELECT
user_id,
target_id,
event_timestamp
FROM
source_1;
We can sink data from bhv_mv
to Redis by creating a sink. Here, data_encode
is JSON
.
CREATE SINK redis_sink
FROM bhv_mv WITH (
connector = 'redis',
primary_key = 'user_id',
redis.url= 'redis://127.0.0.1:6379/'
) FORMAT PLAIN ENCODE JSON (
force_append_only='true'
);
We can sink data from bhv_mv
to Redis by creating a sink. Here, data_encode
is TEMPLATE
, so key_format
and value_format
must be defined.
CREATE SINK redis_sink_2
FROM bhv_mv WITH (
primary_key = 'user_id',
connector = 'redis',
redis.url= 'redis://127.0.0.1:6379/',
) FORMAT PLAIN ENCODE TEMPLATE (
force_append_only='true',
key_format = 'UserID:{user_id}',
value_format = 'TargetID:{target_id},EventTimestamp{event_timestamp}'
);
Geospatial data storage
This approach allows RisingWave to store geospatial data (longitude, latitude, member) in Redis. Make sure to specify FORMAT UPSERT ENCODE TEMPLATE
, redis_value_type = 'geospatial'
, key_format
, longitude
, latitude
, and member
.
Assume we have a table t1
:
CREATE TABLE t1(v1 float, v2 float, v3 varchar, v4 varchar);
We can sink geospatial data to Redis:
CREATE SINK s1
FROM
t1 WITH (
primary_key = 'v3,v4',
connector = 'redis',
redis.url= 'redis://127.0.0.1:6379/',
)FORMAT UPSERT ENCODE TEMPLATE
(redis_value_type ='geospatial',
longitude = 'v1',
latitude = 'v2',
member = 'v3',
key_format = 'abcd3:{v4}'
);
Now insert the geospatial data into the table t1
:
INSERT INTO t1 VALUES (1.1, 1.1, 'test','test');
You can retrieve the geospatial information from Redis:
GEOPOS abcd3:test test
------
1) 1) "1.10000044298171997"
2) "1.10000000482478555"
For more information, see Geospatial indexing.
Pub/Sub messaging
This approach allows RisingWave to publish messages to Redis channels. Make sure to specify FORMAT UPSERT ENCODE TEMPLATE
, redis_value_type = 'pubsub'
, and value_format
. At least one of channel
or channel_column
must also be specified.
Assume we have a table t1
:
CREATE TABLE t1(v1 float, v2 float, v3 varchar, v4 varchar);
We can sink data from t1
as messages to a predefined Redis Pub/Sub channel:
CREATE SINK s1
FROM
t1 WITH (
primary_key = 'v3,v4',
connector = 'redis',
redis.url= 'redis://127.0.0.1:6379/',
)FORMAT UPSERT ENCODE TEMPLATE
(redis_value_type ='pubsub',
channel= 'test123',
value_format = 'abcd3:{v4}'
);
Alternatively, you can publish to a dynamic channel using channel_column
:
CREATE SINK s1
FROM
t1 WITH (
primary_key = 'v3,v4',
connector = 'redis',
redis.url= 'redis://127.0.0.1:6379/',
)FORMAT UPSERT ENCODE TEMPLATE
(redis_value_type ='pubsub',
channel_column = 'v3',
value_format = 'abcd3:{v4}'
);