Java学习者论坛

 找回密码
 立即注册

QQ登录

只需一步,快速开始

手机号码,快捷登录

恭喜Java学习者论坛(https://www.javaxxz.com)已经为数万Java学习者服务超过8年了!积累会员资料超过10000G+
成为本站VIP会员,下载本站10000G+会员资源,购买链接:点击进入购买VIP会员
JAVA高级面试进阶视频教程Java架构师系统进阶VIP课程

分布式高可用全栈开发微服务教程

Go语言视频零基础入门到精通

Java架构师3期(课件+源码)

Java开发全终端实战租房项目视频教程

SpringBoot2.X入门到高级使用教程

大数据培训第六期全套视频教程

深度学习(CNN RNN GAN)算法原理

Java亿级流量电商系统视频教程

互联网架构师视频教程

年薪50万Spark2.0从入门到精通

年薪50万!人工智能学习路线教程

年薪50万!大数据从入门到精通学习路线年薪50万!机器学习入门到精通视频教程
仿小米商城类app和小程序视频教程深度学习数据分析基础到实战最新黑马javaEE2.1就业课程从 0到JVM实战高手教程 MySQL入门到精通教程
查看: 321|回复: 0

[默认分类] 深入探析c# Socket

[复制链接]
  • TA的每日心情
    开心
    2021-12-13 21:45
  • 签到天数: 15 天

    [LV.4]偶尔看看III

    发表于 2018-5-27 12:44:13 | 显示全部楼层 |阅读模式
      最近浏览了几篇有关Socket发送消息的文章,发现大家对Socket Send方法理解有所偏差,现将自己在开发过程中对Socket的领悟写出来,以供大家参考。
      (一)架构
      基于TCP协议的Socket通信,架构类似于B/S架构,一个Socket通信服务器,多个Socket通信客户端。Socket通信服务器启动时,会建立一个侦听Socket,侦听Socket将侦听到的Socket连接传给接受Socket,然后由接受Socket完成接受、发送消息,当Socket存在异常时,断开连接。在实际开发项目中,往往要求Socket通信服务器能提供高效、稳定的服务,一般会用到以下技术:双工通信、完成端口、SAEA、池、多线程、异步等。特别是池,用的比较多,池一般包括一下几种:
    1)Buffer池,用于集中管控Socket缓冲区,防止内存碎片。
    2)SAEA池,用于集中管控Socket,重复利用Socket。
    3)SQL池,用于分离网络服务层与数据访问层(SQL的执行效率远远低于网络层执行效率)。
    4)线程池,用于从线程池中调用空闲线程执行业务逻辑,进一步提高网络层运行效率。
      


      (二)Send
      主服务器接受Socket为一端口,客户端Socket为一端口,这两个端口通过TCP协议建立连接,通信基础系统负责管理此连接,它有两个功能:            
      1)发送消息            
      2)接受消息
      Socket的Send方法,并非大家想象中的从一个端口发送消息到另一个端口,它仅仅是拷贝数据到基础系统的发送缓冲区,然后由基础系统将发送缓冲区的数据到连接的另一端口。值得一说的是,这里的拷贝数据与异步发送消息的拷贝是不一样的,同步发送的拷贝,是直接拷贝数据到基础系统缓冲区,拷贝完成后返回,在拷贝的过程中,执行线程会IO等待, 此种拷贝与Socket自带的Buffer空间无关,但异步发送消息的拷贝,是将Socket自带的Buffer空间内的所有数据,拷贝到基础系统发送缓冲区,并立即返回,执行线程无需IO等待,所以异步发送在发送前必须执行SetBuffer方法,拷贝完成后,会触发你自定义回调函数ProcessSend,在ProcessSend方法中,调用SetBuffer方法,重新初始化Buffer空间。
      

      
      口说无凭,下面给个例子:
      服务器端:

    客户端:

    解释:
      
    客户端第一次发送数据:1234567890。
    客户端第一个接受数据:1234567890,该数据由服务端用Send同步方法发送返回。
    客户端第二个接受数据:1234567890,该数据由服务端用Send异步方法发送返回。
      
    以上似乎没什么异常,好,接下来,我只发送abc。
    客户端第一个接受数据:abc,理所当然,没什么问题。
    客户端第二个接受数据:abc4567890!为什么呢?应该是abc才对呀!
      
    好,现在为大家解释一下:
    异步发送是将其Buffer空间中所有数据拷贝到基础系统发送缓冲区,第一次拷贝1234567890到发送缓冲区,所以收到1234567890,第二次拷贝abc到发送缓冲区,替换了先前的123,所以收到abc4567890,大家明白的?
      
    源码:
      
      



    BufferManager
      
      
       
       using
        System.Collections.Generic;

       using
        System.Net.Sockets;


       //
        This class creates a single large buffer which can be divided up

       //
        and assigned to SocketAsyncEventArgs objects for use with each

       //
        socket I/O operation.

       //
        This enables bufffers to be easily reused and guards against

       //
        fragmenting heap memory.

       //
       

       //
        The operations exposed on the BufferManager class are not thread safe.
       

       class
        BufferManager
    {

       int
        m_numBytes;
       //
        the total number of bytes controlled by the buffer pool
       

       byte
       [] m_buffer;
       //
        the underlying byte array maintained by the Buffer Manager
       

        Stack
       <
       int
       >
        m_freeIndexPool;
       //
       

       int
        m_currentIndex;

       int
        m_bufferSize;


       public
        BufferManager(
       int
        totalBytes,
       int
        bufferSize)
    {
    m_numBytes
       =
        totalBytes;
    m_currentIndex
       =
       0
       ;
    m_bufferSize
       =
        bufferSize;
    m_freeIndexPool
       =
       new
        Stack
       <
       int
       >
       ();
    }


       //
        Allocates buffer space used by the buffer pool
       

       public
       void
        InitBuffer()
    {

       //
        create one big large buffer and divide that

       //
        out to each SocketAsyncEventArg object
       

        m_buffer
       =
       new
       byte
       [m_numBytes];
    }


       //
        Assigns a buffer from the buffer pool to the

       //
        specified SocketAsyncEventArgs object

       //
       

       //
        <returns>true if the buffer was successfully set, else false</returns>
       

       public
       bool
        SetBuffer(SocketAsyncEventArgs args)
    {


       if
        (m_freeIndexPool.Count
       >
       0
       )
    {
    args.SetBuffer(m_buffer, m_freeIndexPool.Pop(), m_bufferSize);
    }

       else
       
    {

       if
        ((m_numBytes
       -
        m_bufferSize)
       <
        m_currentIndex)
    {

       return
       false
       ;
    }
    args.SetBuffer(m_buffer, m_currentIndex, m_bufferSize);
    m_currentIndex
       +=
        m_bufferSize;
    }

       return
       true
       ;
    }


       //
        Removes the buffer from a SocketAsyncEventArg object.

       //
        This frees the buffer back to the buffer pool
       

       public
       void
        FreeBuffer(SocketAsyncEventArgs args)
    {
    m_freeIndexPool.Push(args.Offset);
    args.SetBuffer(
       null
       ,
       0
       ,
       0
       );
    }

    }

       
      

      
      
      



    SocketAsyncEventArgsPool
      
      
       
       using
        System;

       using
        System.Collections.Generic;

       using
        System.Net.Sockets;


       //
        Represents a collection of reusable SocketAsyncEventArgs objects.
       

       class
        SocketAsyncEventArgsPool
    {
    Stack
       <
       SocketAsyncEventArgs
       >
        m_pool;


       //
        Initializes the object pool to the specified size

       //
       

       //
        The "capacity" parameter is the maximum number of

       //
        SocketAsyncEventArgs objects the pool can hold
       

       public
        SocketAsyncEventArgsPool(
       int
        capacity)
    {
    m_pool
       =
       new
        Stack
       <
       SocketAsyncEventArgs
       >
       (capacity);
    }


       //
        Add a SocketAsyncEventArg instance to the pool

       //
       

       //
       The "item" parameter is the SocketAsyncEventArgs instance

       //
        to add to the pool
       

       public
       void
        Push(SocketAsyncEventArgs item)
    {

       if
        (item
       ==
       null
       ) {
       throw
       new
        ArgumentNullException(
       "
       Items added to a SocketAsyncEventArgsPool cannot be null
       "
       ); }

       lock
        (m_pool)
    {
    m_pool.Push(item);
    }
    }


       //
        Removes a SocketAsyncEventArgs instance from the pool

       //
        and returns the object removed from the pool
       

       public
        SocketAsyncEventArgs Pop()
    {

       lock
        (m_pool)
    {

       return
        m_pool.Pop();
    }
    }


       //
        The number of SocketAsyncEventArgs instances in the pool
       

       public
       int
        Count
    {

       get
        {
       return
        m_pool.Count; }
    }

    }
       
      

      
      
      


      
      using
       System;

      using
       System.Collections.Generic;

      using
       System.Linq;

      using
       System.Text;

      using
       System.Net.Sockets;


      class
       AsyncUserToken
    {

      public
       Socket Socket;
    }

      

      
      
      



    Server
      
      
       
       using
        System;

       using
        System.Threading;

       using
        System.Net.Sockets;

       using
        System.Net;

       using
        System.Text;


       //
        Implements the connection logic for the socket server.

       //
        After accepting a connection, all data read from the client

       //
        is sent back to the client. The read and echo back to the client pattern

       //
        is continued until the client disconnects.
       

       class
        Server
    {

       private
       int
        m_numConnections;
       //
        the maximum number of connections the sample is designed to handle simultaneously
       

       private
       int
        m_receiveBufferSize;
       //
        buffer size to use for each socket I/O operation
       

        BufferManager m_bufferManager;
       //
        represents a large reusable set of buffers for all socket operations
       

       const
       int
        opsToPreAlloc
       =
       2
       ;
       //
        read, write (don"t alloc buffer space for accepts)
       

        Socket listenSocket;
       //
        the socket used to listen for incoming connection requests

       //
        pool of reusable SocketAsyncEventArgs objects for write, read and accept socket operations
       

        SocketAsyncEventArgsPool m_readWritePool;

       int
        m_totalBytesRead;
       //
        counter of the total # bytes received by the server
       

       int
        m_numConnectedSockets;
       //
        the total number of clients connected to the server
       

        Semaphore m_maxNumberAcceptedClients;


       //
        Create an uninitialized server instance.

       //
        To start the server listening for connection requests

       //
        call the Init method followed by Start method

       //
       

       //
        <param name="numConnections">the maximum number of connections the sample is designed to handle simultaneously</param>

       //
        <param name="receiveBufferSize">buffer size to use for each socket I/O operation</param>
       

       public
        Server(
       int
        numConnections,
       int
        receiveBufferSize)
    {
    m_totalBytesRead
       =
       0
       ;
    m_numConnectedSockets
       =
       0
       ;
    m_numConnections
       =
        numConnections;
    m_receiveBufferSize
       =
        receiveBufferSize;

       //
        allocate buffers such that the maximum number of sockets can have one outstanding read and

       //
       write posted to the socket simultaneously
       

        m_bufferManager
       =
       new
        BufferManager(receiveBufferSize
       *
        numConnections
       *
        opsToPreAlloc,
    receiveBufferSize);

    m_readWritePool
       =
       new
        SocketAsyncEventArgsPool(numConnections);
    m_maxNumberAcceptedClients
       =
       new
        Semaphore(numConnections, numConnections);
    }


       //
        Initializes the server by preallocating reusable buffers and

       //
        context objects. These objects do not need to be preallocated

       //
        or reused, but it is done this way to illustrate how the API can

       //
        easily be used to create reusable objects to increase server performance.

       //

       public
       void
        Init()
    {

       //
        Allocates one large byte buffer which all I/O operations use a piece of. This gaurds

       //
        against memory fragmentation
       

        m_bufferManager.InitBuffer();


       //
        preallocate pool of SocketAsyncEventArgs objects
       

        SocketAsyncEventArgs readWriteEventArg;


       for
        (
       int
        i
       =
       0
       ; i
       <
        m_numConnections; i
       ++
       )
    {

       //
       Pre-allocate a set of reusable SocketAsyncEventArgs
       

        readWriteEventArg
       =
       new
        SocketAsyncEventArgs();
    readWriteEventArg.Completed
       +=
       new
        EventHandler
       <
       SocketAsyncEventArgs
       >
       (IO_Completed);
    readWriteEventArg.UserToken
       =
       new
        AsyncUserToken();


       //
        assign a byte buffer from the buffer pool to the SocketAsyncEventArg object
       

        m_bufferManager.SetBuffer(readWriteEventArg);


       //
        add SocketAsyncEventArg to the pool
       

        m_readWritePool.Push(readWriteEventArg);
    }

    }


       //
        Starts the server such that it is listening for

       //
        incoming connection requests.

       //
       

       //
        <param name="localEndPoint">The endpoint which the server will listening

       //
        for connection requests on</param>
       

       public
       void
        Start(IPEndPoint localEndPoint)
    {

       //
        create the socket which listens for incoming connections
       

        listenSocket
       =
       new
        Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
    listenSocket.Bind(localEndPoint);

       //
        start the server with a listen backlog of 100 connections
       

        listenSocket.Listen(
       100
       );


       //
        post accepts on the listening socket
       

        StartAccept(
       null
       );


       //
       Console.WriteLine("{0} connected sockets with one outstanding receive posted to each....press any key", m_outstandingReadCount);
       

        Console.WriteLine(
       "
       Press any key to terminate the server process....
       "
       );
    Console.ReadKey();
    }



       //
        Begins an operation to accept a connection request from the client

       //
       

       //
        <param name="acceptEventArg">The context object to use when issuing

       //
        the accept operation on the server"s listening socket</param>
       

       public
       void
        StartAccept(SocketAsyncEventArgs acceptEventArg)
    {

       if
        (acceptEventArg
       ==
       null
       )
    {
    acceptEventArg
       =
       new
        SocketAsyncEventArgs();
    acceptEventArg.Completed
       +=
       new
        EventHandler
       <
       SocketAsyncEventArgs
       >
       (AcceptEventArg_Completed);
    }

       else
       
    {

       //
        socket must be cleared since the context object is being reused
       

        acceptEventArg.AcceptSocket
       =
       null
       ;
    }

    m_maxNumberAcceptedClients.WaitOne();

       bool
        willRaiseEvent
       =
        listenSocket.AcceptAsync(acceptEventArg);

       if
        (
       !
       willRaiseEvent)
    {
    ProcessAccept(acceptEventArg);
    }
    }


       //
        This method is the callback method associated with Socket.AcceptAsync

       //
        operations and is invoked when an accept operation is complete

       //

       void
        AcceptEventArg_Completed(
       object
        sender, SocketAsyncEventArgs e)
    {
    ProcessAccept(e);
    }


       private
       void
        ProcessAccept(SocketAsyncEventArgs e)
    {
    Interlocked.Increment(
       ref
        m_numConnectedSockets);
    Console.WriteLine(
       "
       Client connection accepted. There are {0} clients connected to the server
       "
       ,
    m_numConnectedSockets);


       //
        Get the socket for the accepted client connection and put it into the

       //
       ReadEventArg object user token
       

        SocketAsyncEventArgs readEventArgs
       =
        m_readWritePool.Pop();
    ((AsyncUserToken)readEventArgs.UserToken).Socket
       =
        e.AcceptSocket;


       //
        As soon as the client is connected, post a receive to the connection
       

       bool
        willRaiseEvent
       =
        e.AcceptSocket.ReceiveAsync(readEventArgs);

       if
        (
       !
       willRaiseEvent)
    {
    ProcessReceive(readEventArgs);
    }


       //
        Accept the next connection request
       

        StartAccept(e);
    }


       //
        This method is called whenever a receive or send operation is completed on a socket

       //
       

       //
        <param name="e">SocketAsyncEventArg associated with the completed receive operation</param>
       

       void
        IO_Completed(
       object
        sender, SocketAsyncEventArgs e)
    {

       //
        determine which type of operation just completed and call the associated handler
       

       switch
        (e.LastOperation)
    {

       case
        SocketAsyncOperation.Receive:
    ProcessReceive(e);

       break
       ;

       case
        SocketAsyncOperation.Send:
    ProcessSend(e);

       break
       ;

       default
       :

       throw
       new
        ArgumentException(
       "
       The last operation completed on the socket was not a receive or send
       "
       );
    }

    }


       //
        This method is invoked when an asynchronous receive operation completes.

       //
        If the remote host closed the connection, then the socket is closed.

       //
        If data was received then the data is echoed back to the client.

       //

       private
       void
        ProcessReceive(SocketAsyncEventArgs e)
    {

       //
        check if the remote host closed the connection
       

        AsyncUserToken token
       =
        (AsyncUserToken)e.UserToken;

       if
        (e.BytesTransferred
       >
       0
       &&
        e.SocketError
       ==
        SocketError.Success)
    {

       //
       increment the count of the total bytes receive by the server
       

        Interlocked.Add(
       ref
        m_totalBytesRead, e.BytesTransferred);
    Console.WriteLine(
       "
       The server has read a total of {0} bytes
       "
       , m_totalBytesRead);


    Int32 BytesToProcess
       =
        e.BytesTransferred;
    Byte[] bt
       =
       new
        Byte[BytesToProcess];
    Buffer.BlockCopy(e.Buffer, e.Offset, bt,
       0
       , BytesToProcess);

       string
        strReceive
       =
        Encoding.Default.GetString(bt);


    Send(token.Socket, bt,
       0
       , bt.Length,
       1000
       );


    Thread.Sleep(
       1000
       );


       //
       echo the data received back to the client

       //
       e.SetBuffer(e.Offset, e.BytesTransferred);
       

       bool
        willRaiseEvent
       =
        token.Socket.SendAsync(e);

       if
        (
       !
       willRaiseEvent)
    {
    ProcessSend(e);
    }

    }

       else
       
    {
    CloseClientSocket(e);
    }
    }



       public
       static
       void
        Send(Socket socket,
       byte
       [] buffer,
       int
        offset,
       int
        size,
       int
        timeout)
    {
    socket.SendTimeout
       =
       0
       ;

       int
        startTickCount
       =
        Environment.TickCount;

       int
        sent
       =
       0
       ;
       //
        how many bytes is already sent
       

       do
       
    {

       if
        (Environment.TickCount
       >
        startTickCount
       +
        timeout)

       //
       throw new Exception("Timeout.");
       

       try
       
    {
    sent
       +=
        socket.Send(buffer, offset
       +
        sent, size
       -
        sent, SocketFlags.None);
    }

       catch
        (SocketException ex)
    {

       if
        (ex.SocketErrorCode
       ==
        SocketError.WouldBlock
       ||
       
    ex.SocketErrorCode
       ==
        SocketError.IOPending
       ||
       
    ex.SocketErrorCode
       ==
        SocketError.NoBufferSpaceAvailable)
    {

       //
        socket buffer is probably full, wait and try again
       

        Thread.Sleep(
       30
       );
    }

       else
       

       throw
        ex;
       //
        any serious error occurr
       

        }
    }
       while
        (sent
       <
        size);
    }




       //
        This method is invoked when an asynchronous send operation completes.

       //
        The method issues another receive on the socket to read any additional

       //
        data sent from the client

       //
       

       //
        <param name="e"></param>
       

       private
       void
        ProcessSend(SocketAsyncEventArgs e)
    {

       if
        (e.SocketError
       ==
        SocketError.Success)
    {

       //
       e.SetBuffer(e.Offset, 10);


       //
        done echoing data back to the client
       

        AsyncUserToken token
       =
        (AsyncUserToken)e.UserToken;

       //
        read the next block of data send from the client
       

       bool
        willRaiseEvent
       =
        token.Socket.ReceiveAsync(e);

       if
        (
       !
       willRaiseEvent)
    {
    ProcessReceive(e);
    }
    }

       else
       
    {
    CloseClientSocket(e);
    }
    }


       private
       void
        CloseClientSocket(SocketAsyncEventArgs e)
    {
    AsyncUserToken token
       =
        e.UserToken
       as
        AsyncUserToken;


       //
        close the socket associated with the client
       

       try
       
    {
    token.Socket.Shutdown(SocketShutdown.Send);
    }

       //
        throws if client process has already closed
       

       catch
        (Exception) { }
    token.Socket.Close();


       //
        decrement the counter keeping track of the total number of clients connected to the server
       

        Interlocked.Decrement(
       ref
        m_numConnectedSockets);
    m_maxNumberAcceptedClients.Release();
    Console.WriteLine(
       "
       A client has been disconnected from the server. There are {0} clients connected to the server
       "
       , m_numConnectedSockets);


       //
        Free the SocketAsyncEventArg so they can be reused by another client
       

        m_readWritePool.Push(e);
    }

    }
       
      

      
      
      
      



    Program
      
      
       
       using
        System;

       using
        System.Net;

       using
        System.Collections.Generic;

       using
        System.IO;


       class
        Program
    {

       static
       void
        Main(
       string
       [] args)
    {
    IPEndPoint iep
       =
       new
        IPEndPoint(IPAddress.Parse(
       "
       10.1.20.6
       "
       ),
       1333
       );

    Server objServer
       =
       new
        Server(
       1000
       ,
       10
       );
    objServer.Init();
    objServer.Start(iep);
    }
    }

       
      

      
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|手机版|Java学习者论坛 ( 声明:本站资料整理自互联网,用于Java学习者交流学习使用,对资料版权不负任何法律责任,若有侵权请及时联系客服屏蔽删除 )

    GMT+8, 2024-6-1 07:24 , Processed in 0.375563 second(s), 46 queries .

    Powered by Discuz! X3.4

    © 2001-2017 Comsenz Inc.

    快速回复 返回顶部 返回列表