Skip to content

API reference

Autogenerated from docstrings.

Safety

lazytools.safety.Allowlist

Allowlist(allowed: Iterable[object] | None)

Case-insensitive, string-normalized target allow-list.

None means "no allow-list configured" → permits everything. An empty iterable means "deny everything".

Source code in src/lazytools/safety/allowlist.py
def __init__(self, allowed: Iterable[object] | None) -> None:
    self._allowed = None if allowed is None else {str(a).lower() for a in allowed}

lazytools.safety.ConfirmationGate

ConfirmationGate(*, enabled: bool = True)

One-shot, target-bound confirmation grants for dangerous actions.

Not a sticky boolean: each grant authorizes exactly one action, so an approved single message can never silently authorize a flood. Grants are matched from most to least specific: a target+scope grant before a target-only one, then an any-target+scope grant before an any-target one. A scope-bound grant is never spendable when no scope (None) is supplied at consume time. No process-global mutable state — grants live on the instance.

The gate is scope-agnostic: the caller decides what scope means (in LazyPulse it is the running task id, read from :func:lazytools.safety.current_scope) and passes it in. This is what keeps the safety layer free of any orchestration dependency.

Source code in src/lazytools/safety/gates.py
def __init__(self, *, enabled: bool = True) -> None:
    """Create a gate with an empty grant table and the given ``enabled`` flag."""
    # Grant table keyed by ``(target, scope)`` and mapped to an integer
    # counter. ``target`` is a lowercased string or ``_ANY``; ``scope`` is
    # an opaque binding (the task id) or ``None``.
    self._grants: dict[tuple[str, str | None], int] = {}
    self._enabled = enabled

grant

grant(target: object, *, scope: str | None = None) -> None

Authorize exactly one action to target (the tighter grant).

Source code in src/lazytools/safety/gates.py
def grant(self, target: object, *, scope: str | None = None) -> None:
    """Authorize exactly one action to ``target`` (the tighter grant)."""
    self._add((str(target).lower(), scope))

grant_any

grant_any(*, scope: str | None = None) -> None

Authorize exactly one action to any target (subject to allow-list).

Source code in src/lazytools/safety/gates.py
def grant_any(self, *, scope: str | None = None) -> None:
    """Register a single-use grant for any target (subject to allow-list)."""
    wildcard_key = (_ANY, scope)
    self._add(wildcard_key)

consume

consume(target: object, *, scope: str | None = None) -> bool

Spend one matching grant for target in scope; True if found.

Returns True immediately when the gate is disabled. A scope-bound grant is only matched when the same scope is supplied here.

Source code in src/lazytools/safety/gates.py
def consume(self, target: object, *, scope: str | None = None) -> bool:
    """Spend one matching grant for ``target`` in ``scope``; ``True`` if found.

    Returns ``True`` immediately when the gate is disabled. A scope-bound
    grant is only matched when the same ``scope`` is supplied here.
    """
    if not self._enabled:
        return True
    target_l = str(target).lower()
    candidates: list[tuple[str, str | None]] = []
    if scope is not None:
        candidates.append((target_l, scope))
    candidates.append((target_l, None))
    if scope is not None:
        candidates.append((_ANY, scope))
    candidates.append((_ANY, None))
    for key in candidates:
        if self._grants.get(key, 0) > 0:
            self._grants[key] -= 1
            return True
    return False

lazytools.safety.ActionBlocked

Bases: PermissionError

Base for dangerous-action denials (allow-list / confirmation).

Subclasses PermissionError so existing except PermissionError handlers keep working. Carries an audit-friendly message that names the action and the reason and never leaks secrets.

lazytools.safety.current_scope

current_scope() -> str | None

Return the ambient scope of the current run, if any.

Source code in src/lazytools/safety/context.py
def current_scope() -> str | None:
    """Return the value currently held by ``active_scope`` (the ambient run scope, if any)."""
    scope = active_scope.get()
    return scope

Gmail

lazytools.connectors.gmail.GmailTools

GmailTools(client: GmailService, *, allowed_recipients: list[str] | None = None, require_confirmation: bool = True)

A ToolProvider wrapping a :class:GmailService for the worker.

Exposes four tools: gmail_list_emails, gmail_get_email, gmail_create_draft, and gmail_send.

The underlying :class:~lazytools.connectors.gmail.client.GmailClient is thread-safe (serialises calls through an internal lock), so all four tools are safe to invoke from concurrent PulseAgent task workers.

Source code in src/lazytools/connectors/gmail/tools.py
def __init__(
    self,
    client: GmailService,
    *,
    allowed_recipients: list[str] | None = None,
    require_confirmation: bool = True,
) -> None:
    """Bind the Gmail service to a recipient allow-list and a send gate.

    ``allowed_recipients`` is handed to :class:`Allowlist`;
    ``require_confirmation`` becomes the ``enabled`` flag of the
    :class:`ConfirmationGate`.
    """
    self._client = client
    recipient_allowlist = Allowlist(allowed_recipients)
    self._allowlist = recipient_allowlist
    send_gate = ConfirmationGate(enabled=require_confirmation)
    self._gate = send_gate

require_confirmation property

require_confirmation: bool

Whether a send needs an outstanding confirmation (public attribute).

confirm_once

confirm_once(*, task_id: str | None = None) -> None

Authorize exactly one send to any recipient (subject to the allow-list). Call once per approved message. Pass task_id= to bind the grant to a single task so a concurrent task cannot consume it.

Source code in src/lazytools/connectors/gmail/tools.py
def confirm_once(self, *, task_id: str | None = None) -> None:
    """Authorize exactly one send to any recipient (subject to the
    allow-list). Call once per approved message. Pass ``task_id=`` to bind
    the grant to a single task so a concurrent task cannot consume it."""
    self._gate.grant_any(scope=task_id)

confirm_send

confirm_send(*, to: str, task_id: str | None = None) -> None

Authorize exactly one send to a specific recipient — the tighter, preferred grant. Pass task_id= to also bind it to a single task.

Source code in src/lazytools/connectors/gmail/tools.py
def confirm_send(self, *, to: str, task_id: str | None = None) -> None:
    """Authorize exactly one send to a specific recipient — the tighter,
    preferred grant. Pass ``task_id=`` to also bind it to a single task."""
    self._gate.grant(to, scope=task_id)

lazytools.connectors.gmail.GmailClient

GmailClient(service: Any)

Production :class:GmailService backed by googleapiclient.

All methods acquire a per-instance lock before touching the underlying googleapiclient resource so the client is safe to call from multiple threads concurrently (e.g. parallel PulseAgent task workers).

Source code in src/lazytools/connectors/gmail/client.py
def __init__(self, service: Any) -> None:
    """Wrap ``service`` and create this instance's lock."""
    self._lock = threading.Lock()
    # ``service``: a googleapiclient Resource, or any object exposing the
    # same ``users().messages()`` shape.
    self._service = service

from_credentials classmethod

from_credentials(*, credentials_path: str, token_path: str, scopes: list[str]) -> GmailClient

Build a client from an OAuth client-secret + cached token file.

Imports the Google libraries lazily; raises a friendly ImportError if the gmail extra is not installed.

