e0a4422fda5d7a9f6683084c4c942df4f8933987
[sdbl4j] / src / java / org / gc / sdbl4j / DBConnectionPool.java
1 /*
2  *
3  * Copyright (C) 2010 Guillaume Cottenceau and MNC S.A.
4  *
5  * This file is part of sdbl4j, and is licensed under the Apache 2.0 license.
6  *
7  */
8
9 package org.gc.sdbl4j;
10
11 import java.util.ArrayList;
12 import java.util.Collections;
13 import java.util.ConcurrentModificationException;
14 import java.util.HashMap;
15 import java.util.HashSet;
16 import java.util.LinkedList;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.Set;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.ConcurrentMap;
22 import java.sql.Connection;
23 import java.sql.SQLException;
24
25 import org.apache.log4j.Logger;
26
27 /**
28  * There are subtle interleaved synchronizations happening here.
29  * Use a traffic load generator with low preOpenedConnections for all modifications in here!
30  */
31 public class DBConnectionPool {
32
33     private static Logger log = Logger.getLogger( DBConnectionPool.class ); 
34
35     private static class BusyConnection {
36         public Connection conn;
37         public String backtrace;
38         public long tsTotal;
39         public long tsLatest;
40         public BusyConnection( Connection conn ) {
41             this.conn = conn;
42             backtrace = DBUtils.backtrace( 3 );
43             tsTotal = tsLatest = System.currentTimeMillis();
44         }
45         public void updateRequester() {
46             backtrace = DBUtils.backtrace( 2 );
47             tsLatest = System.currentTimeMillis();
48         }
49         public String toString() {
50             List<String> us = new ArrayList<String>();
51             boolean users_list_ok = false;
52             int attempts = 0;
53             while ( ! users_list_ok ) {
54                 try {
55                     Set<Thread> users = supplementary_thread_busy_connection_users.get( conn );
56                     if ( users != null ) {
57                         for ( Thread u : users ) {
58                             us.add( u.getName() );
59                         }
60                     }
61                     users_list_ok = true;
62                 } catch ( ConcurrentModificationException cme ) {
63                     // There are subtle interleaved synchronizations happening here.
64                     // Use a traffic load generator with low preOpenedConnections for all modifications in here!
65                     attempts++;
66                     if ( attempts > 3 ) {
67                         log.error( "3 attempts, abandon" );
68                         break;
69                     }
70                     us.clear();
71                 }
72             }
73             return "{" + conn + ",oldnessTotal=" + ( System.currentTimeMillis() - tsTotal )
74                    + ",oldnessLatest=" + ( System.currentTimeMillis() - tsLatest )
75                    + ",supplementaryThreadUsers=" + DBUtils.join( ";", us ) + ",requester=\n" + backtrace + "}";
76         }
77     }
78     
    /** Connection parameters per database kind, as given to init(); null until init() is called. */
    private static Map<String, ConnectionParameters> dbParameters = null;
    /**
     * Idle connections per database kind. Each list is also the monitor guarding hand-out and
     * release for its kind (synchronized / wait / notify are performed on it).
     */
    private static Map<String, LinkedList<Connection>> available_connections;
    /**
     * Handed-out connections per database kind, keyed by the owning Thread (request processing
     * threads) or by the owning ThreadGroup (supplementary threads).
     */
    private static Map<String, ConcurrentMap<Object, BusyConnection>> busy_connections;
    /**
     * For each connection handed out to a thread group, the threads of that group currently using it.
     * Modified only while synchronized on this map.
     */
    private static Map<Connection, Set<Thread>> supplementary_thread_busy_connection_users;
    
    /**
     * Names of threads created in external libraries that require a DB connection; a thread is
     * recognized when its name starts with one of these names.
     * All connections from those threads MUST be manually released after use.
     */
    private static Set<String> supported_external_threads;
89
90     public static void init( Map<String, ConnectionParameters> parameters ) {
91         dbParameters = parameters;
92         available_connections = new HashMap<String, LinkedList<Connection>>();
93         busy_connections = new HashMap<String, ConcurrentMap<Object, BusyConnection>>();
94         for ( Map.Entry<String, ConnectionParameters> db : parameters.entrySet() ) {
95             log.info( "Initializing: " + db.getValue() );
96             if ( db.getValue().getPreOpenedConnections() > db.getValue().getMaxConnections() ) {
97                 log.error( db.getKey() + ": pre opened connections > max connections, this is stupid!" );
98             }
99             if ( db.getValue().getAlertLevel() < db.getValue().getPreOpenedConnections() ) {
100                 log.error( db.getKey() + ": alert level < pre opened connections, this is stupid!" );
101             }
102             LinkedList<Connection> connections = new LinkedList<Connection>();
103             available_connections.put( db.getKey(), connections );
104             for ( int i = 0; i < db.getValue().getPreOpenedConnections(); i++ ) {
105                 connections.add( db.getValue().createConnection() );
106             }
107             busy_connections.put( db.getKey(), new ConcurrentHashMap<Object, BusyConnection>() );
108             if ( db.getValue().getMaxConnections() < 1 ) {
109                 log.error( db.getKey() + ": max connections cannot be < 1" );
110             }
111         }
112         supplementary_thread_busy_connection_users = new HashMap<Connection, Set<Thread>>();
113         supported_external_threads = new HashSet<String>();
114     }
115
116     /**
117      * Add an external thread name that requires a DB connection.<br/>
118      * All connections from this thread MUST be manually released after use.
119      * @param threadName
120      */
121     public static void addSupportedExternalThread( String threadName ) {
122         if ( threadName.equals( "" ) ) {
123             return;
124         }
125         supported_external_threads.add( threadName );
126     }
127     
128     /** Check if the given thread name is a supported thread creating from an external library. */
129     public static boolean isSupportedExternalThread( String threadName ) {
130         for ( String name : supported_external_threads ) {
131             if ( threadName.startsWith( name ) ) {
132                 return true;
133             }
134         }
135         return false;
136     }
137     
138     public static Map<String, ConnectionParameters> getParameters() {
139         return dbParameters;
140     }
141     
142     public static List<String> listBusyConnections() {
143         List<String> retval = new ArrayList<String>();
144         List<String> kinds = new ArrayList<String>( dbParameters.keySet() );
145         Collections.sort( kinds );
146         for ( String dbkind : kinds ) {
147             int count = 0;
148             List<String> kind = new ArrayList<String>();
149             for ( Map.Entry<Object, BusyConnection> busy : busy_connections.get( dbkind ).entrySet() ) {
150                 if ( busy.getKey() instanceof Thread ) {
151                     // e.g. used by a request processing thread
152                     kind.add( ( (Thread) busy.getKey() ).getName() + busy.getValue() );
153                 } else {
154                     // e.g. used by supplementary thread(s)
155                     kind.add( ( (ThreadGroup) busy.getKey() ).getName() + busy.getValue() );
156                 }
157                 count++;
158             }
159             if ( count == 0 ) {
160                 retval.add( dbkind + ": 0" );
161             } else {
162                 retval.add( dbkind + ": " + count + " (" + DBUtils.join( ", ", kind ) + ")" );
163             }
164         }
165         return retval;
166     }
167     
168     /**
169      * Get a database connection to use. Normally never called directly, only by auto-generated database access
170      * code.
171      * 
172      * - in case the calling Thread is a request processing Thread:
173      * 
174      *   This connection will be associated to the calling Thread and reserved for it (one connection dedicated to
175      *   one processing thread model).
176      * 
177      * - in case the calling Thread is a supplementary Thread(1)
178      * 
179      *   This connection can be reserved or shared with other supplementary Threads depending on configuration
180      *   (<code>connectionGroup</code> for asynchronous events, dedicated instance properties for other threads).
181      *   
182      * (1) a supplementary Thread, e.g. not a request processing Thread, is thus:
183      *     - any Thread invoking asynchronous events (such as Spooler, ElectionsStartStop, etc)
184      *     - the Spooler sub threads used for MT retries
185      *     - any {@link ThreadedElementsConsumer} uses, e.g. DB loggers, DLR processors, Notification Sender
186      *     - the MessageSender threads
187      */ 
188     public static Connection getConnection( String dbkind ) {
189         // for request processing threads, we have one connection per thread; for supplementary threads, we
190         // have one connection per thread group; to optimize request processing, we first try to look if we
191         // have a busy thread associated with this thread, even if it's possibly not a request processing thread
192         ConcurrentMap<Object, BusyConnection> connections = busy_connections.get( dbkind );
193         if ( connections == null ) {
194             log.error( "DB kind '" + dbkind + "' doesn't exist" );
195             return null;
196         }
197         BusyConnection conn = connections.get( Thread.currentThread() );  // ConcurrentMap
198         if ( conn != null ) {
199             if ( log.isTraceEnabled() ) {
200                 log.trace( dbkind + ": use busy connection - " + conn );
201             }
202             conn.updateRequester();
203             return conn.conn;
204         }
205         
206         // supplementary threads are recognized by using a specific parent's thread group
207         if ( Thread.currentThread().getThreadGroup().getParent() == SupplementaryThreadHelper.parentThreadGroup ) {
208             return getConnectionForSupplementaryThread( dbkind );
209             
210         } else {
211             // sanity
212             if ( ! Thread.currentThread().getName().startsWith( "http" )
213                  && ! Thread.currentThread().getName().startsWith( "pool" )
214                  && ! Thread.currentThread().getName().equals( "main" ) ) {
215                 if ( isSupportedExternalThread( Thread.currentThread().getName() ) ) {
216                     log.debug( "Thread identified as being externally created (external library): the DB " +
217                                "connections allocated to non-request processing threads MUST be " +
218                                "manually released after use" );
219                 } else {
220                     log.error( "Thread identified as request processing thread (parent of thread group is not "
221                                + "SupplementaryThreadHelper.parentThreadGroup), but name doesn't start with http!"
222                                + " (thread name: " + Thread.currentThread().getName() + ")" );
223                 }
224             }
225             return findOrCreateAvailableConnection( dbkind, Thread.currentThread() );
226         }
227
228     }
229
230     public static void supplementaryThreadReleaseConnections() {
231         if ( dbParameters == null ) {
232             log.trace( "Do nothing (hook for combining old style and new style DB accesses in acme)" );
233             return;
234         }
235         log.trace( "Enter for group " + Thread.currentThread().getThreadGroup().getName() + ", thread "
236                    + Thread.currentThread().getName() );
237         for ( String dbkind : dbParameters.keySet() ) {
238             // There are subtle interleaved synchronizations happening here.
239             // Use a traffic load generator with low preOpenedConnections for all modifications in here!
240             synchronized( Thread.currentThread().getThreadGroup() ) {
241                 BusyConnection conn = busy_connections.get( dbkind ).get( Thread.currentThread().getThreadGroup() );
242                 if ( conn != null ) {
243                     // at least one of the threads in this thread group use this connection, either that thread, 
244                     // others, or that thread and others; first, check if that thread is really using it:
245                     // There are subtle interleaved synchronizations happening here.
246                     // Use a traffic load generator with low preOpenedConnections for all modifications in here!
247                     synchronized( supplementary_thread_busy_connection_users ) {
248                         Set<Thread> users = supplementary_thread_busy_connection_users.get( conn.conn );
249                         if ( log.isTraceEnabled() ) {
250                             List<String> us = new ArrayList<String>();
251                             for ( Thread u : users ) {
252                                 us.add( u.getName() );
253                             }
254                         }
255                         if ( users.remove( Thread.currentThread() ) ) {
256                             // this thread was using this connection. now let's see if no other threads are.
257                             if ( users.size() == 0 ) {
258                                 if ( log.isTraceEnabled() ) {
259                                     log.trace( "Group " + Thread.currentThread().getThreadGroup().getName()
260                                                + ", thread " + Thread.currentThread().getName()
261                                                + ", release connection " + conn + " on no more users" );
262                                 }
263                                 releaseConnection( dbkind, Thread.currentThread().getThreadGroup() );
264                                 supplementary_thread_busy_connection_users.remove( conn.conn );
265                             }
266                         }
267                     }
268                 }
269             }
270         }
271     }
272     
273     private static void releaseConnection( String dbkind, Object key ) {
274         LinkedList<Connection> available = available_connections.get( dbkind );
275         Map<Object, BusyConnection> busy = busy_connections.get( dbkind );
276         ConnectionParameters dbParams = dbParameters.get( dbkind );
277
278         BusyConnection conn = busy.get( key );  // ConcurrentMap
279         if ( conn == null ) {
280             return;
281         }
282         
283         // There are subtle interleaved synchronizations happening here.
284         // Use a traffic load generator with low preOpenedConnections for all modifications in here!
285         synchronized( available ) {
286             BusyConnection conn2 = busy.remove( key );
287             // sanity
288             if ( conn2 == null || ! conn.conn.equals( conn2.conn ) ) {
289                 log.error( "Internal error, removed connection " + conn2 + " not equals to getted connection "
290                            + conn );
291                 return;
292             }
293
294             if ( busy.size() + available.size() >= dbParams.getPreOpenedConnections() ) {
295                 if ( log.isTraceEnabled() ) {
296                     log.trace( "closing connection for " + dbkind + " - " + conn + " - "
297                                + " busy:" + busy.size() + " available:" + available.size() + " busies:\n"
298                                + DBUtils.join( "\n", listBusyConnections() ) );
299                 } else if ( log.isDebugEnabled() ) {
300                     log.debug( "closing connection for " + dbkind + " - " + conn + " - "
301                                + " busy:" + busy.size() + " available:" + available.size() );
302                 }
303                 try {
304                     conn.conn.close();
305                 } catch ( SQLException se ) {
306                     log.warn( "error closing connection for dbkind=" + dbkind + ": " + se );
307                 }
308
309             } else {
310                 available.addFirst( conn.conn );
311             }
312
313             available.notify();
314         }
315     }
316             
317     /** Close all of the database connections. */
318     public static void closeConnections() {
319         log.info( "START - closing connections" );
320         if ( available_connections.size() == 0 ) {
321             log.error( "No available connections - already closed?" );
322             return;
323         }
324         for ( String dbkind : dbParameters.keySet() ) {
325             // There are subtle interleaved synchronizations happening here.
326             // Use a traffic load generator with low preOpenedConnections for all modifications in here!
327             synchronized( available_connections.get( dbkind ) ) {
328                 for ( Connection conn : available_connections.get( dbkind ) ) {
329                     try {
330                         conn.close();
331                     } catch ( SQLException se ) {
332                         log.warn( "error closing connection for dbkind=" + dbkind + ": " + se );
333                     }
334                 }
335                 for ( Map.Entry<Object, BusyConnection> busy : busy_connections.get( dbkind ).entrySet() ) {
336                     log.warn( "Still busy connection (leak?): "
337                               + ( ( busy.getKey() instanceof Thread ) ? ( (Thread) busy.getKey() ).getName()
338                                                                       : ( (ThreadGroup) busy.getKey() ).getName() )
339                               + " - " + busy.getValue() );
340                 }
341             }
342         }
343         available_connections.clear();
344         log.info( "END - closing connections" );
345     }
346     
347     private static Connection getConnectionForSupplementaryThread( String dbkind ) {
348         
349         ThreadGroup group = Thread.currentThread().getThreadGroup();
350         
351         // synchronize on group, because more than one thread may be requesting a connection at the same time,
352         // and we want no race condition to potentially populate busy_connections in that situation
353         // There are subtle interleaved synchronizations happening here.
354         // Use a traffic load generator with low preOpenedConnections for all modifications in here!
355         synchronized( group ) {
356             // re-run the check for existing busy connections, on the proper key now
357             BusyConnection conn = busy_connections.get( dbkind ).get( group );
358             if ( conn != null ) {
359                 if ( log.isTraceEnabled() ) {
360                     log.trace( dbkind + ": use busy connection for " + group.getName() + " - " + conn );
361                 }
362                 // remember this thread is also using this connection
363                 // There are subtle interleaved synchronizations happening here.
364                 // Use a traffic load generator with low preOpenedConnections for all modifications in here!
365                 synchronized( supplementary_thread_busy_connection_users ) {
366                     supplementary_thread_busy_connection_users.get( conn.conn ).add( Thread.currentThread() );
367                 }
368                 conn.updateRequester();
369                 return conn.conn;
370             }
371
372             Connection c = findOrCreateAvailableConnection( dbkind, group );
373             // that thread is the first user of this connection
374             Set<Thread> users = new HashSet<Thread>();
375             users.add( Thread.currentThread() );
376             // There are subtle interleaved synchronizations happening here.
377             // Use a traffic load generator with low preOpenedConnections for all modifications in here!
378             synchronized( supplementary_thread_busy_connection_users ) {
379                 Set<Thread> previous_users = supplementary_thread_busy_connection_users.put( c, users );
380                 // sanity
381                 if ( previous_users != null ) {
382                     List<String> us = new ArrayList<String>();
383                     for ( Thread u : previous_users ) {
384                         us.add( u.getName() );
385                     }
386                     log.error( "Internal error (previous users should be empty): previous users: "
387                                + previous_users );
388                 }
389             }
390             return c;
391         }
392     }
393
394     /**
395      * Find a currently available connection, or create a new one if none is available and we have
396      * not yet reached the maximum.
397      */
398     private static Connection findOrCreateAvailableConnection( String dbkind, Object busyKey ) {
399         
400         // find an available connection
401         LinkedList<Connection> available = available_connections.get( dbkind );
402         Map<Object, BusyConnection> busy = busy_connections.get( dbkind );
403         ConnectionParameters dbParams = dbParameters.get( dbkind );
404         // There are subtle interleaved synchronizations happening here.
405         // Use a traffic load generator with low preOpenedConnections for all modifications in here!
406         synchronized( available ) {
407             Connection conn = available.poll();
408             if ( conn == null ) {
409                 if ( busy.size() < dbParams.getMaxConnections() ) {
410                     conn = dbParams.createConnection();
411                     if ( conn == null ) {
412                         throw new RuntimeException( "Connection creation not available" );
413                     }
414                     log.debug( "created connection " + conn + ", since busy size is " + busy.size() );
415                 }
416             } else {
417                 // TODO: check if the connection is still alive?
418                 if ( log.isTraceEnabled() ) {
419                     if ( busyKey instanceof ThreadGroup ) {
420                         log.trace( dbkind + ": use available connection for "
421                                    + ( (ThreadGroup) busyKey ).getName() + " - " + conn );
422                     } else {
423                         log.trace( dbkind + ": use available connection - " + conn );
424                     }
425                 }
426             }
427             if ( conn != null ) {
428                 busy.put( busyKey, new BusyConnection( conn ) );
429                 if ( busy.size() + available.size() >= dbParams.getAlertLevel() ) {
430                     log.warn( "Many connections for dbkind=" + dbkind + ": busy=" + busy.size()
431                               + " max=" + dbParams.getMaxConnections() + ": "
432                               + DBUtils.join( ", ", listBusyConnections() ) );
433                 }
434                 return conn;
435
436             } else {
437                 // TODO: switch to log.warn when we are reasonably confident with
438                 log.error( "No available connections for dbkind=" + dbkind + ": busy=" + busy.size()
439                           + " max=" + dbParams.getMaxConnections() + " busies=["
440                           + DBUtils.join( ", ", listBusyConnections() ) + "], waiting...\n"
441                           + DBUtils.backtrace() );
442                 try {
443                     available.wait(); 
444                 } catch ( InterruptedException ex ) {}
445                 log.error( "Wait over" );
446                 return findOrCreateAvailableConnection( dbkind, busyKey );
447             }
448         }
449     }
450     
451     public static void releaseConnectionsForProcessingThread() {
452         for ( String dbkind : dbParameters.keySet() ) {
453             releaseConnection( dbkind, Thread.currentThread() );
454         }
455     }
456     
457 }