MapReduce推测机制及自定义推测机制

作者:wyattliu  腾讯TEG工程师

|导语 集群中的机器机型、负载并不一定完全相同,所以即使一个MapReduce任务每一个Map/Reduce task要处理的数据量完全相同,其运行时间也可能会相差很大。推测机制就是为了缓解这种问题而设计的。但是针对不同的任务,默认的推测机制并不总能产生预期的效果。本文主要分析了MapReduce在演进过程中使用到的推测机制的利弊,以及介绍如何根据需求定制推测机制。

01

MapReduce推测机制

这篇文章 Hadoop的任务推测执行算法 @Hazza Cheng 对各个版本的hadoop 推测机制描述的非常清晰,这里摘取文章中对2.0版本推测机制的描述。说明一下。

2.0版本至今

Apache MapReduce 2.0(简称MRv2也称为YARN,即Yet Another Resource Negotiator)采用了不同于以上两种算法的推测执行机制,它重点关注新启动的备份任务是否有潜力比当前正在运行的任务完成得更早。如果通过一定的算法推测某一时刻启动备份任务,该备份任务肯定会比当前任务完成得晚,那么启动该备份任务只会浪费更多的资源。然而,从另一个 角度看,如果推测备份任务比当前任务完成得早,则启动备份任务会加快数据处理,且备份任务完成得越早,启动备份任务的价值越大。 

假设某一时刻,任务T的执行进度为progress,则可通过一定的算法推测出该任务的最终完成时刻estimatedEndTime。公式:

estimatedRunTime = (currentTimestamp − taskStartTime) / progress

estimatedEndTime = estimatedRunTime + taskStartTime

另一方面,如果此刻为该任务启动一个备份任务,则可推断出它可能的完成的时刻estimatedRunTime′ 和 estimatedRunTime′:

estimatedRunTime′ = averageRunTime

estimatedEndTime′ = currentTimestamp + estimatedRunTime′

在此版本的代码中核心就是如何预估 estimatedRunTime 和 estimatedRunTime′ ?

上面的公式中是YARN中已经给出的默认的实现方法。其一般能够适配绝大多数的MapReduce的任务推测。但是对于一些特殊的场景,该方法可能会失效:

  • 常见于滚动更新/聚合的数据处理任务,已经自定义好了分片规则,前一天的输出作为后一天的输入,此时在输出的分片数没有变化的情况下(旧数据一般呢不会进行shuffle),map端在merge阶段很可能会有一个分片非常大(注意并没有发生倾斜,因为所有的task都需要处理这样的数据),如果此时部分节点负载过高(CPU,MEM,IO),很可能导致运行在该节点的任务阻塞,此时进程可能已经进行了99%以上,在很长一段时间计算出来的 estimatedRunTime < estimatedRunTime′,并不会产生新的推测任务。
  • 如果在cleanup中自定义了一些额外的处理逻辑(耗时比较长),比如优化输出结果或者引用一些额外的组件对输出的结果进行再加工(如构建数据索引)。那么此时任务的进度已经是100%,但是实际任务还需要在节点上计算一段时间,此时如果节点负载过高,任务运行过慢的情况YARN的Speculator进程是无法感知的;

Yarn推测机制源代码简述

推测机制进程是一个独立的进程,其定义是在:

org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator

一些核心的MR配置如下:

推测机制开关

  • mapreduce.map.speculative : map阶段是否启动推测的开关,默认为 true
  • mapreduce.reduce.speculative : reduce阶段是否启动推测的开关,默认为 true

同时允许进行推测的最大任务数(下面三个配置取最大值)

  • mapreduce.job.speculative.speculative-cap-running-tasks :推测任务占当前正在运行的任务数的比例,默认为 0.1
  • mapreduce.job.speculative.speculative-cap-total-tasks :推测任务占全部要处理任务数的比例,默认为 0.01
  • mapreduce.job.speculative.minimum-allowed-tasks :最少允许同时运行的推测任务数量,默认为10 

推测的频率(每次推测最多只有一个推测任务会被下发,频率决定了单位时间内最多允许多少个推测任务下发)

  • mapreduce.job.speculative.retry-after-no-speculate :本次推测没有任务下发,执行下一次推测任务的等待时间,默认 1000(ms)
  • mapreduce.job.speculative.retry-after-speculate:本次推测有任务下发,执行下一次推测任务的等待时间,默认 15000(ms)