Source code in src/lazytools/connectors/gmail/client.py
@classmethod
def from_credentials(
    cls,
    *,
    credentials_path: str,
    token_path: str,
    scopes: list[str],
) -> GmailClient:
    """Build a client from an OAuth client-secret + cached token file.

    Imports the Google libraries lazily; raises a friendly
    ``ImportError`` if the ``gmail`` extra is not installed.

    Parameters
    ----------
    credentials_path:
        OAuth client-secrets file, read only when the interactive
        ``InstalledAppFlow`` has to run.
    token_path:
        Cached token file. Loaded when it exists and (re)written after a
        refresh or a fresh consent flow.
    scopes:
        OAuth scopes passed to the credential loader and the consent flow.

    Returns
    -------
    GmailClient
        A client wrapping the ``gmail`` ``v1`` service built from the
        resulting credentials.
    """
    try:
        from google.auth.transport.requests import Request
        from google.oauth2.credentials import Credentials
        from google_auth_oauthlib.flow import InstalledAppFlow
        from googleapiclient.discovery import build
    except ImportError as exc:  # pragma: no cover — exercised only without the extra
        raise ImportError(
            "GmailClient.from_credentials requires the 'gmail' extra. "
            "Install it with: pip install 'lazytoolkit[gmail]'"
        ) from exc

    import os

    def _restrict_to_owner(path: str) -> None:
        # Best-effort: some filesystems do not support POSIX modes.
        try:
            os.chmod(path, 0o600)
        except OSError:  # pragma: no cover — e.g. unusual filesystems
            pass

    creds = None
    if os.path.exists(token_path):
        # The cached token holds a long-lived OAuth refresh token. Tighten a
        # pre-existing file first: this covers a still-valid token written
        # by an older version with loose permissions (the rewrite branch
        # below is then skipped), and it means a rewrite never puts fresh
        # token material into a world-readable file.
        _restrict_to_owner(token_path)
        creds = Credentials.from_authorized_user_file(token_path, scopes)
    if creds is None or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(credentials_path, scopes)
            creds = flow.run_local_server(port=0)
        # Create the file owner-only from the start. ``open(path, "w")``
        # would create it with the default umask (often 0644) and leave a
        # window, before any later chmod, in which other local users could
        # read the token.
        fd = os.open(token_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            fh.write(creds.to_json())
        # The creation mode is ignored when the file already exists, so
        # re-apply the restriction in case it appeared after the check above.
        _restrict_to_owner(token_path)
    service = build("gmail", "v1", credentials=creds, cache_discovery=False)
    return cls(service)

lazytools.connectors.gmail.parse_authentication_results

parse_authentication_results(header: str | None, *, trusted_authserv_id: str | None = None) -> dict[str, bool]

Return {"dkim": bool, "spf": bool, "dmarc": bool}.

True means an authoritative result token for the method was pass. A missing or empty header yields all-False.

When trusted_authserv_id is set (e.g. "mx.google.com"), the header is accepted only if its leading authserv-id is exactly that value. A forged header with a different authserv-id (or no authserv-id at all) is rejected as all-False. The match is exact rather than a prefix, so neither evil-mx.google.com nor mx.google.com.evil.com is accepted. The caller is responsible for passing the first / top-most Authentication-Results header from the message — the one prepended by the receiving MTA — rather than a later one.

Source code in src/lazytools/connectors/gmail/auth.py
def parse_authentication_results(
    header: str | None,
    *,
    trusted_authserv_id: str | None = None,
) -> dict[str, bool]:
    """Return ``{"dkim": bool, "spf": bool, "dmarc": bool}``.

    ``True`` means an authoritative result token for the method was ``pass``.
    A missing or empty header yields all-``False``.

    When ``trusted_authserv_id`` is set (e.g. ``"mx.google.com"``), the
    header is accepted **only** if its leading authserv-id is *exactly* that
    value. A forged header with a different authserv-id (or no authserv-id at
    all) is rejected as all-``False``. The match is exact rather than a
    prefix, so neither ``evil-mx.google.com`` nor ``mx.google.com.evil.com``
    is accepted. The caller is responsible for passing the first / top-most
    ``Authentication-Results`` header from the message — the one prepended by
    the receiving MTA — rather than a later one.

    Comments are stripped in a bounded number of passes before result tokens
    are scanned. If comments are still present after the last pass (nesting
    deeper than the bound), the header is rejected as all-``False`` instead
    of being scanned with comment text still in it.
    """
    result = {m: False for m in _METHODS}
    if not header:
        return result

    if trusted_authserv_id is not None:
        # Only the trusted side is lowercased here; presumably
        # ``_extract_authserv_id`` returns a lowercased id — verify there.
        authserv_id = _extract_authserv_id(header)
        if authserv_id != trusted_authserv_id.lower():
            return result  # authserv-id absent or does not match exactly: reject

    # Collapse comments (a few passes handles the rare nested case).
    cleaned = header
    for _ in range(5):
        stripped = _COMMENT_RE.sub(" ", cleaned)
        if stripped == cleaned:
            break
        cleaned = stripped
    else:
        # Every pass changed the text, so no fixpoint was confirmed. If a
        # comment still matches, its content could be mistaken for a result
        # token (e.g. ``dkim=pass`` hidden inside nested parentheses): fail
        # closed rather than trust it.
        if _COMMENT_RE.search(cleaned):
            return result

    for method, pattern in _RESULT_RE.items():
        # After comment-stripping + anchoring, every match is a genuine result
        # token. A message may carry several (multiple DKIM signatures); one
        # authoritative ``pass`` is enough, matching standard DKIM semantics.
        for match in pattern.finditer(cleaned):
            if match.group(1).lower() == "pass":
                result[method] = True
                break
    return result

Telegram

lazytools.connectors.telegram.TelegramTools

TelegramTools(client: TelegramService, *, allowed_chat_ids: list[int | str] | None = None, require_confirmation: bool = True)

A ToolProvider wrapping a :class:TelegramService for the worker.

Source code in src/lazytools/connectors/telegram/tools.py
def __init__(
    self,
    client: TelegramService,
    *,
    allowed_chat_ids: list[int | str] | None = None,
    require_confirmation: bool = True,
) -> None:
    """Bind the Telegram service to a chat allow-list and a send gate.

    ``allowed_chat_ids`` is handed to :class:`Allowlist`;
    ``require_confirmation`` becomes the ``enabled`` flag of the
    :class:`ConfirmationGate`.
    """
    self._client = client
    chat_allowlist = Allowlist(allowed_chat_ids)
    self._allowlist = chat_allowlist
    send_gate = ConfirmationGate(enabled=require_confirmation)
    self._gate = send_gate

require_confirmation property

require_confirmation: bool

Whether a send needs an outstanding confirmation (public attribute).

confirm_once

confirm_once(*, task_id: str | None = None) -> None

Authorize exactly one send to any chat (subject to the allow-list). Pass task_id= to bind the grant to a single task so a concurrent task cannot consume it.

Source code in src/lazytools/connectors/telegram/tools.py
def confirm_once(self, *, task_id: str | None = None) -> None:
    """Authorize exactly one send to any chat (subject to the allow-list).
    Pass ``task_id=`` to bind the grant to a single task so a concurrent
    task cannot consume it."""
    self._gate.grant_any(scope=task_id)

confirm_send

confirm_send(*, chat_id: int | str, task_id: str | None = None) -> None

Authorize exactly one send to a specific chat — the tighter grant. Pass task_id= to also bind it to a single task.

Source code in src/lazytools/connectors/telegram/tools.py
def confirm_send(self, *, chat_id: int | str, task_id: str | None = None) -> None:
    """Authorize exactly one send to a specific chat — the tighter grant.
    Pass ``task_id=`` to also bind it to a single task."""
    self._gate.grant(chat_id, scope=task_id)

lazytools.connectors.telegram.TelegramClient

TelegramClient(token: str, *, http: Any | None = None, base_url: str = 'https://api.telegram.org')

Production :class:TelegramService backed by the Bot API over HTTPS.

Source code in src/lazytools/connectors/telegram/client.py
def __init__(self, token: str, *, http: Any | None = None, base_url: str = "https://api.telegram.org") -> None:
    # ``http`` is an ``httpx.Client`` (or any object exposing
    # ``post(url, json=...) -> response`` with ``raise_for_status`` + ``json``).
    self._token = token
    self._base = f"{base_url}/bot{token}"
    self._http = http

from_token classmethod

from_token(token: str, *, timeout: float = 30.0) -> TelegramClient

Build a client from a bot token (obtained from @BotFather).

Imports httpx lazily; raises a friendly ImportError if the telegram extra is not installed.

Source code in src/lazytools/connectors/telegram/client.py
@classmethod
def from_token(cls, token: str, *, timeout: float = 30.0) -> TelegramClient:
    """Build a client from a bot token (obtained from @BotFather).

    ``httpx`` is imported on demand; when it is missing, the resulting
    ``ImportError`` is re-raised with a hint to install the ``telegram``
    extra. The created ``httpx.Client`` uses ``timeout`` as its timeout.
    """
    try:
        import httpx
    except ImportError as exc:  # pragma: no cover — exercised only without the extra
        install_hint = (
            "TelegramClient.from_token requires the 'telegram' extra. "
            "Install it with: pip install 'lazytoolkit[telegram]'"
        )
        raise ImportError(install_hint) from exc
    http_client = httpx.Client(timeout=timeout)
    return cls(token, http=http_client)

MCP

lazytools.connectors.mcp.MCP

Public factory for :class:MCPServer instances.

stdio classmethod

stdio(name: str, *, command: str, args: list[str] | None = None, env: dict[str, str] | None = None, namespace: bool = True, prefix: str | None = None, allow: Iterable[str] | None = None, deny: Iterable[str] | None = None, cache_tools_ttl: float | None = MCPServer._DEFAULT_CACHE_TTL) -> MCPServer

Build an MCP server bound to a stdio (subprocess) transport.

allow= (or deny=) is required — a deny-by-default posture like that of :meth:http, except that :meth:http always requires allow= while stdio also accepts a deny-only filter. A warn-and-proceed default would expose every advertised tool to the LLM silently; that's a non-trivial blast radius for filesystem / git / shell MCP servers, so we fail at construction instead. Pass allow=["*"] to opt every advertised tool in explicitly after auditing the surface.

Source code in src/lazytools/connectors/mcp/server.py
@classmethod
def stdio(
    cls,
    name: str,
    *,
    command: str,
    args: list[str] | None = None,
    env: dict[str, str] | None = None,
    namespace: bool = True,
    prefix: str | None = None,
    allow: Iterable[str] | None = None,
    deny: Iterable[str] | None = None,
    cache_tools_ttl: float | None = MCPServer._DEFAULT_CACHE_TTL,
) -> MCPServer:
    """Build an MCP server bound to a stdio (subprocess) transport.

    At least one of ``allow=`` / ``deny=`` is **required**; passing
    neither raises ``ValueError`` at construction.  A warn-and-proceed
    default would expose every advertised tool to the LLM silently;
    that's a non-trivial blast radius for filesystem / git / shell MCP
    servers.  Pass ``allow=["*"]`` to opt every advertised tool in
    explicitly after auditing the surface.

    ``command``, ``args`` and ``env`` are handed to ``StdioTransport``;
    the remaining options are forwarded to :class:`MCPServer` unchanged.
    """
    no_filter_given = allow is None and deny is None
    if no_filter_given:
        message = (
            f"MCP.stdio({name!r}, command={command!r}) requires an explicit\n"
            f"  allow=[...] or deny=[...] filter (deny-by-default).\n"
            f"  Pass allow=['*'] to opt every advertised tool in after auditing\n"
            f"  the surface, or pass an explicit allow / deny list of fnmatch\n"
            f"  globs (e.g. allow=['fs.read_*', 'fs.list_*']).\n"
            f"  A warn-and-proceed default would be unsafe for filesystem /\n"
            f"  git / shell MCP servers — the LLM could invoke any tool the\n"
            f"  subprocess advertised."
        )
        raise ValueError(message)

    # Imported only after validation, so a bad call fails before the
    # transport module is loaded.
    from lazytools.connectors.mcp.transports import StdioTransport

    subprocess_transport = StdioTransport(command, args=args, env=env)
    return MCPServer(
        name,
        transport=subprocess_transport,
        namespace=namespace,
        prefix=prefix,
        allow=allow,
        deny=deny,
        cache_tools_ttl=cache_tools_ttl,
    )

http classmethod

http(name: str, url: str, *, headers: dict[str, str] | None = None, namespace: bool = True, prefix: str | None = None, allow: Iterable[str] | None = None, deny: Iterable[str] | None = None, cache_tools_ttl: float | None = MCPServer._DEFAULT_CACHE_TTL) -> MCPServer

Build an MCP server bound to a Streamable HTTP transport.

allow= is required. Omitting it raises ValueError because a remote server could advertise any number of tools and silently exposing them all to the LLM is a security mistake. Pass an explicit list of the tools you want to expose::

MCP.http("github", url, allow=["create_issue", "list_prs"])

To permit all tools advertised by a server you fully control::

MCP.http("internal", url, allow=["*"])
Source code in src/lazytools/connectors/mcp/server.py
@classmethod
def http(
    cls,
    name: str,
    url: str,
    *,
    headers: dict[str, str] | None = None,
    namespace: bool = True,
    prefix: str | None = None,
    allow: Iterable[str] | None = None,
    deny: Iterable[str] | None = None,
    cache_tools_ttl: float | None = MCPServer._DEFAULT_CACHE_TTL,
) -> MCPServer:
    """Build an MCP server bound to a Streamable HTTP transport.

    ``allow=`` is **required** (a ``deny=`` filter alone is not enough).
    Omitting it raises ``ValueError`` because a remote server could
    advertise any number of tools and silently exposing them all to the
    LLM is a security mistake. Pass an explicit list of the tools you want
    to expose::

        MCP.http("github", url, allow=["create_issue", "list_prs"])

    To permit all tools advertised by a server you fully control::

        MCP.http("internal", url, allow=["*"])
    """
    if allow is None:
        message = (
            f"MCP.http({name!r}, {url!r}) requires an explicit allow= list. "
            f"Every tool the remote server advertises would otherwise be exposed to the LLM. "
            f"Pass allow=['tool_a', 'tool_b'] to restrict the tool surface, "
            f"or allow=['*'] to permit everything and silence this error."
        )
        raise ValueError(message)

    # Imported only after validation, so a bad call fails before the
    # transport module is loaded.
    from lazytools.connectors.mcp.transports import HttpTransport

    remote_transport = HttpTransport(url, headers=headers)
    return MCPServer(
        name,
        transport=remote_transport,
        namespace=namespace,
        prefix=prefix,
        allow=allow,
        deny=deny,
        cache_tools_ttl=cache_tools_ttl,
    )

from_transport classmethod

from_transport(name: str, transport: _Transport, *, namespace: bool = True, prefix: str | None = None, allow: Iterable[str] | None = None, deny: Iterable[str] | None = None, cache_tools_ttl: float | None = MCPServer._DEFAULT_CACHE_TTL) -> MCPServer

Build an MCP server from a custom :class:_Transport.

Useful for tests (in-process fake transport) or for adapters to non-standard MCP variants. The transport must implement the abstract :class:_Transport interface.

Source code in src/lazytools/connectors/mcp/server.py
@classmethod
def from_transport(
    cls,
    name: str,
    transport: _Transport,
    *,
    namespace: bool = True,
    prefix: str | None = None,
    allow: Iterable[str] | None = None,
    deny: Iterable[str] | None = None,
    cache_tools_ttl: float | None = MCPServer._DEFAULT_CACHE_TTL,
) -> MCPServer:
    """Build an MCP server from a custom :class:`_Transport`.

    Handy for tests (in-process fake transport) and for adapters to
    non-standard MCP variants. The transport must implement the abstract
    :class:`_Transport` interface. Every option is forwarded to
    :class:`MCPServer` unchanged; no ``allow`` / ``deny`` check is made
    here.
    """
    options = {
        "transport": transport,
        "namespace": namespace,
        "prefix": prefix,
        "allow": allow,
        "deny": deny,
        "cache_tools_ttl": cache_tools_ttl,
    }
    return MCPServer(name, **options)

lazytools.connectors.mcp.MCPServer

MCPServer(name: str, transport: _Transport, *, namespace: bool = True, prefix: str | None = None, allow: Iterable[str] | None = None, deny: Iterable[str] | None = None, cache_tools_ttl: float | None = _DEFAULT_CACHE_TTL)

A tool provider backed by an MCP server.

Add it directly to Agent(tools=[...]); the framework calls :meth:as_tools to expand it into individual :class:Tool entries. Tool names are namespaced as "<server-name>.<mcp-tool-name>" by default; pass namespace=False to keep the raw names, or prefix="..." to override.

The transport connects lazily on first :meth:as_tools. For explicit cleanup, use the server as an async context manager::

async with MCP.stdio("fs", command="...", args=[...]) as fs:
    agent = Agent("claude-opus-4-8", tools=[fs])
    await agent.run("...")

Without that, the transport stays open for the process lifetime; the underlying subprocess is normally cleaned up when the parent exits.

Closure is terminal. Once :meth:aclose (or the async with block) finishes, the server is single-shot: a subsequent :meth:aconnect / :meth:as_tools raises RuntimeError. Construct a new MCPServer if you need to re-use the same transport configuration.

Source code in src/lazytools/connectors/mcp/server.py
def __init__(
    self,
    name: str,
    transport: _Transport,
    *,
    namespace: bool = True,
    prefix: str | None = None,
    allow: Iterable[str] | None = None,
    deny: Iterable[str] | None = None,
    cache_tools_ttl: float | None = _DEFAULT_CACHE_TTL,
) -> None:
    """Store the server configuration; nothing is connected here.

    Parameters
    ----------
    name:
        Server name; also the default tool-name prefix (``"<name>."``).
    transport:
        Transport object used later to connect and list tools.
    namespace:
        When true and ``prefix`` is ``None``, tool names are prefixed with
        ``"<name>."``; when false the default prefix is empty.
    prefix:
        Explicit prefix; overrides the ``namespace`` default when not
        ``None``.
    allow / deny:
        Filter patterns. ``None`` means "not given" and is stored as
        ``None``. Any other iterable is copied into a list, so an explicit
        empty iterable stays distinguishable from ``None``. A bare string
        is treated as one pattern rather than split into characters.
    cache_tools_ttl:
        Tool-list cache lifetime in seconds, or ``None``.

    Raises
    ------
    ValueError
        If ``cache_tools_ttl`` is not ``None`` and is ``<= 0``.
    """

    def _as_patterns(value: Iterable[str] | None) -> list[str] | None:
        # Normalize a filter argument without relying on truthiness.
        # ``if value`` would map an explicit ``[]`` to ``None`` — the same
        # state as "no filter given" — while an empty generator (always
        # truthy) would become ``[]``.
        # NOTE(review): how an empty list is applied is decided by the
        # filtering code, which is not visible here — confirm it treats
        # ``[]`` as intended.
        if value is None:
            return None
        if isinstance(value, str):
            # ``list("fs.read_*")`` would explode into single characters,
            # including a lone "*" pattern.
            return [value]
        return list(value)

    self.name = name
    self._transport = transport
    self._namespace = namespace
    if prefix is not None:
        self._prefix = prefix
    else:
        self._prefix = f"{name}." if namespace else ""
    self._allow = _as_patterns(allow)
    self._deny = _as_patterns(deny)

    if cache_tools_ttl is not None and cache_tools_ttl <= 0:
        raise ValueError(f"cache_tools_ttl must be > 0 or None, got {cache_tools_ttl!r}")
    self._cache_ttl: float | None = cache_tools_ttl
    self._tools_cache: list[Tool] | None = None
    self._tools_cache_ts: float = 0.0
    self._connected = False
    self._closed = False
    # Lazy-init the asyncio.Lock on first async use.  Constructing it
    # inside ``__init__`` (a sync context) couples to whatever event
    # loop happens to be running at instantiation time and warns /
    # raises on Python ≥3.12 when there is none.  Deferring is safe
    # because the lock only ever guards async coroutines.
    self._lock: asyncio.Lock | None = None

aconnect async

aconnect() -> None

Connect the underlying transport. Idempotent.

Source code in src/lazytools/connectors/mcp/server.py
async def aconnect(self) -> None:
    """Connect the underlying transport. Idempotent.

    Does nothing when already connected. Raises ``RuntimeError`` when the
    server has been closed.
    """
    async with self._get_lock():
        if self._connected:
            return
        if self._closed:
            raise RuntimeError(f"MCPServer {self.name!r} is closed and cannot be reused")
        await self._transport.connect()
        self._connected = True

alist_tools async

alist_tools() -> list[Tool]

Discover and wrap the server's tools.

Cached for cache_tools_ttl seconds (default 60 s). Once the cache expires, the next call re-fetches from the upstream transport, so an MCP server that hot-loads or unloads tools is eventually reflected in the agent's tool list. Pass cache_tools_ttl=None to disable expiry entirely, and call :meth:invalidate_tools_cache to flush explicitly.

Source code in src/lazytools/connectors/mcp/server.py
async def alist_tools(self) -> list[Tool]:
    """Discover and wrap the server's tools.

    The filtered list is cached for ``cache_tools_ttl`` seconds.  After
    the cache expires, the next call re-fetches from the transport, so a
    server that hot-loads or unloads tools is eventually reflected in the
    agent's tool list.  Pass ``cache_tools_ttl=None`` to disable expiry
    entirely, and call :meth:`invalidate_tools_cache` to flush explicitly.
    """
    await self.aconnect()
    # The freshness check and the refill share one critical section, so
    # two concurrent callers cannot both see a miss and issue duplicate
    # ``list_tools()`` round-trips (or overwrite each other's cache).
    # ``aconnect`` has already released the lock by now (asyncio.Lock is
    # not reentrant), so taking it here is safe.
    async with self._get_lock():
        now = time.monotonic()
        cached = self._tools_cache
        if cached is not None:
            ttl = self._cache_ttl
            if ttl is None or (now - self._tools_cache_ts) < ttl:
                return cached
        discovered = await self._transport.list_tools()
        visible = self._filter([self._wrap_tool(tool) for tool in discovered])
        self._tools_cache = visible
        self._tools_cache_ts = now
        return visible

invalidate_tools_cache

invalidate_tools_cache() -> None

Drop the cached tool list so the next call re-fetches.

Use this when an out-of-band signal tells you the MCP server's tool registry has changed (plugin install / uninstall, hot reload). No-op when nothing is cached yet.

Source code in src/lazytools/connectors/mcp/server.py
def invalidate_tools_cache(self) -> None:
    """Forget the cached tool list so the next call re-fetches.

    Call this when an out-of-band signal says the MCP server's tool
    registry has changed (plugin install / uninstall, hot reload).
    Harmless when nothing is cached yet.
    """
    self._tools_cache_ts = 0.0
    self._tools_cache = None

aclose async

aclose() -> None

Close the underlying transport. Idempotent.

Source code in src/lazytools/connectors/mcp/server.py
async def aclose(self) -> None:
    """Close the underlying transport. Idempotent.

    Only acts on a server that is connected and not yet closed; otherwise
    the state is left untouched. The flags are updated even when the
    transport's ``close()`` raises.
    """
    async with self._get_lock():
        if self._closed or not self._connected:
            return
        try:
            await self._transport.close()
        finally:
            self._connected = False
            self._closed = True

as_tools

as_tools() -> list[Tool]

Sync wrapper around :meth:alist_tools. Called by build_tool_map.

Triggers a lazy connect on first use. If the call happens inside an already-running event loop, the work is dispatched to a worker thread (mirrors :meth:lazybridge.Tool.run_sync).

Source code in src/lazytools/connectors/mcp/server.py
def as_tools(self) -> list[Tool]:
    """Sync wrapper around :meth:`alist_tools`. Called by ``build_tool_map``.

    The first use triggers a lazy connect. When invoked from inside an
    already-running event loop, the work is dispatched to a worker thread
    (mirrors :meth:`lazybridge.Tool.run_sync`).
    """
    from lazytools.connectors.mcp.transports import _run_sync

    pending = self.alist_tools()
    return _run_sync(pending)

Code Support Agent

lazytools.connectors.code_support.claude_code

claude_code(task: str, *, mode: str = 'read', cwd: str | None = None, session_id: str | None = None, timeout: float = 300.0) -> str

Delegate a task to Claude Code CLI and return the result as a string.

Parameters

task: Instruction for Claude Code.

mode: "read" (default) — read-only analysis (Read, Bash, Grep, Glob); "write" — may edit files (acceptEdits permission mode); "plan" — plan mode, no file modifications.

cwd: Working directory for the subprocess.

session_id: If given, resumes an existing Claude Code session via --resume.

timeout: Maximum seconds for the subprocess. Set tool_timeout=None on LLMEngine so the engine never cancels before the subprocess finishes (zombie-process hazard when the engine fires first).

Notes

Auth is left to the Claude Code CLI itself: it reads its own on-disk login (~/.claude/.credentials.json), and the subprocess inherits the current environment, so CLAUDE_CODE_OAUTH_TOKEN (from claude setup-token) or ANTHROPIC_API_KEY are honoured if set. We do not synthesize CLAUDE_CODE_OAUTH_TOKEN from the credentials file — that env var is a token string, not the JSON store, and overriding it would break a valid disk login.

Source code in src/lazytools/connectors/code_support/_claude_code.py
def claude_code(
    task: str,
    *,
    mode: str = "read",
    cwd: str | None = None,
    session_id: str | None = None,
    timeout: float = 300.0,
) -> str:
    """Delegate a task to Claude Code CLI and return the result as a string.

    Parameters
    ----------
    task:
        Instruction for Claude Code.
    mode:
        ``"read"`` (default) — read-only analysis (Read, Bash, Grep, Glob).
        ``"write"`` — may edit files (acceptEdits permission mode).
        ``"plan"`` — plan mode, no file modifications.
    cwd:
        Working directory for the subprocess.
    session_id:
        If given, resumes an existing Claude Code session via ``--resume``.
    timeout:
        Maximum seconds for the subprocess. Set ``tool_timeout=None`` on
        ``LLMEngine`` so the engine never cancels before the subprocess
        finishes (zombie-process hazard when engine fires first).

    Notes
    -----
    Auth is left to the Claude Code CLI itself: it reads its own on-disk
    login (``~/.claude/.credentials.json``), and the subprocess inherits the
    current environment, so ``CLAUDE_CODE_OAUTH_TOKEN`` (from
    ``claude setup-token``) or ``ANTHROPIC_API_KEY`` are honoured if set. We do
    not synthesize ``CLAUDE_CODE_OAUTH_TOKEN`` from the credentials file — that
    env var is a token *string*, not the JSON store, and overriding it would
    break a valid disk login.
    """
    if mode not in _TOOL_FLAGS:
        return f"[claude_code] invalid mode={mode!r}. Use 'read', 'write', or 'plan'."

    cmd = ["claude", "-p", task, "--output-format", "json", *_TOOL_FLAGS[mode]]
    if session_id:
        cmd += ["--resume", session_id]

    try:
        proc = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout,
            cwd=cwd,
        )
    except subprocess.TimeoutExpired:
        return f"[claude_code] timeout after {timeout}s"
    except FileNotFoundError:
        return "[claude_code] CLI 'claude' not found in PATH — install Claude Code first"

    if proc.returncode != 0:
        stderr = proc.stderr.strip()
        _log.error("claude_code exit %d: %s", proc.returncode, stderr)
        return f"[claude_code] error (exit {proc.returncode}): {stderr[:500]}"

    try:
        data = json.loads(proc.stdout)
        if data.get("subtype") == "error":
            return f"[claude_code] {data.get('result', 'unknown error')}"
        return data.get("result", "")
    except json.JSONDecodeError:
        return proc.stdout.strip()

