af0223c95756a973db89edde32657fbe84c9aa46
[sdbl4j] / src / java / org / gc / sdbl4j / DBConnectionPool.java
1 /*
2  *
3  * Copyright (C) 2010 Guillaume Cottenceau and MNC S.A.
4  *
5  * This file is part of sdbl4j, and is licensed under the Apache 2.0 license.
6  *
7  */
8
9 package org.gc.sdbl4j;
10
11 import java.util.ArrayList;
12 import java.util.Collections;
13 import java.util.ConcurrentModificationException;
14 import java.util.HashMap;
15 import java.util.HashSet;
16 import java.util.LinkedList;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.Set;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.ConcurrentMap;
22 import java.sql.Connection;
23 import java.sql.SQLException;
24
25 import org.apache.log4j.Logger;
26
27 /**
28  * There are subtle interleaved synchronizations happening here.
29  * Use a traffic load generator with low preOpenedConnections for all modifications in here!
30  */
31 public class DBConnectionPool {
32
33     private static Logger log = Logger.getLogger( DBConnectionPool.class ); 
34
35     private static class BusyConnection {
36         public Connection conn;
37         public String backtrace;
38         public long tsTotal;
39         public long tsLatest;
40         public BusyConnection( Connection conn ) {
41             this.conn = conn;
42             backtrace = DBUtils.backtrace( 3 );
43             tsTotal = tsLatest = System.currentTimeMillis();
44         }
45         public void updateRequester() {
46             backtrace = DBUtils.backtrace( 2 );
47             tsLatest = System.currentTimeMillis();
48         }
49         public String toString() {
50             List<String> us = new ArrayList<String>();
51             boolean users_list_ok = false;
52             int attempts = 0;
53             while ( ! users_list_ok ) {
54                 try {
55                     Set<Thread> users = supplementary_thread_busy_connection_users.get( conn );
56                     if ( users != null ) {
57                         for ( Thread u : users ) {
58                             us.add( u.getName() );
59                         }
60                     }
61                     users_list_ok = true;
62                 } catch ( ConcurrentModificationException cme ) {
63                     // There are subtle interleaved synchronizations happening here.
64                     // Use a traffic load generator with low preOpenedConnections for all modifications in here!
65                     attempts++;
66                     if ( attempts > 3 ) {
67                         log.error( "3 attempts, abandon" );
68                         break;
69                     }
70                     us.clear();
71                 }
72             }
73             return "{" + conn + ",oldnessTotal=" + ( System.currentTimeMillis() - tsTotal )
74                    + ",oldnessLatest=" + ( System.currentTimeMillis() - tsLatest )
75                    + ",supplementaryThreadUsers=" + DBUtils.join( ";", us ) + ",requester=\n" + backtrace + "}";
76         }
77     }
78     
79     private static Map<String, ConnectionParameters> dbParameters = null;
80     private static Map<String, LinkedList<Connection>> available_connections;
81     private static Map<String, ConcurrentMap<Object, BusyConnection>> busy_connections;
82     private static Map<Connection, Set<Thread>> supplementary_thread_busy_connection_users;
83     
84     /** 
85      * List of threads that are created in external libraries in that require a db connection. 
86      * All connections from those threads MUST be manually released after use.
87      * */
88     private static Set<String> supported_external_threads;
89
90     public static void init( Map<String, ConnectionParameters> parameters ) {
91         dbParameters = parameters;
92         available_connections = new HashMap<String, LinkedList<Connection>>();
93         busy_connections = new HashMap<String, ConcurrentMap<Object, BusyConnection>>();
94         for ( Map.Entry<String, ConnectionParameters> db : parameters.entrySet() ) {
95             log.info( "Initializing: " + db.getValue() );
96             if ( db.getValue().getPreOpenedConnections() > db.getValue().getMaxConnections() ) {
97                 log.error( db.getKey() + ": pre opened connections > max connections, this is stupid!" );
98             }
99             if ( db.getValue().getAlertLevel() < db.getValue().getPreOpenedConnections() ) {
100                 log.error( db.getKey() + ": alert level < pre opened connections, this is stupid!" );
101             }
102             LinkedList<Connection> connections = new LinkedList<Connection>();
103             available_connections.put( db.getKey(), connections );
104             for ( int i = 0; i < db.getValue().getPreOpenedConnections(); i++ ) {
105                 connections.add( db.getValue().createConnection() );
106             }
107             busy_connections.put( db.getKey(), new ConcurrentHashMap<Object, BusyConnection>() );
108             if ( db.getValue().getMaxConnections() < 1 ) {
109                 log.error( db.getKey() + ": max connections cannot be < 1" );
110             }
111         }
112         supplementary_thread_busy_connection_users = new HashMap<Connection, Set<Thread>>();
113         supported_external_threads = new HashSet<String>();
114     }
115
116     /**
117      * Add an external thread name that requires a DB connection.<br/>
118      * All connections from this thread MUST be manually released after use.
119      * @param threadName
120      */
121     public static void addSupportedExternalThread( String threadName ) {
122         if ( threadName.equals( "" ) ) {
123             return;
124         }
125         supported_external_threads.add( threadName );
126     }
127     
128     /** Check if the given thread name is a supported thread creating from an external library. */
129     public static boolean isSupportedExternalThread( String threadName ) {
130         for ( String name : supported_external_threads ) {
131             if ( threadName.startsWith( name ) ) {
132                 return true;
133             }
134         }
135         return false;
136     }
137     
138     public static Map<String, ConnectionParameters> getParameters() {
139         return dbParameters;
140     }
141     
142     public static List<String> listBusyConnections() {
143         List<String> retval = new ArrayList<String>();
144         List<String> kinds = new ArrayList<String>( dbParameters.keySet() );
145         Collections.sort( kinds );
146         for ( String dbkind : kinds ) {
147             int count = 0;
148             List<String> kind = new ArrayList<String>();
149             for ( Map.Entry<Object, BusyConnection> busy : busy_connections.get( dbkind ).entrySet() ) {
150                 if ( busy.getKey() instanceof Thread ) {
151                     // e.g. used by a request processing thread
152                     kind.add( ( (Thread) busy.getKey() ).getName() + busy.getValue() );
153                 } else {
154                     // e.g. used by supplementary thread(s)
155                     kind.add( ( (ThreadGroup) busy.getKey() ).getName() + busy.getValue() );
156                 }
157                 count++;
158             }
159             if ( count == 0 ) {
160                 retval.add( dbkind + ": 0" );
161             } else {
162                 retval.add( dbkind + ": " + count + " (" + DBUtils.join( ", ", kind ) + ")" );
163             }
164         }
165         return retval;
166     }
167     
168     /**
169      * Get a database connection to use. Normally never called directly, only by auto-generated database access
170      * code.
171      * 
172      * - in case the calling Thread is a request processing Thread:
173      * 
174      *   This connection will be associated to the calling Thread and reserved for it (one connection dedicated to
175      *   one processing thread model).
176      * 
177      * - in case the calling Thread is a supplementary Thread(1)
178      * 
179      *   This connection can be reserved or shared with other supplementary Threads depending on configuration
180      *   (<code>connectionGroup</code> for asynchronous events, dedicated instance properties for other threads).
181      *   
182      * (1) a supplementary Thread, e.g. not a request processing Thread, is thus:
183      *     - any Thread invoking asynchronous events (such as Spooler, ElectionsStartStop, etc)
184      *     - the Spooler sub threads used for MT retries
185      *     - any {@link ThreadedElementsConsumer} uses, e.g. DB loggers, DLR processors, Notification Sender
186      *     - the MessageSender threads
187      */ 
188     public static Connection getConnection( String dbkind ) {
189         // for request processing threads, we have one connection per thread; for supplementary threads, we
190         // have one connection per thread group; to optimize request processing, we first try to look if we
191         // have a busy thread associated with this thread, even if it's possibly not a request processing thread
192         ConcurrentMap<Object, BusyConnection> connections = busy_connections.get( dbkind );
193         if ( connections == null ) {
194             log.error( "DB kind '" + dbkind + "' doesn't exist" );
195             return null;
196         }
197         BusyConnection conn = connections.get( Thread.currentThread() );  // ConcurrentMap
198         if ( conn != null ) {
199             if ( log.isTraceEnabled() ) {
200                 log.trace( dbkind + ": use busy connection - " + conn );
201             }
202             conn.updateRequester();
203             return conn.conn;
204         }
205         
206         // supplementary threads are recognized by using a specific parent's thread group
207         if ( Thread.currentThread().getThreadGroup().getParent() == SupplementaryThreadHelper.parentThreadGroup ) {
208             return getConnectionForSupplementaryThread( dbkind );
209             
210         } else {
211             // sanity
212             if ( ! Thread.currentThread().getName().startsWith( "http" )
213                  && ! Thread.currentThread().getName().startsWith( "pool" )
214                  && ! Thread.currentThread().getName().equals( "main" )
215                  && ! Thread.currentThread().getName().equals( "startStop" ) ) {
216                 if ( isSupportedExternalThread( Thread.currentThread().getName() ) ) {
217                     log.debug( "Thread identified as being externally created (external library): the DB " +
218                                "connections allocated to non-request processing threads MUST be " +
219                                "manually released after use" );
220                 } else {
221                     log.error( "Thread identified as request processing thread (parent of thread group is not "
222                                + "SupplementaryThreadHelper.parentThreadGroup), but name doesn't start with http or pool or main or startStop!"
223                                + " (thread name: " + Thread.currentThread().getName() + ")" );
224                 }
225             }
226             return findOrCreateAvailableConnection( dbkind, Thread.currentThread() );
227         }
228
229     }
230
231     public static void supplementaryThreadReleaseConnections() {
232         if ( dbParameters == null ) {
233             log.trace( "Do nothing (hook for combining old style and new style DB accesses in acme)" );
234             return;
235         }
236         log.trace( "Enter for group " + Thread.currentThread().getThreadGroup().getName() + ", thread "
237                    + Thread.currentThread().getName() );
238         for ( String dbkind : dbParameters.keySet() ) {
239             // There are subtle interleaved synchronizations happening here.
240             // Use a traffic load generator with low preOpenedConnections for all modifications in here!
241             synchronized( Thread.currentThread().getThreadGroup() ) {
242                 BusyConnection conn = busy_connections.get( dbkind ).get( Thread.currentThread().getThreadGroup() );
243                 if ( conn != null ) {
244                     // at least one of the threads in this thread group use this connection, either that thread, 
245                     // others, or that thread and others; first, check if that thread is really using it:
246                     // There are subtle interleaved synchronizations happening here.
247                     // Use a traffic load generator with low preOpenedConnections for all modifications in here!
248                     synchronized( supplementary_thread_busy_connection_users ) {
249                         Set<Thread> users = supplementary_thread_busy_connection_users.get( conn.conn );
250                         if ( log.isTraceEnabled() ) {
251                             List<String> us = new ArrayList<String>();
252                             for ( Thread u : users ) {
253                                 us.add( u.getName() );
254                             }
255                         }
256                         if ( users.remove( Thread.currentThread() ) ) {
257                             // this thread was using this connection. now let's see if no other threads are.
258                             if ( users.size() == 0 ) {
259                                 if ( log.isTraceEnabled() ) {
260                                     log.trace( "Group " + Thread.currentThread().getThreadGroup().getName()
261                                                + ", thread " + Thread.currentThread().getName()
262                                                + ", release connection " + conn + " on no more users" );
263                                 }
264                                 releaseConnection( dbkind, Thread.currentThread().getThreadGroup() );
265                                 supplementary_thread_busy_connection_users.remove( conn.conn );
266                             }
267                         }
268                     }
269                 }
270             }
271         }
272     }
273     
274     private static void releaseConnection( String dbkind, Object key ) {
275         LinkedList<Connection> available = available_connections.get( dbkind );
276         Map<Object, BusyConnection> busy = busy_connections.get( dbkind );
277         ConnectionParameters dbParams = dbParameters.get( dbkind );
278
279         BusyConnection conn = busy.get( key );  // ConcurrentMap
280         if ( conn == null ) {
281             return;
282         }
283
284         if ( available == null ) {
285             log.error( "No available connections for " + key + " - already closed?" );
286             return;
287         }
288         
289         // There are subtle interleaved synchronizations happening here.
290         // Use a traffic load generator with low preOpenedConnections for all modifications in here!
291         synchronized( available ) {
292             BusyConnection conn2 = busy.remove( key );
293             // sanity
294             if ( conn2 == null || ! conn.conn.equals( conn2.conn ) ) {
295                 log.error( "Internal error, removed connection " + conn2 + " not equals to getted connection "
296                            + conn );
297                 return;
298             }
299
300             if ( busy.size() + available.size() >= dbParams.getPreOpenedConnections() ) {
301                 if ( log.isTraceEnabled() ) {
302                     log.trace( "closing connection for " + dbkind + " - " + conn + " - "
303                                + " busy:" + busy.size() + " available:" + available.size() + " busies:\n"
304                                + DBUtils.join( "\n", listBusyConnections() ) );
305                 } else if ( log.isDebugEnabled() ) {
306                     log.debug( "closing connection for " + dbkind + " - " + conn + " - "
307                                + " busy:" + busy.size() + " available:" + available.size() );
308                 }
309                 try {
310                     conn.conn.close();
311                 } catch ( SQLException se ) {
312                     log.warn( "error closing connection for dbkind=" + dbkind + ": " + se );
313                 }
314
315             } else {
316                 available.addFirst( conn.conn );
317             }
318
319             available.notify();
320         }
321     }
322             
323     /** Close all of the database connections. */
324     public static void closeConnections() {
325         log.info( "START - closing connections" );
326         if ( available_connections.size() == 0 ) {
327             log.error( "No available connections - already closed?" );
328             return;
329         }
330         for ( String dbkind : dbParameters.keySet() ) {
331             // There are subtle interleaved synchronizations happening here.
332             // Use a traffic load generator with low preOpenedConnections for all modifications in here!
333             synchronized( available_connections.get( dbkind ) ) {
334                 for ( Connection conn : available_connections.get( dbkind ) ) {
335                     try {
336                         conn.close();
337                     } catch ( SQLException se ) {
338                         log.warn( "error closing connection for dbkind=" + dbkind + ": " + se );
339                     }
340                 }
341                 for ( Map.Entry<Object, BusyConnection> busy : busy_connections.get( dbkind ).entrySet() ) {
342                     log.warn( "Still busy connection (leak?): "
343                               + ( ( busy.getKey() instanceof Thread ) ? ( (Thread) busy.getKey() ).getName()
344                                                                       : ( (ThreadGroup) busy.getKey() ).getName() )
345                               + " - " + busy.getValue() );
346                 }
347             }
348         }
349         available_connections.clear();
350         log.info( "END - closing connections" );
351     }
352     
353     private static Connection getConnectionForSupplementaryThread( String dbkind ) {
354         
355         ThreadGroup group = Thread.currentThread().getThreadGroup();
356         
357         // synchronize on group, because more than one thread may be requesting a connection at the same time,
358         // and we want no race condition to potentially populate busy_connections in that situation
359         // There are subtle interleaved synchronizations happening here.
360         // Use a traffic load generator with low preOpenedConnections for all modifications in here!
361         synchronized( group ) {
362             // re-run the check for existing busy connections, on the proper key now
363             BusyConnection conn = busy_connections.get( dbkind ).get( group );
364             if ( conn != null ) {
365                 if ( log.isTraceEnabled() ) {
366                     log.trace( dbkind + ": use busy connection for " + group.getName() + " - " + conn );
367                 }
368                 // remember this thread is also using this connection
369                 // There are subtle interleaved synchronizations happening here.
370                 // Use a traffic load generator with low preOpenedConnections for all modifications in here!
371                 synchronized( supplementary_thread_busy_connection_users ) {
372                     supplementary_thread_busy_connection_users.get( conn.conn ).add( Thread.currentThread() );
373                 }
374                 conn.updateRequester();
375                 return conn.conn;
376             }
377
378             Connection c = findOrCreateAvailableConnection( dbkind, group );
379             // that thread is the first user of this connection
380             Set<Thread> users = new HashSet<Thread>();
381             users.add( Thread.currentThread() );
382             // There are subtle interleaved synchronizations happening here.
383             // Use a traffic load generator with low preOpenedConnections for all modifications in here!
384             synchronized( supplementary_thread_busy_connection_users ) {
385                 Set<Thread> previous_users = supplementary_thread_busy_connection_users.put( c, users );
386                 // sanity
387                 if ( previous_users != null ) {
388                     List<String> us = new ArrayList<String>();
389                     for ( Thread u : previous_users ) {
390                         us.add( u.getName() );
391                     }
392                     log.error( "Internal error (previous users should be empty): previous users: "
393                                + previous_users );
394                 }
395             }
396             return c;
397         }
398     }
399
400     /**
401      * Find a currently available connection, or create a new one if none is available and we have
402      * not yet reached the maximum.
403      */
404     private static Connection findOrCreateAvailableConnection( String dbkind, Object busyKey ) {
405         
406         // find an available connection
407         LinkedList<Connection> available = available_connections.get( dbkind );
408         Map<Object, BusyConnection> busy = busy_connections.get( dbkind );
409         ConnectionParameters dbParams = dbParameters.get( dbkind );
410         // There are subtle interleaved synchronizations happening here.
411         // Use a traffic load generator with low preOpenedConnections for all modifications in here!
412         synchronized( available ) {
413             Connection conn = available.poll();
414             if ( conn == null ) {
415                 if ( busy.size() < dbParams.getMaxConnections() ) {
416                     conn = dbParams.createConnection();
417                     if ( conn == null ) {
418                         throw new RuntimeException( "Connection creation not available" );
419                     }
420                     log.debug( "created connection " + conn + ", since busy size is " + busy.size() );
421                 }
422             } else {
423                 try {
424                     long time_before = System.currentTimeMillis();
425                     conn.createStatement().executeQuery( "SELECT 1" );
426                     if ( log.isDebugEnabled() ) {
427                         log.debug( ( System.currentTimeMillis() - time_before ) + " ms for: SELECT 1 [connection validation]" );
428                     }
429                 } catch ( SQLException se ) {
430                     log.error( "Validation error for idle connection for " + dbkind + ": " + se
431                                + ", will replace connection with a newly created one" );
432                     try {
433                         conn.close();
434                     } catch ( SQLException se2 ) {}
435                     conn = dbParams.createConnection();
436                     if ( conn == null ) {
437                         throw new RuntimeException( "Connection creation not available" );
438                     }
439                 }
440                 if ( log.isTraceEnabled() ) {
441                     if ( busyKey instanceof ThreadGroup ) {
442                         log.trace( dbkind + ": use available connection for "
443                                    + ( (ThreadGroup) busyKey ).getName() + " - " + conn );
444                     } else {
445                         log.trace( dbkind + ": use available connection - " + conn );
446                     }
447                 }
448             }
449             if ( conn != null ) {
450                 busy.put( busyKey, new BusyConnection( conn ) );
451                 if ( busy.size() + available.size() >= dbParams.getAlertLevel() ) {
452                     log.warn( "Many connections for dbkind=" + dbkind + ": busy=" + busy.size()
453                               + " max=" + dbParams.getMaxConnections() + ": "
454                               + DBUtils.join( ", ", listBusyConnections() ) );
455                 }
456                 return conn;
457
458             } else {
459                 // TODO: switch to log.warn when we are reasonably confident with
460                 log.error( "No available connections for dbkind=" + dbkind + ": busy=" + busy.size()
461                           + " max=" + dbParams.getMaxConnections() + " busies=["
462                           + DBUtils.join( ", ", listBusyConnections() ) + "], waiting...\n"
463                           + DBUtils.backtrace() );
464                 try {
465                     available.wait(); 
466                 } catch ( InterruptedException ex ) {}
467                 log.error( "Wait over" );
468                 return findOrCreateAvailableConnection( dbkind, busyKey );
469             }
470         }
471     }
472     
473     public static void releaseConnectionsForProcessingThread() {
474         for ( String dbkind : dbParameters.keySet() ) {
475             releaseConnection( dbkind, Thread.currentThread() );
476         }
477     }
478     
479 }