Message Queue Integration for ChatGPT Apps: RabbitMQ & AWS SQS Guide

Building production-grade ChatGPT applications requires robust asynchronous processing capabilities. When your app needs to handle long-running AI operations, process thousands of user requests, or coordinate complex workflows, message queues become essential infrastructure.

Message queues enable your ChatGPT app to decouple request processing from response delivery, ensuring reliability even under heavy load. They provide guaranteed delivery, automatic retries, and the ability to scale processing independently from user-facing components.

Why message queues matter for ChatGPT apps:

  • Async AI processing: OpenAI API calls can take 5-30 seconds for complex operations
  • Rate limit management: Queue requests to stay within OpenAI's token-per-minute limits (see the token-bucket sketch after this list)
  • Fault tolerance: Retry failed operations automatically with exponential backoff
  • Load balancing: Distribute work across multiple processing workers
  • Workflow orchestration: Chain multiple AI operations together reliably
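
The rate-limit point deserves a concrete shape. Below is a minimal sketch of a token bucket a queue consumer could check before calling OpenAI; the capacity and refill numbers are hypothetical and should match your actual tier limits:

// rate-limit-sketch.ts - illustrative token bucket for tokens-per-minute budgets
class TokenBucket {
  private tokens: number;
  private lastRefill = Date.now();

  constructor(private capacity: number, private refillPerMinute: number) {
    this.tokens = capacity;
  }

  // Spend `cost` tokens if the budget allows the call right now
  tryConsume(cost: number): boolean {
    const elapsedMinutes = (Date.now() - this.lastRefill) / 60_000;
    this.tokens = Math.min(this.capacity, this.tokens + elapsedMinutes * this.refillPerMinute);
    this.lastRefill = Date.now();

    if (this.tokens < cost) return false;
    this.tokens -= cost;
    return true;
  }
}

// A consumer checks the bucket before calling OpenAI and requeues (or delays)
// the task when the budget is exhausted, instead of burning the rate limit
const bucket = new TokenBucket(90_000, 90_000); // hypothetical 90k tokens/minute tier

if (!bucket.tryConsume(1_500)) {
  console.log('Rate budget exhausted, requeueing task');
}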

This guide covers production-ready integration patterns for RabbitMQ (self-hosted, full-featured) and AWS SQS (managed service, serverless-friendly), with real code examples you can deploy today.

Whether you're building a ChatGPT app for customer service automation or a complex multi-agent system, message queues provide the reliability foundation you need.

For comprehensive architectural patterns, see our ChatGPT Applications Guide.


Queue Architecture Fundamentals

Before implementing message queues, understanding core concepts prevents costly architectural mistakes:

Queue vs Topic vs Stream

Queue (Point-to-Point):

  • One message → one consumer
  • Work distribution pattern
  • Use case: Task processing, job execution
  • Example: Queue AI summarization requests

Topic (Pub/Sub):

  • One message → multiple subscribers
  • Event broadcasting pattern
  • Use case: Event notifications, webhooks
  • Example: Notify analytics + billing when conversation completes

Stream (Event Log):

  • Ordered, replayable event sequence
  • All consumers see all messages
  • Use case: Event sourcing, audit logs
  • Example: Conversation history reconstruction
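
To make the three patterns concrete, here is a minimal amqplib sketch; the queue and exchange names are illustrative, and the stream example assumes RabbitMQ 3.9+ with stream support:

// patterns-sketch.ts - the three delivery patterns in amqplib terms
import * as amqp from 'amqplib';

const conn = await amqp.connect('amqp://localhost');
const ch = await conn.createChannel();

// Queue (point-to-point): competing consumers share one work queue
await ch.assertQueue('summarize.tasks', { durable: true });

// Topic (pub/sub): each subscriber binds its own queue to an exchange,
// so every subscriber receives a copy of each event
await ch.assertExchange('conversation.events', 'fanout', { durable: true });
const { queue } = await ch.assertQueue('', { exclusive: true });
await ch.bindQueue(queue, 'conversation.events', '');

// Stream (event log): an append-only, replayable log
await ch.assertQueue('conversation.log', {
  durable: true,
  arguments: { 'x-queue-type': 'stream' },
});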

Message Durability & Acknowledgment

Durability levels:

  • Transient: Lost if broker restarts (fast, risky)
  • Persistent: Survives restarts (slower, reliable)
  • Replicated: Distributed across nodes (slowest, most reliable)

Acknowledgment patterns:

  • Auto-ack: Message deleted on delivery (fast, can lose data)
  • Manual ack: Deleted after processing confirmation (reliable)
  • Batch ack: Acknowledge multiple messages at once (efficient)

For ChatGPT apps processing valuable user data, always use persistent messages with manual acknowledgment.
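
In amqplib terms, that combination comes down to two flags. A minimal sketch against a local broker:

// durability-sketch.ts - persistent message + manual acknowledgment
import * as amqp from 'amqplib';

const conn = await amqp.connect('amqp://localhost');
const ch = await conn.createChannel();
await ch.assertQueue('tasks', { durable: true }); // queue survives broker restart

// persistent: true writes the message to disk before routing
ch.sendToQueue('tasks', Buffer.from('{"taskId":"t1"}'), { persistent: true });

// noAck: false means the broker waits for an explicit ack before deleting
await ch.consume('tasks', (msg) => {
  if (!msg) return;
  // ... process ...
  ch.ack(msg); // only now is the message removed from the queue
}, { noAck: false });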

Dead Letter Queues (DLQ)

Failed messages must go somewhere. Dead letter queues capture:

  • Messages that exceed retry limits
  • Malformed/unparseable requests
  • Operations that always fail (e.g., invalid API keys)

DLQ strategy:

  1. Route failed messages to DLQ after N retries
  2. Alert ops team via monitoring
  3. Manual review and reprocessing
  4. Fix root cause before requeuing

Production ChatGPT apps should monitor DLQ depth as a critical metric.
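
On SQS, step 1 of this strategy is declarative: a redrive policy on the source queue tells SQS to move messages to the DLQ after N failed receives. A minimal sketch, with hypothetical queue names:

// dlq-redrive-sketch.ts - wiring an SQS DLQ via a redrive policy
import { SQSClient, CreateQueueCommand, GetQueueAttributesCommand } from '@aws-sdk/client-sqs';

const sqs = new SQSClient({ region: 'us-east-1' });

// 1. Create the DLQ first and look up its ARN
const dlq = await sqs.send(new CreateQueueCommand({ QueueName: 'chatgpt-tasks-dlq' }));
const attrs = await sqs.send(new GetQueueAttributesCommand({
  QueueUrl: dlq.QueueUrl!,
  AttributeNames: ['QueueArn'],
}));

// 2. Create the main queue; SQS moves messages to the DLQ after 3 failed receives
await sqs.send(new CreateQueueCommand({
  QueueName: 'chatgpt-tasks',
  Attributes: {
    RedrivePolicy: JSON.stringify({
      deadLetterTargetArn: attrs.Attributes!.QueueArn,
      maxReceiveCount: '3',
    }),
  },
}));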

For error handling strategies, see Error Handling & Recovery Patterns for ChatGPT Apps.


RabbitMQ Integration for ChatGPT Apps

RabbitMQ provides a feature-rich queueing system with advanced routing, multiple exchange types, and a broad plugin ecosystem, making it ideal for self-hosted deployments and Kubernetes environments.

RabbitMQ Producer Implementation

This producer publishes ChatGPT processing requests with error handling, automatic reconnection, and publisher confirms:

// rabbitmq-producer.ts - ChatGPT Task Publisher
import * as amqp from 'amqplib';
import { EventEmitter } from 'events';

interface ChatGPTTask {
  taskId: string;
  userId: string;
  conversationId: string;
  messages: Array<{ role: string; content: string }>;
  model: string;
  temperature?: number;
  maxTokens?: number;
  timestamp: number;
  priority?: number; // 0-10, higher = more urgent
}

interface PublisherConfig {
  url: string;
  exchange: string;
  exchangeType: 'direct' | 'topic' | 'fanout' | 'headers';
  routingKey: string;
  persistent: boolean;
  confirmMode: boolean;
}

export class RabbitMQPublisher extends EventEmitter {
  private connection: amqp.Connection | null = null;
  private channel: amqp.ConfirmChannel | null = null;
  private config: PublisherConfig;
  private reconnecting: boolean = false;

  constructor(config: PublisherConfig) {
    super();
    this.config = config;
  }

  async connect(): Promise<void> {
    try {
      // amqplib reads heartbeat from the URL query string (e.g. amqp://host?heartbeat=30),
      // not from the socket options passed here
      this.connection = await amqp.connect(this.config.url, {
        timeout: 10000, // connection establishment timeout (ms)
      });

      this.connection.on('error', (err) => {
        console.error('RabbitMQ connection error:', err);
        this.handleConnectionError(err);
      });

      this.connection.on('close', () => {
        console.warn('RabbitMQ connection closed, reconnecting...');
        this.reconnect();
      });

      // Create confirm channel for guaranteed delivery
      this.channel = await this.connection.createConfirmChannel();

      // Declare exchange (idempotent operation)
      await this.channel.assertExchange(
        this.config.exchange,
        this.config.exchangeType,
        {
          durable: true,
          autoDelete: false,
        }
      );

      console.log('RabbitMQ publisher connected');
      this.emit('connected');
    } catch (error) {
      console.error('Failed to connect to RabbitMQ:', error);
      throw error;
    }
  }

  async publish(task: ChatGPTTask): Promise<boolean> {
    if (!this.channel) {
      throw new Error('Publisher not connected');
    }

    const messageBuffer = Buffer.from(JSON.stringify(task));

    const options: amqp.Options.Publish = {
      persistent: this.config.persistent,
      contentType: 'application/json',
      contentEncoding: 'utf-8',
      timestamp: Date.now(),
      messageId: task.taskId,
      priority: task.priority || 5,
      headers: {
        userId: task.userId,
        conversationId: task.conversationId,
        model: task.model,
        retryCount: 0,
      },
    };

    return new Promise((resolve, reject) => {
      this.channel!.publish(
        this.config.exchange,
        this.config.routingKey,
        messageBuffer,
        options,
        (err, ok) => {
          if (err) {
            console.error('Publish failed:', err);
            this.emit('error', err);
            reject(err);
          } else {
            console.log(`Published task ${task.taskId}`);
            this.emit('published', task.taskId);
            resolve(true);
          }
        }
      );
    });
  }

  private async reconnect(): Promise<void> {
    if (this.reconnecting) return;

    this.reconnecting = true;
    let attempts = 0;
    const maxAttempts = 10;
    const baseDelay = 1000;

    while (attempts < maxAttempts) {
      try {
        attempts++;
        const delay = Math.min(baseDelay * Math.pow(2, attempts - 1), 30000);

        console.log(`Reconnect attempt ${attempts}/${maxAttempts} in ${delay}ms`);
        await new Promise(resolve => setTimeout(resolve, delay));

        await this.connect();
        this.reconnecting = false;
        return;
      } catch (error) {
        console.error(`Reconnect attempt ${attempts} failed:`, error);
      }
    }

    this.reconnecting = false;
    // Emit rather than throw: reconnect() is fired from event handlers,
    // so a throw here would surface as an unhandled promise rejection
    this.emit('reconnectFailed');
  }

  private handleConnectionError(error: Error): void {
    this.connection = null;
    this.channel = null;
    this.emit('disconnected', error);
  }

  async close(): Promise<void> {
    if (this.channel) {
      await this.channel.close();
    }
    if (this.connection) {
      await this.connection.close();
    }
  }
}

// Usage example
const publisher = new RabbitMQPublisher({
  url: process.env.RABBITMQ_URL || 'amqp://localhost',
  exchange: 'chatgpt.tasks',
  exchangeType: 'topic',
  routingKey: 'chatgpt.process',
  persistent: true,
  confirmMode: true,
});

await publisher.connect();

const task: ChatGPTTask = {
  taskId: crypto.randomUUID(),
  userId: 'user_123',
  conversationId: 'conv_456',
  messages: [
    { role: 'user', content: 'Summarize this document...' }
  ],
  model: 'gpt-4',
  temperature: 0.7,
  maxTokens: 500,
  timestamp: Date.now(),
  priority: 8,
};

await publisher.publish(task);

RabbitMQ Consumer with Retry Logic

This consumer processes ChatGPT tasks with exponential backoff retry and dead letter handling:

// rabbitmq-consumer.ts - ChatGPT Task Processor
import * as amqp from 'amqplib';
import OpenAI from 'openai';
import type { ChatGPTTask } from './types';

interface ConsumerConfig {
  url: string;
  exchange: string;
  queue: string;
  routingKey: string;
  dlxExchange: string;
  dlxQueue: string;
  maxRetries: number;
  prefetchCount: number;
}

export class RabbitMQConsumer {
  private connection: amqp.Connection | null = null;
  private channel: amqp.Channel | null = null;
  private config: ConsumerConfig;
  private openai: OpenAI;

  constructor(config: ConsumerConfig) {
    this.config = config;
    this.openai = new OpenAI({
      apiKey: process.env.OPENAI_API_KEY,
    });
  }

  async connect(): Promise<void> {
    this.connection = await amqp.connect(this.config.url);
    this.channel = await this.connection.createChannel();

    // Set prefetch to control concurrent processing
    await this.channel.prefetch(this.config.prefetchCount);

    // Declare dead letter exchange
    await this.channel.assertExchange(this.config.dlxExchange, 'direct', {
      durable: true,
    });

    // Declare dead letter queue
    await this.channel.assertQueue(this.config.dlxQueue, {
      durable: true,
    });

    await this.channel.bindQueue(
      this.config.dlxQueue,
      this.config.dlxExchange,
      'failed'
    );

    // Declare main exchange
    await this.channel.assertExchange(this.config.exchange, 'topic', {
      durable: true,
    });

    // Declare main queue with DLX configuration
    await this.channel.assertQueue(this.config.queue, {
      durable: true,
      arguments: {
        'x-dead-letter-exchange': this.config.dlxExchange,
        'x-dead-letter-routing-key': 'failed',
      },
    });

    await this.channel.bindQueue(
      this.config.queue,
      this.config.exchange,
      this.config.routingKey
    );

    console.log('RabbitMQ consumer connected, waiting for messages...');
  }

  async startConsuming(): Promise<void> {
    if (!this.channel) {
      throw new Error('Consumer not connected');
    }

    await this.channel.consume(
      this.config.queue,
      async (msg) => {
        if (!msg) return;

        try {
          const task = JSON.parse(msg.content.toString()) as ChatGPTTask;
          const retryCount = msg.properties.headers?.retryCount || 0;

          console.log(`Processing task ${task.taskId} (retry ${retryCount})`);

          // Process with OpenAI
          const result = await this.processTask(task);

          // Acknowledge successful processing
          this.channel!.ack(msg);

          console.log(`Task ${task.taskId} completed successfully`);
          await this.storeResult(task.taskId, result);
        } catch (error) {
          await this.handleError(msg, error as Error);
        }
      },
      {
        noAck: false, // Manual acknowledgment
      }
    );
  }

  private async processTask(task: ChatGPTTask): Promise<string> {
    const completion = await this.openai.chat.completions.create({
      model: task.model,
      messages: task.messages as any,
      temperature: task.temperature,
      max_tokens: task.maxTokens,
    });

    return completion.choices[0].message.content || '';
  }

  private async handleError(msg: amqp.Message, error: Error): Promise<void> {
    const retryCount = msg.properties.headers?.retryCount || 0;

    if (retryCount < this.config.maxRetries) {
      // Retry with exponential backoff. Note: this sleep holds one prefetch
      // slot for the duration; a delayed-message exchange avoids that at scale
      const delay = Math.min(1000 * Math.pow(2, retryCount), 60000);

      console.log(`Requeuing message (retry ${retryCount + 1}) after ${delay}ms`);

      await new Promise(resolve => setTimeout(resolve, delay));

      // Republish with incremented retry count
      this.channel!.publish(
        this.config.exchange,
        this.config.routingKey,
        msg.content,
        {
          ...msg.properties,
          headers: {
            ...msg.properties.headers,
            retryCount: retryCount + 1,
            lastError: error.message,
          },
        }
      );

      this.channel!.ack(msg);
    } else {
      // Max retries exceeded, send to DLX
      console.error(`Max retries exceeded for message, sending to DLQ`);
      this.channel!.nack(msg, false, false); // Don't requeue
    }
  }

  private async storeResult(taskId: string, result: string): Promise<void> {
    // Store in database, send webhook, etc.
    console.log(`Storing result for task ${taskId}`);
  }

  async close(): Promise<void> {
    if (this.channel) await this.channel.close();
    if (this.connection) await this.connection.close();
  }
}

// Usage
const consumer = new RabbitMQConsumer({
  url: process.env.RABBITMQ_URL || 'amqp://localhost',
  exchange: 'chatgpt.tasks',
  queue: 'chatgpt.process.queue',
  routingKey: 'chatgpt.process',
  dlxExchange: 'chatgpt.dlx',
  dlxQueue: 'chatgpt.failed.queue',
  maxRetries: 3,
  prefetchCount: 10,
});

await consumer.connect();
await consumer.startConsuming();

Dead Letter Queue Handler

Monitor and reprocess failed messages:

// dlq-handler.ts - Dead Letter Queue Monitor
import * as amqp from 'amqplib';

interface DLQMessage {
  taskId: string;
  originalQueue: string;
  failureReason: string;
  retryCount: number;
  timestamp: number;
  content: any;
}

export class DLQHandler {
  private connection: amqp.Connection | null = null;
  private channel: amqp.Channel | null = null;

  async connect(url: string): Promise<void> {
    this.connection = await amqp.connect(url);
    this.channel = await this.connection.createChannel();
  }

  async inspectDLQ(queueName: string): Promise<DLQMessage[]> {
    if (!this.channel) throw new Error('Not connected');

    const messages: DLQMessage[] = [];
    let msg;

    // Fetch without acking: unacked messages stay invisible to this channel,
    // so each get() advances to the next message instead of re-reading the first
    while ((msg = await this.channel.get(queueName, { noAck: false }))) {
      const dlqMessage: DLQMessage = {
        taskId: msg.properties.messageId,
        originalQueue: msg.properties.headers?.['x-first-death-queue'] || 'unknown',
        failureReason: msg.properties.headers?.lastError || 'unknown',
        retryCount: msg.properties.headers?.retryCount || 0,
        timestamp: msg.properties.timestamp,
        content: JSON.parse(msg.content.toString()),
      };

      messages.push(dlqMessage);
    }

    // Requeue everything we peeked at; nacking with requeue inside the loop
    // would redeliver the same message immediately and never terminate
    this.channel.nackAll(true);

    return messages;
  }

  async requeueMessage(queueName: string, targetExchange: string): Promise<void> {
    if (!this.channel) throw new Error('Not connected');

    const msg = await this.channel.get(queueName, { noAck: false });

    if (msg) {
      // Reset retry count for manual reprocessing
      this.channel.publish(
        targetExchange,
        'chatgpt.process',
        msg.content,
        {
          ...msg.properties,
          headers: {
            ...msg.properties.headers,
            retryCount: 0,
            reprocessed: true,
          },
        }
      );

      this.channel.ack(msg);
      console.log(`Requeued message ${msg.properties.messageId}`);
    }
  }

  async purgeDLQ(queueName: string): Promise<number> {
    if (!this.channel) throw new Error('Not connected');
    const result = await this.channel.purgeQueue(queueName);
    console.log(`Purged ${result.messageCount} messages from ${queueName}`);
    return result.messageCount;
  }
}

For building scalable ChatGPT applications, RabbitMQ provides enterprise-grade reliability.


AWS SQS Integration for Serverless ChatGPT Apps

AWS SQS offers fully managed queueing with zero infrastructure management, perfect for serverless architectures and AWS Lambda integration.

AWS SQS Publisher Implementation

// sqs-publisher.ts - AWS SQS ChatGPT Task Publisher
import { SQSClient, SendMessageCommand, SendMessageBatchCommand, type SendMessageCommandInput } from '@aws-sdk/client-sqs';
import type { ChatGPTTask } from './types';

interface SQSPublisherConfig {
  region: string;
  queueUrl: string;
  fifo?: boolean;
  messageGroupId?: string;
}

export class SQSPublisher {
  private client: SQSClient;
  private config: SQSPublisherConfig;

  constructor(config: SQSPublisherConfig) {
    this.config = config;
    this.client = new SQSClient({ region: config.region });
  }

  async publishTask(task: ChatGPTTask): Promise<string> {
    const messageBody = JSON.stringify(task);

    const params: SendMessageCommandInput = {
      QueueUrl: this.config.queueUrl,
      MessageBody: messageBody,
      MessageAttributes: {
        TaskId: {
          DataType: 'String',
          StringValue: task.taskId,
        },
        UserId: {
          DataType: 'String',
          StringValue: task.userId,
        },
        Model: {
          DataType: 'String',
          StringValue: task.model,
        },
        Priority: {
          DataType: 'Number',
          StringValue: (task.priority || 5).toString(),
        },
      },
    };

    // FIFO queue specific parameters
    if (this.config.fifo) {
      params.MessageGroupId = this.config.messageGroupId || task.userId;
      params.MessageDeduplicationId = task.taskId;
    }

    const command = new SendMessageCommand(params);
    const response = await this.client.send(command);

    console.log(`Published task ${task.taskId} to SQS: ${response.MessageId}`);
    return response.MessageId!;
  }

  async publishBatch(tasks: ChatGPTTask[]): Promise<void> {
    if (tasks.length === 0) return;
    if (tasks.length > 10) {
      throw new Error('AWS SQS batch limit is 10 messages');
    }

    const entries = tasks.map((task, index) => ({
      Id: index.toString(),
      MessageBody: JSON.stringify(task),
      MessageAttributes: {
        TaskId: {
          DataType: 'String',
          StringValue: task.taskId,
        },
        UserId: {
          DataType: 'String',
          StringValue: task.userId,
        },
        Model: {
          DataType: 'String',
          StringValue: task.model,
        },
      },
      ...(this.config.fifo && {
        MessageGroupId: this.config.messageGroupId || task.userId,
        MessageDeduplicationId: task.taskId,
      }),
    }));

    const command = new SendMessageBatchCommand({
      QueueUrl: this.config.queueUrl,
      Entries: entries,
    });

    const response = await this.client.send(command);

    if (response.Failed && response.Failed.length > 0) {
      console.error('Some messages failed:', response.Failed);
    }

    console.log(`Published ${response.Successful?.length || 0} tasks to SQS`);
  }
}

// Usage
const publisher = new SQSPublisher({
  region: 'us-east-1',
  queueUrl: process.env.SQS_QUEUE_URL!,
  fifo: false,
});

await publisher.publishTask({
  taskId: crypto.randomUUID(),
  userId: 'user_123',
  conversationId: 'conv_456',
  messages: [{ role: 'user', content: 'Analyze this data...' }],
  model: 'gpt-4',
  timestamp: Date.now(),
  priority: 7,
});

SQS Consumer with Long Polling

// sqs-consumer.ts - AWS SQS ChatGPT Task Consumer
import {
  SQSClient,
  ReceiveMessageCommand,
  DeleteMessageCommand,
  ChangeMessageVisibilityCommand,
} from '@aws-sdk/client-sqs';
import OpenAI from 'openai';
import type { ChatGPTTask } from './types';

interface SQSConsumerConfig {
  region: string;
  queueUrl: string;
  maxMessages?: number;
  waitTimeSeconds?: number;
  visibilityTimeout?: number;
  maxRetries?: number;
}

export class SQSConsumer {
  private client: SQSClient;
  private config: SQSConsumerConfig;
  private openai: OpenAI;
  private running: boolean = false;

  constructor(config: SQSConsumerConfig) {
    this.config = {
      maxMessages: 10,
      waitTimeSeconds: 20, // Long polling
      visibilityTimeout: 300, // 5 minutes
      maxRetries: 3,
      ...config,
    };

    this.client = new SQSClient({ region: config.region });
    this.openai = new OpenAI({
      apiKey: process.env.OPENAI_API_KEY,
    });
  }

  async startConsuming(): Promise<void> {
    this.running = true;

    while (this.running) {
      try {
        const command = new ReceiveMessageCommand({
          QueueUrl: this.config.queueUrl,
          MaxNumberOfMessages: this.config.maxMessages,
          WaitTimeSeconds: this.config.waitTimeSeconds,
          MessageAttributeNames: ['All'],
          AttributeNames: ['All'],
        });

        const response = await this.client.send(command);

        if (response.Messages && response.Messages.length > 0) {
          console.log(`Received ${response.Messages.length} messages`);

          // Process messages in parallel
          await Promise.all(
            response.Messages.map(msg => this.processMessage(msg))
          );
        }
      } catch (error) {
        console.error('Error polling SQS:', error);
        await new Promise(resolve => setTimeout(resolve, 5000));
      }
    }
  }

  private async processMessage(message: any): Promise<void> {
    try {
      const task = JSON.parse(message.Body) as ChatGPTTask;
      // ApproximateReceiveCount starts at 1 on the first delivery
      const receiveCount = parseInt(
        message.Attributes?.ApproximateReceiveCount || '1'
      );

      console.log(`Processing task ${task.taskId} (attempt ${receiveCount})`);

      // Extend visibility timeout for long-running tasks
      if (task.maxTokens && task.maxTokens > 1000) {
        await this.extendVisibilityTimeout(message.ReceiptHandle, 600);
      }

      // Process with OpenAI
      const result = await this.processTask(task);

      // Delete message on success
      await this.deleteMessage(message.ReceiptHandle);

      console.log(`Task ${task.taskId} completed successfully`);
      await this.storeResult(task.taskId, result);
    } catch (error) {
      console.error('Error processing message:', error);
      await this.handleError(message, error as Error);
    }
  }

  private async processTask(task: ChatGPTTask): Promise<string> {
    const completion = await this.openai.chat.completions.create({
      model: task.model,
      messages: task.messages as any,
      temperature: task.temperature,
      max_tokens: task.maxTokens,
    });

    return completion.choices[0].message.content || '';
  }

  private async extendVisibilityTimeout(
    receiptHandle: string,
    timeoutSeconds: number
  ): Promise<void> {
    const command = new ChangeMessageVisibilityCommand({
      QueueUrl: this.config.queueUrl,
      ReceiptHandle: receiptHandle,
      VisibilityTimeout: timeoutSeconds,
    });

    await this.client.send(command);
  }

  private async deleteMessage(receiptHandle: string): Promise<void> {
    const command = new DeleteMessageCommand({
      QueueUrl: this.config.queueUrl,
      ReceiptHandle: receiptHandle,
    });

    await this.client.send(command);
  }

  private async handleError(message: any, error: Error): Promise<void> {
    const receiveCount = parseInt(
      message.Attributes?.ApproximateReceiveCount || '1'
    );

    if (receiveCount >= this.config.maxRetries!) {
      // Max retries exceeded, delete and log to DLQ
      console.error(`Max retries exceeded, moving to DLQ`);
      await this.sendToDLQ(message);
      await this.deleteMessage(message.ReceiptHandle);
    } else {
      // Message will automatically reappear after visibility timeout
      console.log(`Message will be retried (attempt ${receiveCount + 1})`);
    }
  }

  private async sendToDLQ(message: any): Promise<void> {
    // SQS automatically moves to DLQ if configured via Redrive Policy
    // This is just for custom DLQ handling
    console.log(`Message moved to DLQ: ${message.MessageId}`);
  }

  private async storeResult(taskId: string, result: string): Promise<void> {
    console.log(`Storing result for task ${taskId}`);
  }

  stop(): void {
    this.running = false;
    console.log('Stopping SQS consumer...');
  }
}

// Usage
const consumer = new SQSConsumer({
  region: 'us-east-1',
  queueUrl: process.env.SQS_QUEUE_URL!,
  maxMessages: 10,
  waitTimeSeconds: 20,
  visibilityTimeout: 300,
  maxRetries: 3,
});

await consumer.startConsuming();

Batch Processing Pattern

Process multiple tasks efficiently:

// sqs-batch-processor.ts - Batch Processing for Cost Efficiency
import { SQSClient, ReceiveMessageCommand, DeleteMessageBatchCommand } from '@aws-sdk/client-sqs';
import OpenAI from 'openai';
import type { ChatGPTTask } from './types';

interface BatchResult {
  taskId: string;
  success: boolean;
  result?: string;
  error?: string;
}

export class SQSBatchProcessor {
  private client: SQSClient;
  private openai: OpenAI;

  constructor(region: string) {
    this.client = new SQSClient({ region });
    this.openai = new OpenAI({
      apiKey: process.env.OPENAI_API_KEY,
    });
  }

  async processBatch(queueUrl: string): Promise<BatchResult[]> {
    // Receive up to 10 messages
    const receiveCommand = new ReceiveMessageCommand({
      QueueUrl: queueUrl,
      MaxNumberOfMessages: 10,
      WaitTimeSeconds: 20,
    });

    const response = await this.client.send(receiveCommand);

    if (!response.Messages || response.Messages.length === 0) {
      return [];
    }

    console.log(`Processing batch of ${response.Messages.length} messages`);

    // Process all messages in parallel
    const results = await Promise.allSettled(
      response.Messages.map(async (msg) => {
        const task = JSON.parse(msg.Body!) as ChatGPTTask;

        const completion = await this.openai.chat.completions.create({
          model: task.model,
          messages: task.messages as any,
          temperature: task.temperature,
          max_tokens: task.maxTokens,
        });

        return {
          taskId: task.taskId,
          success: true,
          result: completion.choices[0].message.content || '',
          receiptHandle: msg.ReceiptHandle!,
        };
      })
    );

    // Batch delete successful messages
    const successfulHandles = results
      .filter((r) => r.status === 'fulfilled')
      .map((r: any) => r.value.receiptHandle);

    if (successfulHandles.length > 0) {
      await this.batchDelete(queueUrl, successfulHandles);
    }

    // Return results
    return results.map((r, idx) => {
      if (r.status === 'fulfilled') {
        return {
          taskId: r.value.taskId,
          success: r.value.success,
          result: r.value.result,
        };
      } else {
        const task = JSON.parse(response.Messages![idx].Body!) as ChatGPTTask;
        return {
          taskId: task.taskId,
          success: false,
          error: r.reason.message,
        };
      }
    });
  }

  private async batchDelete(queueUrl: string, receiptHandles: string[]): Promise<void> {
    const entries = receiptHandles.map((handle, idx) => ({
      Id: idx.toString(),
      ReceiptHandle: handle,
    }));

    const command = new DeleteMessageBatchCommand({
      QueueUrl: queueUrl,
      Entries: entries,
    });

    await this.client.send(command);
    console.log(`Deleted ${entries.length} messages in batch`);
  }
}

For serverless architectures, see Building Event-Driven ChatGPT Apps.


Advanced Message Queue Patterns

Priority Queue Implementation

Process urgent tasks first:

// priority-queue.ts - RabbitMQ Priority Queue
import * as amqp from 'amqplib';
import type { ChatGPTTask } from './types';

export class PriorityQueueManager {
  private connection: amqp.Connection | null = null;
  private channel: amqp.Channel | null = null;

  async setupPriorityQueue(queueName: string, maxPriority: number = 10): Promise<void> {
    this.connection = await amqp.connect(process.env.RABBITMQ_URL!);
    this.channel = await this.connection.createChannel();

    // Declare priority queue
    await this.channel.assertQueue(queueName, {
      durable: true,
      arguments: {
        'x-max-priority': maxPriority,
      },
    });

    console.log(`Priority queue ${queueName} created with max priority ${maxPriority}`);
  }

  async publishWithPriority(
    queueName: string,
    task: ChatGPTTask,
    priority: number
  ): Promise<void> {
    if (!this.channel) throw new Error('Not connected');

    const messageBuffer = Buffer.from(JSON.stringify(task));

    this.channel.sendToQueue(queueName, messageBuffer, {
      persistent: true,
      priority: Math.min(priority, 10),
      messageId: task.taskId,
    });

    console.log(`Published task ${task.taskId} with priority ${priority}`);
  }

  async consumePriority(queueName: string): Promise<void> {
    if (!this.channel) throw new Error('Not connected');

    await this.channel.consume(
      queueName,
      async (msg) => {
        if (!msg) return;

        const task = JSON.parse(msg.content.toString()) as ChatGPTTask;
        const priority = msg.properties.priority || 5;

        console.log(`Processing task ${task.taskId} (priority ${priority})`);

        // Process task...
        await this.processTask(task);

        this.channel!.ack(msg);
      },
      { noAck: false }
    );
  }

  private async processTask(task: ChatGPTTask): Promise<void> {
    // OpenAI processing logic
  }
}

// Usage: Urgent tasks get processed first
const priorityQueue = new PriorityQueueManager();
await priorityQueue.setupPriorityQueue('chatgpt.priority.queue', 10);

// High priority (customer support)
await priorityQueue.publishWithPriority('chatgpt.priority.queue', task1, 10);

// Medium priority (regular requests)
await priorityQueue.publishWithPriority('chatgpt.priority.queue', task2, 5);

// Low priority (batch processing)
await priorityQueue.publishWithPriority('chatgpt.priority.queue', task3, 1);

Delayed Message Queue

Schedule tasks for future execution:

// delayed-queue.ts - SQS Delay Queue
import { SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs';
import type { ChatGPTTask } from './types';

export class DelayedMessageQueue {
  private client: SQSClient;
  private queueUrl: string;

  constructor(region: string, queueUrl: string) {
    this.client = new SQSClient({ region });
    this.queueUrl = queueUrl;
  }

  async scheduleTask(task: ChatGPTTask, delaySeconds: number): Promise<string> {
    if (delaySeconds > 900) {
      throw new Error('SQS max delay is 900 seconds (15 minutes)');
    }

    const command = new SendMessageCommand({
      QueueUrl: this.queueUrl,
      MessageBody: JSON.stringify(task),
      DelaySeconds: delaySeconds,
      MessageAttributes: {
        ScheduledFor: {
          DataType: 'Number',
          StringValue: (Date.now() + delaySeconds * 1000).toString(),
        },
      },
    });

    const response = await this.client.send(command);
    console.log(`Scheduled task ${task.taskId} for ${delaySeconds}s delay`);

    return response.MessageId!;
  }

  async scheduleForTimestamp(task: ChatGPTTask, timestamp: number): Promise<string> {
    const delaySeconds = Math.max(0, Math.floor((timestamp - Date.now()) / 1000));

    if (delaySeconds > 900) {
      // For longer delays, use EventBridge Scheduler
      return this.scheduleWithEventBridge(task, timestamp);
    }

    return this.scheduleTask(task, delaySeconds);
  }

  private async scheduleWithEventBridge(task: ChatGPTTask, timestamp: number): Promise<string> {
    // EventBridge Scheduler for delays > 15 minutes
    console.log('Using EventBridge for long-term scheduling');
    // Implementation depends on EventBridge setup
    return 'eventbridge_' + task.taskId;
  }
}

// Usage
const delayQueue = new DelayedMessageQueue('us-east-1', process.env.SQS_QUEUE_URL!);

// Schedule task for 5 minutes from now
await delayQueue.scheduleTask(task, 300);

// Schedule for specific timestamp (e.g., next hour)
const nextHour = Date.now() + 3600000;
await delayQueue.scheduleForTimestamp(task, nextHour);

Message Deduplication

Prevent duplicate processing:

// deduplication.ts - Message Deduplication
import { createHash } from 'crypto';
import type { ChatGPTTask } from './types';

interface DeduplicationCache {
  set(key: string, ttlSeconds: number): Promise<boolean>;
  has(key: string): Promise<boolean>;
}

// Redis-based implementation
import { createClient } from 'redis';

export class RedisDeduplication implements DeduplicationCache {
  private client: ReturnType<typeof createClient>;

  constructor(redisUrl: string) {
    this.client = createClient({ url: redisUrl });
    // connect() is async; in production, await it before the first cache call
    void this.client.connect();
  }

  async set(key: string, ttlSeconds: number): Promise<boolean> {
    const result = await this.client.set(key, '1', {
      NX: true, // Only set if not exists
      EX: ttlSeconds,
    });

    return result === 'OK';
  }

  async has(key: string): Promise<boolean> {
    const exists = await this.client.exists(key);
    return exists === 1;
  }
}

export class MessageDeduplicator {
  private cache: DeduplicationCache;

  constructor(cache: DeduplicationCache) {
    this.cache = cache;
  }

  generateMessageHash(task: ChatGPTTask): string {
    // Hash based on content (not taskId to catch duplicates)
    const content = JSON.stringify({
      userId: task.userId,
      conversationId: task.conversationId,
      messages: task.messages,
      model: task.model,
    });

    return createHash('sha256').update(content).digest('hex');
  }

  async isDuplicate(task: ChatGPTTask, ttlSeconds: number = 300): Promise<boolean> {
    const hash = this.generateMessageHash(task);

    // SET NX is atomic: only the first writer succeeds, so two workers
    // checking the same message cannot both pass (a has-then-set check races)
    const firstSeen = await this.cache.set(hash, ttlSeconds);

    if (firstSeen) {
      return false;
    }

    console.log(`Duplicate detected: ${task.taskId}`);
    return true;
  }
}

// Usage
const deduplicator = new MessageDeduplicator(
  new RedisDeduplication(process.env.REDIS_URL!)
);

// Before processing (inside a RabbitMQ consume callback, where
// `channel` and `msg` are in scope)
if (await deduplicator.isDuplicate(task, 300)) {
  console.log('Skipping duplicate task');
  channel.ack(msg);
  return;
}

// Process task...

For complex workflows, see Multi-Agent Orchestration Patterns.


Monitoring & Scaling Message Queues

Queue Metrics Collector

Monitor queue health with Prometheus:

// queue-metrics.ts - Prometheus Queue Metrics
import { Registry, Gauge, Counter, Histogram } from 'prom-client';
import { SQSClient, GetQueueAttributesCommand } from '@aws-sdk/client-sqs';

export class QueueMetricsCollector {
  private register: Registry;
  private queueDepthGauge: Gauge;
  private messagesProcessedCounter: Counter;
  private processingDurationHistogram: Histogram;

  constructor() {
    this.register = new Registry();

    this.queueDepthGauge = new Gauge({
      name: 'queue_depth',
      help: 'Number of messages waiting in queue',
      labelNames: ['queue_name'],
      registers: [this.register],
    });

    this.messagesProcessedCounter = new Counter({
      name: 'messages_processed_total',
      help: 'Total messages processed',
      labelNames: ['queue_name', 'status'],
      registers: [this.register],
    });

    this.processingDurationHistogram = new Histogram({
      name: 'message_processing_duration_seconds',
      help: 'Message processing duration',
      labelNames: ['queue_name'],
      buckets: [0.1, 0.5, 1, 2, 5, 10, 30, 60],
      registers: [this.register],
    });
  }

  async collectSQSMetrics(queueUrl: string, queueName: string): Promise<void> {
    const client = new SQSClient({ region: 'us-east-1' });

    const command = new GetQueueAttributesCommand({
      QueueUrl: queueUrl,
      AttributeNames: [
        'ApproximateNumberOfMessages',
        'ApproximateNumberOfMessagesNotVisible',
      ],
    });

    const response = await client.send(command);

    const depth = parseInt(
      response.Attributes?.ApproximateNumberOfMessages || '0'
    );

    this.queueDepthGauge.set({ queue_name: queueName }, depth);

    console.log(`Queue ${queueName} depth: ${depth}`);
  }

  recordMessageProcessed(queueName: string, success: boolean, durationMs: number): void {
    this.messagesProcessedCounter.inc({
      queue_name: queueName,
      status: success ? 'success' : 'failure',
    });

    this.processingDurationHistogram.observe(
      { queue_name: queueName },
      durationMs / 1000
    );
  }

  get contentType(): string {
    return this.register.contentType;
  }

  // prom-client v13+ returns metrics asynchronously
  getMetrics(): Promise<string> {
    return this.register.metrics();
  }
}

// Usage with Express endpoint
import express from 'express';

const app = express();
const metrics = new QueueMetricsCollector();

app.get('/metrics', async (req, res) => {
  res.set('Content-Type', metrics.contentType);
  res.send(await metrics.getMetrics());
});

// Collect metrics every 30 seconds
setInterval(async () => {
  await metrics.collectSQSMetrics(
    process.env.SQS_QUEUE_URL!,
    'chatgpt-tasks'
  );
}, 30000);

Auto-Scaling Consumers

Scale consumers based on queue depth:

AWS Lambda + SQS: Lambda's SQS event source mapping scales consumers automatically with queue depth, from zero up to your account's concurrency limit (1,000 by default).
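
A minimal sketch of what that consumer looks like, assuming the @types/aws-lambda definitions and an event source mapping configured with ReportBatchItemFailures:

// lambda-sqs-handler.ts - SQS-triggered Lambda with partial-batch failures
import type { SQSEvent, SQSBatchResponse } from 'aws-lambda';

export const handler = async (event: SQSEvent): Promise<SQSBatchResponse> => {
  const batchItemFailures: { itemIdentifier: string }[] = [];

  for (const record of event.Records) {
    try {
      const task = JSON.parse(record.body);
      // ... process with OpenAI here ...
      console.log(`Processed task ${task.taskId}`);
    } catch (err) {
      // Report only the failed message; the rest of the batch is deleted
      batchItemFailures.push({ itemIdentifier: record.messageId });
    }
  }

  return { batchItemFailures };
};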

Kubernetes HPA: Scale pods based on queue metrics:

apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: chatgpt-consumer-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: chatgpt-consumer
  minReplicas: 2
  maxReplicas: 20
  metrics:
  - type: External
    external:
      metric:
        name: sqs_queue_depth
        selector:
          matchLabels:
            queue_name: "chatgpt-tasks"
      target:
        type: AverageValue
        averageValue: "100"

Docker Swarm: Scale services based on custom metrics from Prometheus.

For production deployment strategies, see Deploying ChatGPT Apps to Production.


Production Deployment Checklist

RabbitMQ Production Setup:

  • ✅ Enable persistent messages and durable queues
  • ✅ Configure dead letter exchanges for failed messages
  • ✅ Set prefetch count based on processing capacity (10-50)
  • ✅ Implement connection pooling and heartbeat monitoring
  • ✅ Use confirm mode for critical messages
  • ✅ Monitor queue depth and consumer lag
  • ✅ Setup high availability cluster (3+ nodes)
  • ✅ Configure memory and disk alarms
  • ✅ Implement message TTL for stale tasks

AWS SQS Production Setup:

  • ✅ Use FIFO queues when strict ordering or exactly-once processing is required
  • ✅ Configure DLQ with appropriate maxReceiveCount (3-5)
  • ✅ Set visibility timeout > max processing time
  • ✅ Enable server-side encryption (SSE)
  • ✅ Implement long polling (20 seconds)
  • ✅ Monitor CloudWatch metrics (ApproximateNumberOfMessages)
  • ✅ Set up alarms for queue depth thresholds
  • ✅ Use batch operations to reduce API costs
  • ✅ Implement message deduplication

Common Best Practices:

  • ✅ Implement idempotent message handlers
  • ✅ Add structured logging with correlation IDs
  • ✅ Monitor processing duration and error rates
  • ✅ Implement circuit breakers for external dependencies
  • ✅ Use exponential backoff for retries
  • ✅ Encrypt sensitive data in message payloads
  • ✅ Implement graceful shutdown handling (see the sketch after this checklist)
  • ✅ Add health check endpoints
  • ✅ Document queue naming conventions and routing keys
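
Graceful shutdown, referenced above, is worth spelling out. A minimal sketch using the SQSConsumer class from this guide; the 10-second drain window is an assumption you would tune to your workload:

// graceful-shutdown.ts - drain in-flight work before the process exits
// Assumes the SQSConsumer class defined earlier in this guide
const consumer = new SQSConsumer({
  region: 'us-east-1',
  queueUrl: process.env.SQS_QUEUE_URL!,
});

void consumer.startConsuming();

let shuttingDown = false;

async function shutdown(signal: string): Promise<void> {
  if (shuttingDown) return;
  shuttingDown = true;

  console.log(`${signal} received, stopping message polling...`);
  consumer.stop(); // stop the receive loop; messages already received keep processing

  // Assumed drain window: give in-flight handlers time to delete their messages.
  // Unfinished messages simply reappear after the visibility timeout.
  await new Promise(resolve => setTimeout(resolve, 10_000));
  process.exit(0);
}

process.on('SIGTERM', () => void shutdown('SIGTERM'));
process.on('SIGINT', () => void shutdown('SIGINT'));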

Conclusion: Building Reliable ChatGPT Apps with Message Queues

Message queues transform ChatGPT applications from fragile request-response systems into resilient, scalable platforms. By decoupling AI processing from user interactions, you unlock:

Reliability: Automatic retries ensure no user request is lost due to temporary failures.

Scalability: Process thousands of concurrent ChatGPT requests by adding consumers.

Performance: Users get instant responses while AI processing happens asynchronously.

Cost efficiency: Batch processing and rate limiting optimize OpenAI API usage.

Observability: Queue metrics reveal bottlenecks before they impact users.

Key takeaways:

  1. Choose RabbitMQ for self-hosted deployments needing advanced routing and priority queues
  2. Choose AWS SQS for serverless architectures and zero infrastructure management
  3. Always use persistent messages with manual acknowledgment for production workloads
  4. Implement DLQs to capture and analyze failed messages
  5. Monitor queue depth as a leading indicator of system health
  6. Scale consumers based on queue backlog, not CPU/memory

Start simple with a basic producer-consumer pattern, then add complexity (priority queues, delayed messages, deduplication) as your ChatGPT app scales.

For architectural guidance on building production ChatGPT apps, see our comprehensive ChatGPT Applications Guide.


Build Your ChatGPT App with MakeAIHQ

Ready to deploy production-grade ChatGPT apps with built-in message queue integration?

MakeAIHQ provides:

  • ✅ Pre-configured RabbitMQ and SQS integrations
  • ✅ Auto-scaling consumer infrastructure
  • ✅ Built-in retry logic and DLQ monitoring
  • ✅ One-click deployment to ChatGPT App Store
  • ✅ Real-time queue metrics dashboard

Start building your ChatGPT app today →

No infrastructure management. No queue configuration. Just ship reliable AI products.

Free tier includes: 1 app, 1,000 queued tasks/month, RabbitMQ or SQS integration.

Questions? Contact our solutions team for architecture consulting.