Using Hadoop resources under StreamPark's Flink on Kubernetes runtime, such as writing checkpoints to HDFS or reading from and writing to Hive, generally follows the process below:

1. Apache HDFS

To store the resources of Flink on Kubernetes jobs in HDFS, you need to go through the following two steps:

1.1 Add the shaded jar

By default, the Flink image pulled from Docker Hub does not include Hadoop-related jars. Taking flink:1.14.5-scala_2.12-java8 as an example, its lib directory looks like this:

  [flink@ff] /opt/flink-1.14.5/lib
  $ ls
  flink-csv-1.14.5.jar         flink-shaded-zookeeper-3.4.14.jar   log4j-api-2.17.1.jar
  flink-dist_2.12-1.14.5.jar   flink-table_2.12-1.14.5.jar         log4j-core-2.17.1.jar
  flink-json-1.14.5.jar        log4j-1.2-api-2.17.1.jar            log4j-slf4j-impl-2.17.1.jar

That is, download the shaded Hadoop jar and put it into Flink's lib directory. Taking Hadoop 2 as an example, download flink-shaded-hadoop-2-uber from https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-9.0/flink-shaded-hadoop-2-uber-2.7.5-9.0.jar
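For example, when preparing a custom image, the jar can be downloaded straight into the lib directory shown above; a minimal sketch, assuming the same Flink installation path and shaded-jar version as in this example:

  # A minimal sketch: download the shaded Hadoop jar into Flink's lib directory
  # (path and version follow the flink:1.14.5 / hadoop2 example above)
  cd /opt/flink-1.14.5/lib
  wget https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-9.0/flink-shaded-hadoop-2-uber-2.7.5-9.0.jar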

Alternatively, the shaded jar can be added as a dependency in the Dependency section of the StreamPark task configuration, with the following configuration:

  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-shaded-hadoop-2-uber</artifactId>
      <version>2.7.5-9.0</version>
      <scope>provided</scope>
  </dependency>
1.2 Add core-site.xml and hdfs-site.xml

With the shaded jar in place, the corresponding configuration files are still needed so that Flink can locate the Hadoop cluster. Two files are involved: core-site.xml and hdfs-site.xml. Analysis of the Flink source code (mainly the class org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters) shows that the two files are loaded in a fixed order, as follows:

  // The process of finding Hadoop configuration files:

  // 1. Check whether the parameter ${kubernetes.hadoop.conf.config-map.name} has been set
  @Override
  public Optional<String> getExistingHadoopConfigurationConfigMap() {
      final String existingHadoopConfigMap =
              flinkConfig.getString(KubernetesConfigOptions.HADOOP_CONF_CONFIG_MAP);
      if (StringUtils.isBlank(existingHadoopConfigMap)) {
          return Optional.empty();
      } else {
          return Optional.of(existingHadoopConfigMap.trim());
      }
  }

  @Override
  public Optional<String> getLocalHadoopConfigurationDirectory() {
      // 2. If no parameter is specified in "1", check whether the local environment where the
      //    native command is submitted has the environment variable ${HADOOP_CONF_DIR}
      final String hadoopConfDirEnv = System.getenv(Constants.ENV_HADOOP_CONF_DIR);
      if (StringUtils.isNotBlank(hadoopConfDirEnv)) {
          return Optional.of(hadoopConfDirEnv);
      }
      // 3. If the environment variable in "2" is absent, fall back to the environment variable ${HADOOP_HOME}
      final String hadoopHomeEnv = System.getenv(Constants.ENV_HADOOP_HOME);
      if (StringUtils.isNotBlank(hadoopHomeEnv)) {
          // Hadoop 2.x
          final File hadoop2ConfDir = new File(hadoopHomeEnv, "/etc/hadoop");
          if (hadoop2ConfDir.exists()) {
              return Optional.of(hadoop2ConfDir.getAbsolutePath());
          }
          // Hadoop 1.x
          final File hadoop1ConfDir = new File(hadoopHomeEnv, "/conf");
          if (hadoop1ConfDir.exists()) {
              return Optional.of(hadoop1ConfDir.getAbsolutePath());
          }
      }
      return Optional.empty();
  }

  final List<File> hadoopConfigurationFileItems =
          getHadoopConfigurationFileItems(localHadoopConfigurationDirectory.get());
  // If none of "1", "2" and "3" is found, there is no Hadoop environment
  if (hadoopConfigurationFileItems.isEmpty()) {
      LOG.warn("Found 0 files in directory {}, skip to mount the Hadoop Configuration ConfigMap.",
              localHadoopConfigurationDirectory.get());
      return flinkPod;
  }

  // If "2" or "3" exists, look for core-site.xml and hdfs-site.xml in the directory
  // pointed to by the environment variable
  private List<File> getHadoopConfigurationFileItems(String localHadoopConfigurationDirectory) {
      final List<String> expectedFileNames = new ArrayList<>();
      expectedFileNames.add("core-site.xml");
      expectedFileNames.add("hdfs-site.xml");
      final File directory = new File(localHadoopConfigurationDirectory);
      if (directory.exists() && directory.isDirectory()) {
          return Arrays.stream(directory.listFiles())
                  .filter(
                          file ->
                                  file.isFile()
                                          && expectedFileNames.stream()
                                                  .anyMatch(name -> file.getName().equals(name)))
                  .collect(Collectors.toList());
      } else {
          return Collections.emptyList();
      }
  }

  // If the above files are found, a Hadoop environment exists. The two files are parsed into
  // key-value pairs and built into a ConfigMap, whose name follows this rule:
  public static String getHadoopConfConfigMapName(String clusterId) {
      return Constants.HADOOP_CONF_CONFIG_MAP_PREFIX + clusterId;
  }
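In practice, the loading order above leaves two simple ways to provide the files. A minimal sketch of both, assuming an illustrative ConfigMap name hadoop-conf and namespace flink-test:

  # Option "1": pre-create a ConfigMap from the two files and reference it through
  # the parameter kubernetes.hadoop.conf.config-map.name (names here are illustrative)
  kubectl create cm hadoop-conf --from-file=core-site.xml --from-file=hdfs-site.xml -n flink-test
  # then add to the Flink configuration:
  #   kubernetes.hadoop.conf.config-map.name: hadoop-conf

  # Option "2": let Flink pick the files up from the environment that submits the job
  export HADOOP_CONF_DIR=/path/to/hadoop/conf   # directory containing core-site.xml and hdfs-site.xml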

2. Apache Hive

To sink data to Apache Hive, or to use the Hive Metastore as Flink's metadata store, Apache Flink needs to be connected to Apache Hive. This also requires the following two steps:

2.1 Add Hive-related jars

As mentioned above, the default Flink image does not include Hive-related jars. The following three Hive-related jars need to be placed in Flink's lib directory. Here, Apache Hive 2.3.6 is used as an example:

  1. hive-exec: https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.6/hive-exec-2.3.6.jar
  2. flink-connector-hive: https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.12/1.14.5/flink-connector-hive_2.12-1.14.5.jar
  3. flink-sql-connector-hive: https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.6_2.12/1.14.5/flink-sql-connector-hive-2.3.6_2.12-1.14.5.jar

Similarly, the above Hive-related jars can also be declared as dependencies in the Dependency section of the StreamPark task configuration, which will not be repeated here.
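If you prefer to bake these jars into the image instead of declaring them as dependencies, a minimal sketch using the URLs listed above:

  # A minimal sketch: download the three Hive-related jars into Flink's lib directory
  cd /opt/flink-1.14.5/lib
  wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.6/hive-exec-2.3.6.jar
  wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.12/1.14.5/flink-connector-hive_2.12-1.14.5.jar
  wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.6_2.12/1.14.5/flink-sql-connector-hive-2.3.6_2.12-1.14.5.jar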

2.2 Add Apache Hive configuration file (hive-site.xml)

Unlike HDFS, the Flink source code provides no default mechanism for loading the Hive configuration file, so developers need to add it manually. There are three main ways to do this:

  1. Put hive-site.xml into a custom Flink image; it is generally recommended to place it under the /opt/flink/ directory in the image
  2. Put hive-site.xml in a remote storage system such as HDFS and load it when it is needed
  3. Mount hive-site.xml into Kubernetes as a ConfigMap. This method is recommended and works as follows:
  # 1. Create a ConfigMap from hive-site.xml in the specified namespace
  kubectl create cm hive-conf --from-file=hive-site.xml -n flink-test
  # 2. View the hive-site.xml mounted into k8s
  kubectl describe cm hive-conf -n flink-test
  # 3. Mount this ConfigMap to the specified directory inside the container
  spec:
    containers:
      - name: flink-main-container
        volumeMounts:
          - mountPath: /opt/flink/hive
            name: hive-conf
    volumes:
      - name: hive-conf
        configMap:
          name: hive-conf
          items:
            - key: hive-site.xml
              path: hive-site.xml
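Once the pod is running, a quick check can confirm that the file is visible inside the container; <jobmanager-pod> is a placeholder for an actual pod name:

  # Verify that hive-site.xml has been mounted into the container
  kubectl exec -n flink-test <jobmanager-pod> -- ls /opt/flink/hive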

Conclusion

Through the method above, Apache Flink can be connected with Apache Hadoop and Hive. The approach generalizes: connecting Flink to other external systems such as Redis or MongoDB usually requires the following two steps:

  1. Load the connector jar for the external service;
  2. If the service has one, load its configuration file into the Flink system.