配置在StreamPark中是非常重要的概念,先说说为什么需要配置

为什么需要配置

开发DataStream程序,大体流程都可以抽象为以下4步

  • StreamExecutionEnvironment初始并配置
  • Source接入数据
  • Transformation逻辑处理
  • Sink结果数据落地

项目配置 - 图1

开发DataStream程序都需要定义Environment初始化并且配置环境相关的参数,一般我们都会在第一步初始化Environment并配置各种参数,配置的参数大概有以下几类

  • Parallelism 默认并行度配置
  • TimeCharacteristic 时间特征配置
  • checkpoint 检查点的相关配置
  • Watermark 相关配置
  • State Backend 状态后端配置
  • Restart Strategy 重启策略配置
  • 其他配置…

以上的配置基本都是比较普遍且通用的,是每个程序上来第一步就要定义的,是一项重复的工作

当程序写好后,要上线运行,任务启动提交都差不多用下面的命令行的方式,设置各种启动参数, 这时就得开发者清楚的知道每个参数的含义,如果再设置几个运行时资源参数,那启动命名会很长,可读性很差,参数解析用到了强校验,一旦设置错误,会直接报错,导致任务启动失败,最直接的异常是 找不到程序的jar

  1. flink run -m yarn-cluster -p 1 -c com.xx.Main job.jar

开发Flink Sql程序,也需要设置一系列环境参数,除此之外,如果要使用纯sql的方式开发,举一个最简单的例子,代码如下

``` java title= 编程式开发FlinkSql任务

import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment;

public class JavaTableApp {

  1. public static void main(String[] args) {
  2. EnvironmentSettings bbSettings = EnvironmentSettings
  3. .newInstance()
  4. .useBlinkPlanner()
  5. .build();
  6. TableEnvironment bsTableEnv = TableEnvironment.create(bbSettings);
  7. String sourceDDL = "CREATE TABLE datagen ( " +
  8. " f_random INT, " +
  9. " f_random_str STRING, " +
  10. " ts AS localtimestamp, " +
  11. " WATERMARK FOR ts AS ts " +
  12. ") WITH ( " +
  13. " 'connector' = 'datagen', " +
  14. " 'rows-per-second'='10', " +
  15. " 'fields.f_random.min'='1', " +
  16. " 'fields.f_random.max'='5', " +
  17. " 'fields.f_random_str.length'='10' " +
  18. ")";
  19. bsTableEnv.executeSql(sourceDDL);
  20. String sinkDDL = "CREATE TABLE print_table (" +
  21. " f_random int," +
  22. " c_val bigint, " +
  23. " wStart TIMESTAMP(3) " +
  24. ") WITH ('connector' = 'print') ";
  25. bsTableEnv.executeSql(sinkDDL);
  26. }

}

  1. 我们会看到除了设置 `EnvironmentSettings` 参数之外,剩下的几乎大段大段的代码都是在写 `sql`,用java代码拼接各种sql,这种编码的方式,极不优雅,如果业务复杂,更是难以维护,而且会发现,整个编码的模式是统一的, 都是声明一段sql,然后调用 `executeSql` 方法
  2. **我们的设想是**:能不能以一种更好的方式将这种重复的工作简单化,将`DataStream``Flink Sql`任务中的一些环境初始化相关的参数和启动相关参数简化,最好一行代码都不写,针对`Flink Sql`作业,也不想在代码里写大段的sql,能不能以一种更优雅的方式解决?
  3. **答案是肯定的**
  4. 针对参数设置的问题,在`StreamPark`中提出统一程序配置的概念,把程序的一系列参数从开发到部署阶段按照特定的格式配置到`application.yml`里,抽象出
  5. 一个通用的配置模板,按照这种规定的格式将上述配置的各项参数在配置文件里定义出来,在程序启动的时候将这个项目配置传入到程序中即可完成环境的初始化工作,在任务启动的时候也会自动识别启动时的参数.
  6. 针对Flink Sql作业在代码里写sql的问题,`StreamPark`针对`Flink Sql`作业做了更高层级封装和抽象,开发者只需要将sql按照一定的规范要求定义到`application.yaml`中,在程序启动时传入该文件到主程序中, 就会自动按照要求加载执行sql
  7. 下面我们来详细看看这个配置文件的各项配置都是如何进行配置的,有哪些注意事项
  8. ```yaml
  9. flink:
  10. option:
  11. target: yarn-per-job
  12. detached:
  13. shutdownOnAttachedExit:
  14. zookeeperNamespace:
  15. jobmanager:
  16. property: #@see: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/
  17. $internal.application.main: org.apache.streampark.flink.quickstart.QuickStartApp
  18. pipeline.name: streampark-quickstartApp
  19. yarn.application.queue:
  20. taskmanager.numberOfTaskSlots: 1
  21. parallelism.default: 2
  22. jobmanager.memory:
  23. flink.size:
  24. heap.size:
  25. jvm-metaspace.size:
  26. jvm-overhead.max:
  27. off-heap.size:
  28. process.size:
  29. taskmanager.memory:
  30. flink.size:
  31. framework.heap.size:
  32. framework.off-heap.size:
  33. managed.size:
  34. process.size:
  35. task.heap.size:
  36. task.off-heap.size:
  37. jvm-metaspace.size:
  38. jvm-overhead.max:
  39. jvm-overhead.min:
  40. managed.fraction: 0.4
  41. pipeline:
  42. auto-watermark-interval: 200ms
  43. # checkpoint
  44. execution:
  45. checkpointing:
  46. mode: EXACTLY_ONCE
  47. interval: 30s
  48. timeout: 10min
  49. unaligned: false
  50. externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
  51. # state backend
  52. state:
  53. backend: hashmap # Special note: flink1.12 optional configuration ('jobmanager', 'filesystem', 'rocksdb'), flink1.12+ optional configuration ('hashmap', 'rocksdb'),
  54. backend.incremental: true
  55. checkpoint-storage: filesystem
  56. savepoints.dir: file:///tmp/chkdir
  57. checkpoints.dir: file:///tmp/chkdir
  58. # restart strategy
  59. restart-strategy: fixed-delay # Restart strategy [(fixed-delay|failure-rate|none) a total of 3 configurable strategies]
  60. restart-strategy.fixed-delay:
  61. attempts: 3
  62. delay: 5000
  63. restart-strategy.failure-rate:
  64. max-failures-per-interval:
  65. failure-rate-interval:
  66. delay:
  67. # table
  68. table:
  69. table.local-time-zone: default # @see https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/config/
  70. # kafka source
  71. app:
  72. kafka.source:
  73. bootstrap.servers: kfk1:9092,kfk2:9092,kfk3:9092
  74. topic: test_user
  75. group.id: user_01
  76. auto.offset.reset: earliest
  77. # mysql
  78. jdbc:
  79. driverClassName: com.mysql.cj.jdbc.Driver
  80. jdbcUrl: jdbc:mysql://localhost:3306/test?useSSL=false&allowPublicKeyRetrieval=true
  81. username: root
  82. password: 123456
  83. sql:
  84. flinksql: |
  85. CREATE TABLE datagen (
  86. f_sequence INT,
  87. f_random INT,
  88. f_random_str STRING,
  89. ts AS localtimestamp,
  90. WATERMARK FOR ts AS ts
  91. ) WITH (
  92. 'connector' = 'datagen',
  93. -- optional options --
  94. 'rows-per-second'='5',
  95. 'fields.f_sequence.kind'='sequence',
  96. 'fields.f_sequence.start'='1',
  97. 'fields.f_sequence.end'='1000',
  98. 'fields.f_random.min'='1',
  99. 'fields.f_random.max'='1000000',
  100. 'fields.f_random_str.length'='10'
  101. );
  102. CREATE TABLE print_table (
  103. f_sequence INT,
  104. f_random INT,
  105. f_random_str STRING
  106. ) WITH (
  107. 'connector' = 'print'
  108. );
  109. INSERT INTO print_table select f_sequence,f_random,f_random_str from datagen;

