FLINK 基于1.15.2的Java开发-如何使用外部配置文件

flink 的外置 properties

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
# Redis config
# NOTE(review): several host:port entries together with redis.sentinel.master suggest these are Sentinel addresses - confirm
redis.host=localhost:27001,localhost:27002,localhost:27003
redis.sentinel.master=master1
redis.password=111111
jedis.pool.min-idle=25
jedis.pool.max-active=100
jedis.pool.max-idle=100
jedis.pool.max-wait=-1
jedis.pool.timeBetweenEvictionRunsMillis=5000
# No trailing ';' here: a properties value runs to the end of the line, so "true;" would not be read as "true"
jedis.pool.testOnBorrow=true
jedis.pool.testWhileIdle=true
connection.timeout=0
redis.selected.database=0
# Kafka config
kafka.host=127.0.0.1
kafka.port=9092
kafka.bootstrapservers=127.0.0.1:9092
kafka.topic=test

然后代码中这样引入使用

1
2
3
4
5
// Kafka connection settings are looked up in `paras` by their property keys.
String kafkaBootstrapServers = paras.get("kafka.bootstrapservers");
String kafkaTopic = paras.get("kafka.topic");

// Build the Kafka source: fixed consumer group, start from the latest offsets,
// values deserialized through ProductBeanJSONDeSerializer.
KafkaSource<ProductBean> source = KafkaSource.<ProductBean>builder()
        .setBootstrapServers(kafkaBootstrapServers)
        .setTopics(kafkaTopic)
        .setGroupId("test01")
        .setStartingOffsets(OffsetsInitializer.latest())
        .setDeserializer(
                KafkaRecordDeserializationSchema.of(new ProductBeanJSONDeSerializer(true, true)))
        .build();

image-20230712095902794

样例代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
# Redis config
# NOTE(review): several host:port entries together with redis.sentinel.master suggest these are Sentinel addresses - confirm
redis.host=localhost:27001,localhost:27002,localhost:27003
redis.sentinel.master=master1
redis.password=111111
jedis.pool.min-idle=25
jedis.pool.max-active=100
jedis.pool.max-idle=100
jedis.pool.max-wait=-1
jedis.pool.timeBetweenEvictionRunsMillis=5000
# No trailing ';' here: a properties value runs to the end of the line, so "true;" would not be read as "true"
jedis.pool.testOnBorrow=true
jedis.pool.testWhileIdle=true
connection.timeout=0
redis.selected.database=0
# Kafka config
kafka.host=127.0.0.1
kafka.port=9092
kafka.bootstrapservers=127.0.0.1:9092
kafka.topic=test
1
2
3
4
5
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Parse the command-line arguments, e.g. --config_path /path/to/config.properties
ParameterTool argParas = ParameterTool.fromArgs(args);
// getRequired fails immediately with a message naming the missing key when
// --config_path is not supplied, instead of passing a null path to fromPropertiesFile.
String propertiesFilePath = argParas.getRequired("config_path");
logger.info(">>>>>>start to load properties from {}", propertiesFilePath);
// Load the external properties file; its entries are then read through `paras`.
ParameterTool paras = ParameterTool.fromPropertiesFile(propertiesFilePath);
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// Copy the Redis/Jedis settings from `paras` into a Configuration and register it
// as the job's global parameters.
Configuration conf = new Configuration();

// Settings carried over unchanged as strings.
String[] stringKeys = {
    "redis.host",
    "redis.password",
    "redis.sentinel.master",
    "jedis.pool.min-idle",
    "jedis.pool.max-active",
    "jedis.pool.max-idle",
    "jedis.pool.max-wait",
    "jedis.pool.timeBetweenEvictionRunsMillis",
    "jedis.pool.testOnBorrow",
    "jedis.pool.testWhileIdle",
    "connection.timeout"
};
for (String key : stringKeys) {
    conf.setString(key, paras.get(key));
}

// The selected database index is the only value stored as an integer.
conf.setInteger("redis.selected.database", paras.getInt("redis.selected.database"));

env.getConfig().setGlobalJobParameters(conf);

然后在需要的地方写如下代码即可

1
2
3
4
5
6
7
ExecutionConfig.GlobalJobParameters parameters = getRuntimeContext().getExecutionConfig()
                .getGlobalJobParameters();
Configuration globConf = (Configuration) parameters;

//开始使用
String redisPassword=globConf.getString(ConfigOptions.key("redis.password").stringType().noDefaultValue()));
Integer port = globConf.getInteger(ConfigOptions.key("redis.port").intType().noDefaultValue());
Licensed under CC BY-NC-SA 4.0
最后更新于 Jan 06, 2025 05:52 UTC
comments powered by Disqus
Built with Hugo
主题 StackJimmy 设计
Caret Up