apollo客户端通知原理

微信公众号:PersistentCoder

一、使用场景

Apollo是携程开源的一个分布式配置中心,提供了丰富的能力,其中就包括配置发布动态通知。

动态通知有很多应用场景,其目的就是将配置的更新实时同步到应用内存粒度,比如:

  • 动态规则维护
  • 黑白名单
  • 半自动化刷新缓存

二、使用

本篇文章主要围绕半自动化刷新缓存展开。在电商环境,分为商家B端和客户C端,商家在平台或者ERP更新或者发布一些配置变更需要同步到C端让用户感知到最新的内容。

首先考虑到B端的配置变更频率不会太频繁,所以C端会做缓存,那么如果B端发生变更如何通知到C端刷新缓存拉取最新配置内容,有两种实现方式:

  • B端配置变更后发布消息,C端监听变更消息,然后自动失效缓存
  • B端配置变更后,手动通知C端,然后失效缓存,也就是半自动化刷新

自动失效缓存不展开分析,半自动化刷新实现也很简单,基于Apollo的客户端通知机制就可以实现,在配置中心发布变更主体,然后在应用层监听变更内容并做出响应操作即可。

1.Apollo新增配置

在配置平台新增业务相关的key-value:

apollo.xxx.config_refresh = {"buzzId":"xxx","platform":1,"version":3}

包含业务主体信息,以及版本字段(用于处理配置无变更问题)。

2.编写事件监听

使用ApolloConfigChangeListener注解标注处理对应key内容变更的方法。

@ApolloConfigChangeListener(interestedKeys = {APOLLO_XXX_CONFIG_REFRESH})
public void onChange(ConfigChangeEvent changeEvent) {
    Set<String> changedKeys = changeEvent.changedKeys();
    if(CollectionUtils.isEmpty(changedKeys)) {
        log.warn("onChange nothing change;changeKeys={}",changedKeys);
        return;
    }
    if(!changedKeys.contains(APOLLO_XXX_CONFIG_REFRESH)) {
        log.warn(".onChange change event not contains config;changeKeys={}",changedKeys);
        return;
    }
    log.info("onChange config change;start reinitialize ...........");
    ConfigChange change = changeEvent.getChange(APOLLO_XXX_CONFIG_REFRESH);
    String oldVal = change.getOldValue();
    String newVal = change.getNewValue();
    log.info("onChange;change '{}' from oldVal:{} to newValue:{}",APOLLO_XXX_CONFIG_REFRESH,oldVal,newVal);

    if(!this.isJson(newVal)) {
        log.info("onChange not valid json;newVal={}",newVal);
        return;
    }
    JSONObject json = JSON.parseObject(newVal);
    String buzzId = null;
    if(null == (buzzId = json.getString(BUZZ_ID_KEY))) {
        return;
    }
    Integer platform = json.getInteger(PLATFORM_KEY);

    Integer version = json.getInteger(VERSION_KEY);

    //手动让缓存失效
    try {
        this.doExpireCache(buzzId,platform,version)
    } catch (Exception e) {
        log.error("onChange refresh config cache occur error;buzzId={},platform={},version={}",buzzId,platform,version,e);
    }
}

这样在发生B端配置变更的时候,在配置平台发布对应key-value,然后C端应用接收到变更内容,就会做出相应处理,将缓存清掉。

三、原理&源码分析

从前边的案例可以看出,核心能力支撑就是Apollo的客户端通知,那么我们就来分析一下Apollo客户端通知能力的实现原理。

Apollo客户端通知的实现,分为三个维度分析,分别是配置变更监听器准备、变更通知准备、变更通知执行。

1.配置变更监听器准备

在不接入其他中间件封装的情况下,使用的入口是EnableApolloConfig注解,我们从该注解着手分析。

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import(ApolloConfigRegistrar.class)
public @interface EnableApolloConfig {

  String[] value() default {ConfigConsts.NAMESPACE_APPLICATION};

  int order() default Ordered.LOWEST_PRECEDENCE;
}

该注解导入并激活ApolloConfigRegistrar类。

public class ApolloConfigRegistrar implements ImportBeanDefinitionRegistrar {

  private ApolloConfigRegistrarHelper helper = ServiceBootstrap.loadPrimary(ApolloConfigRegistrarHelper.class);

  @Override
  public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
    helper.registerBeanDefinitions(importingClassMetadata, registry);
  }
}

