7709d0832e80fed3d476161ed3d82e06114b6c9d
[reactos.git] / reactos / ntoskrnl / ke / queue.c
1 /*
2 * PROJECT: ReactOS Kernel
3 * LICENSE: GPL - See COPYING in the top level directory
4 * FILE: ntoskrnl/ke/queue.c
5 * PURPOSE: Implements kernel queues
6 * PROGRAMMERS: Alex Ionescu (alex.ionescu@reactos.org)
7 * Gunnar Dalsnes
8 * Eric Kohl (ekohl@rz-online.de)
9 */
10
11 /* INCLUDES ******************************************************************/
12
13 #include <ntoskrnl.h>
14 #define NDEBUG
15 #include <internal/debug.h>
16
17 /* PRIVATE FUNCTIONS *********************************************************/
18
19 /*
20 * Called when a thread which has a queue entry is entering a wait state
21 */
22 VOID
23 FASTCALL
24 KiWakeQueue(IN PKQUEUE Queue)
25 {
26 PLIST_ENTRY QueueEntry;
27 PLIST_ENTRY WaitEntry;
28 PKWAIT_BLOCK WaitBlock;
29 PKTHREAD Thread;
30 ASSERT_QUEUE(Queue);
31
32 /* Decrement the number of active threads */
33 Queue->CurrentCount--;
34
35 /* Make sure the counts are OK */
36 if (Queue->CurrentCount < Queue->MaximumCount)
37 {
38 /* Get the Queue Entry */
39 QueueEntry = Queue->EntryListHead.Flink;
40
41 /* Get the Wait Entry */
42 WaitEntry = Queue->Header.WaitListHead.Blink;
43
44 /* Make sure that the Queue entries are not part of empty lists */
45 if ((WaitEntry != &Queue->Header.WaitListHead) &&
46 (QueueEntry != &Queue->EntryListHead))
47 {
48 /* Remove this entry */
49 RemoveEntryList(QueueEntry);
50 QueueEntry->Flink = NULL;
51
52 /* Decrease the Signal State */
53 Queue->Header.SignalState--;
54
55 /* Unwait the Thread */
56 WaitBlock = CONTAINING_RECORD(WaitEntry,
57 KWAIT_BLOCK,
58 WaitListEntry);
59 Thread = WaitBlock->Thread;
60 KiAbortWaitThread(Thread, (NTSTATUS)QueueEntry, IO_NO_INCREMENT);
61 }
62 }
63 }
64
65 /*
66 * Returns the previous number of entries in the queue
67 */
68 LONG
69 NTAPI
70 KiInsertQueue(IN PKQUEUE Queue,
71 IN PLIST_ENTRY Entry,
72 BOOLEAN Head)
73 {
74 ULONG InitialState;
75 PKTHREAD Thread = KeGetCurrentThread();
76 PKWAIT_BLOCK WaitBlock;
77 PLIST_ENTRY WaitEntry;
78 ASSERT_QUEUE(Queue);
79
80 /* Save the old state */
81 InitialState = Queue->Header.SignalState;
82
83 /* Get the Entry */
84 WaitEntry = Queue->Header.WaitListHead.Blink;
85
86 /*
87 * Why the KeGetCurrentThread()->Queue != Queue?
88 * KiInsertQueue might be called from an APC for the current thread.
89 * -Gunnar
90 */
91 if ((Queue->CurrentCount < Queue->MaximumCount) &&
92 (WaitEntry != &Queue->Header.WaitListHead) &&
93 ((Thread->Queue != Queue) ||
94 (Thread->WaitReason != WrQueue)))
95 {
96 /* Remove the wait entry */
97 RemoveEntryList(WaitEntry);
98
99 /* Get the Wait Block and Thread */
100 WaitBlock = CONTAINING_RECORD(WaitEntry, KWAIT_BLOCK, WaitListEntry);
101 Thread = WaitBlock->Thread;
102
103 /* Remove the queue from the thread's wait list */
104 Thread->WaitStatus = (NTSTATUS)Entry;
105 if (Thread->WaitListEntry.Flink) RemoveEntryList(&Thread->WaitListEntry);
106 Thread->WaitReason = 0;
107
108 /* Increase the active threads and set the status*/
109 Queue->CurrentCount++;
110
111 /* Check if there's a Thread Timer */
112 if (Thread->Timer.Header.Inserted)
113 {
114 /* Cancel the Thread Timer with the no-lock fastpath */
115 Thread->Timer.Header.Inserted = FALSE;
116 RemoveEntryList(&Thread->Timer.TimerListEntry);
117 }
118
119 /* Reschedule the Thread */
120 KiReadyThread(Thread);
121 }
122 else
123 {
124 /* Increase the Entries */
125 Queue->Header.SignalState++;
126
127 /* Check which mode we're using */
128 if (Head)
129 {
130 /* Insert in the head */
131 InsertHeadList(&Queue->EntryListHead, Entry);
132 }
133 else
134 {
135 /* Insert at the end */
136 InsertTailList(&Queue->EntryListHead, Entry);
137 }
138 }
139
140 /* Return the previous state */
141 return InitialState;
142 }
143
144 /* PUBLIC FUNCTIONS **********************************************************/
145
146 /*
147 * @implemented
148 */
149 VOID
150 NTAPI
151 KeInitializeQueue(IN PKQUEUE Queue,
152 IN ULONG Count OPTIONAL)
153 {
154 /* Initialize the Header */
155 KeInitializeDispatcherHeader(&Queue->Header,
156 QueueObject,
157 sizeof(KQUEUE) / sizeof(ULONG),
158 0);
159
160 /* Initialize the Lists */
161 InitializeListHead(&Queue->EntryListHead);
162 InitializeListHead(&Queue->ThreadListHead);
163
164 /* Set the Current and Maximum Count */
165 Queue->CurrentCount = 0;
166 Queue->MaximumCount = (Count == 0) ? (ULONG) KeNumberProcessors : Count;
167 }
168
169 /*
170 * @implemented
171 */
172 LONG
173 NTAPI
174 KeInsertHeadQueue(IN PKQUEUE Queue,
175 IN PLIST_ENTRY Entry)
176 {
177 LONG PreviousState;
178 KIRQL OldIrql;
179 ASSERT_QUEUE(Queue);
180 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL);
181
182 /* Lock the Dispatcher Database */
183 OldIrql = KeAcquireDispatcherDatabaseLock();
184
185 /* Insert the Queue */
186 PreviousState = KiInsertQueue(Queue, Entry, TRUE);
187
188 /* Release the Dispatcher Lock */
189 KeReleaseDispatcherDatabaseLock(OldIrql);
190
191 /* Return previous State */
192 return PreviousState;
193 }
194
195 /*
196 * @implemented
197 */
198 LONG
199 NTAPI
200 KeInsertQueue(IN PKQUEUE Queue,
201 IN PLIST_ENTRY Entry)
202 {
203 LONG PreviousState;
204 KIRQL OldIrql;
205 ASSERT_QUEUE(Queue);
206 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL);
207
208 /* Lock the Dispatcher Database */
209 OldIrql = KeAcquireDispatcherDatabaseLock();
210
211 /* Insert the Queue */
212 PreviousState = KiInsertQueue(Queue, Entry, FALSE);
213
214 /* Release the Dispatcher Lock */
215 KeReleaseDispatcherDatabaseLock(OldIrql);
216
217 /* Return previous State */
218 return PreviousState;
219 }
220
221 /*
222 * @implemented
223 *
224 * Returns number of entries in the queue
225 */
226 LONG
227 NTAPI
228 KeReadStateQueue(IN PKQUEUE Queue)
229 {
230 /* Returns the Signal State */
231 ASSERT_QUEUE(Queue);
232 return Queue->Header.SignalState;
233 }
234
235 /*
236 * @implemented
237 */
238 PLIST_ENTRY
239 NTAPI
240 KeRemoveQueue(IN PKQUEUE Queue,
241 IN KPROCESSOR_MODE WaitMode,
242 IN PLARGE_INTEGER Timeout OPTIONAL)
243 {
244 PLIST_ENTRY QueueEntry;
245 NTSTATUS Status;
246 PKTHREAD Thread = KeGetCurrentThread();
247 KIRQL OldIrql;
248 PKQUEUE PreviousQueue;
249 PKWAIT_BLOCK WaitBlock;
250 PKTIMER Timer;
251 BOOLEAN Swappable;
252 ASSERT_QUEUE(Queue);
253 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL);
254
255 /* Check if the Lock is already held */
256 if (Thread->WaitNext)
257 {
258 /* It is, so next time don't do expect this */
259 Thread->WaitNext = FALSE;
260 }
261 else
262 {
263 /* Lock the Dispatcher Database */
264 OldIrql = KeAcquireDispatcherDatabaseLock();
265 Thread->WaitIrql = OldIrql;
266 }
267
268 /*
269 * This is needed so that we can set the new queue right here,
270 * before additional processing
271 */
272 PreviousQueue = Thread->Queue;
273 Thread->Queue = Queue;
274
275 /* Check if this is a different queue */
276 if (Queue != PreviousQueue)
277 {
278 /* Get the current entry */
279 QueueEntry = &Thread->QueueListEntry;
280 if (PreviousQueue)
281 {
282 /* Remove from this list */
283 RemoveEntryList(QueueEntry);
284
285 /* Wake the queue */
286 KiWakeQueue(PreviousQueue);
287 }
288
289 /* Insert in this new Queue */
290 InsertTailList(&Queue->ThreadListHead, QueueEntry);
291 }
292 else
293 {
294 /* Same queue, decrement waiting threads */
295 Queue->CurrentCount--;
296 }
297
298 /* Loop until the queue is processed */
299 while (TRUE)
300 {
301 /* Check if the counts are valid and if there is still a queued entry */
302 QueueEntry = Queue->EntryListHead.Flink;
303 if ((Queue->CurrentCount < Queue->MaximumCount) &&
304 (QueueEntry != &Queue->EntryListHead))
305 {
306 /* Decrease the number of entries */
307 Queue->Header.SignalState--;
308
309 /* Increase numbef of running threads */
310 Queue->CurrentCount++;
311
312 /* Check if the entry is valid. If not, bugcheck */
313 if (!(QueueEntry->Flink) || !(QueueEntry->Blink))
314 {
315 KEBUGCHECK(INVALID_WORK_QUEUE_ITEM);
316 }
317
318 /* Remove the Entry */
319 RemoveEntryList(QueueEntry);
320 QueueEntry->Flink = NULL;
321
322 /* Nothing to wait on */
323 break;
324 }
325 else
326 {
327 /* Use the Thread's Wait Block, it's big enough */
328 Thread->WaitBlockList = &Thread->WaitBlock[0];
329
330 /* Check if a kernel APC is pending and we're below APC_LEVEL */
331 if ((Thread->ApcState.KernelApcPending) &&
332 !(Thread->SpecialApcDisable) && (Thread->WaitIrql < APC_LEVEL))
333 {
334 /* Increment the count and unlock the dispatcher */
335 Queue->CurrentCount++;
336 KeReleaseDispatcherDatabaseLock(Thread->WaitIrql);
337 }
338 else
339 {
340 /* Fail if there's a User APC Pending */
341 if ((WaitMode != KernelMode) &&
342 (Thread->ApcState.UserApcPending))
343 {
344 /* Return the status and increase the pending threads */
345 QueueEntry = (PLIST_ENTRY)STATUS_USER_APC;
346 Queue->CurrentCount++;
347 break;
348 }
349
350 /* Build the Wait Block */
351 WaitBlock = &Thread->WaitBlock[0];
352 WaitBlock->Object = (PVOID)Queue;
353 WaitBlock->WaitKey = STATUS_SUCCESS;
354 WaitBlock->WaitType = WaitAny;
355 WaitBlock->Thread = Thread;
356 Thread->WaitStatus = STATUS_WAIT_0;
357
358 /* Check if we can swap the thread's stack */
359 Thread->WaitListEntry.Flink = NULL;
360 KiCheckThreadStackSwap(WaitMode, Thread, Swappable);
361
362 /* We need to wait for the object... check for a timeout */
363 if (Timeout)
364 {
365 /* Check if it's zero */
366 if (!Timeout->QuadPart)
367 {
368 /* Don't wait. Return and increase pending threads */
369 QueueEntry = (PLIST_ENTRY)STATUS_TIMEOUT;
370 Queue->CurrentCount++;
371 break;
372 }
373
374 /*
375 * Set up the Timer. We'll use the internal function so
376 * that we can hold on to the dispatcher lock.
377 */
378 Timer = &Thread->Timer;
379 WaitBlock->NextWaitBlock = &Thread->WaitBlock[1];
380 WaitBlock = &Thread->WaitBlock[1];
381
382 /* Set up the Timer Wait Block */
383 WaitBlock->Object = (PVOID)Timer;
384 WaitBlock->Thread = Thread;
385 WaitBlock->WaitKey = STATUS_TIMEOUT;
386 WaitBlock->WaitType = WaitAny;
387
388 /* Link the timer to this Wait Block */
389 Timer->Header.WaitListHead.Flink =
390 &WaitBlock->WaitListEntry;
391 Timer->Header.WaitListHead.Blink =
392 &WaitBlock->WaitListEntry;
393 WaitBlock->WaitListEntry.Flink =
394 &Timer->Header.WaitListHead;
395 WaitBlock->WaitListEntry.Blink =
396 &Timer->Header.WaitListHead;
397
398 /* Create Timer */
399 KiInsertTimer(Timer, *Timeout);
400 }
401
402 /* Close the loop */
403 WaitBlock->NextWaitBlock = &Thread->WaitBlock[0];
404
405 /* Insert the wait block into the Queues's wait list */
406 WaitBlock = &Thread->WaitBlock[0];
407 InsertTailList(&Queue->Header.WaitListHead,
408 &WaitBlock->WaitListEntry);
409
410 /* Setup the wait information */
411 Thread->WaitMode = WaitMode;
412 Thread->WaitReason = WrQueue;
413 Thread->Alertable = FALSE;
414 Thread->WaitTime = ((PLARGE_INTEGER)&KeTickCount)->LowPart;
415 Thread->State = Waiting;
416
417 /* Find a new thread to run */
418 KiAddThreadToWaitList(Thread, Swappable);
419 Status = KiSwapThread();
420
421 /* Reset the wait reason */
422 Thread->WaitReason = 0;
423
424 /* Check if we were executing an APC */
425 if (Status != STATUS_KERNEL_APC)
426 {
427 /* Done Waiting */
428 return (PLIST_ENTRY)Status;
429 }
430
431 /* Check if we had a timeout */
432 if (Timeout)
433 {
434 DPRINT1("If you see this message, contact Alex ASAP\n");
435 KEBUGCHECK(0);
436 }
437 }
438
439 /* Reacquire the lock */
440 OldIrql = KeAcquireDispatcherDatabaseLock();
441
442 /* Save the new IRQL and decrease number of waiting threads */
443 Thread->WaitIrql = OldIrql;
444 Queue->CurrentCount--;
445 }
446 }
447
448 /* Unlock Database and return */
449 KeReleaseDispatcherDatabaseLock(Thread->WaitIrql);
450 return QueueEntry;
451 }
452
453 /*
454 * @implemented
455 */
456 PLIST_ENTRY
457 NTAPI
458 KeRundownQueue(IN PKQUEUE Queue)
459 {
460 PLIST_ENTRY EnumEntry;
461 PLIST_ENTRY FirstEntry = NULL;
462 PKTHREAD Thread;
463 KIRQL OldIrql;
464 ASSERT_QUEUE(Queue);
465 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL);
466 ASSERT(IsListEmpty(&Queue->Header.WaitListHead));
467
468 /* Get the Dispatcher Lock */
469 OldIrql = KeAcquireDispatcherDatabaseLock();
470
471 /* Make sure the list is not empty */
472 if (!IsListEmpty(&Queue->EntryListHead))
473 {
474 /* Remove it */
475 FirstEntry = RemoveHeadList(&Queue->EntryListHead);
476 }
477
478 /* Unlink threads and clear their Thread->Queue */
479 while (!IsListEmpty(&Queue->ThreadListHead))
480 {
481 /* Get the Entry and Remove it */
482 EnumEntry = RemoveHeadList(&Queue->ThreadListHead);
483
484 /* Get the Entry's Thread */
485 Thread = CONTAINING_RECORD(EnumEntry, KTHREAD, QueueListEntry);
486
487 /* Kill its Queue */
488 Thread->Queue = NULL;
489 }
490
491 /* Release the lock and return */
492 KeReleaseDispatcherDatabaseLock(OldIrql);
493 return FirstEntry;
494 }
495
496 /* EOF */