Skip to main content

@commandkit/queue

The CommandKit Queue package provides a service-agnostic message queue API for inter-service communication. It allows you to send and receive messages between different parts of your application or across multiple services using a simple, unified interface.

Features

  • Service Agnostic: Works with any message queue implementation
  • Simple API: Easy-to-use send and receive functions
  • Type Safety: Full TypeScript support with strong typing
  • Driver System: Pluggable drivers for different queue backends
  • Discord.js Integration: Built-in support for Discord.js brokers
  • Redis Support: Ready-to-use Redis PubSub driver

Installation

npm install @commandkit/queue

For Discord.js integration with Redis:

npm install @commandkit/queue @discordjs/brokers ioredis

Basic setup

Setting up the driver

Before you can send or receive messages, you need to set up a driver. The driver handles the actual message queue backend.

import { setDriver } from '@commandkit/queue';
import { RedisPubSubDriver } from '@commandkit/queue/discordjs';
import { PubSubRedisBroker } from '@discordjs/brokers';
import Redis from 'ioredis';

// Create a Redis connection
const redis = new Redis();

// Create a broker
const broker = new PubSubRedisBroker(redis);

// Create a driver
const driver = new RedisPubSubDriver(broker);

// Set the driver
setDriver(driver);

Redis configuration

You can configure Redis for different environments:

import Redis from 'ioredis';

// Local Redis
const redis = new Redis();

// Or with configuration
const redis = new Redis({
host: 'localhost',
port: 6379,
password: 'your-password',
db: 0,
});

// Cloud Redis (example with Redis Cloud)
const redis = new Redis({
host: 'your-redis-host.redis.cloud.com',
port: 6379,
password: 'your-redis-password',
tls: {},
});

Core concepts

Topics

Topics are named channels where messages are sent and received. They function as categories for different types of messages.

Messages

Messages can be any JSON-serializable data. They're sent to topics and received by subscribers.

Drivers

Drivers are implementations that handle the actual message queue backend. CommandKit provides a unified API that works with any driver.

Basic operations

Sending messages

Use the send function to publish messages to a topic.

import { send } from '@commandkit/queue';

// Send a simple message
await send('user-events', { userId: '123', action: 'login' });

// Send different types of data
await send('notifications', {
type: 'welcome',
userId: '123',
message: 'Welcome to our platform!',
});

await send('analytics', {
event: 'page_view',
page: '/dashboard',
timestamp: Date.now(),
});

Receiving messages

Use the receive function to subscribe to messages from a topic.

import { receive } from '@commandkit/queue';

// Basic message handling
await receive('user-events', (message) => {
console.log(`User ${message.userId} performed ${message.action}`);
});

// Handle different message types
await receive('notifications', (message) => {
switch (message.type) {
case 'welcome':
console.log(`Welcome message for user ${message.userId}`);
break;
case 'reminder':
console.log(`Reminder: ${message.message}`);
break;
}
});

// Async message handling
await receive('analytics', async (message) => {
await processAnalyticsEvent(message);
});

Type safety

You can define types for your messages to get better TypeScript support.

interface UserEvent {
userId: string;
action: 'login' | 'logout' | 'register';
timestamp?: number;
}

interface Notification {
type: 'welcome' | 'reminder' | 'alert';
userId: string;
message: string;
}

// Type-safe sending
await send('user-events', {
userId: '123',
action: 'login',
timestamp: Date.now(),
} as UserEvent);

// Type-safe receiving
await receive('user-events', (message: UserEvent) => {
console.log(`User ${message.userId} ${message.action}`);
});

You can also define typed events for better TypeScript support:

interface QueueEvents {
'user-updates': {
userId: string;
action: 'login' | 'logout' | 'register';
timestamp: number;
};
'guild-events': {
guildId: string;
event: 'member-join' | 'member-leave' | 'role-update';
data: any;
};
analytics: {
event: string;
data: Record<string, any>;
timestamp: number;
};
}

// Create a typed driver
const driver = new RedisPubSubDriver<QueueEvents>(broker);
setDriver(driver);

Discord.js integration

CommandKit Queue provides seamless integration with Discord.js through the @discordjs/brokers package and Redis PubSub.

