首页 文章 精选 留言 我的

精选列表

搜索[数据库连接池],共10000篇文章
优秀的个人博客,低调大师

时序数据库Influx-IOx源码学习三(命令行及配置)

欢迎关注公众号: 上篇介绍到:InfluxDB-IOx的环境搭建,详情见:https://my.oschina.net/u/3374539/blog/5016798 本章开始,讲解启动的主流程! 打开src/main.rs文件可以找到下面的代码 fn main() -> Result<(), std::io::Error> { // load all environment variables from .env before doing anything load_dotenv(); let config = Config::from_args(); println!("{:?}", config); //省略 ..... Ok(()) } 在main方法中映入眼帘的第一行就是load_dotenv()方法,然后是Config::from_args()接下来就分别跟踪这两个方法,看明白是怎么工作的。 加载配置文件 在README文件中,我们可以看到这样一行: Should you desire specifying config via a file, you can do so using a .env formatted file in the working directory. You can use the provided example as a template if you want: cp docs/env.example .env 意思就是这个工程使用的配置文件,名字是.env。了解这个特殊的名字之后,我们看代码src/main.rs:276: fn load_dotenv() { //调用dotenv方法,并对其返回值进行判断 match dotenv() { //如果返回成功,程序什么都不做,继续执行。 Ok(_) => {} //返回的是错误,那么判断一下是否为'未找到'错误, //如果是未找到,那么就什么都不做(也就是有默认值填充) Err(dotenv::Error::Io(err)) if err.kind() == std::io::ErrorKind::NotFound => { } //这里就是真真正正必须要处理的错误了,直接退出程序 Err(e) => { eprintln!("FATAL Error loading config from: {}", e); eprintln!("Aborting"); std::process::exit(1); } }; } 然后跟踪dotenv()方法看看如何执行(这里就进入了dotenv这个crate了): 为了方便写,我就直接把所有调用,从上到下的顺序全都写出来了 //返回一个PathBuf的Result,之后再看这个Result pub fn dotenv() -> Result<PathBuf> { //new一个Finder结构并调用find方法 //?代表错误的时候直接抛出错误 let (path, iter) = Finder::new().find()?; //返回一个自定义的Iter结构,并调用load方法 iter.load()?; //成功返回 Ok(path) } //创建一个Finder结构体,filename使用`.env`填充 pub fn new() -> Self { Finder { filename: Path::new(".env"), } } //返回一个元组,多个返回值,(路径,文件读取相关记录) pub fn find(self) -> Result<(PathBuf, Iter<File>)> { //使用标准库中的current_dir()方法得到当前的路径 //出错就返回Error::Io错误,正常就调用find方法 let path = find(&env::current_dir().map_err(Error::Io)?, self.filename)?; //如果找到了.env文件就打开,打开错误就返回Error::Io错误 let file = File::open(&path).map_err(Error::Io)?; //使用打开的文件创建一个Iter的结构 let iter = Iter::new(file); //返回 Ok((path, iter)) } //递归查找.env文件 pub fn find(directory: &Path, filename: &Path) -> Result<PathBuf> { //拼装一个全路径 let candidate = directory.join(filename); //尝试打开这个文件 match fs::metadata(&candidate) { //成功打开了,说明找到了.env文件,就返回成功 //但我有个疑问文件内容为啥不校验一下呢? Ok(metadata) => if metadata.is_file() { return Ok(candidate); }, //除了没找到文件的错误之外,其它错误都直接返回异常 Err(error) => { if error.kind() != io::ErrorKind::NotFound { return Err(Error::Io(error)); } } } //没找到的时候,就返回到父级文件夹里,继续找,一直到根文件夹 if let Some(parent) = directory.parent() { find(parent, filename) } else { //一直到根文件夹,还没找到就返回一个NotFound的IO错误, //这个在上面的代码中提到,这个错误会被忽略 Err(Error::Io(io::Error::new(io::ErrorKind::NotFound, "path not found"))) } } //对应的iter.load()?;方法实现 pub fn load(self) -> Result<()> { //可以使用for是因为实现了Iterator 这个trait for item in self { //获取读取出来的一行一行的配置项 let (key, value) = item?; //验证key没有什么问题,就放到env中 if env::var(&key).is_err() { env::set_var(&key, value); } } Ok(()) } // 为了能够for循环,实现的Iterator impl<R: Read> Iterator for Iter<R> { type Item = Result<(String, String)>; fn next(&mut self) -> Option<Self::Item> { loop { //一行一行的读取文件内容 let line = match self.lines.next() { Some(Ok(line)) => line, Some(Err(err)) => return Some(Err(Error::Io(err))), None => return None, }; //解析配置项目,这里就不在深入跟了 match parse::parse_line(&line, &mut self.substitution_data) { Ok(Some(result)) => return Some(Ok(result)), Ok(None) => {} Err(err) => return Some(Err(err)), } } } } 研究这里的时候,我发现了一个比较好玩儿的东西就是返回值的Result<PathBuf>。标准库的定义中,Result是有两个值,分别是<T,E>。 自定义的类型,节省了Error这个模板代码 pub type Result<T> = std::result::Result<T, Error>; //Error也自己定义 pub enum Error { LineParse(String, usize), Io(io::Error), EnvVar(std::env::VarError), #[doc(hidden)] __Nonexhaustive } //实现一个not_found()的方法来判断是否为not_found的一个错误类型 impl Error { pub fn not_found(&self) -> bool { if let Error::Io(ref io_error) = *self { return io_error.kind() == io::ErrorKind::NotFound; } false } } //实现标准库中的error::Error这个trait impl error::Error for Error { //追踪错误的上一级,应该是打印堆栈这种功能 //如果内部有错误类型Err返回:Some(e),如果没有返回:None //关于'static这个生命周期的标注,我也不是很理解 //是指存储的错误生命周期足够长还是什么? fn source(&self) -> Option<&(dyn error::Error + 'static)> { match self { Error::Io(err) => Some(err), Error::EnvVar(err) => Some(err), _ => None, } } } //实现错误的打印 impl fmt::Display for Error { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { match self { Error::Io(err) => write!(fmt, "{}", err), Error::EnvVar(err) => write!(fmt, "{}", err), Error::LineParse(line, error_index) => write!(fmt, "Error parsing line: '{}', error at line index: {}", line, error_index), _ => unreachable!(), } } } 更详细的rust错误处理,可以参见:https://zhuanlan.zhihu.com/p/109242831 命令行参数 在main方法中我们可以看到第二行, let config = Config::from_args(); 这是influx使用了structopt这个crate,调用该方法后,程序会根据结构体上的#[structopt()]中的参数进行执行命令行解析。 #[derive(Debug, StructOpt)] #[structopt( //cargo的crate名字 name = "influxdb_iox", //打印出来介绍 about = "InfluxDB IOx server and command line tools", long_about = // 省略 ... )] struct Config { // from_occurrences代表出现了几次,就是-vvv的时候v出现的次数 #[structopt(short, long, parse(from_occurrences))] verbose: u64, #[structopt( short, long, global = true, env = "IOX_ADDR", default_value = "http://127.0.0.1:8082" )] host: String, #[structopt(long)] num_threads: Option<usize>, //subcommand代表是一个子类型的, //具体还有什么命令行要去子类型里继续解析, //这个字段不展示在命令行中 #[structopt(subcommand)] command: Command, } //在influx的命令行中提供了8个主要的命令, //在上一章中使用到的run参数就是属于Run(Box<commands::run::Config>)里的调用。 //这里都是subcommand,需要继续解析,这个在以后学习每个具体功能的时候再分析 #[derive(Debug, StructOpt)] enum Command { Convert { // 省略 ...}, Meta {// 省略 ...}, Database(commands::database::Config), Run(Box<commands::run::Config>), Stats(commands::stats::Config), Server(commands::server::Config), Writer(commands::writer::Config), Operation(commands::operations::Config), } 下面通过打印出来的例子来对应structopt中的内容。 $ ./influxdb_iox -vvvv run Config { verbose: 4, host: "http://127.0.0.1:8082", num_threads: None, command: Run(Config { rust_log: None, log_format: None, verbose_count: 0, writer_id: None, http_bind_address: 127.0.0.1:8080, grpc_bind_address: 127.0.0.1:8082, database_directory: None, object_store: None, bucket: None, aws_access_key_id: None, aws_secret_access_key: None, aws_default_region: "us-east-1", google_service_account: None, azure_storage_account: None, azure_storage_access_key: None, jaeger_host: None }) } 可以看到,我们执行了Run这个变体的Subcommand,并且指定了Config结构体中的verbose 4 次,IOx也成功的识别了。 后面继续学习程序的启动过程,祝玩儿的开心!

