EventMesh源码解析系列(一)之HTTP Server实现
HTTP Server实现
源码解析
首先我们先讲一下HTTP Server启动前的准备工作,也就是HTTP Server的初始化。
1.在初始化的时候,会初始化多个线程池,这个线程池中会有许多的阻塞的线程队列,当某一种事件发生的时候,就会根据事件来使用线程池工厂来为事件创建线程。
【源代码片段】
public void initThreadPool() throws Exception {
// 批处理消息
BlockingQueue<Runnable> batchMsgThreadPoolQueue = new LinkedBlockingQueue<Runnable>(eventMeshHttpConfiguration.eventMeshServerBatchBlockQSize);
batchMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(eventMeshHttpConfiguration.eventMeshServerBatchMsgThreadNum,
eventMeshHttpConfiguration.eventMeshServerBatchMsgThreadNum, batchMsgThreadPoolQueue, "eventMesh-batchMsg-", true);
// 发送消息,和上面的类似
BlockingQueue<Runnable> sendMsgThreadPoolQueue=...
// 推送消息,和上面的类似
BlockingQueue<Runnable> pushMsgThreadPoolQueue =...
// 客户端管理,和上面的类似
BlockingQueue<Runnable> clientManageThreadPoolQueue =...
// 管理线程池,和上面的类似
BlockingQueue<Runnable> adminThreadPoolQueue =...
// 回复消息,和上面的类似
BlockingQueue<Runnable> replyMessageThreadPoolQueue =...
}
2.注册http请求处理器,这个处理器会针对http发过来的请求,获取到请求码,根据请求码注册对应的处理器,并且为这个事件用上面的线程池分配一个线程进行处理。
【源代码片段】
public void registerHTTPRequestProcessor() {
// 新建批量消息处理器
BatchSendMessageProcessor batchSendMessageProcessor = new BatchSendMessageProcessor(this);
// 获取请求码,并且分配一个线程进行处理
registerProcessor(RequestCode.MSG_BATCH_SEND.getRequestCode(),
batchSendMessageProcessor, batchMsgExecutor);
BatchSendMessageV2Processor batchSendMessageV2Processor =...
// 同步消息处理器,和上面的类似
SendSyncMessageProcessor sendSyncMessageProcessor =...
// 异步消息处理器,和上面的类似
SendAsyncMessageProcessor sendAsyncMessageProcessor =...
// 管理指标处理器,和上面的类似
AdminMetricsProcessor adminMetricsProcessor =...
// 心跳处理器,和上面的类似
HeartBeatProcessor heartProcessor =...
// 订阅处理器,和上面的类似
SubscribeProcessor subscribeProcessor =...
// 和上面的类似
UnSubscribeProcessor unSubscribeProcessor =...
// 回复消息处理器,和上面的类似
ReplyMessageProcessor replyMessageProcessor =...
}
3.这里就是http初始化的全部代码。
public class EventMeshHTTPServer extends AbstractHTTPServer {
...
//初始化
public void init() throws Exception {
logger.info("==================EventMeshHTTPServer Initialing==================");
// 初始化线程组
super.init("eventMesh-http");
// 初始化线程池
initThreadPool();
// 对指标初始化,主要是把生成的指标用于后台数据处理
metrics = new HTTPMetricsServer(this);
metrics.init();
// 消费者管理初始化,主要是把httpServer注册到事件总线上
consumerManager = new ConsumerManager(this);
consumerManager.init();
producerManager = new ProducerManager(this);
producerManager.init();
// 重试
httpRetryer = new HttpRetryer(this);
httpRetryer.init();
// 注册http处理器
registerHTTPRequestProcessor();
logger.info("--------------------------EventMeshHTTPServer inited");
}
...
}
初始化完成之后,再启动http的服务器端,这里我也同样聊聊以下几点。
1.AbstractHTTPServer的启动,采用的是netty的异步模型框架搭建的。具体来讲,这里创建了两个线程池:bossGroup和workerGroup,前者是用来轮询accept事件并且和client建立连接的,后者是用来轮询read和write事件并且使用handlers处理io事件的。而且这里采用了回调机制,当调用发出后,并不一定立刻就能得到结果,而是在实际处理的时候调用这个组件完成后,通过状态、通知等回调告知调用者。
public abstract class AbstractHTTPServer extends AbstractRemotingServer {
...
@Override
public void start() throws Exception {
super.start();
Runnable r = () -> {
// 创建服务器端启动的对象
ServerBootstrap b = new ServerBootstrap();
// 不进行加密通话?
SSLContext sslContext = useTLS ? SSLContextFactory.getSslContext() : null;
b.group(this.bossGroup, this.workerGroup)// 设置两个线程组
.channel(NioServerSocketChannel.class)//使用NioServerSocketChannel作为服务器的通道实现
.childHandler(new HttpsServerInitializer(sslContext))// 设置workerGroup的管道处理器
.childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);// 保持连接状态
try {
httpServerLogger.info("HTTPServer[port={}] started......", this.port);
// 这里就是对http的端口进行绑定,并且启动服务器端,采用了回调机制
ChannelFuture future = b.bind(this.port).sync();
//关闭通道事件进行监听
future.channel().closeFuture().sync();
} catch (Exception e) {
httpServerLogger.error("HTTPServer start Err!", e);
try {
// 关闭资源
shutdown();
} catch (Exception e1) {
httpServerLogger.error("HTTPServer shutdown Err!", e);
}
return;
}
};
Thread t = new Thread(r, "eventMesh-http-server");
t.start();
started.compareAndSet(false, true);
}
...
}
2.设置管道处理器部分,当channel被注册之后,这个类中的initChannel方法就会被调用,也会执行在管道后面加入的handlers。
class HttpsServerInitializer extends ChannelInitializer<SocketChannel> {
private SSLContext sslContext;
public HttpsServerInitializer(SSLContext sslContext) {
this.sslContext = sslContext;
}
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
if (sslContext != null && useTLS) {
SSLEngine sslEngine = sslContext.createSSLEngine();
sslEngine.setUseClientMode(false);
pipeline.addFirst("ssl", new SslHandler(sslEngine));
}
// 在管道后面加入handlers
pipeline.addLast(new HttpRequestDecoder(),// 这个是对http解码
new HttpResponseEncoder(),// 这个是对http的响应编码
new HttpConnectionHandler(),// 这个是对http连接处理
new HttpObjectAggregator(Integer.MAX_VALUE),// 这个是http对象聚合
new HTTPHandler());// 这个是http的Handler的具体实现
}
}
}
3.这里附上start的源码部分。
public class EventMeshHTTPServer extends AbstractHTTPServer {
...
public void start() throws Exception {
super.start();
// 指标
metrics.start();
// 消费者管理
consumerManager.start();
// 生产者管理
producerManager.start();
// 重试
httpRetryer.start();
logger.info("--------------------------EventMeshHTTPServer started");
}
...
}
到此,EventMesh的HTTP Server实现部分源码解析就结束了。
注:本文内容由社区小伙伴陈创慧提供,原文地址:https://blog.csdn.net/CodePlayMe/article/details/120671622
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Taro React Native 3 个更新帮助开发者高效开发APP
Taro React Native 开源项目重大更新来了,全方位降低上手成本,提升开发体验。全流程自动化,让开发者摆脱原生环境配置,专注前端开发。 Taro 3.2.0 正式版本发布至今,已过去半年。在此期间,有不少社区开发者已经使用上 Taro 来开发 APP 了。看到社区的使用量越来越多,开发团队也是收获满满。 同时我们也收到了很多来自开发者的反馈,主要集中于开发环境配置复杂、组件和 API 的完善度不够及使用上的 BUG 等。对于组件和 API 的完善度及使用上的 BUG,我们都是尽可能地及时地处理并发布新版本。然而,对于开发者反馈的开发环境配置的问题,却很难复现及解决。 首先 Android + iOS + React Native + Taro,4个技术的各种环境配置,会让很多开发者望而却步。其次开发者面对的环境问题千奇百怪,很多问题难以通过远程协助解决。不少开发者在调研阶段,因为无法顺利运行,便放弃了使用。对于一个跨平台框架来说,主要目的是提效,而非给开发者带来更多困难。开发环境配置问题的解决,显得尤为重要。 这次我们从以下三个方向去优化整个开发流程,全面降低上手成本,让 ...
- 下一篇
每秒创建百万文件,百度沧海·文件存储CFS推出新一代Namespace架构
随着移动互联网、物联网、AI 计算等技术和市场的迅速发展,数据规模指数级膨胀,对于分布式文件系统作为大规模数据场景的存储底座提出了更高的要求。已有分布式文件系统解决方案存在着短板,只能适应有限的场景: >>新型分布式文件系统无法承接传统领域内的所有 WorkLoad:通过只支持部分 POSIX 接口来简化系统设计,无法完全兼容 POSIX 协议。 >>传统分布式文件系统无法支持海量小文件场景:为了保证低延迟,元数据的可扩展性较差、随文件规模性能和稳定性下降严重,无法支持如 AI 训练、自动驾驶等文件规模达到十亿甚至百亿规模的 AI 场景。 因此,设计出一款不仅能完美兼容传统应用,又能适应最新 AI 场景需求的分布式文件存储,显得意义重大。这样的分布式文件系统需要满足: 完全兼容 POSIX 协议。 在确保元数据低延迟、稳定的情况下,可线性扩展,支持百亿文件规模,具备超大规模文件数量元数据操作能力的同时具备超高的性能稳定性。 要想达到以上目标,百度沧海·文件存储 CFS 给出的技术解答是设计新一代的 Namespace 子系统,在实现创建文件每秒百万级 QPS 的...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS8安装Docker,最新的服务器搭配容器使用
- CentOS7安装Docker,走上虚拟化容器引擎之路
- Linux系统CentOS6、CentOS7手动修改IP地址
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- Red5直播服务器,属于Java语言的直播服务器
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- Windows10,CentOS7,CentOS8安装Nodejs环境
- SpringBoot2整合Thymeleaf,官方推荐html解决方案