[KSPROXY]
[reactos.git] / reactos / dll / directx / ksproxy / output_pin.cpp
index c5d897c..e3d620e 100644 (file)
@@ -160,6 +160,11 @@ public:
     HRESULT STDMETHODCALLTYPE CheckFormat(const AM_MEDIA_TYPE *pmt);
     HRESULT STDMETHODCALLTYPE CreatePin(const AM_MEDIA_TYPE *pmt);
     HRESULT STDMETHODCALLTYPE CreatePinHandle(PKSPIN_MEDIUM Medium, PKSPIN_INTERFACE Interface, const AM_MEDIA_TYPE *pmt);
+    HRESULT WINAPI IoProcessRoutine();
+    HRESULT WINAPI InitializeIOThread();
+
+    friend DWORD WINAPI COutputPin_IoThreadStartup(LPVOID lpParameter);
+    friend HRESULT STDMETHODCALLTYPE COutputPin_SetState(IPin * Pin, KSSTATE State);
 
 protected:
     LONG m_Ref;
@@ -178,6 +183,7 @@ protected:
     PKSALLOCATOR_FRAMING_EX m_FramingEx[4];
 
     IMemAllocator * m_MemAllocator;
+    IMemInputPin * m_MemInputPin;
     LONG m_IoCount;
     KSPIN_COMMUNICATION m_Communication;
     KSPIN_INTERFACE m_Interface;
@@ -187,6 +193,14 @@ protected:
     IMediaSeeking * m_FilterMediaSeeking;
     ALLOCATOR_PROPERTIES m_Properties;
     IKsInterfaceHandler * m_InterfaceHandler;
+
+    HANDLE m_hStartEvent;
+    HANDLE m_hBufferAvailable;
+    HANDLE m_hStopEvent;
+    BOOL m_StopInProgress;
+    BOOL m_IoThreadStarted;
+
+    KSSTATE m_State;
 };
 
 COutputPin::~COutputPin()
