elasticsearch 使用BulkProcessor导入txt大文件

手上有一个 txt 大约有 8 亿条数据的样子,文件有 19G 左右,一开始百度搜了下,基本都是重复文章,不过也根据写这些文章的大佬慢慢 google 到了一些方法。 先说下导入机器配置: cpu: E5 1620V2 内存: 32G(分给 es 12G) 硬盘:4x2T raid 0(io 大概在 600 左右)

导入的几种方法: 1、bulk: ES 本地支持的批量导入方式,将 json 文件 post 到 es 进行处理。 将需要导入的数据先生成 json 文件,格式类似这种

1
2
3
4
5
6
7
8
#指定 index
{"index":{"_index":"suoy","_id":1}}
#字段
{"text_entry":"内容"}
{"index":{"_index":"suoy","_id":1}}
{"text_entry":"内容"}
{"index":{"_index":"suoy","_id":1}}
{"text_entry":"内容"}

……….. 然后使用 curl 提交

curl -H ‘Content-Type: application/x-ndjson’ -XPOST ‘127.0.0.1:9200/xxxxxxxx/doc/_bulk?pretty’ –data-binary @out.json 一开始我是尝试这种方法,用 python 将数据重新处理了下,生成的文件有 48GB……,还花了 3-5 个小时的样子,导入的时候直接失败~后面,看了下说是文件大小尽量不能超过 200MB???这样的话就要分割文件了,虽然可以 shell 脚本一个一个的提交小文件 json,后面想想直接放弃了(嫌麻烦)…….

2、logstash: ES 官方的另一个产品,将数据文本转换为 ES 的数据源。 我的文本一行只有两个字段,用 “—-” 分割,花了点时间学 logstash 直接上手开干,但是导入速度只有 9000 条/s 的样子,一个小时导了大概 3200w 条数据,导了 16 个小时大概导了 5.2 亿条数据,这速度完全不行啊,后面手贱不注意按了 Ctrl+c,嗯…..这下好了,不知道怎么断点续传,又得重新来…… 我用的脚本如下,有懂的大佬能否告知下 logstash 有没有类似 BulkProcessor 储存到 x 条数据再执行 Bulk 的方法?

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
input {
    # 从文件读取日志信息
    file {
        path => "/data/sda3/k/log.txt"
        type => "text"
        start_position => "beginning"
    }
}
filter {
    mutate {
        split => ["message", "----"]
    }
	if [message][2] {
        mutate {
            add_field =>   {
                "title" => "%{[message][1]}"
				"log" => "%{[message][2]}"
            }
        }
    }else{
		if [message][0] {
        mutate {
            add_field =>   {
                "title" => "%{[message][0]}"
            }
        }
		}
		if [message][1] {
			mutate {
				add_field =>   {
					"log" => "%{[message][1]}"
				}
			}
		}
	}
}
output {
	elasticsearch{
		hosts => "127.0.0.1:9200"
		index => "logs"
		user => "elastic"
		#password  => "xxx"
		#document_type => "_doc"
	}
    # 标准输出
    #stdout { codec => rubydebug }
}

3、Bulk Processor rest 方式采用的是 restClient,基于 http 协议,Bulk Processor 使用的是 TransportClient,基于 Tcp 协议。 我这里是将数据 jaon 序列化后再提交的 配置文件

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111

