From 513e4d3f990c1a391adbf1f5a172bf69c3691f36 Mon Sep 17 00:00:00 2001 From: JOLIMAITRE Matthieu Date: Mon, 24 Jun 2024 02:44:07 +0200 Subject: [PATCH] add event and pool structures --- src/lib.ts | 2 ++ src/lib/AsyncPool.ts | 23 +++++++++++++++++++++++ src/lib/Event.ts | 27 +++++++++++++++++++++++++++ 3 files changed, 52 insertions(+) create mode 100644 src/lib/AsyncPool.ts create mode 100644 src/lib/Event.ts diff --git a/src/lib.ts b/src/lib.ts index 16a6083..ba3c6d8 100644 --- a/src/lib.ts +++ b/src/lib.ts @@ -9,3 +9,5 @@ export { log_from, next, split_promise, wait } from "./lib/utils.ts"; export { Chain } from "./lib/Chain.ts"; export { Channel } from "./lib/Channel.ts"; export { Range, range } from "./lib/Range.ts"; +export { Event } from "./lib/Event.ts"; +export { AsyncPool } from "./lib/AsyncPool.ts"; diff --git a/src/lib/AsyncPool.ts b/src/lib/AsyncPool.ts new file mode 100644 index 0000000..63814d4 --- /dev/null +++ b/src/lib/AsyncPool.ts @@ -0,0 +1,23 @@ +import { Event } from "./Event.ts"; + +export class AsyncPool { + limit; + current = 0; + current_changed = new Event(); + + constructor(limit: number = 8) { + this.limit = limit; + } + + async run(task: () => unknown) { + while (this.current >= this.limit) await this.current_changed.wait(); + this.update_current(+1); + await task(); + this.update_current(-1); + } + + update_current(increment: number) { + this.current += increment; + this.current_changed.trigger(); + } +} diff --git a/src/lib/Event.ts b/src/lib/Event.ts new file mode 100644 index 0000000..1608b36 --- /dev/null +++ b/src/lib/Event.ts @@ -0,0 +1,27 @@ +import { split_promise } from "../lib.ts"; + +export class Event { + subscriber = [] as ((value: V) => unknown)[]; + + subscribe(callback: (value: V) => unknown) { + this.subscriber.push(callback); + } + + unsubscribe(callback: (value: V) => unknown) { + const index = this.subscriber.indexOf(callback); + if (index === -1) return false; + this.subscriber.splice(index, 1); + return true; + } + + trigger(value: V) { + for (const handler of this.subscriber) handler(value); + } + + async wait() { + const [promise, resolver] = split_promise(); + this.subscribe(resolver); + await promise; + this.unsubscribe(resolver); + } +}