@@ -211,10 +225,17 @@ COutputPin::COutputPin(
                                          m_bPinBusCacheInitialized(0),
                                          m_FilterName(0),
                                          m_MemAllocator(0),
+                                         m_MemInputPin(0),
                                          m_IoCount(0),
                                          m_Communication(Communication),
                                          m_FilterMediaSeeking(0),
-                                         m_InterfaceHandler(0)
+                                         m_InterfaceHandler(0),
+                                         m_hStartEvent(0),
+                                         m_hBufferAvailable(0),
+                                         m_hStopEvent(0),
+                                         m_StopInProgress(0),
+                                         m_IoThreadStarted(0),
+                                         m_State(KSSTATE_STOP)
 {
     HRESULT hr;
 
@@ -452,8 +473,12 @@ HRESULT
 STDMETHODCALLTYPE
 COutputPin::NotifyRelease()
 {
-    OutputDebugStringW(L"COutputPin::NotifyRelease NotImplemented\n");
-    return E_NOTIMPL;
+    OutputDebugStringW(L"COutputPin::NotifyRelease\n");
+
+    // notify thread of new available sample
+    SetEvent(m_hBufferAvailable);
+
+    return NOERROR;
 }
 
 //-------------------------------------------------------------------
@@ -1220,6 +1245,9 @@ STDMETHODCALLTYPE
 COutputPin::Connect(IPin *pReceivePin, const AM_MEDIA_TYPE *pmt)
 {
     HRESULT hr;
+    ALLOCATOR_PROPERTIES Properties;
+    IMemAllocatorCallbackTemp *pMemCallback;
+    WCHAR Buffer[200];
 
     OutputDebugStringW(L"COutputPin::Connect called\n");
     if (pmt)
@@ -1238,7 +1266,105 @@ COutputPin::Connect(IPin *pReceivePin, const AM_MEDIA_TYPE *pmt)
          pmt = &m_MediaFormat;
     }
 
-    //FIXME create pin handle
+    // query for IMemInput interface
+    hr = pReceivePin->QueryInterface(IID_IMemInputPin, (void**)&m_MemInputPin);
+    if (FAILED(hr))
+    {
+        OutputDebugStringW(L"COutputPin::Connect no IMemInputPin interface\n");
+        DebugBreak();
+        return hr;
+    }
+
+    // get input pin allocator properties
+    ZeroMemory(&Properties, sizeof(ALLOCATOR_PROPERTIES));
+    m_MemInputPin->GetAllocatorRequirements(&Properties);
+
+    //FIXME determine allocator properties
+    Properties.cBuffers = 16;
+    Properties.cbBuffer = 2048 * 188; //2048 frames * MPEG2 TS Payload size
+    Properties.cbAlign = 4;
+
+    // get input pin allocator
+    hr = m_MemInputPin->GetAllocator(&m_MemAllocator);
+    if (SUCCEEDED(hr))
+    {
+        // set allocator properties
+        hr = m_MemAllocator->SetProperties(&Properties, &m_Properties);
+        if (FAILED(hr))
+            m_MemAllocator->Release();
+    }
+
+    if (FAILED(hr))
+    {
+        hr = CKsAllocator_Constructor(NULL, IID_IMemAllocator, (void**)&m_MemAllocator);
+        if (FAILED(hr))
+            return hr;
+
+        // set allocator properties
+        hr = m_MemAllocator->SetProperties(&Properties, &m_Properties);
+        if (FAILED(hr))
+        {
+            swprintf(Buffer, L"COutputPin::Connect IMemAllocator::SetProperties failed with hr %lx\n", hr);
+            OutputDebugStringW(Buffer);
+            m_MemAllocator->Release();
+            m_MemInputPin->Release();
+            return hr;
+        }
+    }
+
+    // commit property changes
+    hr = m_MemAllocator->Commit();
+    if (FAILED(hr))
+    {
+        swprintf(Buffer, L"COutputPin::Connect IMemAllocator::Commit failed with hr %lx\n", hr);
+        OutputDebugStringW(Buffer);
+        m_MemAllocator->Release();
+        m_MemInputPin->Release();
+        return hr;
+    }
+
+    // get callback interface
+    hr = m_MemAllocator->QueryInterface(IID_IMemAllocatorCallbackTemp, (void**)&pMemCallback);
+    if (FAILED(hr))
+    {
+        swprintf(Buffer, L"COutputPin::Connect No IMemAllocatorCallbackTemp interface hr %lx\n", hr);
+        OutputDebugStringW(Buffer);
+        m_MemAllocator->Release();
+        m_MemInputPin->Release();
+        return hr;
+    }
+
+    // set notification routine
+    hr = pMemCallback->SetNotify((IMemAllocatorNotifyCallbackTemp*)this);
+
+    // release IMemAllocatorNotifyCallbackTemp interface
+    pMemCallback->Release();
+
+    if (FAILED(hr))
+    {
+        swprintf(Buffer, L"COutputPin::Connect IMemAllocatorNotifyCallbackTemp::SetNotify failed hr %lx\n", hr);
+        OutputDebugStringW(Buffer);
+        m_MemAllocator->Release();
+        m_MemInputPin->Release();
+        return hr;
+    }
+
+    // now set allocator
+    hr = m_MemInputPin->NotifyAllocator(m_MemAllocator, TRUE);
+    if (FAILED(hr))
+    {
+        swprintf(Buffer, L"COutputPin::Connect IMemInputPin::NotifyAllocator failed with hr %lx\n", hr);
+        OutputDebugStringW(Buffer);
+        m_MemAllocator->Release();
+        m_MemInputPin->Release();
+        return hr;
+    }
+
+    if (!m_hPin)
+    {
+        //FIXME create pin handle
+        assert(0);
+    }
 
     // receive connection;
     hr = pReceivePin->ReceiveConnection((IPin*)this, pmt);
@@ -1249,6 +1375,11 @@ COutputPin::Connect(IPin *pReceivePin, const AM_MEDIA_TYPE *pmt)
         m_Pin = pReceivePin;
         OutputDebugStringW(L"COutputPin::Connect success\n");
     }
+    else
+    {
+        m_MemInputPin->Release();
+        m_MemAllocator->Release();
+    }
 
     return hr;
 }
@@ -1257,7 +1388,6 @@ HRESULT
 STDMETHODCALLTYPE
 COutputPin::ReceiveConnection(IPin *pConnector, const AM_MEDIA_TYPE *pmt)
 {
-    OutputDebugStringW(L"COutputPin::ReceiveConnection\n");
     return E_UNEXPECTED;
 }
 HRESULT
@@ -1277,6 +1407,8 @@ COutputPin::Disconnect( void)
 
     m_Pin->Release();
     m_Pin = NULL;
+    m_MemInputPin->Release();
+    m_MemAllocator->Release();
 
     OutputDebugStringW(L"COutputPin::Disconnect\n");
     return S_OK;
