Skip to content

Models

Registry

get_model

get_model(model_string: str, **kwargs: Any) -> ModelProtocol

Get a model from a string identifier.

Format: "provider:model_name"

Examples:

  • "openai:gpt-4o"
  • "oci:cohere.command-r-plus"

Parameters:

Name Type Description Default
model_string str

Model identifier in "provider:model" format

required
**kwargs Any

Provider-specific configuration

{}

Returns:

Type Description
ModelProtocol

Model instance

Raises:

Type Description
ValueError

If provider is unknown or model string is invalid

Source code in src/locus/models/registry.py
def get_model(model_string: str, **kwargs: Any) -> ModelProtocol:
    """
    Resolve a ``"provider:model_name"`` identifier into a model instance.

    The identifier is split on the first ``":"`` only, so everything after
    that colon (further colons included) is handed to the provider as the
    model id.

    Examples:
        - "openai:gpt-4o"
        - "oci:cohere.command-r-plus"

    Args:
        model_string: Model identifier in "provider:model" format
        **kwargs: Provider-specific configuration, forwarded unchanged to
            the registered provider callable

    Returns:
        Whatever the registered provider callable returns for the model id

    Raises:
        ValueError: If the string contains no ":" separator, or the part
            before it is not a registered provider
    """
    provider, separator, model_id = model_string.partition(":")

    # An empty separator means the string contained no colon at all.
    if not separator:
        raise ValueError(
            f"Model string must be 'provider:model', got: {model_string}. "
            f"Available providers: {list(_PROVIDERS.keys())}"
        )

    if provider in _PROVIDERS:
        return _PROVIDERS[provider](model_id, **kwargs)

    raise ValueError(f"Unknown provider: {provider}. Available: {list(_PROVIDERS.keys())}")

OCI Generative AI

OCIOpenAIModel

OCIOpenAIModel(model: str, *, profile: str | None = None, auth_type: str | None = None, compartment_id: str | None = None, region: str = DEFAULT_OCI_GENAI_REGION, config_file: str = '~/.oci/config', base_url: str | None = None, max_tokens: int = 4096, temperature: float = 0.7, **kwargs: Any)

Bases: OpenAIModel

OCI GenAI model accessed through the /openai/v1 endpoint.

Reuses :class:OpenAIModel for message conversion, tool handling, response parsing, and streaming. The only thing this class adds is the OCI-specific auth wiring.

Pass exactly one of profile or auth_type.

Initialize the OCI OpenAI-compat model.

Parameters:

Name Type Description Default
model str

OCI model identifier (e.g. openai.gpt-5, meta.llama-3.3-70b-instruct).

required
profile str | None

OCI config profile name from config_file. Mutually exclusive with auth_type.

None
auth_type str | None

"instance_principal" or "resource_principal". Mutually exclusive with profile. Requires compartment_id.

None
compartment_id str | None

OCI compartment OCID, sent as opc-compartment-id. Auto-derived from the profile's tenancy under profile=. Must be supplied explicitly under auth_type=.

None
region str

OCI region hosting the inference endpoint.

DEFAULT_OCI_GENAI_REGION
config_file str

Path to the OCI config file (used with profile).

'~/.oci/config'
base_url str | None

Override the derived endpoint URL (e.g. for a custom realm). Defaults to the OpenAI-compat URL for region.

None
max_tokens int

Default max tokens. For o1/o3/gpt-5* this is automatically forwarded as max_completion_tokens — see :class:OpenAIModel.

4096
temperature float

Default sampling temperature.

0.7
**kwargs Any

Forwarded to :class:OCIOpenAIConfig (top_p, seed, frequency_penalty, presence_penalty, ...).

{}

Raises:

Type Description
ValueError

If zero or both auth modes are set, if auth_type is invalid, or if auth_type is set without compartment_id.

