• <xmp id="om0om">
  • <table id="om0om"><noscript id="om0om"></noscript></table>
  • 數據科學

    在 RAPIDS libcudf 中使用嵌入式數據類型簡化 ETL 工作流程

    ?

    嵌入式數據類型是一種表示列式數據中分層關系的便捷方式。它們經常用于 提取、轉換、加載(ETL)在商業智能領域的工作負載、推薦系統、網絡安全、地理空間和其他應用中。

    例如,列表類型可用于輕松地將多個事務附加到用戶,而無需創建新的查找表。結構類型可用于在同一列中附加靈活的元數據和許多鍵值對。在 Web 和移動應用程序中,嵌套類型將原始 JSON 對象表示為數據列中的元素,從而使這些數據能頭輸入到 機器學習(ML)訓練管線。許多數據科學應用都依賴于嵌套類型來對復雜的數據輸入進行建模、管理和處理。

    RAPIDS 中,libcudf 是一套用于列式數據處理的 CUDA C++ 庫,旨在加速數據科學庫的性能。RAPIDS libcudf 基于 Apache Arrow 內存格式,支持 GPU 加速的數據讀取器、寫入器、關系代數函數和列轉換操作。

    除了數字和字符串等基本數據類型外,libcudf 還支持嵌套數據類型,例如可變長度列表、結構體以及列表和結構體類型的任意嵌套組合。在 23.02 到 23.12 的版本中,RAPIDS libcudf 擴展了對算法中嵌套數據類型的支持,包括聚合、連接和排序。

    本文展示了使用嵌套數據類型進行數據處理的過程,介紹了實現嵌入式數據處理的“行運算符”,并探討了嵌入式數據類型對性能的影響。

    使用嵌套類型進行數據處理

    數據庫管理中的一個常見工作流程是監控和管理重復數據。RAPIDS libcudf 現在包含一個C++ nested_types 示例,它將 JSON 數據讀取為 libcudf 表,計算第一列中每個不同元素的計數,將計數加入原始表,并將數據寫回 JSON。libcudf 的公共 API 使得數據處理應用程序能夠輕松處理數字或字符串等平面類型,以及結構體和列表等嵌套類型。

    我們的 C++ nested_types 示例使用 libcudf JSON 讀取器將列式格式的嵌套數據提取為表對象。加速的 JSON 讀取器也可供 C++ 開發者使用。JSON 提供了一種人類可讀的方式來創建和檢查嵌套列。要了解在 Python 層中使用 JSON 讀取器的模式,請參閱 使用 RAPIDS 進行 GPU 加速的 JSON 數據處理

    我們read_json函數,C++ nested_types示例接受filepath并返回table_with_metadata對象:

    cudf::io::table_with_metadata read_json(std::string filepath)
    {
      auto source_info = cudf::io::source_info(filepath);
      auto builder     = cudf::io::json_reader_options::builder(source_info).lines(true);
      auto options     = builder.build();
      return cudf::io::read_json(options);
    }

    將 JSON 數據讀取并解析為表對象后,第一個處理步驟是計數聚合,以跟蹤每個不同元素的出現次數。count_aggregate示例中的函數會填充聚合請求,執行聚合函數,然后構建輸出表:

    std::unique_ptr<cudf::table> count_aggregate(cudf::table_view tbl)
    {
      // Get count for each key
      auto keys = cudf::table_view{{tbl.column(0)}};
      auto val  = cudf::make_numeric_column(cudf::data_type{cudf::type_id::INT32}, keys.num_rows());
     
      cudf::groupby::groupby grpby_obj(keys);
      std::vector<cudf::groupby::aggregation_request> requests;
      requests.emplace_back(cudf::groupby::aggregation_request());
      auto agg = cudf::make_count_aggregation<cudf::groupby_aggregation>();
      requests[0].aggregations.push_back(std::move(agg));
      requests[0].values = *val;
      auto agg_results   = grpby_obj.aggregate(requests);
      auto result_key    = std::move(agg_results.first);
      auto result_val    = std::move(agg_results.second[0].results[0]);
     
      auto left_cols = result_key->release();
      left_cols.push_back(std::move(result_val));
     
      // Join on keys to get
      return std::make_unique<cudf::table>(std::move(left_cols));
    }

    掌握計數數據后,下一個處理步驟會將這些數據與原始表連接起來,并添加這些信息,以便為下游分析中基于計數的過濾和根本原因調查提供信息。join_count函數,C++ nested_types示例接受兩個table_view將對象與它們的第一列連接起來,然后構建輸出表:

    std::unique_ptr<cudf::table> join_count(cudf::table_view left, cudf::table_view right)
    {
      auto [left_indices, right_indices] =
        cudf::inner_join(cudf::table_view{{left.column(0)}}, cudf::table_view{{right.column(0)}});
      auto new_left  = cudf::gather(left, cudf::device_span<int const>{*left_indices});
      auto new_right = cudf::gather(right, cudf::device_span<int const>{*right_indices});
     
      auto left_cols  = new_left->release();
      auto right_cols = new_right->release();
      left_cols.push_back(std::move(right_cols[1]));
     
      return std::make_unique<cudf::table>(std::move(left_cols));
    }

    最后一個數據處理步驟根據第一列中的元素對表格進行排序。排序有助于提供確定性排序,從而促進分區和合并等下游步驟。sort_keysC++nested_types 示例中的函數接受table_view計算索引,sorted_order然后根據以下順序收集表格:

    ?

    std::unique_ptr<cudf::table> sort_keys(cudf::table_view tbl)
    {
      auto sort_order = cudf::sorted_order(cudf::table_view{{tbl.column(0)}});
      return cudf::gather(tbl, *sort_order);
    }

    最后,使用 GPU 加速的 JSON 寫入器將已處理的數據序列化回磁盤,該寫入器使用來自read_json以保留輸入數據中的嵌套結構鍵名。write_json函數,C++ nested_types示例接受table_view, table_metadata以及filepath:

    void write_json(cudf::table_view tbl, cudf::io::table_metadata metadata, std::string filepath)
    {
      auto sink_info = cudf::io::sink_info(filepath);
      auto builder   = cudf::io::json_writer_options::builder(sink_info, tbl).lines(true);
      builder.metadata(metadata);
      auto options = builder.build();
      cudf::io::write_json(options);
    }

    總而言之,C++ nested_types示例對第一列中的每個不同元素進行計數,將這些值連接到原始表,然后對第一列上的表進行排序。請注意,此示例中的代碼沒有任何部分特定于嵌套類型。事實上,此示例與 libcudf 中任何支持的數據類型(平面或嵌入式)兼容,展示了 libcudf 嵌入式類型支持的強大功能和靈活性。

    libcudf 行運算符簡介

    在幕后,libcudf 使用幾個關鍵的“行運算符”支持等式比較、不等式比較和元素哈希。這些行運算符在算法吞吐量 libcudf 中重復使用,并能夠將數據類型支持與其他算法細節分離。

    以基于哈希的聚合為例,在構建和探索哈希表時使用哈希運算符和等式運算符。對于基于排序的聚合,Lexicographic Operator 識別一個元素小于另一個元素,并且是任何排序算法的關鍵組件。新的行運算符在 libcudf 中的關系代數函數中解鎖對嵌套類型的支持。

    對于平面類型(例如數字和字符串),行運算符會處理每個元素的值和空狀態。字符串類型通過將可變數量的字符與每個元素關聯起來的整數偏移增加了復雜性。對于結構類型,行運算符會處理結構父級的空狀態以及每個子列的值和空狀態。

    可變長度列表類型增加了另一層復雜性,其中行運算符負責層次結構,包括每個嵌套級別的空狀態、列表深度和列表長度。如果層次結構匹配,則列表運算符會考慮每個葉元素的值和空狀態。在行運算符中,哈希和等式更簡單,因為它們可以以任何順序處理每個元素的數據。但是,對于包含列表的類型,字典比較必須產生一致的排序,因此需要對空狀態、層次結構和值進行順序解析。

    libcudf lexicographic 運算符中列表類型的處理靈感來源于 Parquet 格式中使用的 Dremel 編碼算法。在 Dremel 編碼中,列表列通過三個數據流來表示:一個 定義 流用于記錄空狀態和嵌套深度,一個 重復 流用于記錄列表長度,以及一個 流用于記錄葉值。這種編碼提供了一個平面數據結構,相較于 Arrow 中的遞歸變量長度列表表示,它能更高效地進行處理。

    Dremel 對列表進行編碼的一個限制是,值流僅支持平面類型。為了擴展對包含結構的列表的支持,預處理步驟將嵌入式結構列替換為與每個結構元素的秩對應的整數列。此遞歸預處理步驟擴展了詞庫運算符類型支持,以在數據類型中包含列表和結構的任意組合。

    數據類型如何影響性能

    我們的 C++ nested_types 示例與 libcudf 中所有受支持的數據類型兼容。通過示例中的命令行界面,您可以輕松地比較性能。以下性能數據基于示例中實現的計時收集,并使用 NVIDIA DGX H100 硬件。

    列的數據類型會影響示例的整體運行時間,因為更復雜的數據類型會增加基于排序的處理步驟的運行時間(圖 1)。在一系列數據類型中,結果顯示計數聚合步驟的運行時間為 2-5 毫秒,內部連接步驟的運行時間為 10-25 毫秒。這兩個步驟都使用基于哈希的實現,并依賴于哈希和等式行運算符。

    但是,排序步驟表明,對于包含字符串或列表的可變大小的類型,運行時間已增加到 60-90 毫秒。排序步驟依賴于更復雜的詞匯表行運算符。雖然基于哈希的算法作為數據類型的函數顯示相對一致的運行時間,但基于排序的算法顯示可變大小的類型的運行時間更長。

    Bar chart showing the runtime in ms of count_aggregate, join_count and sort_keys steps by data type, with 85% distinct elements and 20 million rows.
    圖 1.count_aggregate, join_count以及sort_keys其中包含 85%的不同元素和 2000 萬行數據,

    行數和套料深度也會影響示例的性能,因為行數越多,數據類型越簡單,數據處理吞吐量越高。圖 2 顯示count_aggregate性能C++ nested_types例如,吞吐量通常會隨著行數從 10 萬行增加到 2000 萬行而增加。標記為`8`的數據類型有 8 個級別的嵌套深度。intfloat是指 64 位類型。

    請注意,輸入數據使用帶有一個子級的結構,并使用長度為 1 的列表。性能數據顯示峰值吞吐量約為 45 GB/s 的基元類型、峰值吞吐量約為 30 GB/s 的單嵌套類型,以及峰值吞吐量約為 10-25 GB/s 的深度嵌套類型。結構級別產生的開銷比列表級別低,而混合結構/列表嵌套會產生最大的開銷。

    Scatter plot showing data processing throughput in GB/s versus memory size of data in MB.
    圖 2.數據的數據處理吞吐量與數據的顯存大小count_aggregate示例函數。每行顯示從 10 萬行掃描到 2000 萬行的效果

    最后,列表元素的長度也會影響性能,由于比較器中提前退出,列表元素的長度越長,吞吐量越高。圖 3 顯示了列表長度對數據處理吞吐量的影響list<int>列表長度從 1 到 16 的列。隨著列表長度的增加,整數葉總數和總顯存大小也會增加,并且偏移數據的行數和總大小保持不變。

    圖 3 中的數據使用隨機排序的葉值,因此比較器通常只需要檢查每個列表的第一個元素。從 1 到 16 的長度中收集的性能數據顯示,count_aggregate并且吞吐量提高了 4 倍,sort_keysstep.數據使用 1000 萬行、64 位整數葉元素、85%不同的葉值,以及每個表中保持不變的列表長度。

    Scatter plot showing data processing throughput in GB/s compared to list length from 1 to 16. As list length increases, the data shows data processing throughput also increases.
    圖 3.數據處理器的數據處理吞吐量count_aggregate, join_count以及sort_keys具有不同列表長度的單嵌套列表類型的步驟

    總結

    RAPIDS libcudf 為處理嵌套數據類型提供了功能強大、靈活且加速的工具。聚合、連接和排序等關系代數算法針對任何受支持的嵌套數據類型進行了調整和優化,甚至包括深度嵌套和混合列表以及結構嵌套數據類型。

    開始使用 RAPIDS libcudf,您可以構建并運行一些示例。要了解更多關于 CUDA 加速數據幀的信息,請參考cuDF 文檔rapidsai/cudf GitHub 倉庫。為了便于測試和部署,您也可以使用RAPIDS Docker 容器,它提供了正式發布和夜間構建版本。如果您已經在使用 cuDF,可以嘗試運行新的 C++ nested_types 示例,訪問rapidsai/cudf/tree/HEAD/cpp/examples/nested_types 在 GitHub 上查看。

    致謝

    感謝 Devavret Makkar、Jake Hemstad 以及 RAPIDS 團隊的其他成員為這項工作做出的貢獻。

    ?

    0

    標簽

    人人超碰97caoporen国产