Hadoop/Spark读写ES之性能调优

腾讯云EMR&Elasticsearch中使用ES-Hadoop之MR&Hive篇

腾讯云EMR&Elasticsearch中使用ES-Hadoop之Spark篇

通过前面几篇文章的介绍,相信大家已经基本了解了大数据组件结合ES使用的方法。

在该系列文章发布后,后台收到了大量的私信,询问了很多性能调优的问题。比如很多开发者测试Hive,Spark等数据导出写入到ES性能非常慢,百万级别的数据导出需要数小时之久。

在帮助部分开发者调优的过程中发现,固然有小部分case的问题原因是使用的集群规模较小,不能很好地承载大数据场景下高压力高吞吐的写入。但是,更多的case问题,还是因为我们没有对ES-Hadoop的参数做详细的了解及调优,过小的默认参数设置与我们的集群规模及写入吞吐不匹配,导致写入性能不高。

ES-Hadoop的参数非常多,包含了索引setting,mapping,网络,读写等等,其中的一些Advance高级参数非常重要,和我们的读写性能息息相关。这一类参数的默认值,都是假定用户使用的集群是一个在日常应用中常见的规模比较小的集群,但是国内的大数据场景下,动辄几亿用户画像人群数据的场景,如果不对这些参数调优,很难发挥应有的写入性能。

我们以Spark RDD写入ES为例:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.spark_project.guava.collect.ImmutableList;
import org.spark_project.guava.collect.ImmutableMap;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

import java.util.Map;
import java.util.List;

public class WriteToESUseRDD {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("my-app").clone()
                .set("es.nodes", "10.0.4.17")
                .set("es.port", "9200")
                .set("es.nodes.wan.only", "true")
                .set("es.batch.size.bytes", "30MB")
                .set("es.batch.size.entries", "20000")
                .set("es.batch.write.refresh", "false")
                .set("es.batch.write.retry.count", "50")
                .set("es.batch.write.retry.wait", "500s")
                .set("es.http.timeout", "5m")
                .set("es.http.retries", "50")
                set("es.action.heart.beat.lead", "50s");

        JavaSparkContext sc = new JavaSparkContext(conf);

        Map<String, ?> logs = ImmutableMap.of("clientip", "255.255.255.254",
                "request", "POST /write/using_spark_rdd HTTP/1.1",
                "status", 200,"size", 802,
                "@timestamp", 895435190);

        List<Map<String, ?>> list = ImmutableList.of(logs);

        JavaRDD<Map<String, ?>> javaRDD = sc.parallelize(list);

        JavaEsSpark.saveToEs(javaRDD, "logs-201998/type");

        sc.stop();
    }
}

可以看到,我们使用set方法设置了非常多的参数,下面就来逐一的介绍:

  1. es.nodes/es.prot: 这里比较简单,就是es的节点列表和端口号
  2. es.nodes.wan.only: 这里是表示使用的es节点ip是否是一个云环境中的ip,不允许使用节点嗅探探查真实的节点ip。适用于类似于腾讯云或AWS的ES云服务。
  3. es.batch.size.bytes/es.batch.size.entries: 这两个参数可以控制单次批量写入的数据量大小和条数,数据积累量先达到哪个参数设置,都会触发一次批量写入。我们知道,增大单次批量写入的数据,可以提高写入ES的整体吞吐。因为ES的写入一般是顺序写入,在一次批量写入中,很多数据的写入处理逻辑可以合并,大量的IO操作也可以合并。默认值设置的比较小,可以适当根据集群的规模调大这两个值,建议为20MB和2w条。当然,bulk size不能无限的增大,会造成写入任务的积压。
  4. es.batch.write.refresh: ES是一个准实时的搜索引擎,意味着当写入数据之后,只有当触发refresh操作后,写入的数据才能被搜索到。这里的参数是控制,是否每次bulk操作后都进行refresh。

    每次refresh后,ES会将当前内存中的数据生成一个新的segment。如果refresh速度过快,会产生大量的小segment,大量segment在进行合并时,会消耗磁盘的IO。
    默认值为开启,我们这里建议设置为false。在索引的settings中通过refresh_interval配置项进行控制,可以根据业务的需求设置为30s或更长。

  5. es.batch.write.retry.count/es.batch.write.retry.wait: 这两个参数会控制单次批量写入请求的重试次数,以及重试间隔。当超过重试次数后,Yarn任务管理会将该任务标记为failed,造成整个写数据任务的失败。默认值为3,为了防止集群偶发的网络抖动或压力过大造成的集群短暂熔断,建议将这个值调大,设置为50。
  6. es.http.timeout/es.http.retries: 这两个参数是控制http接口层面的超时及重试,覆盖读请求和写请求,和上面一样,默认值比较小。默认超时时间为1分钟,重试次数为3,建议调整为超时时间5分钟,重试次数50次。
  7. es.action.heart.beat.lead: 这个参数是控制任务超时之后多久才通知Yarn调度模块重试整个任务。大部分的写入场景,我们都不回设置写入任务的重试。
本站文章资源均来源自网络,除非特别声明,否则均不代表站方观点,并仅供查阅,不作为任何参考依据!
如有侵权请及时跟我们联系,本站将及时删除!
如遇版权问题,请查看 本站版权声明
THE END
分享
二维码
海报
Hadoop/Spark读写ES之性能调优
腾讯云EMR&Elasticsearch中使用ES-Hadoop之MR&Hive篇
<<上一篇
下一篇>>