Notice: Function _load_textdomain_just_in_time was called incorrectly. Translation loading for the amp domain was triggered too early. This is usually an indicator for some code in the plugin or theme running too early. Translations should be loaded at the init action or later. Please see Debugging in WordPress for more information. (This message was added in version 6.7.0.) in /opt/bitnami/wordpress/wp-includes/functions.php on line 6170

Notice: Function amp_has_paired_endpoint was called incorrectly. Function cannot be called before services are registered. The service ID "paired_routing" is not recognized and cannot be retrieved. Please see Debugging in WordPress for more information. (This message was added in version 2.1.1.) in /opt/bitnami/wordpress/wp-includes/functions.php on line 6170

Notice: Function amp_is_available was called incorrectly. `amp_is_available()` (or `amp_is_request()`, formerly `is_amp_endpoint()`) was called too early and so it will not work properly. WordPress is not currently doing any hook. Calling this function before the `wp` action means it will not have access to `WP_Query` and the queried object to determine if it is an AMP response, thus neither the `amp_skip_post()` filter nor the AMP enabled toggle will be considered. The function was called too early (before the plugins_loaded action) to determine the plugin source. Please see Debugging in WordPress for more information. (This message was added in version 2.0.0.) in /opt/bitnami/wordpress/wp-includes/functions.php on line 6170

Notice: Function amp_has_paired_endpoint was called incorrectly. Function cannot be called before services are registered. The service ID "paired_routing" is not recognized and cannot be retrieved. Please see Debugging in WordPress for more information. (This message was added in version 2.1.1.) in /opt/bitnami/wordpress/wp-includes/functions.php on line 6170

Notice: Function amp_is_available was called incorrectly. `amp_is_available()` (or `amp_is_request()`, formerly `is_amp_endpoint()`) was called too early and so it will not work properly. WordPress is not currently doing any hook. Calling this function before the `wp` action means it will not have access to `WP_Query` and the queried object to determine if it is an AMP response, thus neither the `amp_skip_post()` filter nor the AMP enabled toggle will be considered. The function was called too early (before the plugins_loaded action) to determine the plugin source. Please see Debugging in WordPress for more information. (This message was added in version 2.0.0.) in /opt/bitnami/wordpress/wp-includes/functions.php on line 6170

Notice: La función _load_textdomain_just_in_time ha sido llamada de forma incorrecta. La carga de la traducción para el dominio amp se activó demasiado pronto. Esto suele ser un indicador de que algún código del plugin o tema se ejecuta demasiado pronto. Las traducciones deberían cargarse en la acción init o más tarde. Por favor, ve depuración en WordPress para más información. (Este mensaje fue añadido en la versión 6.7.0). in /opt/bitnami/wordpress/wp-includes/functions.php on line 6170

Notice: La función amp_is_available ha sido llamada de forma incorrecta. `amp_is_available()` (o `amp_is_request()`, anteriormente `is_amp_endpoint()`) se llamó demasiado pronto y por tanto no funcionará correctamente. WordPress is currently doing the `plugins_loaded` hook. Llamar a esta función antes de la acción `wp` significa que no se tendrá acceso a `WP_Query` y el objeto de la consulta para determinar si es una respuesta AMP, por tanto, no se tendrán en cuenta ni el filtro `amp_skip_post()` ni el conmutador de que está activado AMP. Por favor, ve depuración en WordPress para más información. (Este mensaje fue añadido en la versión 2.0.0). in /opt/bitnami/wordpress/wp-includes/functions.php on line 6170

Notice: La función amp_is_available ha sido llamada de forma incorrecta. `amp_is_available()` (o `amp_is_request()`, anteriormente `is_amp_endpoint()`) se llamó demasiado pronto y por tanto no funcionará correctamente. WordPress is currently doing the `plugins_loaded` hook. Llamar a esta función antes de la acción `wp` significa que no se tendrá acceso a `WP_Query` y el objeto de la consulta para determinar si es una respuesta AMP, por tanto, no se tendrán en cuenta ni el filtro `amp_skip_post()` ni el conmutador de que está activado AMP. Por favor, ve depuración en WordPress para más información. (Este mensaje fue añadido en la versión 2.0.0). in /opt/bitnami/wordpress/wp-includes/functions.php on line 6170
Armando el Tiempo Real como Arma: Notificaciones WebSocket/SSE con FastAPI — Gestión de Conexiones, Salas, Reconexión, Escalado y Observabilidad - IT&ライフハックブログ|学びと実践のためのアイデア集

