Skip to content

RAG

Vector search power recommendation engines, chatbots, AI agentes and search engines.

Traditional keyword search works by matching exact words, when needing to search through images, audio, video, code, or unstructured text, keyword search is not effective.

Instead o relying on keywords, vector search uses embeddings to represent data as high-dimensional vectors, capturing semantic meaning and context. This allows for more accurate and relevant search results based on the actual content and meaning of the data.

1
2
3
4
5
docker run --rm --name qdrant\
     -p 6333:6333 \
     -p 6334:6334 \
     -v "${PWD}/qdrant_storage:/qdrant/storage:z" \
     qdrant/qdrant:v1.16.3

Go to http://localhost:6333/dashboard

Install packages

1
2
3
4
5
6
!uv pip install -q \
    requests==2.32.5 \
    python-dotenv==1.2.1 \
    litellm==1.78.5 \
    qdrant-client==1.16.2 \
    fastembed==0.7.4

Import packages

import json
import random
import uuid
from typing import Any, Dict, List

import litellm
import requests
from dotenv import load_dotenv
from fastembed import TextEmbedding
from qdrant_client import QdrantClient, models

load_dotenv()
True

Download documents

docs_url = "https://github.com/alexeygrigorev/llm-rag-workshop/raw/main/notebooks/documents.json"
docs_response = requests.get(docs_url)
documents_raw = docs_response.json()

documents = []

for course in documents_raw:
    course_name = course["course"]
    for doc in course["documents"]:
        doc["course"] = course_name
        documents.append(doc)

documents[0]
{'text': "The purpose of this document is to capture frequently asked technical questions\nThe exact day and hour of the course will be 15th Jan 2024 at 17h00. The course will start with the first  “Office Hours'' live.1\nSubscribe to course public Google Calendar (it works from Desktop only).\nRegister before the course starts using this link.\nJoin the course Telegram channel with announcements.\nDon’t forget to register in DataTalks.Club's Slack and join the channel.",
 'section': 'General course-related questions',
 'question': 'Course - When will the course start?',
 'course': 'data-engineering-zoomcamp'}

Create a Qdrant client instance

qd_client = QdrantClient("http://localhost:6333")

Verify models compatible with current setup

1
2
3
4
5
EMBEDDING_DIMENSIONALITY = 512

for model in TextEmbedding.list_supported_models():
    if model["dim"] == EMBEDDING_DIMENSIONALITY:
        print(json.dumps(model, indent=2))
{

  "model": "BAAI/bge-small-zh-v1.5",

  "sources": {

    "hf": "Qdrant/bge-small-zh-v1.5",

    "url": "https://storage.googleapis.com/qdrant-fastembed/fast-bge-small-zh-v1.5.tar.gz",

    "_deprecated_tar_struct": true

  },

  "model_file": "model_optimized.onnx",

  "description": "Text embeddings, Unimodal (text), Chinese, 512 input tokens truncation, Prefixes for queries/documents: not so necessary, 2023 year.",

  "license": "mit",

  "size_in_GB": 0.09,

  "additional_files": [],

  "dim": 512,

  "tasks": {}

}

{

  "model": "Qdrant/clip-ViT-B-32-text",

  "sources": {

    "hf": "Qdrant/clip-ViT-B-32-text",

    "url": null,

    "_deprecated_tar_struct": false

  },

  "model_file": "model.onnx",

  "description": "Text embeddings, Multimodal (text&image), English, 77 input tokens truncation, Prefixes for queries/documents: not necessary, 2021 year",

  "license": "mit",

  "size_in_GB": 0.25,

  "additional_files": [],

  "dim": 512,

  "tasks": {}

}

{

  "model": "jinaai/jina-embeddings-v2-small-en",

  "sources": {

    "hf": "xenova/jina-embeddings-v2-small-en",

    "url": null,

    "_deprecated_tar_struct": false

  },

  "model_file": "onnx/model.onnx",

  "description": "Text embeddings, Unimodal (text), English, 8192 input tokens truncation, Prefixes for queries/documents: not necessary, 2023 year.",

  "license": "apache-2.0",

  "size_in_GB": 0.12,

  "additional_files": [],

  "dim": 512,

  "tasks": {}

}

Choose model

model_handle = "jinaai/jina-embeddings-v2-small-en"

Define a collection

collection_name = "zoomcamp-rag"

if qd_client.collection_exists(collection_name=collection_name):
    qd_client.delete_collection(collection_name=collection_name)

qd_client.create_collection(
    collection_name=collection_name,
    vectors_config=models.VectorParams(
        size=EMBEDDING_DIMENSIONALITY, distance=models.Distance.COSINE
    ),
)
True

Create points

points = []
id = 0

