Synchronize with trunk revision 59636 (just before Alex's CreateProcess revamp).
[reactos.git] / drivers / ksfilter / ks / worker.c
1 /*
2 * COPYRIGHT: See COPYING in the top level directory
3 * PROJECT: ReactOS Kernel Streaming
4 * FILE: drivers/ksfilter/ks/worker.c
5 * PURPOSE: KS Allocator functions
6 * PROGRAMMER: Johannes Anderwald
7 */
8
9
10 #include "priv.h"
11
12 /* ===============================================================
13 Worker Management Functions
14 */
15
16 typedef struct
17 {
18 WORK_QUEUE_ITEM WorkItem;
19
20 KEVENT Event;
21 KSPIN_LOCK Lock;
22 WORK_QUEUE_TYPE Type;
23 LONG Counter;
24 LONG QueuedWorkItemCount;
25 LIST_ENTRY QueuedWorkItems;
26
27 PWORK_QUEUE_ITEM CountedWorkItem;
28 }KSIWORKER, *PKSIWORKER;
29
30 VOID
31 NTAPI
32 WorkItemRoutine(
33 IN PVOID Context)
34 {
35 PKSIWORKER KsWorker;
36 KIRQL OldLevel;
37 PWORK_QUEUE_ITEM WorkItem;
38 PLIST_ENTRY Entry;
39
40
41 /* get ks worker implementation */
42 KsWorker = (PKSIWORKER)Context;
43
44 /* acquire back the lock */
45 KeAcquireSpinLock(&KsWorker->Lock, &OldLevel);
46
47 do
48 {
49 /* sanity check */
50 ASSERT(!IsListEmpty(&KsWorker->QueuedWorkItems));
51
52 /* remove first entry */
53 Entry = RemoveHeadList(&KsWorker->QueuedWorkItems);
54 /* get offset to work item */
55 WorkItem = (PWORK_QUEUE_ITEM)CONTAINING_RECORD(Entry, WORK_QUEUE_ITEM, List);
56
57 /* release lock as the callback might call one KsWorker functions */
58 KeReleaseSpinLock(&KsWorker->Lock, OldLevel);
59
60 /* now dispatch the work */
61 WorkItem->WorkerRoutine(WorkItem->Parameter);
62
63 /* acquire back the lock */
64 KeAcquireSpinLock(&KsWorker->Lock, &OldLevel);
65
66 /* decrement queued work item count */
67 InterlockedDecrement(&KsWorker->QueuedWorkItemCount);
68
69 }while(KsWorker->QueuedWorkItemCount);
70
71 /* release the lock */
72 KeReleaseSpinLock(&KsWorker->Lock, OldLevel);
73
74 /* signal completion event */
75 KeSetEvent(&KsWorker->Event, IO_NO_INCREMENT, FALSE);
76
77 }
78
79
80 /*
81 @implemented
82 */
83 KSDDKAPI
84 NTSTATUS
85 NTAPI
86 KsRegisterWorker(
87 IN WORK_QUEUE_TYPE WorkQueueType,
88 OUT PKSWORKER* Worker)
89 {
90 PKSIWORKER KsWorker;
91
92
93 if (WorkQueueType != CriticalWorkQueue &&
94 WorkQueueType != DelayedWorkQueue &&
95 WorkQueueType != HyperCriticalWorkQueue)
96 {
97 return STATUS_INVALID_PARAMETER;
98 }
99
100 /* allocate worker context */
101 KsWorker = AllocateItem(NonPagedPool, sizeof(KSIWORKER));
102 if (!KsWorker)
103 return STATUS_INSUFFICIENT_RESOURCES;
104
105 /* initialze the work ctx */
106 ExInitializeWorkItem(&KsWorker->WorkItem, WorkItemRoutine, (PVOID)KsWorker);
107 /* setup type */
108 KsWorker->Type = WorkQueueType;
109 /* Initialize work item queue */
110 InitializeListHead(&KsWorker->QueuedWorkItems);
111 /* initialize work item lock */
112 KeInitializeSpinLock(&KsWorker->Lock);
113 /* initialize event */
114 KeInitializeEvent(&KsWorker->Event, NotificationEvent, FALSE);
115
116 *Worker = KsWorker;
117 return STATUS_SUCCESS;
118 }
119
120 /*
121 @implemented
122 */
123 KSDDKAPI
124 VOID
125 NTAPI
126 KsUnregisterWorker(
127 IN PKSWORKER Worker)
128 {
129 PKSIWORKER KsWorker;
130 KIRQL OldIrql;
131
132 if (!Worker)
133 return;
134
135 /* get ks worker implementation */
136 KsWorker = (PKSIWORKER)Worker;
137 /* acquire spinlock */
138 KeAcquireSpinLock(&KsWorker->Lock, &OldIrql);
139 /* fake status running to avoid work items to be queued by the counted worker */
140 KsWorker->Counter = 1;
141 /* is there currently a work item active */
142 if (KsWorker->QueuedWorkItemCount)
143 {
144 /* release the lock */
145 KeReleaseSpinLock(&KsWorker->Lock, OldIrql);
146 /* wait for the worker routine to finish */
147 KeWaitForSingleObject(&KsWorker->Event, Executive, KernelMode, FALSE, NULL);
148 }
149 else
150 {
151 /* no work item active, just release the lock */
152 KeReleaseSpinLock(&KsWorker->Lock, OldIrql);
153 }
154 /* free worker context */
155 FreeItem(KsWorker);
156 }
157
158 /*
159 @implemented
160 */
161 KSDDKAPI
162 NTSTATUS
163 NTAPI
164 KsRegisterCountedWorker(
165 IN WORK_QUEUE_TYPE WorkQueueType,
166 IN PWORK_QUEUE_ITEM CountedWorkItem,
167 OUT PKSWORKER* Worker)
168 {
169 NTSTATUS Status;
170 PKSIWORKER KsWorker;
171
172 /* check for counted work item parameter */
173 if (!CountedWorkItem)
174 return STATUS_INVALID_PARAMETER_2;
175
176 /* create the work ctx */
177 Status = KsRegisterWorker(WorkQueueType, Worker);
178 /* check for success */
179 if (NT_SUCCESS(Status))
180 {
181 /* get ks worker implementation */
182 KsWorker = *(PKSIWORKER*)Worker;
183 /* store counted work item */
184 KsWorker->CountedWorkItem = CountedWorkItem;
185 }
186
187 return Status;
188 }
189
190 /*
191 @implemented
192 */
193 KSDDKAPI
194 ULONG
195 NTAPI
196 KsDecrementCountedWorker(
197 IN PKSWORKER Worker)
198 {
199 PKSIWORKER KsWorker;
200 LONG Counter;
201
202 /* did the caller pass a work ctx */
203 if (!Worker)
204 return STATUS_INVALID_PARAMETER;
205
206 /* get ks worker implementation */
207 KsWorker = (PKSIWORKER)Worker;
208 /* decrement counter */
209 Counter = InterlockedDecrement(&KsWorker->Counter);
210 /* return result */
211 return Counter;
212 }
213
214 /*
215 @implemented
216 */
217 KSDDKAPI
218 ULONG
219 NTAPI
220 KsIncrementCountedWorker(
221 IN PKSWORKER Worker)
222 {
223 PKSIWORKER KsWorker;
224 LONG Counter;
225
226 /* did the caller pass a work ctx */
227 if (!Worker)
228 return STATUS_INVALID_PARAMETER;
229
230 /* get ks worker implementation */
231 KsWorker = (PKSIWORKER)Worker;
232 /* increment counter */
233 Counter = InterlockedIncrement(&KsWorker->Counter);
234 if (Counter == 1)
235 {
236 /* this is the first work item in list, so queue a real work item */
237 KsQueueWorkItem(Worker, KsWorker->CountedWorkItem);
238 }
239
240 /* return current counter */
241 return Counter;
242 }
243
244 /*
245 @implemented
246 */
247 KSDDKAPI
248 NTSTATUS
249 NTAPI
250 KsQueueWorkItem(
251 IN PKSWORKER Worker,
252 IN PWORK_QUEUE_ITEM WorkItem)
253 {
254 PKSIWORKER KsWorker;
255 KIRQL OldIrql;
256
257 /* check for all parameters */
258 if (!Worker || !WorkItem)
259 return STATUS_INVALID_PARAMETER;
260
261 /* get ks worker implementation */
262 KsWorker = (PKSIWORKER)Worker;
263 /* lock the work queue */
264 KeAcquireSpinLock(&KsWorker->Lock, &OldIrql);
265 /* insert work item to list */
266 InsertTailList(&KsWorker->QueuedWorkItems, &WorkItem->List);
267 /* increment active count */
268 InterlockedIncrement(&KsWorker->QueuedWorkItemCount);
269 /* is this the first work item */
270 if (KsWorker->QueuedWorkItemCount == 1)
271 {
272 /* clear event */
273 KeClearEvent(&KsWorker->Event);
274 /* it is, queue it */
275 ExQueueWorkItem(&KsWorker->WorkItem, KsWorker->Type);
276 }
277 /* release lock */
278 KeReleaseSpinLock(&KsWorker->Lock, OldIrql);
279
280 return STATUS_SUCCESS;
281 }