- Guarded the calls to IoSetCancelRoutine with IoAcquireCancelSpinLock/IoReleaseCance...
authorHartmut Birr <osexpert@googlemail.com>
Wed, 23 Mar 2005 22:11:20 +0000 (22:11 +0000)
committerHartmut Birr <osexpert@googlemail.com>
Wed, 23 Mar 2005 22:11:20 +0000 (22:11 +0000)
- Used a fastmutex as lock for the data queue.
- Used paged pool for the data buffers.
- Allowed the server to read (and to wait) on a listening pipe.
- Implemented the non blocking read operations.

svn path=/trunk/; revision=14296

reactos/drivers/fs/np/create.c
reactos/drivers/fs/np/fsctrl.c
reactos/drivers/fs/np/npfs.c
reactos/drivers/fs/np/npfs.h
reactos/drivers/fs/np/rw.c

index 4e2c633..3ab8deb 100644 (file)
@@ -49,6 +49,7 @@ NpfsFindListeningServerInstance(PNPFS_PIPE Pipe)
 {
   PLIST_ENTRY CurrentEntry;
   PNPFS_WAITER_ENTRY Waiter;
+  KIRQL oldIrql;
 
   CurrentEntry = Pipe->WaiterListHead.Flink;
   while (CurrentEntry != &Pipe->WaiterListHead)
@@ -58,11 +59,15 @@ NpfsFindListeningServerInstance(PNPFS_PIPE Pipe)
           !Waiter->Irp->Cancel)
        {
          DPRINT("Server found! Fcb %p\n", Waiter->Fcb);
-
-          if (IoSetCancelRoutine(Waiter->Irp, NULL) != NULL)
-            {
+  
+         IoAcquireCancelSpinLock(&oldIrql);
+          if (!Waiter->Irp->Cancel)
+           {
+             IoSetCancelRoutine(Waiter->Irp, NULL);
+              IoReleaseCancelSpinLock(oldIrql);
               return Waiter->Fcb;
             }
+          IoReleaseCancelSpinLock(oldIrql);
        }
 
       CurrentEntry = CurrentEntry->Flink;
@@ -174,7 +179,7 @@ NpfsCreate(PDEVICE_OBJECT DeviceObject,
   /* Initialize data list. */
   if (Pipe->OutboundQuota)
     {
-      ClientFcb->Data = ExAllocatePool(NonPagedPool, Pipe->OutboundQuota);
+      ClientFcb->Data = ExAllocatePool(PagedPool, Pipe->OutboundQuota);
       if (ClientFcb->Data == NULL)
         {
           DPRINT("No memory!\n");
@@ -195,7 +200,7 @@ NpfsCreate(PDEVICE_OBJECT DeviceObject,
   ClientFcb->ReadDataAvailable = 0;
   ClientFcb->WriteQuotaAvailable = Pipe->OutboundQuota;
   ClientFcb->MaxDataLength = Pipe->OutboundQuota;
-  KeInitializeSpinLock(&ClientFcb->DataListLock);
+  ExInitializeFastMutex(&ClientFcb->DataListLock);
   KeInitializeEvent(&ClientFcb->ConnectEvent, SynchronizationEvent, FALSE);
   KeInitializeEvent(&ClientFcb->Event, SynchronizationEvent, FALSE);
 
@@ -455,13 +460,17 @@ NpfsCreateNamedPipe(PDEVICE_OBJECT DeviceObject,
 
    if (Pipe->InboundQuota)
      {
-       Fcb->Data = ExAllocatePool(NonPagedPool, Pipe->InboundQuota);
+       Fcb->Data = ExAllocatePool(PagedPool, Pipe->InboundQuota);
        if (Fcb->Data == NULL)
          {
            ExFreePool(Fcb);
 
            if (NewPipe)
              {
+               /* 
+                * FIXME:
+               *   Lock the pipelist and remove the pipe from the list.
+               */
                RtlFreeUnicodeString(&Pipe->PipeName);
                ExFreePool(Pipe);
              }
@@ -481,7 +490,7 @@ NpfsCreateNamedPipe(PDEVICE_OBJECT DeviceObject,
    Fcb->ReadDataAvailable = 0;
    Fcb->WriteQuotaAvailable = Pipe->InboundQuota;
    Fcb->MaxDataLength = Pipe->InboundQuota;
-   KeInitializeSpinLock(&Fcb->DataListLock);
+   ExInitializeFastMutex(&Fcb->DataListLock);
 
    Pipe->CurrentInstances++;
 
index cc6ed88..715b84a 100644 (file)
@@ -46,6 +46,7 @@ NpfsAddListeningServerInstance(PIRP Irp,
                               PNPFS_FCB Fcb)
 {
   PNPFS_WAITER_ENTRY Entry;
+  KIRQL oldIrql;
 
   Entry = ExAllocatePool(NonPagedPool, sizeof(NPFS_WAITER_ENTRY));
   if (Entry == NULL)
@@ -61,13 +62,15 @@ NpfsAddListeningServerInstance(PIRP Irp,
   Irp->Tail.Overlay.DriverContext[0] = Entry;
   InsertTailList(&Fcb->Pipe->WaiterListHead, &Entry->Entry);
 
-  IoSetCancelRoutine(Irp, NpfsListeningCancelRoutine);
-  
+  IoAcquireCancelSpinLock(&oldIrql);
   if (!Irp->Cancel)
     {
+      IoSetCancelRoutine(Irp, NpfsListeningCancelRoutine);
+      IoReleaseCancelSpinLock(oldIrql);
       KeUnlockMutex(&Fcb->Pipe->FcbListLock);
       return STATUS_PENDING;
     }
+  IoReleaseCancelSpinLock(oldIrql);
   
   RemoveEntryList(&Entry->Entry);
   
index 9075ac7..8ac5c48 100644 (file)
@@ -73,8 +73,10 @@ DriverEntry(PDRIVER_OBJECT DriverObject,
    /* initialize the device extension */
    DeviceExtension = DeviceObject->DeviceExtension;
    InitializeListHead(&DeviceExtension->PipeListHead);
+   InitializeListHead(&DeviceExtension->ThreadListHead);
    KeInitializeMutex(&DeviceExtension->PipeListLock,
                     0);
+   DeviceExtension->EmptyWaiterCount = 0;
 
    /* set the size quotas */
    DeviceExtension->MinQuota = PAGE_SIZE;
index 1038411..a7a6360 100644 (file)
@@ -6,7 +6,9 @@
 typedef struct _NPFS_DEVICE_EXTENSION
 {
   LIST_ENTRY PipeListHead;
+  LIST_ENTRY ThreadListHead;
   KMUTEX PipeListLock;
+  ULONG EmptyWaiterCount;
   ULONG MinQuota;
   ULONG DefaultQuota;
   ULONG MaxQuota;
@@ -20,6 +22,7 @@ typedef struct _NPFS_PIPE
   LIST_ENTRY ServerFcbListHead;
   LIST_ENTRY ClientFcbListHead;
   LIST_ENTRY WaiterListHead;
+  LIST_ENTRY EmptyBufferListHead;
   ULONG PipeType;
   ULONG ReadMode;
   ULONG WriteMode;
@@ -50,9 +53,29 @@ typedef struct _NPFS_FCB
   PVOID WritePtr;
   ULONG MaxDataLength;
 
-  KSPIN_LOCK DataListLock;     /* Data queue lock */
+  FAST_MUTEX DataListLock;     /* Data queue lock */
 } NPFS_FCB, *PNPFS_FCB;
 
+typedef struct _NPFS_CONTEXT
+{
+  PDEVICE_OBJECT DeviceObject;
+  PIRP Irp;
+  PNPFS_FCB Fcb;
+  UCHAR MajorFunction;
+  BOOLEAN AllocatedFromPool;
+} NPFS_CONTEXT, *PNPFS_CONTEXT;
+
+typedef struct _NPFS_THREAD_CONTEXT
+{
+  ULONG Count;
+  KEVENT Event;
+  PNPFS_DEVICE_EXTENSION DeviceExt;
+  LIST_ENTRY ListEntry;
+  PVOID WaitObjectArray[MAXIMUM_WAIT_OBJECTS];
+  KWAIT_BLOCK WaitBlockArray[MAXIMUM_WAIT_OBJECTS];
+  PNPFS_CONTEXT WaitContextArray[MAXIMUM_WAIT_OBJECTS];
+} NPFS_THREAD_CONTEXT, *PNPFS_THREAD_CONTEXT;
+
 typedef struct _NPFS_WAITER_ENTRY
 {
   LIST_ENTRY Entry;
index 254f4b1..10b085c 100644 (file)
@@ -46,16 +46,197 @@ VOID HexDump(PUCHAR Buffer, ULONG Length)
 }
 #endif
 
+static NTSTATUS
+NpfsReadFromPipe(PNPFS_CONTEXT Context);
 
-NTSTATUS STDCALL
-NpfsRead(PDEVICE_OBJECT DeviceObject,
-        PIRP Irp)
+static VOID STDCALL 
+NpfsWaitingCancelRoutine(IN PDEVICE_OBJECT DeviceObject,
+                         IN PIRP Irp)
+{
+   PNPFS_CONTEXT Context;
+   PNPFS_DEVICE_EXTENSION DeviceExt;
+
+   DPRINT1("NpfsWaitingCancelRoutine() called\n");
+
+   IoReleaseCancelSpinLock(Irp->CancelIrql);
+
+   Context = Irp->Tail.Overlay.DriverContext[0];
+   DeviceExt = Context->DeviceObject->DeviceExtension;
+
+   KeLockMutex(&DeviceExt->PipeListLock);
+   KeSetEvent(&Context->Fcb->Event, IO_NO_INCREMENT, FALSE);
+   KeUnlockMutex(&DeviceExt->PipeListLock);
+}
+
+static VOID STDCALL
+NpfsWaiterThread(PVOID Context)
+{
+   PNPFS_THREAD_CONTEXT ThreadContext = (PNPFS_THREAD_CONTEXT) Context;
+   ULONG CurrentCount, Count = 0;
+   PNPFS_CONTEXT WaitContext = NULL;
+   NTSTATUS Status;
+   BOOLEAN Terminate = FALSE;
+   BOOLEAN Cancel = FALSE;
+   KIRQL oldIrql;
+
+   KeLockMutex(&ThreadContext->DeviceExt->PipeListLock);
+
+   while (1)
+     {
+       CurrentCount = ThreadContext->Count;
+       KeResetEvent(&ThreadContext->Event);
+       KeUnlockMutex(&ThreadContext->DeviceExt->PipeListLock);
+       if (WaitContext)
+         {
+           if (Cancel)
+             {
+              WaitContext->Irp->IoStatus.Status = STATUS_CANCELLED;
+               WaitContext->Irp->IoStatus.Information = 0;
+               IoCompleteRequest(WaitContext->Irp, IO_NO_INCREMENT);
+              ExFreePool(WaitContext);
+            }
+          else
+            {
+              switch (WaitContext->MajorFunction)
+                {
+                  case IRP_MJ_READ:
+                     NpfsReadFromPipe(WaitContext);
+                    break;
+                  default:
+                    KEBUGCHECK(0);
+                }
+            }
+         }
+       if (Terminate)
+         {
+          break;
+        }
+       Status = KeWaitForMultipleObjects(CurrentCount,
+                                        ThreadContext->WaitObjectArray,
+                                        WaitAny,
+                                        Executive,
+                                        KernelMode,
+                                        FALSE,
+                                        NULL,
+                                        ThreadContext->WaitBlockArray);
+       KeLockMutex(&ThreadContext->DeviceExt->PipeListLock);
+       if (!NT_SUCCESS(Status))
+         {
+           KEBUGCHECK(0);
+         }
+       Count = Status - STATUS_SUCCESS;
+       ASSERT (Count <= CurrentCount);
+       if (Count > 0)
+         {
+          WaitContext = ThreadContext->WaitContextArray[Count];
+          ThreadContext->Count--;
+          ThreadContext->DeviceExt->EmptyWaiterCount++;
+          ThreadContext->WaitObjectArray[Count] = ThreadContext->WaitObjectArray[ThreadContext->Count];
+          ThreadContext->WaitContextArray[Count] = ThreadContext->WaitContextArray[ThreadContext->Count];
+           IoAcquireCancelSpinLock(&oldIrql);
+          Cancel = NULL == IoSetCancelRoutine(WaitContext->Irp, NULL);
+           IoReleaseCancelSpinLock(oldIrql);
+        }
+      else
+        {
+         /* someone has add a new wait request */
+          WaitContext = NULL;
+        }
+      if (ThreadContext->Count == 1 && ThreadContext->DeviceExt->EmptyWaiterCount >= MAXIMUM_WAIT_OBJECTS)
+        {
+          /* it exist an other thread with empty wait slots, we can remove our thread from the list */
+          RemoveEntryList(&ThreadContext->ListEntry);
+          ThreadContext->DeviceExt->EmptyWaiterCount -= MAXIMUM_WAIT_OBJECTS - 1;
+         Terminate = TRUE;
+        }
+     }
+   KeUnlockMutex(&ThreadContext->DeviceExt->PipeListLock);
+   ExFreePool(ThreadContext);
+}
+
+static NTSTATUS
+NpfsAddWaitingReader(PNPFS_DEVICE_EXTENSION DeviceExt, PNPFS_CONTEXT Context, PNPFS_FCB Fcb)
+{
+   PLIST_ENTRY ListEntry;
+   PNPFS_THREAD_CONTEXT ThreadContext;
+   NTSTATUS Status;
+   HANDLE hThread;
+   KIRQL oldIrql;
+
+   KeLockMutex(&DeviceExt->PipeListLock);
+
+   ListEntry = DeviceExt->ThreadListHead.Flink;
+   while (ListEntry != &DeviceExt->ThreadListHead)
+     {
+       ThreadContext = CONTAINING_RECORD(ListEntry, NPFS_THREAD_CONTEXT, ListEntry);
+       if (ThreadContext->Count < MAXIMUM_WAIT_OBJECTS)
+         {
+           break;
+         }
+       ListEntry = ListEntry->Flink;
+     }
+   if (ListEntry == &DeviceExt->ThreadListHead)
+     {
+       ThreadContext = ExAllocatePool(NonPagedPool, sizeof(NPFS_THREAD_CONTEXT));
+       if (ThreadContext == NULL)
+         {
+           KeUnlockMutex(&DeviceExt->PipeListLock);
+           return STATUS_NO_MEMORY;
+         }
+       ThreadContext->DeviceExt = DeviceExt;
+       KeInitializeEvent(&ThreadContext->Event, NotificationEvent, FALSE);
+       ThreadContext->Count = 1;
+       ThreadContext->WaitObjectArray[0] = &ThreadContext->Event;
+
+   
+       DPRINT("Creating a new system thread for waiting read requests\n");
+      
+       Status = PsCreateSystemThread(&hThread,
+                                    THREAD_ALL_ACCESS,
+                                    NULL,
+                                    NULL,
+                                    NULL,
+                                    NpfsWaiterThread,
+                                    (PVOID)ThreadContext);
+       if (!NT_SUCCESS(Status))
+         {
+           ExFreePool(ThreadContext);
+           KeUnlockMutex(&DeviceExt->PipeListLock);
+           return Status;
+        }
+       InsertHeadList(&DeviceExt->ThreadListHead, &ThreadContext->ListEntry);
+       DeviceExt->EmptyWaiterCount += MAXIMUM_WAIT_OBJECTS - 1;       
+     }
+   IoMarkIrpPending(Context->Irp);
+   Context->Irp->Tail.Overlay.DriverContext[0] = Context;
+
+   IoAcquireCancelSpinLock(&oldIrql);
+   if (Context->Irp->Cancel)
+     {
+       IoReleaseCancelSpinLock(oldIrql);
+       Status = STATUS_CANCELLED;
+     }
+   else
+     {
+       IoSetCancelRoutine(Context->Irp, NpfsWaitingCancelRoutine);
+       IoReleaseCancelSpinLock(oldIrql);
+       ThreadContext->WaitObjectArray[ThreadContext->Count] = &Fcb->Event;
+       ThreadContext->WaitContextArray[ThreadContext->Count] = Context;
+       ThreadContext->Count++;
+       DeviceExt->EmptyWaiterCount--;
+       KeSetEvent(&ThreadContext->Event, IO_NO_INCREMENT, FALSE);
+       Status = STATUS_SUCCESS;
+     }
+   KeUnlockMutex(&DeviceExt->PipeListLock);
+   return Status;
+}
+
+static NTSTATUS
+NpfsReadFromPipe(PNPFS_CONTEXT Context)
 {
   PIO_STACK_LOCATION IoStack;
   PFILE_OBJECT FileObject;
   NTSTATUS Status;
-  PNPFS_DEVICE_EXTENSION DeviceExt;
-  KIRQL OldIrql;
   ULONG Information;
   PNPFS_FCB Fcb;
   PNPFS_FCB WriterFcb;
@@ -65,23 +246,14 @@ NpfsRead(PDEVICE_OBJECT DeviceObject,
   ULONG CopyLength;
   ULONG TempLength;
 
-  DPRINT("NpfsRead(DeviceObject %p  Irp %p)\n", DeviceObject, Irp);
+  DPRINT("NpfsReadFromPipe(Context %p)\n", Context);
 
-  DeviceExt = (PNPFS_DEVICE_EXTENSION)DeviceObject->DeviceExtension;
-  IoStack = IoGetCurrentIrpStackLocation(Irp);
+  IoStack = IoGetCurrentIrpStackLocation(Context->Irp);
   FileObject = IoStack->FileObject;
   Fcb = FileObject->FsContext;
   Pipe = Fcb->Pipe;
   WriterFcb = Fcb->OtherSide;
 
-  if (Irp->MdlAddress == NULL)
-    {
-      DPRINT("Irp->MdlAddress == NULL\n");
-      Status = STATUS_UNSUCCESSFUL;
-      Information = 0;
-      goto done;
-    }
-
   if (Fcb->Data == NULL)
     {
       DPRINT("Pipe is NOT readable!\n");
@@ -94,41 +266,71 @@ NpfsRead(PDEVICE_OBJECT DeviceObject,
   Length = IoStack->Parameters.Read.Length;
   Information = 0;
 
-  Buffer = MmGetSystemAddressForMdl(Irp->MdlAddress);
-  KeAcquireSpinLock(&Fcb->DataListLock, &OldIrql);
+  Buffer = MmGetSystemAddressForMdl(Context->Irp->MdlAddress);
+  ExAcquireFastMutex(&Fcb->DataListLock);
   while (1)
     {
-      /* FIXME: check if in blocking mode */
       if (Fcb->ReadDataAvailable == 0)
        {
          if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE)
            {
              KeSetEvent(&WriterFcb->Event, IO_NO_INCREMENT, FALSE);
            }
-         KeReleaseSpinLock(&Fcb->DataListLock, OldIrql);
+         ExReleaseFastMutex(&Fcb->DataListLock);
          if (Information > 0)
            {
              Status = STATUS_SUCCESS;
              goto done;
            }
 
-         if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE)
+         if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE &&
+             !(Fcb->PipeState == FILE_PIPE_LISTENING_STATE && Fcb->PipeEnd == FILE_PIPE_SERVER_END))
            {
              DPRINT("PipeState: %x\n", Fcb->PipeState);
              Status = STATUS_PIPE_BROKEN;
              goto done;
            }
 
-         /* Wait for ReadEvent to become signaled */
-         DPRINT("Waiting for readable data (%S)\n", Pipe->PipeName.Buffer);
-         Status = KeWaitForSingleObject(&Fcb->Event,
-                                        UserRequest,
-                                        KernelMode,
-                                        FALSE,
-                                        NULL);
-         DPRINT("Finished waiting (%S)! Status: %x\n", Pipe->PipeName.Buffer, Status);
+         if (IoIsOperationSynchronous(Context->Irp))
+           {
+             /* Wait for ReadEvent to become signaled */
+             DPRINT("Waiting for readable data (%S)\n", Pipe->PipeName.Buffer);
+             Status = KeWaitForSingleObject(&Fcb->Event,
+                                            UserRequest,
+                                            KernelMode,
+                                            FALSE,
+                                            NULL);
+             DPRINT("Finished waiting (%S)! Status: %x\n", Pipe->PipeName.Buffer, Status);
+           }
+         else
+           {
+             PNPFS_CONTEXT NewContext;
 
-         KeAcquireSpinLock(&Fcb->DataListLock, &OldIrql);
+             NewContext = ExAllocatePool(NonPagedPool, sizeof(NPFS_CONTEXT));
+             if (NewContext == NULL)
+               {
+                  Status = STATUS_NO_MEMORY;
+                  goto done;
+               }
+             memcpy(NewContext, Context, sizeof(NPFS_CONTEXT));
+             NewContext->AllocatedFromPool = TRUE;
+             NewContext->Fcb = Fcb;
+             NewContext->MajorFunction = IRP_MJ_READ;
+
+             Status = NpfsAddWaitingReader(Context->DeviceObject->DeviceExtension, NewContext, Fcb);
+                
+             if (NT_SUCCESS(Status))
+               {
+                 Status = STATUS_PENDING;
+               }
+             else
+               {
+                 ExFreePool(NewContext);
+               }
+             goto done;
+           }
+
+         ExAcquireFastMutex(&Fcb->DataListLock);
        }
 
       if (Pipe->ReadMode == FILE_PIPE_BYTE_STREAM_MODE)
@@ -217,19 +419,47 @@ NpfsRead(PDEVICE_OBJECT DeviceObject,
        }
     }
 
-  KeReleaseSpinLock(&Fcb->DataListLock, OldIrql);
+  ExReleaseFastMutex(&Fcb->DataListLock);
 
 done:
-  Irp->IoStatus.Status = Status;
-  Irp->IoStatus.Information = Information;
+  Context->Irp->IoStatus.Status = Status;
+  Context->Irp->IoStatus.Information = Information;
 
-  IoCompleteRequest(Irp, IO_NO_INCREMENT);
+  if (Status != STATUS_PENDING)
+    {
+      IoCompleteRequest(Context->Irp, IO_NO_INCREMENT);
+    }
 
+  if (Context->AllocatedFromPool)
+    {
+      ExFreePool(Context);
+    }
   DPRINT("NpfsRead done (Status %lx)\n", Status);
 
   return Status;
 }
 
+NTSTATUS STDCALL
+NpfsRead(PDEVICE_OBJECT DeviceObject,
+        PIRP Irp)
+{
+  NPFS_CONTEXT Context;
+
+  Context.AllocatedFromPool = FALSE;
+  Context.DeviceObject = DeviceObject;
+  Context.Irp = Irp;
+  
+  if (Irp->MdlAddress == NULL)
+    {
+      DPRINT("Irp->MdlAddress == NULL\n");
+      Irp->IoStatus.Status = STATUS_UNSUCCESSFUL;
+      Irp->IoStatus.Information = 0;
+      IoCompleteRequest(Irp, IO_NO_INCREMENT);
+      return STATUS_UNSUCCESSFUL;
+    }
+
+  return NpfsReadFromPipe(&Context);
+}
 
 NTSTATUS STDCALL
 NpfsWrite(PDEVICE_OBJECT DeviceObject,
@@ -244,7 +474,6 @@ NpfsWrite(PDEVICE_OBJECT DeviceObject,
   NTSTATUS Status = STATUS_SUCCESS;
   ULONG Length;
   ULONG Offset;
-  KIRQL OldIrql;
   ULONG Information;
   ULONG CopyLength;
   ULONG TempLength;
@@ -296,7 +525,7 @@ NpfsWrite(PDEVICE_OBJECT DeviceObject,
   Status = STATUS_SUCCESS;
   Buffer = MmGetSystemAddressForMdl (Irp->MdlAddress);
 
-  KeAcquireSpinLock(&ReaderFcb->DataListLock, &OldIrql);
+  ExAcquireFastMutex(&ReaderFcb->DataListLock);
 #ifndef NDEBUG
   DPRINT("Length %d Buffer %x Offset %x\n",Length,Buffer,Offset);
   HexDump(Buffer, Length);
@@ -307,7 +536,7 @@ NpfsWrite(PDEVICE_OBJECT DeviceObject,
       if (ReaderFcb->WriteQuotaAvailable == 0)
        {
          KeSetEvent(&ReaderFcb->Event, IO_NO_INCREMENT, FALSE);
-         KeReleaseSpinLock(&ReaderFcb->DataListLock, OldIrql);
+         ExReleaseFastMutex(&ReaderFcb->DataListLock);
          if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE)
            {
              Status = STATUS_PIPE_BROKEN;
@@ -332,7 +561,7 @@ NpfsWrite(PDEVICE_OBJECT DeviceObject,
              Status = STATUS_PIPE_BROKEN;
              goto done;
            }
-         KeAcquireSpinLock(&ReaderFcb->DataListLock, &OldIrql);
+         ExAcquireFastMutex(&ReaderFcb->DataListLock);
        }
 
       if (Pipe->WriteMode == FILE_PIPE_BYTE_STREAM_MODE)
@@ -395,7 +624,7 @@ NpfsWrite(PDEVICE_OBJECT DeviceObject,
        }
     }
 
-  KeReleaseSpinLock(&ReaderFcb->DataListLock, OldIrql);
+  ExReleaseFastMutex(&ReaderFcb->DataListLock);
 
 done:
   Irp->IoStatus.Status = Status;