Let oskit tell us when sending should be blocked 'cause its stuffed upto its
authorGé van Geldorp <ge@gse.nl>
Wed, 4 Jan 2006 22:40:48 +0000 (22:40 +0000)
committerGé van Geldorp <ge@gse.nl>
Wed, 4 Jan 2006 22:40:48 +0000 (22:40 +0000)
nose with data waiting to be sent. Bug 1232.

svn path=/trunk/; revision=20562

reactos/drivers/lib/ip/transport/tcp/event.c
reactos/drivers/lib/ip/transport/tcp/tcp.c
reactos/drivers/lib/oskittcp/oskittcp/interface.c
reactos/drivers/lib/oskittcp/oskittcp/sleep.c
reactos/drivers/net/afd/afd/write.c
reactos/drivers/net/tcpip/include/tcp.h
reactos/drivers/net/tcpip/include/titypes.h
reactos/drivers/net/tcpip/tcpip/dispatch.c

index a30b3a0..39de114 100644 (file)
@@ -121,6 +121,11 @@ int TCPSleep( void *ClientData, void *token, int priority, char *msg,
        KeInitializeEvent( &SleepingThread->Event, NotificationEvent, FALSE );
        SleepingThread->SleepToken = token;
 
+       /* We're going to sleep and need to release the lock, otherwise
+           it's impossible to re-enter oskittcp to deliver the event that's
+           going to wake us */
+       TcpipRecursiveMutexLeave( &TCPLock );
+
        TcpipAcquireFastMutex( &SleepingThreadsLock );
        InsertTailList( &SleepingThreadsList, &SleepingThread->Entry );
        TcpipReleaseFastMutex( &SleepingThreadsLock );
@@ -136,6 +141,8 @@ int TCPSleep( void *ClientData, void *token, int priority, char *msg,
        RemoveEntryList( &SleepingThread->Entry );
        TcpipReleaseFastMutex( &SleepingThreadsLock );
 
+       TcpipRecursiveMutexEnter( &TCPLock, TRUE );
+
        PoolFreeBuffer( SleepingThread );
     }
     TI_DbgPrint(DEBUG_TCP,("Waiting finished: %x\n", token));
index 70d6d45..db3fb67 100644 (file)
@@ -153,6 +153,63 @@ static VOID HandleSignalledConnection( PCONNECTION_ENDPOINT Connection,
            }
        }
     }