lazytools.connectors.code_support.claude_code_mcp

claude_code_mcp(*, name: str = 'claude_code', allow: Iterable[str] | None = None, deny: Iterable[str] | None = None, args: list[str] | None = None, env: dict[str, str] | None = None, namespace: bool = True, prefix: str | None = None, cache_tools_ttl: float | None = 60.0) -> MCPServer

Claude Code as an MCP server (claude mcp serve).

Returns an :class:~lazytools.connectors.mcp.MCPServer exposing Claude Code's own tools (View, Edit, LS, Bash, …) over stdio. Drop it straight into Agent(tools=[claude_code_mcp(allow=["*"])]).

allow= (or deny=) is required — deny-by-default, the same posture as :meth:MCP.stdio. The patterns match the namespaced tool names, e.g. allow=["claude_code.View", "claude_code.LS"] (or allow=["*"] after auditing the surface). Tool names are not hardcoded here because they are owned by the Claude Code version you have installed; discover them by running with allow=["*"] once and inspecting the map.

Parameters

name: Server name and default namespace prefix ("claude_code").

allow / deny: fnmatch globs against the namespaced tool name (deny-by-default).

args: Extra args appended after mcp serve (rarely needed).

env: Extra environment for the subprocess. Auth is otherwise inherited from the parent environment / the CLI's own on-disk login.

namespace / prefix / cache_tools_ttl: Forwarded to :meth:MCP.stdio unchanged.

Requires the mcp extra: pip install lazytoolkit[mcp].

Source code in src/lazytools/connectors/code_support/_claude_code.py
def claude_code_mcp(
    *,
    name: str = "claude_code",
    allow: Iterable[str] | None = None,
    deny: Iterable[str] | None = None,
    args: list[str] | None = None,
    env: dict[str, str] | None = None,
    namespace: bool = True,
    prefix: str | None = None,
    cache_tools_ttl: float | None = 60.0,
) -> MCPServer:
    """Wrap ``claude mcp serve`` as a stdio MCP server.

    The returned :class:`~lazytools.connectors.mcp.MCPServer` publishes the
    tools Claude Code itself ships (View, Edit, LS, Bash, …), so it can be
    handed directly to an agent:
    ``Agent(tools=[claude_code_mcp(allow=["*"])])``.

    An ``allow=`` or ``deny=`` filter is **required**: like
    :meth:`MCP.stdio`, the connector is deny-by-default. Patterns are matched
    against the *namespaced* tool names — for example
    ``allow=["claude_code.View", "claude_code.LS"]``, or ``allow=["*"]`` once
    you have audited the surface. No tool names are hardcoded because the
    installed Claude Code version owns them; run once with ``allow=["*"]``
    and inspect the resulting map to discover them.

    Parameters
    ----------
    name:
        Server name, also the default namespace prefix (``"claude_code"``).
    allow / deny:
        fnmatch globs tested against the namespaced tool name
        (deny-by-default).
    args:
        Additional arguments placed after ``mcp serve`` (rarely needed).
    env:
        Additional environment for the subprocess. Authentication otherwise
        comes from the parent environment / the CLI's own on-disk login.
    namespace / prefix / cache_tools_ttl:
        Passed through to :meth:`MCP.stdio` untouched.

    Requires the ``mcp`` extra: ``pip install lazytoolkit[mcp]``.
    """
    # Fixed subcommand first; caller-supplied extras (if any) go after it.
    serve_argv = ["mcp", "serve"]
    if args:
        serve_argv.extend(args)

    return MCP.stdio(
        name,
        command="claude",
        args=serve_argv,
        env=env,
        allow=allow,
        deny=deny,
        namespace=namespace,
        prefix=prefix,
        cache_tools_ttl=cache_tools_ttl,
    )

lazytools.connectors.code_support.codex

codex(task: str, *, mode: str = 'read', cwd: str | None = None, resume_last: bool = False, timeout: float = 300.0, skip_git_check: bool = True) -> str

Delegate a task to the Codex CLI and return the result as a string.

Parameters

- task: Instruction for Codex.
- mode: "read" (default) — read-only sandbox (-s read-only). "write" — workspace-write sandbox, full-auto (no interactive confirmation prompts). Ideally run inside a git repo.
- cwd: Working directory for the subprocess.
- resume_last: If True, continues the most recent Codex session in the working directory via exec resume --last.
- timeout: Maximum seconds for the subprocess. Set tool_timeout=None on LLMEngine so the engine never cancels before the subprocess finishes (zombie-process hazard when the engine fires first).
- skip_git_check: Pass --skip-git-repo-check. Required outside a git repo. In mode="write" a git repo is recommended for reliable behaviour.

Source code in src/lazytools/connectors/code_support/_codex.py
def codex(
    task: str,
    *,
    mode: str = "read",
    cwd: str | None = None,
    resume_last: bool = False,
    timeout: float = 300.0,
    skip_git_check: bool = True,
) -> str:
    """Run one task through the Codex CLI and hand back its output as text.

    Failures are reported as ``"[codex] ..."`` strings rather than raised:
    an unknown ``mode``, a timeout, a missing ``codex`` executable, and a
    non-zero exit status each produce a descriptive message.

    Parameters
    ----------
    task:
        The instruction given to Codex.
    mode:
        ``"read"`` (default) — read-only sandbox (``-s read-only``).
        ``"write"`` — workspace-write sandbox, full-auto (no interactive
        confirmation prompts). Ideally run inside a git repo.
    cwd:
        Working directory of the subprocess.
    resume_last:
        When True, continue the most recent Codex session in the working
        directory via ``exec resume --last``.
    timeout:
        Upper bound, in seconds, on the subprocess. Set ``tool_timeout=None``
        on ``LLMEngine`` so the engine never cancels before the subprocess
        finishes (zombie-process hazard when engine fires first).
    skip_git_check:
        Append ``--skip-git-repo-check``. Required outside a git repo.
        In ``mode="write"`` a git repo is recommended for reliable behaviour.

    Returns
    -------
    str
        Stripped stdout on success, otherwise a ``"[codex] ..."`` message.
        On a non-zero exit the message carries at most the first 500
        characters of stderr; the full stderr is logged.
    """
    if mode not in _SANDBOX_FLAGS:
        return f"[codex] invalid mode={mode!r}. Use 'read' or 'write'."

    # Assemble argv piecewise: subcommand, optional resume selector, the task
    # text, the sandbox flags for the chosen mode, then the git-check opt-out.
    argv = ["codex", "exec"]
    if resume_last:
        argv += ["resume", "--last"]
    argv.append(task)
    argv.extend(_SANDBOX_FLAGS[mode])
    if skip_git_check:
        argv.append("--skip-git-repo-check")

    try:
        completed = subprocess.run(
            argv,
            capture_output=True,
            text=True,
            timeout=timeout,
            cwd=cwd,
        )
    except subprocess.TimeoutExpired:
        return f"[codex] timeout after {timeout}s"
    except FileNotFoundError:
        return "[codex] CLI 'codex' not found in PATH — install OpenAI Codex CLI first"

    if completed.returncode == 0:
        return completed.stdout.strip()

    # Log stderr in full, but keep the returned message bounded.
    err_text = completed.stderr.strip()
    _log.error("codex exit %d: %s", completed.returncode, err_text)
    return f"[codex] error (exit {completed.returncode}): {err_text[:500]}"

lazytools.connectors.code_support.codex_mcp

codex_mcp(*, name: str = 'codex', allow: Iterable[str] | None = None, deny: Iterable[str] | None = None, args: list[str] | None = None, env: dict[str, str] | None = None, namespace: bool = True, prefix: str | None = None, cache_tools_ttl: float | None = 60.0) -> MCPServer

Codex as an MCP server (codex mcp-server).

Returns an :class:~lazytools.connectors.mcp.MCPServer exposing Codex's MCP interface over stdio. Drop it into Agent(tools=[codex_mcp(allow=["*"])]).

Warning

Codex's MCP-server interface is experimental (per OpenAI's docs) and may change without notice. Pin your Codex version if you depend on the exposed tool shape.

allow= (or deny=) is required — deny-by-default. Patterns match the namespaced tool name (codex.*). Tool names are not hardcoded because they depend on the installed Codex version; discover them with allow=["*"] once.

Parameters

- name: Server name and default namespace prefix ("codex").
- allow / deny: fnmatch globs against the namespaced tool name (deny-by-default).
- args: Extra args appended after mcp-server (rarely needed).
- env: Extra environment for the subprocess. Auth is otherwise inherited from codex login / the current shell environment.
- namespace / prefix / cache_tools_ttl: Forwarded to :meth:MCP.stdio unchanged.

Requires the mcp extra: pip install lazytoolkit[mcp].

Source code in src/lazytools/connectors/code_support/_codex.py
def codex_mcp(
    *,
    name: str = "codex",
    allow: Iterable[str] | None = None,
    deny: Iterable[str] | None = None,
    args: list[str] | None = None,
    env: dict[str, str] | None = None,
    namespace: bool = True,
    prefix: str | None = None,
    cache_tools_ttl: float | None = 60.0,
) -> MCPServer:
    """Wrap ``codex mcp-server`` as a stdio MCP server.

    The returned :class:`~lazytools.connectors.mcp.MCPServer` publishes
    Codex's MCP interface over stdio and can be handed directly to an agent:
    ``Agent(tools=[codex_mcp(allow=["*"])])``.

    !!! warning
        Codex's MCP-server interface is **experimental** (per OpenAI's docs)
        and may change without notice. Pin your Codex version if you depend
        on the exposed tool shape.

    An ``allow=`` or ``deny=`` filter is **required** — the connector is
    deny-by-default. Patterns are matched against the namespaced tool name
    (``codex.*``). No tool names are hardcoded because they depend on the
    installed Codex version; run once with ``allow=["*"]`` to discover them.

    Parameters
    ----------
    name:
        Server name, also the default namespace prefix (``"codex"``).
    allow / deny:
        fnmatch globs tested against the namespaced tool name
        (deny-by-default).
    args:
        Additional arguments placed after ``mcp-server`` (rarely needed).
    env:
        Additional environment for the subprocess. Authentication otherwise
        comes from ``codex login`` / the current shell environment.
    namespace / prefix / cache_tools_ttl:
        Passed through to :meth:`MCP.stdio` untouched.

    Requires the ``mcp`` extra: ``pip install lazytoolkit[mcp]``.
    """
    # Fixed subcommand first; caller-supplied extras (if any) go after it.
    server_argv = ["mcp-server"]
    if args:
        server_argv.extend(args)

    return MCP.stdio(
        name,
        command="codex",
        args=server_argv,
        env=env,
        allow=allow,
        deny=deny,
        namespace=namespace,
        prefix=prefix,
        cache_tools_ttl=cache_tools_ttl,
    )

lazytools.connectors.code_support.build_cli_collaboration

