2 * COPYRIGHT: See COPYING in the top level directory
3 * PROJECT: ReactOS kernel
4 * FILE: ntoskrnl/ke/queue.c
5 * PURPOSE: Implements kernel queues
7 * PROGRAMMERS: Alex Ionescu (alex@relsoft.net)
9 * Eric Kohl (ekohl@rz-online.de)
12 /* INCLUDES *****************************************************************/
16 #include <internal/debug.h>
18 /* FUNCTIONS *****************************************************************/
20 LONG STDCALL
KiInsertQueue(IN PKQUEUE Queue
, IN PLIST_ENTRY Entry
, BOOLEAN Head
);
27 KeInitializeQueue(IN PKQUEUE Queue
,
28 IN ULONG Count OPTIONAL
)
30 DPRINT("KeInitializeQueue %x\n", Queue
);
32 /* Initialize the Header */
33 KeInitializeDispatcherHeader(&Queue
->Header
,
35 sizeof(KQUEUE
)/sizeof(ULONG
),
38 /* Initialize the Lists */
39 InitializeListHead(&Queue
->EntryListHead
);
40 InitializeListHead(&Queue
->ThreadListHead
);
42 /* Set the Current and Maximum Count */
43 Queue
->CurrentCount
= 0;
44 Queue
->MaximumCount
= (Count
== 0) ? (ULONG
) KeNumberProcessors
: Count
;
52 KeInsertHeadQueue(IN PKQUEUE Queue
,
58 DPRINT("KeInsertHeadQueue %x\n", Queue
);
60 /* Lock the Dispatcher Database */
61 OldIrql
= KeAcquireDispatcherDatabaseLock();
63 /* Insert the Queue */
64 PreviousState
= KiInsertQueue(Queue
, Entry
, TRUE
);
66 /* Release the Dispatcher Lock */
67 KeReleaseDispatcherDatabaseLock(OldIrql
);
69 /* Return previous State */
77 KeInsertQueue(IN PKQUEUE Queue
,
83 DPRINT("KeInsertQueue %x\n", Queue
);
85 /* Lock the Dispatcher Database */
86 OldIrql
= KeAcquireDispatcherDatabaseLock();
88 /* Insert the Queue */
89 PreviousState
= KiInsertQueue(Queue
, Entry
, FALSE
);
91 /* Release the Dispatcher Lock */
92 KeReleaseDispatcherDatabaseLock(OldIrql
);
94 /* Return previous State */
101 * Returns number of entries in the queue
105 KeReadStateQueue(IN PKQUEUE Queue
)
107 /* Returns the Signal State */
108 return(Queue
->Header
.SignalState
);
116 KeRemoveQueue(IN PKQUEUE Queue
,
117 IN KPROCESSOR_MODE WaitMode
,
118 IN PLARGE_INTEGER Timeout OPTIONAL
)
121 PLIST_ENTRY ListEntry
;
123 PKTHREAD Thread
= KeGetCurrentThread();
125 PKQUEUE PreviousQueue
;
126 PKWAIT_BLOCK WaitBlock
;
127 PKWAIT_BLOCK TimerWaitBlock
;
130 DPRINT("KeRemoveQueue %x\n", Queue
);
132 /* Check if the Lock is already held */
133 if (Thread
->WaitNext
) {
135 DPRINT("Lock is already held\n");
139 /* Lock the Dispatcher Database */
140 DPRINT("Lock not held, acquiring\n");
141 OldIrql
= KeAcquireDispatcherDatabaseLock();
142 Thread
->WaitIrql
= OldIrql
;
145 /* This is needed so that we can set the new queue right here, before additional processing */
146 PreviousQueue
= Thread
->Queue
;
147 Thread
->Queue
= Queue
;
149 /* Check if this is a different queue */
150 if (Queue
!= PreviousQueue
) {
153 * INVESTIGATE: What is the Thread->QueueListEntry used for? It's linked it into the
154 * Queue->ThreadListHead when the thread registers with the queue and unlinked when
155 * the thread registers with a new queue. The Thread->Queue already tells us what
156 * queue the thread is registered with.
159 DPRINT("Different Queue\n");
162 /* Remove from this list */
163 DPRINT("Removing Old Queue\n");
164 RemoveEntryList(&Thread
->QueueListEntry
);
167 DPRINT("Activating new thread\n");
168 KiWakeQueue(PreviousQueue
);
171 /* Insert in this new Queue */
172 DPRINT("Inserting new Queue!\n");
173 InsertTailList(&Queue
->ThreadListHead
, &Thread
->QueueListEntry
);
177 /* Same queue, decrement waiting threads */
178 DPRINT("Same Queue!\n");
179 Queue
->CurrentCount
--;
182 /* Loop until the queue is processed */
186 ListEntry
= Queue
->EntryListHead
.Flink
;
188 /* Check if the counts are valid and if there is still a queued entry */
189 if ((Queue
->CurrentCount
< Queue
->MaximumCount
) &&
190 (ListEntry
!= &Queue
->EntryListHead
)) {
192 /* Remove the Entry and Save it */
193 DPRINT("Removing Queue Entry. CurrentCount: %d, Maximum Count: %d\n",
194 Queue
->CurrentCount
, Queue
->MaximumCount
);
195 ListEntry
= RemoveHeadList(&Queue
->EntryListHead
);
197 /* Decrease the number of entries */
198 Queue
->Header
.SignalState
--;
200 /* Increase numbef of running threads */
201 Queue
->CurrentCount
++;
203 /* Check if the entry is valid. If not, bugcheck */
204 if (!ListEntry
->Flink
|| !ListEntry
->Blink
) {
206 KEBUGCHECK(INVALID_WORK_QUEUE_ITEM
);
209 /* Remove the Entry */
210 RemoveEntryList(ListEntry
);
211 ListEntry
->Flink
= NULL
;
213 /* Nothing to wait on */
219 DPRINT("Waiting on Queue Entry. CurrentCount: %d, Maximum Count: %d\n",
220 Queue
->CurrentCount
, Queue
->MaximumCount
);
222 /* Use the Thread's Wait Block, it's big enough */
223 Thread
->WaitBlockList
= &Thread
->WaitBlock
[0];
225 /* Fail if there's an APC Pending */
226 if (WaitMode
== UserMode
&& Thread
->ApcState
.UserApcPending
) {
228 /* Return the status and increase the pending threads */
229 ListEntry
= (PLIST_ENTRY
)STATUS_USER_APC
;
230 Queue
->CurrentCount
++;
232 /* Nothing to wait on */
236 /* Build the Wait Block */
237 WaitBlock
= &Thread
->WaitBlock
[0];
238 WaitBlock
->Object
= (PVOID
)Queue
;
239 WaitBlock
->WaitKey
= STATUS_SUCCESS
;
240 WaitBlock
->WaitType
= WaitAny
;
241 WaitBlock
->Thread
= Thread
;
242 WaitBlock
->NextWaitBlock
= WaitBlock
;
244 Thread
->WaitStatus
= STATUS_SUCCESS
;
246 /* We need to wait for the object... check if we have a timeout */
249 /* If it's zero, then don't do any waiting */
250 if (!Timeout
->QuadPart
) {
252 /* Instant Timeout, return the status and increase the pending threads */
253 DPRINT("Queue Wait has timed out\n");
254 ListEntry
= (PLIST_ENTRY
)STATUS_TIMEOUT
;
255 Queue
->CurrentCount
++;
257 /* Nothing to wait on */
262 * Set up the Timer. We'll use the internal function so that we can
263 * hold on to the dispatcher lock.
265 Timer
= &Thread
->Timer
;
266 TimerWaitBlock
= &Thread
->WaitBlock
[1];
268 /* Set up the Timer Wait Block */
269 TimerWaitBlock
->Object
= (PVOID
)Timer
;
270 TimerWaitBlock
->Thread
= Thread
;
271 TimerWaitBlock
->WaitKey
= STATUS_TIMEOUT
;
272 TimerWaitBlock
->WaitType
= WaitAny
;
273 TimerWaitBlock
->NextWaitBlock
= TimerWaitBlock
;
275 /* Link the timer to this Wait Block */
276 InitializeListHead(&Timer
->Header
.WaitListHead
);
277 InsertTailList(&Timer
->Header
.WaitListHead
, &TimerWaitBlock
->WaitListEntry
);
280 DPRINT("Creating Timer with timeout %I64d\n", *Timeout
);
281 KiInsertTimer(Timer
, *Timeout
);
284 /* Insert the wait block into the Queues's wait list */
285 WaitBlock
= Thread
->WaitBlockList
;
286 InsertTailList(&Queue
->Header
.WaitListHead
, &WaitBlock
->WaitListEntry
);
288 /* Block the Thread */
289 DPRINT("Blocking the Thread: %x %x!\n", KeGetCurrentThread(), Thread
);
290 KiBlockThread(&Status
,
295 /* Reset the wait reason */
296 Thread
->WaitReason
= 0;
298 /* Check if we were executing an APC */
299 if (Status
!= STATUS_KERNEL_APC
) {
302 DPRINT("Done waking queue. Thread: %x %x!\n", KeGetCurrentThread(), Thread
);
303 return (PLIST_ENTRY
)Status
;
306 /* Acquire again the lock */
307 DPRINT("Looping again\n");
308 OldIrql
= KeAcquireDispatcherDatabaseLock();
310 /* Save the new IRQL and decrease number of waiting threads */
311 Thread
->WaitIrql
= OldIrql
;
312 Queue
->CurrentCount
--;
316 /* Unlock Database and return */
317 KeReleaseDispatcherDatabaseLock(Thread
->WaitIrql
);
318 DPRINT("Returning. CurrentCount: %d, Maximum Count: %d\n",
319 Queue
->CurrentCount
, Queue
->MaximumCount
);
328 KeRundownQueue(IN PKQUEUE Queue
)
330 PLIST_ENTRY EnumEntry
;
331 PLIST_ENTRY FirstEntry
;
335 DPRINT("KeRundownQueue(Queue %x)\n", Queue
);
337 /* Get the Dispatcher Lock */
338 OldIrql
= KeAcquireDispatcherDatabaseLock();
340 /* Get the First Empty Entry */
341 FirstEntry
= Queue
->EntryListHead
.Flink
;
343 /* Make sure the list is not empty */
344 if (FirstEntry
== &Queue
->EntryListHead
) {
346 /* It is, so don't return anything */
352 RemoveEntryList(&Queue
->EntryListHead
);
355 /* Unlink threads and clear their Thread->Queue */
356 while (!IsListEmpty(&Queue
->ThreadListHead
)) {
358 /* Get the Entry and Remove it */
359 EnumEntry
= RemoveHeadList(&Queue
->ThreadListHead
);
361 /* Get the Entry's Thread */
362 Thread
= CONTAINING_RECORD(EnumEntry
, KTHREAD
, QueueListEntry
);
365 Thread
->Queue
= NULL
;
368 /* Release the lock and return */
369 KeReleaseDispatcherDatabaseLock(OldIrql
);
374 * Called when a thread which has a queue entry is entering a wait state
378 KiWakeQueue(IN PKQUEUE Queue
)
380 PLIST_ENTRY QueueEntry
;
381 PLIST_ENTRY WaitEntry
;
382 PKWAIT_BLOCK WaitBlock
;
384 /* Decrement the number of active threads */
385 DPRINT("KiWakeQueue: %x. Thread: %x\n", Queue
, KeGetCurrentThread());
386 Queue
->CurrentCount
--;
388 /* Make sure the counts are OK */
389 if (Queue
->CurrentCount
< Queue
->MaximumCount
) {
391 /* Get the Queue Entry */
392 QueueEntry
= Queue
->EntryListHead
.Flink
;
394 /* Get the Wait Entry */
395 WaitEntry
= Queue
->Header
.WaitListHead
.Blink
;
396 DPRINT("Queue Count is ok, Queue entries: %x, %x\n", QueueEntry
, WaitEntry
);
398 /* Make sure that the Queue List isn't empty and that this entry is valid */
399 if (!IsListEmpty(&Queue
->Header
.WaitListHead
) &&
400 (QueueEntry
!= &Queue
->EntryListHead
)) {
402 /* Remove this entry */
403 DPRINT("Queue in List, removing it\n");
404 RemoveEntryList(QueueEntry
);
405 QueueEntry
->Flink
= NULL
;
407 /* Decrease the Signal State */
408 Queue
->Header
.SignalState
--;
410 /* Unwait the Thread */
411 DPRINT("Unwaiting Thread\n");
412 WaitBlock
= CONTAINING_RECORD(WaitEntry
, KWAIT_BLOCK
, WaitListEntry
);
413 KiAbortWaitThread(WaitBlock
->Thread
, (NTSTATUS
)QueueEntry
, IO_NO_INCREMENT
);
419 * Returns the previous number of entries in the queue
423 KiInsertQueue(IN PKQUEUE Queue
,
424 IN PLIST_ENTRY Entry
,
428 PKTHREAD Thread
= KeGetCurrentThread();
429 PKWAIT_BLOCK WaitBlock
;
430 PLIST_ENTRY WaitEntry
;
432 DPRINT("KiInsertQueue(Queue %x, Entry %x)\n", Queue
, Entry
);
434 /* Save the old state */
435 InitialState
= Queue
->Header
.SignalState
;
438 WaitEntry
= Queue
->Header
.WaitListHead
.Blink
;
439 DPRINT("Initial State, WaitEntry: %d, %x\n", InitialState
, WaitEntry
);
442 * Why the KeGetCurrentThread()->Queue != Queue?
443 * KiInsertQueue might be called from an APC for the current thread.
446 if ((Queue
->CurrentCount
< Queue
->MaximumCount
) &&
447 (WaitEntry
!= &Queue
->Header
.WaitListHead
) &&
448 ((Thread
->Queue
!= Queue
) || (Thread
->WaitReason
!= WrQueue
))) {
450 /* Remove the wait entry */
451 DPRINT("Removing Entry\n");
452 RemoveEntryList(WaitEntry
);
454 /* Get the Wait Block and Thread */
455 WaitBlock
= CONTAINING_RECORD(WaitEntry
, KWAIT_BLOCK
, WaitListEntry
);
456 DPRINT("Got wait block: %x\n", WaitBlock
);
457 Thread
= WaitBlock
->Thread
;
459 /* Reset the wait reason */
460 Thread
->WaitReason
= 0;
462 /* Increase the waiting threads */
463 Queue
->CurrentCount
++;
465 /* Check if there's a Thread Timer */
466 if (Thread
->Timer
.Header
.Inserted
) {
468 /* Cancel the Thread Timer with the no-lock fastpath */
469 DPRINT("Removing the Thread's Timer\n");
470 Thread
->Timer
.Header
.Inserted
= FALSE
;
471 RemoveEntryList(&Thread
->Timer
.TimerListEntry
);
474 /* Reschedule the Thread */
475 DPRINT("Unblocking the Thread\n");
476 KiUnblockThread(Thread
, (PNTSTATUS
)&Entry
, 0);
480 /* Increase the Entries */
481 DPRINT("Adding new Queue Entry: %d %d\n", Head
, Queue
->Header
.SignalState
);
482 Queue
->Header
.SignalState
++;
486 InsertHeadList(&Queue
->EntryListHead
, Entry
);
490 InsertTailList(&Queue
->EntryListHead
, Entry
);
494 /* Return the previous state */
495 DPRINT("Returning\n");