View Javadoc
1   /*
2    * This file is part of Waarp Project (named also Waarp or GG).
3    *
4    *  Copyright (c) 2019, Waarp SAS, and individual contributors by the @author
5    *  tags. See the COPYRIGHT.txt in the distribution for a full listing of
6    * individual contributors.
7    *
8    *  All Waarp Project is free software: you can redistribute it and/or
9    * modify it under the terms of the GNU General Public License as published by
10   * the Free Software Foundation, either version 3 of the License, or (at your
11   * option) any later version.
12   *
13   * Waarp is distributed in the hope that it will be useful, but WITHOUT ANY
14   * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
15   * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
16   *
17   *  You should have received a copy of the GNU General Public License along with
18   * Waarp . If not, see <http://www.gnu.org/licenses/>.
19   */
20  package org.waarp.common.database;
21  
22  import javax.sql.ConnectionEvent;
23  import javax.sql.ConnectionEventListener;
24  import javax.sql.ConnectionPoolDataSource;
25  import javax.sql.PooledConnection;
26  import java.sql.Connection;
27  import java.sql.SQLException;
28  import java.util.ArrayDeque;
29  import java.util.Iterator;
30  import java.util.Queue;
31  import java.util.Timer;
32  import java.util.TimerTask;
33  import java.util.concurrent.Semaphore;
34  import java.util.concurrent.TimeUnit;
35  
36  /**
37   * A simple standalone JDBC connection pool manager.
38   * <p/>
39   * The public methods of this class are thread-safe.
40   * <p/>
41   * Nothe that JDBC4 is needed and isValid() must be implemented (not yet in
42   * PostGre in April 2012)
43   * <p/>
44   *
45   * @author Christian d'Heureuse, Inventec Informatik AG, Zurich, Switzerland<br>
46   *     Multi-licensed: EPL/LGPL/MPL.
47   *     <br>
48   *     Add TimerTask support to close after some "delay" any still connected
49   *     sessions
50   */
51  public class DbConnectionPool {
52    private ConnectionPoolDataSource dataSource;
53  
54    private final int maxConnections;
55  
56    private final int timeout;
57  
58    private static final long TIME_OUT_FORCE_CLOSE = 300000; // 5 minutes
59  
60    private Semaphore semaphore;
61  
62    private final Queue<Con> recycledConnections;
63  
64    private int activeConnections;
65  
66    private final PoolConnectionEventListener poolConnectionEventListener;
67  
68    private boolean isDisposed;
69  
70    private static class Con {
71      final PooledConnection pooledCon;
72  
73      final long lastRecyle;
74  
75      private Con(final PooledConnection pooledCon) {
76        this.pooledCon = pooledCon;
77        lastRecyle = System.currentTimeMillis();
78      }
79  
80      @Override
81      public final boolean equals(final Object o) {
82        if (this == o) {
83          return true;
84        }
85        if (!(o instanceof Con)) {
86          return false;
87        }
88  
89        final Con con = (Con) o;
90  
91        return pooledCon.equals(con.pooledCon);
92      }
93  
94      @Override
95      public final int hashCode() {
96        return pooledCon.hashCode();
97      }
98    }
99  
100   /**
101    * Class to check validity of connections in the pool
102    */
103   private static class TimerTaskCheckConnections extends TimerTask {
104     DbConnectionPool pool;
105     Timer timer;
106     long delay;
107 
108     /**
109      * @param timer
110      * @param delay
111      * @param pool
112      */
113     private TimerTaskCheckConnections(final Timer timer, final long delay,
114                                       final DbConnectionPool pool) {
115       if (pool == null || timer == null || delay < 1000) {
116         throw new IllegalArgumentException(
117             "Invalid values. Need pool, timer and delay >= 1000");
118       }
119       this.pool = pool;
120       this.timer = timer;
121       this.delay = delay;
122     }
123 
124     @Override
125     public void run() {
126       final Iterator<Con> conIterator = pool.recycledConnections.iterator();
127       final long now = System.currentTimeMillis();
128       while (conIterator.hasNext()) {
129         final Con c = conIterator.next();
130         if (c.lastRecyle + TIME_OUT_FORCE_CLOSE < now) {
131           conIterator.remove();
132           pool.closeConnectionNoEx(c.pooledCon);
133         } else {
134           try {
135             if (!c.pooledCon.getConnection()
136                             .isValid(DbConstant.VALIDTESTDURATION)) {
137               conIterator.remove();
138               pool.closeConnectionNoEx(c.pooledCon);
139             }
140           } catch (final SQLException e) {
141             conIterator.remove();
142             pool.closeConnectionNoEx(c.pooledCon);
143           }
144         }
145       }
146       timer.schedule(this, delay);
147     }
148 
149   }
150 
151   /**
152    * Release all idle connections
153    */
154   public synchronized void freeIdleConnections() {
155     final Iterator<Con> conIterator = recycledConnections.iterator();
156     final long now = System.currentTimeMillis();
157     while (conIterator.hasNext()) {
158       final Con c = conIterator.next();
159       if (c.lastRecyle + TIME_OUT_FORCE_CLOSE < now) {
160         conIterator.remove();
161         closeConnectionNoEx(c.pooledCon);
162       }
163     }
164   }
165 
166   /**
167    * Thrown in when no free connection becomes available within
168    * {@code timeout} seconds.
169    */
170   private static class TimeoutException extends RuntimeException {
171     private static final long serialVersionUID = 1;
172 
173     public TimeoutException() {
174       super("Timeout while waiting for a free database connection.");
175     }
176   }
177 
178   /**
179    * Constructs a MiniConnectionPoolManager object with no timeout and no
180    * limit.
181    *
182    * @param dataSource the data source for the connections.
183    */
184   public DbConnectionPool(final ConnectionPoolDataSource dataSource) {
185     this(dataSource, 0, DbConstant.DELAYMAXCONNECTION);
186   }
187 
188   /**
189    * Constructs a MiniConnectionPoolManager object with no timeout and no
190    * limit.
191    *
192    * @param dataSource the data source for the connections.
193    * @param timer
194    * @param delay in ms period of time to check existing connections
195    *     and
196    *     limit to get a new connection
197    */
198   public DbConnectionPool(final ConnectionPoolDataSource dataSource,
199                           final Timer timer, final long delay) {
200     this(dataSource, 0, (int) (delay / 1000));
201     timer.schedule(new TimerTaskCheckConnections(timer, delay, this), delay);
202   }
203 
204   /**
205    * Constructs a MiniConnectionPoolManager object with a timeout of
206    * DbConstant.DELAYMAXCONNECTION seconds.
207    *
208    * @param dataSource the data source for the connections.
209    * @param maxConnections the maximum number of connections. 0 means
210    *     no
211    *     limit
212    */
213   public DbConnectionPool(final ConnectionPoolDataSource dataSource,
214                           final int maxConnections) {
215     this(dataSource, maxConnections, DbConstant.DELAYMAXCONNECTION);
216   }
217 
218   /**
219    * Constructs a ConnectionPool object.
220    *
221    * @param dataSource the data source for the connections.
222    * @param maxConnections the maximum number of connections. 0 means
223    *     no
224    *     limit
225    * @param timeout the maximum time in seconds to wait for a free
226    *     connection.
227    */
228   public DbConnectionPool(final ConnectionPoolDataSource dataSource,
229                           final int maxConnections, final int timeout) {
230     this.dataSource = dataSource;
231     this.maxConnections = maxConnections;
232     this.timeout = timeout;
233     if (maxConnections != 0) {
234       if (timeout <= 0) {
235         throw new IllegalArgumentException("Invalid timeout value.");
236       }
237       semaphore = new Semaphore(maxConnections, true);
238     }
239     recycledConnections = new ArrayDeque<Con>();
240     poolConnectionEventListener = new PoolConnectionEventListener(this);
241   }
242 
243   public synchronized void resetPoolDataSource(
244       final ConnectionPoolDataSource dataSource) {
245     this.dataSource = dataSource;
246   }
247 
248   /**
249    * @return the max number of connections
250    */
251   public final int getMaxConnections() {
252     return maxConnections;
253   }
254 
255   /**
256    * @return the Login Timeout in second
257    */
258   public final long getLoginTimeout() {
259     return timeout;
260   }
261 
262   /**
263    * @return the Force Close Timeout in ms
264    */
265   public final long getTimeoutForceClose() {
266     return TIME_OUT_FORCE_CLOSE;
267   }
268 
269   /**
270    * Closes all unused pooled connections.
271    *
272    * @throws SQLException //
273    */
274   public synchronized void dispose() throws SQLException {
275     if (isDisposed) {
276       return;
277     }
278     isDisposed = true;
279     SQLException e = null;
280     while (!recycledConnections.isEmpty()) {
281       final Con c = recycledConnections.remove();
282       final PooledConnection pconn = c.pooledCon;
283       try {
284         pconn.close();
285       } catch (final SQLException e2) {
286         if (e == null) {
287           e = e2;
288         }
289       }
290     }
291     if (e != null) {
292       throw e;
293     }
294   }
295 
296   /**
297    * Retrieves a connection from the connection pool. If
298    * {@code maxConnections} connections are already in
299    * use, the method waits until a connection becomes available or
300    * {@code timeout} seconds elapsed. When
301    * the application is finished using the connection, it must close it in
302    * order
303    * to return it to the pool.
304    *
305    * @return a new Connection object.
306    *
307    * @throws TimeoutException when no connection becomes available
308    *     within
309    *     {@code timeout} seconds.
310    * @throws SQLException //
311    */
312   public Connection getConnection() throws SQLException {
313     // This routine is unsynchronized, because semaphore.tryAcquire() may
314     // block.
315     synchronized (this) {
316       if (isDisposed) {
317         throw new IllegalStateException("Connection pool has been disposed.");
318       }
319     }
320     if (semaphore != null) {
321       try {
322         if (!semaphore.tryAcquire(timeout, TimeUnit.SECONDS)) {
323           throw new TimeoutException();
324         }
325       } catch (final InterruptedException e) {//NOSONAR
326         throw new RuntimeException(
327             "Interrupted while waiting for a database connection.", e);
328       }
329     }
330     boolean ok = false;
331     try {
332       final Connection conn = getConnection2();
333       ok = true;
334       return conn;
335     } finally {
336       if (semaphore != null && !ok) {
337         semaphore.release();
338       }
339     }
340   }
341 
342   private synchronized Connection getConnection2() throws SQLException {
343     if (isDisposed) {
344       throw new IllegalStateException(
345           "Connection pool has been disposed."); // test again with
346     }
347     // lock
348     final long time = System.currentTimeMillis() + timeout * 1000;
349     while (time <= System.currentTimeMillis()) {
350       final PooledConnection pconn;
351       if (!recycledConnections.isEmpty()) {
352         pconn = recycledConnections.remove().pooledCon;
353       } else {
354         pconn = dataSource.getPooledConnection();
355       }
356 
357       final Connection conn = pconn.getConnection();
358       if (conn.isValid(DbConstant.VALIDTESTDURATION)) {
359         activeConnections++;
360         pconn.addConnectionEventListener(poolConnectionEventListener);
361         assertInnerState();
362         return conn;
363       }
364     }
365 
366     throw new SQLException("Could not get a valid connection before timeout");
367   }
368 
369   private synchronized void recycleConnection(final PooledConnection pconn) {
370     if (isDisposed) {
371       disposeConnection(pconn);
372       return;
373     }
374     try {
375       if (!pconn.getConnection().isValid(DbConstant.VALIDTESTDURATION)) {
376         disposeConnection(pconn);
377         return;
378       }
379     } catch (final SQLException e) {
380       disposeConnection(pconn);
381       return;
382     }
383     if (activeConnections <= 0) {
384       throw new AssertionError();
385     }
386     activeConnections--;
387     if (semaphore != null) {
388       semaphore.release();
389     }
390     recycledConnections.add(new Con(pconn));
391     assertInnerState();
392   }
393 
394   private synchronized void disposeConnection(final PooledConnection pconn) {
395     if (activeConnections <= 0) {
396       throw new AssertionError();
397     }
398     activeConnections--;
399     if (semaphore != null) {
400       semaphore.release();
401     }
402     closeConnectionNoEx(pconn);
403     assertInnerState();
404   }
405 
406   private void closeConnectionNoEx(final PooledConnection pconn) {
407     try {
408       pconn.close();
409     } catch (final SQLException e) {
410       //
411     }
412   }
413 
414   private void assertInnerState() {
415     if (activeConnections < 0) {
416       throw new AssertionError();
417     }
418     if (semaphore != null) {
419       if (activeConnections + recycledConnections.size() > maxConnections) {
420         throw new AssertionError();
421       }
422       if (activeConnections + semaphore.availablePermits() > maxConnections) {
423         throw new AssertionError();
424       }
425     }
426   }
427 
428   private static class PoolConnectionEventListener
429       implements ConnectionEventListener {
430     private final DbConnectionPool pool;
431 
432     private PoolConnectionEventListener(final DbConnectionPool pool) {
433       this.pool = pool;
434     }
435 
436     @Override
437     public final void connectionClosed(final ConnectionEvent event) {
438       final PooledConnection pconn = (PooledConnection) event.getSource();
439       pconn.removeConnectionEventListener(this);
440       pool.recycleConnection(pconn);
441     }
442 
443     @Override
444     public final void connectionErrorOccurred(final ConnectionEvent event) {
445       final PooledConnection pconn = (PooledConnection) event.getSource();
446       pconn.removeConnectionEventListener(this);
447       pool.disposeConnection(pconn);
448     }
449   }
450 
451   /**
452    * Returns the number of active (open) connections of this pool. This is the
453    * number of {@code Connection}
454    * objects that have been issued by {@link #getConnection()} for which
455    * {@code Connection.close()} has not
456    * yet been called.
457    *
458    * @return the number of active connections.
459    */
460   public synchronized int getActiveConnections() {
461     return activeConnections;
462   }
463 }