Memcached和Redis的Fail Fast实现暨Fault Tolerance设计

Fault Tolerance

大型互联网应用系统通常会有成百上千个子系统,例如前端(PHP)会依赖业务平台(Java),业务平台会依赖资源平台(C++)。而且往往是一个简单前端功能越往下依赖,包括服务,组件,模块,会越来越多。这种复杂的树状依赖关系,往往会把底层的资源的问题迅速扩散到这个系统,导致雪崩效应的发生。

这就要求在设计每个子系统必须Fault Tolerance(尤其是边界处,即对外依赖),实现Fault Tolerance必须要满足如下条件,摘自http://en.wikipedia.org/wiki/Fault_tolerance

  1. 没有单点
  2. 异常隔离,即能够感知并恰当处理来自下层的异常
  3. 异常控制,即把异常的影响范围控制在局部系统,避免影响全局
  4. 有效的恢复模式

“有效的恢复模式”具体可以参考如下措施:

  1. 系统的物理隔离性及可替代性(如多机房)
  2. 流量可迁移(如切DNS)
  3. 业务降级(即系统各部分都具备自裁的能力,^_^)

上面提到的主要是系统宏观层面架构设计的一些准则,而落实到代码实现层面,需要重点关注的是系统的边界,即各种客户端的“异常隔离”能力,下面重点分析一些memcache和redis客户端的异常隔离实现机制。

Fail Fast

对于memcached和redis客户端异常隔离实现有好多种叫法,有的叫Fail Fast,有的叫Fail Over。首先需要做一个名词解释,释义来自wiki:

  • Fail Fast - 指系统响应异常的速度
  • Fail Over - 指系统从异常中恢复

当然还有很多其他词,Fail Stop,Fail Safe等等就不一一展开了,科普到此结束。

Memcached Client的Fail Fast实现:

主要是在SockIOPool中实现的(即在连接池中实现),主要分成两块,一个是创建连接的fail fast和请求异常的fail fast。 对于创建连接的fail fast实现原理比较简单,即每次创建失败后设置一个间隔时间,此间隔时间段内不接受新建连接的请求,同时间隔时间每次递增。