Source code in src/locus/models/providers/oci/openai_compat.py
def __init__(
    self,
    model: str,
    *,
    profile: str | None = None,
    auth_type: str | None = None,
    compartment_id: str | None = None,
    region: str = DEFAULT_OCI_GENAI_REGION,
    config_file: str = "~/.oci/config",
    base_url: str | None = None,
    max_tokens: int = 4096,
    temperature: float = 0.7,
    **kwargs: Any,
) -> None:
    """Initialize the OCI OpenAI-compat model.

    Args:
        model: OCI model identifier (e.g. ``openai.gpt-5``,
            ``meta.llama-3.3-70b-instruct``).
        profile: OCI config profile name from ``config_file``. Mutually
            exclusive with ``auth_type``.
        auth_type: Must be one of ``_VALID_AUTH_TYPES``. Mutually
            exclusive with ``profile``. Requires ``compartment_id``.
        compartment_id: OCI compartment OCID. Must be supplied explicitly
            under ``auth_type=``. Under ``profile=`` it is resolved, in
            order, from this argument, the ``OCI_COMPARTMENT`` /
            ``OCI_COMPARTMENT_ID`` environment variables, and finally the
            profile's ``tenancy`` entry.
        region: OCI region hosting the inference endpoint.
        config_file: Path to the OCI config file (used with ``profile``).
        base_url: Override the derived endpoint URL (e.g. for a custom
            realm). Defaults to the OpenAI-compat URL for ``region``.
        max_tokens: Default max tokens. For ``o1``/``o3``/``gpt-5*``
            this is automatically forwarded as
            ``max_completion_tokens`` — see :class:`OpenAIModel`.
        temperature: Default sampling temperature.
        **kwargs: Forwarded to :class:`OCIOpenAIConfig` (top_p, seed,
            frequency_penalty, presence_penalty, ...).

    Raises:
        ValueError: If zero or both auth modes are set, if ``auth_type``
            is invalid, or if ``auth_type`` is set without
            ``compartment_id``.
    """
    modes_set = sum(x is not None for x in (profile, auth_type))
    if modes_set != 1:
        # NOTE(review): this message lists four auth types; confirm it
        # matches the contents of ``_VALID_AUTH_TYPES``.
        msg = (
            "OCIOpenAIModel: specify exactly one auth mode. "
            "Either profile='<section_from_~/.oci/config>' (api-key path; "
            "use profile='DEFAULT' for the default section), "
            "or auth_type='instance_principal'|'resource_principal'|"
            "'security_token'|'delegation_token' together with compartment_id=. "
            f"Got profile={profile!r}, auth_type={auth_type!r}."
        )
        raise ValueError(msg)
    if auth_type is not None and auth_type not in _VALID_AUTH_TYPES:
        msg = f"auth_type must be one of {_VALID_AUTH_TYPES}, got {auth_type!r}"
        raise ValueError(msg)
    if auth_type is not None and compartment_id is None:
        msg = "compartment_id is required when auth_type= is set"
        raise ValueError(msg)

    # Pop fields we set explicitly to avoid duplicate-kwarg errors
    # when callers splat a config dict that includes the same keys.
    # NOTE(review): Python binds these names to the parameters above, so
    # they should never actually appear in ``kwargs``; the loop is
    # defensive only.
    for explicit in (
        "model",
        "profile",
        "auth_type",
        "compartment_id",
        "region",
        "config_file",
        "base_url",
        "max_tokens",
        "temperature",
    ):
        kwargs.pop(explicit, None)

    # Resolve compartment_id with this precedence:
    #   1. explicit ``compartment_id=`` arg
    #   2. ``OCI_COMPARTMENT`` (then ``OCI_COMPARTMENT_ID``) env var
    #      (matches OCIModel + the test conftest convention so a single
    #      export drives every test)
    #   3. profile's tenancy (last-resort default; may not have GenAI
    #      policy if the profile lives in a different home tenancy than
    #      the GenAI compartment, which is the MY_PROFILE / cross-tenancy case)
    if compartment_id is None:
        import os

        # An exported-but-empty variable is treated as unset; otherwise the
        # empty string would skip the tenancy fallback below and be passed
        # on as the compartment id.
        compartment_id = (
            os.getenv("OCI_COMPARTMENT") or os.getenv("OCI_COMPARTMENT_ID") or None
        )
    if compartment_id is None and profile is not None:
        try:
            profile_cfg = _load_profile_config(profile, config_file)
            compartment_id = profile_cfg.get("tenancy")
        except Exception:  # noqa: BLE001 — profile load may fail; keep None
            compartment_id = None

    config = OCIOpenAIConfig(
        model=model,
        api_key=None,
        base_url=base_url or build_oci_openai_base_url(region),
        region=region,
        profile=profile,
        auth_type=auth_type,
        compartment_id=compartment_id,
        config_file=config_file,
        max_tokens=max_tokens,
        temperature=temperature,
        **kwargs,
    )
    # Skip OpenAIModel.__init__ — it would rebuild the config without
    # OCI fields. Go straight to the Pydantic BaseModel init.
    super(OpenAIModel, self).__init__(config=config)

client property

client: AsyncOpenAI

Build the AsyncOpenAI client wired with the OCI request signer.

supports_structured_output property

supports_structured_output: bool

Native response_format={"type":"json_schema",...} support.

OpenAI's chat-completions API accepts a JSON-schema response_format and guarantees a parseable instance. The agent loop uses this property to skip the prompted-JSON fallback when the provider ships native structured output.

close async

close() -> None

Close the OpenAI client and release resources.

Source code in src/locus/models/native/openai.py
async def close(self) -> None:
    """Close the OpenAI client and release resources.

    Does nothing when no client is cached. The cached reference is cleared
    even if the client's own ``close()`` raises, so a client that failed to
    shut down cleanly is never kept on the instance. Any exception from
    ``close()`` still propagates to the caller.
    """
    client = self._client
    if client is None:
        return
    try:
        await client.close()
    finally:
        self._client = None

__aenter__ async

__aenter__() -> OpenAIModel

Async context manager entry.

Source code in src/locus/models/native/openai.py
async def __aenter__(self) -> OpenAIModel:
    """Async context manager entry.

    Returns:
        This same instance, so ``async with model as m`` binds ``m`` to
        the model itself. Nothing else happens on entry.
    """
    return self

__aexit__ async

__aexit__(exc_type: Any, exc_val: Any, exc_tb: Any) -> None

Async context manager exit - close client.

Source code in src/locus/models/native/openai.py
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """Async context manager exit - close client.

    Awaits ``self.close()`` whether or not the ``async with`` body raised.
    Returns ``None``, so an exception raised inside the body is not
    suppressed.

    Args:
        exc_type: Exception type from the ``async with`` body (unused).
        exc_val: Exception instance from the body (unused).
        exc_tb: Traceback from the body (unused).
    """
    await self.close()

ainvoke async

ainvoke(messages: list[Any], tools: list[dict[str, Any]] | None = None, **kwargs: Any) -> Any

LangChain-compatible alias — returns Message (AIMessage equivalent).

