Fix stack corruption. Thanks to Waxdragon and the fact he uses an -O2 build (hint...
[reactos.git] / reactos / ntoskrnl / ex / work.c
1 /*
2 * COPYRIGHT: See COPYING in the top level directory
3 * PROJECT: ReactOS Kernel
4 * FILE: ntoskrnl/ex/work.c
5 * PURPOSE: Manage system work queues and worker threads
6 * PROGRAMMER: Alex Ionescu (alex@relsoft.net)
7 */
8
9 /* INCLUDES ******************************************************************/
10
11 #include <ntoskrnl.h>
12 #define NDEBUG
13 #include <internal/debug.h>
14
15 #if defined (ALLOC_PRAGMA)
16 #pragma alloc_text(INIT, ExpInitializeWorkerThreads)
17 #endif
18
19 /* DATA **********************************************************************/
20
21 /* Number of worker threads for each Queue */
22 #define EX_HYPERCRITICAL_WORK_THREADS 1
23 #define EX_DELAYED_WORK_THREADS 3
24 #define EX_CRITICAL_WORK_THREADS 5
25
26 /* Magic flag for dynamic worker threads */
27 #define EX_DYNAMIC_WORK_THREAD 0x80000000
28
29 /* Worker thread priority increments (added to base priority) */
30 #define EX_HYPERCRITICAL_QUEUE_PRIORITY_INCREMENT 7
31 #define EX_CRITICAL_QUEUE_PRIORITY_INCREMENT 5
32 #define EX_DELAYED_QUEUE_PRIORITY_INCREMENT 4
33
34 /* The actual worker queue array */
35 EX_WORK_QUEUE ExWorkerQueue[MaximumWorkQueue];
36
37 /* Accounting of the total threads and registry hacked threads */
38 ULONG ExpCriticalWorkerThreads;
39 ULONG ExpDelayedWorkerThreads;
40 ULONG ExpAdditionalCriticalWorkerThreads;
41 ULONG ExpAdditionalDelayedWorkerThreads;
42
43 /* Future support for stack swapping worker threads */
44 BOOLEAN ExpWorkersCanSwap;
45 LIST_ENTRY ExpWorkerListHead;
46 KMUTANT ExpWorkerSwapinMutex;
47
48 /* The worker balance set manager events */
49 KEVENT ExpThreadSetManagerEvent;
50 KEVENT ExpThreadSetManagerShutdownEvent;
51
52 /* Thread pointers for future worker thread shutdown support */
53 PETHREAD ExpWorkerThreadBalanceManagerPtr;
54 PETHREAD ExpLastWorkerThread;
55
56 /* PRIVATE FUNCTIONS *********************************************************/
57
58 /*++
59 * @name ExpWorkerThreadEntryPoint
60 *
61 * The ExpWorkerThreadEntryPoint routine is the entrypoint for any new
62 * worker thread created by teh system.
63 *
64 * @param Context
65 * Contains the work queue type masked with a flag specifing whether the
66 * thread is dynamic or not.
67 *
68 * @return None.
69 *
70 * @remarks A dynamic thread can timeout after 10 minutes of waiting on a queue
71 * while a static thread will never timeout.
72 *
73 * Worker threads must return at IRQL == PASSIVE_LEVEL, must not have
74 * active impersonation info, and must not have disabled APCs.
75 *
76 * NB: We will re-enable APCs for broken threads but all other cases
77 * will generate a bugcheck.
78 *
79 *--*/
80 VOID
81 NTAPI
82 ExpWorkerThreadEntryPoint(IN PVOID Context)
83 {
84 PWORK_QUEUE_ITEM WorkItem;
85 PLIST_ENTRY QueueEntry;
86 WORK_QUEUE_TYPE WorkQueueType;
87 PEX_WORK_QUEUE WorkQueue;
88 LARGE_INTEGER Timeout;
89 PLARGE_INTEGER TimeoutPointer = NULL;
90 PETHREAD Thread = PsGetCurrentThread();
91 KPROCESSOR_MODE WaitMode;
92 EX_QUEUE_WORKER_INFO OldValue, NewValue;
93
94 /* Check if this is a dyamic thread */
95 if ((ULONG_PTR)Context & EX_DYNAMIC_WORK_THREAD)
96 {
97 /* It is, which means we will eventually time out after 10 minutes */
98 Timeout.QuadPart = Int32x32To64(10, -10000000 * 60);
99 TimeoutPointer = &Timeout;
100 }
101
102 /* Get Queue Type and Worker Queue */
103 WorkQueueType = (WORK_QUEUE_TYPE)((ULONG_PTR)Context &
104 ~EX_DYNAMIC_WORK_THREAD);
105 WorkQueue = &ExWorkerQueue[WorkQueueType];
106
107 /* Select the wait mode */
108 WaitMode = (UCHAR)WorkQueue->Info.WaitMode;
109
110 /* Nobody should have initialized this yet, do it now */
111 ASSERT(Thread->ExWorkerCanWaitUser == 0);
112 if (WaitMode == UserMode) Thread->ExWorkerCanWaitUser = TRUE;
113
114 /* If we shouldn't swap, disable that feature */
115 if (!ExpWorkersCanSwap) KeSetKernelStackSwapEnable(FALSE);
116
117 /* Set the worker flags */
118 do
119 {
120 /* Check if the queue is being disabled */
121 if (WorkQueue->Info.QueueDisabled)
122 {
123 /* Re-enable stack swapping and kill us */
124 KeSetKernelStackSwapEnable(TRUE);
125 PsTerminateSystemThread(STATUS_SYSTEM_SHUTDOWN);
126 }
127
128 /* Increase the worker count */
129 OldValue = WorkQueue->Info;
130 NewValue = OldValue;
131 NewValue.WorkerCount++;
132 }
133 while (InterlockedCompareExchange((PLONG)&WorkQueue->Info,
134 *(PLONG)&NewValue,
135 *(PLONG)&OldValue) != *(PLONG)&OldValue);
136
137 /* Success, you are now officially a worker thread! */
138 Thread->ActiveExWorker = TRUE;
139
140 /* Loop forever */
141 ProcessLoop:
142 for (;;)
143 {
144 /* Wait for Something to Happen on the Queue */
145 QueueEntry = KeRemoveQueue(&WorkQueue->WorkerQueue,
146 WaitMode,
147 TimeoutPointer);
148
149 /* Check if we timed out and quit this loop in that case */
150 if ((NTSTATUS)QueueEntry == STATUS_TIMEOUT) break;
151
152 /* Increment Processed Work Items */
153 InterlockedIncrement((PLONG)&WorkQueue->WorkItemsProcessed);
154
155 /* Get the Work Item */
156 WorkItem = CONTAINING_RECORD(QueueEntry, WORK_QUEUE_ITEM, List);
157
158 /* Make sure nobody is trying to play smart with us */
159 ASSERT((ULONG_PTR)WorkItem->WorkerRoutine > MmUserProbeAddress);
160
161 /* Call the Worker Routine */
162 WorkItem->WorkerRoutine(WorkItem->Parameter);
163
164 /* Make sure APCs are not disabled */
165 if (Thread->Tcb.SpecialApcDisable)
166 {
167 /* We're nice and do it behind your back */
168 DPRINT1("Warning: Broken Worker Thread: %p %lx %p came back "
169 "with APCs disabled!\n",
170 WorkItem->WorkerRoutine,
171 WorkItem->Parameter,
172 WorkItem);
173 Thread->Tcb.SpecialApcDisable = 0;
174 }
175
176 /* Make sure it returned at right IRQL */
177 if (KeGetCurrentIrql() != PASSIVE_LEVEL)
178 {
179 /* It didn't, bugcheck! */
180 KEBUGCHECKEX(WORKER_THREAD_RETURNED_AT_BAD_IRQL,
181 (ULONG_PTR)WorkItem->WorkerRoutine,
182 KeGetCurrentIrql(),
183 (ULONG_PTR)WorkItem->Parameter,
184 (ULONG_PTR)WorkItem);
185 }
186
187 /* Make sure it returned with Impersionation Disabled */
188 if (Thread->ActiveImpersonationInfo)
189 {
190 /* It didn't, bugcheck! */
191 KEBUGCHECKEX(IMPERSONATING_WORKER_THREAD,
192 (ULONG_PTR)WorkItem->WorkerRoutine,
193 (ULONG_PTR)WorkItem->Parameter,
194 (ULONG_PTR)WorkItem,
195 0);
196 }
197 }
198
199 /* This is a dynamic thread. Terminate it unless IRPs are pending */
200 if (!IsListEmpty(&Thread->IrpList)) goto ProcessLoop;
201
202 /* Don't terminate it if the queue is disabled either */
203 if (WorkQueue->Info.QueueDisabled) goto ProcessLoop;
204
205 /* Set the worker flags */
206 do
207 {
208 /* Decrease the worker count */
209 OldValue = WorkQueue->Info;
210 NewValue = OldValue;
211 NewValue.WorkerCount--;
212 }
213 while (InterlockedCompareExchange((PLONG)&WorkQueue->Info,
214 *(PLONG)&NewValue,
215 *(PLONG)&OldValue) != *(PLONG)&OldValue);
216
217 /* Decrement dynamic thread count */
218 InterlockedDecrement(&WorkQueue->DynamicThreadCount);
219
220 /* We're not a worker thread anymore */
221 Thread->ActiveExWorker = FALSE;
222
223 /* Re-enable the stack swap */
224 KeSetKernelStackSwapEnable(TRUE);
225 return;
226 }
227
228 /*++
229 * @name ExpCreateWorkerThread
230 *
231 * The ExpCreateWorkerThread routine creates a new worker thread for the
232 * specified queue.
233 **
234 * @param QueueType
235 * Type of the queue to use for this thread. Valid values are:
236 * - DelayedWorkQueue
237 * - CriticalWorkQueue
238 * - HyperCriticalWorkQueue
239 *
240 * @param Dynamic
241 * Specifies whether or not this thread is a dynamic thread.
242 *
243 * @return None.
244 *
245 * @remarks HyperCritical work threads run at priority 7; Critical work threads
246 * run at priority 5, and delayed work threads run at priority 4.
247 *
248 * This, worker threads cannot pre-empty a normal user-mode thread.
249 *
250 *--*/
251 VOID
252 NTAPI
253 ExpCreateWorkerThread(WORK_QUEUE_TYPE WorkQueueType,
254 IN BOOLEAN Dynamic)
255 {
256 PETHREAD Thread;
257 HANDLE hThread;
258 ULONG Context;
259 KPRIORITY Priority;
260
261 /* Check if this is going to be a dynamic thread */
262 Context = WorkQueueType;
263
264 /* Add the dynamic mask */
265 if (Dynamic) Context |= EX_DYNAMIC_WORK_THREAD;
266
267 /* Create the System Thread */
268 PsCreateSystemThread(&hThread,
269 THREAD_ALL_ACCESS,
270 NULL,
271 NULL,
272 NULL,
273 ExpWorkerThreadEntryPoint,
274 (PVOID)Context);
275
276 /* If the thread is dynamic */
277 if (Dynamic)
278 {
279 /* Increase the count */
280 InterlockedIncrement(&ExWorkerQueue[WorkQueueType].DynamicThreadCount);
281 }
282
283 /* Set the priority */
284 if (WorkQueueType == DelayedWorkQueue)
285 {
286 /* Priority == 4 */
287 Priority = EX_DELAYED_QUEUE_PRIORITY_INCREMENT;
288 }
289 else if (WorkQueueType == CriticalWorkQueue)
290 {
291 /* Priority == 5 */
292 Priority = EX_CRITICAL_QUEUE_PRIORITY_INCREMENT;
293 }
294 else
295 {
296 /* Priority == 7 */
297 Priority = EX_HYPERCRITICAL_QUEUE_PRIORITY_INCREMENT;
298 }
299
300 /* Get the Thread */
301 ObReferenceObjectByHandle(hThread,
302 THREAD_SET_INFORMATION,
303 PsThreadType,
304 KernelMode,
305 (PVOID*)&Thread,
306 NULL);
307
308 /* Set the Priority */
309 KeSetBasePriorityThread(&Thread->Tcb, Priority);
310
311 /* Dereference and close handle */
312 ObDereferenceObject(Thread);
313 ZwClose(hThread);
314 }
315
316 /*++
317 * @name ExpCheckDynamicThreadCount
318 *
319 * The ExpCheckDynamicThreadCount routine checks every queue and creates a
320 * dynamic thread if the queue seems to be deadlocked.
321 *
322 * @param None
323 *
324 * @return None.
325 *
326 * @remarks The algorithm for deciding if a new thread must be created is
327 * based on wether the queue has processed no new items in the last
328 * second, and new items are still enqueued.
329 *
330 *--*/
331 VOID
332 NTAPI
333 ExpDetectWorkerThreadDeadlock(VOID)
334 {
335 ULONG i;
336 PEX_WORK_QUEUE Queue;
337
338 /* Loop the 3 queues */
339 for (i = 0; i < MaximumWorkQueue; i++)
340 {
341 /* Get the queue */
342 Queue = &ExWorkerQueue[i];
343 ASSERT(Queue->DynamicThreadCount <= 16);
344
345 /* Check if stuff is on the queue that still is unprocessed */
346 if ((Queue->QueueDepthLastPass) &&
347 (Queue->WorkItemsProcessed == Queue->WorkItemsProcessedLastPass) &&
348 (Queue->DynamicThreadCount < 16))
349 {
350 /* Stuff is still on the queue and nobody did anything about it */
351 DPRINT1("EX: Work Queue Deadlock detected: %d\n", i);
352 ExpCreateWorkerThread(i, TRUE);
353 DPRINT1("Dynamic threads queued %d\n", Queue->DynamicThreadCount);
354 }
355
356 /* Update our data */
357 Queue->WorkItemsProcessedLastPass = Queue->WorkItemsProcessed;
358 Queue->QueueDepthLastPass = KeReadStateQueue(&Queue->WorkerQueue);
359 }
360 }
361
362 /*++
363 * @name ExpCheckDynamicThreadCount
364 *
365 * The ExpCheckDynamicThreadCount routine checks every queue and creates a
366 * dynamic thread if the queue requires one.
367 *
368 * @param None
369 *
370 * @return None.
371 *
372 * @remarks The algorithm for deciding if a new thread must be created is
373 * documented in the ExQueueWorkItem routine.
374 *
375 *--*/
376 VOID
377 NTAPI
378 ExpCheckDynamicThreadCount(VOID)
379 {
380 ULONG i;
381 PEX_WORK_QUEUE Queue;
382
383 /* Loop the 3 queues */
384 for (i = 0; i < MaximumWorkQueue; i++)
385 {
386 /* Get the queue */
387 Queue = &ExWorkerQueue[i];
388
389 /* Check if still need a new thread. See ExQueueWorkItem */
390 if ((Queue->Info.MakeThreadsAsNecessary) &&
391 (!IsListEmpty(&Queue->WorkerQueue.EntryListHead)) &&
392 (Queue->WorkerQueue.CurrentCount <
393 Queue->WorkerQueue.MaximumCount) &&
394 (Queue->DynamicThreadCount < 16))
395 {
396 /* Create a new thread */
397 DPRINT1("EX: Creating new dynamic thread as requested\n");
398 ExpCreateWorkerThread(i, TRUE);
399 }
400 }
401 }
402
403 /*++
404 * @name ExpWorkerThreadBalanceManager
405 *
406 * The ExpWorkerThreadBalanceManager routine is the entrypoint for the
407 * worker thread balance set manager.
408 *
409 * @param Context
410 * Unused.
411 *
412 * @return None.
413 *
414 * @remarks The worker thread balance set manager listens every second, but can
415 * also be woken up by an event when a new thread is needed, or by the
416 * special shutdown event. This thread runs at priority 7.
417 *
418 * This routine must run at IRQL == PASSIVE_LEVEL.
419 *
420 *--*/
421 VOID
422 NTAPI
423 ExpWorkerThreadBalanceManager(IN PVOID Context)
424 {
425 KTIMER Timer;
426 LARGE_INTEGER Timeout;
427 NTSTATUS Status;
428 PVOID WaitEvents[3];
429 PAGED_CODE();
430 UNREFERENCED_PARAMETER(Context);
431
432 /* Raise our priority above all other worker threads */
433 KeSetBasePriorityThread(KeGetCurrentThread(),
434 EX_CRITICAL_QUEUE_PRIORITY_INCREMENT + 1);
435
436 /* Setup the timer */
437 KeInitializeTimer(&Timer);
438 Timeout.QuadPart = Int32x32To64(-1, 10000000);
439
440 /* We'll wait on the periodic timer and also the emergency event */
441 WaitEvents[0] = &Timer;
442 WaitEvents[1] = &ExpThreadSetManagerEvent;
443 WaitEvents[2] = &ExpThreadSetManagerShutdownEvent;
444
445 /* Start wait loop */
446 for (;;)
447 {
448 /* Wait for the timer */
449 KeSetTimer(&Timer, Timeout, NULL);
450 Status = KeWaitForMultipleObjects(3,
451 WaitEvents,
452 WaitAny,
453 Executive,
454 KernelMode,
455 FALSE,
456 NULL,
457 NULL);
458 if (Status == 0)
459 {
460 /* Our timer expired. Check for deadlocks */
461 ExpDetectWorkerThreadDeadlock();
462 }
463 else if (Status == 1)
464 {
465 /* Someone notified us, verify if we should create a new thread */
466 ExpCheckDynamicThreadCount();
467 }
468 else if (Status == 2)
469 {
470 /* We are shutting down. Cancel the timer */
471 DPRINT1("System shutdown\n");
472 KeCancelTimer(&Timer);
473
474 /* Make sure we have a final thread */
475 ASSERT(ExpLastWorkerThread);
476
477 /* Wait for it */
478 KeWaitForSingleObject(ExpLastWorkerThread,
479 Executive,
480 KernelMode,
481 FALSE,
482 NULL);
483
484 /* Dereference it and kill us */
485 ObDereferenceObject(ExpLastWorkerThread);
486 PsTerminateSystemThread(STATUS_SYSTEM_SHUTDOWN);
487 }
488 }
489 }
490
491 /*++
492 * @name ExpInitializeWorkerThreads
493 *
494 * The ExpInitializeWorkerThreads routine initializes worker thread and
495 * work queue support.
496 *
497 * @param None.
498 *
499 * @return None.
500 *
501 * @remarks This routine is only called once during system initialization.
502 *
503 *--*/
504 VOID
505 INIT_FUNCTION
506 NTAPI
507 ExpInitializeWorkerThreads(VOID)
508 {
509 ULONG WorkQueueType;
510 ULONG CriticalThreads, DelayedThreads;
511 HANDLE ThreadHandle;
512 PETHREAD Thread;
513 ULONG i;
514
515 /* Setup the stack swap support */
516 KeInitializeMutex(&ExpWorkerSwapinMutex, FALSE);
517 InitializeListHead(&ExpWorkerListHead);
518 ExpWorkersCanSwap = TRUE;
519
520 /* Set the number of critical and delayed threads. We shouldn't hardcode */
521 DelayedThreads = EX_DELAYED_WORK_THREADS;
522 CriticalThreads = EX_CRITICAL_WORK_THREADS;
523
524 /* Protect against greedy registry modifications */
525 ExpAdditionalDelayedWorkerThreads =
526 min(ExpAdditionalDelayedWorkerThreads, 16);
527 ExpAdditionalCriticalWorkerThreads =
528 min(ExpAdditionalCriticalWorkerThreads, 16);
529
530 /* Calculate final count */
531 DelayedThreads += ExpAdditionalDelayedWorkerThreads;
532 CriticalThreads += ExpAdditionalCriticalWorkerThreads;
533
534 /* Initialize the Array */
535 for (WorkQueueType = 0; WorkQueueType < MaximumWorkQueue; WorkQueueType++)
536 {
537 /* Clear the structure and initialize the queue */
538 RtlZeroMemory(&ExWorkerQueue[WorkQueueType], sizeof(EX_WORK_QUEUE));
539 KeInitializeQueue(&ExWorkerQueue[WorkQueueType].WorkerQueue, 0);
540 }
541
542 /* Dynamic threads are only used for the critical queue */
543 ExWorkerQueue[CriticalWorkQueue].Info.MakeThreadsAsNecessary = TRUE;
544
545 /* Initialize the balance set manager events */
546 KeInitializeEvent(&ExpThreadSetManagerEvent, SynchronizationEvent, FALSE);
547 KeInitializeEvent(&ExpThreadSetManagerShutdownEvent,
548 NotificationEvent,
549 FALSE);
550
551 /* Create the built-in worker threads for the critical queue */
552 for (i = 0; i < CriticalThreads; i++)
553 {
554 /* Create the thread */
555 ExpCreateWorkerThread(CriticalWorkQueue, FALSE);
556 ExpCriticalWorkerThreads++;
557 }
558
559 /* Create the built-in worker threads for the delayed queue */
560 for (i = 0; i < DelayedThreads; i++)
561 {
562 /* Create the thread */
563 ExpCreateWorkerThread(DelayedWorkQueue, FALSE);
564 ExpDelayedWorkerThreads++;
565 }
566
567 /* Create the built-in worker thread for the hypercritical queue */
568 ExpCreateWorkerThread(HyperCriticalWorkQueue, FALSE);
569
570 /* Create the balance set manager thread */
571 PsCreateSystemThread(&ThreadHandle,
572 THREAD_ALL_ACCESS,
573 NULL,
574 0,
575 NULL,
576 ExpWorkerThreadBalanceManager,
577 NULL);
578
579 /* Get a pointer to it for the shutdown process */
580 ObReferenceObjectByHandle(ThreadHandle,
581 THREAD_ALL_ACCESS,
582 NULL,
583 KernelMode,
584 (PVOID*)&Thread,
585 NULL);
586 ExpWorkerThreadBalanceManagerPtr = Thread;
587
588 /* Close the handle and return */
589 ZwClose(ThreadHandle);
590 }
591
592 /* PUBLIC FUNCTIONS **********************************************************/
593
594 /*++
595 * @name ExQueueWorkItem
596 * @implemented NT4
597 *
598 * The ExQueueWorkItem routine acquires rundown protection for
599 * the specified descriptor.
600 *
601 * @param WorkItem
602 * Pointer to an initialized Work Queue Item structure. This structure
603 * must be located in nonpaged pool memory.
604 *
605 * @param QueueType
606 * Type of the queue to use for this item. Can be one of the following:
607 * - DelayedWorkQueue
608 * - CriticalWorkQueue
609 * - HyperCriticalWorkQueue
610 *
611 * @return None.
612 *
613 * @remarks This routine is obsolete. Use IoQueueWorkItem instead.
614 *
615 * Callers of this routine must be running at IRQL <= DISPATCH_LEVEL.
616 *
617 *--*/
618 VOID
619 NTAPI
620 ExQueueWorkItem(PWORK_QUEUE_ITEM WorkItem,
621 WORK_QUEUE_TYPE QueueType)
622 {
623 PEX_WORK_QUEUE WorkQueue = &ExWorkerQueue[QueueType];
624 ASSERT(QueueType < MaximumWorkQueue);
625 ASSERT(WorkItem->List.Flink == NULL);
626
627 /* Don't try to trick us */
628 if ((ULONG_PTR)WorkItem->WorkerRoutine < MmUserProbeAddress)
629 {
630 /* Bugcheck the system */
631 KEBUGCHECKEX(WORKER_INVALID,
632 1,
633 (ULONG_PTR)WorkItem,
634 (ULONG_PTR)WorkItem->WorkerRoutine,
635 0);
636 }
637
638 /* Insert the Queue */
639 KeInsertQueue(&WorkQueue->WorkerQueue, &WorkItem->List);
640 ASSERT(!WorkQueue->Info.QueueDisabled);
641
642 /*
643 * Check if we need a new thread. Our decision is as follows:
644 * - This queue type must support Dynamic Threads (duh!)
645 * - It actually has to have unprocessed items
646 * - We have CPUs which could be handling another thread
647 * - We haven't abused our usage of dynamic threads.
648 */
649 if ((WorkQueue->Info.MakeThreadsAsNecessary) &&
650 (!IsListEmpty(&WorkQueue->WorkerQueue.EntryListHead)) &&
651 (WorkQueue->WorkerQueue.CurrentCount <
652 WorkQueue->WorkerQueue.MaximumCount) &&
653 (WorkQueue->DynamicThreadCount < 16))
654 {
655 /* Let the balance manager know about it */
656 DPRINT1("Requesting a new thread. CurrentCount: %d. MaxCount: %d\n",
657 WorkQueue->WorkerQueue.CurrentCount,
658 WorkQueue->WorkerQueue.MaximumCount);
659 KeSetEvent(&ExpThreadSetManagerEvent, 0, FALSE);
660 }
661 }
662
663 /* EOF */
664