Skip to content

Commit

Permalink
worker: support sending SharedArrayBuffer and enable Atomics.wait
Browse files Browse the repository at this point in the history
  • Loading branch information
saghul committed Jul 16, 2024
1 parent 460bf7b commit 6b5e302
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 7 deletions.
3 changes: 3 additions & 0 deletions src/private.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ int tjs__eval_bytecode(JSContext *ctx, const uint8_t *buf, size_t buf_len, bool

void tjs__destroy_timers(TJSRuntime *qrt);

void tjs__sab_free(void *opaque, void *ptr);
void tjs__sab_dup(void *opaque, void *ptr);

uv_loop_t *TJS_GetLoop(TJSRuntime *qrt);
TJSRuntime *TJS_NewRuntimeWorker(void);
TJSRuntime *TJS_NewRuntimeInternal(bool is_worker, TJSRunOptions *options);
Expand Down
6 changes: 4 additions & 2 deletions src/vm.c
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ static void *tjs__sab_alloc(void *opaque, size_t size) {
return sab->buf;
}

static void tjs__sab_free(void *opaque, void *ptr) {
void tjs__sab_free(void *opaque, void *ptr) {
TJSSABHeader *sab = (TJSSABHeader *) ((uint8_t *) ptr - sizeof(TJSSABHeader));
if (sab->magic != TJS__SAB_MAGIC)
return;
Expand All @@ -127,7 +127,7 @@ static void tjs__sab_free(void *opaque, void *ptr) {
tjs__free(sab);
}

static void tjs__sab_dup(void *opaque, void *ptr) {
void tjs__sab_dup(void *opaque, void *ptr) {
TJSSABHeader *sab = (TJSSABHeader *) ((uint8_t *) ptr - sizeof(TJSSABHeader));
if (sab->magic != TJS__SAB_MAGIC)
return;
Expand Down Expand Up @@ -299,7 +299,9 @@ TJSRuntime *TJS_NewRuntimeInternal(bool is_worker, TJSRunOptions *options) {
/* SharedArrayBuffer functions */
JS_SetSharedArrayBufferFunctions(rt, &tjs_sf);

/* Worker support */
qrt->is_worker = is_worker;
JS_SetCanBlock(rt, is_worker);

CHECK_EQ(uv_loop_init(&qrt->loop), 0);

Expand Down
21 changes: 17 additions & 4 deletions src/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -186,14 +186,20 @@ static void uv__read_cb(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
CHECK_EQ(p->reading.nread, total_size);

/* We have a complete buffer now. */
int flags = JS_READ_OBJ_REFERENCE;
JSValue obj = JS_ReadObject(ctx, (const uint8_t *) p->reading.data, total_size, flags);
JSSABTab sab_tab;
int flags = JS_READ_OBJ_SAB | JS_READ_OBJ_REFERENCE;
JSValue obj = JS_ReadObject2(ctx, (const uint8_t *) p->reading.data, total_size, flags, &sab_tab);
if (JS_IsException(obj))
emit_msgpipe_event(p, MSGPIPE_EVENT_MESSAGE_ERROR, JS_GetException(ctx));
else
emit_msgpipe_event(p, MSGPIPE_EVENT_MESSAGE, obj);
JS_FreeValue(ctx, obj);

/* Decrement the SAB reference counts. */
for (int i = 0; i < sab_tab.len; i++) {
tjs__sab_free(NULL, sab_tab.tab[i]);
}

js_free(ctx, p->reading.data);
memset(&p->reading, 0, sizeof(p->reading));
}
Expand Down Expand Up @@ -251,8 +257,9 @@ static JSValue tjs_msgpipe_postmessage(JSContext *ctx, JSValue this_val, int arg
return JS_EXCEPTION;

size_t len;
int flags = JS_WRITE_OBJ_REFERENCE | JS_WRITE_OBJ_STRIP_SOURCE;
uint8_t *buf = JS_WriteObject(ctx, &len, argv[0], flags);
int flags = JS_WRITE_OBJ_SAB | JS_WRITE_OBJ_REFERENCE | JS_WRITE_OBJ_STRIP_SOURCE;
JSSABTab sab_tab;
uint8_t *buf = JS_WriteObject2(ctx, &len, argv[0], flags, &sab_tab);
if (!buf) {
js_free(ctx, wr);
return JS_EXCEPTION;
Expand All @@ -268,10 +275,16 @@ static JSValue tjs_msgpipe_postmessage(JSContext *ctx, JSValue this_val, int arg
if (r != 0) {
js_free(ctx, buf);
js_free(ctx, wr);
js_free(ctx, sab_tab.tab);

return tjs_throw_errno(ctx, r);
}

/* Increment the SAB reference counts. */
for (int i = 0; i < sab_tab.len; i++) {
tjs__sab_dup(NULL, sab_tab.tab[i]);
}

return JS_UNDEFINED;
}

Expand Down
2 changes: 1 addition & 1 deletion tests/test-worker-large-payload.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ const w = new Worker(path.join(import.meta.dirname, 'helpers', 'worker-echo.js')
const timer = setTimeout(() => {
w.terminate();
assert.fail('Timeout out waiting for worker');
}, 10000);
}, 1000);
w.onmessage = event => {
clearTimeout(timer);
const recvData = event.data;
Expand Down

0 comments on commit 6b5e302

Please sign in to comment.