优秀的个人博客,低调大师

干货丨如何用数据库将高频信号转化成买卖信号

高频交易中,我们通常首先基于tick级的报价信息和交易信息来生成信号量,然后将这些信号量转化成离散的买卖信号,譬如说 1 (买入), 0 (不变), -1(卖出),接着根据资金和已有头寸以及其他优化规则来生成订单发送到交易系统。本文要讨论第二个步骤,即如何将信号量转化成离散的买卖信号,也就是把一个浮点数类型的数组signal转化成一个取值为1,0或-1的整型数组direction。 如果转化规则简单,譬如超过某一个阈值t1为+1 (买入信号),低于某一个阈值t2为-1(卖出信号),其他情况为0,那么实现起来也很简单。譬如在DolphinDB中用下面这个表达式就可以实现。 iif(signal > t1, 1, iif(signal <t2, -1, 0)) 实践中,为了让系统更加的健壮,不要频繁的切换买卖方向,通常不会这么处理。一个常用的做法是这样:当信号量超过某一个阈值t1时,开始转化为买入信号,后续的信号量在衰减到低于t10之前,一直保持买入信号(+1);同理当信号量低于某一个阈值t2时,开始转化为卖出信号,后续的信号量在增强到大于t20之前,一直保持卖出信号(-1);其他情况为0。这儿t1, t10, t2, t20满足下面的规则: t1 > t10 > t20 > t2 1 当系统按照上面的规则运行时,决定买卖方向的除了当前的信号量值,还有前一个买卖信号的状态,这是典型的路径依赖问题。通常我们认为路径依赖问题不适合向量化的方法来处理,或者说需要非常高的技巧。而我们用来回测高频数据的语言通常都是脚本语言(譬如DolphinDB和kdb+),脚本语言在处理量化问题时效率很高,但是如果需要逐行处理路径依赖问题,解析成本会很高,效率低下。今天我们会介绍一些技巧,如何化解这个矛盾? 我们先找出买入信号。在一个向量中找到大于t1的点很容易(买入信号的临界点),找到不可能是买入信号的点也很简单(小于t10)。这样我们把一个向量上的点分成了三种状态,买入信号临界点(+1),不可能是买入信号的点(0),其他状态未知的点(NULL)。根据前面的规则,如果状态未知的点,前面出现了买入临界点,那么该点也应该置为买入信号点;如果前面出现了非买入信号点(0),那么该点也应该置为非买入信号点。因此我们可以使用front fill来实现。我们用同样的方法可以找出卖出信号(卖出信号为+1,其他信号为0)。两者相减可以得到最终的信号。可能还存在一些为null的信号,把这部分信号替换为0。DolphinDB的全部代码如下: buy = iif(signal >t1, 1h, iif(signal < t10, 0h, 00h)).ffill() sell = iif(signal <t2, 1h, iif(signal > t20, 0h, 00h)).ffill() direction = (buy - sell).nullFill(0h) 上面的代码可以合并成单个表达式: direction = (iif(signal >t1, 1h, iif(signal < t10, 0h, 00h)) - iif(signal <t2, 1h, iif(signal > t20, 0h, 00h))).ffill().nullFill(0h) 一个简单的测试如下: t1= 60 t10 = 50 t20 = 30 t2 = 20 signal =10 20 70 59 42 49 19 25 26 35 direction = (iif(signal >t1, 1h, iif(signal < t10, 0h, 00h)) - iif(signal <t2, 1h, iif(signal > t20, 0h, 00h))).ffill().nullFill(0h) [-1,-1,1,1,0,0,-1,-1,-1,0] 如果改用kdb+脚本来实现,则表达式如下: direction: 0h^fills(-).(0N 1h)[(signal>t1;signal<t2)]^'(0N 0h)[(signal<t10;signal>t20)] 如果使用pandas实现,代码如下: t1 = 60 t10 = 50 t20 = 30 t2 = 20 signal = pd.Series([10,20,70,59,42,49,19,25,26,35]) direction = (signal.apply(lambdas: 1 if s > t1 else (0 if s < t10 else np.nan)) - signal.apply(lambdas: 1 if s < t2 else (0 if s > t20 else np.nan))).ffill().fillna(0) 下面我们生成一个长度为1000万的在0~100之间的随机信号数组,测试DolphinDB、kdb+和pandas的性能。测试使用的机器配置如下: CPU:Intel® Core™ i7-7700 CPU @3.60GHz 3.60 GHz 内存:16GB OS:Windows 10 DolphinDB耗时330ms, kdb+耗时800ms,pandas耗时6.8s左右。DolphinDB的测试脚本如下: t1= 60 t10 = 50 t20 = 30 t2 = 20 signal = rand(100.0, 10000000) timer direction = (iif(signal >t1, 1h, iif(signal < t10, 0h, 00h)) - iif(signal <t2, 1h, iif(signal > t20, 0h, 00h))).ffill().nullFill(0h) kdb+的测试脚本如下: t1:60 t10:50 t20:30 t2:20 signal: 10000000 ? 100.0 \t 0h^fills(-).(0N 1h)[(signal>t1;signal<t2)]^'(0N 0h)[(signal<t10;signal>t20)] pandas的测试脚本如下: import time t1= 60 t10= 50 t20= 30 t2= 20 signal= pd.Series(np.random.random(10000000) * 100) start= time.time() direction= (signal.apply(lambdas:1 if s > t1 else (0 if s < t10 else np.nan)) - signal.apply(lambdas:1 if s < t2 else (0 if s > t20 else np.nan))).ffill().fillna(0) end= time.time() print(end- start) 通过上面这个例子,也不难发现,DolphinDB和kdb+的脚本在本质上有很多共性的东西。kdb+的脚本基本上能逐句逐词的翻译成DolphinDB脚本。区别在于kdb+是从左到右解析脚本的,而DolphinDB跟常规的编程语言一样,是从右到左;kdb+喜欢用符号来代表某一个功能,而DolphinDB更喜欢用函数来表达某一个功能,可读性会比较好但也会冗长一点。

