830bb63dd8606b9aa1ca128a004d4f6ab49d5826
[reactos.git] / reactos / ntoskrnl / ke / queue.c
1 /*
2 * COPYRIGHT: See COPYING in the top level directory
3 * PROJECT: ReactOS kernel
4 * FILE: ntoskrnl/ke/queue.c
5 * PURPOSE: Implements kernel queues
6 *
7 * PROGRAMMERS: Alex Ionescu (alex@relsoft.net)
8 * Gunnar Dalsnes
9 * Eric Kohl (ekohl@rz-online.de)
10 */
11
12 /* INCLUDES *****************************************************************/
13
14 #include <ntoskrnl.h>
15 #define NDEBUG
16 #include <internal/debug.h>
17
18 /* FUNCTIONS *****************************************************************/
19
20 LONG STDCALL KiInsertQueue(IN PKQUEUE Queue, IN PLIST_ENTRY Entry, BOOLEAN Head);
21
22 /*
23 * @implemented
24 */
25 VOID
26 STDCALL
27 KeInitializeQueue(IN PKQUEUE Queue,
28 IN ULONG Count OPTIONAL)
29 {
30 DPRINT("KeInitializeQueue %x\n", Queue);
31
32 /* Initialize the Header */
33 KeInitializeDispatcherHeader(&Queue->Header,
34 QueueObject,
35 sizeof(KQUEUE)/sizeof(ULONG),
36 0);
37
38 /* Initialize the Lists */
39 InitializeListHead(&Queue->EntryListHead);
40 InitializeListHead(&Queue->ThreadListHead);
41
42 /* Set the Current and Maximum Count */
43 Queue->CurrentCount = 0;
44 Queue->MaximumCount = (Count == 0) ? (ULONG) KeNumberProcessors : Count;
45 }
46
47 /*
48 * @implemented
49 */
50 LONG
51 STDCALL
52 KeInsertHeadQueue(IN PKQUEUE Queue,
53 IN PLIST_ENTRY Entry)
54 {
55 LONG PreviousState;
56 KIRQL OldIrql;
57
58 DPRINT("KeInsertHeadQueue %x\n", Queue);
59
60 /* Lock the Dispatcher Database */
61 OldIrql = KeAcquireDispatcherDatabaseLock();
62
63 /* Insert the Queue */
64 PreviousState = KiInsertQueue(Queue, Entry, TRUE);
65
66 /* Release the Dispatcher Lock */
67 KeReleaseDispatcherDatabaseLock(OldIrql);
68
69 /* Return previous State */
70 return PreviousState;
71 }
72
73 /*
74 * @implemented
75 */
76 LONG STDCALL
77 KeInsertQueue(IN PKQUEUE Queue,
78 IN PLIST_ENTRY Entry)
79 {
80 LONG PreviousState;
81 KIRQL OldIrql;
82
83 DPRINT("KeInsertQueue %x\n", Queue);
84
85 /* Lock the Dispatcher Database */
86 OldIrql = KeAcquireDispatcherDatabaseLock();
87
88 /* Insert the Queue */
89 PreviousState = KiInsertQueue(Queue, Entry, FALSE);
90
91 /* Release the Dispatcher Lock */
92 KeReleaseDispatcherDatabaseLock(OldIrql);
93
94 /* Return previous State */
95 return PreviousState;
96 }
97
98 /*
99 * @implemented
100 *
101 * Returns number of entries in the queue
102 */
103 LONG
104 STDCALL
105 KeReadStateQueue(IN PKQUEUE Queue)
106 {
107 /* Returns the Signal State */
108 return(Queue->Header.SignalState);
109 }
110
111 /*
112 * @implemented
113 */
114 PLIST_ENTRY
115 STDCALL
116 KeRemoveQueue(IN PKQUEUE Queue,
117 IN KPROCESSOR_MODE WaitMode,
118 IN PLARGE_INTEGER Timeout OPTIONAL)
119 {
120
121 PLIST_ENTRY ListEntry;
122 NTSTATUS Status;
123 PKTHREAD Thread = KeGetCurrentThread();
124 KIRQL OldIrql;
125 PKQUEUE PreviousQueue;
126 PKWAIT_BLOCK WaitBlock;
127 PKWAIT_BLOCK TimerWaitBlock;
128 PKTIMER Timer;
129
130 DPRINT("KeRemoveQueue %x\n", Queue);
131
132 /* Check if the Lock is already held */
133 if (Thread->WaitNext) {
134
135 DPRINT("Lock is already held\n");
136
137 } else {
138
139 /* Lock the Dispatcher Database */
140 DPRINT("Lock not held, acquiring\n");
141 OldIrql = KeAcquireDispatcherDatabaseLock();
142 Thread->WaitIrql = OldIrql;
143 }
144
145 /* This is needed so that we can set the new queue right here, before additional processing */
146 PreviousQueue = Thread->Queue;
147 Thread->Queue = Queue;
148
149 /* Check if this is a different queue */
150 if (Queue != PreviousQueue) {
151
152 /*
153 * INVESTIGATE: What is the Thread->QueueListEntry used for? It's linked it into the
154 * Queue->ThreadListHead when the thread registers with the queue and unlinked when
155 * the thread registers with a new queue. The Thread->Queue already tells us what
156 * queue the thread is registered with.
157 * -Gunnar
158 */
159 DPRINT("Different Queue\n");
160 if (PreviousQueue) {
161
162 /* Remove from this list */
163 DPRINT("Removing Old Queue\n");
164 RemoveEntryList(&Thread->QueueListEntry);
165
166 /* Wake the queue */
167 DPRINT("Activating new thread\n");
168 KiWakeQueue(PreviousQueue);
169 }
170
171 /* Insert in this new Queue */
172 DPRINT("Inserting new Queue!\n");
173 InsertTailList(&Queue->ThreadListHead, &Thread->QueueListEntry);
174
175 } else {
176
177 /* Same queue, decrement waiting threads */
178 DPRINT("Same Queue!\n");
179 Queue->CurrentCount--;
180 }
181
182 /* Loop until the queue is processed */
183 while (TRUE) {
184
185 /* Get the Entry */
186 ListEntry = Queue->EntryListHead.Flink;
187
188 /* Check if the counts are valid and if there is still a queued entry */
189 if ((Queue->CurrentCount < Queue->MaximumCount) &&
190 (ListEntry != &Queue->EntryListHead)) {
191
192 /* Remove the Entry and Save it */
193 DPRINT("Removing Queue Entry. CurrentCount: %d, Maximum Count: %d\n",
194 Queue->CurrentCount, Queue->MaximumCount);
195 ListEntry = RemoveHeadList(&Queue->EntryListHead);
196
197 /* Decrease the number of entries */
198 Queue->Header.SignalState--;
199
200 /* Increase numbef of running threads */
201 Queue->CurrentCount++;
202
203 /* Check if the entry is valid. If not, bugcheck */
204 if (!ListEntry->Flink || !ListEntry->Blink) {
205
206 KEBUGCHECK(INVALID_WORK_QUEUE_ITEM);
207 }
208
209 /* Remove the Entry */
210 RemoveEntryList(ListEntry);
211 ListEntry->Flink = NULL;
212
213 /* Nothing to wait on */
214 break;
215
216 } else {
217
218 /* Do the wait */
219 DPRINT("Waiting on Queue Entry. CurrentCount: %d, Maximum Count: %d\n",
220 Queue->CurrentCount, Queue->MaximumCount);
221
222 /* Use the Thread's Wait Block, it's big enough */
223 Thread->WaitBlockList = &Thread->WaitBlock[0];
224
225 /* Fail if there's an APC Pending */
226 if (WaitMode == UserMode && Thread->ApcState.UserApcPending) {
227
228 /* Return the status and increase the pending threads */
229 ListEntry = (PLIST_ENTRY)STATUS_USER_APC;
230 Queue->CurrentCount++;
231
232 /* Nothing to wait on */
233 break;
234 }
235
236 /* Build the Wait Block */
237 WaitBlock = &Thread->WaitBlock[0];
238 WaitBlock->Object = (PVOID)Queue;
239 WaitBlock->WaitKey = STATUS_SUCCESS;
240 WaitBlock->WaitType = WaitAny;
241 WaitBlock->Thread = Thread;
242 WaitBlock->NextWaitBlock = WaitBlock;
243
244 Thread->WaitStatus = STATUS_SUCCESS;
245
246 /* We need to wait for the object... check if we have a timeout */
247 if (Timeout) {
248
249 /* If it's zero, then don't do any waiting */
250 if (!Timeout->QuadPart) {
251
252 /* Instant Timeout, return the status and increase the pending threads */
253 DPRINT("Queue Wait has timed out\n");
254 ListEntry = (PLIST_ENTRY)STATUS_TIMEOUT;
255 Queue->CurrentCount++;
256
257 /* Nothing to wait on */
258 break;
259 }
260
261 /*
262 * Set up the Timer. We'll use the internal function so that we can
263 * hold on to the dispatcher lock.
264 */
265 Timer = &Thread->Timer;
266 TimerWaitBlock = &Thread->WaitBlock[1];
267
268 /* Set up the Timer Wait Block */
269 TimerWaitBlock->Object = (PVOID)Timer;
270 TimerWaitBlock->Thread = Thread;
271 TimerWaitBlock->WaitKey = STATUS_TIMEOUT;
272 TimerWaitBlock->WaitType = WaitAny;
273 TimerWaitBlock->NextWaitBlock = TimerWaitBlock;
274
275 /* Link the timer to this Wait Block */
276 InitializeListHead(&Timer->Header.WaitListHead);
277 InsertTailList(&Timer->Header.WaitListHead, &TimerWaitBlock->WaitListEntry);
278
279 /* Create Timer */
280 DPRINT("Creating Timer with timeout %I64d\n", *Timeout);
281 KiInsertTimer(Timer, *Timeout);
282 }
283
284 /* Insert the wait block into the Queues's wait list */
285 WaitBlock = Thread->WaitBlockList;
286 InsertTailList(&Queue->Header.WaitListHead, &WaitBlock->WaitListEntry);
287
288 /* Block the Thread */
289 DPRINT("Blocking the Thread: %x %x!\n", KeGetCurrentThread(), Thread);
290 KiBlockThread(&Status,
291 FALSE,
292 WaitMode,
293 WrQueue);
294
295 /* Reset the wait reason */
296 Thread->WaitReason = 0;
297
298 /* Check if we were executing an APC */
299 if (Status != STATUS_KERNEL_APC) {
300
301 /* Done Waiting */
302 DPRINT("Done waking queue. Thread: %x %x!\n", KeGetCurrentThread(), Thread);
303 return (PLIST_ENTRY)Status;
304 }
305
306 /* Acquire again the lock */
307 DPRINT("Looping again\n");
308 OldIrql = KeAcquireDispatcherDatabaseLock();
309
310 /* Save the new IRQL and decrease number of waiting threads */
311 Thread->WaitIrql = OldIrql;
312 Queue->CurrentCount--;
313 }
314 }
315
316 /* Unlock Database and return */
317 KeReleaseDispatcherDatabaseLock(Thread->WaitIrql);
318 DPRINT("Returning. CurrentCount: %d, Maximum Count: %d\n",
319 Queue->CurrentCount, Queue->MaximumCount);
320 return ListEntry;
321 }
322
323 /*
324 * @implemented
325 */
326 PLIST_ENTRY
327 STDCALL
328 KeRundownQueue(IN PKQUEUE Queue)
329 {
330 PLIST_ENTRY EnumEntry;
331 PLIST_ENTRY FirstEntry;
332 PKTHREAD Thread;
333 KIRQL OldIrql;
334
335 DPRINT("KeRundownQueue(Queue %x)\n", Queue);
336
337 /* Get the Dispatcher Lock */
338 OldIrql = KeAcquireDispatcherDatabaseLock();
339
340 /* Get the First Empty Entry */
341 FirstEntry = Queue->EntryListHead.Flink;
342
343 /* Make sure the list is not empty */
344 if (FirstEntry == &Queue->EntryListHead) {
345
346 /* It is, so don't return anything */
347 EnumEntry = NULL;
348
349 } else {
350
351 /* Remove it */
352 RemoveEntryList(&Queue->EntryListHead);
353 }
354
355 /* Unlink threads and clear their Thread->Queue */
356 while (!IsListEmpty(&Queue->ThreadListHead)) {
357
358 /* Get the Entry and Remove it */
359 EnumEntry = RemoveHeadList(&Queue->ThreadListHead);
360
361 /* Get the Entry's Thread */
362 Thread = CONTAINING_RECORD(EnumEntry, KTHREAD, QueueListEntry);
363
364 /* Kill its Queue */
365 Thread->Queue = NULL;
366 }
367
368 /* Release the lock and return */
369 KeReleaseDispatcherDatabaseLock(OldIrql);
370 return FirstEntry;
371 }
372
373 /*
374 * Called when a thread which has a queue entry is entering a wait state
375 */
376 VOID
377 FASTCALL
378 KiWakeQueue(IN PKQUEUE Queue)
379 {
380 PLIST_ENTRY QueueEntry;
381 PLIST_ENTRY WaitEntry;
382 PKWAIT_BLOCK WaitBlock;
383
384 /* Decrement the number of active threads */
385 DPRINT("KiWakeQueue: %x. Thread: %x\n", Queue, KeGetCurrentThread());
386 Queue->CurrentCount--;
387
388 /* Make sure the counts are OK */
389 if (Queue->CurrentCount < Queue->MaximumCount) {
390
391 /* Get the Queue Entry */
392 QueueEntry = Queue->EntryListHead.Flink;
393
394 /* Get the Wait Entry */
395 WaitEntry = Queue->Header.WaitListHead.Blink;
396 DPRINT("Queue Count is ok, Queue entries: %x, %x\n", QueueEntry, WaitEntry);
397
398 /* Make sure that the Queue List isn't empty and that this entry is valid */
399 if (!IsListEmpty(&Queue->Header.WaitListHead) &&
400 (QueueEntry != &Queue->EntryListHead)) {
401
402 /* Remove this entry */
403 DPRINT("Queue in List, removing it\n");
404 RemoveEntryList(QueueEntry);
405 QueueEntry->Flink = NULL;
406
407 /* Decrease the Signal State */
408 Queue->Header.SignalState--;
409
410 /* Unwait the Thread */
411 DPRINT("Unwaiting Thread\n");
412 WaitBlock = CONTAINING_RECORD(WaitEntry, KWAIT_BLOCK, WaitListEntry);
413 KiAbortWaitThread(WaitBlock->Thread, (NTSTATUS)QueueEntry, IO_NO_INCREMENT);
414 }
415 }
416 }
417
418 /*
419 * Returns the previous number of entries in the queue
420 */
421 LONG
422 STDCALL
423 KiInsertQueue(IN PKQUEUE Queue,
424 IN PLIST_ENTRY Entry,
425 BOOLEAN Head)
426 {
427 ULONG InitialState;
428 PKTHREAD Thread = KeGetCurrentThread();
429 PKWAIT_BLOCK WaitBlock;
430 PLIST_ENTRY WaitEntry;
431
432 DPRINT("KiInsertQueue(Queue %x, Entry %x)\n", Queue, Entry);
433
434 /* Save the old state */
435 InitialState = Queue->Header.SignalState;
436
437 /* Get the Entry */
438 WaitEntry = Queue->Header.WaitListHead.Blink;
439 DPRINT("Initial State, WaitEntry: %d, %x\n", InitialState, WaitEntry);
440
441 /*
442 * Why the KeGetCurrentThread()->Queue != Queue?
443 * KiInsertQueue might be called from an APC for the current thread.
444 * -Gunnar
445 */
446 if ((Queue->CurrentCount < Queue->MaximumCount) &&
447 (WaitEntry != &Queue->Header.WaitListHead) &&
448 ((Thread->Queue != Queue) || (Thread->WaitReason != WrQueue))) {
449
450 /* Remove the wait entry */
451 DPRINT("Removing Entry\n");
452 RemoveEntryList(WaitEntry);
453
454 /* Get the Wait Block and Thread */
455 WaitBlock = CONTAINING_RECORD(WaitEntry, KWAIT_BLOCK, WaitListEntry);
456 DPRINT("Got wait block: %x\n", WaitBlock);
457 Thread = WaitBlock->Thread;
458
459 /* Reset the wait reason */
460 Thread->WaitReason = 0;
461
462 /* Increase the waiting threads */
463 Queue->CurrentCount++;
464
465 /* Check if there's a Thread Timer */
466 if (Thread->Timer.Header.Inserted) {
467
468 /* Cancel the Thread Timer with the no-lock fastpath */
469 DPRINT("Removing the Thread's Timer\n");
470 Thread->Timer.Header.Inserted = FALSE;
471 RemoveEntryList(&Thread->Timer.TimerListEntry);
472 }
473
474 /* Reschedule the Thread */
475 DPRINT("Unblocking the Thread\n");
476 KiUnblockThread(Thread, (PNTSTATUS)&Entry, 0);
477
478 } else {
479
480 /* Increase the Entries */
481 DPRINT("Adding new Queue Entry: %d %d\n", Head, Queue->Header.SignalState);
482 Queue->Header.SignalState++;
483
484 if (Head) {
485
486 InsertHeadList(&Queue->EntryListHead, Entry);
487
488 } else {
489
490 InsertTailList(&Queue->EntryListHead, Entry);
491 }
492 }
493
494 /* Return the previous state */
495 DPRINT("Returning\n");
496 return InitialState;
497 }
498
499 /* EOF */