@commandkit/queue
The CommandKit Queue package provides a service-agnostic message queue API for inter-service communication. It allows you to send and receive messages between different parts of your application or across multiple services using a simple, unified interface.
Features
- Service Agnostic: Works with any message queue implementation
- Simple API: Easy-to-use
send
andreceive
functions - Type Safety: Full TypeScript support with strong typing
- Driver System: Pluggable drivers for different queue backends
- Discord.js Integration: Built-in support for Discord.js brokers
- Redis Support: Ready-to-use Redis PubSub driver
Installation
npm install @commandkit/queue
For Discord.js integration with Redis:
npm install @commandkit/queue @discordjs/brokers ioredis
Basic setup
Setting up the driver
Before you can send or receive messages, you need to set up a driver. The driver handles the actual message queue backend.
import { setDriver } from '@commandkit/queue';
import { RedisPubSubDriver } from '@commandkit/queue/discordjs';
import { PubSubRedisBroker } from '@discordjs/brokers';
import Redis from 'ioredis';
// Create a Redis connection
const redis = new Redis();
// Create a broker
const broker = new PubSubRedisBroker(redis);
// Create a driver
const driver = new RedisPubSubDriver(broker);
// Set the driver
setDriver(driver);
Redis configuration
You can configure Redis for different environments:
import Redis from 'ioredis';
// Local Redis
const redis = new Redis();
// Or with configuration
const redis = new Redis({
host: 'localhost',
port: 6379,
password: 'your-password',
db: 0,
});
// Cloud Redis (example with Redis Cloud)
const redis = new Redis({
host: 'your-redis-host.redis.cloud.com',
port: 6379,
password: 'your-redis-password',
tls: {},
});
Core concepts
Topics
Topics are named channels where messages are sent and received. They function as categories for different types of messages.
Messages
Messages can be any JSON-serializable data. They're sent to topics and received by subscribers.
Drivers
Drivers are implementations that handle the actual message queue backend. CommandKit provides a unified API that works with any driver.
Basic operations
Sending messages
Use the send
function to publish messages to a topic.
import { send } from '@commandkit/queue';
// Send a simple message
await send('user-events', { userId: '123', action: 'login' });
// Send different types of data
await send('notifications', {
type: 'welcome',
userId: '123',
message: 'Welcome to our platform!',
});
await send('analytics', {
event: 'page_view',
page: '/dashboard',
timestamp: Date.now(),
});
Receiving messages
Use the receive
function to subscribe to messages from a topic.
import { receive } from '@commandkit/queue';
// Basic message handling
await receive('user-events', (message) => {
console.log(`User ${message.userId} performed ${message.action}`);
});
// Handle different message types
await receive('notifications', (message) => {
switch (message.type) {
case 'welcome':
console.log(`Welcome message for user ${message.userId}`);
break;
case 'reminder':
console.log(`Reminder: ${message.message}`);
break;
}
});
// Async message handling
await receive('analytics', async (message) => {
await processAnalyticsEvent(message);
});
Type safety
You can define types for your messages to get better TypeScript support.
interface UserEvent {
userId: string;
action: 'login' | 'logout' | 'register';
timestamp?: number;
}
interface Notification {
type: 'welcome' | 'reminder' | 'alert';
userId: string;
message: string;
}
// Type-safe sending
await send('user-events', {
userId: '123',
action: 'login',
timestamp: Date.now(),
} as UserEvent);
// Type-safe receiving
await receive('user-events', (message: UserEvent) => {
console.log(`User ${message.userId} ${message.action}`);
});
You can also define typed events for better TypeScript support:
interface QueueEvents {
'user-updates': {
userId: string;
action: 'login' | 'logout' | 'register';
timestamp: number;
};
'guild-events': {
guildId: string;
event: 'member-join' | 'member-leave' | 'role-update';
data: any;
};
analytics: {
event: string;
data: Record<string, any>;
timestamp: number;
};
}
// Create a typed driver
const driver = new RedisPubSubDriver<QueueEvents>(broker);
setDriver(driver);
Discord.js integration
CommandKit Queue provides seamless integration with Discord.js through
the @discordjs/brokers
package and Redis PubSub.
Cross-shard communication
Send messages between different shards of your Discord.js application:
// In shard 0
await send('shard-communication', {
fromShard: 0,
toShard: 1,
type: 'user-status-update',
data: { userId: '123456789', status: 'online' },
});
// In shard 1 (receiving)
await receive('shard-communication', (message) => {
if (message.toShard === 1) {
console.log(
`Received from shard ${message.fromShard}:`,
message.data,
);
}
});
Multi-bot communication
Send messages between different Discord.js bots:
// In bot A
await send('bot-communication', {
fromBot: 'bot-a',
toBot: 'bot-b',
type: 'user-data-request',
data: { userId: '123456789' },
});
// In bot B
await receive('bot-communication', async (message) => {
if (message.toBot === 'bot-b') {
const userData = await getUserData(message.data.userId);
await send('bot-communication', {
fromBot: 'bot-b',
toBot: 'bot-a',
type: 'user-data-response',
data: userData,
});
}
});
Real-time updates
Send real-time updates to connected clients:
// When a user joins a voice channel
await send('voice-updates', {
userId: '123456789',
guildId: '987654321',
channelId: '111222333',
action: 'join',
timestamp: Date.now(),
});
// Handle voice updates
await receive('voice-updates', (message) => {
// Update voice channel status
updateVoiceChannelStatus(
message.guildId,
message.channelId,
message.userId,
message.action,
);
// Notify other users
notifyVoiceChannelUsers(
message.guildId,
message.channelId,
message,
);
});
Error handling
Always handle errors when sending or receiving messages.
// Handle Redis connection errors
redis.on('error', (error) => {
console.error('Redis connection error:', error);
});
redis.on('connect', () => {
console.log('Connected to Redis');
});
// Handle broker errors
broker.on('error', (error) => {
console.error('Broker error:', error);
});
// Handle message processing errors
await receive('user-updates', async (message) => {
try {
await processUserUpdate(message);
} catch (error) {
console.error('Failed to process user update:', error);
// Optionally retry or send to dead letter queue
await send('failed-messages', {
originalTopic: 'user-updates',
message,
error: error.message,
timestamp: Date.now(),
});
}
});
Best practices
Use descriptive topic names
// Good
await send('user-authentication-events', message);
await send('order-processing-updates', message);
// Avoid
await send('events', message);
await send('data', message);
Structure your messages
// Good - structured message
await send('user-events', {
type: 'login',
userId: '123',
timestamp: Date.now(),
});
// Avoid - unstructured
await send('user-events', 'user logged in');
Handle message processing gracefully
await receive('user-events', async (message) => {
try {
// Process the message
await processUserEvent(message);
// Acknowledge successful processing
console.log(`Processed event for user ${message.userId}`);
} catch (error) {
// Log error but don't crash
console.error(
`Failed to process event for user ${message.userId}:`,
error,
);
// Optionally retry or send to dead letter queue
await handleFailedMessage(message, error);
}
});
Use appropriate message sizes
// Good - reasonable message size
await send('user-profile-updates', {
userId: '123',
changes: {
displayName: 'New Name',
avatar: 'https://example.com/avatar.jpg',
},
});
// Avoid - very large messages
await send('user-profile-updates', {
userId: '123',
fullProfile: {
/* massive object */
},
});
Use cases
- Inter-Service Communication: Send messages between different parts of your application
- Event Broadcasting: Broadcast events to multiple subscribers
- Task Distribution: Distribute work across multiple workers
- Real-time Updates: Send real-time updates to connected clients
- Microservices: Enable communication between microservices
Cleanup
Always clean up resources when shutting down:
import { setDriver } from '@commandkit/queue';
// Cleanup function
async function cleanup() {
try {
// Close the driver
const driver = getDriver(); // You'll need to implement this
if (driver && driver.close) {
await driver.close();
}
// Close Redis connection
await redis.quit();
console.log('Queue cleanup completed');
} catch (error) {
console.error('Error during cleanup:', error);
}
}
// Handle graceful shutdown
process.on('SIGINT', cleanup);
process.on('SIGTERM', cleanup);