2 * COPYRIGHT: See COPYING in the top level directory
3 * PROJECT: ReactOS Kernel
4 * FILE: ntoskrnl/ex/work.c
5 * PURPOSE: Manage system work queues and worker threads
6 * PROGRAMMER: Alex Ionescu (alex@relsoft.net)
9 /* INCLUDES ******************************************************************/
15 #if defined (ALLOC_PRAGMA)
16 #pragma alloc_text(INIT, ExpInitializeWorkerThreads)
19 /* DATA **********************************************************************/
21 /* Number of worker threads for each Queue */
/*
 * EX_DELAYED_WORK_THREADS and EX_CRITICAL_WORK_THREADS are the starting counts
 * used by ExpInitializeWorkerThreads before the ExpAdditional* values are added.
 * NOTE(review): EX_HYPERCRITICAL_WORK_THREADS is not referenced in the visible
 * code (one hypercritical thread is created by a direct call) - confirm.
 */
22 #define EX_HYPERCRITICAL_WORK_THREADS 1
23 #define EX_DELAYED_WORK_THREADS 3
24 #define EX_CRITICAL_WORK_THREADS 5
26 /* Magic flag for dynamic worker threads */
/*
 * ExpCreateWorkerThread ORs this bit into the thread start context for dynamic
 * threads; ExpWorkerThreadEntryPoint tests it and masks it off again to recover
 * the queue type.
 */
27 #define EX_DYNAMIC_WORK_THREAD 0x80000000
29 /* Worker thread priority increments (added to base priority) */
/* Selected per queue type in ExpCreateWorkerThread */
30 #define EX_HYPERCRITICAL_QUEUE_PRIORITY_INCREMENT 7
31 #define EX_CRITICAL_QUEUE_PRIORITY_INCREMENT 5
32 #define EX_DELAYED_QUEUE_PRIORITY_INCREMENT 4
34 /* The actual worker queue array */
/* Indexed by WORK_QUEUE_TYPE; zeroed and initialized in ExpInitializeWorkerThreads */
35 EX_WORK_QUEUE ExWorkerQueue
[MaximumWorkQueue
];
37 /* Accounting of the total threads and registry hacked threads */
/*
 * The Ex*WorkerThreads counters are incremented once per built-in thread created
 * at initialization; the ExpAdditional* values are clamped to at most 16 there.
 */
38 ULONG ExCriticalWorkerThreads
;
39 ULONG ExDelayedWorkerThreads
;
40 ULONG ExpAdditionalCriticalWorkerThreads
;
41 ULONG ExpAdditionalDelayedWorkerThreads
;
43 /* Future support for stack swapping worker threads */
/*
 * ExpWorkersCanSwap is set to TRUE at initialization, overwritten by
 * ExSwapinWorkerThreads, and read by each worker thread when it starts.
 * ExpWorkerSwapinMutex serializes ExSwapinWorkerThreads.
 */
44 BOOLEAN ExpWorkersCanSwap
;
45 LIST_ENTRY ExpWorkerListHead
;
46 FAST_MUTEX ExpWorkerSwapinMutex
;
48 /* The worker balance set manager events */
/*
 * ExpThreadSetManagerEvent is signalled by ExQueueWorkItem when another dynamic
 * thread appears to be needed; both events are waited on by
 * ExpWorkerThreadBalanceManager.
 */
49 KEVENT ExpThreadSetManagerEvent
;
50 KEVENT ExpThreadSetManagerShutdownEvent
;
52 /* Thread pointers for future worker thread shutdown support */
/*
 * ExpWorkerThreadBalanceManagerPtr is assigned in ExpInitializeWorkerThreads.
 * NOTE(review): no assignment to ExpLastWorkerThread is visible in this section;
 * the balance manager asserts it is non-NULL during shutdown - confirm who sets it.
 */
