DbConnectionPool.java

/*
 * This file is part of Waarp Project (named also Waarp or GG).
 *
 *  Copyright (c) 2019, Waarp SAS, and individual contributors by the @author
 *  tags. See the COPYRIGHT.txt in the distribution for a full listing of
 * individual contributors.
 *
 *  All Waarp Project is free software: you can redistribute it and/or
 * modify it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or (at your
 * option) any later version.
 *
 * Waarp is distributed in the hope that it will be useful, but WITHOUT ANY
 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
 * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License along with
 * Waarp . If not, see <http://www.gnu.org/licenses/>.
 */
package org.waarp.common.database;

import javax.sql.ConnectionEvent;
import javax.sql.ConnectionEventListener;
import javax.sql.ConnectionPoolDataSource;
import javax.sql.PooledConnection;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * A simple standalone JDBC connection pool manager.
 * <p/>
 * The public methods of this class are thread-safe.
 * <p/>
 * Nothe that JDBC4 is needed and isValid() must be implemented (not yet in
 * PostGre in April 2012)
 * <p/>
 *
 * @author Christian d'Heureuse, Inventec Informatik AG, Zurich, Switzerland<br>
 *     Multi-licensed: EPL/LGPL/MPL.
 *     <br>
 *     Add TimerTask support to close after some "delay" any still connected
 *     sessions
 */
public class DbConnectionPool {
  private ConnectionPoolDataSource dataSource;

  private final int maxConnections;

  private final int timeout;

  private static final long TIME_OUT_FORCE_CLOSE = 300000; // 5 minutes

  private Semaphore semaphore;

  private final Queue<Con> recycledConnections;

  private int activeConnections;

  private final PoolConnectionEventListener poolConnectionEventListener;

  private boolean isDisposed;

  private static class Con {
    final PooledConnection pooledCon;

    final long lastRecyle;

    private Con(final PooledConnection pooledCon) {
      this.pooledCon = pooledCon;
      lastRecyle = System.currentTimeMillis();
    }

    @Override
    public final boolean equals(final Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof Con)) {
        return false;
      }

      final Con con = (Con) o;

