mini_sglang

L7 — HTTP server

FastAPI + SSE streaming. Where mini_sglang stops being a script and starts being a product.

7.1 The architectural decision

scheduler.step() is a synchronous, GPU-blocking, multi-millisecond function: it launches CUDA kernels, then blocks the CPU thread when the sampler's .item() calls force a device sync. asyncio is a single-threaded cooperative event loop, so a 30 ms blocking call on the loop thread freezes the entire server for those 30 ms: TCP accepts stall, in-flight streams pause, and even /health stops answering.
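To see the failure mode without a GPU, here is a stdlib-only sketch in which time.sleep(0.03) stands in for a 30 ms scheduler.step() (an assumption, not a measurement). A heartbeat coroutine that should tick every 5 ms stalls for the whole call when the call runs on the loop thread, and keeps ticking when the same call is pushed onto a worker thread with asyncio.to_thread. fake_step, heartbeat and run are illustrative names.

```python
import asyncio
import time

STEP_S = 0.03  # assumed stand-in for one 30 ms scheduler.step()

def fake_step() -> None:
    time.sleep(STEP_S)  # blocks the calling thread, like a GPU sync via .item()

async def heartbeat(gaps: list[float], stop: asyncio.Event) -> None:
    """Tick every ~5 ms and record the wall-clock gap between ticks."""
    last = time.perf_counter()
    while not stop.is_set():
        await asyncio.sleep(0.005)
        now = time.perf_counter()
        gaps.append(now - last)
        last = now

async def run(offload: bool) -> float:
    """Run three fake steps and return the worst heartbeat gap in seconds."""
    gaps: list[float] = []
    stop = asyncio.Event()
    hb = asyncio.create_task(heartbeat(gaps, stop))
    await asyncio.sleep(0.02)                    # let the heartbeat settle
    for _ in range(3):
        if offload:
            await asyncio.to_thread(fake_step)   # loop stays free while a worker blocks
        else:
            fake_step()                          # loop thread blocked: nothing else runs
            await asyncio.sleep(0)               # yield so the heartbeat can record the stall
    stop.set()
    await hb
    return max(gaps)

if __name__ == "__main__":
    print(f"blocking on loop:    worst gap {asyncio.run(run(False)) * 1000:.1f} ms")
    print(f"offloaded to thread: worst gap {asyncio.run(run(True)) * 1000:.1f} ms")
```

In the blocking case the worst gap is at least the full 30 ms, because no tick can run while fake_step holds the loop thread. Calling to_thread once per request is fine for this demo, but in the server it is exactly option B's race; option A keeps the offload idea with a single dedicated thread.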

option | what                                                  | verdict
A      | Dedicated engine thread, queues across the boundary  | recommended for MVP
B      | run_in_executor(None, scheduler.step) per request     | concurrent calls race the scheduler; basically A done worse
C      | Async-native step()                                   | production-grade, weeks of work

7.2 The three-component architecture

┌──────────────────────────────────────────────────────────────┐
│ FastAPI process                                              │
│                                                              │
│ ┌────────────────────────┐    ┌────────────────────────────┐ │
│ │ asyncio event loop     │    │ Engine thread              │ │
│ │ ────────────────────── │    │ ────────────────────────── │ │
│ │ POST /generate handler │    │ while alive:               │ │
│ │   → make Request       │    │   drain pending_q          │ │
│ │   → put in pending_q   │←──→│   res = scheduler.step()   │ │
│ │   → await tokens from  │    │   for each (rid, tok):     │ │
│ │     this_req's queue   │    │     out_qs[rid].put(piece) │ │
│ │   → yield SSE chunks   │    │   for each finished rid:   │ │
│ │                        │    │     out_qs[rid].put(None)  │ │
│ └────────────────────────┘    └────────────────────────────┘ │
│                                                              │
└──────────────────────────────────────────────────────────────┘

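Queues:

  1. pending_q: one thread-safe queue.Queue shared by every handler. The event loop puts GenRequests in; the engine thread drains it at the top of each iteration.
  2. out_q: one asyncio.Queue per request. The engine thread pushes decoded text pieces, then a final None, through loop.call_soon_threadsafe; the handler awaits them and turns them into SSE chunks.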

7.3 The Engine class

import asyncio
import queue
import threading
from dataclasses import dataclass

# Request (carrying .id and .detok) and Scheduler come from earlier lessons.

@dataclass
class GenRequest:
    req:   Request
    out_q: asyncio.Queue                  # str pieces; None terminates
    loop:  asyncio.AbstractEventLoop      # the loop that owns out_q


class Engine:
    def __init__(self, model, sampler, scheduler, tokenizer):
        # model/sampler are unused here: the scheduler was built with them already.
        self.scheduler  = scheduler
        self.tokenizer  = tokenizer
        self.pending_q: queue.Queue[GenRequest] = queue.Queue()   # loop thread → engine thread
        self.stopping   = threading.Event()                       # set by stop()
        self.thread     = threading.Thread(target=self._run, daemon=True)
        self.in_flight: dict[int, GenRequest] = {}                # engine thread only

    def start(self):                self.thread.start()
    def stop (self):                self.stopping.set(); self.thread.join(timeout=5)
    def submit(self, gen):          self.pending_q.put_nowait(gen)   # safe from any thread

    def _run(self):
        while not self.stopping.is_set():
            # Admit everything that arrived since the last iteration.
            while True:
                try: gen = self.pending_q.get_nowait()
                except queue.Empty: break
                self.in_flight[gen.req.id] = gen
                self.scheduler.add_request(gen.req)

            if not self.scheduler.has_unfinished():
                # Sleep up to 5 ms, waking at once if stop() fires. Waiting on an Event that
                # is already set returns immediately, so idling on a "still alive" flag busy-spins.
                self.stopping.wait(timeout=0.005)
                continue

            res = self.scheduler.step()                # blocks this thread, not the loop
            for rid, tok in res.new_tokens.items():
                gen   = self.in_flight[rid]
                piece = gen.req.detok.push(tok)
                if piece:
                    self._send(gen, piece)
            for rid in res.finished:
                gen  = self.in_flight.pop(rid)
                tail = gen.req.detok.flush()
                if tail: self._send(gen, tail)
                self._send(gen, None)              # end-of-stream sentinel

    def _send(self, gen, item):
        # Schedule the put on the loop that owns out_q; never touch the queue from this thread.
        gen.loop.call_soon_threadsafe(gen.out_q.put_nowait, item)
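Cross-thread async puts. loop.call_soon_threadsafe(q.put_nowait, item) is the standard safe way to push into an asyncio.Queue from a thread that isn't running its event loop. asyncio.Queue is not thread-safe: calling .put_nowait() directly from another thread races on the queue's internals and can resolve the waiting handler's future without waking the loop, so it tends to pass light testing and then stall or corrupt streams under load.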
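The whole handoff can be exercised without a GPU. This sketch keeps the shape of Engine._run but swaps in a hypothetical FakeScheduler that "decodes" one word per request per step; StepResult, MiniEngine and collect are illustrative names, not mini_sglang APIs. The only outbound cross-thread touch is the call_soon_threadsafe call in _send.

```python
import asyncio
import queue
import threading
from dataclasses import dataclass, field

@dataclass
class StepResult:                        # hypothetical: mirrors res.new_tokens / res.finished
    new_tokens: dict[int, str] = field(default_factory=dict)
    finished: list[int] = field(default_factory=list)

class FakeScheduler:
    """Hypothetical stand-in: each running request emits one word per step."""
    def __init__(self):
        self.running: dict[int, list[str]] = {}
    def add_request(self, rid: int, text: str) -> None:
        self.running[rid] = text.split()
    def has_unfinished(self) -> bool:
        return bool(self.running)
    def step(self) -> StepResult:
        res = StepResult()
        for rid, words in list(self.running.items()):   # every request shares this "batch"
            res.new_tokens[rid] = words.pop(0)
            if not words:
                del self.running[rid]
                res.finished.append(rid)
        return res

class MiniEngine:
    def __init__(self):
        self.sched = FakeScheduler()
        self.pending: queue.Queue = queue.Queue()        # any thread → engine thread
        self.outs: dict[int, tuple[asyncio.AbstractEventLoop, asyncio.Queue]] = {}
        self.stopping = threading.Event()
        self.thread = threading.Thread(target=self._run, daemon=True)

    def submit(self, rid, text, loop, out_q):
        self.pending.put((rid, text, loop, out_q))

    def _send(self, rid, item):
        loop, out_q = self.outs[rid]
        loop.call_soon_threadsafe(out_q.put_nowait, item)

    def _run(self):
        while not self.stopping.is_set():
            while True:
                try: rid, text, loop, out_q = self.pending.get_nowait()
                except queue.Empty: break
                self.outs[rid] = (loop, out_q)
                self.sched.add_request(rid, text)
            if not self.sched.has_unfinished():
                self.stopping.wait(0.005)                # idle without spinning
                continue
            res = self.sched.step()
            for rid, piece in res.new_tokens.items():
                self._send(rid, piece)
            for rid in res.finished:
                self._send(rid, None)                    # end-of-stream sentinel
                del self.outs[rid]

async def collect(engine: MiniEngine, rid: int, text: str) -> str:
    out_q: asyncio.Queue = asyncio.Queue()               # one queue per request
    engine.submit(rid, text, asyncio.get_running_loop(), out_q)
    pieces = []
    while (piece := await out_q.get()) is not None:
        pieces.append(piece)
    return " ".join(pieces)

async def main() -> list[str]:
    engine = MiniEngine()
    engine.thread.start()
    try:
        return await asyncio.gather(collect(engine, 0, "the capital of France"),
                                    collect(engine, 1, "once upon a time there was"))
    finally:
        engine.stopping.set()
        engine.thread.join(timeout=5)

if __name__ == "__main__":
    print(asyncio.run(main()))   # ['the capital of France', 'once upon a time there was']
```

Both requests advance in the same fake steps, yet each coroutine only ever awaits its own asyncio.Queue: the same isolation the real out_q gives each HTTP handler.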

7.4 The FastAPI app

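import asyncio
import json
from contextlib import asynccontextmanager
from itertools import count

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

# load_model, load_tokenizer, BlockAllocator, KvPool, Scheduler, Sampler, Request,
# IncrementalDetokenizer, MODEL_DIR and NUM_SLOTS come from earlier lessons;
# GenRequest and Engine are the classes from 7.3.

state: dict = {}    # process-wide singletons, filled in by lifespan()

class GenerateBody(BaseModel):
    prompt:      str
    max_tokens:  int   = 64
    temperature: float = 0.0
    top_k:       int   = 0
    top_p:       float = 1.0
    rep_penalty: float = 1.0
    stream:      bool  = True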

@asynccontextmanager
async def lifespan(app):
    model = load_model(MODEL_DIR); model.eval()
    cfg   = model.cfg
    tok   = load_tokenizer(MODEL_DIR)
    alloc = BlockAllocator(NUM_SLOTS)
    pool  = KvPool(cfg.num_hidden_layers, alloc.num_blocks, alloc.block_size,
                   NUM_SLOTS, cfg.num_key_value_heads, cfg.head_dim,
                   dtype=cfg.dtype, device="cuda")
    sampler = Sampler()                     # build once, share it
    sched  = Scheduler(model, sampler, alloc, pool, eos_id=cfg.eos_token_id)
    engine = Engine(model, sampler, sched, tok); engine.start()
    state.update(engine=engine, tokenizer=tok, next_id=count())
    yield
    engine.stop()

app = FastAPI(lifespan=lifespan)

@app.post("/generate")
async def generate(body: GenerateBody):
    engine = state["engine"]; tok = state["tokenizer"]
    rid    = next(state["next_id"])
    prompt_ids = tok(body.prompt, return_tensors="pt").input_ids[0].cuda()
    detok = IncrementalDetokenizer(tok, prompt_ids)
    req   = Request(id=rid, prompt_ids=prompt_ids,
                    sampling_param=dict(temperature=body.temperature, top_k=body.top_k,
                                        top_p=body.top_p, rep_penalty=body.rep_penalty),
                    max_tokens=body.max_tokens, detok=detok)

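    out_q = asyncio.Queue()
    engine.submit(GenRequest(req=req, out_q=out_q, loop=asyncio.get_running_loop()))

    if not body.stream:                        # stream=False: buffer, return one JSON body
        pieces = []
        try:
            while (piece := await out_q.get()) is not None:
                pieces.append(piece)
        except asyncio.CancelledError:
            engine.cancel(rid)                 # handler cancelled (e.g. shutdown): free KV
            raise
        return {"text": "".join(pieces)}

    async def event_stream():
        try:
            while (piece := await out_q.get()) is not None:
                yield f"data: {json.dumps({'text': piece})}\n\n"
        except (asyncio.CancelledError, GeneratorExit):
            # Client went away mid-stream. Depending on the Starlette version and ASGI server,
            # this arrives as CancelledError at the pending await or GeneratorExit at a yield.
            engine.cancel(rid)                 # tell engine to drop the request and free KV
            raise
        yield "data: [DONE]\n\n"

    return StreamingResponse(event_stream(), media_type="text/event-stream")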

@app.get("/health")
async def health():
    return {"status": "ok"}

Boot

uvicorn mini_sglang.server:app --host 0.0.0.0 --port 8000 --workers 1

Always --workers 1. Each worker would load its own 16 GiB model copy → OOM. Multi-worker scaling is a separate process per GPU + shared scheduler, beyond MVP scope.

7.5 The SSE protocol

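Two formats in the wild; we use SSE framing because it is what the OpenAI API streams, so existing clients already parse it. The JSON payload itself is deliberately simpler than OpenAI's choices[...] schema: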

data: {"text": "Paris"}\n\n
data: {"text": "."}\n\n
data: {"text": " The"}\n\n
...
data: [DONE]\n\n

Clients parse it with fetch + ReadableStream in browsers (EventSource only issues GET requests, and this endpoint is a POST) or with httpx.AsyncClient.stream + aiter_lines in Python. The data: [DONE]\n\n terminator is an OpenAI convention, and many clients expect it.
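A small stdlib sketch of both ends of this framing: sse_event mirrors the f-string in event_stream, and parse_sse_lines mirrors the client loop in the smoke test (keep lines starting with "data: ", stop at [DONE]). The helper names are hypothetical. Full SSE also allows multi-line data fields, event:/id: lines and comments; this simplified parser just skips anything that isn't a single data line.

```python
import json
from typing import Iterable, Iterator

DONE = "[DONE]"

def sse_event(payload: dict) -> str:
    """One SSE event: a single data line followed by a blank line."""
    return f"data: {json.dumps(payload)}\n\n"

def sse_done() -> str:
    return f"data: {DONE}\n\n"

def parse_sse_lines(lines: Iterable[str]) -> Iterator[str]:
    """Yield the 'text' field of each event until the [DONE] sentinel.

    Simplified: assumes one data line per event and ignores every other field.
    """
    for line in lines:
        if not line.startswith("data: "):
            continue                      # blank separators, comments, other fields
        data = line[len("data: "):]
        if data == DONE:
            return
        yield json.loads(data)["text"]

if __name__ == "__main__":
    wire = sse_event({"text": "Paris"}) + sse_event({"text": "."}) + sse_done()
    print(repr("".join(parse_sse_lines(wire.splitlines()))))   # 'Paris.'
```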

7.6 Five things to verify

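  1. Hello-world streaming: curl -N -X POST http://localhost:8000/generate -H 'Content-Type: application/json' -d '{"prompt":"The capital of France is","max_tokens":20}'. Tokens stream in as the model decodes, not as one blob. (Keep the header: without it curl -d sends a form-encoded body and FastAPI answers 422.)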
  2. Two concurrent requests share the scheduler: fire two curls in parallel; both complete in roughly the same time, not 2× single. Server logs should show mixed-batch steps.
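  3. Disconnect mid-stream: kill a curl with Ctrl-C. The server doesn't crash, and the request's KV blocks go back to the allocator. Check the allocator's free-block count rather than a VRAM print: KvPool is allocated up front, so VRAM doesn't move when blocks are freed.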
  4. /health independent of model: even while scheduler.step() is mid-flight, GET /health returns in <10ms. This is the test that the event loop isn't blocked. If it takes 30 ms, you've put the scheduler on the event-loop thread.
  5. Memory stable across requests: 100 sequential requests don't grow VRAM monotonically, and afterwards the allocator's free-block count is back at its idle value. KV blocks are freed when requests finish.

7.7 Cancellation + KV cleanup

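When the client disconnects, Starlette (underneath FastAPI) stops driving the stream generator. Depending on the Starlette version and ASGI server, this surfaces either as asyncio.CancelledError at the generator's pending await or as GeneratorExit when the abandoned generator is closed. Catch both and notify the engine:

except (asyncio.CancelledError, GeneratorExit):
    engine.cancel(rid)
    raise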

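# in Engine.__init__:
self.cancelled: set[int] = set()      # written by the event loop, drained by the engine thread

# in Engine:
def cancel(self, rid):                # called from the event-loop thread
    self.cancelled.add(rid)           # a single set.add is safe under CPython's GIL

# in _run, after step() and the new_tokens / finished loops:
for rid in list(self.cancelled):
    self.cancelled.discard(rid)
    if self.in_flight.pop(rid, None) is not None:   # skip ids that already finished normally
        self.scheduler.finish(rid)    # removes from running/waiting, frees KV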

Without this, orphaned requests keep decoding until EOS or max_tokens: wasted GPU time, plus KV blocks held that new requests could be using.
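A stdlib-only sketch of the disconnect path, independent of FastAPI: a task iterating an async generator is cancelled while the generator is parked on out_q.get(), which raises asyncio.CancelledError inside the generator so its except clause can notify the engine. FakeEngine is a hypothetical stand-in that only records the cancelled id, and the task is cancelled by hand here, where in the server Starlette does it when the client goes away.

```python
import asyncio

class FakeEngine:                                  # hypothetical stand-in for Engine
    def __init__(self):
        self.cancelled: set[int] = set()
    def cancel(self, rid: int) -> None:
        self.cancelled.add(rid)

async def event_stream(engine: FakeEngine, rid: int, out_q: asyncio.Queue):
    try:
        while (piece := await out_q.get()) is not None:
            yield piece
    except (asyncio.CancelledError, GeneratorExit):
        engine.cancel(rid)                         # client gone: let the engine free the KV
        raise

async def main():
    engine, out_q = FakeEngine(), asyncio.Queue()
    received: list[str] = []

    async def client():                            # plays the role of StreamingResponse
        async for piece in event_stream(engine, 7, out_q):
            received.append(piece)

    task = asyncio.create_task(client())
    out_q.put_nowait("Hello")
    await asyncio.sleep(0.01)                      # first piece gets through
    task.cancel()                                  # simulated disconnect mid-stream
    try:
        await task
    except asyncio.CancelledError:
        pass
    return received, engine.cancelled

if __name__ == "__main__":
    print(asyncio.run(main()))                     # (['Hello'], {7})
```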

And in the scheduler's writeback path, when a request becomes finished:

self.alloc.free(req.blocks)
req.blocks.clear()
req.slot_indices.clear()
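To make checks 3 and 5 concrete, here is a toy free-list allocator (a hypothetical stand-in, not the BlockAllocator from earlier lessons) whose num_free counter is the number to watch: once every request has finished or been cancelled it should be back at its idle value. The double-free guard is an extra assumption; it catches a request being released twice, for example once on finish and again on cancel.

```python
class ToyBlockAllocator:
    """Hypothetical fixed-size free-list allocator for KV blocks."""

    def __init__(self, num_blocks: int):
        self.num_blocks = num_blocks
        self._free = list(range(num_blocks))      # block ids available for reuse
        self._used: set[int] = set()

    @property
    def num_free(self) -> int:
        return len(self._free)

    def alloc(self, n: int) -> list[int]:
        if n > len(self._free):
            raise MemoryError(f"need {n} blocks, only {len(self._free)} free")
        blocks = [self._free.pop() for _ in range(n)]
        self._used.update(blocks)
        return blocks

    def free(self, blocks: list[int]) -> None:
        for b in blocks:
            if b not in self._used:
                raise ValueError(f"block {b} freed twice or never allocated")
            self._used.remove(b)
            self._free.append(b)

if __name__ == "__main__":
    alloc = ToyBlockAllocator(8)
    for _ in range(100):                          # 100 sequential "requests"
        req_blocks = alloc.alloc(3)
        alloc.free(req_blocks)                    # the writeback path
        req_blocks.clear()
    print(alloc.num_free)                         # 8: nothing leaked
```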

7.8 Pitfalls (table of eight)

pitfall                                                      | symptom
direct .put_nowait() into asyncio.Queue from another thread  | passes light testing, stalls streams under load
uvicorn --workers N with N > 1                               | OOM (N × 16 GiB)
forgetting to free KV when a request finishes                | gradual OOM after ~100 requests
running step() in an executor without a lock                 | concurrent KV writes → garbage
busy-spinning the engine loop when idle                      | one CPU core pegged at 100%
time.sleep in the engine thread                              | shutdown takes up to that duration
print() in a handler                                         | blocks the event loop on I/O; use logging with the writes moved off the loop
returning token IDs instead of text                          | client re-implements the detokenizer; defeats L6
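On the print() row: a plain logging.StreamHandler or FileHandler also writes synchronously on the calling thread, so logging only helps once the write leaves the loop. One standard-library way is a logging.handlers.QueueHandler on the hot path plus a QueueListener thread that owns the real handler. The function name and the "mini_sglang" logger name below are placeholders.

```python
import logging
import logging.handlers
import queue

def setup_nonblocking_logging(target: logging.Handler,
                              name: str = "mini_sglang") -> logging.handlers.QueueListener:
    """Route records through a queue so only the listener thread does I/O."""
    q: queue.Queue = queue.Queue()
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.addHandler(logging.handlers.QueueHandler(q))   # hot path: enqueue only
    listener = logging.handlers.QueueListener(q, target)   # background thread does the write
    listener.start()
    return listener

if __name__ == "__main__":
    listener = setup_nonblocking_logging(logging.StreamHandler())
    logging.getLogger("mini_sglang").info("request %d admitted", 42)
    listener.stop()            # flush and join the listener thread at shutdown
```

In the server, the natural place to start the listener is lifespan() before yield, with listener.stop() next to engine.stop().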

7.9 The smoke test

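# scripts/l7_smoke.py: 3 concurrent requests against a running server
import asyncio
import json
import time

import httpx

URL = "http://localhost:8000/generate"

async def one(c: httpx.AsyncClient, prompt: str, idx: int) -> str:
    buf = ""; ttft = None
    t0 = time.perf_counter()                 # start the clock before the request goes out
    async with c.stream("POST", URL, json={"prompt": prompt, "max_tokens": 30}) as r:
        r.raise_for_status()
        async for line in r.aiter_lines():
            if not line.startswith("data: "): continue
            if line == "data: [DONE]": break
            if ttft is None: ttft = time.perf_counter() - t0
            buf += json.loads(line[len("data: "):])["text"]
    total = time.perf_counter() - t0
    ttft_s = ttft if ttft is not None else float("nan")   # nan if no token ever arrived
    print(f"[{idx}] TTFT={ttft_s:.2f}s total={total:.2f}s")
    return buf

async def main():
    async with httpx.AsyncClient(timeout=120) as c:      # one client, three concurrent streams
        results = await asyncio.gather(
            one(c, "The capital of France is", 0),
            one(c, "Once upon a time",         1),
            one(c, "Python is",                2),
        )
    assert all(results), "every request should stream back some text"
    print("✅ L7 PASS")

if __name__ == "__main__":
    asyncio.run(main())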

7.10 Acceptance

L7 pass criteria

After L7 lands you have a real product. L8 (radix prefix cache) and L9 (CUDA graphs) are throughput/latency optimisations on top.
