Redis is an open-source, in-memory data structure store that can be used as a database, cache, and message broker. It supports many data structures, such as strings, hashes, lists, sets, sorted sets with range queries, bitmaps, HyperLogLogs, and geospatial indexes with radius queries. Redis has built-in transactions and different levels of on-disk persistence, and it provides high availability through Redis Sentinel and automatic partitioning with Redis Cluster.

Apache Flink does not officially provide a connector for writing data to Redis. Apache StreamPark wraps a RedisSink on top of Flink Connector Redis: configure the Redis connection parameters and the Redis connection is created automatically, which simplifies development. RedisSink currently supports two connection modes: standalone mode and Sentinel mode. Cluster mode does not support transactions and is therefore not yet supported.

StreamPark uses Redis's MULTI command to open a transaction and the EXEC command to commit it; see http://www.redis.cn/topics/transactions.html for details. RedisSink supports AT_LEAST_ONCE processing semantics by default, and EXACTLY_ONCE semantics when checkpointing is enabled.
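
For intuition, here is a minimal sketch of such a MULTI/EXEC batch write using the plain Jedis client directly (illustrative only, not StreamPark's internal code; the key, field, and value names are made up):

```scala
import redis.clients.jedis.{Jedis, Transaction}

// A minimal sketch of a transactional batch write with the plain Jedis client.
// Key/field/value names are illustrative.
val jedis = new Jedis("localhost", 6379)
val tx: Transaction = jedis.multi()    // MULTI: open the transaction
tx.hset("flink_user", "10001", "99.9") // commands are queued, not yet executed
tx.hset("flink_user", "10002", "19.9")
tx.exec()                              // EXEC: commit all queued commands atomically
jedis.close()
```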

Redis is a key-value database. Under AT_LEAST_ONCE semantics, when a Flink job restarts after a failure, the latest data overwrites the previous version, so the data becomes eventually consistent. However, if an external program reads the data during the restart window, it risks seeing data that is inconsistent with the final state.

Under EXACTLY_ONCE semantics, data is written to Redis in a batch only when a Flink checkpoint completes as a whole, so there is a latency of one checkpoint interval. Choose the semantics appropriate for your business scenario.
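
Note that EXACTLY_ONCE only takes effect when checkpointing is enabled on the Flink environment. A minimal sketch (the 30-second interval is an illustrative value; it also determines the write latency described above):

```scala
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Enable checkpointing so the sink can commit its buffered writes when a checkpoint completes.
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(30000L, CheckpointingMode.EXACTLY_ONCE)
```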

Redis Write Dependency

Flink Connector Redis is officially available in two flavors with identical APIs; StreamPark uses the org.apache.bahir dependency:

```xml
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
```

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-redis_2.10</artifactId>
    <version>1.1.5</version>
</dependency>
```

Writing to Redis the Conventional Way

The conventional way to write data with Flink Connector Redis looks like this:

1. Add a source

```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;

public class TestSource implements SourceFunction<TestEntity> {

    private boolean isRunning = true;
    private final Random random = new Random();
    private int index = 0;

    @Override
    public void run(SourceContext<TestEntity> sourceContext) throws Exception {
        while (isRunning && index <= 1000001) {
            index += 1;
            long userId = System.currentTimeMillis();
            long orderId = random.nextInt(100);
            // status is 0 or 1, matching the orderStatus javadoc below
            int status = random.nextInt(2);
            double price = random.nextDouble();
            int quantity = random.nextInt(10);
            TestEntity order = new TestEntity(userId, orderId, 1L, 1L, status, price, quantity, System.currentTimeMillis());
            sourceContext.collect(order);
        }
    }

    @Override
    public void cancel() {
        this.isRunning = false;
    }
}

class TestEntity {
    Long userId;
    Long orderId;
    Long siteId;
    Long cityId;
    Integer orderStatus;
    Double price;
    Integer quantity;
    Long timestamp;

    /**
     * @param userId      : user ID
     * @param orderId     : order ID
     * @param siteId      : site ID
     * @param cityId      : city ID
     * @param orderStatus : order status (1: ordered, 0: refunded)
     * @param price       : unit price
     * @param quantity    : order quantity
     * @param timestamp   : order time
     */
    public TestEntity(Long userId, Long orderId, Long siteId, Long cityId, Integer orderStatus, Double price, Integer quantity, Long timestamp) {
        this.userId = userId;
        this.orderId = orderId;
        this.siteId = siteId;
        this.cityId = cityId;
        this.orderStatus = orderStatus;
        this.price = price;
        this.quantity = quantity;
        this.timestamp = timestamp;
    }
}
```

2. Write to Redis

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class FlinkRedisSink {

    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. Read data and convert it to a JavaBean
        DataStreamSource<TestEntity> source = env.addSource(new TestSource(), TypeInformation.of(TestEntity.class));
        // 3. Write the data to Redis
        FlinkJedisPoolConfig jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
                .setHost("localhost")
                .setPort(6379)
                .build();
        source.addSink(new RedisSink<>(jedisPoolConfig, new MyRedisMapper()));
        // 4. Run the job
        env.execute();
    }

    public static class MyRedisMapper implements RedisMapper<TestEntity> {

        @Override
        public RedisCommandDescription getCommandDescription() {
            // The data type stored in Redis is a hash; the second argument is the outer key
            return new RedisCommandDescription(RedisCommand.HSET, "flink_user");
        }

        @Override
        public String getKeyFromData(TestEntity data) {
            // Extract the key from the record: the field of the hash
            return String.valueOf(data.userId);
        }

        @Override
        public String getValueFromData(TestEntity data) {
            // Extract the value from the record: the value of the hash field
            return String.valueOf(data.price);
        }
    }
}
```

Creating the FlinkJedisPoolConfig above is tedious, and every Redis operation requires building its own RedisMapper, which is quite inflexible. StreamPark follows convention over configuration with automatic configuration: you only configure the Redis connection parameters and the Flink runtime parameters, and StreamPark assembles the source and sink automatically, greatly simplifying the development logic and improving development efficiency and maintainability.

Writing to Redis with Apache StreamPark™

RedisSink uses AT_LEAST_ONCE (at-least-once) processing semantics by default. With checkpointing enabled, it supports EXACTLY_ONCE semantics via two-phase commit. Available connection types: standalone mode and Sentinel mode.

1. Configure the policy and connection information

Standalone mode configuration

```yaml
# redis sink configuration
redis.sink:
  host: 127.0.0.1 # required parameter
  # optional parameters
  port: 6379
  database: 2
  password:
  connectType: jedisPool # optional: jedisPool (default) | sentinel
  maxTotal:
  maxIdle:
  minIdle:
  connectionTimeout:
```

Sentinel mode configuration

```yaml
# redis sink configuration
redis.sink:
  masterName: master # Sentinel mode parameter
  host: 192.168.0.1:6379, 192.168.0.3:6379 # required parameter; the port of each connection must be specified
  connectType: sentinel
  # optional parameters
  soTimeout: 6379
  database: 2
  password:
  maxTotal:
  maxIdle:
  minIdle:
  connectionTimeout:
```

2. Write to Redis

Writing to Redis with StreamPark is very simple, as the following code shows:

Scala

```scala
import org.apache.streampark.flink.core.scala.FlinkStreaming
import org.apache.streampark.flink.core.scala.sink.{RedisMapper, RedisSink}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand
import org.json4s.DefaultFormats

object FlinkRedisSinkApp extends FlinkStreaming {

  @transient
  implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats

  override def handle(): Unit = {

    /**
     * Create the source to read data from
     */
    val source = context.addSource(new TestSource)

    // Redis sink..................
    // 1) define the RedisSink
    val sink: RedisSink = RedisSink()
    // 2) write the Mapper
    val personMapper: RedisMapper[TestEntity] = RedisMapper[TestEntity](RedisCommand.HSET, "flink_user", _.userId.toString, _.orderId.toString)

    sink.sink[TestEntity](source, personMapper, 60000000).setParallelism(1)
  }
}

/**
 * RedisMapper
 * @param cmd the Redis write command
 * @param additionalKey the additional key to write to, applicable to hset
 * @param key the key to write
 * @param value the value to write
 * @tparam T
 */
case class RedisMapper[T](cmd: RedisCommand, additionalKey: String, key: T => String, value: T => String) extends RMapper[T] {

  override def getCommandDescription: RedisCommandDescription = new RedisCommandDescription(cmd, additionalKey)

  override def getKeyFromData(r: T): String = key(r)

  override def getValueFromData(r: T): String = value(r)
}
```

As the code shows, StreamPark automatically loads the configuration and creates the RedisSink; the user completes the Redis write simply by creating the required RedisMapper object. additionalKey is the outermost key when the command is HSET and is ignored by other write commands. RedisSink.sink() needs an expiration time (TTL) for the key being written; if unspecified, it defaults to Integer.MAX_VALUE seconds (about 68 years), as the code shows:

```scala
class RedisSink() extends Sink {

  def sink[T](stream: DataStream[T], mapper: RedisMapper[T], ttl: Int = Int.MaxValue): DataStreamSink[T] = {
    val sinkFun = (enableCheckpoint, cpMode) match {
      case (false, CheckpointingMode.EXACTLY_ONCE) => throw new IllegalArgumentException("redis sink EXACTLY_ONCE must enable checkpoint")
      case (true, CheckpointingMode.EXACTLY_ONCE) => new Redis2PCSinkFunction[T](config, mapper, ttl)
      case _ => new RedisSinkFunction[T](config, mapper, ttl)
    }
    val sink = stream.addSink(sinkFun)
    afterSink(sink, parallelism, name, uid)
  }
}
```
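
Passing an explicit TTL is just the third argument to sink(). A sketch reusing the source and personMapper from the example above (the one-hour TTL is an assumption for illustration):

```scala
// Sketch: write with an explicit one-hour expiration instead of the Int.MaxValue default.
RedisSink().sink[TestEntity](source, personMapper, ttl = 3600)
```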

Supported Redis Commands

The supported Redis write commands are as follows:

```java
public enum RedisCommand {

    /**
     * Insert the specified value at the head of the list stored at key.
     * If key does not exist, it is created as empty list before performing the push operations.
     */
    LPUSH(RedisDataType.LIST),

    /**
     * Insert the specified value at the tail of the list stored at key.
     * If key does not exist, it is created as empty list before performing the push operation.
     */
    RPUSH(RedisDataType.LIST),

    /**
     * Add the specified member to the set stored at key.
     * Specified member that is already a member of this set is ignored.
     */
    SADD(RedisDataType.SET),

    /**
     * Set key to hold the string value. If key already holds a value,
     * it is overwritten, regardless of its type.
     */
    SET(RedisDataType.STRING),

    /**
     * Adds the element to the HyperLogLog data structure stored at the variable name specified as first argument.
     */
    PFADD(RedisDataType.HYPER_LOG_LOG),

    /**
     * Posts a message to the given channel.
     */
    PUBLISH(RedisDataType.PUBSUB),

    /**
     * Adds the specified members with the specified score to the sorted set stored at key.
     */
    ZADD(RedisDataType.SORTED_SET),

    /**
     * Removes the specified members from the sorted set stored at key.
     */
    ZREM(RedisDataType.SORTED_SET),

    /**
     * Sets field in the hash stored at key to value. If key does not exist,
     * a new key holding a hash is created. If field already exists in the hash, it is overwritten.
     */
    HSET(RedisDataType.HASH);
}
```
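
Switching commands only requires constructing a different RedisMapper. For instance, the sketches below push each order onto a list and write a plain string key, reusing the RedisMapper case class shown earlier (the key and value choices are illustrative):

```scala
// LPUSH: additionalKey is ignored; each record's orderId is pushed onto a list keyed by the user.
val listMapper = RedisMapper[TestEntity](RedisCommand.LPUSH, "", e => s"order:${e.userId}", _.orderId.toString)

// SET: a plain string write, one key per user; additionalKey is likewise ignored.
val stringMapper = RedisMapper[TestEntity](RedisCommand.SET, "", e => s"user:${e.userId}:price", _.price.toString)
```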

RedisSink currently supports standalone mode and Sentinel mode connections. Cluster mode does not support transactions and is not yet supported by StreamPark; if you have such a use case, please call the official Flink Connector Redis API.

Under EXACTLY_ONCE semantics, checkpointing must be enabled; otherwise the program will throw an IllegalArgumentException.

Under EXACTLY_ONCE semantics, the data of each checkpoint is buffered in memory by the sink, so set the checkpoint interval sensibly based on your actual data volume; otherwise there is a risk of OOM.
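
A shorter checkpoint interval bounds how much data accumulates per checkpoint. Settings like the following can keep the buffering window small (all values are illustrative, not recommendations):

```scala
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Illustrative tuning: checkpoint more frequently so less data accumulates in the sink's buffer.
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(10000L, CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(5000L) // breathing room between checkpoints
env.getCheckpointConfig.setCheckpointTimeout(60000L)         // fail checkpoints that take too long
```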

Other Configuration

All other configurations must follow the StreamPark configuration; for the specific configurable items and the role of each parameter, please refer to the project configuration.