在 StreamPark Flink-Kubernetes runtime 下使用 Hadoop 资源,如 checkpoint 挂载 HDFS、读写 Hive 等,大概流程如下:

1. Apache HDFS

​ 如需将 flink on k8s 相关资源放在 HDFS 中,需要经过以下两个步骤:

1.1 添加 shade jar

​ 默认情况下,从 Docker 上 pull 的 Flink 镜像是不包括 Hadoop 相关的 jar,这里以 flink:1.14.5-scala_2.12-java8 为例,如下:

  1. [flink@ff] /opt/flink-1.14.5/lib
  2. $ ls
  3. flink-csv-1.14.5.jar flink-shaded-zookeeper-3.4.14.jar log4j-api-2.17.1.jar
  4. flink-dist_2.12-1.14.5.jar flink-table_2.12-1.14.5.jar log4j-core-2.17.1.jar
  5. flink-json-1.14.5.jar log4j-1.2-api-2.17.1.jar log4j-slf4j-impl-2.17.1.jar

​ 这是需要将 shade jar 下载下来,然后放在 Flink 的 lib 目录下,这里 以hadoop2 为例;下载 flink-shaded-hadoop-2-uberhttps://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-9.0/flink-shaded-hadoop-2-uber-2.7.5-9.0.jar

​ 另外,可以将 shade jar 以依赖的方式在 StreamPark 的任务配置中的Dependency 进行依赖配置,如下配置:

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-shaded-hadoop-2-uber</artifactId>
  4. <version>2.7.5-9.0</version>
  5. <scope>provided</scope>
  6. </dependency>
1.2、添加 core-site.xml 和 hdfs-site.xml

​ 有了 shaded jar 还需要相应的配置文件去找到 Hadoop 地址,这里主要涉及到两个配置文件:core-site.xml和hdfs-site.xml,通过 flink 的源码分析(涉及到的类主要是:org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters),该两文件有固定的加载顺序,如下:

  1. // 寻找 hadoop 配置文件的流程
  2. // 1. 先去寻在是否添加了参数:kubernetes.hadoop.conf.config-map.name
  3. @Override
  4. public Optional<String> getExistingHadoopConfigurationConfigMap() {
  5. final String existingHadoopConfigMap =
  6. flinkConfig.getString(KubernetesConfigOptions.HADOOP_CONF_CONFIG_MAP);
  7. if (StringUtils.isBlank(existingHadoopConfigMap)) {
  8. return Optional.empty();
  9. } else {
  10. return Optional.of(existingHadoopConfigMap.trim());
  11. }
  12. }
  13. @Override
  14. public Optional<String> getLocalHadoopConfigurationDirectory() {
  15. // 2. 如果没有1中指定的参数,查找提交 native 命令的本地环境是否有环境变量:HADOOP_CONF_DIR
  16. final String hadoopConfDirEnv = System.getenv(Constants.ENV_HADOOP_CONF_DIR);
  17. if (StringUtils.isNotBlank(hadoopConfDirEnv)) {
  18. return Optional.of(hadoopConfDirEnv);
  19. }
  20. // 3. 如果没有2中环境变量,再继续看是否有环境变量:HADOOP_HOME
  21. final String hadoopHomeEnv = System.getenv(Constants.ENV_HADOOP_HOME);
  22. if (StringUtils.isNotBlank(hadoopHomeEnv)) {
  23. // Hadoop 2.x
  24. final File hadoop2ConfDir = new File(hadoopHomeEnv, "/etc/hadoop");
  25. if (hadoop2ConfDir.exists()) {
  26. return Optional.of(hadoop2ConfDir.getAbsolutePath());
  27. }
  28. // Hadoop 1.x
  29. final File hadoop1ConfDir = new File(hadoopHomeEnv, "/conf");
  30. if (hadoop1ConfDir.exists()) {
  31. return Optional.of(hadoop1ConfDir.getAbsolutePath());
  32. }
  33. }
  34. return Optional.empty();
  35. }
  36. final List<File> hadoopConfigurationFileItems = getHadoopConfigurationFileItems(localHadoopConfigurationDirectory.get());
  37. // 如果没有找到1、2、3说明没有 hadoop 环境
  38. if (hadoopConfigurationFileItems.isEmpty()) {
  39. LOG.warn("Found 0 files in directory {}, skip to mount the Hadoop Configuration ConfigMap.", localHadoopConfigurationDirectory.get());
  40. return flinkPod;
  41. }
  42. //如果2或者3存在,会在路径下查找 core-site.xml 和 hdfs-site.xml 文件
  43. private List<File> getHadoopConfigurationFileItems(String localHadoopConfigurationDirectory) {
  44. final List<String> expectedFileNames = new ArrayList<>();
  45. expectedFileNames.add("core-site.xml");
  46. expectedFileNames.add("hdfs-site.xml");
  47. final File directory = new File(localHadoopConfigurationDirectory);
  48. if (directory.exists() && directory.isDirectory()) {
  49. return Arrays.stream(directory.listFiles())
  50. .filter(
  51. file ->
  52. file.isFile()
  53. && expectedFileNames.stream()
  54. .anyMatch(name -> file.getName().equals(name)))
  55. .collect(Collectors.toList());
  56. } else {
  57. return Collections.emptyList();
  58. }
  59. }
  60. // 如果找到上述文件,说明有 Hadoop 的环境,将会把上述两个文件解析为 key-value 对,然后构建成一个 ConfigMap,名字命名规则如下:
  61. public static String getHadoopConfConfigMapName(String clusterId) {
  62. return Constants.HADOOP_CONF_CONFIG_MAP_PREFIX + clusterId;
  63. }

