Spark Streaming

You can write Hudi tables using Spark's Structured Streaming.

Scala

  // spark-shell
  // Streams the Hudi table stored at `basePath` into a new Hudi table.
  // `spark`, `basePath` and `getQuickstartWriteConfigs` are expected to be
  // available in the session already (they are not defined in this snippet).
  import org.apache.spark.sql.streaming.Trigger

  // Name and storage locations of the streaming target table.
  val streamingTableName = "hudi_trips_cow_streaming"
  val baseStreamingPath = "file:///tmp/hudi_trips_cow_streaming"
  val checkpointLocation = "file:///tmp/checkpoints/hudi_trips_cow_streaming"

  // Streaming source: read the existing Hudi table as a stream.
  val df = spark.readStream.format("hudi").load(basePath)

  // Table-specific Hudi write settings; applied after the quickstart defaults
  // so that these values win if a key appears in both.
  val hudiWriteOptions = Map(
    "hoodie.datasource.write.precombine.field" -> "ts",
    "hoodie.datasource.write.recordkey.field" -> "uuid",
    "hoodie.datasource.write.partitionpath.field" -> "partitionpath",
    "hoodie.table.name" -> streamingTableName
  )

  // Streaming sink: append into the new Hudi table. Trailing dots keep the
  // chain pasteable into spark-shell line by line.
  val streamWriter = df.writeStream.
    format("hudi").
    options(getQuickstartWriteConfigs).
    options(hudiWriteOptions).
    option("path", baseStreamingPath).
    option("checkpointLocation", checkpointLocation).
    outputMode("append").
    trigger(Trigger.Once())

  // Launch the query; the one-time trigger is set on the writer above.
  streamWriter.start()

Python

  # pyspark
  # Streams the Hudi table stored at `basePath` into a new Hudi table.
  # `spark` and `basePath` are expected to exist in the session already
  # (they are not defined in this snippet).

  # Name and storage locations of the streaming target table.
  streamingTableName = "hudi_trips_cow_streaming"
  baseStreamingPath = "file:///tmp/hudi_trips_cow_streaming"
  checkpointLocation = "file:///tmp/checkpoints/hudi_trips_cow_streaming"

  # Hudi write settings for the target table.
  hudi_streaming_options = {
      # table identity
      'hoodie.table.name': streamingTableName,
      'hoodie.datasource.write.table.name': streamingTableName,
      # record key, partitioning and precombine (ordering) columns
      'hoodie.datasource.write.recordkey.field': 'uuid',
      'hoodie.datasource.write.partitionpath.field': 'partitionpath',
      'hoodie.datasource.write.precombine.field': 'ts',
      # write operation and shuffle parallelism
      'hoodie.datasource.write.operation': 'upsert',
      'hoodie.upsert.shuffle.parallelism': 2,
      'hoodie.insert.shuffle.parallelism': 2,
  }

  # Streaming source: read the existing Hudi table as a stream.
  df = spark.readStream.format("hudi").load(basePath)

  # Streaming sink: append into the new Hudi table with a one-time trigger.
  stream_writer = (
      df.writeStream.format("hudi")
      .options(**hudi_streaming_options)
      .option("path", baseStreamingPath)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("append")
      .trigger(once=True)
  )

  # Launch the query.
  stream_writer.start()