L7 — HTTP server
7.1 The architectural decision
scheduler.step() is a synchronous, GPU-blocking, multi-millisecond function. It launches CUDA kernels and blocks the CPU thread until the sampler's .item() calls sync. asyncio is a single-threaded cooperative event loop. A 30 ms CPU-blocking call freezes the entire server — TCP accepts stall, in-flight responses pause, everything tanks.
| option | what | verdict |
|---|---|---|
| A | Dedicated engine thread, queues across boundary | ✅ recommended for MVP |
| B | run_in_executor(None, scheduler.step) | concurrent calls race the scheduler; basically A done worse |
| C | Async-native step() | production-grade, weeks of work |
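The cost of blocking the loop, and what option A buys, can be measured without a GPU. The sketch below is only an illustration: `fake_step` is a hypothetical stand-in for `scheduler.step()` that sleeps for 30 ms, and a heartbeat task records the longest gap between its own ticks while the steps run either inline on the event loop or on a separate thread.

```python
import asyncio
import threading
import time


def fake_step(duration_s: float = 0.03) -> None:
    """Hypothetical stand-in for scheduler.step(): blocks the calling thread."""
    time.sleep(duration_s)


async def steps_on_event_loop(n: int) -> None:
    for _ in range(n):
        fake_step()              # blocks the whole loop for ~30 ms
        await asyncio.sleep(0)   # yield so other tasks get a turn


async def steps_on_engine_thread(n: int) -> None:
    loop = asyncio.get_running_loop()
    finished = asyncio.Event()

    def engine() -> None:
        for _ in range(n):
            fake_step()          # blocks only this thread
        loop.call_soon_threadsafe(finished.set)

    threading.Thread(target=engine, daemon=True).start()
    await finished.wait()


async def max_heartbeat_gap(run_steps, n_steps: int = 10) -> float:
    """Run the steps while a heartbeat ticks; return the worst gap in seconds."""
    gaps = []
    done = asyncio.Event()

    async def heartbeat() -> None:
        last = time.perf_counter()
        while not done.is_set():
            await asyncio.sleep(0.001)
            now = time.perf_counter()
            gaps.append(now - last)
            last = now

    hb = asyncio.create_task(heartbeat())
    await asyncio.sleep(0)       # let the heartbeat start first
    await run_steps(n_steps)
    done.set()
    await hb
    return max(gaps)


async def main():
    inline = await max_heartbeat_gap(steps_on_event_loop)
    threaded = await max_heartbeat_gap(steps_on_engine_thread)
    return inline, threaded


if __name__ == "__main__":
    inline, threaded = asyncio.run(main())
    print(f"worst stall inline={inline*1e3:.1f} ms, engine thread={threaded*1e3:.1f} ms")
```

With the steps inline, the worst heartbeat gap is at least one step long; with the engine thread it should stay close to the heartbeat interval. Exact numbers depend on the machine.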
7.2 The three-component architecture
Queues:
- pending_q: thread-safe queue.Queue[GenRequest]. HTTP handlers push; the engine drains it every step.
- out_q, per request: asyncio.Queue[str | None]. The engine thread pushes via loop.call_soon_threadsafe; the handler awaits and yields. None = "finished".
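The two queues can be exercised end to end with a toy engine. This is a minimal sketch, not the real Engine: `toy_engine`, `STOP`, and the tuple-shaped job are hypothetical, and "generation" just echoes the prompt word by word. What it does show is the direction of each queue and the None terminator.

```python
import asyncio
import queue
import threading

STOP = object()  # hypothetical shutdown marker for this toy engine


def toy_engine(pending_q: queue.Queue) -> None:
    """Stand-in for the engine thread: 'generates' by echoing the prompt's words."""
    while True:
        job = pending_q.get()          # blocking get is fine off the event loop
        if job is STOP:
            return
        prompt, out_q, loop = job
        for word in prompt.split():
            # Hand each piece to the loop that owns out_q.
            loop.call_soon_threadsafe(out_q.put_nowait, word)
        loop.call_soon_threadsafe(out_q.put_nowait, None)   # None = finished


async def generate(pending_q: queue.Queue, prompt: str) -> list:
    """Handler side: submit a job, then await pieces until the None terminator."""
    out_q: asyncio.Queue = asyncio.Queue()
    pending_q.put_nowait((prompt, out_q, asyncio.get_running_loop()))
    pieces = []
    while (piece := await out_q.get()) is not None:
        pieces.append(piece)
    return pieces


async def main() -> list:
    pending_q: queue.Queue = queue.Queue()
    thread = threading.Thread(target=toy_engine, args=(pending_q,), daemon=True)
    thread.start()
    results = await asyncio.gather(
        generate(pending_q, "a b c"),
        generate(pending_q, "x y"),
    )
    pending_q.put_nowait(STOP)
    thread.join(timeout=1)
    return results


if __name__ == "__main__":
    print(asyncio.run(main()))   # [['a', 'b', 'c'], ['x', 'y']]
```

Each request gets its own out_q, so pieces from concurrent requests never interleave on the handler side even though a single thread produces them.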
7.3 The Engine class
    import asyncio
    import queue
    import threading
    from dataclasses import dataclass

    # Request is the project's own request class (defined elsewhere).

    @dataclass
    class GenRequest:
        req: Request
        out_q: asyncio.Queue              # str pieces; None terminates
        loop: asyncio.AbstractEventLoop   # the loop that owns out_q

    class Engine:
        def __init__(self, model, sampler, scheduler, tokenizer):
            self.scheduler = scheduler
            self.tokenizer = tokenizer
            self.pending_q: queue.Queue[GenRequest] = queue.Queue()
            # Set by stop(). While it stays clear, wait(timeout=...) really sleeps;
            # waiting on an already-set event returns at once and busy-spins.
            self.stopping = threading.Event()
            self.thread = threading.Thread(target=self._run, daemon=True)
            self.in_flight: dict[int, GenRequest] = {}

        def start(self): self.thread.start()
        def stop(self): self.stopping.set(); self.thread.join(timeout=5)
        def submit(self, gen): self.pending_q.put_nowait(gen)

        def _run(self):
            while not self.stopping.is_set():
                # Drain everything the HTTP handlers queued since the last step.
                while True:
                    try: gen = self.pending_q.get_nowait()
                    except queue.Empty: break
                    self.in_flight[gen.req.id] = gen
                    self.scheduler.add_request(gen.req)
                if not self.scheduler.has_unfinished():
                    self.stopping.wait(timeout=0.005)   # short wait, wakes on stop()
                    continue
                res = self.scheduler.step()
                # Stream each new token's text piece back to its handler.
                for rid, tok in res.new_tokens.items():
                    gen = self.in_flight[rid]
                    piece = gen.req.detok.push(tok)
                    if piece:
                        self._send(gen, piece)
                for rid in res.finished:
                    gen = self.in_flight.pop(rid)
                    tail = gen.req.detok.flush()
                    if tail: self._send(gen, tail)
                    self._send(gen, None)               # end of stream

        def _send(self, gen, item):
            gen.loop.call_soon_threadsafe(gen.out_q.put_nowait, item)
Cross-thread async puts: loop.call_soon_threadsafe(q.put_nowait, item) is the safe way to push into an asyncio.Queue from a non-event-loop thread. asyncio.Queue is not thread-safe, so a direct .put_nowait() from another thread can leave the awaiting handler asleep because the loop is never woken. It appears to work most of the time, then hangs at scale.
7.4 The FastAPI app
    import asyncio
    import json
    from contextlib import asynccontextmanager
    from itertools import count

    from fastapi import FastAPI
    from fastapi.responses import StreamingResponse
    from pydantic import BaseModel

    # load_model, load_tokenizer, BlockAllocator, KvPool, Scheduler, Sampler, Request,
    # IncrementalDetokenizer, MODEL_DIR and NUM_SLOTS are project-local names.

    state: dict = {}   # process-wide handles, filled in by lifespan()

    class GenerateBody(BaseModel):
        prompt: str
        max_tokens: int = 64
        temperature: float = 0.0
        top_k: int = 0
        top_p: float = 1.0
        rep_penalty: float = 1.0
        stream: bool = True

    @asynccontextmanager
    async def lifespan(app):
        # Startup: load the model once and start the engine thread.
        model = load_model(MODEL_DIR); model.eval()
        cfg = model.cfg
        tok = load_tokenizer(MODEL_DIR)
        alloc = BlockAllocator(NUM_SLOTS)
        pool = KvPool(cfg.num_hidden_layers, alloc.num_blocks, alloc.block_size,
                      NUM_SLOTS, cfg.num_key_value_heads, cfg.head_dim,
                      dtype=cfg.dtype, device="cuda")
        sched = Scheduler(model, Sampler(), alloc, pool, eos_id=cfg.eos_token_id)
        engine = Engine(model, Sampler(), sched, tok); engine.start()
        state.update(engine=engine, tokenizer=tok, next_id=count())
        yield
        # Shutdown: stop the engine thread.
        engine.stop()

    app = FastAPI(lifespan=lifespan)

    @app.post("/generate")
    async def generate(body: GenerateBody):
        engine = state["engine"]; tok = state["tokenizer"]
        rid = next(state["next_id"])
        prompt_ids = tok(body.prompt, return_tensors="pt").input_ids[0].cuda()
        detok = IncrementalDetokenizer(tok, prompt_ids)
        req = Request(id=rid, prompt_ids=prompt_ids,
                      sampling_param=dict(temperature=body.temperature, top_k=body.top_k,
                                          top_p=body.top_p, rep_penalty=body.rep_penalty),
                      max_tokens=body.max_tokens, detok=detok)
        out_q = asyncio.Queue()
        engine.submit(GenRequest(req=req, out_q=out_q, loop=asyncio.get_running_loop()))

        async def event_stream():
            try:
                while True:
                    piece = await out_q.get()
                    if piece is None:
                        yield "data: [DONE]\n\n"; break
                    yield f"data: {json.dumps({'text': piece})}\n\n"
            except asyncio.CancelledError:
                engine.cancel(rid)   # tell engine to drop and free KV
                raise

        return StreamingResponse(event_stream(), media_type="text/event-stream")

    @app.get("/health")
    async def health():
        return {"status": "ok"}
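The `except asyncio.CancelledError` branch in `event_stream` relies on cancellation being delivered at the generator's `await out_q.get()`. The sketch below reproduces that in isolation, assuming the framework cancels the task that iterates the generator; `on_cancel` is a hypothetical callback standing in for `engine.cancel(rid)`.

```python
import asyncio


async def event_stream(out_q: asyncio.Queue, on_cancel):
    """Same shape as the handler's generator, minus the SSE framing."""
    try:
        while True:
            piece = await out_q.get()
            if piece is None:
                break
            yield piece
    except asyncio.CancelledError:
        on_cancel()   # stand-in for engine.cancel(rid)
        raise         # always re-raise so the task really ends as cancelled


async def consume(out_q: asyncio.Queue, on_cancel, received: list) -> None:
    async for piece in event_stream(out_q, on_cancel):
        received.append(piece)


async def main():
    out_q: asyncio.Queue = asyncio.Queue()
    received, cancelled = [], []
    task = asyncio.create_task(
        consume(out_q, lambda: cancelled.append(True), received)
    )
    out_q.put_nowait("Paris")
    await asyncio.sleep(0.01)   # let the consumer take the first piece
    task.cancel()               # simulates the client going away mid-stream
    try:
        await task
    except asyncio.CancelledError:
        pass
    return received, cancelled


if __name__ == "__main__":
    print(asyncio.run(main()))   # (['Paris'], [True])
```

The piece delivered before the disconnect is kept, and the cleanup hook runs exactly once before the cancellation propagates.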
Boot
uvicorn mini_sglang.server:app --host 0.0.0.0 --port 8000 --workers 1
Always --workers 1. Each worker would load its own 16 GiB model copy → OOM. Multi-worker scaling is a separate process per GPU + shared scheduler, beyond MVP scope.
7.5 The SSE protocol
Two formats in the wild; we use SSE for OpenAI compatibility:
data: {"text": "Paris"}\n\n
data: {"text": "."}\n\n
data: {"text": " The"}\n\n
...
data: [DONE]\n\n
Clients parse the stream with fetch + ReadableStream in browsers, or with httpx.AsyncClient.stream + aiter_lines in Python. The data: [DONE]\n\n terminator is an OpenAI convention that many clients expect.
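Both directions of this framing fit in a few lines. The helpers below are a sketch of the format shown above, not code from the server: `sse_event` and `parse_sse_lines` are hypothetical names. Because the payload goes through `json.dumps`, newlines inside a text piece are escaped and each event stays on a single `data:` line.

```python
import json
from typing import Iterable, Iterator, Optional

DONE = "[DONE]"


def sse_event(text: Optional[str]) -> str:
    """Frame one text piece as an SSE event; None becomes the [DONE] terminator."""
    if text is None:
        return f"data: {DONE}\n\n"
    return f"data: {json.dumps({'text': text})}\n\n"


def parse_sse_lines(lines: Iterable[str]) -> Iterator[str]:
    """Yield text pieces from already-split SSE lines, stopping at [DONE]."""
    for line in lines:
        if not line.startswith("data: "):
            continue                      # blank separators and other fields
        payload = line[len("data: "):]
        if payload == DONE:
            return
        yield json.loads(payload)["text"]


if __name__ == "__main__":
    wire = "".join(sse_event(p) for p in ["Paris", ".", " The", None])
    print("".join(parse_sse_lines(wire.splitlines())))   # Paris. The
```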
7.6 Five things to verify
- Hello-world streaming: curl -N -X POST http://localhost:8000/generate -H 'Content-Type: application/json' -d '{"prompt":"The capital of France is","max_tokens":20}'. Tokens stream in as the model decodes, not as one blob. The Content-Type header is needed because curl -d otherwise sends form encoding, which current FastAPI versions reject for a JSON body model.
- Two concurrent requests share the scheduler: fire two curls in parallel; both complete in roughly the same time as a single request, not 2× that. Server logs should show mixed-batch steps.
- Disconnect mid-stream: kill a curl with Ctrl-C. The server doesn't crash, and the KV blocks are freed (verified by VRAM print).
- /health independent of model: even while scheduler.step() is mid-flight, GET /health returns in <10 ms. This is the test that the event loop isn't blocked. If it takes 30 ms, you've put the scheduler on the event-loop thread.
- Memory stable across requests: 100 sequential requests don't grow VRAM monotonically. KV blocks are freed when requests finish.
7.7 Cancellation + KV cleanup
When the client disconnects, FastAPI raises asyncio.CancelledError. Catch it and notify the engine:
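    # in event_stream():
    except asyncio.CancelledError:
        engine.cancel(rid)
        raise

    # in Engine.__init__:
    self.cancelled: set[int] = set()

    # in Engine:
    def cancel(self, rid):
        self.cancelled.add(rid)          # called from the event-loop thread

    # in _run, after step():
    for rid in list(self.cancelled):     # snapshot; cancel() may add concurrently
        self.scheduler.finish(rid)       # removes from running/waiting, frees KV
        self.in_flight.pop(rid, None)
        self.cancelled.discard(rid)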
Without this, generation keeps running for orphaned requests. Wasted GPU and leaked KV.
And in the scheduler's writeback path, when a request becomes finished:
    self.alloc.free(req.blocks)
    req.blocks.clear()
    req.slot_indices.clear()
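The cancel-then-free path can be checked without a model. The sketch below uses toy stand-ins, all hypothetical: `ToyAllocator` for the block allocator, `ToyScheduler.finish` for the scheduler's cleanup, and `CancelBox` for the cancelled set. It takes a lock around the set as a conservative choice rather than relying on the interpreter making set operations atomic.

```python
import threading


class ToyAllocator:
    """Hypothetical fixed-size block pool standing in for the KV allocator."""

    def __init__(self, num_blocks: int):
        self.free_blocks = list(range(num_blocks))

    def alloc(self, n: int) -> list:
        if n > len(self.free_blocks):
            raise MemoryError("out of KV blocks")
        taken, self.free_blocks = self.free_blocks[:n], self.free_blocks[n:]
        return taken

    def free(self, blocks: list) -> None:
        self.free_blocks.extend(blocks)


class ToyScheduler:
    def __init__(self, alloc: ToyAllocator):
        self.alloc = alloc
        self.running = {}                       # rid -> list of block ids

    def add_request(self, rid: int, n_blocks: int) -> None:
        self.running[rid] = self.alloc.alloc(n_blocks)

    def finish(self, rid: int) -> None:
        blocks = self.running.pop(rid, None)    # tolerate already-finished ids
        if blocks:
            self.alloc.free(blocks)
            blocks.clear()


class CancelBox:
    """Event-loop thread calls cancel(); engine thread calls drain() between steps."""

    def __init__(self):
        self._lock = threading.Lock()
        self._ids = set()

    def cancel(self, rid: int) -> None:
        with self._lock:
            self._ids.add(rid)

    def drain(self) -> set:
        with self._lock:
            ids, self._ids = self._ids, set()
        return ids


if __name__ == "__main__":
    alloc = ToyAllocator(8)
    sched = ToyScheduler(alloc)
    box = CancelBox()
    sched.add_request(1, 3)
    sched.add_request(2, 4)
    box.cancel(1)
    for rid in box.drain():
        sched.finish(rid)
    print(len(alloc.free_blocks))   # 4: one block never used plus three freed
```

Making `finish` tolerate unknown ids keeps a late cancel, one that arrives after the request already completed, from raising in the engine thread.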
7.8 Pitfalls (table of eight)
| pitfall | symptom |
|---|---|
| direct .put_nowait() into asyncio.Queue from other thread | works 95% of the time, deadlocks at scale |
| uvicorn --workers N with N>1 | OOM (N × 16 GiB) |
| forget to free KV on request finish | gradual OOM after ~100 requests |
| run step() in executor without lock | concurrent KV writes → garbage |
| busy-spin scheduler loop when idle | one CPU core pegged at 100% |
| time.sleep in engine thread | shutdown takes up to that duration |
| print() in handler | blocks event loop on disk I/O; use logging |
| return token IDs not text | client re-implements detokenizer; defeats L6 |
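The busy-spin and time.sleep rows both come down to how the idle wait is written. A minimal sketch, with the hypothetical helpers `idle_loop` and `run_idle_for`: waiting on a stop event that stays clear while the engine runs sleeps for the poll interval and returns early the moment stop is requested. Note that `threading.Event.wait` returns immediately when the flag is already set, so waiting on an "alive" flag that is set during normal operation would spin.

```python
import threading
import time


def idle_loop(stop: threading.Event, poll_s: float = 0.005) -> int:
    """Idle until `stop` is set; return how many times the loop woke up."""
    wakeups = 0
    while not stop.is_set():
        stop.wait(timeout=poll_s)   # sleeps up to poll_s, returns early on stop.set()
        wakeups += 1
    return wakeups


def run_idle_for(duration_s: float, poll_s: float):
    """Run idle_loop on a thread, stop it after duration_s, and time the shutdown."""
    stop = threading.Event()
    result = {}

    def target() -> None:
        result["wakeups"] = idle_loop(stop, poll_s)

    thread = threading.Thread(target=target)
    thread.start()
    time.sleep(duration_s)
    t0 = time.perf_counter()
    stop.set()
    thread.join()
    return result["wakeups"], time.perf_counter() - t0


if __name__ == "__main__":
    wakeups, shutdown_s = run_idle_for(duration_s=0.1, poll_s=1.0)
    print(f"poll=1s: wakeups={wakeups}, shutdown took {shutdown_s*1e3:.1f} ms")
    wakeups, shutdown_s = run_idle_for(duration_s=0.1, poll_s=0.005)
    print(f"poll=5ms: wakeups={wakeups}, shutdown took {shutdown_s*1e3:.1f} ms")
```

Even with a one-second poll interval, shutdown does not wait out the interval, which is the property time.sleep lacks. The 5 ms interval bounds the wake-up rate when idle instead of pegging a core.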
7.9 The smoke test
    # scripts/l7_smoke.py: 3 concurrent requests
    import asyncio
    import json
    import time

    import httpx

    URL = "http://localhost:8000/generate"

    async def one(prompt, idx):
        async with httpx.AsyncClient(timeout=120) as c:
            async with c.stream("POST", URL, json={"prompt": prompt, "max_tokens": 30}) as r:
                buf = ""; t0 = time.time(); ttft = None
                async for line in r.aiter_lines():
                    if not line.startswith("data: "): continue
                    if line == "data: [DONE]": break
                    ttft = ttft or (time.time() - t0)   # time to first token
                    buf += json.loads(line[6:])["text"]
        ttft_s = f"{ttft:.2f}s" if ttft is not None else "n/a"   # no pieces received
        print(f"[{idx}] TTFT={ttft_s} total={time.time()-t0:.2f}s")
        return buf

    async def main():
        # await is only valid inside a coroutine, so gather runs under asyncio.run().
        results = await asyncio.gather(
            one("The capital of France is", 0),
            one("Once upon a time", 1),
            one("Python is", 2),
        )
        assert all(results)
        print("✅ L7 PASS")

    asyncio.run(main())
7.10 Acceptance
L7 pass criteria
- uvicorn mini_sglang.server:app boots, model loaded once.
- curl localhost:8000/health returns instantly even during generation.
- curl -N localhost:8000/generate -d '{...}' streams text fragments.
- Two simultaneous curls produce two valid streams, shared by the scheduler.
- scripts/l7_smoke.py passes.
- Repeated requests don't grow VRAM.
After L7 lands you have a real product. L8 (radix prefix cache) and L9 (CUDA graphs) are throughput/latency optimisations on top.