Druid源码阅读(一):Druid Hadoop-based ingestion实现

一、Druid Hadoop-based ingestion简介

Apache Druid是一款开源时序OLAP数据库,支持流数据摄入和批数据摄入两种数据写入方式,其中批数据摄入又包括Native batch和Hadoop-based两种方式。根据官方文档[1],Druid推荐使用Native batch方式摄入数据,因为这种方式更灵活,且没有对外部Hadoop集群的依赖。但Hadoop-based数据摄入也有其优势: 1. 生成Segment的离线计算过程可以使用Hadoop集群的计算资源,减少了druid集群的计算压力,做到计算和存储分离;2. 与大数据处理生态对接更方便,前期的数据预处理可以使用Spark、Flink等计算引擎,将处理结果放在某个HDFS目录下即可。

Hadoop-based数据摄入一般是向Druid Overload节点提交一个Json文件,里面会定义数据源、写入的datasource、数据格式等信息,具体语法可参考Druid官方文档[2]。

本文的目的就是对照Druid源码,解析Druid如何通过MapReduce任务完成索引计算并生成Segment文件存储。本文会聚焦于MapReduce任务的执行,略过Druid数据聚合和生成索引的逻辑。数据聚合和生成索引的逻辑相对比较复杂且独立,打算后续在另外的文章中详细描述。

二、MapReduce任务

Druid的Hadoop数据摄入任务实现在indexing-hadoop子工程中,核心代码是IndexGeneratorJob.java这个文件。接下来就深入这个文件,看看Druid如何将HDFS文件中的数据通过MapReduce任务转化为Segment存储下来。

任务构建

现在Spark、Flink框架比较流行,可能很多人已经不认识原始的MapReduce任务代码了。下面的代码就完成了构建一个MapReduce任务并提交给Hadoop集群。可以看到任务使用IndexGeneratorMapper类作为Mapper、使用IndexGeneratorPartitioner作为Partitioner、使用IndexGeneratorCombiner作为Combiner、使用IndexGeneratorReducer作为Reducer,并设置输入输出路径、任务配置参数,最后job.submit()提交任务。

这里job.getConfiguration().set("io.sort.record.percent","0.23")是配置环形缓冲区中用多大比例来保存数据索引,默认值是0.05。

job.getConfiguration().set("io.sort.record.percent", "0.23");

job.setMapperClass(IndexGeneratorMapper.class);
job.setMapOutputValueClass(BytesWritable.class);
SortableBytes.useSortableBytesAsMapOutputKey(job, IndexGeneratorPartitioner.class);

if (config.getSchema().getTuningConfig().getUseCombiner()) {
    job.setCombinerClass(IndexGeneratorCombiner.class);
    job.setCombinerKeyGroupingComparatorClass(BytesWritable.Comparator.class);
}

// setReducerClass(job); 
job.setReducerClass(IndexGeneratorReducer.class);

job.setOutputKeyClass(BytesWritable.class);
job.setOutputValueClass(Text.class);
job.setOutputFormatClass(IndexGeneratorOutputFormat.class);
FileOutputFormat.setOutputPath(job, config.makeIntermediatePath());

config.addInputPaths(job);

config.intoConfiguration(job);

JobHelper.setupClasspath(
    JobHelper.distributedClassPath(config.getWorkingPath()),
    JobHelper.distributedClassPath(config.makeIntermediatePath()),
    job
);

job.submit();

下面就具体说明Mapper、Combiner、Reducer的逻辑。

Mapper: IndexGeneratorMapper

IndexGeneratorMapper继承自HadoopDruidIndexerMapper<BytesWritable, BytesWritable>,进入HadoopDruidIndexerMapper,我们看到了熟悉的map函数。

