Emitter

The Python Emitter parses raw LLM token output and emits StreamingChunk patch operations. It is designed for FastAPI, Flask, and Django backends. Transport is out of scope: serialise each patch with chunk.to_json() and send it however your stack requires.

Import

from jsoncurrent import Emitter

Constructor

emitter = Emitter(root: str = "", completions: bool = True)

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| root | str | '' | Namespace prefix for all emitted paths. E.g. 'prediction' produces paths like prediction.title. |
| completions | bool | True | Emit complete patches when a value is fully assembled. Set False on high-throughput routes where pathcomplete events are not used. |

Methods

emitter.write(token)

Feed a raw token from the LLM stream.

for text in stream.text_stream:
    emitter.write(text)

| Param | Type | Description |
| --- | --- | --- |
| token | str | Raw token string from the LLM response stream. |

emitter.flush()

Signal end of stream. Finalises any pending state, emits complete, and resets internal state. Call this when the LLM stream closes.

emitter.flush()

emitter.reset()

Reset internal state without emitting complete. Preserves event listeners and middleware.

emitter.use(fn)

Register a middleware function. Returns self for chaining.

emitter.use(my_middleware)

emitter.on(event, fn)

Register an event listener. Returns self for chaining.

emitter.on('patch', lambda chunk: print(chunk.to_json()))

emitter.off(event, fn)

Remove an event listener.
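
For unit-testing handlers without a live LLM stream, a small stand-in that mimics the listener behaviour described above can be handy. The sketch below is not part of jsoncurrent: StubEmitter is a hypothetical name, and its write() simply echoes each token as a patch instead of parsing JSON. It only models the documented contract: on() returns self, off() removes a listener, flush() fires complete, and reset() keeps listeners without firing complete.

```python
from collections import defaultdict


class StubEmitter:
    """Hypothetical test double; not the jsoncurrent implementation."""

    def __init__(self):
        self._listeners = defaultdict(list)
        self._buffer = ""  # stands in for the real parser state

    def on(self, event, fn):
        self._listeners[event].append(fn)
        return self  # allows chaining, as documented for on()

    def off(self, event, fn):
        if fn in self._listeners[event]:
            self._listeners[event].remove(fn)

    def write(self, token):
        self._buffer += token
        # Assumption: one "patch" per token; the real Emitter parses JSON.
        for fn in list(self._listeners["patch"]):
            fn(token)

    def flush(self):
        for fn in list(self._listeners["complete"]):
            fn()
        self._buffer = ""

    def reset(self):
        self._buffer = ""  # listeners are kept, "complete" is not fired


events = []


def record_patch(token):
    events.append(f"patch:{token}")


def record_done():
    events.append("complete")


stub = StubEmitter().on("patch", record_patch).on("complete", record_done)
stub.write('{"a"')
stub.reset()                     # listeners survive the reset
stub.write(": 1}")
stub.flush()                     # fires "complete"
stub.off("patch", record_patch)
stub.write("ignored")            # no listener left for "patch"
```

Swapping the stub for a real Emitter in application code should only require changing the constructor call, provided the handlers rely on nothing beyond the methods listed above.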

Events

patch

Fires for each StreamingChunk emitted. Serialise with chunk.to_json().

def on_patch(chunk):
    response.write(f"data: {chunk.to_json()}\n\n")

emitter.on('patch', on_patch)
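
The handler above writes each patch as a Server-Sent Events data frame. If a payload could ever contain a newline, every line needs its own data: field, and a blank line ends the frame. A minimal sketch of such a framing helper follows; sse_frame is a hypothetical name, not part of jsoncurrent, and the JSON string is a placeholder rather than a real StreamingChunk payload.

```python
from typing import Optional


def sse_frame(payload: str, event: Optional[str] = None) -> str:
    """Format a string as one Server-Sent Events frame."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    # Each line of the payload gets its own "data:" field.
    for line in payload.split("\n"):
        lines.append(f"data: {line}")
    # A blank line terminates the frame.
    return "\n".join(lines) + "\n\n"


single = sse_frame('{"k": 1}')              # placeholder payload
multi = sse_frame("line one\nline two")
named = sse_frame("[DONE]", event="complete")
```

With a helper like this, a patch listener could call response.write(sse_frame(chunk.to_json())) instead of building the frame by hand.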

complete

Fires when flush() is called.

emitter.on('complete', lambda: response.write("data: [DONE]\n\n"))

error

Fires on parse errors.

emitter.on('error', lambda err: logger.error(err))

FastAPI example

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from jsoncurrent import Emitter
import anthropic
import asyncio

app = FastAPI()
# Async client, so waiting on the model does not block the event loop
client = anthropic.AsyncAnthropic()

@app.get('/stream')
async def stream():
    queue = asyncio.Queue()
    emitter = Emitter()

    emitter.on('patch', lambda chunk: queue.put_nowait(f"data: {chunk.to_json()}\n\n"))
    emitter.on('complete', lambda: queue.put_nowait("data: [DONE]\n\n"))

    async def generate():
        async with client.messages.stream(
            model="claude-opus-4-6",
            max_tokens=4096,
            messages=[{"role": "user", "content": "Generate a report as JSON..."}],
        ) as stream:
            async for text in stream.text_stream:
                emitter.write(text)
                # Forward patches as they are produced, not after the stream ends
                while not queue.empty():
                    yield queue.get_nowait()

        emitter.flush()
        # Send anything queued during flush(), including the [DONE] marker
        while not queue.empty():
            yield queue.get_nowait()

    return StreamingResponse(generate(), media_type="text/event-stream")

Flask example

from flask import Flask, Response, stream_with_context
from jsoncurrent import Emitter
import anthropic

app = Flask(__name__)
client = anthropic.Anthropic()

@app.get('/stream')
def stream():
    def generate():
        emitter = Emitter()
        patches = []
        emitter.on('patch', patches.append)

        with client.messages.stream(
            model="claude-opus-4-6",
            max_tokens=4096,
            messages=[{"role": "user", "content": "Generate a report as JSON..."}],
        ) as stream:
            for text in stream.text_stream:
                emitter.write(text)
                # Forward the patches produced by this token
                for chunk in patches:
                    yield f"data: {chunk.to_json()}\n\n"
                patches.clear()

        emitter.flush()
        # flush() finalises pending state, so forward any patches it may have produced
        for chunk in patches:
            yield f"data: {chunk.to_json()}\n\n"
        yield "data: [DONE]\n\n"

    return Response(stream_with_context(generate()), mimetype="text/event-stream")
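
The Flask example interleaves two steps: write a token, then forward whatever patches that token produced. The same loop can be factored into a framework-independent generator. The sketch below assumes only the write, flush, and on('patch', ...) methods documented on this page; stream_frames, its serialise parameter, and EchoEmitter are hypothetical names introduced for illustration, and EchoEmitter exists only so the sketch runs without jsoncurrent or an LLM.

```python
from typing import Callable, Iterable, Iterator


def stream_frames(
    tokens: Iterable[str],
    emitter,
    serialise: Callable[[object], str],
) -> Iterator[str]:
    """Yield one SSE frame per patch, then a final [DONE] frame."""
    pending = []
    emitter.on("patch", pending.append)

    def drain() -> Iterator[str]:
        # Forward patches in the order they were emitted.
        while pending:
            yield f"data: {serialise(pending.pop(0))}\n\n"

    for token in tokens:
        emitter.write(token)
        yield from drain()
    emitter.flush()
    yield from drain()  # flush() may finalise pending state
    yield "data: [DONE]\n\n"


class EchoEmitter:
    """Hypothetical stand-in: echoes tokens as patches, emits one on flush."""

    def __init__(self):
        self._on_patch = []

    def on(self, event, fn):
        if event == "patch":
            self._on_patch.append(fn)
        return self

    def write(self, token):
        for fn in self._on_patch:
            fn(token)

    def flush(self):
        for fn in self._on_patch:
            fn("<final>")


frames = list(stream_frames(["a", "b"], EchoEmitter(), str))
```

With a real Emitter, serialise would be something like lambda chunk: chunk.to_json(), and the resulting generator could be handed to either framework's streaming response.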