- Data is now written to the readers data buffer and the reader reads from his own...
[reactos.git] / reactos / drivers / fs / np / rw.c
index 00e2c7b..f7ff3a5 100644 (file)
@@ -1,8 +1,8 @@
-/* $Id: rw.c,v 1.5 2001/11/20 20:34:29 ekohl Exp $
+/* $Id$
  *
  * COPYRIGHT:  See COPYING in the top level directory
  * PROJECT:    ReactOS kernel
- * FILE:       services/fs/np/rw.c
+ * FILE:       drivers/fs/np/rw.c
  * PURPOSE:    Named pipe filesystem
  * PROGRAMMER: David Welch <welch@cwcom.net>
  */
 /* INCLUDES ******************************************************************/
 
 #include <ddk/ntddk.h>
+#include <rosrtl/minmax.h>
 #include "npfs.h"
 
 #define NDEBUG
 #include <debug.h>
 
-
 /* FUNCTIONS *****************************************************************/
 
-static inline VOID
-NpfsFreePipeData(PNPFS_PIPE_DATA PipeData)
+#ifndef NDEBUG
+VOID HexDump(PUCHAR Buffer, ULONG Length)
 {
-  if (PipeData->Data)
-    {
-      ExFreePool(PipeData->Data);
-    }
-
-  ExFreeToNPagedLookasideList(&NpfsPipeDataLookasideList, PipeData);
-}
+  CHAR Line[65];
+  UCHAR ch;
+  const char Hex[] = "0123456789ABCDEF";
+  int i, j;
 
+  DbgPrint("---------------\n");
 
-static inline PNPFS_PIPE_DATA
-NpfsAllocatePipeData(PVOID Data,
-                    ULONG Size)
-{
-  PNPFS_PIPE_DATA PipeData;
-
-  PipeData = ExAllocateFromNPagedLookasideList(&NpfsPipeDataLookasideList);
-  if (!PipeData)
+  for (i = 0; i < ROUND_UP(Length, 16); i+= 16)
     {
-      return NULL;
+      memset(Line, ' ', 64);
+      Line[64] = 0;
+
+      for (j = 0; j < 16 && j + i < Length; j++)
+        {
+          ch = Buffer[i + j];
+          Line[3*j + 0] = Hex[ch >> 4];
+         Line[3*j + 1] = Hex[ch & 0x0f];
+         Line[48 + j] = isprint(ch) ? ch : '.';
+        }
+      DbgPrint("%s\n", Line);
     }
-
-  PipeData->Data = Data;
-  PipeData->Size = Size;
-  PipeData->Offset = 0;
-
-  return PipeData;
-}
-
-
-static inline PNPFS_PIPE_DATA
-NpfsInitializePipeData(
-  PVOID Data,
-  ULONG Size)
-{
-  PNPFS_PIPE_DATA PipeData;
-  PVOID Buffer;
-
-  Buffer = ExAllocatePool(NonPagedPool, Size);
-  if (!Buffer)
-  {
-    return NULL;
-  }
-
-  RtlMoveMemory(Buffer, Data, Size);
-
-  PipeData = NpfsAllocatePipeData(Buffer, Size);
-  if (!PipeData)
-  {
-    ExFreePool(Buffer);
-  }
-
-  return PipeData;
+  DbgPrint("---------------\n");
 }
+#endif
 
 
 NTSTATUS STDCALL
