Monitoring & Observability

This guide explains how to monitor your DataBridge usage, track performance metrics, and set up observability for your applications.

OpenTelemetry Integration

DataBridge uses OpenTelemetry for distributed tracing and metrics collection, so traces and metrics can be inspected locally or in any backend that accepts OpenTelemetry data.

Configuration

DataBridge automatically configures OpenTelemetry based on your environment:

from databridge import DataBridge

# Development mode (writes to local files)
db = DataBridge("your-uri", is_local=True)
# Writes to:
# - logs/telemetry/traces.log
# - logs/telemetry/metrics.log

# Production mode (uses OTLP exporters)
db = DataBridge("your-uri")
# Exports to configured OpenTelemetry collector
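
In production mode the collector address is usually supplied through the environment rather than in code. The sketch below sets OTEL_EXPORTER_OTLP_ENDPOINT, the standard variable read by OpenTelemetry's OTLP exporters. That DataBridge honors this variable is an assumption, and the URL is a placeholder for a local collector, so check your deployment's configuration before relying on it.

```python
import os

# Assumption: DataBridge's OTLP exporters follow the standard OpenTelemetry
# environment variables. The URL is a placeholder for a local collector.
os.environ.setdefault("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317")

# Set the variable before constructing the client so the exporters can see it:
# db = DataBridge("your-uri")
print(os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"])
```

Using setdefault keeps any value already provided by the deployment environment.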

Resource Attributes

All telemetry data includes standard resource attributes:

  • service.name: "databridge-core"

  • Custom attributes for your application

Traces

Every operation in DataBridge is automatically traced, including:

  • Document ingestion

  • Semantic search queries

  • Completion requests

  • Authentication flows

Each trace includes:

  • Operation type

  • User ID

  • Duration

  • Status (success/error)

  • Custom metadata

Example of using traces:

import asyncio

from databridge import DataBridge

db = DataBridge("your-uri")

async def main():
    # Traces are automatically collected
    async with db.track_operation("search", user_id="user123") as span:
        # Add custom attributes to the trace
        span.set_attribute("query_type", "semantic")

        # Perform operation
        results = await db.retrieve_chunks("query")

        # Record result metadata
        span.set_attribute("results_count", len(results))

asyncio.run(main())

Metrics

DataBridge automatically collects key metrics:

  1. Operation Counts (databridge.operations)

    • Description: "Number of operations performed"

    • Tagged with operation type and status

    • Counter metric type

    • Helps monitor usage patterns

  2. Token Usage (databridge.tokens)

    • Description: "Number of tokens processed"

    • Tagged with operation type

    • Counter metric type

    • Essential for cost monitoring

  3. Operation Duration (databridge.operation.duration)

    • Description: "Duration of operations"

    • Unit: milliseconds

    • Tagged with operation type

    • Histogram metric type

    • Helps identify performance issues
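
To make the histogram metric type concrete, the sketch below shows how individual durations are reduced to bucket counts before export. The bucket boundaries are hypothetical and are not DataBridge's actual configuration. Each bucket includes its upper bound, as in OpenTelemetry's explicit-bucket histograms.

```python
from bisect import bisect_left

# Hypothetical bucket upper bounds in milliseconds (not DataBridge's real ones).
BOUNDS_MS = [50, 100, 250, 500, 1000]


def bucket_durations(durations_ms, bounds=BOUNDS_MS):
    """Count how many durations fall into each bucket.

    Bucket i holds values greater than bounds[i - 1] and at most bounds[i];
    the final bucket collects everything above the last bound.
    """
    counts = [0] * (len(bounds) + 1)
    for duration in durations_ms:
        counts[bisect_left(bounds, duration)] += 1
    return counts


sample = [12, 48, 75, 230, 260, 1200]
counts = bucket_durations(sample)
print(counts)
```

Because only bucket counts are exported, a histogram gives an approximate latency distribution at a fixed cost regardless of how many operations ran.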

Example of accessing metrics:

# Token usage is automatically tracked (call this from within an async function)
response = await db.query("What are the insights?")
print(f"Tokens used: {response.usage['total_tokens']}")

# Get user-specific usage statistics
usage_stats = db.get_user_usage("user123")
print(f"Total tokens: {usage_stats['total']}")

Local Development Mode

When running in local development mode (is_local=True), telemetry data is written to log files:

from databridge import DataBridge

# Enable local development mode
db = DataBridge("your-uri", is_local=True)

# Telemetry data will be written to:
# - logs/telemetry/traces.log: Detailed operation traces
# - logs/telemetry/metrics.log: System metrics (exported every minute)

Local Trace Format

Traces are written as JSON objects with the following structure:

{
    "name": "operation_name",
    "trace_id": "hex_trace_id",
    "span_id": "hex_span_id",
    "parent_id": "hex_parent_id",
    "start_time": "timestamp",
    "end_time": "timestamp",
    "attributes": {
        "operation.type": "search",
        "user.id": "user123",
        "metadata.query_type": "semantic"
    },
    "status": "OK"
}
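
A trace log in this shape can be summarized with a few lines of standard-library code. The sketch below assumes one JSON object per line and ISO 8601 timestamps. Neither is confirmed by the format above, where the timestamp fields are placeholders, so adjust the parsing to match your actual log.

```python
import json
from datetime import datetime


def summarize_spans(lines):
    """Return (name, duration_ms, status) for each JSON span line."""
    summary = []
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        span = json.loads(line)
        # Assumption: timestamps are ISO 8601 strings.
        start = datetime.fromisoformat(span["start_time"])
        end = datetime.fromisoformat(span["end_time"])
        duration_ms = (end - start).total_seconds() * 1000
        summary.append((span["name"], duration_ms, span["status"]))
    return summary


# Illustrative span; in practice pass an open file such as logs/telemetry/traces.log.
sample = [
    '{"name": "search", "trace_id": "a1", "span_id": "b1", "parent_id": null, '
    '"start_time": "2024-01-01T12:00:00.000000+00:00", '
    '"end_time": "2024-01-01T12:00:00.250000+00:00", '
    '"attributes": {"operation.type": "search", "user.id": "user123"}, '
    '"status": "OK"}',
]
for name, duration_ms, status in summarize_spans(sample):
    print(f"{name}: {duration_ms:.0f} ms ({status})")
```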

Local Metric Format

Metrics are written as JSON objects with the following structure:

{
    "name": "databridge.operations",
    "description": "Number of operations performed",
    "unit": "",
    "data": {
        "data_points": [
            {
                "attributes": {
                    "operation": "search",
                    "status": "success"
                },
                "value": 1,
                "timestamp": "timestamp"
            }
        ]
    }
}
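
The metric log can be read the same way. The sketch below collects the most recent value per operation and status for one metric. It assumes one JSON object per line and cumulative values, so the latest data point already holds the running total. If your log reports per-interval values instead, sum them rather than overwriting.

```python
import json


def latest_totals(lines, metric_name="databridge.operations"):
    """Return the latest value per (operation, status) for one metric."""
    totals = {}
    for line in lines:
        if not line.strip():
            continue  # skip blank lines
        metric = json.loads(line)
        if metric["name"] != metric_name:
            continue
        for point in metric["data"]["data_points"]:
            attrs = point["attributes"]
            key = (attrs.get("operation"), attrs.get("status"))
            # Assumption: values are cumulative, so a later line replaces an earlier one.
            totals[key] = point["value"]
    return totals


def make_line(value):
    """Build an illustrative metric line in the format shown above."""
    return json.dumps({
        "name": "databridge.operations",
        "description": "Number of operations performed",
        "unit": "",
        "data": {"data_points": [{
            "attributes": {"operation": "search", "status": "success"},
            "value": value,
            "timestamp": "timestamp",
        }]},
    })


totals = latest_totals([make_line(1), make_line(3)])
print(totals)
```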

Production Monitoring

In production environments, DataBridge exports telemetry data through OpenTelemetry exporters:

  1. Trace Export

    • Uses OTLP (OpenTelemetry Protocol)

    • BatchSpanProcessor for efficient export

    • Includes detailed operation context

    • Supports distributed tracing

  2. Metric Export

    • Uses OTLP (OpenTelemetry Protocol)

    • PeriodicExportingMetricReader (60-second intervals)

    • Cumulative aggregation temporality

    • Supports standard monitoring systems
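
Cumulative aggregation temporality means each export carries the running total since the process started, not the change since the previous export. Backends that chart per-interval rates derive them by subtracting consecutive readings. A minimal sketch of that conversion follows, treating any decrease as a counter reset such as a process restart. The sample readings are made up.

```python
def cumulative_to_deltas(samples):
    """Convert cumulative counter readings into per-interval increases."""
    deltas = []
    previous = None
    for value in samples:
        if previous is None:
            previous = value
            continue  # the first reading has nothing to compare against
        if value < previous:
            # The counter restarted from zero, so the new reading is the increase.
            deltas.append(value)
        else:
            deltas.append(value - previous)
        previous = value
    return deltas


readings = [10, 25, 25, 40, 5, 12]  # one reading per 60-second export
deltas = cumulative_to_deltas(readings)
print(deltas)
```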

Example of querying recent usage records:

# Metrics are automatically exported to your configured backend
# No additional code required for basic monitoring

# For custom metric tracking:
from databridge import DataBridge
from datetime import datetime, timedelta

db = DataBridge("your-uri")

# Get recent usage metrics
since = datetime.now() - timedelta(days=7)
usage_records = db.get_recent_usage(
    user_id="user123",
    operation_type="completion",
    since=since,
    status="success"
)

# Analyze usage patterns
for record in usage_records:
    print(f"Operation: {record.operation_type}")
    print(f"Tokens: {record.tokens_used}")
    print(f"Duration: {record.duration_ms}ms")
    print(f"Status: {record.status}")

Usage Analytics

Track detailed usage patterns programmatically:

# Get usage breakdown by operation type
token_usage = db.get_user_usage("user123")
print(f"Completion tokens: {token_usage['completion']}")
print(f"Embedding tokens: {token_usage['embedding']}")

# Get detailed usage records
records = db.get_recent_usage(
    user_id="user123",
    operation_type="search",
    since=datetime.now() - timedelta(hours=24)
)

# Analyze patterns
for record in records:
    print(f"Timestamp: {record.timestamp}")
    print(f"Operation: {record.operation_type}")
    print(f"Tokens: {record.tokens_used}")
    print(f"Duration: {record.duration_ms}ms")
    print(f"Metadata: {record.metadata}")
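
Printing records one by one becomes unwieldy for long time windows, so it often helps to aggregate them per operation type. The sketch below uses a hypothetical UsageRecord dataclass as a stand-in for the objects returned by get_recent_usage. It relies only on the operation_type, tokens_used, duration_ms, and status fields shown above.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class UsageRecord:
    """Hypothetical stand-in for a record returned by get_recent_usage."""
    operation_type: str
    tokens_used: int
    duration_ms: float
    status: str


def summarize_usage(records):
    """Aggregate count, tokens, average duration, and error rate per operation."""
    groups = defaultdict(list)
    for record in records:
        groups[record.operation_type].append(record)

    summary = {}
    for operation_type, group in groups.items():
        errors = sum(1 for r in group if r.status != "success")
        summary[operation_type] = {
            "count": len(group),
            "tokens": sum(r.tokens_used for r in group),
            "avg_duration_ms": sum(r.duration_ms for r in group) / len(group),
            "error_rate": errors / len(group),
        }
    return summary


sample_records = [
    UsageRecord("search", 100, 120.0, "success"),
    UsageRecord("search", 50, 80.0, "error"),
    UsageRecord("completion", 400, 900.0, "success"),
]
summary = summarize_usage(sample_records)
for operation_type, stats in summary.items():
    print(operation_type, stats)
```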

Health Checks

Implement comprehensive health monitoring:

import asyncio
import logging

from databridge import DataBridge

logger = logging.getLogger(__name__)
db = DataBridge("your-uri")

# Check system health once
async def check_health():
    health = await db.health_check()
    print(f"System status: {health['status']}")
    print(f"Components: {health['components']}")

# Implement periodic health checks
async def monitor_health():
    while True:
        try:
            health = await db.health_check()
            if health['status'] != 'healthy':
                logger.error(f"Unhealthy status: {health}")
        except Exception as e:
            logger.error(f"Health check failed: {e}")
        await asyncio.sleep(60)  # Check every minute
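
A periodic check can stall if the health call itself hangs, so it is worth bounding each call with a timeout. The sketch below wraps any awaitable health check with asyncio.wait_for. The fake_healthy and fake_hanging coroutines are illustrative stand-ins, not part of DataBridge. In real use you would pass db.health_check.

```python
import asyncio


async def check_with_timeout(health_check, timeout_s=5.0):
    """Return True only if health_check reports 'healthy' within timeout_s."""
    try:
        health = await asyncio.wait_for(health_check(), timeout=timeout_s)
    except Exception:
        return False  # a timeout or any other failure counts as unhealthy
    return health.get("status") == "healthy"


async def fake_healthy():
    """Illustrative stand-in that answers immediately."""
    return {"status": "healthy", "components": {}}


async def fake_hanging():
    """Illustrative stand-in that never answers in time."""
    await asyncio.sleep(10)
    return {"status": "healthy", "components": {}}


async def main():
    ok = await check_with_timeout(fake_healthy, timeout_s=0.1)
    hung = await check_with_timeout(fake_hanging, timeout_s=0.1)
    return ok, hung


result = asyncio.run(main())
print(result)
```

Treating a timeout as unhealthy keeps the monitoring loop on schedule even when the service stops responding.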

Common Monitoring Patterns

  1. Operation Tracking

from opentelemetry.trace import Status, StatusCode

async with db.track_operation(
    operation_type="search",
    user_id="user123",
    metadata={"query_type": "semantic"}
) as span:
    try:
        results = await db.retrieve_chunks("query")
        span.set_attribute("results_count", len(results))
    except Exception as e:
        span.set_status(Status(StatusCode.ERROR))
        span.record_exception(e)
        raise

  2. Usage Monitoring

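from collections import defaultdict
from contextlib import asynccontextmanager

class UsageMonitor:
    def __init__(self, db):
        self.db = db
        self.operation_counts = defaultdict(int)
        self.token_counts = defaultdict(int)

    @asynccontextmanager
    async def track_operation(self, operation_type, user_id):
        # Yield the span while it is still open; returning it from inside the
        # `async with` block would hand back a span that has already ended.
        # Usage: async with monitor.track_operation("search", "user123") as span:
        async with self.db.track_operation(operation_type, user_id) as span:
            self.operation_counts[operation_type] += 1
            yield span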
    
    def track_tokens(self, operation_type, tokens):
        self.token_counts[operation_type] += tokens
    
    def get_metrics(self):
        return {
            "operations": dict(self.operation_counts),
            "tokens": dict(self.token_counts)
        }

monitor = UsageMonitor(db)

  3. Performance Tracking

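import logging
import time
from collections import defaultdict

logger = logging.getLogger(__name__)

class PerformanceTracker:
    def __init__(self):
        self.latencies = defaultdict(list)

    async def track_operation(self, operation_type, func):
        # perf_counter is monotonic, so durations are unaffected by clock changes
        start_time = time.perf_counter()
        try:
            result = await func()
            duration = (time.perf_counter() - start_time) * 1000  # milliseconds
            self.latencies[operation_type].append(duration)
            return result
        except Exception as e:
            logger.error(f"Operation failed: {operation_type}, {e}")
            raise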
    
    def get_average_latency(self, operation_type):
        latencies = self.latencies[operation_type]
        return sum(latencies) / len(latencies) if latencies else 0

tracker = PerformanceTracker()
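
Averages hide slow outliers, so percentiles are usually a better signal for latency. The sketch below computes p50, p95, and p99 from a list of millisecond latencies, such as one of the lists collected by the tracker above, using statistics.quantiles from the standard library. The function name is illustrative.

```python
import statistics


def latency_percentiles(latencies_ms):
    """Return p50, p95, and p99, or None when there are fewer than two samples."""
    if len(latencies_ms) < 2:
        return None  # statistics.quantiles needs at least two data points
    # n=100 yields 99 cut points; cut point k (1-based) is the k-th percentile.
    cuts = statistics.quantiles(latencies_ms, n=100, method="inclusive")
    return {"p50": cuts[49], "p95": cuts[94], "p99": cuts[98]}


sample_latencies = [40, 42, 45, 47, 51, 55, 60, 75, 120, 900]
print(latency_percentiles(sample_latencies))
```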

Next Steps

After setting up monitoring:

  1. Configure alerting thresholds

  2. Set up visualization dashboards

  3. Implement automated health checks

  4. Monitor usage patterns

  5. Optimize based on metrics
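
As a starting point for the first step, alert thresholds can be expressed as a plain mapping and checked against current values. The metric names and limits below are hypothetical examples, not DataBridge defaults.

```python
def evaluate_alerts(metrics, thresholds):
    """Return the names of metrics whose current value exceeds its threshold."""
    return sorted(
        name
        for name, limit in thresholds.items()
        if metrics.get(name, 0) > limit
    )


# Hypothetical thresholds and current readings.
thresholds = {"error_rate": 0.05, "p95_latency_ms": 1000, "tokens_per_hour": 500_000}
current = {"error_rate": 0.08, "p95_latency_ms": 640, "tokens_per_hour": 120_000}

breached = evaluate_alerts(current, thresholds)
print(breached)
```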
