Schema evolution is an essential aspect of data management. Hudi supports schema evolution on write out of the box and has experimental support for schema evolution on read. This page describes both.

Schema Evolution on Write

Hudi supports backwards-compatible schema evolution scenarios out of the box, such as adding a nullable field or promoting a field’s datatype.

We recommend using this approach wherever possible. It is a practical and efficient way to evolve schemas, proven in large-scale data lakes at companies like Uber, Walmart, and LinkedIn, and implemented at scale by vendors like Confluent for streaming data. Because streaming data is continuous, there is no natural boundary at which a change that is incompatible with the previous schema (e.g., renaming a column) can be introduced.

Furthermore, the evolved schema is queryable across high-performance engines like Presto and Spark SQL without additional overhead for column ID translations or type reconciliations.

When the incoming schema is missing columns that exist in the table schema, Hudi can add those columns automatically with null values. This requires enabling the config hoodie.write.set.null.for.missing.columns; otherwise the pipeline will fail.

The following table summarizes the schema changes compatible with different Hudi table types.

| Schema Change | COW | MOR | Remarks |
|---|---|---|---|
| Add a new nullable column at root level at the end | Yes | Yes | Yes means that a write with the evolved schema succeeds and a read following the write succeeds in reading the entire dataset. |
| Add a new nullable column to inner struct (at the end) | Yes | Yes | |
| Add a new complex type field with default (map and array) | Yes | Yes | |
| Add a new nullable column and change the ordering of fields | No | No | Write succeeds but read fails if the write with the evolved schema updated only some of the base files but not all. Currently, Hudi does not maintain a schema registry with history of changes across base files. Nevertheless, if the upsert touched all base files then the read will succeed. |
| Add a custom nullable Hudi meta column, e.g. _hoodie_meta_col | Yes | Yes | |
| Promote datatype for a field at root level | Yes | Yes | |
| Promote datatype for a nested field | Yes | Yes | |
| Promote datatype for a complex type (value of map or array) | Yes | Yes | |
| Add a new non-nullable column at root level at the end | No | No | In case of a MOR table with the Spark data source, write succeeds but read fails. As a workaround, you can make the field nullable. |
| Add a new non-nullable column to inner struct (at the end) | No | No | |
| Demote datatype for a field at root level | No | No | |
| Demote datatype for a nested field | No | No | |
| Demote datatype for a complex type (value of map or array) | No | No | |
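
As a rough sketch of how the hoodie.write.set.null.for.missing.columns config mentioned above might be passed through the Spark data source writer, consider the helper below. It is meant for spark-shell or a Spark application with the Hudi bundle on the classpath. The helper name writeWithNullFill and the column names rowId, partitionId, and preComb are placeholders borrowed from the walkthrough later on this page; they are not part of Hudi.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

// Hypothetical helper: appends `incomingDf` to an existing Hudi table.
// Columns that exist in the table schema but are absent from `incomingDf`
// are expected to be written as nulls because of the last option below.
def writeWithNullFill(incomingDf: DataFrame, tableName: String, basePath: String): Unit =
  incomingDf.write.format("hudi").
    option("hoodie.table.name", tableName).
    option("hoodie.datasource.write.recordkey.field", "rowId").           // assumed key column
    option("hoodie.datasource.write.partitionpath.field", "partitionId"). // assumed partition column
    option("hoodie.datasource.write.precombine.field", "preComb").        // assumed ordering column
    option("hoodie.write.set.null.for.missing.columns", "true").
    mode(SaveMode.Append).
    save(basePath)
```

Without the last option, a batch that omits an existing column would be expected to fail as described above.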

Type Promotions

This chart shows what the table schema will be when an incoming column type has changed (X means that it is not allowed):

| Incoming Schema ↓ \ Table Schema → | int | long | float | double | string | bytes |
|---|---|---|---|---|---|---|
| int | int | long | float | double | string | X |
| long | long | long | float | double | string | X |
| float | float | float | float | double | string | X |
| double | double | double | double | double | string | X |
| string | string | string | string | string | string | bytes |
| bytes | X | X | X | X | string | bytes |
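
To make the chart easier to check programmatically, it can be transcribed into a small lookup. The following is only an illustrative sketch in plain Scala: the object name TypePromotionChart and the method resolve are made up for this example and are not Hudi APIs, and the type names are plain strings rather than Avro or Spark types.

```scala
object TypePromotionChart {
  // Column order of the chart: the type currently in the table schema.
  private val tableTypes = Vector("int", "long", "float", "double", "string", "bytes")

  // One row per incoming type; "X" marks a combination that is not allowed.
  private val rows: Map[String, Vector[String]] = Map(
    "int"    -> Vector("int",    "long",   "float",  "double", "string", "X"),
    "long"   -> Vector("long",   "long",   "float",  "double", "string", "X"),
    "float"  -> Vector("float",  "float",  "float",  "double", "string", "X"),
    "double" -> Vector("double", "double", "double", "double", "string", "X"),
    "string" -> Vector("string", "string", "string", "string", "string", "bytes"),
    "bytes"  -> Vector("X",      "X",      "X",      "X",      "string", "bytes")
  )

  /** Resulting table type, or None if the chart marks the pair as X or a type is unknown. */
  def resolve(incoming: String, table: String): Option[String] = {
    val col = tableTypes.indexOf(table)
    if (col < 0) None else rows.get(incoming).map(_(col)).filter(_ != "X")
  }
}
```

For instance, TypePromotionChart.resolve("long", "int") returns Some("long"), while TypePromotionChart.resolve("bytes", "int") returns None because the chart marks that pair with X.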

Schema Evolution on Read

There are often scenarios where it is desirable to evolve the schema more flexibly. For example:

  1. Adding, deleting, modifying, and moving columns (including nested columns).
  2. Renaming columns (including nested columns).
  3. Adding, deleting, or performing operations on nested columns of the Array type.

Hudi has experimental support for accepting backward-incompatible schema evolution scenarios on write and resolving them at read time. To enable this feature, set hoodie.schema.on.read.enable=true in the writer config (Datasource) or as a table property (SQL).

This feature requires Hudi versions > 0.11 and Spark 3.1.x, or Spark 3.2.1 and above. For Spark 3.2.1 and above, spark.sql.catalog.spark_catalog must also be set. Once schema on read is enabled, it cannot be disabled again, because the table may already have accepted such schema changes.
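
The settings described above could be wired together roughly as follows. This is a sketch under assumptions, not a definitive setup: the catalog, extension, and serializer values are the ones commonly used in Hudi's Spark quick-start configuration rather than values stated on this page, and the helper names hudiSession and writeWithSchemaOnRead, along with the baseOptions parameter (record key, partition path, table name, and so on), are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

// Builds a session with spark.sql.catalog.spark_catalog set, as required for
// Spark 3.2.1 and above. The class names are assumptions taken from Hudi's
// Spark quick-start setup.
def hudiSession(appName: String): SparkSession =
  SparkSession.builder().
    appName(appName).
    config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").
    config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension").
    config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog").
    getOrCreate()

// Appends `df` with schema on read enabled in the writer config.
// `baseOptions` carries the usual Hudi write options for the table.
def writeWithSchemaOnRead(df: DataFrame, basePath: String, baseOptions: Map[String, String]): Unit =
  df.write.format("hudi").
    options(baseOptions).
    option("hoodie.schema.on.read.enable", "true").
    mode(SaveMode.Append).
    save(basePath)
```

Because the feature cannot be turned off once the table has accepted such changes, it may be worth trying this on a copy of the table first.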

Adding Columns

Syntax

    -- add columns
    ALTER TABLE tableName ADD COLUMNS(col_spec[, col_spec ...])

A column specification consists of five fields, written next to each other.

| Parameter | Description |
|---|---|
| col_name | Name of the new column. To add a sub-column col1 to a nested map-type column member, set this field to member.value.col1. |
| col_type | Type of the new column. |
| nullable | Whether the new column allows null values. (optional) |
| comment | Comment for the new column. (optional) |
| col_position | Position where the new column is added. The value can be FIRST or AFTER origin_col. If it is set to FIRST, the new column is added before the first column of the table. If it is set to AFTER origin_col, the new column is added after the original column. FIRST can be used only when new sub-columns are added to nested columns, not for top-level columns. There are no restrictions on the usage of AFTER. |

Examples

    ALTER TABLE h0 ADD COLUMNS(ext0 string);
    ALTER TABLE h0 ADD COLUMNS(new_col int not null comment 'add new column' AFTER col1);
    ALTER TABLE complex_table ADD COLUMNS(col_struct.col_name string comment 'add new column to a struct col' AFTER col_from_col_struct);

Altering Columns

Syntax

    -- alter table ... alter column
    ALTER TABLE tableName ALTER [COLUMN] col_old_name TYPE column_type [COMMENT] col_comment [FIRST|AFTER] column_name

Parameter Description

| Parameter | Description |
|---|---|
| tableName | Table name. |
| col_old_name | Name of the column to be altered. |
| column_type | Type of the target column. |
| col_comment | Optional comments on the altered column. |
| column_name | The new position to place the altered column. For example, AFTER column_name indicates that the target column is placed after column_name. |

Examples

    -- Changing the column type
    ALTER TABLE table1 ALTER COLUMN a.b.c TYPE bigint
    -- Altering other attributes
    ALTER TABLE table1 ALTER COLUMN a.b.c COMMENT 'new comment'
    ALTER TABLE table1 ALTER COLUMN a.b.c FIRST
    ALTER TABLE table1 ALTER COLUMN a.b.c AFTER x
    ALTER TABLE table1 ALTER COLUMN a.b.c DROP NOT NULL

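Column Type Change

| Source \ Target | long | float | double | string | decimal | date | int |
|---|---|---|---|---|---|---|---|
| int | Y | Y | Y | Y | Y | N | Y |
| long | Y | Y | Y | Y | Y | N | N |
| float | N | Y | Y | Y | Y | N | N |
| double | N | N | Y | Y | Y | N | N |
| decimal | N | N | N | Y | Y | N | N |
| string | N | N | N | Y | Y | Y | N |
| date | N | N | N | Y | N | Y | N |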
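
Before issuing an ALTER COLUMN ... TYPE statement, it can help to check the requested change against the table above. The sketch below transcribes the Y cells into a lookup, reading Y as supported and N as not supported. The object name ColumnTypeChange and the method isSupported are illustrative only and are not part of Hudi.

```scala
object ColumnTypeChange {
  // Supported target types for each source type (the Y cells of the table).
  private val supported: Map[String, Set[String]] = Map(
    "int"     -> Set("long", "float", "double", "string", "decimal", "int"),
    "long"    -> Set("long", "float", "double", "string", "decimal"),
    "float"   -> Set("float", "double", "string", "decimal"),
    "double"  -> Set("double", "string", "decimal"),
    "decimal" -> Set("string", "decimal"),
    "string"  -> Set("string", "decimal", "date"),
    "date"    -> Set("string", "date")
  )

  /** True if the table lists the source-to-target change as Y; unknown types yield false. */
  def isSupported(source: String, target: String): Boolean =
    supported.get(source).exists(_.contains(target))
}
```

For example, ColumnTypeChange.isSupported("int", "long") is true, whereas ColumnTypeChange.isSupported("long", "int") is false.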

Deleting Columns

Syntax

    -- alter table ... drop columns
    ALTER TABLE tableName DROP COLUMN|COLUMNS cols

Examples

    ALTER TABLE table1 DROP COLUMN a.b.c
    ALTER TABLE table1 DROP COLUMNS a.b.c, x, y

Renaming Columns

Syntax

    -- alter table ... rename column
    ALTER TABLE tableName RENAME COLUMN old_columnName TO new_columnName

Examples

    ALTER TABLE table1 RENAME COLUMN a.b.c TO x

When using the Hive Metastore, disable hive.metastore.disallow.incompatible.col.type.changes if you encounter this error: The following columns have types incompatible with the existing columns in their respective positions.

Schema Evolution in Action

Let us walk through an example to demonstrate the schema evolution support in Hudi. In the example below, we add a new string field and change the datatype of a field from int to long.

    scala> :paste
    import org.apache.hudi.QuickstartUtils._
    import scala.collection.JavaConversions._
    import org.apache.spark.sql.SaveMode._
    import org.apache.hudi.DataSourceReadOptions._
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.config.HoodieWriteConfig._
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row

    val tableName = "hudi_trips_cow"
    val basePath = "file:///tmp/hudi_trips_cow"
    val schema = StructType(Array(
      StructField("rowId", StringType, true),
      StructField("partitionId", StringType, true),
      StructField("preComb", LongType, true),
      StructField("name", StringType, true),
      StructField("versionId", StringType, true),
      StructField("intToLong", IntegerType, true)
    ))

    val data1 = Seq(Row("row_1", "part_0", 0L, "bob", "v_0", 0),
                    Row("row_2", "part_0", 0L, "john", "v_0", 0),
                    Row("row_3", "part_0", 0L, "tom", "v_0", 0))

    var dfFromData1 = spark.createDataFrame(data1, schema)
    dfFromData1.write.format("hudi").
      options(getQuickstartWriteConfigs).
      option("hoodie.datasource.write.precombine.field", "preComb").
      option("hoodie.datasource.write.recordkey.field", "rowId").
      option("hoodie.datasource.write.partitionpath.field", "partitionId").
      option("hoodie.index.type", "SIMPLE").
      option("hoodie.table.name", tableName).
      mode(Overwrite).
      save(basePath)

    var tripsSnapshotDF1 = spark.read.format("hudi").load(basePath + "/*/*")
    tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")
    // Press Ctrl+D to exit paste mode and run the block

    scala> spark.sql("desc hudi_trips_snapshot").show()
    +--------------------+---------+-------+
    |            col_name|data_type|comment|
    +--------------------+---------+-------+
    | _hoodie_commit_time|   string|   null|
    |_hoodie_commit_seqno|   string|   null|
    |  _hoodie_record_key|   string|   null|
    |_hoodie_partition...|   string|   null|
    |   _hoodie_file_name|   string|   null|
    |               rowId|   string|   null|
    |         partitionId|   string|   null|
    |             preComb|   bigint|   null|
    |                name|   string|   null|
    |           versionId|   string|   null|
    |           intToLong|      int|   null|
    +--------------------+---------+-------+

    scala> spark.sql("select rowId, partitionId, preComb, name, versionId, intToLong from hudi_trips_snapshot").show()
    +-----+-----------+-------+----+---------+---------+
    |rowId|partitionId|preComb|name|versionId|intToLong|
    +-----+-----------+-------+----+---------+---------+
    |row_3|     part_0|      0| tom|      v_0|        0|
    |row_2|     part_0|      0|john|      v_0|        0|
    |row_1|     part_0|      0| bob|      v_0|        0|
    +-----+-----------+-------+----+---------+---------+

    // In the new schema, we are going to add a String field and
    // change the datatype of the `intToLong` field from int to long.
    scala> :paste
    val newSchema = StructType(Array(
      StructField("rowId", StringType, true),
      StructField("partitionId", StringType, true),
      StructField("preComb", LongType, true),
      StructField("name", StringType, true),
      StructField("versionId", StringType, true),
      StructField("intToLong", LongType, true),
      StructField("newField", StringType, true)
    ))

    val data2 = Seq(Row("row_2", "part_0", 5L, "john", "v_3", 3L, "newField_1"),
                    Row("row_5", "part_0", 5L, "maroon", "v_2", 2L, "newField_1"),
                    Row("row_9", "part_0", 5L, "michael", "v_2", 2L, "newField_1"))

    var dfFromData2 = spark.createDataFrame(data2, newSchema)
    dfFromData2.write.format("hudi").
      options(getQuickstartWriteConfigs).
      option("hoodie.datasource.write.precombine.field", "preComb").
      option("hoodie.datasource.write.recordkey.field", "rowId").
      option("hoodie.datasource.write.partitionpath.field", "partitionId").
      option("hoodie.index.type", "SIMPLE").
      option("hoodie.table.name", tableName).
      mode(Append).
      save(basePath)

    var tripsSnapshotDF2 = spark.read.format("hudi").load(basePath + "/*/*")
    tripsSnapshotDF2.createOrReplaceTempView("hudi_trips_snapshot")
    // Press Ctrl+D to exit paste mode and run the block

    scala> spark.sql("desc hudi_trips_snapshot").show()
    +--------------------+---------+-------+
    |            col_name|data_type|comment|
    +--------------------+---------+-------+
    | _hoodie_commit_time|   string|   null|
    |_hoodie_commit_seqno|   string|   null|
    |  _hoodie_record_key|   string|   null|
    |_hoodie_partition...|   string|   null|
    |   _hoodie_file_name|   string|   null|
    |               rowId|   string|   null|
    |         partitionId|   string|   null|
    |             preComb|   bigint|   null|
    |                name|   string|   null|
    |           versionId|   string|   null|
    |           intToLong|   bigint|   null|
    |            newField|   string|   null|
    +--------------------+---------+-------+

    scala> spark.sql("select rowId, partitionId, preComb, name, versionId, intToLong, newField from hudi_trips_snapshot").show()
    +-----+-----------+-------+-------+---------+---------+----------+
    |rowId|partitionId|preComb|   name|versionId|intToLong|  newField|
    +-----+-----------+-------+-------+---------+---------+----------+
    |row_3|     part_0|      0|    tom|      v_0|        0|      null|
    |row_2|     part_0|      5|   john|      v_3|        3|newField_1|
    |row_1|     part_0|      0|    bob|      v_0|        0|      null|
    |row_5|     part_0|      5| maroon|      v_2|        2|newField_1|
    |row_9|     part_0|      5|michael|      v_2|        2|newField_1|
    +-----+-----------+-------+-------+---------+---------+----------+

Videos