Databend query result cache 设计与实现
Databend 在 1.0 中支持了对查询结果集的缓存,大大提高了多次相同查询返回结果的效率。
Query result cache 主要用于处理数据更新频率不高的查询,它通过缓存第一次查询返回的结果集,以便在之后对相同数据执行相同查询时能够立即返回结果,从而提高查询效率。
比如我们有个需求是每隔十秒获取一次销量前 5 的产品,通过以下 sql 执行查询:
SELECT product, count(product) AS sales_count FROM sales_log GROUP BY product ORDER BY sales_count DESC LIMIT 5;
在没有 cache 的情况下,每次都需要执行完整的 sql 查询流程,而整个流程可能耗时比较久,但结果仅仅返回5条数据。如果 sales_count 表中的数据更新频率不高,那么通过 cache 可以立即返回之后查询的结果,大大降低了等待时间和 Server 的负载。
整体设计
Query Result Cache 的生命周期
每个被缓存的结果集都会设置一个缓存失效时间(TTL),每次对相同缓存结果集的访问都会刷新失效时间,缓存的默认失效时间为 300 秒,可以通过设置 query_result_cache_ttl_secs
来修改。当失效时间到达后,缓存的结果集将不再可用。
除了 TTL 之外,如果底层数据(如 snapshot id、segment id、partition location)发生变化,缓存就会变得不准确。但是,这种底层数据的修改不会影响缓存的效果。如果仍然希望快速返回结果集,可以通过设置 SET query_result_cache_allow_inconsistent=1
来允许返回不一致的结果。如果您对 Databend 底层存储结构感兴趣,可以参考 Databend 存储概览
缓存结果存储
Databend 使用键值对来存储查询结果集,对于每一次查询, Databend 根据 query 信息构造一个对应的 key,然后将查询结果集的一些元信息构造成 value 存入到 meta service 中。
其中 Key 的生成规则为:
// 将 ast 序列化为 string,然后通过 hash 函数拿到对应的 hash 值 let ast_hash = sha256(formatted_ast); // 将 result cache 的前缀,当前租户和上面生成的 hash 值拼接,得到最终 key let key = format!("{RESULT_CACHE_PREFIX}/{tenant}/{ast_hash}");
Value 的结构如下(注意:value 中只存储对应结果集的元信息,真正的结果集会写到当前使用的 storage 中,比如 local fs, s3...):
struct ResultCacheValue { /// 原始查询 SQL. pub sql: String, /// 该次查询的 query_id pub query_id: String, /// 查询持续时间. pub query_time: u64, /// 缓存失效时间 pub ttl: u64, /// 结果集大小,单位:字节 pub result_size: usize, /// 结果集一共包含多少行数据 pub num_rows: usize, /// 查询命中的 partitions 的 hash 值,每个表一个 hash 值 pub partitions_shas: Vec<String>, /// 结果集缓存文件在底层存储中的地址 pub location: String, }
读取 cache
读 cache 流程比较简单,通过以下伪代码说明:
// 通过格式化之后的 ast 来生成查询语句对应的 key let key = gen_result_cache_key(formatted_ast); // 构建 cache reader let cache_reader = ResultCacheReader::create(ctx, key, meta_client, allow_inconsistent); // cache reader 首先从 meta service 中通过 key 得到对应的 ResultCacheValue // ResultCacheValue 的结构见之前的代码段 let value = cache_reader.get(key) // 如果可以容忍不一致,或者查询覆盖的 partitions 的 hash 值相同 // 就会通过 location 去底层存储读取缓存结果集,然后返回。 if allow_inconsistent || value.partitions_shas == ctx.partitions_shas { read_result_from_cache(&value.location) }
写入 cache
┌─────────┐ 1 ┌─────────┐ 1 │ ├───►│ ├───►Dummy───►Downstream Upstream──►│Duplicate│ 2 │ │ 3 │ ├───►│ ├───►Dummy───►Downstream └─────────┘ │ │ │ Shuffle │ ┌─────────┐ 3 │ │ 2 ┌─────────┐ │ ├───►│ ├───►│ Write │ Upstream──►│Duplicate│ 4 │ │ 4 │ Result │ │ ├───►│ ├───►│ Cache │ └─────────┘ └─────────┘ └─────────┘
写 cache 的主要流程如上图所示,当一个查询执行没有命中 cache 时,就会触发写 cache 流程。
Databend 使用 pipeline 方式调度和处理读写任务,通常的 pipeline 流程是 source -> transform -> transform .. -> sink
, 写 cache 会增加一个 sink 出口,因此需要首先并行的加一条管道来复制上游数据 (图中 duplicate 部分)
而由于 pipeline 中前置节点的 output port
和后置节点的 input port
是一一对应的,所以这里我们通过 shuffle 来重排序,以此来衔接前后处理节点。
注意事项
如果 query 中使用了不确定性的函数,比如 now()
, rand()
, uuid()
,那么结果集将不会被 cache,另外 system 下的表也不会被 cache。
另外目前结果集最大缓存 1MiB 的数据,可以通过设置 query_result_cache_max_bytes
来调整允许 cache 的大小。
使用方式
相关设置
// 进行如下设置开启 query result cache, // 后续 databend 将会默认打开这个设置 SET enable_query_result_cache=1; // 进行如下设置来容忍不准确的结果 SET query_result_cache_allow_inconsistent=1;
测试 cache 是否生效
SET enable_query_result_cache=1; SELECT WatchID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM hits GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10; +---------------------+-------------+------+----------------+----------------------+ | watchid | clientip | c | sum(isrefresh) | avg(resolutionwidth) | +---------------------+-------------+------+----------------+----------------------+ | 6655575552203051303 | 1611957945 | 2 | 0 | 1638.0 | | 8566928176839891583 | -1402644643 | 2 | 0 | 1368.0 | | 7904046282518428963 | 1509330109 | 2 | 0 | 1368.0 | | 7224410078130478461 | -776509581 | 2 | 0 | 1368.0 | | 5957995970499767542 | 1311505962 | 1 | 0 | 1368.0 | | 5295730445754781367 | 1398621605 | 1 | 0 | 1917.0 | | 8635802783983293129 | 900266514 | 1 | 1 | 1638.0 | | 5650467702003458413 | 1358200733 | 1 | 0 | 1368.0 | | 6470882100682188891 | -1911689457 | 1 | 0 | 1996.0 | | 6475474889432602205 | 1501294204 | 1 | 0 | 1368.0 | +---------------------+-------------+------+----------------+----------------------+ 10 rows in set (3.255 sec) SELECT WatchID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM hits GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10; +---------------------+-------------+------+----------------+----------------------+ | watchid | clientip | c | sum(isrefresh) | avg(resolutionwidth) | +---------------------+-------------+------+----------------+----------------------+ | 6655575552203051303 | 1611957945 | 2 | 0 | 1638.0 | | 8566928176839891583 | -1402644643 | 2 | 0 | 1368.0 | | 7904046282518428963 | 1509330109 | 2 | 0 | 1368.0 | | 7224410078130478461 | -776509581 | 2 | 0 | 1368.0 | | 5957995970499767542 | 1311505962 | 1 | 0 | 1368.0 | | 5295730445754781367 | 1398621605 | 1 | 0 | 1917.0 | | 8635802783983293129 | 900266514 | 1 | 1 | 1638.0 | | 5650467702003458413 | 1358200733 | 1 | 0 | 1368.0 | | 6470882100682188891 | -1911689457 | 1 | 0 | 1996.0 | | 6475474889432602205 | 1501294204 | 1 | 0 | 1368.0 | +---------------------+-------------+------+----------------+----------------------+ 10 rows in set (0.066 sec)
可以看到,相同的查询,第二次的结果是立即返回的。
RESULT_SCAN
Query result cache 同时提供了 RESULT_SCAN
的 table function,在同一个 session 中,可以快速根据 query_id 来拿到之前查询的结果,使用方式可以参考文档。
另外用户可以通过 SELECT * from system.query_cache
来获取当前租户下被 cache 所有结果集的元信息,包括
sql | 结果集对应的原始 sql |
---|---|
query_id | 结果集对应的 query id |
result_size | 缓存结果集的大小 |
num_rows | 缓存结果集的行数 |
partitions_sha | 查询对应 partitions 的 hash 值 |
location | 缓存结果集在存储中的地址 |
active_result_scan | 为 true 表示可以被 result_scan 使用 |
未来规划
- 缓存数据清理:当前缓存的结果集在 TTL 到期后不可用,但是底层数据并未被清理,未来可以有个定时任务去清理过期数据
- 对缓存结果进行压缩,进一步节省空间
- 对复合 SQL 进行结果集缓存,比如:(
INSERT INTO xxx SELECT ...
,COPY FROM SELECT
)
对以上改进感兴趣的同学欢迎为 Databend 添砖加瓦。
致谢
Databend 结果集缓存的设计参考了 ClickHouse 和 Snowflake,如果想进一步跟进 query result cache 的细节,请参考以下链接:
- Databend Query Result Cache RFC
- Query Cache in ClickHouse
- ClickHouse query cache blog
- Snowflake RESULT_SCAN function
- Tuning the Result Cache in Oracle
关于 Databend
Databend 是一款开源、弹性、低成本,基于对象存储也可以做实时分析的新式数仓。期待您的关注,一起探索云原生数仓解决方案,打造新一代开源 Data Cloud。
Databend Cloud:https://databend.cn
Databend 文档:https://databend.rs/
Wechat:Databend

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
如何通过DAS连接GaussDB
文章目录 1 实验介绍 2 实验目的 3 配置DAS服务 4 SQL使用入门 1 实验介绍 本实验主要描述如何通过华为云数据管理服务 (Data Admin Service,简称DAS) 来连接华为云GaussDB数据库实例,DAS是一款专业的简化数据库管理工具,提供优质的可视化操作界面,大幅提高工作效率,让数据管理变得既安全又简单。 DAS连接数据库,无需使用IP地址,易用、安全、高级、智能。 2 实验目的 掌握DAS连接GaussDB数据库实例。 3 配置DAS服务 步骤 1进入DAS服务。 在服务列表,选择数据库中的数据库管理服务DAS。 步骤 2设置DAS连接服务。 选择“进入开发工具”。 单击“新增数据库实例登录”,具体如下: 在此设置页面,“数据库引擎”选择GaussDB ,然后在“数据库来源”中就会出现前面安装好了的GaussDB数据库实例,接着选中想要连接的实例。 设置登录用户名及密码,然后先测试下连接,测试成功后会提示“连接成功”,勾选“记住密码”、打开“定时采集”,设置完成后单击“确定”。 在这里插入图片描述 新增完成,通过单击操作中的“登录”可以进入到相应的数据库...
- 下一篇
提升研发交付速率,从正确的指标管理开始
每当提及「研发效能」,我们都在谈论什么? 研发效能管理要在保证质量的前提下,思考如何更快地向客户交付价值。在管理实践中,效能度量涉及三大维度:交付速率、交付质量、交付价值。 技术团队对内如何优化开发流程,以提升交付速率和质量?对外如何围绕价值交付,与产品、业务侧同事开展紧密高效的研发协作?在众多急需攻破的效能难题中,Cycle Time 都是极为关键的速率管理发力点。 01 是什么 Cycle Time? Cycle Time 原是精益生产的专业术语,描述了某个工序制造一单位产品或某过程完成一个工作循环所需的平均完整时间,可以确定机器或工序的生产能力和效率。 在软件研发中,Cycle Time 是指技术团队从头到尾完成一单位研发工作平均需要的时间,即研发工作从进入开发到发布上线所经历的平均时间。 02 为什么应该关注 Cycle Time? Cycle Time 是反映技术团队工作速率的结果度量指标,可以帮助团队识别障碍、有的放矢地优化改进并实现更快更好的价值交付。 更快地响应。 缩短 Cycle Time 的本质是更快地向客户交付价值,响应变化。 识别障碍和待改进空间。 跟踪对比多项...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- CentOS8编译安装MySQL8.0.19
- CentOS6,CentOS7官方镜像安装Oracle11G
- CentOS7,8上快速安装Gitea,搭建Git服务器
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- MySQL8.0.19开启GTID主从同步CentOS8
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- Red5直播服务器,属于Java语言的直播服务器
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7