Modified reading of pipes.
authorHartmut Birr <osexpert@googlemail.com>
Tue, 7 May 2002 22:41:22 +0000 (22:41 +0000)
committerHartmut Birr <osexpert@googlemail.com>
Tue, 7 May 2002 22:41:22 +0000 (22:41 +0000)
Fixed some bugs.

svn path=/trunk/; revision=2936

reactos/drivers/fs/np/npfs.h
reactos/drivers/fs/np/rw.c

index b8f2697..0309cf1 100644 (file)
@@ -1,3 +1,5 @@
+/* $Id: npfs.h,v 1.11 2002/05/07 22:41:22 hbirr Exp $ */
+
 #ifndef __SERVICES_FS_NP_NPFS_H
 #define __SERVICES_FS_NP_NPFS_H
 
@@ -66,6 +68,17 @@ extern NPAGED_LOOKASIDE_LIST NpfsPipeDataLookasideList;
 
 #define CP DPRINT("\n");
 
+static inline VOID
+NpfsFreePipeData(PNPFS_PIPE_DATA PipeData)
+{
+  if (PipeData->Data)
+    {
+      ExFreePool(PipeData->Data);
+    }
+  ExFreeToNPagedLookasideList(&NpfsPipeDataLookasideList, PipeData);
+}
+
+
 NTSTATUS STDCALL NpfsCreate(PDEVICE_OBJECT DeviceObject, PIRP Irp);
 NTSTATUS STDCALL NpfsCreateNamedPipe(PDEVICE_OBJECT DeviceObject, PIRP Irp);
 NTSTATUS STDCALL NpfsClose(PDEVICE_OBJECT DeviceObject, PIRP Irp);
index 00e2c7b..5988aeb 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: rw.c,v 1.5 2001/11/20 20:34:29 ekohl Exp $
+/* $Id: rw.c,v 1.6 2002/05/07 22:41:22 hbirr Exp $
  *
  * COPYRIGHT:  See COPYING in the top level directory
  * PROJECT:    ReactOS kernel
 
 /* FUNCTIONS *****************************************************************/
 
-static inline VOID
-NpfsFreePipeData(PNPFS_PIPE_DATA PipeData)
-{
-  if (PipeData->Data)
-    {
-      ExFreePool(PipeData->Data);
-    }
-
-  ExFreeToNPagedLookasideList(&NpfsPipeDataLookasideList, PipeData);
-}
-
-
 static inline PNPFS_PIPE_DATA
 NpfsAllocatePipeData(PVOID Data,
                     ULONG Size)
@@ -94,7 +82,6 @@ NpfsRead(PDEVICE_OBJECT DeviceObject, PIRP Irp)
   ULONG Length;
   PVOID Buffer;
   ULONG CopyLength;
-  BOOLEAN DataListEmpty;
 
   DPRINT("NpfsRead(DeviceObject %p  Irp %p)\n", DeviceObject, Irp);
   
@@ -109,7 +96,7 @@ NpfsRead(PDEVICE_OBJECT DeviceObject, PIRP Irp)
     {
       DPRINT("Pipe is NOT connected!\n");
       Status = STATUS_UNSUCCESSFUL;
-      Length = 0;
+      Information = 0;
       goto done;
     }
 
@@ -117,99 +104,120 @@ NpfsRead(PDEVICE_OBJECT DeviceObject, PIRP Irp)
     {
       DPRINT("Irp->MdlAddress == NULL\n");
       Status = STATUS_UNSUCCESSFUL;
-      Length = 0;
+      Information = 0;
       goto done;
     }
 
   Status = STATUS_SUCCESS;
   Length = IoStack->Parameters.Read.Length;
+  Information = 0;
 
   Buffer = MmGetSystemAddressForMdl(Irp->MdlAddress);
   DPRINT("Length %d Buffer %x\n",Length,Buffer);
 
   KeAcquireSpinLock(&ReadFcb->DataListLock, &OldIrql);
