1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.waarp.common.database;
21
22 import javax.sql.ConnectionEvent;
23 import javax.sql.ConnectionEventListener;
24 import javax.sql.ConnectionPoolDataSource;
25 import javax.sql.PooledConnection;
26 import java.sql.Connection;
27 import java.sql.SQLException;
28 import java.util.ArrayDeque;
29 import java.util.Iterator;
30 import java.util.Queue;
31 import java.util.Timer;
32 import java.util.TimerTask;
33 import java.util.concurrent.Semaphore;
34 import java.util.concurrent.TimeUnit;
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51 public class DbConnectionPool {
52 private ConnectionPoolDataSource dataSource;
53
54 private final int maxConnections;
55
56 private final int timeout;
57
58 private static final long TIME_OUT_FORCE_CLOSE = 300000;
59
60 private Semaphore semaphore;
61
62 private final Queue<Con> recycledConnections;
63
64 private int activeConnections;
65
66 private final PoolConnectionEventListener poolConnectionEventListener;
67
68 private boolean isDisposed;
69
70 private static class Con {
71 final PooledConnection pooledCon;
72
73 final long lastRecyle;
74
75 private Con(final PooledConnection pooledCon) {
76 this.pooledCon = pooledCon;
77 lastRecyle = System.currentTimeMillis();
78 }
79
80 @Override
81 public final boolean equals(final Object o) {
82 if (this == o) {
83 return true;
84 }
85 if (!(o instanceof Con)) {
86 return false;
87 }
88
89 final Con con = (Con) o;
90
91 return pooledCon.equals(con.pooledCon);
92 }
93
94 @Override
95 public final int hashCode() {
96 return pooledCon.hashCode();
97 }
98 }
99
100
101
102
103 private static class TimerTaskCheckConnections extends TimerTask {
104 DbConnectionPool pool;
105 Timer timer;
106 long delay;
107
108
109
110
111
112
113 private TimerTaskCheckConnections(final Timer timer, final long delay,
114 final DbConnectionPool pool) {
115 if (pool == null || timer == null || delay < 1000) {
116 throw new IllegalArgumentException(
117 "Invalid values. Need pool, timer and delay >= 1000");
118 }
119 this.pool = pool;
120 this.timer = timer;
121 this.delay = delay;
122 }
123
124 @Override
125 public void run() {
126 final Iterator<Con> conIterator = pool.recycledConnections.iterator();
127 final long now = System.currentTimeMillis();
128 while (conIterator.hasNext()) {
129 final Con c = conIterator.next();
130 if (c.lastRecyle + TIME_OUT_FORCE_CLOSE < now) {
131 conIterator.remove();
132 pool.closeConnectionNoEx(c.pooledCon);
133 } else {
134 try {
135 if (!c.pooledCon.getConnection()
136 .isValid(DbConstant.VALIDTESTDURATION)) {
137 conIterator.remove();
138 pool.closeConnectionNoEx(c.pooledCon);
139 }
140 } catch (final SQLException e) {
141 conIterator.remove();
142 pool.closeConnectionNoEx(c.pooledCon);
143 }
144 }
145 }
146 timer.schedule(this, delay);
147 }
148
149 }
150
151
152
153
154 public synchronized void freeIdleConnections() {
155 final Iterator<Con> conIterator = recycledConnections.iterator();
156 final long now = System.currentTimeMillis();
157 while (conIterator.hasNext()) {
158 final Con c = conIterator.next();
159 if (c.lastRecyle + TIME_OUT_FORCE_CLOSE < now) {
160 conIterator.remove();
161 closeConnectionNoEx(c.pooledCon);
162 }
163 }
164 }
165
166
167
168
169
170 private static class TimeoutException extends RuntimeException {
171 private static final long serialVersionUID = 1;
172
173 public TimeoutException() {
174 super("Timeout while waiting for a free database connection.");
175 }
176 }
177
178
179
180
181
182
183
184 public DbConnectionPool(final ConnectionPoolDataSource dataSource) {
185 this(dataSource, 0, DbConstant.DELAYMAXCONNECTION);
186 }
187
188
189
190
191
192
193
194
195
196
197
198 public DbConnectionPool(final ConnectionPoolDataSource dataSource,
199 final Timer timer, final long delay) {
200 this(dataSource, 0, (int) (delay / 1000));
201 timer.schedule(new TimerTaskCheckConnections(timer, delay, this), delay);
202 }
203
204
205
206
207
208
209
210
211
212
213 public DbConnectionPool(final ConnectionPoolDataSource dataSource,
214 final int maxConnections) {
215 this(dataSource, maxConnections, DbConstant.DELAYMAXCONNECTION);
216 }
217
218
219
220
221
222
223
224
225
226
227
228 public DbConnectionPool(final ConnectionPoolDataSource dataSource,
229 final int maxConnections, final int timeout) {
230 this.dataSource = dataSource;
231 this.maxConnections = maxConnections;
232 this.timeout = timeout;
233 if (maxConnections != 0) {
234 if (timeout <= 0) {
235 throw new IllegalArgumentException("Invalid timeout value.");
236 }
237 semaphore = new Semaphore(maxConnections, true);
238 }
239 recycledConnections = new ArrayDeque<Con>();
240 poolConnectionEventListener = new PoolConnectionEventListener(this);
241 }
242
243 public synchronized void resetPoolDataSource(
244 final ConnectionPoolDataSource dataSource) {
245 this.dataSource = dataSource;
246 }
247
248
249
250
251 public final int getMaxConnections() {
252 return maxConnections;
253 }
254
255
256
257
258 public final long getLoginTimeout() {
259 return timeout;
260 }
261
262
263
264
265 public final long getTimeoutForceClose() {
266 return TIME_OUT_FORCE_CLOSE;
267 }
268
269
270
271
272
273
274 public synchronized void dispose() throws SQLException {
275 if (isDisposed) {
276 return;
277 }
278 isDisposed = true;
279 SQLException e = null;
280 while (!recycledConnections.isEmpty()) {
281 final Con c = recycledConnections.remove();
282 final PooledConnection pconn = c.pooledCon;
283 try {
284 pconn.close();
285 } catch (final SQLException e2) {
286 if (e == null) {
287 e = e2;
288 }
289 }
290 }
291 if (e != null) {
292 throw e;
293 }
294 }
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312 public Connection getConnection() throws SQLException {
313
314
315 synchronized (this) {
316 if (isDisposed) {
317 throw new IllegalStateException("Connection pool has been disposed.");
318 }
319 }
320 if (semaphore != null) {
321 try {
322 if (!semaphore.tryAcquire(timeout, TimeUnit.SECONDS)) {
323 throw new TimeoutException();
324 }
325 } catch (final InterruptedException e) {
326 throw new RuntimeException(
327 "Interrupted while waiting for a database connection.", e);
328 }
329 }
330 boolean ok = false;
331 try {
332 final Connection conn = getConnection2();
333 ok = true;
334 return conn;
335 } finally {
336 if (semaphore != null && !ok) {
337 semaphore.release();
338 }
339 }
340 }
341
342 private synchronized Connection getConnection2() throws SQLException {
343 if (isDisposed) {
344 throw new IllegalStateException(
345 "Connection pool has been disposed.");
346 }
347
348 final long time = System.currentTimeMillis() + timeout * 1000;
349 while (time <= System.currentTimeMillis()) {
350 final PooledConnection pconn;
351 if (!recycledConnections.isEmpty()) {
352 pconn = recycledConnections.remove().pooledCon;
353 } else {
354 pconn = dataSource.getPooledConnection();
355 }
356
357 final Connection conn = pconn.getConnection();
358 if (conn.isValid(DbConstant.VALIDTESTDURATION)) {
359 activeConnections++;
360 pconn.addConnectionEventListener(poolConnectionEventListener);
361 assertInnerState();
362 return conn;
363 }
364 }
365
366 throw new SQLException("Could not get a valid connection before timeout");
367 }
368
369 private synchronized void recycleConnection(final PooledConnection pconn) {
370 if (isDisposed) {
371 disposeConnection(pconn);
372 return;
373 }
374 try {
375 if (!pconn.getConnection().isValid(DbConstant.VALIDTESTDURATION)) {
376 disposeConnection(pconn);
377 return;
378 }
379 } catch (final SQLException e) {
380 disposeConnection(pconn);
381 return;
382 }
383 if (activeConnections <= 0) {
384 throw new AssertionError();
385 }
386 activeConnections--;
387 if (semaphore != null) {
388 semaphore.release();
389 }
390 recycledConnections.add(new Con(pconn));
391 assertInnerState();
392 }
393
394 private synchronized void disposeConnection(final PooledConnection pconn) {
395 if (activeConnections <= 0) {
396 throw new AssertionError();
397 }
398 activeConnections--;
399 if (semaphore != null) {
400 semaphore.release();
401 }
402 closeConnectionNoEx(pconn);
403 assertInnerState();
404 }
405
406 private void closeConnectionNoEx(final PooledConnection pconn) {
407 try {
408 pconn.close();
409 } catch (final SQLException e) {
410
411 }
412 }
413
414 private void assertInnerState() {
415 if (activeConnections < 0) {
416 throw new AssertionError();
417 }
418 if (semaphore != null) {
419 if (activeConnections + recycledConnections.size() > maxConnections) {
420 throw new AssertionError();
421 }
422 if (activeConnections + semaphore.availablePermits() > maxConnections) {
423 throw new AssertionError();
424 }
425 }
426 }
427
428 private static class PoolConnectionEventListener
429 implements ConnectionEventListener {
430 private final DbConnectionPool pool;
431
432 private PoolConnectionEventListener(final DbConnectionPool pool) {
433 this.pool = pool;
434 }
435
436 @Override
437 public final void connectionClosed(final ConnectionEvent event) {
438 final PooledConnection pconn = (PooledConnection) event.getSource();
439 pconn.removeConnectionEventListener(this);
440 pool.recycleConnection(pconn);
441 }
442
443 @Override
444 public final void connectionErrorOccurred(final ConnectionEvent event) {
445 final PooledConnection pconn = (PooledConnection) event.getSource();
446 pconn.removeConnectionEventListener(this);
447 pool.disposeConnection(pconn);
448 }
449 }
450
451
452
453
454
455
456
457
458
459
460 public synchronized int getActiveConnections() {
461 return activeConnections;
462 }
463 }