build_cli_collaboration(*, name: str = 'cli_collaboration', description: str | None = None, claude_model: str = 'claude-opus-4-8', codex_model: str = 'gpt-5.4', synthesizer_model: str = 'claude-opus-4-8', executor_model: str = 'claude-opus-4-8', execute: bool = True) -> Agent

Build the Claude Code + Codex collaboration pipeline as a reusable tool.

Returns a named :class:~lazybridge.Agent (Plan engine) that you drop straight into Agent(tools=[build_cli_collaboration()]) — the same way you pass the :func:claude_code / :func:codex function tools. Because an Agent is a tool in LazyBridge, the whole multi-agent pipeline appears to the parent agent as a single callable taking one task string.

Parameters

- name: Tool name the parent agent sees. Must be explicit (used as the tool-map key); defaults to "cli_collaboration".
- description: Tool description shown to the parent LLM. Defaults to a summary of the pipeline's behaviour.
- claude_model: Model driving the Claude-Code analyst (step 1).
- codex_model: Model driving the Codex analyst/critic (step 2).
- synthesizer_model: Model that merges the two analyses into one plan (step 3).
- executor_model: Model that implements the plan via claude_code(mode='write') (step 4).
- execute: When True (default) the pipeline ends by implementing the plan (writes files). When False it stops after synthesis — a read-only "analyse + plan" pipeline that never modifies the codebase.

Notes

The claude_analyst writes its analysis into a shared Memory that the codex_analyst reads via sources=. This is safe because Plan runs steps strictly sequentially — there is no concurrent writer/reader race.

Source code in src/lazytools/connectors/code_support/_collaboration.py
def build_cli_collaboration(
    *,
    name: str = "cli_collaboration",
    description: str | None = None,
    claude_model: str = "claude-opus-4-8",
    codex_model: str = "gpt-5.4",
    synthesizer_model: str = "claude-opus-4-8",
    executor_model: str = "claude-opus-4-8",
    execute: bool = True,
) -> Agent:
    """Assemble the Claude Code + Codex collaboration pipeline as one tool.

    The result is a named :class:`~lazybridge.Agent` driven by a ``Plan``
    engine. Pass it in ``Agent(tools=[build_cli_collaboration()])`` exactly as
    you would the :func:`claude_code` / :func:`codex` function tools: since an
    ``Agent`` *is* a tool in LazyBridge, the parent agent sees the whole
    multi-agent pipeline as a single callable that takes one ``task`` string.

    Parameters
    ----------
    name:
        Tool name exposed to the parent agent. Must be explicit (it is the
        tool-map key); defaults to ``"cli_collaboration"``.
    description:
        Tool description shown to the parent LLM. A falsy value selects the
        module's default pipeline summary.
    claude_model:
        Model behind the Claude-Code analyst (step 1).
    codex_model:
        Model behind the Codex analyst/critic (step 2).
    synthesizer_model:
        Model that merges the analyses into a single plan (step 3).
    executor_model:
        Model that carries out the plan via ``claude_code(mode='write')``
        (step 4).
    execute:
        ``True`` (default) adds the final executor step, which writes files.
        ``False`` stops after synthesis — a read-only "analyse + plan"
        pipeline that never modifies the codebase.

    Notes
    -----
    ``claude_analyst`` records its analysis in a shared ``Memory`` which
    ``codex_analyst`` consumes through ``sources=``. That is safe because
    ``Plan`` runs steps strictly sequentially — there is no concurrent
    writer/reader race.
    """
    # Imported lazily so that importing this module stays stdlib-light.
    from lazybridge import Agent, LLMEngine, Memory, Plan, Step, from_step
    from lazybridge.dedup_guard import DeduplicateGuard

    # One writer (claude_analyst via memory=) and one reader (codex_analyst via
    # sources=); sequential Plan execution means they never overlap.
    shared_notes = Memory(strategy="summary")

    # --- step 1: Claude Code analyst -------------------------------------
    claude_engine = LLMEngine(
        claude_model,
        tool_timeout=None,
        system=(
            "Analyse the task using claude_code in mode='read'. "
            "Propose a concrete implementation approach. Be concise."
        ),
    )
    claude_analyst = Agent(
        name="claude_analyst",
        engine=claude_engine,
        tools=[claude_code],
        memory=shared_notes,
        guard=DeduplicateGuard(verbose=False),
    )

    # --- step 2: Codex analyst / critic ----------------------------------
    codex_engine = LLMEngine(
        codex_model,
        tool_timeout=None,
        system=(
            "Analyse the task using codex in mode='read'. "
            "Critique or confirm claude_analyst's approach. Be concise."
        ),
    )
    codex_analyst = Agent(
        name="codex_analyst",
        engine=codex_engine,
        tools=[codex],
        sources=[shared_notes],  # claude_analyst's analysis arrives as context
        guard=DeduplicateGuard(verbose=False),
    )

    # --- step 3: synthesizer (no tools) ----------------------------------
    synth_engine = LLMEngine(
        synthesizer_model,
        system=(
            "You receive two code analyses (Claude Code and Codex). "
            "Produce a single, concrete, step-by-step implementation plan."
        ),
    )
    synthesizer = Agent(name="synthesizer", engine=synth_engine)

    # NOTE(review): the synthesizer's context is wired from codex_analyst only,
    # while its prompt speaks of two analyses — confirm that from_step carries
    # claude_analyst's output forward as intended.
    pipeline = [
        Step("claude_analyst"),
        Step("codex_analyst", context=from_step("claude_analyst")),
        Step("synthesizer", context=from_step("codex_analyst")),
    ]
    # Typed as the Agent(tools=) element type: a bare list[Agent] would be
    # rejected because list is invariant against that wider union.
    members: list[Any] = [claude_analyst, codex_analyst, synthesizer]

    # --- step 4 (optional): executor that writes the changes --------------
    if execute:
        executor_engine = LLMEngine(
            executor_model,
            tool_timeout=None,
            system="Implement the plan you receive using claude_code in mode='write'.",
        )
        executor = Agent(
            name="executor",
            engine=executor_engine,
            tools=[claude_code],
        )
        pipeline.append(Step("executor", context=from_step("synthesizer")))
        members.append(executor)

    return Agent(
        name=name,
        description=description or _DEFAULT_DESCRIPTION,
        engine=Plan(*pipeline),
        tools=members,
    )

lazytools.connectors.code_support.check_clis_available

check_clis_available() -> dict[str, bool]

Return availability of 'claude' and 'codex' in PATH.

Returns a {"claude": bool, "codex": bool} dict. Call this at startup to surface missing CLIs immediately rather than at the first tool call.

Source code in src/lazytools/connectors/code_support/__init__.py
def check_clis_available() -> dict[str, bool]:
    """Report whether the ``claude`` and ``codex`` executables are on PATH.

    Returns:
        A ``{"claude": bool, "codex": bool}`` mapping; each value is True when
        ``shutil.which`` locates that executable. Call it at startup so a
        missing CLI is surfaced immediately instead of at the first tool call.
    """
    return {cli: shutil.which(cli) is not None for cli in ("claude", "codex")}

Gateway

lazytools.connectors.gateway.ExternalToolProvider

ExternalToolProvider(client: ExternalToolClient, *, specs: Iterable[ExternalToolSpec | Mapping[str, Any]] | None = None, include: Iterable[str] | None = None, exclude: Iterable[str] | None = None, name_prefix: str = '', strict: bool | None = None)

Expose an external tool registry as a LazyBridge tool provider.

Agent(tools=[ExternalToolProvider(client)]) expands the provider into normal :class:Tool objects through LazyBridge's existing _is_lazy_tool_provider hook.

Source code in src/lazytools/connectors/gateway/__init__.py
def __init__(
    self,
    client: ExternalToolClient,
    *,
    specs: Iterable[ExternalToolSpec | Mapping[str, Any]] | None = None,
    include: Iterable[str] | None = None,
    exclude: Iterable[str] | None = None,
    name_prefix: str = "",
    strict: bool | None = None,
) -> None:
    """Record the client and the tool-selection settings on the instance.

    Args:
        client: The external tool client; stored as ``self.client``.
        specs: Optional tool definitions. ``None`` is kept as ``None``; any
            other iterable is snapshotted into a list.
        include: Optional names, stored as a set. A missing or empty value
            is stored as ``None``.
        exclude: Optional names, stored as a set (empty when not given).
        name_prefix: Stored unchanged as ``self.name_prefix``.
        strict: Stored unchanged as ``self.strict``.
    """
    self.client = client
    # None is preserved so it stays distinguishable from an explicit empty list.
    self._specs = None if specs is None else list(specs)
    # An empty include set collapses to None.
    included = set(include) if include else set()
    self.include = included or None
    self.exclude = set(exclude) if exclude else set()
    self.name_prefix = name_prefix
    self.strict = strict

lazytools.connectors.gateway.JsonHttpExternalToolClient

JsonHttpExternalToolClient(base_url: str, *, api_key: str | None = None, headers: Mapping[str, str] | None = None, timeout: float = 30.0, tools_path: str = '/tools', call_path_template: str = '/tools/{name}/call')

Small stdlib HTTP client for a JSON external-tool gateway.

Default endpoint contract:

- GET {base_url}/tools returns either [{...}] or {"tools": [{...}]}.
- POST {base_url}/tools/{name}/call with {"arguments": {...}} returns JSON.

This class is intentionally narrow. For Pipedream/Composio/Arcade, wrap their SDK/API behind :class:ExternalToolClient when their HTTP shape differs from the default contract.

Source code in src/lazytools/connectors/gateway/__init__.py
def __init__(
    self,
    base_url: str,
    *,
    api_key: str | None = None,
    headers: Mapping[str, str] | None = None,
    timeout: float = 30.0,
    tools_path: str = "/tools",
    call_path_template: str = "/tools/{name}/call",
) -> None:
    self.base_url = base_url.rstrip("/")
    self.api_key = api_key
    self.headers = dict(headers or {})
    self.timeout = timeout
    self.tools_path = tools_path
    self.call_path_template = call_path_template

lazytools.connectors.gateway.ExternalToolSpec dataclass

ExternalToolSpec(name: str, description: str, parameters: Mapping[str, Any], strict: bool = False)

A remotely hosted tool definition.

parameters is the provider-agnostic JSON Schema object used by LazyBridge providers when advertising tools to an LLM.

from_mapping classmethod

from_mapping(raw: Mapping[str, Any]) -> ExternalToolSpec

Build a spec from common external registry shapes.

Accepted inputs:

- {"name", "description", "parameters"}
- OpenAI-style {"function": {"name", "description", "parameters"}}

Source code in src/lazytools/connectors/gateway/__init__.py
@classmethod
def from_mapping(cls, raw: Mapping[str, Any]) -> ExternalToolSpec:
    """Create a spec from one of the common external-registry layouts.

    Accepted inputs:
    - ``{"name", "description", "parameters"}``
    - OpenAI-style ``{"function": {"name", "description", "parameters"}}``

    A missing description is replaced by a generated one, and a missing or
    falsy ``parameters`` value by the module's default object schema.
    ``strict`` is read from the selected mapping first, then from ``raw``,
    and defaults to ``False``.

    Raises:
        ValueError: If the name is missing, empty or not a string; if the
            description is not a string; or if ``parameters`` is not a
            mapping.
    """
    # Prefer the nested OpenAI-style payload when it is present and mapping-like.
    nested = raw.get("function")
    data: Mapping[str, Any] = nested if isinstance(nested, Mapping) else raw

    name = data.get("name")
    if not (isinstance(name, str) and name):
        raise ValueError("External tool spec must include a non-empty string name")

    description = data.get("description")
    if description is None:
        description = f"Call external tool {name}."
    elif not isinstance(description, str):
        raise ValueError(f"External tool {name!r} description must be a string")

    parameters = data.get("parameters") or _JSON_OBJECT_SCHEMA
    if not isinstance(parameters, Mapping):
        raise ValueError(f"External tool {name!r} parameters must be a JSON Schema object")

    # The top-level mapping supplies the fallback when the nested one lacks "strict".
    outer_strict = raw.get("strict", False)
    strict = bool(data.get("strict", outer_strict))
    return cls(name=name, description=description, parameters=parameters, strict=strict)

Documents

lazytools.documents.read_folder_docs

read_folder_docs(path: str, extensions: str = 'txt,md,pdf,docx,html', html_mode: str = 'parsed', recursive: bool = False, output_format: str = 'text', *, base_dir: str | None = None, max_file_bytes: int | None = DEFAULT_MAX_FILE_BYTES, max_files: int | None = DEFAULT_MAX_FILES) -> str

Read documents from a file or folder and return their text content.

Accepts either a single file path or a folder path. When given a folder, scans for all matching files (optionally recursive). When given a file, reads that file directly regardless of the extensions filter.

Supported formats: .txt, .md, .pdf, .docx, .html/.htm. HTML files can be returned as clean extracted body text, raw HTML, or both.

Parameters:

Name Type Description Default
path str

Path to a single file OR a folder to scan. File example: "/reports/q4.pdf" Folder example: "/reports"

required
extensions str

Comma-separated list of file extensions to include when scanning a folder. Ignored when path points to a single file. Supported values: txt, md, pdf, docx, html. Default: "txt,md,pdf,docx,html" (all formats). Example: "pdf,docx" to read only PDFs and Word files.

'txt,md,pdf,docx,html'
html_mode str

How to process HTML and HTM files. "parsed" — clean readable text extracted by trafilatura (default). "full" — raw HTML source, unmodified. "both" — parsed body text first, then raw HTML source.

'parsed'
recursive bool

Whether to search subfolders recursively when path is a folder. False (default) — top-level files only. True — all files in all subfolders. Ignored when path points to a single file.

False
output_format str

How to format the combined output. "text" (default) — a single human/LLM-readable string with headers. "json" — a JSON object with a "records" array (one entry per file, each with per-file metadata and content) plus truncation fields: "truncated" (bool), "max_files" (the cap applied), and "total_found" (matches discovered before the cap). Parse the output with json.loads and index ["records"] for the file list. Note: the empty-folder case is reported as a plain string even when "json" is requested, so guard json.loads for it (e.g. only parse output that starts with "{"). A path that does not exist is not reported as a string: it raises FileNotFoundError.

'text'

Returns:

Type Description
str

A single string. For output_format="text" this is the concatenated, human/LLM-readable document text; for output_format="json" it is the serialized JSON object described above. One case always returns a plain (non-JSON) description string regardless of output_format: when a scanned folder contains no files matching extensions ("[No documents found ...]"). A path that does not exist raises FileNotFoundError rather than returning a string.

