53d451a50c68071f321b30336c4e8be87943275d
[reactos.git] / reactos / ntoskrnl / ke / queue.c
1 /*
2 * COPYRIGHT: See COPYING in the top level directory
3 * PROJECT: ReactOS kernel
4 * FILE: ntoskrnl/ke/queue.c
5 * PURPOSE: Implements kernel queues
6 *
7 * PROGRAMMERS: Alex Ionescu (alex@relsoft.net)
8 * Gunnar Dalsnes
9 * Eric Kohl (ekohl@rz-online.de)
10 */
11
12 /* INCLUDES *****************************************************************/
13
14 #include <ntoskrnl.h>
15 #define NDEBUG
16 #include <internal/debug.h>
17
18 /* FUNCTIONS *****************************************************************/
19
20 LONG STDCALL KiInsertQueue(IN PKQUEUE Queue, IN PLIST_ENTRY Entry, BOOLEAN Head);
21
22 /*
23 * @implemented
24 */
25 VOID
26 STDCALL
27 KeInitializeQueue(IN PKQUEUE Queue,
28 IN ULONG Count OPTIONAL)
29 {
30 DPRINT("KeInitializeQueue %x\n", Queue);
31
32 /* Initialize the Header */
33 KeInitializeDispatcherHeader(&Queue->Header,
34 QueueObject,
35 sizeof(KQUEUE)/sizeof(ULONG),
36 0);
37
38 /* Initialize the Lists */
39 InitializeListHead(&Queue->EntryListHead);
40 InitializeListHead(&Queue->ThreadListHead);
41
42 /* Set the Current and Maximum Count */
43 Queue->CurrentCount = 0;
44 Queue->MaximumCount = (Count == 0) ? (ULONG) KeNumberProcessors : Count;
45 }
46
47 /*
48 * @implemented
49 */
50 LONG
51 STDCALL
52 KeInsertHeadQueue(IN PKQUEUE Queue,
53 IN PLIST_ENTRY Entry)
54 {
55 LONG PreviousState;
56 KIRQL OldIrql;
57
58 DPRINT("KeInsertHeadQueue %x\n", Queue);
59
60 /* Lock the Dispatcher Database */
61 OldIrql = KeAcquireDispatcherDatabaseLock();
62
63 /* Insert the Queue */
64 PreviousState = KiInsertQueue(Queue, Entry, TRUE);
65
66 /* Release the Dispatcher Lock */
67 KeReleaseDispatcherDatabaseLock(OldIrql);
68
69 /* Return previous State */
70 return PreviousState;
71 }
72
73 /*
74 * @implemented
75 */
76 LONG STDCALL
77 KeInsertQueue(IN PKQUEUE Queue,
78 IN PLIST_ENTRY Entry)
79 {
80 LONG PreviousState;
81 KIRQL OldIrql;
82
83 DPRINT("KeInsertQueue %x\n", Queue);
84
85 /* Lock the Dispatcher Database */
86 OldIrql = KeAcquireDispatcherDatabaseLock();
87
88 /* Insert the Queue */
89 PreviousState = KiInsertQueue(Queue, Entry, FALSE);
90
91 /* Release the Dispatcher Lock */
92 KeReleaseDispatcherDatabaseLock(OldIrql);
93
94 /* Return previous State */
95 return PreviousState;
96 }
97
98 /*
99 * @implemented
100 *
101 * Returns number of entries in the queue
102 */
103 LONG
104 STDCALL
105 KeReadStateQueue(IN PKQUEUE Queue)
106 {
107 /* Returns the Signal State */
108 return(Queue->Header.SignalState);
109 }
110
111 /*
112 * @implemented
113 */
114 PLIST_ENTRY
115 STDCALL
116 KeRemoveQueue(IN PKQUEUE Queue,
117 IN KPROCESSOR_MODE WaitMode,
118 IN PLARGE_INTEGER Timeout OPTIONAL)
119 {
120 PLIST_ENTRY QueueEntry;
121 NTSTATUS Status;
122 PKTHREAD Thread = KeGetCurrentThread();
123 KIRQL OldIrql;
124 PKQUEUE PreviousQueue;
125 PKWAIT_BLOCK WaitBlock;
126 PKTIMER Timer;
127 DPRINT("KeRemoveQueue %x\n", Queue);
128
129 /* Check if the Lock is already held */
130 if (Thread->WaitNext)
131 {
132 DPRINT("Lock is already held\n");
133 Thread->WaitNext = FALSE;
134 }
135 else
136 {
137 /* Lock the Dispatcher Database */
138 DPRINT("Lock not held, acquiring\n");
139 OldIrql = KeAcquireDispatcherDatabaseLock();
140 Thread->WaitIrql = OldIrql;
141 }
142
143 /* This is needed so that we can set the new queue right here, before additional processing */
144 PreviousQueue = Thread->Queue;
145 Thread->Queue = Queue;
146
147 /* Check if this is a different queue */
148 if (Queue != PreviousQueue)
149 {
150 DPRINT("Different Queue\n");
151 QueueEntry = &Thread->QueueListEntry;
152 if (PreviousQueue)
153 {
154 /* Remove from this list */
155 DPRINT("Removing Old Queue\n");
156 RemoveEntryList(QueueEntry);
157
158 /* Wake the queue */
159 DPRINT("Activating new thread\n");
160 KiWakeQueue(PreviousQueue);
161 }
162
163 /* Insert in this new Queue */
164 DPRINT("Inserting new Queue!\n");
165 InsertTailList(&Queue->ThreadListHead, QueueEntry);
166 }
167 else
168 {
169 /* Same queue, decrement waiting threads */
170 DPRINT("Same Queue!\n");
171 Queue->CurrentCount--;
172 }
173
174 /* Loop until the queue is processed */
175 while (TRUE)
176 {
177 /* Check if the counts are valid and if there is still a queued entry */
178 QueueEntry = Queue->EntryListHead.Flink;
179 if ((Queue->CurrentCount < Queue->MaximumCount) &&
180 (QueueEntry != &Queue->EntryListHead))
181 {
182 /* Remove the Entry and Save it */
183 DPRINT("Removing Queue Entry. CurrentCount: %d, Maximum Count: %d\n",
184 Queue->CurrentCount, Queue->MaximumCount);
185
186 /* Decrease the number of entries */
187 Queue->Header.SignalState--;
188
189 /* Increase numbef of running threads */
190 Queue->CurrentCount++;
191
192 /* Check if the entry is valid. If not, bugcheck */
193 if (!(QueueEntry->Flink) || !(QueueEntry->Blink))
194 {
195 KEBUGCHECK(INVALID_WORK_QUEUE_ITEM);
196 }
197
198 /* Remove the Entry */
199 RemoveEntryList(QueueEntry);
200 QueueEntry->Flink = NULL;
201
202 /* Nothing to wait on */
203 break;
204 }
205 else
206 {
207 /* Do the wait */
208 DPRINT("Waiting on Queue Entry. CurrentCount: %d, Maximum Count: %d\n",
209 Queue->CurrentCount, Queue->MaximumCount);
210
211 /* Use the Thread's Wait Block, it's big enough */
212 Thread->WaitBlockList = &Thread->WaitBlock[0];
213
214 /* Check if a kernel APC is pending and we were below APC_LEVEL */
215 if ((Thread->ApcState.KernelApcPending) &&
216 (Thread->WaitIrql < APC_LEVEL))
217 {
218 /* Increment the count and unlock the dispatcher */
219 Queue->CurrentCount++;
220 KeReleaseDispatcherDatabaseLock(Thread->WaitIrql);
221 goto SkipWait;
222 }
223
224 /* Fail if there's a User APC Pending */
225 if ((WaitMode != KernelMode) && (Thread->ApcState.UserApcPending))
226 {
227 /* Return the status and increase the pending threads */
228 QueueEntry = (PLIST_ENTRY)STATUS_USER_APC;
229 Queue->CurrentCount++;
230
231 /* Nothing to wait on */
232 break;
233 }
234
235 /* Build the Wait Block */
236 WaitBlock = &Thread->WaitBlock[0];
237 WaitBlock->Object = (PVOID)Queue;
238 WaitBlock->WaitKey = STATUS_SUCCESS;
239 WaitBlock->WaitType = WaitAny;
240 WaitBlock->Thread = Thread;
241 Thread->WaitStatus = STATUS_WAIT_0;
242
243 /* We need to wait for the object... check if we have a timeout */
244 if (Timeout)
245 {
246 /* If it's zero, then don't do any waiting */
247 if (!Timeout->QuadPart)
248 {
249 /* Instant Timeout, return the status and increase the pending threads */
250 DPRINT("Queue Wait has timed out\n");
251 QueueEntry = (PLIST_ENTRY)STATUS_TIMEOUT;
252 Queue->CurrentCount++;
253
254 /* Nothing to wait on */
255 break;
256 }
257
258 /*
259 * Set up the Timer. We'll use the internal function so that we can
260 * hold on to the dispatcher lock.
261 */
262 Timer = &Thread->Timer;
263 WaitBlock->NextWaitBlock = &Thread->WaitBlock[1];
264 WaitBlock = &Thread->WaitBlock[1];
265
266 /* Set up the Timer Wait Block */
267 WaitBlock->Object = (PVOID)Timer;
268 WaitBlock->Thread = Thread;
269 WaitBlock->WaitKey = STATUS_TIMEOUT;
270 WaitBlock->WaitType = WaitAny;
271
272 /* Link the timer to this Wait Block */
273 InitializeListHead(&Timer->Header.WaitListHead);
274 InsertTailList(&Timer->Header.WaitListHead, &WaitBlock->WaitListEntry);
275
276 /* Create Timer */
277 DPRINT("Creating Timer with timeout %I64d\n", *Timeout);
278 KiInsertTimer(Timer, *Timeout);
279 }
280
281 /* Close the loop */
282 WaitBlock->NextWaitBlock = &Thread->WaitBlock[0];
283
284 /* Insert the wait block into the Queues's wait list */
285 WaitBlock = &Thread->WaitBlock[0];
286 InsertTailList(&Queue->Header.WaitListHead,
287 &WaitBlock->WaitListEntry);
288
289 /* Block the Thread */
290 DPRINT("Blocking the Thread: %x %x!\n", KeGetCurrentThread(), Thread);
291 KiBlockThread(&Status,
292 FALSE,
293 WaitMode,
294 WrQueue);
295
296 /* Reset the wait reason */
297 Thread->WaitReason = 0;
298
299 /* Check if we were executing an APC */
300 if (Status != STATUS_KERNEL_APC)
301 {
302 /* Done Waiting */
303 DPRINT("Done waking queue. Thread: %x %x!\n", KeGetCurrentThread(), Thread);
304 return (PLIST_ENTRY)Status;
305 }
306
307 /* Check if we had a timeout */
308 if (Timeout)
309 {
310 /* FIXME: Fixup interval */
311 }
312
313 /* Acquire again the lock */
314 SkipWait:
315 DPRINT("Looping again\n");
316 OldIrql = KeAcquireDispatcherDatabaseLock();
317
318 /* Save the new IRQL and decrease number of waiting threads */
319 Thread->WaitIrql = OldIrql;
320 Queue->CurrentCount--;
321 }
322 }
323
324 /* Unlock Database and return */
325 KeReleaseDispatcherDatabaseLock(Thread->WaitIrql);
326 DPRINT("Returning. CurrentCount: %d, Maximum Count: %d\n",
327 Queue->CurrentCount, Queue->MaximumCount);
328 return QueueEntry;
329 }
330
331 /*
332 * @implemented
333 */
334 PLIST_ENTRY
335 STDCALL
336 KeRundownQueue(IN PKQUEUE Queue)
337 {
338 PLIST_ENTRY EnumEntry;
339 PLIST_ENTRY FirstEntry = NULL;
340 PKTHREAD Thread;
341 KIRQL OldIrql;
342
343 DPRINT("KeRundownQueue(Queue %x)\n", Queue);
344
345 /* Get the Dispatcher Lock */
346 OldIrql = KeAcquireDispatcherDatabaseLock();
347
348 /* Make sure the list is not empty */
349 if (!IsListEmpty(&Queue->EntryListHead))
350 {
351 /* Remove it */
352 FirstEntry = RemoveHeadList(&Queue->EntryListHead);
353 }
354
355 /* Unlink threads and clear their Thread->Queue */
356 while (!IsListEmpty(&Queue->ThreadListHead))
357 {
358 /* Get the Entry and Remove it */
359 EnumEntry = RemoveHeadList(&Queue->ThreadListHead);
360
361 /* Get the Entry's Thread */
362 Thread = CONTAINING_RECORD(EnumEntry, KTHREAD, QueueListEntry);
363
364 /* Kill its Queue */
365 Thread->Queue = NULL;
366 }
367
368 /* Release the lock and return */
369 KeReleaseDispatcherDatabaseLock(OldIrql);
370 return FirstEntry;
371 }
372
373 /*
374 * Called when a thread which has a queue entry is entering a wait state
375 */
376 VOID
377 FASTCALL
378 KiWakeQueue(IN PKQUEUE Queue)
379 {
380 PLIST_ENTRY QueueEntry;
381 PLIST_ENTRY WaitEntry;
382 PKWAIT_BLOCK WaitBlock;
383 PKTHREAD Thread;
384
385 /* Decrement the number of active threads */
386 DPRINT("KiWakeQueue: %x. Thread: %x\n", Queue, KeGetCurrentThread());
387 Queue->CurrentCount--;
388
389 /* Make sure the counts are OK */
390 if (Queue->CurrentCount < Queue->MaximumCount)
391 {
392 /* Get the Queue Entry */
393 QueueEntry = Queue->EntryListHead.Flink;
394
395 /* Get the Wait Entry */
396 WaitEntry = Queue->Header.WaitListHead.Blink;
397 DPRINT("Queue Count is ok; entries: %p, %p\n", QueueEntry, WaitEntry);
398
399 /* Make sure that the Queue entries are not part of empty lists */
400 if ((WaitEntry != &Queue->Header.WaitListHead) &&
401 (QueueEntry != &Queue->EntryListHead))
402 {
403 /* Remove this entry */
404 DPRINT("Queue in List, removing it\n");
405 RemoveEntryList(QueueEntry);
406 QueueEntry->Flink = NULL;
407
408 /* Decrease the Signal State */
409 Queue->Header.SignalState--;
410
411 /* Unwait the Thread */
412 WaitBlock = CONTAINING_RECORD(WaitEntry,
413 KWAIT_BLOCK,
414 WaitListEntry);
415 Thread = WaitBlock->Thread;
416 DPRINT1("Unwaiting Thread: %d\n", Thread->State);
417 KiAbortWaitThread(Thread, (NTSTATUS)QueueEntry, IO_NO_INCREMENT);
418 }
419 }
420 }
421
422 /*
423 * Returns the previous number of entries in the queue
424 */
425 LONG
426 STDCALL
427 KiInsertQueue(IN PKQUEUE Queue,
428 IN PLIST_ENTRY Entry,
429 BOOLEAN Head)
430 {
431 ULONG InitialState;
432 PKTHREAD Thread = KeGetCurrentThread();
433 PKWAIT_BLOCK WaitBlock;
434 PLIST_ENTRY WaitEntry;
435 DPRINT("KiInsertQueue(Queue %x, Entry %x)\n", Queue, Entry);
436
437 /* Save the old state */
438 InitialState = Queue->Header.SignalState;
439
440 /* Get the Entry */
441 WaitEntry = Queue->Header.WaitListHead.Blink;
442 DPRINT("Initial State, WaitEntry: %d, %x\n", InitialState, WaitEntry);
443
444 /*
445 * Why the KeGetCurrentThread()->Queue != Queue?
446 * KiInsertQueue might be called from an APC for the current thread.
447 * -Gunnar
448 */
449 if ((Queue->CurrentCount < Queue->MaximumCount) &&
450 (WaitEntry != &Queue->Header.WaitListHead) &&
451 ((Thread->Queue != Queue) || (Thread->WaitReason != WrQueue)))
452 {
453 /* Remove the wait entry */
454 DPRINT("Removing Entry\n");
455 RemoveEntryList(WaitEntry);
456
457 /* Get the Wait Block and Thread */
458 WaitBlock = CONTAINING_RECORD(WaitEntry, KWAIT_BLOCK, WaitListEntry);
459 DPRINT("Got wait block: %x\n", WaitBlock);
460 Thread = WaitBlock->Thread;
461
462 /* Reset the wait reason */
463 Thread->WaitReason = 0;
464
465 /* Increase the active threads and set the status*/
466 Queue->CurrentCount++;
467 Thread->WaitStatus = (NTSTATUS)Entry;
468
469 /* Remove the thread from its wait list */
470 RemoveEntryList(&Thread->WaitListEntry);
471
472 /* Check if there's a Thread Timer */
473 if (Thread->Timer.Header.Inserted)
474 {
475 /* Cancel the Thread Timer with the no-lock fastpath */
476 DPRINT("Removing the Thread's Timer\n");
477 Thread->Timer.Header.Inserted = FALSE;
478 RemoveEntryList(&Thread->Timer.TimerListEntry);
479 }
480
481 /* Reschedule the Thread */
482 DPRINT("Unblocking the Thread\n");
483 KiUnblockThread(Thread, (PNTSTATUS)&Entry, 0);
484 }
485 else
486 {
487 /* Increase the Entries */
488 DPRINT("Adding new Queue Entry: %d %d\n", Head, Queue->Header.SignalState);
489 Queue->Header.SignalState++;
490
491 /* Check which mode we're using */
492 if (Head)
493 {
494 /* Insert in the head */
495 InsertHeadList(&Queue->EntryListHead, Entry);
496 }
497 else
498 {
499 /* Insert at the end */
500 InsertTailList(&Queue->EntryListHead, Entry);
501 }
502 }
503
504 /* Return the previous state */
505 DPRINT("Returning\n");
506 return InitialState;
507 }
508
509 /* EOF */