Source code in src/locus/models/native/openai.py
async def ainvoke(
    self,
    messages: list[Any],
    tools: list[dict[str, Any]] | None = None,
    **kwargs: Any,
) -> Any:
    """LangChain-compatible alias — returns Message (AIMessage equivalent).

    Delegates to ``self.complete`` with the same arguments and unwraps the
    result.

    Args:
        messages: Conversation history, passed through to ``complete``.
        tools: Optional tool schemas, passed through as ``tools=``.
        **kwargs: Extra options, passed through to ``complete``.

    Returns:
        The ``message`` attribute of the completion result when it has
        one; otherwise the result itself.
    """
    result = await self.complete(messages, tools=tools, **kwargs)
    # Fall back to the raw result for objects that expose no ``message``.
    return getattr(result, "message", result)

bind_tools

bind_tools(tools: list[Any], **kwargs: Any) -> OpenAIModel

LangChain-compatible bind_tools.

Source code in src/locus/models/native/openai.py
def bind_tools(self, tools: list[Any], **kwargs: Any) -> OpenAIModel:
    """LangChain-compatible bind_tools.

    Args:
        tools: Tools to attach. Items exposing ``to_openai_schema()`` are
            converted by calling it; anything else is stored as given. A
            falsy value is treated as an empty list.
        **kwargs: Accepted for call compatibility; not used.

    Returns:
        A copy of this model (via ``model_copy()``) whose ``_bound_tools``
        attribute holds the resulting list. The original is left untouched.
    """
    clone = self.model_copy()

    schemas = []
    for tool in tools or []:
        if hasattr(tool, "to_openai_schema"):
            schemas.append(tool.to_openai_schema())
        else:
            schemas.append(tool)

    # Assign through ``object.__setattr__`` so the class's own
    # ``__setattr__`` is not involved.
    object.__setattr__(clone, "_bound_tools", schemas)
    return clone

OpenAI

OpenAIModel

OpenAIModel(model: str = 'gpt-4o', api_key: str | None = None, base_url: str | None = None, max_tokens: int = 4096, temperature: float = 0.7, **kwargs: Any)

Bases: BaseModel

OpenAI model provider.

Supports GPT-4o, GPT-4, o1, o3 models with streaming and tool calling.

Example

model = OpenAIModel(model="gpt-4o")
response = await model.complete([Message.user("Hello!")])

Initialize OpenAI model.

Source code in src/locus/models/native/openai.py
def __init__(
    self,
    model: str = "gpt-4o",
    api_key: str | None = None,
    base_url: str | None = None,
    max_tokens: int = 4096,
    temperature: float = 0.7,
    **kwargs: Any,
) -> None:
    """Initialize OpenAI model.

    Collects the arguments into an :class:`OpenAIConfig` and hands it to
    the parent initializer as ``config``.

    Args:
        model: Model name stored on the config.
        api_key: API key stored on the config; may be ``None``.
        base_url: Endpoint override stored on the config; may be ``None``.
        max_tokens: Default max tokens stored on the config.
        temperature: Default sampling temperature stored on the config.
        **kwargs: Any further :class:`OpenAIConfig` fields.
    """
    config_fields: dict[str, Any] = dict(
        model=model,
        api_key=api_key,
        base_url=base_url,
        max_tokens=max_tokens,
        temperature=temperature,
    )
    config_fields.update(kwargs)
    super().__init__(config=OpenAIConfig(**config_fields))

supports_structured_output property

supports_structured_output: bool

Native response_format={"type":"json_schema",...} support.

OpenAI's chat-completions API accepts a JSON-schema response_format and guarantees a parseable instance. The agent loop uses this property to skip the prompted-JSON fallback when the provider ships native structured output.

client property

client: AsyncOpenAI

Get or create the OpenAI client.

The client is configured with explicit max_retries and timeout from :class:OpenAIConfig so transient errors (429, 5xx, network resets) don't kill the agent loop on first try. The openai SDK retries with exponential backoff between attempts.

close async

close() -> None

Close the OpenAI client and release resources.

Source code in src/locus/models/native/openai.py
async def close(self) -> None:
    """Close the OpenAI client and release resources.

    Does nothing when no client is cached. The cached reference is cleared
    even if the client's own ``close()`` raises, so a client that failed to
    shut down cleanly is never kept on the instance. Any exception from
    ``close()`` still propagates to the caller.
    """
    client = self._client
    if client is None:
        return
    try:
        await client.close()
    finally:
        self._client = None

__aenter__ async

__aenter__() -> OpenAIModel

Async context manager entry.

Source code in src/locus/models/native/openai.py
async def __aenter__(self) -> OpenAIModel:
    """Async context manager entry.

    Returns:
        This same instance, so ``async with model as m`` binds ``m`` to
        the model itself. Nothing else happens on entry.
    """
    return self

__aexit__ async

__aexit__(exc_type: Any, exc_val: Any, exc_tb: Any) -> None

Async context manager exit - close client.

Source code in src/locus/models/native/openai.py
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """Async context manager exit - close client.

    Awaits ``self.close()`` whether or not the ``async with`` body raised.
    Returns ``None``, so an exception raised inside the body is not
    suppressed.

    Args:
        exc_type: Exception type from the ``async with`` body (unused).
        exc_val: Exception instance from the body (unused).
        exc_tb: Traceback from the body (unused).
    """
    await self.close()

complete async

complete(messages: list[Message], tools: list[dict[str, Any]] | None = None, **kwargs: Any) -> ModelResponse

Complete a chat request.

Parameters:

Name Type Description Default
messages list[Message]

Conversation history

required
tools list[dict[str, Any]] | None

Tool schemas in OpenAI format

None
**kwargs Any

Additional OpenAI-specific options

{}

Returns:

Type Description
ModelResponse

Model response with message and metadata

