flink-playgrounds跑flink任务

flink-playgrounds 的 github 库

https://github.com/huangxiaofeng10047/flink-playgrounds

image-20240321093951815

不错:看到了 gitemoji 表情

导入 idea 中看一下效果:

image-20240321094058343

开始运行

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
cd operations-playground
 docker-compose build
[+] Building 30.3s (17/17) FINISHED                                                                                                                                                        docker:default
 => [client internal] load build definition from Dockerfile                                                                                                                                          0.6s
 => => transferring dockerfile: 2.16kB                                                                                                                                                               0.1s
 => [client internal] load metadata for docker.io/library/maven:3.8-eclipse-temurin-17                                                                                                              25.8s
 => [client internal] load metadata for docker.io/apache/flink:1.16.0-scala_2.12-java11                                                                                                              0.1s
 => [client auth] library/maven:pull token for registry-1.docker.io                                                                                                                                  0.0s
 => [client internal] load .dockerignore                                                                                                                                                             0.3s
 => => transferring context: 2B                                                                                                                                                                      0.0s
 => [client stage-1 1/3] FROM docker.io/apache/flink:1.16.0-scala_2.12-java11                                                                                                                        0.0s
 => [client builder 1/7] FROM docker.io/library/maven:3.8-eclipse-temurin-17@sha256:40fcff4c4043d6adc90286c2e38ec70950f34f6dd5784f7e524866c66520cc23                                                 0.0s
 => [client internal] load build context                                                                                                                                                             0.8s
 => => transferring context: 89.15kB                                                                                                                                                                 0.4s
 => CACHED [client stage-1 2/3] WORKDIR /opt/flink/bin                                                                                                                                               0.0s
 => CACHED [client builder 2/7] COPY ./java/flink-playground-clickcountjob /opt/flink-playground-clickcountjob                                                                                       0.0s
 => CACHED [client builder 3/7] COPY settings.xml /usr/share/maven/conf/settings2.xml                                                                                                                0.0s
 => CACHED [client builder 4/7] WORKDIR /opt/flink-playground-clickcountjob                                                                                                                          0.0s
 => CACHED [client builder 5/7] RUN cat /etc/resolv.conf                                                                                                                                             0.0s
 => CACHED [client builder 6/7] RUN wget  https://repo.maven.apache.org/maven2/org/apache/flink/flink-table-api-java/1.13.1/flink-table-api-java-1.13.1.pom                                          0.0s
 => CACHED [client builder 7/7] RUN  rm /usr/share/maven/conf/settings.xml &&      cp /usr/share/maven/conf/settings2.xml /usr/share/maven/conf/settings.xml &&      mvn clean verify                0.0s
 => CACHED [client stage-1 3/3] COPY --from=builder /opt/flink-playground-clickcountjob/target/flink-playground-clickcountjob-*.jar /opt/ClickCountJob.jar                                           0.0s
 => [client] exporting to image                                                                                                                                                                      0.4s
 => => exporting layers                                                                                                                                                                              0.0s
 => => writing image sha256:9719da33b93d0b7a184e0e84efe8a4ab2f608fd4ef2206f5c6c1b8b5cbeec8e9                                                                                                         0.2s
 => => naming to docker.io/apache/flink-ops-playground:1-FLINK-1.16-scala_2.12

开始运行任务

1
2
3
4
5
 ✔ Container operations-playground-zookeeper-1             Running                                                                                                                                   0.0s
 ✔ Container operations-playground-kafka-1                 Running                                                                                                                                   0.0s
 ✔ Container operations-playground-clickevent-generator-1  Running                                                                                                                                   0.0s  ✔ Container operations-playground-jobmanager-1            Running                                                                                                                                   0.0s
 ✔ Container operations-playground-taskmanager-1           Running                                                                                                                                   0.0s
 ✔ Container operations-playground-client-1                Started

访问 url 看看 http://localhost:8081/#/overview

image-20240321094803487

查看命令

1
2
3
4
5
6
//input topic (1000 records/s)
docker-compose exec kafka kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 --topic input
//output topic (24 records/min)
docker-compose exec kafka kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 --topic output

image-20240321100222382

查看帮助命令

1
2
3
4
5
6
7
8
9
docker-compose run --no-deps client flink list
#输出如下:
 docker-compose run --no-deps client flink list
Waiting for response...
------------------ Running/Restarting Jobs -------------------
21.03.2024 01:45:21 : 8f60263629c3693f5273a9d9373437cd : Click Event Count (RUNNING)
21.03.2024 01:46:11 : 2b2c14a5de0992ad1a55b558fac1cd22 : Click Event Count (RUNNING)
--------------------------------------------------------------
No scheduled jobs.

