2 * PROJECT: ReactOS Kernel
3 * LICENSE: GPL - See COPYING in the top level directory
4 * FILE: ntoskrnl/ke/queue.c
5 * PURPOSE: Implements kernel queues
6 * PROGRAMMERS: Alex Ionescu (alex.ionescu@reactos.org)
11 /* INCLUDES ******************************************************************/
17 /* PRIVATE FUNCTIONS *********************************************************/
20 * Called when a thread which has a queue entry is entering a wait state
/*
 * KiActivateWaiterQueue
 *
 * Drops one active thread from Queue (CurrentCount--). If the count is then
 * below MaximumCount and both the queue's wait list and its entry list are
 * non-empty, the first queued entry is unlinked, SignalState is decremented,
 * and the thread owning the last wait block on the wait list is unwaited via
 * KiUnwaitThread, receiving the entry pointer (cast to LONG_PTR).
 *
 * NOTE(review): this listing is missing lines (braces, some declarations and
 * part of the CONTAINING_RECORD call are not visible). Locking requirements
 * are not shown here; the visible call site in KeRemoveQueue comes after
 * the dispatcher lock handling — confirm before relying on it.
 */
24 KiActivateWaiterQueue(IN PKQUEUE Queue
)
26 PLIST_ENTRY QueueEntry
;
27 PLIST_ENTRY WaitEntry
;
28 PKWAIT_BLOCK WaitBlock
;
32 /* Decrement the number of active threads */
33 Queue
->CurrentCount
--;
35 /* Only hand out work if the active count is now below the maximum */
36 if (Queue
->CurrentCount
< Queue
->MaximumCount
)
38 /* Get the first queued entry (front of the entry list) */
39 QueueEntry
= Queue
->EntryListHead
.Flink
;
41 /* Get the last wait block on the queue's wait list */
42 WaitEntry
= Queue
->Header
.WaitListHead
.Blink
;
44 /* Proceed only if both the wait list and the entry list are non-empty */
45 if ((WaitEntry
!= &Queue
->Header
.WaitListHead
) &&
46 (QueueEntry
!= &Queue
->EntryListHead
))
48 /* Unlink the queued entry and clear its forward link */
49 RemoveEntryList(QueueEntry
);
50 QueueEntry
->Flink
= NULL
;
52 /* Decrease the Signal State (one fewer queued entry) */
53 Queue
->Header
.SignalState
--;
55 /* Unwait the thread that owns the wait block, handing it the entry */
56 WaitBlock
= CONTAINING_RECORD(WaitEntry
,
59 Thread
= WaitBlock
->Thread
;
60 KiUnwaitThread(Thread
, (LONG_PTR
)QueueEntry
, IO_NO_INCREMENT
);
66 * Returns the previous number of entries in the queue
/*
 * KiInsertQueue
 *
 * Records the queue's SignalState on entry (InitialState). Then either:
 *  - hands Entry directly to a waiting thread, when CurrentCount is below
 *    MaximumCount, the wait list is non-empty, and the current thread is not
 *    itself waiting on this same queue (Thread->Queue != Queue or
 *    WaitReason != WrQueue); or
 *  - increments SignalState and links Entry into EntryListHead at the head
 *    or the tail.
 *
 * NOTE(review): only the Queue parameter is visible in this listing; Entry
 * is referenced in the body and the head/tail selector test is not shown.
 * The else/if lines separating the two paths are also missing, so the
 * branch structure described above is inferred from the comments and
 * should be confirmed against the full file.
 */
70 KiInsertQueue(IN PKQUEUE Queue
,
75 PKTHREAD Thread
= KeGetCurrentThread();
76 PKWAIT_BLOCK WaitBlock
;
77 PLIST_ENTRY WaitEntry
;
81 /* Save the old state so it can be reported as the previous entry count */
82 InitialState
= Queue
->Header
.SignalState
;
/* Look at the last wait block on the queue's wait list */
85 WaitEntry
= Queue
->Header
.WaitListHead
.Blink
;
88 * Why the KeGetCurrentThread()->Queue != Queue?
89 * KiInsertQueue might be called from an APC for the current thread.
92 if ((Queue
->CurrentCount
< Queue
->MaximumCount
) &&
93 (WaitEntry
!= &Queue
->Header
.WaitListHead
) &&
94 ((Thread
->Queue
!= Queue
) ||
95 (Thread
->WaitReason
!= WrQueue
)))
97 /* Remove the wait entry from the queue's wait list */
98 RemoveEntryList(WaitEntry
);
100 /* Get the Wait Block and the Thread that owns it */
101 WaitBlock
= CONTAINING_RECORD(WaitEntry
, KWAIT_BLOCK
, WaitListEntry
);
102 Thread
= WaitBlock
->Thread
;
104 /* Give the entry to the thread as its wait status, then unlink the thread's own wait-list entry if it is linked */
105 Thread
->WaitStatus
= (LONG_PTR
)Entry
;
106 if (Thread
->WaitListEntry
.Flink
) RemoveEntryList(&Thread
->WaitListEntry
);
108 /* Increase the active threads and remove any wait reason */
109 Queue
->CurrentCount
++;
110 Thread
->WaitReason
= 0;
112 /* If the thread's timer is inserted, remove it */
113 Timer
= &Thread
->Timer
;
114 if (Timer
->Header
.Inserted
) KxRemoveTreeTimer(Timer
);
116 /* Make the thread ready to run */
117 KiReadyThread(Thread
);
121 /* Increase the number of queued entries */
122 Queue
->Header
.SignalState
++;
124 /* Check which mode we're using */
127 /* Insert in the head */
128 InsertHeadList(&Queue
->EntryListHead
, Entry
);
132 /* Insert at the end */
133 InsertTailList(&Queue
->EntryListHead
, Entry
);
137 /* Return the previous state */
141 /* PUBLIC FUNCTIONS **********************************************************/
/*
 * KeInitializeQueue
 *
 * Puts Queue into its initial, empty state: sets up the dispatcher header
 * (type QueueObject, not abandoned, size expressed in ULONG units, signal
 * state 0, empty wait list), empties the entry and thread lists, and sets
 * the active count to 0.
 *
 * Count is the maximum number of concurrently active threads; passing 0
 * selects KeNumberProcessors instead.
 */
148 KeInitializeQueue(IN PKQUEUE Queue
,
149 IN ULONG Count OPTIONAL
)
151 /* Initialize the Header */
152 Queue
->Header
.Type
= QueueObject
;
153 Queue
->Header
.Abandoned
= 0;
/* Size is stored as a count of ULONGs, not bytes */
154 Queue
->Header
.Size
= sizeof(KQUEUE
) / sizeof(ULONG
);
155 Queue
->Header
.SignalState
= 0;
156 InitializeListHead(&(Queue
->Header
.WaitListHead
));
158 /* Initialize the entry list and the list of associated threads */
159 InitializeListHead(&Queue
->EntryListHead
);
160 InitializeListHead(&Queue
->ThreadListHead
);
162 /* Set the Current and Maximum Count; a Count of 0 defaults to KeNumberProcessors */
163 Queue
->CurrentCount
= 0;
164 Queue
->MaximumCount
= (Count
== 0) ? (ULONG
) KeNumberProcessors
: Count
;
/*
 * KeInsertHeadQueue
 *
 * Acquires the dispatcher lock, calls KiInsertQueue(Queue, Entry, TRUE),
 * releases the lock, and returns the value KiInsertQueue produced.
 * Asserts that the caller is at or below DISPATCH_LEVEL.
 *
 * NOTE(review): the third argument presumably selects head insertion (the
 * test inside KiInsertQueue is not visible in this listing) — confirm.
 */
172 KeInsertHeadQueue(IN PKQUEUE Queue
,
173 IN PLIST_ENTRY Entry
)
178 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL
);
180 /* Lock the Dispatcher Database */
181 OldIrql
= KiAcquireDispatcherLock();
183 /* Insert the entry into the queue (third argument TRUE) */
184 PreviousState
= KiInsertQueue(Queue
, Entry
, TRUE
);
186 /* Release the Dispatcher Lock */
187 KiReleaseDispatcherLock(OldIrql
);
189 /* Return previous State */
190 return PreviousState
;
/*
 * KeInsertQueue
 *
 * Acquires the dispatcher lock, calls KiInsertQueue(Queue, Entry, FALSE),
 * releases the lock, and returns the value KiInsertQueue produced.
 * Asserts that the caller is at or below DISPATCH_LEVEL.
 *
 * NOTE(review): the third argument presumably selects tail insertion (the
 * test inside KiInsertQueue is not visible in this listing) — confirm.
 */
198 KeInsertQueue(IN PKQUEUE Queue
,
199 IN PLIST_ENTRY Entry
)
204 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL
);
206 /* Lock the Dispatcher Database */
207 OldIrql
= KiAcquireDispatcherLock();
209 /* Insert the entry into the queue (third argument FALSE) */
210 PreviousState
= KiInsertQueue(Queue
, Entry
, FALSE
);
212 /* Release the Dispatcher Lock */
213 KiReleaseDispatcherLock(OldIrql
);
215 /* Return previous State */
216 return PreviousState
;
222 * Returns number of entries in the queue
/*
 * KeReadStateQueue
 *
 * Returns Queue->Header.SignalState. No lock is taken in the visible code,
 * so the value is a snapshot only.
 */
