8883a35cd24c17e99f0a58ee95e86e45d1e0afb9
[reactos.git] / reactos / ntoskrnl / ke / queue.c
1 /*
2 * COPYRIGHT: See COPYING in the top level directory
3 * PROJECT: ReactOS kernel
4 * FILE: ntoskrnl/ke/queue.c
5 * PURPOSE: Implements kernel queues
6 *
7 * PROGRAMMERS: Alex Ionescu (alex@relsoft.net)
8 * Gunnar Dalsnes
9 * Eric Kohl (ekohl@rz-online.de)
10 */
11
12 /* INCLUDES *****************************************************************/
13
14 #include <ntoskrnl.h>
15 #define NDEBUG
16 #include <internal/debug.h>
17
18 /* FUNCTIONS *****************************************************************/
19
20 LONG STDCALL KiInsertQueue(IN PKQUEUE Queue, IN PLIST_ENTRY Entry, BOOLEAN Head);
21
22 /*
23 * @implemented
24 */
25 VOID
26 STDCALL
27 KeInitializeQueue(IN PKQUEUE Queue,
28 IN ULONG Count OPTIONAL)
29 {
30 DPRINT("KeInitializeQueue %x\n", Queue);
31
32 /* Initialize the Header */
33 KeInitializeDispatcherHeader(&Queue->Header,
34 QueueObject,
35 sizeof(KQUEUE)/sizeof(ULONG),
36 0);
37
38 /* Initialize the Lists */
39 InitializeListHead(&Queue->EntryListHead);
40 InitializeListHead(&Queue->ThreadListHead);
41
42 /* Set the Current and Maximum Count */
43 Queue->CurrentCount = 0;
44 Queue->MaximumCount = (Count == 0) ? (ULONG) KeNumberProcessors : Count;
45 }
46
47 /*
48 * @implemented
49 */
50 LONG
51 STDCALL
52 KeInsertHeadQueue(IN PKQUEUE Queue,
53 IN PLIST_ENTRY Entry)
54 {
55 LONG PreviousState;
56 KIRQL OldIrql;
57
58 DPRINT("KeInsertHeadQueue %x\n", Queue);
59
60 /* Lock the Dispatcher Database */
61 OldIrql = KeAcquireDispatcherDatabaseLock();
62
63 /* Insert the Queue */
64 PreviousState = KiInsertQueue(Queue, Entry, TRUE);
65
66 /* Release the Dispatcher Lock */
67 KeReleaseDispatcherDatabaseLock(OldIrql);
68
69 /* Return previous State */
70 return PreviousState;
71 }
72
73 /*
74 * @implemented
75 */
76 LONG STDCALL
77 KeInsertQueue(IN PKQUEUE Queue,
78 IN PLIST_ENTRY Entry)
79 {
80 LONG PreviousState;
81 KIRQL OldIrql;
82
83 DPRINT("KeInsertQueue %x\n", Queue);
84
85 /* Lock the Dispatcher Database */
86 OldIrql = KeAcquireDispatcherDatabaseLock();
87
88 /* Insert the Queue */
89 PreviousState = KiInsertQueue(Queue, Entry, FALSE);
90
91 /* Release the Dispatcher Lock */
92 KeReleaseDispatcherDatabaseLock(OldIrql);
93
94 /* Return previous State */
95 return PreviousState;
96 }
97
98 /*
99 * @implemented
100 *
101 * Returns number of entries in the queue
102 */
103 LONG
104 STDCALL
105 KeReadStateQueue(IN PKQUEUE Queue)
106 {
107 /* Returns the Signal State */
108 return(Queue->Header.SignalState);
109 }
110
111 /*
112 * @implemented
113 */
114 PLIST_ENTRY
115 STDCALL
116 KeRemoveQueue(IN PKQUEUE Queue,
117 IN KPROCESSOR_MODE WaitMode,
118 IN PLARGE_INTEGER Timeout OPTIONAL)
119 {
120 PLIST_ENTRY QueueEntry;
121 NTSTATUS Status;
122 PKTHREAD Thread = KeGetCurrentThread();
123 KIRQL OldIrql;
124 PKQUEUE PreviousQueue;
125 PKWAIT_BLOCK WaitBlock;
126 PKTIMER Timer;
127 DPRINT("KeRemoveQueue %x\n", Queue);
128
129 /* Check if the Lock is already held */
130 if (Thread->WaitNext)
131 {
132 DPRINT("Lock is already held\n");
133 Thread->WaitNext = FALSE;
134 }
135 else
136 {
137 /* Lock the Dispatcher Database */
138 DPRINT("Lock not held, acquiring\n");
139 OldIrql = KeAcquireDispatcherDatabaseLock();
140 Thread->WaitIrql = OldIrql;
141 }
142
143 /* This is needed so that we can set the new queue right here, before additional processing */
144 PreviousQueue = Thread->Queue;
145 Thread->Queue = Queue;
146
147 /* Check if this is a different queue */
148 if (Queue != PreviousQueue)
149 {
150 DPRINT("Different Queue\n");
151 QueueEntry = &Thread->QueueListEntry;
152 if (PreviousQueue)
153 {
154 /* Remove from this list */
155 DPRINT("Removing Old Queue\n");
156 RemoveEntryList(QueueEntry);
157
158 /* Wake the queue */
159 DPRINT("Activating new thread\n");
160 KiWakeQueue(PreviousQueue);
161 }
162
163 /* Insert in this new Queue */
164 DPRINT("Inserting new Queue!\n");
165 InsertTailList(&Queue->ThreadListHead, QueueEntry);
166 }
167 else
168 {
169 /* Same queue, decrement waiting threads */
170 DPRINT("Same Queue!\n");
171 Queue->CurrentCount--;
172 }
173
174 /* Loop until the queue is processed */
175 while (TRUE)
176 {
177 /* Check if the counts are valid and if there is still a queued entry */
178 QueueEntry = Queue->EntryListHead.Flink;
179 if ((Queue->CurrentCount < Queue->MaximumCount) &&
180 (QueueEntry != &Queue->EntryListHead))
181 {
182 /* Remove the Entry and Save it */
183 DPRINT("Removing Queue Entry. CurrentCount: %d, Maximum Count: %d\n",
184 Queue->CurrentCount, Queue->MaximumCount);
185
186 /* Decrease the number of entries */
187 Queue->Header.SignalState--;
188
189 /* Increase numbef of running threads */
190 Queue->CurrentCount++;
191
192 /* Check if the entry is valid. If not, bugcheck */
193 if (!(QueueEntry->Flink) || !(QueueEntry->Blink))
194 {
195 KEBUGCHECK(INVALID_WORK_QUEUE_ITEM);
196 }
197
198 /* Remove the Entry */
199 RemoveEntryList(QueueEntry);
200 QueueEntry->Flink = NULL;
201
202 /* Nothing to wait on */
203 break;
204 }
205 else
206 {
207 /* Do the wait */
208 DPRINT("Waiting on Queue Entry. CurrentCount: %d, Maximum Count: %d\n",
209 Queue->CurrentCount, Queue->MaximumCount);
210
211 /* Use the Thread's Wait Block, it's big enough */
212 Thread->WaitBlockList = &Thread->WaitBlock[0];
213
214 /* Check if a kernel APC is pending and we were below APC_LEVEL */
215 if ((Thread->ApcState.KernelApcPending) &&
216 (Thread->WaitIrql < APC_LEVEL))
217 {
218 /* Increment the count and unlock the dispatcher */
219 Queue->CurrentCount++;
220 KeReleaseDispatcherDatabaseLock(Thread->WaitIrql);
221 goto SkipWait;
222 }
223
224 /* Fail if there's a User APC Pending */
225 if ((WaitMode != KernelMode) && (Thread->ApcState.UserApcPending))
226 {
227 /* Return the status and increase the pending threads */
228 QueueEntry = (PLIST_ENTRY)STATUS_USER_APC;
229 Queue->CurrentCount++;
230
231 /* Nothing to wait on */
232 break;
233 }
234
235 /* Build the Wait Block */
236 WaitBlock = &Thread->WaitBlock[0];
237 WaitBlock->Object = (PVOID)Queue;
238 WaitBlock->WaitKey = STATUS_SUCCESS;
239 WaitBlock->WaitType = WaitAny;
240 WaitBlock->Thread = Thread;
241 Thread->WaitStatus = STATUS_WAIT_0;
242
243 /* We need to wait for the object... check if we have a timeout */
244 if (Timeout)
245 {
246 /* If it's zero, then don't do any waiting */
247 if (!Timeout->QuadPart)
248 {
249 /* Instant Timeout, return the status and increase the pending threads */
250 DPRINT("Queue Wait has timed out\n");
251 QueueEntry = (PLIST_ENTRY)STATUS_TIMEOUT;
252 Queue->CurrentCount++;
253
254 /* Nothing to wait on */
255 break;
256 }
257
258 /*
259 * Set up the Timer. We'll use the internal function so that we can
260 * hold on to the dispatcher lock.
261 */
262 Timer = &Thread->Timer;
263 WaitBlock->NextWaitBlock = &Thread->WaitBlock[1];
264 WaitBlock = &Thread->WaitBlock[1];
265
266 /* Set up the Timer Wait Block */
267 WaitBlock->Object = (PVOID)Timer;
268 WaitBlock->Thread = Thread;
269 WaitBlock->WaitKey = STATUS_TIMEOUT;
270 WaitBlock->WaitType = WaitAny;
271
272 /* Link the timer to this Wait Block */
273 InitializeListHead(&Timer->Header.WaitListHead);
274 InsertTailList(&Timer->Header.WaitListHead, &WaitBlock->WaitListEntry);
275
276 /* Create Timer */
277 DPRINT("Creating Timer with timeout %I64d\n", *Timeout);
278 KiInsertTimer(Timer, *Timeout);
279 }
280
281 /* Close the loop */
282 WaitBlock->NextWaitBlock = &Thread->WaitBlock[0];
283
284 /* Insert the wait block into the Queues's wait list */
285 WaitBlock = &Thread->WaitBlock[0];
286 InsertTailList(&Queue->Header.WaitListHead,
287 &WaitBlock->WaitListEntry);
288
289 /* Setup the wait information */
290 Thread->WaitMode = WaitMode;
291 Thread->WaitReason = WrQueue;
292 Thread->Alertable = FALSE;
293 Thread->WaitTime = 0;
294 Thread->State = Waiting;
295
296 /* Find a new thread to run */
297 DPRINT("Swapping threads\n");
298 Status = KiSwapThread();
299
300 /* Reset the wait reason */
301 Thread->WaitReason = 0;
302
303 /* Check if we were executing an APC */
304 if (Status != STATUS_KERNEL_APC)
305 {
306 /* Done Waiting */
307 DPRINT("Done waking queue. Thread: %x %x!\n", KeGetCurrentThread(), Thread);
308 return (PLIST_ENTRY)Status;
309 }
310
311 /* Check if we had a timeout */
312 if (Timeout)
313 {
314 /* FIXME: Fixup interval */
315 }
316
317 /* Acquire again the lock */
318 SkipWait:
319 DPRINT("Looping again\n");
320 OldIrql = KeAcquireDispatcherDatabaseLock();
321
322 /* Save the new IRQL and decrease number of waiting threads */
323 Thread->WaitIrql = OldIrql;
324 Queue->CurrentCount--;
325 }
326 }
327
328 /* Unlock Database and return */
329 KeReleaseDispatcherDatabaseLock(Thread->WaitIrql);
330 DPRINT("Returning. CurrentCount: %d, Maximum Count: %d\n",
331 Queue->CurrentCount, Queue->MaximumCount);
332 return QueueEntry;
333 }
334
335 /*
336 * @implemented
337 */
338 PLIST_ENTRY
339 STDCALL
340 KeRundownQueue(IN PKQUEUE Queue)
341 {
342 PLIST_ENTRY EnumEntry;
343 PLIST_ENTRY FirstEntry = NULL;
344 PKTHREAD Thread;
345 KIRQL OldIrql;
346
347 DPRINT("KeRundownQueue(Queue %x)\n", Queue);
348
349 /* Get the Dispatcher Lock */
350 OldIrql = KeAcquireDispatcherDatabaseLock();
351
352 /* Make sure the list is not empty */
353 if (!IsListEmpty(&Queue->EntryListHead))
354 {
355 /* Remove it */
356 FirstEntry = RemoveHeadList(&Queue->EntryListHead);
357 }
358
359 /* Unlink threads and clear their Thread->Queue */
360 while (!IsListEmpty(&Queue->ThreadListHead))
361 {
362 /* Get the Entry and Remove it */
363 EnumEntry = RemoveHeadList(&Queue->ThreadListHead);
364
365 /* Get the Entry's Thread */
366 Thread = CONTAINING_RECORD(EnumEntry, KTHREAD, QueueListEntry);
367
368 /* Kill its Queue */
369 Thread->Queue = NULL;
370 }
371
372 /* Release the lock and return */
373 KeReleaseDispatcherDatabaseLock(OldIrql);
374 return FirstEntry;
375 }
376
377 /*
378 * Called when a thread which has a queue entry is entering a wait state
379 */
380 VOID
381 FASTCALL
382 KiWakeQueue(IN PKQUEUE Queue)
383 {
384 PLIST_ENTRY QueueEntry;
385 PLIST_ENTRY WaitEntry;
386 PKWAIT_BLOCK WaitBlock;
387 PKTHREAD Thread;
388
389 /* Decrement the number of active threads */
390 DPRINT("KiWakeQueue: %x. Thread: %x\n", Queue, KeGetCurrentThread());
391 Queue->CurrentCount--;
392
393 /* Make sure the counts are OK */
394 if (Queue->CurrentCount < Queue->MaximumCount)
395 {
396 /* Get the Queue Entry */
397 QueueEntry = Queue->EntryListHead.Flink;
398
399 /* Get the Wait Entry */
400 WaitEntry = Queue->Header.WaitListHead.Blink;
401 DPRINT("Queue Count is ok; entries: %p, %p\n", QueueEntry, WaitEntry);
402
403 /* Make sure that the Queue entries are not part of empty lists */
404 if ((WaitEntry != &Queue->Header.WaitListHead) &&
405 (QueueEntry != &Queue->EntryListHead))
406 {
407 /* Remove this entry */
408 DPRINT("Queue in List, removing it\n");
409 RemoveEntryList(QueueEntry);
410 QueueEntry->Flink = NULL;
411
412 /* Decrease the Signal State */
413 Queue->Header.SignalState--;
414
415 /* Unwait the Thread */
416 WaitBlock = CONTAINING_RECORD(WaitEntry,
417 KWAIT_BLOCK,
418 WaitListEntry);
419 Thread = WaitBlock->Thread;
420 DPRINT1("Unwaiting Thread: %d\n", Thread->State);
421 KiAbortWaitThread(Thread, (NTSTATUS)QueueEntry, IO_NO_INCREMENT);
422 }
423 }
424 }
425
426 /*
427 * Returns the previous number of entries in the queue
428 */
429 LONG
430 STDCALL
431 KiInsertQueue(IN PKQUEUE Queue,
432 IN PLIST_ENTRY Entry,
433 BOOLEAN Head)
434 {
435 ULONG InitialState;
436 PKTHREAD Thread = KeGetCurrentThread();
437 PKWAIT_BLOCK WaitBlock;
438 PLIST_ENTRY WaitEntry;
439 DPRINT("KiInsertQueue(Queue %x, Entry %x)\n", Queue, Entry);
440
441 /* Save the old state */
442 InitialState = Queue->Header.SignalState;
443
444 /* Get the Entry */
445 WaitEntry = Queue->Header.WaitListHead.Blink;
446 DPRINT("Initial State, WaitEntry: %d, %x\n", InitialState, WaitEntry);
447
448 /*
449 * Why the KeGetCurrentThread()->Queue != Queue?
450 * KiInsertQueue might be called from an APC for the current thread.
451 * -Gunnar
452 */
453 if ((Queue->CurrentCount < Queue->MaximumCount) &&
454 (WaitEntry != &Queue->Header.WaitListHead) &&
455 ((Thread->Queue != Queue) || (Thread->WaitReason != WrQueue)))
456 {
457 /* Remove the wait entry */
458 DPRINT("Removing Entry\n");
459 RemoveEntryList(WaitEntry);
460
461 /* Get the Wait Block and Thread */
462 WaitBlock = CONTAINING_RECORD(WaitEntry, KWAIT_BLOCK, WaitListEntry);
463 DPRINT("Got wait block: %x\n", WaitBlock);
464 Thread = WaitBlock->Thread;
465
466 /* Reset the wait reason */
467 Thread->WaitReason = 0;
468
469 /* Increase the active threads and set the status*/
470 Queue->CurrentCount++;
471 Thread->WaitStatus = (NTSTATUS)Entry;
472
473 /* Remove the thread from its wait list */
474 RemoveEntryList(&Thread->WaitListEntry);
475
476 /* Check if there's a Thread Timer */
477 if (Thread->Timer.Header.Inserted)
478 {
479 /* Cancel the Thread Timer with the no-lock fastpath */
480 DPRINT("Removing the Thread's Timer\n");
481 Thread->Timer.Header.Inserted = FALSE;
482 RemoveEntryList(&Thread->Timer.TimerListEntry);
483 }
484
485 /* Reschedule the Thread */
486 DPRINT("Unblocking the Thread\n");
487 KiUnblockThread(Thread, (PNTSTATUS)&Entry, 0);
488 }
489 else
490 {
491 /* Increase the Entries */
492 DPRINT("Adding new Queue Entry: %d %d\n", Head, Queue->Header.SignalState);
493 Queue->Header.SignalState++;
494
495 /* Check which mode we're using */
496 if (Head)
497 {
498 /* Insert in the head */
499 InsertHeadList(&Queue->EntryListHead, Entry);
500 }
501 else
502 {
503 /* Insert at the end */
504 InsertTailList(&Queue->EntryListHead, Entry);
505 }
506 }
507
508 /* Return the previous state */
509 DPRINT("Returning\n");
510 return InitialState;
511 }
512
513 /* EOF */