Source code in src/locus/models/native/openai.py
async def complete(
    self,
    messages: list[Message],
    tools: list[dict[str, Any]] | None = None,
    **kwargs: Any,
) -> ModelResponse:
    """
    Send one non-streaming chat-completions request and parse the reply.

    Args:
        messages: Conversation history
        tools: Tool schemas in OpenAI format
        **kwargs: Per-call overrides. The keys read here are
            ``max_tokens``, ``temperature``, ``top_p``,
            ``frequency_penalty``, ``presence_penalty`` and
            ``response_format``; ``seed`` and stop sequences always come
            from the config.

    Returns:
        Model response with message and metadata
    """
    converted_messages = self._convert_messages(messages)
    converted_tools = self._convert_tools(tools)

    completion_token_model = self._uses_max_completion_tokens(self.config.model)
    sampling_rejected = self._rejects_sampling_params(self.config.model)

    payload: dict[str, Any] = {
        "model": self.config.model,
        "messages": converted_messages,
    }

    # The token budget is always sent; only the parameter name depends on
    # the model family.
    token_budget = kwargs.get("max_tokens", self.config.max_tokens)
    token_key = "max_completion_tokens" if completion_token_model else "max_tokens"
    payload[token_key] = token_budget

    # Sampling knobs go out only for models that take ``max_tokens`` and
    # do not reject sampling parameters.
    if not (completion_token_model or sampling_rejected):
        payload["temperature"] = kwargs.get("temperature", self.config.temperature)
        payload["top_p"] = kwargs.get("top_p", self.config.top_p)
        # Only send penalties when the user customized them. Some
        # providers (Grok) reject the parameter outright, even at
        # zero — server defaults are 0.0 anyway, so omitting the
        # default value is functionally identical for those that
        # accept it.
        for penalty_name in ("frequency_penalty", "presence_penalty"):
            penalty = kwargs.get(penalty_name, getattr(self.config, penalty_name))
            if penalty != 0.0:
                payload[penalty_name] = penalty

    if converted_tools:
        payload["tools"] = converted_tools

    seed = self.config.seed
    if seed is not None:
        payload["seed"] = seed

    # Stop sequences are omitted for ``max_completion_tokens`` models.
    stop_sequences = self.config.stop_sequences
    if stop_sequences and not completion_token_model:
        payload["stop"] = stop_sequences

    # Forward ``response_format`` for structured output. Caller is expected
    # to pass a fully-formed dict (see locus.core.structured.build_response_format).
    structured_format = kwargs.get("response_format")
    if structured_format is not None:
        payload["response_format"] = structured_format

    raw_response = await self.client.chat.completions.create(**payload)
    return self._parse_response(raw_response)

ainvoke async

ainvoke(messages: list[Any], tools: list[dict[str, Any]] | None = None, **kwargs: Any) -> Any

LangChain-compatible alias — returns Message (AIMessage equivalent).

Source code in src/locus/models/native/openai.py
async def ainvoke(
    self,
    messages: list[Any],
    tools: list[dict[str, Any]] | None = None,
    **kwargs: Any,
) -> Any:
    """LangChain-compatible alias — returns Message (AIMessage equivalent).

    Delegates to ``self.complete`` with the same arguments and unwraps the
    result.

    Args:
        messages: Conversation history, passed through to ``complete``.
        tools: Optional tool schemas, passed through as ``tools=``.
        **kwargs: Extra options, passed through to ``complete``.

    Returns:
        The ``message`` attribute of the completion result when it has
        one; otherwise the result itself.
    """
    result = await self.complete(messages, tools=tools, **kwargs)
    # Fall back to the raw result for objects that expose no ``message``.
    return getattr(result, "message", result)

bind_tools

bind_tools(tools: list[Any], **kwargs: Any) -> OpenAIModel

LangChain-compatible bind_tools.

Source code in src/locus/models/native/openai.py
def bind_tools(self, tools: list[Any], **kwargs: Any) -> OpenAIModel:
    """LangChain-compatible bind_tools.

    Args:
        tools: Tools to attach. Items exposing ``to_openai_schema()`` are
            converted by calling it; anything else is stored as given. A
            falsy value is treated as an empty list.
        **kwargs: Accepted for call compatibility; not used.

    Returns:
        A copy of this model (via ``model_copy()``) whose ``_bound_tools``
        attribute holds the resulting list. The original is left untouched.
    """
    clone = self.model_copy()

    schemas = []
    for tool in tools or []:
        if hasattr(tool, "to_openai_schema"):
            schemas.append(tool.to_openai_schema())
        else:
            schemas.append(tool)

    # Assign through ``object.__setattr__`` so the class's own
    # ``__setattr__`` is not involved.
    object.__setattr__(clone, "_bound_tools", schemas)
    return clone

stream async

stream(messages: list[Message], tools: list[dict[str, Any]] | None = None, **kwargs: Any) -> AsyncIterator[ModelChunkEvent]

Stream a chat response.

Parameters:

Name Type Description Default
messages list[Message]

Conversation history

required
tools list[dict[str, Any]] | None

Tool schemas in OpenAI format

None
**kwargs Any

Additional OpenAI-specific options

{}

Yields:

Type Description
AsyncIterator[ModelChunkEvent]

Streaming chunks with content and/or tool calls

