3 * COPYRIGHT: See COPYING in the top level directory
4 * PROJECT: ReactOS kernel
5 * FILE: drivers/fs/np/rw.c
6 * PURPOSE: Named pipe filesystem
7 * PROGRAMMER: David Welch <welch@cwcom.net>
10 /* INCLUDES ******************************************************************/
12 #include <ddk/ntddk.h>
13 #include <rosrtl/minmax.h>
19 /* FUNCTIONS *****************************************************************/
/*
 * HexDump: debug helper that prints the contents of Buffer through
 * DbgPrint, one output row per 16 input bytes, framed by dashed
 * separator lines.  Each row carries two hex digits per byte starting
 * at column 3*j and the printable form of the byte at column 48+j.
 *
 * Buffer - bytes to dump
 * Length - number of bytes to dump
 *
 * NOTE(review): the declarations of i, j, ch and Line and the statement
 * that loads ch are not visible in this view; ch is presumably the byte
 * at Buffer[i + j] - confirm against the full file.
 */
22 VOID
HexDump(PUCHAR Buffer
, ULONG Length
)
26 const char Hex
[] = "0123456789ABCDEF";
29 DbgPrint("---------------\n");
/* Walk the input in rows of 16 bytes. */
31 for (i
= 0; i
< ROUND_UP(Length
, 16); i
+= 16)
/* Start every row from a blank line buffer. */
33 memset(Line
, ' ', 64);
/* The last row may be shorter than 16 bytes. */
36 for (j
= 0; j
< 16 && j
+ i
< Length
; j
++)
/* High nibble, low nibble, then the character column (a dot for non-printable bytes). */
39 Line
[3*j
+ 0] = Hex
[ch
>> 4];
40 Line
[3*j
+ 1] = Hex
[ch
& 0x0f];
41 Line
[48 + j
] = isprint(ch
) ? ch
: '.';
43 DbgPrint("%s\n", Line
);
45 DbgPrint("---------------\n");
/*
 * Forward declaration of NpfsReadFromPipe; the definition appears
 * further down in this file and is called from NpfsWaiterThread.
 */
50 NpfsReadFromPipe(PNPFS_CONTEXT Context
);
/*
 * NpfsWaitingCancelRoutine: cancel routine that NpfsAddWaitingReader
 * installs on queued IRPs.  It releases the cancel spin lock, recovers
 * the NPFS_CONTEXT stored in Irp->Tail.Overlay.DriverContext[0] and
 * signals Context->Fcb->Event while holding PipeListLock.  That event is
 * the object NpfsAddWaitingReader places in the wait array of a waiter
 * thread for this request.
 *
 * NOTE(review): the line declaring the Irp parameter is not visible in
 * this view.
 */
