模仿Tomcat的BIO,NIO线程模型
模仿Tomcat的BIO模型,来一个消息,分配一个线程处理.
则主线程池代码如下
package com.guanjian;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
- Created by Administrator on 2018/7/10.
*/
public class ThreadPool {
private ExecutorService service;
private List<MessageTask> tasks;
private int fixedThreadNum = 0;
private List<String> messages;
private MessageHandler messageHandler;
public ThreadPool(int fixedThreadNum,List<String> messages,MessageHandler messageHandler) {
this.fixedThreadNum = fixedThreadNum;
this.messages = messages;
this.messageHandler = messageHandler;
service = Executors.newFixedThreadPool(fixedThreadNum);
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
shutdownGracefully(service);
}
});
}
public void shutdownGracefully(ExecutorService ThreadPool) {
ShutdownPool.shutdownThreadPool(ThreadPool, "main-pool");
}
public void startup() {
tasks = new ArrayList<>();
MessageTask messageTask = (fixedThreadNum == 0 ? new SequentialMessageTask(messageHandler,messages) : new ConcurrentMessageTask(messageHandler,messages));
for (String message:messages) {
tasks.add(messageTask);
service.execute(messageTask);
}
}
}
它是通过线程数fixedThreadNum来区分使用哪种线程模型.
package com.guanjian;
/**
- Created by Administrator on 2018/7/10.
*/
public interface MessageHandler {
public void execute(String message);
}
package com.guanjian;
/**
- Created by Administrator on 2018/7/10.
*/
public class MessageHandlerImpl implements MessageHandler {
@Override
public void execute(String message) {
System.out.println(message);
}
}
以上是消息处理器的接口和实现类
package com.guanjian;
import java.util.List;
/**
- Created by Administrator on 2018/7/10.
*/
public abstract class MessageTask implements Runnable {
protected MessageHandler messageHandler;
protected List<String> messages;
MessageTask(MessageHandler messageHandler,List<String> messages) {
this.messageHandler = messageHandler;
this.messages = messages;
}
@Override
public void run() {
for (String message:messages) {
handlerMessage(message);
}
}
protected abstract void handlerMessage(String message);
}
消息任务抽象类实现了Runnable线程接口,以不同的子类来实现BIO,NIO线程模型,具体在抽象方法handlerMessage中实现.
package com.guanjian;
import java.util.List;
/**
- Created by Administrator on 2018/7/10.
*/
public class SequentialMessageTask extends MessageTask {
SequentialMessageTask(MessageHandler messageHandler, List<String> messages) {
super(messageHandler, messages);
}
@Override
protected void handlerMessage(String message) {
messageHandler.execute(message);
}
}
BIO线程模型子类,通过主线程池来分配线程处理.
package com.guanjian;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
- Created by Administrator on 2018/7/10.
*/
public class ConcurrentMessageTask extends MessageTask {
private ExecutorService asyncService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
ConcurrentMessageTask(MessageHandler messageHandler, List<String> messages) {
super(messageHandler, messages);
}
@Override
protected void handlerMessage(String message) {
asyncService.submit(new Runnable() {
@Override
public void run() {
messageHandler.execute(message);
}
});
}
protected void shutdown() {
ShutdownPool.shutdownThreadPool(asyncService,"async-pool-" + Thread.currentThread().getId());
}
}
NIO线程模型,不再使用主线程池来分配线程,而是异步线程池,类比于Netty中的Worker线程池,从BOSS线程池中接管消息处理.
package com.guanjian;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
- Created by Administrator on 2018/7/10.
*/
public class ShutdownPool {
private static Logger log = LoggerFactory.getLogger(ThreadPool.class);
/**
* 优雅关闭线程池
* @param threadPool
* @param alias
*/
public static void shutdownThreadPool(ExecutorService threadPool, String alias) {
log.info("Start to shutdown the thead pool: {}", alias);
threadPool.shutdown(); // 使新任务无法提交.
try {
// 等待未完成任务结束
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
threadPool.shutdownNow(); // 取消当前执行的任务
log.warn("Interrupt the worker, which may cause some task inconsistent. Please check the biz logs.");
// 等待任务取消的响应
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS))
log.error("Thread pool can't be shutdown even with interrupting worker threads, which may cause some task inconsistent. Please check the biz logs.");
}
} catch (InterruptedException ie) {
// 重新取消当前线程进行中断
threadPool.shutdownNow();
log.error("The current server thread is interrupted when it is trying to stop the worker threads. This may leave an inconcistent state. Please check the biz logs.");
// 保留中断状态
Thread.currentThread().interrupt();
}
log.info("Finally shutdown the thead pool: {}", alias);
}
}
最后是线程池的优雅关闭,无论是主线程池还是异步线程池皆调用该方法实现优雅关闭.
以上只是模型代码,具体可替换成具体需要的业务代码来达到业务性能的提升.
关注公众号
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
线程间的协作(1)——等待/通知机制
当使用线程来同时运行多个任务时,可以使用锁(互斥)来同步两个任务的行为,从而使得一个任务不会干扰另一个任务,也就是说两个任务在交替的使用某个共享资源,使用互斥可以保证时刻只有一个任务可以访问这项资源。 解决了多线程引起的资源竞争问题,那么我们又该如何实现线程间的彼此协作?使得多个任务可以一起去解决某个问题,也就是说,现在的问题不是线程间的干涉,而是线程间如何协调,因为在这类问题中,某些部分必须在其他部分被解决之前解决。 一个线程修改了一个对象的值,而另一个线程感知到了变化,然后进行相应的操作,整个过程开始于一个线程,而最终执行又是另一个线程。前者是生产者,后者就是消费者,这种模式隔离了“做什么”(what)和“怎么做”(How),在功能层面上实现了解耦,体系结构上具备了良好的伸缩性,在Java语言中实现的简单的办法是让消费者线程不断地循环检查变量是否符合预期,如下面代码所示,在 while循环中设置不满足的条件,如果条件满足则退出while循环,从而完成消费者的工作。 while (value != desire) { Thread.sleep(1000); } doSomethi...
-
下一篇
静态变量的多线程同步问题
我们先来讨论一个问题,一个类的静态变量当类被多次实例化的时候,静态变量是否会受影响?首先我们应该清楚的是静态变量是在类被JVM classloader的时候分配内存,并且是分配在永久区而非堆内存中。 当我们用对象锁来同步静态变量的时候,我们来看一个例子。public interface OrderService { public String getOrderNo(); }先定义一个接口,获取一个订单编号。public class OrderLockServiceImpl implements OrderService { static int num = 0; @Override synchronized public String getOrderNo() { SimpleDateFormat date = new SimpleDateFormat("YYYYMMDDHHMMSS"); return date.format(new Date()) + num++; } }实现这个接口,并且用对象方法来操作静态变量。public class OrderTask implements ...
相关文章
文章评论
共有0条评论来说两句吧...

微信收款码
支付宝收款码