文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Java中多线程Reactor模式怎么实现

2023-06-21 22:55

关注

这篇文章主要讲解了“Java中多线程Reactor模式怎么实现”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Java中多线程Reactor模式怎么实现”吧!

多线程Reactor模式旨在分配多个reactor每一个reactor独立拥有一个selector,在网络通信中大体设计为负责连接的主Reactor,其中在主Reactor的run函数中若selector检测到了连接事件的发生则dispatch该事件。

让负责管理连接的Handler处理连接,其中在这个负责连接的Handler处理器中创建子Handler用以处理IO请求。这样一来连接请求与IO请求分开执行提高通道的并发量。同时多个Reactor带来的好处是多个selector可以提高通道的检索速度

1、 主服务器

package com.crazymakercircle.ReactorModel;import com.crazymakercircle.NioDemoConfig;import com.crazymakercircle.util.Logger;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;import java.util.concurrent.atomic.AtomicInteger;class MultiThreadEchoServerReactor {    ServerSocketChannel serverSocket;    AtomicInteger next = new AtomicInteger(0);    Selector bossSelector = null;    Reactor bossReactor = null;    //selectors集合,引入多个selector选择器    //多个选择器可以更好的提高通道的并发量    Selector[] workSelectors = new Selector[2];    //引入多个子反应器    //如果CPU是多核的可以开启多个子Reactor反应器,这样每一个子Reactor反应器还可以独立分配一个线程。    //每一个线程可以单独绑定一个单独的Selector选择器以提高通道并发量    Reactor[] workReactors = null;    MultiThreadEchoServerReactor() throws IOException {        bossSelector = Selector.open();        //初始化多个selector选择器        workSelectors[0] = Selector.open();        workSelectors[1] = Selector.open();        serverSocket = ServerSocketChannel.open();        InetSocketAddress address =                new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,                        NioDemoConfig.SOCKET_SERVER_PORT);        serverSocket.socket().bind(address);        //非阻塞        serverSocket.configureBlocking(false);        //第一个selector,负责监控新连接事件        SelectionKey sk =                serverSocket.register(bossSelector, SelectionKey.OP_ACCEPT);        //附加新连接处理handler处理器到SelectionKey(选择键)        sk.attach(new AcceptorHandler());        //处理新连接的反应器        bossReactor = new Reactor(bossSelector);        //第一个子反应器,一子反应器负责一个选择器        Reactor subReactor1 = new Reactor(workSelectors[0]);        //第二个子反应器,一子反应器负责一个选择器        Reactor subReactor2 = new Reactor(workSelectors[1]);        workReactors = new Reactor[]{subReactor1, subReactor2};    }    private void startService() {        new Thread(bossReactor).start();        // 一子反应器对应一条线程        new Thread(workReactors[0]).start();        new Thread(workReactors[1]).start();    }    //反应器    class Reactor implements Runnable {        //每条线程负责一个选择器的查询        final Selector selector;        public Reactor(Selector selector) {            this.selector = selector;        }        public void run() {            try {                while (!Thread.interrupted()) {                    //单位为毫秒                    //每隔一秒列出选择器感应列表                    selector.select(1000);                    Set<SelectionKey> selectedKeys = selector.selectedKeys();                    if (null == selectedKeys || selectedKeys.size() == 0) {                        //如果列表中的通道注册事件没有发生那就继续执行                        continue;                    }                    Iterator<SelectionKey> it = selectedKeys.iterator();                    while (it.hasNext()) {                        //Reactor负责dispatch收到的事件                        SelectionKey sk = it.next();                        dispatch(sk);                    }                    //清楚掉已经处理过的感应事件,防止重复处理                    selectedKeys.clear();                }            } catch (IOException ex) {                ex.printStackTrace();            }        }        void dispatch(SelectionKey sk) {            Runnable handler = (Runnable) sk.attachment();            //调用之前attach绑定到选择键的handler处理器对象            if (handler != null) {                handler.run();            }        }    }    // Handler:新连接处理器    class AcceptorHandler implements Runnable {        public void run() {        try {                SocketChannel channel = serverSocket.accept();                Logger.info("接收到一个新的连接");                if (channel != null) {                    int index = next.get();                    Logger.info("选择器的编号:" + index);                    Selector selector = workSelectors[index];                    new MultiThreadEchoHandler(selector, channel);                }            } catch (IOException e) {                e.printStackTrace();            }            if (next.incrementAndGet() == workSelectors.length) {                next.set(0);            }        }    }    public static void main(String[] args) throws IOException {        MultiThreadEchoServerReactor server =                new MultiThreadEchoServerReactor();        server.startService();    }}

按上述的设计思想,在主服务器中实际上设计了三个Reactor,一个主Reactor专门负责连接请求并配已单独的selector,但是三个Reactor的线程Run函数是做的相同的功能,都是根据每个线程内部的selector进行检索事件列表,若注册的监听事件发生了则调用dispactch分发到每个Reactor对应的Handler。

这里需要注意的一开始其实只有负责连接事件的主Reactor在注册selector的时候给相应的key配了一个AcceptorHandler()。

 //第一个selector,负责监控新连接事件        SelectionKey sk =                serverSocket.register(bossSelector, SelectionKey.OP_ACCEPT);        //附加新连接处理handler处理器到SelectionKey(选择键)        sk.attach(new AcceptorHandler());

但是Reactor的run方法里若相应的selector key发生了便要dispatch到一个Handler。这里其他两个子Reactor的Handler在哪里赋值的呢?其实在处理连接请求的Reactor中便创建了各个子Handler,如下代码所示:
主Handler中先是根据服务器channel创建出客服端channel,在进行子selector与channel的绑定。

   int index = next.get();                   Logger.info("选择器的编号:" + index);                   Selector selector = workSelectors[index];                   new MultiThreadEchoHandler(selector, channel);

2、IO请求handler+线程池

package com.crazymakercircle.ReactorModel;import com.crazymakercircle.util.Logger;import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;class MultiThreadEchoHandler implements Runnable {    final SocketChannel channel;    final SelectionKey sk;    final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);    static final int RECIEVING = 0, SENDING = 1;    int state = RECIEVING;    //引入线程池    static ExecutorService pool = Executors.newFixedThreadPool(4);    MultiThreadEchoHandler(Selector selector, SocketChannel c) throws IOException {        channel = c;        channel.configureBlocking(false);        //唤醒选择,防止register时 boss线程被阻塞,netty 处理方式比较优雅,会在同一个线程注册事件,避免阻塞boss        selector.wakeup();        //仅仅取得选择键,后设置感兴趣的IO事件        sk = channel.register(selector, 0);        //将本Handler作为sk选择键的附件,方便事件dispatch        sk.attach(this);        //向sk选择键注册Read就绪事件        sk.interestOps(SelectionKey.OP_READ);        //唤醒选择,是的OP_READ生效        selector.wakeup();        Logger.info("新的连接 注册完成");    }    public void run() {        //异步任务,在独立的线程池中执行        pool.execute(new AsyncTask());    }    //异步任务,不在Reactor线程中执行    public synchronized void asyncRun() {        try {            if (state == SENDING) {                //写入通道                channel.write(byteBuffer);                //写完后,准备开始从通道读,byteBuffer切换成写模式                byteBuffer.clear();                //写完后,注册read就绪事件                sk.interestOps(SelectionKey.OP_READ);                //写完后,进入接收的状态                state = RECIEVING;            } else if (state == RECIEVING) {                //从通道读                int length = 0;                while ((length = channel.read(byteBuffer)) > 0) {                    Logger.info(new String(byteBuffer.array(), 0, length));                }                //读完后,准备开始写入通道,byteBuffer切换成读模式                byteBuffer.flip();                //读完后,注册write就绪事件                sk.interestOps(SelectionKey.OP_WRITE);                //读完后,进入发送的状态                state = SENDING;            }            //处理结束了, 这里不能关闭select key,需要重复使用            //sk.cancel();        } catch (IOException ex) {            ex.printStackTrace();        }    }    //异步任务的内部类    class AsyncTask implements Runnable {        public void run() {            MultiThreadEchoHandler.this.asyncRun();        }    }}

3、客户端

在处理IO请求的Handler中采用了线程池,已达到异步处理的目的。

package com.crazymakercircle.ReactorModel;import com.crazymakercircle.NioDemoConfig;import com.crazymakercircle.util.Dateutil;import com.crazymakercircle.util.Logger;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Scanner;import java.util.Set;public class EchoClient {    public void start() throws IOException {        InetSocketAddress address =                new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,                        NioDemoConfig.SOCKET_SERVER_PORT);        // 1、获取通道(channel)        SocketChannel socketChannel = SocketChannel.open(address);        Logger.info("客户端连接成功");        // 2、切换成非阻塞模式        socketChannel.configureBlocking(false);        //不断的自旋、等待连接完成,或者做一些其他的事情        while (!socketChannel.finishConnect()) {        }        Logger.tcfo("客户端启动成功!");        //启动接受线程        Processer processer = new Processer(socketChannel);        new Thread(processer).start();    }    static class Processer implements Runnable {        final Selector selector;        final SocketChannel channel;        Processer(SocketChannel channel) throws IOException {            //Reactor初始化            selector = Selector.open();            this.channel = channel;            channel.register(selector,                    SelectionKey.OP_READ | SelectionKey.OP_WRITE);        }        public void run() {            try {                while (!Thread.interrupted()) {                    selector.select();                    Set<SelectionKey> selected = selector.selectedKeys();                    Iterator<SelectionKey> it = selected.iterator();                    while (it.hasNext()) {                        SelectionKey sk = it.next();                        if (sk.isWritable()) {                            ByteBuffer buffer = ByteBuffer.allocate(NioDemoConfig.SEND_BUFFER_SIZE);                            Scanner scanner = new Scanner(System.in);                            Logger.tcfo("请输入发送内容:");                            if (scanner.hasNext()) {                                SocketChannel socketChannel = (SocketChannel) sk.channel();                                String next = scanner.next();                                buffer.put((Dateutil.getNow() + " >>" + next).getBytes());                                buffer.flip();                                // 操作三:发送数据                                socketChannel.write(buffer);                                buffer.clear();                            }                        }                        if (sk.isReadable()) {                            // 若选择键的IO事件是“可读”事件,读取数据                            SocketChannel socketChannel = (SocketChannel) sk.channel();                            //读取数据                            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);                            int length = 0;                            while ((length = socketChannel.read(byteBuffer)) > 0) {                                byteBuffer.flip();                                Logger.info("server echo:" + new String(byteBuffer.array(), 0, length));                                byteBuffer.clear();                            }                        }                        //处理结束了, 这里不能关闭select key,需要重复使用                        //selectionKey.cancel();                    }                    selected.clear();                }            } catch (IOException ex) {                ex.printStackTrace();            }        }    }    public static void main(String[] args) throws IOException {        new EchoClient().start();    }}

感谢各位的阅读,以上就是“Java中多线程Reactor模式怎么实现”的内容了,经过本文的学习后,相信大家对Java中多线程Reactor模式怎么实现这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是编程网,小编将为大家推送更多相关知识点的文章,欢迎关注!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