- Guarded the calls to IoSetCancelRoutine with IoAcquireCancelSpinLock/IoReleaseCance...
[reactos.git] / reactos / drivers / fs / np / rw.c
1 /* $Id$
2 *
3 * COPYRIGHT: See COPYING in the top level directory
4 * PROJECT: ReactOS kernel
5 * FILE: drivers/fs/np/rw.c
6 * PURPOSE: Named pipe filesystem
7 * PROGRAMMER: David Welch <welch@cwcom.net>
8 */
9
10 /* INCLUDES ******************************************************************/
11
12 #include <ddk/ntddk.h>
13 #include <rosrtl/minmax.h>
14 #include "npfs.h"
15
16 #define NDEBUG
17 #include <debug.h>
18
19 /* FUNCTIONS *****************************************************************/
20
21 #ifndef NDEBUG
22 VOID HexDump(PUCHAR Buffer, ULONG Length)
23 {
24 CHAR Line[65];
25 UCHAR ch;
26 const char Hex[] = "0123456789ABCDEF";
27 int i, j;
28
29 DbgPrint("---------------\n");
30
31 for (i = 0; i < ROUND_UP(Length, 16); i+= 16)
32 {
33 memset(Line, ' ', 64);
34 Line[64] = 0;
35
36 for (j = 0; j < 16 && j + i < Length; j++)
37 {
38 ch = Buffer[i + j];
39 Line[3*j + 0] = Hex[ch >> 4];
40 Line[3*j + 1] = Hex[ch & 0x0f];
41 Line[48 + j] = isprint(ch) ? ch : '.';
42 }
43 DbgPrint("%s\n", Line);
44 }
45 DbgPrint("---------------\n");
46 }
47 #endif
48
49 static NTSTATUS
50 NpfsReadFromPipe(PNPFS_CONTEXT Context);
51
52 static VOID STDCALL
53 NpfsWaitingCancelRoutine(IN PDEVICE_OBJECT DeviceObject,
54 IN PIRP Irp)
55 {
56 PNPFS_CONTEXT Context;
57 PNPFS_DEVICE_EXTENSION DeviceExt;
58
59 DPRINT1("NpfsWaitingCancelRoutine() called\n");
60
61 IoReleaseCancelSpinLock(Irp->CancelIrql);
62
63 Context = Irp->Tail.Overlay.DriverContext[0];
64 DeviceExt = Context->DeviceObject->DeviceExtension;
65
66 KeLockMutex(&DeviceExt->PipeListLock);
67 KeSetEvent(&Context->Fcb->Event, IO_NO_INCREMENT, FALSE);
68 KeUnlockMutex(&DeviceExt->PipeListLock);
69 }
70
71 static VOID STDCALL
72 NpfsWaiterThread(PVOID Context)
73 {
74 PNPFS_THREAD_CONTEXT ThreadContext = (PNPFS_THREAD_CONTEXT) Context;
75 ULONG CurrentCount, Count = 0;
76 PNPFS_CONTEXT WaitContext = NULL;
77 NTSTATUS Status;
78 BOOLEAN Terminate = FALSE;
79 BOOLEAN Cancel = FALSE;
80 KIRQL oldIrql;
81
82 KeLockMutex(&ThreadContext->DeviceExt->PipeListLock);
83
84 while (1)
85 {
86 CurrentCount = ThreadContext->Count;
87 KeResetEvent(&ThreadContext->Event);
88 KeUnlockMutex(&ThreadContext->DeviceExt->PipeListLock);
89 if (WaitContext)
90 {
91 if (Cancel)
92 {
93 WaitContext->Irp->IoStatus.Status = STATUS_CANCELLED;
94 WaitContext->Irp->IoStatus.Information = 0;
95 IoCompleteRequest(WaitContext->Irp, IO_NO_INCREMENT);
96 ExFreePool(WaitContext);
97 }
98 else
99 {
100 switch (WaitContext->MajorFunction)
101 {
102 case IRP_MJ_READ:
103 NpfsReadFromPipe(WaitContext);
104 break;
105 default:
106 KEBUGCHECK(0);
107 }
108 }
109 }
110 if (Terminate)
111 {
112 break;
113 }
114 Status = KeWaitForMultipleObjects(CurrentCount,
115 ThreadContext->WaitObjectArray,
116 WaitAny,
117 Executive,
118 KernelMode,
119 FALSE,
120 NULL,
121 ThreadContext->WaitBlockArray);
122 KeLockMutex(&ThreadContext->DeviceExt->PipeListLock);
123 if (!NT_SUCCESS(Status))
124 {
125 KEBUGCHECK(0);
126 }
127 Count = Status - STATUS_SUCCESS;
128 ASSERT (Count <= CurrentCount);
129 if (Count > 0)
130 {
131 WaitContext = ThreadContext->WaitContextArray[Count];
132 ThreadContext->Count--;
133 ThreadContext->DeviceExt->EmptyWaiterCount++;
134 ThreadContext->WaitObjectArray[Count] = ThreadContext->WaitObjectArray[ThreadContext->Count];
135 ThreadContext->WaitContextArray[Count] = ThreadContext->WaitContextArray[ThreadContext->Count];
136 IoAcquireCancelSpinLock(&oldIrql);
137 Cancel = NULL == IoSetCancelRoutine(WaitContext->Irp, NULL);
138 IoReleaseCancelSpinLock(oldIrql);
139 }
140 else
141 {
142 /* someone has add a new wait request */
143 WaitContext = NULL;
144 }
145 if (ThreadContext->Count == 1 && ThreadContext->DeviceExt->EmptyWaiterCount >= MAXIMUM_WAIT_OBJECTS)
146 {
147 /* it exist an other thread with empty wait slots, we can remove our thread from the list */
148 RemoveEntryList(&ThreadContext->ListEntry);
149 ThreadContext->DeviceExt->EmptyWaiterCount -= MAXIMUM_WAIT_OBJECTS - 1;
150 Terminate = TRUE;
151 }
152 }
153 KeUnlockMutex(&ThreadContext->DeviceExt->PipeListLock);
154 ExFreePool(ThreadContext);
155 }
156
157 static NTSTATUS
158 NpfsAddWaitingReader(PNPFS_DEVICE_EXTENSION DeviceExt, PNPFS_CONTEXT Context, PNPFS_FCB Fcb)
159 {
160 PLIST_ENTRY ListEntry;
161 PNPFS_THREAD_CONTEXT ThreadContext;
162 NTSTATUS Status;
163 HANDLE hThread;
164 KIRQL oldIrql;
165
166 KeLockMutex(&DeviceExt->PipeListLock);
167
168 ListEntry = DeviceExt->ThreadListHead.Flink;
169 while (ListEntry != &DeviceExt->ThreadListHead)
170 {
171 ThreadContext = CONTAINING_RECORD(ListEntry, NPFS_THREAD_CONTEXT, ListEntry);
172 if (ThreadContext->Count < MAXIMUM_WAIT_OBJECTS)
173 {
174 break;
175 }
176 ListEntry = ListEntry->Flink;
177 }
178 if (ListEntry == &DeviceExt->ThreadListHead)
179 {
180 ThreadContext = ExAllocatePool(NonPagedPool, sizeof(NPFS_THREAD_CONTEXT));
181 if (ThreadContext == NULL)
182 {
183 KeUnlockMutex(&DeviceExt->PipeListLock);
184 return STATUS_NO_MEMORY;
185 }
186 ThreadContext->DeviceExt = DeviceExt;
187 KeInitializeEvent(&ThreadContext->Event, NotificationEvent, FALSE);
188 ThreadContext->Count = 1;
189 ThreadContext->WaitObjectArray[0] = &ThreadContext->Event;
190
191
192 DPRINT("Creating a new system thread for waiting read requests\n");
193
194 Status = PsCreateSystemThread(&hThread,
195 THREAD_ALL_ACCESS,
196 NULL,
197 NULL,
198 NULL,
199 NpfsWaiterThread,
200 (PVOID)ThreadContext);
201 if (!NT_SUCCESS(Status))
202 {
203 ExFreePool(ThreadContext);
204 KeUnlockMutex(&DeviceExt->PipeListLock);
205 return Status;
206 }
207 InsertHeadList(&DeviceExt->ThreadListHead, &ThreadContext->ListEntry);
208 DeviceExt->EmptyWaiterCount += MAXIMUM_WAIT_OBJECTS - 1;
209 }
210 IoMarkIrpPending(Context->Irp);
211 Context->Irp->Tail.Overlay.DriverContext[0] = Context;
212
213 IoAcquireCancelSpinLock(&oldIrql);
214 if (Context->Irp->Cancel)
215 {
216 IoReleaseCancelSpinLock(oldIrql);
217 Status = STATUS_CANCELLED;
218 }
219 else
220 {
221 IoSetCancelRoutine(Context->Irp, NpfsWaitingCancelRoutine);
222 IoReleaseCancelSpinLock(oldIrql);
223 ThreadContext->WaitObjectArray[ThreadContext->Count] = &Fcb->Event;
224 ThreadContext->WaitContextArray[ThreadContext->Count] = Context;
225 ThreadContext->Count++;
226 DeviceExt->EmptyWaiterCount--;
227 KeSetEvent(&ThreadContext->Event, IO_NO_INCREMENT, FALSE);
228 Status = STATUS_SUCCESS;
229 }
230 KeUnlockMutex(&DeviceExt->PipeListLock);
231 return Status;
232 }
233
234 static NTSTATUS
235 NpfsReadFromPipe(PNPFS_CONTEXT Context)
236 {
237 PIO_STACK_LOCATION IoStack;
238 PFILE_OBJECT FileObject;
239 NTSTATUS Status;
240 ULONG Information;
241 PNPFS_FCB Fcb;
242 PNPFS_FCB WriterFcb;
243 PNPFS_PIPE Pipe;
244 ULONG Length;
245 PVOID Buffer;
246 ULONG CopyLength;
247 ULONG TempLength;
248
249 DPRINT("NpfsReadFromPipe(Context %p)\n", Context);
250
251 IoStack = IoGetCurrentIrpStackLocation(Context->Irp);
252 FileObject = IoStack->FileObject;
253 Fcb = FileObject->FsContext;
254 Pipe = Fcb->Pipe;
255 WriterFcb = Fcb->OtherSide;
256
257 if (Fcb->Data == NULL)
258 {
259 DPRINT("Pipe is NOT readable!\n");
260 Status = STATUS_UNSUCCESSFUL;
261 Information = 0;
262 goto done;
263 }
264
265 Status = STATUS_SUCCESS;
266 Length = IoStack->Parameters.Read.Length;
267 Information = 0;
268
269 Buffer = MmGetSystemAddressForMdl(Context->Irp->MdlAddress);
270 ExAcquireFastMutex(&Fcb->DataListLock);
271 while (1)
272 {
273 if (Fcb->ReadDataAvailable == 0)
274 {
275 if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE)
276 {
277 KeSetEvent(&WriterFcb->Event, IO_NO_INCREMENT, FALSE);
278 }
279 ExReleaseFastMutex(&Fcb->DataListLock);
280 if (Information > 0)
281 {
282 Status = STATUS_SUCCESS;
283 goto done;
284 }
285
286 if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE &&
287 !(Fcb->PipeState == FILE_PIPE_LISTENING_STATE && Fcb->PipeEnd == FILE_PIPE_SERVER_END))
288 {
289 DPRINT("PipeState: %x\n", Fcb->PipeState);
290 Status = STATUS_PIPE_BROKEN;
291 goto done;
292 }
293
294 if (IoIsOperationSynchronous(Context->Irp))
295 {
296 /* Wait for ReadEvent to become signaled */
297 DPRINT("Waiting for readable data (%S)\n", Pipe->PipeName.Buffer);
298 Status = KeWaitForSingleObject(&Fcb->Event,
299 UserRequest,
300 KernelMode,
301 FALSE,
302 NULL);
303 DPRINT("Finished waiting (%S)! Status: %x\n", Pipe->PipeName.Buffer, Status);
304 }
305 else
306 {
307 PNPFS_CONTEXT NewContext;
308
309 NewContext = ExAllocatePool(NonPagedPool, sizeof(NPFS_CONTEXT));
310 if (NewContext == NULL)
311 {
312 Status = STATUS_NO_MEMORY;
313 goto done;
314 }
315 memcpy(NewContext, Context, sizeof(NPFS_CONTEXT));
316 NewContext->AllocatedFromPool = TRUE;
317 NewContext->Fcb = Fcb;
318 NewContext->MajorFunction = IRP_MJ_READ;
319
320 Status = NpfsAddWaitingReader(Context->DeviceObject->DeviceExtension, NewContext, Fcb);
321
322 if (NT_SUCCESS(Status))
323 {
324 Status = STATUS_PENDING;
325 }
326 else
327 {
328 ExFreePool(NewContext);
329 }
330 goto done;
331 }
332
333 ExAcquireFastMutex(&Fcb->DataListLock);
334 }
335
336 if (Pipe->ReadMode == FILE_PIPE_BYTE_STREAM_MODE)
337 {
338 DPRINT("Byte stream mode\n");
339 /* Byte stream mode */
340 while (Length > 0 && Fcb->ReadDataAvailable > 0)
341 {
342 CopyLength = RtlRosMin(Fcb->ReadDataAvailable, Length);
343 if (Fcb->ReadPtr + CopyLength <= Fcb->Data + Fcb->MaxDataLength)
344 {
345 memcpy(Buffer, Fcb->ReadPtr, CopyLength);
346 Fcb->ReadPtr += CopyLength;
347 if (Fcb->ReadPtr == Fcb->Data + Fcb->MaxDataLength)
348 {
349 Fcb->ReadPtr = Fcb->Data;
350 }
351 }
352 else
353 {
354 TempLength = Fcb->Data + Fcb->MaxDataLength - Fcb->ReadPtr;
355 memcpy(Buffer, Fcb->ReadPtr, TempLength);
356 memcpy(Buffer + TempLength, Fcb->Data, CopyLength - TempLength);
357 Fcb->ReadPtr = Fcb->Data + CopyLength - TempLength;
358 }
359
360 Buffer += CopyLength;
361 Length -= CopyLength;
362 Information += CopyLength;
363
364 Fcb->ReadDataAvailable -= CopyLength;
365 Fcb->WriteQuotaAvailable += CopyLength;
366 }
367
368 if (Length == 0)
369 {
370 if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE)
371 {
372 KeSetEvent(&WriterFcb->Event, IO_NO_INCREMENT, FALSE);
373 }
374 KeResetEvent(&Fcb->Event);
375 break;
376 }
377 }
378 else
379 {
380 DPRINT("Message mode\n");
381
382 /* Message mode */
383 if (Fcb->ReadDataAvailable)
384 {
385 /* Truncate the message if the receive buffer is too small */
386 CopyLength = RtlRosMin(Fcb->ReadDataAvailable, Length);
387 memcpy(Buffer, Fcb->Data, CopyLength);
388
389 #ifndef NDEBUG
390 DPRINT("Length %d Buffer %x\n",CopyLength,Buffer);
391 HexDump((PUCHAR)Buffer, CopyLength);
392 #endif
393
394 Information = CopyLength;
395
396 if (Fcb->ReadDataAvailable > Length)
397 {
398 memmove(Fcb->Data, Fcb->Data + Length,
399 Fcb->ReadDataAvailable - Length);
400 Fcb->ReadDataAvailable -= Length;
401 Status = STATUS_MORE_ENTRIES;
402 }
403 else
404 {
405 KeResetEvent(&Fcb->Event);
406 if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE)
407 {
408 KeSetEvent(&WriterFcb->Event, IO_NO_INCREMENT, FALSE);
409 }
410 Fcb->ReadDataAvailable = 0;
411 Fcb->WriteQuotaAvailable = Fcb->MaxDataLength;
412 }
413 }
414
415 if (Information > 0)
416 {
417 break;
418 }
419 }
420 }
421
422 ExReleaseFastMutex(&Fcb->DataListLock);
423
424 done:
425 Context->Irp->IoStatus.Status = Status;
426 Context->Irp->IoStatus.Information = Information;
427
428 if (Status != STATUS_PENDING)
429 {
430 IoCompleteRequest(Context->Irp, IO_NO_INCREMENT);
431 }
432
433 if (Context->AllocatedFromPool)
434 {
435 ExFreePool(Context);
436 }
437 DPRINT("NpfsRead done (Status %lx)\n", Status);
438
439 return Status;
440 }
441
442 NTSTATUS STDCALL
443 NpfsRead(PDEVICE_OBJECT DeviceObject,
444 PIRP Irp)
445 {
446 NPFS_CONTEXT Context;
447
448 Context.AllocatedFromPool = FALSE;
449 Context.DeviceObject = DeviceObject;
450 Context.Irp = Irp;
451
452 if (Irp->MdlAddress == NULL)
453 {
454 DPRINT("Irp->MdlAddress == NULL\n");
455 Irp->IoStatus.Status = STATUS_UNSUCCESSFUL;
456 Irp->IoStatus.Information = 0;
457 IoCompleteRequest(Irp, IO_NO_INCREMENT);
458 return STATUS_UNSUCCESSFUL;
459 }
460
461 return NpfsReadFromPipe(&Context);
462 }
463
464 NTSTATUS STDCALL
465 NpfsWrite(PDEVICE_OBJECT DeviceObject,
466 PIRP Irp)
467 {
468 PIO_STACK_LOCATION IoStack;
469 PFILE_OBJECT FileObject;
470 PNPFS_FCB Fcb = NULL;
471 PNPFS_FCB ReaderFcb;
472 PNPFS_PIPE Pipe = NULL;
473 PUCHAR Buffer;
474 NTSTATUS Status = STATUS_SUCCESS;
475 ULONG Length;
476 ULONG Offset;
477 ULONG Information;
478 ULONG CopyLength;
479 ULONG TempLength;
480
481 DPRINT("NpfsWrite()\n");
482
483 IoStack = IoGetCurrentIrpStackLocation(Irp);
484 FileObject = IoStack->FileObject;
485 DPRINT("FileObject %p\n", FileObject);
486 DPRINT("Pipe name %wZ\n", &FileObject->FileName);
487
488 Fcb = FileObject->FsContext;
489 ReaderFcb = Fcb->OtherSide;
490 Pipe = Fcb->Pipe;
491
492 Length = IoStack->Parameters.Write.Length;
493 Offset = IoStack->Parameters.Write.ByteOffset.u.LowPart;
494 Information = 0;
495
496 if (Irp->MdlAddress == NULL)
497 {
498 DPRINT("Irp->MdlAddress == NULL\n");
499 Status = STATUS_UNSUCCESSFUL;
500 Length = 0;
501 goto done;
502 }
503
504 if (ReaderFcb == NULL)
505 {
506 DPRINT("Pipe is NOT connected!\n");
507 if (Fcb->PipeState == FILE_PIPE_LISTENING_STATE)
508 Status = STATUS_PIPE_LISTENING;
509 else if (Fcb->PipeState == FILE_PIPE_DISCONNECTED_STATE)
510 Status = STATUS_PIPE_DISCONNECTED;
511 else
512 Status = STATUS_UNSUCCESSFUL;
513 Length = 0;
514 goto done;
515 }
516
517 if (ReaderFcb->Data == NULL)
518 {
519 DPRINT("Pipe is NOT writable!\n");
520 Status = STATUS_UNSUCCESSFUL;
521 Length = 0;
522 goto done;
523 }
524
525 Status = STATUS_SUCCESS;
526 Buffer = MmGetSystemAddressForMdl (Irp->MdlAddress);
527
528 ExAcquireFastMutex(&ReaderFcb->DataListLock);
529 #ifndef NDEBUG
530 DPRINT("Length %d Buffer %x Offset %x\n",Length,Buffer,Offset);
531 HexDump(Buffer, Length);
532 #endif
533
534 while(1)
535 {
536 if (ReaderFcb->WriteQuotaAvailable == 0)
537 {
538 KeSetEvent(&ReaderFcb->Event, IO_NO_INCREMENT, FALSE);
539 ExReleaseFastMutex(&ReaderFcb->DataListLock);
540 if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE)
541 {
542 Status = STATUS_PIPE_BROKEN;
543 goto done;
544 }
545
546 DPRINT("Waiting for buffer space (%S)\n", Pipe->PipeName.Buffer);
547 Status = KeWaitForSingleObject(&Fcb->Event,
548 UserRequest,
549 KernelMode,
550 FALSE,
551 NULL);
552 DPRINT("Finished waiting (%S)! Status: %x\n", Pipe->PipeName.Buffer, Status);
553
554 /*
555 * It's possible that the event was signaled because the
556 * other side of pipe was closed.
557 */
558 if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE)
559 {
560 DPRINT("PipeState: %x\n", Fcb->PipeState);
561 Status = STATUS_PIPE_BROKEN;
562 goto done;
563 }
564 ExAcquireFastMutex(&ReaderFcb->DataListLock);
565 }
566
567 if (Pipe->WriteMode == FILE_PIPE_BYTE_STREAM_MODE)
568 {
569 DPRINT("Byte stream mode\n");
570 while (Length > 0 && ReaderFcb->WriteQuotaAvailable > 0)
571 {
572 CopyLength = RtlRosMin(Length, ReaderFcb->WriteQuotaAvailable);
573 if (ReaderFcb->WritePtr + CopyLength <= ReaderFcb->Data + ReaderFcb->MaxDataLength)
574 {
575 memcpy(ReaderFcb->WritePtr, Buffer, CopyLength);
576 ReaderFcb->WritePtr += CopyLength;
577 if (ReaderFcb->WritePtr == ReaderFcb->Data + ReaderFcb->MaxDataLength)
578 {
579 ReaderFcb->WritePtr = ReaderFcb->Data;
580 }
581 }
582 else
583 {
584 TempLength = ReaderFcb->Data + ReaderFcb->MaxDataLength - ReaderFcb->WritePtr;
585 memcpy(ReaderFcb->WritePtr, Buffer, TempLength);
586 memcpy(ReaderFcb->Data, Buffer + TempLength, CopyLength - TempLength);
587 ReaderFcb->WritePtr = ReaderFcb->Data + CopyLength - TempLength;
588 }
589
590 Buffer += CopyLength;
591 Length -= CopyLength;
592 Information += CopyLength;
593
594 ReaderFcb->ReadDataAvailable += CopyLength;
595 ReaderFcb->WriteQuotaAvailable -= CopyLength;
596 }
597
598 if (Length == 0)
599 {
600 KeSetEvent(&ReaderFcb->Event, IO_NO_INCREMENT, FALSE);
601 KeResetEvent(&Fcb->Event);
602 break;
603 }
604 }
605 else
606 {
607 DPRINT("Message mode\n");
608 if (Length > 0)
609 {
610 CopyLength = RtlRosMin(Length, ReaderFcb->WriteQuotaAvailable);
611 memcpy(ReaderFcb->Data, Buffer, CopyLength);
612
613 Information = CopyLength;
614 ReaderFcb->ReadDataAvailable = CopyLength;
615 ReaderFcb->WriteQuotaAvailable = 0;
616 }
617
618 if (Information > 0)
619 {
620 KeSetEvent(&ReaderFcb->Event, IO_NO_INCREMENT, FALSE);
621 KeResetEvent(&Fcb->Event);
622 break;
623 }
624 }
625 }
626
627 ExReleaseFastMutex(&ReaderFcb->DataListLock);
628
629 done:
630 Irp->IoStatus.Status = Status;
631 Irp->IoStatus.Information = Information;
632
633 IoCompleteRequest(Irp, IO_NO_INCREMENT);
634
635 DPRINT("NpfsWrite done (Status %lx)\n", Status);
636
637 return Status;
638 }
639
640 /* EOF */