🚨 ¡Nueva review! ¡Mi teclado ideal! ⌨️ Perfecto para programar, el Logitech MX Keys S . ¡Échale un ojo! 👀

Queues y Jobs con Bull + Redis en NestJS

Serie NestJS #18 : @nestjs/bull, @Processor, @Process, @OnQueueEvent, colas de emails/notificaciones, reintentos, concurrencia y Docker + Redis

Escrito por domin el 16 de abril de 2026

Vamos con el post número 18 de la serie NestJS y segundo del bloque de Eventos, Colas y Tiempo Real. En el post anterior aprendimos a desacoplar módulos con eventos: el AuthService emite user.registered y se olvida y los listeners reaccionan a esto. Pero… ¿qué pasa cuando un listener necesita hacer algo pesado? ¿Enviar un email? ¿Generar un PDF? ¿Redimensionar una imagen? Si el listener falla, se pierde y no hay reintentos. No hay persistencia y no hay control.

Aquí entran en juego las colas de trabajo. Un evento dice algo ha pasado y un job dice haz esto, pero más tarde, con reintentos, con prioridad, y si falla 3 veces mándame un aviso. Los eventos son efímeros y los jobs son persistentes.

Como en cada post vamos a revisar toda la base que llevamos a las espaldas en esta serie: Docker (post 1), controllers (post 2), DI (post 3), módulos (post 4), middleware (post 5), validación (post 6), TypeORM (posts 7-9), seguridad (posts 10-12), funcionalidades avanzadas (posts 13-16) y eventos (post 17).

EA, amo al lío.

Diagrama de colas de trabajo con Bull y Redis procesando jobs en background en NestJS.

1. Eventos vs Colas: ¿cuál es la diferencia?

En el post anterior vimos que los eventos son fire-and-forget, vamos que lo lanzas y te olvidas, pero esto tiene alguna limitación:

EVENTOS (@nestjs/event-emitter):
 Desacoplamiento entre módulos
 Respuesta inmediata al usuario
 Si el listener falla, el job se pierde para siempre
 No hay reintentos automáticos
 Todo se ejecuta en el mismo proceso Node.js
 Si el servidor se reinicia, los eventos en vuelo desaparecen

COLAS (Bull + Redis):
 Persistencia: los jobs se guardan en Redis hasta que se procesan
 Reintentos automáticos con backoff exponencial
 Prioridades: "este email es urgente, este report puede esperar"
 Concurrencia controlada: "procesa máximo 5 jobs a la vez"
 Rate limiting: "máximo 100 emails por minuto"
 Delayed jobs: "envía este recordatorio en 24 horas"
 Escalable: varios workers procesando la misma cola
📡 Eventos (post 17)

Para tareas ligeras que no te importa si fallan. Notificaciones in-app, métricas, logs. Sin persistencia y todo en memoria.

📋 Colas (este post)

Para tareas pesadas que DEBEN completarse como emails, PDFs, procesamiento de imágenes, exports. Con persistencia en Redis, reintentos y control total.


2. ¿Qué es Bull?

es la librería estándar de colas para Node.js. Usa como broker para almacenar y distribuir los jobs.

El flujo sería el siguiente:

1. Tu servicio crea un job Bull lo guarda en Redis
2. Un @Processor toma el job de la cola lo ejecuta
3. Si falla Bull lo reintenta automáticamente según tu config
4. Si tiene éxito Bull lo marca como completado
5. Todo persiste en Redis si el servidor se reinicia, los jobs pendientes siguen ahí

3. Docker Compose: añadiendo Redis

Ya teníamos PostgreSQL en Docker desde el post 7, y ahora añadimos Redis:

# docker-compose.yml
services:
    app:
        build: .
        ports:
            - '3000:3000'
        depends_on:
            - postgres
            - redis
        environment:
            DATABASE_HOST: postgres
            DATABASE_PORT: 5432
            DATABASE_USER: nestjs
            DATABASE_PASSWORD: nestjs_password
            DATABASE_NAME: nestjs_db
            REDIS_HOST: redis
            REDIS_PORT: 6379
        volumes:
            - .:/app
            - /app/node_modules

    postgres:
        image: postgres:16-alpine
        environment:
            POSTGRES_USER: nestjs
            POSTGRES_PASSWORD: nestjs_password
            POSTGRES_DB: nestjs_db
        ports:
            - '5432:5432'
        volumes:
            - postgres_data:/var/lib/postgresql/data

    redis:
        image: redis:7-alpine
        ports:
            - '6379:6379'
        volumes:
            - redis_data:/data
        command: redis-server --appendonly yes

volumes:
    postgres_data:
    redis_data:

El flag --appendonly yes activa la AOF (Append Only File) de Redis. Si Redis se reinicia, no pierdes los jobs pendientes.


4. Instalación

Instalación de Bull para NestJS 0 / 2
$
Pulsa para ejecutar el siguiente comando

Instalamos tres paquetes:


5. Configuración del módulo

Registra Bull en AppModule con la conexión a Redis:

// src/app.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
import { ConfigModule, ConfigService } from '@nestjs/config';

