首页 文章 精选 留言 我的

精选列表

搜索[数据库],共10000篇文章
优秀的个人博客,低调大师

干货丨时序数据库DolphinDB代码模块复用教程

在软件团队开发项目中,要提升开发效率和质量,代码必然要进行封装和重用。在使用DolphinDB的脚本进行开发时,可以使用module和use方法,来声明和使用可重用模块。 1. 模块介绍 在DolphinDB中,模块是指只包含函数定义的代码包。它具有以下特点: 以.dos作为模块文件的后缀,dos是DolphinDB script的缩写 模块文件保存在DolphinDB节点的[home]/modules目录下 模块文件第一行以声明模块语句module moduleName开头 模块文件内容仅包含函数定义 2. 定义模块 2.1 创建模块目录 默认情况下,所有的模块定义在[home]/modules目录下,[home]由系统配置参数home决定,可以通过getHomeDir()函数获取。比如DolphinDB节点的home目录为/root/DolphinDB/server,那么我们需要在该目录下创建modules子目录来保存模块文件,最终模块目录为/home/root/DolphinDB/server/modules。 2.2 创建模块文件 在modules目录下创建以.dos为后缀的模块文件,比如FileLog.dos。模块文件的第一行必须是模块声明语句。模块声明语句的语法如下: module moduleName moduleName必须与模块文件的名称一致,比如在FileLog.dos中声明模块: module FileLog 声明模块后,我们可以开始编写模块代码。例如,FileLog.dos的内容如下: module FileLog //向指定日志文件写入日志 def appendLog(filePath, logText){ f = file(filePath,"a+") f.writeLine(string(now()) + " : " + logText) f.close() } 在模块文件中,仅允许封装函数定义,其他非函数定义的代码将被忽略。 3. 导入模块 在DolphinDB中,使用use关键字来导入一个模块。注意,use关键字导入的模块是会话隔离的,仅对当前会话有效。导入模块后,我们可以通过以下两种方式来使用模块内的自定义函数: (1)直接使用模块中的函数: use FileLog appendLog("mylog.txt", "test my log") (2)通过完整路径来调用模块中的函数: use FileLog FileLog::appendLog("mylog.txt", "test my log") 4. 规划模块 DolphinDB database引入了命名空间的概念,支持对模块进行分类和规划。 4.1 声明模块命名空间 如果我们需要对模块进行分类,可以通过多级路径为规划模块的命名空间。例如,现有两个模块FileLog和DateUtil,它们的存放路径分别为modules/system/log/FileLog.dos和modules/system/temperal/DateUtil.dos,那么这两个模块相应的声明语句如下: modules/system/log/FileLog.dos module system::log::FileLog modules/system/temperal/DateUtil.dos module system::temperal::DateUtil 4.2 调用命名空间模块 我们可以在use关键字后加完整路径来导入命名空间下的模块。例如,导入FileLog模块: use system::log::FileLog //全路径调用 system::log::FileLog::appendLog("mylog.txt", "test my log") //直接调用已导入模块中的函数 appendLog("mylog.txt", "test my log") 5. 在GUI中远程调试模块 当工作机和DolphinDB服务器不是同一台机器时,我们在工作机上编辑的模块代码,不能直接在远程服务器的DolphinDB上通过use导入,需要先将模块文件上传到[home]/modules的对应目录,才能通过use调用模块。 DolphinDB GUI从0.99.2版本开始提供了远程同步模块的功能,具体用法如下图所示: 此操作会将Modules目录下的所有文件和子目录同步到GUI连接的DolphinDB节点的[home]/modules目录下,同步完成后,就可以直接执行use导入模块。 6. 注意事项 6.1 同名函数定义规则 不同模块可以定义相同名字的函数。如果使用全路径调用函数,DolphinDB可以通过模块命名空间来区分函数名。如果直接调用函数: 如果已导入的模块中只有一个模块包含该函数,DolphinDB会调用该模块的函数。 如果已导入的模块中有多个模块包含该函数,DolphinDB解析脚本时会以下抛出异常: Modules [Module1] and [Module2] contain function [functionName]. Please use module name to qualify the function. 如果已导入模块中与自定义函数重名,系统会默认使用模块中的函数。如果要调用自定义函数,需要声明命名空间。自定义函数和内置函数的默认命名空间为根目录,用两个冒号表示。比如: //定义模块 module sys def myfunc(){ return 3 } //自定义函数 login("admin","123456") def myfunc(){ return 1 } addFunctionView(myfunc) //调用 use sys sys::myfunc() //调用模块的函数 myfunc() //调用模块的函数 ::myfunc() //调用自定义函数 如果已导入的模块中不包含该函数,DolphinDB会在系统内置函数中搜索该函数。如果内置函数中也没有该函数,将抛出函数为定义的异常。 6.2 刷新模块定义 在开发阶段调试模块代码时,如果需要反复修改模块代码并刷新定义,只需重新执行模块文件中的代码即可,这种方法仅对当前会话有效。 6.3 模块间的互相调用 模块之间可以单向引用,比如模块a引用模块b,模块b引用模块c,不支持交叉引用,比如模块a引用模块b,模块b引用模块a。