Source code in src/lazytools/documents/read_docs.py
def read_folder_docs(
    path: str,
    extensions: str = "txt,md,pdf,docx,html",
    html_mode: str = "parsed",
    recursive: bool = False,
    output_format: str = "text",
    *,
    base_dir: str | None = None,
    max_file_bytes: int | None = DEFAULT_MAX_FILE_BYTES,
    max_files: int | None = DEFAULT_MAX_FILES,
) -> str:
    """Read documents from a file or folder and return their text content.

    Accepts either a single file path or a folder path.
    When given a folder, scans for all matching files (optionally recursive).
    When given a file, reads that file directly regardless of the extensions filter.

    Supported formats: .txt, .md, .pdf, .docx, .html/.htm.
    HTML files can be returned as clean extracted body text, raw HTML, or both.

    Per-file problems never abort the scan: a file that cannot be stat'ed or
    read, exceeds ``max_file_bytes``, or has no registered reader still gets a
    record whose content is a bracketed explanation.

    Args:
        path: Path to a single file OR a folder to scan. ``~`` is expanded and
            the path is resolved before use.
            File example:   "/reports/q4.pdf"
            Folder example: "/reports"
        extensions: Comma-separated list of file extensions to include when
            scanning a folder. Ignored when path points to a single file.
            Leading dots, surrounding whitespace and case are ignored;
            requesting "html" also matches ".htm".
            Supported values: txt, md, pdf, docx, html.
            Default: "txt,md,pdf,docx,html" (all formats).
            Example: "pdf,docx" to read only PDFs and Word files.
        html_mode: How to process HTML and HTM files.
            "parsed" — clean readable text extracted by trafilatura (default).
            "full"   — raw HTML source, unmodified.
            "both"   — parsed body text first, then raw HTML source.
        recursive: Whether to search subfolders recursively when path is a folder.
            False (default) — top-level files only.
            True  — all files in all subfolders.
            Ignored when path points to a single file.
        output_format: How to format the combined output.
            "text" (default) — a single human/LLM-readable string with headers.
                Any value other than "json" also produces this format.
            "json" — a JSON object with a "records" array (one entry per file,
                each with per-file metadata and content) plus truncation
                fields: "truncated" (bool), "max_files" (the cap applied), and
                "total_found" (matches discovered before the cap). Parse the
                output with ``json.loads`` and index ``["records"]`` for the
                file list. Note: the empty-folder case below is reported as a
                plain string even when "json" is requested, so guard
                ``json.loads`` for it (e.g. only parse output that starts
                with "{").
        base_dir: Optional sandbox directory. When given, the resolved
            ``path`` must lie inside the resolved ``base_dir``; otherwise
            ``PermissionError`` is raised. ``None`` (default) disables the
            check.
        max_file_bytes: Per-file on-disk size ceiling. A larger file is
            recorded as skipped instead of being read. ``None`` disables the
            ceiling.
        max_files: Ceiling on the number of files read in one call. Files are
            sorted first and only the first ``max_files`` are kept. ``None``
            disables the ceiling.

    Returns:
        A single string. For ``output_format="text"`` this is the concatenated,
        human/LLM-readable document text; for ``output_format="json"`` it is the
        serialized JSON object described above. One case always returns a plain
        (non-JSON) description string regardless of ``output_format``: when a
        scanned folder contains no files matching ``extensions``
        ("[No documents found ...]").

    Raises:
        PermissionError: If ``base_dir`` is given and the resolved ``path``
            falls outside it.
        FileNotFoundError: If the resolved ``path`` does not exist.
        ValueError: If ``path`` exists but is neither a regular file nor a
            directory.
    """
    target = Path(path).expanduser().resolve()

    # When exposed as an agent tool, `path` is LLM-controlled and therefore
    # untrusted.  If the caller supplies `base_dir`, refuse any path that
    # resolves outside that sandbox.
    if base_dir is not None:
        base = Path(base_dir).expanduser().resolve()
        try:
            target.relative_to(base)
        except ValueError as exc:
            raise PermissionError(f"refused — path {str(target)!r} escapes base_dir {str(base)!r}") from exc

    if not target.exists():
        raise FileNotFoundError(f"path not found — {path}")

    if target.is_file():
        # Single-file mode: the extensions filter is not consulted; relative
        # paths in the output are computed against the file's parent folder.
        files = [target]
        root = target.parent
    elif target.is_dir():
        root = target
        # Normalise the requested extensions to lowercase ".ext" form.
        exts: set[str] = set()
        for e in extensions.split(","):
            e = e.strip().lstrip(".").lower()
            if e:
                exts.add(f".{e}")
        if ".html" in exts:
            exts.add(".htm")
        glob_pattern = "**/*" if recursive else "*"
        # Walk the tree without following symlinks.  Doing so closes
        # symlink-loop hangs and prevents a symlink in the indexed
        # folder from silently widening the read surface to other
        # directories.
        files = sorted(
            f for f in root.glob(glob_pattern) if f.is_file() and not f.is_symlink() and f.suffix.lower() in exts
        )
        if not files:
            # Plain-string result even when output_format == "json".
            return f"[No documents found in '{path}' matching extensions: {extensions}]"
    else:
        raise ValueError(f"path is neither a file nor a directory — {path}")

    # Cap the number of files read in one call so a folder with thousands of
    # documents can't be slurped wholesale into a single tool result.
    files_truncated = False
    total_found = len(files)
    if max_files is not None and len(files) > max_files:
        files = files[:max_files]
        files_truncated = True

    # One record per file, in sorted order; every record has the same keys.
    records: list[dict] = []
    for fpath in files:
        suffix = fpath.suffix.lower()
        try:
            size = fpath.stat().st_size
        except OSError as exc:
            # The file may have vanished between glob and stat, or be
            # unreadable. Record the failure and move on rather than aborting
            # the whole scan.
            records.append(
                {
                    "filename": fpath.name,
                    "relative_path": str(fpath.relative_to(root)),
                    "extension": suffix.lstrip("."),
                    "size_bytes": 0,
                    "char_count": 0,
                    "content": f"[Error accessing file: {exc}]",
                }
            )
            continue
        # Readers are looked up by lowercase suffix; a single file passed
        # directly may have a suffix with no registered reader.
        reader = _EXT_READERS.get(suffix)
        if reader is None:
            content = f"[Unsupported extension: {suffix}]"
        elif max_file_bytes is not None and size > max_file_bytes:
            # Bound the on-disk size we will read into memory. This is a first
            # line of defence against memory exhaustion; note a small but
            # heavily-compressed file (e.g. a PDF) can still expand on extract.
            content = f"[Skipped: file is {size:,} bytes, exceeds max_file_bytes={max_file_bytes:,}]"
        else:
            try:
                content = reader(fpath, html_mode)  # type: ignore[operator]
            except Exception as exc:
                # Deliberately broad: any reader failure becomes the record's
                # content so one bad file does not abort the whole call.
                content = f"[Error reading file: {exc}]"
        records.append(
            {
                "filename": fpath.name,
                "relative_path": str(fpath.relative_to(root)),
                "extension": suffix.lstrip("."),
                "size_bytes": size,
                "char_count": len(content),
                "content": content,
            }
        )

    if output_format == "json":
        # Wrap in an object so callers can detect truncation. Returning a bare
        # list would silently drop the cap from JSON consumers (the text branch
        # appends a "NOTE" line, but downstream code parses the JSON shape).
        payload = {
            "records": records,
            "truncated": files_truncated,
            "max_files": max_files,
            "total_found": total_found,
        }
        return json.dumps(payload, ensure_ascii=False, indent=2)

    # Text output: a one-line summary, a rule, then one banner-headed section
    # per record.
    parts: list[str] = []
    for rec in records:
        header = (
            f"{'=' * 72}\n"
            f"FILE : {rec['relative_path']}\n"
            f"TYPE : {rec['extension'].upper()}   SIZE : {rec['size_bytes']:,} bytes   CHARS : {rec['char_count']:,}\n"
            f"{'=' * 72}"
        )
        parts.append(f"{header}\n\n{rec['content']}")

    truncation_note = (
        f" | NOTE: file list truncated to the first {max_files} files" if files_truncated else ""
    )
    summary = (
        f"[{len(records)} document(s) read from '{path}' | "
        f"extensions: {extensions} | html_mode: {html_mode} | recursive: {recursive}{truncation_note}]\n"
        f"{'─' * 72}\n\n"
    )
    return summary + "\n\n".join(parts)

lazytools.documents.read_docs_tools

read_docs_tools(*, base_dir: str, max_file_bytes: int | None = DEFAULT_MAX_FILE_BYTES, max_files: int | None = DEFAULT_MAX_FILES) -> list[Tool]

Return a single-element list with read_folder_docs wrapped as a Tool.

Parameters:

Name Type Description Default
base_dir str

Sandbox directory — required. read_folder_docs rejects any path that resolves outside this directory at runtime. The tool's path argument is LLM-controlled and therefore untrusted; without a sandbox an agent could read arbitrary files on the host (/etc/passwd, SSH keys, .env files, etc.). Passing None (or an empty string) raises ValueError — call read_folder_docs directly if you genuinely need un-sandboxed access from trusted code.

required
max_file_bytes int | None

Per-file size ceiling; a larger file is reported as skipped instead of read. Defaults to DEFAULT_MAX_FILE_BYTES.

DEFAULT_MAX_FILE_BYTES
max_files int | None

Ceiling on the number of files read per folder scan. Defaults to DEFAULT_MAX_FILES.

DEFAULT_MAX_FILES
Source code in src/lazytools/documents/read_docs.py
def read_docs_tools(
    *,
    base_dir: str,
    max_file_bytes: int | None = DEFAULT_MAX_FILE_BYTES,
    max_files: int | None = DEFAULT_MAX_FILES,
) -> list[Tool]:
    """Build the sandboxed ``read_folder_docs`` Tool and return it in a one-item list.

    Args:
        base_dir: Sandbox root — **mandatory**. It is forwarded to
            ``read_folder_docs`` on every call, which rejects any path that
            resolves outside it. The ``path`` argument of the tool is chosen
            by the LLM and must be treated as untrusted: with no sandbox an
            agent could read any file on the host (``/etc/passwd``, SSH keys,
            ``.env`` files, ...). For trusted, non-LLM code that really needs
            un-sandboxed access, call ``read_folder_docs`` directly.
        max_file_bytes: Per-file size ceiling; larger files are reported as
            skipped rather than read. Defaults to ``DEFAULT_MAX_FILE_BYTES``.
        max_files: Upper bound on files read per folder scan. Defaults to
            ``DEFAULT_MAX_FILES``.

    Raises:
        ValueError: If ``base_dir`` is falsy (``None`` or an empty string).
    """
    from lazybridge import Tool

    if not base_dir:
        missing_sandbox = (
            "read_docs_tools(base_dir=...) is required. The tool's path argument "
            "is LLM-controlled, so without a sandbox an agent could read ANY file "
            "on the host. Pass base_dir='/safe/directory', or call read_folder_docs "
            "directly for trusted, non-LLM usage."
        )
        raise ValueError(missing_sandbox)

    # Fixed when the tool is created; the LLM-visible signature of ``_bound``
    # deliberately offers no way to override any of these.
    sandbox_settings = {
        "base_dir": base_dir,
        "max_file_bytes": max_file_bytes,
        "max_files": max_files,
    }

    def _bound(
        path: str,
        extensions: str = "txt,md,pdf,docx,html",
        html_mode: str = "parsed",
        recursive: bool = False,
        output_format: str = "text",
    ) -> str:
        """Read documents from a file or folder, restricted to base_dir."""
        return read_folder_docs(
            path,
            extensions=extensions,
            html_mode=html_mode,
            recursive=recursive,
            output_format=output_format,
            **sandbox_settings,
        )

    sandboxed_tool = Tool(_bound, name="read_folder_docs", description=read_folder_docs.__doc__)
    return [sandboxed_tool]

Skills

lazytools.skills.build_skill

build_skill(source_dirs: Annotated[list[str], 'One or more folders containing documentation to index.'], skill_name: Annotated[str, 'Skill name — used as the bundle folder name and title.'], output_root: Annotated[str, 'Parent directory for the generated bundle.'] = './generated_skills', description: Annotated[str, 'What this skill covers (used in SKILL.md and tool description).'] = '', usage_notes: Annotated[str, 'Extra operational rules appended to SKILL.md.'] = '', include_extensions: Annotated[list[str], 'File extensions to index.'] = list(DEFAULT_EXTENSIONS), chunk_size: Annotated[int, 'Maximum characters per chunk.'] = 1800, chunk_overlap: Annotated[int, 'Overlap between char-mode chunks.'] = 180, copy_sources: Annotated[bool, 'Copy original docs into the bundle under sources/.'] = False, overwrite: Annotated[bool, 'Replace an existing bundle with the same name.'] = True, max_chars_per_file: Annotated[int, 'Safety cap on characters read per file.'] = 200000) -> dict[str, Any]

Index documentation folders and write a portable skill bundle to disk.

The bundle contains SKILL.md (LLM instructions), manifest.json (metadata + avgdl for BM25), vocab.json (Robertson IDF weights), and chunks.jsonl. Returns a metadata dict: skill_dir, indexed_files, total_chunks, avgdl.

Source code in src/lazytools/skills/doc_skills.py
def build_skill(
    source_dirs: Annotated[list[str], "One or more folders containing documentation to index."],
    skill_name: Annotated[str, "Skill name — used as the bundle folder name and title."],
    output_root: Annotated[str, "Parent directory for the generated bundle."] = "./generated_skills",
    description: Annotated[str, "What this skill covers (used in SKILL.md and tool description)."] = "",
    usage_notes: Annotated[str, "Extra operational rules appended to SKILL.md."] = "",
    include_extensions: Annotated[list[str], "File extensions to index."] = list(DEFAULT_EXTENSIONS),  # noqa: B006
    chunk_size: Annotated[int, "Maximum characters per chunk."] = 1800,
    chunk_overlap: Annotated[int, "Overlap between char-mode chunks."] = 180,
    copy_sources: Annotated[bool, "Copy original docs into the bundle under sources/."] = False,
    overwrite: Annotated[bool, "Replace an existing bundle with the same name."] = True,
    max_chars_per_file: Annotated[int, "Safety cap on characters read per file."] = 200_000,
) -> dict[str, Any]:
    """
    Turn one or more documentation folders into an on-disk skill bundle.

    Four files are written into ``<output_root>/<slugified skill_name>/``:
    SKILL.md (instructions for the LLM), manifest.json (metadata, including
    the average chunk length ``avgdl``), vocab.json (the IDF table) and
    chunks.jsonl (one JSON object per chunk). With ``copy_sources`` the
    indexed files are additionally copied under ``sources/<root name>/``.

    Returns a dict with ``skill_dir``, ``skill_name``, ``description``,
    ``indexed_files``, ``total_chunks`` and ``avgdl`` (rounded to 1 decimal).

    Raises:
        FileNotFoundError: a ``source_dirs`` entry is not a directory.
        FileExistsError: the bundle already exists and ``overwrite`` is False.
        ValueError: no file produced any chunk.
    """
    source_roots = [Path(entry).expanduser().resolve() for entry in source_dirs]
    for source_root in source_roots:
        if not source_root.is_dir():
            raise FileNotFoundError(f"Not a directory: {source_root}")

    bundle_dir = Path(output_root).expanduser().resolve() / _slugify(skill_name)
    if bundle_dir.exists():
        if not overwrite:
            raise FileExistsError(f"Skill already exists: {bundle_dir}")
        # An existing bundle is wiped before indexing starts.
        shutil.rmtree(bundle_dir)
    bundle_dir.mkdir(parents=True)
    sources_dir = bundle_dir / "sources"
    if copy_sources:
        sources_dir.mkdir()

    indexed_files: list[str] = []
    all_chunks: list[DocChunk] = []

    for source_root in source_roots:
        for doc_path in _iter_docs([source_root], include_extensions):
            try:
                doc_text = doc_path.read_text(encoding="utf-8", errors="ignore")[:max_chars_per_file]
            except Exception:
                # Best effort: a file that cannot be read is left out of the index.
                continue
            doc_chunks = _make_chunks(doc_path, doc_text, chunk_size, chunk_overlap)
            if doc_chunks:
                indexed_files.append(str(doc_path))
                all_chunks.extend(doc_chunks)
                if copy_sources:
                    # Mirror the file's position relative to its root.
                    mirror_dir = sources_dir / source_root.name / doc_path.parent.relative_to(source_root)
                    mirror_dir.mkdir(parents=True, exist_ok=True)
                    shutil.copy2(doc_path, mirror_dir / doc_path.name)

    if not all_chunks:
        raise ValueError("No indexable documentation found in the provided folders.")

    chunk_total = len(all_chunks)
    file_total = len(indexed_files)
    if not description:
        description = f"Documentation skill built from {file_total} files."
    avgdl = sum(chunk.doc_len for chunk in all_chunks) / chunk_total
    idf = _build_idf(all_chunks)

    manifest = SkillManifest(
        name=skill_name,
        description=description,
        source_dirs=[str(source_root) for source_root in source_roots],
        indexed_files=indexed_files,
        total_chunks=chunk_total,
        avgdl=avgdl,
        extensions=list(include_extensions),
    )
    skill_md = _render_skill_md(
        name=skill_name,
        description=description,
        usage_notes=usage_notes,
        file_count=file_total,
        total_chunks=chunk_total,
        source_dirs=source_roots,
    )

    manifest_json = json.dumps(asdict(manifest), ensure_ascii=False, indent=2)
    vocab_json = json.dumps(idf, ensure_ascii=False, indent=2)

    (bundle_dir / "SKILL.md").write_text(skill_md, encoding="utf-8")
    (bundle_dir / "manifest.json").write_text(manifest_json, encoding="utf-8")
    (bundle_dir / "vocab.json").write_text(vocab_json, encoding="utf-8")
    with (bundle_dir / "chunks.jsonl").open("w", encoding="utf-8") as chunk_file:
        chunk_file.writelines(
            json.dumps(asdict(chunk), ensure_ascii=False) + "\n" for chunk in all_chunks
        )

    return {
        "skill_dir": str(bundle_dir),
        "skill_name": skill_name,
        "description": description,
        "indexed_files": indexed_files,
        "total_chunks": chunk_total,
        "avgdl": round(avgdl, 1),
    }