53 PETHREAD ExpWorkerThreadBalanceManagerPtr
;
54 PETHREAD ExpLastWorkerThread
;
56 /* PRIVATE FUNCTIONS *********************************************************/
59 * @name ExpWorkerThreadEntryPoint
61 * The ExpWorkerThreadEntryPoint routine is the entrypoint for any new
62 * worker thread created by the system.
65 * Contains the work queue type masked with a flag specifying whether the
66 * thread is dynamic or not.
70 * @remarks A dynamic thread can timeout after 10 minutes of waiting on a queue
71 * while a static thread will never timeout.
73 * Worker threads must return at IRQL == PASSIVE_LEVEL, must not have
74 * active impersonation info, and must not have disabled APCs.
76 * NB: We will re-enable APCs for broken threads but all other cases
77 * will generate a bugcheck.
/*
 * Worker thread body. Decodes the queue type and the dynamic flag from Context,
 * adds itself to the queue's WorkerCount, then removes work items from the
 * queue and calls each item's WorkerRoutine with its Parameter. After every
 * routine it checks the APC-disable count, the IRQL and the impersonation
 * state of the thread. The code after the wait loop undoes the registration.
 */
82 ExpWorkerThreadEntryPoint(IN PVOID Context
)
84 PWORK_QUEUE_ITEM WorkItem
;
85 PLIST_ENTRY QueueEntry
;
86 WORK_QUEUE_TYPE WorkQueueType
;
87 PEX_WORK_QUEUE WorkQueue
;
88 LARGE_INTEGER Timeout
;
/* Stays NULL for static threads; points at Timeout only for dynamic ones */
89 PLARGE_INTEGER TimeoutPointer
= NULL
;
90 PETHREAD Thread
= PsGetCurrentThread();
91 KPROCESSOR_MODE WaitMode
;
92 EX_QUEUE_WORKER_INFO OldValue
, NewValue
;
94 /* Check if this is a dynamic thread */
95 if ((ULONG_PTR
)Context
& EX_DYNAMIC_WORK_THREAD
)
97 /* It is, which means we will eventually time out after 10 minutes */
/* NOTE(review): negative value, presumably a relative interval in 100ns units - confirm */
98 Timeout
.QuadPart
= Int32x32To64(10, -10000000 * 60);
99 TimeoutPointer
= &Timeout
;
102 /* Get Queue Type and Worker Queue */
/* Strip the dynamic flag; what is left indexes ExWorkerQueue */
103 WorkQueueType
= (WORK_QUEUE_TYPE
)((ULONG_PTR
)Context
&
104 ~EX_DYNAMIC_WORK_THREAD
);
105 WorkQueue
= &ExWorkerQueue
[WorkQueueType
];
107 /* Select the wait mode */
108 WaitMode
= (UCHAR
)WorkQueue
->Info
.WaitMode
;
110 /* Nobody should have initialized this yet, do it now */
/* ExSwapinWorkerThreads skips threads that have ExWorkerCanWaitUser set */
111 ASSERT(Thread
->ExWorkerCanWaitUser
== 0);
112 if (WaitMode
== UserMode
) Thread
->ExWorkerCanWaitUser
= TRUE
;
114 /* If we shouldn't swap, disable that feature */
115 if (!ExpWorkersCanSwap
) KeSetKernelStackSwapEnable(FALSE
);
117 /* Set the worker flags */
120 /* Check if the queue is being disabled */
121 if (WorkQueue
->Info
.QueueDisabled
)
123 /* Re-enable stack swapping and kill us */
124 KeSetKernelStackSwapEnable(TRUE
);
125 PsTerminateSystemThread(STATUS_SYSTEM_SHUTDOWN
);
128 /* Increase the worker count */
/*
 * Info is snapshotted, modified in a local copy and published with a
 * compare-exchange; the condition below repeats while the value found in
 * WorkQueue->Info differs from the OldValue snapshot.
 */
