• <xmp id="om0om">
  • <table id="om0om"><noscript id="om0om"></noscript></table>
  • 數據中心/云端

    云端 Apache Spark 加速深度學習和大語言模型推理

    Apache Spark 是用于大數據處理和分析的行業領先平臺。隨著非結構化數據(documents、emails、multimedia content)的日益普及,深度學習 (DL) 和大語言模型 (LLMs) 已成為現代數據分析工作流的核心組成部分。這些模型支持各種下游任務,例如圖像描述、語義標記、文檔摘要等。

    然而,將 GPU 密集型 DL 與 Spark 相結合一直是一項挑戰。 NVIDIA RAPIDS Accelerator for Apache Spark Spark RAPIDS ML 庫可實現無縫 GPU 加速,但主要用于提取、轉換和加載 (ETL) 以及傳統機器學習 (ML) 工作負載。

    最近用于分布式訓練和推理的 Spark API (如上一篇博客中所述) 在 DL 集成方面取得了重大進展。本文以這項工作為基礎,介紹了 Spark 上分布式推理的最佳實踐。我們將展示與 NVIDIA Triton Inference Server 等服務平臺的集成、使用 vLLM 進行的高性能 LLM 推理以及在云平臺上的部署。

    為什么要進行批量推理?

    雖然實時推理最適合交互式應用,但批量推理提供了一種可擴展的高吞吐量范式,可一次性處理大量數據集。一些關鍵用例包括:

    • 語義搜索 :為大型內容庫批量生成 embeddings 和語義元數據,從而提高搜索質量。
    • 數據轉換: 將非結構化數據集 (例如自由格式的文本或圖像) 轉換、匯總或轉換為結構化模式,以執行下游任務。
    • 內容創作 :自動生成用于大規模內容制作的產品描述、圖像描述、社交媒體帖子或營銷文案。

    將 DL/LLM 模型集成到現有的 Spark 工作流中,可將 DL 和生成式 AI 的功能直接引入統一工作流中的企業數據。現在,我們來探索實現,首先回顧一下 Spark 的 predict_batch_udf API。

    基本部署:使用 predict_batch_udf 進行分布式推理

    Spark 3.4 引入了 predict_batch_udf API,可為 DL 模型推理提供簡單的接口。此 API 可自動將 Spark DataFrame 列轉換為批量處理的 NumPy 輸入,并在 Spark 執行程序上緩存模型。有關更多詳細信息,請參閱 Distributed Deep Learning Made Easy with Spark 3.4

    例如,以下代碼演示了如何使用 Huggingface Sentence Transformers 在包含文本數據的 Spark DataFrame 上執行分布式文本嵌入:

    from pyspark.sql.functions import predict_batch_udf
    from pyspark.sql.types import *
     
    def predict_batch_fn():
        from sentence_transformers import SentenceTransformer
     
        model = SentenceTransformer("paraphrase-MiniLM-L6-v2", device="cuda")
        def predict(inputs):
            return model.encode(inputs)
        return predict
     
    embed_udf = predict_batch_udf(predict_batch_fn,
                                  return_type=ArrayType(FloatType()),
                                  batch_size=128)
     
    df = spark.read.parquet("/path/to/text_data")
    embeddings_df = df.withColumn("embedding", embed("text"))
    embeddings_df.write.parquet("/path/to/embeddings")

    請注意,這是一個數據并行架構 (圖 1) ,其中每個 Python 工作節點將模型副本加載到 GPU 上,并在其數據集分區上進行預測。

    A diagram showing distributed inference architecture using the predict_batch_udf API. Two identical nodes are displayed, each containing an Executor with multiple predict() functions running in parallel. Each executor has a GPU, which contains a diagram of a neural network with “x4”, representing the number of models running in parallel. All nodes connect to a shared Distributed File System, from which the predict() functions load data.
    圖 1。使用 predict_batch_udf API 進行分布式推理。每個 Python 工作者加載一個模型副本。

    借助這種直接推理方法,您可以將現有的 PyTorch、TensorFlow 或 Hugging Face 框架代碼移植到 Spark,以進行分布式推理,同時盡可能減少代碼更改。

    但是,我們將看到,將多個模型副本加載到 GPU 上可能會給大型模型帶來問題。我們將討論推理服務如何解決這一問題,從而改進資源分離。

    高級部署:分布式推理服務

    在基本方法中,使用 predict_batch_udf 并行執行任務會使每個 Python 工作者在 GPU 上加載模型副本。因此,您必須調整每個執行程序的任務,以確定可以運行的模型副本數量,而不會出現內存不足錯誤或過度開銷。占用整個 GPU 顯存的大型模型(例如 LLMs)可能需要每個執行程序僅執行一項任務(即 spark.task.resource.gpu.amount=1,適用于整個應用,如圖 2 所示)。

    A diagram showing distributed inference architecture using the predict_batch_udf API. Two identical nodes are displayed, each containing an Executor where only a single predict() function is running. Each executor has a GPU, which contains a diagram of a neural network, representing a single model running on the GPU. All nodes connect to a shared Distributed File System, from which the predict() function loads data.
    圖 2。如果使用 predict_batch_udf 僅有 1 個模型適合 GPU,則我們必須將每個執行程序限制為 1 項任務。

    predict_batch_udf 的這種局限性凸顯了 Spark 調度的挑戰:它統一處理所有任務,而不區分 CPU 和 GPU 資源利用率。

    推理服務通過 將 GPU 執行與 Spark 任務調度解 。我們可以在每個執行程序上部署專用推理服務器,而不是在每個 Spark 任務中加載模型。然后,許多任務可以并行加載、預處理和寫入數據,以充分利用執行程序 CPU,而服務器將占用和利用 GPU 進行推理,如下所示。

    A diagram showing distributed inference architecture using an inference server. Two identical nodes are displayed, each containing an Executor with multiple predict() functions running in parallel. Each executor is connected to a server by a bidirectional arrow representing an HTTP connection. Each server has a GPU, which contains a diagram of a neural network, representing a single model running on the GPU. All nodes connect to a shared Distributed File System, from which the predict() functions load data.
    圖 3。使用 Inference Server 進行分布式推理,將 CPU 和 GPU 執行解耦。

    通過在 CPU 和 GPU 并行性之間提供邏輯分離,推理服務無需根據 GPU 顯存調整每個執行程序的任務。此外,它還支持輕松集成服務功能,例如模型管理和動態批處理。

    我們在 Spark-RAPIDS-Examples DL 推理庫 中提供服務器實用程序,以在 Spark 集群中啟動和管理服務器實例,并支持 NVIDIA Triton Inference Server vLLM 。請注意,這些示例正在不斷發展:我們可能很快會將支持范圍擴展到 NVIDIA Dynamo NVIDIA NIMs 等推理解決方案。

    與 Triton Inference Server 配合使用

    NVIDIA Triton 推理服務器是用于高性能模型服務的行業標準平臺,支持許多 主要功能 ,包括模型集成、并發執行和動態批處理。由于 Triton 通常在 Docker 容器中運行,因此在基于云的 Spark 環境(執行程序本身在容器中運行)中進行部署需要在 Docker 中部署 Docker,這帶來了權限要求和缺乏資源隔離等挑戰。

    幸運的是, PyTriton 提供了一個 Python 原生接口,可直接在 Python 進程中運行 Triton,從而簡化云端部署。有關 PyTriton 部署基礎知識的 博客 ,請查看簡要概述。

    Spark-RAPIDS-Examples DL Inference repo 中的 server_utils 模塊提供 TritonServerManager,用于管理整個 Spark 集群中服務器的生命周期,包括查找和分配可用端口、在每個執行程序上啟動服務器進程以及處理推理后的正常關閉。

    在本課程中,部署 Triton 服務器的步驟很簡單:

    1. 使用 PyTriton 服務器邏輯定義 triton_server 函數,其中包含您的 inference 框架代碼。
    2. 使用您的模型名稱和路徑初始化 TritonServerManager
    3. 調用 TritonServerManager.start_servers(triton_server),在集群中分配 triton_server 函數。

    我們將介紹以下步驟。首先,定義 triton_server 函數。為簡潔起見,我們省略了這一點 – 請參閱 notebooks 以獲取您選擇的框架中的大量示例。

    def triton_server(ports: List[int], model_path: str):
        # Load model to GPU, define inference logic, bind to server

    定義服務器邏輯后,使用模型名稱和路徑初始化服務器管理器,并在啟動服務器時傳遞 triton_server 函數:

    from server_utils import TritonServerManager
     
    server_manager = TritonServerManager(model_name="my-model", model_path="path/to/my-model")
    server_manager.start_servers(triton_server)
    host_to_grpc_url = server_manager.host_to_grpc_url

    驅動上的 ServerManager 向每個執行程序分配啟動任務,從而生成運行用戶定義的 triton_server 函數的 Python 進程,如圖 4 所示。

    A diagram showing distributed inference architecture when launching server processes. A driver and two identical nodes are displayed. Each node contains an Executor. The driver contains a box representing a ServerManager calling start_servers(triton_server). The ServerManager on the driver points to a box in each of the Executors, which contains the spawn_server(triton_server) function. Each of these are in turn pointing to a triton server within the node, which is connected to a GPU.
    圖 4。ServerManager start_servers() 函數在每個 executor 上啟動 triton_server 進程的部署。

    然后,使用 predict_batch_udf 預處理一批輸入,并使用 PyTriton 的 ModelClient API 向服務器發送推理請求。

    def triton_predict_fn(model_name, host_to_url):
        import socket
        from pytriton.client import ModelClient
     
        url = host_to_url.get(socket.gethostname())
     
        def infer_batch(inputs):
            with ModelClient(url, model_name) as client:
                # Do some preprocessing...
                result_data = client.infer_batch(inputs)  # Send batch to server
                return result_data["predictions"# Return predictions
             
        return infer_batch
     
    predict_udf = predict_batch_udf(partial(triton_predict_fn, model_name="my-model", host_to_url=host_to_grpc_url),
                                    return_type=ArrayType(FloatType()),
                                    batch_size=32)
     
    # Run inference
    df = spark.read.parquet("/path/to/my-data")
    predictions_df = df.withColumn("predictions", predict_udf(col("data")))
    predictions_df.write.parquet("/path/to/predictions.parquet")
     
    # Once we're finished, stop servers
    server_manager.stop_servers()

    請注意,在 CPU 上的 UDF 中執行的加載和預處理現已與 GPU 上的推理解耦 — Spark 可以自由并行安排這些任務,而無需在 GPU 顯存中創建額外的模型副本。

    使用 vLLM 服務器提供服務

    雖然 Triton 擅長處理自定義推理邏輯、多個框架和不同的模型類型,但 vLLM 是一種直接的替代服務,專門針對 LLM 進行了優化。它為生產部署提供了兼容 OpenAI 的 HTTP 服務器。

    我們支持通過實用程序類中的 VLLMServerManager 在 Spark 上提供 vLLM 服務。與圖 3 類似,此方法會在每個 Spark 執行程序上啟動 vLLM 服務器進程,從而將 CPU 和 GPU 執行解耦。啟動服務器時,您可以傳遞任何受支持的 vLLM CLI 參數,而不是使用自定義服務器功能:

    from server_utils import VLLMServerManager
     
    server_manager = VLLMServerManager(model_name="qwen-2.5-7b",
                                       model_path="/path/to/Qwen2.5-7B")
    server_manager.start_servers(gpu_memory_utilization=0.95,
                                 max_model_len=6600,
                                 task="generate")
     
    host_to_http_url = server_manager.host_to_http_url

    同樣,您可以通過向服務器發送請求來運行分布式推理,在本例中,您可以使用兼容 Open-AI 的 JSON 格式:

    def vllm_fn(model_name, host_to_url):
        import socket
        import numpy as np
        import requests
     
        url = host_to_url[socket.gethostname()]
         
        def predict(inputs):
            response = requests.post(
                f"{url}/v1/completions",
                json={
                    "model": model_name,
                    "prompt": inputs.tolist(),
                    "max_tokens": 256,
                }
            )
            return np.array([r["text"] for r in response.json()["choices"]])
         
        return predict
     
    generate = predict_batch_udf(partial(vllm_fn, model_name="qwen-2.5-7b", host_to_url=host_to_http_url),
                                 return_type=StringType(),
                                 batch_size=32)
     
    # Run inference
    preds = text_df.withColumn("response", generate("prompt"))
     
    # Once we're finished, stop servers
    server_manager.stop_servers()

    摘要:選擇部署策略

    在探索了這兩種部署方法后,我們來比較它們的優勢和權衡,以指導您的實施。我們通常推薦簡單原型設計的基本方法,以及實現靈活性和清潔資源分離的高級方法,如下表總結所示。

    注意事項 基本部署 (predict_batch_udf) 高級部署 (Inference Server)
    資源管理 需要調整任務并行性以適應 GPU 內存 無需任務并行調優 – 將 CPU 和 GPU 調度解耦
    設置復雜性 簡單、直接移植您的 framework 代碼 使用 ServerManager 非常簡單,但需要一些額外的客戶端/服務器代碼
    推理功能 受限于 framework 功能 其他特定于服務器的功能 (dynamic batching, model ensembles)
    可移植性 推理代碼特定于 UDF 只需在服務器中定義一次推理邏輯,即可在在線/離線應用程序中重復使用
    最適合 小模型、簡單流程和原型設計 更大的模型和復雜的 pipelines
    表 1。部署方法之間的主要區別。

    在云平臺上部署

    雖然我們 之前的博客 演示了本地部署,但我們已更新 Spark-RAPIDS-Examples DL 推理資源庫 ,為您提供在 CSP Spark 集群上部署 DL/LLM 推理工作負載所需的一切。

    云就緒模板

    CSP 說明 提供了用于設置和運行工作負載的云就緒模板,目標是 Databricks 和 Dataproc Spark 環境。其中包括:

    • 預配置初始化腳本,用于 spin-up 集群并安裝必要的庫。
    • 推薦用于批量推理的 Spark 配置。
    • 從云存儲保存和加載模型的最佳實踐。

    無論您的集群環境如何 (包括本地獨立集群、Databricks 集群或 Dataproc 集群) ,Notebooks 均配置為端到端工作,且無需更改代碼。

    配置 GPU 實例

    要運行這些示例,我們推薦 A10/L4 或更高版本的 GPU 實例 (例如 Azure 上的 NVadsA10、AWS 上的 g5/g6、GCP 上的 g2-standard) ,可確保有足夠的 GPU 顯存。A100/H100 GPU 將是大型 LLM 的更好選擇,例如,半精度的 Llama 70b 可在兩臺 H100 上舒適運行。

    對于大型模型,通常需要跨多個 GPU 對模型進行分片,并且可以在每個節點有多個 GPU 的 Spark 集群中完成。設置 spark.executor.resource.gpu.amount=(gpus per node) 將為每個執行程序分配必要的 GPU,進而使推理服務器可見這些 GPU。然后,可以通過框架并行處理模型:例如,通過設置 tensor_parallel_size=(gpus per node) 與 vLLM。有關此示例,請參閱 vLLM Tensor Parallel Notebook

    入門指南

    首先,您可以瀏覽 示例 notebooks ,其中展示了使用開源模型和數據集的一系列用例 (包括圖像分類、情感分析、文檔摘要等) 的端到端 Spark 應用。要在云端部署這些應用程序,請參閱我們的 CSP 平臺運行指南

    ?

    0

    標簽

    人人超碰97caoporen国产