-/* $Id: work.c,v 1.13 2002/05/14 21:19:16 dwelch Exp $
- *
+/*
* COPYRIGHT: See COPYING in the top level directory
* PROJECT: ReactOS kernel
- * FILE: mkernel/kernel/work.c
+ * FILE: ntoskrnl/ex/work.c
* PURPOSE: Manage system work queues
- * PROGRAMMER: David Welch (welch@mcmail.com)
- * REVISION HISTORY:
- * 29/06/98: Created
+ *
+ * PROGRAMMERS: Alex Ionescu - Used correct work queue array and added some fixes and checks.
+ * Gunnar Dalsnes - Implemented
*/
/* INCLUDES ******************************************************************/
-#include <ddk/ntddk.h>
-
-#include <internal/ps.h>
-
+#include <ntoskrnl.h>
#define NDEBUG
#include <internal/debug.h>
/* TYPES *********************************************************************/
-typedef struct _WORK_QUEUE
-{
- /*
- * PURPOSE: Head of the list of waiting work items
- */
- LIST_ENTRY Head;
-
- /*
- * PURPOSE: Sychronize access to the work queue
- */
- KSPIN_LOCK Lock;
-
- /*
- * PURPOSE: Worker threads with nothing to do wait on this event
- */
- KSEMAPHORE Sem;
-
- /*
- * PURPOSE: Thread associated with work queue
- */
- HANDLE Thread[NUMBER_OF_WORKER_THREADS];
-} WORK_QUEUE, *PWORK_QUEUE;
-
/* GLOBALS *******************************************************************/
/*
* PURPOSE: Queue of items waiting to be processed at normal priority
*/
-WORK_QUEUE EiNormalWorkQueue;
-
-WORK_QUEUE EiCriticalWorkQueue;
-
-WORK_QUEUE EiHyperCriticalWorkQueue;
+EX_WORK_QUEUE ExWorkerQueue[MaximumWorkQueue];
/* FUNCTIONS ****************************************************************/
-static NTSTATUS STDCALL
-ExWorkerThreadEntryPoint(PVOID context)
/*
* FUNCTION: Entry point for a worker thread
* ARGUMENTS:
* NOTE: To kill a worker thread you must queue an item whose callback
* calls PsTerminateSystemThread
*/
+static
+VOID
+STDCALL
+ExpWorkerThreadEntryPoint(IN PVOID Context)
{
- PWORK_QUEUE queue = (PWORK_QUEUE)context;
- PWORK_QUEUE_ITEM item;
- PLIST_ENTRY current;
-
- for(;;)
- {
- current = ExInterlockedRemoveHeadList(&queue->Head,
- &queue->Lock);
- if (current!=NULL)
- {
- item = CONTAINING_RECORD(current,WORK_QUEUE_ITEM,Entry);
- item->Routine(item->Context);
- }
- else
- {
- KeWaitForSingleObject((PVOID)&queue->Sem,
- Executive,
- KernelMode,
- FALSE,
- NULL);
- DPRINT("Woke from wait\n");
- }
- }
+ PWORK_QUEUE_ITEM WorkItem;
+ PLIST_ENTRY QueueEntry;
+ WORK_QUEUE_TYPE WorkQueueType;
+ PEX_WORK_QUEUE WorkQueue;
+
+ /* Get Queue Type and Worker Queue */
+ WorkQueueType = (WORK_QUEUE_TYPE)Context;
+ WorkQueue = &ExWorkerQueue[WorkQueueType];
+
+ /* Loop forever */
+ while (TRUE) {
+
+ /* Wait for Something to Happen on the Queue */
+ QueueEntry = KeRemoveQueue(&WorkQueue->WorkerQueue, KernelMode, NULL);
+
+ /* Can't happen since we do a KernelMode wait (and we're a system thread) */
+ ASSERT((NTSTATUS)QueueEntry != STATUS_USER_APC);
+
+ /* this should never happen either, since we wait with NULL timeout,
+ * but there's a slight possibility that STATUS_TIMEOUT is returned
+ * at queue rundown in NT (unlikely) -Gunnar
+ */
+ ASSERT((NTSTATUS)QueueEntry != STATUS_TIMEOUT);
+
+ /* Increment Processed Work Items */
+ InterlockedIncrement((PLONG)&WorkQueue->WorkItemsProcessed);
+
+ /* Get the Work Item */
+ WorkItem = CONTAINING_RECORD(QueueEntry, WORK_QUEUE_ITEM, List);
+
+ /* Call the Worker Routine */
+ WorkItem->WorkerRoutine(WorkItem->Parameter);
+
+ /* Make sure it returned at right IRQL */
+ if (KeGetCurrentIrql() != PASSIVE_LEVEL) {
+
+ KEBUGCHECKEX(WORKER_THREAD_RETURNED_AT_BAD_IRQL,
+ (ULONG_PTR)WorkItem->WorkerRoutine,
+ KeGetCurrentIrql(),
+ (ULONG_PTR)WorkItem->Parameter,
+ (ULONG_PTR)WorkItem);
+ }
+
+ /* Make sure it returned with Impersionation Disabled */
+ if (PsGetCurrentThread()->ActiveImpersonationInfo) {
+
+ KEBUGCHECKEX(IMPERSONATING_WORKER_THREAD,
+ (ULONG_PTR)WorkItem->WorkerRoutine,
+ (ULONG_PTR)WorkItem->Parameter,
+ (ULONG_PTR)WorkItem,
+ 0);
+ }
+ }
}
-static VOID ExInitializeWorkQueue(PWORK_QUEUE WorkQueue,
- KPRIORITY Priority)
+static
+VOID
+STDCALL
+ExpInitializeWorkQueue(WORK_QUEUE_TYPE WorkQueueType,
+ KPRIORITY Priority)
{
- ULONG i;
- PETHREAD Thread;
-
- InitializeListHead(&WorkQueue->Head);
- KeInitializeSpinLock(&WorkQueue->Lock);
- KeInitializeSemaphore(&WorkQueue->Sem,
- 0,
- 256);
- for (i=0; i<NUMBER_OF_WORKER_THREADS; i++)
- {
- PsCreateSystemThread(&WorkQueue->Thread[i],
- THREAD_ALL_ACCESS,
- NULL,
- NULL,
- NULL,
- ExWorkerThreadEntryPoint,
- WorkQueue);
- ObReferenceObjectByHandle(WorkQueue->Thread[i],
- THREAD_ALL_ACCESS,
- PsThreadType,
- KernelMode,
- (PVOID*)&Thread,
- NULL);
- KeSetPriorityThread(&Thread->Tcb,
- Priority);
- ObDereferenceObject(Thread);
- }
+ ULONG i;
+ PETHREAD Thread;
+ HANDLE hThread;
+
+ /* Loop through how many threads we need to create */
+ for (i = 0; i < NUMBER_OF_WORKER_THREADS; i++) {
+
+ /* Create the System Thread */
+ PsCreateSystemThread(&hThread,
+ THREAD_ALL_ACCESS,
+ NULL,
+ NULL,
+ NULL,
+ ExpWorkerThreadEntryPoint,
+ (PVOID)WorkQueueType);
+
+ /* Get the Thread */
+ ObReferenceObjectByHandle(hThread,
+ THREAD_SET_INFORMATION,
+ PsThreadType,
+ KernelMode,
+ (PVOID*)&Thread,
+ NULL);
+
+ /* Set the Priority */
+ KeSetPriorityThread(&Thread->Tcb, Priority);
+
+ /* Dereference and close handle */
+ ObDereferenceObject(Thread);
+ ZwClose(hThread);
+ }
}
-VOID ExInitializeWorkerThreads(VOID)
+VOID
+INIT_FUNCTION
+STDCALL
+ExpInitializeWorkerThreads(VOID)
{
- ExInitializeWorkQueue(&EiNormalWorkQueue,
- LOW_PRIORITY);
- ExInitializeWorkQueue(&EiCriticalWorkQueue,
- LOW_REALTIME_PRIORITY);
- ExInitializeWorkQueue(&EiHyperCriticalWorkQueue,
- HIGH_PRIORITY);
+ ULONG WorkQueueType;
+
+ /* Initialize the Array */
+ for (WorkQueueType = 0; WorkQueueType < MaximumWorkQueue; WorkQueueType++) {
+
+ RtlZeroMemory(&ExWorkerQueue[WorkQueueType], sizeof(EX_WORK_QUEUE));
+ KeInitializeQueue(&ExWorkerQueue[WorkQueueType].WorkerQueue, 0);
+ }
+
+ /* Create the built-in worker threads for each work queue */
+ ExpInitializeWorkQueue(CriticalWorkQueue, LOW_REALTIME_PRIORITY);
+ ExpInitializeWorkQueue(DelayedWorkQueue, LOW_PRIORITY);
+ ExpInitializeWorkQueue(HyperCriticalWorkQueue, HIGH_PRIORITY);
}
-VOID STDCALL
-ExQueueWorkItem (PWORK_QUEUE_ITEM WorkItem,
- WORK_QUEUE_TYPE QueueType)
/*
+ * @implemented
+ *
* FUNCTION: Inserts a work item in a queue for one of the system worker
* threads to process
* ARGUMENTS:
* WorkItem = Item to insert
* QueueType = Queue to insert it in
*/
+VOID
+STDCALL
+ExQueueWorkItem(PWORK_QUEUE_ITEM WorkItem,
+ WORK_QUEUE_TYPE QueueType)
{
- assert(WorkItem!=NULL);
- ASSERT_IRQL(DISPATCH_LEVEL);
-
- /*
- * Insert the item in the appropiate queue and wake up any thread
- * waiting for something to do
- */
- switch(QueueType)
- {
- case DelayedWorkQueue:
- ExInterlockedInsertTailList(&EiNormalWorkQueue.Head,
- &WorkItem->Entry,
- &EiNormalWorkQueue.Lock);
- KeReleaseSemaphore(&EiNormalWorkQueue.Sem,
- IO_NO_INCREMENT,
- 1,
- FALSE);
- break;
-
- case CriticalWorkQueue:
- ExInterlockedInsertTailList(&EiCriticalWorkQueue.Head,
- &WorkItem->Entry,
- &EiCriticalWorkQueue.Lock);
- KeReleaseSemaphore(&EiCriticalWorkQueue.Sem,
- IO_NO_INCREMENT,
- 1,
- FALSE);
- break;
-
- case HyperCriticalWorkQueue:
- ExInterlockedInsertTailList(&EiHyperCriticalWorkQueue.Head,
- &WorkItem->Entry,
- &EiHyperCriticalWorkQueue.Lock);
- KeReleaseSemaphore(&EiHyperCriticalWorkQueue.Sem,
- IO_NO_INCREMENT,
- 1,
- FALSE);
- break;
-
- }
+ ASSERT(WorkItem!=NULL);
+ ASSERT_IRQL(DISPATCH_LEVEL);
+ ASSERT(WorkItem->List.Flink == NULL);
+
+ /* Insert the Queue */
+ KeInsertQueue(&ExWorkerQueue[QueueType].WorkerQueue, &WorkItem->List);
}
/* EOF */