资源下载

更多资源
Mario

Mario

马里奥是站在游戏界顶峰的超人气多面角色。马里奥靠吃蘑菇成长,特征是大鼻子、头戴帽子、身穿背带裤,还留着胡子。与他的双胞胎兄弟路易基一起,长年担任任天堂的招牌角色。

腾讯云软件源

腾讯云软件源

为解决软件依赖安装时官方源访问速度慢的问题,腾讯云为一些软件搭建了缓存服务。您可以通过使用腾讯云软件源站来提升依赖包的安装速度。为了方便用户自由搭建服务架构,目前腾讯云软件源站支持公网访问和内网访问。

Rocky Linux

Rocky Linux

Rocky Linux(中文名:洛基)是由Gregory Kurtzer于2020年12月发起的企业级Linux发行版,作为CentOS稳定版停止维护后与RHEL(Red Hat Enterprise Linux)完全兼容的开源替代方案,由社区拥有并管理,支持x86_64、aarch64等架构。其通过重新编译RHEL源代码提供长期稳定性,采用模块化包装和SELinux安全架构,默认包含GNOME桌面环境及XFS文件系统,支持十年生命周期更新。

WebStorm

WebStorm

WebStorm 是jetbrains公司旗下一款JavaScript 开发工具。目前已经被广大中国JS开发者誉为“Web前端开发神器”、“最强大的HTML5编辑器”、“最智能的JavaScript IDE”等。与IntelliJ IDEA同源,继承了IntelliJ IDEA强大的JS部分的功能。

用户登录
用户注册