本篇内容介绍了“dubbo的WrappedChannelHandler有什么作用”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
创新互联建站是一家企业级云计算解决方案提供商,超15年IDC数据中心运营经验。主营GPU显卡服务器,站群服务器,移动服务器托管,海外高防服务器,机柜大带宽,动态拨号VPS,海外云手机,海外云服务器,海外服务器租用托管等。
本文主要研究一下dubbo的WrappedChannelHandler
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/WrappedChannelHandler.java
public class WrappedChannelHandler implements ChannelHandlerDelegate { protected static final Logger logger = LoggerFactory.getLogger(WrappedChannelHandler.class); protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true)); protected final ExecutorService executor; protected final ChannelHandler handler; protected final URL url; public WrappedChannelHandler(ChannelHandler handler, URL url) { this.handler = handler; this.url = url; executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url); String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY; if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) { componentKey = CONSUMER_SIDE; } DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); dataStore.put(componentKey, Integer.toString(url.getPort()), executor); } public void close() { try { if (executor != null) { executor.shutdown(); } } catch (Throwable t) { logger.warn("fail to destroy thread pool of server: " + t.getMessage(), t); } } @Override public void connected(Channel channel) throws RemotingException { handler.connected(channel); } @Override public void disconnected(Channel channel) throws RemotingException { handler.disconnected(channel); } @Override public void sent(Channel channel, Object message) throws RemotingException { handler.sent(channel, message); } @Override public void received(Channel channel, Object message) throws RemotingException { handler.received(channel, message); } @Override public void caught(Channel channel, Throwable exception) throws RemotingException { handler.caught(channel, exception); } public ExecutorService getExecutor() { return executor; } @Override public ChannelHandler getHandler() { if (handler instanceof ChannelHandlerDelegate) { return ((ChannelHandlerDelegate) handler).getHandler(); } else { return handler; } } public URL getUrl() { return url; } public ExecutorService getExecutorService() { ExecutorService cexecutor = executor; if (cexecutor == null || cexecutor.isShutdown()) { cexecutor = SHARED_EXECUTOR; } return cexecutor; } }
WrappedChannelHandler的构造根据ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url)获取ExecutorService,然后放到dataStore中
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/execution/ExecutionChannelHandler.java
public class ExecutionChannelHandler extends WrappedChannelHandler { public ExecutionChannelHandler(ChannelHandler handler, URL url) { super(handler, url); } @Override public void received(Channel channel, Object message) throws RemotingException { ExecutorService executor = getExecutorService(); if (message instanceof Request) { try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { // FIXME: when the thread pool is full, SERVER_THREADPOOL_EXHAUSTED_ERROR cannot return properly, // therefore the consumer side has to wait until gets timeout. This is a temporary solution to prevent // this scenario from happening, but a better solution should be considered later. if (t instanceof RejectedExecutionException) { Request request = (Request) message; if (request.isTwoWay()) { String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") thread pool is exhausted, detail msg:" + t.getMessage(); Response response = new Response(request.getId(), request.getVersion()); response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR); response.setErrorMessage(msg); channel.send(response); return; } } throw new ExecutionException(message, channel, getClass() + " error when process received event.", t); } } else { handler.received(channel, message); } } }
ExecutionChannelHandler继承了WrappedChannelHandler,其received会创建ChannelEventRunnable,然后放到executor去执行
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/ChannelEventRunnable.java
public class ChannelEventRunnable implements Runnable { private static final Logger logger = LoggerFactory.getLogger(ChannelEventRunnable.class); private final ChannelHandler handler; private final Channel channel; private final ChannelState state; private final Throwable exception; private final Object message; public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state) { this(channel, handler, state, null); } public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Object message) { this(channel, handler, state, message, null); } public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Throwable t) { this(channel, handler, state, null, t); } public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Object message, Throwable exception) { this.channel = channel; this.handler = handler; this.state = state; this.message = message; this.exception = exception; } @Override public void run() { if (state == ChannelState.RECEIVED) { try { handler.received(channel, message); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is " + message, e); } } else { switch (state) { case CONNECTED: try { handler.connected(channel); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e); } break; case DISCONNECTED: try { handler.disconnected(channel); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e); } break; case SENT: try { handler.sent(channel, message); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is " + message, e); } break; case CAUGHT: try { handler.caught(channel, exception); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is: " + message + ", exception is " + exception, e); } break; default: logger.warn("unknown state: " + state + ", message is " + message); } } } /** * ChannelState * * */ public enum ChannelState { /** * CONNECTED */ CONNECTED, /** * DISCONNECTED */ DISCONNECTED, /** * SENT */ SENT, /** * RECEIVED */ RECEIVED, /** * CAUGHT */ CAUGHT } }
ChannelEventRunnable实现了Runnable接口,其run方法根据不同的ChannelState做不同处理
WrappedChannelHandler的构造根据ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url)获取ExecutorService,然后放到dataStore中
ExecutionChannelHandler继承了WrappedChannelHandler,其received会创建ChannelEventRunnable,然后放到executor去执行
ChannelEventRunnable实现了Runnable接口,其run方法根据不同的ChannelState做不同处理
“dubbo的WrappedChannelHandler有什么作用”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注创新互联网站,小编将为大家输出更多高质量的实用文章!