forked from TanStack/db
Codex mem optimisation #1
Open

sorenbs wants to merge 3 commits into main from codex-mem-optimisation
Conversation
`src/valueIndex.ts:28–40`: a fast path for primitive equality (number/string) avoids hashing when possible.
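The idea behind that fast path can be illustrated with a short sketch. Everything below is an assumption: the actual `src/valueIndex.ts` code is not shown in this PR excerpt, and `ValueIndexSketch` and `hashKey` are hypothetical names.

```ts
// Hedged sketch of the primitive fast-path idea; names and shapes are
// assumptions, not the actual src/valueIndex.ts implementation.
type IndexKey = number | string | object

class ValueIndexSketch<V> {
  // Primitives compare by value, so a plain Map lookup needs no hashing.
  private primitives = new Map<number | string, V>()
  // Composite keys fall back to a hash-keyed map (hashKey is hypothetical).
  private hashed = new Map<string, V>()

  constructor(private hashKey: (key: object) => string) {}

  set(key: IndexKey, value: V): void {
    if (typeof key === "number" || typeof key === "string") {
      this.primitives.set(key, value) // fast path: no hashing step
    } else {
      this.hashed.set(this.hashKey(key), value)
    }
  }

  get(key: IndexKey): V | undefined {
    if (typeof key === "number" || typeof key === "string") {
      return this.primitives.get(key)
    }
    return this.hashed.get(this.hashKey(key))
  }
}
```

The point is simply that `number` and `string` keys compare by value, so a plain `Map` lookup is correct without any hashing step.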
### Root cause

Each operator factory added a fresh "public" reader via `graph.addStream(output.connectReader())`. These readers are not consumed anywhere in the graph engine (`D2.run()` only checks operator inputs), so every message at every stage was duplicated into at least one extra queue that was never drained, leading to unbounded growth. In `src/d2.ts`, `newInput()` also added such a reader, further duplicating messages at the graph boundary. This is an architectural memory leak: dangling readers retained all messages indefinitely.

### Changes implemented

Stop creating unconsumed "public" readers:

- `src/d2.ts:28–34`: `newInput` no longer creates and stores a reader.
- `src/d2.ts:41–45`: `addStream` is kept as a no-op for API compatibility (prevents accidental accumulation).

Remove `graph.addStream(output.connectReader())` from every operator so none of them creates a dangling reader:

- `src/operators/map.ts:44`
- `src/operators/reduce.ts:120`
- `src/operators/filter.ts:…` (operator creation site)
- `src/operators/concat.ts:…` (operator creation site)
- `src/operators/output.ts:…` (operator creation site)
- `src/operators/debug.ts:…` (operator creation site)
- `src/operators/tap.ts:…` (operator creation site)
- `src/operators/join.ts:…` (operator creation site)
- `src/operators/count.ts:…` (operator creation site)
- `src/operators/distinct.ts:…` (operator creation site)
- `src/operators/consolidate.ts:…` (operator creation site)
- `src/operators/topKWithFractionalIndex.ts:…` (operator creation site)
- `src/operators/topKWithFractionalIndexBTree.ts:…` (operator creation site)

### Validation

Memory after the fix (measured with `gtime`):

| Input size | Max memory |
| ---------- | ---------- |
| 1k         | 54,112 KB  |
| 100k       | 108,112 KB |
| 1M         | 176,160 KB |

Compared to the pre-fix 1M figure of ~1.26 GB, this is a dramatic improvement. The remaining growth is modest and consistent with runtime/GC behavior and transient allocations (e.g., output multisets per step), not with retained state.

### Why this works

- Operators still create one reader per actual consumer (the next operator in the pipeline), and those are drained synchronously in each `step()`.
- No extra dangling readers are created, so no permanent queues retain messages.
- If a stream has no consumer (like the final `reduce` in the memtest), its writer has zero readers, so `sendData` does nothing: no queue, no retention.

### Discussion

> Please explain to me: Why did this library implement these "public" readers? Are they needed for any part of the public API? Can we safely remove them without impacting the public API and the functioning of the library?

Here is the straight answer.

**Why they existed.** The likely intent was to materialize every operator's output "somewhere" by default, either as a vestige of the original D2TS design (where graphs tracked streams and frontiers) or as a convenience to make outputs observable later. In this simplified, version-free engine, that mechanism was never actually used: the graph scheduler does not read those queues, and the public API does not expose them.

**Are they needed for the public API?** No. Public consumption is already explicit:

- `IStreamBuilder.connectReader()` lets you attach readers where you need results (implementation in `src/d2.ts:81`).
- Side-effect sinks (e.g. `debug`, `output`) observe the stream without needing a reader.
- The graph's private storage of readers is not exposed and is not used by scheduling: `D2.run()` drives work by operators only and never looks at readers (`src/d2.ts:61` and `src/d2.ts:65`).
- Readers only buffer for the parties that explicitly connect: the writer creates one queue per reader (`src/graph.ts:46`), and each reader drains and clears its own queue (`src/graph.ts:19`); see the sketch below.
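To make the retention mechanics concrete, here is a minimal sketch of the writer/reader queueing described above. The real classes live in `src/graph.ts`; the names and bodies below are assumptions, not the library's actual code.

```ts
// Hedged sketch of the writer/reader mechanism; exact shapes are assumed.
class ReaderSketch<T> {
  constructor(private queue: T[]) {}

  // drain() returns everything buffered so far and clears the queue, so a
  // reader that is actually consumed never retains messages.
  drain(): T[] {
    const out = this.queue.slice()
    this.queue.length = 0
    return out
  }
}

class WriterSketch<T> {
  private queues: T[][] = []

  // One queue per connected reader.
  connectReader(): ReaderSketch<T> {
    const queue: T[] = []
    this.queues.push(queue)
    return new ReaderSketch(queue)
  }

  // With zero readers this loop does nothing: no queue, no retention.
  // With a dangling reader that nobody drains, every message pushed here
  // accumulates forever, which is exactly the leak this PR removes.
  sendData(message: T): void {
    for (const queue of this.queues) queue.push(message)
  }
}
```

This is why the fix is purely about who connects readers: the writer itself holds no state beyond the per-reader queues, so removing the automatic, never-drained reader removes the only place messages could pile up.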
**Can we remove them safely?** Yes. We removed the automatic "public" readers and everything still works:

- No change to the surface API or types.
- All tests still pass (247/247).
- Memory usage improves dramatically because we no longer create undrained queues that hold every message.

### Behavioral semantics after removal

Streams without a consumer: if you don't connect a reader (or end the pipeline in a sink), outputs are dropped, which matches normal dataflow semantics. Previously they were buffered in invisible, undrained queues, the source of the memory growth.

To consume results explicitly:

```ts
const reader = output.connectReader()
graph.run()
const messages = reader.drain()
```

This is consistent with the library's public contract: users already needed to attach readers for data they wanted to materialize.

### Key references

- `src/graph.ts:36`: the writer enqueues each message into every connected reader's queue.
- `src/graph.ts:19`: the reader's `drain()` clears its queue.
- `src/d2.ts:61` and `src/d2.ts:65`: the scheduler only asks operators whether they have pending input and runs them; it never reads "graph-level" readers.
- `src/d2.ts:81`: `StreamBuilder.connectReader()` is the supported way to consume a stream.

### Bottom line

The "public" readers were not part of the usable public API and were not used by the engine; they only created undrained queues that retained all messages. Removing them is safe and correct. If you want materialized outputs, attach a reader explicitly or use a sink operator. If you want a convenience `materialize()` operator that returns a handle, we can add one as a follow-up; a possible shape is sketched below.
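For the proposed follow-up, one possible shape is sketched here. This is hypothetical only: `materialize()` does not exist in the library, and the minimal interfaces stand in for the real types in `src/graph.ts` and `src/d2.ts` so the sketch is self-contained.

```ts
// Hypothetical follow-up sketch: materialize() is NOT part of the library.
// Minimal stand-in types so the example compiles on its own.
interface IReader<T> {
  drain(): T[]
}
interface IStream<T> {
  connectReader(): IReader<T>
}

// Returns a handle that drains the stream on demand, built entirely on the
// existing public contract (an explicitly connected reader), so it adds
// convenience without reintroducing undrained queues.
function materialize<T>(stream: IStream<T>): { read(): T[] } {
  const reader = stream.connectReader()
  return {
    read(): T[] {
      return reader.drain()
    },
  }
}

// Usage (assuming an `output` stream and a `graph`):
//   const handle = materialize(output)
//   graph.run()
//   const messages = handle.read()
```

Because the handle drains on every `read()`, nothing accumulates between reads; a caller who never calls `read()` would still buffer, which is the same trade-off as any explicitly connected reader.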
kmk142789 approved these changes on Sep 22, 2025.
**Original version**

| Input size | Time (Bun) | Max memory   |
| ---------- | ---------- | ------------ |
| 100k       | 3,591 ms   | 276,112 KB   |
| 1M         | 7,807 ms   | 1,747,424 KB |
| 10M        | 61,629 ms  | 6,543,632 KB |

**Codex version**

| Input size | Time (Bun) | Max memory |
| ---------- | ---------- | ---------- |
| 100k       | 156 ms     | 109,056 KB |
| 1M         | 750 ms     | 191,152 KB |
| 10M        | 14,687 ms  | 174,416 KB |
Complete dataflow program as implemented in `memtest.ts`: