转载记一次惊心动魄的数据操作过程

  • 引言

    ​ 某客户集群的一个节点磁盘使用率达到 100%,集群写保护,被禁止写入了,ES 集群存储是以节点为单位来计算集群容量的。那么本客户集群是多节点、低配置,集群索引设置 1 主 1 副。如下截图:

    img

    集群单分片大索引

    一、大索引带来的问题

     在选ES集群规模时,我们经常提到集群选型,规划先行。这里的规划就表示集群容量、节点选型、分片设置、索引管理、模板制定、数据建模等方面。这样执行的目的是尽量让集群规模符合用户业务预期,并保持集群的高计算、高可用能力。尽量不要让集群产生特别大的分片。那么为什么不能产生大的分片呢?
    

    第一:大分片在数据读写方面并不具备性能优势,反而在返回结果方面因为要遍历集群所有的文档进而让 master 产生很大的压力;第二:大分片并不会给集群带来高计算能力。10 个节点 1 个大分片远比 10 个节点 10 个分片计算能力要差很多;反而因为某一特定业务高峰时期索引被集中的读写所带来的集群某节点负载过高所带来的集群 CPU、负载过高导致集群节点 OOM 进而自动下线的情况。

    第三:节点重启状态下分片恢复速度极慢,甚至出现不能恢复、数据丢失的情况;

    在本次客户集群维护过程中,基本都遇到了以上提到的这些问题。

    基于以上腾讯云官方建议 ES 集群的分片范围建议在 30G-50G 区间,一般我建议客户按照每个 shard 40G/node.尽量避免集群出现特大分片或者单分片索引。

    二、动态解决集群存量超大分片的问题

    目前客户集群存在2个超大索引,1个1T、1个700G,客户的这几个索引不能删,而且还必须接着往里写数据。前端程序不能中断太久,要求快速解决这个问题。怎么解决?
    
    从ES 6.6版本开始,官方推出在线切分索引shard的API -- Split ,因此我们可以通过此API将集群存量索引的主分片进行动态设置,**一般用来增大索引的主分片数。具体可用参考API官网:** https://www.elastic.co/guide/en/elasticsearch/reference/7.x/indices-split-index.html
    

    这里,我大致总结一下该 API 的注意事项:split 的过程

    1、创建一个新的目标index,其定义与源index相同,但是具有更多的primary shard。
    
    2、将segment从源index硬链接到目标index。(如果文件系统不支持硬链接,则将所有segment都复制到新索引中,这是一个非常耗时的过程。)
    
    3、创建低级文件后,再次对所有文档进行哈希处理,以删除属于不同shard的documents
    
    4、恢复目标索引,就像它是刚刚重新打开的封闭索引一样。
    

    ​ 5、 在切割的时候需要对源索引进行锁写,切割完后,再开启写入,切割过程中,如集群负载出现问题,等 green 后,可以删除切割成功的正在均衡分片的索引并及时扩容集群后再执行 split 操作。

    ​ 6、 split API 属于资源密集型工作,对集群的负载,比如 CPU、磁盘、lOAD 负载比较高,在执行此操作前,建议用户对集群业务做一定的降低,并升级集群配置,一定要保证集群有足够的资源区执行此操作。集群配置低,不要此操作。《非常重要》

    ​ **7、split API 会对源分片所有内容进行复制,因此在执行此操作过程中,会发现时间耗时非常长,数据同步完后,会发现索引膨胀特别大,比如单分片索引切割成 4 个分片后,数据膨胀 4 倍,无需担心,这个是 split 操作的正常流程,后面可用手动进行段合并,让分片大小降下来。可以加上下面的参数:“expunge_only_deletes=true”,**此命令也可以多次执行。可以通过_cat/recovery API 或者 celebro 查看该操作进行的状态。另外,在执行段合并的命令时,分片大小也不会立即进行删除释放,而是根据其默认算法在一定的时间后自动进行合并操作,因此在手动执行段合并过程中看到的分片大小时大时小的状态也是正常的。过一段时间后,分片即可恢复正常大小状态。

    img

    段合并

    ​ 8、在切的过程中,不要一次切太多分片,可以分批切割,这能减少此 API 切割带来的节点负载高而导致的 OOM 的情况。

    ​ 9、 切割完后目标索引状态为只读,需要手动开启写入。

     10、发现新生产的索引无问题后,可以将源超大分片的索引给关闭或者删除。这里是用户确认后就删除了。
    

    那么这样,我们就完成了一个大分片的切割过程,这个过程比较复杂并且漫长,大概耗时 2 天左右。目标索引如下图所示:

    img

    切割后的索引分片

    小结:

    Split API 可以动态扩充集群特定索引主分片数量,一般调大索引分片数量。

    split 会 copy 所有 segment 到新的 shard,目标索引数据会有很大的膨胀,forcemerge 之后就大小就正常。

    split 操作对集群资源有特别大要求,建议业务低峰期使用,避免数据丢失。谨慎操作。

    split 操作需要对源索引停写,对集群负载有要求,属于停机离线业务数据操作。

    业务不能停、主分片需要调大,动态调整,可在业务低峰期考虑 reindex API,但是如果操作数据特别大,也是一个比较漫长的过程。

    三、如何规避集群出现超大分片的问题?

    首先,我们想以下几个问题:

    **问题一:**上面提到了大分片带来各种问题,那么如何才能避免集群出现超大分片?

    问题二:上面 split 新生产的目标索引 **tenbus_target,**如何关联到现有业务保证正常业务可持续性呢?

    问题三:为什么上面的索引只切了 4 个分片不接着继续切成 8 或者 16 或者 20?

    问题四: 用户前端程序代码改造成本大,想接入原有索引继续读写怎么办?

    针对上面的问题,  一般情况下,我们肯定会建议用户:把集群模板分片策略改一下,改成20或者30,然后重新生成一个索引,让新数据写入到一个新的索引。或者让用户改变现有策略,基于时间的方式生产索引,每周或者每月生产一个索引。这样也是一个办法。然而用户并不认可这种方式。因为它需要改动客户前端代码,改造成本大。
    
    或者我们可以再用Split API再切,切成20个分片,然后接着让用户继续往里边读写。这样暂时是可以,但是迟早有一天这20个分片又会因为无尽的写入导致超大分片的产生。集群最终还是有问题。所以,这里没有再接着切。切分4个分片已经有很大的改善。
    

    **其次:**上面提到的 4 个问题也是客户所最关心的 4 个需求,那么如何高效解决用户的问题、满足客户的需求,又能得到用户的认可呢?

    方法论 使用 ES ILM+Rollover+Alias 的方式给用户解决这一系列的问题

    我们知道,从 ES6.6 版本开始,ES 集群免费开放 ILM 功能。同时结合 Rollover API,我们可以针对非时间序列的索引进行滚动操作,大概流程图如下:

    img

    ILM+Rollover 流程图

    这里,我大致操作步骤如下:

    1,给用户指定 2 个 ILM,每个 ILM 针对不同的业务索引模式,并进行测试匹配。

    img

    ilm1

    2,Rollover 设定,设置索引 600G 自动滚动更新到下一个索引。每个索引 20 个分片,一个副本。

    3,Alias 别名指定源索引名,并关联现有存量索引。一个写入,一个只能读取

    img

    tenbus 别名

    注意:这里的别名 tenbus 需要跟 ILM 别名、模板别名、索引别名保持一样。因为这样才能最大限度的保证用户无感知。

    当然,在对客户业务再次接入恢复前,还需要对源索引的 mapping、settings 进行匹配设置,然后再让用户业务重新接入。

    因此, 经过上面的操作,我们就已经顺利的完成了用户集群后续超级大分片的动态设置、集群索引的长期规划管理问题。同时用户侧代码自动接入别名,无需做任何改动,别名同时关联存量索引,那么这样就彻底解决了用户上面提到的所有需求。大大提升了用户满意度。

    以上就是整个操作的流程。大家好好思考一下。

    四、总结

    ​ 由于本次客户数据操作过程遇到的问题比较多,这里就没有一一详述,但是大致的思路基本就是这样。具体还是要多参考实际生产工作环境而定。这里只提供一种方法跟思路,那就是动态扩展索引分片数量、以及超大分片或者超大索引的解决办法问题,希望大家多动手、多总结。

    另外,在操作解决客户问题过程中,遇到了很多问题、盲区,最后都是请教各大神才得以顺利解决用户需求,这里再次向各位前辈致敬,感谢。

    同时,这篇文章还有很多知识点比如段合并、别名、ILM、索引设置、reindex索引等其他知识未做过多铺垫,各位请自行官网补全并动手实践。
    

    最重要的是:本文已生产实践,共享大家。时间仓促,不足之处,欢迎批评指正。


