2 * COPYRIGHT: See COPYING in the top level directory
3 * PROJECT: ReactOS kernel
4 * FILE: ntoskrnl/ke/queue.c
5 * PURPOSE: Implements kernel queues
7 * PROGRAMMERS: Alex Ionescu (alex@relsoft.net)
9 * Eric Kohl (ekohl@rz-online.de)
12 /* INCLUDES *****************************************************************/
16 #include <internal/debug.h>
18 /* FUNCTIONS *****************************************************************/
/*
 * Forward declaration of the internal insertion routine defined later in
 * this file. Within this file it is called by KeInsertHeadQueue (Head = TRUE)
 * and KeInsertQueue (Head = FALSE), in both cases after the dispatcher
 * database lock has been acquired.
 */
20 LONG STDCALL
KiInsertQueue(IN PKQUEUE Queue
, IN PLIST_ENTRY Entry
, BOOLEAN Head
);
/*
 * KeInitializeQueue: puts a KQUEUE into its initial state.
 *
 * Queue - queue object to initialize; its dispatcher header, entry list and
 *         thread list are all (re)initialized here.
 * Count - optional maximum count; 0 selects KeNumberProcessors instead.
 *
 * NOTE(review): the KeInitializeDispatcherHeader call is truncated in this
 * listing, so its remaining arguments are not documented here.
 */
