Fix stack corruption. Thanks to Waxdragon and the fact he uses an -O2 build (hint...
[reactos.git] / reactos / ntoskrnl / ex / work.c
index 1f1b556..5af58d8 100644 (file)
-/* $Id: work.c,v 1.14 2002/09/07 15:12:50 chorns Exp $
- *
+/*
  * COPYRIGHT:          See COPYING in the top level directory
- * PROJECT:            ReactOS kernel
- * FILE:               mkernel/kernel/work.c
- * PURPOSE:            Manage system work queues
- * PROGRAMMER:         David Welch (welch@mcmail.com)
- * REVISION HISTORY:
- *             29/06/98: Created
+ * PROJECT:            ReactOS Kernel
+ * FILE:               ntoskrnl/ex/work.c
+ * PURPOSE:            Manage system work queues and worker threads
+ * PROGRAMMER:         Alex Ionescu (alex@relsoft.net)
  */
 
 /* INCLUDES ******************************************************************/
 
 #include <ntoskrnl.h>
-
 #define NDEBUG
 #include <internal/debug.h>
 
+#if defined (ALLOC_PRAGMA)
+#pragma alloc_text(INIT, ExpInitializeWorkerThreads)
+#endif
+
+/* DATA **********************************************************************/
+
+/* Number of worker threads for each Queue */
+#define EX_HYPERCRITICAL_WORK_THREADS               1
+#define EX_DELAYED_WORK_THREADS                     3
+#define EX_CRITICAL_WORK_THREADS                    5
+
+/* Magic flag for dynamic worker threads */
+#define EX_DYNAMIC_WORK_THREAD                      0x80000000
+
+/* Worker thread priority increments (added to base priority) */
+#define EX_HYPERCRITICAL_QUEUE_PRIORITY_INCREMENT   7
+#define EX_CRITICAL_QUEUE_PRIORITY_INCREMENT        5
+#define EX_DELAYED_QUEUE_PRIORITY_INCREMENT         4
+
+/* The actual worker queue array */
+EX_WORK_QUEUE ExWorkerQueue[MaximumWorkQueue];
 
-/* DEFINES *******************************************************************/
+/* Accounting of the total threads and registry hacked threads */
+ULONG ExpCriticalWorkerThreads;
+ULONG ExpDelayedWorkerThreads;
+ULONG ExpAdditionalCriticalWorkerThreads;
+ULONG ExpAdditionalDelayedWorkerThreads;
 
-#define NUMBER_OF_WORKER_THREADS   (5)
+/* Future support for stack swapping worker threads */
+BOOLEAN ExpWorkersCanSwap;
+LIST_ENTRY ExpWorkerListHead;
+KMUTANT ExpWorkerSwapinMutex;
 
-/* TYPES *********************************************************************/
+/* The worker balance set manager events */
+KEVENT ExpThreadSetManagerEvent;
+KEVENT ExpThreadSetManagerShutdownEvent;
 
-typedef struct _WORK_QUEUE
+/* Thread pointers for future worker thread shutdown support */
+PETHREAD ExpWorkerThreadBalanceManagerPtr;
+PETHREAD ExpLastWorkerThread;
+
+/* PRIVATE FUNCTIONS *********************************************************/
+
+/*++
+ * @name ExpWorkerThreadEntryPoint
+ *
+ *     The ExpWorkerThreadEntryPoint routine is the entrypoint for any new
+ *     worker thread created by teh system.
+ *
+ * @param Context
+ *        Contains the work queue type masked with a flag specifing whether the
+ *        thread is dynamic or not.
+ *
+ * @return None.
+ *
+ * @remarks A dynamic thread can timeout after 10 minutes of waiting on a queue
+ *          while a static thread will never timeout.
+ *
+ *          Worker threads must return at IRQL == PASSIVE_LEVEL, must not have
+ *          active impersonation info, and must not have disabled APCs.
+ *
+ *          NB: We will re-enable APCs for broken threads but all other cases
+ *              will generate a bugcheck.
+ *
+ *--*/
+VOID
+NTAPI
+ExpWorkerThreadEntryPoint(IN PVOID Context)
 {
-   /*
-    * PURPOSE: Head of the list of waiting work items
-    */
-   LIST_ENTRY Head;
-   
-   /*
-    * PURPOSE: Sychronize access to the work queue
-    */
-   KSPIN_LOCK Lock;
-   
-   /*
-    * PURPOSE: Worker threads with nothing to do wait on this event
-    */
-   KSEMAPHORE Sem;
-   
-   /*
-    * PURPOSE: Thread associated with work queue
-    */
-   HANDLE Thread[NUMBER_OF_WORKER_THREADS];
-} WORK_QUEUE, *PWORK_QUEUE;
-
-/* GLOBALS *******************************************************************/
+    PWORK_QUEUE_ITEM WorkItem;
+    PLIST_ENTRY QueueEntry;
+    WORK_QUEUE_TYPE WorkQueueType;
+    PEX_WORK_QUEUE WorkQueue;
+    LARGE_INTEGER Timeout;
+    PLARGE_INTEGER TimeoutPointer = NULL;
+    PETHREAD Thread = PsGetCurrentThread();
+    KPROCESSOR_MODE WaitMode;
+    EX_QUEUE_WORKER_INFO OldValue, NewValue;
 
-/*
- * PURPOSE: Queue of items waiting to be processed at normal priority
- */
-WORK_QUEUE EiNormalWorkQueue;
+    /* Check if this is a dyamic thread */
+    if ((ULONG_PTR)Context & EX_DYNAMIC_WORK_THREAD)
+    {
+        /* It is, which means we will eventually time out after 10 minutes */
+        Timeout.QuadPart = Int32x32To64(10, -10000000 * 60);
+        TimeoutPointer = &Timeout;
+    }
 
-WORK_QUEUE EiCriticalWorkQueue;
+    /* Get Queue Type and Worker Queue */
+    WorkQueueType = (WORK_QUEUE_TYPE)((ULONG_PTR)Context &
+                                      ~EX_DYNAMIC_WORK_THREAD);
+    WorkQueue = &ExWorkerQueue[WorkQueueType];
 
-WORK_QUEUE EiHyperCriticalWorkQueue;
+    /* Select the wait mode */
+    WaitMode = (UCHAR)WorkQueue->Info.WaitMode;
 
-/* FUNCTIONS ****************************************************************/
+    /* Nobody should have initialized this yet, do it now */
+    ASSERT(Thread->ExWorkerCanWaitUser == 0);
+    if (WaitMode == UserMode) Thread->ExWorkerCanWaitUser = TRUE;
 
-static VOID STDCALL
-ExWorkerThreadEntryPoint(PVOID context)
-/*
- * FUNCTION: Entry point for a worker thread
- * ARGUMENTS:
- *         context = Parameters
- * RETURNS: Status
- * NOTE: To kill a worker thread you must queue an item whose callback
- * calls PsTerminateSystemThread
- */
+    /* If we shouldn't swap, disable that feature */
+    if (!ExpWorkersCanSwap) KeSetKernelStackSwapEnable(FALSE);
+
+    /* Set the worker flags */
+    do
+    {
+        /* Check if the queue is being disabled */
+        if (WorkQueue->Info.QueueDisabled)
+        {
+            /* Re-enable stack swapping and kill us */
+            KeSetKernelStackSwapEnable(TRUE);
+            PsTerminateSystemThread(STATUS_SYSTEM_SHUTDOWN);
+        }
+
+        /* Increase the worker count */
+        OldValue = WorkQueue->Info;
+        NewValue = OldValue;
+        NewValue.WorkerCount++;
+    }
+    while (InterlockedCompareExchange((PLONG)&WorkQueue->Info,
+                                      *(PLONG)&NewValue,
+                                      *(PLONG)&OldValue) != *(PLONG)&OldValue);
+
+    /* Success, you are now officially a worker thread! */
+    Thread->ActiveExWorker = TRUE;
+
+    /* Loop forever */
+ProcessLoop:
+    for (;;)
+    {
+        /* Wait for Something to Happen on the Queue */
+        QueueEntry = KeRemoveQueue(&WorkQueue->WorkerQueue,
+                                   WaitMode,
+                                   TimeoutPointer);
+
+        /* Check if we timed out and quit this loop in that case */
+        if ((NTSTATUS)QueueEntry == STATUS_TIMEOUT) break;
+
+        /* Increment Processed Work Items */
+        InterlockedIncrement((PLONG)&WorkQueue->WorkItemsProcessed);
+
+        /* Get the Work Item */
+        WorkItem = CONTAINING_RECORD(QueueEntry, WORK_QUEUE_ITEM, List);
+
+        /* Make sure nobody is trying to play smart with us */
+        ASSERT((ULONG_PTR)WorkItem->WorkerRoutine > MmUserProbeAddress);
+
+        /* Call the Worker Routine */
+        WorkItem->WorkerRoutine(WorkItem->Parameter);
+
+        /* Make sure APCs are not disabled */
+        if (Thread->Tcb.SpecialApcDisable)
+        {
+            /* We're nice and do it behind your back */
+            DPRINT1("Warning: Broken Worker Thread: %p %lx %p came back "
+                    "with APCs disabled!\n",
+                    WorkItem->WorkerRoutine,
+                    WorkItem->Parameter,
+                    WorkItem);
+            Thread->Tcb.SpecialApcDisable = 0;
+        }
+
+        /* Make sure it returned at right IRQL */
+        if (KeGetCurrentIrql() != PASSIVE_LEVEL)
+        {
+            /* It didn't, bugcheck! */
+            KEBUGCHECKEX(WORKER_THREAD_RETURNED_AT_BAD_IRQL,
+                         (ULONG_PTR)WorkItem->WorkerRoutine,
+                         KeGetCurrentIrql(),
+                         (ULONG_PTR)WorkItem->Parameter,
+                         (ULONG_PTR)WorkItem);
+        }
+
+        /* Make sure it returned with Impersionation Disabled */
+        if (Thread->ActiveImpersonationInfo)
+        {
+            /* It didn't, bugcheck! */
+            KEBUGCHECKEX(IMPERSONATING_WORKER_THREAD,
+                         (ULONG_PTR)WorkItem->WorkerRoutine,
+                         (ULONG_PTR)WorkItem->Parameter,
+                         (ULONG_PTR)WorkItem,
+                         0);
+        }
+    }
+
+    /* This is a dynamic thread. Terminate it unless IRPs are pending */
+    if (!IsListEmpty(&Thread->IrpList)) goto ProcessLoop;
+
+    /* Don't terminate it if the queue is disabled either */
+    if (WorkQueue->Info.QueueDisabled) goto ProcessLoop;
+
+    /* Set the worker flags */
+    do
+    {
+        /* Decrease the worker count */
+        OldValue = WorkQueue->Info;
+        NewValue = OldValue;
+        NewValue.WorkerCount--;
+    }
+    while (InterlockedCompareExchange((PLONG)&WorkQueue->Info,
+                                      *(PLONG)&NewValue,
+                                      *(PLONG)&OldValue) != *(PLONG)&OldValue);
+
+    /* Decrement dynamic thread count */
+    InterlockedDecrement(&WorkQueue->DynamicThreadCount);
+
+    /* We're not a worker thread anymore */
+    Thread->ActiveExWorker = FALSE;
+
+    /* Re-enable the stack swap */
+    KeSetKernelStackSwapEnable(TRUE);
+    return;
+}
+
+/*++
+ * @name ExpCreateWorkerThread
+ *
+ *     The ExpCreateWorkerThread routine creates a new worker thread for the
+ *     specified queue.
+ **
+ * @param QueueType
+ *        Type of the queue to use for this thread. Valid values are:
+ *          - DelayedWorkQueue
+ *          - CriticalWorkQueue
+ *          - HyperCriticalWorkQueue
+ *
+ * @param Dynamic
+ *        Specifies whether or not this thread is a dynamic thread.
+ *
+ * @return None.
+ *
+ * @remarks HyperCritical work threads run at priority 7; Critical work threads
+ *          run at priority 5, and delayed work threads run at priority 4.
+ *
+ *          This, worker threads cannot pre-empty a normal user-mode thread.
+ *
+ *--*/
+VOID
+NTAPI
+ExpCreateWorkerThread(WORK_QUEUE_TYPE WorkQueueType,
+                      IN BOOLEAN Dynamic)
 {
-   PWORK_QUEUE queue = (PWORK_QUEUE)context;
-   PWORK_QUEUE_ITEM item;
-   PLIST_ENTRY current;
-   
-   for(;;)
-     {
-       current = ExInterlockedRemoveHeadList(&queue->Head,
-                                             &queue->Lock);
-       if (current!=NULL)
-         {
-            item = CONTAINING_RECORD(current,WORK_QUEUE_ITEM,List);
-            (item->WorkerRoutine)(item->Parameter);
-         }
-       else
-         {
-            KeWaitForSingleObject((PVOID)&queue->Sem,
-                                  Executive,
-                                  KernelMode,
-                                  FALSE,
-                                  NULL);
-            DPRINT("Woke from wait\n");
-         }
-     }
+    PETHREAD Thread;
+    HANDLE hThread;
+    ULONG Context;
+    KPRIORITY Priority;
+
+    /* Check if this is going to be a dynamic thread */
+    Context = WorkQueueType;
+
+    /* Add the dynamic mask */
+    if (Dynamic) Context |= EX_DYNAMIC_WORK_THREAD;
+
+    /* Create the System Thread */
+    PsCreateSystemThread(&hThread,
+                         THREAD_ALL_ACCESS,
+                         NULL,
+                         NULL,
+                         NULL,
+                         ExpWorkerThreadEntryPoint,
+                         (PVOID)Context);
+
+    /* If the thread is dynamic */
+    if (Dynamic)
+    {
+        /* Increase the count */
+        InterlockedIncrement(&ExWorkerQueue[WorkQueueType].DynamicThreadCount);
+    }
+
+    /* Set the priority */
+    if (WorkQueueType == DelayedWorkQueue)
+    {
+        /* Priority == 4 */
+        Priority = EX_DELAYED_QUEUE_PRIORITY_INCREMENT;
+    }
+    else if (WorkQueueType == CriticalWorkQueue)
+    {
+        /* Priority == 5 */
+        Priority = EX_CRITICAL_QUEUE_PRIORITY_INCREMENT;
+    }
+    else
+    {
+        /* Priority == 7 */
+        Priority = EX_HYPERCRITICAL_QUEUE_PRIORITY_INCREMENT;
+    }
+
+    /* Get the Thread */
+    ObReferenceObjectByHandle(hThread,
+                              THREAD_SET_INFORMATION,
+                              PsThreadType,
+                              KernelMode,
+                              (PVOID*)&Thread,
+                              NULL);
+
+    /* Set the Priority */
+    KeSetBasePriorityThread(&Thread->Tcb, Priority);
+
+    /* Dereference and close handle */
+    ObDereferenceObject(Thread);
+    ZwClose(hThread);
 }
 
