2 * PROJECT: ReactOS Kernel
3 * LICENSE: GPL - See COPYING in the top level directory
4 * FILE: ntoskrnl/ke/queue.c
5 * PURPOSE: Implements kernel queues
6 * PROGRAMMERS: Alex Ionescu (alex.ionescu@reactos.org)
8 * Eric Kohl (ekohl@rz-online.de)
11 /* INCLUDES ******************************************************************/
15 #include <internal/debug.h>
17 /* PRIVATE FUNCTIONS *********************************************************/
20 * Called when a thread which has a queue entry is entering a wait state
/*
 * KiWakeQueue: removes one active thread from Queue's running count and, if
 * the queue is then below its maximum and has both a pending entry and a
 * blocked waiter, hands the first pending entry to the waiter found at the
 * tail of the queue's wait list.
 *
 * Queue - the queue whose active-thread count is being reduced.
 *
 * NOTE(review): this listing is missing lines (braces, return type, the
 * declaration of Thread, part of the CONTAINING_RECORD call); confirm the
 * details against the complete file before relying on these notes.
 */
24 KiWakeQueue(IN PKQUEUE Queue
)
26 PLIST_ENTRY QueueEntry
;
27 PLIST_ENTRY WaitEntry
;
28 PKWAIT_BLOCK WaitBlock
;
32 /* Decrement the number of active threads */
33 Queue
->CurrentCount
--;
35 /* Make sure the counts are OK */
/* Only try to wake a waiter while the queue is below its concurrency limit */
36 if (Queue
->CurrentCount
< Queue
->MaximumCount
)
38 /* Get the Queue Entry */
/* First pending item (head of the entry list) */
39 QueueEntry
= Queue
->EntryListHead
.Flink
;
41 /* Get the Wait Entry */
/* Wait block at the tail of the queue's wait list */
42 WaitEntry
= Queue
->Header
.WaitListHead
.Blink
;
44 /* Make sure that the Queue entries are not part of empty lists */
/* A list entry equal to its own list head means that list is empty */
45 if ((WaitEntry
!= &Queue
->Header
.WaitListHead
) &&
46 (QueueEntry
!= &Queue
->EntryListHead
))
48 /* Remove this entry */
/* Flink is cleared after unlinking so the entry is marked as no longer queued */
49 RemoveEntryList(QueueEntry
);
50 QueueEntry
->Flink
= NULL
;
52 /* Decrease the Signal State */
53 Queue
->Header
.SignalState
--;
55 /* Unwait the Thread */
/*
 * The dequeued entry pointer is delivered to the woken thread through the
 * status argument of KiAbortWaitThread.
 * NOTE(review): the (NTSTATUS) cast would truncate the pointer if pointers
 * are wider than NTSTATUS on the target -- confirm.
 */
