[KS]
[reactos.git] / reactos / drivers / ksfilter / ks / worker.c
1 /*
2 * COPYRIGHT: See COPYING in the top level directory
3 * PROJECT: ReactOS Kernel Streaming
4 * FILE: drivers/ksfilter/ks/worker.c
5 * PURPOSE: KS Allocator functions
6 * PROGRAMMER: Johannes Anderwald
7 */
8
9 #include "precomp.h"
10
11 #define NDEBUG
12 #include <debug.h>
13
14 /* ===============================================================
15 Worker Management Functions
16 */
17
18 typedef struct
19 {
20 WORK_QUEUE_ITEM WorkItem;
21
22 KEVENT Event;
23 KSPIN_LOCK Lock;
24 WORK_QUEUE_TYPE Type;
25 LONG Counter;
26 LONG QueuedWorkItemCount;
27 LIST_ENTRY QueuedWorkItems;
28
29 PWORK_QUEUE_ITEM CountedWorkItem;
30 }KSIWORKER, *PKSIWORKER;
31
32 VOID
33 NTAPI
34 WorkItemRoutine(
35 IN PVOID Context)
36 {
37 PKSIWORKER KsWorker;
38 KIRQL OldLevel;
39 PWORK_QUEUE_ITEM WorkItem;
40 PLIST_ENTRY Entry;
41
42
43 /* get ks worker implementation */
44 KsWorker = (PKSIWORKER)Context;
45
46 /* acquire back the lock */
47 KeAcquireSpinLock(&KsWorker->Lock, &OldLevel);
48
49 do
50 {
51 /* sanity check */
52 ASSERT(!IsListEmpty(&KsWorker->QueuedWorkItems));
53
54 /* remove first entry */
55 Entry = RemoveHeadList(&KsWorker->QueuedWorkItems);
56 /* get offset to work item */
57 WorkItem = (PWORK_QUEUE_ITEM)CONTAINING_RECORD(Entry, WORK_QUEUE_ITEM, List);
58
59 /* release lock as the callback might call one KsWorker functions */
60 KeReleaseSpinLock(&KsWorker->Lock, OldLevel);
61
62 /* now dispatch the work */
63 WorkItem->WorkerRoutine(WorkItem->Parameter);
64
65 /* acquire back the lock */
66 KeAcquireSpinLock(&KsWorker->Lock, &OldLevel);
67
68 /* decrement queued work item count */
69 InterlockedDecrement(&KsWorker->QueuedWorkItemCount);
70
71 }while(KsWorker->QueuedWorkItemCount);
72
73 /* release the lock */
74 KeReleaseSpinLock(&KsWorker->Lock, OldLevel);
75
76 /* signal completion event */
77 KeSetEvent(&KsWorker->Event, IO_NO_INCREMENT, FALSE);
78
79 }
80
81
82 /*
83 @implemented
84 */
85 KSDDKAPI
86 NTSTATUS
87 NTAPI
88 KsRegisterWorker(
89 IN WORK_QUEUE_TYPE WorkQueueType,
90 OUT PKSWORKER* Worker)
91 {
92 PKSIWORKER KsWorker;
93
94
95 if (WorkQueueType != CriticalWorkQueue &&
96 WorkQueueType != DelayedWorkQueue &&
97 WorkQueueType != HyperCriticalWorkQueue)
98 {
99 return STATUS_INVALID_PARAMETER;
100 }
101
102 /* allocate worker context */
103 KsWorker = AllocateItem(NonPagedPool, sizeof(KSIWORKER));
104 if (!KsWorker)
105 return STATUS_INSUFFICIENT_RESOURCES;
106
107 /* initialze the work ctx */
108 ExInitializeWorkItem(&KsWorker->WorkItem, WorkItemRoutine, (PVOID)KsWorker);
109 /* setup type */
110 KsWorker->Type = WorkQueueType;
111 /* Initialize work item queue */
112 InitializeListHead(&KsWorker->QueuedWorkItems);
113 /* initialize work item lock */
114 KeInitializeSpinLock(&KsWorker->Lock);
115 /* initialize event */
116 KeInitializeEvent(&KsWorker->Event, NotificationEvent, FALSE);
117
118 *Worker = KsWorker;
119 return STATUS_SUCCESS;
120 }
121
122 /*
123 @implemented
124 */
125 KSDDKAPI
126 VOID
127 NTAPI
128 KsUnregisterWorker(
129 IN PKSWORKER Worker)
130 {
131 PKSIWORKER KsWorker;
132 KIRQL OldIrql;
133
134 if (!Worker)
135 return;
136
137 /* get ks worker implementation */
138 KsWorker = (PKSIWORKER)Worker;
139 /* acquire spinlock */
140 KeAcquireSpinLock(&KsWorker->Lock, &OldIrql);
141 /* fake status running to avoid work items to be queued by the counted worker */
142 KsWorker->Counter = 1;
143 /* is there currently a work item active */
144 if (KsWorker->QueuedWorkItemCount)
145 {
146 /* release the lock */
147 KeReleaseSpinLock(&KsWorker->Lock, OldIrql);
148 /* wait for the worker routine to finish */
149 KeWaitForSingleObject(&KsWorker->Event, Executive, KernelMode, FALSE, NULL);
150 }
151 else
152 {
153 /* no work item active, just release the lock */
154 KeReleaseSpinLock(&KsWorker->Lock, OldIrql);
155 }
156 /* free worker context */
157 FreeItem(KsWorker);
158 }
159
160 /*
161 @implemented
162 */
163 KSDDKAPI
164 NTSTATUS
165 NTAPI
166 KsRegisterCountedWorker(
167 IN WORK_QUEUE_TYPE WorkQueueType,
168 IN PWORK_QUEUE_ITEM CountedWorkItem,
169 OUT PKSWORKER* Worker)
170 {
171 NTSTATUS Status;
172 PKSIWORKER KsWorker;
173
174 /* check for counted work item parameter */
175 if (!CountedWorkItem)
176 return STATUS_INVALID_PARAMETER_2;
177
178 /* create the work ctx */
179 Status = KsRegisterWorker(WorkQueueType, Worker);
180 /* check for success */
181 if (NT_SUCCESS(Status))
182 {
183 /* get ks worker implementation */
184 KsWorker = *(PKSIWORKER*)Worker;
185 /* store counted work item */
186 KsWorker->CountedWorkItem = CountedWorkItem;
187 }
188
189 return Status;
190 }
191
192 /*
193 @implemented
194 */
195 KSDDKAPI
196 ULONG
197 NTAPI
198 KsDecrementCountedWorker(
199 IN PKSWORKER Worker)
200 {
201 PKSIWORKER KsWorker;
202 LONG Counter;
203
204 /* did the caller pass a work ctx */
205 if (!Worker)
206 return STATUS_INVALID_PARAMETER;
207
208 /* get ks worker implementation */
209 KsWorker = (PKSIWORKER)Worker;
210 /* decrement counter */
211 Counter = InterlockedDecrement(&KsWorker->Counter);
212 /* return result */
213 return Counter;
214 }
215
216 /*
217 @implemented
218 */
219 KSDDKAPI
220 ULONG
221 NTAPI
222 KsIncrementCountedWorker(
223 IN PKSWORKER Worker)
224 {
225 PKSIWORKER KsWorker;
226 LONG Counter;
227
228 /* did the caller pass a work ctx */
229 if (!Worker)
230 return STATUS_INVALID_PARAMETER;
231
232 /* get ks worker implementation */
233 KsWorker = (PKSIWORKER)Worker;
234 /* increment counter */
235 Counter = InterlockedIncrement(&KsWorker->Counter);
236 if (Counter == 1)
237 {
238 /* this is the first work item in list, so queue a real work item */
239 KsQueueWorkItem(Worker, KsWorker->CountedWorkItem);
240 }
241
242 /* return current counter */
243 return Counter;
244 }
245
246 /*
247 @implemented
248 */
249 KSDDKAPI
250 NTSTATUS
251 NTAPI
252 KsQueueWorkItem(
253 IN PKSWORKER Worker,
254 IN PWORK_QUEUE_ITEM WorkItem)
255 {
256 PKSIWORKER KsWorker;
257 KIRQL OldIrql;
258
259 /* check for all parameters */
260 if (!Worker || !WorkItem)
261 return STATUS_INVALID_PARAMETER;
262
263 /* get ks worker implementation */
264 KsWorker = (PKSIWORKER)Worker;
265 /* lock the work queue */
266 KeAcquireSpinLock(&KsWorker->Lock, &OldIrql);
267 /* insert work item to list */
268 InsertTailList(&KsWorker->QueuedWorkItems, &WorkItem->List);
269 /* increment active count */
270 InterlockedIncrement(&KsWorker->QueuedWorkItemCount);
271 /* is this the first work item */
272 if (KsWorker->QueuedWorkItemCount == 1)
273 {
274 /* clear event */
275 KeClearEvent(&KsWorker->Event);
276 /* it is, queue it */
277 ExQueueWorkItem(&KsWorker->WorkItem, KsWorker->Type);
278 }
279 /* release lock */
280 KeReleaseSpinLock(&KsWorker->Lock, OldIrql);
281
282 return STATUS_SUCCESS;
283 }