lazytools.skills.query_skill

query_skill(skill_dir: Annotated[str, 'Path to a skill bundle created by build_skill().'], task: Annotated[str, 'Question or task to answer from the indexed documentation.'], mode: Annotated[Literal['auto', 'answer', 'extract', 'locate', 'summarize'], "Execution mode. 'auto' detects intent from the task wording."] = 'auto', top_k: Annotated[int, 'Number of chunks to retrieve.'] = 8, max_chars: Annotated[int, 'Maximum characters in the returned context brief.'] = 10000, include_quotes: Annotated[bool, 'Append full excerpts after evidence bullets.'] = True) -> str

Retrieve the most relevant chunks via BM25 and return a grounded context brief ready to be injected into an LLM's context window.

Source code in src/lazytools/skills/doc_skills.py
def query_skill(
    skill_dir: Annotated[str, "Path to a skill bundle created by build_skill()."],
    task: Annotated[str, "Question or task to answer from the indexed documentation."],
    mode: Annotated[
        Literal["auto", "answer", "extract", "locate", "summarize"],
        "Execution mode. 'auto' detects intent from the task wording.",
    ] = "auto",
    top_k: Annotated[int, "Number of chunks to retrieve."] = 8,
    max_chars: Annotated[int, "Maximum characters in the returned context brief."] = 10_000,
    include_quotes: Annotated[bool, "Append full excerpts after evidence bullets."] = True,
) -> str:
    """
    Score every chunk of a skill bundle against ``task`` with BM25 and return
    a grounded context brief, truncated to ``max_chars`` characters.

    At most ``max(1, top_k)`` chunks are kept, and only those with a score
    above zero. When none qualifies, a short "nothing retrieved" notice is
    returned instead of a brief. ``include_quotes`` only affects the default
    ("answer") layout.

    Raises:
        FileNotFoundError: ``skill_dir`` does not exist.
    """
    bundle = Path(skill_dir).expanduser().resolve()
    if not bundle.exists():
        raise FileNotFoundError(f"Skill directory not found: {bundle}")

    manifest = _load_manifest(bundle)
    chunks = _load_chunks(bundle)
    idf = _load_idf(bundle)
    skill_md = _load_skill_md(bundle)
    resolved_mode: MODE = mode if mode != "auto" else _auto_mode(task)  # type: ignore[assignment]

    query_tokens = _tokenize(task)
    scored = [(chunk, _bm25(chunk, query_tokens, idf, manifest.avgdl)) for chunk in chunks]
    # Stable descending sort: equally scored chunks keep their stored order.
    scored.sort(key=lambda pair: pair[1], reverse=True)
    cutoff = max(1, top_k)
    selected = [chunk for chunk, score in scored[:cutoff] if score > 0]

    if not selected:
        nothing_found = (
            f"[skill] {manifest.name}\n[task] {task}\n\n"
            "No relevant documentation was retrieved for this task. "
            "Do not answer beyond the indexed evidence."
        )
        return nothing_found[:max_chars]

    def _file_label(chunk) -> str:
        # Only the file name is shown, never the full path.
        return Path(chunk.path).name

    def _bullet(chunk, limit: int) -> str:
        # Collapse all whitespace runs so each bullet stays on one line.
        flattened = re.sub(r"\s+", " ", chunk.text)
        return f"  - {_trim(flattened, limit)}  [{_file_label(chunk)}]"

    def _excerpt(chunk, limit: int) -> str:
        return f"\n### {chunk.heading}  [{_file_label(chunk)}]\n{_trim(chunk.text, limit)}"

    brief = _build_brief(manifest=manifest, skill_md=skill_md, task=task, mode=resolved_mode, selected=selected)

    if resolved_mode == "locate":
        # dict.fromkeys de-duplicates paths while keeping rank order.
        unique_paths = dict.fromkeys(chunk.path for chunk in selected)
        result_lines = ["Relevant files:", *(f"  • {p}" for p in unique_paths)]
    elif resolved_mode == "extract":
        result_lines = ["Excerpts:", *(_excerpt(chunk, 1000) for chunk in selected)]
    elif resolved_mode == "summarize":
        result_lines = ["Summary:", *(_bullet(chunk, 300) for chunk in selected[:6])]
    else:
        result_lines = ["Best evidence:", *(_bullet(chunk, 400) for chunk in selected[:5])]
        if include_quotes:
            result_lines.append("\nFull excerpts:")
            result_lines.extend(_excerpt(chunk, 800) for chunk in selected[:3])

    assembled = brief + "\n\n[result]\n" + "\n".join(result_lines)
    return assembled[:max_chars]

lazytools.skills.skill_tools

skill_tools(*, skill_dir: Annotated[str, 'Path to a skill bundle created by build_skill().'], name: Annotated[str | None, 'Tool name exposed to the agent.'] = None, description: Annotated[str | None, 'Tool description.'] = None, strict: Annotated[bool, 'Strict JSON schema validation.'] = False) -> list[Tool]

Return a single-element list containing a query_skill() Tool ready to be passed to any agent or pipeline.

Source code in src/lazytools/skills/doc_skills.py
def skill_tools(
    *,
    skill_dir: Annotated[str, "Path to a skill bundle created by build_skill()."],
    name: Annotated[str | None, "Tool name exposed to the agent."] = None,
    description: Annotated[str | None, "Tool description."] = None,
    strict: Annotated[bool, "Strict JSON schema validation."] = False,
) -> list[Tool]:
    """Wrap ``query_skill()`` for one bundle as a Tool; returns a one-item list.

    The bundle's manifest is loaded once, up front. When ``name`` or
    ``description`` is not given (or is empty), the slugified manifest name
    and the manifest description are used instead.
    """
    bundle = Path(skill_dir).expanduser().resolve()
    manifest = _load_manifest(bundle)

    def _run(
        task: Annotated[str, "Question or task to answer from this skill."],
        mode: Annotated[Literal["auto", "answer", "extract", "locate", "summarize"], "Retrieval mode."] = "auto",
        top_k: Annotated[int, "Number of chunks to retrieve."] = 8,
        include_quotes: Annotated[bool, "Include full excerpts."] = True,
    ) -> str:
        """Query a local documentation skill and return a grounded context brief.

        Use when the task is about the documentation indexed by this skill;
        treat the result as grounded evidence and answer only from it.
        """
        return query_skill(str(bundle), task, mode=mode, top_k=top_k, include_quotes=include_quotes)

    # Explicit arguments win; the manifest only fills the gaps.
    tool_name = name or _slugify(manifest.name)
    tool_description = description or manifest.description
    query_tool = Tool(_run, name=tool_name, description=tool_description, strict=strict)
    return [query_tool]

lazytools.skills.skill_builder_tools

skill_builder_tools(*, name: Annotated[str, 'Tool name.'] = 'build_doc_skill', description: Annotated[str, 'Tool description.'] = 'Index documentation folders into a reusable local skill bundle. Call this to transform one or more documentation folders into a queryable local skill.', strict: Annotated[bool, 'Strict JSON schema validation.'] = False) -> list[Tool]

Return a single-element list containing a Tool that builds skill bundles.

Source code in src/lazytools/skills/doc_skills.py
def skill_builder_tools(
    *,
    name: Annotated[str, "Tool name."] = "build_doc_skill",
    description: Annotated[str, "Tool description."] = (
        "Index documentation folders into a reusable local skill bundle. "
        "Call this to transform one or more documentation folders into a queryable local skill."
    ),
    strict: Annotated[bool, "Strict JSON schema validation."] = False,
) -> list[Tool]:
    """Expose ``build_skill`` to an agent: returns a one-item list holding the Tool."""
    # ``build_skill`` is wrapped as-is; only the tool metadata is configurable here.
    builder_tool = Tool(build_skill, name=name, description=description, strict=strict)
    return [builder_tool]

lazytools.skills.skill_pipeline

skill_pipeline(*, skill_dir: Annotated[str, 'Path to a skill bundle.'], provider: Annotated[str | Any, 'LazyBridge provider alias or instance.'] = 'anthropic', router_model: Annotated[str | None, 'Model for the task-sharpening router.'] = None, executor_model: Annotated[str | None, 'Model for the grounded-answer executor.'] = None, session: Annotated[Any, 'Optional Session. Created if omitted.'] = None, native_tools: Annotated[list | None, 'Provider-native tools for the executor.'] = None) -> Tool

Two-step pipeline exposed as a single Tool.

  1. Router — rewrites the user task into a retrieval-optimised query.
  2. Executor — calls skill_tools() and synthesises a grounded answer.

Returns an Agent.chain(router, executor).as_tool().

Source code in src/lazytools/skills/doc_skills.py
def skill_pipeline(
    *,
    skill_dir: Annotated[str, "Path to a skill bundle."],
    provider: Annotated[str | Any, "LazyBridge provider alias or instance."] = "anthropic",
    router_model: Annotated[str | None, "Model for the task-sharpening router."] = None,
    executor_model: Annotated[str | None, "Model for the grounded-answer executor."] = None,
    session: Annotated[Any, "Optional Session. Created if omitted."] = None,
    native_tools: Annotated[list | None, "Provider-native tools for the executor."] = None,
) -> Tool:
    """
    Two-step pipeline exposed as a single Tool.

      1. Router   — rewrites the user task into a retrieval-optimised query.
      2. Executor — calls skill_tools() and synthesises a grounded answer.

    Both agents and the chain share one session: the ``session`` argument, or
    a new ``Session()`` when it is falsy. Each agent's model string is its
    explicit ``*_model`` override if given, otherwise ``provider`` when that
    is a string, otherwise the literal ``"anthropic"``.

    The tool description embeds the manifest description as one sentence; a
    closing period is appended only when the description does not already end
    with ``.``, ``!`` or ``?``.

    Returns an Agent.chain(router, executor).as_tool().
    """
    from lazybridge import LLMEngine

    sdir = Path(skill_dir).expanduser().resolve()
    manifest = _load_manifest(sdir)
    sess = session or Session()
    s_tool = skill_tools(skill_dir=str(sdir))[0]

    # Resolve model strings: use the explicit model override if given, else the provider alias.
    # NOTE(review): a non-string ``provider`` (an instance) is not forwarded
    # anywhere in this function — "anthropic" is used in its place. Confirm
    # whether instances are meant to be supported here.
    fallback_model = provider if isinstance(provider, str) else "anthropic"
    router_model_str = router_model or fallback_model
    executor_model_str = executor_model or fallback_model

    router = Agent(
        engine=LLMEngine(
            router_model_str,
            system=(
                "You sharpen user queries for a local documentation retrieval system. "
                "Return a single concise retrieval query. "
                "Preserve every technical identifier: class names, method names, "
                "parameter names, error codes, configuration keys. "
                "Do not answer the question. Do not add facts."
            ),
        ),
        name="skill_router",
        session=sess,
    )
    executor = Agent(
        engine=LLMEngine(
            executor_model_str,
            system=(
                "You answer from the local skill tool only. Always call the skill tool first. "
                "Build your answer exclusively from the tool result. "
                "Name every source file you use. "
                "If the skill returns weak or absent evidence, say so explicitly."
            ),
            native_tools=native_tools,
        ),
        name="skill_executor",
        session=sess,
        tools=[s_tool],
    )

    # build_skill's default description already ends with a period, so adding
    # one unconditionally produced "...files.. Use for ...". Terminate the
    # sentence only when it is not terminated yet.
    blurb = str(manifest.description).rstrip()
    if not blurb.endswith((".", "!", "?")):
        blurb += "."

    pipeline = Agent.chain(router, executor, name="doc_skill_pipeline", session=sess)
    return pipeline.as_tool(
        "doc_skill_pipeline",
        description=(
            f"Grounded local-docs pipeline: {blurb} "
            "Use for questions grounded in the indexed documentation. "
            "The pipeline sharpens the query then retrieves and synthesises the answer."
        ),
    )

Planners

These ship in the LazyBridge core (lazybridge.ext.planners), not in lazytoolkit — documented here for completeness. See the Planners guide. orchestrator_agent / blackboard_orchestrator_agent are the canonical names; the make_* symbols below are the same callables.

The blackboard planner (make_blackboard_planner / blackboard_orchestrator_agent) takes the same arguments — see the Blackboard guide for its full reference.

lazybridge.ext.planners.make_planner

make_planner(agents: list[Agent], *, model: str = 'claude-opus-4-7', system: str | None = None, name: str = 'planner', verbose: bool = False, verify: Agent | None = None, max_verify: int = 3) -> Agent

Build a planner :class:Agent over the given sub-agents.

