Deploying Flink Jobs on Kubernetes

Why Flink Kubernetes Operator

The Apache Flink documentation highlights the key aspects of the Flink Kubernetes Operator:

The Flink Kubernetes Operator extends the Kubernetes API with the ability to manage and operate Flink Deployments. The operator features the following amongst others:

- Deploy and monitor Flink Application and Session deployments
- Upgrade, suspend and delete deployments
- Full logging and metrics integration
- Flexible deployments and native integration with Kubernetes tooling
- Flink Job Autoscaler

If you have previously managed on-premises Flink clusters, ad-hoc Flink clusters, or ephemeral Flink clusters, where you were responsible for all of the deployment and management tasks, then you understand the effort required to achieve such a solution without the capabilities provided by the Flink Operator.

Data as a Service: Integrating Apache Flink

In this hands-on guide, I will explain how to integrate the Flink Kubernetes Operator and use it to manage Flink clusters. The focus here is a functional approach to executing Flink jobs; later posts will show how to leverage Apache Flink in concrete scenarios.

Versions

  • Minikube: 1.32.0
  • Kubernetes: 1.28.3
  • Flink Operator: 1.7.0
  • Apache Flink: 1.17.0
  • MinIO: RELEASE.2024-01-13T07-53-03Z
  • Helm: 3.13.3

Prerequisites

You need to start minikube to get a local Kubernetes setup. As a prerequisite, install minikube and make sure that a Docker container runtime or a virtual machine environment is available.

Execute:

minikube start
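
Once minikube reports that it is ready, you can optionally confirm the cluster is healthy before moving on:

minikube status
kubectl get nodes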

Hands-on guide: Deploying MinIO

1 — Create a dedicated namespace on Kubernetes

kubectl create ns minio-dev

2 — Create the deployment of MinIO

apiVersion: apps/v1
kind: Deployment
metadata:
  creationTimestamp: null
  labels:
    app: minio
  name: deploy-minio
  namespace: minio-dev
spec:
  replicas: 1
  selector:
    matchLabels:
      app: minio
  strategy: {}
  template:
    metadata:
      creationTimestamp: null
      labels:
        app: minio
    spec:
      volumes:
      - name: localvolume
        hostPath:
          path: /mnt/disk1/data
          type: DirectoryOrCreate
      containers:
      - name: minio
        image: quay.io/minio/minio:RELEASE.2024-01-13T07-53-03Z
        command:
        - /bin/bash
        - -c
        args:
        - minio server /data --address :9000 --console-address :9090
        volumeMounts:
        - mountPath: /data
          name: localvolume
status: {}

This deployment will create one instance of MinIO (replicas: 1), exposing the WebConsole on port 9090, while the REST API will be available on port 9000.

Execute:

kubectl -n minio-dev create -f <yaml_file_containing_the_example_above>
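
Before moving on, you can verify that the MinIO pod reached the Running state (the generated pod name suffix will differ in your cluster):

kubectl -n minio-dev get pods

# example output (name and age will vary)
NAME                            READY   STATUS    RESTARTS   AGE
deploy-minio-xxxxxxxxxx-xxxxx   1/1     Running   0          1m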

3 — Expose the WebConsole and the REST API of MinIO internally

apiVersion: v1
kind: Service
metadata:
  labels:
    app: minio
  name: minio-svc
  namespace: minio-dev
spec:
  ports:
  - name: webconsole
    port: 9090
    protocol: TCP
    targetPort: 9090
  - name: api
    port: 9000
    protocol: TCP
    targetPort: 9000
  selector:
    app: minio
  type: ClusterIP
status:
  loadBalancer: {}

In Kubernetes, there are several ways to expose a service, providing connectivity and access to applications within the cluster or from external sources; the two primary methods are ClusterIP and NodePort, and there are also LoadBalancer and Ingress. This guide uses the ClusterIP method, which exposes MinIO on an internal IP address that is only reachable within the Kubernetes cluster.

Execute:

kubectl -n minio-dev create -f <yaml_file_containing_the_example_above>
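
Workloads inside the cluster can now reach MinIO through the service DNS name minio-svc.minio-dev. As a quick sanity check, you can probe MinIO's liveness endpoint from a throwaway busybox pod:

kubectl run minio-check --rm -it --restart=Never --image=busybox -- \
  sh -c 'wget -qO- http://minio-svc.minio-dev:9000/minio/health/live && echo "MinIO API reachable"'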

4 — External access to MinIO WebConsole

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: minio-ingress
  namespace: minio-dev
spec:
  rules:
    - http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: minio-svc
                port:
                  number: 9090

To enable external access to the MinIO WebConsole, we will utilize an Ingress. There are other approaches, but we decided on this one due to the intrinsic advantages of Ingress.

As a prerequisite, the Ingress controller must be enabled; execute the following command:

minikube addons enable ingress

Execute the following command to verify that the NGINX Ingress controller is running:

kubectl get pods -n ingress-nginx

Execute:

kubectl -n minio-dev create -f <yaml_file_containing_the_example_above>

5 — Accessing the MinIO WebConsole

kubectl -n minio-dev get ingress

# output
NAME            CLASS   HOSTS   ADDRESS        PORTS   AGE
minio-ingress   nginx   *       192.168.64.3   80      2d4h

Open your web browser and enter the address shown, in this example 192.168.64.3. The MinIO user interface will then be accessible.


MinIO WebConsole

The default credentials are user: minioadmin and password: minioadmin. If you intend to modify these credentials, which is highly recommended, you must set the MINIO_ROOT_USER and MINIO_ROOT_PASSWORD environment variables.
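
For example, with the Deployment from step 2, one way to override them is to set the variables directly on the deployment (the values here are placeholders):

kubectl -n minio-dev set env deployment/deploy-minio \
  MINIO_ROOT_USER=myadmin \
  MINIO_ROOT_PASSWORD='choose-a-strong-password'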

You might wonder about the choice of MinIO in our configuration. In this setup, MinIO serves as the external storage for saving checkpoints and savepoints, and for the data related to the high-availability setup. As always, you have other options for storing this information, such as AWS S3, GCS, or Azure Blob Storage.
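
The Flink examples later in this guide write to a bucket named test, which has to exist first. You can create it in the WebConsole, or, assuming the MinIO client (mc) is installed locally, with a temporary port-forward:

# forward the MinIO API to localhost, then create the bucket used by the Flink examples
kubectl -n minio-dev port-forward svc/minio-svc 9000:9000 &
mc alias set localminio http://localhost:9000 minioadmin minioadmin
mc mb localminio/test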

Bonus

If you access the MinIO WebConsole through the Ingress, file uploads are limited to 1 MB. To overcome this restriction, modify the Ingress by adding the annotation nginx.ingress.kubernetes.io/proxy-body-size: 500m, which allows uploads with a maximum file size of 500 MB:

apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: minio-ingress
  namespace: minio-dev
  annotations:
    nginx.ingress.kubernetes.io/proxy-body-size: 500m
spec:
  rules:
    - http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: minio-svc
                port:
                  number: 9090

The fun part starts here :).

1 — Install Flink Kubernetes Operator

To install the operator we will use Helm, an open-source package manager for Kubernetes. If you don't have it, follow the official installation guide at https://helm.sh/docs/intro/install/.

Once you have Helm installed, you need to add the Flink Operator chart repository:

helm repo add flink-kubernetes-operator-1.7.0 https://archive.apache.org/dist/flink/flink-kubernetes-operator-1.7.0/

Useful commands:

  • Once the repository is added, you will be able to list the charts you can install:

    helm search repo flink-kubernetes-operator-1.7.0

  • List all possible value settings for the Flink Operator chart:

    helm show values flink-kubernetes-operator-1.7.0/flink-kubernetes-operator

You can explore these settings to better understand what they mean and what each config is for.

In our setup, some values will be overridden:

operatorServiceAccount:
  create: true
  annotations: {}
  name: "flink-operator"

watchNamespaces:
    - "flink-jobs"

The Flink Operator listens for Flink deployments only in the flink-jobs namespace, while the operator itself is installed in the flink-operator namespace. This separation isolates resources and makes it possible to apply restrictions per namespace to prevent future problems within the cluster. Keep in mind that deploying Flink jobs across multiple namespaces is also feasible, but in such a scenario it's necessary to update the watchNamespaces attribute.

Store the configuration above in a file named values.yaml, and you can finally create the Flink Operator by executing the following commands 🎉:

kubectl create ns flink-operator
kubectl create ns flink-jobs
helm -n flink-operator install -f values.yaml flink-kubernetes-operator flink-kubernetes-operator-1.7.0/flink-kubernetes-operator --set webhook.create=false

After executing the commands above, you will see the Flink Operator running successfully:

kubectl -n flink-operator get all

#output
NAME                                             READY   STATUS    RESTARTS        AGE
pod/flink-kubernetes-operator-5c46f67cd7-vrdpn   1/1     Running   6 (7h40m ago)   3d5h

NAME                                        READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/flink-kubernetes-operator   1/1     1            1           3d5h

NAME                                                   DESIRED   CURRENT   READY   AGE
replicaset.apps/flink-kubernetes-operator-5c46f67cd7   1         1         1       3d5h

2 — Deploy a Flink Job

To deploy a simple Flink job using the Flink Operator, simply create a definition similar to the following. Checkpoints and savepoints are stored in MinIO through the S3 scheme; note that if the underlying disk is ephemeral, all state will be lost.

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: flink-jobs
  name: basic-example-1
spec:
  image: flink-1:1.17
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
    state.savepoints.dir: s3://test/savepoints
    state.checkpoints.dir: s3://test/checkpoints
    s3.endpoint: http://minio-svc.minio-dev:9000
    s3.path.style.access: "true"
    s3.access-key: minioadmin
    s3.secret-key: minioadmin
  serviceAccount: flink
  jobManager:
    resource:
      memory: "1000m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1000m"
      cpu: 0.5
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          env:
            - name: TZ  # set the container's runtime time zone
              value: Asia/Shanghai
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 1
    state: running

Adding the time zone as above ensures the printed logs use the correct UTC+8 time.

This definition will create a single JobManager, a TaskManager with one slot, and a Flink job with a parallelism of 1. Thanks to the MinIO instance, the checkpoints and savepoints can be stored in MinIO.
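
The definition is applied like any other Kubernetes resource; the file name below is just an example:

kubectl -n flink-jobs apply -f basic-example-1.yaml

# track the rollout
kubectl -n flink-jobs get flinkdeployment basic-example-1
kubectl -n flink-jobs get pods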

An ingress section (shown in the Bonus 2 example below) automates the creation of an Ingress that allows external access to the UI. To access the UI, open the browser and navigate to http://ip/flink-jobs/basic-example/.

Note: to find the IP address, double-check the section "Accessing the MinIO WebConsole".


Flink UI

Congratulations! 🎉🥳 You now have a Flink job running through the Flink Operator. By using a similar setup, you unlock numerous advantages, ensuring a streamlined experience for your infrastructure clients and significantly reducing their workload.

Along the way, this guide has touched on the internal concepts of both Flink and Kubernetes.

Bonus 1

If you have an eagle eye, you will have noticed that the image used is flink-1:1.17. This is because the S3 scheme is used for storing checkpoints and savepoints, and making use of it requires including the necessary plugin, flink-s3-fs-hadoop. In this scenario, a custom Flink image was used to enable the mandatory plugin.

FROM flink:1.17

# move the bundled S3 filesystem plugin from opt/ into the plugins directory so Flink loads it at startup
RUN mkdir -p /opt/flink/plugins/flink-s3-fs-hadoop/ &&\
    mv -v /opt/flink/opt/flink-s3-fs-hadoop-*.jar /opt/flink/plugins/flink-s3-fs-hadoop/ &&\
    chown -R flink:flink /opt/flink/plugins/flink-s3-fs-hadoop

USER flink

CMD exec /usr/share/flink/bin/flink-console.sh ${COMPONENT} ${ARGS}
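
Assuming the Dockerfile above is saved in the current directory, a sketch of building the image and making it visible to the minikube node (the tag must match spec.image):

docker build -t flink-1:1.17 .
minikube image load flink-1:1.17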

The available approaches are explained in the Flink documentation.

Bonus 2

If you prefer not to use a custom Flink image, it's possible to enable the plugins with the ENABLE_BUILT_IN_PLUGINS environment variable:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: flink-jobs
  name: basic-example
spec:
  image: flink:1.17
  flinkVersion: v1_17
  podTemplate:
    apiVersion: v1
    kind: Pod
    metadata:
      name: pod-template
    spec:
      containers:
        - name: flink-main-container
          env:
            - name: ENABLE_BUILT_IN_PLUGINS
              value: flink-s3-fs-hadoop-1.17.2.jar
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
    state.savepoints.dir: s3://test/savepoints
    state.checkpoints.dir: s3://test/checkpoints
    s3.endpoint: http://minio-svc.minio-dev:9000
    s3.path.style.access: "true"
    s3.access-key: minioadmin
    s3.secret-key: minioadmin
  ingress:
    template: "/{{namespace}}/{{name}}(/|$)(.*)"
    className: "nginx"
    annotations:
      nginx.ingress.kubernetes.io/rewrite-target: "/$2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "1000m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1000m"
      cpu: 0.5
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 1
    state: running
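
Because savepoints are configured, you can also trigger a manual savepoint through the operator by bumping the job's savepointTriggerNonce field; a minimal sketch:

kubectl -n flink-jobs patch flinkdeployment basic-example --type merge \
  -p '{"spec":{"job":{"savepointTriggerNonce": 1}}}'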

Bonus 3

To simplify the management of the Kubernetes cluster and make it easier to understand what is going on, consider installing the Kubernetes Dashboard; there is also a guide describing the steps to create a user to access the Kubernetes Dashboard.


Kubernetes Dashboard

Deploy kubernetes-dashboard by following the tutorial:

# Add kubernetes-dashboard repository
helm repo add kubernetes-dashboard https://kubernetes.github.io/dashboard/
# Deploy a Helm Release named "kubernetes-dashboard" using the kubernetes-dashboard chart
helm upgrade --install kubernetes-dashboard kubernetes-dashboard/kubernetes-dashboard --create-namespace --namespace kubernetes-dashboard
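
Until an Ingress or Traefik route is in place, the chart's Kong proxy service (the same one referenced in the Traefik IngressRoute later in this section) can be reached with a simple port-forward:

kubectl -n kubernetes-dashboard port-forward svc/kubernetes-dashboard-kong-proxy 8443:443
# then open https://localhost:8443 in the browser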


In the next series I will delve into common issues and tips for optimizing your Flink applications and deployments.

# list the images used by the dashboard pods (e.g. to pre-pull them)
kubectl -n kubernetes-dashboard get pods | awk '{print $1}' | xargs -r -I '{}' kubectl -n kubernetes-dashboard get pods {} -o=jsonpath='{.status.containerStatuses[0].image}{"\n"}'
Error from server (NotFound): pods "NAME" not found
docker.io/kubernetesui/dashboard-api:1.7.0
docker.io/kubernetesui/dashboard-auth:1.1.3
kong:3.6
docker.io/kubernetesui/dashboard-metrics-scraper:1.1.1
docker.io/kubernetesui/dashboard-web:1.4.0

For Traefik v3, the corresponding manifests are:

kubectl apply -f - <<EOF
apiVersion: traefik.io/v1alpha1
kind: ServersTransport
metadata:
  name: skipverify
  namespace: kubernetes-dashboard
spec:
  insecureSkipVerify: true
EOF

# output
serverstransport.traefik.io/skipverify created

---
apiVersion: traefik.io/v1alpha1
kind: IngressRoute
metadata:
  name: dashboard-k8s
  namespace: kubernetes-dashboard
spec:
  entryPoints:
    - websecure
  routes:
    - match: "Host(`dashboard.traefik.k8s`)"
      kind: Rule
      services:
        - name: kubernetes-dashboard-kong-proxy
          port: 443
          serversTransport: skipverify
  tls:
    secretName: dashboard-tls
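
The IngressRoute matches on the host dashboard.traefik.k8s, so that name must resolve to your Traefik entry point. For a quick local test, an /etc/hosts entry is enough; the IP below is a placeholder:

# replace 10.0.0.1 with the address of your Traefik entry point
echo "10.0.0.1 dashboard.traefik.k8s" | sudo tee -a /etc/hosts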

After the deployment succeeds, open the page to check it.

Next, obtain the login token:

cat > dashboard-user.yaml << EOF
apiVersion: v1
kind: ServiceAccount
metadata:
  name: admin-user
  namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: admin-user
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: cluster-admin
subjects:
- kind: ServiceAccount
  name: admin-user
  namespace: kube-system
EOF

kubectl  apply -f dashboard-user.yaml

# create a token
kubectl -n kube-system create token admin-user


Take a look at the logs to confirm everything works.

A previous post, "Flink on Kubernetes - Kubernetes Operator - Installing the Operator", covered installing the Flink Operator with Helm. Reinstalling it requires uninstalling it first; so, after the Flink Operator is uninstalled, are running Flink jobs affected?

Let's find out…

[root@k8s01 k8s_yaml]# helm uninstall flink-kubernetes-operator -n flink
These resources were kept due to the resource policy:
[RoleBinding] flink-role-binding
[Role] flink
[ServiceAccount] flink

release "flink-kubernetes-operator" uninstalled
[root@k8s01 k8s_yaml]#

After executing this:

  • 1. The Flink job is still running normally (kubectl -n flink get pod | grep "basic-application-deployment-only"):

    [root@k8s01 k8s_yaml]# kubectl -n flink get pod |grep "basic-application-deployment-only"
    basic-application-deployment-only-ingress-tz-bd954c447-s446f   1/1     Running   0               35m
    basic-application-deployment-only-ingress-tz-taskmanager-1-1   1/1     Running   0               35m

  • 2. The FlinkDeployment is still listed normally (kubectl get flinkdeployment -n flink):

    [root@k8s01 k8s_yaml]# kubectl get flinkdeployment -n flink
    NAME                                           JOB STATUS   LIFECYCLE STATE
    basic-application-deployment-only-ingress-tz   RUNNING      STABLE

  • 3. The CRDs still exist:

    [root@k8s01 k8s_yaml]# kubectl get crd -n flink |grep flink
    flinkdeployments.flink.apache.org                     2024-03-29T13:05:39Z
    flinksessionjobs.flink.apache.org                     2024-03-29T13:05:39Z

  • 4. But the flink-kubernetes-operator pod has been deleted.

Next, reinstall flink-kubernetes-operator with Helm.

You can visit https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/operations/helm/#overriding-configuration-parameters-during-helm-install for the official introduction: configuration parameters can be specified at helm install time. For example, we now want to add a TZ environment variable to the flink-kubernetes-operator pod, with the following content:

  env:
    - name: TZ
      value: Asia/Shanghai

The env parameter is specified through operatorPod:

helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator --namespace flink-operator --set 'operatorPod.env[0].name=TZ,operatorPod.env[0].value=Asia/Shanghai'

Output logs:

[root@k8s01 k8s_yaml]# helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator --namespace flink-operator --set 'operatorPod.env[0].name=TZ,operatorPod.env[0].value=Asia/Shanghai'
WARNING: Kubernetes configuration file is group-readable. This is insecure. Location: /home/###/.kube/config
WARNING: Kubernetes configuration file is world-readable. This is insecure. Location: /home/###/.kube/config
NAME: flink-kubernetes-operator
LAST DEPLOYED: Fri Jul
NAMESPACE: flink-operator
STATUS: deployed
REVISION: 1
TEST SUITE: None

At this point, the flink-kubernetes-operator logs are correct.

[root@k8s01 k8s_yaml]# kubectl get pod -n flink
NAME                                                           READY   STATUS    RESTARTS        AGE
basic-application-deployment-only-ingress-tz-bd954c447-s446f   1/1     Running   0               42m
basic-application-deployment-only-ingress-tz-taskmanager-1-1   1/1     Running   0               41m
busybox                                                        1/1     Running   649 (24m ago)   38d
flink-kubernetes-operator-7d5d8dcb64-bqnpp                     2/2     Running   0               117s

Inspect the YAML:

kubectl get pod flink-kubernetes-operator-7d5d8dcb64-bqnpp -n flink -o yaml

Verify that the containers section includes the following configuration:

    - name: TZ
      value: Asia/Shanghai
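
To double-check that the variable really changes the process time zone, you can compare the container clock (add -c flink-kubernetes-operator if kubectl complains about multiple containers):

kubectl -n flink exec deploy/flink-kubernetes-operator -- date
# should now print a UTC+8 (CST) timestamp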

Summary

At this point, the time zone is set for both the Operator and the Flink job. The conclusion: uninstalling flink-kubernetes-operator does not affect running Flink jobs.

Don't forget to add the time zone configuration when deploying a Flink job from a Java program:

// fabric8 Kubernetes model classes, as used by the Flink operator's Java API
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.PodSpec;
import io.fabric8.kubernetes.api.model.PodTemplateSpec;
import java.util.Collections;

PodTemplateSpec podTemplateSpec = new PodTemplateSpec();
PodSpec podSpec = new PodSpec();
Container container = new Container();
container.setName("flink-main-container"); // the container name must not be changed
EnvVar envVar01 = new EnvVar();
envVar01.setName("TZ");
envVar01.setValue("Asia/Shanghai");
container.setEnv(Collections.singletonList(envVar01));
podSpec.setContainers(Collections.singletonList(container));
podTemplateSpec.setSpec(podSpec);
flinkDeploymentSpec.setPodTemplate(podTemplateSpec);

JuiceFS job

juicefs format --storage=minio --bucket=http://10.7.3.100:31889/juicefs --access-key=minioadmin --secret-key=minioadmin "mysql://root:root@(10.7.3.100:30006)/juicefs" juicefsminio
2024/07/10 17:13:58.051333 juicefs[2598847] <INFO>: Meta address: mysql://root:****@(10.7.3.100:30006)/juicefs [interface.go:497]
2024/07/10 17:13:58.058010 juicefs[2598847] <WARNING>: The latency to database is too high: 6.002499ms [sql.go:260]
2024/07/10 17:13:58.062734 juicefs[2598847] <INFO>: Data use minio://10.7.3.100:31889/juicefs/juicefsminio/ [format.go:471]
2024/07/10 17:13:58.978178 juicefs[2598847] <INFO>: Volume is formatted as {
  "Name": "juicefsminio",
  "UUID": "b1f089b9-4e22-4e80-8746-7d55ea9688b0",
  "Storage": "minio",
  "Bucket": "http://10.7.3.100:31889/juicefs",
  "AccessKey": "minioadmin",
  "SecretKey": "removed",
  "BlockSize": 4096,
  "Compression": "none",
  "EncryptAlgo": "aes256gcm-rsa",
  "KeyEncrypted": true,
  "TrashDays": 1,
  "MetaVersion": 1,
  "MinClientVersion": "1.1.0-A",
  "DirStats": true
} [format.go:508]
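
Before wiring the volume into Flink, it can be sanity-checked by mounting it locally with the same CLI; /mnt/jfs is an arbitrary mount point:

juicefs mount -d "mysql://root:root@(10.7.3.100:30006)/juicefs" /mnt/jfs
echo hello > /mnt/jfs/hello.txt && cat /mnt/jfs/hello.txt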

Create a deployment file and give it a try:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: flink-jobs
  name: basic-example
spec:
  image:  10.7.20.12:5000/flink-juicefs:v1.1.3
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
    s3.endpoint: "http://10.43.180.28:9000"
    s3.path.style.access: "true"
    s3.access.key: "MpCurbSf0FdWTbO9XCzB"
    s3.secret.key: "d6VSjJAAyHqszi3RFif4Yg1UEBd9T4nL6hLcyK71"
    state.backend.incremental: "true"
    execution.checkpointing.interval: "300000ms"
    state.savepoints.dir: "s3://flink-data/savepoints"
    state.checkpoints.dir: "s3://flink-data/checkpoints"
    env.java.opts.jobmanager: -Duser.timezone=GMT+08
    env.java.opts.taskmanager: -Duser.timezone=GMT+08
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          env:
            - name: TZ
              value: Asia/Shanghai
          volumeMounts:
            - mountPath: /opt/hadoop/etc/hadoop/
              name: core-site
      volumes:
        - name: core-site
          configMap:
            name: core-site
  job:
    jarURI: local:///opt/flink/lib/flink-sql-submit-1.0-SNAPSHOT.jar
    args: ["-f", "s3://flink-tasks/hudi2mysql.sql", "-m", "streaming", "-e", "http://10.43.180.28:9000", "-a", "MpCurbSf0FdWTbO9XCzB", "-s", "d6VSjJAAyHqszi3RFif4Yg1UEBd9T4nL6hLcyK71"]
    parallelism: 1
    upgradeMode: stateless

The most frequent errors when submitting this job are the unrecognized jfs scheme and class-not-found exceptions; the fixes are captured below.
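
When chasing these errors, tailing the JobManager logs is the fastest way to see whether the jfs filesystem classes were picked up; the operator names the JobManager Deployment after the FlinkDeployment:

kubectl -n flink-jobs logs -f deploy/basic-example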

Dockerfile

# License header omitted; noted here for the record

#FROM adoptopenjdk/openjdk11:jre-11.0.9_11.1-alpine
#
## Install the required software
## snappy is a compression library
## libc6-compat is an ANSI C compatibility library
#RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.ustc.edu.cn/g' /etc/apk/repositories
## RUN apk add --no-cache bash snappy libc6-compat \
#RUN apk add --no-cache bash snappy gnupg   \
# && apk add gettext
## Environment variables inside the Flink container
## Flink is installed under /opt
#ENV FLINK_INSTALL_PATH=/opt
## Flink is extracted to /opt/flink
#ENV FLINK_HOME $FLINK_INSTALL_PATH/flink
## Flink dependency jars live in /opt/flink/lib
#ENV FLINK_LIB_DIR $FLINK_HOME/lib
## Flink plugins live in /opt/flink/plugins
#ENV FLINK_PLUGINS_DIR $FLINK_HOME/plugins
## Not sure what this directory is for
#ENV FLINK_OPT_DIR $FLINK_HOME/opt
## Directory for the user code jars, /opt/flink/artifacts
#ENV FLINK_JOB_ARTIFACTS_DIR $FLINK_INSTALL_PATH/artifacts
## Update PATH to include the Flink binaries directory /opt/flink/bin
#ENV PATH $PATH:$FLINK_HOME/bin
#
## These ARGs can be supplied at build time; they default to NOT_SET, and when set the corresponding directory is looked up and baked into the image
## Path of the Flink distribution; any locally downloaded or self-built Flink package can be specified
#ARG flink_dist=NOT_SET
## Path of the user's business code
#ARG job_artifacts=NOT_SET
## Python version, 2 or 3
#ARG python_version=NOT_SET
## Dependency path of the Hadoop jar
#ARG hadoop_jar=NOT_SET*
#
## Install Python according to the python_version ARG above; skipped when unset
#RUN \
#  if [ "$python_version" = "2" ]; then \
#    apk add --no-cache python; \
#  elif [ "$python_version" = "3" ]; then \
#    apk add --no-cache python3 && ln -s /usr/bin/python3 /usr/bin/python; \
#  fi
#
## Put the Flink distribution and the Hadoop jar (Hadoop is optional) under /opt/flink
#ADD $flink_dist $hadoop_jar $FLINK_INSTALL_PATH/
## User code goes to /opt/artifacts
#ADD $job_artifacts/* $FLINK_JOB_ARTIFACTS_DIR/
#ENV GOSU_VERSION 1.11
#COPY gosu-amd64 /usr/local/bin/gosu
#COPY gosu-amd64.asc /usr/local/bin/gosu.asc
#
#RUN set -ex; \
#  export GNUPGHOME="$(mktemp -d)"; \
#  for server in ha.pool.sks-keyservers.net $(shuf -e \
#                          hkp://p80.pool.sks-keyservers.net:80 \
#                          keyserver.ubuntu.com \
#                          hkp://keyserver.ubuntu.com:80 \
#                          pgp.mit.edu) ; do \
#      gpg --batch --keyserver "$server" --recv-keys B42F6819007F00F88E364FD4036A9C25BF357DD4 && break || : ; \
#  done && \
#  gpg --batch --verify /usr/local/bin/gosu.asc /usr/local/bin/gosu; \
#  gpgconf --kill all; \
#  rm -rf "$GNUPGHOME" /usr/local/bin/gosu.asc; \
#  chmod +x /usr/local/bin/gosu; \
#  gosu nobody true
#RUN set -x && \
#  ln -s $FLINK_INSTALL_PATH/flink-[0-9]* $FLINK_HOME && \
#  for jar in $FLINK_JOB_ARTIFACTS_DIR/*.jar; do [ -f "$jar" ] || continue; ln -s $jar $FLINK_LIB_DIR; done && \
#  if [ -n "$python_version" ]; then ln -s $FLINK_OPT_DIR/flink-python-*-java-binding.jar $FLINK_LIB_DIR; fi && \
#  if [ -f ${FLINK_INSTALL_PATH}/flink-shaded-hadoop* ]; then ln -s ${FLINK_INSTALL_PATH}/flink-shaded-hadoop* $FLINK_LIB_DIR; fi && \
#  # create the flink group and user, and change ownership of the directories below
#  addgroup -S flink && adduser -D -S -H -G flink -h $FLINK_HOME flink && \
#  chown -R flink:flink ${FLINK_INSTALL_PATH}/flink-* && \
#  chown -R flink:flink ${FLINK_JOB_ARTIFACTS_DIR}/ && \
#  chown -h flink:flink $FLINK_HOME  && \
#  sed -i 's/rest.address: localhost/rest.address: 0.0.0.0/g' $FLINK_HOME/conf/flink-conf.yaml && \
#  sed -i 's/rest.bind-address: localhost/rest.bind-address: 0.0.0.0/g' $FLINK_HOME/conf/flink-conf.yaml && \
#  sed -i 's/jobmanager.bind-host: localhost/jobmanager.bind-host: 0.0.0.0/g' $FLINK_HOME/conf/flink-conf.yaml && \
#  sed -i 's/taskmanager.bind-host: localhost/taskmanager.bind-host: 0.0.0.0/g' $FLINK_HOME/conf/flink-conf.yaml && \
#  sed -i '/taskmanager.host: localhost/d' $FLINK_HOME/conf/flink-conf.yaml
## Copy this entrypoint script into the image
#COPY docker/flink/docker-entrypoint.sh /
# beware of the JDK version
FROM flink:1.17.2-SN
# this jar was built by ourselves
COPY  juicefs-hadoop-1.1-dev.jar  /opt/flink/lib/
#COPY  flink-s3-fs-hadoop-1.17.2.jar /opt/flink/lib/
COPY  target/flink-sql-submit-1.0-SNAPSHOT.jar /opt/flink/lib/
#install hadoop
RUN set -ex; \
    wget -nv -O /opt/flink/lib/hudi-flink.jar https://repo1.maven.org/maven2/org/apache/hudi/hudi-flink1.17-bundle/1.0.0-beta1/hudi-flink1.17-bundle-1.0.0-beta1.jar ; \
     wget -nv -O /opt/hadoop.tar.gz https://mirrors.tuna.tsinghua.edu.cn/apache/hadoop/common/hadoop-3.3.5/hadoop-3.3.5.tar.gz ; \
        tar -xzf /opt/hadoop.tar.gz -C /opt/ ; \
        rm -rf /opt/hadoop.tar.gz ; \
           mv /opt/hadoop-3.3.5 /opt/hadoop ; \
     cp /opt/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.5.jar /opt/flink/lib/
RUN mkdir -p /opt/flink/plugins/flink-s3-fs-hadoop/ &&\
    mv -v /opt/flink/opt/flink-s3-fs-hadoop-*.jar /opt/flink/plugins/flink-s3-fs-hadoop/ &&\
    chown -R flink:flink /opt/flink/plugins/flink-s3-fs-hadoop
COPY juicefs-hadoop-1.1-dev.jar /opt/hadoop/share/hadoop/common/lib/
COPY juicefs-hadoop-1.1-dev.jar /opt/hadoop/share/hadoop/mapreduce/lib/


ENV HADOOP_HOME=/opt/hadoop
ENV HADOOP_CLASSPATH=/opt/hadoop/etc/hadoop:/opt/hadoop/share/hadoop/common/lib/*:/opt/hadoop/share/hadoop/common/*:/opt/hadoop/share/hadoop/hdfs:/opt/hadoop/share/hadoop/hdfs/lib/*:/opt/hadoop/share/hadoop/hdfs/*:/opt/hadoop/share/hadoop/mapreduce/*:/opt/hadoop/share/hadoop/yarn:/opt/hadoop/share/hadoop/yarn/lib/*:/opt/hadoop/share/hadoop/yarn/*
Build the image:
docker build -t flink-juicefs:v1.17.4 .  -f Dockerfile.flink17

The project repository:

 git@github.com:huangxiaofeng10047/flink-sql-submit.git (fetch)

This is the Flink native application mode of job submission, where every job gets its own dedicated Flink cluster.

A note on the time zone issue: initially the time zone was placed under podTemplate, which did not take effect; in the end the following was added to flinkConfiguration:

flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
    s3.endpoint: "http://10.43.180.28:9000"
    s3.path.style.access: "true"
    s3.access.key: "MpCurbSf0FdWTbO9XCzB"
    s3.secret.key: "d6VSjJAAyHqszi3RFif4Yg1UEBd9T4nL6hLcyK71"
    state.backend.incremental: "true"
    execution.checkpointing.interval: "300000ms"
    state.savepoints.dir: "s3://flink-data/savepoints"
    state.checkpoints.dir: "s3://flink-data/checkpoints"
    # the two lines below are newly added
    env.java.opts.jobmanager: -Duser.timezone=GMT+08
    env.java.opts.taskmanager: -Duser.timezone=GMT+08

After adding these options, the timestamps are correct.

Start another job and print the results:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: flink-jobs
  name: basic-example-show
spec:
  image:  10.7.20.12:5000/flink-juicefs:v1.1.3
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
    s3.endpoint: "http://10.43.180.28:9000"
    s3.path.style.access: "true"
    s3.access.key: "MpCurbSf0FdWTbO9XCzB"
    s3.secret.key: "d6VSjJAAyHqszi3RFif4Yg1UEBd9T4nL6hLcyK71"
    state.backend.incremental: "true"
    execution.checkpointing.interval: "300000ms"
    state.savepoints.dir: "s3://flink-data/savepoints"
    state.checkpoints.dir: "s3://flink-data/checkpoints"
    env.java.opts.jobmanager: -Duser.timezone=GMT+08
    env.java.opts.taskmanager: -Duser.timezone=GMT+08
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          env:
            - name: TZ
              value: Asia/Shanghai
          volumeMounts:
            - mountPath: /opt/hadoop/etc/hadoop/
              name: core-site
      volumes:
        - name: core-site
          configMap:
            name: core-site
  job:
    jarURI: local:///opt/flink/lib/flink-sql-submit-1.0-SNAPSHOT.jar
    args: ["-f", "s3://flink-tasks/k8s-flink-sql-test-show.sql", "-m", "streaming", "-e", "http://10.43.180.28:9000", "-a", "MpCurbSf0FdWTbO9XCzB", "-s", "d6VSjJAAyHqszi3RFif4Yg1UEBd9T4nL6hLcyK71"]
    parallelism: 1
    upgradeMode: stateless

core-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->

<!-- Put site-specific property overrides in this file. -->

<configuration>

    <property>
        <name>fs.s3a.endpoint</name>
        <value>http://10.43.180.28:9000</value>
        <description>AWS S3 endpoint to connect to. An up-to-date list is
            provided in the AWS Documentation: regions and endpoints. Without this
            property, the standard region (s3.amazonaws.com) is assumed.
        </description>
    </property>

    <property>
        <name>fs.s3a.access.key</name>
        <value>MpCurbSf0FdWTbO9XCzB</value>
    </property>

    <property>
        <name>fs.s3a.secret.key</name>
        <value>d6VSjJAAyHqszi3RFif4Yg1UEBd9T4nL6hLcyK71</value>
    </property>

    <property>
        <name>fs.s3a.path.style.access</name>
        <value>true</value>
        <description>Enable S3 path style access ie disabling the default virtual hosting behaviour.
            Useful for S3A-compliant storage providers as it removes the need to set up DNS for virtual hosting.
        </description>
    </property>

    <property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>
            org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
        </value>
        <description>
            Comma-separated class names of credential provider classes which implement
            com.amazonaws.auth.AWSCredentialsProvider.

            When S3A delegation tokens are not enabled, this list will be used
            to directly authenticate with S3 and DynamoDB services.
            When S3A Delegation tokens are enabled, depending upon the delegation
            token binding it may be used
            to communicate with the STS endpoint to request session/role
            credentials.

            These are loaded and queried in sequence for a valid set of credentials.
            Each listed class must implement one of the following means of
            construction, which are attempted in order:
            * a public constructor accepting java.net.URI and
            org.apache.hadoop.conf.Configuration,
            * a public constructor accepting org.apache.hadoop.conf.Configuration,
            * a public static method named getInstance that accepts no
            arguments and returns an instance of
            com.amazonaws.auth.AWSCredentialsProvider, or
            * a public default constructor.

            Specifying org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider allows
            anonymous access to a publicly accessible S3 bucket without any credentials.
            Please note that allowing anonymous access to an S3 bucket compromises
            security and therefore is unsuitable for most use cases. It can be useful
            for accessing public data sets without requiring AWS credentials.

            If unspecified, then the default list of credential provider classes,
            queried in sequence, is:
            * org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider: looks
            for session login secrets in the Hadoop configuration.
            * org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider:
            Uses the values of fs.s3a.access.key and fs.s3a.secret.key.
            * com.amazonaws.auth.EnvironmentVariableCredentialsProvider: supports
            configuration of AWS access key ID and secret access key in
            environment variables named AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY,
            and AWS_SESSION_TOKEN as documented in the AWS SDK.
            * org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider: picks up
            IAM credentials of any EC2 VM or AWS container in which the process is running.
        </description>
    </property>

    <property>
        <name>fs.defaultFS</name>
        <value>jfs://juicefsminio/hudi-dir</value>
        <description>Optional, you can also specify full path "jfs://myjfs/path-to-dir" with location to use JuiceFS</description>
    </property>
    <property>
        <name>fs.jfs.impl</name>
        <value>io.juicefs.JuiceFileSystem</value>
    </property>
    <property>
        <name>fs.AbstractFileSystem.jfs.impl</name>
        <value>io.juicefs.JuiceFS</value>
    </property>
    <property>
        <name>juicefs.meta</name>
        <value>mysql://root:root@(10.43.211.245:3306)/juicefs</value>
    </property>
    <property>
        <name>juicefs.cache-dir</name>
        <value>/tmp/juicefs-cache-dir</value>
    </property>
    <property>
        <name>juicefs.cache-size</name>
        <value>1024</value>
    </property>
    <property>
        <name>juicefs.access-log</name>
        <value>/tmp/juicefs.access.log</value>
    </property>
    <property>
      <name>juicefs.superuser</name>
      <value>flink</value>
    </property>
</configuration>
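
The core-site ConfigMap mounted by the FlinkDeployments above can be created straight from this file:

kubectl -n flink-jobs create configmap core-site --from-file=core-site.xml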