+    if( NewState & SEL_WRITE ) {
+       TI_DbgPrint(DEBUG_TCP,("Writeable: irp list %s\n",
+                              IsListEmpty(&Connection->ReceiveRequest) ?
+                              "empty" : "nonempty"));
+
+       while( !IsListEmpty( &Connection->SendRequest ) ) {
+           OSK_UINT SendLen = 0, Sent = 0;
+           OSK_PCHAR SendBuffer = 0;
+
+           Entry = RemoveHeadList( &Connection->SendRequest );
+           Bucket = CONTAINING_RECORD( Entry, TDI_BUCKET, Entry );
+           Complete = Bucket->Request.RequestNotifyObject;
+
+           Irp = Bucket->Request.RequestContext;
+           Mdl = Irp->MdlAddress;
+
+           TI_DbgPrint(DEBUG_TCP,
+                       ("Getting the user buffer from %x\n", Mdl));
+
+           NdisQueryBuffer( Mdl, &SendBuffer, &SendLen );
+
+           TI_DbgPrint(DEBUG_TCP,
+                       ("Writing %d bytes to %x\n", SendLen, SendBuffer));
+
+           TI_DbgPrint(DEBUG_TCP, ("Connection: %x\n", Connection));
+           TI_DbgPrint
+               (DEBUG_TCP,
+                ("Connection->SocketContext: %x\n",
+                 Connection->SocketContext));
+
+           Status = TCPTranslateError
+               ( OskitTCPSend( Connection->SocketContext,
+                               SendBuffer,
+                               SendLen,
+                               &Sent,
+                               0 ) );
+
+           TI_DbgPrint(DEBUG_TCP,("TCP Bytes: %d\n", Sent));
+
+           if( Status == STATUS_SUCCESS ) {
+               TI_DbgPrint(DEBUG_TCP,("Sent %d bytes with status %x\n",
+                                      Sent, Status));
+
+               Complete( Bucket->Request.RequestContext,
+                         STATUS_SUCCESS, Sent );
+           } else if( Status == STATUS_PENDING ) {
+               InsertHeadList
+                   ( &Connection->SendRequest, &Bucket->Entry );
+               break;
+           } else {
+               TI_DbgPrint(DEBUG_TCP,
+                           ("Completing Send request: %x %x\n",
+                            Bucket->Request, Status));
+               Complete( Bucket->Request.RequestContext, Status, 0 );
+           }
+       }
+    }
 
     if( NewState & SEL_FIN ) {
         PLIST_ENTRY ListsToErase[4];
@@ -209,6 +266,7 @@ PCONNECTION_ENDPOINT TCPAllocateConnectionEndpoint( PVOID ClientContext ) {
     InitializeListHead(&Connection->ConnectRequest);
     InitializeListHead(&Connection->ListenRequest);
     InitializeListHead(&Connection->ReceiveRequest);
+    InitializeListHead(&Connection->SendRequest);
 
     /* Save client context pointer */
     Connection->ClientContext = ClientContext;
@@ -656,10 +714,17 @@ NTSTATUS TCPReceiveData
 NTSTATUS TCPSendData
 ( PCONNECTION_ENDPOINT Connection,
   PCHAR BufferData,
-  ULONG PacketSize,
-  PULONG DataUsed,
-  ULONG Flags) {
+  ULONG SendLength,
+  PULONG BytesSent,
+  ULONG Flags,
+  PTCP_COMPLETION_ROUTINE Complete,
+  PVOID Context ) {
+    UINT Sent = 0;
     NTSTATUS Status;
+    PTDI_BUCKET Bucket;
+
+    TI_DbgPrint(DEBUG_TCP,("Called for %d bytes (on socket %x)\n",
+                           SendLength, Connection->SocketContext));
 
     ASSERT_KM_POINTER(Connection->SocketContext);
 
@@ -669,12 +734,38 @@ NTSTATUS TCPSendData
     TI_DbgPrint(DEBUG_TCP,("Connection->SocketContext = %x\n",
                           Connection->SocketContext));
 
-    Status = OskitTCPSend( Connection->SocketContext,
-                          (OSK_PCHAR)BufferData, PacketSize,
-                          (PUINT)DataUsed, 0 );
+    Status = TCPTranslateError
+       ( OskitTCPSend( Connection->SocketContext,
+                       (OSK_PCHAR)BufferData, SendLength,
+                       &Sent, 0 ) );
+
+    TI_DbgPrint(DEBUG_TCP,("OskitTCPSend: %x, %d\n", Status, Sent));
+
+    /* Keep this request around ... there was no data yet */
+    if( Status == STATUS_PENDING ) {
+       /* Freed in TCPSocketState */
+       Bucket = ExAllocatePool( NonPagedPool, sizeof(*Bucket) );
+       if( !Bucket ) {
+           TI_DbgPrint(DEBUG_TCP,("Failed to allocate bucket\n"));
+           TcpipRecursiveMutexLeave( &TCPLock );
+           return STATUS_NO_MEMORY;
+       }
+
+       Bucket->Request.RequestNotifyObject = Complete;
+       Bucket->Request.RequestContext = Context;
+       *BytesSent = 0;
+
+       InsertHeadList( &Connection->SendRequest, &Bucket->Entry );
+       TI_DbgPrint(DEBUG_TCP,("Queued write irp\n"));
+    } else {
+       TI_DbgPrint(DEBUG_TCP,("Got status %x, bytes %d\n", Status, Sent));
+       *BytesSent = Sent;
+    }
 
     TcpipRecursiveMutexLeave( &TCPLock );
 
+    TI_DbgPrint(DEBUG_TCP,("Status %x\n", Status));
+
     return Status;
 }
 
@@ -729,14 +820,14 @@ VOID TCPRemoveIRP( PCONNECTION_ENDPOINT Endpoint, PIRP Irp ) {
     PTDI_BUCKET Bucket;
     UINT i = 0;
 
-    ListHead[0] = &Endpoint->ReceiveRequest;
-    ListHead[1] = &Endpoint->ConnectRequest;
-    ListHead[2] = &Endpoint->ListenRequest;
-    ListHead[3] = 0;
+    ListHead[0] = &Endpoint->SendRequest;
+    ListHead[1] = &Endpoint->ReceiveRequest;
+    ListHead[2] = &Endpoint->ConnectRequest;
+    ListHead[3] = &Endpoint->ListenRequest;
 
     TcpipAcquireSpinLock( &Endpoint->Lock, &OldIrql );
 
-    for( i = 0; ListHead[i]; i++ ) {
+    for( i = 0; i < sizeof( ListHead ) / sizeof( ListHead[0] ); i++ ) {
        for( Entry = ListHead[i]->Flink;
             Entry != ListHead[i];
             Entry = Entry->Flink ) {
index fd82994..14f4c85 100644 (file)
@@ -256,12 +256,26 @@ int OskitTCPClose( void *socket ) {
 
 int OskitTCPSend( void *socket, OSK_PCHAR Data, OSK_UINT Len,
                  OSK_UINT *OutLen, OSK_UINT flags ) {
-    struct mbuf* m = m_devget( Data, Len, 0, NULL, NULL );
-    int error = 0;
-       if ( !m )
-               return ENOBUFS;
-    error = sosend( socket, NULL, NULL, m, NULL, 0 );
-    *OutLen = Len;
+    int error;
+    struct uio uio;
+    struct iovec iov;
+
+    iov.iov_len = Len;
+    iov.iov_base = Data;
+    uio.uio_iov = &iov;
+    uio.uio_iovcnt = 1;
+    uio.uio_offset = 0;
+    uio.uio_resid = Len;
+    uio.uio_segflg = UIO_SYSSPACE;
+    uio.uio_rw = UIO_WRITE;
+    uio.uio_procp = NULL;
+
+    error = sosend( socket, NULL, &uio, NULL, NULL, 0 );
+    if (OSK_EWOULDBLOCK == error) {
+       ((struct socket *) socket)->so_snd.sb_flags |= SB_WAIT;
+    }
+    *OutLen = uio.uio_offset;
+
     return error;
 }
 
index 9e432d9..a00b277 100644 (file)
@@ -35,6 +35,10 @@ void wakeup( struct socket *so, void *token ) {
        OS_DbgPrint(OSK_MID_TRACE,("Socket readable\n"));
        flags |= SEL_READ;
     }
+    if( 0 < sbspace(&so->so_snd) ) {
+       OS_DbgPrint(OSK_MID_TRACE,("Socket writeable\n"));
+       flags |= SEL_WRITE;
+    }
     if( so->so_state & SS_CANTRCVMORE ) {
        OS_DbgPrint(OSK_MID_TRACE,("Socket can't be read any longer\n"));
        flags |= SEL_FIN;
index 9d0f9bf..ef4ec2a 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id$
+/*
  * COPYRIGHT:        See COPYING in the top level directory
  * PROJECT:          ReactOS kernel
  * FILE:             drivers/net/afd/afd/write.c
@@ -17,7 +17,6 @@ static NTSTATUS DDKAPI SendComplete
   PIRP Irp,
   PVOID Context ) {
     NTSTATUS Status = Irp->IoStatus.Status;
-    PIO_STACK_LOCATION IrpSp = IoGetCurrentIrpStackLocation(Irp);
     PAFD_FCB FCB = (PAFD_FCB)Context;
     PLIST_ENTRY NextIrpEntry;
     PIRP NextIrp = NULL;
@@ -26,6 +25,15 @@ static NTSTATUS DDKAPI SendComplete
     PAFD_MAPBUF Map;
     UINT TotalBytesCopied = 0, SpaceAvail, i, CopySize = 0;
 
+    /*
+     * The Irp parameter passed in is the IRP of the stream between AFD and
+     * TDI driver. It's not very usefull to us. We need the IRPs of the stream
+     * between usermode and AFD. Those are chained from
+     * FCB->PendingIrpList[FUNCTION_SEND] and you'll see them in the code
+     * below as "NextIrp" ('cause they are the next usermode IRP to be
+     * processed).
+     */
+
     AFD_DbgPrint(MID_TRACE,("Called, status %x, %d bytes used\n",
                            Irp->IoStatus.Status,
                            Irp->IoStatus.Information));
@@ -78,7 +86,7 @@ static NTSTATUS DDKAPI SendComplete
        !IsListEmpty( &FCB->PendingIrpList[FUNCTION_SEND] ) &&
        NT_SUCCESS(Status) ) {
        NextIrpEntry =
-           RemoveHeadList(&FCB->PendingIrpList[FUNCTION_RECV]);
+           RemoveHeadList(&FCB->PendingIrpList[FUNCTION_SEND]);
        NextIrp =
            CONTAINING_RECORD(NextIrpEntry, IRP, Tail.Overlay.ListEntry);
        NextIrpSp = IoGetCurrentIrpStackLocation( NextIrp );
@@ -89,8 +97,7 @@ static NTSTATUS DDKAPI SendComplete
 
        SpaceAvail = FCB->Send.Size - FCB->Send.BytesUsed;
 
-       for( i = 0; FCB->Send.BytesUsed < FCB->Send.Content &&
-                i < SendReq->BufferCount; i++ ) {
+       for( i = 0; i < SendReq->BufferCount; i++ ) {
            Map[i].BufferAddress =
                MmMapLockedPages( Map[i].Mdl, KernelMode );
 
@@ -101,7 +108,7 @@ static NTSTATUS DDKAPI SendComplete
                           Map[i].BufferAddress,
                           CopySize );
 
-           MmUnmapLockedPages( Map[i].Mdl, KernelMode );
+           MmUnmapLockedPages( Map[i].BufferAddress, Map[i].Mdl );
 
            FCB->Send.BytesUsed += CopySize;
            TotalBytesCopied += CopySize;
@@ -116,7 +123,7 @@ static NTSTATUS DDKAPI SendComplete
        SocketCalloutEnter( FCB );
 
        Status = TdiSend( &FCB->SendIrp.InFlightRequest,
-                         IrpSp->FileObject,
+                         FCB->Connection.Object,
                          0,
                          FCB->Send.Window,
                          FCB->Send.BytesUsed,
@@ -138,13 +145,13 @@ static NTSTATUS DDKAPI SendComplete
 
        AFD_DbgPrint(MID_TRACE,("Dismissing request: %x\n", Status));
 
-       return UnlockAndMaybeComplete( FCB, Status, Irp, TotalBytesCopied,
+       return UnlockAndMaybeComplete( FCB, Status, NextIrp, TotalBytesCopied,
                                       NULL, TRUE );
     } else if( NextIrp ) {
        AFD_DbgPrint(MID_TRACE,("Could not do any more with Irp %x\n",
                                NextIrp));
        InsertHeadList( &FCB->PendingIrpList[FUNCTION_SEND],
-                       &Irp->Tail.Overlay.ListEntry );
+                       &NextIrp->Tail.Overlay.ListEntry );
     }
 
     SocketStateUnlock( FCB );
index b2c2efd..b160b66 100644 (file)
@@ -150,7 +150,9 @@ NTSTATUS TCPSendData(
   PCHAR Buffer,
   ULONG DataSize,
   PULONG DataUsed,
-  ULONG Flags);
+  ULONG Flags,
+  PTCP_COMPLETION_ROUTINE Complete,
+  PVOID Context);
 
 NTSTATUS TCPClose( PCONNECTION_ENDPOINT Connection );
 
index 52f0a5f..50f3539 100644 (file)
@@ -302,6 +302,7 @@ typedef struct _CONNECTION_ENDPOINT {
     LIST_ENTRY ConnectRequest; /* Queued connect rqueusts */
     LIST_ENTRY ListenRequest;  /* Queued listen requests */
     LIST_ENTRY ReceiveRequest; /* Queued receive requests */
+    LIST_ENTRY SendRequest;    /* Queued send requests */
 
     /* Signals */
     LIST_ENTRY SignalList;     /* Entry in the list of sockets waiting for
index 88b1ab6..4b8aa87 100644 (file)
@@ -898,15 +898,15 @@ NTSTATUS DispTdiSend(
  */
 {
   PIO_STACK_LOCATION IrpSp;
-  PTDI_REQUEST_KERNEL_RECEIVE ReceiveInfo;
+  PTDI_REQUEST_KERNEL_SEND SendInfo;
   PTRANSPORT_CONTEXT TranContext;
   NTSTATUS Status;
-  ULONG BytesReceived;
+  ULONG BytesSent;
 
   TI_DbgPrint(DEBUG_IRP, ("Called.\n"));
 
   IrpSp = IoGetCurrentIrpStackLocation(Irp);
-  ReceiveInfo = (PTDI_REQUEST_KERNEL_RECEIVE)&(IrpSp->Parameters);
+  SendInfo = (PTDI_REQUEST_KERNEL_SEND)&(IrpSp->Parameters);
 
   TranContext = IrpSp->FileObject->FsContext;
   if (TranContext == NULL)
@@ -938,12 +938,14 @@ NTSTATUS DispTdiSend(
        Status = TCPSendData(
            TranContext->Handle.ConnectionContext,
            Data,
-           ReceiveInfo->ReceiveLength,
-           &BytesReceived,
-           ReceiveInfo->ReceiveFlags);
+           SendInfo->SendLength,
+           &BytesSent,
+           SendInfo->SendFlags,
+           DispDataRequestComplete,
+           Irp);
        if (Status != STATUS_PENDING)
        {
-           DispDataRequestComplete(Irp, Status, BytesReceived);
+           DispDataRequestComplete(Irp, Status, BytesSent);
        } else
            IoMarkIrpPending( Irp );
     }