观察故障与恢复

1、观察输出

1
2
docker-compose exec kafka kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 --topic output

2、模拟故障

这里模拟 TaskManager 进程的丢失

1
docker-compose kill taskmanager

几分钟之后观察

image-20240321100908960

重启一下看看,

1
docker-compose up taskmanager

再观察服务正常了。

image-20240321101157446

注意这个 docker-compose run –no-deps client flink list 运行后,其实是启动了一个 flink 容器进行运行命令的,刚才一共跑了三次就有三个。

curl localhost:8081/jobs {“jobs”:[{“id”:“025d98f000d1068b800af3dc51bcd953”,“status”:“RUNNING”}]}%

停止工作

要优雅地停止作业,你需要使用 CLI 或 REST API 的 “stop” 命令。为此,你需要该作业的 JobID,你可以通过列出所有正在运行的 Job 或从 WebUI 中获得。有了 JobID,你就可以继续停止该作业:

  • CLI
1
2
3
docker-compose run --no-deps client flink stop 025d98f000d1068b800af3dc51bcd953
Suspending job "025d98f000d1068b800af3dc51bcd953" with a CANONICAL savepoint.
Savepoint completed. Path: file:/tmp/flink-savepoints-directory/savepoint-025d98-8b7fd702c6a2

Savepoint 已经被存储到 flink-conf.yaml 中配置的 state.savepoint.dir 中,它被安装在本地机器的 /tmp/flink-savepoints-directory/ 下。在下一步中,你将需要这个 Savepoint 的路径。在 REST API 的情况下,这个路径已经是响应的一部分,你将需要直接查看文件系统。

1
ls -lia /tmp/flink-savepoints-directory

image-20240321110209055

请求

1
2
# triggering stop
curl -X POST localhost:8081/jobs/025d98f000d1068b800af3dc51bcd953/stop -d '{"drain": false}'

预期的响应(美化了打印)

1
2
3
{
  "request-id": "<trigger-id>"
}

请求

1
2
3
# check status of stop action and retrieve savepoint path
curl localhost:8081/jobs/<job-id>/savepoints/<trigger-id>
curl localhost:8081/jobs/025d98f000d1068b800af3dc51bcd953/savepoints/477f5b07a6bf993354196da81197c14b

预期的响应(美化了打印)

1
2
3
4
5
6
7
8
{
  "status": {
    "id": "COMPLETED"
  },
  "operation": {
    "location": "<savepoint-path>"
  }
}

步骤 2a: 重启 Job,不做任何改变

现在你可以从该保存点重新启动升级后的作业(Job)。为了简单起见,你可以在不做任何更改的情况下重新启动它。

  • CLI

命令

1
2
3
docker-compose run --no-deps client flink run -s /tmp/flink-savepoints-directory/savepoint-025d98-8b7fd702c6a2 \
  -d /opt/ClickCountJob.jar \
  --bootstrap.servers kafka:9092 --checkpointing --event-time

预期的输出

1
2
Starting execution of program
Job has been submitted with JobID <job-id>
  • REST API

请求

1
2
3
# Uploading the JAR from the Client container
docker-compose run --no-deps client curl -X POST -H "Expect:" \
  -F "jarfile=@/opt/ClickCountJob.jar" http://jobmanager:8081/jars/upload

预期的响应(美化了打印)

1
2
3
4
{
  "filename": "/tmp/flink-web-<uuid>/flink-web-upload/<jar-id>",
  "status": "success"
}

请求

1
2
3
# Submitting the Job
curl -X POST http://localhost:8081/jars/<jar-id>/run \
  -d '{"programArgs": "--bootstrap.servers kafka:9092 --checkpointing --event-time", "savepointPath": "<savepoint-path>"}'

预期的输出

1
2
3
{
  "jobid": "<job-id>"
}

一旦 Job 再次 RUNNING,你会在 output 主题中看到,当 Job 在处理中断期间积累的积压时,记录以较高的速度产生。此外,你会看到在升级过程中没有丢失任何数据:所有窗口都存在,数量正好是 1000。

步骤 2b: 用不同的并行度重新启动作业(重新缩放)

另外,你也可以在重新提交时通过传递不同的并行性,从这个保存点重新缩放作业。

  • CLI
1
2
3
docker-compose run --no-deps client flink run -p 3 -s <savepoint-path> \
  -d /opt/ClickCountJob.jar \
  --bootstrap.servers kafka:9092 --checkpointing --event-time

预期的输出

1
2
Starting execution of program
Job has been submitted with JobID <job-id>
  • REST API

请求