查看生产环境 shards 的存储状况

1
GET /_cat/shards?v&h=index,shard,docs,store

在老版本的 ES(例如 2.3 版本)中, index 的 shard 数量定好后,就不能再修改,除非重建数据才能实现。

从 ES6.1 开始,ES 支持可以在线操作扩大 shard 的数量(注意:操作期间也需要对 index 锁写)

从 ES7.0 开始,split 时候,不再需要加参数 index.number_of_routing_shards

需要注意的是: 这个 split 的过程中, 它会先复制全量数据,然后再去做删除多余数据的操作,需要注意磁盘空间的占用。

© 著作权归作者所有:来自 51CTO 博客作者我的二狗呢的原创作品,请联系作者获取转载授权,否则将追究法律责任 ES7.5 下动态扩大索引的 shard 数量 https://blog.51cto.com/lee90/2467377

分解过程中一下子扩容了 30 倍。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# Click the Variables button, above, to create your own variables.
GET ${exampleVariable1} // _search
{
  "query": {
    "${exampleVariable2}": {} // match_all
  }
}
PUT /remote_statistics_s3/_settings?pretty
{

  "settings": {

    "index.blocks.write": true

  }

}
#执行split
POST /remote_statistics_s3/_split/remote_statistics?pretty
{

  "settings": {

    "index.number_of_shards": 10,

    "index.number_of_replicas": 0

  }

}

