Apache HBase 是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统,利用 HBase 技术可在廉价服务器上搭建起大规模结构化存储集群。HBase不同于一般的关系数据库,它是一个适合于非结构化数据存储的数据库,HBase 基于列的而不是基于行的模式。

Apache Flink 官方未提供 HBase DataStream 的连接器。Apache StreamPark 基于 HBase client 封装了 HBaseSource、HBaseSink,支持依据配置自动创建连接,简化开发。StreamPark 读取 HBase 在开启 checkpoint 情况下可以记录读取数据的最新状态,通过数据本身的标识可以恢复 source 对应的偏移量,实现 source 端至少一次语义。

HBaseSource 实现了 Flink 的 Async I/O 接口,可以提升流处理的吞吐量。Sink 端默认支持至少一次的处理语义。在开启 checkpoint 情况下支持精确一次语义。

StreamPark 读取 HBase 在开启 checkpoint 情况下可以记录读取数据的最新状态,作业恢复后是否可以恢复到之前的状态,完全取决于数据本身是否有偏移量的标识,需要在代码中手动指定。恢复逻辑在 HBaseSource 的 getDataStream 方法的 func 参数中指定。

HBase写入依赖

HBase Maven 依赖:

  <!-- HBase client API used to open connections and issue reads/writes -->
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>${hbase.version}</version>
  </dependency>
  <!-- Shared HBase classes required alongside the client -->
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>${hbase.version}</version>
  </dependency>

常规方式写入读取HBase

1.创建库表

  1. create 'Student', {NAME => 'Stulnfo', VERSIONS => 3}, {NAME =>'Grades', BLOCKCACHE => true}

2.写入读取demo

读取数据

  1. import org.apache.flink.configuration.Configuration;
  2. import org.apache.flink.streaming.api.datastream.DataStream;
  3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  4. import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
  5. import org.apache.hadoop.hbase.Cell;
  6. import org.apache.hadoop.hbase.HBaseConfiguration;
  7. import org.apache.hadoop.hbase.TableName;
  8. import org.apache.hadoop.hbase.client.*;
  9. import org.apache.hadoop.hbase.util.Bytes;
  10. import java.util.List;
  11. public class FlinkHBaseReader {
  12. public static void main(String[] args) throws Exception {
  13. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  14. // 添加数据源
  15. DataStream<String> stream = env.addSource(new HBaseReader());
  16. stream.print();
  17. env.execute("FlinkHBaseDemo");
  18. }
  19. }
  20. class HBaseReader extends RichSourceFunction<String> {
  21. private Connection connection = null;
  22. private ResultScanner rs = null;
  23. private Table table = null;
  24. @Override
  25. public void open(Configuration parameters) throws Exception {
  26. org.apache.hadoop.conf.Configuration hconf = HBaseConfiguration.create();
  27. hconf.set("hbase.zookeeper.quorum", "localhost:2181");
  28. hconf.set("zookeeper.property.clientPort", "/hbase");
  29. connection = ConnectionFactory.createConnection(hconf);
  30. }
  31. @Override
  32. public void run(SourceContext<String> sourceContext) throws Exception {
  33. table = connection.getTable(TableName.valueOf("Student"));
  34. Scan scan = new Scan();
  35. scan.addFamily(Bytes.toBytes("Stulnfo"));
  36. rs = table.getScanner(scan);
  37. for (Result result : rs) {
  38. StringBuilder sb = new StringBuilder();
  39. List<Cell> cells = result.listCells();
  40. for (Cell cell : cells) {
  41. String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
  42. sb.append(value).append("-");
  43. }
  44. String value = sb.replace(sb.length() - 1, sb.length(), "").toString();
  45. sourceContext.collect(value);
  46. }
  47. }
  48. @Override
  49. public void cancel() {
  50. }
  51. @Override
  52. public void close() throws Exception {
  53. if (rs != null) {
  54. rs.close();
  55. }
  56. if (table != null) {
  57. table.close();
  58. }
  59. if (connection != null) {
  60. connection.close();
  61. }
  62. }
  63. }