从全局看参数分为三大类: flink, app, sql, 各自作用如下:

flink下放的是作业的基础参数, 包括 optionproperty, table 三大类:

option

option下放的参数是flink run 下支持的参数,目前支持的参数如下

短参数 完整参数(前缀”—“) 有效 取值范围值或类型 作用描述
-t target yarn-per-job application 部署方式(目前只支持yarn-per-job,application)
-d detached true false 是否以detached模式启动
-n allowNonRestoredState true false 从savePoint恢复失败时是否允许跳过该步骤
-sae shutdownOnAttachedExit true false attached模式下任务停止时是否关闭集群
-m jobmanager yarn-cluster 连接地址 JobManager的连接地址
-p parallelism int 程序并行度
-c class String 程序的main方法的全名称

parallelism (-p) 并行度不支持在option里配置,会在后面的property里配置 class (-c) 程序main不支持在option里配置,会在后面的property里配置

option下的参数必须是 完整参数名

property

property下放的参数是标准的flink参数(-D下的参数),flink所有的参数数据都可以在这里进行配置, 大致如下:

基础参数

基础参数可以配置的选项非常之多,这里举例5个最基础的设置

参数名称 作用描述 是否必须
$internal.application.main 程序的主类(main)的完整类名
yarn.application.name 程序的名称(YARN中显示的任务名称)
yarn.application.queue 在YARN中运行的队列名称
taskmanager.numberOfTaskSlots taskmanager Slot的数量
parallelism.default 程序的并行

$internal.application.mainyarn.application.name 这两个参数是必须的

如您需要设置更多的参数,可参考这里 一定要将这些参数放到property下,并且参数名称要正确,StreamPark会自动解析这些参数并生效

Memory参数

Memory相关的参数设置也非常之多,一般常见的配置如下