优秀的个人博客,低调大师

Redis 6.2-RC3 发布,高性能内存数据库

Redis 6.2-RC3 发布了,这是 6.2 系列的第 3 个候选者版本,与 RC2 相比有更完整的更新列表。 主要更新内容 新命令/参数 添加 HRANDFIELD 和 ZRANDMEMBER 命令 添加 FAILOVER 命令 添加 GETEX、GETDEL命令 SET 命令增加PXAT/EXAT 参数 FLUSHALL 和 FLUSHDB 命令增加 SYNC 参数,SCRIPT FLUSH命令增加ASYNC/SYNC 参数 Sentinel 向 Sentinel 添加主机名支持 防止文件描述符泄漏到 Sentinel 脚本中 修复配置文件行顺序依赖性和配置重写顺序 新的配置选项 添加 set-proc-title 配置选项以禁用对进程标题的更改 添加 proc-title-template 选项以控制进程标题中显示的内容 添加 lazyfree-lazy-user-flush 配置选项以控制 FLUSHALL、FLUSHDB 和 SCRIPT FLUSH Bug 修复 AOF:通过打开/关闭 appendonly 配置从上次写入错误中恢复 当 AOF fsync 策略为“总是”时,退出 fsync 错误 测试 arm64 CoW 错误时,避免断言(在较早的内核上) CONFIG REWRITE 应该接受 umask 设置 修复某些命令中的 firstkey、lastkey 和 step 详细内容请查看更新公告。

优秀的个人博客,低调大师

干货丨时序数据库DolphinDB横截面引擎教程

