Support asynchronous (aka overlapped) connect, read and write requests.
authorEric Kohl <eric.kohl@reactos.org>
Sat, 5 Mar 2005 12:08:50 +0000 (12:08 +0000)
committerEric Kohl <eric.kohl@reactos.org>
Sat, 5 Mar 2005 12:08:50 +0000 (12:08 +0000)
svn path=/trunk/; revision=13826

reactos/drivers/fs/np/create.c
reactos/drivers/fs/np/fsctrl.c
reactos/drivers/fs/np/npfs.h
reactos/drivers/fs/np/rw.c

index cda16fe..106c511 100644 (file)
@@ -48,17 +48,18 @@ static PNPFS_FCB
 NpfsFindListeningServerInstance(PNPFS_PIPE Pipe)
 {
   PLIST_ENTRY CurrentEntry;
-  PNPFS_FCB ServerFcb;
+  PNPFS_WAITER_ENTRY Waiter;
 
-  CurrentEntry = Pipe->ServerFcbListHead.Flink;
-  while (CurrentEntry != &Pipe->ServerFcbListHead)
+  CurrentEntry = Pipe->WaiterListHead.Flink;
+  while (CurrentEntry != &Pipe->WaiterListHead)
     {
-      ServerFcb = CONTAINING_RECORD(CurrentEntry, NPFS_FCB, FcbListEntry);
-      if (ServerFcb->PipeState == FILE_PIPE_LISTENING_STATE)
+      Waiter = CONTAINING_RECORD(CurrentEntry, NPFS_WAITER_ENTRY, Entry);
+      if (Waiter->Fcb->PipeState == FILE_PIPE_LISTENING_STATE)
        {
-         DPRINT("Server found! Fcb %p\n", ServerFcb);
-         return ServerFcb;
+         DPRINT("Server found! Fcb %p\n", Waiter->Fcb);
+         return Waiter->Fcb;
        }
+
       CurrentEntry = CurrentEntry->Flink;
     }
 
@@ -66,6 +67,35 @@ NpfsFindListeningServerInstance(PNPFS_PIPE Pipe)
 }
 
 
+static VOID
+NpfsSignalAndRemoveListeningServerInstance(PNPFS_PIPE Pipe,
+                                          PNPFS_FCB Fcb)
+{
+  PLIST_ENTRY CurrentEntry;
+  PNPFS_WAITER_ENTRY Waiter;
+
+  CurrentEntry = Pipe->WaiterListHead.Flink;
+  while (CurrentEntry != &Pipe->WaiterListHead)
+    {
+      Waiter = CONTAINING_RECORD(CurrentEntry, NPFS_WAITER_ENTRY, Entry);
+      if (Waiter->Fcb == Fcb)
+       {
+         DPRINT("Server found! Fcb %p\n", Waiter->Fcb);
+
+         KeSetEvent(Waiter->Irp->UserEvent, 0, FALSE);
+         Waiter->Irp->UserIosb->Status = FILE_PIPE_CONNECTED_STATE;
+         Waiter->Irp->UserIosb->Information = 0;
+         IoCompleteRequest(Waiter->Irp, IO_NO_INCREMENT);
+
+         RemoveEntryList(&Waiter->Entry);
+         ExFreePool(Waiter);
+         return;
+       }
+      CurrentEntry = CurrentEntry->Flink;
+    }
+}
+
+
 NTSTATUS STDCALL
 NpfsCreate(PDEVICE_OBJECT DeviceObject,
           PIRP Irp)
@@ -206,9 +236,8 @@ NpfsCreate(PDEVICE_OBJECT DeviceObject,
       ClientFcb->PipeState = FILE_PIPE_CONNECTED_STATE;
       ServerFcb->PipeState = FILE_PIPE_CONNECTED_STATE;
 
-      /* Wake server thread */
-      DPRINT("Setting the ConnectEvent for %x\n", ServerFcb);
-      KeSetEvent(&ServerFcb->ConnectEvent, 0, FALSE);
+      /* Signal the server thread and remove it from the waiter list */
+      NpfsSignalAndRemoveListeningServerInstance(Pipe, ServerFcb);
     }
 
   KeUnlockMutex(&Pipe->FcbListLock);
