diff --git a/src/js/core/sockets.js b/src/js/core/sockets.js index 3e2d1d27..b161f15b 100644 --- a/src/js/core/sockets.js +++ b/src/js/core/sockets.js @@ -3,6 +3,7 @@ import { readableStreamForHandle, writableStreamForHandle } from './stream-utils const core = globalThis[Symbol.for('tjs.internal.core')]; +const kOnConnection = Symbol('kOnConnection'); export async function connect(transport, host, port, options = {}) { const addr = await resolveAddress(transport, host, port); @@ -67,18 +68,28 @@ export async function listen(transport, host, port, options = {}) { } handle.bind(addr, flags); - handle.listen(options.backlog); - return new Listener(handle); + const l=new Listener(handle); + + handle.listen(handle=>{ + l[kOnConnection](handle); + },options.backlog); + + return l; } case 'pipe': { const handle = new core.Pipe(); handle.bind(addr); - handle.listen(options.backlog); - return new Listener(handle); + const l=new Listener(handle); + + handle.listen(handle=>{ + l[kOnConnection](handle); + },options.backlog); + + return l; } case 'udp': { @@ -204,6 +215,16 @@ class Listener { this[kHandle] = handle; } + #handleQueue=[]; + #acceptQueue=[]; + [kOnConnection](handle) { + if (this.#acceptQueue.length>0) { + this.#acceptQueue.shift().resolve(handle); + } else { + this.#handleQueue.push(handle); + } + } + get localAddress() { if (!this[kLocalAddress]) { this[kLocalAddress] = this[kHandle].getsockname(); @@ -213,17 +234,36 @@ class Listener { } async accept() { - const handle = await this[kHandle].accept(); + let handle; + + if (this.#handleQueue.length>0) { + handle=this.#handleQueue.shift(); + } else { + handle=await new Promise((resolve,reject)=>{ + this.#acceptQueue.push({ resolve,reject }); + }); + } + if (typeof handle === 'undefined') { return; } + if (handle instanceof Error) { + throw handle; + } + return new Connection(handle); } close() { this[kHandle].close(); + + for (let v1 of this.#acceptQueue) { + v1.reject(new Error('closed')); + } + + this.#handleQueue=[]; } // Async iterator. diff --git a/src/mod_streams.c b/src/mod_streams.c index 87b1b452..494b773b 100644 --- a/src/mod_streams.c +++ b/src/mod_streams.c @@ -55,8 +55,8 @@ typedef struct { TJSPromise result; } read; struct { - TJSPromise result; - } accept; + JSValue on_connection; + } listen; } TJSStream; typedef struct { @@ -105,9 +105,9 @@ static JSValue tjs_stream_close(JSContext *ctx, JSValue this_val, int argc, JSVa TJS_SettlePromise(ctx, &s->read.result, 0, 1, &arg); TJS_ClearPromise(ctx, &s->read.result); } - if (TJS_IsPromisePending(ctx, &s->accept.result)) { - TJS_SettlePromise(ctx, &s->accept.result, 0, 1, &arg); - TJS_ClearPromise(ctx, &s->accept.result); + if (!JS_IsUndefined(s->listen.on_connection)) { + JS_FreeValue(ctx, s->listen.on_connection); + s->listen.on_connection = JS_UNDEFINED; } maybe_close(s); @@ -337,14 +337,8 @@ static void uv__stream_connect_cb(uv_connect_t *req, int status) { static void uv__stream_connection_cb(uv_stream_t *handle, int status) { TJSStream *s = handle->data; CHECK_NOT_NULL(s); - - if (!TJS_IsPromisePending(s->ctx, &s->accept.result)) { - // TODO - handle this. - return; - } JSContext *ctx = s->ctx; JSValue arg; - int is_reject = 0; if (status == 0) { TJSStream *t2; switch (handle->type) { @@ -364,15 +358,12 @@ static void uv__stream_connection_cb(uv_stream_t *handle, int status) { if (r != 0) { JS_FreeValue(ctx, arg); arg = tjs_new_error(ctx, r); - is_reject = 1; } } else { arg = tjs_new_error(ctx, status); - is_reject = 1; } - TJS_SettlePromise(ctx, &s->accept.result, is_reject, 1, &arg); - TJS_ClearPromise(ctx, &s->accept.result); + JS_Call(ctx, s->listen.on_connection, JS_UNDEFINED, 1, &arg); } static JSValue tjs_stream_listen(JSContext *ctx, JSValue this_val, int argc, JSValue *argv) { @@ -382,8 +373,13 @@ static JSValue tjs_stream_listen(JSContext *ctx, JSValue this_val, int argc, JSV return JS_EXCEPTION; } uint32_t backlog = 511; - if (!JS_IsUndefined(argv[0])) { - if (JS_ToUint32(ctx, &backlog, argv[0])) { + if (!JS_IsFunction(ctx, argv[0])) { + return JS_EXCEPTION; + } else { + s->listen.on_connection = JS_DupValue(ctx, argv[0]); + } + if (!JS_IsUndefined(argv[1])) { + if (JS_ToUint32(ctx, &backlog, argv[1])) { return JS_EXCEPTION; } } @@ -394,17 +390,6 @@ static JSValue tjs_stream_listen(JSContext *ctx, JSValue this_val, int argc, JSV return JS_UNDEFINED; } -static JSValue tjs_stream_accept(JSContext *ctx, JSValue this_val, int argc, JSValue *argv) { - JSClassID class_id; - TJSStream *s = JS_GetAnyOpaque(this_val, &class_id); - if (!s) { - return JS_EXCEPTION; - } - if (TJS_IsPromisePending(ctx, &s->accept.result)) { - return tjs_throw_errno(ctx, UV_EBUSY); - } - return TJS_InitPromise(ctx, &s->accept.result); -} static JSValue tjs_stream_set_blocking(JSContext *ctx, JSValue this_val, int argc, JSValue *argv) { JSClassID class_id; @@ -433,7 +418,7 @@ static JSValue tjs_init_stream(JSContext *ctx, JSValue obj, TJSStream *s) { s->read.b.len = 0; TJS_ClearPromise(ctx, &s->read.result); - TJS_ClearPromise(ctx, &s->accept.result); + s->listen.on_connection = JS_UNDEFINED; JS_SetOpaque(obj, s); return obj; @@ -441,7 +426,9 @@ static JSValue tjs_init_stream(JSContext *ctx, JSValue obj, TJSStream *s) { static void tjs_stream_finalizer(JSRuntime *rt, TJSStream *s) { if (s) { - TJS_FreePromiseRT(rt, &s->accept.result); + if (!JS_IsUndefined(s->listen.on_connection)) { + JS_FreeValueRT(rt, s->listen.on_connection); + } TJS_FreePromiseRT(rt, &s->read.result); JS_FreeValueRT(rt, s->read.b.tarray); s->finalized = 1; @@ -457,7 +444,6 @@ static void tjs_stream_mark(JSRuntime *rt, TJSStream *s, JS_MarkFunc *mark_func) if (s) { JS_MarkValue(rt, s->read.b.tarray, mark_func); TJS_MarkPromise(rt, &s->read.result, mark_func); - TJS_MarkPromise(rt, &s->accept.result, mark_func); } } @@ -901,7 +887,6 @@ static JSValue tjs_pipe_open(JSContext *ctx, JSValue this_val, int argc, JSValue /* clang-format off */ static const JSCFunctionListEntry tjs_stream_proto_funcs[] = { TJS_CFUNC_DEF("listen", 1, tjs_stream_listen), - TJS_CFUNC_DEF("accept", 0, tjs_stream_accept), TJS_CFUNC_DEF("shutdown", 0, tjs_stream_shutdown), TJS_CFUNC_DEF("setBlocking", 1, tjs_stream_set_blocking), TJS_CFUNC_DEF("close", 0, tjs_stream_close),