Skip to content

without_dag

Bounded-concurrency execution of DAG-shaped async workflows for without, liftable into a Processor.

without_dag

NodeKey

NodeKey = Hashable

Node dataclass

Node(
    key: NodeKey,
    dependencies: tuple[NodeKey, ...],
    run: Callable[[tuple[object, ...]], Awaitable[object]],
)

One async step in a graph, named by key and wired by dependencies.

The narrow seam a graph-defining frontend lowers onto: a Node is a value, not a place. run receives its dependencies' results as a tuple in dependencies order and returns this node's single result. Results cross this seam as object (the executor cannot know each step's type); a typed frontend restores precision above it, exactly as without-web's Extractor is collected as Extractor[object] and re-typed by into.

key instance-attribute

key: NodeKey

dependencies instance-attribute

dependencies: tuple[NodeKey, ...]

run instance-attribute

Plan dataclass

Plan(
    by_key: Mapping[NodeKey, Node],
    dependencies: Mapping[NodeKey, tuple[NodeKey, ...]],
    consumers: Mapping[NodeKey, int],
)

A node set compiled into its input-independent scheduling structure.

Everything a run needs that does not depend on the input values: the nodes by key, the dependency edges (as the graph graphlib wants), and the number of consumers per key (so a result can be freed once its last dependent has read it). Computed once and reused across runs as a value, so a fixed graph driven per event never rebuilds any of it.

by_key instance-attribute

by_key: Mapping[NodeKey, Node]

dependencies instance-attribute

dependencies: Mapping[NodeKey, tuple[NodeKey, ...]]

consumers instance-attribute

consumers: Mapping[NodeKey, int]

of classmethod

of(nodes: Iterable[Node]) -> Plan

Compile a node set into a reusable Plan.

CompiledGraph dataclass

CompiledGraph(
    nodes: tuple[Node, ...],
    inputs: tuple[NodeKey, ...],
    output: NodeKey,
    limit: int | None,
    _plan: Plan,
)

A frozen graph that is an async callable (*Ins) -> Out.

build returns one of these. It runs one bounded-concurrency execution per call, seeding each entry the graph was opened over with the matching positional argument and returning the value of its output. A single-input graph is a plain Callable[[In], Awaitable[Out]], so it lifts into a Processor with from_map. The scheduling structure is compiled once at build into _plan and reused by every call and stream, so a graph driven per event never recomputes it. nodes is kept so the structure is recoverable (a future diagram is derived from this one declaration, not maintained beside it).

nodes instance-attribute

nodes: tuple[Node, ...]

inputs instance-attribute

inputs: tuple[NodeKey, ...]

output instance-attribute

output: NodeKey

limit instance-attribute

limit: int | None

stream

stream(
    *values: *Ins,
) -> AsyncGenerator[tuple[NodeKey, object]]

Run the whole graph, yielding each node's (key, result) as it completes.

The streaming counterpart to calling the graph: __call__ samples the single output value (a behavior), stream reports every completion as it happens (the events), letting the caller react as results land or read several outputs. The positional arguments seed the graph's inputs, checked against *Ins exactly as the call is; match a yielded key against a Handle's key to pick out a node's result.

Graph dataclass

Graph(
    _nodes: list[Node] = list(),
    _input_keys: tuple[NodeKey, ...] = (),
)

A builder that records async steps and returns typed Handles.

of opens a graph over its entry types and hands back a tuple of one Handle per type, node adds a step wired to the handles it depends on, and build freezes the result into a CompiledGraph. Because the graph carries its entry pack in its type (Graph[*Ins]), build needs only the output handle: it recovers the inputs the graph already knows, so there is no second place to keep in sync. The builder itself is a frozen value; the only mutation is appending to its interior list of recorded nodes. Each step's function receives its dependencies' results as positional arguments in the order its handles were passed.

of staticmethod

of() -> tuple[Graph[], tuple[]]
of(a: type[A]) -> tuple[Graph[A], tuple[Handle[A]]]
of(
    a: type[A], b: type[B]
) -> tuple[Graph[A, B], tuple[Handle[A], Handle[B]]]
of(
    a: type[A], b: type[B], c: type[C]
) -> tuple[
    Graph[A, B, C],
    tuple[Handle[A], Handle[B], Handle[C]],
]
of(
    a: type[A], b: type[B], c: type[C], d: type[D]
) -> tuple[
    Graph[A, B, C, D],
    tuple[Handle[A], Handle[B], Handle[C], Handle[D]],
]
of(
    a: type[A],
    b: type[B],
    c: type[C],
    d: type[D],
    e: type[E],
) -> tuple[
    Graph[A, B, C, D, E],
    tuple[
        Handle[A],
        Handle[B],
        Handle[C],
        Handle[D],
        Handle[E],
    ],
]
of(
    a: type[A],
    b: type[B],
    c: type[C],
    d: type[D],
    e: type[E],
    f: type[F],
) -> tuple[
    Graph[A, B, C, D, E, F],
    tuple[
        Handle[A],
        Handle[B],
        Handle[C],
        Handle[D],
        Handle[E],
        Handle[F],
    ],
]
of(
    a: type[A],
    b: type[B],
    c: type[C],
    d: type[D],
    e: type[E],
    f: type[F],
    g: type[G],
) -> tuple[
    Graph[A, B, C, D, E, F, G],
    tuple[
        Handle[A],
        Handle[B],
        Handle[C],
        Handle[D],
        Handle[E],
        Handle[F],
        Handle[G],
    ],
]
of(
    a: type[A],
    b: type[B],
    c: type[C],
    d: type[D],
    e: type[E],
    f: type[F],
    g: type[G],
    h: type[H],
) -> tuple[
    Graph[A, B, C, D, E, F, G, H],
    tuple[
        Handle[A],
        Handle[B],
        Handle[C],
        Handle[D],
        Handle[E],
        Handle[F],
        Handle[G],
        Handle[H],
    ],
]
of(
    a: type[A],
    b: type[B],
    c: type[C],
    d: type[D],
    e: type[E],
    f: type[F],
    g: type[G],
    h: type[H],
    j: type[J],
) -> tuple[
    Graph[A, B, C, D, E, F, G, H, J],
    tuple[
        Handle[A],
        Handle[B],
        Handle[C],
        Handle[D],
        Handle[E],
        Handle[F],
        Handle[G],
        Handle[H],
        Handle[J],
    ],
]
of(
    a: type[A],
    b: type[B],
    c: type[C],
    d: type[D],
    e: type[E],
    f: type[F],
    g: type[G],
    h: type[H],
    j: type[J],
    k: type[K],
) -> tuple[
    Graph[A, B, C, D, E, F, G, H, J, K],
    tuple[
        Handle[A],
        Handle[B],
        Handle[C],
        Handle[D],
        Handle[E],
        Handle[F],
        Handle[G],
        Handle[H],
        Handle[J],
        Handle[K],
    ],
]
of(*inputs: type[object]) -> tuple[object, ...]

Open a graph over inputs, returning it and a tuple of one Handle per entry type.

node

node(fn: Callable[[], Awaitable[T]]) -> Handle[T]
node(
    fn: Callable[[A], Awaitable[T]], a: Handle[A]
) -> Handle[T]
node(
    fn: Callable[[A, B], Awaitable[T]],
    a: Handle[A],
    b: Handle[B],
) -> Handle[T]
node(
    fn: Callable[[A, B, C], Awaitable[T]],
    a: Handle[A],
    b: Handle[B],
    c: Handle[C],
) -> Handle[T]
node(
    fn: Callable[[A, B, C, D], Awaitable[T]],
    a: Handle[A],
    b: Handle[B],
    c: Handle[C],
    d: Handle[D],
) -> Handle[T]
node(
    fn: Callable[[A, B, C, D, E], Awaitable[T]],
    a: Handle[A],
    b: Handle[B],
    c: Handle[C],
    d: Handle[D],
    e: Handle[E],
) -> Handle[T]
node(
    fn: Callable[[A, B, C, D, E, F], Awaitable[T]],
    a: Handle[A],
    b: Handle[B],
    c: Handle[C],
    d: Handle[D],
    e: Handle[E],
    f: Handle[F],
) -> Handle[T]
node(
    fn: Callable[
        [A, B, C, D, E, F, G], Awaitable[T]
    ],
    a: Handle[A],
    b: Handle[B],
    c: Handle[C],
    d: Handle[D],
    e: Handle[E],
    f: Handle[F],
    g: Handle[G],
) -> Handle[T]
node(
    fn: Callable[
        [A, B, C, D, E, F, G, H], Awaitable[T]
    ],
    a: Handle[A],
    b: Handle[B],
    c: Handle[C],
    d: Handle[D],
    e: Handle[E],
    f: Handle[F],
    g: Handle[G],
    h: Handle[H],
) -> Handle[T]
node(
    fn: Callable[
        [A, B, C, D, E, F, G, H, J], Awaitable[T]
    ],
    a: Handle[A],
    b: Handle[B],
    c: Handle[C],
    d: Handle[D],
    e: Handle[E],
    f: Handle[F],
    g: Handle[G],
    h: Handle[H],
    j: Handle[J],
) -> Handle[T]
node(
    fn: Callable[
        [A, B, C, D, E, F, G, H, J, K],
        Awaitable[T],
    ],
    a: Handle[A],
    b: Handle[B],
    c: Handle[C],
    d: Handle[D],
    e: Handle[E],
    f: Handle[F],
    g: Handle[G],
    h: Handle[H],
    j: Handle[J],
    k: Handle[K],
) -> Handle[T]
node(
    fn: Callable[..., Awaitable[T]], *deps: Handle[object]
) -> Handle[T]

Add a node computing fn from the handles it depends on, returning its result handle.

fn is called with the dependencies' results as positional arguments in the order their handles are passed. The overloads above tie each handle's type to fn's matching parameter, so a mismatch is a static error.

build

build(
    *, output: Handle[Out], limit: int | None = None
) -> CompiledGraph[*Ins, Out]

Freeze the recorded steps into a callable graph over the graph's inputs.

The scheduling structure is compiled once here, so running the graph repeats no graph analysis. limit caps how many nodes run concurrently; it defaults to None, which leaves concurrency unbounded (every ready node runs at once). Pass an integer to cap it when the steps contend for a scarce resource.

Handle dataclass

Handle(key: NodeKey)

Bases: Generic[_T_co]

A typed reference to a node's future result.

The token the builder hands back from of/node and takes back as a dependency. T is phantom: the handle carries only an opaque key, but the type flows through the wiring so a downstream step is checked against the types of the handles it depends on. Because a caller can only pass handles that already exist, a cycle is unrepresentable through this API.

key instance-attribute

key: NodeKey

drive async

drive(
    plan: Plan,
    inputs: Mapping[NodeKey, object],
    limit: int | None,
) -> AsyncGenerator[tuple[NodeKey, object]]

Run a compiled Plan, yielding each (key, result) the instant it completes.

The streaming core, and the events half of the model. Yields completions in whatever order nodes finish; the only ordering guarantee is the causal one, a node after the dependencies it consumed. inputs pre-supplies the values of source keys, marked done without running and never yielded. limit caps how many nodes run concurrently (None is unbounded).

Scheduling replicates the shape of without.limit_concurrency (asyncio.wait(..., return_when=FIRST_COMPLETED)) rather than calling it: the scheduler needs the completed task's NodeKey to unlock successors, which that lazy source hides. Acyclicity is proven by TopologicalSorter.prepare, which raises graphlib.CycleError. Each node runs once; a result is dropped as soon as its last dependent has captured it. A node that raises fails the whole run, cancelling in-flight siblings, which is also how closing the iterator early tears the run down.

evaluate async

evaluate(
    plan: Plan,
    target: NodeKey,
    inputs: Mapping[NodeKey, object],
    limit: int | None,
) -> object

Run every node in plan and return target's value: the behavior read.

A consumer of drive that runs the whole graph and keeps the one value the caller wants, dropping the rest. target is a node whose completion supplies the value, or a supplied input returned directly (an identity plan). There is deliberately no early return on target: the graph is run to completion, so the result reflects the whole graph and every node's effects have happened.

A target that is neither a defined node nor a supplied input raises KeyError, matching drive, rather than silently reading back as None.