@@ -1421,29 +1553,34 @@ HRESULT
 STDMETHODCALLTYPE
 COutputPin::EndOfStream( void)
 {
-    OutputDebugStringW(L"COutputPin::EndOfStream called\n");
-    return E_NOTIMPL;
+    /* should be called only on input pins */
+    return E_UNEXPECTED;
 }
 HRESULT
 STDMETHODCALLTYPE
 COutputPin::BeginFlush( void)
 {
-    OutputDebugStringW(L"COutputPin::BeginFlush called\n");
-    return E_NOTIMPL;
+    /* should be called only on input pins */
+    return E_UNEXPECTED;
 }
 HRESULT
 STDMETHODCALLTYPE
 COutputPin::EndFlush( void)
 {
-    OutputDebugStringW(L"COutputPin::EndFlush called\n");
-    return E_NOTIMPL;
+    /* should be called only on input pins */
+    return E_UNEXPECTED;
 }
 HRESULT
 STDMETHODCALLTYPE
 COutputPin::NewSegment(REFERENCE_TIME tStart, REFERENCE_TIME tStop, double dRate)
 {
-    OutputDebugStringW(L"COutputPin::NewSegment called\n");
-    return E_NOTIMPL;
+    if (!m_Pin)
+    {
+        // we are not connected
+        return VFW_E_NOT_CONNECTED;
+    }
+
+    return m_Pin->NewSegment(tStart, tStop, dRate);
 }
 
 //-------------------------------------------------------------------