Source code in src/locus/models/native/openai.py
async def stream(
    self,
    messages: list[Message],
    tools: list[dict[str, Any]] | None = None,
    **kwargs: Any,
) -> AsyncIterator[ModelChunkEvent]:
    """
    Stream a chat response.

    Builds the request the same way ``complete()`` does, with
    ``stream=True`` added, and translates the provider chunks into
    :class:`ModelChunkEvent` objects.

    Args:
        messages: Conversation history
        tools: Tool schemas in OpenAI format
        **kwargs: Per-call overrides. The keys read here are
            ``max_tokens``, ``temperature``, ``top_p``,
            ``frequency_penalty``, ``presence_penalty`` and
            ``response_format``; ``seed`` and stop sequences always come
            from the config.

    Yields:
        One event per content delta. When a chunk carries a
        ``finish_reason``: one event with the accumulated tool calls (if
        any), followed by a ``done=True`` event.
    """
    openai_messages = self._convert_messages(messages)
    openai_tools = self._convert_tools(tools)

    uses_completion_tokens = self._uses_max_completion_tokens(self.config.model)
    rejects_sampling = self._rejects_sampling_params(self.config.model)

    max_tokens_value = kwargs.get("max_tokens", self.config.max_tokens)

    request_kwargs: dict[str, Any] = {
        "model": self.config.model,
        "messages": openai_messages,
        "stream": True,
    }

    # Use appropriate token parameter based on model
    if uses_completion_tokens:
        request_kwargs["max_completion_tokens"] = max_tokens_value
    elif rejects_sampling:
        request_kwargs["max_tokens"] = max_tokens_value
    else:
        request_kwargs["max_tokens"] = max_tokens_value
        request_kwargs["temperature"] = kwargs.get("temperature", self.config.temperature)
        request_kwargs["top_p"] = kwargs.get("top_p", self.config.top_p)
        # See note in complete() — same penalty conditional.
        freq = kwargs.get("frequency_penalty", self.config.frequency_penalty)
        if freq != 0.0:
            request_kwargs["frequency_penalty"] = freq
        pres = kwargs.get("presence_penalty", self.config.presence_penalty)
        if pres != 0.0:
            request_kwargs["presence_penalty"] = pres

    if openai_tools:
        request_kwargs["tools"] = openai_tools

    if self.config.seed is not None:
        request_kwargs["seed"] = self.config.seed

    # Same guard as complete(): ``stop`` is not sent for models that take
    # ``max_completion_tokens``, so streaming and non-streaming requests
    # for the same model carry the same parameters.
    if self.config.stop_sequences and not uses_completion_tokens:
        request_kwargs["stop"] = self.config.stop_sequences

    # Forward ``response_format`` for streaming structured output —
    # symmetric with complete(). Caller is expected to pass a fully-
    # formed dict (see locus.core.structured.build_response_format).
    response_format = kwargs.get("response_format")
    if response_format is not None:
        request_kwargs["response_format"] = response_format

    # Track tool calls during streaming, keyed by the delta's ``index``.
    current_tool_calls: dict[int, dict[str, Any]] = {}

    response_stream = await self.client.chat.completions.create(**request_kwargs)

    async for chunk in response_stream:
        if not chunk.choices:
            continue

        choice = chunk.choices[0]
        delta = getattr(choice, "delta", None)

        # Some providers (Gemini) emit chunks where ``delta`` is None.
        # Skip those unless they carry a finish_reason, which still has
        # to reach the end-of-stream handling below.
        if delta is None and not choice.finish_reason:
            continue

        if delta is not None:
            # Handle content
            if delta.content:
                yield ModelChunkEvent(content=delta.content)

            # Handle tool calls: id/name overwrite, argument fragments
            # are concatenated until the stream finishes.
            if delta.tool_calls:
                for tc_delta in delta.tool_calls:
                    idx = tc_delta.index
                    if idx not in current_tool_calls:
                        current_tool_calls[idx] = {
                            "id": tc_delta.id or "",
                            "name": "",
                            "arguments": "",
                        }

                    if tc_delta.id:
                        current_tool_calls[idx]["id"] = tc_delta.id
                    if tc_delta.function:
                        if tc_delta.function.name:
                            current_tool_calls[idx]["name"] = tc_delta.function.name
                        if tc_delta.function.arguments:
                            current_tool_calls[idx]["arguments"] += tc_delta.function.arguments

        # Check for end of stream
        if choice.finish_reason:
            # Emit any accumulated tool calls
            if current_tool_calls:
                tool_calls = []
                for tc_data in current_tool_calls.values():
                    # Argument text that is empty or not valid JSON is
                    # passed on as an empty dict.
                    try:
                        arguments = (
                            json.loads(tc_data["arguments"]) if tc_data["arguments"] else {}
                        )
                    except json.JSONDecodeError:
                        arguments = {}
                    tool_calls.append(
                        ToolCall(
                            id=tc_data["id"],
                            name=tc_data["name"],
                            arguments=arguments,
                        )
                    )
                yield ModelChunkEvent(tool_calls=tool_calls)

            yield ModelChunkEvent(done=True)

Anthropic

AnthropicModel

AnthropicModel(model: str = 'claude-sonnet-4-20250514', api_key: str | None = None, base_url: str | None = None, max_tokens: int = 4096, temperature: float = 0.7, prompt_cache: bool = False, **kwargs: Any)

Bases: BaseModel

Anthropic model provider.

Supports Claude 4.6, 4.5, 3.5 models with streaming and tool calling.

Example

model = AnthropicModel(model="claude-sonnet-4-20250514")
response = await model.complete([Message.user("Hello!")])

