Spark Streaming
You can write Hudi tables using Spark Structured Streaming. The examples below read the table written in the earlier steps (at basePath) as a stream and write it out to a new Hudi table.
Scala
// spark-shell
// prepare to stream write to new table
import org.apache.spark.sql.streaming.Trigger

val streamingTableName = "hudi_trips_cow_streaming"
val baseStreamingPath = "file:///tmp/hudi_trips_cow_streaming"
val checkpointLocation = "file:///tmp/checkpoints/hudi_trips_cow_streaming"

// create streaming df
val df = spark.readStream.
  format("hudi").
  load(basePath)

// write stream to new hudi table
df.writeStream.format("hudi").
  options(getQuickstartWriteConfigs).
  option("hoodie.datasource.write.precombine.field", "ts").
  option("hoodie.datasource.write.recordkey.field", "uuid").
  option("hoodie.datasource.write.partitionpath.field", "partitionpath").
  option("hoodie.table.name", streamingTableName).
  outputMode("append").
  option("path", baseStreamingPath).
  option("checkpointLocation", checkpointLocation).
  trigger(Trigger.Once()).
  start()
Python
# pyspark
# prepare to stream write to new table
streamingTableName = "hudi_trips_cow_streaming"
baseStreamingPath = "file:///tmp/hudi_trips_cow_streaming"
checkpointLocation = "file:///tmp/checkpoints/hudi_trips_cow_streaming"

hudi_streaming_options = {
    'hoodie.table.name': streamingTableName,
    'hoodie.datasource.write.recordkey.field': 'uuid',
    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
    'hoodie.datasource.write.table.name': streamingTableName,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.insert.shuffle.parallelism': 2
}

# create streaming df
df = spark.readStream \
    .format("hudi") \
    .load(basePath)

# write stream to new hudi table
df.writeStream.format("hudi") \
    .options(**hudi_streaming_options) \
    .outputMode("append") \
    .option("path", baseStreamingPath) \
    .option("checkpointLocation", checkpointLocation) \
    .trigger(once=True) \
    .start()
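Note that start() returns immediately and the Trigger.Once() / trigger(once=True) micro-batch runs asynchronously, so to verify the write you can block until the batch finishes and then read the new table back with an ordinary snapshot query. Below is a minimal sketch, assuming the streaming write above is the only active query in the session (awaitAnyTermination blocks until it terminates); the selected column names come from the quickstart trips schema.

Scala
// spark-shell
// block until the trigger-once micro-batch above completes
// (assumes the write above is the only active streaming query)
spark.streams.awaitAnyTermination()

// snapshot-read the new table to verify the streamed write
val streamedDf = spark.read.format("hudi").load(baseStreamingPath)
streamedDf.select("uuid", "partitionpath", "ts").show(false)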
