2 * PROJECT: ReactOS Kernel
3 * LICENSE: GPL - See COPYING in the top level directory
4 * FILE: ntoskrnl/ke/queue.c
5 * PURPOSE: Implements kernel queues
6 * PROGRAMMERS: Alex Ionescu (alex.ionescu@reactos.org)
8 * Eric Kohl (ekohl@rz-online.de)
11 /* INCLUDES ******************************************************************/
15 #include <internal/debug.h>
17 /* PRIVATE FUNCTIONS *********************************************************/
20 * Called when a thread which has a queue entry is entering a wait state
/*
 * KiWakeQueue
 *
 * Decrements Queue->CurrentCount. If the count is then below
 * Queue->MaximumCount and the queue has both a queued entry and a waiter,
 * the first queued entry is removed and handed to the waiter taken from the
 * tail of the wait list through KiAbortWaitThread.
 *
 * Queue - the queue whose active-thread count is being dropped.
 *
 * NOTE(review): this listing omits lines (see the gaps in the embedded line
 * numbers). The return type, the braces and the declaration of the local
 * Thread used below are not visible here.
 */
24 KiWakeQueue(IN PKQUEUE Queue
)
26 PLIST_ENTRY QueueEntry
;
27 PLIST_ENTRY WaitEntry
;
28 PKWAIT_BLOCK WaitBlock
;
32 /* Decrement the number of active threads */
33 Queue
->CurrentCount
--;
35 /* Make sure the counts are OK */
36 if (Queue
->CurrentCount
< Queue
->MaximumCount
)
38 /* Get the Queue Entry (first element of the entry list) */
39 QueueEntry
= Queue
->EntryListHead
.Flink
;
41 /* Get the Wait Entry (last element of the queue header wait list) */
42 WaitEntry
= Queue
->Header
.WaitListHead
.Blink
;
44 /* Make sure that the Queue entries are not part of empty lists */
/* An element equal to its own list head means that list is empty */
45 if ((WaitEntry
!= &Queue
->Header
.WaitListHead
) &&
46 (QueueEntry
!= &Queue
->EntryListHead
))
48 /* Remove this entry */
/* Flink is cleared after unlinking; presumably marks the entry as no longer queued - confirm */
49 RemoveEntryList(QueueEntry
);
50 QueueEntry
->Flink
= NULL
;
52 /* Decrease the Signal State */
53 Queue
->Header
.SignalState
--;
55 /* Unwait the Thread */
/*
 * NOTE(review): the embedded numbering jumps from 56 to 59, so the remaining
 * CONTAINING_RECORD arguments are missing from this listing. The matching
 * call in KiInsertQueue passes KWAIT_BLOCK and WaitListEntry.
 */
