2 ** Author: Samuel R. Blackburn
3 ** Internet: wfc@pobox.com
5 ** You can use it any way you like as long as you don't try to sell it.
7 ** Any attempt to sell WFC in source code form must have the permission
8 ** of the original author. You can produce commercial executables with
9 ** WFC but you can't sell WFC.
11 ** Copyright, 2000, Samuel R. Blackburn
13 ** NOTE: I modified the info block below so it hopefully wouldn't conflict
14 ** with the original file. Royce Mitchell III
17 #ifndef QUEUET_CLASS_HEADER
18 #define QUEUET_CLASS_HEADER
23 #include <sys/types.h> // off_t
// Allocation shims used by CQueueT.
//
// Two interchangeable families are provided; exactly one is compiled in:
//  - Win32: a private, non-serialized heap whose handle lives in the m_Heap
//    member of the object the macro is expanded in.
//  - everything else: the C runtime allocator (malloc/realloc/free), where
//    creating and destroying the "heap" are no-ops.
//
// HEAPALLOC / HEAPREALLOC / HEAPFREE expand to plain expressions (no trailing
// semicolon) so they can be used inside casts, initialisers and conditions.
// HEAPDESTROY on Win32 expands to two statements and must therefore be used
// as a statement of its own, never as the unbraced body of an if/else.
#if defined(_WIN32) || defined(WIN32)
#define HEAPCREATE(size)    m_Heap = ::HeapCreate ( HEAP_NO_SERIALIZE, (size), 0 )
#define HEAPALLOC(size)     ::HeapAlloc ( m_Heap, HEAP_NO_SERIALIZE, (size) )
#define HEAPREALLOC(p,size) ::HeapReAlloc( m_Heap, HEAP_NO_SERIALIZE, (p), (size) )
#define HEAPFREE(p)         ::HeapFree ( m_Heap, HEAP_NO_SERIALIZE, (p) )
#define HEAPDESTROY()       ::HeapDestroy ( m_Heap ); m_Heap = 0;
#else
#define HEAPCREATE(size)
#define HEAPALLOC(size)     malloc(size)
#define HEAPREALLOC(p,size) realloc((p),(size))
#define HEAPFREE(p)         free(p)
#define HEAPDESTROY()
#endif
// CQueueT: a queue of T. The member definitions further down keep the items
// in the array m_Items, indexed by m_AddIndex (slot written by the next Add)
// and m_GetIndex (slot read by the next Get); m_Size holds the array capacity.
// Add() takes m_AddMutex and Get() takes m_GetMutex.
// Derives from Uncopyable.
// NOTE(review): the template header, braces, access specifiers, data members
// and the destructor declaration are not visible in this view -- confirm
// against the complete header.
38 class CQueueT
: public Uncopyable
42 // What we want to protect
// Moves the queued items into a newly allocated array that is
// number_of_new_items slots larger. The comment in its definition says it is
// only called from Add().
56 inline void m_GrowBy ( size_t number_of_new_items
);
// Constructs the queue with storage for initial_size items (1024 by default).
60 inline CQueueT ( size_t initial_size
= 1024 );
// Stores a copy of new_item in the slot at m_AddIndex and advances the index.
63 inline bool Add( const T
& new_item
);
// Resets both indices to 0. Get() treats m_GetIndex == m_AddIndex as "empty",
// so the queue reads as empty afterwards; the array is kept and no mutex is
// taken here.
64 inline void Empty() { m_AddIndex
= 0; m_GetIndex
= 0; };
// Copies the item at m_GetIndex into item and advances the index.
65 inline bool Get( T
& item
);
// Number of items currently queued (computed while holding both mutexes).
66 inline size_t GetLength() const;
// Current capacity of the item array (m_Size).
67 inline size_t GetMaximumLength() const { return( m_Size
); };
// NOTE(review): no definition of AddArray is visible in this view; presumably
// it adds item_count items read from new_items -- confirm.
68 inline bool AddArray ( const T
* new_items
, int item_count
);
// Calls Get() for items[0], items[1], ... for at most maxget entries, and
// compares each fetched item against tEnd.
69 inline int GetArray ( T
* items
, const int maxget
, const T
& tEnd
);
// Searches the queued items for one that compares equal (operator==) to t.
70 inline bool Contains ( const T
& t
);
// Constructor: sets up storage for initial_size items.
//  - HEAPCREATE is given twice the initial array's byte size, but never less
//    than 65536 (with the malloc-based macro variant HEAPCREATE expands to
//    nothing).
//  - m_Items receives a block of initial_size * sizeof(T) bytes from HEAPALLOC.
//  - m_Size becomes initial_size, or 0 when that allocation returned NULL.
// NOTE(review): the statement guarded by the initial_size == 0 check, and any
// initialisation of m_AddIndex / m_GetIndex, are not visible in this view --
// confirm against the complete header.
74 inline CQueueT
<T
>::CQueueT ( size_t initial_size
)
80 if ( initial_size
== 0 )
85 ** We create our own heap because all of the pointers used are allocated
86 ** and freed be us. We don't have to worry about a non-serialized thread
87 ** accessing something we allocated. Because of this, we can perform our
88 ** memory allocations in a heap dedicated to queueing. This means when we
89 ** have to allocate more memory, we don't have to wait for all other threads
90 ** to pause while we allocate from the shared heap (like the C Runtime heap)
// Heap size requested: max( 2 * initial_size * sizeof(T), 65536 ) bytes.
93 HEAPCREATE( ( ( ( 2 * initial_size
* sizeof(T
) ) < 65536 ) ? 65536 : (2 * initial_size
* sizeof(T
) ) ) );
// Raw storage only: HEAPALLOC returns uninitialised bytes, no T is constructed.
95 m_Items
= (T
*)HEAPALLOC ( initial_size
* sizeof(T
) );
// A failed allocation leaves the queue with zero capacity.
97 m_Size
= ( m_Items
== NULL
) ? 0 : initial_size
;
// Destructor: acts only when m_Items is non-NULL.
// NOTE(review): the statements guarded by this check are not visible in this
// view; presumably they release m_Items (HEAPFREE) and tear down the heap
// (HEAPDESTROY) -- confirm against the complete header.
101 inline CQueueT
<T
>::~CQueueT()
107 if ( m_Items
!= NULL
)
// Adds one item to the queue.
//
// Holds m_AddMutex for the whole call. The item is assigned into the slot at
// m_AddIndex, then the successor index is computed (wrapping to 0 once it
// reaches m_Size). If that successor equals m_GetIndex the array is full:
// m_GetMutex is also taken, the index is advanced and the condition is
// re-checked. Otherwise the index is simply advanced.
//
// NOTE(review): the statements guarded by the re-check (presumably the call to
// m_GrowBy), the if/else bracing and the return statements are not visible in
// this view -- confirm against the complete header.
// NOTE(review): m_GrowBy() takes m_GetMutex itself; if it is called while the
// getlock below is still held, Mutex must allow recursive locking by the same
// thread -- confirm.
117 inline bool CQueueT
<T
>::Add ( const T
& item
)
119 // Block other threads from entering Add();
120 Mutex::Lock
addlock ( m_AddMutex
);
// Plain assignment into the slot: T needs a usable operator=.
124 m_Items
[ m_AddIndex
] = item
;
127 // Many many thanks go to Lou Franco (lfranco@spheresoft.com)
128 // for finding a bug here. In rare but recreatable situations,
129 // m_AddIndex could be in an invalid state.
131 // Make sure m_AddIndex is never invalid
// The successor is computed in a local so m_AddIndex is only ever assigned
// an in-range value.
133 off_t new_add_index
= ( ( m_AddIndex
+ 1 ) >= (off_t
)m_Size
) ? 0 : m_AddIndex
+ 1;
// Advancing onto m_GetIndex would make a full queue look empty.
135 if ( new_add_index
== m_GetIndex
)
137 // The queue is full. We need to grow.
138 // Stop anyone from getting from the queue
139 Mutex::Lock
getlock ( m_GetMutex
);
141 m_AddIndex
= new_add_index
;
143 // One last double-check.
// m_GetIndex may have moved before the get lock was acquired.
145 if ( m_AddIndex
== m_GetIndex
)
// Presumably the not-full path: just publish the new index.
153 m_AddIndex
= new_add_index
;
// Removes the oldest item from the queue into item.
//
// Holds m_GetMutex for the whole call. When m_GetIndex == m_AddIndex the
// queue is empty; on that path the code may shrink an over-grown array back
// to 1024 items: it tries to take m_AddMutex, re-checks that the queue is
// still empty, and then reallocates (falling back to free + allocate when the
// reallocation returns NULL). m_Size becomes 1024, or 0 if no memory could be
// obtained.
// Otherwise the item at m_GetIndex is assigned to item and m_GetIndex is
// advanced, wrapping to 0 once it reaches m_Size.
//
// NOTE(review): the "too big" size test, the test of whether the TryLock
// succeeded, the index resets after shrinking and the return statements are
// not visible in this view -- confirm against the complete header.
160 inline bool CQueueT
<T
>::Get( T
& item
)
162 // Prevent other threads from entering Get()
163 Mutex::Lock
getlock ( m_GetMutex
);
// Equal indices mean there is nothing to hand out.
165 if ( m_GetIndex
== m_AddIndex
)
167 // Let's check to see if our queue has grown too big
168 // If it has, then shrink it
172 // Yup, we're too big for our britches
// TryLock rather than Lock: presumably so Get() does not block behind a
// running Add() -- confirm.
173 Mutex::TryLock
addlock ( m_AddMutex
);
176 // Now, no one can add to the queue
178 if ( m_GetIndex
== m_AddIndex
) // is queue empty?
180 // See if we can just shrink it...
181 T
* return_value
= (T
*)HEAPREALLOC(m_Items
,1024 * sizeof(T
));
183 if ( return_value
!= NULL
)
185 m_Items
= (T
*) return_value
;
189 // Looks like we'll have to do it the hard way
// The queue is empty here, so discarding the old block loses no items.
190 HEAPFREE ( m_Items
);
191 m_Items
= (T
*) HEAPALLOC ( 1024 * sizeof(T
) );
// Capacity is 0 if even the fresh allocation failed.
194 m_Size
= ( m_Items
== NULL
) ? 0 : 1024;
200 // m_GetIndex != m_AddIndex, this means that someone added
201 // to the queue between the time we checked m_Size for being
202 // too big and the time we entered the add critical section.
203 // If this happened then we are too busy to shrink
// Non-empty path: hand out the oldest item by assignment.
210 item
= m_Items
[ m_GetIndex
];
212 // Make sure m_GetIndex is never invalid
214 m_GetIndex
= ( ( m_GetIndex
+ 1 ) >= (off_t
)m_Size
) ? 0 : m_GetIndex
+ 1;
// Fetches several items in one call.
//
// Takes m_GetMutex, then calls Get() for items[0], items[1], ... for at most
// maxget iterations. Each iteration tests whether Get() failed and whether
// the fetched item compares equal to tEnd.
//
// NOTE(review): the statements guarded by those two tests (presumably loop
// exits), any item counter and the return statement are not visible in this
// view -- confirm against the complete header.
// NOTE(review): Get() locks m_GetMutex again while the lock below is held, so
// Mutex must allow recursive locking by the same thread -- confirm.
220 inline int CQueueT
<T
>::GetArray ( T
* items
, const int maxget
, const T
& tEnd
)
222 // TODO - oooh baby does this need to be optimized
223 // Prevent other threads from entering Get()
224 Mutex::Lock
getlock ( m_GetMutex
); //::EnterCriticalSection( &m_GetCriticalSection );
227 for ( int i
= 0; i
< maxget
; i
++ )
// Get() writes straight into the caller's array slot.
229 if ( !Get(items
[i
]) )
// tEnd acts as a sentinel value; requires operator== on T.
232 if ( items
[i
] == tEnd
)
235 // Let other threads call Get() now
236 //::LeaveCriticalSection( &m_GetCriticalSection );
241 inline size_t CQueueT
<T
>::GetLength() const
243 // This is a very expensive process!
244 // No one can call Add() or Get() while we're computing this
246 size_t number_of_items_in_the_queue
= 0;
248 Mutex::Lock
addlock ( m_AddMutex
);
249 Mutex::Lock
getlock ( m_GetMutex
);
251 number_of_items_in_the_queue
= ( m_AddIndex
>= m_GetIndex
) ?
252 ( m_AddIndex
- m_GetIndex
) :
253 ( ( m_AddIndex
+ m_Size
) - m_GetIndex
);
255 return number_of_items_in_the_queue
;
// Replaces the item array with one that is number_of_new_items slots larger.
//
// Inside a scope that holds m_GetMutex, a block of new_size * sizeof(T) bytes
// is allocated and the queued items are copied to its front in queue order:
// first the slots from m_GetIndex to the end of the old array, then the slots
// from the start of the old array up to m_AddIndex. m_AddIndex is then set to
// the old m_Size, i.e. the copy treats the old array as completely full --
// Add() reaches its grow path only when m_AddIndex == m_GetIndex. The old
// array is released with HEAPFREE.
//
// The items are moved with memcpy, so T must be safe to copy bitwise.
//
// NOTE(review): the declaration of new_array, the updates of m_GetIndex,
// m_Size and m_Items, and the end of the locked scope are not visible in this
// view; no NULL check on the new allocation is visible either -- confirm
// against the complete header.
259 inline void CQueueT
<T
>::m_GrowBy ( size_t number_of_new_items
)
261 // Prevent other threads from calling Get().
262 // We don't need to enter the AddCriticalSection because
263 // m_GrowBy() is only called from Add();
266 T
* pointer_to_free
= NULL
;
268 size_t new_size
= m_Size
+ number_of_new_items
;
270 { // Prevent other threads from getting
271 Mutex::Lock
getlock ( m_GetMutex
);
274 // Thanks go to Royce Mitchell III (royce3@aim-controls.com) for finding
275 // a HUGE bug here. I was using HeapReAlloc as a short cut but my logic
276 // was flawed. In certain circumstances, queue items were being dropped.
278 new_array
= (T
*)HEAPALLOC ( new_size
* sizeof(T
) );
280 // Now copy all of the old items from the old queue to the new one.
282 // Get the entries from the get-index to the end of the array
283 memcpy ( new_array
, &m_Items
[m_GetIndex
], ( m_Size
- m_GetIndex
) * sizeof(T
) );
285 // Get the entries from the beginning of the array to the add-index
// They land directly behind the ( m_Size - m_GetIndex ) items copied above.
286 memcpy ( &new_array
[m_Size
-m_GetIndex
], m_Items
, m_AddIndex
* sizeof(T
) );
// The old array was full, so the next free slot is index "old m_Size".
288 m_AddIndex
= (off_t
)m_Size
;
// Remember the old block so it can be released after the copy.
291 pointer_to_free
= m_Items
;
294 HEAPFREE ( pointer_to_free
);
298 inline bool CQueueT
<T
>::Contains ( const T
& t
)
300 Mutex::Lock
addlock ( m_AddMutex
);
301 Mutex::Lock
getlock ( m_GetMutex
);
303 for ( int i
= m_GetIndex
; i
!= m_AddIndex
; i
++ )
307 if ( m_Items
[i
] == t
)
310 return m_Items
[m_AddIndex
] == t
;
313 typedef CQueueT
<char> CCharQueue
;
315 #endif // QUEUET_CLASS_HEADER