[STRMBASE]
[reactos.git] / reactos / lib / 3rdparty / strmbase / outputqueue.c
1 /*
2 * Generic Implementation of COutputQueue
3 *
4 * Copyright 2011 Aric Stewart, CodeWeavers
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2.1 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
19 */
20
21 #include "strmbase_private.h"
22
23 enum {SAMPLE_PACKET, EOS_PACKET};
24
25 typedef struct tagQueuedEvent {
26 int type;
27 struct list entry;
28
29 IMediaSample *pSample;
30 } QueuedEvent;
31
32 static DWORD WINAPI OutputQueue_InitialThreadProc(LPVOID data)
33 {
34 OutputQueue *This = (OutputQueue *)data;
35 return This->pFuncsTable->pfnThreadProc(This);
36 }
37
38 static void OutputQueue_FreeSamples(OutputQueue *pOutputQueue)
39 {
40 struct list *cursor, *cursor2;
41 LIST_FOR_EACH_SAFE(cursor, cursor2, pOutputQueue->SampleList)
42 {
43 QueuedEvent *qev = LIST_ENTRY(cursor, QueuedEvent, entry);
44 list_remove(cursor);
45 HeapFree(GetProcessHeap(),0,qev);
46 }
47 }
48
49 HRESULT WINAPI OutputQueue_Construct(
50 BaseOutputPin *pInputPin,
51 BOOL bAuto,
52 BOOL bQueue,
53 LONG lBatchSize,
54 BOOL bBatchExact,
55 DWORD dwPriority,
56 const OutputQueueFuncTable* pFuncsTable,
57 OutputQueue **ppOutputQueue )
58
59 {
60 HRESULT hr = S_OK;
61 BOOL threaded = FALSE;
62 DWORD tid;
63
64 OutputQueue *This;
65
66 if (!pInputPin || !pFuncsTable || !ppOutputQueue)
67 return E_INVALIDARG;
68
69 *ppOutputQueue = HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sizeof(OutputQueue));
70 if (!*ppOutputQueue)
71 return E_OUTOFMEMORY;
72
73 This = *ppOutputQueue;
74 This->pFuncsTable = pFuncsTable;
75 This->lBatchSize = lBatchSize;
76 This->bBatchExact = bBatchExact;
77 InitializeCriticalSection(&This->csQueue);
78 This->csQueue.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": OutputQueue.csQueue");
79 This->SampleList = HeapAlloc(GetProcessHeap(),0,sizeof(struct list));
80 if (!This->SampleList)
81 {
82 OutputQueue_Destroy(This);
83 *ppOutputQueue = NULL;
84 return E_OUTOFMEMORY;
85 }
86 list_init(This->SampleList);
87
88 This->pInputPin = pInputPin;
89 IPin_AddRef(&pInputPin->pin.IPin_iface);
90
91 EnterCriticalSection(&This->csQueue);
92 if (bAuto && pInputPin->pMemInputPin)
93 threaded = IMemInputPin_ReceiveCanBlock(pInputPin->pMemInputPin);
94 else
95 threaded = bQueue;
96
97 if (threaded)
98 {
99 This->hThread = CreateThread(NULL, 0, OutputQueue_InitialThreadProc, This, 0, &tid);
100 if (This->hThread)
101 {
102 SetThreadPriority(This->hThread, dwPriority);
103 This->hProcessQueue = CreateEventW(NULL, 0, 0, NULL);
104 }
105 }
106 LeaveCriticalSection(&This->csQueue);
107
108 return hr;
109 }
110
111 HRESULT WINAPI OutputQueue_Destroy(OutputQueue *pOutputQueue)
112 {
113 EnterCriticalSection(&pOutputQueue->csQueue);
114 OutputQueue_FreeSamples(pOutputQueue);
115 pOutputQueue->bTerminate = TRUE;
116 SetEvent(pOutputQueue->hProcessQueue);
117 LeaveCriticalSection(&pOutputQueue->csQueue);
118
119 pOutputQueue->csQueue.DebugInfo->Spare[0] = 0;
120 DeleteCriticalSection(&pOutputQueue->csQueue);
121 CloseHandle(pOutputQueue->hProcessQueue);
122
123 HeapFree(GetProcessHeap(),0,pOutputQueue->SampleList);
124
125 IPin_Release(&pOutputQueue->pInputPin->pin.IPin_iface);
126 HeapFree(GetProcessHeap(),0,pOutputQueue);
127 return S_OK;
128 }
129
130 HRESULT WINAPI OutputQueue_ReceiveMultiple(OutputQueue *pOutputQueue, IMediaSample **ppSamples, LONG nSamples, LONG *nSamplesProcessed)
131 {
132 HRESULT hr = S_OK;
133 int i;
134
135 if (!pOutputQueue->pInputPin->pin.pConnectedTo || !pOutputQueue->pInputPin->pMemInputPin)
136 return VFW_E_NOT_CONNECTED;
137
138 if (!pOutputQueue->hThread)
139 {
140 IMemInputPin_AddRef(pOutputQueue->pInputPin->pMemInputPin);
141 hr = IMemInputPin_ReceiveMultiple(pOutputQueue->pInputPin->pMemInputPin,ppSamples, nSamples, nSamplesProcessed);
142 IMemInputPin_Release(pOutputQueue->pInputPin->pMemInputPin);
143 }
144 else
145 {
146 EnterCriticalSection(&pOutputQueue->csQueue);
147 *nSamplesProcessed = 0;
148
149 for (i = 0; i < nSamples; i++)
150 {
151 QueuedEvent *qev = HeapAlloc(GetProcessHeap(),0,sizeof(QueuedEvent));
152 if (!qev)
153 {
154 ERR("Out of Memory\n");
155 hr = E_OUTOFMEMORY;
156 break;
157 }
158 qev->type = SAMPLE_PACKET;
159 qev->pSample = ppSamples[i];
160 IMediaSample_AddRef(ppSamples[i]);
161 list_add_tail(pOutputQueue->SampleList, &qev->entry);
162 (*nSamplesProcessed)++;
163 }
164
165 if (!pOutputQueue->bBatchExact || list_count(pOutputQueue->SampleList) >= pOutputQueue->lBatchSize)
166 SetEvent(pOutputQueue->hProcessQueue);
167 LeaveCriticalSection(&pOutputQueue->csQueue);
168 }
169 return hr;
170 }
171
172 HRESULT WINAPI OutputQueue_Receive(OutputQueue *pOutputQueue, IMediaSample *pSample)
173 {
174 LONG processed;
175 return OutputQueue_ReceiveMultiple(pOutputQueue,&pSample,1,&processed);
176 }
177
178 VOID WINAPI OutputQueue_SendAnyway(OutputQueue *pOutputQueue)
179 {
180 if (pOutputQueue->hThread)
181 {
182 EnterCriticalSection(&pOutputQueue->csQueue);
183 if (!list_empty(pOutputQueue->SampleList))
184 {
185 pOutputQueue->bSendAnyway = TRUE;
186 SetEvent(pOutputQueue->hProcessQueue);
187 }
188 LeaveCriticalSection(&pOutputQueue->csQueue);
189 }
190 }
191
192 VOID WINAPI OutputQueue_EOS(OutputQueue *pOutputQueue)
193 {
194 EnterCriticalSection(&pOutputQueue->csQueue);
195 if (pOutputQueue->hThread)
196 {
197 QueuedEvent *qev = HeapAlloc(GetProcessHeap(),0,sizeof(QueuedEvent));
198 if (!qev)
199 {
200 ERR("Out of Memory\n");
201 LeaveCriticalSection(&pOutputQueue->csQueue);
202 return;
203 }
204 qev->type = EOS_PACKET;
205 qev->pSample = NULL;
206 list_add_tail(pOutputQueue->SampleList, &qev->entry);
207 }
208 else
209 {
210 IPin* ppin = NULL;
211 IPin_ConnectedTo(&pOutputQueue->pInputPin->pin.IPin_iface, &ppin);
212 if (ppin)
213 {
214 IPin_EndOfStream(ppin);
215 IPin_Release(ppin);
216 }
217 }
218 LeaveCriticalSection(&pOutputQueue->csQueue);
219 /* Covers sending the Event to the worker Thread */
220 OutputQueue_SendAnyway(pOutputQueue);
221 }
222
223 DWORD WINAPI OutputQueueImpl_ThreadProc(OutputQueue *pOutputQueue)
224 {
225 do
226 {
227 EnterCriticalSection(&pOutputQueue->csQueue);
228 if (!list_empty(pOutputQueue->SampleList) &&
229 (!pOutputQueue->bBatchExact ||
230 list_count(pOutputQueue->SampleList) >= pOutputQueue->lBatchSize ||
231 pOutputQueue->bSendAnyway
232 )
233 )
234 {
235 while (!list_empty(pOutputQueue->SampleList))
236 {
237 IMediaSample **ppSamples;
238 LONG nSamples;
239 LONG nSamplesProcessed;
240 struct list *cursor, *cursor2;
241 int i = 0;
242
243 /* First Pass Process Samples */
244 i = list_count(pOutputQueue->SampleList);
245 ppSamples = HeapAlloc(GetProcessHeap(),0,sizeof(IMediaSample*) * i);
246 nSamples = 0;
247 LIST_FOR_EACH_SAFE(cursor, cursor2, pOutputQueue->SampleList)
248 {
249 QueuedEvent *qev = LIST_ENTRY(cursor, QueuedEvent, entry);
250 if (qev->type == SAMPLE_PACKET)
251 ppSamples[nSamples++] = qev->pSample;
252 else
253 break;
254 list_remove(cursor);
255 HeapFree(GetProcessHeap(),0,qev);
256 }
257
258 if (pOutputQueue->pInputPin->pin.pConnectedTo && pOutputQueue->pInputPin->pMemInputPin)
259 {
260 IMemInputPin_AddRef(pOutputQueue->pInputPin->pMemInputPin);
261 LeaveCriticalSection(&pOutputQueue->csQueue);
262 IMemInputPin_ReceiveMultiple(pOutputQueue->pInputPin->pMemInputPin, ppSamples, nSamples, &nSamplesProcessed);
263 EnterCriticalSection(&pOutputQueue->csQueue);
264 IMemInputPin_Release(pOutputQueue->pInputPin->pMemInputPin);
265 }
266 for (i = 0; i < nSamples; i++)
267 IMediaSample_Release(ppSamples[i]);
268 HeapFree(GetProcessHeap(),0,ppSamples);
269
270 /* Process Non-Samples */
271 LIST_FOR_EACH_SAFE(cursor, cursor2, pOutputQueue->SampleList)
272 {
273 QueuedEvent *qev = LIST_ENTRY(cursor, QueuedEvent, entry);
274 if (qev->type == EOS_PACKET)
275 {
276 IPin* ppin = NULL;
277 IPin_ConnectedTo(&pOutputQueue->pInputPin->pin.IPin_iface, &ppin);
278 if (ppin)
279 {
280 IPin_EndOfStream(ppin);
281 IPin_Release(ppin);
282 }
283 }
284 else if (qev->type == SAMPLE_PACKET)
285 break;
286 else
287 FIXME("Unhandled Event type %i\n",qev->type);
288 list_remove(cursor);
289 HeapFree(GetProcessHeap(),0,qev);
290 }
291 }
292 pOutputQueue->bSendAnyway = FALSE;
293 }
294 LeaveCriticalSection(&pOutputQueue->csQueue);
295 WaitForSingleObject(pOutputQueue->hProcessQueue, INFINITE);
296 }
297 while (!pOutputQueue->bTerminate);
298 return S_OK;
299 }