
vllm.distributed.kv_transfer.kv_connector.v1

Modules:

base - KVConnectorBase_V1 Class for Distributed KV Cache & Hidden State
decode_bench_connector - DecodeBenchConnector: A KV Connector for decode instance performance testing.
lmcache_connector
lmcache_integration
metrics
multi_connector
nixl_connector
offloading_connector
p2p
shared_storage_connector

__all__ module-attribute

__all__ = [
    "KVConnectorRole",
    "KVConnectorBase_V1",
    "supports_hma",
    "SupportsHMA",
    "DecodeBenchConnector",
]

DecodeBenchConnector

Bases: KVConnectorBase_V1

A KV Connector for decode instance performance testing.

This connector fills the KV cache with dummy (non-zero) values to emulate a prefill-decode disaggregated setting, enabling performance testing of the decoder with larger input sequence lengths.
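
For orientation, here is a minimal sketch of enabling this connector for a decode-side benchmark run. It is illustrative rather than taken from this page: the KVTransferConfig field names (kv_connector, kv_role) follow recent vLLM releases and the model name is a placeholder, so verify both against your installed version.

from vllm import LLM
from vllm.config import KVTransferConfig

# Engine whose KV cache gets pre-filled with dummy values by
# DecodeBenchConnector, emulating a completed prefill stage.
llm = LLM(
    model="facebook/opt-125m",  # placeholder; any model works
    kv_transfer_config=KVTransferConfig(
        kv_connector="DecodeBenchConnector",
        kv_role="kv_both",
    ),
)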

Source code in vllm/distributed/kv_transfer/kv_connector/v1/decode_bench_connector.py
class DecodeBenchConnector(KVConnectorBase_V1):
    """
    A KV Connector for decode instance performance testing.

    This connector fills the KV cache with dummy (non-zero) values to
    emulate a prefill-decode disaggregated setting, enabling performance
    testing of the decoder with larger input sequence lengths.
    """

    def __init__(
        self,
        vllm_config: "VllmConfig",
        role: KVConnectorRole,
        kv_cache_config: Optional["KVCacheConfig"] = None,
    ):
        super().__init__(vllm_config, role, kv_cache_config)

        self.connector_scheduler: DecodeBenchConnectorScheduler | None = None
        self.connector_worker: DecodeBenchConnectorWorker | None = None

        if role == KVConnectorRole.SCHEDULER:
            self.connector_scheduler = DecodeBenchConnectorScheduler(vllm_config)
        elif role == KVConnectorRole.WORKER:
            self.connector_worker = DecodeBenchConnectorWorker(vllm_config)

    # ==============================
    # Worker-side methods
    # ==============================

    def register_kv_caches(self, kv_caches: dict[str, torch.Tensor]):
        assert self.connector_worker is not None
        self.connector_worker.register_kv_caches(kv_caches)

    def start_load_kv(self, forward_context: "ForwardContext", **kwargs: Any) -> None:
        assert self.connector_worker is not None
        assert isinstance(self._connector_metadata, DecodeBenchConnectorMetadata)
        self.connector_worker.start_fill_kv(self._connector_metadata)

    def wait_for_layer_load(self, layer_name: str) -> None:
        # All operations are synchronous, so nothing to wait for
        pass

    def save_kv_layer(
        self,
        layer_name: str,
        kv_layer: torch.Tensor,
        attn_metadata: "AttentionMetadata",
        **kwargs: Any,
    ) -> None:
        # This connector doesn't save KV cache (benchmarking only)
        pass

    def wait_for_save(self):
        # This connector doesn't save KV cache (benchmarking only)
        pass

    # ==============================
    # Scheduler-side methods
    # ==============================

    def get_num_new_matched_tokens(
        self,
        request: "Request",
        num_computed_tokens: int,
    ) -> tuple[int | None, bool]:
        assert self.connector_scheduler is not None
        return self.connector_scheduler.get_num_new_matched_tokens(
            request, num_computed_tokens
        )

    def update_state_after_alloc(
        self, request: "Request", blocks: "KVCacheBlocks", num_external_tokens: int
    ):
        assert self.connector_scheduler is not None
        return self.connector_scheduler.update_state_after_alloc(
            request, blocks, num_external_tokens
        )

    def build_connector_meta(
        self, scheduler_output: "SchedulerOutput"
    ) -> KVConnectorMetadata:
        assert self.connector_scheduler is not None
        return self.connector_scheduler.build_connector_meta(scheduler_output)

    def request_finished(
        self,
        request: "Request",
        block_ids: list[int],
    ) -> tuple[bool, dict[str, Any] | None]:
        assert self.connector_scheduler is not None
        self.connector_scheduler.request_finished(request)
        return False, None

connector_scheduler instance-attribute

connector_scheduler: (
    DecodeBenchConnectorScheduler | None
) = None

connector_worker instance-attribute

connector_worker: DecodeBenchConnectorWorker | None = None

__init__

__init__(
    vllm_config: VllmConfig,
    role: KVConnectorRole,
    kv_cache_config: Optional[KVCacheConfig] = None,
)
Source code in vllm/distributed/kv_transfer/kv_connector/v1/decode_bench_connector.py
def __init__(
    self,
    vllm_config: "VllmConfig",
    role: KVConnectorRole,
    kv_cache_config: Optional["KVCacheConfig"] = None,
):
    super().__init__(vllm_config, role, kv_cache_config)

    self.connector_scheduler: DecodeBenchConnectorScheduler | None = None
    self.connector_worker: DecodeBenchConnectorWorker | None = None

    if role == KVConnectorRole.SCHEDULER:
        self.connector_scheduler = DecodeBenchConnectorScheduler(vllm_config)
    elif role == KVConnectorRole.WORKER:
        self.connector_worker = DecodeBenchConnectorWorker(vllm_config)

build_connector_meta

build_connector_meta(
    scheduler_output: SchedulerOutput,
) -> KVConnectorMetadata
Source code in vllm/distributed/kv_transfer/kv_connector/v1/decode_bench_connector.py
def build_connector_meta(
    self, scheduler_output: "SchedulerOutput"
) -> KVConnectorMetadata:
    assert self.connector_scheduler is not None
    return self.connector_scheduler.build_connector_meta(scheduler_output)

get_num_new_matched_tokens

get_num_new_matched_tokens(
    request: Request, num_computed_tokens: int
) -> tuple[int | None, bool]
Source code in vllm/distributed/kv_transfer/kv_connector/v1/decode_bench_connector.py
def get_num_new_matched_tokens(
    self,
    request: "Request",
    num_computed_tokens: int,
) -> tuple[int | None, bool]:
    assert self.connector_scheduler is not None
    return self.connector_scheduler.get_num_new_matched_tokens(
        request, num_computed_tokens
    )

register_kv_caches

register_kv_caches(kv_caches: dict[str, Tensor])
Source code in vllm/distributed/kv_transfer/kv_connector/v1/decode_bench_connector.py
def register_kv_caches(self, kv_caches: dict[str, torch.Tensor]):
    assert self.connector_worker is not None
    self.connector_worker.register_kv_caches(kv_caches)

request_finished

request_finished(
    request: Request, block_ids: list[int]
) -> tuple[bool, dict[str, Any] | None]
Source code in vllm/distributed/kv_transfer/kv_connector/v1/decode_bench_connector.py
def request_finished(
    self,
    request: "Request",
    block_ids: list[int],
) -> tuple[bool, dict[str, Any] | None]:
    assert self.connector_scheduler is not None
    self.connector_scheduler.request_finished(request)
    return False, None

save_kv_layer

save_kv_layer(
    layer_name: str,
    kv_layer: Tensor,
    attn_metadata: AttentionMetadata,
    **kwargs: Any,
) -> None
Source code in vllm/distributed/kv_transfer/kv_connector/v1/decode_bench_connector.py
def save_kv_layer(
    self,
    layer_name: str,
    kv_layer: torch.Tensor,
    attn_metadata: "AttentionMetadata",
    **kwargs: Any,
) -> None:
    # This connector doesn't save KV cache (benchmarking only)
    pass

start_load_kv

start_load_kv(
    forward_context: ForwardContext, **kwargs: Any
) -> None
Source code in vllm/distributed/kv_transfer/kv_connector/v1/decode_bench_connector.py
def start_load_kv(self, forward_context: "ForwardContext", **kwargs: Any) -> None:
    assert self.connector_worker is not None
    assert isinstance(self._connector_metadata, DecodeBenchConnectorMetadata)
    self.connector_worker.start_fill_kv(self._connector_metadata)

update_state_after_alloc

update_state_after_alloc(
    request: Request,
    blocks: KVCacheBlocks,
    num_external_tokens: int,
)
Source code in vllm/distributed/kv_transfer/kv_connector/v1/decode_bench_connector.py
def update_state_after_alloc(
    self, request: "Request", blocks: "KVCacheBlocks", num_external_tokens: int
):
    assert self.connector_scheduler is not None
    return self.connector_scheduler.update_state_after_alloc(
        request, blocks, num_external_tokens
    )

wait_for_layer_load

wait_for_layer_load(layer_name: str) -> None
Source code in vllm/distributed/kv_transfer/kv_connector/v1/decode_bench_connector.py
def wait_for_layer_load(self, layer_name: str) -> None:
    # All operations are synchronous, so nothing to wait for
    pass

wait_for_save

wait_for_save()
Source code in vllm/distributed/kv_transfer/kv_connector/v1/decode_bench_connector.py
def wait_for_save(self):
    # This connector doesn't save KV cache (benchmarking only)
    pass

KVConnectorBase_V1

Bases: ABC

Source code in vllm/distributed/kv_transfer/kv_connector/v1/base.py
class KVConnectorBase_V1(ABC):
    def __init__(
        self,
        vllm_config: "VllmConfig",
        role: KVConnectorRole,
        kv_cache_config: Optional["KVCacheConfig"] = None,
    ):
        logger.warning(
            "Initializing KVConnectorBase_V1. This API is experimental and "
            "subject to change in the future as we iterate the design."
        )
        self._connector_metadata: KVConnectorMetadata | None = None
        self._vllm_config = vllm_config
        if vllm_config.kv_transfer_config is not None:
            self._kv_transfer_config = vllm_config.kv_transfer_config
        else:
            raise ValueError("kv_transfer_config must be set for KVConnectorBase_V1")
        self._kv_cache_config = kv_cache_config
        if self._kv_cache_config is None:
            logger.warning(
                "KVConnectorBase_V1 initialized without kv_cache_config. "
                "This is deprecated - please update your connector to accept "
                "kv_cache_config as the third constructor argument and pass it "
                "to super().__init__()."
            )
        self._role = role

    @property
    def role(self) -> KVConnectorRole:
        return self._role

    # ==============================
    # Worker-side methods
    # ==============================

    def bind_connector_metadata(self, connector_metadata: KVConnectorMetadata) -> None:
        """Set the connector metadata from the scheduler.

        This function should be called by the model runner every time
        before the model execution. The metadata will be used for runtime
        KV cache loading and saving.

        Args:
            connector_metadata (KVConnectorMetadata): the connector metadata.
        """
        self._connector_metadata = connector_metadata

    def clear_connector_metadata(self) -> None:
        """Clear the connector metadata.

        This function should be called by the model runner every time
        after the model execution.
        """
        self._connector_metadata = None

    def _get_connector_metadata(self) -> KVConnectorMetadata:
        """Get the connector metadata.

        This function should only be called inside the connector.

        Returns:
            ConnectorMetadata: the connector metadata.
        """

        # Should only be called while set to valid metadata.
        assert self._connector_metadata is not None
        return self._connector_metadata

    def register_kv_caches(self, kv_caches: dict[str, torch.Tensor]):
        """
        Initialize with the KV caches. Useful for pre-registering the
        KV Caches in the KVConnector (e.g. for NIXL).

        Args:
            kv_caches: dictionary of layer names, kv cache
        """
        return

    def set_host_xfer_buffer_ops(self, copy_operation: CopyBlocksOp):
        """
        Set the xPU-specific ops for copying KV between host and device.
        Needed when host buffer is used for kv transfer (e.g., in NixlConnector)
        """
        return

    @abstractmethod
    def start_load_kv(self, forward_context: "ForwardContext", **kwargs: Any) -> None:
        """
        Start loading the KV cache from the connector to vLLM's paged
        KV buffer. This is called from the forward context before the
        forward pass to enable async loading during model execution.

        Args:
            forward_context (ForwardContext): the forward context.
            **kwargs: additional arguments for the load operation

        Note:
            The number of elements in kv_caches and layer_names should be
            the same.

        """
        pass

    @abstractmethod
    def wait_for_layer_load(self, layer_name: str) -> None:
        """
        Block until the KV for a specific layer is loaded into vLLM's
        paged buffer. This is called from within attention layer to ensure
        async copying from start_load_kv is complete.

        This interface will be useful for layer-by-layer pipelining.

        Args:
            layer_name: the name of that layer
        """
        pass

    @abstractmethod
    def save_kv_layer(
        self,
        layer_name: str,
        kv_layer: torch.Tensor,
        attn_metadata: "AttentionMetadata",
        **kwargs: Any,
    ) -> None:
        """
        Start saving a layer of KV cache from vLLM's paged buffer
        to the connector. This is called from within attention layer to
        enable async copying during execution.

        Args:
            layer_name (str): the name of the layer.
            kv_layer (torch.Tensor): the paged KV buffer of the current
                layer in vLLM.
            attn_metadata (AttentionMetadata): the attention metadata.
            **kwargs: additional arguments for the save operation.
        """
        pass

    @abstractmethod
    def wait_for_save(self):
        """
        Block until all the save operations are done. This is called
        as the forward context exits to ensure that the async saving
        from save_kv_layer is complete before finishing the forward.

        This prevents overwrites of the paged KV buffer before saving is done.
        """
        pass

    def get_finished(
        self, finished_req_ids: set[str]
    ) -> tuple[set[str] | None, set[str] | None]:
        """
        Notifies the worker-side connector of the ids of requests that have
        finished generating tokens on the worker.
        The scheduler process (via the Executors) will use this output
        to track which workers are done.

        Returns:
            ids of requests that have finished asynchronous transfer
            (requests that previously returned True from request_finished()),
            tuple of (sending/saving ids, recving/loading ids).
            The finished saves/sends req ids must belong to a set provided in a
            call to this method (this call or a prior one).
        """
        return None, None

    def get_block_ids_with_load_errors(self) -> set[int]:
        """
        Get the set of block IDs that failed to load.

        Returns:
            Set of block IDs that encountered load errors.
            Empty set if no load errors occurred.

        Notes:
            - Applies to both sync- and async-loading requests.
            - Async loading: failed blocks may be reported in any forward pass
              up to and including the pass where the request ID is returned by
              `get_finished()`. Even if failures occur, the request must still
              be reported via `get_finished()`, and the failed block IDs must
              appear here no later than that same pass.
            - Sync loading: failed blocks should be reported in the forward
              pass in which they are detected.
        """
        return set()

    def shutdown(self):
        """
        Shutdown the connector. This is called when the worker process
        is shutting down to ensure that all the async operations are
        completed and the connector is cleaned up properly.
        """
        return None

    def get_kv_connector_stats(self) -> Optional["KVConnectorStats"]:
        """
        Get the KV connector stats collected during the last interval.
        """
        return None

    def get_handshake_metadata(self) -> KVConnectorHandshakeMetadata | None:
        """
        Get the KVConnector handshake metadata for this connector.
        This metadata is used for out-of-band connector handshake
        between P/D workers.

        Returns:
            KVConnectorHandshakeMetadata: the handshake metadata.
            None if no handshake metadata is available.
        """
        return None

    # ==============================
    # Scheduler-side methods
    # ==============================

    @abstractmethod
    def get_num_new_matched_tokens(
        self,
        request: "Request",
        num_computed_tokens: int,
    ) -> tuple[int | None, bool]:
        """
        Get number of new tokens that can be loaded from the
        external KV cache beyond the num_computed_tokens.

        Args:
            request (Request): the request object.
            num_computed_tokens (int): the number of locally
                computed tokens for this request

        Returns:
            A tuple with the following elements:
                - An optional number of tokens that can be loaded from the
                  external KV cache beyond what is already computed.
                  If None, it means that the connector needs more time to
                  determine the number of matched tokens, and the scheduler
                  should query for this request again later.
                - `True` if external KV cache tokens will be loaded
                  asynchronously (between scheduler steps). Must be
                  'False' if the first element is 0.

        Notes:
            The connector should only consider the largest prefix of prompt
            tokens for which KV cache is actually available at the time of the
            call. If the cache cannot be loaded for some tokens (e.g., due to
            connectivity issues or eviction), those tokens must not be taken
            into account.
        """
        pass

    @abstractmethod
    def update_state_after_alloc(
        self, request: "Request", blocks: "KVCacheBlocks", num_external_tokens: int
    ):
        """
        Update KVConnector state after block allocation.

        If get_num_new_matched_tokens previously returned True for a
        request, this function may be called twice for that same request -
        first when blocks are allocated for the connector tokens to be
        asynchronously loaded into, and second when any additional blocks
        are allocated, after the load/transfer is complete.

        Args:
            request (Request): the request object.
            blocks (KVCacheBlocks): the blocks allocated for the request.
            num_external_tokens (int): the number of tokens that will be
                loaded from the external KV cache.
        """
        pass

    @abstractmethod
    def build_connector_meta(
        self, scheduler_output: SchedulerOutput
    ) -> KVConnectorMetadata:
        """
        Build the connector metadata for this step.

        This function should NOT modify fields in the scheduler_output.
        Also, calling this function will reset the state of the connector.

        Args:
            scheduler_output (SchedulerOutput): the scheduler output object.
        """
        pass

    def update_connector_output(self, connector_output: KVConnectorOutput):
        """
        Update KVConnector state from worker-side connectors output.

        Args:
            connector_output (KVConnectorOutput): the worker-side
                connectors output.
        """
        return

    def request_finished(
        self,
        request: "Request",
        block_ids: list[int],
    ) -> tuple[bool, dict[str, Any] | None]:
        """
        Called exactly once when a request has finished, before its blocks are
        freed.

        The connector may assume responsibility for freeing the blocks
        asynchronously by returning True.

        Returns:
            True if the request is being saved/sent asynchronously and blocks
            should not be freed until the request_id is returned from
            get_finished().
            Optional KVTransferParams to be included in the request outputs
            returned by the engine.
        """
        return False, None

    def take_events(self) -> Iterable["KVCacheEvent"]:
        """
        Take the KV cache events from the connector.

        Yields:
            New KV cache events since the last call.
        """
        return ()

    @classmethod
    def get_required_kvcache_layout(cls, vllm_config: "VllmConfig") -> str | None:
        """
        Get the required KV cache layout for this connector.

        Args:
            vllm_config (VllmConfig): the vllm config.

        Returns:
            str: the required KV cache layout. e.g. HND, or NHD.
            None if the connector does not require a specific layout.
        """

        if cls is KVConnectorBase_V1:
            raise TypeError(
                "get_required_kvcache_layout should not be called "
                "on the abstract base class"
            )
        return None

    def get_finished_count(self) -> int | None:
        """
        Get the count of requests expected to complete send/receive operations
        via this connector. This method is used to initialize the
        KVOutputAggregator, overwriting the default world_size.

        Returns:
            int: expected sending or receiving completion count.
        """

        return None

    @classmethod
    def build_kv_connector_stats(
        cls, data: dict[str, Any] | None = None
    ) -> Optional["KVConnectorStats"]:
        """
        KVConnectorStats resolution method. This method allows dynamically
        registered connectors to return their own KVConnectorStats object,
        which can implement custom aggregation logic on the data dict.
        """
        return None

    def set_xfer_handshake_metadata(
        self, metadata: dict[int, KVConnectorHandshakeMetadata]
    ) -> None:
        """
        Set the KV connector handshake metadata for this connector.

        Args:
            metadata (dict[int, KVConnectorHandshakeMetadata]): the handshake metadata to set.
        """
        return None

    @classmethod
    def build_prom_metrics(
        cls,
        vllm_config: "VllmConfig",
        metric_types: dict[type["PromMetric"], type["PromMetricT"]],
        labelnames: list[str],
        per_engine_labelvalues: dict[int, list[str]],
    ) -> Optional["KVConnectorPromMetrics"]:
        """
        Create a KVConnectorPromMetrics subclass which should register
        per-connector Prometheus metrics and implement observe() to
        expose connector transfer stats via Prometheus.
        """
        return None

_connector_metadata instance-attribute

_connector_metadata: KVConnectorMetadata | None = None

_kv_cache_config instance-attribute

_kv_cache_config = kv_cache_config

_kv_transfer_config instance-attribute

_kv_transfer_config = kv_transfer_config

_role instance-attribute

_role = role

_vllm_config instance-attribute

_vllm_config = vllm_config

role property

__init__

__init__(
    vllm_config: VllmConfig,
    role: KVConnectorRole,
    kv_cache_config: Optional[KVCacheConfig] = None,
)
Source code in vllm/distributed/kv_transfer/kv_connector/v1/base.py
def __init__(
    self,
    vllm_config: "VllmConfig",
    role: KVConnectorRole,
    kv_cache_config: Optional["KVCacheConfig"] = None,
):
    logger.warning(
        "Initializing KVConnectorBase_V1. This API is experimental and "
        "subject to change in the future as we iterate the design."
    )
    self._connector_metadata: KVConnectorMetadata | None = None
    self._vllm_config = vllm_config
    if vllm_config.kv_transfer_config is not None:
        self._kv_transfer_config = vllm_config.kv_transfer_config
    else:
        raise ValueError("kv_transfer_config must be set for KVConnectorBase_V1")
    self._kv_cache_config = kv_cache_config
    if self._kv_cache_config is None:
        logger.warning(
            "KVConnectorBase_V1 initialized without kv_cache_config. "
            "This is deprecated - please update your connector to accept "
            "kv_cache_config as the third constructor argument and pass it "
            "to super().__init__()."
        )
    self._role = role

_get_connector_metadata

_get_connector_metadata() -> KVConnectorMetadata

Get the connector metadata.

This function should only be called inside the connector.

Returns:

KVConnectorMetadata: the connector metadata.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/base.py
def _get_connector_metadata(self) -> KVConnectorMetadata:
    """Get the connector metadata.

    This function should only be called inside the connector.

    Returns:
        ConnectorMetadata: the connector metadata.
    """

    # Should only be called while set to valid metadata.
    assert self._connector_metadata is not None
    return self._connector_metadata

bind_connector_metadata

bind_connector_metadata(
    connector_metadata: KVConnectorMetadata,
) -> None

Set the connector metadata from the scheduler.

This function should be called by the model runner every time before the model execution. The metadata will be used for runtime KV cache loading and saving.

Parameters:

connector_metadata (KVConnectorMetadata): the connector metadata. Required.
Source code in vllm/distributed/kv_transfer/kv_connector/v1/base.py
def bind_connector_metadata(self, connector_metadata: KVConnectorMetadata) -> None:
    """Set the connector metadata from the scheduler.

    This function should be called by the model runner every time
    before the model execution. The metadata will be used for runtime
    KV cache loading and saving.

    Args:
        connector_metadata (KVConnectorMetadata): the connector metadata.
    """
    self._connector_metadata = connector_metadata

build_connector_meta abstractmethod

build_connector_meta(
    scheduler_output: SchedulerOutput,
) -> KVConnectorMetadata

Build the connector metadata for this step.

This function should NOT modify fields in the scheduler_output. Also, calling this function will reset the state of the connector.

Parameters:

scheduler_output (SchedulerOutput): the scheduler output object. Required.
Source code in vllm/distributed/kv_transfer/kv_connector/v1/base.py
@abstractmethod
def build_connector_meta(
    self, scheduler_output: SchedulerOutput
) -> KVConnectorMetadata:
    """
    Build the connector metadata for this step.

    This function should NOT modify fields in the scheduler_output.
    Also, calling this function will reset the state of the connector.

    Args:
        scheduler_output (SchedulerOutput): the scheduler output object.
    """
    pass
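
To make the reset-on-call contract concrete, here is a hedged sketch of a scheduler-side implementation; MyConnectorMetadata and the _pending_loads attribute are illustrative, not part of this API, and the KVConnectorMetadata import location is assumed.

from dataclasses import dataclass, field

from vllm.distributed.kv_transfer.kv_connector.v1.base import (
    KVConnectorMetadata,  # assumed location of the base metadata class
)

@dataclass
class MyConnectorMetadata(KVConnectorMetadata):  # illustrative subclass
    # request_id -> block ids the worker should load this step
    reqs_to_load: dict[str, list[int]] = field(default_factory=dict)

def build_connector_meta(self, scheduler_output):
    # Snapshot the state accumulated during scheduling, then reset it,
    # per the "calling this function will reset the state" contract.
    meta = MyConnectorMetadata(reqs_to_load=dict(self._pending_loads))
    self._pending_loads.clear()
    return meta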

build_kv_connector_stats classmethod

build_kv_connector_stats(
    data: dict[str, Any] | None = None,
) -> Optional[KVConnectorStats]

KVConnectorStats resolution method. This method allows dynamically registered connectors to return their own KVConnectorStats object, which can implement custom aggregation logic on the data dict.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/base.py
@classmethod
def build_kv_connector_stats(
    cls, data: dict[str, Any] | None = None
) -> Optional["KVConnectorStats"]:
    """
    KVConnectorStats resolution method. This method allows dynamically
    registered connectors to return their own KVConnectorStats object,
    which can implement custom aggregation logic on the data dict.
    """
    return None

build_prom_metrics classmethod

build_prom_metrics(
    vllm_config: VllmConfig,
    metric_types: dict[type[PromMetric], type[PromMetricT]],
    labelnames: list[str],
    per_engine_labelvalues: dict[int, list[str]],
) -> Optional[KVConnectorPromMetrics]

Create a KVConnectorPromMetrics subclass which should register per-connector Prometheus metrics and implement observe() to expose connector transfer stats via Prometheus.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/base.py
@classmethod
def build_prom_metrics(
    cls,
    vllm_config: "VllmConfig",
    metric_types: dict[type["PromMetric"], type["PromMetricT"]],
    labelnames: list[str],
    per_engine_labelvalues: dict[int, list[str]],
) -> Optional["KVConnectorPromMetrics"]:
    """
    Create a KVConnectorPromMetrics subclass which should register
    per-connector Prometheus metrics and implement observe() to
    expose connector transfer stats via Prometheus.
    """
    return None

clear_connector_metadata

clear_connector_metadata() -> None

Clear the connector metadata.

This function should be called by the model runner every time after the model execution.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/base.py
def clear_connector_metadata(self) -> None:
    """Clear the connector metadata.

    This function should be called by the model runner every time
    after the model execution.
    """
    self._connector_metadata = None
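
Together with bind_connector_metadata above, the per-step lifecycle driven by the model runner looks roughly like this sketch (connector, metadata, and run_forward_pass are placeholder names):

connector.bind_connector_metadata(metadata)  # before model execution
try:
    run_forward_pass()  # start_load_kv / save_kv_layer fire inside
finally:
    connector.clear_connector_metadata()     # after model execution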

get_block_ids_with_load_errors

get_block_ids_with_load_errors() -> set[int]

Get the set of block IDs that failed to load.

Returns:

set[int]: set of block IDs that encountered load errors; empty set if no load errors occurred.

Notes
  • Applies to both sync- and async-loading requests.
  • Async loading: failed blocks may be reported in any forward pass up to and including the pass where the request ID is returned by get_finished(). Even if failures occur, the request must still be reported via get_finished(), and the failed block IDs must appear here no later than that same pass.
  • Sync loading: failed blocks should be reported in the forward pass in which they are detected.
Source code in vllm/distributed/kv_transfer/kv_connector/v1/base.py
def get_block_ids_with_load_errors(self) -> set[int]:
    """
    Get the set of block IDs that failed to load.

    Returns:
        Set of block IDs that encountered load errors.
        Empty set if no load errors occurred.

    Notes:
        - Applies to both sync- and async-loading requests.
        - Async loading: failed blocks may be reported in any forward pass
          up to and including the pass where the request ID is returned by
          `get_finished()`. Even if failures occur, the request must still
          be reported via `get_finished()`, and the failed block IDs must
          appear here no later than that same pass.
        - Sync loading: failed blocks should be reported in the forward
          pass in which they are detected.
    """
    return set()

get_finished

get_finished(
    finished_req_ids: set[str],
) -> tuple[set[str] | None, set[str] | None]

Notifies the worker-side connector of the ids of requests that have finished generating tokens on the worker. The scheduler process (via the Executors) will use this output to track which workers are done.

Returns:

tuple[set[str] | None, set[str] | None]: ids of requests that have finished asynchronous transfer (requests that previously returned True from request_finished()), as a tuple of (sending/saving ids, recving/loading ids). The finished saves/sends req ids must belong to a set provided in a call to this method (this call or a prior one).

Source code in vllm/distributed/kv_transfer/kv_connector/v1/base.py
def get_finished(
    self, finished_req_ids: set[str]
) -> tuple[set[str] | None, set[str] | None]:
    """
    Notifies the worker-side connector of the ids of requests that have
    finished generating tokens on the worker.
    The scheduler process (via the Executors) will use this output
    to track which workers are done.

    Returns:
        ids of requests that have finished asynchronous transfer
        (requests that previously returned True from request_finished()),
        tuple of (sending/saving ids, recving/loading ids).
        The finished saves/sends req ids must belong to a set provided in a
        call to this method (this call or a prior one).
    """
    return None, None

get_finished_count

get_finished_count() -> int | None

Get the count of requests expected to complete send/receive operations via this connector. This method is used to initialize the KVOutputAggregator, overwriting the default world_size.

Returns:

int | None: expected sending or receiving completion count.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/base.py
def get_finished_count(self) -> int | None:
    """
    Get the count of requests expected to complete send/receive operations
    via this connector. This method is used to initialize the
    KVOutputAggregator, overwriting the default world_size.

    Returns:
        int: expected sending or receiving completion count.
    """

    return None

get_handshake_metadata

get_handshake_metadata() -> (
    KVConnectorHandshakeMetadata | None
)

Get the KVConnector handshake metadata for this connector. This metadata is used for out-of-band connector handshake between P/D workers.

Returns:

KVConnectorHandshakeMetadata | None: the handshake metadata, or None if no handshake metadata is available.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/base.py
def get_handshake_metadata(self) -> KVConnectorHandshakeMetadata | None:
    """
    Get the KVConnector handshake metadata for this connector.
    This metadata is used for out-of-band connector handshake
    between P/D workers.

    Returns:
        KVConnectorHandshakeMetadata: the handshake metadata.
        None if no handshake metadata is available.
    """
    return None

get_kv_connector_stats

get_kv_connector_stats() -> Optional[KVConnectorStats]

Get the KV connector stats collected during the last interval.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/base.py
def get_kv_connector_stats(self) -> Optional["KVConnectorStats"]:
    """
    Get the KV connector stats collected during the last interval.
    """
    return None

get_num_new_matched_tokens abstractmethod

get_num_new_matched_tokens(
    request: Request, num_computed_tokens: int
) -> tuple[int | None, bool]

Get number of new tokens that can be loaded from the external KV cache beyond the num_computed_tokens.

Parameters:

request (Request): the request object. Required.
num_computed_tokens (int): the number of locally computed tokens for this request. Required.

Returns:

tuple[int | None, bool]: a tuple with the following elements:
  • An optional number of tokens that can be loaded from the external KV cache beyond what is already computed. If None, the connector needs more time to determine the number of matched tokens, and the scheduler should query for this request again later.
  • True if external KV cache tokens will be loaded asynchronously (between scheduler steps). Must be False if the first element is 0.

Notes

The connector should only consider the largest prefix of prompt tokens for which KV cache is actually available at the time of the call. If the cache cannot be loaded for some tokens (e.g., due to connectivity issues or eviction), those tokens must not be taken into account.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/base.py
@abstractmethod
def get_num_new_matched_tokens(
    self,
    request: "Request",
    num_computed_tokens: int,
) -> tuple[int | None, bool]:
    """
    Get number of new tokens that can be loaded from the
    external KV cache beyond the num_computed_tokens.

    Args:
        request (Request): the request object.
        num_computed_tokens (int): the number of locally
            computed tokens for this request

    Returns:
        A tuple with the following elements:
            - An optional number of tokens that can be loaded from the
              external KV cache beyond what is already computed.
              If None, it means that the connector needs more time to
              determine the number of matched tokens, and the scheduler
              should query for this request again later.
            - `True` if external KV cache tokens will be loaded
              asynchronously (between scheduler steps). Must be
              'False' if the first element is 0.

    Notes:
        The connector should only consider the largest prefix of prompt
        tokens for which KV cache is actually available at the time of the
        call. If the cache cannot be loaded for some tokens (e.g., due to
        connectivity issues or eviction), those tokens must not be taken
        into account.
    """
    pass
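
The three return shapes allowed by this contract can be summarized in a hedged sketch; _lookup_cached_prefix is a hypothetical helper, not vLLM API.

def get_num_new_matched_tokens(self, request, num_computed_tokens):
    hit = self._lookup_cached_prefix(request)  # hypothetical cache query
    if hit is None:
        return None, False  # not resolved yet; scheduler will ask again later
    extra = max(0, hit - num_computed_tokens)
    if extra == 0:
        return 0, False     # second element must be False when the first is 0
    return extra, True      # tokens will be loaded asynchronously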

get_required_kvcache_layout classmethod

get_required_kvcache_layout(
    vllm_config: VllmConfig,
) -> str | None

Get the required KV cache layout for this connector.

Parameters:

vllm_config (VllmConfig): the vllm config. Required.

Returns:

str | None: the required KV cache layout, e.g. HND or NHD. None if the connector does not require a specific layout.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/base.py
@classmethod
def get_required_kvcache_layout(cls, vllm_config: "VllmConfig") -> str | None:
    """
    Get the required KV cache layout for this connector.

    Args:
        vllm_config (VllmConfig): the vllm config.

    Returns:
        str: the required KV cache layout. e.g. HND, or NHD.
        None if the connector does not require a specific layout.
    """

    if cls is KVConnectorBase_V1:
        raise TypeError(
            "get_required_kvcache_layout should not be called "
            "on the abstract base class"
        )
    return None

register_kv_caches

register_kv_caches(kv_caches: dict[str, Tensor])

Initialize with the KV caches. Useful for pre-registering the KV Caches in the KVConnector (e.g. for NIXL).

Parameters:

kv_caches (dict[str, Tensor]): dictionary mapping layer names to KV caches. Required.
Source code in vllm/distributed/kv_transfer/kv_connector/v1/base.py
def register_kv_caches(self, kv_caches: dict[str, torch.Tensor]):
    """
    Initialize with the KV caches. Useful for pre-registering the
    KV Caches in the KVConnector (e.g. for NIXL).

    Args:
        kv_caches: dictionary of layer names, kv cache
    """
    return
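
In practice a connector typically just keeps references to the per-layer tensors for later use by the load/save hooks; a minimal sketch:

def register_kv_caches(self, kv_caches):
    # {layer_name: paged KV cache tensor}, consulted later by
    # start_load_kv and save_kv_layer in this sketch.
    self._kv_caches = kv_caches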

request_finished

request_finished(
    request: Request, block_ids: list[int]
) -> tuple[bool, dict[str, Any] | None]

Called exactly once when a request has finished, before its blocks are freed.

The connector may assume responsibility for freeing the blocks asynchronously by returning True.

Returns:

tuple[bool, dict[str, Any] | None]: True if the request is being saved/sent asynchronously and blocks should not be freed until the request_id is returned from get_finished(), plus optional KVTransferParams to be included in the request outputs returned by the engine.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/base.py
def request_finished(
    self,
    request: "Request",
    block_ids: list[int],
) -> tuple[bool, dict[str, Any] | None]:
    """
    Called exactly once when a request has finished, before its blocks are
    freed.

    The connector may assume responsibility for freeing the blocks
    asynchronously by returning True.

    Returns:
        True if the request is being saved/sent asynchronously and blocks
        should not be freed until the request_id is returned from
        get_finished().
        Optional KVTransferParams to be included in the request outputs
        returned by the engine.
    """
    return False, None
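
A hedged sketch of the asynchronous-send handshake between request_finished and get_finished described above; _start_async_send and _poll_completed_sends are hypothetical helpers.

def request_finished(self, request, block_ids):
    # Defer freeing: take ownership of the blocks while the send runs.
    self._inflight_sends.add(request.request_id)
    self._start_async_send(request, block_ids)  # hypothetical helper
    return True, None

def get_finished(self, finished_req_ids):
    # Report sends completed since the last call so the scheduler can
    # finally free the corresponding blocks.
    done = self._poll_completed_sends()         # hypothetical helper
    self._inflight_sends -= done
    return done, None  # (sending/saving ids, recving/loading ids)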

save_kv_layer abstractmethod

save_kv_layer(
    layer_name: str,
    kv_layer: Tensor,
    attn_metadata: AttentionMetadata,
    **kwargs: Any,
) -> None

Start saving a layer of KV cache from vLLM's paged buffer to the connector. This is called from within attention layer to enable async copying during execution.

Parameters:

layer_name (str): the name of the layer. Required.
kv_layer (Tensor): the paged KV buffer of the current layer in vLLM. Required.
attn_metadata (AttentionMetadata): the attention metadata. Required.
**kwargs (Any): additional arguments for the save operation. Default: {}.
Source code in vllm/distributed/kv_transfer/kv_connector/v1/base.py
@abstractmethod
def save_kv_layer(
    self,
    layer_name: str,
    kv_layer: torch.Tensor,
    attn_metadata: "AttentionMetadata",
    **kwargs: Any,
) -> None:
    """
    Start saving a layer of KV cache from vLLM's paged buffer
    to the connector. This is called from within attention layer to
    enable async copying during execution.

    Args:
        layer_name (str): the name of the layer.
        kv_layer (torch.Tensor): the paged KV buffer of the current
            layer in vLLM.
        attn_metadata (AttentionMetadata): the attention metadata.
        **kwargs: additional arguments for the save operation.
    """
    pass
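
One common shape for the async save path implied here, paired with wait_for_save; the _save_futures list and _copy_out helper are illustrative, not vLLM API.

def save_kv_layer(self, layer_name, kv_layer, attn_metadata, **kwargs):
    # Kick off a non-blocking copy of this layer's KV to the connector.
    fut = self._copy_out(layer_name, kv_layer)  # hypothetical async copy
    self._save_futures.append(fut)

def wait_for_save(self):
    # Drain all in-flight copies before the paged buffer can be reused.
    for fut in self._save_futures:
        fut.result()
    self._save_futures.clear()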

set_host_xfer_buffer_ops

set_host_xfer_buffer_ops(copy_operation: CopyBlocksOp)

Set the xPU-specific ops for copying KV between host and device. Needed when host buffer is used for kv transfer (e.g., in NixlConnector)

Source code in vllm/distributed/kv_transfer/kv_connector/v1/base.py
def set_host_xfer_buffer_ops(self, copy_operation: CopyBlocksOp):
    """
    Set the xPU-specific ops for copying KV between host and device.
    Needed when host buffer is used for kv transfer (e.g., in NixlConnector)
    """
    return

set_xfer_handshake_metadata

set_xfer_handshake_metadata(
    metadata: dict[int, KVConnectorHandshakeMetadata],
) -> None

Set the KV connector handshake metadata for this connector.

Parameters:

metadata (dict[int, KVConnectorHandshakeMetadata]): the handshake metadata to set. Required.
Source code in vllm/distributed/kv_transfer/kv_connector/v1/base.py
def set_xfer_handshake_metadata(
    self, metadata: dict[int, KVConnectorHandshakeMetadata]
) -> None:
    """
    Set the KV connector handshake metadata for this connector.

    Args:
        metadata (dict[int, KVConnectorHandshakeMetadata]): the handshake metadata to set.
    """
    return None

shutdown

shutdown()

Shutdown the connector. This is called when the worker process is shutting down to ensure that all the async operations are completed and the connector is cleaned up properly.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/base.py
def shutdown(self):
    """
    Shutdown the connector. This is called when the worker process
    is shutting down to ensure that all the async operations are
    completed and the connector is cleaned up properly.
    """
    return None

start_load_kv abstractmethod

start_load_kv(
    forward_context: ForwardContext, **kwargs: Any
) -> None

Start loading the KV cache from the connector to vLLM's paged KV buffer. This is called from the forward context before the forward pass to enable async loading during model execution.

Parameters:

forward_context (ForwardContext): the forward context. Required.
**kwargs (Any): additional arguments for the load operation. Default: {}.
Note

The number of elements in kv_caches and layer_names should be the same.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/base.py
@abstractmethod
def start_load_kv(self, forward_context: "ForwardContext", **kwargs: Any) -> None:
    """
    Start loading the KV cache from the connector to vLLM's paged
    KV buffer. This is called from the forward context before the
    forward pass to enable async loading during model execution.

    Args:
        forward_context (ForwardContext): the forward context.
        **kwargs: additional arguments for the load operation

    Note:
        The number of elements in kv_caches and layer_names should be
        the same.

    """
    pass
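
A hedged sketch of the layer-by-layer pipelining this enables, pairing start_load_kv with wait_for_layer_load; the layers_to_load metadata field and _submit_layer_load helper are illustrative, not vLLM API.

def start_load_kv(self, forward_context, **kwargs):
    meta = self._get_connector_metadata()
    # One async load per layer; attention waits on each copy via
    # wait_for_layer_load just before that layer executes.
    self._load_events = {
        layer: self._submit_layer_load(layer, blocks)  # hypothetical helper
        for layer, blocks in meta.layers_to_load.items()  # illustrative field
    }

def wait_for_layer_load(self, layer_name):
    event = self._load_events.pop(layer_name, None)
    if event is not None:
        event.wait()  # block until this layer's copy has landed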

take_events

take_events() -> Iterable[KVCacheEvent]

Take the KV cache events from the connector.

Yields:

Iterable[KVCacheEvent]: new KV cache events since the last call.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/base.py
def take_events(self) -> Iterable["KVCacheEvent"]:
    """
    Take the KV cache events from the connector.

    Yields:
        New KV cache events since the last call.
    """
    return ()

update_connector_output

update_connector_output(
    connector_output: KVConnectorOutput,
)

Update KVConnector state from worker-side connectors output.

Parameters:

connector_output (KVConnectorOutput): the worker-side connectors output. Required.
Source code in vllm/distributed/kv_transfer/kv_connector/v1/base.py
def update_connector_output(self, connector_output: KVConnectorOutput):
    """
    Update KVConnector state from worker-side connectors output.

    Args:
        connector_output (KVConnectorOutput): the worker-side
            connectors output.
    """
    return

update_state_after_alloc abstractmethod

update_state_after_alloc(
    request: Request,
    blocks: KVCacheBlocks,
    num_external_tokens: int,
)

Update KVConnector state after block allocation.

If get_num_new_matched_tokens previously returned True for a request, this function may be called twice for that same request - first when blocks are allocated for the connector tokens to be asynchronously loaded into, and second when any additional blocks are allocated, after the load/transfer is complete.

Parameters:

request (Request): the request object. Required.
blocks (KVCacheBlocks): the blocks allocated for the request. Required.
num_external_tokens (int): the number of tokens that will be loaded from the external KV cache. Required.
Source code in vllm/distributed/kv_transfer/kv_connector/v1/base.py
@abstractmethod
def update_state_after_alloc(
    self, request: "Request", blocks: "KVCacheBlocks", num_external_tokens: int
):
    """
    Update KVConnector state after block allocation.

    If get_num_new_matched_tokens previously returned True for a
    request, this function may be called twice for that same request -
    first when blocks are allocated for the connector tokens to be
    asynchronously loaded into, and second when any additional blocks
    are allocated, after the load/transfer is complete.

    Args:
        request (Request): the request object.
        blocks (KVCacheBlocks): the blocks allocated for the request.
        num_external_tokens (int): the number of tokens that will be
            loaded from the external KV cache.
    """
    pass

wait_for_layer_load abstractmethod

wait_for_layer_load(layer_name: str) -> None

Block until the KV for a specific layer is loaded into vLLM's paged buffer. This is called from within attention layer to ensure async copying from start_load_kv is complete.

This interface will be useful for layer-by-layer pipelining.

Parameters:

layer_name (str): the name of that layer. Required.
Source code in vllm/distributed/kv_transfer/kv_connector/v1/base.py
@abstractmethod
def wait_for_layer_load(self, layer_name: str) -> None:
    """
    Block until the KV for a specific layer is loaded into vLLM's
    paged buffer. This is called from within attention layer to ensure
    async copying from start_load_kv is complete.

    This interface will be useful for layer-by-layer pipelining.

    Args:
        layer_name: the name of that layer
    """
    pass

wait_for_save abstractmethod

wait_for_save()

Block until all the save operations are done. This is called as the forward context exits to ensure that the async saving from save_kv_layer is complete before finishing the forward.

This prevents overwrites of the paged KV buffer before saving is done.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/base.py
@abstractmethod
def wait_for_save(self):
    """
    Block until all the save operations are done. This is called
    as the forward context exits to ensure that the async saving
    from save_kv_layer is complete before finishing the forward.

    This prevents overwrites of the paged KV buffer before saving is done.
    """
    pass

KVConnectorRole

Bases: Enum

Source code in vllm/distributed/kv_transfer/kv_connector/v1/base.py
class KVConnectorRole(enum.Enum):
    # Connector running in the scheduler process
    SCHEDULER = 0

    # Connector running in the worker process
    WORKER = 1

SCHEDULER class-attribute instance-attribute

SCHEDULER = 0

WORKER class-attribute instance-attribute

WORKER = 1

SupportsHMA

Bases: ABC

The class that indicates that the corresponding connector supports the hybrid memory allocator (HMA). This is required to use the connector together with the hybrid memory allocator.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/base.py
class SupportsHMA(ABC):
    """
    The class that indicates that the corresponding connector supports the
    hybrid memory allocator (HMA).
    This is required to use the connector together with the hybrid memory
    allocator.
    """

    @abstractmethod
    def request_finished_all_groups(
        self,
        request: "Request",
        block_ids: tuple[list[int], ...],
    ) -> tuple[bool, dict[str, Any] | None]:
        """
        Called exactly once when a request has finished for all kv cache groups,
        before its blocks are freed for each group.

        NOTE(Kuntai): This function is only supported by connectors that support HMA.

        The connector may assume responsibility for freeing the blocks
        asynchronously by returning True.

        Returns:
            True if the request is being saved/sent asynchronously and blocks
            should not be freed until the request_id is returned from
            get_finished().
            Optional KVTransferParams to be included in the request outputs
            returned by the engine.
        """
        raise NotImplementedError

request_finished_all_groups abstractmethod

request_finished_all_groups(
    request: Request, block_ids: tuple[list[int], ...]
) -> tuple[bool, dict[str, Any] | None]

Called exactly once when a request has finished for all kv cache groups, before its blocks are freed for each group.

NOTE(Kuntai): This function is only supported by connectors that support HMA.

The connector may assume responsibility for freeing the blocks asynchronously by returning True.

Returns:

tuple[bool, dict[str, Any] | None]: True if the request is being saved/sent asynchronously and blocks should not be freed until the request_id is returned from get_finished(), plus optional KVTransferParams to be included in the request outputs returned by the engine.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/base.py
@abstractmethod
def request_finished_all_groups(
    self,
    request: "Request",
    block_ids: tuple[list[int], ...],
) -> tuple[bool, dict[str, Any] | None]:
    """
    Called exactly once when a request has finished for all kv cache groups,
    before its blocks are freed for each group.

    NOTE(Kuntai): This function is only supported by connectors that support HMA.

    The connector may assume responsibility for freeing the blocks
    asynchronously by returning True.

    Returns:
        True if the request is being saved/sent asynchronously and blocks
        should not be freed until the request_id is returned from
        get_finished().
        Optional KVTransferParams to be included in the request outputs
        returned by the engine.
    """
    raise NotImplementedError

supports_hma

supports_hma(connector: Any) -> bool
Source code in vllm/distributed/kv_transfer/kv_connector/v1/base.py
def supports_hma(connector: Any) -> bool:
    if isinstance(connector, type):
        return issubclass(connector, SupportsHMA)
    else:
        return isinstance(connector, SupportsHMA)
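
For illustration, supports_hma accepts either a class or an instance; _Hma and _Plain below are throwaway examples, not part of vLLM.

from vllm.distributed.kv_transfer.kv_connector.v1 import SupportsHMA, supports_hma

class _Plain:
    pass

class _Hma(SupportsHMA):
    def request_finished_all_groups(self, request, block_ids):
        return False, None

assert supports_hma(_Hma)          # class argument -> issubclass check
assert supports_hma(_Hma())        # instance argument -> isinstance check
assert not supports_hma(_Plain())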