Skip to content

RAG

Retriever

RAGRetriever

Bases: BaseModel

RAG Retriever combining embedding and vector store.

Provides a unified interface for:

- Adding documents (with automatic embedding)
- Retrieving relevant context for queries
- Chunking large documents

Example

from locus.rag import RAGRetriever, OCIEmbeddings, OracleVectorStore

retriever = RAGRetriever( ... embedder=OCIEmbeddings(model_id="cohere.embed-english-v3.0"), ... store=OracleVectorStore(dsn="..."), ... )

Add documents

await retriever.add_documents( ... [ ... "Python is a programming language.", ... "Oracle Database supports vectors.", ... ] ... )

Retrieve relevant context

results = await retriever.retrieve("What is Python?", limit=3) for r in results.documents: ... print(f"{r.score:.2f}: {r.document.content[:50]}...")

Example with chunking

retriever = RAGRetriever( ... embedder=embedder, ... store=store, ... chunk_size=500, ... chunk_overlap=50, ... ) await retriever.add_document(long_document, metadata={"source": "manual"})

add_document async

add_document(content: str, doc_id: str | None = None, metadata: dict[str, Any] | None = None, chunk: bool = True) -> list[str]

Add a document, optionally chunking it.

Parameters:

Name Type Description Default
content str

Document text

required
doc_id str | None

Optional document ID (auto-generated if not provided)

None
metadata dict[str, Any] | None

Optional metadata

None
chunk bool

Whether to chunk large documents

True

Returns:

Type Description
list[str]

List of document IDs (multiple if chunked)

Source code in src/locus/rag/retriever.py
async def add_document(
    self,
    content: str,
    doc_id: str | None = None,
    metadata: dict[str, Any] | None = None,
    chunk: bool = True,
) -> list[str]:
    """
    Add a document, optionally chunking it.

    Args:
        content: Document text
        doc_id: Optional document ID (auto-generated if not provided)
        metadata: Optional metadata
        chunk: Whether to chunk large documents

    Returns:
        List of document IDs (multiple if chunked)

    Raises:
        ValueError: If the embedder returns a different number of
            embeddings than chunks submitted.
    """
    base_id = doc_id or uuid4().hex
    base_metadata = metadata or {}

    if chunk and len(content) > self.chunk_size:
        chunks = self._chunk_text(content)
    else:
        chunks = [content]

    # Embed all chunks
    embeddings = await self.embedder.embed_documents(chunks)

    # Create documents. strict=True: with strict=False a count mismatch
    # between chunks and embeddings would silently drop trailing chunks.
    documents = []
    for i, (chunk_text, emb_result) in enumerate(zip(chunks, embeddings, strict=True)):
        chunk_id = f"{base_id}_{i}" if len(chunks) > 1 else base_id
        chunk_metadata = {
            **base_metadata,
            "chunk_index": i,
            "total_chunks": len(chunks),
            "parent_id": base_id,
        }

        doc = Document(
            id=chunk_id,
            content=chunk_text,
            embedding=emb_result.embedding,
            metadata=chunk_metadata,
        )
        documents.append(doc)

    # Store all documents
    added: list[str] = await self.store.add_batch(documents)
    return added

add_documents async

add_documents(contents: list[str], metadata: dict[str, Any] | None = None, chunk: bool = True) -> list[str]

Add multiple documents.

Parameters:

Name Type Description Default
contents list[str]

List of document texts

required
metadata dict[str, Any] | None

Optional metadata (applied to all)

None
chunk bool

Whether to chunk large documents

True

Returns:

Type Description
list[str]

List of all document IDs

Source code in src/locus/rag/retriever.py
async def add_documents(
    self,
    contents: list[str],
    metadata: dict[str, Any] | None = None,
    chunk: bool = True,
) -> list[str]:
    """
    Add multiple documents.

    Args:
        contents: List of document texts
        metadata: Optional metadata (applied to all)
        chunk: Whether to chunk large documents

    Returns:
        List of all document IDs
    """
    all_ids = []
    for content in contents:
        ids = await self.add_document(content, metadata=metadata, chunk=chunk)
        all_ids.extend(ids)
    return all_ids

add_file async

add_file(file_path: str | Path, doc_id: str | None = None, metadata: dict[str, Any] | None = None, chunk: bool = True) -> list[str]

Add a file (text, PDF, image, or audio).

Automatically detects content type and processes accordingly:

- PDFs: Extracts text (with OCR fallback for scanned docs)
- Images: OCR text extraction + optional description
- Audio: Speech-to-text transcription
- Text: Direct processing

Parameters:

Name Type Description Default
file_path str | Path

Path to the file

required
doc_id str | None

Optional document ID

None
metadata dict[str, Any] | None

Optional metadata

None
chunk bool

Whether to chunk large documents

True

Returns:

Type Description
list[str]

List of document IDs

Example

await retriever.add_file("manual.pdf") await retriever.add_file("diagram.png") await retriever.add_file("meeting.mp3")

