Starting a Flink Job with docker-compose

Let's deploy Flink 1.20. First, take a look at the docker-compose file:

 cat docker-compose.yml
version: "3.1"

services:
  cadvisor:
    image: gcr.io/cadvisor/cadvisor
    container_name: cadvisor
    ports:
      - "8080:8080"
    volumes:
      - /:/rootfs:ro
      - /var/run:/var/run:rw
      - /sys:/sys:ro
      - /var/lib/docker/:/var/lib/docker:ro

  mariadb:
    image: mariadb:10.6.14
    container_name: mariadb
    environment:
      MYSQL_ROOT_PASSWORD: rootpassword
    volumes:
      - ./sql/mariadb.cnf:/etc/mysql/mariadb.conf.d/mariadb.cnf
      - ./sql/init.sql:/docker-entrypoint-initdb.d/init.sql
    ports:
      - "3306:3306"

  redis:
    image: redis:latest
    container_name: redis
    ports:
      - "6379:6379"

  jobmanager:
    image: flink:1.20.1-SN
    restart: always
    ports:
      - "8081:8081" # Flink UI port
    command: jobmanager
    volumes:
      - ./jars/flink-sql-connector-mysql-cdc-3.2-SNAPSHOT.jar:/opt/flink/lib/flink-sql-connector-mysql-cdc-3.2-SNAPSHOT.jar
      - ./jars/flink-connector-jdbc-3.2-SNAPSHOT.jar:/opt/flink/lib/flink-connector-jdbc-3.2-SNAPSHOT.jar
      - ./jars/flink-s3-fs-hadoop-1.20-SNAPSHOT.jar:/opt/flink/plugins/s3-fs-hadoop/flink-s3-fs-hadoop-1.20-SNAPSHOT.jar
      - ./jars/mysql-connector-j-8.0.33.jar:/opt/flink/lib/mysql-connector-j-8.0.33.jar
      - ./jars/flink-connector-redis-1.3.0-jar-with-dependencies.jar:/opt/flink/lib/flink-connector-redis-1.3.0-jar-with-dependencies.jar
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        execution.checkpointing.mode: AT_LEAST_ONCE
        execution.checkpointing.interval: 60min
        execution.checkpointing.timeout: 10min
        state.backend: rocksdb
        state.backend.incremental: true
        state.checkpoints.dir: s3://state/checkpoints/
        s3.access.key: root
        s3.secret.key: hellowin
        s3.path.style.access: true
        s3.fs.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
        s3.endpoint: http://minio:9000

  taskmanager:
    image: flink:1.20.1-SN
    restart: always
    depends_on:
      - jobmanager
    command: taskmanager
    volumes:
      - ./jars/flink-sql-connector-mysql-cdc-3.2-SNAPSHOT.jar:/opt/flink/lib/flink-sql-connector-mysql-cdc-3.2-SNAPSHOT.jar
      - ./jars/flink-connector-jdbc-3.2-SNAPSHOT.jar:/opt/flink/lib/flink-connector-jdbc-3.2-SNAPSHOT.jar
      - ./jars/flink-s3-fs-hadoop-1.20-SNAPSHOT.jar:/opt/flink/plugins/s3-fs-hadoop/flink-s3-fs-hadoop-1.20-SNAPSHOT.jar
      - ./jars/mysql-connector-j-8.0.33.jar:/opt/flink/lib/mysql-connector-j-8.0.33.jar
      - ./jars/flink-connector-redis-1.3.0-jar-with-dependencies.jar:/opt/flink/lib/flink-connector-redis-1.3.0-jar-with-dependencies.jar
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        s3.access.key: root
        s3.secret.key: hellowin
        s3.fs.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
        s3.endpoint: http://minio:9000

  # MinIO: S3-compatible object storage; the password (secret key) must be at least 8 characters long
  minio:
    image: minio/minio:latest
    environment:
      - MINIO_ACCESS_KEY=root
      - MINIO_SECRET_KEY=hellowin
    ports:
      - "9000:9000"
      - "9001:9001"
    volumes:
      - ./minio:/data
    command: server /data --console-address ":9001" # startup command: serve the data directory and expose the console on :9001

This file deploys the Flink JobManager and TaskManager and uses MinIO as S3-compatible storage for checkpoints.
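The checkpoint path s3://state/checkpoints/ assumes a bucket named state already exists in MinIO, so create it before submitting a job. You can do that in the MinIO console on port 9001, or with the MinIO client mc. The sketch below reuses the credentials from the compose file; the local alias, the throwaway mc container and the host-network/localhost endpoint are my assumptions, so adjust them to your environment:

# create the "state" bucket referenced by state.checkpoints.dir
docker run --rm --network host \
  -e MC_HOST_local="http://root:hellowin@localhost:9000" \
  minio/mc mb --ignore-existing local/state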

Now bring everything up:

docker-compose up -d

(Screenshot: the containers running after docker-compose up -d.)

You can see that all the services started up normally.
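If you prefer the command line over the screenshot, the same can be confirmed with docker-compose and the Flink REST API (the curl call is just an optional sanity check):

docker-compose ps
# /overview reports registered taskmanagers, available slots and running jobs
curl -s http://localhost:8081/overview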

Submit the job through sql-client.sh:

docker exec -it jobmanager /opt/flink/bin/sql-client.sh
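This opens an interactive SQL shell inside the JobManager container. If you would rather keep the statements in a file, sql-client.sh can also execute a script with -f; the /opt/flink/job.sql path below is hypothetical and would have to be mounted into the container first:

docker exec -it jobmanager /opt/flink/bin/sql-client.sh -f /opt/flink/job.sql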

Now let's write the SQL:

-- read in the data from the table in mariadb
CREATE TABLE sales_records_table (
    sale_id INT,
    product_id INT,
    sale_date DATE,
    sale_amount DECIMAL(10, 2),
    PRIMARY KEY (sale_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'mariadb',
    'port' = '3306',
    'username' = 'root',
    'password' = 'rootpassword',
    'database-name' = 'sales_database',
    'table-name' = 'sales_records'
);

-- create a view that aggregates the sales records
CREATE TEMPORARY VIEW total_sales AS
SELECT
    SUM(sale_amount) AS total_sales_amount
FROM
    sales_records_table;

-- create a redis sink table
CREATE TABLE redis_sink (
    key_name STRING,
    total DECIMAL(10, 2),
    PRIMARY KEY (key_name) NOT ENFORCED
) WITH (
    'connector' = 'redis',
    'redis-mode' = 'single',
    'host' = 'redis',
    'port' = '6379',
    'database' = '0',
    'command' = 'SET'
);

-- insert the aggregated sales records into the redis sink table
INSERT INTO
    redis_sink
SELECT
    'total_sales',
    total_sales_amount
FROM
    total_sales;

This job reads data from MariaDB via the MySQL CDC connector, aggregates the total sales, and writes the result to Redis.
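Before looking at Redis it can help to confirm what the CDC source sees on the MariaDB side. The mounted ./sql/init.sql is expected to create sales_database.sales_records and seed it with data, and ./sql/mariadb.cnf presumably enables row-based binlog, which the mysql-cdc connector requires. A quick sanity check of the source total (the query itself is only an illustration):

docker exec -it mariadb mysql -uroot -prootpassword \
  -e "SELECT SUM(sale_amount) AS total FROM sales_database.sales_records;"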

Next, check Redis to verify that the data was written correctly.
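The simplest check is redis-cli inside the redis container; the key name total_sales comes from the INSERT statement above:

docker exec -it redis redis-cli GET total_sales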

(Screenshot: the total_sales key in Redis with value 5500.)

The value 5500 is there, which means the job is running correctly.
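Since checkpoints are written to MinIO, you can also confirm that state eventually lands in the state bucket, either through the MinIO console on port 9001 or with the same MC_HOST_local trick as before. Keep in mind that with execution.checkpointing.interval: 60min the first checkpoint only appears after roughly an hour:

# list whatever has been written under the checkpoint directory
docker run --rm --network host \
  -e MC_HOST_local="http://root:hellowin@localhost:9000" \
  minio/mc ls -r local/state/checkpoints/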
