Chapter 03: Knowledge Base System - Vector Search and Semantic Retrieval
The General Bots Knowledge Base (gbkb) system implements a semantic search infrastructure for intelligent document retrieval, built on dense vector embeddings and neural information retrieval. This chapter documents the architecture, implementation, and optimization of the knowledge base subsystem.
Executive Summary
The knowledge base system transforms unstructured documents into queryable semantic representations, enabling natural language understanding and context-aware information retrieval. Unlike traditional keyword-based search systems, the gbkb implementation leverages dense vector representations to capture semantic meaning, supporting cross-lingual retrieval, conceptual similarity matching, and intelligent context augmentation for language model responses.
System Architecture Overview
Core Components and Data Flow
The knowledge base architecture implements a multi-stage pipeline for document processing and retrieval:
┌─────────────────────────────────────────────────────────────────┐
│ Document Ingestion Layer │
│ (PDF, Word, Excel, Text, HTML, Markdown) │
├─────────────────────────────────────────────────────────────────┤
│ Preprocessing Pipeline │
│ (Extraction, Cleaning, Normalization, Validation) │
├─────────────────────────────────────────────────────────────────┤
│ Chunking Engine │
│ (Semantic Segmentation, Overlap Management, Metadata) │
├─────────────────────────────────────────────────────────────────┤
│ Embedding Generation │
│ (Transformer Models, Dimensionality Reduction) │
├─────────────────────────────────────────────────────────────────┤
│ Vector Index Layer │
│ (HNSW Index, Quantization, Sharding) │
├─────────────────────────────────────────────────────────────────┤
│ Retrieval Engine │
│ (Semantic Search, Hybrid Retrieval, Re-ranking) │
└─────────────────────────────────────────────────────────────────┘
Technical Specifications
| Component | Specification | Performance Characteristics |
|---|---|---|
| Embedding Model | all-MiniLM-L6-v2 | 384 dimensions, 22M parameters |
| Vector Index | HNSW (Hierarchical Navigable Small World) | M=16, ef_construction=200 |
| Chunk Size | 512 tokens (configurable) | Optimal for context windows |
| Overlap | 50 tokens | Preserves boundary context |
| Distance Metric | Cosine Similarity | Range: [-1, 1], normalized |
| Index Build Time | ~1000 docs/minute | Single-threaded CPU |
| Query Latency | <50ms p99 | For 1M documents |
| Memory Usage | ~1GB per million chunks | Including metadata |
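The distance metric in the table is cosine similarity over unit-normalized vectors, which reduces to a plain dot product once both vectors are L2-normalized. A small illustration with NumPy (the vectors here are random placeholders, not real embeddings):

```python
import numpy as np

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine similarity in [-1, 1]; equals the dot product after L2 normalization."""
    a = a / np.linalg.norm(a)
    b = b / np.linalg.norm(b)
    return float(np.dot(a, b))

# Two random 384-dimensional vectors (the all-MiniLM-L6-v2 embedding size)
rng = np.random.default_rng(0)
u, v = rng.normal(size=384), rng.normal(size=384)
print(cosine_similarity(u, v))
```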
Document Processing Pipeline
Phase 1: Document Ingestion and Extraction
The system implements format-specific extractors for comprehensive document support:
PDF Processing
```python
class PDFExtractor:
    """
    Advanced PDF extraction with layout preservation
    """

    def extract(self, file_path: str) -> DocumentContent:
        # Initialize PDF parser with configuration
        parser_config = {
            'preserve_layout': True,
            'extract_images': True,
            'detect_tables': True,
            'extract_metadata': True,
            'ocr_enabled': True,
            'ocr_language': 'eng+fra+deu+spa',
            'ocr_dpi': 300
        }

        # Multi-stage extraction process
        raw_text = self.extract_text_layer(file_path)
        if self.requires_ocr(raw_text):
            ocr_text = self.perform_ocr(file_path, parser_config)
            raw_text = self.merge_text_sources(raw_text, ocr_text)

        # Extract structural elements
        tables = self.extract_tables(file_path)
        images = self.extract_images(file_path)
        metadata = self.extract_metadata(file_path)

        # Preserve document structure
        sections = self.detect_sections(raw_text)
        headings = self.extract_headings(raw_text)

        return DocumentContent(
            text=raw_text,
            tables=tables,
            images=images,
            metadata=metadata,
            structure=DocumentStructure(sections, headings)
        )
```
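The `requires_ocr` check above is not defined in this excerpt. A minimal sketch of one possible heuristic, assuming OCR is triggered when the embedded text layer is too sparse (the per-page threshold is an illustrative assumption):

```python
def requires_ocr(text_layer: str, page_count: int = 1,
                 min_chars_per_page: int = 200) -> bool:
    """Trigger OCR when the extracted text layer looks too sparse.

    Scanned PDFs often carry little or no embedded text, so a low
    character count per page is used here as a cheap proxy.
    """
    extracted_chars = len(text_layer.strip())
    return extracted_chars < min_chars_per_page * page_count
```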
Supported File Formats and Parsers
| Format | Parser Library | Features | Max Size | Processing Time |
|---|---|---|---|---|
| PDF | Apache PDFBox + Tesseract | Text, OCR, Tables, Images | 500MB | ~10s/MB |
| DOCX | Apache POI + python-docx | Formatted text, Styles, Comments | 100MB | ~5s/MB |
| XLSX | Apache POI + openpyxl | Sheets, Formulas, Charts | 100MB | ~8s/MB |
| PPTX | Apache POI + python-pptx | Slides, Notes, Shapes | 200MB | ~7s/MB |
| HTML | BeautifulSoup + lxml | DOM parsing, CSS extraction | 50MB | ~3s/MB |
| Markdown | CommonMark + mistune | GFM support, Tables, Code | 10MB | ~1s/MB |
| Plain Text | Native UTF-8 decoder | Encoding detection | 100MB | <1s/MB |
| RTF | python-rtf | Formatted text, Images | 50MB | ~4s/MB |
| CSV/TSV | pandas + csv module | Tabular data, Headers | 1GB | ~2s/MB |
| JSON | ujson + jsonschema | Nested structures, Validation | 100MB | ~1s/MB |
| XML | lxml + xmlschema | XPath, XSLT, Validation | 100MB | ~3s/MB |
Phase 2: Text Preprocessing and Cleaning
The preprocessing pipeline ensures consistent, high-quality text for embedding:
```python
class TextPreprocessor:
    """
    Multi-stage text preprocessing pipeline
    """

    def preprocess(self, text: str) -> str:
        # Stage 1: Encoding normalization
        text = self.normalize_unicode(text)
        text = self.fix_encoding_errors(text)

        # Stage 2: Whitespace and formatting
        text = self.normalize_whitespace(text)
        text = self.remove_control_characters(text)
        text = self.fix_line_breaks(text)

        # Stage 3: Content cleaning
        text = self.remove_boilerplate(text)
        text = self.clean_headers_footers(text)
        text = self.remove_watermarks(text)

        # Stage 4: Language-specific processing
        language = self.detect_language(text)
        text = self.apply_language_rules(text, language)

        # Stage 5: Semantic preservation
        text = self.preserve_entities(text)
        text = self.preserve_acronyms(text)
        text = self.preserve_numbers(text)

        return text

    def normalize_unicode(self, text: str) -> str:
        """Normalize Unicode to a canonical, accent-stripped form"""
        import unicodedata

        # NFD decomposition, drop combining marks (diacritics), then recompose
        text = unicodedata.normalize('NFD', text)
        text = ''.join(
            char for char in text
            if unicodedata.category(char) != 'Mn'
        )
        text = unicodedata.normalize('NFC', text)

        # Replace common Unicode artifacts with ASCII equivalents
        replacements = {
            '\u2018': "'", '\u2019': "'",   # Smart quotes
            '\u201c': '"', '\u201d': '"',
            '\u2013': '-', '\u2014': '--',  # Dashes
            '\u2026': '...',                # Ellipsis
            '\xa0': ' ',                    # Non-breaking space
        }
        for old, new in replacements.items():
            text = text.replace(old, new)
        return text
```
Phase 3: Intelligent Chunking Strategy
The chunking engine implements context-aware segmentation:
```python
import re
from typing import List

# Chunk and Boundary are project data classes (not shown in this excerpt)


class SemanticChunker:
    """
    Advanced chunking with semantic boundary detection
    """

    def chunk_document(self,
                       text: str,
                       chunk_size: int = 512,
                       overlap: int = 50) -> List[Chunk]:
        # Detect natural boundaries
        boundaries = self.detect_boundaries(text)

        chunks = []
        current_pos = 0
        while current_pos < len(text):
            # Find optimal chunk end point
            chunk_end = self.find_optimal_split(
                text,
                current_pos,
                chunk_size,
                boundaries
            )

            # Extend the chunk start backwards so it overlaps the previous chunk
            chunk_start = current_pos
            if chunks and overlap > 0:
                chunk_start = max(0, current_pos - overlap)
            chunk_text = text[chunk_start:chunk_end]

            # Generate chunk metadata
            chunk = Chunk(
                text=chunk_text,
                start_pos=chunk_start,
                end_pos=chunk_end,
                metadata=self.generate_metadata(chunk_text),
                boundaries=self.get_chunk_boundaries(
                    chunk_start,
                    chunk_end,
                    boundaries
                )
            )
            chunks.append(chunk)

            # Always advance past the previous end to avoid an infinite loop
            current_pos = max(chunk_end, current_pos + 1)

        return chunks

    def detect_boundaries(self, text: str) -> List[Boundary]:
        """
        Detect semantic boundaries in text
        """
        boundaries = []

        # Paragraph boundaries
        for match in re.finditer(r'\n\n+', text):
            boundaries.append(
                Boundary('paragraph', match.start(), 1.0)
            )

        # Sentence boundaries (track a running offset so repeated
        # sentences map to the correct positions)
        sentences = self.sentence_tokenizer.tokenize(text)
        search_from = 0
        for sent in sentences:
            pos = text.find(sent, search_from)
            if pos == -1:
                continue
            boundaries.append(
                Boundary('sentence', pos + len(sent), 0.8)
            )
            search_from = pos + len(sent)

        # Section headers (Markdown headings or "Title:" lines)
        for match in re.finditer(
            r'^#+\s+.+$|^[A-Z][^.!?]*:$',
            text,
            re.MULTILINE
        ):
            boundaries.append(
                Boundary('section', match.start(), 0.9)
            )

        # List boundaries
        for match in re.finditer(
            r'^\s*[-*•]\s+',
            text,
            re.MULTILINE
        ):
            boundaries.append(
                Boundary('list_item', match.start(), 0.7)
            )

        return sorted(boundaries, key=lambda b: b.position)
```
Chunking Configuration Parameters
| Parameter | Default | Range | Description | Impact |
|---|---|---|---|---|
| chunk_size | 512 | 128-2048 | Target tokens per chunk | Affects context granularity |
| overlap | 50 | 0-200 | Overlapping tokens | Preserves boundary context |
| split_strategy | semantic | semantic, fixed, sliding | Chunking algorithm | Quality vs speed tradeoff |
| respect_boundaries | true | true/false | Honor semantic boundaries | Improves coherence |
| min_chunk_size | 100 | 50-500 | Minimum viable chunk | Prevents fragments |
| max_chunk_size | 1024 | 512-4096 | Maximum chunk size | Memory constraints |
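Taken together, these parameters might surface in a bot's configuration roughly as follows. The dictionary keys mirror the table, but the exact schema is an assumption, and `chunker` / `document_text` stand in for an already-configured SemanticChunker instance and preprocessed text:

```python
# Hypothetical chunking settings; keys mirror the table above.
CHUNKING_CONFIG = {
    "chunk_size": 512,             # target tokens per chunk (128-2048)
    "overlap": 50,                 # overlapping tokens between chunks (0-200)
    "split_strategy": "semantic",  # semantic | fixed | sliding
    "respect_boundaries": True,    # honor paragraph/sentence boundaries
    "min_chunk_size": 100,         # discard fragments smaller than this
    "max_chunk_size": 1024,        # hard upper bound per chunk
}

# chunk_document() above takes the two core parameters directly;
# `chunker` stands in for a fully configured SemanticChunker instance.
chunks = chunker.chunk_document(
    text=document_text,
    chunk_size=CHUNKING_CONFIG["chunk_size"],
    overlap=CHUNKING_CONFIG["overlap"],
)
```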
Phase 4: Embedding Generation
The system generates dense vector representations using transformer models:
```python
from typing import List

import numpy as np
import torch
import torch.nn.functional as F


class EmbeddingGenerator:
    """
    High-performance embedding generation with batching
    """

    def __init__(self, model_name: str = 'all-MiniLM-L6-v2'):
        self.model = self.load_model(model_name)
        self.tokenizer = self.load_tokenizer(model_name)
        self.dimension = 384
        self.max_length = 512
        self.batch_size = 32

    def generate_embeddings(self,
                            chunks: List[str]) -> np.ndarray:
        """
        Generate embeddings with optimal batching
        """
        embeddings = []

        # Process in batches for efficiency
        for i in range(0, len(chunks), self.batch_size):
            batch = chunks[i:i + self.batch_size]

            # Tokenize with padding and truncation
            encoded = self.tokenizer(
                batch,
                padding=True,
                truncation=True,
                max_length=self.max_length,
                return_tensors='pt'
            )

            # Generate embeddings
            with torch.no_grad():
                model_output = self.model(**encoded)

            # Mean pooling over token embeddings
            token_embeddings = model_output[0]
            attention_mask = encoded['attention_mask']

            # Expand the attention mask to the embedding dimension
            input_mask_expanded = (
                attention_mask
                .unsqueeze(-1)
                .expand(token_embeddings.size())
                .float()
            )
            sum_embeddings = torch.sum(
                token_embeddings * input_mask_expanded,
                1
            )
            sum_mask = torch.clamp(
                input_mask_expanded.sum(1),
                min=1e-9
            )
            embeddings_batch = sum_embeddings / sum_mask

            # Normalize embeddings to unit length
            embeddings_batch = F.normalize(
                embeddings_batch,
                p=2,
                dim=1
            )
            embeddings.append(embeddings_batch.cpu().numpy())

        return np.vstack(embeddings)
```
Embedding Model Comparison
| Model | Dimensions | Size | Speed | Quality | Memory |
|---|---|---|---|---|---|
| all-MiniLM-L6-v2 | 384 | 80MB | 14,200 sent/sec | 0.631 | 290MB |
| all-mpnet-base-v2 | 768 | 420MB | 2,800 sent/sec | 0.634 | 1.2GB |
| multi-qa-MiniLM-L6 | 384 | 80MB | 14,200 sent/sec | 0.618 | 290MB |
| paraphrase-multilingual | 768 | 1.1GB | 2,300 sent/sec | 0.628 | 2.1GB |
| e5-base-v2 | 768 | 440MB | 2,700 sent/sec | 0.642 | 1.3GB |
| bge-base-en | 768 | 440MB | 2,600 sent/sec | 0.644 | 1.3GB |
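Models like those in the table can typically be loaded through the sentence-transformers library, which makes swapping models a one-line change. A minimal sketch using the default all-MiniLM-L6-v2 (other rows would use their full Hugging Face identifiers; the sentences are illustrative):

```python
from sentence_transformers import SentenceTransformer

# Any compatible model identifier can be substituted here.
model = SentenceTransformer('all-MiniLM-L6-v2')

sentences = [
    "How do I reset my password?",
    "Password reset instructions are in the account settings page.",
]

# encode() handles batching, pooling and optional L2 normalization
embeddings = model.encode(sentences, normalize_embeddings=True)
print(embeddings.shape)  # (2, 384) for all-MiniLM-L6-v2
```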
Phase 5: Vector Index Construction
The system builds high-performance vector indices for similarity search:
```python
from typing import Dict, List

import hnswlib
import numpy as np


class VectorIndexBuilder:
    """
    HNSW index construction with optimization
    """

    def build_index(self,
                    embeddings: np.ndarray,
                    metadata: List[Dict]) -> VectorIndex:
        # Configure HNSW parameters
        index_config = {
            'metric': 'cosine',
            'm': 16,                 # Number of bi-directional links per node
            'ef_construction': 200,  # Size of dynamic candidate list at build time
            'ef_search': 100,        # Size of candidate list at query time
            'num_threads': 4,        # Parallel construction
            'seed': 42               # Reproducible builds
        }

        # Initialize index
        index = hnswlib.Index(
            space=index_config['metric'],
            dim=embeddings.shape[1]
        )

        # Set construction parameters
        index.init_index(
            max_elements=len(embeddings) * 2,  # Allow growth
            M=index_config['m'],
            ef_construction=index_config['ef_construction'],
            random_seed=index_config['seed']
        )

        # Add vectors with IDs
        index.add_items(
            embeddings,
            ids=np.arange(len(embeddings)),
            num_threads=index_config['num_threads']
        )

        # Set runtime search parameters
        index.set_ef(index_config['ef_search'])

        # Build metadata index
        metadata_index = self.build_metadata_index(metadata)

        # Optional: Build secondary indices
        secondary_indices = {
            'date_index': self.build_date_index(metadata),
            'category_index': self.build_category_index(metadata),
            'author_index': self.build_author_index(metadata)
        }

        return VectorIndex(
            vector_index=index,
            metadata_index=metadata_index,
            secondary_indices=secondary_indices,
            config=index_config
        )
```
Index Performance Characteristics
| Documents | Build Time | Memory Usage | Query Time (k=10) | Recall@10 |
|---|---|---|---|---|
| 1K | 0.5s | 12MB | 0.8ms | 0.99 |
| 10K | 5s | 95MB | 1.2ms | 0.98 |
| 100K | 52s | 890MB | 3.5ms | 0.97 |
| 1M | 9m | 8.7GB | 12ms | 0.95 |
| 10M | 95m | 86GB | 45ms | 0.93 |
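For capacity planning, a rough back-of-envelope estimate of HNSW memory can be useful. The sketch below counts only float32 vectors and base-layer graph links (roughly 2·M four-byte neighbor IDs per element); it ignores metadata, upper layers, and allocator overhead, so real figures such as those in the table will be higher:

```python
def estimate_hnsw_memory_mb(num_vectors: int, dim: int = 384, m: int = 16) -> float:
    """Very rough HNSW footprint: float32 vectors plus base-layer graph links.

    Assumes 4 bytes per vector component and ~2 * M neighbor IDs of 4 bytes
    per element; metadata, upper layers and allocator overhead are ignored.
    """
    vector_bytes = num_vectors * dim * 4
    link_bytes = num_vectors * 2 * m * 4
    return (vector_bytes + link_bytes) / (1024 ** 2)

# One million chunks at 384 dimensions:
print(f"{estimate_hnsw_memory_mb(1_000_000):.0f} MB")
```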
Retrieval System Architecture
Semantic Search Implementation
The retrieval engine implements multi-stage retrieval with re-ranking:
```python
from typing import Dict, List

import numpy as np


class SemanticRetriever:
    """
    Advanced retrieval with hybrid search and re-ranking
    """

    def retrieve(self,
                 query: str,
                 k: int = 10,
                 filters: Dict = None) -> List[SearchResult]:
        # Stage 1: Query processing
        processed_query = self.preprocess_query(query)
        query_expansion = self.expand_query(processed_query)

        # Stage 2: Generate query embedding
        query_embedding = self.embedding_generator.generate(
            processed_query
        )

        # Stage 3: Dense retrieval (vector search)
        dense_results = self.vector_search(
            query_embedding,
            k=k * 3,  # Over-retrieve for re-ranking
            filters=filters
        )

        # Stage 4: Sparse retrieval (keyword search)
        sparse_results = self.keyword_search(
            query_expansion,
            k=k * 2,
            filters=filters
        )

        # Stage 5: Hybrid fusion
        fused_results = self.reciprocal_rank_fusion(
            dense_results,
            sparse_results,
            k=60  # Fusion parameter
        )

        # Stage 6: Re-ranking
        reranked_results = self.rerank(
            query=processed_query,
            candidates=fused_results[:k * 2],
            k=k
        )

        # Stage 7: Result enhancement
        enhanced_results = self.enhance_results(
            results=reranked_results,
            query=processed_query
        )

        return enhanced_results

    def vector_search(self,
                      embedding: np.ndarray,
                      k: int,
                      filters: Dict = None) -> List[SearchResult]:
        """
        Perform approximate nearest neighbor search
        """
        # Apply pre-filters if specified
        if filters:
            candidate_ids = self.apply_filters(filters)
            search_params = {
                'filter': lambda idx: idx in candidate_ids
            }
        else:
            search_params = {}

        # Execute vector search
        distances, indices = self.index.search(
            embedding.reshape(1, -1),
            k=k,
            **search_params
        )

        # Convert to search results
        results = []
        for dist, idx in zip(distances[0], indices[0]):
            if idx == -1:  # Invalid result
                continue

            # Retrieve metadata
            metadata = self.metadata_store.get(idx)

            # Calculate relevance score
            score = self.distance_to_score(dist)

            results.append(SearchResult(
                chunk_id=idx,
                score=score,
                text=metadata['text'],
                metadata=metadata,
                distance=dist,
                retrieval_method='dense'
            ))

        return results
```
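The `distance_to_score` conversion above is not shown in the excerpt. A minimal sketch, assuming the index returns cosine distance (1 - cosine similarity) for unit-normalized vectors, as hnswlib's 'cosine' space does:

```python
def distance_to_score(distance: float) -> float:
    """Map a cosine distance (1 - cosine similarity) to a relevance score in [0, 1].

    Assumes unit-normalized vectors, so similarity = 1 - distance,
    which is then clamped into [0, 1].
    """
    similarity = 1.0 - distance
    return max(0.0, min(1.0, similarity))
```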
Query Processing and Expansion
Sophisticated query understanding and expansion:
```python
from typing import List


class QueryProcessor:
    """
    Query understanding and expansion
    """

    def process_query(self, query: str) -> ProcessedQuery:
        # Language detection
        language = self.detect_language(query)

        # Spell correction
        corrected = self.spell_correct(query, language)

        # Entity recognition
        entities = self.extract_entities(corrected)

        # Intent classification
        intent = self.classify_intent(corrected)

        # Query expansion techniques
        expanded = self.expand_query(corrected)

        return ProcessedQuery(
            original=query,
            corrected=corrected,
            language=language,
            entities=entities,
            intent=intent,
            expansions=expanded
        )

    def expand_query(self, query: str) -> List[str]:
        """
        Multi-strategy query expansion
        """
        expansions = [query]  # Original query

        # Synonym expansion
        for word in query.split():
            synonyms = self.get_synonyms(word)
            for synonym in synonyms[:3]:
                expanded = query.replace(word, synonym)
                expansions.append(expanded)

        # Acronym expansion
        acronyms = self.detect_acronyms(query)
        for acronym, expansion in acronyms.items():
            expanded = query.replace(acronym, expansion)
            expansions.append(expanded)

        # Conceptual expansion (using WordNet)
        concepts = self.get_related_concepts(query)
        expansions.extend(concepts[:5])

        # Query reformulation
        reformulations = self.reformulate_query(query)
        expansions.extend(reformulations)

        # Remove duplicates while preserving order
        return list(dict.fromkeys(expansions))
```
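The `get_synonyms` helper above is left abstract. A hedged sketch of one way to implement it with NLTK's WordNet (requires `nltk.download('wordnet')` beforehand; the five-synonym cutoff is an illustrative choice):

```python
from nltk.corpus import wordnet  # requires: nltk.download('wordnet')

def get_synonyms(word: str, max_synonyms: int = 5) -> list:
    """Collect distinct WordNet lemma names for a word, excluding the word itself."""
    synonyms = []
    for synset in wordnet.synsets(word):
        for lemma in synset.lemmas():
            name = lemma.name().replace('_', ' ')
            if name.lower() != word.lower() and name not in synonyms:
                synonyms.append(name)
            if len(synonyms) >= max_synonyms:
                return synonyms
    return synonyms

print(get_synonyms("search"))
```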
Hybrid Search and Fusion
Combining dense and sparse retrieval methods:
```python
from typing import List


class HybridSearcher:
    """
    Hybrid search with multiple retrieval strategies
    """

    def reciprocal_rank_fusion(self,
                               dense_results: List[SearchResult],
                               sparse_results: List[SearchResult],
                               k: int = 60) -> List[SearchResult]:
        """
        Weighted Reciprocal Rank Fusion (RRF) for result merging
        """
        # Create per-method reciprocal-rank score dictionaries
        dense_scores = {}
        for rank, result in enumerate(dense_results):
            dense_scores[result.chunk_id] = 1.0 / (k + rank + 1)

        sparse_scores = {}
        for rank, result in enumerate(sparse_results):
            sparse_scores[result.chunk_id] = 1.0 / (k + rank + 1)

        # Combine scores
        all_ids = set(dense_scores.keys()) | set(sparse_scores.keys())
        fused_results = []
        for chunk_id in all_ids:
            # Weighted RRF score combination
            score = (
                dense_scores.get(chunk_id, 0) * 0.7 +  # Dense weight
                sparse_scores.get(chunk_id, 0) * 0.3   # Sparse weight
            )

            # Find original result object
            result = None
            for r in dense_results + sparse_results:
                if r.chunk_id == chunk_id:
                    result = r
                    break

            if result:
                result.fusion_score = score
                fused_results.append(result)

        # Sort by fusion score
        fused_results.sort(key=lambda x: x.fusion_score, reverse=True)
        return fused_results
```
Re-ranking with Cross-Encoders
Advanced re-ranking for improved precision:
```python
from typing import List

from sentence_transformers import CrossEncoder


class CrossEncoderReranker:
    """
    Neural re-ranking with cross-encoder models
    """

    def __init__(self, model_name: str = 'cross-encoder/ms-marco-MiniLM-L-6-v2'):
        self.model = CrossEncoder(model_name)
        self.batch_size = 32

    def rerank(self,
               query: str,
               candidates: List[SearchResult],
               k: int) -> List[SearchResult]:
        """
        Re-rank candidates using cross-encoder
        """
        # Prepare (query, passage) input pairs
        pairs = [
            (query, candidate.text)
            for candidate in candidates
        ]

        # Score in batches
        scores = []
        for i in range(0, len(pairs), self.batch_size):
            batch = pairs[i:i + self.batch_size]
            batch_scores = self.model.predict(batch)
            scores.extend(batch_scores)

        # Update candidate scores
        for candidate, score in zip(candidates, scores):
            candidate.rerank_score = score

        # Sort by rerank score
        candidates.sort(key=lambda x: x.rerank_score, reverse=True)
        return candidates[:k]
```
Context Management and Compaction
Context Window Optimization
Intelligent context management for LLM consumption:
```python
from typing import List


class ContextManager:
    """
    Context optimization for language models
    """

    def prepare_context(self,
                        search_results: List[SearchResult],
                        max_tokens: int = 2048) -> str:
        """
        Prepare optimized context for LLM
        """
        # Calculate token budget
        token_budget = max_tokens
        used_tokens = 0

        # Select and order chunks
        selected_chunks = []
        for result in search_results:
            # Estimate tokens
            chunk_tokens = self.estimate_tokens(result.text)

            if used_tokens + chunk_tokens <= token_budget:
                selected_chunks.append(result)
                used_tokens += chunk_tokens
            else:
                # Try to fit partial chunk
                remaining_budget = token_budget - used_tokens
                if remaining_budget > 100:  # Minimum useful size
                    truncated = self.truncate_to_tokens(
                        result.text,
                        remaining_budget
                    )
                    result.text = truncated
                    selected_chunks.append(result)
                break

        # Format context
        context = self.format_context(selected_chunks)

        # Apply compression if needed
        if self.compression_enabled:
            context = self.compress_context(context)

        return context

    def compress_context(self, context: str) -> str:
        """
        Compress context while preserving information
        """
        # Remove redundancy
        context = self.remove_redundant_sentences(context)

        # Summarize verbose sections
        context = self.summarize_verbose_sections(context)

        # Preserve key information
        context = self.preserve_key_facts(context)

        return context
```
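The `estimate_tokens` helper is referenced but not defined in this excerpt. A minimal sketch that uses `tiktoken` when installed and otherwise falls back to a rough four-characters-per-token heuristic (the choice of tokenizer is an assumption, not the gbkb implementation):

```python
def estimate_tokens(text: str) -> int:
    """Estimate token count; the exact tokenizer is model-dependent."""
    try:
        import tiktoken  # optional dependency
        encoding = tiktoken.get_encoding("cl100k_base")
        return len(encoding.encode(text))
    except ImportError:
        # Rough heuristic: ~4 characters per token for English text
        return max(1, len(text) // 4)
```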
Dynamic Context Strategies
Adaptive context selection based on query type:
```python
class DynamicContextStrategy:
    """
    Query-aware context selection
    """

    def select_strategy(self,
                        query: ProcessedQuery) -> ContextStrategy:
        """
        Choose optimal context strategy
        """
        if query.intent == 'factual':
            return FactualContextStrategy(
                max_chunks=3,
                focus='precision',
                include_metadata=True
            )
        elif query.intent == 'exploratory':
            return ExploratoryContextStrategy(
                max_chunks=8,
                focus='breadth',
                include_related=True
            )
        elif query.intent == 'comparison':
            return ComparativeContextStrategy(
                max_chunks=6,
                focus='contrast',
                group_by='topic'
            )
        elif query.intent == 'summarization':
            return SummarizationContextStrategy(
                max_chunks=10,
                focus='coverage',
                remove_redundancy=True
            )
        else:
            return DefaultContextStrategy(
                max_chunks=5,
                focus='relevance'
            )
```
Performance Optimization
Caching Architecture
Multi-level caching for optimal performance:
```python
from typing import Any, Optional


class KnowledgeCacheManager:
    """
    Hierarchical caching system
    """

    def __init__(self):
        # L1: Query result cache (in-memory)
        self.l1_cache = LRUCache(
            max_size=1000,
            ttl_seconds=300
        )

        # L2: Embedding cache (in-memory)
        self.l2_cache = EmbeddingCache(
            max_embeddings=10000,
            ttl_seconds=3600
        )

        # L3: Document cache (disk)
        self.l3_cache = DiskCache(
            cache_dir='/var/cache/kb',
            max_size_gb=10,
            ttl_seconds=86400
        )

        # L4: CDN cache (edge)
        self.l4_cache = CDNCache(
            provider='cloudflare',
            ttl_seconds=604800
        )

    def get(self, key: str, level: int = 4) -> Optional[Any]:
        """
        Hierarchical cache lookup down to the given level (default: all four)
        """
        # Try each cache level in order, promoting hits toward L1
        if level >= 1:
            result = self.l1_cache.get(key)
            if result:
                return result

        if level >= 2:
            result = self.l2_cache.get(key)
            if result:
                # Promote to L1
                self.l1_cache.set(key, result)
                return result

        if level >= 3:
            result = self.l3_cache.get(key)
            if result:
                # Promote to L2 and L1
                self.l2_cache.set(key, result)
                self.l1_cache.set(key, result)
                return result

        if level >= 4:
            result = self.l4_cache.get(key)
            if result:
                # Promote through all levels
                self.l3_cache.set(key, result)
                self.l2_cache.set(key, result)
                self.l1_cache.set(key, result)
                return result

        return None
```
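The `LRUCache` used for L1 above is not defined in this excerpt. A minimal sketch of an in-memory LRU cache with per-entry TTL expiry; the class name and constructor arguments follow the usage above, but this is a sketch rather than the production implementation:

```python
import time
from collections import OrderedDict
from typing import Any, Optional

class LRUCache:
    """Minimal in-memory LRU cache with per-entry TTL expiry."""

    def __init__(self, max_size: int = 1000, ttl_seconds: int = 300):
        self.max_size = max_size
        self.ttl_seconds = ttl_seconds
        self._store = OrderedDict()  # key -> (stored_at, value)

    def get(self, key: str) -> Optional[Any]:
        entry = self._store.get(key)
        if entry is None:
            return None
        stored_at, value = entry
        if time.time() - stored_at > self.ttl_seconds:
            del self._store[key]          # expired
            return None
        self._store.move_to_end(key)      # mark as recently used
        return value

    def set(self, key: str, value: Any) -> None:
        self._store[key] = (time.time(), value)
        self._store.move_to_end(key)
        if len(self._store) > self.max_size:
            self._store.popitem(last=False)  # evict least-recently-used
```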
Index Optimization Techniques
Strategies for large-scale deployments:
```yaml
optimization_strategies:
  index_sharding:
    description: "Split index across multiple shards"
    when_to_use: "> 10M documents"
    configuration:
      shard_count: 8
      shard_strategy: "hash_based"
      replication_factor: 2

  quantization:
    description: "Reduce vector precision"
    when_to_use: "Memory constrained"
    configuration:
      type: "product_quantization"
      subvectors: 8
      bits: 8
      training_samples: 100000

  hierarchical_index:
    description: "Multi-level index structure"
    when_to_use: "> 100M documents"
    configuration:
      levels: 3
      fanout: 100
      rerank_top_k: 1000

  gpu_acceleration:
    description: "Use GPU for search"
    when_to_use: "Low latency critical"
    configuration:
      device: "cuda:0"
      batch_size: 1000
      precision: "float16"
```
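The product-quantization strategy above can be prototyped with FAISS before committing to a configuration. A minimal sketch with random data and the 8-subvector, 8-bit setting; this is not the gbkb production code path, and for cosine similarity the vectors would be L2-normalized and an inner-product metric used:

```python
import faiss
import numpy as np

d = 384                   # embedding dimension (all-MiniLM-L6-v2)
subvectors, bits = 8, 8   # matches the quantization config above

# Train a product-quantized index on a sample, then add the full set
rng = np.random.default_rng(0)
train = rng.normal(size=(10_000, d)).astype('float32')
vectors = rng.normal(size=(100_000, d)).astype('float32')

index = faiss.IndexPQ(d, subvectors, bits)
index.train(train)
index.add(vectors)

query = rng.normal(size=(1, d)).astype('float32')
distances, ids = index.search(query, 10)
print(ids[0])
```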
Integration with LLM Systems
Retrieval-Augmented Generation (RAG)
Seamless integration with language models:
```python
from typing import List


class RAGPipeline:
    """
    Retrieval-Augmented Generation implementation
    """

    def generate_response(self,
                          query: str,
                          conversation_history: List[Message] = None) -> str:
        """
        Generate LLM response with retrieved context
        """
        # Step 1: Retrieve relevant context
        search_results = self.knowledge_base.search(
            query=query,
            k=5,
            filters=self.build_filters(conversation_history)
        )

        # Step 2: Prepare context
        context = self.context_manager.prepare_context(
            search_results=search_results,
            max_tokens=2048
        )

        # Step 3: Build prompt
        prompt = self.build_prompt(
            query=query,
            context=context,
            history=conversation_history
        )

        # Step 4: Generate response
        response = self.llm.generate(
            prompt=prompt,
            temperature=0.7,
            max_tokens=512
        )

        # Step 5: Post-process response
        response = self.post_process(
            response=response,
            citations=search_results
        )

        # Step 6: Update conversation state
        self.update_conversation_state(
            query=query,
            response=response,
            context_used=search_results
        )

        return response

    def build_prompt(self,
                     query: str,
                     context: str,
                     history: List[Message] = None) -> str:
        """
        Construct optimized prompt for LLM
        """
        prompt_template = """
You are a helpful assistant with access to a knowledge base.
Use the following context to answer the user's question.
If the context doesn't contain relevant information, say so.

Context:
{context}

Conversation History:
{history}

User Question: {query}

Assistant Response:
"""
        history_text = self.format_history(history) if history else "None"

        return prompt_template.format(
            context=context,
            history=history_text,
            query=query
        )
```
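The `post_process` step above is left abstract. One simple possibility is to append numbered citations from the retrieved chunks to the generated answer; in the sketch below, the `metadata['source']` field is a hypothetical key, while the other SearchResult attributes mirror the retrieval code earlier in this chapter:

```python
def post_process(response: str, citations: list) -> str:
    """Append a numbered source list built from the retrieved chunks."""
    if not citations:
        return response
    lines = [response, "", "Sources:"]
    for i, result in enumerate(citations, start=1):
        # 'source' is a hypothetical metadata field; fall back to the chunk id
        source = result.metadata.get('source', f"chunk {result.chunk_id}")
        lines.append(f"[{i}] {source}")
    return "\n".join(lines)
```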
Monitoring and Analytics
Knowledge Base Metrics
Comprehensive monitoring for system health:
{
  "timestamp": "2024-03-15T14:30:00Z",
  "metrics": {
    "collection_stats": {
      "total_documents": 15823,
      "total_chunks": 234567,
      "total_embeddings": 234567,
      "index_size_mb": 892,
      "storage_size_gb": 12.4
    },
    "performance_metrics": {
      "indexing_rate": 1247,
      "query_latency_p50": 23,
      "query_latency_p99": 87,
      "embedding_latency_p50": 12,
      "embedding_latency_p99": 45,
      "cache_hit_rate": 0.823
    },
    "quality_metrics": {
      "mean_relevance_score": 0.784,
      "recall_at_10": 0.923