[CMAKE]
[reactos.git] / ntoskrnl / ke / queue.c
1 /*
2 * PROJECT: ReactOS Kernel
3 * LICENSE: GPL - See COPYING in the top level directory
4 * FILE: ntoskrnl/ke/queue.c
5 * PURPOSE: Implements kernel queues
6 * PROGRAMMERS: Alex Ionescu (alex.ionescu@reactos.org)
7 * Gunnar Dalsnes
8 * Eric Kohl
9 */
10
11 /* INCLUDES ******************************************************************/
12
13 #include <ntoskrnl.h>
14 #define NDEBUG
15 #include <debug.h>
16
17 /* PRIVATE FUNCTIONS *********************************************************/
18
19 /*
20 * Called when a thread which has a queue entry is entering a wait state
21 */
22 VOID
23 FASTCALL
24 KiActivateWaiterQueue(IN PKQUEUE Queue)
25 {
26 PLIST_ENTRY QueueEntry;
27 PLIST_ENTRY WaitEntry;
28 PKWAIT_BLOCK WaitBlock;
29 PKTHREAD Thread;
30 ASSERT_QUEUE(Queue);
31
32 /* Decrement the number of active threads */
33 Queue->CurrentCount--;
34
35 /* Make sure the counts are OK */
36 if (Queue->CurrentCount < Queue->MaximumCount)
37 {
38 /* Get the Queue Entry */
39 QueueEntry = Queue->EntryListHead.Flink;
40
41 /* Get the Wait Entry */
42 WaitEntry = Queue->Header.WaitListHead.Blink;
43
44 /* Make sure that the Queue entries are not part of empty lists */
45 if ((WaitEntry != &Queue->Header.WaitListHead) &&
46 (QueueEntry != &Queue->EntryListHead))
47 {
48 /* Remove this entry */
49 RemoveEntryList(QueueEntry);
50 QueueEntry->Flink = NULL;
51
52 /* Decrease the Signal State */
53 Queue->Header.SignalState--;
54
55 /* Unwait the Thread */
56 WaitBlock = CONTAINING_RECORD(WaitEntry,
57 KWAIT_BLOCK,
58 WaitListEntry);
59 Thread = WaitBlock->Thread;
60 KiUnwaitThread(Thread, (LONG_PTR)QueueEntry, IO_NO_INCREMENT);
61 }
62 }
63 }
64
65 /*
66 * Returns the previous number of entries in the queue
67 */
68 LONG
69 NTAPI
70 KiInsertQueue(IN PKQUEUE Queue,
71 IN PLIST_ENTRY Entry,
72 IN BOOLEAN Head)
73 {
74 ULONG InitialState;
75 PKTHREAD Thread = KeGetCurrentThread();
76 PKWAIT_BLOCK WaitBlock;
77 PLIST_ENTRY WaitEntry;
78 PKTIMER Timer;
79 ASSERT_QUEUE(Queue);
80
81 /* Save the old state */
82 InitialState = Queue->Header.SignalState;
83
84 /* Get the Entry */
85 WaitEntry = Queue->Header.WaitListHead.Blink;
86
87 /*
88 * Why the KeGetCurrentThread()->Queue != Queue?
89 * KiInsertQueue might be called from an APC for the current thread.
90 * -Gunnar
91 */
92 if ((Queue->CurrentCount < Queue->MaximumCount) &&
93 (WaitEntry != &Queue->Header.WaitListHead) &&
94 ((Thread->Queue != Queue) ||
95 (Thread->WaitReason != WrQueue)))
96 {
97 /* Remove the wait entry */
98 RemoveEntryList(WaitEntry);
99
100 /* Get the Wait Block and Thread */
101 WaitBlock = CONTAINING_RECORD(WaitEntry, KWAIT_BLOCK, WaitListEntry);
102 Thread = WaitBlock->Thread;
103
104 /* Remove the queue from the thread's wait list */
105 Thread->WaitStatus = (LONG_PTR)Entry;
106 if (Thread->WaitListEntry.Flink) RemoveEntryList(&Thread->WaitListEntry);
107
108 /* Increase the active threads and remove any wait reason */
109 Queue->CurrentCount++;
110 Thread->WaitReason = 0;
111
112 /* Check if there's a Thread Timer */
113 Timer = &Thread->Timer;
114 if (Timer->Header.Inserted) KxRemoveTreeTimer(Timer);
115
116 /* Reschedule the Thread */
117 KiReadyThread(Thread);
118 }
119 else
120 {
121 /* Increase the Entries */
122 Queue->Header.SignalState++;
123
124 /* Check which mode we're using */
125 if (Head)
126 {
127 /* Insert in the head */
128 InsertHeadList(&Queue->EntryListHead, Entry);
129 }
130 else
131 {
132 /* Insert at the end */
133 InsertTailList(&Queue->EntryListHead, Entry);
134 }
135 }
136
137 /* Return the previous state */
138 return InitialState;
139 }
140
141 /* PUBLIC FUNCTIONS **********************************************************/
142
143 /*
144 * @implemented
145 */
146 VOID
147 NTAPI
148 KeInitializeQueue(IN PKQUEUE Queue,
149 IN ULONG Count OPTIONAL)
150 {
151 /* Initialize the Header */
152 KeInitializeDispatcherHeader(&Queue->Header,
153 QueueObject,
154 sizeof(KQUEUE) / sizeof(ULONG),
155 0);
156
157 /* Initialize the Lists */
158 InitializeListHead(&Queue->EntryListHead);
159 InitializeListHead(&Queue->ThreadListHead);
160
161 /* Set the Current and Maximum Count */
162 Queue->CurrentCount = 0;
163 Queue->MaximumCount = (Count == 0) ? (ULONG) KeNumberProcessors : Count;
164 }
165
166 /*
167 * @implemented
168 */
169 LONG
170 NTAPI
171 KeInsertHeadQueue(IN PKQUEUE Queue,
172 IN PLIST_ENTRY Entry)
173 {
174 LONG PreviousState;
175 KIRQL OldIrql;
176 ASSERT_QUEUE(Queue);
177 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL);
178
179 /* Lock the Dispatcher Database */
180 OldIrql = KiAcquireDispatcherLock();
181
182 /* Insert the Queue */
183 PreviousState = KiInsertQueue(Queue, Entry, TRUE);
184
185 /* Release the Dispatcher Lock */
186 KiReleaseDispatcherLock(OldIrql);
187
188 /* Return previous State */
189 return PreviousState;
190 }
191
192 /*
193 * @implemented
194 */
195 LONG
196 NTAPI
197 KeInsertQueue(IN PKQUEUE Queue,
198 IN PLIST_ENTRY Entry)
199 {
200 LONG PreviousState;
201 KIRQL OldIrql;
202 ASSERT_QUEUE(Queue);
203 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL);
204
205 /* Lock the Dispatcher Database */
206 OldIrql = KiAcquireDispatcherLock();
207
208 /* Insert the Queue */
209 PreviousState = KiInsertQueue(Queue, Entry, FALSE);
210
211 /* Release the Dispatcher Lock */
212 KiReleaseDispatcherLock(OldIrql);
213
214 /* Return previous State */
215 return PreviousState;
216 }
217
218 /*
219 * @implemented
220 *
221 * Returns number of entries in the queue
222 */
223 LONG
224 NTAPI
225 KeReadStateQueue(IN PKQUEUE Queue)
226 {
227 /* Returns the Signal State */
228 ASSERT_QUEUE(Queue);
229 return Queue->Header.SignalState;
230 }
231
232 /*
233 * @implemented
234 */
235 PLIST_ENTRY
236 NTAPI
237 KeRemoveQueue(IN PKQUEUE Queue,
238 IN KPROCESSOR_MODE WaitMode,
239 IN PLARGE_INTEGER Timeout OPTIONAL)
240 {
241 PLIST_ENTRY QueueEntry;
242 LONG_PTR Status;
243 PKTHREAD Thread = KeGetCurrentThread();
244 PKQUEUE PreviousQueue;
245 PKWAIT_BLOCK WaitBlock = &Thread->WaitBlock[0];
246 PKWAIT_BLOCK TimerBlock = &Thread->WaitBlock[TIMER_WAIT_BLOCK];
247 PKTIMER Timer = &Thread->Timer;
248 BOOLEAN Swappable;
249 PLARGE_INTEGER OriginalDueTime = Timeout;
250 LARGE_INTEGER DueTime, NewDueTime, InterruptTime;
251 ULONG Hand = 0;
252 ASSERT_QUEUE(Queue);
253 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL);
254
255 /* Check if the Lock is already held */
256 if (Thread->WaitNext)
257 {
258 /* It is, so next time don't do expect this */
259 Thread->WaitNext = FALSE;
260 KxQueueThreadWait();
261 }
262 else
263 {
264 /* Raise IRQL to synch, prepare the wait, then lock the database */
265 Thread->WaitIrql = KeRaiseIrqlToSynchLevel();
266 KxQueueThreadWait();
267 KiAcquireDispatcherLockAtDpcLevel();
268 }
269
270 /*
271 * This is needed so that we can set the new queue right here,
272 * before additional processing
273 */
274 PreviousQueue = Thread->Queue;
275 Thread->Queue = Queue;
276
277 /* Check if this is a different queue */
278 if (Queue != PreviousQueue)
279 {
280 /* Get the current entry */
281 QueueEntry = &Thread->QueueListEntry;
282 if (PreviousQueue)
283 {
284 /* Remove from this list */
285 RemoveEntryList(QueueEntry);
286
287 /* Wake the queue */
288 KiActivateWaiterQueue(PreviousQueue);
289 }
290
291 /* Insert in this new Queue */
292 InsertTailList(&Queue->ThreadListHead, QueueEntry);
293 }
294 else
295 {
296 /* Same queue, decrement waiting threads */
297 Queue->CurrentCount--;
298 }
299
300 /* Loop until the queue is processed */
301 while (TRUE)
302 {
303 /* Check if the counts are valid and if there is still a queued entry */
304 QueueEntry = Queue->EntryListHead.Flink;
305 if ((Queue->CurrentCount < Queue->MaximumCount) &&
306 (QueueEntry != &Queue->EntryListHead))
307 {
308 /* Decrease the number of entries */
309 Queue->Header.SignalState--;
310
311 /* Increase numbef of running threads */
312 Queue->CurrentCount++;
313
314 /* Check if the entry is valid. If not, bugcheck */
315 if (!(QueueEntry->Flink) || !(QueueEntry->Blink))
316 {
317 /* Invalid item */
318 KeBugCheckEx(INVALID_WORK_QUEUE_ITEM,
319 (ULONG_PTR)QueueEntry,
320 (ULONG_PTR)Queue,
321 (ULONG_PTR)NULL,
322 (ULONG_PTR)((PWORK_QUEUE_ITEM)QueueEntry)->
323 WorkerRoutine);
324 }
325
326 /* Remove the Entry */
327 RemoveEntryList(QueueEntry);
328 QueueEntry->Flink = NULL;
329
330 /* Nothing to wait on */
331 break;
332 }
333 else
334 {
335 /* Check if a kernel APC is pending and we're below APC_LEVEL */
336 if ((Thread->ApcState.KernelApcPending) &&
337 !(Thread->SpecialApcDisable) && (Thread->WaitIrql < APC_LEVEL))
338 {
339 /* Increment the count and unlock the dispatcher */
340 Queue->CurrentCount++;
341 KiReleaseDispatcherLockFromDpcLevel();
342 KiExitDispatcher(Thread->WaitIrql);
343 }
344 else
345 {
346 /* Fail if there's a User APC Pending */
347 if ((WaitMode != KernelMode) &&
348 (Thread->ApcState.UserApcPending))
349 {
350 /* Return the status and increase the pending threads */
351 QueueEntry = (PLIST_ENTRY)STATUS_USER_APC;
352 Queue->CurrentCount++;
353 break;
354 }
355
356 /* Enable the Timeout Timer if there was any specified */
357 if (Timeout)
358 {
359 /* Check if the timer expired */
360 InterruptTime.QuadPart = KeQueryInterruptTime();
361 if ((ULONG64)InterruptTime.QuadPart >= Timer->DueTime.QuadPart)
362 {
363 /* It did, so we don't need to wait */
364 QueueEntry = (PLIST_ENTRY)STATUS_TIMEOUT;
365 Queue->CurrentCount++;
366 break;
367 }
368
369 /* It didn't, so activate it */
370 Timer->Header.Inserted = TRUE;
371 }
372
373 /* Insert the wait block in the list */
374 InsertTailList(&Queue->Header.WaitListHead,
375 &WaitBlock->WaitListEntry);
376
377 /* Setup the wait information */
378 Thread->State = Waiting;
379
380 /* Add the thread to the wait list */
381 KiAddThreadToWaitList(Thread, Swappable);
382
383 /* Activate thread swap */
384 ASSERT(Thread->WaitIrql <= DISPATCH_LEVEL);
385 KiSetThreadSwapBusy(Thread);
386
387 /* Check if we have a timer */
388 if (Timeout)
389 {
390 /* Insert it */
391 KxInsertTimer(Timer, Hand);
392 }
393 else
394 {
395 /* Otherwise, unlock the dispatcher */
396 KiReleaseDispatcherLockFromDpcLevel();
397 }
398
399 /* Do the actual swap */
400 Status = KiSwapThread(Thread, KeGetCurrentPrcb());
401
402 /* Reset the wait reason */
403 Thread->WaitReason = 0;
404
405 /* Check if we were executing an APC */
406 if (Status != STATUS_KERNEL_APC) return (PLIST_ENTRY)Status;
407
408 /* Check if we had a timeout */
409 if (Timeout)
410 {
411 /* Recalculate due times */
412 Timeout = KiRecalculateDueTime(OriginalDueTime,
413 &DueTime,
414 &NewDueTime);
415 }
416 }
417
418 /* Start another wait */
419 Thread->WaitIrql = KeRaiseIrqlToSynchLevel();
420 KxQueueThreadWait();
421 KiAcquireDispatcherLockAtDpcLevel();
422 Queue->CurrentCount--;
423 }
424 }
425
426 /* Unlock Database and return */
427 KiReleaseDispatcherLockFromDpcLevel();
428 KiExitDispatcher(Thread->WaitIrql);
429 return QueueEntry;
430 }
431
432 /*
433 * @implemented
434 */
435 PLIST_ENTRY
436 NTAPI
437 KeRundownQueue(IN PKQUEUE Queue)
438 {
439 PLIST_ENTRY FirstEntry, NextEntry;
440 PKTHREAD Thread;
441 KIRQL OldIrql;
442 ASSERT_QUEUE(Queue);
443 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL);
444 ASSERT(IsListEmpty(&Queue->Header.WaitListHead));
445
446 /* Get the Dispatcher Lock */
447 OldIrql = KiAcquireDispatcherLock();
448
449 /* Check if the list is empty */
450 FirstEntry = Queue->EntryListHead.Flink;
451 if (FirstEntry == &Queue->EntryListHead)
452 {
453 /* We won't return anything */
454 FirstEntry = NULL;
455 }
456 else
457 {
458 /* Remove this entry */
459 RemoveEntryList(&Queue->EntryListHead);
460 }
461
462 /* Loop the list */
463 while (!IsListEmpty(&Queue->ThreadListHead))
464 {
465 /* Get the next entry */
466 NextEntry = Queue->ThreadListHead.Flink;
467
468 /* Get the associated thread */
469 Thread = CONTAINING_RECORD(NextEntry, KTHREAD, QueueListEntry);
470
471 /* Clear its queue */
472 Thread->Queue = NULL;
473
474 /* Remove this entry */
475 RemoveEntryList(NextEntry);
476 }
477
478 /* Release the dispatcher lock */
479 KiReleaseDispatcherLockFromDpcLevel();
480
481 /* Exit the dispatcher and return the first entry (if any) */
482 KiExitDispatcher(OldIrql);
483 return FirstEntry;
484 }
485
486 /* EOF */