Skip to content

MCP - LLM Integration

Create MCP server file

server.py
# /// script
# requires-python = ">=3.11"
# dependencies = [
#     "mcp==1.21.0",
#     "duckdb==1.4.1",
# ]
# ///

import argparse
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import Any

import duckdb  # type: ignore
from mcp.server.fastmcp import Context, FastMCP  # type: ignore

# Path of the on-disk DuckDB database file opened by app_lifespan.
DB_PATH = "database.duckdb"


@dataclass
class AppContext:
    """Lifespan state handed to every tool via ctx.request_context.lifespan_context."""

    # Open DuckDB connection created in app_lifespan; shared by all tools.
    db: duckdb.DuckDBPyConnection


@asynccontextmanager
async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
    """Open the DuckDB database for the server's lifetime; close it on shutdown."""
    connection = duckdb.connect(DB_PATH)
    try:
        yield AppContext(db=connection)
    finally:
        # Always release the database handle, even if the server crashes.
        connection.close()


# Server instance. host/port are presumably only used by the HTTP transports
# (stdio ignores them) — TODO confirm against the FastMCP docs.
mcp = FastMCP(
    name="DuckDB",
    host="0.0.0.0",  # listen on all interfaces
    port=8333,
    stateless_http=True,
    lifespan=app_lifespan,  # provides the shared DuckDB connection
)


@mcp.tool()
def list_tables(ctx: Context) -> list[dict[str, Any]]:
    """List all tables in the database"""
    # SHOW TABLES yields one row per table; the first column is the name.
    db = ctx.request_context.lifespan_context.db
    rows = db.execute("SHOW TABLES").fetchall()
    return [{"name": row[0]} for row in rows]


@mcp.tool()
def create_table(ctx: Context, table_name: str, columns: str) -> str:
    """Create a new table. columns should be SQL column definitions like 'id INTEGER, name VARCHAR, amount DECIMAL(10,2)'"""
    # table_name comes from the LLM and is interpolated into the SQL string;
    # identifiers cannot be bound as parameters, so restrict to plain
    # identifiers to prevent SQL injection.
    if not table_name.isidentifier():
        raise ValueError(f"Invalid table name: {table_name!r}")
    # NOTE(review): `columns` is a raw SQL fragment by this tool's interface
    # and remains injectable; only use with trusted callers.
    ctx.request_context.lifespan_context.db.execute(
        f"CREATE TABLE {table_name} ({columns})"
    )
    return f"Table {table_name} created"


@mcp.tool()
def insert_data(
    ctx: Context, table_name: str, columns: str, values: str
) -> str:
    """Insert data into table. columns like 'name, amount' and values like \"'Product A', 100.50\" """
    # table_name is interpolated into the SQL string; identifiers cannot be
    # bound as parameters, so restrict to plain identifiers.
    if not table_name.isidentifier():
        raise ValueError(f"Invalid table name: {table_name!r}")
    # NOTE(review): `columns` and `values` are raw SQL fragments by this
    # tool's interface and remain injectable; only use with trusted callers.
    ctx.request_context.lifespan_context.db.execute(
        f"INSERT INTO {table_name} ({columns}) VALUES ({values})"
    )
    return f"Data inserted into {table_name}"


@mcp.tool()
def query_data(ctx: Context, sql: str) -> list[dict[str, Any]]:
    """Execute SELECT query and return results"""
    # Pair each fetched row with the column names from the cursor description.
    connection = ctx.request_context.lifespan_context.db
    rows = connection.execute(sql).fetchall()
    names = [column[0] for column in connection.description]
    return [dict(zip(names, values)) for values in rows]


@mcp.tool()
def create_table_from_csv(ctx: Context, table_name: str, csv_url: str) -> str:
    """Create a table from a CSV URL. csv_url should be an HTTP(S) URL to a CSV file"""
    # table_name is interpolated into the SQL string; identifiers cannot be
    # bound as parameters, so restrict to plain identifiers.
    if not table_name.isidentifier():
        raise ValueError(f"Invalid table name: {table_name!r}")
    # Double embedded single quotes so the URL cannot break out of the SQL
    # string literal (SQL injection via csv_url).
    safe_url = csv_url.replace("'", "''")
    ctx.request_context.lifespan_context.db.execute(
        f"CREATE TABLE {table_name} AS SELECT * FROM read_csv('{safe_url}')"
    )
    return f"Table {table_name} created from {csv_url}"


