Dispatching & Queue Rewrite II:
[reactos.git] / reactos / ntoskrnl / ke / queue.c
1 /* $Id$
2 *
3 * COPYRIGHT: See COPYING in the top level directory
4 * PROJECT: ReactOS kernel
5 * FILE: ntoskrnl/ke/queue.c
6 * PURPOSE: Implements kernel queues
7 *
8 * PROGRAMMERS: Eric Kohl (ekohl@rz-online.de)
9 */
10
11 /* INCLUDES *****************************************************************/
12
13 #include <ntoskrnl.h>
14 #define NDEBUG
15 #include <internal/debug.h>
16
17 /* FUNCTIONS *****************************************************************/
18
19 LONG STDCALL KiInsertQueue(IN PKQUEUE Queue, IN PLIST_ENTRY Entry, BOOLEAN Head);
20
21 /*
22 * @implemented
23 */
24 VOID
25 STDCALL
26 KeInitializeQueue(IN PKQUEUE Queue,
27 IN ULONG Count OPTIONAL)
28 {
29 DPRINT("KeInitializeQueue %x\n", Queue);
30
31 /* Initialize the Header */
32 KeInitializeDispatcherHeader(&Queue->Header,
33 QueueObject,
34 sizeof(KQUEUE)/sizeof(ULONG),
35 0);
36
37 /* Initialize the Lists */
38 InitializeListHead(&Queue->EntryListHead);
39 InitializeListHead(&Queue->ThreadListHead);
40
41 /* Set the Current and Maximum Count */
42 Queue->CurrentCount = 0;
43 Queue->MaximumCount = (Count == 0) ? (ULONG) KeNumberProcessors : Count;
44 }
45
46 /*
47 * @implemented
48 */
49 LONG
50 STDCALL
51 KeInsertHeadQueue(IN PKQUEUE Queue,
52 IN PLIST_ENTRY Entry)
53 {
54 LONG PreviousState;
55 KIRQL OldIrql;
56
57 DPRINT("KeInsertHeadQueue %x\n", Queue);
58
59 /* Lock the Dispatcher Database */
60 OldIrql = KeAcquireDispatcherDatabaseLock();
61
62 /* Insert the Queue */
63 PreviousState = KiInsertQueue(Queue, Entry, TRUE);
64
65 /* Release the Dispatcher Lock */
66 KeReleaseDispatcherDatabaseLock(OldIrql);
67
68 /* Return previous State */
69 return PreviousState;
70 }
71
72 /*
73 * @implemented
74 */
75 LONG STDCALL
76 KeInsertQueue(IN PKQUEUE Queue,
77 IN PLIST_ENTRY Entry)
78 {
79 LONG PreviousState;
80 KIRQL OldIrql;
81
82 DPRINT("KeInsertQueue %x\n", Queue);
83
84 /* Lock the Dispatcher Database */
85 OldIrql = KeAcquireDispatcherDatabaseLock();
86
87 /* Insert the Queue */
88 PreviousState = KiInsertQueue(Queue, Entry, FALSE);
89
90 /* Release the Dispatcher Lock */
91 KeReleaseDispatcherDatabaseLock(OldIrql);
92
93 /* Return previous State */
94 return PreviousState;
95 }
96
97 /*
98 * @implemented
99 *
100 * Returns number of entries in the queue
101 */
102 LONG
103 STDCALL
104 KeReadStateQueue(IN PKQUEUE Queue)
105 {
106 /* Returns the Signal State */
107 return(Queue->Header.SignalState);
108 }
109
110 /*
111 * @implemented
112 */
113 PLIST_ENTRY
114 STDCALL
115 KeRemoveQueue(IN PKQUEUE Queue,
116 IN KPROCESSOR_MODE WaitMode,
117 IN PLARGE_INTEGER Timeout OPTIONAL)
118 {
119
120 PLIST_ENTRY ListEntry;
121 NTSTATUS Status;
122 PKTHREAD Thread = KeGetCurrentThread();
123 KIRQL OldIrql;
124 PKQUEUE PreviousQueue;
125 PKWAIT_BLOCK WaitBlock;
126 PKWAIT_BLOCK TimerWaitBlock;
127 PKTIMER Timer;
128
129 DPRINT("KeRemoveQueue %x\n", Queue);
130
131 /* Check if the Lock is already held */
132 if (Thread->WaitNext) {
133
134 DPRINT("Lock is already held\n");
135
136 } else {
137
138 /* Lock the Dispatcher Database */
139 DPRINT("Lock not held, acquiring\n");
140 OldIrql = KeAcquireDispatcherDatabaseLock();
141 Thread->WaitIrql = OldIrql;
142 }
143
144 /* This is needed so that we can set the new queue right here, before additional processing */
145 PreviousQueue = Thread->Queue;
146 Thread->Queue = Queue;
147
148 /* Check if this is a different queue */
149 if (Queue != PreviousQueue) {
150
151 /*
152 * INVESTIGATE: What is the Thread->QueueListEntry used for? It's linked it into the
153 * Queue->ThreadListHead when the thread registers with the queue and unlinked when
154 * the thread registers with a new queue. The Thread->Queue already tells us what
155 * queue the thread is registered with.
156 * -Gunnar
157 */
158 DPRINT("Different Queue\n");
159 if (PreviousQueue) {
160
161 /* Remove from this list */
162 DPRINT("Removing Old Queue\n");
163 RemoveEntryList(&Thread->QueueListEntry);
164
165 /* Wake the queue */
166 DPRINT("Activating new thread\n");
167 KiWakeQueue(PreviousQueue);
168 }
169
170 /* Insert in this new Queue */
171 DPRINT("Inserting new Queue!\n");
172 InsertTailList(&Queue->ThreadListHead, &Thread->QueueListEntry);
173
174 } else {
175
176 /* Same queue, decrement waiting threads */
177 DPRINT("Same Queue!\n");
178 Queue->CurrentCount--;
179 }
180
181 /* Loop until the queue is processed */
182 while (TRUE) {
183
184 /* Get the Entry */
185 ListEntry = Queue->EntryListHead.Flink;
186
187 /* Check if the counts are valid and if there is still a queued entry */
188 if ((Queue->CurrentCount < Queue->MaximumCount) &&
189 (ListEntry != &Queue->EntryListHead)) {
190
191 /* Remove the Entry and Save it */
192 DPRINT("Removing Queue Entry. CurrentCount: %d, Maximum Count: %d\n",
193 Queue->CurrentCount, Queue->MaximumCount);
194 ListEntry = RemoveHeadList(&Queue->EntryListHead);
195
196 /* Decrease the number of entries */
197 Queue->Header.SignalState--;
198
199 /* Increase numbef of running threads */
200 Queue->CurrentCount++;
201
202 /* Check if the entry is valid. If not, bugcheck */
203 if (!ListEntry->Flink || !ListEntry->Blink) {
204
205 KEBUGCHECK(INVALID_WORK_QUEUE_ITEM);
206 }
207
208 /* Remove the Entry */
209 RemoveEntryList(ListEntry);
210 ListEntry->Flink = NULL;
211
212 /* Nothing to wait on */
213 break;
214
215 } else {
216
217 /* Do the wait */
218 DPRINT("Waiting on Queue Entry. CurrentCount: %d, Maximum Count: %d\n",
219 Queue->CurrentCount, Queue->MaximumCount);
220
221 /* Use the Thread's Wait Block, it's big enough */
222 Thread->WaitBlockList = &Thread->WaitBlock[0];
223
224 /* Fail if there's an APC Pending */
225 if (WaitMode == UserMode && Thread->ApcState.UserApcPending) {
226
227 /* Return the status and increase the pending threads */
228 ListEntry = (PLIST_ENTRY)STATUS_USER_APC;
229 Queue->CurrentCount++;
230
231 /* Nothing to wait on */
232 break;
233 }
234
235 /* Build the Wait Block */
236 WaitBlock = &Thread->WaitBlock[0];
237 WaitBlock->Object = (PVOID)Queue;
238 WaitBlock->WaitKey = STATUS_SUCCESS;
239 WaitBlock->WaitType = WaitAny;
240 WaitBlock->Thread = Thread;
241 WaitBlock->NextWaitBlock = NULL;
242
243 Thread->WaitStatus = STATUS_SUCCESS;
244
245 /* We need to wait for the object... check if we have a timeout */
246 if (Timeout) {
247
248 /* If it's zero, then don't do any waiting */
249 if (!Timeout->QuadPart) {
250
251 /* Instant Timeout, return the status and increase the pending threads */
252 DPRINT("Queue Wait has timed out\n");
253 ListEntry = (PLIST_ENTRY)STATUS_TIMEOUT;
254 Queue->CurrentCount++;
255
256 /* Nothing to wait on */
257 break;
258 }
259
260 /*
261 * Set up the Timer. We'll use the internal function so that we can
262 * hold on to the dispatcher lock.
263 */
264 Timer = &Thread->Timer;
265 TimerWaitBlock = &Thread->WaitBlock[1];
266
267 /* Set up the Timer Wait Block */
268 TimerWaitBlock->Object = (PVOID)Timer;
269 TimerWaitBlock->Thread = Thread;
270 TimerWaitBlock->WaitKey = STATUS_TIMEOUT;
271 TimerWaitBlock->WaitType = WaitAny;
272 TimerWaitBlock->NextWaitBlock = NULL;
273
274 /* Link the timer to this Wait Block */
275 InitializeListHead(&Timer->Header.WaitListHead);
276 InsertTailList(&Timer->Header.WaitListHead, &TimerWaitBlock->WaitListEntry);
277
278 /* Create Timer */
279 DPRINT("Creating Timer with timeout %I64d\n", *Timeout);
280 KiInsertTimer(Timer, *Timeout);
281 }
282
283 /* Insert the wait block into the Queues's wait list */
284 WaitBlock = Thread->WaitBlockList;
285 InsertTailList(&Queue->Header.WaitListHead, &WaitBlock->WaitListEntry);
286
287 /* Block the Thread */
288 DPRINT("Blocking the Thread: %x %x!\n", KeGetCurrentThread(), Thread);
289 PsBlockThread(&Status,
290 FALSE,
291 WaitMode,
292 WrQueue);
293
294 /* Reset the wait reason */
295 Thread->WaitReason = 0;
296
297 /* Check if we were executing an APC */
298 if (Status != STATUS_KERNEL_APC) {
299
300 /* Done Waiting */
301 DPRINT("Done waking queue. Thread: %x %x!\n", KeGetCurrentThread(), Thread);
302 return (PLIST_ENTRY)Status;
303 }
304
305 /* Acquire again the lock */
306 DPRINT("Looping again\n");
307 OldIrql = KeAcquireDispatcherDatabaseLock();
308
309 /* Save the new IRQL and decrease number of waiting threads */
310 Thread->WaitIrql = OldIrql;
311 Queue->CurrentCount--;
312 }
313 }
314
315 /* Unlock Database and return */
316 KeReleaseDispatcherDatabaseLock(Thread->WaitIrql);
317 DPRINT("Returning. CurrentCount: %d, Maximum Count: %d\n",
318 Queue->CurrentCount, Queue->MaximumCount);
319 return ListEntry;
320 }
321
322 /*
323 * @implemented
324 */
325 PLIST_ENTRY
326 STDCALL
327 KeRundownQueue(IN PKQUEUE Queue)
328 {
329 PLIST_ENTRY EnumEntry;
330 PLIST_ENTRY FirstEntry;
331 PKTHREAD Thread;
332 KIRQL OldIrql;
333
334 DPRINT("KeRundownQueue(Queue %x)\n", Queue);
335
336 /* Get the Dispatcher Lock */
337 OldIrql = KeAcquireDispatcherDatabaseLock();
338
339 /* Get the First Empty Entry */
340 FirstEntry = Queue->EntryListHead.Flink;
341
342 /* Make sure the list is not empty */
343 if (FirstEntry == &Queue->EntryListHead) {
344
345 /* It is, so don't return anything */
346 EnumEntry = NULL;
347
348 } else {
349
350 /* Remove it */
351 RemoveEntryList(&Queue->EntryListHead);
352 }
353
354 /* Unlink threads and clear their Thread->Queue */
355 while (!IsListEmpty(&Queue->ThreadListHead)) {
356
357 /* Get the Entry and Remove it */
358 EnumEntry = RemoveHeadList(&Queue->ThreadListHead);
359
360 /* Get the Entry's Thread */
361 Thread = CONTAINING_RECORD(EnumEntry, KTHREAD, QueueListEntry);
362
363 /* Kill its Queue */
364 Thread->Queue = NULL;
365 }
366
367 /* Release the lock and return */
368 KeReleaseDispatcherDatabaseLock(OldIrql);
369 return FirstEntry;
370 }
371
372 /*
373 * Called when a thread which has a queue entry is entering a wait state
374 */
375 VOID
376 FASTCALL
377 KiWakeQueue(IN PKQUEUE Queue)
378 {
379 PLIST_ENTRY QueueEntry;
380 PLIST_ENTRY WaitEntry;
381 PKWAIT_BLOCK WaitBlock;
382
383 /* Decrement the number of active threads */
384 DPRINT("KiWakeQueue: %x. Thread: %x\n", Queue, KeGetCurrentThread());
385 Queue->CurrentCount--;
386
387 /* Make sure the counts are OK */
388 if (Queue->CurrentCount < Queue->MaximumCount) {
389
390 /* Get the Queue Entry */
391 QueueEntry = Queue->EntryListHead.Flink;
392
393 /* Get the Wait Entry */
394 WaitEntry = Queue->Header.WaitListHead.Blink;
395 DPRINT("Queue Count is ok, Queue entries: %x, %x\n", QueueEntry, WaitEntry);
396
397 /* Make sure that the Queue List isn't empty and that this entry is valid */
398 if (!IsListEmpty(&Queue->Header.WaitListHead) &&
399 (QueueEntry != &Queue->EntryListHead)) {
400
401 /* Remove this entry */
402 DPRINT("Queue in List, removing it\n");
403 RemoveEntryList(QueueEntry);
404 QueueEntry->Flink = NULL;
405
406 /* Decrease the Signal State */
407 Queue->Header.SignalState--;
408
409 /* Unwait the Thread */
410 DPRINT("Unwaiting Thread\n");
411 WaitBlock = CONTAINING_RECORD(WaitEntry, KWAIT_BLOCK, WaitListEntry);
412 KiAbortWaitThread(WaitBlock->Thread, (NTSTATUS)QueueEntry);
413 }
414 }
415 }
416
417 /*
418 * Returns the previous number of entries in the queue
419 */
420 LONG
421 STDCALL
422 KiInsertQueue(IN PKQUEUE Queue,
423 IN PLIST_ENTRY Entry,
424 BOOLEAN Head)
425 {
426 ULONG InitialState;
427 PKTHREAD Thread = KeGetCurrentThread();
428 PKWAIT_BLOCK WaitBlock;
429 PLIST_ENTRY WaitEntry;
430
431 DPRINT("KiInsertQueue(Queue %x, Entry %x)\n", Queue, Entry);
432
433 /* Save the old state */
434 InitialState = Queue->Header.SignalState;
435
436 /* Get the Entry */
437 WaitEntry = Queue->Header.WaitListHead.Blink;
438 DPRINT("Initial State, WaitEntry: %d, %x\n", InitialState, WaitEntry);
439
440 /*
441 * Why the KeGetCurrentThread()->Queue != Queue?
442 * KiInsertQueue might be called from an APC for the current thread.
443 * -Gunnar
444 */
445 if ((Queue->CurrentCount < Queue->MaximumCount) &&
446 (WaitEntry != &Queue->Header.WaitListHead) &&
447 ((Thread->Queue != Queue) || (Thread->WaitReason != WrQueue))) {
448
449 /* Remove the wait entry */
450 DPRINT("Removing Entry\n");
451 RemoveEntryList(WaitEntry);
452
453 /* Get the Wait Block and Thread */
454 WaitBlock = CONTAINING_RECORD(WaitEntry, KWAIT_BLOCK, WaitListEntry);
455 DPRINT("Got wait block: %x\n", WaitBlock);
456 Thread = WaitBlock->Thread;
457
458 /* Reset the wait reason */
459 Thread->WaitReason = 0;
460
461 /* Increase the waiting threads */
462 Queue->CurrentCount++;
463
464 /* Check if there's a Thread Timer */
465 if (Thread->Timer.Header.Inserted) {
466
467 /* Cancel the Thread Timer with the no-lock fastpath */
468 DPRINT("Removing the Thread's Timer\n");
469 Thread->Timer.Header.Inserted = FALSE;
470 RemoveEntryList(&Thread->Timer.TimerListEntry);
471 }
472
473 /* Reschedule the Thread */
474 DPRINT("Unblocking the Thread\n");
475 PsUnblockThread((PETHREAD)Thread, (PNTSTATUS)&Entry, 0);
476
477 } else {
478
479 /* Increase the Entries */
480 DPRINT("Adding new Queue Entry: %d %d\n", Head, Queue->Header.SignalState);
481 Queue->Header.SignalState++;
482
483 if (Head) {
484
485 InsertHeadList(&Queue->EntryListHead, Entry);
486
487 } else {
488
489 InsertTailList(&Queue->EntryListHead, Entry);
490 }
491 }
492
493 /* Return the previous state */
494 DPRINT("Returning\n");
495 return InitialState;
496 }
497
498 /* EOF */