谈反应式编程在服务端中的应用,数据库操作优化,从20秒到0.5秒
反应式编程在客户端编程当中的应用相当广泛,而当前在服务端中的应用相对被提及较少。本篇将介绍如何在服务端编程中应用响应时编程来改进数据库操作的性能。
开篇就是结论
利用 System.Reactive 配合 TaskCompelteSource ,可以将分散的单次数据库插入请求合并会一个批量插入的请求。在确保正确性的前提下,实现数据库插入性能的优化。
如果读者已经了解了如何操作,那么剩下的内容就不需要再看了。
预设条件
现在,我们假设存在这样一个 Repository 接口来表示一次数据库的插入操作。
namespace Newbe.RxWorld.DatabaseRepository { public interface IDatabaseRepository { /// <summary> /// Insert one item and return total count of data in database /// </summary> /// <param name="item"></param> /// <returns></returns> Task<int> InsertData(int item); } }
接下来,我们在不改变该接口签名的前提下,体验一下不同的实现带来的性能区别。
基础版本
首先是基础版本,采用的是最为常规的单次数据库INSERT
操作来完成数据的插入。本示例采用的是SQLite
作为演示数据库,方便读者自行实验。
namespace Newbe.RxWorld.DatabaseRepository.Impl { public class NormalDatabaseRepository : IDatabaseRepository { private readonly IDatabase _database; public NormalDatabaseRepository( IDatabase database) { _database = database; } public Task<int> InsertData(int item) { return _database.InsertOne(item); } } }
常规操作。其中_database.InsertOne(item)
的具体实现就是调用了一次INSERT
。
基础版本在同时插入小于20次时基本上可以较快的完成。但是如果数量级增加,例如需要同时插入一万条数据库,将会花费约20秒钟,存在很大的优化空间。
TaskCompelteSource
TaskCompelteSource 是 TPL 库中一个可以生成一个可操作 Task 的类型。对于 TaskCompelteSource 不太熟悉的读者可以通过该实例代码了解。
此处也简单解释一下该对象的作用,以便读者可以继续阅读。
对于熟悉 javascript 的朋友,可以认为 TaskCompelteSource 相当于 Promise 对象。也可以相当于 jQuery 当中的 $.Deferred 。
如果都不了解的朋友,可以听一下笔者吃麻辣烫时想到的生活化例子。
吃麻辣烫 | 技术解释 |
---|---|
吃麻辣烫之前,需要先用盘子夹菜。 | 构造参数 |
夹好菜之后,拿到结账处去结账 | 调用方法 |
收银员结账完毕之后,会得到一个叫餐牌,会响铃的那种 | 得到一个 Task 返回值 |
拿着菜牌找了一个位子坐下,玩手机等餐 | 正在 await 这个 Task ,CPU转而处理其他事情 |
餐牌响了,去取餐,吃起来 | Task 完成,await 节数,继续执行下一行代码 |
那么 TaskCompelteSource 在哪儿呢?
首先,根据上面的例子,在餐牌响的时候,我们才会去取餐。那么餐牌什么时候才会响呢?当然是服务员手动按了一个在柜台的手动开关才触发了这个响铃。
那么,柜台的这个开关,可以被技术解释为 TaskCompelteSource 。
餐台开关可以控制餐牌的响铃。同样, TaskCompelteSource 就是一种可以控制 Task 的状态的对象。
解决思路
有了前面对 TaskCompelteSource 的了解,那么接下来就可以解决文章开头的问题了。思路如下:
当调用 InsertData 时,可以创建一个 TaskCompelteSource 以及 item 的元组。为了方便说明,我们将这个元组命名为BatchItem
。
将 BatchItem 的 TaskCompelteSource 对应的 Task 返回出去。
调用 InsertData 的代码会 await 返回的 Task,因此只要不操作 TaskCompelteSource ,调用者就一会一直等待。
然后,另外启动一个线程,定时将 BatchItem 队列消费掉。
这样就完成了单次插入变为批量插入的操作。
笔者可能解释的不太清楚,不过以下所有版本的代码均基于以上思路。读者可以结合文字和代码进行理解。
ConcurrentQueue 版本
基于以上的思路,我们采用 ConcurrentQueue 作为 BatchItem 队列进行实现,代码如下(代码很多,不必纠结,因为下面还有更简单的):
namespace Newbe.RxWorld.DatabaseRepository.Impl { public class ConcurrentQueueDatabaseRepository : IDatabaseRepository { private readonly ITestOutputHelper _testOutputHelper; private readonly IDatabase _database; private readonly ConcurrentQueue<BatchItem> _queue; // ReSharper disable once PrivateFieldCanBeConvertedToLocalVariable private readonly Task _batchInsertDataTask; public ConcurrentQueueDatabaseRepository( ITestOutputHelper testOutputHelper, IDatabase database) { _testOutputHelper = testOutputHelper; _database = database; _queue = new ConcurrentQueue<BatchItem>(); // 启动一个 Task 消费队列中的 BatchItem _batchInsertDataTask = Task.Factory.StartNew(RunBatchInsert, TaskCreationOptions.LongRunning); _batchInsertDataTask.ConfigureAwait(false); } public Task<int> InsertData(int item) { // 生成 BatchItem ,将对象放入队列。返回 Task 出去 var taskCompletionSource = new TaskCompletionSource<int>(); _queue.Enqueue(new BatchItem { Item = item, TaskCompletionSource = taskCompletionSource }); return taskCompletionSource.Task; } // 从队列中不断获取 BatchItem ,并且一批一批插入数据库,更新 TaskCompletionSource 的状态 private void RunBatchInsert() { foreach (var batchItems in GetBatches()) { try { BatchInsertData(batchItems).Wait(); } catch (Exception e) { _testOutputHelper.WriteLine($"there is an error : {e}"); } } IEnumerable<IList<BatchItem>> GetBatches() { var sleepTime = TimeSpan.FromMilliseconds(50); while (true) { const int maxCount = 100; var oneBatchItems = GetWaitingItems() .Take(maxCount) .ToList(); if (oneBatchItems.Any()) { yield return oneBatchItems; } else { Thread.Sleep(sleepTime); } } IEnumerable<BatchItem> GetWaitingItems() { while (_queue.TryDequeue(out var item)) { yield return item; } } } } private async Task BatchInsertData(IEnumerable<BatchItem> items) { var batchItems = items as BatchItem[] ?? items.ToArray(); try { // 调用数据库的批量插入操作 var totalCount = await _database.InsertMany(batchItems.Select(x => x.Item)); foreach (var batchItem in batchItems) { batchItem.TaskCompletionSource.SetResult(totalCount); } } catch (Exception e) { foreach (var batchItem in batchItems) { batchItem.TaskCompletionSource.SetException(e); } throw; } } private struct BatchItem { public TaskCompletionSource<int> TaskCompletionSource { get; set; } public int Item { get; set; } } } }
以上代码中使用了较多的 Local Function 和 IEnumerable 的特性,不了解的读者可以点击此处进行了解。
正片开始!
接下来我们使用 System.Reactive 来改造上面较为复杂的 ConcurrentQueue 版本。如下:
namespace Newbe.RxWorld.DatabaseRepository.Impl { public class AutoBatchDatabaseRepository : IDatabaseRepository { private readonly ITestOutputHelper _testOutputHelper; private readonly IDatabase _database; private readonly Subject<BatchItem> _subject; public AutoBatchDatabaseRepository( ITestOutputHelper testOutputHelper, IDatabase database) { _testOutputHelper = testOutputHelper; _database = database; _subject = new Subject<BatchItem>(); // 将请求进行分组,每50毫秒一组或者每100个一组 _subject.Buffer(TimeSpan.FromMilliseconds(50), 100) .Where(x => x.Count > 0) // 将每组数据调用批量插入,写入数据库 .Select(list => Observable.FromAsync(() => BatchInsertData(list))) .Concat() .Subscribe(); } // 这里和前面对比没有变化 public Task<int> InsertData(int item) { var taskCompletionSource = new TaskCompletionSource<int>(); _subject.OnNext(new BatchItem { Item = item, TaskCompletionSource = taskCompletionSource }); return taskCompletionSource.Task; } // 这段和前面也完全一样,没有变化 private async Task BatchInsertData(IEnumerable<BatchItem> items) { var batchItems = items as BatchItem[] ?? items.ToArray(); try { var totalCount = await _database.InsertMany(batchItems.Select(x => x.Item)); foreach (var batchItem in batchItems) { batchItem.TaskCompletionSource.SetResult(totalCount); } } catch (Exception e) { foreach (var batchItem in batchItems) { batchItem.TaskCompletionSource.SetException(e); } throw; } } private struct BatchItem { public TaskCompletionSource<int> TaskCompletionSource { get; set; } public int Item { get; set; } } } }
代码减少了 50 行,主要原因就是使用 System.Reactive 中提供的很强力的 Buffer 方法实现了 ConcurrentQueue 版本中的复杂的逻辑实现。
老师,可以更给力一点吗?
我们,可以“稍微”优化一下代码,将 Buffer 以及相关的逻辑独立于“数据库插入”这个业务逻辑。那么我们就会得到一个更加简单的版本:
namespace Newbe.RxWorld.DatabaseRepository.Impl { public class FinalDatabaseRepository : IDatabaseRepository { private readonly IBatchOperator<int, int> _batchOperator; public FinalDatabaseRepository( IDatabase database) { var options = new BatchOperatorOptions<int, int> { BufferTime = TimeSpan.FromMilliseconds(50), BufferCount = 100, DoManyFunc = database.InsertMany, }; _batchOperator = new BatchOperator<int, int>(options); } public Task<int> InsertData(int item) { return _batchOperator.CreateTask(item); } } }
其中 IBatchOperator 等代码,读者可以到代码库中进行查看,此处就不在陈列了。
性能测试
基本可以测定如下:
在 10 条数据并发操作时,原始版本和批量版本没有多大区别。甚至批量版本在数量少时会更慢,毕竟其中存在一个最大 50 毫秒的等待时间。
但是,如果需要批量操作并发操作一万条数据,那么原始版本可能需要消耗20秒,而批量版本仅仅只需要0.5秒。
所有的示例代码均可以在代码库中找到。如果 Github Clone 存在困难,也可以点击此处从 Gitee 进行 Clone
最后但是最重要!
最近作者正在构建以反应式
、Actor模式
和事件溯源
为理论基础的一套服务端开发框架。希望为开发者提供能够便于开发出“分布式”、“可水平扩展”、“可测试性高”的应用系统——Newbe.Claptrap
本篇文章是该框架的一篇技术选文,属于技术构成的一部分。如果读者对该内容感兴趣,欢迎转发、评论、收藏文章以及项目。您的支持是促进项目成功的关键。
当前项目已经快要发布 0.1 alpha 版本,欢迎参与讨论。
GitHub 项目地址:https://github.com/newbe36524/Newbe.Claptrap
Gitee 项目地址:https://gitee.com/yks/Newbe.Claptrap
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
【JVM】如何理解强引用、软引用、弱引用、虚引用?
【JVM】如何理解强引用、软引用、弱引用、虚引用?整体架构 强引用强引用是默认支持,当内存不足的时候,JVM开始垃圾回收,对于强引用的对象,就算是出现了OOM也不会回收对象。 强引用是最常见的普通对象引用,只要还有强引用指向对象,对象就存活,垃圾回收器不会处理存活对象。一般把一个对象赋给一个引用变量,这个引用变量就是强引用。当一个对象被强引用变量所引用,它就处于可达状态,是不会被垃圾回收的,即使之后都不会再用到了,也不会回收。因此强引用是造成Java内存泄漏的主要原因之一。 关于Java内存泄漏的详细内容,可以参考这篇博客:https://blog.csdn.net/m0_38110132/article/details/81986334。 对于一个普通对象,如果没有其他引用关系,只要超过了引用的作用域或者显式地将相应的强引用赋值为null,一般认为就是可以被垃圾回收了。(具体的回收时机看垃圾回收策略) 下例中,b就是强引用。 1 public static void main(String[] args) {2 Object a = new Object();3 Object b =...
- 下一篇
基于 Serverless 架构的编程学习小工具
之前我做过一个在线编程的软件,目前用户量大概有几十万,通过这个 App 不仅仅可以进行代码的编写、运行还可以进行编程的学习。自己一直对 Serverless 架构情有独钟,恰好赶到我的这个 App 学习板块被很多人吐槽难用,索性就对这个学习板块进行重构,并且打算在重构的时候,直接将这个学习板块搬上 Serverless 架构。 本文作者 Anycodes 基于 Serverless 架构重构是出于两个方面考虑 —— 一是 Serverless 架构能让个人开发者的运维工作变得简单,尤其是不用操心服务器,也不用关心流量洪峰(当然,对于我的个人项目而言,也没太多的洪峰),二是 Serverless 架构的按量付费,极大节约了成本。 整体设计 数据库设计 这个部分在之前是若干个大模块,现在统一整理到一个模块中进行项目重构,所以这里继续复用之前的数据库: 在这个数据库中,四个模块分别是:新闻文章、开发文档、基础教程以及图书资源。其中开发文档包括大分类,子列表以及正文等内容,这里表关联并没有使用外键,而是直接用的 ID 进行表之间的关联。 说实话,这个数据库设计的并不是很好,原因是因为初次构建这...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
-
Docker使用Oracle官方镜像安装(12C,18C,19C)
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS8编译安装MySQL8.0.19
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
推荐阅读
最新文章
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- CentOS7,CentOS8安装Elasticsearch6.8.6
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- CentOS6,CentOS7官方镜像安装Oracle11G
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- 设置Eclipse缩进为4个空格,增强代码规范
- Mario游戏-低调大师作品
- MySQL8.0.19开启GTID主从同步CentOS8