Armando el Tiempo Real como Arma: Notificaciones WebSocket/SSE con FastAPI — Gestión de Conexiones, Salas, Reconexión, Escalado y Observabilidad

green snake
Photo by Pixabay on Pexels.com

Armando el Tiempo Real como Arma: Notificaciones WebSocket/SSE con FastAPI — Gestión de Conexiones, Salas, Reconexión, Escalado y Observabilidad


Resumen (Pirámide invertida)

  • Las actualizaciones inmediatas de UI se entregan vía WebSocket o SSE. Elige según el caso de uso.
  • Para una sola instancia, un endpoint WebSocket más gestión de conexiones (pool de conexiones, salas) es suficiente.
  • Para escalar entre múltiples instancias, difunde (fan-out) mensajes mediante Redis Pub/Sub o similar.
  • Operaciones sanas requieren latidos (heartbeats), contraflujo (backpressure), reconexión, autenticación & scopes, y monitoreo.
  • Diseña extremo a extremo, incluyendo timeouts de Nginx/CDN, configuración CORS/WebSocket y pruebas de carga.

A quién beneficiará

  • Aprendiz A (último año de grado)
    Quiere chat/notificaciones en tiempo real. Necesita diferencias entre WebSocket y SSE y el mínimo necesario.
  • Equipo pequeño B (agencia de 3 personas)
    Quiere un dashboard de gestión con actualizaciones instantáneas. Necesita implementación segura de gestión de conexiones, auth y difusión a salas.
  • Desarrollador SaaS C (startup)
    Quiere que las notificaciones lleguen incluso cuando los contenedores escalen. Necesita fan-out con Redis, heartbeats y limitación de tasa.

Evaluación de accesibilidad

  • Estructurado con párrafos cortos y listas por capítulo. El código usa fuente monoespaciada; los comentarios son concisos.
  • Desambiguación temprana de elecciones en las que principiantes suelen tropezar (WebSocket vs SSE). Trampas y mitigaciones resumidas en una tabla.
  • Nivel general: aproximadamente AA.

1. Elegir el enfoque: ¿WebSocket o SSE?

  • WebSocket: bidireccional. Ideal para casos con envíos frecuentes cliente → servidor como chat, edición colaborativa, juegos.
  • SSE (Server-Sent Events): unidireccional (servidor → cliente). Ideal cuando domina la difusión: precios de acciones, badges de notificación, progreso de jobs. Funciona sobre HTTP/1.1 y atraviesa proxies fácilmente.

Puntos de decisión

  • Si necesitas bidireccionalidad, usa WebSocket. Si el foco es difusión y quieres amplio soporte de navegador y resiliencia, elige SSE.
  • Para conexiones concurrentes muy grandes, SSE puede encajar mejor en infra en algunos casos (aprovechando HTTP/2 y CDNs).
  • Las apps móviles tienden a adoptar WebSocket.

2. WebSocket mínimo (instancia única)

2.1 Gestión de conexiones

# app/realtime/manager.py
from typing import Dict, Set
from fastapi import WebSocket

class ConnectionManager:
    def __init__(self):
        self.active: Set[WebSocket] = set()
        self.rooms: Dict[str, Set[WebSocket]] = {}

    async def connect(self, ws: WebSocket, room: str | None = None):
        await ws.accept()
        self.active.add(ws)
        if room:
            self.rooms.setdefault(room, set()).add(ws)

    def disconnect(self, ws: WebSocket):
        self.active.discard(ws)
        for r in list(self.rooms.values()):
            r.discard(ws)

    async def send_to_ws(self, ws: WebSocket, data: dict):
        await ws.send_json(data)

    async def broadcast_all(self, data: dict):
        for ws in list(self.active):
            try:
                await ws.send_json(data)
            except Exception:
                self.disconnect(ws)

    async def broadcast_room(self, room: str, data: dict):
        for ws in list(self.rooms.get(room, set())):
            try:
                await ws.send_json(data)
            except Exception:
                self.disconnect(ws)

2.2 Router

# app/realtime/ws.py
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Query
from app.realtime.manager import ConnectionManager

router = APIRouter()
manager = ConnectionManager()

@router.websocket("/ws")
async def ws_endpoint(ws: WebSocket, room: str | None = Query(None)):
    await manager.connect(ws, room)
    try:
        while True:
            msg = await ws.receive_json()
            # Ejemplo: eco a la sala
            if room:
                await manager.broadcast_room(room, {"echo": msg})
            else:
                await manager.send_to_ws(ws, {"echo": msg})
    except WebSocketDisconnect:
        manager.disconnect(ws)

