您现在的位置是:首页 > 文章详情

【NIO】NIO版本鸿儒聊天室

日期:2020-07-09点击:383

# 需求

  • 基于NIO实现
  • 支持同时多个客户端接入
  • 支持客户端发送文本消息到服务器
  • 支持客户端自定义群聊名称
  • 接收到客户端发送的消息之后,服务器需要将消息转发给目前在线的所有其他客户端
  • 支持客户端退出群聊
  • 服务端停止服务后,客户端自动断开连接

# 技术介绍

  • Non-blockingI/O 编程模型
  • Channel 通道
    • ServerSocketChannel 服务端通道
    • SocketChannel 客户端通道
  • ByteBuffer NIO中使用的读写缓冲区
  • Selector 多路复用器
    • channel注册在多路复用器上,并监听相应的事件
  • 多线程
  • 线程池

# 代码

温馨提示:注意看代码注释哟~ 跟上节奏,很简单😼

  • 服务器

/**
 * 基于NIO实现的聊天室服务端
 *
 * @author futao
 * @date 2020/7/8
 */

@Slf4j
public class NioChatServer {

    /**
     * 用于处理通道上的事件的线程池(可选的)
     */

    private static final ExecutorService THREAD_POOL = Executors.newFixedThreadPool(10);

    /**
     * 启动聊天室
     */