@mcp.tool()
def drop_table(ctx: Context, table_name: str) -> str:
    """Drop a table from the database"""
    # table_name is interpolated into the SQL string; identifiers cannot be
    # bound as parameters, so restrict to plain identifiers.
    if not table_name.isidentifier():
        raise ValueError(f"Invalid table name: {table_name!r}")
    ctx.request_context.lifespan_context.db.execute(f"DROP TABLE {table_name}")
    return f"Table {table_name} dropped"


if __name__ == "__main__":
    # Parse the transport choice from the command line, then start the server.
    arg_parser = argparse.ArgumentParser(
        description="Run MCP server with optional transport"
    )
    arg_parser.add_argument(
        "--transport",
        choices=["stdio", "sse", "streamable-http"],
        default="stdio",
        help="Transport method to use (default: stdio)",
    )
    cli_args = arg_parser.parse_args()

    print(f"Running server with {cli_args.transport} transport")
    mcp.run(transport=cli_args.transport)

Install packages

!uv pip install -q \
    litellm==1.78.5 \
    python-dotenv==1.1.1 \
    pydantic==2.12.3 \
    mcp==1.21.0 \
    nest-asyncio==1.6.0

Import packages

import asyncio
import json
from contextlib import AsyncExitStack
from typing import Any, Dict, List, Optional

import litellm  # type: ignore
import nest_asyncio  # type: ignore
from dotenv import load_dotenv  # type: ignore
from mcp import ClientSession, StdioServerParameters  # type: ignore
from mcp.client.stdio import stdio_client  # type: ignore

nest_asyncio.apply()  # Needed to run interactive python (Jupyter Notebooks)

load_dotenv()
True

Define client

