2 * COPYRIGHT: See COPYING in the top level directory
3 * PROJECT: ReactOS Kernel
4 * FILE: ntoskrnl/ex/work.c
5 * PURPOSE: Manage system work queues and worker threads
6 * PROGRAMMER: Alex Ionescu (alex@relsoft.net)
9 /* INCLUDES ******************************************************************/
15 #if defined (ALLOC_PRAGMA)
16 #pragma alloc_text(INIT, ExpInitializeWorkerThreads)
19 /* DATA **********************************************************************/
21 /* Number of worker threads for each Queue */
22 #define EX_HYPERCRITICAL_WORK_THREADS 1
23 #define EX_DELAYED_WORK_THREADS 3
24 #define EX_CRITICAL_WORK_THREADS 5
26 /* Magic flag for dynamic worker threads */
27 #define EX_DYNAMIC_WORK_THREAD 0x80000000
29 /* Worker thread priority increments (added to base priority) */
30 #define EX_HYPERCRITICAL_QUEUE_PRIORITY_INCREMENT 7
31 #define EX_CRITICAL_QUEUE_PRIORITY_INCREMENT 5
32 #define EX_DELAYED_QUEUE_PRIORITY_INCREMENT 4
34 /* The actual worker queue array */
35 EX_WORK_QUEUE ExWorkerQueue
[MaximumWorkQueue
];
37 /* Accounting of the total threads and registry hacked threads */
38 ULONG ExpCriticalWorkerThreads
;
39 ULONG ExpDelayedWorkerThreads
;
40 ULONG ExpAdditionalCriticalWorkerThreads
;
41 ULONG ExpAdditionalDelayedWorkerThreads
;
43 /* Future support for stack swapping worker threads */
44 BOOLEAN ExpWorkersCanSwap
;
45 LIST_ENTRY ExpWorkerListHead
;
46 FAST_MUTEX ExpWorkerSwapinMutex
;
48 /* The worker balance set manager events */
49 KEVENT ExpThreadSetManagerEvent
;
50 KEVENT ExpThreadSetManagerShutdownEvent
;
52 /* Thread pointers for future worker thread shutdown support */
53 PETHREAD ExpWorkerThreadBalanceManagerPtr
;
54 PETHREAD ExpLastWorkerThread
;
56 /* PRIVATE FUNCTIONS *********************************************************/
59 * @name ExpWorkerThreadEntryPoint
61 * The ExpWorkerThreadEntryPoint routine is the entrypoint for any new
62 * worker thread created by the system.
65 * Contains the work queue type masked with a flag specifying whether the
66 * thread is dynamic or not.
70 * @remarks A dynamic thread can timeout after 10 minutes of waiting on a queue
71 * while a static thread will never timeout.
73 * Worker threads must return at IRQL == PASSIVE_LEVEL, must not have
74 * active impersonation info, and must not have disabled APCs.
76 * NB: We will re-enable APCs for broken threads but all other cases
77 * will generate a bugcheck.
82 ExpWorkerThreadEntryPoint(IN PVOID Context
)
84 PWORK_QUEUE_ITEM WorkItem
;
85 PLIST_ENTRY QueueEntry
;
86 WORK_QUEUE_TYPE WorkQueueType
;
87 PEX_WORK_QUEUE WorkQueue
;
88 LARGE_INTEGER Timeout
;
89 PLARGE_INTEGER TimeoutPointer
= NULL
;
90 PETHREAD Thread
= PsGetCurrentThread();
91 KPROCESSOR_MODE WaitMode
;
92 EX_QUEUE_WORKER_INFO OldValue
, NewValue
;
94 /* Check if this is a dynamic thread */
95 if ((ULONG_PTR
)Context
& EX_DYNAMIC_WORK_THREAD
)
97 /* It is, which means we will eventually time out after 10 minutes */
98 Timeout
.QuadPart
= Int32x32To64(10, -10000000 * 60);
99 TimeoutPointer
= &Timeout
;
102 /* Get Queue Type and Worker Queue */
103 WorkQueueType
= (WORK_QUEUE_TYPE
)((ULONG_PTR
)Context
&
104 ~EX_DYNAMIC_WORK_THREAD
);
105 WorkQueue
= &ExWorkerQueue
[WorkQueueType
];
107 /* Select the wait mode */
108 WaitMode
= (UCHAR
)WorkQueue
->Info
.WaitMode
;
110 /* Nobody should have initialized this yet, do it now */
111 ASSERT(Thread
->ExWorkerCanWaitUser
== 0);
112 if (WaitMode
== UserMode
) Thread
->ExWorkerCanWaitUser
= TRUE
;
114 /* If we shouldn't swap, disable that feature */
115 if (!ExpWorkersCanSwap
) KeSetKernelStackSwapEnable(FALSE
);
117 /* Set the worker flags */
120 /* Check if the queue is being disabled */
121 if (WorkQueue
->Info
.QueueDisabled
)
123 /* Re-enable stack swapping and kill us */
124 KeSetKernelStackSwapEnable(TRUE
);
125 PsTerminateSystemThread(STATUS_SYSTEM_SHUTDOWN
);
128 /* Increase the worker count */
129 OldValue
= WorkQueue
->Info
;
131 NewValue
.WorkerCount
++;
133 while (InterlockedCompareExchange((PLONG
)&WorkQueue
->Info
,
135 *(PLONG
)&OldValue
) != *(PLONG
)&OldValue
);
137 /* Success, you are now officially a worker thread! */
138 Thread
->ActiveExWorker
= TRUE
;
144 /* Wait for Something to Happen on the Queue */
145 QueueEntry
= KeRemoveQueue(&WorkQueue
->WorkerQueue
,
149 /* Check if we timed out and quit this loop in that case */
150 if ((NTSTATUS
)(ULONG_PTR
)QueueEntry
== STATUS_TIMEOUT
) break;
152 /* Increment Processed Work Items */
153 InterlockedIncrement((PLONG
)&WorkQueue
->WorkItemsProcessed
);
155 /* Get the Work Item */
156 WorkItem
= CONTAINING_RECORD(QueueEntry
, WORK_QUEUE_ITEM
, List
);
158 /* Make sure nobody is trying to play smart with us */
159 ASSERT((ULONG_PTR
)WorkItem
->WorkerRoutine
> MmUserProbeAddress
);
161 /* Call the Worker Routine */
162 WorkItem
->WorkerRoutine(WorkItem
->Parameter
);
164 /* Make sure APCs are not disabled */
165 if (Thread
->Tcb
.SpecialApcDisable
)
167 /* We're nice and do it behind your back */
168 DPRINT1("Warning: Broken Worker Thread: %p %lx %p came back "
169 "with APCs disabled!\n",
170 WorkItem
->WorkerRoutine
,
173 Thread
->Tcb
.SpecialApcDisable
= 0;
176 /* Make sure it returned at right IRQL */
177 if (KeGetCurrentIrql() != PASSIVE_LEVEL
)
179 /* It didn't, bugcheck! */
180 KeBugCheckEx(WORKER_THREAD_RETURNED_AT_BAD_IRQL
,
181 (ULONG_PTR
)WorkItem
->WorkerRoutine
,
183 (ULONG_PTR
)WorkItem
->Parameter
,
184 (ULONG_PTR
)WorkItem
);
187 /* Make sure it returned with Impersonation Disabled */
188 if (Thread
->ActiveImpersonationInfo
)
190 /* It didn't, bugcheck! */
191 KeBugCheckEx(IMPERSONATING_WORKER_THREAD
,
192 (ULONG_PTR
)WorkItem
->WorkerRoutine
,
193 (ULONG_PTR
)WorkItem
->Parameter
,
199 /* This is a dynamic thread. Terminate it unless IRPs are pending */
200 if (!IsListEmpty(&Thread
->IrpList
)) goto ProcessLoop
;
202 /* Don't terminate it if the queue is disabled either */
203 if (WorkQueue
->Info
.QueueDisabled
) goto ProcessLoop
;
205 /* Set the worker flags */
208 /* Decrease the worker count */
209 OldValue
= WorkQueue
->Info
;
211 NewValue
.WorkerCount
--;
213 while (InterlockedCompareExchange((PLONG
)&WorkQueue
->Info
,
215 *(PLONG
)&OldValue
) != *(PLONG
)&OldValue
);
217 /* Decrement dynamic thread count */
218 InterlockedDecrement(&WorkQueue
->DynamicThreadCount
);
220 /* We're not a worker thread anymore */
221 Thread
->ActiveExWorker
= FALSE
;
223 /* Re-enable the stack swap */
224 KeSetKernelStackSwapEnable(TRUE
);
229 * @name ExpCreateWorkerThread
231 * The ExpCreateWorkerThread routine creates a new worker thread for the
235 * Type of the queue to use for this thread. Valid values are:
237 * - CriticalWorkQueue
238 * - HyperCriticalWorkQueue
241 * Specifies whether or not this thread is a dynamic thread.
245 * @remarks HyperCritical work threads run at priority 7; Critical work threads
246 * run at priority 5, and delayed work threads run at priority 4.
248 * Thus, worker threads cannot pre-empt a normal user-mode thread.
253 ExpCreateWorkerThread(WORK_QUEUE_TYPE WorkQueueType
,
261 /* Check if this is going to be a dynamic thread */
262 Context
= WorkQueueType
;
264 /* Add the dynamic mask */
265 if (Dynamic
) Context
|= EX_DYNAMIC_WORK_THREAD
;
267 /* Create the System Thread */
268 PsCreateSystemThread(&hThread
,
273 ExpWorkerThreadEntryPoint
,
274 UlongToPtr(Context
));
276 /* If the thread is dynamic */
279 /* Increase the count */
280 InterlockedIncrement(&ExWorkerQueue
[WorkQueueType
].DynamicThreadCount
);
283 /* Set the priority */
284 if (WorkQueueType
== DelayedWorkQueue
)
287 Priority
= EX_DELAYED_QUEUE_PRIORITY_INCREMENT
;
289 else if (WorkQueueType
== CriticalWorkQueue
)
292 Priority
= EX_CRITICAL_QUEUE_PRIORITY_INCREMENT
;
297 Priority
= EX_HYPERCRITICAL_QUEUE_PRIORITY_INCREMENT
;
301 ObReferenceObjectByHandle(hThread
,
302 THREAD_SET_INFORMATION
,
308 /* Set the Priority */
309 KeSetBasePriorityThread(&Thread
->Tcb
, Priority
);
311 /* Dereference and close handle */
312 ObDereferenceObject(Thread
);
313 ObCloseHandle(hThread
, KernelMode
);
317 * @name ExpDetectWorkerThreadDeadlock
319 * The ExpDetectWorkerThreadDeadlock routine checks every queue and creates a
320 * dynamic thread if the queue seems to be deadlocked.
326 * @remarks The algorithm for deciding if a new thread must be created is
327 * based on whether the queue has processed no new items in the last
328 * second, and new items are still enqueued.
333 ExpDetectWorkerThreadDeadlock(VOID
)
336 PEX_WORK_QUEUE Queue
;
338 /* Loop the 3 queues */
339 for (i
= 0; i
< MaximumWorkQueue
; i
++)
342 Queue
= &ExWorkerQueue
[i
];
343 ASSERT(Queue
->DynamicThreadCount
<= 16);
345 /* Check if stuff is on the queue that still is unprocessed */
346 if ((Queue
->QueueDepthLastPass
) &&
347 (Queue
->WorkItemsProcessed
== Queue
->WorkItemsProcessedLastPass
) &&
348 (Queue
->DynamicThreadCount
< 16))
350 /* Stuff is still on the queue and nobody did anything about it */
351 DPRINT1("EX: Work Queue Deadlock detected: %d\n", i
);
352 ExpCreateWorkerThread(i
, TRUE
);
353 DPRINT1("Dynamic threads queued %d\n", Queue
->DynamicThreadCount
);
356 /* Update our data */
357 Queue
->WorkItemsProcessedLastPass
= Queue
->WorkItemsProcessed
;
358 Queue
->QueueDepthLastPass
= KeReadStateQueue(&Queue
->WorkerQueue
);
363 * @name ExpCheckDynamicThreadCount
365 * The ExpCheckDynamicThreadCount routine checks every queue and creates a
366 * dynamic thread if the queue requires one.
372 * @remarks The algorithm for deciding if a new thread must be created is
373 * documented in the ExQueueWorkItem routine.
378 ExpCheckDynamicThreadCount(VOID
)
381 PEX_WORK_QUEUE Queue
;
383 /* Loop the 3 queues */
384 for (i
= 0; i
< MaximumWorkQueue
; i
++)
387 Queue
= &ExWorkerQueue
[i
];
389 /* Check if still need a new thread. See ExQueueWorkItem */
390 if ((Queue
->Info
.MakeThreadsAsNecessary
) &&
391 (!IsListEmpty(&Queue
->WorkerQueue
.EntryListHead
)) &&
392 (Queue
->WorkerQueue
.CurrentCount
<
393 Queue
->WorkerQueue
.MaximumCount
) &&
394 (Queue
->DynamicThreadCount
< 16))
396 /* Create a new thread */
397 DPRINT1("EX: Creating new dynamic thread as requested\n");
398 ExpCreateWorkerThread(i
, TRUE
);
404 * @name ExpWorkerThreadBalanceManager
406 * The ExpWorkerThreadBalanceManager routine is the entrypoint for the
407 * worker thread balance set manager.
414 * @remarks The worker thread balance set manager listens every second, but can
415 * also be woken up by an event when a new thread is needed, or by the
416 * special shutdown event. This thread runs with a base priority increment of 6 (EX_CRITICAL_QUEUE_PRIORITY_INCREMENT + 1).
418 * This routine must run at IRQL == PASSIVE_LEVEL.
423 ExpWorkerThreadBalanceManager(IN PVOID Context
)
426 LARGE_INTEGER Timeout
;
430 UNREFERENCED_PARAMETER(Context
);
432 /* Raise our priority above the critical and delayed worker threads */
433 KeSetBasePriorityThread(KeGetCurrentThread(),
434 EX_CRITICAL_QUEUE_PRIORITY_INCREMENT
+ 1);
436 /* Setup the timer */
437 KeInitializeTimer(&Timer
);
438 Timeout
.QuadPart
= Int32x32To64(-1, 10000000);
440 /* We'll wait on the periodic timer and also the emergency event */
441 WaitEvents
[0] = &Timer
;
442 WaitEvents
[1] = &ExpThreadSetManagerEvent
;
443 WaitEvents
[2] = &ExpThreadSetManagerShutdownEvent
;
445 /* Start wait loop */
448 /* Wait for the timer */
449 KeSetTimer(&Timer
, Timeout
, NULL
);
450 Status
= KeWaitForMultipleObjects(3,
460 /* Our timer expired. Check for deadlocks */
461 ExpDetectWorkerThreadDeadlock();
463 else if (Status
== 1)
465 /* Someone notified us, verify if we should create a new thread */
466 ExpCheckDynamicThreadCount();
468 else if (Status
== 2)
470 /* We are shutting down. Cancel the timer */
471 DPRINT1("System shutdown\n");
472 KeCancelTimer(&Timer
);
474 /* Make sure we have a final thread */
475 ASSERT(ExpLastWorkerThread
);
478 KeWaitForSingleObject(ExpLastWorkerThread
,
484 /* Dereference it and kill us */
485 ObDereferenceObject(ExpLastWorkerThread
);
486 PsTerminateSystemThread(STATUS_SYSTEM_SHUTDOWN
);
492 * @name ExpInitializeWorkerThreads
494 * The ExpInitializeWorkerThreads routine initializes worker thread and
495 * work queue support.
501 * @remarks This routine is only called once during system initialization.
507 ExpInitializeWorkerThreads(VOID
)
510 ULONG CriticalThreads
, DelayedThreads
;
515 /* Setup the stack swap support */
516 ExInitializeFastMutex(&ExpWorkerSwapinMutex
);
517 InitializeListHead(&ExpWorkerListHead
);
518 ExpWorkersCanSwap
= TRUE
;
520 /* Set the number of critical and delayed threads. We shouldn't hardcode */
521 DelayedThreads
= EX_DELAYED_WORK_THREADS
;
522 CriticalThreads
= EX_CRITICAL_WORK_THREADS
;
524 /* Protect against greedy registry modifications */
525 ExpAdditionalDelayedWorkerThreads
=
526 min(ExpAdditionalDelayedWorkerThreads
, 16);
527 ExpAdditionalCriticalWorkerThreads
=
528 min(ExpAdditionalCriticalWorkerThreads
, 16);
530 /* Calculate final count */
531 DelayedThreads
+= ExpAdditionalDelayedWorkerThreads
;
532 CriticalThreads
+= ExpAdditionalCriticalWorkerThreads
;
534 /* Initialize the Array */
535 for (WorkQueueType
= 0; WorkQueueType
< MaximumWorkQueue
; WorkQueueType
++)
537 /* Clear the structure and initialize the queue */
538 RtlZeroMemory(&ExWorkerQueue
[WorkQueueType
], sizeof(EX_WORK_QUEUE
));
539 KeInitializeQueue(&ExWorkerQueue
[WorkQueueType
].WorkerQueue
, 0);
542 /* Dynamic threads are only used for the critical queue */
543 ExWorkerQueue
[CriticalWorkQueue
].Info
.MakeThreadsAsNecessary
= TRUE
;
545 /* Initialize the balance set manager events */
546 KeInitializeEvent(&ExpThreadSetManagerEvent
, SynchronizationEvent
, FALSE
);
547 KeInitializeEvent(&ExpThreadSetManagerShutdownEvent
,
551 /* Create the built-in worker threads for the critical queue */
552 for (i
= 0; i
< CriticalThreads
; i
++)
554 /* Create the thread */
555 ExpCreateWorkerThread(CriticalWorkQueue
, FALSE
);
556 ExpCriticalWorkerThreads
++;
559 /* Create the built-in worker threads for the delayed queue */
560 for (i
= 0; i
< DelayedThreads
; i
++)
562 /* Create the thread */
563 ExpCreateWorkerThread(DelayedWorkQueue
, FALSE
);
564 ExpDelayedWorkerThreads
++;
567 /* Create the built-in worker thread for the hypercritical queue */
568 ExpCreateWorkerThread(HyperCriticalWorkQueue
, FALSE
);
570 /* Create the balance set manager thread */
571 PsCreateSystemThread(&ThreadHandle
,
576 ExpWorkerThreadBalanceManager
,
579 /* Get a pointer to it for the shutdown process */
580 ObReferenceObjectByHandle(ThreadHandle
,
586 ExpWorkerThreadBalanceManagerPtr
= Thread
;
588 /* Close the handle and return */
589 ObCloseHandle(ThreadHandle
, KernelMode
);
594 ExpSetSwappingKernelApc(IN PKAPC Apc
,
595 OUT PKNORMAL_ROUTINE
*NormalRoutine
,
596 IN OUT PVOID
*NormalContext
,
597 IN OUT PVOID
*SystemArgument1
,
598 IN OUT PVOID
*SystemArgument2
)
601 PKEVENT Event
= (PKEVENT
)*SystemArgument1
;
603 /* Make sure it's an active worker */
604 if (PsGetCurrentThread()->ActiveExWorker
)
606 /* Read the setting from the context flag */
607 AllowSwap
= (PBOOLEAN
)NormalContext
;
608 KeSetKernelStackSwapEnable(*AllowSwap
);
611 /* Let caller know that we're done */
612 KeSetEvent(Event
, 0, FALSE
);
617 ExSwapinWorkerThreads(IN BOOLEAN AllowSwap
)
620 PETHREAD CurrentThread
= PsGetCurrentThread(), Thread
;
621 PEPROCESS Process
= PsInitialSystemProcess
;
625 /* Initialize an event so we know when we're done */
626 KeInitializeEvent(&Event
, NotificationEvent
, FALSE
);
628 /* Lock this routine */
629 ExAcquireFastMutex(&ExpWorkerSwapinMutex
);
631 /* Record the new setting; worker threads started from now on will honor it */
632 ExpWorkersCanSwap
= AllowSwap
;
634 /* Loop all threads in the system process */
635 Thread
= PsGetNextProcessThread(Process
, NULL
);
638 /* Skip threads with explicit permission to do this */
639 if (Thread
->ExWorkerCanWaitUser
) goto Next
;
641 /* Check if we reached ourselves */
642 if (Thread
== CurrentThread
)
645 KeSetKernelStackSwapEnable(AllowSwap
);
650 KeInitializeApc(&Apc
,
652 InsertApcEnvironment
,
653 ExpSetSwappingKernelApc
,
658 if (KeInsertQueueApc(&Apc
, &Event
, NULL
, 3))
660 /* Wait for the APC to run */
661 KeWaitForSingleObject(&Event
, Executive
, KernelMode
, FALSE
, NULL
);
662 KeClearEvent(&Event
);
668 Thread
= PsGetNextProcessThread(Process
, Thread
);
671 /* Release the lock */
672 ExReleaseFastMutex(&ExpWorkerSwapinMutex
);
675 /* PUBLIC FUNCTIONS **********************************************************/
678 * @name ExQueueWorkItem
681 * The ExQueueWorkItem routine inserts a work item into the specified
682 * system work queue, to be processed by a system worker thread.
685 * Pointer to an initialized Work Queue Item structure. This structure
686 * must be located in nonpaged pool memory.
689 * Type of the queue to use for this item. Can be one of the following:
691 * - CriticalWorkQueue
692 * - HyperCriticalWorkQueue
696 * @remarks This routine is obsolete. Use IoQueueWorkItem instead.
698 * Callers of this routine must be running at IRQL <= DISPATCH_LEVEL.
703 ExQueueWorkItem(IN PWORK_QUEUE_ITEM WorkItem
,
704 IN WORK_QUEUE_TYPE QueueType
)
706 PEX_WORK_QUEUE WorkQueue
= &ExWorkerQueue
[QueueType
];
707 ASSERT(QueueType
< MaximumWorkQueue
);
708 ASSERT(WorkItem
->List
.Flink
== NULL
);
710 /* Don't try to trick us */
711 if ((ULONG_PTR
)WorkItem
->WorkerRoutine
< MmUserProbeAddress
)
713 /* Bugcheck the system */
714 KeBugCheckEx(WORKER_INVALID
,
717 (ULONG_PTR
)WorkItem
->WorkerRoutine
,
721 /* Insert the work item into the queue */
722 KeInsertQueue(&WorkQueue
->WorkerQueue
, &WorkItem
->List
);
723 ASSERT(!WorkQueue
->Info
.QueueDisabled
);
726 * Check if we need a new thread. Our decision is as follows:
727 * - This queue type must support Dynamic Threads (duh!)
728 * - It actually has to have unprocessed items
729 * - We have CPUs which could be handling another thread
730 * - We haven't abused our usage of dynamic threads.
732 if ((WorkQueue
->Info
.MakeThreadsAsNecessary
) &&
733 (!IsListEmpty(&WorkQueue
->WorkerQueue
.EntryListHead
)) &&
734 (WorkQueue
->WorkerQueue
.CurrentCount
<
735 WorkQueue
->WorkerQueue
.MaximumCount
) &&
736 (WorkQueue
->DynamicThreadCount
< 16))
738 /* Let the balance manager know about it */
739 DPRINT1("Requesting a new thread. CurrentCount: %d. MaxCount: %d\n",
740 WorkQueue
->WorkerQueue
.CurrentCount
,
741 WorkQueue
->WorkerQueue
.MaximumCount
);
742 KeSetEvent(&ExpThreadSetManagerEvent
, 0, FALSE
);