Database Sharding Strategies for ChatGPT Apps

As ChatGPT applications scale to millions of users, database sharding becomes essential for maintaining performance and availability. This comprehensive guide explores production-ready sharding strategies specifically designed for ChatGPT app architectures, from shard key selection to cross-shard query optimization.

Understanding Database Sharding for ChatGPT Applications

Database sharding is a horizontal partitioning strategy that distributes data across multiple database instances (shards) to overcome the scalability limitations of single-server architectures. For ChatGPT applications handling conversational state, user contexts, and tool execution results, sharding enables you to scale beyond the I/O, storage, and memory constraints of a single database server.

The fundamental principle is simple: instead of storing all data in one database, you partition it across multiple databases based on a shard key. Each shard operates independently, handling a subset of your total data and query load. This approach delivers horizontal scalability—you can add more shards as your user base grows without architectural changes.

For ChatGPT apps, sharding is particularly valuable because conversational workloads often exhibit natural partitioning boundaries. User sessions, conversation histories, and tool execution contexts are typically isolated by user ID or tenant ID, making them ideal candidates for shard-based distribution. Unlike traditional web applications where data relationships might span the entire database, ChatGPT apps often operate within bounded contexts that align well with shard boundaries.

However, sharding introduces complexity. You must handle cross-shard queries, maintain consistency across shards, and manage the operational overhead of multiple database instances. The strategies in this guide will help you implement sharding correctly the first time, avoiding common pitfalls that lead to hotspots, uneven distribution, and performance degradation.

Shard Key Selection: The Foundation of Sharding Success

The shard key is the single most important decision in your sharding strategy. It determines how data is distributed across shards, which directly impacts query performance, distribution evenness, and operational complexity. For ChatGPT applications, the shard key must align with your access patterns while ensuring even distribution.

User ID as Shard Key: The most common approach for ChatGPT apps is sharding by user ID. Since most queries access data for a specific user (conversation history, user preferences, session state), user-based sharding ensures that all related data resides on the same shard, avoiding expensive cross-shard queries. This works well for B2C applications where user distribution is naturally even.

Tenant ID for Multi-Tenant Architectures: If you're building a multi-tenant ChatGPT platform where businesses deploy custom apps, tenant ID is the optimal shard key. This keeps all data for a single tenant on one shard, simplifying queries and maintaining tenant isolation. However, be cautious of tenant size imbalance—large enterprise tenants might create hotspots.

Composite Keys for Complex Access Patterns: Some ChatGPT apps require composite shard keys combining multiple dimensions (e.g., tenantId + region or userId + conversationDate). Composite keys enable more granular distribution but increase routing complexity. Use them when simple keys lead to uneven distribution or when you need geographic data locality.

Distribution Analysis: Before finalizing your shard key, analyze your data distribution. Calculate the cardinality (number of unique values), examine the distribution skew (are some values far more common than others?), and project growth patterns. A good shard key has high cardinality (millions of unique values), even distribution (no single value accounts for more than 1-2% of total data), and growth independence (adding users doesn't concentrate load on specific shards).
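
Before committing, run a quick distribution check over a sample of candidate key values. The sketch below is illustrative; sampleKeys stands in for values pulled from your production data:

// Sketch: estimate cardinality and skew for a candidate shard key.
// `sampleKeys` is assumed to be a representative sample of key values.
function analyzeShardKeyDistribution(sampleKeys: string[]) {
  const counts = new Map<string, number>();
  for (const key of sampleKeys) {
    counts.set(key, (counts.get(key) ?? 0) + 1);
  }

  const maxCount = Math.max(...counts.values());
  const maxShare = maxCount / sampleKeys.length;

  return {
    cardinality: counts.size,
    maxShare,
    acceptable: maxShare <= 0.02, // no single value above ~2% of the sample
  };
}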

Avoiding Hotspots: Hotspots occur when certain shards receive disproportionate traffic. Common causes include celebrity users (one user ID generates 100x more requests), trending topics (all requests hit shards containing trending conversation data), or time-based keys (today's shard gets all writes). Mitigate hotspots by hashing sequential or time-based keys to spread writes evenly, salting known hot keys with a random suffix (at the cost of fanning reads out across the suffixes), or implementing dynamic shard splitting.

Immutability Requirement: Once chosen, shard keys are extremely difficult to change. Migrating to a new shard key requires moving terabytes of data and potentially causes downtime. Choose carefully by analyzing access patterns, running distribution simulations, and consulting with database experts before committing to production.

Sharding Algorithms: Hash, Range, and Directory-Based

The sharding algorithm determines how shard keys map to physical shards. Each algorithm offers different trade-offs between distribution quality, query routing simplicity, and operational flexibility.

Hash-Based Sharding: This algorithm applies a hash function to the shard key and uses modulo arithmetic to assign the result to a shard. For example, shard = hash(userId) % numberOfShards. Hash-based sharding delivers excellent distribution evenness—keys are randomly distributed across shards regardless of their semantic meaning. This prevents hotspots caused by sequential IDs or timestamp-based keys.
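
In code, the routing formula takes only a few lines. Here is a minimal sketch using Node's crypto module; the key and eight-shard count are arbitrary assumptions:

import { createHash } from 'crypto';

// Sketch: hash-modulo routing. The same key always maps to the same shard.
function hashToShard(shardKey: string, numberOfShards: number): number {
  const digest = createHash('sha256').update(shardKey).digest();
  // Interpret the first 4 bytes as an unsigned 32-bit integer
  return digest.readUInt32BE(0) % numberOfShards;
}

const shard = hashToShard('user-42', 8); // deterministic, evenly distributed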

The primary advantage is simplicity and predictability. Any service can calculate the target shard independently without consulting a central directory. This makes hash-based sharding ideal for stateless ChatGPT app architectures where multiple API instances need to route queries.

However, hash-based sharding has a critical weakness: adding or removing shards requires rehashing a large portion of your data. If you have 8 shards and add a 9th, changing the modulo base from 8 to 9 remaps roughly 8 out of every 9 keys to different shards. This triggers massive data migrations. Use consistent hashing (described below) to mitigate this issue.

Range-Based Sharding: This algorithm assigns contiguous ranges of shard key values to specific shards. For example, users A-M go to Shard 1, N-Z to Shard 2. Range-based sharding enables efficient range queries—if you need all conversations from users A-C, you only query Shard 1.

Range-based sharding works well when your access patterns frequently involve range scans (e.g., "get all conversations from the last week" or "find users in geographic region X"). It also simplifies adding shards—you can split existing ranges without moving data from other shards.

The disadvantage is distribution risk. If your user base skews toward certain letter ranges (more users with names starting with "A" than "Z"), you'll get uneven load distribution. Range boundaries must be carefully chosen based on actual data distribution, not assumed uniformity.
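
A minimal range router might hold half-open ranges and scan for the one containing the key. The boundaries below are illustrative, not measured:

interface ShardRange {
  shardId: string;
  startKey: string; // inclusive
  endKey: string;   // exclusive
}

// Sketch: route a key to the shard owning its range
function rangeToShard(key: string, ranges: ShardRange[]): string {
  for (const range of ranges) {
    if (key >= range.startKey && key < range.endKey) {
      return range.shardId;
    }
  }
  throw new Error(`No shard range covers key ${key}`);
}

// Illustrative boundaries; derive real ones from measured data distribution
const ranges: ShardRange[] = [
  { shardId: 'shard-1', startKey: 'a', endKey: 'n' },  // keys starting a-m
  { shardId: 'shard-2', startKey: 'n', endKey: '{' },  // keys starting n-z ('{' follows 'z' in ASCII)
];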

Directory-Based Sharding: This algorithm maintains a lookup table (directory) mapping shard keys to shards. The directory can use any logic—hash-based, range-based, or custom rules. For example, you might assign premium users to dedicated high-performance shards while distributing free users across commodity shards.

Directory-based sharding offers maximum flexibility. You can implement custom distribution logic, migrate individual users between shards without affecting others, and rebalance load dynamically. It's ideal for ChatGPT apps with heterogeneous users (enterprise vs. free tier) or evolving access patterns.

The trade-off is operational complexity. The directory becomes a critical dependency—if it's unavailable, no queries can be routed. You must ensure the directory is highly available, cached aggressively, and updated atomically during shard assignments. Directory-based sharding is recommended only for large-scale ChatGPT platforms with dedicated database operations teams.
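
A directory lookup typically pairs a durable mapping store with an aggressive in-memory cache. This sketch assumes a generic DirectoryStore interface and a TTL cache; both are stand-ins, not a prescribed design:

// Sketch: directory-based routing with a TTL cache in front of a durable store
interface DirectoryStore {
  lookup(key: string): Promise<string | undefined>;
}

class ShardDirectory {
  private cache = new Map<string, { shardId: string; expiresAt: number }>();

  constructor(
    private store: DirectoryStore,
    private ttlMs: number = 60_000
  ) {}

  async getShardId(key: string): Promise<string> {
    const cached = this.cache.get(key);
    if (cached && cached.expiresAt > Date.now()) {
      return cached.shardId;
    }

    const shardId = await this.store.lookup(key);
    if (!shardId) {
      throw new Error(`No directory entry for key ${key}`);
    }

    this.cache.set(key, { shardId, expiresAt: Date.now() + this.ttlMs });
    return shardId;
  }
}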

Consistent Hashing: A specialized hash-based algorithm that minimizes data movement when adding/removing shards. Instead of hash(key) % shards, consistent hashing maps both keys and shards onto a circular hash space. Each key belongs to the first shard encountered when moving clockwise around the circle. Adding a shard only affects keys in the adjacent range, on average about 1/(N+1) of the data when adding a shard to N existing shards. This makes consistent hashing the industry standard for dynamic sharding environments.

Production-Ready Shard Router Implementation

Here's a production-grade shard router implementing consistent hashing with connection pooling, health checks, and automatic failover:

import { createHash } from 'crypto';
import { Pool, PoolClient } from 'pg';

interface ShardConfig {
  shardId: string;
  host: string;
  port: number;
  database: string;
  maxConnections: number;
}

interface VirtualNode {
  hash: number;
  shardId: string;
}

export class ShardRouter {
  private pools: Map<string, Pool> = new Map();
  private virtualNodes: VirtualNode[] = [];
  private readonly virtualNodesPerShard = 150; // Higher number = better distribution
  private healthCheckInterval!: NodeJS.Timeout; // assigned in startHealthChecks()
  private unhealthyShards: Set<string> = new Set();

  constructor(private shardConfigs: ShardConfig[]) {
    this.initializePools();
    this.initializeConsistentHashing();
    this.startHealthChecks();
  }

  /**
   * Initialize connection pools for each shard
   */
  private initializePools(): void {
    for (const config of this.shardConfigs) {
      const pool = new Pool({
        host: config.host,
        port: config.port,
        database: config.database,
        max: config.maxConnections,
        idleTimeoutMillis: 30000,
        connectionTimeoutMillis: 5000,
      });

      // Handle pool errors
      pool.on('error', (err) => {
        console.error(`Pool error for shard ${config.shardId}:`, err);
        this.markShardUnhealthy(config.shardId);
      });

      this.pools.set(config.shardId, pool);
    }
  }

  /**
   * Initialize consistent hashing ring with virtual nodes
   */
  private initializeConsistentHashing(): void {
    for (const config of this.shardConfigs) {
      // Create multiple virtual nodes per physical shard for better distribution
      for (let i = 0; i < this.virtualNodesPerShard; i++) {
        const virtualKey = `${config.shardId}-vnode-${i}`;
        const hash = this.hashFunction(virtualKey);
        this.virtualNodes.push({ hash, shardId: config.shardId });
      }
    }

    // Sort virtual nodes by hash for efficient binary search
    this.virtualNodes.sort((a, b) => a.hash - b.hash);
  }

  /**
   * Hash function using SHA256
   */
  private hashFunction(key: string): number {
    const hash = createHash('sha256').update(key).digest();
    // Use first 4 bytes as 32-bit integer
    return hash.readUInt32BE(0);
  }

  /**
   * Get shard ID for a given key using consistent hashing
   */
  public getShardId(key: string): string {
    const keyHash = this.hashFunction(key);

    // Binary search to find first virtual node >= keyHash
    let left = 0;
    let right = this.virtualNodes.length - 1;

    while (left < right) {
      const mid = Math.floor((left + right) / 2);
      if (this.virtualNodes[mid].hash < keyHash) {
        left = mid + 1;
      } else {
        right = mid;
      }
    }

    // Wrap around if we're past the end
    const targetNode = this.virtualNodes[left].hash >= keyHash
      ? this.virtualNodes[left]
      : this.virtualNodes[0];

    // If primary shard is unhealthy, find next healthy shard
    if (this.unhealthyShards.has(targetNode.shardId)) {
      return this.findNextHealthyShard(left);
    }

    return targetNode.shardId;
  }

  /**
   * Find next healthy shard in the ring
   */
  private findNextHealthyShard(startIndex: number): string {
    const maxAttempts = this.virtualNodes.length;
    let currentIndex = startIndex;

    for (let i = 0; i < maxAttempts; i++) {
      currentIndex = (currentIndex + 1) % this.virtualNodes.length;
      const shardId = this.virtualNodes[currentIndex].shardId;

      if (!this.unhealthyShards.has(shardId)) {
        return shardId;
      }
    }

    throw new Error('No healthy shards available');
  }

  /**
   * Execute query on the appropriate shard
   */
  public async executeQuery<T = any>(
    key: string,
    query: string,
    params: any[] = []
  ): Promise<T[]> {
    const shardId = this.getShardId(key);
    const pool = this.pools.get(shardId);

    if (!pool) {
      throw new Error(`No pool found for shard ${shardId}`);
    }

    try {
      const result = await pool.query(query, params);
      return result.rows as T[];
    } catch (error) {
      console.error(`Query failed on shard ${shardId}:`, error);
      this.markShardUnhealthy(shardId);
      throw error;
    }
  }

  /**
   * Execute transaction on a single shard
   */
  public async executeTransaction<T>(
    key: string,
    callback: (client: PoolClient) => Promise<T>
  ): Promise<T> {
    const shardId = this.getShardId(key);
    const pool = this.pools.get(shardId);

    if (!pool) {
      throw new Error(`No pool found for shard ${shardId}`);
    }

    const client = await pool.connect();

    try {
      await client.query('BEGIN');
      const result = await callback(client);
      await client.query('COMMIT');
      return result;
    } catch (error) {
      await client.query('ROLLBACK');
      throw error;
    } finally {
      client.release();
    }
  }

  /**
   * Periodic health checks for all shards
   */
  private startHealthChecks(): void {
    this.healthCheckInterval = setInterval(async () => {
      for (const [shardId, pool] of this.pools.entries()) {
        try {
          await pool.query('SELECT 1');
          // If query succeeds, mark shard as healthy
          if (this.unhealthyShards.has(shardId)) {
            console.log(`Shard ${shardId} recovered`);
            this.unhealthyShards.delete(shardId);
          }
        } catch (error) {
          console.error(`Health check failed for shard ${shardId}:`, error);
          this.markShardUnhealthy(shardId);
        }
      }
    }, 30000); // Check every 30 seconds
  }

  /**
   * Mark shard as unhealthy
   */
  private markShardUnhealthy(shardId: string): void {
    if (!this.unhealthyShards.has(shardId)) {
      console.warn(`Marking shard ${shardId} as unhealthy`);
      this.unhealthyShards.add(shardId);
    }
  }

  /**
   * Get all shard IDs for cross-shard operations
   */
  public getAllShardIds(): string[] {
    return Array.from(this.pools.keys()).filter(
      id => !this.unhealthyShards.has(id)
    );
  }

  /**
   * Get the connection pool for a specific shard (used by cross-shard operations)
   */
  public getPool(shardId: string): Pool | undefined {
    return this.pools.get(shardId);
  }

  /**
   * Cleanup resources
   */
  public async destroy(): Promise<void> {
    clearInterval(this.healthCheckInterval);

    const closePromises = Array.from(this.pools.values()).map(pool =>
      pool.end()
    );

    await Promise.all(closePromises);
    this.pools.clear();
  }
}
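
Usage might look like the sketch below; the hostnames, database name, and conversations table are placeholder assumptions:

// Sketch: wiring the router (hosts and schema are assumptions)
async function main(): Promise<void> {
  const router = new ShardRouter([
    { shardId: 'shard-1', host: 'db-1.internal', port: 5432, database: 'chatgpt_apps', maxConnections: 20 },
    { shardId: 'shard-2', host: 'db-2.internal', port: 5432, database: 'chatgpt_apps', maxConnections: 20 },
  ]);

  // All data for this user lives on one shard, so a single-shard query suffices
  const rows = await router.executeQuery(
    'user-42',
    'SELECT * FROM conversations WHERE user_id = $1 ORDER BY created_at DESC LIMIT 10',
    ['user-42']
  );
  console.log(rows);

  await router.destroy();
}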

Hash-Based Shard Key Generator

This implementation ensures consistent shard assignment with built-in hotspot detection:

interface ShardKeyMetrics {
  key: string;
  shardId: string;
  requestCount: number;
  firstAccessed: Date;
  lastAccessed: Date;
}

export class ShardKeyGenerator {
  private metricsMap: Map<string, ShardKeyMetrics> = new Map();
  private readonly hotspotThreshold = 1000; // Requests per minute
  private readonly metricsWindowMs = 60000; // 1 minute

  constructor(private router: ShardRouter) {
    this.startMetricsCleanup();
  }

  /**
   * Generate shard key from user ID with entropy injection
   */
  public generateUserShardKey(userId: string, options?: {
    addEntropy?: boolean;
    tenantId?: string;
  }): string {
    let key = userId;

    // Add tenant prefix for multi-tenant isolation
    if (options?.tenantId) {
      key = `${options.tenantId}:${userId}`;
    }

    // Inject entropy for known hotspot users
    if (options?.addEntropy && this.isHotspot(key)) {
      const entropy = Math.floor(Math.random() * 10);
      key = `${key}-${entropy}`;
    }

    return key;
  }

  /**
   * Generate shard key from conversation ID
   */
  public generateConversationShardKey(
    conversationId: string,
    userId: string
  ): string {
    // Co-locate conversation with user for query efficiency
    return this.generateUserShardKey(userId);
  }

  /**
   * Generate time-based shard key with bucketing
   */
  public generateTimeSeriesKey(
    userId: string,
    timestamp: Date,
    bucketSizeMs: number = 3600000 // 1 hour buckets
  ): string {
    const bucket = Math.floor(timestamp.getTime() / bucketSizeMs);
    return `${userId}:${bucket}`;
  }

  /**
   * Track metrics for hotspot detection
   */
  public recordAccess(key: string): void {
    const shardId = this.router.getShardId(key);
    const existing = this.metricsMap.get(key);
    const now = new Date();

    if (existing) {
      // Start a fresh window once the previous one has expired
      if (now.getTime() - existing.firstAccessed.getTime() > this.metricsWindowMs) {
        existing.requestCount = 0;
        existing.firstAccessed = now;
      }
      existing.requestCount++;
      existing.lastAccessed = now;
    } else {
      this.metricsMap.set(key, {
        key,
        shardId,
        requestCount: 1,
        firstAccessed: now,
        lastAccessed: now,
      });
    }
  }

  /**
   * Check if key is a hotspot
   */
  private isHotspot(key: string): boolean {
    const metrics = this.metricsMap.get(key);
    if (!metrics) return false;

    const windowAge = Date.now() - metrics.firstAccessed.getTime();
    if (windowAge > this.metricsWindowMs) return false;

    // Require a minimum observation period to avoid spurious rate spikes
    if (windowAge < 1000) return false;

    const requestsPerMinute = (metrics.requestCount / windowAge) * 60000;
    return requestsPerMinute > this.hotspotThreshold;
  }

  /**
   * Get hotspot report
   */
  public getHotspots(): ShardKeyMetrics[] {
    const now = Date.now();
    const hotspots: ShardKeyMetrics[] = [];

    for (const metrics of this.metricsMap.values()) {
      const windowAge = now - metrics.firstAccessed.getTime();
      if (windowAge > this.metricsWindowMs || windowAge < 1000) continue;

      const requestsPerMinute = (metrics.requestCount / windowAge) * 60000;
      if (requestsPerMinute > this.hotspotThreshold) {
        hotspots.push(metrics);
      }
    }

    return hotspots.sort((a, b) => b.requestCount - a.requestCount);
  }

  /**
   * Clean up old metrics
   */
  private startMetricsCleanup(): void {
    setInterval(() => {
      const now = Date.now();
      const cutoff = now - this.metricsWindowMs * 2;

      for (const [key, metrics] of this.metricsMap.entries()) {
        if (metrics.lastAccessed.getTime() < cutoff) {
          this.metricsMap.delete(key);
        }
      }
    }, this.metricsWindowMs);
  }

  /**
   * Get distribution statistics
   */
  public getDistributionStats(): Map<string, number> {
    const shardCounts = new Map<string, number>();

    for (const metrics of this.metricsMap.values()) {
      const current = shardCounts.get(metrics.shardId) || 0;
      shardCounts.set(metrics.shardId, current + metrics.requestCount);
    }

    return shardCounts;
  }
}
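
On the request path, you might derive the routing key, record the access for hotspot tracking, and query through the router from the earlier sketch. The IDs below are illustrative:

// Sketch: shard key derivation on the request path (inside an async handler)
const keyGen = new ShardKeyGenerator(router);

const shardKey = keyGen.generateUserShardKey('user-42', { tenantId: 'acme-corp' });
keyGen.recordAccess(shardKey);

const history = await router.executeQuery(
  shardKey,
  'SELECT * FROM conversations WHERE user_id = $1',
  ['user-42']
);

Note that entropy injection changes the routing key, so reads for a salted user must fan out across all possible suffixes; treat it as a write-path mitigation for extreme hotspots.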

Shard Management and Configuration

This shard manager handles dynamic shard addition, removal, and configuration updates:

import { Pool } from 'pg';

interface ShardInfo {
  shardId: string;
  host: string;
  port: number;
  database: string;
  maxConnections: number;
  status: 'active' | 'draining' | 'inactive';
  createdAt: Date;
  weight?: number; // For weighted distribution
}

export class ShardManager {
  private shards: Map<string, ShardInfo> = new Map();
  private router: ShardRouter | null = null;

  /**
   * Add new shard to the cluster
   */
  public async addShard(config: Omit<ShardInfo, 'status' | 'createdAt'>): Promise<void> {
    if (this.shards.has(config.shardId)) {
      throw new Error(`Shard ${config.shardId} already exists`);
    }

    const shardInfo: ShardInfo = {
      ...config,
      status: 'active',
      createdAt: new Date(),
    };

    // Validate connectivity before adding
    await this.validateShardConnectivity(shardInfo);

    this.shards.set(config.shardId, shardInfo);

    // Rebuild router with new shard
    await this.rebuildRouter();

    console.log(`Shard ${config.shardId} added successfully`);
  }

  /**
   * Remove shard from cluster (with draining period)
   */
  public async removeShard(shardId: string, drainTimeMs: number = 300000): Promise<void> {
    const shard = this.shards.get(shardId);
    if (!shard) {
      throw new Error(`Shard ${shardId} not found`);
    }

    // Mark as draining and rebuild the router so no new requests route here
    shard.status = 'draining';
    await this.rebuildRouter();
    console.log(`Shard ${shardId} entering drain mode for ${drainTimeMs}ms`);

    // Wait for drain period to allow in-flight requests to complete
    await new Promise(resolve => setTimeout(resolve, drainTimeMs));

    // Mark as inactive and remove from the map
    shard.status = 'inactive';
    this.shards.delete(shardId);
    console.log(`Shard ${shardId} removed successfully`);
  }

  /**
   * Validate shard connectivity
   */
  private async validateShardConnectivity(shard: ShardInfo): Promise<void> {
    const testPool = new Pool({
      host: shard.host,
      port: shard.port,
      database: shard.database,
      max: 1,
      connectionTimeoutMillis: 5000,
    });

    try {
      const result = await testPool.query('SELECT 1 as connected');
      if (!result.rows[0]?.connected) {
        throw new Error('Connectivity check failed');
      }
    } catch (error) {
      throw new Error(`Failed to connect to shard ${shard.shardId}: ${(error as Error).message}`);
    } finally {
      await testPool.end();
    }
  }

  /**
   * Rebuild router with current active shards
   */
  private async rebuildRouter(): Promise<void> {
    // Destroy old router
    if (this.router) {
      await this.router.destroy();
    }

    // Build config for active shards only
    const activeShards = Array.from(this.shards.values())
      .filter(s => s.status === 'active')
      .map(s => ({
        shardId: s.shardId,
        host: s.host,
        port: s.port,
        database: s.database,
        maxConnections: s.maxConnections,
      }));

    if (activeShards.length === 0) {
      throw new Error('No active shards available');
    }

    this.router = new ShardRouter(activeShards);
  }

  /**
   * Get current router instance
   */
  public getRouter(): ShardRouter {
    if (!this.router) {
      throw new Error('Router not initialized');
    }
    return this.router;
  }

  /**
   * Get shard information
   */
  public getShardInfo(shardId: string): ShardInfo | undefined {
    return this.shards.get(shardId);
  }

  /**
   * List all shards with status
   */
  public listShards(): ShardInfo[] {
    return Array.from(this.shards.values());
  }

  /**
   * Update shard configuration
   */
  public async updateShardConfig(
    shardId: string,
    updates: Partial<Pick<ShardInfo, 'maxConnections' | 'weight'>>
  ): Promise<void> {
    const shard = this.shards.get(shardId);
    if (!shard) {
      throw new Error(`Shard ${shardId} not found`);
    }

    Object.assign(shard, updates);

    // Rebuild router to apply changes
    await this.rebuildRouter();

    console.log(`Shard ${shardId} configuration updated`);
  }
}
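
Growing or shrinking the cluster might then look like this sketch (hosts are placeholders). Pick a drain window at least as long as your slowest in-flight requests, and migrate a shard's data before removing it:

// Sketch: cluster lifecycle operations (inside an async context)
const manager = new ShardManager();

await manager.addShard({
  shardId: 'shard-1',
  host: 'db-1.internal',
  port: 5432,
  database: 'chatgpt_apps',
  maxConnections: 50,
});
await manager.addShard({
  shardId: 'shard-2',
  host: 'db-2.internal',
  port: 5432,
  database: 'chatgpt_apps',
  maxConnections: 50,
});

// Route queries through the manager's current router
const router = manager.getRouter();

// Later: migrate shard-2's data elsewhere first, then drain for 60s and remove
await manager.removeShard('shard-2', 60_000);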

Cross-Shard Query Execution

Handling queries that span multiple shards requires scatter-gather patterns and result aggregation:

interface QueryResult<T> {
  shardId: string;
  data: T[];
  error?: Error;
  executionTimeMs: number;
}

export class CrossShardQueryExecutor {
  constructor(private router: ShardRouter) {}

  /**
   * Execute query across all shards in parallel
   */
  public async scatterGather<T = any>(
    query: string,
    params: any[] = [],
    options?: {
      timeout?: number;
      continueOnError?: boolean;
    }
  ): Promise<QueryResult<T>[]> {
    const shardIds = this.router.getAllShardIds();
    const timeout = options?.timeout || 30000;
    const continueOnError = options?.continueOnError ?? true;

    const queryPromises = shardIds.map(async (shardId): Promise<QueryResult<T>> => {
      const startTime = Date.now();

      try {
        const pool = this.router.getPool(shardId);
        if (!pool) {
          throw new Error(`Pool not found for shard ${shardId}`);
        }

        const timeoutPromise = new Promise((_, reject) =>
          setTimeout(() => reject(new Error('Query timeout')), timeout)
        );

        const queryPromise = pool.query(query, params);
        const result = await Promise.race([queryPromise, timeoutPromise]);

        return {
          shardId,
          data: (result as any).rows,
          executionTimeMs: Date.now() - startTime,
        };
      } catch (error) {
        if (!continueOnError) {
          throw error;
        }

        return {
          shardId,
          data: [],
          error: error as Error,
          executionTimeMs: Date.now() - startTime,
        };
      }
    });

    return Promise.all(queryPromises);
  }

  /**
   * Aggregate results from multiple shards
   */
  public aggregateResults<T>(
    results: QueryResult<T>[],
    options?: {
      sortBy?: keyof T;
      sortOrder?: 'asc' | 'desc';
      limit?: number;
      offset?: number;
    }
  ): T[] {
    // Filter out errors
    const successfulResults = results.filter(r => !r.error);

    // Combine all data
    let combined = successfulResults.flatMap(r => r.data);

    // Sort if requested
    if (options?.sortBy) {
      combined.sort((a, b) => {
        const aVal = a[options.sortBy!];
        const bVal = b[options.sortBy!];

        if (aVal < bVal) return options.sortOrder === 'desc' ? 1 : -1;
        if (aVal > bVal) return options.sortOrder === 'desc' ? -1 : 1;
        return 0;
      });
    }

    // Apply pagination
    const offset = options?.offset || 0;
    const limit = options?.limit || combined.length;

    return combined.slice(offset, offset + limit);
  }

  /**
   * Execute aggregation query across shards
   */
  public async aggregateQuery(
    aggregationType: 'count' | 'sum' | 'avg' | 'min' | 'max',
    query: string,
    column: string,
    params: any[] = []
  ): Promise<number> {
    const results = await this.scatterGather(query, params);

    const values = results
      .filter(r => !r.error)
      .flatMap(r => r.data)
      .map(row => Number(row[column]))
      .filter(val => !isNaN(val));

    switch (aggregationType) {
      case 'count':
        // Counts rows returned across shards; for per-shard COUNT(*) results, use 'sum' instead
        return values.length;
      case 'sum':
        return values.reduce((sum, val) => sum + val, 0);
      case 'avg':
        return values.length > 0
          ? values.reduce((sum, val) => sum + val, 0) / values.length
          : 0;
      case 'min':
        return values.length > 0 ? Math.min(...values) : 0;
      case 'max':
        return values.length > 0 ? Math.max(...values) : 0;
    }
  }

  /**
   * Execute cross-shard JOIN operation
   */
  public async crossShardJoin<T1, T2, TResult>(
    leftQuery: string,
    rightQuery: string,
    leftParams: any[],
    rightParams: any[],
    joinCondition: (left: T1, right: T2) => boolean,
    resultMapper: (left: T1, right: T2) => TResult
  ): Promise<TResult[]> {
    // Execute both queries in parallel
    const [leftResults, rightResults] = await Promise.all([
      this.scatterGather<T1>(leftQuery, leftParams),
      this.scatterGather<T2>(rightQuery, rightParams),
    ]);

    const leftData = this.aggregateResults(leftResults);
    const rightData = this.aggregateResults(rightResults);

    // Perform in-memory join
    const joined: TResult[] = [];

    for (const leftRow of leftData) {
      for (const rightRow of rightData) {
        if (joinCondition(leftRow, rightRow)) {
          joined.push(resultMapper(leftRow, rightRow));
        }
      }
    }

    return joined;
  }

  /**
   * Get query performance metrics
   */
  public analyzePerformance<T>(results: QueryResult<T>[]): {
    totalShards: number;
    successfulShards: number;
    failedShards: number;
    avgExecutionTimeMs: number;
    maxExecutionTimeMs: number;
    minExecutionTimeMs: number;
    totalRows: number;
  } {
    const successful = results.filter(r => !r.error);
    const failed = results.filter(r => r.error);
    const executionTimes = results.map(r => r.executionTimeMs);

    return {
      totalShards: results.length,
      successfulShards: successful.length,
      failedShards: failed.length,
      avgExecutionTimeMs:
        executionTimes.reduce((sum, t) => sum + t, 0) / executionTimes.length,
      maxExecutionTimeMs: Math.max(...executionTimes),
      minExecutionTimeMs: Math.min(...executionTimes),
      totalRows: successful.reduce((sum, r) => sum + r.data.length, 0),
    };
  }
}
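
For example, a feed of the most recent conversations across all users has no single shard key, so it must scatter-gather and merge. The query and columns below are assumptions about your schema:

// Sketch: cross-shard "recent conversations" feed (inside an async context)
const executor = new CrossShardQueryExecutor(router);

// Each shard returns its own top 50; the per-shard limit must be >= the global limit
const results = await executor.scatterGather<{ id: string; created_at: Date }>(
  'SELECT id, created_at FROM conversations ORDER BY created_at DESC LIMIT 50',
  [],
  { timeout: 5000 }
);

const latest = executor.aggregateResults(results, {
  sortBy: 'created_at',
  sortOrder: 'desc',
  limit: 50,
});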

Data Migration and Rebalancing

This migrator handles moving data between shards with minimal downtime:

interface MigrationPlan {
  sourceShardId: string;
  targetShardId: string;
  keyRange?: { start: string; end: string };
  estimatedRows: number;
}

interface MigrationProgress {
  planId: string;
  status: 'pending' | 'running' | 'completed' | 'failed';
  rowsMigrated: number;
  totalRows: number;
  startTime?: Date;
  endTime?: Date;
  error?: string;
}

export class ShardDataMigrator {
  private migrations: Map<string, MigrationProgress> = new Map();
  private plans: Map<string, MigrationPlan> = new Map();

  constructor(
    private router: ShardRouter,
    private shardManager: ShardManager
  ) {}

  /**
   * Create migration plan for rebalancing
   */
  public async planMigration(
    sourceShardId: string,
    targetShardId: string,
    options?: {
      keyRange?: { start: string; end: string };
      batchSize?: number;
    }
  ): Promise<string> {
    const planId = `migration-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;

    // Estimate number of rows to migrate
    const estimatedRows = await this.estimateMigrationSize(
      sourceShardId,
      options?.keyRange
    );

    const progress: MigrationProgress = {
      planId,
      status: 'pending',
      rowsMigrated: 0,
      totalRows: estimatedRows,
    };

    this.migrations.set(planId, progress);
    this.plans.set(planId, {
      sourceShardId,
      targetShardId,
      keyRange: options?.keyRange,
      estimatedRows,
    });

    console.log(
      `Migration plan ${planId} created: ${estimatedRows} rows from ${sourceShardId} to ${targetShardId}`
    );

    return planId;
  }

  /**
   * Execute migration with live traffic
   */
  public async executeMigration(
    planId: string,
    options?: {
      batchSize?: number;
      pauseBetweenBatchesMs?: number;
      verifyData?: boolean;
    }
  ): Promise<void> {
    const progress = this.migrations.get(planId);
    if (!progress) {
      throw new Error(`Migration plan ${planId} not found`);
    }

    if (progress.status !== 'pending') {
      throw new Error(`Migration ${planId} is not in pending state`);
    }

    const batchSize = options?.batchSize || 1000;
    const pauseBetweenBatchesMs = options?.pauseBetweenBatchesMs || 100;
    const verifyData = options?.verifyData ?? true;

    progress.status = 'running';
    progress.startTime = new Date();

    try {
      // Get source and target pools
      const sourcePlan = this.extractPlanDetails(planId);
      const sourcePool = this.router.getPool(sourcePlan.sourceShardId);
      const targetPool = this.router.getPool(sourcePlan.targetShardId);

      if (!sourcePool || !targetPool) {
        throw new Error('Source or target pool not found');
      }

      let offset = 0;
      let hasMoreData = true;
      const rangeStart = sourcePlan.keyRange?.start ?? '';
      const rangeEnd = sourcePlan.keyRange?.end;

      while (hasMoreData) {
        // Fetch batch from source, honoring both ends of the key range when provided
        const batch = rangeEnd !== undefined
          ? await sourcePool.query(
              `SELECT * FROM conversations
               WHERE shard_key >= $1 AND shard_key <= $2
               ORDER BY shard_key
               LIMIT $3 OFFSET $4`,
              [rangeStart, rangeEnd, batchSize, offset]
            )
          : await sourcePool.query(
              `SELECT * FROM conversations
               WHERE shard_key >= $1
               ORDER BY shard_key
               LIMIT $2 OFFSET $3`,
              [rangeStart, batchSize, offset]
            );

        if (batch.rows.length === 0) {
          hasMoreData = false;
          break;
        }

        // Insert batch into target
        for (const row of batch.rows) {
          await targetPool.query(
            `INSERT INTO conversations (id, user_id, data, shard_key, created_at)
             VALUES ($1, $2, $3, $4, $5)
             ON CONFLICT (id) DO UPDATE SET data = EXCLUDED.data`,
            [row.id, row.user_id, row.data, row.shard_key, row.created_at]
          );
        }

        // Verify data if requested
        if (verifyData) {
          await this.verifyBatch(batch.rows, sourcePool, targetPool);
        }

        // Update progress
        progress.rowsMigrated += batch.rows.length;
        offset += batchSize;

        // Pause to avoid overwhelming database
        await new Promise(resolve => setTimeout(resolve, pauseBetweenBatchesMs));

        console.log(
          `Migration ${planId}: ${progress.rowsMigrated}/${progress.totalRows} rows migrated`
        );
      }

      progress.status = 'completed';
      progress.endTime = new Date();

      console.log(`Migration ${planId} completed successfully`);
    } catch (error) {
      progress.status = 'failed';
      progress.error = (error as Error).message;
      progress.endTime = new Date();

      console.error(`Migration ${planId} failed:`, error);
      throw error;
    }
  }

  /**
   * Estimate migration size
   */
  private async estimateMigrationSize(
    shardId: string,
    keyRange?: { start: string; end: string }
  ): Promise<number> {
    const pool = this.router.getPool(shardId);
    if (!pool) {
      throw new Error(`Pool not found for shard ${shardId}`);
    }

    let query = 'SELECT COUNT(*) as count FROM conversations';
    const params: any[] = [];

    if (keyRange) {
      query += ' WHERE shard_key >= $1 AND shard_key <= $2';
      params.push(keyRange.start, keyRange.end);
    }

    const result = await pool.query(query, params);
    return parseInt(result.rows[0].count, 10);
  }

  /**
   * Verify batch data integrity
   */
  private async verifyBatch(
    rows: any[],
    sourcePool: any,
    targetPool: any
  ): Promise<void> {
    for (const row of rows) {
      const sourceData = await sourcePool.query(
        'SELECT * FROM conversations WHERE id = $1',
        [row.id]
      );

      const targetData = await targetPool.query(
        'SELECT * FROM conversations WHERE id = $1',
        [row.id]
      );

      if (JSON.stringify(sourceData.rows[0]) !== JSON.stringify(targetData.rows[0])) {
        throw new Error(`Data verification failed for row ${row.id}`);
      }
    }
  }

  /**
   * Look up plan details (in production, read these from a persistent store)
   */
  private extractPlanDetails(planId: string): MigrationPlan {
    const plan = this.plans.get(planId);
    if (!plan) {
      throw new Error(`Migration plan ${planId} not found`);
    }
    return plan;
  }

  /**
   * Get migration progress
   */
  public getMigrationProgress(planId: string): MigrationProgress | undefined {
    return this.migrations.get(planId);
  }

  /**
   * List all migrations
   */
  public listMigrations(): MigrationProgress[] {
    return Array.from(this.migrations.values());
  }
}
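
Driving a migration end to end might look like the following sketch; the shard IDs, key range, and batch settings are illustrative:

// Sketch: plan, execute, and track a migration (inside an async context)
const migrator = new ShardDataMigrator(router, manager);

const planId = await migrator.planMigration('shard-1', 'shard-3', {
  keyRange: { start: 'a', end: 'm' },
});

await migrator.executeMigration(planId, {
  batchSize: 500,
  pauseBetweenBatchesMs: 200,
  verifyData: true,
});

console.log(migrator.getMigrationProgress(planId));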

Hotspot Detection and Mitigation

This analyzer identifies and mitigates shard hotspots in real-time:

interface HotspotMetrics {
  shardId: string;
  requestsPerSecond: number;
  avgLatencyMs: number;
  p95LatencyMs: number;
  errorRate: number;
  timestamp: Date;
}

interface HotspotAlert {
  shardId: string;
  severity: 'warning' | 'critical';
  metric: 'requests' | 'latency' | 'errors';
  currentValue: number;
  threshold: number;
  recommendation: string;
}

export class HotspotDetector {
  private metricsHistory: Map<string, HotspotMetrics[]> = new Map();
  private readonly historyWindowMs = 300000; // 5 minutes
  private readonly samplingIntervalMs = 5000; // 5 seconds

  private readonly thresholds = {
    requestsPerSecond: 1000,
    avgLatencyMs: 100,
    p95LatencyMs: 500,
    errorRate: 0.05, // 5%
  };

  constructor(private router: ShardRouter) {
    this.startMonitoring();
  }

  /**
   * Start continuous monitoring
   */
  private startMonitoring(): void {
    setInterval(async () => {
      await this.collectMetrics();
      const alerts = this.analyzeHotspots();

      if (alerts.length > 0) {
        this.handleAlerts(alerts);
      }
    }, this.samplingIntervalMs);
  }

  /**
   * Collect metrics from all shards
   */
  private async collectMetrics(): Promise<void> {
    const shardIds = this.router.getAllShardIds();

    for (const shardId of shardIds) {
      const metrics = await this.measureShardPerformance(shardId);
      this.recordMetrics(shardId, metrics);
    }
  }

  /**
   * Measure shard performance
   */
  private async measureShardPerformance(shardId: string): Promise<HotspotMetrics> {
    const pool = this.router.getPool(shardId);
    if (!pool) {
      throw new Error(`Pool not found for shard ${shardId}`);
    }

    const startTime = Date.now();
    const latencies: number[] = [];
    let errors = 0;

    // Probe with synthetic queries; a production system would sample real traffic metrics
    const sampleSize = 100;
    for (let i = 0; i < sampleSize; i++) {
      const queryStart = Date.now();
      try {
        await pool.query('SELECT 1');
        latencies.push(Date.now() - queryStart);
      } catch (error) {
        errors++;
      }
    }

    const totalTime = Date.now() - startTime;
    const requestsPerSecond = (sampleSize / totalTime) * 1000;

    latencies.sort((a, b) => a - b);
    const avgLatencyMs = latencies.length > 0
      ? latencies.reduce((sum, l) => sum + l, 0) / latencies.length
      : 0;
    const p95Index = Math.floor(latencies.length * 0.95);
    const p95LatencyMs = latencies[p95Index] || 0;

    return {
      shardId,
      requestsPerSecond,
      avgLatencyMs,
      p95LatencyMs,
      errorRate: errors / sampleSize,
      timestamp: new Date(),
    };
  }

  /**
   * Record metrics in history
   */
  private recordMetrics(shardId: string, metrics: HotspotMetrics): void {
    const history = this.metricsHistory.get(shardId) || [];
    history.push(metrics);

    // Keep only recent history
    const cutoff = Date.now() - this.historyWindowMs;
    const filtered = history.filter(m => m.timestamp.getTime() > cutoff);

    this.metricsHistory.set(shardId, filtered);
  }

  /**
   * Analyze for hotspots
   */
  private analyzeHotspots(): HotspotAlert[] {
    const alerts: HotspotAlert[] = [];

    for (const [shardId, history] of this.metricsHistory.entries()) {
      if (history.length === 0) continue;

      const latest = history[history.length - 1];

      // Check request rate
      if (latest.requestsPerSecond > this.thresholds.requestsPerSecond) {
        alerts.push({
          shardId,
          severity: latest.requestsPerSecond > this.thresholds.requestsPerSecond * 2
            ? 'critical'
            : 'warning',
          metric: 'requests',
          currentValue: latest.requestsPerSecond,
          threshold: this.thresholds.requestsPerSecond,
          recommendation: 'Consider splitting this shard or adding entropy to shard keys',
        });
      }

      // Check latency
      if (latest.p95LatencyMs > this.thresholds.p95LatencyMs) {
        alerts.push({
          shardId,
          severity: latest.p95LatencyMs > this.thresholds.p95LatencyMs * 2
            ? 'critical'
            : 'warning',
          metric: 'latency',
          currentValue: latest.p95LatencyMs,
          threshold: this.thresholds.p95LatencyMs,
          recommendation: 'Investigate slow queries or add read replicas',
        });
      }

      // Check error rate
      if (latest.errorRate > this.thresholds.errorRate) {
        alerts.push({
          shardId,
          severity: latest.errorRate > this.thresholds.errorRate * 2
            ? 'critical'
            : 'warning',
          metric: 'errors',
          currentValue: latest.errorRate,
          threshold: this.thresholds.errorRate,
          recommendation: 'Check shard health and connection pool configuration',
        });
      }
    }

    return alerts;
  }

  /**
   * Handle hotspot alerts
   */
  private handleAlerts(alerts: HotspotAlert[]): void {
    for (const alert of alerts) {
      console.warn(
        `[${alert.severity.toUpperCase()}] Hotspot detected on ${alert.shardId}:`,
        `${alert.metric} = ${alert.currentValue} (threshold: ${alert.threshold})`,
        `Recommendation: ${alert.recommendation}`
      );

      // In production, this would trigger:
      // - PagerDuty/Opsgenie alerts
      // - Auto-scaling responses
      // - Load shedding mechanisms
      // - Automatic shard splitting
    }
  }

  /**
   * Get hotspot report
   */
  public getHotspotReport(): {
    alerts: HotspotAlert[];
    metrics: Map<string, HotspotMetrics[]>;
  } {
    return {
      alerts: this.analyzeHotspots(),
      metrics: this.metricsHistory,
    };
  }
}

Rebalancing Orchestrator

This orchestrator manages shard rebalancing operations with zero downtime:

interface RebalanceStrategy {
  type: 'split' | 'merge' | 'move';
  sourceShards: string[];
  targetShards: string[];
  estimatedDurationMs: number;
  estimatedDowntime: boolean;
}

export class RebalanceOrchestrator {
  constructor(
    private shardManager: ShardManager,
    private migrator: ShardDataMigrator,
    private detector: HotspotDetector
  ) {}

  /**
   * Analyze and recommend rebalancing strategy
   */
  public async recommendStrategy(): Promise<RebalanceStrategy> {
    const report = this.detector.getHotspotReport();
    const criticalHotspots = report.alerts.filter(a => a.severity === 'critical');

    if (criticalHotspots.length === 0) {
      throw new Error('No critical hotspots detected - rebalancing not needed');
    }

    // For simplicity, recommend splitting the most critical shard
    const worstHotspot = criticalHotspots[0];

    return {
      type: 'split',
      sourceShards: [worstHotspot.shardId],
      targetShards: [`${worstHotspot.shardId}-split-1`, `${worstHotspot.shardId}-split-2`],
      estimatedDurationMs: 3600000, // 1 hour
      estimatedDowntime: false, // Zero-downtime split
    };
  }

  /**
   * Execute rebalancing strategy
   */
  public async executeRebalance(strategy: RebalanceStrategy): Promise<void> {
    console.log(`Starting rebalance operation: ${strategy.type}`);
    console.log(`Source shards: ${strategy.sourceShards.join(', ')}`);
    console.log(`Target shards: ${strategy.targetShards.join(', ')}`);

    switch (strategy.type) {
      case 'split':
        await this.executeSplit(strategy);
        break;
      case 'merge':
        await this.executeMerge(strategy);
        break;
      case 'move':
        await this.executeMove(strategy);
        break;
    }

    console.log('Rebalance operation completed successfully');
  }

  /**
   * Execute shard split
   */
  private async executeSplit(strategy: RebalanceStrategy): Promise<void> {
    const sourceShardId = strategy.sourceShards[0];
    const [targetShard1, targetShard2] = strategy.targetShards;

    // Add new shards (hosts below are placeholders)
    await this.shardManager.addShard({
      shardId: targetShard1,
      host: 'db-host-new-1',
      port: 5432,
      database: 'chatgpt_apps',
      maxConnections: 50,
    });

    await this.shardManager.addShard({
      shardId: targetShard2,
      host: 'db-host-new-2',
      port: 5432,
      database: 'chatgpt_apps',
      maxConnections: 50,
    });

    // Migrate half the data to each new shard. Key ranges are illustrative; with
    // consistent hashing, ring positions ultimately determine which keys each shard owns.
    const migrationPlan1 = await this.migrator.planMigration(
      sourceShardId,
      targetShard1,
      { keyRange: { start: 'a', end: 'm' } }
    );

    const migrationPlan2 = await this.migrator.planMigration(
      sourceShardId,
      targetShard2,
      { keyRange: { start: 'n', end: 'z' } }
    );

    // Execute migrations in parallel
    await Promise.all([
      this.migrator.executeMigration(migrationPlan1),
      this.migrator.executeMigration(migrationPlan2),
    ]);

    // Remove old shard after successful migration
    await this.shardManager.removeShard(sourceShardId);
  }

  /**
   * Execute shard merge (placeholder)
   */
  private async executeMerge(strategy: RebalanceStrategy): Promise<void> {
    // Implementation similar to split but in reverse
    console.log('Merge operation not yet implemented');
  }

  /**
   * Execute shard move (placeholder)
   */
  private async executeMove(strategy: RebalanceStrategy): Promise<void> {
    // Implementation for moving data between existing shards
    console.log('Move operation not yet implemented');
  }
}
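
Putting the pieces together, a rebalancing run might be wired up like this sketch (all wiring is illustrative):

// Sketch: compose the detector, migrator, and manager (inside an async context)
const detector = new HotspotDetector(manager.getRouter());
const orchestrator = new RebalanceOrchestrator(manager, migrator, detector);

try {
  const strategy = await orchestrator.recommendStrategy();
  console.log(`Recommended: ${strategy.type} of ${strategy.sourceShards.join(', ')}`);
  await orchestrator.executeRebalance(strategy);
} catch (err) {
  // recommendStrategy throws when no critical hotspots are present
  console.log((err as Error).message);
}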

Scaling ChatGPT Apps with Sharding

Database sharding is essential for ChatGPT applications that need to scale beyond single-server constraints. By implementing the strategies and production code in this guide, you can build a robust sharding architecture that handles millions of users, maintains low latency, and scales horizontally as your user base grows.

The key to successful sharding is choosing the right shard key, implementing consistent hashing for even distribution, and building operational tools for monitoring hotspots and rebalancing load. Start with hash-based sharding using user ID as your shard key, monitor for hotspots using the detector implementation, and plan for rebalancing as your data distribution evolves.

Ready to build scalable ChatGPT apps without database bottlenecks? MakeAIHQ provides production-ready infrastructure for ChatGPT applications, including automatic database scaling, built-in sharding support, and zero-downtime migrations. Start your free trial today and deploy your ChatGPT app to millions of users without worrying about database scalability.

Related Resources

  • Complete Guide to Building ChatGPT Applications - Comprehensive pillar guide
  • Database Optimization for ChatGPT Apps - Query optimization strategies
  • Horizontal Scaling Strategies for ChatGPT Apps - Application-level scaling
  • Multi-Tenant Architecture for ChatGPT Apps - Tenant isolation patterns

About MakeAIHQ: We're the leading no-code platform for building and deploying ChatGPT applications to the ChatGPT App Store. Our infrastructure handles database sharding, scaling, and optimization automatically, so you can focus on building great conversational experiences. Start building today.