Spark Streaming

Structured Streaming reads are based on Hudi’s Incremental Query feature. A streaming read can therefore return only data whose commits and base files have not yet been removed by the cleaner. You can control the commit retention time.
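As a rough illustration of controlling retention, the sketch below adds cleaner and archival settings to a set of write options. It assumes the `hoodie.cleaner.commits.retained`, `hoodie.keep.min.commits`, and `hoodie.keep.max.commits` write configs apply to your Hudi version; the helper name `with_commit_retention`, the table name, and the chosen values are hypothetical.

```python
def with_commit_retention(write_options, commits_retained=20):
    """Return a copy of write_options with retention settings added.

    Hypothetical helper for illustration only; the numeric margins below
    are arbitrary example values, not recommendations.
    """
    if commits_retained < 1:
        raise ValueError("commits_retained must be at least 1")
    options = dict(write_options)
    # Number of commits the cleaner keeps file versions for.
    options["hoodie.cleaner.commits.retained"] = str(commits_retained)
    # Archival should keep more commits on the timeline than the cleaner retains.
    options["hoodie.keep.min.commits"] = str(commits_retained + 10)
    options["hoodie.keep.max.commits"] = str(commits_retained + 20)
    return options


base_options = {
    "hoodie.table.name": "hudi_trips_cow",  # hypothetical table name
    "hoodie.datasource.write.recordkey.field": "uuid",
    "hoodie.datasource.write.precombine.field": "ts",
}
retention_options = with_commit_retention(base_options, commits_retained=20)
```

The resulting dictionary can be passed to a write with `options(**retention_options)`. A longer retention window gives streaming readers more time to catch up, at the cost of keeping more file versions on storage.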

Scala

  // spark-shell
  // These imports are normally done during quickstart setup; repeated here for completeness.
  import org.apache.hudi.QuickstartUtils._
  import org.apache.spark.sql.SaveMode._

  // reload data (df, tableName and basePath come from the earlier quickstart steps)
  df.write.format("hudi").
    options(getQuickstartWriteConfigs).
    option("hoodie.datasource.write.precombine.field", "ts").
    option("hoodie.datasource.write.recordkey.field", "uuid").
    option("hoodie.datasource.write.partitionpath.field", "partitionpath").
    option("hoodie.table.name", tableName).
    mode(Overwrite).
    save(basePath)

  // read stream and output results to console
  spark.readStream.
    format("hudi").
    load(basePath).
    writeStream.
    format("console").
    start()

  // read stream to streaming df
  val df = spark.readStream.
    format("hudi").
    load(basePath)

Python

  # pyspark
  # reload data (dataGen, tableName and basePath come from the earlier quickstart steps)
  inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(
      dataGen.generateInserts(10))
  df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))

  hudi_options = {
      'hoodie.table.name': tableName,
      'hoodie.datasource.write.recordkey.field': 'uuid',
      'hoodie.datasource.write.partitionpath.field': 'partitionpath',
      'hoodie.datasource.write.table.name': tableName,
      'hoodie.datasource.write.operation': 'upsert',
      'hoodie.datasource.write.precombine.field': 'ts',
      'hoodie.upsert.shuffle.parallelism': 2,
      'hoodie.insert.shuffle.parallelism': 2
  }

  df.write.format("hudi") \
      .options(**hudi_options) \
      .mode("overwrite") \
      .save(basePath)

  # read stream to streaming df
  df = spark.readStream \
      .format("hudi") \
      .load(basePath)

  # read stream and output results to console
  spark.readStream \
      .format("hudi") \
      .load(basePath) \
      .writeStream \
      .format("console") \
      .start()

Spark SQL can be used within a foreachBatch sink to run INSERT, UPDATE, DELETE, and MERGE INTO statements. The target table must exist before the write.
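The foreachBatch pattern described above might look like the following minimal sketch. It assumes PySpark 3.3 or later (for `DataFrame.sparkSession`), an existing target table, and a `uuid` record key; the names `hudi_target`, `hudi_batch_updates`, `build_merge_sql`, `merge_batch`, and `start_merge_stream` are hypothetical and chosen only for illustration.

```python
def build_merge_sql(target_table, source_view, key_column):
    """Build a MERGE INTO statement that upserts source rows into the target."""
    return (
        f"MERGE INTO {target_table} AS t "
        f"USING {source_view} AS s "
        f"ON t.{key_column} = s.{key_column} "
        "WHEN MATCHED THEN UPDATE SET * "
        "WHEN NOT MATCHED THEN INSERT *"
    )


def merge_batch(batch_df, batch_id):
    """foreachBatch callback: merge one micro-batch into the target table."""
    # Drop Hudi metadata columns so only the data columns are merged
    # (an assumption about what the target table expects).
    meta_columns = [c for c in batch_df.columns if c.startswith("_hoodie_")]
    updates = batch_df.drop(*meta_columns)
    # Expose the micro-batch to SQL under a temporary view name.
    updates.createOrReplaceTempView("hudi_batch_updates")
    # Run the statement on the session attached to the batch so that
    # the temporary view is visible to it.
    batch_df.sparkSession.sql(
        build_merge_sql("hudi_target", "hudi_batch_updates", "uuid")
    )


def start_merge_stream(spark, base_path, checkpoint_path):
    """Stream from the Hudi table at base_path and merge every micro-batch."""
    return (
        spark.readStream
        .format("hudi")
        .load(base_path)
        .writeStream
        .foreachBatch(merge_batch)
        .option("checkpointLocation", checkpoint_path)
        .start()
    )
```

In a pyspark session this would be started with something like `query = start_merge_stream(spark, basePath, "/tmp/hudi_merge_checkpoint")`, where the checkpoint path is a placeholder. Keeping the SQL in a small builder function makes it easy to swap MERGE INTO for an INSERT, UPDATE, or DELETE statement.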