protected void map(Object key, Object value, Context context) throws IOException, InterruptedException
{
    try {
      final List<InputRow> inputRows = parseInputRow(value, parser);

      for (InputRow inputRow : inputRows) {
        try {
          if (inputRow == null) {
            // Throw away null rows from the parser.
            log.debug("Throwing away row [%s]", value);
            context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_THROWN_AWAY_COUNTER).increment(1);
            continue;
          }

          if (!Intervals.ETERNITY.contains(inputRow.getTimestamp())) {
            final String errorMsg = StringUtils.format(
                "Encountered row with timestamp that cannot be represented as a long: [%s]",
                inputRow
            );
            throw new ParseException(errorMsg);
          }

          if (!granularitySpec.bucketIntervals().isPresent()
              || granularitySpec.bucketInterval(DateTimes.utc(inputRow.getTimestampFromEpoch()))
                                .isPresent()) {
            innerMap(inputRow, context);
          } else {
            context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_THROWN_AWAY_COUNTER).increment(1);
          }
        }
        catch (ParseException pe) {
          handleParseException(pe, context);
        }
      }
    }
    catch (ParseException pe) {
      handleParseException(pe, context);
    }
    catch (RuntimeException e) {
      throw new RE(e, "Failure on row[%s]", value);
    }
}

首先parseInputRow,将文本格式输入中的每一行转换为InputRow,这里的parser实例是HadoopyStringInputRowParser,inputRows的实例是MapBasedInputRow。具体调用的ParserSpec会根据提交Json中的spec.dataSchema.parser来实例化,Druid官方文档[3]中说明的数据格式在图一中都能找到对应ParserSpec实现。

图一 Druid ParserSpecs UML

然后对每一行数据做一个过滤,过滤掉空行、没有时间戳的行以及不在任务指定时间范围内的行。这里的判断逻辑是基于提交Json中的spec.dataSchema.granularitySpec.intervals字段,若该字段不存在,则任意时间的数据都可以摄入;若指定了该字段,则需要检查当前行的时间戳是否在需要摄入的时间范围内,以决定是否丢弃该行数据。如果通过了这些条件的校验,函数最终会调用到innerMap函数,否则会对丢弃的行计数,用于日志或监控。

下面来看IndexGeneratorMapper.innerMap函数。

protected void innerMap(
        InputRow inputRow,
        Context context
    ) throws IOException, InterruptedException
{
    // Group by bucket, sort by timestamp
    final Optional<Bucket> bucket = getConfig().getBucket(inputRow);

    if (!bucket.isPresent()) {
        throw new ISE("WTF?! No bucket found for row: %s", inputRow);
    }

    final long truncatedTimestamp = granularitySpec.getQueryGranularity()
                                                   .bucketStart(inputRow.getTimestamp())
                                                   .getMillis();
    
    // type SegmentInputRow serves as a marker that these InputRow instances have already been combined
    // and they contain the columns as they show up in the segment after ingestion, not what you would see in raw
    // data
    InputRowSerde.SerializeResult serializeResult = inputRow instanceof SegmentInputRow ?
                                                      InputRowSerde.toBytes(
                                                          typeHelperMap,
                                                          inputRow,
                                                          aggsForSerializingSegmentInputRow
                                                      )
                                                                                          :
                                                      InputRowSerde.toBytes(
                                                          typeHelperMap,
                                                          inputRow,
                                                          aggregators
                                                      );

    final byte[] hashedDimensions = HASH_FUNCTION.hashBytes(
        HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsBytes(
            Rows.toGroupKey(
                truncatedTimestamp,
                inputRow
            )
        )
    ).asBytes();
    
    context.write(
        new SortableBytes(
            bucket.get().toGroupKey(),
            // sort rows by truncated timestamp and hashed dimensions to help reduce spilling on the reducer side
            ByteBuffer.allocate(Long.BYTES + hashedDimensions.length)
                      .putLong(truncatedTimestamp)
                      .put(hashedDimensions)
                      .array()
        ).toBytesWritable(),
        new BytesWritable(serializeResult.getSerializedRow())
    );

    ParseException pe = IncrementalIndex.getCombinedParseException(
        inputRow,
        serializeResult.getParseExceptionMessages(),
        null
    );
    if (pe != null) {
      throw pe;
    } else {
      context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_PROCESSED_COUNTER).increment(1);
    }
}

