每日一博 | 谈反应式编程在服务端中的应用
反应式编程在客户端编程当中的应用相当广泛,而当前在服务端中的应用相对被提及较少。本篇将介绍如何在服务端编程中应用响应时编程来改进数据库操作的性能。
开篇就是结论
利用 System.Reactive 配合 TaskCompelteSource ,可以将分散的单次数据库插入请求合并会一个批量插入的请求。在确保正确性的前提下,实现数据库插入性能的优化。
如果读者已经了解了如何操作,那么剩下的内容就不需要再看了。
预设条件
现在,我们假设存在这样一个 Repository 接口来表示一次数据库的插入操作。
namespace Newbe.RxWorld.DatabaseRepository { public interface IDatabaseRepository { /// <summary> /// Insert one item and return total count of data in database /// </summary> /// <param name="item"></param> /// <returns></returns> Task<int> InsertData(int item); } }
接下来,我们在不改变该接口签名的前提下,体验一下不同的实现带来的性能区别。
基础版本
首先是基础版本,采用的是最为常规的单次数据库INSERT
操作来完成数据的插入。本示例采用的是SQLite
作为演示数据库,方便读者自行实验。
namespace Newbe.RxWorld.DatabaseRepository.Impl { public class NormalDatabaseRepository : IDatabaseRepository { private readonly IDatabase _database; public NormalDatabaseRepository( IDatabase database) { _database = database; } public Task<int> InsertData(int item) { return _database.InsertOne(item); } } }
常规操作。其中_database.InsertOne(item)
的具体实现就是调用了一次INSERT
。
基础版本在同时插入小于20次时基本上可以较快的完成。但是如果数量级增加,例如需要同时插入一万条数据库,将会花费约20秒钟,存在很大的优化空间。
TaskCompelteSource
TaskCompelteSource 是 TPL 库中一个可以生成一个可操作 Task 的类型。对于 TaskCompelteSource 不太熟悉的读者可以通过该实例代码了解。
此处也简单解释一下该对象的作用,以便读者可以继续阅读。
对于熟悉 javascript 的朋友,可以认为 TaskCompelteSource 相当于 Promise 对象。也可以相当于 jQuery 当中的 $.Deferred 。
如果都不了解的朋友,可以听一下笔者吃麻辣烫时想到的生活化例子。
吃麻辣烫 | 技术解释 |
---|---|
吃麻辣烫之前,需要先用盘子夹菜。 | 构造参数 |
夹好菜之后,拿到结账处去结账 | 调用方法 |
收银员结账完毕之后,会得到一个叫餐牌,会响铃的那种 | 得到一个 Task 返回值 |
拿着菜牌找了一个位子坐下,玩手机等餐 | 正在 await 这个 Task ,CPU转而处理其他事情 |
餐牌响了,去取餐,吃起来 | Task 完成,await 节数,继续执行下一行代码 |
那么 TaskCompelteSource 在哪儿呢?
首先,根据上面的例子,在餐牌响的时候,我们才会去取餐。那么餐牌什么时候才会响呢?当然是服务员手动按了一个在柜台的手动开关才触发了这个响铃。
那么,柜台的这个开关,可以被技术解释为 TaskCompelteSource 。
餐台开关可以控制餐牌的响铃。同样, TaskCompelteSource 就是一种可以控制 Task 的状态的对象。
解决思路
有了前面对 TaskCompelteSource 的了解,那么接下来就可以解决文章开头的问题了。思路如下:
当调用 InsertData 时,可以创建一个 TaskCompelteSource 以及 item 的元组。为了方便说明,我们将这个元组命名为BatchItem
。
将 BatchItem 的 TaskCompelteSource 对应的 Task 返回出去。
调用 InsertData 的代码会 await 返回的 Task,因此只要不操作 TaskCompelteSource ,调用者就一会一直等待。
然后,另外启动一个线程,定时将 BatchItem 队列消费掉。
这样就完成了单次插入变为批量插入的操作。
笔者可能解释的不太清楚,不过以下所有版本的代码均基于以上思路。读者可以结合文字和代码进行理解。
ConcurrentQueue 版本
基于以上的思路,我们采用 ConcurrentQueue 作为 BatchItem 队列进行实现,代码如下(代码很多,不必纠结,因为下面还有更简单的):
namespace Newbe.RxWorld.DatabaseRepository.Impl { public class ConcurrentQueueDatabaseRepository : IDatabaseRepository { private readonly ITestOutputHelper _testOutputHelper; private readonly IDatabase _database; private readonly ConcurrentQueue<BatchItem> _queue; // ReSharper disable once PrivateFieldCanBeConvertedToLocalVariable private readonly Task _batchInsertDataTask; public ConcurrentQueueDatabaseRepository( ITestOutputHelper testOutputHelper, IDatabase database) { _testOutputHelper = testOutputHelper; _database = database; _queue = new ConcurrentQueue<BatchItem>(); // 启动一个 Task 消费队列中的 BatchItem _batchInsertDataTask = Task.Factory.StartNew(RunBatchInsert, TaskCreationOptions.LongRunning); _batchInsertDataTask.ConfigureAwait(false); } public Task<int> InsertData(int item) { // 生成 BatchItem ,将对象放入队列。返回 Task 出去 var taskCompletionSource = new TaskCompletionSource<int>(); _queue.Enqueue(new BatchItem { Item = item, TaskCompletionSource = taskCompletionSource }); return taskCompletionSource.Task; } // 从队列中不断获取 BatchItem ,并且一批一批插入数据库,更新 TaskCompletionSource 的状态 private void RunBatchInsert() { foreach (var batchItems in GetBatches()) { try { BatchInsertData(batchItems).Wait(); } catch (Exception e) { _testOutputHelper.WriteLine($"there is an error : {e}"); } } IEnumerable<IList<BatchItem>> GetBatches() { var sleepTime = TimeSpan.FromMilliseconds(50); while (true) { const int maxCount = 100; var oneBatchItems = GetWaitingItems() .Take(maxCount) .ToList(); if (oneBatchItems.Any()) { yield return oneBatchItems; } else { Thread.Sleep(sleepTime); } } IEnumerable<BatchItem> GetWaitingItems() { while (_queue.TryDequeue(out var item)) { yield return item; } } } } private async Task BatchInsertData(IEnumerable<BatchItem> items) { var batchItems = items as BatchItem[] ?? items.ToArray(); try { // 调用数据库的批量插入操作 var totalCount = await _database.InsertMany(batchItems.Select(x => x.Item)); foreach (var batchItem in batchItems) { batchItem.TaskCompletionSource.SetResult(totalCount); } } catch (Exception e) { foreach (var batchItem in batchItems) { batchItem.TaskCompletionSource.SetException(e); } throw; } } private struct BatchItem { public TaskCompletionSource<int> TaskCompletionSource { get; set; } public int Item { get; set; } } } }
以上代码中使用了较多的 Local Function 和 IEnumerable 的特性,不了解的读者可以点击此处进行了解。
正片开始!
接下来我们使用 System.Reactive 来改造上面较为复杂的 ConcurrentQueue 版本。如下:
namespace Newbe.RxWorld.DatabaseRepository.Impl { public class AutoBatchDatabaseRepository : IDatabaseRepository { private readonly ITestOutputHelper _testOutputHelper; private readonly IDatabase _database; private readonly Subject<BatchItem> _subject; public AutoBatchDatabaseRepository( ITestOutputHelper testOutputHelper, IDatabase database) { _testOutputHelper = testOutputHelper; _database = database; _subject = new Subject<BatchItem>(); // 将请求进行分组,每50毫秒一组或者每100个一组 _subject.Buffer(TimeSpan.FromMilliseconds(50), 100) .Where(x => x.Count > 0) // 将每组数据调用批量插入,写入数据库 .Select(list => Observable.FromAsync(() => BatchInsertData(list))) .Concat() .Subscribe(); } // 这里和前面对比没有变化 public Task<int> InsertData(int item) { var taskCompletionSource = new TaskCompletionSource<int>(); _subject.OnNext(new BatchItem { Item = item, TaskCompletionSource = taskCompletionSource }); return taskCompletionSource.Task; } // 这段和前面也完全一样,没有变化 private async Task BatchInsertData(IEnumerable<BatchItem> items) { var batchItems = items as BatchItem[] ?? items.ToArray(); try { var totalCount = await _database.InsertMany(batchItems.Select(x => x.Item)); foreach (var batchItem in batchItems) { batchItem.TaskCompletionSource.SetResult(totalCount); } } catch (Exception e) { foreach (var batchItem in batchItems) { batchItem.TaskCompletionSource.SetException(e); } throw; } } private struct BatchItem { public TaskCompletionSource<int> TaskCompletionSource { get; set; } public int Item { get; set; } } } }
代码减少了 50 行,主要原因就是使用 System.Reactive 中提供的很强力的 Buffer 方法实现了 ConcurrentQueue 版本中的复杂的逻辑实现。
老师,可以更给力一点吗?
我们,可以“稍微”优化一下代码,将 Buffer 以及相关的逻辑独立于“数据库插入”这个业务逻辑。那么我们就会得到一个更加简单的版本:
namespace Newbe.RxWorld.DatabaseRepository.Impl { public class FinalDatabaseRepository : IDatabaseRepository { private readonly IBatchOperator<int, int> _batchOperator; public FinalDatabaseRepository( IDatabase database) { var options = new BatchOperatorOptions<int, int> { BufferTime = TimeSpan.FromMilliseconds(50), BufferCount = 100, DoManyFunc = database.InsertMany, }; _batchOperator = new BatchOperator<int, int>(options); } public Task<int> InsertData(int item) { return _batchOperator.CreateTask(item); } } }
其中 IBatchOperator 等代码,读者可以到代码库中进行查看,此处就不在陈列了。
性能测试
基本可以测定如下:
在 10 条数据并发操作时,原始版本和批量版本没有多大区别。甚至批量版本在数量少时会更慢,毕竟其中存在一个最大 50 毫秒的等待时间。
但是,如果需要批量操作并发操作一万条数据,那么原始版本可能需要消耗20秒,而批量版本仅仅只需要0.5秒。
所有的示例代码均可以在代码库中找到。如果 Github Clone 存在困难,也可以点击此处从 Gitee 进行 Clone
最后但是最重要!
最近作者正在构建以反应式
、Actor模式
和事件溯源
为理论基础的一套服务端开发框架。希望为开发者提供能够便于开发出“分布式”、“可水平扩展”、“可测试性高”的应用系统——Newbe.Claptrap
本篇文章是该框架的一篇技术选文,属于技术构成的一部分。如果读者对该内容感兴趣,欢迎转发、评论、收藏文章以及项目。您的支持是促进项目成功的关键。
当前项目已经快要发布 0.1 alpha 版本,欢迎参与讨论。
GitHub 项目地址:https://github.com/newbe36524/Newbe.Claptrap
Gitee 项目地址:https://gitee.com/yks/Newbe.Claptrap
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
DataGear 1.8.1 发布,看板支持异步加载图表和参数传入
DataGear 1.8.1发布,看板支持异步加载图表和参数传入, 具体更新内容如下: 新增:看板展示页面新增异步加载图表API; 新增:图表、看板展示页面新增参数传入支持; 修复:修复导入数据功能报“readonly connection”的BUG; 修复:修复某些功能查询列表在用户没有设置昵称时创建用户列显示为空的BUG; 改进:看板页面图表插件改为从单独的chartPluginManager.js内引入; 改进:数据集、图表、看板查询列表的可搜索列加粗显示; 改进:改进新版本检测策略,改为每12小时检测一次,避免每次刷新主页都检测; DataGear是一款数据管理与可视化分析平台,使用Java语言开发,采用浏览器/服务器架构,支持多种数据库, 主要功能包括数据管理、SQL工作台、数据导入/导出、数据集管理、图表管理、看板管理等。 官网地址:http://www.datagear.tech
- 下一篇
Firefox 78 放弃对多个 macOS 版本的支持
Firefox 77已面向所有用户推出。而随着Mozilla开发节奏的加快,预计Firefox78也将在本月底进行发布。目前Firefox78已经处于 beta 阶段,Windows、Linux 和 macOS 上的用户可提前试用。 Firefox 78 预计将对 Linux 和 macOS 进行重大更改,并将在 Windows 上引入一些备受期待的新功能,具体内容可查看更新说明。 Mozilla方面透露,其已在Firefox 78中更新了最低的 Linux 系统要求。因此从此新版本开始,浏览器在 Linux 发行版上需要GNU libc 2.17、libstdc ++ 4.8.1 和 GTK + 3.14 或更高版本。 另外值得关注的一方面是,对macOS 用户来说,Firefox 78 将放弃对较早版本操作系统的支持,如果用户希望继续接收更新和安全补丁,建议切换到 ESR 版本。换句话说,如果用户继续运行不受支持的操作系统版本的话,则 Firefox 78 将是其获得的最后一个主要更新,Firefox 79 将不再支持该操作系统。 受影响的三个macOS版本分别为: macOSve...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- MySQL8.0.19开启GTID主从同步CentOS8
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- Windows10,CentOS7,CentOS8安装Nodejs环境
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- CentOS8编译安装MySQL8.0.19
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- 2048小游戏-低调大师作品
- Docker安装Oracle12C,快速搭建Oracle学习环境
- SpringBoot2整合Redis,开启缓存,提高访问速度