[KS]
authorJohannes Anderwald <johannes.anderwald@reactos.org>
Sun, 4 Apr 2010 21:27:07 +0000 (21:27 +0000)
committerJohannes Anderwald <johannes.anderwald@reactos.org>
Sun, 4 Apr 2010 21:27:07 +0000 (21:27 +0000)
- Add hack to IKsClock_DispatchDeviceIoControl
- Store device state before calling SetDeviceState
- Partly implement KsPinGetLeadingEdgeStreamPointer, KsStreamPointerDelete, KsStreamPointerClone, KsStreamPointerAdvanceOffsets
- Implement a worker routine to dispatch read/write stream requests for pin centric filters
- Tv tuner is now able to transfer MPEG2 TS to user mode, WIP, needs more testing

svn path=/trunk/; revision=46723

reactos/drivers/ksfilter/ks/api.c
reactos/drivers/ksfilter/ks/bag.c
reactos/drivers/ksfilter/ks/clocks.c
reactos/drivers/ksfilter/ks/driver.c
reactos/drivers/ksfilter/ks/filter.c
reactos/drivers/ksfilter/ks/pin.c

index 8ec1186..f1ac9a8 100644 (file)
@@ -94,6 +94,8 @@ KsReleaseDeviceSecurityLock(
 {
     PKSIDEVICE_HEADER Header = (PKSIDEVICE_HEADER)DevHeader;
 
+    DPRINT("KsReleaseDevice\n");
+
     ExReleaseResourceLite(&Header->SecurityLock);
     KeLeaveCriticalRegion();
 }
@@ -1623,7 +1625,7 @@ KsAcquireDevice(
     IKsDevice *KsDevice;
     PKSIDEVICE_HEADER DeviceHeader;
 
-
+    DPRINT("KsAcquireDevice\n");
     DeviceHeader = (PKSIDEVICE_HEADER)CONTAINING_RECORD(Device, KSIDEVICE_HEADER, KsDevice);
 
     /* get device interface*/
index 1b7eec9..d0b76cc 100644 (file)
@@ -89,6 +89,8 @@ KsAddItemToObjectBag(
     PKSIOBJECT_BAG Bag;
     PKSIOBJECT_BAG_ENTRY BagEntry;
 
+    DPRINT("KsAddItemToObjectBag\n");
+
     /* get real object bag */
     Bag = (PKSIOBJECT_BAG)ObjectBag;
 
@@ -363,6 +365,8 @@ _KsEdit(
     PVOID Item;
     NTSTATUS Status;
 
+    DPRINT("_KsEdit\n");
+
     /* get real object bag */
     Bag = (PKSIOBJECT_BAG)ObjectBag;
 
index 3961193..f3e7d7a 100644 (file)
@@ -98,10 +98,10 @@ IKsClock_DispatchDeviceIoControl(
 {
     UNIMPLEMENTED
 
-    Irp->IoStatus.Status = STATUS_NOT_IMPLEMENTED;
+    Irp->IoStatus.Status = STATUS_SUCCESS;
     IoCompleteRequest(Irp, IO_NO_INCREMENT);
 
-    return STATUS_NOT_IMPLEMENTED;
+    return STATUS_SUCCESS;
 }
 
 NTSTATUS
index 949e128..0b6cf43 100644 (file)
@@ -39,6 +39,8 @@ KsGetDevice(
 {
     PKSBASIC_HEADER BasicHeader = (PKSBASIC_HEADER)((ULONG_PTR)Object - sizeof(KSBASIC_HEADER));
 
+    DPRINT("KsGetDevice\n");
+
     ASSERT(BasicHeader->Type == KsObjectTypeFilterFactory || BasicHeader->Type == KsObjectTypeFilter || BasicHeader->Type == KsObjectTypePin);
     ASSERT(BasicHeader->KsDevice);
     ASSERT(BasicHeader->KsDevice->Started);
@@ -152,6 +154,8 @@ KsInitializeDriver(
     PKS_DRIVER_EXTENSION DriverObjectExtension;
     NTSTATUS Status = STATUS_SUCCESS;
 
+    DPRINT("KsInitializeDriver\n");
+
     if (Descriptor)
     {
         Status = IoAllocateDriverObjectExtension(DriverObject, (PVOID)KsInitializeDriver, sizeof(KS_DRIVER_EXTENSION), (PVOID*)&DriverObjectExtension);
index 5c8fe7f..2fb6997 100644 (file)
@@ -1572,6 +1572,8 @@ KsGetFilterFromIrp(
     PIO_STACK_LOCATION IoStack;
     PKSIOBJECT_HEADER ObjectHeader;
 
+    DPRINT("KsGetFilterFromIrp\n");
+
     /* get current irp stack location */
     IoStack = IoGetCurrentIrpStackLocation(Irp);
 
index 70e17e6..a4cc63b 100644 (file)
 
 typedef struct _KSISTREAM_POINTER
 {
-    KSSTREAM_POINTER StreamPointer;
     PFNKSSTREAMPOINTER Callback;
     PIRP Irp;
     KTIMER Timer;
     KDPC TimerDpc;
     struct _KSISTREAM_POINTER *Next;
-
+    PKSPIN Pin;
+    KSSTREAM_POINTER StreamPointer;
 }KSISTREAM_POINTER, *PKSISTREAM_POINTER;
 
 typedef struct
@@ -56,6 +56,12 @@ typedef struct
     IKsReferenceClockVtbl * lpVtblReferenceClock;
     PKSDEFAULTCLOCK DefaultClock;
 
+    PKSWORKER PinWorker;
+    WORK_QUEUE_ITEM PinWorkQueueItem;
+    IRP * Irp;
+    KEVENT FrameComplete;
+
+
 }IKsPinImpl;
 
 NTSTATUS NTAPI IKsPin_PinStatePropertyHandler(IN PIRP Irp, IN PKSIDENTIFIER Request, IN OUT PVOID Data);
@@ -283,6 +289,7 @@ IKsPin_PinStatePropertyHandler(
 
             /* set new state */
             This->Pin.ClientState  = *NewState;
+            This->Pin.DeviceState = *NewState;
 
             /* check if it supported */
             Status = This->Pin.Descriptor->Dispatch->SetDeviceState(&This->Pin, *NewState, OldState);
@@ -293,6 +300,7 @@ IKsPin_PinStatePropertyHandler(
             {
                 /* revert to old state */
                 This->Pin.ClientState = OldState;
+                This->Pin.DeviceState = OldState;
                 DbgBreakPoint();
             }
             else
@@ -892,7 +900,7 @@ KsPinAttemptProcessing(
     IN BOOLEAN  Asynchronous)
 {
     DPRINT("KsPinAttemptProcessing\n");
-DbgBreakPoint();
+    DbgBreakPoint();
     UNIMPLEMENTED
 }
 
@@ -1204,10 +1212,33 @@ KsPinGetLeadingEdgeStreamPointer(
     IN PKSPIN Pin,
     IN KSSTREAM_POINTER_STATE State)
 {
-    UNIMPLEMENTED
-    DPRINT("KsPinGetLeadingEdgeStreamPointer Pin %p State %x\n", Pin, State);
-DbgBreakPoint();
-    return NULL;
+    IKsPinImpl * This;
+
+    This = (IKsPinImpl*)CONTAINING_RECORD(Pin, IKsPinImpl, Pin);
+
+    DPRINT("KsPinGetLeadingEdgeStreamPointer Pin %p State %x Count %lu Remaining %lu\n", Pin, State, 
+           This->LeadingEdgeStreamPointer->StreamPointer.Offset->Count,
+           This->LeadingEdgeStreamPointer->StreamPointer.Offset->Remaining);
+
+    /* sanity check */
+    ASSERT(This->LeadingEdgeStreamPointer);
+    ASSERT(State == KSSTREAM_POINTER_STATE_LOCKED);
+
+    if (State == KSSTREAM_POINTER_STATE_LOCKED)
+    {
+        /* do we have an irp packet */
+        if (!This->Irp)
+        {
+            /* run out of packets */
+            return NULL;
+        }
+
+        if (!This->LeadingEdgeStreamPointer->StreamPointer.Offset->Remaining)
+            return NULL;
+     }
+     DPRINT("LeadingEdge %p\n", &This->LeadingEdgeStreamPointer->StreamPointer);
+     This->LeadingEdgeStreamPointer->Pin = &This->Pin;
+    return &This->LeadingEdgeStreamPointer->StreamPointer;
 }
 
 /*
@@ -1262,8 +1293,8 @@ KsStreamPointerUnlock(
     IN BOOLEAN Eject)
 {
     UNIMPLEMENTED
-    DPRINT("KsStreamPointerUnlock\n");
-DbgBreakPoint();
+    DPRINT("KsStreamPointerUnlock Eject %lu\n", Eject);
+    DbgBreakPoint();
 }
 
 /*
@@ -1278,8 +1309,8 @@ KsStreamPointerAdvanceOffsetsAndUnlock(
     IN ULONG OutUsed,
     IN BOOLEAN Eject)
 {
-    DPRINT("KsStreamPointerAdvanceOffsets\n");
-DbgBreakPoint();
+    DPRINT("KsStreamPointerAdvanceOffsets InUsed %lu OutUsed %lu Eject %lu\n", InUsed, OutUsed, Eject);
+    DbgBreakPoint();
     UNIMPLEMENTED
 }
 
@@ -1294,10 +1325,10 @@ KsStreamPointerDelete(
 {
     IKsPinImpl * This;
     PKSISTREAM_POINTER Cur, Last;
-    PKSISTREAM_POINTER Pointer = (PKSISTREAM_POINTER)StreamPointer;
+    PKSISTREAM_POINTER Pointer = (PKSISTREAM_POINTER)CONTAINING_RECORD(StreamPointer, KSISTREAM_POINTER, StreamPointer);
+
+    DPRINT("KsStreamPointerDelete %p\n", Pointer);
 
-    DPRINT("KsStreamPointerDelete\n");
-DbgBreakPoint();
     This = (IKsPinImpl*)CONTAINING_RECORD(Pointer->StreamPointer.Pin, IKsPinImpl, Pin);
 
     /* point to first stream pointer */
@@ -1332,7 +1363,7 @@ DbgBreakPoint();
 }
 
 /*
-    @unimplemented
+    @implemented
 */
 KSDDKAPI
 NTSTATUS
@@ -1343,14 +1374,57 @@ KsStreamPointerClone(
     IN ULONG ContextSize,
     OUT PKSSTREAM_POINTER* CloneStreamPointer)
 {
-    UNIMPLEMENTED
-    DPRINT("KsStreamPointerClone\n");
-DbgBreakPoint();
-    return STATUS_NOT_IMPLEMENTED;
+    IKsPinImpl * This;
+    PKSISTREAM_POINTER CurFrame;
+    PKSISTREAM_POINTER NewFrame;
+    ULONG RefCount;
+    ULONG Size;
+
+    DPRINT("KsStreamPointerClone StreamPointer %p CancelCallback %p ContextSize %p CloneStreamPointer %p\n", StreamPointer, CancelCallback, ContextSize, CloneStreamPointer);
+
+    /* get stream pointer */
+    CurFrame = (PKSISTREAM_POINTER)CONTAINING_RECORD(StreamPointer, KSISTREAM_POINTER, StreamPointer);
+
+    /* calculate context size */
+    Size = sizeof(KSISTREAM_POINTER) + ContextSize;
+
+    /* allocate new stream pointer */
+    NewFrame = (PKSISTREAM_POINTER)ExAllocatePool(NonPagedPool, Size);
+
+    if (!NewFrame)
+        return STATUS_INSUFFICIENT_RESOURCES;
+
+    /* get current irp stack location */
+    RefCount = (ULONG)CurFrame->Irp->Tail.Overlay.DriverContext[0];
+
+    /* increment reference count */
+    RefCount++;
+    CurFrame->Irp->Tail.Overlay.DriverContext[0] = (PVOID)RefCount;
+
+    /* copy stream pointer */
+    RtlMoveMemory(NewFrame, CurFrame, sizeof(KSISTREAM_POINTER));
+
+    if (ContextSize)
+        NewFrame->StreamPointer.Context = (NewFrame + 1);
+
+    /* locate pin */
+    This = (IKsPinImpl*)CONTAINING_RECORD(CurFrame->Pin, IKsPinImpl, Pin);
+
+    NewFrame->StreamPointer.Pin = &This->Pin;
+
+    ASSERT(NewFrame->StreamPointer.Pin);
+    ASSERT(NewFrame->StreamPointer.Context);
+    ASSERT(NewFrame->StreamPointer.Offset);
+    ASSERT(NewFrame->StreamPointer.StreamHeader);
+
+    /* store result */
+    *CloneStreamPointer = &NewFrame->StreamPointer;
+
+    return STATUS_SUCCESS;
 }
 
 /*
-    @unimplemented
+    @implemented
 */
 KSDDKAPI
 NTSTATUS
@@ -1361,8 +1435,58 @@ KsStreamPointerAdvanceOffsets(
     IN ULONG OutUsed,
     IN BOOLEAN Eject)
 {
-    UNIMPLEMENTED
-    return STATUS_NOT_IMPLEMENTED;
+    PKSISTREAM_POINTER CurFrame;
+    IKsPinImpl * This;
+
+    DPRINT("KsStreamPointerAdvanceOffsets      InUsed %lu OutUsed %lu Eject %lu\n", InUsed, OutUsed, Eject);
+
+    /* get stream pointer */
+    CurFrame = (PKSISTREAM_POINTER)CONTAINING_RECORD(StreamPointer, KSISTREAM_POINTER, StreamPointer);
+
+    CurFrame->StreamPointer.OffsetIn.Remaining -= InUsed;
+    CurFrame->StreamPointer.OffsetOut.Remaining -= OutUsed;
+    CurFrame->StreamPointer.OffsetIn.Count -= InUsed;
+    CurFrame->StreamPointer.OffsetOut.Count -= OutUsed;
+    CurFrame->StreamPointer.OffsetIn.Data = (PVOID)((ULONG_PTR)CurFrame->StreamPointer.OffsetIn.Data + InUsed);
+    CurFrame->StreamPointer.OffsetOut.Data = (PVOID)((ULONG_PTR)CurFrame->StreamPointer.OffsetOut.Data + OutUsed);
+
+    if (!CurFrame->StreamPointer.OffsetIn.Remaining)
+        CurFrame->StreamPointer.OffsetIn.Data = NULL;
+
+    if (!CurFrame->StreamPointer.OffsetOut.Remaining)
+        CurFrame->StreamPointer.OffsetOut.Data = NULL;
+
+    /* locate pin */
+    This = (IKsPinImpl*)CONTAINING_RECORD(CurFrame->Pin, IKsPinImpl, Pin);
+
+    if (This->Pin.Descriptor->PinDescriptor.DataFlow == KSPIN_DATAFLOW_IN)
+    {
+        if (CurFrame->StreamPointer.OffsetIn.Remaining == 0)
+        {
+            /* get next mapping */
+            This->Irp = KsRemoveIrpFromCancelableQueue(&This->IrpList, &This->IrpListLock, KsListEntryHead, KsAcquireAndRemoveOnlySingleItem);
+            if (!This->Irp)
+                return STATUS_DEVICE_NOT_READY;
+
+            /* FIXME handle me */
+            ASSERT(0);
+        }
+    }
+    else
+    {
+        if (CurFrame->StreamPointer.OffsetOut.Remaining == 0)
+        {
+            /* get next mapping */
+            This->Irp = KsRemoveIrpFromCancelableQueue(&This->IrpList, &This->IrpListLock, KsListEntryHead, KsAcquireAndRemoveOnlySingleItem);
+            if (!This->Irp)
+                return STATUS_DEVICE_NOT_READY;
+
+            /* FIXME handle me */
+            ASSERT(0);
+        }
+    }
+
+    return STATUS_SUCCESS;
 }
 
 /*
@@ -1458,7 +1582,7 @@ KsPinGetFirstCloneStreamPointer(
     IKsPinImpl * This;
 
     DPRINT("KsPinGetFirstCloneStreamPointer %p\n", Pin);
-DbgBreakPoint();
+    DbgBreakPoint();
     This = (IKsPinImpl*)CONTAINING_RECORD(Pin, IKsPinImpl, Pin);
     /* return first cloned stream pointer */
     return &This->ClonedStreamPointer->StreamPointer;
@@ -1476,7 +1600,7 @@ KsStreamPointerGetNextClone(
     PKSISTREAM_POINTER Pointer = (PKSISTREAM_POINTER)StreamPointer;
 
     DPRINT("KsStreamPointerGetNextClone\n");
-DbgBreakPoint();
+    DbgBreakPoint();
     /* is there a another cloned stream pointer */
     if (!Pointer->Next)
         return NULL;
@@ -1484,7 +1608,101 @@ DbgBreakPoint();
     /* return next stream pointer */
     return &Pointer->Next->StreamPointer;
 }
+
+VOID
+NTAPI
+IKsPin_PinCentricWorker(
+    IN PVOID Parameter)
+{
+    PIO_STACK_LOCATION IoStack;
+    PKSSTREAM_HEADER Header;
+    ULONG NumHeaders;
+    NTSTATUS Status;
+    PIRP Irp;
+    IKsPinImpl * This = (IKsPinImpl*)Parameter;
+
+    DPRINT("IKsPin_PinCentricWorker\n");
+
+    /* sanity checks */
+    ASSERT(This);
+    ASSERT(This->Pin.Descriptor);
+    ASSERT(This->Pin.Descriptor->Dispatch);
+    ASSERT(This->Pin.Descriptor->Dispatch->Process);
+    ASSERT(KeGetCurrentIrql() == PASSIVE_LEVEL);
+    ASSERT(!(This->Pin.Descriptor->Flags & KSPIN_FLAG_DISPATCH_LEVEL_PROCESSING));
+    ASSERT(This->LeadingEdgeStreamPointer);
+
+    do
+    {
+        /* do we have an irp packet */
+        if (!This->Irp)
+        {
+            /* fetch new irp packet */
+            This->Irp = KsRemoveIrpFromCancelableQueue(&This->IrpList, &This->IrpListLock, KsListEntryHead, KsAcquireAndRemoveOnlySingleItem);
+
+            if (!This->Irp)
+            {
+                /* reached last packet */
+                break;
+            }
+        }
+
+        /* get current irp stack location */
+        IoStack = IoGetCurrentIrpStackLocation(This->Irp);
+
+       if (This->Irp->RequestorMode == UserMode)
+           This->LeadingEdgeStreamPointer->StreamPointer.StreamHeader = Header = (PKSSTREAM_HEADER)This->Irp->AssociatedIrp.SystemBuffer;
+       else
+           This->LeadingEdgeStreamPointer->StreamPointer.StreamHeader = Header = (PKSSTREAM_HEADER)This->Irp->UserBuffer;
+
+        /* calculate num headers */
+        NumHeaders = IoStack->Parameters.DeviceIoControl.OutputBufferLength / Header->Size;
+
+        /* assume headers of same length */
+        ASSERT(IoStack->Parameters.DeviceIoControl.OutputBufferLength % Header->Size == 0);
+
+        /* FIXME support multiple stream headers */
+       ASSERT(NumHeaders == 1);
+
+        if (This->Irp->RequestorMode == UserMode)
+        {
+            /* prepare header */
+            Header->Data = MmGetSystemAddressForMdlSafe(This->Irp->MdlAddress, NormalPagePriority);
+        }
+
+        /* set up stream pointer */
+        This->LeadingEdgeStreamPointer->Irp = Irp = This->Irp;
+        This->LeadingEdgeStreamPointer->StreamPointer.Context = NULL;
+        This->LeadingEdgeStreamPointer->StreamPointer.Pin = &This->Pin;
+        This->LeadingEdgeStreamPointer->StreamPointer.OffsetOut.Count = max(Header->DataUsed, Header->FrameExtent);
+        This->LeadingEdgeStreamPointer->StreamPointer.OffsetOut.Data = Header->Data;
+        This->LeadingEdgeStreamPointer->StreamPointer.OffsetOut.Count = max(Header->DataUsed, Header->FrameExtent);
+        This->LeadingEdgeStreamPointer->StreamPointer.OffsetOut.Remaining = max(Header->DataUsed, Header->FrameExtent);
+        This->LeadingEdgeStreamPointer->Pin = &This->Pin;
+
+        DPRINT("IKsPin_PinCentricWorker calling Pin Process Routine\n");
+
+            Status = This->Pin.Descriptor->Dispatch->Process(&This->Pin);
+            DPRINT("IKsPin_PinCentricWorker Status %lx, Count %lu Remaining %lu\n", Status,
+                   This->LeadingEdgeStreamPointer->StreamPointer.Offset->Count,
+                   This->LeadingEdgeStreamPointer->StreamPointer.Offset->Remaining);
+
+        ASSERT(Status != STATUS_PENDING);
+
+        // HACK complete irp
+        Irp->IoStatus.Information = max(Header->DataUsed, Header->FrameExtent);
+        Irp->IoStatus.Status = Status;
+        IoCompleteRequest(Irp, IO_NO_INCREMENT);
+        KsDecrementCountedWorker(This->PinWorker);
+
+
+        break;
+
+    }while(TRUE);
+}
+
 NTSTATUS
+NTAPI
 IKsPin_DispatchKsStream(
     PDEVICE_OBJECT DeviceObject,
     PIRP Irp,
@@ -1492,6 +1710,7 @@ IKsPin_DispatchKsStream(
 {
     PKSPROCESSPIN_INDEXENTRY ProcessPinIndex;
     PKSFILTER Filter;
+    PIO_STACK_LOCATION IoStack;
     NTSTATUS Status = STATUS_SUCCESS;
 
     DPRINT("IKsPin_DispatchKsStream\n");
@@ -1499,18 +1718,41 @@ IKsPin_DispatchKsStream(
     /* FIXME handle reset states */
     ASSERT(This->Pin.ResetState == KSRESET_END);
 
-    /* mark irp as pending */
-    IoMarkIrpPending(Irp);
+    /* get current stack location */
+    IoStack = IoGetCurrentIrpStackLocation(Irp);
 
-    /* add irp to cancelable queue */
-    KsAddIrpToCancelableQueue(&This->IrpList, &This->IrpListLock, Irp, KsListEntryTail, NULL /* FIXME */);
+    if (IoStack->Parameters.DeviceIoControl.IoControlCode == IOCTL_KS_WRITE_STREAM)
+        Status = KsProbeStreamIrp(Irp, KSSTREAM_WRITE | KSPROBE_ALLOCATEMDL | KSPROBE_PROBEANDLOCK | KSPROBE_SYSTEMADDRESS, This->Pin.StreamHeaderSize);
+    else
+        Status = KsProbeStreamIrp(Irp, KSSTREAM_READ  | KSPROBE_ALLOCATEMDL | KSPROBE_PROBEANDLOCK | KSPROBE_SYSTEMADDRESS, This->Pin.StreamHeaderSize);
+
+    if (!NT_SUCCESS(Status))
+    {
+        DPRINT1("KsProbeStreamIrp failed with %x\n", Status);
+
+        Irp->IoStatus.Status = Status;
+        IoCompleteRequest(Irp, IO_NO_INCREMENT);
+        return Status;
+    }
 
     if (This->Pin.Descriptor->Dispatch->Process)
     {
         /* it is a pin centric avstream */
-        ASSERT(0);
-        //Status = This->Pin.Descriptor->Dispatch->Process(&This->Pin);
-        /* TODO */
+
+        /* mark irp as pending */
+        IoMarkIrpPending(Irp);
+
+        /* add irp to cancelable queue */
+        KsAddIrpToCancelableQueue(&This->IrpList, &This->IrpListLock, Irp, KsListEntryTail, NULL /* FIXME */);
+
+        /* sanity checks */
+        ASSERT(!(This->Pin.Descriptor->Flags & KSPIN_FLAG_DISPATCH_LEVEL_PROCESSING));
+        ASSERT(This->PinWorker);
+
+        /* start the processing loop */
+        KsIncrementCountedWorker(This->PinWorker);
+
+        Status = STATUS_PENDING;
     }
     else
     {
@@ -1534,9 +1776,17 @@ IKsPin_DispatchKsStream(
             return STATUS_UNSUCCESSFUL;
         }
 
+        /* mark irp as pending */
+        IoMarkIrpPending(Irp);
+
+        /* add irp to cancelable queue */
+        KsAddIrpToCancelableQueue(&This->IrpList, &This->IrpListLock, Irp, KsListEntryTail, NULL /* FIXME */);
+
+
         Status = Filter->Descriptor->Dispatch->Process(Filter, ProcessPinIndex);
 
         DPRINT("IKsPin_DispatchKsStream FilterCentric: Status %lx \n", Status);
+
     }
 
     return Status;
@@ -2037,6 +2287,41 @@ KspCreatePin(
             return Status;
         }
     }
+    else if (Descriptor->Dispatch && Descriptor->Dispatch->Process)
+    {
+        /* pin centric processing filter */
+
+        /* allocate leading stream pointer */
+        Status = _KsEdit(This->Pin.Bag, (PVOID*)&This->LeadingEdgeStreamPointer, sizeof(KSISTREAM_POINTER), sizeof(KSISTREAM_POINTER), 0);
+
+        /* FIXME cleanup */
+        ASSERT(Status == STATUS_SUCCESS);
+
+        /* FIXME cleanup */
+        ASSERT(Status == STATUS_SUCCESS);
+
+        /* setup stream pointer offset */
+         This->LeadingEdgeStreamPointer->StreamPointer.Offset = &This->LeadingEdgeStreamPointer->StreamPointer.OffsetOut;
+
+        /* initialize work item */
+        ExInitializeWorkItem(&This->PinWorkQueueItem, IKsPin_PinCentricWorker, (PVOID)This);
+
+        /* allocate counted work item */
+        Status = KsRegisterCountedWorker(HyperCriticalWorkQueue, &This->PinWorkQueueItem, &This->PinWorker);
+
+        if (!NT_SUCCESS(Status))
+        {
+            DPRINT("Failed to register Worker %lx\n", Status);
+            KsFreeObjectBag((KSOBJECT_BAG)This->Pin.Bag);
+            KsFreeObjectHeader(&This->ObjectHeader);
+            FreeItem(This);
+            FreeItem(CreateItem);
+            return Status;
+        }
+
+        KeInitializeEvent(&This->FrameComplete, NotificationEvent, FALSE);
+
+    }
 
     /* FIXME add pin instance to filter instance */
 
@@ -2045,7 +2330,6 @@ KspCreatePin(
     {
         Status = Descriptor->Dispatch->SetDataFormat(&This->Pin, NULL, NULL, This->Pin.ConnectionFormat, NULL);
         DPRINT("KspCreatePin SetDataFormat %lx\n", Status);
-        DbgBreakPoint();
     }
 
 
@@ -2058,7 +2342,6 @@ KspCreatePin(
     }
 
 
-
     DPRINT("KspCreatePin Status %lx\n", Status);
 
     if (!NT_SUCCESS(Status) && Status != STATUS_PENDING)