-NpfsRead(PDEVICE_OBJECT DeviceObject, PIRP Irp)
+NpfsRead(PDEVICE_OBJECT DeviceObject,
+        PIRP Irp)
 {
   PIO_STACK_LOCATION IoStack;
   PFILE_OBJECT FileObject;
   NTSTATUS Status;
   PNPFS_DEVICE_EXTENSION DeviceExt;
-  PWSTR PipeName;
   KIRQL OldIrql;
-  PLIST_ENTRY CurrentEntry;
-  PNPFS_PIPE_DATA Current;
   ULONG Information;
   PNPFS_FCB Fcb;
-  PNPFS_FCB ReadFcb;
+  PNPFS_FCB WriterFcb;
   PNPFS_PIPE Pipe;
   ULONG Length;
   PVOID Buffer;
   ULONG CopyLength;
-  BOOLEAN DataListEmpty;
+  ULONG TempLength;
 
   DPRINT("NpfsRead(DeviceObject %p  Irp %p)\n", DeviceObject, Irp);
-  
+
   DeviceExt = (PNPFS_DEVICE_EXTENSION)DeviceObject->DeviceExtension;
   IoStack = IoGetCurrentIrpStackLocation(Irp);
   FileObject = IoStack->FileObject;
   Fcb = FileObject->FsContext;
   Pipe = Fcb->Pipe;
-  ReadFcb = Fcb->OtherSide;
+  WriterFcb = Fcb->OtherSide;
 
-  if (ReadFcb == NULL)
+  if (Irp->MdlAddress == NULL)
     {
-      DPRINT("Pipe is NOT connected!\n");
+      DPRINT("Irp->MdlAddress == NULL\n");
       Status = STATUS_UNSUCCESSFUL;
-      Length = 0;
+      Information = 0;
       goto done;
     }
 
-  if (Irp->MdlAddress == NULL)
+  if (Fcb->Data == NULL)
     {
-      DPRINT("Irp->MdlAddress == NULL\n");
+      DPRINT("Pipe is NOT readable!\n");
       Status = STATUS_UNSUCCESSFUL;
-      Length = 0;
+      Information = 0;
       goto done;
     }
 
   Status = STATUS_SUCCESS;
   Length = IoStack->Parameters.Read.Length;
+  Information = 0;
 
   Buffer = MmGetSystemAddressForMdl(Irp->MdlAddress);
-  DPRINT("Length %d Buffer %x\n",Length,Buffer);
-
-  KeAcquireSpinLock(&ReadFcb->DataListLock, &OldIrql);
-  DataListEmpty = IsListEmpty(&ReadFcb->DataListHead);
-  KeReleaseSpinLock(&ReadFcb->DataListLock, OldIrql);
-
-  /* FIXME: check if in blocking mode */
-  if (DataListEmpty == TRUE)
-    {
-      /* Wait for ReadEvent to become signaled */
-      DPRINT("Waiting for readable data\n");
-      Status = KeWaitForSingleObject(&Fcb->ReadEvent,
-                                    UserRequest,
-                                    KernelMode,
-                                    FALSE,
-                                    NULL);
-      DPRINT("Finished waiting! Status: %x\n", Status);
-    }
-
-  KeAcquireSpinLock(&ReadFcb->DataListLock, &OldIrql);
-
-  if (Pipe->PipeReadMode & FILE_PIPE_BYTE_STREAM_MODE)
+  KeAcquireSpinLock(&Fcb->DataListLock, &OldIrql);
+  while (1)
     {
-      DPRINT("Byte stream mode\n");
-
-      /* Byte stream mode */
-      Information = 0;
-      CurrentEntry = ReadFcb->DataListHead.Flink;
-       while ((Length > 0) && (CurrentEntry = RemoveHeadList(&ReadFcb->DataListHead)))
+      /* FIXME: check if in blocking mode */
+      if (Fcb->ReadDataAvailable == 0)
        {
-         Current = CONTAINING_RECORD(CurrentEntry, NPFS_PIPE_DATA, ListEntry);
-
-         DPRINT("Took pipe data at %p off the queue\n", Current);
-
-         CopyLength = RtlMin(Current->Size, Length);
-         RtlCopyMemory(Buffer,
-                       ((PVOID)((ULONG_PTR)Current->Data + Current->Offset)),
-                       CopyLength);
-         Buffer += CopyLength;
-         Length -= CopyLength;
-         Information += CopyLength;
-
-         /* Update the data buffer */
-         Current->Offset += CopyLength;
-         Current->Size -= CopyLength;
-
-         CurrentEntry = CurrentEntry->Flink;
+         KeResetEvent(&Fcb->Event);
+         KeSetEvent(&WriterFcb->Event, IO_NO_INCREMENT, FALSE);
+         KeReleaseSpinLock(&Fcb->DataListLock, OldIrql);
+         if (Information > 0)
+           {
+             Status = STATUS_SUCCESS;
+             goto done;
+           }
+
+         if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE)
+           {
+             DPRINT("PipeState: %x\n", Fcb->PipeState);
+             Status = STATUS_PIPE_BROKEN;
+             goto done;
+           }
+
+         /* Wait for ReadEvent to become signaled */
+         DPRINT("Waiting for readable data (%S)\n", Pipe->PipeName.Buffer);
+         Status = KeWaitForSingleObject(&Fcb->Event,
+                                        UserRequest,
+                                        KernelMode,
+                                        FALSE,
+                                        NULL);
+         DPRINT("Finished waiting (%S)! Status: %x\n", Pipe->PipeName.Buffer, Status);
+
+         KeAcquireSpinLock(&Fcb->DataListLock, &OldIrql);
        }
 
-      if ((CurrentEntry != &ReadFcb->DataListHead) && (Current->Offset != Current->Size))
+      if (Pipe->ReadMode == FILE_PIPE_BYTE_STREAM_MODE)
        {
-         DPRINT("Putting pipe data at %p back in queue\n", Current);
-
-         /* The caller's buffer could not contain the complete message,
-            so put it back on the queue */
-         InsertHeadList(&ReadFcb->DataListHead, &Current->ListEntry);
+         DPRINT("Byte stream mode\n");
+         /* Byte stream mode */
+         while (Length > 0 && Fcb->ReadDataAvailable > 0)
+           {
+             CopyLength = RtlRosMin(Fcb->ReadDataAvailable, Length);
+             if (Fcb->ReadPtr + CopyLength <= Fcb->Data + Fcb->MaxDataLength)
+               {
+                 memcpy(Buffer, Fcb->ReadPtr, CopyLength);
+                 Fcb->ReadPtr += CopyLength;
+                 if (Fcb->ReadPtr == Fcb->Data + Fcb->MaxDataLength)
+                   {
+                     Fcb->ReadPtr = Fcb->Data;
+                   }
+               }
+             else
+               {
+                 TempLength = Fcb->Data + Fcb->MaxDataLength - Fcb->ReadPtr;
+                 memcpy(Buffer, Fcb->ReadPtr, TempLength);
+                 memcpy(Buffer + TempLength, Fcb->Data, CopyLength - TempLength);
+                 Fcb->ReadPtr = Fcb->Data + CopyLength - TempLength;
+               }
+
+             Buffer += CopyLength;
+             Length -= CopyLength;
+             Information += CopyLength;
+
+             Fcb->ReadDataAvailable -= CopyLength;
+             Fcb->WriteQuotaAvailable += CopyLength;
+           }
+
+         if (Length == 0)
+           {
+             KeSetEvent(&WriterFcb->Event, IO_NO_INCREMENT, FALSE);
+             break;
+           }
        }
-    }
-  else
-    {
-      DPRINT("Message mode\n");
-
-      /* Message mode */
-      CurrentEntry = ReadFcb->DataListHead.Flink;
-      if (CurrentEntry = RemoveHeadList(&ReadFcb->DataListHead))
+      else
        {
-         Current = CONTAINING_RECORD(CurrentEntry, NPFS_PIPE_DATA, ListEntry);
-
-         DPRINT("Took pipe data at %p off the queue\n", Current);
-
-         /* Truncate the message if the receive buffer is too small */
-         CopyLength = RtlMin(Current->Size, Length);
-         RtlCopyMemory(Buffer, Current->Data, CopyLength);
-         Information = CopyLength;
-
-         Current->Offset += CopyLength;
-
-         CurrentEntry = CurrentEntry->Flink;
+         DPRINT("Message mode\n");
+
+         /* Message mode */
+         if (Fcb->ReadDataAvailable)
+           {
+             /* Truncate the message if the receive buffer is too small */
+             CopyLength = RtlRosMin(Fcb->ReadDataAvailable, Length);
+             memcpy(Buffer, Fcb->Data, CopyLength);
+
+#ifndef NDEBUG
+             DPRINT("Length %d Buffer %x\n",CopyLength,Buffer);
+             HexDump((PUCHAR)Buffer, CopyLength);
+#endif
+
+             Information = CopyLength;
+             Fcb->ReadDataAvailable = 0;
+             Fcb->WriteQuotaAvailable = Fcb->MaxDataLength;
+           }
+
+         if (Information > 0)
+           {
+             KeSetEvent(&WriterFcb->Event, IO_NO_INCREMENT, FALSE);
+             break;
+           }
        }
     }
 
