Conversation

sorenbs (Owner) commented Sep 12, 2025

Original version

100k Bun: 3,591 ms | 276,112 KB
1M Bun: 7,807 ms | 1,747,424 KB
10M Bun: 61,629 ms | 6,543,632 KB

Codex version

100k Bun: 156 ms | 109,056 KB
1M Bun: 750 ms | 191,152 KB
10M Bun: 14,687 ms | 174,416 KB

Complete dataflow program as implemented in memtest.ts:

import { D2 } from "../../src/d2.js"
import { MultiSet } from "../multiset.js"
import { map, reduce } from "../operators/index.js"

const graph = new D2()

const reviews = graph.newInput<{
  id: number
  listingId: number
  score: number
  text: string
}>()

// Group reviews by listingId and sum score
reviews.pipe(
  map((x) => [x.listingId, x.score]),
  reduce((values) => {
    // `values` is an array of [value, multiplicity] pairs for a specific key
    let sum = 0
    for (const [value, multiplicity] of values) {
      sum += value * multiplicity
    }
    return [[sum, 1]]
  })
)
graph.finalize()

// Get iteration count from command line argument, default to 1000
const ITER_COUNT = process.argv[2] ? parseInt(process.argv[2], 10) : 1000
console.log(`ITER_COUNT: ${ITER_COUNT}`)

const t0 = Date.now()

for (let i = 1; i < ITER_COUNT; i++) {
  reviews.sendData(
    new MultiSet([[{ id: 3 + i, listingId: 1, score: 1, text: `tada` }, 1]])
  )
  //   reviews.sendFrontier(i+1)

  graph.run()
}

const t1 = Date.now()

console.log(`Time taken: ${t1 - t0} ms`)

Findings

src/valueIndex.ts:28–40 — fast path for primitive equality (number/string) avoids hashing when possible.
Each operator factory added a fresh “public” reader via graph.addStream(output.connectReader()). These readers are not consumed anywhere in the graph engine (D2.run() only checks operator inputs).
As a result, every message at every stage was duplicated into at least one extra queue that was never drained, leading to unbounded growth.
In src/d2.ts, newInput() also added such a reader, further duplicating messages at the graph boundary.
This is an architectural memory leak: dangling readers retained all messages indefinitely.
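
The retention pattern is easiest to see in isolation. The following is a minimal sketch of the failure mode, not the library's actual source; Reader, Writer, and their members are illustrative stand-ins for the real classes in src/graph.ts:

class Reader<T> {
  queue: T[] = []
  drain(): T[] {
    // Hand back everything buffered so far and clear the queue.
    const out = this.queue
    this.queue = []
    return out
  }
}

class Writer<T> {
  readers: Reader<T>[] = []
  connectReader(): Reader<T> {
    const r = new Reader<T>()
    this.readers.push(r)
    return r
  }
  send(msg: T): void {
    // Every connected reader gets its own copy of the message.
    for (const r of this.readers) r.queue.push(msg)
  }
}

const writer = new Writer<number>()
const consumer = writer.connectReader() // drained by the next operator each step
const dangling = writer.connectReader() // the extra "public" reader: never drained

for (let i = 0; i < 1_000_000; i++) {
  writer.send(i)
  consumer.drain() // stays empty
}
// dangling.queue now holds 1,000,000 entries that nothing will ever release.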

Changes Implemented

Stop creating unconsumed “public” readers:
src/d2.ts:28–34 — newInput no longer creates and stores a reader.
src/d2.ts:41–45 — addStream kept as a no-op for API compatibility (prevents accidental accumulation).
Remove graph.addStream(output.connectReader()) across all operators so they don’t create dangling readers (a schematic sketch of the pattern follows this list):
src/operators/map.ts:44
src/operators/reduce.ts:120
src/operators/filter.ts:… (operator creation site)
src/operators/concat.ts:… (operator creation site)
src/operators/output.ts:… (operator creation site)
src/operators/debug.ts:… (operator creation site)
src/operators/tap.ts:… (operator creation site)
src/operators/join.ts:… (operator creation site)
src/operators/count.ts:… (operator creation site)
src/operators/distinct.ts:… (operator creation site)
src/operators/consolidate.ts:… (operator creation site)
src/operators/topKWithFractionalIndex.ts:… (operator creation site)
src/operators/topKWithFractionalIndexBTree.ts:… (operator creation site)
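
Schematically, every operator factory changed in the same one-line way. The sketch below is illustrative only; GraphSketch, StreamSketch, and wireOperatorOutput are stand-ins rather than the actual map.ts source:

interface ReaderSketch<T> {
  drain(): T[]
}

interface StreamSketch<T> {
  connectReader(): ReaderSketch<T>
}

interface GraphSketch {
  addStream<T>(reader: ReaderSketch<T>): void
}

function wireOperatorOutput<T>(graph: GraphSketch, output: StreamSketch<T>): StreamSketch<T> {
  // Before the fix every factory ended with the line below, creating a reader
  // that no downstream consumer ever drained:
  //
  //   graph.addStream(output.connectReader())
  //
  // After the fix the line is simply gone. Downstream operators still call
  // output.connectReader() themselves when they subscribe, so the pipeline
  // wiring is unchanged.
  return output
}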

Validation

Memory after fix (gtime):
1k: Max memory: 54,112 KB
100k: Max memory: 108,112 KB
1M: Max memory: 176,160 KB

Compared to the pre-fix 1M run at roughly 1.7 GB (1,747,424 KB above), this is a dramatic improvement. The remaining increase is modest and consistent with runtime/GC behavior and transient allocations (e.g., output multisets per step), not retained state.

Why This Works

Operators still create one reader per actual consumer (the next operator in the pipeline). Those are drained synchronously in each step().
No extra dangling readers are created; hence no permanent queues retain messages.
If a stream has no consumer (like the final reduce in the memtest), its writer has zero readers, so sendData does nothing — no queue, no retention.

Please explain to me:

Why did this library implement these “public” readers? Are they needed for any part of the public API? Can we safely remove them without impacting the public API and functioning of the library?

Here’s the straight answer.

Why they existed

Likely intent: Materialize every operator’s output “somewhere” by default — either as a vestige from the original D2TS design (where graphs tracked streams/frontiers) or as a convenience to make outputs observable later.
In this simplified, version-free engine, that mechanism was never actually used. The graph scheduler does not read those queues, nor does the public API expose them.

Are they needed for the public API?

No. Public consumption is already explicit:
IStreamBuilder.connectReader() lets you attach readers where you need results. See implementation in src/d2.ts:81.
Side-effect sinks (e.g. debug, output) observe data via side effects without needing a reader.
The graph’s private storage of readers is not exposed and is not used by scheduling:
D2.run() drives work by operators only; it doesn’t look at readers (a sketch of that loop appears after this list). See src/d2.ts:61 and src/d2.ts:65.
Readers themselves only buffer for the parties that explicitly connect. See src/graph.ts:46 (writer creates a queue per reader) and src/graph.ts:19 (reader drains and clears).
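
To make the scheduling point concrete, here is a rough sketch of the loop described above. It is an assumption about the shape of D2.run(), not the actual d2.ts source; OperatorSketch and its methods are hypothetical names:

interface OperatorSketch {
  hasPendingWork(): boolean // any undrained input from upstream?
  run(): void               // drain inputs, compute, write to the output writer
}

function runGraphSketch(operators: OperatorSketch[]): void {
  // Keep stepping until no operator has pending input.
  let progressed = true
  while (progressed) {
    progressed = false
    for (const op of operators) {
      if (op.hasPendingWork()) {
        op.run()
        progressed = true
      }
    }
  }
  // Graph-level readers are never consulted here, which is why a reader that
  // nothing drains keeps its queue forever.
}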

Can we remove them safely?

Yes. We removed the automatic “public” readers and everything still works:
No change to surface API or types.
All tests still pass (247/247).
Memory usage improves dramatically because we no longer create undrained queues holding every message.

Behavioral semantics after removal

Streams without a consumer: If you don’t connect a reader (or end in a sink), outputs are dropped — which matches normal dataflow semantics. Previously, they were buffered in invisible, undrained queues (the source of the memory growth).
How to consume results explicitly:
Example: const reader = output.connectReader(); graph.run(); const messages = reader.drain(); (expanded against the memtest pipeline below).
This is consistent with the library’s public contract — users already needed to attach readers for data they wanted to materialize.
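
Spelled out against the memtest program above, explicit consumption might look like the following. This assumes, as the IStreamBuilder.connectReader() reference above suggests, that the value returned by pipe(...) exposes connectReader(); treat it as a sketch rather than verified usage:

import { D2 } from "../../src/d2.js"
import { MultiSet } from "../multiset.js"
import { map, reduce } from "../operators/index.js"

const graph = new D2()
const reviews = graph.newInput<{ id: number; listingId: number; score: number; text: string }>()

// Same pipeline as memtest.ts, but keep a handle on the final stream.
const totals = reviews.pipe(
  map((x) => [x.listingId, x.score]),
  reduce((values) => {
    let sum = 0
    for (const [value, multiplicity] of values) {
      sum += value * multiplicity
    }
    return [[sum, 1]]
  })
)

const reader = totals.connectReader() // explicit consumer: this queue WILL be drained
graph.finalize()

reviews.sendData(new MultiSet([[{ id: 1, listingId: 1, score: 5, text: `ok` }, 1]]))
graph.run()

const messages = reader.drain() // consume (and clear) everything produced this step
console.log(messages)
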
Key references

src/graph.ts:36 — Writer enqueues to each reader’s queue.
src/graph.ts:19 — Reader drain() clears its queue.
src/d2.ts:61 and src/d2.ts:65 — Scheduler only asks operators if they have pending input and runs them; it never reads “graph-level” readers.
src/d2.ts:81 — StreamBuilder.connectReader() is the supported way to consume a stream.
Bottom line

The “public” readers were not part of the usable public API and weren’t used by the engine. They created undrained queues that retained all messages.
Removing them is safe and correct. If you want materialized outputs, attach a reader explicitly or use a sink operator. If you want a convenience “materialize()” operator that returns a handle, we can add one as a follow-up.
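
For reference, a convenience materialize() helper of that kind could be as small as the sketch below. It does not exist in the library today; the name and shape are purely hypothetical:

// Hypothetical follow-up helper; not part of the current API.
function materialize<T>(stream: { connectReader(): { drain(): T[] } }) {
  const reader = stream.connectReader()
  const buffer: T[] = []
  return {
    // Pull anything newly produced into a caller-owned buffer and return it.
    read(): T[] {
      buffer.push(...reader.drain())
      return buffer
    },
  }
}

A caller would then write const handle = materialize(totals); graph.run(); handle.read(); instead of managing the reader directly.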