-  DataListEmpty = IsListEmpty(&ReadFcb->DataListHead);
-  KeReleaseSpinLock(&ReadFcb->DataListLock, OldIrql);
-
-  /* FIXME: check if in blocking mode */
-  if (DataListEmpty == TRUE)
-    {
-      /* Wait for ReadEvent to become signaled */
-      DPRINT("Waiting for readable data\n");
-      Status = KeWaitForSingleObject(&Fcb->ReadEvent,
-                                    UserRequest,
-                                    KernelMode,
-                                    FALSE,
-                                    NULL);
-      DPRINT("Finished waiting! Status: %x\n", Status);
-    }
-
-  KeAcquireSpinLock(&ReadFcb->DataListLock, &OldIrql);
-
-  if (Pipe->PipeReadMode & FILE_PIPE_BYTE_STREAM_MODE)
-    {
-      DPRINT("Byte stream mode\n");
-
-      /* Byte stream mode */
-      Information = 0;
-      CurrentEntry = ReadFcb->DataListHead.Flink;
-       while ((Length > 0) && (CurrentEntry = RemoveHeadList(&ReadFcb->DataListHead)))
+  while (1)
+  {
+     /* FIXME: check if in blocking mode */
+     if (IsListEmpty(&ReadFcb->DataListHead))
+     {
+        KeResetEvent(&Fcb->ReadEvent);
+        KeReleaseSpinLock(&ReadFcb->DataListLock, OldIrql);
+       if (Information > 0)
        {
-         Current = CONTAINING_RECORD(CurrentEntry, NPFS_PIPE_DATA, ListEntry);
-
-         DPRINT("Took pipe data at %p off the queue\n", Current);
-
-         CopyLength = RtlMin(Current->Size, Length);
-         RtlCopyMemory(Buffer,
-                       ((PVOID)((ULONG_PTR)Current->Data + Current->Offset)),
-                       CopyLength);
-         Buffer += CopyLength;
-         Length -= CopyLength;
-         Information += CopyLength;
-
-         /* Update the data buffer */
-         Current->Offset += CopyLength;
-         Current->Size -= CopyLength;
-
-         CurrentEntry = CurrentEntry->Flink;
+          Status = STATUS_SUCCESS;
+          goto done;
+       }
+        if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE)
+       {
+          Status = STATUS_PIPE_BROKEN;
+          goto done;
+       }
+        /* Wait for ReadEvent to become signaled */
+        DPRINT("Waiting for readable data (%S)\n", Pipe->PipeName.Buffer);
+        Status = KeWaitForSingleObject(&Fcb->ReadEvent,
+                                      UserRequest,
+                                      KernelMode,
+                                      FALSE,
+                                      NULL);
+        DPRINT("Finished waiting (%S)! Status: %x\n", Pipe->PipeName.Buffer, Status);
+        KeAcquireSpinLock(&ReadFcb->DataListLock, &OldIrql);
+     }
+
+     if (Pipe->PipeReadMode == FILE_PIPE_BYTE_STREAM_MODE)
+     {
+        DPRINT("Byte stream mode\n");
+
+        /* Byte stream mode */
+        CurrentEntry = NULL;
+        while (Length > 0 && !IsListEmpty(&ReadFcb->DataListHead))
+       {
+          CurrentEntry = RemoveHeadList(&ReadFcb->DataListHead);
+          Current = CONTAINING_RECORD(CurrentEntry, NPFS_PIPE_DATA, ListEntry);
+
+          DPRINT("Took pipe data at %p off the queue\n", Current);
+
+          CopyLength = RtlMin(Current->Size, Length);
+          RtlCopyMemory(Buffer,
+                        ((PVOID)((PVOID)Current->Data + Current->Offset)),
+                        CopyLength);
+          Buffer += CopyLength;
+          Length -= CopyLength;
+          Information += CopyLength;
+
+          /* Update the data buffer */
+          Current->Offset += CopyLength;
+          Current->Size -= CopyLength;
+          if (Current->Size == 0)
+          {
+              NpfsFreePipeData(Current);
+              CurrentEntry = NULL;
+          }
        }
 
-      if ((CurrentEntry != &ReadFcb->DataListHead) && (Current->Offset != Current->Size))
+        if (CurrentEntry && Current->Size > 0)
        {
-         DPRINT("Putting pipe data at %p back in queue\n", Current);
+          DPRINT("Putting pipe data at %p back in queue\n", Current);
 
-         /* The caller's buffer could not contain the complete message,
-            so put it back on the queue */
-         InsertHeadList(&ReadFcb->DataListHead, &Current->ListEntry);
+          /* The caller's buffer could not contain the complete message,
+             so put it back on the queue */
+          InsertHeadList(&ReadFcb->DataListHead, &Current->ListEntry);
        }
-    }
-  else
-    {
-      DPRINT("Message mode\n");
 
-      /* Message mode */
-      CurrentEntry = ReadFcb->DataListHead.Flink;
-      if (CurrentEntry = RemoveHeadList(&ReadFcb->DataListHead))
+       if (Length == 0)
        {
-         Current = CONTAINING_RECORD(CurrentEntry, NPFS_PIPE_DATA, ListEntry);
+          break;
+       }
+     }
+     else
+     {
+        DPRINT("Message mode\n");
 
-         DPRINT("Took pipe data at %p off the queue\n", Current);
+        /* Message mode */
+        if (!IsListEmpty(&ReadFcb->DataListHead))
+       {
+          CurrentEntry = RemoveHeadList(&ReadFcb->DataListHead);
+          Current = CONTAINING_RECORD(CurrentEntry, NPFS_PIPE_DATA, ListEntry);
 
-         /* Truncate the message if the receive buffer is too small */
-         CopyLength = RtlMin(Current->Size, Length);
-         RtlCopyMemory(Buffer, Current->Data, CopyLength);
-         Information = CopyLength;
+          DPRINT("Took pipe data at %p off the queue\n", Current);
 
-         Current->Offset += CopyLength;
+          /* Truncate the message if the receive buffer is too small */
+          CopyLength = RtlMin(Current->Size, Length);
+          RtlCopyMemory(Buffer, Current->Data, CopyLength);
+          Information = CopyLength;
 
-         CurrentEntry = CurrentEntry->Flink;
+          Current->Offset += CopyLength;
+          NpfsFreePipeData(Current);
        }
-    }
-
+       if (Information > 0)
+       {
+          break;
+       }
+     }
+  }
+  /* reset ReaderEvent */
   KeReleaseSpinLock(&ReadFcb->DataListLock, OldIrql);
 
-  /* reset ReaderEvent */
-  KeResetEvent(&Fcb->ReadEvent);
 
 done:
   Irp->IoStatus.Status = Status;
@@ -275,10 +283,12 @@ NpfsWrite(PDEVICE_OBJECT DeviceObject,
 
       KeAcquireSpinLock(&Fcb->DataListLock, &OldIrql);
       InsertTailList(&Fcb->DataListHead, &PipeData->ListEntry);
-      KeReleaseSpinLock(&Fcb->DataListLock, OldIrql);
 
       /* signal the readers ReadEvent */
-      KeSetEvent(&Fcb->OtherSide->ConnectEvent, IO_NO_INCREMENT, FALSE);
+      KeSetEvent(&Fcb->OtherSide->ReadEvent, IO_NO_INCREMENT, FALSE);
+
+      KeReleaseSpinLock(&Fcb->DataListLock, OldIrql);
+
     }
   else
     {