1
2
3
# Uploading the JAR from the Client container
docker-compose run --no-deps client curl -X POST -H "Expect:" \
  -F "jarfile=@/opt/ClickCountJob.jar" http://jobmanager:8081/jars/upload

预期的响应(美化了打印)

1
2
3
4
{
  "filename": "/tmp/flink-web-<uuid>/flink-web-upload/<jar-id>",
  "status": "success"
}

请求

1
2
3
# Submitting the Job
curl -X POST http://localhost:8081/jars/<jar-id>/run \
  -d '{"parallelism": 3, "programArgs": "--bootstrap.servers kafka:9092 --checkpointing --event-time", "savepointPath": "<savepoint-path>"}'

预期的响应(美化了打印)

1
2
3
{
  "jobid": "<job-id>"
}

现在,作业(Job)已经被重新提交,但它不会启动,因为没有足够的 TaskSlots 在增加的并行度下执行它(2 个可用,需要 3 个)。使用:

1
docker-compose scale taskmanager=2

你可以在 Flink 集群中添加一个带有两个 TaskSlots 的第二个 TaskManager,它将自动注册到 JobManager 中。添加 TaskManager 后不久,该任务(Job)应该再次开始运行。

一旦 Job 再次 “RUNNING”,你会在 output Topic 中看到在重新缩放过程中没有丢失数据:所有的窗口都存在,计数正好是 1000。

JobManager 通过其 REST API 公开系统和用户指标

端点取决于这些指标的范围。可以通过 jobs/<job-id>/metrics 来列出一个作业的范围内的度量。指标的实际值可以通过 get query 参数进行查询。

请求

1
curl "localhost:8081/jobs/<jod-id>/metrics?get=lastCheckpointSize"

预期的响应(美化了打印; 没有占位符)

1
2
3
4
5
6
[
  {
    "id": "lastCheckpointSize",
    "value": "9378"
  }
]

REST API 不仅可以用来查询指标,还可以检索运行中的作业状态的详细信息。

请求

1
2
# find the vertex-id of the vertex of interest
curl localhost:8081/jobs/<jod-id>

