Sync to trunk r39350.
[reactos.git] / reactos / drivers / filesystems / npfs / rw.c
1 /*
2 * COPYRIGHT: See COPYING in the top level directory
3 * PROJECT: ReactOS kernel
4 * FILE: drivers/filesystems/npfs/rw.c
5 * PURPOSE: Named pipe filesystem
6 * PROGRAMMER: David Welch <welch@cwcom.net>
7 * Michael Martin
8 */
9
10 /* INCLUDES ******************************************************************/
11
12 #include "npfs.h"
13
14 #define NDEBUG
15 #include <debug.h>
16
17 /* FUNCTIONS *****************************************************************/
18
19 VOID HexDump(PUCHAR Buffer, ULONG Length)
20 {
21 CHAR Line[65];
22 UCHAR ch;
23 const char Hex[] = "0123456789ABCDEF";
24 int i, j;
25
26 DbgPrint("---------------\n");
27
28 for (i = 0; i < Length; i+= 16)
29 {
30 memset(Line, ' ', 64);
31 Line[64] = 0;
32
33 for (j = 0; j < 16 && j + i < Length; j++)
34 {
35 ch = Buffer[i + j];
36 Line[3*j + 0] = Hex[ch >> 4];
37 Line[3*j + 1] = Hex[ch & 0x0f];
38 Line[48 + j] = isprint(ch) ? ch : '.';
39 }
40 DbgPrint("%s\n", Line);
41 }
42 DbgPrint("---------------\n");
43 }
44
45 static DRIVER_CANCEL NpfsReadWriteCancelRoutine;
46 static VOID NTAPI
47 NpfsReadWriteCancelRoutine(IN PDEVICE_OBJECT DeviceObject,
48 IN PIRP Irp)
49 {
50 PNPFS_CONTEXT Context;
51 PNPFS_DEVICE_EXTENSION DeviceExt;
52 PIO_STACK_LOCATION IoStack;
53 PNPFS_CCB Ccb;
54 BOOLEAN Complete = FALSE;
55
56 DPRINT("NpfsReadWriteCancelRoutine(DeviceObject %p, Irp %p)\n", DeviceObject, Irp);
57
58 IoReleaseCancelSpinLock(Irp->CancelIrql);
59
60 Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
61 DeviceExt = (PNPFS_DEVICE_EXTENSION)DeviceObject->DeviceExtension;
62 IoStack = IoGetCurrentIrpStackLocation(Irp);
63 Ccb = IoStack->FileObject->FsContext2;
64
65 KeLockMutex(&DeviceExt->PipeListLock);
66 ExAcquireFastMutex(&Ccb->DataListLock);
67 switch(IoStack->MajorFunction)
68 {
69 case IRP_MJ_READ:
70 if (Ccb->ReadRequestListHead.Flink != &Context->ListEntry)
71 {
72 /* we are not the first in the list, remove an complete us */
73 RemoveEntryList(&Context->ListEntry);
74 Complete = TRUE;
75 }
76 else
77 {
78 KeSetEvent(&Ccb->ReadEvent, IO_NO_INCREMENT, FALSE);
79 }
80 break;
81 default:
82 ASSERT(FALSE);
83 }
84 ExReleaseFastMutex(&Ccb->DataListLock);
85 KeUnlockMutex(&DeviceExt->PipeListLock);
86 if (Complete)
87 {
88 Irp->IoStatus.Status = STATUS_CANCELLED;
89 Irp->IoStatus.Information = 0;
90 IoCompleteRequest(Irp, IO_NO_INCREMENT);
91 }
92 }
93
94 static VOID NTAPI
95 NpfsWaiterThread(PVOID InitContext)
96 {
97 PNPFS_THREAD_CONTEXT ThreadContext = (PNPFS_THREAD_CONTEXT) InitContext;
98 ULONG CurrentCount;
99 ULONG Count = 0;
100 PIRP Irp = NULL;
101 PIRP NextIrp;
102 NTSTATUS Status;
103 BOOLEAN Terminate = FALSE;
104 BOOLEAN Cancel = FALSE;
105 PIO_STACK_LOCATION IoStack = NULL;
106 PNPFS_CONTEXT Context;
107 PNPFS_CONTEXT NextContext;
108 PNPFS_CCB Ccb;
109
110 KeLockMutex(&ThreadContext->DeviceExt->PipeListLock);
111
112 while (1)
113 {
114 CurrentCount = ThreadContext->Count;
115 KeUnlockMutex(&ThreadContext->DeviceExt->PipeListLock);
116 if (Irp)
117 {
118 if (Cancel)
119 {
120 Irp->IoStatus.Status = STATUS_CANCELLED;
121 Irp->IoStatus.Information = 0;
122 IoCompleteRequest(Irp, IO_NO_INCREMENT);
123 }
124 else
125 {
126 switch (IoStack->MajorFunction)
127 {
128 case IRP_MJ_READ:
129 NpfsRead(IoStack->DeviceObject, Irp);
130 break;
131 default:
132 ASSERT(FALSE);
133 }
134 }
135 }
136 if (Terminate)
137 {
138 break;
139 }
140 Status = KeWaitForMultipleObjects(CurrentCount,
141 ThreadContext->WaitObjectArray,
142 WaitAny,
143 Executive,
144 KernelMode,
145 FALSE,
146 NULL,
147 ThreadContext->WaitBlockArray);
148 if (!NT_SUCCESS(Status))
149 {
150 ASSERT(FALSE);
151 }
152 KeLockMutex(&ThreadContext->DeviceExt->PipeListLock);
153 Count = Status - STATUS_SUCCESS;
154 ASSERT (Count < CurrentCount);
155 if (Count > 0)
156 {
157 Irp = ThreadContext->WaitIrpArray[Count];
158 ThreadContext->Count--;
159 ThreadContext->DeviceExt->EmptyWaiterCount++;
160 ThreadContext->WaitObjectArray[Count] = ThreadContext->WaitObjectArray[ThreadContext->Count];
161 ThreadContext->WaitIrpArray[Count] = ThreadContext->WaitIrpArray[ThreadContext->Count];
162
163 Cancel = (NULL == IoSetCancelRoutine(Irp, NULL));
164 Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
165 IoStack = IoGetCurrentIrpStackLocation(Irp);
166
167 if (Cancel)
168 {
169 Ccb = IoStack->FileObject->FsContext2;
170 ExAcquireFastMutex(&Ccb->DataListLock);
171 RemoveEntryList(&Context->ListEntry);
172 switch (IoStack->MajorFunction)
173 {
174 case IRP_MJ_READ:
175 if (!IsListEmpty(&Ccb->ReadRequestListHead))
176 {
177 /* put the next request on the wait list */
178 NextContext = CONTAINING_RECORD(Ccb->ReadRequestListHead.Flink, NPFS_CONTEXT, ListEntry);
179 ThreadContext->WaitObjectArray[ThreadContext->Count] = NextContext->WaitEvent;
180 NextIrp = CONTAINING_RECORD(NextContext, IRP, Tail.Overlay.DriverContext);
181 ThreadContext->WaitIrpArray[ThreadContext->Count] = NextIrp;
182 ThreadContext->Count++;
183 ThreadContext->DeviceExt->EmptyWaiterCount--;
184 }
185 break;
186 default:
187 ASSERT(FALSE);
188 }
189 ExReleaseFastMutex(&Ccb->DataListLock);
190 }
191 }
192 else
193 {
194 /* someone has add a new wait request */
195 Irp = NULL;
196 }
197 if (ThreadContext->Count == 1 && ThreadContext->DeviceExt->EmptyWaiterCount >= MAXIMUM_WAIT_OBJECTS)
198 {
199 /* it exist an other thread with empty wait slots, we can remove our thread from the list */
200 RemoveEntryList(&ThreadContext->ListEntry);
201 ThreadContext->DeviceExt->EmptyWaiterCount -= MAXIMUM_WAIT_OBJECTS - 1;
202 Terminate = TRUE;
203 }
204 }
205 ExFreePool(ThreadContext);
206 }
207
208 static NTSTATUS
209 NpfsAddWaitingReadWriteRequest(IN PDEVICE_OBJECT DeviceObject,
210 IN PIRP Irp)
211 {
212 PLIST_ENTRY ListEntry;
213 PNPFS_THREAD_CONTEXT ThreadContext = NULL;
214 NTSTATUS Status;
215 HANDLE hThread;
216 KIRQL oldIrql;
217
218 PNPFS_CONTEXT Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
219 PNPFS_DEVICE_EXTENSION DeviceExt = (PNPFS_DEVICE_EXTENSION)DeviceObject->DeviceExtension;
220
221 DPRINT("NpfsAddWaitingReadWriteRequest(DeviceObject %p, Irp %p)\n", DeviceObject, Irp);
222
223 KeLockMutex(&DeviceExt->PipeListLock);
224
225 ListEntry = DeviceExt->ThreadListHead.Flink;
226 while (ListEntry != &DeviceExt->ThreadListHead)
227 {
228 ThreadContext = CONTAINING_RECORD(ListEntry, NPFS_THREAD_CONTEXT, ListEntry);
229 if (ThreadContext->Count < MAXIMUM_WAIT_OBJECTS)
230 {
231 break;
232 }
233 ListEntry = ListEntry->Flink;
234 }
235 if (ListEntry == &DeviceExt->ThreadListHead)
236 {
237 ThreadContext = ExAllocatePool(NonPagedPool, sizeof(NPFS_THREAD_CONTEXT));
238 if (ThreadContext == NULL)
239 {
240 KeUnlockMutex(&DeviceExt->PipeListLock);
241 return STATUS_NO_MEMORY;
242 }
243 ThreadContext->DeviceExt = DeviceExt;
244 KeInitializeEvent(&ThreadContext->Event, SynchronizationEvent, FALSE);
245 ThreadContext->Count = 1;
246 ThreadContext->WaitObjectArray[0] = &ThreadContext->Event;
247
248
249 DPRINT("Creating a new system thread for waiting read/write requests\n");
250
251 Status = PsCreateSystemThread(&hThread,
252 THREAD_ALL_ACCESS,
253 NULL,
254 NULL,
255 NULL,
256 NpfsWaiterThread,
257 (PVOID)ThreadContext);
258 if (!NT_SUCCESS(Status))
259 {
260 ExFreePool(ThreadContext);
261 KeUnlockMutex(&DeviceExt->PipeListLock);
262 return Status;
263 }
264 InsertHeadList(&DeviceExt->ThreadListHead, &ThreadContext->ListEntry);
265 DeviceExt->EmptyWaiterCount += MAXIMUM_WAIT_OBJECTS - 1;
266 }
267 IoMarkIrpPending(Irp);
268
269 IoAcquireCancelSpinLock(&oldIrql);
270 if (Irp->Cancel)
271 {
272 IoReleaseCancelSpinLock(oldIrql);
273 Status = STATUS_CANCELLED;
274 }
275 else
276 {
277 (void)IoSetCancelRoutine(Irp, NpfsReadWriteCancelRoutine);
278 IoReleaseCancelSpinLock(oldIrql);
279 ThreadContext->WaitObjectArray[ThreadContext->Count] = Context->WaitEvent;
280 ThreadContext->WaitIrpArray[ThreadContext->Count] = Irp;
281 ThreadContext->Count++;
282 DeviceExt->EmptyWaiterCount--;
283 KeSetEvent(&ThreadContext->Event, IO_NO_INCREMENT, FALSE);
284 Status = STATUS_SUCCESS;
285 }
286 KeUnlockMutex(&DeviceExt->PipeListLock);
287 return Status;
288 }
289
/*
 * NpfsRead - service a read request on one end of a named pipe.
 *
 * DeviceObject: device the request was sent to; it is only passed on to
 *               NpfsAddWaitingReadWriteRequest() and printed in debug output.
 * Irp:          the read IRP. Data is copied into the buffer described by
 *               Irp->MdlAddress; the requested size is Parameters.Read.Length.
 *
 * Readers of one CCB are serialized through Ccb->ReadRequestListHead: only the
 * request at the head of that list copies data out of Ccb->Data. A synchronous
 * request that is not at the head blocks on a private event until it is
 * signalled; an asynchronous one gets a cancel routine and is pended. When an
 * asynchronous request finishes, the next queued IRP is processed by the same
 * call (the outer loop below switches Irp to it).
 *
 * Returns the status the IRP was completed with, or STATUS_PENDING when the
 * IRP was queued. On the asynchronous completion path the value returned is
 * the status of the first IRP processed by this call (OriginalStatus).
 */