@Module({
    imports: [
        ConfigModule.forRoot({ isGlobal: true }),
        BullModule.forRootAsync({
            imports: [ConfigModule],
            inject: [ConfigService],
            useFactory: (configService: ConfigService) => ({
                redis: {
                    host: configService.getOrThrow<string>('REDIS_HOST'),
                    port: configService.getOrThrow<number>('REDIS_PORT'),
                },
                defaultJobOptions: {
                    // Opciones por defecto para TODOS los jobs
                    removeOnComplete: 100, // Mantiene los últimos 100 completados
                    removeOnFail: 500, // Mantiene los últimos 500 fallidos
                    attempts: 3, // Reintentar 3 veces si falla
                    backoff: {
                        type: 'exponential',
                        delay: 2000, // 2s → 4s → 8s entre reintentos
                    },
                },
            }),
        }),
        // ... otros módulos
    ],
})
export class AppModule {}

Las defaultJobOptions aplican a todas las colas de la app. Cada cola puede sobreescribirlas. Desglose:


6. Tu primera cola: emails

Vamos a ver un caso real que sería mover el envío de emails a una cola. Recuerda que en el post 17 teníamos un listener que enviaba emails de bienvenida. Si el mail server se caía, el email se iba a la puta, ahora ya no.

6.1. Registrar la cola en el módulo

// src/mail/mail.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
import { MailService } from './mail.service';
import { MailProcessor } from './processors/mail.processor';
import { MAIL_QUEUE } from './constants';

@Module({
    imports: [
        BullModule.registerQueue({
            name: MAIL_QUEUE,
            defaultJobOptions: {
                attempts: 5, // Los emails son críticos, más reintentos
                backoff: {
                    type: 'exponential',
                    delay: 3000, // 3s → 6s → 12s → 24s → 48s
                },
                priority: 2, // Prioridad media por defecto
            },
        }),
    ],
    providers: [MailService, MailProcessor],
    exports: [MailService],
})
export class MailModule {}
// src/mail/constants.ts
export const MAIL_QUEUE = 'mail';

export const MailJobNames = {
    WELCOME: 'welcome',
    PASSWORD_RESET: 'password-reset',
    ORDER_CONFIRMATION: 'order-confirmation',
    WEEKLY_DIGEST: 'weekly-digest',
} as const;

export type MailJobName = (typeof MailJobNames)[keyof typeof MailJobNames];

as const + el type derivado: misma técnica que usamos con los eventos en el post anterior. Hay que evitar las magic strings y por eso implementamos constantes tipadas.

6.2. El servicio que añade jobs a la cola

// src/mail/mail.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bull';
import { type Queue } from 'bull';
import { MAIL_QUEUE, MailJobNames } from './constants';
import { type WelcomeMailJobData } from './interfaces/mail-job-data.interface';
import { type PasswordResetMailJobData } from './interfaces/mail-job-data.interface';

@Injectable()
export class MailService {
    private readonly logger = new Logger(MailService.name);

    constructor(
        @InjectQueue(MAIL_QUEUE)
        private readonly mailQueue: Queue
    ) {}

    async sendWelcomeEmail(email: string, name: string): Promise<void> {
        const jobData: WelcomeMailJobData = { email, name };

        await this.mailQueue.add(MailJobNames.WELCOME, jobData, {
            priority: 1, // Alta prioridad: el usuario acaba de registrarse
        });

        this.logger.log(`Job de email de bienvenida encolado para ${email}`);
    }

    async sendPasswordResetEmail(email: string, resetToken: string): Promise<void> {
        const jobData: PasswordResetMailJobData = { email, resetToken };

        await this.mailQueue.add(MailJobNames.PASSWORD_RESET, jobData, {
            priority: 1, // Urgente
            attempts: 5, // Más reintentos que el default
            removeOnComplete: true, // No guardar datos sensibles
        });

        this.logger.log(`Job de reset de password encolado para ${email}`);
    }

    async sendWeeklyDigest(recipients: ReadonlyArray<{ email: string; name: string }>): Promise<void> {
        const jobs = recipients.map((recipient) => ({
            name: MailJobNames.WEEKLY_DIGEST,
            data: { email: recipient.email, name: recipient.name },
            opts: {
                priority: 5, // Baja prioridad: no es urgente
            },
        }));

        await this.mailQueue.addBulk(jobs);

        this.logger.log(`${recipients.length} jobs de weekly digest encolados`);
    }
}

Vamos a ver qué tenemos por aquí:

6.3. Interfaces tipadas para los datos de los jobs

// src/mail/interfaces/mail-job-data.interface.ts
export interface WelcomeMailJobData {
    readonly email: string;
    readonly name: string;
}

export interface PasswordResetMailJobData {
    readonly email: string;
    readonly resetToken: string;
}

export interface OrderConfirmationMailJobData {
    readonly email: string;
    readonly orderId: string;
    readonly items: ReadonlyArray<{
        readonly name: string;
        readonly quantity: number;
        readonly price: number;
    }>;
}

export interface WeeklyDigestMailJobData {
    readonly email: string;
    readonly name: string;
}

