Druid源码阅读(一):Druid Hadoop-based ingestion实现
一、Druid Hadoop-based ingestion简介
Apache Druid是一款开源时序OLAP数据库,支持流数据摄入和批数据摄入两种数据写入方式,其中批数据摄入又包括Native batch和Hadoop-based两种方式。根据官方文档[1],Druid推荐使用Native batch方式摄入数据,因为这种方式更灵活,且没有对外部Hadoop集群的依赖。但Hadoop-based数据摄入也有其优势: 1. 生成Segment的离线计算过程可以使用Hadoop集群的计算资源,减少了druid集群的计算压力,做到计算和存储分离;2. 与大数据处理生态对接更方便,前期的数据预处理可以使用Spark、Flink等计算引擎,将处理结果放在某个HDFS目录下即可。
Hadoop-based数据摄入一般是向Druid Overload节点提交一个Json文件,里面会定义数据源、写入的datasource、数据格式等信息,具体语法可参考Druid官方文档[2]。
本文的目的就是对照Druid源码,解析Druid如何通过MapReduce任务完成索引计算并生成Segment文件存储。本文会聚焦于MapReduce任务的执行,略过Druid数据聚合和生成索引的逻辑。数据聚合和生成索引的逻辑相对比较复杂且独立,打算后续在另外的文章中详细描述。
二、MapReduce任务
Druid的Hadoop数据摄入任务实现在indexing-hadoop子工程中,核心代码是IndexGeneratorJob.java这个文件。接下来就深入这个文件,看看Druid如何将HDFS文件中的数据通过MapReduce任务转化为Segment存储下来。
任务构建
现在Spark、Flink框架比较流行,可能很多人已经不认识原始的MapReduce任务代码了。下面的代码就完成了构建一个MapReduce任务并提交给Hadoop集群。可以看到任务使用IndexGeneratorMapper类作为Mapper、使用IndexGeneratorPartitioner作为Partitioner、使用IndexGeneratorCombiner作为Combiner、使用IndexGeneratorReducer作为Reducer,并设置输入输出路径、任务配置参数,最后job.submit()提交任务。
这里job.getConfiguration().set("io.sort.record.percent","0.23")是配置环形缓冲区中用多大比例来保存数据索引,默认值是0.05。
job.getConfiguration().set("io.sort.record.percent", "0.23");
job.setMapperClass(IndexGeneratorMapper.class);
job.setMapOutputValueClass(BytesWritable.class);
SortableBytes.useSortableBytesAsMapOutputKey(job, IndexGeneratorPartitioner.class);
if (config.getSchema().getTuningConfig().getUseCombiner()) {
job.setCombinerClass(IndexGeneratorCombiner.class);
job.setCombinerKeyGroupingComparatorClass(BytesWritable.Comparator.class);
}
// setReducerClass(job);
job.setReducerClass(IndexGeneratorReducer.class);
job.setOutputKeyClass(BytesWritable.class);
job.setOutputValueClass(Text.class);
job.setOutputFormatClass(IndexGeneratorOutputFormat.class);
FileOutputFormat.setOutputPath(job, config.makeIntermediatePath());
config.addInputPaths(job);
config.intoConfiguration(job);
JobHelper.setupClasspath(
JobHelper.distributedClassPath(config.getWorkingPath()),
JobHelper.distributedClassPath(config.makeIntermediatePath()),
job
);
job.submit();
下面就具体说明Mapper、Combiner、Reducer的逻辑。
Mapper: IndexGeneratorMapper
IndexGeneratorMapper继承自HadoopDruidIndexerMapper<BytesWritable, BytesWritable>,进入HadoopDruidIndexerMapper,我们看到了熟悉的map函数。
protected void map(Object key, Object value, Context context) throws IOException, InterruptedException
{
try {
final List<InputRow> inputRows = parseInputRow(value, parser);
for (InputRow inputRow : inputRows) {
try {
if (inputRow == null) {
// Throw away null rows from the parser.
log.debug("Throwing away row [%s]", value);
context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_THROWN_AWAY_COUNTER).increment(1);
continue;
}
if (!Intervals.ETERNITY.contains(inputRow.getTimestamp())) {
final String errorMsg = StringUtils.format(
"Encountered row with timestamp that cannot be represented as a long: [%s]",
inputRow
);
throw new ParseException(errorMsg);
}
if (!granularitySpec.bucketIntervals().isPresent()
|| granularitySpec.bucketInterval(DateTimes.utc(inputRow.getTimestampFromEpoch()))
.isPresent()) {
innerMap(inputRow, context);
} else {
context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_THROWN_AWAY_COUNTER).increment(1);
}
}
catch (ParseException pe) {
handleParseException(pe, context);
}
}
}
catch (ParseException pe) {
handleParseException(pe, context);
}
catch (RuntimeException e) {
throw new RE(e, "Failure on row[%s]", value);
}
}
首先parseInputRow,将文本格式输入中的每一行转换为InputRow,这里的parser实例是HadoopyStringInputRowParser,inputRows的实例是MapBasedInputRow。具体调用的ParserSpec会根据提交Json中的spec.dataSchema.parser来实例化,Druid官方文档[3]中说明的数据格式在图一中都能找到对应ParserSpec实现。
然后对每一行数据做一个过滤,过滤掉空行、没有时间戳的行以及不在任务指定时间范围内的行。这里的判断逻辑是基于提交Json中的spec.dataSchema.granularitySpec.intervals字段,若该字段不存在,则任意时间的数据都可以摄入;若指定了该字段,则需要检查当前行的时间戳是否在需要摄入的时间范围内,以决定是否丢弃该行数据。如果通过了这些条件的校验,函数最终会调用到innerMap函数,否则会对丢弃的行计数,用于日志或监控。
下面来看IndexGeneratorMapper.innerMap函数。
protected void innerMap(
InputRow inputRow,
Context context
) throws IOException, InterruptedException
{
// Group by bucket, sort by timestamp
final Optional<Bucket> bucket = getConfig().getBucket(inputRow);
if (!bucket.isPresent()) {
throw new ISE("WTF?! No bucket found for row: %s", inputRow);
}
final long truncatedTimestamp = granularitySpec.getQueryGranularity()
.bucketStart(inputRow.getTimestamp())
.getMillis();
// type SegmentInputRow serves as a marker that these InputRow instances have already been combined
// and they contain the columns as they show up in the segment after ingestion, not what you would see in raw
// data
InputRowSerde.SerializeResult serializeResult = inputRow instanceof SegmentInputRow ?
InputRowSerde.toBytes(
typeHelperMap,
inputRow,
aggsForSerializingSegmentInputRow
)
:
InputRowSerde.toBytes(
typeHelperMap,
inputRow,
aggregators
);
final byte[] hashedDimensions = HASH_FUNCTION.hashBytes(
HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsBytes(
Rows.toGroupKey(
truncatedTimestamp,
inputRow
)
)
).asBytes();
context.write(
new SortableBytes(
bucket.get().toGroupKey(),
// sort rows by truncated timestamp and hashed dimensions to help reduce spilling on the reducer side
ByteBuffer.allocate(Long.BYTES + hashedDimensions.length)
.putLong(truncatedTimestamp)
.put(hashedDimensions)
.array()
).toBytesWritable(),
new BytesWritable(serializeResult.getSerializedRow())
);
ParseException pe = IncrementalIndex.getCombinedParseException(
inputRow,
serializeResult.getParseExceptionMessages(),
null
);
if (pe != null) {
throw pe;
} else {
context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_PROCESSED_COUNTER).increment(1);
}
}
这里第一步是获取该行所在的Bucket,代码中的Bucket的概念与Segment的概念是对应的,即在运算阶段属于同一个Bucket的数据,最终会被写入同一个Segment文件中。Bucket由shardNum、truncatedTimestamp和partitionNum唯一确定(理论上通过shardNum就可以唯一确定,后面会看到shardNum、truncatedTimestamp和partitionNum之间的关系)。因为在进入该函数前已经按时间区间做了过滤,所以这里获取Bucket理论上一定会成功。然后需要对行的时间戳做截取,比如queryGranularity设置为小时,这里就会将时间戳截取至小时粒度,剩余的精度全部补0。时间戳截取的目的是为了方便rollup,即将属于同一个时间区间的数据预聚合起来,这样虽然丢失了部分原始信息,但可以很大程度减少存储的数据量,并提升查询效率。
下一步将InputRow序列化,作为Map输出的Value部分。这里判断了inputRow instanceof SegmentInputRow,即当前inputRow是原始数据还是其他Segment中已经预聚合好的数据,据此会在序列化时使用不同的aggregator。因为如果是Segment中预聚合好的数据,对应的aggregator需要做一些变化。例如指定某一指标需要Count聚合,对于原始数据就是用Count聚合就好,而对于预聚合好的Segment,就要使用LongSum聚合。
最后一步就是是输出Key-Value对,其中key是固定160字节的BytesWritable(包括一个四字节的SortableBytes长度和156字节的SortableBytes),Value就是InputRow序列化后的BytesWritable。SortableBytes的格式如下图二所示,其中groupKey标识了归属的Bucket,其作用是在Reduce阶段将属于同一个Bucket的数据放入同一次reduce函数的调用中,从而保存在同一个Segment文件里。sortKey中hashedDimensions是根据当前行截取后的时间戳以及所有维度的取值计算出的哈希值,sortKey的作用是将所有维度值相同的行排序时排在一起,可以减少Combine阶段和Reduce阶段的spill。
就这个Map输出的Key来讲,我认为还是有优化的空间:1. 图中各个字段均是定长的,最前面4个字节的groupKeySize可以省去;2. sortKey中不需要再写入truncatedTimestamp,时间戳在hashedDimensions中已有体现。
Combiner: IndexGeneratorCombiner
MapReduce中Combiner的作用是预聚合单节点Key相同的数据,减少Shuffle过程的数据传输量。由于任务设置了setCombinerKeyGroupingComparatorClass(BytesWritable.Comparator.class),在Combine过程中会使用BytesWritable.Comparator作为比较运算符,对Map输出的Key-Value对进行分组预聚合。进入BytesWritable.Comparator,可以看到这个Comparator比较跳过了前4个字符,比较了Map阶段输出的SortableBytes。因此Combine阶段只有所有维度取值都相同的行才会被聚合在一起,输入到reduce函数中。
job.setCombinerKeyGroupingComparatorClass(BytesWritable.Comparator.class)
public static class Comparator extends WritableComparator {
public Comparator() { super(BytesWritable.class);}
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
return compareBytes(b1, s1 + 4, l1 - 4, b2, s2 + 4, l2 - 4);
}
}
Combine阶段预聚合逻辑如下。首先,如果只有一行数据,则直接将该行数据输出;如果有多条数据,则需要将其预聚合起来,这也是Druid rollup的核心过程,将所有维度值(包括TruncatedTimestamp)相同的数据压缩成一行,减少数据存储量。
数据的聚合过程就是创建一个index,并不断把数据行加入index中,如果index满了(index.canAppendRow()里检查了Segment行数的配置和Segment大小的配置),就flush当前index,并打开一个新的index。具体index是如何计算的本文不去细究,后续会更新另外的文章解析索引构建的方法。
@Override
protected void reduce(final BytesWritable key, Iterable<BytesWritable> values, final Context context)
throws IOException, InterruptedException
{
Iterator<BytesWritable> iter = values.iterator();
BytesWritable first = iter.next();
if (iter.hasNext()) {
LinkedHashSet<String> dimOrder = new LinkedHashSet<>();
SortableBytes keyBytes = SortableBytes.fromBytesWritable(key);
Bucket bucket = Bucket.fromGroupKey(keyBytes.getGroupKey()).lhs;
IncrementalIndex index = makeIncrementalIndex(bucket, combiningAggs, config, null, null);
index.add(InputRowSerde.fromBytes(typeHelperMap, first.getBytes(), aggregators));
while (iter.hasNext()) {
context.progress();
InputRow value = InputRowSerde.fromBytes(typeHelperMap, iter.next().getBytes(), aggregators);
if (!index.canAppendRow()) {
dimOrder.addAll(index.getDimensionOrder());
log.info("current index full due to [%s]. creating new index.", index.getOutOfRowsReason());
flushIndexToContextAndClose(key, index, context);
index = makeIncrementalIndex(bucket, combiningAggs, config, dimOrder, index.getColumnCapabilities());
}
index.add(value);
}
flushIndexToContextAndClose(key, index, context);
} else {
context.write(key, first);
}
}
Partitioner: IndexGeneratorPartitioner
Map或Combine阶段输出的Key-Value对会使用指定的Partitioner进行分区,之后Reducer会从每个Map或Combine的结果中读取属于自己的分区数据,完成Shuffle的过程。
int numReducers = Iterables.size(config.getAllBuckets().get());
if (numReducers == 0) {
throw new RuntimeException("No buckets?? seems there is no data to index.");
}
job.setNumReduceTasks(numReducers);
//======================================================================================
@Override
public int getPartition(BytesWritable bytesWritable, Writable value, int numPartitions)
{
final ByteBuffer bytes = ByteBuffer.wrap(bytesWritable.getBytes());
bytes.position(4); // Skip length added by SortableBytes
int shardNum = bytes.getInt();
if ("local".equals(JobHelper.getJobTrackerAddress(config))) {
return shardNum % numPartitions;
} else {
if (shardNum >= numPartitions) {
throw new ISE("Not enough partitions, shard[%,d] >= numPartitions[%,d]", shardNum, numPartitions);
}
return shardNum;
}
}
Reduce Task的个数取决于两部分配置。1. granularitySpec中时间段的划分:假设granularitySpec.intervals设置了一个一天的时间区间,granularitySpec.segmentGranularity.period配置了"PT1H",即以小时为粒度划分Segment,这样就会划分出24个Segment,相应的就有24个Reduce Task;2. tuningConfig.partitionsSpec中numShards配置:假设这个numShards配置的值为2,那么上面的每个时间区间又会分为2个分片,这样就有24*2=48个Reduce Task。
"granularitySpec": {
"type": "uniform",
"segmentGranularity": {
"type": "period",
"period": "PT1H",
"origin": null
},
"rollup": true,
"intervals": [
"2020-04-15T16:00:00.000Z/2020-04-16T16:00:00.000Z"
]
}
"tuningConfig": {
"type": "hadoop",
"partitionsSpec": {
"type": "hashed",
"numShards": 2
}
}
例如对于如上所示配置,在任务执行时会产生48个Reduce Task,其对应的partitionNum和shardNum如下图三所示。可以看到生成任务的shardNum的取值为0-47,而getPartition函数中传入的numPartitions为48(Reduce Task个数),因此可以直接用shardNum作为getPartition返回的结果。这里可以看到,shardNum实际上唯一确定了Bucket,因此相同Bucket中的数据会进入同一个Reduce Task中,最终会存储在同一个Segment中。
Reducer: IndexGeneratorReducer
Reducer最后会将属于自己的分区数据收集到一起,调用reduce函数进一步将预聚合过的数据合并为Segment文件,这里合并的逻辑其实和Combine阶段非常类似。注意SortableBytes.useSortableBytesAsMapOutputKey(job, IndexGeneratorPartitioner.class)中的job.setGroupingComparatorClass(SortableBytesGroupingComparator.class),在Reduce阶段,会根据SortableBytesGroupingComparator将数据分组,调用reduce函数,SortableBytesGroupingComparator只比较了图二中的groupKey部分,即同一个Bucket的数据会被放在一起计算。因此Reduce阶段的reduce函数对于每个Task只会执行一次,生成一个Segment(包括一个descriptor.json文件和一个index.zip文件),并写入指定的HDFS路径下。
public static class SortableBytesGroupingComparator extends WritableComparator
{
protected SortableBytesGroupingComparator() { super(BytesWritable.class); }
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)
{
int b1Length = ByteBuffer.wrap(b1, s1 + 4, l1 - 4).getInt();
int b2Length = ByteBuffer.wrap(b2, s2 + 4, l2 - 4).getInt();
final int retVal = compareBytes(b1, s1 + 8, b1Length, b2, s2 + 8, b2Length);
return retVal;
}
}
三、执行大图
图四是上述的MapReduce过程中数据转化的示例:假设我们的输入有2个HDFS文件,对应2个Map Task,时间区间只划分为两部分,对应的truncatedTimestamp分别为T1和T2,numShards为2,这样会生成4个Segment。dx和dy表示2个维度,dxi和dyi表示具体维度的不同取值,m表示一个指标,mi表示指标的具体取值,m的聚合方式用agg表示。Map阶段会将HDFS文件读取为行数据,Combine阶段会对同一个Map任务的输出将时间和维度值都相同的行预聚合好。Combine后的行数据会根据行所对应的的分区分发到响应的Reduce任务,并进一步聚合生成Segment文件。
四、总结
通过学习Druid Hadoop-based数据摄入的流程,把“古老”的MapReduce过程又学习了一遍,学习的过程中参考了[4]中的一张MapReduce工作原理图,推荐想要学习MapReduce的同学都去看下;对任务Spec中各个配置字段的含义也有了更深入的了解,这里也给出一些参数设置建议:
- 建议使用targetRowsPerSegment而不是numShards。在partitionsSpec中targetRowsPerSegment和numShards配置是互斥的,使用targetRowsPerSegment配置可以更合理的控制每个Segment的大小,既不会出现超大Segment,也不会出现很多小Segment,利于historical节点进行加载和缓存。
- 如果使用numShards,建议同时配置partitionDimensions。partitionDimensions可以配置如账号ID之类的信息,这样同一个账号的数据会保存在同一个Segment中,查询时可以减少读取的Segment数目,提升查询性能。
- tuningConfig.useCombiner置为true。这个值默认是false,一般来讲,对于druid数据摄入的场景,预聚合可以很大程度上减少Shuffle过程中的数据传输量,减少作业运行时间。
参考文献
[1] https://druid.apache.org/docs/0.17.1/ingestion/index.html
[2] Hadoop-based ingestion,https://druid.apache.org/docs/latest/ingestion/hadoop.html
[3] Data formats,https://druid.apache.org/docs/latest/ingestion/data-formats.html
[4] mapreduce二次排序详解,https://www.lagou.com/lgeduarticle/8090.html