import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkConnectKafkaDDL {

    private static final Logger logger = LoggerFactory.getLogger(FlinkConnectKafkaDDL.class);

    public static void main(String[] args) throws Exception {
        // 1. Prepare the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        env.enableCheckpointing(1000);
        env.getCheckpointConfig().setCheckpointStorage("file:///data/source/flink/flink-save-point");

        // 2. Create the TableEnvironment (Blink planner, streaming mode)
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);

        // Optional: register a Hive catalog instead of the default in-memory catalog
        // String catalogName = "flink_hive";
        // String hiveDataBase = "flink";
        // String hiveConfDir = "/Users/caizhengjie/Desktop/hive-conf";
        //
        // HiveCatalog hiveCatalog =
        //         new HiveCatalog(catalogName, hiveDataBase, hiveConfDir);
        // tableEnvironment.registerCatalog(catalogName, hiveCatalog);
        // tableEnvironment.useCatalog(catalogName);

        // 3. DDL: create a table backed by the Kafka topic
        String sql =
                "CREATE TABLE kafkaTable (\n" +
                "  netId STRING,\n" +
                "  data STRING,\n" +
                "  stamp INT,\n" +
                "  method1 STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'kafka_json_monitor',\n" +
                "  'properties.group.id' = 'testGroup1',\n" +
                "  'properties.bootstrap.servers' = '10.7.20.26:30001',\n" +
                "  'scan.startup.mode' = 'group-offsets',\n" +
                "  'format' = 'json'\n" +
                ")";
        tableEnvironment.executeSql(sql);
        logger.info(">>>>>>get data from kafka -> {}", sql);

        // 4. Query the table and print the result as an append-only stream
        Table table = tableEnvironment.sqlQuery("SELECT * FROM kafkaTable");
        tableEnvironment.toAppendStream(table, Row.class).print();

        env.execute("kafka");
    }
}
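To verify the job end to end, you can push a message whose JSON fields match the DDL schema (netId, data, stamp, method1) into the kafka_json_monitor topic. The sketch below uses the plain kafka-clients producer against the broker address from the WITH clause; the class name and the payload values are made up for illustration.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaJsonMonitorProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.7.20.26:30001");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Field names mirror the DDL above; the values are made-up test data
        String payload = "{\"netId\":\"net-001\",\"data\":\"hello\",\"stamp\":1,\"method1\":\"GET\"}";

        // try-with-resources closes (and flushes) the producer
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("kafka_json_monitor", payload));
        }
    }
}

Once this record is produced, the Flink job's print sink should emit a row such as +I[net-001, hello, 1, GET].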