这里第一步是获取该行所在的Bucket,代码中的Bucket的概念与Segment的概念是对应的,即在运算阶段属于同一个Bucket的数据,最终会被写入同一个Segment文件中。Bucket由shardNum、truncatedTimestamp和partitionNum唯一确定(理论上通过shardNum就可以唯一确定,后面会看到shardNum、truncatedTimestamp和partitionNum之间的关系)。因为在进入该函数前已经按时间区间做了过滤,所以这里获取Bucket理论上一定会成功。然后需要对行的时间戳做截取,比如queryGranularity设置为小时,这里就会将时间戳截取至小时粒度,剩余的精度全部补0。时间戳截取的目的是为了方便rollup,即将属于同一个时间区间的数据预聚合起来,这样虽然丢失了部分原始信息,但可以很大程度减少存储的数据量,并提升查询效率。

下一步将InputRow序列化,作为Map输出的Value部分。这里判断了inputRow instanceof SegmentInputRow,即当前inputRow是原始数据还是其他Segment中已经预聚合好的数据,据此会在序列化时使用不同的aggregator。因为如果是Segment中预聚合好的数据,对应的aggregator需要做一些变化。例如指定某一指标需要Count聚合,对于原始数据就是用Count聚合就好,而对于预聚合好的Segment,就要使用LongSum聚合。

最后一步就是是输出Key-Value对,其中key是固定160字节的BytesWritable(包括一个四字节的SortableBytes长度和156字节的SortableBytes),Value就是InputRow序列化后的BytesWritable。SortableBytes的格式如下图二所示,其中groupKey标识了归属的Bucket,其作用是在Reduce阶段将属于同一个Bucket的数据放入同一次reduce函数的调用中,从而保存在同一个Segment文件里。sortKey中hashedDimensions是根据当前行截取后的时间戳以及所有维度的取值计算出的哈希值,sortKey的作用是将所有维度值相同的行排序时排在一起,可以减少Combine阶段和Reduce阶段的spill。

就这个Map输出的Key来讲,我认为还是有优化的空间:1. 图中各个字段均是定长的,最前面4个字节的groupKeySize可以省去;2. sortKey中不需要再写入truncatedTimestamp,时间戳在hashedDimensions中已有体现。

图二 Map Output SortableBytes

Combiner: IndexGeneratorCombiner

MapReduce中Combiner的作用是预聚合单节点Key相同的数据,减少Shuffle过程的数据传输量。由于任务设置了setCombinerKeyGroupingComparatorClass(BytesWritable.Comparator.class),在Combine过程中会使用BytesWritable.Comparator作为比较运算符,对Map输出的Key-Value对进行分组预聚合。进入BytesWritable.Comparator,可以看到这个Comparator比较跳过了前4个字符,比较了Map阶段输出的SortableBytes。因此Combine阶段只有所有维度取值都相同的行才会被聚合在一起,输入到reduce函数中。

job.setCombinerKeyGroupingComparatorClass(BytesWritable.Comparator.class)

public static class Comparator extends WritableComparator {
    public Comparator() { super(BytesWritable.class);}

    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return compareBytes(b1, s1 + 4, l1 - 4, b2, s2 + 4, l2 - 4);
    }
}

Combine阶段预聚合逻辑如下。首先,如果只有一行数据,则直接将该行数据输出;如果有多条数据,则需要将其预聚合起来,这也是Druid rollup的核心过程,将所有维度值(包括TruncatedTimestamp)相同的数据压缩成一行,减少数据存储量。

数据的聚合过程就是创建一个index,并不断把数据行加入index中,如果index满了(index.canAppendRow()里检查了Segment行数的配置和Segment大小的配置),就flush当前index,并打开一个新的index。具体index是如何计算的本文不去细究,后续会更新另外的文章解析索引构建的方法。

