2 * PROJECT: ReactOS Kernel
3 * LICENSE: GPL - See COPYING in the top level directory
4 * FILE: ntoskrnl/ke/queue.c
5 * PURPOSE: Implements kernel queues
6 * PROGRAMMERS: Alex Ionescu (alex.ionescu@reactos.org)
8 * Eric Kohl (ekohl@rz-online.de)
11 /* INCLUDES ******************************************************************/
15 #include <internal/debug.h>
17 /* PRIVATE FUNCTIONS *********************************************************/
20 * Called when a thread which has a queue entry is entering a wait state
/*
 * KiActivateWaiterQueue
 *
 * Decrements Queue->CurrentCount. If the count is then below
 * Queue->MaximumCount and both the queue's entry list and its wait list are
 * non-empty, unlinks the first queued entry, decrements the queue's
 * SignalState and unwaits the thread owning the last wait block, handing it
 * the entry pointer as its wait status.
 *
 * Queue - the queue being processed.
 *
 * NOTE(review): this listing is lossy. The embedded line numbers skip
 * (24->26, 28->32, 56->59, ...), so the return type, the braces, the
 * declaration of Thread and the remaining CONTAINING_RECORD arguments are
 * not visible here.
 */
24 KiActivateWaiterQueue(IN PKQUEUE Queue
)
26 PLIST_ENTRY QueueEntry
;
27 PLIST_ENTRY WaitEntry
;
28 PKWAIT_BLOCK WaitBlock
;
32 /* Decrement the number of active threads */
33 Queue
->CurrentCount
--;
35 /* Only continue if the active-thread count is now below the maximum */
36 if (Queue
->CurrentCount
< Queue
->MaximumCount
)
38 /* Get the first entry on the queue's entry list */
39 QueueEntry
= Queue
->EntryListHead
.Flink
;
41 /* Get the last wait block on the queue's wait list */
42 WaitEntry
= Queue
->Header
.WaitListHead
.Blink
;
44 /* Make sure neither the wait list nor the entry list is empty */
45 if ((WaitEntry
!= &Queue
->Header
.WaitListHead
) &&
46 (QueueEntry
!= &Queue
->EntryListHead
))
48 /* Unlink the entry and clear its forward link */
49 RemoveEntryList(QueueEntry
);
50 QueueEntry
->Flink
= NULL
;
52 /* Decrease the Signal State */
53 Queue
->Header
.SignalState
--;
55 /* Unwait the thread that owns the wait block */
56 WaitBlock
= CONTAINING_RECORD(WaitEntry
,
59 Thread
= WaitBlock
->Thread
;
/*
 * The entry pointer is passed as the wait status.
 * NOTE(review): the pointer goes through an NTSTATUS cast; if NTSTATUS is
 * narrower than a pointer on the target, the address is truncated. Confirm.
 */
60 KiUnwaitThread(Thread
, (NTSTATUS
)QueueEntry
, IO_NO_INCREMENT
);
66 * Returns the previous number of entries in the queue
/*
 * KiInsertQueue
 *
 * Saves the queue's current SignalState, then looks at the last wait block
 * on the queue's wait list. When CurrentCount is below MaximumCount, the
 * wait list is non-empty, and the current thread is either not associated
 * with this queue or not waiting with reason WrQueue, the entry is handed
 * directly to the waiting thread: the wait block is unlinked, the entry
 * pointer is stored in the thread's WaitStatus, the thread's WaitListEntry
 * is unlinked if linked, CurrentCount is incremented, WaitReason is cleared,
 * an inserted thread timer is removed, and the thread is readied.
 *
 * The remaining visible statements increment SignalState and insert Entry
 * at the head or the tail of Queue->EntryListHead.
 *
 * NOTE(review): this listing is lossy. The rest of the parameter list, the
 * declarations of InitialState and Timer, the braces, the branch keywords
 * choosing between the hand-off path and the insert path, the head/tail
 * test, and the return statement are not visible. Presumably the insert
 * path is the else-branch of the hand-off test. Confirm against the file.
 */
70 KiInsertQueue(IN PKQUEUE Queue
,
75 PKTHREAD Thread
= KeGetCurrentThread();
76 PKWAIT_BLOCK WaitBlock
;
77 PLIST_ENTRY WaitEntry
;
81 /* Save the old state */
82 InitialState
= Queue
->Header
.SignalState
;
/* Look at the last wait block on the queue's wait list */
85 WaitEntry
= Queue
->Header
.WaitListHead
.Blink
;
88 * Why the KeGetCurrentThread()->Queue != Queue?
89 * KiInsertQueue might be called from an APC for the current thread.
92 if ((Queue
->CurrentCount
< Queue
->MaximumCount
) &&
93 (WaitEntry
!= &Queue
->Header
.WaitListHead
) &&
94 ((Thread
->Queue
!= Queue
) ||
95 (Thread
->WaitReason
!= WrQueue
)))
97 /* Remove the wait entry */
98 RemoveEntryList(WaitEntry
);
100 /* Get the Wait Block and Thread */
101 WaitBlock
= CONTAINING_RECORD(WaitEntry
, KWAIT_BLOCK
, WaitListEntry
);
102 Thread
= WaitBlock
->Thread
;
104 /* Hand the entry to the thread via its wait status, then unlink the thread's WaitListEntry if it is linked */
/*
 * NOTE(review): the entry pointer goes through an NTSTATUS cast; if NTSTATUS
 * is narrower than a pointer on the target, the address is truncated. Confirm.
 */
105 Thread
->WaitStatus
= (NTSTATUS
)Entry
;
106 if (Thread
->WaitListEntry
.Flink
) RemoveEntryList(&Thread
->WaitListEntry
);
108 /* Increase the active threads and remove any wait reason */
109 Queue
->CurrentCount
++;
110 Thread
->WaitReason
= 0;
112 /* Check if there's a Thread Timer */
113 Timer
= &Thread
->Timer
;
114 if (Timer
->Header
.Inserted
) KxRemoveTreeTimer(Timer
);
116 /* Reschedule the Thread */
117 KiReadyThread(Thread
);
121 /* Increase the number of queued entries (the signal state) */
122 Queue
->Header
.SignalState
++;
124 /* Check which mode we're using */
127 /* Insert in the head */
128 InsertHeadList(&Queue
->EntryListHead
, Entry
);
132 /* Insert at the end */
133 InsertTailList(&Queue
->EntryListHead
, Entry
);
137 /* Return the previous state */
141 /* PUBLIC FUNCTIONS **********************************************************/
/*
 * KeInitializeQueue
 *
 * Initializes a queue object: sets up the dispatcher header, initializes
 * the entry list and the thread list as empty, zeroes CurrentCount and sets
 * MaximumCount.
 *
 * Queue - the queue to initialize.
 * Count - the maximum count; when 0, KeNumberProcessors is used instead.
 *
 * NOTE(review): this listing is lossy. The return type, the braces and some
 * of the KeInitializeDispatcherHeader arguments are not visible here.
 */
148 KeInitializeQueue(IN PKQUEUE Queue
,
149 IN ULONG Count OPTIONAL
)
151 /* Initialize the Header */
152 KeInitializeDispatcherHeader(&Queue
->Header
,
/* Size argument is expressed in ULONG units */
154 sizeof(KQUEUE
) / sizeof(ULONG
),
157 /* Initialize the entry list and the thread list as empty */
158 InitializeListHead(&Queue
->EntryListHead
);
159 InitializeListHead(&Queue
->ThreadListHead
);
161 /* Set the Current and Maximum Count; a Count of 0 selects KeNumberProcessors */
162 Queue
->CurrentCount
= 0;
163 Queue
->MaximumCount
= (Count
== 0) ? (ULONG
) KeNumberProcessors
: Count
;
/*
 * KeInsertHeadQueue
 *
 * Asserts IRQL <= DISPATCH_LEVEL, takes the dispatcher lock, calls
 * KiInsertQueue with TRUE as the third argument, releases the lock and
 * returns the value KiInsertQueue produced.
 *
 * Queue - the queue to insert into.
 * Entry - the list entry to insert.
 *
 * NOTE(review): this listing is lossy. The return type, the braces and the
 * declarations of OldIrql and PreviousState are not visible. KiInsertQueue's
 * third parameter is not visible either; TRUE presumably selects head
 * insertion. Confirm.
 */
171 KeInsertHeadQueue(IN PKQUEUE Queue
,
172 IN PLIST_ENTRY Entry
)
177 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL
);
179 /* Lock the Dispatcher Database */
180 OldIrql
= KiAcquireDispatcherLock();
182 /* Insert the entry into the queue */
183 PreviousState
= KiInsertQueue(Queue
, Entry
, TRUE
);
185 /* Release the Dispatcher Lock */
186 KiReleaseDispatcherLock(OldIrql
);
188 /* Return previous State */
189 return PreviousState
;
/*
 * KeInsertQueue
 *
 * Asserts IRQL <= DISPATCH_LEVEL, takes the dispatcher lock, calls
 * KiInsertQueue with FALSE as the third argument, releases the lock and
 * returns the value KiInsertQueue produced.
 *
 * Queue - the queue to insert into.
 * Entry - the list entry to insert.
 *
 * NOTE(review): this listing is lossy. The return type, the braces and the
 * declarations of OldIrql and PreviousState are not visible. KiInsertQueue's
 * third parameter is not visible either; FALSE presumably selects tail
 * insertion. Confirm.
 */
197 KeInsertQueue(IN PKQUEUE Queue
,
198 IN PLIST_ENTRY Entry
)
203 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL
);
205 /* Lock the Dispatcher Database */
206 OldIrql
= KiAcquireDispatcherLock();
208 /* Insert the entry into the queue */
209 PreviousState
= KiInsertQueue(Queue
, Entry
, FALSE
);
211 /* Release the Dispatcher Lock */
212 KiReleaseDispatcherLock(OldIrql
);
214 /* Return previous State */
215 return PreviousState
;
221 * Returns number of entries in the queue
/*
 * KeReadStateQueue
 *
 * Returns Queue->Header.SignalState. No lock is taken in the visible code.
 *
 * Queue - the queue whose signal state is read.
 *
 * NOTE(review): this listing is lossy. The return type, the braces and
 * original line 228 are not visible here.
 */
225 KeReadStateQueue(IN PKQUEUE Queue
)
227 /* Return the Signal State */
229 return Queue
->Header
.SignalState
;
/*
 * KeRemoveQueue
 *
 * Visible behaviour, in order:
 *  - If Thread->WaitNext is set it is cleared; the other visible path
 *    raises IRQL to synch level and takes the dispatcher lock.
 *  - The current thread's Queue is switched to Queue. If it was a different
 *    queue, the thread's QueueListEntry is moved to Queue->ThreadListHead
 *    and KiActivateWaiterQueue is called for the previous queue; otherwise
 *    Queue->CurrentCount is decremented.
 *  - While CurrentCount is below MaximumCount and the entry list is
 *    non-empty, the first entry is taken: SignalState is decremented,
 *    CurrentCount incremented, the entry is validated (bugcheck on a NULL
 *    Flink or Blink), unlinked and its Flink cleared.
 *  - Otherwise the function handles a pending kernel APC, a pending user APC
 *    (STATUS_USER_APC), or an already-expired timer (STATUS_TIMEOUT), or
 *    queues the thread's wait block and swaps the thread out. Any swap
 *    status other than STATUS_KERNEL_APC is returned cast to PLIST_ENTRY;
 *    on STATUS_KERNEL_APC the wait is set up again.
 *
 * Queue    - the queue to remove an entry from.
 * WaitMode - processor mode of the wait; user APCs only interrupt the wait
 *            when this is not KernelMode.
 * Timeout  - optional timeout.
 *
 * NOTE(review): this listing is lossy. The return type, braces, else/loop
 * keywords, several declarations (Status, Swappable, Hand), the timeout
 * set-up and the final return are not visible here, so the exact nesting of
 * the statements below must be confirmed against the real file.
 */
237 KeRemoveQueue(IN PKQUEUE Queue
,
238 IN KPROCESSOR_MODE WaitMode
,
239 IN PLARGE_INTEGER Timeout OPTIONAL
)
241 PLIST_ENTRY QueueEntry
;
243 PKTHREAD Thread
= KeGetCurrentThread();
244 PKQUEUE PreviousQueue
;
245 PKWAIT_BLOCK WaitBlock
= &Thread
->WaitBlock
[0];
246 PKWAIT_BLOCK TimerBlock
= &Thread
->WaitBlock
[TIMER_WAIT_BLOCK
];
247 PKTIMER Timer
= &Thread
->Timer
;
249 PLARGE_INTEGER OriginalDueTime
= Timeout
;
250 LARGE_INTEGER DueTime
, NewDueTime
, InterruptTime
;
253 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL
);
255 /* Check if the Lock is already held */
256 if (Thread
->WaitNext
)
258 /* It is, so next time don't expect this */
259 Thread
->WaitNext
= FALSE
;
264 /* Raise IRQL to synch, prepare the wait, then lock the database */
265 Thread
->WaitIrql
= KeRaiseIrqlToSynchLevel();
267 KiAcquireDispatcherLockAtDpcLevel();
271 * This is needed so that we can set the new queue right here,
272 * before additional processing
274 PreviousQueue
= Thread
->Queue
;
275 Thread
->Queue
= Queue
;
277 /* Check if this is a different queue */
278 if (Queue
!= PreviousQueue
)
280 /* Get the current entry */
281 QueueEntry
= &Thread
->QueueListEntry
;
/*
 * NOTE(review): original lines 282-283 and 286-287 are not visible; whether
 * the removal and KiActivateWaiterQueue call below are guarded against a
 * NULL PreviousQueue cannot be told from this listing. Confirm.
 */