定义推测机制算法:

  • yarn.app.mapreduce.am.job.speculator.class:定义了推测机制核心算法的类,2.0版本需要实现 org.apache.hadoop.mapreduce.v2.app.speculate.Speculator 接口,默认使用的org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
  • yarn.app.mapreduce.am.job.task.estimator.class :与DefaultSpeculator配套,其中定义了预估 estimatedRunTime 和 estimatedRunTime′ 的方法,需要实现 org.apache.hadoop.mapreduce.v2.app.speculate.TaskRuntimeEstimator接口,默认使用的是 org.apache.hadoop.mapreduce.v2.app.speculate.LegacyTaskRuntimeEstimator;

基本无用的配置

  • mapreduce.job.speculative.slowtaskthreshold :旧版本推测机制依赖的配置,当前版本基本上已经无用了,但是并没有废弃,默认 1.0

如何判断当前推测是否需要启动推测任务?

1. 一次推测会针对Map 和 Reduce 任务分别独立进行;

2. 对于一个job的每一个task,分别使用speculationValue计算新启动推测任务的收益,并选择最大收益的task作为候选;

3. 确认当前已经推测任务数量是否超过最大允许推测的阈值,如果超过了那么本次推测不启动推测任务,否则启动候选task的推测执行任务;

 private int computeSpeculations() {

    // We'll try to issue one map and one reduce speculation per job per run

    return maybeScheduleAMapSpeculation() + maybeScheduleAReduceSpeculation();

  }

  private int maybeScheduleASpeculation(TaskType type) {

    int successes = 0;

    long now = clock.getTime();

    ConcurrentMap<JobId, AtomicInteger> containerNeeds

        = type == TaskType.MAP ? mapContainerNeeds : reduceContainerNeeds;

    for (ConcurrentMap.Entry<JobId, AtomicInteger> jobEntry : containerNeeds.entrySet()) {

      // This race conditon is okay.  If we skip a speculation attempt we

      //  should have tried because the event that lowers the number of

      //  containers needed to zero hasn't come through, it will next time.

      // Also, if we miss the fact that the number of containers needed was

      //  zero but increased due to a failure it's not too bad to launch one

      //  container prematurely.

      if (jobEntry.getValue().get() > 0) {

        continue;

      }

      int numberSpeculationsAlready = 0;

      int numberRunningTasks = 0;

      // loop through the tasks of the kind

      Job job = context.getJob(jobEntry.getKey());

      Map<TaskId, Task> tasks = job.getTasks(type);

      int numberAllowedSpeculativeTasks

          = (int) Math.max(minimumAllowedSpeculativeTasks,

              proportionTotalTasksSpeculatable * tasks.size());

      TaskId bestTaskID = null;

      long bestSpeculationValue = -1L;

      // this loop is potentially pricey.

      // TODO track the tasks that are potentially worth looking at

      for (Map.Entry<TaskId, Task> taskEntry : tasks.entrySet()) {

        long mySpeculationValue = speculationValue(taskEntry.getKey(), now);

        if (mySpeculationValue == ALREADY_SPECULATING) {

          ++numberSpeculationsAlready;

        }

        if (mySpeculationValue != NOT_RUNNING) {

          ++numberRunningTasks;

        }

        if (mySpeculationValue > bestSpeculationValue) {

          bestTaskID = taskEntry.getKey();

          bestSpeculationValue = mySpeculationValue;

        }

      }

      numberAllowedSpeculativeTasks

          = (int) Math.max(numberAllowedSpeculativeTasks,

              proportionRunningTasksSpeculatable * numberRunningTasks);

      // If we found a speculation target, fire it off

      if (bestTaskID != null

          && numberAllowedSpeculativeTasks > numberSpeculationsAlready) {

        addSpeculativeAttempt(bestTaskID);

        ++successes;

      }

    }

    return successes;

  }

代码中是怎么对各个task评估启动推测任务的收益的呢? 

各个task启动推测任务的收益的计算被定义在 speculationValue 函数当中,其具体执行逻辑与第一节的公式是一致的,不过加上了一些边界情况的处理,返回的结果有以下几种情况:

NOT_RUNNING -> 任务并没有任何attempt下发
ON_SCHEDULE -> 已经有attempt下发,任务处于PENDING状态,还未执行
TOO_NEW -> 任务的attempt在本轮推测开始之后,刚刚开始执行
ALREADY_SPECULATING -> 已经有推测的attempt在执行当中了
PROGRESS_IS_GOOD -> 预期结束时间比当本轮推测开始时间要早,任务可能已经完成
TOO_LATE_TO_SPECULATE -> 启动推测没有用,此时 estimatedEndTime < estimatedReplacementEndTime

estimatedEndTime - estimatedReplacementEndTime -> 非以上情况的其他情况,只有此种情况的task才可能在本轮推测中启动执行推测任务

可以看到这里有一个比较难受的问题,当任务走到TOO_LATE_TO_SPECULATE时,永远不会启动推测,此时就可能会忽略掉由于机器异常导致的任务阻塞,除非手动kill掉,否则的话要等很久很久任务才可能会启动执行别的attempt 或者 等待机器恢复。

private long speculationValue(TaskId taskID, long now) {

    Job job = context.getJob(taskID.getJobId());

    Task task = job.getTask(taskID);

    Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts();

    long acceptableRuntime = Long.MIN_VALUE;

    long result = Long.MIN_VALUE;

    if (!mayHaveSpeculated.contains(taskID)) {

      acceptableRuntime = estimator.thresholdRuntime(taskID);

      if (acceptableRuntime == Long.MAX_VALUE) {

        return ON_SCHEDULE; // 任务还未启动,不需要进行推测

      }

    }

    TaskAttemptId runningTaskAttemptID = null;

    int numberRunningAttempts = 0;

    for (TaskAttempt taskAttempt : attempts.values()) {

      if (taskAttempt.getState() == TaskAttemptState.RUNNING

          || taskAttempt.getState() == TaskAttemptState.STARTING) {

        if (++numberRunningAttempts > 1) {

          return ALREADY_SPECULATING; // 已经启动了推测任务,不需要进行推测

        }

        runningTaskAttemptID = taskAttempt.getID();

        long estimatedRunTime = estimator.estimatedRuntime(runningTaskAttemptID);

        long taskAttemptStartTime

            = estimator.attemptEnrolledTime(runningTaskAttemptID);

        if (taskAttemptStartTime > now) {

          // This background process ran before we could process the task

          //  attempt status change that chronicles the attempt start

          return TOO_NEW; // 任务刚刚启动,不需要进行推测

        }

        long estimatedEndTime = estimatedRunTime + taskAttemptStartTime;

        long estimatedReplacementEndTime

            = now + estimator.estimatedNewAttemptRuntime(taskID);

        float progress = taskAttempt.getProgress();

        TaskAttemptHistoryStatistics data =

            runningTaskAttemptStatistics.get(runningTaskAttemptID);

        if (data == null) {

          runningTaskAttemptStatistics.put(runningTaskAttemptID,

            new TaskAttemptHistoryStatistics(estimatedRunTime, progress, now));

        } else {

          if (estimatedRunTime == data.getEstimatedRunTime()

              && progress == data.getProgress()) {

            // Previous stats are same as same stats

            if (data.notHeartbeatedInAWhile(now)) {

              // Stats have stagnated for a while, simulate heart-beat.

              TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus();

              taskAttemptStatus.id = runningTaskAttemptID;

              taskAttemptStatus.progress = progress;

              taskAttemptStatus.taskState = taskAttempt.getState();

              // Now simulate the heart-beat

              handleAttempt(taskAttemptStatus);

            }

          } else {

            // Stats have changed - update our data structure

            data.setEstimatedRunTime(estimatedRunTime);

            data.setProgress(progress);

            data.resetHeartBeatTime(now);

          }

        }

        if (estimatedEndTime < now) {

          return PROGRESS_IS_GOOD; // 任务预期已经完成,不需要进行推测

        }

        if (estimatedReplacementEndTime >= estimatedEndTime) {

          return TOO_LATE_TO_SPECULATE; // 启动推测任务收效很低,不需要进行推测

        }

        result = estimatedEndTime - estimatedReplacementEndTime; 

      }

    }

    // If we are here, there's at most one task attempt.

    if (numberRunningAttempts == 0) {

      return NOT_RUNNING;

    }

    if (acceptableRuntime == Long.MIN_VALUE) {

      acceptableRuntime = estimator.thresholdRuntime(taskID);

      if (acceptableRuntime == Long.MAX_VALUE) {

        return ON_SCHEDULE;

      }

    }

    return result;

  }

