在 Flink on Kubernetes 上使用 Apache Hadoop 资源
在 StreamPark Flink-Kubernetes runtime 下使用 Hadoop 资源,如 checkpoint 挂载 HDFS、读写 Hive 等,大概流程如下:
1. Apache HDFS
如需将 flink on k8s 相关资源放在 HDFS 中,需要经过以下两个步骤:
1.1 添加 shade jar
默认情况下,从 Docker 上 pull 的 Flink 镜像是不包括 Hadoop 相关的 jar,这里以 flink:1.14.5-scala_2.12-java8 为例,如下:
[flink@ff] /opt/flink-1.14.5/lib$ lsflink-csv-1.14.5.jar flink-shaded-zookeeper-3.4.14.jar log4j-api-2.17.1.jarflink-dist_2.12-1.14.5.jar flink-table_2.12-1.14.5.jar log4j-core-2.17.1.jarflink-json-1.14.5.jar log4j-1.2-api-2.17.1.jar log4j-slf4j-impl-2.17.1.jar
这是需要将 shade jar 下载下来,然后放在 Flink 的 lib 目录下,这里 以hadoop2 为例;下载 flink-shaded-hadoop-2-uber:https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-9.0/flink-shaded-hadoop-2-uber-2.7.5-9.0.jar
另外,可以将 shade jar 以依赖的方式在 StreamPark 的任务配置中的Dependency 进行依赖配置,如下配置:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop-2-uber</artifactId><version>2.7.5-9.0</version><scope>provided</scope></dependency>
1.2、添加 core-site.xml 和 hdfs-site.xml
有了 shaded jar 还需要相应的配置文件去找到 Hadoop 地址,这里主要涉及到两个配置文件:core-site.xml和hdfs-site.xml,通过 flink 的源码分析(涉及到的类主要是:org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters),该两文件有固定的加载顺序,如下:
// 寻找 hadoop 配置文件的流程// 1. 先去寻在是否添加了参数:kubernetes.hadoop.conf.config-map.name@Overridepublic Optional<String> getExistingHadoopConfigurationConfigMap() {final String existingHadoopConfigMap =flinkConfig.getString(KubernetesConfigOptions.HADOOP_CONF_CONFIG_MAP);if (StringUtils.isBlank(existingHadoopConfigMap)) {return Optional.empty();} else {return Optional.of(existingHadoopConfigMap.trim());}}@Overridepublic Optional<String> getLocalHadoopConfigurationDirectory() {// 2. 如果没有1中指定的参数,查找提交 native 命令的本地环境是否有环境变量:HADOOP_CONF_DIRfinal String hadoopConfDirEnv = System.getenv(Constants.ENV_HADOOP_CONF_DIR);if (StringUtils.isNotBlank(hadoopConfDirEnv)) {return Optional.of(hadoopConfDirEnv);}// 3. 如果没有2中环境变量,再继续看是否有环境变量:HADOOP_HOMEfinal String hadoopHomeEnv = System.getenv(Constants.ENV_HADOOP_HOME);if (StringUtils.isNotBlank(hadoopHomeEnv)) {// Hadoop 2.xfinal File hadoop2ConfDir = new File(hadoopHomeEnv, "/etc/hadoop");if (hadoop2ConfDir.exists()) {return Optional.of(hadoop2ConfDir.getAbsolutePath());}// Hadoop 1.xfinal File hadoop1ConfDir = new File(hadoopHomeEnv, "/conf");if (hadoop1ConfDir.exists()) {return Optional.of(hadoop1ConfDir.getAbsolutePath());}}return Optional.empty();}final List<File> hadoopConfigurationFileItems = getHadoopConfigurationFileItems(localHadoopConfigurationDirectory.get());// 如果没有找到1、2、3说明没有 hadoop 环境if (hadoopConfigurationFileItems.isEmpty()) {LOG.warn("Found 0 files in directory {}, skip to mount the Hadoop Configuration ConfigMap.", localHadoopConfigurationDirectory.get());return flinkPod;}//如果2或者3存在,会在路径下查找 core-site.xml 和 hdfs-site.xml 文件private List<File> getHadoopConfigurationFileItems(String localHadoopConfigurationDirectory) {final List<String> expectedFileNames = new ArrayList<>();expectedFileNames.add("core-site.xml");expectedFileNames.add("hdfs-site.xml");final File directory = new File(localHadoopConfigurationDirectory);if (directory.exists() && directory.isDirectory()) {return Arrays.stream(directory.listFiles()).filter(file ->file.isFile()&& expectedFileNames.stream().anyMatch(name -> file.getName().equals(name))).collect(Collectors.toList());} else {return Collections.emptyList();}}// 如果找到上述文件,说明有 Hadoop 的环境,将会把上述两个文件解析为 key-value 对,然后构建成一个 ConfigMap,名字命名规则如下:public static String getHadoopConfConfigMapName(String clusterId) {return Constants.HADOOP_CONF_CONFIG_MAP_PREFIX + clusterId;}
2、Apache Hive
将数据 sink 到 Hive,或者以 Hive 的 Metastore 作为 Flink 的元数据,都需要打通 flink 到 hive 的路径,同样需要经过一下两个步骤:
i、添加 hive 相关的 jar
如上所述,默认 flink 镜像是不包括 hive 相关的 jar,需要将 hive 相关的如下三个 jar 放在 flink 的 lib 目录下,这里以 hive 2.3.6 版本为例:
1. hive-exec:https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.6/hive-exec-2.3.6.jar
2. flink-connector-hive:https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.12/1.14.5/flink-connector-hive_2.12-1.14.5.jar
3. flink-sql-connector-hive:https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.6_2.12/1.14.5/flink-sql-connector-hive-2.3.6_2.12-1.14.5.jar
同样,也可以将上述 hive 相关 jar 以依赖的方式在 StreamPark 的任务配置中的Dependency 进行依赖配置,这里不再赘述。
2.1. 添加 hive 的配置文件(hive-site.xml)
和 hdfs 所不同的是,flink 源码中并没有 hive 的配置文件的默认的加载方式,因此需要开发者手动添加 hive 的配置文件,这里主要采用三种方式:
1. 将 hive-site.xml 打在 flink 的自定义镜像之中,一般建议放在镜像里的/opt/flink/目录之下
2. 将 hive-site.xml 放在远端的存储系统之后,例如 HDFS,在使用的时候进行加载
3. 将 hive-site.xml 以 ConfigMap 的形式挂载在 k8s 之中,建议使用此种方式,如下:
# 1. 在指定的 ns 中挂载指定位置的 hive-site.xmlkubectl create cm hive-conf --from-file=hive-site.xml -n flink-test# 2. 查看挂载到 k8s 中的 hive-site.xmlkubectl describe cm hive-conf -n flink-test# 3. 将此 cm 挂载到容器内指定的目录spec:containers:- name: flink-main-containervolumeMounts:- mountPath: /opt/flink/hivename: hive-confvolumes:- name: hive-confconfigMap:name: hive-confitems:- key: hive-site.xmlpath: hive-site.xml
总结
通过以上的方式便可以将 Flink 和 Hadoop、Hive 打通,此方法可推广至一般,即 flink 与外部系统如redis、mongo 等连通,一般需要如下两个步骤:
1. 加载指定外部服务的 connector jar
2. 如果有,加载指定的配置文件到 flink 系统之中
