Queue Infrastructure

This page covers the infrastructure and operations layer of the queue system: driver internals, worker process management, auto-scaling, batch processing, custom failed-job handlers, and production HA.

Queue Drivers

Database Queue Driver

The database driver provides ACID-compliant job storage with transaction support and efficient indexing.

Features

  • ACID Compliance: Full transaction support for job operations
  • Atomic Job Reservation: Row-level locking for job picking
  • Priority Queues: Priority-based job ordering with compound indexes
  • Delayed Jobs: Timestamp-based job scheduling
  • Failed Job Isolation: Separate table for failed job tracking
  • Batch Operations: Efficient bulk insertions

Usage

use Glueful\Queue\QueueManager;

// Initialize queue manager with database driver
$queueManager = new QueueManager([
    'default' => 'database',
    'connections' => [
        'database' => [
            'driver' => 'database',
            'table' => 'queue_jobs',
            'failed_table' => 'queue_failed_jobs',
            'retry_after' => 90
        ]
    ]
]);

// Push immediate job
$jobUuid = $queueManager->push('ProcessEmail', [
    'to' => 'user@example.com',
    'template' => 'welcome'
]);

// Push delayed job (5 minutes)
$delayedUuid = $queueManager->later(300, 'SendReminder', [
    'user_id' => 123,
    'type' => 'subscription_expiry'
]);

// Push high priority job
$priorityUuid = $queueManager->push('ProcessPayment', [
    'order_id' => 456,
    'amount' => 99.99,
    'priority' => 10
]);

Database Schema

-- Queue jobs table
CREATE TABLE queue_jobs (
    id INTEGER PRIMARY KEY AUTO_INCREMENT,
    uuid CHAR(21) NOT NULL UNIQUE,
    queue VARCHAR(100) NOT NULL,
    payload LONGTEXT NOT NULL,
    attempts INTEGER DEFAULT 0,
    reserved_at INTEGER NULL,
    available_at INTEGER NOT NULL,
    created_at INTEGER NOT NULL,
    priority INTEGER DEFAULT 0,
    batch_id CHAR(21) NULL,  -- the batch examples on this page filter on batch_uuid; keep one name throughout
    
    INDEX idx_queue_processing (queue, reserved_at, available_at, priority),
    INDEX idx_batch_id (batch_id)
);

-- Failed jobs table
CREATE TABLE queue_failed_jobs (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
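    uuid CHAR(12) NOT NULL UNIQUE,  -- queue_jobs.uuid is CHAR(21); size both columns for the IDs you generate
    connection VARCHAR(255) NOT NULL,
    queue VARCHAR(255) NOT NULL,
    payload TEXT NOT NULL,          -- queue_jobs.payload is LONGTEXT; a large payload may not fit in TEXT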
    exception TEXT NOT NULL,
    batch_uuid CHAR(12) NULL,
    failed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    
    INDEX idx_connection_queue (connection, queue),
    INDEX idx_failed_at (failed_at)
);
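
The reservation rule implied by the reserved_at, available_at, priority, and retry_after fields can be sketched in plain PHP. This is a minimal in-memory illustration, not the driver's code: pickNextJob() is a hypothetical helper, and it assumes that a larger priority number runs first and that a reservation older than retry_after seconds is treated as abandoned. A real driver would typically run the equivalent query inside a transaction with row locks so that two workers cannot take the same row.

```php
<?php
declare(strict_types=1);

/**
 * Pick the next job a worker may reserve (illustrative only).
 *
 * A job is eligible when it is due (available_at <= $now) and is either
 * unreserved or was reserved at least $retryAfter seconds ago.
 * Assumption: higher priority first; ties go to the earliest available_at.
 */
function pickNextJob(array $jobs, string $queue, int $now, int $retryAfter): ?array
{
    $eligible = array_filter(
        $jobs,
        function (array $job) use ($queue, $now, $retryAfter): bool {
            $unreserved = $job['reserved_at'] === null;
            $expired = !$unreserved && $job['reserved_at'] + $retryAfter <= $now;

            return $job['queue'] === $queue
                && $job['available_at'] <= $now
                && ($unreserved || $expired);
        }
    );

    if ($eligible === []) {
        return null;
    }

    // Sort by priority descending, then available_at ascending
    usort($eligible, function (array $a, array $b): int {
        return [$b['priority'], $a['available_at']] <=> [$a['priority'], $b['available_at']];
    });

    return $eligible[0];
}

$jobs = [
    ['uuid' => 'a', 'queue' => 'emails', 'priority' => 0,  'available_at' => 900,  'reserved_at' => null],
    ['uuid' => 'b', 'queue' => 'emails', 'priority' => 10, 'available_at' => 950,  'reserved_at' => null],
    ['uuid' => 'c', 'queue' => 'emails', 'priority' => 20, 'available_at' => 1300, 'reserved_at' => null], // delayed
    ['uuid' => 'd', 'queue' => 'emails', 'priority' => 30, 'available_at' => 800,  'reserved_at' => 990],  // held by a worker
];

$next = pickNextJob($jobs, 'emails', 1000, 90);
echo $next['uuid'], PHP_EOL; // prints "b"
```

At time 1000 job d is still inside its 90-second reservation window and job c is not yet due, so the highest-priority eligible job is b. Once the reservation on d expires (time 1080 or later), d becomes eligible again and wins on priority.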

Redis Queue Driver

The Redis driver provides high-performance job processing with atomic operations and memory efficiency.

Features

  • High Performance: Memory-based storage with atomic operations
  • Atomic Operations: Redis transactions for job consistency
  • Priority Queues: Sorted sets for priority-based processing
  • Delayed Jobs: Redis ZADD for timestamp-based scheduling
  • Memory Efficient: Optimized data structures and job expiration
  • Connection Pooling: Persistent connections with failover support

Redis Data Structures

# Queue structures
queue:{name}               # List for immediate jobs
queue:{name}:delayed       # Sorted set for delayed jobs (score = timestamp)
queue:{name}:reserved      # Sorted set for reserved jobs (score = timeout)
queue:{name}:failed        # List for failed jobs

# Job data
job:{uuid}                 # Hash containing job data

# Queue registry
queues                     # Set of all queue names
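
To make the role of the delayed sorted set concrete, the sketch below models how due jobs could move from queue:{name}:delayed to the queue:{name} list. It uses plain PHP arrays in place of Redis, and migrateDueJobs() is a hypothetical name. Against a real Redis server the same steps would usually be a score-range read followed by a remove and a push, executed atomically (for example in a Lua script); whether the driver does exactly this is an assumption.

```php
<?php
declare(strict_types=1);

/**
 * Move due jobs from the delayed set to the ready list (in-memory sketch).
 *
 * $delayed maps job UUID => due timestamp and stands in for the
 * queue:{name}:delayed sorted set; $ready stands in for the queue:{name} list.
 * Returns the UUIDs that were moved, in due-time order.
 */
function migrateDueJobs(array &$delayed, array &$ready, int $now): array
{
    asort($delayed); // order by score, as a sorted set would

    $moved = [];
    foreach ($delayed as $uuid => $dueAt) {
        if ($dueAt > $now) {
            break; // everything after this entry is due later
        }
        $moved[] = $uuid;
    }

    foreach ($moved as $uuid) {
        unset($delayed[$uuid]);
        $ready[] = $uuid;
    }

    return $moved;
}

$delayed = ['job-a' => 1300, 'job-b' => 1000, 'job-c' => 1100];
$ready = ['job-x'];

$moved = migrateDueJobs($delayed, $ready, 1100);

echo implode(',', $moved), PHP_EOL; // prints "job-b,job-c"
echo implode(',', $ready), PHP_EOL; // prints "job-x,job-b,job-c"
```

The reserved set can be handled the same way: entries whose timeout score has passed are moved back to the ready list so that another worker can pick them up.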

Usage

// Redis queue configuration
$queueManager = new QueueManager([
    'default' => 'redis',
    'connections' => [
        'redis' => [
            'driver' => 'redis',
            'host' => '127.0.0.1',
            'port' => 6379,
            'database' => 0,
            'prefix' => 'glueful:queue:',
            'retry_after' => 90,
            'job_expiration' => 3600
        ]
    ]
]);

// Redis supports the same operations as the database driver
$queueManager->push('ProcessImages', ['images' => [1, 2, 3]]);
$queueManager->later(600, 'CleanupTemp', ['older_than' => '1 hour']);

Job System

Tasks vs Jobs Architecture

Starting with Glueful 1.2.0, the framework implements a clear separation between business logic (Tasks) and queue execution (Jobs):

  • Tasks (src/Tasks/): Contains pure business logic that can be executed directly or queued
  • Jobs (src/Queue/Jobs/): Lightweight wrappers that handle queue-specific concerns like retries, logging, and error handling

This architecture provides several benefits:

  • Testability: Business logic can be tested independently of queue infrastructure
  • Flexibility: Tasks can be executed directly without queuing when needed
  • Maintainability: Clear separation of concerns between business logic and execution context
  • Reliability: Queue-specific error handling and retry logic is centralized in Job classes

Example: Cache Maintenance

// Task: pure business logic (src/Tasks/CacheMaintenanceTask.php)
namespace Glueful\Tasks;

class CacheMaintenanceTask
{
    public function handle(array $options = []): array
    {
        $operation = $options['operation'] ?? 'clearExpiredKeys';
        $verbose = $options['verbose'] ?? false;

        return match ($operation) {
            'clearExpiredKeys' => $this->clearExpiredKeys($verbose),
            'optimizeCache' => $this->optimizeCache($verbose),
            'fullCleanup' => $this->fullCleanup($options),
            default => throw new \InvalidArgumentException("Unknown operation: {$operation}")
        };
    }

    private function clearExpiredKeys(bool $verbose): array
    {
        // Business logic for clearing expired cache keys
        $cleared = 0;
        // ... implementation
        return ['operation' => 'clearExpiredKeys', 'cleared' => $cleared];
    }

    // optimizeCache() and fullCleanup() are omitted here; they follow the same shape.
}

// Job: queue wrapper with error handling (src/Queue/Jobs/CacheMaintenanceJob.php)
namespace Glueful\Queue\Jobs;

use Glueful\Queue\Job;
use Glueful\Tasks\CacheMaintenanceTask;

class CacheMaintenanceJob extends Job
{
    public function handle(): void
    {
        $data = $this->getData();
        $operation = $data['operation'] ?? 'clearExpiredKeys';
        $options = $data['options'] ?? [];

        try {
            $task = new CacheMaintenanceTask();
            // The task reads 'operation' from its options, so pass it through
            $result = $task->handle(['operation' => $operation] + $options);

            // Log successful completion
            app($context, LogManager::class)->info('Cache maintenance completed successfully', [
                'operation' => $operation,
                'result' => $result,
                'job_uuid' => $this->getUuid()
            ]);

        } catch (\Exception $e) {
            app($context, LogManager::class)->error('Cache maintenance failed', [
                'operation' => $operation,
                'error' => $e->getMessage(),
                'job_uuid' => $this->getUuid()
            ]);
            throw $e;
        }
    }

    public function failed(\Exception $exception): void
    {
        $data = $this->getData();
        $operation = $data['operation'] ?? 'clearExpiredKeys';
        $options = $data['options'] ?? [];

        app($context, LogManager::class)->error('Cache maintenance job failed', [
            'operation' => $operation,
            'options' => $options,
            'error' => $exception->getMessage()
        ]);
    }
}

Using Tasks and Jobs

// Direct execution (testing, CLI commands)
$task = new CacheMaintenanceTask();
$result = $task->handle(['operation' => 'clearExpiredKeys', 'verbose' => true]);

// Queued execution
$queueManager = app($context, QueueManager::class);
$jobId = $queueManager->push(CacheMaintenanceJob::class, [
    'operation' => 'clearExpiredKeys',
    'options' => ['verbose' => true]
], 'maintenance');

// Scheduled execution
$jobId = $queueManager->later(3600, CacheMaintenanceJob::class, [
    'operation' => 'optimizeCache',
    'options' => ['retention_days' => 30]
], 'maintenance');

Available Framework Tasks

The framework includes several built-in tasks for common maintenance operations:

// Cache maintenance
use Glueful\Tasks\CacheMaintenanceTask;
$task = new CacheMaintenanceTask();
$result = $task->handle(['operation' => 'fullCleanup', 'retention_days' => 30]);

// Database backup
use Glueful\Tasks\DatabaseBackupTask;
$task = new DatabaseBackupTask();
$result = $task->handle(['retention_days' => 7, 'compress' => true]);

// Log cleanup
use Glueful\Tasks\LogCleanupTask;
$task = new LogCleanupTask();
$result = $task->handle(['retention_days' => 14, 'cleanup_type' => 'filesystem']);

// Session cleanup
use Glueful\Tasks\SessionCleanupTask;
$task = new SessionCleanupTask();
$result = $task->handle(['max_lifetime' => 1440]); // 24 hours if the unit is minutes; 1440 seconds would be 24 minutes

// Notification retry processing
use Glueful\Tasks\NotificationRetryTask;
$task = new NotificationRetryTask();
$result = $task->handle(['max_retries' => 3, 'retry_delay' => 300]);

Process Management

Worker Process Management

The ProcessManager handles the lifecycle of worker processes with automatic scaling, health monitoring, and graceful shutdown.

use Glueful\Queue\Process\ProcessManager;
use Glueful\Queue\Process\ProcessFactory;
use Glueful\Queue\Monitoring\WorkerMonitor;
use Glueful\Queue\WorkerOptions;

// Initialize process manager
$processManager = new ProcessManager(
    factory: new ProcessFactory(),
    monitor: new WorkerMonitor(),
    logger: $logger,
    config: [
        'max_workers' => 20,
        'restart_delay' => 5,
        'health_check_interval' => 30
    ]
);

// Spawn workers for different queues
$emailWorkerOptions = new WorkerOptions(
    memory: 256,        // MB
    timeout: 120,       // seconds
    maxJobs: 500,       // jobs per worker
    maxAttempts: 3      // retry attempts
);

$imageWorkerOptions = new WorkerOptions(
    memory: 512,        // Higher memory for image processing
    timeout: 300,       // Longer timeout
    maxJobs: 100        // Fewer jobs due to resource intensity
);

// Spawn workers
$emailWorker = $processManager->spawn('emails', $emailWorkerOptions);
$imageWorker = $processManager->spawn('images', $imageWorkerOptions);

// Scale workers based on load
$processManager->scale(5, 'emails', $emailWorkerOptions);  // Scale to 5 email workers
$processManager->scale(2, 'images', $imageWorkerOptions);  // Scale to 2 image workers

Worker Status and Monitoring

// Get status of all workers
$status = $processManager->getStatus();
/*
[
    [
        'id' => 'worker_abc123',
        'queue' => 'emails',
        'pid' => 12345,
        'status' => 'running',
        'memory_usage' => 145.2,
        'cpu_usage' => 15.6,
        'jobs_processed' => 250,
        'started_at' => '2024-07-02 10:15:30',
        'last_heartbeat' => '2024-07-02 12:45:22'
    ],
    // ... more workers
]
*/

// Monitor worker health
$processManager->monitorHealth(); // Automatically restarts unhealthy workers

// Manual worker restart
$processManager->restart('worker_abc123');

// Graceful shutdown of all workers
$processManager->stopAll(timeout: 60);
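
As an illustration of what a health check over the getStatus() output might look at, the sketch below flags workers whose heartbeat is stale or whose memory use exceeds a limit. findUnhealthyWorkers() and both thresholds are hypothetical; the criteria that monitorHealth() actually applies are not documented here, so treat this as one plausible rule set rather than the framework's behavior.

```php
<?php
declare(strict_types=1);

date_default_timezone_set('UTC');

/**
 * Return [workerId => reason] for workers that look unhealthy.
 *
 * Assumed rules (not taken from the framework):
 *  - no heartbeat for more than $heartbeatTimeout seconds
 *  - memory_usage (MB) above $memoryLimitMb
 */
function findUnhealthyWorkers(array $workers, int $now, int $heartbeatTimeout, float $memoryLimitMb): array
{
    $unhealthy = [];

    foreach ($workers as $worker) {
        $lastBeat = strtotime($worker['last_heartbeat']);

        if ($lastBeat === false || $now - $lastBeat > $heartbeatTimeout) {
            $unhealthy[$worker['id']] = 'stale heartbeat';
        } elseif ($worker['memory_usage'] > $memoryLimitMb) {
            $unhealthy[$worker['id']] = 'memory limit exceeded';
        }
    }

    return $unhealthy;
}

$workers = [
    ['id' => 'worker_abc123', 'memory_usage' => 145.2, 'last_heartbeat' => '2024-07-02 12:45:22'],
    ['id' => 'worker_def456', 'memory_usage' => 120.0, 'last_heartbeat' => '2024-07-02 12:40:00'],
    ['id' => 'worker_ghi789', 'memory_usage' => 300.5, 'last_heartbeat' => '2024-07-02 12:45:50'],
];

$now = strtotime('2024-07-02 12:46:00');
$unhealthy = findUnhealthyWorkers($workers, $now, 60, 256.0);

foreach ($unhealthy as $id => $reason) {
    echo "{$id}: {$reason}", PHP_EOL;
}
```

Each flagged ID could then be passed to $processManager->restart(), which is the manual equivalent of what the automatic check is described as doing.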

Auto-Scaling

Intelligent Worker Scaling

The AutoScaler automatically adjusts worker count based on queue load, processing rates, and resource utilization.

use Glueful\Queue\Process\AutoScaler;

// Initialize auto-scaler
$autoScaler = new AutoScaler(
    processManager: $processManager,
    queueManager: $queueManager,
    logger: $logger,
    config: [
        'enabled' => true,
        'limits' => [
            'max_workers_per_queue' => 15
        ],
        'auto_scale' => [
            'scale_up_threshold' => 100,      // Queue size threshold
            'scale_down_threshold' => 10,     // Queue size threshold
            'scale_up_step' => 2,             // Workers to add
            'scale_down_step' => 1,           // Workers to remove
            'cooldown_period' => 300          // Seconds between scaling
        ],
        'queues' => [
            'emails' => [
                'auto_scale' => true,
                'min_workers' => 2,
                'max_workers' => 10,
                'scale_up_threshold' => 50,
                'max_wait_time' => 30
            ],
            'images' => [
                'auto_scale' => true,
                'min_workers' => 1,
                'max_workers' => 5,
                'scale_up_threshold' => 20,
                'max_wait_time' => 120
            ]
        ]
    ]
);

// Perform auto-scaling check
$scalingActions = $autoScaler->scale();
/*
[
    [
        'queue' => 'emails',
        'action' => 'scale_up',
        'from' => 3,
        'to' => 5,
        'reason' => 'Queue size (75) > threshold (50), High worker utilization (92%)',
        'metrics' => [
            'queue_size' => 75,
            'avg_worker_utilization' => 92,
            'processing_rate' => 25.5,
            'incoming_rate' => 35.2
        ]
    ]
]
*/

// Force scaling (bypasses cooldown)
$autoScaler->forceScale('images', 3, 'Manual scale for image batch processing');

// Get scaling history
$history = $autoScaler->getScalingHistory('emails');
/*
[
    [
        'queue' => 'emails',
        'timestamp' => 1720035123,
        'from_workers' => 3,
        'to_workers' => 5,
        'reason' => 'Queue size (75) > threshold (50)'
    ],
    // ... more history entries
]
*/

Scaling Metrics and Decision Logic

// The auto-scaler considers multiple metrics for scaling decisions:

// Scale-up conditions:
// - Queue size > scale_up_threshold
// - Incoming rate > processing rate * 1.5
// - Average worker utilization > 85%
// - Average wait time > max_wait_time

// Scale-down conditions:
// - Queue size < scale_down_threshold
// - Average worker utilization < 30% AND wait time < 10 seconds
// - Processing rate > incoming rate * 2 AND queue size < 5

// Custom scaling logic can be implemented by extending AutoScaler
class CustomAutoScaler extends AutoScaler
{
    protected function shouldScaleUp(array $metrics, int $currentWorkers, array $queueConfig): bool
    {
        // Add custom business logic
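        $hour = (int) date('G'); // 0-23 in the server's timezone
        $isBusinessHours = $hour >= 9 && $hour <= 17; // 09:00 through 17:59
        $isWeekday = !in_array((int) date('w'), [0, 6], true); // 0 = Sunday, 6 = Saturday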
        
        if ($isBusinessHours && $isWeekday) {
            // More aggressive scaling during business hours
            return $metrics['queue_size'] > 25 || parent::shouldScaleUp($metrics, $currentWorkers, $queueConfig);
        }
        
        return parent::shouldScaleUp($metrics, $currentWorkers, $queueConfig);
    }
}
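
The conditions listed above can be written as a single pure function, which makes the decision easy to test without running workers. This is a sketch under stated assumptions, not the AutoScaler's implementation: targetWorkers() is a hypothetical name, the conditions in each group are combined with OR, scale-up takes precedence over scale-down, the result is clamped to min_workers and max_workers, and the avg_wait_time metric key is invented for the example. Cooldown handling is left out.

```php
<?php
declare(strict_types=1);

/**
 * Compute the target worker count for one queue (illustrative only).
 */
function targetWorkers(array $metrics, int $current, array $cfg): int
{
    $scaleUp = $metrics['queue_size'] > $cfg['scale_up_threshold']
        || $metrics['incoming_rate'] > $metrics['processing_rate'] * 1.5
        || $metrics['avg_worker_utilization'] > 85
        || $metrics['avg_wait_time'] > $cfg['max_wait_time'];

    $scaleDown = $metrics['queue_size'] < $cfg['scale_down_threshold']
        || ($metrics['avg_worker_utilization'] < 30 && $metrics['avg_wait_time'] < 10)
        || ($metrics['processing_rate'] > $metrics['incoming_rate'] * 2 && $metrics['queue_size'] < 5);

    if ($scaleUp) {
        $target = $current + $cfg['scale_up_step'];
    } elseif ($scaleDown) {
        $target = $current - $cfg['scale_down_step'];
    } else {
        $target = $current;
    }

    // Never leave the configured range for this queue
    return max($cfg['min_workers'], min($cfg['max_workers'], $target));
}

$emails = [
    'scale_up_threshold' => 50,
    'scale_down_threshold' => 10,
    'scale_up_step' => 2,
    'scale_down_step' => 1,
    'min_workers' => 2,
    'max_workers' => 10,
    'max_wait_time' => 30,
];

$busy = [
    'queue_size' => 75,
    'avg_worker_utilization' => 92,
    'processing_rate' => 25.5,
    'incoming_rate' => 35.2,
    'avg_wait_time' => 12,
];

$idle = [
    'queue_size' => 2,
    'avg_worker_utilization' => 12,
    'processing_rate' => 20.0,
    'incoming_rate' => 1.0,
    'avg_wait_time' => 1,
];

echo targetWorkers($busy, 3, $emails), PHP_EOL; // prints 5
echo targetWorkers($idle, 3, $emails), PHP_EOL; // prints 2
```

With the busy metrics from the earlier example output, three workers become five, matching the 'from' => 3, 'to' => 5 action shown there. With the idle metrics the count drops by one step and then stays at min_workers.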

Batch Processing

Creating and Managing Batches

use Glueful\Helpers\Utils;

// Create a batch of related jobs
$batchUuid = Utils::generateNanoID();

$jobs = [
    [
        'job' => ProcessEmailJob::class,
        'data' => ['to' => 'user1@example.com', 'template' => 'newsletter'],
        'batch_uuid' => $batchUuid
    ],
    [
        'job' => ProcessEmailJob::class,
        'data' => ['to' => 'user2@example.com', 'template' => 'newsletter'],
        'batch_uuid' => $batchUuid
    ],
    [
        'job' => ProcessEmailJob::class,
        'data' => ['to' => 'user3@example.com', 'template' => 'newsletter'],
        'batch_uuid' => $batchUuid
    ]
];

// Push batch jobs
$uuids = $queueManager->bulk($jobs, 'emails');

// Track batch progress
class BatchProgressTracker
{
    public function getBatchProgress(string $batchUuid): array
    {
        // Count the batch's jobs in each state. queue_jobs_completed is not part of
        // the schema shown earlier; this assumes completed jobs are recorded there.
        $db = container($context)->get(DatabaseInterface::class);
        
        $completed = $db->count('queue_jobs_completed', ['batch_uuid' => $batchUuid]);
        $failed = $db->count('queue_failed_jobs', ['batch_uuid' => $batchUuid]);
        $pending = $db->count('queue_jobs', ['batch_uuid' => $batchUuid]);
        
        $total = $completed + $failed + $pending;
        
        return [
            'batch_uuid' => $batchUuid,
            'total' => $total,
            'completed' => $completed,
            'failed' => $failed,
            'pending' => $pending,
            'progress_percentage' => $total > 0 ? ($completed / $total) * 100 : 0,
            'success_rate' => ($completed + $failed) > 0 ? ($completed / ($completed + $failed)) * 100 : 0
        ];
    }
}

Batch Completion Callbacks

class NewsletterBatchJob extends Job
{
    public function handle(): void
    {
        // Process individual newsletter email
        $this->sendNewsletterEmail();
        
        // Check if this is the last job in the batch
        if ($this->isBatchComplete()) {
            $this->handleBatchCompletion();
        }
    }
    
    private function isBatchComplete(): bool
    {
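        $batchUuid = $this->getBatchUuid();
        if (!$batchUuid) {
            return false; // Not part of a batch, so there is nothing to complete
        }
        
        $db = container($context)->get(DatabaseInterface::class);
        $remaining = $db->count('queue_jobs', ['batch_uuid' => $batchUuid]);
        
        // Only this job's row is left. The row count is not atomic: with
        // concurrent workers the completion can be missed or triggered twice.
        return $remaining <= 1;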
    }
    
    private function handleBatchCompletion(): void
    {
        $batchUuid = $this->getBatchUuid();
        
        // Create completion job
        $queueManager = container($context)->get(QueueManager::class);
        $queueManager->push(NewsletterBatchCompletedJob::class, [
            'batch_uuid' => $batchUuid,
            'completed_at' => date('Y-m-d H:i:s')
        ]);
    }
}

class NewsletterBatchCompletedJob extends Job
{
    public function handle(): void
    {
        $data = $this->getData();
        $batchUuid = $data['batch_uuid'];
        
        // Generate batch report
        $tracker = new BatchProgressTracker();
        $progress = $tracker->getBatchProgress($batchUuid);
        
        // Send completion notification
        $notificationService = container($context)->get(NotificationService::class);
        $notificationService->send(
            'admin@example.com',
            'Newsletter Batch Completed',
            'batch-completion',
            [
                'batch_uuid' => $batchUuid,
                'total_emails' => $progress['total'],
                'success_rate' => $progress['success_rate'],
                'completed_at' => $data['completed_at']
            ]
        );
    }
}
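
Counting remaining rows, as isBatchComplete() does, can misfire when two workers finish the last jobs of a batch at the same moment. A common alternative is a per-batch pending counter that each job decrements once, so that exactly one job observes zero. The sketch below shows the rule with an in-memory array; BatchCounter is a hypothetical class, and in production the counter would have to live in shared storage with an atomic decrement (for example a Redis counter or a single SQL UPDATE), which this example does not implement.

```php
<?php
declare(strict_types=1);

/**
 * Per-batch pending counter (in-memory illustration of the rule only).
 */
final class BatchCounter
{
    /** @var array<string,int> pending job count per batch */
    private array $pending = [];

    public function start(string $batchUuid, int $jobCount): void
    {
        if ($jobCount < 1) {
            throw new InvalidArgumentException('A batch needs at least one job');
        }
        $this->pending[$batchUuid] = $jobCount;
    }

    /**
     * Record one finished job. Returns true only for the call that
     * brings the counter to zero, so completion fires exactly once.
     */
    public function finishOne(string $batchUuid): bool
    {
        if (!isset($this->pending[$batchUuid])) {
            return false; // unknown batch, or already completed
        }

        $remaining = --$this->pending[$batchUuid];
        if ($remaining > 0) {
            return false;
        }

        unset($this->pending[$batchUuid]);
        return true;
    }
}

$counter = new BatchCounter();
$counter->start('batch-1', 3);

$results = [
    $counter->finishOne('batch-1'),
    $counter->finishOne('batch-1'),
    $counter->finishOne('batch-1'), // the last job sees the counter reach zero
];

var_dump($results);
```

A job would call finishOne() at the end of handle() and push NewsletterBatchCompletedJob only when it returns true. Failed jobs need to decrement the counter as well, otherwise the batch never completes.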

Failed Job Handling

Failed Job Recovery and Analysis

use Glueful\Queue\Failed\FailedJobProvider;

// Get failed job provider
$failedJobProvider = new FailedJobProvider($database);

// Get failed jobs
$failedJobs = $failedJobProvider->all();

foreach ($failedJobs as $failedJob) {
    echo "Failed Job: {$failedJob['uuid']}\n";
    echo "Queue: {$failedJob['queue']}\n";
    echo "Failed At: {$failedJob['failed_at']}\n";
    echo "Exception: {$failedJob['exception']}\n";
    echo "---\n";
}

// Get failed jobs for specific queue
$emailFailures = $failedJobProvider->getByQueue('emails');

// Retry specific failed job
$failedJobProvider->retry('failed_job_uuid_123');

// Retry all failed jobs for a queue
$retryCount = $failedJobProvider->retryQueue('emails');

// Delete old failed jobs
$deletedCount = $failedJobProvider->prune(days: 30);

Custom Failed Job Handlers

class CustomFailedJobHandler
{
    public function handle(JobInterface $job, \Exception $exception): void
    {
        $jobData = $job->getData();
        
        // Categorize failure
        $failureCategory = $this->categorizeFailure($exception);
        
        // Validation errors will not succeed on a retry: log them and stop.
        if ($failureCategory === 'validation_error') {
            $this->logValidationError($job, $exception);
            return;
        }

        // Once the attempts are used up, record the failure instead of releasing
        // the job again; otherwise a persistent outage would retry forever.
        if (!$job->shouldRetry()) {
            $this->moveToFailedJobs($job, $exception);
            return;
        }

        $delay = match ($failureCategory) {
            // Exponential backoff: 2^attempts minutes, capped at one hour
            'network_error' => min(3600, (2 ** $job->getAttempts()) * 60),
            // Give exhausted resources time to recover
            'resource_exhaustion' => 1800, // 30 minutes
            // Standard retry delay
            default => 300, // 5 minutes
        };

        $job->release($delay);
    }
    
    private function categorizeFailure(\Exception $exception): string
    {
        $message = strtolower($exception->getMessage());
        
        if (str_contains($message, 'network') || str_contains($message, 'timeout')) {
            return 'network_error';
        }
        
        if (str_contains($message, 'validation') || str_contains($message, 'invalid')) {
            return 'validation_error';
        }
        
        if (str_contains($message, 'memory') || str_contains($message, 'disk')) {
            return 'resource_exhaustion';
        }
        
        return 'unknown';
    }
}
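
The backoff formula used for network errors, min(3600, 2^attempts * 60), grows quickly, so it helps to see the actual delays. The helper below is a hypothetical standalone version of that expression; the base and cap parameters are added for illustration and default to the values used in the handler.

```php
<?php
declare(strict_types=1);

/**
 * Exponential backoff in seconds: base * 2^attempts, capped.
 */
function backoffDelay(int $attempts, int $baseSeconds = 60, int $capSeconds = 3600): int
{
    return (int) min($capSeconds, (2 ** max(0, $attempts)) * $baseSeconds);
}

foreach (range(1, 7) as $attempt) {
    printf("attempt %d -> %d s\n", $attempt, backoffDelay($attempt));
}
// attempt 1 -> 120 s
// attempt 2 -> 240 s
// attempt 3 -> 480 s
// attempt 4 -> 960 s
// attempt 5 -> 1920 s
// attempt 6 -> 3600 s
// attempt 7 -> 3600 s
```

The cap is reached at the sixth attempt (2^6 * 60 = 3840, limited to 3600), so every later retry waits one hour. If many jobs fail at once, adding a random offset to each delay spreads the retries out instead of releasing them all at the same instant.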

Production Deployment

High-Availability Setup

Run multiple core queue workers under a process manager such as Supervisor or systemd. Each queue:work invocation starts a single worker, so set numprocs to the number of workers you want for each queue.

# Use a process manager like Supervisor for production
# /etc/supervisor/conf.d/glueful-workers.conf

[program:glueful-worker-emails]
command=php /path/to/your-app/glueful queue:work --queue=emails --memory=256 --timeout=120
process_name=%(program_name)s_%(process_num)02d
numprocs=4
autostart=true
autorestart=true
user=www-data
redirect_stderr=true
stdout_logfile=/var/log/glueful-queue-emails.log

[program:glueful-worker-images]
command=php /path/to/your-app/glueful queue:work --queue=images --memory=512 --timeout=300
process_name=%(program_name)s_%(process_num)02d
numprocs=2
autostart=true
autorestart=true
user=www-data
redirect_stderr=true
stdout_logfile=/var/log/glueful-queue-images.log

Performance Optimization

// Production optimization tips:

// 1. Use Redis for high-throughput workloads
$config['default'] = 'redis';

// 2. Tune worker settings for your workload
$config['worker_options'] = [
    'memory' => 512,        // Increase for memory-intensive jobs
    'timeout' => 120,       // Increase for long-running jobs
    'max_jobs' => 500,      // Balance between efficiency and memory usage
    'sleep' => 1           // Reduce for high-frequency workloads
];

// 3. Enable auto-scaling for variable workloads (requires glueful/queue-ops;
//    configured under queue_ops.* once the extension is installed)
$config['auto_scaling']['enabled'] = true;

// 4. Use connection pooling for database drivers
$config['connections']['database']['pool_size'] = 20;

// 5. Configure appropriate retry settings
$config['connections']['redis']['retry_after'] = 60; // Faster retry for Redis

// 6. Enable monitoring for production insights
$config['monitoring']['enabled'] = true;

Monitoring and Alerting Setup

// Set up monitoring webhook for queue alerts
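class QueueAlertingService
{
    // Injected so that sendCriticalAlert() has a notifier to call
    public function __construct(private NotificationService $notificationService)
    {
    }

    public function checkAndAlert(): void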
    {
        $monitor = new QueueMonitoringService();
        $alerts = $monitor->getAlerts();
        
        foreach ($alerts as $alert) {
            match ($alert['severity']) {
                'error' => $this->sendCriticalAlert($alert),
                'warning' => $this->sendWarningAlert($alert),
                default => $this->logAlert($alert)
            };
        }
    }
    
    private function sendCriticalAlert(array $alert): void
    {
        // Send to PagerDuty, Slack, email, etc.
        $this->notificationService->send([
            'channels' => ['pagerduty', 'slack'],
            'message' => "CRITICAL: Queue Alert - {$alert['message']}",
            'data' => $alert
        ]);
    }
}

// Run checkAndAlert() periodically — register it as a scheduled task, or invoke it
// from your own cron-run console command. (There is no built-in queue:check-alerts command.)