Puntos clave

  • Llama siempre a accept() primero.
  • En desconexión, elimina de todos los conjuntos. No filtres conexiones en excepción.

3. Autenticación y scopes

La mejora (upgrade) a WebSocket ocurre tras el handshake HTTP. Autentica usando un Bearer token o Cookie y verifica autorización para participación en salas.

# app/realtime/auth.py
from fastapi import WebSocket, HTTPException, status
from app.security.jwt import decode_token  # Reutiliza tu implementación JWT

async def authenticate_ws(ws: WebSocket):
    auth = ws.headers.get("authorization", "")
    if not auth.lower().startswith("bearer "):
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="missing token")
    token = auth.split(" ", 1)[1]
    data = decode_token(token)
    scopes = set(str(data.get("scope","")).split())
    return {"sub": data["sub"], "scopes": scopes}

Ejemplo de uso:

# app/realtime/ws.py (con auth)
from fastapi import Depends
from app.realtime.auth import authenticate_ws

@router.websocket("/ws/secure")
async def ws_secure(ws: WebSocket, user = Depends(authenticate_ws), room: str | None = Query(None)):
    await manager.connect(ws, room)
    try:
        await manager.send_to_ws(ws, {"hello": user["sub"]})
        while True:
            msg = await ws.receive_json()
            if "articles:write" in user["scopes"]:
                await manager.broadcast_room(room or "public", {"by": user["sub"], "msg": msg})
            else:
                await manager.send_to_ws(ws, {"error": "insufficient scope"})
    except WebSocketDisconnect:
        manager.disconnect(ws)

4. Latidos (heartbeats) y reconexión

  • Envía ping periódicos servidor → cliente y corta conexiones que no respondan.
  • Los clientes se reconectan con backoff exponencial. Al reconectar, envía el último ID de evento recibido para rellenar huecos (SSE lo estandariza con Last-Event-ID).

Ejemplo en servidor:

# app/realtime/heartbeat.py
import asyncio, json
from app.realtime.manager import ConnectionManager

async def heartbeat(manager: ConnectionManager, interval=30):
    while True:
        await asyncio.sleep(interval)
        await manager.broadcast_all({"type": "ping"})

Ejecuta esto en una tarea de fondo (p. ej., en el evento startup de la app).


5. Contraflujo (backpressure) y límites de tamaño

  • Usa una cola de envío; cuando supere un umbral, descarta actualizaciones obsoletas.
  • Aplica tamaño máximo por mensaje y límites de envíos por segundo para protegerte de clientes abusivos.

Ejemplo simple de cola:

# app/realtime/queue.py
import asyncio
from collections import deque

class SendQueue:
    def __init__(self, maxlen=1000):
        self.q = deque(maxlen=maxlen)
        self.cv = asyncio.Condition()

    async def put(self, item):
        async with self.cv:
            self.q.append(item)
            self.cv.notify()

    async def consume(self):
        while True:
            async with self.cv:
                while not self.q:
                    await self.cv.wait()
                item = self.q.popleft()
            yield item

6. Escalado: Difusión entre instancias con Redis Pub/Sub

Los conjuntos de conexiones locales no alcanzan clientes conectados a otras instancias. Inserta Redis Pub/Sub para “retransmitir” mensajes a cada instancia.

6.1 Arquitectura

  • Cada instancia: entrega a conexiones locales y se suscribe a un canal de Redis.
  • Al publicar: envía localmente y publish a Redis.

6.2 Implementación de ejemplo (async)

# app/realtime/bus.py
from redis import asyncio as aioredis
from typing import Callable
import json

class RedisBus:
    def __init__(self, url: str, channel: str):
        self.url, self.channel = url, channel
        self.redis: aioredis.Redis | None = None

    async def start(self, on_message: Callable[[dict], None]):
        self.redis = aioredis.from_url(self.url, encoding="utf-8", decode_responses=True)
        pubsub = self.redis.pubsub()
        await pubsub.subscribe(self.channel)
        async for m in pubsub.listen():
            if m.get("type") == "message":
                on_message(json.loads(m["data"]))

    async def publish(self, msg: dict):
        if not self.redis:
            self.redis = aioredis.from_url(self.url, encoding="utf-8", decode_responses=True)
        await self.redis.publish(self.channel, json.dumps(msg))

6.3 Puntos de integración

  • Antes de broadcast_room, llama bus.publish({"room": room, "data": payload}).
  • En on_message, llama manager.broadcast_room(m["room"], m["data"]).
  • Ahora cualquier instancia puede alcanzar la misma sala.

