背景
每天elasticsearch集群在上午某个时间段CPU几乎打满,此时访问elasticsearch的服务rt会跟着抖动,通过排查发现是由于这个时间段会有数据通过hive任务写到elasticsearch,这个hive任务使用的是ES-Hadoop插件做的数据导入,整个问题的罪魁祸首已经发现了,那么应该怎么去解决呢?
优化方案一
ES-Hadoop组件已经发展多年,按道理说已经很成熟了,一般出现这种问题大概率都是使用不当的缘故,因此通过查看 官网配置 来查看是否使用不当,通过一番查看下来大致看到两个比较相关的配置
es.batch.size.entries: 控制每批写到Elasticsearch的消息条数
es.batch.write.retry.wait:控制重试批次之间时间间隔
通过反复针对这两个参数调优后发现,基本没太大的收益,此时elasticsearch集群CPU的突刺仿佛一把利刃插进心脏一样,不甘心,一定要将它消灭,此时又参考了网上不少优秀的文章例如 https://cloud.tencent.com/developer/article/1612108 这位大佬写的,但是并没有解决我的问题,继续摸索下一个方案
优化方案二
ES-Hadoop配置方式行不通,那么换个思路,咱们是通过Hive来将数据写到elasticsearch集群的,那么如果咱们将Hive任务的并行度降低些行不行呢?想到这里不禁喜悦起来,感觉自己又行了
通过下面几个配置来控制Hive写elasticsearch的并行度
set hive.exec.reducers.max=20;
set tez.am.vertex.max-task-concurrency=20;
set hive.tez.auto.reducer.parallelism=false;
虽然已经大幅降低并行度了,但是elasticsearch的CPU还是高居不下,仿佛在说,你过来打我呀~
优化方案三
通过上面两个失败的方式后,通过思考🤔发现最大的问题是elasticsearch集群的处理能力赶不上处理的能力,那么如果限制elasticsearch的写速率呢,通过跟elasticsearch dba沟通发现不支持这种方式。那只能从写的任务进行下手了
通过查阅ES-Hadoop代码发现,代码写得还挺好的,封装都不错~
通过跟踪代码发现最关键的代码在 BulkProcessor类的add方法,如下
/*** Adds an entry to the bulk request, potentially flushing if the request reaches capacity.* @param payload the entire bulk entry in JSON format, including the header and payload.*/
public void add(BytesRef payload) {// check space first// ba is the backing array for dataif (payload.length() > ba.available()) {if (autoFlush) {flush();}else {throw new EsHadoopIllegalStateException(String.format("Auto-flush disabled and bulk buffer full; disable manual flush or increase " +"capacity [current size %s]; bailing out", ba.capacity()));}}data.copyFrom(payload);dataEntries++;if (bufferEntriesThreshold > 0 && dataEntries >= bufferEntriesThreshold) {if (autoFlush) {flush();}else {// handle the corner case of manual flush that occurs only after the buffer is completely full (think size of 1)if (dataEntries > bufferEntriesThreshold) {throw new EsHadoopIllegalStateException(String.format("Auto-flush disabled and maximum number of entries surpassed; disable manual " +"flush or increase capacity [current size %s]; bailing out",bufferEntriesThreshold));}}}}
上面的逻辑也很清晰,就是要么数据写满本地容量或者数据写够配置的条数就触发一次发送。那么如果咱们能够控制两个批次之间间隔,是否就可以让elasticsearch集群“休息”一下,降低CPU从而正常的给其他业务提供服务呢?
纸上得来终觉浅,绝知此事要躬行。说干就干,以下是我的代码改动,供大家参考
/*** 控制写入批次的间隔*/private void waitSomeTime() {int batchWaitTime = settings.getBatchWaitTime();System.out.println("batchWaitTime is:" +batchWaitTime);if (batchWaitTime <= 0) return;try {System.out.println("start sleep!"+",time is :"+new Date()+", thread name is:"+Thread.currentThread().getName());Thread.sleep(batchWaitTime);System.out.println("end sleep!"+",time is :"+new Date()+", thread name is:"+Thread.currentThread().getName());} catch (Exception e) {e.printStackTrace();}}
Settings类
public int getBatchWaitTime() {return Integer.valueOf(getProperty(ES_BATCH_WAIT_TIME, ES_BATCH_WAIT_TIME_DEFAULT));}
ConfigurationOptions接口
/** Elasticsearch batch size given in bytes */String ES_BATCH_WAIT_TIME = "es.batch.wait.time";String ES_BATCH_WAIT_TIME_DEFAULT = "0";
最后在每写完一批数据后触发休息一下
改动完后,再结合以下配置执行任务
ES-Hadoop配置
'es.batch.size.bytes'='5mb'
'es.batch.size.entries'='2000',
'es.batch.wait.time'='2000', //咱们自己新加的配置
'es.batch.write.refresh'='false',Hive参数调优
set hive.exec.reducers.max=20;
set tez.am.vertex.max-task-concurrency=20;
set hive.tez.auto.reducer.parallelism=false;
通过测试验证将CPU从几乎100%干到了20%~30%,算是圆满完成任务了,心中的石头也落了下来,终于可以睡个好觉了~
补充说明
除了上述的几种方式,还有一些可供参考的方案列在这里
- 使用elasticsearch-rest-client-xxx.jar,不过这是通过http写入的性能会差些
- 扩容elasticsearch集群(经费充足的话可以考虑)
- 对数据做列处理,就是在业务高峰期仅写必要的几列数据到elasticsearch集群(但可能CPU还是会高)
在解决完这个问题后,本来想给ES-Hadoop社区提这个issue并进行fix的。但是发现了已经有大佬反馈过这个问题了 https://github.com/elastic/elasticsearch-hadoop/pull/1405,但是由于社区的前辈们认为有其他更好的解决方式并没有merge相关的代码
总结
以上就是整个问题的解决过程,我也相信一定有更好,更优雅的解决方式,如果你恰好有好的想法也可以给ES-Hadoop社区提供;但无论黑猫还是白猫,能抓到老鼠的才是好猫,通过方案三快速的解决的问题并且稳定运行了一年,给公司节省下扩容集群的成本,这在我这个菜鸡看来已经够了。如果你也比较赶时间的话,可以考虑直接试下我的jar包,希望能对您有帮助。