initial incarnation of newdblayer as really separated opensource sdbl4j library
[sdbl4j] / src / java / org / gc / sdbl4j / DBConnectionPool.java
1 /*
2  *
3  * Copyright (C) 2010 Guillaume Cottenceau and MNC S.A.
4  *
5  * This file is part of sdbl4j, and is licensed under the Apache 2.0 license.
6  *
7  */
8
9 package org.gc.sdbl4j;
10
11 import java.util.ArrayList;
12 import java.util.Collections;
13 import java.util.ConcurrentModificationException;
14 import java.util.HashMap;
15 import java.util.HashSet;
16 import java.util.LinkedList;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.Set;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.ConcurrentMap;
22 import java.sql.Connection;
23 import java.sql.SQLException;
24
25 import org.apache.log4j.Logger;
26
/**
 * Warning: this class relies on several subtle, interleaved synchronizations.
 * Validate every modification made in here with a traffic load generator, using a low
 * preOpenedConnections setting.
 */
31 public class DBConnectionPool {
32
33     private static Logger log = Logger.getLogger( DBConnectionPool.class ); 
34
35     private static class BusyConnection {
36         public Connection conn;
37         public String backtrace;
38         public long tsTotal;
39         public long tsLatest;
40         public BusyConnection( Connection conn ) {
41             this.conn = conn;
42             backtrace = DBUtils.backtrace( 3 );
43             tsTotal = tsLatest = System.currentTimeMillis();
44         }
45         public void updateRequester() {
46             backtrace = DBUtils.backtrace( 2 );
47             tsLatest = System.currentTimeMillis();
48         }
49         public String toString() {
50             List<String> us = new ArrayList<String>();
51             boolean users_list_ok = false;
52             int attempts = 0;
53             while ( ! users_list_ok ) {
54                 try {
55                     Set<Thread> users = supplementary_thread_busy_connection_users.get( conn );
56                     if ( users != null ) {
57                         for ( Thread u : users ) {
58                             us.add( u.getName() );
59                         }
60                     }
61                     users_list_ok = true;
62                 } catch ( ConcurrentModificationException cme ) {
63                     // There are subtle interleaved synchronizations happening here.
64                     // Use a traffic load generator with low preOpenedConnections for all modifications in here!
65                     attempts++;
66                     if ( attempts > 3 ) {
67                         log.error( "3 attempts, abandon" );
68                         break;
69                     }
70                     us.clear();
71                 }
72             }
73             return "{" + conn + ",oldnessTotal=" + ( System.currentTimeMillis() - tsTotal )
74                    + ",oldnessLatest=" + ( System.currentTimeMillis() - tsLatest )
75                    + ",supplementaryThreadUsers=" + DBUtils.join( ";", us ) + ",requester=\n" + backtrace + "}";
76         }
77     }
78     
79     private static Map<String, ConnectionParameters> dbParameters = null;
80     private static Map<String, LinkedList<Connection>> available_connections;
81     private static Map<String, ConcurrentMap<Object, BusyConnection>> busy_connections;
82     private static Map<Connection, Set<Thread>> supplementary_thread_busy_connection_users;
83     
84     /** 
85      * List of threads that are created in external libraries in that require a db connection. 
86      * All connections from those threads MUST be manually released after use.
87      * */
88     private static Set<String> supported_external_threads;
89
90     public static void init( Map<String, ConnectionParameters> parameters ) {
91         dbParameters = parameters;
92         available_connections = new HashMap<String, LinkedList<Connection>>();
93         busy_connections = new HashMap<String, ConcurrentMap<Object, BusyConnection>>();
94         for ( Map.Entry<String, ConnectionParameters> db : parameters.entrySet() ) {
95             log.info( "Initializing: " + db.getValue() );
96             if ( db.getValue().getPreOpenedConnections() > db.getValue().getMaxConnections() ) {
97                 log.error( db.getKey() + ": pre opened connections > max connections, this is stupid!" );
98             }
99             if ( db.getValue().getAlertLevel() < db.getValue().getPreOpenedConnections() ) {
100                 log.error( db.getKey() + ": alert level < pre opened connections, this is stupid!" );
101             }
102             LinkedList<Connection> connections = new LinkedList<Connection>();
103             available_connections.put( db.getKey(), connections );
104             for ( int i = 0; i < db.getValue().getPreOpenedConnections(); i++ ) {
105                 connections.add( db.getValue().createConnection() );
106             }
107             busy_connections.put( db.getKey(), new ConcurrentHashMap<Object, BusyConnection>() );
108             if ( db.getValue().getMaxConnections() < 1 ) {
109                 log.error( db.getKey() + ": max connections cannot be < 1" );
110             }
111         }
112         supplementary_thread_busy_connection_users = new HashMap<Connection, Set<Thread>>();
113         supported_external_threads = new HashSet<String>();
114     }
115
116     /**
117      * Add an external thread name that requires a DB connection.<br/>
118      * All connections from this thread MUST be manually released after use.
119      * @param threadName
120      */
121     public static void addSupportedExternalThread( String threadName ) {
122         if ( threadName.equals( "" ) ) {
123             return;
124         }
125         supported_external_threads.add( threadName );
126     }
127     
128     /** Check if the given thread name is a supported thread creating from an external library. */
129     public static boolean isSupportedExternalThread( String threadName ) {
130         for ( String name : supported_external_threads ) {
131             if ( threadName.startsWith( name ) ) {
132                 return true;
133             }
134         }
135         return false;
136     }
137     
138     public static Map<String, ConnectionParameters> getParameters() {
139         return dbParameters;
140     }
141     
142     public static List<String> listBusyConnections() {
143         List<String> retval = new ArrayList<String>();
144         List<String> kinds = new ArrayList<String>( dbParameters.keySet() );
145         Collections.sort( kinds );
146         for ( String dbkind : kinds ) {
147             int count = 0;
148             List<String> kind = new ArrayList<String>();
149             for ( Map.Entry<Object, BusyConnection> busy : busy_connections.get( dbkind ).entrySet() ) {
150                 if ( busy.getKey() instanceof Thread ) {
151                     // e.g. used by a request processing thread
152                     kind.add( ( (Thread) busy.getKey() ).getName() + busy.getValue() );
153                 } else {
154                     // e.g. used by supplementary thread(s)
155                     kind.add( ( (ThreadGroup) busy.getKey() ).getName() + busy.getValue() );
156                 }
157                 count++;
158             }
159             if ( count == 0 ) {
160                 retval.add( dbkind + ": 0" );
161             } else {
162                 retval.add( dbkind + ": " + count + " (" + DBUtils.join( ", ", kind ) + ")" );
163             }
164         }
165         return retval;
166     }
167     
168     /**
169      * Get a database connection to use. Normally never called directly, only by auto-generated database access
170      * code.
171      * 
172      * - in case the calling Thread is a request processing Thread:
173      * 
174      *   This connection will be associated to the calling Thread and reserved for it (one connection dedicated to
175      *   one processing thread model).
176      * 
177      * - in case the calling Thread is a supplementary Thread(1)
178      * 
179      *   This connection can be reserved or shared with other supplementary Threads depending on configuration
180      *   (<code>connectionGroup</code> for asynchronous events, dedicated instance properties for other threads).
181      *   
182      * (1) a supplementary Thread, e.g. not a request processing Thread, is thus:
183      *     - any Thread invoking asynchronous events (such as Spooler, ElectionsStartStop, etc)
184      *     - the Spooler sub threads used for MT retries
185      *     - any {@link ThreadedElementsConsumer} uses, e.g. DB loggers, DLR processors, Notification Sender
186      *     - the MessageSender threads
187      */ 
188     public static Connection getConnection( String dbkind ) {
189         // for request processing threads, we have one connection per thread; for supplementary threads, we
190         // have one connection per thread group; to optimize request processing, we first try to look if we
191         // have a busy thread associated with this thread, even if it's possibly not a request processing thread
192         ConcurrentMap<Object, BusyConnection> connections = busy_connections.get( dbkind );
193         if ( connections == null ) {
194             log.error( "DB kind '" + dbkind + "' doesn't exist" );
195             return null;
196         }
197         BusyConnection conn = connections.get( Thread.currentThread() );  // ConcurrentMap
198         if ( conn != null ) {
199             if ( log.isTraceEnabled() ) {
200                 log.trace( dbkind + ": use busy connection - " + conn );
201             }
202             conn.updateRequester();
203             return conn.conn;
204         }
205         
206         // supplementary threads are recognized by using a specific parent's thread group
207         if ( Thread.currentThread().getThreadGroup().getParent() == SupplementaryThreadHelper.parentThreadGroup ) {
208             return getConnectionForSupplementaryThread( dbkind );
209             
210         } else {
211             // sanity
212             if ( ! Thread.currentThread().getName().startsWith( "http" )
213                 && ! Thread.currentThread().getName().equals( "main" ) ) {
214                 if ( isSupportedExternalThread( Thread.currentThread().getName() ) ) {
215                     log.debug( "Thread identified as being externally created (external library): the DB " +
216                                "connections allocated to non-request processing threads MUST be " +
217                                "manually released after use" );
218                 } else {
219                     log.error( "Thread identified as request processing thread (parent of thread group is not "
220                                + "SupplementaryThreadHelper.parentThreadGroup), but name doesn't start with http!"
221                                + " (thread name: " + Thread.currentThread().getName() + ")" );
222                 }
223             }
224             return findOrCreateAvailableConnection( dbkind, Thread.currentThread() );
225         }
226
227     }
228
229     public static void supplementaryThreadReleaseConnections() {
230         if ( dbParameters == null ) {
231             log.trace( "Do nothing (hook for combining old style and new style DB accesses in acme)" );
232             return;
233         }
234         log.trace( "Enter for group " + Thread.currentThread().getThreadGroup().getName() + ", thread "
235                    + Thread.currentThread().getName() );
236         for ( String dbkind : dbParameters.keySet() ) {
237             // There are subtle interleaved synchronizations happening here.
238             // Use a traffic load generator with low preOpenedConnections for all modifications in here!
239             synchronized( Thread.currentThread().getThreadGroup() ) {
240                 BusyConnection conn = busy_connections.get( dbkind ).get( Thread.currentThread().getThreadGroup() );
241                 if ( conn != null ) {
242                     // at least one of the threads in this thread group use this connection, either that thread, 
243                     // others, or that thread and others; first, check if that thread is really using it:
244                     // There are subtle interleaved synchronizations happening here.
245                     // Use a traffic load generator with low preOpenedConnections for all modifications in here!
246                     synchronized( supplementary_thread_busy_connection_users ) {
247                         Set<Thread> users = supplementary_thread_busy_connection_users.get( conn.conn );
248                         if ( log.isTraceEnabled() ) {
249                             List<String> us = new ArrayList<String>();
250                             for ( Thread u : users ) {
251                                 us.add( u.getName() );
252                             }
253                         }
254                         if ( users.remove( Thread.currentThread() ) ) {
255                             // this thread was using this connection. now let's see if no other threads are.
256                             if ( users.size() == 0 ) {
257                                 if ( log.isTraceEnabled() ) {
258                                     log.trace( "Group " + Thread.currentThread().getThreadGroup().getName()
259                                                + ", thread " + Thread.currentThread().getName()
260                                                + ", release connection " + conn + " on no more users" );
261                                 }
262                                 releaseConnection( dbkind, Thread.currentThread().getThreadGroup() );
263                                 supplementary_thread_busy_connection_users.remove( conn.conn );
264                             }
265                         }
266                     }
267                 }
268             }
269         }
270     }
271     
272     private static void releaseConnection( String dbkind, Object key ) {
273         LinkedList<Connection> available = available_connections.get( dbkind );
274         Map<Object, BusyConnection> busy = busy_connections.get( dbkind );
275         ConnectionParameters dbParams = dbParameters.get( dbkind );
276
277         BusyConnection conn = busy.get( key );  // ConcurrentMap
278         if ( conn == null ) {
279             return;
280         }
281         
282         // There are subtle interleaved synchronizations happening here.
283         // Use a traffic load generator with low preOpenedConnections for all modifications in here!
284         synchronized( available ) {
285             BusyConnection conn2 = busy.remove( key );
286             // sanity
287             if ( conn2 == null || ! conn.conn.equals( conn2.conn ) ) {
288                 log.error( "Internal error, removed connection " + conn2 + " not equals to getted connection "
289                            + conn );
290                 return;
291             }
292
293             if ( busy.size() + available.size() >= dbParams.getPreOpenedConnections() ) {
294                 if ( log.isTraceEnabled() ) {
295                     log.trace( "closing connection for " + dbkind + " - " + conn + " - "
296                                + " busy:" + busy.size() + " available:" + available.size() + " busies:\n"
297                                + DBUtils.join( "\n", listBusyConnections() ) );
298                 } else if ( log.isDebugEnabled() ) {
299                     log.debug( "closing connection for " + dbkind + " - " + conn + " - "
300                                + " busy:" + busy.size() + " available:" + available.size() );
301                 }
302                 try {
303                     conn.conn.close();
304                 } catch ( SQLException se ) {
305                     log.warn( "error closing connection for dbkind=" + dbkind + ": " + se );
306                 }
307
308             } else {
309                 available.addFirst( conn.conn );
310             }
311
312             available.notify();
313         }
314     }
315             
316     /** Close all of the database connections. */
317     public static void closeConnections() {
318         log.info( "START - closing connections" );
319         if ( available_connections.size() == 0 ) {
320             log.error( "No available connections - already closed?" );
321             return;
322         }
323         for ( String dbkind : dbParameters.keySet() ) {
324             // There are subtle interleaved synchronizations happening here.
325             // Use a traffic load generator with low preOpenedConnections for all modifications in here!
326             synchronized( available_connections.get( dbkind ) ) {
327                 for ( Connection conn : available_connections.get( dbkind ) ) {
328                     try {
329                         conn.close();
330                     } catch ( SQLException se ) {
331                         log.warn( "error closing connection for dbkind=" + dbkind + ": " + se );
332                     }
333                 }
334                 for ( Map.Entry<Object, BusyConnection> busy : busy_connections.get( dbkind ).entrySet() ) {
335                     log.warn( "Still busy connection (leak?): "
336                               + ( ( busy.getKey() instanceof Thread ) ? ( (Thread) busy.getKey() ).getName()
337                                                                       : ( (ThreadGroup) busy.getKey() ).getName() )
338                               + " - " + busy.getValue() );
339                 }
340             }
341         }
342         available_connections.clear();
343         log.info( "END - closing connections" );
344     }
345     
346     private static Connection getConnectionForSupplementaryThread( String dbkind ) {
347         
348         ThreadGroup group = Thread.currentThread().getThreadGroup();
349         
350         // synchronize on group, because more than one thread may be requesting a connection at the same time,
351         // and we want no race condition to potentially populate busy_connections in that situation
352         // There are subtle interleaved synchronizations happening here.
353         // Use a traffic load generator with low preOpenedConnections for all modifications in here!
354         synchronized( group ) {
355             // re-run the check for existing busy connections, on the proper key now
356             BusyConnection conn = busy_connections.get( dbkind ).get( group );
357             if ( conn != null ) {
358                 if ( log.isTraceEnabled() ) {
359                     log.trace( dbkind + ": use busy connection for " + group.getName() + " - " + conn );
360                 }
361                 // remember this thread is also using this connection
362                 // There are subtle interleaved synchronizations happening here.
363                 // Use a traffic load generator with low preOpenedConnections for all modifications in here!
364                 synchronized( supplementary_thread_busy_connection_users ) {
365                     supplementary_thread_busy_connection_users.get( conn.conn ).add( Thread.currentThread() );
366                 }
367                 conn.updateRequester();
368                 return conn.conn;
369             }
370
371             Connection c = findOrCreateAvailableConnection( dbkind, group );
372             // that thread is the first user of this connection
373             Set<Thread> users = new HashSet<Thread>();
374             users.add( Thread.currentThread() );
375             // There are subtle interleaved synchronizations happening here.
376             // Use a traffic load generator with low preOpenedConnections for all modifications in here!
377             synchronized( supplementary_thread_busy_connection_users ) {
378                 Set<Thread> previous_users = supplementary_thread_busy_connection_users.put( c, users );
379                 // sanity
380                 if ( previous_users != null ) {
381                     List<String> us = new ArrayList<String>();
382                     for ( Thread u : previous_users ) {
383                         us.add( u.getName() );
384                     }
385                     log.error( "Internal error (previous users should be empty): previous users: "
386                                + previous_users );
387                 }
388             }
389             return c;
390         }
391     }
392
393     /**
394      * Find a currently available connection, or create a new one if none is available and we have
395      * not yet reached the maximum.
396      */
397     private static Connection findOrCreateAvailableConnection( String dbkind, Object busyKey ) {
398         
399         // find an available connection
400         LinkedList<Connection> available = available_connections.get( dbkind );
401         Map<Object, BusyConnection> busy = busy_connections.get( dbkind );
402         ConnectionParameters dbParams = dbParameters.get( dbkind );
403         // There are subtle interleaved synchronizations happening here.
404         // Use a traffic load generator with low preOpenedConnections for all modifications in here!
405         synchronized( available ) {
406             Connection conn = available.poll();
407             if ( conn == null ) {
408                 if ( busy.size() < dbParams.getMaxConnections() ) {
409                     conn = dbParams.createConnection();
410                     if ( conn == null ) {
411                         throw new RuntimeException( "Connection creation not available" );
412                     }
413                     log.debug( "created connection " + conn + ", since busy size is " + busy.size() );
414                 }
415             } else {
416                 // TODO: check if the connection is still alive?
417                 if ( log.isTraceEnabled() ) {
418                     if ( busyKey instanceof ThreadGroup ) {
419                         log.trace( dbkind + ": use available connection for "
420                                    + ( (ThreadGroup) busyKey ).getName() + " - " + conn );
421                     } else {
422                         log.trace( dbkind + ": use available connection - " + conn );
423                     }
424                 }
425             }
426             if ( conn != null ) {
427                 busy.put( busyKey, new BusyConnection( conn ) );
428                 if ( busy.size() + available.size() >= dbParams.getAlertLevel() ) {
429                     log.warn( "Many connections for dbkind=" + dbkind + ": busy=" + busy.size()
430                               + " max=" + dbParams.getMaxConnections() + ": "
431                               + DBUtils.join( ", ", listBusyConnections() ) );
432                 }
433                 return conn;
434
435             } else {
436                 // TODO: switch to log.warn when we are reasonably confident with
437                 log.error( "No available connections for dbkind=" + dbkind + ": busy=" + busy.size()
438                           + " max=" + dbParams.getMaxConnections() + " busies=["
439                           + DBUtils.join( ", ", listBusyConnections() ) + "], waiting...\n"
440                           + DBUtils.backtrace() );
441                 try {
442                     available.wait(); 
443                 } catch ( InterruptedException ex ) {}
444                 log.error( "Wait over" );
445                 return findOrCreateAvailableConnection( dbkind, busyKey );
446             }
447         }
448     }
449     
450     public static void releaseConnectionsForProcessingThread() {
451         for ( String dbkind : dbParameters.keySet() ) {
452             releaseConnection( dbkind, Thread.currentThread() );
453         }
454     }
455     
456 }