如何使用

在上个章节已经详细介绍了一站式平台 streampark-console 的安装, 本章节看看如果用 streampark-console 快速部署运行一个作业, streampark-console 对标准的 Flink 程序 ( 按照 Flink 官方要求的结构和规范 ) 和用 streampark 开发的项目都做了很好的支持,下面我们使用 streampark-quickstart 来快速开启 streampark-console 之旅

streampark-quickstart 是 StreamPark 开发 Flink 的上手示例程序,具体请查阅:

部署 DataStream 任务

下面的示例演示了如何部署一个 DataStream 应用

部署 FlinkSql 任务

下面的示例演示了如何部署一个 FlinkSql 应用

  • 项目演示使用到的 flink sql 如下
  1. CREATE TABLE user_log (
  2. user_id VARCHAR,
  3. item_id VARCHAR,
  4. category_id VARCHAR,
  5. behavior VARCHAR,
  6. ts TIMESTAMP(3)
  7. ) WITH (
  8. 'connector.type' = 'kafka', -- 使用 kafka connector
  9. 'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本
  10. 'connector.topic' = 'user_behavior', -- kafka topic
  11. 'connector.properties.bootstrap.servers'='kafka-1:9092,kafka-2:9092,kafka-3:9092',
  12. 'connector.startup-mode' = 'earliest-offset', -- 从起始 offset 开始读取
  13. 'update-mode' = 'append',
  14. 'format.type' = 'json', -- 数据源格式为 json
  15. 'format.derive-schema' = 'true' -- DDL schema 确定 json 解析规则
  16. );
  17. CREATE TABLE pvuv_sink (
  18. dt VARCHAR,
  19. pv BIGINT,
  20. uv BIGINT
  21. ) WITH (
  22. 'connector.type' = 'jdbc', -- 使用 jdbc connector
  23. 'connector.url' = 'jdbc:mysql://test-mysql:3306/test', -- jdbc url
  24. 'connector.table' = 'pvuv_sink', -- 表名
  25. 'connector.username' = 'root', -- 用户名
  26. 'connector.password' = '123456', -- 密码
  27. 'connector.write.flush.max-rows' = '1' -- 默认 5000 条,为了演示改为 1
  28. );
  29. INSERT INTO pvuv_sink
  30. SELECT
  31. DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,
  32. COUNT(*) AS pv,
  33. COUNT(DISTINCT user_id) AS uv
  34. FROM user_log
  35. GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00');
  • 使用到 maven 依赖如下
  1. <dependency>
  2. <groupId>mysql</groupId>
  3. <artifactId>mysql-connector-java</artifactId>
  4. <version>5.1.48</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.flink</groupId>
  8. <artifactId>flink-sql-connector-kafka_2.11</artifactId>
  9. <version>1.12.0</version>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.apache.flink</groupId>
  13. <artifactId>flink-connector-jdbc_2.11</artifactId>
  14. <version>1.12.0</version>
  15. </dependency>
  16. <dependency>
  17. <groupId>org.apache.flink</groupId>
  18. <artifactId>flink-json</artifactId>
  19. <version>1.12.0</version>
  20. </dependency>
  • Kafka 模拟发送的数据如下
  1. {"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts":"2021-02-01T01:00:00Z"}
  2. {"user_id": "662867", "item_id":"2244074","category_id":"1575622","behavior": "pv", "ts":"2021-02-01T01:00:00Z"}
  3. {"user_id": "662867", "item_id":"2244074","category_id":"1575622","behavior": "pv", "ts":"2021-02-01T01:00:00Z"}
  4. {"user_id": "662867", "item_id":"2244074","category_id":"1575622","behavior": "learning flink", "ts":"2021-02-01T01:00:00Z"}

任务启动流程

任务启动流程图如下

快速开始 - 图1

streampark-console 提交任务流程

关于项目的概念,Development Mode,savepoint,NoteBook,自定义 jar 管理,任务发布,任务恢复,参数配置,参数对比,多版本管理等等更多使用教程和文档后续持续更新。..