Running Flink from Source with JDK 17

Today let's start Flink with JDK 17. First, download the Flink source code. I used GitHub Desktop for this and ran into quite a few problems, all of which could be solved via Google. Next, import Flink into IDEA. ![[image-20240812155528303.png]]
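If you prefer the command line over GitHub Desktop, cloning the official repository works just as well:

```sh
git clone https://github.com/apache/flink.git
cd flink
```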

Select Flink's Maven options as shown here, and check the fast profile so the build runs faster. Then run clean and install from the Maven Lifecycle panel to compile the code. Once the build finishes, the assembled Flink distribution can be found under the flink-dist directory. ![[image-20240812155702372.png]]
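For reference, a rough command-line equivalent of this build: `-DskipTests` skips the tests and `-Dfast` activates Flink's fast profile, which skips QA plugins such as Checkstyle. Expect the first build to take a while either way.

```sh
mvn clean install -DskipTests -Dfast
```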

Next, configure JDK 17. First open Project Structure and set the project SDK to a JDK 17 version. ![[image-20240812155807525.png]]

Then go to Settings and set the compiler's target bytecode version to 17: ![[image-20240812155909868.png]]

Next, under Modules, configure flink-core as shown below: ![[image-20240812160112847.png]]

The parameters to add can be found in config.yaml (under `env.java.opts.all`, shown in full later):

```
--add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED
```

Start the standalone entrypoint class: ![[image-20240812160249473.png]]

![[image-20240812160301979.png]]

![[image-20240812160315675.png]]
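To summarize what the screenshots set up, the two run configurations look roughly like this. The class names come from the Flink source tree; the `--configDir` path is a placeholder you should point at your own conf directory:

```
# JobManager: standalone session entrypoint
Main class:        org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
Program arguments: --configDir /path/to/conf

# TaskManager
Main class:        org.apache.flink.runtime.taskexecutor.TaskManagerRunner
Program arguments: --configDir /path/to/conf
```

Both also need the `--add-exports`/`--add-opens` list above in their VM options.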

The `log4j-console.properties` file:

```properties
################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Allows this configuration to be modified at runtime. The file will be checked every 30 seconds.
monitorInterval=30

# This affects logging for both user code and Flink
rootLogger.level = ${env:ROOT_LOG_LEVEL:-INFO}
rootLogger.appenderRef.console.ref = ConsoleAppender
rootLogger.appenderRef.rolling.ref = RollingFileAppender

# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.pekko.name = org.apache.pekko
logger.pekko.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO
logger.shaded_zookeeper.name = org.apache.flink.shaded.zookeeper3
logger.shaded_zookeeper.level = INFO

# Log all infos to the console
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.console.filter.threshold.type = ThresholdFilter
appender.console.filter.threshold.level = ${sys:console.log.level:-ALL}

# Log all infos in the given rolling file
appender.rolling.name = RollingFileAppender
appender.rolling.type = RollingFile
appender.rolling.append = true
appender.rolling.fileName = ${env:log.file}
appender.rolling.filePattern = ${env:log.file}.%i
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.rolling.policies.type = Policies
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size=100MB
appender.rolling.policies.startup.type = OnStartupTriggeringPolicy
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF
```

Next, set the `log.file` environment variable, which the RollingFileAppender above reads via `${env:log.file}`.
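For example, in each IDEA run configuration's environment variables you can point `log.file` at a separate file per process (the exact paths are your choice; these just mirror the TaskManager log path configured below):

```
# JobManager run configuration
log.file=target/log/jobmanager/jobmanager.log

# TaskManager run configuration
log.file=target/log/taskmanager/taskmanager.log
```

The full config.yaml: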

```yaml
################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
################################################################################

# These parameters are required for Java 17 support.
# They can be safely removed when using Java 8/11.
env:
  java:
    opts:
      all: --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED

#==============================================================================
# Common
#==============================================================================

jobmanager:
  # The host interface the JobManager will bind to. By default, this is localhost, and will prevent
  # the JobManager from communicating outside the machine/container it is running on.
  # On YARN this setting will be ignored if it is set to 'localhost', defaulting to 0.0.0.0.
  # On Kubernetes this setting will be ignored, defaulting to 0.0.0.0.
  #
  # To enable this, set the bind-host address to one that has access to an outside facing network
  # interface, such as 0.0.0.0.
  bind-host: localhost
  rpc:
    # The external address of the host on which the JobManager runs and can be
    # reached by the TaskManagers and any clients which want to connect. This setting
    # is only used in Standalone mode and may be overwritten on the JobManager side
    # by specifying the --host <hostname> parameter of the bin/jobmanager.sh executable.
    # In high availability mode, if you use the bin/start-cluster.sh script and setup
    # the conf/masters file, this will be taken care of automatically. Yarn
    # automatically configures the host name based on the hostname of the node where the
    # JobManager runs.
    address: localhost
    # The RPC port where the JobManager is reachable.
    port: 6123
  memory:
    process:
      # The total process memory size for the JobManager.
      # Note this accounts for all memory usage within the JobManager process, including JVM metaspace and other overhead.
      size: 1600m
  execution:
    # The failover strategy, i.e., how the job computation recovers from task failures.
    # Only restart tasks that may have been affected by the task failure, which typically includes
    # downstream tasks and potentially upstream tasks if their produced data is no longer available for consumption.
    failover-strategy: region

taskmanager:
  # The host interface the TaskManager will bind to. By default, this is localhost, and will prevent
  # the TaskManager from communicating outside the machine/container it is running on.
  # On YARN this setting will be ignored if it is set to 'localhost', defaulting to 0.0.0.0.
  # On Kubernetes this setting will be ignored, defaulting to 0.0.0.0.
  #
  # To enable this, set the bind-host address to one that has access to an outside facing network
  # interface, such as 0.0.0.0.
  bind-host: localhost
  # The address of the host on which the TaskManager runs and can be reached by the JobManager and
  # other TaskManagers. If not specified, the TaskManager will try different strategies to identify
  # the address.
  #
  # Note this address needs to be reachable by the JobManager and forward traffic to one of
  # the interfaces the TaskManager is bound to (see 'taskmanager.bind-host').
  #
  # Note also that unless all TaskManagers are running on the same machine, this address needs to be
  # configured separately for each TaskManager.
  host: localhost
  # The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
  numberOfTaskSlots: 1
  log:
    path: target/log/taskmanager/taskmanager.log
  memory:
    network:
      min: 128m
      max: 128m
    managed:
      size: 1024m

    framework:
      heap:
        size: 128m
      off-heap:
        size: 128m

    jvm-metaspace:
      size: 256m
    jvm-overhead:
      max: 256m
      min: 256m
    # process:
    #   # The total process memory size for the TaskManager.
    #   #
    #   # Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead.
    #   # To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'.
    #   # It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory.
    #   size: 1728m
    task:
      heap:
        size: 750m
      off-heap:
        size: 0m
  cpu:
    cores: 2

parallelism:
  # The parallelism used for programs that did not specify and other parallelism.
  default: 1

# # The default file system scheme and authority.
# # By default file paths without scheme are interpreted relative to the local
# # root file system 'file:///'. Use this to override the default and interpret
# # relative paths relative to a different file system,
# # for example 'hdfs://mynamenode:12345'
# fs:
#   default-scheme: hdfs://mynamenode:12345

#==============================================================================
# High Availability
#==============================================================================

# high-availability:
#   # The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
#   type: zookeeper
#   # The path where metadata for master recovery is persisted. While ZooKeeper stores
#   # the small ground truth for checkpoint and leader election, this location stores
#   # the larger objects, like persisted dataflow graphs.
#   #
#   # Must be a durable file system that is accessible from all nodes
#   # (like HDFS, S3, Ceph, nfs, ...)
#   storageDir: hdfs:///flink/ha/
#   zookeeper:
#     # The list of ZooKeeper quorum peers that coordinate the high-availability
#     # setup. This must be a list of the form:
#     # "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
#     quorum: localhost:2181
#     client:
#       # ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
#       # It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE)
#       # The default value is "open" and it can be changed to "creator" if ZK security is enabled
#       acl: open

#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================

# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled. Checkpointing is enabled when execution.checkpointing.interval > 0.

# # Execution checkpointing related parameters. Please refer to CheckpointConfig and CheckpointingOptions for more details.
# execution:
#   checkpointing:
#     interval: 3min
#     externalized-checkpoint-retention: [DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION]
#     max-concurrent-checkpoints: 1
#     min-pause: 0
#     mode: [EXACTLY_ONCE, AT_LEAST_ONCE]
#     timeout: 10min
#     tolerable-failed-checkpoints: 0
#     unaligned: false

# state:
#   backend:
#     # Supported backends are 'hashmap', 'rocksdb', or the
#     # <class-name-of-factory>.
#     type: hashmap
#     # Flag to enable/disable incremental checkpoints for backends that
#     # support incremental checkpoints (like the RocksDB state backend).
#     incremental: false
#   checkpoints:
#       # Directory for checkpoints filesystem, when using any of the default bundled
#       # state backends.
#       dir: hdfs://namenode-host:port/flink-checkpoints
#   savepoints:
#       # Default target directory for savepoints, optional.
#       dir: hdfs://namenode-host:port/flink-savepoints

#==============================================================================
# Rest & web frontend
#==============================================================================

rest:
  # The address to which the REST client will connect to
  address: localhost
  # The address that the REST & web server binds to
  # By default, this is localhost, which prevents the REST & web server from
  # being able to communicate outside of the machine/container it is running on.
  #
  # To enable this, set the bind address to one that has access to outside-facing
  # network interface, such as 0.0.0.0.
  bind-address: localhost
  # # The port to which the REST client connects to. If rest.bind-port has
  # # not been specified, then the server will bind to this port as well.
  # port: 8081
  # # Port range for the REST and web server to bind to.
  # bind-port: 8080-8090
# web:
#   submit:
#     # Flag to specify whether job submission is enabled from the web-based
#     # runtime monitor. Uncomment to disable.
#     enable: false
#   cancel:
#     # Flag to specify whether job cancellation is enabled from the web-based
#     # runtime monitor. Uncomment to disable.
#     enable: false

#==============================================================================
# Advanced
#==============================================================================

# io:
#   tmp:
#     # Override the directories for temporary files. If not specified, the
#     # system-specific Java temporary directory (java.io.tmpdir property) is taken.
#     #
#     # For framework setups on Yarn, Flink will automatically pick up the
#     # containers' temp directories without any need for configuration.
#     #
#     # Add a delimited list for multiple directories, using the system directory
#     # delimiter (colon ':' on unix) or a comma, e.g.:
#     # /data1/tmp:/data2/tmp:/data3/tmp
#     #
#     # Note: Each directory entry is read from and written to by a different I/O
#     # thread. You can include the same directory multiple times in order to create
#     # multiple I/O threads against that directory. This is for example relevant for
#     # high-throughput RAIDs.
#     dirs: /tmp

# classloader:
#   resolve:
#     # The classloading resolve order. Possible values are 'child-first' (Flink's default)
#     # and 'parent-first' (Java's default).
#     #
#     # Child first classloading allows users to use different dependency/library
#     # versions in their application than those in the classpath. Switching back
#     # to 'parent-first' may help with debugging dependency issues.
#     order: child-first

# The amount of memory going to the network stack. These numbers usually need
# no tuning. Adjusting them may be necessary in case of an "Insufficient number
# of network buffers" error. The default min is 64MB, the default max is 1GB.
#
# taskmanager:
#   memory:
#     network:
#       fraction: 0.1
#       min: 64mb
#       max: 1gb

#==============================================================================
# Flink Cluster Security Configuration
#==============================================================================

# Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors -
# may be enabled in four steps:
# 1. configure the local krb5.conf file
# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit)
# 3. make the credentials available to various JAAS login contexts
# 4. configure the connector to use JAAS/SASL

# # The below configure how Kerberos credentials are provided. A keytab will be used instead of
# # a ticket cache if the keytab path and principal are set.
# security:
#   kerberos:
#     login:
#       use-ticket-cache: true
#       keytab: /path/to/kerberos/keytab
#       principal: flink-user
#       # The configuration below defines which JAAS login contexts
#       contexts: Client,KafkaClient

#==============================================================================
# ZK Security Configuration
#==============================================================================

# zookeeper:
#   sasl:
#     # Below configurations are applicable if ZK ensemble is configured for security
#     #
#     # Override below configuration to provide custom ZK service name if configured
#     # zookeeper.sasl.service-name: zookeeper
#     #
#     # The configuration below must match one of the values set in "security.kerberos.login.contexts"
#     login-context-name: Client

#==============================================================================
# HistoryServer
#==============================================================================

# The HistoryServer is started and stopped via bin/historyserver.sh (start|stop)
#
# jobmanager:
#   archive:
#     fs:
#       # Directory to upload completed jobs to. Add this directory to the list of
#       # monitored directories of the HistoryServer as well (see below).
#       dir: hdfs:///completed-jobs/

# historyserver:
#   web:
#     # The address under which the web-based HistoryServer listens.
#     address: 0.0.0.0
#     # The port under which the web-based HistoryServer listens.
#     port: 8082
#   archive:
#     fs:
#       # Comma separated list of directories to monitor for completed jobs.
#       dir: hdfs:///completed-jobs/
#       # Interval in milliseconds for refreshing the monitored directories.
#       fs.refresh-interval: 10000

After these settings, the JobManager and TaskManager logs can be viewed separately, as shown below: ![[image-20240812160511859.png]]

Because Windows does not allow `:` in file and directory names, you also need to modify `getTaskManagerResourceID` in the `TaskManagerRunner` class, replacing the `:` that joins the RPC address and port with `-`. ![[image-20240812162402610.png]]
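A minimal sketch of that change, assuming a Flink version where `getTaskManagerResourceID` builds the resource ID from `rpcAddress` and `rpcPort` separated by a colon; the exact method body varies across versions, so adapt it to what you find in `TaskManagerRunner`:

```java
import java.net.InetAddress;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.StringUtils;

// Inside org.apache.flink.runtime.taskexecutor.TaskManagerRunner (sketch):
private static ResourceID getTaskManagerResourceID(
        Configuration config, String rpcAddress, int rpcPort) throws Exception {
    return new ResourceID(
            config.getString(
                    TaskManagerOptions.TASK_MANAGER_RESOURCE_ID,
                    StringUtils.isNullOrWhitespaceOnly(rpcAddress)
                            ? InetAddress.getLocalHost().getHostName()
                                    + "-" + new AbstractID().toString().substring(0, 6)
                            // Was: rpcAddress + ":" + rpcPort + ... — the ':' ends up in
                            // the TaskManager log path and is illegal in Windows file
                            // names, so join with '-' instead.
                            : rpcAddress + "-" + rpcPort
                                    + "-" + new AbstractID().toString().substring(0, 6)));
}
```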

Reference: https://cloud.tencent.com/developer/article/2155944