-static VOID ExInitializeWorkQueue(PWORK_QUEUE WorkQueue,
-                                 KPRIORITY Priority)
+/*++
+ * @name ExpCheckDynamicThreadCount
+ *
+ *     The ExpCheckDynamicThreadCount routine checks every queue and creates a
+ *     dynamic thread if the queue seems to be deadlocked.
+ *
+ * @param None
+ *
+ * @return None.
+ *
+ * @remarks The algorithm for deciding if a new thread must be created is
+ *          based on wether the queue has processed no new items in the last
+ *          second, and new items are still enqueued.
+ *
+ *--*/
+VOID
+NTAPI
+ExpDetectWorkerThreadDeadlock(VOID)
 {
-   ULONG i;
-   PETHREAD Thread;
-   
-   InitializeListHead(&WorkQueue->Head);
-   KeInitializeSpinLock(&WorkQueue->Lock);
-   KeInitializeSemaphore(&WorkQueue->Sem,
-                        0,
-                        256);
-   for (i=0; i<NUMBER_OF_WORKER_THREADS; i++)
-     {
-       PsCreateSystemThread(&WorkQueue->Thread[i],
-                            THREAD_ALL_ACCESS,
-                            NULL,
-                            NULL,
-                            NULL,
-                            ExWorkerThreadEntryPoint,
-                            WorkQueue);
-       ObReferenceObjectByHandle(WorkQueue->Thread[i],
-                                 THREAD_ALL_ACCESS,
-                                 PsThreadType,
-                                 KernelMode,
-                                 (PVOID*)&Thread,
-                                 NULL);
-       KeSetPriorityThread(&Thread->Tcb,
-                           Priority);
-       ObDereferenceObject(Thread);
-     }
+    ULONG i;
+    PEX_WORK_QUEUE Queue;
+
+    /* Loop the 3 queues */
+    for (i = 0; i < MaximumWorkQueue; i++)
+    {
+        /* Get the queue */
+        Queue = &ExWorkerQueue[i];
+        ASSERT(Queue->DynamicThreadCount <= 16);
+
+        /* Check if stuff is on the queue that still is unprocessed */
+        if ((Queue->QueueDepthLastPass) &&
+            (Queue->WorkItemsProcessed == Queue->WorkItemsProcessedLastPass) &&
+            (Queue->DynamicThreadCount < 16))
+        {
+            /* Stuff is still on the queue and nobody did anything about it */
+            DPRINT1("EX: Work Queue Deadlock detected: %d\n", i);
+            ExpCreateWorkerThread(i, TRUE);
+            DPRINT1("Dynamic threads queued %d\n", Queue->DynamicThreadCount);
+        }
+
+        /* Update our data */
+        Queue->WorkItemsProcessedLastPass = Queue->WorkItemsProcessed;
+        Queue->QueueDepthLastPass = KeReadStateQueue(&Queue->WorkerQueue);
+    }
 }
 