56 WaitBlock
= CONTAINING_RECORD(WaitEntry
,
59 Thread
= WaitBlock
->Thread
;
/* The dequeued entry pointer is passed, cast to NTSTATUS, as the second argument; presumably it becomes the wait status of the woken thread - confirm */
60 KiAbortWaitThread(Thread
, (NTSTATUS
)QueueEntry
, IO_NO_INCREMENT
);
66 * Returns the previous number of entries in the queue
/*
 * KiInsertQueue
 *
 * Either hands Entry directly to a thread waiting on the queue, or
 * increments Queue->Header.SignalState and links Entry into
 * Queue->EntryListHead. The signal state read on entry is saved in
 * InitialState.
 *
 * NOTE(review): this listing omits lines. The rest of the parameter list,
 * the declaration of InitialState, the braces, the else that presumably
 * separates the two paths, the head/tail condition and the return statement
 * are not visible. The callers in this file pass (Queue, Entry, TRUE) and
 * (Queue, Entry, FALSE).
 */
70 KiInsertQueue(IN PKQUEUE Queue
,
75 PKTHREAD Thread
= KeGetCurrentThread();
76 PKWAIT_BLOCK WaitBlock
;
77 PLIST_ENTRY WaitEntry
;
80 /* Save the old state */
81 InitialState
= Queue
->Header
.SignalState
;
/* Candidate waiter: last element of the queue header wait list */
84 WaitEntry
= Queue
->Header
.WaitListHead
.Blink
;
87 * Why the KeGetCurrentThread()->Queue != Queue?
88 * KiInsertQueue might be called from an APC for the current thread.
/*
 * A waiter is satisfied directly only when all of these hold:
 *  - the active count is below the maximum,
 *  - the wait list is not empty,
 *  - the current thread is not associated with this queue, or its wait
 *    reason is not WrQueue.
 */
91 if ((Queue
->CurrentCount
< Queue
->MaximumCount
) &&
92 (WaitEntry
!= &Queue
->Header
.WaitListHead
) &&
93 ((Thread
->Queue
!= Queue
) ||
94 (Thread
->WaitReason
!= WrQueue
)))
96 /* Remove the wait entry */
97 RemoveEntryList(WaitEntry
);
99 /* Get the Wait Block and Thread */
/* From here on Thread is the waiting thread, not the current thread */
100 WaitBlock
= CONTAINING_RECORD(WaitEntry
, KWAIT_BLOCK
, WaitListEntry
);
101 Thread
= WaitBlock
->Thread
;
103 /* Hand the entry to the thread as its wait status, unlink the thread WaitListEntry if it is linked (Flink non-NULL), and clear the wait reason */
104 Thread
->WaitStatus
= (NTSTATUS
)Entry
;
105 if (Thread
->WaitListEntry
.Flink
) RemoveEntryList(&Thread
->WaitListEntry
);
106 Thread
->WaitReason
= 0;
108 /* Increase the number of active threads */
109 Queue
->CurrentCount
++;
111 /* Check if there's a Thread Timer */
112 if (Thread
->Timer
.Header
.Inserted
)
114 /* Cancel the Thread Timer with the no-lock fastpath */
115 Thread
->Timer
.Header
.Inserted
= FALSE
;
116 RemoveEntryList(&Thread
->Timer
.TimerListEntry
);
119 /* Reschedule the Thread */
120 KiReadyThread(Thread
);
/* NOTE(review): embedded lines 121-123 are missing; presumably the direct hand-off path ends here and what follows is the else path - confirm */
124 /* Increase the Entries */
125 Queue
->Header
.SignalState
++;
127 /* Check which mode we're using */
/* NOTE(review): the condition choosing between the two inserts below is not visible in this listing */
130 /* Insert in the head */
131 InsertHeadList(&Queue
->EntryListHead
, Entry
);
135 /* Insert at the end */
136 InsertTailList(&Queue
->EntryListHead
, Entry
);
140 /* Return the previous state */
144 /* PUBLIC FUNCTIONS **********************************************************/
/*
 * KeInitializeQueue
 *
 * Initializes the dispatcher header and both list heads of a queue, zeroes
 * the current count and sets the maximum count.
 *
 * Queue - the queue object to initialize.
 * Count - maximum count; 0 selects KeNumberProcessors instead.
 *
 * NOTE(review): this listing omits lines. The return type, the braces and
 * two of the KeInitializeDispatcherHeader arguments (embedded lines 156 and
 * 158) are not visible.
 */
151 KeInitializeQueue(IN PKQUEUE Queue
,
152 IN ULONG Count OPTIONAL
)
154 /* Initialize the Header */
/* The size argument is expressed in ULONG units */
155 KeInitializeDispatcherHeader(&Queue
->Header
,
157 sizeof(KQUEUE
) / sizeof(ULONG
),
160 /* Initialize the Lists */
161 InitializeListHead(&Queue
->EntryListHead
);
162 InitializeListHead(&Queue
->ThreadListHead
);
164 /* Set the Current and Maximum Count */
165 Queue
->CurrentCount
= 0;
166 Queue
->MaximumCount
= (Count
== 0) ? (ULONG
) KeNumberProcessors
: Count
;
/*
 * KeInsertHeadQueue
 *
 * Calls KiInsertQueue(Queue, Entry, TRUE) while holding the dispatcher
 * database lock and returns the value it produced.
 *
 * Queue - target queue.
 * Entry - list entry to insert.
 *
 * Asserts that the caller runs at IRQL <= DISPATCH_LEVEL.
 *
 * NOTE(review): TRUE presumably selects insertion at the head of the entry
 * list; the condition inside KiInsertQueue is not visible in this listing.
 * The return type, braces and the declarations of OldIrql and PreviousState
 * are also missing.
 */
174 KeInsertHeadQueue(IN PKQUEUE Queue
,
175 IN PLIST_ENTRY Entry
)
180 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL
);
182 /* Lock the Dispatcher Database */
183 OldIrql
= KeAcquireDispatcherDatabaseLock();
185 /* Insert the Queue */
186 PreviousState
= KiInsertQueue(Queue
, Entry
, TRUE
);
188 /* Release the Dispatcher Lock */
189 KeReleaseDispatcherDatabaseLock(OldIrql
);
191 /* Return previous State */
192 return PreviousState
;
/*
 * KeInsertQueue
 *
 * Calls KiInsertQueue(Queue, Entry, FALSE) while holding the dispatcher
 * database lock and returns the value it produced.
 *
 * Queue - target queue.
 * Entry - list entry to insert.
 *
 * Asserts that the caller runs at IRQL <= DISPATCH_LEVEL.
 *
 * NOTE(review): FALSE presumably selects insertion at the tail of the entry
 * list; the condition inside KiInsertQueue is not visible in this listing.
 * The return type, braces and the declarations of OldIrql and PreviousState
 * are also missing.
 */
200 KeInsertQueue(IN PKQUEUE Queue
,
201 IN PLIST_ENTRY Entry
)
206 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL
);
208 /* Lock the Dispatcher Database */
209 OldIrql
= KeAcquireDispatcherDatabaseLock();
211 /* Insert the Queue */
212 PreviousState
= KiInsertQueue(Queue
, Entry
, FALSE
);
214 /* Release the Dispatcher Lock */
215 KeReleaseDispatcherDatabaseLock(OldIrql
);
217 /* Return previous State */
218 return PreviousState
;
224 * Returns number of entries in the queue
/*
 * KeReadStateQueue
 *
 * Returns Queue->Header.SignalState. No lock is taken in the visible code.
 *
 * NOTE(review): the return type, the braces and embedded line 231 are
 * missing from this listing.
 */
228 KeReadStateQueue(IN PKQUEUE Queue
)
230 /* Returns the Signal State */
232 return Queue
->Header
.SignalState
;
/*
 * KeRemoveQueue
 *
 * Associates the current thread with Queue and tries to dequeue the first
 * entry of Queue->EntryListHead. When no entry can be taken, the thread
 * builds a wait block on the queue (plus a timer wait block when a non-zero
 * Timeout is supplied) and swaps out through KiSwapThread.
 *
 * Queue    - queue to remove an entry from.
 * WaitMode - processor mode of the wait; a pending user APC ends the
 *            attempt with STATUS_USER_APC when this is not KernelMode.
 * Timeout  - optional timeout; a zero QuadPart ends the attempt with
 *            STATUS_TIMEOUT instead of waiting.
 *
 * Status values are carried in the PLIST_ENTRY result by casting.
 *
 * NOTE(review): this listing omits many lines (see the gaps in the embedded
 * line numbers). The return type, all braces, else branches, the loop
 * construct, loop exits, some call arguments, the final return and the
 * declarations of Status, OldIrql, Timer and Swappable are not visible.
 * Comments below about control flow are hedged accordingly.
 */
240 KeRemoveQueue(IN PKQUEUE Queue
,
241 IN KPROCESSOR_MODE WaitMode
,
242 IN PLARGE_INTEGER Timeout OPTIONAL
)
244 PLIST_ENTRY QueueEntry
;
246 PKTHREAD Thread
= KeGetCurrentThread();
248 PKQUEUE PreviousQueue
;
249 PKWAIT_BLOCK WaitBlock
;
/* The caller-supplied timeout pointer is kept for the due-time recalculation further down */
252 PLARGE_INTEGER OriginalDueTime
= Timeout
;
253 LARGE_INTEGER DueTime
, NewDueTime
;
255 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL
);
257 /* Check if the Lock is already held */
258 if (Thread
->WaitNext
)
260 /* It is, so next time don't expect this */
261 Thread
->WaitNext
= FALSE
;
/* NOTE(review): embedded lines 262-264 are missing; presumably an else introduces the locking path below - confirm */
265 /* Lock the Dispatcher Database */
266 OldIrql
= KeAcquireDispatcherDatabaseLock();
267 Thread
->WaitIrql
= OldIrql
;
271 * This is needed so that we can set the new queue right here,
272 * before additional processing
274 PreviousQueue
= Thread
->Queue
;
275 Thread
->Queue
= Queue
;
277 /* Check if this is a different queue */
278 if (Queue
!= PreviousQueue
)
280 /* Get the current entry */
281 QueueEntry
= &Thread
->QueueListEntry
;
/* NOTE(review): embedded lines 282-283 are missing; presumably a check on PreviousQueue guards the removal and wake below - confirm */
284 /* Remove from this list */
285 RemoveEntryList(QueueEntry
);
/* Drop the active count of the previous queue and possibly wake one of its waiters */
288 KiWakeQueue(PreviousQueue
);
291 /* Insert in this new Queue */
292 InsertTailList(&Queue
->ThreadListHead
, QueueEntry
);
/* NOTE(review): embedded lines 293-295 are missing; presumably an else introduces the same-queue path below - confirm */
296 /* Same queue, decrement waiting threads */
297 Queue
->CurrentCount
--;
300 /* Loop until the queue is processed */
/* NOTE(review): the loop statement itself (embedded lines 301-302) is not visible in this listing */
303 /* Check if the counts are valid and if there is still a queued entry */
304 QueueEntry
= Queue
->EntryListHead
.Flink
;
305 if ((Queue
->CurrentCount
< Queue
->MaximumCount
) &&
306 (QueueEntry
!= &Queue
->EntryListHead
))
308 /* Decrease the number of entries */
309 Queue
->Header
.SignalState
--;
311 /* Increase number of running threads */
312 Queue
->CurrentCount
++;
314 /* Check if the entry is valid. If not, bugcheck */
/* A NULL Flink or Blink on an entry still on the list is treated as corruption */
315 if (!(QueueEntry
->Flink
) || !(QueueEntry
->Blink
))
317 KEBUGCHECK(INVALID_WORK_QUEUE_ITEM
);
320 /* Remove the Entry */
321 RemoveEntryList(QueueEntry
);
322 QueueEntry
->Flink
= NULL
;
324 /* Nothing to wait on */
/* NOTE(review): embedded lines 325-328 are missing; presumably the loop is left here and the wait path below is the else branch - confirm */
329 /* Use the Thread's Wait Block, it's big enough */
330 Thread
->WaitBlockList
= &Thread
->WaitBlock
[0];
332 /* Check if a kernel APC is pending and we're below APC_LEVEL */
333 if ((Thread
->ApcState
.KernelApcPending
) &&
334 !(Thread
->SpecialApcDisable
) && (Thread
->WaitIrql
< APC_LEVEL
))
336 /* Increment the count and unlock the dispatcher */
/* The count is decremented again after the lock is reacquired further down */
337 Queue
->CurrentCount
++;
338 KeReleaseDispatcherDatabaseLock(Thread
->WaitIrql
);
/* NOTE(review): embedded lines 339-341 are missing; how control reaches the lock reacquisition from here is not visible */
342 /* Fail if there's a User APC Pending */
343 if ((WaitMode
!= KernelMode
) &&
344 (Thread
->ApcState
.UserApcPending
))
346 /* Return the status and increase the pending threads */
/* The status code is smuggled through the PLIST_ENTRY result */
347 QueueEntry
= (PLIST_ENTRY
)STATUS_USER_APC
;
348 Queue
->CurrentCount
++;
352 /* Build the Wait Block */
/* Wait block 0 describes the wait on the queue object itself */
353 WaitBlock
= &Thread
->WaitBlock
[0];
354 WaitBlock
->Object
= (PVOID
)Queue
;
355 WaitBlock
->WaitKey
= STATUS_SUCCESS
;
356 WaitBlock
->WaitType
= WaitAny
;
357 WaitBlock
->Thread
= Thread
;
358 Thread
->WaitStatus
= STATUS_WAIT_0
;
360 /* Check if we can swap the thread's stack */
/* WaitListEntry.Flink is cleared first; KiInsertQueue tests this field before unlinking the thread */
361 Thread
->WaitListEntry
.Flink
= NULL
;
362 KiCheckThreadStackSwap(WaitMode
, Thread
, Swappable
);
364 /* We need to wait for the object... check for a timeout */
/* NOTE(review): embedded lines 365-366 are missing; presumably a test that Timeout is non-NULL precedes the dereference below - confirm */
367 /* Check if it's zero */
368 if (!Timeout
->QuadPart
)
370 /* Don't wait. Return and increase pending threads */
371 QueueEntry
= (PLIST_ENTRY
)STATUS_TIMEOUT
;
372 Queue
->CurrentCount
++;
377 * Set up the Timer. We'll use the internal function so
378 * that we can hold on to the dispatcher lock.
380 Timer
= &Thread
->Timer
;
381 WaitBlock
->NextWaitBlock
= &Thread
->WaitBlock
[1];
382 WaitBlock
= &Thread
->WaitBlock
[1];
384 /* Set up the Timer Wait Block */
385 WaitBlock
->Object
= (PVOID
)Timer
;
386 WaitBlock
->Thread
= Thread
;
387 WaitBlock
->WaitKey
= STATUS_TIMEOUT
;
388 WaitBlock
->WaitType
= WaitAny
;
390 /* Link the timer to this Wait Block */
391 Timer
->Header
.WaitListHead
.Flink
=
392 &WaitBlock
->WaitListEntry
;
393 Timer
->Header
.WaitListHead
.Blink
=
394 &WaitBlock
->WaitListEntry
;
395 WaitBlock
->WaitListEntry
.Flink
=
396 &Timer
->Header
.WaitListHead
;
397 WaitBlock
->WaitListEntry
.Blink
=
398 &Timer
->Header
.WaitListHead
;
401 if (!KiInsertTimer(Timer
, *Timeout
))
404 DPRINT1("If you see thie message contact Alex ASAP\n");
408 /* Set timer due time */
409 DueTime
.QuadPart
= Timer
->DueTime
.QuadPart
;
413 WaitBlock
->NextWaitBlock
= &Thread
->WaitBlock
[0];
415 /* Insert the wait block into the Queues's wait list */
416 WaitBlock
= &Thread
->WaitBlock
[0];
417 InsertTailList(&Queue
->Header
.WaitListHead
,
418 &WaitBlock
->WaitListEntry
);
420 /* Setup the wait information */
/* WrQueue is the wait reason that KiInsertQueue tests for on the current thread */
421 Thread
->WaitMode
= WaitMode
;
422 Thread
->WaitReason
= WrQueue
;
423 Thread
->Alertable
= FALSE
;
424 Thread
->WaitTime
= ((PLARGE_INTEGER
)&KeTickCount
)->LowPart
;
425 Thread
->State
= Waiting
;
427 /* Find a new thread to run */
428 KiAddThreadToWaitList(Thread
, Swappable
);
429 Status
= KiSwapThread();
431 /* Reset the wait reason */
432 Thread
->WaitReason
= 0;
434 /* Check if we were executing an APC */
/*
 * Any status other than STATUS_KERNEL_APC is returned as the result.
 * KiInsertQueue stores the inserted entry pointer in Thread->WaitStatus, so
 * presumably this value is the dequeued entry on success - confirm.
 */