@@ -1600,6 +1737,9 @@ COutputPin::CreatePinHandle(
     PKSDATAFORMAT DataFormat;
     ULONG Length;
     HRESULT hr;
+    //KSALLOCATOR_FRAMING Framing;
+    //KSPROPERTY Property;
+    //ULONG BytesReturned;
 
     if (m_hPin != INVALID_HANDLE_VALUE)
     {
@@ -1651,7 +1791,7 @@ COutputPin::CreatePinHandle(
     assert(hFilter != NULL);
 
     // create pin
-    hr = KsCreatePin(hFilter, PinConnect, GENERIC_WRITE, &m_hPin);
+    hr = KsCreatePin(hFilter, PinConnect, GENERIC_READ, &m_hPin);
 
     if (SUCCEEDED(hr))
     {
@@ -1683,6 +1823,28 @@ COutputPin::CreatePinHandle(
             }
             CopyMemory(m_MediaFormat.pbFormat, pmt->pbFormat, pmt->cbFormat);
         }
+#if 0
+        Property.Set = KSPROPSETID_Connection;
+        Property.Id = KSPROPERTY_CONNECTION_ALLOCATORFRAMING;
+        Property.Flags = KSPROPERTY_TYPE_GET;
+
+        ZeroMemory(&Framing, sizeof(KSALLOCATOR_FRAMING));
+        hr = KsProperty(&Property, sizeof(KSPROPERTY), (PVOID)&Framing, sizeof(KSALLOCATOR_FRAMING), &BytesReturned);
+        if (SUCCEEDED(hr))
+        {
+            m_Properties.cbAlign = (Framing.FileAlignment + 1);
+            m_Properties.cbBuffer = Framing.FrameSize;
+            m_Properties.cbPrefix = 0; //FIXME
+            m_Properties.cBuffers = Framing.Frames;
+        }
+        hr = S_OK;
+#endif
+
+        if (FAILED(InitializeIOThread()))
+        {
+            OutputDebugStringW(L"COutputPin::CreatePinHandle failed to initialize i/o thread\n");
+            DebugBreak();
+        }
 
         //TODO
         // connect pin pipes
@@ -1694,6 +1856,277 @@ COutputPin::CreatePinHandle(
     return hr;
 }
 
+HRESULT
+WINAPI
+COutputPin::IoProcessRoutine()
+{
+    IMediaSample *Sample;
+    LONG SampleCount;
+    HRESULT hr;
+    PKSSTREAM_SEGMENT StreamSegment;
+    HANDLE hEvent;
+    WCHAR Buffer[100];
+    IMediaSample * Samples[1];
+
+    // first wait for the start event to signal
+    WaitForSingleObject(m_hStartEvent, INFINITE);
+
+    assert(m_InterfaceHandler);
+    REFERENCE_TIME Start = 0;
+    REFERENCE_TIME Stop = 1;
+    do
+    {
+        if (m_StopInProgress)
+        {
+            // stop io thread
+            break;
+        }
+
+        // get buffer
+        hr = m_MemAllocator->GetBuffer(&Sample, NULL, NULL, AM_GBF_NOWAIT);
+
+        if (FAILED(hr))
+        {
+            m_Pin->BeginFlush();
+            OutputDebugStringW(L"Beginning flushing...\n");
+            WaitForSingleObject(m_hBufferAvailable, INFINITE);
+            m_Pin->EndFlush();
+            // now retry again
+            continue;
+        }
+
+        // fill buffer
+        SampleCount = 1;
+        Samples[0] = Sample;
+
+
+        Sample->SetTime(&Start, &Stop);
+        hr = m_InterfaceHandler->KsProcessMediaSamples(NULL, /* FIXME */
+                                                       Samples,
+                                                       &SampleCount,
+                                                       KsIoOperation_Read,
+                                                       &StreamSegment);
+        if (FAILED(hr) || !StreamSegment)
+        {
+            swprintf(Buffer, L"COutputPin::IoProcessRoutine KsProcessMediaSamples PinName %s hr %lx StreamSegment %p\n", m_PinName, hr, StreamSegment);
+            OutputDebugStringW(Buffer);
+            break;
+        }
+
+        // get completion event
+        hEvent = StreamSegment->CompletionEvent;
+
+        // wait for i/o completion
+        WaitForSingleObject(hEvent, INFINITE);
+
+        // perform completion
+        m_InterfaceHandler->KsCompleteIo(StreamSegment);
+
+        // close completion event
+        CloseHandle(hEvent);
+
+        if (SUCCEEDED(hr))
+        {
+            Sample->GetTime(&Start, &Stop);
+            Start = Stop;
+            Stop++;
+
+            // now deliver the sample
+            hr = m_MemInputPin->Receive(Sample);
+
+            swprintf(Buffer, L"COutputPin::IoProcessRoutine IMemInputPin::Receive hr %lx Start %I64u Stop %I64u\n", hr, Start, Stop);
+            OutputDebugStringW(Buffer);
+        }
+    }while(TRUE);
+
+    // signal end of i/o thread
+    SetEvent(m_hStopEvent);
+
+    m_IoThreadStarted = false;
+
+    return NOERROR;
+}
+
+DWORD
+WINAPI
+COutputPin_IoThreadStartup(
+    LPVOID lpParameter)
+{
+    COutputPin * Pin = (COutputPin*)lpParameter;
+    assert(Pin);
+
+    return Pin->IoProcessRoutine();
+}
+
+
+HRESULT
+WINAPI
+COutputPin::InitializeIOThread()
+{
+    HANDLE hThread;
+
+    if (m_IoThreadStarted)
+        return NOERROR;
+
+    if (!m_hStartEvent)
+        m_hStartEvent = CreateEventW(NULL, FALSE, FALSE, NULL);
+    else
+        ResetEvent(m_hStartEvent);
+
+    if (!m_hStartEvent)
+        return E_OUTOFMEMORY;
+
+    if (!m_hStopEvent)
+        m_hStopEvent = CreateEventW(NULL, FALSE, FALSE, NULL);
+    else
+        ResetEvent(m_hStopEvent);
+
+    if (!m_hStopEvent)
+        return E_OUTOFMEMORY;
+
+    if (!m_hBufferAvailable)
+        m_hBufferAvailable = CreateEventW(NULL, FALSE, FALSE, NULL);
+    else
+        ResetEvent(m_hBufferAvailable);
+
+    if (!m_hBufferAvailable)
+        return E_OUTOFMEMORY;
+
+    m_StopInProgress = false;
+    m_IoThreadStarted = true;
+
+    // now create the startup thread
+    hThread = CreateThread(NULL, 0, COutputPin_IoThreadStartup, (LPVOID)this, 0, NULL);
+    if (!hThread)
+        return E_OUTOFMEMORY;
+
+
+    // close thread handle
+    CloseHandle(hThread);
+    return NOERROR;
+}
+
+HRESULT
+STDMETHODCALLTYPE
+COutputPin_SetState(
+    IPin * Pin,
+    KSSTATE State)
+{
+    HRESULT hr = S_OK;
+    KSPROPERTY Property;
+    KSSTATE CurState;
+    WCHAR Buffer[100];
+    ULONG BytesReturned;
+
+    COutputPin * pPin = (COutputPin*)Pin;
+
+
+    Property.Set = KSPROPSETID_Connection;
+    Property.Id = KSPROPERTY_CONNECTION_STATE;
+    Property.Flags = KSPROPERTY_TYPE_SET;
+
+
+    if (pPin->m_State < State)
+    {
+        if (pPin->m_State == KSSTATE_STOP)
+        {
+            CurState = KSSTATE_ACQUIRE;
+            hr = pPin->KsProperty(&Property, sizeof(KSPROPERTY), &CurState, sizeof(KSSTATE), &BytesReturned);
+
+            swprintf(Buffer, L"COutputPin_SetState Setting State KSSTATE_ACQUIRE PinName %s hr %lx\n", pPin->m_PinName, hr);
+            OutputDebugStringW(Buffer);
+            if (FAILED(hr))
+                return hr;
+
+            pPin->m_State = CurState;
+
+            if (pPin->m_State == State)
+                return hr;
+        }
+        if (pPin->m_State == KSSTATE_ACQUIRE)
+        {
+            CurState = KSSTATE_PAUSE;
+            hr = pPin->KsProperty(&Property, sizeof(KSPROPERTY), &CurState, sizeof(KSSTATE), &BytesReturned);
+
+            swprintf(Buffer, L"COutputPin_SetState Setting State KSSTATE_PAUSE PinName %s hr %lx\n", pPin->m_PinName, hr);
+            OutputDebugStringW(Buffer);
+            if (FAILED(hr))
+                return hr;
+
+            pPin->m_State = CurState;
+
+            if (pPin->m_State == State)
+                return hr;
+        }
+
+        CurState = KSSTATE_RUN;
+        hr = pPin->KsProperty(&Property, sizeof(KSPROPERTY), &CurState, sizeof(KSSTATE), &BytesReturned);
+
+        swprintf(Buffer, L"COutputPin_SetState Setting State KSSTATE_RUN PinName %s hr %lx\n", pPin->m_PinName, hr);
+        OutputDebugStringW(Buffer);
+        if (FAILED(hr))
+            return hr;
+
+        // signal start event
+        SetEvent(pPin->m_hStartEvent);
+
+
+        pPin->m_State = CurState;
+        return hr;
+    }
+    else
+    {
+        if (pPin->m_State == KSSTATE_RUN)
+        {
+            CurState = KSSTATE_PAUSE;
+            hr = pPin->KsProperty(&Property, sizeof(KSPROPERTY), &CurState, sizeof(KSSTATE), &BytesReturned);
+
+            swprintf(Buffer, L"COutputPin_SetState Setting State KSSTATE_PAUSE PinName %u hr %lx\n", pPin->m_PinName, hr);
+            OutputDebugStringW(Buffer);
+            if (FAILED(hr))
+                return hr;
+
+            pPin->m_State = CurState;
+
+            if (pPin->m_State == State)
+                return hr;
+        }
+        if (pPin->m_State == KSSTATE_PAUSE)
+        {
+            CurState = KSSTATE_ACQUIRE;
+            hr = pPin->KsProperty(&Property, sizeof(KSPROPERTY), &CurState, sizeof(KSSTATE), &BytesReturned);
+
+            swprintf(Buffer, L"COutputPin_SetState Setting State KSSTATE_ACQUIRE PinName %u hr %lx\n", pPin->m_PinName, hr);
+            OutputDebugStringW(Buffer);
+            if (FAILED(hr))
+                return hr;
+
+            pPin->m_State = CurState;
+
+            if (pPin->m_State == State)
+                return hr;
+        }
+
+        // setting pending stop flag
+        pPin->m_StopInProgress = true;
+
+        CurState = KSSTATE_STOP;
+        hr = pPin->KsProperty(&Property, sizeof(KSPROPERTY), &CurState, sizeof(KSSTATE), &BytesReturned);
+
+        swprintf(Buffer, L"COutputPin_SetState Setting State KSSTATE_STOP PinName %s hr %lx\n", pPin->m_PinName, hr);
+        OutputDebugStringW(Buffer);
+        if (FAILED(hr))
+            return hr;
+
+        // wait until i/o thread is done
+        WaitForSingleObject(pPin->m_hStopEvent, INFINITE);
+
+        pPin->m_State = CurState;
+        return hr;
+    }
+}
+
+
 HRESULT
 WINAPI
 COutputPin_Constructor(