PUT /remote_statistics/_settings
{
  "settings": {
    "index.blocks.write": null
  }
}
GET /remote_statistics/_search
{
  "query": {
    "match_all": {}
  }
}
POST /remote_statistics/_forcemerge

GET /_cat/shards?v&h=index,shard,docs,store

image-20230619113629472

image-20230619113649254

bulkSizeMb: 15

#无论数据量多少,间隔 20s 执行一次 bulk

flushInterval: 20

#允许并发的 bulk 请求数

concurrentRequests: 10

这里的具体配置值,可以根据观察集群状态,来逐步增加。对于高版本的 es,可以通过 x-pack 的监控页面观察索引速度进行相应调整,如果 es 版本较低,可以使用推荐的 rest api 进行逻辑封装。在低版本的 es 中,统计写入速度的思路是:写一个程序定时检查索引的数据量,来计算。如果使用 python,就两行代码就能获取索引的数据总量。

1
2
call_list = es.indices.stats((index = index))
total = call_list['indices'][index]['total']['indexing']['index_total']

复制代码

也可以隔几分钟用 CURL 来粗略统计单个索引的数据量大小。命令如下:

1
2
3
#查询索引文档总量
curl -XGET -uname:pwd
'http://esip:port/_cat/count/index-name?v&format=json&pretty'

复制代码

启动多个进程

由于 Bulkprocess 是线程安全的,所以我们可以使用多线程的方式来共享一个批处理器。更好的消费方式是,启动多个消费程序进程,将其部署在不同的主机上,让多个进程中开启的多线程总数和 topic 的分区数相等,并且将他们设置为同一个消费组。

每一个进程包含一个 bulkprocess 处理器,可以提高消费和批量写入能力。同时避免了程序的单点问题,假如一个消费者进程挂掉,则 kafka 集群会重新平衡分区的消费者。少了消费者只是会影响消费速度,并不影响数据的处理。

