文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Java Dubbo协议下的服务端线程如何使用

2023-07-05 07:20

关注

本篇内容主要讲解“Java Dubbo协议下的服务端线程如何使用”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Java Dubbo协议下的服务端线程如何使用”吧!

Provider端线程模型

在了解服务端线程模型之前,先了解一下Dubbo对Channel上的操作抽象,Dubbo将Channel上的操作成了5中行为,分别是:建立连接、断开连接、发送消息、接收消息、异常捕获,Channel上的操作的接口为org.apache.dubbo.remoting.ChannelHandler,该接口是SPI的,用户可以自己扩展,接口代码如下:

该接口抽象的五种Channel上的行为解释如下:

Dubbo框架的线程模型与以上这五种行为息息相关,Dubbo协议Provider端线程模型提供了五种实现,虽说都是五种但是别把二者混淆,线程模型的顶级接口是org.apache.dubbo.remoting.Dispatcher,该接口也是SPI的,提供的五种实现分别是AllDispatcherDirectDispatcherMessageOnlyDispatcherExecutionDispatcherConnectionOrderedDispatcher,默认的使用的是AllDispatcher

Java Dubbo协议下的服务端线程如何使用

org.apache.dubbo.remoting.ChannelHandler作为Channel上的行为的顶级接口对应Dubbo协议Provider端的5种线程模型同样也提供了对应的5种实现,分别是AllChannelHandlerDirectChannelHandlerMessageOnlyChannelHandlerExecutionChannelHandlerConnectionOrderedChannelHandler,这里Channel上行为的具体实现不展开讨论。

Java Dubbo协议下的服务端线程如何使用

Channel上行为和线程模型之间使用策略可以参考org.apache.dubbo.remoting.transport.dispatcher.ChannelHandlers的源代码,这里不做详细的介绍,下面的各个章节只针对5种线程模型做简单的介绍。

AllDispatcher

IO线程上的操作:

Dubbo线程池上的操作:

AllDispatcher代码如下,AllDispatcherdispatch方法实例化了AllChannelHandlerAllChannelHandler实现了received、connected、disconnected、caught操作在dubbo线程池中,代码如下:

public class AllDispatcher implements Dispatcher {    public static final String NAME = "all";    @Override    public ChannelHandler dispatch(ChannelHandler handler, URL url) {        return new AllChannelHandler(handler, url);    }}
public class AllChannelHandler extends WrappedChannelHandler {    public AllChannelHandler(ChannelHandler handler, URL url) {        super(handler, url);    }    @Override    public void connected(Channel channel) throws RemotingException {        ExecutorService executor = getSharedExecutorService();        try {            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));        } catch (Throwable t) {            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);        }    }    @Override    public void disconnected(Channel channel) throws RemotingException {        ExecutorService executor = getSharedExecutorService();        try {            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));        } catch (Throwable t) {            throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);        }    }    @Override    public void received(Channel channel, Object message) throws RemotingException {        ExecutorService executor = getPreferredExecutorService(message);        try {            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));        } catch (Throwable t) {            if(message instanceof Request && t instanceof RejectedExecutionException){                sendFeedback(channel, (Request) message, t);                return;            }            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);        }    }    @Override    public void caught(Channel channel, Throwable exception) throws RemotingException {        ExecutorService executor = getSharedExecutorService();        try {            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));        } catch (Throwable t) {            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);        }    }}

DirectDispatcher

该线程模型Channel上的所有行为均在IO线程中执行,并没有在Dubbo线程池中执行

DirectDispatcherAllDispatcher相似,实例化了DirectChannelHandlerDirectChannelHandler只实现了received行为,但是received中获取的线程池如果是ThreadlessExecutor才会提交task,否则也是在ChannelHandler中执行received行为,ThreadlessExecutor和普通线程池最大的区别是不会管理任何线程,这里不展开讨论。

