Flink on K8s -- FlinkResourceListener Plugin: Listening to JobStatus

Reposted from: http://xinzhuxiansheng.com/articleDetail/ You may not be familiar with the Operator FlinkResourceListener Plugin, so let's first look at its introduction in the official documentation (https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/operations/plugins/#custom-flink-resource-listeners).

The Flink Kubernetes Operator allows users to listen to events and status updates triggered for the Flink Resources managed by the operator. This feature enables tighter integration with the user’s own data platform.

By implementing the FlinkResourceListener interface users can listen to both events and status updates per resource type (FlinkDeployment / FlinkSessionJob). These methods will be called after the respective events have been triggered by the system. Using the context provided on each listener method users can also get access to the related Flink resource and the KubernetesClient itself in order to trigger any further events etc on demand.

Similar to custom validator implementations, resource listeners are loaded via the Flink Plugins(https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/plugins/) mechanism.

In order to enable your custom FlinkResourceListener you need to:

    Implement the interface
    Add your listener class to org.apache.flink.kubernetes.operator.api.listener.FlinkResourceListener in META-INF/services
    Package your JAR and add it to the plugins directory of your operator image (/opt/flink/plugins)

From the description above, the Operator provides a plugin mechanism whose directory inside the Docker image is /opt/flink/plugins. To listen to Flink Job events and status updates for FlinkDeployment and FlinkSessionJob resources, we can implement the FlinkResourceListener interface.

By now you are probably itching to try it yourself, because it is very convenient for obtaining job status in real time in a production environment. Of course, it also introduces some instability factors; let's analyze both sides:

  • Actively pushing Job Status and Events to your own platform is more real-time and cheaper to develop than polling the Flink REST API or querying Prometheus (for reported Flink metrics).
  • Developing a FlinkResourceListener Plugin adds an instability factor to the Operator JVM (developers' skill levels vary). Personally, I don't put much weight on this point; the Operator mechanism exists precisely to reduce usage and development cost.

This relies on the Java SPI mechanism, i.e. the Java Service Provider Interface. It is a dynamic loading mechanism that Java provides based on the combination of "programming to interfaces + strategy pattern + configuration files". Callers can enable, extend, or replace a framework's existing implementation strategies according to their actual needs. Java implements this SPI idea concretely through ServiceLoader, which makes service registration and discovery straightforward and decouples service providers from consumers.
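A minimal, self-contained sketch of how ServiceLoader works (the Greeter interface here is purely hypothetical; the operator discovers FlinkResourceListener implementations through the same mechanism):

```java
import java.util.ServiceLoader;

public class ServiceLoaderDemo {

    // Hypothetical service interface, standing in for e.g. FlinkResourceListener.
    public interface Greeter {
        String greet();
    }

    // ServiceLoader scans the classpath for META-INF/services/<binary name of
    // the interface> and instantiates every implementation listed there.
    public static int countImplementations() {
        int count = 0;
        for (Greeter greeter : ServiceLoader.load(Greeter.class)) {
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        // With no provider file on the classpath, nothing is discovered.
        System.out.println("Discovered implementations: " + countImplementations());
    }
}
```

Registering an implementation is purely declarative: adding a provider file to the classpath is enough, with no code changes on the consumer side.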

Common examples of the Java SPI mechanism include:

  • Loading database driver implementations: JDBC loads the driver for whichever database is actually used, e.g. OracleDriver, SQLServerDriver, or MySQL's Driver.
  • Loading implementations behind the slf4j logging facade: slf4j is a facade rather than a logging framework; it uses the Java SPI mechanism to load a matching logging framework implementation and bind to it, e.g. Log4j or Logback.

Applications of the Java SPI mechanism in Flink

Flink SQL programs use the Java SPI mechanism to dynamically load implementations of various Factory interfaces. For example, for the TableFactory interface, a Flink program looks for META-INF/services/org.apache.flink.table.factories.TableFactory among its dependencies, instantiates the TableFactory implementations via reflection, and then selects the matching implementation with TableFactoryService#filter(). Take a Flink SQL program reading from Kafka (version 0.11) as an example: the program first obtains all available TableFactory implementations, then uses TableFactoryService#filter() to obtain the matching implementation, a Kafka011TableSourceSinkFactory instance. This discussion focuses on how the Java SPI mechanism is applied in Flink SQL programs; readers interested in how TableFactory implementations are filtered can study the relevant code themselves.

Author: empcl
Link: https://juejin.cn/post/6854573215633801230
Source: Juejin
Copyright belongs to the author. For commercial reposting, please contact the author for authorization; for non-commercial reposting, please credit the source.

Developing the FlinkResourceListener Plugin

Setting up the project

Project directory structure:

pom.xml:


... some code omitted
 <dependencies>
        <!-- LOGGING begin -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.24</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-ext</artifactId>
            <version>1.7.24</version>
        </dependency>
        <!-- Direct calls to commons-logging are bridged to slf4j -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>jcl-over-slf4j</artifactId>
            <version>1.7.24</version>
        </dependency>

        <!-- Direct calls to java.util.logging are bridged to slf4j -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>jul-to-slf4j</artifactId>
            <version>1.7.24</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-1.2-api</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <!-- log4j-slf4j-impl (integrates log4j2 with slf4j) -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.8.2</version>
        </dependency>

        <!-- LOGGING end -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-kubernetes-operator</artifactId>
            <version>1.8.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>io.fabric8</groupId>
            <artifactId>kubernetes-client</artifactId>
            <version>6.8.1</version>
            <scope>provided</scope>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>

        </plugins>

    </build>
</project>
package com.javamain.flink;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.listener.FlinkResourceListener;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.listener.AuditUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
public class MyFlinkResourceListener implements FlinkResourceListener {
    private static final Logger logger = LoggerFactory.getLogger(MyFlinkResourceListener.class);

    @Override
    public void onDeploymentStatusUpdate(StatusUpdateContext<FlinkDeployment, FlinkDeploymentStatus> statusUpdateContext) {
        FlinkDeployment flinkResource = statusUpdateContext.getFlinkResource();
        FlinkDeploymentStatus previousStatus = statusUpdateContext.getPreviousStatus();
        FlinkDeploymentStatus newStatus = statusUpdateContext.getNewStatus();
        Instant timestamp = statusUpdateContext.getTimestamp();

        printlnFlinkDeploymentJobStatus("FlinkDeployment previousStatus", flinkResource, previousStatus);
        printlnFlinkDeploymentJobStatus("FlinkDeployment newStatus", flinkResource, newStatus);
    }

    private void printlnFlinkDeploymentJobStatus(String tag, FlinkDeployment flinkResource, FlinkDeploymentStatus jobStatus) {
        String namespace = flinkResource.getMetadata().getNamespace();
        String jobName = flinkResource.getMetadata().getName(); // NAME
        String state = jobStatus.getJobStatus().getState(); // JOB STATUS
        String lifecycleState = jobStatus.getLifecycleState().name(); // LIFECYCLE STATE
        logger.info("TAG: {}, NAMESPACE: {}, NAME: {}, JOB STATUS: {}, LIFECYCLE STATE: {}",
                tag,
                namespace,
                jobName,
                state,
                lifecycleState);
    }


    @Override
    public void onDeploymentEvent(ResourceEventContext<FlinkDeployment> resourceEventContext) {
        AuditUtils.logContext(resourceEventContext);
    }

    @Override
    public void onSessionJobStatusUpdate(StatusUpdateContext<FlinkSessionJob, FlinkSessionJobStatus> statusUpdateContext) {
        FlinkSessionJob flinkResource = statusUpdateContext.getFlinkResource();
        FlinkSessionJobStatus previousStatus = statusUpdateContext.getPreviousStatus();
        FlinkSessionJobStatus newStatus = statusUpdateContext.getNewStatus();
        Instant timestamp = statusUpdateContext.getTimestamp();

        printlnFlinkSessionJobStatus("FlinkSessionJob previousStatus", flinkResource, previousStatus);
        printlnFlinkSessionJobStatus("FlinkSessionJob newStatus", flinkResource, newStatus);
    }

    private void printlnFlinkSessionJobStatus(String tag, FlinkSessionJob flinkResource, FlinkSessionJobStatus jobStatus) {
        String namespace = flinkResource.getMetadata().getNamespace();
        String jobName = flinkResource.getMetadata().getName(); // NAME
        String state = jobStatus.getJobStatus().getState(); // JOB STATUS
        String lifecycleState = jobStatus.getLifecycleState().name(); // LIFECYCLE STATE
        logger.info("TAG: {}, NAMESPACE: {}, NAME: {}, JOB STATUS: {}, LIFECYCLE STATE: {}",
                tag,
                namespace,
                jobName,
                state,
                lifecycleState);
    }

    @Override
    public void onSessionJobEvent(ResourceEventContext<FlinkSessionJob> resourceEventContext) {
        AuditUtils.logContext(resourceEventContext);
    }
}

Defining the SPI implementation class

Under the resources directory, create a META-INF/services directory and, inside it, a file named org.apache.flink.kubernetes.operator.api.listener.FlinkResourceListener whose content is the fully qualified name of MyFlinkResourceListener, e.g. com.javamain.flink.MyFlinkResourceListener.
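As a quick sketch, the registration file can be created from the project root like this (assuming a standard Maven layout):

```shell
# Create the SPI registration file under src/main/resources
mkdir -p src/main/resources/META-INF/services
printf 'com.javamain.flink.MyFlinkResourceListener\n' \
  > src/main/resources/META-INF/services/org.apache.flink.kubernetes.operator.api.listener.FlinkResourceListener
```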

Configuring the Operator Plugin Listener parameter

This part took a few twists and turns. At the time of writing, the Flink Operator documentation (https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/operations/configuration/) did not describe the Operator Plugin Listener parameter. After reading the source code, I filed a JIRA ticket with the Flink community (https://issues.apache.org/jira/browse/FLINK-35357); the ticket has been assigned to me, and I will follow up with a PR to the Flink Operator documentation. The configuration is now documented in the 1.10 release.


Modify flink-kubernetes-operator/conf/flink-conf.yaml and add the following:

# Format of the plugin listener parameter:
kubernetes.operator.plugins.listeners.<listener-name>.class: <fully-qualified-class-name>

# Add the following parameter to flink-conf.yaml
kubernetes.operator.plugins.listeners.yzhoulistener.class: com.javamain.flink.MyFlinkResourceListener

Note: according to the official documentation (https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/operations/configuration/#dynamic-operator-configuration), the flink-conf.yaml configuration inside the Operator's flink-operator-config ConfigMap supports dynamic reloading, so you can edit the ConfigMap YAML directly; note, however, that the default reload interval is 5 minutes. You can confirm this in the Operator log:


2024-05-17 01:11:55,469 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: parallelism.default, 1
2024-05-17 01:11:55,470 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2024-05-17 01:11:55,470 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: kubernetes.operator.reconcile.interval, 15 s
2024-05-17 01:11:55,470 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: kubernetes.operator.metrics.reporter.slf4j.interval, 5 MINUTE
2024-05-17 01:11:55,470 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: kubernetes.operator.observer.progress-check.interval, 5 s
2024-05-17 01:11:55,470 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: kubernetes.operator.health.probe.enabled, true
2024-05-17 01:11:55,471 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: kubernetes.operator.health.probe.port, 8085
2024-05-17 01:11:55,471 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: kubernetes.operator.plugins.listeners.yzhoulistener.class, com.javamain.flink.MyFlinkResourceListener
2024-05-17 01:11:55,471 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: kubernetes.operator.metrics.reporter.slf4j.factory.class, org.apache.flink.metrics.slf4j.Slf4jReporterFactory

Note: the complete example project is available at https://github.com/xinzhuxiansheng/javamain-services/tree/main/javamain-flinkOperatorPlugin

Rebuilding the Operator Docker image & configuring the image

vim Dockerfile

1
2
3
FROM ghcr.io/apache/flink-kubernetes-operator:91d67d9
RUN mkdir -p /opt/flink/plugins/yzhou
COPY target/javamain-flinkOperatorPlugin-1.0-SNAPSHOT.jar /opt/flink/plugins/yzhou/javamain-flinkOperatorPlugin-1.0-SNAPSHOT.jar

Note: the path must follow the plugin path convention /opt/flink/plugins/[custom-dir]/xxx.jar, hence the RUN mkdir -p /opt/flink/plugins/yzhou above.

# Build the image
docker build -t harbor01.io/yzhou/flink-kubernetes-operator:91d67d9-04 .

# Push the image
docker push harbor01.io/yzhou/flink-kubernetes-operator:91d67d9-04

vim flink-kubernetes-operator/myvalues.yaml

# Change "ghcr.io/apache/flink-kubernetes-operator" to "harbor01.io/yzhou/flink-kubernetes-operator"
# Change "91d67d9" to "91d67d9-04"
image:
  repository: harbor01.io/yzhou/flink-kubernetes-operator # your own private registry
  pullPolicy: IfNotPresent
  tag: "91d67d9-04" # updated tag

Redeploying the Operator

Note: previous blog posts covered uninstalling and reinstalling the Operator several times; I will streamline this part in the future.

# Uninstall
helm uninstall flink-kubernetes-operator -n flink

# Install
helm install -f myvalues.yaml flink-kubernetes-operator . --namespace flink

Verify that the custom plugin JAR exists under the /opt/flink/plugins/ directory

# Check that the plugins/yzhou directory exists
root@flink-kubernetes-operator-5597c49d78-tnq42:/opt/flink/plugins# ls


# Check that the listener plugin JAR exists
root@flink-kubernetes-operator-5597c49d78-tnq42:/opt/flink/plugins# ls yzhou/
javamain-flinkOperatorPlugin-1.0-SNAPSHOT.jar


vim basic-application-deployment-only-ingress-tz.yaml

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-application-deployment-only-ingress-tz
spec:
  image: flink:1.17
  flinkVersion: v1_17
  ingress:
    template: 'flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)'
    className: 'nginx'
    annotations:
      nginx.ingress.kubernetes.io/rewrite-target: '/$2'
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: '2'
  serviceAccount: flink
  jobManager:
    resource:
      memory: '2048m'
      cpu: 1
  taskManager:
    resource:
      memory: '2048m'
      cpu: 1
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          env:
            - name: TZ # set the container's time zone
              value: Asia/Shanghai
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: stateless
# Deploy
kubectl -n flink apply -f basic-application-deployment-only-ingress-tz.yaml

# Delete
kubectl -n flink delete -f basic-application-deployment-only-ingress-tz.yaml

Checking the Listener log

By deploying and then deleting the job, you can observe the log output from MyFlinkResourceListener.

Log output when deploying the job:

2024-05-17 01:07:05,290 INFO  org.apache.flink.kubernetes.operator.listener.AuditUtils     [] - >>> Event  | Info    | JOBSTATUSCHANGED | Job status changed from CREATED to RUNNING
2024-05-17 01:07:05,291 INFO  org.apache.flink.kubernetes.operator.listener.AuditUtils     [] - >>> Event  | Info    | JOBSTATUSCHANGED | Job status changed from CREATED to RUNNING
2024-05-17 01:07:05,409 INFO  com.javamain.flink.MyFlinkResourceListener                   [] - TAG: FlinkDeployment previousStatus, NAMESPACE: flink, NAME: basic-application-deployment-only-ingress-tz, JOB STATUS: CREATED, LIFECYCLE STATE: DEPLOYED
2024-05-17 01:07:05,410 INFO  com.javamain.flink.MyFlinkResourceListener                   [] - TAG: FlinkDeployment newStatus, NAMESPACE: flink, NAME: basic-application-deployment-only-ingress-tz, JOB STATUS: RUNNING, LIFECYCLE STATE: STABLE
2024-05-17 01:07:05,410 INFO  org.apache.flink.kubernetes.operator.listener.AuditUtils     [] - >>> Status | Info    | STABLE          | The resource deployment is considered to be stable and won’t be rolled back

Log output when deleting the job:

2024-05-17 01:10:49,095 INFO  org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController [] - Cleaning up FlinkDeployment
2024-05-17 01:10:49,195 INFO  org.apache.flink.kubernetes.operator.listener.AuditUtils     [] - >>> Event  | Info    | CLEANUP         | Cleaning up FlinkDeployment
2024-05-17 01:10:49,195 INFO  org.apache.flink.kubernetes.operator.listener.AuditUtils     [] - >>> Event  | Info    | CLEANUP         | Cleaning up FlinkDeployment
2024-05-17 01:10:49,252 INFO  org.apache.flink.autoscaler.JobAutoScalerImpl                [] - Cleaning up autoscaling meta data
2024-05-17 01:10:49,254 INFO  org.apache.flink.kubernetes.operator.service.AbstractFlinkService [] - Job is running, cancelling job.
2024-05-17 01:10:49,276 INFO  org.apache.flink.kubernetes.operator.service.AbstractFlinkService [] - Job successfully cancelled.
2024-05-17 01:10:49,276 INFO  org.apache.flink.kubernetes.operator.service.AbstractFlinkService [] - Deleting cluster with Foreground propagation
2024-05-17 01:10:49,277 INFO  org.apache.flink.kubernetes.operator.service.AbstractFlinkService [] - Scaling JobManager Deployment to zero with 300 seconds timeout...
2024-05-17 01:10:51,717 INFO  org.apache.flink.kubernetes.operator.service.AbstractFlinkService [] - Completed Scaling JobManager Deployment to zero
2024-05-17 01:10:51,718 INFO  org.apache.flink.kubernetes.operator.service.AbstractFlinkService [] - Deleting JobManager Deployment with 297 seconds timeout...
2024-05-17 01:10:53,657 INFO  org.apache.flink.kubernetes.operator.service.AbstractFlinkService [] - Completed Deleting JobManager Deployment

Note: as the logs show, the current MyFlinkResourceListener example only prints log lines; it does not yet push the Job Status and Event data to your own platform through an API call. That part should not be hard to implement, so give it a try.
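As a rough sketch of that idea (not the author's implementation): the helper below builds a JSON payload from the fields the listener already logs and POSTs it with the JDK 11+ HttpClient. The endpoint URL and payload field names are assumptions; adapt them to your platform's API and call push() from onDeploymentStatusUpdate / onSessionJobStatusUpdate.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class JobStatusPusher {
    // Hypothetical endpoint on your own platform; replace with a real URL.
    private static final String ENDPOINT = "http://platform.example.com/api/flink/job-status";

    // Builds a minimal JSON payload from the fields the listener already logs.
    static String toJson(String namespace, String name, String jobStatus, String lifecycleState) {
        return String.format(
                "{\"namespace\":\"%s\",\"name\":\"%s\",\"jobStatus\":\"%s\",\"lifecycleState\":\"%s\"}",
                namespace, name, jobStatus, lifecycleState);
    }

    // POSTs the payload and returns the HTTP status code.
    static int push(String json) throws Exception {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(ENDPOINT))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode();
    }
}
```

In production you would also want retries or an async queue, so a slow platform endpoint cannot block the operator's reconcile loop.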

Summary

The FlinkResourceListener Plugin nicely eliminates the latency that polling-based monitoring introduces. Watching the Flink Job Status directly with a K8s client is also feasible, and this again highlights the Operator's advantage, because the Operator exposes the Flink Job Status in the CRD, for example:

[root@k8s01 job_yaml]# kubectl -n flink get flinkdeployments
NAME                                           JOB STATUS   LIFECYCLE STATE
basic-application-deployment-only-ingress-tz   RUNNING      STABLE
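As a hedged sketch of that K8s-client approach (not the author's implementation): with the fabric8 kubernetes-client and the operator API classes on the classpath, plus kubeconfig access to the cluster, FlinkDeployment resources can be watched directly. Class and method names follow fabric8 6.x; verify them against the versions you use.

```java
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;

public class FlinkDeploymentWatcher {
    public static void main(String[] args) {
        try (KubernetesClient client = new KubernetesClientBuilder().build()) {
            // Watch FlinkDeployment CRs in the "flink" namespace and print
            // each change together with the job state exposed in the CRD status.
            client.resources(FlinkDeployment.class)
                  .inNamespace("flink")
                  .watch(new Watcher<FlinkDeployment>() {
                      @Override
                      public void eventReceived(Action action, FlinkDeployment resource) {
                          System.out.printf("%s %s -> %s%n",
                                  action,
                                  resource.getMetadata().getName(),
                                  resource.getStatus().getJobStatus().getState());
                      }

                      @Override
                      public void onClose(WatcherException cause) {
                          System.out.println("Watch closed: " + cause);
                      }
                  });
            Thread.currentThread().join(); // keep the watch alive
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Unlike the listener plugin, this runs outside the operator JVM, so a bug in it cannot destabilize the operator itself.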

In a follow-up post, I will cover "Integrating a K8s Client with Akka Cluster to watch Flink Job Status".

refer
1.https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/operations/plugins/
2.https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/operations/configuration/

Licensed under CC BY-NC-SA 4.0
最后更新于 Jan 06, 2025 05:52 UTC