在处理实时流数据时,不仅需要按照时间做纵向聚合计算(时间序列聚合引擎),还需要对最新的数据做横向比较和计算,如金融里对所有股票的最新报价求百分位、工业物联网中计算一批设备的温度均值等。DolphinDB database 提供了横截面聚合引擎,可以对流数据中所有分组的最新数据做聚合运算。 横截面引擎的主体分为两部分:横截面数据表和计算引擎。横截面数据是横截面引擎的内部表,保存了所有分组最新的截面数据。计算引擎是一组聚合计算表达式以及触发器,系统会按照指定的方式触发聚合运算,计算结果会输出到另外一个表中。 1. 基本用法 在DolphinDB database中,通过createCrossSectionalAggregator创建横截面聚合引擎。它返回一个横截面数据表,保存了所有分组最新的截面数据,往这个表写入数据意味着这些数据进入横截面聚合引擎进行计算。具体用法如下: createCrossSectionalAggregator(name, [metrics], dummyTable, [outputTable], keyColumn, [triggeringPattern="perBatch"], [triggeringInterval=1000]) name是一个字符串,表示横截面聚合引擎的名称,是横截面聚合引擎的唯一标识。它可以包含字母,数字和下划线,但必须以字母开头。 metrics是元代码。它可以是系统内置或用户自定义的函数,如<[sum(qty), avg(price)]>,可以对聚合结果使用表达式,如<[avg(price1)-avg(price2)]>,也可以对计算列进行聚合运算,如<[std(price1-price2)]>。详情可参考元编程。 dummyTable是表对象,它可以不包含数据,但它的结构必须与订阅的流数据表相同。 outputTable是表对象,用于保存计算结果。输出表的列数为metrics数量+1,第一列为TIMESTAMP类型,用于存放发生计算的时间戳,,其他列的数据类型必须与metrics返回结果的数据类型一致。 keyColumn是一个字符串,指定dummyTable的某列为横截面聚合引擎的key。keyColumn指定列中的每一个key对应表中的唯一一行。 triggeringPattern是一个字符串,表示触发计算的方式。它可以是以下取值: "perRow": 每插入一行数据触发一次计算 "perBatch": 每插入一次数据触发一次计算 "interval": 按一定的时间间隔触发计算 triggeringInterval是一个整数。只有当triggeringPattern的取值为interval时才生效,表示触发计算的时间间隔。默认值为1000毫秒。 2. 示例 下面通过一个例子说明横截面聚合引擎的应用。在金融交易中,往往需要实时了解所有股票最新的报价均值、最近一次成交量总和以及最近一次交易的交易量。DolphinDB的横截面聚合引擎结合流数据订阅功能可以方便地完成这些工作。 (1)创建实时交易表 股票的实时交易表trades,包含以下主要字段: sym:股票代码 time:时间 price:成交价 qty:成交量 每当交易发生时,实时数据会写入trades表。创建trades表的脚本如下: share streamTable(10:0,`time`sym`price`qty,[TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades (2)创建横截面聚合引擎 tradesCrossAggregator=createCrossSectionalAggregator("CrossSectionalDemo", <[avg(price), sum(qty), sum(price*qty)]>, trades, outputTable, `sym, `perRow) tradesCrossAggregator是横截面数据表,它按股票代码分组,每个股票有且仅有一行。当数据进入该表时,会计算每个股票的avg(price), sum(qty)和sum(price*qty)。每插入一条数据触发一次计算。 (3)横截面数据表订阅实时交易表 subscribeTable(,"trades","tradesCrossAggregator",-1,append!{tradesCrossAggregator},true) 通过流数据订阅功能,把实时数据写入横截面数据表。 (4)模拟数据产生 def writeData(n){ timev = 2000.10.08T01:01:01.001 + timestamp(1..n) symv = take(`A`B, n) pricev = take(102.1 33.4 73.6 223,n) qtyv = take(60 74 82 59, n) insert into trades values(timev, symv, pricev,qtyv) } writeData(4); 查看实时交易表,共有4条数据。 select * from trades time sym price qty ----------------------- --- ----- --- 2000.10.08T01:01:01.002 A 102.1 60 2000.10.08T01:01:01.003 B 33.4 74 2000.10.08T01:01:01.004 A 73.6 82 2000.10.08T01:01:01.005 B 223 59 查看横截面数据表,里面保存了A、B两只股票最近的两笔交易记录。 select * from tradesCrossAggregator time sym price qty ----------------------- --- ----- --- 2000.10.08T01:01:01.004 A 73.6 82 2000.10.08T01:01:01.005 B 223 59 查看横截面引擎的输出表,由于横截面引擎采用了perRow每行触发计算的频率,所以每往横截面表写入一行数据,聚合引擎都会做一次计算,因此一共有4条记录。 select * from outputTable time avgPrice sumqty Total ----------------------- -------- ------ ------- 2019.07.08T10:04:41.731 102.1 60 6126 2019.07.08T10:04:41.732 67.75 134 8597.6 2019.07.08T10:04:41.732 53.5 156 8506.8 2019.07.08T10:04:41.732 148.3 141 19192.2 通过getAggregatorStat函数查看横截面引擎的状态。 getAggregatorStat().CrossSectionalAggregator name user status lastErrMsg numRows numMetrics metrics triggeringPattern triggeringInterval ------------------ ----- ------ ---------- ------- ---------- ------------------ ----------------- ------------------ CrossSectionalDemo guest OK 2 3 [ avg(price), su...perRow 1000 通过removeAggregator函数删除横截面引擎。 removeAggregator("CrossSectionalDemo") 3. 触发计算的几种方式 横截面引擎一共有三种触发计算的方式:perRow、perBatch和interval。上面的例子中采用的是每插入一行数据触发一次计算。下面介绍另外两种触发计算的方式。 perBatch perBatch参数表示每追加一批数据就触发一次写入,下例按perBatch模式启用横截面引擎,脚本一共生成12条记录,分三批写入,输出表中预期有3条记录。 share streamTable(10:0,`time`sym`price`qty,[TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades outputTable = table(1:0, `time`avgPrice`sumqty`Total, [TIMESTAMP,DOUBLE,INT,DOUBLE]) tradesCrossAggregator=createCrossSectionalAggregator("CrossSectionalDemo", <[avg(price), sum(qty), sum(price*qty)]>, trades, outputTable, `sym, `perBatch) subscribeTable(,"trades","tradesCrossAggregator",-1,append!{tradesCrossAggregator},true) def writeData(n){ timev = 2000.10.08T01:01:01.001 + timestamp(1..n) symv = take(`A`B, n) pricev = take(102.1 33.4 73.6 223,n) qtyv = take(60 74 82 59, n) insert into trades values(timev, symv, pricev,qtyv) } //写入三批数据,预期会触发三次计算,输出三次聚合结果。 writeData(4); writeData(4); writeData(4); 查看横截面数据表。 select * from tradesCrossAggregator time sym price qty ----------------------- --- ----- --- 2000.10.08T01:01:01.002 A 73.6 82 2000.10.08T01:01:01.003 B 33.4 59 查看输出表。插入了三批数据,因此输出表中有3条记录。 select * from outputTable time avgPrice sumqty Total ----------------------- -------- ------ ------- 2019.07.08T10:14:54.446 148.3 141 19192.2 2019.07.08T10:14:54.446 148.3 141 19192.2 2019.07.08T10:14:54.446 148.3 141 19192.2 interval 当触发计算的方式为interval时,需要指定triggeringInterval,表示每隔triggeringInterval毫秒触发一次计算。下面的例子中,分6次写入12条记录,每次间隔500毫秒。设置横截面引擎每1000毫秒触发一次计算,预期最终输出3条记录。 share streamTable(10:0,`time`sym`price`qty,[TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades outputTable = table(1:0, `time`avgPrice`sumqty`Total, [TIMESTAMP,DOUBLE,INT,DOUBLE]) tradesCrossAggregator=createCrossSectionalAggregator("CrossSectionalDemo", <[avg(price), sum(qty), sum(price*qty)]>, trades, outputTable, `sym, `interval,1000) subscribeTable(,"trades","tradesCrossAggregator",-1,append!{tradesCrossAggregator},true) def writeData(n){ timev = 2000.10.08T01:01:01.001 + timestamp(1..n) symv = take(`A`B, n) pricev = take(102.1 33.4 73.6 223,n) qtyv = take(60 74 82 59, n) insert into trades values(timev, symv, pricev,qtyv) } a = now() writeData(2); sleep(500) writeData(2); sleep(500) writeData(2); sleep(500) writeData(2); sleep(500) writeData(2); sleep(500) writeData(2); sleep(500) b = now() select count(*) from outputTable 3 如果再次执行select count(*) from outputTable,会发现随着时间的推移,输出表的记录数会不断增长。这是因为在interval模式下,计算是按照现实时间定时触发,并不依赖于是否有新的数据进来。 4. 横截面数据表的独立使用 从上面的例子中可以看出,横截面表虽然是为聚合计算提供的一个中间数据表,但其实在很多场合还是能独立发挥作用的。比如我们需要定时刷新某只股票的最新交易价格,按照常规思路是从实时交易表中按代码筛选股票并拿出最后一条记录,而交易表的数据量是随着时间快速增长的,如果频繁做这样的查询,无论从系统的资源消耗还是从查询的效能来看都不是很好的做法。而横截面表永远只保存所有股票的最近一次交易数据,数据量是稳定的,对于这种定时轮询的场景非常合适。 如果要单独使用横截面表,需要在创建横截面引擎时,把metrics,outputTable这两个参数设置为空。 tradesCrossAggregator=createCrossSectionalAggregator("CrossSectionalDemo", , trades,, `sym, `perRow) 相关链接: 流数据教程 时间序列引擎教程 异常检测引擎教程

优秀的个人博客,低调大师

干货丨时序数据库流数据聚合引擎教程

流数据是指随时间持续增长的动态数据。互联网的运营数据和物联网的传感器数据都属于流数据的范畴。流数据的特性决定了它的数据集是动态变化的,传统的面向静态数据表的计算引擎无法胜任流数据领域的分析和计算任务,所以流数据场景需要专门的计算引擎来处理。 DolphinDB提供了灵活的面向流数据的聚合引擎,通过createStreamAggregator函数创建流数据聚合引擎,能够持续不断地对已有的流数据做聚合计算,并且将计算结果持续输出到指定数据表中。 1.聚合引擎应用框架 流聚合引擎本身是一个独立的计算引擎,只要向聚合引擎写入数据就可以触发计算,并将计算结果输出到目标表。而在流数据场景下,聚合引擎与流数据订阅功能(subscribeTable)配合,可以方便的将流数据持续的提供给聚合引擎。示例如下: tradesAggregator = createStreamAggregator(5, 5, <[sum(qty)]>, trades, outputTable, `time) subscribeTable(, "trades", "tradesAggregator", 0, append!{tradesAggregator}, true) 通过subscribeTable函数订阅流数据表,每次有新数据进入就会按指定规则触发append!{tradesAggregators},把流数据持续输入到聚合引擎。 聚合引擎主要涉及到以下概念: 流数据表:DolphinDB为流数据提供了一种特定的表对象——streamTable,它提供流数据的发布功能,其他节点或APP可以通过subscribeTable函数订阅或消费流数据。 聚合引擎数据源:createStreamAggregator返回一个抽象表,往这个抽象表写入数据,意味着数据进入聚合引擎进行计算。 聚合表达式 :以元数据的格式提供一组处理流数据的聚合函数,类似如下格式<[sum(qty)]>,<[sum(qty),max(qty),avg(price)]>。聚合引擎支持使用系统内所有的聚合函数,也支持使用表达式来满足更复杂的场景,比如 <[avg(price1)-avg(price2)]>,<[std(price1-price2)]>这样的组合表达式。 数据窗口(windowSize) :指定每次计算时截取的流数据窗口长度。 计算周期(step): 指定进行计算的间隔。 2.数据窗口 每次对流数据进行聚合计算,必须截取一段数据。截取的数据称为数据窗口,其长度由参数windowSize决定。计算间隔由参数step决定。 数据窗口长度和计算间隔的单位都是由参数useSystemTime决定。流数据聚合计算场景有两种时间概念,第一种是数据的生成时间,通常以时间戳的格式记录于数据中,它可能采用天、分钟、秒、毫秒、纳秒等不同的精度;第二种是数据进入聚合引擎的时间,我们也称为系统时间,这个时间是由聚合引擎给数据打上的时间戳,取自聚合引擎所在服务器的系统时间,精度为毫秒。系统通过参数useSystemTime来确定数据窗口长度和计算间隔是以哪一个时间的精度为单位,当useSystemTime=true时以系统时间精度为单位,否则以数据生成时间精度为单位。 如果根据第一条数据进入系统的时间来构造数据窗口的边界,那么它一般会是不规整的时间。如果有很多组数据,并且每组都根据各自第一条数据进入系统的时间来构造数据窗口的边界,那么无法将各组在相同的数据窗口中进行对比。因此,系统会根据step的值对第一个数据窗口的边界值进行规整处理,并确定一个整型的规整尺度alignmentSize。具体的规整公式与时间精度、step有关: 当数据的时间精度为秒时,如DATETIME、SECOND类型,alignmentSize的取值如下: step alignmentSize 0~2 2 3~5 5 6~10 10 11~15 15 16~20 20 21~30 30 31~60 60 当数据时间精度为毫秒时,如TIMESTAMP、TIME类型,alignmentSize的取值如下: step alignmentSize 0~2 2 3~5 5 6~10 10 11~20 20 21~25 25 26~50 50 51~100 100 101~200 200 201~250 250 251~500 500 501~1000 1000 假设第一条数据时间的最小精度值为firstDataTime,那么第一个数据窗口的左边界最小精度经过规整后为 firstDataTime/alignmentSize*alignmentSize,其中/代表相除后取整。例如,第一条数据时间为 2018.10.08T01:01:01.365,则firstDataTime=365。若step=100,根据上表,alignmentSize=100,可得出规整后的第一个数据窗口左边界最小精度为365\100*100=300,因此规整后的第一个数据窗口的左边界为2018.10.08T01:01:01.300。 下面我们通过一个例子来详细说明系统是如何进行流数据计算的。输入流数据表包含time和qty两列,time精度为毫秒,根据设定的窗口对流数据进行持续sum(qty)计算。本示例的流数据表中使用的时间精度为毫秒,为了方便观察,模拟输入的数据流频率也设为每毫秒一条数据的频率。以下代码建立流数据表trades,设定聚合计算参数,并定义函数writeData向流数据表trades中写入模拟数据。 share streamTable(1000:0, `time`qty, [TIMESTAMP, INT]) as trades outputTable = table(10000:0, `time`sumQty, [TIMESTAMP, INT]) tradesAggregator = createStreamAggregator(5, 5, <[sum(qty)]>, trades, outputTable, `time) subscribeTable(, "trades", "tradesAggregator", 0, append!{tradesAggregator}, true) def writeData(n){ timev = 2018.10.08T01:01:01.001 + timestamp(1..n) qtyv = take(1, n) insert into trades values(timev, qtyv) } 第一次操作:向流数据表trades中写入5条数据。 writeData(5) 查看流数据表: select * from trades time qty 2018.10.08T01:01:01.002 1 2018.10.08T01:01:01.003 1 2018.10.08T01:01:01.004 1 2018.10.08T01:01:01.005 1 2018.10.08T01:01:01.006 1 查看输出表: select * from outputTable time sumQty 2018.10.08T01:01:01.000 3 发生计算的时间是2018.10.08T01:01:01.000。可以看出,系统对首个数据的时间2018.10.08T01:01:01.002做了规整操作。 第二次操作:清空数据表,设置 windowSize=6,step=3,模拟写入10条数据: share streamTable(1000:0, `time`qty, [TIMESTAMP, INT]) as trades outputTable = table(10000:0, `time`sumQty, [TIMESTAMP, INT]) tradesAggregator = createStreamAggregator(6, 3, <[sum(qty)]>, trades, outputTable, `time) subscribeTable(, "trades", "tradesAggregator", 0, append!{tradesAggregator}, true) def writeData(n){ timev = 2018.10.08T01:01:01.001 + timestamp(1..n) qtyv = take(1, n) insert into trades values(timev, qtyv) } writeData(10) 查看流数据表: select * from trades time qty 2018.10.08T01:01:01.002 1 2018.10.08T01:01:01.003 1 2018.10.08T01:01:01.004 1 2018.10.08T01:01:01.005 1 2018.10.08T01:01:01.006 1 2018.10.08T01:01:01.007 1 2018.10.08T01:01:01.008 1 2018.10.08T01:01:01.009 1 2018.10.08T01:01:01.010 1 2018.10.08T01:01:01.011 1 查看输出表: select * from outputTable time qty 2018.10.08T01:01:00.997 1 2018.10.08T01:01:01.000 4 2018.10.08T01:01:01.003 6 从这个结果也可以发现聚合引擎窗口计算的规则:窗口起始时间是以第一条数据时间规整后为准,窗口是以windowSize为大小,step为步长移动的。 下面根据三次计算的过程来解释聚合引擎是如何进行窗口数据的确定的。为方便阅读,对时间的描述中省略相同的2018.10.08T01:01:01部分,只列出毫秒部分。窗口的起始是第一个数据的时间002为基础进行对齐,时间对齐后为000,所以第一次触发计算的时间是000,根据windowSize=6,所以理论上窗口边界是从上一秒的997到002,最终第一次计算窗口中只包含了002一条记录,计算sum(qty)的结果是1;而第二次计算发生在000,根据windowSize=6,那么实际窗口大小是6毫秒(从000到005),实际窗口中包含了从002到005四个数据,计算结果为4;以此类推,第三次的计算窗口是从003到008,实际包含了6个数据,计算结果为6。 3.聚合表达式 在实际的应用中,通常要对流数据进行比较复杂的聚合计算,这对聚合引擎的表达式灵活性提出了较高的要求。DolphinDB聚合引擎支持使用复杂的表达式进行实时计算。 纵向聚合计算(按时间序列聚合): tradesAggregator = createStreamAggregator(6, 3, <sum(ofr)>, trades, outputTable, `time) 横向聚合计算(按维度聚合): tradesAggregator = createStreamAggregator(6, 3, <max(ofr)-min(ofr)>, trades, outputTable, `time) tradesAggregator = createStreamAggregator(6, 3, <max(ofr-bid)>, trades, outputTable, `time) 输出多个聚合结果: tradesAggregator = createStreamAggregator(6, 3, <[max((ofr-bid)/(ofr+bid)*2), min((ofr-bid)/(ofr+bid)*2)]>, trades, outputTable, `time) 多参数聚合函数的调用: 有些聚合函数会使用多个参数,例如 corr,percentile等。 tradesAggregator = createStreamAggregator(6, 3, <corr(ofr,bid)>, trades, outputTable, `time) tradesAggregator = createStreamAggregator(6, 3, <percentile(ofr-bid,99)/sum(ofr)>, trades, outputTable, `time) 调用自定义函数: def spread(x,y){ return abs(x-y)/(x+y)*2 } tradesAggregator = createStreamAggregator(6, 3, <spread(ofr, bid)>, trades, outputTable, `time) 注意:DolphinDB不支持聚合函数嵌套调用,比如若要在流数据引擎中计算sum(spread(ofr,bid)),系统会给出异常提示:Nested aggregated function is not allowed 4.流数据源 DolphinDB的聚合引擎使用流数据表(streamTable)来作为输入数据源,流数据表提供流式数据的发布功能,通过subscribeTable函数可以订阅流数据并触发数据处理流程,而聚合引擎就是处理数据的方式之一。 streamTable作为聚合引擎的数据源,它并不仅仅是简单的将原始数据灌入聚合引擎,通过subscribeTable函数,可以在数据进入聚合引擎之前对数据做初步清洗,下面的例子展示如何对流数据做初步过滤。 传感器采集电压和电流数据并实时上传作为流数据源,但是其中电压voltage<=0.02或电流electric==NULL的数据需要在进入聚合引擎之前过滤掉。 share streamTable(1000:0, `time`voltage`electric, [TIMESTAMP, DOUBLE, INT]) as trades outputTable = table(10000:0, `time`avgElectric, [TIMESTAMP, DOUBLE]) //模拟产生传感器数据 def writeData(blockNumber){ timev = 2018.10.08T01:01:01.001 + timestamp(1..blockNumber) vt = 1..blockNumber * 0.01 bidv = take([1,NULL,2], blockNumber) insert into trades values(timev, vt, bidv); } //自定义数据处理过程,msg即实时流入的数据 def dataPreHandle(aggrTable, msg){ //过滤 voltage<=0.02 或 electric==NULL的无效数据 t = select * from msg where voltage >0.02,not electric == NULL if(size(t)>0){ insert into aggrTable values(t.time,t.voltage,t.electric) } } tradesAggregator = createStreamAggregator(6, 3, <[avg(electric)]>, trades, outputTable, `time , false, , 2000) //订阅数据源时使用自定义的数据处理函数 subscribeTable(, "trades", "tradesAggregator", 0, dataPreHandle{tradesAggregator}, true) writeData(10) 从流数据源中可以看到有两个voltage<=0.02和三个electric==NULL的数据: select * from trades time voltage electric 2018.10.08T01:01:01.002 0.01 1 2018.10.08T01:01:01.003 0.02 2018.10.08T01:01:01.004 0.03 2 2018.10.08T01:01:01.005 0.04 1 2018.10.08T01:01:01.006 0.05 2018.10.08T01:01:01.007 0.06 2 2018.10.08T01:01:01.008 0.07 1 2018.10.08T01:01:01.009 0.08 2018.10.08T01:01:01.010 0.09 2 2018.10.08T01:01:01.011 0.1 1 查看输出表: select * from outputTable time avgElectric 2018.10.08T01:01:01.000 1.5 2018.10.08T01:01:01.003 1.5 从结果可以看到,voltage<=0.02或electric==NULL的数据已经被过滤了,所以第一个计算窗口没有数据,所以也没有聚合结果。 5.聚合引擎输出 聚合结果可以输出到新建或已存在的内存表,也可以输出到流数据表。内存表对数据操作上较为灵活,可以进行更新或删除操作;输出到流数据表的数据无法再做变动,但是可以通过流数据表将聚合结果再次发布。下面的例子展示如何将聚合结果表作为另一个聚合引擎的数据源。 本例从一个初始的流数据表trades里,通过聚合引擎tradesAggregator进行移动均值计算,并将结果输出到流数据表aggrOutput,再通过订阅aggrOutput表并关联聚合引擎SecondAggregator对计算结果求移动峰值。 share streamTable(1000:0, `time`voltage`electric, [TIMESTAMP, DOUBLE, INT]) as trades //将输出表定义为流数据表,可以再次订阅 outputTable = streamTable(10000:0, `time`avgElectric, [TIMESTAMP, DOUBLE]) share outputTable as aggrOutput def writeData(blockNumber){ timev = 2018.10.08T01:01:01.001 + timestamp(1..blockNumber) vt = 1..blockNumber * 0.01 bidv = take([1,2], blockNumber) insert into trades values(timev, vt, bidv); } tradesAggregator = createStreamAggregator(6, 3, <[avg(electric)]>, trades, outputTable, `time , false, , 2000) subscribeTable(, "trades", "tradesAggregator", 0, append!{tradesAggregator}, true) //对聚合结果进行订阅做二次聚合计算 outputTable2 =table(10000:0, `time`maxAggrElec, [TIMESTAMP, DOUBLE]) SecondAggregator = createStreamAggregator(6, 3, <[max(avgElectric)]>, aggrOutput, outputTable2, `time , false, , 2000) subscribeTable(, "aggrOutput", "SecondAggregator", 0, append!{SecondAggregator}, true) writeData(10) 查看输出表: select * from outputTable2 time maxAggrElec 2018.10.08T01:01:00.992 1 2018.10.08T01:01:00.995 1.5 6.createAggregator函数介绍及语法 createStreamAggregator函数关联了流数据聚合应用的3个主要信息: 输入数据源 输入数据源是流数据表,通过订阅的当时把数据源和聚合引擎联系起来。 聚合表达式 定义聚合计算的逻辑,支持复杂表达式。聚合引擎根据聚合表达式对流数据表做计算,并将结果输出到目的表中。 输出表 聚合结果可以输出到新建或已存在的内存表或流数据表中。内存表在数据操作上更加灵活,可以做更新删除操作,而输出到流数据表的数据无法再做改动,但是通过流数据表将聚合结果再次发布,可以满足更多应用场景。 6.1 语法 createStreamAggregator(windowTime, rollingTime, aggregators, dummyTable, outputTable, timeColumn[,useSystemTime, keyColumn, garbageSize]) 6.2 返回对象 返回一个抽象的表对象,作为聚合引擎的入口,向这个表写入数据,意味着数据进入聚合引擎进行计算。 6.3 参数 useSystemTime:布尔值,表示聚合引擎的驱动方式。当它为true时,表示时间驱动,即当到达预定的时间点,聚合引擎就会激活并以设定的窗口截取流数据进行计算。在这种模式下,时间的精度为毫秒,系统会给每一条进来的数据添加毫秒精度的时间戳作为数据窗口的依据。但它为false时,表示数据驱动,只有当数据进入系统时,聚合引擎才会被激活,系统会选择数据的时间字段timeColumn作为数据窗口的依据。它是可选参数,默认值为false。 windowSize:正整数,表示数据窗口的大小。数据窗口只包含下边界不包含上边界。 step:正整数,表示聚合计算的频率,即触发计算的时间间隔。 windowSize和step的单位相同,它们都取决于useSystemTime。当useSystemTime=true,它们的单位是毫秒,当useSystemTime=false,它们的单位与数据中的时间字段timeColumn相同。 为了便于对计算结果的观察和对比,系统会对窗口的起始时间统一对齐。具体规则请查看2.数据窗口 aggregators:元数据,表示聚合函数。支持系统内所有的聚合函数,如<sum(qty)>,<sum(qty),max(qty),avg(price)>,也支持对聚合结果使用表达式来满足更复杂的场景,如<[avg(price1)-avg(price2)]>,<[std(price1-price2)]>。 为了提升流数据聚合的性能,DolphinDB对部分聚合函数进行了优化,每次计算时,充分利用上一个窗口的计算结果,最大程度地降低了重复计算。 以下是经过优化聚合函数: corr:相关性 covar:协方差 first:第一个元素 last:最后一个元素 max:最大值 med:中位数 min:最小值 percentile:百分位数 std:标准差 sum:求和 sum2:平方和 var:方差 wavg:加权平均 wsum:加权和 dummyTable:表,提供一个样本表对象,不需要有数据,但是表结构必须与输入的流数据表相同。 outputTable:聚合结果的输出表。输出表的第一列是时间类型,用于存放发生计算的时间点,如果keyColumn不为空,则第二列为keyColumn(分组列),从第三列开始,用于存放聚合计算的结构。最终输出表的结构如下: 时间列 分组列 聚合结果列1 聚合结果列2 ... timeColumn:输入流数据表中的时间列。 keyColumn:聚合计算的分组列。按keyColumn分组,对输入流数据进行分组聚合计算。它是可选参数。 garbageSize:正整数。当内存中缓存的历史数据记录条数超过garbageSize时,系统将清理缓存。 当流数据聚合引擎在运行时,每次计算都会需要载入新的窗口数据到内存中进行计算,随着计算过程的持续,内存中缓存的数据会越来越多,这时候需要有一个机制来清理不再需要的历史数据。当内存中保留的历史数据行数超过garbageSize设定值时会引发清理内存。 当需要分组计算时,每个分组的历史数据记录数是分别统计的,所以内存清理的动作也是各分组独立进行的。当每个组的历史数据记录数超出garbageSize时都会引发清理内存。 6.4 示例 6.4.1 dummyTable示例 本例展示dummyTable的作用。增加一个结构完全与trades相同的modelTable对象,将modelTable作为dummyTable参数,而实际的数据仍然写入trades。 share streamTable(1000:0, `time`qty, [TIMESTAMP, INT]) as trades modelTable = table(1000:0, `time`qty, [TIMESTAMP, INT]) outputTable = table(10000:0, `time`sumQty, [TIMESTAMP, INT]) tradesAggregator = createStreamAggregator(5, 5, <[sum(qty)]>, modelTable, outputTable, `time) subscribeTable(, "trades", "tradesAggregator", 0, append!{tradesAggregator}, true) def writeData(n){ timev = 2018.10.08T01:01:01.001 + timestamp(1..n) qtyv = take(1, n) insert into trades values(timev, qtyv) } writeData(6) 最后仍然输出了结果,说明聚合引擎的dummyTable参数只是一个样本表,它是否包含数据对结果并没有影响。 6.4.2 分组聚合示例 输入的流数据表增加了分组列sym,在聚合计算时设定keyColumn为sym。 share streamTable(1000:0, `time`sym`qty, [TIMESTAMP, SYMBOL, INT]) as trades outputTable = table(10000:0, `time`sym`sumQty, [TIMESTAMP, SYMBOL, INT]) tradesAggregator = createStreamAggregator(3, 3, <[sum(qty)]>, trades, outputTable, `time, false,`sym, 50) subscribeTable(, "trades", "tradesAggregator", 0, append!{tradesAggregator}, true) def writeData(n){ timev = 2018.10.08T01:01:01.001 + timestamp(1..n) symv =take(`A`B, n) qtyv = take(1, n) insert into trades values(timev, symv, qtyv) } writeData(6) 为了观察方便,对执行结果的sym列排序输出: select * from trades order by sym time sym qty 2018.10.08T01:01:01.002 A 1 2018.10.08T01:01:01.004 A 1 2018.10.08T01:01:01.006 A 1 2018.10.08T01:01:01.003 B 1 2018.10.08T01:01:01.005 B 1 2018.10.08T01:01:01.007 B 1 outputTable的结果是根据sym列的内容进行的分组计算。 select * from outputTable time sym qty 2018.10.08T01:01:01.000 A 1 2018.10.08T01:01:01.003 A 1 2018.10.08T01:01:01.003 B 2 各组时间规整后统一从000时间点开始,根据windowSize=3, step=3, 每个组的窗口会按照000-003-006划分,计算触发在000,003两个时间点。 需要注意的是窗口内若没有任何数据,系统不会计算也不会产生结果,所以B组第一个窗口没有结果输出。 7.总结 DolphinDB database 提供的streamAggregator是一个轻量、使用方便的流数据聚合引擎,它通过与streamTable流数据表合作来完成流数据的实时计算任务。它能够支持纵向聚合和横向聚合以及组合计算,支持自定义函数计算,分组聚合,无效数据预清洗,多级计算等功能,能满足流数据实时计算各方面需求。

资源下载

更多资源
Mario

Mario

马里奥是站在游戏界顶峰的超人气多面角色。马里奥靠吃蘑菇成长,特征是大鼻子、头戴帽子、身穿背带裤,还留着胡子。与他的双胞胎兄弟路易基一起,长年担任任天堂的招牌角色。

Spring

Spring

Spring框架(Spring Framework)是由Rod Johnson于2002年提出的开源Java企业级应用框架,旨在通过使用JavaBean替代传统EJB实现方式降低企业级编程开发的复杂性。该框架基于简单性、可测试性和松耦合性设计理念,提供核心容器、应用上下文、数据访问集成等模块,支持整合Hibernate、Struts等第三方框架,其适用范围不仅限于服务器端开发,绝大多数Java应用均可从中受益。

Rocky Linux

Rocky Linux

Rocky Linux(中文名:洛基)是由Gregory Kurtzer于2020年12月发起的企业级Linux发行版,作为CentOS稳定版停止维护后与RHEL(Red Hat Enterprise Linux)完全兼容的开源替代方案,由社区拥有并管理,支持x86_64、aarch64等架构。其通过重新编译RHEL源代码提供长期稳定性,采用模块化包装和SELinux安全架构,默认包含GNOME桌面环境及XFS文件系统,支持十年生命周期更新。

WebStorm

WebStorm

WebStorm 是jetbrains公司旗下一款JavaScript 开发工具。目前已经被广大中国JS开发者誉为“Web前端开发神器”、“最强大的HTML5编辑器”、“最智能的JavaScript IDE”等。与IntelliJ IDEA同源,继承了IntelliJ IDEA强大的JS部分的功能。

用户登录
用户注册