56 WaitBlock
= CONTAINING_RECORD(WaitEntry
,
59 Thread
= WaitBlock
->Thread
;
60 KiAbortWaitThread(Thread
, (NTSTATUS
)QueueEntry
, IO_NO_INCREMENT
);
66 * Returns the previous number of entries in the queue
/*
 * KiInsertQueue: records the queue's current signal state, then either hands
 * Entry directly to a blocked waiter (readying that thread) or links Entry
 * into the queue's entry list and bumps the signal state.
 *
 * Queue - the queue receiving the entry.
 * Entry - the list entry being queued (used below; its declaration is not
 *         visible in this listing).
 *
 * NOTE(review): the remaining parameters, the branch keywords (else / the
 * head-vs-tail test) and the final return statement are missing from this
 * listing; callers in this file pass TRUE or FALSE as a third argument,
 * presumably selecting head or tail insertion -- confirm.
 */
70 KiInsertQueue(IN PKQUEUE Queue
,
75 PKTHREAD Thread
= KeGetCurrentThread();
76 PKWAIT_BLOCK WaitBlock
;
77 PLIST_ENTRY WaitEntry
;
80 /* Save the old state */
81 InitialState
= Queue
->Header
.SignalState
;
/* Candidate waiter: the wait block at the tail of the queue's wait list */
84 WaitEntry
= Queue
->Header
.WaitListHead
.Blink
;
87 * Why the KeGetCurrentThread()->Queue != Queue?
88 * KiInsertQueue might be called from an APC for the current thread.
/*
 * Direct hand-off requires: the queue is below its concurrency limit, there
 * is at least one waiter, and the current thread is not itself waiting on
 * this same queue.
 */
91 if ((Queue
->CurrentCount
< Queue
->MaximumCount
) &&
92 (WaitEntry
!= &Queue
->Header
.WaitListHead
) &&
93 ((Thread
->Queue
!= Queue
) ||
94 (Thread
->WaitReason
!= WrQueue
)))
96 /* Remove the wait entry */
97 RemoveEntryList(WaitEntry
);
99 /* Get the Wait Block and Thread */
/* From here on Thread refers to the waiter being woken, not the caller */
100 WaitBlock
= CONTAINING_RECORD(WaitEntry
, KWAIT_BLOCK
, WaitListEntry
);
101 Thread
= WaitBlock
->Thread
;
103 /* Remove the queue from the thread's wait list */
/*
 * The entry pointer is stored as the waiter's wait status.
 * NOTE(review): the (NTSTATUS) cast would truncate the pointer if pointers
 * are wider than NTSTATUS on the target -- confirm.
 */
104 Thread
->WaitStatus
= (NTSTATUS
)Entry
;
105 RemoveEntryList(&Thread
->WaitListEntry
);
106 Thread
->WaitReason
= 0;
108 /* Increase the active threads and set the status*/
109 Queue
->CurrentCount
++;
111 /* Check if there's a Thread Timer */
112 if (Thread
->Timer
.Header
.Inserted
)
114 /* Cancel the Thread Timer with the no-lock fastpath */
/* The timer is unlinked by hand here rather than through a cancel routine */
115 Thread
->Timer
.Header
.Inserted
= FALSE
;
116 RemoveEntryList(&Thread
->Timer
.TimerListEntry
);
119 /* Reschedule the Thread */
120 KiReadyThread(Thread
);
/* NOTE(review): the code below presumably runs only when no hand-off happened (else branch not visible) */
124 /* Increase the Entries */
125 Queue
->Header
.SignalState
++;
127 /* Check which mode we're using */
130 /* Insert in the head */
131 InsertHeadList(&Queue
->EntryListHead
, Entry
);
135 /* Insert at the end */
136 InsertTailList(&Queue
->EntryListHead
, Entry
);
140 /* Return the previous state */
144 /* PUBLIC FUNCTIONS **********************************************************/
/*
 * KeInitializeQueue: initializes a queue object -- its dispatcher header, its
 * entry and thread lists, and its current/maximum active-thread counts.
 *
 * Queue - the queue object to initialize.
 * Count - maximum number of concurrently active threads; 0 selects the
 *         value of KeNumberProcessors.
 *
 * NOTE(review): some arguments of the KeInitializeDispatcherHeader call are
 * missing from this listing.
 */
151 KeInitializeQueue(IN PKQUEUE Queue
,
152 IN ULONG Count OPTIONAL
)
154 /* Initialize the Header */
/* The size argument is expressed in ULONG units, not bytes */
155 KeInitializeDispatcherHeader(&Queue
->Header
,
157 sizeof(KQUEUE
) / sizeof(ULONG
),
160 /* Initialize the Lists */
161 InitializeListHead(&Queue
->EntryListHead
);
162 InitializeListHead(&Queue
->ThreadListHead
);
164 /* Set the Current and Maximum Count */
165 Queue
->CurrentCount
= 0;
/* A Count of zero falls back to the number of processors */
166 Queue
->MaximumCount
= (Count
== 0) ? (ULONG
) KeNumberProcessors
: Count
;
/*
 * KeInsertHeadQueue: takes the dispatcher database lock, calls KiInsertQueue
 * with TRUE as its third argument, releases the lock and returns the value
 * KiInsertQueue produced.
 *
 * Queue - the queue to insert into.
 * Entry - the list entry to insert.
 *
 * Must be called at IRQL <= DISPATCH_LEVEL (asserted below).
 */
174 KeInsertHeadQueue(IN PKQUEUE Queue
,
175 IN PLIST_ENTRY Entry
)
180 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL
);
182 /* Lock the Dispatcher Database */
183 OldIrql
= KeAcquireDispatcherDatabaseLock();
185 /* Insert the Queue */
/* TRUE is passed where KeInsertQueue passes FALSE */
186 PreviousState
= KiInsertQueue(Queue
, Entry
, TRUE
);
188 /* Release the Dispatcher Lock */
189 KeReleaseDispatcherDatabaseLock(OldIrql
);
191 /* Return previous State */
192 return PreviousState
;
/*
 * KeInsertQueue: takes the dispatcher database lock, calls KiInsertQueue
 * with FALSE as its third argument, releases the lock and returns the value
 * KiInsertQueue produced.
 *
 * Queue - the queue to insert into.
 * Entry - the list entry to insert.
 *
 * Must be called at IRQL <= DISPATCH_LEVEL (asserted below).
 */
200 KeInsertQueue(IN PKQUEUE Queue
,
201 IN PLIST_ENTRY Entry
)
206 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL
);
208 /* Lock the Dispatcher Database */
209 OldIrql
= KeAcquireDispatcherDatabaseLock();
211 /* Insert the Queue */
/* FALSE is passed where KeInsertHeadQueue passes TRUE */
212 PreviousState
= KiInsertQueue(Queue
, Entry
, FALSE
);
214 /* Release the Dispatcher Lock */
215 KeReleaseDispatcherDatabaseLock(OldIrql
);
217 /* Return previous State */
218 return PreviousState
;
224 * Returns number of entries in the queue
/*
 * KeReadStateQueue: returns the SignalState field of the queue's dispatcher
 * header. No lock is taken in the visible code, so the value is a snapshot.
 *
 * Queue - the queue to inspect.
 */
228 KeReadStateQueue(IN PKQUEUE Queue
)
230 /* Returns the Signal State */
232 return Queue
->Header
.SignalState
;
/*
 * KeRemoveQueue: associates the current thread with Queue and either dequeues
 * the first pending entry or blocks the thread on the queue (optionally with
 * a timeout) until it is woken.
 *
 * Queue    - the queue to remove an entry from.
 * WaitMode - processor mode of the wait; a non-kernel mode wait is abandoned
 *            when a user APC is pending.
 * Timeout  - optional timeout; a zero QuadPart means "do not wait".
 *
 * The result is a PLIST_ENTRY, but several paths store or return a status
 * code cast to PLIST_ENTRY (STATUS_USER_APC, STATUS_TIMEOUT, or the value
 * returned by KiSwapThread), so callers have to tell these apart from real
 * entry pointers.
 *
 * Must be called at IRQL <= DISPATCH_LEVEL (asserted below).
 *
 * NOTE(review): this listing is missing lines (braces, else/do/while
 * keywords, some declarations and conditions); the notes below describe only
 * what is visible and should be confirmed against the complete file.
 */
240 KeRemoveQueue(IN PKQUEUE Queue
,
241 IN KPROCESSOR_MODE WaitMode
,
242 IN PLARGE_INTEGER Timeout OPTIONAL
)
244 PLIST_ENTRY QueueEntry
;
246 PKTHREAD Thread
= KeGetCurrentThread();
248 PKQUEUE PreviousQueue
;
249 PKWAIT_BLOCK WaitBlock
;
252 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL
);
254 /* Check if the Lock is already held */
255 if (Thread
->WaitNext
)
257 /* It is, so next time don't expect this */
258 Thread
->WaitNext
= FALSE
;
262 /* Lock the Dispatcher Database */
/* The IRQL returned by the lock is kept in the thread for the later release */
263 OldIrql
= KeAcquireDispatcherDatabaseLock();
264 Thread
->WaitIrql
= OldIrql
;
268 * This is needed so that we can set the new queue right here,
269 * before additional processing
271 PreviousQueue
= Thread
->Queue
;
272 Thread
->Queue
= Queue
;
274 /* Check if this is a different queue */
275 if (Queue
!= PreviousQueue
)
277 /* Get the current entry */
/* QueueEntry temporarily holds the thread's own link in a queue's thread list */
278 QueueEntry
= &Thread
->QueueListEntry
;
281 /* Remove from this list */
282 RemoveEntryList(QueueEntry
);
/* Let the queue being left wake another waiter; NOTE(review): any guard around these calls is not visible here */
285 KiWakeQueue(PreviousQueue
);
288 /* Insert in this new Queue */
289 InsertTailList(&Queue
->ThreadListHead
, QueueEntry
);
293 /* Same queue, decrement waiting threads */
294 Queue
->CurrentCount
--;
297 /* Loop until the queue is processed */
300 /* Check if the counts are valid and if there is still a queued entry */
301 QueueEntry
= Queue
->EntryListHead
.Flink
;
302 if ((Queue
->CurrentCount
< Queue
->MaximumCount
) &&
303 (QueueEntry
!= &Queue
->EntryListHead
))
305 /* Decrease the number of entries */
306 Queue
->Header
.SignalState
--;
308 /* Increase number of running threads */
309 Queue
->CurrentCount
++;
311 /* Check if the entry is valid. If not, bugcheck */
/* A NULL link means the entry was already dequeued or was never queued */
312 if (!(QueueEntry
->Flink
) || !(QueueEntry
->Blink
))
314 KEBUGCHECK(INVALID_WORK_QUEUE_ITEM
);
317 /* Remove the Entry */
318 RemoveEntryList(QueueEntry
);
319 QueueEntry
->Flink
= NULL
;
321 /* Nothing to wait on */
326 /* Use the Thread's Wait Block, it's big enough */
327 Thread
->WaitBlockList
= &Thread
->WaitBlock
[0];
329 /* Check if a kernel APC is pending and we're below APC_LEVEL */
330 if ((Thread
->ApcState
.KernelApcPending
) &&
331 !(Thread
->SpecialApcDisable
) && (Thread
->WaitIrql
< APC_LEVEL
))
333 /* Increment the count and unlock the dispatcher */
334 Queue
->CurrentCount
++;
335 KeReleaseDispatcherDatabaseLock(Thread
->WaitIrql
);
339 /* Fail if there's a User APC Pending */
340 if ((WaitMode
!= KernelMode
) &&
341 (Thread
->ApcState
.UserApcPending
))
343 /* Return the status and increase the pending threads */
/* The status code is carried in the PLIST_ENTRY result */
344 QueueEntry
= (PLIST_ENTRY
)STATUS_USER_APC
;
345 Queue
->CurrentCount
++;
349 /* Build the Wait Block */
/* WaitBlock[0] describes the wait on the queue object itself */
350 WaitBlock
= &Thread
->WaitBlock
[0];
351 WaitBlock
->Object
= (PVOID
)Queue
;
352 WaitBlock
->WaitKey
= STATUS_SUCCESS
;
353 WaitBlock
->WaitType
= WaitAny
;
354 WaitBlock
->Thread
= Thread
;
355 Thread
->WaitStatus
= STATUS_WAIT_0
;
357 /* We need to wait for the object... check for a timeout */
360 /* Check if it's zero */
361 if (!Timeout
->QuadPart
)
363 /* Don't wait. Return and increase pending threads */
/* The status code is carried in the PLIST_ENTRY result */
364 QueueEntry
= (PLIST_ENTRY
)STATUS_TIMEOUT
;
365 Queue
->CurrentCount
++;
370 * Set up the Timer. We'll use the internal function so
371 * that we can hold on to the dispatcher lock.
/* WaitBlock[1] describes the wait on the thread's timer and is chained after WaitBlock[0] */
373 Timer
= &Thread
->Timer
;
374 WaitBlock
->NextWaitBlock
= &Thread
->WaitBlock
[1];
375 WaitBlock
= &Thread
->WaitBlock
[1];
377 /* Set up the Timer Wait Block */
378 WaitBlock
->Object
= (PVOID
)Timer
;
379 WaitBlock
->Thread
= Thread
;
380 WaitBlock
->WaitKey
= STATUS_TIMEOUT
;
381 WaitBlock
->WaitType
= WaitAny
;
383 /* Link the timer to this Wait Block */
/* Both links of the timer's wait list head are pointed at this wait block's entry */
384 Timer
->Header
.WaitListHead
.Flink
=
385 &WaitBlock
->WaitListEntry
;
386 Timer
->Header
.WaitListHead
.Blink
=
387 &WaitBlock
->WaitListEntry
;
390 KiInsertTimer(Timer
, *Timeout
);
/* Close the wait block chain by pointing the current block back at WaitBlock[0] */
394 WaitBlock
->NextWaitBlock
= &Thread
->WaitBlock
[0];
396 /* Insert the wait block into the Queue's wait list */
397 WaitBlock
= &Thread
->WaitBlock
[0];
398 InsertTailList(&Queue
->Header
.WaitListHead
,
399 &WaitBlock
->WaitListEntry
);
401 /* Setup the wait information */
/* WrQueue is the wait reason KiInsertQueue tests for on the current thread */
402 Thread
->WaitMode
= WaitMode
;
403 Thread
->WaitReason
= WrQueue
;
404 Thread
->Alertable
= FALSE
;
405 Thread
->WaitTime
= ((PLARGE_INTEGER
)&KeTickCount
)->LowPart
;
406 Thread
->State
= Waiting
;
408 /* Find a new thread to run */
409 Status
= KiSwapThread();
411 /* Reset the wait reason */
412 Thread
->WaitReason
= 0;
414 /* Check if we were executing an APC */
/* Any result other than STATUS_KERNEL_APC ends the wait and is returned as the entry */
415 if (Status
!= STATUS_KERNEL_APC
)
418 return (PLIST_ENTRY
)Status
;
421 /* Check if we had a timeout */
424 /* FIXME: Fixup interval */
/* NOTE(review): only a debug print is visible here; the timeout does not appear to be adjusted before waiting again */
425 DPRINT1("FIXME!!!\n");
429 /* Reacquire the lock */
430 OldIrql
= KeAcquireDispatcherDatabaseLock();
432 /* Save the new IRQL and decrease number of waiting threads */
433 Thread
->WaitIrql
= OldIrql
;
434 Queue
->CurrentCount
--;
438 /* Unlock Database and return */
439 KeReleaseDispatcherDatabaseLock(Thread
->WaitIrql
);
/*
 * KeRundownQueue: tears down a queue's associations -- detaches the first
 * pending entry (if any) into FirstEntry and unlinks every thread on the
 * queue's thread list, clearing each thread's Queue pointer.
 *
 * Queue - the queue being run down; its wait list is asserted to be empty.
 *
 * Must be called at IRQL <= DISPATCH_LEVEL (asserted below).
 *
 * NOTE(review): the return statement and some declarations are missing from
 * this listing; FirstEntry is presumably the return value -- confirm.
 */
