- Create another branch for networking fixes
[reactos.git] / lib / rtl / workitem.c
1 /*
2 * COPYRIGHT: See COPYING in the top level directory
3 * PROJECT: ReactOS system libraries
4 * PURPOSE: Work Item implementation
5 * FILE: lib/rtl/workitem.c
6 * PROGRAMMER:
7 */
8
9 /* INCLUDES *****************************************************************/
10
11 #include <rtl.h>
12
13 #define NDEBUG
14 #include <debug.h>
15
16 /* FUNCTIONS ***************************************************************/
17
18 #define MAX_WORKERTHREADS 0x100
19 #define WORKERTHREAD_CREATION_THRESHOLD 0x5
20
21 typedef struct _RTLP_IOWORKERTHREAD
22 {
23 LIST_ENTRY ListEntry;
24 HANDLE ThreadHandle;
25 ULONG Flags;
26 } RTLP_IOWORKERTHREAD, *PRTLP_IOWORKERTHREAD;
27
28 typedef struct _RTLP_WORKITEM
29 {
30 WORKERCALLBACKFUNC Function;
31 PVOID Context;
32 ULONG Flags;
33 HANDLE TokenHandle;
34 } RTLP_WORKITEM, *PRTLP_WORKITEM;
35
36 static LONG ThreadPoolInitialized = 0;
37 static RTL_CRITICAL_SECTION ThreadPoolLock;
38 static PRTLP_IOWORKERTHREAD PersistentIoThread;
39 static LIST_ENTRY ThreadPoolIOWorkerThreadsList;
40 static HANDLE ThreadPoolCompletionPort;
41 static LONG ThreadPoolWorkerThreads;
42 static LONG ThreadPoolWorkerThreadsRequests;
43 static LONG ThreadPoolWorkerThreadsLongRequests;
44 static LONG ThreadPoolIOWorkerThreads;
45 static LONG ThreadPoolIOWorkerThreadsRequests;
46 static LONG ThreadPoolIOWorkerThreadsLongRequests;
47
48 #define IsThreadPoolInitialized() ((volatile LONG)ThreadPoolInitialized == 1)
49
50 static NTSTATUS
51 RtlpInitializeThreadPool(VOID)
52 {
53 NTSTATUS Status = STATUS_SUCCESS;
54 LONG InitStatus;
55
56 do
57 {
58 InitStatus = _InterlockedCompareExchange(&ThreadPoolInitialized,
59 2,
60 0);
61 if (InitStatus == 0)
62 {
63 /* We're the first thread to initialize the thread pool */
64
65 InitializeListHead(&ThreadPoolIOWorkerThreadsList);
66
67 PersistentIoThread = NULL;
68
69 ThreadPoolWorkerThreads = 0;
70 ThreadPoolWorkerThreadsRequests = 0;
71 ThreadPoolWorkerThreadsLongRequests = 0;
72 ThreadPoolIOWorkerThreads = 0;
73 ThreadPoolIOWorkerThreadsRequests = 0;
74 ThreadPoolIOWorkerThreadsLongRequests = 0;
75
76 /* Initialize the lock */
77 Status = RtlInitializeCriticalSection(&ThreadPoolLock);
78 if (!NT_SUCCESS(Status))
79 goto Finish;
80
81 /* Create the complection port */
82 Status = NtCreateIoCompletion(&ThreadPoolCompletionPort,
83 IO_COMPLETION_ALL_ACCESS,
84 NULL,
85 0);
86 if (!NT_SUCCESS(Status))
87 {
88 RtlDeleteCriticalSection(&ThreadPoolLock);
89 goto Finish;
90 }
91
92 Finish:
93 /* Initialization done */
94 _InterlockedExchange(&ThreadPoolInitialized,
95 1);
96 break;
97 }
98 else if (InitStatus == 2)
99 {
100 LARGE_INTEGER Timeout;
101
102 /* Another thread is currently initializing the thread pool!
103 Poll after a short period of time to see if the initialization
104 was completed */
105
106 Timeout.QuadPart = -10000000LL; /* Wait for a second */
107 NtDelayExecution(FALSE,
108 &Timeout);
109 }
110 } while (InitStatus != 1);
111
112 return Status;
113 }
114
115 static NTSTATUS
116 RtlpGetImpersonationToken(OUT PHANDLE TokenHandle)
117 {
118 NTSTATUS Status;
119
120 Status = NtOpenThreadToken(NtCurrentThread(),
121 TOKEN_IMPERSONATE,
122 TRUE,
123 TokenHandle);
124 if (Status == STATUS_NO_TOKEN || Status == STATUS_CANT_OPEN_ANONYMOUS)
125 {
126 *TokenHandle = NULL;
127 Status = STATUS_SUCCESS;
128 }
129
130 return Status;
131 }
132
133 static NTSTATUS
134 RtlpStartWorkerThread(PTHREAD_START_ROUTINE StartRoutine)
135 {
136 NTSTATUS Status;
137 HANDLE ThreadHandle;
138 LARGE_INTEGER Timeout;
139 volatile LONG WorkerInitialized = 0;
140
141 Timeout.QuadPart = -10000LL; /* Wait for 100ms */
142
143 /* Start the thread */
144 Status = RtlCreateUserThread(NtCurrentProcess(),
145 NULL,
146 FALSE,
147 0,
148 0,
149 0,
150 StartRoutine,
151 (PVOID)&WorkerInitialized,
152 &ThreadHandle,
153 NULL);
154
155 if (NT_SUCCESS(Status))
156 {
157 /* Poll until the thread got a chance to initialize */
158 while (WorkerInitialized == 0)
159 {
160 NtDelayExecution(FALSE,
161 &Timeout);
162 }
163
164 NtClose(ThreadHandle);
165 }
166
167 return Status;
168 }
169
170 static VOID
171 NTAPI
172 RtlpExecuteWorkItem(IN OUT PVOID NormalContext,
173 IN OUT PVOID SystemArgument1,
174 IN OUT PVOID SystemArgument2)
175 {
176 NTSTATUS Status;
177 BOOLEAN Impersonated = FALSE;
178 RTLP_WORKITEM WorkItem = *(volatile RTLP_WORKITEM *)SystemArgument2;
179
180 RtlFreeHeap(RtlGetProcessHeap(),
181 0,
182 SystemArgument2);
183
184 if (WorkItem.TokenHandle != NULL)
185 {
186 Status = NtSetInformationThread(NtCurrentThread(),
187 ThreadImpersonationToken,
188 &WorkItem.TokenHandle,
189 sizeof(HANDLE));
190
191 NtClose(WorkItem.TokenHandle);
192
193 if (NT_SUCCESS(Status))
194 {
195 Impersonated = TRUE;
196 }
197 }
198
199 _SEH2_TRY
200 {
201 DPRINT("RtlpExecuteWorkItem: Function: 0x%p Context: 0x%p ImpersonationToken: 0x%p\n", WorkItem.Function, WorkItem.Context, WorkItem.TokenHandle);
202
203 /* Execute the function */
204 WorkItem.Function(WorkItem.Context);
205 }
206 _SEH2_EXCEPT(EXCEPTION_EXECUTE_HANDLER)
207 {
208 DPRINT1("Exception 0x%x while executing IO work item 0x%p\n", _SEH2_GetExceptionCode(), WorkItem.Function);
209 }
210 _SEH2_END;
211
212 if (Impersonated)
213 {
214 WorkItem.TokenHandle = NULL;
215 Status = NtSetInformationThread(NtCurrentThread(),
216 ThreadImpersonationToken,
217 &WorkItem.TokenHandle,
218 sizeof(HANDLE));
219 if (!NT_SUCCESS(Status))
220 {
221 DPRINT1("Failed to revert worker thread to self!!! Status: 0x%x\n", Status);
222 }
223 }
224
225 /* update the requests counter */
226 _InterlockedDecrement(&ThreadPoolWorkerThreadsRequests);
227
228 if (WorkItem.Flags & WT_EXECUTELONGFUNCTION)
229 {
230 _InterlockedDecrement(&ThreadPoolWorkerThreadsLongRequests);
231 }
232 }
233
234
235 static NTSTATUS
236 RtlpQueueWorkerThread(IN OUT PRTLP_WORKITEM WorkItem)
237 {
238 NTSTATUS Status = STATUS_SUCCESS;
239
240 _InterlockedIncrement(&ThreadPoolWorkerThreadsRequests);
241
242 if (WorkItem->Flags & WT_EXECUTELONGFUNCTION)
243 {
244 _InterlockedIncrement(&ThreadPoolWorkerThreadsLongRequests);
245 }
246
247 if (WorkItem->Flags & WT_EXECUTEINPERSISTENTTHREAD)
248 {
249 Status = RtlpInitializeTimerThread();
250
251 if (NT_SUCCESS(Status))
252 {
253 /* Queue an APC in the timer thread */
254 Status = NtQueueApcThread(TimerThreadHandle,
255 RtlpExecuteWorkItem,
256 NULL,
257 NULL,
258 WorkItem);
259 }
260 }
261 else
262 {
263 /* Queue an IO completion message */
264 Status = NtSetIoCompletion(ThreadPoolCompletionPort,
265 RtlpExecuteWorkItem,
266 WorkItem,
267 STATUS_SUCCESS,
268 0);
269 }
270
271 if (!NT_SUCCESS(Status))
272 {
273 _InterlockedDecrement(&ThreadPoolWorkerThreadsRequests);
274
275 if (WorkItem->Flags & WT_EXECUTELONGFUNCTION)
276 {
277 _InterlockedDecrement(&ThreadPoolWorkerThreadsLongRequests);
278 }
279 }
280
281 return Status;
282 }
283
284 static VOID
285 NTAPI
286 RtlpExecuteIoWorkItem(IN OUT PVOID NormalContext,
287 IN OUT PVOID SystemArgument1,
288 IN OUT PVOID SystemArgument2)
289 {
290 NTSTATUS Status;
291 BOOLEAN Impersonated = FALSE;
292 PRTLP_IOWORKERTHREAD IoThread = (PRTLP_IOWORKERTHREAD)NormalContext;
293 RTLP_WORKITEM WorkItem = *(volatile RTLP_WORKITEM *)SystemArgument2;
294
295 ASSERT(IoThread != NULL);
296
297 RtlFreeHeap(RtlGetProcessHeap(),
298 0,
299 SystemArgument2);
300
301 if (WorkItem.TokenHandle != NULL)
302 {
303 Status = NtSetInformationThread(NtCurrentThread(),
304 ThreadImpersonationToken,
305 &WorkItem.TokenHandle,
306 sizeof(HANDLE));
307
308 NtClose(WorkItem.TokenHandle);
309
310 if (NT_SUCCESS(Status))
311 {
312 Impersonated = TRUE;
313 }
314 }
315
316 _SEH2_TRY
317 {
318 DPRINT("RtlpExecuteIoWorkItem: Function: 0x%p Context: 0x%p ImpersonationToken: 0x%p\n", WorkItem.Function, WorkItem.Context, WorkItem.TokenHandle);
319
320 /* Execute the function */
321 WorkItem.Function(WorkItem.Context);
322 }
323 _SEH2_EXCEPT(EXCEPTION_EXECUTE_HANDLER)
324 {
325 DPRINT1("Exception 0x%x while executing IO work item 0x%p\n", _SEH2_GetExceptionCode(), WorkItem.Function);
326 }
327 _SEH2_END;
328
329 if (Impersonated)
330 {
331 WorkItem.TokenHandle = NULL;
332 Status = NtSetInformationThread(NtCurrentThread(),
333 ThreadImpersonationToken,
334 &WorkItem.TokenHandle,
335 sizeof(HANDLE));
336 if (!NT_SUCCESS(Status))
337 {
338 DPRINT1("Failed to revert worker thread to self!!! Status: 0x%x\n", Status);
339 }
340 }
341
342 /* remove the long function flag */
343 if (WorkItem.Flags & WT_EXECUTELONGFUNCTION)
344 {
345 Status = RtlEnterCriticalSection(&ThreadPoolLock);
346 if (NT_SUCCESS(Status))
347 {
348 IoThread->Flags &= ~WT_EXECUTELONGFUNCTION;
349 RtlLeaveCriticalSection(&ThreadPoolLock);
350 }
351 }
352
353 /* update the requests counter */
354 _InterlockedDecrement(&ThreadPoolIOWorkerThreadsRequests);
355
356 if (WorkItem.Flags & WT_EXECUTELONGFUNCTION)
357 {
358 _InterlockedDecrement(&ThreadPoolIOWorkerThreadsLongRequests);
359 }
360 }
361
362 static NTSTATUS
363 RtlpQueueIoWorkerThread(IN OUT PRTLP_WORKITEM WorkItem)
364 {
365 PLIST_ENTRY CurrentEntry;
366 PRTLP_IOWORKERTHREAD IoThread = NULL;
367 NTSTATUS Status = STATUS_SUCCESS;
368
369 if (WorkItem->Flags & WT_EXECUTEINPERSISTENTIOTHREAD)
370 {
371 if (PersistentIoThread != NULL)
372 {
373 /* We already have a persistent IO worker thread */
374 IoThread = PersistentIoThread;
375 }
376 else
377 {
378 /* We're not aware of any persistent IO worker thread. Search for a unused
379 worker thread that doesn't have a long function queued */
380 CurrentEntry = ThreadPoolIOWorkerThreadsList.Flink;
381 while (CurrentEntry != &ThreadPoolIOWorkerThreadsList)
382 {
383 IoThread = CONTAINING_RECORD(CurrentEntry,
384 RTLP_IOWORKERTHREAD,
385 ListEntry);
386
387 if (!(IoThread->Flags & WT_EXECUTELONGFUNCTION))
388 break;
389
390 CurrentEntry = CurrentEntry->Flink;
391 }
392
393 if (CurrentEntry != &ThreadPoolIOWorkerThreadsList)
394 {
395 /* Found a worker thread we can use. */
396 ASSERT(IoThread != NULL);
397
398 IoThread->Flags |= WT_EXECUTEINPERSISTENTIOTHREAD;
399 PersistentIoThread = IoThread;
400 }
401 else
402 {
403 DPRINT1("Failed to find a worker thread for the persistent IO thread!\n");
404 return STATUS_NO_MEMORY;
405 }
406 }
407 }
408 else
409 {
410 /* Find a worker thread that is not currently executing a long function */
411 CurrentEntry = ThreadPoolIOWorkerThreadsList.Flink;
412 while (CurrentEntry != &ThreadPoolIOWorkerThreadsList)
413 {
414 IoThread = CONTAINING_RECORD(CurrentEntry,
415 RTLP_IOWORKERTHREAD,
416 ListEntry);
417
418 if (!(IoThread->Flags & WT_EXECUTELONGFUNCTION))
419 {
420 /* if we're trying to queue a long function then make sure we're not dealing
421 with the persistent thread */
422 if ((WorkItem->Flags & WT_EXECUTELONGFUNCTION) && !(IoThread->Flags & WT_EXECUTEINPERSISTENTIOTHREAD))
423 {
424 /* found a candidate */
425 break;
426 }
427 }
428
429 CurrentEntry = CurrentEntry->Flink;
430 }
431
432 if (CurrentEntry == &ThreadPoolIOWorkerThreadsList)
433 {
434 /* Couldn't find an appropriate thread, see if we can use the persistent thread (if it exists) for now */
435 if (ThreadPoolIOWorkerThreads == 0)
436 {
437 DPRINT1("Failed to find a worker thread for the work item 0x%p!\n");
438 ASSERT(IsListEmpty(&ThreadPoolIOWorkerThreadsList));
439 return STATUS_NO_MEMORY;
440 }
441 else
442 {
443 /* pick the first worker thread */
444 CurrentEntry = ThreadPoolIOWorkerThreadsList.Flink;
445 IoThread = CONTAINING_RECORD(CurrentEntry,
446 RTLP_IOWORKERTHREAD,
447 ListEntry);
448
449 /* Since this might be the persistent worker thread, don't run as a
450 long function */
451 WorkItem->Flags &= ~WT_EXECUTELONGFUNCTION;
452 }
453 }
454
455 /* Move the picked thread to the end of the list. Since we're always searching
456 from the beginning, this improves distribution of work items */
457 RemoveEntryList(&IoThread->ListEntry);
458 InsertTailList(&ThreadPoolIOWorkerThreadsList,
459 &IoThread->ListEntry);
460 }
461
462 ASSERT(IoThread != NULL);
463
464 _InterlockedIncrement(&ThreadPoolIOWorkerThreadsRequests);
465
466 if (WorkItem->Flags & WT_EXECUTELONGFUNCTION)
467 {
468 /* We're about to queue a long function, mark the thread */
469 IoThread->Flags |= WT_EXECUTELONGFUNCTION;
470
471 _InterlockedIncrement(&ThreadPoolIOWorkerThreadsLongRequests);
472 }
473
474 /* It's time to queue the work item */
475 Status = NtQueueApcThread(IoThread->ThreadHandle,
476 RtlpExecuteIoWorkItem,
477 IoThread,
478 NULL,
479 WorkItem);
480 if (!NT_SUCCESS(Status))
481 {
482 DPRINT1("Failed to queue APC for work item 0x%p\n", WorkItem->Function);
483 _InterlockedDecrement(&ThreadPoolIOWorkerThreadsRequests);
484
485 if (WorkItem->Flags & WT_EXECUTELONGFUNCTION)
486 {
487 _InterlockedDecrement(&ThreadPoolIOWorkerThreadsLongRequests);
488 }
489 }
490
491 return Status;
492 }
493
494 static BOOLEAN
495 RtlpIsIoPending(IN HANDLE ThreadHandle OPTIONAL)
496 {
497 NTSTATUS Status;
498 ULONG IoPending;
499 BOOLEAN CreatedHandle = FALSE;
500 BOOLEAN IsIoPending = TRUE;
501
502 if (ThreadHandle == NULL)
503 {
504 Status = NtDuplicateObject(NtCurrentProcess(),
505 NtCurrentThread(),
506 NtCurrentProcess(),
507 &ThreadHandle,
508 0,
509 0,
510 DUPLICATE_SAME_ACCESS);
511 if (!NT_SUCCESS(Status))
512 {
513 return IsIoPending;
514 }
515
516 CreatedHandle = TRUE;
517 }
518
519 Status = NtQueryInformationThread(ThreadHandle,
520 ThreadIsIoPending,
521 &IoPending,
522 sizeof(IoPending),
523 NULL);
524 if (NT_SUCCESS(Status) && IoPending == 0)
525 {
526 IsIoPending = FALSE;
527 }
528
529 if (CreatedHandle)
530 {
531 NtClose(ThreadHandle);
532 }
533
534 return IsIoPending;
535 }
536
537 static ULONG
538 NTAPI
539 RtlpIoWorkerThreadProc(IN PVOID Parameter)
540 {
541 volatile RTLP_IOWORKERTHREAD ThreadInfo;
542 LARGE_INTEGER Timeout;
543 BOOLEAN Terminate;
544 NTSTATUS Status = STATUS_SUCCESS;
545
546 if (_InterlockedIncrement(&ThreadPoolIOWorkerThreads) > MAX_WORKERTHREADS)
547 {
548 /* Oops, too many worker threads... */
549 goto InitFailed;
550 }
551
552 /* Get a thread handle to ourselves */
553 Status = NtDuplicateObject(NtCurrentProcess(),
554 NtCurrentThread(),
555 NtCurrentProcess(),
556 (PHANDLE)&ThreadInfo.ThreadHandle,
557 0,
558 0,
559 DUPLICATE_SAME_ACCESS);
560 if (!NT_SUCCESS(Status))
561 {
562 DPRINT1("Failed to create handle to own thread! Status: 0x%x\n", Status);
563
564 InitFailed:
565 _InterlockedDecrement(&ThreadPoolIOWorkerThreads);
566
567 /* Signal initialization completion */
568 _InterlockedExchange((PLONG)Parameter,
569 1);
570
571 RtlExitUserThread(Status);
572 return 0;
573 }
574
575 ThreadInfo.Flags = 0;
576
577 /* Insert the thread into the list */
578 InsertHeadList((PLIST_ENTRY)&ThreadPoolIOWorkerThreadsList,
579 (PLIST_ENTRY)&ThreadInfo.ListEntry);
580
581 /* Signal initialization completion */
582 _InterlockedExchange((PLONG)Parameter,
583 1);
584
585 for (;;)
586 {
587 Timeout.QuadPart = -50000000LL; /* Wait for 5 seconds by default */
588
589 Wait:
590 do
591 {
592 /* Perform an alertable wait, the work items are going to be executed as APCs */
593 Status = NtDelayExecution(TRUE,
594 &Timeout);
595
596 /* Loop as long as we executed an APC */
597 } while (Status != STATUS_SUCCESS);
598
599 /* We timed out, let's see if we're allowed to terminate */
600 Terminate = FALSE;
601
602 Status = RtlEnterCriticalSection(&ThreadPoolLock);
603 if (NT_SUCCESS(Status))
604 {
605 if (ThreadInfo.Flags & WT_EXECUTEINPERSISTENTIOTHREAD)
606 {
607 /* This thread is supposed to be persistent. Don't terminate! */
608 RtlLeaveCriticalSection(&ThreadPoolLock);
609
610 Timeout.QuadPart = -0x7FFFFFFFFFFFFFFFLL;
611 goto Wait;
612 }
613
614 /* FIXME - figure out an effective method to determine if it's appropriate to
615 lower the number of threads. For now let's always terminate if there's
616 at least one thread and no queued items. */
617 Terminate = ((volatile LONG)ThreadPoolIOWorkerThreads - (volatile LONG)ThreadPoolIOWorkerThreadsLongRequests >= WORKERTHREAD_CREATION_THRESHOLD) &&
618 ((volatile LONG)ThreadPoolIOWorkerThreadsRequests == 0);
619
620 if (Terminate)
621 {
622 /* Prevent termination as long as IO is pending */
623 Terminate = !RtlpIsIoPending(ThreadInfo.ThreadHandle);
624 }
625
626 if (Terminate)
627 {
628 /* Rundown the thread and unlink it from the list */
629 _InterlockedDecrement(&ThreadPoolIOWorkerThreads);
630 RemoveEntryList((PLIST_ENTRY)&ThreadInfo.ListEntry);
631 }
632
633 RtlLeaveCriticalSection(&ThreadPoolLock);
634
635 if (Terminate)
636 {
637 /* Break the infinite loop and terminate */
638 Status = STATUS_SUCCESS;
639 break;
640 }
641 }
642 else
643 {
644 DPRINT1("Failed to acquire the thread pool lock!!! Status: 0x%x\n", Status);
645 break;
646 }
647 }
648
649 NtClose(ThreadInfo.ThreadHandle);
650 RtlExitUserThread(Status);
651 return 0;
652 }
653
654 static ULONG
655 NTAPI
656 RtlpWorkerThreadProc(IN PVOID Parameter)
657 {
658 LARGE_INTEGER Timeout;
659 BOOLEAN Terminate;
660 PVOID SystemArgument2;
661 IO_STATUS_BLOCK IoStatusBlock;
662 ULONG TimeoutCount = 0;
663 PKNORMAL_ROUTINE ApcRoutine;
664 NTSTATUS Status = STATUS_SUCCESS;
665
666 if (_InterlockedIncrement(&ThreadPoolWorkerThreads) > MAX_WORKERTHREADS)
667 {
668 /* Signal initialization completion */
669 _InterlockedExchange((PLONG)Parameter,
670 1);
671
672 /* Oops, too many worker threads... */
673 RtlExitUserThread(Status);
674 return 0;
675 }
676
677 /* Signal initialization completion */
678 _InterlockedExchange((PLONG)Parameter,
679 1);
680
681 for (;;)
682 {
683 Timeout.QuadPart = -50000000LL; /* Wait for 5 seconds by default */
684
685 /* Dequeue a completion message */
686 Status = NtRemoveIoCompletion(ThreadPoolCompletionPort,
687 (PVOID*)&ApcRoutine,
688 &SystemArgument2,
689 &IoStatusBlock,
690 &Timeout);
691
692 if (Status == STATUS_SUCCESS)
693 {
694 TimeoutCount = 0;
695
696 _SEH2_TRY
697 {
698 /* Call the APC routine */
699 ApcRoutine(NULL,
700 (PVOID)IoStatusBlock.Information,
701 SystemArgument2);
702 }
703 _SEH2_EXCEPT(EXCEPTION_EXECUTE_HANDLER)
704 {
705 }
706 _SEH2_END;
707 }
708 else
709 {
710 Terminate = FALSE;
711
712 if (!NT_SUCCESS(RtlEnterCriticalSection(&ThreadPoolLock)))
713 continue;
714
715 /* FIXME - this should be optimized, check if there's requests, etc */
716
717 if (Status == STATUS_TIMEOUT)
718 {
719 /* FIXME - we might want to optimize this */
720 if (TimeoutCount++ > 2 &&
721 (volatile LONG)ThreadPoolWorkerThreads - (volatile LONG)ThreadPoolWorkerThreadsLongRequests >= WORKERTHREAD_CREATION_THRESHOLD)
722 {
723 Terminate = TRUE;
724 }
725 }
726 else
727 Terminate = TRUE;
728
729 RtlLeaveCriticalSection(&ThreadPoolLock);
730
731 if (Terminate)
732 {
733 /* Prevent termination as long as IO is pending */
734 Terminate = !RtlpIsIoPending(NULL);
735 }
736
737 if (Terminate)
738 {
739 _InterlockedDecrement(&ThreadPoolWorkerThreads);
740 Status = STATUS_SUCCESS;
741 break;
742 }
743 }
744 }
745
746 RtlExitUserThread(Status);
747 return 0;
748
749 }
750
751 /*
752 * @implemented
753 */
754 NTSTATUS
755 NTAPI
756 RtlQueueWorkItem(IN WORKERCALLBACKFUNC Function,
757 IN PVOID Context OPTIONAL,
758 IN ULONG Flags)
759 {
760 LONG FreeWorkers;
761 NTSTATUS Status;
762 PRTLP_WORKITEM WorkItem;
763
764 DPRINT("RtlQueueWorkItem(0x%p, 0x%p, 0x%x)\n", Function, Context, Flags);
765
766 /* Initialize the thread pool if not already initialized */
767 if (!IsThreadPoolInitialized())
768 {
769 Status = RtlpInitializeThreadPool();
770
771 if (!NT_SUCCESS(Status))
772 return Status;
773 }
774
775 /* Allocate a work item */
776 WorkItem = RtlAllocateHeap(RtlGetProcessHeap(),
777 0,
778 sizeof(RTLP_WORKITEM));
779 if (WorkItem == NULL)
780 return STATUS_NO_MEMORY;
781
782 WorkItem->Function = Function;
783 WorkItem->Context = Context;
784 WorkItem->Flags = Flags;
785
786 if (Flags & WT_TRANSFER_IMPERSONATION)
787 {
788 Status = RtlpGetImpersonationToken(&WorkItem->TokenHandle);
789
790 if (!NT_SUCCESS(Status))
791 {
792 DPRINT1("Failed to get impersonation token! Status: 0x%x\n", Status);
793 goto Cleanup;
794 }
795 }
796 else
797 WorkItem->TokenHandle = NULL;
798
799 Status = RtlEnterCriticalSection(&ThreadPoolLock);
800 if (NT_SUCCESS(Status))
801 {
802 if (Flags & (WT_EXECUTEINIOTHREAD | WT_EXECUTEINUITHREAD | WT_EXECUTEINPERSISTENTIOTHREAD))
803 {
804 /* FIXME - We should optimize the algorithm used to determine whether to grow the thread pool! */
805
806 FreeWorkers = ThreadPoolIOWorkerThreads - ThreadPoolIOWorkerThreadsLongRequests;
807
808 if (((Flags & (WT_EXECUTEINPERSISTENTIOTHREAD | WT_EXECUTELONGFUNCTION)) == WT_EXECUTELONGFUNCTION) &&
809 PersistentIoThread != NULL)
810 {
811 /* We shouldn't queue a long function into the persistent IO thread */
812 FreeWorkers--;
813 }
814
815 /* See if it's a good idea to grow the pool */
816 if (ThreadPoolIOWorkerThreads < MAX_WORKERTHREADS &&
817 (FreeWorkers <= 0 || ThreadPoolIOWorkerThreads - ThreadPoolIOWorkerThreadsRequests < WORKERTHREAD_CREATION_THRESHOLD))
818 {
819 /* Grow the thread pool */
820 Status = RtlpStartWorkerThread(RtlpIoWorkerThreadProc);
821
822 if (!NT_SUCCESS(Status) && (volatile LONG)ThreadPoolIOWorkerThreads != 0)
823 {
824 /* We failed to create the thread, but there's at least one there so
825 we can at least queue the request */
826 Status = STATUS_SUCCESS;
827 }
828 }
829
830 if (NT_SUCCESS(Status))
831 {
832 /* Queue a IO worker thread */
833 Status = RtlpQueueIoWorkerThread(WorkItem);
834 }
835 }
836 else
837 {
838 /* FIXME - We should optimize the algorithm used to determine whether to grow the thread pool! */
839
840 FreeWorkers = ThreadPoolWorkerThreads - ThreadPoolWorkerThreadsLongRequests;
841
842 /* See if it's a good idea to grow the pool */
843 if (ThreadPoolWorkerThreads < MAX_WORKERTHREADS &&
844 (FreeWorkers <= 0 || ThreadPoolWorkerThreads - ThreadPoolWorkerThreadsRequests < WORKERTHREAD_CREATION_THRESHOLD))
845 {
846 /* Grow the thread pool */
847 Status = RtlpStartWorkerThread(RtlpWorkerThreadProc);
848
849 if (!NT_SUCCESS(Status) && (volatile LONG)ThreadPoolWorkerThreads != 0)
850 {
851 /* We failed to create the thread, but there's at least one there so
852 we can at least queue the request */
853 Status = STATUS_SUCCESS;
854 }
855 }
856
857 if (NT_SUCCESS(Status))
858 {
859 /* Queue a normal worker thread */
860 Status = RtlpQueueWorkerThread(WorkItem);
861 }
862 }
863
864 RtlLeaveCriticalSection(&ThreadPoolLock);
865 }
866
867 if (!NT_SUCCESS(Status))
868 {
869 if (WorkItem->TokenHandle != NULL)
870 {
871 NtClose(WorkItem->TokenHandle);
872 }
873
874 Cleanup:
875 RtlFreeHeap(RtlGetProcessHeap(),
876 0,
877 WorkItem);
878 }
879
880 return Status;
881 }
882
883 /*
884 * @unimplemented
885 */
886 NTSTATUS
887 NTAPI
888 RtlSetIoCompletionCallback(IN HANDLE FileHandle,
889 IN PIO_APC_ROUTINE Callback,
890 IN ULONG Flags)
891 {
892 UNIMPLEMENTED;
893 return STATUS_NOT_IMPLEMENTED;
894 }