Vamos con el post número 18 de la serie NestJS y segundo del bloque de Eventos, Colas y Tiempo Real. En el post anterior aprendimos a desacoplar módulos con eventos: el AuthService emite user.registered y se olvida y los listeners reaccionan a esto. Pero… ¿qué pasa cuando un listener necesita hacer algo pesado? ¿Enviar un email? ¿Generar un PDF? ¿Redimensionar una imagen? Si el listener falla, se pierde y no hay reintentos. No hay persistencia y no hay control.
Aquí entran en juego las colas de trabajo. Un evento dice algo ha pasado y un job dice haz esto, pero más tarde, con reintentos, con prioridad, y si falla 3 veces mándame un aviso. Los eventos son efímeros y los jobs son persistentes.
Como en cada post vamos a revisar toda la base que llevamos a las espaldas en esta serie: Docker (post 1), controllers (post 2), DI (post 3), módulos (post 4), middleware (post 5), validación (post 6), TypeORM (posts 7-9), seguridad (posts 10-12), funcionalidades avanzadas (posts 13-16) y eventos (post 17).
EA, amo al lío.

1. Eventos vs Colas: ¿cuál es la diferencia?
En el post anterior vimos que los eventos son fire-and-forget, vamos que lo lanzas y te olvidas, pero esto tiene alguna limitación:
EVENTOS (@nestjs/event-emitter):
✅ Desacoplamiento entre módulos
✅ Respuesta inmediata al usuario
❌ Si el listener falla, el job se pierde para siempre
❌ No hay reintentos automáticos
❌ Todo se ejecuta en el mismo proceso Node.js
❌ Si el servidor se reinicia, los eventos en vuelo desaparecen
COLAS (Bull + Redis):
✅ Persistencia: los jobs se guardan en Redis hasta que se procesan
✅ Reintentos automáticos con backoff exponencial
✅ Prioridades: "este email es urgente, este report puede esperar"
✅ Concurrencia controlada: "procesa máximo 5 jobs a la vez"
✅ Rate limiting: "máximo 100 emails por minuto"
✅ Delayed jobs: "envía este recordatorio en 24 horas"
✅ Escalable: varios workers procesando la misma cola
Para tareas ligeras que no te importa si fallan. Notificaciones in-app, métricas, logs. Sin persistencia y todo en memoria.
Para tareas pesadas que DEBEN completarse como emails, PDFs, procesamiento de imágenes, exports. Con persistencia en Redis, reintentos y control total.
2. ¿Qué es Bull?
💡 Bull Librería de Node.js para gestión de colas de trabajo basada en Redis. Soporta jobs con prioridades, reintentos automáticos, backoff exponencial, rate limiting, scheduling, eventos de ciclo de vida y múltiples workers. @nestjs/bull la integra con el sistema de módulos y DI de NestJS. Más info → es la librería estándar de colas para Node.js. Usa 💡 Redis Base de datos en memoria key-value. Extremadamente rápida para operaciones de lectura/escritura. Bull la usa como broker: almacena los jobs, gestiona las colas, maneja los locks entre workers y persiste los datos para que sobrevivan a reinicios del servidor. Más info → como broker para almacenar y distribuir los jobs.
El flujo sería el siguiente:
1. Tu servicio crea un job → Bull lo guarda en Redis
2. Un @Processor toma el job de la cola → lo ejecuta
3. Si falla → Bull lo reintenta automáticamente según tu config
4. Si tiene éxito → Bull lo marca como completado
5. Todo persiste en Redis → si el servidor se reinicia, los jobs pendientes siguen ahí
3. Docker Compose: añadiendo Redis
Ya teníamos PostgreSQL en Docker desde el post 7, y ahora añadimos Redis:
# docker-compose.yml
services:
app:
build: .
ports:
- '3000:3000'
depends_on:
- postgres
- redis
environment:
DATABASE_HOST: postgres
DATABASE_PORT: 5432
DATABASE_USER: nestjs
DATABASE_PASSWORD: nestjs_password
DATABASE_NAME: nestjs_db
REDIS_HOST: redis
REDIS_PORT: 6379
volumes:
- .:/app
- /app/node_modules
postgres:
image: postgres:16-alpine
environment:
POSTGRES_USER: nestjs
POSTGRES_PASSWORD: nestjs_password
POSTGRES_DB: nestjs_db
ports:
- '5432:5432'
volumes:
- postgres_data:/var/lib/postgresql/data
redis:
image: redis:7-alpine
ports:
- '6379:6379'
volumes:
- redis_data:/data
command: redis-server --appendonly yes
volumes:
postgres_data:
redis_data:
El flag --appendonly yes activa la AOF (Append Only File) AOF (Append Only File) Modo de persistencia de Redis que guarda cada operación de escritura en un archivo de log. Si Redis se reinicia, reproduce el log para recuperar todos los datos. Más seguro que RDB snapshots para no perder jobs. de Redis. Si Redis se reinicia, no pierdes los jobs pendientes.
4. Instalación
Instalamos tres paquetes:
@nestjs/bull: Integración con NestJS con sus decoradores, módulo, DI.bull: La librería core que habla con Redis.@types/bull: Tipos TypeScript.
5. Configuración del módulo
Registra Bull en AppModule con la conexión a Redis:
// src/app.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
import { ConfigModule, ConfigService } from '@nestjs/config';
@Module({
imports: [
ConfigModule.forRoot({ isGlobal: true }),
BullModule.forRootAsync({
imports: [ConfigModule],
inject: [ConfigService],
useFactory: (configService: ConfigService) => ({
redis: {
host: configService.getOrThrow<string>('REDIS_HOST'),
port: configService.getOrThrow<number>('REDIS_PORT'),
},
defaultJobOptions: {
// Opciones por defecto para TODOS los jobs
removeOnComplete: 100, // Mantiene los últimos 100 completados
removeOnFail: 500, // Mantiene los últimos 500 fallidos
attempts: 3, // Reintentar 3 veces si falla
backoff: {
type: 'exponential',
delay: 2000, // 2s → 4s → 8s entre reintentos
},
},
}),
}),
// ... otros módulos
],
})
export class AppModule {}
Las defaultJobOptions aplican a todas las colas de la app. Cada cola puede sobreescribirlas. Desglose:
removeOnComplete: 100: No acumules millones de jobs completados en Redis. Guarda los últimos 100 para debugging.removeOnFail: 500: Los fallidos son más importantes para investigar, guarda más.attempts: 3: Si un job falla, reintenta hasta 3 veces antes de darlo por perdido.backoff: exponential: Espera progresivamente más entre reintentos: 2s, 4s, 8s. Evita enviar a la tumba un servicio caído.
6. Tu primera cola: emails
Vamos a ver un caso real que sería mover el envío de emails a una cola. Recuerda que en el post 17 teníamos un listener que enviaba emails de bienvenida. Si el mail server se caía, el email se iba a la puta, ahora ya no.
6.1. Registrar la cola en el módulo
// src/mail/mail.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
import { MailService } from './mail.service';
import { MailProcessor } from './processors/mail.processor';
import { MAIL_QUEUE } from './constants';
@Module({
imports: [
BullModule.registerQueue({
name: MAIL_QUEUE,
defaultJobOptions: {
attempts: 5, // Los emails son críticos, más reintentos
backoff: {
type: 'exponential',
delay: 3000, // 3s → 6s → 12s → 24s → 48s
},
priority: 2, // Prioridad media por defecto
},
}),
],
providers: [MailService, MailProcessor],
exports: [MailService],
})
export class MailModule {}
// src/mail/constants.ts
export const MAIL_QUEUE = 'mail';
export const MailJobNames = {
WELCOME: 'welcome',
PASSWORD_RESET: 'password-reset',
ORDER_CONFIRMATION: 'order-confirmation',
WEEKLY_DIGEST: 'weekly-digest',
} as const;
export type MailJobName = (typeof MailJobNames)[keyof typeof MailJobNames];
as const + el type derivado: misma técnica que usamos con los eventos en el post anterior. Hay que evitar las magic strings y por eso implementamos constantes tipadas.
6.2. El servicio que añade jobs a la cola
// src/mail/mail.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bull';
import { type Queue } from 'bull';
import { MAIL_QUEUE, MailJobNames } from './constants';
import { type WelcomeMailJobData } from './interfaces/mail-job-data.interface';
import { type PasswordResetMailJobData } from './interfaces/mail-job-data.interface';
@Injectable()
export class MailService {
private readonly logger = new Logger(MailService.name);
constructor(
@InjectQueue(MAIL_QUEUE)
private readonly mailQueue: Queue
) {}
async sendWelcomeEmail(email: string, name: string): Promise<void> {
const jobData: WelcomeMailJobData = { email, name };
await this.mailQueue.add(MailJobNames.WELCOME, jobData, {
priority: 1, // Alta prioridad: el usuario acaba de registrarse
});
this.logger.log(`Job de email de bienvenida encolado para ${email}`);
}
async sendPasswordResetEmail(email: string, resetToken: string): Promise<void> {
const jobData: PasswordResetMailJobData = { email, resetToken };
await this.mailQueue.add(MailJobNames.PASSWORD_RESET, jobData, {
priority: 1, // Urgente
attempts: 5, // Más reintentos que el default
removeOnComplete: true, // No guardar datos sensibles
});
this.logger.log(`Job de reset de password encolado para ${email}`);
}
async sendWeeklyDigest(recipients: ReadonlyArray<{ email: string; name: string }>): Promise<void> {
const jobs = recipients.map((recipient) => ({
name: MailJobNames.WEEKLY_DIGEST,
data: { email: recipient.email, name: recipient.name },
opts: {
priority: 5, // Baja prioridad: no es urgente
},
}));
await this.mailQueue.addBulk(jobs);
this.logger.log(`${recipients.length} jobs de weekly digest encolados`);
}
}
Vamos a ver qué tenemos por aquí:
@InjectQueue(MAIL_QUEUE): Inyecta la instancia de la cola registrada en el módulo.queue.add(jobName, data, opts): Añade un job a la cola.dataes el payload,optssobreescribe las opciones por defecto.queue.addBulk(jobs): Añade múltiples jobs en una sola operación de Redis. Mucho más eficiente que un bucle conadd().priority: 1: Los números más bajos = mayor prioridad. El email de bienvenida se procesa antes que el digest semanal.
6.3. Interfaces tipadas para los datos de los jobs
// src/mail/interfaces/mail-job-data.interface.ts
export interface WelcomeMailJobData {
readonly email: string;
readonly name: string;
}
export interface PasswordResetMailJobData {
readonly email: string;
readonly resetToken: string;
}
export interface OrderConfirmationMailJobData {
readonly email: string;
readonly orderId: string;
readonly items: ReadonlyArray<{
readonly name: string;
readonly quantity: number;
readonly price: number;
}>;
}
export interface WeeklyDigestMailJobData {
readonly email: string;
readonly name: string;
}
export type MailJobData =
| WelcomeMailJobData
| PasswordResetMailJobData
| OrderConfirmationMailJobData
| WeeklyDigestMailJobData;
Cada tipo de job tiene su propia interfaz con readonly, así el processor sabe qué datos deben llegar.
7. El Processor: quien hace el trabajo
El 💡 Processor Clase decorada con @Processor que procesa los jobs de una cola. Cada método @Process maneja un tipo de job específico. NestJS lo registra automáticamente y le inyecta dependencias vía DI. Es el worker que consume y ejecuta los jobs. Más info → es el worker que toma los jobs de la cola y los ejecuta:
// src/mail/processors/mail.processor.ts
import { Processor, Process, OnQueueActive, OnQueueCompleted, OnQueueFailed } from '@nestjs/bull';
import { Logger } from '@nestjs/common';
import { type Job } from 'bull';
import { MAIL_QUEUE, MailJobNames } from '../constants';
import { type WelcomeMailJobData } from '../interfaces/mail-job-data.interface';
import { type PasswordResetMailJobData } from '../interfaces/mail-job-data.interface';
import { type WeeklyDigestMailJobData } from '../interfaces/mail-job-data.interface';
import { MailTransportService } from '../mail-transport.service';
@Processor(MAIL_QUEUE)
export class MailProcessor {
private readonly logger = new Logger(MailProcessor.name);
constructor(private readonly mailTransport: MailTransportService) {}
@Process(MailJobNames.WELCOME)
async handleWelcomeEmail(job: Job<WelcomeMailJobData>): Promise<void> {
this.logger.log(`Procesando email de bienvenida para ${job.data.email} (intento ${job.attemptsMade + 1})`);
await this.mailTransport.send({
to: job.data.email,
subject: '¡Bienvenido!',
template: 'welcome',
context: { name: job.data.name },
});
}
@Process(MailJobNames.PASSWORD_RESET)
async handlePasswordReset(job: Job<PasswordResetMailJobData>): Promise<void> {
this.logger.log(`Procesando email de reset de password para ${job.data.email}`);
await this.mailTransport.send({
to: job.data.email,
subject: 'Resetea tu contraseña',
template: 'password-reset',
context: { resetToken: job.data.resetToken },
});
}
@Process(MailJobNames.WEEKLY_DIGEST)
async handleWeeklyDigest(job: Job<WeeklyDigestMailJobData>): Promise<void> {
await this.mailTransport.send({
to: job.data.email,
subject: 'Tu resumen semanal',
template: 'weekly-digest',
context: { name: job.data.name },
});
}
@OnQueueActive()
onActive(job: Job): void {
this.logger.log(`Procesando job ${job.id} de tipo ${job.name}...`);
}
@OnQueueCompleted()
onCompleted(job: Job): void {
this.logger.log(`Job ${job.id} (${job.name}) completado`);
}
@OnQueueFailed()
onFailed(job: Job, error: Error): void {
this.logger.error(`Job ${job.id} (${job.name}) falló: ${error.message}`, error.stack);
}
}
Vamos a ver cada cosa con cariño:
@Processor(MAIL_QUEUE): Vincula esta clase a la colamail. NestJS la registra como consumer automáticamente.@Process(MailJobNames.WELCOME): Este método se ejecuta cuando un job con nombre'welcome'llega a la cola.Job<WelcomeMailJobData>: El parámetrojobes genérico.job.datatiene el tipo exacto del payload. Ceroany.job.attemptsMade: Número de intentos previos. Útil para logging y para lógica condicional (ej: si es el último intento, notificar al admin).@OnQueueActive/Completed/Failed: Hooks del ciclo de vida de la cola. Perfectos para logging y métricas.
8. Ciclo de vida de un job
Cada job pasa por varios estados. Bull te da hooks para reaccionar a cada uno:
add() → WAITING → ACTIVE → COMPLETED ✅
↓
FAILED ❌
↓
(si quedan reintentos)
↓
DELAYED → WAITING → ACTIVE → ...
↓
(si no quedan reintentos)
↓
FAILED (definitivo) 💀
| Decorador | Cuándo se ejecuta | Caso de uso |
|---|---|---|
@OnQueueWaiting() | El job entra en la cola | Métricas de cola |
@OnQueueActive() | Un worker empieza a procesar el job | Logging, tracking |
@OnQueueCompleted() | El job se completó con éxito | Notificaciones, métricas |
@OnQueueFailed() | El job falló (puede que se reintente) | Alertas, logging de errores |
@OnQueueStalled() | El worker murió sin completar el job | Alertas críticas |
@OnQueueDrained() | La cola se vació (no quedan jobs) | Batch processing, limpieza |
El más peligroso: @OnQueueStalled(). Un job stalled stalled Un job se considera 'stalled' cuando el worker que lo estaba procesando dejó de responder (crash, OOM kill, timeout). Bull detecta esto automáticamente y puede reencolar el job para que otro worker lo procese. significa que el worker murió sin terminar. Bull lo detecta y lo reencola automáticamente. Si ves muchos stalled jobs, tu worker tiene un memory leak o un timeout demasiado ajustado.
9. Estrategias de reintentos y backoff
El reintento no es volver a ejecutar y ya, hay estrategias:
// Backoff exponencial (recomendado para APIs externas)
await this.mailQueue.add(MailJobNames.WELCOME, data, {
attempts: 5,
backoff: {
type: 'exponential',
delay: 2000,
// Intento 1: falla → espera 2s
// Intento 2: falla → espera 4s
// Intento 3: falla → espera 8s
// Intento 4: falla → espera 16s
// Intento 5: falla → marcado como FAILED definitivo
},
});
// Backoff fijo (para errores transitorios conocidos)
await this.mailQueue.add(MailJobNames.WELCOME, data, {
attempts: 3,
backoff: {
type: 'fixed',
delay: 5000,
// Siempre espera 5s entre reintentos
},
});
Cada reintento espera el doble que el anterior: 2s → 4s → 8s → 16s. Ideal cuando llamas a servicios externos (APIs de email, pagos, etc.) que pueden estar temporalmente caídos. Les das tiempo para recuperarse sin petarlos.
Siempre espera lo mismo entre reintentos: 5s → 5s → 5s. Útil para errores puntuales donde el tiempo de espera no importa: un lock de base de datos, un conflicto de concurrencia.
Backoff custom: lógica avanzada de reintentos
Para escenarios más complejos, define una función de backoff custom:
await this.mailQueue.add(MailJobNames.WELCOME, data, {
attempts: 5,
backoff: {
type: 'custom',
},
});
Y en el processor, implementa la lógica:
// src/mail/processors/mail.processor.ts
@Processor({
name: MAIL_QUEUE,
settings: {
backoffStrategies: {
custom: (attemptsMade: number): number => {
// Jitter: añade aleatoriedad para evitar thundering herd
const baseDelay = Math.pow(2, attemptsMade) * 1000;
const jitter = Math.random() * 1000;
return baseDelay + jitter;
},
},
},
})
export class MailProcessor {
// ...
}
El jitter jitter Añadir una cantidad aleatoria al delay entre reintentos. Si 1000 jobs fallan al mismo tiempo y todos reintentan a los 4 segundos exactos, saturan el servicio de nuevo (thundering herd). El jitter dispersa los reintentos en una ventana temporal. es crucial cuando tienes muchos workers. Sin jitter, todos reintentan al mismo tiempo y saturan el servicio caído.
10. Delayed jobs: tareas programadas
Jobs que se ejecutan en el futuro y no inmediatamente:
@Injectable()
export class NotificationService {
constructor(
@InjectQueue('notifications')
private readonly notificationQueue: Queue
) {}
// Enviar un recordatorio 24 horas después del registro
async scheduleWelcomeReminder(userId: string, email: string): Promise<void> {
await this.notificationQueue.add(
'welcome-reminder',
{ userId, email },
{
delay: 24 * 60 * 60 * 1000, // 24 horas en ms
}
);
}
// Enviar recordatorio de carrito abandonado después de 2 horas
async scheduleCartReminder(userId: string, cartId: string): Promise<void> {
const job = await this.notificationQueue.add(
'cart-reminder',
{ userId, cartId },
{
delay: 2 * 60 * 60 * 1000, // 2 horas
jobId: `cart-reminder-${cartId}`, // ID único por carrito
}
);
// Guardar el jobId para poder cancelarlo si el usuario compra
return job;
}
// Cancelar el recordatorio si el usuario completa la compra
async cancelCartReminder(cartId: string): Promise<void> {
const job = await this.notificationQueue.getJob(`cart-reminder-${cartId}`);
if (job) {
await job.remove();
}
}
}
Vamos a ver el patrón del carrito abandonado:
- Usuario añade productos → programas un recordatorio a 2 horas.
- Si compra antes de 2 horas → cancelas el job con
job.remove(). - Si no compra → el job se ejecuta y envía el recordatorio.
El jobId custom (cart-reminder-${cartId}) te permite localizar y cancelar un job específico. Sin jobId, Bull asigna un ID auto-incremental y no hay forma fácil de encontrar el job después.
11. Concurrencia: procesar múltiples jobs a la vez
Por defecto, un processor procesa un job a la vez. Si cada email tarda 2 segundos y tienes 1000 en cola, tardarás 33 minutos. Con concurrencia:
@Processor({
name: MAIL_QUEUE,
concurrency: 5, // Procesa hasta 5 jobs simultáneamente
})
export class MailProcessor {
// ...
}
O a nivel de método @Process individual:
@Process({ name: MailJobNames.WELCOME, concurrency: 3 })
async handleWelcomeEmail(job: Job<WelcomeMailJobData>): Promise<void> {
// Hasta 3 emails de bienvenida simultáneos
}
@Process({ name: MailJobNames.WEEKLY_DIGEST, concurrency: 10 })
async handleWeeklyDigest(job: Job<WeeklyDigestMailJobData>): Promise<void> {
// Hasta 10 digests simultáneos (menos críticos, más paralelismo)
}
- Jobs que esperan respuesta de servicios externos (I/O bound): emails, APIs, uploads. El CPU está idle mientras espera
- Jobs independientes que no comparten recursos ni tienen efectos colaterales entre sí
- Cola con muchos jobs pendientes y el throughput actual no es suficiente
- Jobs CPU-intensive: procesamiento de imágenes, cálculos pesados. Más concurrencia = más contención de CPU
- Jobs que acceden a recursos limitados: una API con rate limit de 10 req/s. Si pones concurrency: 50, te banean
- Jobs que modifican los mismos datos: riesgo de race conditions y conflictos de escritura
12. Rate limiting: controlar el ritmo
Cuando llamas a APIs externas con límites, necesitas rate limiting rate limiting Limitar la cantidad de jobs procesados por unidad de tiempo. Por ejemplo, máximo 100 emails por minuto. Evita que satures servicios externos, te baneen, o te cobren de más. :
BullModule.registerQueue({
name: MAIL_QUEUE,
limiter: {
max: 100, // Máximo 100 jobs
duration: 60000, // por cada 60 segundos (1 minuto)
},
}),
Esto es global para la cola. No importa la concurrencia porque aunque tengas 10 workers, Bull garantiza que no se procesan más de 100 jobs por minuto en total.
Combinado con la concurrencia:
concurrency: 5 → Hasta 5 jobs ejecutándose al mismo tiempo
limiter: 100/min → Pero no más de 100 jobs por minuto en total
Si cada job tarda 0.5s → 5 workers × 2 jobs/s = 10 jobs/s = 600 jobs/min
Con limiter: Bull frena a 100/min, los workers esperan entre lotes
13. Conectando eventos con colas
La combinación más potente sería usar los eventos del post 17 para alimentar las colas de este post. El evento es el trigger, la cola es el ejecutor:
// src/mail/listeners/user-registered-queue.listener.ts
import { Injectable, Logger } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { MailService } from '../mail.service';
import { UserRegisteredEvent } from '../../auth/events/user-registered.event';
import { NotificationService } from '../../notifications/notification.service';
import { EventNames } from '../../common/events/event-names';
@Injectable()
export class UserRegisteredQueueListener {
private readonly logger = new Logger(UserRegisteredQueueListener.name);
constructor(
private readonly mailService: MailService,
private readonly notificationService: NotificationService
) {}
@OnEvent(EventNames.USER_REGISTERED)
async handleUserRegistered(event: UserRegisteredEvent): Promise<void> {
// No enviamos el email directamente : lo encolamos
await this.mailService.sendWelcomeEmail(event.email, event.name);
// Programar un recordatorio para 24h después
await this.notificationService.scheduleWelcomeReminder(event.userId, event.email);
this.logger.log(`Jobs de bienvenida encolados para ${event.email}`);
}
}
El flujo completo:
1. AuthService → emit('user.registered') (fire-and-forget)
2. UserRegisteredQueueListener → @OnEvent (reacciona al evento)
3. mailService.sendWelcomeEmail() → queue.add() (encola el job)
4. MailProcessor → @Process('welcome') (procesa el job)
5. Si falla → Bull reintenta automáticamente (3-5 intentos con backoff)
Tres capas de resiliencia:
- Evento: desacopla el
AuthServicedel email. - Cola: persiste el job en Redis, sobrevive a reinicios.
- Reintentos: si el mail server se cae, reintenta automáticamente.
14. Cola de procesamiento de imágenes
Otro caso real podría ser redimensionar imágenes subidas por usuarios. En el post 16 subíamos archivos y ahora procesamos esos archivos en segundo plano:
// src/images/constants.ts
export const IMAGE_QUEUE = 'image-processing';
export const ImageJobNames = {
RESIZE: 'resize',
GENERATE_THUMBNAILS: 'generate-thumbnails',
OPTIMIZE: 'optimize',
} as const;
// src/images/interfaces/image-job-data.interface.ts
export interface ResizeImageJobData {
readonly filePath: string;
readonly userId: string;
readonly dimensions: ReadonlyArray<{
readonly width: number;
readonly height: number;
readonly suffix: string;
}>;
}
export interface GenerateThumbnailsJobData {
readonly filePath: string;
readonly userId: string;
readonly sizes: ReadonlyArray<number>;
}
// src/images/processors/image.processor.ts
import { Processor, Process, OnQueueFailed } from '@nestjs/bull';
import { Logger } from '@nestjs/common';
import { type Job } from 'bull';
import { IMAGE_QUEUE, ImageJobNames } from '../constants';
import { type ResizeImageJobData } from '../interfaces/image-job-data.interface';
import { type GenerateThumbnailsJobData } from '../interfaces/image-job-data.interface';
import { ImageTransformService } from '../image-transform.service';
@Processor({
name: IMAGE_QUEUE,
concurrency: 3, // Máximo 3 imágenes a la vez (CPU-intensive)
})
export class ImageProcessor {
private readonly logger = new Logger(ImageProcessor.name);
constructor(private readonly imageTransform: ImageTransformService) {}
@Process(ImageJobNames.RESIZE)
async handleResize(job: Job<ResizeImageJobData>): Promise<void> {
const { filePath, dimensions } = job.data;
for (let i = 0; i < dimensions.length; i++) {
const dim = dimensions[i];
await this.imageTransform.resize(filePath, dim.width, dim.height, dim.suffix);
// Reportar progreso al job
const progress = Math.round(((i + 1) / dimensions.length) * 100);
await job.progress(progress);
}
}
@Process(ImageJobNames.GENERATE_THUMBNAILS)
async handleThumbnails(job: Job<GenerateThumbnailsJobData>): Promise<void> {
const { filePath, sizes } = job.data;
for (let i = 0; i < sizes.length; i++) {
await this.imageTransform.thumbnail(filePath, sizes[i]);
await job.progress(Math.round(((i + 1) / sizes.length) * 100));
}
}
@OnQueueFailed()
onFailed(job: Job, error: Error): void {
this.logger.error(`Procesamiento de imagen falló (job ${job.id}): ${error.message}`, error.stack);
}
}
Atención al job.progress() porque reporta el progreso del job en porcentaje. Si tienes un dashboard (como Bull Board, que veremos al final), puedes ver cuánto lleva cada job en tiempo real.
La concurrencia es baja (3) porque el procesamiento de imágenes es CPU-intensive. Más workers no ayudan si el CPU ya está petada al 100%.
15. Jobs con progreso: tracking en tiempo real
job.progress() no solo sirve para dashboards:
// src/images/images.controller.ts
import { Controller, Get, Param } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bull';
import { type Queue } from 'bull';
import { IMAGE_QUEUE } from './constants';
interface JobStatusResponse {
readonly id: string;
readonly state: string;
readonly progress: number;
readonly failedReason?: string;
}
@Controller('images')
export class ImagesController {
constructor(
@InjectQueue(IMAGE_QUEUE)
private readonly imageQueue: Queue
) {}
@Get('jobs/:jobId/status')
async getJobStatus(@Param('jobId') jobId: string): Promise<JobStatusResponse> {
const job = await this.imageQueue.getJob(jobId);
if (!job) {
return {
id: jobId,
state: 'not_found',
progress: 0,
};
}
const state = await job.getState();
return {
id: job.id.toString(),
state,
progress: job.progress() as number,
failedReason: job.failedReason ?? undefined,
};
}
}
El cliente puede hacer polling a este endpoint para mostrar una barra de progreso:
GET /images/jobs/42/status
→ { "id": "42", "state": "active", "progress": 66 }
GET /images/jobs/42/status (unos segundos después)
→ { "id": "42", "state": "completed", "progress": 100 }
16. Jobs repetibles (cron jobs)
Jobs que se ejecutan en un schedule, como un cron pero gestionado por Bull y Redis:
// src/reports/reports.module.ts
import { Module, type OnModuleInit } from '@nestjs/common';
import { BullModule, InjectQueue } from '@nestjs/bull';
import { type Queue } from 'bull';
import { ReportsProcessor } from './processors/reports.processor';
const REPORTS_QUEUE = 'reports';
@Module({
imports: [BullModule.registerQueue({ name: REPORTS_QUEUE })],
providers: [ReportsProcessor],
})
export class ReportsModule implements OnModuleInit {
constructor(
@InjectQueue(REPORTS_QUEUE)
private readonly reportsQueue: Queue
) {}
async onModuleInit(): Promise<void> {
// Limpiar jobs repetibles previos para evitar duplicados al reiniciar
const existingJobs = await this.reportsQueue.getRepeatableJobs();
for (const job of existingJobs) {
await this.reportsQueue.removeRepeatableByKey(job.key);
}
// Report diario a las 8:00 AM
await this.reportsQueue.add(
'daily-report',
{},
{
repeat: { cron: '0 8 * * *' },
jobId: 'daily-report',
}
);
// Limpieza semanal los domingos a las 3:00 AM
await this.reportsQueue.add(
'weekly-cleanup',
{},
{
repeat: { cron: '0 3 * * 0' },
jobId: 'weekly-cleanup',
}
);
}
}
El onModuleInit limpia los jobs repetibles previos antes de re-registrarlos. Sin esto, cada reinicio del servidor crearía duplicados y el job se ejecutaría múltiples veces.
¿Por qué no usar
@nestjs/schedule? Podrías, pero@nestjs/scheduleejecuta la tarea en el proceso principal. Si tienes múltiples instancias de tu app (load balancing), el cron se ejecuta en TODAS las instancias a la vez. Con Bull + Redis, solo un worker toma el job, sin duplicados.
17. Separar el producer del consumer
En producción, es habitual que el producer (quien encola jobs) y el consumer (quien los procesa) sean procesos separados:
┌─────────────┐ ┌─────────┐ ┌──────────────┐
│ API Server │────→│ Redis │←────│ Worker │
│ (producer) │ │ │ │ (consumer) │
│ Puerto 3000 │ │ │ │ Sin puerto │
└─────────────┘ └─────────┘ └──────────────┘
Esto te permite:
- Escalar workers independientemente del API server.
- El API server nunca se ralentiza por procesamiento pesado.
- Reiniciar workers sin downtime en la API.
Para implementarlo, crea un entry point separado para el worker:
// src/worker.ts
import { NestFactory } from '@nestjs/core';
import { WorkerModule } from './worker.module';
import { Logger } from '@nestjs/common';
async function bootstrap(): Promise<void> {
const logger = new Logger('Worker');
// No llamamos a app.listen() : el worker no expone un puerto HTTP
const app = await NestFactory.createApplicationContext(WorkerModule);
logger.log('Worker iniciado, procesando jobs...');
// Graceful shutdown
process.on('SIGTERM', async () => {
logger.log('SIGTERM recibido, cerrando worker...');
await app.close();
process.exit(0);
});
}
bootstrap();
// src/worker.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { BullModule } from '@nestjs/bull';
import { MailModule } from './mail/mail.module';
import { ImagesModule } from './images/images.module';
@Module({
imports: [
ConfigModule.forRoot({ isGlobal: true }),
BullModule.forRootAsync({
imports: [ConfigModule],
inject: [ConfigService],
useFactory: (configService: ConfigService) => ({
redis: {
host: configService.getOrThrow<string>('REDIS_HOST'),
port: configService.getOrThrow<number>('REDIS_PORT'),
},
}),
}),
MailModule,
ImagesModule,
// Solo módulos con processors
],
})
export class WorkerModule {}
Y en Docker Compose:
services:
api:
build: .
command: node dist/main.js
ports:
- '3000:3000'
worker:
build: .
command: node dist/worker.js
# Sin puerto : solo procesa jobs
depends_on:
- redis
deploy:
replicas: 2 # 2 workers procesando en paralelo
18. Monitorización con Bull Board
No volar a ciegas. 💡 Bull Board Dashboard web para monitorizar colas de Bull. Muestra jobs en cada estado (waiting, active, completed, failed, delayed), permite reintentar jobs fallidos, ver datos del payload y progreso. Se integra con Express y NestJS. Más info → te da un panel visual:
// src/bull-board.module.ts
import { Module, type MiddlewareConsumer, type NestModule } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bull';
import { type Queue } from 'bull';
import { createBullBoard } from '@bull-board/api';
import { BullAdapter } from '@bull-board/api/bullAdapter';
import { ExpressAdapter } from '@bull-board/express';
import { MAIL_QUEUE } from './mail/constants';
import { IMAGE_QUEUE } from './images/constants';
@Module({})
export class BullBoardModule implements NestModule {
private readonly serverAdapter = new ExpressAdapter();
constructor(
@InjectQueue(MAIL_QUEUE)
private readonly mailQueue: Queue,
@InjectQueue(IMAGE_QUEUE)
private readonly imageQueue: Queue
) {
this.serverAdapter.setBasePath('/admin/queues');
createBullBoard({
queues: [new BullAdapter(this.mailQueue), new BullAdapter(this.imageQueue)],
serverAdapter: this.serverAdapter,
});
}
configure(consumer: MiddlewareConsumer): void {
consumer.apply(this.serverAdapter.getRouter()).forRoutes('/admin/queues');
}
}
Accede a http://localhost:3000/admin/queues para ver el estado de tus colas en tiempo real.
En producción: protege esta ruta con autenticación. Usa el
RolesGuarddel post 11 o un middleware de auth básico. Nunca dejes el dashboard expuesto.
19. Manejo de errores avanzado: dead letter queue
Cuando un job falla todos sus reintentos, ¿qué haces? Una 💡 Dead Letter Queue (DLQ) Cola especial donde van los jobs que fallaron todos sus reintentos. Permite investigar errores, reintentar manualmente o notificar al equipo. Sin DLQ, los jobs fallidos se pierden silenciosamente. captura esos jobs para investigación:
// src/common/queues/dead-letter.processor.ts
import { Processor, Process } from '@nestjs/bull';
import { Logger } from '@nestjs/common';
import { type Job } from 'bull';
export const DEAD_LETTER_QUEUE = 'dead-letter';
interface DeadLetterJobData {
readonly originalQueue: string;
readonly originalJobName: string;
readonly originalData: unknown;
readonly failedReason: string;
readonly attemptsMade: number;
readonly failedAt: string;
}
@Processor(DEAD_LETTER_QUEUE)
export class DeadLetterProcessor {
private readonly logger = new Logger(DeadLetterProcessor.name);
@Process()
async handleDeadLetter(job: Job<DeadLetterJobData>): Promise<void> {
this.logger.error(
`💀 Dead letter: Job "${job.data.originalJobName}" de cola "${job.data.originalQueue}" ` +
`falló después de ${job.data.attemptsMade} intentos. ` +
`Razón: ${job.data.failedReason}`
);
// Aquí puedes:
// 1. Enviar una alerta a Slack/Discord
// 2. Guardar en base de datos para investigación
// 3. Notificar al equipo de soporte
}
}
Y en el processor original, envía a la DLQ cuando se agotan los reintentos:
// En el MailProcessor
@OnQueueFailed()
async onFailed(job: Job, error: Error): Promise<void> {
this.logger.error(
`Job ${job.id} falló: ${error.message}`,
);
// Si se agotaron los reintentos, enviar a dead letter queue
if (job.attemptsMade >= (job.opts.attempts ?? 3)) {
await this.deadLetterQueue.add({
originalQueue: MAIL_QUEUE,
originalJobName: job.name,
originalData: job.data,
failedReason: error.message,
attemptsMade: job.attemptsMade,
failedAt: new Date().toISOString(),
});
}
}
20. Estructura de archivos
src/
├── common/
│ ├── events/
│ │ ├── event-names.ts
│ │ └── event-map.ts
│ └── queues/
│ ├── dead-letter.processor.ts
│ └── dead-letter.module.ts
├── mail/
│ ├── mail.module.ts
│ ├── mail.service.ts ← Encola jobs
│ ├── mail-transport.service.ts ← Envía emails realmente
│ ├── constants.ts ← MAIL_QUEUE, MailJobNames
│ ├── interfaces/
│ │ └── mail-job-data.interface.ts
│ ├── processors/
│ │ └── mail.processor.ts ← Procesa jobs de email
│ └── listeners/
│ └── user-registered-queue.listener.ts ← Evento → Cola
├── images/
│ ├── images.module.ts
│ ├── images.controller.ts ← Endpoint de status de jobs
│ ├── image-transform.service.ts
│ ├── constants.ts
│ ├── interfaces/
│ │ └── image-job-data.interface.ts
│ └── processors/
│ └── image.processor.ts
├── notifications/
│ ├── notification.module.ts
│ ├── notification.service.ts ← Delayed jobs
│ └── processors/
│ └── notification.processor.ts
├── reports/
│ └── reports.module.ts ← Jobs repetibles (cron)
├── bull-board.module.ts ← Dashboard
├── worker.module.ts ← Entry point worker separado
└── worker.ts
Patrón: constants.ts + interfaces/ + processors/ en cada módulo que usa colas. Los processors están separados de los services porque tienen responsabilidades diferentes: el service encola, el processor ejecuta.
21. Errores comunes
Error 1: No importar BullModule.forRoot() antes de registerQueue()
// ❌ registerQueue sin forRoot → "Could not find connection"
@Module({
imports: [BullModule.registerQueue({ name: 'mail' })],
})
export class MailModule {}
// ✅ forRoot en AppModule configura la conexión a Redis
// registerQueue en feature modules registra las colas
@Module({
imports: [
BullModule.forRootAsync({
/* redis config */
}),
],
})
export class AppModule {}
Error 2: No registrar el Processor como provider
// ❌ El processor existe pero no está en providers → los jobs nunca se procesan
@Module({
imports: [BullModule.registerQueue({ name: 'mail' })],
providers: [MailService], // Falta MailProcessor
})
export class MailModule {}
// ✅ El processor DEBE estar en providers
@Module({
imports: [BullModule.registerQueue({ name: 'mail' })],
providers: [MailService, MailProcessor],
})
export class MailModule {}
Error 3: Datos serializables en los jobs
// ❌ Los datos del job se serializan a JSON en Redis
// Clases, funciones y referencias circulares no sobreviven
await queue.add('process', {
user: userEntity, // Entidad TypeORM con relaciones circulares
callback: () => {}, // Funciones no son serializables
date: new Date(), // Se serializa como string, pierde el tipo Date
});
// ✅ Solo datos primitivos y objetos planos
await queue.add('process', {
userId: user.id, // ID, no la entidad completa
email: user.email,
createdAt: new Date().toISOString(), // ISO string, no Date
});
Los datos del job van a Redis como JSON. Todo lo que no sea serializable se pierde o rompe. Pasa solo IDs y datos primitivos, nunca entidades ni objetos con métodos.
Error 4: Jobs repetibles duplicados al reiniciar
// ❌ Cada vez que la app reinicia, se añade otro job repetible
async onModuleInit(): Promise<void> {
await this.queue.add('report', {}, { repeat: { cron: '0 8 * * *' } });
// Después de 5 reinicios: 5 jobs idénticos ejecutándose a las 8:00
}
// ✅ Limpiar antes de registrar
async onModuleInit(): Promise<void> {
const existing = await this.queue.getRepeatableJobs();
for (const job of existing) {
await this.queue.removeRepeatableByKey(job.key);
}
await this.queue.add('report', {}, { repeat: { cron: '0 8 * * *' } });
}
22. Recapitulando
BullModule.forRootAsync() configura Redis. registerQueue() crea colas. @InjectQueue() las inyecta en tus services. Todo con tipado.
@Processor vincula una clase a una cola. @Process(nombre) maneja jobs específicos. Job
attempts + backoff exponencial. Si falla, reintenta con delay creciente. Jitter para evitar thundering herd. Dead letter queue para los que fallan definitivamente.
concurrency: N procesa N jobs a la vez. limiter: {max, duration} controla el throughput global. Ajusta según el tipo de job.
delay: ms para jobs futuros (recordatorios, carritos abandonados). repeat: {cron} para tareas periódicas. jobId custom para poder cancelar.
Los eventos desacoplan. Las colas persisten. Combínalos: el listener del evento encola el job. Tres capas de resiliencia.
En el próximo post nos metemos con WebSockets y Socket.IO: comunicación en tiempo real, @WebSocketGateway, @SubscribeMessage, namespaces, rooms, autenticación en WebSockets y eventos tipados.
EA, nos vemos en los bares!! 🍺
Pon a prueba lo aprendido
1. ¿Cuál es la principal diferencia entre eventos (@nestjs/event-emitter) y colas (Bull + Redis)?
2. ¿Para qué sirve el backoff exponencial en los reintentos de un job?
3. ¿Qué pasa si no limpias los jobs repetibles antes de re-registrarlos en onModuleInit?
4. ¿Por qué solo debes pasar datos primitivos y IDs en el payload de un job, y no entidades completas de TypeORM?
5. ¿Qué ventaja tiene separar el producer (API server) del consumer (worker) en procesos distintos?