- Fix utterly messed up unblocking/readying thread logic.
[reactos.git] / reactos / ntoskrnl / ke / queue.c
1 /*
2 * PROJECT: ReactOS Kernel
3 * LICENSE: GPL - See COPYING in the top level directory
4 * FILE: ntoskrnl/ke/queue.c
5 * PURPOSE: Implements kernel queues
6 * PROGRAMMERS: Alex Ionescu (alex.ionescu@reactos.org)
7 * Gunnar Dalsnes
8 * Eric Kohl (ekohl@rz-online.de)
9 */
10
11 /* INCLUDES ******************************************************************/
12
13 #include <ntoskrnl.h>
14 #define NDEBUG
15 #include <internal/debug.h>
16
17 /* PRIVATE FUNCTIONS *********************************************************/
18
19 /*
20 * Called when a thread which has a queue entry is entering a wait state
21 */
22 VOID
23 FASTCALL
24 KiWakeQueue(IN PKQUEUE Queue)
25 {
26 PLIST_ENTRY QueueEntry;
27 PLIST_ENTRY WaitEntry;
28 PKWAIT_BLOCK WaitBlock;
29 PKTHREAD Thread;
30 ASSERT_QUEUE(Queue);
31
32 /* Decrement the number of active threads */
33 Queue->CurrentCount--;
34
35 /* Make sure the counts are OK */
36 if (Queue->CurrentCount < Queue->MaximumCount)
37 {
38 /* Get the Queue Entry */
39 QueueEntry = Queue->EntryListHead.Flink;
40
41 /* Get the Wait Entry */
42 WaitEntry = Queue->Header.WaitListHead.Blink;
43
44 /* Make sure that the Queue entries are not part of empty lists */
45 if ((WaitEntry != &Queue->Header.WaitListHead) &&
46 (QueueEntry != &Queue->EntryListHead))
47 {
48 /* Remove this entry */
49 RemoveEntryList(QueueEntry);
50 QueueEntry->Flink = NULL;
51
52 /* Decrease the Signal State */
53 Queue->Header.SignalState--;
54
55 /* Unwait the Thread */
56 WaitBlock = CONTAINING_RECORD(WaitEntry,
57 KWAIT_BLOCK,
58 WaitListEntry);
59 Thread = WaitBlock->Thread;
60 KiAbortWaitThread(Thread, (NTSTATUS)QueueEntry, IO_NO_INCREMENT);
61 }
62 }
63 }
64
65 /*
66 * Returns the previous number of entries in the queue
67 */
68 LONG
69 NTAPI
70 KiInsertQueue(IN PKQUEUE Queue,
71 IN PLIST_ENTRY Entry,
72 BOOLEAN Head)
73 {
74 ULONG InitialState;
75 PKTHREAD Thread = KeGetCurrentThread();
76 PKWAIT_BLOCK WaitBlock;
77 PLIST_ENTRY WaitEntry;
78 ASSERT_QUEUE(Queue);
79
80 /* Save the old state */
81 InitialState = Queue->Header.SignalState;
82
83 /* Get the Entry */
84 WaitEntry = Queue->Header.WaitListHead.Blink;
85
86 /*
87 * Why the KeGetCurrentThread()->Queue != Queue?
88 * KiInsertQueue might be called from an APC for the current thread.
89 * -Gunnar
90 */
91 if ((Queue->CurrentCount < Queue->MaximumCount) &&
92 (WaitEntry != &Queue->Header.WaitListHead) &&
93 ((Thread->Queue != Queue) ||
94 (Thread->WaitReason != WrQueue)))
95 {
96 /* Remove the wait entry */
97 RemoveEntryList(WaitEntry);
98
99 /* Get the Wait Block and Thread */
100 WaitBlock = CONTAINING_RECORD(WaitEntry, KWAIT_BLOCK, WaitListEntry);
101 Thread = WaitBlock->Thread;
102
103 /* Remove the queue from the thread's wait list */
104 Thread->WaitStatus = (NTSTATUS)Entry;
105 RemoveEntryList(&Thread->WaitListEntry);
106 Thread->WaitReason = 0;
107
108 /* Increase the active threads and set the status*/
109 Queue->CurrentCount++;
110
111 /* Check if there's a Thread Timer */
112 if (Thread->Timer.Header.Inserted)
113 {
114 /* Cancel the Thread Timer with the no-lock fastpath */
115 Thread->Timer.Header.Inserted = FALSE;
116 RemoveEntryList(&Thread->Timer.TimerListEntry);
117 }
118
119 /* Reschedule the Thread */
120 KiReadyThread(Thread);
121 }
122 else
123 {
124 /* Increase the Entries */
125 Queue->Header.SignalState++;
126
127 /* Check which mode we're using */
128 if (Head)
129 {
130 /* Insert in the head */
131 InsertHeadList(&Queue->EntryListHead, Entry);
132 }
133 else
134 {
135 /* Insert at the end */
136 InsertTailList(&Queue->EntryListHead, Entry);
137 }
138 }
139
140 /* Return the previous state */
141 return InitialState;
142 }
143
144 /* PUBLIC FUNCTIONS **********************************************************/
145
146 /*
147 * @implemented
148 */
149 VOID
150 NTAPI
151 KeInitializeQueue(IN PKQUEUE Queue,
152 IN ULONG Count OPTIONAL)
153 {
154 /* Initialize the Header */
155 KeInitializeDispatcherHeader(&Queue->Header,
156 QueueObject,
157 sizeof(KQUEUE) / sizeof(ULONG),
158 0);
159
160 /* Initialize the Lists */
161 InitializeListHead(&Queue->EntryListHead);
162 InitializeListHead(&Queue->ThreadListHead);
163
164 /* Set the Current and Maximum Count */
165 Queue->CurrentCount = 0;
166 Queue->MaximumCount = (Count == 0) ? (ULONG) KeNumberProcessors : Count;
167 }
168
169 /*
170 * @implemented
171 */
172 LONG
173 NTAPI
174 KeInsertHeadQueue(IN PKQUEUE Queue,
175 IN PLIST_ENTRY Entry)
176 {
177 LONG PreviousState;
178 KIRQL OldIrql;
179 ASSERT_QUEUE(Queue);
180 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL);
181
182 /* Lock the Dispatcher Database */
183 OldIrql = KeAcquireDispatcherDatabaseLock();
184
185 /* Insert the Queue */
186 PreviousState = KiInsertQueue(Queue, Entry, TRUE);
187
188 /* Release the Dispatcher Lock */
189 KeReleaseDispatcherDatabaseLock(OldIrql);
190
191 /* Return previous State */
192 return PreviousState;
193 }
194
195 /*
196 * @implemented
197 */
198 LONG
199 NTAPI
200 KeInsertQueue(IN PKQUEUE Queue,
201 IN PLIST_ENTRY Entry)
202 {
203 LONG PreviousState;
204 KIRQL OldIrql;
205 ASSERT_QUEUE(Queue);
206 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL);
207
208 /* Lock the Dispatcher Database */
209 OldIrql = KeAcquireDispatcherDatabaseLock();
210
211 /* Insert the Queue */
212 PreviousState = KiInsertQueue(Queue, Entry, FALSE);
213
214 /* Release the Dispatcher Lock */
215 KeReleaseDispatcherDatabaseLock(OldIrql);
216
217 /* Return previous State */
218 return PreviousState;
219 }
220
221 /*
222 * @implemented
223 *
224 * Returns number of entries in the queue
225 */
226 LONG
227 NTAPI
228 KeReadStateQueue(IN PKQUEUE Queue)
229 {
230 /* Returns the Signal State */
231 ASSERT_QUEUE(Queue);
232 return Queue->Header.SignalState;
233 }
234
235 /*
236 * @implemented
 *
 * Associates the current thread with Queue and removes the entry at the
 * head of Queue->EntryListHead, waiting for one if necessary.
 *
 * An entry is only dequeued while Queue->CurrentCount is below
 * Queue->MaximumCount; otherwise the thread links its wait block into the
 * queue's wait list and calls KiSwapThread().
 *
 * Queue    - Queue to remove an entry from.
 * WaitMode - Stored in Thread->WaitMode. When it is not KernelMode and a
 *            user APC is pending, STATUS_USER_APC is returned instead of
 *            waiting.
 * Timeout  - Optional. A zero value makes the call return STATUS_TIMEOUT
 *            instead of waiting; a non-zero value is passed to
 *            KiInsertTimer() for the thread's timer.
 *
 * Returns the dequeued entry, or a status value cast to PLIST_ENTRY:
 * STATUS_TIMEOUT, STATUS_USER_APC, or whatever KiSwapThread() returned when
 * that was not STATUS_KERNEL_APC (presumably the entry pointer that
 * KiInsertQueue/KiWakeQueue pass as the wait status - confirm against
 * KiSwapThread).
237 */
238 PLIST_ENTRY
239 NTAPI
240 KeRemoveQueue(IN PKQUEUE Queue,
241 IN KPROCESSOR_MODE WaitMode,
242 IN PLARGE_INTEGER Timeout OPTIONAL)
243 {
244 PLIST_ENTRY QueueEntry;
245 NTSTATUS Status;
246 PKTHREAD Thread = KeGetCurrentThread();
247 KIRQL OldIrql;
248 PKQUEUE PreviousQueue;
249 PKWAIT_BLOCK WaitBlock;
250 PKTIMER Timer;
251 ASSERT_QUEUE(Queue);
252 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL);
253
254 /* Check if the Lock is already held (signalled through Thread->WaitNext) */
255 if (Thread->WaitNext)
256 {
257 /* It is; clear the flag so the next call does not expect this */
258 Thread->WaitNext = FALSE;
259 }
260 else
261 {
262 /* Lock the Dispatcher Database and remember the IRQL to restore */
263 OldIrql = KeAcquireDispatcherDatabaseLock();
264 Thread->WaitIrql = OldIrql;
265 }
266
267 /*
268 * This is needed so that we can set the new queue right here,
269 * before additional processing
270 */
271 PreviousQueue = Thread->Queue;
272 Thread->Queue = Queue;
273
274 /* Check if this is a different queue */
275 if (Queue != PreviousQueue)
276 {
277 /* Get the thread's link in the per-queue thread list */
278 QueueEntry = &Thread->QueueListEntry;
279 if (PreviousQueue)
280 {
281 /* Remove the thread from the previous queue's thread list */
282 RemoveEntryList(QueueEntry);
283
284 /* Wake the previous queue: it lost an active thread */
285 KiWakeQueue(PreviousQueue);
286 }
287
288 /* Insert in this new Queue */
289 InsertTailList(&Queue->ThreadListHead, QueueEntry);
290 }
291 else
292 {
293 /* Same queue: stop counting this thread as active while it dequeues */
294 Queue->CurrentCount--;
295 }
296
297 /* Loop until an entry is dequeued or a status is returned */
298 while (TRUE)
299 {
300 /* Check if the counts are valid and if there is still a queued entry */
301 QueueEntry = Queue->EntryListHead.Flink;
302 if ((Queue->CurrentCount < Queue->MaximumCount) &&
303 (QueueEntry != &Queue->EntryListHead))
304 {
305 /* Decrease the number of entries */
306 Queue->Header.SignalState--;
307
308 /* Increase number of running threads */
309 Queue->CurrentCount++;
310
311 /* Check if the entry is valid (both links set). If not, bugcheck */
312 if (!(QueueEntry->Flink) || !(QueueEntry->Blink))
313 {
314 KEBUGCHECK(INVALID_WORK_QUEUE_ITEM);
315 }
316
317 /* Remove the Entry and mark it as no longer queued */
318 RemoveEntryList(QueueEntry);
319 QueueEntry->Flink = NULL;
320
321 /* Nothing to wait on */
322 break;
323 }
324 else
325 {
326 /* Use the Thread's Wait Block, it's big enough */
327 Thread->WaitBlockList = &Thread->WaitBlock[0];
328
329 /* Check if a kernel APC is pending and we're below APC_LEVEL */
330 if ((Thread->ApcState.KernelApcPending) &&
331 !(Thread->SpecialApcDisable) && (Thread->WaitIrql < APC_LEVEL))
332 {
333 /* Increment the count and unlock the dispatcher; the lock is retaken below */
334 Queue->CurrentCount++;
335 KeReleaseDispatcherDatabaseLock(Thread->WaitIrql);
336 }
337 else
338 {
339 /* Fail if there's a User APC Pending */
340 if ((WaitMode != KernelMode) &&
341 (Thread->ApcState.UserApcPending))
342 {
343 /* Return the status and count this thread as active again */
344 QueueEntry = (PLIST_ENTRY)STATUS_USER_APC;
345 Queue->CurrentCount++;
346 break;
347 }
348
349 /* Build the Wait Block for the queue itself */
350 WaitBlock = &Thread->WaitBlock[0];
351 WaitBlock->Object = (PVOID)Queue;
352 WaitBlock->WaitKey = STATUS_SUCCESS;
353 WaitBlock->WaitType = WaitAny;
354 WaitBlock->Thread = Thread;
355 Thread->WaitStatus = STATUS_WAIT_0;
356
357 /* We need to wait for the object... check for a timeout */
358 if (Timeout)
359 {
360 /* Check if it's zero */
361 if (!Timeout->QuadPart)
362 {
363 /* Don't wait. Return and count this thread as active again */
364 QueueEntry = (PLIST_ENTRY)STATUS_TIMEOUT;
365 Queue->CurrentCount++;
366 break;
367 }
368
369 /*
370 * Set up the Timer. We'll use the internal function so
371 * that we can hold on to the dispatcher lock.
372 */
373 Timer = &Thread->Timer;
374 WaitBlock->NextWaitBlock = &Thread->WaitBlock[1];
375 WaitBlock = &Thread->WaitBlock[1];
376
377 /* Set up the Timer Wait Block */
378 WaitBlock->Object = (PVOID)Timer;
379 WaitBlock->Thread = Thread;
380 WaitBlock->WaitKey = STATUS_TIMEOUT;
381 WaitBlock->WaitType = WaitAny;
382
383 /* Link the timer to this Wait Block (it becomes the timer's only waiter) */
384 Timer->Header.WaitListHead.Flink =
385 &WaitBlock->WaitListEntry;
386 Timer->Header.WaitListHead.Blink =
387 &WaitBlock->WaitListEntry;
388
389 /* Create Timer */
390 KiInsertTimer(Timer, *Timeout);
391 }
392
393 /* Close the loop: the last wait block in use points back at block 0 */
394 WaitBlock->NextWaitBlock = &Thread->WaitBlock[0];
395
396 /* Insert the wait block into the Queue's wait list */
397 WaitBlock = &Thread->WaitBlock[0];
398 InsertTailList(&Queue->Header.WaitListHead,
399 &WaitBlock->WaitListEntry);
400
401 /* Setup the wait information */
402 Thread->WaitMode = WaitMode;
403 Thread->WaitReason = WrQueue;
404 Thread->Alertable = FALSE;
405 Thread->WaitTime = ((PLARGE_INTEGER)&KeTickCount)->LowPart;
406 Thread->State = Waiting;
407
408 /* Find a new thread to run */
409 Status = KiSwapThread();
410
411 /* Reset the wait reason */
412 Thread->WaitReason = 0;
413
414 /* Check if we were executing an APC */
415 if (Status != STATUS_KERNEL_APC)
416 {
417 /* Done Waiting. NOTE(review): no explicit unlock here; presumably KiSwapThread() released the dispatcher lock - confirm */
418 return (PLIST_ENTRY)Status;
419 }
420
421 /* Check if we had a timeout */
422 if (Timeout)
423 {
424 /* FIXME: Fixup interval (the original Timeout is reused as-is on the next pass) */
425 DPRINT1("FIXME!!!\n");
426 }
427 }
428
429 /* Reacquire the lock */
430 OldIrql = KeAcquireDispatcherDatabaseLock();
431
432 /* Save the new IRQL and stop counting this thread as active before retrying */
433 Thread->WaitIrql = OldIrql;
434 Queue->CurrentCount--;
435 }
436 }
437
438 /* Unlock Database and return */
439 KeReleaseDispatcherDatabaseLock(Thread->WaitIrql);
440 return QueueEntry;
441 }
442
443 /*
444 * @implemented
445 */
446 PLIST_ENTRY
447 NTAPI
448 KeRundownQueue(IN PKQUEUE Queue)
449 {
450 PLIST_ENTRY EnumEntry;
451 PLIST_ENTRY FirstEntry = NULL;
452 PKTHREAD Thread;
453 KIRQL OldIrql;
454 ASSERT_QUEUE(Queue);
455 ASSERT_IRQL_LESS_OR_EQUAL(DISPATCH_LEVEL);
456 ASSERT(IsListEmpty(&Queue->Header.WaitListHead));
457
458 /* Get the Dispatcher Lock */
459 OldIrql = KeAcquireDispatcherDatabaseLock();
460
461 /* Make sure the list is not empty */
462 if (!IsListEmpty(&Queue->EntryListHead))
463 {
464 /* Remove it */
465 FirstEntry = RemoveHeadList(&Queue->EntryListHead);
466 }
467
468 /* Unlink threads and clear their Thread->Queue */
469 while (!IsListEmpty(&Queue->ThreadListHead))
470 {
471 /* Get the Entry and Remove it */
472 EnumEntry = RemoveHeadList(&Queue->ThreadListHead);
473
474 /* Get the Entry's Thread */
475 Thread = CONTAINING_RECORD(EnumEntry, KTHREAD, QueueListEntry);
476
477 /* Kill its Queue */
478 Thread->Queue = NULL;
479 }
480
481 /* Release the lock and return */
482 KeReleaseDispatcherDatabaseLock(OldIrql);
483 return FirstEntry;
484 }
485
486 /* EOF */