448 KeRundownQueue(IN PKQUEUE Queue
)
450 PLIST_ENTRY EnumEntry
;
451 PLIST_ENTRY FirstEntry
= NULL
;
455 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL
);
/* NOTE(review): this check reads the wait list before the dispatcher lock is taken below */
456 ASSERT(IsListEmpty(&Queue
->Header
.WaitListHead
));
458 /* Get the Dispatcher Lock */
459 OldIrql
= KeAcquireDispatcherDatabaseLock();
461 /* Make sure the list is not empty */
462 if (!IsListEmpty(&Queue
->EntryListHead
))
/*
 * NOTE(review): RemoveHeadList detaches only the first entry; the remaining
 * entries stay linked to Queue->EntryListHead. If the caller is expected to
 * walk all pending entries starting from FirstEntry, confirm this is what
 * is intended.
 */
465 FirstEntry
= RemoveHeadList(&Queue
->EntryListHead
);
468 /* Unlink threads and clear their Thread->Queue */
469 while (!IsListEmpty(&Queue
->ThreadListHead
))
471 /* Get the Entry and Remove it */
472 EnumEntry
= RemoveHeadList(&Queue
->ThreadListHead
);
474 /* Get the Entry's Thread */
/* Threads are linked into the queue through their QueueListEntry field */
475 Thread
= CONTAINING_RECORD(EnumEntry
, KTHREAD
, QueueListEntry
);
478 Thread
->Queue
= NULL
;
481 /* Release the lock and return */
482 KeReleaseDispatcherDatabaseLock(OldIrql
);