2 * COPYRIGHT: See COPYING in the top level directory
3 * PROJECT: ReactOS kernel
4 * FILE: ntoskrnl/ke/queue.c
5 * PURPOSE: Implements kernel queues
7 * PROGRAMMERS: Alex Ionescu (alex@relsoft.net)
9 * Eric Kohl (ekohl@rz-online.de)
12 /* INCLUDES *****************************************************************/
16 #include <internal/debug.h>
18 /* FUNCTIONS *****************************************************************/
20 LONG STDCALL
KiInsertQueue(IN PKQUEUE Queue
, IN PLIST_ENTRY Entry
, BOOLEAN Head
);
27 KeInitializeQueue(IN PKQUEUE Queue
,
28 IN ULONG Count OPTIONAL
)
30 DPRINT("KeInitializeQueue %x\n", Queue
);
32 /* Initialize the Header */
33 KeInitializeDispatcherHeader(&Queue
->Header
,
35 sizeof(KQUEUE
)/sizeof(ULONG
),
38 /* Initialize the Lists */
39 InitializeListHead(&Queue
->EntryListHead
);
40 InitializeListHead(&Queue
->ThreadListHead
);
42 /* Set the Current and Maximum Count */
43 Queue
->CurrentCount
= 0;
44 Queue
->MaximumCount
= (Count
== 0) ? (ULONG
) KeNumberProcessors
: Count
;
52 KeInsertHeadQueue(IN PKQUEUE Queue
,
58 DPRINT("KeInsertHeadQueue %x\n", Queue
);
60 /* Lock the Dispatcher Database */
61 OldIrql
= KeAcquireDispatcherDatabaseLock();
63 /* Insert the Entry into the Queue, at the head */
64 PreviousState
= KiInsertQueue(Queue
, Entry
, TRUE
);
66 /* Release the Dispatcher Lock */
67 KeReleaseDispatcherDatabaseLock(OldIrql
);
69 /* Return previous State */
77 KeInsertQueue(IN PKQUEUE Queue
,
83 DPRINT("KeInsertQueue %x\n", Queue
);
85 /* Lock the Dispatcher Database */
86 OldIrql
= KeAcquireDispatcherDatabaseLock();
88 /* Insert the Entry into the Queue, at the tail */
89 PreviousState
= KiInsertQueue(Queue
, Entry
, FALSE
);
91 /* Release the Dispatcher Lock */
92 KeReleaseDispatcherDatabaseLock(OldIrql
);
94 /* Return previous State */
101 * Returns the number of entries in the queue
105 KeReadStateQueue(IN PKQUEUE Queue
)
107 /* Returns the Signal State */
108 return(Queue
->Header
.SignalState
);
116 KeRemoveQueue(IN PKQUEUE Queue
,
117 IN KPROCESSOR_MODE WaitMode
,
118 IN PLARGE_INTEGER Timeout OPTIONAL
)
120 PLIST_ENTRY QueueEntry
;
122 PKTHREAD Thread
= KeGetCurrentThread();
124 PKQUEUE PreviousQueue
;
125 PKWAIT_BLOCK WaitBlock
;
127 DPRINT("KeRemoveQueue %x\n", Queue
);
129 /* Check if the Lock is already held */
130 if (Thread
->WaitNext
)
132 DPRINT("Lock is already held\n");
133 Thread
->WaitNext
= FALSE
;
137 /* Lock the Dispatcher Database */
138 DPRINT("Lock not held, acquiring\n");
139 OldIrql
= KeAcquireDispatcherDatabaseLock();
140 Thread
->WaitIrql
= OldIrql
;
143 /* This is needed so that we can set the new queue right here, before additional processing */
144 PreviousQueue
= Thread
->Queue
;
145 Thread
->Queue
= Queue
;
147 /* Check if this is a different queue */
148 if (Queue
!= PreviousQueue
)
150 DPRINT("Different Queue\n");
151 QueueEntry
= &Thread
->QueueListEntry
;
154 /* Remove from this list */
155 DPRINT("Removing Old Queue\n");
156 RemoveEntryList(QueueEntry
);
159 DPRINT("Activating new thread\n");
160 KiWakeQueue(PreviousQueue
);
163 /* Insert in this new Queue */
164 DPRINT("Inserting new Queue!\n");
165 InsertTailList(&Queue
->ThreadListHead
, QueueEntry
);
169 /* Same queue, decrement waiting threads */
170 DPRINT("Same Queue!\n");
171 Queue
->CurrentCount
--;
174 /* Loop until the queue is processed */
177 /* Check if the counts are valid and if there is still a queued entry */
178 QueueEntry
= Queue
->EntryListHead
.Flink
;
179 if ((Queue
->CurrentCount
< Queue
->MaximumCount
) &&
180 (QueueEntry
!= &Queue
->EntryListHead
))
182 /* Remove the Entry and Save it */
183 DPRINT("Removing Queue Entry. CurrentCount: %d, Maximum Count: %d\n",
184 Queue
->CurrentCount
, Queue
->MaximumCount
);
186 /* Decrease the number of entries */
187 Queue
->Header
.SignalState
--;
189 /* Increase number of running threads */
190 Queue
->CurrentCount
++;
192 /* Check if the entry is valid. If not, bugcheck */
193 if (!(QueueEntry
->Flink
) || !(QueueEntry
->Blink
))
195 KEBUGCHECK(INVALID_WORK_QUEUE_ITEM
);
198 /* Remove the Entry */
199 RemoveEntryList(QueueEntry
);
200 QueueEntry
->Flink
= NULL
;
202 /* Nothing to wait on */
208 DPRINT("Waiting on Queue Entry. CurrentCount: %d, Maximum Count: %d\n",
209 Queue
->CurrentCount
, Queue
->MaximumCount
);
211 /* Use the Thread's Wait Block, it's big enough */
212 Thread
->WaitBlockList
= &Thread
->WaitBlock
[0];
214 /* Check if a kernel APC is pending and we were below APC_LEVEL */
215 if ((Thread
->ApcState
.KernelApcPending
) &&
216 (Thread
->WaitIrql
< APC_LEVEL
))
218 /* Increment the count and unlock the dispatcher */
219 Queue
->CurrentCount
++;
220 KeReleaseDispatcherDatabaseLock(Thread
->WaitIrql
);
224 /* Fail if there's a User APC Pending */
225 if ((WaitMode
!= KernelMode
) && (Thread
->ApcState
.UserApcPending
))
227 /* Return the status and increase the pending threads */
228 QueueEntry
= (PLIST_ENTRY
)STATUS_USER_APC
;
229 Queue
->CurrentCount
++;
231 /* Nothing to wait on */
235 /* Build the Wait Block */
236 WaitBlock
= &Thread
->WaitBlock
[0];
237 WaitBlock
->Object
= (PVOID
)Queue
;
238 WaitBlock
->WaitKey
= STATUS_SUCCESS
;
239 WaitBlock
->WaitType
= WaitAny
;
240 WaitBlock
->Thread
= Thread
;
241 Thread
->WaitStatus
= STATUS_WAIT_0
;
243 /* We need to wait for the object... check if we have a timeout */
246 /* If it's zero, then don't do any waiting */
247 if (!Timeout
->QuadPart
)
249 /* Instant Timeout, return the status and increase the pending threads */
250 DPRINT("Queue Wait has timed out\n");
251 QueueEntry
= (PLIST_ENTRY
)STATUS_TIMEOUT
;
252 Queue
->CurrentCount
++;
254 /* Nothing to wait on */
259 * Set up the Timer. We'll use the internal function so that we can
260 * hold on to the dispatcher lock.
262 Timer
= &Thread
->Timer
;
263 WaitBlock
->NextWaitBlock
= &Thread
->WaitBlock
[1];
264 WaitBlock
= &Thread
->WaitBlock
[1];
266 /* Set up the Timer Wait Block */
267 WaitBlock
->Object
= (PVOID
)Timer
;
268 WaitBlock
->Thread
= Thread
;
269 WaitBlock
->WaitKey
= STATUS_TIMEOUT
;
270 WaitBlock
->WaitType
= WaitAny
;
272 /* Link the timer to this Wait Block */
273 InitializeListHead(&Timer
->Header
.WaitListHead
);
274 InsertTailList(&Timer
->Header
.WaitListHead
, &WaitBlock
->WaitListEntry
);
277 DPRINT("Creating Timer with timeout %I64d\n", *Timeout
);
278 KiInsertTimer(Timer
, *Timeout
);
282 WaitBlock
->NextWaitBlock
= &Thread
->WaitBlock
[0];
284 /* Insert the wait block into the Queue's wait list */
285 WaitBlock
= &Thread
->WaitBlock
[0];
286 InsertTailList(&Queue
->Header
.WaitListHead
,
287 &WaitBlock
->WaitListEntry
);
289 /* Block the Thread */
290 DPRINT("Blocking the Thread: %x %x!\n", KeGetCurrentThread(), Thread
);
291 KiBlockThread(&Status
,
296 /* Reset the wait reason */
297 Thread
->WaitReason
= 0;
299 /* Check if we were executing an APC */
300 if (Status
!= STATUS_KERNEL_APC
)
303 DPRINT("Done waking queue. Thread: %x %x!\n", KeGetCurrentThread(), Thread
);
304 return (PLIST_ENTRY
)Status
;
307 /* Check if we had a timeout */
310 /* FIXME: Fixup interval */
313 /* Acquire the lock again */
315 DPRINT("Looping again\n");
316 OldIrql
= KeAcquireDispatcherDatabaseLock();
318 /* Save the new IRQL and decrease number of waiting threads */
319 Thread
->WaitIrql
= OldIrql
;
320 Queue
->CurrentCount
--;
324 /* Unlock Database and return */
325 KeReleaseDispatcherDatabaseLock(Thread
->WaitIrql
);
326 DPRINT("Returning. CurrentCount: %d, Maximum Count: %d\n",
327 Queue
->CurrentCount
, Queue
->MaximumCount
);
336 KeRundownQueue(IN PKQUEUE Queue
)
338 PLIST_ENTRY EnumEntry
;
339 PLIST_ENTRY FirstEntry
= NULL
;
343 DPRINT("KeRundownQueue(Queue %x)\n", Queue
);
345 /* Get the Dispatcher Lock */
346 OldIrql
= KeAcquireDispatcherDatabaseLock();
348 /* Make sure the list is not empty */
349 if (!IsListEmpty(&Queue
->EntryListHead
))
352 FirstEntry
= RemoveHeadList(&Queue
->EntryListHead
);
355 /* Unlink threads and clear their Thread->Queue */
356 while (!IsListEmpty(&Queue
->ThreadListHead
))
358 /* Get the Entry and Remove it */
359 EnumEntry
= RemoveHeadList(&Queue
->ThreadListHead
);
361 /* Get the Entry's Thread */
362 Thread
= CONTAINING_RECORD(EnumEntry
, KTHREAD
, QueueListEntry
);
365 Thread
->Queue
= NULL
;
368 /* Release the lock and return */
369 KeReleaseDispatcherDatabaseLock(OldIrql
);
374 * Called when a thread which has a queue entry is entering a wait state
378 KiWakeQueue(IN PKQUEUE Queue
)
380 PLIST_ENTRY QueueEntry
;
381 PLIST_ENTRY WaitEntry
;
382 PKWAIT_BLOCK WaitBlock
;
385 /* Decrement the number of active threads */
386 DPRINT("KiWakeQueue: %x. Thread: %x\n", Queue
, KeGetCurrentThread());
387 Queue
->CurrentCount
--;
389 /* Make sure the counts are OK */
390 if (Queue
->CurrentCount
< Queue
->MaximumCount
)
392 /* Get the Queue Entry */
393 QueueEntry
= Queue
->EntryListHead
.Flink
;
395 /* Get the Wait Entry */
396 WaitEntry
= Queue
->Header
.WaitListHead
.Blink
;
397 DPRINT("Queue Count is ok; entries: %p, %p\n", QueueEntry
, WaitEntry
);
399 /* Make sure that the Queue entries are not part of empty lists */
400 if ((WaitEntry
!= &Queue
->Header
.WaitListHead
) &&
401 (QueueEntry
!= &Queue
->EntryListHead
))
403 /* Remove this entry */
404 DPRINT("Queue in List, removing it\n");
405 RemoveEntryList(QueueEntry
);
406 QueueEntry
->Flink
= NULL
;
408 /* Decrease the Signal State */
409 Queue
->Header
.SignalState
--;
411 /* Unwait the Thread */
412 WaitBlock
= CONTAINING_RECORD(WaitEntry
,
415 Thread
= WaitBlock
->Thread
;
416 DPRINT1("Unwaiting Thread: %d\n", Thread
->State
);
417 KiAbortWaitThread(Thread
, (NTSTATUS
)QueueEntry
, IO_NO_INCREMENT
);
423 * Returns the previous number of entries in the queue
427 KiInsertQueue(IN PKQUEUE Queue
,
428 IN PLIST_ENTRY Entry
,
432 PKTHREAD Thread
= KeGetCurrentThread();
433 PKWAIT_BLOCK WaitBlock
;
434 PLIST_ENTRY WaitEntry
;
435 DPRINT("KiInsertQueue(Queue %x, Entry %x)\n", Queue
, Entry
);
437 /* Save the old state */
438 InitialState
= Queue
->Header
.SignalState
;
441 WaitEntry
= Queue
->Header
.WaitListHead
.Blink
;
442 DPRINT("Initial State, WaitEntry: %d, %x\n", InitialState
, WaitEntry
);
445 * Why the KeGetCurrentThread()->Queue != Queue?
446 * KiInsertQueue might be called from an APC for the current thread.
449 if ((Queue
->CurrentCount
< Queue
->MaximumCount
) &&
450 (WaitEntry
!= &Queue
->Header
.WaitListHead
) &&
451 ((Thread
->Queue
!= Queue
) || (Thread
->WaitReason
!= WrQueue
)))
453 /* Remove the wait entry */
454 DPRINT("Removing Entry\n");
455 RemoveEntryList(WaitEntry
);
457 /* Get the Wait Block and Thread */
458 WaitBlock
= CONTAINING_RECORD(WaitEntry
, KWAIT_BLOCK
, WaitListEntry
);
459 DPRINT("Got wait block: %x\n", WaitBlock
);
460 Thread
= WaitBlock
->Thread
;
462 /* Reset the wait reason */
463 Thread
->WaitReason
= 0;
465 /* Increase the active threads and set the status */
466 Queue
->CurrentCount
++;
467 Thread
->WaitStatus
= (NTSTATUS
)Entry
;
469 /* Remove the thread from its wait list */
470 RemoveEntryList(&Thread
->WaitListEntry
);
472 /* Check if there's a Thread Timer */
473 if (Thread
->Timer
.Header
.Inserted
)
475 /* Cancel the Thread Timer with the no-lock fastpath */
476 DPRINT("Removing the Thread's Timer\n");
477 Thread
->Timer
.Header
.Inserted
= FALSE
;
478 RemoveEntryList(&Thread
->Timer
.TimerListEntry
);
481 /* Reschedule the Thread */
482 DPRINT("Unblocking the Thread\n");
483 KiUnblockThread(Thread
, (PNTSTATUS
)&Entry
, 0);
487 /* Increase the Entries */
488 DPRINT("Adding new Queue Entry: %d %d\n", Head
, Queue
->Header
.SignalState
);
489 Queue
->Header
.SignalState
++;
491 /* Check which mode we're using */
494 /* Insert in the head */
495 InsertHeadList(&Queue
->EntryListHead
, Entry
);
499 /* Insert at the end */
500 InsertTailList(&Queue
->EntryListHead
, Entry
);
504 /* Return the previous state */
505 DPRINT("Returning\n");