2 * COPYRIGHT: See COPYING in the top level directory
3 * PROJECT: ReactOS Kernel
4 * FILE: ntoskrnl/ex/work.c
5 * PURPOSE: Manage system work queues and worker threads
6 * PROGRAMMER: Alex Ionescu (alex@relsoft.net)
9 /* INCLUDES ******************************************************************/
13 #include <internal/debug.h>
15 #if defined (ALLOC_PRAGMA)
16 #pragma alloc_text(INIT, ExpInitializeWorkerThreads)
19 /* DATA **********************************************************************/
21 /* Number of worker threads for each Queue */
22 #define EX_HYPERCRITICAL_WORK_THREADS 1
23 #define EX_DELAYED_WORK_THREADS 3
24 #define EX_CRITICAL_WORK_THREADS 5
26 /* Magic flag for dynamic worker threads */
27 #define EX_DYNAMIC_WORK_THREAD 0x80000000
29 /* Worker thread priority increments (added to base priority) */
30 #define EX_HYPERCRITICAL_QUEUE_PRIORITY_INCREMENT 7
31 #define EX_CRITICAL_QUEUE_PRIORITY_INCREMENT 5
32 #define EX_DELAYED_QUEUE_PRIORITY_INCREMENT 4
34 /* The actual worker queue array */
35 EX_WORK_QUEUE ExWorkerQueue
[MaximumWorkQueue
];
37 /* Accounting of the total threads and registry hacked threads */
38 ULONG ExpCriticalWorkerThreads
;
39 ULONG ExpDelayedWorkerThreads
;
40 ULONG ExpAdditionalCriticalWorkerThreads
;
41 ULONG ExpAdditionalDelayedWorkerThreads
;
43 /* Future support for stack swapping worker threads */
44 BOOLEAN ExpWorkersCanSwap
;
45 LIST_ENTRY ExpWorkerListHead
;
46 KMUTANT ExpWorkerSwapinMutex
;
48 /* The worker balance set manager events */
49 KEVENT ExpThreadSetManagerEvent
;
50 KEVENT ExpThreadSetManagerShutdownEvent
;
52 /* Thread pointers for future worker thread shutdown support */
53 PETHREAD ExpWorkerThreadBalanceManagerPtr
;
54 PETHREAD ExpLastWorkerThread
;
56 /* PRIVATE FUNCTIONS *********************************************************/
59 * @name ExpWorkerThreadEntryPoint
61 * The ExpWorkerThreadEntryPoint routine is the entrypoint for any new
62 * worker thread created by teh system.
65 * Contains the work queue type masked with a flag specifing whether the
66 * thread is dynamic or not.
70 * @remarks A dynamic thread can timeout after 10 minutes of waiting on a queue
71 * while a static thread will never timeout.
73 * Worker threads must return at IRQL == PASSIVE_LEVEL, must not have
74 * active impersonation info, and must not have disabled APCs.
76 * NB: We will re-enable APCs for broken threads but all other cases
77 * will generate a bugcheck.
82 ExpWorkerThreadEntryPoint(IN PVOID Context
)
84 PWORK_QUEUE_ITEM WorkItem
;
85 PLIST_ENTRY QueueEntry
;
86 WORK_QUEUE_TYPE WorkQueueType
;
87 PEX_WORK_QUEUE WorkQueue
;
88 LARGE_INTEGER Timeout
;
89 PLARGE_INTEGER TimeoutPointer
= NULL
;
90 PETHREAD Thread
= PsGetCurrentThread();
91 KPROCESSOR_MODE WaitMode
;
92 EX_QUEUE_WORKER_INFO OldValue
, NewValue
;
94 /* Check if this is a dyamic thread */
95 if ((ULONG_PTR
)Context
& EX_DYNAMIC_WORK_THREAD
)
97 /* It is, which means we will eventually time out after 10 minutes */
98 Timeout
.QuadPart
= Int32x32To64(10, -10000000 * 60);
99 TimeoutPointer
= &Timeout
;
102 /* Get Queue Type and Worker Queue */
103 WorkQueueType
= (WORK_QUEUE_TYPE
)((ULONG_PTR
)Context
&
104 ~EX_DYNAMIC_WORK_THREAD
);
105 WorkQueue
= &ExWorkerQueue
[WorkQueueType
];
107 /* Select the wait mode */
108 WaitMode
= (UCHAR
)WorkQueue
->Info
.WaitMode
;
110 /* Nobody should have initialized this yet, do it now */
111 ASSERT(Thread
->ExWorkerCanWaitUser
== 0);
112 if (WaitMode
== UserMode
) Thread
->ExWorkerCanWaitUser
= TRUE
;
114 /* If we shouldn't swap, disable that feature */
115 if (!ExpWorkersCanSwap
) KeSetKernelStackSwapEnable(FALSE
);
117 /* Set the worker flags */
120 /* Check if the queue is being disabled */
121 if (WorkQueue
->Info
.QueueDisabled
)
123 /* Re-enable stack swapping and kill us */
124 KeSetKernelStackSwapEnable(TRUE
);
125 PsTerminateSystemThread(STATUS_SYSTEM_SHUTDOWN
);
128 /* Increase the worker count */
129 OldValue
= WorkQueue
->Info
;
131 NewValue
.WorkerCount
++;
133 while (InterlockedCompareExchange((PLONG
)&WorkQueue
->Info
,
135 *(PLONG
)&OldValue
) != *(PLONG
)&OldValue
);
137 /* Success, you are now officially a worker thread! */
138 Thread
->ActiveExWorker
= TRUE
;
144 /* Wait for Something to Happen on the Queue */
145 QueueEntry
= KeRemoveQueue(&WorkQueue
->WorkerQueue
,
149 /* Check if we timed out and quit this loop in that case */
150 if ((NTSTATUS
)QueueEntry
== STATUS_TIMEOUT
) break;
152 /* Increment Processed Work Items */
153 InterlockedIncrement((PLONG
)&WorkQueue
->WorkItemsProcessed
);
155 /* Get the Work Item */
156 WorkItem
= CONTAINING_RECORD(QueueEntry
, WORK_QUEUE_ITEM
, List
);
158 /* Make sure nobody is trying to play smart with us */
159 ASSERT((ULONG_PTR
)WorkItem
->WorkerRoutine
> MmUserProbeAddress
);
161 /* Call the Worker Routine */
162 WorkItem
->WorkerRoutine(WorkItem
->Parameter
);
164 /* Make sure APCs are not disabled */
165 if (Thread
->Tcb
.SpecialApcDisable
)
167 /* We're nice and do it behind your back */
168 DPRINT1("Warning: Broken Worker Thread: %p %lx %p came back "
169 "with APCs disabled!\n",
170 WorkItem
->WorkerRoutine
,
173 Thread
->Tcb
.SpecialApcDisable
= 0;
176 /* Make sure it returned at right IRQL */
177 if (KeGetCurrentIrql() != PASSIVE_LEVEL
)
179 /* It didn't, bugcheck! */
180 KEBUGCHECKEX(WORKER_THREAD_RETURNED_AT_BAD_IRQL
,
181 (ULONG_PTR
)WorkItem
->WorkerRoutine
,
183 (ULONG_PTR
)WorkItem
->Parameter
,
184 (ULONG_PTR
)WorkItem
);
187 /* Make sure it returned with Impersionation Disabled */
188 if (Thread
->ActiveImpersonationInfo
)
190 /* It didn't, bugcheck! */
191 KEBUGCHECKEX(IMPERSONATING_WORKER_THREAD
,
192 (ULONG_PTR
)WorkItem
->WorkerRoutine
,
193 (ULONG_PTR
)WorkItem
->Parameter
,
199 /* This is a dynamic thread. Terminate it unless IRPs are pending */
200 if (!IsListEmpty(&Thread
->IrpList
)) goto ProcessLoop
;
202 /* Don't terminate it if the queue is disabled either */
203 if (WorkQueue
->Info
.QueueDisabled
) goto ProcessLoop
;
205 /* Set the worker flags */
208 /* Decrease the worker count */
209 OldValue
= WorkQueue
->Info
;
211 NewValue
.WorkerCount
--;
213 while (InterlockedCompareExchange((PLONG
)&WorkQueue
->Info
,
215 *(PLONG
)&OldValue
) != *(PLONG
)&OldValue
);
217 /* Decrement dynamic thread count */
218 InterlockedDecrement(&WorkQueue
->DynamicThreadCount
);
220 /* We're not a worker thread anymore */
221 Thread
->ActiveExWorker
= FALSE
;
223 /* Re-enable the stack swap */
224 KeSetKernelStackSwapEnable(TRUE
);
229 * @name ExpCreateWorkerThread
231 * The ExpCreateWorkerThread routine creates a new worker thread for the
235 * Type of the queue to use for this thread. Valid values are:
237 * - CriticalWorkQueue
238 * - HyperCriticalWorkQueue
241 * Specifies whether or not this thread is a dynamic thread.
245 * @remarks HyperCritical work threads run at priority 7; Critical work threads
246 * run at priority 5, and delayed work threads run at priority 4.
248 * This, worker threads cannot pre-empty a normal user-mode thread.
253 ExpCreateWorkerThread(WORK_QUEUE_TYPE WorkQueueType
,
261 /* Check if this is going to be a dynamic thread */
262 Context
= WorkQueueType
;
264 /* Add the dynamic mask */
265 if (Dynamic
) Context
|= EX_DYNAMIC_WORK_THREAD
;
267 /* Create the System Thread */
268 PsCreateSystemThread(&hThread
,
273 ExpWorkerThreadEntryPoint
,
276 /* If the thread is dynamic */
279 /* Increase the count */
280 InterlockedIncrement(&ExWorkerQueue
[WorkQueueType
].DynamicThreadCount
);
283 /* Set the priority */
284 if (WorkQueueType
== DelayedWorkQueue
)
287 Priority
= EX_DELAYED_QUEUE_PRIORITY_INCREMENT
;
289 else if (WorkQueueType
== CriticalWorkQueue
)
292 Priority
= EX_CRITICAL_QUEUE_PRIORITY_INCREMENT
;
297 Priority
= EX_HYPERCRITICAL_QUEUE_PRIORITY_INCREMENT
;
301 ObReferenceObjectByHandle(hThread
,
302 THREAD_SET_INFORMATION
,
308 /* Set the Priority */
309 KeSetBasePriorityThread(&Thread
->Tcb
, Priority
);
311 /* Dereference and close handle */
312 ObDereferenceObject(Thread
);
317 * @name ExpCheckDynamicThreadCount
319 * The ExpCheckDynamicThreadCount routine checks every queue and creates a
320 * dynamic thread if the queue seems to be deadlocked.
326 * @remarks The algorithm for deciding if a new thread must be created is
327 * based on wether the queue has processed no new items in the last
328 * second, and new items are still enqueued.
333 ExpDetectWorkerThreadDeadlock(VOID
)
336 PEX_WORK_QUEUE Queue
;
338 /* Loop the 3 queues */
339 for (i
= 0; i
< MaximumWorkQueue
; i
++)
342 Queue
= &ExWorkerQueue
[i
];
343 ASSERT(Queue
->DynamicThreadCount
<= 16);
345 /* Check if stuff is on the queue that still is unprocessed */
346 if ((Queue
->QueueDepthLastPass
) &&
347 (Queue
->WorkItemsProcessed
== Queue
->WorkItemsProcessedLastPass
) &&
348 (Queue
->DynamicThreadCount
< 16))
350 /* Stuff is still on the queue and nobody did anything about it */
351 DPRINT1("EX: Work Queue Deadlock detected: %d\n", i
);
352 ExpCreateWorkerThread(i
, TRUE
);
353 DPRINT1("Dynamic threads queued %d\n", Queue
->DynamicThreadCount
);
356 /* Update our data */
357 Queue
->WorkItemsProcessedLastPass
= Queue
->WorkItemsProcessed
;
358 Queue
->QueueDepthLastPass
= KeReadStateQueue(&Queue
->WorkerQueue
);
363 * @name ExpCheckDynamicThreadCount
365 * The ExpCheckDynamicThreadCount routine checks every queue and creates a
366 * dynamic thread if the queue requires one.
372 * @remarks The algorithm for deciding if a new thread must be created is
373 * documented in the ExQueueWorkItem routine.
378 ExpCheckDynamicThreadCount(VOID
)
381 PEX_WORK_QUEUE Queue
;
383 /* Loop the 3 queues */
384 for (i
= 0; i
< MaximumWorkQueue
; i
++)
387 Queue
= &ExWorkerQueue
[i
];
389 /* Check if still need a new thread. See ExQueueWorkItem */
390 if ((Queue
->Info
.MakeThreadsAsNecessary
) &&
391 (!IsListEmpty(&Queue
->WorkerQueue
.EntryListHead
)) &&
392 (Queue
->WorkerQueue
.CurrentCount
<
393 Queue
->WorkerQueue
.MaximumCount
) &&
394 (Queue
->DynamicThreadCount
< 16))
396 /* Create a new thread */
397 DPRINT1("EX: Creating new dynamic thread as requested\n");
398 ExpCreateWorkerThread(i
, TRUE
);
404 * @name ExpWorkerThreadBalanceManager
406 * The ExpWorkerThreadBalanceManager routine is the entrypoint for the
407 * worker thread balance set manager.
414 * @remarks The worker thread balance set manager listens every second, but can
415 * also be woken up by an event when a new thread is needed, or by the
416 * special shutdown event. This thread runs at priority 7.
418 * This routine must run at IRQL == PASSIVE_LEVEL.
423 ExpWorkerThreadBalanceManager(IN PVOID Context
)
426 LARGE_INTEGER Timeout
;
430 UNREFERENCED_PARAMETER(Context
);
432 /* Raise our priority above all other worker threads */
433 KeSetBasePriorityThread(KeGetCurrentThread(),
434 EX_CRITICAL_QUEUE_PRIORITY_INCREMENT
+ 1);
436 /* Setup the timer */
437 KeInitializeTimer(&Timer
);
438 Timeout
.QuadPart
= Int32x32To64(-1, 10000000);
440 /* We'll wait on the periodic timer and also the emergency event */
441 WaitEvents
[0] = &Timer
;
442 WaitEvents
[1] = &ExpThreadSetManagerEvent
;
443 WaitEvents
[2] = &ExpThreadSetManagerShutdownEvent
;
445 /* Start wait loop */
448 /* Wait for the timer */
449 KeSetTimer(&Timer
, Timeout
, NULL
);
450 Status
= KeWaitForMultipleObjects(3,
460 /* Our timer expired. Check for deadlocks */
461 ExpDetectWorkerThreadDeadlock();
463 else if (Status
== 1)
465 /* Someone notified us, verify if we should create a new thread */
466 ExpCheckDynamicThreadCount();
468 else if (Status
== 2)
470 /* We are shutting down. Cancel the timer */
471 DPRINT1("System shutdown\n");
472 KeCancelTimer(&Timer
);
474 /* Make sure we have a final thread */
475 ASSERT(ExpLastWorkerThread
);
478 KeWaitForSingleObject(ExpLastWorkerThread
,
484 /* Dereference it and kill us */
485 ObDereferenceObject(ExpLastWorkerThread
);
486 PsTerminateSystemThread(STATUS_SYSTEM_SHUTDOWN
);
492 * @name ExpInitializeWorkerThreads
494 * The ExpInitializeWorkerThreads routine initializes worker thread and
495 * work queue support.
501 * @remarks This routine is only called once during system initialization.
507 ExpInitializeWorkerThreads(VOID
)
510 ULONG CriticalThreads
, DelayedThreads
;
515 /* Setup the stack swap support */
516 KeInitializeMutex(&ExpWorkerSwapinMutex
, FALSE
);
517 InitializeListHead(&ExpWorkerListHead
);
518 ExpWorkersCanSwap
= TRUE
;
520 /* Set the number of critical and delayed threads. We shouldn't hardcode */
521 DelayedThreads
= EX_DELAYED_WORK_THREADS
;
522 CriticalThreads
= EX_CRITICAL_WORK_THREADS
;
524 /* Protect against greedy registry modifications */
525 ExpAdditionalDelayedWorkerThreads
=
526 min(ExpAdditionalCriticalWorkerThreads
, 16);
527 ExpAdditionalCriticalWorkerThreads
=
528 min(ExpAdditionalCriticalWorkerThreads
, 16);
530 /* Calculate final count */
531 DelayedThreads
+= ExpAdditionalDelayedWorkerThreads
;
532 CriticalThreads
+= ExpAdditionalCriticalWorkerThreads
;
534 /* Initialize the Array */
535 for (WorkQueueType
= 0; WorkQueueType
< MaximumWorkQueue
; WorkQueueType
++)
537 /* Clear the structure and initialize the queue */
538 RtlZeroMemory(&ExWorkerQueue
[WorkQueueType
], sizeof(EX_WORK_QUEUE
));
539 KeInitializeQueue(&ExWorkerQueue
[WorkQueueType
].WorkerQueue
, 0);
542 /* Dynamic threads are only used for the critical queue */
543 ExWorkerQueue
[CriticalWorkQueue
].Info
.MakeThreadsAsNecessary
= TRUE
;
545 /* Initialize the balance set manager events */
546 KeInitializeEvent(&ExpThreadSetManagerEvent
, SynchronizationEvent
, FALSE
);
547 KeInitializeEvent(&ExpThreadSetManagerShutdownEvent
,
551 /* Create the built-in worker threads for the critical queue */
552 for (i
= 0; i
< CriticalThreads
; i
++)
554 /* Create the thread */
555 ExpCreateWorkerThread(CriticalWorkQueue
, FALSE
);
556 ExpCriticalWorkerThreads
++;
559 /* Create the built-in worker threads for the delayed queue */
560 for (i
= 0; i
< DelayedThreads
; i
++)
562 /* Create the thread */
563 ExpCreateWorkerThread(DelayedWorkQueue
, FALSE
);
564 ExpDelayedWorkerThreads
++;
567 /* Create the built-in worker thread for the hypercritical queue */
568 ExpCreateWorkerThread(HyperCriticalWorkQueue
, FALSE
);
570 /* Create the balance set manager thread */
571 PsCreateSystemThread(&ThreadHandle
,
576 ExpWorkerThreadBalanceManager
,
579 /* Get a pointer to it for the shutdown process */
580 ObReferenceObjectByHandle(ThreadHandle
,
586 ExpWorkerThreadBalanceManagerPtr
= Thread
;
588 /* Close the handle and return */
589 ZwClose(ThreadHandle
);
592 /* PUBLIC FUNCTIONS **********************************************************/
595 * @name ExQueueWorkItem
598 * The ExQueueWorkItem routine acquires rundown protection for
599 * the specified descriptor.
602 * Pointer to an initialized Work Queue Item structure. This structure
603 * must be located in nonpaged pool memory.
606 * Type of the queue to use for this item. Can be one of the following:
608 * - CriticalWorkQueue
609 * - HyperCriticalWorkQueue
613 * @remarks This routine is obsolete. Use IoQueueWorkItem instead.
615 * Callers of this routine must be running at IRQL <= DISPATCH_LEVEL.
620 ExQueueWorkItem(PWORK_QUEUE_ITEM WorkItem
,
621 WORK_QUEUE_TYPE QueueType
)
623 PEX_WORK_QUEUE WorkQueue
= &ExWorkerQueue
[QueueType
];
624 ASSERT(QueueType
< MaximumWorkQueue
);
625 ASSERT(WorkItem
->List
.Flink
== NULL
);
627 /* Don't try to trick us */
628 if ((ULONG_PTR
)WorkItem
->WorkerRoutine
< MmUserProbeAddress
)
630 /* Bugcheck the system */
631 KEBUGCHECKEX(WORKER_INVALID
,
634 (ULONG_PTR
)WorkItem
->WorkerRoutine
,
638 /* Insert the Queue */
639 KeInsertQueue(&WorkQueue
->WorkerQueue
, &WorkItem
->List
);
640 ASSERT(!WorkQueue
->Info
.QueueDisabled
);
643 * Check if we need a new thread. Our decision is as follows:
644 * - This queue type must support Dynamic Threads (duh!)
645 * - It actually has to have unprocessed items
646 * - We have CPUs which could be handling another thread
647 * - We haven't abused our usage of dynamic threads.
649 if ((WorkQueue
->Info
.MakeThreadsAsNecessary
) &&
650 (!IsListEmpty(&WorkQueue
->WorkerQueue
.EntryListHead
)) &&
651 (WorkQueue
->WorkerQueue
.CurrentCount
<
652 WorkQueue
->WorkerQueue
.MaximumCount
) &&
653 (WorkQueue
->DynamicThreadCount
< 16))
655 /* Let the balance manager know about it */
656 DPRINT1("Requesting a new thread. CurrentCount: %d. MaxCount: %d\n",
657 WorkQueue
->WorkerQueue
.CurrentCount
,
658 WorkQueue
->WorkerQueue
.MaximumCount
);
659 KeSetEvent(&ExpThreadSetManagerEvent
, 0, FALSE
);