Source code in src/locus/rag/retriever.py
async def add_file(
    self,
    file_path: str | Path,
    doc_id: str | None = None,
    metadata: dict[str, Any] | None = None,
    chunk: bool = True,
) -> list[str]:
    """
    Add a file (text, PDF, image, or audio).

    Automatically detects content type and processes accordingly:
    - PDFs: Extracts text (with OCR fallback for scanned docs)
    - Images: OCR text extraction + optional description
    - Audio: Speech-to-text transcription
    - Text: Direct processing

    Args:
        file_path: Path to the file
        doc_id: Optional document ID
        metadata: Optional metadata
        chunk: Whether to chunk large documents

    Returns:
        List of document IDs

    Raises:
        ValueError: If the embedder returns a different number of
            embeddings than chunks submitted.

    Example:
        >>> await retriever.add_file("manual.pdf")
        >>> await retriever.add_file("diagram.png")
        >>> await retriever.add_file("meeting.mp3")
    """
    from locus.rag.multimodal import MultimodalProcessor

    path = Path(file_path)
    processor = MultimodalProcessor()

    # Process the file
    result = await processor.process(path)

    # Create metadata with content type info
    file_metadata = {
        **(metadata or {}),
        "source_file": path.name,
        "content_type": result.content_type.value,
        **result.metadata,
    }

    # Add to store
    base_id = doc_id or uuid4().hex

    if chunk and len(result.text) > self.chunk_size:
        chunks = self._chunk_text(result.text)
    else:
        chunks = [result.text]

    # Embed all chunks
    embeddings = await self.embedder.embed_documents(chunks)

    # Create documents. strict=True: with strict=False a count mismatch
    # between chunks and embeddings would silently drop trailing chunks.
    documents = []
    for i, (chunk_text, emb_result) in enumerate(zip(chunks, embeddings, strict=True)):
        chunk_id = f"{base_id}_{i}" if len(chunks) > 1 else base_id
        chunk_metadata = {
            **file_metadata,
            "chunk_index": i,
            "total_chunks": len(chunks),
            "parent_id": base_id,
        }

        doc = Document(
            id=chunk_id,
            content=chunk_text,
            embedding=emb_result.embedding,
            metadata=chunk_metadata,
            content_type=result.content_type.value,
            raw_content=result.raw_content if i == 0 else None,  # Store raw only in first chunk
        )
        documents.append(doc)

    added: list[str] = await self.store.add_batch(documents)
    return added

add_image async

add_image(image: bytes | str | Path, doc_id: str | None = None, metadata: dict[str, Any] | None = None, use_ocr: bool = True) -> str

Add an image document.

Parameters:

Name Type Description Default
image bytes | str | Path

Image bytes, base64 string, or file path

required
doc_id str | None

Optional document ID

None
metadata dict[str, Any] | None

Optional metadata

None
use_ocr bool

Whether to use OCR for text extraction

True

Returns:

Type Description
str

Document ID

Source code in src/locus/rag/retriever.py
async def add_image(
    self,
    image: bytes | str | Path,
    doc_id: str | None = None,
    metadata: dict[str, Any] | None = None,
    use_ocr: bool = True,
) -> str:
    """
    Add an image document.

    The image is run through the ImageProcessor (optionally with OCR),
    the extracted text is embedded, and the resulting document is stored
    with the processor's raw content attached.

    Args:
        image: Image bytes, base64 string, or file path
        doc_id: Optional document ID
        metadata: Optional metadata
        use_ocr: Whether to use OCR for text extraction

    Returns:
        Document ID
    """
    from locus.rag.multimodal import ContentType, ImageProcessor

    processed = await ImageProcessor(use_ocr=use_ocr).process(image)

    # Embed the extracted text
    emb = await self.embedder.embed(processed.text)

    combined_meta = {**(metadata or {}), **processed.metadata}
    document = Document(
        id=doc_id or uuid4().hex,
        content=processed.text,
        embedding=emb.embedding,
        metadata=combined_meta,
        content_type=ContentType.IMAGE.value,
        raw_content=processed.raw_content,
    )

    stored_id: str = await self.store.add(document)
    return stored_id

add_pdf async

add_pdf(pdf: bytes | str | Path, doc_id: str | None = None, metadata: dict[str, Any] | None = None, chunk: bool = True) -> list[str]

Add a PDF document.

Parameters:

Name Type Description Default
pdf bytes | str | Path

PDF bytes, base64 string, or file path

required
doc_id str | None

Optional document ID

None
metadata dict[str, Any] | None

Optional metadata

None
chunk bool

Whether to chunk the document

True

Returns:

Type Description
list[str]

List of document IDs (multiple if chunked)

Source code in src/locus/rag/retriever.py
async def add_pdf(
    self,
    pdf: bytes | str | Path,
    doc_id: str | None = None,
    metadata: dict[str, Any] | None = None,
    chunk: bool = True,
) -> list[str]:
    """
    Add a PDF document.

    Extracts text via the PDFProcessor (OCR fallback enabled) and then
    delegates storage and chunking to ``add_document``.

    Args:
        pdf: PDF bytes, base64 string, or file path
        doc_id: Optional document ID
        metadata: Optional metadata
        chunk: Whether to chunk the document

    Returns:
        List of document IDs (multiple if chunked)
    """
    from locus.rag.multimodal import ContentType, PDFProcessor

    extraction = await PDFProcessor(use_ocr_fallback=True).process(pdf)

    merged_meta = {
        **(metadata or {}),
        **extraction.metadata,
        "content_type": ContentType.PDF.value,
    }
    return await self.add_document(
        extraction.text,
        doc_id=doc_id,
        metadata=merged_meta,
        chunk=chunk,
    )

add_audio async

add_audio(audio: bytes | str | Path, doc_id: str | None = None, metadata: dict[str, Any] | None = None) -> str

Add an audio/voice document.

Parameters:

Name Type Description Default
audio bytes | str | Path

Audio bytes, base64 string, or file path

required
doc_id str | None

Optional document ID

None
metadata dict[str, Any] | None

Optional metadata

None

Returns:

Type Description
str

Document ID