任务耗时预估 TaskRuntimeEstimator

上面代码中使用到了一个

org.apache.hadoop.mapreduce.v2.app.speculate.TaskRuntimeEstimator的实例进行 estimatedRunTime 和 estimatedRunTime′ 的预估,分别对应函数:

estimatedRunTime  -> TaskRuntimeEstimator.estimatedRuntime(TaskAttemptId id)

estimatedRunTime′ -> TaskRuntimeEstimator.estimatedNewAttemptRuntime(TaskId id)

此外还有一些其他的API:

TaskRuntimeEstimator.contextualize(Configuration conf, AppContext context)

定义了estimator如何进行初始化,可以在这里进行一些配置加载的操作,可能还需要初始化保存Attempt执行信息的一些数据结构;

TaskRuntimeEstimator.updateAttempt(TaskAttemptStatus reportedStatus, long timestamp)

用来更新指定attempt的处理状态,reportedStatus中比较关键的两个元素(attemptId, progress),timestamp与progress相对应。

目前Yarn已经提供了一些实现:

LegacyTaskRuntimeEstimator 和 ExponentiallySmoothedTaskRuntimeEstimator,继承了同一个父类,共用相同的:

TaskRuntimeEstimator.estimatedNewAttemptRuntime(TaskId id)

其他部分不相同。

LegacyTaskRuntimeEstimator 对于旧任务的耗时预估简单粗暴 : 

estimatedRunTime = (currentTimestamp − taskStartTime) / progress

ExponentiallySmoothedTaskRuntimeEstimator 使用了指数平滑对耗时进行平滑,具体可以参考代码实现。

这两种方式对于我们在进行大规模数据(150T)处理时遇到上面的两个的问题都无能为力,所以针对这种情况可以自己设计Speculator算法或Estimator来适配自己的处理任务

02

定义自己的推测机制

mapreduce允许我们使用自己定义的Speculator实现 或  TaskRuntimeEstimator实现来控制任务的推测执行流程。

下面针对我们遇到的问题为例,定义一些新的TaskRuntimeEstimator实现(不一定适用于全部情况,最好针对自己遇到的问题设计一个适配自己任务的子类)

要处理的任务描述:

  1. 要处理的数据量很大,每天要处理150TB的数据;
  2. 任务优先级很高,允许冗余计算来确保任务每天能够按时完成(可以说是不计成本),有一个独立的应用组来处理这些数据,很少有任务和这个任务进行资源竞争;

目标:

  1. 不管当前任务执行到什么阶段,如果超过指定的时间任务还没有完成,那么就启动一个推测任务(即跳过TOO_LATE_TO_SPECULATE)对于任务的限制;
  2. 任务的处理速率使用指数衰减的方式进行控制,确保当前时间对于任务整体速度的影响越来越大;

下面是我这里定义的一种实现,其思路是:

  1. 添加一种配置表示Speculator对于task多久还没有启动推测执行任务的话会被强制启动推测任务;
  2. 添加一个配置ratio,表示旧速率在新的任务执行速率上的占比,对任务执行速率进行指数衰减:

smoothedNewSpeed = ratio * oldSpeed + (1 - ratio) * newSpeed

estimatedRunTime = (1 - progress) / smoothedNewSpeed

核心代码实现:

public class ThresholdExponentialSmoothedTaskRuntimeEstimator extends StartEndTimesBase {

  // 使用一个非常大的数字表示超过阈值之后所需运行的时间;

  public static final long MAX_RUNTIME_VALUE = 0x1fffffffffffffffL;

  /** 当任务执行多久之后,无论如何都会执行推断任务,避免由于机器问题导致的进度缓慢 */

  private long threshold;

  /** 旧speed的衰减参数:上一个时刻的平滑速率在最终平滑速率的占比 ratio ∈ (0, 1) */

  private double ratio;

  ...

  private class EstimateVector {

    final float basedOnProgress;

    final long atTime;

    final double speed;

    ...

