> ## Documentation Index
> Fetch the complete documentation index at: https://docs.risingwave.com/llms.txt
> Use this file to discover all available pages before exploring further.

# Use agents to analyze data ingested into RisingWave

> Build an AI agent that connects RisingWave with Anthropic's LLM to query streaming data using natural language.

## Overview

This demo shows you how to build an AI agent that connects RisingWave with Anthropic's LLM. The agent processes natural language queries and returns clean, readable results to users.

The agent includes several key components:

* Table extraction from SQL-like queries
* Dynamic table schema fetching
* MCP tool invocation
* Result parsing and formatting
* Chat loop management between RisingWave and Anthropic

In this tutorial, you'll learn how to:

* Implement a custom Anthropic agent
* Integrate it with RisingWave MCP
* Perform simple stream processing with RisingWave

For a complete project reference, clone the [awesome-stream-processing repository](https://github.com/risingwavelabs/awesome-stream-processing/).

## Prerequisites

Before you begin, you need:

* **RisingWave**: Install and run RisingWave. See the [Quick start guide](https://docs.risingwave.com/get-started/quickstart).
* **Anthropic API key**: Get your key from the [Anthropic console](https://console.anthropic.com/settings/keys).
* **PostgreSQL client**: Install `psql`. See [Download PostgreSQL](https://www.postgresql.org/download/).
* **RisingWave MCP**: Clone the [RisingWave MCP repository](https://github.com/risingwavelabs/risingwave-mcp.git).

## Step 1: Set up your environment

Create a `.env` file with your RisingWave connection configuration:

```env theme={null}
RISINGWAVE_HOST=0.0.0.0
RISINGWAVE_USER=root
RISINGWAVE_PASSWORD=root
RISINGWAVE_PORT=4566
RISINGWAVE_DATABASE=dev
RISINGWAVE_SSLMODE=disable
RISINGWAVE_TIMEOUT=30
```

## Step 2: Create the agent

### Import the required dependencies

Import these modules for your agent:

```python theme={null}
"""Client for interacting with RisingWave via MCP and Anthropic."""

import os
import sys
import asyncio
import json
import re
from fastmcp import Client
from anthropic import Anthropic
from dotenv import load_dotenv
```

### Build the main agent class

The `RisingWaveMCPAgent` class manages connections and caches. Adjust the environment variables if you're connecting to a remote RisingWave instance.

```python theme={null}
class RisingWaveMCPAgent:
    """Agent for interacting with RisingWave via MCP and Anthropic."""

    def __init__(self, server_script_path: str):
        """Initialize the agent and set up environment variables."""
        self.client = Client(server_script_path)
        env = {
            "RISINGWAVE_HOST": os.getenv("RISINGWAVE_HOST", "0.0.0.0"),
            "RISINGWAVE_USER": os.getenv("RISINGWAVE_USER", "root"),
            "RISINGWAVE_PASSWORD": os.getenv("RISINGWAVE_PASSWORD", "root"),
            "RISINGWAVE_PORT": os.getenv("RISINGWAVE_PORT", "4566"),
            "RISINGWAVE_DATABASE": os.getenv("RISINGWAVE_DATABASE", "dev"),
            "RISINGWAVE_SSLMODE": os.getenv("RISINGWAVE_SSLMODE", "disable")
        }
        self.client.transport.env = env
        self.anthropic = Anthropic()
        self.conversation = []
        self._tools_cache = None
        self.table_cache = set()
        self.mv_cache = set()
        self.table_descriptions = {}
        self.mv_descriptions = {}
        self._init_done = False

    async def initialize_caches(self):
        """Initialize and cache table and materialized view names and their descriptions."""
        if self._init_done:
            return
        # Get table names
        try:
            tables_result = await self.client.call_tool("show_tables", {})
            tables = json.loads(tables_result) if isinstance(tables_result, str) else tables_result
            if isinstance(tables, list):
                for t in tables:
                    name = t[0] if isinstance(t, list) else t.get("table_name") or t.get("name")
                    if name:
                        self.table_cache.add(name)
                        # Cache description
                        try:
                            desc = await self.client.call_tool(
                                "describe_table", {"table_name": name}
                            )
                            self.table_descriptions[name] = desc
                        except Exception:
                            pass
        except Exception:
            pass
        # Get materialized view names
        try:
            mvs_result = await self.client.call_tool("list_materialized_views", {})
            mvs = json.loads(mvs_result) if isinstance(mvs_result, str) else mvs_result
            if isinstance(mvs, list):
                for mv in mvs:
                    name = mv[0] if isinstance(mv, list) else mv.get("name") or mv.get("mv_name")
                    if name:
                        self.mv_cache.add(name)
                        # Cache description
                        try:
                            desc = await self.client.call_tool(
                                "describe_materialized_view", {"mv_name": name}
                            )
                            self.mv_descriptions[name] = desc
                        except Exception:
                            pass
        except Exception:
            pass
        self._init_done = True

    async def list_tools(self):
        """List available tools from the client."""
        if self._tools_cache is None:
            tools = await self.client.list_tools()
            self._tools_cache = [{
                "name": tool.name,
                "description": tool.description,
                "input_schema": tool.inputSchema
            } for tool in tools]
        return self._tools_cache

    async def __aenter__(self):
        """Async context manager entry."""
        await self.client.__aenter__()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Async context manager exit."""
        await self.client.__aexit__(exc_type, exc, tb)
```

### Add core functionality

Add these functions to handle tool usage and provide table context:

```python theme={null}
    async def call_tool(self, tool_name, args):
        """Call a tool by name with arguments."""
        return await self.client.call_tool(tool_name, args)

    async def fetch_schema_context(self, table_names):
        """Fetch schema context for a list of table names."""
        async def fetch(table):
            try:
                schema = await self.call_tool("describe_table", {"table_name": table})
                return f"Schema for table '{table}':\n{schema}"
            except Exception as exc:
                return f"Could not fetch schema for table '{table}': {exc}"
        return await asyncio.gather(*(fetch(table) for table in set(table_names)))

    async def handle_tool_use(self, content, final_text):
        """Handle tool use content from the LLM."""
        tool_name = content.name
        tool_args = content.input
        result = await self.call_tool(tool_name, tool_args)
        final_text.append(f"[Calling tool {tool_name} with args {tool_args}]")
        final_text.append(result)
        self.conversation.append({"role": "user", "content": result})

        response = self.anthropic.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1000,
            messages=self.conversation,
        )
        self.handle_llm_response(response, final_text)

    def handle_llm_response(self, response, final_text):
        """Handle the LLM response and update conversation."""
        if response.content and response.content[0].type == 'text':
            final_text.append(response.content[0].text)
            self.conversation.append({"role": "assistant", "content": response.content[0].text})
```

Add these final methods to handle query processing and manage the chat loop:

```python theme={null}
    async def process_query(self, query: str) -> str:
        """Process a user query and return the response."""
        await self.initialize_caches()
        self.conversation.append({"role": "user", "content": query})

        # Use cached table/mv names for extraction
        def extract_known_names(q):
            """Extract known table or materialized view names from the query."""
            words = set(re.findall(r"\b([a-zA-Z_][a-zA-Z0-9_]*)\b", q))
            found = [w for w in words if w in self.table_cache or w in self.mv_cache]
            return found
        table_names = extract_known_names(query)
        schema_contexts = []
        for t in table_names:
            if t in self.table_descriptions:
                schema_contexts.append(
                    f"Schema for table '{t}':\n{self.table_descriptions[t]}"
                )
            elif t in self.mv_descriptions:
                schema_contexts.append(
                    f"Schema for materialized view '{t}':\n{self.mv_descriptions[t]}"
                )

        messages = self.conversation.copy()
        for schema in schema_contexts:
            messages.append({"role": "user", "content": schema})

        tools = await self.list_tools()

        final_text = []
        while True:
            response = self.anthropic.messages.create(
                model="claude-3-5-sonnet-20241022",
                max_tokens=4096,
                messages=messages,
                tools=tools
            )

            tool_used = False
            for content in response.content:
                if content.type == 'text':
                    final_text.append(content.text)
                    self.conversation.append({"role": "assistant", "content": content.text})
                elif content.type == 'tool_use':
                    tool_used = True
                    await self.handle_tool_use(content, final_text)
                    messages = self.conversation.copy()
            if not tool_used:
                break

        return "\n".join(final_text)

    async def chat_loop(self):
        """Main chat loop for user interaction."""
        print("Rising Wave MCP Client Started!")
        print("Type your queries or 'quit' to exit.")
        while True:
            try:
                query = input("\nQuery: ").strip()
                if query.lower() == 'quit':
                    break
                response = await self.process_query(query)
                print("\n" + response)
            except Exception as exc:
                print(f"\nError: {str(exc)}")

```

### Add the main function

Add the entry point to start the program:

```python theme={null}
async def main():
    """Entry point for the client."""
    load_dotenv()
    if len(sys.argv) < 2:
        print("Usage: python client.py <path_to_server_script>")
        sys.exit(1)

    server_script_path = sys.argv[1]
    async with RisingWaveMCPAgent(server_script_path) as client:
        await client.chat_loop()


if __name__ == "__main__":
    asyncio.run(main())
```

## Step 3: Create test data

Create test data using the [load generator tutorial](https://docs.risingwave.com/ingestion/advanced/generate-test-data) or run this script:

```psql theme={null}
CREATE TABLE users (
  id int,
  risk int,
  cost int,
  time timestamp
)
WITH (
  connector = 'datagen',

  fields.id.length = '1',
  fields.id.kind = 'sequence',
  fields.id.start = '1000',

  fields.risk.length = '1',
  fields.risk.kind = 'random',
  fields.risk.min = '0',
  fields.risk.max = '10',
  fields.risk.seed = '5',

  fields.cost.length = '1',
  fields.cost.kind = 'random',
  fields.cost.min = '1',
  fields.cost.max = '10000',
  fields.cost.seed = '8',

  fields.time.kind = 'random',
  fields.time.max_past = '5h',
  fields.time.max_past_mode = 'relative',

  datagen.rows.per.second = '10'
)
FORMAT PLAIN ENCODE JSON;
```

## Step 4: Run and test the agent

### Start the agent

Run this command to start the agent:

```shell theme={null}
python client.py risingwave-mcp/src/main.py
```

### Test the agent

Try these example commands:

**Get database information:**

* Give me the database version

```bash theme={null}
I'll help you get the RisingWave database version information using the `get_database_version` function.
[Calling tool get_database_version with args {}]
The database is running PostgreSQL 13.14.0 with RisingWave version 2.2.0 (Homebrew build).
```

**Explore table schemas:**

* Show me the users table structure

```bash theme={null}
Columns:
1. id (integer)
2. risk (integer)
3. cost (integer)
4. time (timestamp without time zone)
5. _row_id (serial) - hidden, primary key
6. _rw_timestamp (timestamp with time zone) - hidden

Additional Information:
- Primary Key: _row_id
- Distribution Key: _row_id
- Table Name: users
```

**Create and query materialized views:**

* Track the highest risk scores

```bash theme={null}
I'll help you create a materialized view to track the highest risk scores from the users table. I'll create a view that orders by risk score in descending order.
[Calling tool create_materialized_view with args {'name': 'high_risk_users', 'sql_statement': 'SELECT id, risk, cost, time \nFROM users \nWHERE risk >= 7 \nORDER BY risk DESC'}]
Great! The materialized view 'high_risk_users' has been created. Let me show you the results from this view to see the users with the highest risk scores.
```

* Sort that mv by highest spending and display

```bash theme={null}
I'll query the materialized view 'high_risk_users' and sort it by cost in descending order (highest spending) using the run_select_query function.
[Calling tool run_select_query with args {'query': 'SELECT * FROM high_risk_users ORDER BY cost DESC;'}]
Here's an analysis of the high-risk users sorted by spending:

Top 5 highest spending high-risk users:
1. ID: 1398 - Risk: 8, Cost: $9,999
2. ID: 1021 - Risk: 10, Cost: $9,967
3. ID: 1277 - Risk: 8, Cost: $9,949
4. ID: 1642 - Risk: 8, Cost: $9,913
5. ID: 2048 - Risk: 10, Cost: $9,855

Key observations:
- There are 451 users classified as high-risk
- Spending ranges from $11 (lowest) to $9,999 (highest)
- Some users have maximum risk score (10) but varying spending levels
- The view includes users with risk scores 7 and above
```

Experiment with your own prompts and data! You can also use any of the [demos](https://docs.risingwave.com/demos/overview) to test the agent.

## Clean up

When you finish testing:

1. **Stop the agent**: Type `quit` in the agent interface.
2. **Clean up test data** (optional): Ask the agent to "Drop the users table".
3. **Stop RisingWave**: Stop the RisingWave service if you started it for this demo.