Source code in src/locus/rag/retriever.py
async def add_audio(
    self,
    audio: bytes | str | Path,
    doc_id: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> str:
    """
    Add an audio/voice document.

    The audio is transcribed by the AudioProcessor (Whisper enabled),
    the transcription is embedded, and the resulting document is stored
    with the processor's raw content attached.

    Args:
        audio: Audio bytes, base64 string, or file path
        doc_id: Optional document ID
        metadata: Optional metadata

    Returns:
        Document ID
    """
    from locus.rag.multimodal import AudioProcessor, ContentType

    transcript = await AudioProcessor(use_whisper=True).process(audio)

    # Embed the transcription
    emb = await self.embedder.embed(transcript.text)

    combined_meta = {**(metadata or {}), **transcript.metadata}
    document = Document(
        id=doc_id or uuid4().hex,
        content=transcript.text,
        embedding=emb.embedding,
        metadata=combined_meta,
        content_type=ContentType.AUDIO.value,
        raw_content=transcript.raw_content,
    )

    stored_id: str = await self.store.add(document)
    return stored_id

retrieve async

retrieve(query: str, limit: int = 5, threshold: float | None = None, metadata_filter: dict[str, Any] | None = None) -> RetrievalResult

Retrieve relevant documents for a query.

Parameters:

Name Type Description Default
query str

Query text

required
limit int

Maximum documents to return

5
threshold float | None

Minimum similarity score (0.0-1.0)

None
metadata_filter dict[str, Any] | None

Filter by metadata fields

None

Returns:

Type Description
RetrievalResult

RetrievalResult with ranked documents

Source code in src/locus/rag/retriever.py
async def retrieve(
    self,
    query: str,
    limit: int = 5,
    threshold: float | None = None,
    metadata_filter: dict[str, Any] | None = None,
) -> RetrievalResult:
    """
    Retrieve relevant documents for a query.

    Emits observability events around the search and reports the
    end-to-end duration on the completion event.

    Args:
        query: Query text
        limit: Maximum documents to return
        threshold: Minimum similarity score (0.0-1.0)
        metadata_filter: Filter by metadata fields

    Returns:
        RetrievalResult with ranked documents
    """
    import time as _time

    from locus.observability.emit import (  # noqa: PLC0415
        EV_RAG_QUERY_COMPLETED,
        EV_RAG_QUERY_STARTED,
        emit,
    )

    store_name = type(self.store).__name__

    await emit(
        EV_RAG_QUERY_STARTED,
        query_preview=query[:160],
        limit=limit,
        store_type=store_name,
        threshold=threshold,
    )
    t0 = _time.perf_counter()

    # Embed the query, then run the similarity search against the store.
    embedded = await self.embedder.embed_query(query)
    hits = await self.store.search(
        query_embedding=embedded.embedding,
        limit=limit,
        threshold=threshold,
        metadata_filter=metadata_filter,
    )

    await emit(
        EV_RAG_QUERY_COMPLETED,
        hit_count=len(hits),
        top_score=hits[0].score if hits else None,
        duration_ms=(_time.perf_counter() - t0) * 1000,
        store_type=store_name,
    )

    return RetrievalResult(
        documents=hits,
        query=query,
        total_results=len(hits),
    )

retrieve_text async

retrieve_text(query: str, limit: int = 5, threshold: float | None = None, separator: str = '\n\n---\n\n', spotlight: bool = True) -> str

Retrieve and concatenate relevant documents as text.

Convenience method for injecting context into prompts.

Parameters:

Name Type Description Default
query str

Query text

required
limit int

Maximum documents to return

5
threshold float | None

Minimum similarity score

None
separator str

Text to join documents

'\n\n---\n\n'
spotlight bool

When True (default), wrap each document in <retrieved_document>...</retrieved_document> markers so the LLM can distinguish untrusted retrieved data from trusted instructions. Disable only if the caller wraps content itself.

True

Returns:

Type Description
str

Concatenated document contents.

Security note

Retrieved content is untrusted data — a poisoned document can attempt an indirect prompt-injection. The spotlight wrappers let you instruct the model (in the system prompt) to treat anything inside those tags as data only, never as instructions, and to refuse to perform tool calls whose arguments are quoted verbatim from retrieved content.

Source code in src/locus/rag/retriever.py
async def retrieve_text(
    self,
    query: str,
    limit: int = 5,
    threshold: float | None = None,
    separator: str = "\n\n---\n\n",
    spotlight: bool = True,
) -> str:
    """
    Retrieve and concatenate relevant documents as text.

    Convenience method for injecting context into prompts.

    Args:
        query: Query text
        limit: Maximum documents to return
        threshold: Minimum similarity score
        separator: Text to join documents
        spotlight: When True (default), wrap each document in
            ``<retrieved_document>``...``</retrieved_document>`` markers so the
            LLM can distinguish untrusted retrieved data from trusted
            instructions. Disable only if the caller wraps content itself.

    Returns:
        Concatenated document contents.

    Security note:
        Retrieved content is **untrusted data** — a poisoned document can
        attempt an indirect prompt-injection. The spotlight wrappers let
        you instruct the model (in the system prompt) to treat anything
        inside those tags as data only, never as instructions, and to
        refuse to perform tool calls whose arguments are quoted verbatim
        from retrieved content.
    """
    result = await self.retrieve(query, limit=limit, threshold=threshold)
    contents = [r.document.content for r in result.documents]
    if spotlight:
        contents = [
            f"<retrieved_document>\n{_escape_spotlight(c)}\n</retrieved_document>"
            for c in contents
        ]
    return separator.join(contents)

delete_document async

delete_document(doc_id: str) -> bool

Delete a document by ID.

Source code in src/locus/rag/retriever.py
async def delete_document(self, doc_id: str) -> bool:
    """Delete a document by ID; returns the store's success flag."""
    removed: bool = await self.store.delete(doc_id)
    return removed

clear async

clear() -> int

Delete all documents.

Source code in src/locus/rag/retriever.py
async def clear(self) -> int:
    """Delete all documents; returns the count reported by the store."""
    removed_count: int = await self.store.clear()
    return removed_count

count async

count() -> int

Count documents in store.

Source code in src/locus/rag/retriever.py
async def count(self) -> int:
    """Return how many documents the underlying store holds."""
    total: int = await self.store.count()
    return total

close async

close() -> None

Close resources.

Source code in src/locus/rag/retriever.py
async def close(self) -> None:
    """Release resources by closing the underlying vector store."""
    await self.store.close()

as_tool

as_tool(name: str = 'search_knowledge', description: str | None = None) -> Any

Create a tool function for agent use.

Returns a tool that can be registered with an agent.

Parameters:

Name Type Description Default
name str

Tool name

'search_knowledge'
description str | None

Tool description

None

Returns:

Type Description
Any

Tool function decorated with @tool

Source code in src/locus/rag/retriever.py
def as_tool(self, name: str = "search_knowledge", description: str | None = None) -> Any:
    """
    Create a tool function for agent use.

    Returns a tool that can be registered with an agent.

    Args:
        name: Tool name
        description: Tool description

    Returns:
        Tool function decorated with @tool
    """
    from locus.rag.tools import create_rag_tool

    tool_fn = create_rag_tool(self, name=name, description=description)
    return tool_fn

Embeddings

OCIEmbeddings

OCIEmbeddings(model_id: str = OCIEmbeddingModel.COHERE_EMBED_ENGLISH_V3.value, compartment_id: str = '', profile_name: str = 'DEFAULT', auth_type: str = 'api_key', service_endpoint: str | None = None, **kwargs: Any)

Bases: BaseModel, BaseEmbedding

OCI GenAI Embeddings using Cohere models.

Uses Oracle Cloud Infrastructure GenAI service which hosts Cohere embedding models with enterprise-grade reliability.

Example

embedder = OCIEmbeddings( ... model_id="cohere.embed-english-v3.0", ... profile_name="DEFAULT", ... auth_type="security_token", ... ) result = await embedder.embed("Hello world") print(len(result.embedding)) # 1024

Example with compartment

embedder = OCIEmbeddings( ... model_id="cohere.embed-multilingual-v3.0", ... compartment_id="ocid1.compartment.oc1..xxx", ... )

Source code in src/locus/rag/embeddings/oci.py
def __init__(
    self,
    model_id: str = OCIEmbeddingModel.COHERE_EMBED_ENGLISH_V3.value,
    compartment_id: str = "",
    profile_name: str = "DEFAULT",
    auth_type: str = "api_key",
    service_endpoint: str | None = None,
    **kwargs: Any,
) -> None:
    """Assemble an OCIEmbeddingConfig from the arguments and initialize the model."""
    cfg = OCIEmbeddingConfig(
        model_id=model_id,
        compartment_id=compartment_id,
        profile_name=profile_name,
        auth_type=auth_type,
        service_endpoint=service_endpoint,
        **kwargs,
    )
    super().__init__(oci_config=cfg)

config property

config: EmbeddingConfig

Get embedding configuration.

capabilities property

capabilities: EmbeddingCapabilities

OCI Cohere embeddings: native batching (96), separate SEARCH_QUERY vs SEARCH_DOCUMENT input types, image-capable variants for cohere.embed-*-image-v3.0.

dimension property

dimension: int

Get embedding dimension.

embed async

embed(text: str) -> EmbeddingResult

Embed a single text.

Source code in src/locus/rag/embeddings/oci.py
async def embed(self, text: str) -> EmbeddingResult:
    """Embed a single text by delegating to the batch endpoint."""
    batch = await self.embed_batch([text])
    return batch[0]

embed_batch async

embed_batch(texts: list[str]) -> list[EmbeddingResult]

Embed multiple texts.

Source code in src/locus/rag/embeddings/oci.py
async def embed_batch(self, texts: list[str]) -> list[EmbeddingResult]:
    """Embed multiple texts in a single OCI embed_text request."""
    from oci.generative_ai_inference.models import (
        EmbedTextDetails,
        OnDemandServingMode,
    )

    client = await self._get_client()

    request = EmbedTextDetails(
        inputs=texts,
        serving_mode=OnDemandServingMode(model_id=self.oci_config.model_id),
        compartment_id=self._get_compartment_id(),
        truncate=self.oci_config.truncate,
        input_type=self.oci_config.input_type,
    )

    response = client.embed_text(request)
    vectors = response.data.embeddings

    # Pair each input text with its vector by position.
    return [
        EmbeddingResult(
            embedding=vectors[idx],
            text=txt,
            model=self.oci_config.model_id,
            tokens=None,  # OCI doesn't return token count
        )
        for idx, txt in enumerate(texts)
    ]

embed_query async

embed_query(query: str) -> EmbeddingResult

Embed a query for retrieval.

Uses SEARCH_QUERY input type for Cohere models.

Source code in src/locus/rag/embeddings/oci.py
async def embed_query(self, query: str) -> EmbeddingResult:
    """Embed a query for retrieval.

    Always sends the SEARCH_QUERY input type (query-specific for Cohere
    models), independent of the input type configured for documents.

    Args:
        query: Query text to embed.

    Returns:
        EmbeddingResult with the query vector.
    """
    # Removed dead code: the config's input_type was read into an unused
    # local; the frozen config is never modified — the request below simply
    # overrides input_type directly.
    from oci.generative_ai_inference.models import (
        EmbedTextDetails,
        OnDemandServingMode,
    )

    client = await self._get_client()

    embed_details = EmbedTextDetails(
        inputs=[query],
        serving_mode=OnDemandServingMode(model_id=self.oci_config.model_id),
        compartment_id=self._get_compartment_id(),
        truncate=self.oci_config.truncate,
        input_type="SEARCH_QUERY",  # Query-specific
    )

    response = client.embed_text(embed_details)

    return EmbeddingResult(
        embedding=response.data.embeddings[0],
        text=query,
        model=self.oci_config.model_id,
        tokens=None,
    )

embed_documents async

embed_documents(documents: list[str]) -> list[EmbeddingResult]

Embed documents for storage.

Uses SEARCH_DOCUMENT input type for Cohere models.

Source code in src/locus/rag/embeddings/oci.py
async def embed_documents(self, documents: list[str]) -> list[EmbeddingResult]:
    """Embed documents for storage.

    Uses SEARCH_DOCUMENT input type for Cohere models. Inputs are sent
    in batches of ``self.config.batch_size`` per request.
    """
    from oci.generative_ai_inference.models import (
        EmbedTextDetails,
        OnDemandServingMode,
    )

    client = await self._get_client()

    batch_size = self.config.batch_size
    out: list[EmbeddingResult] = []

    for start in range(0, len(documents), batch_size):
        batch = documents[start : start + batch_size]

        request = EmbedTextDetails(
            inputs=batch,
            serving_mode=OnDemandServingMode(model_id=self.oci_config.model_id),
            compartment_id=self._get_compartment_id(),
            truncate=self.oci_config.truncate,
            input_type="SEARCH_DOCUMENT",  # Document-specific
        )
        response = client.embed_text(request)

        out.extend(
            EmbeddingResult(
                embedding=response.data.embeddings[j],
                text=txt,
                model=self.oci_config.model_id,
                tokens=None,
            )
            for j, txt in enumerate(batch)
        )

    return out

OpenAIEmbeddings

OpenAIEmbeddings(model: str = 'text-embedding-3-small', api_key: str | None = None, dimensions: int | None = None, base_url: str | None = None, **kwargs: Any)

Bases: BaseEmbedding

OpenAI Embeddings provider.

Uses OpenAI's text-embedding models for generating embeddings.

Example

embedder = OpenAIEmbeddings( ... model="text-embedding-3-small", ... api_key="sk-...", ... ) result = await embedder.embed("Hello world") print(len(result.embedding)) # 1536

Initialize OpenAI embeddings.

Parameters:

Name Type Description Default
model str

OpenAI embedding model ID

'text-embedding-3-small'
api_key str | None

API key (defaults to OPENAI_API_KEY env var)

None
dimensions int | None

Output dimensions (for supported models)

None
base_url str | None

Custom base URL

None
**kwargs Any

Additional configuration

{}
Source code in src/locus/rag/embeddings/openai.py
def __init__(
    self,
    model: str = "text-embedding-3-small",
    api_key: str | None = None,
    dimensions: int | None = None,
    base_url: str | None = None,
    **kwargs: Any,
) -> None:
    """Initialize OpenAI embeddings.

    Args:
        model: OpenAI embedding model ID
        api_key: API key (defaults to OPENAI_API_KEY env var)
        dimensions: Output dimensions (for supported models)
        base_url: Custom base URL
        **kwargs: Additional configuration
    """
    resolved_key = api_key or os.environ.get("OPENAI_API_KEY")
    self._config_model = OpenAIEmbeddingsConfig(
        model=model,
        api_key=resolved_key,
        dimensions=dimensions,
        base_url=base_url,
    )
    # Client is created lazily on first use.
    self._client: AsyncOpenAI | None = None
    self._embedding_config = EmbeddingConfig(
        dimension=self._config_model.dimension,
        max_tokens=8191,
        batch_size=2048,
    )

config property

config: EmbeddingConfig

Get embedding configuration.

capabilities property

capabilities: EmbeddingCapabilities

OpenAI embeddings: text-only, native batching, no separate query/doc spaces (text-embedding-3-* use the same space).

dimension property

dimension: int

Get embedding dimension.

embed async

embed(text: str) -> EmbeddingResult

Embed a single text.

Parameters:

Name Type Description Default
text str

Text to embed

required

Returns:

Type Description
EmbeddingResult

EmbeddingResult with vector and metadata

Source code in src/locus/rag/embeddings/openai.py
async def embed(self, text: str) -> EmbeddingResult:
    """Embed a single text.

    Args:
        text: Text to embed

    Returns:
        EmbeddingResult with vector and metadata
    """
    client = self._get_client()

    params: dict[str, Any] = {"model": self._config_model.model, "input": text}
    if self._config_model.dimensions:
        params["dimensions"] = self._config_model.dimensions

    response = await client.embeddings.create(**params)

    usage = response.usage
    return EmbeddingResult(
        embedding=response.data[0].embedding,
        text=text,
        model=self._config_model.model,
        tokens=usage.total_tokens if usage else None,
    )

embed_batch async

embed_batch(texts: list[str]) -> list[EmbeddingResult]

Embed multiple texts in a single request.

Parameters:

Name Type Description Default
texts list[str]

List of texts to embed

required

Returns:

Type Description
list[EmbeddingResult]

List of EmbeddingResult, one per input text

Source code in src/locus/rag/embeddings/openai.py
async def embed_batch(self, texts: list[str]) -> list[EmbeddingResult]:
    """Embed multiple texts in a single request.

    Args:
        texts: List of texts to embed

    Returns:
        List of EmbeddingResult, one per input text
    """
    if not texts:
        return []

    client = self._get_client()

    params: dict[str, Any] = {"model": self._config_model.model, "input": texts}
    if self._config_model.dimensions:
        params["dimensions"] = self._config_model.dimensions

    response = await client.embeddings.create(**params)

    return [
        EmbeddingResult(
            embedding=item.embedding,
            text=texts[idx],
            model=self._config_model.model,
            tokens=None,  # Per-text tokens not available in batch
        )
        for idx, item in enumerate(response.data)
    ]

close async

close() -> None

Close the client.

Source code in src/locus/rag/embeddings/openai.py
async def close(self) -> None:
    """Close the lazily-created client, if any, and forget it."""
    if self._client is None:
        return
    await self._client.close()
    self._client = None

embed_query async

embed_query(query: str) -> EmbeddingResult

Embed a query. Override if model has query-specific embeddings.

Source code in src/locus/rag/embeddings/base.py
async def embed_query(self, query: str) -> EmbeddingResult:
    """Embed a query. Override if model has query-specific embeddings."""
    result = await self.embed(query)
    return result

embed_documents async

embed_documents(documents: list[str]) -> list[EmbeddingResult]

Embed documents. Override if model has document-specific embeddings.

Source code in src/locus/rag/embeddings/base.py
async def embed_documents(self, documents: list[str]) -> list[EmbeddingResult]:
    """Embed documents. Override if model has document-specific embeddings."""
    results = await self.embed_batch(documents)
    return results

Vector stores

OracleVectorStore

OracleVectorStore(dsn: str | None = None, user: str = 'admin', password: str | SecretStr = '', host: str | None = None, port: int = 1521, service_name: str | None = None, dimension: int = 1024, distance_metric: str = 'COSINE', **kwargs: Any)

Bases: BaseModel, BaseVectorStore

Oracle 26ai Vector Store with native VECTOR support.

Uses Oracle Database 23ai/26ai's native VECTOR data type for efficient similarity search. Supports cosine, dot product, and Euclidean distance metrics.

Example with DSN

store = OracleVectorStore( ... dsn="mydb_high", ... user="admin", ... password="secret", ... dimension=1024, ... ) await store.add(document) results = await store.search(query_embedding, limit=5)

Example with connection string

store = OracleVectorStore( ... host="adb.us-ashburn-1.oraclecloud.com", ... port=1522, ... service_name="xxx_high.adb.oraclecloud.com", ... )

Source code in src/locus/rag/stores/oracle.py
def __init__(
    self,
    dsn: str | None = None,
    user: str = "admin",
    password: str | SecretStr = "",
    host: str | None = None,
    port: int = 1521,
    service_name: str | None = None,
    dimension: int = 1024,
    distance_metric: str = "COSINE",
    **kwargs: Any,
) -> None:
    """Build the OracleVectorConfig from the arguments and initialize the model.

    A plain-string password is wrapped in SecretStr so it is not exposed
    in reprs or logs.
    """
    if isinstance(password, str):
        password = SecretStr(password)
    cfg = OracleVectorConfig(
        dsn=dsn,
        user=user,
        password=password,
        host=host,
        port=port,
        service_name=service_name,
        dimension=dimension,
        distance_metric=distance_metric,
        **kwargs,
    )
    super().__init__(oracle_config=cfg)

config property

config: VectorStoreConfig

Get store configuration.

add async

add(document: Document) -> str

Add a document with embedding.

Source code in src/locus/rag/stores/oracle.py
async def add(self, document: Document) -> str:
    """Insert a single document (with its embedding) and return its ID."""
    await self._ensure_table()
    pool = await self._get_pool()

    doc_id = document.id or uuid4().hex

    # A document without a vector would be unsearchable — reject it.
    if document.embedding is None:
        raise ValueError("Document must have an embedding")

    params = {
        "id": doc_id,
        "content": document.content,
        "embedding": self._vector_to_string(document.embedding),
        "metadata": json.dumps(document.metadata),
        "created_at": document.created_at,
    }

    async with pool.acquire() as connection, connection.cursor() as cur:
        await cur.execute(
            f"""
            INSERT INTO {self._full_table_name}
            (id, content, embedding, metadata, created_at)
            VALUES (:id, :content, TO_VECTOR(:embedding), :metadata, :created_at)
            """,
            params,
        )
        await connection.commit()

    return doc_id

add_batch async

add_batch(documents: list[Document]) -> list[str]

Add multiple documents.

Source code in src/locus/rag/stores/oracle.py
async def add_batch(self, documents: list[Document]) -> list[str]:
    """Add multiple documents in a single transaction.

    Args:
        documents: Documents to store; each must carry an embedding.

    Returns:
        Document IDs in input order (generated hex UUIDs where missing).

    Raises:
        ValueError: If any document lacks an embedding. Validation runs
            before any row is inserted, so one bad document no longer
            leaves earlier rows pending in an uncommitted transaction.
    """
    # Validate the whole batch up front: fail fast, no partial work.
    ids = []
    for doc in documents:
        doc_id = doc.id or uuid4().hex
        if doc.embedding is None:
            raise ValueError(f"Document {doc_id} must have an embedding")
        ids.append(doc_id)

    # Nothing to insert — skip table creation and pool acquisition.
    if not documents:
        return []

    await self._ensure_table()
    pool = await self._get_pool()

    async with pool.acquire() as conn, conn.cursor() as cursor:
        for doc_id, doc in zip(ids, documents):
            await cursor.execute(
                f"""
                INSERT INTO {self._full_table_name}
                (id, content, embedding, metadata, created_at)
                VALUES (:id, :content, TO_VECTOR(:embedding), :metadata, :created_at)
                """,
                {
                    "id": doc_id,
                    "content": doc.content,
                    "embedding": self._vector_to_string(doc.embedding),
                    "metadata": json.dumps(doc.metadata),
                    "created_at": doc.created_at,
                },
            )
        # One commit for the whole batch.
        await conn.commit()

    return ids

get async

get(doc_id: str) -> Document | None

Get a document by ID.

Source code in src/locus/rag/stores/oracle.py
async def get(self, doc_id: str) -> Document | None:
    """Fetch one document by ID, or None when the ID is unknown."""
    await self._ensure_table()
    pool = await self._get_pool()

    async with pool.acquire() as conn, conn.cursor() as cursor:
        await cursor.execute(
            f"""
            SELECT id, content, VECTOR_SERIALIZE(embedding) as embedding,
                   metadata, created_at
            FROM {self._full_table_name}
            WHERE id = :id
            """,
            {"id": doc_id},
        )
        row = await cursor.fetchone()

    if row is None:
        return None

    async def materialize(value):
        # Async LOB columns expose a read() coroutine; plain values pass through.
        return await value.read() if hasattr(value, "read") else value

    # VECTOR_SERIALIZE yields "[v1,v2,...]"; split it back into floats.
    raw_vec = row[2]
    embedding = (
        [float(part) for part in raw_vec.strip("[]").split(",")] if raw_vec else None
    )

    meta = await materialize(row[3])
    if isinstance(meta, str):
        meta = json.loads(meta) if meta else {}

    return Document(
        id=row[0],
        content=await materialize(row[1]),
        embedding=embedding,
        metadata=meta,
        created_at=row[4] if row[4] else datetime.now(UTC),
    )

delete async

delete(doc_id: str) -> bool

Delete a document.

Source code in src/locus/rag/stores/oracle.py
async def delete(self, doc_id: str) -> bool:
    """Remove a document row; return True when a row was actually deleted."""
    await self._ensure_table()
    pool = await self._get_pool()

    async with pool.acquire() as conn, conn.cursor() as cursor:
        await cursor.execute(
            f"DELETE FROM {self._full_table_name} WHERE id = :id",
            {"id": doc_id},
        )
        # rowcount reflects the rows hit by the DELETE just executed.
        removed = cursor.rowcount > 0
        await conn.commit()

    return removed

search async

search(query_embedding: list[float], limit: int = 10, threshold: float | None = None, metadata_filter: dict[str, Any] | None = None) -> list[SearchResult]

Search for similar documents using vector similarity.

Uses Oracle's VECTOR_DISTANCE function for efficient similarity search.

Source code in src/locus/rag/stores/oracle.py
async def search(
    self,
    query_embedding: list[float],
    limit: int = 10,
    threshold: float | None = None,
    metadata_filter: dict[str, Any] | None = None,
) -> list[SearchResult]:
    """Search for similar documents using vector similarity.

    Uses Oracle's VECTOR_DISTANCE function for efficient similarity search.

    Args:
        query_embedding: Query vector, serialized to an Oracle vector literal.
        limit: Maximum number of rows fetched from the database. The
            threshold filter runs after the fetch, so fewer than ``limit``
            results may be returned.
        threshold: Minimum similarity score; lower-scoring rows are dropped.
        metadata_filter: Exact-match constraints evaluated with JSON_VALUE
            against the metadata column; values are compared as strings.

    Returns:
        SearchResults ordered best-match-first (ascending distance).

    Raises:
        ValueError: If a metadata filter key is not a valid Python
            identifier (the key is interpolated into the SQL text, so this
            guards against SQL injection).
    """
    await self._ensure_table()
    pool = await self._get_pool()

    # Build distance function based on metric
    metric = self.oracle_config.distance_metric.upper()
    distance_func = f"VECTOR_DISTANCE(embedding, TO_VECTOR(:query_vec), {metric})"

    # Build WHERE clause for metadata filtering
    where_clauses = []
    params: dict[str, Any] = {
        "query_vec": self._vector_to_string(query_embedding),
        "limit": limit,
    }

    if metadata_filter:
        for key, value in metadata_filter.items():
            # The key becomes part of the SQL text; reject anything that
            # is not a plain identifier before interpolating it.
            if not key.isidentifier():
                msg = f"Invalid metadata filter key: {key!r}"
                raise ValueError(msg)
            param_name = f"meta_{key}"
            where_clauses.append(f"JSON_VALUE(metadata, '$.{key}') = :{param_name}")
            params[param_name] = str(value)

    where_sql = ""
    if where_clauses:
        where_sql = "WHERE " + " AND ".join(where_clauses)

    # For cosine distance, lower is better (0 = identical)
    # Convert to similarity score: 1 - distance for cosine
    async with pool.acquire() as conn, conn.cursor() as cursor:
        await cursor.execute(
            f"""
            SELECT id, content, VECTOR_SERIALIZE(embedding) as embedding,
                   metadata, created_at,
                   {distance_func} as distance
            FROM {self._full_table_name}
            {where_sql}
            ORDER BY distance ASC
            FETCH FIRST :limit ROWS ONLY
            """,
            params,
        )
        rows = await cursor.fetchall()

    results = []
    for row in rows:
        distance = row[5]

        # Convert distance to similarity score (0-1, higher is better)
        if metric == "COSINE":
            # Cosine distance is 0-2, convert to similarity
            score = 1.0 - (distance / 2.0)
        elif metric == "DOT":
            # Dot product: higher is better, normalize
            # NOTE(review): this (d + 1) / 2 mapping assumes the reported
            # value lies in [-1, 1] — confirm for unnormalized vectors.
            score = max(0.0, min(1.0, (distance + 1.0) / 2.0))
        else:  # EUCLIDEAN
            # Euclidean: lower is better, use exponential decay
            score = 1.0 / (1.0 + distance)

        # Apply threshold filter
        if threshold is not None and score < threshold:
            continue

        # Parse embedding
        # VECTOR_SERIALIZE returns "[v1,v2,...]"; split back into floats.
        embedding_str = row[2]
        if embedding_str:
            embedding_str = embedding_str.strip("[]")
            embedding = [float(x) for x in embedding_str.split(",")]
        else:
            embedding = None

        # Parse metadata (handle async LOB)
        metadata = row[3]
        if hasattr(metadata, "read"):
            metadata = await metadata.read()
        if isinstance(metadata, str):
            metadata = json.loads(metadata) if metadata else {}

        # Parse content (handle async LOB)
        content = row[1]
        if hasattr(content, "read"):
            content = await content.read()

        doc = Document(
            id=row[0],
            content=content,
            embedding=embedding,
            metadata=metadata,
            created_at=row[4] if row[4] else datetime.now(UTC),
        )

        results.append(
            SearchResult(
                document=doc,
                score=score,
                distance=distance,
            )
        )

    return results

count async

count() -> int

Count documents in store.

Source code in src/locus/rag/stores/oracle.py
async def count(self) -> int:
    """Return the number of documents currently stored in the table."""
    await self._ensure_table()
    pool = await self._get_pool()

    async with pool.acquire() as conn, conn.cursor() as cursor:
        await cursor.execute(f"SELECT COUNT(*) FROM {self._full_table_name}")
        result = await cursor.fetchone()

    # COUNT(*) always yields one row in practice; guard anyway.
    return result[0] if result else 0

clear async

clear() -> int

Delete all documents.

Source code in src/locus/rag/stores/oracle.py
async def clear(self) -> int:
    """Delete every document via TRUNCATE and return how many were removed."""
    await self._ensure_table()
    pool = await self._get_pool()

    async with pool.acquire() as conn, conn.cursor() as cursor:
        # Snapshot the row count first — TRUNCATE does not report one.
        await cursor.execute(f"SELECT COUNT(*) FROM {self._full_table_name}")
        counted = await cursor.fetchone()
        removed = counted[0] if counted else 0

        await cursor.execute(f"TRUNCATE TABLE {self._full_table_name}")
        await conn.commit()

    return removed

close async

close() -> None

Close connection pool.

Source code in src/locus/rag/stores/oracle.py
async def close(self) -> None:
    """Close the connection pool if one was ever created; safe to call twice."""
    if not self._pool:
        return
    await self._pool.close()
    self._pool = None

InMemoryVectorStore

InMemoryVectorStore(dimension: int = 1024, distance_metric: str = 'cosine')

Bases: BaseVectorStore

In-memory vector store for testing and development.

Fast but not persistent - data is lost when process exits.

Example

store = InMemoryVectorStore(dimension=1024)
await store.add(document)
results = await store.search(query_embedding, limit=5)

Source code in src/locus/rag/stores/memory.py
def __init__(
    self,
    dimension: int = 1024,
    distance_metric: str = "cosine",
):
    """Create an empty in-memory store.

    Args:
        dimension: Expected embedding dimensionality.
        distance_metric: "cosine" or "euclidean"; any other value falls
            through to dot-product scoring in search().
    """
    self._documents: dict[str, Document] = {}
    self._distance_metric = distance_metric
    self._dimension = dimension

add async

add(document: Document) -> str

Add a document.

Source code in src/locus/rag/stores/memory.py
async def add(self, document: Document) -> str:
    """Store one embedded document under its own ID.

    Raises:
        ValueError: If the document carries no embedding.
    """
    if document.embedding is None:
        raise ValueError("Document must have an embedding")
    key = document.id
    self._documents[key] = document
    return key

add_batch async

add_batch(documents: list[Document]) -> list[str]

Add multiple documents.

Source code in src/locus/rag/stores/memory.py
async def add_batch(self, documents: list[Document]) -> list[str]:
    """Store several documents; returns their IDs in input order."""
    # Delegates per-document validation and insertion to add().
    return [await self.add(doc) for doc in documents]

get async

get(doc_id: str) -> Document | None

Get a document by ID.

Source code in src/locus/rag/stores/memory.py
async def get(self, doc_id: str) -> Document | None:
    """Return the stored document for doc_id, or None when absent."""
    try:
        return self._documents[doc_id]
    except KeyError:
        return None

delete async

delete(doc_id: str) -> bool

Delete a document.

Source code in src/locus/rag/stores/memory.py
async def delete(self, doc_id: str) -> bool:
    """Remove a document; True if it existed, False otherwise."""
    # EAFP: a single dict operation instead of a lookup followed by del.
    try:
        del self._documents[doc_id]
    except KeyError:
        return False
    return True

search async

search(query_embedding: list[float], limit: int = 10, threshold: float | None = None, metadata_filter: dict[str, Any] | None = None) -> list[SearchResult]

Search for similar documents.

Source code in src/locus/rag/stores/memory.py
async def search(
    self,
    query_embedding: list[float],
    limit: int = 10,
    threshold: float | None = None,
    metadata_filter: dict[str, Any] | None = None,
) -> list[SearchResult]:
    """Brute-force similarity search over all stored documents.

    Args:
        query_embedding: Query vector.
        limit: Maximum number of results returned.
        threshold: Minimum similarity score; lower-scoring hits are dropped.
        metadata_filter: Exact-match constraints on document metadata.

    Returns:
        Results sorted by score, best first, truncated to limit.
    """
    hits: list[SearchResult] = []
    metric = self._distance_metric

    for doc in self._documents.values():
        # Documents without embeddings cannot be scored.
        if doc.embedding is None:
            continue

        # Exact-match metadata filtering: every key must match.
        if metadata_filter and any(
            doc.metadata.get(key) != value for key, value in metadata_filter.items()
        ):
            continue

        if metric == "cosine":
            score = self._cosine_similarity(query_embedding, doc.embedding)
            distance = 1.0 - score
        elif metric == "euclidean":
            distance = self._euclidean_distance(query_embedding, doc.embedding)
            score = 1.0 / (1.0 + distance)
        else:  # dot product: larger raw value means closer
            score = self._dot_product(query_embedding, doc.embedding)
            distance = -score

        if threshold is not None and score < threshold:
            continue

        hits.append(SearchResult(document=doc, score=score, distance=distance))

    # Best matches first, then keep only the top `limit`.
    hits.sort(key=lambda hit: hit.score, reverse=True)
    return hits[:limit]

count async

count() -> int

Count documents.

Source code in src/locus/rag/stores/memory.py
async def count(self) -> int:
    """Return how many documents are currently stored."""
    return len(self._documents)

clear async

clear() -> int

Delete all documents.

Source code in src/locus/rag/stores/memory.py
async def clear(self) -> int:
    """Drop every document in place; returns how many were removed."""
    removed = len(self._documents)
    # clear() mutates the existing dict so external aliases stay valid.
    self._documents.clear()
    return removed

close async

close() -> None

Close any resources.

Source code in src/locus/rag/stores/base.py
async def close(self) -> None:
    """Close any resources."""