A quick test environment for writing to Hudi on k8s

Install the NFS-backed StorageClass nfs-client on k8s:

helm repo add nfs-subdir-external-provisioner https://kubernetes-sigs.github.io/nfs-subdir-external-provisioner/

helm install nfs-subdir-external-provisioner nfs-subdir-external-provisioner/nfs-subdir-external-provisioner \
    --set nfs.server=10.7.20.26 \
    --set nfs.path=/data1/nfs/rootfs
helm pull nfs-subdir-external-provisioner/nfs-subdir-external-provisioner
helm install nfs-subdir-external-provisioner  -f ./values.yaml .    --set nfs.server=10.7.20.26     --set nfs.path=/data1/nfs/rootfs
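After installation, a quick sanity check that the provisioner is running and the nfs-client StorageClass exists (a sketch; the pod label assumes the chart defaults):

# Verify the provisioner pod and the StorageClass created by the chart (label assumes default chart values)
kubectl get pods -l app=nfs-subdir-external-provisioner
kubectl get storageclass nfs-client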
juicefs format --storage=minio --bucket=http://10.7.20.12:9000/juicefs --access-key=admin --secret-key=12345678 "mysql://root:root@(10.7.3.100:30006)/juicefs" juicefsminio


2022/12/05 12:52:08.025727 juicefs[1049932] <INFO>: Volume is formatted as {
  "Name": "juicefsminio",
  "UUID": "98a5212e-250d-4b49-9037-41c126142216",
  "Storage": "minio",
  "Bucket": "http://10.7.20.26:30012/juicefs",
  "AccessKey": "minioadmin",
  "SecretKey": "removed",
  "BlockSize": 4096,
  "Compression": "none",
  "KeyEncrypted": true,
  "TrashDays": 1,
  "MetaVersion": 1
} [format.go:472]

Deploy flink-kubernetes-operator

kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml
#To find the list of stable versions please visit https://flink.apache.org/downloads.html

helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.6.0/
#The Helm chart by default points to the ghcr.io/apache/flink-kubernetes-operator image repository. If you have connectivity issues or prefer to use Dockerhub instead you can use --set image.repository=apache/flink-kubernetes-operator during installation.

helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator

[root@k-m1 ~]# kubectl apply -f basic.yaml
flinkdeployment.flink.apache.org/basic-example created

basic.yaml

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# apiVersion: flink.apache.org/v1beta1
# kind: FlinkDeployment
# metadata:
#   name: basic-example
# spec:
#   image: flink:1.15
#   flinkVersion: v1_15
#   flinkConfiguration:
#     taskmanager.numberOfTaskSlots: "2"
#   serviceAccount: flink
#   jobManager:
#     resource:
#       memory: "2048m"
#       cpu: 1
#   taskManager:
#     resource:
#       memory: "2048m"
#       cpu: 1
#   job:
#     jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
#     parallelism: 2
#     upgradeMode: stateless
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: xiaozhch5/flink-sql-submit:hudi-0.12-juicefs
  flinkVersion: v1_15
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    s3.endpoint: "http://10.7.20.26:30012"
    s3.path.style.access: "true"
    s3.access.key: "minioadmin"
    s3.secret.key: "minioadmin"
    state.backend.incremental: "true"
    execution.checkpointing.interval: "300000ms"
    state.savepoints.dir: "s3://flink-data/savepoints"
    state.checkpoints.dir: "s3://flink-data/checkpoints"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 2
  job:
    jarURI: local:///opt/flink/lib/flink-sql-submit-1.0.jar
    args: ["-f", "s3://flink-tasks/k8s-flink-sql-test.sql", "-m", "streaming", "-e", "http://10.7.20.26:30012", "-a", "minioadmin", "-s", "minioadmin"]
    parallelism: 2
    upgradeMode: stateless
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          volumeMounts:
            - mountPath: /opt/hadoop/etc/hadoop/
              name: core-site
      volumes:
        - name: core-site
          configMap:
            name: core-site
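
After kubectl apply, a rough way to check on the deployment (the REST service name assumes the operator's default <name>-rest convention):

# Check the FlinkDeployment status and its pods
kubectl get flinkdeployment basic-example
kubectl get pods
# Forward the JobManager UI locally (service name is an assumption)
kubectl port-forward svc/basic-example-rest 8081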

datafaker

pip3 install datafaker

Step 1: create the table in MySQL:

create table stu (
  id int unsigned auto_increment primary key COMMENT '自增id',
  name varchar(20) not null comment '学生名字',
  school varchar(20) not null comment '学校名字',
  nickname varchar(20) not null comment '学生小名',
  age int not null comment '学生年龄',
  class_num int not null comment '班级人数',
  score decimal(4,2) not null comment '成绩',
  phone bigint not null comment '电话号码',
  email varchar(64) comment '家庭网络邮箱',
  ip varchar(32) comment 'IP地址',
  address text comment '家庭地址'
) engine=InnoDB default charset=utf8;

Write the metadata file meta.txt, which describes the student table:

id||int||自增id[:inc(id,1)]
name||varchar(20)||学生名字
school||varchar(20)||学校名字[:enum(file://names.txt)]
nickname||varchar(20)||学生小名[:enum(鬼泣, 高小王子, 歌神, 逗比)]
age||int||学生年龄[:age]
class_num||int||班级人数[:int(10, 100)]
score||decimal(4,2)||成绩[:decimal(4,2,1)]
phone||bigint||电话号码[:phone_number]
email||varchar(64)||家庭网络邮箱[:email]
ip||varchar(32)||IP地址[:ipv4]
address||text||家庭地址[:address]
datafaker rdb mysql+mysqldb://root:root@10.7.20.26:30006/test?charset=utf8 stu 10  --meta meta.txt

Problems encountered while installing datafaker:

pip install datafaker --upgrade

pip install mysqlclient

Installing in this order works.

pacman -S python-mysqlclient

Installation of mysqlclient differs across Linux distributions.

For importing data with datafaker, see https://lrting.top/useful-tools/443/

du -sh * .

This can be used to find out what is taking up disk space.

Troubleshooting again on 2023-03-01



[root@k-m1 ~]# juicefs format --storage=minio --bucket=http://10.7.20.26:32000/juicefs --access-key=minio --secret-key=minio123  mysql://root:root@(10.7.20.26:30006)/juicefs juicefsminio
2023/03/01 10:32:33.588985 juicefs[3761846] <INFO>: Meta address: mysql://root:****@(10.7.20.26:30006)/juicefs [interface.go:402]
2023/03/01 10:32:33.692516 juicefs[3761846] <WARNING>: The latency to database is too high: 103.232302ms [sql.go:203]
2023/03/01 10:32:33.809210 juicefs[3761846] <INFO>: Data use minio://10.7.20.26:32000/juicefs/juicefsminio/ [format.go:435]
2023/03/01 10:32:34.187506 juicefs[3761846] <INFO>: Volume is formatted as {
  "Name": "juicefsminio",
  "UUID": "8897e62f-f349-4e3c-bdf4-be93639fd5a5",
  "Storage": "minio",
  "Bucket": "http://10.7.20.26:32000/juicefs",
  "AccessKey": "minio",
  "SecretKey": "removed",
  "BlockSize": 4096,
  "Compression": "none",
  "KeyEncrypted": true,
  "TrashDays": 1,
  "MetaVersion": 1
} [format.go:472]
[root@k-m1 ~]#

The following error appears:

Permission denied: user=flink, path="jfs://juicefsminio/orders_hudi_2/.hoodie/.temp":hdfs:supergroup:drwxr-xr-x

The core-site.xml file needs to be modified:

    <!-- Append the following at the end of core-site.xml -->
    <property>
      <name>juicefs.superuser</name>
      <value>flink</value>
    </property>

References:

https://lrting.top/backend/10300/

https://www.bookstack.cn/read/juicefs-ce-1.0-beta-zh/16fa6af351f62e26.md

In the default namespace, create a ConfigMap named core-site from the following core-site.xml:

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->

<!-- Put site-specific property overrides in this file. -->

<configuration>

    <property>
        <name>fs.s3a.endpoint</name>
        <value>http://192.168.1.2:9000</value>
        <description>AWS S3 endpoint to connect to. An up-to-date list is
            provided in the AWS Documentation: regions and endpoints. Without this
            property, the standard region (s3.amazonaws.com) is assumed.
        </description>
    </property>

    <property>
        <name>fs.s3a.access.key</name>
        <value>PSBZMLL1NXZYCX55QMBI</value>
    </property>

    <property>
        <name>fs.s3a.secret.key</name>
        <value>CNACTHv4+fPHvYT7gwaKCyWR7K96zHXNU+f9yccJ</value>
    </property>

    <property>
        <name>fs.s3a.path.style.access</name>
        <value>true</value>
        <description>Enable S3 path style access ie disabling the default virtual hosting behaviour.
            Useful for S3A-compliant storage providers as it removes the need to set up DNS for virtual hosting.
        </description>
    </property>

    <property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>
            org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
        </value>
        <description>
            Comma-separated class names of credential provider classes which implement
            com.amazonaws.auth.AWSCredentialsProvider.

            When S3A delegation tokens are not enabled, this list will be used
            to directly authenticate with S3 and DynamoDB services.
            When S3A Delegation tokens are enabled, depending upon the delegation
            token binding it may be used
            to communicate with the STS endpoint to request session/role
            credentials.

            These are loaded and queried in sequence for a valid set of credentials.
            Each listed class must implement one of the following means of
            construction, which are attempted in order:
            * a public constructor accepting java.net.URI and
            org.apache.hadoop.conf.Configuration,
            * a public constructor accepting org.apache.hadoop.conf.Configuration,
            * a public static method named getInstance that accepts no
            arguments and returns an instance of
            com.amazonaws.auth.AWSCredentialsProvider, or
            * a public default constructor.

            Specifying org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider allows
            anonymous access to a publicly accessible S3 bucket without any credentials.
            Please note that allowing anonymous access to an S3 bucket compromises
            security and therefore is unsuitable for most use cases. It can be useful
            for accessing public data sets without requiring AWS credentials.

            If unspecified, then the default list of credential provider classes,
            queried in sequence, is:
            * org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider: looks
            for session login secrets in the Hadoop configuration.
            * org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider:
            Uses the values of fs.s3a.access.key and fs.s3a.secret.key.
            * com.amazonaws.auth.EnvironmentVariableCredentialsProvider: supports
            configuration of AWS access key ID and secret access key in
            environment variables named AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY,
            and AWS_SESSION_TOKEN as documented in the AWS SDK.
            * org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider: picks up
            IAM credentials of any EC2 VM or AWS container in which the process is running.
        </description>
    </property>

    <property>
        <name>fs.defaultFS</name>
        <value>jfs://juicefsminio/hudi-dir</value>
        <description>Optional, you can also specify full path "jfs://myjfs/path-to-dir" with location to use JuiceFS</description>
    </property>
    <property>
        <name>fs.jfs.impl</name>
        <value>io.juicefs.JuiceFileSystem</value>
    </property>
    <property>
        <name>fs.AbstractFileSystem.jfs.impl</name>
        <value>io.juicefs.JuiceFS</value>
    </property>
    <property>
        <name>juicefs.meta</name>
        <value>mysql://root:root@(192.168.1.2:3306)/juicefs</value>
    </property>
    <property>
        <name>juicefs.cache-dir</name>
        <value>/tmp/juicefs-cache-dir</value>
    </property>
    <property>
        <name>juicefs.cache-size</name>
        <value>1024</value>
    </property>
    <property>
        <name>juicefs.access-log</name>
        <value>/tmp/juicefs.access.log</value>
    </property>
    <property>
      <name>juicefs.superuser</name>
      <value>flink</value>
    </property>

</configuration>

Create the Flink job using the core-site ConfigMap and flink-kubernetes-operator

If the job name is basic-example, you also need to create a hadoop-config-basic-example ConfigMap based on the core-site.xml above (see the sketch below).
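
A sketch of creating both ConfigMaps, assuming the XML above is saved locally as core-site.xml:

kubectl create configmap core-site --from-file=core-site.xml -n default
kubectl create configmap hadoop-config-basic-example --from-file=core-site.xml -n default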

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
#  image: xiaozhch5/flink-sql-submit:hudi-0.12-juicefs
  image: 10.7.20.12:5000/flink/sqlsubmit-job:1.15.4-juicefs
  flinkVersion: v1_15
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    s3.endpoint: "http://10.7.11.5:31667"
    s3.path.style.access: "true"
    s3.access.key: "minio"
    s3.secret.key: "minio123"
    state.backend.incremental: "true"
    execution.checkpointing.interval: "300000ms"
    state.savepoints.dir: "s3://flink-data/savepoints"
    state.checkpoints.dir: "s3://flink-data/checkpoints"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 2
  job:
    jarURI: local:///opt/flink/lib/flink-sql-submit-1.0.jar
    args: ["-f", "s3://flink-tasks/k8s-flink-sql-test.sql", "-m", "streaming", "-e", "http://10.7.11.5:31667", "-a", "minio", "-s", "minio123"]
    parallelism: 2
    upgradeMode: stateless
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          volumeMounts:
            - mountPath: /opt/hadoop/etc/hadoop/
              name: core-site
      volumes:
        - name: core-site
          configMap:
            name: core-site

The Flink SQL job is:

CREATE TABLE Orders (
    order_number BIGINT,
    price        DECIMAL(32,2),
    order_time   TIMESTAMP(3),
    PRIMARY KEY (order_number) NOT ENFORCED
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10'
);

CREATE TABLE Orders_hudi (
    order_number BIGINT,
    price        DECIMAL(32,2),
    order_time   TIMESTAMP(3),
    PRIMARY KEY (order_number) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = 'jfs://juicefsminio/orders_hudi_2',
  'table.type' = 'MERGE_ON_READ'
);

insert into Orders_hudi select * from Orders;
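
The FlinkDeployment above reads this SQL from s3://flink-tasks/k8s-flink-sql-test.sql, so the file must be uploaded to MinIO first. A sketch using the MinIO client (the alias name and local file name are assumptions; the endpoint and credentials are the ones configured above):

mc alias set myminio http://10.7.11.5:31667 minio minio123
mc mb --ignore-existing myminio/flink-tasks
mc cp ./k8s-flink-sql-test.sql myminio/flink-tasks/k8s-flink-sql-test.sql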

To deploy juicefs, MySQL also needs to be deployed.




sudo mount -t nfs -o nolock,nfsvers=3,vers=3 192.168.2.3:/Users/huangxiaofeng/nas_a /Users/huangxiaofeng/nas_a2

touch /Users/huangxiaofeng/nas_a2/hello

mysql-pvc.yaml

kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: mysql-pvc
spec:
  storageClassName: juicefs-sc
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 5G

mysql-pvc-mac.yaml

apiVersion: v1
kind: PersistentVolume
metadata:
  name: nfspv1
spec:
  mountOptions:
    - nfsvers=3
    - nolock
  capacity:
    storage: 10Gi
  accessModes:
    - ReadWriteOnce
  persistentVolumeReclaimPolicy: Recycle
  storageClassName: nfs
  nfs:
    path: /Users/huangxiaofeng/nas_a
    server: docker.for.mac.host.internal

mysql-deployment.yaml

apiVersion: v1
kind: ReplicationController
metadata:
  name: mysql-rc
  namespace: default
  labels:
    name: mysql-rc
spec:
  replicas: 1
  selector:
    name: mysql-rc
  template:
    metadata:
      labels:
        name: mysql-rc
    spec:
      containers:
        - name: mysql
          image: mysql:5.7.39
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 3306
          env:
            - name: MYSQL_ROOT_PASSWORD
              value: "root"
          volumeMounts:
            - name: mysql-persistent-storage
              mountPath: /var/lib/mysql          # The MySQL container stores its data in this directory; it must be persisted
      volumes:
        - name: mysql-persistent-storage
          persistentVolumeClaim:
            claimName: mysql-pvc                 # Name of the PVC to use

mysql-service.yaml

apiVersion: v1
kind: Service
metadata:
  labels:
    expose: "true"
    name: mysql-rc
  name: mysql
spec:
  type: NodePort
  ports:
    - name: http
      port: 3306
      protocol: TCP
      nodePort: 30006
  selector:
    name: mysql-rc
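
Apply the three manifests to bring up MySQL (file names as used above):

kubectl apply -f mysql-pvc.yaml
kubectl apply -f mysql-deployment.yaml
kubectl apply -f mysql-service.yaml
# MySQL should then be reachable on NodePort 30006 with root/root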

Initializing juicefs on MinIO

The default MinIO credentials are minioadmin/minioadmin; create a bucket named juicefs in MinIO.

The default MySQL credentials are root/root; create a database named juicefs in MySQL.
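
A sketch of preparing both before running juicefs format (the mc alias name is an assumption; adjust the endpoints, ports, and credentials to whatever your MinIO/MySQL services actually expose):

# Create the juicefs bucket in MinIO
mc alias set local http://10.7.20.26:32000 minioadmin minioadmin
mc mb --ignore-existing local/juicefs
# Create the juicefs metadata database in MySQL
mysql -h 10.7.20.26 -P 30006 -uroot -proot -e "CREATE DATABASE IF NOT EXISTS juicefs;"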

The image uses Flink 1.17.1 and runs on JDK 17.


Runtime environment: JDK 17


The steps include compiling Flink 1.17.1.

Compile Hudi 1.0-SNAPSHOT (built with JDK 1.8):

 mvn clean install -DskipTests  -Dcheckstyle.skip=true -Dscala-2.12  -Pflink1.17  -Dhadoop.version=3.0.0  -Pflink-bundle-shade-hive2 --settings ~/.m2/setting-flink2.xml

Note: be sure to add -Pflink-bundle-shade-hive2; otherwise the compiled jar is very small and Hudi cannot be used properly. The resulting hudi-flink1.17-bundle-1.0-SNAPSHOT.jar should be around 80 MB.
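
A quick sanity check on the bundle size (the path assumes the standard Hudi source layout):

ls -lh packaging/hudi-flink-bundle/target/hudi-flink1.17-bundle-1.0-SNAPSHOT.jar
# Expect roughly 80 MB; only a few MB usually means -Pflink-bundle-shade-hive2 was not applied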