export type MailJobData =
    | WelcomeMailJobData
    | PasswordResetMailJobData
    | OrderConfirmationMailJobData
    | WeeklyDigestMailJobData;

Cada tipo de job tiene su propia interfaz con readonly, así el processor sabe qué datos deben llegar.


7. El Processor: quien hace el trabajo

El es el worker que toma los jobs de la cola y los ejecuta:

// src/mail/processors/mail.processor.ts
import { Processor, Process, OnQueueActive, OnQueueCompleted, OnQueueFailed } from '@nestjs/bull';
import { Logger } from '@nestjs/common';
import { type Job } from 'bull';
import { MAIL_QUEUE, MailJobNames } from '../constants';
import { type WelcomeMailJobData } from '../interfaces/mail-job-data.interface';
import { type PasswordResetMailJobData } from '../interfaces/mail-job-data.interface';
import { type WeeklyDigestMailJobData } from '../interfaces/mail-job-data.interface';
import { MailTransportService } from '../mail-transport.service';

@Processor(MAIL_QUEUE)
export class MailProcessor {
    private readonly logger = new Logger(MailProcessor.name);

    constructor(private readonly mailTransport: MailTransportService) {}

    @Process(MailJobNames.WELCOME)
    async handleWelcomeEmail(job: Job<WelcomeMailJobData>): Promise<void> {
        this.logger.log(`Procesando email de bienvenida para ${job.data.email} (intento ${job.attemptsMade + 1})`);

        await this.mailTransport.send({
            to: job.data.email,
            subject: '¡Bienvenido!',
            template: 'welcome',
            context: { name: job.data.name },
        });
    }

    @Process(MailJobNames.PASSWORD_RESET)
    async handlePasswordReset(job: Job<PasswordResetMailJobData>): Promise<void> {
        this.logger.log(`Procesando email de reset de password para ${job.data.email}`);

        await this.mailTransport.send({
            to: job.data.email,
            subject: 'Resetea tu contraseña',
            template: 'password-reset',
            context: { resetToken: job.data.resetToken },
        });
    }

    @Process(MailJobNames.WEEKLY_DIGEST)
    async handleWeeklyDigest(job: Job<WeeklyDigestMailJobData>): Promise<void> {
        await this.mailTransport.send({
            to: job.data.email,
            subject: 'Tu resumen semanal',
            template: 'weekly-digest',
            context: { name: job.data.name },
        });
    }

    @OnQueueActive()
    onActive(job: Job): void {
        this.logger.log(`Procesando job ${job.id} de tipo ${job.name}...`);
    }

    @OnQueueCompleted()
    onCompleted(job: Job): void {
        this.logger.log(`Job ${job.id} (${job.name}) completado`);
    }

    @OnQueueFailed()
    onFailed(job: Job, error: Error): void {
        this.logger.error(`Job ${job.id} (${job.name}) falló: ${error.message}`, error.stack);
    }
}

Vamos a ver cada cosa con cariño:


8. Ciclo de vida de un job

Cada job pasa por varios estados. Bull te da hooks para reaccionar a cada uno:

  add() → WAITING → ACTIVE → COMPLETED ✅

                    FAILED

              (si quedan reintentos)

                   DELAYED WAITING ACTIVE ...

              (si no quedan reintentos)

                    FAILED (definitivo) 💀
DecoradorCuándo se ejecutaCaso de uso
@OnQueueWaiting()El job entra en la colaMétricas de cola
@OnQueueActive()Un worker empieza a procesar el jobLogging, tracking
@OnQueueCompleted()El job se completó con éxitoNotificaciones, métricas
@OnQueueFailed()El job falló (puede que se reintente)Alertas, logging de errores
@OnQueueStalled()El worker murió sin completar el jobAlertas críticas
@OnQueueDrained()La cola se vació (no quedan jobs)Batch processing, limpieza

El más peligroso: @OnQueueStalled(). Un job stalled significa que el worker murió sin terminar. Bull lo detecta y lo reencola automáticamente. Si ves muchos stalled jobs, tu worker tiene un memory leak o un timeout demasiado ajustado.


9. Estrategias de reintentos y backoff

El reintento no es volver a ejecutar y ya, hay estrategias:

// Backoff exponencial (recomendado para APIs externas)
await this.mailQueue.add(MailJobNames.WELCOME, data, {
    attempts: 5,
    backoff: {
        type: 'exponential',
        delay: 2000,
        // Intento 1: falla → espera 2s
        // Intento 2: falla → espera 4s
        // Intento 3: falla → espera 8s
        // Intento 4: falla → espera 16s
        // Intento 5: falla → marcado como FAILED definitivo
    },
});

// Backoff fijo (para errores transitorios conocidos)
await this.mailQueue.add(MailJobNames.WELCOME, data, {
    attempts: 3,
    backoff: {
        type: 'fixed',
        delay: 5000,
        // Siempre espera 5s entre reintentos
    },
});
📈 Exponencial

