flink+cdc+mysql+postgres+es

前置条件:

docker

flink

jdk8

mysql+postgres

docker-compose 启动准备环境

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# Supporting containers for this walkthrough: PostgreSQL and MySQL as change
# sources, Elasticsearch as the sink, and Kibana to inspect the result.
version: "2.1"
services:
  postgres:
    image: debezium/example-postgres:1.1
    environment:
      # Database name and credentials; the psql command and the postgres-cdc
      # table definition in this walkthrough use the same values.
      - 'POSTGRES_DB=postgres'
      - 'POSTGRES_USER=postgres'
      - 'POSTGRES_PASSWORD=postgres'
    ports:
      - '5432:5432'
  mysql:
    image: debezium/example-mysql:1.1
    environment:
      # The root password is the one passed to `mysql -uroot -p...` and to the
      # mysql-cdc table definitions.
      - 'MYSQL_ROOT_PASSWORD=123456'
      - 'MYSQL_USER=mysqluser'
      - 'MYSQL_PASSWORD=mysqlpw'
    volumes:
      # Bind-mounts a mysqld.cnf from the directory of this compose file over
      # the server config inside the container. The file's contents are not
      # shown here; it must exist before `up`, otherwise Docker creates a
      # directory at that path.
      - './mysqld.cnf:/etc/mysql/mysql.conf.d/mysqld.cnf'
    ports:
      - '3306:3306'
  elasticsearch:
    image: elastic/elasticsearch:7.6.0
    ulimits:
      # Unlimited locked memory, needed because bootstrap.memory_lock is
      # enabled below.
      memlock:
        hard: -1
        soft: -1
      nofile:
        hard: 65536
        soft: 65536
    environment:
      - 'cluster.name=docker-cluster'
      - 'bootstrap.memory_lock=true'
      # Fixed 512 MB JVM heap.
      - 'ES_JAVA_OPTS=-Xms512m -Xmx512m'
      - 'discovery.type=single-node'
    ports:
      # 9200 is the HTTP endpoint the Flink sink table points at.
      - '9200:9200'
      - '9300:9300'
  kibana:
    image: elastic/kibana:7.6.0
    ports:
      - '5601:5601'

启动

1
docker compose up -d

image-20230417234000894

准备 flink 环境（本文使用的版本）:

1
flink-1.17

需要另外准备 connector 的 jar 包并放到 Flink 的 lib/ 目录下，准备完成后 lib/ 目录内容如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
├── flink-cep-1.17.0.jar
├── flink-connector-files-1.17.0.jar
├── flink-csv-1.17.0.jar
├── flink-dist-1.17.0.jar
├── flink-json-1.17.0.jar
├── flink-scala_2.12-1.17.0.jar
├── flink-sql-connector-elasticsearch7-1.16.0.jar
├── flink-sql-connector-mysql-cdc-2.4-SNAPSHOT.jar
├── flink-sql-connector-postgres-cdc-2.4-SNAPSHOT.jar
├── flink-table-api-java-uber-1.17.0.jar
├── flink-table-planner-loader-1.17.0.jar
├── flink-table-runtime-1.17.0.jar
├── log4j-1.2-api-2.17.1.jar
├── log4j-api-2.17.1.jar
├── log4j-core-2.17.1.jar
└── log4j-slf4j-impl-2.17.1.jar

image-20230417234217049

进入 mysql 操作:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
docker-compose exec mysql mysql -uroot -p123456
-- MySQL
-- Create the demo database with the two tables that the mysql-cdc source
-- tables in this walkthrough read from.
CREATE DATABASE mydb;
USE mydb;

-- Product catalogue; generated ids start at 101.
CREATE TABLE products (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512)
) AUTO_INCREMENT = 101;

-- DEFAULT lets AUTO_INCREMENT assign the ids (101 onwards).
INSERT INTO products (id, name, description)
VALUES
  (DEFAULT, "scooter", "Small 2-wheel scooter"),
  (DEFAULT, "car battery", "12V car battery"),
  (DEFAULT, "12-pack drill bits", "12-pack of drill bits with sizes ranging from #40 to #3"),
  (DEFAULT, "hammer", "12oz carpenter's hammer"),
  (DEFAULT, "hammer", "14oz carpenter's hammer"),
  (DEFAULT, "hammer", "16oz carpenter's hammer"),
  (DEFAULT, "rocks", "box of assorted rocks"),
  (DEFAULT, "jacket", "water resistent black wind breaker"),
  (DEFAULT, "spare tire", "24 inch spare tire");

-- Orders; generated ids start at 10001 and product_id refers to products.id.
CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;

INSERT INTO orders
  (order_id, order_date, customer_name, price, product_id, order_status)
VALUES
  (DEFAULT, '2020-07-30 10:08:22', 'Jark', 50.50, 102, FALSE),
  (DEFAULT, '2020-07-30 10:11:09', 'Sally', 15.00, 105, FALSE),
  (DEFAULT, '2020-07-30 12:00:30', 'Edward', 25.25, 106, FALSE);

进入 postgres 容器执行以下命令

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
docker-compose exec postgres psql -h localhost -U postgres
-- PG
-- Shipment records read by the postgres-cdc source table in this walkthrough.
-- NOTE(review): order_id is declared SERIAL although every insert below
-- supplies it explicitly; a plain INTEGER looks sufficient - confirm before
-- changing, the declaration is kept as-is here.
CREATE TABLE shipments (
  shipment_id SERIAL NOT NULL PRIMARY KEY,
  order_id SERIAL NOT NULL,
  origin VARCHAR(255) NOT NULL,
  destination VARCHAR(255) NOT NULL,
  is_arrived BOOLEAN NOT NULL
);

