1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
|
cluster.name=my-application
ip=10.0.2.26
port=9300
# 每x个请求执行一次Bulk
setBulkActions=10
# 每5MB Bulk
setBulkSize=5
#无论请求多少,我都希望每x秒刷新一次
setFlushInterval=10
# 设置并发请求数
setConcurrentRequests=5
index=logs
path=D:\\log.txt
Maven dependencies:

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>7.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.11.1</version>
</dependency>
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.1</version>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.5</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.1.0</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>7.1.0</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.1.0</version>
</dependency>
public static void init(String filepath) throws IOException, InterruptedException {
// https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-bulk-processor.html
/*cluster.name 必须与服务端相同*/
Settings settings = Settings.builder().put("cluster.name", "my-application").build();
// final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
// credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic", "12346")); //es账号密码(默认用户名为elastic)
TransportClient client = new PreBuiltTransportClient(settings)
.addTransportAddress(new TransportAddress(InetAddress.getByName(properties.getProperty("ip")), Integer.valueOf(properties.getProperty("port"))));
BulkProcessor build = BulkProcessor.builder(client, new BulkProcessor.Listener() {
@Override
public void beforeBulk(long l, BulkRequest bulkRequest) {
/*在批量执行之前调用此方法*/
logger.info("---尝试插入{}条数据---", bulkRequest.numberOfActions());
}
@Override
public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
/*批量执行后调用此方法。例如,您可以检查是否有一些失败的请求response.hasFailures()*/
logger.info("---插入完成-> {}条数据,---", bulkRequest.numberOfActions());
bulkResponse.hasFailures();
}
@Override
public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
/*批量失败并引发一个 Throwable*/
logger.error("error:" + throwable.getMessage() + " \ncause:" + throwable.getCause());
// logger.info("---尝试插入{}条数据---", bulkRequest.numberOfActions());
}
})
/*每x个请求执行一次Bulk*/
.setBulkActions(Integer.valueOf(properties.getProperty("setBulkActions")))
/*每5MB Bulk一次*/
.setBulkSize(new ByteSizeValue(Integer.valueOf(properties.getProperty("setBulkSize")), ByteSizeUnit.MB))
/*无论请求多少,我都希望每5秒刷新一次*/
.setFlushInterval(TimeValue.timeValueSeconds(Integer.valueOf(properties.getProperty("setFlushInterval"))))
/*设置并发请求数。值为0表示仅允许执行一个请求。值为1表示允许在累积新的批量请求时执行1个并发请求。*/
.setConcurrentRequests(Integer.valueOf(properties.getProperty("setConcurrentRequests")))
/*设置一个自定义退避策略,该策略最初将等待100毫秒,然后呈指数增长,然后重试最多3次。每当一个或多个批量项目请求失败时,都会尝试重试,EsRejectedExecutionException 并显示,表明有太多的计算资源可用于处理请求。要禁用退避,请通过BackoffPolicy.noBackoff()*/
.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)
).build();
/*默认情况下BulkProcessor:
将bulkActions设置为 1000
设置bulkSize为 5mb
没有设置flushInterval
将parallelRequests设置为1,这意味着异步执行刷新操作。
将backoffPolicy设置为具有8次重试和50ms启动延迟的指数补偿。总等待时间约为5.1秒。
*/
String index = properties.getProperty("index");
logger.info("开始添加数据!");
Gson gson = new Gson();
// 读取日志文件
File file = new File(filepath);
BufferedInputStream fis = new BufferedInputStream(new FileInputStream(file));
BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "utf-8"), 10 * 1024 * 1024);// 用10M的缓冲读取文本文件
String line = "";
while ((line = reader.readLine()) != null) {
//TODO: write your business
Map<String, String> map = new HashMap<>();
/*第一种情况*/
String[] split = line.split("----");
if (split.length > 1) {
map.put("qq", split[1]);
map.put("phone", split[2]);
}
if (map.size() > 0) {
/*添加数据*/
build.add(new IndexRequest(index, "_doc").source(gson.toJson(map), XContentType.JSON));
}
}
logger.info("添加数据完成!等待提交......");
/*刷新剩余请求*/
//build.flush();
build.awaitClose(10, TimeUnit.MINUTES);
// build.close();
|