“压测”,提升批量插入条数

通过对各个监控指标的观察,来判断是否能继续提高写入条数或增加线程数,从而达到最大吞吐量。

一、观察集群负载 Load Average 值

负载值,一定程度上代表了 CPU 的繁忙程度,那我们如何来解读 elasticsearch 监控页面的的负载值呢?如下是一个三个节点的集群,从左侧 cerebo 监控提供的界面来看,load 值标红,表明 es 的负载可能有点高了,那么这个具体达到什么值会显示红色呢,让我们一起来研究研究。

img

先从主机层面说起,linux 下提供了一个 uptime 命令来观察主机的负载。

img

其中 load average 的三个值,分别代表主机在 1 分钟、5 分钟、15 分钟内的一个负载情况。有人可能会疑惑,26.01 是代表主机的负载在 26%的意思吗,从我们跑的 es 集群情况来看,这显然不是负载很低的表现。

其实,在单个 cpu 的情况下,这个值是可以看做一个百分比的,比如负载为 0.05,表明目前系统的负荷为 5%。但我们的服务器一般都是多个处理器,每个处理器内部会包含多个 cpu 核心,所以这里负载显示的值,是和 cpu 的核心数有关的,如果非要用百分比来表示系统负荷的话,可以用具体的负载值 除以 服务器的总核心数,观察是否大于 1。总核心数查看的命令为:

1
cat/proc/cpuinfo |grep -c 'model name'

复制代码

这台主机显示为 24,从 26 的负载来看,目前处理的任务需要排队了,这就是为什么负载标红的原因。

同时,这里列举一下,如何查看 CPU 情况

总逻辑 CPU 数 = 物理 CPU 个数 X 每颗物理 CPU 的核数 X 超线程数

# 物理 CPU 个数

1
cat /proc/cpuinfo| grep "physical id"| sort| uniq| wc -l

复制代码

(我们的服务器是 2 个)

# 查看每个物理 CPU 中 core 的个数(就是核数)

1
cat /proc/cpuinfo| grep "cpu cores"| uniq

复制代码

(6 核)

# 查看逻辑 CPU 的个数

1
cat /proc/cpuinfo| grep "processor"| wc -l

复制代码

(显示 24,不等于上面的 cpu 个数 * 每个 cpu 的核数,说明是开启了超线程)

二、观察集群在“忙什么”

通过 tasks api 可以直观的观察到集群在忙什么?,结果会显示包括父级任务,任务的持续时间等指标。命令如下:

1
curl -u username:pwd ip:port/_cat/tasks/?v | more

复制代码

img

上面是我把副本设置为 0 后截的图。理论上还应该有一个 bulk[s][r] 操作。可以看到目前写入很耗时,正常情况一批 bulk 操作应该是毫秒级的,这也从侧面说明了 es 的负载很高。

从 task_id、parent_task_id 可以看出,一个 bulk 操作下面分为写主分片的动作 和写副本的动作。其中:

indices:data/write/bulk[s][p]:s 表示分片,p 表示主分片。

indices:data/write/bulk[s][r]:s 表示分片,r 表示副本。

三、观察线程池状态

避免大量写入被拒绝,可以通过观察 elasticsearch 后台日志或是通过使用 Thread pool Api 来观察内部线程池的使用情况,以及相应使用的队列大小,判断是否还可以继续调整写入配置参数。

1
curl -uusername:pwd-XGET "http://esip:port/_cat/thread_pool?v" | grep write

复制代码

写入负载高的情况下,可能会出现大量拒绝,如下:

简易的写入流程

如下是 bulk 请求的简易写入流程,我们知道客户端会选择一个节点发送请求,这个节点被之称为协调节点,也叫客户端节点,但是在执行之前,如果定义了预处理的 pipline 操作(比如写入前将 key 值转换,或者增加时间戳等),则此写操作会被拦截并进行对应逻辑处理。从图中可以看出,写入操作会现根据路由出来的规则,决定发送数据到那个分片上去,默认情况下,是通过数据的文档 id 来进行路由的,这能保证数据平均分配到各个节点上去,也可以自定义路由规则,具体定义方式我们在下面会讲到。