284 /* Remove from this list */
285 RemoveEntryList(QueueEntry
);
288 KiActivateWaiterQueue(PreviousQueue
);
291 /* Insert in this new Queue */
292 InsertTailList(&Queue
->ThreadListHead
, QueueEntry
);
296 /* Same queue, so this thread no longer counts as active: decrement the active-thread count */
297 Queue
->CurrentCount
--;
300 /* Loop until the queue is processed */
303 /* Check if the counts are valid and if there is still a queued entry */
304 QueueEntry
= Queue
->EntryListHead
.Flink
;
305 if ((Queue
->CurrentCount
< Queue
->MaximumCount
) &&
306 (QueueEntry
!= &Queue
->EntryListHead
))
308 /* Decrease the number of entries */
309 Queue
->Header
.SignalState
--;
311 /* Increase number of running threads */
312 Queue
->CurrentCount
++;
314 /* Check if the entry is valid (both links non-NULL). If not, bugcheck */
315 if (!(QueueEntry
->Flink
) || !(QueueEntry
->Blink
))
318 KeBugCheckEx(INVALID_WORK_QUEUE_ITEM
,
319 (ULONG_PTR
)QueueEntry
,
322 (ULONG_PTR
)((PWORK_QUEUE_ITEM
)QueueEntry
)->
326 /* Remove the Entry and clear its forward link */
327 RemoveEntryList(QueueEntry
);
328 QueueEntry
->Flink
= NULL
;
330 /* Nothing to wait on */
335 /* Check if a kernel APC is pending, special APCs are not disabled, and the wait IRQL is below APC_LEVEL */
336 if ((Thread
->ApcState
.KernelApcPending
) &&
337 !(Thread
->SpecialApcDisable
) && (Thread
->WaitIrql
< APC_LEVEL
))
339 /* Increment the count and unlock the dispatcher */
340 Queue
->CurrentCount
++;
341 KiReleaseDispatcherLockFromDpcLevel();
342 KiExitDispatcher(Thread
->WaitIrql
);
346 /* Fail if there's a User APC Pending */
347 if ((WaitMode
!= KernelMode
) &&
348 (Thread
->ApcState
.UserApcPending
))
350 /* Use STATUS_USER_APC as the returned entry and re-increment the active-thread count */
351 QueueEntry
= (PLIST_ENTRY
)STATUS_USER_APC
;
352 Queue
->CurrentCount
++;
356 /* Enable the Timeout Timer if there was any specified */
359 /* Check if the timer expired */
360 InterruptTime
.QuadPart
= KeQueryInterruptTime();
361 if (InterruptTime
.QuadPart
>= Timer
->DueTime
.QuadPart
)
363 /* It did, so we don't need to wait: use STATUS_TIMEOUT as the returned entry */
364 QueueEntry
= (PLIST_ENTRY
)STATUS_TIMEOUT
;
365 Queue
->CurrentCount
++;
369 /* It didn't, so activate it */
370 Timer
->Header
.Inserted
= TRUE
;
373 /* Insert the wait block at the tail of the queue's wait list */
374 InsertTailList(&Queue
->Header
.WaitListHead
,
375 &WaitBlock
->WaitListEntry
);
377 /* Setup the wait information */
378 Thread
->State
= Waiting
;
380 /* Add the thread to the wait list */
381 KiAddThreadToWaitList(Thread
, Swappable
);
383 /* Activate thread swap */
384 ASSERT(Thread
->WaitIrql
<= DISPATCH_LEVEL
);
385 KiSetThreadSwapBusy(Thread
);
387 /* Check if we have a timer */
391 KxInsertTimer(Timer
, Hand
);
395 /* Otherwise, unlock the dispatcher */
396 KiReleaseDispatcherLockFromDpcLevel();
399 /* Do the actual swap */
400 Status
= KiSwapThread(Thread
, KeGetCurrentPrcb());
402 /* Reset the wait reason */
403 Thread
->WaitReason
= 0;
405 /* Any status other than STATUS_KERNEL_APC ends the wait: return it cast to a list entry pointer */
406 if (Status
!= STATUS_KERNEL_APC
) return (PLIST_ENTRY
)Status
;
408 /* Check if we had a timeout */
411 /* Recalculate due times */
412 Timeout
= KiRecalculateDueTime(OriginalDueTime
,
418 /* Start another wait: raise IRQL, re-take the dispatcher lock and drop the active-thread count again */
419 Thread
->WaitIrql
= KeRaiseIrqlToSynchLevel();
421 KiAcquireDispatcherLockAtDpcLevel();
422 Queue
->CurrentCount
--;
426 /* Unlock Database and return */
427 KiReleaseDispatcherLockFromDpcLevel();
428 KiExitDispatcher(Thread
->WaitIrql
);
/*
 * KeRundownQueue
 *
 * Asserts IRQL <= DISPATCH_LEVEL and that the queue's wait list is empty,
 * then takes the dispatcher lock. If the entry list is non-empty, its head
 * entry is removed and remembered in FirstEntry. Every thread linked on
 * Queue->ThreadListHead then has its Queue pointer cleared and its
 * QueueListEntry unlinked. Finally the dispatcher lock is released.
 *
 * Queue - the queue being run down.
 *
 * NOTE(review): this listing is lossy and ends after the lock release. The
 * return type, the braces, the declarations of Thread and OldIrql, and any
 * return statement are not visible here.
 */
437 KeRundownQueue(IN PKQUEUE Queue
)
439 PLIST_ENTRY ListHead
, NextEntry
;
440 PLIST_ENTRY FirstEntry
= NULL
;
/* The wait-list emptiness check below runs before the dispatcher lock is taken */
444 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL
);
445 ASSERT(IsListEmpty(&Queue
->Header
.WaitListHead
));
447 /* Get the Dispatcher Lock */
448 OldIrql
= KiAcquireDispatcherLock();
450 /* Make sure the list is not empty */
451 if (!IsListEmpty(&Queue
->EntryListHead
))
/* Remove the head entry and remember it */
454 FirstEntry
= RemoveHeadList(&Queue
->EntryListHead
);
457 /* Unlink threads and clear their Thread->Queue */
458 ListHead
= &Queue
->ThreadListHead
;
459 NextEntry
= ListHead
->Flink
;
460 while (ListHead
!= NextEntry
)
462 /* Get the Entry's Thread */
463 Thread
= CONTAINING_RECORD(NextEntry
, KTHREAD
, QueueListEntry
);
/* Detach the thread from this queue */
466 Thread
->Queue
= NULL
;
468 /* Remove this entry */
469 RemoveEntryList(NextEntry
);
471 /* Get the next entry */
/*
 * NOTE(review): Flink is read from the entry that was just unlinked; this
 * relies on RemoveEntryList leaving the removed entry's own links intact.
 * Confirm.
 */
472 NextEntry
= NextEntry
->Flink
;
475 /* Release the dispatcher lock */
476 KiReleaseDispatcherLock(OldIrql
);