This guide describes how to sink data from RisingWave to Elasticsearch using the Elasticsearch sink connector in RisingWave.

Elasticsearch is a distributed, RESTful search and analytics engine capable of addressing a growing number of use cases. It centrally stores your data for lightning-fast search, fine‑tuned relevancy, and powerful analytics that scale with ease.

The Elasticsearch sink connector in RisingWave performs index operations via the bulk API, flushing updates whenever one of these criteria is met:

  • 1,000 operations
  • 5 MB of updates
  • 5 seconds since the last flush (assuming new actions are queued)

PUBLIC PREVIEW

This feature is in the public preview stage, meaning it’s nearing the final product but is not yet fully stable. If you encounter any issues or have feedback, please contact us through our Slack channel. Your input is valuable in helping us improve the feature. For more information, see our Public preview feature list.

NOTE

The Elasticsearch sink connector in RisingWave provides at-least-once delivery semantics. Events may be redelivered in case of failures.

Prerequisites

  • Ensure the Elasticsearch cluster (version 7.x or 8.x) is accessible from RisingWave.
  • If you are running RisingWave locally from binaries, make sure that JDK 11 or later is installed in your environment.

Create an Elasticsearch sink

Use the following syntax to create an Elasticsearch sink. Once a sink is created, any insert or update to the sink will be streamed to the specified Elasticsearch endpoint.

CREATE SINK sink_name
[ FROM sink_from | AS select_query ]
WITH (
  connector = 'elasticsearch',
  primary_key = '<primary key of the sink_from object>',
  { index = '<your Elasticsearch index>' | index_column = '<your index column>' },
  url = 'http://<ES hostname>:<ES port>',
  username = '<your ES username>',
  password = '<your password>',
  delimiter = '<delimiter>'
);
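
As an illustration of the syntax above, the following is a minimal sketch of a sink that writes a table to a single index. The table `orders`, the sink name `orders_es_sink`, the index name, the URL, and the credentials are all hypothetical placeholders; substitute values for your own environment.

```sql
-- Hypothetical upstream table; any table or materialized view can be used.
CREATE TABLE orders (
  order_id INT PRIMARY KEY,
  customer VARCHAR,
  amount DOUBLE PRECISION
);

-- Sink every insert and update on `orders` to the (assumed) index `orders`.
CREATE SINK orders_es_sink FROM orders
WITH (
  connector = 'elasticsearch',
  primary_key = 'order_id',        -- used as the Elasticsearch document ID
  index = 'orders',                -- placeholder index name
  url = 'http://localhost:9200',   -- placeholder endpoint
  username = 'elastic',            -- placeholder credentials
  password = 'changeme'
);
```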

Parameters

| Parameter | Description |
| --- | --- |
| sink_name | Name of the sink to be created. |
| sink_from | A clause that specifies the direct source from which data will be output. sink_from can be a materialized view or a table. Either this clause or a SELECT query must be specified. |
| AS select_query | A SELECT query that specifies the data to be output to the sink. Either this query or a FROM clause must be specified. See SELECT for the syntax and examples of the SELECT command. |
| primary_key | Optional. The primary key of the sink. If the primary key has multiple columns, set a delimiter in the delimiter parameter below to join them. |
| index | Required if index_column is not set. Name of the Elasticsearch index that you want to write data to. |
| index_column | Allows writing to multiple indexes dynamically based on the column’s value. This parameter is mutually exclusive with index. When index is set, data is written to that index. When index_column is set, the target index is the value of this column, which must be of string type. Avoid setting this column as the first column, because the sink uses the first column as the key by default. |
| routing_column | Optional. Allows setting a column as a routing key, so that writes are directed to specific shards based on its value. |
| retry_on_conflict | Optional. Number of retry attempts after an optimistic locking conflict occurs. |
| batch_size_kb | Optional. Maximum size (in kilobytes) for each request batch sent to Elasticsearch. If the data exceeds this size, the current batch will be sent. |
| batch_num_messages | Optional. Maximum number of messages per request batch sent to Elasticsearch. If the number of messages exceeds this limit, the current batch will be sent. |
| concurrent_requests | Optional. Maximum number of concurrent threads for sending requests. |
| url | Required. URL of the Elasticsearch REST API endpoint. |
| username | Optional. Elasticsearch username for accessing the Elasticsearch endpoint. It must be used with password. |
| password | Optional. Password for accessing the Elasticsearch endpoint. It must be used with username. |
| delimiter | Optional. Delimiter for the Elasticsearch ID when the sink’s primary key has multiple columns. |
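
To show how index_column, routing_column, and the batching parameters fit together, here is a hedged sketch. The table `app_logs`, its columns, and all parameter values are hypothetical and chosen only for illustration; they are not recommended settings. Note that the index column is deliberately not the first column, following the guidance above.

```sql
-- Hypothetical table: `target_index` holds the name of the destination index
-- for each row and is a string column that is NOT the first column.
CREATE TABLE app_logs (
  log_id BIGINT PRIMARY KEY,
  target_index VARCHAR,
  tenant VARCHAR,
  message VARCHAR
);

CREATE SINK app_logs_es_sink FROM app_logs
WITH (
  connector = 'elasticsearch',
  primary_key = 'log_id',
  index_column = 'target_index',   -- mutually exclusive with `index`
  routing_column = 'tenant',       -- route writes to shards by tenant
  batch_num_messages = '500',      -- illustrative value only
  batch_size_kb = '2048',          -- illustrative value only
  url = 'http://localhost:9200'    -- placeholder endpoint
);
```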

NOTE

Elasticsearch versions earlier than 8.x had a type parameter. In Elasticsearch 6.x, users could set the type directly. Starting from 7.x, setting it is deprecated and the default value is unified to _doc. In 8.x, the type has been removed completely. See Elasticsearch’s official documentation for more details.

Accordingly, if you are using Elasticsearch 7.x, RisingWave sets the type to the officially recommended value, _doc. If you are using Elasticsearch 8.x, no setting is required because the parameter no longer exists.

Notes about primary keys and Elasticsearch IDs

The Elasticsearch sink defaults to the upsert sink type. It does not support the append-only sink type.

If you want to customize your Elasticsearch ID, please specify it via the primary_key parameter. RisingWave will combine multiple primary key values into a single string with the delimiter you set, and use it as the Elasticsearch ID.

If you don’t want to customize your Elasticsearch ID, RisingWave will use the first column in the sink definition as the Elasticsearch ID.
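
The following sketch illustrates a customized Elasticsearch ID built from a composite primary key. The table `order_items` and all values are hypothetical, and the comma-separated form of primary_key is an assumption about how multiple key columns are listed; verify it against your RisingWave version.

```sql
-- Hypothetical table with a two-column primary key.
CREATE TABLE order_items (
  order_id INT,
  item_id INT,
  quantity INT,
  PRIMARY KEY (order_id, item_id)
);

CREATE SINK order_items_es_sink FROM order_items
WITH (
  connector = 'elasticsearch',
  primary_key = 'order_id,item_id',  -- assumed comma-separated column list
  delimiter = '_',                   -- joins the key values into one ID
  index = 'order_items',             -- placeholder index name
  url = 'http://localhost:9200'      -- placeholder endpoint
);

-- Under these assumptions, a row with order_id = 7 and item_id = 3
-- would be expected to be written with the Elasticsearch ID "7_3".
```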

Data type mapping

Elasticsearch uses a mechanism called dynamic field mapping to create fields and determine their types automatically. It treats all integer types as long and all floating-point types as float. To ensure that data types in RisingWave are mapped to the correct data types in Elasticsearch, we recommend that you specify the mapping via index templates or dynamic templates before creating the sink.

| RisingWave data type | Elasticsearch field type |
| --- | --- |
| boolean | boolean |
| smallint | long |
| integer | long |
| bigint | long |
| numeric | text |
| real | float |
| double precision | float |
| character varying | text |
| bytea | text |
| date | date |
| time without time zone | text |
| timestamp without time zone | text |
| timestamp with time zone | text |
| interval | text |
| struct | object |
| array | array |
| jsonb | object (RisingWave’s Elasticsearch sink will send JSONB as a JSON string, and Elasticsearch will convert it into an object) |

NOTE

Elasticsearch doesn’t require users to explicitly CREATE TABLE. Instead, it infers the schema on-the-fly based on the first record ingested. For example, if a record contains a jsonb {v1: 100}, v1 will be inferred as a long type. However, if the next record is {v1: "abc"}, the ingestion will fail because "abc" is inferred as a string and the two types are incompatible.

Keep this behavior in mind; otherwise, some records may fail to be written and your Elasticsearch index may contain less data than expected. To monitor this, check Grafana, where there is a panel for all sink write errors.
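
One possible way to reduce the risk of such type conflicts is to extract JSONB fields as text in the sink query, so that the field always reaches Elasticsearch as a string. This is only a sketch of one option, not the recommended approach for every workload; the table `events`, its columns, and the connection values are hypothetical.

```sql
-- Hypothetical table whose JSONB payload may carry v1 as a number or a string.
CREATE TABLE events (
  event_id BIGINT PRIMARY KEY,
  payload JSONB
);

-- `->>` returns the field as text, so both {"v1": 100} and {"v1": "abc"}
-- are sent to Elasticsearch as strings rather than as mixed types.
CREATE SINK events_es_sink AS
SELECT
  event_id,
  payload ->> 'v1' AS v1
FROM events
WITH (
  connector = 'elasticsearch',
  primary_key = 'event_id',
  index = 'events',                -- placeholder index name
  url = 'http://localhost:9200'    -- placeholder endpoint
);
```

The trade-off is that v1 is then indexed as text, so numeric range queries on it would need an explicit mapping or a cast on the RisingWave side.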