Cross-shard communication

Send messages between different shards of your Discord.js application:

// In shard 0
await send('shard-communication', {
fromShard: 0,
toShard: 1,
type: 'user-status-update',
data: { userId: '123456789', status: 'online' },
});

// In shard 1 (receiving)
await receive('shard-communication', (message) => {
if (message.toShard === 1) {
console.log(
`Received from shard ${message.fromShard}:`,
message.data,
);
}
});

Multi-bot communication

Send messages between different Discord.js bots:

// In bot A
await send('bot-communication', {
fromBot: 'bot-a',
toBot: 'bot-b',
type: 'user-data-request',
data: { userId: '123456789' },
});

// In bot B
await receive('bot-communication', async (message) => {
if (message.toBot === 'bot-b') {
const userData = await getUserData(message.data.userId);

await send('bot-communication', {
fromBot: 'bot-b',
toBot: 'bot-a',
type: 'user-data-response',
data: userData,
});
}
});

Real-time updates

Send real-time updates to connected clients:

// When a user joins a voice channel
await send('voice-updates', {
userId: '123456789',
guildId: '987654321',
channelId: '111222333',
action: 'join',
timestamp: Date.now(),
});

// Handle voice updates
await receive('voice-updates', (message) => {
// Update voice channel status
updateVoiceChannelStatus(
message.guildId,
message.channelId,
message.userId,
message.action,
);

// Notify other users
notifyVoiceChannelUsers(
message.guildId,
message.channelId,
message,
);
});

Error handling

Always handle errors when sending or receiving messages.

// Handle Redis connection errors
redis.on('error', (error) => {
console.error('Redis connection error:', error);
});

redis.on('connect', () => {
console.log('Connected to Redis');
});

// Handle broker errors
broker.on('error', (error) => {
console.error('Broker error:', error);
});

// Handle message processing errors
await receive('user-updates', async (message) => {
try {
await processUserUpdate(message);
} catch (error) {
console.error('Failed to process user update:', error);

// Optionally retry or send to dead letter queue
await send('failed-messages', {
originalTopic: 'user-updates',
message,
error: error.message,
timestamp: Date.now(),
});
}
});

Best practices

Use descriptive topic names

// Good
await send('user-authentication-events', message);
await send('order-processing-updates', message);

// Avoid
await send('events', message);
await send('data', message);

Structure your messages

// Good - structured message
await send('user-events', {
type: 'login',
userId: '123',
timestamp: Date.now(),
});

// Avoid - unstructured
await send('user-events', 'user logged in');

Handle message processing gracefully

await receive('user-events', async (message) => {
try {
// Process the message
await processUserEvent(message);

// Acknowledge successful processing
console.log(`Processed event for user ${message.userId}`);
} catch (error) {
// Log error but don't crash
console.error(
`Failed to process event for user ${message.userId}:`,
error,
);

// Optionally retry or send to dead letter queue
await handleFailedMessage(message, error);
}
});

Use appropriate message sizes

// Good - reasonable message size
await send('user-profile-updates', {
userId: '123',
changes: {
displayName: 'New Name',
avatar: 'https://example.com/avatar.jpg',
},
});

// Avoid - very large messages
await send('user-profile-updates', {
userId: '123',
fullProfile: {
/* massive object */
},
});

Use cases

  • Inter-Service Communication: Send messages between different parts of your application
  • Event Broadcasting: Broadcast events to multiple subscribers
  • Task Distribution: Distribute work across multiple workers
  • Real-time Updates: Send real-time updates to connected clients
  • Microservices: Enable communication between microservices

Cleanup

Always clean up resources when shutting down:

import { setDriver } from '@commandkit/queue';

// Cleanup function
async function cleanup() {
try {
// Close the driver
const driver = getDriver(); // You'll need to implement this
if (driver && driver.close) {
await driver.close();
}

// Close Redis connection
await redis.quit();

console.log('Queue cleanup completed');
} catch (error) {
console.error('Error during cleanup:', error);
}
}

// Handle graceful shutdown
process.on('SIGINT', cleanup);
process.on('SIGTERM', cleanup);