Source code in src/locus/models/native/anthropic.py
def __init__(
    self,
    model: str = "claude-sonnet-4-20250514",
    api_key: str | None = None,
    base_url: str | None = None,
    max_tokens: int = 4096,
    temperature: float = 0.7,
    prompt_cache: bool = False,
    **kwargs: Any,
) -> None:
    """Initialize the Anthropic model.

    Collects the arguments into an :class:`AnthropicConfig` and hands it
    to the parent initializer as ``config``.

    Args:
        model: Model name stored on the config.
        api_key: API key stored on the config; may be ``None``.
        base_url: Endpoint override stored on the config; may be ``None``.
        max_tokens: Default max tokens stored on the config.
        temperature: Default sampling temperature stored on the config.
        prompt_cache: Prompt-caching flag stored on the config.
        **kwargs: Any further :class:`AnthropicConfig` fields.
    """
    config_fields: dict[str, Any] = dict(
        model=model,
        api_key=api_key,
        base_url=base_url,
        max_tokens=max_tokens,
        temperature=temperature,
        prompt_cache=prompt_cache,
    )
    config_fields.update(kwargs)
    super().__init__(config=AnthropicConfig(**config_fields))

supports_structured_output property

supports_structured_output: bool

Anthropic doesn't ship OpenAI-style response_format.

The agent loop falls back to the prompted-JSON path with post-hoc parsing for Anthropic models.

client property

client: AsyncAnthropic

Get or create the Anthropic client.

Configured with explicit max_retries + timeout so a transient 529 (overloaded) / 5xx / connection reset doesn't kill the agent loop on the first try. Retries use exponential backoff inside the anthropic SDK.

close async

close() -> None

Close the underlying httpx client.

Agent.run_sync calls this in a finally block so the loop-bound httpx connections are shut down inside the same event loop that opened them. Without this, the next asyncio.run invocation closes the prior loop and the leftover client's __del__ later tries to aclose against it, raising RuntimeError: Event loop is closed.

Source code in src/locus/models/native/anthropic.py
async def close(self) -> None:
    """Close the underlying httpx client.

    ``Agent.run_sync`` invokes this from a ``finally`` block so that
    loop-bound httpx connections are torn down inside the event loop that
    created them. Otherwise a later ``asyncio.run`` closes that loop and
    the leftover client's ``__del__`` tries to ``aclose`` against it,
    raising ``RuntimeError: Event loop is closed``.

    Does nothing when no client is cached. The cached reference is cleared
    even if the client's ``close()`` raises.
    """
    client = self._client
    if client is None:
        return
    try:
        await client.close()
    finally:
        self._client = None

complete async

complete(messages: list[Message], tools: list[dict[str, Any]] | None = None, **kwargs: Any) -> ModelResponse

Complete a chat request.

Recognises an OpenAI-style response_format={"type": "json_schema", ...} kwarg and translates it into Anthropic's tool-use mechanism: a synthetic respond_with_schema tool is appended to the call and tool_choice is pinned to it. The tool arguments are then surfaced as the message content (canonical JSON) so callers can parse them with :func:locus.core.structured.parse_structured exactly as they would with native response_format providers.

Source code in src/locus/models/native/anthropic.py
async def complete(
    self,
    messages: list[Message],
    tools: list[dict[str, Any]] | None = None,
    **kwargs: Any,
) -> ModelResponse:
    """Complete a chat request.

    Recognises an OpenAI-style ``response_format={"type": "json_schema", ...}``
    kwarg and translates it into Anthropic's tool-use mechanism: a synthetic
    ``respond_with_schema`` tool is appended to the call and ``tool_choice``
    is pinned to it. The tool arguments are then surfaced as the message
    content (canonical JSON) so callers can parse them with
    :func:`locus.core.structured.parse_structured` exactly as they would
    with native ``response_format`` providers.

    Args:
        messages: Conversation history; converted by ``_convert_messages``
            into a system prompt plus the message list sent to the API.
        tools: Optional tool schemas; converted by ``_convert_tools``.
        **kwargs: Per-call overrides. The keys read here are
            ``max_tokens``, ``temperature`` and ``response_format``.

    Returns:
        A ``ModelResponse`` holding the assistant message (concatenated
        text blocks, or the structured payload as JSON when structured
        mode produced one, plus any other tool calls), a usage dict and
        the API's ``stop_reason``.
    """
    import json as _json

    system_prompt, anthropic_messages = self._convert_messages(messages)
    anthropic_tools = self._convert_tools(tools) or []

    params: dict[str, Any] = {
        "model": self.config.model,
        "messages": anthropic_messages,
        "max_tokens": kwargs.get("max_tokens", self.config.max_tokens),
    }
    # Claude Opus 4.7 (and presumably later 4.x reasoning models) reject
    # `temperature` with `invalid_request_error: temperature is deprecated
    # for this model`. Silently drop the param for those models — locus's
    # own agent runtime_loop always passes `temperature=config.temperature`
    # in `complete_kwargs`, so honouring "caller intent" would still 400
    # every Agent(model="claude-opus-4-7") on the first turn. The
    # wrapper's job here is to keep the agent loop running; callers who
    # need the parameter back can pin to a model that accepts it.
    if not _rejects_temperature(self.config.model):
        params["temperature"] = kwargs.get("temperature", self.config.temperature)
    if system_prompt:
        # When prompt-caching is enabled, send the system prompt as a
        # block list with ``cache_control: ephemeral`` so subsequent
        # turns reuse the cached input at ~1/10x cost (Anthropic
        # ephemeral cache TTL is ~5 min).
        if self.config.prompt_cache:
            params["system"] = [
                {
                    "type": "text",
                    "text": system_prompt,
                    "cache_control": {"type": "ephemeral"},
                }
            ]
        else:
            params["system"] = system_prompt

    # Structured-output mode: emulate ``response_format`` via tool-use.
    response_format = kwargs.get("response_format")
    structured_mode = (
        isinstance(response_format, dict) and response_format.get("type") == "json_schema"
    )
    if structured_mode:
        assert isinstance(response_format, dict)  # narrowed by structured_mode
        # The synthetic tool goes last, after any caller-supplied tools,
        # and the model is forced to call it.
        anthropic_tools.append(self._structured_output_tool(response_format))
        params["tool_choice"] = {
            "type": "tool",
            "name": self._STRUCTURED_TOOL_NAME,
        }

    if anthropic_tools:
        # Cache the tool catalog too — it's typically the same across
        # turns and can be large. Anthropic walks the cache_control
        # markers in order; tagging the last tool covers the catalog.
        if self.config.prompt_cache and anthropic_tools:
            # Rebuild the list with a copy of the last entry so the
            # marker is not written into the original tool dict.
            anthropic_tools = [
                *anthropic_tools[:-1],
                {
                    **anthropic_tools[-1],
                    "cache_control": {"type": "ephemeral"},
                },
            ]
        params["tools"] = anthropic_tools

    response = await self.client.messages.create(**params)

    # Parse response
    content: str | None = None
    tool_calls: list[ToolCall] = []
    structured_payload: dict[str, Any] | None = None

    for block in response.content:
        if block.type == "text":
            # Multiple text blocks are concatenated in order.
            content = (content or "") + block.text
        elif block.type == "tool_use":
            if structured_mode and block.name == self._STRUCTURED_TOOL_NAME:
                # The synthetic tool's input is the structured answer; it
                # is kept out of ``tool_calls``.
                structured_payload = block.input if isinstance(block.input, dict) else {}
                continue
            tool_calls.append(
                ToolCall(
                    id=block.id,
                    name=block.name,
                    arguments=block.input if isinstance(block.input, dict) else {},
                )
            )

    # In structured mode, surface the tool's arguments as the message
    # content so downstream ``parse_structured`` can validate it.
    # This replaces any text content collected above.
    if structured_mode and structured_payload is not None:
        content = _json.dumps(structured_payload)

    usage: dict[str, int] = {}
    if response.usage:
        usage = {
            "prompt_tokens": response.usage.input_tokens,
            "completion_tokens": response.usage.output_tokens,
        }
        # Anthropic returns these only when prompt caching is in play.
        # Surface them on usage so AgentResult.metrics can show
        # cache hits/misses and cost-saved estimates.
        cache_creation = getattr(response.usage, "cache_creation_input_tokens", None)
        cache_read = getattr(response.usage, "cache_read_input_tokens", None)
        if cache_creation is not None:
            usage["cache_creation_input_tokens"] = cache_creation
        if cache_read is not None:
            usage["cache_read_input_tokens"] = cache_read

    return ModelResponse(
        message=Message.assistant(content=content, tool_calls=tool_calls),
        usage=usage,
        stop_reason=response.stop_reason,
    )

