Apache HBase is a highly reliable, high-performance, column-oriented, scalable distributed storage system. With HBase, large-scale structured storage clusters can be built on inexpensive commodity servers. Unlike typical relational databases, HBase is well suited to storing unstructured data because its storage model is column-oriented rather than row-oriented.

Apache Flink does not officially provide a DataStream connector for HBase. Apache StreamPark encapsulates HBaseSource and HBaseSink on top of hbase-client: it creates connections automatically from configuration and simplifies development. When checkpointing is enabled, StreamPark records the latest state of the data read from Apache HBase, and the offset corresponding to the source can be restored from the data itself, which gives AT_LEAST_ONCE semantics on the source side.

HBaseSource implements Flink Async I/O to improve streaming throughput. The sink side supports AT_LEAST_ONCE semantics by default, and EXACTLY_ONCE when checkpointing is enabled.

When checkpointing is enabled, StreamPark records the latest state of the data read from Apache HBase. Whether that state can be restored after the job resumes depends entirely on whether the data itself carries an offset identifier, which must be specified manually in code: the recovery logic is supplied through the func parameter of the getDataStream method of HBaseSource.
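For illustration, here is a minimal sketch of that recovery logic, assuming a hypothetical `person` table whose rowkey is used as the offset identifier; it is written as it would appear inside the `handle()` method of a `FlinkStreaming` job, with the HBase `Properties` available implicitly. The full read example appears later on this page.

```scala
import org.apache.streampark.flink.core.java.wrapper.HBaseQuery
import org.apache.streampark.flink.core.scala.source.HBaseSource
import org.apache.flink.api.scala.createTypeInformation
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.util.Bytes

// inside handle(), with the HBase Properties in implicit scope
val stream = HBaseSource().getDataStream[String](
  // build the next query from the last recorded offset (null on the very first run)
  lastRowKey => {
    val scan = new Scan()
    if (lastRowKey != null) scan.withStartRow(Bytes.toBytes(lastRowKey), false)
    new HBaseQuery("person", scan)
  },
  // func: record the rowkey of each Result as the offset kept in checkpoint state
  result => Bytes.toString(result.getRow),
  null
)
```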

Dependencies for writing to Apache HBase

Apache HBase Maven dependencies:

```xml
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>${hbase.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>${hbase.version}</version>
</dependency>
```

Regular way to write and read Apache HBase

1. Create the table

```bash
create 'Student', {NAME => 'StuInfo', VERSIONS => 3}, {NAME => 'Grades', BLOCKCACHE => true}
```

2. Read and write demos

read data

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.List;

public class FlinkHBaseReader {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // add the data source
        DataStream<String> stream = env.addSource(new HBaseReader());
        stream.print();
        env.execute("FlinkHBaseDemo");
    }
}

class HBaseReader extends RichSourceFunction<String> {
    private Connection connection = null;
    private ResultScanner rs = null;
    private Table table = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        org.apache.hadoop.conf.Configuration hconf = HBaseConfiguration.create();
        hconf.set("hbase.zookeeper.quorum", "localhost");
        hconf.set("hbase.zookeeper.property.clientPort", "2181");
        connection = ConnectionFactory.createConnection(hconf);
    }

    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        table = connection.getTable(TableName.valueOf("Student"));
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("StuInfo"));
        rs = table.getScanner(scan);
        for (Result result : rs) {
            StringBuilder sb = new StringBuilder();
            List<Cell> cells = result.listCells();
            for (Cell cell : cells) {
                String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                sb.append(value).append("-");
            }
            String value = sb.replace(sb.length() - 1, sb.length(), "").toString();
            sourceContext.collect(value);
        }
    }

    @Override
    public void cancel() {
    }

    @Override
    public void close() throws Exception {
        if (rs != null) {
            rs.close();
        }
        if (table != null) {
            table.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}
```

write data

```java
import com.zhisheng.common.utils.ExecutionEnvUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * Desc: Read stream data, then write to Apache HBase
 */
@Slf4j
public class HBaseStreamWriteMain {
    public static void main(String[] args) throws Exception {
        final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
        StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);
        DataStream<String> dataStream = env.addSource(new SourceFunction<String>() {
            private static final long serialVersionUID = 1L;
            private volatile boolean isRunning = true;

            @Override
            public void run(SourceContext<String> out) throws Exception {
                while (isRunning) {
                    out.collect("name" + Math.floor(Math.random() * 100));
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });
        dataStream.addSink(new HBaseWriter());
        env.execute("Flink HBase connector sink");
    }
}

/**
 * Write to Apache HBase.
 * Extends RichSinkFunction and overrides the parent class methods.
 * <p>
 * Writes are buffered through a BufferedMutator (writeBufferSize) and flushed every 500 records.
 */
class HBaseWriter extends RichSinkFunction<String> {
    private static final Logger logger = LoggerFactory.getLogger(HBaseWriter.class);
    private static org.apache.hadoop.conf.Configuration configuration;
    private static Connection connection = null;
    private static BufferedMutator mutator;
    private static int count = 0;

    @Override
    public void open(Configuration parameters) throws Exception {
        configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "localhost");
        configuration.set("hbase.zookeeper.property.clientPort", "2181");
        try {
            connection = ConnectionFactory.createConnection(configuration);
        } catch (IOException e) {
            e.printStackTrace();
        }
        BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("Student"));
        params.writeBufferSize(2 * 1024 * 1024);
        mutator = connection.getBufferedMutator(params);
    }

    @Override
    public void close() throws IOException {
        if (mutator != null) {
            mutator.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    @Override
    public void invoke(String values, Context context) throws Exception {
        // use the current timestamp as the rowkey
        long unixTimestamp = System.currentTimeMillis();
        String rowKey = String.valueOf(unixTimestamp);
        Put put = new Put(rowKey.getBytes());
        put.addColumn("StuInfo".getBytes(), "Name".getBytes(), values.getBytes());
        mutator.mutate(put);
        // flush every 500 records
        if (count >= 500) {
            mutator.flush();
            count = 0;
        }
        count = count + 1;
    }
}
```

Reading and writing Apache HBase this way is cumbersome and inconvenient. StreamPark follows the principle of convention over configuration and automatic configuration: users only need to configure the Apache HBase connection parameters and the Flink runtime parameters, and StreamPark automatically assembles the source and sink, which greatly simplifies the development logic and improves development efficiency and maintainability.

Read and write Apache HBase with Apache StreamPark™

1. Configure policies and connection information

```yaml
# apache hbase
hbase:
  zookeeper.quorum: test1,test2,test6
  zookeeper.property.clientPort: 2181
  zookeeper.session.timeout: 1200000
  rpc.timeout: 5000
  client.pause: 20
```
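At runtime these `hbase.*` entries are collected into the client `Properties` that HBaseSource and HBaseSink use implicitly. A minimal sketch using the `ConfigUtils.getHBaseConfig` helper shown in the examples below (the object name `HBaseConfigApp` is just for illustration):

```scala
import org.apache.streampark.common.util.ConfigUtils
import org.apache.streampark.flink.core.scala.FlinkStreaming

object HBaseConfigApp extends FlinkStreaming {
  override def handle(): Unit = {
    // collect the hbase.* entries of the application configuration into a
    // java.util.Properties; HBaseSource/HBaseSink pick it up as an implicit parameter
    implicit val prop = ConfigUtils.getHBaseConfig(context.parameter.toMap)
  }
}
```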

2. Read and write Apache HBase

Reading from and writing to Apache HBase with StreamPark is very simple; the code is as follows:

read Apache HBase

```scala
import org.apache.streampark.common.util.ConfigUtils
import org.apache.streampark.flink.core.java.wrapper.HBaseQuery
import org.apache.streampark.flink.core.scala.FlinkStreaming
import org.apache.streampark.flink.core.scala.request.HBaseRequest
import org.apache.streampark.flink.core.scala.source.HBaseSource
import org.apache.flink.api.scala.createTypeInformation
import org.apache.hadoop.hbase.CellUtil
import org.apache.hadoop.hbase.client.{Get, Scan}
import org.apache.hadoop.hbase.util.Bytes

import java.util

object HBaseSourceApp extends FlinkStreaming {

  override def handle(): Unit = {
    implicit val conf = ConfigUtils.getHBaseConfig(context.parameter.toMap)

    val id = HBaseSource().getDataStream[String](query => {
      new HBaseQuery("person", new Scan())
    },
      // the following function determines the logic for restoring the offset from the checkpoint
      r => new String(r.getRow), null)

    // Flink Async I/O
    HBaseRequest(id).requestOrdered(x => {
      new HBaseQuery("person", new Get(x.getBytes()))
    }, (a, r) => {
      val map = new util.HashMap[String, String]()
      val cellScanner = r.cellScanner()
      while (cellScanner.advance()) {
        val cell = cellScanner.current()
        val q = Bytes.toString(CellUtil.cloneQualifier(cell))
        val (name, v) = q.split("_") match {
          case Array(_type, name) =>
            _type match {
              case "i" => name -> Bytes.toInt(CellUtil.cloneValue(cell))
              case "s" => name -> Bytes.toString(CellUtil.cloneValue(cell))
              case "d" => name -> Bytes.toDouble(CellUtil.cloneValue(cell))
              case "f" => name -> Bytes.toFloat(CellUtil.cloneValue(cell))
            }
          case _ =>
        }
        map.put(name.toString, v.toString)
      }
      map.toString
    }).print("Async")
  }

}
```

write Apache HBase

```scala
import org.apache.streampark.flink.core.scala.FlinkStreaming
import org.apache.streampark.flink.core.scala.sink.{HBaseOutputFormat, HBaseSink}
import org.apache.flink.api.scala._
import org.apache.streampark.common.util.ConfigUtils
import org.apache.hadoop.hbase.client.{Mutation, Put}
import org.apache.hadoop.hbase.util.Bytes

import java.util.{Collections, Random}

object HBaseSinkApp extends FlinkStreaming {

  override def handle(): Unit = {
    val source = context.addSource(new TestSource)
    val random = new Random()

    // define the conversion rule...
    implicit def entry2Put(entity: TestEntity): java.lang.Iterable[Mutation] = {
      val put = new Put(Bytes.toBytes(System.nanoTime() + random.nextInt(1000000)), entity.timestamp)
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cid"), Bytes.toBytes(entity.cityId))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("oid"), Bytes.toBytes(entity.orderId))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("os"), Bytes.toBytes(entity.orderStatus))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("oq"), Bytes.toBytes(entity.quantity))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("sid"), Bytes.toBytes(entity.siteId))
      Collections.singleton(put)
    }

    // source ===> trans ===> sink
    // 1) insert way 1
    HBaseSink().sink[TestEntity](source, "order")

    // 2) insert way 2
    // 2.1 specify the HBase configuration
    implicit val prop = ConfigUtils.getHBaseConfig(context.parameter.toMap)
    // 2.2 write using the output format
    source.writeUsingOutputFormat(new HBaseOutputFormat[TestEntity]("order", entry2Put))
  }

}
```

When StreamPark reads from Apache HBase, you need to specify how to create the HBaseQuery, how to convert the query Result into the required object, whether the source is still running, and the runtime parameters. The details are as follows:

```scala
/**
 * @param ctx
 * @param property
 */
class HBaseSource(@(transient @param) val ctx: StreamingContext, property: Properties = new Properties()) {

  /**
   * @param query   method that creates the HBaseQuery
   * @param func    method that converts the query Result into the expected object
   * @param running method that indicates whether the source is still running
   * @param prop    job parameters
   * @tparam R
   * @return
   */
  def getDataStream[R: TypeInformation](query: R => HBaseQuery,
                                        func: Result => R,
                                        running: Unit => Boolean)(implicit prop: Properties = new Properties()) = {
    Utils.copyProperties(property, prop)
    val hBaseFunc = new HBaseSourceFunction[R](prop, query, func, running)
    ctx.addSource(hBaseFunc)
  }

}
```
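As a hedged usage sketch of these parameters (the `person` table and element type are assumptions, and the same imports and implicit `Properties` as in the read example above are required), the `running` parameter, which the earlier example left as `null`, can instead be supplied as a function:

```scala
// inside handle(), with the implicit HBase Properties already in scope
val stream = HBaseSource().getDataStream[String](
  last => new HBaseQuery("person", new Scan()),  // query: build each HBaseQuery
  result => Bytes.toString(result.getRow),       // func: convert a Result into the element / offset
  _ => true                                      // running: keep the source polling
)
```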

StreamPark HBaseSource implements Flink Async I/O to improve streaming throughput: first create a DataStream, then wrap it in an HBaseRequest and call requestOrdered() or requestUnordered() to create an asynchronous stream, as follows:

```scala
class HBaseRequest[T: TypeInformation](@(transient @param) private val stream: DataStream[T], property: Properties = new Properties()) {

  /**
   * @param queryFunc
   * @param resultFunc
   * @param timeout
   * @param capacity
   * @param prop
   * @tparam R
   * @return
   */
  def requestOrdered[R: TypeInformation](queryFunc: T => HBaseQuery, resultFunc: (T, Result) => R, timeout: Long = 1000, capacity: Int = 10)(implicit prop: Properties): DataStream[R] = {
    Utils.copyProperties(property, prop)
    val async = new HBaseAsyncFunction[T, R](prop, queryFunc, resultFunc, capacity)
    AsyncDataStream.orderedWait(stream, async, timeout, TimeUnit.MILLISECONDS, capacity)
  }

  /**
   * @param queryFunc
   * @param resultFunc
   * @param timeout
   * @param capacity
   * @param prop
   * @tparam R
   * @return
   */
  def requestUnordered[R: TypeInformation](queryFunc: T => HBaseQuery, resultFunc: (T, Result) => R, timeout: Long = 1000, capacity: Int = 10)(implicit prop: Properties): DataStream[R] = {
    Utils.copyProperties(property, prop)
    val async = new HBaseAsyncFunction[T, R](prop, queryFunc, resultFunc, capacity)
    AsyncDataStream.unorderedWait(stream, async, timeout, TimeUnit.MILLISECONDS, capacity)
  }

}
```
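For completeness, a hedged sketch of the unordered variant with explicit timeout and capacity, building on the read example above (the table name and the way the Result is rendered are assumptions, and the implicit HBase `Properties` must be in scope); requestUnordered trades result ordering for potentially higher throughput:

```scala
// id is a DataStream[String] of rowkeys, as in the read example above
val async = HBaseRequest(id).requestUnordered(
  rowKey => new HBaseQuery("person", new Get(rowKey.getBytes())), // build an async Get per element
  (rowKey, result) => s"$rowKey -> ${result.isEmpty}",            // turn (input, Result) into the output element
  timeout = 5000, // max milliseconds to wait for each async request
  capacity = 100  // max number of in-flight async requests
)
async.print("AsyncUnordered")
```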

StreamPark supports two ways to write data: 1) addSink() and 2) writeUsingOutputFormat. Examples are as follows:

```scala
// 1) insert way 1
HBaseSink().sink[TestEntity](source, "order")

// 2) insert way 2
// 2.1 specify the HBase configuration
implicit val prop = ConfigUtils.getHBaseConfig(context.parameter.toMap)
// 2.2 write using the output format
source.writeUsingOutputFormat(new HBaseOutputFormat[TestEntity]("order", entry2Put))
```

Other configuration

All other configurations must comply with the StreamPark configuration. For the specific configurable items and the role of each parameter, please refer to the project configuration.