only as strong as their knowledge base. An accurate, curated knowledge base improves both model speed and accuracy, two areas where current models often fall short. In fact, a recent study shows that major AI chatbots get almost every second query wrong.
In this article, I’ll cover how you can build a reliable knowledge base with detailed steps and mistakes to avoid.
6 steps to build an effective knowledge base
Taking a systematic approach to building a knowledge base helps you create one that is standardized, scalable, and self-explanatory. Any new developer can easily add or update the knowledge base over time to keep it up to date and reliable.
To ensure you get there, you can follow these six steps whenever you start creating a knowledge base:
1. Collect data
A common misconception when collecting data for a knowledge base is that more is better. That assumption leads straight into the classic “garbage in, garbage out” trap.
Prioritize value over volume and collect all data that is relevant for your model. It could be in the form of:
- Factual and tutorial content covering facts and procedures
- Problem-solving content in the form of instructional text or videos
- Historical data showing past issues or execution logs
- Real-time data covering live system status or recent news feeds
- Domain data for the model to get more context
It’s important to understand that your system doesn’t need every piece of information. For example, if you are building a customer support chatbot, your model may only need factual and tutorial content explaining company policies and procedures. Limiting scope this way ensures your model doesn’t invent invalid or out-of-scope responses and sticks to what it is given.
Tip: There is a growing trend of feeding AI-generated data into the knowledge bases of new AI models. I find this practice a double-edged sword. It offers speed, but you must check the output for reliability and fluff. Always optimize the content for crisp responses and verify the output before adding it to the knowledge base.
2. Clean and segment data into chunks
Once you have the raw data ready, clean it. The cleaning process typically includes:
- Removing duplicate and outdated content
- Deleting irrelevant details such as headers, footers, and page numbers
- Standardizing content in both format and terminology (e.g., using consistent naming across documents)
This cleaned data is then divided into logical chunks, where each chunk contains one clear idea or topic.
Every chunk is also assigned metadata that provides quick context about the content in it. This metadata helps AI models to browse through knowledge bases faster and quickly reach chunks that have relevant details.
You can also set role-based access on chunks to control which roles can see the information in them. While many roles may have access to the model, not everyone should be able to access all the data. Chunking is where you set security and access control within the system.
Tip: A best practice I always follow is to chunk data based on user queries instead of document structure. For example, if you have a document on login and access management, you can chunk it around common user questions like ‘How do I change my password?’ or ‘What is the password policy?’. You can then validate these chunks by testing against real queries; a safe starting set is 10-12 questions.
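To make this concrete, here is a minimal sketch of query-oriented chunking with metadata attached to each chunk. The Chunk class, field names, and role list are hypothetical placeholders rather than a fixed schema, and a real implementation would extract the answer span for each question instead of attaching the full text:
from dataclasses import dataclass

@dataclass
class Chunk:
    chunk_id: str
    text: str
    metadata: dict

def chunk_by_questions(doc_text: str, questions: list[str], source: str) -> list[Chunk]:
    """Create one chunk per anticipated user question.
    Here the full document text is attached as a placeholder;
    in practice, extract only the span that answers each question.
    """
    chunks = []
    for i, question in enumerate(questions):
        chunks.append(Chunk(
            chunk_id=f"{source}-{i}",
            text=doc_text,  # replace with the answer span for this question
            metadata={
                "source": source,
                "question": question,
                "allowed_roles": ["support_agent"],  # role-based access control
            },
        ))
    return chunks

# Example: chunk a login-and-access document around common questions
questions = ["How do I change my password?", "What is the password policy?"]
chunks = chunk_by_questions("<document text>", questions, source="login-guide")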
3. Organize and index data
The text chunks are converted into numeric representations called vectors using an embedding model like OpenAI’s text-embedding-3-large, BGE-M3, etc.
AI models can search through vectors far faster than through a huge block of text. After vectorization, the metadata attached to the chunk is attached to the vector as well. The final record will look like this:
[ Vector (numbers) ] + [ Original text ] + [ Metadata ]
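As a sketch of this step (assuming the OpenAI Python client and the text-embedding-3-large model; any embedding model works the same way), converting chunks into vectors looks like this:
from openai import OpenAI

client = OpenAI()  # assumes OPENAI_API_KEY is set in the environment

def embed_chunks(texts: list[str]) -> list[list[float]]:
    """Convert text chunks into embedding vectors in one batched call."""
    response = client.embeddings.create(
        model="text-embedding-3-large",
        input=texts,
    )
    return [item.embedding for item in response.data]

vectors = embed_chunks(["How do I change my password? First, open Settings..."])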
4. Choose a platform to store data
You can store this vector output in a vector database such as Pinecone, Milvus, or Weaviate for retrieval. You can upload the vector data with a simple Python script.
from typing import Any

import numpy as np
# Vector Normalization + Metadata
def normalize_l2(vector: list[float]) -> list[float]:
"""
    Return an L2-normalized copy of `vector`.
Many vector stores use dot-product similarity. If you normalize vectors to
unit length, dot-product becomes equivalent to cosine similarity.
"""
arr = np.array(vector, dtype=np.float32)
norm = np.linalg.norm(arr)
if norm == 0:
return vector
return (arr / norm).tolist()
def prepare_record(
doc_id: str,
embedding: list[float],
text: str,
source: str,
extra_metadata: dict[str, Any] | None = None,
) -> dict:
"""
Prepare a single record for vector DB upsert.
Metadata serves two purposes:
- Filtering: narrow down search to a subset
"""
metadata = {
"source": source,
"text_preview": text[:500],
"char_count": len(text),
}
if extra_metadata:
metadata.update(extra_metadata)
return {
"id": doc_id,
"values": normalize_l2(embedding),
"metadata": metadata,
}
# Vector Quantization
# Scalar Quantization / SQ
def scalar_quantization(input_vec: list[float]) -> dict:
    """
    Demonstrates compressing a float32 vector to uint8 (scalar quantization).
    """
    input_arr = np.array(input_vec, dtype=np.float32)
    vec_min, vec_max = input_arr.min(), input_arr.max()
    value_range = vec_max - vec_min
    if value_range == 0:
        quantized = np.zeros_like(input_arr, dtype=np.uint8)
    else:
        quantized = ((input_arr - vec_min) / value_range * 255).astype(np.uint8)
    return {
        "quantized": quantized.tolist(),
        "min": float(vec_min),
        "max": float(vec_max),
    }
def scalar_dequantization(record: dict) -> list[float]:
    """
    Reconstruct an approximate float32 vector from the uint8 codes.
    """
    arr = np.array(record["quantized"], dtype=np.float32)
    return (arr / 255 * (record["max"] - record["min"]) + record["min"]).tolist()
# Product Quantization / PQ
def train_product_quantizer(
    vectors: np.ndarray,
    num_subvectors: int = 8,
    num_centroids: int = 256,
    max_iterations: int = 20,
) -> list[np.ndarray]:
    """
    Split each vector into subvectors and cluster each subspace
    independently to build the per-subspace codebooks.
    """
from sklearn.cluster import KMeans
dim = vectors.shape[1]
assert dim % num_subvectors == 0, "dim must be divisible by num_subvectors"
sub_dim = dim // num_subvectors
codebooks = []
for i in range(num_subvectors):
sub_vectors = vectors[:, i * sub_dim : (i + 1) * sub_dim]
kmeans = KMeans(n_clusters=num_centroids, max_iter=max_iterations, n_init=1)
kmeans.fit(sub_vectors)
codebooks.append(kmeans.cluster_centers_)
return codebooks
def pq_encode(vector: np.ndarray, codebooks: list[np.ndarray]) -> list[int]:
"""
Encode a single vector into PQ codes (one uint8 per subvector)
"""
num_subvectors = len(codebooks)
sub_dim = len(vector) // num_subvectors
codes = []
for i, codebook in enumerate(codebooks):
sub_vec = vector[i * sub_dim : (i + 1) * sub_dim]
distances = np.linalg.norm(codebook - sub_vec, axis=1)
codes.append(int(np.argmin(distances)))
return codes
def pq_decode(codes: list[int], codebooks: list[np.ndarray]) -> np.ndarray:
"""
Reconstruct approximate vector from PQ codes
"""
return np.concatenate(
[codebook[code] for code, codebook in zip(codes, codebooks)]
)
Tip: To increase upload speed, I suggest using the batch insert option. You can also normalize the vectors (scale them all to unit length) during the upload phase. After normalization, quantize (compress) them to optimize storage. This additional normalization and quantization step speeds up retrieval later.
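As an illustration of that batch insert, here is a hedged sketch using the Pinecone client and the prepare_record output from above (the API key and index name are placeholders; Milvus and Weaviate offer equivalent bulk APIs):
from pinecone import Pinecone

pc = Pinecone(api_key="YOUR_API_KEY")  # placeholder key
index = pc.Index("my-knowledge-base")  # hypothetical index name

def batch_upsert(records: list[dict], batch_size: int = 100) -> None:
    """Upsert prepared records (id + values + metadata) in batches."""
    for start in range(0, len(records), batch_size):
        index.upsert(vectors=records[start : start + batch_size])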
5. Optimize retrieval
To enable retrieval from the vector database, you can use orchestration frameworks such as LlamaIndex and LangChain.
LlamaIndex searches the vector database efficiently and gets to the exact chunk that contains content related to the user query.
LangChain then takes the data from that chunk and transforms it to fit the user query, for example, summarizing the text or drafting an email from it.
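A minimal LlamaIndex sketch looks like the following (assuming the llama-index package with an OpenAI API key configured; the data directory path is a placeholder):
from llama_index.core import SimpleDirectoryReader, VectorStoreIndex

# Load and index the knowledge base documents
documents = SimpleDirectoryReader("./knowledge_base").load_data()  # placeholder path
index = VectorStoreIndex.from_documents(documents)

# Ask a question against the indexed chunks
query_engine = index.as_query_engine()
response = query_engine.query("What is the password policy?")
print(response)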
"""
Hybrid Retrieval: combine keyword search and vector similarity.
Where each approach shines:
- Keywords: find exact matches, but miss queries phrased with synonyms
- Embeddings: capture meaning, but can miss the exact keyword
Hybrid combines both to get the best of each.
"""
import math
from collections import defaultdict
from dataclasses import dataclass
import numpy as np
@dataclass
class Document:
id: str
text: str
embedding: list[float]
class BestMatching25Index:
def __init__(self, k1: float = 1.5, b: float = 0.75):
        # k1 caps how much repeated terms boost the score (term-frequency saturation)
        # b controls how strongly document length normalizes the score
self.k1 = k1
self.b = b
self.doc_lengths: dict[str, int] = {}
self.avg_doc_length: float = 0
self.doc_freqs: dict[str, int] = {}
self.term_freqs: dict[str, dict[str, int]] = {}
self.corpus_size: int = 0
def _tokenize(self, text: str) -> list[str]:
return text.lower().split()
def index(self, documents: list[Document]) -> None:
self.corpus_size = len(documents)
for doc in documents:
tokens = self._tokenize(doc.text)
self.doc_lengths[doc.id] = len(tokens)
self.term_freqs[doc.id] = {}
seen_terms: set[str] = set()
for token in tokens:
self.term_freqs[doc.id][token] = self.term_freqs[doc.id].get(token, 0) + 1
if token not in seen_terms:
self.doc_freqs[token] = self.doc_freqs.get(token, 0) + 1
seen_terms.add(token)
self.avg_doc_length = sum(self.doc_lengths.values()) / self.corpus_size
def score(self, query: str, doc_id: str) -> float:
query_terms = self._tokenize(query)
doc_len = self.doc_lengths[doc_id]
score = 0.0
for term in query_terms:
if term not in self.doc_freqs or term not in self.term_freqs.get(doc_id, {}):
continue
tf = self.term_freqs[doc_id][term]
df = self.doc_freqs[term]
idf = math.log((self.corpus_size - df + 0.5) / (df + 0.5) + 1)
tf_norm = (tf * (self.k1 + 1)) / (
tf + self.k1 * (1 - self.b + self.b * doc_len / self.avg_doc_length)
)
score += idf * tf_norm
return score
def search(self, query: str, top_k: int = 10) -> list[tuple[str, float]]:
scores = [
(doc_id, self.score(query, doc_id))
for doc_id in self.doc_lengths
]
scores.sort(key=lambda x: x[1], reverse=True)
return scores[:top_k]
class VectorIndex:
"""This class implements the smart search using the hybrid search.
The index function normalize and stores the document
search implements a cosine similarity search
hybrid_search_weighted merges BM25 index and vector index using weighted average
Reciprocal_rank_fusion Combines the results in an efficient way
"""
def __init__(self):
self.documents: dict[str, np.ndarray] = {}
def index(self, documents: list[Document]) -> None:
for doc in documents:
arr = np.array(doc.embedding, dtype=np.float32)
norm = np.linalg.norm(arr)
self.documents[doc.id] = arr / norm if norm > 0 else arr
def search(self, query_embedding: list[float], top_k: int = 10) -> list[tuple[str, float]]:
q = np.array(query_embedding, dtype=np.float32)
        norm = np.linalg.norm(q)
        q = q / norm if norm > 0 else q
scores = [
(doc_id, float(np.dot(q, emb)))
for doc_id, emb in self.documents.items()
]
scores.sort(key=lambda x: x[1], reverse=True)
return scores[:top_k]
def hybrid_search_weighted(
query: str,
query_embedding: list[float],
bm25_index: BestMatching25Index,
vector_index: VectorIndex,
alpha: float = 0.5,
top_k: int = 10,
) -> list[dict]:
"""Combine keyword and vector scores with a tunable weight.
alpha = 1.0 → pure vector search
alpha = 0.0 → pure keyword search
alpha = 0.5 → equal weight (good starting point)
"""
keyword_results = bm25_index.search(query, top_k=top_k * 2)
vector_results = vector_index.search(query_embedding, top_k=top_k * 2)
# Normalize (min-max) each score list to [0, 1]
def normalize_scores(results: list[tuple[str, float]]) -> dict[str, float]:
if not results:
return {}
scores = [s for _, s in results]
min_s, max_s = min(scores), max(scores)
rng = max_s - min_s
if rng == 0:
return {doc_id: 1.0 for doc_id, _ in results}
return {doc_id: (s - min_s) / rng for doc_id, s in results}
keyword_scores = normalize_scores(keyword_results)
vector_scores = normalize_scores(vector_results)
# Merge
all_doc_ids = set(keyword_scores) | set(vector_scores)
combined = []
for doc_id in all_doc_ids:
ks = keyword_scores.get(doc_id, 0.0)
vs = vector_scores.get(doc_id, 0.0)
combined.append({
"id": doc_id,
"score": alpha * vs + (1 - alpha) * ks,
"keyword_score": ks,
"vector_score": vs,
})
combined.sort(key=lambda x: x["score"], reverse=True)
return combined[:top_k]
def reciprocal_rank_fusion(
*ranked_lists: list[tuple[str, float]],
k: int = 60,
top_n: int = 10,
) -> list[dict]:
"""
    Merge multiple ranked lists using RRF (Reciprocal Rank Fusion).
    RRF score = sum over all lists of: 1 / (k + rank)
    Why RRF over a weighted combination?
- No score normalization needed (works on ranks, not raw scores)
- No alpha tuning needed
- Robust across different score distributions
- Used by Elasticsearch, Pinecone, Weaviate under the hood
"""
rrf_scores: dict[str, float] = defaultdict(float)
doc_details: dict[str, dict] = {}
for list_idx, ranked_list in enumerate(ranked_lists):
for rank, (doc_id, raw_score) in enumerate(ranked_list, start=1):
rrf_scores[doc_id] += 1.0 / (k + rank)
if doc_id not in doc_details:
doc_details[doc_id] = {}
doc_details[doc_id][f"list_{list_idx}_rank"] = rank
doc_details[doc_id][f"list_{list_idx}_score"] = raw_score
results = []
for doc_id, rrf_score in rrf_scores.items():
results.append({
"id": doc_id,
"rrf_score": round(rrf_score, 6),
**doc_details[doc_id],
})
results.sort(key=lambda x: x["rrf_score"], reverse=True)
return results[:top_n]
def hybrid_search_rrf(
query: str,
query_embedding: list[float],
bm25_index: BestMatching25Index,
vector_index: VectorIndex,
top_k: int = 10,
) -> list[dict]:
keyword_results = bm25_index.search(query, top_k=top_k * 2)
vector_results = vector_index.search(query_embedding, top_k=top_k * 2)
return reciprocal_rank_fusion(keyword_results, vector_results, top_n=top_k)
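Here is a small usage example of the classes above, with toy documents and fake 3-dimensional embeddings (a real pipeline would use vectors from your embedding model):
docs = [
    Document(id="d1", text="How to reset your password", embedding=[0.9, 0.1, 0.0]),
    Document(id="d2", text="Password policy and expiry rules", embedding=[0.8, 0.2, 0.1]),
    Document(id="d3", text="How to contact support", embedding=[0.0, 0.9, 0.4]),
]

bm25_index = BestMatching25Index()
bm25_index.index(docs)
vector_index = VectorIndex()
vector_index.index(docs)

query_embedding = [0.85, 0.15, 0.05]  # pretend this came from the embedding model
print(hybrid_search_weighted("password policy", query_embedding, bm25_index, vector_index, top_k=2))
print(hybrid_search_rrf("password policy", query_embedding, bm25_index, vector_index, top_k=2))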
Tip: I recommend hybrid retrieval that combines keywords and embeddings. Keyword retrieval is great for exact terms (“password policy”), while embeddings are better for conceptual, meaning-based matches. LlamaIndex is excellent at hybrid retrieval: it can search for exact terms as well as for the context around the question.
6. Establish automatic update and refresh routine
The final step is keeping the knowledge base up to date at all times. For this, you can implement selective forgetting: overwriting or deleting outdated and redundant data to keep the model accurate.
How do you find which data to delete? Evaluation and observability platforms can assist. You can schedule test queries in the DeepEval framework that regularly check whether your AI model is accurate. If the answers are incorrect, the TruLens platform helps you trace them back to the exact chunk the answer was retrieved from.
"""
Knowledge Base Quality Monitoring
Monitor knowledge base health with automated checks:
1. Retrieval quality — is it finding the right documents?
2. Freshness detection — Are documents stale or embeddings drifting?
3. Unified pipeline — Scheduled monitoring with alerts
"""
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Callable

import numpy as np
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("kb_monitor")
def setup_deepeval_metrics():
"""Define retrieval quality metrics using DeepEval.
DeepEval provides LLM-evaluated metrics — it uses a judge LLM to score
whether retrieved context actually helps answer the question.
"""
from deepeval.metrics import (
AnswerRelevancyMetric,
FaithfulnessMetric,
ContextualPrecisionMetric,
ContextualRecallMetric,
)
from deepeval.test_case import LLMTestCase
metrics = {
# Does the answer address the question?
"relevancy": AnswerRelevancyMetric(threshold=0.7),
# Is the answer grounded in the retrieved context (no hallucination)?
"faithfulness": FaithfulnessMetric(threshold=0.7),
# Are the top-ranked retrieved docs actually relevant?
"context_precision": ContextualPrecisionMetric(threshold=0.7),
# Did we retrieve all the docs needed to answer?
"context_recall": ContextualRecallMetric(threshold=0.7),
}
return metrics, LLMTestCase
def evaluate_retrieval_quality(
rag_pipeline: Callable,
test_cases: list[dict],
) -> list[dict]:
"""Run a set of test queries through your RAG pipeline and score them.
Each test case should have:
- query: the user question
- expected_answer: ground truth answer (for recall/relevancy)
"""
    from deepeval.test_case import LLMTestCase
    from deepeval.metrics import (
        AnswerRelevancyMetric,
        FaithfulnessMetric,
        ContextualPrecisionMetric,
        ContextualRecallMetric,
    )
results = []
for tc in test_cases:
# Run your actual RAG pipeline
response = rag_pipeline(tc["query"])
test_case = LLMTestCase(
input=tc["query"],
actual_output=response["answer"],
expected_output=tc["expected_answer"],
retrieval_context=response["retrieved_contexts"],
)
metrics = [
AnswerRelevancyMetric(threshold=0.7),
FaithfulnessMetric(threshold=0.7),
ContextualPrecisionMetric(threshold=0.7),
ContextualRecallMetric(threshold=0.7),
]
for metric in metrics:
metric.measure(test_case)
results.append({
"query": tc["query"],
"scores": {m.__class__.__name__: m.score for m in metrics},
"passed": all(m.is_successful() for m in metrics),
})
return results
def setup_trulens_monitoring(rag_pipeline: Callable, app_name: str = "my_kb"):
"""Wrap your RAG pipeline with TruLens for continuous feedback logging.
TruLens records every query + response + retrieved context, then
runs feedback functions asynchronously to score each interaction.
"""
from trulens.core import TruSession, Feedback, Select
from trulens.providers.openai import OpenAI as TruLensOpenAI
from trulens.apps.custom import TruCustomApp, instrument
session = TruSession()
# Feedback provider (uses an LLM to judge quality)
provider = TruLensOpenAI()
feedbacks = [
# Is the response relevant to the query?
Feedback(provider.relevance)
.on_input()
.on_output(),
# Is the response grounded in retrieved context?
Feedback(provider.groundedness_measure_with_cot_reasons)
.on(Select.RecordCalls.retrieve.rets)
.on_output(),
# Is the retrieved context relevant to the query?
Feedback(provider.context_relevance)
.on_input()
.on(Select.RecordCalls.retrieve.rets),
]
# Wrap your pipeline — every call is now logged and scored
@instrument
class InstrumentedRAG:
def __init__(self, pipeline):
self._pipeline = pipeline
@instrument
def retrieve(self, query: str) -> list[str]:
result = self._pipeline(query)
return result["retrieved_contexts"]
@instrument
def query(self, query: str) -> str:
result = self._pipeline(query)
return result["answer"]
instrumented = InstrumentedRAG(rag_pipeline)
tru_app = TruCustomApp(
instrumented,
app_name=app_name,
feedbacks=feedbacks,
)
return tru_app, session
def get_trulens_dashboard_url(session) -> str:
"""Launch the TruLens dashboard to visualize quality over time."""
session.run_dashboard(port=8501)
return "http://localhost:8501"
@dataclass
class DocumentFreshness:
doc_id: str
last_updated: datetime
last_embedded: datetime
source_hash: str # hash of source content at embedding time
class FreshnessMonitor:
"""Detect stale documents and embedding drift."""
def __init__(self, staleness_threshold_days: int = 30):
self.threshold = timedelta(days=staleness_threshold_days)
self.freshness_records: dict[str, DocumentFreshness] = {}
def register(self, doc_id: str, source_hash: str) -> None:
now = datetime.utcnow()
self.freshness_records[doc_id] = DocumentFreshness(
doc_id=doc_id,
last_updated=now,
last_embedded=now,
source_hash=source_hash,
)
def check_staleness(self) -> dict:
"""Find documents that haven't been re-embedded recently."""
now = datetime.utcnow()
stale, fresh = [], []
for doc_id, record in self.freshness_records.items():
age = now - record.last_embedded
if age > self.threshold:
stale.append({"id": doc_id, "days_stale": age.days})
else:
fresh.append(doc_id)
return {
"total": len(self.freshness_records),
"fresh": len(fresh),
"stale": len(stale),
"stale_documents": stale,
}
def check_content_drift(
self, doc_id: str, current_source_hash: str
) -> bool:
"""Check if source content changed since last embedding."""
record = self.freshness_records.get(doc_id)
if not record:
return True # unknown doc, treat as drifted
return record.source_hash != current_source_hash
def detect_embedding_drift(
old_embeddings: dict[str, list[float]],
new_embeddings: dict[str, list[float]],
drift_threshold: float = 0.1,
) -> dict:
"""Compare old vs new embeddings for the same documents.
If your embedding model gets updated (or you switch models),
existing vectors may no longer be compatible. This detects that.
"""
drifted = []
common_ids = set(old_embeddings) & set(new_embeddings)
for doc_id in common_ids:
old = np.array(old_embeddings[doc_id])
new = np.array(new_embeddings[doc_id])
# cosine distance: 0 = identical, 2 = opposite
cos_sim = np.dot(old, new) / (np.linalg.norm(old) * np.linalg.norm(new))
cos_dist = 1 - cos_sim
if cos_dist > drift_threshold:
drifted.append({
"id": doc_id,
"cosine_distance": round(float(cos_dist), 4),
})
return {
"documents_compared": len(common_ids),
"drifted": len(drifted),
"drift_threshold": drift_threshold,
"drifted_documents": sorted(drifted, key=lambda x: x["cosine_distance"], reverse=True),
}
Using DeepEval in combination with TruLens automates the periodic testing of your knowledge base.
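Once the monitors above flag stale or drifted chunks, selective forgetting becomes a simple delete-and-re-embed loop. Here is a sketch against Pinecone, reusing the FreshnessMonitor from above (the API key and index name are placeholders; other vector databases expose equivalent delete APIs):
from pinecone import Pinecone

pc = Pinecone(api_key="YOUR_API_KEY")  # placeholder key
index = pc.Index("my-knowledge-base")  # hypothetical index name

def forget_stale_documents(freshness_monitor: FreshnessMonitor) -> list[str]:
    """Delete vectors whose source documents have gone stale.
    Fresh versions should be re-embedded and re-upserted separately.
    """
    report = freshness_monitor.check_staleness()
    stale_ids = [doc["id"] for doc in report["stale_documents"]]
    if stale_ids:
        index.delete(ids=stale_ids)
    return stale_ids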
Top challenges in building a knowledge base (+ solutions)
Here are the common problems I’ve seen with knowledge bases:
1. Rise in data quality errors
Even AI models built over the years by reputable companies with solid teams hallucinate. The famous Air Canada chatbot mishap is one example: the model promised a customer a refund under a policy that never existed.
While all engineers try to put relevant content in the knowledge base, the output still has issues. In my experience, a lack of domain expertise causes mistakes in identifying what is relevant. Take off the technical hat and put on a domain cap to identify outdated, conflicting, and irrelevant information in your knowledge base.
2. Slowness in retrieval
An AI model providing the right answer is not enough. Users hate loading and lag; they want answers in the blink of an eye, at least from a machine.
Developers often get stuck on functionality and deprioritize optimization, which is non-negotiable. Use the following tips to resolve common slowness issues:
- Use HNSW (Hierarchical Navigable Small World) or IVF indexes instead of flat indexes, as these group similar vectors together for speedy retrieval (see the sketch after this list)
- Quantize vectors (compress them so they take up less memory) and split long documents into smaller snippets (e.g., recursive character splitting) so less data has to be scanned per query
- Keep your database and AI service in the same cloud region for faster access
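For example, building an HNSW index takes only a few lines with FAISS (the dimension, neighbor count, and efSearch values below are illustrative and should be tuned for your data):
import faiss
import numpy as np

dim = 1024  # embedding dimension (illustrative)
index = faiss.IndexHNSWFlat(dim, 32)  # 32 = neighbors per graph node (M)
index.hnsw.efSearch = 64  # higher = more accurate but slower queries

vectors = np.random.rand(10_000, dim).astype("float32")  # stand-in for real embeddings
index.add(vectors)

query = np.random.rand(1, dim).astype("float32")
distances, ids = index.search(query, 5)  # top-5 nearest chunks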
3. Poor scalability
To speed up implementation, developers often make poor design decisions that hurt scalability in the long run. One such issue is a monolithic architecture in which all data storage and query processing occur in a single, tightly coupled cluster. As model usage grows, CPU/RAM usage spikes across the entire cluster for every query. I suggest horizontal sharding (splitting data across multiple smaller servers) to handle scale effectively.
Another problem is the growing cost with scale, which typically happens if you are not quantizing or compressing the vectors to optimize storage. Developers miss the quantization step to get to the model faster. The downside is not visible initially, but soon the slowness and growing cloud bills show the gap.
A knowledge base isn’t a data dump but a curated asset
Building a knowledge base isn’t a one-time project. It’s an evolving asset that needs regular optimization. The structure you create today will reveal gaps tomorrow. Every failed query is feedback and each successful retrieval validates your design choices.
I suggest starting small: pick the ten most common questions for the model, build clear documentation for them, and then test whether your model can actually give the right answers in good time. Once you start getting the expected output, you can iterate to expand the knowledge base.
The difference between a model that guesses and one that knows comes down to this deliberate curation work. Continuous refinement makes the next search easier and results more reliable.