Cada reintento espera el doble que el anterior: 2s → 4s → 8s → 16s. Ideal cuando llamas a servicios externos (APIs de email, pagos, etc.) que pueden estar temporalmente caídos. Les das tiempo para recuperarse sin petarlos.

⏱️ Fijo

Siempre espera lo mismo entre reintentos: 5s → 5s → 5s. Útil para errores puntuales donde el tiempo de espera no importa: un lock de base de datos, un conflicto de concurrencia.

Backoff custom: lógica avanzada de reintentos

Para escenarios más complejos, define una función de backoff custom:

await this.mailQueue.add(MailJobNames.WELCOME, data, {
    attempts: 5,
    backoff: {
        type: 'custom',
    },
});

Y en el processor, implementa la lógica:

// src/mail/processors/mail.processor.ts
@Processor({
    name: MAIL_QUEUE,
    settings: {
        backoffStrategies: {
            custom: (attemptsMade: number): number => {
                // Jitter: añade aleatoriedad para evitar thundering herd
                const baseDelay = Math.pow(2, attemptsMade) * 1000;
                const jitter = Math.random() * 1000;
                return baseDelay + jitter;
            },
        },
    },
})
export class MailProcessor {
    // ...
}

El jitter es crucial cuando tienes muchos workers. Sin jitter, todos reintentan al mismo tiempo y saturan el servicio caído.


10. Delayed jobs: tareas programadas

Jobs que se ejecutan en el futuro y no inmediatamente:

@Injectable()
export class NotificationService {
    constructor(
        @InjectQueue('notifications')
        private readonly notificationQueue: Queue
    ) {}

    // Enviar un recordatorio 24 horas después del registro
    async scheduleWelcomeReminder(userId: string, email: string): Promise<void> {
        await this.notificationQueue.add(
            'welcome-reminder',
            { userId, email },
            {
                delay: 24 * 60 * 60 * 1000, // 24 horas en ms
            }
        );
    }

    // Enviar recordatorio de carrito abandonado después de 2 horas
    async scheduleCartReminder(userId: string, cartId: string): Promise<void> {
        const job = await this.notificationQueue.add(
            'cart-reminder',
            { userId, cartId },
            {
                delay: 2 * 60 * 60 * 1000, // 2 horas
                jobId: `cart-reminder-${cartId}`, // ID único por carrito
            }
        );

        // Guardar el jobId para poder cancelarlo si el usuario compra
        return job;
    }

    // Cancelar el recordatorio si el usuario completa la compra
    async cancelCartReminder(cartId: string): Promise<void> {
        const job = await this.notificationQueue.getJob(`cart-reminder-${cartId}`);

        if (job) {
            await job.remove();
        }
    }
}

Vamos a ver el patrón del carrito abandonado:

  1. Usuario añade productos → programas un recordatorio a 2 horas.
  2. Si compra antes de 2 horas → cancelas el job con job.remove().
  3. Si no compra → el job se ejecuta y envía el recordatorio.

El jobId custom (cart-reminder-${cartId}) te permite localizar y cancelar un job específico. Sin jobId, Bull asigna un ID auto-incremental y no hay forma fácil de encontrar el job después.


11. Concurrencia: procesar múltiples jobs a la vez

Por defecto, un processor procesa un job a la vez. Si cada email tarda 2 segundos y tienes 1000 en cola, tardarás 33 minutos. Con concurrencia:

@Processor({
    name: MAIL_QUEUE,
    concurrency: 5, // Procesa hasta 5 jobs simultáneamente
})
export class MailProcessor {
    // ...
}

O a nivel de método @Process individual:

@Process({ name: MailJobNames.WELCOME, concurrency: 3 })
async handleWelcomeEmail(job: Job<WelcomeMailJobData>): Promise<void> {
    // Hasta 3 emails de bienvenida simultáneos
}

@Process({ name: MailJobNames.WEEKLY_DIGEST, concurrency: 10 })
async handleWeeklyDigest(job: Job<WeeklyDigestMailJobData>): Promise<void> {
    // Hasta 10 digests simultáneos (menos críticos, más paralelismo)
}
Cuándo subir la concurrencia
  • Jobs que esperan respuesta de servicios externos (I/O bound): emails, APIs, uploads. El CPU está idle mientras espera
  • Jobs independientes que no comparten recursos ni tienen efectos colaterales entre sí
  • Cola con muchos jobs pendientes y el throughput actual no es suficiente
Cuándo mantenerla baja
  • Jobs CPU-intensive: procesamiento de imágenes, cálculos pesados. Más concurrencia = más contención de CPU
  • Jobs que acceden a recursos limitados: una API con rate limit de 10 req/s. Si pones concurrency: 50, te banean
  • Jobs que modifican los mismos datos: riesgo de race conditions y conflictos de escritura

12. Rate limiting: controlar el ritmo

Cuando llamas a APIs externas con límites, necesitas rate limiting :

BullModule.registerQueue({
    name: MAIL_QUEUE,
    limiter: {
        max: 100,       // Máximo 100 jobs
        duration: 60000, // por cada 60 segundos (1 minuto)
    },
}),

