基于curator的延迟队列
这里不介绍关于curator的用法及优劣,旨在探究curator对于延迟队列的使用原理
怎么使用
<!--dependency-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.1</version>
</dependency>
public class Processor {
private final static CuratorFramework client;
private final static DistributedDelayQueue<String> queue;
static{
ZookeeperConfig config = ZookeeperConfig.getConfig();
// create client
client = CuratorFrameworkFactory.newClient(config.getRegistryAddress(),
new ExponentialBackoffRetry(3000, 2));
// build queue
queue = QueueBuilder.builder(client, new AutoSubmitConsumer(),
new AutoSubmitQueueSerializer(), DelayQueueEnum.AUTO_SUBMIT.getPath())
.buildDelayQueue();
// 开启执行计划
enable();
}
/**
* 生产数据
*
* @param id
* @param endTime
* @throws Exception
*/
public void producer(String id, Date endTime) throws Exception {
queue.put(id, endTime.getTime());
}
private static void enable(){
try {
client.start();
queue.start();
} catch (Exception e) {
logger.error("enable queue fail, exception:{}", e);
}
}
}
// Serializer
class AutoSubmitQueueSerializer implements QueueSerializer<String> {
@Override
public byte[] serialize(String s) {
return s.getBytes("utf-8");
}
@Override
public String deserialize(byte[] bytes) {
return new String(bytes);
}
}
// consumer
AutoSubmitConsumer implements QueueConsumer<String> {
@Override
public void consumeMessage(String id) {
logger.info("consumeMessage, :{}", id);
// service processor.
logger.info("consumeMessage# auto submit end, result:{}, id:{}", result, id);
}
@Override
public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
}
}
是临时节点还是持久化节点,如果基于内存的话客户端或者服务端挂了以后就会存在数据丢失的问题? 是否会重新排序,zk是按照请求的时间先后顺序写入的,那么curator是怎么监听到期时间的呢?
猜想
- 是否持久化
- 是否会在每次请求的时候拿到服务端所有的节点数据进行排序后存入到服务端
验证
-
针对第一点,我们关闭
zookeeper
服务端和客户端后重新启动后之前的节点还存在所以是持久化节点 -
通过客户端工具连接
zookeeper
发现并不会每次请求的时候都会重新排序,也就是说可能在client端进行处理的
以下是在客户端工具上截取的一部分信息,key是由三部分组成的,第一部分固定的queue- , 第二部分暂不确定,第三部分是节点的序号
源码求证
// org.apache.curator.framework.recipes.queue.DistributedQueue#start
// 部分片段
client.create().creatingParentContainersIfNeeded().forPath(queuePath);
if ( !isProducerOnly )
{
service.submit
(
new Callable<Object>()
{
@Override
public Object call()
{
runLoop(); // step1
return null;
}
}
);
}
// org.apache.curator.framework.recipes.queue.DistributedQueue#runLoop
// step1中的代码片段
while ( state.get() == State.STARTED )
{
try
{
ChildrenCache.Data data = (maxWaitMs > 0) ? childrenCache.blockingNextGetData(currentVersion, maxWaitMs, TimeUnit.MILLISECONDS) : childrenCache.blockingNextGetData(currentVersion);
currentVersion = data.version;
// 诸如:
//queue-|2E1D86A3BB6|0000000019
//queue-|1712F752AA0|0000000036
//queue-|1712F76FF60|0000000035
// 拿到所有的子节点
List<String> children = Lists.newArrayList(data.children);
// 根据过期时间排序
// step6
sortChildren(children);
// 排序后
//queue-|1712F752AA0|0000000036
//queue-|1712F76FF60|0000000035
//queue-|2E1D86A3BB6|0000000019
if ( children.size() > 0 )
{ //获取到期时间
maxWaitMs = getDelay(children.get(0));
if ( maxWaitMs > 0 ) continue;
}
else continue;
// 死循环不断轮询是否有满足条件的节点;
// 只要有满足条件的节点就将整个排序后的集合往下传递
processChildren(children, currentVersion); // step2
}
}
// org.apache.curator.framework.recipes.queue.DistributedQueue#processChildren
// step2对应的代码片段:
private void processChildren(List<String> children, long currentVersion)
{
final Semaphore processedLatch = new Semaphore(0);
final boolean isUsingLockSafety = (lockPath != null);
int min = minItemsBeforeRefresh;
for ( final String itemNode : children )
{
if ( Thread.currentThread().isInterrupted() )
{
processedLatch.release(children.size());
break;
}
if ( !itemNode.startsWith(QUEUE_ITEM_NAME) )
{
processedLatch.release();
continue;
}
if ( min-- <= 0 )
{
if ( refreshOnWatch && (currentVersion != childrenCache.getData().version) )
{
processedLatch.release(children.size());
break;
}
}
// step3
if ( getDelay(itemNode) > 0 )
{
processedLatch.release();
continue;
}
//这里使用了线程池,为了保证每一个节点都执行完毕后才返回方法所以使用了信号灯
executor.execute
(
new Runnable()
{
@Override
public void run()
{
try
{
//是否采用了分布式锁,因为我们初始化的时候并未使用所以没有用到这里的安全锁,实际上是进入到了else中
if ( isUsingLockSafety )
{
processWithLockSafety(itemNode, ProcessType.NORMAL);
}
else
{
// 看这里 step4
processNormally(itemNode, ProcessType.NORMAL);
}
}finally
{
processedLatch.release();
}
}
}
);
}
processedLatch.acquire(children.size());
}
// org.apache.curator.framework.recipes.queue.DistributedQueue#getDelay(java.lang.String)
// 对应step3处的代码片段
protected long getDelay(String itemNode)
{
return getDelay(itemNode, System.currentTimeMillis());
}
private long getDelay(String itemNode, long sortTime)
{ // 会从key上获取时间戳
// step5
long epoch = getEpoch(itemNode);
return epoch - sortTime; // 计算过期时间
}
// 对应step5处的代码
private static long getEpoch(String itemNode)
{
// itemNode -> queue-|时间戳|序号
int index2 = itemNode.lastIndexOf(SEPARATOR);
int index1 = (index2 > 0) ? itemNode.lastIndexOf(SEPARATOR, index2 - 1) : -1;
if ( (index1 > 0) && (index2 > (index1 + 1)) )
{
try
{
String epochStr = itemNode.substring(index1 + 1, index2);
return Long.parseLong(epochStr, 16); // 从这里可以知道queue-|这里是16进制的时间戳了|序号| 可能是出于key长度的考量吧(更节省内存),用10进制的时间戳会长很多
}
}
return 0;
}
// org.apache.curator.framework.recipes.queue.DistributedQueue#sortChildren
// 会根据延时时间排序
// step6处的代码片段
protected void sortChildren(List<String> children)
{
final long sortTime = System.currentTimeMillis();
Collections.sort
(
children,
new Comparator<String>()
{
@Override
public int compare(String o1, String o2)
{
long diff = getDelay(o1, sortTime) - getDelay(o2, sortTime);
return (diff < 0) ? -1 : ((diff > 0) ? 1 : 0);
}
}
);
}
// 对应step4处的代码片段
private boolean processNormally(String itemNode, ProcessType type) throws Exception
{
try
{
String itemPath = ZKPaths.makePath(queuePath, itemNode);
Stat stat = new Stat();
byte[] bytes = null;
if ( type == ProcessType.NORMAL )
{
// 获取key对应的value
bytes = client.getData().storingStatIn(stat).forPath(itemPath);
}
if ( client.getState() == CuratorFrameworkState.STARTED )
{
// 移除节点
client.delete().withVersion(stat.getVersion()).forPath(itemPath);
}
if ( type == ProcessType.NORMAL )
{
//step7
processMessageBytes(itemNode, bytes);
}
return true;
}
return false;
}
//对应step7处代码,会回调我们的业务代码
private ProcessMessageBytesCode processMessageBytes(String itemNode, byte[] bytes) throws Exception
{
ProcessMessageBytesCode resultCode = ProcessMessageBytesCode.NORMAL;
MultiItem<T> items;
try
{
// 根据我们定义的序列化器序列化
items = ItemSerializer.deserialize(bytes, serializer);
}
for(;;)
{
// 省略一部分代码
try
{
consumer.consumeMessage(item); // 这里就会回调到我们的业务代码
}
}
return resultCode;
}
总结
- org.apache.curator.framework.recipes.queue.DistributedQueue#internalCreateNode这个方法也证实了确实是持久化且有序的节点;
- 如果过期时间太长而数据生产的过于频繁的话,那么势必会造成数据的积压对于性能和内存都是很大的考验;
- 而且是客户端不断的循环获取所有的节点、排序、再处理,由此我们也证明了前面猜想是排序后在服务端重新添加所有节点每次监听第一个节点变化的想法看来是错误的;

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
图解kubernetes批处理Job控制器的关键设计
K8s中的批处理任务模块主要是由Job控制器完成,今天我们就来关注下其底层的关键设计,包括完成状态、并行模式、并行策略等关键机制 1. 基础概念 在聊k8s的任务模块的实现的时候,我们先看一下传统的任务系统的设计与实现,然后聊下基于k8s的基础的概念 1.1 传统的任务系统设计 传统的任务系统设计主要可以分为master(任务分配/故障感知/负载均衡)、Worker(任务执行/任务监控/任务管理)、分布式协调(etcd等存储元数据)、任务仓库(存储任务的实现比如类或者接口)等几部分, 从大的部分又可以切分为两个部分管控端(分布式协调/master/仓库)、执行端(Worker),传统的任务系统大概就是这样 通常复杂的就是如何在master如何做任务的负载均衡、任务的快速完成、依赖等管控功能,其次就是如何在worker端实现一个牛x的引擎,可以支持各种不同任务的执行环境和类型的执行 1.2 基于Pod的任务载体 k8s中的最小单元调度是Pod,同样的job控制器调度的最小单元也是Pod, Pod里面包含容器,以容器为载体k8s屏蔽了传统worker模块的任务执行环境与实现两个部分,只需要...
-
下一篇
x-easypdf v1.1.0 发布
x-easypdf基于pdfbox构建而来,极大降低使用门槛,以组件化的形式进行pdf的构建。简单、易用,3分钟即可上手,人人都是pdf的构建高手 本次更新如下: 1. 由于与现在软件名重名,故变更项目名(原为xpdf) 2. 统一源文件前缀为XEasyPdf 3. 新增水印组件 4. 新增图片组件 5. 新增模板填充 由于表格组件遇到些许问题,本次更新暂不提供,将在下个版本进行更新,感谢大家支持,谢谢
相关文章
文章评论
共有0条评论来说两句吧...