for course in documents_raw:
    for doc in course["documents"]:
        point = models.PointStruct(
            id=id,
            vector=models.Document(text=doc["text"], model=model_handle),
            payload={
                "text": doc["text"],
                "section": doc["section"],
                "course": course["course"],
            },
        )
        points.append(point)

        id += 1

Generate embeddings and insert into Qdrant

qd_client.upsert(collection_name=collection_name, points=points)
UpdateResult(operation_id=1, status=<UpdateStatus.COMPLETED: 'completed'>)
1
2
3
4
5
6
7
8
9
def search(query, limit=1):
    results = qd_client.query_points(
        collection_name=collection_name,
        query=models.Document(text=query, model=model_handle),
        limit=limit,
        with_payload=True,
    )

    return results
1
2
3
course = random.choice(documents_raw)
course_piece = random.choice(course["documents"])
print(json.dumps(course_piece, indent=2))
{

  "text": "I have faced a problem while reading the large parquet file. I tried some workarounds but they were NOT successful with Jupyter.\nThe error message is:\nIndexError: index 311297 is out of bounds for axis 0 with size 131743\nI solved it by performing the homework directly as a python script.\nAdded by Ibraheem Taha (ibraheemtaha91@gmail.com)\nYou can try using the Pyspark library\nAnswered by kamaldeen (kamaldeen32@gmail.com)",

  "section": "Module 1: Introduction",

  "question": "Reading large parquet files",

  "course": "mlops-zoomcamp"

}
result = search(course_piece["question"])
result
QueryResponse(points=[ScoredPoint(id=237, version=1, score=0.86789715, payload={'text': 'The read_parquet function supports a list of files as an argument. The list of files will be merged into a single result table.', 'section': "error: Error while reading table: trips_data_all.external_fhv_tripdata, error message: Parquet column 'DOlocationID' has type INT64 which does not match the target cpp_type DOUBLE.", 'course': 'data-engineering-zoomcamp'}, vector=None, shard_key=None, order_value=None)])

Index fields that will be used as filters

1
2
3
4
5
qd_client.create_payload_index(
    collection_name=collection_name,
    field_name="course",
    field_schema="keyword",
)
UpdateResult(operation_id=3, status=<UpdateStatus.COMPLETED: 'completed'>)

Search with filters

def search_in_course(query, course, limit=1):
    results = qd_client.query_points(
        collection_name=collection_name,
        query=models.Document(text=query, model=model_handle),
        query_filter=models.Filter(
            must=[
                models.FieldCondition(
                    key="course", match=models.MatchValue(value=course)
                )
            ]
        ),
        limit=limit,
        with_payload=True,
    )

    return results

Apply filters in search

for course in [
    "data-engineering-zoomcamp",
    "machine-learning-zoomcamp",
    "mlops-zoomcamp",
]:
    print(
        course + "\n",
        search_in_course(
            "What if I submit homeworks late?",
            course=course,
        )
        .points[0]
        .payload["text"],
    )
data-engineering-zoomcamp

 No, late submissions are not allowed. But if the form is still not closed and it’s after the due date, you can still submit the homework. confirm your submission by the date-timestamp on the Course page.y

Older news:[source1] [source2]

machine-learning-zoomcamp

 Depends on whether the form will still be open. If you're lucky and it's open, you can submit your homework and it will be evaluated. if closed - it's too late.

(Added by Rileen Sinha, based on answer by Alexey on Slack)

mlops-zoomcamp

 Please choose the closest one to your answer. Also do not post your answer in the course slack channel.
collection_name = "zoomcamp-faq"

if qd_client.collection_exists(collection_name=collection_name):
    qd_client.delete_collection(collection_name=collection_name)

qd_client.create_collection(
    collection_name=collection_name,
    vectors_config=models.VectorParams(
        size=EMBEDDING_DIMENSIONALITY, distance=models.Distance.COSINE
    ),
)
True
1
2
3
4
5
qd_client.create_payload_index(
    collection_name=collection_name,
    field_name="course",
    field_schema="keyword",
)
UpdateResult(operation_id=2, status=<UpdateStatus.COMPLETED: 'completed'>)
1
2
3
4
5
6
7
points = []

for i, doc in enumerate(documents):
    text = doc["question"] + " " + doc["text"]
    vector = models.Document(text=text, model=model_handle)
    point = models.PointStruct(id=i, vector=vector, payload=doc)
    points.append(point)
qd_client.upsert(collection_name=collection_name, points=points)
UpdateResult(operation_id=3, status=<UpdateStatus.COMPLETED: 'completed'>)
def vector_search(question):
    course = "data-engineering-zoomcamp"
    query_points = qd_client.query_points(
        collection_name=collection_name,
        query=models.Document(text=question, model=model_handle),
        query_filter=models.Filter(
            must=[
                models.FieldCondition(
                    key="course", match=models.MatchValue(value=course)
                )
            ]
        ),
        limit=5,
        with_payload=True,
    )

    results = []

    for point in query_points.points:
        results.append(point.payload)

    return results
