SDK Reference

DataBridge provides both synchronous and asynchronous clients, so you can use whichever fits your application's concurrency model.

Synchronous Client

Initialization

from databridge import DataBridge

# Create client instance
db = DataBridge(
    uri="databridge://dev_123:token@localhost:8000",
    timeout=30,  # Optional request timeout in seconds
    is_local=True  # Set True for local development (disables SSL verification)
)

# Or use as context manager (recommended)
with DataBridge(uri) as db:
    doc = db.ingest_text("Hello, DataBridge!")

Document Operations

ingest_text()

Ingest a text document.

def ingest_text(
    content: str,
    metadata: Optional[Dict[str, Any]] = None
) -> Document

Parameters:

  • content: Text content to ingest

  • metadata: Optional dictionary of metadata

Returns: Document object with the following fields:

  • external_id: Unique document identifier

  • content_type: Content type (always "text/plain" for text)

  • filename: Always None for text documents

  • metadata: User-provided metadata dictionary

  • storage_info: Empty for text documents

  • system_metadata: System-managed metadata (created_at, updated_at, version)

  • access_control: Access control lists (readers, writers, admins)

  • chunk_ids: List of chunk identifiers

Example:

doc = db.ingest_text(
    content="Machine learning is transforming industries...",
    metadata={
        "title": "ML Overview",
        "category": "tech",
        "tags": ["ml", "ai"],
        "author": "user123"
    }
)
print(f"Document ID: {doc.external_id}")
print(f"Created at: {doc.system_metadata['created_at']}")

ingest_file()

Ingest a file document.

def ingest_file(
    file: Union[str, bytes, BinaryIO, Path],
    filename: str,
    content_type: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None
) -> Document

Parameters:

  • file: File to ingest (path string, bytes, file object, or Path)

  • filename: Name of the file

  • content_type: MIME type (optional, will be guessed if not provided)

  • metadata: Optional dictionary of metadata

Returns: Document object with storage information

Examples:

# From file path
doc = db.ingest_file(
    file="report.pdf",
    filename="Q4_Report.pdf",
    content_type="application/pdf",
    metadata={
        "department": "Finance",
        "year": 2024,
        "quarter": 4
    }
)

# From file object
with open("presentation.pptx", "rb") as f:
    doc = db.ingest_file(
        file=f,
        filename="presentation.pptx"
    )

# Access storage info
print(f"File stored at: {doc.storage_info['bucket']}/{doc.storage_info['key']}")

query()

Generate AI completions using relevant document context.

def query(
    query: str,
    filters: Optional[Dict[str, Any]] = None,
    k: int = 4,
    min_score: float = 0.0,
    max_tokens: Optional[int] = None,
    temperature: Optional[float] = None
) -> CompletionResponse

Parameters:

  • query: The question or prompt

  • filters: Optional metadata filters dictionary

  • k: Number of context chunks to use (default: 4)

  • min_score: Minimum similarity score threshold (default: 0.0)

  • max_tokens: Maximum tokens in completion

  • temperature: Sampling temperature for completion

Returns: CompletionResponse object with:

  • completion: Generated text response

  • usage: Token usage statistics (completion_tokens, prompt_tokens, total_tokens)

Example:

# Generate completion from context
response = db.query(
    "What are the main applications of machine learning in finance?",
    k=3,
    filters={"department": "Finance"},
    max_tokens=150,
    temperature=0.7
)

print("Answer:", response.completion)
print(f"Total tokens used: {response.usage.total_tokens}")

retrieve_chunks()

Search for relevant document chunks using semantic similarity.

def retrieve_chunks(
    query: str,
    filters: Optional[Dict[str, Any]] = None,
    k: int = 4,
    min_score: float = 0.0
) -> List[ChunkResult]

Parameters:

  • query: Search query text

  • filters: Optional metadata filters dictionary

  • k: Number of chunks to return (default: 4)

  • min_score: Minimum similarity score threshold (default: 0.0)

Returns: List of ChunkResult objects

Example:

# Search for relevant chunks
chunks = db.retrieve_chunks(
    "machine learning applications in finance",
    k=3,
    filters={"department": "Finance"}
)

for chunk in chunks:
    print(f"\nMatch (score: {chunk.score:.2f}):")
    print(chunk.content)
    print(f"From document: {chunk.document_id}")
    if chunk.download_url:
        print(f"Download URL: {chunk.download_url}")

retrieve_docs()

Search for relevant documents using semantic similarity.

def retrieve_docs(
    query: str,
    filters: Optional[Dict[str, Any]] = None,
    k: int = 4,
    min_score: float = 0.0
) -> List[DocumentResult]

Parameters: Same as retrieve_chunks()

Returns: List of DocumentResult objects

Example:

# Search for relevant documents
docs = db.retrieve_docs(
    "quarterly financial reports",
    filters={
        "department": "Finance",
        "year": 2024
    }
)

for doc in docs:
    print(f"\nDocument (score: {doc.score:.2f}):")
    print(f"ID: {doc.document_id}")
    print(f"Metadata: {doc.metadata}")
    print(f"Content Type: {doc.content_type}")
    if doc.download_url:
        print(f"Download URL: {doc.download_url}")

list_documents()

List accessible documents with pagination and filtering.

def list_documents(
    skip: int = 0,
    limit: int = 100,
    filters: Optional[Dict[str, Any]] = None
) -> List[Document]

Parameters:

  • skip: Number of documents to skip (default: 0)

  • limit: Maximum documents to return (default: 100)

  • filters: Optional metadata filters dictionary

Returns: List of Document objects

Example:

# Get first page of finance documents
docs = db.list_documents(
    limit=10,
    filters={"department": "Finance"}
)

# Get next page
next_page = db.list_documents(
    skip=10,
    limit=10,
    filters={"department": "Finance"}
)
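
To walk through every matching document, keep requesting pages until a call returns fewer results than the limit. A minimal sketch built on the skip/limit parameters documented above (variable names are illustrative):

# Collect all finance documents, 50 at a time
all_docs = []
skip = 0
page_size = 50
while True:
    page = db.list_documents(
        skip=skip,
        limit=page_size,
        filters={"department": "Finance"}
    )
    all_docs.extend(page)
    if len(page) < page_size:
        break  # last page reached
    skip += page_size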

get_document()

Get document metadata by ID.

def get_document(document_id: str) -> Document

Parameters:

  • document_id: The external ID of the document

Returns: Document object

Example:

doc = db.get_document("doc_abc123")
print(f"Title: {doc.metadata.get('title')}")
print(f"Created: {doc.system_metadata['created_at']}")
if doc.storage_info:
    print(f"Storage: {doc.storage_info['bucket']}/{doc.storage_info['key']}")

Asynchronous Client

The AsyncDataBridge client provides the same functionality as the synchronous client but with async/await support.

Initialization

from databridge import AsyncDataBridge

async def example():
    async with AsyncDataBridge(
        uri="databridge://dev_123:token@localhost:8000",
        timeout=30,
        is_local=True
    ) as db:
        doc = await db.ingest_text("Hello, DataBridge!")

Usage Example

import asyncio

from databridge import AsyncDataBridge

async def main():
    async with AsyncDataBridge(uri) as db:
        # Ingest a document
        doc = await db.ingest_text(
            content="Example content",
            metadata={"category": "test"}
        )
        
        # Generate completion
        response = await db.query(
            "What are the main points in the content?",
            k=3,
            max_tokens=150
        )
        print("Answer:", response.completion)
        
        # Search chunks
        chunks = await db.retrieve_chunks(
            "example search",
            k=3
        )
        
        # List documents
        docs = await db.list_documents(limit=10)

if __name__ == "__main__":
    asyncio.run(main())

Response Models

Document

class Document:
    external_id: str
    owner: Dict[str, str]
    content_type: str
    filename: Optional[str]
    metadata: Dict[str, Any]  # user-defined metadata
    storage_info: Dict[str, str]  # storage backend info
    system_metadata: Dict[str, Any]  # creation date, version, etc.
    additional_metadata: Dict[str, Any]  # e.g., frame descriptions and transcripts for videos
    access_control: Dict[str, List[str]]  # readers, writers, admins
    chunk_ids: List[str]
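
For example, the access_control and chunk_ids fields can be read directly off a returned Document. A short sketch using only the fields listed above:

doc = db.get_document("doc_abc123")

# Access control lists attached to the document
print("Readers:", doc.access_control["readers"])
print("Writers:", doc.access_control["writers"])
print("Admins:", doc.access_control["admins"])

# System-managed metadata and chunk inventory
print(f"Version: {doc.system_metadata['version']}")
print(f"Chunks: {len(doc.chunk_ids)}")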

ChunkResult

class ChunkResult:
    content: str
    score: float
    document_id: str  # external_id
    chunk_number: int
    metadata: Dict[str, Any]
    content_type: str
    filename: Optional[str]
    download_url: Optional[str]

    def augmented_content(self, doc: DocumentResult) -> str:
        """Get augmented content for video chunks with frame/transcript info"""

DocumentResult

class DocumentResult:
    score: float  # Highest chunk score
    document_id: str  # external_id
    metadata: Dict[str, Any]
    content: DocumentContent  # type and value fields
    additional_metadata: Dict[str, Any]  # e.g., frame descriptions and transcripts

DocumentContent

class DocumentContent:
    type: Literal["url", "string"]  # Content type
    value: str  # URL or actual content
    filename: Optional[str]  # Required for URL type, None for string type
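
When consuming a DocumentResult, branch on content.type: a "string" value holds the content itself, while a "url" value points at the stored file and must be fetched separately. A sketch (the requests call is illustrative and not part of the SDK):

import requests

for doc in db.retrieve_docs("quarterly financial reports", k=3):
    if doc.content.type == "string":
        print(doc.document_id, doc.content.value[:80])
    else:  # type == "url": download the stored file
        resp = requests.get(doc.content.value)
        resp.raise_for_status()
        with open(doc.content.filename, "wb") as f:
            f.write(resp.content)
        print(f"Saved {doc.content.filename} ({len(resp.content)} bytes)")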

CompletionResponse

class CompletionResponse:
    completion: str
    usage: TokenUsage  # completion_tokens, prompt_tokens, total_tokens

Error Handling

from databridge.exceptions import (
    DataBridgeError,      # Base exception
    AuthenticationError,  # Auth issues
    ConnectionError      # Network issues
)

# Synchronous error handling
try:
    doc = db.ingest_text("content")
except AuthenticationError:
    print("Authentication failed - check your token")
except ConnectionError:
    print("Connection failed - check server status")
except DataBridgeError as e:
    print(f"Operation failed: {str(e)}")

# Asynchronous error handling
async def example():
    try:
        doc = await db.ingest_text("content")
    except AuthenticationError:
        print("Authentication failed - check your token")
    except ConnectionError:
        print("Connection failed - check server status")
    except DataBridgeError as e:
        print(f"Operation failed: {str(e)}")