-  KeReleaseSpinLock(&ReadFcb->DataListLock, OldIrql);
-
-  /* reset ReaderEvent */
-  KeResetEvent(&Fcb->ReadEvent);
+  KeReleaseSpinLock(&Fcb->DataListLock, OldIrql);
 
 done:
   Irp->IoStatus.Status = Status;
@@ -217,7 +204,9 @@ done:
 
   IoCompleteRequest(Irp, IO_NO_INCREMENT);
 
-  return(Status);
+  DPRINT("NpfsRead done (Status %lx)\n", Status);
+
+  return Status;
 }
 
 
@@ -228,13 +217,16 @@ NpfsWrite(PDEVICE_OBJECT DeviceObject,
   PIO_STACK_LOCATION IoStack;
   PFILE_OBJECT FileObject;
   PNPFS_FCB Fcb = NULL;
+  PNPFS_FCB ReaderFcb;
   PNPFS_PIPE Pipe = NULL;
   PUCHAR Buffer;
   NTSTATUS Status = STATUS_SUCCESS;
   ULONG Length;
   ULONG Offset;
   KIRQL OldIrql;
-  PNPFS_PIPE_DATA PipeData;
+  ULONG Information;
+  ULONG CopyLength;
+  ULONG TempLength;
 
   DPRINT("NpfsWrite()\n");
 
@@ -244,55 +236,154 @@ NpfsWrite(PDEVICE_OBJECT DeviceObject,
   DPRINT("Pipe name %wZ\n", &FileObject->FileName);
 
   Fcb = FileObject->FsContext;
+  ReaderFcb = Fcb->OtherSide;
   Pipe = Fcb->Pipe;
 
   Length = IoStack->Parameters.Write.Length;
   Offset = IoStack->Parameters.Write.ByteOffset.u.LowPart;
+  Information = 0;
 
   if (Irp->MdlAddress == NULL)
     {
-      DbgPrint ("Irp->MdlAddress == NULL\n");
+      DPRINT("Irp->MdlAddress == NULL\n");
       Status = STATUS_UNSUCCESSFUL;
       Length = 0;
       goto done;
     }
 
-  if (Fcb->OtherSide == NULL)
+  if (ReaderFcb == NULL)
     {
       DPRINT("Pipe is NOT connected!\n");
+      if (Fcb->PipeState == FILE_PIPE_LISTENING_STATE)
+        Status = STATUS_PIPE_LISTENING;
+      else if (Fcb->PipeState == FILE_PIPE_DISCONNECTED_STATE)
+        Status = STATUS_PIPE_DISCONNECTED;
+      else
+        Status = STATUS_UNSUCCESSFUL;
+      Length = 0;
+      goto done;
+    }
+
+  if (ReaderFcb->Data == NULL)
+    {
+      DPRINT("Pipe is NOT writable!\n");
       Status = STATUS_UNSUCCESSFUL;
       Length = 0;
       goto done;
     }
 
+  Status = STATUS_SUCCESS;
   Buffer = MmGetSystemAddressForMdl (Irp->MdlAddress);
+
+  KeAcquireSpinLock(&ReaderFcb->DataListLock, &OldIrql);
+#ifndef NDEBUG
   DPRINT("Length %d Buffer %x Offset %x\n",Length,Buffer,Offset);
+  HexDump(Buffer, Length);
+#endif
 
-  PipeData = NpfsInitializePipeData(Buffer, Length);
-  if (PipeData)
+  while(1)
     {
-      DPRINT("Attaching pipe data at %p (%d bytes)\n", PipeData, Length);
-
-      KeAcquireSpinLock(&Fcb->DataListLock, &OldIrql);
-      InsertTailList(&Fcb->DataListHead, &PipeData->ListEntry);
-      KeReleaseSpinLock(&Fcb->DataListLock, OldIrql);
+      if (ReaderFcb->WriteQuotaAvailable == 0)
+       {
+         KeResetEvent(&Fcb->Event);
+         KeSetEvent(&ReaderFcb->Event, IO_NO_INCREMENT, FALSE);
+         KeReleaseSpinLock(&ReaderFcb->DataListLock, OldIrql);
+         if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE)
+           {
+             Status = STATUS_PIPE_BROKEN;
+             goto done;
+           }
+
+         DPRINT("Waiting for buffer space (%S)\n", Pipe->PipeName.Buffer);
+         Status = KeWaitForSingleObject(&Fcb->Event,
+                                        UserRequest,
+                                        KernelMode,
+                                        FALSE,
+                                        NULL);
+         DPRINT("Finished waiting (%S)! Status: %x\n", Pipe->PipeName.Buffer, Status);
+
+         /*
+          * It's possible that the event was signaled because the
+          * other side of pipe was closed.
+          */
+         if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE)
+           {
+             DPRINT("PipeState: %x\n", Fcb->PipeState);
+             Status = STATUS_PIPE_BROKEN;
+             goto done;
+           }
+         KeAcquireSpinLock(&ReaderFcb->DataListLock, &OldIrql);
+       }
 
-      /* signal the readers ReadEvent */
-      KeSetEvent(&Fcb->OtherSide->ConnectEvent, IO_NO_INCREMENT, FALSE);
-    }
-  else
-    {
-      Length = 0;
-      Status = STATUS_INSUFFICIENT_RESOURCES;
+      if (Pipe->WriteMode == FILE_PIPE_BYTE_STREAM_MODE)
+       {
+         DPRINT("Byte stream mode\n");
+         while (Length > 0 && ReaderFcb->WriteQuotaAvailable > 0)
+           {
+             CopyLength = RtlRosMin(Length, ReaderFcb->WriteQuotaAvailable);
+             if (ReaderFcb->WritePtr + CopyLength <= ReaderFcb->Data + ReaderFcb->MaxDataLength)
+               {
+                 memcpy(ReaderFcb->WritePtr, Buffer, CopyLength);
+                 ReaderFcb->WritePtr += CopyLength;
+                 if (ReaderFcb->WritePtr == ReaderFcb->Data + ReaderFcb->MaxDataLength)
+                   {
+                     ReaderFcb->WritePtr = ReaderFcb->Data;
+                   }
+               }
+             else
+               {
+                 TempLength = ReaderFcb->Data + ReaderFcb->MaxDataLength - ReaderFcb->WritePtr;
+                 memcpy(ReaderFcb->WritePtr, Buffer, TempLength);
+                 memcpy(ReaderFcb->Data, Buffer + TempLength, CopyLength - TempLength);
+                 ReaderFcb->WritePtr = ReaderFcb->Data + CopyLength - TempLength;
+               }
+
+             Buffer += CopyLength;
+             Length -= CopyLength;
+             Information += CopyLength;
+
+             ReaderFcb->ReadDataAvailable += CopyLength;
+             ReaderFcb->WriteQuotaAvailable -= CopyLength;
+           }
+
+         if (Length == 0)
+           {
+             KeSetEvent(&ReaderFcb->Event, IO_NO_INCREMENT, FALSE);
+             break;
+           }
+       }
+      else
+       {
+         DPRINT("Message mode\n");
+         if (Length > 0)
+           {
+             CopyLength = RtlRosMin(Length, ReaderFcb->WriteQuotaAvailable);
+             memcpy(ReaderFcb->Data, Buffer, CopyLength);
+
+             Information = CopyLength;
+             ReaderFcb->ReadDataAvailable = CopyLength;
+             ReaderFcb->WriteQuotaAvailable = 0;
+           }
+
+         if (Information > 0)
+           {
+             KeSetEvent(&ReaderFcb->Event, IO_NO_INCREMENT, FALSE);
+             break;
+           }
+       }
     }
 
+  KeReleaseSpinLock(&ReaderFcb->DataListLock, OldIrql);
+
 done:
   Irp->IoStatus.Status = Status;
-  Irp->IoStatus.Information = Length;
-  
+  Irp->IoStatus.Information = Information;
+
   IoCompleteRequest(Irp, IO_NO_INCREMENT);
-  
-  return(Status);
+
+  DPRINT("NpfsWrite done (Status %lx)\n", Status);
+
+  return Status;
 }
 
 /* EOF */