def build_prompt(query: str, search_results: List[Dict[str, Any]]):
    prompt_template = """
    You're a course teaching assistant. Answer the question based on the CONTEXT.
    Use only the facts from the CONTEXT when answering the QUESTION.

    QUESTION: {question}

    CONTEXT:
    {context}
    """
    context = ""

    for doc in search_results:
        context = (
            context
            + f"section: {doc['section']}\nquestion: {doc['question']}\nanswer: {doc['text']}\n\n"
        )

    prompt = prompt_template.format(question=query, context=context).strip()

    return prompt
def llm(prompt: str):
    messages = [
        {
            "role": "user",
            "content": prompt,
        },
    ]

    completion = litellm.completion(
        model="gemini/gemini-2.5-flash",
        messages=messages,
    )

    return completion.choices[0].message.content
1
2
3
4
5
def rag(query):
    search_results = vector_search(query)
    prompt = build_prompt(query, search_results)
    answer = llm(prompt)
    return answer
print(rag("how do I run kafka?"))
To run Kafka, you typically need to ensure the Kafka broker is running and then execute your client applications (producers, consumers).



1.  **Start the Kafka Broker (if using Docker):**

    If you encounter a `kafka.errors.NoBrokersAvailable` error, it likely means your Kafka broker Docker container isn't working. You can confirm this with `docker ps`. To start all instances, navigate to the docker compose yaml file folder and run:

    `docker compose up -d`



2.  **Run Java Kafka Applications (e.g., Producer/Consumer/KStreams):**

    In the project directory, run:

    `java -cp build/libs/<jar_name>-1.0-SNAPSHOT.jar:out src/main/java/org/example/JsonProducer.java`

    (Note: Replace `JsonProducer.java` with your specific application, like `JsonConsumer.java`, etc.)



