without_dag
Bounded-concurrency execution of DAG-shaped async workflows for without, liftable into a Processor.
Node
dataclass
Node(
key: NodeKey,
dependencies: tuple[NodeKey, ...],
run: Callable[[tuple[object, ...]], Awaitable[object]],
)
One async step in a graph, named by key and wired by dependencies.
The narrow seam a graph-defining frontend lowers onto: a Node is a value,
not a place. run receives its dependencies' results as a tuple in
dependencies order and returns this node's single result. Results cross
this seam as object (the executor cannot know each step's type); a typed
frontend restores precision above it, exactly as without-web's Extractor
is collected as Extractor[object] and re-typed by into.
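The seam described above can be sketched with a stand-in dataclass that mirrors the three documented fields. This is an illustration only, not the library's own class: NodeKey is assumed to be a plain str, and lower is a hypothetical helper showing how a typed, positional step might be adapted to the tuple-in, object-out run shape.

```python
import asyncio
from collections.abc import Awaitable, Callable
from dataclasses import dataclass

NodeKey = str  # assumption: the real NodeKey type is not shown in this reference


@dataclass(frozen=True)
class Node:
    """Stand-in mirroring the documented fields; not the library's class."""

    key: NodeKey
    dependencies: tuple[NodeKey, ...]
    run: Callable[[tuple[object, ...]], Awaitable[object]]


async def add(a: int, b: int) -> int:
    return a + b


def lower(
    fn: Callable[..., Awaitable[object]],
) -> Callable[[tuple[object, ...]], Awaitable[object]]:
    """Hypothetical helper: adapt a positional-argument step to the tuple seam."""

    async def run(results: tuple[object, ...]) -> object:
        # Results arrive in `dependencies` order, so they unpack positionally.
        return await fn(*results)

    return run


total = Node(key="total", dependencies=("x", "y"), run=lower(add))
node_result = asyncio.run(total.run((2, 3)))
```

The typed function keeps its precise signature; only the lowered run callable deals in object, which is the loss of precision a typed frontend would restore.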
Plan
dataclass
Plan(
by_key: Mapping[NodeKey, Node],
dependencies: Mapping[NodeKey, tuple[NodeKey, ...]],
consumers: Mapping[NodeKey, int],
)
A node set compiled into its input-independent scheduling structure.
Everything a run needs that does not depend on the input values: the nodes
by key, the dependency edges (in the mapping shape graphlib takes), and the
number of consumers per key (so a result can be freed once its last dependent
has read it). Computed once and reused across runs as a value, so a fixed
graph driven per event never rebuilds any of it.
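As a rough sketch of what such input-independent structure might look like, the snippet below derives consumer counts from a dependency mapping and validates the same mapping with the standard library's graphlib. The function name count_consumers and the str NodeKey are assumptions made for illustration; the library's actual compilation step is not shown in this reference.

```python
from collections.abc import Mapping
from graphlib import TopologicalSorter

NodeKey = str  # assumption: the real NodeKey type is not shown in this reference


def count_consumers(
    dependencies: Mapping[NodeKey, tuple[NodeKey, ...]],
) -> dict[NodeKey, int]:
    """Count, per key, how many nodes list it as a dependency."""
    counts = {key: 0 for key in dependencies}
    for deps in dependencies.values():
        for dep in deps:
            counts[dep] = counts.get(dep, 0) + 1
    return counts


# key -> keys it depends on: the predecessor mapping graphlib accepts directly.
dependencies = {"a": (), "b": ("a",), "c": ("a", "b")}
consumers = count_consumers(dependencies)

# graphlib validates the same mapping; static_order raises CycleError on a cycle.
order = list(TopologicalSorter(dependencies).static_order())
```

Here "a" has two consumers, so its result would have to stay alive until both "b" and "c" have read it, while "c" has none.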
CompiledGraph
dataclass
CompiledGraph(
nodes: tuple[Node, ...],
inputs: tuple[NodeKey, ...],
output: NodeKey,
limit: int | None,
_plan: Plan,
)
A frozen graph that is an async callable (*Ins) -> Out.
build returns one of these. It runs one bounded-concurrency execution per
call, seeding each entry the graph was opened over with the matching
positional argument and returning the value of its output. A single-input
graph is a plain Callable[[In], Awaitable[Out]], so it lifts into a
Processor with from_map. The scheduling structure is compiled once at
build into _plan and reused by every call and stream, so a graph driven
per event never recomputes it. nodes is kept so the structure is
recoverable (a future diagram is derived from this one declaration, not
maintained beside it).
stream
stream(
*values: *Ins,
) -> AsyncGenerator[tuple[NodeKey, object]]
Run the whole graph, yielding each node's (key, result) as it completes.
The streaming counterpart to calling the graph: __call__ samples the
single output value (a behavior), whereas stream reports every completion as
it happens (the events), letting the caller react as results land or read
several outputs. The positional arguments seed the graph's inputs and are
checked against *Ins exactly as in a call; match a yielded key against
a Handle's key to pick out a node's result.
Graph
dataclass
A builder that records async steps and returns typed Handles.
of opens a graph over its entry types and hands back a tuple of one Handle
per type; node adds a step wired to the handles it depends on; and build
freezes the result into a CompiledGraph. Because the graph carries its entry
pack in
its type (Graph[*Ins]), build needs only the output handle: it recovers
the inputs the graph already knows, so there is no second place to keep in
sync. The builder itself is a frozen value; the only mutation is appending to
its interior list of recorded nodes. Each step's function receives its
dependencies' results as positional arguments in the order its handles were
passed.
of
staticmethod
of(
a: type[A], b: type[B], c: type[C]
) -> tuple[
Graph[A, B, C],
tuple[Handle[A], Handle[B], Handle[C]],
]
of(
a: type[A], b: type[B], c: type[C], d: type[D]
) -> tuple[
Graph[A, B, C, D],
tuple[Handle[A], Handle[B], Handle[C], Handle[D]],
]
of(
a: type[A],
b: type[B],
c: type[C],
d: type[D],
e: type[E],
) -> tuple[
Graph[A, B, C, D, E],
tuple[
Handle[A],
Handle[B],
Handle[C],
Handle[D],
Handle[E],
],
]
of(
a: type[A],
b: type[B],
c: type[C],
d: type[D],
e: type[E],
f: type[F],
) -> tuple[
Graph[A, B, C, D, E, F],
tuple[
Handle[A],
Handle[B],
Handle[C],
Handle[D],
Handle[E],
Handle[F],
],
]
of(
a: type[A],
b: type[B],
c: type[C],
d: type[D],
e: type[E],
f: type[F],
g: type[G],
) -> tuple[
Graph[A, B, C, D, E, F, G],
tuple[
Handle[A],
Handle[B],
Handle[C],
Handle[D],
Handle[E],
Handle[F],
Handle[G],
],
]
of(
a: type[A],
b: type[B],
c: type[C],
d: type[D],
e: type[E],
f: type[F],
g: type[G],
h: type[H],
) -> tuple[
Graph[A, B, C, D, E, F, G, H],
tuple[
Handle[A],
Handle[B],
Handle[C],
Handle[D],
Handle[E],
Handle[F],
Handle[G],
Handle[H],
],
]
Open a graph over inputs, returning it and a tuple of one Handle per entry type.
node
node(
fn: Callable[[A, B, C], Awaitable[T]],
a: Handle[A],
b: Handle[B],
c: Handle[C],
) -> Handle[T]
node(
fn: Callable[[A, B, C, D], Awaitable[T]],
a: Handle[A],
b: Handle[B],
c: Handle[C],
d: Handle[D],
) -> Handle[T]
node(
fn: Callable[[A, B, C, D, E], Awaitable[T]],
a: Handle[A],
b: Handle[B],
c: Handle[C],
d: Handle[D],
e: Handle[E],
) -> Handle[T]
node(
fn: Callable[[A, B, C, D, E, F], Awaitable[T]],
a: Handle[A],
b: Handle[B],
c: Handle[C],
d: Handle[D],
e: Handle[E],
f: Handle[F],
) -> Handle[T]
node(
fn: Callable[
[A, B, C, D, E, F, G], Awaitable[T]
],
a: Handle[A],
b: Handle[B],
c: Handle[C],
d: Handle[D],
e: Handle[E],
f: Handle[F],
g: Handle[G],
) -> Handle[T]
node(
fn: Callable[
[A, B, C, D, E, F, G, H], Awaitable[T]
],
a: Handle[A],
b: Handle[B],
c: Handle[C],
d: Handle[D],
e: Handle[E],
f: Handle[F],
g: Handle[G],
h: Handle[H],
) -> Handle[T]
Add a node computing fn from the handles it depends on, returning its
result handle.
fn is called with the dependencies' results as positional arguments in
the order their handles are passed. The overloads above tie each handle's
type to fn's matching parameter, so a mismatch is a static error.
build
build(
*, output: Handle[Out], limit: int | None = None
) -> CompiledGraph[*Ins, Out]
Freeze the recorded steps into a callable graph over the graph's inputs.
The scheduling structure is compiled once here, so running the graph
repeats no graph analysis. limit caps how many nodes run concurrently;
it defaults to None, which leaves concurrency unbounded (every ready
node runs at once). Pass an integer to cap it when the steps contend for
a scarce resource.
Handle
dataclass
Handle(key: NodeKey)
Bases: Generic[_T_co]
A typed reference to a node's future result.
The token the builder hands back from of/node and takes back as a
dependency. T is phantom: the handle carries only an opaque key, but the
type flows through the wiring so a downstream step is checked against the
types of the handles it depends on. Because a caller can only pass handles
that already exist, a cycle is unrepresentable through this API.
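The phantom-type idea and the no-cycles argument can be illustrated with a small stand-in. Handle below mirrors the documented shape (a generic dataclass holding only a key), while TinyBuilder and its add method are hypothetical and assume integer keys issued in order; the real builder's key scheme is not documented here.

```python
from dataclasses import dataclass
from typing import Generic, TypeVar

T_co = TypeVar("T_co", covariant=True)
NodeKey = int  # assumption: keys are issued as increasing integers in this sketch


@dataclass(frozen=True)
class Handle(Generic[T_co]):
    """Stand-in: T_co never appears in a field, so it exists only for the checker."""

    key: NodeKey


class TinyBuilder:
    """Hypothetical builder recording only edges, to show why cycles cannot arise."""

    def __init__(self) -> None:
        self.edges: dict[NodeKey, tuple[NodeKey, ...]] = {}

    def add(self, *deps: Handle[object]) -> Handle[object]:
        # The new key is issued only after its dependencies' handles exist,
        # so every edge points from a newer key to an older one.
        key = len(self.edges)
        self.edges[key] = tuple(dep.key for dep in deps)
        return Handle(key)


builder = TinyBuilder()
x = builder.add()
y = builder.add(x)
z = builder.add(x, y)
```

Because a handle can only be obtained from a previous add call, there is no way to name a node that will be created later, which is the property that rules out cycles.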
drive
async
drive(
plan: Plan,
inputs: Mapping[NodeKey, object],
limit: int | None,
) -> AsyncGenerator[tuple[NodeKey, object]]
Run a compiled Plan, yielding each (key, result) the instant it completes.
The streaming core, and the events half of the model. Yields completions in
whatever order nodes finish; the only ordering guarantee is causal: a node
completes, and is yielded, only after the dependencies it consumed. inputs
pre-supplies the values of source keys, which are marked done without running
and are never yielded. limit caps how many nodes run concurrently (None is
unbounded).
Scheduling replicates the shape of without.limit_concurrency
(asyncio.wait(..., return_when=FIRST_COMPLETED)) rather than calling it: the
scheduler needs the completed task's NodeKey to unlock successors, which
that lazy source hides. Acyclicity is checked by TopologicalSorter.prepare,
which raises graphlib.CycleError if the graph contains a cycle. Each node runs
once; a result is dropped as soon as its last dependent has captured it. A
node that raises fails the whole run and cancels its in-flight siblings;
closing the iterator early tears the run down through the same cancellation.
evaluate
async
evaluate(
plan: Plan,
target: NodeKey,
inputs: Mapping[NodeKey, object],
limit: int | None,
) -> object
Run every node in plan and return target's value: the behavior read.
A consumer of drive that runs the whole graph and keeps the one value the
caller wants, dropping the rest. target is a node whose completion supplies
the value, or a supplied input returned directly (an identity plan). There is
deliberately no early return on target: the graph is run to completion, so
the result reflects the whole graph and every node's effects have happened.
A target that is neither a defined node nor a supplied input raises
KeyError, matching drive, rather than silently reading back as None.
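The "run everything, keep one value" behavior can be sketched as a consumer of any (key, result) stream. The names evaluate_sketch, node_keys and fake_events are hypothetical, and the signature differs from the documented one so that the example stays self-contained; whether the real function checks the target before or after running is not stated here, and this sketch assumes before.

```python
import asyncio
from collections.abc import AsyncIterator, Mapping, Set

NodeKey = str  # assumption: the real NodeKey type is not shown in this reference


async def evaluate_sketch(
    events: AsyncIterator[tuple[NodeKey, object]],
    node_keys: Set[NodeKey],
    target: NodeKey,
    inputs: Mapping[NodeKey, object],
) -> object:
    if target not in node_keys and target not in inputs:
        raise KeyError(target)  # never silently read back as None
    value = inputs.get(target)  # identity case: the target is a supplied input
    async for key, result in events:
        # Deliberately no early return: every completion is drained.
        if key == target:
            value = result
    return value


seen: list[NodeKey] = []


async def fake_events() -> AsyncIterator[tuple[NodeKey, object]]:
    """Hypothetical stand-in for a drive-like stream of completions."""
    for key, result in [("d", 20), ("sum", 31), ("log", None)]:
        await asyncio.sleep(0)
        seen.append(key)
        yield key, result


value = asyncio.run(
    evaluate_sketch(fake_events(), {"d", "sum", "log"}, "sum", {"x": 10})
)
```

The "log" node completes after the target, yet it is still consumed, so its effects happen before the value is returned.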