    public void start() {
        try {
            //服务端Socket通道
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            //将通道设置成非阻塞
            serverSocketChannel.configureBlocking(false);
            //绑定主机与监听端口
            serverSocketChannel.bind(new InetSocketAddress("localhost", Constants.SERVER_PORT));

            //多路复用器
            Selector selector = Selector.open();

            //将服务端通道注册到多路复用器上,并设置监听事件接入事件
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            log.debug("{} 基于NIO的聊天室在[{}]端口启动成功 {}", StringUtils.repeat("="30), Constants.SERVER_PORT, StringUtils.repeat("="30));

            while (true) {
                // 触发了事件的通道数量,该方法会阻塞
                int eventCountTriggered = selector.select();
                if (eventCountTriggered <= 0) {
                    continue;
                }
                // 获取到所有触发的事件
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                // 遍历事件进行处理
                for (SelectionKey selectionKey : selectionKeys) {
                    // 处理事件
                    selectionKeyHandler(selectionKey, selector);
                }
                // 清除事件记录
                selectionKeys.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 事件处理器
     *
     * @param selectionKey 触发的事件信息
     * @param selector     多路复用器
     */

    private void selectionKeyHandler(SelectionKey selectionKey, Selector selector) {
        if (selectionKey.isAcceptable()) {
            //如果触发的是SocketChannel接入事件
            try {
                // ServerSocketChannel上触发的客户端SocketChannel接入
                SocketChannel socketChannel = ((ServerSocketChannel) selectionKey.channel()).accept();
                log.debug("客户端[{}]成功接入聊天服务器", socketChannel.socket().getPort());
                // 将客户端SocketChannel通道设置成非阻塞
                socketChannel.configureBlocking(false);
                // 将客户端通道注册到多路复用器,并监听这个通道上发生的可读事件
                socketChannel.register(selector, SelectionKey.OP_READ);
            } catch (IOException e) {
                e.printStackTrace();
            }
        } else if (selectionKey.isReadable()) {
            // 触发的是可读事件
            // 获取到可读事件的客户端通道
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
            //创建缓冲区
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 4);
            try {
                // 读取通道上的数据写入缓冲区(返回0或者-1说明读到了末尾)
                while (socketChannel.read(byteBuffer) > 0) {
                }
                //切换为读模式
                byteBuffer.flip();
                // 接收到的消息
                String message = String.valueOf(Constants.CHARSET.decode(byteBuffer));
                log.info("接收到来自客户端[{}]的数据:[{}]", socketChannel.socket().getPort(), message);
                // 是否退出
                quit(message, selector, selectionKey);
                // 消息转发
                forwardMessage(message, selector, selectionKey);
                // 清除缓冲区的数据
                byteBuffer.clear();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 客户端退出
     *
     * @param message      消息
     * @param selector     多路复用器
     * @param selectionKey 触发的selectionKey
     */

    public void quit(String message, Selector selector, SelectionKey selectionKey) {
        if (StringUtils.isBlank(message) || Constants.KEY_WORD_QUIT.equals(message)) {
            int port = ((SocketChannel) selectionKey.channel()).socket().getPort();
            // 客户端下线
            selectionKey.cancel();
            log.debug("客户端[{}]下线", port);
            // 因为发生了监听事件和channel的变更,所以需要通知selector重新整理selector所监听的事件
            selector.wakeup();
        }
    }

    /**
     * 转发消息
     *
     * @param message         需要转发的消息
     * @param selector        多路复用器
     * @param curSelectionKey 当前触发的selectionKey
     */

    public void forwardMessage(String message, Selector selector, SelectionKey curSelectionKey) {
        // 创建缓冲区
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 4);
        // 数据写入缓冲区
        byteBuffer.put(message.getBytes(Constants.CHARSET));

        // 切换为读模式
        byteBuffer.flip();
        // 在首尾进行标记,因为需要给每个客户端发送同样的数据,需要重复读取
        byteBuffer.mark();
        // 当前注册在多路复用器上的SelectionKey集合
        Set<SelectionKey> keys = selector.keys();
        for (SelectionKey key : keys) {
            // 消息不能转发给自己 and 只转发给客户端SocketChannel
            if (curSelectionKey.equals(key) || !(key.channel() instanceof SocketChannel)) {
                continue;
            }
            // 客户端SocketChannel
            SocketChannel socketChannel = (SocketChannel) key.channel();
            // 如果缓冲区中还有数据就一直写
            while (byteBuffer.hasRemaining()) {
                try {
                    // 数据写入通道
                    socketChannel.write(byteBuffer);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            // 重置到上次mark的地方,即首位
            byteBuffer.reset();
        }
        // 清除缓冲区的数据
        byteBuffer.clear();
    }


    public static void main(String[] args) {
        new NioChatServer().start();
    }
}
  • 客户端

/**
 * 基于NIO实现的群聊客户端
 *
 * @author futao
 * @date 2020/7/8
 */

@Getter
@Setter
@Slf4j
public class NioChatClient {

    /**
     * 用于处理用户输入数据的单个线程线程池,使用线程池是为了便于关闭
     */

    private static final ExecutorService USER_INPUT_HANDLER = Executors.newSingleThreadExecutor();

    /**
     * 用户名
     */

    private String userName;

    /**
     * 启动客户端
     */

    public void start() {
        try {
            // 创建客户端通道
            SocketChannel socketChannel = SocketChannel.open();
            // 将通道设置为非阻塞
            socketChannel.configureBlocking(false);

            // 创建多路复用器
            Selector selector = Selector.open();

            // 将客户端通道注册到多路复用器,并监听可读事件
            socketChannel.register(selector, SelectionKey.OP_CONNECT);

            // 尝试连接到聊天服务器
            socketChannel.connect(new InetSocketAddress("localhost", Constants.SERVER_PORT));

            while (true) {
                // 阻塞等待通道上的事件触发。返回触发的通道的数量
                int eventCountTriggered = selector.select();
                if (eventCountTriggered <= 0) {
                    continue;
                }
                // 获取到所有触发的事件
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                // 遍历事件进行处理
                for (SelectionKey selectionKey : selectionKeys) {
                    // 处理事件
                    selectionKeyHandler(selectionKey, selector);
                }
                // 清除事件记录
                selectionKeys.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClosedSelectorException e) {
            log.debug("成功退出聊天室...");
        }
    }

    /**
     * 处理器
     *
     * @param selectionKey 触发的selectionKey
     * @param selector     多路复用器
     */

    private void selectionKeyHandler(SelectionKey selectionKey, Selector selector) {

        if (selectionKey.isConnectable()) {
            // 触发的是成功接入服务器的事件
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
            try {
                // 判断此通道上的连接操作是否正在进行中
                if (socketChannel.isConnectionPending()) {
                    // 完成连接套接字通道的过程
                    socketChannel.finishConnect();
                    log.debug("成功接入聊天服务器");

                    // 将通道设置成非阻塞
                    socketChannel.configureBlocking(false);
                    // 将通道注册到多路复用器,并监听可读事件
                    socketChannel.register(selector, SelectionKey.OP_READ);

                    // 创建缓冲区,用于处理将用户输入的数据写入通道
                    ByteBuffer byteBuffer = ByteBuffer.allocate(4 * 1024);
                    // 在新线程中处理用户输入
                    USER_INPUT_HANDLER.execute(() -> {
                        while (!Thread.currentThread().isInterrupted()) {
                            //先清空缓冲区中的数据
                            byteBuffer.clear();
                            // 获取用户输入的文本
                            String message = new Scanner(System.in).nextLine();
                            // 将数据写入缓冲区
                            byteBuffer.put(String.format("【%s】: %s", userName, message).getBytes(Constants.CHARSET));
                            // 将缓冲区设置为读模式
                            byteBuffer.flip();
                            try {
                                // 当缓冲区中还有数据
                                while (byteBuffer.hasRemaining()) {
                                    // 将数据写入通道
                                    socketChannel.write(byteBuffer);
                                }
                            } catch (IOException e) {
                                e.printStackTrace();
                            }

                            // 判断是否退出群聊
                            if (quit(message, selector, selectionKey)) {
                                // 跳出循环,结束线程
                                break;
                            }
                        }
                        try {
                            // 关闭多路复用器
                            selector.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                        // 关闭线程池
                        USER_INPUT_HANDLER.shutdown();
                    });
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        } else if (selectionKey.isReadable()) {
            // 触发的是可读事件
            // 获取到可读事件的通道
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
            //创建缓冲区
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 4);
            try {
                // 将通道上的数据写入缓冲区(返回0或者-1说明读到了末尾)
                while (socketChannel.read(byteBuffer) > 0) {
                }
                // 切换成读模式
                byteBuffer.flip();
                String message = String.valueOf(Constants.CHARSET.decode(byteBuffer));
                byteBuffer.clear();
                log.info("接收到数据:[{}]", message);
                if (StringUtils.isBlank(message)) {
                    log.debug("服务器拉胯,下车...");
                    selector.close();
                    USER_INPUT_HANDLER.shutdownNow();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 退出群聊
     *
     * @param message      消息
     * @param selector     多路复用器
     * @param selectionKey 触发的selectionKey
     * @return 是否退出
     */

    public boolean quit(String message, Selector selector, SelectionKey selectionKey) {
        if (Constants.KEY_WORD_QUIT.equals(message)) {
            selectionKey.cancel();
            selector.wakeup();
            return true;
        }
        return false;
    }


    public static void main(String[] args) {
        NioChatClient nioChatClient = new NioChatClient();
        nioChatClient.setUserName("小9");
        nioChatClient.start();
    }
}

# 测试

  • 接入
image.png
  • 客户端发送消息
image.png
  • 消息转发

  • 客户端下线

  • 服务器宕机
image.png

# 源代码

* https://github.com/FutaoSmile/learn-IO/tree/master/practice/src/main/java/com/futao/practice/chatroom/nio

# 系列文章

【BIO】基于BIO实现简单动态HTTP服务器


【BIO】通过指定消息大小实现的多人聊天室-终极版本


BIO在聊天室项目中的演化


欢迎在评论区留下你看文章时的思考,及时说出,有助于加深记忆和理解,还能和像你一样也喜欢这个话题的读者相遇~

本文分享自微信公众号 - 喜欢天文(AllUnderControl)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

原文链接:https://my.oschina.net/u/3529894/blog/4365035
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章