ApolloConfigRegistrar是一个ImportBeanDefinitionRegistrar,其原理和调用时机可参考《ImportBeanDefinitionRegistrar原理》,通过java spi机制加载ApolloConfigRegistrarHelper实现类DefaultApolloConfigRegistrarHelper的实例。

com.ctrip.framework.apollo.spring.spi.DefaultApolloConfigRegistrarHelper

然后调用registerBeanDefinitions方法注册BeanDefinition:

public class DefaultApolloConfigRegistrarHelper implements ApolloConfigRegistrarHelper {
  @Override
  public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
    AnnotationAttributes attributes = AnnotationAttributes
        .fromMap(importingClassMetadata.getAnnotationAttributes(EnableApolloConfig.class.getName()));
   //省略
    BeanRegistrationUtil.registerBeanDefinitionIfNotExists(registry, ApolloAnnotationProcessor.class.getName(),
        ApolloAnnotationProcessor.class);
    //省略
  }
  @Override
  public int getOrder() {
    return Ordered.LOWEST_PRECEDENCE;
  }
}

其中有一行注册ApolloAnnotationProcessor类定义,我们看一下ApolloAnnotationProcessor是何方神圣。

它是一个BeanPostProcessor,父类ApolloProcessor重写了postProcessBeforeInitialization方法:

@Override
public Object postProcessBeforeInitialization(Object bean, String beanName)
  throws BeansException {
Class clazz = bean.getClass();
for (Field field : findAllField(clazz)) {
  processField(bean, beanName, field);
}
for (Method method : findAllMethod(clazz)) {
  processMethod(bean, beanName, method);
}
return bean;
}

该方法在Bean实例化之后初始化之前执行,扫描目标类的所有属性和方法然后执行逻辑,我们重点看processMethod方法,看一下ApolloAnnotationProcessor实现:

@Override
protected void processMethod(final Object bean, String beanName, final Method method) {
    ApolloConfigChangeListener annotation = AnnotationUtils
        .findAnnotation(method, ApolloConfigChangeListener.class);
    if (annotation == null) {
      return;
    }
    Class<?>[] parameterTypes = method.getParameterTypes();

    ReflectionUtils.makeAccessible(method);
    String[] namespaces = annotation.value();
    String[] annotatedInterestedKeys = annotation.interestedKeys();
    String[] annotatedInterestedKeyPrefixes = annotation.interestedKeyPrefixes();
    ConfigChangeListener configChangeListener = new ConfigChangeListener() {
      @Override
      public void onChange(ConfigChangeEvent changeEvent) {
        ReflectionUtils.invokeMethod(method, bean, changeEvent);
      }
    };

    Set<String> interestedKeys = annotatedInterestedKeys.length > 0 ? Sets.newHashSet(annotatedInterestedKeys) : null;
    Set<String> interestedKeyPrefixes = annotatedInterestedKeyPrefixes.length > 0 ? Sets.newHashSet(annotatedInterestedKeyPrefixes) : null;
    for (String namespace : namespaces) {
      Config config = ConfigService.getConfig(namespace);
      if (interestedKeys == null && interestedKeyPrefixes == null) {
        config.addChangeListener(configChangeListener);
      } else {
        config.addChangeListener(configChangeListener, interestedKeys, interestedKeyPrefixes);
      }
    }
}

将ApolloConfigChangeListener标注的方法包装成ConfigChangeListener然后注册到对应namespace的Config中。

注册流程如下:

2.变更通知准备

前边分析了将客户端的通知变更逻辑封装成了监听器注册备用,那么谁去触发监听器的逻辑呢?

接下来我们分析下如何将变更和通知逻辑关联起来。

apollo-client包中spring.factories定义了ApolloApplicationContextInitializer类型ApplicationContextInitializer,而ApplicationContextInitializer会在应用启动时加载:

public SpringApplication(ResourceLoader resourceLoader, Class<?>... primarySources) {
  //...
  setInitializers((Collection) getSpringFactoriesInstances(ApplicationContextInitializer.class));
  //***
}

并且会在容器创建之后刷新之前执行ApplicationContextInitializer的initialize方法。

private void prepareContext(DefaultBootstrapContext bootstrapContext, ConfigurableApplicationContext context,
    ConfigurableEnvironment environment, SpringApplicationRunListeners listeners,
    ApplicationArguments applicationArguments, Banner printedBanner) {
  //***
  applyInitializers(context);
  //***
}

protected void applyInitializers(ConfigurableApplicationContext context) {
  for (ApplicationContextInitializer initializer : getInitializers()) {
    Class<?> requiredType = GenericTypeResolver.resolveTypeArgument(initializer.getClass(),
        ApplicationContextInitializer.class);
    Assert.isInstanceOf(requiredType, context, "Unable to call initializer.");
    initializer.initialize(context);
  }
}