stream async

stream(messages: list[Message], tools: list[dict[str, Any]] | None = None, **kwargs: Any) -> AsyncIterator[ModelChunkEvent]

Stream a chat response.

Source code in src/locus/models/native/anthropic.py
async def stream(
    self,
    messages: list[Message],
    tools: list[dict[str, Any]] | None = None,
    **kwargs: Any,
) -> AsyncIterator[ModelChunkEvent]:
    """Stream a chat response.

    Args:
        messages: Conversation history; split by ``_convert_messages`` into
            a system prompt and the message list sent to the API.
        tools: Optional tool definitions, converted with ``_convert_tools``.
        **kwargs: Per-call overrides; only ``max_tokens`` is read here.

    Yields:
        One ``ModelChunkEvent`` per text fragment from the stream's
        ``text_stream``, followed by a final event with ``done=True``.
    """
    system_prompt, anthropic_messages = self._convert_messages(messages)
    anthropic_tools = self._convert_tools(tools)

    request: dict[str, Any] = {
        "model": self.config.model,
        "messages": anthropic_messages,
        "max_tokens": kwargs.get("max_tokens", self.config.max_tokens),
    }
    # Optional fields are only sent when they carry a truthy value.
    optional_fields = {"system": system_prompt, "tools": anthropic_tools}
    request.update({key: value for key, value in optional_fields.items() if value})

    async with self.client.messages.stream(**request) as event_stream:
        async for fragment in event_stream.text_stream:
            yield ModelChunkEvent(content=fragment)

    # Emitted after the stream context has closed.
    yield ModelChunkEvent(done=True)

Ollama

OllamaModel

OllamaModel(model: str = 'llama3.3', base_url: str = 'http://localhost:11434', max_tokens: int = 4096, temperature: float = 0.7, **kwargs: Any)

Bases: BaseModel

Ollama model provider for local LLMs.

Supports any model available in Ollama (Llama, Mistral, Gemma, etc.) with tool calling support.

Example

model = OllamaModel(model="llama3.3")
response = await model.complete([Message.user("Hello!")])

Source code in src/locus/models/native/ollama.py
def __init__(
    self,
    model: str = "llama3.3",
    base_url: str = "http://localhost:11434",
    max_tokens: int = 4096,
    temperature: float = 0.7,
    **kwargs: Any,
) -> None:
    """Build an ``OllamaConfig`` from the arguments and hand it to the base class.

    Args:
        model: Name of the Ollama model to use.
        base_url: URL of the Ollama server.
        max_tokens: Default generation limit stored on the config.
        temperature: Default sampling temperature stored on the config.
        **kwargs: Extra fields forwarded unchanged to ``OllamaConfig``.
    """
    named_settings: dict[str, Any] = {
        "model": model,
        "base_url": base_url,
        "max_tokens": max_tokens,
        "temperature": temperature,
    }
    super().__init__(config=OllamaConfig(**named_settings, **kwargs))