226 KeReadStateQueue(IN PKQUEUE Queue
)
228 /* Return the Signal State, which this file increments and decrements as entries are queued and removed */
230 return Queue
->Header
.SignalState
;
/*
 * KeRemoveQueue
 *
 * Associates the current thread with Queue and tries to take the first
 * queued entry. If an entry is available and CurrentCount is below
 * MaximumCount, the entry is unlinked (SignalState--, CurrentCount++).
 * Otherwise the thread prepares to wait on the queue:
 *  - a pending kernel APC (with special APCs enabled and WaitIrql below
 *    APC_LEVEL) causes the dispatcher lock to be dropped so it can run;
 *  - a pending user APC in a non-kernel WaitMode yields STATUS_USER_APC;
 *  - an already-expired timeout yields STATUS_TIMEOUT;
 *  - otherwise the thread's wait block is queued and KiSwapThread is called.
 * Status values are returned cast to PLIST_ENTRY.
 *
 * NOTE(review): this listing is missing lines (loop header, else branches,
 * timer setup, final return), so the exact control flow between the steps
 * above cannot be confirmed from here.
 */
238 KeRemoveQueue(IN PKQUEUE Queue
,
239 IN KPROCESSOR_MODE WaitMode
,
240 IN PLARGE_INTEGER Timeout OPTIONAL
)
242 PLIST_ENTRY QueueEntry
;
244 PKTHREAD Thread
= KeGetCurrentThread();
245 PKQUEUE PreviousQueue
;
246 PKWAIT_BLOCK WaitBlock
= &Thread
->WaitBlock
[0];
247 PKWAIT_BLOCK TimerBlock
= &Thread
->WaitBlock
[TIMER_WAIT_BLOCK
];
248 PKTIMER Timer
= &Thread
->Timer
;
250 PLARGE_INTEGER OriginalDueTime
= Timeout
;
251 LARGE_INTEGER DueTime
= {{0}}, NewDueTime
, InterruptTime
;
254 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL
);
256 /* Check if the Lock is already held */
257 if (Thread
->WaitNext
)
259 /* It is, so clear the flag; the next call should not expect this */
260 Thread
->WaitNext
= FALSE
;
265 /* Raise IRQL to synch, prepare the wait, then lock the database */
266 Thread
->WaitIrql
= KeRaiseIrqlToSynchLevel();
268 KiAcquireDispatcherLockAtDpcLevel();
272 * This is needed so that we can set the new queue right here,
273 * before additional processing
275 PreviousQueue
= Thread
->Queue
;
276 Thread
->Queue
= Queue
;
278 /* Check if this is a different queue */
279 if (Queue
!= PreviousQueue
)
281 /* Get the thread's queue list entry */
282 QueueEntry
= &Thread
->QueueListEntry
;
285 /* Remove the thread from the previous queue's thread list */
286 RemoveEntryList(QueueEntry
);
/* The previous queue loses an active thread; let it wake a waiter */
289 KiActivateWaiterQueue(PreviousQueue
);
292 /* Insert in this new Queue */
293 InsertTailList(&Queue
->ThreadListHead
, QueueEntry
);
297 /* Same queue, decrement the active thread count */
298 Queue
->CurrentCount
--;
301 /* Loop until the queue is processed */
304 /* Check if the counts are valid and if there is still a queued entry */
305 QueueEntry
= Queue
->EntryListHead
.Flink
;
306 if ((Queue
->CurrentCount
< Queue
->MaximumCount
) &&
307 (QueueEntry
!= &Queue
->EntryListHead
))
309 /* Decrease the number of entries */
310 Queue
->Header
.SignalState
--;
312 /* Increase number of running threads */
313 Queue
->CurrentCount
++;
315 /* Check if the entry is valid (both links non-NULL). If not, bugcheck */
316 if (!(QueueEntry
->Flink
) || !(QueueEntry
->Blink
))
319 KeBugCheckEx(INVALID_WORK_QUEUE_ITEM
,
320 (ULONG_PTR
)QueueEntry
,
323 (ULONG_PTR
)((PWORK_QUEUE_ITEM
)QueueEntry
)->
327 /* Remove the Entry and clear its forward link */
328 RemoveEntryList(QueueEntry
);
329 QueueEntry
->Flink
= NULL
;
331 /* Nothing to wait on */
336 /* Check if a kernel APC is pending and we're below APC_LEVEL */
337 if ((Thread
->ApcState
.KernelApcPending
) &&
338 !(Thread
->SpecialApcDisable
) && (Thread
->WaitIrql
< APC_LEVEL
))
340 /* Increment the active count and unlock the dispatcher */
341 Queue
->CurrentCount
++;
342 KiReleaseDispatcherLockFromDpcLevel();
343 KiExitDispatcher(Thread
->WaitIrql
);
347 /* Fail if there's a User APC Pending */
348 if ((WaitMode
!= KernelMode
) &&
349 (Thread
->ApcState
.UserApcPending
))
351 /* Return the status (cast to a list-entry pointer) and increase the active count */
352 QueueEntry
= (PLIST_ENTRY
)STATUS_USER_APC
;
353 Queue
->CurrentCount
++;
357 /* Enable the Timeout Timer if there was any specified */
360 /* Check if the timer expired */
361 InterruptTime
.QuadPart
= KeQueryInterruptTime();
362 if ((ULONG64
)InterruptTime
.QuadPart
>= Timer
->DueTime
.QuadPart
)
364 /* It did, so we don't need to wait; report the timeout and restore the active count */
365 QueueEntry
= (PLIST_ENTRY
)STATUS_TIMEOUT
;
366 Queue
->CurrentCount
++;
370 /* It didn't, so activate it */
371 Timer
->Header
.Inserted
= TRUE
;
374 /* Insert the wait block at the tail of the queue's wait list */
375 InsertTailList(&Queue
->Header
.WaitListHead
,
376 &WaitBlock
->WaitListEntry
);
378 /* Setup the wait information */
379 Thread
->State
= Waiting
;
381 /* Add the thread to the wait list */
382 KiAddThreadToWaitList(Thread
, Swappable
);
384 /* Activate thread swap */
385 ASSERT(Thread
->WaitIrql
<= DISPATCH_LEVEL
);
386 KiSetThreadSwapBusy(Thread
);
388 /* Check if we have a timer */
392 KxInsertTimer(Timer
, Hand
);
396 /* Otherwise, unlock the dispatcher */
397 KiReleaseDispatcherLockFromDpcLevel();
400 /* Do the actual swap */
401 Status
= KiSwapThread(Thread
, KeGetCurrentPrcb());
403 /* Reset the wait reason */
404 Thread
->WaitReason
= 0;
406 /* Any status other than STATUS_KERNEL_APC is returned to the caller as the result */
407 if (Status
!= STATUS_KERNEL_APC
) return (PLIST_ENTRY
)Status
;
409 /* Check if we had a timeout */
412 /* Recalculate due times */
413 Timeout
= KiRecalculateDueTime(OriginalDueTime
,
419 /* Start another wait: raise IRQL, retake the lock, and give up the active slot again */
420 Thread
->WaitIrql
= KeRaiseIrqlToSynchLevel();
422 KiAcquireDispatcherLockAtDpcLevel();
423 Queue
->CurrentCount
--;
427 /* Unlock Database and return */
428 KiReleaseDispatcherLockFromDpcLevel();
429 KiExitDispatcher(Thread
->WaitIrql
);
/*
 * KeRundownQueue
 *
 * Tears down Queue under the dispatcher lock: detaches any queued entries
 * from the queue's list head, then walks ThreadListHead, clearing each
 * associated thread's Queue pointer and unlinking it. Asserts that no
 * thread is waiting on the queue and that IRQL is at or below
 * DISPATCH_LEVEL.
 *
 * NOTE(review): the final return statement and the empty-list assignment
 * are not visible in this listing; per the comments the first queued entry
 * (if any) is what gets returned — confirm against the full file.
 */
438 KeRundownQueue(IN PKQUEUE Queue
)
440 PLIST_ENTRY FirstEntry
, NextEntry
;
444 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL
);
445 ASSERT(IsListEmpty(&Queue
->Header
.WaitListHead
));
447 /* Get the Dispatcher Lock */
448 OldIrql
= KiAcquireDispatcherLock();
450 /* Check if the entry list is empty */
451 FirstEntry
= Queue
->EntryListHead
.Flink
;
452 if (FirstEntry
== &Queue
->EntryListHead
)
454 /* We won't return anything */
459 /* Unlink the list head itself; the queued entries stay linked to each other */
460 RemoveEntryList(&Queue
->EntryListHead
);
/* Detach every thread still associated with this queue */
464 while (!IsListEmpty(&Queue
->ThreadListHead
))
466 /* Get the next entry */
467 NextEntry
= Queue
->ThreadListHead
.Flink
;
469 /* Get the associated thread */
470 Thread
= CONTAINING_RECORD(NextEntry
, KTHREAD
, QueueListEntry
);
472 /* Clear its queue */
473 Thread
->Queue
= NULL
;
475 /* Remove the thread from the queue's thread list */
476 RemoveEntryList(NextEntry
);
479 /* Release the dispatcher lock */
480 KiReleaseDispatcherLockFromDpcLevel();
482 /* Exit the dispatcher and return the first entry (if any) */
483 KiExitDispatcher(OldIrql
);