3.  **Run Python Kafka Applications (e.g., producer.py):**

    First, make sure the Docker images (including the Kafka broker) are up and running. If you encounter a "Module `kafka` not found" error, you should create and activate a virtual environment and install the necessary packages.

    *   To create a virtual environment and install packages (run only once):

        `python -m venv env`

        `source env/bin/activate` (or `env/Scripts/activate` on Windows)

        `pip install -r ../requirements.txt`

    *   To activate it (you'll need to run it every time you need the virtual env):

        `source env/bin/activate` (or `env/Scripts/activate` on Windows)

    Then you can run your Python Kafka files within this environment.

Methods such as Bag of Words, TFIDF and BM25 are still widely used in search applications and sometimes preferred over dense embeddings.

Keyword-based search is also implemented as a vector search, but these vectors are usually sparse, meaning that most of their dimensions are zero. In contrast, dense embeddings have most of their dimensions filled with non-zero values. In sparese vectors, each word/phrase gets a unique position in vector space.

There are plenty of different options for creating sparse embeddings, but BM25 is one of the most popular ones. It's a statistical model, which makes it really fast and lightweight.

BM25 stands for Best Matching 25, and it's a ranking function that helps determine how relevant a document is to a query by combining Term Frequency (TF), Inverse Document Frequency (IDF), and document length normalization to prevent longer documents from being unfairly favored.

collection_name = "zoomcamp-sparse"

if qd_client.collection_exists(collection_name=collection_name):
    qd_client.delete_collection(collection_name=collection_name)

qd_client.create_collection(
    collection_name=collection_name,
    sparse_vectors_config={
        "bm25": models.SparseVectorParams(
            modifier=models.Modifier.IDF,
        )
    },
)
True
qd_client.upsert(
    collection_name=collection_name,
    points=[
        models.PointStruct(
            id=uuid.uuid4().hex,
            vector={
                "bm25": models.Document(text=doc["text"], model="Qdrant/bm25")
            },
            payload={
                "text": doc["text"],
                "section": doc["section"],
                "course": course["course"],
            },
        )
        for course in documents_raw
        for doc in course["documents"]
    ],
)
UpdateResult(operation_id=1, status=<UpdateStatus.COMPLETED: 'completed'>)
def search(query: str, limit: int = 1) -> List[models.ScoredPoint]:
    results = qd_client.query_points(
        collection_name=collection_name,
        query=models.Document(
            text=query,
            model="Qdrant/bm25",
        ),
        using="bm25",
        limit=limit,
        with_payload=True,
    )

    return results.points
results = search("Qdrant")
print(results)
[]

Sparse vectors can return no results, if none of the keywords in the query match the keywords in the documents. Dense embeddings, on the other hand, will always return results, even if they are not relevant.

results = search("How do I run kafka?")
print(results[0].payload["text"])
If you get an error while running the command python3 stream.py worker

Run pip uninstall kafka-python

Then run pip install kafka-python==1.4.6

What is the use of  Redpanda ?

Redpanda: Redpanda is built on top of the Raft consensus algorithm and is designed as a high-performance, low-latency alternative to Kafka. It uses a log-centric architecture similar to Kafka but with different underlying principles.

Redpanda is a powerful, yet simple, and cost-efficient streaming data platform that is compatible with Kafka® APIs while eliminating Kafka complexity.
results[0].score
10.442513
1
2
3
4
5
random.seed(202506)

course = random.choice(documents_raw)
course_piece = random.choice(course["documents"])
print(json.dumps(course_piece, indent=2))
{

  "text": "Even though the upload works using aws cli and boto3 in Jupyter notebook.\nSolution set the AWS_PROFILE environment variable (the default profile is called default)",

  "section": "Module 4: Deployment",

  "question": "Uploading to s3 fails with An error occurred (InvalidAccessKeyId) when calling the PutObject operation: The AWS Access Key Id you provided does not exist in our records.\"",

  "course": "mlops-zoomcamp"

}
results = search(course_piece["question"])
print(results[0].payload["text"])

Reranking

collection_name = "zoomcamp-sparse-and-dense"
if qd_client.collection_exists(collection_name=collection_name):
    qd_client.delete_collection(collection_name=collection_name)

qd_client.create_collection(
    collection_name=collection_name,
    vectors_config={
        "jina-small": models.VectorParams(
            size=EMBEDDING_DIMENSIONALITY,
            distance=models.Distance.COSINE,
        ),
    },
    sparse_vectors_config={
        "bm25": models.SparseVectorParams(
            modifier=models.Modifier.IDF,
        )
    },
)
qd_client.upsert(
    collection_name=collection_name,
    points=[
        models.PointStruct(
            id=uuid.uuid4().hex,
            vector={
                "jina-small": models.Document(
                    text=doc["text"],
                    model="jinaai/jina-embeddings-v2-small-en",
                ),
                "bm25": models.Document(
                    text=doc["text"],
                    model="Qdrant/bm25",
                ),
            },
            payload={
                "text": doc["text"],
                "section": doc["section"],
                "course": course["course"],
            },
        )
        for course in documents_raw
        for doc in course["documents"]
    ],
)
def multi_stage_search(query: str, limit: int = 1) -> List[models.ScoredPoint]:
    results = qd_client.query_points(
        collection_name=collection_name,
        prefetch=[
            models.Prefetch(
                query=models.Document(
                    text=query,
                    model="jinaai/jina-embeddings-v2-small-en",
                ),
                using="jina-small",
                limit=(10 * limit),
            )
        ],
        query=models.Document(
            text=query,
            model="Qdrant/bm25",
        ),
        using="bm25",
        limit=limit,
        with_payload=True,
    )
    return results.points
print(json.dumps(course_piece, indent=2))
results = multi_stage_search(course_piece["question"])
print(results[0].payload["text"])

Fusion

Fusion reranking method

def rrf_search(query: str, limit: int = 1) -> List[models.ScoredPoint]:
    results = qd_client.query_points(
        collection_name=collection_name,
        prefetch=[
            models.Prefetch(
                query=models.Document(
                    text=query,
                    model="jinaai/jina-embeddings-v2-small-en",
                ),
                using="jina-small",
                limit=(5 * limit),
            ),
            models.Prefetch(
                query=models.Document(
                    text=query,
                    model="Qdrant/bm25",
                ),
                using="bm25",
                limit=(5 * limit),
            ),
        ],
        query=models.FusionQuery(fusion=models.Fusion.RRF),
        with_payload=True,
    )
    return results.points

Results

1
2
3
results = rrf_search(course_piece["question"])
print(json.dumps(course_piece, indent=2))
print(results[0].payload["text"])
{

  "text": "Even though the upload works using aws cli and boto3 in Jupyter notebook.\nSolution set the AWS_PROFILE environment variable (the default profile is called default)",

  "section": "Module 4: Deployment",

  "question": "Uploading to s3 fails with An error occurred (InvalidAccessKeyId) when calling the PutObject operation: The AWS Access Key Id you provided does not exist in our records.\"",

  "course": "mlops-zoomcamp"

}

When executing an AWS CLI command (e.g., aws s3 ls), you can get the error <botocore.awsrequest.AWSRequest object at 0x7fbaf2666280>.

To fix it, simply set the AWS CLI environment variables:

export AWS_DEFAULT_REGION=eu-west-1

export AWS_ACCESS_KEY_ID=foobar

export AWS_SECRET_ACCESS_KEY=foobar

Their value is not important; anything would be ok.

Added by Giovanni Pecoraro