This section provides a quick guide to using SeaTunnel with Kubernetes.
## Prerequisites

We assume that you have local installations of Docker, Kubernetes, and Helm, so that the `kubectl` and `helm` commands are available on your local system.
For Kubernetes, minikube is our choice; at the time of writing we are using version v1.23.3. You can start a cluster with the following command:

```bash
minikube start --kubernetes-version=v1.23.3
```
## Installation

### SeaTunnel Docker Image
To run the image with SeaTunnel, first create a Dockerfile:
#### Flink

```Dockerfile
FROM flink:1.13

ENV SEATUNNEL_VERSION="2.3.5"
ENV SEATUNNEL_HOME="/opt/seatunnel"

RUN wget https://dlcdn.apache.org/seatunnel/${SEATUNNEL_VERSION}/apache-seatunnel-${SEATUNNEL_VERSION}-bin.tar.gz
RUN tar -xzvf apache-seatunnel-${SEATUNNEL_VERSION}-bin.tar.gz
RUN mv apache-seatunnel-${SEATUNNEL_VERSION} ${SEATUNNEL_HOME}
RUN cd ${SEATUNNEL_HOME} && sh bin/install-plugin.sh ${SEATUNNEL_VERSION}
```
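The Dockerfile derives the download URL and archive name from `SEATUNNEL_VERSION`, so bumping the version only requires changing one variable. A quick shell sketch of that naming convention:

```shell
#!/bin/sh
# Reconstruct the artifact names the Dockerfile derives from SEATUNNEL_VERSION.
SEATUNNEL_VERSION="2.3.5"
TARBALL="apache-seatunnel-${SEATUNNEL_VERSION}-bin.tar.gz"
URL="https://dlcdn.apache.org/seatunnel/${SEATUNNEL_VERSION}/${TARBALL}"
echo "$URL"
# prints https://dlcdn.apache.org/seatunnel/2.3.5/apache-seatunnel-2.3.5-bin.tar.gz
```

Note that `dlcdn.apache.org` only hosts current releases, so older versions may need the Apache archive instead.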
Then run the following command to build the image:

```bash
docker build -t seatunnel:2.3.5-flink-1.13 -f Dockerfile .
```

The image `seatunnel:2.3.5-flink-1.13` needs to be present on the host (minikube) so that the deployment can take place.

Load the image into minikube via:

```bash
minikube image load seatunnel:2.3.5-flink-1.13
```
#### Zeta (local-mode)

```Dockerfile
FROM openjdk:8

ENV SEATUNNEL_VERSION="2.3.5"
ENV SEATUNNEL_HOME="/opt/seatunnel"

RUN wget https://dlcdn.apache.org/seatunnel/${SEATUNNEL_VERSION}/apache-seatunnel-${SEATUNNEL_VERSION}-bin.tar.gz
RUN tar -xzvf apache-seatunnel-${SEATUNNEL_VERSION}-bin.tar.gz
RUN mv apache-seatunnel-${SEATUNNEL_VERSION} ${SEATUNNEL_HOME}
RUN cd ${SEATUNNEL_HOME} && sh bin/install-plugin.sh ${SEATUNNEL_VERSION}
```
Then run the following command to build the image:

```bash
docker build -t seatunnel:2.3.5 -f Dockerfile .
```

The image `seatunnel:2.3.5` needs to be present on the host (minikube) so that the deployment can take place.

Load the image into minikube via:

```bash
minikube image load seatunnel:2.3.5
```
#### Zeta (cluster-mode)

```Dockerfile
FROM openjdk:8

ENV SEATUNNEL_VERSION="2.3.5"
ENV SEATUNNEL_HOME="/opt/seatunnel"

RUN wget https://dlcdn.apache.org/seatunnel/${SEATUNNEL_VERSION}/apache-seatunnel-${SEATUNNEL_VERSION}-bin.tar.gz
RUN tar -xzvf apache-seatunnel-${SEATUNNEL_VERSION}-bin.tar.gz
RUN mv apache-seatunnel-${SEATUNNEL_VERSION} ${SEATUNNEL_HOME}
RUN mkdir -p $SEATUNNEL_HOME/logs
RUN cd ${SEATUNNEL_HOME} && sh bin/install-plugin.sh ${SEATUNNEL_VERSION}
```
Then run the following command to build the image:

```bash
docker build -t seatunnel:2.3.5 -f Dockerfile .
```

The image `seatunnel:2.3.5` needs to be present on the host (minikube) so that the deployment can take place.

Load the image into minikube via:

```bash
minikube image load seatunnel:2.3.5
```
## Deploying the Operator

### Flink
The steps below provide a quick walk-through on setting up the Flink Kubernetes Operator. You can refer to Flink Kubernetes Operator - Quick Start for more details.
Notice: All the Kubernetes resources below are created in the `default` namespace.
Install the certificate manager on your Kubernetes cluster to enable adding the webhook component (only needed once per Kubernetes cluster):
```bash
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml
```
Now you can deploy the latest stable Flink Kubernetes Operator version using the included Helm chart:
```bash
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.3.1/
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator \
    --set image.repository=apache/flink-kubernetes-operator
```
You may verify your installation via kubectl:
```bash
kubectl get pods
NAME                                         READY   STATUS    RESTARTS      AGE
flink-kubernetes-operator-5f466b8549-mgchb   1/1     Running   3 (23h ago)   16d
```
### Zeta (local-mode)

None

### Zeta (cluster-mode)

None
## Run SeaTunnel Application

SeaTunnel already provides out-of-the-box configurations.
### Flink
In this guide we are going to use `seatunnel.streaming.conf`:

```hocon
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 2000
}

source {
  FakeSource {
    result_table_name = "fake"
    row.num = 160000
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}

transform {
  FieldMapper {
    source_table_name = "fake"
    result_table_name = "fake1"
    field_mapper = {
      age = age
      name = new_name
    }
  }
}

sink {
  Console {
    source_table_name = "fake1"
  }
}
```
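Before creating the ConfigMap in the next step, you can save the configuration to a local file and sanity-check a key field. A minimal sketch (using an abbreviated `env` block only, written to a temp file):

```shell
#!/bin/sh
# Write an abbreviated config to a temp file and sanity-check the job mode.
TMP=$(mktemp)
cat > "$TMP" <<'EOF'
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 2000
}
EOF
grep -o 'job.mode = "[A-Z]*"' "$TMP"   # prints job.mode = "STREAMING"
```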
Generate a ConfigMap named `seatunnel-config` in Kubernetes for the `seatunnel.streaming.conf`, so that we can mount the config content in the pod:

```bash
kubectl create cm seatunnel-config \
    --from-file=seatunnel.streaming.conf=seatunnel.streaming.conf
```
Once the Flink Kubernetes Operator is running as seen in the previous steps you are ready to submit a Flink (SeaTunnel) job:
Create a `seatunnel-flink.yaml` FlinkDeployment manifest:

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: seatunnel-flink-streaming-example
spec:
  image: seatunnel:2.3.5-flink-1.13
  flinkVersion: v1_13
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 1
  taskManager:
    resource:
      memory: "1024m"
      cpu: 1
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          volumeMounts:
            - name: seatunnel-config
              mountPath: /data/seatunnel.streaming.conf
              subPath: seatunnel.streaming.conf
      volumes:
        - name: seatunnel-config
          configMap:
            name: seatunnel-config
            items:
              - key: seatunnel.streaming.conf
                path: seatunnel.streaming.conf
  job:
    jarURI: local:///opt/seatunnel/starter/seatunnel-flink-13-starter.jar
    entryClass: org.apache.seatunnel.core.starter.flink.SeaTunnelFlink
    args: ["--config", "/data/seatunnel.streaming.conf"]
    parallelism: 2
    upgradeMode: stateless
```
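If you want more sink parallelism, a sketch of how you might tune the manifest (hypothetical values; the field names are the ones already used in the FlinkDeployment above) is to raise the TaskManager slot count and the job parallelism together, since each parallel subtask occupies one slot:

```yaml
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "4"
  job:
    parallelism: 4
```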
Run the example application:
```bash
kubectl apply -f seatunnel-flink.yaml
```
### Zeta (local-mode)

In this guide we are going to use `seatunnel.streaming.conf`:

```hocon
env {
  parallelism = 2
  job.mode = "STREAMING"
  checkpoint.interval = 2000
}

source {
  FakeSource {
    parallelism = 2
    result_table_name = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}

sink {
  Console {
  }
}
```
Generate a ConfigMap named `seatunnel-config` in Kubernetes for the `seatunnel.streaming.conf`, so that we can mount the config content in the pod:

```bash
kubectl create cm seatunnel-config \
    --from-file=seatunnel.streaming.conf=seatunnel.streaming.conf
```
Create a `seatunnel.yaml` Pod manifest:

```yaml
apiVersion: v1
kind: Pod
metadata:
  name: seatunnel
spec:
  containers:
    - name: seatunnel
      image: seatunnel:2.3.5
      command: ["/bin/sh", "-c", "/opt/seatunnel/bin/seatunnel.sh --config /data/seatunnel.streaming.conf -e local"]
      resources:
        limits:
          cpu: "1"
          memory: 4G
        requests:
          cpu: "1"
          memory: 2G
      volumeMounts:
        - name: seatunnel-config
          mountPath: /data/seatunnel.streaming.conf
          subPath: seatunnel.streaming.conf
  volumes:
    - name: seatunnel-config
      configMap:
        name: seatunnel-config
        items:
          - key: seatunnel.streaming.conf
            path: seatunnel.streaming.conf
```
Run the example application:
```bash
kubectl apply -f seatunnel.yaml
```
### Zeta (cluster-mode)

In this guide we are going to use `seatunnel.streaming.conf`:

```hocon
env {
  parallelism = 2
  job.mode = "STREAMING"
  checkpoint.interval = 2000
}

source {
  FakeSource {
    parallelism = 2
    result_table_name = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}

sink {
  Console {
  }
}
```
Generate a ConfigMap named `seatunnel-config` in Kubernetes for the `seatunnel.streaming.conf`, so that we can mount the config content in the pod:

```bash
kubectl create cm seatunnel-config \
    --from-file=seatunnel.streaming.conf=seatunnel.streaming.conf
```
Then load the configuration files used by the SeaTunnel cluster into ConfigMaps. Create the following YAML files locally:
- Create `hazelcast-client.yaml`:

```yaml
hazelcast-client:
  cluster-name: seatunnel
  properties:
    hazelcast.logging.type: log4j2
  network:
    cluster-members:
      - localhost:5801
```
- Create `hazelcast.yaml`:

```yaml
hazelcast:
  cluster-name: seatunnel
  network:
    rest-api:
      enabled: true
      endpoint-groups:
        CLUSTER_WRITE:
          enabled: true
        DATA:
          enabled: true
    join:
      tcp-ip:
        enabled: true
        member-list:
          - localhost
    port:
      auto-increment: false
      port: 5801
  properties:
    hazelcast.invocation.max.retry.count: 20
    hazelcast.tcp.join.port.try.count: 30
    hazelcast.logging.type: log4j2
    hazelcast.operation.generic.thread.count: 50
```
- Create `seatunnel.yaml`:

```yaml
seatunnel:
  engine:
    history-job-expire-minutes: 1440
    backup-count: 1
    queue-type: blockingqueue
    print-execution-info-interval: 60
    print-job-metrics-info-interval: 60
    slot-service:
      dynamic-slot: true
    checkpoint:
      interval: 10000
      timeout: 60000
      storage:
        type: hdfs
        max-retained: 3
        plugin-config:
          namespace: /tmp/seatunnel/checkpoint_snapshot
          storage.type: hdfs
          fs.defaultFS: file:///tmp/ # Ensure that the directory has write permission
```
Create ConfigMaps for the configuration files using the following commands:

```bash
kubectl create configmap hazelcast-client --from-file=hazelcast-client.yaml
kubectl create configmap hazelcast --from-file=hazelcast.yaml
kubectl create configmap seatunnelmap --from-file=seatunnel.yaml
```
Deploy Reloader to achieve hot reloading. We use Reloader here to automatically restart the pods when a configuration file is modified. Alternatively, you can embed the configuration values directly and skip Reloader.

```bash
wget https://raw.githubusercontent.com/stakater/Reloader/master/deployments/kubernetes/reloader.yaml
kubectl apply -f reloader.yaml
```
- Create `seatunnel-cluster.yml`:

```yaml
apiVersion: v1
kind: Service
metadata:
  name: seatunnel
spec:
  selector:
    app: seatunnel
  ports:
    - port: 5801
      name: seatunnel
  clusterIP: None
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: seatunnel
  annotations:
    configmap.reloader.stakater.com/reload: "hazelcast,hazelcast-client,seatunnelmap"
spec:
  serviceName: "seatunnel"
  replicas: 3  # modify replicas according to your case
  selector:
    matchLabels:
      app: seatunnel
  template:
    metadata:
      labels:
        app: seatunnel
    spec:
      containers:
        - name: seatunnel
          image: seatunnel:2.3.5
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 5801
              name: client
          command: ["/bin/sh", "-c", "/opt/seatunnel/bin/seatunnel-cluster.sh -DJvmOption=-Xms2G -Xmx2G"]
          resources:
            limits:
              cpu: "1"
              memory: 4G
            requests:
              cpu: "1"
              memory: 2G
          volumeMounts:
            - mountPath: "/opt/seatunnel/config/hazelcast.yaml"
              name: hazelcast
              subPath: hazelcast.yaml
            - mountPath: "/opt/seatunnel/config/hazelcast-client.yaml"
              name: hazelcast-client
              subPath: hazelcast-client.yaml
            - mountPath: "/opt/seatunnel/config/seatunnel.yaml"
              name: seatunnelmap
              subPath: seatunnel.yaml
            - mountPath: /data/seatunnel.streaming.conf
              name: seatunnel-config
              subPath: seatunnel.streaming.conf
      volumes:
        - name: hazelcast
          configMap:
            name: hazelcast
        - name: hazelcast-client
          configMap:
            name: hazelcast-client
        - name: seatunnelmap
          configMap:
            name: seatunnelmap
        - name: seatunnel-config
          configMap:
            name: seatunnel-config
            items:
              - key: seatunnel.streaming.conf
                path: seatunnel.streaming.conf
```
- Start the cluster:

```bash
kubectl apply -f seatunnel-cluster.yml
```
Then modify the SeaTunnel configuration in the pods using the following command:

```bash
kubectl edit cm hazelcast
```
Change the `member-list` option to your cluster addresses. This guide uses the headless-service access mode; the general format for addressing pods is `[pod-name].[service-name].[namespace].svc.cluster.local`, for example:

```
- seatunnel-0.seatunnel.default.svc.cluster.local
- seatunnel-1.seatunnel.default.svc.cluster.local
- seatunnel-2.seatunnel.default.svc.cluster.local
```
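Because the addresses follow the `[pod-name].[service-name].[namespace].svc.cluster.local` pattern, the member list can be generated for any replica count. A small sketch, assuming the `seatunnel` StatefulSet/Service and the `default` namespace used in this guide:

```shell
#!/bin/sh
# Emit one headless-service DNS name per StatefulSet replica.
REPLICAS=3
i=0
while [ "$i" -lt "$REPLICAS" ]; do
  echo "seatunnel-${i}.seatunnel.default.svc.cluster.local"
  i=$((i + 1))
done
# prints seatunnel-0 through seatunnel-2, one per line
```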
```bash
kubectl edit cm hazelcast-client
```

Change the `cluster-members` option to your cluster addresses, for example:

```
- seatunnel-0.seatunnel.default.svc.cluster.local:5801
- seatunnel-1.seatunnel.default.svc.cluster.local:5801
- seatunnel-2.seatunnel.default.svc.cluster.local:5801
```
Later, you will see the pods automatically restart and pick up the updated SeaTunnel configuration.
After all pod updates have completed, we can use the following command to check that the configuration inside a pod has been updated:

```bash
kubectl exec -it seatunnel-0 -- cat /opt/seatunnel/config/hazelcast-client.yaml
```
Afterwards, we can submit jobs to any pod:

```bash
kubectl exec -it seatunnel-0 -- /opt/seatunnel/bin/seatunnel.sh --config /data/seatunnel.streaming.conf
```
## See The Output

### Flink
You may follow the logs of your job. After a successful startup (which can take on the order of a minute in a fresh environment, seconds afterwards) you can run:

```bash
kubectl logs -f deploy/seatunnel-flink-streaming-example
```
The output looks like the below:

```
...
2023-01-31 12:13:54,349 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: SeaTunnel FakeSource -> Sink Writer: Console (1/1) (1665d2d011b2f6cf6525c0e5e75ec251) switched from SCHEDULED to DEPLOYING.
2023-01-31 12:13:56,684 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: SeaTunnel FakeSource -> Sink Writer: Console (1/1) (attempt #0) with attempt id 1665d2d011b2f6cf6525c0e5e75ec251 to seatunnel-flink-streaming-example-taskmanager-1-1 @ 100.103.244.106 (dataPort=39137) with allocation id fbe162650c4126649afcdaff00e46875
2023-01-31 12:13:57,794 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: SeaTunnel FakeSource -> Sink Writer: Console (1/1) (1665d2d011b2f6cf6525c0e5e75ec251) switched from DEPLOYING to INITIALIZING.
2023-01-31 12:13:58,203 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: SeaTunnel FakeSource -> Sink Writer: Console (1/1) (1665d2d011b2f6cf6525c0e5e75ec251) switched from INITIALIZING to RUNNING.
```
If an OOM error occurs in the log, you can decrease the `row.num` value in `seatunnel.streaming.conf`.
To expose the Flink Dashboard you may add a port-forward rule:
```bash
kubectl port-forward svc/seatunnel-flink-streaming-example-rest 8081
```
Now the Flink Dashboard is accessible at localhost:8081.
Or launch minikube dashboard for a web-based Kubernetes user interface.
The content printed in the TaskManager stdout log:

```bash
kubectl logs \
    -l 'app in (seatunnel-flink-streaming-example), component in (taskmanager)' \
    --tail=-1 \
    -f
```
The output looks like the below (your content may be different since we use FakeSource to automatically generate random stream data):

```
...
subtaskIndex=0: row=159991 : VVgpp, 978840000
subtaskIndex=0: row=159992 : JxrOC, 1493825495
subtaskIndex=0: row=159993 : YmCZR, 654146216
subtaskIndex=0: row=159994 : LdmUn, 643140261
subtaskIndex=0: row=159995 : tURkE, 837012821
subtaskIndex=0: row=159996 : uPDfd, 2021489045
subtaskIndex=0: row=159997 : mjrdG, 2074957853
subtaskIndex=0: row=159998 : xbeUi, 864518418
subtaskIndex=0: row=159999 : sSWLb, 1924451911
subtaskIndex=0: row=160000 : AuPlM, 1255017876
```
To stop your job and delete your FlinkDeployment you can simply:
```bash
kubectl delete -f seatunnel-flink.yaml
```
### Zeta (local-mode)

You may follow the logs of your job. After a successful startup (which can take on the order of a minute in a fresh environment, seconds afterwards) you can run:

```bash
kubectl logs -f seatunnel
```

The output looks like the below (your content may be different since we use FakeSource to automatically generate random stream data):

```
...
2023-10-07 08:20:12,797 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25673: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : hRJdE, 1295862507
2023-10-07 08:20:12,797 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25674: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : kXlew, 935460726
2023-10-07 08:20:12,797 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25675: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : FrNOT, 1714358118
2023-10-07 08:20:12,797 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25676: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : kSajX, 126709414
2023-10-07 08:20:12,797 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25677: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : YhpQv, 2020198351
2023-10-07 08:20:12,797 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25678: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : nApin, 691339553
2023-10-07 08:20:12,797 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25679: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : KZNNa, 1720773736
2023-10-07 08:20:12,797 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25680: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : uCUBI, 490868386
2023-10-07 08:20:12,797 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25681: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : oTLmO, 98770781
2023-10-07 08:20:12,797 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25682: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : UECud, 835494636
2023-10-07 08:20:12,797 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25683: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : XNegY, 1602828896
2023-10-07 08:20:12,797 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25684: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : LcFBx, 1400869177
2023-10-07 08:20:12,797 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25685: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : EqSfF, 1933614060
2023-10-07 08:20:12,797 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25686: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : BODIs, 1839533801
2023-10-07 08:20:12,797 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25687: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : doxcI, 970104616
2023-10-07 08:20:12,797 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25688: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : IEVYn, 371893767
2023-10-07 08:20:12,797 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25689: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : YXYfq, 1719257882
2023-10-07 08:20:12,797 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25690: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : LFWEm, 725033360
2023-10-07 08:20:12,797 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25691: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : ypUrY, 1591744616
2023-10-07 08:20:12,797 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25692: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : rlnzJ, 412162913
2023-10-07 08:20:12,797 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25693: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : zWKnt, 976816261
2023-10-07 08:20:12,797 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=25694: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : PXrsk, 43554541
```
To stop your job and delete your Pod you can simply:

```bash
kubectl delete -f seatunnel.yaml
```
### Zeta (cluster-mode)

You may follow the logs of your job. After a successful startup (which can take on the order of a minute in a fresh environment, seconds afterwards) you can run:

```bash
kubectl exec -it seatunnel-1 -- tail -f /opt/seatunnel/logs/seatunnel-engine-server.log | grep ConsoleSinkWriter
```

The output looks like the below (your content may be different since we use FakeSource to automatically generate random stream data):

```
...
2023-10-10 08:05:07,283 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=1 rowIndex=7: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : IibHk, 820962465
2023-10-10 08:05:07,283 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=1 rowIndex=8: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : lmKdb, 1072498088
2023-10-10 08:05:07,283 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=1 rowIndex=9: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : iqGva, 918730371
2023-10-10 08:05:07,284 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=1 rowIndex=10: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : JMHmq, 1130771733
2023-10-10 08:05:07,284 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=1 rowIndex=11: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : rxoHF, 189596686
2023-10-10 08:05:07,284 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=1 rowIndex=12: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : OSblw, 559472064
2023-10-10 08:05:07,284 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=1 rowIndex=13: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : yTZjG, 1842482272
2023-10-10 08:05:07,284 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=1 rowIndex=14: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : RRiMg, 1713777214
2023-10-10 08:05:07,284 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=1 rowIndex=15: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : lRcsd, 1626041649
2023-10-10 08:05:07,284 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=1 rowIndex=16: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : QrNNW, 41355294
```
To stop your job and delete your cluster you can simply:

```bash
kubectl delete -f seatunnel-cluster.yml
```
Happy SeaTunneling!
## What's More

For now, you have taken a quick look at SeaTunnel. You can see connector to find all the sources and sinks SeaTunnel supports, or see deployment if you want to submit your application to another kind of engine cluster.