The returned agent has:
  • each sub-agent in agents as a direct tool (so it can call one when that's enough);
  • five builder tools (create_plan, add_step, inspect_plan, run_plan, discard_plan) that compose a :class:Plan one step at a time, with local validation per step.

Parameters:

Name Type Description Default
agents list[Agent]

The sub-agents the planner may dispatch to. Each must have a unique .name; the planner addresses them by that name in StepSpec.agent.

required
model str

Provider model id for the planner LLM. Default "claude-opus-4-7".

'claude-opus-4-7'
system str | None

Override the planner's system prompt. By default we prepend "You are a generalist assistant." to :data:PLANNER_GUIDANCE so the LLM has decision rules and worked examples for execute_plan.

None
name str

Display name for the planner agent.

'planner'
verbose bool

If True, print event traces to stdout.

False
verify Agent | None

Optional judge :class:Agent that vets the planner's final output. When set, the planner's response runs through verify (LazyBridge's built-in verify-with-retry loop). The judge should reply "approved" or "rejected: <reason>"; on rejection the planner retries up to max_verify times with the judge's feedback in context. Costs one extra LLM call per attempt — use it for tasks where wrong answers are expensive.

None
max_verify int

Max judge attempts when verify is set. Default 3.

3

Returns:

Type Description
Agent

A configured planner :class:Agent. Call it with the user task.

Raises:

Type Description
ValueError

if agents is empty or contains duplicate names.

Source code in lazybridge/ext/planners/builder.py
def make_planner(
    agents: list[Agent],
    *,
    model: str = "claude-opus-4-7",
    system: str | None = None,
    name: str = "planner",
    verbose: bool = False,
    verify: Agent | None = None,
    max_verify: int = 3,
) -> Agent:
    """Assemble a planner :class:`Agent` that can dispatch to ``agents``.

    The planner's tool list is, in order:
      - every sub-agent in ``agents``, callable directly when a single call
        is all the task needs;
      - the tools returned by ``make_plan_builder_tools`` (``create_plan``,
        ``add_step``, ``inspect_plan``, ``run_plan``, ``discard_plan``),
        which build a :class:`Plan` step by step with per-step validation.

    Args:
        agents: Sub-agents the planner may use. Their ``.name`` values must
            be unique — the name is the key under which each one is
            registered for the builder tools.
        model: Provider model id for the planner LLM. Default
            ``"claude-opus-4-7"``.
        system: System prompt override. When ``None``, the prompt is
            "You are a generalist assistant." followed by a blank line and
            :data:`PLANNER_GUIDANCE`.
        name: Display name for the planner agent.
        verbose: If True, print event traces to stdout.
        verify: Optional judge :class:`Agent` for the planner's final
            output, passed through to the planner agent together with
            ``max_verify``. The judge should reply "approved" or
            "rejected: <reason>"; on rejection the planner retries with the
            judge's feedback in context. Each attempt costs one extra LLM
            call.
        max_verify: Max judge attempts when ``verify`` is set. Default 3.

    Returns:
        A configured planner :class:`Agent`. Call it with the user task.

    Raises:
        ValueError: if ``agents`` is empty or contains duplicate names.
    """
    if not agents:
        raise ValueError("make_planner: agents list must not be empty")

    names = [sub_agent.name for sub_agent in agents]
    # A set drops repeats, so any shrinkage means at least one duplicate name.
    if len(names) > len(set(names)):
        raise ValueError(f"make_planner: agents must have unique names; got {names}")

    registry = dict(zip(names, agents))
    builder_tools = make_plan_builder_tools(registry)

    planner_prompt = (
        system
        if system is not None
        else "You are a generalist assistant.\n\n" + PLANNER_GUIDANCE
    )

    planner_engine = LLMEngine(model, system=planner_prompt)
    return Agent(
        engine=planner_engine,
        tools=[*agents, *builder_tools],
        name=name,
        verbose=verbose,
        verify=verify,
        max_verify=max_verify,
    )

lazybridge.ext.planners.make_plan_builder_tools

make_plan_builder_tools(registry: dict[str, Agent], *, max_plans: int = 50) -> list[Tool]

Five builder tools that share state via closure.

Returns [create_plan, add_step, inspect_plan, run_plan, discard_plan].

The state is per-factory-instance — call make_plan_builder_tools fresh for each planner agent (or each session) if you want isolated blackboards. run_plan and discard_plan consume the plan from the dict, so memory stays bounded as long as the planner finishes its plans. max_plans is a hard cap on concurrent in-progress plans (oldest-evicted on overflow) so a misbehaving planner can't leak memory.

Source code in lazybridge/ext/planners/builder.py
def make_plan_builder_tools(
    registry: dict[str, Agent],
    *,
    max_plans: int = 50,
) -> list[Tool]:
    """Five builder tools that share state via closure.

    Returns ``[create_plan, add_step, inspect_plan, run_plan, discard_plan]``.

    The state is per-factory-instance — call ``make_plan_builder_tools``
    fresh for each planner agent (or each session) if you want isolated
    blackboards. ``run_plan`` and ``discard_plan`` consume the plan from
    the dict, so memory stays bounded as long as the planner finishes its
    plans. The one exception is ``run_plan`` on a plan with no steps: that
    call is rejected and the plan is kept, so steps can still be added to it.
    ``max_plans`` is a hard cap on concurrent in-progress plans
    (oldest-evicted on overflow) so a misbehaving planner can't leak memory.

    Args:
        registry: Sub-agents by name; ``add_step`` validates against it and
            ``run_plan`` materialises the plan with it.
        max_plans: Cap on in-progress plans. With a value ``<= 0`` a single
            plan can still be held at a time.

    Raises:
        ValueError: if ``registry`` is empty.
    """
    if not registry:
        raise ValueError("plan tool registry must contain at least one agent")

    plans: dict[str, _PlanInProgress] = {}

    def _evict_if_full() -> None:
        # ``plans and`` keeps min() off an empty dict, which is otherwise
        # reachable when max_plans <= 0.
        if plans and len(plans) >= max_plans:
            # Drop the oldest in-progress plan.
            oldest = min(plans.values(), key=lambda p: p.created_at)
            plans.pop(oldest.plan_id, None)

    # --- create_plan -----------------------------------------------------
    def create_plan(reasoning: str) -> str:
        """Start a new empty plan. Returns the plan_id to use in subsequent calls.

        Args:
            reasoning: Why this plan; which sub-agents and why; simplest
                shape that fits. Required — empty / boilerplate defeats
                the point of thinking first.
        """
        if not reasoning or not reasoning.strip():
            return (
                "REJECTED: reasoning is required and must be non-empty. "
                "Briefly state why this plan shape fits the task."
            )
        _evict_if_full()
        pid = uuid.uuid4().hex[:8]
        plans[pid] = _PlanInProgress(plan_id=pid, reasoning=reasoning.strip())
        return (
            f"plan_id={pid} (empty; add steps with add_step, then run_plan). "
            f"Available sub-agents: {sorted(registry)!r}."
        )

    # --- add_step --------------------------------------------------------
    def add_step(
        plan_id: str,
        name: str,
        agent: str,
        task_kind: Literal["literal", "from_prev", "from_step", "from_parallel"] = "from_prev",
        task_text: str | None = None,
        task_step: str | None = None,
        context_kind: Literal["from_step", "from_parallel"] | None = None,
        context_step: str | None = None,
        parallel: bool = False,
    ) -> str:
        """Append one step to a plan; validated immediately.

        On rejection, the plan is unchanged — fix the args and call again.

        Args:
            plan_id: From a prior ``create_plan``.
            name: Unique snake_case identifier within this plan.
            agent: Sub-agent name (must exist in the registry).
            task_kind: ``literal`` (use ``task_text``) / ``from_prev``
                (default; previous step's output) / ``from_step``
                (named earlier step's output) / ``from_parallel``
                (alias of ``from_step``, naming is for readability).
            task_text: Required when ``task_kind="literal"``.
            task_step: Required when ``task_kind`` is ``from_step`` or
                ``from_parallel``; must name an earlier step.
            context_kind: Optional secondary input pulled into the step's
                context. Useful to combine TWO parallel branches.
            context_step: Required when ``context_kind`` is set.
            parallel: ``true`` to run concurrently with adjacent
                ``parallel=true`` siblings.
        """
        if plan_id not in plans:
            return f"REJECTED: unknown plan_id {plan_id!r}."
        pip = plans[plan_id]
        err = _validate_step_addition(
            pip,
            name,
            agent,
            task_kind,
            task_text,
            task_step,
            context_kind,
            context_step,
            registry,
        )
        if err:
            return f"REJECTED: {err}"
        # Only a validated step is ever appended, so a rejection leaves the
        # plan exactly as it was.
        pip.steps.append(
            StepSpec(
                name=name,
                agent=agent,
                task_kind=task_kind,
                task_text=task_text,
                task_step=task_step,
                context_kind=context_kind,
                context_step=context_step,
                parallel=parallel,
            )
        )
        return f"ok ({len(pip.steps)} step(s) in plan {plan_id})"

    # --- inspect_plan ----------------------------------------------------
    def inspect_plan(plan_id: str) -> str:
        """Show the plan's current shape — useful between additions."""
        if plan_id not in plans:
            return f"REJECTED: unknown plan_id {plan_id!r}."
        return _format_progress(plans[plan_id])

    # --- run_plan --------------------------------------------------------
    async def run_plan(plan_id: str, task: str) -> str:
        """Materialise and run the plan; returns the final step's text.

        Consumes the plan (it's removed from the in-progress dict). To
        run again, build a new plan.
        """
        pip = plans.get(plan_id)
        if pip is None:
            return f"REJECTED: unknown plan_id {plan_id!r}."
        if not pip.steps:
            # Checked BEFORE the plan is consumed: the message tells the
            # caller to add a step, which only works if the plan_id is
            # still registered afterwards.
            return f"REJECTED: plan {plan_id} has no steps. Add at least one before running."
        # From here on the plan is consumed, whatever the outcome of the run.
        plans.pop(plan_id, None)
        spec = PlanSpec(reasoning=pip.reasoning, task=task, steps=pip.steps)
        try:
            plan = _materialize(spec, registry)
        except _PlanToolError as e:
            return f"PLAN_REJECTED: {e}"
        try:
            # 0.7.9 requires explicit name= on non-LLM engines; this throw-away
            # runner only exists to materialise + execute the plan once, so any
            # stable identifier suffices.
            runner = Agent(engine=plan, name=f"_planner_runner_{plan_id}")  # PlanCompiler defense-in-depth.
        except PlanCompileError as e:
            return _format_compile_error(e, registry)
        try:
            env = await runner.run(spec.task)
        except Exception as e:
            # Tool boundary: any failure is reported back as text rather
            # than raised into the caller.
            return f"PLAN_RUNTIME_ERROR: {type(e).__name__}: {e}"
        if env.error:
            return f"PLAN_RUNTIME_ERROR: {env.error.message}"
        return env.text()

    # --- discard_plan ----------------------------------------------------
    def discard_plan(plan_id: str) -> str:
        """Drop an in-progress plan without running it."""
        if plan_id not in plans:
            return f"REJECTED: unknown plan_id {plan_id!r}."
        plans.pop(plan_id)
        return f"ok (plan {plan_id} discarded)"

    # Customise descriptions so the LLM sees the registry inline.
    agents_summary = "Available sub-agents:\n" + "\n".join(
        f"- {n}: {(a.description or '').strip() or 'no description'}" for n, a in registry.items()
    )

    add_step.__doc__ = (add_step.__doc__ or "") + "\n\n" + agents_summary
    create_plan.__doc__ = (create_plan.__doc__ or "") + "\n\n" + agents_summary

    return [
        Tool(create_plan, mode="signature"),
        Tool(add_step, mode="signature"),
        Tool(inspect_plan, mode="signature"),
        Tool(run_plan, mode="signature"),
        Tool(discard_plan, mode="signature"),
    ]

lazybridge.ext.planners.PlanSpec

Bases: BaseModel

The argument shape of execute_plan.

lazybridge.ext.planners.StepSpec

Bases: BaseModel

One node in the plan DAG.

Composition sugar

chain and parallel are Agent classmethods in the LazyBridge core. See the Composition sugar guide.

lazybridge.Agent.chain classmethod

chain(*agents: Agent, **kwargs: Any) -> Agent

Run agents sequentially: output of each becomes input to the next.

Source code in lazybridge/agent.py
@classmethod
def chain(cls, *agents: Agent, **kwargs: Any) -> Agent:
    """Compose ``agents`` into a sequential pipeline: each output feeds the next agent.

    Every agent becomes one ``Step`` (named after the agent) of a ``Plan``,
    and the result is a new ``cls`` instance driven by that plan. ``name``
    may be given in ``kwargs`` (default ``"chain"``); all remaining keyword
    arguments are forwarded to the constructor unchanged.
    """
    from lazybridge.engines.plan import Plan, Step

    chain_name = kwargs.pop("name", "chain")
    sequence = Plan(*(Step(target=member, name=member.name) for member in agents))
    # The agents are deliberately NOT also registered as tools:
    # ``Plan._exec_step`` calls ``target.run()`` on Agent targets directly,
    # so wrapping them would only spend schema compilation on every chain
    # call. A caller-supplied ``tools=`` in kwargs is passed through as-is.
    return cls(engine=sequence, name=chain_name, **kwargs)

lazybridge.Agent.parallel classmethod

parallel(*agents: Agent, concurrency_limit: int | None = None, step_timeout: float | None = None, **kwargs: Any) -> ParallelAgent

Deterministic fan-out: run agents concurrently on the same task.

Returns a :class:ParallelAgent whose __call__ produces a single :class:Envelope — labelled-text join of every branch's output, with transitive cost rollup. For typed access to per-branch envelopes call ParallelAgent.run_branches(task) (async).

Use this when you know you want N things to happen in parallel. If you want the LLM to decide whether to call agents in parallel (and which, and how), don't use this — pass them as tools=[...] on a regular Agent instead; the engine emits parallel tool calls automatically when the model requests them.

Source code in lazybridge/agent.py
@classmethod
def parallel(
    cls,
    *agents: Agent,
    concurrency_limit: int | None = None,
    step_timeout: float | None = None,
    **kwargs: Any,
) -> ParallelAgent:
    """Create a deterministic fan-out over ``agents`` for a single task.

    The result is a :class:`ParallelAgent`.  Calling it yields one
    :class:`Envelope` — a labelled-text join of every branch's output,
    with transitive cost rollup.  When the individual branch envelopes
    are needed, use ``ParallelAgent.run_branches(task)`` (async) instead.

    Reach for this only when you **know** up front that N things should
    run in parallel.  When the LLM should decide whether (and which)
    agents to call in parallel, pass them as ``tools=[...]`` on a regular
    ``Agent``; the engine emits parallel tool calls automatically when
    the model requests them.

    ``concurrency_limit``, ``step_timeout`` and any extra ``kwargs`` are
    handed to the :class:`ParallelAgent` constructor as-is.
    """
    # Snapshot the variadic tuple into a list — the shape the
    # ``ParallelAgent`` constructor is annotated to take.
    branches = list(agents)
    return ParallelAgent(
        agents=branches,
        concurrency_limit=concurrency_limit,
        step_timeout=step_timeout,
        **kwargs,
    )

lazybridge.ParallelAgent

ParallelAgent(agents: list[Agent], *, concurrency_limit: int | None = None, step_timeout: float | None = None, name: str = 'parallel', description: str | None = None, session: Any | None = None)

Deterministic fan-out over N agents — the shape behind :meth:Agent.parallel.

Pre-scripted parallel runner. Every input agent receives the same task; the N branch results are folded into a single :class:Envelope via labelled-text join — same shape as :class:Plan's from_parallel_all aggregator. Cost roll-up is transitive. The first non-None branch error propagates as the wrapper's error so downstream consumers can short-circuit.

Prefer :class:Agent with tools=[...] when you want the engine (LLM, Supervisor, Plan) to decide dynamically which tools to invoke and when — parallel execution is automatic on that path.

Per-branch typed access: call :meth:run_branches (async) when you need list[Envelope] rather than the joined wrapper.

Source code in lazybridge/agent.py
def __init__(
    self,
    agents: list[Agent],
    *,
    concurrency_limit: int | None = None,
    step_timeout: float | None = None,
    name: str = "parallel",
    description: str | None = None,
    session: Any | None = None,
) -> None:
    """Record the fan-out configuration; nothing is executed here.

    Every argument is stored unchanged on the instance under the
    attribute of the same name.  ``agents`` is kept by reference (no
    copy is made).
    """
    # Identity / presentation.
    self.name, self.description = name, description
    self.session = session
    # Branches and their execution limits.
    self.agents = agents
    self.concurrency_limit, self.step_timeout = concurrency_limit, step_timeout
run_branches async

run_branches(task: str | Envelope) -> list[Envelope]

Async per-branch entry point — returns one Envelope per input agent in input order. Use this when you need typed access to individual branch results; for the framework-uniform single-Envelope view, use :meth:run or __call__.

Source code in lazybridge/agent.py
async def run_branches(self, task: str | Envelope) -> list[Envelope]:
    """Run every branch and return the per-branch envelopes, in input order.

    This is the typed, per-branch entry point.  For the framework-uniform
    single-Envelope view use :meth:`run` or ``__call__`` instead.

    A string ``task`` is converted with ``Agent._to_envelope``; anything
    else is passed to the branches as-is.  A truthy ``concurrency_limit``
    caps how many branches run at once, and a truthy ``step_timeout`` is
    applied to each branch individually.

    A branch that raises an ``Exception`` is reported as an error
    envelope rather than aborting the other branches; a cancelled branch
    re-raises its ``CancelledError``.
    """
    envelope = task if not isinstance(task, str) else Agent._to_envelope(task)
    limiter = asyncio.Semaphore(self.concurrency_limit) if self.concurrency_limit else None

    async def _invoke(agent: Agent) -> Envelope:
        # No (or zero) timeout configured: await the branch directly.
        if not self.step_timeout:
            return await agent.run(envelope)
        return await asyncio.wait_for(agent.run(envelope), timeout=self.step_timeout)

    async def _guarded(agent: Agent) -> Envelope:
        if limiter is None:
            return await _invoke(agent)
        async with limiter:
            return await _invoke(agent)

    def _as_envelope(outcome: object) -> Envelope:
        if isinstance(outcome, Envelope):
            return outcome
        if isinstance(outcome, asyncio.CancelledError):
            # CancelledError derives from BaseException (not Exception) on
            # Python 3.8+.  Turning it into an error envelope would hide
            # the cancellation signal, so it is re-raised to keep
            # structured cancellation working.
            raise outcome
        if isinstance(outcome, Exception):
            return Envelope.error_envelope(outcome)
        # Anything else (e.g. another BaseException) is reported by message.
        return Envelope.error_envelope(RuntimeError(str(outcome)))

    # ``return_exceptions=True`` keeps one failing branch from aborting the
    # rest; failures come back as values and are classified above.
    outcomes = await asyncio.gather(*(_guarded(a) for a in self.agents), return_exceptions=True)
    return [_as_envelope(outcome) for outcome in outcomes]

run async

run(task: str | Envelope) -> Envelope

Run every branch and return one folded :class:Envelope.

The wrapper's payload is the labelled-text join of every branch's .text(); metadata.nested_* rolls every branch's cost up so the outer envelope reports total spend. The first non-None branch error propagates as the wrapper's error.

For typed per-branch access, call :meth:run_branches.

Source code in lazybridge/agent.py
async def run(self, task: str | Envelope) -> Envelope:
    """Execute all branches and fold them into a single :class:`Envelope`.

    The returned envelope's ``payload`` is the labelled-text join of each
    branch's ``.text()``, and ``metadata.nested_*`` accumulates every
    branch's cost so the outer envelope reports total spend.  The first
    branch error that is not ``None`` becomes the wrapper's ``error``.

    Use :meth:`run_branches` when the individual branch envelopes are
    needed instead of the joined view.
    """
    per_branch = await self.run_branches(task)
    folded = self._join_branches(task, per_branch)
    return folded

as_tool

as_tool(name: str | None = None, description: str | None = None) -> Tool

Expose the fan-out runner as a single :class:Tool.

Just delegates to :meth:run — same labelled-text Envelope as every direct caller sees, so a ParallelAgent passed in tools=[...] produces output identical to a hand-call.

Source code in lazybridge/agent.py
def as_tool(
    self,
    name: str | None = None,
    description: str | None = None,
) -> Tool:
    """Wrap this fan-out runner as one :class:`Tool`.

    The tool simply awaits :meth:`run`, so a ``ParallelAgent`` placed in
    ``tools=[...]`` yields the same labelled-text Envelope a direct call
    would.

    ``name`` falls back to ``self.name`` and then ``"parallel"``;
    ``description`` falls back to ``self.description`` and then to a
    generated sentence mentioning the number of agents.
    """
    from lazybridge.tools import Tool

    tool_name = name or self.name or "parallel"
    if description:
        tool_desc = description
    elif self.description:
        tool_desc = self.description
    else:
        tool_desc = f"Run {len(self.agents)} agents in parallel and join their outputs."

    async def _run(task: str) -> Envelope:
        return await self.run(task)

    # Keep the wrapper's introspectable metadata in line with the name and
    # description handed to ``Tool`` below.
    _run.__doc__ = tool_desc
    _run.__name__ = tool_name

    return Tool(
        _run,
        name=tool_name,
        description=tool_desc,
        mode="signature",
        returns_envelope=True,
    )

Human-in-the-loop

These ship in lazybridge.ext.hil. The *_agent factories return an Agent; HumanEngine / SupervisorEngine are Engines you pass via Agent(engine=…). See the Human-in-the-loop guide.

lazybridge.ext.hil.human_agent

human_agent(*, timeout: float | None = None, ui: Literal['terminal', 'web'] | Any = 'terminal', default: str | None = None, **agent_kwargs: Any) -> Agent

Build a human-input :class:Agent (approval gate / form-style HIL).

Symmetric counterpart of Agent.from_<kind>(...) for the :class:HumanEngine. Use this for synchronous human input — a prompt at the terminal or a web form — rather than the full REPL of :func:supervisor_agent.

Engine kwargs (timeout, ui, default) configure the :class:HumanEngine; remaining **agent_kwargs flow to the unified Agent constructor:

from lazybridge.ext.hil import human_agent

human_agent(timeout=60.0, default="approve")("Approve deploy?")
Source code in lazybridge/ext/hil/__init__.py
def human_agent(
    *,
    timeout: float | None = None,
    ui: Literal["terminal", "web"] | Any = "terminal",
    default: str | None = None,
    **agent_kwargs: Any,
) -> Agent:
    """Create an :class:`Agent` driven by human input (approval gate / form).

    This is the :class:`HumanEngine` counterpart of the
    ``Agent.from_<kind>(...)`` constructors.  It targets **synchronous
    human input** — a terminal prompt or a web form — as opposed to the
    full REPL offered by :func:`supervisor_agent`.

    ``timeout``, ``ui`` and ``default`` configure the
    :class:`HumanEngine`.  Everything in ``**agent_kwargs`` is passed to
    the unified Agent constructor; ``name`` defaults to ``"human"`` when
    the caller does not provide one::

        from lazybridge.ext.hil import human_agent

        human_agent(timeout=60.0, default="approve")("Approve deploy?")
    """
    from lazybridge import Agent

    # 0.7.9 requires an explicit name= on non-LLM engines, so fill in the
    # canonical default for this factory.  A caller-supplied ``name=``
    # is left untouched and therefore wins.
    if "name" not in agent_kwargs:
        agent_kwargs["name"] = "human"
    return Agent(
        engine=HumanEngine(timeout=timeout, ui=ui, default=default),
        **agent_kwargs,
    )

lazybridge.ext.hil.supervisor_agent

supervisor_agent(*, tools: list[Any] | None = None, agents: list[Any] | None = None, store: Any | None = None, input_fn: Callable[[str], str] | None = None, ainput_fn: Callable[[str], Awaitable[str]] | None = None, timeout: float | None = None, default: str | None = None, **agent_kwargs: Any) -> Agent

Build a human-supervised :class:Agent (REPL + tool dispatch + retry).

Symmetric counterpart of Agent.from_<kind>(...) for the :class:SupervisorEngine. Kept on the ext side rather than as Agent.from_supervisor to respect the core/ext import boundary (see docs/guides/core-vs-ext.md).

Engine kwargs (tools, agents, store, input_fn / ainput_fn, timeout, default) configure the :class:SupervisorEngine; remaining **agent_kwargs (memory= / session= / output= / verify= / fallback= / guard= / name= / etc.) flow to the unified Agent constructor:

from lazybridge.ext.hil import supervisor_agent

supervisor_agent(
    tools=[search],
    agents=[researcher],   # human can `retry researcher: <feedback>`
    session=sess,
    name="ops-supervisor",
)("publish a policy brief")
Source code in lazybridge/ext/hil/__init__.py
def supervisor_agent(
    *,
    tools: list[Any] | None = None,
    agents: list[Any] | None = None,
    store: Any | None = None,
    input_fn: Callable[[str], str] | None = None,
    ainput_fn: Callable[[str], Awaitable[str]] | None = None,
    timeout: float | None = None,
    default: str | None = None,
    **agent_kwargs: Any,
) -> Agent:
    """Create a human-supervised :class:`Agent` (REPL + tool dispatch + retry).

    This is the :class:`SupervisorEngine` counterpart of the
    ``Agent.from_<kind>(...)`` constructors.  It lives on the ext side,
    not as ``Agent.from_supervisor``, so the core/ext import boundary is
    respected (see ``docs/guides/core-vs-ext.md``).

    ``tools``, ``agents``, ``store``, ``input_fn`` / ``ainput_fn``,
    ``timeout`` and ``default`` configure the :class:`SupervisorEngine`.
    Everything in ``**agent_kwargs`` (``memory=`` / ``session=`` /
    ``output=`` / ``verify=`` / ``fallback=`` / ``guard=`` / ``name=`` /
    etc.) is passed to the unified Agent constructor; ``name`` defaults
    to ``"supervisor"`` when the caller does not provide one::

        from lazybridge.ext.hil import supervisor_agent

        supervisor_agent(
            tools=[search],
            agents=[researcher],   # human can `retry researcher: <feedback>`
            session=sess,
            name="ops-supervisor",
        )("publish a policy brief")
    """
    # Imported locally: ``Agent`` lives in core, and core never imports
    # from ext (only the reverse), so ext -> core is the architecturally
    # correct direction.
    from lazybridge import Agent

    engine_options = {
        "tools": tools,
        "agents": agents,
        "store": store,
        "input_fn": input_fn,
        "ainput_fn": ainput_fn,
        "timeout": timeout,
        "default": default,
    }
    engine = SupervisorEngine(**engine_options)

    # 0.7.9 requires an explicit name= on non-LLM engines.  As the
    # one-line ergonomic factory, fall back to ``"supervisor"`` when the
    # caller passed no name; an explicit ``name=`` is left untouched and
    # therefore wins.
    if "name" not in agent_kwargs:
        agent_kwargs["name"] = "supervisor"
    return Agent(engine=engine, **agent_kwargs)

lazybridge.ext.hil.HumanEngine

HumanEngine(*, timeout: float | None = None, ui: Literal['terminal', 'web'] | _UIProtocol = 'terminal', default: str | None = None)

Presents the task to a human and returns their response as an Envelope.

With output=PydanticModel, terminal prompts each field; web renders a form. Emits the same 8 event types as LLMEngine for transparent observability.

Source code in lazybridge/ext/hil/human.py
def __init__(
    self,
    *,
    timeout: float | None = None,
    ui: Literal["terminal", "web"] | _UIProtocol = "terminal",
    default: str | None = None,
) -> None:
    """Store ``timeout`` / ``default`` and resolve the UI backend.

    ``ui`` may be the string ``"terminal"`` or ``"web"``, in which case
    the matching built-in UI is created with the same ``timeout`` and
    ``default``.  Any non-string value is used directly as the UI object.

    Raises:
        ValueError: if ``ui`` is a string other than ``"terminal"`` or
            ``"web"``.
    """
    self.timeout = timeout
    self.default = default

    # A ready-made UI object was supplied: use it as-is.
    if not isinstance(ui, str):
        self._ui: _UIProtocol = ui
        return

    if ui == "terminal":
        self._ui = _TerminalUI(timeout=timeout, default=default)
    elif ui == "web":
        self._ui = _WebUI(timeout=timeout, default=default)
    else:
        raise ValueError(f"Unknown UI type: {ui!r}")

lazybridge.ext.hil.SupervisorEngine

SupervisorEngine(*, tools: list[Tool | Callable | Any] | None = None, agents: list[Any] | None = None, store: Store | None = None, input_fn: Callable[[str], str] | None = None, ainput_fn: Callable[[str], Awaitable[str]] | None = None, timeout: float | None = None, default: str | None = None)

Human-in-the-loop engine with tool-calling and agent retry.

Source code in lazybridge/ext/hil/supervisor.py
def __init__(
    self,
    *,
    tools: list[Tool | Callable | Any] | None = None,
    agents: list[Any] | None = None,
    store: Store | None = None,
    input_fn: Callable[[str], str] | None = None,
    ainput_fn: Callable[[str], Awaitable[str]] | None = None,
    timeout: float | None = None,
    default: str | None = None,
) -> None:
    """Index the supplied tools and agents by name and store the I/O hooks.

    Args:
        tools: Entries to expose; each is passed through ``_wrap_tool``.
            ``None`` is treated as an empty list.
        agents: Agents to register; ``None`` is treated as an empty list.
        store: Stored unchanged on ``self._store``.
        input_fn: Synchronous prompt function.  When falsy, a wrapper
            around the built-in ``input`` is used.
        ainput_fn: Async prompt function, stored unchanged (may be ``None``).
        timeout: Stored unchanged on ``self.timeout``.
        default: Stored unchanged on ``self.default``.
    """
    # Tool-is-Tool: accept plain functions and Agents too, not just Tool
    # instances.  Matches the contract of ``Agent(tools=[...])`` so the
    # same tools list can be handed to either surface.
    from lazybridge.tools import _wrap_tool

    wrapped = [_wrap_tool(t) for t in (tools or [])]
    # Keyed by tool name; if two tools share a name, the later one wins.
    self._tools = {t.name: t for t in wrapped}
    # Keyed by the agent's ``name`` attribute, falling back to a positional
    # ``agent-<index>`` label when it has none.  Duplicate names overwrite.
    self._agents = {getattr(a, "name", f"agent-{i}"): a for i, a in enumerate(agents or [])}
    self._store = store
    # The lambda resolves ``input`` when it is called, not when it is defined.
    self._input_fn = input_fn or (lambda prompt: input(prompt))
    self._ainput_fn = ainput_fn
    self.timeout = timeout
    self.default = default