1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
Flink SQL> SET execution.checkpointing.interval = 3s;
[INFO] Execute statement succeed.
Flink SQL> CREATE TABLE products (
> id INT,
> name STRING,
> description STRING,
> PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
> 'connector' = 'mysql-cdc',
> 'hostname' = 'localhost',
> 'port' = '3306',
> 'username' = 'root',
> 'password' = '123456',
> 'database-name' = 'mydb',
> 'table-name' = 'products'
> );
[INFO] Execute statement succeed.
Flink SQL> CREATE TABLE orders (
> order_id INT,
> order_date TIMESTAMP(0),
> customer_name STRING,
> price DECIMAL(10, 5),
> product_id INT,
> order_status BOOLEAN,
> PRIMARY KEY (order_id) NOT ENFORCED
> ) WITH (
> 'connector' = 'mysql-cdc',
> 'hostname' = 'localhost',
> 'port' = '3306',
> 'username' = 'root',
> 'password' = '123456',
> 'database-name' = 'mydb',
> 'table-name' = 'orders'
> );
[INFO] Execute statement succeed.
Flink SQL> CREATE TABLE shipments (
> shipment_id INT,
> order_id INT,
> origin STRING,
> destination STRING,
> is_arrived BOOLEAN,
> PRIMARY KEY (shipment_id) NOT ENFORCED
> ) WITH (
> 'connector' = 'postgres-cdc',
> 'hostname' = 'localhost',
> 'port' = '5432',
> 'username' = 'postgres',
> 'password' = 'postgres',
> 'database-name' = 'postgres',
> 'schema-name' = 'public',
> 'table-name' = 'shipments'
> );
[INFO] Execute statement succeed.
Flink SQL> CREATE TABLE enriched_orders (
> order_id INT,
> order_date TIMESTAMP(0),
> customer_name STRING,
> price DECIMAL(10, 5),
> product_id INT,
> order_status BOOLEAN,
> product_name STRING,
> product_description STRING,
> shipment_id INT,
> origin STRING,
> destination STRING,
> is_arrived BOOLEAN,
> PRIMARY KEY (order_id) NOT ENFORCED
> ) WITH (
> 'connector' = 'elasticsearch-7',
> 'hosts' = 'http://localhost:9200',
> 'index' = 'enriched_orders'
> );
[INFO] Execute statement succeed.
Flink SQL> INSERT INTO enriched_orders
> SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
> FROM orders AS o
> LEFT JOIN products AS p ON o.product_id = p.id
> LEFT JOIN shipments AS s ON o.order_id = s.order_id;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: ca63b90e47ab7356c0659647bf167145
|