这是indexloc提供的服务,不要输入任何密码
Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 45 additions & 5 deletions src/js/core/sockets.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { readableStreamForHandle, writableStreamForHandle } from './stream-utils

const core = globalThis[Symbol.for('tjs.internal.core')];

const kOnConnection = Symbol('kOnConnection');

export async function connect(transport, host, port, options = {}) {
const addr = await resolveAddress(transport, host, port);
Expand Down Expand Up @@ -67,18 +68,28 @@ export async function listen(transport, host, port, options = {}) {
}

handle.bind(addr, flags);
handle.listen(options.backlog);

return new Listener(handle);
const l=new Listener(handle);

handle.listen(handle=>{
l[kOnConnection](handle);
},options.backlog);

return l;
}

case 'pipe': {
const handle = new core.Pipe();

handle.bind(addr);
handle.listen(options.backlog);

return new Listener(handle);
const l=new Listener(handle);

handle.listen(handle=>{
l[kOnConnection](handle);
},options.backlog);

return l;
}

case 'udp': {
Expand Down Expand Up @@ -204,6 +215,16 @@ class Listener {
this[kHandle] = handle;
}

// Items delivered through [kOnConnection] while no accept() call was waiting;
// accept() drains this queue first.
#handleQueue=[];
// Pending accept() calls, stored as { resolve, reject } pairs, settled in FIFO order
// as connections arrive.
#acceptQueue=[];
[kOnConnection](handle) {
if (this.#acceptQueue.length>0) {
this.#acceptQueue.shift().resolve(handle);
} else {
this.#handleQueue.push(handle);
}
}

get localAddress() {
if (!this[kLocalAddress]) {
this[kLocalAddress] = this[kHandle].getsockname();
Expand All @@ -213,17 +234,36 @@ class Listener {
}

async accept() {
const handle = await this[kHandle].accept();
let handle;

if (this.#handleQueue.length>0) {
handle=this.#handleQueue.shift();
} else {
handle=await new Promise((resolve,reject)=>{
this.#acceptQueue.push({ resolve,reject });
});
}


if (typeof handle === 'undefined') {
return;
}

if (handle instanceof Error) {
throw handle;
}

return new Connection(handle);
}

close() {
this[kHandle].close();

for (let v1 of this.#acceptQueue) {
v1.reject(new Error('closed'));
}

this.#handleQueue=[];
}

// Async iterator.
Expand Down
49 changes: 17 additions & 32 deletions src/mod_streams.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ typedef struct {
TJSPromise result;
} read;
struct {
TJSPromise result;
} accept;
JSValue on_connection;
} listen;
} TJSStream;

typedef struct {
Expand Down Expand Up @@ -105,9 +105,9 @@ static JSValue tjs_stream_close(JSContext *ctx, JSValue this_val, int argc, JSVa
TJS_SettlePromise(ctx, &s->read.result, 0, 1, &arg);
TJS_ClearPromise(ctx, &s->read.result);
}
if (TJS_IsPromisePending(ctx, &s->accept.result)) {
TJS_SettlePromise(ctx, &s->accept.result, 0, 1, &arg);
TJS_ClearPromise(ctx, &s->accept.result);
if (!JS_IsUndefined(s->listen.on_connection)) {
JS_FreeValue(ctx, s->listen.on_connection);
s->listen.on_connection = JS_UNDEFINED;
}

maybe_close(s);
Expand Down Expand Up @@ -337,14 +337,8 @@ static void uv__stream_connect_cb(uv_connect_t *req, int status) {
static void uv__stream_connection_cb(uv_stream_t *handle, int status) {
TJSStream *s = handle->data;
CHECK_NOT_NULL(s);

if (!TJS_IsPromisePending(s->ctx, &s->accept.result)) {
// TODO - handle this.
return;
}
JSContext *ctx = s->ctx;
JSValue arg;
int is_reject = 0;
if (status == 0) {
TJSStream *t2;
switch (handle->type) {
Expand All @@ -364,15 +358,12 @@ static void uv__stream_connection_cb(uv_stream_t *handle, int status) {
if (r != 0) {
JS_FreeValue(ctx, arg);
arg = tjs_new_error(ctx, r);
is_reject = 1;
}
} else {
arg = tjs_new_error(ctx, status);
is_reject = 1;
}

TJS_SettlePromise(ctx, &s->accept.result, is_reject, 1, &arg);
TJS_ClearPromise(ctx, &s->accept.result);
JS_Call(ctx, s->listen.on_connection, JS_UNDEFINED, 1, &arg);
}

static JSValue tjs_stream_listen(JSContext *ctx, JSValue this_val, int argc, JSValue *argv) {
Expand All @@ -382,8 +373,13 @@ static JSValue tjs_stream_listen(JSContext *ctx, JSValue this_val, int argc, JSV
return JS_EXCEPTION;
}
uint32_t backlog = 511;
if (!JS_IsUndefined(argv[0])) {
if (JS_ToUint32(ctx, &backlog, argv[0])) {
if (!JS_IsFunction(ctx, argv[0])) {
return JS_EXCEPTION;
} else {
s->listen.on_connection = JS_DupValue(ctx, argv[0]);
}
if (!JS_IsUndefined(argv[1])) {
if (JS_ToUint32(ctx, &backlog, argv[1])) {
return JS_EXCEPTION;
}
}
Expand All @@ -394,17 +390,6 @@ static JSValue tjs_stream_listen(JSContext *ctx, JSValue this_val, int argc, JSV
return JS_UNDEFINED;
}

/*
 * Starts an accept operation on the stream behind `this_val`.
 *
 * Returns JS_EXCEPTION when no stream is attached to `this_val`, throws UV_EBUSY when
 * an accept promise is already pending, and otherwise initialises and returns the
 * stream's accept promise.
 */
static JSValue tjs_stream_accept(JSContext *ctx, JSValue this_val, int argc, JSValue *argv) {
    JSClassID stream_class_id;
    TJSStream *stream = JS_GetAnyOpaque(this_val, &stream_class_id);

    if (stream == NULL) {
        return JS_EXCEPTION;
    }

    /* Only one accept may be outstanding at a time. */
    if (!TJS_IsPromisePending(ctx, &stream->accept.result)) {
        return TJS_InitPromise(ctx, &stream->accept.result);
    }

    return tjs_throw_errno(ctx, UV_EBUSY);
}

static JSValue tjs_stream_set_blocking(JSContext *ctx, JSValue this_val, int argc, JSValue *argv) {
JSClassID class_id;
Expand Down Expand Up @@ -433,15 +418,17 @@ static JSValue tjs_init_stream(JSContext *ctx, JSValue obj, TJSStream *s) {
s->read.b.len = 0;

TJS_ClearPromise(ctx, &s->read.result);
TJS_ClearPromise(ctx, &s->accept.result);
s->listen.on_connection = JS_UNDEFINED;

JS_SetOpaque(obj, s);
return obj;
}

static void tjs_stream_finalizer(JSRuntime *rt, TJSStream *s) {
if (s) {
TJS_FreePromiseRT(rt, &s->accept.result);
if (!JS_IsUndefined(s->listen.on_connection)) {
JS_FreeValueRT(rt, s->listen.on_connection);
}
TJS_FreePromiseRT(rt, &s->read.result);
JS_FreeValueRT(rt, s->read.b.tarray);
s->finalized = 1;
Expand All @@ -457,7 +444,6 @@ static void tjs_stream_mark(JSRuntime *rt, TJSStream *s, JS_MarkFunc *mark_func)
if (s) {
JS_MarkValue(rt, s->read.b.tarray, mark_func);
TJS_MarkPromise(rt, &s->read.result, mark_func);
TJS_MarkPromise(rt, &s->accept.result, mark_func);
}
}

Expand Down Expand Up @@ -901,7 +887,6 @@ static JSValue tjs_pipe_open(JSContext *ctx, JSValue this_val, int argc, JSValue
/* clang-format off */
static const JSCFunctionListEntry tjs_stream_proto_funcs[] = {
TJS_CFUNC_DEF("listen", 1, tjs_stream_listen),
TJS_CFUNC_DEF("accept", 0, tjs_stream_accept),
TJS_CFUNC_DEF("shutdown", 0, tjs_stream_shutdown),
TJS_CFUNC_DEF("setBlocking", 1, tjs_stream_set_blocking),
TJS_CFUNC_DEF("close", 0, tjs_stream_close),
Expand Down
Loading