      return pooledCon.equals(con.pooledCon);
    }

    @Override
    public final int hashCode() {
      return pooledCon.hashCode();
    }
  }

  /**
   * Class to check validity of connections in the pool
   */
  private static class TimerTaskCheckConnections extends TimerTask {
    DbConnectionPool pool;
    Timer timer;
    long delay;

    /**
     * @param timer
     * @param delay
     * @param pool
     */
    private TimerTaskCheckConnections(final Timer timer, final long delay,
                                      final DbConnectionPool pool) {
      if (pool == null || timer == null || delay < 1000) {
        throw new IllegalArgumentException(
            "Invalid values. Need pool, timer and delay >= 1000");
      }
      this.pool = pool;
      this.timer = timer;
      this.delay = delay;
    }

    @Override
    public void run() {
      final Iterator<Con> conIterator = pool.recycledConnections.iterator();
      final long now = System.currentTimeMillis();
      while (conIterator.hasNext()) {
        final Con c = conIterator.next();
        if (c.lastRecyle + TIME_OUT_FORCE_CLOSE < now) {
          conIterator.remove();
          pool.closeConnectionNoEx(c.pooledCon);
        } else {
          try {
            if (!c.pooledCon.getConnection()
                            .isValid(DbConstant.VALIDTESTDURATION)) {
              conIterator.remove();
              pool.closeConnectionNoEx(c.pooledCon);
            }
          } catch (final SQLException e) {
            conIterator.remove();
            pool.closeConnectionNoEx(c.pooledCon);
          }
        }
      }
      timer.schedule(this, delay);
    }

  }

  /**
   * Release all idle connections
   */
  public synchronized void freeIdleConnections() {
    final Iterator<Con> conIterator = recycledConnections.iterator();
    final long now = System.currentTimeMillis();
    while (conIterator.hasNext()) {
      final Con c = conIterator.next();
      if (c.lastRecyle + TIME_OUT_FORCE_CLOSE < now) {
        conIterator.remove();
        closeConnectionNoEx(c.pooledCon);
      }
    }
  }

  /**
   * Thrown in when no free connection becomes available within
   * {@code timeout} seconds.
   */
  private static class TimeoutException extends RuntimeException {
    private static final long serialVersionUID = 1;

    public TimeoutException() {
      super("Timeout while waiting for a free database connection.");
    }
  }

  /**
   * Constructs a MiniConnectionPoolManager object with no timeout and no
   * limit.
   *
   * @param dataSource the data source for the connections.
   */
  public DbConnectionPool(final ConnectionPoolDataSource dataSource) {
    this(dataSource, 0, DbConstant.DELAYMAXCONNECTION);
  }

  /**
   * Constructs a MiniConnectionPoolManager object with no timeout and no
   * limit.
   *
   * @param dataSource the data source for the connections.
   * @param timer
   * @param delay in ms period of time to check existing connections
   *     and
   *     limit to get a new connection
   */
  public DbConnectionPool(final ConnectionPoolDataSource dataSource,
                          final Timer timer, final long delay) {
    this(dataSource, 0, (int) (delay / 1000));
    timer.schedule(new TimerTaskCheckConnections(timer, delay, this), delay);
  }

  /**
   * Constructs a MiniConnectionPoolManager object with a timeout of
   * DbConstant.DELAYMAXCONNECTION seconds.
   *
   * @param dataSource the data source for the connections.
   * @param maxConnections the maximum number of connections. 0 means
   *     no
   *     limit
   */
  public DbConnectionPool(final ConnectionPoolDataSource dataSource,
                          final int maxConnections) {
    this(dataSource, maxConnections, DbConstant.DELAYMAXCONNECTION);
  }

  /**
   * Constructs a ConnectionPool object.
   *
   * @param dataSource the data source for the connections.
   * @param maxConnections the maximum number of connections. 0 means
   *     no
   *     limit
   * @param timeout the maximum time in seconds to wait for a free
   *     connection.
   */
  public DbConnectionPool(final ConnectionPoolDataSource dataSource,
                          final int maxConnections, final int timeout) {
    this.dataSource = dataSource;
    this.maxConnections = maxConnections;
    this.timeout = timeout;
    if (maxConnections != 0) {
      if (timeout <= 0) {
        throw new IllegalArgumentException("Invalid timeout value.");
      }
      semaphore = new Semaphore(maxConnections, true);
    }
    recycledConnections = new ArrayDeque<Con>();
    poolConnectionEventListener = new PoolConnectionEventListener(this);
  }

  public synchronized void resetPoolDataSource(
      final ConnectionPoolDataSource dataSource) {
    this.dataSource = dataSource;
  }

  /**
   * @return the max number of connections
   */
  public final int getMaxConnections() {
    return maxConnections;
  }

  /**
   * @return the Login Timeout in second
   */
  public final long getLoginTimeout() {
    return timeout;
  }

  /**
   * @return the Force Close Timeout in ms
   */
  public final long getTimeoutForceClose() {
    return TIME_OUT_FORCE_CLOSE;
  }

  /**
   * Closes all unused pooled connections.
   *
   * @throws SQLException //
   */
  public synchronized void dispose() throws SQLException {
    if (isDisposed) {
      return;
    }
    isDisposed = true;
    SQLException e = null;
    while (!recycledConnections.isEmpty()) {
      final Con c = recycledConnections.remove();
      final PooledConnection pconn = c.pooledCon;
      try {
        pconn.close();
      } catch (final SQLException e2) {
        if (e == null) {
          e = e2;
        }
      }
    }
    if (e != null) {
      throw e;
    }
  }

  /**
   * Retrieves a connection from the connection pool. If
   * {@code maxConnections} connections are already in
   * use, the method waits until a connection becomes available or
   * {@code timeout} seconds elapsed. When
   * the application is finished using the connection, it must close it in
   * order
   * to return it to the pool.
   *
   * @return a new Connection object.
   *
   * @throws TimeoutException when no connection becomes available
   *     within
   *     {@code timeout} seconds.
   * @throws SQLException //
   */
  public Connection getConnection() throws SQLException {
    // This routine is unsynchronized, because semaphore.tryAcquire() may
    // block.
    synchronized (this) {
      if (isDisposed) {
        throw new IllegalStateException("Connection pool has been disposed.");
      }
    }
    if (semaphore != null) {
      try {
        if (!semaphore.tryAcquire(timeout, TimeUnit.SECONDS)) {
          throw new TimeoutException();
        }
      } catch (final InterruptedException e) {//NOSONAR
        throw new RuntimeException(
            "Interrupted while waiting for a database connection.", e);
      }
    }
    boolean ok = false;
    try {
      final Connection conn = getConnection2();
      ok = true;
      return conn;
    } finally {
      if (semaphore != null && !ok) {
        semaphore.release();
      }
    }
  }

  private synchronized Connection getConnection2() throws SQLException {
    if (isDisposed) {
      throw new IllegalStateException(
          "Connection pool has been disposed."); // test again with
    }
    // lock
    final long time = System.currentTimeMillis() + timeout * 1000;
    while (time <= System.currentTimeMillis()) {
      final PooledConnection pconn;
      if (!recycledConnections.isEmpty()) {
        pconn = recycledConnections.remove().pooledCon;
      } else {
        pconn = dataSource.getPooledConnection();
      }

      final Connection conn = pconn.getConnection();
      if (conn.isValid(DbConstant.VALIDTESTDURATION)) {
        activeConnections++;
        pconn.addConnectionEventListener(poolConnectionEventListener);
        assertInnerState();
        return conn;
      }
    }

    throw new SQLException("Could not get a valid connection before timeout");
  }

  private synchronized void recycleConnection(final PooledConnection pconn) {
    if (isDisposed) {
      disposeConnection(pconn);
      return;
    }
    try {
      if (!pconn.getConnection().isValid(DbConstant.VALIDTESTDURATION)) {
        disposeConnection(pconn);
        return;
      }
    } catch (final SQLException e) {
      disposeConnection(pconn);
      return;
    }
    if (activeConnections <= 0) {
      throw new AssertionError();
    }
    activeConnections--;
    if (semaphore != null) {
      semaphore.release();
    }
    recycledConnections.add(new Con(pconn));
    assertInnerState();
  }

  private synchronized void disposeConnection(final PooledConnection pconn) {
    if (activeConnections <= 0) {
      throw new AssertionError();
    }
    activeConnections--;
    if (semaphore != null) {
      semaphore.release();
    }
    closeConnectionNoEx(pconn);
    assertInnerState();
  }

  private void closeConnectionNoEx(final PooledConnection pconn) {
    try {
      pconn.close();
    } catch (final SQLException e) {
      //
    }
  }

  private void assertInnerState() {
    if (activeConnections < 0) {
      throw new AssertionError();
    }
    if (semaphore != null) {
      if (activeConnections + recycledConnections.size() > maxConnections) {
        throw new AssertionError();
      }
      if (activeConnections + semaphore.availablePermits() > maxConnections) {
        throw new AssertionError();
      }
    }
  }

  private static class PoolConnectionEventListener
      implements ConnectionEventListener {
    private final DbConnectionPool pool;

    private PoolConnectionEventListener(final DbConnectionPool pool) {
      this.pool = pool;
    }

    @Override
    public final void connectionClosed(final ConnectionEvent event) {
      final PooledConnection pconn = (PooledConnection) event.getSource();
      pconn.removeConnectionEventListener(this);
      pool.recycleConnection(pconn);
    }

    @Override
    public final void connectionErrorOccurred(final ConnectionEvent event) {
      final PooledConnection pconn = (PooledConnection) event.getSource();
      pconn.removeConnectionEventListener(this);
      pool.disposeConnection(pconn);
    }
  }

  /**
   * Returns the number of active (open) connections of this pool. This is the
   * number of {@code Connection}
   * objects that have been issued by {@link #getConnection()} for which
   * {@code Connection.close()} has not
   * yet been called.
   *
   * @return the number of active connections.
   */
  public synchronized int getActiveConnections() {
    return activeConnections;
  }
}