DUBBO消费异步化解决了什么问题
欢迎大家关注公众号「JAVA前线」查看更多精彩分享文章,主要包括源码分析、实际应用、架构思维、职场分享、产品思考等等,同时欢迎大家加我个人微信「java_front」一起交流学习
1 文章概述
我们在服务端开发时如果需要实现异步调用,首先声明一个线程池,并将调用业务方法封装成一个任务提交至线程池,如果不需要获取返回值则封装为Runnable,需要获取返回值则封装为Callable并通过Future对象接受结果。
class CalcTask1 implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("task1耗时计算");
Thread.sleep(1000L);
return 100;
}
}
class CalcTask2 implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("task2耗时计算");
Thread.sleep(3000L);
return 200;
}
}
public class CallableTest {
public static void test1() throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
CalcTask1 task1 = new CalcTask1();
Future<Integer> f1 = executor.submit(task1);
CalcTask2 task2 = new CalcTask2();
Future<Integer> f2 = executor.submit(task2);
Integer result1 = f1.get();
Integer result2 = f2.get();
System.out.println("final result=" + (result1 + result2));
executor.shutdown();
}
public static void test2() throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
CalcTask1 task1 = new CalcTask1();
CalcTask2 task2 = new CalcTask2();
tasks.add(task1);
tasks.add(task2);
for (int i = 0; i < tasks.size(); i++) {
Future<Integer> future = executor.submit(tasks.get(i));
System.out.println("result=" + future.get());
}
executor.shutdown();
}
}
1.1 什么是消费异步化
在使用DUBBO进行异步化调用时不需要这么麻烦,DUBBO基于NIO非阻塞能力使得服务消费者无需启用多线程就可以实现并行调用多个服务,在此我们给出基于2.7.0版本调用实例。
1.1.1 生产者
(1) 服务声明
public interface CalcSumService {
public Integer sum(int a, int b);
}
public class CalcSumServiceImpl implements CalcSumService {
@Override
public Integer sum(int a, int b) {
return a + b;
}
}
public interface CalcSubtractionService {
public Integer subtraction(int a, int b);
}
public class CalcSubtractionServiceImpl implements CalcSubtractionService {
@Override
public Integer subtraction(int a, int b) {
return a - b;
}
}
(2) 配置文件
<beans>
<dubbo:application name="java-front-provider" />
<dubbo:registry address="zookeeper://127.0.0.1:2181" />
<dubbo:protocol name="dubbo" port="9999" />
<bean id="calcSumService" class="com.java.front.dubbo.demo.provider.service.CalcSumServiceImpl" />
<bean id="calcSubtractionService" class="com.java.front.dubbo.demo.provider.service.CalcSubtractionServiceImpl" />
<dubbo:service interface="com.java.front.dubbo.demo.provider.service.CalcSumService" ref="calcSumService" />
<dubbo:service interface="com.java.front.dubbo.demo.provider.service.CalcSubtractionService" ref="calcSubtractionService" />
</beans>
(3) 服务发布
public class Provider {
public static void main(String[] args) throws Exception {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath*:META-INF/spring/dubbo-provider.xml");
context.start();
System.out.println(context);
System.in.read();
}
}
1.1.2 消费者
(1) 配置文件
<beans>
<dubbo:application name="java-front-consumer" />
<dubbo:registry address="zookeeper://127.0.0.1:2181" />
<dubbo:reference id="calcSumService" interface="com.java.front.dubbo.demo.provider.service.CalcSumService" timeout="10000">
<dubbo:method name="sum" async="true" />
</dubbo:reference>
<dubbo:reference id="calcSubtractionService" interface="com.java.front.dubbo.demo.provider.service.CalcSubtractionService" timeout="10000">
<dubbo:method name="subtraction" async="true" />
</dubbo:reference>
</beans>
(2) 服务消费
public class Consumer {
public static void main(String[] args) throws Exception {
testAsync();
System.in.read();
}
public static void testAsync() {
try {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/spring/dubbo-consumer.xml" });
System.out.println(context);
context.start();
/** 加法运算 **/
CalcSumService calcSumService = (CalcSumService) context.getBean("calcSumService");
calcSumService.sum(3, 2);
CompletableFuture<Integer> futureSum = RpcContext.getContext().getCompletableFuture();
/** 减法运算 **/
CalcSubtractionService calcSubtractionService = (CalcSubtractionService) context.getBean("calcSubtractionService");
calcSubtractionService.subtraction(3, 2);
CompletableFuture<Integer> futureSubtraction = RpcContext.getContext().getCompletableFuture();
/** 输出结果 **/
int sumResult = futureSum.get();
int subtractionResult = futureSubtraction.get();
System.out.println("sumResult=" + sumResult + ",subtractionResult=" + subtractionResult);
} catch (Exception e) {
e.printStackTrace();
}
}
}
1.2 为什么消费异步化
异步化可以将原本串行的调用并行化,减少执行时间从而提升性能。假设上述实例加法服务需要100ms,减法服务需要200ms,那么串行化执行时间为二者之和300ms:
如果消费异步化那么执行时间减少为二者最大值200ms,异步化所带来的性能提升不言而喻:
2 保护性暂停模式
分析DUBBO源码之前我们首先介绍一种多线程设计模式:保护性暂停模式。我们设想这样一种场景:线程A生产数据,线程B读取这个数据。我们必须面对一种情况:线程B准备读取数据时,此时线程A还没有生产出数据。在这种情况下线程B不能一直空转,也不能立即退出,线程B要等到生产数据完成并拿到数据之后才退出。
那么在数据没有生产出这段时间,线程B需要执行一种等待机制,这样可以达到对系统保护目的,这就是保护性暂停。
public class MyData implements Serializable {
private static final long serialVersionUID = 1L;
private String message;
public MyData(String message) {
this.message = message;
}
}
class Resource {
private MyData data;
private Object lock = new Object();
public MyData getData() {
synchronized (lock) {
while (data == null) {
try {
// 没有数据则释放锁并暂停等待被唤醒
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return data;
}
}
public void sendData(MyData data) {
synchronized (lock) {
// 生产数据后唤醒消费线程
this.data = data;
lock.notifyAll();
}
}
}
public class ProtectDesignTest {
public static void main(String[] args) {
Resource resource = new Resource();
new Thread(() -> {
try {
MyData data = new MyData("hello");
System.out.println(Thread.currentThread().getName() + "生产数据=" + data);
// 模拟发送耗时
TimeUnit.SECONDS.sleep(3);
resource.sendData(data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1").start();
new Thread(() -> {
MyData data = resource.getData();
System.out.println(Thread.currentThread().getName() + "接收到数据=" + data);
}, "t2").start();
}
}
在上述代码实例中线程1生产数据,线程2消费数据,Resource类通过wait/notify实现了保护性暂停模式,关于保护性暂停模式请参看我之前《保护性暂停模式详解以及其在DUBBO应用源码分析》这篇文章。
3 源码分析
本章节我们分析对比2.6.9和2.7.0两个版本源码,之所以选取这两个版本是因为2.7.0是一个里程碑版本,异步化能力得到了明显增强。
3.1 version_2.6.9
3.1.1 异步调用
我们首先看看这个版本异步调用使用方式,生产者内容和消费者配置文件同第一章节不再赘述,我们重点分析服务消费代码。
public class AsyncConsumer {
public static void main(String[] args) throws Exception {
test1();
System.in.read();
}
public static void test1() throws Exception {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/spring/dubbo-consumer.xml" });
System.out.println(context);
context.start();
/** 加法运算 **/
CalcSumService calcSumService = (CalcSumService) context.getBean("calcSumService");
calcSumService.sum(3, 2);
Future<Integer> futureSum = RpcContext.getContext().getFuture();
/** 减法运算 **/
CalcSubtractionService calcSubtractionService = (CalcSubtractionService) context.getBean("calcSubtractionService");
calcSubtractionService.subtraction(3, 2);
Future<Integer> futureSubtraction = RpcContext.getContext().getFuture();
/** 输出结果 **/
int sumResult = futureSum.get();
int subtractionResult = futureSubtraction.get();
System.out.println("sumResult=" + sumResult + ",subtractionResult=" + subtractionResult);
}
}
消费者最终执行DubboInvoker.doInvoke,这个方法包含异步调用核心:
public class DubboInvoker<T> extends AbstractInvoker<T> {
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// 单向调用
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
}
// 异步调用
else if (isAsync) {
// 发起请求给生产者
ResponseFuture future = currentClient.request(inv, timeout);
// 设置future对象至上下文
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
// 返回空结果
return new RpcResult();
}
// 同步调用
else {
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
}
如果包含async属性则表示异步调用,第一步发送调用请求给生产者,第二步设置Future对象至上下文,第三步立即返回空结果。那么在服务消费时关键一步就是获取Future对象,所以我们在调用时要从上下文获取Future对象:
CalcSumService calcSumService = (CalcSumService) context.getBean("calcSumService");
calcSumService.sum(3, 2);
Future<Integer> futureSum = RpcContext.getContext().getFuture();
使用Future对象获取结果:
int sumResult = futureSum.get();
进入FutureAdapter.get()方法:
public class FutureAdapter<V> implements Future<V> {
private final ResponseFuture future;
public V get() throws InterruptedException, ExecutionException {
try {
return (V) (((Result) future.get()).recreate());
} catch (RemotingException e) {
throw new ExecutionException(e.getMessage(), e);
} catch (Throwable e) {
throw new RpcException(e);
}
}
}
进入ResponseFuture.get()方法,我们可以看到保护性暂停模式应用,当生产者线程没有返回数据则阻塞并等待被唤醒:
public class DefaultFuture implements ResponseFuture {
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();
@Override
public Object get() throws RemotingException {
return get(timeout);
}
@Override
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (!isDone()) {
// 远程调用未完成则等待被唤醒
done.await(timeout, TimeUnit.MILLISECONDS);
// 超时时间未完成则退出
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
// 抛出超时异常
if (!isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
return returnFromResponse();
}
}
当消费者接收到生产者响应时会调用received方法唤醒相关阻塞线程,这时阻塞在get方法中的线程即可获取到数据:
public class DefaultFuture implements ResponseFuture {
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();
public static void received(Channel channel, Response response) {
try {
// 根据唯一请求号获取Future
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response " + response
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()));
}
} finally {
CHANNELS.remove(response.getId());
}
}
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
// 唤醒相关阻塞线程
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}
}
3.1.2 设置回调函数
我们现在调用get方法会阻塞在那里等到结果,那么有没有一种方式当结果返回时就立即调用我们设置的回调函数?答案是有。
public class AsyncConsumer {
public static void main(String[] args) throws Exception {
test2();
System.in.read();
}
public static void test2() throws Exception {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/spring/dubbo-consumer.xml" });
System.out.println(context);
context.start();
/** 加法运算 **/
CalcSumService calcSumService = (CalcSumService) context.getBean("calcSumService");
calcSumService.sum(3, 2);
/** 执行回调函数 **/
((FutureAdapter<Object>) RpcContext.getContext().getFuture()).getFuture().setCallback(new ResponseCallback() {
@Override
public void done(Object response) {
System.out.println("sumResult=" + response);
}
@Override
public void caught(Throwable exception) {
exception.printStackTrace();
}
});
/** 减法运算 **/
CalcSubtractionService calcSubtractionService = (CalcSubtractionService) context.getBean("calcSubtractionService");
calcSubtractionService.subtraction(3, 2);
/** 执行回调函数 **/
((FutureAdapter<Object>) RpcContext.getContext().getFuture()).getFuture().setCallback(new ResponseCallback() {
@Override
public void done(Object response) {
System.out.println("subtractionResult=" + response);
}
@Override
public void caught(Throwable exception) {
exception.printStackTrace();
}
});
}
}
DefaultFuture可以设置callback回调函数,当结果返回时如果回调函数不为空则执行:
public class DefaultFuture implements ResponseFuture {
private volatile ResponseCallback callback;
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
// 执行回调函数
invokeCallback(callback);
}
}
private void invokeCallback(ResponseCallback c) {
ResponseCallback callbackCopy = c;
if (callbackCopy == null) {
throw new NullPointerException("callback cannot be null.");
}
c = null;
Response res = response;
if (res == null) {
throw new IllegalStateException("response cannot be null. url:" + channel.getUrl());
}
if (res.getStatus() == Response.OK) {
try {
// 执行成功回调
callbackCopy.done(res.getResult());
} catch (Exception e) {
logger.error("callback invoke error .reasult:" + res.getResult() + ",url:" + channel.getUrl(), e);
}
} else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
try {
TimeoutException te = new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
// 发生超时回调
callbackCopy.caught(te);
} catch (Exception e) {
logger.error("callback invoke error ,url:" + channel.getUrl(), e);
}
} else {
try {
RuntimeException re = new RuntimeException(res.getErrorMessage());
callbackCopy.caught(re);
} catch (Exception e) {
logger.error("callback invoke error ,url:" + channel.getUrl(), e);
}
}
}
}
3.2 version_2.7.0
CompletableFuture在这个版本中被引入实现异步调用,可以使用此类强大的异步编程API增强异步能力,我们首先回顾1.1.2章节实例:
public class Consumer {
public static void testAsync() {
try {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/spring/dubbo-consumer.xml" });
System.out.println(context);
context.start();
/** 加法运算 **/
CalcSumService calcSumService = (CalcSumService) context.getBean("calcSumService");
calcSumService.sum(3, 2);
CompletableFuture<Integer> futureSum = RpcContext.getContext().getCompletableFuture();
/** 减法运算 **/
CalcSubtractionService calcSubtractionService = (CalcSubtractionService) context.getBean("calcSubtractionService");
calcSubtractionService.subtraction(3, 2);
CompletableFuture<Integer> futureSubtraction = RpcContext.getContext().getCompletableFuture();
/** 输出结果 **/
int sumResult = futureSum.get();
int subtractionResult = futureSubtraction.get();
System.out.println("sumResult=" + sumResult + ",subtractionResult=" + subtractionResult);
} catch (Exception e) {
e.printStackTrace();
}
}
}
在上述消费者代码的实例中我们只是应用了CompletableFuture.get()方法,并没有发挥其强大功能。我们对上述实例稍加改造,两个CompletionStage任务都执行完成后,两个任务结果会一起交给thenCombine进行处理:
public class Consumer {
public static void testAsync() {
try {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/spring/dubbo-consumer2.xml" });
System.out.println(context);
context.start();
/** 加法运算 **/
CalcSumService calcSumService = (CalcSumService) context.getBean("calcSumService");
calcSumService.sum(3, 2);
CompletableFuture<Integer> futureSum = RpcContext.getContext().getCompletableFuture();
/** 减法运算 **/
CalcSubtractionService calcSubtractionService = (CalcSubtractionService) context.getBean("calcSubtractionService");
calcSubtractionService.subtraction(3, 2);
CompletableFuture<Integer> futureSubtraction = RpcContext.getContext().getCompletableFuture();
/** 乘法运算 **/
CompletableFuture<Integer> multiplyResult = futureSum.thenCombine(futureSubtraction, new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer t, Integer u) {
return (t * u);
}
});
System.out.println("multiplyResult=" + multiplyResult.get());
} catch (Exception e) {
e.printStackTrace();
}
}
}
DubboInvoker代码有所变化:
public class DubboInvoker<T> extends AbstractInvoker<T> {
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
// 是否为异步调用
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
// 是否为future异步方式
boolean isAsyncFuture = RpcUtils.isReturnTypeFuture(inv);
// 是否需要响应结果
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
// 超时时间
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// 单向调用
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
}
// 异步请求
else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout);
FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future);
RpcContext.getContext().setFuture(futureAdapter);
Result result;
if (isAsyncFuture) {
result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
} else {
result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
}
return result;
}
// 同步请求
else {
RpcContext.getContext().setFuture(null);
Result result = (Result) currentClient.request(inv, timeout).get();
return result;
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
}
我们看到与2.6.9版本相同的是FutureAdapter同样会被设置到上下文,但是FutureAdapter本身已经发生了变化:
public class FutureAdapter<V> extends CompletableFuture<V> {
private final ResponseFuture future;
private CompletableFuture<Result> resultFuture;
public FutureAdapter(ResponseFuture future) {
this.future = future;
this.resultFuture = new CompletableFuture<>();
// 设置回调函数至DefaultFuture
future.setCallback(new ResponseCallback() {
// 设置响应结果至CompletableFuture
@Override
public void done(Object response) {
Result result = (Result) response;
FutureAdapter.this.resultFuture.complete(result);
V value = null;
try {
value = (V) result.recreate();
} catch (Throwable t) {
FutureAdapter.this.completeExceptionally(t);
}
FutureAdapter.this.complete(value);
}
// 设置异常结果至FutureAdapter
@Override
public void caught(Throwable exception) {
FutureAdapter.this.completeExceptionally(exception);
}
});
}
public ResponseFuture getFuture() {
return future;
}
public CompletableFuture<Result> getResultFuture() {
return resultFuture;
}
}
我们在服务消费时通过getResultFuture方法获取CompletableFuture,这个对象值在回调时被设置,回调时机同样在DefaultFuture.doReceived方法里面:
public class DefaultFuture implements ResponseFuture {
private volatile ResponseCallback callback;
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
// 执行回调函数代码同version_2.6.9
invokeCallback(callback);
}
}
}
4 文章总结
本文第一介绍了DUBBO消费异步化是什么,以及异步化为什么会带来性能提升。第二介绍了保护性暂停模式,这是实现异步化的基础。最后我们阅读了两个不同版本异步化源码,了解了DUBBO异步化演进过程,希望本文对大家有所帮助。
欢迎大家关注公众号「JAVA前线」查看更多精彩分享文章,主要包括源码分析、实际应用、架构思维、职场分享、产品思考等等,同时欢迎大家加我个人微信「java_front」一起交流学习

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
为什么一段看似正确的代码会导致DUBBO线程池被打满
欢迎大家关注公众号「JAVA前线」查看更多精彩分享文章,主要包括源码分析、实际应用、架构思维、职场分享、产品思考等等,同时欢迎大家加我个人微信「java_front」一起交流学习 1 一个公式 之前我们在一个公式看懂:为什么DUBBO线程池会打满这篇文章中分析了为什么DUBBO线程池为什么会打满,在本文开始时我们不妨先回顾这个公式:一个公司有7200名员工,每天上班打卡时间是早上8点到8点30分,每次打卡系统耗时5秒。请问RT、QPS、并发量分别是多少? RT表示响应时间,问题已经告诉了我们答案: RT = 5 QPS表示每秒查询量,假设签到行为平均分布: QPS = 7200 / (30 * 60) = 4 并发量表示系统同时处理的请求数量: 并发量 = QPS x RT = 4 x 5 = 20 根据上述实例引出如下公式: 并发量 = QPS x RT 如果系统为每一个请求分配一个处理线程,那么并发量可以近似等于线程数。基于上述公式不难看出并发量受QPS和RT影响,这两个指标任意一个上升就会导致并发量上升。 但是这只是理想情况,因为并发量受限于系统能力而不可能持续上升,例如DUBB...
-
下一篇
长文详解:DUBBO源码使用了哪些设计模式
欢迎大家关注公众号「JAVA前线」查看更多精彩分享文章,主要包括源码分析、实际应用、架构思维、职场分享、产品思考等等,同时欢迎大家加我个人微信「java_front」一起交流学习 0 文章概述 DUBBO作为RPC领域优秀开源的框架在业界十分流行,本文我们阅读其源码并对其使用到的设计模式进行分析。需要说明的是本文所说的设计模式更加广义,不仅包括标准意义上23种设计模式,还有一些常见经过检验的代码模式例如双重检查锁模式、多线程保护性暂停模式等等。 1 模板方法 模板方法模式定义一个操作中的算法骨架,一般使用抽象类定义算法骨架。抽象类同时定义一些抽象方法,这些抽象方法延迟到子类实现,这样子类不仅遵守了算法骨架约定,也实现了自己的算法。既保证了规约也兼顾灵活性。这就是用抽象构建框架,用实现扩展细节。 DUBBO源码中有一个非常重要的核心概念Invoker,我们可以理解为执行器或者说一个可执行对象,能够根据方法的名称、参数得到相应执行结果,这个特性体现了代理模式我们后面章节再说,本章节我们先分析其中的模板方法模式。 publicabstractclassAbstractInvoker<T...
相关文章
文章评论
共有0条评论来说两句吧...