7. SSE mínimo (difusión unidireccional)

# app/realtime/sse.py
from fastapi import APIRouter, Request
from fastapi.responses import StreamingResponse
import asyncio, json, time

router = APIRouter()

async def event_stream(queue):
    while True:
        data = await queue.get()
        yield f"id: {int(time.time()*1000)}\n"
        yield "event: message\n"
        yield f"data: {json.dumps(data)}\n\n"

@router.get("/sse")
async def sse_endpoint(request: Request):
    queue = asyncio.Queue()
    # Inserta ítems en la cola en otro lugar; para demo, inicia un ticker de 1s
    async def ticker():
        while True:
            await asyncio.sleep(1)
            await queue.put({"now": time.time()})
    asyncio.create_task(ticker())
    headers = {
        "Cache-Control": "no-cache",
        "Content-Type": "text/event-stream",
        "Connection": "keep-alive",
    }
    return StreamingResponse(event_stream(queue), headers=headers)

Puntos clave

  • Una conexión = un flujo de respuesta. Usa data: por línea y separa eventos con una línea en blanco.
  • En reconexión, soporta “catch-up” vía Last-Event-ID si es posible.

8. Ajustes de Nginx/proxy y timeouts

  • WebSocket: asegúrate de que los encabezados Upgrade / Connection se propaguen. Configura un proxy_read_timeout suficientemente largo.
  • SSE: evita el buffering del proxy; agrega X-Accel-Buffering: no (o equivalente) cuando sea posible.
  • Ajusta client_max_body_size a tus necesidades de payload upstream.
  • Multiplexar SSE sobre HTTP/2 puede reducir el número de conexiones.

9. Monitoreo y operaciones

  • Métricas: conexiones concurrentes, vida útil de conexión, tasa de reconexión, tamaño medio de mensaje, msgs/seg in/out, descartes.
  • Logs: conectar/desconectar, fallos de auth, entrar/salir de salas, desbordes de colas.
  • Pruebas de carga: valida conexiones concurrentes y flujo de mensajes con autocannon o wrk más clientes a medida.

10. Errores comunes y soluciones

Síntoma Causa Mitigación
Pérdida de mensajes al escalar Sin relé entre instancias Redis Pub/Sub o broker de mensajes para fan-out
Desconexiones abruptas Timeouts inactivos del proxy Amplía proxy_read_timeout, envía heartbeats
Crecimiento continuo de memoria Fugas en desconexión o colas sin límite Asegura desconexión en excepciones; limita colas
Escalada de privilegios Sin autorización para unirse a salas Aplica comprobación de scopes al unirse; valida tokens
Tormentas de reconexión Sin backoff Backoff exponencial en cliente; límites y 429 en servidor

11. Ejemplo: Mini app extremo a extremo

# app/main.py
from fastapi import FastAPI
from app.realtime import ws as ws_router
from app.realtime import sse as sse_router

app = FastAPI(title="Realtime Demo")
app.include_router(ws_router.router, prefix="/realtime")
app.include_router(sse_router.router, prefix="/realtime")

@app.get("/health")
def health():
    return {"ok": True}

12. Seguridad

  • Auth: verifica JWT/Cookie durante el handshake. Las salas requieren scopes.
  • Validación de entrada: valida siempre con esquemas el JSON de los clientes.
  • Limitación de tasa: controla cuentas de conexiones y frecuencia de envío por IP/usuario.
  • Secretos: no registres payloads ni tokens.
  • CORS/Origen: habilita CORS para SSE; implementa comprobación de Origin para WebSocket.

13. Hoja de ruta de despliegue por fases

  1. Implementa gestión de conexiones y salas en una sola instancia.
  2. Añade auth & scopes; define heartbeat y comportamiento de reconexión.
  3. Integra Redis Pub/Sub para escalado.
  4. Añade backpressure, rate limits y métricas de monitoreo.
  5. Ajusta el proxy, ejecuta pruebas de carga y crea dashboards.

Referencias


Conclusiones

  • Comienza eligiendo WebSocket vs SSE según requisitos: WebSocket para bidireccional, SSE para difusión.
  • En instancia única, un pool de conexiones + salas, auth y heartbeats funcionará con fiabilidad.
  • A escala, usa Redis Pub/Sub para fan-out entre instancias, y aplica backpressure y rate limiting para estabilidad.
  • Diseña desde la operativa—incluyendo observabilidad y configuración de proxies—y el tiempo real se convierte en un arma competitiva.

por greeden

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *

日本語が含まれない投稿は無視されますのでご注意ください。(スパム対策)