# Monitoring & Observability
This guide explains how to monitor your DataBridge usage, track performance metrics, and set up observability for your applications.
## OpenTelemetry Integration
DataBridge uses OpenTelemetry for comprehensive distributed tracing and metrics collection. This provides detailed insights into system performance and behavior.
### Configuration
DataBridge automatically configures OpenTelemetry based on your environment:
```python
from databridge import DataBridge

# Development mode (writes to local files)
db = DataBridge("your-uri", is_local=True)
# Writes to:
# - logs/telemetry/traces.log
# - logs/telemetry/metrics.log

# Production mode (uses OTLP exporters)
db = DataBridge("your-uri")
# Exports to the configured OpenTelemetry collector
```
### Resource Attributes

All telemetry data includes standard resource attributes:

- `service.name`: "databridge-core"
- Custom attributes for your application
### Traces

Every operation in DataBridge is automatically traced, including:

- Document ingestion
- Semantic search queries
- Completion requests
- Authentication flows
Each trace includes:

- Operation type
- User ID
- Duration
- Status (success/error)
- Custom metadata
Example of using traces:
```python
from databridge import DataBridge

db = DataBridge("your-uri")

# Traces are automatically collected
async with db.track_operation("search", user_id="user123") as span:
    # Add custom attributes to the trace
    span.set_attribute("query_type", "semantic")

    # Perform the operation
    results = await db.retrieve_chunks("query")

    # Record result metadata
    span.set_attribute("results_count", len(results))
```
### Metrics
DataBridge automatically collects key metrics:
**Operation Counts (`databridge.operations`)**

- Description: "Number of operations performed"
- Tagged with operation type and status
- Counter metric type
- Helps monitor usage patterns

**Token Usage (`databridge.tokens`)**

- Description: "Number of tokens processed"
- Tagged with operation type
- Counter metric type
- Essential for cost monitoring

**Operation Duration (`databridge.operation.duration`)**

- Description: "Duration of operations"
- Unit: milliseconds
- Tagged with operation type
- Histogram metric type
- Helps identify performance issues
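These instruments correspond to standard OpenTelemetry metric types. Purely as an illustration of how equivalent instruments are declared with the OpenTelemetry metrics API (DataBridge creates its own internally; the meter name here is an assumption):

```python
from opentelemetry import metrics

meter = metrics.get_meter("databridge.core")  # meter name is illustrative

operation_counter = meter.create_counter(
    "databridge.operations",
    description="Number of operations performed",
)
token_counter = meter.create_counter(
    "databridge.tokens",
    description="Number of tokens processed",
)
duration_histogram = meter.create_histogram(
    "databridge.operation.duration",
    unit="ms",
    description="Duration of operations",
)

# Record a data point with the tags described above
operation_counter.add(1, {"operation": "search", "status": "success"})
```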
Example of accessing metrics:
```python
# Token usage is automatically tracked
response = await db.query("What are the insights?")
print(f"Tokens used: {response.usage['total_tokens']}")

# Get user-specific usage statistics
usage_stats = db.get_user_usage("user123")
print(f"Total tokens: {usage_stats['total']}")
```
## Local Development Mode

When running in local development mode (`is_local=True`), telemetry data is written to log files:
```python
from databridge import DataBridge

# Enable local development mode
db = DataBridge("your-uri", is_local=True)

# Telemetry data will be written to:
# - logs/telemetry/traces.log: detailed operation traces
# - logs/telemetry/metrics.log: system metrics (exported every minute)
```
### Local Trace Format
Traces are written as JSON objects with the following structure:
```json
{
  "name": "operation_name",
  "trace_id": "hex_trace_id",
  "span_id": "hex_span_id",
  "parent_id": "hex_parent_id",
  "start_time": "timestamp",
  "end_time": "timestamp",
  "attributes": {
    "operation.type": "search",
    "user.id": "user123",
    "metadata.query_type": "semantic"
  },
  "status": "OK"
}
```
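Because each trace is a single JSON object, the log is easy to inspect with a few lines of Python. A minimal sketch, assuming one object per line and the field names shown above:

```python
import json

# Print any spans that did not complete successfully
with open("logs/telemetry/traces.log") as f:
    for line in f:
        span = json.loads(line)
        if span["status"] != "OK":
            print(span["name"], span["trace_id"], span["attributes"])
```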
### Local Metric Format
Metrics are written as JSON objects with the following structure:
```json
{
  "name": "databridge.operations",
  "description": "Number of operations performed",
  "unit": "",
  "data": {
    "data_points": [
      {
        "attributes": {
          "operation": "search",
          "status": "success"
        },
        "value": 1,
        "timestamp": "timestamp"
      }
    ]
  }
}
```
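The same approach works for the metrics log. A sketch that totals operation counts per operation/status pair, again assuming one JSON object per line:

```python
import json
from collections import Counter

totals = Counter()
with open("logs/telemetry/metrics.log") as f:
    for line in f:
        metric = json.loads(line)
        if metric["name"] != "databridge.operations":
            continue
        # Sum the counter values, keyed by their tags
        for point in metric["data"]["data_points"]:
            attrs = point["attributes"]
            totals[(attrs["operation"], attrs["status"])] += point["value"]

for (operation, status), count in totals.items():
    print(f"{operation} ({status}): {count}")
```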
## Production Monitoring
In production environments, DataBridge exports telemetry data through OpenTelemetry exporters:
### Trace Export

- Uses OTLP (OpenTelemetry Protocol)
- `BatchSpanProcessor` for efficient export
- Includes detailed operation context
- Supports distributed tracing
### Metric Export

- Uses OTLP (OpenTelemetry Protocol)
- `PeriodicExportingMetricReader` (60-second intervals)
- Cumulative aggregation temporality
- Supports standard monitoring systems
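Both exporters can be reproduced with the stock OpenTelemetry Python SDK. A sketch of the equivalent wiring (DataBridge performs this setup internally; the collector endpoint is a placeholder):

```python
from opentelemetry import trace, metrics
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

endpoint = "http://localhost:4317"  # placeholder; point at your collector

# Traces: batched OTLP export
tracer_provider = TracerProvider()
tracer_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(endpoint=endpoint)))
trace.set_tracer_provider(tracer_provider)

# Metrics: periodic OTLP export on a 60-second interval
reader = PeriodicExportingMetricReader(
    OTLPMetricExporter(endpoint=endpoint),
    export_interval_millis=60_000,
)
metrics.set_meter_provider(MeterProvider(metric_readers=[reader]))
```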
Example of custom usage tracking:
```python
# Metrics are automatically exported to your configured backend;
# no additional code is required for basic monitoring.

# For custom metric tracking:
from databridge import DataBridge
from datetime import datetime, timedelta

db = DataBridge("your-uri")

# Get recent usage metrics
since = datetime.now() - timedelta(days=7)
usage_records = db.get_recent_usage(
    user_id="user123",
    operation_type="completion",
    since=since,
    status="success"
)

# Analyze usage patterns
for record in usage_records:
    print(f"Operation: {record.operation_type}")
    print(f"Tokens: {record.tokens_used}")
    print(f"Duration: {record.duration_ms}ms")
    print(f"Status: {record.status}")
```
## Usage Analytics
Track detailed usage patterns programmatically:
```python
from datetime import datetime, timedelta

# Get usage breakdown by operation type
token_usage = db.get_user_usage("user123")
print(f"Completion tokens: {token_usage['completion']}")
print(f"Embedding tokens: {token_usage['embedding']}")

# Get detailed usage records
records = db.get_recent_usage(
    user_id="user123",
    operation_type="search",
    since=datetime.now() - timedelta(hours=24)
)

# Analyze patterns
for record in records:
    print(f"Timestamp: {record.timestamp}")
    print(f"Operation: {record.operation_type}")
    print(f"Tokens: {record.tokens_used}")
    print(f"Duration: {record.duration_ms}ms")
    print(f"Metadata: {record.metadata}")
```
## Health Checks
Implement comprehensive health monitoring:
```python
import asyncio
import logging

from databridge import DataBridge

logger = logging.getLogger(__name__)
db = DataBridge("your-uri")

# Check system health (from within an async context)
health = await db.health_check()
print(f"System status: {health['status']}")
print(f"Components: {health['components']}")

# Implement periodic health checks
async def monitor_health():
    while True:
        try:
            health = await db.health_check()
            if health['status'] != 'healthy':
                logger.error(f"Unhealthy status: {health}")
        except Exception as e:
            logger.error(f"Health check failed: {e}")
        await asyncio.sleep(60)  # Check every minute
```
## Common Monitoring Patterns

### Operation Tracking
```python
from opentelemetry.trace import Status, StatusCode

async with db.track_operation(
    operation_type="search",
    user_id="user123",
    metadata={"query_type": "semantic"}
) as span:
    try:
        results = await db.retrieve_chunks("query")
        span.set_attribute("results_count", len(results))
    except Exception as e:
        span.set_status(Status(StatusCode.ERROR))
        span.record_exception(e)
        raise
```
### Usage Monitoring

```python
from collections import defaultdict

class UsageMonitor:
    def __init__(self, db):
        self.db = db
        self.operation_counts = defaultdict(int)
        self.token_counts = defaultdict(int)

    def track_operation(self, operation_type, user_id):
        # Count the operation, then hand back the underlying context
        # manager so callers can still use `async with`
        self.operation_counts[operation_type] += 1
        return self.db.track_operation(operation_type, user_id)

    def track_tokens(self, operation_type, tokens):
        self.token_counts[operation_type] += tokens

    def get_metrics(self):
        return {
            "operations": dict(self.operation_counts),
            "tokens": dict(self.token_counts)
        }

monitor = UsageMonitor(db)
```
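Because `track_operation` delegates to the underlying context manager, the monitor is used exactly like `db.track_operation`:

```python
async with monitor.track_operation("search", "user123") as span:
    results = await db.retrieve_chunks("query")
    span.set_attribute("results_count", len(results))

print(monitor.get_metrics())
# {'operations': {'search': 1}, 'tokens': {}}
```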
### Performance Tracking

```python
import time
import logging
from collections import defaultdict

logger = logging.getLogger(__name__)

class PerformanceTracker:
    def __init__(self):
        self.latencies = defaultdict(list)

    async def track_operation(self, operation_type, func):
        start_time = time.time()
        try:
            result = await func()
            duration = (time.time() - start_time) * 1000  # milliseconds
            self.latencies[operation_type].append(duration)
            return result
        except Exception as e:
            logger.error(f"Operation failed: {operation_type}, {e}")
            raise

    def get_average_latency(self, operation_type):
        latencies = self.latencies[operation_type]
        return sum(latencies) / len(latencies) if latencies else 0

tracker = PerformanceTracker()
```
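Usage is a matter of wrapping the call in a zero-argument callable, reusing the `db` instance from earlier:

```python
results = await tracker.track_operation(
    "search",
    lambda: db.retrieve_chunks("query"),
)
print(f"Average search latency: {tracker.get_average_latency('search'):.1f}ms")
```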
## Next Steps
After setting up monitoring:
- Configure alerting thresholds
- Set up visualization dashboards
- Implement automated health checks
- Monitor usage patterns
- Optimize based on metrics