实现上所采用的数据结构也比较简单,通过两个Map来保存异常状态,可以考虑一下为什么要用两个Map:

 // dead server map
 private ConcurrentMap<String, Date> hostDead;
 private ConcurrentMap<String, Long> hostDeadDur;

 protected SockIO createSocket(String host) {
      // if host is dead, then we don't need to try again
      // until the dead status has expired
      // we do not try to put back in if failback is off
      hostDeadLock.lock();
      try {
           if(failover && failback && shouldStopCreateSocket(host)){
                return null;
           }
      } finally {
           hostDeadLock.unlock();
      }

      …… // 具体创建连接

      // if we failed to get socket, then mark
      // host dead for a duration which falls off
      hostDeadLock.lock();
      try {
           if (socket == null) {
                if(doubleCheckWhenCreateSocket && shouldStopCreateSocket(host)){
                     return null;
                }
                Date now = new Date();
                hostDead.put(host, now);

                long expire = (hostDeadDur.containsKey(host)) ? (((Long) hostDeadDur
                          .get(host)).longValue() * 2)
                          : 1000;

                if (expire > MAX_RETRY_DELAY)
                     expire = MAX_RETRY_DELAY;

                hostDeadDur.put(host, new Long(expire));
                if (log.isDebugEnabled())
                     log.debug("++++ ignoring dead host: " + host + " for "
                               + expire + " ms");
                
                // also clear all entries for this host from availPool
                clearHostFromPool(host);
           } else {
                if (log.isDebugEnabled())
                     log.debug("++++ created socket (" + socket.toString()
                               + ") for host: " + host);
                if (hostDead.containsKey(host) || hostDeadDur.containsKey(host)) {
                     hostDead.remove(host);
                     hostDeadDur.remove(host);
                }
           }
      } finally {
           hostDeadLock.unlock();
      }

连接存续期间的所有请求类异常(IOException)也都会使连接进入到Fail Fast状态,实现原理上是通过回收线程来维护异常连接池健康检测。落实到代码上,数据结构采用了三个Map来保存连接的状态:

 // map to hold all available sockets
 // map to hold socket status;
 private ConcurrentMap<String, ConcurrentMap<SockIO, Long>> socketPool;       // 所有正常的连接池(包括空闲的和工作中的)
 private ConcurrentMap<String, Queue <SockIO>> usedPool;                                 // 当前空闲的连接池
 private ConcurrentMap<String, ConcurrentMap<SockIO, Integer>> statusPool;     // 不正常的连接池,即将被MaintThread回收掉的

先来看一下回收线程的实现:

 protected void selfMaint() {
      // find out how many to create
      for (Iterator<String> i = socketPool.keySet().iterator(); i.hasNext();) {
           String host = i.next();
           Map<SockIO, Long> sockets = socketPool.get(host);

           int usedcount =0;
           // 当前域名的所有连接数 - 空闲连接数 - 异常连接数
           usedcount=sockets.size()-usedPool.get(host).size()-statusPool.get(host).size();


           if (log.isDebugEnabled())
                log.debug("++++ Size of avail pool for host (" + host + ") = "
                          + sockets.size());

           // 本意应该是空闲连接数小于最小连接数的时候需要创建新连接
           if (sockets != null && sockets.size() - usedcount < minConn) {
                // need to create new sockets
                int need = minConn - sockets.size() + usedcount;
                needSockets.put(host, need);
           }
      }

      // now create
      for (String host : needSockets.keySet()) {
           Integer need = needSockets.get(host);

           // 创建连接......
      }

      // 对创建了过多连接的域名,开始关闭连接
      for (Iterator<String> i = socketPool.keySet().iterator(); i.hasNext();) {
           String host = i.next();
           Map<SockIO, Long> sockets = socketPool.get(host);

           if (log.isDebugEnabled())
                log.debug("++++ Size of avail pool for host (" + host + ") = "
                          + sockets.size());

           int usedcount = 0;

           usedcount=sockets.size()-usedPool.get(host).size()-statusPool.get(host).size();
           if (sockets != null && (sockets.size() - usedcount > maxConn)) {
                // 关闭连接......
           }
      }

      // finally clean out the deadPool
      for (Iterator<String> i = statusPool.keySet().iterator(); i.hasNext();) {
           String host = i.next();
           Map<SockIO, Integer> sockets = statusPool.get(host);

           // loop through all connections and check to see if we have any hung
           // connections

           for (Iterator<SockIO> j = sockets.keySet().iterator(); j.hasNext();) {
                // remove stale entries
                SockIO socket = j.next();

                try {
                     Integer status = null;
                     if (sockets != null && socket != null)
                          status = sockets.get(socket);

                     if (status != null
                               && status.intValue() == SOCKET_STATUS_DEAD) {

                          if (socketPool.containsKey(host))
                               socketPool.get(host).remove(socket);

                          if (statusPool.containsKey(host))
                               statusPool.get(host).remove(socket);

                          // bug: remove key wrong
                          // usedPool.remove(socket);
                          usedPool.get(host).remove(socket);
                          
                          socket.trueClose(false);    // 这里有些tricky,参数false表示不会再像当前操作的Map中记录异常连接,有更好的实现方式吗?

                          socket = null;
                     }
                } catch (Exception ex) {
                     log.error("++++ failed to close SockIO obj from deadPool");
                     log.error(ex.getMessage(), ex);
                }
           }

      }

而所有请求操作都会捕获IOException,然后通过trueClose函数来记录异常连接状态:

      catch ( IOException e ) {
           ApiLogger.fire(new StringBuilder(64).append("MC ").append(sock.getHost()).append(" Error:").append(e).toString());
           // if we have an errorHandler, use its hook
           if ( errorHandler != null )
                errorHandler.handleErrorOnGet( this, e, key );

           // exception thrown
           log.error( "++++ exception thrown while trying to get object from cache for key: " + key + ", sorket:" + sock.toString());
           log.error( e.getMessage(), e );

           try {
                sock.trueClose();   // 这里调用的是SockIO,trueClose会把socket记录到dead pool,来标记连接不可用
           }
           catch ( IOException ioe ) {
                log.error( "++++ failed to close socket : " + sock.toString() );
           }
           sock = null;
     }

正常的连接关闭逻辑如下:

 /**
  * Checks a SockIO object in with the pool.
  * 
  * This will remove SocketIO from busy pool, and optionally<br/> add to
  * avail pool.
  * 
  * @param socket
  *            socket to return
  * @param addToAvail
  *            add to avail pool if true
  */
 private void checkIn(SockIO socket, boolean addToAvail) {

      String host = socket.getHost();
      if (log.isDebugEnabled())
           log.debug("++++ calling check-in on socket: " + socket.toString()
                     + " for host: " + host);

      if (socket.isConnected() && addToAvail) {
           Queue<SockIO> queue=usedPool.get(host);
           queue.add(socket);
           
           // add to avail pool
           if (log.isDebugEnabled())
                log.debug("++++ returning socket (" + socket.toString()
                          + " to avail pool for host: " + host);
           
           
      } else {
           addSocketToPool(statusPool, host, socket, SOCKET_STATUS_DEAD, true);
           // socket = null;
      }

 }

留下几个问题:

  1. SockIOPool实现中的MaintThread需要创建多少个?是每个端口创建一个,还是全局创建一个?为什么?
  2. 找出MemCachedClient中getMulti函数实现如何实现Fail Fast的(即如何记录IOException的)?
  3. 假设当前端口配置最大,最小连接数都为5,且当前共创建了5个连接,其中3个正常,2个异常。问:socketPool,usedPool,statusPool的size分别是多大?
  4. usedPool在实现上使用Queue接口,为什么不用Stack或者List?好处是什么?为什么采用ConcurrentLinkedQueue而不是LinkedBlockingQueue或其他实现?
  5. MaintThread是通过继承Thread实现while(true) {…}的方法来实现回收线程,有没有更好,更健壮的实现方案?

Redis Client的Fail Fast实现

Redis由于采用域名配置方式(MC采用的是IP + Port的方式),所以对原有的Jedis实现进行了改良,支持自动感知DNS的变化来实现负载均衡。 代码架构

JedisPort   —> EndpointPool  —>JedisEndpointFactory
                      ^
                      |
               EndpointManagerImpl

其中JedisPort主要是实现redis的协议部分,并负责处理请求异常 EndpointPool顾名思义,就是连接池的实现,主要有borrow和return函数 JedisEndpointFactory负责具体创建、释放、检测连接 EndpointManagerImpl负责实现回收线程部分

其中Redis核心Fail Fast的实现如下,它主要通过三个变量continueFalseCount和continueFalseCountMinThreshold,continueFalseCountMaxThreshold来进行fail fast的控制

 private AtomicInteger continueFalseCount = new AtomicInteger(0);
 /** 连续失败N次后,设置server healthy为false */
 private int continueFalseCountMinThreshold = 10;
 private int continueFalseCountMaxThreshold = 30;
 
 private void incFalseCount() { 
     int falseCount = continueFalseCount.incrementAndGet();
      if(falseCount >= this.continueFalseCountMaxThreshold){
           compareAndsetHealthy(true, false);
           continueFalseCount.set(0);
           
     }else if(falseCount > this.continueFalseCountMinThreshold){
           HostAddressWatcherImpl.getInstance().tryWatchHostAddressImmediately(getConfig().getHostname());
           
      } 
}

留下几个问题:

  1. Redis的Fail Fast实现在Server端服务不可用的情况下表现有何不同?
  2. 参考以上Fail Fast的实现,请问HBase客户端的Fail Fast应该如何实现?
Written on November 3, 2014