- Implemented InterlockedBitTestAndReset, InterlockedBitTestAndSet, InterlockedExchan...
[reactos.git] / reactos / ntoskrnl / ke / queue.c
1 /*
2 * COPYRIGHT: See COPYING in the top level directory
3 * PROJECT: ReactOS kernel
4 * FILE: ntoskrnl/ke/queue.c
5 * PURPOSE: Implements kernel queues
6 *
7 * PROGRAMMERS: Alex Ionescu (alex@relsoft.net)
8 * Gunnar Dalsnes
9 * Eric Kohl (ekohl@rz-online.de)
10 */
11
12 /* INCLUDES *****************************************************************/
13
14 #include <ntoskrnl.h>
15 #define NDEBUG
16 #include <internal/debug.h>
17
18 /* FUNCTIONS *****************************************************************/
19
20 LONG STDCALL KiInsertQueue(IN PKQUEUE Queue, IN PLIST_ENTRY Entry, BOOLEAN Head);
21
22 /*
23 * @implemented
24 */
25 VOID
26 STDCALL
27 KeInitializeQueue(IN PKQUEUE Queue,
28 IN ULONG Count OPTIONAL)
29 {
30 DPRINT("KeInitializeQueue %x\n", Queue);
31
32 /* Initialize the Header */
33 KeInitializeDispatcherHeader(&Queue->Header,
34 QueueObject,
35 sizeof(KQUEUE)/sizeof(ULONG),
36 0);
37
38 /* Initialize the Lists */
39 InitializeListHead(&Queue->EntryListHead);
40 InitializeListHead(&Queue->ThreadListHead);
41
42 /* Set the Current and Maximum Count */
43 Queue->CurrentCount = 0;
44 Queue->MaximumCount = (Count == 0) ? (ULONG) KeNumberProcessors : Count;
45 }
46
47 /*
48 * @implemented
49 */
50 LONG
51 STDCALL
52 KeInsertHeadQueue(IN PKQUEUE Queue,
53 IN PLIST_ENTRY Entry)
54 {
55 LONG PreviousState;
56 KIRQL OldIrql;
57
58 DPRINT("KeInsertHeadQueue %x\n", Queue);
59
60 /* Lock the Dispatcher Database */
61 OldIrql = KeAcquireDispatcherDatabaseLock();
62
63 /* Insert the Queue */
64 PreviousState = KiInsertQueue(Queue, Entry, TRUE);
65
66 /* Release the Dispatcher Lock */
67 KeReleaseDispatcherDatabaseLock(OldIrql);
68
69 /* Return previous State */
70 return PreviousState;
71 }
72
73 /*
74 * @implemented
75 */
76 LONG STDCALL
77 KeInsertQueue(IN PKQUEUE Queue,
78 IN PLIST_ENTRY Entry)
79 {
80 LONG PreviousState;
81 KIRQL OldIrql;
82
83 DPRINT("KeInsertQueue %x\n", Queue);
84
85 /* Lock the Dispatcher Database */
86 OldIrql = KeAcquireDispatcherDatabaseLock();
87
88 /* Insert the Queue */
89 PreviousState = KiInsertQueue(Queue, Entry, FALSE);
90
91 /* Release the Dispatcher Lock */
92 KeReleaseDispatcherDatabaseLock(OldIrql);
93
94 /* Return previous State */
95 return PreviousState;
96 }
97
98 /*
99 * @implemented
100 *
101 * Returns number of entries in the queue
102 */
103 LONG
104 STDCALL
105 KeReadStateQueue(IN PKQUEUE Queue)
106 {
107 /* Returns the Signal State */
108 return(Queue->Header.SignalState);
109 }
110
111 /*
112 * @implemented
113 */
114 PLIST_ENTRY
115 STDCALL
116 KeRemoveQueue(IN PKQUEUE Queue,
117 IN KPROCESSOR_MODE WaitMode,
118 IN PLARGE_INTEGER Timeout OPTIONAL)
119 {
120
121 PLIST_ENTRY ListEntry;
122 NTSTATUS Status;
123 PKTHREAD Thread = KeGetCurrentThread();
124 KIRQL OldIrql;
125 PKQUEUE PreviousQueue;
126 PKWAIT_BLOCK WaitBlock;
127 PKWAIT_BLOCK TimerWaitBlock;
128 PKTIMER Timer;
129
130 DPRINT("KeRemoveQueue %x\n", Queue);
131
132 /* Check if the Lock is already held */
133 if (Thread->WaitNext) {
134
135 DPRINT("Lock is already held\n");
136
137 } else {
138
139 /* Lock the Dispatcher Database */
140 DPRINT("Lock not held, acquiring\n");
141 OldIrql = KeAcquireDispatcherDatabaseLock();
142 Thread->WaitIrql = OldIrql;
143 }
144
145 /* This is needed so that we can set the new queue right here, before additional processing */
146 PreviousQueue = Thread->Queue;
147 Thread->Queue = Queue;
148
149 /* Check if this is a different queue */
150 if (Queue != PreviousQueue) {
151
152 /*
153 * INVESTIGATE: What is the Thread->QueueListEntry used for? It's linked it into the
154 * Queue->ThreadListHead when the thread registers with the queue and unlinked when
155 * the thread registers with a new queue. The Thread->Queue already tells us what
156 * queue the thread is registered with.
157 * -Gunnar
158 */
159 DPRINT("Different Queue\n");
160 if (PreviousQueue) {
161
162 /* Remove from this list */
163 DPRINT("Removing Old Queue\n");
164 RemoveEntryList(&Thread->QueueListEntry);
165
166 /* Wake the queue */
167 DPRINT("Activating new thread\n");
168 KiWakeQueue(PreviousQueue);
169 }
170
171 /* Insert in this new Queue */
172 DPRINT("Inserting new Queue!\n");
173 InsertTailList(&Queue->ThreadListHead, &Thread->QueueListEntry);
174
175 } else {
176
177 /* Same queue, decrement waiting threads */
178 DPRINT("Same Queue!\n");
179 Queue->CurrentCount--;
180 }
181
182 /* Loop until the queue is processed */
183 while (TRUE) {
184
185 /* Check if the counts are valid and if there is still a queued entry */
186 if ((Queue->CurrentCount < Queue->MaximumCount) &&
187 !IsListEmpty(&Queue->EntryListHead)) {
188
189 /* Remove the Entry and Save it */
190 DPRINT("Removing Queue Entry. CurrentCount: %d, Maximum Count: %d\n",
191 Queue->CurrentCount, Queue->MaximumCount);
192 ListEntry = Queue->EntryListHead.Flink;
193
194 /* Decrease the number of entries */
195 Queue->Header.SignalState--;
196
197 /* Increase numbef of running threads */
198 Queue->CurrentCount++;
199
200 /* Check if the entry is valid. If not, bugcheck */
201 if (!ListEntry->Flink || !ListEntry->Blink) {
202
203 KEBUGCHECK(INVALID_WORK_QUEUE_ITEM);
204 }
205
206 /* Remove the Entry */
207 RemoveEntryList(ListEntry);
208 ListEntry->Flink = NULL;
209
210 /* Nothing to wait on */
211 break;
212
213 } else {
214
215 /* Do the wait */
216 DPRINT("Waiting on Queue Entry. CurrentCount: %d, Maximum Count: %d\n",
217 Queue->CurrentCount, Queue->MaximumCount);
218
219 /* Use the Thread's Wait Block, it's big enough */
220 Thread->WaitBlockList = &Thread->WaitBlock[0];
221
222 /* Fail if there's an APC Pending */
223 if (WaitMode != KernelMode && Thread->ApcState.UserApcPending) {
224
225 /* Return the status and increase the pending threads */
226 ListEntry = (PLIST_ENTRY)STATUS_USER_APC;
227 Queue->CurrentCount++;
228
229 /* Nothing to wait on */
230 break;
231 }
232
233 /* Build the Wait Block */
234 WaitBlock = &Thread->WaitBlock[0];
235 WaitBlock->Object = (PVOID)Queue;
236 WaitBlock->WaitKey = STATUS_SUCCESS;
237 WaitBlock->WaitType = WaitAny;
238 WaitBlock->Thread = Thread;
239 WaitBlock->NextWaitBlock = WaitBlock;
240
241 Thread->WaitStatus = STATUS_SUCCESS;
242
243 /* We need to wait for the object... check if we have a timeout */
244 if (Timeout) {
245
246 /* If it's zero, then don't do any waiting */
247 if (!Timeout->QuadPart) {
248
249 /* Instant Timeout, return the status and increase the pending threads */
250 DPRINT("Queue Wait has timed out\n");
251 ListEntry = (PLIST_ENTRY)STATUS_TIMEOUT;
252 Queue->CurrentCount++;
253
254 /* Nothing to wait on */
255 break;
256 }
257
258 /*
259 * Set up the Timer. We'll use the internal function so that we can
260 * hold on to the dispatcher lock.
261 */
262 Timer = &Thread->Timer;
263 TimerWaitBlock = &Thread->WaitBlock[1];
264
265 /* Set up the Timer Wait Block */
266 TimerWaitBlock->Object = (PVOID)Timer;
267 TimerWaitBlock->Thread = Thread;
268 TimerWaitBlock->WaitKey = STATUS_TIMEOUT;
269 TimerWaitBlock->WaitType = WaitAny;
270 TimerWaitBlock->NextWaitBlock = TimerWaitBlock;
271
272 /* Link the timer to this Wait Block */
273 InitializeListHead(&Timer->Header.WaitListHead);
274 InsertTailList(&Timer->Header.WaitListHead, &TimerWaitBlock->WaitListEntry);
275
276 /* Create Timer */
277 DPRINT("Creating Timer with timeout %I64d\n", *Timeout);
278 KiInsertTimer(Timer, *Timeout);
279 }
280
281 /* Insert the wait block into the Queues's wait list */
282 WaitBlock = Thread->WaitBlockList;
283 InsertTailList(&Queue->Header.WaitListHead, &WaitBlock->WaitListEntry);
284
285 /* Block the Thread */
286 DPRINT("Blocking the Thread: %x %x!\n", KeGetCurrentThread(), Thread);
287 KiBlockThread(&Status,
288 FALSE,
289 WaitMode,
290 WrQueue);
291
292 /* Reset the wait reason */
293 Thread->WaitReason = 0;
294
295 /* Check if we were executing an APC */
296 if (Status != STATUS_KERNEL_APC) {
297
298 /* Done Waiting */
299 DPRINT("Done waking queue. Thread: %x %x!\n", KeGetCurrentThread(), Thread);
300 return (PLIST_ENTRY)Status;
301 }
302
303 /* Acquire again the lock */
304 DPRINT("Looping again\n");
305 OldIrql = KeAcquireDispatcherDatabaseLock();
306
307 /* Save the new IRQL and decrease number of waiting threads */
308 Thread->WaitIrql = OldIrql;
309 Queue->CurrentCount--;
310 }
311 }
312
313 /* Unlock Database and return */
314 KeReleaseDispatcherDatabaseLock(Thread->WaitIrql);
315 DPRINT("Returning. CurrentCount: %d, Maximum Count: %d\n",
316 Queue->CurrentCount, Queue->MaximumCount);
317 return ListEntry;
318 }
319
320 /*
321 * @implemented
322 */
323 PLIST_ENTRY
324 STDCALL
325 KeRundownQueue(IN PKQUEUE Queue)
326 {
327 PLIST_ENTRY EnumEntry;
328 PLIST_ENTRY FirstEntry = NULL;
329 PKTHREAD Thread;
330 KIRQL OldIrql;
331
332 DPRINT("KeRundownQueue(Queue %x)\n", Queue);
333
334 /* Get the Dispatcher Lock */
335 OldIrql = KeAcquireDispatcherDatabaseLock();
336
337 /* Make sure the list is not empty */
338 if (!IsListEmpty(&Queue->EntryListHead))
339 {
340 /* Remove it */
341 FirstEntry = RemoveHeadList(&Queue->EntryListHead);
342 }
343
344 /* Unlink threads and clear their Thread->Queue */
345 while (!IsListEmpty(&Queue->ThreadListHead)) {
346
347 /* Get the Entry and Remove it */
348 EnumEntry = RemoveHeadList(&Queue->ThreadListHead);
349
350 /* Get the Entry's Thread */
351 Thread = CONTAINING_RECORD(EnumEntry, KTHREAD, QueueListEntry);
352
353 /* Kill its Queue */
354 Thread->Queue = NULL;
355 }
356
357 /* Release the lock and return */
358 KeReleaseDispatcherDatabaseLock(OldIrql);
359 return FirstEntry;
360 }
361
362 /*
363 * Called when a thread which has a queue entry is entering a wait state
364 */
365 VOID
366 FASTCALL
367 KiWakeQueue(IN PKQUEUE Queue)
368 {
369 PLIST_ENTRY QueueEntry;
370 PLIST_ENTRY WaitEntry;
371 PKWAIT_BLOCK WaitBlock;
372
373 /* Decrement the number of active threads */
374 DPRINT("KiWakeQueue: %x. Thread: %x\n", Queue, KeGetCurrentThread());
375 Queue->CurrentCount--;
376
377 /* Make sure the counts are OK */
378 if (Queue->CurrentCount < Queue->MaximumCount) {
379
380 /* Get the Queue Entry */
381 QueueEntry = Queue->EntryListHead.Flink;
382
383 /* Get the Wait Entry */
384 WaitEntry = Queue->Header.WaitListHead.Blink;
385 DPRINT("Queue Count is ok, Queue entries: %x, %x\n", QueueEntry, WaitEntry);
386
387 /* Make sure that the Queue List isn't empty and that this entry is valid */
388 if (!IsListEmpty(&Queue->Header.WaitListHead) &&
389 (QueueEntry != &Queue->EntryListHead)) {
390
391 /* Remove this entry */
392 DPRINT("Queue in List, removing it\n");
393 RemoveEntryList(QueueEntry);
394 QueueEntry->Flink = NULL;
395
396 /* Decrease the Signal State */
397 Queue->Header.SignalState--;
398
399 /* Unwait the Thread */
400 DPRINT("Unwaiting Thread\n");
401 WaitBlock = CONTAINING_RECORD(WaitEntry, KWAIT_BLOCK, WaitListEntry);
402 KiAbortWaitThread(WaitBlock->Thread, (NTSTATUS)QueueEntry, IO_NO_INCREMENT);
403 }
404 }
405 }
406
407 /*
408 * Returns the previous number of entries in the queue
409 */
410 LONG
411 STDCALL
412 KiInsertQueue(IN PKQUEUE Queue,
413 IN PLIST_ENTRY Entry,
414 BOOLEAN Head)
415 {
416 ULONG InitialState;
417 PKTHREAD Thread = KeGetCurrentThread();
418 PKWAIT_BLOCK WaitBlock;
419 PLIST_ENTRY WaitEntry;
420
421 DPRINT("KiInsertQueue(Queue %x, Entry %x)\n", Queue, Entry);
422
423 /* Save the old state */
424 InitialState = Queue->Header.SignalState;
425
426 /* Get the Entry */
427 WaitEntry = Queue->Header.WaitListHead.Blink;
428 DPRINT("Initial State, WaitEntry: %d, %x\n", InitialState, WaitEntry);
429
430 /*
431 * Why the KeGetCurrentThread()->Queue != Queue?
432 * KiInsertQueue might be called from an APC for the current thread.
433 * -Gunnar
434 */
435 if ((Queue->CurrentCount < Queue->MaximumCount) &&
436 (WaitEntry != &Queue->Header.WaitListHead) &&
437 ((Thread->Queue != Queue) || (Thread->WaitReason != WrQueue))) {
438
439 /* Remove the wait entry */
440 DPRINT("Removing Entry\n");
441 RemoveEntryList(WaitEntry);
442
443 /* Get the Wait Block and Thread */
444 WaitBlock = CONTAINING_RECORD(WaitEntry, KWAIT_BLOCK, WaitListEntry);
445 DPRINT("Got wait block: %x\n", WaitBlock);
446 Thread = WaitBlock->Thread;
447
448 /* Reset the wait reason */
449 Thread->WaitReason = 0;
450
451 /* Increase the waiting threads */
452 Queue->CurrentCount++;
453
454 /* Check if there's a Thread Timer */
455 if (Thread->Timer.Header.Inserted) {
456
457 /* Cancel the Thread Timer with the no-lock fastpath */
458 DPRINT("Removing the Thread's Timer\n");
459 Thread->Timer.Header.Inserted = FALSE;
460 RemoveEntryList(&Thread->Timer.TimerListEntry);
461 }
462
463 /* Reschedule the Thread */
464 DPRINT("Unblocking the Thread\n");
465 KiUnblockThread(Thread, (PNTSTATUS)&Entry, 0);
466
467 } else {
468
469 /* Increase the Entries */
470 DPRINT("Adding new Queue Entry: %d %d\n", Head, Queue->Header.SignalState);
471 Queue->Header.SignalState++;
472
473 if (Head) {
474
475 InsertHeadList(&Queue->EntryListHead, Entry);
476
477 } else {
478
479 InsertTailList(&Queue->EntryListHead, Entry);
480 }
481 }
482
483 /* Return the previous state */
484 DPRINT("Returning\n");
485 return InitialState;
486 }
487
488 /* EOF */