Create a branch for audio work
[reactos.git] / drivers / filesystems / npfs / rw.c
1 /*
2 * COPYRIGHT: See COPYING in the top level directory
3 * PROJECT: ReactOS kernel
4 * FILE: drivers/fs/np/rw.c
5 * PURPOSE: Named pipe filesystem
6 * PROGRAMMER: David Welch <welch@cwcom.net>
7 * Michael Martin
8 */
9
10 /* INCLUDES ******************************************************************/
11
12 #include "npfs.h"
13
14 #define NDEBUG
15 #include <debug.h>
16
17 /* FUNCTIONS *****************************************************************/
18
19 VOID HexDump(PUCHAR Buffer, ULONG Length)
20 {
21 CHAR Line[65];
22 UCHAR ch;
23 const char Hex[] = "0123456789ABCDEF";
24 int i, j;
25
26 DbgPrint("---------------\n");
27
28 for (i = 0; i < Length; i+= 16)
29 {
30 memset(Line, ' ', 64);
31 Line[64] = 0;
32
33 for (j = 0; j < 16 && j + i < Length; j++)
34 {
35 ch = Buffer[i + j];
36 Line[3*j + 0] = Hex[ch >> 4];
37 Line[3*j + 1] = Hex[ch & 0x0f];
38 Line[48 + j] = isprint(ch) ? ch : '.';
39 }
40 DbgPrint("%s\n", Line);
41 }
42 DbgPrint("---------------\n");
43 }
44
45 static DRIVER_CANCEL NpfsReadWriteCancelRoutine;
46 static VOID NTAPI
47 NpfsReadWriteCancelRoutine(IN PDEVICE_OBJECT DeviceObject,
48 IN PIRP Irp)
49 {
50 PNPFS_CONTEXT Context;
51 PNPFS_DEVICE_EXTENSION DeviceExt;
52 PIO_STACK_LOCATION IoStack;
53 PNPFS_CCB Ccb;
54 PLIST_ENTRY ListEntry;
55 PNPFS_THREAD_CONTEXT ThreadContext;
56 ULONG i;
57
58 DPRINT("NpfsReadWriteCancelRoutine(DeviceObject %p, Irp %p)\n", DeviceObject, Irp);
59
60 IoReleaseCancelSpinLock(Irp->CancelIrql);
61
62 Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
63 DeviceExt = (PNPFS_DEVICE_EXTENSION)DeviceObject->DeviceExtension;
64 IoStack = IoGetCurrentIrpStackLocation(Irp);
65 Ccb = IoStack->FileObject->FsContext2;
66
67 KeLockMutex(&DeviceExt->PipeListLock);
68 ExAcquireFastMutex(&Ccb->DataListLock);
69 switch(IoStack->MajorFunction)
70 {
71 case IRP_MJ_READ:
72 ListEntry = DeviceExt->ThreadListHead.Flink;
73 while (ListEntry != &DeviceExt->ThreadListHead)
74 {
75 ThreadContext = CONTAINING_RECORD(ListEntry, NPFS_THREAD_CONTEXT, ListEntry);
76 /* Real events start at index 1 */
77 for (i = 1; i < ThreadContext->Count; i++)
78 {
79 if (ThreadContext->WaitIrpArray[i] == Irp)
80 {
81 ASSERT(ThreadContext->WaitObjectArray[i] == Context->WaitEvent);
82
83 ThreadContext->WaitIrpArray[i] = NULL;
84
85 RemoveEntryList(&Context->ListEntry);
86
87 Irp->IoStatus.Status = STATUS_CANCELLED;
88 Irp->IoStatus.Information = 0;
89
90 IoCompleteRequest(Irp, IO_NO_INCREMENT);
91
92 KeSetEvent(&ThreadContext->Event, IO_NO_INCREMENT, FALSE);
93
94 ExReleaseFastMutex(&Ccb->DataListLock);
95 KeUnlockMutex(&DeviceExt->PipeListLock);
96
97 return;
98 }
99 }
100 ListEntry = ListEntry->Flink;
101 }
102
103 RemoveEntryList(&Context->ListEntry);
104
105 ExReleaseFastMutex(&Ccb->DataListLock);
106 KeUnlockMutex(&DeviceExt->PipeListLock);
107
108 Irp->IoStatus.Status = STATUS_CANCELLED;
109 Irp->IoStatus.Information = 0;
110
111 IoCompleteRequest(Irp, IO_NO_INCREMENT);
112 break;
113 default:
114 ASSERT(FALSE);
115 }
116 }
117
118 static VOID NTAPI
119 NpfsWaiterThread(PVOID InitContext)
120 {
121 PNPFS_THREAD_CONTEXT ThreadContext = (PNPFS_THREAD_CONTEXT) InitContext;
122 ULONG CurrentCount;
123 ULONG Count = 0, i;
124 PIRP Irp = NULL;
125 NTSTATUS Status;
126 PIO_STACK_LOCATION IoStack = NULL;
127 KIRQL OldIrql;
128
129 KeLockMutex(&ThreadContext->DeviceExt->PipeListLock);
130
131 while (1)
132 {
133 CurrentCount = ThreadContext->Count;
134 KeUnlockMutex(&ThreadContext->DeviceExt->PipeListLock);
135 IoAcquireCancelSpinLock(&OldIrql);
136 if (Irp && IoSetCancelRoutine(Irp, NULL) != NULL)
137 {
138 IoReleaseCancelSpinLock(OldIrql);
139 IoStack = IoGetCurrentIrpStackLocation(Irp);
140 switch (IoStack->MajorFunction)
141 {
142 case IRP_MJ_READ:
143 NpfsRead(IoStack->DeviceObject, Irp);
144 break;
145 default:
146 ASSERT(FALSE);
147 }
148 }
149 else
150 {
151 IoReleaseCancelSpinLock(OldIrql);
152 }
153 Status = KeWaitForMultipleObjects(CurrentCount,
154 ThreadContext->WaitObjectArray,
155 WaitAny,
156 Executive,
157 KernelMode,
158 FALSE,
159 NULL,
160 ThreadContext->WaitBlockArray);
161 if (!NT_SUCCESS(Status))
162 {
163 ASSERT(FALSE);
164 }
165 KeLockMutex(&ThreadContext->DeviceExt->PipeListLock);
166 Count = Status - STATUS_WAIT_0;
167 ASSERT (Count < CurrentCount);
168 if (Count > 0)
169 {
170 Irp = ThreadContext->WaitIrpArray[Count];
171 ThreadContext->Count--;
172 ThreadContext->DeviceExt->EmptyWaiterCount++;
173 ThreadContext->WaitObjectArray[Count] = ThreadContext->WaitObjectArray[ThreadContext->Count];
174 ThreadContext->WaitIrpArray[Count] = ThreadContext->WaitIrpArray[ThreadContext->Count];
175 }
176 else
177 {
178 /* someone has add a new wait request or cancelled an old one */
179 Irp = NULL;
180
181 /* Look for cancelled requests */
182 for (i = 1; i < ThreadContext->Count; i++)
183 {
184 if (ThreadContext->WaitIrpArray[i] == NULL)
185 {
186 ThreadContext->Count--;
187 ThreadContext->DeviceExt->EmptyWaiterCount++;
188 ThreadContext->WaitObjectArray[i] = ThreadContext->WaitObjectArray[ThreadContext->Count];
189 ThreadContext->WaitIrpArray[i] = ThreadContext->WaitIrpArray[ThreadContext->Count];
190 }
191 }
192 }
193 if (ThreadContext->Count == 1 && ThreadContext->DeviceExt->EmptyWaiterCount >= MAXIMUM_WAIT_OBJECTS)
194 {
195 /* it exist an other thread with empty wait slots, we can remove our thread from the list */
196 RemoveEntryList(&ThreadContext->ListEntry);
197 ThreadContext->DeviceExt->EmptyWaiterCount -= MAXIMUM_WAIT_OBJECTS - 1;
198 KeUnlockMutex(&ThreadContext->DeviceExt->PipeListLock);
199 break;
200 }
201 }
202 ExFreePool(ThreadContext);
203 }
204
205 static NTSTATUS
206 NpfsAddWaitingReadWriteRequest(IN PDEVICE_OBJECT DeviceObject,
207 IN PIRP Irp)
208 {
209 PLIST_ENTRY ListEntry;
210 PNPFS_THREAD_CONTEXT ThreadContext = NULL;
211 NTSTATUS Status;
212 HANDLE hThread;
213 KIRQL oldIrql;
214
215 PNPFS_CONTEXT Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
216 PNPFS_DEVICE_EXTENSION DeviceExt = (PNPFS_DEVICE_EXTENSION)DeviceObject->DeviceExtension;
217
218 DPRINT("NpfsAddWaitingReadWriteRequest(DeviceObject %p, Irp %p)\n", DeviceObject, Irp);
219
220 KeLockMutex(&DeviceExt->PipeListLock);
221
222 ListEntry = DeviceExt->ThreadListHead.Flink;
223 while (ListEntry != &DeviceExt->ThreadListHead)
224 {
225 ThreadContext = CONTAINING_RECORD(ListEntry, NPFS_THREAD_CONTEXT, ListEntry);
226 if (ThreadContext->Count < MAXIMUM_WAIT_OBJECTS)
227 {
228 break;
229 }
230 ListEntry = ListEntry->Flink;
231 }
232 if (ListEntry == &DeviceExt->ThreadListHead)
233 {
234 ThreadContext = ExAllocatePool(NonPagedPool, sizeof(NPFS_THREAD_CONTEXT));
235 if (ThreadContext == NULL)
236 {
237 KeUnlockMutex(&DeviceExt->PipeListLock);
238 return STATUS_NO_MEMORY;
239 }
240 ThreadContext->DeviceExt = DeviceExt;
241 KeInitializeEvent(&ThreadContext->Event, SynchronizationEvent, FALSE);
242 ThreadContext->Count = 1;
243 ThreadContext->WaitObjectArray[0] = &ThreadContext->Event;
244
245
246 DPRINT("Creating a new system thread for waiting read/write requests\n");
247
248 Status = PsCreateSystemThread(&hThread,
249 THREAD_ALL_ACCESS,
250 NULL,
251 NULL,
252 NULL,
253 NpfsWaiterThread,
254 (PVOID)ThreadContext);
255 if (!NT_SUCCESS(Status))
256 {
257 ExFreePool(ThreadContext);
258 KeUnlockMutex(&DeviceExt->PipeListLock);
259 return Status;
260 }
261 InsertHeadList(&DeviceExt->ThreadListHead, &ThreadContext->ListEntry);
262 DeviceExt->EmptyWaiterCount += MAXIMUM_WAIT_OBJECTS - 1;
263 }
264 IoMarkIrpPending(Irp);
265
266 IoAcquireCancelSpinLock(&oldIrql);
267 if (Irp->Cancel)
268 {
269 IoReleaseCancelSpinLock(oldIrql);
270 Status = STATUS_CANCELLED;
271 }
272 else
273 {
274 (void)IoSetCancelRoutine(Irp, NpfsReadWriteCancelRoutine);
275 IoReleaseCancelSpinLock(oldIrql);
276 ThreadContext->WaitObjectArray[ThreadContext->Count] = Context->WaitEvent;
277 ThreadContext->WaitIrpArray[ThreadContext->Count] = Irp;
278 ThreadContext->Count++;
279 DeviceExt->EmptyWaiterCount--;
280 KeSetEvent(&ThreadContext->Event, IO_NO_INCREMENT, FALSE);
281 Status = STATUS_SUCCESS;
282 }
283 KeUnlockMutex(&DeviceExt->PipeListLock);
284 return Status;
285 }
286
287 NTSTATUS NTAPI
288 NpfsRead(IN PDEVICE_OBJECT DeviceObject,
289 IN PIRP Irp)
290 {
291 PFILE_OBJECT FileObject;
292 NTSTATUS Status;
293 NTSTATUS OriginalStatus = STATUS_SUCCESS;
294 PNPFS_CCB Ccb;
295 PNPFS_CONTEXT Context;
296 KEVENT Event;
297 ULONG Length;
298 ULONG Information = 0;
299 ULONG CopyLength = 0;
300 ULONG TempLength;
301 BOOLEAN IsOriginalRequest = TRUE;
302 PVOID Buffer;
303
304 DPRINT("NpfsRead(DeviceObject %p, Irp %p)\n", DeviceObject, Irp);
305
306 if (Irp->MdlAddress == NULL)
307 {
308 DPRINT("Irp->MdlAddress == NULL\n");
309 Status = STATUS_UNSUCCESSFUL;
310 Irp->IoStatus.Information = 0;
311 goto done;
312 }
313
314 FileObject = IoGetCurrentIrpStackLocation(Irp)->FileObject;
315 DPRINT("FileObject %p\n", FileObject);
316 DPRINT("Pipe name %wZ\n", &FileObject->FileName);
317 Ccb = FileObject->FsContext2;
318 Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
319
320 if ((Ccb->OtherSide) && (Ccb->OtherSide->PipeState == FILE_PIPE_DISCONNECTED_STATE) && (Ccb->PipeState == FILE_PIPE_DISCONNECTED_STATE))
321 {
322 DPRINT("Both Client and Server are disconnected!\n");
323 Status = STATUS_PIPE_DISCONNECTED;
324 Irp->IoStatus.Information = 0;
325 goto done;
326
327 }
328
329 if ((Ccb->OtherSide == NULL) && (Ccb->ReadDataAvailable == 0))
330 {
331 if (Ccb->PipeState == FILE_PIPE_CONNECTED_STATE)
332 Status = STATUS_PIPE_BROKEN;
333 else if (Ccb->PipeState == FILE_PIPE_LISTENING_STATE)
334 Status = STATUS_PIPE_LISTENING;
335 else if (Ccb->PipeState == FILE_PIPE_DISCONNECTED_STATE)
336 Status = STATUS_PIPE_DISCONNECTED;
337 else
338 Status = STATUS_UNSUCCESSFUL;
339 Irp->IoStatus.Information = 0;
340 goto done;
341 }
342
343 if (Ccb->Data == NULL)
344 {
345 DPRINT("Pipe is NOT readable!\n");
346 Status = STATUS_UNSUCCESSFUL;
347 Irp->IoStatus.Information = 0;
348 goto done;
349 }
350
351 ExAcquireFastMutex(&Ccb->DataListLock);
352
353 if (IoIsOperationSynchronous(Irp))
354 {
355 InsertTailList(&Ccb->ReadRequestListHead, &Context->ListEntry);
356 if (Ccb->ReadRequestListHead.Flink != &Context->ListEntry)
357 {
358 KeInitializeEvent(&Event, SynchronizationEvent, FALSE);
359 Context->WaitEvent = &Event;
360 ExReleaseFastMutex(&Ccb->DataListLock);
361 Status = KeWaitForSingleObject(&Event,
362 Executive,
363 Irp->RequestorMode,
364 FALSE,
365 NULL);
366 if ((Status == STATUS_USER_APC) || (Status == STATUS_KERNEL_APC))
367 {
368 Status = STATUS_CANCELLED;
369 goto done;
370 }
371 if (!NT_SUCCESS(Status))
372 {
373 ASSERT(FALSE);
374 }
375 ExAcquireFastMutex(&Ccb->DataListLock);
376 }
377 Irp->IoStatus.Information = 0;
378 }
379 else
380 {
381 KIRQL oldIrql;
382 if (IsListEmpty(&Ccb->ReadRequestListHead) ||
383 Ccb->ReadRequestListHead.Flink != &Context->ListEntry)
384 {
385 /* this is a new request */
386 Irp->IoStatus.Information = 0;
387 Context->WaitEvent = &Ccb->ReadEvent;
388 InsertTailList(&Ccb->ReadRequestListHead, &Context->ListEntry);
389 if (Ccb->ReadRequestListHead.Flink != &Context->ListEntry)
390 {
391 /* there was already a request on the list */
392 IoAcquireCancelSpinLock(&oldIrql);
393 if (Irp->Cancel)
394 {
395 IoReleaseCancelSpinLock(oldIrql);
396 RemoveEntryList(&Context->ListEntry);
397 ExReleaseFastMutex(&Ccb->DataListLock);
398 Status = STATUS_CANCELLED;
399 goto done;
400 }
401 (void)IoSetCancelRoutine(Irp, NpfsReadWriteCancelRoutine);
402 IoReleaseCancelSpinLock(oldIrql);
403 ExReleaseFastMutex(&Ccb->DataListLock);
404 IoMarkIrpPending(Irp);
405 Status = STATUS_PENDING;
406 goto done;
407 }
408 }
409 }
410
411 while (1)
412 {
413 Buffer = MmGetSystemAddressForMdl(Irp->MdlAddress);
414 Information = Irp->IoStatus.Information;
415 Length = IoGetCurrentIrpStackLocation(Irp)->Parameters.Read.Length;
416 ASSERT (Information <= Length);
417 Buffer = (PVOID)((ULONG_PTR)Buffer + Information);
418 Length -= Information;
419 Status = STATUS_SUCCESS;
420
421 while (1)
422 {
423 if (Ccb->ReadDataAvailable == 0)
424 {
425 ULONG ConnectionSideReadMode;
426
427 if (Ccb->PipeEnd == FILE_PIPE_CLIENT_END) ConnectionSideReadMode=Ccb->Fcb->ClientReadMode;
428 else ConnectionSideReadMode = Ccb->Fcb->ServerReadMode;
429
430 if ((Ccb->PipeState == FILE_PIPE_CONNECTED_STATE) && (Ccb->OtherSide))
431 {
432 KeSetEvent(&Ccb->OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE);
433 }
434 if (Information > 0 &&
435 (ConnectionSideReadMode != FILE_PIPE_BYTE_STREAM_MODE ||
436 Ccb->PipeState != FILE_PIPE_CONNECTED_STATE))
437 {
438 break;
439 }
440 if (((Ccb->PipeState != FILE_PIPE_CONNECTED_STATE) || (!Ccb->OtherSide)) && (Ccb->ReadDataAvailable == 0))
441 {
442 DPRINT("PipeState: %x\n", Ccb->PipeState);
443 Status = STATUS_PIPE_BROKEN;
444 break;
445 }
446 ExReleaseFastMutex(&Ccb->DataListLock);
447
448 if (IoIsOperationSynchronous(Irp))
449 {
450 /* Wait for ReadEvent to become signaled */
451
452 DPRINT("Waiting for readable data (%wZ)\n", &Ccb->Fcb->PipeName);
453 Status = KeWaitForSingleObject(&Ccb->ReadEvent,
454 UserRequest,
455 Irp->RequestorMode,
456 FALSE,
457 NULL);
458 DPRINT("Finished waiting (%wZ)! Status: %x\n", &Ccb->Fcb->PipeName, Status);
459
460 if ((Status == STATUS_USER_APC) || (Status == STATUS_KERNEL_APC))
461 {
462 Status = STATUS_CANCELLED;
463 break;
464 }
465 if (!NT_SUCCESS(Status))
466 {
467 ASSERT(FALSE);
468 }
469 ExAcquireFastMutex(&Ccb->DataListLock);
470 }
471 else
472 {
473 Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
474
475 Context->WaitEvent = &Ccb->ReadEvent;
476 Status = NpfsAddWaitingReadWriteRequest(DeviceObject, Irp);
477
478 if (NT_SUCCESS(Status))
479 {
480 Status = STATUS_PENDING;
481 goto done;
482 }
483 ExAcquireFastMutex(&Ccb->DataListLock);
484 break;
485 }
486 }
487 ASSERT(IoGetCurrentIrpStackLocation(Irp)->FileObject != NULL);
488
489 /* If the pipe type and read mode are both byte stream */
490 if (Ccb->Fcb->PipeType == FILE_PIPE_BYTE_STREAM_TYPE)
491 {
492 DPRINT("Byte stream mode: Ccb->Data %x\n", Ccb->Data);
493 /* Byte stream mode */
494 while (Length > 0 && Ccb->ReadDataAvailable > 0)
495 {
496 CopyLength = min(Ccb->ReadDataAvailable, Length);
497 if ((ULONG_PTR)Ccb->ReadPtr + CopyLength <= (ULONG_PTR)Ccb->Data + Ccb->MaxDataLength)
498 {
499 memcpy(Buffer, Ccb->ReadPtr, CopyLength);
500 Ccb->ReadPtr = (PVOID)((ULONG_PTR)Ccb->ReadPtr + CopyLength);
501 if (Ccb->ReadPtr == (PVOID)((ULONG_PTR)Ccb->Data + Ccb->MaxDataLength))
502 {
503 Ccb->ReadPtr = Ccb->Data;
504 }
505 }
506 else
507 {
508 TempLength = (ULONG)((ULONG_PTR)Ccb->Data + Ccb->MaxDataLength - (ULONG_PTR)Ccb->ReadPtr);
509 memcpy(Buffer, Ccb->ReadPtr, TempLength);
510 memcpy((PVOID)((ULONG_PTR)Buffer + TempLength), Ccb->Data, CopyLength - TempLength);
511 Ccb->ReadPtr = (PVOID)((ULONG_PTR)Ccb->Data + CopyLength - TempLength);
512 }
513
514 Buffer = (PVOID)((ULONG_PTR)Buffer + CopyLength);
515 Length -= CopyLength;
516 Information += CopyLength;
517
518 Ccb->ReadDataAvailable -= CopyLength;
519 Ccb->WriteQuotaAvailable += CopyLength;
520 }
521
522 if ((Length == 0) || (Ccb->ReadDataAvailable == 0))
523 {
524 if ((Ccb->PipeState == FILE_PIPE_CONNECTED_STATE) && (Ccb->OtherSide))
525 {
526 KeSetEvent(&Ccb->OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE);
527 }
528 KeResetEvent(&Ccb->ReadEvent);
529 break;
530 }
531 }
532 else if (Ccb->Fcb->PipeType == FILE_PIPE_MESSAGE_TYPE)
533 {
534 DPRINT("Message mode: Ccb>Data %x\n", Ccb->Data);
535
536 /* Check if buffer is full and the read pointer is not at the start of the buffer */
537 if ((Ccb->WriteQuotaAvailable == 0) && (Ccb->ReadPtr > Ccb->Data))
538 {
539 Ccb->WriteQuotaAvailable += (ULONG_PTR)Ccb->ReadPtr - (ULONG_PTR)Ccb->Data;
540 memcpy(Ccb->Data, Ccb->ReadPtr, (ULONG_PTR)Ccb->WritePtr - (ULONG_PTR)Ccb->ReadPtr);
541 Ccb->WritePtr = (PVOID)((ULONG_PTR)Ccb->WritePtr - ((ULONG_PTR)Ccb->ReadPtr - (ULONG_PTR)Ccb->Data));
542 Ccb->ReadPtr = Ccb->Data;
543 ASSERT((ULONG_PTR)Ccb->WritePtr < ((ULONG_PTR)Ccb->Data + Ccb->MaxDataLength));
544 ASSERT(Ccb->WritePtr >= Ccb->Data);
545 }
546
547 /* For Message mode, the Message length is stored in the buffer preceeding the Message. */
548 if (Ccb->ReadDataAvailable)
549 {
550 ULONG NextMessageLength = 0;
551
552 /*First get the size of the message */
553 memcpy(&NextMessageLength, Ccb->ReadPtr, sizeof(NextMessageLength));
554
555 if ((NextMessageLength == 0) || (NextMessageLength > Ccb->ReadDataAvailable))
556 {
557 DPRINT1("Possible memory corruption.\n");
558 HexDump(Ccb->Data, (ULONG_PTR)Ccb->WritePtr - (ULONG_PTR)Ccb->Data);
559 ASSERT(FALSE);
560 }
561
562 /* Use the smaller value */
563 CopyLength = min(NextMessageLength, Length);
564 ASSERT(CopyLength > 0);
565 ASSERT(CopyLength <= Ccb->ReadDataAvailable);
566 /* retrieve the message from the buffer */
567 memcpy(Buffer, (PVOID)((ULONG_PTR)Ccb->ReadPtr + sizeof(NextMessageLength)), CopyLength);
568
569 if (Ccb->ReadDataAvailable > CopyLength)
570 {
571 if (CopyLength < NextMessageLength)
572 /* Client only requested part of the message */
573 {
574 /* Calculate the remaining message new size */
575 ULONG NewMessageSize = NextMessageLength-CopyLength;
576
577 /* Update ReadPtr to point to new Message size location */
578 Ccb->ReadPtr = (PVOID)((ULONG_PTR)Ccb->ReadPtr + CopyLength);
579
580 /* Write a new Message size to buffer for the part of the message still there */
581 memcpy(Ccb->ReadPtr, &NewMessageSize, sizeof(NewMessageSize));
582 }
583 else
584 /* Client wanted the entire message */
585 {
586 /* Update ReadPtr to point to next message size */
587 Ccb->ReadPtr = (PVOID)((ULONG_PTR)Ccb->ReadPtr + CopyLength + sizeof(CopyLength));
588 }
589 }
590 else
591 {
592 /* This was the last Message, so just zero start of buffer for safety sake */
593 memset(Ccb->Data, 0, NextMessageLength + sizeof(NextMessageLength));
594
595 /* Reset to MaxDataLength as partial message retrievals dont
596 give the length back to Quota */
597 Ccb->WriteQuotaAvailable = Ccb->MaxDataLength;
598
599 /* reset read and write pointer to beginning of buffer */
600 Ccb->WritePtr = Ccb->Data;
601 Ccb->ReadPtr = Ccb->Data;
602 }
603 #ifndef NDEBUG
604 DPRINT("Length %d Buffer %x\n",CopyLength,Buffer);
605 HexDump((PUCHAR)Buffer, CopyLength);
606 #endif
607
608 Information += CopyLength;
609
610 Ccb->ReadDataAvailable -= CopyLength;
611
612 if ((ULONG)Ccb->WriteQuotaAvailable > (ULONG)Ccb->MaxDataLength) ASSERT(FALSE);
613 }
614
615 if (Information > 0)
616 {
617 ULONG ConnectionSideReadMode;
618
619 if (Ccb->PipeEnd == FILE_PIPE_CLIENT_END) ConnectionSideReadMode=Ccb->Fcb->ClientReadMode;
620 else ConnectionSideReadMode = Ccb->Fcb->ServerReadMode;
621
622 if ((ConnectionSideReadMode == FILE_PIPE_BYTE_STREAM_MODE) && (Ccb->ReadDataAvailable) && (Length > CopyLength))
623 {
624 Buffer = (PVOID)((ULONG_PTR)Buffer + CopyLength);
625 Length -= CopyLength;
626 }
627 else
628 {
629 KeResetEvent(&Ccb->ReadEvent);
630
631 if ((Ccb->PipeState == FILE_PIPE_CONNECTED_STATE) && (Ccb->WriteQuotaAvailable > 0) && (Ccb->OtherSide))
632 {
633 KeSetEvent(&Ccb->OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE);
634 }
635 break;
636 }
637 }
638 }
639 else
640 {
641 DPRINT1("Unhandled Pipe Mode!\n");
642 ASSERT(FALSE);
643 }
644 }
645 Irp->IoStatus.Information = Information;
646 Irp->IoStatus.Status = Status;
647
648 ASSERT(IoGetCurrentIrpStackLocation(Irp)->FileObject != NULL);
649
650 if (Status == STATUS_CANCELLED)
651 goto done;
652
653 if (IoIsOperationSynchronous(Irp))
654 {
655 RemoveEntryList(&Context->ListEntry);
656 if (!IsListEmpty(&Ccb->ReadRequestListHead))
657 {
658 Context = CONTAINING_RECORD(Ccb->ReadRequestListHead.Flink, NPFS_CONTEXT, ListEntry);
659 KeSetEvent(Context->WaitEvent, IO_NO_INCREMENT, FALSE);
660 }
661 ExReleaseFastMutex(&Ccb->DataListLock);
662 IoCompleteRequest(Irp, IO_NO_INCREMENT);
663
664 DPRINT("NpfsRead done (Status %lx)\n", Status);
665 return Status;
666 }
667 else
668 {
669 KIRQL oldIrql;
670
671 if (IsOriginalRequest)
672 {
673 IsOriginalRequest = FALSE;
674 OriginalStatus = Status;
675 }
676 if (Status == STATUS_PENDING)
677 {
678 ExReleaseFastMutex(&Ccb->DataListLock);
679 DPRINT("NpfsRead done (Status %lx)\n", OriginalStatus);
680 return OriginalStatus;
681 }
682 RemoveEntryList(&Context->ListEntry);
683 IoCompleteRequest(Irp, IO_NO_INCREMENT);
684 if (IsListEmpty(&Ccb->ReadRequestListHead))
685 {
686 ExReleaseFastMutex(&Ccb->DataListLock);
687 DPRINT("NpfsRead done (Status %lx)\n", OriginalStatus);
688 return OriginalStatus;
689 }
690
691 IoAcquireCancelSpinLock(&oldIrql);
692 Context = CONTAINING_RECORD(Ccb->ReadRequestListHead.Flink, NPFS_CONTEXT, ListEntry);
693
694 Irp = CONTAINING_RECORD(Context, IRP, Tail.Overlay.DriverContext);
695 /* Verify the Irp wasnt cancelled */
696 if (Irp->Cancel)
697 {
698 IoReleaseCancelSpinLock(oldIrql);
699 RemoveEntryList(&Context->ListEntry);
700 ExReleaseFastMutex(&Ccb->DataListLock);
701 Status = STATUS_CANCELLED;
702 goto done;
703 }
704 /* The Irp will now be handled, so remove the CancelRoutine */
705 (void)IoSetCancelRoutine(Irp, NULL);
706 IoReleaseCancelSpinLock(oldIrql);
707 }
708 }
709
710 done:
711 Irp->IoStatus.Status = Status;
712
713 if (Status != STATUS_PENDING)
714 {
715 IoCompleteRequest(Irp, IO_NO_INCREMENT);
716 }
717 DPRINT("NpfsRead done (Status %lx)\n", Status);
718
719 return Status;
720 }
721
722 NTSTATUS NTAPI
723 NpfsWrite(PDEVICE_OBJECT DeviceObject,
724 PIRP Irp)
725 {
726 PIO_STACK_LOCATION IoStack;
727 PFILE_OBJECT FileObject;
728 PNPFS_FCB Fcb = NULL;
729 PNPFS_CCB Ccb = NULL;
730 PNPFS_CCB ReaderCcb;
731 PUCHAR Buffer;
732 NTSTATUS Status = STATUS_SUCCESS;
733 ULONG Length;
734 ULONG Offset;
735 ULONG Information;
736 ULONG CopyLength;
737 ULONG TempLength;
738
739 DPRINT("NpfsWrite()\n");
740
741 IoStack = IoGetCurrentIrpStackLocation(Irp);
742 FileObject = IoStack->FileObject;
743 DPRINT("FileObject %p\n", FileObject);
744 DPRINT("Pipe name %wZ\n", &FileObject->FileName);
745
746 Ccb = FileObject->FsContext2;
747 ReaderCcb = Ccb->OtherSide;
748 Fcb = Ccb->Fcb;
749
750 Length = IoStack->Parameters.Write.Length;
751 Offset = IoStack->Parameters.Write.ByteOffset.u.LowPart;
752 Information = 0;
753
754 if (Irp->MdlAddress == NULL)
755 {
756 DPRINT("Irp->MdlAddress == NULL\n");
757 Status = STATUS_UNSUCCESSFUL;
758 Length = 0;
759 goto done;
760 }
761
762 if ((ReaderCcb == NULL) || (Ccb->PipeState != FILE_PIPE_CONNECTED_STATE))
763 {
764 DPRINT("Pipe is NOT connected!\n");
765 if (Ccb->PipeState == FILE_PIPE_LISTENING_STATE)
766 Status = STATUS_PIPE_LISTENING;
767 else if (Ccb->PipeState == FILE_PIPE_DISCONNECTED_STATE)
768 Status = STATUS_PIPE_DISCONNECTED;
769 else
770 Status = STATUS_UNSUCCESSFUL;
771 Length = 0;
772 goto done;
773 }
774
775 if (ReaderCcb->Data == NULL)
776 {
777 DPRINT("Pipe is NOT writable!\n");
778 Status = STATUS_UNSUCCESSFUL;
779 Length = 0;
780 goto done;
781 }
782
783 Status = STATUS_SUCCESS;
784 Buffer = MmGetSystemAddressForMdlSafe (Irp->MdlAddress, NormalPagePriority);
785
786 if (!Buffer)
787 {
788 DPRINT("MmGetSystemAddressForMdlSafe failed\n");
789 Status = STATUS_INSUFFICIENT_RESOURCES;
790 Length = 0;
791 goto done;
792
793 }
794
795 ExAcquireFastMutex(&ReaderCcb->DataListLock);
796
797 #ifndef NDEBUG
798 DPRINT("Length %d Buffer %x Offset %x\n",Length,Buffer,Offset);
799 HexDump(Buffer, Length);
800 #endif
801
802 while(1)
803 {
804 if ((ReaderCcb->WriteQuotaAvailable == 0))
805 {
806 if (Ccb->PipeState != FILE_PIPE_CONNECTED_STATE || !Ccb->OtherSide)
807 {
808 Status = STATUS_PIPE_BROKEN;
809 ExReleaseFastMutex(&ReaderCcb->DataListLock);
810 goto done;
811 }
812 KeSetEvent(&ReaderCcb->ReadEvent, IO_NO_INCREMENT, FALSE);
813 ExReleaseFastMutex(&ReaderCcb->DataListLock);
814
815 DPRINT("Write Waiting for buffer space (%S)\n", Fcb->PipeName.Buffer);
816 Status = KeWaitForSingleObject(&Ccb->WriteEvent,
817 UserRequest,
818 Irp->RequestorMode,
819 FALSE,
820 NULL);
821 DPRINT("Write Finished waiting (%S)! Status: %x\n", Fcb->PipeName.Buffer, Status);
822
823 if ((Status == STATUS_USER_APC) || (Status == STATUS_KERNEL_APC))
824 {
825 Status = STATUS_CANCELLED;
826 goto done;
827 }
828 if (!NT_SUCCESS(Status))
829 {
830 ASSERT(FALSE);
831 }
832 /*
833 * It's possible that the event was signaled because the
834 * other side of pipe was closed.
835 */
836 if (Ccb->PipeState != FILE_PIPE_CONNECTED_STATE || !Ccb->OtherSide)
837 {
838 DPRINT("PipeState: %x\n", Ccb->PipeState);
839 Status = STATUS_PIPE_BROKEN;
840 goto done;
841 }
842 /* Check that the pipe has not been closed */
843 if (ReaderCcb->PipeState != FILE_PIPE_CONNECTED_STATE || !ReaderCcb->OtherSide)
844 {
845 Status = STATUS_PIPE_BROKEN;
846 goto done;
847 }
848 ExAcquireFastMutex(&ReaderCcb->DataListLock);
849 }
850
851 if (Ccb->Fcb->PipeType == FILE_PIPE_BYTE_STREAM_TYPE)
852 {
853 DPRINT("Byte stream mode: Ccb->Data %x, Ccb->WritePtr %x\n", ReaderCcb->Data, ReaderCcb->WritePtr);
854
855 while (Length > 0 && ReaderCcb->WriteQuotaAvailable > 0)
856 {
857 CopyLength = min(Length, ReaderCcb->WriteQuotaAvailable);
858
859 if ((ULONG_PTR)ReaderCcb->WritePtr + CopyLength <= (ULONG_PTR)ReaderCcb->Data + ReaderCcb->MaxDataLength)
860 {
861 memcpy(ReaderCcb->WritePtr, Buffer, CopyLength);
862 ReaderCcb->WritePtr = (PVOID)((ULONG_PTR)ReaderCcb->WritePtr + CopyLength);
863 if ((ULONG_PTR)ReaderCcb->WritePtr == (ULONG_PTR)ReaderCcb->Data + ReaderCcb->MaxDataLength)
864 {
865 ReaderCcb->WritePtr = ReaderCcb->Data;
866 }
867 }
868 else
869 {
870
871 TempLength = (ULONG)((ULONG_PTR)ReaderCcb->Data + ReaderCcb->MaxDataLength -
872 (ULONG_PTR)ReaderCcb->WritePtr);
873
874 memcpy(ReaderCcb->WritePtr, Buffer, TempLength);
875 memcpy(ReaderCcb->Data, Buffer + TempLength, CopyLength - TempLength);
876 ReaderCcb->WritePtr = (PVOID)((ULONG_PTR)ReaderCcb->Data + CopyLength - TempLength);
877 }
878
879 Buffer += CopyLength;
880 Length -= CopyLength;
881 Information += CopyLength;
882
883 ReaderCcb->ReadDataAvailable += CopyLength;
884 ReaderCcb->WriteQuotaAvailable -= CopyLength;
885 }
886
887 if (Length == 0)
888 {
889 KeSetEvent(&ReaderCcb->ReadEvent, IO_NO_INCREMENT, FALSE);
890 KeResetEvent(&Ccb->WriteEvent);
891 break;
892 }
893 }
894 else if (Ccb->Fcb->PipeType == FILE_PIPE_MESSAGE_TYPE)
895 {
896 /* For Message Type Pipe, the Pipes memory will be used to store the size of each message */
897 DPRINT("Message mode: Ccb->Data %x, Ccb->WritePtr %x\n",ReaderCcb->Data, ReaderCcb->WritePtr);
898 if (Length > 0)
899 {
900 /* Verify the WritePtr is still inside the buffer */
901 if (((ULONG_PTR)ReaderCcb->WritePtr > ((ULONG_PTR)ReaderCcb->Data + (ULONG_PTR)ReaderCcb->MaxDataLength)) ||
902 ((ULONG_PTR)ReaderCcb->WritePtr < (ULONG_PTR)ReaderCcb->Data))
903 {
904 DPRINT1("NPFS is writing out of its buffer. Report to developer!\n");
905 DPRINT1("ReaderCcb->WritePtr %x, ReaderCcb->Data %x, ReaderCcb->MaxDataLength %lu\n",
906 ReaderCcb->WritePtr, ReaderCcb->Data, ReaderCcb->MaxDataLength);
907 ASSERT(FALSE);
908 }
909
910 CopyLength = min(Length, ReaderCcb->WriteQuotaAvailable - sizeof(ULONG));
911 if (CopyLength > ReaderCcb->WriteQuotaAvailable)
912 {
913 DPRINT1("Writing %lu byte to pipe would overflow as only %lu bytes are available\n",
914 CopyLength, ReaderCcb->WriteQuotaAvailable);
915 ASSERT(FALSE);
916 }
917
918 /* First Copy the Length of the message into the pipes buffer */
919 memcpy(ReaderCcb->WritePtr, &CopyLength, sizeof(CopyLength));
920
921 /* Now the user buffer itself */
922 memcpy((PVOID)((ULONG_PTR)ReaderCcb->WritePtr + sizeof(CopyLength)), Buffer, CopyLength);
923
924 /* Update the write pointer */
925 ReaderCcb->WritePtr = (PVOID)((ULONG_PTR)ReaderCcb->WritePtr + sizeof(CopyLength) + CopyLength);
926
927 Information += CopyLength;
928
929 ReaderCcb->ReadDataAvailable += CopyLength;
930
931 ReaderCcb->WriteQuotaAvailable -= (CopyLength + sizeof(ULONG));
932
933 if ((ULONG_PTR)ReaderCcb->WriteQuotaAvailable > (ULONG)ReaderCcb->MaxDataLength)
934 {
935 DPRINT1("QuotaAvailable is greater than buffer size!\n");
936 ASSERT(FALSE);
937 }
938 }
939
940 if (Information > 0)
941 {
942 KeSetEvent(&ReaderCcb->ReadEvent, IO_NO_INCREMENT, FALSE);
943 KeResetEvent(&Ccb->WriteEvent);
944 break;
945 }
946 }
947 else
948 {
949 DPRINT1("Unhandled Pipe Type Mode and Read Write Mode!\n");
950 ASSERT(FALSE);
951 }
952 }
953
954 ExReleaseFastMutex(&ReaderCcb->DataListLock);
955
956 done:
957 Irp->IoStatus.Status = Status;
958 Irp->IoStatus.Information = Information;
959
960 IoCompleteRequest(Irp, IO_NO_INCREMENT);
961
962 DPRINT("NpfsWrite done (Status %lx)\n", Status);
963
964 return Status;
965 }
966
967 /* EOF */