- Update to r53061
[reactos.git] / drivers / filesystems / npfs / rw.c
1 /*
2 * COPYRIGHT: See COPYING in the top level directory
3 * PROJECT: ReactOS kernel
4 * FILE: drivers/fs/np/rw.c
5 * PURPOSE: Named pipe filesystem
6 * PROGRAMMER: David Welch <welch@cwcom.net>
7 * Michael Martin
8 */
9
10 /* INCLUDES ******************************************************************/
11
12 #include "npfs.h"
13
14 #define NDEBUG
15 #include <debug.h>
16
17 /* FUNCTIONS *****************************************************************/
18
19 VOID HexDump(PUCHAR Buffer, ULONG Length)
20 {
21 CHAR Line[65];
22 UCHAR ch;
23 const char Hex[] = "0123456789ABCDEF";
24 ULONG i, j;
25
26 DbgPrint("---------------\n");
27
28 for (i = 0; i < Length; i+= 16)
29 {
30 memset(Line, ' ', 64);
31 Line[64] = 0;
32
33 for (j = 0; j < 16 && j + i < Length; j++)
34 {
35 ch = Buffer[i + j];
36 Line[3*j + 0] = Hex[ch >> 4];
37 Line[3*j + 1] = Hex[ch & 0x0f];
38 Line[48 + j] = isprint(ch) ? ch : '.';
39 }
40 DbgPrint("%s\n", Line);
41 }
42 DbgPrint("---------------\n");
43 }
44
45 static DRIVER_CANCEL NpfsReadWriteCancelRoutine;
46 static VOID NTAPI
47 NpfsReadWriteCancelRoutine(IN PDEVICE_OBJECT DeviceObject,
48 IN PIRP Irp)
49 {
50 PNPFS_CONTEXT Context;
51 PIO_STACK_LOCATION IoStack;
52 PNPFS_VCB Vcb;
53 PNPFS_CCB Ccb;
54 PLIST_ENTRY ListEntry;
55 PNPFS_THREAD_CONTEXT ThreadContext;
56 ULONG i;
57
58 DPRINT("NpfsReadWriteCancelRoutine(DeviceObject %p, Irp %p)\n", DeviceObject, Irp);
59
60 IoReleaseCancelSpinLock(Irp->CancelIrql);
61
62 Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
63 Vcb = (PNPFS_VCB)DeviceObject->DeviceExtension;
64 IoStack = IoGetCurrentIrpStackLocation(Irp);
65 Ccb = IoStack->FileObject->FsContext2;
66
67 KeLockMutex(&Vcb->PipeListLock);
68 ExAcquireFastMutex(&Ccb->DataListLock);
69 switch(IoStack->MajorFunction)
70 {
71 case IRP_MJ_READ:
72 ListEntry = Vcb->ThreadListHead.Flink;
73 while (ListEntry != &Vcb->ThreadListHead)
74 {
75 ThreadContext = CONTAINING_RECORD(ListEntry, NPFS_THREAD_CONTEXT, ListEntry);
76 /* Real events start at index 1 */
77 for (i = 1; i < ThreadContext->Count; i++)
78 {
79 if (ThreadContext->WaitIrpArray[i] == Irp)
80 {
81 ASSERT(ThreadContext->WaitObjectArray[i] == Context->WaitEvent);
82
83 ThreadContext->WaitIrpArray[i] = NULL;
84
85 RemoveEntryList(&Context->ListEntry);
86
87 Irp->IoStatus.Status = STATUS_CANCELLED;
88 Irp->IoStatus.Information = 0;
89
90 IoCompleteRequest(Irp, IO_NO_INCREMENT);
91
92 KeSetEvent(&ThreadContext->Event, IO_NO_INCREMENT, FALSE);
93
94 ExReleaseFastMutex(&Ccb->DataListLock);
95 KeUnlockMutex(&Vcb->PipeListLock);
96
97 return;
98 }
99 }
100 ListEntry = ListEntry->Flink;
101 }
102
103 RemoveEntryList(&Context->ListEntry);
104
105 ExReleaseFastMutex(&Ccb->DataListLock);
106 KeUnlockMutex(&Vcb->PipeListLock);
107
108 Irp->IoStatus.Status = STATUS_CANCELLED;
109 Irp->IoStatus.Information = 0;
110
111 IoCompleteRequest(Irp, IO_NO_INCREMENT);
112 break;
113 default:
114 ASSERT(FALSE);
115 }
116 }
117
118 static VOID NTAPI
119 NpfsWaiterThread(PVOID InitContext)
120 {
121 PNPFS_THREAD_CONTEXT ThreadContext = (PNPFS_THREAD_CONTEXT) InitContext;
122 ULONG CurrentCount;
123 ULONG Count = 0, i;
124 PIRP Irp = NULL;
125 NTSTATUS Status;
126 PIO_STACK_LOCATION IoStack = NULL;
127 KIRQL OldIrql;
128
129 KeLockMutex(&ThreadContext->Vcb->PipeListLock);
130
131 while (1)
132 {
133 CurrentCount = ThreadContext->Count;
134 KeUnlockMutex(&ThreadContext->Vcb->PipeListLock);
135 IoAcquireCancelSpinLock(&OldIrql);
136 if (Irp && IoSetCancelRoutine(Irp, NULL) != NULL)
137 {
138 IoReleaseCancelSpinLock(OldIrql);
139 IoStack = IoGetCurrentIrpStackLocation(Irp);
140 switch (IoStack->MajorFunction)
141 {
142 case IRP_MJ_READ:
143 NpfsRead(IoStack->DeviceObject, Irp);
144 break;
145 default:
146 ASSERT(FALSE);
147 }
148 }
149 else
150 {
151 IoReleaseCancelSpinLock(OldIrql);
152 }
153 Status = KeWaitForMultipleObjects(CurrentCount,
154 ThreadContext->WaitObjectArray,
155 WaitAny,
156 Executive,
157 KernelMode,
158 FALSE,
159 NULL,
160 ThreadContext->WaitBlockArray);
161 if (!NT_SUCCESS(Status))
162 {
163 ASSERT(FALSE);
164 }
165 KeLockMutex(&ThreadContext->Vcb->PipeListLock);
166 Count = Status - STATUS_WAIT_0;
167 ASSERT (Count < CurrentCount);
168 if (Count > 0)
169 {
170 Irp = ThreadContext->WaitIrpArray[Count];
171 ThreadContext->Count--;
172 ThreadContext->Vcb->EmptyWaiterCount++;
173 ThreadContext->WaitObjectArray[Count] = ThreadContext->WaitObjectArray[ThreadContext->Count];
174 ThreadContext->WaitIrpArray[Count] = ThreadContext->WaitIrpArray[ThreadContext->Count];
175 }
176 else
177 {
178 /* someone has add a new wait request or cancelled an old one */
179 Irp = NULL;
180
181 /* Look for cancelled requests */
182 for (i = 1; i < ThreadContext->Count; i++)
183 {
184 if (ThreadContext->WaitIrpArray[i] == NULL)
185 {
186 ThreadContext->Count--;
187 ThreadContext->Vcb->EmptyWaiterCount++;
188 ThreadContext->WaitObjectArray[i] = ThreadContext->WaitObjectArray[ThreadContext->Count];
189 ThreadContext->WaitIrpArray[i] = ThreadContext->WaitIrpArray[ThreadContext->Count];
190 }
191 }
192 }
193 if (ThreadContext->Count == 1 && ThreadContext->Vcb->EmptyWaiterCount >= MAXIMUM_WAIT_OBJECTS)
194 {
195 /* it exist an other thread with empty wait slots, we can remove our thread from the list */
196 RemoveEntryList(&ThreadContext->ListEntry);
197 ExFreePoolWithTag(ThreadContext, TAG_NPFS_THREAD_CONTEXT);
198 KeUnlockMutex(&ThreadContext->Vcb->PipeListLock);
199 break;
200 }
201 }
202 }
203
204 static NTSTATUS
205 NpfsAddWaitingReadWriteRequest(IN PDEVICE_OBJECT DeviceObject,
206 IN PIRP Irp)
207 {
208 PLIST_ENTRY ListEntry;
209 PNPFS_THREAD_CONTEXT ThreadContext = NULL;
210 PNPFS_CONTEXT Context;
211 HANDLE hThread;
212 PNPFS_VCB Vcb;
213 KIRQL oldIrql;
214 NTSTATUS Status;
215
216 Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
217 Vcb = (PNPFS_VCB)DeviceObject->DeviceExtension;
218
219 DPRINT("NpfsAddWaitingReadWriteRequest(DeviceObject %p, Irp %p)\n", DeviceObject, Irp);
220
221 KeLockMutex(&Vcb->PipeListLock);
222
223 ListEntry = Vcb->ThreadListHead.Flink;
224 while (ListEntry != &Vcb->ThreadListHead)
225 {
226 ThreadContext = CONTAINING_RECORD(ListEntry, NPFS_THREAD_CONTEXT, ListEntry);
227 if (ThreadContext->Count < MAXIMUM_WAIT_OBJECTS)
228 {
229 break;
230 }
231 ListEntry = ListEntry->Flink;
232 }
233
234 if (ListEntry == &Vcb->ThreadListHead)
235 {
236 ThreadContext = ExAllocatePoolWithTag(NonPagedPool,
237 sizeof(NPFS_THREAD_CONTEXT),
238 TAG_NPFS_THREAD_CONTEXT);
239 if (ThreadContext == NULL)
240 {
241 KeUnlockMutex(&Vcb->PipeListLock);
242 return STATUS_NO_MEMORY;
243 }
244
245 ThreadContext->Vcb = Vcb;
246 KeInitializeEvent(&ThreadContext->Event, SynchronizationEvent, FALSE);
247 ThreadContext->Count = 1;
248 ThreadContext->WaitObjectArray[0] = &ThreadContext->Event;
249
250 DPRINT("Creating a new system thread for waiting read/write requests\n");
251
252 Status = PsCreateSystemThread(&hThread,
253 THREAD_ALL_ACCESS,
254 NULL,
255 NULL,
256 NULL,
257 NpfsWaiterThread,
258 (PVOID)ThreadContext);
259 if (!NT_SUCCESS(Status))
260 {
261 ExFreePoolWithTag(ThreadContext, TAG_NPFS_THREAD_CONTEXT);
262 KeUnlockMutex(&Vcb->PipeListLock);
263 return Status;
264 }
265
266 InsertHeadList(&Vcb->ThreadListHead, &ThreadContext->ListEntry);
267 Vcb->EmptyWaiterCount += MAXIMUM_WAIT_OBJECTS - 1;
268 }
269 IoMarkIrpPending(Irp);
270
271 IoAcquireCancelSpinLock(&oldIrql);
272 if (Irp->Cancel)
273 {
274 IoReleaseCancelSpinLock(oldIrql);
275 Status = STATUS_CANCELLED;
276 }
277 else
278 {
279 (void)IoSetCancelRoutine(Irp, NpfsReadWriteCancelRoutine);
280 IoReleaseCancelSpinLock(oldIrql);
281 ThreadContext->WaitObjectArray[ThreadContext->Count] = Context->WaitEvent;
282 ThreadContext->WaitIrpArray[ThreadContext->Count] = Irp;
283 ThreadContext->Count++;
284 Vcb->EmptyWaiterCount--;
285 KeSetEvent(&ThreadContext->Event, IO_NO_INCREMENT, FALSE);
286 Status = STATUS_SUCCESS;
287 }
288 KeUnlockMutex(&Vcb->PipeListLock);
289 return Status;
290 }
291
292 NTSTATUS NTAPI
293 NpfsRead(IN PDEVICE_OBJECT DeviceObject,
294 IN PIRP Irp)
295 {
296 PFILE_OBJECT FileObject;
297 NTSTATUS Status;
298 NTSTATUS OriginalStatus = STATUS_SUCCESS;
299 PNPFS_CCB Ccb;
300 PNPFS_CONTEXT Context;
301 KEVENT Event;
302 ULONG Length;
303 ULONG Information = 0;
304 ULONG CopyLength = 0;
305 ULONG TempLength;
306 BOOLEAN IsOriginalRequest = TRUE;
307 PVOID Buffer;
308
309 DPRINT("NpfsRead(DeviceObject %p, Irp %p)\n", DeviceObject, Irp);
310
311 FileObject = IoGetCurrentIrpStackLocation(Irp)->FileObject;
312 DPRINT("FileObject %p\n", FileObject);
313 DPRINT("Pipe name %wZ\n", &FileObject->FileName);
314 Ccb = FileObject->FsContext2;
315
316 /* Fail, if the CCB is not a pipe CCB */
317 if (Ccb->Type != CCB_PIPE)
318 {
319 DPRINT("Not a pipe!\n");
320 Status = STATUS_INVALID_PARAMETER;
321 Irp->IoStatus.Information = 0;
322 goto done;
323 }
324
325 if (Irp->MdlAddress == NULL)
326 {
327 DPRINT("Irp->MdlAddress == NULL\n");
328 Status = STATUS_UNSUCCESSFUL;
329 Irp->IoStatus.Information = 0;
330 goto done;
331 }
332
333 Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
334
335 if ((Ccb->OtherSide) && (Ccb->OtherSide->PipeState == FILE_PIPE_DISCONNECTED_STATE) && (Ccb->PipeState == FILE_PIPE_DISCONNECTED_STATE))
336 {
337 DPRINT("Both Client and Server are disconnected!\n");
338 Status = STATUS_PIPE_DISCONNECTED;
339 Irp->IoStatus.Information = 0;
340 goto done;
341
342 }
343
344 if ((Ccb->OtherSide == NULL) && (Ccb->ReadDataAvailable == 0))
345 {
346 if (Ccb->PipeState == FILE_PIPE_CONNECTED_STATE)
347 Status = STATUS_PIPE_BROKEN;
348 else if (Ccb->PipeState == FILE_PIPE_LISTENING_STATE)
349 Status = STATUS_PIPE_LISTENING;
350 else if (Ccb->PipeState == FILE_PIPE_DISCONNECTED_STATE)
351 Status = STATUS_PIPE_DISCONNECTED;
352 else
353 Status = STATUS_UNSUCCESSFUL;
354 Irp->IoStatus.Information = 0;
355 goto done;
356 }
357
358 if (Ccb->Data == NULL)
359 {
360 DPRINT("Pipe is NOT readable!\n");
361 Status = STATUS_UNSUCCESSFUL;
362 Irp->IoStatus.Information = 0;
363 goto done;
364 }
365
366 ExAcquireFastMutex(&Ccb->DataListLock);
367
368 if (IoIsOperationSynchronous(Irp))
369 {
370 InsertTailList(&Ccb->ReadRequestListHead, &Context->ListEntry);
371 if (Ccb->ReadRequestListHead.Flink != &Context->ListEntry)
372 {
373 KeInitializeEvent(&Event, SynchronizationEvent, FALSE);
374 Context->WaitEvent = &Event;
375 ExReleaseFastMutex(&Ccb->DataListLock);
376 Status = KeWaitForSingleObject(&Event,
377 Executive,
378 Irp->RequestorMode,
379 FALSE,
380 NULL);
381 if ((Status == STATUS_USER_APC) || (Status == STATUS_KERNEL_APC))
382 {
383 Status = STATUS_CANCELLED;
384 goto done;
385 }
386 if (!NT_SUCCESS(Status))
387 {
388 ASSERT(FALSE);
389 }
390 ExAcquireFastMutex(&Ccb->DataListLock);
391 }
392 Irp->IoStatus.Information = 0;
393 }
394 else
395 {
396 KIRQL oldIrql;
397 if (IsListEmpty(&Ccb->ReadRequestListHead) ||
398 Ccb->ReadRequestListHead.Flink != &Context->ListEntry)
399 {
400 /* this is a new request */
401 Irp->IoStatus.Information = 0;
402 Context->WaitEvent = &Ccb->ReadEvent;
403 InsertTailList(&Ccb->ReadRequestListHead, &Context->ListEntry);
404 if (Ccb->ReadRequestListHead.Flink != &Context->ListEntry)
405 {
406 /* there was already a request on the list */
407 IoAcquireCancelSpinLock(&oldIrql);
408 if (Irp->Cancel)
409 {
410 IoReleaseCancelSpinLock(oldIrql);
411 RemoveEntryList(&Context->ListEntry);
412 ExReleaseFastMutex(&Ccb->DataListLock);
413 Status = STATUS_CANCELLED;
414 goto done;
415 }
416 (void)IoSetCancelRoutine(Irp, NpfsReadWriteCancelRoutine);
417 IoReleaseCancelSpinLock(oldIrql);
418 ExReleaseFastMutex(&Ccb->DataListLock);
419 IoMarkIrpPending(Irp);
420 Status = STATUS_PENDING;
421 goto done;
422 }
423 }
424 }
425
426 while (1)
427 {
428 Buffer = MmGetSystemAddressForMdl(Irp->MdlAddress);
429 Information = Irp->IoStatus.Information;
430 Length = IoGetCurrentIrpStackLocation(Irp)->Parameters.Read.Length;
431 ASSERT (Information <= Length);
432 Buffer = (PVOID)((ULONG_PTR)Buffer + Information);
433 Length -= Information;
434 Status = STATUS_SUCCESS;
435
436 while (1)
437 {
438 if (Ccb->ReadDataAvailable == 0)
439 {
440 ULONG ConnectionSideReadMode;
441
442 if (Ccb->PipeEnd == FILE_PIPE_CLIENT_END) ConnectionSideReadMode=Ccb->Fcb->ClientReadMode;
443 else ConnectionSideReadMode = Ccb->Fcb->ServerReadMode;
444
445 if ((Ccb->PipeState == FILE_PIPE_CONNECTED_STATE) && (Ccb->OtherSide))
446 {
447 KeSetEvent(&Ccb->OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE);
448 }
449 if (Information > 0 &&
450 (ConnectionSideReadMode != FILE_PIPE_BYTE_STREAM_MODE ||
451 Ccb->PipeState != FILE_PIPE_CONNECTED_STATE))
452 {
453 break;
454 }
455 if (((Ccb->PipeState != FILE_PIPE_CONNECTED_STATE) || (!Ccb->OtherSide)) && (Ccb->ReadDataAvailable == 0))
456 {
457 DPRINT("PipeState: %x\n", Ccb->PipeState);
458 Status = STATUS_PIPE_BROKEN;
459 break;
460 }
461 ExReleaseFastMutex(&Ccb->DataListLock);
462
463 if (IoIsOperationSynchronous(Irp))
464 {
465 /* Wait for ReadEvent to become signaled */
466
467 DPRINT("Waiting for readable data (%wZ)\n", &Ccb->Fcb->PipeName);
468 Status = KeWaitForSingleObject(&Ccb->ReadEvent,
469 UserRequest,
470 Irp->RequestorMode,
471 FALSE,
472 NULL);
473 DPRINT("Finished waiting (%wZ)! Status: %x\n", &Ccb->Fcb->PipeName, Status);
474
475 if ((Status == STATUS_USER_APC) || (Status == STATUS_KERNEL_APC))
476 {
477 Status = STATUS_CANCELLED;
478 break;
479 }
480 if (!NT_SUCCESS(Status))
481 {
482 ASSERT(FALSE);
483 }
484 ExAcquireFastMutex(&Ccb->DataListLock);
485 }
486 else
487 {
488 Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
489
490 Context->WaitEvent = &Ccb->ReadEvent;
491 Status = NpfsAddWaitingReadWriteRequest(DeviceObject, Irp);
492
493 if (NT_SUCCESS(Status))
494 {
495 Status = STATUS_PENDING;
496 goto done;
497 }
498 ExAcquireFastMutex(&Ccb->DataListLock);
499 break;
500 }
501 }
502 ASSERT(IoGetCurrentIrpStackLocation(Irp)->FileObject != NULL);
503
504 /* If the pipe type and read mode are both byte stream */
505 if (Ccb->Fcb->PipeType == FILE_PIPE_BYTE_STREAM_TYPE)
506 {
507 DPRINT("Byte stream mode: Ccb->Data %x\n", Ccb->Data);
508 /* Byte stream mode */
509 while (Length > 0 && Ccb->ReadDataAvailable > 0)
510 {
511 CopyLength = min(Ccb->ReadDataAvailable, Length);
512 if ((ULONG_PTR)Ccb->ReadPtr + CopyLength <= (ULONG_PTR)Ccb->Data + Ccb->MaxDataLength)
513 {
514 memcpy(Buffer, Ccb->ReadPtr, CopyLength);
515 Ccb->ReadPtr = (PVOID)((ULONG_PTR)Ccb->ReadPtr + CopyLength);
516 if (Ccb->ReadPtr == (PVOID)((ULONG_PTR)Ccb->Data + Ccb->MaxDataLength))
517 {
518 Ccb->ReadPtr = Ccb->Data;
519 }
520 }
521 else
522 {
523 TempLength = (ULONG)((ULONG_PTR)Ccb->Data + Ccb->MaxDataLength - (ULONG_PTR)Ccb->ReadPtr);
524 memcpy(Buffer, Ccb->ReadPtr, TempLength);
525 memcpy((PVOID)((ULONG_PTR)Buffer + TempLength), Ccb->Data, CopyLength - TempLength);
526 Ccb->ReadPtr = (PVOID)((ULONG_PTR)Ccb->Data + CopyLength - TempLength);
527 }
528
529 Buffer = (PVOID)((ULONG_PTR)Buffer + CopyLength);
530 Length -= CopyLength;
531 Information += CopyLength;
532
533 Ccb->ReadDataAvailable -= CopyLength;
534 Ccb->WriteQuotaAvailable += CopyLength;
535 }
536
537 if ((Length == 0) || (Ccb->ReadDataAvailable == 0))
538 {
539 if ((Ccb->PipeState == FILE_PIPE_CONNECTED_STATE) && (Ccb->OtherSide))
540 {
541 KeSetEvent(&Ccb->OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE);
542 }
543 KeResetEvent(&Ccb->ReadEvent);
544 break;
545 }
546 }
547 else if (Ccb->Fcb->PipeType == FILE_PIPE_MESSAGE_TYPE)
548 {
549 DPRINT("Message mode: Ccb>Data %x\n", Ccb->Data);
550
551 /* Check if buffer is full and the read pointer is not at the start of the buffer */
552 if ((Ccb->WriteQuotaAvailable == 0) && (Ccb->ReadPtr > Ccb->Data))
553 {
554 Ccb->WriteQuotaAvailable += (ULONG_PTR)Ccb->ReadPtr - (ULONG_PTR)Ccb->Data;
555 memcpy(Ccb->Data, Ccb->ReadPtr, (ULONG_PTR)Ccb->WritePtr - (ULONG_PTR)Ccb->ReadPtr);
556 Ccb->WritePtr = (PVOID)((ULONG_PTR)Ccb->WritePtr - ((ULONG_PTR)Ccb->ReadPtr - (ULONG_PTR)Ccb->Data));
557 Ccb->ReadPtr = Ccb->Data;
558 ASSERT((ULONG_PTR)Ccb->WritePtr < ((ULONG_PTR)Ccb->Data + Ccb->MaxDataLength));
559 ASSERT(Ccb->WritePtr >= Ccb->Data);
560 }
561
562 /* For Message mode, the Message length is stored in the buffer preceeding the Message. */
563 if (Ccb->ReadDataAvailable)
564 {
565 ULONG NextMessageLength = 0;
566
567 /*First get the size of the message */
568 memcpy(&NextMessageLength, Ccb->ReadPtr, sizeof(NextMessageLength));
569
570 if ((NextMessageLength == 0) || (NextMessageLength > Ccb->ReadDataAvailable))
571 {
572 DPRINT1("Possible memory corruption.\n");
573 HexDump(Ccb->Data, (ULONG_PTR)Ccb->WritePtr - (ULONG_PTR)Ccb->Data);
574 ASSERT(FALSE);
575 }
576
577 /* Use the smaller value */
578 CopyLength = min(NextMessageLength, Length);
579 ASSERT(CopyLength > 0);
580 ASSERT(CopyLength <= Ccb->ReadDataAvailable);
581 /* retrieve the message from the buffer */
582 memcpy(Buffer, (PVOID)((ULONG_PTR)Ccb->ReadPtr + sizeof(NextMessageLength)), CopyLength);
583
584 if (Ccb->ReadDataAvailable > CopyLength)
585 {
586 if (CopyLength < NextMessageLength)
587 /* Client only requested part of the message */
588 {
589 /* Calculate the remaining message new size */
590 ULONG NewMessageSize = NextMessageLength-CopyLength;
591
592 /* Update ReadPtr to point to new Message size location */
593 Ccb->ReadPtr = (PVOID)((ULONG_PTR)Ccb->ReadPtr + CopyLength);
594
595 /* Write a new Message size to buffer for the part of the message still there */
596 memcpy(Ccb->ReadPtr, &NewMessageSize, sizeof(NewMessageSize));
597 }
598 else
599 /* Client wanted the entire message */
600 {
601 /* Update ReadPtr to point to next message size */
602 Ccb->ReadPtr = (PVOID)((ULONG_PTR)Ccb->ReadPtr + CopyLength + sizeof(CopyLength));
603 }
604 }
605 else
606 {
607 /* This was the last Message, so just zero start of buffer for safety sake */
608 memset(Ccb->Data, 0, NextMessageLength + sizeof(NextMessageLength));
609
610 /* Reset to MaxDataLength as partial message retrievals dont
611 give the length back to Quota */
612 Ccb->WriteQuotaAvailable = Ccb->MaxDataLength;
613
614 /* reset read and write pointer to beginning of buffer */
615 Ccb->WritePtr = Ccb->Data;
616 Ccb->ReadPtr = Ccb->Data;
617 }
618 #ifndef NDEBUG
619 DPRINT("Length %d Buffer %x\n",CopyLength,Buffer);
620 HexDump((PUCHAR)Buffer, CopyLength);
621 #endif
622
623 Information += CopyLength;
624
625 Ccb->ReadDataAvailable -= CopyLength;
626
627 if ((ULONG)Ccb->WriteQuotaAvailable > (ULONG)Ccb->MaxDataLength) ASSERT(FALSE);
628 }
629
630 if (Information > 0)
631 {
632 ULONG ConnectionSideReadMode;
633
634 if (Ccb->PipeEnd == FILE_PIPE_CLIENT_END) ConnectionSideReadMode=Ccb->Fcb->ClientReadMode;
635 else ConnectionSideReadMode = Ccb->Fcb->ServerReadMode;
636
637 if ((ConnectionSideReadMode == FILE_PIPE_BYTE_STREAM_MODE) && (Ccb->ReadDataAvailable) && (Length > CopyLength))
638 {
639 Buffer = (PVOID)((ULONG_PTR)Buffer + CopyLength);
640 Length -= CopyLength;
641 }
642 else
643 {
644 KeResetEvent(&Ccb->ReadEvent);
645
646 if ((Ccb->PipeState == FILE_PIPE_CONNECTED_STATE) && (Ccb->WriteQuotaAvailable > 0) && (Ccb->OtherSide))
647 {
648 KeSetEvent(&Ccb->OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE);
649 }
650 break;
651 }
652 }
653 }
654 else
655 {
656 DPRINT1("Unhandled Pipe Mode!\n");
657 ASSERT(FALSE);
658 }
659 }
660 Irp->IoStatus.Information = Information;
661 Irp->IoStatus.Status = Status;
662
663 ASSERT(IoGetCurrentIrpStackLocation(Irp)->FileObject != NULL);
664
665 if (Status == STATUS_CANCELLED)
666 goto done;
667
668 if (IoIsOperationSynchronous(Irp))
669 {
670 RemoveEntryList(&Context->ListEntry);
671 if (!IsListEmpty(&Ccb->ReadRequestListHead))
672 {
673 Context = CONTAINING_RECORD(Ccb->ReadRequestListHead.Flink, NPFS_CONTEXT, ListEntry);
674 KeSetEvent(Context->WaitEvent, IO_NO_INCREMENT, FALSE);
675 }
676 ExReleaseFastMutex(&Ccb->DataListLock);
677 IoCompleteRequest(Irp, IO_NO_INCREMENT);
678
679 DPRINT("NpfsRead done (Status %lx)\n", Status);
680 return Status;
681 }
682 else
683 {
684 KIRQL oldIrql;
685
686 if (IsOriginalRequest)
687 {
688 IsOriginalRequest = FALSE;
689 OriginalStatus = Status;
690 }
691 if (Status == STATUS_PENDING)
692 {
693 ExReleaseFastMutex(&Ccb->DataListLock);
694 DPRINT("NpfsRead done (Status %lx)\n", OriginalStatus);
695 return OriginalStatus;
696 }
697 RemoveEntryList(&Context->ListEntry);
698 IoCompleteRequest(Irp, IO_NO_INCREMENT);
699 if (IsListEmpty(&Ccb->ReadRequestListHead))
700 {
701 ExReleaseFastMutex(&Ccb->DataListLock);
702 DPRINT("NpfsRead done (Status %lx)\n", OriginalStatus);
703 return OriginalStatus;
704 }
705
706 IoAcquireCancelSpinLock(&oldIrql);
707 Context = CONTAINING_RECORD(Ccb->ReadRequestListHead.Flink, NPFS_CONTEXT, ListEntry);
708
709 Irp = CONTAINING_RECORD(Context, IRP, Tail.Overlay.DriverContext);
710 /* Verify the Irp wasnt cancelled */
711 if (Irp->Cancel)
712 {
713 IoReleaseCancelSpinLock(oldIrql);
714 RemoveEntryList(&Context->ListEntry);
715 ExReleaseFastMutex(&Ccb->DataListLock);
716 Status = STATUS_CANCELLED;
717 goto done;
718 }
719 /* The Irp will now be handled, so remove the CancelRoutine */
720 (void)IoSetCancelRoutine(Irp, NULL);
721 IoReleaseCancelSpinLock(oldIrql);
722 }
723 }
724
725 done:
726 Irp->IoStatus.Status = Status;
727
728 if (Status != STATUS_PENDING)
729 {
730 IoCompleteRequest(Irp, IO_NO_INCREMENT);
731 }
732 DPRINT("NpfsRead done (Status %lx)\n", Status);
733
734 return Status;
735 }
736
737 NTSTATUS NTAPI
738 NpfsWrite(PDEVICE_OBJECT DeviceObject,
739 PIRP Irp)
740 {
741 PIO_STACK_LOCATION IoStack;
742 PFILE_OBJECT FileObject;
743 PNPFS_FCB Fcb = NULL;
744 PNPFS_CCB Ccb = NULL;
745 PNPFS_CCB ReaderCcb;
746 PUCHAR Buffer;
747 NTSTATUS Status = STATUS_SUCCESS;
748 ULONG Length;
749 ULONG Offset;
750 ULONG Information = 0;
751 ULONG CopyLength;
752 ULONG TempLength;
753
754 DPRINT("NpfsWrite()\n");
755
756 IoStack = IoGetCurrentIrpStackLocation(Irp);
757 FileObject = IoStack->FileObject;
758 DPRINT("FileObject %p\n", FileObject);
759 DPRINT("Pipe name %wZ\n", &FileObject->FileName);
760
761 Ccb = FileObject->FsContext2;
762
763 /* Fail, if the CCB is not a pipe CCB */
764 if (Ccb->Type != CCB_PIPE)
765 {
766 DPRINT("Not a pipe!\n");
767 Status = STATUS_INVALID_PARAMETER;
768 Length = 0;
769 goto done;
770 }
771
772 ReaderCcb = Ccb->OtherSide;
773 Fcb = Ccb->Fcb;
774
775 Length = IoStack->Parameters.Write.Length;
776 Offset = IoStack->Parameters.Write.ByteOffset.u.LowPart;
777
778 if (Irp->MdlAddress == NULL)
779 {
780 DPRINT("Irp->MdlAddress == NULL\n");
781 Status = STATUS_UNSUCCESSFUL;
782 Length = 0;
783 goto done;
784 }
785
786 if ((ReaderCcb == NULL) || (Ccb->PipeState != FILE_PIPE_CONNECTED_STATE))
787 {
788 DPRINT("Pipe is NOT connected!\n");
789 if (Ccb->PipeState == FILE_PIPE_LISTENING_STATE)
790 Status = STATUS_PIPE_LISTENING;
791 else if (Ccb->PipeState == FILE_PIPE_DISCONNECTED_STATE)
792 Status = STATUS_PIPE_DISCONNECTED;
793 else
794 Status = STATUS_UNSUCCESSFUL;
795 Length = 0;
796 goto done;
797 }
798
799 if (ReaderCcb->Data == NULL)
800 {
801 DPRINT("Pipe is NOT writable!\n");
802 Status = STATUS_UNSUCCESSFUL;
803 Length = 0;
804 goto done;
805 }
806
807 Status = STATUS_SUCCESS;
808 Buffer = MmGetSystemAddressForMdlSafe (Irp->MdlAddress, NormalPagePriority);
809
810 if (!Buffer)
811 {
812 DPRINT("MmGetSystemAddressForMdlSafe failed\n");
813 Status = STATUS_INSUFFICIENT_RESOURCES;
814 Length = 0;
815 goto done;
816
817 }
818
819 ExAcquireFastMutex(&ReaderCcb->DataListLock);
820
821 #ifndef NDEBUG
822 DPRINT("Length %d Buffer %x Offset %x\n",Length,Buffer,Offset);
823 HexDump(Buffer, Length);
824 #endif
825
826 while(1)
827 {
828 if ((ReaderCcb->WriteQuotaAvailable == 0))
829 {
830 if (Ccb->PipeState != FILE_PIPE_CONNECTED_STATE || !Ccb->OtherSide)
831 {
832 Status = STATUS_PIPE_BROKEN;
833 ExReleaseFastMutex(&ReaderCcb->DataListLock);
834 goto done;
835 }
836 KeSetEvent(&ReaderCcb->ReadEvent, IO_NO_INCREMENT, FALSE);
837 ExReleaseFastMutex(&ReaderCcb->DataListLock);
838
839 DPRINT("Write Waiting for buffer space (%S)\n", Fcb->PipeName.Buffer);
840 Status = KeWaitForSingleObject(&Ccb->WriteEvent,
841 UserRequest,
842 Irp->RequestorMode,
843 FALSE,
844 NULL);
845 DPRINT("Write Finished waiting (%S)! Status: %x\n", Fcb->PipeName.Buffer, Status);
846
847 if ((Status == STATUS_USER_APC) || (Status == STATUS_KERNEL_APC))
848 {
849 Status = STATUS_CANCELLED;
850 goto done;
851 }
852 if (!NT_SUCCESS(Status))
853 {
854 ASSERT(FALSE);
855 }
856 /*
857 * It's possible that the event was signaled because the
858 * other side of pipe was closed.
859 */
860 if (Ccb->PipeState != FILE_PIPE_CONNECTED_STATE || !Ccb->OtherSide)
861 {
862 DPRINT("PipeState: %x\n", Ccb->PipeState);
863 Status = STATUS_PIPE_BROKEN;
864 goto done;
865 }
866 /* Check that the pipe has not been closed */
867 if (ReaderCcb->PipeState != FILE_PIPE_CONNECTED_STATE || !ReaderCcb->OtherSide)
868 {
869 Status = STATUS_PIPE_BROKEN;
870 goto done;
871 }
872 ExAcquireFastMutex(&ReaderCcb->DataListLock);
873 }
874
875 if (Ccb->Fcb->PipeType == FILE_PIPE_BYTE_STREAM_TYPE)
876 {
877 DPRINT("Byte stream mode: Ccb->Data %x, Ccb->WritePtr %x\n", ReaderCcb->Data, ReaderCcb->WritePtr);
878
879 while (Length > 0 && ReaderCcb->WriteQuotaAvailable > 0)
880 {
881 CopyLength = min(Length, ReaderCcb->WriteQuotaAvailable);
882
883 if ((ULONG_PTR)ReaderCcb->WritePtr + CopyLength <= (ULONG_PTR)ReaderCcb->Data + ReaderCcb->MaxDataLength)
884 {
885 memcpy(ReaderCcb->WritePtr, Buffer, CopyLength);
886 ReaderCcb->WritePtr = (PVOID)((ULONG_PTR)ReaderCcb->WritePtr + CopyLength);
887 if ((ULONG_PTR)ReaderCcb->WritePtr == (ULONG_PTR)ReaderCcb->Data + ReaderCcb->MaxDataLength)
888 {
889 ReaderCcb->WritePtr = ReaderCcb->Data;
890 }
891 }
892 else
893 {
894
895 TempLength = (ULONG)((ULONG_PTR)ReaderCcb->Data + ReaderCcb->MaxDataLength -
896 (ULONG_PTR)ReaderCcb->WritePtr);
897
898 memcpy(ReaderCcb->WritePtr, Buffer, TempLength);
899 memcpy(ReaderCcb->Data, Buffer + TempLength, CopyLength - TempLength);
900 ReaderCcb->WritePtr = (PVOID)((ULONG_PTR)ReaderCcb->Data + CopyLength - TempLength);
901 }
902
903 Buffer += CopyLength;
904 Length -= CopyLength;
905 Information += CopyLength;
906
907 ReaderCcb->ReadDataAvailable += CopyLength;
908 ReaderCcb->WriteQuotaAvailable -= CopyLength;
909 }
910
911 if (Length == 0)
912 {
913 KeSetEvent(&ReaderCcb->ReadEvent, IO_NO_INCREMENT, FALSE);
914 KeResetEvent(&Ccb->WriteEvent);
915 break;
916 }
917 }
918 else if (Ccb->Fcb->PipeType == FILE_PIPE_MESSAGE_TYPE)
919 {
920 /* For Message Type Pipe, the Pipes memory will be used to store the size of each message */
921 DPRINT("Message mode: Ccb->Data %x, Ccb->WritePtr %x\n",ReaderCcb->Data, ReaderCcb->WritePtr);
922 if (Length > 0)
923 {
924 /* Verify the WritePtr is still inside the buffer */
925 if (((ULONG_PTR)ReaderCcb->WritePtr > ((ULONG_PTR)ReaderCcb->Data + (ULONG_PTR)ReaderCcb->MaxDataLength)) ||
926 ((ULONG_PTR)ReaderCcb->WritePtr < (ULONG_PTR)ReaderCcb->Data))
927 {
928 DPRINT1("NPFS is writing out of its buffer. Report to developer!\n");
929 DPRINT1("ReaderCcb->WritePtr %x, ReaderCcb->Data %x, ReaderCcb->MaxDataLength %lu\n",
930 ReaderCcb->WritePtr, ReaderCcb->Data, ReaderCcb->MaxDataLength);
931 ASSERT(FALSE);
932 }
933
934 CopyLength = min(Length, ReaderCcb->WriteQuotaAvailable - sizeof(ULONG));
935 if (CopyLength > ReaderCcb->WriteQuotaAvailable)
936 {
937 DPRINT1("Writing %lu byte to pipe would overflow as only %lu bytes are available\n",
938 CopyLength, ReaderCcb->WriteQuotaAvailable);
939 ASSERT(FALSE);
940 }
941
942 /* First Copy the Length of the message into the pipes buffer */
943 memcpy(ReaderCcb->WritePtr, &CopyLength, sizeof(CopyLength));
944
945 /* Now the user buffer itself */
946 memcpy((PVOID)((ULONG_PTR)ReaderCcb->WritePtr + sizeof(CopyLength)), Buffer, CopyLength);
947
948 /* Update the write pointer */
949 ReaderCcb->WritePtr = (PVOID)((ULONG_PTR)ReaderCcb->WritePtr + sizeof(CopyLength) + CopyLength);
950
951 Information += CopyLength;
952
953 ReaderCcb->ReadDataAvailable += CopyLength;
954
955 ReaderCcb->WriteQuotaAvailable -= (CopyLength + sizeof(ULONG));
956
957 if ((ULONG_PTR)ReaderCcb->WriteQuotaAvailable > (ULONG)ReaderCcb->MaxDataLength)
958 {
959 DPRINT1("QuotaAvailable is greater than buffer size!\n");
960 ASSERT(FALSE);
961 }
962 }
963
964 if (Information > 0)
965 {
966 KeSetEvent(&ReaderCcb->ReadEvent, IO_NO_INCREMENT, FALSE);
967 KeResetEvent(&Ccb->WriteEvent);
968 break;
969 }
970 }
971 else
972 {
973 DPRINT1("Unhandled Pipe Type Mode and Read Write Mode!\n");
974 ASSERT(FALSE);
975 }
976 }
977
978 ExReleaseFastMutex(&ReaderCcb->DataListLock);
979
980 done:
981 Irp->IoStatus.Status = Status;
982 Irp->IoStatus.Information = Information;
983
984 IoCompleteRequest(Irp, IO_NO_INCREMENT);
985
986 DPRINT("NpfsWrite done (Status %lx)\n", Status);
987
988 return Status;
989 }
990
991 /* EOF */