Streaming Manager
The streaming manager is the shared policy engine for out-of-core execution. It decides when to stream, how to tile, how deep to queue, and records what happened so tests and users can see the plan. Matmul, invert, eigvalsh, eigh, and eigvals_arnoldi register descriptors that plug in access patterns, guards, and resource budgets.
What the manager owns
- Routing: picks
route(streamingordirect) andreason, combining IO observability thresholds with per-op guards. - Planning: fills plan fields (
tile_shape,queue_depth,plan.access_pattern,trace_tag,events,storagesummary). - Execution glue: best-effort prefetch/discard for streaming routes and
impl=...annotations for the chosen implementation. - Registry: per-op
StreamingDescriptorentries that provide access patterns and policy hooks.
Lifecycle at a glance
- plan(op, operands, allow_huge=False): snapshots operands, chooses route, runs the descriptor guard, and computes tiles/queue depth.
- prefetch(plan, operands): only when
route == "streaming"; defaults to calling accelerators on backing files. - compute: the op executes (native, Python fallback, or BlockMatrix orchestration). The manager can annotate the implementation (
impl=...). - discard(plan, operands, result): only when streaming; best-effort discard on backing ranges.
- inspect:
pc.last_io_trace(...)returns the plan and event timeline for debugging and tests.
Plan schema (recorded via IO observability)
route/reason: routing choice and justification.tile_shape:(rows, cols)when streaming;Nonewhen direct. Tiles clamp to operand shapes.queue_depth: bounded to[1, 8]when streaming;0when direct.plan.access_pattern: descriptor-supplied tag (e.g.,blocked_rowcol).trace_tag: monotonic tag per op (op:N).events: list of{type, detail, reason?}; includes plan/prefetch/discard/compute annotations.storage: backing files, temporary flags, and storage roots gathered from operand snapshots.
Routing rules
- File-backed operands: force
streamingwith reasonfile-backed operand. - allow_huge=True: force
directwith reasonallow_huge bypassed threshold. - Threshold set: any estimated operand bytes over threshold →
streamingwithestimated bytes exceed threshold. - Threshold None:
directwithno threshold configuredunless a guard overrides. - Guards: per-op hooks can override route/reason. Example: non-square invert/eig* →
directwithnon_square; matmul mismatched shapes →directwithshape_mismatch.
Descriptor catalog (current)
| op | access_pattern | guard | tile budget | queue depth | notes |
|---|---|---|---|---|---|
| matmul | blocked_rowcol | shape mismatch → direct | budgeted square tiles clamped to shapes | 3 when streaming | Python tiling fallback annotates impl=streaming_python |
| invert | invert_dense | non-square → direct | default square tile | 1 when streaming | Fallback annotates impl=streaming_python |
| eigvalsh | symmetric_eigvals | non-square → direct | default square tile | 1 when streaming | Fallback annotates impl=streaming_python |
| eigh | symmetric_eigh | non-square → direct | default square tile | 1 when streaming | Fallback annotates impl=streaming_python |
| eigvals_arnoldi | arnoldi_topk | non-square → direct | default square tile | 1 when streaming | Fallback annotates impl=streaming_python |
Hooks and defaults
tile_budget_fn(threshold_bytes, snapshots): derives tiles from the memory threshold and itemsize; matmul halves budget across A/B and clamps to shapes; square ops reuse the default derivation.queue_depth_fn(route, snapshots): returns depth before coercion; manager caps to[1, 8]and zeroes when direct.guard(operands, snapshots, allow_huge): may override route/reason early; does not materialize data, uses snapshots.prefetch/discard: optional per-op hooks; defaults call accelerator prefetch/discard on backing files.annotate_impl(record, label): attachesimpl=labeland records a compute event.
Safety guarantees
- No streaming queue depth above 8; non-streaming queues are 0.
- Tile shapes always finite and clamped to operand extents; failures fall back to conservative defaults.
- Guards run before tiling so invalid shapes revert to direct routes instead of crashing in streaming codepaths.
- IO observability storage summaries remain intact for spill/backing-file diagnostics.
Debugging and tests
pc.last_io_trace()shows the latest plan;pc.last_io_trace("matmul")fetches by op.- Event timeline should contain
planplusio(prefetch/discard) andcompute(impl) entries for streaming routes. - Threshold-driven scenarios: set
pc.set_io_streaming_threshold(bytes)to force streaming in tests; set toNoneto validate the direct path. - File-backed fakes should force streaming regardless of threshold, exercising the guardrail for spill-backed inputs.
Extending
- Add a new op by registering a
StreamingDescriptorinpycauset.__init__with an access pattern, guard, and budget/queue hooks. - Keep guards deterministic and non-materializing (only consult snapshots and metadata).
- Prefer small, conservative tile/queue defaults; tighten once end-to-end tests validate throughput.