3 * COPYRIGHT: See COPYING in the top level directory
4 * PROJECT: ReactOS kernel
5 * FILE: ntoskrnl/ke/queue.c
6 * PURPOSE: Implements kernel queues
8 * PROGRAMMERS: Eric Kohl (ekohl@rz-online.de)
11 /* INCLUDES *****************************************************************/
15 #include <internal/debug.h>
17 /* FUNCTIONS *****************************************************************/
19 LONG STDCALL
KiInsertQueue(IN PKQUEUE Queue
, IN PLIST_ENTRY Entry
, BOOLEAN Head
);
26 KeInitializeQueue(IN PKQUEUE Queue
,
27 IN ULONG Count OPTIONAL
)
29 DPRINT("KeInitializeQueue %x\n", Queue
);
31 /* Initialize the Header */
32 KeInitializeDispatcherHeader(&Queue
->Header
,
34 sizeof(KQUEUE
)/sizeof(ULONG
),
37 /* Initialize the Lists */
38 InitializeListHead(&Queue
->EntryListHead
);
39 InitializeListHead(&Queue
->ThreadListHead
);
41 /* Set the Current and Maximum Count */
42 Queue
->CurrentCount
= 0;
43 Queue
->MaximumCount
= (Count
== 0) ? (ULONG
) KeNumberProcessors
: Count
;
51 KeInsertHeadQueue(IN PKQUEUE Queue
,
57 DPRINT("KeInsertHeadQueue %x\n", Queue
);
59 /* Lock the Dispatcher Database */
60 OldIrql
= KeAcquireDispatcherDatabaseLock();
62 /* Insert the Queue */
63 PreviousState
= KiInsertQueue(Queue
, Entry
, TRUE
);
65 /* Release the Dispatcher Lock */
66 KeReleaseDispatcherDatabaseLock(OldIrql
);
68 /* Return previous State */
76 KeInsertQueue(IN PKQUEUE Queue
,
82 DPRINT("KeInsertQueue %x\n", Queue
);
84 /* Lock the Dispatcher Database */
85 OldIrql
= KeAcquireDispatcherDatabaseLock();
87 /* Insert the Queue */
88 PreviousState
= KiInsertQueue(Queue
, Entry
, FALSE
);
90 /* Release the Dispatcher Lock */
91 KeReleaseDispatcherDatabaseLock(OldIrql
);
93 /* Return previous State */
100 * Returns number of entries in the queue
104 KeReadStateQueue(IN PKQUEUE Queue
)
106 /* Returns the Signal State */
107 return(Queue
->Header
.SignalState
);
115 KeRemoveQueue(IN PKQUEUE Queue
,
116 IN KPROCESSOR_MODE WaitMode
,
117 IN PLARGE_INTEGER Timeout OPTIONAL
)
120 PLIST_ENTRY ListEntry
;
122 PKTHREAD Thread
= KeGetCurrentThread();
124 PKQUEUE PreviousQueue
;
125 PKWAIT_BLOCK WaitBlock
;
126 PKWAIT_BLOCK TimerWaitBlock
;
129 DPRINT("KeRemoveQueue %x\n", Queue
);
131 /* Check if the Lock is already held */
132 if (Thread
->WaitNext
) {
134 DPRINT("Lock is already held\n");
138 /* Lock the Dispatcher Database */
139 DPRINT("Lock not held, acquiring\n");
140 OldIrql
= KeAcquireDispatcherDatabaseLock();
141 Thread
->WaitIrql
= OldIrql
;
144 /* This is needed so that we can set the new queue right here, before additional processing */
145 PreviousQueue
= Thread
->Queue
;
146 Thread
->Queue
= Queue
;
148 /* Check if this is a different queue */
149 if (Queue
!= PreviousQueue
) {
152 * INVESTIGATE: What is the Thread->QueueListEntry used for? It's linked it into the
153 * Queue->ThreadListHead when the thread registers with the queue and unlinked when
154 * the thread registers with a new queue. The Thread->Queue already tells us what
155 * queue the thread is registered with.
158 DPRINT("Different Queue\n");
161 /* Remove from this list */
162 DPRINT("Removing Old Queue\n");
163 RemoveEntryList(&Thread
->QueueListEntry
);
166 DPRINT("Activating new thread\n");
167 KiWakeQueue(PreviousQueue
);
170 /* Insert in this new Queue */
171 DPRINT("Inserting new Queue!\n");
172 InsertTailList(&Queue
->ThreadListHead
, &Thread
->QueueListEntry
);
176 /* Same queue, decrement waiting threads */
177 DPRINT("Same Queue!\n");
178 Queue
->CurrentCount
--;
181 /* Loop until the queue is processed */
185 ListEntry
= Queue
->EntryListHead
.Flink
;
187 /* Check if the counts are valid and if there is still a queued entry */
188 if ((Queue
->CurrentCount
< Queue
->MaximumCount
) &&
189 (ListEntry
!= &Queue
->EntryListHead
)) {
191 /* Remove the Entry and Save it */
192 DPRINT("Removing Queue Entry. CurrentCount: %d, Maximum Count: %d\n",
193 Queue
->CurrentCount
, Queue
->MaximumCount
);
194 ListEntry
= RemoveHeadList(&Queue
->EntryListHead
);
196 /* Decrease the number of entries */
197 Queue
->Header
.SignalState
--;
199 /* Increase numbef of running threads */
200 Queue
->CurrentCount
++;
202 /* Check if the entry is valid. If not, bugcheck */
203 if (!ListEntry
->Flink
|| !ListEntry
->Blink
) {
205 KEBUGCHECK(INVALID_WORK_QUEUE_ITEM
);
208 /* Remove the Entry */
209 RemoveEntryList(ListEntry
);
210 ListEntry
->Flink
= NULL
;
212 /* Nothing to wait on */
218 DPRINT("Waiting on Queue Entry. CurrentCount: %d, Maximum Count: %d\n",
219 Queue
->CurrentCount
, Queue
->MaximumCount
);
221 /* Use the Thread's Wait Block, it's big enough */
222 Thread
->WaitBlockList
= &Thread
->WaitBlock
[0];
224 /* Fail if there's an APC Pending */
225 if (WaitMode
== UserMode
&& Thread
->ApcState
.UserApcPending
) {
227 /* Return the status and increase the pending threads */
228 ListEntry
= (PLIST_ENTRY
)STATUS_USER_APC
;
229 Queue
->CurrentCount
++;
231 /* Nothing to wait on */
235 /* Build the Wait Block */
236 WaitBlock
= &Thread
->WaitBlock
[0];
237 WaitBlock
->Object
= (PVOID
)Queue
;
238 WaitBlock
->WaitKey
= STATUS_SUCCESS
;
239 WaitBlock
->WaitType
= WaitAny
;
240 WaitBlock
->Thread
= Thread
;
241 WaitBlock
->NextWaitBlock
= NULL
;
243 Thread
->WaitStatus
= STATUS_SUCCESS
;
245 /* We need to wait for the object... check if we have a timeout */
248 /* If it's zero, then don't do any waiting */
249 if (!Timeout
->QuadPart
) {
251 /* Instant Timeout, return the status and increase the pending threads */
252 DPRINT("Queue Wait has timed out\n");
253 ListEntry
= (PLIST_ENTRY
)STATUS_TIMEOUT
;
254 Queue
->CurrentCount
++;
256 /* Nothing to wait on */
261 * Set up the Timer. We'll use the internal function so that we can
262 * hold on to the dispatcher lock.
264 Timer
= &Thread
->Timer
;
265 TimerWaitBlock
= &Thread
->WaitBlock
[1];
267 /* Set up the Timer Wait Block */
268 TimerWaitBlock
->Object
= (PVOID
)Timer
;
269 TimerWaitBlock
->Thread
= Thread
;
270 TimerWaitBlock
->WaitKey
= STATUS_TIMEOUT
;
271 TimerWaitBlock
->WaitType
= WaitAny
;
272 TimerWaitBlock
->NextWaitBlock
= NULL
;
274 /* Link the timer to this Wait Block */
275 InitializeListHead(&Timer
->Header
.WaitListHead
);
276 InsertTailList(&Timer
->Header
.WaitListHead
, &TimerWaitBlock
->WaitListEntry
);
279 DPRINT("Creating Timer with timeout %I64d\n", *Timeout
);
280 KiInsertTimer(Timer
, *Timeout
);
283 /* Insert the wait block into the Queues's wait list */
284 WaitBlock
= Thread
->WaitBlockList
;
285 InsertTailList(&Queue
->Header
.WaitListHead
, &WaitBlock
->WaitListEntry
);
287 /* Block the Thread */
288 DPRINT("Blocking the Thread: %x %x!\n", KeGetCurrentThread(), Thread
);
289 PsBlockThread(&Status
,
294 /* Reset the wait reason */
295 Thread
->WaitReason
= 0;
297 /* Check if we were executing an APC */
298 if (Status
!= STATUS_KERNEL_APC
) {
301 DPRINT("Done waking queue. Thread: %x %x!\n", KeGetCurrentThread(), Thread
);
302 return (PLIST_ENTRY
)Status
;
305 /* Acquire again the lock */
306 DPRINT("Looping again\n");
307 OldIrql
= KeAcquireDispatcherDatabaseLock();
309 /* Save the new IRQL and decrease number of waiting threads */
310 Thread
->WaitIrql
= OldIrql
;
311 Queue
->CurrentCount
--;
315 /* Unlock Database and return */
316 KeReleaseDispatcherDatabaseLock(Thread
->WaitIrql
);
317 DPRINT("Returning. CurrentCount: %d, Maximum Count: %d\n",
318 Queue
->CurrentCount
, Queue
->MaximumCount
);
327 KeRundownQueue(IN PKQUEUE Queue
)
329 PLIST_ENTRY EnumEntry
;
330 PLIST_ENTRY FirstEntry
;
334 DPRINT("KeRundownQueue(Queue %x)\n", Queue
);
336 /* Get the Dispatcher Lock */
337 OldIrql
= KeAcquireDispatcherDatabaseLock();
339 /* Get the First Empty Entry */
340 FirstEntry
= Queue
->EntryListHead
.Flink
;
342 /* Make sure the list is not empty */
343 if (FirstEntry
== &Queue
->EntryListHead
) {
345 /* It is, so don't return anything */
351 RemoveEntryList(&Queue
->EntryListHead
);
354 /* Unlink threads and clear their Thread->Queue */
355 while (!IsListEmpty(&Queue
->ThreadListHead
)) {
357 /* Get the Entry and Remove it */
358 EnumEntry
= RemoveHeadList(&Queue
->ThreadListHead
);
360 /* Get the Entry's Thread */
361 Thread
= CONTAINING_RECORD(EnumEntry
, KTHREAD
, QueueListEntry
);
364 Thread
->Queue
= NULL
;
367 /* Release the lock and return */
368 KeReleaseDispatcherDatabaseLock(OldIrql
);
373 * Called when a thread which has a queue entry is entering a wait state
377 KiWakeQueue(IN PKQUEUE Queue
)
379 PLIST_ENTRY QueueEntry
;
380 PLIST_ENTRY WaitEntry
;
381 PKWAIT_BLOCK WaitBlock
;
383 /* Decrement the number of active threads */
384 DPRINT("KiWakeQueue: %x. Thread: %x\n", Queue
, KeGetCurrentThread());
385 Queue
->CurrentCount
--;
387 /* Make sure the counts are OK */
388 if (Queue
->CurrentCount
< Queue
->MaximumCount
) {
390 /* Get the Queue Entry */
391 QueueEntry
= Queue
->EntryListHead
.Flink
;
393 /* Get the Wait Entry */
394 WaitEntry
= Queue
->Header
.WaitListHead
.Blink
;
395 DPRINT("Queue Count is ok, Queue entries: %x, %x\n", QueueEntry
, WaitEntry
);
397 /* Make sure that the Queue List isn't empty and that this entry is valid */
398 if (!IsListEmpty(&Queue
->Header
.WaitListHead
) &&
399 (QueueEntry
!= &Queue
->EntryListHead
)) {
401 /* Remove this entry */
402 DPRINT("Queue in List, removing it\n");
403 RemoveEntryList(QueueEntry
);
404 QueueEntry
->Flink
= NULL
;
406 /* Decrease the Signal State */
407 Queue
->Header
.SignalState
--;
409 /* Unwait the Thread */
410 DPRINT("Unwaiting Thread\n");
411 WaitBlock
= CONTAINING_RECORD(WaitEntry
, KWAIT_BLOCK
, WaitListEntry
);
412 KiAbortWaitThread(WaitBlock
->Thread
, (NTSTATUS
)QueueEntry
);
418 * Returns the previous number of entries in the queue
422 KiInsertQueue(IN PKQUEUE Queue
,
423 IN PLIST_ENTRY Entry
,
427 PKTHREAD Thread
= KeGetCurrentThread();
428 PKWAIT_BLOCK WaitBlock
;
429 PLIST_ENTRY WaitEntry
;
431 DPRINT("KiInsertQueue(Queue %x, Entry %x)\n", Queue
, Entry
);
433 /* Save the old state */
434 InitialState
= Queue
->Header
.SignalState
;
437 WaitEntry
= Queue
->Header
.WaitListHead
.Blink
;
438 DPRINT("Initial State, WaitEntry: %d, %x\n", InitialState
, WaitEntry
);
441 * Why the KeGetCurrentThread()->Queue != Queue?
442 * KiInsertQueue might be called from an APC for the current thread.
445 if ((Queue
->CurrentCount
< Queue
->MaximumCount
) &&
446 (WaitEntry
!= &Queue
->Header
.WaitListHead
) &&
447 ((Thread
->Queue
!= Queue
) || (Thread
->WaitReason
!= WrQueue
))) {
449 /* Remove the wait entry */
450 DPRINT("Removing Entry\n");
451 RemoveEntryList(WaitEntry
);
453 /* Get the Wait Block and Thread */
454 WaitBlock
= CONTAINING_RECORD(WaitEntry
, KWAIT_BLOCK
, WaitListEntry
);
455 DPRINT("Got wait block: %x\n", WaitBlock
);
456 Thread
= WaitBlock
->Thread
;
458 /* Reset the wait reason */
459 Thread
->WaitReason
= 0;
461 /* Increase the waiting threads */
462 Queue
->CurrentCount
++;
464 /* Check if there's a Thread Timer */
465 if (Thread
->Timer
.Header
.Inserted
) {
467 /* Cancel the Thread Timer with the no-lock fastpath */
468 DPRINT("Removing the Thread's Timer\n");
469 Thread
->Timer
.Header
.Inserted
= FALSE
;
470 RemoveEntryList(&Thread
->Timer
.TimerListEntry
);
473 /* Reschedule the Thread */
474 DPRINT("Unblocking the Thread\n");
475 PsUnblockThread((PETHREAD
)Thread
, (PNTSTATUS
)&Entry
, 0);
479 /* Increase the Entries */
480 DPRINT("Adding new Queue Entry: %d %d\n", Head
, Queue
->Header
.SignalState
);
481 Queue
->Header
.SignalState
++;
485 InsertHeadList(&Queue
->EntryListHead
, Entry
);
489 InsertTailList(&Queue
->EntryListHead
, Entry
);
493 /* Return the previous state */
494 DPRINT("Returning\n");