
3x Faster Queries! A Look at Apache Hudi Query Optimization
2022-03-06 17:56:21

Starting with version 0.10.0, we are excited to introduce support in Apache Hudi for advanced data layout optimization techniques known in the database world as Z-Order and Hilbert space-filling curves.

1. Background

The Amazon EMR team recently published a great article showing how clustering your data can improve query performance. To better understand what is happening and how it relates to space-filling curves, let's take a closer look at the setup used in that article.

The article compares two Apache Hudi tables (both built from the Amazon Reviews dataset):

  • The amazon_reviews table, not clustered (i.e. the data has not been re-ordered by any particular key)

  • The amazon_reviews_clustered table, clustered. When the data is clustered, it is laid out in lexicographic order (we will refer to this as linear ordering here), sorted by the two columns star_rating and total_votes (see the figure below)

To demonstrate the query-performance improvement, the following query was run against both tables:

The important thing to call out here is that the query filters on both of the columns the table is sorted by (star_rating and total_votes). Unfortunately, this also exposes a key limitation of linear/lexicographic ordering: its value diminishes rapidly as more columns are added to the sort.

As you can see from the figure above, when integer 3-tuples are ordered lexicographically, only the first column exhibits the crucial locality property for records sharing the same value: for example, all records whose first column is "1", "2" or "3" are clustered nicely together. But if you try to find all records whose third column has the value "5", you will discover that those records are now scattered all over the place, with no locality whatsoever, and filtering on that column is very ineffective.
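
To make this limitation concrete, here is a small illustrative Scala snippet (not from the original article): it sorts integer 3-tuples lexicographically and prints the positions of records matched by a filter on the first versus the third column.

// Build all 3-tuples over {1, 2, 3} and sort them lexicographically (linear ordering)
val tuples = for {
  a <- 1 to 3
  b <- 1 to 3
  c <- 1 to 3
} yield (a, b, c)

val linearlySorted = tuples.sorted

// Filtering on the first column hits one contiguous run of the sorted data...
val firstColPositions = linearlySorted.zipWithIndex.collect { case ((2, _, _), i) => i }
// ...while filtering on the third column hits rows scattered across the whole range.
val thirdColPositions = linearlySorted.zipWithIndex.collect { case ((_, _, 2), i) => i }

println(s"rows with col1 == 2: ${firstColPositions.mkString(", ")}")  // 9..17, one contiguous run
println(s"rows with col3 == 2: ${thirdColPositions.mkString(", ")}")  // 1, 4, 7, ..., scattered

If each contiguous run of rows maps to a file, the first filter touches a single file while the second touches nearly all of them, which is exactly the locality problem described above.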

The key factor in improving query performance is locality: it allows a query to dramatically shrink the search space and the number of files that need to be scanned, parsed, and so on.

Does that mean, though, that our queries are doomed to a full scan whenever we filter on anything other than the first column (or, more precisely, a prefix) of the columns the table is sorted by? Not quite: locality is also the property that space-filling curves provide when enumerating a multi-dimensional space (the records of our table can be represented as points in an N-dimensional space, where N is the number of columns in the table).

So how does it work? Let's take the Z-curve as an example: a Z-order curve fitted to a two-dimensional plane looks like this:

Following the path, you can see that instead of simply sorting by one coordinate ("x") first and then by the other, it effectively sorts the points as if the bits of the two coordinates had been interleaved into a single value:

Whereas with linear ordering locality comes only from the first column, this approach derives locality from all of the columns.
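
As a rough sketch of that bit interleaving (assuming two non-negative integer columns; this is for intuition only, not the exact code Hudi uses internally), the Z-value of a 2-D point can be computed by interleaving the bits of its coordinates, and sorting by that value walks the points along the Z-curve shown above:

// Interleave the bits of two non-negative ints into a single 64-bit Z-value (Morton code):
// bit i of x lands on bit 2*i of the result, bit i of y on bit 2*i + 1.
def zValue(x: Int, y: Int): Long = {
  var z = 0L
  var i = 0
  while (i < 32) {
    z |= ((x.toLong >> i) & 1L) << (2 * i)
    z |= ((y.toLong >> i) & 1L) << (2 * i + 1)
    i += 1
  }
  z
}

// Sorting 2-D points by their Z-value lays them out along the Z-order curve,
// so points that are close in (x, y) tend to stay close in the sorted (file) order.
val points = for { x <- 0 to 3; y <- 0 to 3 } yield (x, y)
points.sortBy { case (x, y) => zValue(x, y) }.foreach(println)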

In a similar way, Hilbert curves allow points in an N-dimensional space (the rows of our table) to be mapped onto a one-dimensional curve, essentially sorting them while still preserving the crucial locality property (you can read more details about Hilbert curves here). Our experiments so far show that ordering the data along a Hilbert curve leads to even better clustering and better performance.
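
For intuition, here is a minimal 2-D version of the classic xy2d mapping (a sketch only; Hudi's actual implementation generalizes the idea to N dimensions): it converts a point (x, y) on an n x n grid, with n a power of two, into its position along the Hilbert curve, and sorting rows by that position preserves locality across both columns.

// Map (x, y) on an n x n grid (n a power of two) to its index d along the Hilbert curve
// (the well-known iterative xy2d algorithm).
def hilbertIndex(n: Int, xIn: Int, yIn: Int): Long = {
  var x = xIn
  var y = yIn
  var d = 0L
  var s = n / 2
  while (s > 0) {
    val rx = if ((x & s) > 0) 1 else 0
    val ry = if ((y & s) > 0) 1 else 0
    d += s.toLong * s * ((3 * rx) ^ ry)
    // Rotate/flip the quadrant so that lower-order bits are visited in Hilbert order
    if (ry == 0) {
      if (rx == 1) {
        x = n - 1 - x
        y = n - 1 - y
      }
      val t = x; x = y; y = t
    }
    s /= 2
  }
  d
}

// e.g. order 2-D points along the Hilbert curve of a 4 x 4 grid:
// Seq((0, 0), (3, 0), (1, 2), (2, 1)).sortBy { case (x, y) => hilbertIndex(4, x, y) }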

Now let's see it in action!

2. Setup

We will use the Amazon Reviews dataset again, but this time we will use Hudi to Z-Order the table by the (product_id, customer_id) column tuple, rather than clustering it with a linear ordering.

No special preparation of the dataset is required: you can download it in Parquet format directly from S3 and use it as-is as the input for Spark to ingest it into a Hudi table.

Launch spark-shell:

./bin/spark-shell --master 'local[4]' --driver-memory 8G --executor-memory 8G \
  --jars ../../packaging/hudi-spark-bundle/target/hudi-spark3-bundle_2.12-0.10.0.jar \
  --packages org.apache.spark:spark-avro_2.12:2.4.4 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

Write the dataset into a Hudi table:

import org.apache.hadoop.fs.{FileStatus, Path}
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.ClusteringUtils
import org.apache.hudi.config.HoodieClusteringConfig
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.DataFrame

import java.util.stream.Collectors

val layoutOptStrategy = "z-order"; // OR "hilbert"

val inputPath = s"file:///${System.getProperty("user.home")}/datasets/amazon_reviews_parquet"
val tableName = s"amazon_reviews_${layoutOptStrategy}"
val outputPath = s"file:///tmp/hudi/$tableName"


def safeTableName(s: String) = s.replace('-', '_')

val commonOpts =
  Map(
    "hoodie.compact.inline" -> "false",
    "hoodie.bulk_insert.shuffle.parallelism" -> "10"
  )


////////////////////////////////////////////////////////////////
// Writing to Hudi
////////////////////////////////////////////////////////////////

val df = spark.read.parquet(inputPath)

df.write.format("hudi")
  .option(DataSourceWriteOptions.TABLE_TYPE.key(), COW_TABLE_TYPE_OPT_VAL)
  .option("hoodie.table.name", tableName)
  .option(PRECOMBINE_FIELD.key(), "review_id")
  .option(RECORDKEY_FIELD.key(), "review_id")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "product_category")
  .option("hoodie.clustering.inline", "true")
  .option("hoodie.clustering.inline.max.commits", "1")
  // NOTE: Small file limit is intentionally kept _ABOVE_ target file-size max threshold for Clustering,
  // to force re-clustering
  .option("hoodie.clustering.plan.strategy.small.file.limit", String.valueOf(1024 * 1024 * 1024)) // 1Gb
  .option("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(128 * 1024 * 1024)) // 128Mb
  // NOTE: We're increasing cap on number of file-groups produced as part of the Clustering run to be able to accommodate for the 
  // whole dataset (~33Gb)
  .option("hoodie.clustering.plan.strategy.max.num.groups", String.valueOf(4096))
  .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_ENABLE.key, "true")
  .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_STRATEGY.key, layoutOptStrategy)
  .option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key, "product_id,customer_id")
  .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
  .option(BULK_INSERT_SORT_MODE.key(), "NONE")
  .options(commonOpts)
  .mode(ErrorIfExists)
  .save(outputPath)

3. Testing

Run each individual test in its own spark-shell session so that caching does not skew the results.

////////////////////////////////////////////////////////////////
// Reading
///////////////////////////////////////////////////////////////

// Temp Table w/ Data Skipping DISABLED
val readDf: DataFrame =
  spark.read.option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "false").format("hudi").load(outputPath)

val rawSnapshotTableName = safeTableName(s"${tableName}_sql_snapshot")

readDf.createOrReplaceTempView(rawSnapshotTableName)


// Temp Table w/ Data Skipping ENABLED
val readDfSkip: DataFrame =
  spark.read.option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "true").format("hudi").load(outputPath)

val dataSkippingSnapshotTableName = safeTableName(s"${tableName}_sql_snapshot_skipping")

readDfSkip.createOrReplaceTempView(dataSkippingSnapshotTableName)

// Query 1: Total votes by product_category, for 6 months
def runQuery1(tableName: String) = {
  // Query 1: Total votes by product_category, for 6 months
  spark.sql(s"SELECT sum(total_votes), product_category FROM $tableName WHERE review_date > '2013-12-15' AND review_date < '2014-06-01' GROUP BY product_category").show()
}

// Query 2: Average star rating by product_id, for some product
def runQuery2(tableName: String) = {
  spark.sql(s"SELECT avg(star_rating), product_id FROM $tableName WHERE product_id in ('B0184XC75U') GROUP BY product_id").show()
}

// Query 3: Count number of reviews by customer_id for some 5 customers
def runQuery3(tableName: String) = {
  spark.sql(s"SELECT count(*) as num_reviews, customer_id FROM $tableName WHERE customer_id in ('53096570','10046284','53096576','10000196','21700145') GROUP BY customer_id").show()
}

//
// Query 1: Is a "wide" query and hence it's expected to touch a lot of files
//
scala> runQuery1(rawSnapshotTableName)
+----------------+--------------------+
|sum(total_votes)|    product_category|
+----------------+--------------------+
|         1050944|                  PC|
|          867794|             Kitchen|
|         1167489|                Home|
|          927531|            Wireless|
|            6861|               Video|
|           39602| Digital_Video_Games|
|          954924|Digital_Video_Dow...|
|           81876|             Luggage|
|          320536|         Video_Games|
|          817679|              Sports|
|           11451|  Mobile_Electronics|
|          228739|  Home_Entertainment|
|         3769269|Digital_Ebook_Pur...|
|          252273|                Baby|
|          735042|             Apparel|
|           49101|    Major_Appliances|
|          484732|             Grocery|
|          285682|               Tools|
|          459980|         Electronics|
|          454258|            Outdoors|
+----------------+--------------------+
only showing top 20 rows

scala> runQuery1(dataSkippingSnapshotTableName)
+----------------+--------------------+
|sum(total_votes)|    product_category|
+----------------+--------------------+
|         1050944|                  PC|
|          867794|             Kitchen|
|         1167489|                Home|
|          927531|            Wireless|
|            6861|               Video|
|           39602| Digital_Video_Games|
|          954924|Digital_Video_Dow...|
|           81876|             Luggage|
|          320536|         Video_Games|
|          817679|              Sports|
|           11451|  Mobile_Electronics|
|          228739|  Home_Entertainment|
|         3769269|Digital_Ebook_Pur...|
|          252273|                Baby|
|          735042|             Apparel|
|           49101|    Major_Appliances|
|          484732|             Grocery|
|          285682|               Tools|
|          459980|         Electronics|
|          454258|            Outdoors|
+----------------+--------------------+
only showing top 20 rows

//
// Query 2: Is a "pointwise" query and hence it's expected that data-skipping should substantially reduce number 
// of files scanned (as compared to Baseline)
//
// NOTE: Linear ordering (as compared to space-curve-based ordering) will have a similar effect on performance,
// reducing the total # of Parquet files scanned, since we're querying on the prefix of the ordering key
//
scala> runQuery2(rawSnapshotTableName)
+----------------+----------+
|avg(star_rating)|product_id|
+----------------+----------+
|             1.0|B0184XC75U|
+----------------+----------+


scala> runQuery2(dataSkippingSnapshotTableName)
+----------------+----------+
|avg(star_rating)|product_id|
+----------------+----------+
|             1.0|B0184XC75U|
+----------------+----------+

//
// Query 3: Similar to Q2, is a "pointwise" query, but querying another part of the ordering key (product_id, customer_id),
// and hence it's expected that data-skipping should substantially reduce number of files scanned (as compared to Baseline, Linear Ordering).
//
// NOTE: Linear ordering (as compared to space-curve-based ordering) will _NOT_ have a similar effect here; it does not
// reduce the total # of Parquet files scanned, since we're NOT querying on the prefix of the ordering key
//
scala> runQuery3(rawSnapshotTableName)
+-----------+-----------+
|num_reviews|customer_id|
+-----------+-----------+
|         50|   53096570|
|          3|   53096576|
|         25|   10046284|
|          1|   10000196|
|         14|   21700145|
+-----------+-----------+

scala> runQuery3(dataSkippingSnapshotTableName)
+-----------+-----------+
|num_reviews|customer_id|
+-----------+-----------+
|         50|   53096570|
|          3|   53096576|
|         25|   10046284|
|          1|   10000196|
|         14|   21700145|
+-----------+-----------+

4. Results

The test results are summarized below:

As you can see, multi-column linear ordering is not very effective for queries that filter on columns other than the first one (Q2, Q3). That is in stark contrast to the space-filling curves (Z-order and Hilbert), which speed those queries up by as much as 3x. It is worth noting that the gains depend heavily on the underlying data and the queries; in benchmarks on our internal data we have been able to achieve query performance improvements of more than 11x!

5. Summary

Apache Hudi v0.10 brings the new layout optimization capabilities Z-order and Hilbert curves to open source. Using these industry-leading layout optimization techniques can deliver significant performance gains and cost savings for your queries!

