2 * COPYRIGHT: See COPYING in the top level directory
3 * PROJECT: ReactOS system libraries
4 * PURPOSE: Work Item implementation
5 * FILE: lib/rtl/workitem.c
9 /* INCLUDES *****************************************************************/
16 /* FUNCTIONS ***************************************************************/
20 RtlpStartThread(IN PTHREAD_START_ROUTINE Function
,
22 OUT PHANDLE ThreadHandle
)
24 /* Create a native worker thread -- used for SMSS, CSRSS, etc... */
25 return RtlCreateUserThread(NtCurrentProcess(),
39 RtlpExitThread(IN NTSTATUS ExitStatus
)
41 /* Kill a native worker thread -- used for SMSS, CSRSS, etc... */
42 return NtTerminateThread(NtCurrentThread(), ExitStatus
);
45 PRTL_START_POOL_THREAD RtlpStartThreadFunc
= RtlpStartThread
;
46 PRTL_EXIT_POOL_THREAD RtlpExitThreadFunc
= RtlpExitThread
;
48 #define MAX_WORKERTHREADS 0x100
49 #define WORKERTHREAD_CREATION_THRESHOLD 0x5
51 typedef struct _RTLP_IOWORKERTHREAD
56 } RTLP_IOWORKERTHREAD
, *PRTLP_IOWORKERTHREAD
;
58 typedef struct _RTLP_WORKITEM
60 WORKERCALLBACKFUNC Function
;
64 } RTLP_WORKITEM
, *PRTLP_WORKITEM
;
66 static LONG ThreadPoolInitialized
= 0;
67 static RTL_CRITICAL_SECTION ThreadPoolLock
;
68 static PRTLP_IOWORKERTHREAD PersistentIoThread
;
69 static LIST_ENTRY ThreadPoolIOWorkerThreadsList
;
70 static HANDLE ThreadPoolCompletionPort
;
71 static LONG ThreadPoolWorkerThreads
;
72 static LONG ThreadPoolWorkerThreadsRequests
;
73 static LONG ThreadPoolWorkerThreadsLongRequests
;
74 static LONG ThreadPoolIOWorkerThreads
;
75 static LONG ThreadPoolIOWorkerThreadsRequests
;
76 static LONG ThreadPoolIOWorkerThreadsLongRequests
;
78 #define IsThreadPoolInitialized() (*((volatile LONG*)&ThreadPoolInitialized) == 1)
81 RtlpInitializeThreadPool(VOID
)
83 NTSTATUS Status
= STATUS_SUCCESS
;
88 InitStatus
= InterlockedCompareExchange(&ThreadPoolInitialized
,
93 /* We're the first thread to initialize the thread pool */
95 InitializeListHead(&ThreadPoolIOWorkerThreadsList
);
97 PersistentIoThread
= NULL
;
99 ThreadPoolWorkerThreads
= 0;
100 ThreadPoolWorkerThreadsRequests
= 0;
101 ThreadPoolWorkerThreadsLongRequests
= 0;
102 ThreadPoolIOWorkerThreads
= 0;
103 ThreadPoolIOWorkerThreadsRequests
= 0;
104 ThreadPoolIOWorkerThreadsLongRequests
= 0;
106 /* Initialize the lock */
107 Status
= RtlInitializeCriticalSection(&ThreadPoolLock
);
108 if (!NT_SUCCESS(Status
))
111 /* Create the completion port */
112 Status
= NtCreateIoCompletion(&ThreadPoolCompletionPort
,
113 IO_COMPLETION_ALL_ACCESS
,
116 if (!NT_SUCCESS(Status
))
118 RtlDeleteCriticalSection(&ThreadPoolLock
);
123 /* Initialization done */
124 InterlockedExchange(&ThreadPoolInitialized
,
128 else if (InitStatus
== 2)
130 LARGE_INTEGER Timeout
;
132 /* Another thread is currently initializing the thread pool!
133 Poll after a short period of time to see if the initialization
136 Timeout
.QuadPart
= -10000000LL; /* Wait for a second */
137 NtDelayExecution(FALSE
,
140 } while (InitStatus
!= 1);
146 RtlpGetImpersonationToken(OUT PHANDLE TokenHandle
)
150 Status
= NtOpenThreadToken(NtCurrentThread(),
154 if (Status
== STATUS_NO_TOKEN
|| Status
== STATUS_CANT_OPEN_ANONYMOUS
)
157 Status
= STATUS_SUCCESS
;
164 RtlpStartWorkerThread(PTHREAD_START_ROUTINE StartRoutine
)
168 LARGE_INTEGER Timeout
;
169 volatile LONG WorkerInitialized
= 0;
171 Timeout
.QuadPart
= -10000LL; /* Wait for 1ms (10000 * 100ns) */
173 /* Start the thread */
174 Status
= RtlpStartThreadFunc(StartRoutine
, (PVOID
)&WorkerInitialized
, &ThreadHandle
);
175 if (NT_SUCCESS(Status
))
177 NtResumeThread(ThreadHandle
, NULL
);
179 /* Poll until the thread got a chance to initialize */
180 while (WorkerInitialized
== 0)
182 NtDelayExecution(FALSE
,
186 NtClose(ThreadHandle
);
194 RtlpExecuteWorkItem(IN OUT PVOID NormalContext
,
195 IN OUT PVOID SystemArgument1
,
196 IN OUT PVOID SystemArgument2
)
199 BOOLEAN Impersonated
= FALSE
;
200 RTLP_WORKITEM WorkItem
= *(volatile RTLP_WORKITEM
*)SystemArgument2
;
202 RtlFreeHeap(RtlGetProcessHeap(),
206 if (WorkItem
.TokenHandle
!= NULL
)
208 Status
= NtSetInformationThread(NtCurrentThread(),
209 ThreadImpersonationToken
,
210 &WorkItem
.TokenHandle
,
213 NtClose(WorkItem
.TokenHandle
);
215 if (NT_SUCCESS(Status
))
223 DPRINT("RtlpExecuteWorkItem: Function: 0x%p Context: 0x%p ImpersonationToken: 0x%p\n", WorkItem
.Function
, WorkItem
.Context
, WorkItem
.TokenHandle
);
225 /* Execute the function */
226 WorkItem
.Function(WorkItem
.Context
);
228 _SEH2_EXCEPT(EXCEPTION_EXECUTE_HANDLER
)
230 DPRINT1("Exception 0x%x while executing IO work item 0x%p\n", _SEH2_GetExceptionCode(), WorkItem
.Function
);
236 WorkItem
.TokenHandle
= NULL
;
237 Status
= NtSetInformationThread(NtCurrentThread(),
238 ThreadImpersonationToken
,
239 &WorkItem
.TokenHandle
,
241 if (!NT_SUCCESS(Status
))
243 DPRINT1("Failed to revert worker thread to self!!! Status: 0x%x\n", Status
);
247 /* update the requests counter */
248 InterlockedDecrement(&ThreadPoolWorkerThreadsRequests
);
250 if (WorkItem
.Flags
& WT_EXECUTELONGFUNCTION
)
252 InterlockedDecrement(&ThreadPoolWorkerThreadsLongRequests
);
258 RtlpQueueWorkerThread(IN OUT PRTLP_WORKITEM WorkItem
)
260 NTSTATUS Status
= STATUS_SUCCESS
;
262 InterlockedIncrement(&ThreadPoolWorkerThreadsRequests
);
264 if (WorkItem
->Flags
& WT_EXECUTELONGFUNCTION
)
266 InterlockedIncrement(&ThreadPoolWorkerThreadsLongRequests
);
269 if (WorkItem
->Flags
& WT_EXECUTEINPERSISTENTTHREAD
)
271 Status
= RtlpInitializeTimerThread();
273 if (NT_SUCCESS(Status
))
275 /* Queue an APC in the timer thread */
276 Status
= NtQueueApcThread(TimerThreadHandle
,
285 /* Queue an IO completion message */
286 Status
= NtSetIoCompletion(ThreadPoolCompletionPort
,
293 if (!NT_SUCCESS(Status
))
295 InterlockedDecrement(&ThreadPoolWorkerThreadsRequests
);
297 if (WorkItem
->Flags
& WT_EXECUTELONGFUNCTION
)
299 InterlockedDecrement(&ThreadPoolWorkerThreadsLongRequests
);
308 RtlpExecuteIoWorkItem(IN OUT PVOID NormalContext
,
309 IN OUT PVOID SystemArgument1
,
310 IN OUT PVOID SystemArgument2
)
313 BOOLEAN Impersonated
= FALSE
;
314 PRTLP_IOWORKERTHREAD IoThread
= (PRTLP_IOWORKERTHREAD
)NormalContext
;
315 RTLP_WORKITEM WorkItem
= *(volatile RTLP_WORKITEM
*)SystemArgument2
;
317 ASSERT(IoThread
!= NULL
);
319 RtlFreeHeap(RtlGetProcessHeap(),
323 if (WorkItem
.TokenHandle
!= NULL
)
325 Status
= NtSetInformationThread(NtCurrentThread(),
326 ThreadImpersonationToken
,
327 &WorkItem
.TokenHandle
,
330 NtClose(WorkItem
.TokenHandle
);
332 if (NT_SUCCESS(Status
))
340 DPRINT("RtlpExecuteIoWorkItem: Function: 0x%p Context: 0x%p ImpersonationToken: 0x%p\n", WorkItem
.Function
, WorkItem
.Context
, WorkItem
.TokenHandle
);
342 /* Execute the function */
343 WorkItem
.Function(WorkItem
.Context
);
345 _SEH2_EXCEPT(EXCEPTION_EXECUTE_HANDLER
)
347 DPRINT1("Exception 0x%x while executing IO work item 0x%p\n", _SEH2_GetExceptionCode(), WorkItem
.Function
);
353 WorkItem
.TokenHandle
= NULL
;
354 Status
= NtSetInformationThread(NtCurrentThread(),
355 ThreadImpersonationToken
,
356 &WorkItem
.TokenHandle
,
358 if (!NT_SUCCESS(Status
))
360 DPRINT1("Failed to revert worker thread to self!!! Status: 0x%x\n", Status
);
364 /* remove the long function flag */
365 if (WorkItem
.Flags
& WT_EXECUTELONGFUNCTION
)
367 Status
= RtlEnterCriticalSection(&ThreadPoolLock
);
368 if (NT_SUCCESS(Status
))
370 IoThread
->Flags
&= ~WT_EXECUTELONGFUNCTION
;
371 RtlLeaveCriticalSection(&ThreadPoolLock
);
375 /* update the requests counter */
376 InterlockedDecrement(&ThreadPoolIOWorkerThreadsRequests
);
378 if (WorkItem
.Flags
& WT_EXECUTELONGFUNCTION
)
380 InterlockedDecrement(&ThreadPoolIOWorkerThreadsLongRequests
);
385 RtlpQueueIoWorkerThread(IN OUT PRTLP_WORKITEM WorkItem
)
387 PLIST_ENTRY CurrentEntry
;
388 PRTLP_IOWORKERTHREAD IoThread
= NULL
;
389 NTSTATUS Status
= STATUS_SUCCESS
;
391 if (WorkItem
->Flags
& WT_EXECUTEINPERSISTENTIOTHREAD
)
393 if (PersistentIoThread
!= NULL
)
395 /* We already have a persistent IO worker thread */
396 IoThread
= PersistentIoThread
;
400 /* We're not aware of any persistent IO worker thread. Search for an unused
401 worker thread that doesn't have a long function queued */
402 CurrentEntry
= ThreadPoolIOWorkerThreadsList
.Flink
;
403 while (CurrentEntry
!= &ThreadPoolIOWorkerThreadsList
)
405 IoThread
= CONTAINING_RECORD(CurrentEntry
,
409 if (!(IoThread
->Flags
& WT_EXECUTELONGFUNCTION
))
412 CurrentEntry
= CurrentEntry
->Flink
;
415 if (CurrentEntry
!= &ThreadPoolIOWorkerThreadsList
)
417 /* Found a worker thread we can use. */
418 ASSERT(IoThread
!= NULL
);
420 IoThread
->Flags
|= WT_EXECUTEINPERSISTENTIOTHREAD
;
421 PersistentIoThread
= IoThread
;
425 DPRINT1("Failed to find a worker thread for the persistent IO thread!\n");
426 return STATUS_NO_MEMORY
;
432 /* Find a worker thread that is not currently executing a long function */
433 CurrentEntry
= ThreadPoolIOWorkerThreadsList
.Flink
;
434 while (CurrentEntry
!= &ThreadPoolIOWorkerThreadsList
)
436 IoThread
= CONTAINING_RECORD(CurrentEntry
,
440 if (!(IoThread
->Flags
& WT_EXECUTELONGFUNCTION
))
442 /* if we're trying to queue a long function then make sure we're not dealing
443 with the persistent thread */
444 if ((WorkItem
->Flags
& WT_EXECUTELONGFUNCTION
) && !(IoThread
->Flags
& WT_EXECUTEINPERSISTENTIOTHREAD
))
446 /* found a candidate */
451 CurrentEntry
= CurrentEntry
->Flink
;
454 if (CurrentEntry
== &ThreadPoolIOWorkerThreadsList
)
456 /* Couldn't find an appropriate thread, see if we can use the persistent thread (if it exists) for now */
457 if (ThreadPoolIOWorkerThreads
== 0)
459 DPRINT1("Failed to find a worker thread for the work item 0x%p!\n", WorkItem
);
460 ASSERT(IsListEmpty(&ThreadPoolIOWorkerThreadsList
));
461 return STATUS_NO_MEMORY
;
465 /* pick the first worker thread */
466 CurrentEntry
= ThreadPoolIOWorkerThreadsList
.Flink
;
467 IoThread
= CONTAINING_RECORD(CurrentEntry
,
471 /* Since this might be the persistent worker thread, don't run as a
473 WorkItem
->Flags
&= ~WT_EXECUTELONGFUNCTION
;
477 /* Move the picked thread to the end of the list. Since we're always searching
478 from the beginning, this improves distribution of work items */
479 RemoveEntryList(&IoThread
->ListEntry
);
480 InsertTailList(&ThreadPoolIOWorkerThreadsList
,
481 &IoThread
->ListEntry
);
484 ASSERT(IoThread
!= NULL
);
486 InterlockedIncrement(&ThreadPoolIOWorkerThreadsRequests
);
488 if (WorkItem
->Flags
& WT_EXECUTELONGFUNCTION
)
490 /* We're about to queue a long function, mark the thread */
491 IoThread
->Flags
|= WT_EXECUTELONGFUNCTION
;
493 InterlockedIncrement(&ThreadPoolIOWorkerThreadsLongRequests
);
496 /* It's time to queue the work item */
497 Status
= NtQueueApcThread(IoThread
->ThreadHandle
,
498 RtlpExecuteIoWorkItem
,
502 if (!NT_SUCCESS(Status
))
504 DPRINT1("Failed to queue APC for work item 0x%p\n", WorkItem
->Function
);
505 InterlockedDecrement(&ThreadPoolIOWorkerThreadsRequests
);
507 if (WorkItem
->Flags
& WT_EXECUTELONGFUNCTION
)
509 InterlockedDecrement(&ThreadPoolIOWorkerThreadsLongRequests
);
517 RtlpIsIoPending(IN HANDLE ThreadHandle OPTIONAL
)
521 BOOLEAN CreatedHandle
= FALSE
;
522 BOOLEAN IsIoPending
= TRUE
;
524 if (ThreadHandle
== NULL
)
526 Status
= NtDuplicateObject(NtCurrentProcess(),
532 DUPLICATE_SAME_ACCESS
);
533 if (!NT_SUCCESS(Status
))
538 CreatedHandle
= TRUE
;
541 Status
= NtQueryInformationThread(ThreadHandle
,
546 if (NT_SUCCESS(Status
) && IoPending
== 0)
553 NtClose(ThreadHandle
);
561 RtlpIoWorkerThreadProc(IN PVOID Parameter
)
563 volatile RTLP_IOWORKERTHREAD ThreadInfo
;
564 LARGE_INTEGER Timeout
;
566 NTSTATUS Status
= STATUS_SUCCESS
;
568 if (InterlockedIncrement(&ThreadPoolIOWorkerThreads
) > MAX_WORKERTHREADS
)
570 /* Oops, too many worker threads... */
574 /* Get a thread handle to ourselves */
575 Status
= NtDuplicateObject(NtCurrentProcess(),
578 (PHANDLE
)&ThreadInfo
.ThreadHandle
,
581 DUPLICATE_SAME_ACCESS
);
582 if (!NT_SUCCESS(Status
))
584 DPRINT1("Failed to create handle to own thread! Status: 0x%x\n", Status
);
587 InterlockedDecrement(&ThreadPoolIOWorkerThreads
);
589 /* Signal initialization completion */
590 InterlockedExchange((PLONG
)Parameter
,
593 RtlpExitThreadFunc(Status
);
597 ThreadInfo
.Flags
= 0;
599 /* Insert the thread into the list */
600 InsertHeadList((PLIST_ENTRY
)&ThreadPoolIOWorkerThreadsList
,
601 (PLIST_ENTRY
)&ThreadInfo
.ListEntry
);
603 /* Signal initialization completion */
604 InterlockedExchange((PLONG
)Parameter
,
609 Timeout
.QuadPart
= -50000000LL; /* Wait for 5 seconds by default */
614 /* Perform an alertable wait, the work items are going to be executed as APCs */
615 Status
= NtDelayExecution(TRUE
,
618 /* Loop as long as we executed an APC */
619 } while (Status
!= STATUS_SUCCESS
);
621 /* We timed out, let's see if we're allowed to terminate */
624 Status
= RtlEnterCriticalSection(&ThreadPoolLock
);
625 if (NT_SUCCESS(Status
))
627 if (ThreadInfo
.Flags
& WT_EXECUTEINPERSISTENTIOTHREAD
)
629 /* This thread is supposed to be persistent. Don't terminate! */
630 RtlLeaveCriticalSection(&ThreadPoolLock
);
632 Timeout
.QuadPart
= -0x7FFFFFFFFFFFFFFFLL
;
636 /* FIXME - figure out an effective method to determine if it's appropriate to
637 lower the number of threads. For now let's always terminate if there's
638 at least one thread and no queued items. */
639 Terminate
= (*((volatile LONG
*)&ThreadPoolIOWorkerThreads
) - *((volatile LONG
*)&ThreadPoolIOWorkerThreadsLongRequests
) >= WORKERTHREAD_CREATION_THRESHOLD
) &&
640 (*((volatile LONG
*)&ThreadPoolIOWorkerThreadsRequests
) == 0);
644 /* Prevent termination as long as IO is pending */
645 Terminate
= !RtlpIsIoPending(ThreadInfo
.ThreadHandle
);
650 /* Rundown the thread and unlink it from the list */
651 InterlockedDecrement(&ThreadPoolIOWorkerThreads
);
652 RemoveEntryList((PLIST_ENTRY
)&ThreadInfo
.ListEntry
);
655 RtlLeaveCriticalSection(&ThreadPoolLock
);
659 /* Break the infinite loop and terminate */
660 Status
= STATUS_SUCCESS
;
666 DPRINT1("Failed to acquire the thread pool lock!!! Status: 0x%x\n", Status
);
671 NtClose(ThreadInfo
.ThreadHandle
);
672 RtlpExitThreadFunc(Status
);
678 RtlpWorkerThreadProc(IN PVOID Parameter
)
680 LARGE_INTEGER Timeout
;
682 PVOID SystemArgument2
;
683 IO_STATUS_BLOCK IoStatusBlock
;
684 ULONG TimeoutCount
= 0;
685 PKNORMAL_ROUTINE ApcRoutine
;
686 NTSTATUS Status
= STATUS_SUCCESS
;
688 if (InterlockedIncrement(&ThreadPoolWorkerThreads
) > MAX_WORKERTHREADS
)
690 /* Signal initialization completion */
691 InterlockedExchange((PLONG
)Parameter
,
694 /* Oops, too many worker threads... */
695 RtlpExitThreadFunc(Status
);
699 /* Signal initialization completion */
700 InterlockedExchange((PLONG
)Parameter
,
705 Timeout
.QuadPart
= -50000000LL; /* Wait for 5 seconds by default */
707 /* Dequeue a completion message */
708 Status
= NtRemoveIoCompletion(ThreadPoolCompletionPort
,
714 if (Status
== STATUS_SUCCESS
)
720 /* Call the APC routine */
722 (PVOID
)IoStatusBlock
.Information
,
725 _SEH2_EXCEPT(EXCEPTION_EXECUTE_HANDLER
)
735 if (!NT_SUCCESS(RtlEnterCriticalSection(&ThreadPoolLock
)))
738 /* FIXME - this should be optimized, check if there are requests, etc */
740 if (Status
== STATUS_TIMEOUT
)
742 /* FIXME - we might want to optimize this */
743 if (TimeoutCount
++ > 2 &&
744 *((volatile LONG
*)&ThreadPoolWorkerThreads
) - *((volatile LONG
*)&ThreadPoolWorkerThreadsLongRequests
) >= WORKERTHREAD_CREATION_THRESHOLD
)
752 RtlLeaveCriticalSection(&ThreadPoolLock
);
756 /* Prevent termination as long as IO is pending */
757 Terminate
= !RtlpIsIoPending(NULL
);
762 InterlockedDecrement(&ThreadPoolWorkerThreads
);
763 Status
= STATUS_SUCCESS
;
769 RtlpExitThreadFunc(Status
);
779 RtlQueueWorkItem(IN WORKERCALLBACKFUNC Function
,
780 IN PVOID Context OPTIONAL
,
785 PRTLP_WORKITEM WorkItem
;
787 DPRINT("RtlQueueWorkItem(0x%p, 0x%p, 0x%x)\n", Function
, Context
, Flags
);
789 /* Initialize the thread pool if not already initialized */
790 if (!IsThreadPoolInitialized())
792 Status
= RtlpInitializeThreadPool();
794 if (!NT_SUCCESS(Status
))
798 /* Allocate a work item */
799 WorkItem
= RtlAllocateHeap(RtlGetProcessHeap(),
801 sizeof(RTLP_WORKITEM
));
802 if (WorkItem
== NULL
)
803 return STATUS_NO_MEMORY
;
805 WorkItem
->Function
= Function
;
806 WorkItem
->Context
= Context
;
807 WorkItem
->Flags
= Flags
;
809 if (Flags
& WT_TRANSFER_IMPERSONATION
)
811 Status
= RtlpGetImpersonationToken(&WorkItem
->TokenHandle
);
813 if (!NT_SUCCESS(Status
))
815 DPRINT1("Failed to get impersonation token! Status: 0x%x\n", Status
);
820 WorkItem
->TokenHandle
= NULL
;
822 Status
= RtlEnterCriticalSection(&ThreadPoolLock
);
823 if (NT_SUCCESS(Status
))
825 if (Flags
& (WT_EXECUTEINIOTHREAD
| WT_EXECUTEINUITHREAD
| WT_EXECUTEINPERSISTENTIOTHREAD
))
827 /* FIXME - We should optimize the algorithm used to determine whether to grow the thread pool! */
829 FreeWorkers
= ThreadPoolIOWorkerThreads
- ThreadPoolIOWorkerThreadsLongRequests
;
831 if (((Flags
& (WT_EXECUTEINPERSISTENTIOTHREAD
| WT_EXECUTELONGFUNCTION
)) == WT_EXECUTELONGFUNCTION
) &&
832 PersistentIoThread
!= NULL
)
834 /* We shouldn't queue a long function into the persistent IO thread */
838 /* See if it's a good idea to grow the pool */
839 if (ThreadPoolIOWorkerThreads
< MAX_WORKERTHREADS
&&
840 (FreeWorkers
<= 0 || ThreadPoolIOWorkerThreads
- ThreadPoolIOWorkerThreadsRequests
< WORKERTHREAD_CREATION_THRESHOLD
))
842 /* Grow the thread pool */
843 Status
= RtlpStartWorkerThread(RtlpIoWorkerThreadProc
);
845 if (!NT_SUCCESS(Status
) && *((volatile LONG
*)&ThreadPoolIOWorkerThreads
) != 0)
847 /* We failed to create the thread, but there's at least one there so
848 we can at least queue the request */
849 Status
= STATUS_SUCCESS
;
853 if (NT_SUCCESS(Status
))
855 /* Queue an IO worker thread */
856 Status
= RtlpQueueIoWorkerThread(WorkItem
);
861 /* FIXME - We should optimize the algorithm used to determine whether to grow the thread pool! */
863 FreeWorkers
= ThreadPoolWorkerThreads
- ThreadPoolWorkerThreadsLongRequests
;
865 /* See if it's a good idea to grow the pool */
866 if (ThreadPoolWorkerThreads
< MAX_WORKERTHREADS
&&
867 (FreeWorkers
<= 0 || ThreadPoolWorkerThreads
- ThreadPoolWorkerThreadsRequests
< WORKERTHREAD_CREATION_THRESHOLD
))
869 /* Grow the thread pool */
870 Status
= RtlpStartWorkerThread(RtlpWorkerThreadProc
);
872 if (!NT_SUCCESS(Status
) && *((volatile LONG
*)&ThreadPoolWorkerThreads
) != 0)
874 /* We failed to create the thread, but there's at least one there so
875 we can at least queue the request */
876 Status
= STATUS_SUCCESS
;
880 if (NT_SUCCESS(Status
))
882 /* Queue a normal worker thread */
883 Status
= RtlpQueueWorkerThread(WorkItem
);
887 RtlLeaveCriticalSection(&ThreadPoolLock
);
890 if (!NT_SUCCESS(Status
))
892 if (WorkItem
->TokenHandle
!= NULL
)
894 NtClose(WorkItem
->TokenHandle
);
898 RtlFreeHeap(RtlGetProcessHeap(),
911 RtlSetIoCompletionCallback(IN HANDLE FileHandle
,
912 IN PIO_APC_ROUTINE Callback
,
915 IO_STATUS_BLOCK IoStatusBlock
;
916 FILE_COMPLETION_INFORMATION FileCompletionInfo
;
919 DPRINT("RtlSetIoCompletionCallback(0x%p, 0x%p, 0x%x)\n", FileHandle
, Callback
, Flags
);
921 /* Initialize the thread pool if not already initialized */
922 if (!IsThreadPoolInitialized())
924 Status
= RtlpInitializeThreadPool();
925 if (!NT_SUCCESS(Status
))
929 FileCompletionInfo
.Port
= ThreadPoolCompletionPort
;
930 FileCompletionInfo
.Key
= (PVOID
)Callback
;
932 Status
= NtSetInformationFile(FileHandle
,
935 sizeof(FileCompletionInfo
),
936 FileCompletionInformation
);
946 RtlSetThreadPoolStartFunc(IN PRTL_START_POOL_THREAD StartPoolThread
,
947 IN PRTL_EXIT_POOL_THREAD ExitPoolThread
)
949 RtlpStartThreadFunc
= StartPoolThread
;
950 RtlpExitThreadFunc
= ExitPoolThread
;
951 return STATUS_SUCCESS
;