Queue Infrastructure
This page covers the infrastructure and operations layer of the queue system: driver internals, worker process management, auto-scaling, batch processing, custom failed-job handlers, and production HA.
Queue Drivers
Database Queue Driver
The database driver provides ACID-compliant job storage with transaction support and efficient indexing.
Features
- ACID Compliance: Full transaction support for job operations
- Atomic Job Reservation: Row-level locking for job picking
- Priority Queues: Priority-based job ordering with compound indexes
- Delayed Jobs: Timestamp-based job scheduling
- Failed Job Isolation: Separate table for failed job tracking
- Batch Operations: Efficient bulk insertions
Usage
use Glueful\Queue\QueueManager;
// Initialize queue manager with database driver
$queueManager = new QueueManager([
'default' => 'database',
'connections' => [
'database' => [
'driver' => 'database',
'table' => 'queue_jobs',
'failed_table' => 'queue_failed_jobs',
'retry_after' => 90
]
]
]);
// Push immediate job
$jobUuid = $queueManager->push('ProcessEmail', [
'to' => 'user@example.com',
'template' => 'welcome'
]);
// Push delayed job (5 minutes)
$delayedUuid = $queueManager->later(300, 'SendReminder', [
'user_id' => 123,
'type' => 'subscription_expiry'
]);
// Push high priority job
$priorityUuid = $queueManager->push('ProcessPayment', [
'order_id' => 456,
'amount' => 99.99,
'priority' => 10
]);
Database Schema
-- Queue jobs table
CREATE TABLE queue_jobs (
id INTEGER PRIMARY KEY AUTO_INCREMENT,
uuid CHAR(21) NOT NULL UNIQUE,
queue VARCHAR(100) NOT NULL,
payload LONGTEXT NOT NULL,
attempts INTEGER DEFAULT 0,
reserved_at INTEGER NULL,
available_at INTEGER NOT NULL,
created_at INTEGER NOT NULL,
priority INTEGER DEFAULT 0,
batch_id CHAR(21) NULL,
INDEX idx_queue_processing (queue, reserved_at, available_at, priority),
INDEX idx_batch_id (batch_id)
);
-- Failed jobs table
CREATE TABLE queue_failed_jobs (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
uuid CHAR(12) NOT NULL UNIQUE,
connection VARCHAR(255) NOT NULL,
queue VARCHAR(255) NOT NULL,
payload TEXT NOT NULL,
exception TEXT NOT NULL,
batch_uuid CHAR(12) NULL,
failed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
INDEX idx_connection_queue (connection, queue),
INDEX idx_failed_at (failed_at)
);
Redis Queue Driver
The Redis driver provides high-performance job processing with atomic operations and memory efficiency.
Features
- High Performance: Memory-based storage with atomic operations
- Atomic Operations: Redis transactions for job consistency
- Priority Queues: Sorted sets for priority-based processing
- Delayed Jobs: Redis ZADD for timestamp-based scheduling
- Memory Efficient: Optimized data structures and job expiration
- Connection Pooling: Persistent connections with failover support
Redis Data Structures
# Queue structures
queue:{name} # List for immediate jobs
queue:{name}:delayed # Sorted set for delayed jobs (score = timestamp)
queue:{name}:reserved # Sorted set for reserved jobs (score = timeout)
queue:{name}:failed # List for failed jobs
# Job data
job:{uuid} # Hash containing job data
# Queue registry
queues # Set of all queue names
Usage
// Redis queue configuration
$queueManager = new QueueManager([
'default' => 'redis',
'connections' => [
'redis' => [
'driver' => 'redis',
'host' => '127.0.0.1',
'port' => 6379,
'database' => 0,
'prefix' => 'glueful:queue:',
'retry_after' => 90,
'job_expiration' => 3600
]
]
]);
// Redis supports all the same operations as database driver
$queueManager->push('ProcessImages', ['images' => [1, 2, 3]]);
$queueManager->later(600, 'CleanupTemp', ['older_than' => '1 hour']);
Job System
Tasks vs Jobs Architecture
Starting with Glueful 1.2.0, the framework implements a clear separation between business logic (Tasks) and queue execution (Jobs):
- Tasks (
src/Tasks/): Contains pure business logic that can be executed directly or queued - Jobs (
src/Queue/Jobs/): Lightweight wrappers that handle queue-specific concerns like retries, logging, and error handling
This architecture provides several benefits:
- Testability: Business logic can be tested independently of queue infrastructure
- Flexibility: Tasks can be executed directly without queuing when needed
- Maintainability: Clear separation of concerns between business logic and execution context
- Reliability: Queue-specific error handling and retry logic is centralized in Job classes
Example: Cache Maintenance
// Task: Pure business logic
use Glueful\Tasks\CacheMaintenanceTask;
class CacheMaintenanceTask
{
public function handle(array $options = []): array
{
$operation = $options['operation'] ?? 'clearExpiredKeys';
$verbose = $options['verbose'] ?? false;
return match ($operation) {
'clearExpiredKeys' => $this->clearExpiredKeys($verbose),
'optimizeCache' => $this->optimizeCache($verbose),
'fullCleanup' => $this->fullCleanup($options),
default => throw new \InvalidArgumentException("Unknown operation: {$operation}")
};
}
private function clearExpiredKeys(bool $verbose): array
{
// Business logic for clearing expired cache keys
$cleared = 0;
// ... implementation
return ['operation' => 'clearExpiredKeys', 'cleared' => $cleared];
}
}
// Job: Queue wrapper with error handling
use Glueful\Queue\Jobs\CacheMaintenanceJob;
use Glueful\Queue\Job;
class CacheMaintenanceJob extends Job
{
public function handle(): void
{
$data = $this->getData();
$operation = $data['operation'] ?? 'clearExpiredKeys';
$options = $data['options'] ?? [];
try {
$task = new CacheMaintenanceTask();
$result = $task->handle($options);
// Log successful completion
app($context, LogManager::class)->info('Cache maintenance completed successfully', [
'operation' => $operation,
'result' => $result,
'job_uuid' => $this->getUuid()
]);
} catch (\Exception $e) {
app($context, LogManager::class)->error('Cache maintenance failed', [
'operation' => $operation,
'error' => $e->getMessage(),
'job_uuid' => $this->getUuid()
]);
throw $e;
}
}
public function failed(\Exception $exception): void
{
$data = $this->getData();
$operation = $data['operation'] ?? 'clearExpiredKeys';
$options = $data['options'] ?? [];
app($context, LogManager::class)->error('Cache maintenance job failed', [
'operation' => $operation,
'options' => $options,
'error' => $exception->getMessage()
]);
}
}
Using Tasks and Jobs
// Direct execution (testing, CLI commands)
$task = new CacheMaintenanceTask();
$result = $task->handle(['operation' => 'clearExpiredKeys', 'verbose' => true]);
// Queued execution
$queueManager = app($context, QueueManager::class);
$jobId = $queueManager->push(CacheMaintenanceJob::class, [
'operation' => 'clearExpiredKeys',
'options' => ['verbose' => true]
], 'maintenance');
// Scheduled execution
$jobId = $queueManager->later(3600, CacheMaintenanceJob::class, [
'operation' => 'optimizeCache',
'options' => ['retention_days' => 30]
], 'maintenance');
Available Framework Tasks
The framework includes several built-in tasks for common maintenance operations:
// Cache maintenance
use Glueful\Tasks\CacheMaintenanceTask;
$task = new CacheMaintenanceTask();
$result = $task->handle(['operation' => 'fullCleanup', 'retention_days' => 30]);
// Database backup
use Glueful\Tasks\DatabaseBackupTask;
$task = new DatabaseBackupTask();
$result = $task->handle(['retention_days' => 7, 'compress' => true]);
// Log cleanup
use Glueful\Tasks\LogCleanupTask;
$task = new LogCleanupTask();
$result = $task->handle(['retention_days' => 14, 'cleanup_type' => 'filesystem']);
// Session cleanup
use Glueful\Tasks\SessionCleanupTask;
$task = new SessionCleanupTask();
$result = $task->handle(['max_lifetime' => 1440]); // 24 hours
// Notification retry processing
use Glueful\Tasks\NotificationRetryTask;
$task = new NotificationRetryTask();
$result = $task->handle(['max_retries' => 3, 'retry_delay' => 300]);
Process Management
Worker Process Management
The ProcessManager handles the lifecycle of worker processes with automatic scaling, health monitoring, and graceful shutdown.
use Glueful\Queue\Process\ProcessManager;
use Glueful\Queue\Process\ProcessFactory;
use Glueful\Queue\Monitoring\WorkerMonitor;
use Glueful\Queue\WorkerOptions;
// Initialize process manager
$processManager = new ProcessManager(
factory: new ProcessFactory(),
monitor: new WorkerMonitor(),
logger: $logger,
config: [
'max_workers' => 20,
'restart_delay' => 5,
'health_check_interval' => 30
]
);
// Spawn workers for different queues
$emailWorkerOptions = new WorkerOptions(
memory: 256, // MB
timeout: 120, // seconds
maxJobs: 500, // jobs per worker
maxAttempts: 3 // retry attempts
);
$imageWorkerOptions = new WorkerOptions(
memory: 512, // Higher memory for image processing
timeout: 300, // Longer timeout
maxJobs: 100 // Fewer jobs due to resource intensity
);
// Spawn workers
$emailWorker = $processManager->spawn('emails', $emailWorkerOptions);
$imageWorker = $processManager->spawn('images', $imageWorkerOptions);
// Scale workers based on load
$processManager->scale(5, 'emails', $emailWorkerOptions); // Scale to 5 email workers
$processManager->scale(2, 'images', $imageWorkerOptions); // Scale to 2 image workers
Worker Status and Monitoring
// Get status of all workers
$status = $processManager->getStatus();
/*
[
[
'id' => 'worker_abc123',
'queue' => 'emails',
'pid' => 12345,
'status' => 'running',
'memory_usage' => 145.2,
'cpu_usage' => 15.6,
'jobs_processed' => 250,
'started_at' => '2024-07-02 10:15:30',
'last_heartbeat' => '2024-07-02 12:45:22'
],
// ... more workers
]
*/
// Monitor worker health
$processManager->monitorHealth(); // Automatically restarts unhealthy workers
// Manual worker restart
$processManager->restart('worker_abc123');
// Graceful shutdown of all workers
$processManager->stopAll(timeout: 60);
Auto-Scaling
Intelligent Worker Scaling
The AutoScaler automatically adjusts worker count based on queue load, processing rates, and resource utilization.
use Glueful\Queue\Process\AutoScaler;
// Initialize auto-scaler
$autoScaler = new AutoScaler(
processManager: $processManager,
queueManager: $queueManager,
logger: $logger,
config: [
'enabled' => true,
'limits' => [
'max_workers_per_queue' => 15
],
'auto_scale' => [
'scale_up_threshold' => 100, // Queue size threshold
'scale_down_threshold' => 10, // Queue size threshold
'scale_up_step' => 2, // Workers to add
'scale_down_step' => 1, // Workers to remove
'cooldown_period' => 300 // Seconds between scaling
],
'queues' => [
'emails' => [
'auto_scale' => true,
'min_workers' => 2,
'max_workers' => 10,
'scale_up_threshold' => 50,
'max_wait_time' => 30
],
'images' => [
'auto_scale' => true,
'min_workers' => 1,
'max_workers' => 5,
'scale_up_threshold' => 20,
'max_wait_time' => 120
]
]
]
);
// Perform auto-scaling check
$scalingActions = $autoScaler->scale();
/*
[
[
'queue' => 'emails',
'action' => 'scale_up',
'from' => 3,
'to' => 5,
'reason' => 'Queue size (75) > threshold (50), High worker utilization (92%)',
'metrics' => [
'queue_size' => 75,
'avg_worker_utilization' => 92,
'processing_rate' => 25.5,
'incoming_rate' => 35.2
]
]
]
*/
// Force scaling (bypasses cooldown)
$autoScaler->forceScale('images', 3, 'Manual scale for image batch processing');
// Get scaling history
$history = $autoScaler->getScalingHistory('emails');
/*
[
[
'queue' => 'emails',
'timestamp' => 1720035123,
'from_workers' => 3,
'to_workers' => 5,
'reason' => 'Queue size (75) > threshold (50)'
],
// ... more history entries
]
*/
Scaling Metrics and Decision Logic
// The auto-scaler considers multiple metrics for scaling decisions:
// Scale-up conditions:
// - Queue size > scale_up_threshold
// - Incoming rate > processing rate * 1.5
// - Average worker utilization > 85%
// - Average wait time > max_wait_time
// Scale-down conditions:
// - Queue size < scale_down_threshold
// - Average worker utilization < 30% AND wait time < 10 seconds
// - Processing rate > incoming rate * 2 AND queue size < 5
// Custom scaling logic can be implemented by extending AutoScaler
class CustomAutoScaler extends AutoScaler
{
protected function shouldScaleUp(array $metrics, int $currentWorkers, array $queueConfig): bool
{
// Add custom business logic
$isBusinessHours = (date('H') >= 9 && date('H') <= 17);
$isWeekday = !in_array(date('w'), [0, 6]);
if ($isBusinessHours && $isWeekday) {
// More aggressive scaling during business hours
return $metrics['queue_size'] > 25 || parent::shouldScaleUp($metrics, $currentWorkers, $queueConfig);
}
return parent::shouldScaleUp($metrics, $currentWorkers, $queueConfig);
}
}
Batch Processing
Creating and Managing Batches
use Glueful\Helpers\Utils;
// Create a batch of related jobs
$batchUuid = Utils::generateNanoID();
$jobs = [
[
'job' => ProcessEmailJob::class,
'data' => ['to' => 'user1@example.com', 'template' => 'newsletter'],
'batch_uuid' => $batchUuid
],
[
'job' => ProcessEmailJob::class,
'data' => ['to' => 'user2@example.com', 'template' => 'newsletter'],
'batch_uuid' => $batchUuid
],
[
'job' => ProcessEmailJob::class,
'data' => ['to' => 'user3@example.com', 'template' => 'newsletter'],
'batch_uuid' => $batchUuid
]
];
// Push batch jobs
$uuids = $queueManager->bulk($jobs, 'emails');
// Track batch progress
class BatchProgressTracker
{
private QueueManager $queueManager;
public function getBatchProgress(string $batchUuid): array
{
// Get batch job statistics from database
$db = container($context)->get(DatabaseInterface::class);
$completed = $db->count('queue_jobs_completed', ['batch_uuid' => $batchUuid]);
$failed = $db->count('queue_failed_jobs', ['batch_uuid' => $batchUuid]);
$pending = $db->count('queue_jobs', ['batch_uuid' => $batchUuid]);
$total = $completed + $failed + $pending;
return [
'batch_uuid' => $batchUuid,
'total' => $total,
'completed' => $completed,
'failed' => $failed,
'pending' => $pending,
'progress_percentage' => $total > 0 ? ($completed / $total) * 100 : 0,
'success_rate' => ($completed + $failed) > 0 ? ($completed / ($completed + $failed)) * 100 : 0
];
}
}
Batch Completion Callbacks
class NewsletterBatchJob extends Job
{
public function handle(): void
{
// Process individual newsletter email
$this->sendNewsletterEmail();
// Check if this is the last job in the batch
if ($this->isBatchComplete()) {
$this->handleBatchCompletion();
}
}
private function isBatchComplete(): bool
{
$batchUuid = $this->getBatchUuid();
if (!$batchUuid) {
return true; // Not part of batch
}
$db = container($context)->get(DatabaseInterface::class);
$remaining = $db->count('queue_jobs', ['batch_uuid' => $batchUuid]);
return $remaining <= 1; // This job is the last one
}
private function handleBatchCompletion(): void
{
$batchUuid = $this->getBatchUuid();
// Create completion job
$queueManager = container($context)->get(QueueManager::class);
$queueManager->push(NewsletterBatchCompletedJob::class, [
'batch_uuid' => $batchUuid,
'completed_at' => date('Y-m-d H:i:s')
]);
}
}
class NewsletterBatchCompletedJob extends Job
{
public function handle(): void
{
$data = $this->getData();
$batchUuid = $data['batch_uuid'];
// Generate batch report
$tracker = new BatchProgressTracker();
$progress = $tracker->getBatchProgress($batchUuid);
// Send completion notification
$notificationService = container($context)->get(NotificationService::class);
$notificationService->send(
'admin@example.com',
'Newsletter Batch Completed',
'batch-completion',
[
'batch_uuid' => $batchUuid,
'total_emails' => $progress['total'],
'success_rate' => $progress['success_rate'],
'completed_at' => $data['completed_at']
]
);
}
}
Failed Job Handling
Failed Job Recovery and Analysis
use Glueful\Queue\Failed\FailedJobProvider;
// Get failed job provider
$failedJobProvider = new FailedJobProvider($database);
// Get failed jobs
$failedJobs = $failedJobProvider->all();
foreach ($failedJobs as $failedJob) {
echo "Failed Job: {$failedJob['uuid']}\n";
echo "Queue: {$failedJob['queue']}\n";
echo "Failed At: {$failedJob['failed_at']}\n";
echo "Exception: {$failedJob['exception']}\n";
echo "---\n";
}
// Get failed jobs for specific queue
$emailFailures = $failedJobProvider->getByQueue('emails');
// Retry specific failed job
$failedJobProvider->retry('failed_job_uuid_123');
// Retry all failed jobs for a queue
$retryCount = $failedJobProvider->retryQueue('emails');
// Delete old failed jobs
$deletedCount = $failedJobProvider->prune(days: 30);
Custom Failed Job Handlers
class CustomFailedJobHandler
{
public function handle(JobInterface $job, \Exception $exception): void
{
$jobData = $job->getData();
// Categorize failure
$failureCategory = $this->categorizeFailure($exception);
switch ($failureCategory) {
case 'network_error':
// Retry with exponential backoff
$delay = min(3600, pow(2, $job->getAttempts()) * 60);
$job->release($delay);
break;
case 'validation_error':
// Don't retry validation errors, log for investigation
$this->logValidationError($job, $exception);
break;
case 'resource_exhaustion':
// Retry after longer delay when resources recover
$job->release(1800); // 30 minutes
break;
default:
// Standard retry logic
if ($job->shouldRetry()) {
$job->release(300); // 5 minutes
} else {
$this->moveToFailedJobs($job, $exception);
}
}
}
private function categorizeFailure(\Exception $exception): string
{
$message = strtolower($exception->getMessage());
if (strpos($message, 'network') !== false || strpos($message, 'timeout') !== false) {
return 'network_error';
}
if (strpos($message, 'validation') !== false || strpos($message, 'invalid') !== false) {
return 'validation_error';
}
if (strpos($message, 'memory') !== false || strpos($message, 'disk') !== false) {
return 'resource_exhaustion';
}
return 'unknown';
}
}
Production Deployment
High-Availability Setup
Run multiple lean core workers under a process manager (Supervisor, systemd). Each
queue:work invocation is a single worker — use numprocs to scale.
# Use a process manager like Supervisor for production
# /etc/supervisor/conf.d/glueful-workers.conf
[program:glueful-worker-emails]
command=php /path/to/your-app/glueful queue:work --queue=emails --memory=256 --timeout=120
process_name=%(program_name)s_%(process_num)02d
numprocs=4
autostart=true
autorestart=true
user=www-data
redirect_stderr=true
stdout_logfile=/var/log/glueful-queue-emails.log
[program:glueful-worker-images]
command=php /path/to/your-app/glueful queue:work --queue=images --memory=512 --timeout=300
process_name=%(program_name)s_%(process_num)02d
numprocs=2
autostart=true
autorestart=true
user=www-data
redirect_stderr=true
stdout_logfile=/var/log/glueful-queue-images.log
Performance Optimization
// Production optimization tips:
// 1. Use Redis for high-throughput workloads
$config['default'] = 'redis';
// 2. Tune worker settings for your workload
$config['worker_options'] = [
'memory' => 512, // Increase for memory-intensive jobs
'timeout' => 120, // Increase for long-running jobs
'max_jobs' => 500, // Balance between efficiency and memory usage
'sleep' => 1 // Reduce for high-frequency workloads
];
// 3. Enable auto-scaling for variable workloads (requires glueful/queue-ops;
// configured under queue_ops.* once the extension is installed)
$config['auto_scaling']['enabled'] = true;
// 4. Use connection pooling for database drivers
$config['connections']['database']['pool_size'] = 20;
// 5. Configure appropriate retry settings
$config['connections']['redis']['retry_after'] = 60; // Faster retry for Redis
// 6. Enable monitoring for production insights
$config['monitoring']['enabled'] = true;
Monitoring and Alerting Setup
// Set up monitoring webhook for queue alerts
class QueueAlertingService
{
public function checkAndAlert(): void
{
$monitor = new QueueMonitoringService();
$alerts = $monitor->getAlerts();
foreach ($alerts as $alert) {
match ($alert['severity']) {
'error' => $this->sendCriticalAlert($alert),
'warning' => $this->sendWarningAlert($alert),
default => $this->logAlert($alert)
};
}
}
private function sendCriticalAlert(array $alert): void
{
// Send to PagerDuty, Slack, email, etc.
$this->notificationService->send([
'channels' => ['pagerduty', 'slack'],
'message' => "CRITICAL: Queue Alert - {$alert['message']}",
'data' => $alert
]);
}
}
// Run checkAndAlert() periodically — register it as a scheduled task, or invoke it
// from your own cron-run console command. (There is no built-in queue:check-alerts command.)