Esto es global para la cola. No importa la concurrencia porque aunque tengas 10 workers, Bull garantiza que no se procesan más de 100 jobs por minuto en total.

Combinado con la concurrencia:

concurrency: 5  Hasta 5 jobs ejecutándose al mismo tiempo
limiter: 100/min Pero no más de 100 jobs por minuto en total

Si cada job tarda 0.5s 5 workers × 2 jobs/s = 10 jobs/s = 600 jobs/min
Con limiter: Bull frena a 100/min, los workers esperan entre lotes

13. Conectando eventos con colas

La combinación más potente sería usar los eventos del post 17 para alimentar las colas de este post. El evento es el trigger, la cola es el ejecutor:

// src/mail/listeners/user-registered-queue.listener.ts
import { Injectable, Logger } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { MailService } from '../mail.service';
import { UserRegisteredEvent } from '../../auth/events/user-registered.event';
import { NotificationService } from '../../notifications/notification.service';
import { EventNames } from '../../common/events/event-names';

@Injectable()
export class UserRegisteredQueueListener {
    private readonly logger = new Logger(UserRegisteredQueueListener.name);

    constructor(
        private readonly mailService: MailService,
        private readonly notificationService: NotificationService
    ) {}

    @OnEvent(EventNames.USER_REGISTERED)
    async handleUserRegistered(event: UserRegisteredEvent): Promise<void> {
        // No enviamos el email directamente : lo encolamos
        await this.mailService.sendWelcomeEmail(event.email, event.name);

        // Programar un recordatorio para 24h después
        await this.notificationService.scheduleWelcomeReminder(event.userId, event.email);

        this.logger.log(`Jobs de bienvenida encolados para ${event.email}`);
    }
}

El flujo completo:

1. AuthService emit('user.registered')          (fire-and-forget)
2. UserRegisteredQueueListener @OnEvent          (reacciona al evento)
3. mailService.sendWelcomeEmail()  queue.add()    (encola el job)
4. MailProcessor @Process('welcome')             (procesa el job)
5. Si falla Bull reintenta automáticamente       (3-5 intentos con backoff)

Tres capas de resiliencia:

  1. Evento: desacopla el AuthService del email.
  2. Cola: persiste el job en Redis, sobrevive a reinicios.
  3. Reintentos: si el mail server se cae, reintenta automáticamente.

14. Cola de procesamiento de imágenes

Otro caso real podría ser redimensionar imágenes subidas por usuarios. En el post 16 subíamos archivos y ahora procesamos esos archivos en segundo plano:

// src/images/constants.ts
export const IMAGE_QUEUE = 'image-processing';

export const ImageJobNames = {
    RESIZE: 'resize',
    GENERATE_THUMBNAILS: 'generate-thumbnails',
    OPTIMIZE: 'optimize',
} as const;
// src/images/interfaces/image-job-data.interface.ts
export interface ResizeImageJobData {
    readonly filePath: string;
    readonly userId: string;
    readonly dimensions: ReadonlyArray<{
        readonly width: number;
        readonly height: number;
        readonly suffix: string;
    }>;
}

export interface GenerateThumbnailsJobData {
    readonly filePath: string;
    readonly userId: string;
    readonly sizes: ReadonlyArray<number>;
}
// src/images/processors/image.processor.ts
import { Processor, Process, OnQueueFailed } from '@nestjs/bull';
import { Logger } from '@nestjs/common';
import { type Job } from 'bull';
import { IMAGE_QUEUE, ImageJobNames } from '../constants';
import { type ResizeImageJobData } from '../interfaces/image-job-data.interface';
import { type GenerateThumbnailsJobData } from '../interfaces/image-job-data.interface';
import { ImageTransformService } from '../image-transform.service';

@Processor({
    name: IMAGE_QUEUE,
    concurrency: 3, // Máximo 3 imágenes a la vez (CPU-intensive)
})
export class ImageProcessor {
    private readonly logger = new Logger(ImageProcessor.name);

    constructor(private readonly imageTransform: ImageTransformService) {}

    @Process(ImageJobNames.RESIZE)
    async handleResize(job: Job<ResizeImageJobData>): Promise<void> {
        const { filePath, dimensions } = job.data;

        for (let i = 0; i < dimensions.length; i++) {
            const dim = dimensions[i];
            await this.imageTransform.resize(filePath, dim.width, dim.height, dim.suffix);

            // Reportar progreso al job
            const progress = Math.round(((i + 1) / dimensions.length) * 100);
            await job.progress(progress);
        }
    }

    @Process(ImageJobNames.GENERATE_THUMBNAILS)
    async handleThumbnails(job: Job<GenerateThumbnailsJobData>): Promise<void> {
        const { filePath, sizes } = job.data;

        for (let i = 0; i < sizes.length; i++) {
            await this.imageTransform.thumbnail(filePath, sizes[i]);
            await job.progress(Math.round(((i + 1) / sizes.length) * 100));
        }
    }

    @OnQueueFailed()
    onFailed(job: Job, error: Error): void {
        this.logger.error(`Procesamiento de imagen falló (job ${job.id}): ${error.message}`, error.stack);
    }
}