-VOID ExInitializeWorkerThreads(VOID)
+/*++
+ * @name ExpCheckDynamicThreadCount
+ *
+ *     The ExpCheckDynamicThreadCount routine checks every queue and creates a
+ *     dynamic thread if the queue requires one.
+ *
+ * @param None
+ *
+ * @return None.
+ *
+ * @remarks The algorithm for deciding if a new thread must be created is
+ *          documented in the ExQueueWorkItem routine.
+ *
+ *--*/
+VOID
+NTAPI
+ExpCheckDynamicThreadCount(VOID)
 {
-   ExInitializeWorkQueue(&EiNormalWorkQueue,
-                        LOW_PRIORITY);
-   ExInitializeWorkQueue(&EiCriticalWorkQueue,
-                        LOW_REALTIME_PRIORITY);
-   ExInitializeWorkQueue(&EiHyperCriticalWorkQueue,
-                        HIGH_PRIORITY);
+    ULONG i;
+    PEX_WORK_QUEUE Queue;
+
+    /* Loop the 3 queues */
+    for (i = 0; i < MaximumWorkQueue; i++)
+    {
+        /* Get the queue */
+        Queue = &ExWorkerQueue[i];
+
+        /* Check if still need a new thread. See ExQueueWorkItem */
+        if ((Queue->Info.MakeThreadsAsNecessary) &&
+            (!IsListEmpty(&Queue->WorkerQueue.EntryListHead)) &&
+            (Queue->WorkerQueue.CurrentCount <
+             Queue->WorkerQueue.MaximumCount) &&
+            (Queue->DynamicThreadCount < 16))
+        {
+            /* Create a new thread */
+            DPRINT1("EX: Creating new dynamic thread as requested\n");
+            ExpCreateWorkerThread(i, TRUE);
+        }
+    }
 }
 
