- Implement KiRecalculateDueTime to handle cases where a timeout wait has been interu...
[reactos.git] / reactos / ntoskrnl / ke / queue.c
1 /*
2 * PROJECT: ReactOS Kernel
3 * LICENSE: GPL - See COPYING in the top level directory
4 * FILE: ntoskrnl/ke/queue.c
5 * PURPOSE: Implements kernel queues
6 * PROGRAMMERS: Alex Ionescu (alex.ionescu@reactos.org)
7 * Gunnar Dalsnes
8 * Eric Kohl (ekohl@rz-online.de)
9 */
10
11 /* INCLUDES ******************************************************************/
12
13 #include <ntoskrnl.h>
14 #define NDEBUG
15 #include <internal/debug.h>
16
17 /* PRIVATE FUNCTIONS *********************************************************/
18
19 /*
20 * Called when a thread which has a queue entry is entering a wait state
21 */
22 VOID
23 FASTCALL
24 KiWakeQueue(IN PKQUEUE Queue)
25 {
26 PLIST_ENTRY QueueEntry;
27 PLIST_ENTRY WaitEntry;
28 PKWAIT_BLOCK WaitBlock;
29 PKTHREAD Thread;
30 ASSERT_QUEUE(Queue);
31
32 /* Decrement the number of active threads */
33 Queue->CurrentCount--;
34
35 /* Make sure the counts are OK */
36 if (Queue->CurrentCount < Queue->MaximumCount)
37 {
38 /* Get the Queue Entry */
39 QueueEntry = Queue->EntryListHead.Flink;
40
41 /* Get the Wait Entry */
42 WaitEntry = Queue->Header.WaitListHead.Blink;
43
44 /* Make sure that the Queue entries are not part of empty lists */
45 if ((WaitEntry != &Queue->Header.WaitListHead) &&
46 (QueueEntry != &Queue->EntryListHead))
47 {
48 /* Remove this entry */
49 RemoveEntryList(QueueEntry);
50 QueueEntry->Flink = NULL;
51
52 /* Decrease the Signal State */
53 Queue->Header.SignalState--;
54
55 /* Unwait the Thread */
56 WaitBlock = CONTAINING_RECORD(WaitEntry,
57 KWAIT_BLOCK,
58 WaitListEntry);
59 Thread = WaitBlock->Thread;
60 KiAbortWaitThread(Thread, (NTSTATUS)QueueEntry, IO_NO_INCREMENT);
61 }
62 }
63 }
64
65 /*
66 * Returns the previous number of entries in the queue
67 */
68 LONG
69 NTAPI
70 KiInsertQueue(IN PKQUEUE Queue,
71 IN PLIST_ENTRY Entry,
72 BOOLEAN Head)
73 {
74 ULONG InitialState;
75 PKTHREAD Thread = KeGetCurrentThread();
76 PKWAIT_BLOCK WaitBlock;
77 PLIST_ENTRY WaitEntry;
78 ASSERT_QUEUE(Queue);
79
80 /* Save the old state */
81 InitialState = Queue->Header.SignalState;
82
83 /* Get the Entry */
84 WaitEntry = Queue->Header.WaitListHead.Blink;
85
86 /*
87 * Why the KeGetCurrentThread()->Queue != Queue?
88 * KiInsertQueue might be called from an APC for the current thread.
89 * -Gunnar
90 */
91 if ((Queue->CurrentCount < Queue->MaximumCount) &&
92 (WaitEntry != &Queue->Header.WaitListHead) &&
93 ((Thread->Queue != Queue) ||
94 (Thread->WaitReason != WrQueue)))
95 {
96 /* Remove the wait entry */
97 RemoveEntryList(WaitEntry);
98
99 /* Get the Wait Block and Thread */
100 WaitBlock = CONTAINING_RECORD(WaitEntry, KWAIT_BLOCK, WaitListEntry);
101 Thread = WaitBlock->Thread;
102
103 /* Remove the queue from the thread's wait list */
104 Thread->WaitStatus = (NTSTATUS)Entry;
105 if (Thread->WaitListEntry.Flink) RemoveEntryList(&Thread->WaitListEntry);
106 Thread->WaitReason = 0;
107
108 /* Increase the active threads and set the status*/
109 Queue->CurrentCount++;
110
111 /* Check if there's a Thread Timer */
112 if (Thread->Timer.Header.Inserted)
113 {
114 /* Cancel the Thread Timer with the no-lock fastpath */
115 Thread->Timer.Header.Inserted = FALSE;
116 RemoveEntryList(&Thread->Timer.TimerListEntry);
117 }
118
119 /* Reschedule the Thread */
120 KiReadyThread(Thread);
121 }
122 else
123 {
124 /* Increase the Entries */
125 Queue->Header.SignalState++;
126
127 /* Check which mode we're using */
128 if (Head)
129 {
130 /* Insert in the head */
131 InsertHeadList(&Queue->EntryListHead, Entry);
132 }
133 else
134 {
135 /* Insert at the end */
136 InsertTailList(&Queue->EntryListHead, Entry);
137 }
138 }
139
140 /* Return the previous state */
141 return InitialState;
142 }
143
144 /* PUBLIC FUNCTIONS **********************************************************/
145
146 /*
147 * @implemented
148 */
149 VOID
150 NTAPI
151 KeInitializeQueue(IN PKQUEUE Queue,
152 IN ULONG Count OPTIONAL)
153 {
154 /* Initialize the Header */
155 KeInitializeDispatcherHeader(&Queue->Header,
156 QueueObject,
157 sizeof(KQUEUE) / sizeof(ULONG),
158 0);
159
160 /* Initialize the Lists */
161 InitializeListHead(&Queue->EntryListHead);
162 InitializeListHead(&Queue->ThreadListHead);
163
164 /* Set the Current and Maximum Count */
165 Queue->CurrentCount = 0;
166 Queue->MaximumCount = (Count == 0) ? (ULONG) KeNumberProcessors : Count;
167 }
168
169 /*
170 * @implemented
171 */
172 LONG
173 NTAPI
174 KeInsertHeadQueue(IN PKQUEUE Queue,
175 IN PLIST_ENTRY Entry)
176 {
177 LONG PreviousState;
178 KIRQL OldIrql;
179 ASSERT_QUEUE(Queue);
180 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL);
181
182 /* Lock the Dispatcher Database */
183 OldIrql = KeAcquireDispatcherDatabaseLock();
184
185 /* Insert the Queue */
186 PreviousState = KiInsertQueue(Queue, Entry, TRUE);
187
188 /* Release the Dispatcher Lock */
189 KeReleaseDispatcherDatabaseLock(OldIrql);
190
191 /* Return previous State */
192 return PreviousState;
193 }
194
195 /*
196 * @implemented
197 */
198 LONG
199 NTAPI
200 KeInsertQueue(IN PKQUEUE Queue,
201 IN PLIST_ENTRY Entry)
202 {
203 LONG PreviousState;
204 KIRQL OldIrql;
205 ASSERT_QUEUE(Queue);
206 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL);
207
208 /* Lock the Dispatcher Database */
209 OldIrql = KeAcquireDispatcherDatabaseLock();
210
211 /* Insert the Queue */
212 PreviousState = KiInsertQueue(Queue, Entry, FALSE);
213
214 /* Release the Dispatcher Lock */
215 KeReleaseDispatcherDatabaseLock(OldIrql);
216
217 /* Return previous State */
218 return PreviousState;
219 }
220
221 /*
222 * @implemented
223 *
224 * Returns number of entries in the queue
225 */
226 LONG
227 NTAPI
228 KeReadStateQueue(IN PKQUEUE Queue)
229 {
230 /* Returns the Signal State */
231 ASSERT_QUEUE(Queue);
232 return Queue->Header.SignalState;
233 }
234
235 /*
236 * @implemented
237 */
238 PLIST_ENTRY
239 NTAPI
240 KeRemoveQueue(IN PKQUEUE Queue,
241 IN KPROCESSOR_MODE WaitMode,
242 IN PLARGE_INTEGER Timeout OPTIONAL)
243 {
244 PLIST_ENTRY QueueEntry;
245 NTSTATUS Status;
246 PKTHREAD Thread = KeGetCurrentThread();
247 KIRQL OldIrql;
248 PKQUEUE PreviousQueue;
249 PKWAIT_BLOCK WaitBlock;
250 PKTIMER Timer;
251 BOOLEAN Swappable;
252 PLARGE_INTEGER OriginalDueTime = Timeout;
253 LARGE_INTEGER DueTime, NewDueTime;
254 ASSERT_QUEUE(Queue);
255 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL);
256
257 /* Check if the Lock is already held */
258 if (Thread->WaitNext)
259 {
260 /* It is, so next time don't do expect this */
261 Thread->WaitNext = FALSE;
262 }
263 else
264 {
265 /* Lock the Dispatcher Database */
266 OldIrql = KeAcquireDispatcherDatabaseLock();
267 Thread->WaitIrql = OldIrql;
268 }
269
270 /*
271 * This is needed so that we can set the new queue right here,
272 * before additional processing
273 */
274 PreviousQueue = Thread->Queue;
275 Thread->Queue = Queue;
276
277 /* Check if this is a different queue */
278 if (Queue != PreviousQueue)
279 {
280 /* Get the current entry */
281 QueueEntry = &Thread->QueueListEntry;
282 if (PreviousQueue)
283 {
284 /* Remove from this list */
285 RemoveEntryList(QueueEntry);
286
287 /* Wake the queue */
288 KiWakeQueue(PreviousQueue);
289 }
290
291 /* Insert in this new Queue */
292 InsertTailList(&Queue->ThreadListHead, QueueEntry);
293 }
294 else
295 {
296 /* Same queue, decrement waiting threads */
297 Queue->CurrentCount--;
298 }
299
300 /* Loop until the queue is processed */
301 while (TRUE)
302 {
303 /* Check if the counts are valid and if there is still a queued entry */
304 QueueEntry = Queue->EntryListHead.Flink;
305 if ((Queue->CurrentCount < Queue->MaximumCount) &&
306 (QueueEntry != &Queue->EntryListHead))
307 {
308 /* Decrease the number of entries */
309 Queue->Header.SignalState--;
310
311 /* Increase numbef of running threads */
312 Queue->CurrentCount++;
313
314 /* Check if the entry is valid. If not, bugcheck */
315 if (!(QueueEntry->Flink) || !(QueueEntry->Blink))
316 {
317 KEBUGCHECK(INVALID_WORK_QUEUE_ITEM);
318 }
319
320 /* Remove the Entry */
321 RemoveEntryList(QueueEntry);
322 QueueEntry->Flink = NULL;
323
324 /* Nothing to wait on */
325 break;
326 }
327 else
328 {
329 /* Use the Thread's Wait Block, it's big enough */
330 Thread->WaitBlockList = &Thread->WaitBlock[0];
331
332 /* Check if a kernel APC is pending and we're below APC_LEVEL */
333 if ((Thread->ApcState.KernelApcPending) &&
334 !(Thread->SpecialApcDisable) && (Thread->WaitIrql < APC_LEVEL))
335 {
336 /* Increment the count and unlock the dispatcher */
337 Queue->CurrentCount++;
338 KeReleaseDispatcherDatabaseLock(Thread->WaitIrql);
339 }
340 else
341 {
342 /* Fail if there's a User APC Pending */
343 if ((WaitMode != KernelMode) &&
344 (Thread->ApcState.UserApcPending))
345 {
346 /* Return the status and increase the pending threads */
347 QueueEntry = (PLIST_ENTRY)STATUS_USER_APC;
348 Queue->CurrentCount++;
349 break;
350 }
351
352 /* Build the Wait Block */
353 WaitBlock = &Thread->WaitBlock[0];
354 WaitBlock->Object = (PVOID)Queue;
355 WaitBlock->WaitKey = STATUS_SUCCESS;
356 WaitBlock->WaitType = WaitAny;
357 WaitBlock->Thread = Thread;
358 Thread->WaitStatus = STATUS_WAIT_0;
359
360 /* Check if we can swap the thread's stack */
361 Thread->WaitListEntry.Flink = NULL;
362 KiCheckThreadStackSwap(WaitMode, Thread, Swappable);
363
364 /* We need to wait for the object... check for a timeout */
365 if (Timeout)
366 {
367 /* Check if it's zero */
368 if (!Timeout->QuadPart)
369 {
370 /* Don't wait. Return and increase pending threads */
371 QueueEntry = (PLIST_ENTRY)STATUS_TIMEOUT;
372 Queue->CurrentCount++;
373 break;
374 }
375
376 /*
377 * Set up the Timer. We'll use the internal function so
378 * that we can hold on to the dispatcher lock.
379 */
380 Timer = &Thread->Timer;
381 WaitBlock->NextWaitBlock = &Thread->WaitBlock[1];
382 WaitBlock = &Thread->WaitBlock[1];
383
384 /* Set up the Timer Wait Block */
385 WaitBlock->Object = (PVOID)Timer;
386 WaitBlock->Thread = Thread;
387 WaitBlock->WaitKey = STATUS_TIMEOUT;
388 WaitBlock->WaitType = WaitAny;
389
390 /* Link the timer to this Wait Block */
391 Timer->Header.WaitListHead.Flink =
392 &WaitBlock->WaitListEntry;
393 Timer->Header.WaitListHead.Blink =
394 &WaitBlock->WaitListEntry;
395 WaitBlock->WaitListEntry.Flink =
396 &Timer->Header.WaitListHead;
397 WaitBlock->WaitListEntry.Blink =
398 &Timer->Header.WaitListHead;
399
400 /* Create Timer */
401 if (!KiInsertTimer(Timer, *Timeout))
402 {
403 /* FIXME */
404 DPRINT1("If you see thie message contact Alex ASAP\n");
405 KEBUGCHECK(0);
406 }
407
408 /* Set timer due time */
409 DueTime.QuadPart = Timer->DueTime.QuadPart;
410 }
411
412 /* Close the loop */
413 WaitBlock->NextWaitBlock = &Thread->WaitBlock[0];
414
415 /* Insert the wait block into the Queues's wait list */
416 WaitBlock = &Thread->WaitBlock[0];
417 InsertTailList(&Queue->Header.WaitListHead,
418 &WaitBlock->WaitListEntry);
419
420 /* Setup the wait information */
421 Thread->WaitMode = WaitMode;
422 Thread->WaitReason = WrQueue;
423 Thread->Alertable = FALSE;
424 Thread->WaitTime = ((PLARGE_INTEGER)&KeTickCount)->LowPart;
425 Thread->State = Waiting;
426
427 /* Find a new thread to run */
428 KiAddThreadToWaitList(Thread, Swappable);
429 Status = KiSwapThread();
430
431 /* Reset the wait reason */
432 Thread->WaitReason = 0;
433
434 /* Check if we were executing an APC */
435 if (Status != STATUS_KERNEL_APC)
436 {
437 /* Done Waiting */
438 return (PLIST_ENTRY)Status;
439 }
440
441 /* Check if we had a timeout */
442 if (Timeout)
443 {
444 /* Recalculate due times */
445 Timeout = KiRecalculateDueTime(OriginalDueTime,
446 &DueTime,
447 &NewDueTime);
448 }
449 }
450
451 /* Reacquire the lock */
452 OldIrql = KeAcquireDispatcherDatabaseLock();
453
454 /* Save the new IRQL and decrease number of waiting threads */
455 Thread->WaitIrql = OldIrql;
456 Queue->CurrentCount--;
457 }
458 }
459
460 /* Unlock Database and return */
461 KeReleaseDispatcherDatabaseLock(Thread->WaitIrql);
462 return QueueEntry;
463 }
464
465 /*
466 * @implemented
467 */
468 PLIST_ENTRY
469 NTAPI
470 KeRundownQueue(IN PKQUEUE Queue)
471 {
472 PLIST_ENTRY EnumEntry;
473 PLIST_ENTRY FirstEntry = NULL;
474 PKTHREAD Thread;
475 KIRQL OldIrql;
476 ASSERT_QUEUE(Queue);
477 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL);
478 ASSERT(IsListEmpty(&Queue->Header.WaitListHead));
479
480 /* Get the Dispatcher Lock */
481 OldIrql = KeAcquireDispatcherDatabaseLock();
482
483 /* Make sure the list is not empty */
484 if (!IsListEmpty(&Queue->EntryListHead))
485 {
486 /* Remove it */
487 FirstEntry = RemoveHeadList(&Queue->EntryListHead);
488 }
489
490 /* Unlink threads and clear their Thread->Queue */
491 while (!IsListEmpty(&Queue->ThreadListHead))
492 {
493 /* Get the Entry and Remove it */
494 EnumEntry = RemoveHeadList(&Queue->ThreadListHead);
495
496 /* Get the Entry's Thread */
497 Thread = CONTAINING_RECORD(EnumEntry, KTHREAD, QueueListEntry);
498
499 /* Kill its Queue */
500 Thread->Queue = NULL;
501 }
502
503 /* Release the lock and return */
504 KeReleaseDispatcherDatabaseLock(OldIrql);
505 return FirstEntry;
506 }
507
508 /* EOF */