Note: this builds a dual-master Kubernetes cluster fronted by haproxy + keepalived.
![[image-20240829110902257.png]]
![[image-20240830094012202.png]]
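The inventory itself only appears in the screenshots above. For reference, this is roughly how a two-master Kubespray inventory could be prepared; a sketch only, the hostnames and node IPs below are placeholders rather than values from this post (only the VIP 10.7.10.196 and the registry 10.7.20.12:5000 are real):

```bash
# Sketch of the standard Kubespray workflow; all hostnames and IPs are placeholders.
git clone https://github.com/kubernetes-sigs/kubespray.git && cd kubespray
cp -rfp inventory/sample inventory/mycluster
cat > inventory/mycluster/inventory.ini <<'EOF'
[kube_control_plane]
master1 ansible_host=10.7.10.191
master2 ansible_host=10.7.10.192

# etcd needs an odd member count, so one worker joins it alongside the two masters
[etcd]
master1
master2
node1

[kube_node]
node1 ansible_host=10.7.10.193

[k8s_cluster:children]
kube_control_plane
kube_node
EOF
```

haproxy and keepalived run on the two masters and expose both API servers behind the VIP 10.7.10.196:16443, which is what the group variables below point at.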
all.yml
```yaml
---
## Directory where the binaries will be installed
bin_dir: /usr/local/bin
## The access_ip variable is used to define how other nodes should access
## the node. This is used in flannel to allow other flannel nodes to see
## this node for example. The access_ip is really useful AWS and Google
## environments where the nodes are accessed remotely by the "public" ip,
## but don't know about that address themselves.
# access_ip: 1.1.1.1
## External LB example config
apiserver_loadbalancer_domain_name: "master.k8s.io"
loadbalancer_apiserver:
  address: 10.7.10.196
  port: 16443
## Internal loadbalancers for apiservers
# loadbalancer_apiserver_localhost: true
# valid options are "nginx" or "haproxy"
loadbalancer_apiserver_type: haproxy # valid values "nginx" or "haproxy"
## Local loadbalancer should use this port
## And must be set port 6443
loadbalancer_apiserver_port: 16443
## If loadbalancer_apiserver_healthcheck_port variable defined, enables proxy liveness check for nginx.
loadbalancer_apiserver_healthcheck_port: 8081
### OTHER OPTIONAL VARIABLES
## By default, Kubespray collects nameservers on the host. It then adds the previously collected nameservers in nameserverentries.
## If true, Kubespray does not include host nameservers in nameserverentries in dns_late stage. However, It uses the nameserver to make sure cluster installed safely in dns_early stage.
## Use this option with caution, you may need to define your dns servers. Otherwise, the outbound queries such as www.google.com may fail.
# disable_host_nameservers: false
## Upstream dns servers
# upstream_dns_servers:
# - 8.8.8.8
# - 8.8.4.4
## There are some changes specific to the cloud providers
## for instance we need to encapsulate packets with some network plugins
## If set the possible values are either 'gce', 'aws', 'azure', 'openstack', 'vsphere', 'oci', or 'external'
## When openstack is used make sure to source in the openstack credentials
## like you would do when using openstack-client before starting the playbook.
# cloud_provider:
## When cloud_provider is set to 'external', you can set the cloud controller to deploy
## Supported cloud controllers are: 'openstack', 'vsphere', 'huaweicloud' and 'hcloud'
## When openstack or vsphere are used make sure to source in the required fields
# external_cloud_provider:
## Set these proxy values in order to update package manager and docker daemon to use proxies and custom CA for https_proxy if needed
# http_proxy: ""
# https_proxy: ""
# https_proxy_cert_file: ""
## Refer to roles/kubespray-defaults/defaults/main/main.yml before modifying no_proxy
# no_proxy: ""
## Some problems may occur when downloading files over https proxy due to ansible bug
## https://github.com/ansible/ansible/issues/32750. Set this variable to False to disable
## SSL validation of get_url module. Note that kubespray will still be performing checksum validation.
# download_validate_certs: False
## If you need exclude all cluster nodes from proxy and other resources, add other resources here.
# additional_no_proxy: ""
## If you need to disable proxying of os package repositories but are still behind an http_proxy set
## skip_http_proxy_on_os_packages to true
## This will cause kubespray not to set proxy environment in /etc/yum.conf for centos and in /etc/apt/apt.conf for debian/ubuntu
## Special information for debian/ubuntu - you have to set the no_proxy variable, then apt package will install from your source of wish
# skip_http_proxy_on_os_packages: false
## Since workers are included in the no_proxy variable by default, docker engine will be restarted on all nodes (all
## pods will restart) when adding or removing workers. To override this behaviour by only including master nodes in the
## no_proxy variable, set below to true:
no_proxy_exclude_workers: false
## Certificate Management
## This setting determines whether certs are generated via scripts.
## Chose 'none' if you provide your own certificates.
## Option is "script", "none"
# cert_management: script
## Set to true to allow pre-checks to fail and continue deployment
# ignore_assert_errors: false
## The read-only port for the Kubelet to serve on with no authentication/authorization. Uncomment to enable.
# kube_read_only_port: 10255
## Set true to download and cache container
# download_container: true
## Deploy container engine
# Set false if you want to deploy container engine manually.
# deploy_container_engine: true
## Red Hat Enterprise Linux subscription registration
## Add either RHEL subscription Username/Password or Organization ID/Activation Key combination
## Update RHEL subscription purpose usage, role and SLA if necessary
# rh_subscription_username: ""
# rh_subscription_password: ""
# rh_subscription_org_id: ""
# rh_subscription_activation_key: ""
# rh_subscription_usage: "Development"
# rh_subscription_role: "Red Hat Enterprise Server"
# rh_subscription_sla: "Self-Support"
## Check if access_ip responds to ping. Set false if your firewall blocks ICMP.
# ping_access_ip: true
# sysctl_file_path to add sysctl conf to
# sysctl_file_path: "/etc/sysctl.d/99-sysctl.conf"
## Variables for webhook token auth https://kubernetes.io/docs/reference/access-authn-authz/authentication/#webhook-token-authentication
kube_webhook_token_auth: false
kube_webhook_token_auth_url_skip_tls_verify: false
# kube_webhook_token_auth_url: https://...
## base64-encoded string of the webhook's CA certificate
# kube_webhook_token_auth_ca_data: "LS0t..."
## NTP Settings
# Start the ntpd or chrony service and enable it at system boot.
ntp_enabled: false
ntp_manage_config: false
ntp_servers:
- "0.pool.ntp.org iburst"
- "1.pool.ntp.org iburst"
- "2.pool.ntp.org iburst"
- "3.pool.ntp.org iburst"
## Used to control no_log attribute
unsafe_show_logs: false
## If enabled it will allow kubespray to attempt setup even if the distribution is not supported. For unsupported distributions this can lead to unexpected failures in some cases.
allow_unsupported_distribution_setup: false
```
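With apiserver_loadbalancer_domain_name and loadbalancer_apiserver set as above, Kubespray points every kubeconfig at the haproxy+keepalived VIP instead of a single master. A quick sanity check, assuming master.k8s.io resolves to the VIP (e.g. via /etc/hosts):

```bash
# The VIP should answer on the balanced port 16443 as long as one haproxy instance is alive.
ping -c 1 10.7.10.196
nc -zv 10.7.10.196 16443
# After deployment, the admin kubeconfig on a master should reference the VIP:
grep 'server:' /etc/kubernetes/admin.conf   # expect https://master.k8s.io:16443
```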
containerd.yaml
```yaml
---
# Please see roles/container-engine/containerd/defaults/main.yml for more configuration options
# containerd_storage_dir: "/var/lib/containerd"
# containerd_state_dir: "/run/containerd"
# containerd_oom_score: 0
# containerd_default_runtime: "runc"
# containerd_snapshotter: "native"
# containerd_runc_runtime:
# name: runc
# type: "io.containerd.runc.v2"
# engine: ""
# root: ""
# containerd_additional_runtimes:
# Example for Kata Containers as additional runtime:
# - name: kata
# type: "io.containerd.kata.v2"
# engine: ""
# root: ""
# containerd_grpc_max_recv_message_size: 16777216
# containerd_grpc_max_send_message_size: 16777216
# Containerd debug socket location: unix or tcp format
# containerd_debug_address: ""
# Containerd log level
# containerd_debug_level: "info"
# Containerd logs format, supported values: text, json
# containerd_debug_format: ""
# Containerd debug socket UID
# containerd_debug_uid: 0
# Containerd debug socket GID
# containerd_debug_gid: 0
# containerd_metrics_address: ""
# containerd_metrics_grpc_histogram: false
# Registries defined within containerd.
containerd_registries_mirrors:
  - prefix: 10.7.20.12:5000
    mirrors:
      - host: http://10.7.20.12:5000
        capabilities: ["pull", "resolve"]
        skip_verify: true
# containerd_max_container_log_line_size: -1
# containerd_registry_auth:
# - registry: 10.0.0.2:5000
# username: user
# password: pass
```
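The containerd_registries_mirrors entry above should make Kubespray write a hosts.toml for the insecure registry on every node. A quick check on any node (the image tag is just one used later in this post):

```bash
# Inspect the generated mirror config and try a pull through it.
cat /etc/containerd/certs.d/10.7.20.12:5000/hosts.toml
crictl pull 10.7.20.12:5000/flink:1.17
```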
Add the SSH public key to all hosts:
root@debian:/root/kubespray git:(master*) # ansible -i inventory/mycluster/inventory.ini all -m authorized_key -a "user={{ ansible_user }} key='{{ lookup('file', '{{ ssh_cert_path }}') }}'" -e ssh_cert_path=./.ssh/id_rsa.pub -e ansible_ssh_pass=abc.123
The installation command is:
```bash
ansible-playbook -i inventory/mycluster/inventory.ini -b cluster.yml -v
# uninstall / reset the cluster
ansible-playbook -i inventory/mycluster/inventory.ini -b reset.yml -v
```
After installation, check the cluster with k9s:
![[image-20240830093852966.png]]
![[image-20240829111032761.png]]
Install ingress-nginx:
```bash
kubectl apply -f deploy.yaml
namespace/ingress-nginx created
serviceaccount/ingress-nginx created
serviceaccount/ingress-nginx-admission created
role.rbac.authorization.k8s.io/ingress-nginx created
role.rbac.authorization.k8s.io/ingress-nginx-admission created
clusterrole.rbac.authorization.k8s.io/ingress-nginx created
clusterrole.rbac.authorization.k8s.io/ingress-nginx-admission created
rolebinding.rbac.authorization.k8s.io/ingress-nginx created
rolebinding.rbac.authorization.k8s.io/ingress-nginx-admission created
clusterrolebinding.rbac.authorization.k8s.io/ingress-nginx created
clusterrolebinding.rbac.authorization.k8s.io/ingress-nginx-admission created
configmap/ingress-nginx-controller created
service/ingress-nginx-controller created
service/ingress-nginx-controller-admission created
deployment.apps/ingress-nginx-controller created
job.batch/ingress-nginx-admission-create created
job.batch/ingress-nginx-admission-patch created
ingressclass.networking.k8s.io/nginx created
validatingwebhookconfiguration.admissionregistration.k8s.io/ingress-nginx-admission created
```
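deploy.yaml itself is not included here (presumably the stock ingress-nginx manifest). Before wiring the Flink Ingress onto it, confirm the controller is running:

```bash
kubectl -n ingress-nginx get pods
kubectl -n ingress-nginx get svc ingress-nginx-controller
# If the controller Service is of type LoadBalancer, EXTERNAL-IP stays <pending>
# until MetalLB (set up later in this post) hands out an address.
```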
Install the NFS StorageClass:
```bash
kubectl apply -f nfs-sc.yaml
storageclass.storage.k8s.io/nfs-storage created
deployment.apps/nfs-client-provisioner created
serviceaccount/nfs-client-provisioner created
clusterrole.rbac.authorization.k8s.io/nfs-client-provisioner-runner created
clusterrolebinding.rbac.authorization.k8s.io/run-nfs-client-provisioner created
role.rbac.authorization.k8s.io/leader-locking-nfs-client-provisioner created
rolebinding.rbac.authorization.k8s.io/leader-locking-nfs-client-provisioner created
```
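nfs-sc.yaml is not shown, but the objects it created are listed above. A quick way to confirm that dynamic provisioning actually works (the PVC name here is arbitrary):

```bash
kubectl get storageclass nfs-storage
cat <<'EOF' | kubectl apply -f -
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: nfs-test-pvc
spec:
  accessModes: ["ReadWriteMany"]
  storageClassName: nfs-storage
  resources:
    requests:
      storage: 1Gi
EOF
kubectl get pvc nfs-test-pvc   # should become Bound if the provisioner can reach the NFS server
kubectl delete pvc nfs-test-pvc
```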
Install flink-kubernetes-operator:
```bash
kubectl apply -f cert-manager.yaml
helm install flink-kubernetes-operator --namespace flink --create-namespace -f values-test.yaml flink-kubernetes-operator/
```
The output looks like this:
![[image-20240830100013520.png]]
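To confirm that cert-manager and the operator release came up (cert-manager.yaml normally installs into the cert-manager namespace):

```bash
kubectl -n cert-manager get pods
helm list -n flink
kubectl -n flink get deploy,pods
```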
values-test.yaml
```yaml
cat values-test.yaml
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
---
# List of kubernetes namespaces to watch for FlinkDeployment changes, empty means all namespaces.
# When enabled RBAC is only created for said namespaces, otherwise it is done for the cluster scope.
# watchNamespaces: ["flink"]
image:
  repository: 10.7.20.12:5000/apache/flink-kubernetes-operator
  pullPolicy: IfNotPresent
  tag: "91d67d9"
  # If image digest is set then it takes precedence and the image tag will be ignored
  # digest: ""
imagePullSecrets: []
# Replicas must be 1 unless operator leader election is configured
replicas: 1
# Strategy type must be Recreate unless leader election is configured
strategy:
  type: Recreate
rbac:
  create: true
  # kubernetes.rest-service.exposed.type: NodePort requires
  # list permission for nodes at the cluster scope.
  # Set create to true if you are using NodePort type.
  nodesRule:
    create: false
  operatorRole:
    create: true
    name: "flink-operator"
  operatorRoleBinding:
    create: true
    name: "flink-operator-role-binding"
  jobRole:
    create: true
    name: "flink"
  jobRoleBinding:
    create: true
    name: "flink-role-binding"
operatorPod:
  priorityClassName: null
  annotations: {}
  labels: {}
  env:
  # - name: ""
  #   value: ""
  # - name: ""
  #   valueFrom:
  #     configMapKeyRef:
  #       name: ""
  #       key: ""
  # dnsPolicy: ""
  # dnsConfig: {}
  # Node labels for operator pod assignment
  # https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/
  envFrom:
  # - configMapRef:
  #     name: ""
  nodeSelector: {}
  # Node tolerations for operator pod assignment
  # https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/
  tolerations: []
  # Topology spread constrains
  # https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/
  topologySpreadConstraints: []
  resources: {}
  # resources:
  #   limits:
  #     cpu: "250m"
  #     memory: "512Mi"
  #   requests:
  #     cpu: "250m"
  #     memory: "512Mi"
  webhook:
    resources: {}
operatorServiceAccount:
  create: true
  annotations: {}
  name: "flink-operator"
jobServiceAccount:
  create: true
  annotations:
    "helm.sh/resource-policy": keep
  name: "flink"
operatorVolumeMounts:
  create: false
  data:
    - name: flink-artifacts
      mountPath: /opt/flink/artifacts
operatorVolumes:
  create: false
  data:
    - name: flink-artifacts
      hostPath:
        path: /tmp/flink/artifacts
        type: DirectoryOrCreate
    # - name: flink-artifacts
    #   persistentVolumeClaim:
    #     claimName: flink-artifacts
podSecurityContext:
  runAsUser: 9999
  runAsGroup: 9999
  # fsGroup: 9999
operatorSecurityContext: {}
webhookSecurityContext: {}
webhook:
  create: true
  # validator:
  #   create: true
  # mutator:
  #   create: true
  keystore:
    useDefaultPassword: true
    # passwordSecretRef:
    #   name: jks-password-secret
    #   key: password-key
defaultConfiguration:
  # If set to true, creates ConfigMaps/VolumeMounts. If set to false, no configuration will be created.
  # All below fields will be ignored if create is set to false.
  create: true
  # If set to true,
  #   (1) loads the built-in default configuration
  #   (2) appends the below flink-conf and logging configuration overrides
  # If set to false, loads just the overrides as in (2).
  # This option has not effect, if create is equal to false.
  append: true
  flink-conf.yaml: |+
    # Flink Config Overrides
    kubernetes.operator.metrics.reporter.slf4j.factory.class: org.apache.flink.metrics.slf4j.Slf4jReporterFactory
    kubernetes.operator.metrics.reporter.slf4j.interval: 5 MINUTE
    kubernetes.operator.reconcile.interval: 15 s
    kubernetes.operator.observer.progress-check.interval: 5 s
  log4j-operator.properties: |+
    # Flink Operator Logging Overrides
    # rootLogger.level = DEBUG
    # logger.operator.name= org.apache.flink.kubernetes.operator
    # logger.operator.level = DEBUG
  log4j-console.properties: |+
    # Flink Deployment Logging Overrides
    # rootLogger.level = DEBUG
# (Optional) Exposes metrics port on the container if defined
metrics:
  port:
nameOverride: ""
fullnameOverride: ""
# Set the jvm start up options for webhook and operator
jvmArgs:
  webhook: ""
  operator: ""
# Configure health probes for the operator
operatorHealth:
  port: 8085
  livenessProbe:
    periodSeconds: 10
    initialDelaySeconds: 30
  startupProbe:
    failureThreshold: 30
    periodSeconds: 10
# Set postStart hook of the main container
postStart: {}
# Configuration for tls
tls:
  create: false
  secretName: flink-operator-cert
  secretKeyRef:
    name: operator-certificate-password
    key: password
```
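If values-test.yaml is changed later (for example to pull a different operator tag from the private registry), re-apply it against the same chart directory:

```bash
helm upgrade flink-kubernetes-operator flink-kubernetes-operator/ \
  --namespace flink -f values-test.yaml
```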
![[image-20240829111528252.png]]
Create the flink-session-job:
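The session manifest that was applied is not included; the sketch below is modelled on the operator's basic-session-deployment-only example, and the image, Flink version and resource sizes are assumptions rather than the exact values used here:

```bash
cat <<'EOF' | kubectl -n flink apply -f -
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-session-deployment-only-example
spec:
  image: 10.7.20.12:5000/flink:1.17   # assumption: same private-registry Flink image as later examples
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
EOF
```

After applying it, the services in the flink namespace look like this; the separate *-rest-nodeport Service (8081 exposed on 30081) appears to have been created on top of the deployment, since the operator's own rest Service is ClusterIP: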
```
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
basic-session-deployment-only-example ClusterIP None <none> 6123/TCP,6124/TCP 7s
basic-session-deployment-only-example-rest ClusterIP 192.166.51.229 <none> 8081/TCP 7s
basic-session-deployment-only-example-rest-nodeport NodePort 192.166.15.121 <none> 8081:30081/TCP 20s
flink-operator-webhook-service ClusterIP 192.166.25.15 <none> 443/TCP 2m32s
```
Now the Flink UI can be reached at the virtual IP on port 30081:
![[image-20240829111747923.png]]
Note that 10.7.10.196 is the virtual IP.
Next, MetalLB is used to hand out LoadBalancer IPs.
Preparation
If you’re using kube-proxy in IPVS mode, since Kubernetes v1.14.2 you have to enable strict ARP mode.
Note, you don’t need this if you’re using kube-router as service-proxy because it is enabling strict ARP by default.
You can achieve this by editing kube-proxy config in current cluster:
```bash
kubectl edit configmap -n kube-system kube-proxy
```
and set:
```yaml
apiVersion: kubeproxy.config.k8s.io/v1alpha1
kind: KubeProxyConfiguration
mode: 'ipvs'
ipvs:
  strictARP: true
```
You can also add this configuration snippet to your kubeadm-config, just append it with --- after the main configuration.
If you are trying to automate this change, these shell snippets may help you:
```bash
# see what changes would be made, returns nonzero returncode if different
kubectl get configmap kube-proxy -n kube-system -o yaml | \
sed -e "s/strictARP: false/strictARP: true/" | \
kubectl diff -f - -n kube-system
# actually apply the changes, returns nonzero returncode on errors only
kubectl get configmap kube-proxy -n kube-system -o yaml | \
sed -e "s/strictARP: false/strictARP: true/" | \
kubectl apply -f - -n kube-system
```
Installation by manifest
To install MetalLB, apply the manifest:
```bash
kubectl apply -f https://raw.githubusercontent.com/metallb/metallb/v0.14.8/config/manifests/metallb-native.yaml
```
Note
If you want to deploy MetalLB using the FRR mode, apply the manifests:
```bash
kubectl apply -f https://raw.githubusercontent.com/metallb/metallb/v0.14.8/config/manifests/metallb-frr.yaml
```
If you want to deploy MetalLB using the experimental FRR-K8s mode:
```bash
kubectl apply -f https://raw.githubusercontent.com/metallb/metallb/v0.14.8/config/manifests/metallb-frr-k8s.yaml
```
Please do note that these manifests deploy MetalLB from the main development branch. We highly encourage cloud operators to deploy a stable released version of MetalLB on production environments!
This will deploy MetalLB to your cluster, under the metallb-system namespace. The components in the manifest are:
- The metallb-system/controller deployment. This is the cluster-wide controller that handles IP address assignments.
- The metallb-system/speaker daemonset. This is the component that speaks the protocol(s) of your choice to make the services reachable.
- Service accounts for the controller and speaker, along with the RBAC permissions that the components need to function.
The installation manifest does not include a configuration file. MetalLB’s components will still start, but will remain idle until you start deploying resources.
There are also two all-in-one manifests to allow the integration with prometheus. They assume that the prometheus operator is deployed in the monitoring namespace using the prometheus-k8s service account. It is suggested to use either the charts or kustomize if they need to be changed.
![image.png](https://cdn.jsdelivr.net/gh/huangxiaofeng10047/blogimage@main/img/20240829143119.png)
![[image-20240829143119999.png]]
The MetalLB address pool must not contain the node IPs, or problems will occur; after I configured the pool with node IPs, the cluster network went down.
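MetalLB stays idle until it is given an address pool. A minimal L2 configuration sketch follows; the address range below is a placeholder, and per the note above you should pick free addresses in the node subnet that are neither node IPs nor the keepalived VIP:

```bash
cat <<'EOF' | kubectl apply -f -
apiVersion: metallb.io/v1beta1
kind: IPAddressPool
metadata:
  name: first-pool
  namespace: metallb-system
spec:
  addresses:
    - 10.7.10.200-10.7.10.210   # placeholder range, must be unused on your network
---
apiVersion: metallb.io/v1beta1
kind: L2Advertisement
metadata:
  name: l2-adv
  namespace: metallb-system
spec:
  ipAddressPools:
    - first-pool
EOF
```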
Pull the image:
```bash
ctr images pull --hosts-dir "/etc/containerd/certs.d" -k 10.7.20.12:5000/calico/node:v3.28.1
```
Accessing nginx succeeds:
![image.png](https://cdn.jsdelivr.net/gh/huangxiaofeng10047/blogimage@main/img/20240830105359.png)
![[image-20240830105359902.png]]
Check the EXTERNAL-IP that MetalLB assigned to the ingress-nginx controller Service used by flink:
![image.png](https://cdn.jsdelivr.net/gh/huangxiaofeng10047/blogimage@main/img/20240830105659.png)
![[image-20240830105659861.png]]
Configure the IP as follows:
![image.png](https://cdn.jsdelivr.net/gh/huangxiaofeng10047/blogimage@main/img/20240830105715.png)
![[image-20240830105715084.png]]
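Since the Ingress template uses the hostname flink.k8s.io, map that name to the EXTERNAL-IP shown above on the machine you browse from (the address below is a placeholder):

```bash
echo "10.7.10.200  flink.k8s.io" | sudo tee -a /etc/hosts   # replace with the real EXTERNAL-IP
curl -I http://flink.k8s.io/flink/basic-application-deployment-only-ingress/
```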
Visit http://flink.k8s.io/flink/basic-application-deployment-only-ingress/
![image.png](https://cdn.jsdelivr.net/gh/huangxiaofeng10047/blogimage@main/img/20240830105737.png)
![[image-20240830105737182.png]]
Developing a project with the Java SDK:
```pom
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-kubernetes-operator</artifactId>
        <version>1.10-SNAPSHOT</version>
        <scope>compile</scope>
        <exclusions>
            <exclusion>
                <artifactId>kubernetes-httpclient-okhttp</artifactId>
                <groupId>io.fabric8</groupId>
            </exclusion>
            <exclusion>
                <artifactId>kubernetes-client</artifactId>
                <groupId>io.fabric8</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- https://mvnrepository.com/artifact/io.fabric8/kubernetes-client -->
    <dependency>
        <groupId>io.fabric8</groupId>
        <artifactId>kubernetes-client</artifactId>
        <version>6.13.2</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.13.1</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.dataformat</groupId>
        <artifactId>jackson-dataformat-yaml</artifactId>
        <version>2.13.1</version>
    </dependency>
</dependencies>
```
Using the flink-kubernetes-operator source code:
```bash
git clone git@github.com:apache/flink-kubernetes-operator.git
```
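flink-kubernetes-operator 1.10-SNAPSHOT referenced in the pom is a SNAPSHOT and will not resolve from Maven Central, so install it into the local repository from the cloned sources first (skipping tests):

```bash
cd flink-kubernetes-operator
mvn clean install -DskipTests
```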
Then just create a new module. Note: if you get no_proxy related errors at runtime, add the JVM parameter shown in the screenshot below.
![[image-20240830132756484.png]]
Main.java
```java
package org.apache.flink.examples;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.*;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class Main {
    public static void main(String[] args) throws InterruptedException, IOException {
        // Build the FlinkDeployment object (equivalent to the YAML manifest)
        FlinkDeployment flinkDeployment = new FlinkDeployment();
        flinkDeployment.setApiVersion("flink.apache.org/v1beta1");
        flinkDeployment.setKind("FlinkDeployment");
        ObjectMeta objectMeta = new ObjectMeta();
        objectMeta.setNamespace("flink");
        objectMeta.setName("basic-application-deployment-only-ingress");
        flinkDeployment.setMetadata(objectMeta);
        FlinkDeploymentSpec flinkDeploymentSpec = new FlinkDeploymentSpec();
        flinkDeploymentSpec.setFlinkVersion(FlinkVersion.v1_17);
        flinkDeploymentSpec.setImage("10.7.20.12:5000/flink:1.17");
        IngressSpec ingressSpec = new IngressSpec();
        ingressSpec.setTemplate("flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)");
        ingressSpec.setClassName("nginx");
        Map<String, String> annotations = new HashMap<>();
        annotations.put("nginx.ingress.kubernetes.io/rewrite-target", "/$2");
        ingressSpec.setAnnotations(annotations);
        flinkDeploymentSpec.setIngress(ingressSpec);
        Map<String, String> flinkConfiguration = new HashMap<>();
        flinkConfiguration.put("taskmanager.numberOfTaskSlots", "2");
        flinkDeploymentSpec.setFlinkConfiguration(flinkConfiguration);
        flinkDeploymentSpec.setServiceAccount("flink");
        JobManagerSpec jobManagerSpec = new JobManagerSpec();
        jobManagerSpec.setResource(new Resource(1.0, "1024m", "1G"));
        flinkDeploymentSpec.setJobManager(jobManagerSpec);
        TaskManagerSpec taskManagerSpec = new TaskManagerSpec();
        taskManagerSpec.setResource(new Resource(1.0, "1024m", "1G"));
        flinkDeploymentSpec.setTaskManager(taskManagerSpec);
        flinkDeployment.setSpec(flinkDeploymentSpec);
        flinkDeployment
                .getSpec()
                .setJob(
                        JobSpec.builder()
                                .jarURI(
                                        "local:///opt/flink/examples/streaming/StateMachineExample.jar")
                                .parallelism(2)
                                .upgradeMode(UpgradeMode.STATELESS)
                                .build());
        // Print the generated YAML
        String yaml = toYaml(flinkDeployment);
        System.out.println("Generated Flink job YAML:");
        System.out.println(yaml);
        // Submit the job
        try (KubernetesClient kubernetesClient = new KubernetesClientBuilder().build()) {
            kubernetesClient.resource(flinkDeployment).createOrReplace();
        }
        System.out.println("Job submitted");
    }

    public static String toYaml(Object obj) throws IOException {
        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
        mapper.findAndRegisterModules(); // Registers all available modules including Java Time modules
        return mapper.writeValueAsString(obj);
    }
}
```
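After Main runs, the custom resource should exist and the operator should start reconciling it:

```bash
kubectl -n flink get flinkdeployment basic-application-deployment-only-ingress
kubectl -n flink get pods
```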
The run result is shown below:
![[image-20240830132953907.png]]
The generated YAML looks like this:
```yaml
---
apiVersion: 'flink.apache.org/v1beta1'
kind: 'FlinkDeployment'
metadata:
  name: 'basic-application-deployment-only-ingress'
  namespace: 'flink'
spec:
  job:
    jarURI: 'local:///opt/flink/examples/streaming/StateMachineExample.jar'
    parallelism: 2
    entryClass: null
    args: null
    state: null
    savepointTriggerNonce: null
    initialSavepointPath: null
    flinkStateSnapshotReference: null
    checkpointTriggerNonce: null
    upgradeMode: 'stateless'
    allowNonRestoredState: null
    savepointRedeployNonce: null
    restartNonce: null
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: '2'
  image: '10.7.20.12:5000/flink:1.17'
  imagePullPolicy: null
  serviceAccount: 'flink'
  flinkVersion: 'v1_17'
  ingress:
    template: 'flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)'
    className: 'nginx'
    annotations:
      nginx.ingress.kubernetes.io/rewrite-target: '/$2'
    labels: null
    tls: null
  podTemplate: null
  jobManager:
    resource:
      cpu: 1.0
      memory: '1024m'
      ephemeralStorage: '1G'
    replicas: 1
    podTemplate: null
  taskManager:
    resource:
      cpu: 1.0
      memory: '1024m'
      ephemeralStorage: '1G'
    replicas: null
    podTemplate: null
  logConfiguration: null
  mode: null
status:
  jobStatus:
    jobName: null
    jobId: null
    state: null
    startTime: null
    updateTime: null
    upgradeSnapshotReference: null
    savepointInfo:
      lastSavepoint: null
      triggerId: null
      triggerTimestamp: null
      triggerType: null
      formatType: null
      savepointHistory: []
      lastPeriodicSavepointTimestamp: 0
    checkpointInfo:
      lastCheckpoint: null
      triggerId: null
      triggerTimestamp: null
      triggerType: null
      formatType: null
      lastPeriodicCheckpointTimestamp: 0
  error: null
  observedGeneration: null
  lifecycleState: 'CREATED'
  clusterInfo: {}
  jobManagerDeploymentStatus: 'MISSING'
  reconciliationStatus:
    reconciliationTimestamp: 0
    lastReconciledSpec: null
    lastStableSpec: null
    state: 'UPGRADING'
  taskManager: null
```
![[image-20240830133114334.png]]
Try hitting the REST endpoint:
![[image-20240830133223518.png]]
Just visit flink.k8s.io.
![[image-20240830133333465.png]]
If you hit an error like this:
![[image-20240830133532733.png]]
Fix it by adding a JVM parameter:
```
-Dno.proxy=localhost,127.0.0.1,kubernetes.default.svc,k8s.io
```
MetalLB load balancing:
![[image-20240830133815396.png]]
Compared with the setup described in another document, the result is the same.