supports_structured_output property

supports_structured_output: bool

Ollama doesn't yet ship OpenAI-style response_format.

The agent loop falls back to the prompted-JSON path with post-hoc parsing for Ollama models.

client property

client: Any

Get or create the Ollama async client.

complete async

complete(messages: list[Message], tools: list[dict[str, Any]] | None = None, **kwargs: Any) -> ModelResponse

Complete a chat request.

Source code in src/locus/models/native/ollama.py
async def complete(
    self,
    messages: list[Message],
    tools: list[dict[str, Any]] | None = None,
    **kwargs: Any,
) -> ModelResponse:
    """Complete a chat request.

    Args:
        messages: Conversation history; converted with ``_convert_messages``.
        tools: Optional tool definitions; converted with ``_convert_tools``
            and sent only when the converted value is truthy.
        **kwargs: Per-call overrides. ``temperature`` and ``max_tokens``
            take precedence over the values stored on ``self.config``.

    Returns:
        A ``ModelResponse`` holding the assistant message (text plus any
        tool calls), token usage when a prompt token count was reported,
        and ``stop_reason="stop"`` when the response is flagged as done.
    """
    ollama_messages = self._convert_messages(messages)
    ollama_tools = self._convert_tools(tools)

    params: dict[str, Any] = {
        "model": self.config.model,
        "messages": ollama_messages,
        "options": {
            "temperature": kwargs.get("temperature", self.config.temperature),
            "num_predict": kwargs.get("max_tokens", self.config.max_tokens),
        },
    }
    if ollama_tools:
        params["tools"] = ollama_tools

    response = await self.client.chat(**params)

    def _response_field(name: str) -> Any:
        # The response may be an object with attributes or a plain dict;
        # try attribute access first and fall back to the dict lookup.
        value = getattr(response, name, None)
        if not value and isinstance(response, dict):
            value = response.get(name)
        return value

    # Parse response — ollama returns Message object, not dict
    msg = response.get("message") or response
    if hasattr(msg, "content"):
        # ollama Message object
        content = msg.content
    else:
        content = msg.get("content") if isinstance(msg, dict) else str(msg)

    raw_tool_calls = (
        getattr(msg, "tool_calls", None)
        or (msg.get("tool_calls") if isinstance(msg, dict) else None)
        or []
    )

    tool_calls: list[ToolCall] = []
    # The id is synthesised from the tool name. Track how often each name
    # has appeared so repeated calls to the same tool in one reply get
    # distinct ids; the first call keeps the plain ``call_<name>`` form.
    calls_per_name: dict[Any, int] = {}
    for tc in raw_tool_calls:
        func = tc.get("function", {})
        name = func.get("name", "unknown")
        args = func.get("arguments", {})
        if isinstance(args, str):
            # Arguments may arrive as a JSON string; malformed JSON is
            # treated as "no arguments" rather than failing the request.
            try:
                args = json.loads(args)
            except json.JSONDecodeError:
                args = {}

        repeats = calls_per_name.get(name, 0)
        calls_per_name[name] = repeats + 1
        call_id = f"call_{name}" if repeats == 0 else f"call_{name}_{repeats}"

        tool_calls.append(
            ToolCall(
                id=call_id,
                name=name,
                arguments=args if isinstance(args, dict) else {},
            )
        )

    # Usage is reported only when a non-zero prompt token count is present.
    # The lookup works for both object and dict responses (previously a
    # conditional-expression precedence slip discarded it for objects).
    usage: dict[str, int] = {}
    prompt_tokens = _response_field("prompt_eval_count")
    if prompt_tokens:
        usage = {
            "prompt_tokens": prompt_tokens,
            "completion_tokens": _response_field("eval_count") or 0,
        }

    done = _response_field("done")

    return ModelResponse(
        message=Message.assistant(content=content, tool_calls=tool_calls),
        usage=usage,
        stop_reason="stop" if done else None,
    )

stream async

stream(messages: list[Message], tools: list[dict[str, Any]] | None = None, **kwargs: Any) -> AsyncIterator[ModelChunkEvent]

Stream a chat response.

Source code in src/locus/models/native/ollama.py
async def stream(
    self,
    messages: list[Message],
    tools: list[dict[str, Any]] | None = None,
    **kwargs: Any,
) -> AsyncIterator[ModelChunkEvent]:
    """Stream a chat response.

    Args:
        messages: Conversation history; converted with ``_convert_messages``.
        tools: Accepted for interface compatibility; not forwarded to the
            server, since this method only yields text content.
        **kwargs: Per-call overrides. ``temperature`` and ``max_tokens``
            take precedence over the values stored on ``self.config``.

    Yields:
        A ``ModelChunkEvent`` for every chunk with non-empty text content,
        then exactly one final event with ``done=True``.
    """
    ollama_messages = self._convert_messages(messages)

    params: dict[str, Any] = {
        "model": self.config.model,
        "messages": ollama_messages,
        "options": {
            "temperature": kwargs.get("temperature", self.config.temperature),
            # Apply the same output cap as ``complete`` so the configured
            # ``max_tokens`` is honoured when streaming too.
            "num_predict": kwargs.get("max_tokens", self.config.max_tokens),
        },
    }

    response = await self.client.chat(**params, stream=True)

    async for chunk in response:
        msg = chunk.get("message", {})
        content = msg.get("content", "")
        if content:
            yield ModelChunkEvent(content=content)
        if chunk.get("done"):
            yield ModelChunkEvent(done=True)
            return

    # The stream ended without a chunk flagged as done; still signal
    # completion so consumers always see a terminal event.
    yield ModelChunkEvent(done=True)