Configuration is very important in StreamPark.

Why do I need to configure?

Creating a DataStream program takes roughly four steps:

  • StreamExecutionEnvironment initialization and configuration
  • Create source
  • Create transformation
  • Create sink


When developing a DataStream program, we need to initialize the environment and configure the relevant parameters, generally as the very first step. The configuration parameters include the following categories:

  • Parallelism
  • TimeCharacteristic
  • Checkpoint
  • Watermark
  • State Backend
  • Restart Strategy
  • Other…

These configurations are largely generic; setting them up is repetitive work that must be done as the first step of every job.

Submit the program as follows:

  flink run -m yarn-cluster -p 1 -c com.xx.Main job.jar

You also need to set a series of environment parameters when developing a Flink SQL program. The following is an example of a program developed with pure SQL:

  import org.apache.flink.table.api.EnvironmentSettings;
  import org.apache.flink.table.api.TableEnvironment;

  public class JavaTableApp {

      public static void main(String[] args) {
          // Initialize the table environment (blink planner)
          EnvironmentSettings bbSettings = EnvironmentSettings
                  .newInstance()
                  .useBlinkPlanner()
                  .build();

          TableEnvironment bsTableEnv = TableEnvironment.create(bbSettings);

          // Source: the SQL is concatenated string by string in Java
          String sourceDDL = "CREATE TABLE datagen ( " +
                  " f_random INT, " +
                  " f_random_str STRING, " +
                  " ts AS localtimestamp, " +
                  " WATERMARK FOR ts AS ts " +
                  ") WITH ( " +
                  " 'connector' = 'datagen', " +
                  " 'rows-per-second'='10', " +
                  " 'fields.f_random.min'='1', " +
                  " 'fields.f_random.max'='5', " +
                  " 'fields.f_random_str.length'='10' " +
                  ")";
          bsTableEnv.executeSql(sourceDDL);

          // Sink
          String sinkDDL = "CREATE TABLE print_table (" +
                  " f_random int," +
                  " c_val bigint, " +
                  " wStart TIMESTAMP(3) " +
                  ") WITH ('connector' = 'print') ";
          bsTableEnv.executeSql(sinkDDL);
      }
  }

Apart from setting up the EnvironmentSettings, most of the remaining code consists of SQL strings spliced together in Java. If the business logic is complex, such code becomes difficult to maintain.

A simpler approach is needed: simplify the environment initialization and startup parameters of both DataStream and Flink SQL jobs. Ideally, a Flink SQL job would require no code at all, with no large blocks of SQL embedded in it. Can this be solved in a more elegant way?

Absolutely

StreamPark proposes the concept of unified program configuration: the parameters of an application, from development to deployment, are defined in application.yml according to a specific format, producing a general configuration template. When the program starts, the project configuration is passed to it and the environment initialization is completed automatically. This is the concept of the configuration file.

StreamPark provides a higher level of abstraction for Flink SQL: developers only need to define the SQL in sql.yaml. When the program starts, sql.yaml is passed to the main program, and the SQL is automatically loaded and executed. This is the concept of the sql file.

Terms

To make the ideas easier to understand and communicate: we write a program's parameters, from development to deployment, into a file in a specific format. This file, with its specific role, is the project's configuration file.

The SQL extracted from a Flink SQL job is put into sql.yaml. This file, with its specific role, is the project's sql file.

Configuration file

In StreamPark, the configuration file is shared by DataStream and Flink SQL jobs. In other words, the same configuration file can define the configuration of both DataStream and Flink SQL jobs (for a Flink SQL job, the configuration file is optional). The file format must be yaml and must meet yaml's requirements.

Let's look at how to write this configuration file and what needs attention:

  flink:
    deployment:
      option:
        target: application
        detached:
        shutdownOnAttachedExit:
        jobmanager:
      property: #@see: https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html
        $internal.application.main: org.apache.streampark.flink.quickstart.QuickStartApp
        pipeline.name: StreamPark QuickStart App
        yarn.application.queue:
        taskmanager.numberOfTaskSlots: 1
        parallelism.default: 2
        jobmanager.memory:
          flink.size:
          heap.size:
          jvm-metaspace.size:
          jvm-overhead.max:
          off-heap.size:
          process.size:
        taskmanager.memory:
          flink.size:
          framework.heap.size:
          framework.off-heap.size:
          managed.size:
          process.size:
          task.heap.size:
          task.off-heap.size:
          jvm-metaspace.size:
          jvm-overhead.max:
          jvm-overhead.min:
          managed.fraction: 0.4
    pipeline:
      auto-watermark-interval: 200ms
    # checkpoint
    execution:
      checkpointing:
        mode: EXACTLY_ONCE
        interval: 30s
        timeout: 10min
        unaligned: false
        externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
    # state backend
    state:
      backend: hashmap # Special note: Flink 1.12 accepts ('jobmanager', 'filesystem', 'rocksdb'); Flink 1.13+ accepts ('hashmap', 'rocksdb')
      backend.incremental: true
      checkpoint-storage: filesystem
      savepoints.dir: file:///tmp/chkdir
      checkpoints.dir: file:///tmp/chkdir
    # restart strategy
    restart-strategy: fixed-delay # restart strategy: fixed-delay / failure-rate / none (3 configurable strategies in total)
    restart-strategy.fixed-delay:
      attempts: 3
      delay: 5000
    restart-strategy.failure-rate:
      max-failures-per-interval:
      failure-rate-interval:
      delay:
    # table
    table:
      planner: blink # (blink|old|any)
      mode: streaming # (batch|streaming)

The above is the complete environment-related configuration that needs attention. All of it sits under the flink namespace and falls into two main categories:

  • The configuration under deployment relates to deploying the project (i.e., the resource-related parameters used when the application is started).
  • Everything else is environment configuration that matters during development.

The environment-related configurations to pay attention to during development are:

  • checkpoint
  • watermark
  • state backend
  • restart-strategy
  • table

Deployment

Deployment-related parameters and configuration items are defined under deployment, in two groups:

  • option
  • property

option

The parameters of flink run are configured under option. The currently supported parameters are as follows:

| Short Param | Full Param (prefix "--") | Effective Value & Type | Description |
| ----------- | ------------------------ | ---------------------- | ----------- |
| -t | target | yarn-per-job / application | deployment mode (only yarn-per-job and application are supported) |
| -d | detached | true / false | run in detached mode |
| -n | allowNonRestoredState | true / false | allow skipping savepoint state that cannot be restored |
| -sae | shutdownOnAttachedExit | true / false | if the job was submitted in attached mode, shut down the cluster when the job is cancelled |
| -m | jobmanager | yarn-cluster / address | address of the JobManager to connect to |
| -p | parallelism | int | program parallelism |
| -c | class | String | class with the program entry point ("main()" method) |

  • parallelism (-p): parallelism cannot be configured under option; configure parallelism.default under property instead.
  • class (-c): the program's main class cannot be configured under option; configure $internal.application.main under property instead.

Parameters under option must use the full parameter name.
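
For example, a minimal option block following the template above might look like this (the values are illustrative):

  flink:
    deployment:
      option:
        target: application           # -t: deploy in application mode
        detached: true                # -d: submit the job in detached mode
        shutdownOnAttachedExit: false # -sae: only relevant for attached submission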

property

Parameters under property are the standard parameters passed via -D, in two parts:

  • Basic parameters
  • Memory parameters

Basic parameters

There are many basic parameters. The five most basic parameters are as follows.

| Key | Description | Required |
| --- | ----------- | -------- |
| $internal.application.main | class with the program entry point ("main()" method) | yes |
| pipeline.name | job name | yes |
| yarn.application.queue | YARN queue | no |
| taskmanager.numberOfTaskSlots | number of slots per TaskManager | no |
| parallelism.default | program parallelism | no |

$internal.application.main and pipeline.name must be set.

If you need to set more parameters, please refer to here. These parameters must be placed under property and the parameter names must be correct; StreamPark will automatically resolve them and they will take effect.
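
For example, an additional official option such as pipeline.max-parallelism (used here purely as an illustration) goes under property next to the required keys:

  flink:
    deployment:
      property:
        $internal.application.main: org.apache.streampark.flink.quickstart.QuickStartApp
        pipeline.name: StreamPark QuickStart App
        # any further official Flink option can be added here, e.g.:
        pipeline.max-parallelism: 128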

Memory parameters

There are many memory configuration parameters; the common ones are as follows:

| Key | Description |
| --- | ----------- |
| jobmanager.memory.heap.size | JVM Heap Memory size for the JobManager. The minimum recommended JVM Heap size is 128.000mb (134217728 bytes). |
| jobmanager.memory.off-heap.size | Off-heap Memory size for the JobManager. This option covers all off-heap memory usage including direct and native memory allocation. The JVM direct memory limit of the JobManager process (-XX:MaxDirectMemorySize) is set to this value if the limit is enabled via jobmanager.memory.enable-jvm-direct-memory-limit. |
| jobmanager.memory.jvm-metaspace.size | JVM Metaspace size for the JobManager. |
| jobmanager.memory.jvm-overhead.min | Min JVM Overhead size for the JobManager. This is off-heap memory reserved for JVM overhead, such as thread stack space and compile cache. It includes native but not direct memory and is not counted when Flink calculates the JVM max direct memory size parameter. The size of the JVM Overhead is derived to make up the configured fraction of the Total Process Memory; if the derived size is below the min or above the max, the min or max is used instead. Setting min and max to the same value fixes the JVM Overhead size exactly. |
| jobmanager.memory.jvm-overhead.max | Max JVM Overhead size for the JobManager. Same semantics and derivation rules as the min setting above. |
| jobmanager.memory.jvm-overhead.fraction | Fraction of Total Process Memory to be reserved for JVM Overhead. Same semantics and derivation rules as above. |
| taskmanager.memory.framework.heap.size | Framework Heap Memory size for TaskExecutors. This is JVM heap memory reserved for the TaskExecutor framework and is not allocated to task slots. |
| taskmanager.memory.task.heap.size | Task Heap Memory size for TaskExecutors. This is JVM heap memory reserved for tasks. If not specified, it is derived as Total Flink Memory minus Framework Heap Memory, Framework Off-Heap Memory, Task Off-Heap Memory, Managed Memory and Network Memory. |
| taskmanager.memory.managed.size | Managed Memory size for TaskExecutors. This is off-heap memory managed by the memory manager, reserved for sorting, hash tables, caching of intermediate results and the RocksDB state backend. Memory consumers either allocate memory from the memory manager in the form of MemorySegments, or reserve bytes from it and keep their usage within that boundary. If unspecified, it is derived to make up the configured fraction of the Total Flink Memory. |
| taskmanager.memory.managed.fraction | Fraction of Total Flink Memory used as Managed Memory, if the Managed Memory size is not explicitly specified. |
| taskmanager.memory.framework.off-heap.size | Framework Off-Heap Memory size for TaskExecutors. This is off-heap memory (JVM direct and native memory) reserved for the TaskExecutor framework and is not allocated to task slots. The configured value is fully counted when Flink calculates the JVM max direct memory size parameter. |
| taskmanager.memory.task.off-heap.size | Task Off-Heap Memory size for TaskExecutors. This is off-heap memory (JVM direct and native memory) reserved for tasks. The configured value is fully counted when Flink calculates the JVM max direct memory size parameter. |
| taskmanager.memory.jvm-metaspace.size | JVM Metaspace size for the TaskExecutors. |

Similarly, if you want to configure more memory parameters, please refer to here. Put the Flink process, jobmanager, and taskmanager memory configurations under property to ensure they take effect.

Configure Total Memory

The total process memory of Flink JVM processes consists of memory consumed by the Flink application (total Flink memory) and by the JVM to run the process. The total Flink memory consumption includes usage of JVM Heap and Off-heap (Direct or Native) memory.


The simplest way to set up memory in Flink is to configure either of the two following options:

| Item | TaskManager Config | JobManager Config |
| ---- | ------------------ | ----------------- |
| Flink total memory | taskmanager.memory.flink.size | jobmanager.memory.flink.size |
| Flink process total memory | taskmanager.memory.process.size | jobmanager.memory.process.size |

Explicitly configuring both total process memory and total Flink memory is not recommended. It may lead to deployment failures due to potential memory configuration conflicts. Configuring other memory components also requires caution as it can produce further configuration conflicts.
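
Translated to the configuration template, this means setting just one of the two keys per process under property. A sketch with illustrative sizes:

  flink:
    deployment:
      property:
        # choose ONE of process.size / flink.size per process, never both
        jobmanager.memory.process.size: 1600mb
        taskmanager.memory.process.size: 4gb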

Checkpoint

The checkpoint configuration is simple. You can configure it as follows:

| Item | Description | Value | Type |
| ---- | ----------- | ----- | ---- |
| execution.checkpointing.interval | interval between checkpoints | | Duration |
| execution.checkpointing.timeout | checkpoint timeout | | Duration |
| execution.checkpointing.mode | checkpointing semantics | EXACTLY_ONCE / AT_LEAST_ONCE | |
| execution.checkpointing.unaligned | whether to use unaligned checkpoints | true / false | |
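
In the configuration file, this corresponds to the checkpoint section of the template above:

  flink:
    execution:
      checkpointing:
        mode: EXACTLY_ONCE # semantics
        interval: 30s      # a checkpoint every 30 seconds
        timeout: 10min     # fail a checkpoint that takes longer than 10 minutes
        unaligned: false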

Watermark

For the watermark, you only need to configure the watermark generation interval, pipeline.auto-watermark-interval.
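
As in the template above:

  flink:
    pipeline:
      auto-watermark-interval: 200ms # generate a watermark every 200 ms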

State

  state:
    backend: hashmap # Special note: Flink 1.12 accepts ('jobmanager', 'filesystem', 'rocksdb'); Flink 1.13+ accepts ('hashmap', 'rocksdb')
    backend.incremental: true
    checkpoint-storage: filesystem
    savepoints.dir: file:///tmp/chkdir
    checkpoints.dir: file:///tmp/chkdir

There are roughly two types:

  • backend
  • checkpoints

backend

backend sets the state backend configuration, following the configuration rules in the official documentation. The following items are supported:

| Item | Description | Value | Effective rules |
| ---- | ----------- | ----- | --------------- |
| state.backend | type of backend storage | hashmap / rocksdb | |
| state.checkpoint-storage | the checkpoint storage implementation used to checkpoint state | jobmanager / filesystem | |
| state.backend.incremental | whether to enable incremental checkpoints | true / false | rocksdb only |

If the backend type is rocksdb, you may need to configure rocksdb further; refer to the official documentation. Note that the rocksdb options in the official documentation are prefixed with state.backend, and here the namespace already sits under state with keys starting with backend., so take care that the final parameter names are correct.

The backend item itself is a non-standard configuration used to set the state storage type (jobmanager | filesystem | rocksdb); the other items are standard configurations that comply with the official specification.
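
For example, to enable rocksdb with the official option state.backend.rocksdb.memory.managed (chosen here purely as an illustration), the state. prefix is supplied by the namespace and the rest of the key is written out:

  flink:
    state:
      backend: rocksdb
      backend.incremental: true
      # the full key resolves to state.backend.rocksdb.memory.managed
      backend.rocksdb.memory.managed: true
      checkpoints.dir: file:///tmp/chkdir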

Restart Strategy

There are three restart strategies in Flink, corresponding to the three configurations here, as follows:

  restart-strategy: fixed-delay # restart strategy: fixed-delay / failure-rate / none (3 configurable strategies in total)
  restart-strategy.fixed-delay:
    attempts: 3
    delay: 5000
  restart-strategy.failure-rate:
    max-failures-per-interval:
    failure-rate-interval:
    delay:

Configure the specific restart strategy under restart-strategy:

  • fixed-delay
  • failure-rate
  • none

fixed-delay

| Item | Description | Value | Unit |
| ---- | ----------- | ----- | ---- |
| attempts | number of times Flink attempts to restart | 3 | |
| delay | how long to wait before restarting after the task fails | none | s / m / min / h / d |

Example:

  attempts: 5
  delay: 3 s

That is, the task will be retried at most 5 times, with 3 seconds between restarts; once 5 failed retries are reached, the task fails and exits.

failure-rate

| Item | Description | Value | Unit |
| ---- | ----------- | ----- | ---- |
| max-failures-per-interval | maximum number of restarts within the given time interval before the job fails | 3 | |
| failure-rate-interval | time interval for measuring the failure rate | none | s / m / min / h / d |
| delay | delay between two consecutive restart attempts | none | s / m / min / h / d |

Example

  max-failures-per-interval: 10
  failure-rate-interval: 5 min
  delay: 2 s

That is, the delay between two consecutive restarts is 2 seconds; if the total number of failures reaches 10 within a 5-minute window, the job fails.

none

No parameters need to be configured for the none strategy (the job is simply not restarted).

Unit suffix

Note that time intervals and rates can be set with or without a unit suffix. If no unit suffix is given, the value is treated as milliseconds. The available units are:

  • s second
  • m minute
  • min minute
  • h hour
  • d day
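
For example, the same 5-second delay can be written in either of these two ways:

  delay: 5000 # no unit suffix, interpreted as 5000 milliseconds
  delay: 5 s  # explicit unit suffix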

Table

Under table is the configuration of Flink SQL. The currently supported configuration items and functions are as follows:

  • planner
  • mode
  • catalog
  • database

| Item | Description | Value |
| ---- | ----------- | ----- |
| planner | Table planner | blink / old / any |
| mode | Table mode | streaming / batch |
| catalog | the catalog to use during initialization, if specified | |
| database | the database to use during initialization, if specified | |
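
For example (the catalog and database names below are hypothetical):

  flink:
    table:
      planner: blink
      mode: streaming
      catalog: my_catalog # hypothetical catalog name
      database: my_db     # hypothetical database name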

Sql file

The sql file must be in yaml format and follow the yaml definition rules. Defining the SQL inside it is very simple, as follows:

  sql: |
    CREATE TABLE datagen (
      f_sequence INT,
      f_random INT,
      f_random_str STRING,
      ts AS localtimestamp,
      WATERMARK FOR ts AS ts
    ) WITH (
      'connector' = 'datagen',
      -- optional options --
      'rows-per-second'='5',
      'fields.f_sequence.kind'='sequence',
      'fields.f_sequence.start'='1',
      'fields.f_sequence.end'='1000',
      'fields.f_random.min'='1',
      'fields.f_random.max'='1000',
      'fields.f_random_str.length'='10'
    );
    CREATE TABLE print_table (
      f_sequence INT,
      f_random INT,
      f_random_str STRING
    ) WITH (
      'connector' = 'print'
    );
    INSERT INTO print_table select f_sequence,f_random,f_random_str from datagen;

sql is the ID of the current SQL and must be unique; the content that follows is the SQL itself.

In the above content, the | after sql: is required; it preserves the formatting of the entire block. StreamPark can define multiple SQL statements at once; each statement must be separated by a semicolon, and each must follow the format and specification required by Flink SQL.

Summary

This chapter introduced the configuration file and the sql file in detail. You should now have an initial impression of both concepts; refer to the following chapters for their concrete use.