Atención al job.progress() porque reporta el progreso del job en porcentaje. Si tienes un dashboard (como Bull Board, que veremos al final), puedes ver cuánto lleva cada job en tiempo real.

La concurrencia es baja (3) porque el procesamiento de imágenes es CPU-intensive. Más workers no ayudan si el CPU ya está petada al 100%.


15. Jobs con progreso: tracking en tiempo real

job.progress() no solo sirve para dashboards:

// src/images/images.controller.ts
import { Controller, Get, Param } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bull';
import { type Queue } from 'bull';
import { IMAGE_QUEUE } from './constants';

interface JobStatusResponse {
    readonly id: string;
    readonly state: string;
    readonly progress: number;
    readonly failedReason?: string;
}

@Controller('images')
export class ImagesController {
    constructor(
        @InjectQueue(IMAGE_QUEUE)
        private readonly imageQueue: Queue
    ) {}

    @Get('jobs/:jobId/status')
    async getJobStatus(@Param('jobId') jobId: string): Promise<JobStatusResponse> {
        const job = await this.imageQueue.getJob(jobId);

        if (!job) {
            return {
                id: jobId,
                state: 'not_found',
                progress: 0,
            };
        }

        const state = await job.getState();

        return {
            id: job.id.toString(),
            state,
            progress: job.progress() as number,
            failedReason: job.failedReason ?? undefined,
        };
    }
}

El cliente puede hacer polling a este endpoint para mostrar una barra de progreso:

GET /images/jobs/42/status
 { "id": "42", "state": "active", "progress": 66 }

GET /images/jobs/42/status  (unos segundos después)
 { "id": "42", "state": "completed", "progress": 100 }

16. Jobs repetibles (cron jobs)

Jobs que se ejecutan en un schedule, como un cron pero gestionado por Bull y Redis:

// src/reports/reports.module.ts
import { Module, type OnModuleInit } from '@nestjs/common';
import { BullModule, InjectQueue } from '@nestjs/bull';
import { type Queue } from 'bull';
import { ReportsProcessor } from './processors/reports.processor';

const REPORTS_QUEUE = 'reports';

@Module({
    imports: [BullModule.registerQueue({ name: REPORTS_QUEUE })],
    providers: [ReportsProcessor],
})
export class ReportsModule implements OnModuleInit {
    constructor(
        @InjectQueue(REPORTS_QUEUE)
        private readonly reportsQueue: Queue
    ) {}

    async onModuleInit(): Promise<void> {
        // Limpiar jobs repetibles previos para evitar duplicados al reiniciar
        const existingJobs = await this.reportsQueue.getRepeatableJobs();
        for (const job of existingJobs) {
            await this.reportsQueue.removeRepeatableByKey(job.key);
        }

        // Report diario a las 8:00 AM
        await this.reportsQueue.add(
            'daily-report',
            {},
            {
                repeat: { cron: '0 8 * * *' },
                jobId: 'daily-report',
            }
        );

        // Limpieza semanal los domingos a las 3:00 AM
        await this.reportsQueue.add(
            'weekly-cleanup',
            {},
            {
                repeat: { cron: '0 3 * * 0' },
                jobId: 'weekly-cleanup',
            }
        );
    }
}

El onModuleInit limpia los jobs repetibles previos antes de re-registrarlos. Sin esto, cada reinicio del servidor crearía duplicados y el job se ejecutaría múltiples veces.

¿Por qué no usar @nestjs/schedule? Podrías, pero @nestjs/schedule ejecuta la tarea en el proceso principal. Si tienes múltiples instancias de tu app (load balancing), el cron se ejecuta en TODAS las instancias a la vez. Con Bull + Redis, solo un worker toma el job, sin duplicados.


17. Separar el producer del consumer

En producción, es habitual que el producer (quien encola jobs) y el consumer (quien los procesa) sean procesos separados:

┌─────────────┐     ┌─────────┐     ┌──────────────┐
  API Server  │────→│  Redis  │←────│  Worker
  (producer)  │     │         │     │  (consumer)  │
  Puerto 3000  Sin puerto
└─────────────┘     └─────────┘     └──────────────┘

Esto te permite:

Para implementarlo, crea un entry point separado para el worker:

// src/worker.ts
import { NestFactory } from '@nestjs/core';
import { WorkerModule } from './worker.module';
import { Logger } from '@nestjs/common';

async function bootstrap(): Promise<void> {
    const logger = new Logger('Worker');

    // No llamamos a app.listen() : el worker no expone un puerto HTTP
    const app = await NestFactory.createApplicationContext(WorkerModule);

    logger.log('Worker iniciado, procesando jobs...');

    // Graceful shutdown
    process.on('SIGTERM', async () => {
        logger.log('SIGTERM recibido, cerrando worker...');
        await app.close();
        process.exit(0);
    });
}

bootstrap();
// src/worker.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { BullModule } from '@nestjs/bull';
import { MailModule } from './mail/mail.module';
import { ImagesModule } from './images/images.module';