public class DirectDispatcher implements Dispatcher {    public static final String NAME = "direct";    @Override    public ChannelHandler dispatch(ChannelHandler handler, URL url) {        return new DirectChannelHandler(handler, url);    }}
public class DirectChannelHandler extends WrappedChannelHandler {    public DirectChannelHandler(ChannelHandler handler, URL url) {        super(handler, url);    }    @Override    public void received(Channel channel, Object message) throws RemotingException {        ExecutorService executor = getPreferredExecutorService(message);        if (executor instanceof ThreadlessExecutor) {            try {                executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));            } catch (Throwable t) {                throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);            }        } else {            handler.received(channel, message);        }    }}

ExecutionDispatcher

在IO线程中执行的操作有:

在Dubbo线程中执行的操作有:

同样的,我们可以直接看ExecutionChannelHandler源码,逻辑是当message的类型是Request时received行为在Dubbo线程池执行。感兴趣的可以自己看源码,这里不做介绍。

MessageOnlyDispatcher

Message Only Dispatcher所有的received行为和反序列化都是在dubbo线程池中执行的

public class MessageOnlyChannelHandler extends WrappedChannelHandler {    public MessageOnlyChannelHandler(ChannelHandler handler, URL url) {        super(handler, url);    }    @Override    public void received(Channel channel, Object message) throws RemotingException {        ExecutorService executor = getPreferredExecutorService(message);        try {            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));        } catch (Throwable t) {            if(message instanceof Request && t instanceof RejectedExecutionException){                sendFeedback(channel, (Request) message, t);                return;            }            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);        }    }}

ConnectionOrderedDispatcher

该线程模型与AllDispatcher类似,sent操作和相应的序列化是在IO线程上执行;connected、disconnected、received、caught操作在dubbo线程池上执行,他们的区别是在connected、disconnected行为上ConnectionOrderedDispatcher做了线程池隔离,并且在Dubbo connected thread pool中提供了链接限制、告警灯能力,我们直接看ConnectionOrderedChannelHandler源码,代码如下:

public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {    protected final ThreadPoolExecutor connectionExecutor;    private final int queueWarningLimit;    public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {        super(handler, url);        String threadName = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);        connectionExecutor = new ThreadPoolExecutor(1, 1,                0L, TimeUnit.MILLISECONDS,                new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),                new NamedThreadFactory(threadName, true),                new AbortPolicyWithReport(threadName, url)        );  // FIXME There's no place to release connectionExecutor!        queueWarningLimit = url.getParameter(CONNECT_QUEUE_WARNING_SIZE, DEFAULT_CONNECT_QUEUE_WARNING_SIZE);    }    @Override    public void connected(Channel channel) throws RemotingException {        try {            checkQueueLength();            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));        } catch (Throwable t) {            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);        }    }    @Override    public void disconnected(Channel channel) throws RemotingException {        try {            checkQueueLength();            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));        } catch (Throwable t) {            throw new ExecutionException("disconnected event", channel, getClass() + " error when process disconnected event .", t);        }    }    @Override    public void received(Channel channel, Object message) throws RemotingException {        ExecutorService executor = getPreferredExecutorService(message);        try {            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));        } catch (Throwable t) {            if (message instanceof Request && t instanceof RejectedExecutionException) {                sendFeedback(channel, (Request) message, t);                return;            }            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);        }    }    @Override    public void caught(Channel channel, Throwable exception) throws RemotingException {        ExecutorService executor = getSharedExecutorService();        try {            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));        } catch (Throwable t) {            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);        }    }    private void checkQueueLength() {        if (connectionExecutor.getQueue().size() > queueWarningLimit) {            logger.warn(new IllegalThreadStateException("connectionordered channel handler queue size: " + connectionExecutor.getQueue().size() + " exceed the warning limit number :" + queueWarningLimit));        }    }}

到此,相信大家对“Java Dubbo协议下的服务端线程如何使用”有了更深的了解,不妨来实际操作一番吧!这里是编程网网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