-/* $Id: rw.c,v 1.6 2002/05/07 22:41:22 hbirr Exp $
- *
+/*
* COPYRIGHT: See COPYING in the top level directory
* PROJECT: ReactOS kernel
- * FILE: services/fs/np/rw.c
+ * FILE: drivers/fs/np/rw.c
* PURPOSE: Named pipe filesystem
* PROGRAMMER: David Welch <welch@cwcom.net>
*/
/* INCLUDES ******************************************************************/
-#include <ddk/ntddk.h>
-#include "npfs.h"
-
#define NDEBUG
#include <debug.h>
+#include "npfs.h"
/* FUNCTIONS *****************************************************************/
-static inline PNPFS_PIPE_DATA
-NpfsAllocatePipeData(PVOID Data,
- ULONG Size)
+#ifndef NDEBUG
+VOID HexDump(PUCHAR Buffer, ULONG Length)
{
- PNPFS_PIPE_DATA PipeData;
+ CHAR Line[65];
+ UCHAR ch;
+ const char Hex[] = "0123456789ABCDEF";
+ int i, j;
- PipeData = ExAllocateFromNPagedLookasideList(&NpfsPipeDataLookasideList);
- if (!PipeData)
+ DbgPrint("---------------\n");
+
+ for (i = 0; i < Length; i+= 16)
{
- return NULL;
+ memset(Line, ' ', 64);
+ Line[64] = 0;
+
+ for (j = 0; j < 16 && j + i < Length; j++)
+ {
+ ch = Buffer[i + j];
+ Line[3*j + 0] = Hex[ch >> 4];
+ Line[3*j + 1] = Hex[ch & 0x0f];
+ Line[48 + j] = isprint(ch) ? ch : '.';
+ }
+ DbgPrint("%s\n", Line);
}
+ DbgPrint("---------------\n");
+}
+#endif
- PipeData->Data = Data;
- PipeData->Size = Size;
- PipeData->Offset = 0;
-
- return PipeData;
+static VOID STDCALL
+NpfsReadWriteCancelRoutine(IN PDEVICE_OBJECT DeviceObject,
+ IN PIRP Irp)
+{
+ PNPFS_CONTEXT Context;
+ PNPFS_DEVICE_EXTENSION DeviceExt;
+ PIO_STACK_LOCATION IoStack;
+ PNPFS_FCB Fcb;
+ BOOLEAN Complete = FALSE;
+
+ DPRINT("NpfsReadWriteCancelRoutine(DeviceObject %x, Irp %x)\n", DeviceObject, Irp);
+
+ IoReleaseCancelSpinLock(Irp->CancelIrql);
+
+ Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
+ DeviceExt = (PNPFS_DEVICE_EXTENSION)DeviceObject->DeviceExtension;
+ IoStack = IoGetCurrentIrpStackLocation(Irp);
+ Fcb = IoStack->FileObject->FsContext;
+
+ KeLockMutex(&DeviceExt->PipeListLock);
+ ExAcquireFastMutex(&Fcb->DataListLock);
+ switch(IoStack->MajorFunction)
+ {
+ case IRP_MJ_READ:
+ if (Fcb->ReadRequestListHead.Flink != &Context->ListEntry)
+ {
+ /* we are not the first in the list, remove an complete us */
+ RemoveEntryList(&Context->ListEntry);
+ Complete = TRUE;
+ }
+ else
+ {
+ KeSetEvent(&Fcb->ReadEvent, IO_NO_INCREMENT, FALSE);
+ }
+ break;
+ default:
+ KEBUGCHECK(0);
+ }
+ ExReleaseFastMutex(&Fcb->DataListLock);
+ KeUnlockMutex(&DeviceExt->PipeListLock);
+ if (Complete)
+ {
+ Irp->IoStatus.Status = STATUS_CANCELLED;
+ Irp->IoStatus.Information = 0;
+ IoCompleteRequest(Irp, IO_NO_INCREMENT);
+ }
}
+static VOID STDCALL
+NpfsWaiterThread(PVOID InitContext)
+{
+ PNPFS_THREAD_CONTEXT ThreadContext = (PNPFS_THREAD_CONTEXT) InitContext;
+ ULONG CurrentCount;
+ ULONG Count = 0;
+ PIRP Irp = NULL;
+ PIRP NextIrp;
+ NTSTATUS Status;
+ BOOLEAN Terminate = FALSE;
+ BOOLEAN Cancel = FALSE;
+ PIO_STACK_LOCATION IoStack = NULL;
+ PNPFS_CONTEXT Context;
+ PNPFS_CONTEXT NextContext;
+ PNPFS_FCB Fcb;
+
+ KeLockMutex(&ThreadContext->DeviceExt->PipeListLock);
+
+ while (1)
+ {
+ CurrentCount = ThreadContext->Count;
+ KeUnlockMutex(&ThreadContext->DeviceExt->PipeListLock);
+ if (Irp)
+ {
+ if (Cancel)
+ {
+ Irp->IoStatus.Status = STATUS_CANCELLED;
+ Irp->IoStatus.Information = 0;
+ IoCompleteRequest(Irp, IO_NO_INCREMENT);
+ }
+ else
+ {
+ switch (IoStack->MajorFunction)
+ {
+ case IRP_MJ_READ:
+ NpfsRead(IoStack->DeviceObject, Irp);
+ break;
+ default:
+ KEBUGCHECK(0);
+ }
+ }
+ }
+ if (Terminate)
+ {
+ break;
+ }
+ Status = KeWaitForMultipleObjects(CurrentCount,
+ ThreadContext->WaitObjectArray,
+ WaitAny,
+ Executive,
+ KernelMode,
+ FALSE,
+ NULL,
+ ThreadContext->WaitBlockArray);
+ if (!NT_SUCCESS(Status))
+ {
+ KEBUGCHECK(0);
+ }
+ KeLockMutex(&ThreadContext->DeviceExt->PipeListLock);
+ Count = Status - STATUS_SUCCESS;
+ ASSERT (Count < CurrentCount);
+ if (Count > 0)
+ {
+ Irp = ThreadContext->WaitIrpArray[Count];
+ ThreadContext->Count--;
+ ThreadContext->DeviceExt->EmptyWaiterCount++;
+ ThreadContext->WaitObjectArray[Count] = ThreadContext->WaitObjectArray[ThreadContext->Count];
+ ThreadContext->WaitIrpArray[Count] = ThreadContext->WaitIrpArray[ThreadContext->Count];
+
+ Cancel = (NULL == IoSetCancelRoutine(Irp, NULL));
+ Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
+ IoStack = IoGetCurrentIrpStackLocation(Irp);
+
+ if (Cancel)
+ {
+ Fcb = IoStack->FileObject->FsContext;
+ ExAcquireFastMutex(&Fcb->DataListLock);
+ RemoveEntryList(&Context->ListEntry);
+ switch (IoStack->MajorFunction)
+ {
+ case IRP_MJ_READ:
+ if (!IsListEmpty(&Fcb->ReadRequestListHead))
+ {
+ /* put the next request on the wait list */
+ NextContext = CONTAINING_RECORD(Fcb->ReadRequestListHead.Flink, NPFS_CONTEXT, ListEntry);
+ ThreadContext->WaitObjectArray[ThreadContext->Count] = NextContext->WaitEvent;
+ NextIrp = CONTAINING_RECORD(NextContext, IRP, Tail.Overlay.DriverContext);
+ ThreadContext->WaitIrpArray[ThreadContext->Count] = NextIrp;
+ ThreadContext->Count++;
+ ThreadContext->DeviceExt->EmptyWaiterCount--;
+ }
+ KeUnlockMutex(&ThreadContext->DeviceExt->PipeListLock);
+ break;
+ default:
+ KEBUGCHECK(0);
+ }
+ ExReleaseFastMutex(&Fcb->DataListLock);
+ }
+ }
+ else
+ {
+ /* someone has add a new wait request */
+ Irp = NULL;
+ }
+ if (ThreadContext->Count == 1 && ThreadContext->DeviceExt->EmptyWaiterCount >= MAXIMUM_WAIT_OBJECTS)
+ {
+ /* it exist an other thread with empty wait slots, we can remove our thread from the list */
+ RemoveEntryList(&ThreadContext->ListEntry);
+ ThreadContext->DeviceExt->EmptyWaiterCount -= MAXIMUM_WAIT_OBJECTS - 1;
+ Terminate = TRUE;
+ }
+ }
+ ExFreePool(ThreadContext);
+}
-static inline PNPFS_PIPE_DATA
-NpfsInitializePipeData(
- PVOID Data,
- ULONG Size)
+static NTSTATUS
+NpfsAddWaitingReadWriteRequest(IN PDEVICE_OBJECT DeviceObject,
+ IN PIRP Irp)
{
- PNPFS_PIPE_DATA PipeData;
- PVOID Buffer;
+ PLIST_ENTRY ListEntry;
+ PNPFS_THREAD_CONTEXT ThreadContext = NULL;
+ NTSTATUS Status;
+ HANDLE hThread;
+ KIRQL oldIrql;
- Buffer = ExAllocatePool(NonPagedPool, Size);
- if (!Buffer)
- {
- return NULL;
- }
+ PNPFS_CONTEXT Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
+ PNPFS_DEVICE_EXTENSION DeviceExt = (PNPFS_DEVICE_EXTENSION)DeviceObject->DeviceExtension;
- RtlMoveMemory(Buffer, Data, Size);
+ DPRINT("NpfsAddWaitingReadWriteRequest(DeviceObject %p, Irp %p)\n", DeviceObject, Irp);
- PipeData = NpfsAllocatePipeData(Buffer, Size);
- if (!PipeData)
- {
- ExFreePool(Buffer);
- }
+ KeLockMutex(&DeviceExt->PipeListLock);
- return PipeData;
-}
+ ListEntry = DeviceExt->ThreadListHead.Flink;
+ while (ListEntry != &DeviceExt->ThreadListHead)
+ {
+ ThreadContext = CONTAINING_RECORD(ListEntry, NPFS_THREAD_CONTEXT, ListEntry);
+ if (ThreadContext->Count < MAXIMUM_WAIT_OBJECTS)
+ {
+ break;
+ }
+ ListEntry = ListEntry->Flink;
+ }
+ if (ListEntry == &DeviceExt->ThreadListHead)
+ {
+ ThreadContext = ExAllocatePool(NonPagedPool, sizeof(NPFS_THREAD_CONTEXT));
+ if (ThreadContext == NULL)
+ {
+ KeUnlockMutex(&DeviceExt->PipeListLock);
+ return STATUS_NO_MEMORY;
+ }
+ ThreadContext->DeviceExt = DeviceExt;
+ KeInitializeEvent(&ThreadContext->Event, SynchronizationEvent, FALSE);
+ ThreadContext->Count = 1;
+ ThreadContext->WaitObjectArray[0] = &ThreadContext->Event;
+
+
+ DPRINT("Creating a new system thread for waiting read/write requests\n");
+
+ Status = PsCreateSystemThread(&hThread,
+ THREAD_ALL_ACCESS,
+ NULL,
+ NULL,
+ NULL,
+ NpfsWaiterThread,
+ (PVOID)ThreadContext);
+ if (!NT_SUCCESS(Status))
+ {
+ ExFreePool(ThreadContext);
+ KeUnlockMutex(&DeviceExt->PipeListLock);
+ return Status;
+ }
+ InsertHeadList(&DeviceExt->ThreadListHead, &ThreadContext->ListEntry);
+ DeviceExt->EmptyWaiterCount += MAXIMUM_WAIT_OBJECTS - 1;
+ }
+ IoMarkIrpPending(Irp);
+ IoAcquireCancelSpinLock(&oldIrql);
+ if (Irp->Cancel)
+ {
+ IoReleaseCancelSpinLock(oldIrql);
+ Status = STATUS_CANCELLED;
+ }
+ else
+ {
+ IoSetCancelRoutine(Irp, NpfsReadWriteCancelRoutine);
+ IoReleaseCancelSpinLock(oldIrql);
+ ThreadContext->WaitObjectArray[ThreadContext->Count] = Context->WaitEvent;
+ ThreadContext->WaitIrpArray[ThreadContext->Count] = Irp;
+ ThreadContext->Count++;
+ DeviceExt->EmptyWaiterCount--;
+ KeSetEvent(&ThreadContext->Event, IO_NO_INCREMENT, FALSE);
+ Status = STATUS_SUCCESS;
+ }
+ KeUnlockMutex(&DeviceExt->PipeListLock);
+ return Status;
+}
NTSTATUS STDCALL
-NpfsRead(PDEVICE_OBJECT DeviceObject, PIRP Irp)
+NpfsRead(IN PDEVICE_OBJECT DeviceObject,
+ IN PIRP Irp)
{
- PIO_STACK_LOCATION IoStack;
PFILE_OBJECT FileObject;
NTSTATUS Status;
- PNPFS_DEVICE_EXTENSION DeviceExt;
- PWSTR PipeName;
- KIRQL OldIrql;
- PLIST_ENTRY CurrentEntry;
- PNPFS_PIPE_DATA Current;
- ULONG Information;
+ NTSTATUS OriginalStatus = STATUS_SUCCESS;
PNPFS_FCB Fcb;
- PNPFS_FCB ReadFcb;
- PNPFS_PIPE Pipe;
+ PNPFS_CONTEXT Context;
+ KEVENT Event;
ULONG Length;
- PVOID Buffer;
+ ULONG Information;
ULONG CopyLength;
+ ULONG TempLength;
+ BOOLEAN IsOriginalRequest = TRUE;
+ PVOID Buffer;
- DPRINT("NpfsRead(DeviceObject %p Irp %p)\n", DeviceObject, Irp);
-
- DeviceExt = (PNPFS_DEVICE_EXTENSION)DeviceObject->DeviceExtension;
- IoStack = IoGetCurrentIrpStackLocation(Irp);
- FileObject = IoStack->FileObject;
- Fcb = FileObject->FsContext;
- Pipe = Fcb->Pipe;
- ReadFcb = Fcb->OtherSide;
-
- if (ReadFcb == NULL)
- {
- DPRINT("Pipe is NOT connected!\n");
- Status = STATUS_UNSUCCESSFUL;
- Information = 0;
- goto done;
- }
+ DPRINT("NpfsRead(DeviceObject %p, Irp %p)\n", DeviceObject, Irp);
if (Irp->MdlAddress == NULL)
- {
- DPRINT("Irp->MdlAddress == NULL\n");
- Status = STATUS_UNSUCCESSFUL;
- Information = 0;
- goto done;
- }
+ {
+ DPRINT("Irp->MdlAddress == NULL\n");
+ Status = STATUS_UNSUCCESSFUL;
+ Irp->IoStatus.Information = 0;
+ goto done;
+ }
- Status = STATUS_SUCCESS;
- Length = IoStack->Parameters.Read.Length;
- Information = 0;
+ FileObject = IoGetCurrentIrpStackLocation(Irp)->FileObject;
+ Fcb = FileObject->FsContext;
+ Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
- Buffer = MmGetSystemAddressForMdl(Irp->MdlAddress);
- DPRINT("Length %d Buffer %x\n",Length,Buffer);
+ if (Fcb->Data == NULL)
+ {
+ DPRINT1("Pipe is NOT readable!\n");
+ Status = STATUS_UNSUCCESSFUL;
+ Irp->IoStatus.Information = 0;
+ goto done;
+ }
- KeAcquireSpinLock(&ReadFcb->DataListLock, &OldIrql);
- while (1)
+ ExAcquireFastMutex(&Fcb->DataListLock);
+
+ if (IoIsOperationSynchronous(Irp))
{
- /* FIXME: check if in blocking mode */
- if (IsListEmpty(&ReadFcb->DataListHead))
+ InsertTailList(&Fcb->ReadRequestListHead, &Context->ListEntry);
+ if (Fcb->ReadRequestListHead.Flink != &Context->ListEntry)
{
- KeResetEvent(&Fcb->ReadEvent);
- KeReleaseSpinLock(&ReadFcb->DataListLock, OldIrql);
- if (Information > 0)
+ KeInitializeEvent(&Event, SynchronizationEvent, FALSE);
+ Context->WaitEvent = &Event;
+ ExReleaseFastMutex(&Fcb->DataListLock);
+ Status = KeWaitForSingleObject(&Event,
+ Executive,
+ KernelMode,
+ FALSE,
+ NULL);
+ if (!NT_SUCCESS(Status))
{
- Status = STATUS_SUCCESS;
- goto done;
+ KEBUGCHECK(0);
}
- if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE)
+ ExAcquireFastMutex(&Fcb->DataListLock);
+ }
+ Irp->IoStatus.Information = 0;
+ }
+ else
+ {
+ KIRQL oldIrql;
+ if (IsListEmpty(&Fcb->ReadRequestListHead) ||
+ Fcb->ReadRequestListHead.Flink != &Context->ListEntry)
+ {
+ /* this is a new request */
+ Irp->IoStatus.Information = 0;
+ Context->WaitEvent = &Fcb->ReadEvent;
+ InsertTailList(&Fcb->ReadRequestListHead, &Context->ListEntry);
+ if (Fcb->ReadRequestListHead.Flink != &Context->ListEntry)
{
- Status = STATUS_PIPE_BROKEN;
+ /* there was already a request on the list */
+ IoAcquireCancelSpinLock(&oldIrql);
+ if (Irp->Cancel)
+ {
+ IoReleaseCancelSpinLock(oldIrql);
+ RemoveEntryList(&Context->ListEntry);
+ ExReleaseFastMutex(&Fcb->DataListLock);
+ Status = STATUS_CANCELLED;
+ goto done;
+ }
+ IoSetCancelRoutine(Irp, NpfsReadWriteCancelRoutine);
+ IoReleaseCancelSpinLock(oldIrql);
+ ExReleaseFastMutex(&Fcb->DataListLock);
+ IoMarkIrpPending(Irp);
+ Status = STATUS_PENDING;
goto done;
}
- /* Wait for ReadEvent to become signaled */
- DPRINT("Waiting for readable data (%S)\n", Pipe->PipeName.Buffer);
- Status = KeWaitForSingleObject(&Fcb->ReadEvent,
- UserRequest,
- KernelMode,
- FALSE,
- NULL);
- DPRINT("Finished waiting (%S)! Status: %x\n", Pipe->PipeName.Buffer, Status);
- KeAcquireSpinLock(&ReadFcb->DataListLock, &OldIrql);
}
+ }
- if (Pipe->PipeReadMode == FILE_PIPE_BYTE_STREAM_MODE)
+ while (1)
+ {
+ Buffer = MmGetSystemAddressForMdl(Irp->MdlAddress);
+ Information = Irp->IoStatus.Information;
+ Length = IoGetCurrentIrpStackLocation(Irp)->Parameters.Read.Length;
+ ASSERT (Information <= Length);
+ Buffer = (PVOID)((ULONG_PTR)Buffer + Information);
+ Length -= Information;
+ Status = STATUS_SUCCESS;
+
+ while (1)
{
- DPRINT("Byte stream mode\n");
+ if (Fcb->ReadDataAvailable == 0)
+ {
+ if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE)
+ {
+ ASSERT(Fcb->OtherSide != NULL);
+ KeSetEvent(&Fcb->OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE);
+ }
+ if (Information > 0 &&
+ (Fcb->Pipe->ReadMode != FILE_PIPE_BYTE_STREAM_MODE ||
+ Fcb->PipeState != FILE_PIPE_CONNECTED_STATE))
+ {
+ break;
+ }
+ if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE)
+ {
+ DPRINT("PipeState: %x\n", Fcb->PipeState);
+ Status = STATUS_PIPE_BROKEN;
+ break;
+ }
+ ExReleaseFastMutex(&Fcb->DataListLock);
+ if (IoIsOperationSynchronous(Irp))
+ {
+ /* Wait for ReadEvent to become signaled */
+
+ DPRINT("Waiting for readable data (%wZ)\n", &Fcb->Pipe->PipeName);
+ Status = KeWaitForSingleObject(&Fcb->ReadEvent,
+ UserRequest,
+ KernelMode,
+ FALSE,
+ NULL);
+ DPRINT("Finished waiting (%wZ)! Status: %x\n", &Fcb->Pipe->PipeName, Status);
+ ExAcquireFastMutex(&Fcb->DataListLock);
+ }
+ else
+ {
+ Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
- /* Byte stream mode */
- CurrentEntry = NULL;
- while (Length > 0 && !IsListEmpty(&ReadFcb->DataListHead))
- {
- CurrentEntry = RemoveHeadList(&ReadFcb->DataListHead);
- Current = CONTAINING_RECORD(CurrentEntry, NPFS_PIPE_DATA, ListEntry);
-
- DPRINT("Took pipe data at %p off the queue\n", Current);
-
- CopyLength = RtlMin(Current->Size, Length);
- RtlCopyMemory(Buffer,
- ((PVOID)((PVOID)Current->Data + Current->Offset)),
- CopyLength);
- Buffer += CopyLength;
- Length -= CopyLength;
- Information += CopyLength;
-
- /* Update the data buffer */
- Current->Offset += CopyLength;
- Current->Size -= CopyLength;
- if (Current->Size == 0)
+ Context->WaitEvent = &Fcb->ReadEvent;
+ Status = NpfsAddWaitingReadWriteRequest(DeviceObject, Irp);
+
+ if (NT_SUCCESS(Status))
+ {
+ Status = STATUS_PENDING;
+ }
+ ExAcquireFastMutex(&Fcb->DataListLock);
+ break;
+ }
+ }
+ ASSERT(IoGetCurrentIrpStackLocation(Irp)->FileObject != NULL);
+ if (Fcb->Pipe->ReadMode == FILE_PIPE_BYTE_STREAM_MODE)
+ {
+ DPRINT("Byte stream mode\n");
+ /* Byte stream mode */
+ while (Length > 0 && Fcb->ReadDataAvailable > 0)
{
- NpfsFreePipeData(Current);
- CurrentEntry = NULL;
+ CopyLength = min(Fcb->ReadDataAvailable, Length);
+ if ((ULONG_PTR)Fcb->ReadPtr + CopyLength <= (ULONG_PTR)Fcb->Data + Fcb->MaxDataLength)
+ {
+ memcpy(Buffer, Fcb->ReadPtr, CopyLength);
+ Fcb->ReadPtr = (PVOID)((ULONG_PTR)Fcb->ReadPtr + CopyLength);
+ if (Fcb->ReadPtr == (PVOID)((ULONG_PTR)Fcb->Data + Fcb->MaxDataLength))
+ {
+ Fcb->ReadPtr = Fcb->Data;
+ }
+ }
+ else
+ {
+ TempLength = (ULONG)((ULONG_PTR)Fcb->Data + Fcb->MaxDataLength - (ULONG_PTR)Fcb->ReadPtr);
+ memcpy(Buffer, Fcb->ReadPtr, TempLength);
+ memcpy((PVOID)((ULONG_PTR)Buffer + TempLength), Fcb->Data, CopyLength - TempLength);
+ Fcb->ReadPtr = (PVOID)((ULONG_PTR)Fcb->Data + CopyLength - TempLength);
+ }
+
+ Buffer = (PVOID)((ULONG_PTR)Buffer + CopyLength);
+ Length -= CopyLength;
+ Information += CopyLength;
+
+ Fcb->ReadDataAvailable -= CopyLength;
+ Fcb->WriteQuotaAvailable += CopyLength;
}
- }
- if (CurrentEntry && Current->Size > 0)
- {
- DPRINT("Putting pipe data at %p back in queue\n", Current);
+ if (Length == 0)
+ {
+ if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE)
+ {
+ KeSetEvent(&Fcb->OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE);
+ }
+ KeResetEvent(&Fcb->ReadEvent);
+ break;
+ }
+ }
+ else
+ {
+ DPRINT("Message mode\n");
- /* The caller's buffer could not contain the complete message,
- so put it back on the queue */
- InsertHeadList(&ReadFcb->DataListHead, &Current->ListEntry);
- }
+ /* Message mode */
+ if (Fcb->ReadDataAvailable)
+ {
+ /* Truncate the message if the receive buffer is too small */
+ CopyLength = min(Fcb->ReadDataAvailable, Length);
+ memcpy(Buffer, Fcb->Data, CopyLength);
+
+#ifndef NDEBUG
+ DPRINT("Length %d Buffer %x\n",CopyLength,Buffer);
+ HexDump((PUCHAR)Buffer, CopyLength);
+#endif
+
+ Information = CopyLength;
+
+ if (Fcb->ReadDataAvailable > Length)
+ {
+ memmove(Fcb->Data, (PVOID)((ULONG_PTR)Fcb->Data + Length),
+ Fcb->ReadDataAvailable - Length);
+ Fcb->ReadDataAvailable -= Length;
+ Status = STATUS_MORE_ENTRIES;
+ }
+ else
+ {
+ KeResetEvent(&Fcb->ReadEvent);
+ if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE)
+ {
+ KeSetEvent(&Fcb->OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE);
+ }
+ Fcb->ReadDataAvailable = 0;
+ Fcb->WriteQuotaAvailable = Fcb->MaxDataLength;
+ }
+ }
- if (Length == 0)
+ if (Information > 0)
+ {
+ break;
+ }
+ }
+ }
+ Irp->IoStatus.Information = Information;
+ Irp->IoStatus.Status = Status;
+
+ ASSERT(IoGetCurrentIrpStackLocation(Irp)->FileObject != NULL);
+
+ if (IoIsOperationSynchronous(Irp))
+ {
+ RemoveEntryList(&Context->ListEntry);
+ if (!IsListEmpty(&Fcb->ReadRequestListHead))
{
- break;
+ Context = CONTAINING_RECORD(Fcb->ReadRequestListHead.Flink, NPFS_CONTEXT, ListEntry);
+ KeSetEvent(Context->WaitEvent, IO_NO_INCREMENT, FALSE);
}
+ ExReleaseFastMutex(&Fcb->DataListLock);
+ IoCompleteRequest(Irp, IO_NO_INCREMENT);
+
+ DPRINT("NpfsRead done (Status %lx)\n", Status);
+ return Status;
}
else
{
- DPRINT("Message mode\n");
-
- /* Message mode */
- if (!IsListEmpty(&ReadFcb->DataListHead))
+ if (IsOriginalRequest)
{
- CurrentEntry = RemoveHeadList(&ReadFcb->DataListHead);
- Current = CONTAINING_RECORD(CurrentEntry, NPFS_PIPE_DATA, ListEntry);
-
- DPRINT("Took pipe data at %p off the queue\n", Current);
-
- /* Truncate the message if the receive buffer is too small */
- CopyLength = RtlMin(Current->Size, Length);
- RtlCopyMemory(Buffer, Current->Data, CopyLength);
- Information = CopyLength;
-
- Current->Offset += CopyLength;
- NpfsFreePipeData(Current);
+ IsOriginalRequest = FALSE;
+ OriginalStatus = Status;
}
- if (Information > 0)
+ if (Status == STATUS_PENDING)
{
- break;
+ ExReleaseFastMutex(&Fcb->DataListLock);
+ DPRINT("NpfsRead done (Status %lx)\n", OriginalStatus);
+ return OriginalStatus;
+ }
+ RemoveEntryList(&Context->ListEntry);
+ IoCompleteRequest(Irp, IO_NO_INCREMENT);
+ if (IsListEmpty(&Fcb->ReadRequestListHead))
+ {
+ ExReleaseFastMutex(&Fcb->DataListLock);
+ DPRINT("NpfsRead done (Status %lx)\n", OriginalStatus);
+ return OriginalStatus;
}
+ Context = CONTAINING_RECORD(Fcb->ReadRequestListHead.Flink, NPFS_CONTEXT, ListEntry);
+ Irp = CONTAINING_RECORD(Context, IRP, Tail.Overlay.DriverContext);
}
}
- /* reset ReaderEvent */
- KeReleaseSpinLock(&ReadFcb->DataListLock, OldIrql);
-
done:
Irp->IoStatus.Status = Status;
- Irp->IoStatus.Information = Information;
- IoCompleteRequest(Irp, IO_NO_INCREMENT);
+ if (Status != STATUS_PENDING)
+ {
+ IoCompleteRequest(Irp, IO_NO_INCREMENT);
+ }
+ DPRINT("NpfsRead done (Status %lx)\n", Status);
- return(Status);
+ return Status;
}
-
NTSTATUS STDCALL
NpfsWrite(PDEVICE_OBJECT DeviceObject,
PIRP Irp)
PIO_STACK_LOCATION IoStack;
PFILE_OBJECT FileObject;
PNPFS_FCB Fcb = NULL;
+ PNPFS_FCB ReaderFcb;
PNPFS_PIPE Pipe = NULL;
PUCHAR Buffer;
NTSTATUS Status = STATUS_SUCCESS;
ULONG Length;
ULONG Offset;
- KIRQL OldIrql;
- PNPFS_PIPE_DATA PipeData;
+ ULONG Information;
+ ULONG CopyLength;
+ ULONG TempLength;
DPRINT("NpfsWrite()\n");
DPRINT("Pipe name %wZ\n", &FileObject->FileName);
Fcb = FileObject->FsContext;
+ ReaderFcb = Fcb->OtherSide;
Pipe = Fcb->Pipe;
Length = IoStack->Parameters.Write.Length;
Offset = IoStack->Parameters.Write.ByteOffset.u.LowPart;
+ Information = 0;
if (Irp->MdlAddress == NULL)
{
- DbgPrint ("Irp->MdlAddress == NULL\n");
+ DPRINT("Irp->MdlAddress == NULL\n");
Status = STATUS_UNSUCCESSFUL;
Length = 0;
goto done;
}
- if (Fcb->OtherSide == NULL)
+ if (ReaderFcb == NULL)
{
DPRINT("Pipe is NOT connected!\n");
+ if (Fcb->PipeState == FILE_PIPE_LISTENING_STATE)
+ Status = STATUS_PIPE_LISTENING;
+ else if (Fcb->PipeState == FILE_PIPE_DISCONNECTED_STATE)
+ Status = STATUS_PIPE_DISCONNECTED;
+ else
+ Status = STATUS_UNSUCCESSFUL;
+ Length = 0;
+ goto done;
+ }
+
+ if (ReaderFcb->Data == NULL)
+ {
+ DPRINT("Pipe is NOT writable!\n");
Status = STATUS_UNSUCCESSFUL;
Length = 0;
goto done;
}
+ Status = STATUS_SUCCESS;
Buffer = MmGetSystemAddressForMdl (Irp->MdlAddress);
+
+ ExAcquireFastMutex(&ReaderFcb->DataListLock);
+#ifndef NDEBUG
DPRINT("Length %d Buffer %x Offset %x\n",Length,Buffer,Offset);
+ HexDump(Buffer, Length);
+#endif
- PipeData = NpfsInitializePipeData(Buffer, Length);
- if (PipeData)
+ while(1)
{
- DPRINT("Attaching pipe data at %p (%d bytes)\n", PipeData, Length);
-
- KeAcquireSpinLock(&Fcb->DataListLock, &OldIrql);
- InsertTailList(&Fcb->DataListHead, &PipeData->ListEntry);
-
- /* signal the readers ReadEvent */
- KeSetEvent(&Fcb->OtherSide->ReadEvent, IO_NO_INCREMENT, FALSE);
-
- KeReleaseSpinLock(&Fcb->DataListLock, OldIrql);
+ if (ReaderFcb->WriteQuotaAvailable == 0)
+ {
+ KeSetEvent(&ReaderFcb->ReadEvent, IO_NO_INCREMENT, FALSE);
+ if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE)
+ {
+ Status = STATUS_PIPE_BROKEN;
+ ExReleaseFastMutex(&ReaderFcb->DataListLock);
+ goto done;
+ }
+ ExReleaseFastMutex(&ReaderFcb->DataListLock);
+
+ DPRINT("Waiting for buffer space (%S)\n", Pipe->PipeName.Buffer);
+ Status = KeWaitForSingleObject(&Fcb->WriteEvent,
+ UserRequest,
+ KernelMode,
+ FALSE,
+ NULL);
+ DPRINT("Finished waiting (%S)! Status: %x\n", Pipe->PipeName.Buffer, Status);
+
+ ExAcquireFastMutex(&ReaderFcb->DataListLock);
+ /*
+ * It's possible that the event was signaled because the
+ * other side of pipe was closed.
+ */
+ if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE)
+ {
+ DPRINT("PipeState: %x\n", Fcb->PipeState);
+ Status = STATUS_PIPE_BROKEN;
+ ExReleaseFastMutex(&ReaderFcb->DataListLock);
+ goto done;
+ }
+ }
+ if (Pipe->WriteMode == FILE_PIPE_BYTE_STREAM_MODE)
+ {
+ DPRINT("Byte stream mode\n");
+ while (Length > 0 && ReaderFcb->WriteQuotaAvailable > 0)
+ {
+ CopyLength = min(Length, ReaderFcb->WriteQuotaAvailable);
+ if ((ULONG_PTR)ReaderFcb->WritePtr + CopyLength <= (ULONG_PTR)ReaderFcb->Data + ReaderFcb->MaxDataLength)
+ {
+ memcpy(ReaderFcb->WritePtr, Buffer, CopyLength);
+ ReaderFcb->WritePtr = (PVOID)((ULONG_PTR)ReaderFcb->WritePtr + CopyLength);
+ if ((ULONG_PTR)ReaderFcb->WritePtr == (ULONG_PTR)ReaderFcb->Data + ReaderFcb->MaxDataLength)
+ {
+ ReaderFcb->WritePtr = ReaderFcb->Data;
+ }
+ }
+ else
+ {
+ TempLength = (ULONG)((ULONG_PTR)ReaderFcb->Data + ReaderFcb->MaxDataLength - (ULONG_PTR)ReaderFcb->WritePtr);
+ memcpy(ReaderFcb->WritePtr, Buffer, TempLength);
+ memcpy(ReaderFcb->Data, Buffer + TempLength, CopyLength - TempLength);
+ ReaderFcb->WritePtr = (PVOID)((ULONG_PTR)ReaderFcb->Data + CopyLength - TempLength);
+ }
+
+ Buffer += CopyLength;
+ Length -= CopyLength;
+ Information += CopyLength;
+
+ ReaderFcb->ReadDataAvailable += CopyLength;
+ ReaderFcb->WriteQuotaAvailable -= CopyLength;
+ }
+
+ if (Length == 0)
+ {
+ KeSetEvent(&ReaderFcb->ReadEvent, IO_NO_INCREMENT, FALSE);
+ KeResetEvent(&Fcb->WriteEvent);
+ break;
+ }
+ }
+ else
+ {
+ DPRINT("Message mode\n");
+ if (Length > 0)
+ {
+ CopyLength = min(Length, ReaderFcb->WriteQuotaAvailable);
+ memcpy(ReaderFcb->Data, Buffer, CopyLength);
+
+ Information = CopyLength;
+ ReaderFcb->ReadDataAvailable = CopyLength;
+ ReaderFcb->WriteQuotaAvailable = 0;
+ }
+
+ if (Information > 0)
+ {
+ KeSetEvent(&ReaderFcb->ReadEvent, IO_NO_INCREMENT, FALSE);
+ KeResetEvent(&Fcb->WriteEvent);
+ break;
+ }
+ }
}
- else
- {
- Length = 0;
- Status = STATUS_INSUFFICIENT_RESOURCES;
- }
+
+ ExReleaseFastMutex(&ReaderFcb->DataListLock);
done:
Irp->IoStatus.Status = Status;
- Irp->IoStatus.Information = Length;
-
+ Irp->IoStatus.Information = Information;
+
IoCompleteRequest(Irp, IO_NO_INCREMENT);
-
- return(Status);
+
+ DPRINT("NpfsWrite done (Status %lx)\n", Status);
+
+ return Status;
}
/* EOF */