@Override
protected void reduce(final BytesWritable key, Iterable<BytesWritable> values, final Context context)
    throws IOException, InterruptedException
{
    Iterator<BytesWritable> iter = values.iterator();
    BytesWritable first = iter.next();

    if (iter.hasNext()) {
        LinkedHashSet<String> dimOrder = new LinkedHashSet<>();
        SortableBytes keyBytes = SortableBytes.fromBytesWritable(key);
        Bucket bucket = Bucket.fromGroupKey(keyBytes.getGroupKey()).lhs;
        IncrementalIndex index = makeIncrementalIndex(bucket, combiningAggs, config, null, null);
        index.add(InputRowSerde.fromBytes(typeHelperMap, first.getBytes(), aggregators));

        while (iter.hasNext()) {
            context.progress();
            InputRow value = InputRowSerde.fromBytes(typeHelperMap, iter.next().getBytes(), aggregators);

            if (!index.canAppendRow()) {
                dimOrder.addAll(index.getDimensionOrder());
                log.info("current index full due to [%s]. creating new index.", index.getOutOfRowsReason());
                flushIndexToContextAndClose(key, index, context);
                index = makeIncrementalIndex(bucket, combiningAggs, config, dimOrder, index.getColumnCapabilities());
            }
            index.add(value);
        }

        flushIndexToContextAndClose(key, index, context);
    } else {
        context.write(key, first);
    }
}

Partitioner: IndexGeneratorPartitioner

Map或Combine阶段输出的Key-Value对会使用指定的Partitioner进行分区,之后Reducer会从每个Map或Combine的结果中读取属于自己的分区数据,完成Shuffle的过程。

int numReducers = Iterables.size(config.getAllBuckets().get());
if (numReducers == 0) {
    throw new RuntimeException("No buckets?? seems there is no data to index.");
}
job.setNumReduceTasks(numReducers);
//======================================================================================
@Override
public int getPartition(BytesWritable bytesWritable, Writable value, int numPartitions)
{
    final ByteBuffer bytes = ByteBuffer.wrap(bytesWritable.getBytes());
    bytes.position(4); // Skip length added by SortableBytes
    int shardNum = bytes.getInt();
    if ("local".equals(JobHelper.getJobTrackerAddress(config))) {
        return shardNum % numPartitions;
    } else {
        if (shardNum >= numPartitions) {
            throw new ISE("Not enough partitions, shard[%,d] >= numPartitions[%,d]", shardNum, numPartitions);
        }
        return shardNum;
    }
}

Reduce Task的个数取决于两部分配置。1. granularitySpec中时间段的划分:假设granularitySpec.intervals设置了一个一天的时间区间,granularitySpec.segmentGranularity.period配置了"PT1H",即以小时为粒度划分Segment,这样就会划分出24个Segment,相应的就有24个Reduce Task;2. tuningConfig.partitionsSpec中numShards配置:假设这个numShards配置的值为2,那么上面的每个时间区间又会分为2个分片,这样就有24*2=48个Reduce Task。

"granularitySpec": {
    "type": "uniform",
    "segmentGranularity": {
         "type": "period",
         "period": "PT1H",
         "origin": null
    },
    "rollup": true,
    "intervals": [
        "2020-04-15T16:00:00.000Z/2020-04-16T16:00:00.000Z"
    ]
}

"tuningConfig": {
    "type": "hadoop",
    "partitionsSpec": {
        "type": "hashed",
        "numShards": 2
    }
}

例如对于如上所示配置,在任务执行时会产生48个Reduce Task,其对应的partitionNum和shardNum如下图三所示。可以看到生成任务的shardNum的取值为0-47,而getPartition函数中传入的numPartitions为48(Reduce Task个数),因此可以直接用shardNum作为getPartition返回的结果。这里可以看到,shardNum实际上唯一确定了Bucket,因此相同Bucket中的数据会进入同一个Reduce Task中,最终会存储在同一个Segment中。

图三 partitionNum和shardNum示例

Reducer: IndexGeneratorReducer

