[NTOSKRNL]
[reactos.git] / reactos / drivers / filesystems / npfs / rw.c
1 /*
2 * COPYRIGHT: See COPYING in the top level directory
3 * PROJECT: ReactOS kernel
4 * FILE: drivers/fs/np/rw.c
5 * PURPOSE: Named pipe filesystem
6 * PROGRAMMER: David Welch <welch@cwcom.net>
7 * Michael Martin
8 */
9
10 /* INCLUDES ******************************************************************/
11
12 #include "npfs.h"
13
14 #define NDEBUG
15 #include <debug.h>
16
17 /* FUNCTIONS *****************************************************************/
18
19 VOID HexDump(PUCHAR Buffer, ULONG Length)
20 {
21 CHAR Line[65];
22 UCHAR ch;
23 const char Hex[] = "0123456789ABCDEF";
24 ULONG i, j;
25
26 DbgPrint("---------------\n");
27
28 for (i = 0; i < Length; i+= 16)
29 {
30 memset(Line, ' ', 64);
31 Line[64] = 0;
32
33 for (j = 0; j < 16 && j + i < Length; j++)
34 {
35 ch = Buffer[i + j];
36 Line[3*j + 0] = Hex[ch >> 4];
37 Line[3*j + 1] = Hex[ch & 0x0f];
38 Line[48 + j] = isprint(ch) ? ch : '.';
39 }
40 DbgPrint("%s\n", Line);
41 }
42 DbgPrint("---------------\n");
43 }
44
45 static DRIVER_CANCEL NpfsReadWriteCancelRoutine;
46 static VOID NTAPI
47 NpfsReadWriteCancelRoutine(IN PDEVICE_OBJECT DeviceObject,
48 IN PIRP Irp)
49 {
50 PNPFS_CONTEXT Context;
51 PIO_STACK_LOCATION IoStack;
52 PNPFS_VCB Vcb;
53 PNPFS_CCB Ccb;
54 PLIST_ENTRY ListEntry;
55 PNPFS_THREAD_CONTEXT ThreadContext;
56 ULONG i;
57
58 DPRINT("NpfsReadWriteCancelRoutine(DeviceObject %p, Irp %p)\n", DeviceObject, Irp);
59
60 IoReleaseCancelSpinLock(Irp->CancelIrql);
61
62 Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
63 Vcb = (PNPFS_VCB)DeviceObject->DeviceExtension;
64 IoStack = IoGetCurrentIrpStackLocation(Irp);
65 Ccb = IoStack->FileObject->FsContext2;
66
67 KeLockMutex(&Vcb->PipeListLock);
68 ExAcquireFastMutex(&Ccb->DataListLock);
69 switch(IoStack->MajorFunction)
70 {
71 case IRP_MJ_READ:
72 ListEntry = Vcb->ThreadListHead.Flink;
73 while (ListEntry != &Vcb->ThreadListHead)
74 {
75 ThreadContext = CONTAINING_RECORD(ListEntry, NPFS_THREAD_CONTEXT, ListEntry);
76 /* Real events start at index 1 */
77 for (i = 1; i < ThreadContext->Count; i++)
78 {
79 if (ThreadContext->WaitIrpArray[i] == Irp)
80 {
81 ASSERT(ThreadContext->WaitObjectArray[i] == Context->WaitEvent);
82
83 ThreadContext->WaitIrpArray[i] = NULL;
84
85 RemoveEntryList(&Context->ListEntry);
86
87 Irp->IoStatus.Status = STATUS_CANCELLED;
88 Irp->IoStatus.Information = 0;
89
90 IoCompleteRequest(Irp, IO_NO_INCREMENT);
91
92 KeSetEvent(&ThreadContext->Event, IO_NO_INCREMENT, FALSE);
93
94 ExReleaseFastMutex(&Ccb->DataListLock);
95 KeUnlockMutex(&Vcb->PipeListLock);
96
97 return;
98 }
99 }
100 ListEntry = ListEntry->Flink;
101 }
102
103 RemoveEntryList(&Context->ListEntry);
104
105 ExReleaseFastMutex(&Ccb->DataListLock);
106 KeUnlockMutex(&Vcb->PipeListLock);
107
108 Irp->IoStatus.Status = STATUS_CANCELLED;
109 Irp->IoStatus.Information = 0;
110
111 IoCompleteRequest(Irp, IO_NO_INCREMENT);
112 break;
113 default:
114 ASSERT(FALSE);
115 }
116 }
117
118 static VOID NTAPI
119 NpfsWaiterThread(PVOID InitContext)
120 {
121 PNPFS_THREAD_CONTEXT ThreadContext = (PNPFS_THREAD_CONTEXT) InitContext;
122 ULONG CurrentCount;
123 ULONG Count = 0, i;
124 PIRP Irp = NULL;
125 NTSTATUS Status;
126 PIO_STACK_LOCATION IoStack = NULL;
127 KIRQL OldIrql;
128
129 KeLockMutex(&ThreadContext->Vcb->PipeListLock);
130
131 while (1)
132 {
133 CurrentCount = ThreadContext->Count;
134 KeUnlockMutex(&ThreadContext->Vcb->PipeListLock);
135 IoAcquireCancelSpinLock(&OldIrql);
136 if (Irp && IoSetCancelRoutine(Irp, NULL) != NULL)
137 {
138 IoReleaseCancelSpinLock(OldIrql);
139 IoStack = IoGetCurrentIrpStackLocation(Irp);
140 switch (IoStack->MajorFunction)
141 {
142 case IRP_MJ_READ:
143 NpfsRead(IoStack->DeviceObject, Irp);
144 break;
145 default:
146 ASSERT(FALSE);
147 }
148 }
149 else
150 {
151 IoReleaseCancelSpinLock(OldIrql);
152 }
153 Status = KeWaitForMultipleObjects(CurrentCount,
154 ThreadContext->WaitObjectArray,
155 WaitAny,
156 Executive,
157 KernelMode,
158 FALSE,
159 NULL,
160 ThreadContext->WaitBlockArray);
161 if (!NT_SUCCESS(Status))
162 {
163 ASSERT(FALSE);
164 }
165 KeLockMutex(&ThreadContext->Vcb->PipeListLock);
166 Count = Status - STATUS_WAIT_0;
167 ASSERT (Count < CurrentCount);
168 if (Count > 0)
169 {
170 Irp = ThreadContext->WaitIrpArray[Count];
171 ThreadContext->Count--;
172 ThreadContext->Vcb->EmptyWaiterCount++;
173 ThreadContext->WaitObjectArray[Count] = ThreadContext->WaitObjectArray[ThreadContext->Count];
174 ThreadContext->WaitIrpArray[Count] = ThreadContext->WaitIrpArray[ThreadContext->Count];
175 }
176 else
177 {
178 /* someone has add a new wait request or cancelled an old one */
179 Irp = NULL;
180
181 /* Look for cancelled requests */
182 for (i = 1; i < ThreadContext->Count; i++)
183 {
184 if (ThreadContext->WaitIrpArray[i] == NULL)
185 {
186 ThreadContext->Count--;
187 ThreadContext->Vcb->EmptyWaiterCount++;
188 ThreadContext->WaitObjectArray[i] = ThreadContext->WaitObjectArray[ThreadContext->Count];
189 ThreadContext->WaitIrpArray[i] = ThreadContext->WaitIrpArray[ThreadContext->Count];
190 }
191 }
192 }
193 if (ThreadContext->Count == 1 && ThreadContext->Vcb->EmptyWaiterCount >= MAXIMUM_WAIT_OBJECTS)
194 {
195 /* it exist an other thread with empty wait slots, we can remove our thread from the list */
196 RemoveEntryList(&ThreadContext->ListEntry);
197 ExFreePoolWithTag(ThreadContext, TAG_NPFS_THREAD_CONTEXT);
198 KeUnlockMutex(&ThreadContext->Vcb->PipeListLock);
199 break;
200 }
201 }
202 }
203
204 static NTSTATUS
205 NpfsAddWaitingReadWriteRequest(IN PDEVICE_OBJECT DeviceObject,
206 IN PIRP Irp)
207 {
208 PLIST_ENTRY ListEntry;
209 PNPFS_THREAD_CONTEXT ThreadContext = NULL;
210 PNPFS_CONTEXT Context;
211 HANDLE hThread;
212 PNPFS_VCB Vcb;
213 KIRQL oldIrql;
214 NTSTATUS Status;
215
216 Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
217 Vcb = (PNPFS_VCB)DeviceObject->DeviceExtension;
218
219 DPRINT("NpfsAddWaitingReadWriteRequest(DeviceObject %p, Irp %p)\n", DeviceObject, Irp);
220
221 KeLockMutex(&Vcb->PipeListLock);
222
223 ListEntry = Vcb->ThreadListHead.Flink;
224 while (ListEntry != &Vcb->ThreadListHead)
225 {
226 ThreadContext = CONTAINING_RECORD(ListEntry, NPFS_THREAD_CONTEXT, ListEntry);
227 if (ThreadContext->Count < MAXIMUM_WAIT_OBJECTS)
228 {
229 break;
230 }
231 ListEntry = ListEntry->Flink;
232 }
233
234 if (ListEntry == &Vcb->ThreadListHead)
235 {
236 ThreadContext = ExAllocatePoolWithTag(NonPagedPool,
237 sizeof(NPFS_THREAD_CONTEXT),
238 TAG_NPFS_THREAD_CONTEXT);
239 if (ThreadContext == NULL)
240 {
241 KeUnlockMutex(&Vcb->PipeListLock);
242 return STATUS_NO_MEMORY;
243 }
244
245 ThreadContext->Vcb = Vcb;
246 KeInitializeEvent(&ThreadContext->Event, SynchronizationEvent, FALSE);
247 ThreadContext->Count = 1;
248 ThreadContext->WaitObjectArray[0] = &ThreadContext->Event;
249
250 DPRINT("Creating a new system thread for waiting read/write requests\n");
251
252 Status = PsCreateSystemThread(&hThread,
253 THREAD_ALL_ACCESS,
254 NULL,
255 NULL,
256 NULL,
257 NpfsWaiterThread,
258 (PVOID)ThreadContext);
259 if (!NT_SUCCESS(Status))
260 {
261 ExFreePoolWithTag(ThreadContext, TAG_NPFS_THREAD_CONTEXT);
262 KeUnlockMutex(&Vcb->PipeListLock);
263 return Status;
264 }
265
266 InsertHeadList(&Vcb->ThreadListHead, &ThreadContext->ListEntry);
267 Vcb->EmptyWaiterCount += MAXIMUM_WAIT_OBJECTS - 1;
268 }
269 IoMarkIrpPending(Irp);
270
271 IoAcquireCancelSpinLock(&oldIrql);
272 if (Irp->Cancel)
273 {
274 IoReleaseCancelSpinLock(oldIrql);
275 Status = STATUS_CANCELLED;
276 }
277 else
278 {
279 (void)IoSetCancelRoutine(Irp, NpfsReadWriteCancelRoutine);
280 IoReleaseCancelSpinLock(oldIrql);
281 ThreadContext->WaitObjectArray[ThreadContext->Count] = Context->WaitEvent;
282 ThreadContext->WaitIrpArray[ThreadContext->Count] = Irp;
283 ThreadContext->Count++;
284 Vcb->EmptyWaiterCount--;
285 KeSetEvent(&ThreadContext->Event, IO_NO_INCREMENT, FALSE);
286 Status = STATUS_SUCCESS;
287 }
288 KeUnlockMutex(&Vcb->PipeListLock);
289 return Status;
290 }
291
292 NTSTATUS NTAPI
293 NpfsRead(IN PDEVICE_OBJECT DeviceObject,
294 IN PIRP Irp)
295 {
296 PFILE_OBJECT FileObject;
297 NTSTATUS Status;
298 NTSTATUS OriginalStatus = STATUS_SUCCESS;
299 PNPFS_CCB Ccb;
300 PNPFS_CONTEXT Context;
301 KEVENT Event;
302 ULONG Length;
303 ULONG Information = 0;
304 ULONG CopyLength = 0;
305 ULONG TempLength;
306 BOOLEAN IsOriginalRequest = TRUE;
307 PVOID Buffer;
308
309 DPRINT("NpfsRead(DeviceObject %p, Irp %p)\n", DeviceObject, Irp);
310
311 FileObject = IoGetCurrentIrpStackLocation(Irp)->FileObject;
312 DPRINT("FileObject %p\n", FileObject);
313 DPRINT("Pipe name %wZ\n", &FileObject->FileName);
314 Ccb = FileObject->FsContext2;
315
316 /* Fail, if the CCB is not a pipe CCB */
317 if (Ccb->Type != CCB_PIPE)
318 {
319 DPRINT("Not a pipe!\n");
320 Status = STATUS_INVALID_PARAMETER;
321 Irp->IoStatus.Information = 0;
322 goto done;
323 }
324
325 if (Irp->MdlAddress == NULL)
326 {
327 DPRINT("Irp->MdlAddress == NULL\n");
328 Status = STATUS_UNSUCCESSFUL;
329 Irp->IoStatus.Information = 0;
330 goto done;
331 }
332
333 Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
334
335 if ((Ccb->OtherSide) && (Ccb->OtherSide->PipeState == FILE_PIPE_DISCONNECTED_STATE) && (Ccb->PipeState == FILE_PIPE_DISCONNECTED_STATE))
336 {
337 DPRINT("Both Client and Server are disconnected!\n");
338 Status = STATUS_PIPE_DISCONNECTED;
339 Irp->IoStatus.Information = 0;
340 goto done;
341
342 }
343
344 if ((Ccb->OtherSide == NULL) && (Ccb->ReadDataAvailable == 0))
345 {
346 if (Ccb->PipeState == FILE_PIPE_CONNECTED_STATE)
347 Status = STATUS_PIPE_BROKEN;
348 else if (Ccb->PipeState == FILE_PIPE_LISTENING_STATE)
349 Status = STATUS_PIPE_LISTENING;
350 else if (Ccb->PipeState == FILE_PIPE_DISCONNECTED_STATE)
351 Status = STATUS_PIPE_DISCONNECTED;
352 else
353 Status = STATUS_UNSUCCESSFUL;
354 Irp->IoStatus.Information = 0;
355 goto done;
356 }
357
358 if (Ccb->Data == NULL)
359 {
360 DPRINT("Pipe is NOT readable!\n");
361 Status = STATUS_UNSUCCESSFUL;
362 Irp->IoStatus.Information = 0;
363 goto done;
364 }
365
366 ExAcquireFastMutex(&Ccb->DataListLock);
367
368 if (IoIsOperationSynchronous(Irp))
369 {
370 InsertTailList(&Ccb->ReadRequestListHead, &Context->ListEntry);
371 if (Ccb->ReadRequestListHead.Flink != &Context->ListEntry)
372 {
373 KeInitializeEvent(&Event, SynchronizationEvent, FALSE);
374 Context->WaitEvent = &Event;
375 ExReleaseFastMutex(&Ccb->DataListLock);
376 KeWaitForSingleObject(&Event,
377 Executive,
378 KernelMode,
379 FALSE,
380 NULL);
381 ExAcquireFastMutex(&Ccb->DataListLock);
382 }
383 Irp->IoStatus.Information = 0;
384 }
385 else
386 {
387 KIRQL oldIrql;
388 if (IsListEmpty(&Ccb->ReadRequestListHead) ||
389 Ccb->ReadRequestListHead.Flink != &Context->ListEntry)
390 {
391 /* this is a new request */
392 Irp->IoStatus.Information = 0;
393 Context->WaitEvent = &Ccb->ReadEvent;
394 InsertTailList(&Ccb->ReadRequestListHead, &Context->ListEntry);
395 if (Ccb->ReadRequestListHead.Flink != &Context->ListEntry)
396 {
397 /* there was already a request on the list */
398 IoAcquireCancelSpinLock(&oldIrql);
399 if (Irp->Cancel)
400 {
401 IoReleaseCancelSpinLock(oldIrql);
402 RemoveEntryList(&Context->ListEntry);
403 ExReleaseFastMutex(&Ccb->DataListLock);
404 Status = STATUS_CANCELLED;
405 goto done;
406 }
407 (void)IoSetCancelRoutine(Irp, NpfsReadWriteCancelRoutine);
408 IoReleaseCancelSpinLock(oldIrql);
409 ExReleaseFastMutex(&Ccb->DataListLock);
410 IoMarkIrpPending(Irp);
411 Status = STATUS_PENDING;
412 goto done;
413 }
414 }
415 }
416
417 while (1)
418 {
419 Buffer = MmGetSystemAddressForMdl(Irp->MdlAddress);
420 Information = Irp->IoStatus.Information;
421 Length = IoGetCurrentIrpStackLocation(Irp)->Parameters.Read.Length;
422 ASSERT (Information <= Length);
423 Buffer = (PVOID)((ULONG_PTR)Buffer + Information);
424 Length -= Information;
425 Status = STATUS_SUCCESS;
426
427 while (1)
428 {
429 if (Ccb->ReadDataAvailable == 0)
430 {
431 ULONG ConnectionSideReadMode;
432
433 if (Ccb->PipeEnd == FILE_PIPE_CLIENT_END) ConnectionSideReadMode=Ccb->Fcb->ClientReadMode;
434 else ConnectionSideReadMode = Ccb->Fcb->ServerReadMode;
435
436 if ((Ccb->PipeState == FILE_PIPE_CONNECTED_STATE) && (Ccb->OtherSide))
437 {
438 KeSetEvent(&Ccb->OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE);
439 }
440 if (Information > 0 &&
441 (ConnectionSideReadMode != FILE_PIPE_BYTE_STREAM_MODE ||
442 Ccb->PipeState != FILE_PIPE_CONNECTED_STATE))
443 {
444 break;
445 }
446 if (((Ccb->PipeState != FILE_PIPE_CONNECTED_STATE) || (!Ccb->OtherSide)) && (Ccb->ReadDataAvailable == 0))
447 {
448 DPRINT("PipeState: %x\n", Ccb->PipeState);
449 Status = STATUS_PIPE_BROKEN;
450 break;
451 }
452 ExReleaseFastMutex(&Ccb->DataListLock);
453
454 if (IoIsOperationSynchronous(Irp))
455 {
456 /* Wait for ReadEvent to become signaled */
457
458 DPRINT("Waiting for readable data (%wZ)\n", &Ccb->Fcb->PipeName);
459 Status = KeWaitForSingleObject(&Ccb->ReadEvent,
460 UserRequest,
461 Irp->RequestorMode,
462 (FileObject->Flags & FO_ALERTABLE_IO) != 0,
463 NULL);
464 DPRINT("Finished waiting (%wZ)! Status: %lx\n", &Ccb->Fcb->PipeName, Status);
465
466 ExAcquireFastMutex(&Ccb->DataListLock);
467
468 if ((Status == STATUS_USER_APC) || (Status == STATUS_KERNEL_APC) || (Status == STATUS_ALERTED))
469 {
470 Status = STATUS_CANCELLED;
471 break;
472 }
473 if (!NT_SUCCESS(Status))
474 {
475 ASSERT(FALSE);
476 }
477 }
478 else
479 {
480 Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
481
482 Context->WaitEvent = &Ccb->ReadEvent;
483 Status = NpfsAddWaitingReadWriteRequest(DeviceObject, Irp);
484
485 if (NT_SUCCESS(Status))
486 {
487 Status = STATUS_PENDING;
488 goto done;
489 }
490 ExAcquireFastMutex(&Ccb->DataListLock);
491 break;
492 }
493 }
494 ASSERT(IoGetCurrentIrpStackLocation(Irp)->FileObject != NULL);
495
496 /* If the pipe type and read mode are both byte stream */
497 if (Ccb->Fcb->PipeType == FILE_PIPE_BYTE_STREAM_TYPE)
498 {
499 DPRINT("Byte stream mode: Ccb->Data %x\n", Ccb->Data);
500 /* Byte stream mode */
501 while (Length > 0 && Ccb->ReadDataAvailable > 0)
502 {
503 CopyLength = min(Ccb->ReadDataAvailable, Length);
504 if ((ULONG_PTR)Ccb->ReadPtr + CopyLength <= (ULONG_PTR)Ccb->Data + Ccb->MaxDataLength)
505 {
506 memcpy(Buffer, Ccb->ReadPtr, CopyLength);
507 Ccb->ReadPtr = (PVOID)((ULONG_PTR)Ccb->ReadPtr + CopyLength);
508 if (Ccb->ReadPtr == (PVOID)((ULONG_PTR)Ccb->Data + Ccb->MaxDataLength))
509 {
510 Ccb->ReadPtr = Ccb->Data;
511 }
512 }
513 else
514 {
515 TempLength = (ULONG)((ULONG_PTR)Ccb->Data + Ccb->MaxDataLength - (ULONG_PTR)Ccb->ReadPtr);
516 memcpy(Buffer, Ccb->ReadPtr, TempLength);
517 memcpy((PVOID)((ULONG_PTR)Buffer + TempLength), Ccb->Data, CopyLength - TempLength);
518 Ccb->ReadPtr = (PVOID)((ULONG_PTR)Ccb->Data + CopyLength - TempLength);
519 }
520
521 Buffer = (PVOID)((ULONG_PTR)Buffer + CopyLength);
522 Length -= CopyLength;
523 Information += CopyLength;
524
525 Ccb->ReadDataAvailable -= CopyLength;
526 Ccb->WriteQuotaAvailable += CopyLength;
527 }
528
529 if ((Length == 0) || (Ccb->ReadDataAvailable == 0))
530 {
531 if ((Ccb->PipeState == FILE_PIPE_CONNECTED_STATE) && (Ccb->OtherSide))
532 {
533 KeSetEvent(&Ccb->OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE);
534 }
535 KeResetEvent(&Ccb->ReadEvent);
536 break;
537 }
538 }
539 else if (Ccb->Fcb->PipeType == FILE_PIPE_MESSAGE_TYPE)
540 {
541 DPRINT("Message mode: Ccb>Data %x\n", Ccb->Data);
542
543 /* Check if buffer is full and the read pointer is not at the start of the buffer */
544 if ((Ccb->WriteQuotaAvailable == 0) && (Ccb->ReadPtr > Ccb->Data))
545 {
546 Ccb->WriteQuotaAvailable += (ULONG_PTR)Ccb->ReadPtr - (ULONG_PTR)Ccb->Data;
547 memcpy(Ccb->Data, Ccb->ReadPtr, (ULONG_PTR)Ccb->WritePtr - (ULONG_PTR)Ccb->ReadPtr);
548 Ccb->WritePtr = (PVOID)((ULONG_PTR)Ccb->WritePtr - ((ULONG_PTR)Ccb->ReadPtr - (ULONG_PTR)Ccb->Data));
549 Ccb->ReadPtr = Ccb->Data;
550 ASSERT((ULONG_PTR)Ccb->WritePtr < ((ULONG_PTR)Ccb->Data + Ccb->MaxDataLength));
551 ASSERT(Ccb->WritePtr >= Ccb->Data);
552 }
553
554 /* For Message mode, the Message length is stored in the buffer preceeding the Message. */
555 if (Ccb->ReadDataAvailable)
556 {
557 ULONG NextMessageLength = 0;
558
559 /*First get the size of the message */
560 memcpy(&NextMessageLength, Ccb->ReadPtr, sizeof(NextMessageLength));
561
562 if ((NextMessageLength == 0) || (NextMessageLength > Ccb->ReadDataAvailable))
563 {
564 DPRINT1("Possible memory corruption.\n");
565 HexDump(Ccb->Data, (ULONG_PTR)Ccb->WritePtr - (ULONG_PTR)Ccb->Data);
566 ASSERT(FALSE);
567 }
568
569 /* Use the smaller value */
570 CopyLength = min(NextMessageLength, Length);
571 ASSERT(CopyLength > 0);
572 ASSERT(CopyLength <= Ccb->ReadDataAvailable);
573 /* retrieve the message from the buffer */
574 memcpy(Buffer, (PVOID)((ULONG_PTR)Ccb->ReadPtr + sizeof(NextMessageLength)), CopyLength);
575
576 if (Ccb->ReadDataAvailable > CopyLength)
577 {
578 if (CopyLength < NextMessageLength)
579 /* Client only requested part of the message */
580 {
581 /* Calculate the remaining message new size */
582 ULONG NewMessageSize = NextMessageLength-CopyLength;
583
584 /* Update ReadPtr to point to new Message size location */
585 Ccb->ReadPtr = (PVOID)((ULONG_PTR)Ccb->ReadPtr + CopyLength);
586
587 /* Write a new Message size to buffer for the part of the message still there */
588 memcpy(Ccb->ReadPtr, &NewMessageSize, sizeof(NewMessageSize));
589 }
590 else
591 /* Client wanted the entire message */
592 {
593 /* Update ReadPtr to point to next message size */
594 Ccb->ReadPtr = (PVOID)((ULONG_PTR)Ccb->ReadPtr + CopyLength + sizeof(CopyLength));
595 }
596 }
597 else
598 {
599 /* This was the last Message, so just zero start of buffer for safety sake */
600 memset(Ccb->Data, 0, NextMessageLength + sizeof(NextMessageLength));
601
602 /* Reset to MaxDataLength as partial message retrievals dont
603 give the length back to Quota */
604 Ccb->WriteQuotaAvailable = Ccb->MaxDataLength;
605
606 /* reset read and write pointer to beginning of buffer */
607 Ccb->WritePtr = Ccb->Data;
608 Ccb->ReadPtr = Ccb->Data;
609 }
610 #ifndef NDEBUG
611 DPRINT("Length %d Buffer %x\n",CopyLength,Buffer);
612 HexDump((PUCHAR)Buffer, CopyLength);
613 #endif
614
615 Information += CopyLength;
616
617 Ccb->ReadDataAvailable -= CopyLength;
618
619 if ((ULONG)Ccb->WriteQuotaAvailable > (ULONG)Ccb->MaxDataLength) ASSERT(FALSE);
620 }
621
622 if (Information > 0)
623 {
624 ULONG ConnectionSideReadMode;
625
626 if (Ccb->PipeEnd == FILE_PIPE_CLIENT_END) ConnectionSideReadMode=Ccb->Fcb->ClientReadMode;
627 else ConnectionSideReadMode = Ccb->Fcb->ServerReadMode;
628
629 if ((ConnectionSideReadMode == FILE_PIPE_BYTE_STREAM_MODE) && (Ccb->ReadDataAvailable) && (Length > CopyLength))
630 {
631 Buffer = (PVOID)((ULONG_PTR)Buffer + CopyLength);
632 Length -= CopyLength;
633 }
634 else
635 {
636 KeResetEvent(&Ccb->ReadEvent);
637
638 if ((Ccb->PipeState == FILE_PIPE_CONNECTED_STATE) && (Ccb->WriteQuotaAvailable > 0) && (Ccb->OtherSide))
639 {
640 KeSetEvent(&Ccb->OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE);
641 }
642 break;
643 }
644 }
645 }
646 else
647 {
648 DPRINT1("Unhandled Pipe Mode!\n");
649 ASSERT(FALSE);
650 }
651 }
652 Irp->IoStatus.Information = Information;
653 Irp->IoStatus.Status = Status;
654
655 ASSERT(IoGetCurrentIrpStackLocation(Irp)->FileObject != NULL);
656
657 if (IoIsOperationSynchronous(Irp))
658 {
659 RemoveEntryList(&Context->ListEntry);
660 if (!IsListEmpty(&Ccb->ReadRequestListHead))
661 {
662 Context = CONTAINING_RECORD(Ccb->ReadRequestListHead.Flink, NPFS_CONTEXT, ListEntry);
663 KeSetEvent(Context->WaitEvent, IO_NO_INCREMENT, FALSE);
664 }
665 ExReleaseFastMutex(&Ccb->DataListLock);
666 IoCompleteRequest(Irp, IO_NO_INCREMENT);
667
668 DPRINT("NpfsRead done (Status %lx)\n", Status);
669 return Status;
670 }
671 else
672 {
673 KIRQL oldIrql;
674
675 if (IsOriginalRequest)
676 {
677 IsOriginalRequest = FALSE;
678 OriginalStatus = Status;
679 }
680 if (Status == STATUS_PENDING)
681 {
682 ExReleaseFastMutex(&Ccb->DataListLock);
683 DPRINT("NpfsRead done (Status %lx)\n", OriginalStatus);
684 return OriginalStatus;
685 }
686 RemoveEntryList(&Context->ListEntry);
687 IoCompleteRequest(Irp, IO_NO_INCREMENT);
688 if (IsListEmpty(&Ccb->ReadRequestListHead))
689 {
690 ExReleaseFastMutex(&Ccb->DataListLock);
691 DPRINT("NpfsRead done (Status %lx)\n", OriginalStatus);
692 return OriginalStatus;
693 }
694
695 IoAcquireCancelSpinLock(&oldIrql);
696 Context = CONTAINING_RECORD(Ccb->ReadRequestListHead.Flink, NPFS_CONTEXT, ListEntry);
697
698 Irp = CONTAINING_RECORD(Context, IRP, Tail.Overlay.DriverContext);
699 /* Verify the Irp wasnt cancelled */
700 if (Irp->Cancel)
701 {
702 IoReleaseCancelSpinLock(oldIrql);
703 RemoveEntryList(&Context->ListEntry);
704 ExReleaseFastMutex(&Ccb->DataListLock);
705 Status = STATUS_CANCELLED;
706 goto done;
707 }
708 /* The Irp will now be handled, so remove the CancelRoutine */
709 (void)IoSetCancelRoutine(Irp, NULL);
710 IoReleaseCancelSpinLock(oldIrql);
711 }
712 }
713
714 done:
715 Irp->IoStatus.Status = Status;
716
717 if (Status != STATUS_PENDING)
718 {
719 IoCompleteRequest(Irp, IO_NO_INCREMENT);
720 }
721 DPRINT("NpfsRead done (Status %lx)\n", Status);
722
723 return Status;
724 }
725
726 NTSTATUS NTAPI
727 NpfsWrite(PDEVICE_OBJECT DeviceObject,
728 PIRP Irp)
729 {
730 PIO_STACK_LOCATION IoStack;
731 PFILE_OBJECT FileObject;
732 PNPFS_FCB Fcb = NULL;
733 PNPFS_CCB Ccb = NULL;
734 PNPFS_CCB ReaderCcb;
735 PUCHAR Buffer;
736 NTSTATUS Status = STATUS_SUCCESS;
737 ULONG Length;
738 ULONG Offset;
739 ULONG Information = 0;
740 ULONG CopyLength;
741 ULONG TempLength;
742
743 DPRINT("NpfsWrite()\n");
744
745 IoStack = IoGetCurrentIrpStackLocation(Irp);
746 FileObject = IoStack->FileObject;
747 DPRINT("FileObject %p\n", FileObject);
748 DPRINT("Pipe name %wZ\n", &FileObject->FileName);
749
750 Ccb = FileObject->FsContext2;
751
752 /* Fail, if the CCB is not a pipe CCB */
753 if (Ccb->Type != CCB_PIPE)
754 {
755 DPRINT("Not a pipe!\n");
756 Status = STATUS_INVALID_PARAMETER;
757 Length = 0;
758 goto done;
759 }
760
761 ReaderCcb = Ccb->OtherSide;
762 Fcb = Ccb->Fcb;
763
764 Length = IoStack->Parameters.Write.Length;
765 Offset = IoStack->Parameters.Write.ByteOffset.u.LowPart;
766
767 if (Irp->MdlAddress == NULL)
768 {
769 DPRINT("Irp->MdlAddress == NULL\n");
770 Status = STATUS_UNSUCCESSFUL;
771 Length = 0;
772 goto done;
773 }
774
775 if ((ReaderCcb == NULL) || (Ccb->PipeState != FILE_PIPE_CONNECTED_STATE))
776 {
777 DPRINT("Pipe is NOT connected!\n");
778 if (Ccb->PipeState == FILE_PIPE_LISTENING_STATE)
779 Status = STATUS_PIPE_LISTENING;
780 else if (Ccb->PipeState == FILE_PIPE_DISCONNECTED_STATE)
781 Status = STATUS_PIPE_DISCONNECTED;
782 else
783 Status = STATUS_UNSUCCESSFUL;
784 Length = 0;
785 goto done;
786 }
787
788 if (ReaderCcb->Data == NULL)
789 {
790 DPRINT("Pipe is NOT writable!\n");
791 Status = STATUS_UNSUCCESSFUL;
792 Length = 0;
793 goto done;
794 }
795
796 Status = STATUS_SUCCESS;
797 Buffer = MmGetSystemAddressForMdlSafe (Irp->MdlAddress, NormalPagePriority);
798
799 if (!Buffer)
800 {
801 DPRINT("MmGetSystemAddressForMdlSafe failed\n");
802 Status = STATUS_INSUFFICIENT_RESOURCES;
803 Length = 0;
804 goto done;
805
806 }
807
808 ExAcquireFastMutex(&ReaderCcb->DataListLock);
809
810 DPRINT("Length %d Buffer %x Offset %x\n",Length,Buffer,Offset);
811
812 #ifndef NDEBUG
813 HexDump(Buffer, Length);
814 #endif
815
816 while(1)
817 {
818 if (ReaderCcb->WriteQuotaAvailable == 0)
819 {
820 if (Ccb->PipeState != FILE_PIPE_CONNECTED_STATE || !Ccb->OtherSide)
821 {
822 Status = STATUS_PIPE_BROKEN;
823 ExReleaseFastMutex(&ReaderCcb->DataListLock);
824 goto done;
825 }
826 KeSetEvent(&ReaderCcb->ReadEvent, IO_NO_INCREMENT, FALSE);
827 ExReleaseFastMutex(&ReaderCcb->DataListLock);
828
829 DPRINT("Write Waiting for buffer space (%wZ)\n", &Fcb->PipeName);
830 Status = KeWaitForSingleObject(&Ccb->WriteEvent,
831 UserRequest,
832 Irp->RequestorMode,
833 (FileObject->Flags & FO_ALERTABLE_IO) != 0,
834 NULL);
835 DPRINT("Write Finished waiting (%wZ)! Status: %lx\n", &Fcb->PipeName, Status);
836
837 if ((Status == STATUS_USER_APC) || (Status == STATUS_KERNEL_APC) || (Status == STATUS_ALERTED))
838 {
839 Status = STATUS_CANCELLED;
840 goto done;
841 }
842 if (!NT_SUCCESS(Status))
843 {
844 ASSERT(FALSE);
845 }
846 /*
847 * It's possible that the event was signaled because the
848 * other side of pipe was closed.
849 */
850 if (Ccb->PipeState != FILE_PIPE_CONNECTED_STATE || !Ccb->OtherSide)
851 {
852 DPRINT("PipeState: %x\n", Ccb->PipeState);
853 Status = STATUS_PIPE_BROKEN;
854 goto done;
855 }
856 /* Check that the pipe has not been closed */
857 if (ReaderCcb->PipeState != FILE_PIPE_CONNECTED_STATE || !ReaderCcb->OtherSide)
858 {
859 Status = STATUS_PIPE_BROKEN;
860 goto done;
861 }
862 ExAcquireFastMutex(&ReaderCcb->DataListLock);
863 }
864
865 if (Ccb->Fcb->PipeType == FILE_PIPE_BYTE_STREAM_TYPE)
866 {
867 DPRINT("Byte stream mode: Ccb->Data %x, Ccb->WritePtr %x\n", ReaderCcb->Data, ReaderCcb->WritePtr);
868
869 while (Length > 0 && ReaderCcb->WriteQuotaAvailable > 0)
870 {
871 CopyLength = min(Length, ReaderCcb->WriteQuotaAvailable);
872
873 if ((ULONG_PTR)ReaderCcb->WritePtr + CopyLength <= (ULONG_PTR)ReaderCcb->Data + ReaderCcb->MaxDataLength)
874 {
875 memcpy(ReaderCcb->WritePtr, Buffer, CopyLength);
876 ReaderCcb->WritePtr = (PVOID)((ULONG_PTR)ReaderCcb->WritePtr + CopyLength);
877 if ((ULONG_PTR)ReaderCcb->WritePtr == (ULONG_PTR)ReaderCcb->Data + ReaderCcb->MaxDataLength)
878 {
879 ReaderCcb->WritePtr = ReaderCcb->Data;
880 }
881 }
882 else
883 {
884
885 TempLength = (ULONG)((ULONG_PTR)ReaderCcb->Data + ReaderCcb->MaxDataLength -
886 (ULONG_PTR)ReaderCcb->WritePtr);
887
888 memcpy(ReaderCcb->WritePtr, Buffer, TempLength);
889 memcpy(ReaderCcb->Data, Buffer + TempLength, CopyLength - TempLength);
890 ReaderCcb->WritePtr = (PVOID)((ULONG_PTR)ReaderCcb->Data + CopyLength - TempLength);
891 }
892
893 Buffer += CopyLength;
894 Length -= CopyLength;
895 Information += CopyLength;
896
897 ReaderCcb->ReadDataAvailable += CopyLength;
898 ReaderCcb->WriteQuotaAvailable -= CopyLength;
899 }
900
901 if (Length == 0)
902 {
903 KeSetEvent(&ReaderCcb->ReadEvent, IO_NO_INCREMENT, FALSE);
904 KeResetEvent(&Ccb->WriteEvent);
905 break;
906 }
907 }
908 else if (Ccb->Fcb->PipeType == FILE_PIPE_MESSAGE_TYPE)
909 {
910 /* For Message Type Pipe, the Pipes memory will be used to store the size of each message */
911 DPRINT("Message mode: Ccb->Data %x, Ccb->WritePtr %x\n",ReaderCcb->Data, ReaderCcb->WritePtr);
912 if (Length > 0)
913 {
914 /* Verify the WritePtr is still inside the buffer */
915 if (((ULONG_PTR)ReaderCcb->WritePtr > ((ULONG_PTR)ReaderCcb->Data + (ULONG_PTR)ReaderCcb->MaxDataLength)) ||
916 ((ULONG_PTR)ReaderCcb->WritePtr < (ULONG_PTR)ReaderCcb->Data))
917 {
918 DPRINT1("NPFS is writing out of its buffer. Report to developer!\n");
919 DPRINT1("ReaderCcb->WritePtr %x, ReaderCcb->Data %x, ReaderCcb->MaxDataLength %lu\n",
920 ReaderCcb->WritePtr, ReaderCcb->Data, ReaderCcb->MaxDataLength);
921 ASSERT(FALSE);
922 }
923
924 CopyLength = min(Length, ReaderCcb->WriteQuotaAvailable - sizeof(ULONG));
925 if (CopyLength > ReaderCcb->WriteQuotaAvailable)
926 {
927 DPRINT1("Writing %lu byte to pipe would overflow as only %lu bytes are available\n",
928 CopyLength, ReaderCcb->WriteQuotaAvailable);
929 ASSERT(FALSE);
930 }
931
932 /* First Copy the Length of the message into the pipes buffer */
933 memcpy(ReaderCcb->WritePtr, &CopyLength, sizeof(CopyLength));
934
935 /* Now the user buffer itself */
936 memcpy((PVOID)((ULONG_PTR)ReaderCcb->WritePtr + sizeof(CopyLength)), Buffer, CopyLength);
937
938 /* Update the write pointer */
939 ReaderCcb->WritePtr = (PVOID)((ULONG_PTR)ReaderCcb->WritePtr + sizeof(CopyLength) + CopyLength);
940
941 Information += CopyLength;
942
943 ReaderCcb->ReadDataAvailable += CopyLength;
944
945 ReaderCcb->WriteQuotaAvailable -= (CopyLength + sizeof(ULONG));
946
947 if ((ULONG_PTR)ReaderCcb->WriteQuotaAvailable > (ULONG)ReaderCcb->MaxDataLength)
948 {
949 DPRINT1("QuotaAvailable is greater than buffer size!\n");
950 ASSERT(FALSE);
951 }
952 }
953
954 if (Information > 0)
955 {
956 KeSetEvent(&ReaderCcb->ReadEvent, IO_NO_INCREMENT, FALSE);
957 KeResetEvent(&Ccb->WriteEvent);
958 break;
959 }
960 }
961 else
962 {
963 DPRINT1("Unhandled Pipe Type Mode and Read Write Mode!\n");
964 ASSERT(FALSE);
965 }
966 }
967
968 ExReleaseFastMutex(&ReaderCcb->DataListLock);
969
970 done:
971 Irp->IoStatus.Status = Status;
972 Irp->IoStatus.Information = Information;
973
974 IoCompleteRequest(Irp, IO_NO_INCREMENT);
975
976 DPRINT("NpfsWrite done (Status %lx)\n", Status);
977
978 return Status;
979 }
980
981 /* EOF */