The engine is the runtime layer that sits between user-facing API calls and the raw model forward pass. It orchestrates scheduling, memory, batching, and (optionally) multiple GPU processes.

Component overview

LLMEngine
├── Scheduler
│   ├── BlockManager  (KV cache block allocation + prefix caching)
│   └── Sequence[]    (waiting / running / finished queues)
└── ModelRunner  (rank 0)
    └── ModelRunner  (rank 1 … N, worker processes)
Component      Responsibility
LLMEngine      Public API; owns the main loop
Scheduler      Decides which sequences run each step
BlockManager   Allocates and frees paged KV cache blocks
Sequence       Tracks token IDs, block table, and status for one request
ModelRunner    Prepares tensors, executes the model, samples tokens

Request lifecycle

1. add_prompt

The caller passes a prompt string. LLMEngine.add_prompt tokenizes it and wraps the token IDs in a Sequence object, then hands it to the scheduler.
def add_prompt(self, prompt: str, sampling_params: SamplingParams) -> None:
    self.scheduler.add_sequence(
        Sequence(
            token_ids=self.tokenizer.encode(prompt),
            block_size=self.config['block_size'],
            sampling_params=sampling_params,
        )
    )
The sequence enters the waiting deque with status WAITING.
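To make the shape of the queued object concrete, here is a minimal sketch of a sequence record and the waiting deque. Only token_ids, block_size, and the WAITING status come from the text above; the ID counter, the num_blocks property, and the dataclass layout are assumptions for illustration, not the engine's actual class.

```python
from collections import deque
from dataclasses import dataclass, field
from enum import Enum, auto
from itertools import count


class SequenceStatus(Enum):
    WAITING = auto()
    RUNNING = auto()
    FINISHED = auto()


_seq_ids = count()  # hypothetical ID source: 0, 1, 2, ... in creation order


@dataclass
class Sequence:
    token_ids: list[int]
    block_size: int
    status: SequenceStatus = SequenceStatus.WAITING
    seq_id: int = field(default_factory=lambda: next(_seq_ids))

    def __len__(self) -> int:
        return len(self.token_ids)

    @property
    def num_blocks(self) -> int:
        # Ceiling division: a partly filled block still occupies a whole block.
        return (len(self) + self.block_size - 1) // self.block_size


waiting: deque[Sequence] = deque()
waiting.append(Sequence(token_ids=[11, 12, 13, 14, 15], block_size=4))
```

With a block size of 4, the five-token prompt above needs two KV cache blocks once it is scheduled.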
2. schedule

Scheduler.schedule() is called at the start of each step. It returns a list of sequences to run and a boolean indicating whether this is a prefill step.

Prefill is always tried first. If any waiting sequence fits within both the token budget and the KV cache budget, it is moved to running and included in the batch. Only if no prefill is possible does the scheduler schedule decode steps for already-running sequences.
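def schedule(self) -> tuple[list[Sequence], bool]:
    scheduled: list[Sequence] = []
    num_batched_tokens = 0

    # 1. Try to schedule prefill from the waiting queue
    while self.waiting:
        seq = self.waiting[0]
        # Token budget check; max_num_batched_tokens is an assumed attribute name
        fits_token_budget = (
            num_batched_tokens + len(seq) <= self.max_num_batched_tokens
        )
        if self.block_manager.can_allocate(seq) and fits_token_budget:
            self.block_manager.allocate(seq)
            num_batched_tokens += len(seq)
            seq.status = SequenceStatus.RUNNING
            self.waiting.popleft()      # move the sequence from waiting to running
            self.running.append(seq)
            scheduled.append(seq)
        else:
            break
    if scheduled:
        return scheduled, True  # is_prefill=True

    # 2. Fall back to decode (iterate over a copy: preempt mutates self.running)
    for seq in list(self.running):
        if self.block_manager.can_append(seq):
            self.block_manager.append(seq)
            scheduled.append(seq)
        else:
            self.preempt(seq)   # evict to waiting queue
    return scheduled, False     # is_prefill=False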
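The prefill-first policy can be tried out without a GPU. The sketch below is a toy model under stated assumptions: sequences are represented only by their prompt lengths, and pick_batch, free_blocks, and token_budget are hypothetical names, not part of the engine.

```python
from collections import deque


def pick_batch(waiting, running, free_blocks, token_budget, block_size):
    """One step of a prefill-first policy over prompt lengths (toy model)."""
    batch, used_tokens = [], 0
    while waiting:
        prompt_len = waiting[0]
        blocks_needed = -(-prompt_len // block_size)  # ceiling division
        if blocks_needed > free_blocks or used_tokens + prompt_len > token_budget:
            break  # head of the queue does not fit; keep FIFO order
        free_blocks -= blocks_needed
        used_tokens += prompt_len
        batch.append(waiting.popleft())
    if batch:
        running.extend(batch)
        return batch, True       # prefill step
    return list(running), False  # decode step for everything already running


waiting, running = deque([5, 9]), []
first = pick_batch(waiting, running, free_blocks=4, token_budget=16, block_size=4)
second = pick_batch(waiting, running, free_blocks=2, token_budget=16, block_size=4)
```

The first call admits the 5-token prompt as a prefill step. The 9-token prompt needs three blocks and only two remain, so the second call falls back to a decode step for the sequence that is already running.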
3. model_runner.run

ModelRunner.run prepares the input tensors and executes the model:
def run(self, seqs: list[Sequence], is_prefill: bool) -> torch.Tensor | None:
    input_ids = self.prepare_prefill(seqs) if is_prefill \
                else self.prepare_decode(seqs)
    logits = self.run_model(input_ids, is_prefill)
    # Only rank 0 samples; worker ranks return None
    token_ids = None
    if self.rank == 0:
        token_ids = self.sampler(logits, self.prepare_sample(seqs))
    reset_context()  # clear the per-step context set by prepare_*
    return token_ids
Worker processes (rank > 0) participate in the model forward pass via collective operations but do not sample.
4. postprocess

Scheduler.postprocess appends the new token to each sequence and checks stopping conditions.
def postprocess(self, seqs, token_ids):
    for seq, token_id in zip(seqs, token_ids):
        seq.append_token(token_id)
        stop = (
            (not seq.ignore_eos and token_id == self.eos) or
            seq.num_completion_tokens >= seq.max_tokens or
            seq.num_tokens >= seq.max_model_length
        )
        if stop:
            seq.status = SequenceStatus.FINISHED
            self.block_manager.deallocate(seq)
            self.running.remove(seq)
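The three stopping conditions can be isolated into a pure function for testing. This is a sketch: should_stop and its keyword arguments are hypothetical names that mirror the attributes used in postprocess above.

```python
def should_stop(token_id, *, eos_id, ignore_eos,
                num_completion_tokens, max_tokens,
                num_tokens, max_model_length):
    """Return True if a sequence should finish after appending token_id."""
    return (
        (not ignore_eos and token_id == eos_id)
        or num_completion_tokens >= max_tokens
        or num_tokens >= max_model_length
    )


limits = dict(eos_id=2, max_tokens=16, max_model_length=4096)

hit_eos = should_stop(2, ignore_eos=False, num_completion_tokens=3,
                      num_tokens=10, **limits)
ignored_eos = should_stop(2, ignore_eos=True, num_completion_tokens=3,
                          num_tokens=10, **limits)
hit_max_tokens = should_stop(7, ignore_eos=False, num_completion_tokens=16,
                             num_tokens=23, **limits)
```

Note that ignore_eos only disables the first condition; the two length limits still apply.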

The generate() method

generate is the top-level entry point for batch inference.
def generate(
    self, prompts: list[str], sampling_params: SamplingParams
) -> dict[str, list]:
    for prompt in prompts:
        self.add_prompt(prompt, sampling_params)

    generated_tokens = {}
    while not self.scheduler.is_finished():
        outputs, num_processed_tokens, is_prefill = self.step()
        generated_tokens.update({seq_id: tokens for seq_id, tokens in outputs})

    # Decode token IDs back to strings, preserving input order
    generated_tokens = [
        generated_tokens[seq_id] for seq_id in sorted(generated_tokens.keys())
    ]
    return {
        'text': [self.tokenizer.decode(tokens) for tokens in generated_tokens],
        'token_ids': generated_tokens,
    }
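The loop calls step() until scheduler.is_finished() returns True (both waiting and running queues are empty). Results are collected as sequences finish and then sorted by seq_id, which recovers the original prompt order provided seq_id values are assigned in increasing order as prompts are added.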
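A small illustration of the reordering step, assuming seq_id values are handed out in increasing order as prompts are added (the sample IDs and tokens below are made up):

```python
# (seq_id, completion_token_ids) pairs in the order the sequences finished.
finished_in_completion_order = [(2, [7, 8]), (0, [1]), (1, [4, 5, 6])]

generated = {}
for seq_id, tokens in finished_in_completion_order:
    generated[seq_id] = tokens

# Sorting the keys restores submission order regardless of finish order.
ordered = [generated[seq_id] for seq_id in sorted(generated)]
```

Short completions finish early, so the dictionary fills out of order; sorting on the key is what ties each output back to its prompt position.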

The step() method

def step(self) -> tuple[list[tuple[int, list[int]]], int, bool]:
    scheduled_sequences, is_prefill = self.scheduler.schedule()
    if not scheduled_sequences:
        return [], 0, is_prefill
    outputs = self.model_runner.call("run", scheduled_sequences, is_prefill)
    if outputs is not None:
        outputs = outputs.cpu().tolist()
    self.scheduler.postprocess(scheduled_sequences, outputs)
    finished = [
        (seq.seq_id, seq.completion_token_ids)
        for seq in scheduled_sequences if seq.is_finished
    ]
    # prefill: count all tokens; decode: count one per sequence
    num_processed_tokens = (
        sum(len(seq) for seq in scheduled_sequences)
        if is_prefill else len(scheduled_sequences)
    )
    return finished, num_processed_tokens, is_prefill
model_runner.call("run", ...) dispatches to the correct method on both rank 0 and worker ranks via shared memory (see Multi-GPU Inference for details).

ModelRunner responsibilities

prepare_prefill

Builds a flat 1-D input_ids tensor by concatenating all token IDs from all sequences (excluding already-cached prefix tokens). Also computes:
  • cu_seqlens_q — cumulative query sequence lengths, e.g. [0, 5, 8, 12]
  • slot_mapping — physical cache slot for each new token to write into
  • block_tables — mapping from logical block index to physical block ID (for cross-sequence prefix reuse)
def prepare_prefill(self, seqs: list[Sequence]) -> torch.Tensor:
    input_ids, slot_mappings, cu_seqlens_q = [], [], [0]
    for seq in seqs:
        input_ids.extend(seq.token_ids[seq.num_cached_tokens:])
        cu_seqlens_q.append(cu_seqlens_q[-1] + len(seq) - seq.num_cached_tokens)
        # ... slot_mapping for new blocks
    set_context(is_prefill=True, cu_seqlens_q=..., slot_mapping=..., ...)
    return torch.tensor(input_ids, ...).cuda(non_blocking=True)
The elided tensor arguments include pin_memory=True. Pairing a pinned host tensor with cuda(non_blocking=True) lets the CPU→GPU copy run asynchronously via DMA, overlapping with other CPU work.
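The metadata computed by prepare_prefill can be worked through with plain lists. The sketch below assumes the common paged layout in which a token at position pos lives in slot block_id * block_size + pos % block_size; prefill_layout and its arguments are hypothetical names.

```python
from itertools import accumulate


def prefill_layout(seq_lens, num_cached, block_tables, block_size):
    """Return (cu_seqlens_q, slot_mapping) for the uncached tokens of a batch."""
    new_lens = [n - c for n, c in zip(seq_lens, num_cached)]
    cu_seqlens_q = [0, *accumulate(new_lens)]
    slot_mapping = []
    for n, c, table in zip(seq_lens, num_cached, block_tables):
        for pos in range(c, n):  # skip tokens already in the prefix cache
            block_id = table[pos // block_size]
            slot_mapping.append(block_id * block_size + pos % block_size)
    return cu_seqlens_q, slot_mapping


cu_seqlens_q, slot_mapping = prefill_layout(
    seq_lens=[5, 3, 4],
    num_cached=[0, 0, 0],
    block_tables=[[0, 1], [2], [3]],
    block_size=4,
)
```

Three prompts of lengths 5, 3, and 4 give the cumulative offsets [0, 5, 8, 12] quoted above, and the slot mapping has one entry per token in the flat input_ids tensor.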

prepare_decode

For decode, each sequence contributes exactly one token (its most recently generated token). The method produces:
  • input_ids of shape (batch_size,)
  • context_lens — total tokens processed so far per sequence
  • slot_mapping — the single new cache slot for each sequence
  • block_tables — full block table for reading KV history
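The four decode outputs listed above can be sketched with plain lists standing in for tensors. Several details here are assumptions: that context_lens counts the newest token, that slots follow the block_id * block_size + offset layout, and that shorter block tables are padded with -1 to form a rectangle.

```python
def decode_layout(last_tokens, seq_lens, block_tables, block_size, pad=-1):
    """Build decode-step inputs as plain lists (toy stand-in for tensors)."""
    input_ids = list(last_tokens)   # one token per sequence
    context_lens = list(seq_lens)   # tokens so far, newest one included
    slot_mapping = []
    for n, table in zip(seq_lens, block_tables):
        pos = n - 1                 # position of the newest token
        slot_mapping.append(table[pos // block_size] * block_size + pos % block_size)
    width = max(len(t) for t in block_tables)
    padded = [t + [pad] * (width - len(t)) for t in block_tables]
    return input_ids, context_lens, slot_mapping, padded


ids, ctx_lens, slots, tables = decode_layout(
    last_tokens=[42, 7], seq_lens=[6, 4], block_tables=[[0, 1], [2]], block_size=4
)
```

Each sequence contributes exactly one slot, which is why the decode input shape depends only on the batch size.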

run_model

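@torch.inference_mode()
def run_model(self, input_ids: torch.Tensor, is_prefill: bool) -> torch.Tensor:
    if is_prefill or self.enforce_eager:
        hidden_states = self.model(input_ids)
        return self.model.compute_logits(hidden_states)
    else:
        # Decode: use the pre-captured CUDA graph
        bs = input_ids.size(0)
        context = get_context()  # assumed accessor paired with set_context
        # self.graphs is filled largest-first, so take the minimum explicitly
        graph = self.graphs[min(b for b in self.graphs if b >= bs)]
        graph_vars = self.graph_vars
        graph_vars['input_ids'][:bs].copy_(input_ids)
        graph_vars['slot_mapping'][:bs].copy_(context.slot_mapping)
        graph_vars['context_lens'][:bs].copy_(context.context_lens)
        # The static buffer may be wider than this batch's block tables
        num_cols = context.block_tables.size(1)
        graph_vars['block_tables'][:bs, :num_cols].copy_(context.block_tables)
        graph.replay()
        return self.model.compute_logits(graph_vars['outputs'][:bs])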

CUDA graph optimization

CUDA graphs eliminate per-step kernel launch overhead by recording the entire sequence of CUDA operations once and replaying the recording on subsequent steps.

Why only for decode? Prefill has variable input lengths, so every batch has a different shape and the graph would need to be re-captured each time. Decode always processes exactly one token per sequence, giving a fixed input shape for each batch size.

Capture strategy. Graphs are captured for batch sizes [1, 2, 4, 8, 16, 32, ...]. Capture happens in descending order so the memory pool created for the largest graph is reused by smaller ones.
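def capture_cudagraph(self) -> None:
    max_bs = self.config['max_num_seqs']  # requires 'max_num_seqs' key in config
    batch_sizes = [1, 2, 4, 8] + list(range(16, max_bs + 1, 16))
    # input_ids, slot_mapping, context_lens, block_tables and outputs are
    # static buffers sized for max_bs; their allocation is elided here
    graph_pool = None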

    for batch_size in reversed(batch_sizes):   # largest first
        graph = torch.cuda.CUDAGraph()
        set_context(is_prefill=False, ...)
        self.model(input_ids[:batch_size])     # warmup run

        with torch.cuda.graph(graph, graph_pool):
            outputs[:batch_size] = self.model(input_ids[:batch_size])
            if graph_pool is None:
                graph_pool = graph.pool()      # share memory pool
        self.graphs[batch_size] = graph
        torch.cuda.synchronize()
        reset_context()

    self.graph_vars = dict(
        input_ids=input_ids,
        slot_mapping=slot_mapping,
        context_lens=context_lens,
        block_tables=block_tables,
        outputs=outputs,
    )
At inference time, run_model finds the smallest captured graph that is at least as large as the current batch, copies inputs into the pre-allocated buffers, and calls graph.replay().
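The bucket lookup is easy to check in isolation. In the sketch below, capture_sizes reproduces the batch size list from capture_cudagraph, while pick_graph_size is a hypothetical helper; strings stand in for the captured graph objects.

```python
def capture_sizes(max_bs: int) -> list[int]:
    return [1, 2, 4, 8] + list(range(16, max_bs + 1, 16))


def pick_graph_size(captured, batch_size: int):
    """Smallest captured size >= batch_size, or None if nothing is large enough."""
    return min((b for b in captured if b >= batch_size), default=None)


sizes = capture_sizes(64)
graphs = {b: f"graph-{b}" for b in reversed(sizes)}  # filled largest-first

# Taking the first match in dict iteration order would pick the largest graph.
first_match = next(b for b in graphs if b >= 3)

small = pick_graph_size(graphs, 3)
medium = pick_graph_size(graphs, 17)
uncovered = pick_graph_size(capture_sizes(40), 33)
```

Two details are worth noting. Because the graphs dict is populated in descending order, the lookup should take the minimum (or iterate over an ascending list) rather than rely on iteration order. And when max_num_seqs is not a multiple of 16, batch sizes above the last captured size have no graph at all, so the caller needs a fallback such as eager execution.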
torch.compile and CUDA graphs are complementary: @torch.compile fuses multiple operations into fewer CUDA kernels, while CUDA graphs eliminate the CPU overhead of launching those kernels on every decode step.

allocate_kv_cache

Before capturing graphs, the model runner allocates the entire KV cache as one large tensor and distributes slices to each Attention module.
def allocate_kv_cache(self):
    # Determine available GPU memory after model weights
    free_mem, _ = torch.cuda.mem_get_info()
    available_mem = free_mem * gpu_memory_utilization - peak_overhead

    # Each block stores block_size tokens for all layers and KV heads
    block_bytes = (
        block_size * 2 * num_layers * num_kv_heads * head_dim
        * dtype.itemsize
    )
    num_blocks = int(available_mem // block_bytes)

    # Allocate one contiguous tensor for all blocks, in the same dtype used
    # for block_bytes and on the GPU
    kv_cache = torch.zeros(
        2, num_layers, num_blocks, block_size, num_kv_heads, head_dim,
        dtype=dtype, device="cuda",
    )
    # Distribute slices to each Attention layer
    for layer_id, module in enumerate(attention_modules):
        module.k_cache = kv_cache[0, layer_id]
        module.v_cache = kv_cache[1, layer_id]
When world_size > 1, each rank computes its own num_blocks from local free memory. A dist.all_reduce with op=dist.ReduceOp.MIN then makes all ranks agree on the most conservative limit, preventing OOM on the most memory-constrained GPU.
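A worked example of the block arithmetic, using made-up model dimensions and memory figures (28 layers, 8 KV heads, head_dim 128, 2-byte elements, 20 GiB free). The helper names are hypothetical, and min() over a list stands in for the all-reduce across ranks.

```python
def kv_block_bytes(block_size, num_layers, num_kv_heads, head_dim, itemsize):
    # Factor 2 covers the separate K and V tensors.
    return block_size * 2 * num_layers * num_kv_heads * head_dim * itemsize


def num_kv_blocks(free_bytes, utilization, peak_overhead_bytes, block_bytes):
    available = free_bytes * utilization - peak_overhead_bytes
    return max(int(available // block_bytes), 0)


GiB = 2**30
block_bytes = kv_block_bytes(
    block_size=256, num_layers=28, num_kv_heads=8, head_dim=128, itemsize=2
)
rank0_blocks = num_kv_blocks(20 * GiB, 0.9, 1 * GiB, block_bytes)
rank1_blocks = num_kv_blocks(19 * GiB, 0.9, 1 * GiB, block_bytes)

# Stand-in for the MIN all-reduce: every rank adopts the smallest count.
agreed_blocks = min(rank0_blocks, rank1_blocks)
```

With these numbers each block occupies 28 MiB, so even a 1 GiB difference in free memory between ranks changes the block count by a few dozen, which is why the ranks must agree on one value.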

Initialization order

class LLMEngine:
    def __init__(self, config):
        # 1. Spawn worker processes (rank 1 … N), one event per worker
        self.processes, self.events = [], []
        for i in range(1, world_size):
            event = ctx.Event()
            process = ctx.Process(target=worker_process, args=(config, i, event))
            process.start()
            self.processes.append(process)
            self.events.append(event)

        # 2. Initialize rank-0 ModelRunner
        #    → calls dist.init_process_group (collective barrier, blocks until
        #      all workers have also called it)
        #    → warmup_model, allocate_kv_cache, capture_cudagraph
        self.model_runner = ModelRunner(config, rank=0, event=self.events)

        # 3. Initialize Scheduler AFTER ModelRunner returns
        #    (max_cached_blocks is now set by allocate_kv_cache)
        self.scheduler = Scheduler(
            max_cached_blocks=config['max_cached_blocks'], ...
        )
The scheduler must be created after ModelRunner.__init__ because allocate_kv_cache writes the final max_cached_blocks value into the config dict that the scheduler reads.
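The ordering constraint can be reproduced with two toy classes. This is only an illustration: ToyModelRunner and ToyScheduler are hypothetical, the value 621 is made up, and the real config may hold a placeholder for max_cached_blocks rather than lacking the key, in which case the wrong order would yield a stale value instead of an error.

```python
class ToyModelRunner:
    def __init__(self, config: dict) -> None:
        # Stand-in for allocate_kv_cache: the real value depends on free GPU memory.
        config["max_cached_blocks"] = 621


class ToyScheduler:
    def __init__(self, max_cached_blocks: int) -> None:
        self.max_cached_blocks = max_cached_blocks


config = {"block_size": 256}

# Wrong order: the scheduler reads the key before the runner has written it.
try:
    ToyScheduler(max_cached_blocks=config["max_cached_blocks"])
    wrong_order_error = None
except KeyError as exc:
    wrong_order_error = exc

# Right order: runner first, then scheduler.
ToyModelRunner(config)
scheduler = ToyScheduler(max_cached_blocks=config["max_cached_blocks"])
```

Passing the block count explicitly as a return value would remove the hidden dependency on mutation order, at the cost of changing the constructor signatures shown above.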