• <xmp id="om0om">
  • <table id="om0om"><noscript id="om0om"></noscript></table>
  • 數據科學

    使用 NVIDIA RAPIDS 將大眾汽車連接的汽車數據管道加速 100 倍

    互聯汽車是指使用后端系統與其他車輛進行通信的車輛,以增強可用性,實現便捷的服務,并保持分布式軟件的維護和更新。

    在大眾汽車公司,我們正在使用 NVIDIA 開發互聯汽車,以解決在原生 python 和 pandas 中實現時存在計算效率低下的問題,如地理空間索引和 K 近鄰。

    處理駕駛和傳感器數據對于聯網汽車了解其環境至關重要。它使連接的車輛能夠執行諸如停車點檢測、基于位置的服務、防盜、通過實時交通推薦路線、車隊管理等任務。位置信息是大多數這些用例的關鍵,需要一個快速的處理管道來支持實時服務。

    全球聯網汽車的銷量正在迅速增長,而這反過來又增加了可用的數據量。根據 Gartner ,平均連接車輛每年將產生 280 PB 的數據,其中至少一天會產生 4 TB 的數據。研究還指出,到 2025 年,將部署約 4 . 7 億輛聯網車輛。

    這篇博文將集中討論處理基于位置的地理空間信息和為聯網汽車提供必要服務所需的數據管道。

    互聯汽車數據的挑戰

    使用互聯汽車數據帶來了技術和業務挑戰:

    需要快速處理大量的流數據,因為用戶希望獲得近乎實時的體驗,以便及時做出決策。例如,如果一個用戶請求一個停車位,而系統需要 5 分鐘的響應時間,那么這個停車位很可能在回答時就已經被占用了。更快地處理和分析數據是克服這一挑戰的關鍵因素。

    還有數據隱私問題需要考慮。連接的車輛必須滿足 通用數據保護條例( GDPR ) 。簡而言之, GDPR 要求在數據分析之后,不應該有機會從分析的數據中識別單個用戶。此外,禁止存儲與個人用戶有關的數據(除非用戶書面同意)。匿名化可以通過屏蔽識別單個用戶的數據或者通過分組和聚合數據來滿足這些要求,這樣就不可能跟蹤用戶。為此,我們需要確保處理聯網車輛數據的軟件符合 GDPR 關于數據匿名化的規定,這在數據處理過程中增加了額外的計算要求。

    A picture with three cars streaming data into cloud.
    圖 1 :連接車輛數據挑戰的示意圖。

    采用數據科學方法

    RAPIDS 可以解決互聯汽車的技術和業務難題。開放源碼軟件( OSS )庫和 API 的 RAPIDS 套件使您能夠完全在 GPU 上執行端到端的數據科學和分析管道。根據 Apache 2 . 0 , RAPIDS 由 NVIDIA 孵育 許可,并基于廣泛的硬件和數據科學經驗。 RAPIDS 利用 NVIDIA CUDA 開關 原語進行低級計算優化,并通過用戶友好的 Python 接口公開 GPU 并行性和高帶寬內存速度。

    在下面的部分中,我們將討論 RAPIDS (軟件)和 NVIDIA GPU s (硬件)如何使用測試數據幫助解決原型應用程序的技術和業務挑戰。將評估兩種不同的方法:地理空間索引和 k- 近鄰。

    通過使用 RAPIDS ,我們可以將這個管道的速度提高 100 倍。

    地理空間索引簡介

    地理空間索引是互聯汽車領域許多算法的基礎。它是將地球的各個區域劃分成可識別的網格單元的過程。在查詢互聯汽車產生的海量數據時,對搜索空間進行修剪是一種有效的方法。

    流行的方法包括 軍用網格參考系統 ( MGRS )和 Uber 六邊形層次空間索引 ( Uber H3 )。

    在這個數據管道示例中,我們使用 Uber H3 在空間上將記錄拆分為一組較小的子集。

    以下是將記錄拆分為子集后需要滿足的條件 :

    • 每個子集最多由 N 個記錄組成。這個“ N ”是根據計算能力限制來選擇的。對于這個實驗,我們考慮“ n ”等于 2500 個記錄。
    • 子集由 subset _ id 表示,它是從 0 開始的自動遞增數字。

    以下是示例輸入數據,它有兩列–緯度和經度:

    A table containing a sample of the input data. It consists of two columns: latitude and longitude.
    表 1 :輸入數據示例。

    下面是將 Uber H3 應用于用例需要實現的算法

    • 迭代 latitudelongitude ,并從分辨率 0 分配 hex_id
    • 如果發現任何包含少于 2500 條記錄的 hex_id ,則從 0 開始遞增地分配 subset_id
    • 識別包含超過 2500 條記錄的 hex \ u id 。
    • 以增量分辨率拆分前面的記錄,現在分辨率為 1 。
    • 重復第 3 步和第 4 步,直到所有記錄都分配給 subset \ u id 和 hex \ u id ,或者直到分辨率達到 15 。

    應用上述算法后,將產生以下輸出數據:

    表 2 :應用地理空間索引后的輸出數據示例。

    Uber H3 實現的代碼片段

    下面是使用 pandas 實現 Uber H3 的代碼片段:

    #while loop until all the records are assigned to subset_id
    while resolution < 16 and df["subset_id"].isnull().any():
         #assignment of hex_id
         df['hex_id']= df.apply(lambda row: h3.geo_to_h3(row["latitude"],
                       row["longitude"], resolution), axis = 1)
          df_aggreg = df.groupby(by = "hex_id").size().reset_index()
         df_aggreg.columns = ["hex_id", "value"]
          #filtering the records that are less than 2500 count    
         hex_id = df_aggreg[df_aggreg['value']<2500]['hex_id']
          #assignment of subset_id
         for index, value in hex_id.items():
             df.loc[df['hex_id'] == value, 'subset_id'] = subset_id
             subset_id += 1     
         df_return = df_return.append(df[~df['subset_id'].isna()],
                     ignore_index=True)
         df = df[df['subset_id'].isna()]
         resolution += 1

    下面是使用 PySpark 實現 Uber H3 的代碼片段:

    #while loop until all the records are assigned to subset_id
    while resolution < 16 and (len(df.head(1)) != 0):
          #assignment of hex_id
         df = df.rdd.map(lambda x: (x["latitude"], x["longitude"],
              x["subset_id"],h3.geo_to_h3(x["latitude"], x["longitude"],
              resolution)))
          df = sqlContext.createDataFrame(df, schema)
         df_aggreg = df.groupby("hex_id").count()        
         df_aggreg = df_aggreg.withColumnRenamed("hex_id", "hex_id") 
              .withColumnRenamed("count", "value")
          #filtering the records that are less than 2500 count
         hex_id = df_aggreg.filter(df_aggreg.value < 2500)         
         var_hex_id = list(hex_id.select('hex_id').toPandas()['hex_id'])
         for i in var_hex_id:
             #assignment of subset_id
             df = df.withColumn('subset_id',F.when(df.hex_id==i,subset_id)
                 .otherwise(df.subset_id)).select(df.latitude, df.longitude,
                 'subset_id', df.hex_id)
              subset_id += 1
          df_return = df_return.union(df.filter(df.subset_id != 0))         
         df = df.filter(df.subset_id == 0)
         resolution += 1

    通過對 Uber H3 模型的 pandas 實現,我們發現了一個非常緩慢的執行過程。代碼的執行速度太慢導致生產力大大降低,因為只能做很少的實驗。具體目標是將執行時間縮短 10 倍。

    為了加快管道的速度,我們采取了如下一步一步的方法。

    第一步:簡單 CPU 并行版本

    這個版本的思想是為 H3 庫處理實現一個簡單的基于多處理的內核。處理的第二部分,即根據數據分配子集,是 pandas 庫函數,它不容易并行化。

    #Function to assign hex_id
    def minikernel(df, resolution):
     df['hex_id'] = np.vectorize(lambda latitude, longitude:
                     h3.geo_to_h3(latitude, longitude, resolution))(
                     np.array(df['latitude']), np.array(df['longitude']))
      return df
    #while loop until all the records are assigned to subset_id
    while resolution < 16 and df["subset_id"].isnull().any():     
         #CPU Parallelization
         df_chunk = np.array_split(df, n_cores)
         pool = Pool(n_cores)
     
         #assigning hex_id by calling the function minikernel()
         df_chunk_res=pool.map(partial(minikernel, resolution=resolution),
                         df_chunk)
     
         df = pd.concat(df_chunk_res)
     
         pool.close()
         pool.join()
                 
         df_aggreg = df.groupby(by = "hex_id").size().reset_index()
         df_aggreg.columns = ["hex_id", "value"]
         
         #filtering the records that are less than 2500 count
         hex_id = df_aggreg[df_aggreg['value']<2500]['hex_id']
     
         for index, value in hex_id.items():
                #assignment of subset_id is pandas library function
                #which cannot be parallelized
             df.loc[df['hex_id'] == value, 'subset_id'] = subset_id
             subset_id += 1
        
         df_return = df_return.append(df[~df['subset_id'].isna()],
                     ignore_index=True)
     
         df = df[df['subset_id'].isna()]     
         resolution += 1 

    通過對線程池應用簡單的并行化,我們可以顯著減少代碼的第一部分( H3 庫),但第二部分( pandas 庫)是完全單線程的,速度非常慢。

    第二步:應用 RAPIDS [ZBK9

    這里的想法是盡可能多地使用 cuDF 中的標準特性(因此,最輕微的代碼更改)來實現最佳性能。由于 cuDF 現在在 CUDA 統一內存上運行,因此不可能簡單地并行化第一部分( H3 庫),因為 cuDF 不處理 CPU 分區。代碼如下所示。注意,以下代碼在 cuDF 數據幀上運行。

    #while loop until all the records are assigned to subset_id
    while resolution < 16 and df["subset_id"].isnull().any():
         #assignment of hex_id
         #df is a cuDF
         df['hex_id'] = np.vectorize(lambda latitude, longitude:
                         h3.geo_to_h3(latitude, longitude, resolution))
                         (df['latitude'].to_array(), df['longitude']
                         .to_array())
         
         df_aggreg = df.groupby('hex_id').size().reset_index()
         df_aggreg.columns = ["hex_id", "value"]
     
         #filtering the records that are less than 2500 count
         hex_id = df_aggreg[df_aggreg['value']<2500]['hex_id']
                 
         for index, value in hex_id.to_pandas().items():
             #assignment of subset_id
             df.loc[df['hex_id'] == value, 'subset_id'] = subset_id
             subset_id += 1
             
         df_return = df_return.append(df[~df['subset_id'].isna()],
                     ignore_index=True)
     
         df = df[df['subset_id'].isna()]
         
         resolution += 1

    步驟 3 :使用較大的數據執行簡單的 CPU 并行版本和 cuDF GPU 版本

    在這一步中,我們將數據量增加了三倍,從 50 萬條記錄增加到 150 萬條記錄,并執行一個簡單的 CPU 并行版本及其等價的 cuDF GPU 版本。

    第四步:再做一次實驗,復制到 pandas 和 cuDF

    如步驟 2 所述, cuDF 在 CUDA 統一內存上運行,由于 cuDF 缺少 CPU 分區,因此無法并行化第一部分( H3 庫)。因此,我們沒有使用函數數組\ u split 。為了克服這個挑戰,我們首先將 cuDF 轉換為 pandas 數據幀,然后應用函數數組\ u split ,然后將分割的塊轉換回 cuDF ,并進一步進行 H3 庫處理。

    #while loop until all the records are assigned to subset_id
    while resolution < 16 and df["subset_id"].isnull().any():
         #copy to pandas
         df_temp = df.to_pandas()
         
         #CPU Parallelization
         df_chunk = np.array_split(df_temp, n_cores)
         pool = Pool(n_cores)
         df_chunk_res=pool.map(partial(minikernel, resolution=resolution),
                         df_chunk)
     
         pool.close()
         pool.join()
         
         df_temp = pd.concat(df_chunk_res)
         
         #Back to cuDF
         df = cudf.DataFrame(df_temp)
     
         #assignment of hex_id
         df['hex_id'] = np.vectorize(lambda latitude, longitude:
                         h3.geo_to_h3(latitude, longitude, resolution))
                         (df['latitude'].to_array(), df['longitude']
                         .to_array())
        
         df_aggreg = df.groupby('hex_id').size().reset_index()
         df_aggreg.columns = ["hex_id", "value"]
     
         #filtering the records that are less than 2500 count
         hex_id = df_aggreg[df_aggreg['value']<2500]['hex_id']
                 
         for index, value in hex_id.to_pandas().items():
             #assignment of subset_id
             df.loc[df['hex_id'] == value, 'subset_id'] = subset_id
             subset_id += 1
             
         df_return = df_return.append(df[~df['subset_id'].isna()],
                     ignore_index=True)
     
         df = df[df['subset_id'].isna()]
        
         resolution += 1 

    瀏覽所有上述方法的執行時間圖:

    圖 2 :數據大小為 50 萬的各種方法的執行時間。
    圖 3 :數據大小為 150 萬的各種方法的執行時間。

    加快地理空間指數計算的經驗教訓

    • 高性能: 前面的略圖得出的結論很清楚, cuDF GPU 版本提供了最佳性能。而且數據集越大,速度就越快。
    • 代碼適應性和易轉換性: 請注意,所移植的代碼并不是 GPU 加速的最佳方案。我們正在 CPU 上運行的第三方庫( Uber H3 )上運行比較。為了利用這個庫,我們需要在每個循環上將數據從 GPU 內存復制到 CPU 內存,這不是最佳方法。
      除此之外,還有一個子集合 id 計算,它也是以行方式進行的,通過更改原始代碼可能會加快計算速度。但代碼仍然沒有改變,因為我們的主要目標之一是檢查代碼的適應性以及庫 pandas 和 cuDF 之間的輕松轉換。
    • 可重用代碼: 正如您在前面已經觀察到的,管道是一組標準化的函數,也可以用作解決其他用例的函數。

    一種 CUDA 加速 K 近鄰分類方法的研究

    使用上述方案,而不是通過索引和分組的方式來測量相連車輛的密度——另一種方法是根據兩個地點之間的地球距離進行地理分類。

    我們選擇的分類算法是 K- 近鄰 。最近鄰方法的原理是找到距離數據點最近的預定義數量的數據點( K )。我們將比較基于 CPU 的 KNN 實現與相同算法的 RAPIDS GPU 加速版本。

    在我們當前的用例中,我們使用匿名流式連接的汽車數據(如前面的業務挑戰中所述)。在這里,使用 KNN 對數據進行分組和聚合是匿名化的一部分。

    然而,對于我們的用例,當我們在地理坐標上分組和聚合時,我們將使用 Haversine 度量,它是唯一可以對地理坐標進行聚類的度量。

    在我們的管道中,使用 haversine 作為距離度量的 KNN 的輸入將是地理坐標(緯度、經度)和所需的最近數據點的數量。在下面的示例中,將創建 K = 7 。

    在下面的例子中,我們展示了以元組(經度和緯度)表示的相同數據。

    • 輸入數據與上一個示例中顯示的元組(經度和緯度)相同。
    • 應用 KNN 后,通過 KNN 算法計算群集 id : 對于前兩行輸入數據,集群輸出數據如下所示。為了避免混淆,我們用相應的顏色標記集群 id 。
    A table containing the output data after applying k-nearest neighbors classification to sample input data. It consists of four columns: custer_set_id, latitude and longitude.
    表 3 :應用 k 近鄰分類后的樣本輸出數據。

    以下是使用 pandas 實現 KNN 的代碼片段:

    nbrs = NearestNeighbors(n_neighbors=7, algorithm='ball_tree',
         metric = "haversine").fit(coord_array_rad)
     
    distances, indices = nbrs.kneighbors(coord_array_rad)
     
    # Distance is computed in radians from haversine
    distances_m = earth_radius * distances
     
    # Drop KNN, which are not compliant with minimum distance
    compliant_distances_mask = (distances_m<KNN_MAX_DISTANCE)
                             .all(axis = 1)
     
    compliant_indices = indices[compliant_distances_mask]

    采用 KNN 作為分類算法。 KNN 的缺點是它的計算性能,特別是當數據量較大時。我們的目的是最終利用 cuML 的 KNN 實現。

    之前的實現在相當小的數據集中工作,但沒有在 1 . 5 天內完成 300 萬條記錄的處理。所以我們阻止了它。

    為了轉向 CUDA 加速的 KNN 實現,我們必須用如下所示的等效度量來模擬 haversine 距離。

    第 1 步:圍繞哈弗斯線進行坐標變換

    在運行此練習時,在 cuML 的 KNN 實現中, haversine 距離度量本機不可用。因此,使用歐幾里德距離代替。盡管如此,公平地說,當前版本的 RAPIDS KNN 已經支持 haversine 度量。

    首先,我們將坐標轉換為以米為單位的距離,以便執行距離度量計算。[10]這是通過名為 df _ geo ()的函數實現的,該函數將在下一步中使用。

    歐幾里德距離的一個警告是,它不適用于地球上距離更遠的坐標。相反,它基本上會在地球表面“挖洞”,而不是在地球表面。但是,對于小于等于 100 公里的較小距離,哈弗斯距離和歐幾里德距離之間的差異最小。

    步驟 2 :執行 KNN 算法

    到目前為止,我們已經將所有坐標轉換為東北坐標格式,在這一步中,可以應用實際的 KNN 算法。

    我們在下面的設置中使用了 CUDA 加速 KNN 。我們注意到這個實現執行得非常快,絕對值得實現。

    #defining the hyperparameters
    n_neighbors = 7
    algorithm = "brute"
    metric = "euclidean"
     
    #Implementation of kNN by calling df_geo() which converts the coordinates #into Northing and Easting coordinate format
    nbrs = NearestNeighbors(n_neighbors=n_neighbors, algorithm=algorithm,
         metric=metric).fit(df_geo[['northing', 'easting']])
     
    distances, indices = nbrs.kneighbors(df_geo[['northing', 'easting']])

    步驟 3 :執行距離掩蔽和過濾

    這一部分是在 CPU 上完成的,因為在 GPU 上沒有明顯的加速。

    distances = cp.asnumpy(distances.values)
    indices = cp.asnumpy(indices.values) 
    #running on CPU
    KNN_MAX_DISTANCE = 10000 # meters
     
    # Drop KNN, which are not compliant with minimum distance
    compliant_distances_mask = (distances < KNN_MAX_DISTANCE).all(axis = 1)
    compliant_indices = indices[compliant_distances_mask]

    我們的結果是在 naive pandas 實現的基礎上應用到一個有 300 萬個樣本的數據集時,加速了 800 倍。

    圖 4 :數據量為 300 萬的各種方法的執行時間。

    K 近鄰聚類的經驗教訓

    • 高性能: 前面的略圖得出的結論很清楚, cuDF GPU 版本提供了最佳性能。即使數據集更大,執行也不會像 CPU 執行那樣花費很長時間。
    • 比較 cuML 和 scikit 的 KNN : 基于 cuML 的實現速度極快。但是我們不得不多走一英里來模擬缺失的距離指標。考慮到所取得的性能提升,做比要求更多的事情絕對值得。同時, haversine 距離在 RAPIDS AI 中受支持,并且與 scikit 實現一樣方便。
      我們利用歐幾里德距離和北向東距的方法來克服缺失的哈弗斯距離。根據我們代碼中的研究“ 在相當長的距離上——可能長達幾千公里或更多,歐幾里德開始錯誤的計算 ”,我們將距離限制為 10 公里。通過使用北距 – 東距,我們首先需要轉換坐標。由于整體性能更好,我們可以接受轉換坐標所需的時間。
    • 代碼適應性和易轉換性: 除北距 – 東距功能外。
    • 剩下的代碼類似于 CPU 代碼,仍然獲得了更好的性能。我們沒有更改代碼,因為我們的主要目標之一也是檢查代碼的適應性以及庫 pandas 和 cuDF 之間的輕松轉換。
    • 可重用代碼: 正如您在前面已經觀察到的, pipeline 是一組標準化的函數,也可以用作解決其他用例的函數。

    概括

    本文總結了 RAPIDS 如何通過在兩個模型(即地理空間索引( Uber H3 )和 K 近鄰分類( KNN ))上對數據管道進行評估,從而將數據管道的速度提高 100 倍。此外,我們分析了 NVIDIA RAPIDS AI 相對于前兩種模型在性能、代碼適應性和可重用性等方面的優缺點。我們的結論是 RAPIDS 肯定是一種流數據處理技術(連接的汽車數據)。它提供了更快的數據處理的好處,這是流數據分析的關鍵因素。而且, RAPIDS 支持大量的機器學習算法。加速的 RAPIDS cuDF 和 cuML 庫的 API 與 pandas 保持相似,以實現簡單的轉換。改造現有的 ML 管道并使其受益于 cuDF 和 cuML 非常容易。

    何時選擇 RAPIDS 而不是標準 Python 和 pandas :

    • 當應用程序需要更快的數據處理時。
    • 如果您確信該代碼在 GPU 中運行比在 CPU 中運行有好處。
    • 如果推薦的算法作為 cuML 的一部分可用。

    本文針對汽車工程師、數據工程師、大數據架構師、項目經理和行業顧問,他們對探索或處理數據科學的可能性以及使用 Python 分析數據感興趣。

    ?

    0

    標簽

    人人超碰97caoporen国产