2、Apache Hive

​ 将数据 sink 到 Hive,或者以 Hive 的 Metastore 作为 Flink 的元数据,都需要打通 flink 到 hive 的路径,同样需要经过一下两个步骤:

i、添加 hive 相关的 jar

​ 如上所述,默认 flink 镜像是不包括 hive 相关的 jar,需要将 hive 相关的如下三个 jar 放在 flink 的 lib 目录下,这里以 hive 2.3.6 版本为例:

​ 1. hive-exechttps://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.6/hive-exec-2.3.6.jar

​ 2. flink-connector-hivehttps://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.12/1.14.5/flink-connector-hive_2.12-1.14.5.jar

​ 3. flink-sql-connector-hivehttps://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.6_2.12/1.14.5/flink-sql-connector-hive-2.3.6_2.12-1.14.5.jar

​ 同样,也可以将上述 hive 相关 jar 以依赖的方式在 StreamPark 的任务配置中的Dependency 进行依赖配置,这里不再赘述。

2.1. 添加 hive 的配置文件(hive-site.xml)

​ 和 hdfs 所不同的是,flink 源码中并没有 hive 的配置文件的默认的加载方式,因此需要开发者手动添加 hive 的配置文件,这里主要采用三种方式:

​ 1. 将 hive-site.xml 打在 flink 的自定义镜像之中,一般建议放在镜像里的/opt/flink/目录之下

​ 2. 将 hive-site.xml 放在远端的存储系统之后,例如 HDFS,在使用的时候进行加载

​ 3. 将 hive-site.xml 以 ConfigMap 的形式挂载在 k8s 之中,建议使用此种方式,如下:

  1. # 1. 在指定的 ns 中挂载指定位置的 hive-site.xml
  2. kubectl create cm hive-conf --from-file=hive-site.xml -n flink-test
  3. # 2. 查看挂载到 k8s 中的 hive-site.xml
  4. kubectl describe cm hive-conf -n flink-test
  5. # 3. 将此 cm 挂载到容器内指定的目录
  6. spec:
  7. containers:
  8. - name: flink-main-container
  9. volumeMounts:
  10. - mountPath: /opt/flink/hive
  11. name: hive-conf
  12. volumes:
  13. - name: hive-conf
  14. configMap:
  15. name: hive-conf
  16. items:
  17. - key: hive-site.xml
  18. path: hive-site.xml

总结

​ 通过以上的方式便可以将 Flink 和 Hadoop、Hive 打通,此方法可推广至一般,即 flink 与外部系统如redis、mongo 等连通,一般需要如下两个步骤:

​ 1. 加载指定外部服务的 connector jar

​ 2. 如果有,加载指定的配置文件到 flink 系统之中