merge 37282 from amd64-branch:
[reactos.git] / reactos / drivers / filesystems / npfs / rw.c
1 /*
2 * COPYRIGHT: See COPYING in the top level directory
3 * PROJECT: ReactOS kernel
4 * FILE: drivers/fs/np/rw.c
5 * PURPOSE: Named pipe filesystem
6 * PROGRAMMER: David Welch <welch@cwcom.net>
7 * Michael Martin
8 */
9
10 /* INCLUDES ******************************************************************/
11
12 #include "npfs.h"
13
14 #define NDEBUG
15 #include <debug.h>
16
17 /* FUNCTIONS *****************************************************************/
18
19 VOID HexDump(PUCHAR Buffer, ULONG Length)
20 {
21 CHAR Line[65];
22 UCHAR ch;
23 const char Hex[] = "0123456789ABCDEF";
24 int i, j;
25
26 DbgPrint("---------------\n");
27
28 for (i = 0; i < Length; i+= 16)
29 {
30 memset(Line, ' ', 64);
31 Line[64] = 0;
32
33 for (j = 0; j < 16 && j + i < Length; j++)
34 {
35 ch = Buffer[i + j];
36 Line[3*j + 0] = Hex[ch >> 4];
37 Line[3*j + 1] = Hex[ch & 0x0f];
38 Line[48 + j] = isprint(ch) ? ch : '.';
39 }
40 DbgPrint("%s\n", Line);
41 }
42 DbgPrint("---------------\n");
43 }
44
45 static DRIVER_CANCEL NpfsReadWriteCancelRoutine;
46 static VOID NTAPI
47 NpfsReadWriteCancelRoutine(IN PDEVICE_OBJECT DeviceObject,
48 IN PIRP Irp)
49 {
50 PNPFS_CONTEXT Context;
51 PNPFS_DEVICE_EXTENSION DeviceExt;
52 PIO_STACK_LOCATION IoStack;
53 PNPFS_CCB Ccb;
54 BOOLEAN Complete = FALSE;
55
56 DPRINT("NpfsReadWriteCancelRoutine(DeviceObject %p, Irp %p)\n", DeviceObject, Irp);
57
58 IoReleaseCancelSpinLock(Irp->CancelIrql);
59
60 Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
61 DeviceExt = (PNPFS_DEVICE_EXTENSION)DeviceObject->DeviceExtension;
62 IoStack = IoGetCurrentIrpStackLocation(Irp);
63 Ccb = IoStack->FileObject->FsContext2;
64
65 KeLockMutex(&DeviceExt->PipeListLock);
66 ExAcquireFastMutex(&Ccb->DataListLock);
67 switch(IoStack->MajorFunction)
68 {
69 case IRP_MJ_READ:
70 if (Ccb->ReadRequestListHead.Flink != &Context->ListEntry)
71 {
72 /* we are not the first in the list, remove an complete us */
73 RemoveEntryList(&Context->ListEntry);
74 Complete = TRUE;
75 }
76 else
77 {
78 KeSetEvent(&Ccb->ReadEvent, IO_NO_INCREMENT, FALSE);
79 }
80 break;
81 default:
82 ASSERT(FALSE);
83 }
84 ExReleaseFastMutex(&Ccb->DataListLock);
85 KeUnlockMutex(&DeviceExt->PipeListLock);
86 if (Complete)
87 {
88 Irp->IoStatus.Status = STATUS_CANCELLED;
89 Irp->IoStatus.Information = 0;
90 IoCompleteRequest(Irp, IO_NO_INCREMENT);
91 }
92 }
93
94 static VOID NTAPI
95 NpfsWaiterThread(PVOID InitContext)
96 {
97 PNPFS_THREAD_CONTEXT ThreadContext = (PNPFS_THREAD_CONTEXT) InitContext;
98 ULONG CurrentCount;
99 ULONG Count = 0;
100 PIRP Irp = NULL;
101 PIRP NextIrp;
102 NTSTATUS Status;
103 BOOLEAN Terminate = FALSE;
104 BOOLEAN Cancel = FALSE;
105 PIO_STACK_LOCATION IoStack = NULL;
106 PNPFS_CONTEXT Context;
107 PNPFS_CONTEXT NextContext;
108 PNPFS_CCB Ccb;
109
110 KeLockMutex(&ThreadContext->DeviceExt->PipeListLock);
111
112 while (1)
113 {
114 CurrentCount = ThreadContext->Count;
115 KeUnlockMutex(&ThreadContext->DeviceExt->PipeListLock);
116 if (Irp)
117 {
118 if (Cancel)
119 {
120 Irp->IoStatus.Status = STATUS_CANCELLED;
121 Irp->IoStatus.Information = 0;
122 IoCompleteRequest(Irp, IO_NO_INCREMENT);
123 }
124 else
125 {
126 switch (IoStack->MajorFunction)
127 {
128 case IRP_MJ_READ:
129 NpfsRead(IoStack->DeviceObject, Irp);
130 break;
131 default:
132 ASSERT(FALSE);
133 }
134 }
135 }
136 if (Terminate)
137 {
138 break;
139 }
140 Status = KeWaitForMultipleObjects(CurrentCount,
141 ThreadContext->WaitObjectArray,
142 WaitAny,
143 Executive,
144 KernelMode,
145 FALSE,
146 NULL,
147 ThreadContext->WaitBlockArray);
148 if (!NT_SUCCESS(Status))
149 {
150 ASSERT(FALSE);
151 }
152 KeLockMutex(&ThreadContext->DeviceExt->PipeListLock);
153 Count = Status - STATUS_SUCCESS;
154 ASSERT (Count < CurrentCount);
155 if (Count > 0)
156 {
157 Irp = ThreadContext->WaitIrpArray[Count];
158 ThreadContext->Count--;
159 ThreadContext->DeviceExt->EmptyWaiterCount++;
160 ThreadContext->WaitObjectArray[Count] = ThreadContext->WaitObjectArray[ThreadContext->Count];
161 ThreadContext->WaitIrpArray[Count] = ThreadContext->WaitIrpArray[ThreadContext->Count];
162
163 Cancel = (NULL == IoSetCancelRoutine(Irp, NULL));
164 Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
165 IoStack = IoGetCurrentIrpStackLocation(Irp);
166
167 if (Cancel)
168 {
169 Ccb = IoStack->FileObject->FsContext2;
170 ExAcquireFastMutex(&Ccb->DataListLock);
171 RemoveEntryList(&Context->ListEntry);
172 switch (IoStack->MajorFunction)
173 {
174 case IRP_MJ_READ:
175 if (!IsListEmpty(&Ccb->ReadRequestListHead))
176 {
177 /* put the next request on the wait list */
178 NextContext = CONTAINING_RECORD(Ccb->ReadRequestListHead.Flink, NPFS_CONTEXT, ListEntry);
179 ThreadContext->WaitObjectArray[ThreadContext->Count] = NextContext->WaitEvent;
180 NextIrp = CONTAINING_RECORD(NextContext, IRP, Tail.Overlay.DriverContext);
181 ThreadContext->WaitIrpArray[ThreadContext->Count] = NextIrp;
182 ThreadContext->Count++;
183 ThreadContext->DeviceExt->EmptyWaiterCount--;
184 }
185 break;
186 default:
187 ASSERT(FALSE);
188 }
189 ExReleaseFastMutex(&Ccb->DataListLock);
190 }
191 }
192 else
193 {
194 /* someone has add a new wait request */
195 Irp = NULL;
196 }
197 if (ThreadContext->Count == 1 && ThreadContext->DeviceExt->EmptyWaiterCount >= MAXIMUM_WAIT_OBJECTS)
198 {
199 /* it exist an other thread with empty wait slots, we can remove our thread from the list */
200 RemoveEntryList(&ThreadContext->ListEntry);
201 ThreadContext->DeviceExt->EmptyWaiterCount -= MAXIMUM_WAIT_OBJECTS - 1;
202 Terminate = TRUE;
203 }
204 }
205 ExFreePool(ThreadContext);
206 }
207
208 static NTSTATUS
209 NpfsAddWaitingReadWriteRequest(IN PDEVICE_OBJECT DeviceObject,
210 IN PIRP Irp)
211 {
212 PLIST_ENTRY ListEntry;
213 PNPFS_THREAD_CONTEXT ThreadContext = NULL;
214 NTSTATUS Status;
215 HANDLE hThread;
216 KIRQL oldIrql;
217
218 PNPFS_CONTEXT Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
219 PNPFS_DEVICE_EXTENSION DeviceExt = (PNPFS_DEVICE_EXTENSION)DeviceObject->DeviceExtension;
220
221 DPRINT("NpfsAddWaitingReadWriteRequest(DeviceObject %p, Irp %p)\n", DeviceObject, Irp);
222
223 KeLockMutex(&DeviceExt->PipeListLock);
224
225 ListEntry = DeviceExt->ThreadListHead.Flink;
226 while (ListEntry != &DeviceExt->ThreadListHead)
227 {
228 ThreadContext = CONTAINING_RECORD(ListEntry, NPFS_THREAD_CONTEXT, ListEntry);
229 if (ThreadContext->Count < MAXIMUM_WAIT_OBJECTS)
230 {
231 break;
232 }
233 ListEntry = ListEntry->Flink;
234 }
235 if (ListEntry == &DeviceExt->ThreadListHead)
236 {
237 ThreadContext = ExAllocatePool(NonPagedPool, sizeof(NPFS_THREAD_CONTEXT));
238 if (ThreadContext == NULL)
239 {
240 KeUnlockMutex(&DeviceExt->PipeListLock);
241 return STATUS_NO_MEMORY;
242 }
243 ThreadContext->DeviceExt = DeviceExt;
244 KeInitializeEvent(&ThreadContext->Event, SynchronizationEvent, FALSE);
245 ThreadContext->Count = 1;
246 ThreadContext->WaitObjectArray[0] = &ThreadContext->Event;
247
248
249 DPRINT("Creating a new system thread for waiting read/write requests\n");
250
251 Status = PsCreateSystemThread(&hThread,
252 THREAD_ALL_ACCESS,
253 NULL,
254 NULL,
255 NULL,
256 NpfsWaiterThread,
257 (PVOID)ThreadContext);
258 if (!NT_SUCCESS(Status))
259 {
260 ExFreePool(ThreadContext);
261 KeUnlockMutex(&DeviceExt->PipeListLock);
262 return Status;
263 }
264 InsertHeadList(&DeviceExt->ThreadListHead, &ThreadContext->ListEntry);
265 DeviceExt->EmptyWaiterCount += MAXIMUM_WAIT_OBJECTS - 1;
266 }
267 IoMarkIrpPending(Irp);
268
269 IoAcquireCancelSpinLock(&oldIrql);
270 if (Irp->Cancel)
271 {
272 IoReleaseCancelSpinLock(oldIrql);
273 Status = STATUS_CANCELLED;
274 }
275 else
276 {
277 (void)IoSetCancelRoutine(Irp, NpfsReadWriteCancelRoutine);
278 IoReleaseCancelSpinLock(oldIrql);
279 ThreadContext->WaitObjectArray[ThreadContext->Count] = Context->WaitEvent;
280 ThreadContext->WaitIrpArray[ThreadContext->Count] = Irp;
281 ThreadContext->Count++;
282 DeviceExt->EmptyWaiterCount--;
283 KeSetEvent(&ThreadContext->Event, IO_NO_INCREMENT, FALSE);
284 Status = STATUS_SUCCESS;
285 }
286 KeUnlockMutex(&DeviceExt->PipeListLock);
287 return Status;
288 }
289
290 NTSTATUS NTAPI
291 NpfsRead(IN PDEVICE_OBJECT DeviceObject,
292 IN PIRP Irp)
293 {
294 PFILE_OBJECT FileObject;
295 NTSTATUS Status;
296 NTSTATUS OriginalStatus = STATUS_SUCCESS;
297 PNPFS_CCB Ccb;
298 PNPFS_CONTEXT Context;
299 KEVENT Event;
300 ULONG Length;
301 ULONG Information = 0;
302 ULONG CopyLength = 0;
303 ULONG TempLength;
304 BOOLEAN IsOriginalRequest = TRUE;
305 PVOID Buffer;
306
307 DPRINT("NpfsRead(DeviceObject %p, Irp %p)\n", DeviceObject, Irp);
308
309 if (Irp->MdlAddress == NULL)
310 {
311 DPRINT("Irp->MdlAddress == NULL\n");
312 Status = STATUS_UNSUCCESSFUL;
313 Irp->IoStatus.Information = 0;
314 goto done;
315 }
316
317 FileObject = IoGetCurrentIrpStackLocation(Irp)->FileObject;
318 DPRINT("FileObject %p\n", FileObject);
319 DPRINT("Pipe name %wZ\n", &FileObject->FileName);
320 Ccb = FileObject->FsContext2;
321 Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
322
323 if ((Ccb->OtherSide) && (Ccb->OtherSide->PipeState == FILE_PIPE_DISCONNECTED_STATE) && (Ccb->PipeState == FILE_PIPE_DISCONNECTED_STATE))
324 {
325 DPRINT("Both Client and Server are disconnected!\n");
326 Status = STATUS_PIPE_DISCONNECTED;
327 Irp->IoStatus.Information = 0;
328 goto done;
329
330 }
331
332 if ((Ccb->OtherSide == NULL) && (Ccb->ReadDataAvailable == 0))
333 {
334 if (Ccb->PipeState == FILE_PIPE_CONNECTED_STATE)
335 {
336 DPRINT("File pipe broken\n");
337 Status = STATUS_PIPE_BROKEN;
338 }
339 else if (Ccb->PipeState == FILE_PIPE_LISTENING_STATE)
340 Status = STATUS_PIPE_LISTENING;
341 else if (Ccb->PipeState == FILE_PIPE_DISCONNECTED_STATE)
342 Status = STATUS_PIPE_DISCONNECTED;
343 else
344 Status = STATUS_UNSUCCESSFUL;
345 Irp->IoStatus.Information = 0;
346 goto done;
347 }
348
349 if (Ccb->Data == NULL)
350 {
351 DPRINT("Pipe is NOT readable!\n");
352 Status = STATUS_UNSUCCESSFUL;
353 Irp->IoStatus.Information = 0;
354 goto done;
355 }
356
357 ExAcquireFastMutex(&Ccb->DataListLock);
358
359 if (IoIsOperationSynchronous(Irp))
360 {
361 InsertTailList(&Ccb->ReadRequestListHead, &Context->ListEntry);
362 if (Ccb->ReadRequestListHead.Flink != &Context->ListEntry)
363 {
364 KeInitializeEvent(&Event, SynchronizationEvent, FALSE);
365 Context->WaitEvent = &Event;
366 ExReleaseFastMutex(&Ccb->DataListLock);
367 Status = KeWaitForSingleObject(&Event,
368 Executive,
369 Irp->RequestorMode,
370 FALSE,
371 NULL);
372 if ((Status == STATUS_USER_APC) || (Status == STATUS_KERNEL_APC))
373 {
374 Status = STATUS_CANCELLED;
375 goto done;
376 }
377 if (!NT_SUCCESS(Status))
378 {
379 ASSERT(FALSE);
380 }
381 ExAcquireFastMutex(&Ccb->DataListLock);
382 }
383 Irp->IoStatus.Information = 0;
384 }
385 else
386 {
387 KIRQL oldIrql;
388 if (IsListEmpty(&Ccb->ReadRequestListHead) ||
389 Ccb->ReadRequestListHead.Flink != &Context->ListEntry)
390 {
391 /* this is a new request */
392 Irp->IoStatus.Information = 0;
393 Context->WaitEvent = &Ccb->ReadEvent;
394 InsertTailList(&Ccb->ReadRequestListHead, &Context->ListEntry);
395 if (Ccb->ReadRequestListHead.Flink != &Context->ListEntry)
396 {
397 /* there was already a request on the list */
398 IoAcquireCancelSpinLock(&oldIrql);
399 if (Irp->Cancel)
400 {
401 IoReleaseCancelSpinLock(oldIrql);
402 RemoveEntryList(&Context->ListEntry);
403 ExReleaseFastMutex(&Ccb->DataListLock);
404 Status = STATUS_CANCELLED;
405 goto done;
406 }
407 (void)IoSetCancelRoutine(Irp, NpfsReadWriteCancelRoutine);
408 IoReleaseCancelSpinLock(oldIrql);
409 ExReleaseFastMutex(&Ccb->DataListLock);
410 IoMarkIrpPending(Irp);
411 Status = STATUS_PENDING;
412 goto done;
413 }
414 }
415 }
416
417 while (1)
418 {
419 Buffer = MmGetSystemAddressForMdl(Irp->MdlAddress);
420 Information = Irp->IoStatus.Information;
421 Length = IoGetCurrentIrpStackLocation(Irp)->Parameters.Read.Length;
422 ASSERT (Information <= Length);
423 Buffer = (PVOID)((ULONG_PTR)Buffer + Information);
424 Length -= Information;
425 Status = STATUS_SUCCESS;
426
427 while (1)
428 {
429 if (Ccb->ReadDataAvailable == 0)
430 {
431 ULONG ConnectionSideReadMode;
432
433 if (Ccb->PipeEnd == FILE_PIPE_CLIENT_END) ConnectionSideReadMode=Ccb->Fcb->ClientReadMode;
434 else ConnectionSideReadMode = Ccb->Fcb->ServerReadMode;
435
436 if ((Ccb->PipeState == FILE_PIPE_CONNECTED_STATE) && (Ccb->OtherSide))
437 {
438 KeSetEvent(&Ccb->OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE);
439 }
440 if (Information > 0 &&
441 (ConnectionSideReadMode != FILE_PIPE_BYTE_STREAM_MODE ||
442 Ccb->PipeState != FILE_PIPE_CONNECTED_STATE))
443 {
444 break;
445 }
446 if ((Ccb->PipeState != FILE_PIPE_CONNECTED_STATE) && (Ccb->ReadDataAvailable == 0))
447 {
448 DPRINT("PipeState: %x\n", Ccb->PipeState);
449 Status = STATUS_PIPE_BROKEN;
450 break;
451 }
452 ExReleaseFastMutex(&Ccb->DataListLock);
453
454 if (IoIsOperationSynchronous(Irp))
455 {
456 /* Wait for ReadEvent to become signaled */
457
458 DPRINT("Waiting for readable data (%wZ)\n", &Ccb->Fcb->PipeName);
459 Status = KeWaitForSingleObject(&Ccb->ReadEvent,
460 UserRequest,
461 Irp->RequestorMode,
462 FALSE,
463 NULL);
464 DPRINT("Finished waiting (%wZ)! Status: %x\n", &Ccb->Fcb->PipeName, Status);
465
466 if ((Status == STATUS_USER_APC) || (Status == STATUS_KERNEL_APC))
467 {
468 Status = STATUS_CANCELLED;
469 break;
470 }
471 if (!NT_SUCCESS(Status))
472 {
473 ASSERT(FALSE);
474 }
475 ExAcquireFastMutex(&Ccb->DataListLock);
476 }
477 else
478 {
479 Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
480
481 Context->WaitEvent = &Ccb->ReadEvent;
482 Status = NpfsAddWaitingReadWriteRequest(DeviceObject, Irp);
483
484 if (NT_SUCCESS(Status))
485 {
486 Status = STATUS_PENDING;
487 goto done;
488 }
489 ExAcquireFastMutex(&Ccb->DataListLock);
490 break;
491 }
492 }
493 ASSERT(IoGetCurrentIrpStackLocation(Irp)->FileObject != NULL);
494
495 /* If the pipe type and read mode are both byte stream */
496 if (Ccb->Fcb->PipeType == FILE_PIPE_BYTE_STREAM_TYPE)
497 {
498 DPRINT("Byte stream mode: Ccb->Data %x\n", Ccb->Data);
499 /* Byte stream mode */
500 while (Length > 0 && Ccb->ReadDataAvailable > 0)
501 {
502 CopyLength = min(Ccb->ReadDataAvailable, Length);
503 if ((ULONG_PTR)Ccb->ReadPtr + CopyLength <= (ULONG_PTR)Ccb->Data + Ccb->MaxDataLength)
504 {
505 memcpy(Buffer, Ccb->ReadPtr, CopyLength);
506 Ccb->ReadPtr = (PVOID)((ULONG_PTR)Ccb->ReadPtr + CopyLength);
507 if (Ccb->ReadPtr == (PVOID)((ULONG_PTR)Ccb->Data + Ccb->MaxDataLength))
508 {
509 Ccb->ReadPtr = Ccb->Data;
510 }
511 }
512 else
513 {
514 TempLength = (ULONG)((ULONG_PTR)Ccb->Data + Ccb->MaxDataLength - (ULONG_PTR)Ccb->ReadPtr);
515 memcpy(Buffer, Ccb->ReadPtr, TempLength);
516 memcpy((PVOID)((ULONG_PTR)Buffer + TempLength), Ccb->Data, CopyLength - TempLength);
517 Ccb->ReadPtr = (PVOID)((ULONG_PTR)Ccb->Data + CopyLength - TempLength);
518 }
519
520 Buffer = (PVOID)((ULONG_PTR)Buffer + CopyLength);
521 Length -= CopyLength;
522 Information += CopyLength;
523
524 Ccb->ReadDataAvailable -= CopyLength;
525 Ccb->WriteQuotaAvailable += CopyLength;
526 }
527
528 if ((Length == 0) || (Ccb->ReadDataAvailable == 0))
529 {
530 if ((Ccb->PipeState == FILE_PIPE_CONNECTED_STATE) && (Ccb->OtherSide))
531 {
532 KeSetEvent(&Ccb->OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE);
533 }
534 KeResetEvent(&Ccb->ReadEvent);
535 break;
536 }
537 }
538 else if (Ccb->Fcb->PipeType == FILE_PIPE_MESSAGE_TYPE)
539 {
540 DPRINT("Message mode: Ccb>Data %x\n", Ccb->Data);
541
542 /* Check if buffer is full and the read pointer is not at the start of the buffer */
543 if ((Ccb->WriteQuotaAvailable == 0) && (Ccb->ReadPtr > Ccb->Data))
544 {
545 Ccb->WriteQuotaAvailable += (ULONG_PTR)Ccb->ReadPtr - (ULONG_PTR)Ccb->Data;
546 memcpy(Ccb->Data, Ccb->ReadPtr, (ULONG_PTR)Ccb->WritePtr - (ULONG_PTR)Ccb->ReadPtr);
547 Ccb->WritePtr = (PVOID)((ULONG_PTR)Ccb->WritePtr - ((ULONG_PTR)Ccb->ReadPtr - (ULONG_PTR)Ccb->Data));
548 Ccb->ReadPtr = Ccb->Data;
549 ASSERT((ULONG_PTR)Ccb->WritePtr < ((ULONG_PTR)Ccb->Data + Ccb->MaxDataLength));
550 ASSERT(Ccb->WritePtr >= Ccb->Data);
551 }
552
553 /* For Message mode, the Message length is stored in the buffer preceeding the Message. */
554 if (Ccb->ReadDataAvailable)
555 {
556 ULONG NextMessageLength = 0;
557
558 /*First get the size of the message */
559 memcpy(&NextMessageLength, Ccb->ReadPtr, sizeof(NextMessageLength));
560
561 if ((NextMessageLength == 0) || (NextMessageLength > Ccb->ReadDataAvailable))
562 {
563 DPRINT1("Possible memory corruption.\n");
564 HexDump(Ccb->Data, (ULONG_PTR)Ccb->WritePtr - (ULONG_PTR)Ccb->Data);
565 ASSERT(FALSE);
566 }
567
568 /* Use the smaller value */
569 CopyLength = min(NextMessageLength, Length);
570 ASSERT(CopyLength > 0);
571 ASSERT(CopyLength <= Ccb->ReadDataAvailable);
572 /* retrieve the message from the buffer */
573 memcpy(Buffer, (PVOID)((ULONG_PTR)Ccb->ReadPtr + sizeof(NextMessageLength)), CopyLength);
574
575 if (Ccb->ReadDataAvailable > CopyLength)
576 {
577 if (CopyLength < NextMessageLength)
578 /* Client only requested part of the message */
579 {
580 /* Calculate the remaining message new size */
581 ULONG NewMessageSize = NextMessageLength-CopyLength;
582
583 /* Update ReadPtr to point to new Message size location */
584 Ccb->ReadPtr = (PVOID)((ULONG_PTR)Ccb->ReadPtr + CopyLength);
585
586 /* Write a new Message size to buffer for the part of the message still there */
587 memcpy(Ccb->ReadPtr, &NewMessageSize, sizeof(NewMessageSize));
588 }
589 else
590 /* Client wanted the entire message */
591 {
592 /* Update ReadPtr to point to next message size */
593 Ccb->ReadPtr = (PVOID)((ULONG_PTR)Ccb->ReadPtr + CopyLength + sizeof(CopyLength));
594 }
595 }
596 else
597 {
598 /* This was the last Message, so just zero start of buffer for safety sake */
599 memset(Ccb->Data, 0, NextMessageLength + sizeof(NextMessageLength));
600
601 /* Reset to MaxDataLength as partial message retrievals dont
602 give the length back to Quota */
603 Ccb->WriteQuotaAvailable = Ccb->MaxDataLength;
604
605 /* reset read and write pointer to beginning of buffer */
606 Ccb->WritePtr = Ccb->Data;
607 Ccb->ReadPtr = Ccb->Data;
608 }
609 #ifndef NDEBUG
610 DPRINT("Length %d Buffer %x\n",CopyLength,Buffer);
611 HexDump((PUCHAR)Buffer, CopyLength);
612 #endif
613
614 Information += CopyLength;
615
616 Ccb->ReadDataAvailable -= CopyLength;
617
618 if ((ULONG)Ccb->WriteQuotaAvailable > (ULONG)Ccb->MaxDataLength) ASSERT(FALSE);
619 }
620
621 if (Information > 0)
622 {
623 ULONG ConnectionSideReadMode;
624
625 if (Ccb->PipeEnd == FILE_PIPE_CLIENT_END) ConnectionSideReadMode=Ccb->Fcb->ClientReadMode;
626 else ConnectionSideReadMode = Ccb->Fcb->ServerReadMode;
627
628 if ((ConnectionSideReadMode == FILE_PIPE_BYTE_STREAM_MODE) && (Ccb->ReadDataAvailable) && (Length > CopyLength))
629 {
630 Buffer = (PVOID)((ULONG_PTR)Buffer + CopyLength);
631 Length -= CopyLength;
632 }
633 else
634 {
635 KeResetEvent(&Ccb->ReadEvent);
636
637 if ((Ccb->PipeState == FILE_PIPE_CONNECTED_STATE) && (Ccb->WriteQuotaAvailable > 0) && (Ccb->OtherSide))
638 {
639 KeSetEvent(&Ccb->OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE);
640 }
641 break;
642 }
643 }
644 }
645 else
646 {
647 DPRINT1("Unhandled Pipe Mode!\n");
648 ASSERT(FALSE);
649 }
650 }
651 Irp->IoStatus.Information = Information;
652 Irp->IoStatus.Status = Status;
653
654 ASSERT(IoGetCurrentIrpStackLocation(Irp)->FileObject != NULL);
655
656 if (Status == STATUS_CANCELLED)
657 goto done;
658
659 if (IoIsOperationSynchronous(Irp))
660 {
661 RemoveEntryList(&Context->ListEntry);
662 if (!IsListEmpty(&Ccb->ReadRequestListHead))
663 {
664 Context = CONTAINING_RECORD(Ccb->ReadRequestListHead.Flink, NPFS_CONTEXT, ListEntry);
665 KeSetEvent(Context->WaitEvent, IO_NO_INCREMENT, FALSE);
666 }
667 ExReleaseFastMutex(&Ccb->DataListLock);
668 IoCompleteRequest(Irp, IO_NO_INCREMENT);
669
670 DPRINT("NpfsRead done (Status %lx)\n", Status);
671 return Status;
672 }
673 else
674 {
675 KIRQL oldIrql;
676
677 if (IsOriginalRequest)
678 {
679 IsOriginalRequest = FALSE;
680 OriginalStatus = Status;
681 }
682 if (Status == STATUS_PENDING)
683 {
684 ExReleaseFastMutex(&Ccb->DataListLock);
685 DPRINT("NpfsRead done (Status %lx)\n", OriginalStatus);
686 return OriginalStatus;
687 }
688 RemoveEntryList(&Context->ListEntry);
689 IoCompleteRequest(Irp, IO_NO_INCREMENT);
690 if (IsListEmpty(&Ccb->ReadRequestListHead))
691 {
692 ExReleaseFastMutex(&Ccb->DataListLock);
693 DPRINT("NpfsRead done (Status %lx)\n", OriginalStatus);
694 return OriginalStatus;
695 }
696
697 IoAcquireCancelSpinLock(&oldIrql);
698 Context = CONTAINING_RECORD(Ccb->ReadRequestListHead.Flink, NPFS_CONTEXT, ListEntry);
699
700 Irp = CONTAINING_RECORD(Context, IRP, Tail.Overlay.DriverContext);
701 /* Verify the Irp wasnt cancelled */
702 if (Irp->Cancel)
703 {
704 IoReleaseCancelSpinLock(oldIrql);
705 RemoveEntryList(&Context->ListEntry);
706 ExReleaseFastMutex(&Ccb->DataListLock);
707 Status = STATUS_CANCELLED;
708 goto done;
709 }
710 /* The Irp will now be handled, so remove the CancelRoutine */
711 (void)IoSetCancelRoutine(Irp, NULL);
712 IoReleaseCancelSpinLock(oldIrql);
713 }
714 }
715
716 done:
717 Irp->IoStatus.Status = Status;
718
719 if (Status != STATUS_PENDING)
720 {
721 IoCompleteRequest(Irp, IO_NO_INCREMENT);
722 }
723 DPRINT("NpfsRead done (Status %lx)\n", Status);
724
725 return Status;
726 }
727
728 NTSTATUS NTAPI
729 NpfsWrite(PDEVICE_OBJECT DeviceObject,
730 PIRP Irp)
731 {
732 PIO_STACK_LOCATION IoStack;
733 PFILE_OBJECT FileObject;
734 PNPFS_FCB Fcb = NULL;
735 PNPFS_CCB Ccb = NULL;
736 PNPFS_CCB ReaderCcb;
737 PUCHAR Buffer;
738 NTSTATUS Status = STATUS_SUCCESS;
739 ULONG Length;
740 ULONG Offset;
741 ULONG Information;
742 ULONG CopyLength;
743 ULONG TempLength;
744
745 DPRINT("NpfsWrite()\n");
746
747 IoStack = IoGetCurrentIrpStackLocation(Irp);
748 FileObject = IoStack->FileObject;
749 DPRINT("FileObject %p\n", FileObject);
750 DPRINT("Pipe name %wZ\n", &FileObject->FileName);
751
752 Ccb = FileObject->FsContext2;
753 ReaderCcb = Ccb->OtherSide;
754 Fcb = Ccb->Fcb;
755
756 Length = IoStack->Parameters.Write.Length;
757 Offset = IoStack->Parameters.Write.ByteOffset.u.LowPart;
758 Information = 0;
759
760 if (Irp->MdlAddress == NULL)
761 {
762 DPRINT("Irp->MdlAddress == NULL\n");
763 Status = STATUS_UNSUCCESSFUL;
764 Length = 0;
765 goto done;
766 }
767
768 if ((ReaderCcb == NULL) || (Ccb->PipeState != FILE_PIPE_CONNECTED_STATE))
769 {
770 DPRINT("Pipe is NOT connected!\n");
771 if (Ccb->PipeState == FILE_PIPE_LISTENING_STATE)
772 Status = STATUS_PIPE_LISTENING;
773 else if (Ccb->PipeState == FILE_PIPE_DISCONNECTED_STATE)
774 Status = STATUS_PIPE_DISCONNECTED;
775 else
776 Status = STATUS_UNSUCCESSFUL;
777 Length = 0;
778 goto done;
779 }
780
781 if (ReaderCcb->Data == NULL)
782 {
783 DPRINT("Pipe is NOT writable!\n");
784 Status = STATUS_UNSUCCESSFUL;
785 Length = 0;
786 goto done;
787 }
788
789 Status = STATUS_SUCCESS;
790 Buffer = MmGetSystemAddressForMdl (Irp->MdlAddress);
791
792 ExAcquireFastMutex(&ReaderCcb->DataListLock);
793
794 #ifndef NDEBUG
795 DPRINT("Length %d Buffer %x Offset %x\n",Length,Buffer,Offset);
796 HexDump(Buffer, Length);
797 #endif
798
799 while(1)
800 {
801 if ((ReaderCcb->WriteQuotaAvailable == 0))
802 {
803 KeSetEvent(&ReaderCcb->ReadEvent, IO_NO_INCREMENT, FALSE);
804 if (Ccb->PipeState != FILE_PIPE_CONNECTED_STATE)
805 {
806 Status = STATUS_PIPE_BROKEN;
807 ExReleaseFastMutex(&ReaderCcb->DataListLock);
808 goto done;
809 }
810 ExReleaseFastMutex(&ReaderCcb->DataListLock);
811
812 DPRINT("Write Waiting for buffer space (%S)\n", Fcb->PipeName.Buffer);
813 Status = KeWaitForSingleObject(&Ccb->WriteEvent,
814 UserRequest,
815 Irp->RequestorMode,
816 FALSE,
817 NULL);
818 DPRINT("Write Finished waiting (%S)! Status: %x\n", Fcb->PipeName.Buffer, Status);
819
820 if ((Status == STATUS_USER_APC) || (Status == STATUS_KERNEL_APC))
821 {
822 Status = STATUS_CANCELLED;
823 goto done;
824 }
825 if (!NT_SUCCESS(Status))
826 {
827 ASSERT(FALSE);
828 }
829 /*
830 * It's possible that the event was signaled because the
831 * other side of pipe was closed.
832 */
833 if (Ccb->PipeState != FILE_PIPE_CONNECTED_STATE)
834 {
835 DPRINT("PipeState: %x\n", Ccb->PipeState);
836 Status = STATUS_PIPE_BROKEN;
837 goto done;
838 }
839 /* Check that the pipe has not been closed */
840 if (ReaderCcb->PipeState != FILE_PIPE_CONNECTED_STATE)
841 {
842 /* If the other side is valid, fire event */
843 if (Ccb)
844 {
845 KeResetEvent(&Ccb->WriteEvent);
846 }
847 Status = STATUS_PIPE_BROKEN;
848 goto done;
849 }
850 ExAcquireFastMutex(&ReaderCcb->DataListLock);
851 }
852
853 if (Ccb->Fcb->PipeType == FILE_PIPE_BYTE_STREAM_TYPE)
854 {
855 DPRINT("Byte stream mode: Ccb->Data %x, Ccb->WritePtr %x\n", ReaderCcb->Data, ReaderCcb->WritePtr);
856
857 while (Length > 0 && ReaderCcb->WriteQuotaAvailable > 0)
858 {
859 CopyLength = min(Length, ReaderCcb->WriteQuotaAvailable);
860
861 if ((ULONG_PTR)ReaderCcb->WritePtr + CopyLength <= (ULONG_PTR)ReaderCcb->Data + ReaderCcb->MaxDataLength)
862 {
863 memcpy(ReaderCcb->WritePtr, Buffer, CopyLength);
864 ReaderCcb->WritePtr = (PVOID)((ULONG_PTR)ReaderCcb->WritePtr + CopyLength);
865 if ((ULONG_PTR)ReaderCcb->WritePtr == (ULONG_PTR)ReaderCcb->Data + ReaderCcb->MaxDataLength)
866 {
867 ReaderCcb->WritePtr = ReaderCcb->Data;
868 }
869 }
870 else
871 {
872
873 TempLength = (ULONG)((ULONG_PTR)ReaderCcb->Data + ReaderCcb->MaxDataLength -
874 (ULONG_PTR)ReaderCcb->WritePtr);
875
876 memcpy(ReaderCcb->WritePtr, Buffer, TempLength);
877 memcpy(ReaderCcb->Data, Buffer + TempLength, CopyLength - TempLength);
878 ReaderCcb->WritePtr = (PVOID)((ULONG_PTR)ReaderCcb->Data + CopyLength - TempLength);
879 }
880
881 Buffer += CopyLength;
882 Length -= CopyLength;
883 Information += CopyLength;
884
885 ReaderCcb->ReadDataAvailable += CopyLength;
886 ReaderCcb->WriteQuotaAvailable -= CopyLength;
887 }
888
889 if (Length == 0)
890 {
891 KeSetEvent(&ReaderCcb->ReadEvent, IO_NO_INCREMENT, FALSE);
892 KeResetEvent(&Ccb->WriteEvent);
893 break;
894 }
895 }
896 else if (Ccb->Fcb->PipeType == FILE_PIPE_MESSAGE_TYPE)
897 {
898 /* For Message Type Pipe, the Pipes memory will be used to store the size of each message */
899 DPRINT("Message mode: Ccb->Data %x, Ccb->WritePtr %x\n",ReaderCcb->Data, ReaderCcb->WritePtr);
900 if (Length > 0)
901 {
902 /* Verify the WritePtr is still inside the buffer */
903 if (((ULONG_PTR)ReaderCcb->WritePtr > ((ULONG_PTR)ReaderCcb->Data + (ULONG_PTR)ReaderCcb->MaxDataLength)) ||
904 ((ULONG_PTR)ReaderCcb->WritePtr < (ULONG_PTR)ReaderCcb->Data))
905 {
906 DPRINT1("NPFS is writing out of its buffer. Report to developer!\n");
907 DPRINT1("ReaderCcb->WritePtr %x, ReaderCcb->Data %x, ReaderCcb->MaxDataLength %lu\n",
908 ReaderCcb->WritePtr, ReaderCcb->Data, ReaderCcb->MaxDataLength);
909 ASSERT(FALSE);
910 }
911
912 CopyLength = min(Length, ReaderCcb->WriteQuotaAvailable - sizeof(ULONG));
913 if (CopyLength > ReaderCcb->WriteQuotaAvailable)
914 {
915 DPRINT1("Writing %lu byte to pipe would overflow as only %lu bytes are available\n",
916 CopyLength, ReaderCcb->WriteQuotaAvailable);
917 ASSERT(FALSE);
918 }
919
920 /* First Copy the Length of the message into the pipes buffer */
921 memcpy(ReaderCcb->WritePtr, &CopyLength, sizeof(CopyLength));
922
923 /* Now the user buffer itself */
924 memcpy((PVOID)((ULONG_PTR)ReaderCcb->WritePtr + sizeof(CopyLength)), Buffer, CopyLength);
925
926 /* Update the write pointer */
927 ReaderCcb->WritePtr = (PVOID)((ULONG_PTR)ReaderCcb->WritePtr + sizeof(CopyLength) + CopyLength);
928
929 Information += CopyLength;
930
931 ReaderCcb->ReadDataAvailable += CopyLength;
932
933 ReaderCcb->WriteQuotaAvailable -= (CopyLength + sizeof(ULONG));
934
935 if ((ULONG_PTR)ReaderCcb->WriteQuotaAvailable > (ULONG)ReaderCcb->MaxDataLength)
936 {
937 DPRINT1("QuotaAvailable is greater than buffer size!\n");
938 ASSERT(FALSE);
939 }
940 }
941
942 if (Information > 0)
943 {
944 KeSetEvent(&ReaderCcb->ReadEvent, IO_NO_INCREMENT, FALSE);
945 KeResetEvent(&Ccb->WriteEvent);
946 break;
947 }
948 }
949 else
950 {
951 DPRINT1("Unhandled Pipe Type Mode and Read Write Mode!\n");
952 ASSERT(FALSE);
953 }
954 }
955
956 ExReleaseFastMutex(&ReaderCcb->DataListLock);
957
958 done:
959 Irp->IoStatus.Status = Status;
960 Irp->IoStatus.Information = Information;
961
962 IoCompleteRequest(Irp, IO_NO_INCREMENT);
963
964 DPRINT("NpfsWrite done (Status %lx)\n", Status);
965
966 return Status;
967 }
968
969 /* EOF */