Make dev node count correct, since for example Mac Virtual PC BIOS reports slightly...
[reactos.git] / irc / ArchBlackmann / ThreadPool.cpp
1 // ThreadPool.cpp
2 // This file is (C) 2003-2004 Royce Mitchell III
3 // and released under the LGPL & BSD licenses
4
5 #include <vector>
6 using std::vector;
7 #include "ThreadPool.h"
8 #include "QueueT.h"
9 #include "auto_vector.h"
10 #include "verify.h"
11 #include "ReliMT.h"
12
13 class PoolableThread : public ActiveObject
14 {
15 public:
16 PoolableThread ( ThreadPoolImpl& );
17 ~PoolableThread()
18 {
19 Kill();
20 }
21 void InitThread();
22 void Run();
23 void FlushThread();
24
25 ThreadPoolImpl& _pool;
26 };
27
28 class ThreadPoolLaunchData
29 {
30 public:
31 ThreadPoolFunc* pFun;
32 void* pArg;
33 };
34
35 template <class T>
36 class AtomicCounter : public Uncopyable
37 {
38 Mutex _m;
39 T _t;
40 public:
41 AtomicCounter ( T init = 0 ) : _t(init)
42 {
43 }
44 AtomicCounter ( const AtomicCounter<T>& t )
45 {
46 //Mutex::Lock l ( _m ); // don't need to lock since this is a ctor
47 Mutex::Lock l2 ( t._m );
48 _t = t._t;
49 }
50 const AtomicCounter<T>& operator = ( const AtomicCounter<T>& t )
51 {
52 Mutex::Lock l ( _m );
53 Mutex::Lock l2 ( t._m );
54 _t = t._t;
55 return *this;
56 }
57 T operator ++ ()
58 {
59 Mutex::Lock l ( _m );
60 T t = _t++;
61 return t;
62 }
63 const AtomicCounter<T>& operator ++ ( int )
64 {
65 Mutex::Lock l ( _m );
66 ++_t;
67 return *this;
68 }
69 T operator -- ()
70 {
71 Mutex::Lock l ( _m );
72 T t = _t--;
73 return t;
74 }
75 const AtomicCounter<T>& operator -- ( int )
76 {
77 Mutex::Lock l ( _m );
78 --_t;
79 return *this;
80 }
81 const AtomicCounter<T>& operator += ( T t )
82 {
83 Mutex::Lock l ( _m );
84 return _t += t;
85 return *this;
86 }
87 const AtomicCounter<T>& operator -= ( T t )
88 {
89 Mutex::Lock l ( _m );
90 return _t -= t;
91 return *this;
92 }
93 operator const T& () const
94 {
95 //Mutex::Lock l ( _m );
96 return _t;
97 }
98 T operator !() const
99 {
100 //Mutex::Lock l ( _m );
101 return !_t;
102 }
103 };
104
105 class ThreadPoolImpl : public Uncopyable
106 {
107 public:
108 ThreadPoolImpl() : _isDying(false), _idleThreads(0)
109 {
110 }
111
112 ~ThreadPoolImpl()
113 {
114 }
115
116 void Shutdown()
117 {
118 _isDying = true;
119 while ( _idleThreads )
120 {
121 _threadWaitEvent.Release();
122 _threadStartEvent.Wait(); // let thread actually get a grip
123 }
124 }
125
126 void Launch ( ThreadPoolFunc* pFun, void* pArg )
127 {
128 // this mutex is necessary to make sure we never have a conflict
129 // between checking !_idleThreads and the call to _threadStartEvent.Wait()
130 // basically if 2 threads call Launch() simultaneously, and there is only
131 // 1 idle thread, it's possible that a new thread won't be created to
132 // satisfy the 2nd request until an existing thread finishes.
133 Mutex::Lock launchlock ( _launchMutex );
134
135 ASSERT ( pFun );
136 ThreadPoolLaunchData* data;
137 {
138 Mutex::Lock lock ( _vectorMutex );
139 if ( !_spareData.size() )
140 _spareData.push_back ( new ThreadPoolLaunchData() );
141 data = _spareData.pop_back().release();
142 if ( !_idleThreads )
143 _threads.push_back ( new PoolableThread(*this) );
144 }
145
146 data->pFun = pFun;
147 data->pArg = pArg;
148 verify ( _pendingData.Add ( data ) );
149 _threadWaitEvent.Release(); // tell a thread to do it's thing...
150 _threadStartEvent.Wait(); // wait on a thread to pick up the request
151 }
152
153 // functions for threads to call...
154 ThreadPoolLaunchData* GetPendingData()
155 {
156 ThreadPoolLaunchData* data = NULL;
157 ++_idleThreads;
158 _threadWaitEvent.Wait(); // waits until there's a request
159 --_idleThreads;
160 _threadStartEvent.Release(); // tell requester we got it
161 if ( _isDying )
162 return NULL;
163 _pendingData.Get ( data );
164 ASSERT ( data );
165 return data;
166 }
167
168 void RecycleData ( ThreadPoolLaunchData* data )
169 {
170 Mutex::Lock lock ( _vectorMutex );
171 _spareData.push_back ( data );
172 }
173
174 bool _isDying;
175 Mutex _vectorMutex, _launchMutex;
176 auto_vector<PoolableThread> _threads;
177 auto_vector<ThreadPoolLaunchData> _spareData;
178 CQueueT<ThreadPoolLaunchData*> _pendingData;
179 Event _threadWaitEvent, _threadStartEvent;
180 AtomicCounter<int> _idleThreads;
181 };
182
183 ///////////////////////////////////////////////////////////////////////////////
184 // ThreadPool
185
186 /*static*/ ThreadPool& ThreadPool::Instance()
187 {
188 static ThreadPool tp;
189 return tp;
190 }
191
192 ThreadPool::ThreadPool() : _pimpl ( new ThreadPoolImpl )
193 {
194 };
195
196 ThreadPool::~ThreadPool()
197 {
198 _pimpl->Shutdown();
199 delete _pimpl;
200 _pimpl = 0;
201 }
202
203 void ThreadPool::Launch ( ThreadPoolFunc* pFun, void* pArg )
204 {
205 _pimpl->Launch ( pFun, pArg );
206 }
207
208 int ThreadPool::IdleThreads()
209 {
210 return _pimpl->_idleThreads;
211 }
212
213 ///////////////////////////////////////////////////////////////////////////////
214 // PoolableThread
215
216 PoolableThread::PoolableThread ( ThreadPoolImpl& pool ) : _pool(pool)
217 {
218 Start();
219 }
220
221 void PoolableThread::InitThread()
222 {
223 }
224
225 void PoolableThread::Run()
226 {
227 ThreadPoolLaunchData* data;
228 while ( !_isDying )
229 {
230 data = _pool.GetPendingData(); // enter wait state if none...
231 if ( !data ) // NULL data means kill thread
232 break;
233 (*data->pFun) ( data->pArg ); // call the function
234 _pool.RecycleData ( data );
235 }
236 }
237
238 void PoolableThread::FlushThread()
239 {
240 }