Skip to content

without

Core contracts and executor for without: the sans-IO stream-processor substrate.

without

Fold

Fold = Callable[[Stream[In]], Awaitable[S]]

Sink

Sink = Callable[[Stream[In]], Awaitable[None]]

Endo

Endo = Callable[[T], T]

Context

Bases: Protocol

A stream viewed as its latest value: the "behavior" half of the model.

Where consuming a stream sees every event, current samples the latest and never blocks. This is how long-lived state (config, a connection pool) is read: a context is just another processor's output that a reader samples rather than consumes. current MUST return a value; a context is never "not ready". The reader only ever gets a value, never a writable place.

current

current() -> T

Processor

Bases: Protocol

A transformation from a stream of inputs to a stream of outputs.

This is the only thing a user writes, and the only node type: a processor's output stream becomes another processor's input stream, all the way down.

I/O is decoupled, not forbidden. A processor MAY await I/O while handling an event (a database query, a closed-lifespan sub-request), reading its dependencies from injected Context values; this is why a scan's step is async. The point is not to ban I/O but to separate it into the right abstractions so the parts stay reusable: sources at the edge, behaviors via sample, effects contained in the step. The one rule: an effect MUST NOT escape the entrypoint. A processor awaits its I/O to completion and MUST NOT hand a half-open resource (an open socket, an unfinished task it does not own) back to the runtime. Testing injects fake Context dependencies.

Stream

Bases: Protocol

An asynchronous sequence of values.

A stream is the single shape every connection has. Sources that touch the outside world (a socket, a file watcher, a clock) are streams too: a stream is just the one shape every connection takes, whoever does the I/O.

Transition dataclass

Transition(state: S, output: Out)

The result of folding one event into a scan's state.

A value, never a place: a step returns the next state and the single output it emits, and mutates nothing the caller can observe. Splitting one event into several outputs is a wiring-style concern, not a per-step one, so a transition carries one output rather than a collection.

state instance-attribute

state: S

output instance-attribute

output: Out

Sample dataclass

Sample(_value: T)

current

current() -> T

updated async

updated() -> T

Wait for the drain to publish the next value, then return it.

The deterministic counterpart to current on the behavior edge: where current reads the latest value and never blocks, updated blocks until the background drain consumes and publishes the next value from the source, then returns it. It is the "await next update" signal a reader waits on (a test asserting on post-reload state, a control loop reacting to a config change) instead of guessing how long the background task needs. If the source raises instead of yielding, the wait raises that error rather than hanging, and the failure is terminal: once the source has failed, every later call re-raises it rather than registering a waiter that can never resolve. If the context closes first, the wait is cancelled.

Each call registers its own one-shot future resolved by the next publish, so concurrent waiters are independent: cancelling one deregisters it at once and never disturbs another. Like current, it inherits latest-wins: a waiter sees only publishes after it starts waiting, and a source that publishes faster than the reader re-arms collapses the values it missed. So updated is a "the state has moved on" signal, not a way to observe every value; consume the stream for that.

from_fold

from_fold(
    initial: S, step: Callable[[In, S], Awaitable[S]]
) -> Fold[In, S]

Build a leaf that folds a stream of events into a single final state.

The stateful terminus, dual to from_scan: where from_scan threads state and emits an output every step (a scan), from_fold threads state and yields only the final accumulated value when the stream ends (a true reduce). The step MAY await contained I/O, so a fold whose result you ignore is also how you run a stateful consumer for its effects.

from_map

from_map(
    step: Callable[[In], Awaitable[Out]],
) -> Processor[In, Out]

Build a processor from a stateless step: each event maps to one output.

The counterpart to from_scan for a processor that holds no state. Each event is handled independently of every other, so there is no initial to seed and no Transition to thread: step maps an event straight to its single output. Like from_scan the step is async so it MAY await contained I/O, and the effect MUST complete within each call (see Processor). Splitting one event into several outputs is a separate, wiring-style concern, not a per-step one, so the step returns a single value rather than a collection.

from_scan

from_scan(
    initial: S,
    step: Callable[
        [In, S], Awaitable[Transition[S, Out]]
    ],
) -> Processor[In, Out]

Build a processor from a stateful step that emits an output every event.

step is the kernel: given an event and the current state it returns the next state and the output it emits. It is async so it MAY await contained I/O (reading dependencies from Context values captured by closure), but a step that does no I/O is just an async def that never awaits. from_scan supplies the loop that threads state across the input stream, emitting one output per event: a scan, not a reduce (the collapse-to-one- value form is from_fold). The effect MUST complete within each call (see Processor).

from_sink

from_sink(
    step: Callable[[In], Awaitable[None]],
) -> Sink[In]

Build a leaf that consumes a stream for its effects and emits nothing.

The stateless terminus, dual to from_map: where a map turns each event into an output, a sink turns each event into an effect and yields no output stream at all. Awaiting it drains the stream to completion (or runs forever, for an unbounded source driven inside a background_task). The step MAY await contained I/O.

as_async_iterator async

as_async_iterator(
    items: AsyncIterable[T] | Iterable[T],
) -> AsyncIterator[T]

Normalize a sync or async iterable into a single async iterator.

Lets code that consumes via async for/anext accept either kind without branching on the iteration protocol at every use.

background_task async

background_task(
    coro: Coroutine[object, object, T],
) -> AsyncIterator[Task[T]]

Run coro as a task for the duration of the with block.

The task is started on entry and cancelled (then awaited) on exit, so it is bounded by the block and never leaks. If it finishes on its own with an exception, that surfaces when the block exits.

cancel_futures async

