commandkit/async-queue

An async queue manages task execution by running tasks either sequentially or with controlled concurrency. Tasks are queued and started in order, with a configurable limit on how many can run at the same time.
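To make the idea of a concurrency limit concrete, here is a minimal sketch of how such a queue could work internally. It is an illustration only and not CommandKit's implementation: the `TinyQueue` class and the `demo` function are hypothetical names introduced for this example.

```typescript
// Illustrative sketch only: NOT CommandKit's implementation.
type Task<T> = () => Promise<T>;

class TinyQueue {
  private running = 0;
  private readonly waiting: Array<() => void> = [];
  private readonly concurrency: number;

  constructor(concurrency = 1) {
    this.concurrency = concurrency;
  }

  // Queue a task and return a promise that settles with the task's outcome.
  add<T>(task: Task<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const start = () => {
        this.running++;
        // The async wrapper turns a synchronous throw into a rejection.
        (async () => task())()
          .then(resolve, reject)
          .finally(() => {
            this.running--;
            this.next();
          });
      };
      this.waiting.push(start);
      this.next();
    });
  }

  // Start waiting tasks, in insertion order, while a slot is free.
  private next(): void {
    while (this.running < this.concurrency && this.waiting.length > 0) {
      const start = this.waiting.shift()!;
      start();
    }
  }
}

// Run five short tasks through a queue of size 2 and record the peak overlap.
async function demo(): Promise<{ results: number[]; peak: number }> {
  const queue = new TinyQueue(2);
  let active = 0;
  let peak = 0;
  const job = (n: number) => async () => {
    active++;
    peak = Math.max(peak, active);
    await new Promise<void>((resolve) => setTimeout(resolve, 10));
    active--;
    return n * 2;
  };
  const results = await Promise.all(
    [1, 2, 3, 4, 5].map((n) => queue.add(job(n))),
  );
  return { results, peak };
}
```

In this sketch, results come back in the order the tasks were added, and no more than two tasks are ever active at once.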

Basic usage

The simplest way to use a queue is with the default sequential processing:

import { createAsyncQueue } from 'commandkit/async-queue';

// Create a sequential queue (default)
const queue = createAsyncQueue();

// Add tasks to the queue
const result1 = await queue.add(async () => {
  return await processTask1();
});

const result2 = await queue.add(async () => {
  return await processTask2();
});

Custom configuration

You can raise the concurrency limit so that several tasks run at the same time, and register an onDrain callback that runs once all tasks have completed:

import { createAsyncQueue } from 'commandkit/async-queue';

// Allow 3 concurrent tasks
const queue = createAsyncQueue({
  concurrency: 3,
  onDrain: () => {
    console.log('All tasks completed');
  },
});

// Tasks will run with up to 3 concurrent executions
const promises = [
  queue.add(async () => await task1()),
  queue.add(async () => await task2()),
  queue.add(async () => await task3()),
  queue.add(async () => await task4()),
];

const results = await Promise.all(promises);

Advanced usage

Queue control

You can pause and resume the queue, and check its status:

import { createAsyncQueue } from 'commandkit/async-queue';

const queue = createAsyncQueue({ concurrency: 2 });

// Pause the queue
queue.pause();

// Add tasks (they won't execute until resumed)
const promise1 = queue.add(async () => await task1());
const promise2 = queue.add(async () => await task2());

// Resume the queue
queue.resume();

// Check queue status
console.log(`Running: ${queue.getRunning()}`);
console.log(`Pending: ${queue.getPending()}`);
console.log(`Paused: ${queue.isPaused()}`);
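The pause and resume behaviour can be pictured with a small model: while paused, added tasks are held rather than started, and resuming releases them in insertion order. The sketch below is a hypothetical `PausableRunner` written for illustration. It is not CommandKit's implementation and it ignores concurrency limits entirely.

```typescript
// Hypothetical model of pause/resume: NOT CommandKit's implementation.
class PausableRunner {
  private paused = false;
  private readonly held: Array<() => void> = [];

  pause(): void {
    this.paused = true;
  }

  resume(): void {
    this.paused = false;
    // Release everything that was held while paused, in insertion order.
    for (const start of this.held.splice(0)) start();
  }

  getPending(): number {
    return this.held.length;
  }

  add<T>(task: () => Promise<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const start = () => {
        task().then(resolve, reject);
      };
      if (this.paused) this.held.push(start);
      else start();
    });
  }
}

const runner = new PausableRunner();
const log: string[] = [];

runner.pause();
const done = runner.add(async () => {
  log.push('ran');
  return 'ok';
});

const pendingWhilePaused = runner.getPending(); // the task is held, not started
const ranWhilePaused = log.length > 0;

runner.resume(); // the held task starts now
```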

Cancelling operations

You can cancel queue operations using an AbortSignal:

import { createAsyncQueue } from 'commandkit/async-queue';

// Create a signal that aborts after 5 seconds
const signal = AbortSignal.timeout(5000);

const queue = createAsyncQueue({
  concurrency: 2,
  signal,
});

// Add tasks with individual timeouts
const task1Promise = queue.add(
  async () => await longRunningTask1(),
  AbortSignal.timeout(3000),
);

const task2Promise = queue.add(
  async () => await longRunningTask2(),
  AbortSignal.timeout(3000),
);

try {
  await Promise.all([task1Promise, task2Promise]);
} catch (error) {
  // `error` is `unknown` in strict TypeScript, so narrow it before reading `message`
  if (error instanceof Error && error.message.includes('aborted')) {
    console.log('Queue was aborted');
  }
}
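Matching on the error message works, but the standard `AbortSignal` API also lets you tell a timeout apart from a manual abort by the `name` of the abort reason: `AbortSignal.timeout()` aborts with a `TimeoutError`, while `AbortController.abort()` called without a reason aborts with an `AbortError`. The sketch below uses only standard platform APIs. `withSignal`, `describeAbort`, `slowTask` and `demo` are hypothetical helpers, and whether the queue rejects with the signal's reason unchanged is an assumption you should verify against the library.

```typescript
// Generic helper, not part of CommandKit: rejects as soon as `signal` aborts.
function withSignal<T>(work: Promise<T>, signal: AbortSignal): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    if (signal.aborted) {
      reject(signal.reason);
      return;
    }
    const onAbort = () => reject(signal.reason);
    signal.addEventListener('abort', onAbort, { once: true });
    work.then(resolve, reject).finally(() => {
      signal.removeEventListener('abort', onAbort);
    });
  });
}

// Classify a rejection by its `name` instead of matching message text.
function describeAbort(error: unknown): 'timeout' | 'aborted' | 'other' {
  const name = (error as { name?: unknown } | null)?.name;
  if (name === 'TimeoutError') return 'timeout';
  if (name === 'AbortError') return 'aborted';
  return 'other';
}

// A stand-in for real work that takes 200 ms.
const slowTask = () =>
  new Promise<string>((resolve) => setTimeout(() => resolve('done'), 200));

async function demo(): Promise<string[]> {
  // Manual cancellation through an AbortController.
  const controller = new AbortController();
  const manual = withSignal(slowTask(), controller.signal).catch(describeAbort);
  controller.abort(); // no reason given, so the reason is an AbortError

  // Cancellation through a 20 ms timeout signal.
  const timed = withSignal(slowTask(), AbortSignal.timeout(20)).catch(describeAbort);

  return [await manual, await timed];
}
```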

Batch processing

A queue with a concurrency limit suits large batches: every item is queued up front, but only a fixed number are processed at once:

import { createAsyncQueue } from 'commandkit/async-queue';

const queue = createAsyncQueue({
  concurrency: 5,
  onDrain: () => console.log('Batch processing complete'),
});

// Process a large batch of items
const items = Array.from({ length: 100 }, (_, i) => i);

const processItem = async (item: number) => {
  await new Promise((resolve) => setTimeout(resolve, 100));
  return `Processed item ${item}`;
};

// Add all items to queue
const promises = items.map((item) =>
  queue.add(() => processItem(item)),
);

// Wait for all to complete
const results = await Promise.all(promises);
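As a rough guide to what the concurrency limit buys you in the batch above, you can estimate the total time by counting how many "waves" of tasks are needed. The helper below is a hypothetical back-of-the-envelope calculation, not a library function, and it assumes every item takes the same time and that queue overhead is negligible.

```typescript
// Rough estimate: items are processed in waves of `concurrency` tasks each.
// Assumes uniform task duration and negligible scheduling overhead.
function estimateBatchMs(
  itemCount: number,
  concurrency: number,
  msPerItem: number,
): number {
  const waves = Math.ceil(itemCount / concurrency);
  return waves * msPerItem;
}

// 100 items at 100 ms each, as in the batch example:
const withFiveWorkers = estimateBatchMs(100, 5, 100); // 20 waves -> 2000 ms
const sequentially = estimateBatchMs(100, 1, 100); // 100 waves -> 10000 ms
```

Under these assumptions, a concurrency of 5 brings the batch from about 10 seconds down to about 2 seconds.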

Error handling

A failing task rejects only its own promise; the other tasks in the queue keep running:

import { createAsyncQueue } from 'commandkit/async-queue';

const queue = createAsyncQueue({ concurrency: 2 });

// Handle individual task errors
const results = await Promise.allSettled([
  queue.add(async () => {
    throw new Error('Task failed');
  }),
  queue.add(async () => {
    return 'Task succeeded';
  }),
  queue.add(async () => {
    return 'Another success';
  }),
]);

results.forEach((result, index) => {
  if (result.status === 'fulfilled') {
    console.log(`Task ${index}: ${result.value}`);
  } else {
    console.log(`Task ${index}: ${result.reason.message}`);
  }
});
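When you need the successes and failures separately rather than just logged, the settled results can be split into two lists. The sketch below relies only on the standard `Promise.allSettled` result shape. `partitionSettled` and `demo` are hypothetical helpers, and plain promises stand in for the promises returned by `queue.add`.

```typescript
// Split settled results into fulfilled values and rejection reasons.
function partitionSettled<T>(
  results: PromiseSettledResult<T>[],
): { values: T[]; errors: unknown[] } {
  const values: T[] = [];
  const errors: unknown[] = [];
  for (const result of results) {
    if (result.status === 'fulfilled') values.push(result.value);
    else errors.push(result.reason);
  }
  return { values, errors };
}

async function demo(): Promise<{ values: string[]; errors: unknown[] }> {
  // Plain promises stand in for the promises returned by queue.add().
  const tasks: Promise<string>[] = [
    Promise.reject(new Error('Task failed')),
    Promise.resolve('Task succeeded'),
    Promise.resolve('Another success'),
  ];
  return partitionSettled(await Promise.allSettled(tasks));
}
```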

Default settings

  • Concurrency: 1 (sequential processing)
  • Storage: In-memory
  • Abort Support: Yes