参数名称 作用描述
jobmanager.memory.heap.size JobManager 的 JVM 堆内存
jobmanager.memory.off-heap.size JobManager 的堆外内存(直接内存或本地内存)
jobmanager.memory.jvm-metaspace.size Flink JVM进程的Metaspace
jobmanager.memory.jvm-metaspace.size Flink JVM进程的Metaspace
jobmanager.memory.jvm-overhead.min Flink JVM进程的Metaspace
jobmanager.memory.jvm-metaspace.size 用于其他 JVM 开销的本地内存
jobmanager.memory.jvm-overhead.max 用于其他 JVM 开销的本地内存
jobmanager.memory.jvm-overhead.fraction 用于其他 JVM开销的本地内存
taskmanager.memory.framework.heap.size 用于Flink 框架的JVM堆内存(进阶配置)
taskmanager.memory.task.heap.size 由Flink管理的用于排序,哈希表,缓存状态后端的本地内存
taskmanager.memory.managed.size 用于其他 JVM 开销的本地内存
taskmanager.memory.managed.fraction 用于其他 JVM 开销的本地内存
taskmanager.memory.framework.off-heap.size 用于Flink框架的堆外内存(直接内存或本地内存)进阶配置
taskmanager.memory.task.off-heap.size 用于Flink应用的算子及用户代码的堆外内存(直接内存或本地内存)
taskmanager.memory.jvm-metaspace.size Flink JVM 进程的 Metaspace

同样,如你想配置更多的内存相关的参数,请参考这里 查看Flink Process Memory , jobmanagertaskmanager 相关的内存配置将这些参数放到property下,保证参数正确即可生效

配置总内存

Flink JVM 进程的进程总内存(Total Process Memory)包含了由 Flink 应用使用的内存(Flink 总内存)以及由运行 Flink 的 JVM 使用的内存。 Flink 总内存(Total Flink Memory)包括 JVM 堆内存(Heap Memory)和堆外内存(Off-Heap Memory)。 其中堆外内存包括直接内存(Direct Memory)和本地内存(Native Memory)

项目配置 - 图2

配置 Flink 进程内存最简单的方法是指定以下两个配置项中的任意一个:

配置项 TaskManager 配置参数 JobManager 配置参数
Flink 总内存 taskmanager.memory.flink.size jobmanager.memory.flink.size
进程总内存 taskmanager.memory.process.size jobmanager.memory.process.size

不建议同时设置进程总内存和 Flink 总内存。 这可能会造成内存配置冲突,从而导致部署失败。 额外配置其他内存部分时,同样需要注意可能产生的配置冲突。

Table

table下是Flink Sql相关的配置,目前支持的配置项和作用如下

  • planner
  • mode
  • catalog
  • database
配置项 作用描述 参数值
planner Table Planner blink old any
mode Table Mode streaming batch
catalog 指定catalog,如指定初始化时会使用到
database 指定database,如指定初始化时会使用到

app

我们将作业中需要的一些用户自定义的参数放到app下, 这里可以放用户需要的任意参数. 如,用户的需求是从kafka读取数据写入mysql, 则肯定会用到kafka和mysql的相关信息. 就可以定义到这里.

sql

sql是在flink sql作业中需要指定的, 我们提倡将sql语句本身抽取到配置文件里,使开发更简单, 该sql得遵循yaml文件的定义规则,具体内部sql格式的定义非常简单,如下:

  1. sql: |
  2. CREATE TABLE datagen (
  3. f_sequence INT,
  4. f_random INT,
  5. f_random_str STRING,
  6. ts AS localtimestamp,
  7. WATERMARK FOR ts AS ts
  8. ) WITH (
  9. 'connector' = 'datagen',
  10. -- optional options --
  11. 'rows-per-second'='5',
  12. 'fields.f_sequence.kind'='sequence',
  13. 'fields.f_sequence.start'='1',
  14. 'fields.f_sequence.end'='1000',
  15. 'fields.f_random.min'='1',
  16. 'fields.f_random.max'='1000',
  17. 'fields.f_random_str.length'='10'
  18. );
  19. CREATE TABLE print_table (
  20. f_sequence INT,
  21. f_random INT,
  22. f_random_str STRING
  23. ) WITH (
  24. 'connector' = 'print'
  25. );
  26. INSERT INTO print_table select f_sequence,f_random,f_random_str from datagen;

sql为当前sql的id,必须是唯一的,后面的内容则是具体的sql

上面内容中 sql: 后面的 | 是必带的, 加上 | 会保留整段内容的格式,重点是保留了换行符, StreamPark封装了Flink Sql的提交,可以直接将多个Sql一次性定义出来,每个Sql必须用 ; 分割,每段 Sql也必须遵循Flink Sql规定的格式和规范

总结

本章节详细介绍了配置文件sql文件的由来和具体配置,相信你已经有了一个初步的印象和概念,具体使用请查后续章节