@@ -318,6 +347,7 @@ NpfsCreateNamedPipe(PDEVICE_OBJECT DeviceObject,
 
        InitializeListHead(&Pipe->ServerFcbListHead);
        InitializeListHead(&Pipe->ClientFcbListHead);
+       InitializeListHead(&Pipe->WaiterListHead);
        KeInitializeMutex(&Pipe->FcbListLock, 0);
 
        Pipe->PipeType = Buffer->NamedPipeType;
index f245fdf..8151989 100644 (file)
 
 /* FUNCTIONS *****************************************************************/
 
+static VOID
+NpfsListeningCancelRoutine(IN PDEVICE_OBJECT DeviceObject,
+                           IN PIRP Irp)
+{
+  PNPFS_WAITER_ENTRY Waiter;
+
+  DPRINT1("NpfsListeningCancelRoutine() called\n");
+  /* FIXME: Not tested. */
+
+  Waiter = Irp->Tail.Overlay.DriverContext[0];
+
+  RemoveEntryList(&Waiter->Entry);
+  ExFreePool(Waiter);
+
+  IoReleaseCancelSpinLock(Irp->CancelIrql);
+
+  Irp->IoStatus.Status = STATUS_CANCELLED;
+  Irp->IoStatus.Information = 0;
+  IoCompleteRequest(Irp, IO_NO_INCREMENT);
+}
+
+
+static NTSTATUS
+NpfsAddListeningServerInstance(PIRP Irp,
+                              PNPFS_FCB Fcb)
+{
+  PNPFS_WAITER_ENTRY Entry;
+  KIRQL OldIrql;
+
+  Entry = ExAllocatePool(NonPagedPool, sizeof(NPFS_WAITER_ENTRY));
+  if (Entry == NULL)
+    return STATUS_INSUFFICIENT_RESOURCES;
+
+  Entry->Irp = Irp;
+  Entry->Fcb = Fcb;
+  InsertTailList(&Fcb->Pipe->WaiterListHead, &Entry->Entry);
+
+  IoAcquireCancelSpinLock(&OldIrql);
+  if (!Irp->Cancel)
+    {
+      Irp->Tail.Overlay.DriverContext[0] = Entry;
+      IoMarkIrpPending(Irp);
+      IoSetCancelRoutine(Irp, NpfsListeningCancelRoutine);
+      IoReleaseCancelSpinLock(OldIrql);
+      return STATUS_PENDING;
+    }
+  /* IRP has already been cancelled */
+  IoReleaseCancelSpinLock(OldIrql);
+
+  DPRINT1("FIXME: Remove waiter entry!\n");
+  RemoveEntryList(&Entry->Entry);
+  ExFreePool(Entry);
+
+  return STATUS_CANCELLED;
+}
+
+
 static NTSTATUS
-NpfsConnectPipe(PNPFS_FCB Fcb)
+NpfsConnectPipe(PIRP Irp,
+                PNPFS_FCB Fcb)
 {
   PNPFS_PIPE Pipe;
   PLIST_ENTRY current_entry;
@@ -88,29 +146,18 @@ NpfsConnectPipe(PNPFS_FCB Fcb)
       current_entry = current_entry->Flink;
     }
 
-  KeUnlockMutex(&Pipe->FcbListLock);
-
   /* no listening client fcb found */
   DPRINT("No listening client fcb found -- waiting for client\n");
 
   Fcb->PipeState = FILE_PIPE_LISTENING_STATE;
 
-  Status = KeWaitForSingleObject(&Fcb->ConnectEvent,
-                                UserRequest,
-                                KernelMode,
-                                FALSE,
-                                NULL);
-  if (!NT_SUCCESS(Status))
-    {
-      DPRINT("KeWaitForSingleObject() failed (Status %lx)\n", Status);
-      return Status;
-    }
+  Status = NpfsAddListeningServerInstance(Irp, Fcb);
 
-  Fcb->PipeState = FILE_PIPE_CONNECTED_STATE;
+  KeUnlockMutex(&Pipe->FcbListLock);
 
-  DPRINT("Client Fcb: %p\n", Fcb->OtherSide);
+  DPRINT("NpfsConnectPipe() done (Status %lx)\n", Status);
 
-  return STATUS_PIPE_CONNECTED;
+  return Status;
 }
 
 
@@ -327,7 +374,6 @@ NpfsPeekPipe(PIRP Irp,
 }
 
 
-
 NTSTATUS STDCALL
 NpfsFileSystemControl(PDEVICE_OBJECT DeviceObject,
                      PIRP Irp)