53 NpfsWaitingCancelRoutine(IN PDEVICE_OBJECT DeviceObject
,
56 PNPFS_CONTEXT Context
;
57 PNPFS_DEVICE_EXTENSION DeviceExt
;
59 DPRINT1("NpfsWaitingCancelRoutine() called\n");
/* The cancel spin lock is dropped first, at the IRQL saved in the IRP. */
61 IoReleaseCancelSpinLock(Irp
->CancelIrql
);
/* The context pointer was stored in DriverContext[0] when the IRP was queued. */
63 Context
= Irp
->Tail
.Overlay
.DriverContext
[0];
64 DeviceExt
= Context
->DeviceObject
->DeviceExtension
;
/* Signal the event of the FCB under the pipe list lock. */
66 KeLockMutex(&DeviceExt
->PipeListLock
);
67 KeSetEvent(&Context
->Fcb
->Event
, IO_NO_INCREMENT
, FALSE
);
68 KeUnlockMutex(&DeviceExt
->PipeListLock
);
/*
 * NpfsWaiterThread: thread routine started by NpfsAddWaitingReader.
 * Context is the NPFS_THREAD_CONTEXT of this thread.
 *
 * Visible behaviour: the thread waits with KeWaitForMultipleObjects on
 * ThreadContext->WaitObjectArray.  For a signalled object it picks the
 * matching entry of WaitContextArray, removes it from both arrays and
 * clears the cancel routine of its IRP.  A request found to be cancelled
 * is completed with STATUS_CANCELLED and its context freed; otherwise
 * the request is dispatched on MajorFunction (NpfsReadFromPipe is the
 * visible case).  When only one wait object is left and the device has
 * at least MAXIMUM_WAIT_OBJECTS empty slots, the thread unlinks itself
 * from the thread list; ThreadContext is freed at the end.
 *
 * NOTE(review): the loop and most branch lines are not visible in this
 * view, so the exact nesting of the sections below must be confirmed
 * against the full file.
 */
72 NpfsWaiterThread(PVOID Context
)
74 PNPFS_THREAD_CONTEXT ThreadContext
= (PNPFS_THREAD_CONTEXT
) Context
;
75 ULONG CurrentCount
, Count
= 0;
76 PNPFS_CONTEXT WaitContext
= NULL
;
78 BOOLEAN Terminate
= FALSE
;
79 BOOLEAN Cancel
= FALSE
;
82 KeLockMutex(&ThreadContext
->DeviceExt
->PipeListLock
);
/* Snapshot the number of wait objects and reset the thread event before the lock is dropped. */
86 CurrentCount
= ThreadContext
->Count
;
87 KeResetEvent(&ThreadContext
->Event
);
88 KeUnlockMutex(&ThreadContext
->DeviceExt
->PipeListLock
);
/* Cancelled request: complete the IRP with STATUS_CANCELLED and free the context.
   NOTE(review): the guarding condition is not visible here; presumably it tests Cancel. */
93 WaitContext
->Irp
->IoStatus
.Status
= STATUS_CANCELLED
;
94 WaitContext
->Irp
->IoStatus
.Information
= 0;
95 IoCompleteRequest(WaitContext
->Irp
, IO_NO_INCREMENT
);
96 ExFreePool(WaitContext
);
/* Otherwise dispatch the request by its major function. */
100 switch (WaitContext
->MajorFunction
)
103 NpfsReadFromPipe(WaitContext
);
/* Block until one of the CurrentCount objects is signalled. */
114 Status
= KeWaitForMultipleObjects(CurrentCount
,
115 ThreadContext
->WaitObjectArray
,
121 ThreadContext
->WaitBlockArray
);
122 KeLockMutex(&ThreadContext
->DeviceExt
->PipeListLock
);
123 if (!NT_SUCCESS(Status
))
/* The wait status minus STATUS_SUCCESS is used as the index of the signalled object. */
127 Count
= Status
- STATUS_SUCCESS
;
128 ASSERT (Count
<= CurrentCount
);
/* Take the signalled request out of both arrays by moving the last entry into its slot.
   NOTE(review): the condition guarding this section is not visible in this view. */
131 WaitContext
= ThreadContext
->WaitContextArray
[Count
];
132 ThreadContext
->Count
--;
133 ThreadContext
->DeviceExt
->EmptyWaiterCount
++;
134 ThreadContext
->WaitObjectArray
[Count
] = ThreadContext
->WaitObjectArray
[ThreadContext
->Count
];
135 ThreadContext
->WaitContextArray
[Count
] = ThreadContext
->WaitContextArray
[ThreadContext
->Count
];
/* Clear the cancel routine; a NULL previous routine is recorded in Cancel. */
136 IoAcquireCancelSpinLock(&oldIrql
);
137 Cancel
= NULL
== IoSetCancelRoutine(WaitContext
->Irp
, NULL
);
138 IoReleaseCancelSpinLock(oldIrql
);
142 /* someone has added a new wait request */
/* Only one wait object is left (NpfsAddWaitingReader stores the thread event in slot 0). */
145 if (ThreadContext
->Count
== 1 && ThreadContext
->DeviceExt
->EmptyWaiterCount
>= MAXIMUM_WAIT_OBJECTS
)
147 /* another thread with empty wait slots exists, so we can remove our thread from the list */
148 RemoveEntryList(&ThreadContext
->ListEntry
);
/* The free slots this thread contributed are taken out of the device-wide count. */
149 ThreadContext
->DeviceExt
->EmptyWaiterCount
-= MAXIMUM_WAIT_OBJECTS
- 1;
153 KeUnlockMutex(&ThreadContext
->DeviceExt
->PipeListLock
);
154 ExFreePool(ThreadContext
);
/*
 * NpfsAddWaitingReader: queues a request on a waiter thread so that
 * Fcb->Event is waited for outside the calling thread.
 *
 * DeviceExt - device extension holding the thread list and PipeListLock
 * Context   - request context; Context->Irp is marked pending here
 * Fcb       - FCB whose Event is added to the wait array
 *
 * Under PipeListLock the thread list is searched for a thread context
 * with fewer than MAXIMUM_WAIT_OBJECTS entries.  If the search reaches
 * the list head, a new NPFS_THREAD_CONTEXT is allocated and a system
 * thread is created for it.  The IRP is then marked pending; if it is
 * already flagged as cancelled Status becomes STATUS_CANCELLED,
 * otherwise the cancel routine is installed, the request is appended to
 * the wait arrays and the thread event is signalled.
 *
 * Visible return: STATUS_NO_MEMORY when the thread context cannot be
 * allocated.  NOTE(review): the other return statements and the branch
 * lines are not visible in this view.
 */
158 NpfsAddWaitingReader(PNPFS_DEVICE_EXTENSION DeviceExt
, PNPFS_CONTEXT Context
, PNPFS_FCB Fcb
)
160 PLIST_ENTRY ListEntry
;
161 PNPFS_THREAD_CONTEXT ThreadContext
;
166 KeLockMutex(&DeviceExt
->PipeListLock
);
/* Look for a waiter thread that still has a free wait slot. */
168 ListEntry
= DeviceExt
->ThreadListHead
.Flink
;
169 while (ListEntry
!= &DeviceExt
->ThreadListHead
)
171 ThreadContext
= CONTAINING_RECORD(ListEntry
, NPFS_THREAD_CONTEXT
, ListEntry
);
172 if (ThreadContext
->Count
< MAXIMUM_WAIT_OBJECTS
)
176 ListEntry
= ListEntry
->Flink
;
/* The walk came back to the list head: set up a new thread context. */
178 if (ListEntry
== &DeviceExt
->ThreadListHead
)
180 ThreadContext
= ExAllocatePool(NonPagedPool
, sizeof(NPFS_THREAD_CONTEXT
));
181 if (ThreadContext
== NULL
)
183 KeUnlockMutex(&DeviceExt
->PipeListLock
);
184 return STATUS_NO_MEMORY
;
186 ThreadContext
->DeviceExt
= DeviceExt
;
187 KeInitializeEvent(&ThreadContext
->Event
, NotificationEvent
, FALSE
);
/* Slot 0 of the wait array holds the event of the thread itself. */
188 ThreadContext
->Count
= 1;
189 ThreadContext
->WaitObjectArray
[0] = &ThreadContext
->Event
;
192 DPRINT("Creating a new system thread for waiting read requests\n");
194 Status
= PsCreateSystemThread(&hThread
,
200 (PVOID
)ThreadContext
);
/* Thread creation failed: release the context and the lock. */
201 if (!NT_SUCCESS(Status
))
203 ExFreePool(ThreadContext
);
204 KeUnlockMutex(&DeviceExt
->PipeListLock
);
/* The new thread is linked in and adds MAXIMUM_WAIT_OBJECTS - 1 empty slots to the count. */
207 InsertHeadList(&DeviceExt
->ThreadListHead
, &ThreadContext
->ListEntry
);
208 DeviceExt
->EmptyWaiterCount
+= MAXIMUM_WAIT_OBJECTS
- 1;
/* Mark the IRP pending and store the context where NpfsWaitingCancelRoutine reads it. */
210 IoMarkIrpPending(Context
->Irp
);
211 Context
->Irp
->Tail
.Overlay
.DriverContext
[0] = Context
;
/* The cancel flag is checked while the cancel spin lock is held. */
213 IoAcquireCancelSpinLock(&oldIrql
);
214 if (Context
->Irp
->Cancel
)
216 IoReleaseCancelSpinLock(oldIrql
);
217 Status
= STATUS_CANCELLED
;
221 IoSetCancelRoutine(Context
->Irp
, NpfsWaitingCancelRoutine
);
222 IoReleaseCancelSpinLock(oldIrql
);
/* Append the request to the wait arrays, then signal the thread event. */
223 ThreadContext
->WaitObjectArray
[ThreadContext
->Count
] = &Fcb
->Event
;
224 ThreadContext
->WaitContextArray
[ThreadContext
->Count
] = Context
;
225 ThreadContext
->Count
++;
226 DeviceExt
->EmptyWaiterCount
--;
227 KeSetEvent(&ThreadContext
->Event
, IO_NO_INCREMENT
, FALSE
);
228 Status
= STATUS_SUCCESS
;
230 KeUnlockMutex(&DeviceExt
->PipeListLock
);
/*
 * NpfsReadFromPipe: services the read request described by Context.
 *
 * The FCB is taken from the file object of the current IRP stack
 * location and WriterFcb is its OtherSide.  A NULL Fcb->Data yields
 * STATUS_UNSUCCESSFUL.  With no data available the writer is signalled
 * (when connected) and the request either fails with STATUS_PIPE_BROKEN,
 * waits on Fcb->Event (synchronous IRP) or is handed to
 * NpfsAddWaitingReader with a pool copy of the context, giving
 * STATUS_PENDING on success.  Available data is copied to the MDL
 * buffer: byte stream mode reads from a circular buffer, message mode
 * reads from the start of Fcb->Data.  The IRP is completed here unless
 * the status is STATUS_PENDING.
 *
 * NOTE(review): most branch, loop and jump lines, the local
 * declarations and the assignment of Pipe are not visible in this view;
 * confirm the nesting described below against the full file.
 */
235 NpfsReadFromPipe(PNPFS_CONTEXT Context
)
237 PIO_STACK_LOCATION IoStack
;
238 PFILE_OBJECT FileObject
;
249 DPRINT("NpfsReadFromPipe(Context %p)\n", Context
);
251 IoStack
= IoGetCurrentIrpStackLocation(Context
->Irp
);
252 FileObject
= IoStack
->FileObject
;
253 Fcb
= FileObject
->FsContext
;
255 WriterFcb
= Fcb
->OtherSide
;
/* Without a data buffer this end cannot be read from. */
257 if (Fcb
->Data
== NULL
)
259 DPRINT("Pipe is NOT readable!\n");
260 Status
= STATUS_UNSUCCESSFUL
;
265 Status
= STATUS_SUCCESS
;
266 Length
= IoStack
->Parameters
.Read
.Length
;
/* Destination is the system address of the MDL attached to the IRP. */
269 Buffer
= MmGetSystemAddressForMdl(Context
->Irp
->MdlAddress
);
270 ExAcquireFastMutex(&Fcb
->DataListLock
);
/* Nothing buffered: wake the writer if connected and drop the data lock. */
273 if (Fcb
->ReadDataAvailable
== 0)
275 if (Fcb
->PipeState
== FILE_PIPE_CONNECTED_STATE
)
277 KeSetEvent(&WriterFcb
->Event
, IO_NO_INCREMENT
, FALSE
);
279 ExReleaseFastMutex(&Fcb
->DataListLock
);
282 Status
= STATUS_SUCCESS
;
/* Neither connected nor a listening server end: report a broken pipe. */
286 if (Fcb
->PipeState
!= FILE_PIPE_CONNECTED_STATE
&&
287 !(Fcb
->PipeState
== FILE_PIPE_LISTENING_STATE
&& Fcb
->PipeEnd
== FILE_PIPE_SERVER_END
))
289 DPRINT("PipeState: %x\n", Fcb
->PipeState
);
290 Status
= STATUS_PIPE_BROKEN
;
294 if (IoIsOperationSynchronous(Context
->Irp
))
296 /* Wait for ReadEvent to become signaled */
297 DPRINT("Waiting for readable data (%S)\n", Pipe
->PipeName
.Buffer
);
298 Status
= KeWaitForSingleObject(&Fcb
->Event
,
303 DPRINT("Finished waiting (%S)! Status: %x\n", Pipe
->PipeName
.Buffer
, Status
);
/* Queue a pool copy of the context on a waiter thread.
   NOTE(review): presumably the else branch of the synchronous check; the else line is not visible. */
307 PNPFS_CONTEXT NewContext
;
309 NewContext
= ExAllocatePool(NonPagedPool
, sizeof(NPFS_CONTEXT
));
310 if (NewContext
== NULL
)
312 Status
= STATUS_NO_MEMORY
;
315 memcpy(NewContext
, Context
, sizeof(NPFS_CONTEXT
));
316 NewContext
->AllocatedFromPool
= TRUE
;
317 NewContext
->Fcb
= Fcb
;
318 NewContext
->MajorFunction
= IRP_MJ_READ
;
320 Status
= NpfsAddWaitingReader(Context
->DeviceObject
->DeviceExtension
, NewContext
, Fcb
);
/* Successful queueing is reported as STATUS_PENDING. */
322 if (NT_SUCCESS(Status
))
324 Status
= STATUS_PENDING
;
328 ExFreePool(NewContext
);
333 ExAcquireFastMutex(&Fcb
->DataListLock
);
336 if (Pipe
->ReadMode
== FILE_PIPE_BYTE_STREAM_MODE
)
338 DPRINT("Byte stream mode\n");
339 /* Byte stream mode */
340 while (Length
> 0 && Fcb
->ReadDataAvailable
> 0)
342 CopyLength
= RtlRosMin(Fcb
->ReadDataAvailable
, Length
);
/* The range to copy does not cross the end of the data buffer: one copy is enough. */
343 if (Fcb
->ReadPtr
+ CopyLength
<= Fcb
->Data
+ Fcb
->MaxDataLength
)
345 memcpy(Buffer
, Fcb
->ReadPtr
, CopyLength
);
346 Fcb
->ReadPtr
+= CopyLength
;
/* Wrap the read pointer when it lands exactly on the end of the buffer. */
347 if (Fcb
->ReadPtr
== Fcb
->Data
+ Fcb
->MaxDataLength
)
349 Fcb
->ReadPtr
= Fcb
->Data
;
/* Wrap-around case: copy the tail of the buffer, then the rest from its start. */
354 TempLength
= Fcb
->Data
+ Fcb
->MaxDataLength
- Fcb
->ReadPtr
;
355 memcpy(Buffer
, Fcb
->ReadPtr
, TempLength
);
356 memcpy(Buffer
+ TempLength
, Fcb
->Data
, CopyLength
- TempLength
);
357 Fcb
->ReadPtr
= Fcb
->Data
+ CopyLength
- TempLength
;
/* Advance the destination and move the copied amount from available data to write quota. */
360 Buffer
+= CopyLength
;
361 Length
-= CopyLength
;
362 Information
+= CopyLength
;
364 Fcb
->ReadDataAvailable
-= CopyLength
;
365 Fcb
->WriteQuotaAvailable
+= CopyLength
;
/* Wake the writer if the pipe is connected and reset the read event. */
370 if (Fcb
->PipeState
== FILE_PIPE_CONNECTED_STATE
)
372 KeSetEvent(&WriterFcb
->Event
, IO_NO_INCREMENT
, FALSE
);
374 KeResetEvent(&Fcb
->Event
);
380 DPRINT("Message mode\n");
383 if (Fcb
->ReadDataAvailable
)
385 /* Truncate the message if the receive buffer is too small */
386 CopyLength
= RtlRosMin(Fcb
->ReadDataAvailable
, Length
);
387 memcpy(Buffer
, Fcb
->Data
, CopyLength
);
390 DPRINT("Length %d Buffer %x\n",CopyLength
,Buffer
);
391 HexDump((PUCHAR
)Buffer
, CopyLength
);
394 Information
= CopyLength
;
/* More data than requested: move the remainder to the start of the data buffer. */
396 if (Fcb
->ReadDataAvailable
> Length
)
398 memmove(Fcb
->Data
, Fcb
->Data
+ Length
,
399 Fcb
->ReadDataAvailable
- Length
);
400 Fcb
->ReadDataAvailable
-= Length
;
401 Status
= STATUS_MORE_ENTRIES
;
/* Reset the read event, wake a connected writer and mark the buffer empty.
   NOTE(review): presumably the else branch of the check above; the else line is not visible. */
405 KeResetEvent(&Fcb
->Event
);
406 if (Fcb
->PipeState
== FILE_PIPE_CONNECTED_STATE
)
408 KeSetEvent(&WriterFcb
->Event
, IO_NO_INCREMENT
, FALSE
);
410 Fcb
->ReadDataAvailable
= 0;
411 Fcb
->WriteQuotaAvailable
= Fcb
->MaxDataLength
;
422 ExReleaseFastMutex(&Fcb
->DataListLock
);
/* Store the result in the IRP; it is completed here unless the request was queued. */
425 Context
->Irp
->IoStatus
.Status
= Status
;
426 Context
->Irp
->IoStatus
.Information
= Information
;
428 if (Status
!= STATUS_PENDING
)
430 IoCompleteRequest(Context
->Irp
, IO_NO_INCREMENT
);
/* NOTE(review): the body of this check is not visible; presumably it frees a pool-allocated context. */
433 if (Context
->AllocatedFromPool
)
437 DPRINT("NpfsRead done (Status %lx)\n", Status
);
/*
 * NpfsRead: read entry point.  Builds an NPFS_CONTEXT on the stack
 * (AllocatedFromPool = FALSE) and passes it to NpfsReadFromPipe, whose
 * result is returned.  An IRP without an MDL is completed immediately
 * with STATUS_UNSUCCESSFUL and zero bytes transferred.
 *
 * NOTE(review): the line declaring the Irp parameter and any further
 * context initialisation are not visible in this view.
 */
443 NpfsRead(PDEVICE_OBJECT DeviceObject
,
446 NPFS_CONTEXT Context
;
448 Context
.AllocatedFromPool
= FALSE
;
449 Context
.DeviceObject
= DeviceObject
;
/* NpfsReadFromPipe maps the MDL of the IRP, so a missing MDL is rejected up front. */
452 if (Irp
->MdlAddress
== NULL
)
454 DPRINT("Irp->MdlAddress == NULL\n");
455 Irp
->IoStatus
.Status
= STATUS_UNSUCCESSFUL
;
456 Irp
->IoStatus
.Information
= 0;
457 IoCompleteRequest(Irp
, IO_NO_INCREMENT
);
458 return STATUS_UNSUCCESSFUL
;
461 return NpfsReadFromPipe(&Context
);
/*
 * NpfsWrite: write entry point.  Copies data from the MDL buffer of the
 * IRP into the data buffer of the FCB on the other side of the pipe
 * (ReaderFcb = Fcb->OtherSide).
 *
 * Visible failure cases: STATUS_UNSUCCESSFUL for an IRP without an MDL
 * or a reader without a data buffer; STATUS_PIPE_LISTENING,
 * STATUS_PIPE_DISCONNECTED or STATUS_UNSUCCESSFUL when there is no other
 * side; STATUS_PIPE_BROKEN when the pipe is not in the connected state.
 * With no write quota left the reader is signalled and the call waits
 * on Fcb->Event.  Byte stream mode writes into a circular buffer;
 * message mode writes at the start of the reader buffer.  The IRP is
 * completed with the resulting status and byte count.
 *
 * NOTE(review): most branch, loop and jump lines, several local
 * declarations and the assignment of Pipe (initialised to NULL below)
 * are not visible in this view; confirm against the full file.
 */
465 NpfsWrite(PDEVICE_OBJECT DeviceObject
,
468 PIO_STACK_LOCATION IoStack
;
469 PFILE_OBJECT FileObject
;
470 PNPFS_FCB Fcb
= NULL
;
472 PNPFS_PIPE Pipe
= NULL
;
474 NTSTATUS Status
= STATUS_SUCCESS
;
481 DPRINT("NpfsWrite()\n");
483 IoStack
= IoGetCurrentIrpStackLocation(Irp
);
484 FileObject
= IoStack
->FileObject
;
485 DPRINT("FileObject %p\n", FileObject
);
486 DPRINT("Pipe name %wZ\n", &FileObject
->FileName
);
488 Fcb
= FileObject
->FsContext
;
489 ReaderFcb
= Fcb
->OtherSide
;
492 Length
= IoStack
->Parameters
.Write
.Length
;
493 Offset
= IoStack
->Parameters
.Write
.ByteOffset
.u
.LowPart
;
/* The source buffer is taken from the MDL further down, so a missing MDL is an error. */
496 if (Irp
->MdlAddress
== NULL
)
498 DPRINT("Irp->MdlAddress == NULL\n");
499 Status
= STATUS_UNSUCCESSFUL
;
/* No other side attached: the status is chosen from the pipe state. */
504 if (ReaderFcb
== NULL
)
506 DPRINT("Pipe is NOT connected!\n");
507 if (Fcb
->PipeState
== FILE_PIPE_LISTENING_STATE
)
508 Status
= STATUS_PIPE_LISTENING
;
509 else if (Fcb
->PipeState
== FILE_PIPE_DISCONNECTED_STATE
)
510 Status
= STATUS_PIPE_DISCONNECTED
;
512 Status
= STATUS_UNSUCCESSFUL
;
/* The reader side has no data buffer to write into. */
517 if (ReaderFcb
->Data
== NULL
)
519 DPRINT("Pipe is NOT writable!\n");
520 Status
= STATUS_UNSUCCESSFUL
;
525 Status
= STATUS_SUCCESS
;
526 Buffer
= MmGetSystemAddressForMdl (Irp
->MdlAddress
);
/* The buffer state of the reader is protected by its DataListLock. */
528 ExAcquireFastMutex(&ReaderFcb
->DataListLock
);
530 DPRINT("Length %d Buffer %x Offset %x\n",Length
,Buffer
,Offset
);
531 HexDump(Buffer
, Length
);
/* No write quota left: wake the reader, drop the lock and wait on the event of this end. */
536 if (ReaderFcb
->WriteQuotaAvailable
== 0)
538 KeSetEvent(&ReaderFcb
->Event
, IO_NO_INCREMENT
, FALSE
);
539 ExReleaseFastMutex(&ReaderFcb
->DataListLock
);
540 if (Fcb
->PipeState
!= FILE_PIPE_CONNECTED_STATE
)
542 Status
= STATUS_PIPE_BROKEN
;
546 DPRINT("Waiting for buffer space (%S)\n", Pipe
->PipeName
.Buffer
);
547 Status
= KeWaitForSingleObject(&Fcb
->Event
,
552 DPRINT("Finished waiting (%S)! Status: %x\n", Pipe
->PipeName
.Buffer
, Status
);
555 * It's possible that the event was signaled because the
556 * other side of pipe was closed.
558 if (Fcb
->PipeState
!= FILE_PIPE_CONNECTED_STATE
)
560 DPRINT("PipeState: %x\n", Fcb
->PipeState
);
561 Status
= STATUS_PIPE_BROKEN
;
564 ExAcquireFastMutex(&ReaderFcb
->DataListLock
);
567 if (Pipe
->WriteMode
== FILE_PIPE_BYTE_STREAM_MODE
)
569 DPRINT("Byte stream mode\n");
570 while (Length
> 0 && ReaderFcb
->WriteQuotaAvailable
> 0)
572 CopyLength
= RtlRosMin(Length
, ReaderFcb
->WriteQuotaAvailable
);
/* The range to fill does not cross the end of the reader buffer: one copy is enough. */
573 if (ReaderFcb
->WritePtr
+ CopyLength
<= ReaderFcb
->Data
+ ReaderFcb
->MaxDataLength
)
575 memcpy(ReaderFcb
->WritePtr
, Buffer
, CopyLength
);
576 ReaderFcb
->WritePtr
+= CopyLength
;
/* Wrap the write pointer when it lands exactly on the end of the buffer. */
577 if (ReaderFcb
->WritePtr
== ReaderFcb
->Data
+ ReaderFcb
->MaxDataLength
)
579 ReaderFcb
->WritePtr
= ReaderFcb
->Data
;
/* Wrap-around case: fill the tail of the buffer, then continue at its start. */
584 TempLength
= ReaderFcb
->Data
+ ReaderFcb
->MaxDataLength
- ReaderFcb
->WritePtr
;
585 memcpy(ReaderFcb
->WritePtr
, Buffer
, TempLength
);
586 memcpy(ReaderFcb
->Data
, Buffer
+ TempLength
, CopyLength
- TempLength
);
587 ReaderFcb
->WritePtr
= ReaderFcb
->Data
+ CopyLength
- TempLength
;
/* Advance the source and move the copied amount from write quota to available data. */
590 Buffer
+= CopyLength
;
591 Length
-= CopyLength
;
592 Information
+= CopyLength
;
594 ReaderFcb
->ReadDataAvailable
+= CopyLength
;
595 ReaderFcb
->WriteQuotaAvailable
-= CopyLength
;
/* Wake the reader and reset the event of the writing end. */
600 KeSetEvent(&ReaderFcb
->Event
, IO_NO_INCREMENT
, FALSE
);
601 KeResetEvent(&Fcb
->Event
);
607 DPRINT("Message mode\n");
/* The message is limited to the available quota and stored at the start of the reader buffer. */
610 CopyLength
= RtlRosMin(Length
, ReaderFcb
->WriteQuotaAvailable
);
611 memcpy(ReaderFcb
->Data
, Buffer
, CopyLength
);
/* The buffered amount is set (not added) and the write quota drops to zero. */
613 Information
= CopyLength
;
614 ReaderFcb
->ReadDataAvailable
= CopyLength
;
615 ReaderFcb
->WriteQuotaAvailable
= 0;
620 KeSetEvent(&ReaderFcb
->Event
, IO_NO_INCREMENT
, FALSE
);
621 KeResetEvent(&Fcb
->Event
);
627 ExReleaseFastMutex(&ReaderFcb
->DataListLock
);
/* Store status and byte count in the IRP and complete it. */
630 Irp
->IoStatus
.Status
= Status
;
631 Irp
->IoStatus
.Information
= Information
;
633 IoCompleteRequest(Irp
, IO_NO_INCREMENT
);
635 DPRINT("NpfsWrite done (Status %lx)\n", Status
);