预期的响应(美化了打印)

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
{
  "jid": "<job-id>",
  "name": "Click Event Count",
  "isStoppable": false,
  "state": "RUNNING",
  "start-time": 1564467066026,
  "end-time": -1,
  "duration": 374793,
  "now": 1564467440819,
  "timestamps": {
    "CREATED": 1564467066026,
    "FINISHED": 0,
    "SUSPENDED": 0,
    "FAILING": 0,
    "CANCELLING": 0,
    "CANCELED": 0,
    "RECONCILING": 0,
    "RUNNING": 1564467066126,
    "FAILED": 0,
    "RESTARTING": 0
  },
  "vertices": [
    {
      "id": "<vertex-id>",
      "name": "ClickEvent Source",
      "parallelism": 2,
      "status": "RUNNING",
      "start-time": 1564467066423,
      "end-time": -1,
      "duration": 374396,
      "tasks": {
        "CREATED": 0,
        "FINISHED": 0,
        "DEPLOYING": 0,
        "RUNNING": 2,
        "CANCELING": 0,
        "FAILED": 0,
        "CANCELED": 0,
        "RECONCILING": 0,
        "SCHEDULED": 0
      },
      "metrics": {
        "read-bytes": 0,
        "read-bytes-complete": true,
        "write-bytes": 5033461,
        "write-bytes-complete": true,
        "read-records": 0,
        "read-records-complete": true,
        "write-records": 166351,
        "write-records-complete": true
      }
    },
    {
      "id": "<vertex-id>",
      "name": "Timestamps/Watermarks",
      "parallelism": 2,
      "status": "RUNNING",
      "start-time": 1564467066441,
      "end-time": -1,
      "duration": 374378,
      "tasks": {
        "CREATED": 0,
        "FINISHED": 0,
        "DEPLOYING": 0,
        "RUNNING": 2,
        "CANCELING": 0,
        "FAILED": 0,
        "CANCELED": 0,
        "RECONCILING": 0,
        "SCHEDULED": 0
      },
      "metrics": {
        "read-bytes": 5066280,
        "read-bytes-complete": true,
        "write-bytes": 5033496,
        "write-bytes-complete": true,
        "read-records": 166349,
        "read-records-complete": true,
        "write-records": 166349,
        "write-records-complete": true
      }
    },
    {
      "id": "<vertex-id>",
      "name": "ClickEvent Counter",
      "parallelism": 2,
      "status": "RUNNING",
      "start-time": 1564467066469,
      "end-time": -1,
      "duration": 374350,
      "tasks": {
        "CREATED": 0,
        "FINISHED": 0,
        "DEPLOYING": 0,
        "RUNNING": 2,
        "CANCELING": 0,
        "FAILED": 0,
        "CANCELED": 0,
        "RECONCILING": 0,
        "SCHEDULED": 0
      },
      "metrics": {
        "read-bytes": 5085332,
        "read-bytes-complete": true,
        "write-bytes": 316,
        "write-bytes-complete": true,
        "read-records": 166305,
        "read-records-complete": true,
        "write-records": 6,
        "write-records-complete": true
      }
    },
    {
      "id": "<vertex-id>",
      "name": "ClickEventStatistics Sink",
      "parallelism": 2,
      "status": "RUNNING",
      "start-time": 1564467066476,
      "end-time": -1,
      "duration": 374343,
      "tasks": {
        "CREATED": 0,
        "FINISHED": 0,
        "DEPLOYING": 0,
        "RUNNING": 2,
        "CANCELING": 0,
        "FAILED": 0,
        "CANCELED": 0,
        "RECONCILING": 0,
        "SCHEDULED": 0
      },
      "metrics": {
        "read-bytes": 20668,
        "read-bytes-complete": true,
        "write-bytes": 0,
        "write-bytes-complete": true,
        "read-records": 6,
        "read-records-complete": true,
        "write-records": 0,
        "write-records-complete": true
      }
    }
  ],
  "status-counts": {
    "CREATED": 0,
    "FINISHED": 0,
    "DEPLOYING": 0,
    "RUNNING": 4,
    "CANCELING": 0,
    "FAILED": 0,
    "CANCELED": 0,
    "RECONCILING": 0,
    "SCHEDULED": 0
  },
  "plan": {
    "jid": "<job-id>",
    "name": "Click Event Count",
    "nodes": [
      {
        "id": "<vertex-id>",
        "parallelism": 2,
        "operator": "",
        "operator_strategy": "",
        "description": "ClickEventStatistics Sink",
        "inputs": [
          {
            "num": 0,
            "id": "<vertex-id>",
            "ship_strategy": "FORWARD",
            "exchange": "pipelined_bounded"
          }
        ],
        "optimizer_properties": {}
      },
      {
        "id": "<vertex-id>",
        "parallelism": 2,
        "operator": "",
        "operator_strategy": "",
        "description": "ClickEvent Counter",
        "inputs": [
          {
            "num": 0,
            "id": "<vertex-id>",
            "ship_strategy": "HASH",
            "exchange": "pipelined_bounded"
          }
        ],
        "optimizer_properties": {}
      },
      {
        "id": "<vertex-id>",
        "parallelism": 2,
        "operator": "",
        "operator_strategy": "",
        "description": "Timestamps/Watermarks",
        "inputs": [
          {
            "num": 0,
            "id": "<vertex-id>",
            "ship_strategy": "FORWARD",
            "exchange": "pipelined_bounded"
          }
        ],
        "optimizer_properties": {}
      },
      {
        "id": "<vertex-id>",
        "parallelism": 2,
        "operator": "",
        "operator_strategy": "",
        "description": "ClickEvent Source",
        "optimizer_properties": {}
      }
    ]
  }
}

请查阅 REST API 参考资料,了解可能查询的完整列表,包括如何查询不同作用域的指标(如 TaskManager 指标)。

你可能已经注意到,Click Event Count 应用程序总是以 --checkpointing--event-time 程序参数启动。通过在 docker-compose.yaml 的客户端容器的命令中省略这些,你可以改变 Job 的行为。

  • --checkpointing 启用了 checkpoint,这是 Flink 的容错机制。如果你在没有它的情况下运行,并通过故障和恢复,你应该会看到数据实际上已经丢失了。
  • --event-time 启用了你的 Job 的事件时间语义。当禁用时,作业将根据挂钟时间而不是 ClickEvent 的时间戳将事件分配给窗口。因此,每个窗口的事件数量将不再是精确的 1000。

Click Event Count 应用程序还有另一个选项,默认情况下是关闭的,你可以启用这个选项来探索这个作业在背压下的行为。你可以在 docker-compose.yaml 的客户端容器的命令中添加这个选项。

  • --backpressure 在作业中间增加了一个额外的 operator,在偶数分钟内会造成严重的背压(例如,在 10:12 期间,但在 10:13 期间不会)。这可以通过检查各种网络指标(如 outputQueueLength 和 outPoolUsage)和/或使用 WebUI 中的背压监控来观察。
Licensed under CC BY-NC-SA 4.0
最后更新于 Jan 06, 2025 05:52 UTC
comments powered by Disqus
Built with Hugo
主题 StackJimmy 设计
Caret Up