3 * Copyright (C) 2010 Guillaume Cottenceau and MNC S.A.
5 * This file is part of sdbl4j, and is licensed under the Apache 2.0 license.
11 import java.util.ArrayList;
12 import java.util.Collections;
13 import java.util.ConcurrentModificationException;
14 import java.util.HashMap;
15 import java.util.HashSet;
16 import java.util.LinkedList;
17 import java.util.List;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.ConcurrentMap;
22 import java.sql.Connection;
23 import java.sql.SQLException;
25 import org.apache.log4j.Logger;
28 * There are subtle interleaved synchronizations happening here.
29 * Use a traffic load generator with low preOpenedConnections for all modifications in here!
31 public class DBConnectionPool {
33 private static Logger log = Logger.getLogger( DBConnectionPool.class );
35 private static class BusyConnection {
36 public Connection conn;
37 public String backtrace;
40 public BusyConnection( Connection conn ) {
42 backtrace = DBUtils.backtrace( 3 );
43 tsTotal = tsLatest = System.currentTimeMillis();
45 public void updateRequester() {
46 backtrace = DBUtils.backtrace( 2 );
47 tsLatest = System.currentTimeMillis();
49 public String toString() {
50 List<String> us = new ArrayList<String>();
51 boolean users_list_ok = false;
53 while ( ! users_list_ok ) {
55 Set<Thread> users = supplementary_thread_busy_connection_users.get( conn );
56 if ( users != null ) {
57 for ( Thread u : users ) {
58 us.add( u.getName() );
62 } catch ( ConcurrentModificationException cme ) {
63 // There are subtle interleaved synchronizations happening here.
64 // Use a traffic load generator with low preOpenedConnections for all modifications in here!
67 log.error( "3 attempts, abandon" );
73 return "{" + conn + ",oldnessTotal=" + ( System.currentTimeMillis() - tsTotal )
74 + ",oldnessLatest=" + ( System.currentTimeMillis() - tsLatest )
75 + ",supplementaryThreadUsers=" + DBUtils.join( ";", us ) + ",requester=\n" + backtrace + "}";
79 private static Map<String, ConnectionParameters> dbParameters = null;
80 private static Map<String, LinkedList<Connection>> available_connections;
81 private static Map<String, ConcurrentMap<Object, BusyConnection>> busy_connections;
82 private static Map<Connection, Set<Thread>> supplementary_thread_busy_connection_users;
85 * List of threads that are created in external libraries in that require a db connection.
86 * All connections from those threads MUST be manually released after use.
88 private static Set<String> supported_external_threads;
90 public static void init( Map<String, ConnectionParameters> parameters ) {
91 dbParameters = parameters;
92 available_connections = new HashMap<String, LinkedList<Connection>>();
93 busy_connections = new HashMap<String, ConcurrentMap<Object, BusyConnection>>();
94 for ( Map.Entry<String, ConnectionParameters> db : parameters.entrySet() ) {
95 log.info( "Initializing: " + db.getValue() );
96 if ( db.getValue().getPreOpenedConnections() > db.getValue().getMaxConnections() ) {
97 log.error( db.getKey() + ": pre opened connections > max connections, this is stupid!" );
99 if ( db.getValue().getAlertLevel() < db.getValue().getPreOpenedConnections() ) {
100 log.error( db.getKey() + ": alert level < pre opened connections, this is stupid!" );
102 LinkedList<Connection> connections = new LinkedList<Connection>();
103 available_connections.put( db.getKey(), connections );
104 for ( int i = 0; i < db.getValue().getPreOpenedConnections(); i++ ) {
105 connections.add( db.getValue().createConnection() );
107 busy_connections.put( db.getKey(), new ConcurrentHashMap<Object, BusyConnection>() );
108 if ( db.getValue().getMaxConnections() < 1 ) {
109 log.error( db.getKey() + ": max connections cannot be < 1" );
112 supplementary_thread_busy_connection_users = new HashMap<Connection, Set<Thread>>();
113 supported_external_threads = new HashSet<String>();
117 * Add an external thread name that requires a DB connection.<br/>
118 * All connections from this thread MUST be manually released after use.
121 public static void addSupportedExternalThread( String threadName ) {
122 if ( threadName.equals( "" ) ) {
125 supported_external_threads.add( threadName );
128 /** Check if the given thread name is a supported thread creating from an external library. */
129 public static boolean isSupportedExternalThread( String threadName ) {
130 for ( String name : supported_external_threads ) {
131 if ( threadName.startsWith( name ) ) {
138 public static Map<String, ConnectionParameters> getParameters() {
142 public static List<String> listBusyConnections() {
143 List<String> retval = new ArrayList<String>();
144 List<String> kinds = new ArrayList<String>( dbParameters.keySet() );
145 Collections.sort( kinds );
146 for ( String dbkind : kinds ) {
148 List<String> kind = new ArrayList<String>();
149 for ( Map.Entry<Object, BusyConnection> busy : busy_connections.get( dbkind ).entrySet() ) {
150 if ( busy.getKey() instanceof Thread ) {
151 // e.g. used by a request processing thread
152 kind.add( ( (Thread) busy.getKey() ).getName() + busy.getValue() );
154 // e.g. used by supplementary thread(s)
155 kind.add( ( (ThreadGroup) busy.getKey() ).getName() + busy.getValue() );
160 retval.add( dbkind + ": 0" );
162 retval.add( dbkind + ": " + count + " (" + DBUtils.join( ", ", kind ) + ")" );
169 * Get a database connection to use. Normally never called directly, only by auto-generated database access
172 * - in case the calling Thread is a request processing Thread:
174 * This connection will be associated to the calling Thread and reserved for it (one connection dedicated to
175 * one processing thread model).
177 * - in case the calling Thread is a supplementary Thread(1)
179 * This connection can be reserved or shared with other supplementary Threads depending on configuration
180 * (<code>connectionGroup</code> for asynchronous events, dedicated instance properties for other threads).
182 * (1) a supplementary Thread, e.g. not a request processing Thread
184 public static Connection getConnection( String dbkind ) {
185 // for request processing threads, we have one connection per thread; for supplementary threads, we
186 // have one connection per thread group; to optimize request processing, we first try to look if we
187 // have a busy thread associated with this thread, even if it's possibly not a request processing thread
188 ConcurrentMap<Object, BusyConnection> connections = busy_connections.get( dbkind );
189 if ( connections == null ) {
190 log.error( "DB kind '" + dbkind + "' doesn't exist" );
193 BusyConnection conn = connections.get( Thread.currentThread() ); // ConcurrentMap
194 if ( conn != null ) {
195 if ( log.isTraceEnabled() ) {
196 log.trace( dbkind + ": use busy connection - " + conn );
198 conn.updateRequester();
202 // supplementary threads are recognized by using a specific parent's thread group
203 if ( Thread.currentThread().getThreadGroup().getParent() == SupplementaryThreadHelper.parentThreadGroup ) {
204 return getConnectionForSupplementaryThread( dbkind );
208 if ( ! Thread.currentThread().getName().startsWith( "http" )
209 && ! Thread.currentThread().getName().startsWith( "pool" )
210 && ! Thread.currentThread().getName().equals( "main" )
211 && ! Thread.currentThread().getName().equals( "startStop" ) ) {
212 if ( isSupportedExternalThread( Thread.currentThread().getName() ) ) {
213 log.debug( "Thread identified as being externally created (external library): the DB " +
214 "connections allocated to non-request processing threads MUST be " +
215 "manually released after use" );
217 log.error( "Thread identified as request processing thread (parent of thread group is not "
218 + "SupplementaryThreadHelper.parentThreadGroup), but name doesn't start with http or pool or main or startStop!"
219 + " (thread name: " + Thread.currentThread().getName() + ")" );
222 return findOrCreateAvailableConnection( dbkind, Thread.currentThread() );
227 public static void supplementaryThreadReleaseConnections() {
228 if ( dbParameters == null ) {
229 log.trace( "Do nothing (hook for combining old style and new style DB accesses in acme)" );
232 log.trace( "Enter for group " + Thread.currentThread().getThreadGroup().getName() + ", thread "
233 + Thread.currentThread().getName() );
234 for ( String dbkind : dbParameters.keySet() ) {
235 // There are subtle interleaved synchronizations happening here.
236 // Use a traffic load generator with low preOpenedConnections for all modifications in here!
237 synchronized( Thread.currentThread().getThreadGroup() ) {
238 BusyConnection conn = busy_connections.get( dbkind ).get( Thread.currentThread().getThreadGroup() );
239 if ( conn != null ) {
240 // at least one of the threads in this thread group use this connection, either that thread,
241 // others, or that thread and others; first, check if that thread is really using it:
242 // There are subtle interleaved synchronizations happening here.
243 // Use a traffic load generator with low preOpenedConnections for all modifications in here!
244 synchronized( supplementary_thread_busy_connection_users ) {
245 Set<Thread> users = supplementary_thread_busy_connection_users.get( conn.conn );
246 if ( log.isTraceEnabled() ) {
247 List<String> us = new ArrayList<String>();
248 for ( Thread u : users ) {
249 us.add( u.getName() );
252 if ( users.remove( Thread.currentThread() ) ) {
253 // this thread was using this connection. now let's see if no other threads are.
254 if ( users.size() == 0 ) {
255 if ( log.isTraceEnabled() ) {
256 log.trace( "Group " + Thread.currentThread().getThreadGroup().getName()
257 + ", thread " + Thread.currentThread().getName()
258 + ", release connection " + conn + " on no more users" );
260 releaseConnection( dbkind, Thread.currentThread().getThreadGroup() );
261 supplementary_thread_busy_connection_users.remove( conn.conn );
270 private static void releaseConnection( String dbkind, Object key ) {
271 LinkedList<Connection> available = available_connections.get( dbkind );
272 Map<Object, BusyConnection> busy = busy_connections.get( dbkind );
273 ConnectionParameters dbParams = dbParameters.get( dbkind );
275 BusyConnection conn = busy.get( key ); // ConcurrentMap
276 if ( conn == null ) {
280 if ( available == null ) {
281 log.error( "No available connections for " + key + " - already closed?" );
285 // There are subtle interleaved synchronizations happening here.
286 // Use a traffic load generator with low preOpenedConnections for all modifications in here!
287 synchronized( available ) {
288 BusyConnection conn2 = busy.remove( key );
290 if ( conn2 == null || ! conn.conn.equals( conn2.conn ) ) {
291 log.error( "Internal error, removed connection " + conn2 + " not equals to getted connection "
296 if ( busy.size() + available.size() >= dbParams.getPreOpenedConnections() ) {
297 if ( log.isTraceEnabled() ) {
298 log.trace( "closing connection for " + dbkind + " - " + conn + " - "
299 + " busy:" + busy.size() + " available:" + available.size() + " busies:\n"
300 + DBUtils.join( "\n", listBusyConnections() ) );
301 } else if ( log.isDebugEnabled() ) {
302 log.debug( "closing connection for " + dbkind + " - " + conn + " - "
303 + " busy:" + busy.size() + " available:" + available.size() );
307 } catch ( SQLException se ) {
308 log.warn( "error closing connection for dbkind=" + dbkind + ": " + se );
312 available.addFirst( conn.conn );
319 /** Close all of the database connections. */
320 public static void closeConnections() {
321 log.info( "START - closing connections" );
322 if ( available_connections.size() == 0 ) {
323 log.error( "No available connections - already closed?" );
326 for ( String dbkind : dbParameters.keySet() ) {
327 // There are subtle interleaved synchronizations happening here.
328 // Use a traffic load generator with low preOpenedConnections for all modifications in here!
329 synchronized( available_connections.get( dbkind ) ) {
330 for ( Connection conn : available_connections.get( dbkind ) ) {
333 } catch ( SQLException se ) {
334 log.warn( "error closing connection for dbkind=" + dbkind + ": " + se );
337 for ( Map.Entry<Object, BusyConnection> busy : busy_connections.get( dbkind ).entrySet() ) {
338 log.warn( "Still busy connection (leak?): "
339 + ( ( busy.getKey() instanceof Thread ) ? ( (Thread) busy.getKey() ).getName()
340 : ( (ThreadGroup) busy.getKey() ).getName() )
341 + " - " + busy.getValue() );
345 available_connections.clear();
346 log.info( "END - closing connections" );
349 private static Connection getConnectionForSupplementaryThread( String dbkind ) {
351 ThreadGroup group = Thread.currentThread().getThreadGroup();
353 // synchronize on group, because more than one thread may be requesting a connection at the same time,
354 // and we want no race condition to potentially populate busy_connections in that situation
355 // There are subtle interleaved synchronizations happening here.
356 // Use a traffic load generator with low preOpenedConnections for all modifications in here!
357 synchronized( group ) {
358 // re-run the check for existing busy connections, on the proper key now
359 BusyConnection conn = busy_connections.get( dbkind ).get( group );
360 if ( conn != null ) {
361 if ( log.isTraceEnabled() ) {
362 log.trace( dbkind + ": use busy connection for " + group.getName() + " - " + conn );
364 // remember this thread is also using this connection
365 // There are subtle interleaved synchronizations happening here.
366 // Use a traffic load generator with low preOpenedConnections for all modifications in here!
367 synchronized( supplementary_thread_busy_connection_users ) {
368 supplementary_thread_busy_connection_users.get( conn.conn ).add( Thread.currentThread() );
370 conn.updateRequester();
374 Connection c = findOrCreateAvailableConnection( dbkind, group );
375 // that thread is the first user of this connection
376 Set<Thread> users = new HashSet<Thread>();
377 users.add( Thread.currentThread() );
378 // There are subtle interleaved synchronizations happening here.
379 // Use a traffic load generator with low preOpenedConnections for all modifications in here!
380 synchronized( supplementary_thread_busy_connection_users ) {
381 Set<Thread> previous_users = supplementary_thread_busy_connection_users.put( c, users );
383 if ( previous_users != null ) {
384 List<String> us = new ArrayList<String>();
385 for ( Thread u : previous_users ) {
386 us.add( u.getName() );
388 log.error( "Internal error (previous users should be empty): previous users: "
397 * Find a currently available connection, or create a new one if none is available and we have
398 * not yet reached the maximum.
400 private static Connection findOrCreateAvailableConnection( String dbkind, Object busyKey ) {
402 // find an available connection
403 LinkedList<Connection> available = available_connections.get( dbkind );
404 Map<Object, BusyConnection> busy = busy_connections.get( dbkind );
405 ConnectionParameters dbParams = dbParameters.get( dbkind );
406 // There are subtle interleaved synchronizations happening here.
407 // Use a traffic load generator with low preOpenedConnections for all modifications in here!
408 synchronized( available ) {
409 Connection conn = available.poll();
410 if ( conn == null ) {
411 if ( busy.size() < dbParams.getMaxConnections() ) {
412 conn = dbParams.createConnection();
413 if ( conn == null ) {
414 throw new RuntimeException( "Connection creation not available" );
416 log.debug( "created connection " + conn + ", since busy size is " + busy.size() );
420 long time_before = System.currentTimeMillis();
421 conn.createStatement().executeQuery( "SELECT 1" );
422 if ( log.isDebugEnabled() ) {
423 log.debug( ( System.currentTimeMillis() - time_before ) + " ms for: SELECT 1 [connection validation]" );
425 } catch ( SQLException se ) {
426 log.error( "Validation error for idle connection for " + dbkind + ": " + se
427 + ", will replace connection with a newly created one" );
430 } catch ( SQLException se2 ) {}
431 conn = dbParams.createConnection();
432 if ( conn == null ) {
433 throw new RuntimeException( "Connection creation not available" );
436 if ( log.isTraceEnabled() ) {
437 if ( busyKey instanceof ThreadGroup ) {
438 log.trace( dbkind + ": use available connection for "
439 + ( (ThreadGroup) busyKey ).getName() + " - " + conn );
441 log.trace( dbkind + ": use available connection - " + conn );
445 if ( conn != null ) {
446 busy.put( busyKey, new BusyConnection( conn ) );
447 if ( busy.size() + available.size() >= dbParams.getAlertLevel() ) {
448 log.warn( "Many connections for dbkind=" + dbkind + ": busy=" + busy.size()
449 + " max=" + dbParams.getMaxConnections() + ": "
450 + DBUtils.join( ", ", listBusyConnections() ) );
455 // TODO: switch to log.warn when we are reasonably confident with
456 log.error( "No available connections for dbkind=" + dbkind + ": busy=" + busy.size()
457 + " max=" + dbParams.getMaxConnections() + " busies=["
458 + DBUtils.join( ", ", listBusyConnections() ) + "], waiting...\n"
459 + DBUtils.backtrace() );
462 } catch ( InterruptedException ex ) {}
463 log.error( "Wait over" );
464 return findOrCreateAvailableConnection( dbkind, busyKey );
469 public static void releaseConnectionsForProcessingThread() {
470 for ( String dbkind : dbParameters.keySet() ) {
471 releaseConnection( dbkind, Thread.currentThread() );