27 KeInitializeQueue(IN PKQUEUE Queue
,
28 IN ULONG Count OPTIONAL
)
30 DPRINT("KeInitializeQueue %x\n", Queue
);
32 /* Initialize the Header */
33 KeInitializeDispatcherHeader(&Queue
->Header
,
35 sizeof(KQUEUE
)/sizeof(ULONG
),
38 /* Initialize the entry list and the list of associated threads */
39 InitializeListHead(&Queue
->EntryListHead
);
40 InitializeListHead(&Queue
->ThreadListHead
);
42 /* Start with a current count of 0; a Count of 0 makes the maximum default to KeNumberProcessors */
43 Queue
->CurrentCount
= 0;
44 Queue
->MaximumCount
= (Count
== 0) ? (ULONG
) KeNumberProcessors
: Count
;
/*
 * KeInsertHeadQueue: locked wrapper around KiInsertQueue with Head = TRUE.
 *
 * Acquires the dispatcher database lock, stores the value returned by
 * KiInsertQueue in PreviousState, then releases the lock at the saved IRQL.
 * NOTE(review): the remaining parameters, the local declarations and the
 * return statement are not visible in this listing.
 */
52 KeInsertHeadQueue(IN PKQUEUE Queue
,
58 DPRINT("KeInsertHeadQueue %x\n", Queue
);
60 /* Lock the Dispatcher Database */
61 OldIrql
= KeAcquireDispatcherDatabaseLock();
63 /* Insert the entry, requesting head insertion (Head = TRUE) */
64 PreviousState
= KiInsertQueue(Queue
, Entry
, TRUE
);
66 /* Release the Dispatcher Lock */
67 KeReleaseDispatcherDatabaseLock(OldIrql
);
69 /* Return previous State */
/*
 * KeInsertQueue: locked wrapper around KiInsertQueue with Head = FALSE.
 *
 * Acquires the dispatcher database lock, stores the value returned by
 * KiInsertQueue in PreviousState, then releases the lock at the saved IRQL.
 * NOTE(review): the remaining parameters, the local declarations and the
 * return statement are not visible in this listing.
 */
77 KeInsertQueue(IN PKQUEUE Queue
,
83 DPRINT("KeInsertQueue %x\n", Queue
);
85 /* Lock the Dispatcher Database */
86 OldIrql
= KeAcquireDispatcherDatabaseLock();
88 /* Insert the entry without requesting head insertion (Head = FALSE) */
89 PreviousState
= KiInsertQueue(Queue
, Entry
, FALSE
);
91 /* Release the Dispatcher Lock */
92 KeReleaseDispatcherDatabaseLock(OldIrql
);
94 /* Return previous State */
101 * Returns number of entries in the queue
/*
 * KeReadStateQueue: returns Queue->Header.SignalState.
 *
 * Elsewhere in this file that field is incremented when an entry is queued
 * and decremented when one is removed. The value is read here without taking
 * the dispatcher database lock.
 */
105 KeReadStateQueue(IN PKQUEUE Queue
)
107 /* Return the signal state of the queue header */
108 return(Queue
->Header
.SignalState
);
/*
 * KeRemoveQueue: hands the calling thread an entry from Queue, or makes the
 * thread wait for one.
 *
 * Queue    - queue to remove from; the current thread's Queue pointer is set
 *            to it.
 * WaitMode - stored in Thread->WaitMode; when it is not KernelMode and a user
 *            APC is pending, STATUS_USER_APC is produced instead of waiting.
 * Timeout  - optional; a QuadPart of zero produces STATUS_TIMEOUT without
 *            waiting, any other value arms the thread's timer through
 *            KiInsertTimer.
 *
 * The result is a PLIST_ENTRY; status codes (STATUS_USER_APC, STATUS_TIMEOUT
 * and the value returned by KiSwapThread) are cast to PLIST_ENTRY and
 * delivered through the same value.
 *
 * NOTE(review): several structural lines (braces, the loop statement, some
 * if/else lines and the final return) are not visible in this listing, so the
 * exact nesting of the steps below should be confirmed against the full file.
 */
116 KeRemoveQueue(IN PKQUEUE Queue
,
117 IN KPROCESSOR_MODE WaitMode
,
118 IN PLARGE_INTEGER Timeout OPTIONAL
)
120 PLIST_ENTRY QueueEntry
;
122 PKTHREAD Thread
= KeGetCurrentThread();
124 PKQUEUE PreviousQueue
;
125 PKWAIT_BLOCK WaitBlock
;
127 DPRINT("KeRemoveQueue %x\n", Queue
);
129 /* Check if the Lock is already held (signalled by Thread->WaitNext) */
130 if (Thread
->WaitNext
)
132 DPRINT("Lock is already held\n");
133 Thread
->WaitNext
= FALSE
;
137 /* Lock the Dispatcher Database and remember the IRQL to restore later */
138 DPRINT("Lock not held, acquiring\n");
139 OldIrql
= KeAcquireDispatcherDatabaseLock();
140 Thread
->WaitIrql
= OldIrql
;
143 /* This is needed so that we can set the new queue right here, before additional processing */
144 PreviousQueue
= Thread
->Queue
;
145 Thread
->Queue
= Queue
;
147 /* Check if this is a different queue */
148 if (Queue
!= PreviousQueue
)
150 DPRINT("Different Queue\n");
151 QueueEntry
= &Thread
->QueueListEntry
;
154 /* Remove from this list */
155 DPRINT("Removing Old Queue\n");
156 RemoveEntryList(QueueEntry
);
159 DPRINT("Activating new thread\n");
160 KiWakeQueue(PreviousQueue
);
163 /* Insert in this new Queue */
164 DPRINT("Inserting new Queue!\n");
165 InsertTailList(&Queue
->ThreadListHead
, QueueEntry
);
169 /* Same queue: take this thread out of the queue's current count */
170 DPRINT("Same Queue!\n");
171 Queue
->CurrentCount
--;
174 /* Loop until the queue is processed */
177 /* Check if the counts are valid and if there is still a queued entry */
178 QueueEntry
= Queue
->EntryListHead
.Flink
;
179 if ((Queue
->CurrentCount
< Queue
->MaximumCount
) &&
180 (QueueEntry
!= &Queue
->EntryListHead
))
182 /* Remove the Entry and Save it */
183 DPRINT("Removing Queue Entry. CurrentCount: %d, Maximum Count: %d\n",
184 Queue
->CurrentCount
, Queue
->MaximumCount
);
186 /* Decrease the number of entries */
187 Queue
->Header
.SignalState
--;
189 /* Increase number of running threads */
190 Queue
->CurrentCount
++;
192 /* Check if the entry is valid. If not, bugcheck */
193 if (!(QueueEntry
->Flink
) || !(QueueEntry
->Blink
))
195 KEBUGCHECK(INVALID_WORK_QUEUE_ITEM
);
198 /* Remove the Entry and clear its Flink so it no longer looks linked */
199 RemoveEntryList(QueueEntry
);
200 QueueEntry
->Flink
= NULL
;
202 /* Nothing to wait on */
208 DPRINT("Waiting on Queue Entry. CurrentCount: %d, Maximum Count: %d\n",
209 Queue
->CurrentCount
, Queue
->MaximumCount
);
211 /* Use the Thread's Wait Block, it's big enough */
212 Thread
->WaitBlockList
= &Thread
->WaitBlock
[0];
214 /* Check if a kernel APC is pending and we were below APC_LEVEL */
215 if ((Thread
->ApcState
.KernelApcPending
) &&
216 (Thread
->WaitIrql
< APC_LEVEL
))
218 /* Increment the count and unlock the dispatcher */
219 Queue
->CurrentCount
++;
220 KeReleaseDispatcherDatabaseLock(Thread
->WaitIrql
);
224 /* Fail if there's a User APC Pending */
225 if ((WaitMode
!= KernelMode
) && (Thread
->ApcState
.UserApcPending
))
227 /* Return the status (carried in the pointer result) and increase the pending threads */
228 QueueEntry
= (PLIST_ENTRY
)STATUS_USER_APC
;
229 Queue
->CurrentCount
++;
231 /* Nothing to wait on */
235 /* Build the Wait Block */
236 WaitBlock
= &Thread
->WaitBlock
[0];
237 WaitBlock
->Object
= (PVOID
)Queue
;
238 WaitBlock
->WaitKey
= STATUS_SUCCESS
;
239 WaitBlock
->WaitType
= WaitAny
;
240 WaitBlock
->Thread
= Thread
;
241 Thread
->WaitStatus
= STATUS_WAIT_0
;
243 /* We need to wait for the object... check if we have a timeout */
246 /* If it's zero, then don't do any waiting */
247 if (!Timeout
->QuadPart
)
249 /* Instant Timeout, return the status and increase the pending threads */
250 DPRINT("Queue Wait has timed out\n");
251 QueueEntry
= (PLIST_ENTRY
)STATUS_TIMEOUT
;
252 Queue
->CurrentCount
++;
254 /* Nothing to wait on */
259 * Set up the Timer. We'll use the internal function so that we can
260 * hold on to the dispatcher lock.
262 Timer
= &Thread
->Timer
;
263 WaitBlock
->NextWaitBlock
= &Thread
->WaitBlock
[1];
264 WaitBlock
= &Thread
->WaitBlock
[1];
266 /* Set up the Timer Wait Block */
267 WaitBlock
->Object
= (PVOID
)Timer
;
268 WaitBlock
->Thread
= Thread
;
269 WaitBlock
->WaitKey
= STATUS_TIMEOUT
;
270 WaitBlock
->WaitType
= WaitAny
;
272 /* Link the timer to this Wait Block */
273 InitializeListHead(&Timer
->Header
.WaitListHead
);
274 InsertTailList(&Timer
->Header
.WaitListHead
, &WaitBlock
->WaitListEntry
);
277 DPRINT("Creating Timer with timeout %I64d\n", *Timeout
);
278 KiInsertTimer(Timer
, *Timeout
);
282 WaitBlock
->NextWaitBlock
= &Thread
->WaitBlock
[0];
284 /* Insert the wait block into the Queue's wait list */
285 WaitBlock
= &Thread
->WaitBlock
[0];
286 InsertTailList(&Queue
->Header
.WaitListHead
,
287 &WaitBlock
->WaitListEntry
);
289 /* Setup the wait information */
290 Thread
->WaitMode
= WaitMode
;
291 Thread
->WaitReason
= WrQueue
;
292 Thread
->Alertable
= FALSE
;
293 Thread
->WaitTime
= 0;
294 Thread
->State
= Waiting
;
296 /* Find a new thread to run */
297 DPRINT("Swapping threads\n");
298 Status
= KiSwapThread();
300 /* Reset the wait reason */
301 Thread
->WaitReason
= 0;
303 /* Check if we were executing an APC */
304 if (Status
!= STATUS_KERNEL_APC
)
307 DPRINT("Done waking queue. Thread: %x %x!\n", KeGetCurrentThread(), Thread
);
308 return (PLIST_ENTRY
)Status
;
311 /* Check if we had a timeout */
314 /* FIXME: Fixup interval */
317 /* Acquire the lock again */
319 DPRINT("Looping again\n");
320 OldIrql
= KeAcquireDispatcherDatabaseLock();
322 /* Save the new IRQL and decrease number of waiting threads */
323 Thread
->WaitIrql
= OldIrql
;
324 Queue
->CurrentCount
--;
328 /* Unlock Database and return */
329 KeReleaseDispatcherDatabaseLock(Thread
->WaitIrql
);
330 DPRINT("Returning. CurrentCount: %d, Maximum Count: %d\n",
331 Queue
->CurrentCount
, Queue
->MaximumCount
);
/*
 * KeRundownQueue: detaches every thread from Queue under the dispatcher
 * database lock.
 *
 * If the entry list is not empty, its first entry is taken with
 * RemoveHeadList and kept in FirstEntry (NULL otherwise). Each thread on
 * ThreadListHead is then unlinked and has its Thread->Queue cleared.
 * NOTE(review): the return statement is not visible in this listing;
 * presumably FirstEntry is what is returned - confirm.
 */
340 KeRundownQueue(IN PKQUEUE Queue
)
342 PLIST_ENTRY EnumEntry
;
343 PLIST_ENTRY FirstEntry
= NULL
;
347 DPRINT("KeRundownQueue(Queue %x)\n", Queue
);
349 /* Get the Dispatcher Lock */
350 OldIrql
= KeAcquireDispatcherDatabaseLock();
352 /* Make sure the list is not empty */
353 if (!IsListEmpty(&Queue
->EntryListHead
))
/*
 * NOTE(review): RemoveHeadList unlinks only the first entry; any further
 * entries stay linked to Queue->EntryListHead. If callers are meant to walk
 * all remaining entries starting from FirstEntry, verify this is intended.
 */
356 FirstEntry
= RemoveHeadList(&Queue
->EntryListHead
);
359 /* Unlink threads and clear their Thread->Queue */
360 while (!IsListEmpty(&Queue
->ThreadListHead
))
362 /* Get the Entry and Remove it */
363 EnumEntry
= RemoveHeadList(&Queue
->ThreadListHead
);
365 /* Get the Entry's Thread (the list links threads through QueueListEntry) */
366 Thread
= CONTAINING_RECORD(EnumEntry
, KTHREAD
, QueueListEntry
);
369 Thread
->Queue
= NULL
;
372 /* Release the lock and return */
373 KeReleaseDispatcherDatabaseLock(OldIrql
);
378 * Called when a thread which has a queue entry is entering a wait state
/*
 * KiWakeQueue: removes one active thread from Queue's count and, if
 * possible, gives a queued entry to a waiting thread.
 *
 * After decrementing CurrentCount, and only if it is below MaximumCount and
 * both the entry list and the header's wait list are non-empty, the first
 * queued entry is unlinked, SignalState is decremented, and the thread owning
 * the last wait block on the wait list is passed to KiAbortWaitThread with
 * the entry pointer cast to NTSTATUS as its wait status.
 *
 * In this file it is called from KeRemoveQueue, which holds the dispatcher
 * database lock at that point; no lock is taken here.
 */
382 KiWakeQueue(IN PKQUEUE Queue
)
384 PLIST_ENTRY QueueEntry
;
385 PLIST_ENTRY WaitEntry
;
386 PKWAIT_BLOCK WaitBlock
;
389 /* Decrement the number of active threads */
390 DPRINT("KiWakeQueue: %x. Thread: %x\n", Queue
, KeGetCurrentThread());
391 Queue
->CurrentCount
--;
393 /* Make sure the counts are OK */
394 if (Queue
->CurrentCount
< Queue
->MaximumCount
)
396 /* Get the Queue Entry (first queued item) */
397 QueueEntry
= Queue
->EntryListHead
.Flink
;
399 /* Get the Wait Entry (last wait block on the queue's wait list) */
400 WaitEntry
= Queue
->Header
.WaitListHead
.Blink
;
401 DPRINT("Queue Count is ok; entries: %p, %p\n", QueueEntry
, WaitEntry
);
403 /* Make sure that the Queue entries are not part of empty lists */
404 if ((WaitEntry
!= &Queue
->Header
.WaitListHead
) &&
405 (QueueEntry
!= &Queue
->EntryListHead
))
407 /* Remove this entry */
408 DPRINT("Queue in List, removing it\n");
409 RemoveEntryList(QueueEntry
);
410 QueueEntry
->Flink
= NULL
;
412 /* Decrease the Signal State */
413 Queue
->Header
.SignalState
--;
415 /* Unwait the Thread */
416 WaitBlock
= CONTAINING_RECORD(WaitEntry
,
419 Thread
= WaitBlock
->Thread
;
/* NOTE(review): this is the only DPRINT1 in the file section; the rest use DPRINT - possibly leftover debug output, confirm */
420 DPRINT1("Unwaiting Thread: %d\n", Thread
->State
);
421 KiAbortWaitThread(Thread
, (NTSTATUS
)QueueEntry
, IO_NO_INCREMENT
);
427 * Returns the previous number of entries in the queue
/*
 * KiInsertQueue: delivers Entry either directly to a waiting thread or onto
 * the queue's entry list.
 *
 * Queue - target queue.
 * Entry - list entry being inserted.
 * Head  - printed in the debug trace; the head and tail insertions further
 *         down are presumably chosen by it (the selecting statement is not
 *         visible in this listing).
 *
 * The header's SignalState is saved in InitialState on entry. A waiter is
 * woken directly only when CurrentCount is below MaximumCount, the wait list
 * is non-empty, and the current thread is not itself waiting on this same
 * queue. No lock is taken here; both callers in this file acquire the
 * dispatcher database lock first.
 * NOTE(review): the branch separating "wake a waiter" from "queue the entry"
 * and the return statement are not visible in this listing.
 */
431 KiInsertQueue(IN PKQUEUE Queue
,
432 IN PLIST_ENTRY Entry
,
436 PKTHREAD Thread
= KeGetCurrentThread();
437 PKWAIT_BLOCK WaitBlock
;
438 PLIST_ENTRY WaitEntry
;
439 DPRINT("KiInsertQueue(Queue %x, Entry %x)\n", Queue
, Entry
);
441 /* Save the old state */
442 InitialState
= Queue
->Header
.SignalState
;
/* Look at the last wait block on the queue's wait list */
445 WaitEntry
= Queue
->Header
.WaitListHead
.Blink
;
446 DPRINT("Initial State, WaitEntry: %d, %x\n", InitialState
, WaitEntry
);
449 * Why the KeGetCurrentThread()->Queue != Queue?
450 * KiInsertQueue might be called from an APC for the current thread.
453 if ((Queue
->CurrentCount
< Queue
->MaximumCount
) &&
454 (WaitEntry
!= &Queue
->Header
.WaitListHead
) &&
455 ((Thread
->Queue
!= Queue
) || (Thread
->WaitReason
!= WrQueue
)))
457 /* Remove the wait entry */
458 DPRINT("Removing Entry\n");
459 RemoveEntryList(WaitEntry
);
461 /* Get the Wait Block and Thread (Thread now refers to the waiter, not the caller) */
462 WaitBlock
= CONTAINING_RECORD(WaitEntry
, KWAIT_BLOCK
, WaitListEntry
);
463 DPRINT("Got wait block: %x\n", WaitBlock
);
464 Thread
= WaitBlock
->Thread
;
466 /* Reset the wait reason */
467 Thread
->WaitReason
= 0;
469 /* Increase the active threads and set the status */
470 Queue
->CurrentCount
++;
471 Thread
->WaitStatus
= (NTSTATUS
)Entry
;
473 /* Remove the thread from its wait list */
474 RemoveEntryList(&Thread
->WaitListEntry
);
476 /* Check if there's a Thread Timer */
477 if (Thread
->Timer
.Header
.Inserted
)
479 /* Cancel the Thread Timer with the no-lock fastpath */
480 DPRINT("Removing the Thread's Timer\n");
481 Thread
->Timer
.Header
.Inserted
= FALSE
;
482 RemoveEntryList(&Thread
->Timer
.TimerListEntry
);
485 /* Reschedule the Thread */
486 DPRINT("Unblocking the Thread\n");
487 KiUnblockThread(Thread
, (PNTSTATUS
)&Entry
, 0);
491 /* Increase the Entries */
492 DPRINT("Adding new Queue Entry: %d %d\n", Head
, Queue
->Header
.SignalState
);
493 Queue
->Header
.SignalState
++;
495 /* Check which mode we're using */
498 /* Insert in the head */
499 InsertHeadList(&Queue
->EntryListHead
, Entry
);
503 /* Insert at the end */
504 InsertTailList(&Queue
->EntryListHead
, Entry
);
508 /* Return the previous state */
509 DPRINT("Returning\n");