2 * COPYRIGHT: See COPYING in the top level directory
3 * PROJECT: ReactOS kernel
4 * FILE: ntoskrnl/ke/queue.c
5 * PURPOSE: Implements kernel queues
7 * PROGRAMMERS: Alex Ionescu (alex@relsoft.net)
9 * Eric Kohl (ekohl@rz-online.de)
12 /* INCLUDES *****************************************************************/
16 #include <internal/debug.h>
18 /* FUNCTIONS *****************************************************************/
20 LONG STDCALL
KiInsertQueue(IN PKQUEUE Queue
, IN PLIST_ENTRY Entry
, BOOLEAN Head
);
27 KeInitializeQueue(IN PKQUEUE Queue
,
28 IN ULONG Count OPTIONAL
)
30 DPRINT("KeInitializeQueue %x\n", Queue
);
32 /* Initialize the Header */
33 KeInitializeDispatcherHeader(&Queue
->Header
,
35 sizeof(KQUEUE
)/sizeof(ULONG
),
38 /* Initialize the Lists */
39 InitializeListHead(&Queue
->EntryListHead
);
40 InitializeListHead(&Queue
->ThreadListHead
);
42 /* Set the Current and Maximum Count */
43 Queue
->CurrentCount
= 0;
44 Queue
->MaximumCount
= (Count
== 0) ? (ULONG
) KeNumberProcessors
: Count
;
52 KeInsertHeadQueue(IN PKQUEUE Queue
,
58 DPRINT("KeInsertHeadQueue %x\n", Queue
);
60 /* Lock the Dispatcher Database */
61 OldIrql
= KeAcquireDispatcherDatabaseLock();
63 /* Insert the Queue */
64 PreviousState
= KiInsertQueue(Queue
, Entry
, TRUE
);
66 /* Release the Dispatcher Lock */
67 KeReleaseDispatcherDatabaseLock(OldIrql
);
69 /* Return previous State */
77 KeInsertQueue(IN PKQUEUE Queue
,
83 DPRINT("KeInsertQueue %x\n", Queue
);
85 /* Lock the Dispatcher Database */
86 OldIrql
= KeAcquireDispatcherDatabaseLock();
88 /* Insert the Queue */
89 PreviousState
= KiInsertQueue(Queue
, Entry
, FALSE
);
91 /* Release the Dispatcher Lock */
92 KeReleaseDispatcherDatabaseLock(OldIrql
);
94 /* Return previous State */
101 * Returns number of entries in the queue
105 KeReadStateQueue(IN PKQUEUE Queue
)
107 /* Returns the Signal State */
108 return(Queue
->Header
.SignalState
);
116 KeRemoveQueue(IN PKQUEUE Queue
,
117 IN KPROCESSOR_MODE WaitMode
,
118 IN PLARGE_INTEGER Timeout OPTIONAL
)
121 PLIST_ENTRY ListEntry
;
123 PKTHREAD Thread
= KeGetCurrentThread();
125 PKQUEUE PreviousQueue
;
126 PKWAIT_BLOCK WaitBlock
;
127 PKWAIT_BLOCK TimerWaitBlock
;
130 DPRINT("KeRemoveQueue %x\n", Queue
);
132 /* Check if the Lock is already held */
133 if (Thread
->WaitNext
) {
135 DPRINT("Lock is already held\n");
139 /* Lock the Dispatcher Database */
140 DPRINT("Lock not held, acquiring\n");
141 OldIrql
= KeAcquireDispatcherDatabaseLock();
142 Thread
->WaitIrql
= OldIrql
;
145 /* This is needed so that we can set the new queue right here, before additional processing */
146 PreviousQueue
= Thread
->Queue
;
147 Thread
->Queue
= Queue
;
149 /* Check if this is a different queue */
150 if (Queue
!= PreviousQueue
) {
153 * INVESTIGATE: What is the Thread->QueueListEntry used for? It's linked it into the
154 * Queue->ThreadListHead when the thread registers with the queue and unlinked when
155 * the thread registers with a new queue. The Thread->Queue already tells us what
156 * queue the thread is registered with.
159 DPRINT("Different Queue\n");
162 /* Remove from this list */
163 DPRINT("Removing Old Queue\n");
164 RemoveEntryList(&Thread
->QueueListEntry
);
167 DPRINT("Activating new thread\n");
168 KiWakeQueue(PreviousQueue
);
171 /* Insert in this new Queue */
172 DPRINT("Inserting new Queue!\n");
173 InsertTailList(&Queue
->ThreadListHead
, &Thread
->QueueListEntry
);
177 /* Same queue, decrement waiting threads */
178 DPRINT("Same Queue!\n");
179 Queue
->CurrentCount
--;
182 /* Loop until the queue is processed */
185 /* Check if the counts are valid and if there is still a queued entry */
186 if ((Queue
->CurrentCount
< Queue
->MaximumCount
) &&
187 !IsListEmpty(&Queue
->EntryListHead
)) {
189 /* Remove the Entry and Save it */
190 DPRINT("Removing Queue Entry. CurrentCount: %d, Maximum Count: %d\n",
191 Queue
->CurrentCount
, Queue
->MaximumCount
);
192 ListEntry
= Queue
->EntryListHead
.Flink
;
194 /* Decrease the number of entries */
195 Queue
->Header
.SignalState
--;
197 /* Increase numbef of running threads */
198 Queue
->CurrentCount
++;
200 /* Check if the entry is valid. If not, bugcheck */
201 if (!ListEntry
->Flink
|| !ListEntry
->Blink
) {
203 KEBUGCHECK(INVALID_WORK_QUEUE_ITEM
);
206 /* Remove the Entry */
207 RemoveEntryList(ListEntry
);
208 ListEntry
->Flink
= NULL
;
210 /* Nothing to wait on */
216 DPRINT("Waiting on Queue Entry. CurrentCount: %d, Maximum Count: %d\n",
217 Queue
->CurrentCount
, Queue
->MaximumCount
);
219 /* Use the Thread's Wait Block, it's big enough */
220 Thread
->WaitBlockList
= &Thread
->WaitBlock
[0];
222 /* Fail if there's an APC Pending */
223 if (WaitMode
!= KernelMode
&& Thread
->ApcState
.UserApcPending
) {
225 /* Return the status and increase the pending threads */
226 ListEntry
= (PLIST_ENTRY
)STATUS_USER_APC
;
227 Queue
->CurrentCount
++;
229 /* Nothing to wait on */
233 /* Build the Wait Block */
234 WaitBlock
= &Thread
->WaitBlock
[0];
235 WaitBlock
->Object
= (PVOID
)Queue
;
236 WaitBlock
->WaitKey
= STATUS_SUCCESS
;
237 WaitBlock
->WaitType
= WaitAny
;
238 WaitBlock
->Thread
= Thread
;
239 WaitBlock
->NextWaitBlock
= WaitBlock
;
241 Thread
->WaitStatus
= STATUS_SUCCESS
;
243 /* We need to wait for the object... check if we have a timeout */
246 /* If it's zero, then don't do any waiting */
247 if (!Timeout
->QuadPart
) {
249 /* Instant Timeout, return the status and increase the pending threads */
250 DPRINT("Queue Wait has timed out\n");
251 ListEntry
= (PLIST_ENTRY
)STATUS_TIMEOUT
;
252 Queue
->CurrentCount
++;
254 /* Nothing to wait on */
259 * Set up the Timer. We'll use the internal function so that we can
260 * hold on to the dispatcher lock.
262 Timer
= &Thread
->Timer
;
263 TimerWaitBlock
= &Thread
->WaitBlock
[1];
265 /* Set up the Timer Wait Block */
266 TimerWaitBlock
->Object
= (PVOID
)Timer
;
267 TimerWaitBlock
->Thread
= Thread
;
268 TimerWaitBlock
->WaitKey
= STATUS_TIMEOUT
;
269 TimerWaitBlock
->WaitType
= WaitAny
;
270 TimerWaitBlock
->NextWaitBlock
= TimerWaitBlock
;
272 /* Link the timer to this Wait Block */
273 InitializeListHead(&Timer
->Header
.WaitListHead
);
274 InsertTailList(&Timer
->Header
.WaitListHead
, &TimerWaitBlock
->WaitListEntry
);
277 DPRINT("Creating Timer with timeout %I64d\n", *Timeout
);
278 KiInsertTimer(Timer
, *Timeout
);
281 /* Insert the wait block into the Queues's wait list */
282 WaitBlock
= Thread
->WaitBlockList
;
283 InsertTailList(&Queue
->Header
.WaitListHead
, &WaitBlock
->WaitListEntry
);
285 /* Block the Thread */
286 DPRINT("Blocking the Thread: %x %x!\n", KeGetCurrentThread(), Thread
);
287 KiBlockThread(&Status
,
292 /* Reset the wait reason */
293 Thread
->WaitReason
= 0;
295 /* Check if we were executing an APC */
296 if (Status
!= STATUS_KERNEL_APC
) {
299 DPRINT("Done waking queue. Thread: %x %x!\n", KeGetCurrentThread(), Thread
);
300 return (PLIST_ENTRY
)Status
;
303 /* Acquire again the lock */
304 DPRINT("Looping again\n");
305 OldIrql
= KeAcquireDispatcherDatabaseLock();
307 /* Save the new IRQL and decrease number of waiting threads */
308 Thread
->WaitIrql
= OldIrql
;
309 Queue
->CurrentCount
--;
313 /* Unlock Database and return */
314 KeReleaseDispatcherDatabaseLock(Thread
->WaitIrql
);
315 DPRINT("Returning. CurrentCount: %d, Maximum Count: %d\n",
316 Queue
->CurrentCount
, Queue
->MaximumCount
);
325 KeRundownQueue(IN PKQUEUE Queue
)
327 PLIST_ENTRY EnumEntry
;
328 PLIST_ENTRY FirstEntry
= NULL
;
332 DPRINT("KeRundownQueue(Queue %x)\n", Queue
);
334 /* Get the Dispatcher Lock */
335 OldIrql
= KeAcquireDispatcherDatabaseLock();
337 /* Make sure the list is not empty */
338 if (!IsListEmpty(&Queue
->EntryListHead
))
341 FirstEntry
= RemoveHeadList(&Queue
->EntryListHead
);
344 /* Unlink threads and clear their Thread->Queue */
345 while (!IsListEmpty(&Queue
->ThreadListHead
)) {
347 /* Get the Entry and Remove it */
348 EnumEntry
= RemoveHeadList(&Queue
->ThreadListHead
);
350 /* Get the Entry's Thread */
351 Thread
= CONTAINING_RECORD(EnumEntry
, KTHREAD
, QueueListEntry
);
354 Thread
->Queue
= NULL
;
357 /* Release the lock and return */
358 KeReleaseDispatcherDatabaseLock(OldIrql
);
363 * Called when a thread which has a queue entry is entering a wait state
367 KiWakeQueue(IN PKQUEUE Queue
)
369 PLIST_ENTRY QueueEntry
;
370 PLIST_ENTRY WaitEntry
;
371 PKWAIT_BLOCK WaitBlock
;
373 /* Decrement the number of active threads */
374 DPRINT("KiWakeQueue: %x. Thread: %x\n", Queue
, KeGetCurrentThread());
375 Queue
->CurrentCount
--;
377 /* Make sure the counts are OK */
378 if (Queue
->CurrentCount
< Queue
->MaximumCount
) {
380 /* Get the Queue Entry */
381 QueueEntry
= Queue
->EntryListHead
.Flink
;
383 /* Get the Wait Entry */
384 WaitEntry
= Queue
->Header
.WaitListHead
.Blink
;
385 DPRINT("Queue Count is ok, Queue entries: %x, %x\n", QueueEntry
, WaitEntry
);
387 /* Make sure that the Queue List isn't empty and that this entry is valid */
388 if (!IsListEmpty(&Queue
->Header
.WaitListHead
) &&
389 (QueueEntry
!= &Queue
->EntryListHead
)) {
391 /* Remove this entry */
392 DPRINT("Queue in List, removing it\n");
393 RemoveEntryList(QueueEntry
);
394 QueueEntry
->Flink
= NULL
;
396 /* Decrease the Signal State */
397 Queue
->Header
.SignalState
--;
399 /* Unwait the Thread */
400 DPRINT("Unwaiting Thread\n");
401 WaitBlock
= CONTAINING_RECORD(WaitEntry
, KWAIT_BLOCK
, WaitListEntry
);
402 KiAbortWaitThread(WaitBlock
->Thread
, (NTSTATUS
)QueueEntry
, IO_NO_INCREMENT
);
408 * Returns the previous number of entries in the queue
412 KiInsertQueue(IN PKQUEUE Queue
,
413 IN PLIST_ENTRY Entry
,
417 PKTHREAD Thread
= KeGetCurrentThread();
418 PKWAIT_BLOCK WaitBlock
;
419 PLIST_ENTRY WaitEntry
;
421 DPRINT("KiInsertQueue(Queue %x, Entry %x)\n", Queue
, Entry
);
423 /* Save the old state */
424 InitialState
= Queue
->Header
.SignalState
;
427 WaitEntry
= Queue
->Header
.WaitListHead
.Blink
;
428 DPRINT("Initial State, WaitEntry: %d, %x\n", InitialState
, WaitEntry
);
431 * Why the KeGetCurrentThread()->Queue != Queue?
432 * KiInsertQueue might be called from an APC for the current thread.
435 if ((Queue
->CurrentCount
< Queue
->MaximumCount
) &&
436 (WaitEntry
!= &Queue
->Header
.WaitListHead
) &&
437 ((Thread
->Queue
!= Queue
) || (Thread
->WaitReason
!= WrQueue
))) {
439 /* Remove the wait entry */
440 DPRINT("Removing Entry\n");
441 RemoveEntryList(WaitEntry
);
443 /* Get the Wait Block and Thread */
444 WaitBlock
= CONTAINING_RECORD(WaitEntry
, KWAIT_BLOCK
, WaitListEntry
);
445 DPRINT("Got wait block: %x\n", WaitBlock
);
446 Thread
= WaitBlock
->Thread
;
448 /* Reset the wait reason */
449 Thread
->WaitReason
= 0;
451 /* Increase the waiting threads */
452 Queue
->CurrentCount
++;
454 /* Check if there's a Thread Timer */
455 if (Thread
->Timer
.Header
.Inserted
) {
457 /* Cancel the Thread Timer with the no-lock fastpath */
458 DPRINT("Removing the Thread's Timer\n");
459 Thread
->Timer
.Header
.Inserted
= FALSE
;
460 RemoveEntryList(&Thread
->Timer
.TimerListEntry
);
463 /* Reschedule the Thread */
464 DPRINT("Unblocking the Thread\n");
465 KiUnblockThread(Thread
, (PNTSTATUS
)&Entry
, 0);
469 /* Increase the Entries */
470 DPRINT("Adding new Queue Entry: %d %d\n", Head
, Queue
->Header
.SignalState
);
471 Queue
->Header
.SignalState
++;
475 InsertHeadList(&Queue
->EntryListHead
, Entry
);
479 InsertTailList(&Queue
->EntryListHead
, Entry
);
483 /* Return the previous state */
484 DPRINT("Returning\n");