-VOID STDCALL
-ExQueueWorkItem (PWORK_QUEUE_ITEM      WorkItem,
-                WORK_QUEUE_TYPE                QueueType)
-/*
- * FUNCTION: Inserts a work item in a queue for one of the system worker
- * threads to process
- * ARGUMENTS:
- *        WorkItem = Item to insert
- *        QueueType = Queue to insert it in
- */
+/*++
+ * @name ExpWorkerThreadBalanceManager
+ *
+ *     The ExpWorkerThreadBalanceManager routine is the entrypoint for the
+ *     worker thread balance set manager.
+ *
+ * @param Context
+ *        Unused.
+ *
+ * @return None.
+ *
+ * @remarks The worker thread balance set manager listens every second, but can
+ *          also be woken up by an event when a new thread is needed, or by the
+ *          special shutdown event. This thread runs at priority 7.
+ *
+ *          This routine must run at IRQL == PASSIVE_LEVEL.
+ *
+ *--*/
+VOID
+NTAPI
+ExpWorkerThreadBalanceManager(IN PVOID Context)
+{
+    KTIMER Timer;
+    LARGE_INTEGER Timeout;
+    NTSTATUS Status;
+    PVOID WaitEvents[3];
+    PAGED_CODE();
+    UNREFERENCED_PARAMETER(Context);
+
+    /* Raise our priority above all other worker threads */
+    KeSetBasePriorityThread(KeGetCurrentThread(),
+                            EX_CRITICAL_QUEUE_PRIORITY_INCREMENT + 1);
+
+    /* Setup the timer */
+    KeInitializeTimer(&Timer);
+    Timeout.QuadPart = Int32x32To64(-1, 10000000);
+
+    /* We'll wait on the periodic timer and also the emergency event */
+    WaitEvents[0] = &Timer;
+    WaitEvents[1] = &ExpThreadSetManagerEvent;
+    WaitEvents[2] = &ExpThreadSetManagerShutdownEvent;
+
+    /* Start wait loop */
+    for (;;)
+    {
+        /* Wait for the timer */
+        KeSetTimer(&Timer, Timeout, NULL);
+        Status = KeWaitForMultipleObjects(3,
+                                          WaitEvents,
+                                          WaitAny,
+                                          Executive,
+                                          KernelMode,
+                                          FALSE,
+                                          NULL,
+                                          NULL);
+        if (Status == 0)
+        {
+            /* Our timer expired. Check for deadlocks */
+            ExpDetectWorkerThreadDeadlock();
+        }
+        else if (Status == 1)
+        {
+            /* Someone notified us, verify if we should create a new thread */
+            ExpCheckDynamicThreadCount();
+        }
+        else if (Status == 2)
+        {
+            /* We are shutting down. Cancel the timer */
+            DPRINT1("System shutdown\n");
+            KeCancelTimer(&Timer);
+
+            /* Make sure we have a final thread */
+            ASSERT(ExpLastWorkerThread);
+
+            /* Wait for it */
+            KeWaitForSingleObject(ExpLastWorkerThread,
+                                  Executive,
+                                  KernelMode,
+                                  FALSE,
+                                  NULL);
+
+            /* Dereference it and kill us */
+            ObDereferenceObject(ExpLastWorkerThread);
+            PsTerminateSystemThread(STATUS_SYSTEM_SHUTDOWN);
+        }
+    }
+}
+
+/*++
+ * @name ExpInitializeWorkerThreads
+ *
+ *     The ExpInitializeWorkerThreads routine initializes worker thread and
+ *     work queue support.
+ *
+ * @param None.
+ *
+ * @return None.
+ *
+ * @remarks This routine is only called once during system initialization.
+ *
+ *--*/
+VOID
+INIT_FUNCTION
+NTAPI
+ExpInitializeWorkerThreads(VOID)
+{
+    ULONG WorkQueueType;
+    ULONG CriticalThreads, DelayedThreads;
+    HANDLE ThreadHandle;
+    PETHREAD Thread;
+    ULONG i;
+
+    /* Setup the stack swap support */
+    KeInitializeMutex(&ExpWorkerSwapinMutex, FALSE);
+    InitializeListHead(&ExpWorkerListHead);
+    ExpWorkersCanSwap = TRUE;
+
+    /* Set the number of critical and delayed threads. We shouldn't hardcode */
+    DelayedThreads = EX_DELAYED_WORK_THREADS;
+    CriticalThreads = EX_CRITICAL_WORK_THREADS;
+
+    /* Protect against greedy registry modifications */
+    ExpAdditionalDelayedWorkerThreads =
+        min(ExpAdditionalDelayedWorkerThreads, 16);
+    ExpAdditionalCriticalWorkerThreads =
+        min(ExpAdditionalCriticalWorkerThreads, 16);
+
+    /* Calculate final count */
+    DelayedThreads += ExpAdditionalDelayedWorkerThreads;
+    CriticalThreads += ExpAdditionalCriticalWorkerThreads;
+
+    /* Initialize the Array */
+    for (WorkQueueType = 0; WorkQueueType < MaximumWorkQueue; WorkQueueType++)
+    {
+        /* Clear the structure and initialize the queue */
+        RtlZeroMemory(&ExWorkerQueue[WorkQueueType], sizeof(EX_WORK_QUEUE));
+        KeInitializeQueue(&ExWorkerQueue[WorkQueueType].WorkerQueue, 0);
+    }
+
+    /* Dynamic threads are only used for the critical queue */
+    ExWorkerQueue[CriticalWorkQueue].Info.MakeThreadsAsNecessary = TRUE;
+
+    /* Initialize the balance set manager events */
+    KeInitializeEvent(&ExpThreadSetManagerEvent, SynchronizationEvent, FALSE);
+    KeInitializeEvent(&ExpThreadSetManagerShutdownEvent,
+                      NotificationEvent,
+                      FALSE);
+
+    /* Create the built-in worker threads for the critical queue */
+    for (i = 0; i < CriticalThreads; i++)
+    {
+        /* Create the thread */
+        ExpCreateWorkerThread(CriticalWorkQueue, FALSE);
+        ExpCriticalWorkerThreads++;
+    }
+
+    /* Create the built-in worker threads for the delayed queue */
+    for (i = 0; i < DelayedThreads; i++)
+    {
+        /* Create the thread */
+        ExpCreateWorkerThread(DelayedWorkQueue, FALSE);
+        ExpDelayedWorkerThreads++;
+    }
+
+    /* Create the built-in worker thread for the hypercritical queue */
+    ExpCreateWorkerThread(HyperCriticalWorkQueue, FALSE);
+
+    /* Create the balance set manager thread */
+    PsCreateSystemThread(&ThreadHandle,
+                         THREAD_ALL_ACCESS,
+                         NULL,
+                         0,
+                         NULL,
+                         ExpWorkerThreadBalanceManager,
+                         NULL);
+
+    /* Get a pointer to it for the shutdown process */
+    ObReferenceObjectByHandle(ThreadHandle,
+                              THREAD_ALL_ACCESS,
+                              NULL,
+                              KernelMode,
+                              (PVOID*)&Thread,
+                              NULL);
+    ExpWorkerThreadBalanceManagerPtr = Thread;
+
+    /* Close the handle and return */
+    ZwClose(ThreadHandle);
+}
+
+/* PUBLIC FUNCTIONS **********************************************************/
+
+/*++
+ * @name ExQueueWorkItem
+ * @implemented NT4
+ *
+ *     The ExQueueWorkItem routine acquires rundown protection for
+ *     the specified descriptor.
+ *
+ * @param WorkItem
+ *        Pointer to an initialized Work Queue Item structure. This structure
+ *        must be located in nonpaged pool memory.
+ *
+ * @param QueueType
+ *        Type of the queue to use for this item. Can be one of the following:
+ *          - DelayedWorkQueue
+ *          - CriticalWorkQueue
+ *          - HyperCriticalWorkQueue
+ *
+ * @return None.
+ *
+ * @remarks This routine is obsolete. Use IoQueueWorkItem instead.
+ *
+ *          Callers of this routine must be running at IRQL <= DISPATCH_LEVEL.
+ *
+ *--*/
+VOID
+NTAPI
+ExQueueWorkItem(PWORK_QUEUE_ITEM WorkItem,
+                WORK_QUEUE_TYPE QueueType)
 {
-   assert(WorkItem!=NULL);
-   ASSERT_IRQL(DISPATCH_LEVEL);
-   
-   /*
-    * Insert the item in the appropiate queue and wake up any thread
-    * waiting for something to do
-    */
-   switch(QueueType)
-     {
-      case DelayedWorkQueue:
-       ExInterlockedInsertTailList(&EiNormalWorkQueue.Head,
-                                   &WorkItem->List,
-                                   &EiNormalWorkQueue.Lock);
-       KeReleaseSemaphore(&EiNormalWorkQueue.Sem,
-                          IO_NO_INCREMENT,
-                          1,
-                          FALSE);
-       break;
-       
-      case CriticalWorkQueue:
-       ExInterlockedInsertTailList(&EiCriticalWorkQueue.Head,
-                                   &WorkItem->List,
-                                   &EiCriticalWorkQueue.Lock);
-       KeReleaseSemaphore(&EiCriticalWorkQueue.Sem,
-                          IO_NO_INCREMENT,
-                          1,
-                          FALSE);
-       break;
-
-      case HyperCriticalWorkQueue:
-       ExInterlockedInsertTailList(&EiHyperCriticalWorkQueue.Head,
-                                   &WorkItem->List,
-                                   &EiHyperCriticalWorkQueue.Lock);
-       KeReleaseSemaphore(&EiHyperCriticalWorkQueue.Sem,
-                          IO_NO_INCREMENT,
-                          1,
-                          FALSE);
-       break;
-       default:
-               assert(FALSE);
-     }
+    PEX_WORK_QUEUE WorkQueue = &ExWorkerQueue[QueueType];
+    ASSERT(QueueType < MaximumWorkQueue);
+    ASSERT(WorkItem->List.Flink == NULL);
+
+    /* Don't try to trick us */
+    if ((ULONG_PTR)WorkItem->WorkerRoutine < MmUserProbeAddress)
+    {
+        /* Bugcheck the system */
+        KEBUGCHECKEX(WORKER_INVALID,
+                     1,
+                     (ULONG_PTR)WorkItem,
+                     (ULONG_PTR)WorkItem->WorkerRoutine,
+                     0);
+    }
+
+    /* Insert the Queue */
+    KeInsertQueue(&WorkQueue->WorkerQueue, &WorkItem->List);
+    ASSERT(!WorkQueue->Info.QueueDisabled);
+
+    /*
+     * Check if we need a new thread. Our decision is as follows:
+     *  - This queue type must support Dynamic Threads (duh!)
+     *  - It actually has to have unprocessed items
+     *  - We have CPUs which could be handling another thread
+     *  - We haven't abused our usage of dynamic threads.
+     */
+    if ((WorkQueue->Info.MakeThreadsAsNecessary) &&
+        (!IsListEmpty(&WorkQueue->WorkerQueue.EntryListHead)) &&
+        (WorkQueue->WorkerQueue.CurrentCount <
+         WorkQueue->WorkerQueue.MaximumCount) &&
+        (WorkQueue->DynamicThreadCount < 16))
+    {
+        /* Let the balance manager know about it */
+        DPRINT1("Requesting a new thread. CurrentCount: %d. MaxCount: %d\n",
+                WorkQueue->WorkerQueue.CurrentCount,
+                WorkQueue->WorkerQueue.MaximumCount);
+        KeSetEvent(&ExpThreadSetManagerEvent, 0, FALSE);
+    }
 }
 
 /* EOF */
+