@Module({
    imports: [
        ConfigModule.forRoot({ isGlobal: true }),
        BullModule.forRootAsync({
            imports: [ConfigModule],
            inject: [ConfigService],
            useFactory: (configService: ConfigService) => ({
                redis: {
                    host: configService.getOrThrow<string>('REDIS_HOST'),
                    port: configService.getOrThrow<number>('REDIS_PORT'),
                },
            }),
        }),
        MailModule,
        ImagesModule,
        // Solo módulos con processors
    ],
})
export class WorkerModule {}

Y en Docker Compose:

services:
    api:
        build: .
        command: node dist/main.js
        ports:
            - '3000:3000'

    worker:
        build: .
        command: node dist/worker.js
        # Sin puerto : solo procesa jobs
        depends_on:
            - redis
        deploy:
            replicas: 2 # 2 workers procesando en paralelo

18. Monitorización con Bull Board

No volar a ciegas. te da un panel visual:

Instalación de Bull Board 0 / 1
$
Pulsa para ejecutar el siguiente comando
// src/bull-board.module.ts
import { Module, type MiddlewareConsumer, type NestModule } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bull';
import { type Queue } from 'bull';
import { createBullBoard } from '@bull-board/api';
import { BullAdapter } from '@bull-board/api/bullAdapter';
import { ExpressAdapter } from '@bull-board/express';
import { MAIL_QUEUE } from './mail/constants';
import { IMAGE_QUEUE } from './images/constants';

@Module({})
export class BullBoardModule implements NestModule {
    private readonly serverAdapter = new ExpressAdapter();

    constructor(
        @InjectQueue(MAIL_QUEUE)
        private readonly mailQueue: Queue,
        @InjectQueue(IMAGE_QUEUE)
        private readonly imageQueue: Queue
    ) {
        this.serverAdapter.setBasePath('/admin/queues');

        createBullBoard({
            queues: [new BullAdapter(this.mailQueue), new BullAdapter(this.imageQueue)],
            serverAdapter: this.serverAdapter,
        });
    }

    configure(consumer: MiddlewareConsumer): void {
        consumer.apply(this.serverAdapter.getRouter()).forRoutes('/admin/queues');
    }
}

Accede a http://localhost:3000/admin/queues para ver el estado de tus colas en tiempo real.

En producción: protege esta ruta con autenticación. Usa el RolesGuard del post 11 o un middleware de auth básico. Nunca dejes el dashboard expuesto.


19. Manejo de errores avanzado: dead letter queue

Cuando un job falla todos sus reintentos, ¿qué haces? Una captura esos jobs para investigación:

// src/common/queues/dead-letter.processor.ts
import { Processor, Process } from '@nestjs/bull';
import { Logger } from '@nestjs/common';
import { type Job } from 'bull';

export const DEAD_LETTER_QUEUE = 'dead-letter';

interface DeadLetterJobData {
    readonly originalQueue: string;
    readonly originalJobName: string;
    readonly originalData: unknown;
    readonly failedReason: string;
    readonly attemptsMade: number;
    readonly failedAt: string;
}

@Processor(DEAD_LETTER_QUEUE)
export class DeadLetterProcessor {
    private readonly logger = new Logger(DeadLetterProcessor.name);

    @Process()
    async handleDeadLetter(job: Job<DeadLetterJobData>): Promise<void> {
        this.logger.error(
            `💀 Dead letter: Job "${job.data.originalJobName}" de cola "${job.data.originalQueue}" ` +
                `falló después de ${job.data.attemptsMade} intentos. ` +
                `Razón: ${job.data.failedReason}`
        );

        // Aquí puedes:
        // 1. Enviar una alerta a Slack/Discord
        // 2. Guardar en base de datos para investigación
        // 3. Notificar al equipo de soporte
    }
}

Y en el processor original, envía a la DLQ cuando se agotan los reintentos:

// En el MailProcessor
@OnQueueFailed()
async onFailed(job: Job, error: Error): Promise<void> {
    this.logger.error(
        `Job ${job.id} falló: ${error.message}`,
    );

    // Si se agotaron los reintentos, enviar a dead letter queue
    if (job.attemptsMade >= (job.opts.attempts ?? 3)) {
        await this.deadLetterQueue.add({
            originalQueue: MAIL_QUEUE,
            originalJobName: job.name,
            originalData: job.data,
            failedReason: error.message,
            attemptsMade: job.attemptsMade,
            failedAt: new Date().toISOString(),
        });
    }
}

20. Estructura de archivos

src/
├── common/
   ├── events/
   ├── event-names.ts
   └── event-map.ts
   └── queues/
       ├── dead-letter.processor.ts
       └── dead-letter.module.ts
├── mail/
   ├── mail.module.ts
   ├── mail.service.ts Encola jobs
   ├── mail-transport.service.ts Envía emails realmente
   ├── constants.ts MAIL_QUEUE, MailJobNames
   ├── interfaces/
   └── mail-job-data.interface.ts
   ├── processors/
   └── mail.processor.ts Procesa jobs de email
   └── listeners/
       └── user-registered-queue.listener.ts Evento Cola
