Hikari Connection Pool Source Code Walkthrough

Published: 2023-10-09 19:48:12. Author: 东北法师刘海柱

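This article covers the following topics.

  1. How database connections are created, borrowed, and destroyed
  2. The related configuration parameters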

Class involved:

com.zaxxer.hikari.pool.HikariPool#HikariPool

Fields involved:

private final AtomicInteger totalConnections;// total number of connections


private final ThreadPoolExecutor addConnectionExecutor;// thread pool that runs POOL_ENTRY_CREATOR
private final PoolEntryCreator POOL_ENTRY_CREATOR = new PoolEntryCreator();// creates connections

private final ThreadPoolExecutor closeConnectionExecutor;// used to close connections

private final ScheduledThreadPoolExecutor houseKeepingExecutorService;// runs the periodic cleanup of expired connections
private final ConcurrentBag<PoolEntry> connectionBag;// container that holds the connections

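Creating connections

Connections are created at two points:

  1. When the HikariPool is initialized. If initializationFailFast is enabled, the pool opens one connection and closes it right away, which verifies that the link to the database works.
  2. When a connection-creation task (PoolEntryCreator) runs. The task retries until it has added one connection to the pool, and gives up once the total connection count reaches maximumPoolSize or the pool is no longer in its normal state. The task is submitted in two situations:
     - During the periodic cleanup of idle connections: if the number of idle connections is below minimumIdle after the cleanup, fillPool() submits creation tasks to refill the pool.
     - While borrowing a connection from the pool: if the scan of the shared list finds no idle connection, a creation task is submitted; and if the borrowed connection may have been created for another waiting thread, a new one is requested to "pay it back".
// The low-level method that creates a connection. Most of the time the connection is wrapped in a PoolEntry, so the PoolEntry that appears throughout the code is effectively the connection.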
Connection newConnection() throws Exception
{
   Connection connection = null;
   try {
      String username = config.getUsername();
      String password = config.getPassword();

      connection = (username == null) ? dataSource.getConnection() : dataSource.getConnection(username, password);
      if (connection == null) {
         throw new SQLTransientConnectionException("DataSource returned null unexpectedly");
      }

      setupConnection(connection);
      lastConnectionFailure.set(null);
      return connection;
   }
   catch (Exception e) {
      lastConnectionFailure.set(e);
      quietlyCloseConnection(connection, "(Failed to create/set connection)");
      throw e;
   }
}
// Try to establish a connection once (fail-fast check at initialization)
private void checkFailFast()
{
   if (config.isInitializationFailFast()) {
      try {
         newConnection().close();
      }
      catch (Throwable e) {
         try {
            shutdown();
         }
         catch (Throwable ex) {
            e.addSuppressed(ex);
         }

         throw new PoolInitializationException(e);
      }
   }
}
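// Entry point for creating a connection: submits the creation task
public Future<Boolean> addBagItem()
{
   return addConnectionExecutor.submit(POOL_ENTRY_CREATOR);// POOL_ENTRY_CREATOR is the PoolEntryCreator instance
}

// The task that actually creates the connection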
private class PoolEntryCreator implements Callable<Boolean>
{
   @Override
   public Boolean call() throws Exception
   {
      long sleepBackoff = 250L;
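      // Retry while the pool is running and the connection count is below maximumPoolSize;
      // the task returns as soon as one connection has been added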
      while (poolState == POOL_NORMAL && totalConnections.get() < config.getMaximumPoolSize()) {
         final PoolEntry poolEntry = createPoolEntry();
         if (poolEntry != null) {
            totalConnections.incrementAndGet();
            connectionBag.add(poolEntry);
            return Boolean.TRUE;
         }

         // failed to get connection from db, sleep and retry
         quietlySleep(sleepBackoff);
         sleepBackoff = Math.min(SECONDS.toMillis(10), Math.min(connectionTimeout, (long) (sleepBackoff * 1.5)));
      }
      // Pool is suspended or shutdown or at max size
      return Boolean.FALSE;
   }
}
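The retry delay in call() grows by a factor of 1.5 per failure and is capped by both 10 seconds and connectionTimeout. The following is a minimal sketch that replays only that arithmetic. BackoffSketch and sleepSequence are hypothetical names used for illustration and are not part of HikariCP.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not HikariCP code): replays the sleepBackoff update
// from PoolEntryCreator.call() without touching a database.
final class BackoffSketch {

    // Returns the sleep duration (ms) used before each of the first `failures` retries.
    static List<Long> sleepSequence(long connectionTimeoutMs, int failures) {
        List<Long> sleeps = new ArrayList<>();
        long sleepBackoff = 250L; // same starting value as in call()
        for (int i = 0; i < failures; i++) {
            sleeps.add(sleepBackoff);
            // Same update expression as in the excerpt above.
            sleepBackoff = Math.min(TimeUnit.SECONDS.toMillis(10),
                                    Math.min(connectionTimeoutMs, (long) (sleepBackoff * 1.5)));
        }
        return sleeps;
    }

    public static void main(String[] args) {
        System.out.println(sleepSequence(30_000L, 6)); // prints [250, 375, 562, 843, 1264, 1896]
        System.out.println(sleepSequence(1_000L, 6));  // prints [250, 375, 562, 843, 1000, 1000]
    }
}
```

With a short connectionTimeout the delay stops growing at that value, so a pool with a 1-second timeout never waits more than 1 second between attempts.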

//Fill pool up from current idle connections (as they are perceived at the point of execution) 
//to minimumIdle connections.
private void fillPool()
{
   final int connectionsToAdd = Math.min(config.getMaximumPoolSize() - totalConnections.get(), config.getMinimumIdle() - getIdleConnections())
                                - addConnectionExecutor.getQueue().size();
   for (int i = 0; i < connectionsToAdd; i++) {
      addBagItem();
   }

   if (connectionsToAdd > 0 && LOGGER.isDebugEnabled()) {
      addConnectionExecutor.execute(new Runnable() {
         @Override
         public void run() {
            logPoolState("After adding ");
         }
      });
   }
}
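The number of creation tasks that fillPool() submits is easier to follow with concrete numbers. This sketch isolates the expression from the method above. FillPoolSketch is a hypothetical class, and the parameter values are made up for illustration.

```java
// Hypothetical helper (not HikariCP code): the connectionsToAdd expression
// from fillPool(), with the pool state passed in as plain integers.
final class FillPoolSketch {

    static int connectionsToAdd(int maximumPoolSize, int totalConnections,
                                int minimumIdle, int idleConnections, int queuedAddTasks) {
        // Room left below the hard cap, versus the shortfall of idle connections,
        // minus creation tasks that are already waiting in the executor queue.
        return Math.min(maximumPoolSize - totalConnections, minimumIdle - idleConnections)
               - queuedAddTasks;
    }

    public static void main(String[] args) {
        // Idle shortfall is 4, but only 2 slots remain below maximumPoolSize.
        System.out.println(connectionsToAdd(10, 8, 5, 1, 0)); // prints 2
        // Shortfall of 4 with one task already queued.
        System.out.println(connectionsToAdd(10, 4, 5, 1, 1)); // prints 3
        // Pool is full: the result is not positive, so the for loop adds nothing.
        System.out.println(connectionsToAdd(10, 10, 5, 0, 0)); // prints 0
    }
}
```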
// Excerpt from ConcurrentBag#borrow (the full method is listed later in this article)
public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
{
.....
            startSeq = synchronizer.currentSequence();
            for (T bagEntry : sharedList) {
               if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
                  // if we might have stolen another thread's new connection, restart the add...
                  if (waiters.get() > 1 && addItemFuture == null) {
                     listener.addBagItem();
                  }

                  return bagEntry;
               }
            }
.....

   return null;
}
//The house keeping task to retire idle connections.
private class HouseKeeper implements Runnable
{
   private volatile long previous = clockSource.plusMillis(clockSource.currentTime(), -HOUSEKEEPING_PERIOD_MS);

   @Override
   public void run()
   {
      try {
         // refresh timeouts in case they changed via MBean
         connectionTimeout = config.getConnectionTimeout();
         validationTimeout = config.getValidationTimeout();
         leakTask.updateLeakDetectionThreshold(config.getLeakDetectionThreshold());

         final long idleTimeout = config.getIdleTimeout();
         final long now = clockSource.currentTime();

         // Detect retrograde time, allowing +128ms as per NTP spec.
         if (clockSource.plusMillis(now, 128) < clockSource.plusMillis(previous, HOUSEKEEPING_PERIOD_MS)) {
            LOGGER.warn("{} - Retrograde clock change detected (housekeeper delta={}), soft-evicting connections from pool.",
                        clockSource.elapsedDisplayString(previous, now), poolName);
            previous = now;
            softEvictConnections();
            fillPool();
            return;
         }
         else if (now > clockSource.plusMillis(previous, (3 * HOUSEKEEPING_PERIOD_MS) / 2)) {
            // No point evicting for forward clock motion, this merely accelerates connection retirement anyway
            LOGGER.warn("{} - Thread starvation or clock leap detected (housekeeper delta={}).", clockSource.elapsedDisplayString(previous, now), poolName);
         }

         previous = now;

         String afterPrefix = "Pool ";
         if (idleTimeout > 0L) {
            final List<PoolEntry> idleList = connectionBag.values(STATE_NOT_IN_USE);
            int removable = idleList.size() - config.getMinimumIdle();
            if (removable > 0) {
               logPoolState("Before cleanup ");
               afterPrefix = "After cleanup  ";

               // Sort pool entries on lastAccessed
               Collections.sort(idleList, LASTACCESS_COMPARABLE);
               for (PoolEntry poolEntry : idleList) {
                  if (clockSource.elapsedMillis(poolEntry.lastAccessed, now) > idleTimeout && connectionBag.reserve(poolEntry)) {
                     closeConnection(poolEntry, "(connection has passed idleTimeout)");
                     if (--removable == 0) {
                        break; // keep min idle cons
                     }
                  }
               }
            }
         }

         logPoolState(afterPrefix);
         // Check whether the number of idle connections has dropped below minimumIdle; if so, create new connections
         fillPool(); // Try to maintain minimum connections
      }
      catch (Exception e) {
         LOGGER.error("Unexpected exception in housekeeping task", e);
      }
   }
}
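The idle cleanup inside HouseKeeper.run() can be sketched on plain timestamps. The version below assumes that LASTACCESS_COMPARABLE sorts the oldest entry first and that connectionBag.reserve() always succeeds; both are simplifications, and IdleEvictionSketch is a hypothetical class rather than HikariCP code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper (not HikariCP code): picks which idle entries the
// housekeeper loop would close, given only their lastAccessed timestamps.
final class IdleEvictionSketch {

    static List<Long> selectForEviction(List<Long> idleLastAccessed, int minimumIdle,
                                        long idleTimeoutMs, long nowMs) {
        List<Long> evicted = new ArrayList<>();
        int removable = idleLastAccessed.size() - minimumIdle;
        if (idleTimeoutMs <= 0L || removable <= 0) {
            return evicted; // idleTimeout disabled, or no idle surplus above minimumIdle
        }
        List<Long> sorted = new ArrayList<>(idleLastAccessed);
        Collections.sort(sorted); // assumption: least recently used first
        for (long lastAccessed : sorted) {
            if (nowMs - lastAccessed > idleTimeoutMs) {
                evicted.add(lastAccessed);
                if (--removable == 0) {
                    break; // keep minimumIdle idle connections
                }
            }
        }
        return evicted;
    }

    public static void main(String[] args) {
        List<Long> idle = List.of(1_000L, 5_000L, 9_000L, 2_000L);
        System.out.println(selectForEviction(idle, 2, 3_000L, 10_000L)); // prints [1000, 2000]
    }
}
```

In the example, three of the four entries have been idle for longer than 3 seconds, but only two are closed because minimumIdle is 2.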

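Getting a connection
The method is straightforward and can be read together with its comments. The part worth noting is the design of sharedList and threadList.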

public final Connection getConnection(final long hardTimeout) throws SQLException
{
   suspendResumeLock.acquire();
   final long startTime = clockSource.currentTime();

   try {
      long timeout = hardTimeout;
      do {
         final PoolEntry poolEntry = connectionBag.borrow(timeout, MILLISECONDS);// the key call
         if (poolEntry == null) {
            break; // We timed out... break and throw exception
         }

         final long now = clockSource.currentTime();
         if (poolEntry.isMarkedEvicted() || (clockSource.elapsedMillis(poolEntry.lastAccessed, now) > ALIVE_BYPASS_WINDOW_MS && !isConnectionAlive(poolEntry.connection))) {
            // The connection is unusable, so close it
            closeConnection(poolEntry, "(connection is evicted or dead)"); // Throw away the dead connection (passed max age or failed alive test)
            timeout = hardTimeout - clockSource.elapsedMillis(startTime);
         }
         else {
            metricsTracker.recordBorrowStats(poolEntry, startTime);
            return poolEntry.createProxyConnection(leakTask.schedule(poolEntry), now);
         }
      } while (timeout > 0L);// if no connection is obtained within the timeout, fall through and throw an exception
   }
   catch (InterruptedException e) {
      throw new SQLException(poolName + " - Interrupted during connection acquisition", e);
   }
   finally {
      suspendResumeLock.release();
   }

   throw createTimeoutException(startTime);
}
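The control flow of getConnection() is a retry loop with a shrinking time budget: borrow, validate, discard dead entries, and recompute what is left of the timeout. Below is a rough sketch of that shape only. AcquireLoopSketch is hypothetical, the Iterator stands in for connectionBag.borrow(), and the Predicate stands in for the eviction and liveness checks; it returns null where the real method throws.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical helper (not HikariCP code): the borrow/validate/retry shape of getConnection().
final class AcquireLoopSketch {

    static <T> T acquire(Iterator<T> candidates, Predicate<T> isUsable, long hardTimeoutMs) {
        final long startNanos = System.nanoTime();
        long timeout = hardTimeoutMs;
        do {
            if (!candidates.hasNext()) {
                break; // stands in for borrow() returning null after its own timeout
            }
            T candidate = candidates.next();
            if (isUsable.test(candidate)) {
                return candidate;
            }
            // Dead or evicted candidate: drop it and retry with the remaining budget.
            timeout = hardTimeoutMs - (System.nanoTime() - startNanos) / 1_000_000L;
        } while (timeout > 0L);
        return null; // the real method throws a timeout exception here
    }

    public static void main(String[] args) {
        List<String> pool = List.of("dead", "dead", "live");
        System.out.println(acquire(pool.iterator(), "live"::equals, 5_000L));
    }
}
```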
private final CopyOnWriteArrayList<T> sharedList;
private final ThreadLocal<List<Object>> threadList;

private final boolean weakThreadLocals;


// Two lists are maintained here. One of them is thread-local, and it is tried first. (Some articles suggest this may be one of the reasons the Hikari pool is fast.)
public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
{
   // Try the thread-local list first
   List<Object> list = threadList.get();
   if (weakThreadLocals && list == null) {
      list = new ArrayList<>(16);
      threadList.set(list);
   }

   for (int i = list.size() - 1; i >= 0; i--) {
      final Object entry = list.remove(i);
      @SuppressWarnings("unchecked")
      final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
      if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
         return bagEntry;
      }
   }

   // Otherwise, scan the shared list ... for maximum of timeout
   timeout = timeUnit.toNanos(timeout);
   Future<Boolean> addItemFuture = null;
   final long startScan = System.nanoTime();
   final long originTimeout = timeout;
   long startSeq;
   waiters.incrementAndGet();
   try {
      do {
         // scan the shared list
         do {
            startSeq = synchronizer.currentSequence();
            for (T bagEntry : sharedList) {
               if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
                  // if we might have stolen another thread's new connection, restart the add...
                  if (waiters.get() > 1 && addItemFuture == null) {
                     listener.addBagItem();
                  }

                  return bagEntry;
               }
            }
         } while (startSeq < synchronizer.currentSequence());

         if (addItemFuture == null || addItemFuture.isDone()) {
            addItemFuture = listener.addBagItem();
         }

         timeout = originTimeout - (System.nanoTime() - startScan);
      } while (timeout > 10_000L && synchronizer.waitUntilSequenceExceeded(startSeq, timeout));
   }
   finally {
      waiters.decrementAndGet();
   }

   return null;
}
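To make the thread-local-first lookup concrete, here is a stripped-down bag with no waiting, no weak references, and no listener. MiniBagSketch is a hypothetical class; in particular the giveBack method is an assumption, since the return path of the real ConcurrentBag is not shown in this article.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified bag (not HikariCP code): thread-local list first, shared list second.
final class MiniBagSketch {
    static final int NOT_IN_USE = 0;
    static final int IN_USE = 1;

    static final class Entry {
        final String name;
        final AtomicInteger state = new AtomicInteger(NOT_IN_USE);
        Entry(String name) { this.name = name; }
    }

    private final CopyOnWriteArrayList<Entry> sharedList = new CopyOnWriteArrayList<>();
    private final ThreadLocal<List<Entry>> threadList = ThreadLocal.withInitial(() -> new ArrayList<>(16));

    void add(Entry entry) { sharedList.add(entry); }

    Entry borrow() {
        // 1. Entries this thread used recently, newest first; the CAS claims ownership.
        List<Entry> local = threadList.get();
        for (int i = local.size() - 1; i >= 0; i--) {
            Entry entry = local.remove(i);
            if (entry.state.compareAndSet(NOT_IN_USE, IN_USE)) {
                return entry;
            }
        }
        // 2. Fall back to scanning every entry in the shared list.
        for (Entry entry : sharedList) {
            if (entry.state.compareAndSet(NOT_IN_USE, IN_USE)) {
                return entry;
            }
        }
        return null; // the real method would now request a new item and wait
    }

    // Assumed return path: mark the entry free and remember it for this thread.
    void giveBack(Entry entry) {
        entry.state.set(NOT_IN_USE);
        threadList.get().add(entry);
    }
}
```

Because an entry stays in sharedList while it also sits in a thread-local list, another thread can still claim it. The compare-and-set on the state is what guarantees a single owner.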

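Closing connections

A connection is closed in the following cases:

  1. A scheduled task is bound to each connection when it is created; once the connection has lived longer than maxLifetime, it is closed
  2. When a connection is idle and has been idle for longer than idleTimeout, the HouseKeeper scheduled task closes it
  3. When the connection is found to be unusable
  4. When the connection is explicitly evicted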
final void closeConnection(final PoolEntry poolEntry, final String closureReason)
{
   if (connectionBag.remove(poolEntry)) {
      final int tc = totalConnections.decrementAndGet();
      if (tc < 0) {
         LOGGER.warn("{} - Unexpected value of totalConnections={}", poolName, tc, new Exception());
      }
      final Connection connection = poolEntry.close();
      closeConnectionExecutor.execute(new Runnable() {
         @Override
         public void run() {
            quietlyCloseConnection(connection, closureReason);
         }
      });
   }
}
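The first trigger in the list above, the maxLifetime task, is not shown in the excerpts. A possible shape for it is sketched below, assuming that an idle connection is retired immediately while an in-use connection is only flagged, so that a check like isMarkedEvicted() in getConnection() can discard it later. MaxLifetimeSketch and all of its members are hypothetical.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch (not HikariCP code) of a per-connection maxLifetime task.
final class MaxLifetimeSketch {
    static final int NOT_IN_USE = 0;
    static final int IN_USE = 1;
    static final int CLOSED = 2;

    static final class Entry {
        final AtomicInteger state = new AtomicInteger(NOT_IN_USE);
        volatile boolean evicted;
    }

    // Runs once maxLifetime has elapsed.
    static void expire(Entry entry) {
        if (entry.state.compareAndSet(NOT_IN_USE, CLOSED)) {
            return; // idle: retired now (a real pool would also close the JDBC connection)
        }
        entry.evicted = true; // in use: only flagged, never closed under the borrower
    }

    // Binds the expiry task to the entry at creation time.
    static ScheduledFuture<?> scheduleExpiry(ScheduledExecutorService scheduler,
                                             Entry entry, long maxLifetimeMs) {
        return scheduler.schedule(() -> expire(entry), maxLifetimeMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        Entry idle = new Entry();
        scheduleExpiry(scheduler, idle, 50L).get(); // wait for the task to run
        scheduler.shutdown();
        System.out.println("idle entry closed: " + (idle.state.get() == CLOSED));
    }
}
```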

Configuration parameters

Source: https://github.com/brettwooldridge/HikariCP
idleTimeout
This property controls the maximum amount of time that a connection is allowed to sit idle in the pool. This setting only applies when minimumIdle is defined to be less than maximumPoolSize. Idle connections will not be retired once the pool reaches minimumIdle connections. Whether a connection is retired as idle or not is subject to a maximum variation of +30 seconds, and average variation of +15 seconds. A connection will never be retired as idle before this timeout. A value of 0 means that idle connections are never removed from the pool. The minimum allowed value is 10000ms (10 seconds). Default: 600000 (10 minutes)

maxLifetime
This property controls the maximum lifetime of a connection in the pool. An in-use connection will never be retired, only when it is closed will it then be removed. On a connection-by-connection basis, minor negative attenuation is applied to avoid mass-extinction in the pool. We strongly recommend setting this value, and it should be several seconds shorter than any database or infrastructure imposed connection time limit. A value of 0 indicates no maximum lifetime (infinite lifetime), subject of course to the idleTimeout setting. The minimum allowed value is 30000ms (30 seconds). Default: 1800000 (30 minutes)

minimumIdle
This property controls the minimum number of idle connections that HikariCP tries to maintain in the pool. If the idle connections dip below this value and total connections in the pool are less than maximumPoolSize, HikariCP will make a best effort to add additional connections quickly and efficiently. However, for maximum performance and responsiveness to spike demands, we recommend not setting this value and instead allowing HikariCP to act as a fixed size connection pool. Default: same as maximumPoolSize
By default, minimumIdle equals maximumPoolSize, which means that no connection is ever cleaned up for having been idle too long.

maximumPoolSize
This property controls the maximum size that the pool is allowed to reach, including both idle and in-use connections. Basically this value will determine the maximum number of actual connections to the database backend. A reasonable value for this is best determined by your execution environment. When the pool reaches this size, and no idle connections are available, calls to getConnection() will block for up to connectionTimeout milliseconds before timing out. Please read about pool sizing. Default: 10
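The way these four settings interact can be restated as two small checks. This is a sketch of the rules quoted above (idleTimeout only matters when minimumIdle is below maximumPoolSize; 0 disables a timeout; the documented minimums are 10000ms and 30000ms). PoolSettingsSketch is a hypothetical class and does not reproduce HikariCP's own validation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not HikariCP code): restates the documented constraints.
final class PoolSettingsSketch {

    // Idle retirement can only happen with a positive idleTimeout and room between the two sizes.
    static boolean idleEvictionPossible(int minimumIdle, int maximumPoolSize, long idleTimeoutMs) {
        return idleTimeoutMs > 0L && minimumIdle < maximumPoolSize;
    }

    // Lists settings that fall below the documented minimums (0 means "disabled" and is allowed).
    static List<String> problems(long idleTimeoutMs, long maxLifetimeMs) {
        List<String> out = new ArrayList<>();
        if (idleTimeoutMs != 0L && idleTimeoutMs < 10_000L) {
            out.add("idleTimeout is below 10000ms");
        }
        if (maxLifetimeMs != 0L && maxLifetimeMs < 30_000L) {
            out.add("maxLifetime is below 30000ms");
        }
        return out;
    }

    public static void main(String[] args) {
        // Defaults: minimumIdle == maximumPoolSize == 10, so idle connections are never retired.
        System.out.println(idleEvictionPossible(10, 10, 600_000L)); // prints false
        System.out.println(idleEvictionPossible(5, 10, 600_000L));  // prints true
        System.out.println(problems(600_000L, 1_800_000L));         // prints []
    }
}
```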

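Summary

The Hikari connection pool changes dynamically. Several tasks interact, and connections are continually created, expired, and cleaned up. Tuning the parameters controls how quickly connections turn over.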