This section provides a quick guide to using SeaTunnel with Kubernetes.

Prerequisites

We assume that you have local installations of the following:

- docker
- kubernetes
- helm

so that the kubectl and helm commands are available on your local system.

For Kubernetes, minikube is our choice; at the time of writing we are using version v1.23.3. You can start a cluster with the following command:

```bash
minikube start --kubernetes-version=v1.23.3
```
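Before continuing, you can optionally confirm that the cluster is reachable (a quick sanity check, not part of the original steps):

```bash
kubectl cluster-info
kubectl get nodes
```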

Installation

SeaTunnel docker image

Flink

To run SeaTunnel with the Flink engine, first create a Dockerfile:

```Dockerfile
FROM flink:1.13

ENV SEATUNNEL_VERSION="2.3.5"
ENV SEATUNNEL_HOME="/opt/seatunnel"

RUN wget https://dlcdn.apache.org/seatunnel/${SEATUNNEL_VERSION}/apache-seatunnel-${SEATUNNEL_VERSION}-bin.tar.gz
RUN tar -xzvf apache-seatunnel-${SEATUNNEL_VERSION}-bin.tar.gz
RUN mv apache-seatunnel-${SEATUNNEL_VERSION} ${SEATUNNEL_HOME}
RUN cd ${SEATUNNEL_HOME} && sh bin/install-plugin.sh ${SEATUNNEL_VERSION}
```

Then run the following command to build the image:

```bash
docker build -t seatunnel:2.3.5-flink-1.13 -f Dockerfile .
```

The image seatunnel:2.3.5-flink-1.13 needs to be present on the host (minikube) so that the deployment can take place.

Load the image into minikube via:

```bash
minikube image load seatunnel:2.3.5-flink-1.13
```
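On recent minikube versions you can confirm the image is available inside the cluster (an optional check, assuming minikube image ls exists in your minikube version):

```bash
minikube image ls | grep seatunnel
```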

Zeta (local-mode)

```Dockerfile
FROM openjdk:8

ENV SEATUNNEL_VERSION="2.3.5"
ENV SEATUNNEL_HOME="/opt/seatunnel"

RUN wget https://dlcdn.apache.org/seatunnel/${SEATUNNEL_VERSION}/apache-seatunnel-${SEATUNNEL_VERSION}-bin.tar.gz
RUN tar -xzvf apache-seatunnel-${SEATUNNEL_VERSION}-bin.tar.gz
RUN mv apache-seatunnel-${SEATUNNEL_VERSION} ${SEATUNNEL_HOME}
RUN cd ${SEATUNNEL_HOME} && sh bin/install-plugin.sh ${SEATUNNEL_VERSION}
```

Then run the following command to build the image:

```bash
docker build -t seatunnel:2.3.5 -f Dockerfile .
```

The image seatunnel:2.3.5 needs to be present on the host (minikube) so that the deployment can take place.

Load the image into minikube via:

```bash
minikube image load seatunnel:2.3.5
```

Zeta (cluster-mode)

```Dockerfile
FROM openjdk:8

ENV SEATUNNEL_VERSION="2.3.5"
ENV SEATUNNEL_HOME="/opt/seatunnel"

RUN wget https://dlcdn.apache.org/seatunnel/${SEATUNNEL_VERSION}/apache-seatunnel-${SEATUNNEL_VERSION}-bin.tar.gz
RUN tar -xzvf apache-seatunnel-${SEATUNNEL_VERSION}-bin.tar.gz
RUN mv apache-seatunnel-${SEATUNNEL_VERSION} ${SEATUNNEL_HOME}
RUN mkdir -p $SEATUNNEL_HOME/logs
RUN cd ${SEATUNNEL_HOME} && sh bin/install-plugin.sh ${SEATUNNEL_VERSION}
```

Then run the following command to build the image:

```bash
docker build -t seatunnel:2.3.5 -f Dockerfile .
```

The image seatunnel:2.3.5 needs to be present on the host (minikube) so that the deployment can take place.

Load the image into minikube via:

```bash
minikube image load seatunnel:2.3.5
```

Deploying the operator

The steps below provide a quick walk-through on setting up the Flink Kubernetes Operator. You can refer to Flink Kubernetes Operator - Quick Start for more details.

Notice: All the Kubernetes resources below are created in the default namespace.

Install the certificate manager on your Kubernetes cluster to enable adding the webhook component (only needed once per Kubernetes cluster):

```bash
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml
```
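It can take a short while for the cert-manager webhook to become ready; you can optionally wait for it before installing the operator (an extra step, assuming cert-manager was installed into its default cert-manager namespace):

```bash
kubectl -n cert-manager wait --for=condition=Ready pod --all --timeout=180s
```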

Now you can deploy the latest stable Flink Kubernetes Operator version using the included Helm chart:

```bash
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.3.1/
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator \
    --set image.repository=apache/flink-kubernetes-operator
```

You may verify your installation via kubectl:

```bash
kubectl get pods
NAME                                         READY   STATUS    RESTARTS      AGE
flink-kubernetes-operator-5f466b8549-mgchb   1/1     Running   3 (23h ago)   16d
```
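You can also check that the operator registered its custom resource definition (an optional check; flinkdeployments.flink.apache.org is the CRD name the operator is expected to install):

```bash
kubectl get crd flinkdeployments.flink.apache.org
```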

Zeta (local-mode)

No operator deployment is required for the Zeta engine in local mode.

Zeta (cluster-mode)

No operator deployment is required for the Zeta engine in cluster mode.

Run SeaTunnel Application

Run Application: SeaTunnel already provides out-of-the-box configurations.

Flink

In this guide we are going to use seatunnel.streaming.conf:

```hocon
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 2000
}

source {
  FakeSource {
    result_table_name = "fake"
    row.num = 160000
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}

transform {
  FieldMapper {
    source_table_name = "fake"
    result_table_name = "fake1"
    field_mapper = {
      age = age
      name = new_name
    }
  }
}

sink {
  Console {
    source_table_name = "fake1"
  }
}
```

Generate a ConfigMap named seatunnel-config in Kubernetes from seatunnel.streaming.conf so that we can mount the config content into the pod:

```bash
kubectl create cm seatunnel-config \
    --from-file=seatunnel.streaming.conf=seatunnel.streaming.conf
```
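If you want to double-check what was stored, you can print the ConfigMap back out (optional):

```bash
kubectl get cm seatunnel-config -o yaml
```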

Once the Flink Kubernetes Operator is running, as seen in the previous steps, you are ready to submit a Flink (SeaTunnel) job:

- Create a seatunnel-flink.yaml FlinkDeployment manifest:

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: seatunnel-flink-streaming-example
spec:
  image: seatunnel:2.3.5-flink-1.13
  flinkVersion: v1_13
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 1
  taskManager:
    resource:
      memory: "1024m"
      cpu: 1
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          volumeMounts:
            - name: seatunnel-config
              mountPath: /data/seatunnel.streaming.conf
              subPath: seatunnel.streaming.conf
      volumes:
        - name: seatunnel-config
          configMap:
            name: seatunnel-config
            items:
              - key: seatunnel.streaming.conf
                path: seatunnel.streaming.conf
  job:
    jarURI: local:///opt/seatunnel/starter/seatunnel-flink-13-starter.jar
    entryClass: org.apache.seatunnel.core.starter.flink.SeaTunnelFlink
    args: ["--config", "/data/seatunnel.streaming.conf"]
    parallelism: 2
    upgradeMode: stateless
```
- Run the example application:

```bash
kubectl apply -f seatunnel-flink.yaml
```
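After applying the manifest, you can optionally watch the operator bring the deployment up (the resource and label names follow the manifest above):

```bash
kubectl get flinkdeployment seatunnel-flink-streaming-example
kubectl get pods -l app=seatunnel-flink-streaming-example
```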

Zeta (local-mode)

In this guide we are going to use seatunnel.streaming.conf:

```hocon
env {
  parallelism = 2
  job.mode = "STREAMING"
  checkpoint.interval = 2000
}

source {
  FakeSource {
    parallelism = 2
    result_table_name = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}

sink {
  Console {
  }
}
```

Generate a ConfigMap named seatunnel-config in Kubernetes from seatunnel.streaming.conf so that we can mount the config content into the pod:

```bash
kubectl create cm seatunnel-config \
    --from-file=seatunnel.streaming.conf=seatunnel.streaming.conf
```
- Create seatunnel.yaml:

```yaml
apiVersion: v1
kind: Pod
metadata:
  name: seatunnel
spec:
  containers:
  - name: seatunnel
    image: seatunnel:2.3.5
    command: ["/bin/sh","-c","/opt/seatunnel/bin/seatunnel.sh --config /data/seatunnel.streaming.conf -e local"]
    resources:
      limits:
        cpu: "1"
        memory: 4G
      requests:
        cpu: "1"
        memory: 2G
    volumeMounts:
      - name: seatunnel-config
        mountPath: /data/seatunnel.streaming.conf
        subPath: seatunnel.streaming.conf
  volumes:
  - name: seatunnel-config
    configMap:
      name: seatunnel-config
      items:
      - key: seatunnel.streaming.conf
        path: seatunnel.streaming.conf
```
- Run the example application:

```bash
kubectl apply -f seatunnel.yaml
```
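You can optionally wait for the pod to become ready before tailing its logs (an extra convenience step):

```bash
kubectl wait --for=condition=Ready pod/seatunnel --timeout=120s
```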

Zeta (cluster-mode)

In this guide we are going to use seatunnel.streaming.conf:

```hocon
env {
  parallelism = 2
  job.mode = "STREAMING"
  checkpoint.interval = 2000
}

source {
  FakeSource {
    parallelism = 2
    result_table_name = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}

sink {
  Console {
  }
}
```

Generate a ConfigMap named seatunnel-config in Kubernetes from seatunnel.streaming.conf so that we can mount the config content into the pods:

```bash
kubectl create cm seatunnel-config \
    --from-file=seatunnel.streaming.conf=seatunnel.streaming.conf
```

Then we load the configuration files used by the SeaTunnel cluster into ConfigMaps.

Create the following YAML files locally:

- Create hazelcast-client.yaml:

```yaml
hazelcast-client:
  cluster-name: seatunnel
  properties:
    hazelcast.logging.type: log4j2
  network:
    cluster-members:
      - localhost:5801
```
- Create hazelcast.yaml:

```yaml
hazelcast:
  cluster-name: seatunnel
  network:
    rest-api:
      enabled: true
      endpoint-groups:
        CLUSTER_WRITE:
          enabled: true
        DATA:
          enabled: true
    join:
      tcp-ip:
        enabled: true
        member-list:
          - localhost
    port:
      auto-increment: false
      port: 5801
  properties:
    hazelcast.invocation.max.retry.count: 20
    hazelcast.tcp.join.port.try.count: 30
    hazelcast.logging.type: log4j2
    hazelcast.operation.generic.thread.count: 50
```
- Create seatunnel.yaml:

```yaml
seatunnel:
  engine:
    history-job-expire-minutes: 1440
    backup-count: 1
    queue-type: blockingqueue
    print-execution-info-interval: 60
    print-job-metrics-info-interval: 60
    slot-service:
      dynamic-slot: true
    checkpoint:
      interval: 10000
      timeout: 60000
      storage:
        type: hdfs
        max-retained: 3
        plugin-config:
          namespace: /tmp/seatunnel/checkpoint_snapshot
          storage.type: hdfs
          fs.defaultFS: file:///tmp/ # Ensure that the directory has write permission
```

Create ConfigMaps for the configuration files using the following commands:

```bash
kubectl create configmap hazelcast-client --from-file=hazelcast-client.yaml
kubectl create configmap hazelcast --from-file=hazelcast.yaml
kubectl create configmap seatunnelmap --from-file=seatunnel.yaml
```
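A quick optional check that all three ConfigMaps exist:

```bash
kubectl get cm hazelcast hazelcast-client seatunnelmap
```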

Deploy Reloader to enable hot reloading. We use Reloader here to automatically restart the pods when a configuration file is modified. Alternatively, you can set the configuration values directly and skip Reloader.

```bash
wget https://raw.githubusercontent.com/stakater/Reloader/master/deployments/kubernetes/reloader.yaml
kubectl apply -f reloader.yaml
```
- Create seatunnel-cluster.yml:

```yaml
apiVersion: v1
kind: Service
metadata:
  name: seatunnel
spec:
  selector:
    app: seatunnel
  ports:
  - port: 5801
    name: seatunnel
  clusterIP: None
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: seatunnel
  annotations:
    configmap.reloader.stakater.com/reload: "hazelcast,hazelcast-client,seatunnelmap"
spec:
  serviceName: "seatunnel"
  replicas: 3  # modify replicas according to your case
  selector:
    matchLabels:
      app: seatunnel
  template:
    metadata:
      labels:
        app: seatunnel
    spec:
      containers:
      - name: seatunnel
        image: seatunnel:2.3.5
        imagePullPolicy: IfNotPresent
        ports:
        - containerPort: 5801
          name: client
        command: ["/bin/sh","-c","/opt/seatunnel/bin/seatunnel-cluster.sh -DJvmOption=-Xms2G -Xmx2G"]
        resources:
          limits:
            cpu: "1"
            memory: 4G
          requests:
            cpu: "1"
            memory: 2G
        volumeMounts:
          - mountPath: "/opt/seatunnel/config/hazelcast.yaml"
            name: hazelcast
            subPath: hazelcast.yaml
          - mountPath: "/opt/seatunnel/config/hazelcast-client.yaml"
            name: hazelcast-client
            subPath: hazelcast-client.yaml
          - mountPath: "/opt/seatunnel/config/seatunnel.yaml"
            name: seatunnelmap
            subPath: seatunnel.yaml
          - mountPath: /data/seatunnel.streaming.conf
            name: seatunnel-config
            subPath: seatunnel.streaming.conf
      volumes:
        - name: hazelcast
          configMap:
            name: hazelcast
        - name: hazelcast-client
          configMap:
            name: hazelcast-client
        - name: seatunnelmap
          configMap:
            name: seatunnelmap
        - name: seatunnel-config
          configMap:
            name: seatunnel-config
            items:
              - key: seatunnel.streaming.conf
                path: seatunnel.streaming.conf
```

- Start the cluster:

```bash
kubectl apply -f seatunnel-cluster.yml
```
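Once applied, you can optionally watch the StatefulSet come up (the labels follow the manifest above):

```bash
kubectl rollout status statefulset/seatunnel
kubectl get pods -l app=seatunnel
```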

Then modify the SeaTunnel configuration in the pods using the following command:

```bash
kubectl edit cm hazelcast
```

Change the member-list option to your cluster addresses.

This uses the headless Service access mode; the general format for pod-to-pod access is [pod-name].[service-name].[namespace].svc.cluster.local.

For example:

```yaml
- seatunnel-0.seatunnel.default.svc.cluster.local
- seatunnel-1.seatunnel.default.svc.cluster.local
- seatunnel-2.seatunnel.default.svc.cluster.local
```
```bash
kubectl edit cm hazelcast-client
```

Change the cluster-members option to your cluster addresses, for example:

```yaml
- seatunnel-0.seatunnel.default.svc.cluster.local:5801
- seatunnel-1.seatunnel.default.svc.cluster.local:5801
- seatunnel-2.seatunnel.default.svc.cluster.local:5801
```

Later, you will see that the pods automatically restart and pick up the updated SeaTunnel configuration.


After all pod updates have completed, we can use the following command to check whether the configuration inside a pod has been updated:

```bash
kubectl exec -it seatunnel-0 -- cat /opt/seatunnel/config/hazelcast-client.yaml
```

Afterwards, we can submit jobs from any pod:

```bash
kubectl exec -it seatunnel-0 -- /opt/seatunnel/bin/seatunnel.sh --config /data/seatunnel.streaming.conf
```

See The Output

Flink

You may follow the logs of your job. After a successful startup (which can take on the order of a minute in a fresh environment, and seconds afterwards) you can run:

```bash
kubectl logs -f deploy/seatunnel-flink-streaming-example
```

The output looks like the below:

```
...
2023-01-31 12:13:54,349 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: SeaTunnel FakeSource -> Sink Writer: Console (1/1) (1665d2d011b2f6cf6525c0e5e75ec251) switched from SCHEDULED to DEPLOYING.
2023-01-31 12:13:56,684 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: SeaTunnel FakeSource -> Sink Writer: Console (1/1) (attempt #0) with attempt id 1665d2d011b2f6cf6525c0e5e75ec251 to seatunnel-flink-streaming-example-taskmanager-1-1 @ 100.103.244.106 (dataPort=39137) with allocation id fbe162650c4126649afcdaff00e46875
2023-01-31 12:13:57,794 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: SeaTunnel FakeSource -> Sink Writer: Console (1/1) (1665d2d011b2f6cf6525c0e5e75ec251) switched from DEPLOYING to INITIALIZING.
2023-01-31 12:13:58,203 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: SeaTunnel FakeSource -> Sink Writer: Console (1/1) (1665d2d011b2f6cf6525c0e5e75ec251) switched from INITIALIZING to RUNNING.
```

If an OOM error occurs in the log, you can decrease the row.num value in seatunnel.streaming.conf.

To expose the Flink Dashboard you may add a port-forward rule:

```bash
kubectl port-forward svc/seatunnel-flink-streaming-example-rest 8081
```

Now the Flink Dashboard is accessible at localhost:8081.

Or launch minikube dashboard for a web-based Kubernetes user interface.
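If you prefer the command line, the same port-forward also exposes Flink's REST API, so a quick check of running jobs might look like this (an optional step using the standard Flink REST endpoint):

```bash
curl http://localhost:8081/jobs/overview
```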

The content printed in the TaskManager Stdout log:

```bash
kubectl logs \
  -l 'app in (seatunnel-flink-streaming-example), component in (taskmanager)' \
  --tail=-1 \
  -f
```

The output looks like the below (your content may differ since we use FakeSource to automatically generate random stream data):

```
...
subtaskIndex=0: row=159991 : VVgpp, 978840000
subtaskIndex=0: row=159992 : JxrOC, 1493825495
subtaskIndex=0: row=159993 : YmCZR, 654146216
subtaskIndex=0: row=159994 : LdmUn, 643140261
subtaskIndex=0: row=159995 : tURkE, 837012821
subtaskIndex=0: row=159996 : uPDfd, 2021489045
subtaskIndex=0: row=159997 : mjrdG, 2074957853
subtaskIndex=0: row=159998 : xbeUi, 864518418
subtaskIndex=0: row=159999 : sSWLb, 1924451911
subtaskIndex=0: row=160000 : AuPlM, 1255017876
```

To stop your job and delete your FlinkDeployment you can simply:

```bash
kubectl delete -f seatunnel-flink.yaml
```

Zeta (local-mode)

You may follow the logs of your job. After a successful startup (which can take on the order of a minute in a fresh environment, and seconds afterwards) you can run:

```bash
kubectl logs -f seatunnel
```

The output looks like the below (your content may differ since we use FakeSource to automatically generate random stream data):

```
...
2023-10-07 08:20:12,797 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25673: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : hRJdE, 1295862507
2023-10-07 08:20:12,797 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25674: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : kXlew, 935460726
2023-10-07 08:20:12,797 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25675: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : FrNOT, 1714358118
2023-10-07 08:20:12,797 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25676: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : kSajX, 126709414
2023-10-07 08:20:12,797 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25677: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : YhpQv, 2020198351
2023-10-07 08:20:12,797 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25678: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : nApin, 691339553
2023-10-07 08:20:12,797 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25679: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : KZNNa, 1720773736
2023-10-07 08:20:12,797 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25680: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : uCUBI, 490868386
2023-10-07 08:20:12,797 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25681: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : oTLmO, 98770781
2023-10-07 08:20:12,797 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25682: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : UECud, 835494636
2023-10-07 08:20:12,797 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25683: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : XNegY, 1602828896
2023-10-07 08:20:12,797 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25684: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : LcFBx, 1400869177
2023-10-07 08:20:12,797 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25685: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : EqSfF, 1933614060
2023-10-07 08:20:12,797 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25686: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : BODIs, 1839533801
2023-10-07 08:20:12,797 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25687: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : doxcI, 970104616
2023-10-07 08:20:12,797 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25688: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : IEVYn, 371893767
2023-10-07 08:20:12,797 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25689: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : YXYfq, 1719257882
2023-10-07 08:20:12,797 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25690: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : LFWEm, 725033360
2023-10-07 08:20:12,797 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25691: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : ypUrY, 1591744616
2023-10-07 08:20:12,797 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25692: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : rlnzJ, 412162913
2023-10-07 08:20:12,797 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25693: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : zWKnt, 976816261
2023-10-07 08:20:12,797 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25694: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : PXrsk, 43554541
```

To stop your job and delete the pod you can simply:

```bash
kubectl delete -f seatunnel.yaml
```

Zeta (cluster-mode)

You may follow the logs of your job. After a successful startup (which can take on the order of a minute in a fresh environment, and seconds afterwards) you can run:

```bash
kubectl exec -it seatunnel-1 -- tail -f /opt/seatunnel/logs/seatunnel-engine-server.log | grep ConsoleSinkWriter
```

The output looks like the below (your content may differ since we use FakeSource to automatically generate random stream data):

```
...
2023-10-10 08:05:07,283 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=1 rowIndex=7: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : IibHk, 820962465
2023-10-10 08:05:07,283 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=1 rowIndex=8: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : lmKdb, 1072498088
2023-10-10 08:05:07,283 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=1 rowIndex=9: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : iqGva, 918730371
2023-10-10 08:05:07,284 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=1 rowIndex=10: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : JMHmq, 1130771733
2023-10-10 08:05:07,284 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=1 rowIndex=11: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : rxoHF, 189596686
2023-10-10 08:05:07,284 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=1 rowIndex=12: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : OSblw, 559472064
2023-10-10 08:05:07,284 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=1 rowIndex=13: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : yTZjG, 1842482272
2023-10-10 08:05:07,284 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=1 rowIndex=14: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : RRiMg, 1713777214
2023-10-10 08:05:07,284 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=1 rowIndex=15: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : lRcsd, 1626041649
2023-10-10 08:05:07,284 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=1 rowIndex=16: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : QrNNW, 41355294
```

To stop your job and delete the cluster you can simply:

```bash
kubectl delete -f seatunnel-cluster.yml
```

Happy SeaTunneling!

What’s More

For now, you have taken a quick look at SeaTunnel. See connector to find all the sources and sinks SeaTunnel supports, or see deployment if you want to submit your application to another kind of engine cluster.