Redis is an open source, in-memory data structure store that can be used as a database, cache, and message broker. It supports many data structures such as strings, hashes, lists, sets, sorted sets with range queries, bitmaps, HyperLogLogs, and geospatial indexes with radius queries. Redis has built-in transactions and different levels of on-disk persistence, and provides high availability through Redis Sentinel and Redis Cluster.

Apache Flink does not officially provide a connector for writing data to Redis. Apache StreamPark encapsulates one based on Flink Connector Redis.

It encapsulates RedisSink, takes the Redis connection parameters from configuration, and creates the Redis connections automatically to simplify development. Currently, RedisSink supports single-node mode and sentinel mode; cluster mode is not supported because it does not support transactions.

StreamPark uses Redis' MULTI command to open a transaction and the EXEC command to commit it (see http://www.redis.cn/topics/transactions.html for details). RedisSink provides AT_LEAST_ONCE processing semantics by default; EXACTLY_ONCE semantics are supported when checkpointing is enabled.
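For reference, a MULTI/EXEC transaction looks roughly like the following minimal Jedis sketch (the host, port, key, field, and TTL values are placeholders for illustration, not part of StreamPark):

    import redis.clients.jedis.Jedis

    object RedisTxnSketch {
      def main(args: Array[String]): Unit = {
        // connection details are placeholders
        val jedis = new Jedis("localhost", 6379)
        // MULTI opens the transaction; queued commands are executed atomically on EXEC
        val tx = jedis.multi()
        tx.hset("flink_user", "10001", "99.9")
        tx.expire("flink_user", 3600)
        tx.exec()
        jedis.close()
      }
    }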

Redis is a key-value database. Under AT_LEAST_ONCE semantics, when a Flink job restarts abnormally, the latest data overwrites the previous version of the data, so the data becomes consistent eventually. However, if an external program reads the data during the restart, there is a risk of reading values that are inconsistent with the final data.

Under EXACTLY_ONCE semantics, data is written to Redis in a batch when the Flink checkpoint completes as a whole, so writes are delayed by one checkpoint interval. Please choose the appropriate semantics according to the business scenario.
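As a reference, with the plain Flink API checkpointing can be enabled roughly as follows (the 30-second interval is an arbitrary example, not a recommendation):

    import org.apache.flink.streaming.api.CheckpointingMode
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    object CheckpointSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // Enable checkpointing so that the EXACTLY_ONCE (two-phase commit) Redis sink can be used;
        // the interval also determines how long written data may be delayed.
        env.enableCheckpointing(30 * 1000, CheckpointingMode.EXACTLY_ONCE)
        // ... define source, sink, etc., then call env.execute()
      }
    }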

Redis Write Dependency

Flink Connector Redis is officially available in two flavors; the two APIs below are the same, and StreamPark uses the org.apache.bahir dependency.

    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-redis_2.10</artifactId>
        <version>1.1.5</version>
    </dependency>

Writing Redis the Regular Way

The regular way of writing data using Flink Connector Redis is as follows:

1. Access the source

    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    import java.util.Random;

    public class TestSource implements SourceFunction<TestEntity> {

        private boolean isRunning = true;
        private final Random random = new Random();
        private int index = 0;

        @Override
        public void run(SourceContext<TestEntity> sourceContext) throws Exception {
            while (isRunning && index <= 1000001) {
                index += 1;
                long userId = System.currentTimeMillis();
                long orderId = random.nextInt(100);
                int status = random.nextInt(2);
                double price = random.nextDouble();
                int quantity = random.nextInt(10);
                TestEntity order = new TestEntity(userId, orderId, 1L, 1L, status, price, quantity, System.currentTimeMillis());
                sourceContext.collect(order);
            }
        }

        @Override
        public void cancel() {
            this.isRunning = false;
        }
    }

    class TestEntity {
        Long userId;
        Long orderId;
        Long siteId;
        Long cityId;
        Integer orderStatus;
        Double price;
        Integer quantity;
        Long timestamp;

        /**
         * @param userId      : User ID
         * @param orderId     : Order ID
         * @param siteId      : Site ID
         * @param cityId      : City ID
         * @param orderStatus : Order status (1: place order, 0: return order)
         * @param price       : Unit price
         * @param quantity    : Number of orders
         * @param timestamp   : Order time
         */
        public TestEntity(Long userId, Long orderId, Long siteId, Long cityId, Integer orderStatus, Double price, Integer quantity, Long timestamp) {
            this.userId = userId;
            this.orderId = orderId;
            this.siteId = siteId;
            this.cityId = cityId;
            this.orderStatus = orderStatus;
            this.price = price;
            this.quantity = quantity;
            this.timestamp = timestamp;
        }
    }

2. Write to Redis

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.redis.RedisSink;
    import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

    public class FlinkRedisSink {

        public static void main(String[] args) throws Exception {
            //1. Get the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            //2. Read data and convert it to a JavaBean
            DataStreamSource<TestEntity> source = env.addSource(new TestSource(), TypeInformation.of(TestEntity.class));
            //3. Write data to Redis
            FlinkJedisPoolConfig jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
                    .setHost("localhost")
                    .setPort(6379)
                    .build();
            source.addSink(new RedisSink<>(jedisPoolConfig, new MyRedisMapper()));
            //4. Execute the job
            env.execute();
        }

        public static class MyRedisMapper implements RedisMapper<TestEntity> {

            @Override
            public RedisCommandDescription getCommandDescription() {
                // Data is stored in Redis as a hash; the second parameter is the outer key
                return new RedisCommandDescription(RedisCommand.HSET, "");
            }

            @Override
            public String getKeyFromData(TestEntity data) {
                // Get the key from the data: the field of the hash
                return String.valueOf(data.userId);
            }

            @Override
            public String getValueFromData(TestEntity data) {
                // Get the value from the data: the value of the hash field
                return String.valueOf(data.price);
            }
        }
    }

Creating the FlinkJedisPoolConfig above is tedious, and every Redis operation requires building a RedisMapper, which is inflexible. StreamPark follows the principle of convention over configuration and automatic configuration: the user only needs to configure Redis, and StreamPark automatically assembles the source and sink parameters, which greatly simplifies the development logic and improves development efficiency and maintainability.

Apache StreamPark™ Writes to Redis

RedisSink defaults to AT_LEAST_ONCE (at least once) processing semantics; with checkpointing enabled, its two-phase commit implementation supports EXACTLY_ONCE semantics. Available connection types: single-node mode and sentinel mode.

1. Configure policy and connection information

Single-node configuration

    #redis sink configuration
    redis.sink:
      host: 127.0.0.1 # Required parameter
      # Optional parameters
      port: 6379
      database: 2
      password:
      connectType: jedisPool # Optional values: jedisPool (default) | sentinel
      maxTotal:
      maxIdle:
      minIdle:
      connectionTimeout:

Sentinel mode configuration

    #redis sink configuration
    redis.sink:
      masterName: master # Sentinel mode parameter
      host: 192.168.0.1:6379, 192.168.0.3:6379 # Required parameter; the port of each node must be specified
      connectType: sentinel
      # Optional parameters
      soTimeout: 6379
      database: 2
      password:
      maxTotal:
      maxIdle:
      minIdle:
      connectionTimeout:

2. Write to Redis

Writing to Redis with StreamPark is very simple; the code is as follows:

Scala

    import org.apache.streampark.flink.core.scala.FlinkStreaming
    import org.apache.streampark.flink.core.scala.sink.{RedisMapper, RedisSink}
    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription}
    import org.json4s.DefaultFormats

    object FlinkRedisSinkApp extends FlinkStreaming {

      @transient
      implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats

      override def handle(): Unit = {
        /**
         * Create the source that reads the data
         */
        val source = context.addSource(new TestSource)

        // Redis sink..................
        //1) Define the RedisSink
        val sink: RedisSink = RedisSink()
        //2) Define the Mapper's mapping
        val personMapper: RedisMapper[TestEntity] = RedisMapper[TestEntity](RedisCommand.HSET, "flink_user", _.userId.toString, _.orderId.toString)

        sink.sink[TestEntity](source, personMapper, 60000000).setParallelism(1)
      }

    }

    /**
     * The RedisMapper provided by StreamPark, shown here for reference
     * (RMapper refers to the underlying connector's RedisMapper interface)
     * @param cmd           the Redis write command
     * @param additionalKey the additional key to write, applicable to hset
     * @param key           the key to write
     * @param value         the value to write
     * @tparam T
     */
    case class RedisMapper[T](cmd: RedisCommand, additionalKey: String, key: T => String, value: T => String) extends RMapper[T] {

      override def getCommandDescription: RedisCommandDescription = new RedisCommandDescription(cmd, additionalKey)

      override def getKeyFromData(r: T): String = key(r)

      override def getValueFromData(r: T): String = value(r)

    }

As the code shows, StreamPark automatically loads the configuration and creates the RedisSink; the user completes the Redis write operation by creating the required RedisMapper object. additionalKey is the outermost key for hset and is ignored for other write commands. RedisSink.sink() also takes the expiration time (TTL, in seconds) of the written key; if not specified, the default expiration time is Java's Integer.MAX_VALUE (about 68 years), as shown in the code below.

    class RedisSink() extends Sink {

      def sink[T](stream: DataStream[T], mapper: RedisMapper[T], ttl: Int = Int.MaxValue): DataStreamSink[T] = {
        val sinkFun = (enableCheckpoint, cpMode) match {
          case (false, CheckpointingMode.EXACTLY_ONCE) => throw new IllegalArgumentException("redis sink EXACTLY_ONCE must enable checkpoint")
          case (true, CheckpointingMode.EXACTLY_ONCE) => new Redis2PCSinkFunction[T](config, mapper, ttl)
          case _ => new RedisSinkFunction[T](config, mapper, ttl)
        }
        val sink = stream.addSink(sinkFun)
        afterSink(sink, parallelism, name, uid)
      }

    }
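For example, writing each record as a plain string key with an explicit one-day TTL could look roughly like the following sketch (it reuses the TestEntity and source from the example above; the key and value choices are illustrative assumptions):

    // a minimal sketch: write each order as a string key with a 24-hour TTL;
    // additionalKey is passed as an empty string because it only applies to hset
    val orderMapper: RedisMapper[TestEntity] =
      RedisMapper[TestEntity](RedisCommand.SET, "", x => s"order_${x.orderId}", _.price.toString)

    RedisSink().sink[TestEntity](source, orderMapper, ttl = 24 * 60 * 60)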

Supported Redis commands

The following commands are supported for Redis operations:

    public enum RedisCommand {

        /**
         * Insert the specified value at the head of the list stored at key.
         * If key does not exist, it is created as an empty list before performing the push operation.
         */
        LPUSH(RedisDataType.LIST),

        /**
         * Insert the specified value at the tail of the list stored at key.
         * If key does not exist, it is created as an empty list before performing the push operation.
         */
        RPUSH(RedisDataType.LIST),

        /**
         * Add the specified member to the set stored at key.
         * A specified member that is already a member of this set is ignored.
         */
        SADD(RedisDataType.SET),

        /**
         * Set key to hold the string value. If key already holds a value,
         * it is overwritten, regardless of its type.
         */
        SET(RedisDataType.STRING),

        /**
         * Adds the element to the HyperLogLog data structure stored at the variable name specified as first argument.
         */
        PFADD(RedisDataType.HYPER_LOG_LOG),

        /**
         * Posts a message to the given channel.
         */
        PUBLISH(RedisDataType.PUBSUB),

        /**
         * Adds the specified members with the specified score to the sorted set stored at key.
         */
        ZADD(RedisDataType.SORTED_SET),

        /**
         * Removes the specified members from the sorted set stored at key.
         */
        ZREM(RedisDataType.SORTED_SET),

        /**
         * Sets field in the hash stored at key to value. If key does not exist,
         * a new key holding a hash is created. If field already exists in the hash, it is overwritten.
         */
        HSET(RedisDataType.HASH);
    }
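Based on the RedisMapper case class shown earlier, a mapper for a non-hash command is defined the same way. The following sketch (the key and value choices are illustrative assumptions) pushes each orderId onto a per-user list, assuming the usual connector mapping for LPUSH where getKeyFromData returns the list key, getValueFromData returns the pushed element, and additionalKey is not used:

    // a sketch: push every orderId onto a list keyed by userId (additionalKey is unused for LPUSH)
    val listMapper: RedisMapper[TestEntity] =
      RedisMapper[TestEntity](RedisCommand.LPUSH, "", x => s"user_orders_${x.userId}", _.orderId.toString)

    RedisSink().sink[TestEntity](source, listMapper)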

RedisSink currently supports single-node mode and sentinel mode connections. Cluster mode is not supported because it does not support transactions, and StreamPark therefore does not support it for now. Please call the official Flink Connector Redis API directly if you have such a usage scenario.
Checkpointing must be enabled under EXACTLY_ONCE semantics, otherwise the program will throw a parameter exception.
Under EXACTLY_ONCE semantics, the sink caches each checkpoint's data in memory, so the checkpoint interval needs to be set reasonably according to the actual data volume, otherwise there is a risk of OOM.

Other Configuration

All other configurations must adhere to the StreamPark configuration; please refer to the project configuration for the specific configurable items and the role of each parameter.