所以在应用启动的时候,ApolloApplicationContextInitializer的initialize会被调用到。

@Override
public void initialize(ConfigurableApplicationContext context) {
ConfigurableEnvironment environment = context.getEnvironment();

if (!environment.getProperty(PropertySourcesConstants.APOLLO_BOOTSTRAP_ENABLED, Boolean.class, false)) {
  logger.debug("Apollo bootstrap config is not enabled for context {}, see property: ${{}}", context, PropertySourcesConstants.APOLLO_BOOTSTRAP_ENABLED);
  return;
}
logger.debug("Apollo bootstrap config is enabled for context {}", context);

initialize(environment);
}

调用内部initialize方法进行初始化操作:

  protected void initialize(ConfigurableEnvironment environment) {
    if (environment.getPropertySources().contains(PropertySourcesConstants.APOLLO_BOOTSTRAP_PROPERTY_SOURCE_NAME)) {
      return;
    }
    String namespaces = environment.getProperty(PropertySourcesConstants.APOLLO_BOOTSTRAP_NAMESPACES, ConfigConsts.NAMESPACE_APPLICATION);
    logger.debug("Apollo bootstrap namespaces: {}", namespaces);
    List<String> namespaceList = NAMESPACE_SPLITTER.splitToList(namespaces);

    CompositePropertySource composite = new CompositePropertySource(PropertySourcesConstants.APOLLO_BOOTSTRAP_PROPERTY_SOURCE_NAME);
    for (String namespace : namespaceList) {
      Config config = ConfigService.getConfig(namespace);

      composite.addPropertySource(configPropertySourceFactory.getConfigPropertySource(namespace, config));
    }
    environment.getPropertySources().addFirst(composite);
  }

调用ConfigService#getConfig获取每个namespace的配置信息,具体会委托给DefaultConfigManager的实现:

public Config getConfig(String namespace) {
Config config = m_configs.get(namespace);

if (config == null) {
  synchronized (this) {
    config = m_configs.get(namespace);

    if (config == null) {
      ConfigFactory factory = m_factoryManager.getFactory(namespace);

      config = factory.create(namespace);
      m_configs.put(namespace, config);
    }
  }
}

return config;
}

由于系统刚启动,Config还没被缓存,所以会通过调用ConfigFactory的create方法创建Config.

public Config create(String namespace) {
  ConfigFileFormat format = determineFileFormat(namespace);
  if (ConfigFileFormat.isPropertiesCompatible(format)) {
    return new DefaultConfig(namespace, createPropertiesCompatibleFileConfigRepository(namespace, format));
  }
  return new DefaultConfig(namespace, createLocalConfigRepository(namespace));
}

然后会调用到RemoteConfigRepository的构造方法:

public RemoteConfigRepository(String namespace) {
  //省略
  this.trySync();
  this.schedulePeriodicRefresh();
  this.scheduleLongPollingRefresh();
}

里边调用了三个方法,首次同步、定时刷新和长轮询刷新。

先看一下首次同步:

@Override
protected synchronized void sync() {
  Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig");
  try {
    ApolloConfig previous = m_configCache.get();
    ApolloConfig current = loadApolloConfig();

    //reference equals means HTTP 304
    if (previous != current) {
      logger.debug("Remote Config refreshed!");
      m_configCache.set(current);
      this.fireRepositoryChange(m_namespace, this.getConfig());
    }
    if (current != null) {
      Tracer.logEvent(String.format("Apollo.Client.Configs.%s", current.getNamespaceName()),
          current.getReleaseKey());
    }
    transaction.setStatus(Transaction.SUCCESS);
  } catch (Throwable ex) {
    transaction.setStatus(ex);
    throw ex;
  } finally {
    transaction.complete();
  }
}

将本地缓存和远程加载的数据进行对比,如果不一致,用远程覆盖本地,然后触发变更事件fireRepositoryChange:

protected void fireRepositoryChange(String namespace, Properties newProperties) {
  for (RepositoryChangeListener listener : m_listeners) {
    try {
      listener.onRepositoryChange(namespace, newProperties);
    } catch (Throwable ex) {
      Tracer.logError(ex);
      logger.error("Failed to invoke repository change listener {}", listener.getClass(), ex);
    }
  }
}

然后会调用触发配置变更,调用ConfigChangeListener的逻辑:

protected void fireConfigChange(final ConfigChangeEvent changeEvent) {
  for (final ConfigChangeListener listener : m_listeners) {
    // check whether the listener is interested in this change event
    if (!isConfigChangeListenerInterested(listener, changeEvent)) {
      continue;
    }
    m_executorService.submit(new Runnable() {
      @Override
      public void run() {
        String listenerName = listener.getClass().getName();
        Transaction transaction = Tracer.newTransaction("Apollo.ConfigChangeListener", listenerName);
        try {
          listener.onChange(changeEvent);
          transaction.setStatus(Transaction.SUCCESS);
        } catch (Throwable ex) {
          transaction.setStatus(ex);
          Tracer.logError(ex);
          logger.error("Failed to invoke config change listener {}", listenerName, ex);
        } finally {
          transaction.complete();
        }
      }
    });
  }
}

对于定时刷新和长轮询刷新这两个功能在 apollo 的 github 文档中有介绍:

1.客户端和服务端保持了一个长连接,从而能第一时间获得配置更新的推送
2.客户端还会定时从Apollo配置中心拉取应用的最新配置
这是一个fallback机制,为了防止推送机制失效导致配置不更新
客户端定时拉取会上报本地版本,通常对于定时拉取操作,服务端都会返回304
定时频率默认每5分钟拉一次,客户端也可以通过在运行时指定来覆盖,单位分钟。3.客户端从Apollo配置中心获取应用的最新配置后,会保存在内存中
4.客户端会把从服务端获取到的配置在本地缓存一份
遇到服务不可用,或网络不通时,依然能从本地恢复配置
5应用程序可以从Apollo客户端获取最新的配置、订阅配置更新通知

长连接是更新配置的主要手段,定时刷新是辅助手段,避免长轮训失败造成数据更新丢失。

看一下定时刷新实现:

private void schedulePeriodicRefresh() {
  logger.debug("Schedule periodic refresh with interval: {} {}",
      m_configUtil.getRefreshInterval(), m_configUtil.getRefreshIntervalTimeUnit());
  m_executorService.scheduleAtFixedRate(
      new Runnable() {
        @Override
        public void run() {
          Tracer.logEvent("Apollo.ConfigService", String.format("periodicRefresh: %s", m_namespace));
          logger.debug("refresh config for namespace: {}", m_namespace);
          trySync();
          Tracer.logEvent("Apollo.Client.Version", Apollo.VERSION);
        }
      }, m_configUtil.getRefreshInterval(), m_configUtil.getRefreshInterval(),
      m_configUtil.getRefreshIntervalTimeUnit());
}

定时调用trySync方法实现数据同步,然后触发ConfigChangeListener逻辑。

然后看一下长轮询实现:

  private void scheduleLongPollingRefresh() {
    remoteConfigLongPollService.submit(m_namespace, this);
  }

调用startLongPolling方法开启长轮询:

private void startLongPolling() {
  try {
    final String appId = m_configUtil.getAppId();
    final String cluster = m_configUtil.getCluster();
    final String dataCenter = m_configUtil.getDataCenter();
    final String secret = m_configUtil.getAccessKeySecret();
    final long longPollingInitialDelayInMills = m_configUtil.getLongPollingInitialDelayInMills();
    m_longPollingService.submit(new Runnable() {
      @Override
      public void run() {
        if (longPollingInitialDelayInMills > 0) {
          try {

            TimeUnit.MILLISECONDS.sleep(longPollingInitialDelayInMills);
          } catch (InterruptedException e) {
          }
        }
        doLongPollingRefresh(appId, cluster, dataCenter, secret);
      }
    });
  } catch (Throwable ex) {
    m_longPollStarted.set(false);
    ApolloConfigException exception =
        new ApolloConfigException("Schedule long polling refresh failed", ex);

  }
}

调用doLongPollingRefresh方法执行长轮询刷新逻辑:

private void doLongPollingRefresh(String appId, String cluster, String dataCenter, String secret) {
  final Random random = new Random();
  ServiceDTO lastServiceDto = null;
  while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) {
    if (!m_longPollRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
      try {
        TimeUnit.SECONDS.sleep(5);
      } catch (InterruptedException e) {
      }
    }
    Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "pollNotification");
    String url = null;
    try {
      if (lastServiceDto == null) {
        List<ServiceDTO> configServices = getConfigServices();
        lastServiceDto = configServices.get(random.nextInt(configServices.size()));
      }
      url =
          assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster, dataCenter,
              m_notifications);

      HttpRequest request = new HttpRequest(url);
      request.setReadTimeout(LONG_POLLING_READ_TIMEOUT);
      if (!StringUtils.isBlank(secret)) {
        Map<String, String> headers = Signature.buildHttpHeaders(url, appId, secret);
        request.setHeaders(headers);
      }
      transaction.addData("Url", url);
      final HttpResponse<List<ApolloConfigNotification>> response =
          m_httpUtil.doGet(request, m_responseType);
      if (response.getStatusCode() == 200 && response.getBody() != null) {
        updateNotifications(response.getBody());
        updateRemoteNotifications(response.getBody());
        transaction.addData("Result", response.getBody().toString());
        notify(lastServiceDto, response.getBody());
      }
      if (response.getStatusCode() == 304 && random.nextBoolean()) {
        lastServiceDto = null;
      }
      m_longPollFailSchedulePolicyInSecond.success();
      transaction.addData("StatusCode", response.getStatusCode());
      transaction.setStatus(Transaction.SUCCESS);
    } catch (Throwable ex) {
      try {
        TimeUnit.SECONDS.sleep(sleepTimeInSecond);
      } catch (InterruptedException ie) {

      }
    } finally {
      transaction.complete();
    }
  }
}

每5s主动从Apollo Server拉取数据,如果请求成功,通知RemoteConfigRepository:

public void onLongPollNotified(ServiceDTO longPollNotifiedServiceDto, ApolloNotificationMessages remoteMessages) {
  m_longPollServiceDto.set(longPollNotifiedServiceDto);
  m_remoteMessages.set(remoteMessages);
  m_executorService.submit(new Runnable() {
    @Override
    public void run() {
      m_configNeedForceRefresh.set(true);
      trySync();
    }
  });
}

和定时刷新一样,也调用到了trySync逻辑,最后触发注册到对应namespace的Config上的ConfigChangeListener逻辑。

3.变更通知执行

用户更新配置时,客户端如何监听到变更事件并做出响应处理呢?

基于前一小结,如果用户发布了属性变更,RemoteConfigRepository的定时刷新或长轮询逻辑会从Apollo Server拉取最新数据到本地,然后和本地缓存(上一个版本数据)做对比,如果发现不一样则触发配置变更,调用ConfigChangeListener逻辑。

四、相关实现

1.redis事件通知

比如我们要监听redis中的key失效事件,本地做一些定制化逻辑,那么就需要开启redis事件通知能力,然后本地做实现KeyExpirationEventMessageListener接口:

@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
    public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }
    /**
     * 针对redis数据失效事件,进行逻辑处理
     * @param message
     * @param pattern
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        String expiredKey = message.toString();
        // 失效逻辑
        }
    }
}

redis中的key失效时会触发KeyExpirationEventMessageListener的onMessage,这样就实现了redis客户端的事件通知。

2.zookeeper watcher机制

在使用zk做做注册中心或者分布式锁场景,我们需要监听zk的节点变更事件,比如节点被删除,那么客户端就需要监听该事件,然后本地做一些逻辑处理。

public class WatcherDemo implements Watcher{
    public void process(WatchedEvent event) {
      //do something
    }
}

节点变更事件类型有NodeCreated,NodeDeleted,NodeDataChanged,NodeChildrenChanged和None,对于注册中心场景,服务消费者监听到服务节点被删除,那么可以在本地剔除远程服务节点。

五、为什么使用长轮询

关于为什么使用 HTTP 长轮询,估计接触 Apollo 的人看到客户端通知实现方式时都会疑惑,为什么使用这种方式,而不是其他方式?

在网上找到了Apollo作者对该问题的解答

  1. 为什么不使用消息系统?太复杂,杀鸡用牛刀。
  2. 为什么不用 TCP 长连接?对网络环境要求高,容易推送失败。且有双写问题。
  3. 为什么使用 HTTP 长轮询?性能足够,结合 Servlet3 的异步特性,能够维持万级连接(一个客户端只有一个长连接)。直接使用 Servlet 的 HTTP 协议,比单独用 TCP 连接方便。HTTP 请求/响应模式,保证了不会出现双写的情况。最主要还是简单,性能暂时不是瓶颈。

版权声明:
作者:叔牙
链接:https://jkboy.com/archives/17912.html
来源:随风的博客
文章版权归作者所有,未经允许请勿转载。

THE END
分享
二维码
打赏
海报
apollo客户端通知原理
Apollo是携程开源的一个分布式配置中心,提供了丰富的能力,其中就包括配置发布动态通知。
<<上一篇
下一篇>>