-- Generated shipment ids start at 1001.
ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;

-- Log the full old row for UPDATE/DELETE so change capture sees complete
-- before-images.
ALTER TABLE public.shipments REPLICA IDENTITY FULL;

-- order_id values match the ids generated for the MySQL orders table.
INSERT INTO shipments
  (shipment_id, order_id, origin, destination, is_arrived)
VALUES
  (DEFAULT, 10001, 'Beijing', 'Shanghai', FALSE),
  (DEFAULT, 10002, 'Hangzhou', 'Shanghai', FALSE),
  (DEFAULT, 10003, 'Shanghai', 'Hangzhou', FALSE);
  1. 使用下面的命令跳转至 Flink 目录下（目录名以实际解压路径为准）

    cd flink-1.17.0

  2. 使用下面的命令启动 Flink 集群

    bin/start-cluster.sh

    启动成功后，访问 http://localhost:8081 可以看到如下界面

image-20230417234719165

使用下面的命令启动 Flink SQL CLI

1
./bin/sql-client.sh

image-20230417234938395

在 flink sql client 上执行以下命令

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
-- Enable checkpointing with a 3-second interval for jobs submitted from this session.
Flink SQL> SET execution.checkpointing.interval = 3s;
[INFO] Execute statement succeed.

-- CDC source over mydb.products in the MySQL container (reached via localhost:3306).
Flink SQL> CREATE TABLE products (
>     id INT,
>     name STRING,
>     description STRING,
>     PRIMARY KEY (id) NOT ENFORCED
>   ) WITH (
>     'connector' = 'mysql-cdc',
>     'hostname' = 'localhost',
>     'port' = '3306',
>     'username' = 'root',
>     'password' = '123456',
>     'database-name' = 'mydb',
>     'table-name' = 'products'
>   );
[INFO] Execute statement succeed.

-- CDC source over mydb.orders; column types mirror the MySQL table definition.
Flink SQL> CREATE TABLE orders (
>    order_id INT,
>    order_date TIMESTAMP(0),
>    customer_name STRING,
>    price DECIMAL(10, 5),
>    product_id INT,
>    order_status BOOLEAN,
>    PRIMARY KEY (order_id) NOT ENFORCED
>  ) WITH (
>    'connector' = 'mysql-cdc',
>    'hostname' = 'localhost',
>    'port' = '3306',
>    'username' = 'root',
>    'password' = '123456',
>    'database-name' = 'mydb',
>    'table-name' = 'orders'
>  );
[INFO] Execute statement succeed.

-- CDC source over public.shipments in the Postgres container (reached via localhost:5432).
Flink SQL>  CREATE TABLE shipments (
>    shipment_id INT,
>    order_id INT,
>    origin STRING,
>    destination STRING,
>    is_arrived BOOLEAN,
>    PRIMARY KEY (shipment_id) NOT ENFORCED
>  ) WITH (
>    'connector' = 'postgres-cdc',
>    'hostname' = 'localhost',
>    'port' = '5432',
>    'username' = 'postgres',
>    'password' = 'postgres',
>    'database-name' = 'postgres',
>    'schema-name' = 'public',
>    'table-name' = 'shipments'
>  );
[INFO] Execute statement succeed.

-- Elasticsearch sink: one row per order_id, written to the enriched_orders index at localhost:9200.
Flink SQL> CREATE TABLE enriched_orders (
>    order_id INT,
>    order_date TIMESTAMP(0),
>    customer_name STRING,
>    price DECIMAL(10, 5),
>    product_id INT,
>    order_status BOOLEAN,
>    product_name STRING,
>    product_description STRING,
>    shipment_id INT,
>    origin STRING,
>    destination STRING,
>    is_arrived BOOLEAN,
>    PRIMARY KEY (order_id) NOT ENFORCED
>  ) WITH (
>      'connector' = 'elasticsearch-7',
>      'hosts' = 'http://localhost:9200',
>      'index' = 'enriched_orders'
>  );
[INFO] Execute statement succeed.

-- Submit the job: every order joined with its product and shipment; the LEFT JOINs keep
-- orders that have no matching product or shipment row.
Flink SQL>  INSERT INTO enriched_orders
>  SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
>  FROM orders AS o
>  LEFT JOIN products AS p ON o.product_id = p.id
>  LEFT JOIN shipments AS s ON o.order_id = s.order_id;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: ca63b90e47ab7356c0659647bf167145

现在,就可以在 Kibana 中看到包含商品和物流信息的订单数据。

首先访问 http://localhost:5601/app/kibana#/management/kibana/index_pattern 创建 index pattern enriched_orders.

image-20230417235107855

image-20230417235124620

image-20230417235153477

参考文档:

https://segmentfault.com/a/1190000043516625

https://ververica.github.io/flink-cdc-connectors/master/content/%E5%BF%AB%E9%80%9F%E4%B8%8A%E6%89%8B/mysql-postgres-tutorial-zh.html

Licensed under CC BY-NC-SA 4.0
最后更新于 Jan 06, 2025 05:52 UTC
comments powered by Disqus
Built with Hugo
主题 StackJimmy 设计
Caret Up