cancel_futures(
    futures: Iterable[Future[T] | None],
) -> None

Cancel every future, then await them all so their teardown completes.

Two phases on purpose: cancelling the whole set before awaiting any of them lets them tear down concurrently, instead of serially cancelling and waiting for one at a time. None entries are skipped, so a caller holding an optional task (task: asyncio.Task | None) can pass it without a guard. The futures are materialized first, so a caller may pass a live set the awaits will mutate. Each future's own CancelledError is suppressed; any other exception it raises during teardown propagates.

limit_concurrency async

limit_concurrency(
    aws: AsyncIterable[Awaitable[T]]
    | Iterable[Awaitable[T]],
    limit: int,
) -> AsyncIterator[Future[T]]

Run awaitables from aws with at most limit in flight, yielding each as it finishes.

A bounded-concurrency driver: it pulls the next awaitable from aws only while fewer than limit are already running. So a lazy source (an async generator that produces each unit of work on demand) is never advanced past the limit. That is what lets it gate a side-effecting source: an accept loop whose generator awaits socket.accept() only when pulled will never accept more connections than it can serve.

Each completed awaitable is yielded as a Future; call .result() on it to read the value or re-raise its exception. On early exit or cancellation, any still-running awaitables are cancelled and awaited, so none outlive the iteration.

limit must be at least 1; a non-positive limit is a ValueError, since it could only ever stall the source rather than run it.

Adapted from Limiting concurrency in asyncio.

sleep_forever async

sleep_forever() -> None

Suspend the current task until it is cancelled.

The idiom for a coroutine whose job is to stay alive until its surrounding scope tears it down: a server's run loop holding a bound socket open, a process that should idle until signalled. It awaits a future that never resolves, so it consumes nothing and ends only on cancellation.

buffer async

buffer(
    source: Stream[T], maxsize: int
) -> AsyncIterator[T]

Decouple a stream's producer from its consumer through a bounded queue.

A background task pulls from source as fast as backpressure allows and drops each value into a queue of at most maxsize items; the returned stream yields from that queue. So the source is driven independently of how fast the consumer pulls: a pull-based producer (an accept loop, a DAG's executed iterator) keeps making progress while a slower consumer catches up, up to maxsize items of slack before put blocks and backpressure reaches the producer.

maxsize must be at least 1: the bound is the backpressure, so an unbounded buffer (which could let a fast producer grow memory without limit) is a ValueError rather than a silent default. When source ends the queue is shut down and the stream ends once drained; if source raises, the buffered items still drain and then the error surfaces. Closing the stream early cancels the background task, so the producer never outlives its consumer.

collect async

collect(source: Stream[T]) -> list[T]

Drain a Stream into a list: the terminal that materializes every value.

The dual of stream_from_iterable. It runs until the source ends, so it suits bounded streams (a finished request, a shut-down queue); an endless source never returns.

compose

compose(
    first: Processor[A, B], second: Processor[B, C]
) -> Processor[A, C]

Compose two processors on the event edge: first then second.

The join type B may differ from A and C, so this adapts as well as chains. Pure composition (the only event-edge connector that needs nothing running); nest for three or more stages.

sample async

sample(source: Stream[T]) -> AsyncIterator[Sample[T]]

Connect to a stream on the behavior edge: read its latest value, not each.

The first value is sampled eagerly, so the context is never "not ready". A background task keeps the held value current while the with block is open, dropping intermediate values (latest-wins, no backpressure). A reader reads the held value through current (latest, non-blocking) or waits for the next one through updated (the deterministic "await next update" signal); the held value is mutated only by the drain. The yielded Sample is a Context, so a caller that only reads current can treat it as one. When the block exits, any still-pending updated waits are cancelled, so a task awaiting one is not left hanging on a context that has closed.

stack

stack(
    *middleware: Callable[[H, *Ctx], H],
) -> Callable[[H, *Ctx], H]

Compose middleware into one, first argument outermost; stack() is identity.

A middleware is (handler, *context) -> handler: it wraps a handler, given some fixed context, into a new handler of the same type. The context is whatever the setting threads through unchanged: nothing for a client exchange (Endo[H]), the connection state and scope for a server handler. stack threads the same context into every middleware and chains the handler through them, first outermost, so stack(f, g)(handler, *context) is f(g(handler, *context), *context).

Generic over the handler H (the value each middleware transforms) and the context pack *Ctx, which is bound once per call: every middleware in one stack(...) must therefore share a shape, and mixing shapes is a type error. The pack passes through untouched (never wrapped element-wise), which is exactly why one variadic generic covers every arity here where a heterogeneous ladder would be needed.

stream_from_iterable async

stream_from_iterable(
    values: Iterable[T],
) -> AsyncIterator[T]

Expose a fixed iterable as a Stream: the simplest source.

Turns already-in-hand values into the pull-based Stream the rest of without consumes, e.g. to emit a fixed reply or to feed a processor under test. stream_from_queue is the push-source counterpart.

stream_from_queue async

stream_from_queue(queue: Queue[T]) -> AsyncIterator[T]

Expose a queue as a Stream: the bridge from a push source to a pull stream.

A source that pushes (a server's accept loop, a callback-based client, a pub/sub subscriber) drops values into a queue; this turns that queue into the pull-based Stream the rest of without consumes. It ends gracefully when the queue is shut down (queue.shutdown()): remaining items still drain, then get raises QueueShutDown and the stream ends, letting a downstream fold return its final value. Shutting the queue down is thus the closable-stream signal; without it the stream never ends on its own and must be driven inside a background_task or otherwise cancelled by its consumer.