Reducer最后会将属于自己的分区数据收集到一起,调用reduce函数进一步将预聚合过的数据合并为Segment文件,这里合并的逻辑其实和Combine阶段非常类似。注意SortableBytes.useSortableBytesAsMapOutputKey(job, IndexGeneratorPartitioner.class)中的job.setGroupingComparatorClass(SortableBytesGroupingComparator.class),在Reduce阶段,会根据SortableBytesGroupingComparator将数据分组,调用reduce函数,SortableBytesGroupingComparator只比较了图二中的groupKey部分,即同一个Bucket的数据会被放在一起计算。因此Reduce阶段的reduce函数对于每个Task只会执行一次,生成一个Segment(包括一个descriptor.json文件和一个index.zip文件),并写入指定的HDFS路径下。

public static class SortableBytesGroupingComparator extends WritableComparator
{
    protected SortableBytesGroupingComparator() { super(BytesWritable.class); }

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)
    {
        int b1Length = ByteBuffer.wrap(b1, s1 + 4, l1 - 4).getInt();
        int b2Length = ByteBuffer.wrap(b2, s2 + 4, l2 - 4).getInt();

        final int retVal = compareBytes(b1, s1 + 8, b1Length, b2, s2 + 8, b2Length);
        return retVal;
    }
}

三、执行大图

图四是上述的MapReduce过程中数据转化的示例:假设我们的输入有2个HDFS文件,对应2个Map Task,时间区间只划分为两部分,对应的truncatedTimestamp分别为T1和T2,numShards为2,这样会生成4个Segment。dx和dy表示2个维度,dxi和dyi表示具体维度的不同取值,m表示一个指标,mi表示指标的具体取值,m的聚合方式用agg表示。Map阶段会将HDFS文件读取为行数据,Combine阶段会对同一个Map任务的输出将时间和维度值都相同的行预聚合好。Combine后的行数据会根据行所对应的的分区分发到响应的Reduce任务,并进一步聚合生成Segment文件。

图四 Druid Hadoop-based数据摄入MapReduce执行示例

四、总结

通过学习Druid Hadoop-based数据摄入的流程,把“古老”的MapReduce过程又学习了一遍,学习的过程中参考了[4]中的一张MapReduce工作原理图,推荐想要学习MapReduce的同学都去看下;对任务Spec中各个配置字段的含义也有了更深入的了解,这里也给出一些参数设置建议:

  1. 建议使用targetRowsPerSegment而不是numShards。在partitionsSpec中targetRowsPerSegment和numShards配置是互斥的,使用targetRowsPerSegment配置可以更合理的控制每个Segment的大小,既不会出现超大Segment,也不会出现很多小Segment,利于historical节点进行加载和缓存。
  2. 如果使用numShards,建议同时配置partitionDimensions。partitionDimensions可以配置如账号ID之类的信息,这样同一个账号的数据会保存在同一个Segment中,查询时可以减少读取的Segment数目,提升查询性能。
  3. tuningConfig.useCombiner置为true。这个值默认是false,一般来讲,对于druid数据摄入的场景,预聚合可以很大程度上减少Shuffle过程中的数据传输量,减少作业运行时间。

参考文献

[1] https://druid.apache.org/docs/0.17.1/ingestion/index.html

[2] Hadoop-based ingestion,https://druid.apache.org/docs/latest/ingestion/hadoop.html

[3] Data formats,https://druid.apache.org/docs/latest/ingestion/data-formats.html

[4] mapreduce二次排序详解,https://www.lagou.com/lgeduarticle/8090.html

本站文章资源均来源自网络,除非特别声明,否则均不代表站方观点,并仅供查阅,不作为任何参考依据!
如有侵权请及时跟我们联系,本站将及时删除!
如遇版权问题,请查看 本站版权声明
THE END
分享
二维码
海报
Druid源码阅读(一):Druid Hadoop-based ingestion实现
Apache Druid是一款开源时序OLAP数据库,支持流数据摄入和批数据摄入两种数据写入方式,其中批数据摄入又包括Native batch和Hadoop-b...
<<上一篇
下一篇>>