├── images/
   ├── images.module.ts
   ├── images.controller.ts Endpoint de status de jobs
   ├── image-transform.service.ts
   ├── constants.ts
   ├── interfaces/
   └── image-job-data.interface.ts
   └── processors/
       └── image.processor.ts
├── notifications/
   ├── notification.module.ts
   ├── notification.service.ts Delayed jobs
   └── processors/
       └── notification.processor.ts
├── reports/
   └── reports.module.ts Jobs repetibles (cron)
├── bull-board.module.ts Dashboard
├── worker.module.ts Entry point worker separado
└── worker.ts

Patrón: constants.ts + interfaces/ + processors/ en cada módulo que usa colas. Los processors están separados de los services porque tienen responsabilidades diferentes: el service encola, el processor ejecuta.


21. Errores comunes

Error 1: No importar BullModule.forRoot() antes de registerQueue()

// ❌ registerQueue sin forRoot → "Could not find connection"
@Module({
    imports: [BullModule.registerQueue({ name: 'mail' })],
})
export class MailModule {}

// ✅ forRoot en AppModule configura la conexión a Redis
// registerQueue en feature modules registra las colas
@Module({
    imports: [
        BullModule.forRootAsync({
            /* redis config */
        }),
    ],
})
export class AppModule {}

Error 2: No registrar el Processor como provider

// ❌ El processor existe pero no está en providers → los jobs nunca se procesan
@Module({
    imports: [BullModule.registerQueue({ name: 'mail' })],
    providers: [MailService], // Falta MailProcessor
})
export class MailModule {}

// ✅ El processor DEBE estar en providers
@Module({
    imports: [BullModule.registerQueue({ name: 'mail' })],
    providers: [MailService, MailProcessor],
})
export class MailModule {}

Error 3: Datos serializables en los jobs

// ❌ Los datos del job se serializan a JSON en Redis
// Clases, funciones y referencias circulares no sobreviven
await queue.add('process', {
    user: userEntity, // Entidad TypeORM con relaciones circulares
    callback: () => {}, // Funciones no son serializables
    date: new Date(), // Se serializa como string, pierde el tipo Date
});

// ✅ Solo datos primitivos y objetos planos
await queue.add('process', {
    userId: user.id, // ID, no la entidad completa
    email: user.email,
    createdAt: new Date().toISOString(), // ISO string, no Date
});

Los datos del job van a Redis como JSON. Todo lo que no sea serializable se pierde o rompe. Pasa solo IDs y datos primitivos, nunca entidades ni objetos con métodos.

Error 4: Jobs repetibles duplicados al reiniciar

// ❌ Cada vez que la app reinicia, se añade otro job repetible
async onModuleInit(): Promise<void> {
    await this.queue.add('report', {}, { repeat: { cron: '0 8 * * *' } });
    // Después de 5 reinicios: 5 jobs idénticos ejecutándose a las 8:00
}

// ✅ Limpiar antes de registrar
async onModuleInit(): Promise<void> {
    const existing = await this.queue.getRepeatableJobs();
    for (const job of existing) {
        await this.queue.removeRepeatableByKey(job.key);
    }
    await this.queue.add('report', {}, { repeat: { cron: '0 8 * * *' } });
}

22. Recapitulando

📋 @nestjs/bull + Redis

BullModule.forRootAsync() configura Redis. registerQueue() crea colas. @InjectQueue() las inyecta en tus services. Todo con tipado.

⚙️ @Processor + @Process

@Processor vincula una clase a una cola. @Process(nombre) maneja jobs específicos. Job da tipado al payload.

🔄 Reintentos + Backoff

attempts + backoff exponencial. Si falla, reintenta con delay creciente. Jitter para evitar thundering herd. Dead letter queue para los que fallan definitivamente.

🚦 Concurrencia + Rate Limiting

concurrency: N procesa N jobs a la vez. limiter: {max, duration} controla el throughput global. Ajusta según el tipo de job.

⏰ Delayed + Repeatable

delay: ms para jobs futuros (recordatorios, carritos abandonados). repeat: {cron} para tareas periódicas. jobId custom para poder cancelar.

🔗 Eventos → Colas

Los eventos desacoplan. Las colas persisten. Combínalos: el listener del evento encola el job. Tres capas de resiliencia.

En el próximo post nos metemos con WebSockets y Socket.IO: comunicación en tiempo real, @WebSocketGateway, @SubscribeMessage, namespaces, rooms, autenticación en WebSockets y eventos tipados.

EA, nos vemos en los bares!! 🍺


Pon a prueba lo aprendido

1. ¿Cuál es la principal diferencia entre eventos (@nestjs/event-emitter) y colas (Bull + Redis)?

2. ¿Para qué sirve el backoff exponencial en los reintentos de un job?

3. ¿Qué pasa si no limpias los jobs repetibles antes de re-registrarlos en onModuleInit?

4. ¿Por qué solo debes pasar datos primitivos y IDs en el payload de un job, y no entidades completas de TypeORM?

5. ¿Qué ventaja tiene separar el producer (API server) del consumer (worker) en procesos distintos?