写入数据

  1. import com.zhisheng.common.utils.ExecutionEnvUtil;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.flink.api.java.utils.ParameterTool;
  4. import org.apache.flink.configuration.Configuration;
  5. import org.apache.flink.streaming.api.datastream.DataStream;
  6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  7. import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
  8. import org.apache.flink.streaming.api.functions.source.SourceFunction;
  9. import org.apache.hadoop.hbase.HBaseConfiguration;
  10. import org.apache.hadoop.hbase.TableName;
  11. import org.apache.hadoop.hbase.client.*;
  12. import org.slf4j.Logger;
  13. import org.slf4j.LoggerFactory;
  14. import java.io.IOException;
  15. /**
  16. * Desc: 读取流数据,然后写入到 HBase
  17. */
  18. @Slf4j
  19. public class HBaseStreamWriteMain {
  20. public static void main(String[] args) throws Exception {
  21. final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
  22. StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);
  23. DataStream<String> dataStream = env.addSource(new SourceFunction<String>() {
  24. private static final long serialVersionUID = 1L;
  25. private volatile boolean isRunning = true;
  26. @Override
  27. public void run(SourceContext<String> out) throws Exception {
  28. while (isRunning) {
  29. out.collect("name" + Math.floor(Math.random() * 100));
  30. }
  31. }
  32. @Override
  33. public void cancel() {
  34. isRunning = false;
  35. }
  36. });
  37. dataStream.addSink(new HBaseWriter());
  38. env.execute("Flink HBase connector sink");
  39. }
  40. }
  41. /**
  42. * 写入HBase
  43. * 继承RichSinkFunction重写父类方法
  44. * <p>
  45. * 写入hbase时500条flush一次, 批量插入, 使用的是writeBufferSize
  46. */
  47. class HBaseWriter extends RichSinkFunction<String> {
  48. private static final Logger logger = LoggerFactory.getLogger(HBaseWriter.class);
  49. private static org.apache.hadoop.conf.Configuration configuration;
  50. private static Connection connection = null;
  51. private static BufferedMutator mutator;
  52. private static int count = 0;
  53. @Override
  54. public void open(Configuration parameters) throws Exception {
  55. configuration = HBaseConfiguration.create();
  56. configuration.set("hbase.zookeeper.quorum", "localhost:21981");
  57. configuration.set("zookeeper.property.clientPort", "/hbase");
  58. try {
  59. connection = ConnectionFactory.createConnection(configuration);
  60. } catch (IOException e) {
  61. e.printStackTrace();
  62. }
  63. BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("Student"));
  64. params.writeBufferSize(2 * 1024 * 1024);
  65. mutator = connection.getBufferedMutator(params);
  66. }
  67. @Override
  68. public void close() throws IOException {
  69. if (mutator != null) {
  70. mutator.close();
  71. }
  72. if (connection != null) {
  73. connection.close();
  74. }
  75. }
  76. @Override
  77. public void invoke(String values, Context context) throws Exception {
  78. //Date 1970-01-06 11:45:55 to 445555000
  79. long unixTimestamp = 0;
  80. String RowKey = String.valueOf(unixTimestamp);
  81. Put put = new Put(RowKey.getBytes());
  82. put.addColumn("Stulnfo".getBytes(), "Name".getBytes(), values.getBytes());
  83. mutator.mutate(put);
  84. //每满500条刷新一下数据
  85. if (count >= 500) {
  86. mutator.flush();
  87. count = 0;
  88. }
  89. count = count + 1;
  90. }
  91. }

以上方式读写 HBase 较繁琐,非常不灵活。StreamPark 使用约定大于配置、自动配置的方式,只需要配置 HBase 连接参数、flink 运行参数,StreamPark 会自动组装 source 和 sink,极大地简化开发逻辑,提升开发效率和可维护性。

Apache StreamPark™ 读写 HBase

1. 配置策略和连接信息

  # hbase
  # Connection settings; the keys below must be nested under `hbase:`.
  hbase:
    zookeeper.quorum: test1,test2,test6
    zookeeper.property.clientPort: 2181
    zookeeper.session.timeout: 1200000
    rpc.timeout: 5000
    client.pause: 20

2. 读写 HBase

用 StreamPark 读写 HBase 非常简单,代码如下:

读取HBase

  1. import org.apache.streampark.common.util.ConfigUtils
  2. import org.apache.streampark.flink.core.java.wrapper.HBaseQuery
  3. import org.apache.streampark.flink.core.scala.FlinkStreaming
  4. import org.apache.streampark.flink.core.scala.request.HBaseRequest
  5. import org.apache.streampark.flink.core.scala.source.HBaseSource
  6. import org.apache.flink.api.scala.createTypeInformation
  7. import org.apache.hadoop.hbase.CellUtil
  8. import org.apache.hadoop.hbase.client.{Get, Scan}
  9. import org.apache.hadoop.hbase.util.Bytes
  10. import java.util
  /**
   * Demo job: scans the `person` table for row keys, then looks every row up again
   * through Flink Async I/O and prints its columns as a map.
   *
   * Column qualifiers are expected in the form `<type>_<name>` where `<type>` is one of
   * `i` (Int), `s` (String), `d` (Double) or `f` (Float). Qualifiers that do not follow
   * this form are skipped; the previous version threw a `MatchError` on them because
   * the fallback branch produced `Unit` where a `(name, value)` pair was destructured.
   */
  object HBaseSourceApp extends FlinkStreaming {

    override def handle(): Unit = {
      implicit val conf = ConfigUtils.getHBaseConfig(context.parameter.toMap)

      val id = HBaseSource().getDataStream[String](
        _ => new HBaseQuery("person", new Scan()),
        // This function decides how the offset is restored from a checkpoint.
        result => Bytes.toString(result.getRow),
        null
      )

      // Flink Async I/O
      HBaseRequest(id).requestOrdered(
        rowKey => new HBaseQuery("person", new Get(Bytes.toBytes(rowKey))),
        (_, result) => {
          val columns = new util.HashMap[String, String]()
          val cellScanner = result.cellScanner()
          while (cellScanner.advance()) {
            val cell = cellScanner.current()
            val value = CellUtil.cloneValue(cell)
            val qualifier = Bytes.toString(CellUtil.cloneQualifier(cell))

            // Decode the value according to the type prefix of the qualifier.
            val decoded: Option[(String, String)] = qualifier.split("_") match {
              case Array("i", name) => Some(name -> Bytes.toInt(value).toString)
              case Array("s", name) => Some(name -> Bytes.toString(value))
              case Array("d", name) => Some(name -> Bytes.toDouble(value).toString)
              case Array("f", name) => Some(name -> Bytes.toFloat(value).toString)
              case _ => None // unknown prefix or unexpected qualifier shape: skip the cell
            }
            decoded.foreach { case (name, v) => columns.put(name, v) }
          }
          columns.toString
        }
      ).print("Async")
    }
  }

写入HBase

  1. import org.apache.streampark.flink.core.scala.FlinkStreaming
  2. import org.apache.streampark.flink.core.scala.sink.{HBaseOutputFormat, HBaseSink}
  3. import org.apache.flink.api.scala._
  4. import org.apache.streampark.common.util.ConfigUtils
  5. import org.apache.hadoop.hbase.client.{Mutation, Put}
  6. import org.apache.hadoop.hbase.util.Bytes
  7. import java.util.{Collections, Random}
  /**
   * Demo job that writes `TestEntity` records to the `order` table in two ways:
   * through `HBaseSink` and through `HBaseOutputFormat`.
   */
  object HBaseSinkApp extends FlinkStreaming {

    override def handle(): Unit = {
      val source = context.addSource(new TestSource)
      val random = new Random()

      // Conversion rule: one entity becomes a single Put on column family "cf".
      implicit def entry2Put(entity: TestEntity): java.lang.Iterable[Mutation] = {
        val family = Bytes.toBytes("cf")
        val rowKey = Bytes.toBytes(System.nanoTime() + random.nextInt(1000000))
        val put = new Put(rowKey, entity.timestamp)
          .addColumn(family, Bytes.toBytes("cid"), Bytes.toBytes(entity.cityId))
          .addColumn(family, Bytes.toBytes("oid"), Bytes.toBytes(entity.orderId))
          .addColumn(family, Bytes.toBytes("os"), Bytes.toBytes(entity.orderStatus))
          .addColumn(family, Bytes.toBytes("oq"), Bytes.toBytes(entity.quantity))
          .addColumn(family, Bytes.toBytes("sid"), Bytes.toBytes(entity.siteId))
        Collections.singleton(put)
      }

      // source ===> trans ===> sink

      // Option 1: write through HBaseSink.
      HBaseSink().sink[TestEntity](source, "order")

      // Option 2: write through an output format.
      // Step 1: load the HBase configuration.
      implicit val prop = ConfigUtils.getHBaseConfig(context.parameter.toMap)
      // Step 2: write.
      val outputFormat = new HBaseOutputFormat[TestEntity]("order", entry2Put)
      source.writeUsingOutputFormat(outputFormat)
    }
  }

