ASGI support needed — StreamingResponse and SSE broken on FastAPI

Problem 1 — StreamingResponse doesn't stream, returns full response

I have an LLM chatbot endpoint that streams tokens to the browser as they are generated:

@landing_page_router.post("/stream", dependencies=[Depends(limiter)])
async def chat_landing_page_stream(
    request: LandingPageChatRequest,
    open_router: ChatOpenRouter = Depends(get_openrouter_client),
    qdrant_manager: QdrantManager = Depends(get_qdrant_manager),
    mongodb: AsyncIOMotorDatabase = Depends(get_mongo_db),
):
    chat_service = ChatService(mongodb, open_router, qdrant_manager)

    async def event_generator():
        try:
            async for token in chat_service.stream_chat_landing_page(
                request.prompt, request.session_id
            ):
                yield f"data: {json.dumps({'token': token})}\n\n"
            yield "data: [DONE]\n\n"
        except Exception as e:
            logger.error(f"Error in streaming endpoint: {e}", exc_info=True)
            yield f"data: {json.dumps({'token': 'I apologize, I encountered an error.'})}\n\n"
            yield "data: [DONE]\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )

In the browser, the entire response is buffered and delivered at once: the user sees a blank screen until the LLM finishes, then the full text appears. Tokens are never delivered incrementally. The same endpoint streams correctly when I run uvicorn locally.

https://www.pythonanywhere.com/forums/topic/32484/

I can see that HTTP works for streaming, but it is not an option for me.
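To show where the buffering happens, it can help to timestamp each chunk as it reaches the client. The following is only a minimal sketch of that idea, not part of the application: `timed_chunks` and `looks_buffered` are hypothetical helper names, and the 0.05 second threshold is an arbitrary assumption.

```python
import time
from typing import Callable, Iterable, Iterator


def timed_chunks(
    chunks: Iterable[bytes],
    clock: Callable[[], float] = time.monotonic,
) -> Iterator[tuple[float, bytes]]:
    """Yield (seconds_since_start, chunk) for each chunk as it is received."""
    start = clock()
    for chunk in chunks:
        # Record the arrival time relative to when iteration began.
        yield clock() - start, chunk


def looks_buffered(arrivals: list[float], threshold: float = 0.05) -> bool:
    """Heuristic only: if the first and last chunk arrive within `threshold`
    seconds of each other, the body was probably delivered in one burst."""
    if len(arrivals) < 2:
        return False
    return (arrivals[-1] - arrivals[0]) < threshold
```

Any iterable of byte chunks can be passed in, for example the response object returned by `urllib.request.urlopen`, which yields the body line by line. Run against local uvicorn, the arrival times should be spread out over the generation time. Run against the deployed app, they should cluster together if the response is being buffered somewhere in front of the application.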

Problem 2 — SSE connections hang indefinitely ("pending" in browser DevTools)

I have a real-time event bus that pushes live notifications to the frontend via Server-Sent Events:

# app/core/event_bus.py
import asyncio
from typing import Any

# Global registry: session_id -> list of subscriber queues
_subscribers: dict[str, list[asyncio.Queue]] = {}


def subscribe(session_id: str) -> asyncio.Queue:
    """Create a new queue for the given session_id and register it."""
    queue: asyncio.Queue = asyncio.Queue()
    _subscribers.setdefault(session_id, []).append(queue)
    return queue


def unsubscribe(session_id: str, queue: asyncio.Queue) -> None:
    """Remove a queue from the session's subscriber list; clean up if empty."""
    queues = _subscribers.get(session_id)
    if queues is None:
        return
    try:
        queues.remove(queue)
    except ValueError:
        pass
    if not queues:
        _subscribers.pop(session_id, None)


def publish(session_id: str, data: dict[str, Any]) -> None:
    """Push data to every subscriber queue for the given session_id."""
    queues = _subscribers.get(session_id)
    if not queues:
        return
    for queue in queues:
        queue.put_nowait(data)

async def append_chat_messages(self, session_id: str, tenant_id: str, chatbot_id: str, external_channel: str, external_user_id: str, messages: list[ChatMessage]) -> bool:
    """Append multiple messages to session's message array in one operation."""
    try:
        message_dicts = [message.model_dump(by_alias=True, exclude_none=True) for message in messages]
        current_time = datetime.utcnow()

        # Update chat messages with all messages at once
        result = await self.chat_messages_collection.update_one(
            {"session_id": session_id},
            {
                "$push": {"messages": {"$each": message_dicts}},
                "$set": {"updated_at": current_time},
                "$setOnInsert": {
                    "tenant_id": tenant_id,
                    "created_at": current_time,
                    "chatbot_id": chatbot_id,
                    "external_channel": external_channel,
                    "external_user_id": external_user_id,
                    "_id": ObjectId(),
                },
            },
            upsert=True,
        )

        # Update analytics metadata with correct count
        await self.chats_collection.update_one(
            {"session_id": session_id},
            {
                "$inc": {"message_count": len(messages)},
                "$set": {
                    "updated_at": current_time,
                },
                "$setOnInsert": {"created_at": current_time},
            },
            upsert=True,
        )

        event_bus_publish(session_id, {"event": "new_messages", "count": len(messages)})
        event_bus_publish(f"tenant:{tenant_id}", {"event": "new_messages", "session_id": session_id, "count": len(messages)})

        return result.acknowledged
    except Exception as e:
        # Include the traceback, as the streaming endpoint's handler does
        logger.error(f"Error appending chat messages: {e}", exc_info=True)
        return False

The browser's EventSource connection stays in the "pending" state in DevTools forever, and no events are ever received. The SSE connection never completes. I know ASGI support is in beta. Is there any chance these features will be implemented soon, or are there any workarounds for now?
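For reference, the subscriber side of the event bus can be sketched roughly as below. This is an illustration under assumptions, not the exact endpoint: `sse_events` and `heartbeat_seconds` are hypothetical names, and the queue is assumed to be the one returned by `subscribe(session_id)`. The generator sends an SSE comment line (a line starting with `:`) immediately and again whenever the queue is idle, so a working transport would show bytes right away instead of staying pending.

```python
import asyncio
import json
from typing import Any, AsyncIterator


async def sse_events(
    queue: asyncio.Queue,
    heartbeat_seconds: float = 15.0,
) -> AsyncIterator[str]:
    """Yield SSE frames for items read from `queue`, with idle heartbeats."""
    # Send something immediately so the client sees the stream open.
    yield ": connected\n\n"
    while True:
        try:
            data: dict[str, Any] = await asyncio.wait_for(
                queue.get(), timeout=heartbeat_seconds
            )
        except asyncio.TimeoutError:
            # Nothing was published within the interval: emit an SSE comment,
            # which EventSource clients ignore.
            yield ": keep-alive\n\n"
            continue
        yield f"data: {json.dumps(data)}\n\n"
```

In an endpoint, this generator would be wrapped in `StreamingResponse(..., media_type="text/event-stream")`, with `unsubscribe(session_id, queue)` called in a `finally` block when the client disconnects. It does not work around buffering on the hosting side. It only helps rule out the application as the reason no bytes are sent.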

We are aware that SSE does not work in ASGI apps, but do not have an estimate of when we will add it.

Thank you! Looking forward to it. I love this platform, and we literally need this feature!

Glad to hear that!