- Revert r38790 because it broke the autotests.
[reactos.git] / reactos / drivers / filesystems / npfs / rw.c
1 /*
2 * COPYRIGHT: See COPYING in the top level directory
3 * PROJECT: ReactOS kernel
4 * FILE: drivers/fs/np/rw.c
5 * PURPOSE: Named pipe filesystem
6 * PROGRAMMER: David Welch <welch@cwcom.net>
7 * Michael Martin
8 */
9
10 /* INCLUDES ******************************************************************/
11
12 #include "npfs.h"
13
14 #define NDEBUG
15 #include <debug.h>
16
17 /* FUNCTIONS *****************************************************************/
18
19 #ifndef NDEBUG
20 VOID HexDump(PUCHAR Buffer, ULONG Length)
21 {
22 CHAR Line[65];
23 UCHAR ch;
24 const char Hex[] = "0123456789ABCDEF";
25 int i, j;
26
27 DbgPrint("---------------\n");
28
29 for (i = 0; i < Length; i+= 16)
30 {
31 memset(Line, ' ', 64);
32 Line[64] = 0;
33
34 for (j = 0; j < 16 && j + i < Length; j++)
35 {
36 ch = Buffer[i + j];
37 Line[3*j + 0] = Hex[ch >> 4];
38 Line[3*j + 1] = Hex[ch & 0x0f];
39 Line[48 + j] = isprint(ch) ? ch : '.';
40 }
41 DbgPrint("%s\n", Line);
42 }
43 DbgPrint("---------------\n");
44 }
45 #endif
46
47 static DRIVER_CANCEL NpfsReadWriteCancelRoutine;
48 static VOID NTAPI
49 NpfsReadWriteCancelRoutine(IN PDEVICE_OBJECT DeviceObject,
50 IN PIRP Irp)
51 {
52 PNPFS_CONTEXT Context;
53 PNPFS_DEVICE_EXTENSION DeviceExt;
54 PIO_STACK_LOCATION IoStack;
55 PNPFS_CCB Ccb;
56 BOOLEAN Complete = FALSE;
57
58 DPRINT("NpfsReadWriteCancelRoutine(DeviceObject %p, Irp %p)\n", DeviceObject, Irp);
59
60 IoReleaseCancelSpinLock(Irp->CancelIrql);
61
62 Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
63 DeviceExt = (PNPFS_DEVICE_EXTENSION)DeviceObject->DeviceExtension;
64 IoStack = IoGetCurrentIrpStackLocation(Irp);
65 Ccb = IoStack->FileObject->FsContext2;
66
67 KeLockMutex(&DeviceExt->PipeListLock);
68 ExAcquireFastMutex(&Ccb->DataListLock);
69 switch(IoStack->MajorFunction)
70 {
71 case IRP_MJ_READ:
72 if (Ccb->ReadRequestListHead.Flink != &Context->ListEntry)
73 {
74 /* we are not the first in the list, remove an complete us */
75 RemoveEntryList(&Context->ListEntry);
76 Complete = TRUE;
77 }
78 else
79 {
80 KeSetEvent(&Ccb->ReadEvent, IO_NO_INCREMENT, FALSE);
81 }
82 break;
83 default:
84 ASSERT(FALSE);
85 }
86 ExReleaseFastMutex(&Ccb->DataListLock);
87 KeUnlockMutex(&DeviceExt->PipeListLock);
88 if (Complete)
89 {
90 Irp->IoStatus.Status = STATUS_CANCELLED;
91 Irp->IoStatus.Information = 0;
92 IoCompleteRequest(Irp, IO_NO_INCREMENT);
93 }
94 }
95
96 static VOID NTAPI
97 NpfsWaiterThread(PVOID InitContext)
98 {
99 PNPFS_THREAD_CONTEXT ThreadContext = (PNPFS_THREAD_CONTEXT) InitContext;
100 ULONG CurrentCount;
101 ULONG Count = 0;
102 PIRP Irp = NULL;
103 PIRP NextIrp;
104 NTSTATUS Status;
105 BOOLEAN Terminate = FALSE;
106 BOOLEAN Cancel = FALSE;
107 PIO_STACK_LOCATION IoStack = NULL;
108 PNPFS_CONTEXT Context;
109 PNPFS_CONTEXT NextContext;
110 PNPFS_CCB Ccb;
111
112 KeLockMutex(&ThreadContext->DeviceExt->PipeListLock);
113
114 while (1)
115 {
116 CurrentCount = ThreadContext->Count;
117 KeUnlockMutex(&ThreadContext->DeviceExt->PipeListLock);
118 if (Irp)
119 {
120 if (Cancel)
121 {
122 Irp->IoStatus.Status = STATUS_CANCELLED;
123 Irp->IoStatus.Information = 0;
124 IoCompleteRequest(Irp, IO_NO_INCREMENT);
125 }
126 else
127 {
128 switch (IoStack->MajorFunction)
129 {
130 case IRP_MJ_READ:
131 NpfsRead(IoStack->DeviceObject, Irp);
132 break;
133 default:
134 ASSERT(FALSE);
135 }
136 }
137 }
138 if (Terminate)
139 {
140 break;
141 }
142 Status = KeWaitForMultipleObjects(CurrentCount,
143 ThreadContext->WaitObjectArray,
144 WaitAny,
145 Executive,
146 KernelMode,
147 FALSE,
148 NULL,
149 ThreadContext->WaitBlockArray);
150 if (!NT_SUCCESS(Status))
151 {
152 ASSERT(FALSE);
153 }
154 KeLockMutex(&ThreadContext->DeviceExt->PipeListLock);
155 Count = Status - STATUS_SUCCESS;
156 ASSERT (Count < CurrentCount);
157 if (Count > 0)
158 {
159 Irp = ThreadContext->WaitIrpArray[Count];
160 ThreadContext->Count--;
161 ThreadContext->DeviceExt->EmptyWaiterCount++;
162 ThreadContext->WaitObjectArray[Count] = ThreadContext->WaitObjectArray[ThreadContext->Count];
163 ThreadContext->WaitIrpArray[Count] = ThreadContext->WaitIrpArray[ThreadContext->Count];
164
165 Cancel = (NULL == IoSetCancelRoutine(Irp, NULL));
166 Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
167 IoStack = IoGetCurrentIrpStackLocation(Irp);
168
169 if (Cancel)
170 {
171 Ccb = IoStack->FileObject->FsContext2;
172 ExAcquireFastMutex(&Ccb->DataListLock);
173 RemoveEntryList(&Context->ListEntry);
174 switch (IoStack->MajorFunction)
175 {
176 case IRP_MJ_READ:
177 if (!IsListEmpty(&Ccb->ReadRequestListHead))
178 {
179 /* put the next request on the wait list */
180 NextContext = CONTAINING_RECORD(Ccb->ReadRequestListHead.Flink, NPFS_CONTEXT, ListEntry);
181 ThreadContext->WaitObjectArray[ThreadContext->Count] = NextContext->WaitEvent;
182 NextIrp = CONTAINING_RECORD(NextContext, IRP, Tail.Overlay.DriverContext);
183 ThreadContext->WaitIrpArray[ThreadContext->Count] = NextIrp;
184 ThreadContext->Count++;
185 ThreadContext->DeviceExt->EmptyWaiterCount--;
186 }
187 break;
188 default:
189 ASSERT(FALSE);
190 }
191 ExReleaseFastMutex(&Ccb->DataListLock);
192 }
193 }
194 else
195 {
196 /* someone has add a new wait request */
197 Irp = NULL;
198 }
199 if (ThreadContext->Count == 1 && ThreadContext->DeviceExt->EmptyWaiterCount >= MAXIMUM_WAIT_OBJECTS)
200 {
201 /* it exist an other thread with empty wait slots, we can remove our thread from the list */
202 RemoveEntryList(&ThreadContext->ListEntry);
203 ThreadContext->DeviceExt->EmptyWaiterCount -= MAXIMUM_WAIT_OBJECTS - 1;
204 Terminate = TRUE;
205 }
206 }
207 ExFreePool(ThreadContext);
208 }
209
210 static NTSTATUS
211 NpfsAddWaitingReadWriteRequest(IN PDEVICE_OBJECT DeviceObject,
212 IN PIRP Irp)
213 {
214 PLIST_ENTRY ListEntry;
215 PNPFS_THREAD_CONTEXT ThreadContext = NULL;
216 NTSTATUS Status;
217 HANDLE hThread;
218 KIRQL oldIrql;
219
220 PNPFS_CONTEXT Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
221 PNPFS_DEVICE_EXTENSION DeviceExt = (PNPFS_DEVICE_EXTENSION)DeviceObject->DeviceExtension;
222
223 DPRINT("NpfsAddWaitingReadWriteRequest(DeviceObject %p, Irp %p)\n", DeviceObject, Irp);
224
225 KeLockMutex(&DeviceExt->PipeListLock);
226
227 ListEntry = DeviceExt->ThreadListHead.Flink;
228 while (ListEntry != &DeviceExt->ThreadListHead)
229 {
230 ThreadContext = CONTAINING_RECORD(ListEntry, NPFS_THREAD_CONTEXT, ListEntry);
231 if (ThreadContext->Count < MAXIMUM_WAIT_OBJECTS)
232 {
233 break;
234 }
235 ListEntry = ListEntry->Flink;
236 }
237 if (ListEntry == &DeviceExt->ThreadListHead)
238 {
239 ThreadContext = ExAllocatePool(NonPagedPool, sizeof(NPFS_THREAD_CONTEXT));
240 if (ThreadContext == NULL)
241 {
242 KeUnlockMutex(&DeviceExt->PipeListLock);
243 return STATUS_NO_MEMORY;
244 }
245 ThreadContext->DeviceExt = DeviceExt;
246 KeInitializeEvent(&ThreadContext->Event, SynchronizationEvent, FALSE);
247 ThreadContext->Count = 1;
248 ThreadContext->WaitObjectArray[0] = &ThreadContext->Event;
249
250
251 DPRINT("Creating a new system thread for waiting read/write requests\n");
252
253 Status = PsCreateSystemThread(&hThread,
254 THREAD_ALL_ACCESS,
255 NULL,
256 NULL,
257 NULL,
258 NpfsWaiterThread,
259 (PVOID)ThreadContext);
260 if (!NT_SUCCESS(Status))
261 {
262 ExFreePool(ThreadContext);
263 KeUnlockMutex(&DeviceExt->PipeListLock);
264 return Status;
265 }
266 InsertHeadList(&DeviceExt->ThreadListHead, &ThreadContext->ListEntry);
267 DeviceExt->EmptyWaiterCount += MAXIMUM_WAIT_OBJECTS - 1;
268 }
269 IoMarkIrpPending(Irp);
270
271 IoAcquireCancelSpinLock(&oldIrql);
272 if (Irp->Cancel)
273 {
274 IoReleaseCancelSpinLock(oldIrql);
275 Status = STATUS_CANCELLED;
276 }
277 else
278 {
279 (void)IoSetCancelRoutine(Irp, NpfsReadWriteCancelRoutine);
280 IoReleaseCancelSpinLock(oldIrql);
281 ThreadContext->WaitObjectArray[ThreadContext->Count] = Context->WaitEvent;
282 ThreadContext->WaitIrpArray[ThreadContext->Count] = Irp;
283 ThreadContext->Count++;
284 DeviceExt->EmptyWaiterCount--;
285 KeSetEvent(&ThreadContext->Event, IO_NO_INCREMENT, FALSE);
286 Status = STATUS_SUCCESS;
287 }
288 KeUnlockMutex(&DeviceExt->PipeListLock);
289 return Status;
290 }
291
292 NTSTATUS NTAPI
293 NpfsRead(IN PDEVICE_OBJECT DeviceObject,
294 IN PIRP Irp)
295 {
296 PFILE_OBJECT FileObject;
297 NTSTATUS Status;
298 NTSTATUS OriginalStatus = STATUS_SUCCESS;
299 PNPFS_CCB Ccb;
300 PNPFS_CONTEXT Context;
301 KEVENT Event;
302 ULONG Length;
303 ULONG Information = 0;
304 ULONG CopyLength = 0;
305 ULONG TempLength;
306 BOOLEAN IsOriginalRequest = TRUE;
307 PVOID Buffer;
308
309 DPRINT("NpfsRead(DeviceObject %p, Irp %p)\n", DeviceObject, Irp);
310
311 if (Irp->MdlAddress == NULL)
312 {
313 DPRINT("Irp->MdlAddress == NULL\n");
314 Status = STATUS_UNSUCCESSFUL;
315 Irp->IoStatus.Information = 0;
316 goto done;
317 }
318
319 FileObject = IoGetCurrentIrpStackLocation(Irp)->FileObject;
320 DPRINT("FileObject %p\n", FileObject);
321 DPRINT("Pipe name %wZ\n", &FileObject->FileName);
322 Ccb = FileObject->FsContext2;
323 Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
324
325 if ((Ccb->OtherSide == NULL) && (Ccb->ReadDataAvailable == 0))
326 {
327 /* Its ok if the other side has been Disconnect, but if we have data still in the buffer
328 , need to still be able to read it. Currently this is a HAXXXX */
329 DPRINT("Pipe no longer connected and no data exist in buffer. Ok to close pipe\n");
330 if (Ccb->PipeState == FILE_PIPE_LISTENING_STATE)
331 Status = STATUS_PIPE_LISTENING;
332 else if (Ccb->PipeState == FILE_PIPE_DISCONNECTED_STATE)
333 Status = STATUS_PIPE_DISCONNECTED;
334 else
335 Status = STATUS_UNSUCCESSFUL;
336 Irp->IoStatus.Information = 0;
337 goto done;
338 }
339
340 if (Ccb->Data == NULL)
341 {
342 DPRINT1("Pipe is NOT readable!\n");
343 Status = STATUS_UNSUCCESSFUL;
344 Irp->IoStatus.Information = 0;
345 goto done;
346 }
347
348 ExAcquireFastMutex(&Ccb->DataListLock);
349
350 if (IoIsOperationSynchronous(Irp))
351 {
352 InsertTailList(&Ccb->ReadRequestListHead, &Context->ListEntry);
353 if (Ccb->ReadRequestListHead.Flink != &Context->ListEntry)
354 {
355 KeInitializeEvent(&Event, SynchronizationEvent, FALSE);
356 Context->WaitEvent = &Event;
357 ExReleaseFastMutex(&Ccb->DataListLock);
358 Status = KeWaitForSingleObject(&Event,
359 Executive,
360 Irp->RequestorMode,
361 FALSE,
362 NULL);
363
364 if ((Status == STATUS_USER_APC) || (Status == STATUS_KERNEL_APC))
365 {
366 Status = STATUS_CANCELLED;
367 goto done;
368 }
369 if (!NT_SUCCESS(Status))
370 {
371 ASSERT(FALSE);
372 }
373 ExAcquireFastMutex(&Ccb->DataListLock);
374 }
375 Irp->IoStatus.Information = 0;
376 }
377 else
378 {
379 KIRQL oldIrql;
380 if (IsListEmpty(&Ccb->ReadRequestListHead) ||
381 Ccb->ReadRequestListHead.Flink != &Context->ListEntry)
382 {
383 /* this is a new request */
384 Irp->IoStatus.Information = 0;
385 Context->WaitEvent = &Ccb->ReadEvent;
386 InsertTailList(&Ccb->ReadRequestListHead, &Context->ListEntry);
387 if (Ccb->ReadRequestListHead.Flink != &Context->ListEntry)
388 {
389 /* there was already a request on the list */
390 IoAcquireCancelSpinLock(&oldIrql);
391 if (Irp->Cancel)
392 {
393 IoReleaseCancelSpinLock(oldIrql);
394 RemoveEntryList(&Context->ListEntry);
395 ExReleaseFastMutex(&Ccb->DataListLock);
396 Status = STATUS_CANCELLED;
397 goto done;
398 }
399 (void)IoSetCancelRoutine(Irp, NpfsReadWriteCancelRoutine);
400 IoReleaseCancelSpinLock(oldIrql);
401 ExReleaseFastMutex(&Ccb->DataListLock);
402 IoMarkIrpPending(Irp);
403 Status = STATUS_PENDING;
404 goto done;
405 }
406 }
407 }
408
409 while (1)
410 {
411 Buffer = MmGetSystemAddressForMdl(Irp->MdlAddress);
412 Information = Irp->IoStatus.Information;
413 Length = IoGetCurrentIrpStackLocation(Irp)->Parameters.Read.Length;
414 ASSERT (Information <= Length);
415 Buffer = (PVOID)((ULONG_PTR)Buffer + Information);
416 Length -= Information;
417 Status = STATUS_SUCCESS;
418
419 while (1)
420 {
421 if (Ccb->ReadDataAvailable == 0)
422 {
423 if (Ccb->PipeState == FILE_PIPE_CONNECTED_STATE)
424 {
425 ASSERT(Ccb->OtherSide != NULL);
426 KeSetEvent(&Ccb->OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE);
427 }
428 if (Information > 0 &&
429 (Ccb->Fcb->ReadMode != FILE_PIPE_BYTE_STREAM_MODE ||
430 Ccb->PipeState != FILE_PIPE_CONNECTED_STATE))
431 {
432 break;
433 }
434 if ((Ccb->PipeState != FILE_PIPE_CONNECTED_STATE) && (Ccb->ReadDataAvailable == 0))
435 {
436 DPRINT("PipeState: %x\n", Ccb->PipeState);
437 Status = STATUS_PIPE_BROKEN;
438 break;
439 }
440 ExReleaseFastMutex(&Ccb->DataListLock);
441
442 if (IoIsOperationSynchronous(Irp))
443 {
444 /* Wait for ReadEvent to become signaled */
445
446 DPRINT("Waiting for readable data (%wZ)\n", &Ccb->Fcb->PipeName);
447 Status = KeWaitForSingleObject(&Ccb->ReadEvent,
448 UserRequest,
449 Irp->RequestorMode,
450 FALSE,
451 NULL);
452 DPRINT("Finished waiting (%wZ)! Status: %x\n", &Ccb->Fcb->PipeName, Status);
453
454 if ((Status == STATUS_USER_APC) || (Status == STATUS_KERNEL_APC))
455 {
456 Status = STATUS_CANCELLED;
457 break;
458 }
459 if (!NT_SUCCESS(Status))
460 {
461 ASSERT(FALSE);
462 }
463 ExAcquireFastMutex(&Ccb->DataListLock);
464 }
465 else
466 {
467 Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
468
469 Context->WaitEvent = &Ccb->ReadEvent;
470
471 Status = NpfsAddWaitingReadWriteRequest(DeviceObject, Irp);
472
473 if (NT_SUCCESS(Status))
474 {
475 Status = STATUS_PENDING;
476 goto done;
477 }
478 ExAcquireFastMutex(&Ccb->DataListLock);
479 break;
480 }
481 }
482 ASSERT(IoGetCurrentIrpStackLocation(Irp)->FileObject != NULL);
483 if (Ccb->Fcb->ReadMode == FILE_PIPE_BYTE_STREAM_MODE)
484 {
485 DPRINT("Byte stream mode\n");
486 /* Byte stream mode */
487 while (Length > 0 && Ccb->ReadDataAvailable > 0)
488 {
489 CopyLength = min(Ccb->ReadDataAvailable, Length);
490 if ((ULONG_PTR)Ccb->ReadPtr + CopyLength <= (ULONG_PTR)Ccb->Data + Ccb->MaxDataLength)
491 {
492 memcpy(Buffer, Ccb->ReadPtr, CopyLength);
493 Ccb->ReadPtr = (PVOID)((ULONG_PTR)Ccb->ReadPtr + CopyLength);
494 if (Ccb->ReadPtr == (PVOID)((ULONG_PTR)Ccb->Data + Ccb->MaxDataLength))
495 {
496 Ccb->ReadPtr = Ccb->Data;
497 }
498 }
499 else
500 {
501 TempLength = (ULONG)((ULONG_PTR)Ccb->Data + Ccb->MaxDataLength - (ULONG_PTR)Ccb->ReadPtr);
502 memcpy(Buffer, Ccb->ReadPtr, TempLength);
503 memcpy((PVOID)((ULONG_PTR)Buffer + TempLength), Ccb->Data, CopyLength - TempLength);
504 Ccb->ReadPtr = (PVOID)((ULONG_PTR)Ccb->Data + CopyLength - TempLength);
505 }
506
507 Buffer = (PVOID)((ULONG_PTR)Buffer + CopyLength);
508 Length -= CopyLength;
509 Information += CopyLength;
510
511 Ccb->ReadDataAvailable -= CopyLength;
512 Ccb->WriteQuotaAvailable += CopyLength;
513 }
514
515 if ((Length == 0) || (Ccb->ReadDataAvailable == 0))
516 {
517 if (Ccb->PipeState == FILE_PIPE_CONNECTED_STATE)
518 {
519 KeSetEvent(&Ccb->OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE);
520 }
521 KeResetEvent(&Ccb->ReadEvent);
522 break;
523 }
524 }
525 else
526 {
527 DPRINT("Message mode\n");
528
529 /* For Message mode, the Message length will be stored in the buffer preceeding the Message. */
530 if (Ccb->ReadDataAvailable)
531 {
532 ULONG NextMessageLength=0;
533 //HexDump(Ccb->Data, (ULONG)Ccb->WritePtr - (ULONG)Ccb->Data);
534 /*First get the size of the message */
535 memcpy(&NextMessageLength, Ccb->Data, sizeof(NextMessageLength));
536 if ((NextMessageLength == 0) || (NextMessageLength > Ccb->ReadDataAvailable))
537 {
538 DPRINT1("This should never happen! Possible memory corruption.\n");
539 #ifndef NDEBUG
540 HexDump(Ccb->Data, (ULONG)Ccb->WritePtr - (ULONG)Ccb->Data);
541 #endif
542 ASSERT(FALSE);
543 }
544
545 /* Use the smaller value */
546
547 CopyLength = min(NextMessageLength, Length);
548
549 /* retrieve the message from the buffer */
550 memcpy(Buffer, (PVOID)((ULONG)Ccb->Data + sizeof(NextMessageLength)), CopyLength);
551
552
553 if (Ccb->ReadDataAvailable > CopyLength)
554 {
555 if (CopyLength < NextMessageLength)
556 {
557 /* Client only requested part of the message */
558
559 /* Calculate the remaining message new size */
560 ULONG NewMessageSize = NextMessageLength-CopyLength;
561
562 /* Write a new Message size to buffer for the part of the message still there */
563 memcpy(Ccb->Data, &NewMessageSize, sizeof(NewMessageSize));
564
565 /* Move the memory starting from end of partial Message just retrieved */
566 memmove((PVOID)((ULONG_PTR)Ccb->Data + sizeof(NewMessageSize)),
567 (PVOID)((ULONG_PTR) Ccb->Data + CopyLength + sizeof(NewMessageSize)),
568 (ULONG)Ccb->WritePtr - ((ULONG)Ccb->Data + sizeof(NewMessageSize)) - CopyLength);
569
570 /* Update the write pointer */
571 Ccb->WritePtr = (PVOID)((ULONG)Ccb->WritePtr - CopyLength);
572
573 /* Add CopyLength only to WriteQuotaAvailable as the
574 Message Size was replaced in the buffer */
575 Ccb->WriteQuotaAvailable += CopyLength;
576 }
577 else
578 {
579 /* Client wanted the entire message */
580
581 /* Move the memory starting from the next Message just retrieved */
582 memmove(Ccb->Data,
583 (PVOID)((ULONG_PTR) Ccb->Data + NextMessageLength + sizeof(NextMessageLength)),
584 (ULONG)Ccb->WritePtr - (ULONG)Ccb->Data - NextMessageLength - sizeof(NextMessageLength));
585
586 /* Update the write pointer */
587 Ccb->WritePtr = (PVOID)((ULONG)Ccb->WritePtr - NextMessageLength);
588
589 /* Add both the message length and the header to the WriteQuotaAvailable
590 as they both were removed */
591 Ccb->WriteQuotaAvailable += (CopyLength + sizeof(NextMessageLength));
592 }
593 }
594 else
595 {
596 /* This was the last Message, so just zero this messages for safety sake */
597 memset(Ccb->Data,0,NextMessageLength + sizeof(NextMessageLength));
598
599 /* reset the write pointer to beginning of buffer */
600 Ccb->WritePtr = Ccb->Data;
601
602 /* Add both the message length and the header to the WriteQuotaAvailable
603 as they both were removed */
604 Ccb->WriteQuotaAvailable += (CopyLength + sizeof(ULONG));
605
606 KeResetEvent(&Ccb->ReadEvent);
607 if (Ccb->PipeState == FILE_PIPE_CONNECTED_STATE)
608 {
609 KeSetEvent(&Ccb->OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE);
610 }
611 }
612 #ifndef NDEBUG
613 DPRINT("Length %d Buffer %x\n",CopyLength,Buffer);
614 HexDump((PUCHAR)Buffer, CopyLength);
615 #endif
616
617 Information += CopyLength;
618
619 Ccb->ReadDataAvailable -= CopyLength;
620
621 if ((ULONG)Ccb->WriteQuotaAvailable > (ULONG)Ccb->MaxDataLength) ASSERT(FALSE);
622 }
623
624 if (Information > 0)
625 {
626 break;
627 }
628 }
629 }
630 Irp->IoStatus.Information = Information;
631 Irp->IoStatus.Status = Status;
632
633 ASSERT(IoGetCurrentIrpStackLocation(Irp)->FileObject != NULL);
634
635 if (IoIsOperationSynchronous(Irp))
636 {
637 RemoveEntryList(&Context->ListEntry);
638 if (!IsListEmpty(&Ccb->ReadRequestListHead))
639 {
640 Context = CONTAINING_RECORD(Ccb->ReadRequestListHead.Flink, NPFS_CONTEXT, ListEntry);
641 KeSetEvent(Context->WaitEvent, IO_NO_INCREMENT, FALSE);
642 }
643 ExReleaseFastMutex(&Ccb->DataListLock);
644 IoCompleteRequest(Irp, IO_NO_INCREMENT);
645
646 DPRINT("NpfsRead done (Status %lx)\n", Status);
647 return Status;
648 }
649 else
650 {
651 if (IsOriginalRequest)
652 {
653 IsOriginalRequest = FALSE;
654 OriginalStatus = Status;
655 }
656 if (Status == STATUS_PENDING)
657 {
658 ExReleaseFastMutex(&Ccb->DataListLock);
659 DPRINT("NpfsRead done (Status %lx)\n", OriginalStatus);
660 return OriginalStatus;
661 }
662 RemoveEntryList(&Context->ListEntry);
663 IoCompleteRequest(Irp, IO_NO_INCREMENT);
664 if (IsListEmpty(&Ccb->ReadRequestListHead))
665 {
666 ExReleaseFastMutex(&Ccb->DataListLock);
667 DPRINT("NpfsRead done (Status %lx)\n", OriginalStatus);
668 return OriginalStatus;
669 }
670 Context = CONTAINING_RECORD(Ccb->ReadRequestListHead.Flink, NPFS_CONTEXT, ListEntry);
671 Irp = CONTAINING_RECORD(Context, IRP, Tail.Overlay.DriverContext);
672 }
673 }
674
675 done:
676 Irp->IoStatus.Status = Status;
677
678 if (Status != STATUS_PENDING)
679 {
680 IoCompleteRequest(Irp, IO_NO_INCREMENT);
681 }
682 DPRINT("NpfsRead done (Status %lx)\n", Status);
683
684 return Status;
685 }
686
687 NTSTATUS NTAPI
688 NpfsWrite(PDEVICE_OBJECT DeviceObject,
689 PIRP Irp)
690 {
691 PIO_STACK_LOCATION IoStack;
692 PFILE_OBJECT FileObject;
693 PNPFS_FCB Fcb = NULL;
694 PNPFS_CCB Ccb = NULL;
695 PNPFS_CCB ReaderCcb;
696 PUCHAR Buffer;
697 NTSTATUS Status = STATUS_SUCCESS;
698 ULONG Length;
699 ULONG Offset;
700 ULONG Information;
701 ULONG CopyLength;
702 ULONG TempLength;
703
704 DPRINT("NpfsWrite()\n");
705
706 IoStack = IoGetCurrentIrpStackLocation(Irp);
707 FileObject = IoStack->FileObject;
708 DPRINT("FileObject %p\n", FileObject);
709 DPRINT("Pipe name %wZ\n", &FileObject->FileName);
710
711 Ccb = FileObject->FsContext2;
712 ReaderCcb = Ccb->OtherSide;
713 Fcb = Ccb->Fcb;
714
715 Length = IoStack->Parameters.Write.Length;
716 Offset = IoStack->Parameters.Write.ByteOffset.u.LowPart;
717 Information = 0;
718
719 if (Irp->MdlAddress == NULL)
720 {
721 DPRINT1("Irp->MdlAddress == NULL\n");
722 Status = STATUS_UNSUCCESSFUL;
723 Length = 0;
724 goto done;
725 }
726
727 if (ReaderCcb == NULL)
728 {
729 DPRINT1("Pipe is NOT connected!\n");
730 if (Ccb->PipeState == FILE_PIPE_LISTENING_STATE)
731 Status = STATUS_PIPE_LISTENING;
732 else if (Ccb->PipeState == FILE_PIPE_DISCONNECTED_STATE)
733 Status = STATUS_PIPE_DISCONNECTED;
734 else
735 Status = STATUS_UNSUCCESSFUL;
736 Length = 0;
737 goto done;
738 }
739
740 if (ReaderCcb->Data == NULL)
741 {
742 DPRINT1("Pipe is NOT writable!\n");
743 Status = STATUS_UNSUCCESSFUL;
744 Length = 0;
745 goto done;
746 }
747
748 Status = STATUS_SUCCESS;
749 Buffer = MmGetSystemAddressForMdl (Irp->MdlAddress);
750
751 ExAcquireFastMutex(&ReaderCcb->DataListLock);
752 #ifndef NDEBUG
753 DPRINT("Length %d Buffer %x Offset %x\n",Length,Buffer,Offset);
754 HexDump(Buffer, Length);
755 #endif
756
757 while(1)
758 {
759 if ((ReaderCcb->WriteQuotaAvailable == 0))
760 {
761 KeSetEvent(&ReaderCcb->ReadEvent, IO_NO_INCREMENT, FALSE);
762 if (Ccb->PipeState != FILE_PIPE_CONNECTED_STATE)
763 {
764 Status = STATUS_PIPE_BROKEN;
765 ExReleaseFastMutex(&ReaderCcb->DataListLock);
766 goto done;
767 }
768 ExReleaseFastMutex(&ReaderCcb->DataListLock);
769
770 DPRINT("Write Waiting for buffer space (%S)\n", Fcb->PipeName.Buffer);
771 Status = KeWaitForSingleObject(&Ccb->WriteEvent,
772 UserRequest,
773 Irp->RequestorMode,
774 FALSE,
775 NULL);
776 DPRINT("Write Finished waiting (%S)! Status: %x\n", Fcb->PipeName.Buffer, Status);
777 ExAcquireFastMutex(&ReaderCcb->DataListLock);
778 if ((Status == STATUS_USER_APC) || (Status == STATUS_KERNEL_APC))
779 {
780 Status = STATUS_CANCELLED;
781 break;
782 }
783 if (!NT_SUCCESS(Status))
784 {
785 ASSERT(FALSE);
786 }
787 /*
788 * It's possible that the event was signaled because the
789 * other side of pipe was closed.
790 */
791 if (Ccb->PipeState != FILE_PIPE_CONNECTED_STATE)
792 {
793 DPRINT("PipeState: %x\n", Ccb->PipeState);
794 Status = STATUS_PIPE_BROKEN;
795 // ExReleaseFastMutex(&ReaderCcb->DataListLock);
796 goto done;
797 }
798 }
799
800 if (Fcb->WriteMode == FILE_PIPE_BYTE_STREAM_MODE)
801 {
802 DPRINT("Byte stream mode\n");
803
804 while (Length > 0 && ReaderCcb->WriteQuotaAvailable > 0)
805 {
806 CopyLength = min(Length, ReaderCcb->WriteQuotaAvailable);
807
808 if ((ULONG_PTR)ReaderCcb->WritePtr + CopyLength <= (ULONG_PTR)ReaderCcb->Data + ReaderCcb->MaxDataLength)
809 {
810 memcpy(ReaderCcb->WritePtr, Buffer, CopyLength);
811 ReaderCcb->WritePtr = (PVOID)((ULONG_PTR)ReaderCcb->WritePtr + CopyLength);
812 if ((ULONG_PTR)ReaderCcb->WritePtr == (ULONG_PTR)ReaderCcb->Data + ReaderCcb->MaxDataLength)
813 {
814 ReaderCcb->WritePtr = ReaderCcb->Data;
815 }
816 }
817 else
818 {
819 TempLength = (ULONG)((ULONG_PTR)ReaderCcb->Data + ReaderCcb->MaxDataLength - (ULONG_PTR)ReaderCcb->WritePtr);
820 memcpy(ReaderCcb->WritePtr, Buffer, TempLength);
821 memcpy(ReaderCcb->Data, Buffer + TempLength, CopyLength - TempLength);
822 ReaderCcb->WritePtr = (PVOID)((ULONG_PTR)ReaderCcb->Data + CopyLength - TempLength);
823 }
824
825 Buffer += CopyLength;
826 Length -= CopyLength;
827 Information += CopyLength;
828
829 ReaderCcb->ReadDataAvailable += CopyLength;
830 ReaderCcb->WriteQuotaAvailable -= CopyLength;
831 }
832
833 if (Length == 0)
834 {
835 KeSetEvent(&ReaderCcb->ReadEvent, IO_NO_INCREMENT, FALSE);
836 KeResetEvent(&Ccb->WriteEvent);
837 break;
838 }
839 }
840 else
841 {
842 /* For Message Type Pipe, the Pipes memory will be used to store the size of each message */
843 /* FIXME: Check and verify ReadMode ByteStream */
844
845 if (Length > 0)
846 {
847 /* Verify the WritePtr is still inside the buffer */
848 if (((ULONG)ReaderCcb->WritePtr > ((ULONG)ReaderCcb->Data + (ULONG)ReaderCcb->MaxDataLength)) ||
849 ((ULONG)ReaderCcb->WritePtr < (ULONG)ReaderCcb->Data))
850 {
851 DPRINT1("NPFS is writing out of its buffer. Report to developer!\n");
852 DPRINT1("ReaderCcb->WritePtr %x, ReaderCcb->Data %x, ReaderCcb->MaxDataLength%d\n",
853 ReaderCcb->WritePtr,ReaderCcb->Data,ReaderCcb->MaxDataLength);
854 ASSERT(FALSE);
855 }
856
857 CopyLength = min(Length, ReaderCcb->WriteQuotaAvailable - sizeof(ULONG));
858
859 /* First Copy the Length of the message into the pipes buffer */
860 memcpy(ReaderCcb->WritePtr, &CopyLength, sizeof(CopyLength));
861
862 /* Now the user buffer itself */
863 memcpy((PVOID)((ULONG)ReaderCcb->WritePtr+ sizeof(CopyLength)), Buffer, CopyLength);
864
865 /* Update the write pointer */
866 ReaderCcb->WritePtr = (PVOID)((ULONG)ReaderCcb->WritePtr + sizeof(CopyLength) + CopyLength);
867
868 Information += CopyLength;
869
870 ReaderCcb->ReadDataAvailable += CopyLength;
871
872 ReaderCcb->WriteQuotaAvailable -= (CopyLength + sizeof(ULONG));
873
874 if ((ULONG)ReaderCcb->WriteQuotaAvailable > (ULONG)ReaderCcb->MaxDataLength)
875 {
876 DPRINT1("QuotaAvailable is greater than buffer size!\n");
877 ASSERT(FALSE);
878 }
879 }
880
881 if (Information > 0)
882 {
883 KeSetEvent(&ReaderCcb->ReadEvent, IO_NO_INCREMENT, FALSE);
884 KeResetEvent(&Ccb->WriteEvent);
885 break;
886 }
887 }
888 }
889
890 ExReleaseFastMutex(&ReaderCcb->DataListLock);
891
892 done:
893 Irp->IoStatus.Status = Status;
894 Irp->IoStatus.Information = Information;
895
896 IoCompleteRequest(Irp, IO_NO_INCREMENT);
897
898 DPRINT("NpfsWrite done (Status %lx)\n", Status);
899
900 return Status;
901 }
902
903 /* EOF */