StreamPark 读取 HBase 需要指定创建 HBaseQuery 的方法、将查询结果转化为所需对象的方法、标识是否在运行的方法,并传入运行参数。具体如下:

  /**
   * Factory for Flink source streams that read from HBase.
   *
   * @param ctx      streaming context the source function is added to
   * @param property properties handed to `Utils.copyProperties` together with the
   *                 per-call `prop` before the source function is created
   */
  class HBaseSource(@(transient@param) val ctx: StreamingContext, property: Properties = new Properties()) {

    /**
     * Creates an `HBaseSourceFunction` and registers it on the streaming context.
     *
     * @param query   function that creates the `HBaseQuery` to run
     * @param func    function that converts an HBase `Result` into the desired `R`
     * @param running running flag
     * @param prop    job parameters
     * @tparam R element type of the resulting stream
     * @return the value returned by `ctx.addSource` for the new source function
     */
    def getDataStream[R: TypeInformation](query: R => HBaseQuery,
                                          func: Result => R,
                                          running: Unit => Boolean)(implicit prop: Properties = new Properties()) = {
      Utils.copyProperties(property, prop)
      ctx.addSource(new HBaseSourceFunction[R](prop, query, func, running))
    }
  }

StreamPark HBaseSource 实现了 flink Async I/O,用于提升 Streaming 的吞吐量:先创建 DataStream,然后创建 HBaseRequest,调用 requestOrdered() 或者 requestUnordered() 创建异步流,见如下代码:

  /**
   * Wraps a stream so that each element can be turned into an HBase query that is
   * executed through Flink's `AsyncDataStream`.
   *
   * @param stream   input stream whose elements drive the queries
   * @param property properties handed to `Utils.copyProperties` together with the
   *                 per-call `prop` before the async function is created
   * @tparam T element type of the input stream
   */
  class HBaseRequest[T: TypeInformation](@(transient@param) private val stream: DataStream[T], property: Properties = new Properties()) {

    /**
     * Issues the queries with `AsyncDataStream.orderedWait`.
     *
     * @param queryFunc  builds the `HBaseQuery` for an input element
     * @param resultFunc combines the input element and the HBase `Result` into an `R`
     * @param timeout    async timeout, in milliseconds
     * @param capacity   capacity passed to both the async function and `orderedWait`
     * @param prop       job parameters
     * @tparam R element type of the resulting stream
     * @return the stream of converted results
     */
    def requestOrdered[R: TypeInformation](queryFunc: T => HBaseQuery, resultFunc: (T, Result) => R, timeout: Long = 1000, capacity: Int = 10)(implicit prop: Properties): DataStream[R] =
      AsyncDataStream.orderedWait(stream, asyncFunction(queryFunc, resultFunc, capacity, prop), timeout, TimeUnit.MILLISECONDS, capacity)

    /**
     * Issues the queries with `AsyncDataStream.unorderedWait`.
     *
     * @param queryFunc  builds the `HBaseQuery` for an input element
     * @param resultFunc combines the input element and the HBase `Result` into an `R`
     * @param timeout    async timeout, in milliseconds
     * @param capacity   capacity passed to both the async function and `unorderedWait`
     * @param prop       job parameters
     * @tparam R element type of the resulting stream
     * @return the stream of converted results
     */
    def requestUnordered[R: TypeInformation](queryFunc: T => HBaseQuery, resultFunc: (T, Result) => R, timeout: Long = 1000, capacity: Int = 10)(implicit prop: Properties): DataStream[R] =
      AsyncDataStream.unorderedWait(stream, asyncFunction(queryFunc, resultFunc, capacity, prop), timeout, TimeUnit.MILLISECONDS, capacity)

    /** Runs `Utils.copyProperties(property, prop)` and then builds the async function both request variants share. */
    private def asyncFunction[R: TypeInformation](queryFunc: T => HBaseQuery,
                                                  resultFunc: (T, Result) => R,
                                                  capacity: Int,
                                                  prop: Properties): HBaseAsyncFunction[T, R] = {
      Utils.copyProperties(property, prop)
      new HBaseAsyncFunction[T, R](prop, queryFunc, resultFunc, capacity)
    }
  }

StreamPark 支持两种方式写入数据:1.addSink() 2. writeUsingOutputFormat 样例如下:

  // Option 1: write through HBaseSink.
  HBaseSink().sink[TestEntity](source, "order")

  // Option 2: write through an output format.
  // Step 1: load the HBase configuration.
  implicit val prop = ConfigUtils.getHBaseConfig(context.parameter.toMap)
  // Step 2: write.
  val outputFormat = new HBaseOutputFormat[TestEntity]("order", entry2Put)
  source.writeUsingOutputFormat(outputFormat)

其他配置

其他的所有的配置都必须遵守 StreamPark 配置,具体可配置项和各个参数的作用请参考项目配置