Problem 1 — StreamingResponse doesn't stream, returns full response
I have an LLM chatbot endpoint that streams tokens to the browser as they are generated:
@landing_page_router.post("/stream", dependencies=[Depends(limiter)])
async def chat_landing_page_stream(
    request: LandingPageChatRequest,
    open_router: ChatOpenRouter = Depends(get_openrouter_client),
    qdrant_manager: QdrantManager = Depends(get_qdrant_manager),
    mongodb: AsyncIOMotorDatabase = Depends(get_mongo_db),
):
    """Stream the landing-page chatbot reply as Server-Sent Events.

    Each token produced by ``ChatService.stream_chat_landing_page`` is sent
    as one ``data: {"token": ...}`` frame; the stream always ends with a
    ``data: [DONE]`` frame. If anything raises while streaming, the error is
    logged and a single apology token is sent before the ``[DONE]`` frame.

    Args:
        request: Request body; ``prompt`` and ``session_id`` are forwarded
            to the chat service.
        open_router: Injected OpenRouter client.
        qdrant_manager: Injected Qdrant manager.
        mongodb: Injected MongoDB database handle.

    Returns:
        A ``StreamingResponse`` with media type ``text/event-stream``.
    """
    service = ChatService(mongodb, open_router, qdrant_manager)
    done_frame = "data: [DONE]\n\n"

    def token_frame(text):
        # One SSE message: a "data:" line terminated by a blank line.
        return f"data: {json.dumps({'token': text})}\n\n"

    async def sse_frames():
        try:
            token_stream = service.stream_chat_landing_page(
                request.prompt, request.session_id
            )
            async for piece in token_stream:
                yield token_frame(piece)
            yield done_frame
        except Exception as e:
            logger.error(f"Error in streaming endpoint: {e}", exc_info=True)
            # Still close the stream cleanly so the client stops waiting.
            yield token_frame('I apologize, I encountered an error.')
            yield done_frame

    # "X-Accel-Buffering: no" is the nginx hint to disable proxy buffering.
    response_headers = {"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    return StreamingResponse(
        sse_frames(),
        media_type="text/event-stream",
        headers=response_headers,
    )
In the browser, the entire response is buffered and delivered at once — the user sees a blank screen until the LLM finishes, and then the full text appears. Streaming never works. The same endpoint works perfectly when I run uvicorn locally.
https://www.pythonanywhere.com/forums/topic/32484/
I can see that HTTP works for streaming, but it is not an option for me.
Problem 2 — SSE connections hang indefinitely ("pending" in browser DevTools)
I have a real-time event bus that pushes live notifications to the frontend via Server-Sent Events:
# app/core/event_bus.py
import asyncio
from typing import Any
# Global registry: session_id -> list of subscriber queues.
# subscribe() appends one asyncio.Queue per listener, unsubscribe() removes it
# (dropping the key once its list is empty), and publish() puts the payload on
# every queue registered under the key.
# NOTE(review): this is a module-level dict, so it is only visible inside one
# Python process — confirm publishers and subscribers share the same worker.
_subscribers: dict[str, list[asyncio.Queue]] = {}
def subscribe(session_id: str) -> asyncio.Queue:
    """Register and return a fresh queue that receives events for ``session_id``.

    Args:
        session_id: Key under which the new queue is registered.

    Returns:
        The newly created, unbounded ``asyncio.Queue``. The caller should
        hand it back to ``unsubscribe`` when it stops listening.
    """
    inbox: asyncio.Queue = asyncio.Queue()
    listeners = _subscribers.get(session_id)
    if listeners is None:
        # First listener for this key: start its subscriber list.
        listeners = []
        _subscribers[session_id] = listeners
    listeners.append(inbox)
    return inbox
def unsubscribe(session_id: str, queue: asyncio.Queue) -> None:
    """Detach ``queue`` from ``session_id`` and drop the key once it has no queues.

    Unknown session ids and queues that are not registered are ignored.

    Args:
        session_id: Key the queue was registered under.
        queue: The queue previously returned by ``subscribe``.
    """
    listeners = _subscribers.get(session_id)
    if listeners is not None:
        if queue in listeners:
            listeners.remove(queue)
        if len(listeners) == 0:
            # No listeners left: remove the key so the registry does not grow.
            _subscribers.pop(session_id, None)
def publish(session_id: str, data: dict[str, Any]) -> None:
    """Deliver ``data`` to every queue currently subscribed to ``session_id``.

    Does nothing when the key has no subscribers. The same ``data`` object is
    placed on each queue (it is not copied).

    Args:
        session_id: Key whose subscribers should receive the payload.
        data: Event payload to enqueue.
    """
    # "or ()" covers both a missing key and an empty subscriber list.
    for inbox in _subscribers.get(session_id) or ():
        inbox.put_nowait(data)
async def append_chat_messages(self, session_id: str, tenant_id: str, chatbot_id: str, external_channel: str, external_user_id: str, messages: list[ChatMessage]) -> bool:
    """Append a batch of messages to a session and notify event-bus listeners.

    Steps, in order:
      1. Upsert the session document in ``chat_messages_collection``,
         pushing all messages in a single ``$push``/``$each``.
      2. Upsert the matching document in ``chats_collection``, incrementing
         ``message_count`` by the batch size.
      3. Publish a ``new_messages`` event under the session id and under
         ``tenant:<tenant_id>``.

    Args:
        session_id: Session whose message array is extended.
        tenant_id: Stored on insert; also used for the tenant-level event key.
        chatbot_id: Stored on insert only.
        external_channel: Stored on insert only.
        external_user_id: Stored on insert only.
        messages: Messages to append; each is serialized with ``model_dump``.

    Returns:
        ``acknowledged`` from the first update on success; ``False`` if any
        step raised (the error is logged, not re-raised).
    """
    try:
        serialized = [
            item.model_dump(by_alias=True, exclude_none=True) for item in messages
        ]
        now = datetime.utcnow()

        # Fields written only when the upsert creates the session document.
        insert_only_fields = {
            "tenant_id": tenant_id,
            "created_at": now,
            "chatbot_id": chatbot_id,
            "external_channel": external_channel,
            "external_user_id": external_user_id,
            "_id": ObjectId(),
        }
        messages_update = {
            "$push": {"messages": {"$each": serialized}},
            "$set": {"updated_at": now},
            "$setOnInsert": insert_only_fields,
        }
        result = await self.chat_messages_collection.update_one(
            {"session_id": session_id},
            messages_update,
            upsert=True,
        )

        # Keep the per-session analytics counter in step with the batch size.
        analytics_update = {
            "$inc": {"message_count": len(messages)},
            "$set": {
                "updated_at": now,
            },
            "$setOnInsert": {"created_at": now},
        }
        await self.chats_collection.update_one(
            {"session_id": session_id},
            analytics_update,
            upsert=True,
        )

        # Notify session-level listeners first, then tenant-level listeners.
        session_event = {"event": "new_messages", "count": len(messages)}
        event_bus_publish(session_id, session_event)
        tenant_event = {"event": "new_messages", "session_id": session_id, "count": len(messages)}
        event_bus_publish(f"tenant:{tenant_id}", tenant_event)

        return result.acknowledged
    except Exception as e:
        logger.error(f"Error appending chat messages: {e}")
        return False
The browser's EventSource connection stays in the "pending" state in DevTools forever. No events are ever received, and the SSE connection never completes. I know ASGI support is in beta. Is there any chance these features will be implemented soon, or are there any workarounds for now?