class MCPLiteLLMClient:
    """MCP client that lets a LiteLLM-driven model call MCP tools.

    Connects to a stdio MCP server, advertises the server's tools to the
    model in OpenAI function-calling format, and loops model <-> tool
    execution until the model produces a plain text answer or the
    iteration budget is exhausted.
    """

    def __init__(
        self, model: str = "gemini/gemini-2.0-flash", max_iterations: int = 3
    ):
        """Create a disconnected client.

        Args:
            model: LiteLLM model identifier used for completions.
            max_iterations: Maximum number of model calls per query.
        """
        # String annotations keep typing-only names (ClientSession) lazy, so
        # constructing the client does not evaluate them at runtime.
        self.session: "Optional[ClientSession]" = None
        self.exit_stack = AsyncExitStack()
        self.model = model
        # Conversation history; created lazily on the first query and kept
        # across queries so the model retains context.
        self.messages: "Optional[List[Dict[str, Any]]]" = None
        self.stdio: "Optional[Any]" = None
        self.write: "Optional[Any]" = None
        self.max_iterations = max_iterations

    async def connect(self, server_script_path: str = "server.py"):
        """Spawn the server script via `uv run` and initialize an MCP session."""
        server_params = StdioServerParameters(
            command="uv", args=["run", server_script_path]
        )

        stdio_transport = await self.exit_stack.enter_async_context(
            stdio_client(server_params)
        )
        self.stdio, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(
            ClientSession(self.stdio, self.write)
        )

        if self.session is None:
            raise Exception("No session")

        await self.session.initialize()

        # Print the tool catalog so the notebook user can see what is exposed.
        available_tools = await self.session.list_tools()
        print("Available tools:")
        for tool in available_tools.tools:
            print(f"  - {tool.name}: {tool.description}")

    async def disconnect(self):
        """Close the MCP session and the spawned server process."""
        await self.exit_stack.aclose()

    async def get_available_tools(self) -> List[Dict[str, Any]]:
        """Return the server's tools in OpenAI function-calling format.

        Raises:
            Exception: If connect() has not been called.
        """
        if self.session is None:
            raise Exception("No session")

        tools_result = await self.session.list_tools()
        return [
            {
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": tool.inputSchema,
                },
            }
            for tool in tools_result.tools
        ]

    async def process_query(self, query: str):
        """Send a user query to the model, executing any tool calls it requests.

        Returns the model's final text answer, or a fallback string when
        the iteration budget is exhausted.

        Raises:
            Exception: If connect() has not been called.
        """
        if self.session is None:
            raise Exception("No session")

        tools = await self.get_available_tools()

        if not self.messages:
            self.messages = [{"role": "user", "content": query}]
        else:
            self.messages.append({"role": "user", "content": query})

        # Run at most max_iterations model calls. (The original while-loop
        # checked `current_iteration <= max_iterations` before incrementing,
        # which allowed max_iterations + 1 calls — an off-by-one.)
        for _ in range(self.max_iterations):
            # NOTE(review): litellm.completion is a blocking call inside an
            # async method; acceptable for a demo, but it stalls the loop.
            response = litellm.completion(
                model=self.model,
                messages=self.messages,
                tools=tools,
            )

            choice = response.choices[0].message
            tool_calls = getattr(choice, "tool_calls", None)

            if not tool_calls:
                # Plain text answer: record it and stop.
                self.messages.append(
                    {
                        "role": "assistant",
                        "content": choice.content,
                    }
                )
                return choice.content

            # Record the assistant turn including its tool-call requests, so
            # the follow-up "tool" messages are valid in the transcript.
            self.messages.append(
                {
                    "role": "assistant",
                    "content": choice.content,
                    "tool_calls": [
                        {
                            "id": tc.id,
                            "type": "function",
                            "function": {
                                "name": tc.function.name,
                                "arguments": tc.function.arguments,
                            },
                        }
                        for tc in tool_calls
                    ],
                }
            )

            # Execute each requested tool and append its result as a tool turn.
            for tool_call in tool_calls:
                name = tool_call.function.name
                kwargs = json.loads(tool_call.function.arguments)
                result = await self.session.call_tool(name, arguments=kwargs)

                # MCP results carry a list of content items; serialize the
                # text-bearing ones, falling back to str() for anything else.
                if hasattr(result, "content"):
                    content_str = json.dumps(
                        [
                            (
                                {"type": item.type, "text": item.text}
                                if hasattr(item, "text")
                                else str(item)
                            )
                            for item in result.content
                        ]
                    )
                else:
                    content_str = str(result)

                self.messages.append(
                    {
                        "role": "tool",
                        "tool_call_id": tool_call.id,
                        "content": content_str,
                    }
                )
        return "Reach max iterations"

Run client

async def main():
    """Connect to the MCP server and run the three-step iris demo."""
    client = MCPLiteLLMClient()
    await client.connect()

    # Same three queries in the same order; each answer is printed as it
    # comes back.
    demo_queries = [
        "Drop table iris if exists",
        "Create a table named iris from the csv https://gist.githubusercontent.com/curran/a08a1080b88344b0c8a7/raw/0e7a9b0a5d22642a06d3d5b9bcbad9890c8ee534/iris.csv",
        "Show me first 2 items of iris table in a markdown table format",
    ]
    for demo_query in demo_queries:
        print(await client.process_query(demo_query))


# Entry point: drive the demo conversation against the local MCP server.
if __name__ == "__main__":
    asyncio.run(main())
Available tools:

  - list_tables: List all tables in the database

  - create_table: Create a new table. columns should be SQL column definitions like 'id INTEGER, name VARCHAR, amount DECIMAL(10,2)'

  - insert_data: Insert data into table. columns like 'name, amount' and values like "'Product A', 100.50" 

  - query_data: Execute SELECT query and return results

  - create_table_from_csv: Create a table from a CSV URL. csv_url should be an HTTP(S) URL to a CSV file

  - drop_table: Drop a table from the database

OK. I have dropped the table named iris.



I have created the table iris from the given CSV URL.



| sepal_length | sepal_width | petal_length | petal_width | species   |

|--------------|-------------|--------------|-------------|-----------|

| 5.1          | 3.5         | 1.4          | 0.2         | setosa    |

| 4.9          | 3.0         | 1.4          | 0.2         | setosa    |