The engine is the runtime layer that sits between user-facing API calls and the raw model forward pass. It orchestrates scheduling, memory, batching, and (optionally) multiple GPU processes.
Component overview
```
LLMEngine
├── Scheduler
│   ├── BlockManager (KV cache block allocation + prefix caching)
│   └── Sequence[] (waiting / running / finished queues)
└── ModelRunner (rank 0)
    └── ModelRunner (rank 1 … N, worker processes)
```
| Component | Responsibility |
|---|---|
| LLMEngine | Public API; owns the main loop |
| Scheduler | Decides which sequences run each step |
| BlockManager | Allocates and frees paged KV cache blocks |
| Sequence | Tracks token IDs, block table, and status for one request |
| ModelRunner | Prepares tensors, executes the model, samples tokens |
Request lifecycle
add_prompt
The caller passes a prompt string. LLMEngine.add_prompt tokenizes it and wraps the token IDs in a Sequence object, then hands it to the scheduler.
```python
def add_prompt(self, prompt: str, sampling_params: SamplingParams) -> None:
    self.scheduler.add_sequence(
        Sequence(
            token_ids=self.tokenizer.encode(prompt),
            block_size=self.config['block_size'],
            sampling_params=sampling_params,
        )
    )
```
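The Sequence class itself is not shown on this page. A minimal sketch of the fields the rest of the page relies on (token IDs, seq_id, status, block table) might look like the following. The num_blocks property, the module-level ID counter, and the dataclass layout are assumptions for illustration, and sampling parameters are omitted.

```python
import math
from dataclasses import dataclass, field
from enum import Enum, auto
from itertools import count


class SequenceStatus(Enum):
    WAITING = auto()
    RUNNING = auto()
    FINISHED = auto()


# Shared counter (assumption): seq_id therefore increases in the order prompts are added
_next_seq_id = count()


@dataclass
class Sequence:
    token_ids: list[int]
    block_size: int
    seq_id: int = field(default_factory=lambda: next(_next_seq_id))
    status: SequenceStatus = SequenceStatus.WAITING
    block_table: list[int] = field(default_factory=list)  # physical block IDs

    def __len__(self) -> int:
        return len(self.token_ids)

    @property
    def num_blocks(self) -> int:
        # KV cache blocks needed to hold every token currently in the sequence
        return math.ceil(len(self) / self.block_size)
```

A five-token prompt with block_size = 4 needs two blocks, and a prompt added later always receives a larger seq_id.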
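The sequence enters the waiting deque with status WAITING.
schedule
Scheduler.schedule() is called at the start of each step. It returns a list of sequences to run and a boolean indicating whether this is a prefill step.
Prefill is always tried first. Waiting sequences are admitted in FIFO order. Each admitted sequence is allocated KV cache blocks, moved to the running queue, and added to the batch. Admission stops as soon as the next sequence would exceed the per-step token budget or the free KV cache blocks. Only if no prefill is possible does the scheduler fall back to a decode step for the already-running sequences. If a running sequence cannot get a block for its next token, it is preempted: its blocks are freed and it returns to the waiting queue.
```python
def schedule(self) -> tuple[list[Sequence], bool]:
    # Assumed: self.running is a deque like self.waiting, and
    # self.max_num_seqs / self.max_num_batched_tokens are per-step limits
    scheduled: list[Sequence] = []
    num_batched_tokens = 0
    # 1. Try to schedule prefill from the waiting queue (FIFO)
    while self.waiting and len(scheduled) < self.max_num_seqs:
        seq = self.waiting[0]
        # Conservative: the cached prefix is only known after allocate()
        fits_token_budget = (
            num_batched_tokens + len(seq) <= self.max_num_batched_tokens
        )
        if not (fits_token_budget and self.block_manager.can_allocate(seq)):
            break
        self.block_manager.allocate(seq)
        num_batched_tokens += len(seq) - seq.num_cached_tokens
        seq.status = SequenceStatus.RUNNING
        self.waiting.popleft()
        self.running.append(seq)
        scheduled.append(seq)
    if scheduled:
        return scheduled, True  # is_prefill=True

    # 2. Fall back to decode for already-running sequences
    while self.running and len(scheduled) < self.max_num_seqs:
        seq = self.running.popleft()
        if self.block_manager.can_append(seq):
            self.block_manager.append(seq)
            scheduled.append(seq)
        else:
            self.preempt(seq)  # free its blocks, move it back to waiting
    # Decoded sequences stay at the front of the running queue
    self.running.extendleft(reversed(scheduled))
    return scheduled, False  # is_prefill=False
```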
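To see the prefill-first policy and preemption in isolation, here is a self-contained toy model that tracks only token counts and a free-block counter. ToySeq, ToyScheduler, and the simplified block accounting (ceil(tokens / block_size) blocks per sequence, no prefix caching) are hypothetical stand-ins for Sequence, Scheduler, and BlockManager. The sketch is not the real implementation.

```python
import math
from collections import deque
from dataclasses import dataclass


@dataclass(eq=False)  # identity comparison, so deque.remove() targets the exact object
class ToySeq:
    seq_id: int
    num_tokens: int
    num_blocks_held: int = 0


class ToyScheduler:
    """Prefill-first scheduling over a fixed pool of KV cache blocks."""

    def __init__(self, num_free_blocks: int, block_size: int, max_batched_tokens: int):
        self.free_blocks = num_free_blocks
        self.block_size = block_size
        self.max_batched_tokens = max_batched_tokens
        self.waiting: deque = deque()
        self.running: deque = deque()

    def schedule(self) -> tuple[list, bool]:
        scheduled, token_budget = [], self.max_batched_tokens
        # 1. Prefill: admit waiting sequences in FIFO order while both budgets hold
        while self.waiting:
            seq = self.waiting[0]
            blocks = math.ceil(seq.num_tokens / self.block_size)
            if seq.num_tokens > token_budget or blocks > self.free_blocks:
                break  # head-of-line sequence does not fit: stop admitting
            self.waiting.popleft()
            self.free_blocks -= blocks
            seq.num_blocks_held = blocks
            token_budget -= seq.num_tokens
            self.running.append(seq)
            scheduled.append(seq)
        if scheduled:
            return scheduled, True
        # 2. Decode: each running sequence needs room for one more token
        for seq in list(self.running):
            needs_block = seq.num_tokens + 1 > seq.num_blocks_held * self.block_size
            if needs_block and self.free_blocks == 0:
                # Preempt: free its blocks and requeue it at the front of waiting
                self.running.remove(seq)
                self.free_blocks += seq.num_blocks_held
                seq.num_blocks_held = 0
                self.waiting.appendleft(seq)
                continue
            if needs_block:
                self.free_blocks -= 1
                seq.num_blocks_held += 1
            scheduled.append(seq)
        return scheduled, False
```

Consider three free blocks, block_size = 4, and prompts of 5, 4, and 3 tokens. The first call admits sequences 0 and 1 and exhausts the pool. The second call cannot prefill, so it decodes, and sequence 1, whose only block is full, is preempted. The third call re-admits sequence 1 as a prefill.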
model_runner.run
ModelRunner.run prepares the input tensors, executes the model, and, on rank 0 only, samples the next token for each sequence:
```python
def run(self, seqs: list[Sequence], is_prefill: bool) -> torch.Tensor | None:
    input_ids = (
        self.prepare_prefill(seqs) if is_prefill else self.prepare_decode(seqs)
    )
    logits = self.run_model(input_ids, is_prefill)
    # Only rank 0 samples; worker ranks return None
    token_ids = None
    if self.rank == 0:
        token_ids = self.sampler(logits, self.prepare_sample(seqs))
    reset_context()  # clear the attention metadata set by prepare_*
    return token_ids
```
Worker processes (rank > 0) participate in the model forward pass via collective operations but do not sample.
postprocess
Scheduler.postprocess appends the new token to each sequence and then checks three stopping conditions: an EOS token (unless ignore_eos is set), the max_tokens completion limit, and the maximum model length. Finished sequences release their KV cache blocks and leave the running queue.
```python
def postprocess(self, seqs: list[Sequence], token_ids: list[int]) -> None:
    for seq, token_id in zip(seqs, token_ids):
        seq.append_token(token_id)
        stop = (
            (not seq.ignore_eos and token_id == self.eos)
            or seq.num_completion_tokens >= seq.max_tokens
            or seq.num_tokens >= seq.max_model_length
        )
        if stop:
            seq.status = SequenceStatus.FINISHED
            self.block_manager.deallocate(seq)
            self.running.remove(seq)
```
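The ordering inside postprocess matters. The token is appended before the checks run, so an EOS token ends up in the completion and num_completion_tokens already counts it. The standalone sketch below replays that logic on plain lists. run_until_stop and its parameters are hypothetical, chosen only to isolate the three conditions.

```python
def run_until_stop(
    prompt: list[int],
    proposed_tokens: list[int],
    eos: int,
    max_tokens: int,
    max_model_length: int,
    ignore_eos: bool = False,
) -> list[int]:
    """Feed proposed tokens one at a time; return the completion tokens kept."""
    token_ids = list(prompt)
    for token_id in proposed_tokens:
        token_ids.append(token_id)  # append first, as postprocess does
        num_completion_tokens = len(token_ids) - len(prompt)
        stop = (
            (not ignore_eos and token_id == eos)
            or num_completion_tokens >= max_tokens
            or len(token_ids) >= max_model_length
        )
        if stop:
            break
    return token_ids[len(prompt):]
```

With eos = 0 and proposed tokens [5, 6, 0, 7], the completion is [5, 6, 0]. With ignore_eos=True and max_tokens=4, all four tokens are kept.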
The generate() method
generate is the top-level entry point for batch inference.
```python
def generate(
    self, prompts: list[str], sampling_params: SamplingParams
) -> dict[str, list]:
    for prompt in prompts:
        self.add_prompt(prompt, sampling_params)
    generated_tokens: dict[int, list[int]] = {}
    while not self.scheduler.is_finished():
        outputs, num_processed_tokens, is_prefill = self.step()
        generated_tokens.update(dict(outputs))  # (seq_id, token_ids) pairs
    # seq_ids increase in submission order, so sorting restores prompt order
    token_ids = [generated_tokens[seq_id] for seq_id in sorted(generated_tokens)]
    # Decode token IDs back to strings
    return {
        'text': [self.tokenizer.decode(tokens) for tokens in token_ids],
        'token_ids': token_ids,
    }
```
The loop calls step() until scheduler.is_finished() returns True, that is, until both the waiting and running queues are empty. Sequences can finish in any order. Their results are collected as they finish and returned in the original prompt order by sorting on seq_id.
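A small standalone sketch of the collection logic follows. The fake step() results have the longest request finishing last. collect_in_prompt_order is a hypothetical helper, and it relies on seq_id increasing in submission order.

```python
def collect_in_prompt_order(
    step_outputs: list[list[tuple[int, list[int]]]],
) -> list[list[int]]:
    """Merge per-step (seq_id, completion_token_ids) pairs and sort by seq_id."""
    finished: dict[int, list[int]] = {}
    for outputs in step_outputs:  # one list per step() call
        finished.update(outputs)  # dict.update accepts an iterable of pairs
    return [finished[seq_id] for seq_id in sorted(finished)]
```

Steps that finish nothing contribute an empty list, and the final order depends only on seq_id, not on finishing order.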
The step() method
```python
def step(self) -> tuple[list[tuple[int, list[int]]], int, bool]:
    scheduled_sequences, is_prefill = self.scheduler.schedule()
    if not scheduled_sequences:
        return [], 0, is_prefill
    outputs = self.model_runner.call("run", scheduled_sequences, is_prefill)
    if outputs is not None:
        outputs = outputs.cpu().tolist()
    self.scheduler.postprocess(scheduled_sequences, outputs)
    finished = [
        (seq.seq_id, seq.completion_token_ids)
        for seq in scheduled_sequences if seq.is_finished
    ]
    # prefill: count all tokens; decode: count one per sequence
    num_processed_tokens = (
        sum(len(seq) for seq in scheduled_sequences)
        if is_prefill else len(scheduled_sequences)
    )
    return finished, num_processed_tokens, is_prefill
```
model_runner.call("run", ...) dispatches to the correct method on both rank 0 and worker ranks via shared memory (see Multi-GPU Inference for details).
ModelRunner responsibilities
prepare_prefill
Builds a flat 1-D input_ids tensor by concatenating the token IDs of all sequences, skipping prefix tokens whose KV entries are already cached. It also computes:
- cu_seqlens_q: cumulative query sequence lengths (new tokens only), e.g. [0, 5, 8, 12] for three sequences contributing 5, 3, and 4 tokens
- slot_mapping: the physical cache slot each new token is written into
- block_tables: the mapping from logical block index to physical block ID, needed so attention can read a cached prefix (including one reused across sequences)
```python
def prepare_prefill(self, seqs: list[Sequence]) -> torch.Tensor:
    input_ids, slot_mappings, cu_seqlens_q = [], [], [0]
    for seq in seqs:
        # Skip prefix tokens whose KV entries are already cached
        input_ids.extend(seq.token_ids[seq.num_cached_tokens:])
        cu_seqlens_q.append(cu_seqlens_q[-1] + len(seq) - seq.num_cached_tokens)
        # ... slot_mapping for new blocks
    set_context(is_prefill=True, cu_seqlens_q=..., slot_mapping=..., ...)
    return torch.tensor(input_ids, ...).cuda(non_blocking=True)
```
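pin_memory=True and cuda(non_blocking=True) are used together because a non-blocking host-to-device copy is only asynchronous when the source tensor lives in pinned (page-locked) memory. With both, the CPU→GPU transfer runs via DMA and overlaps with other CPU work.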
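The slot_mapping computation elided in prepare_prefill follows from the block table. Token position p of a sequence lives in physical block block_table[p // block_size] at offset p % block_size. The pure-Python sketch below makes two assumptions: the flat slot index is block_id * block_size + offset, and cached prefixes cover whole blocks. prefill_metadata is a hypothetical helper that works on plain lists rather than Sequence objects.

```python
def prefill_metadata(
    seqs: list[tuple[list[int], int, list[int]]], block_size: int
) -> tuple[list[int], list[int], list[int]]:
    """seqs holds (token_ids, num_cached_tokens, block_table) per sequence."""
    input_ids, cu_seqlens_q, slot_mapping = [], [0], []
    for token_ids, num_cached, block_table in seqs:
        new_tokens = token_ids[num_cached:]  # the cached prefix is not recomputed
        input_ids.extend(new_tokens)
        cu_seqlens_q.append(cu_seqlens_q[-1] + len(new_tokens))
        for pos in range(num_cached, len(token_ids)):
            # logical position -> (physical block, offset) -> flat cache slot
            block_id = block_table[pos // block_size]
            slot_mapping.append(block_id * block_size + pos % block_size)
    return input_ids, cu_seqlens_q, slot_mapping
```

In this example, the second sequence has its first block (4 tokens) cached, so only its last two tokens are fed in, and they are written into physical block 5.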
prepare_decode
For decode, each sequence contributes exactly one token (its most recently generated token). The method produces:
- input_ids of shape (batch_size,)
- context_lens: total tokens processed so far per sequence
- slot_mapping: the single new cache slot for each sequence
- block_tables: the full block table for reading KV history
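A pure-Python sketch of the decode metadata follows. It assumes that context_lens counts the newest token too, since that token's KV entry is written in this same step. It also assumes that ragged block tables are padded with -1. decode_metadata is a hypothetical helper working on plain lists.

```python
def decode_metadata(
    seqs: list[tuple[list[int], list[int]]], block_size: int
) -> tuple[list[int], list[int], list[int], list[list[int]]]:
    """seqs holds (token_ids, block_table) per running sequence."""
    input_ids, context_lens, slot_mapping, block_tables = [], [], [], []
    for token_ids, block_table in seqs:
        last_pos = len(token_ids) - 1
        input_ids.append(token_ids[-1])  # only the newest token is fed in
        context_lens.append(len(token_ids))  # attention spans the whole history
        block_id = block_table[last_pos // block_size]
        slot_mapping.append(block_id * block_size + last_pos % block_size)
        block_tables.append(block_table)
    # Pad block tables into a rectangle; -1 marks unused entries (assumed convention)
    width = max((len(bt) for bt in block_tables), default=0)
    block_tables = [bt + [-1] * (width - len(bt)) for bt in block_tables]
    return input_ids, context_lens, slot_mapping, block_tables
```

Every sequence yields exactly one input token and one slot. This fixed per-sequence shape is what makes decode a good fit for CUDA graphs.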
run_model
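```python
@torch.inference_mode()
def run_model(self, input_ids: torch.Tensor, is_prefill: bool) -> torch.Tensor:
    bs = input_ids.size(0)
    if is_prefill or self.enforce_eager or bs > max(self.graphs):
        # Eager path: prefill, enforce_eager, or a batch larger than any captured graph
        hidden_states = self.model(input_ids)
        return self.model.compute_logits(hidden_states)
    # Decode: replay the smallest pre-captured CUDA graph that fits this batch
    context = get_context()  # assumed counterpart of set_context()
    graph = self.graphs[min(b for b in self.graphs if b >= bs)]
    graph_vars = self.graph_vars
    graph_vars['input_ids'][:bs].copy_(input_ids)
    # Padded rows get slot -1 (assumed to be skipped by the KV-store kernel),
    # so stale slots from an earlier batch are never overwritten
    graph_vars['slot_mapping'].fill_(-1)
    graph_vars['slot_mapping'][:bs].copy_(context.slot_mapping)
    graph_vars['context_lens'].zero_()
    graph_vars['context_lens'][:bs].copy_(context.context_lens)
    num_cols = context.block_tables.size(1)
    graph_vars['block_tables'][:bs, :num_cols].copy_(context.block_tables)
    graph.replay()
    return self.model.compute_logits(graph_vars['outputs'][:bs])
```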
CUDA graph optimization
CUDA graphs eliminate per-step kernel launch overhead by recording the entire sequence of CUDA operations once and replaying the recording on subsequent steps.
Why only for decode? Prefill inputs have variable length, so nearly every batch has a different shape and the graph would have to be re-captured for each batch. Decode always processes exactly one token per sequence, giving a fixed input shape for each batch size.
Capture strategy. Graphs are captured for batch sizes 1, 2, 4, and 8, and then for every multiple of 16 up to max_num_seqs (16, 32, 48, ...). Capture happens in descending order so the memory pool created for the largest graph is reused by the smaller ones.
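A stdlib-only sketch of the size list and its capture order follows, assuming the rule used in capture_cudagraph. Dropping 1, 2, 4, and 8 when they exceed max_num_seqs is an assumption of this sketch for very small configurations. Any batch larger than the biggest captured size has no graph and must run some other way, for example eagerly. With max_num_seqs = 63, that covers batches of 49 to 63.

```python
def capture_batch_sizes(max_num_seqs: int) -> list[int]:
    """Batch sizes to capture, in capture order (largest first)."""
    sizes = [bs for bs in (1, 2, 4, 8) if bs <= max_num_seqs]
    sizes += range(16, max_num_seqs + 1, 16)  # every multiple of 16
    return sorted(sizes, reverse=True)
```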
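```python
@torch.inference_mode()
def capture_cudagraph(self) -> None:
    max_bs = self.config['max_num_seqs']  # requires 'max_num_seqs' key in config
    # Static buffers that every captured graph reads from and writes to.
    # max_num_blocks (block table width), hidden_size and dtype are
    # assumed to come from the model config.
    input_ids = torch.zeros(max_bs, dtype=torch.int64, device='cuda')
    slot_mapping = torch.zeros(max_bs, dtype=torch.int32, device='cuda')
    context_lens = torch.zeros(max_bs, dtype=torch.int32, device='cuda')
    block_tables = torch.zeros(
        max_bs, max_num_blocks, dtype=torch.int32, device='cuda'
    )
    outputs = torch.zeros(max_bs, hidden_size, dtype=dtype, device='cuda')
    batch_sizes = [bs for bs in (1, 2, 4, 8) if bs <= max_bs]
    batch_sizes += range(16, max_bs + 1, 16)
    self.graphs = {}
    graph_pool = None
    for batch_size in reversed(batch_sizes):  # largest first
        graph = torch.cuda.CUDAGraph()
        set_context(is_prefill=False, ...)  # views of the static buffers above
        self.model(input_ids[:batch_size])  # warmup run
        with torch.cuda.graph(graph, graph_pool):
            outputs[:batch_size] = self.model(input_ids[:batch_size])
        if graph_pool is None:
            graph_pool = graph.pool()  # share this memory pool with smaller graphs
        self.graphs[batch_size] = graph
        torch.cuda.synchronize()
        reset_context()
    self.graph_vars = dict(
        input_ids=input_ids,
        slot_mapping=slot_mapping,
        context_lens=context_lens,
        block_tables=block_tables,
        outputs=outputs,
    )
```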
At inference time, run_model finds the smallest captured graph that is at least as large as the current batch, copies inputs into the pre-allocated buffers, and calls graph.replay().
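The bucket lookup can be sketched with the standard-library bisect module. pick_graph_batch_size is a hypothetical helper that returns None when no captured graph is large enough.

```python
import bisect
from typing import Optional


def pick_graph_batch_size(captured: list[int], bs: int) -> Optional[int]:
    """Smallest captured batch size >= bs, or None when bs exceeds all of them."""
    ordered = sorted(captured)  # capture order is largest-first, so sort here
    i = bisect.bisect_left(ordered, bs)
    return ordered[i] if i < len(ordered) else None
```

A batch of 17 therefore replays the 32-row graph. The 15 padded rows are computed and then dropped when run_model slices outputs[:bs]. Sorting matters: if the graphs live in a dict filled in capture order (largest first), picking the first key that is at least bs would return the largest graph rather than the smallest.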
torch.compile and CUDA graphs are complementary: @torch.compile fuses multiple operations into fewer CUDA kernels, while CUDA graphs eliminate the CPU overhead of launching those kernels on every decode step.
allocate_kv_cache
Before capturing graphs, the model runner allocates the entire KV cache as one large tensor and distributes slices to each Attention module.
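```python
def allocate_kv_cache(self):
    # Names not defined here (gpu_memory_utilization, peak_overhead, block_size,
    # num_layers, num_kv_heads, head_dim, dtype, attention_modules) are assumed
    # to come from the engine and model config.
    # Determine available GPU memory after model weights
    free_mem, _ = torch.cuda.mem_get_info()
    available_mem = free_mem * gpu_memory_utilization - peak_overhead
    # Each block stores K and V for block_size tokens across all layers and KV heads
    block_bytes = (
        block_size * 2 * num_layers * num_kv_heads * head_dim
        * dtype.itemsize
    )
    num_blocks = int(available_mem // block_bytes)
    assert num_blocks > 0, "not enough GPU memory for a single KV cache block"
    # Allocate one contiguous tensor for all blocks, directly on the GPU
    kv_cache = torch.zeros(
        2, num_layers, num_blocks, block_size, num_kv_heads, head_dim,
        dtype=dtype, device='cuda',
    )
    # Distribute per-layer views (no copies) to each Attention layer
    for layer_id, module in enumerate(attention_modules):
        module.k_cache = kv_cache[0, layer_id]
        module.v_cache = kv_cache[1, layer_id]
    # The scheduler reads this value later (see Initialization order)
    self.config['max_cached_blocks'] = num_blocks
```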
When world_size > 1, each rank computes its own num_blocks from local free memory. A dist.all_reduce with op=ReduceOp.MIN then leaves the smallest value on every rank. All ranks thus agree on the most conservative limit, which prevents OOM on the most memory-constrained GPU.
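Here is a worked example of the sizing arithmetic. The numbers are illustrative and do not describe any particular model: block_size = 256, 28 layers, 8 KV heads, head_dim = 128, and a 2-byte dtype such as bfloat16. The helper names are hypothetical, and min() stands in for the value that dist.all_reduce with ReduceOp.MIN leaves on every rank.

```python
def kv_block_bytes(block_size: int, num_layers: int, num_kv_heads: int,
                   head_dim: int, itemsize: int) -> int:
    # Factor 2: one K and one V vector per token, per layer, per KV head
    return block_size * 2 * num_layers * num_kv_heads * head_dim * itemsize


def num_kv_blocks(available_bytes: int, block_bytes: int) -> int:
    return int(available_bytes // block_bytes)


def agree_across_ranks(per_rank_blocks: list[int]) -> int:
    # Same result as an all-reduce with a MIN reduction
    return min(per_rank_blocks)
```

One block is 256 × 2 × 28 × 8 × 128 × 2 = 29,360,128 bytes (28 MiB). A rank with 20 GiB available therefore fits 731 blocks and one with 18.5 GiB fits 676, so every rank uses 676.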
Initialization order
```python
import torch.multiprocessing as mp

class LLMEngine:
    def __init__(self, config):
        world_size = config['world_size']  # assumed config key
        # CUDA in subprocesses needs the 'spawn' (or 'forkserver') start method
        ctx = mp.get_context('spawn')
        self.processes, self.events = [], []
        # 1. Spawn worker processes (rank 1 … N)
        for i in range(1, world_size):
            event = ctx.Event()
            process = ctx.Process(target=worker_process, args=(config, i, event))
            process.start()
            self.processes.append(process)
            self.events.append(event)
        # 2. Initialize rank-0 ModelRunner
        #    → calls dist.init_process_group (collective barrier, blocks until
        #      all workers have also called it)
        #    → warmup_model, allocate_kv_cache, capture_cudagraph
        self.model_runner = ModelRunner(config, rank=0, event=self.events)
        # 3. Initialize Scheduler AFTER ModelRunner returns
        #    (max_cached_blocks is now set by allocate_kv_cache)
        self.scheduler = Scheduler(
            max_cached_blocks=config['max_cached_blocks'], ...
        )
```
The scheduler must be created after ModelRunner.__init__ because allocate_kv_cache writes the final max_cached_blocks value into the config dict that the scheduler reads.