sync to trunk head (37853) (except rbuild changes)
[reactos.git] / reactos / drivers / filesystems / npfs / rw.c
1 /*
2 * COPYRIGHT: See COPYING in the top level directory
3 * PROJECT: ReactOS kernel
4 * FILE: drivers/filesystems/npfs/rw.c
5 * PURPOSE: Named pipe filesystem
6 * PROGRAMMER: David Welch <welch@cwcom.net>
7 */
8
9 /* INCLUDES ******************************************************************/
10
11 #include "npfs.h"
12
13 #define NDEBUG
14 #include <debug.h>
15
16 /* FUNCTIONS *****************************************************************/
17
18 #ifndef NDEBUG
19 VOID HexDump(PUCHAR Buffer, ULONG Length)
20 {
21 CHAR Line[65];
22 UCHAR ch;
23 const char Hex[] = "0123456789ABCDEF";
24 int i, j;
25
26 DbgPrint("---------------\n");
27
28 for (i = 0; i < Length; i+= 16)
29 {
30 memset(Line, ' ', 64);
31 Line[64] = 0;
32
33 for (j = 0; j < 16 && j + i < Length; j++)
34 {
35 ch = Buffer[i + j];
36 Line[3*j + 0] = Hex[ch >> 4];
37 Line[3*j + 1] = Hex[ch & 0x0f];
38 Line[48 + j] = isprint(ch) ? ch : '.';
39 }
40 DbgPrint("%s\n", Line);
41 }
42 DbgPrint("---------------\n");
43 }
44 #endif
45
46 static DRIVER_CANCEL NpfsReadWriteCancelRoutine;
47 static VOID NTAPI
48 NpfsReadWriteCancelRoutine(IN PDEVICE_OBJECT DeviceObject,
49 IN PIRP Irp)
50 {
51 PNPFS_CONTEXT Context;
52 PNPFS_DEVICE_EXTENSION DeviceExt;
53 PIO_STACK_LOCATION IoStack;
54 PNPFS_CCB Ccb;
55 BOOLEAN Complete = FALSE;
56
57 DPRINT("NpfsReadWriteCancelRoutine(DeviceObject %p, Irp %p)\n", DeviceObject, Irp);
58
59 IoReleaseCancelSpinLock(Irp->CancelIrql);
60
61 Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
62 DeviceExt = (PNPFS_DEVICE_EXTENSION)DeviceObject->DeviceExtension;
63 IoStack = IoGetCurrentIrpStackLocation(Irp);
64 Ccb = IoStack->FileObject->FsContext2;
65
66 KeLockMutex(&DeviceExt->PipeListLock);
67 ExAcquireFastMutex(&Ccb->DataListLock);
68 switch(IoStack->MajorFunction)
69 {
70 case IRP_MJ_READ:
71 if (Ccb->ReadRequestListHead.Flink != &Context->ListEntry)
72 {
73 /* we are not the first in the list, remove an complete us */
74 RemoveEntryList(&Context->ListEntry);
75 Complete = TRUE;
76 }
77 else
78 {
79 KeSetEvent(&Ccb->ReadEvent, IO_NO_INCREMENT, FALSE);
80 }
81 break;
82 default:
83 ASSERT(FALSE);
84 }
85 ExReleaseFastMutex(&Ccb->DataListLock);
86 KeUnlockMutex(&DeviceExt->PipeListLock);
87 if (Complete)
88 {
89 Irp->IoStatus.Status = STATUS_CANCELLED;
90 Irp->IoStatus.Information = 0;
91 IoCompleteRequest(Irp, IO_NO_INCREMENT);
92 }
93 }
94
95 static VOID NTAPI
96 NpfsWaiterThread(PVOID InitContext)
97 {
98 PNPFS_THREAD_CONTEXT ThreadContext = (PNPFS_THREAD_CONTEXT) InitContext;
99 ULONG CurrentCount;
100 ULONG Count = 0;
101 PIRP Irp = NULL;
102 PIRP NextIrp;
103 NTSTATUS Status;
104 BOOLEAN Terminate = FALSE;
105 BOOLEAN Cancel = FALSE;
106 PIO_STACK_LOCATION IoStack = NULL;
107 PNPFS_CONTEXT Context;
108 PNPFS_CONTEXT NextContext;
109 PNPFS_CCB Ccb;
110
111 KeLockMutex(&ThreadContext->DeviceExt->PipeListLock);
112
113 while (1)
114 {
115 CurrentCount = ThreadContext->Count;
116 KeUnlockMutex(&ThreadContext->DeviceExt->PipeListLock);
117 if (Irp)
118 {
119 if (Cancel)
120 {
121 Irp->IoStatus.Status = STATUS_CANCELLED;
122 Irp->IoStatus.Information = 0;
123 IoCompleteRequest(Irp, IO_NO_INCREMENT);
124 }
125 else
126 {
127 switch (IoStack->MajorFunction)
128 {
129 case IRP_MJ_READ:
130 NpfsRead(IoStack->DeviceObject, Irp);
131 break;
132 default:
133 ASSERT(FALSE);
134 }
135 }
136 }
137 if (Terminate)
138 {
139 break;
140 }
141 Status = KeWaitForMultipleObjects(CurrentCount,
142 ThreadContext->WaitObjectArray,
143 WaitAny,
144 Executive,
145 KernelMode,
146 FALSE,
147 NULL,
148 ThreadContext->WaitBlockArray);
149 if (!NT_SUCCESS(Status))
150 {
151 ASSERT(FALSE);
152 }
153 KeLockMutex(&ThreadContext->DeviceExt->PipeListLock);
154 Count = Status - STATUS_SUCCESS;
155 ASSERT (Count < CurrentCount);
156 if (Count > 0)
157 {
158 Irp = ThreadContext->WaitIrpArray[Count];
159 ThreadContext->Count--;
160 ThreadContext->DeviceExt->EmptyWaiterCount++;
161 ThreadContext->WaitObjectArray[Count] = ThreadContext->WaitObjectArray[ThreadContext->Count];
162 ThreadContext->WaitIrpArray[Count] = ThreadContext->WaitIrpArray[ThreadContext->Count];
163
164 Cancel = (NULL == IoSetCancelRoutine(Irp, NULL));
165 Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
166 IoStack = IoGetCurrentIrpStackLocation(Irp);
167
168 if (Cancel)
169 {
170 Ccb = IoStack->FileObject->FsContext2;
171 ExAcquireFastMutex(&Ccb->DataListLock);
172 RemoveEntryList(&Context->ListEntry);
173 switch (IoStack->MajorFunction)
174 {
175 case IRP_MJ_READ:
176 if (!IsListEmpty(&Ccb->ReadRequestListHead))
177 {
178 /* put the next request on the wait list */
179 NextContext = CONTAINING_RECORD(Ccb->ReadRequestListHead.Flink, NPFS_CONTEXT, ListEntry);
180 ThreadContext->WaitObjectArray[ThreadContext->Count] = NextContext->WaitEvent;
181 NextIrp = CONTAINING_RECORD(NextContext, IRP, Tail.Overlay.DriverContext);
182 ThreadContext->WaitIrpArray[ThreadContext->Count] = NextIrp;
183 ThreadContext->Count++;
184 ThreadContext->DeviceExt->EmptyWaiterCount--;
185 }
186 break;
187 default:
188 ASSERT(FALSE);
189 }
190 ExReleaseFastMutex(&Ccb->DataListLock);
191 }
192 }
193 else
194 {
195 /* someone has add a new wait request */
196 Irp = NULL;
197 }
198 if (ThreadContext->Count == 1 && ThreadContext->DeviceExt->EmptyWaiterCount >= MAXIMUM_WAIT_OBJECTS)
199 {
200 /* it exist an other thread with empty wait slots, we can remove our thread from the list */
201 RemoveEntryList(&ThreadContext->ListEntry);
202 ThreadContext->DeviceExt->EmptyWaiterCount -= MAXIMUM_WAIT_OBJECTS - 1;
203 Terminate = TRUE;
204 }
205 }
206 ExFreePool(ThreadContext);
207 }
208
209 static NTSTATUS
210 NpfsAddWaitingReadWriteRequest(IN PDEVICE_OBJECT DeviceObject,
211 IN PIRP Irp)
212 {
213 PLIST_ENTRY ListEntry;
214 PNPFS_THREAD_CONTEXT ThreadContext = NULL;
215 NTSTATUS Status;
216 HANDLE hThread;
217 KIRQL oldIrql;
218
219 PNPFS_CONTEXT Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
220 PNPFS_DEVICE_EXTENSION DeviceExt = (PNPFS_DEVICE_EXTENSION)DeviceObject->DeviceExtension;
221
222 DPRINT("NpfsAddWaitingReadWriteRequest(DeviceObject %p, Irp %p)\n", DeviceObject, Irp);
223
224 KeLockMutex(&DeviceExt->PipeListLock);
225
226 ListEntry = DeviceExt->ThreadListHead.Flink;
227 while (ListEntry != &DeviceExt->ThreadListHead)
228 {
229 ThreadContext = CONTAINING_RECORD(ListEntry, NPFS_THREAD_CONTEXT, ListEntry);
230 if (ThreadContext->Count < MAXIMUM_WAIT_OBJECTS)
231 {
232 break;
233 }
234 ListEntry = ListEntry->Flink;
235 }
236 if (ListEntry == &DeviceExt->ThreadListHead)
237 {
238 ThreadContext = ExAllocatePool(NonPagedPool, sizeof(NPFS_THREAD_CONTEXT));
239 if (ThreadContext == NULL)
240 {
241 KeUnlockMutex(&DeviceExt->PipeListLock);
242 return STATUS_NO_MEMORY;
243 }
244 ThreadContext->DeviceExt = DeviceExt;
245 KeInitializeEvent(&ThreadContext->Event, SynchronizationEvent, FALSE);
246 ThreadContext->Count = 1;
247 ThreadContext->WaitObjectArray[0] = &ThreadContext->Event;
248
249
250 DPRINT("Creating a new system thread for waiting read/write requests\n");
251
252 Status = PsCreateSystemThread(&hThread,
253 THREAD_ALL_ACCESS,
254 NULL,
255 NULL,
256 NULL,
257 NpfsWaiterThread,
258 (PVOID)ThreadContext);
259 if (!NT_SUCCESS(Status))
260 {
261 ExFreePool(ThreadContext);
262 KeUnlockMutex(&DeviceExt->PipeListLock);
263 return Status;
264 }
265 InsertHeadList(&DeviceExt->ThreadListHead, &ThreadContext->ListEntry);
266 DeviceExt->EmptyWaiterCount += MAXIMUM_WAIT_OBJECTS - 1;
267 }
268 IoMarkIrpPending(Irp);
269
270 IoAcquireCancelSpinLock(&oldIrql);
271 if (Irp->Cancel)
272 {
273 IoReleaseCancelSpinLock(oldIrql);
274 Status = STATUS_CANCELLED;
275 }
276 else
277 {
278 (void)IoSetCancelRoutine(Irp, NpfsReadWriteCancelRoutine);
279 IoReleaseCancelSpinLock(oldIrql);
280 ThreadContext->WaitObjectArray[ThreadContext->Count] = Context->WaitEvent;
281 ThreadContext->WaitIrpArray[ThreadContext->Count] = Irp;
282 ThreadContext->Count++;
283 DeviceExt->EmptyWaiterCount--;
284 KeSetEvent(&ThreadContext->Event, IO_NO_INCREMENT, FALSE);
285 Status = STATUS_SUCCESS;
286 }
287 KeUnlockMutex(&DeviceExt->PipeListLock);
288 return Status;
289 }
290
291 NTSTATUS NTAPI
292 NpfsRead(IN PDEVICE_OBJECT DeviceObject,
293 IN PIRP Irp)
294 {
295 PFILE_OBJECT FileObject;
296 NTSTATUS Status;
297 NTSTATUS OriginalStatus = STATUS_SUCCESS;
298 PNPFS_CCB Ccb;
299 PNPFS_CONTEXT Context;
300 KEVENT Event;
301 ULONG Length;
302 ULONG Information = 0;
303 ULONG CopyLength;
304 ULONG TempLength;
305 BOOLEAN IsOriginalRequest = TRUE;
306 PVOID Buffer;
307
308 DPRINT("NpfsRead(DeviceObject %p, Irp %p)\n", DeviceObject, Irp);
309
310 if (Irp->MdlAddress == NULL)
311 {
312 DPRINT("Irp->MdlAddress == NULL\n");
313 Status = STATUS_UNSUCCESSFUL;
314 Irp->IoStatus.Information = 0;
315 goto done;
316 }
317
318 FileObject = IoGetCurrentIrpStackLocation(Irp)->FileObject;
319 Ccb = FileObject->FsContext2;
320 Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
321
322 if (Ccb->OtherSide == NULL)
323 {
324 DPRINT("Pipe is NOT connected!\n");
325 if (Ccb->PipeState == FILE_PIPE_LISTENING_STATE)
326 Status = STATUS_PIPE_LISTENING;
327 else if (Ccb->PipeState == FILE_PIPE_DISCONNECTED_STATE)
328 Status = STATUS_PIPE_DISCONNECTED;
329 else
330 Status = STATUS_UNSUCCESSFUL;
331 Irp->IoStatus.Information = 0;
332 goto done;
333 }
334
335 if (Ccb->Data == NULL)
336 {
337 DPRINT1("Pipe is NOT readable!\n");
338 Status = STATUS_UNSUCCESSFUL;
339 Irp->IoStatus.Information = 0;
340 goto done;
341 }
342
343 ExAcquireFastMutex(&Ccb->DataListLock);
344
345 if (IoIsOperationSynchronous(Irp))
346 {
347 InsertTailList(&Ccb->ReadRequestListHead, &Context->ListEntry);
348 if (Ccb->ReadRequestListHead.Flink != &Context->ListEntry)
349 {
350 KeInitializeEvent(&Event, SynchronizationEvent, FALSE);
351 Context->WaitEvent = &Event;
352 ExReleaseFastMutex(&Ccb->DataListLock);
353 Status = KeWaitForSingleObject(&Event,
354 Executive,
355 KernelMode,
356 FALSE,
357 NULL);
358 if (!NT_SUCCESS(Status))
359 {
360 ASSERT(FALSE);
361 }
362 ExAcquireFastMutex(&Ccb->DataListLock);
363 }
364 Irp->IoStatus.Information = 0;
365 }
366 else
367 {
368 KIRQL oldIrql;
369 if (IsListEmpty(&Ccb->ReadRequestListHead) ||
370 Ccb->ReadRequestListHead.Flink != &Context->ListEntry)
371 {
372 /* this is a new request */
373 Irp->IoStatus.Information = 0;
374 Context->WaitEvent = &Ccb->ReadEvent;
375 InsertTailList(&Ccb->ReadRequestListHead, &Context->ListEntry);
376 if (Ccb->ReadRequestListHead.Flink != &Context->ListEntry)
377 {
378 /* there was already a request on the list */
379 IoAcquireCancelSpinLock(&oldIrql);
380 if (Irp->Cancel)
381 {
382 IoReleaseCancelSpinLock(oldIrql);
383 RemoveEntryList(&Context->ListEntry);
384 ExReleaseFastMutex(&Ccb->DataListLock);
385 Status = STATUS_CANCELLED;
386 goto done;
387 }
388 (void)IoSetCancelRoutine(Irp, NpfsReadWriteCancelRoutine);
389 IoReleaseCancelSpinLock(oldIrql);
390 ExReleaseFastMutex(&Ccb->DataListLock);
391 IoMarkIrpPending(Irp);
392 Status = STATUS_PENDING;
393 goto done;
394 }
395 }
396 }
397
398 while (1)
399 {
400 Buffer = MmGetSystemAddressForMdl(Irp->MdlAddress);
401 Information = Irp->IoStatus.Information;
402 Length = IoGetCurrentIrpStackLocation(Irp)->Parameters.Read.Length;
403 ASSERT (Information <= Length);
404 Buffer = (PVOID)((ULONG_PTR)Buffer + Information);
405 Length -= Information;
406 Status = STATUS_SUCCESS;
407
408 while (1)
409 {
410 if (Ccb->ReadDataAvailable == 0)
411 {
412 if (Ccb->PipeState == FILE_PIPE_CONNECTED_STATE)
413 {
414 ASSERT(Ccb->OtherSide != NULL);
415 KeSetEvent(&Ccb->OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE);
416 }
417 if (Information > 0 &&
418 (Ccb->Fcb->ReadMode != FILE_PIPE_BYTE_STREAM_MODE ||
419 Ccb->PipeState != FILE_PIPE_CONNECTED_STATE))
420 {
421 break;
422 }
423 if (Ccb->PipeState != FILE_PIPE_CONNECTED_STATE)
424 {
425 DPRINT("PipeState: %x\n", Ccb->PipeState);
426 Status = STATUS_PIPE_BROKEN;
427 break;
428 }
429 ExReleaseFastMutex(&Ccb->DataListLock);
430 if (IoIsOperationSynchronous(Irp))
431 {
432 /* Wait for ReadEvent to become signaled */
433
434 DPRINT("Waiting for readable data (%wZ)\n", &Ccb->Fcb->PipeName);
435 Status = KeWaitForSingleObject(&Ccb->ReadEvent,
436 UserRequest,
437 KernelMode,
438 FALSE,
439 NULL);
440 DPRINT("Finished waiting (%wZ)! Status: %x\n", &Ccb->Fcb->PipeName, Status);
441 ExAcquireFastMutex(&Ccb->DataListLock);
442 }
443 else
444 {
445 Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
446
447 Context->WaitEvent = &Ccb->ReadEvent;
448 Status = NpfsAddWaitingReadWriteRequest(DeviceObject, Irp);
449
450 if (NT_SUCCESS(Status))
451 {
452 Status = STATUS_PENDING;
453 goto done;
454 }
455 ExAcquireFastMutex(&Ccb->DataListLock);
456 break;
457 }
458 }
459 ASSERT(IoGetCurrentIrpStackLocation(Irp)->FileObject != NULL);
460 if (Ccb->Fcb->ReadMode == FILE_PIPE_BYTE_STREAM_MODE)
461 {
462 DPRINT("Byte stream mode\n");
463 /* Byte stream mode */
464 while (Length > 0 && Ccb->ReadDataAvailable > 0)
465 {
466 CopyLength = min(Ccb->ReadDataAvailable, Length);
467 if ((ULONG_PTR)Ccb->ReadPtr + CopyLength <= (ULONG_PTR)Ccb->Data + Ccb->MaxDataLength)
468 {
469 memcpy(Buffer, Ccb->ReadPtr, CopyLength);
470 Ccb->ReadPtr = (PVOID)((ULONG_PTR)Ccb->ReadPtr + CopyLength);
471 if (Ccb->ReadPtr == (PVOID)((ULONG_PTR)Ccb->Data + Ccb->MaxDataLength))
472 {
473 Ccb->ReadPtr = Ccb->Data;
474 }
475 }
476 else
477 {
478 TempLength = (ULONG)((ULONG_PTR)Ccb->Data + Ccb->MaxDataLength - (ULONG_PTR)Ccb->ReadPtr);
479 memcpy(Buffer, Ccb->ReadPtr, TempLength);
480 memcpy((PVOID)((ULONG_PTR)Buffer + TempLength), Ccb->Data, CopyLength - TempLength);
481 Ccb->ReadPtr = (PVOID)((ULONG_PTR)Ccb->Data + CopyLength - TempLength);
482 }
483
484 Buffer = (PVOID)((ULONG_PTR)Buffer + CopyLength);
485 Length -= CopyLength;
486 Information += CopyLength;
487
488 Ccb->ReadDataAvailable -= CopyLength;
489 Ccb->WriteQuotaAvailable += CopyLength;
490 }
491
492 if (Length == 0)
493 {
494 if (Ccb->PipeState == FILE_PIPE_CONNECTED_STATE)
495 {
496 KeSetEvent(&Ccb->OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE);
497 }
498 KeResetEvent(&Ccb->ReadEvent);
499 break;
500 }
501 }
502 else
503 {
504 DPRINT("Message mode\n");
505
506 /* Message mode */
507 if (Ccb->ReadDataAvailable)
508 {
509 /* Truncate the message if the receive buffer is too small */
510 CopyLength = min(Ccb->ReadDataAvailable, Length);
511 memcpy(Buffer, Ccb->Data, CopyLength);
512
513 #ifndef NDEBUG
514 DPRINT("Length %d Buffer %x\n",CopyLength,Buffer);
515 HexDump((PUCHAR)Buffer, CopyLength);
516 #endif
517
518 Information = CopyLength;
519
520 if (Ccb->ReadDataAvailable > Length)
521 {
522 memmove(Ccb->Data, (PVOID)((ULONG_PTR)Ccb->Data + Length),
523 Ccb->ReadDataAvailable - Length);
524 Ccb->ReadDataAvailable -= Length;
525 Status = STATUS_MORE_ENTRIES;
526 }
527 else
528 {
529 KeResetEvent(&Ccb->ReadEvent);
530 if (Ccb->PipeState == FILE_PIPE_CONNECTED_STATE)
531 {
532 KeSetEvent(&Ccb->OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE);
533 }
534 Ccb->ReadDataAvailable = 0;
535 Ccb->WriteQuotaAvailable = Ccb->MaxDataLength;
536 }
537 }
538
539 if (Information > 0)
540 {
541 break;
542 }
543 }
544 }
545 Irp->IoStatus.Information = Information;
546 Irp->IoStatus.Status = Status;
547
548 ASSERT(IoGetCurrentIrpStackLocation(Irp)->FileObject != NULL);
549
550 if (IoIsOperationSynchronous(Irp))
551 {
552 RemoveEntryList(&Context->ListEntry);
553 if (!IsListEmpty(&Ccb->ReadRequestListHead))
554 {
555 Context = CONTAINING_RECORD(Ccb->ReadRequestListHead.Flink, NPFS_CONTEXT, ListEntry);
556 KeSetEvent(Context->WaitEvent, IO_NO_INCREMENT, FALSE);
557 }
558 ExReleaseFastMutex(&Ccb->DataListLock);
559 IoCompleteRequest(Irp, IO_NO_INCREMENT);
560
561 DPRINT("NpfsRead done (Status %lx)\n", Status);
562 return Status;
563 }
564 else
565 {
566 if (IsOriginalRequest)
567 {
568 IsOriginalRequest = FALSE;
569 OriginalStatus = Status;
570 }
571 if (Status == STATUS_PENDING)
572 {
573 ExReleaseFastMutex(&Ccb->DataListLock);
574 DPRINT("NpfsRead done (Status %lx)\n", OriginalStatus);
575 return OriginalStatus;
576 }
577 RemoveEntryList(&Context->ListEntry);
578 IoCompleteRequest(Irp, IO_NO_INCREMENT);
579 if (IsListEmpty(&Ccb->ReadRequestListHead))
580 {
581 ExReleaseFastMutex(&Ccb->DataListLock);
582 DPRINT("NpfsRead done (Status %lx)\n", OriginalStatus);
583 return OriginalStatus;
584 }
585 Context = CONTAINING_RECORD(Ccb->ReadRequestListHead.Flink, NPFS_CONTEXT, ListEntry);
586 Irp = CONTAINING_RECORD(Context, IRP, Tail.Overlay.DriverContext);
587 }
588 }
589
590 done:
591 Irp->IoStatus.Status = Status;
592
593 if (Status != STATUS_PENDING)
594 {
595 IoCompleteRequest(Irp, IO_NO_INCREMENT);
596 }
597 DPRINT("NpfsRead done (Status %lx)\n", Status);
598
599 return Status;
600 }
601
602 NTSTATUS NTAPI
603 NpfsWrite(PDEVICE_OBJECT DeviceObject,
604 PIRP Irp)
605 {
606 PIO_STACK_LOCATION IoStack;
607 PFILE_OBJECT FileObject;
608 PNPFS_FCB Fcb = NULL;
609 PNPFS_CCB Ccb = NULL;
610 PNPFS_CCB ReaderCcb;
611 PUCHAR Buffer;
612 NTSTATUS Status = STATUS_SUCCESS;
613 ULONG Length;
614 ULONG Offset;
615 ULONG Information;
616 ULONG CopyLength;
617 ULONG TempLength;
618
619 DPRINT("NpfsWrite()\n");
620
621 IoStack = IoGetCurrentIrpStackLocation(Irp);
622 FileObject = IoStack->FileObject;
623 DPRINT("FileObject %p\n", FileObject);
624 DPRINT("Pipe name %wZ\n", &FileObject->FileName);
625
626 Ccb = FileObject->FsContext2;
627 ReaderCcb = Ccb->OtherSide;
628 Fcb = Ccb->Fcb;
629
630 Length = IoStack->Parameters.Write.Length;
631 Offset = IoStack->Parameters.Write.ByteOffset.u.LowPart;
632 Information = 0;
633
634 if (Irp->MdlAddress == NULL)
635 {
636 DPRINT("Irp->MdlAddress == NULL\n");
637 Status = STATUS_UNSUCCESSFUL;
638 Length = 0;
639 goto done;
640 }
641
642 if (ReaderCcb == NULL)
643 {
644 DPRINT("Pipe is NOT connected!\n");
645 if (Ccb->PipeState == FILE_PIPE_LISTENING_STATE)
646 Status = STATUS_PIPE_LISTENING;
647 else if (Ccb->PipeState == FILE_PIPE_DISCONNECTED_STATE)
648 Status = STATUS_PIPE_DISCONNECTED;
649 else
650 Status = STATUS_UNSUCCESSFUL;
651 Length = 0;
652 goto done;
653 }
654
655 if (ReaderCcb->Data == NULL)
656 {
657 DPRINT("Pipe is NOT writable!\n");
658 Status = STATUS_UNSUCCESSFUL;
659 Length = 0;
660 goto done;
661 }
662
663 Status = STATUS_SUCCESS;
664 Buffer = MmGetSystemAddressForMdl (Irp->MdlAddress);
665
666 ExAcquireFastMutex(&ReaderCcb->DataListLock);
667 #ifndef NDEBUG
668 DPRINT("Length %d Buffer %x Offset %x\n",Length,Buffer,Offset);
669 HexDump(Buffer, Length);
670 #endif
671
672 while(1)
673 {
674 if (ReaderCcb->WriteQuotaAvailable == 0)
675 {
676 KeSetEvent(&ReaderCcb->ReadEvent, IO_NO_INCREMENT, FALSE);
677 if (Ccb->PipeState != FILE_PIPE_CONNECTED_STATE)
678 {
679 Status = STATUS_PIPE_BROKEN;
680 ExReleaseFastMutex(&ReaderCcb->DataListLock);
681 goto done;
682 }
683 ExReleaseFastMutex(&ReaderCcb->DataListLock);
684
685 DPRINT("Waiting for buffer space (%S)\n", Fcb->PipeName.Buffer);
686 Status = KeWaitForSingleObject(&Ccb->WriteEvent,
687 UserRequest,
688 KernelMode,
689 FALSE,
690 NULL);
691 DPRINT("Finished waiting (%S)! Status: %x\n", Fcb->PipeName.Buffer, Status);
692
693 /*
694 * It's possible that the event was signaled because the
695 * other side of pipe was closed.
696 */
697 if (Ccb->PipeState != FILE_PIPE_CONNECTED_STATE)
698 {
699 DPRINT("PipeState: %x\n", Ccb->PipeState);
700 Status = STATUS_PIPE_BROKEN;
701 // ExReleaseFastMutex(&ReaderCcb->DataListLock);
702 goto done;
703 }
704
705 ExAcquireFastMutex(&ReaderCcb->DataListLock);
706 }
707
708 if (Fcb->WriteMode == FILE_PIPE_BYTE_STREAM_MODE)
709 {
710 DPRINT("Byte stream mode\n");
711 while (Length > 0 && ReaderCcb->WriteQuotaAvailable > 0)
712 {
713 CopyLength = min(Length, ReaderCcb->WriteQuotaAvailable);
714 if ((ULONG_PTR)ReaderCcb->WritePtr + CopyLength <= (ULONG_PTR)ReaderCcb->Data + ReaderCcb->MaxDataLength)
715 {
716 memcpy(ReaderCcb->WritePtr, Buffer, CopyLength);
717 ReaderCcb->WritePtr = (PVOID)((ULONG_PTR)ReaderCcb->WritePtr + CopyLength);
718 if ((ULONG_PTR)ReaderCcb->WritePtr == (ULONG_PTR)ReaderCcb->Data + ReaderCcb->MaxDataLength)
719 {
720 ReaderCcb->WritePtr = ReaderCcb->Data;
721 }
722 }
723 else
724 {
725 TempLength = (ULONG)((ULONG_PTR)ReaderCcb->Data + ReaderCcb->MaxDataLength - (ULONG_PTR)ReaderCcb->WritePtr);
726 memcpy(ReaderCcb->WritePtr, Buffer, TempLength);
727 memcpy(ReaderCcb->Data, Buffer + TempLength, CopyLength - TempLength);
728 ReaderCcb->WritePtr = (PVOID)((ULONG_PTR)ReaderCcb->Data + CopyLength - TempLength);
729 }
730
731 Buffer += CopyLength;
732 Length -= CopyLength;
733 Information += CopyLength;
734
735 ReaderCcb->ReadDataAvailable += CopyLength;
736 ReaderCcb->WriteQuotaAvailable -= CopyLength;
737 }
738
739 if (Length == 0)
740 {
741 KeSetEvent(&ReaderCcb->ReadEvent, IO_NO_INCREMENT, FALSE);
742 KeResetEvent(&Ccb->WriteEvent);
743 break;
744 }
745 }
746 else
747 {
748 DPRINT("Message mode\n");
749 if (Length > 0)
750 {
751 CopyLength = min(Length, ReaderCcb->WriteQuotaAvailable);
752 memcpy(ReaderCcb->Data, Buffer, CopyLength);
753
754 Information = CopyLength;
755 ReaderCcb->ReadDataAvailable = CopyLength;
756 ReaderCcb->WriteQuotaAvailable = 0;
757 }
758
759 if (Information > 0)
760 {
761 KeSetEvent(&ReaderCcb->ReadEvent, IO_NO_INCREMENT, FALSE);
762 KeResetEvent(&Ccb->WriteEvent);
763 break;
764 }
765 }
766 }
767
768 ExReleaseFastMutex(&ReaderCcb->DataListLock);
769
770 done:
771 Irp->IoStatus.Status = Status;
772 Irp->IoStatus.Information = Information;
773
774 IoCompleteRequest(Irp, IO_NO_INCREMENT);
775
776 DPRINT("NpfsWrite done (Status %lx)\n", Status);
777
778 return Status;
779 }
780
781 /* EOF */