This guide explains how to monitor your DataBridge usage, track performance metrics, and set up observability for your applications.
## OpenTelemetry Integration

DataBridge uses OpenTelemetry for distributed tracing and metrics collection, providing detailed insight into system performance and behavior.

### Configuration

DataBridge automatically configures OpenTelemetry based on your environment:
```python
from databridge import DataBridge

# Development mode (writes to local files)
db = DataBridge("your-uri", is_local=True)
# Writes to:
# - logs/telemetry/traces.log
# - logs/telemetry/metrics.log

# Production mode (uses OTLP exporters)
db = DataBridge("your-uri")
# Exports to the configured OpenTelemetry collector
```
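In production mode, the export target is typically set before the client is created. This is a minimal sketch assuming the OTLP exporters honor the standard OpenTelemetry environment variables and that a collector is reachable at the example address; adjust for your deployment.

```python
import os

# Assumption: the OTLP exporters read the standard OpenTelemetry
# environment variables; the collector address below is illustrative.
os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = "http://otel-collector:4317"

from databridge import DataBridge

db = DataBridge("your-uri")  # traces and metrics now flow to the collector above
```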
### Resource Attributes

All telemetry data includes standard resource attributes:

- `service.name`: `"databridge-core"`
- Custom attributes for your application
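For orientation, this is roughly how such a resource is constructed with the OpenTelemetry SDK; DataBridge does this internally, and the `deployment.environment` entry is only a hypothetical example of a custom attribute:

```python
from opentelemetry.sdk.resources import Resource

# Illustrative only: DataBridge builds its resource for you;
# "deployment.environment" stands in for any custom attribute you add.
resource = Resource.create({
    "service.name": "databridge-core",
    "deployment.environment": "production",
})
print(resource.attributes)
```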
## Traces

Every operation in DataBridge is automatically traced, including:

- Document ingestion
- Semantic search queries
- Completion requests
- Authentication flows

Each trace includes:

- Operation type
- User ID
- Duration
- Status (success/error)
- Custom metadata

Example of using traces:
```python
from databridge import DataBridge

db = DataBridge("your-uri")

# Traces are automatically collected
async with db.track_operation("search", user_id="user123") as span:
    # Add custom attributes to the trace
    span.set_attribute("query_type", "semantic")

    # Perform operation
    results = await db.retrieve_chunks("query")

    # Record result metadata
    span.set_attribute("results_count", len(results))

# Token usage is automatically tracked
response = await db.query("What are the insights?")
print(f"Tokens used: {response.usage['total_tokens']}")

# Get user-specific usage statistics
usage_stats = db.get_user_usage("user123")
print(f"Total tokens: {usage_stats['total']}")
```
## Local Development Mode

When running in local development mode (`is_local=True`), telemetry data is written to log files:
```python
from databridge import DataBridge

# Enable local development mode
db = DataBridge("your-uri", is_local=True)

# Telemetry data will be written to:
# - logs/telemetry/traces.log: detailed operation traces
# - logs/telemetry/metrics.log: system metrics (exported every minute)
```
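A quick way to inspect recent activity during development is to read the trace log directly. The sketch below assumes one JSON object per line, in line with the format described in the next section; adjust the parsing if your log files differ:

```python
import json
from pathlib import Path

trace_log = Path("logs/telemetry/traces.log")

# Print the last 20 trace entries, skipping anything that isn't valid JSON.
for line in trace_log.read_text().splitlines()[-20:]:
    try:
        print(json.loads(line))
    except json.JSONDecodeError:
        continue
```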
### Local Trace Format

Traces are written as JSON objects with the following structure:
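The exact schema depends on your DataBridge version; based on the trace contents listed above (operation type, user ID, duration, status, and custom metadata), an entry looks roughly like this illustrative example:

```python
# Illustrative only -- field names may differ from the actual log schema.
example_trace_entry = {
    "name": "search",
    "attributes": {
        "user_id": "user123",
        "query_type": "semantic",
        "results_count": 10,
    },
    "status": "success",
    "duration_ms": 142,
    "timestamp": "2024-01-01T12:00:00Z",
}
```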
## Metrics

Metrics are automatically exported to your configured backend, so no additional code is required for basic monitoring. For custom metric tracking, query the usage records directly:

```python
from datetime import datetime, timedelta

from databridge import DataBridge

db = DataBridge("your-uri")

# Get recent usage metrics
since = datetime.now() - timedelta(days=7)
usage_records = db.get_recent_usage(
    user_id="user123",
    operation_type="completion",
    since=since,
    status="success",
)

# Analyze usage patterns
for record in usage_records:
    print(f"Operation: {record.operation_type}")
    print(f"Tokens: {record.tokens_used}")
    print(f"Duration: {record.duration_ms}ms")
    print(f"Status: {record.status}")
```
## Usage Analytics

Track detailed usage patterns programmatically:
```python
from datetime import datetime, timedelta

from databridge import DataBridge

db = DataBridge("your-uri")

# Get usage breakdown by operation type
token_usage = db.get_user_usage("user123")
print(f"Completion tokens: {token_usage['completion']}")
print(f"Embedding tokens: {token_usage['embedding']}")

# Get detailed usage records
records = db.get_recent_usage(
    user_id="user123",
    operation_type="search",
    since=datetime.now() - timedelta(hours=24),
)

# Analyze patterns
for record in records:
    print(f"Timestamp: {record.timestamp}")
    print(f"Operation: {record.operation_type}")
    print(f"Tokens: {record.tokens_used}")
    print(f"Duration: {record.duration_ms}ms")
    print(f"Metadata: {record.metadata}")
```
## Health Checks

Implement comprehensive health monitoring:
```python
import asyncio
import logging

from databridge import DataBridge

logger = logging.getLogger(__name__)
db = DataBridge("your-uri")

# Check system health
health = await db.health_check()
print(f"System status: {health['status']}")
print(f"Components: {health['components']}")

# Implement periodic health checks
async def monitor_health():
    while True:
        try:
            health = await db.health_check()
            if health["status"] != "healthy":
                logger.error(f"Unhealthy status: {health}")
        except Exception as e:
            logger.error(f"Health check failed: {e}")
        await asyncio.sleep(60)  # Check every minute
```
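To run the periodic check alongside your application, schedule it on the event loop. Using `asyncio.create_task` is just one option and assumes your application already runs inside asyncio:

```python
import asyncio

async def main():
    # Run the health monitor in the background while the application works.
    monitor_task = asyncio.create_task(monitor_health())
    try:
        ...  # your application logic
    finally:
        monitor_task.cancel()

asyncio.run(main())
```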
## Common Monitoring Patterns

### Operation Tracking
```python
from opentelemetry.trace import Status, StatusCode

from databridge import DataBridge

db = DataBridge("your-uri")

async with db.track_operation(
    operation_type="search",
    user_id="user123",
    metadata={"query_type": "semantic"},
) as span:
    try:
        results = await db.retrieve_chunks("query")
        span.set_attribute("results_count", len(results))
    except Exception as e:
        span.set_status(Status(StatusCode.ERROR))
        span.record_exception(e)
        raise
```
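If many operations are tracked the same way, it can help to wrap this pattern once. The decorator below is a sketch built only on `db.track_operation` as shown above; the name `traced_operation` and the fixed `user_id` argument are assumptions for illustration:

```python
import functools

def traced_operation(db, operation_type: str, user_id: str):
    """Sketch: wrap an async function in db.track_operation with error recording."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            async with db.track_operation(operation_type, user_id=user_id) as span:
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    span.set_status(Status(StatusCode.ERROR))
                    span.record_exception(e)
                    raise
        return wrapper
    return decorator

@traced_operation(db, "search", user_id="user123")
async def search_documents(query: str):
    return await db.retrieve_chunks(query)
```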