Unlocking the Power of Flink with Kubernetes Operator: Simplify Data Management for Data as a Service

In my previous blog post, I discussed deploying Flink jobs on Kubernetes without using a Flink operator. While this approach was functional, it required manual intervention to manage the Flink clusters. Today, we take a step further by adding the Flink Kubernetes Operator to orchestrate and manage the lifecycle of the Flink clusters efficiently.

Don’t forget to dive into the previous posts of the Data as a Service series.

Why Flink Kubernetes Operator

The Apache Flink documentation highlights the key aspects of the Flink Kubernetes Operator:

The Flink Kubernetes Operator extends the Kubernetes API with the ability to manage and operate Flink Deployments. The operator features the following amongst others:

  • Deploy and monitor Flink Application and Session deployments
  • Upgrade, suspend and delete deployments
  • Full logging and metrics integration
  • Flexible deployments and native integration with Kubernetes tooling
  • Flink Job Autoscaler

If you have previously managed on-premises, ad-hoc, or ephemeral Flink clusters, where you were responsible for all deployment and management tasks, then you understand the effort required to achieve such a solution without the capabilities provided by the Flink Operator.

Data as a Service: Integrating Apache Flink

In this hands-on guide, I will explain how to integrate the Flink Kubernetes Operator and use it to manage Flink clusters. The focus here is a functional approach for executing Flink jobs, while future blog posts will show how we can leverage Apache Flink in concrete scenarios.

Versions

  • Minikube: 1.32.0
  • Kubernetes: 1.28.3
  • Flink Operator: 1.7.0
  • Apache Flink: 1.17.0
  • MinIO: RELEASE.2024-01-13T07-53-03Z
  • Helm: 3.13.3

Prerequisites

You need to start minikube to have a local Kubernetes setup. As a prerequisite, install minikube and make sure a Docker container or virtual machine environment is available.

Execute:

minikube start
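
Before moving on, it is worth confirming that the cluster is actually up. A quick sanity check (output will vary with your environment):

```bash
minikube status
kubectl get nodes
```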

Hands-on guide: Deploying MinIO

1 — Create a dedicated namespace on Kubernetes

kubectl create ns minio-dev

2 — Create the deployment of MinIO

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: minio
  name: deploy-minio
  namespace: minio-dev
spec:
  replicas: 1
  selector:
    matchLabels:
      app: minio
  template:
    metadata:
      labels:
        app: minio
    spec:
      volumes:
        - name: localvolume
          hostPath:
            path: /mnt/disk1/data
            type: DirectoryOrCreate
      containers:
        - name: minio
          image: quay.io/minio/minio:RELEASE.2024-01-13T07-53-03Z
          command:
            - /bin/bash
            - -c
          args:
            - minio server /data --address :9000 --console-address :9090
          volumeMounts:
            - mountPath: /data
              name: localvolume
```

This deployment will create one instance of MinIO (replicas: 1), exposing the Web UI on port 9090, while the REST API will be available on port 9000.

Execute:

kubectl -n minio-dev create -f <yaml_file_containing_the_example_above>
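
Before exposing anything, you may want to confirm that MinIO is running. A quick check, using the deploy-minio name and app: minio label from the manifest above:

```bash
kubectl -n minio-dev rollout status deployment/deploy-minio
kubectl -n minio-dev get pods -l app=minio
```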

3 — Expose the WebConsole and the REST API of MinIO internally

```yaml
apiVersion: v1
kind: Service
metadata:
  labels:
    app: minio
  name: minio-svc
  namespace: minio-dev
spec:
  type: ClusterIP
  selector:
    app: minio
  ports:
    - name: webconsole
      port: 9090
      protocol: TCP
      targetPort: 9090
    - name: api
      port: 9000
      protocol: TCP
      targetPort: 9000
```

In Kubernetes, there are several ways to expose a service; this guide uses the ClusterIP method, which exposes MinIO on an internal IP address that is only reachable from within the Kubernetes cluster.

Execute:

kubectl -n minio-dev create -f <yaml_file_containing_the_example_above>
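
To verify the service, and as a quick alternative to the Ingress created in the next step, you can also port-forward the console to your machine (a temporary debugging approach, not a production setup):

```bash
kubectl -n minio-dev get svc minio-svc
kubectl -n minio-dev port-forward svc/minio-svc 9090:9090
# The web console is now reachable at http://localhost:9090
```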

4 — External access to MinIO WebConsole

```yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: minio-ingress
  namespace: minio-dev
spec:
  rules:
    - http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: minio-svc
                port:
                  number: 9090
```

To enable external access to the MinIO WebConsole, we will use an Ingress. Other approaches exist, but we chose this one due to the intrinsic advantages of using an Ingress.

As a prerequisite, the Ingress controller must be enabled; execute the following command:

minikube addons enable ingress

Execute the following command to verify that the NGINX Ingress controller is running:

kubectl get pods -n ingress-nginx

Execute:

kubectl -n minio-dev create -f <yaml_file_containing_the_example_above>
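
Once created, the Ingress should report an address; on minikube this is usually the same value returned by minikube ip, which is the IP used in the next step:

```bash
kubectl -n minio-dev get ingress minio-ingress
minikube ip
```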

5 — Accessing the MinIO WebConsole

![[image-20240725165530338.png]]

Let’s take a look with k9s:

![[image-20240725165659408.png]]

![[image-20240725170205247.png]]

Open your web browser and enter the provided address, which in this example is 192.168.49.2. The MinIO user interface then becomes accessible. The default credentials are user: minioadmin and password: minioadmin. If you intend to modify these credentials, which is highly recommended, you must set the corresponding environment variables.
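
As a sketch of what that looks like, the container section of the deployment could be extended as follows. MINIO_ROOT_USER and MINIO_ROOT_PASSWORD are the variables MinIO reads; the values below are placeholders you should replace, ideally sourced from a Kubernetes Secret:

```yaml
containers:
  - name: minio
    image: quay.io/minio/minio:RELEASE.2024-01-13T07-53-03Z
    env:
      - name: MINIO_ROOT_USER
        value: my-admin            # placeholder
      - name: MINIO_ROOT_PASSWORD
        value: my-strong-password  # placeholder; prefer a Secret reference
```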

You might wonder about the choice of MinIO in our configuration. In this setup, MinIO serves as the external storage solution for saving checkpoints and savepoints, and for managing the information related to the high-availability setup. As always, you have other options to store this information, such as AWS S3, GCS, or Azure Blob Storage. Note that the Ingress IP above is only reachable from within the minikube machine; to reach MinIO from outside, you can run minikube tunnel and then access the node via localhost, as shown in the image.

Bonus

If you access the MinIO WebConsole through the Ingress, file uploads are limited to 1 MB. To overcome this restriction, modify the Ingress rule by adding the annotation nginx.ingress.kubernetes.io/proxy-body-size: 500m, which allows uploads with a maximum file size of 500 MB:

```yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: minio-ingress
  namespace: minio-dev
  annotations:
    nginx.ingress.kubernetes.io/proxy-body-size: 500m
spec:
  rules:
    - http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: minio-svc
                port:
                  number: 9090
```

The fun part starts here :)

1 — Install Flink Kubernetes Operator

To install the operator we will use Helm, an open-source package manager for Kubernetes. If you don’t have it, follow this guide to install Helm.
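
If Helm is already installed, you can confirm the version in use (3.13.3 in this guide):

```bash
helm version --short
```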

Once you have Helm installed, you need to add the Flink Operator chart repository:

```bash
helm repo add flink-kubernetes-operator-1.7.0 https://archive.apache.org/dist/flink/flink-kubernetes-operator-1.7.0/
helm repo list

NAME                            URL
...
flink-kubernetes-operator-1.7.0 https://archive.apache.org/dist/flink/flink-kubernetes-operator-1.7.0/
...
```

Useful commands:

  • Once the repository is added, you can list the charts available to install:

helm search repo flink-kubernetes-operator-1.7.0

  • List all possible value settings for the Flink Operator chart:

helm show values flink-kubernetes-operator-1.7.0/flink-kubernetes-operator

You can explore the settings to better understand what they mean and the purpose of each config.

In our setup, some values will be overridden:

```yaml
operatorServiceAccount:
  create: true
  annotations: {}
  name: "flink-operator"

watchNamespaces:
  - "flink-jobs"
```

The Flink Operator listens for FlinkDeployments only in the flink-jobs namespace, while the operator itself is installed in the flink-operator namespace. This separation isolates resources, making it possible to apply restrictions per namespace and prevent future problems within the cluster. Keep in mind that deploying Flink jobs across multiple namespaces is also feasible, but in such a scenario it’s necessary to update the watchNamespaces attribute, as sketched below.
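
A hypothetical variant watching a second namespace would only need the list extended (the extra namespace name here is illustrative):

```yaml
watchNamespaces:
  - "flink-jobs"
  - "flink-jobs-staging"
```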

After storing the overridden values in a file named values.yaml, you can finally create the Flink Operator by executing the following commands 🎉:

```bash
kubectl create ns flink-operator
kubectl create ns flink-jobs
helm -n flink-operator install -f values.yaml flink-kubernetes-operator flink-kubernetes-operator-1.7.0/flink-kubernetes-operator --set webhook.create=false
```

After executing the commands above, you will see the Flink Operator running successfully:

```bash
kubectl -n flink-operator get all

# output
NAME                                             READY   STATUS    RESTARTS        AGE
pod/flink-kubernetes-operator-5c46f67cd7-vrdpn   1/1     Running   6 (7h40m ago)   3d5h

NAME                                        READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/flink-kubernetes-operator   1/1     1            1           3d5h

NAME                                                   DESIRED   CURRENT   READY   AGE
replicaset.apps/flink-kubernetes-operator-5c46f67cd7   1         1         1       3d5h
```

2 — Deploy a Flink Job

To deploy a simple Flink job using the Flink Operator, create a definition similar to the following. Here the checkpoints and savepoints are stored in MinIO through the S3 API; if you relied on local disk storage instead, keep in mind that with an ephemeral disk all state would be lost.

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: flink-jobs
  name: basic-example-1
spec:
  image: flink-1:1.17
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
    state.savepoints.dir: s3://test/savepoints
    state.checkpoints.dir: s3://test/checkpoints
    s3.endpoint: http://minio-svc.minio-dev:9000
    s3.path.style.access: "true"
    s3.access-key: minioadmin
    s3.secret-key: minioadmin
  ingress:
    template: "/{{namespace}}/{{name}}(/|$)(.*)"
    className: "nginx"
    annotations:
      nginx.ingress.kubernetes.io/rewrite-target: "/$2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "1000m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1000m"
      cpu: 0.5
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 1
    state: running
```

Be aware that a memory request of 100m cannot start the job: due to Flink’s memory calculation, at least about 1 GB of memory is required, which is why the resources above use 1000m. This definition will create a single JobManager, a TaskManager with one slot, and a Flink job with a parallelism of 1. Thanks to the MinIO instance created earlier, it’s possible to store the checkpoints and savepoints in MinIO.

The ingress section above automates the creation of the Ingress that allows external access to the UI. To access the UI, open the browser and navigate to http://&lt;ip&gt;/flink-jobs/basic-example-1/.
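
You can also follow the deployment from the command line; the operator exposes the job state through the FlinkDeployment resource:

```bash
kubectl -n flink-jobs get flinkdeployment basic-example-1
kubectl -n flink-jobs get pods
kubectl -n flink-jobs get ingress
```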

Attention

Starting in Version 0.22.0, ingress definitions using the annotation nginx.ingress.kubernetes.io/rewrite-target are not backwards compatible with previous versions. In Version 0.22.0 and beyond, any substrings within the request URI that need to be passed to the rewritten path must explicitly be defined in a capture group.

The documentation gives a very clear warning ⚠️: starting with v0.22.0, older ingress definitions are no longer compatible. After checking my nginx-ingress-controller version, the problem indeed turned out to come from here.
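
One way to check which controller version is running (the deployment name below comes from the minikube ingress addon and may differ in other setups):

```bash
kubectl -n ingress-nginx get deployment ingress-nginx-controller \
  -o jsonpath='{.spec.template.spec.containers[0].image}'
```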

Note

Captured groups are saved in numbered placeholders, chronologically, in the form $1, $2 … $n. These placeholders can be used as parameters in the rewrite-target annotation.

Access through the Ingress also ran into a problem: Flink could not be reached. The cause was the rewrite misbehaving; the Ingress controller had been upgraded and the previous rewrite strategy no longer takes effect. The solution is the following (the snippet uses the networking.k8s.io/v1 Ingress API, since extensions/v1beta1 no longer exists on recent Kubernetes versions):

```bash
$ echo '
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  annotations:
    nginx.ingress.kubernetes.io/rewrite-target: /$2
  name: rewrite
  namespace: default
spec:
  ingressClassName: nginx
  rules:
  - host: rewrite.bar.com
    http:
      paths:
      - path: /something(/|$)(.*)
        pathType: ImplementationSpecific
        backend:
          service:
            name: http-svc
            port:
              number: 80
' | kubectl create -f -
```

Note: to know the IP address, double-check the section Accessing the MinIO WebConsole.

Accessing via localhost works because minikube tunnel has been enabled.
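
The tunnel runs in its own terminal and keeps routing traffic for as long as it stays open (it may prompt for sudo):

```bash
minikube tunnel
```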

Flink UI

Congratulations! 🎉🥳 You now have a Flink job running through the Flink Operator. By using a similar setup, you unlock numerous advantages, ensuring a streamlined experience for your infrastructure clients and significantly reducing their workload.

Throughout this guide, you were also able to understand the internal concepts of Flink and Kubernetes.

Bonus 1

If you have an eagle eye, you noticed that the image used is flink-1:1.17. This is because the S3 scheme is used for storing checkpoints and savepoints, and to make use of the S3 scheme it’s required to include the necessary plugin, flink-s3-fs-hadoop. In this scenario, a custom Flink image was used to enable the mandatory plugin.

```dockerfile
FROM flink:1.17

# Move the bundled S3 filesystem plugin into the plugins directory,
# where Flink picks it up at startup
RUN mkdir -p /opt/flink/plugins/flink-s3-fs-hadoop/ && \
    mv -v /opt/flink/opt/flink-s3-fs-hadoop-*.jar /opt/flink/plugins/flink-s3-fs-hadoop/ && \
    chown -R flink:flink /opt/flink/plugins/flink-s3-fs-hadoop

USER flink

CMD exec /usr/share/flink/bin/flink-console.sh ${COMPONENT} ${ARGS}
```
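
For the cluster to be able to pull flink-1:1.17, the image has to be available to minikube. One way, assuming the Dockerfile above sits in the current directory, is to build it directly against minikube’s Docker daemon:

```bash
eval $(minikube docker-env)
docker build -t flink-1:1.17 .
```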

The available approaches are explained in the documentation.

Bonus 2

If you don’t need to use a custom Flink image, it’s possible to enable the plugins using an environment variable, ENABLE_BUILT_IN_PLUGINS:

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: flink-jobs
  name: basic-example
spec:
  image: flink:1.17
  flinkVersion: v1_17
  podTemplate:
    apiVersion: v1
    kind: Pod
    metadata:
      name: pod-template
    spec:
      containers:
        - name: flink-main-container
          env:
            - name: ENABLE_BUILT_IN_PLUGINS
              value: flink-s3-fs-hadoop-1.17.2.jar
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
    state.savepoints.dir: s3://test/savepoints
    state.checkpoints.dir: s3://test/checkpoints
    s3.endpoint: http://minio-svc.minio-dev:9000
    s3.path.style.access: "true"
    s3.access-key: minioadmin
    s3.secret-key: minioadmin
  ingress:
    template: "/{{namespace}}/{{name}}(/|$)(.*)"
    className: "nginx"
    annotations:
      nginx.ingress.kubernetes.io/rewrite-target: "/$2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "1000m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1000m"
      cpu: 0.5
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 1
    upgradeMode: savepoint
    state: running
```

Bonus 3

In order to simplify the management of the Kubernetes cluster and make it easier to understand what is going on, consider installing the Kubernetes Dashboard; there is also a guide describing the steps to create a user to access the Kubernetes Dashboard.
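
On minikube, the Dashboard is also available as an addon, which spares the manual installation (the command opens the UI in your browser):

```bash
minikube dashboard
```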

![[image-20240726110647464.png]]

Kubernetes Dashboard

In the next post of the series, I will delve into common issues and tips for optimizing your Flink applications and deployments.
