Tâches longues en Node.js : notifier les utilisateurs de l'avancée des tâches via SSE
- Les Server-Sent Events (SSE) offrent un mécanisme simple et léger, basé sur HTTP, pour envoyer des mises à jour en temps réel du serveur vers le navigateur.
- L'EventEmitter de NestJS fait le pont entre les workers de la queue et le flux SSE, selon une approche Event-Driven.
- Cette architecture s'applique à de nombreux cas concrets : analyse IA, OCR, génération de documents, import de données volumineuses.
Il arrive parfois que nous traitons des tâches lourdes côté API ce qui complexifie notre manière de gérer nos requêtes. Que ça soit de l’OCR, de l’analyse IA, ou autre tâches prenant plusieurs secondes, tout ceci ne peut être traité de manière synchrone sans dégrader l’expérience utilisateur.
Dans un précédent article, nous avions vu comment utiliser un système de queue pour éviter les problèmes liés au couplage fort de ce genre de tâches, qui pourrait résulter en des timeouts des requêtes, des problèmes de scalabilité, etc…
Il est donc temps de s’attaquer à notre nouveau problème : Quel est le sens de la vie ? Comment signaler aux différents clients, qui consomment l’API, de l’avancée des tâches qui sont traitées en arrière-plan ?
Si vous n’avez pas lu notre article sur le traitement de tâches lourdes en arrière-plan, nous vous conseillons de remédier à cette hérésie, et à aller le consulter juste ici.
Le système de queues c’est bien, mais…
Quand on utilise un système de queue, nous sommes confrontés à de nouveaux problèmes.
Le principe de Jobs et de Workers, permettant de déléguer le traitement à un processus en arrière-plan et de répondre immédiatement au client, a ses inconvénients. En effet, une fois que nous avons répondu à la requête HTTP classique, le client n’a plus moyen de savoir ce qui se passe côté serveur.
Mais rassurez-vous nous avons la solution, et nous vous promettons que ça n’impliquera pas du polling.
Notifier les clients : l’approche Event-Driven
L’architecture Event-Driven repose sur un principe simple : quand quelque chose se passe, on émet un événement. Les composants intéressés s’abonnent à ces événements et réagissent.
Dans notre cas, l’idée est d’utiliser ce principe à deux niveaux :
- Worker → Serveur : le worker émet un événement quand le job progresse.
- Serveur → Client : le serveur pousse l’information vers les navigateurs connectés.
À noter que notre objectif n’est pas forcément de suivre une architecture basée sur les événements de bout en bout, mais de prendre certains concepts pour pouvoir envoyer des événements côté client.
L’EventEmitter de NestJS
NestJS fournit un package dédié (@nestjs/event-emitter) qui implémente un système d’événements interne à l’application. Un worker peut émettre un événement, et un service peut s’y abonner, sans que les deux ne se connaissent directement.
flowchart LR
A@{ shape: processes, label: "Jobs" } --> B@{ "shape": "database", label: "Queue" }
B --> C@{ shape: lin-rect, label: "Worker 1" }
B --> D@{ shape: lin-rect, label: "Worker 2" }
B --> E@{ shape: lin-rect, label: "Worker 3" }
C -.-> | événement | F{EventEmitter}
D -.-> | événement | F{EventEmitter}
E -.-> | événement | F{EventEmitter}
F --> G@{ shape: paper-tape, label: "Listener"}
F --> H@{ shape: paper-tape, label: "Listener"}
Ce découplage est essentiel : le worker n’a pas besoin de savoir qui écoute. Il se contente d’émettre un événement avec les données de progression.
Server-Sent Events (SSE) : pousser des mises à jour vers le navigateur
Qu’est-ce que le SSE ?
On entend souvent parler de polling ou même de Websocket. Cependant, il existe d’autres manières de récupérer des informations de l’API vers le client.
Les Server-Sent Events (ou SSE) sont un mécanisme standard du web (API EventSource) qui permet au serveur d’envoyer des événements au client via une connexion HTTP persistante.
sequenceDiagram
Client->>Serveur: Ouvre un flux d'événements (GET /events)
Serveur-->>Client: Événement 1
Serveur-->>Client: Événement 2
Serveur-->>Client: Événement 3
Client->>Serveur: Ferme la connexion
Le client ouvre une connexion sur un endpoint dédié. Le serveur garde cette connexion ouverte et envoie des événements au format text/event-stream chaque fois qu’il a quelque chose à communiquer.
SSE vs WebSockets : le bon outil pour le bon usage
La question de l’utilisation des Websockets à la place du SSE peut être légitime. Il est donc de notre devoir de vous faire un comparatif pour que vous puissiez choisir au mieux pour votre application :
| WebSockets | Server-Sent Events | |
|---|---|---|
| Direction du flux | Bidirectionnel (serveur ↔ client) | Unidirectionnel (serveur → client) |
| Protocole | WebSocket (upgrade depuis HTTP) | HTTP/HTTPS standard |
| Complexité | Plus complexe (gestion d’un protocole dédié, reconnexion manuelle) | Simple (HTTP natif, reconnexion automatique par le navigateur) |
| Performance | Surcharge à l’ouverture, mais très efficace pour du trafic bidirectionnel à haute fréquence | Surcharge minimale à l’initialisation, idéal pour du push occasionnel |
| Cas d’usage | Chat en temps réel, jeux multijoueurs, collaboration simultanée | Notifications, suivi de progression, flux d’actualités |
Si l’objectif est simplement de prévenir les clients d’une mise à jour côté serveur de manière ponctuelle, le SSE est la solution la plus adaptée. Il est plus simple à mettre en place, repose sur HTTP standard, et le navigateur gère automatiquement la reconnexion en cas de coupure.
Cependant, si vos besoins sont plus complexes, notamment avec de la communication bidirectionnelle entre le client et le serveur, alors le protocole WebSocket sera sûrement une solution plus adaptée.
Par ailleurs, n’oubliez pas que ce ne sont pas les seules manières de faire communiquer votre serveur et vos clients, il existe des technologies plus complexes (comme le WebRTC) pour d’autres cas d’usage. Ne restez donc pas bloqués sur ce comparatif et amusez-vous à fouiller par vous même pour trouver la perle rare qui saura répondre à vos besoins !
Limitations à connaître
Le SSE a une limitation importante sur HTTP/1.1 : le nombre maximum de connexions simultanées est limité à 6 par domaine et par navigateur. Cette limite disparaît avec HTTP/2, qui est aujourd’hui largement supporté. Pensez-y si vous devez ouvrir plusieurs flux SSE différents dans la même application.
L’architecture complète : de la queue au navigateur
En combinant les trois mécanismes (queue, EventEmitter, SSE), on obtient un pipeline complet :
flowchart LR
subgraph Queue system
A@{ shape: processes, label: "Jobs" } --> B@{ "shape": "database", label: "Queue" }
B --> C@{ shape: lin-rect, label: "Worker 1" }
B --> D@{ shape: lin-rect, label: "Worker 2" }
B --> E@{ shape: lin-rect, label: "Worker 3" }
end
C -.-> | événement | F{EventEmitter}
D -.-> | événement | F{EventEmitter}
E -.-> | événement | F{EventEmitter}
subgraph Events-Driven
F --> G@{ shape: paper-tape, label: "Listener"}
end
subgraph "SSE"
G --> H["Observable (RxJS Subject)"]
H --> | événement | I["Client 1"]
H --> | événement | J["Client 2"]
H --> | événement | K["Client 3"]
end
Le flux complet se déroule ainsi :
- Le client envoie une requête
POST /analyzepour lancer une analyse. - Le serveur ajoute un job dans la queue BullMQ et répond immédiatement.
- Un worker récupère le job et commence le traitement.
- À chaque étape, le worker émet un événement via l’EventEmitter.
- Un listener capte cet événement et le pousse dans un RxJS
Subject. - Le
Subjectenvoie l’événement à tous les clients connectés au flux SSE.
Place à l’exemple : l’analyse du contenu d’un fichier
Vous vous souvenez de l’exemple de l’article sur l’utiliation de BullMQ ? Et bien il est temps de le compléter.
Pour rappel, nous allons prendre un cas qui nous est arrivé de rencontrer chez Lonestone : l’analyse du contenu d’un fichier pour en retirer des informations importantes.
L’architecture du système
Toujours dans l’optique de compléter notre précédent exemple, nous avions donc déjà fait toute la partie sur l’envoi d’une requête POST vers l’API afin de lancer une analyse. Ceci avait pour conséquence d’ajouter un job dans la queue, et de retourner une réponse au client.
Et grâce à ce qu’on a pu voir dans cet article, nous allons pouvoir y ajouter un système de communication en temps réel. Le client va donc se connecter à un flux SSE pour écouter les événements côté API.
Maintenant, voici le diagramme de séquence mis à jour avec notre système de SSE :
sequenceDiagram
participant Client
participant API
participant Queue
participant Worker
Client->>API: Ouvre /events (SSE)
API-->>Client: Flux SSE ouvert
Client->>API: POST /analyze
API->>Queue: Ajoute un job
API-->>Client: 200 OK
Queue->>Worker: Traite le job
Worker->>Worker: Étape 1 (extraction)
Worker-->>API: Émet événement "extraction"
API-->>Client: SSE → extraction
Worker->>Worker: Étape 2 (analyse)
Worker-->>API: Émet événement "analyse"
API-->>Client: SSE → analyse
Worker->>Worker: Terminé
Worker-->>API: Émet événement "completed"
API-->>Client: SSE → completed
Un peu de code : Implémentation du SSE avec NestJS
Comme on l’a dit dans cet article, 2 grands concepts vont nous être utiles : l’utilisation de l’EventEmitter pour les événements internes à l’API et l’utilisation de RxJS pour la mise en place du SSE.
Configuration du module
Notre système de queue étant mis en place, il nous suffit juste d’ajouter la partie sur l’EventEmitter dans le module.
Pour ce faire, on se rend dans AnalysisModule et on y ajoute notre AnalysisEventsService :
import { BullModule } from '@nestjs/bullmq'
import { Module } from '@nestjs/common'
import { AnalysisEventsService } from './analysis-events.service'
import { AnalysisController } from './analysis.controller'
import { ANALYSIS_QUEUE_NAME, AnalysisProcessor } from './analysis.processor'
import { AnalysisService } from './analysis.service'
@Module({
imports: [
BullModule.registerQueue({ name: ANALYSIS_QUEUE_NAME }),
],
controllers: [AnalysisController],
providers: [AnalysisService, AnalysisEventsService, AnalysisProcessor],
})
export class AnalysisModule {}
Le service d’événements : le pont entre EventEmitter et SSE
C’est la pièce maîtresse de l’architecture. Ce service écoute les événements internes et les transforme en un flux Observable (via un RxJS Subject) que NestJS peut exposer en SSE :
import { Injectable, OnModuleDestroy } from '@nestjs/common'
import { EventEmitter2 } from '@nestjs/event-emitter'
import { Subject } from 'rxjs'
export const ANALYSIS_UPDATED_EVENT = 'analysis-updated-event'
@Injectable()
export class AnalysisEventsService implements OnModuleDestroy {
private readonly eventSubject = new Subject<MessageEvent>()
constructor(private readonly eventEmitter: EventEmitter2) {
this.eventEmitter.on(ANALYSIS_UPDATED_EVENT, (event: AnalysisEvent) => {
this.eventSubject.next({ data: event } as MessageEvent)
})
}
onUpdated() {
return this.eventSubject.asObservable()
}
onModuleDestroy() {
this.eventSubject.complete()
}
}
Le Subject RxJS joue le rôle de pont :
- En entrée, il reçoit les événements via
next()à chaque fois que l’EventEmitter déclencheanalysis-updated-event. - En sortie, il expose un
Observableque NestJS utilise pour alimenter le flux SSE. - À la destruction du module, le
Subjectest complété proprement pour fermer les connexions.
Le processor (worker) : il faut émettre des événements
Une fois l’AnalysisEventsService définit, nous pouvons donc mettre à jour notre processor, que nous avions défini dans l’article précédent, pour envoyer des événements internes afin de communiquer dans l’API.
Pour rappel, c’est ici que nous traitons les tâches lourdes.
Nous devons donc émettre un événement à chaque étape via l’EventEmitter2 de NestJS, comme ceci :
this.eventEmitter.emit(ANALYSIS_UPDATED_EVENT, {
id: job.data.analysisId,
step: ANALYSIS_STEPS.EXTRACTION,
})
Ce qui donnerait dans le code :
import { Processor, WorkerHost } from '@nestjs/bullmq'
import { EventEmitter2 } from '@nestjs/event-emitter'
import { Job } from 'bullmq'
export const ANALYSIS_QUEUE_NAME = 'analysis_queue'
export const ANALYSIS_JOB_NAME = 'analysis_job'
export const ANALYSIS_JOBS_CONCURRENCY = 10
@Processor(ANALYSIS_QUEUE_NAME, {
concurrency: ANALYSIS_JOBS_CONCURRENCY,
removeOnComplete: { age: 3600, count: 1000 },
removeOnFail: { age: 24 * 3600 },
})
export class AnalysisProcessor extends WorkerHost {
constructor(private readonly eventEmitter: EventEmitter2) {
super()
}
async process(job: Job<AnalysisJobData>) {
this.eventEmitter.emit(ANALYSIS_UPDATED_EVENT, {
id: job.data.analysisId,
step: ANALYSIS_STEPS.EXTRACTION,
})
await performExtraction(job.data) // Tâche longue
this.eventEmitter.emit(ANALYSIS_UPDATED_EVENT, {
id: job.data.analysisId,
step: ANALYSIS_STEPS.ANALYSIS_PART_ONE,
})
await performAnalysis(job.data) // Tâche longue
this.eventEmitter.emit(ANALYSIS_UPDATED_EVENT, {
id: job.data.analysisId,
step: ANALYSIS_STEPS.COMPLETED,
})
}
}
À noter que chaque emit() déclenche l’envoi d’un événement interne que le service SSE va capter.
Le contrôleur : exposer le SSE et le déclenchement
Il nous reste une étape cruciale dans l’API, celle de l’ajout du endpoint pour ouvrir le flux SSE (je sais, vous ne vous y attendiez pas du tout). Pour ce faire nous l’ajoutons dans le AnalysisController :
import { Controller, HttpCode, HttpStatus, Param, Post, Sse } from '@nestjs/common'
@Controller('analysis')
export class AnalysisController {
constructor(
private readonly analysisEvents: AnalysisEventsService,
) {}
// [...]
@Sse('events')
getEvents() {
return this.analysisEvents.onUpdated()
}
}
Le décorateur @Sse('events') de NestJS fait tout le travail : il ouvre une connexion HTTP persistante avec le header Content-Type: text/event-stream et envoie chaque valeur émise par l’Observable au format SSE standard.
Implémentation côté front avec React
Côté client, l’objectif est de consommer le flux SSE et de mettre à jour l’interface en temps réel. L’exemple utilise React avec TanStack Query (React Query).
Écouter le flux SSE
Dans un premier temps, on doit écouter le flux SSE. On déclare donc le hook useAnalysisEvents qui s’abonne au flux SSE via la fonction expérimentale streamedQuery de TanStack Query. À chaque événement reçu, il met à jour le cache des analyses :
import {
queryOptions,
experimental_streamedQuery as streamedQuery,
useQuery,
useQueryClient,
} from '@tanstack/react-query'
import { useEffect, useRef } from 'react'
export function useAnalysisEvents() {
const queryClient = useQueryClient()
const lastProcessedLengthRef = useRef(0)
const query = queryOptions({
queryKey: ['analysis-events'],
queryFn: streamedQuery({
streamFn: async () => {
const { stream } = await analysisControllerGetEvents()
return stream
},
}),
})
const { data: streamedData } = useQuery(query)
useEffect(() => {
const events = Array.isArray(streamedData) ? streamedData : []
const from = lastProcessedLengthRef.current
if (from >= events.length) return
const toProcess = events.slice(from)
for (const raw of toProcess) {
const eventData = zAnalysisEventsSchema.parse(raw)
queryClient.setQueryData(['analyses'], (previous) => {
return previous.map((analysis) =>
analysis.id === eventData.id
? { ...analysis, ...eventData }
: analysis,
)
})
}
lastProcessedLengthRef.current = events.length
}, [streamedData, queryClient])
}
Quelques détails sur le fonctionnement :
streamedQueryest une API expérimentale de TanStack Query qui gère nativement les flux (ReadableStream). Elle accumule les événements dans un tableau. Si vous préférez passer par une méthode plus conventionnelle, vous pouvez utiliser l’interfaceEventSourcedirectement (et donc ajouter uneventListenersur unEventSource).lastProcessedLengthRefpermet de ne traiter que les nouveaux événements à chaque rendu, sans retraiter ceux déjà pris en compte.setQueryDatamet à jour le cache local de façon optimiste. L’UI se rafraîchit instantanément sans avoir à relancer une requête. Nous le faisons comme ça ici car dans notre exemple, nous n’avons pas de donnée persistante entre le back et le front, ni de route GET. Libre à vous de mettre à jour votre donnée comme bon vous semble (optimistic update, evict cache, etc…)
Afficher la progression
Si vous avez suivi les étapes précédentes, vous devriez avoir un dashboard qui affiche une carte par analyse avec son statut en temps réel. Sauf que le statut ne fonctionnait pas jusqu’à présent. Il est temps d’y remédier et pour ce faire nous pouvons donc appeler le hook useAnalysisEvents qui permettra d’écouter le flux SSE.
export default function DashboardPage() {
useAnalysisEvents()
const { data: analyses } = useAnalyses()
const { mutate } = useStartAnalysis()
return (
<main className="container mx-auto py-8 px-4 space-y-6">
<h1 className="text-3xl font-bold">Analyses</h1>
{analyses?.map((analysis) => (
<Card key={analysis.id}>
<CardHeader>
<CardTitle>Analysis</CardTitle>
<AnalysisBadge step={analysis.step} />
</CardHeader>
<CardFooter>
<Button
onClick={() => mutate({ id: analysis.id })}
disabled={
analysis.step !== 'completed' && analysis.step !== 'failed'
}
>
Lancer l'analyse
</Button>
</CardFooter>
</Card>
))}
</main>
)
}
Le composant AnalysisBadge affiche un badge coloré en fonction de l’étape en cours (extraction, analyse, terminé, échoué). L’intégralité de la mise à jour est pilotée par le cache TanStack Query : quand le hook SSE met à jour les données, React re-rend automatiquement les composants concernés.
Et voilà, normalement vous avez un exemple d’utilisation de queue et de SSE fonctionnel de bout en bout ! Vous pouvez retrouver le code complet sur GitHub ici.
Pour conclure
Voilà qui conclut nos 2 articles sur la façon de traiter les tâches lourdes dans une application Node.js (et plus précisément avec NestJS et React)
L’association BullMQ + EventEmitter + SSE forme une architecture simple, performante et facile à maintenir pour traiter des tâches lourdes en Node.js :
- BullMQ gère le traitement en arrière-plan via Redis, avec concurrence, retries et nettoyage automatique.
- L’EventEmitter fait circuler les événements de progression à l’intérieur de l’application.
- Les SSE poussent ces événements vers les navigateurs connectés, en s’appuyant sur HTTP standard.
Une architecture que vous pouvez ajuster à vos besoins, comme nous l’avons vu tout au long de ces 2 articles !
Si jamais vous êtes intéressé, vous pouvez retrouver le code complet de l’exemple (combinant l’utilisation de BullMQ et des SSE) sur GitHub.