Auxiliary SQL extension for Spark SQL
Kyuubi provides SQL extension out of box. Due to the version compatibility with Apache Spark, currently we only support Apache Spark branch-3.1 (i.e 3.1.1 and 3.1.2). And don’t worry, Kyuubi will support the new Apache Spark version in future. Thanks to the adaptive query execution framework (AQE), Kyuubi can do these optimization.
What feature does Kyuubi SQL extension provide
merging small files automatically
Small files is a long time issue with Apache Spark. Kyuubi can merge small files by adding an extra shuffle. Currently, Kyuubi supports handle small files with datasource table and hive table, and also Kyuubi support optimize dynamic partition insertion. For example, a common write query
INSERT INTO TABLE $table1 SELECT * FROM $table2
, Kyuubi will introduce an extra shuffle before write and then the small files will go away.
insert shuffle node before Join to make AQE OptimizeSkewedJoin work
In current implementation, Apache Spark can only optimize skewed join by the standard join which means a join must have two sort and shuffle node. However, in complex scenario this assuming will be broken easily. Kyuubi can guarantee the join is standard by adding an extra shuffle node before join. So that, OptimizeSkewedJoin can work better.
stage level config isolation in AQE
As we know,
spark.sql.adaptive.advisoryPartitionSizeInBytes
is a key config in Apache Spark AQE. It controls how big data size per-task should handle during shuffle, so we always use a 64MB or a smaller value to make parallelism enough. However, in general, we expect a file is big enough like 256MB or 512MB. Kyuubi can make the config isolation to solve the conflict so that we can make staging partition data size small and last partition data size big.
How to use Kyuubi SQL extension
- you need to choose Apache Spark branch-3.1 or higher version with Kyuubi binary tgz.
- if you want to compile Kyuubi by yourself, the maven opt should add
-Pkyuubi-extension-spark-3-1
- move the jar(
kyuubi-extension-spark-*.jar
) which is in$KYUUBI_HOME/extension
into$SPARK_HOME/jars
- add a config into
spark-defaults.conf
,spark.sql.extensions=org.apache.kyuubi.sql.KyuubiSparkSQLExtension
Now, you can enjoy the Kyuubi SQL Extension, and also Kyuubi provides some configs to make these feature easy to use.
Name | Default Value | Description | Since |
---|---|---|---|
spark.sql.optimizer.insertRepartitionBeforeWrite.enabled | true | Add repartition node at the top of query plan. An approach of merging small files. | 1.2.0 |
spark.sql.optimizer.insertRepartitionNum | none | The partition number if spark.sql.optimizer.insertRepartitionBeforeWrite.enabled is enabled. If AQE is disabled, the default value is spark.sql.shuffle.partitions . If AQE is enabled, the default value is none that means depend on AQE. |
1.2.0 |
spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum | 100 | The partition number of each dynamic partition if spark.sql.optimizer.insertRepartitionBeforeWrite.enabled is enabled. We will repartition by dynamic partition columns to reduce the small file but that can cause data skew. This config is to extend the partition of dynamic partition column to avoid skew but may generate some small files. |
1.2.0 |
spark.sql.optimizer.forceShuffleBeforeJoin.enabled | false | Ensure shuffle node exists before shuffled join (shj and smj) to make AQE OptimizeSkewedJoin works (complex scenario join, multi table join). |
1.2.0 |
spark.sql.optimizer.finalStageConfigIsolation.enabled | false | If true, the final stage support use different config with previous stage. The prefix of final stage config key should be spark.sql.finalStage. . For example, the raw spark config: spark.sql.adaptive.advisoryPartitionSizeInBytes , then the final stage config should be: spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes . |
1.2.0 |