接着,请求发送到了主分片上,主分片执行成功后,会将请求再转发给相应的副本分片,在副本分片上执行成功后,这个请求才算是执行完毕,然后将执行结果返回给客户端。可以看出多副本在写多读少的场景下,十分的消耗性能,近似的,多了几个副本就相当于重复写了几份数据。如果不考虑数据容灾,则可以适当的降低副本数量,或者去掉副本,提高写入速度。在我们的集群里面并没有用到 ingest 角色类型的节点,这里提出来说也是为了便于大家更好理解各个节点的角色分工。

img

通过 ES 提供的 API 观察各个节点的热线程,api 结果会显示出占用 cpu 高的线程,这也是我们可以优化的地方。大量写入场景下,这里一般大多数会显示:Lucene

Merge Thread 或者[write],查询命令为:

1
GET /_nodes/hot_threads

复制代码

三、主机部分

每个目录挂载不同的磁盘

在 data 目录下,我们分出了 10 个子目录,分别挂载到不同的硬盘上去。这相当于做了 raid0。能大大的提高写入速度。

配置多个 path.data 路径

由于在前面我们将 10 个目录分别挂载到不同的硬盘上去,所以在 elasticsearch.yml 的 path.data 属性中,我们配置多个路径,让数据能高效的写入不同的目录(硬盘),需要注意的是,如果只有一个索引,它的分片在某个节点的存储目录是固定的。所以这个特性,也只有在存在多个索引的情况下,能发挥出它的作用。

一个主机启动两个节点

es 实例分配内存不会超过 32G,对于主机数量固定的我们,如果 125G 的机器只放一个 es 节点,实属有点浪费,所以考虑在主机上启动两个 es 节点实例。

配置上需要注意关注以下几点:

1、http 的端口、节点间通信的 trasport 端口设置。

2、节点的角色分配。

3、脑裂配置对应修改。

