Emitter
The Python Emitter parses raw LLM token output and emits StreamingChunk patch operations.
Designed for FastAPI, Flask, and Django backends. Transport is out of scope — serialise
patches with chunk.to_json() however your stack requires.
Import
from jsoncurrent import Emitter
Constructor
emitter = Emitter(
root: str = "",
completions: bool = True
)

| Param | Type | Default | Description |
|---|---|---|---|
| root | str | '' | Namespace prefix for all emitted paths. E.g. 'prediction' produces paths like prediction.title. |
| completions | bool | True | Emit complete patches when a value is fully assembled. Set False on high-throughput routes where pathcomplete events are not used. |
Methods
emitter.write(token)
Feed a raw token from the LLM stream.
for text in stream.text_stream:
    emitter.write(text)

| Param | Type | Description |
|---|---|---|
| token | str | Raw token string from the LLM response stream. |
emitter.flush()
Signal end of stream. Finalises any pending state, emits complete, and resets internal
state. Call this when the LLM stream closes.
emitter.flush()

emitter.reset()
Reset internal state without emitting complete. Preserves event listeners and middleware.
emitter.use(fn)
Register a middleware function. Returns self for chaining.
emitter.use(my_middleware)

emitter.on(event, fn)
Register an event listener. Returns self for chaining.
emitter.on('patch', lambda chunk: print(chunk.to_json()))

emitter.off(event, fn)
Remove an event listener.
Events
patch
Fires for each StreamingChunk emitted. Serialise with chunk.to_json().
def on_patch(chunk):
    """Write one patch to the response as a Server-Sent Events ``data:`` frame."""
    frame = f"data: {chunk.to_json()}\n\n"
    response.write(frame)
emitter.on('patch', on_patch)

complete
Fires when flush() is called.
emitter.on('complete', lambda: response.write("data: [DONE]\n\n"))

error
Fires on parse errors.
emitter.on('error', lambda err: logger.error(err))

FastAPI example
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from jsoncurrent import Emitter
import anthropic
import asyncio
# Created once at import time; the /stream route below reuses both objects.
app = FastAPI()
client = anthropic.Anthropic()
@app.get('/stream')
async def stream():
queue = asyncio.Queue()
emitter = Emitter()
emitter.on('patch', lambda chunk: queue.put_nowait(f"data: {chunk.to_json()}\n\n"))
emitter.on('complete', lambda: queue.put_nowait("data: [DONE]\n\n"))
async def generate():
with client.messages.stream(
model="claude-opus-4-6",
max_tokens=4096,
messages=[{"role": "user", "content": "Generate a report as JSON..."}],
) as stream:
for text in stream.text_stream:
emitter.write(text)
emitter.flush()
while not queue.empty():
yield await queue.get()
return StreamingResponse(generate(), media_type="text/event-stream")Flask example
from flask import Flask, Response, stream_with_context
from jsoncurrent import Emitter
import anthropic
# Created once at import time; the /stream route below reuses both objects.
app = Flask(__name__)
client = anthropic.Anthropic()
@app.get('/stream')
def stream():
    """Stream Emitter patches to the client as Server-Sent Events."""

    def generate():
        emitter = Emitter()
        # The 'patch' listener only collects chunks; the generator below is
        # what actually yields them to the response.
        patches = []
        emitter.on('patch', patches.append)

        def drain():
            """Yield an SSE frame for every collected patch, then empty the buffer."""
            for chunk in patches:
                yield f"data: {chunk.to_json()}\n\n"
            patches.clear()

        with client.messages.stream(
            model="claude-opus-4-6",
            max_tokens=4096,
            messages=[{"role": "user", "content": "Generate a report as JSON..."}],
        ) as llm_stream:
            for text in llm_stream.text_stream:
                emitter.write(text)
                yield from drain()
        # flush() finalises pending state, which may emit further patches;
        # send those before signalling the end of the stream.
        emitter.flush()
        yield from drain()
        yield "data: [DONE]\n\n"

    return Response(stream_with_context(generate()), mimetype="text/event-stream")