cluster.name=my-application
ip=10.0.2.26
port=9300
# 每x个请求执行一次Bulk
setBulkActions=10
# 每5MB Bulk
setBulkSize=5
#无论请求多少,我都希望每x秒刷新一次*
setFlushInterval=10
# 设置并发请求数
setConcurrentRequests=5
index=logs
path=D:\\log.txt
Maven

          org.elasticsearch.client
          transport
          7.1.0
          org.apache.logging.log4j
          log4j-core
          2.11.1
          com.lmax
          disruptor
          3.4.1
          com.google.code.gson
          gson
          2.8.5
          org.elasticsearch
          elasticsearch
          7.1.0
          org.elasticsearch.client
          elasticsearch-rest-client
          7.1.0
          org.elasticsearch.client
          elasticsearch-rest-high-level-client
          7.1.0

    public static void init(String filepath) throws IOException, InterruptedException {
        // https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-bulk-processor.html
        /*cluster.name 必须与服务端相同*/
        Settings settings = Settings.builder().put("cluster.name", "my-application").build();
//        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
//        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic", "12346"));  //es账号密码(默认用户名为elastic)
        TransportClient client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new TransportAddress(InetAddress.getByName(properties.getProperty("ip")), Integer.valueOf(properties.getProperty("port"))));
        BulkProcessor build = BulkProcessor.builder(client, new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long l, BulkRequest bulkRequest) {
                /*在批量执行之前调用此方法*/
                logger.info("---尝试插入{}条数据---", bulkRequest.numberOfActions());
            }
            @Override
            public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
                /*批量执行后调用此方法。例如,您可以检查是否有一些失败的请求response.hasFailures()*/
                logger.info("---插入完成-> {}条数据,---", bulkRequest.numberOfActions());
                bulkResponse.hasFailures();
            }
            @Override
            public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
                /*批量失败并引发一个 Throwable*/
                logger.error("error:" + throwable.getMessage() + " \ncause:" + throwable.getCause());
//                logger.info("---尝试插入{}条数据---", bulkRequest.numberOfActions());
            }
        })
                /*每x个请求执行一次Bulk*/
                .setBulkActions(Integer.valueOf(properties.getProperty("setBulkActions")))
                /*每5MB Bulk一次*/
                .setBulkSize(new ByteSizeValue(Integer.valueOf(properties.getProperty("setBulkSize")), ByteSizeUnit.MB))
                /*无论请求多少,我都希望每5秒刷新一次*/
                .setFlushInterval(TimeValue.timeValueSeconds(Integer.valueOf(properties.getProperty("setFlushInterval"))))
                /*设置并发请求数。值为0表示仅允许执行一个请求。值为1表示允许在累积新的批量请求时执行1个并发请求。*/
                .setConcurrentRequests(Integer.valueOf(properties.getProperty("setConcurrentRequests")))
                /*设置一个自定义退避策略,该策略最初将等待100毫秒,然后呈指数增长,然后重试最多3次。每当一个或多个批量项目请求失败时,都会尝试重试,EsRejectedExecutionException 并显示,表明有太多的计算资源可用于处理请求。要禁用退避,请通过BackoffPolicy.noBackoff()*/
                .setBackoffPolicy(
                        BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)
                ).build();
                /*默认情况下BulkProcessor:
                    将bulkActions设置为 1000
                    设置bulkSize为 5mb
                    没有设置flushInterval
                    将parallelRequests设置为1,这意味着异步执行刷新操作。
                    将backoffPolicy设置为具有8次重试和50ms启动延迟的指数补偿。总等待时间约为5.1秒。
                */
        String index = properties.getProperty("index");
        logger.info("开始添加数据!");
        Gson gson = new Gson();
        // 读取日志文件
        File file = new File(filepath);
        BufferedInputStream fis = new BufferedInputStream(new FileInputStream(file));
        BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "utf-8"), 10 * 1024 * 1024);// 用10M的缓冲读取文本文件
        String line = "";
        while ((line = reader.readLine()) != null) {
            //TODO: write your business
            Map<String, String> map = new HashMap<>();
            /*第一种情况*/
            String[] split = line.split("----");
            if (split.length > 1) {
                map.put("qq", split[1]);
                map.put("phone", split[2]);
            }
            if (map.size() > 0) {
                /*添加数据*/
                build.add(new IndexRequest(index, "_doc").source(gson.toJson(map), XContentType.JSON));
            }
        }
        logger.info("添加数据完成!等待提交......");
        /*刷新剩余请求*/
        //build.flush();
        build.awaitClose(10, TimeUnit.MINUTES);
//        build.close();

我设置的 BulkActions 值为 50000 用这种方式导入速度达到了 6w/s 的速度,cpu 也是全部跑满,完全将性能发挥出来了~

参考文章: https://www.cnblogs.com/ttzsqwq/p/11077574.html https://blog.csdn.net/wslyk606/article/details/79413980

Licensed under CC BY-NC-SA 4.0
最后更新于 Jan 06, 2025 05:52 UTC
comments powered by Disqus
Built with Hugo
主题 StackJimmy 设计
Caret Up