@@ -366,7 +412,7 @@ NpfsFileSystemControl(PDEVICE_OBJECT DeviceObject,
 
       case FSCTL_PIPE_LISTEN:
        DPRINT("Connecting pipe %wZ\n", &Pipe->PipeName);
-       Status = NpfsConnectPipe(Fcb);
+       Status = NpfsConnectPipe(Irp, Fcb);
        break;
 
       case FSCTL_PIPE_PEEK:
@@ -439,12 +485,15 @@ NpfsFileSystemControl(PDEVICE_OBJECT DeviceObject,
        Status = STATUS_UNSUCCESSFUL;
     }
 
-  Irp->IoStatus.Status = Status;
-  Irp->IoStatus.Information = 0;
-
-  IoCompleteRequest(Irp, IO_NO_INCREMENT);
+  if (Status != STATUS_PENDING)
+    {
+      Irp->IoStatus.Status = Status;
+      Irp->IoStatus.Information = 0;
+      IoCompleteRequest(Irp, IO_NO_INCREMENT);
+    }
 
-  return(Status);
+  return Status;
 }
 
 
index effd44d..1038411 100644 (file)
@@ -19,6 +19,7 @@ typedef struct _NPFS_PIPE
   KMUTEX FcbListLock;
   LIST_ENTRY ServerFcbListHead;
   LIST_ENTRY ClientFcbListHead;
+  LIST_ENTRY WaiterListHead;
   ULONG PipeType;
   ULONG ReadMode;
   ULONG WriteMode;
@@ -52,6 +53,14 @@ typedef struct _NPFS_FCB
   KSPIN_LOCK DataListLock;     /* Data queue lock */
 } NPFS_FCB, *PNPFS_FCB;
 
+typedef struct _NPFS_WAITER_ENTRY
+{
+  LIST_ENTRY Entry;
+  PIRP Irp;
+  PNPFS_PIPE Pipe;
+  PNPFS_FCB Fcb;
+} NPFS_WAITER_ENTRY, *PNPFS_WAITER_ENTRY;
+
 
 extern NPAGED_LOOKASIDE_LIST NpfsPipeDataLookasideList;
 
index c52403b..5338323 100644 (file)
@@ -101,7 +101,6 @@ NpfsRead(PDEVICE_OBJECT DeviceObject,
       /* FIXME: check if in blocking mode */
       if (Fcb->ReadDataAvailable == 0)
        {
-         KeResetEvent(&Fcb->Event);
          if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE)
            {
              KeSetEvent(&WriterFcb->Event, IO_NO_INCREMENT, FALSE);
@@ -167,6 +166,7 @@ NpfsRead(PDEVICE_OBJECT DeviceObject,
          if (Length == 0)
            {
              KeSetEvent(&WriterFcb->Event, IO_NO_INCREMENT, FALSE);
+             KeResetEvent(&Fcb->Event);
              break;
            }
        }
@@ -187,8 +187,19 @@ NpfsRead(PDEVICE_OBJECT DeviceObject,
 #endif
 
              Information = CopyLength;
-             Fcb->ReadDataAvailable = 0;
-             Fcb->WriteQuotaAvailable = Fcb->MaxDataLength;
+
+             if (Fcb->ReadDataAvailable > Length)
+               {
+                 memmove(Fcb->Data, Fcb->Data + Length,
+                         Fcb->ReadDataAvailable - Length);
+                 Fcb->ReadDataAvailable -= Length;
+                 Status = STATUS_MORE_ENTRIES;
+               }
+             else
+               {
+                 Fcb->ReadDataAvailable = 0;
+                 Fcb->WriteQuotaAvailable = Fcb->MaxDataLength;
+               }
            }
 
          if (Information > 0)
@@ -197,6 +208,7 @@ NpfsRead(PDEVICE_OBJECT DeviceObject,
                {
                  KeSetEvent(&WriterFcb->Event, IO_NO_INCREMENT, FALSE);
                }
+             KeResetEvent(&Fcb->Event);
              break;
            }
        }
@@ -291,7 +303,6 @@ NpfsWrite(PDEVICE_OBJECT DeviceObject,
     {
       if (ReaderFcb->WriteQuotaAvailable == 0)
        {
-         KeResetEvent(&Fcb->Event);
          KeSetEvent(&ReaderFcb->Event, IO_NO_INCREMENT, FALSE);
          KeReleaseSpinLock(&ReaderFcb->DataListLock, OldIrql);
          if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE)
@@ -355,6 +366,7 @@ NpfsWrite(PDEVICE_OBJECT DeviceObject,
          if (Length == 0)
            {
              KeSetEvent(&ReaderFcb->Event, IO_NO_INCREMENT, FALSE);
+             KeResetEvent(&Fcb->Event);
              break;
            }
        }
@@ -374,6 +386,7 @@ NpfsWrite(PDEVICE_OBJECT DeviceObject,
          if (Information > 0)
            {
              KeSetEvent(&ReaderFcb->Event, IO_NO_INCREMENT, FALSE);
+             KeResetEvent(&Fcb->Event);
              break;
            }
        }