4、path.data 属性修改(重要

5、path.logs 属性修改。

均分硬盘

这里着重说一下第 4 点,同一个主机启动两个实例后,我们将 path.data 配置从原来的 10 个目录改为了各自配置 5 个不同目录。

1
path.data: /data01/esdata,/data02/esdata,/data03/esdata,/data04/esdata,/data05/esdata

复制代码

一方面是 能够控制分片的分配,避免太多分片分配到一台主机上的其中一个节点上。另一方面是避免两个 es 进程对同一磁盘进行写入。随机写造成的磁头非常频繁的大面积移动肯定比单进程的顺序写入慢,这也是我们提高写入速度的初衷。

更换 ssd

ssd 能成倍的提高写入速度,如果使用 ssd,可能就不会折腾这篇文章出来了(偷笑)。

四、elasticsearch 部分

节点角色的设置

img

elasticsearch 提供几种类型的节点角色设置,需要在 elasticsearch.yml 配置中指定。

指定索引模板

可以根据需要修改,具体配置含义不再细说。

1
{  "order": 0,  "index_patterns": [    "topicA*"  ],  "settings": {    "index": {      "refresh_interval": "40s",      "number_of_shards": "30",      "translog": {        "flush_threshold_size": "1024mb",        "sync_interval": "120s",        "durability": "async"      },      "number_of_replicas": "0",      "merge": {        "scheduler": {          "max_thread_count": "1"        }      }    }  },  "mappings": {  },  "aliases": {}}

复制代码

计算分片数

需要注意分片数量最好设置为节点数的整数倍,保证每一个主机的负载是差不多一样的,特别的,如果是一个主机部署多个实例的情况,更要注意这一点,否则可能遇到其他主机负载正常,就某个主机负载特别高的情况。

一般我们根据每天的数据量来计算分片,保持每个分片的大小在 50G 以下比较合理。如果还不能满足要求,那么可能需要在索引层面通过拆分更多的索引或者通过别名+按小时 创建索引的方式来实现了。

控制分片均分各个主机

以 TopicA 数据的一个索引为例,共 30 个分片,在 10 个节点上分配,应该每个节点分配 3 个分片,一个主机上一共有 6 个分片才算是均衡。如果分配不是这样,可以使用 cerebo 或者通过命令行进行分片迁移。

img

1
curl -X POST "localhost:9200/_cluster/reroute?pretty" -H 'Content-Type: application/json' -d'{    "commands" : [        {            "move" : {                "index" : "test", "shard" : 0,                "from_node" : "node1", "to_node" : "node2"            }        }    ]}

复制代码

配置索引缓冲区

即是指定 indices.memory.index_buffer_size 的大小,这个是一个静态变量,需要修改配置文件,重启后才能生效。

参考的计算公式:indices.memory.index_buffer_size / shards_count > 512MB(超过这个值索引性能并不会有太明显提高)

shards_count 为一个节点上面的分片数量,可以配置具体指或者一个占用 Es 内存总值的百分比。这里我们修改成了 20%(默认 10%)。

img

路由分片

可以使用 elasticsearch 提供的 routing 特性,将数据按一定规则计算后(内部采用 hash 算法,把相同 hash 值的文档放入同一个分片中),默认情况下是使用 DocId 来计算,写入到分片,查询时指定 routing 查询,则可以提高查询速度,避免了扫描过多的分片带来的性能开销。

第一步:在创建索引模板的时候,需要在 mappings 中增加配置,要求匹配到此索引模板的索引,必须配置 routing:

1
"_routing": {        "required": true}

复制代码

第二步: 为 BulkPorcess 创建 IndexRequest 时,通过 routing(java.lang.String routing) 方法指定参与计算 hash 的值。

注意这里是具体的值,而不是字段名称。

五、效果

经过如上的调优配置,三个 Topic 数据都能正常写入,集群文档总数在 170 亿,33 个索引,每个索引保留 4 天,242 个分片,集群负载正常。

img

六、踩过的坑

节点角色的设置方面

如果集群中节点数量不多,并且不需要对数据进行预处理,那么其实可以放弃使用 Ingest 类型的节点。默认情况下所有的节点的默认设置都为 true。所以我们手动将主节点和数据节点做如下设置

1
node.ingest: false

复制代码

但是需要注意一点,x-pack 监控用到了这种类型的节点。会如下错误:

1
failed to flush export bulks no ingest node

复制代码

解决办法是,打开这个属性配置,或者 elasticsearch.yml 中指定:

1
xpack.monitoring.exporters.my_local: type: xpack.monitoring.exporters.local use_ingest: false

复制代码

elasticsearch 线程池相关配置参数改变

从 5.0 版本以后,禁止了修改各个模块线程池的类型,线程池相关配置的前缀从 threadpool 变成了 thread_pool。 并且线程池相关配置级别上升至节点级配置,禁止通过使用 API 修改,因为场景是写多读少,所以我们只是增加了写队列的大小,配置为: thread_pool.write.queue_size: 1000。只能通过修改配置文件的方式修改。

单台主机负载过高

同一个主机两个节点都是数据节点,并且分片分配不均匀,导致这个主机 CPU 使用率在 98%左右,后面通过迁移分片的方式将负载降低。

自定义 routing 写热点问题

比如按省份分的数据, 省份为北京的数据过多,西藏的数据很少,可能会带来写热点问题。所以合理的路由分配同样很重要。

作者:

侠梦,通信公司 java 研发工程师,关注 java、微服务架构、mysql、elasticsearch 等领域。

参考文章:

http://kane-xie.github.io/2017/09/09/2017-09-09_Elasticsearch写入速度优化/

https://www.elastic.co/guide/en/elasticsearch/reference/5.0/breaking_50_settings_changes.html

https://elasticsearch.cn/question/1915

https://juejin.im/entry/5d0f17cce51d454d544abf7f

Licensed under CC BY-NC-SA 4.0
最后更新于 Jan 06, 2025 05:52 UTC
comments powered by Disqus
Built with Hugo
主题 StackJimmy 设计
Caret Up