[NPFS]
[reactos.git] / reactos / drivers / filesystems / npfs / rw.c
1 /*
2 * COPYRIGHT: See COPYING in the top level directory
3 * PROJECT: ReactOS kernel
4 * FILE: drivers/filesystems/npfs/rw.c
5 * PURPOSE: Named pipe filesystem
6 * PROGRAMMER: David Welch <welch@cwcom.net>
7 * Michael Martin
8 */
9
10 /* INCLUDES ******************************************************************/
11
12 #include "npfs.h"
13
14 #define NDEBUG
15 #include <debug.h>
16
17 /* FUNCTIONS *****************************************************************/
18
19 VOID HexDump(PUCHAR Buffer, ULONG Length)
20 {
21 CHAR Line[65];
22 UCHAR ch;
23 const char Hex[] = "0123456789ABCDEF";
24 ULONG i, j;
25
26 DbgPrint("---------------\n");
27
28 for (i = 0; i < Length; i+= 16)
29 {
30 memset(Line, ' ', 64);
31 Line[64] = 0;
32
33 for (j = 0; j < 16 && j + i < Length; j++)
34 {
35 ch = Buffer[i + j];
36 Line[3*j + 0] = Hex[ch >> 4];
37 Line[3*j + 1] = Hex[ch & 0x0f];
38 Line[48 + j] = isprint(ch) ? ch : '.';
39 }
40 DbgPrint("%s\n", Line);
41 }
42 DbgPrint("---------------\n");
43 }
44
45 static DRIVER_CANCEL NpfsReadWriteCancelRoutine;
46 static VOID NTAPI
47 NpfsReadWriteCancelRoutine(IN PDEVICE_OBJECT DeviceObject,
48 IN PIRP Irp)
49 {
50 PNPFS_CONTEXT Context;
51 PIO_STACK_LOCATION IoStack;
52 PNPFS_VCB Vcb;
53 PNPFS_CCB Ccb;
54 PLIST_ENTRY ListEntry;
55 PNPFS_THREAD_CONTEXT ThreadContext;
56 ULONG i;
57
58 DPRINT("NpfsReadWriteCancelRoutine(DeviceObject %p, Irp %p)\n", DeviceObject, Irp);
59
60 IoReleaseCancelSpinLock(Irp->CancelIrql);
61
62 Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
63 Vcb = (PNPFS_VCB)DeviceObject->DeviceExtension;
64 IoStack = IoGetCurrentIrpStackLocation(Irp);
65 Ccb = IoStack->FileObject->FsContext2;
66
67 KeLockMutex(&Vcb->PipeListLock);
68 ExAcquireFastMutex(&Ccb->DataListLock);
69 switch(IoStack->MajorFunction)
70 {
71 case IRP_MJ_READ:
72 ListEntry = Vcb->ThreadListHead.Flink;
73 while (ListEntry != &Vcb->ThreadListHead)
74 {
75 ThreadContext = CONTAINING_RECORD(ListEntry, NPFS_THREAD_CONTEXT, ListEntry);
76 /* Real events start at index 1 */
77 for (i = 1; i < ThreadContext->Count; i++)
78 {
79 if (ThreadContext->WaitIrpArray[i] == Irp)
80 {
81 ASSERT(ThreadContext->WaitObjectArray[i] == Context->WaitEvent);
82
83 ThreadContext->WaitIrpArray[i] = NULL;
84
85 RemoveEntryList(&Context->ListEntry);
86
87 Irp->IoStatus.Status = STATUS_CANCELLED;
88 Irp->IoStatus.Information = 0;
89
90 IoCompleteRequest(Irp, IO_NO_INCREMENT);
91
92 KeSetEvent(&ThreadContext->Event, IO_NO_INCREMENT, FALSE);
93
94 ExReleaseFastMutex(&Ccb->DataListLock);
95 KeUnlockMutex(&Vcb->PipeListLock);
96
97 return;
98 }
99 }
100 ListEntry = ListEntry->Flink;
101 }
102
103 RemoveEntryList(&Context->ListEntry);
104
105 ExReleaseFastMutex(&Ccb->DataListLock);
106 KeUnlockMutex(&Vcb->PipeListLock);
107
108 Irp->IoStatus.Status = STATUS_CANCELLED;
109 Irp->IoStatus.Information = 0;
110
111 IoCompleteRequest(Irp, IO_NO_INCREMENT);
112 break;
113 default:
114 ASSERT(FALSE);
115 }
116 }
117
118 static KSTART_ROUTINE NpfsWaiterThread;
119 static VOID NTAPI
120 NpfsWaiterThread(PVOID InitContext)
121 {
122 PNPFS_THREAD_CONTEXT ThreadContext = (PNPFS_THREAD_CONTEXT) InitContext;
123 ULONG CurrentCount;
124 ULONG Count = 0, i;
125 PIRP Irp = NULL;
126 NTSTATUS Status;
127 PIO_STACK_LOCATION IoStack = NULL;
128 KIRQL OldIrql;
129
130 KeLockMutex(&ThreadContext->Vcb->PipeListLock);
131
132 while (1)
133 {
134 CurrentCount = ThreadContext->Count;
135 KeUnlockMutex(&ThreadContext->Vcb->PipeListLock);
136 IoAcquireCancelSpinLock(&OldIrql);
137 if (Irp && IoSetCancelRoutine(Irp, NULL) != NULL)
138 {
139 IoReleaseCancelSpinLock(OldIrql);
140 IoStack = IoGetCurrentIrpStackLocation(Irp);
141 switch (IoStack->MajorFunction)
142 {
143 case IRP_MJ_READ:
144 NpfsRead(IoStack->DeviceObject, Irp);
145 break;
146 default:
147 ASSERT(FALSE);
148 }
149 }
150 else
151 {
152 IoReleaseCancelSpinLock(OldIrql);
153 }
154 Status = KeWaitForMultipleObjects(CurrentCount,
155 ThreadContext->WaitObjectArray,
156 WaitAny,
157 Executive,
158 KernelMode,
159 FALSE,
160 NULL,
161 ThreadContext->WaitBlockArray);
162 if (!NT_SUCCESS(Status))
163 {
164 ASSERT(FALSE);
165 }
166 KeLockMutex(&ThreadContext->Vcb->PipeListLock);
167 Count = Status - STATUS_WAIT_0;
168 ASSERT (Count < CurrentCount);
169 if (Count > 0)
170 {
171 Irp = ThreadContext->WaitIrpArray[Count];
172 ThreadContext->Count--;
173 ThreadContext->Vcb->EmptyWaiterCount++;
174 ThreadContext->WaitObjectArray[Count] = ThreadContext->WaitObjectArray[ThreadContext->Count];
175 ThreadContext->WaitIrpArray[Count] = ThreadContext->WaitIrpArray[ThreadContext->Count];
176 }
177 else
178 {
179 /* someone has added a new wait request or cancelled an old one */
180 Irp = NULL;
181
182 /* Look for cancelled requests */
183 for (i = 1; i < ThreadContext->Count; i++)
184 {
185 if (ThreadContext->WaitIrpArray[i] == NULL)
186 {
187 ThreadContext->Count--;
188 ThreadContext->Vcb->EmptyWaiterCount++;
189 ThreadContext->WaitObjectArray[i] = ThreadContext->WaitObjectArray[ThreadContext->Count];
190 ThreadContext->WaitIrpArray[i] = ThreadContext->WaitIrpArray[ThreadContext->Count];
191 }
192 }
193 }
194 if (ThreadContext->Count == 1 && ThreadContext->Vcb->EmptyWaiterCount >= MAXIMUM_WAIT_OBJECTS)
195 {
196 /* there is another thread with empty wait slots, we can remove our thread from the list */
197 RemoveEntryList(&ThreadContext->ListEntry);
198 ExFreePoolWithTag(ThreadContext, TAG_NPFS_THREAD_CONTEXT);
199 KeUnlockMutex(&ThreadContext->Vcb->PipeListLock);
200 break;
201 }
202 }
203 }
204
205 static NTSTATUS
206 NpfsAddWaitingReadWriteRequest(IN PDEVICE_OBJECT DeviceObject,
207 IN PIRP Irp)
208 {
209 PLIST_ENTRY ListEntry;
210 PNPFS_THREAD_CONTEXT ThreadContext = NULL;
211 PNPFS_CONTEXT Context;
212 HANDLE hThread;
213 PNPFS_VCB Vcb;
214 KIRQL oldIrql;
215 NTSTATUS Status;
216
217 Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
218 Vcb = (PNPFS_VCB)DeviceObject->DeviceExtension;
219
220 DPRINT("NpfsAddWaitingReadWriteRequest(DeviceObject %p, Irp %p)\n", DeviceObject, Irp);
221
222 KeLockMutex(&Vcb->PipeListLock);
223
224 ListEntry = Vcb->ThreadListHead.Flink;
225 while (ListEntry != &Vcb->ThreadListHead)
226 {
227 ThreadContext = CONTAINING_RECORD(ListEntry, NPFS_THREAD_CONTEXT, ListEntry);
228 if (ThreadContext->Count < MAXIMUM_WAIT_OBJECTS)
229 {
230 break;
231 }
232 ListEntry = ListEntry->Flink;
233 }
234
235 if (ListEntry == &Vcb->ThreadListHead)
236 {
237 ThreadContext = ExAllocatePoolWithTag(NonPagedPool,
238 sizeof(NPFS_THREAD_CONTEXT),
239 TAG_NPFS_THREAD_CONTEXT);
240 if (ThreadContext == NULL)
241 {
242 KeUnlockMutex(&Vcb->PipeListLock);
243 return STATUS_NO_MEMORY;
244 }
245
246 ThreadContext->Vcb = Vcb;
247 KeInitializeEvent(&ThreadContext->Event, SynchronizationEvent, FALSE);
248 ThreadContext->Count = 1;
249 ThreadContext->WaitObjectArray[0] = &ThreadContext->Event;
250
251 DPRINT("Creating a new system thread for waiting read/write requests\n");
252
253 Status = PsCreateSystemThread(&hThread,
254 THREAD_ALL_ACCESS,
255 NULL,
256 NULL,
257 NULL,
258 NpfsWaiterThread,
259 (PVOID)ThreadContext);
260 if (!NT_SUCCESS(Status))
261 {
262 ExFreePoolWithTag(ThreadContext, TAG_NPFS_THREAD_CONTEXT);
263 KeUnlockMutex(&Vcb->PipeListLock);
264 return Status;
265 }
266
267 InsertHeadList(&Vcb->ThreadListHead, &ThreadContext->ListEntry);
268 Vcb->EmptyWaiterCount += MAXIMUM_WAIT_OBJECTS - 1;
269 }
270 IoMarkIrpPending(Irp);
271
272 IoAcquireCancelSpinLock(&oldIrql);
273 if (Irp->Cancel)
274 {
275 IoReleaseCancelSpinLock(oldIrql);
276 Status = STATUS_CANCELLED;
277 }
278 else
279 {
280 (void)IoSetCancelRoutine(Irp, NpfsReadWriteCancelRoutine);
281 IoReleaseCancelSpinLock(oldIrql);
282 ThreadContext->WaitObjectArray[ThreadContext->Count] = Context->WaitEvent;
283 ThreadContext->WaitIrpArray[ThreadContext->Count] = Irp;
284 ThreadContext->Count++;
285 Vcb->EmptyWaiterCount--;
286 KeSetEvent(&ThreadContext->Event, IO_NO_INCREMENT, FALSE);
287 Status = STATUS_SUCCESS;
288 }
289 KeUnlockMutex(&Vcb->PipeListLock);
290 return Status;
291 }
292
293 NTSTATUS NTAPI
294 NpfsRead(IN PDEVICE_OBJECT DeviceObject,
295 IN PIRP Irp)
296 {
297 PFILE_OBJECT FileObject;
298 NTSTATUS Status;
299 NTSTATUS OriginalStatus = STATUS_SUCCESS;
300 PNPFS_CCB Ccb;
301 PNPFS_CONTEXT Context;
302 KEVENT Event;
303 ULONG Length;
304 ULONG Information = 0;
305 ULONG CopyLength = 0;
306 ULONG TempLength;
307 BOOLEAN IsOriginalRequest = TRUE;
308 PVOID Buffer;
309
310 DPRINT("NpfsRead(DeviceObject %p, Irp %p)\n", DeviceObject, Irp);
311
312 FileObject = IoGetCurrentIrpStackLocation(Irp)->FileObject;
313 DPRINT("FileObject %p\n", FileObject);
314 DPRINT("Pipe name %wZ\n", &FileObject->FileName);
315 Ccb = FileObject->FsContext2;
316
317 /* Fail, if the CCB is not a pipe CCB */
318 if (Ccb->Type != CCB_PIPE)
319 {
320 DPRINT("Not a pipe!\n");
321 Status = STATUS_INVALID_PARAMETER;
322 Irp->IoStatus.Information = 0;
323 goto done;
324 }
325
326 if (Irp->MdlAddress == NULL)
327 {
328 DPRINT("Irp->MdlAddress == NULL\n");
329 Status = STATUS_UNSUCCESSFUL;
330 Irp->IoStatus.Information = 0;
331 goto done;
332 }
333
334 Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
335
336 if ((Ccb->OtherSide) && (Ccb->OtherSide->PipeState == FILE_PIPE_DISCONNECTED_STATE) && (Ccb->PipeState == FILE_PIPE_DISCONNECTED_STATE))
337 {
338 DPRINT("Both Client and Server are disconnected!\n");
339 Status = STATUS_PIPE_DISCONNECTED;
340 Irp->IoStatus.Information = 0;
341 goto done;
342
343 }
344
345 if ((Ccb->OtherSide == NULL) && (Ccb->ReadDataAvailable == 0))
346 {
347 if (Ccb->PipeState == FILE_PIPE_CONNECTED_STATE)
348 Status = STATUS_PIPE_BROKEN;
349 else if (Ccb->PipeState == FILE_PIPE_LISTENING_STATE)
350 Status = STATUS_PIPE_LISTENING;
351 else if (Ccb->PipeState == FILE_PIPE_DISCONNECTED_STATE)
352 Status = STATUS_PIPE_DISCONNECTED;
353 else
354 Status = STATUS_UNSUCCESSFUL;
355 Irp->IoStatus.Information = 0;
356 goto done;
357 }
358
359 if (Ccb->Data == NULL)
360 {
361 DPRINT("Pipe is NOT readable!\n");
362 Status = STATUS_UNSUCCESSFUL;
363 Irp->IoStatus.Information = 0;
364 goto done;
365 }
366
367 ExAcquireFastMutex(&Ccb->DataListLock);
368
369 if (IoIsOperationSynchronous(Irp))
370 {
371 InsertTailList(&Ccb->ReadRequestListHead, &Context->ListEntry);
372 if (Ccb->ReadRequestListHead.Flink != &Context->ListEntry)
373 {
374 KeInitializeEvent(&Event, SynchronizationEvent, FALSE);
375 Context->WaitEvent = &Event;
376 ExReleaseFastMutex(&Ccb->DataListLock);
377 KeWaitForSingleObject(&Event,
378 Executive,
379 KernelMode,
380 FALSE,
381 NULL);
382 ExAcquireFastMutex(&Ccb->DataListLock);
383 }
384 Irp->IoStatus.Information = 0;
385 }
386 else
387 {
388 KIRQL oldIrql;
389 if (IsListEmpty(&Ccb->ReadRequestListHead) ||
390 Ccb->ReadRequestListHead.Flink != &Context->ListEntry)
391 {
392 /* this is a new request */
393 Irp->IoStatus.Information = 0;
394 Context->WaitEvent = &Ccb->ReadEvent;
395 InsertTailList(&Ccb->ReadRequestListHead, &Context->ListEntry);
396 if (Ccb->ReadRequestListHead.Flink != &Context->ListEntry)
397 {
398 /* there was already a request on the list */
399 IoAcquireCancelSpinLock(&oldIrql);
400 if (Irp->Cancel)
401 {
402 IoReleaseCancelSpinLock(oldIrql);
403 RemoveEntryList(&Context->ListEntry);
404 ExReleaseFastMutex(&Ccb->DataListLock);
405 Status = STATUS_CANCELLED;
406 goto done;
407 }
408 (void)IoSetCancelRoutine(Irp, NpfsReadWriteCancelRoutine);
409 IoReleaseCancelSpinLock(oldIrql);
410 ExReleaseFastMutex(&Ccb->DataListLock);
411 IoMarkIrpPending(Irp);
412 Status = STATUS_PENDING;
413 goto done;
414 }
415 }
416 }
417
418 while (1)
419 {
420 Buffer = MmGetSystemAddressForMdlSafe(Irp->MdlAddress,
421 NormalPagePriority);
422 Information = Irp->IoStatus.Information;
423 Length = IoGetCurrentIrpStackLocation(Irp)->Parameters.Read.Length;
424 ASSERT(Information <= Length);
425 Buffer = (PVOID)((ULONG_PTR)Buffer + Information);
426 Length -= Information;
427 Status = STATUS_SUCCESS;
428
429 while (1)
430 {
431 if (Ccb->ReadDataAvailable == 0)
432 {
433 ULONG ConnectionSideReadMode;
434
435 if (Ccb->PipeEnd == FILE_PIPE_CLIENT_END) ConnectionSideReadMode=Ccb->Fcb->ClientReadMode;
436 else ConnectionSideReadMode = Ccb->Fcb->ServerReadMode;
437
438 if ((Ccb->PipeState == FILE_PIPE_CONNECTED_STATE) && (Ccb->OtherSide))
439 {
440 KeSetEvent(&Ccb->OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE);
441 }
442 if (Information > 0 &&
443 (ConnectionSideReadMode != FILE_PIPE_BYTE_STREAM_MODE ||
444 Ccb->PipeState != FILE_PIPE_CONNECTED_STATE))
445 {
446 break;
447 }
448 ASSERT(Ccb->ReadDataAvailable == 0);
449 if ((Ccb->PipeState != FILE_PIPE_CONNECTED_STATE) || (!Ccb->OtherSide))
450 {
451 DPRINT("PipeState: %x\n", Ccb->PipeState);
452 Status = STATUS_PIPE_BROKEN;
453 break;
454 }
455 ExReleaseFastMutex(&Ccb->DataListLock);
456
457 if (IoIsOperationSynchronous(Irp))
458 {
459 /* Wait for ReadEvent to become signaled */
460
461 DPRINT("Waiting for readable data (%wZ)\n", &Ccb->Fcb->PipeName);
462 Status = KeWaitForSingleObject(&Ccb->ReadEvent,
463 UserRequest,
464 Irp->RequestorMode,
465 (FileObject->Flags & FO_ALERTABLE_IO) != 0,
466 NULL);
467 DPRINT("Finished waiting (%wZ)! Status: %lx\n", &Ccb->Fcb->PipeName, Status);
468
469 ExAcquireFastMutex(&Ccb->DataListLock);
470
471 if ((Status == STATUS_USER_APC) || (Status == STATUS_KERNEL_APC) || (Status == STATUS_ALERTED))
472 {
473 Status = STATUS_CANCELLED;
474 break;
475 }
476 if (!NT_SUCCESS(Status))
477 {
478 ASSERT(FALSE);
479 }
480 }
481 else
482 {
483 Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
484
485 Context->WaitEvent = &Ccb->ReadEvent;
486 Status = NpfsAddWaitingReadWriteRequest(DeviceObject, Irp);
487
488 if (NT_SUCCESS(Status))
489 {
490 Status = STATUS_PENDING;
491 goto done;
492 }
493 ExAcquireFastMutex(&Ccb->DataListLock);
494 break;
495 }
496 }
497 ASSERT(IoGetCurrentIrpStackLocation(Irp)->FileObject != NULL);
498
499 /* If the pipe type and read mode are both byte stream */
500 if (Ccb->Fcb->PipeType == FILE_PIPE_BYTE_STREAM_TYPE)
501 {
502 DPRINT("Byte stream mode: Ccb->Data %x\n", Ccb->Data);
503 /* Byte stream mode */
504 while (Length > 0 && Ccb->ReadDataAvailable > 0)
505 {
506 CopyLength = min(Ccb->ReadDataAvailable, Length);
507 if ((ULONG_PTR)Ccb->ReadPtr + CopyLength <= (ULONG_PTR)Ccb->Data + Ccb->MaxDataLength)
508 {
509 memcpy(Buffer, Ccb->ReadPtr, CopyLength);
510 Ccb->ReadPtr = (PVOID)((ULONG_PTR)Ccb->ReadPtr + CopyLength);
511 if (Ccb->ReadPtr == (PVOID)((ULONG_PTR)Ccb->Data + Ccb->MaxDataLength))
512 {
513 Ccb->ReadPtr = Ccb->Data;
514 }
515 }
516 else
517 {
518 TempLength = (ULONG)((ULONG_PTR)Ccb->Data + Ccb->MaxDataLength - (ULONG_PTR)Ccb->ReadPtr);
519 memcpy(Buffer, Ccb->ReadPtr, TempLength);
520 memcpy((PVOID)((ULONG_PTR)Buffer + TempLength), Ccb->Data, CopyLength - TempLength);
521 Ccb->ReadPtr = (PVOID)((ULONG_PTR)Ccb->Data + CopyLength - TempLength);
522 }
523
524 Buffer = (PVOID)((ULONG_PTR)Buffer + CopyLength);
525 Length -= CopyLength;
526 Information += CopyLength;
527
528 Ccb->ReadDataAvailable -= CopyLength;
529 Ccb->WriteQuotaAvailable += CopyLength;
530 }
531
532 if ((Length == 0) || (Ccb->ReadDataAvailable == 0))
533 {
534 if ((Ccb->PipeState == FILE_PIPE_CONNECTED_STATE) && (Ccb->OtherSide))
535 {
536 KeSetEvent(&Ccb->OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE);
537 }
538 KeResetEvent(&Ccb->ReadEvent);
539 break;
540 }
541 }
542 else if (Ccb->Fcb->PipeType == FILE_PIPE_MESSAGE_TYPE)
543 {
544 DPRINT("Message mode: Ccb>Data %x\n", Ccb->Data);
545
546 /* Check if buffer is full and the read pointer is not at the start of the buffer */
547 if ((Ccb->WriteQuotaAvailable == 0) && (Ccb->ReadPtr > Ccb->Data))
548 {
549 Ccb->WriteQuotaAvailable += (ULONG_PTR)Ccb->ReadPtr - (ULONG_PTR)Ccb->Data;
550 memcpy(Ccb->Data, Ccb->ReadPtr, (ULONG_PTR)Ccb->WritePtr - (ULONG_PTR)Ccb->ReadPtr);
551 Ccb->WritePtr = (PVOID)((ULONG_PTR)Ccb->WritePtr - ((ULONG_PTR)Ccb->ReadPtr - (ULONG_PTR)Ccb->Data));
552 Ccb->ReadPtr = Ccb->Data;
553 ASSERT((ULONG_PTR)Ccb->WritePtr < ((ULONG_PTR)Ccb->Data + Ccb->MaxDataLength));
554 ASSERT(Ccb->WritePtr >= Ccb->Data);
555 }
556
557 /* For Message mode, the Message length is stored in the buffer preceeding the Message. */
558 if (Ccb->ReadDataAvailable)
559 {
560 ULONG NextMessageLength = 0;
561
562 /* First get the size of the message */
563 memcpy(&NextMessageLength, Ccb->ReadPtr, sizeof(NextMessageLength));
564
565 if ((NextMessageLength == 0) || (NextMessageLength > Ccb->ReadDataAvailable))
566 {
567 DPRINT1("Possible memory corruption.\n");
568 HexDump(Ccb->Data, (ULONG_PTR)Ccb->WritePtr - (ULONG_PTR)Ccb->Data);
569 ASSERT(FALSE);
570 }
571
572 /* Use the smaller value */
573 CopyLength = min(NextMessageLength, Length);
574 ASSERT(CopyLength > 0);
575 ASSERT(CopyLength <= Ccb->ReadDataAvailable);
576 /* retrieve the message from the buffer */
577 memcpy(Buffer, (PVOID)((ULONG_PTR)Ccb->ReadPtr + sizeof(NextMessageLength)), CopyLength);
578
579 if (Ccb->ReadDataAvailable > CopyLength)
580 {
581 if (CopyLength < NextMessageLength)
582 /* Client only requested part of the message */
583 {
584 /* Calculate the remaining message new size */
585 ULONG NewMessageSize = NextMessageLength - CopyLength;
586
587 /* Update ReadPtr to point to new Message size location */
588 Ccb->ReadPtr = (PVOID)((ULONG_PTR)Ccb->ReadPtr + CopyLength);
589
590 /* Write a new Message size to buffer for the part of the message still there */
591 memcpy(Ccb->ReadPtr, &NewMessageSize, sizeof(NewMessageSize));
592 }
593 else
594 /* Client wanted the entire message */
595 {
596 /* Update ReadPtr to point to next message size */
597 Ccb->ReadPtr = (PVOID)((ULONG_PTR)Ccb->ReadPtr + CopyLength + sizeof(CopyLength));
598 }
599 }
600 else
601 {
602 /* This was the last Message, so just zero start of buffer for safety sake */
603 memset(Ccb->Data, 0, NextMessageLength + sizeof(NextMessageLength));
604
605 /* Reset to MaxDataLength as partial message retrievals dont
606 give the length back to Quota */
607 Ccb->WriteQuotaAvailable = Ccb->MaxDataLength;
608
609 /* reset read and write pointer to beginning of buffer */
610 Ccb->WritePtr = Ccb->Data;
611 Ccb->ReadPtr = Ccb->Data;
612 }
613 #ifndef NDEBUG
614 DPRINT("Length %d Buffer %x\n",CopyLength,Buffer);
615 HexDump((PUCHAR)Buffer, CopyLength);
616 #endif
617
618 Information += CopyLength;
619
620 Ccb->ReadDataAvailable -= CopyLength;
621
622 ASSERT(Ccb->WriteQuotaAvailable <= Ccb->MaxDataLength);
623 }
624
625 if (Information > 0)
626 {
627 ULONG ConnectionSideReadMode;
628
629 if (Ccb->PipeEnd == FILE_PIPE_CLIENT_END) ConnectionSideReadMode=Ccb->Fcb->ClientReadMode;
630 else ConnectionSideReadMode = Ccb->Fcb->ServerReadMode;
631
632 if ((ConnectionSideReadMode == FILE_PIPE_BYTE_STREAM_MODE) && (Ccb->ReadDataAvailable) && (Length > CopyLength))
633 {
634 Buffer = (PVOID)((ULONG_PTR)Buffer + CopyLength);
635 Length -= CopyLength;
636 }
637 else
638 {
639 KeResetEvent(&Ccb->ReadEvent);
640
641 if ((Ccb->PipeState == FILE_PIPE_CONNECTED_STATE) && (Ccb->WriteQuotaAvailable > 0) && (Ccb->OtherSide))
642 {
643 KeSetEvent(&Ccb->OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE);
644 }
645 break;
646 }
647 }
648 }
649 else
650 {
651 DPRINT1("Unhandled Pipe Mode!\n");
652 ASSERT(FALSE);
653 }
654 }
655 Irp->IoStatus.Information = Information;
656 Irp->IoStatus.Status = Status;
657
658 ASSERT(IoGetCurrentIrpStackLocation(Irp)->FileObject != NULL);
659
660 if (IoIsOperationSynchronous(Irp))
661 {
662 RemoveEntryList(&Context->ListEntry);
663 if (!IsListEmpty(&Ccb->ReadRequestListHead))
664 {
665 Context = CONTAINING_RECORD(Ccb->ReadRequestListHead.Flink, NPFS_CONTEXT, ListEntry);
666 KeSetEvent(Context->WaitEvent, IO_NO_INCREMENT, FALSE);
667 }
668 ExReleaseFastMutex(&Ccb->DataListLock);
669 IoCompleteRequest(Irp, IO_NO_INCREMENT);
670
671 DPRINT("NpfsRead done (Status %lx)\n", Status);
672 return Status;
673 }
674 else
675 {
676 KIRQL oldIrql;
677
678 if (IsOriginalRequest)
679 {
680 IsOriginalRequest = FALSE;
681 OriginalStatus = Status;
682 }
683 if (Status == STATUS_PENDING)
684 {
685 ExReleaseFastMutex(&Ccb->DataListLock);
686 DPRINT("NpfsRead done (Status %lx)\n", OriginalStatus);
687 return OriginalStatus;
688 }
689 RemoveEntryList(&Context->ListEntry);
690 IoCompleteRequest(Irp, IO_NO_INCREMENT);
691 if (IsListEmpty(&Ccb->ReadRequestListHead))
692 {
693 ExReleaseFastMutex(&Ccb->DataListLock);
694 DPRINT("NpfsRead done (Status %lx)\n", OriginalStatus);
695 return OriginalStatus;
696 }
697
698 IoAcquireCancelSpinLock(&oldIrql);
699 Context = CONTAINING_RECORD(Ccb->ReadRequestListHead.Flink, NPFS_CONTEXT, ListEntry);
700
701 Irp = CONTAINING_RECORD(Context, IRP, Tail.Overlay.DriverContext);
702 /* Verify the Irp wasnt cancelled */
703 if (Irp->Cancel)
704 {
705 IoReleaseCancelSpinLock(oldIrql);
706 RemoveEntryList(&Context->ListEntry);
707 ExReleaseFastMutex(&Ccb->DataListLock);
708 Status = STATUS_CANCELLED;
709 goto done;
710 }
711 /* The Irp will now be handled, so remove the CancelRoutine */
712 (void)IoSetCancelRoutine(Irp, NULL);
713 IoReleaseCancelSpinLock(oldIrql);
714 }
715 }
716
717 done:
718 Irp->IoStatus.Status = Status;
719
720 if (Status != STATUS_PENDING)
721 {
722 IoCompleteRequest(Irp, IO_NO_INCREMENT);
723 }
724 DPRINT("NpfsRead done (Status %lx)\n", Status);
725
726 return Status;
727 }
728
729 NTSTATUS NTAPI
730 NpfsWrite(PDEVICE_OBJECT DeviceObject,
731 PIRP Irp)
732 {
733 PIO_STACK_LOCATION IoStack;
734 PFILE_OBJECT FileObject;
735 PNPFS_FCB Fcb = NULL;
736 PNPFS_CCB Ccb = NULL;
737 PNPFS_CCB ReaderCcb;
738 PUCHAR Buffer;
739 NTSTATUS Status = STATUS_SUCCESS;
740 ULONG Length;
741 ULONG Offset;
742 ULONG Information = 0;
743 ULONG CopyLength;
744 ULONG TempLength;
745
746 UNREFERENCED_PARAMETER(DeviceObject);
747
748 DPRINT("NpfsWrite()\n");
749
750 IoStack = IoGetCurrentIrpStackLocation(Irp);
751 FileObject = IoStack->FileObject;
752 DPRINT("FileObject %p\n", FileObject);
753 DPRINT("Pipe name %wZ\n", &FileObject->FileName);
754
755 Ccb = FileObject->FsContext2;
756
757 /* Fail, if the CCB is not a pipe CCB */
758 if (Ccb->Type != CCB_PIPE)
759 {
760 DPRINT("Not a pipe!\n");
761 Status = STATUS_INVALID_PARAMETER;
762 Length = 0;
763 goto done;
764 }
765
766 ReaderCcb = Ccb->OtherSide;
767 Fcb = Ccb->Fcb;
768
769 Length = IoStack->Parameters.Write.Length;
770 Offset = IoStack->Parameters.Write.ByteOffset.u.LowPart;
771
772 if (Irp->MdlAddress == NULL)
773 {
774 DPRINT("Irp->MdlAddress == NULL\n");
775 Status = STATUS_UNSUCCESSFUL;
776 Length = 0;
777 goto done;
778 }
779
780 if ((ReaderCcb == NULL) || (Ccb->PipeState != FILE_PIPE_CONNECTED_STATE))
781 {
782 DPRINT("Pipe is NOT connected!\n");
783 if (Ccb->PipeState == FILE_PIPE_LISTENING_STATE)
784 Status = STATUS_PIPE_LISTENING;
785 else if (Ccb->PipeState == FILE_PIPE_DISCONNECTED_STATE)
786 Status = STATUS_PIPE_DISCONNECTED;
787 else
788 Status = STATUS_UNSUCCESSFUL;
789 Length = 0;
790 goto done;
791 }
792
793 if (ReaderCcb->Data == NULL)
794 {
795 DPRINT("Pipe is NOT writable!\n");
796 Status = STATUS_UNSUCCESSFUL;
797 Length = 0;
798 goto done;
799 }
800
801 Status = STATUS_SUCCESS;
802 Buffer = MmGetSystemAddressForMdlSafe(Irp->MdlAddress, NormalPagePriority);
803
804 if (!Buffer)
805 {
806 DPRINT("MmGetSystemAddressForMdlSafe failed\n");
807 Status = STATUS_INSUFFICIENT_RESOURCES;
808 Length = 0;
809 goto done;
810
811 }
812
813 ExAcquireFastMutex(&ReaderCcb->DataListLock);
814
815 DPRINT("Length %d Buffer %x Offset %x\n",Length,Buffer,Offset);
816
817 #ifndef NDEBUG
818 HexDump(Buffer, Length);
819 #endif
820
821 while (1)
822 {
823 if (ReaderCcb->WriteQuotaAvailable == 0)
824 {
825 if (Ccb->PipeState != FILE_PIPE_CONNECTED_STATE || !Ccb->OtherSide)
826 {
827 Status = STATUS_PIPE_BROKEN;
828 ExReleaseFastMutex(&ReaderCcb->DataListLock);
829 goto done;
830 }
831 KeSetEvent(&ReaderCcb->ReadEvent, IO_NO_INCREMENT, FALSE);
832 ExReleaseFastMutex(&ReaderCcb->DataListLock);
833
834 DPRINT("Write Waiting for buffer space (%wZ)\n", &Fcb->PipeName);
835 Status = KeWaitForSingleObject(&Ccb->WriteEvent,
836 UserRequest,
837 Irp->RequestorMode,
838 (FileObject->Flags & FO_ALERTABLE_IO) != 0,
839 NULL);
840 DPRINT("Write Finished waiting (%wZ)! Status: %lx\n", &Fcb->PipeName, Status);
841
842 if ((Status == STATUS_USER_APC) || (Status == STATUS_KERNEL_APC) || (Status == STATUS_ALERTED))
843 {
844 Status = STATUS_CANCELLED;
845 goto done;
846 }
847 if (!NT_SUCCESS(Status))
848 {
849 ASSERT(FALSE);
850 }
851 /*
852 * It's possible that the event was signaled because the
853 * other side of pipe was closed.
854 */
855 if (Ccb->PipeState != FILE_PIPE_CONNECTED_STATE || !Ccb->OtherSide)
856 {
857 DPRINT("PipeState: %x\n", Ccb->PipeState);
858 Status = STATUS_PIPE_BROKEN;
859 goto done;
860 }
861 /* Check that the pipe has not been closed */
862 if (ReaderCcb->PipeState != FILE_PIPE_CONNECTED_STATE || !ReaderCcb->OtherSide)
863 {
864 Status = STATUS_PIPE_BROKEN;
865 goto done;
866 }
867 ExAcquireFastMutex(&ReaderCcb->DataListLock);
868 }
869
870 if (Ccb->Fcb->PipeType == FILE_PIPE_BYTE_STREAM_TYPE)
871 {
872 DPRINT("Byte stream mode: Ccb->Data %x, Ccb->WritePtr %x\n", ReaderCcb->Data, ReaderCcb->WritePtr);
873
874 while (Length > 0 && ReaderCcb->WriteQuotaAvailable > 0)
875 {
876 CopyLength = min(Length, ReaderCcb->WriteQuotaAvailable);
877
878 if ((ULONG_PTR)ReaderCcb->WritePtr + CopyLength <= (ULONG_PTR)ReaderCcb->Data + ReaderCcb->MaxDataLength)
879 {
880 memcpy(ReaderCcb->WritePtr, Buffer, CopyLength);
881 ReaderCcb->WritePtr = (PVOID)((ULONG_PTR)ReaderCcb->WritePtr + CopyLength);
882 if ((ULONG_PTR)ReaderCcb->WritePtr == (ULONG_PTR)ReaderCcb->Data + ReaderCcb->MaxDataLength)
883 {
884 ReaderCcb->WritePtr = ReaderCcb->Data;
885 }
886 }
887 else
888 {
889
890 TempLength = (ULONG)((ULONG_PTR)ReaderCcb->Data + ReaderCcb->MaxDataLength -
891 (ULONG_PTR)ReaderCcb->WritePtr);
892
893 memcpy(ReaderCcb->WritePtr, Buffer, TempLength);
894 memcpy(ReaderCcb->Data, Buffer + TempLength, CopyLength - TempLength);
895 ReaderCcb->WritePtr = (PVOID)((ULONG_PTR)ReaderCcb->Data + CopyLength - TempLength);
896 }
897
898 Buffer += CopyLength;
899 Length -= CopyLength;
900 Information += CopyLength;
901
902 ReaderCcb->ReadDataAvailable += CopyLength;
903 ReaderCcb->WriteQuotaAvailable -= CopyLength;
904 }
905
906 if (Length == 0)
907 {
908 KeSetEvent(&ReaderCcb->ReadEvent, IO_NO_INCREMENT, FALSE);
909 KeResetEvent(&Ccb->WriteEvent);
910 break;
911 }
912 }
913 else if (Ccb->Fcb->PipeType == FILE_PIPE_MESSAGE_TYPE)
914 {
915 /* For Message Type Pipe, the Pipes memory will be used to store the size of each message */
916 DPRINT("Message mode: Ccb->Data %x, Ccb->WritePtr %x\n",ReaderCcb->Data, ReaderCcb->WritePtr);
917 if (Length > 0)
918 {
919 /* Verify the WritePtr is still inside the buffer */
920 if (((ULONG_PTR)ReaderCcb->WritePtr > ((ULONG_PTR)ReaderCcb->Data + (ULONG_PTR)ReaderCcb->MaxDataLength)) ||
921 ((ULONG_PTR)ReaderCcb->WritePtr < (ULONG_PTR)ReaderCcb->Data))
922 {
923 DPRINT1("NPFS is writing out of its buffer. Report to developer!\n");
924 DPRINT1("ReaderCcb->WritePtr %x, ReaderCcb->Data %x, ReaderCcb->MaxDataLength %lu\n",
925 ReaderCcb->WritePtr, ReaderCcb->Data, ReaderCcb->MaxDataLength);
926 ASSERT(FALSE);
927 }
928
929 CopyLength = min(Length, ReaderCcb->WriteQuotaAvailable - sizeof(ULONG));
930 if (CopyLength > ReaderCcb->WriteQuotaAvailable)
931 {
932 DPRINT1("Writing %lu byte to pipe would overflow as only %lu bytes are available\n",
933 CopyLength, ReaderCcb->WriteQuotaAvailable);
934 ASSERT(FALSE);
935 }
936
937 /* First Copy the Length of the message into the pipes buffer */
938 memcpy(ReaderCcb->WritePtr, &CopyLength, sizeof(CopyLength));
939
940 /* Now the user buffer itself */
941 memcpy((PVOID)((ULONG_PTR)ReaderCcb->WritePtr + sizeof(CopyLength)), Buffer, CopyLength);
942
943 /* Update the write pointer */
944 ReaderCcb->WritePtr = (PVOID)((ULONG_PTR)ReaderCcb->WritePtr + sizeof(CopyLength) + CopyLength);
945
946 Information += CopyLength;
947
948 ReaderCcb->ReadDataAvailable += CopyLength;
949
950 ReaderCcb->WriteQuotaAvailable -= (CopyLength + sizeof(ULONG));
951
952 if ((ULONG_PTR)ReaderCcb->WriteQuotaAvailable > (ULONG)ReaderCcb->MaxDataLength)
953 {
954 DPRINT1("QuotaAvailable is greater than buffer size!\n");
955 ASSERT(FALSE);
956 }
957 }
958
959 if (Information > 0)
960 {
961 KeSetEvent(&ReaderCcb->ReadEvent, IO_NO_INCREMENT, FALSE);
962 KeResetEvent(&Ccb->WriteEvent);
963 break;
964 }
965 }
966 else
967 {
968 DPRINT1("Unhandled Pipe Type Mode and Read Write Mode!\n");
969 ASSERT(FALSE);
970 }
971 }
972
973 ExReleaseFastMutex(&ReaderCcb->DataListLock);
974
975 done:
976 Irp->IoStatus.Status = Status;
977 Irp->IoStatus.Information = Information;
978
979 IoCompleteRequest(Irp, IO_NO_INCREMENT);
980
981 DPRINT("NpfsWrite done (Status %lx)\n", Status);
982
983 return Status;
984 }
985
986 /* EOF */