
Conversation

@partic2
Contributor

@partic2 partic2 commented May 20, 2025

No description provided.

@guest271314
Contributor

Does this address Fetch #1254? Node.js, Deno, Bun, and Workerd all support full-duplex streaming when `duplex: "half"` is set.

@saghul
Owner

saghul commented Sep 9, 2025

> Does this address Fetch #1254?

I can't seem to find that issue, can you link it?

@guest271314
Contributor

> > Does this address Fetch #1254?
>
> I can't seem to find that issue, can you link it?

whatwg/fetch#1254.

I brought this up previously over here

> @saghul deno and node implement full-duplex streaming and upload streaming. I know what the implementation should look like. Might not be today or tomorrow when we get this working. We should strive for excellence, no matter how long it takes to get there.

Here's the issue I filed for Bun: oven-sh/bun#7206.

The test, `full_duplex_fetch_test.js`:

```js
var wait = async (ms) => new Promise((r) => setTimeout(r, ms));
var encoder = new TextEncoder();
var decoder = new TextDecoder();
var { writable, readable } = new TransformStream();
var abortable = new AbortController();
var { signal } = abortable;
var writer = writable.getWriter();
var settings = { url: "https://comfortable-deer-52.deno.dev", method: "query" };
fetch(settings.url, {
  duplex: "half",
  method: settings.method,
  body: readable.pipeThrough(
    new TransformStream({
      transform(value, c) {
        c.enqueue(encoder.encode(value));
      },
    }),
  ),
  signal,
})
  .then((r) =>
    r.body.pipeTo(
      new WritableStream({
        async start() {
          this.now = performance.now();
          console.log(this.now);
          return;
        },
        async write(value) {
          console.log(decoder.decode(value));
        },
        async close() {
          console.log("Stream closed");
        },
        async abort(reason) {
          const now = ((performance.now() - this.now) / 1000) / 60;
          console.log({ reason });
        },
      }),
    )
  ).catch(async (e) => {
    console.log(e);
  });
await wait(1000);
await writer.write("test");
await wait(1500);
await writer.write("test, again");
await writer.close();
```

Here's the server code on Deno Deploy https://dash.deno.com/playground/comfortable-deer-52.
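The playground's server code isn't reproduced here. As a sketch only, here is a hypothetical Node.js stand-in (assuming, from the TEST / TEST, AGAIN output below, that the server simply uppercases each uploaded chunk and streams it back) that lets you exercise the same client flow against localhost:

```javascript
import { createServer } from "node:http";

// Hypothetical stand-in for the Deno Deploy server (assumption: it
// uppercases each uploaded chunk and streams it straight back).
const server = createServer((req, res) => {
  res.writeHead(200, { "content-type": "text/plain" });
  req.on("data", (chunk) => res.write(chunk.toString().toUpperCase()));
  req.on("end", () => res.end());
});

await new Promise((resolve) => server.listen(0, resolve));
const url = `http://127.0.0.1:${server.address().port}`;

// Client side: stream the request body while reading the response.
const { readable, writable } = new TransformStream();
const writer = writable.getWriter();
const encoder = new TextEncoder();

const echoedPromise = fetch(url, {
  method: "POST",
  duplex: "half", // required for streamed request bodies
  body: readable.pipeThrough(
    new TransformStream({
      transform(value, controller) {
        controller.enqueue(encoder.encode(value));
      },
    }),
  ),
}).then(async (r) => {
  let text = "";
  for await (const chunk of r.body.pipeThrough(new TextDecoderStream())) {
    text += chunk;
  }
  return text;
});

await writer.write("test");
await writer.write(", again");
await writer.close();

const echoed = await echoedPromise;
console.log(echoed); // uppercased echo of everything written
server.close();
server.closeAllConnections?.();
```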

Note: technically it's possible to do upload streaming and full-duplex streaming over HTTP/1.1 (or other means) using WHATWG fetch(). It doesn't necessarily have to be HTTP/2, except for browsers (mainly Chromium, because last time I checked Firefox Nightly doesn't implement upload streaming). See "Allow streaming requests for HTTP/1.x" at https://issues.chromium.org/issues/434292497#comment4 and https://issues.chromium.org/issues/434292497#comment10, et al.
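For illustration, the HTTP/1.1 framing such a streamed body uses is simple. A minimal sketch of the Transfer-Encoding: chunked wire format (the names here are my own, and `String.length` stands in for the byte length, which only holds for ASCII data):

```javascript
// Sketch of HTTP/1.1 chunked framing: each chunk is
// "<length in hex>\r\n<data>\r\n", and "0\r\n\r\n" ends the body.
// Assumes ASCII data so String length equals byte length.
function encodeChunk(data) {
  return data.length.toString(16) + "\r\n" + data + "\r\n";
}
const CHUNKED_TERMINATOR = "0\r\n\r\n";

// Two writes to the request stream become two chunks plus the terminator:
const wire = encodeChunk("test") + encodeChunk("test, again") + CHUNKED_TERMINATOR;
console.log(JSON.stringify(wire));
// → "4\r\ntest\r\nb\r\ntest, again\r\n0\r\n\r\n"
```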

Results, using node nightly, deno canary, bun canary, and tjs built a couple of days ago:

```
user@debian:~/bin$ node full_duplex_fetch_test.js
141976.214995
TEST
TEST, AGAIN
Stream closed
user@debian:~/bin$ deno -A full_duplex_fetch_test.js
A new canary release of Deno is available. Run `deno upgrade canary` to install it.
39487.778984
TESTTEST, AGAIN
Stream closed
user@debian:~/bin$ bun full_duplex_fetch_test.js
357.140365
TEST
TEST, AGAIN
Stream closed
user@debian:~/bin$ tjs run full_duplex_fetch_test.js
Error: Unsupported payload type
    at send (polyfills.js:10:8288)
    at <anonymous> (polyfills.js:10:15684)
    at Promise (native)
    at fetch (polyfills.js:10:14397)
    at <anonymous> (full_duplex_fetch_test.js:13:9)
    at evalFile (native)
    at <anonymous> (run-main.js:27:1459)
```

@guest271314
Contributor

@saghul Re

> (except for browsers, mainly Chromium

If you read the minutiae you'll find that Chromium-based browsers implement full-duplex streaming over fetch() between a ServiceWorker and a Client/WindowClient, using Mojo internally, without hitting the network.

You'll also see the feature you can enable with --enable-features=FetchUploadStreaming, which allows HTTP/1.1 upload streaming over WHATWG Streams; the requests wind up as Transfer-Encoding: chunked requests:

```js
var abortable = new AbortController();

var { readable, writable } = new TransformStream({
  async transform(v, c) {
    for (let i = 0; i < v.length; i += 8192) {
      c.enqueue(v.subarray(i, i + 8192));
      await scheduler.postTask(() => {}, { delay: 30 });
    }
  },
  flush() {
    console.log("flush");
    // abortable.abort("Stream over");
  },
});
var writer = writable.getWriter();
var response = fetch("http://127.0.0.1:44818", {
  method: "post",
  duplex: "half",
  body: readable,
  signal: abortable.signal,
  allowHTTP1ForStreamingUpload: true,
}).then((r) => {
  console.log(r);
  return r.body.pipeThrough(new TextDecoderStream()).pipeTo(
    new WritableStream({
      write(v) {
        console.log(v);
        //abortable.abort("a reason");
      },
      close() {
        console.log("close");
      },
      abort(reason) {
        console.log(reason);
      },
    }),
  );
})
  .catch((e) => {
    console.log(e);
  })
  .then(() => {
    console.log("Done streaming");
  })
  .catch(console.log);
await scheduler.postTask(() => {}, { delay: 45 });
await writer.write(new Uint8Array(1024**2*7).fill(1));
await writer.ready;
await writer.close();
```

Here's a JavaScript runtime agnostic Transfer-Encoding: chunked parser and usage https://github.com/guest271314/direct-sockets-http-ws-server/blob/main/assets/get-chunked-data.js, https://github.com/guest271314/direct-sockets-http-ws-server/blob/main/assets/script.js#L100-L177.
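As a companion sketch (not the linked parser; this one assumes the whole body is already in memory, whereas the linked get-chunked-data.js works incrementally on a stream), decoding that chunked framing looks like:

```javascript
// Minimal Transfer-Encoding: chunked parser sketch.
// Assumption: the full body is available as one string; chunk extensions
// and trailers are not handled.
function parseChunked(body) {
  let result = "";
  let i = 0;
  while (i < body.length) {
    const lineEnd = body.indexOf("\r\n", i);
    const size = parseInt(body.slice(i, lineEnd), 16); // chunk size in hex
    if (size === 0) break; // "0\r\n\r\n" terminates the body
    const start = lineEnd + 2;
    result += body.slice(start, start + size);
    i = start + size + 2; // skip the CRLF that trails the chunk data
  }
  return result;
}

console.log(parseChunked("4\r\ntest\r\nb\r\ntest, again\r\n0\r\n\r\n"));
// → "testtest, again"
```

Note that the reassembled body has no delimiter between chunks, which is why Deno's run above printed the coalesced "TESTTEST, AGAIN".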

@guest271314
Contributor

Here's what I mean by "upload streaming" https://developer.chrome.com/docs/capabilities/web-apis/fetch-streaming-requests.