    EstimateVector incorporate(float newProgress, long newAtTime) {

      if (newAtTime <= atTime || newProgress < basedOnProgress) {

        return this;

      }

      double newSpeed = (newProgress - basedOnProgress) / (newAtTime - atTime);

      return new EstimateVector(newProgress, newAtTime, (1 - ratio) * newSpeed + ratio * speed);

    }

  }

  ...

  @Override

  public void contextualize(Configuration conf, AppContext context) {

    super.contextualize(conf, context);

    threshold = conf.getLong(MRSpeculateConfig.MR_AM_TASK_ESTIMATOR_MAX_PROCESS_TIME_MS,

            MRSpeculateConfig.DEFAULT_MR_AM_TASK_ESTIMATOR_MAX_PROCESS_TIME_MS);

    ratio = conf.getDouble(MRSpeculateConfig.MR_AM_TASK_ESTIMATOR_OLD_SPEED_RATIO,

            MRSpeculateConfig.DEFAULT_MR_AM_TASK_ESTIMATOR_OLD_SPEED_RATIO);

  }

  ...

  @Override

  public long estimatedRuntime(TaskAttemptId attemptID) {

    Long startTime = startTimes.get(attemptID);

    if (startTime == null) {

      return -1L;

    }

    EstimateVector vector = getEstimateVector(attemptID);

    if (vector == null) {

      return -1L;

    }

    long sunkTime = vector.atTime - startTime;

    double speed = vector.speed;

    float progress = vector.basedOnProgress;

    if (sunkTime > threshold) {

      // 处理的进度越小优先级越高

      return MAX_RUNTIME_VALUE - (long) (progress * 100000);

    }

    if (speed == 0) {

      return -1L;

    }

    double remainingTime = (1.0 - progress) / (speed + 1E-13);

    return sunkTime + (long) remainingTime;

  }

  ...

}

此时,只需要在mapreduce项目中引入自定义的TaskRuntimeEstimator,并在提交任务是配置如下配置即可生效:

yarn.app.mapreduce.am.job.task.estimator.class=com.gdt.log_process.dmp.mapreduce.v2.app.speculate.ThresholdExponentialSmoothedTaskRuntimeEstimator
yarn.app.mapreduce.am.job.task.estimator.dmp.threshold.threshold-ms=5400000 (ThresholdExponentialSmoothedTaskRuntimeEstimator中使用的自定义的配置)

03

 其他针对推测的优化建议

1. 在job的单个task需要处理的数据量较大,耗时比较长时,且资源充足的情况下,可以适当调大最大运行进行推测的任务数,相关配置:

  • mapreduce.job.speculative.speculative-cap-running-tasks :推测任务占当前正在运行的任务数的比例,默认为 0.1
  • mapreduce.job.speculative.speculative-cap-total-tasks :推测任务占全部要处理任务数的比例,默认为 0.01
  • mapreduce.job.speculative.minimum-allowed-tasks :最少允许同时运行的推测任务数量,默认为10 

2. 按照默认的推测启动的频次,前一次推测成功的话执行下一次推测要等待15秒,前一次推测失败的话下一次推测需要等待1秒。那么最好的情况下一小时推测任务最多只能启动 3600 / 15 = 240个。所以如果上面的配置调大的话,这里推测成功的等待耗时尽量调整小一些,防止这里成为瓶颈,相关配置:

  • mapreduce.job.speculative.retry-after-no-speculate :本次推测没有任务下发,执行下一次推测任务的等待时间,默认 1000(ms)
  • mapreduce.job.speculative.retry-after-speculate:本次推测有任务下发,执行下一次推测任务的等待时间,默认 15000(ms)

近期热文

浅谈SWOT分析法

《QQ炫舞》十二年成熟游戏的求变思新之路

基于Scrapy的爬虫解决方案

让我知道你在看

本站文章资源均来源自网络,除非特别声明,否则均不代表站方观点,并仅供查阅,不作为任何参考依据!
如有侵权请及时跟我们联系,本站将及时删除!
如遇版权问题,请查看 本站版权声明
THE END
分享
二维码
海报
MapReduce推测机制及自定义推测机制
作者:wyattliu  腾讯TEG工程师 |导语 集群中的机器机型、负载并不一定完全相同,所以即使一个MapReduce任务每一个Map/Reduce ta...
<<上一篇
下一篇>>