cbcd96407661087c866c48fb618a0677d8cc3125
[reactos.git] / reactos / lib / 3rdparty / strmbase / outputqueue.c
1 /*
2 * Generic Implementation of COutputQueue
3 *
4 * Copyright 2011 Aric Stewart, CodeWeavers
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2.1 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
19 */
20
21 #define COBJMACROS
22
23 #include "dshow.h"
24 #include "wine/debug.h"
25 #include "wine/unicode.h"
26 #include "wine/list.h"
27 #include "wine/strmbase.h"
28 #include "uuids.h"
29 #include "vfwmsgs.h"
30 #include <assert.h>
31
32 WINE_DEFAULT_DEBUG_CHANNEL(strmbase);
33
34 enum {SAMPLE_PACKET, EOS_PACKET};
35
36 typedef struct tagQueuedEvent {
37 int type;
38 struct list entry;
39
40 IMediaSample *pSample;
41 } QueuedEvent;
42
43 static DWORD WINAPI OutputQueue_InitialThreadProc(LPVOID data)
44 {
45 OutputQueue *This = (OutputQueue *)data;
46 return This->pFuncsTable->pfnThreadProc(This);
47 }
48
49 static void OutputQueue_FreeSamples(OutputQueue *pOutputQueue)
50 {
51 struct list *cursor, *cursor2;
52 LIST_FOR_EACH_SAFE(cursor, cursor2, pOutputQueue->SampleList)
53 {
54 QueuedEvent *qev = LIST_ENTRY(cursor, QueuedEvent, entry);
55 list_remove(cursor);
56 HeapFree(GetProcessHeap(),0,qev);
57 }
58 }
59
60 HRESULT WINAPI OutputQueue_Construct(
61 BaseOutputPin *pInputPin,
62 BOOL bAuto,
63 BOOL bQueue,
64 LONG lBatchSize,
65 BOOL bBatchExact,
66 DWORD dwPriority,
67 const OutputQueueFuncTable* pFuncsTable,
68 OutputQueue **ppOutputQueue )
69
70 {
71 HRESULT hr = S_OK;
72 BOOL threaded = FALSE;
73 DWORD tid;
74
75 OutputQueue *This;
76
77 if (!pInputPin || !pFuncsTable || !ppOutputQueue)
78 return E_INVALIDARG;
79
80 *ppOutputQueue = HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sizeof(OutputQueue));
81 if (!*ppOutputQueue)
82 return E_OUTOFMEMORY;
83
84 This = *ppOutputQueue;
85 This->pFuncsTable = pFuncsTable;
86 This->lBatchSize = lBatchSize;
87 This->bBatchExact = bBatchExact;
88 InitializeCriticalSection(&This->csQueue);
89 This->csQueue.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": OutputQueue.csQueue");
90 This->SampleList = HeapAlloc(GetProcessHeap(),0,sizeof(struct list));
91 if (!This->SampleList)
92 {
93 OutputQueue_Destroy(This);
94 *ppOutputQueue = NULL;
95 return E_OUTOFMEMORY;
96 }
97 list_init(This->SampleList);
98
99 This->pInputPin = pInputPin;
100 IPin_AddRef(&pInputPin->pin.IPin_iface);
101
102 EnterCriticalSection(&This->csQueue);
103 if (bAuto && pInputPin->pMemInputPin)
104 threaded = IMemInputPin_ReceiveCanBlock(pInputPin->pMemInputPin);
105 else
106 threaded = bQueue;
107
108 if (threaded)
109 {
110 This->hThread = CreateThread(NULL, 0, OutputQueue_InitialThreadProc, This, 0, &tid);
111 if (This->hThread)
112 {
113 SetThreadPriority(This->hThread, dwPriority);
114 This->hProcessQueue = CreateEventW(NULL, 0, 0, NULL);
115 }
116 }
117 LeaveCriticalSection(&This->csQueue);
118
119 return hr;
120 }
121
122 HRESULT WINAPI OutputQueue_Destroy(OutputQueue *pOutputQueue)
123 {
124 EnterCriticalSection(&pOutputQueue->csQueue);
125 OutputQueue_FreeSamples(pOutputQueue);
126 pOutputQueue->bTerminate = TRUE;
127 SetEvent(pOutputQueue->hProcessQueue);
128 LeaveCriticalSection(&pOutputQueue->csQueue);
129
130 pOutputQueue->csQueue.DebugInfo->Spare[0] = 0;
131 DeleteCriticalSection(&pOutputQueue->csQueue);
132 CloseHandle(pOutputQueue->hProcessQueue);
133
134 HeapFree(GetProcessHeap(),0,pOutputQueue->SampleList);
135
136 IPin_Release(&pOutputQueue->pInputPin->pin.IPin_iface);
137 HeapFree(GetProcessHeap(),0,pOutputQueue);
138 return S_OK;
139 }
140
141 HRESULT WINAPI OutputQueue_ReceiveMultiple(OutputQueue *pOutputQueue, IMediaSample **ppSamples, LONG nSamples, LONG *nSamplesProcessed)
142 {
143 HRESULT hr = S_OK;
144 int i;
145
146 if (!pOutputQueue->pInputPin->pin.pConnectedTo || !pOutputQueue->pInputPin->pMemInputPin)
147 return VFW_E_NOT_CONNECTED;
148
149 if (!pOutputQueue->hThread)
150 {
151 IMemInputPin_AddRef(pOutputQueue->pInputPin->pMemInputPin);
152 hr = IMemInputPin_ReceiveMultiple(pOutputQueue->pInputPin->pMemInputPin,ppSamples, nSamples, nSamplesProcessed);
153 IMemInputPin_Release(pOutputQueue->pInputPin->pMemInputPin);
154 }
155 else
156 {
157 EnterCriticalSection(&pOutputQueue->csQueue);
158 *nSamplesProcessed = 0;
159
160 for (i = 0; i < nSamples; i++)
161 {
162 QueuedEvent *qev = HeapAlloc(GetProcessHeap(),0,sizeof(QueuedEvent));
163 if (!qev)
164 {
165 ERR("Out of Memory\n");
166 hr = E_OUTOFMEMORY;
167 break;
168 }
169 qev->type = SAMPLE_PACKET;
170 qev->pSample = ppSamples[i];
171 IMediaSample_AddRef(ppSamples[i]);
172 list_add_tail(pOutputQueue->SampleList, &qev->entry);
173 (*nSamplesProcessed)++;
174 }
175
176 if (!pOutputQueue->bBatchExact || list_count(pOutputQueue->SampleList) >= pOutputQueue->lBatchSize)
177 SetEvent(pOutputQueue->hProcessQueue);
178 LeaveCriticalSection(&pOutputQueue->csQueue);
179 }
180 return hr;
181 }
182
183 HRESULT WINAPI OutputQueue_Receive(OutputQueue *pOutputQueue, IMediaSample *pSample)
184 {
185 LONG processed;
186 return OutputQueue_ReceiveMultiple(pOutputQueue,&pSample,1,&processed);
187 }
188
189 VOID WINAPI OutputQueue_SendAnyway(OutputQueue *pOutputQueue)
190 {
191 if (pOutputQueue->hThread)
192 {
193 EnterCriticalSection(&pOutputQueue->csQueue);
194 if (!list_empty(pOutputQueue->SampleList))
195 {
196 pOutputQueue->bSendAnyway = TRUE;
197 SetEvent(pOutputQueue->hProcessQueue);
198 }
199 LeaveCriticalSection(&pOutputQueue->csQueue);
200 }
201 }
202
203 VOID WINAPI OutputQueue_EOS(OutputQueue *pOutputQueue)
204 {
205 EnterCriticalSection(&pOutputQueue->csQueue);
206 if (pOutputQueue->hThread)
207 {
208 QueuedEvent *qev = HeapAlloc(GetProcessHeap(),0,sizeof(QueuedEvent));
209 if (!qev)
210 {
211 ERR("Out of Memory\n");
212 LeaveCriticalSection(&pOutputQueue->csQueue);
213 return;
214 }
215 qev->type = EOS_PACKET;
216 qev->pSample = NULL;
217 list_add_tail(pOutputQueue->SampleList, &qev->entry);
218 }
219 else
220 {
221 IPin* ppin = NULL;
222 IPin_ConnectedTo(&pOutputQueue->pInputPin->pin.IPin_iface, &ppin);
223 if (ppin)
224 {
225 IPin_EndOfStream(ppin);
226 IPin_Release(ppin);
227 }
228 }
229 LeaveCriticalSection(&pOutputQueue->csQueue);
230 /* Covers sending the Event to the worker Thread */
231 OutputQueue_SendAnyway(pOutputQueue);
232 }
233
234 DWORD WINAPI OutputQueueImpl_ThreadProc(OutputQueue *pOutputQueue)
235 {
236 do
237 {
238 EnterCriticalSection(&pOutputQueue->csQueue);
239 if (!list_empty(pOutputQueue->SampleList) &&
240 (!pOutputQueue->bBatchExact ||
241 list_count(pOutputQueue->SampleList) >= pOutputQueue->lBatchSize ||
242 pOutputQueue->bSendAnyway
243 )
244 )
245 {
246 while (!list_empty(pOutputQueue->SampleList))
247 {
248 IMediaSample **ppSamples;
249 LONG nSamples;
250 LONG nSamplesProcessed;
251 struct list *cursor, *cursor2;
252 int i = 0;
253
254 /* First Pass Process Samples */
255 i = list_count(pOutputQueue->SampleList);
256 ppSamples = HeapAlloc(GetProcessHeap(),0,sizeof(IMediaSample*) * i);
257 nSamples = 0;
258 LIST_FOR_EACH_SAFE(cursor, cursor2, pOutputQueue->SampleList)
259 {
260 QueuedEvent *qev = LIST_ENTRY(cursor, QueuedEvent, entry);
261 if (qev->type == SAMPLE_PACKET)
262 ppSamples[nSamples++] = qev->pSample;
263 else
264 break;
265 list_remove(cursor);
266 HeapFree(GetProcessHeap(),0,qev);
267 }
268
269 if (pOutputQueue->pInputPin->pin.pConnectedTo && pOutputQueue->pInputPin->pMemInputPin)
270 {
271 IMemInputPin_AddRef(pOutputQueue->pInputPin->pMemInputPin);
272 LeaveCriticalSection(&pOutputQueue->csQueue);
273 IMemInputPin_ReceiveMultiple(pOutputQueue->pInputPin->pMemInputPin, ppSamples, nSamples, &nSamplesProcessed);
274 EnterCriticalSection(&pOutputQueue->csQueue);
275 IMemInputPin_Release(pOutputQueue->pInputPin->pMemInputPin);
276 }
277 for (i = 0; i < nSamples; i++)
278 IMediaSample_Release(ppSamples[i]);
279 HeapFree(GetProcessHeap(),0,ppSamples);
280
281 /* Process Non-Samples */
282 LIST_FOR_EACH_SAFE(cursor, cursor2, pOutputQueue->SampleList)
283 {
284 QueuedEvent *qev = LIST_ENTRY(cursor, QueuedEvent, entry);
285 if (qev->type == EOS_PACKET)
286 {
287 IPin* ppin = NULL;
288 IPin_ConnectedTo(&pOutputQueue->pInputPin->pin.IPin_iface, &ppin);
289 if (ppin)
290 {
291 IPin_EndOfStream(ppin);
292 IPin_Release(ppin);
293 }
294 }
295 else if (qev->type == SAMPLE_PACKET)
296 break;
297 else
298 FIXME("Unhandled Event type %i\n",qev->type);
299 list_remove(cursor);
300 HeapFree(GetProcessHeap(),0,qev);
301 }
302 }
303 pOutputQueue->bSendAnyway = FALSE;
304 }
305 LeaveCriticalSection(&pOutputQueue->csQueue);
306 WaitForSingleObject(pOutputQueue->hProcessQueue, INFINITE);
307 }
308 while (!pOutputQueue->bTerminate);
309 return S_OK;
310 }