129 OldValue
= WorkQueue
->Info
;
131 NewValue
.WorkerCount
++;
133 while (InterlockedCompareExchange((PLONG
)&WorkQueue
->Info
,
135 *(PLONG
)&OldValue
) != *(PLONG
)&OldValue
);
137 /* Success, you are now officially a worker thread! */
/* ExpSetSwappingKernelApc only acts on threads with ActiveExWorker set */
138 Thread
->ActiveExWorker
= TRUE
;
144 /* Wait for something to happen on the queue */
145 QueueEntry
= KeRemoveQueue(&WorkQueue
->WorkerQueue
,
149 /* Check if we timed out and quit this loop in that case */
/* A timeout is reported in-band: the returned pointer value equals STATUS_TIMEOUT */
150 if ((NTSTATUS
)(ULONG_PTR
)QueueEntry
== STATUS_TIMEOUT
) break;
152 /* Increment Processed Work Items */
/* ExpDetectWorkerThreadDeadlock compares this counter between passes */
153 InterlockedIncrement((PLONG
)&WorkQueue
->WorkItemsProcessed
);
155 /* Get the Work Item */
156 WorkItem
= CONTAINING_RECORD(QueueEntry
, WORK_QUEUE_ITEM
, List
);
158 /* Make sure nobody is trying to play smart with us */
159 ASSERT((ULONG_PTR
)WorkItem
->WorkerRoutine
> MmUserProbeAddress
);
161 /* Call the Worker Routine */
162 WorkItem
->WorkerRoutine(WorkItem
->Parameter
);
164 /* Make sure APCs are not disabled */
165 if (Thread
->Tcb
.CombinedApcDisable
!= 0)
167 /* We're nice and do it behind your back */
168 DPRINT1("Warning: Broken Worker Thread: %p %p %p came back "
169 "with APCs disabled!\n",
170 WorkItem
->WorkerRoutine
,
/*
 * Inside the != 0 branch this assertion's condition is false by construction;
 * the field is then forced back to 0 by the following assignment.
 */
173 ASSERT(Thread
->Tcb
.CombinedApcDisable
== 0);
174 Thread
->Tcb
.CombinedApcDisable
= 0;
177 /* Make sure it returned at right IRQL */
178 if (KeGetCurrentIrql() != PASSIVE_LEVEL
)
180 /* It didn't, bugcheck! */
181 KeBugCheckEx(WORKER_THREAD_RETURNED_AT_BAD_IRQL
,
182 (ULONG_PTR
)WorkItem
->WorkerRoutine
,
184 (ULONG_PTR
)WorkItem
->Parameter
,
185 (ULONG_PTR
)WorkItem
);
188 /* Make sure it returned with impersonation disabled */
189 if (Thread
->ActiveImpersonationInfo
)
191 /* It didn't, bugcheck! */
192 KeBugCheckEx(IMPERSONATING_WORKER_THREAD
,
193 (ULONG_PTR
)WorkItem
->WorkerRoutine
,
194 (ULONG_PTR
)WorkItem
->Parameter
,
/*
 * NOTE(review): presumably reached only through the STATUS_TIMEOUT break above,
 * i.e. only by dynamic threads (static threads pass a NULL timeout) - confirm.
 */
200 /* This is a dynamic thread. Terminate it unless IRPs are pending */
201 if (!IsListEmpty(&Thread
->IrpList
)) goto ProcessLoop
;
203 /* Don't terminate it if the queue is disabled either */
204 if (WorkQueue
->Info
.QueueDisabled
) goto ProcessLoop
;
206 /* Set the worker flags */
209 /* Decrease the worker count */
/* Same snapshot / compare-exchange publication as the increment at start-up */
210 OldValue
= WorkQueue
->Info
;
212 NewValue
.WorkerCount
--;
214 while (InterlockedCompareExchange((PLONG
)&WorkQueue
->Info
,
216 *(PLONG
)&OldValue
) != *(PLONG
)&OldValue
);
218 /* Decrement dynamic thread count */
/* Balances the InterlockedIncrement done by ExpCreateWorkerThread for dynamic threads */
219 InterlockedDecrement(&WorkQueue
->DynamicThreadCount
);
221 /* We're not a worker thread anymore */
222 Thread
->ActiveExWorker
= FALSE
;
224 /* Re-enable the stack swap */
225 KeSetKernelStackSwapEnable(TRUE
);
230 * @name ExpCreateWorkerThread
232 * The ExpCreateWorkerThread routine creates a new worker thread for the
236 * Type of the queue to use for this thread. Valid values are:
238 * - CriticalWorkQueue
239 * - HyperCriticalWorkQueue
242 * Specifies whether or not this thread is a dynamic thread.
246 * @remarks HyperCritical work threads run at priority 7; Critical work threads
247 * run at priority 5, and delayed work threads run at priority 4.
249 * Thus, worker threads cannot pre-empt a normal user-mode thread.
/*
 * Creates one system thread running ExpWorkerThreadEntryPoint for the given
 * queue type. The start context is the queue type, with EX_DYNAMIC_WORK_THREAD
 * ORed in when Dynamic is set. For dynamic threads the queue's
 * DynamicThreadCount is incremented. Finally the new thread's base priority is
 * set from the per-queue priority increment and the handle is closed.
 *
 * NOTE(review): in the visible code the results of PsCreateSystemThread and
 * ObReferenceObjectByHandle are not assigned or tested, yet hThread and Thread
 * are used unconditionally afterwards - confirm how failure is handled.
 */
254 ExpCreateWorkerThread(WORK_QUEUE_TYPE WorkQueueType
,
262 /* Start the thread context with the queue type */
263 Context
= WorkQueueType
;
265 /* Add the dynamic mask */
266 if (Dynamic
) Context
|= EX_DYNAMIC_WORK_THREAD
;
268 /* Create the System Thread */
269 PsCreateSystemThread(&hThread
,
274 ExpWorkerThreadEntryPoint
,
275 UlongToPtr(Context
));
277 /* If the thread is dynamic */
280 /* Increase the count */
/* The worker thread decrements this count itself on its exit path */
281 InterlockedIncrement(&ExWorkerQueue
[WorkQueueType
].DynamicThreadCount
);
284 /* Set the priority */
285 if (WorkQueueType
== DelayedWorkQueue
)
288 Priority
= EX_DELAYED_QUEUE_PRIORITY_INCREMENT
;
290 else if (WorkQueueType
== CriticalWorkQueue
)
293 Priority
= EX_CRITICAL_QUEUE_PRIORITY_INCREMENT
;
/* Any other queue type gets the hypercritical increment */
298 Priority
= EX_HYPERCRITICAL_QUEUE_PRIORITY_INCREMENT
;
/* Turn the handle into a thread object pointer so the priority can be set */
302 ObReferenceObjectByHandle(hThread
,
303 THREAD_SET_INFORMATION
,
309 /* Set the Priority */
310 KeSetBasePriorityThread(&Thread
->Tcb
, Priority
);
312 /* Dereference and close handle */
313 ObDereferenceObject(Thread
);
314 ObCloseHandle(hThread
, KernelMode
);
318 * @name ExpDetectWorkerThreadDeadlock
320 * The ExpDetectWorkerThreadDeadlock routine checks every queue and creates
321 * a dynamic thread if the queue seems to be deadlocked.
327 * @remarks The algorithm for deciding if a new thread must be created is based
328 * on whether the queue has processed no new items in the last second,
329 * and new items are still enqueued.
/*
 * Walks every work queue and creates one dynamic worker thread for each queue
 * that had items pending on the previous pass and has processed nothing since,
 * as long as the queue has fewer than 16 dynamic threads. Each pass ends by
 * recording the current processed count and queue depth for the next pass.
 * Called by the balance manager when its timer wait is satisfied.
 */
334 ExpDetectWorkerThreadDeadlock(VOID
)
337 PEX_WORK_QUEUE Queue
;
339 /* Loop over every work queue (indices 0 .. MaximumWorkQueue - 1) */
340 for (i
= 0; i
< MaximumWorkQueue
; i
++)
343 Queue
= &ExWorkerQueue
[i
];
/* 16 is the same per-queue cap on dynamic threads tested in the condition below */
344 ASSERT(Queue
->DynamicThreadCount
<= 16);
346 /* Check if stuff is on the queue that still is unprocessed */
347 if ((Queue
->QueueDepthLastPass
) &&
348 (Queue
->WorkItemsProcessed
== Queue
->WorkItemsProcessedLastPass
) &&
349 (Queue
->DynamicThreadCount
< 16))
351 /* Stuff is still on the queue and nobody did anything about it */
352 DPRINT1("EX: Work Queue Deadlock detected: %lu\n", i
);
353 ExpCreateWorkerThread(i
, TRUE
);
354 DPRINT1("Dynamic threads queued %d\n", Queue
->DynamicThreadCount
);
357 /* Update our data */
/* These snapshots are what the next pass compares against */
358 Queue
->WorkItemsProcessedLastPass
= Queue
->WorkItemsProcessed
;
359 Queue
->QueueDepthLastPass
= KeReadStateQueue(&Queue
->WorkerQueue
);
364 * @name ExpCheckDynamicThreadCount
366 * The ExpCheckDynamicThreadCount routine checks every queue and creates
367 * a dynamic thread if the queue requires one.
373 * @remarks The algorithm for deciding if a new thread must be created is
374 * documented in the ExQueueWorkItem routine.
/*
 * Walks every work queue and creates one dynamic worker thread for each queue
 * that allows it (MakeThreadsAsNecessary), has entries on its list, has a
 * CurrentCount below its MaximumCount, and has fewer than 16 dynamic threads.
 * This repeats the test ExQueueWorkItem made before it signalled the balance
 * manager, which calls this routine in response.
 */
379 ExpCheckDynamicThreadCount(VOID
)
382 PEX_WORK_QUEUE Queue
;
384 /* Loop over every work queue (indices 0 .. MaximumWorkQueue - 1) */
385 for (i
= 0; i
< MaximumWorkQueue
; i
++)
388 Queue
= &ExWorkerQueue
[i
];
390 /* Check if still need a new thread. See ExQueueWorkItem */
391 if ((Queue
->Info
.MakeThreadsAsNecessary
) &&
392 (!IsListEmpty(&Queue
->WorkerQueue
.EntryListHead
)) &&
393 (Queue
->WorkerQueue
.CurrentCount
<
394 Queue
->WorkerQueue
.MaximumCount
) &&
395 (Queue
->DynamicThreadCount
< 16))
397 /* Create a new thread */
398 DPRINT1("EX: Creating new dynamic thread as requested\n");
399 ExpCreateWorkerThread(i
, TRUE
);
405 * @name ExpWorkerThreadBalanceManager
407 * The ExpWorkerThreadBalanceManager routine is the entrypoint for the
408 * worker thread balance set manager.
415 * @remarks The worker thread balance set manager listens every second, but can
416 * also be woken up by an event when a new thread is needed, or by the
417 * special shutdown event. This thread sets its base priority increment to
 * EX_CRITICAL_QUEUE_PRIORITY_INCREMENT + 1 (6 with the current defines).
419 * This routine must run at IRQL == PASSIVE_LEVEL.
/*
 * Thread body of the worker balance manager. It re-arms a timer and waits on
 * three objects: the timer, ExpThreadSetManagerEvent and
 * ExpThreadSetManagerShutdownEvent. Depending on the wait result it runs the
 * deadlock check, runs the dynamic thread check, or performs the shutdown
 * sequence. It also queues the debugger work item when ExpDebuggerWork asks
 * for it. Context is unused.
 */
424 ExpWorkerThreadBalanceManager(IN PVOID Context
)
427 LARGE_INTEGER Timeout
;
431 UNREFERENCED_PARAMETER(Context
);
433 /* Raise our priority above all other worker threads */
/*
 * NOTE(review): the value used is the critical increment + 1, which is below
 * EX_HYPERCRITICAL_QUEUE_PRIORITY_INCREMENT - confirm the comment above.
 */
434 KeSetBasePriorityThread(KeGetCurrentThread(),
435 EX_CRITICAL_QUEUE_PRIORITY_INCREMENT
+ 1);
437 /* Setup the timer */
/* NOTE(review): negative value, presumably a relative one-second interval in 100ns units - confirm */
438 KeInitializeTimer(&Timer
);
439 Timeout
.QuadPart
= Int32x32To64(-1, 10000000);
441 /* We'll wait on the periodic timer and also the emergency event */
/* NOTE(review): the Status values tested below presumably are indices into this array - confirm */
442 WaitEvents
[0] = &Timer
;
443 WaitEvents
[1] = &ExpThreadSetManagerEvent
;
444 WaitEvents
[2] = &ExpThreadSetManagerShutdownEvent
;
446 /* Start wait loop */
449 /* Wait for the timer */
/* The timer is set again before every wait */
450 KeSetTimer(&Timer
, Timeout
, NULL
);
451 Status
= KeWaitForMultipleObjects(3,
461 /* Our timer expired. Check for deadlocks */
462 ExpDetectWorkerThreadDeadlock();
464 else if (Status
== 1)
466 /* Someone notified us, verify if we should create a new thread */
467 ExpCheckDynamicThreadCount();
469 else if (Status
== 2)
471 /* We are shutting down. Cancel the timer */
472 DPRINT1("System shutdown\n");
473 KeCancelTimer(&Timer
);
475 /* Make sure we have a final thread */
476 ASSERT(ExpLastWorkerThread
);
/* Block until the last worker thread object is signalled */
479 KeWaitForSingleObject(ExpLastWorkerThread
,
485 /* Dereference it and kill us */
486 ObDereferenceObject(ExpLastWorkerThread
);
487 PsTerminateSystemThread(STATUS_SYSTEM_SHUTDOWN
);
492 * If WinDBG wants to attach or kill a user-mode process, and/or
493 * page-in an address region, queue a debugger worker thread.
/* The state is advanced to WinKdWorkerInitialized before the item is queued */
495 if (ExpDebuggerWork
== WinKdWorkerStart
)
497 ExInitializeWorkItem(&ExpDebuggerWorkItem
, ExpDebuggerWorker
, NULL
);
498 ExpDebuggerWork
= WinKdWorkerInitialized
;
499 ExQueueWorkItem(&ExpDebuggerWorkItem
, DelayedWorkQueue
);
501 // #endif /* _WINKD_ */
506 * @name ExpInitializeWorkerThreads
508 * The ExpInitializeWorkerThreads routine initializes worker thread and
509 * work queue support.
515 * @remarks This routine is only called once during system initialization.
/*
 * One-time set-up of the worker thread machinery. It initializes the
 * swap-control mutex and list, clamps the ExpAdditional* thread counts to 16,
 * zeroes and initializes every work queue, and enables on-demand threads for
 * the critical queue only. It then initializes the balance manager events and
 * creates the built-in critical, delayed and hypercritical worker threads and
 * the balance manager thread.
 *
 * NOTE(review): in the visible code the results of PsCreateSystemThread and
 * ObReferenceObjectByHandle are not assigned or tested - confirm how failure
 * is handled.
 */
521 ExpInitializeWorkerThreads(VOID
)
524 ULONG CriticalThreads
, DelayedThreads
;
529 /* Setup the stack swap support */
530 ExInitializeFastMutex(&ExpWorkerSwapinMutex
);
531 InitializeListHead(&ExpWorkerListHead
);
532 ExpWorkersCanSwap
= TRUE
;
534 /* Set the number of critical and delayed threads. We shouldn't hardcode */
535 DelayedThreads
= EX_DELAYED_WORK_THREADS
;
536 CriticalThreads
= EX_CRITICAL_WORK_THREADS
;
538 /* Protect against greedy registry modifications */
/* Each additional count is capped at 16 */
539 ExpAdditionalDelayedWorkerThreads
=
540 min(ExpAdditionalDelayedWorkerThreads
, 16);
541 ExpAdditionalCriticalWorkerThreads
=
542 min(ExpAdditionalCriticalWorkerThreads
, 16);
544 /* Calculate final count */
545 DelayedThreads
+= ExpAdditionalDelayedWorkerThreads
;
546 CriticalThreads
+= ExpAdditionalCriticalWorkerThreads
;
548 /* Initialize the Array */
549 for (WorkQueueType
= 0; WorkQueueType
< MaximumWorkQueue
; WorkQueueType
++)
551 /* Clear the structure and initialize the queue */
552 RtlZeroMemory(&ExWorkerQueue
[WorkQueueType
], sizeof(EX_WORK_QUEUE
));
553 KeInitializeQueue(&ExWorkerQueue
[WorkQueueType
].WorkerQueue
, 0);
556 /* Dynamic threads are only used for the critical queue */
/* This flag is the first condition tested by ExQueueWorkItem and ExpCheckDynamicThreadCount */
557 ExWorkerQueue
[CriticalWorkQueue
].Info
.MakeThreadsAsNecessary
= TRUE
;
559 /* Initialize the balance set manager events */
560 KeInitializeEvent(&ExpThreadSetManagerEvent
, SynchronizationEvent
, FALSE
);
561 KeInitializeEvent(&ExpThreadSetManagerShutdownEvent
,
565 /* Create the built-in worker threads for the critical queue */
566 for (i
= 0; i
< CriticalThreads
; i
++)
568 /* Create the thread */
569 ExpCreateWorkerThread(CriticalWorkQueue
, FALSE
);
570 ExCriticalWorkerThreads
++;
573 /* Create the built-in worker threads for the delayed queue */
574 for (i
= 0; i
< DelayedThreads
; i
++)
576 /* Create the thread */
577 ExpCreateWorkerThread(DelayedWorkQueue
, FALSE
);
578 ExDelayedWorkerThreads
++;
581 /* Create the built-in worker thread for the hypercritical queue */
/* Exactly one, created by a direct call rather than a counted loop */
582 ExpCreateWorkerThread(HyperCriticalWorkQueue
, FALSE
);
584 /* Create the balance set manager thread */
585 PsCreateSystemThread(&ThreadHandle
,
590 ExpWorkerThreadBalanceManager
,
593 /* Get a pointer to it for the shutdown process */
/* No matching dereference is visible in this routine; the pointer is stored in the global below */
594 ObReferenceObjectByHandle(ThreadHandle
,
600 ExpWorkerThreadBalanceManagerPtr
= Thread
;
602 /* Close the handle and return */
603 ObCloseHandle(ThreadHandle
, KernelMode
);
/*
 * APC kernel routine queued by ExSwapinWorkerThreads to a thread. If the
 * thread it runs on is an active Ex worker, it applies the requested
 * kernel-stack swap setting. It always signals the event passed through
 * SystemArgument1, which the queuing thread waits on.
 *
 * NOTE(review): Event is read by dereferencing SystemArgument1, whereas
 * AllowSwap is obtained by casting NormalContext itself (a PVOID *) to
 * PBOOLEAN without dereferencing it first. Confirm that this reads the
 * caller's BOOLEAN and not a byte of the pointer value.
 */
608 ExpSetSwappingKernelApc(IN PKAPC Apc
,
609 OUT PKNORMAL_ROUTINE
*NormalRoutine
,
610 IN OUT PVOID
*NormalContext
,
611 IN OUT PVOID
*SystemArgument1
,
612 IN OUT PVOID
*SystemArgument2
)
615 PKEVENT Event
= (PKEVENT
)*SystemArgument1
;
617 /* Make sure it's an active worker */
/* ActiveExWorker is set and cleared by ExpWorkerThreadEntryPoint */
618 if (PsGetCurrentThread()->ActiveExWorker
)
620 /* Read the setting from the context flag */
621 AllowSwap
= (PBOOLEAN
)NormalContext
;
622 KeSetKernelStackSwapEnable(*AllowSwap
);
625 /* Let caller know that we're done */
/* Signalled whether or not the setting was applied */
626 KeSetEvent(Event
, 0, FALSE
);
/*
 * Applies the kernel-stack swap setting AllowSwap to the threads of the
 * initial system process. Under ExpWorkerSwapinMutex it records the setting in
 * ExpWorkersCanSwap and then visits the process's threads. Threads with
 * ExWorkerCanWaitUser set are skipped. The calling thread applies the setting
 * directly. For the remaining threads an APC running ExpSetSwappingKernelApc
 * is queued and, if the insert succeeded, waited for through a local event.
 */
631 ExSwapinWorkerThreads(IN BOOLEAN AllowSwap
)
634 PETHREAD CurrentThread
= PsGetCurrentThread(), Thread
;
635 PEPROCESS Process
= PsInitialSystemProcess
;
639 /* Initialize an event so we know when we're done */
640 KeInitializeEvent(&Event
, NotificationEvent
, FALSE
);
642 /* Lock this routine */
643 ExAcquireFastMutex(&ExpWorkerSwapinMutex
);
645 /* Record the setting; worker threads read it when they start */
646 ExpWorkersCanSwap
= AllowSwap
;
648 /* Loop all threads in the system process */
/* Passing NULL asks for the first thread; later calls pass the previous thread */
649 Thread
= PsGetNextProcessThread(Process
, NULL
);
652 /* Skip threads with explicit permission to do this */
653 if (Thread
->ExWorkerCanWaitUser
) goto Next
;
655 /* Check if we reached ourselves */
656 if (Thread
== CurrentThread
)
/* No APC needed for the calling thread: apply the setting directly */
659 KeSetKernelStackSwapEnable(AllowSwap
);
/* One KAPC object and one event are reused for every thread, so each APC is waited for in turn */
664 KeInitializeApc(&Apc
,
666 InsertApcEnvironment
,
667 ExpSetSwappingKernelApc
,
/* The event is handed to the APC routine as its first system argument */
672 if (KeInsertQueueApc(&Apc
, &Event
, NULL
, 3))
674 /* Wait for the APC to run */
675 KeWaitForSingleObject(&Event
, Executive
, KernelMode
, FALSE
, NULL
);
/* Reset the notification event so it can be reused for the next thread */
676 KeClearEvent(&Event
);
682 Thread
= PsGetNextProcessThread(Process
, Thread
);
685 /* Release the lock */
686 ExReleaseFastMutex(&ExpWorkerSwapinMutex
);
689 /* PUBLIC FUNCTIONS **********************************************************/
692 * @name ExQueueWorkItem
695 * The ExQueueWorkItem routine inserts the specified work item into the
696 * system worker queue of the requested type.
699 * Pointer to an initialized Work Queue Item structure. This structure
700 * must be located in nonpaged pool memory.
703 * Type of the queue to use for this item. Can be one of the following:
705 * - CriticalWorkQueue
706 * - HyperCriticalWorkQueue
710 * @remarks This routine is obsolete. Use IoQueueWorkItem instead.
712 * Callers of this routine must be running at IRQL <= DISPATCH_LEVEL.
/*
 * Inserts WorkItem into the worker queue selected by QueueType. A work item
 * whose WorkerRoutine address lies below MmUserProbeAddress causes a
 * WORKER_INVALID bugcheck. After the insert, the balance manager event is
 * signalled if the queue allows on-demand threads, still has entries, has a
 * CurrentCount below its MaximumCount, and has fewer than 16 dynamic threads.
 */
717 ExQueueWorkItem(IN PWORK_QUEUE_ITEM WorkItem
,
718 IN WORK_QUEUE_TYPE QueueType
)
/* The queue address is computed from QueueType before the range ASSERT below runs */
720 PEX_WORK_QUEUE WorkQueue
= &ExWorkerQueue
[QueueType
];
721 ASSERT(QueueType
< MaximumWorkQueue
);
/* NOTE(review): presumably guards against queuing an item that is already linked - confirm */
722 ASSERT(WorkItem
->List
.Flink
== NULL
);
724 /* Don't try to trick us */
725 if ((ULONG_PTR
)WorkItem
->WorkerRoutine
< MmUserProbeAddress
)
727 /* Bugcheck the system */
728 KeBugCheckEx(WORKER_INVALID
,
731 (ULONG_PTR
)WorkItem
->WorkerRoutine
,
735 /* Insert the item into the queue */
736 KeInsertQueue(&WorkQueue
->WorkerQueue
, &WorkItem
->List
);
/* This check runs after the item has already been inserted */
737 ASSERT(!WorkQueue
->Info
.QueueDisabled
);
740 * Check if we need a new thread. Our decision is as follows:
741 * - This queue type must support Dynamic Threads (duh!)
742 * - It actually has to have unprocessed items
743 * - We have CPUs which could be handling another thread
744 * - We haven't abused our usage of dynamic threads.
/* ExpCheckDynamicThreadCount repeats this same test before creating the thread */
746 if ((WorkQueue
->Info
.MakeThreadsAsNecessary
) &&
747 (!IsListEmpty(&WorkQueue
->WorkerQueue
.EntryListHead
)) &&
748 (WorkQueue
->WorkerQueue
.CurrentCount
<
749 WorkQueue
->WorkerQueue
.MaximumCount
) &&
750 (WorkQueue
->DynamicThreadCount
< 16))
752 /* Let the balance manager know about it */
753 DPRINT1("Requesting a new thread. CurrentCount: %lu. MaxCount: %lu\n",
754 WorkQueue
->WorkerQueue
.CurrentCount
,
755 WorkQueue
->WorkerQueue
.MaximumCount
);
/* The thread itself is created by the balance manager, not here */
756 KeSetEvent(&ExpThreadSetManagerEvent
, 0, FALSE
);