435 if (Status
!= STATUS_KERNEL_APC
)
438 return (PLIST_ENTRY
)Status
;
441 /* Check if we had a timeout */
444 /* Recalculate due times */
/* NOTE(review): the remaining KiRecalculateDueTime arguments (embedded lines 446-447) are missing from this listing */
445 Timeout
= KiRecalculateDueTime(OriginalDueTime
,
451 /* Reacquire the lock */
452 OldIrql
= KeAcquireDispatcherDatabaseLock();
454 /* Save the new IRQL and decrease number of waiting threads */
455 Thread
->WaitIrql
= OldIrql
;
456 Queue
->CurrentCount
--;
460 /* Unlock Database and return */
/* NOTE(review): the final return statement is not visible; presumably QueueEntry is returned - confirm */
461 KeReleaseDispatcherDatabaseLock(Thread
->WaitIrql
);
/*
 * KeRundownQueue
 *
 * Under the dispatcher database lock, removes the head element of
 * Queue->EntryListHead (if the list is not empty) into FirstEntry, then
 * unlinks every thread on Queue->ThreadListHead and clears its Queue
 * pointer.
 *
 * Queue - the queue being run down.
 *
 * Asserts that the caller runs at IRQL <= DISPATCH_LEVEL and that no thread
 * is waiting on the queue (the header wait list is empty).
 *
 * NOTE(review): this listing omits lines. The return type, the braces, the
 * declarations of Thread and OldIrql and the final return are not visible;
 * presumably FirstEntry is returned - confirm.
 */
470 KeRundownQueue(IN PKQUEUE Queue
)
472 PLIST_ENTRY EnumEntry
;
473 PLIST_ENTRY FirstEntry
= NULL
;
477 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL
);
478 ASSERT(IsListEmpty(&Queue
->Header
.WaitListHead
));
480 /* Get the Dispatcher Lock */
481 OldIrql
= KeAcquireDispatcherDatabaseLock();
483 /* Make sure the list is not empty */
/* FirstEntry stays NULL when the entry list is empty */
484 if (!IsListEmpty(&Queue
->EntryListHead
))
487 FirstEntry
= RemoveHeadList(&Queue
->EntryListHead
);
490 /* Unlink threads and clear their Thread->Queue */
491 while (!IsListEmpty(&Queue
->ThreadListHead
))
493 /* Get the Entry and Remove it */
494 EnumEntry
= RemoveHeadList(&Queue
->ThreadListHead
);
496 /* Get the Entry's Thread */
/* Threads are linked on the queue through their QueueListEntry field */
497 Thread
= CONTAINING_RECORD(EnumEntry
, KTHREAD
, QueueListEntry
);
500 Thread
->Queue
= NULL
;
503 /* Release the lock and return */
504 KeReleaseDispatcherDatabaseLock(OldIrql
);