290 NTSTATUS NTAPI
291 NpfsRead(IN PDEVICE_OBJECT DeviceObject,
292 IN PIRP Irp)
293 {
294 PFILE_OBJECT FileObject;
295 NTSTATUS Status;
296 NTSTATUS OriginalStatus = STATUS_SUCCESS;
297 PNPFS_CCB Ccb;
298 PNPFS_CONTEXT Context;
299 KEVENT Event;
300 ULONG Length;
301 ULONG Information = 0;
302 ULONG CopyLength = 0;
303 ULONG TempLength;
304 BOOLEAN IsOriginalRequest = TRUE;
305 PVOID Buffer;
306
307 DPRINT("NpfsRead(DeviceObject %p, Irp %p)\n", DeviceObject, Irp);
308
/* Without an MDL there is no destination buffer to copy into */
309 if (Irp->MdlAddress == NULL)
310 {
311 DPRINT("Irp->MdlAddress == NULL\n");
312 Status = STATUS_UNSUCCESSFUL;
313 Irp->IoStatus.Information = 0;
314 goto done;
315 }
316
317 FileObject = IoGetCurrentIrpStackLocation(Irp)->FileObject;
318 DPRINT("FileObject %p\n", FileObject);
319 DPRINT("Pipe name %wZ\n", &FileObject->FileName);
320 Ccb = FileObject->FsContext2;
/* The per-IRP NPFS context lives in the IRP's DriverContext area */
321 Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
322
323 if ((Ccb->OtherSide) && (Ccb->OtherSide->PipeState == FILE_PIPE_DISCONNECTED_STATE) && (Ccb->PipeState == FILE_PIPE_DISCONNECTED_STATE))
324 {
325 DPRINT("Both Client and Server are disconnected!\n");
326 Status = STATUS_PIPE_DISCONNECTED;
327 Irp->IoStatus.Information = 0;
328 goto done;
329
330 }
331
332 if ((Ccb->OtherSide == NULL) && (Ccb->ReadDataAvailable == 0))
333 {
334 /* Its ok if the other side has been Disconnect, but if we have data still in the buffer
335 , need to still be able to read it. Currently this is a HAXXXX */
336 DPRINT("Pipe no longer connected and no data exist in buffer. Ok to close pipe\n");
337 if (Ccb->PipeState == FILE_PIPE_LISTENING_STATE)
338 Status = STATUS_PIPE_LISTENING;
339 else if (Ccb->PipeState == FILE_PIPE_DISCONNECTED_STATE)
340 Status = STATUS_PIPE_DISCONNECTED;
341 else
342 Status = STATUS_UNSUCCESSFUL;
343 Irp->IoStatus.Information = 0;
344 goto done;
345 }
346
/* This end of the pipe has no inbound buffer, so nothing can be read from it */
347 if (Ccb->Data == NULL)
348 {
349 DPRINT("Pipe is NOT readable!\n");
350 Status = STATUS_UNSUCCESSFUL;
351 Irp->IoStatus.Information = 0;
352 goto done;
353 }
354
/* DataListLock guards the read request list and the data buffer bookkeeping */
355 ExAcquireFastMutex(&Ccb->DataListLock);
356
357 if (IoIsOperationSynchronous(Irp))
358 {
/* Synchronous request: queue it and, unless it is at the head, block on a
 * stack-allocated event until it is signalled */
359 InsertTailList(&Ccb->ReadRequestListHead, &Context->ListEntry);
360 if (Ccb->ReadRequestListHead.Flink != &Context->ListEntry)
361 {
362 KeInitializeEvent(&Event, SynchronizationEvent, FALSE);
363 Context->WaitEvent = &Event;
364 ExReleaseFastMutex(&Ccb->DataListLock);
365 Status = KeWaitForSingleObject(&Event,
366 Executive,
367 Irp->RequestorMode,
368 FALSE,
369 NULL);
/* NOTE(review): this exit does not unlink Context->ListEntry, which was
 * inserted above - it looks like the entry stays on ReadRequestListHead after
 * the IRP is completed at 'done'; confirm */
370 if ((Status == STATUS_USER_APC) || (Status == STATUS_KERNEL_APC))
371 {
372 Status = STATUS_CANCELLED;
373 goto done;
374 }
375 if (!NT_SUCCESS(Status))
376 {
377 ASSERT(FALSE);
378 }
379 ExAcquireFastMutex(&Ccb->DataListLock);
380 }
381 Irp->IoStatus.Information = 0;
382 }
383 else
384 {
385 KIRQL oldIrql;
/* Asynchronous request. If this IRP is already the head of the list, this is
 * a re-entry for a previously queued request and the queuing step is skipped */
386 if (IsListEmpty(&Ccb->ReadRequestListHead) ||
387 Ccb->ReadRequestListHead.Flink != &Context->ListEntry)
388 {
389 /* this is a new request */
390 Irp->IoStatus.Information = 0;
391 Context->WaitEvent = &Ccb->ReadEvent;
392 InsertTailList(&Ccb->ReadRequestListHead, &Context->ListEntry);
393 if (Ccb->ReadRequestListHead.Flink != &Context->ListEntry)
394 {
395 /* there was already a request on the list */
396 IoAcquireCancelSpinLock(&oldIrql);
397 if (Irp->Cancel)
398 {
399 IoReleaseCancelSpinLock(oldIrql);
400 RemoveEntryList(&Context->ListEntry);
401 ExReleaseFastMutex(&Ccb->DataListLock);
402 Status = STATUS_CANCELLED;
403 goto done;
404 }
/* Leave the IRP queued behind the current head; it is picked up later by the
 * completion path at the bottom of the outer loop */
405 (void)IoSetCancelRoutine(Irp, NpfsReadWriteCancelRoutine);
406 IoReleaseCancelSpinLock(oldIrql);
407 ExReleaseFastMutex(&Ccb->DataListLock);
408 IoMarkIrpPending(Irp);
409 Status = STATUS_PENDING;
410 goto done;
411 }
412 }
413 }
414
/* Outer loop: one iteration per IRP served. DataListLock is held on entry */
415 while (1)
416 {
/* Resume behind whatever an earlier pass already stored for this IRP
 * (Irp->IoStatus.Information is used as the running byte count) */
417 Buffer = MmGetSystemAddressForMdl(Irp->MdlAddress);
418 Information = Irp->IoStatus.Information;
419 Length = IoGetCurrentIrpStackLocation(Irp)->Parameters.Read.Length;
420 ASSERT (Information <= Length);
421 Buffer = (PVOID)((ULONG_PTR)Buffer + Information);
422 Length -= Information;
423 Status = STATUS_SUCCESS;
424
/* Inner loop: wait for data if there is none, then copy it out */
425 while (1)
426 {
427 if (Ccb->ReadDataAvailable == 0)
428 {
429 ULONG ConnectionSideReadMode;
430
431 if (Ccb->PipeEnd == FILE_PIPE_CLIENT_END) ConnectionSideReadMode=Ccb->Fcb->ClientReadMode;
432 else ConnectionSideReadMode = Ccb->Fcb->ServerReadMode;
433
/* Buffer is empty: let a writer blocked on the other side continue */
434 if (Ccb->PipeState == FILE_PIPE_CONNECTED_STATE)
435 {
436 ASSERT(Ccb->OtherSide != NULL);
437 KeSetEvent(&Ccb->OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE);
438 }
/* Some data was already delivered: finish now unless this is a byte-stream
 * read on a still-connected pipe */
439 if (Information > 0 &&
440 (ConnectionSideReadMode != FILE_PIPE_BYTE_STREAM_MODE ||
441 Ccb->PipeState != FILE_PIPE_CONNECTED_STATE))
442 {
443 break;
444 }
445 if ((Ccb->PipeState != FILE_PIPE_CONNECTED_STATE) && (Ccb->ReadDataAvailable == 0))
446 {
447 DPRINT("PipeState: %x\n", Ccb->PipeState);
448 Status = STATUS_PIPE_BROKEN;
449 break;
450 }
/* Nothing to read yet: drop the lock before waiting or pending */
451 ExReleaseFastMutex(&Ccb->DataListLock);
452
453 if (IoIsOperationSynchronous(Irp))
454 {
455 /* Wait for ReadEvent to become signaled */
456
457 DPRINT("Waiting for readable data (%wZ)\n", &Ccb->Fcb->PipeName);
458 Status = KeWaitForSingleObject(&Ccb->ReadEvent,
459 UserRequest,
460 Irp->RequestorMode,
461 FALSE,
462 NULL);
463 DPRINT("Finished waiting (%wZ)! Status: %x\n", &Ccb->Fcb->PipeName, Status);
464
/* Interrupted wait: leave the inner loop WITHOUT the lock; the
 * STATUS_CANCELLED check after the loop jumps straight to 'done'.
 * NOTE(review): Context->ListEntry is not unlinked on this path - confirm */
465 if ((Status == STATUS_USER_APC) || (Status == STATUS_KERNEL_APC))
466 {
467 Status = STATUS_CANCELLED;
468 break;
469 }
470 if (!NT_SUCCESS(Status))
471 {
472 ASSERT(FALSE);
473 }
474 ExAcquireFastMutex(&Ccb->DataListLock);
475 }
476 else
477 {
478 Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
479
/* Asynchronous: hand the IRP to a waiter thread, which waits on ReadEvent */
480 Context->WaitEvent = &Ccb->ReadEvent;
481 Status = NpfsAddWaitingReadWriteRequest(DeviceObject, Irp);
482
483 if (NT_SUCCESS(Status))
484 {
485 Status = STATUS_PENDING;
486 goto done;
487 }
/* Queuing failed: retake the lock and fall out of the inner loop with the
 * failure status.
 * NOTE(review): if that status is STATUS_CANCELLED, the check after the loop
 * jumps to 'done' with DataListLock still held and the entry still linked -
 * this looks like a lock leak; confirm */
488 ExAcquireFastMutex(&Ccb->DataListLock);
489 break;
490 }
491 }
492 ASSERT(IoGetCurrentIrpStackLocation(Irp)->FileObject != NULL);
493
494 /* If the pipe type and read mode are both byte stream */
495 if (Ccb->Fcb->PipeType == FILE_PIPE_BYTE_STREAM_TYPE)
496 {
497 DPRINT("Byte stream mode: Ccb->Data %x\n", Ccb->Data);
498 /* Byte stream mode */
/* Ccb->Data is treated as a circular buffer of MaxDataLength bytes: a copy
 * that would run past its end is split into two memcpy calls */
499 while (Length > 0 && Ccb->ReadDataAvailable > 0)
500 {
501 CopyLength = min(Ccb->ReadDataAvailable, Length);
502 if ((ULONG_PTR)Ccb->ReadPtr + CopyLength <= (ULONG_PTR)Ccb->Data + Ccb->MaxDataLength)
503 {
504 memcpy(Buffer, Ccb->ReadPtr, CopyLength);
505 Ccb->ReadPtr = (PVOID)((ULONG_PTR)Ccb->ReadPtr + CopyLength);
506 if (Ccb->ReadPtr == (PVOID)((ULONG_PTR)Ccb->Data + Ccb->MaxDataLength))
507 {
508 Ccb->ReadPtr = Ccb->Data;
509 }
510 }
511 else
512 {
513 TempLength = (ULONG)((ULONG_PTR)Ccb->Data + Ccb->MaxDataLength - (ULONG_PTR)Ccb->ReadPtr);
514 memcpy(Buffer, Ccb->ReadPtr, TempLength);
515 memcpy((PVOID)((ULONG_PTR)Buffer + TempLength), Ccb->Data, CopyLength - TempLength);
516 Ccb->ReadPtr = (PVOID)((ULONG_PTR)Ccb->Data + CopyLength - TempLength);
517 }
518
519 Buffer = (PVOID)((ULONG_PTR)Buffer + CopyLength);
520 Length -= CopyLength;
521 Information += CopyLength;
522
/* Bytes handed to the reader become free space for the writer */
523 Ccb->ReadDataAvailable -= CopyLength;
524 Ccb->WriteQuotaAvailable += CopyLength;
525 }
526
/* Request satisfied or buffer drained: wake the writer and clear ReadEvent */
527 if ((Length == 0) || (Ccb->ReadDataAvailable == 0))
528 {
529 if (Ccb->PipeState == FILE_PIPE_CONNECTED_STATE)
530 {
531 KeSetEvent(&Ccb->OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE);
532 }
533 KeResetEvent(&Ccb->ReadEvent);
534 break;
535 }
536 }
537 else if (Ccb->Fcb->PipeType == FILE_PIPE_MESSAGE_TYPE)
538 {
539 DPRINT("Message mode: Ccb>Data %x\n", Ccb->Data);
540
541 /* Check if buffer is full and the read pointer is not at the start of the buffer */
/* In that case the unread data is moved to the start of the buffer and the
 * space in front of ReadPtr is given back to the write quota */
542 if ((Ccb->WriteQuotaAvailable == 0) && (Ccb->ReadPtr > Ccb->Data))
543 {
544 Ccb->WriteQuotaAvailable += (ULONG_PTR)Ccb->ReadPtr - (ULONG_PTR)Ccb->Data;
545 memcpy(Ccb->Data, Ccb->ReadPtr, (ULONG_PTR)Ccb->WritePtr - (ULONG_PTR)Ccb->ReadPtr);
546 Ccb->WritePtr = (PVOID)((ULONG_PTR)Ccb->WritePtr - ((ULONG_PTR)Ccb->ReadPtr - (ULONG_PTR)Ccb->Data));
547 Ccb->ReadPtr = Ccb->Data;
548 }
549
550 /* For Message mode, the Message length is stored in the buffer preceeding the Message. */
551 if (Ccb->ReadDataAvailable)
552 {
553 ULONG NextMessageLength = 0;
554
555 /*First get the size of the message */
556 memcpy(&NextMessageLength, Ccb->ReadPtr, sizeof(NextMessageLength));
557
/* A zero length, or one larger than the data counted as available, is treated
 * as a corrupted buffer: dump it and assert */
558 if ((NextMessageLength == 0) || (NextMessageLength > Ccb->ReadDataAvailable))
559 {
560 DPRINT1("Possible memory corruption.\n");
561 HexDump(Ccb->Data, (ULONG_PTR)Ccb->WritePtr - (ULONG_PTR)Ccb->Data);
562 ASSERT(FALSE);
563 }
564
565 /* Use the smaller value */
566 CopyLength = min(NextMessageLength, Length);
567 ASSERT(CopyLength > 0);
568 ASSERT(CopyLength <= Ccb->ReadDataAvailable);
569 /* retrieve the message from the buffer */
570 memcpy(Buffer, (PVOID)((ULONG_PTR)Ccb->ReadPtr + sizeof(NextMessageLength)), CopyLength);
571
572 if (Ccb->ReadDataAvailable > CopyLength)
573 {
574 if (CopyLength < NextMessageLength)
575 /* Client only requested part of the message */
576 {
577 /* Calculate the remaining message new size */
578 ULONG NewMessageSize = NextMessageLength-CopyLength;
579
580 /* Update ReadPtr to point to new Message size location */
581 Ccb->ReadPtr = (PVOID)((ULONG_PTR)Ccb->ReadPtr + CopyLength);
582
583 /* Write a new Message size to buffer for the part of the message still there */
584 memcpy(Ccb->ReadPtr, &NewMessageSize, sizeof(NewMessageSize));
585 }
586 else
587 /* Client wanted the entire message */
588 {
589 /* Update ReadPtr to point to next message size */
590 Ccb->ReadPtr = (PVOID)((ULONG_PTR)Ccb->ReadPtr + CopyLength + sizeof(CopyLength));
591 }
592 }
593 else
594 {
595 /* This was the last Message, so just zero start of buffer for safety sake */
596 memset(Ccb->Data, 0, NextMessageLength + sizeof(NextMessageLength));
597
598 /* Reset to MaxDataLength as partial message retrievals dont
599 give the length back to Quota */
600 Ccb->WriteQuotaAvailable = Ccb->MaxDataLength;
601
602 /* reset read and write pointer to beginning of buffer */
603 Ccb->WritePtr = Ccb->Data;
604 Ccb->ReadPtr = Ccb->Data;
605 }
606 #ifndef NDEBUG
607 DPRINT("Length %d Buffer %x\n",CopyLength,Buffer);
608 HexDump((PUCHAR)Buffer, CopyLength);
609 #endif
610
611 Information += CopyLength;
612
/* ReadDataAvailable counts payload bytes only; the length prefixes are not
 * subtracted here */
613 Ccb->ReadDataAvailable -= CopyLength;
614
615 if ((ULONG)Ccb->WriteQuotaAvailable > (ULONG)Ccb->MaxDataLength) ASSERT(FALSE);
616 }
617
618 if (Information > 0)
619 {
620 ULONG ConnectionSideReadMode;
621
622 if (Ccb->PipeEnd == FILE_PIPE_CLIENT_END) ConnectionSideReadMode=Ccb->Fcb->ClientReadMode;
623 else ConnectionSideReadMode = Ccb->Fcb->ServerReadMode;
624
/* A byte-stream reader on a message pipe keeps pulling further messages while
 * data and room in its buffer remain; any other reader stops after one copy */
625 if ((ConnectionSideReadMode == FILE_PIPE_BYTE_STREAM_MODE) && (Ccb->ReadDataAvailable) && (Length > CopyLength))
626 {
627 Buffer = (PVOID)((ULONG_PTR)Buffer + CopyLength);
628 Length -= CopyLength;
629 }
630 else
631 {
632 KeResetEvent(&Ccb->ReadEvent);
633
634 if ((Ccb->PipeState == FILE_PIPE_CONNECTED_STATE) && (Ccb->WriteQuotaAvailable > 0))
635 {
636 KeSetEvent(&Ccb->OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE);
637 }
638 break;
639 }
640 }
641 }
642 else
643 {
644 DPRINT1("Unhandled Pipe Mode!\n");
645 ASSERT(FALSE);
646 }
647 }
/* Inner loop finished: publish the result in the IRP */
648 Irp->IoStatus.Information = Information;
649 Irp->IoStatus.Status = Status;
650
651 ASSERT(IoGetCurrentIrpStackLocation(Irp)->FileObject != NULL);
652
653 if (Status == STATUS_CANCELLED)
654 goto done;
655
656 if (IoIsOperationSynchronous(Irp))
657 {
/* Synchronous: dequeue ourselves, wake the next queued reader (if any) through
 * its WaitEvent, then complete the IRP */
658 RemoveEntryList(&Context->ListEntry);
659 if (!IsListEmpty(&Ccb->ReadRequestListHead))
660 {
661 Context = CONTAINING_RECORD(Ccb->ReadRequestListHead.Flink, NPFS_CONTEXT, ListEntry);
662 KeSetEvent(Context->WaitEvent, IO_NO_INCREMENT, FALSE);
663 }
664 ExReleaseFastMutex(&Ccb->DataListLock);
665 IoCompleteRequest(Irp, IO_NO_INCREMENT);
666
667 DPRINT("NpfsRead done (Status %lx)\n", Status);
668 return Status;
669 }
670 else
671 {
672 KIRQL oldIrql;
673
/* Remember the status of the first IRP handled by this call; that is the
 * value returned even if further queued IRPs are processed afterwards */
674 if (IsOriginalRequest)
675 {
676 IsOriginalRequest = FALSE;
677 OriginalStatus = Status;
678 }
679 if (Status == STATUS_PENDING)
680 {
681 ExReleaseFastMutex(&Ccb->DataListLock);
682 DPRINT("NpfsRead done (Status %lx)\n", OriginalStatus);
683 return OriginalStatus;
684 }
685 RemoveEntryList(&Context->ListEntry);
686 IoCompleteRequest(Irp, IO_NO_INCREMENT);
687 if (IsListEmpty(&Ccb->ReadRequestListHead))
688 {
689 ExReleaseFastMutex(&Ccb->DataListLock);
690 DPRINT("NpfsRead done (Status %lx)\n", OriginalStatus);
691 return OriginalStatus;
692 }
693
/* More requests are queued: switch Irp/Context to the new head and loop */
694 IoAcquireCancelSpinLock(&oldIrql);
695 Context = CONTAINING_RECORD(Ccb->ReadRequestListHead.Flink, NPFS_CONTEXT, ListEntry);
696
697 Irp = CONTAINING_RECORD(Context, IRP, Tail.Overlay.DriverContext);
698 /* Verify the Irp wasnt cancelled */
699 if (Irp->Cancel)
700 {
701 IoReleaseCancelSpinLock(oldIrql);
702 RemoveEntryList(&Context->ListEntry);
703 ExReleaseFastMutex(&Ccb->DataListLock);
704 Status = STATUS_CANCELLED;
705 goto done;
706 }
707 /* The Irp will now be handled, so remove the CancelRoutine */
708 (void)IoSetCancelRoutine(Irp, NULL);
709 IoReleaseCancelSpinLock(oldIrql);
710 }
711 }
712
/* Common exit: store the status and complete the IRP unless it was pended.
 * Nothing is unlocked here, so paths jumping to this label are expected to
 * have released DataListLock already */
713 done:
714 Irp->IoStatus.Status = Status;
715
716 if (Status != STATUS_PENDING)
717 {
718 IoCompleteRequest(Irp, IO_NO_INCREMENT);
719 }
720 DPRINT("NpfsRead done (Status %lx)\n", Status);
721
722 return Status;
723 }
724
725 NTSTATUS NTAPI
726 NpfsWrite(PDEVICE_OBJECT DeviceObject,
727 PIRP Irp)
728 {
729 PIO_STACK_LOCATION IoStack;
730 PFILE_OBJECT FileObject;
731 PNPFS_FCB Fcb = NULL;
732 PNPFS_CCB Ccb = NULL;
733 PNPFS_CCB ReaderCcb;
734 PUCHAR Buffer;
735 NTSTATUS Status = STATUS_SUCCESS;
736 ULONG Length;
737 ULONG Offset;
738 ULONG Information;
739 ULONG CopyLength;
740 ULONG TempLength;
741
742 DPRINT("NpfsWrite()\n");
743
744 IoStack = IoGetCurrentIrpStackLocation(Irp);
745 FileObject = IoStack->FileObject;
746 DPRINT("FileObject %p\n", FileObject);
747 DPRINT("Pipe name %wZ\n", &FileObject->FileName);
748
749 Ccb = FileObject->FsContext2;
750 ReaderCcb = Ccb->OtherSide;
751 Fcb = Ccb->Fcb;
752
753 Length = IoStack->Parameters.Write.Length;
754 Offset = IoStack->Parameters.Write.ByteOffset.u.LowPart;
755 Information = 0;
756
757 if (Irp->MdlAddress == NULL)
758 {
759 DPRINT("Irp->MdlAddress == NULL\n");
760 Status = STATUS_UNSUCCESSFUL;
761 Length = 0;
762 goto done;
763 }
764
765 if ((ReaderCcb == NULL) || (Ccb->PipeState != FILE_PIPE_CONNECTED_STATE))
766 {
767 DPRINT("Pipe is NOT connected!\n");
768 if (Ccb->PipeState == FILE_PIPE_LISTENING_STATE)
769 Status = STATUS_PIPE_LISTENING;
770 else if (Ccb->PipeState == FILE_PIPE_DISCONNECTED_STATE)
771 Status = STATUS_PIPE_DISCONNECTED;
772 else
773 Status = STATUS_UNSUCCESSFUL;
774 Length = 0;
775 goto done;
776 }
777
778 if (ReaderCcb->Data == NULL)
779 {
780 DPRINT("Pipe is NOT writable!\n");
781 Status = STATUS_UNSUCCESSFUL;
782 Length = 0;
783 goto done;
784 }
785
786 Status = STATUS_SUCCESS;
787 Buffer = MmGetSystemAddressForMdl (Irp->MdlAddress);
788
789 ExAcquireFastMutex(&ReaderCcb->DataListLock);
790
791 #ifndef NDEBUG
792 DPRINT("Length %d Buffer %x Offset %x\n",Length,Buffer,Offset);
793 HexDump(Buffer, Length);
794 #endif
795
796 while(1)
797 {
798 if ((ReaderCcb->WriteQuotaAvailable == 0))
799 {
800 KeSetEvent(&ReaderCcb->ReadEvent, IO_NO_INCREMENT, FALSE);
801 if (Ccb->PipeState != FILE_PIPE_CONNECTED_STATE)
802 {
803 Status = STATUS_PIPE_BROKEN;
804 ExReleaseFastMutex(&ReaderCcb->DataListLock);
805 goto done;
806 }
807 ExReleaseFastMutex(&ReaderCcb->DataListLock);
808
809 DPRINT("Write Waiting for buffer space (%S)\n", Fcb->PipeName.Buffer);
810 Status = KeWaitForSingleObject(&Ccb->WriteEvent,
811 UserRequest,
812 Irp->RequestorMode,
813 FALSE,
814 NULL);
815 DPRINT("Write Finished waiting (%S)! Status: %x\n", Fcb->PipeName.Buffer, Status);
816
817 if ((Status == STATUS_USER_APC) || (Status == STATUS_KERNEL_APC))
818 {
819 Status = STATUS_CANCELLED;
820 break;
821 }
822 if (!NT_SUCCESS(Status))
823 {
824 ASSERT(FALSE);
825 }
826 /*
827 * It's possible that the event was signaled because the
828 * other side of pipe was closed.
829 */
830 if (Ccb->PipeState != FILE_PIPE_CONNECTED_STATE)
831 {
832 DPRINT("PipeState: %x\n", Ccb->PipeState);
833 Status = STATUS_PIPE_BROKEN;
834 goto done;
835 }
836 ExAcquireFastMutex(&ReaderCcb->DataListLock);
837 }
838
839 if (Ccb->Fcb->PipeType == FILE_PIPE_BYTE_STREAM_TYPE)
840 {
841 DPRINT("Byte stream mode: Ccb->Data %x, Ccb->WritePtr %x\n", ReaderCcb->Data, ReaderCcb->WritePtr);
842
843 while (Length > 0 && ReaderCcb->WriteQuotaAvailable > 0)
844 {
845 CopyLength = min(Length, ReaderCcb->WriteQuotaAvailable);
846
847 if ((ULONG_PTR)ReaderCcb->WritePtr + CopyLength <= (ULONG_PTR)ReaderCcb->Data + ReaderCcb->MaxDataLength)
848 {
849 memcpy(ReaderCcb->WritePtr, Buffer, CopyLength);
850 ReaderCcb->WritePtr = (PVOID)((ULONG_PTR)ReaderCcb->WritePtr + CopyLength);
851 if ((ULONG_PTR)ReaderCcb->WritePtr == (ULONG_PTR)ReaderCcb->Data + ReaderCcb->MaxDataLength)
852 {
853 ReaderCcb->WritePtr = ReaderCcb->Data;
854 }
855 }
856 else
857 {
858
859 TempLength = (ULONG)((ULONG_PTR)ReaderCcb->Data + ReaderCcb->MaxDataLength -
860 (ULONG_PTR)ReaderCcb->WritePtr);
861
862 memcpy(ReaderCcb->WritePtr, Buffer, TempLength);
863 memcpy(ReaderCcb->Data, Buffer + TempLength, CopyLength - TempLength);
864 ReaderCcb->WritePtr = (PVOID)((ULONG_PTR)ReaderCcb->Data + CopyLength - TempLength);
865 }
866
867 Buffer += CopyLength;
868 Length -= CopyLength;
869 Information += CopyLength;
870
871 ReaderCcb->ReadDataAvailable += CopyLength;
872 ReaderCcb->WriteQuotaAvailable -= CopyLength;
873 }
874
875 if (Length == 0)
876 {
877 KeSetEvent(&ReaderCcb->ReadEvent, IO_NO_INCREMENT, FALSE);
878 KeResetEvent(&Ccb->WriteEvent);
879 break;
880 }
881 }
882 else if (Ccb->Fcb->PipeType == FILE_PIPE_MESSAGE_TYPE)
883 {
884 /* For Message Type Pipe, the Pipes memory will be used to store the size of each message */
885 DPRINT("Message mode: Ccb->Data %x, Ccb->WritePtr %x\n",ReaderCcb->Data, ReaderCcb->WritePtr);
886 if (Length > 0)
887 {
888 /* Verify the WritePtr is still inside the buffer */
889 if (((ULONG_PTR)ReaderCcb->WritePtr > ((ULONG_PTR)ReaderCcb->Data + (ULONG_PTR)ReaderCcb->MaxDataLength)) ||
890 ((ULONG_PTR)ReaderCcb->WritePtr < (ULONG_PTR)ReaderCcb->Data))
891 {
892 DPRINT1("NPFS is writing out of its buffer. Report to developer!\n");
893 DPRINT1("ReaderCcb->WritePtr %x, ReaderCcb->Data %x, ReaderCcb->MaxDataLength %lu\n",
894 ReaderCcb->WritePtr, ReaderCcb->Data, ReaderCcb->MaxDataLength);
895 ASSERT(FALSE);
896 }
897
898 CopyLength = min(Length, ReaderCcb->WriteQuotaAvailable - sizeof(ULONG));
899 if (CopyLength > ReaderCcb->WriteQuotaAvailable)
900 {
901 DPRINT1("Writing %lu byte to pipe would overflow as only %lu bytes are available\n",
902 CopyLength, ReaderCcb->WriteQuotaAvailable);
903 ASSERT(FALSE);
904 }
905
906 /* First Copy the Length of the message into the pipes buffer */
907 memcpy(ReaderCcb->WritePtr, &CopyLength, sizeof(CopyLength));
908
909 /* Now the user buffer itself */
910 memcpy((PVOID)((ULONG_PTR)ReaderCcb->WritePtr + sizeof(CopyLength)), Buffer, CopyLength);
911
912 /* Update the write pointer */
913 ReaderCcb->WritePtr = (PVOID)((ULONG_PTR)ReaderCcb->WritePtr + sizeof(CopyLength) + CopyLength);
914
915 Information += CopyLength;
916
917 ReaderCcb->ReadDataAvailable += CopyLength;
918
919 ReaderCcb->WriteQuotaAvailable -= (CopyLength + sizeof(ULONG));
920
921 if ((ULONG_PTR)ReaderCcb->WriteQuotaAvailable > (ULONG)ReaderCcb->MaxDataLength)
922 {
923 DPRINT1("QuotaAvailable is greater than buffer size!\n");
924 ASSERT(FALSE);
925 }
926 }
927
928 if (Information > 0)
929 {
930 KeSetEvent(&ReaderCcb->ReadEvent, IO_NO_INCREMENT, FALSE);
931 KeResetEvent(&Ccb->WriteEvent);
932 break;
933 }
934 }
935 else
936 {
937 DPRINT1("Unhandled Pipe Type Mode and Read Write Mode!\n");
938 ASSERT(FALSE);
939 }
940 }
941
942 ExReleaseFastMutex(&ReaderCcb->DataListLock);
943
944 done:
945 Irp->IoStatus.Status = Status;
946 Irp->IoStatus.Information = Information;
947
948 IoCompleteRequest(Irp, IO_NO_INCREMENT);
949
950 DPRINT("NpfsWrite done (Status %lx)\n", Status);
951
952 return Status;
953 }
954
955 /* EOF */