The CREATE AGGREGATE command can be used to create user-defined aggregate functions (UDAFs). Currently, UDAFs are only supported in Python and JavaScript as embedded UDFs.
The name of the aggregate function that you want to declare in RisingWave.
argument_type
The data type of the input parameter(s) that the function expects to receive.
RETURNS return_type
The data type of the return value from the aggregate function.
LANGUAGE
The programming language used to implement the UDAF. Currently, Python and JavaScript are supported.
AS function_body
The source code of the UDAF.
In the function_body, the code should define several functions to implement the aggregate function.
Required functions:
create_state() -> state: Create a new state.
accumulate(state, *args) -> state: Accumulate a new value into the state, returning the updated state.
Optional functions:
finish(state) -> value: Get the result of the aggregate function. If not defined, the state is returned as the result.
retract(state, *args) -> state: Retract a value from the state, returning the updated state. If not defined, the state cannot be updated incrementally in materialized views, and performance may be affected.
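The contract above can be sketched in plain Python, outside RisingWave. The driver `run_aggregate` and the `sum_udaf` dictionary are hypothetical names introduced only for this illustration; the sketch assumes the engine calls the functions in the order described and may not match RisingWave's actual call sequence. It shows that a UDAF with no `finish` returns its state as the result, and that `retract` acts as the inverse of `accumulate`.

```python
# Hypothetical stand-in for the engine; RisingWave's real call sequence may differ.
def run_aggregate(udaf, rows):
    """Fold rows through a UDAF given as a dict of functions."""
    state = udaf["create_state"]()
    for args in rows:
        state = udaf["accumulate"](state, *args)
    # finish is optional: without it, the state itself is the result.
    finish = udaf.get("finish")
    return finish(state) if finish is not None else state


# Required: create a new state.
def create_state():
    return 0


# Required: fold one input row into the state, skipping NULLs.
def accumulate(state, value):
    if value is None:
        return state
    return state + value


# Optional: undo the effect of one previously accumulated row.
def retract(state, value):
    if value is None:
        return state
    return state - value


# A minimal UDAF that defines only the required functions.
sum_udaf = {"create_state": create_state, "accumulate": accumulate}

total = run_aggregate(sum_udaf, [(1,), (None,), (3,)])
print(total)  # 4

# Retracting a row yields the same state as re-aggregating without that row.
print(retract(total, 3) == run_aggregate(sum_udaf, [(1,), (None,)]))  # True
```

Without `retract`, the only way to remove a row in this sketch is the full re-aggregation on the right-hand side of the last comparison, which is why omitting it may affect performance.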
The following command creates an aggregate function named weighted_avg to calculate the weighted average.
Python UDAF
CREATE AGGREGATE weighted_avg(value int, weight int) RETURNS float LANGUAGE python AS $$
def create_state():
    return (0, 0)

def accumulate(state, value, weight):
    if value is None or weight is None:
        return state
    (s, w) = state
    s += value * weight
    w += weight
    return (s, w)

def retract(state, value, weight):
    if value is None or weight is None:
        return state
    (s, w) = state
    s -= value * weight
    w -= weight
    return (s, w)

def finish(state):
    (sum, weight) = state
    if weight == 0:
        return None
    else:
        return sum / weight
$$;
After creating an aggregate function, you can use it in SQL queries like any built-in aggregate function.
Use UDAF
-- call UDAF in a batch query
SELECT weighted_avg(value, weight) FROM (VALUES (1, 1), (NULL, 2), (3, 3)) AS t(value, weight);
-----RESULT
2.5

-- call UDAF in a materialized view
CREATE TABLE t(value int, weight int);
CREATE MATERIALIZED VIEW mv AS SELECT weighted_avg(value, weight) FROM t;
INSERT INTO t VALUES (1, 1), (NULL, 2), (3, 3);
FLUSH;
SELECT * FROM mv;
-----RESULT
2.5
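The 2.5 result can be checked by hand outside RisingWave. The following is a minimal sketch that runs the same weighted_avg logic as ordinary Python functions over the three sample rows; the loop standing in for the engine is an assumption made for illustration, as is the simulated deletion at the end.

```python
def create_state():
    # State is (weighted sum, total weight).
    return (0, 0)


def accumulate(state, value, weight):
    # Rows with a NULL value or weight leave the state unchanged.
    if value is None or weight is None:
        return state
    s, w = state
    return (s + value * weight, w + weight)


def retract(state, value, weight):
    # Inverse of accumulate for a previously added row.
    if value is None or weight is None:
        return state
    s, w = state
    return (s - value * weight, w - weight)


def finish(state):
    total, weight = state
    return None if weight == 0 else total / weight


# Same rows as the SQL example; this loop is a stand-in for the engine.
rows = [(1, 1), (None, 2), (3, 3)]
state = create_state()
for value, weight in rows:
    state = accumulate(state, value, weight)

print(state)          # (10, 4)
print(finish(state))  # 2.5

# Simulate deleting the row (3, 3) from the table (illustrative assumption).
state = retract(state, 3, 3)
print(finish(state))  # 1.0
```

The row (NULL, 2) contributes nothing, so the result is (1*1 + 3*3) / (1 + 3) = 10 / 4 = 2.5.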