AQ
An Asynchronous Queue Implementation with Synchronous / Asynchronous Enqueueing of Synchronous / Asynchronous Jobs for JS / TS (Node / Deno)
AQ is a lightweight asynchronous queue implementation with no dependencies. You may enqueue synchronous or asynchronous items either synchronously or asynchronously.
As of v0.5.0, AQ utilizes modern ES2024 features such as Promise.withResolvers and Explicit Resource Management (Symbol.asyncDispose). Ensure your environment supports these features (Deno 1.42+, Node.js 22+, Chrome 119+, Edge 119+). This version brings improved stability and memory management. While the library retains some console.log statements for execution visibility, it is structurally robust and memory-safe.
Recent Updates & Fixes
- Memory Leak Fixed: A critical issue causing infinite retention of linked nodes has been resolved. Processed items are now properly garbage collected.
- Stability: Fixed a guaranteed crash when the queue became empty during consumption.
- Timeout Logic: Fixed timers not being cleared after promise resolution, preventing resource leaks.
- Performance: Optimized class definitions and promise creation using modern patterns (
Promise.withResolvers). - Flush: The
.flush()method now returns a structured object{ resolved: [], rejected: [], aborted: [], pending: [] }instead of a flat array. - Standard Compliance: Better handling of âThenablesâ and strictly compliant Promises.
Functionality
- AQ is a âkind of relaxedâ FIFO queue structure which can take both asynchronous and synchronous items at the same time. Sync or async, all
enqueued items are wrapped by a promise (outer promise). A resolution of the inner promise (enqueued item) triggers the previous outer promise in the queue to be resolved. Since the previous inner promise is doing the same thing, this interlaced async chaining mechanism forms the basis of an uninterrupted continuum. Such as when the queue becomes empty (when all inner promises are depleted) there is still one outer promise yielded at the tip of the queue, awaiting for a resolution or rejection. AQ will remain there, keeping the queue alive up until you.kill()AQ abrubtly. At the meantime you may safelyenqueuenew items asynchronously regardless the queue had become empty or not. - AQ is a âkind of relaxedâ FIFO queue structure which can take both asynchronous and synchronous items at the same time. Sync or async, all
enqueued items are wrapped by a promise (outer promise). A resolution of the inner promise (enqueued item) triggers the previous outer promise in the queue to be resolved. Since the previous inner promise is doing the same thing, this interlaced async chaining mechanism forms the basis of an uninterrupted continuum. Such as when the queue becomes empty (when all inner promises are depleted) there is still one outer promise yielded at the tip of the queue, awaiting for a resolution or rejection. AQ will remain there, keeping the queue alive up until you.kill()AQ abruptly. At the meantime you may safelyenqueuenew items asynchronously regardless the queue had become empty or not. - The item at the head of the queue gets automatically dequeued once it resolves or instantly if itâs already in the resolved state. Under normal operation all other items in the queue must wait until they become the head to be dequeued. So there is no
dequeuemethod in AQ at all. - AQ can also be used as a race machine. In
raceMode = truecase all pending items up to the the first resolving promise get wiped out to allow the winner item to dequeue. However unlikePromise.race()the itemsenqueued after the winner will remain in the queue. This is particularly so since the ones coming after the winner might have been asynchronously enqueued at a later time. The late ones should be granted with their chances. - In the basic operation rejections are handled inside the queue silently. This also means while consuming from AQ instances you donât need to deploy a
try&catchfunctionality. However in order to capture the rejections you can watch them by registering an eventlistener function to the"error"event to see why and which promise was rejected. - There are four events those can be listened by eventlistener functions such as
"next","error","reply"and"empty". The eventlistener functions can be added / removed freely at any time. Every event can hold multiple eventlistener functions. The eventlistener functions can also be anonymous and they can still be removed because all eventlistener functions are assigned with a unique id. - Once an AQ instance is initiated it will remain being available for asynchronous
enqueueing even if in time it becomes empty. So you may keep it alive indefinitely or just.kill()if itâs no longer needed. - Being an async iterator, AQ instances are ideally consumed by a
for await ofloop.
Importing
Via JSR (Recommended)
deno add @kedicesur/aqimport { AQ } from "@kedicesur/aq";Via URL
import { AQ } from "https://jsr.io/@kedicesur/aq/0.5.0/mod.ts";to your script. To test AQ in your projects please follow the instructions [down here](#strongtestingstrong #testing).
Instances
The AQ constructor takes an options argument and simply called as,
var aq = new AQ(opts);and consumed like
async function getAsyncValues(aq){
for await (let item of aq){
console.log(`The Promise at endpoint resolved with a value of "${item}"`);
};
};The options object argument (opts above) could take the following shape
var opts = { timeout : 200
, clearMode: "soft"
, raceMode : false
};timeout: Since AQ instances getdequeued automatically one thing that we shall avoid is to have an indefinitelly pending promise in the queue. Thetimeoutproperty is ofNumbertype and defines a duration in ms.timeoutshould always be provided unless you are sure that allenqueued items are either synchronous or promises those will certainly resolve or reject within a reasonable time.clearMode: AQ instances have a.clear(clearMode)method used to clear the queue at any time.clearModecan take only two values.<"hard" | "soft">."hard"clears all the items in the queue regardless their state."soft"clears all the items in the queue except for the ones in resolved state.
raceMode: Boolean<true | false>option is used to switch the queue into the race mode. In race mode the queue is cleared only upto the first resolving item in the queue. Once the first resolving item is dequeued the queue now contains only the recent items thoseenqueued after the resolving item and remains available for furtherenqueueing operations.
Explicit Resource Management
AQ implements Symbol.asyncDispose (ES2024), enabling automatic cleanup of resources (timers, internal references) when used with the await using syntax.
{
await using aq = new AQ({ timeout: 5000 });
aq.enqueue(fetch("..."));
// ... work with queue
} // aq.kill() is automatically called hereMethods
As of v0.4.1 the following methods are available
.enqueue(item): Inserts an item to the end of the queue and increments thesizeof the queue. The return value is apanelobject. Thepanelobject has three properties and a method as followsitem: A reference to the enqueued item itself.state: Shows the current state of the item in the queue such as"pending","resolved","rejected"or"aborted".id: An ID to the item which can be checked against thepidproperty of theErrorobject caught at the"error"eventlistener function. This can be useful to retry a particular async call or whatnot..abort(): Method is used to manually abort a pending item prematurely whenever needed. An aborted item will not be dequeued.
.clear("hard" | "soft" | "upto", [targetId]): Depending on the provided argument clears the queue accordingly;"hard": Clears the queue completelly."soft": Clears the queue but leaves the already resolved items"upto": Clears the queue up to the item with theidmaching the provided optionaltargetIdargument. The items coming after are kept.
The return value is the current AQ instance.
.flush(): Similar to.clear("hard")but returns a structured object{ resolved: [], rejected: [], aborted: [] }containing items in their respective states. This provides better visibility than the previous flat array return..on("event"): Adds or removes eventlisteners. You can add multiple eventlisteners per event. AQ instances can take four event types"next"event is fired per successfull yielding of a pending or an already resolved item at the head of the queue. Some pending items at the head might of course get rejected and the"next"event wonât fire for rejections. The uniqueidof the resolving promise is passed to the eventhandler."error"event is fired once an item in the queue gets rejected. Anerrorobject is passed to the event handler. Theerrorobject can take the shape as follows;
{ name : "Timeout" // name of the error , message: "Promise timed out before fullfillment" // descrtiption , pid : "D8VJQ7ZMIDA" // the ID of the rejected or aborted promise }
"empty"event is fired whenever the queue becomes empty."reply"event is fired whenever a promise in the queue gets resolved and areplyobject is passed to the eventhandler. Thereplyobject has two properties."value"which holds the resolved value and"pid"which holds the uniqueidof the resolving promise.
When invoked, the
.on("event")returns an object with two methods..do(f)and.forget(id)whereasfis a function andidis a unique id string..on("event").do(f): Such asvar id = aq.on("error").do(e => doSomethingWith(e));. The return value will be a Unique Id String like"4E34SIO5X56"even if your have provided an anonymous function. This Unique Id String can be saved to remove a particular eventlistener at a later time..on("event").forget(id): Such asaq.on("error").forget("4E34SIO5X56");which will, if exists, remove the eventlistener function with the correspoing ID string from the eventlisteners list of that particular event. The return value is eithertrueorfalsedepending on the outcome.
Properties
As of v0.4.1, AQ instances have only one read only property which is .size that gives you the number of items in the queue.
Use Cases
AQ is a very lightweight tool but at the same time like a Swiss Army Knife, it will allow you to perform many interesting tasks easily. It doesnât offer a big API just because I want to keep it as simple as possible while being functional. This means, you may easily extend AQâs functionalities by using itâs availabe methods cleverly. Having said that, there already exists many built in asynchronous capabilities in JS/TS language so you should consider using them in the first place. However only when some exceptional cases arise where the naked Promises are not sufficient then you may consider using AQ. The point being, all Promise methods are supplied with asynchronous tasks synchronously, while AQ can always be enqueued with asynchronous tasks asynchronously whenever you have something to enqueue.
Let us start with the case where you provide your asynchronous tasks synchronously.
Sync Input - Async Output Task Queue
This is the basic operation where we
enqueuethe async tasks synchronously. In this case the main differences compared to Promise methods are;- The whole system wonât collapse all at once when a Promise fails.
- You have full control on aborting at a certain
timeoutvalue. - You have full control on taking an action such as retrying by using the
"error"event.
In other words you simply
enqueuethe Promises sequentially and then;- Some of them may timeout and you may not care.
- You may register an
"error"eventlistener to take the necessary action such as retrying in case a Promise in AQ fails. - You may register a
"next"eventlistener to take necessary action once a specific Promise resolves. Perhaps you may like to do aaq.clear("hard")in the eventlisterner function and abort the remaining promises.
Assuming that we already have an array of multiple
urlsto fetch from an API, a simpleretryfor 5 times errorhandler may look like;var aq = new AQ({timeout: 80}), tryCnt = 5, panels = [], retry = e => { const ix = panels.findIndex(p => p.id === e.pid), tc = panels[ix].tryCnt; ix >= 0 && tc && ( panels[ix] = aq.enqueue(fetch(urls[ix])) , panels[ix].tryCnt = tc - 1 , console.log(`Retry #${(tryCnt-panels[ix].tryCnt).toString().padStart(2," ")} for "${urls[ix]}"`) ); }; getAsyncValues(aq); aq.on("error").do(retry); // Literature BITCH..! panels = urls.map(url => { const panel = aq.enqueue(fetch(url)); panel.tryCnt = tryCnt; return panel; });
In the above
retryattempt we are utilizing the AQ functionalities at hand such as the"error"eventlistener and thepanelobject however in future releases AQ may employ a built in retry functionality. This is also a good place to point out some importantfetch()API rejection cases. Keep in mind that thefetch()API rejections are only limited with network errors and all server side errors resolve within the response object. You should check theokproperty of the receivedresponseobject in the consumingfor awaitloop, in a similar manner to the following;async function getAsyncValues(aq){ for await (const res of aq){ res.ok ? res.json() .then(json => doSomethingWith(json)) : handleServerError(res); } console.log("The stream is finalized"); }
Also not all JS/TS asynchronous functionalities serve you with Promises. One example could be the Workers API. You may easily promisify your worker tasks like
var wp = new Promise((v,x) => ( firstWorker.onmessage(v) , firstWorker.onmessageerror(x) ));
Async Input - Async Output Task Queue
Say you want to make Short Polling requests to an API once in every 500ms and you would like to use the first resolving response. Think like, the request that you made @0ms happens to resolve @1000ms and for some reason the one that you make @500ms resolves @900ms. You are interested in the second one since it gives you the most fresh state from the API. At this point you no longer need the first request and itâs best to get rid of it. Then just add to your
optionsobject{raceMode: true}and keepenqueueing your requests (polling) indefinitely at every 500ms. You will get the most fresh resolutions continusously and the previouslyenqueued slower ones will be wiped out. Awesome..!
Testing
AQ is being developed on Deno. So I would advise you to install and use Deno, at least for your experiments. Working on Deno projects, Denon is the tool which does the job what Nodemon does in Node. So go ahead and simply install Denon too as shown in itâs page. The following is the scripts.json file used by Denon in this project.
{
"$schema": "https://deno.land/x/denon@2.4.7/schema.json",
"scripts": {
"start": {
"cmd": "deno run app.js",
"desc": "run my app.js file"
},
"test": {
"cmd": "deno run --inspect ./test/test-async-queue.js",
"desc": "run the test file"
},
"race": {
"cmd": "deno run ./test/test-AQ-race.js",
"desc": "run the race test"
},
"retry": {
"cmd": "deno run --allow-net ./test/test-AQ-retry.js",
"desc": "run the retry test"
}
}
}Now in the project folder run the tests like
# to run test-async-queue.js
$ denon test
# to run test-Aq-race.js
$ denon race
# to run test-AQ-retry.js
$ denon retryand play with their source code as you like.
TODO
- Implementing an AQ native interface for Fetch API to include auto retry functionality etc.
- Implementing an AQ native interface for Web Workers API.
- Implementing an AQ native interface for Broadcast Channel API.
Contribution
I hope to have PRs inline with the fancy wide indenting of this code or i will have to rephrase them and it will make me a dull boy. Also, if you may, please pay attention to the followings,
- We are not using any
ifclauses unless it is essential like in the case ofthrowing errors. Please try to use ternary with proper indenting instead. - For single choice conditionals please try to use shortcircuits.
- If you have multiple instructions to do after the conditional then use the comma operator to group them like.
ifTrue && ( firstDoThis
, thenDoThis
, andReturnThis
);- Use arrow functions whenever possible.
- Any bleeding edge JS/TS functionalities are welcome if need be. AQ is not thought to be backward compatible.
License
CopyrightŠ 2021, Redu. Relesed under GNU General Public License v3.0