3 * Copyright (C) 2010 Guillaume Cottenceau and MNC S.A.
5 * This file is part of sdbl4j, and is licensed under the Apache 2.0 license.
11 import java.util.ArrayList;
12 import java.util.Collections;
13 import java.util.ConcurrentModificationException;
14 import java.util.HashMap;
15 import java.util.HashSet;
16 import java.util.LinkedList;
17 import java.util.List;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.ConcurrentMap;
22 import java.sql.Connection;
23 import java.sql.SQLException;
25 import org.apache.log4j.Logger;
28 * There are subtle interleaved synchronizations happening here.
29 * Use a traffic load generator with low preOpenedConnections for all modifications in here!
31 public class DBConnectionPool {
33 private static Logger log = Logger.getLogger( DBConnectionPool.class );
35 private static class BusyConnection {
36 public Connection conn;
37 public String backtrace;
40 public BusyConnection( Connection conn ) {
42 backtrace = DBUtils.backtrace( 3 );
43 tsTotal = tsLatest = System.currentTimeMillis();
45 public void updateRequester() {
46 backtrace = DBUtils.backtrace( 2 );
47 tsLatest = System.currentTimeMillis();
49 public String toString() {
50 List<String> us = new ArrayList<String>();
51 boolean users_list_ok = false;
53 while ( ! users_list_ok ) {
55 Set<Thread> users = supplementary_thread_busy_connection_users.get( conn );
56 if ( users != null ) {
57 for ( Thread u : users ) {
58 us.add( u.getName() );
62 } catch ( ConcurrentModificationException cme ) {
63 // There are subtle interleaved synchronizations happening here.
64 // Use a traffic load generator with low preOpenedConnections for all modifications in here!
67 log.error( "3 attempts, abandon" );
73 return "{" + conn + ",oldnessTotal=" + ( System.currentTimeMillis() - tsTotal )
74 + ",oldnessLatest=" + ( System.currentTimeMillis() - tsLatest )
75 + ",supplementaryThreadUsers=" + DBUtils.join( ";", us ) + ",requester=\n" + backtrace + "}";
79 private static Map<String, ConnectionParameters> dbParameters = null;
80 private static Map<String, LinkedList<Connection>> available_connections;
81 private static Map<String, ConcurrentMap<Object, BusyConnection>> busy_connections;
82 private static Map<Connection, Set<Thread>> supplementary_thread_busy_connection_users;
85 * List of threads that are created in external libraries in that require a db connection.
86 * All connections from those threads MUST be manually released after use.
88 private static Set<String> supported_external_threads;
90 public static void init( Map<String, ConnectionParameters> parameters ) {
91 dbParameters = parameters;
92 available_connections = new HashMap<String, LinkedList<Connection>>();
93 busy_connections = new HashMap<String, ConcurrentMap<Object, BusyConnection>>();
94 for ( Map.Entry<String, ConnectionParameters> db : parameters.entrySet() ) {
95 log.info( "Initializing: " + db.getValue() );
96 if ( db.getValue().getPreOpenedConnections() > db.getValue().getMaxConnections() ) {
97 log.error( db.getKey() + ": pre opened connections > max connections, this is stupid!" );
99 if ( db.getValue().getAlertLevel() < db.getValue().getPreOpenedConnections() ) {
100 log.error( db.getKey() + ": alert level < pre opened connections, this is stupid!" );
102 LinkedList<Connection> connections = new LinkedList<Connection>();
103 available_connections.put( db.getKey(), connections );
104 for ( int i = 0; i < db.getValue().getPreOpenedConnections(); i++ ) {
105 connections.add( db.getValue().createConnection() );
107 busy_connections.put( db.getKey(), new ConcurrentHashMap<Object, BusyConnection>() );
108 if ( db.getValue().getMaxConnections() < 1 ) {
109 log.error( db.getKey() + ": max connections cannot be < 1" );
112 supplementary_thread_busy_connection_users = new HashMap<Connection, Set<Thread>>();
113 supported_external_threads = new HashSet<String>();
117 * Add an external thread name that requires a DB connection.<br/>
118 * All connections from this thread MUST be manually released after use.
121 public static void addSupportedExternalThread( String threadName ) {
122 if ( threadName.equals( "" ) ) {
125 supported_external_threads.add( threadName );
128 /** Check if the given thread name is a supported thread creating from an external library. */
129 public static boolean isSupportedExternalThread( String threadName ) {
130 for ( String name : supported_external_threads ) {
131 if ( threadName.startsWith( name ) ) {
138 public static Map<String, ConnectionParameters> getParameters() {
142 public static List<String> listBusyConnections() {
143 List<String> retval = new ArrayList<String>();
144 List<String> kinds = new ArrayList<String>( dbParameters.keySet() );
145 Collections.sort( kinds );
146 for ( String dbkind : kinds ) {
148 List<String> kind = new ArrayList<String>();
149 for ( Map.Entry<Object, BusyConnection> busy : busy_connections.get( dbkind ).entrySet() ) {
150 if ( busy.getKey() instanceof Thread ) {
151 // e.g. used by a request processing thread
152 kind.add( ( (Thread) busy.getKey() ).getName() + busy.getValue() );
154 // e.g. used by supplementary thread(s)
155 kind.add( ( (ThreadGroup) busy.getKey() ).getName() + busy.getValue() );
160 retval.add( dbkind + ": 0" );
162 retval.add( dbkind + ": " + count + " (" + DBUtils.join( ", ", kind ) + ")" );
169 * Get a database connection to use. Normally never called directly, only by auto-generated database access
172 * - in case the calling Thread is a request processing Thread:
174 * This connection will be associated to the calling Thread and reserved for it (one connection dedicated to
175 * one processing thread model).
177 * - in case the calling Thread is a supplementary Thread(1)
179 * This connection can be reserved or shared with other supplementary Threads depending on configuration
180 * (<code>connectionGroup</code> for asynchronous events, dedicated instance properties for other threads).
182 * (1) a supplementary Thread, e.g. not a request processing Thread, is thus:
183 * - any Thread invoking asynchronous events (such as Spooler, ElectionsStartStop, etc)
184 * - the Spooler sub threads used for MT retries
185 * - any {@link ThreadedElementsConsumer} uses, e.g. DB loggers, DLR processors, Notification Sender
186 * - the MessageSender threads
188 public static Connection getConnection( String dbkind ) {
189 // for request processing threads, we have one connection per thread; for supplementary threads, we
190 // have one connection per thread group; to optimize request processing, we first try to look if we
191 // have a busy thread associated with this thread, even if it's possibly not a request processing thread
192 ConcurrentMap<Object, BusyConnection> connections = busy_connections.get( dbkind );
193 if ( connections == null ) {
194 log.error( "DB kind '" + dbkind + "' doesn't exist" );
197 BusyConnection conn = connections.get( Thread.currentThread() ); // ConcurrentMap
198 if ( conn != null ) {
199 if ( log.isTraceEnabled() ) {
200 log.trace( dbkind + ": use busy connection - " + conn );
202 conn.updateRequester();
206 // supplementary threads are recognized by using a specific parent's thread group
207 if ( Thread.currentThread().getThreadGroup().getParent() == SupplementaryThreadHelper.parentThreadGroup ) {
208 return getConnectionForSupplementaryThread( dbkind );
212 if ( ! Thread.currentThread().getName().startsWith( "http" )
213 && ! Thread.currentThread().getName().startsWith( "pool" )
214 && ! Thread.currentThread().getName().equals( "main" )
215 && ! Thread.currentThread().getName().equals( "startStop" ) ) {
216 if ( isSupportedExternalThread( Thread.currentThread().getName() ) ) {
217 log.debug( "Thread identified as being externally created (external library): the DB " +
218 "connections allocated to non-request processing threads MUST be " +
219 "manually released after use" );
221 log.error( "Thread identified as request processing thread (parent of thread group is not "
222 + "SupplementaryThreadHelper.parentThreadGroup), but name doesn't start with http or pool or main or startStop!"
223 + " (thread name: " + Thread.currentThread().getName() + ")" );
226 return findOrCreateAvailableConnection( dbkind, Thread.currentThread() );
231 public static void supplementaryThreadReleaseConnections() {
232 if ( dbParameters == null ) {
233 log.trace( "Do nothing (hook for combining old style and new style DB accesses in acme)" );
236 log.trace( "Enter for group " + Thread.currentThread().getThreadGroup().getName() + ", thread "
237 + Thread.currentThread().getName() );
238 for ( String dbkind : dbParameters.keySet() ) {
239 // There are subtle interleaved synchronizations happening here.
240 // Use a traffic load generator with low preOpenedConnections for all modifications in here!
241 synchronized( Thread.currentThread().getThreadGroup() ) {
242 BusyConnection conn = busy_connections.get( dbkind ).get( Thread.currentThread().getThreadGroup() );
243 if ( conn != null ) {
244 // at least one of the threads in this thread group use this connection, either that thread,
245 // others, or that thread and others; first, check if that thread is really using it:
246 // There are subtle interleaved synchronizations happening here.
247 // Use a traffic load generator with low preOpenedConnections for all modifications in here!
248 synchronized( supplementary_thread_busy_connection_users ) {
249 Set<Thread> users = supplementary_thread_busy_connection_users.get( conn.conn );
250 if ( log.isTraceEnabled() ) {
251 List<String> us = new ArrayList<String>();
252 for ( Thread u : users ) {
253 us.add( u.getName() );
256 if ( users.remove( Thread.currentThread() ) ) {
257 // this thread was using this connection. now let's see if no other threads are.
258 if ( users.size() == 0 ) {
259 if ( log.isTraceEnabled() ) {
260 log.trace( "Group " + Thread.currentThread().getThreadGroup().getName()
261 + ", thread " + Thread.currentThread().getName()
262 + ", release connection " + conn + " on no more users" );
264 releaseConnection( dbkind, Thread.currentThread().getThreadGroup() );
265 supplementary_thread_busy_connection_users.remove( conn.conn );
274 private static void releaseConnection( String dbkind, Object key ) {
275 LinkedList<Connection> available = available_connections.get( dbkind );
276 Map<Object, BusyConnection> busy = busy_connections.get( dbkind );
277 ConnectionParameters dbParams = dbParameters.get( dbkind );
279 BusyConnection conn = busy.get( key ); // ConcurrentMap
280 if ( conn == null ) {
284 if ( available == null ) {
285 log.error( "No available connections for " + key + " - already closed?" );
289 // There are subtle interleaved synchronizations happening here.
290 // Use a traffic load generator with low preOpenedConnections for all modifications in here!
291 synchronized( available ) {
292 BusyConnection conn2 = busy.remove( key );
294 if ( conn2 == null || ! conn.conn.equals( conn2.conn ) ) {
295 log.error( "Internal error, removed connection " + conn2 + " not equals to getted connection "
300 if ( busy.size() + available.size() >= dbParams.getPreOpenedConnections() ) {
301 if ( log.isTraceEnabled() ) {
302 log.trace( "closing connection for " + dbkind + " - " + conn + " - "
303 + " busy:" + busy.size() + " available:" + available.size() + " busies:\n"
304 + DBUtils.join( "\n", listBusyConnections() ) );
305 } else if ( log.isDebugEnabled() ) {
306 log.debug( "closing connection for " + dbkind + " - " + conn + " - "
307 + " busy:" + busy.size() + " available:" + available.size() );
311 } catch ( SQLException se ) {
312 log.warn( "error closing connection for dbkind=" + dbkind + ": " + se );
316 available.addFirst( conn.conn );
323 /** Close all of the database connections. */
324 public static void closeConnections() {
325 log.info( "START - closing connections" );
326 if ( available_connections.size() == 0 ) {
327 log.error( "No available connections - already closed?" );
330 for ( String dbkind : dbParameters.keySet() ) {
331 // There are subtle interleaved synchronizations happening here.
332 // Use a traffic load generator with low preOpenedConnections for all modifications in here!
333 synchronized( available_connections.get( dbkind ) ) {
334 for ( Connection conn : available_connections.get( dbkind ) ) {
337 } catch ( SQLException se ) {
338 log.warn( "error closing connection for dbkind=" + dbkind + ": " + se );
341 for ( Map.Entry<Object, BusyConnection> busy : busy_connections.get( dbkind ).entrySet() ) {
342 log.warn( "Still busy connection (leak?): "
343 + ( ( busy.getKey() instanceof Thread ) ? ( (Thread) busy.getKey() ).getName()
344 : ( (ThreadGroup) busy.getKey() ).getName() )
345 + " - " + busy.getValue() );
349 available_connections.clear();
350 log.info( "END - closing connections" );
353 private static Connection getConnectionForSupplementaryThread( String dbkind ) {
355 ThreadGroup group = Thread.currentThread().getThreadGroup();
357 // synchronize on group, because more than one thread may be requesting a connection at the same time,
358 // and we want no race condition to potentially populate busy_connections in that situation
359 // There are subtle interleaved synchronizations happening here.
360 // Use a traffic load generator with low preOpenedConnections for all modifications in here!
361 synchronized( group ) {
362 // re-run the check for existing busy connections, on the proper key now
363 BusyConnection conn = busy_connections.get( dbkind ).get( group );
364 if ( conn != null ) {
365 if ( log.isTraceEnabled() ) {
366 log.trace( dbkind + ": use busy connection for " + group.getName() + " - " + conn );
368 // remember this thread is also using this connection
369 // There are subtle interleaved synchronizations happening here.
370 // Use a traffic load generator with low preOpenedConnections for all modifications in here!
371 synchronized( supplementary_thread_busy_connection_users ) {
372 supplementary_thread_busy_connection_users.get( conn.conn ).add( Thread.currentThread() );
374 conn.updateRequester();
378 Connection c = findOrCreateAvailableConnection( dbkind, group );
379 // that thread is the first user of this connection
380 Set<Thread> users = new HashSet<Thread>();
381 users.add( Thread.currentThread() );
382 // There are subtle interleaved synchronizations happening here.
383 // Use a traffic load generator with low preOpenedConnections for all modifications in here!
384 synchronized( supplementary_thread_busy_connection_users ) {
385 Set<Thread> previous_users = supplementary_thread_busy_connection_users.put( c, users );
387 if ( previous_users != null ) {
388 List<String> us = new ArrayList<String>();
389 for ( Thread u : previous_users ) {
390 us.add( u.getName() );
392 log.error( "Internal error (previous users should be empty): previous users: "
401 * Find a currently available connection, or create a new one if none is available and we have
402 * not yet reached the maximum.
404 private static Connection findOrCreateAvailableConnection( String dbkind, Object busyKey ) {
406 // find an available connection
407 LinkedList<Connection> available = available_connections.get( dbkind );
408 Map<Object, BusyConnection> busy = busy_connections.get( dbkind );
409 ConnectionParameters dbParams = dbParameters.get( dbkind );
410 // There are subtle interleaved synchronizations happening here.
411 // Use a traffic load generator with low preOpenedConnections for all modifications in here!
412 synchronized( available ) {
413 Connection conn = available.poll();
414 if ( conn == null ) {
415 if ( busy.size() < dbParams.getMaxConnections() ) {
416 conn = dbParams.createConnection();
417 if ( conn == null ) {
418 throw new RuntimeException( "Connection creation not available" );
420 log.debug( "created connection " + conn + ", since busy size is " + busy.size() );
424 long time_before = System.currentTimeMillis();
425 conn.createStatement().executeQuery( "SELECT 1" );
426 if ( log.isDebugEnabled() ) {
427 log.debug( ( System.currentTimeMillis() - time_before ) + " ms for: SELECT 1 [connection validation]" );
429 } catch ( SQLException se ) {
430 log.error( "Validation error for idle connection for " + dbkind + ": " + se
431 + ", will replace connection with a newly created one" );
434 } catch ( SQLException se2 ) {}
435 conn = dbParams.createConnection();
436 if ( conn == null ) {
437 throw new RuntimeException( "Connection creation not available" );
440 if ( log.isTraceEnabled() ) {
441 if ( busyKey instanceof ThreadGroup ) {
442 log.trace( dbkind + ": use available connection for "
443 + ( (ThreadGroup) busyKey ).getName() + " - " + conn );
445 log.trace( dbkind + ": use available connection - " + conn );
449 if ( conn != null ) {
450 busy.put( busyKey, new BusyConnection( conn ) );
451 if ( busy.size() + available.size() >= dbParams.getAlertLevel() ) {
452 log.warn( "Many connections for dbkind=" + dbkind + ": busy=" + busy.size()
453 + " max=" + dbParams.getMaxConnections() + ": "
454 + DBUtils.join( ", ", listBusyConnections() ) );
459 // TODO: switch to log.warn when we are reasonably confident with
460 log.error( "No available connections for dbkind=" + dbkind + ": busy=" + busy.size()
461 + " max=" + dbParams.getMaxConnections() + " busies=["
462 + DBUtils.join( ", ", listBusyConnections() ) + "], waiting...\n"
463 + DBUtils.backtrace() );
466 } catch ( InterruptedException ex ) {}
467 log.error( "Wait over" );
468 return findOrCreateAvailableConnection( dbkind, busyKey );
473 public static void releaseConnectionsForProcessingThread() {
474 for ( String dbkind : dbParameters.keySet() ) {
475 releaseConnection( dbkind, Thread.currentThread() );