给 Databend 添加 Scalar 函数 | 函数开发系例一
在 Databend 中按函数实现分为了:scalars 函数和 aggregates 函数。 Scalar 函数: 基于输入值,返回单个值。常见的 Scalar function 有 now, round 等。 Aggregate 函数: 用于对列的值进行操作并返回单个值。常见的 Agg function 有 sum, count, avg 等。 https://github.com/datafuselabs/databend/tree/main/src/query/functions/src 该系列共两篇,本文主要介绍 Scalar Function 从注册到执行是如何在 Databend 运行起来的。 函数注册 由 FunctionRegistry 接管函数注册。 #[derive(Default)] pubstructFunctionRegistry{ pubfuncs:HashMap<&'staticstr,Vec<Arc<Function>>>, #[allow(clippy::type_complexity)] pubfactories:HashMap< &'staticstr, Vec<Box<dynFn(&[usize],&[DataType])->Option<Arc<Function>>+'static>>, >, pubaliases:HashMap<&'staticstr,&'staticstr>, } 三个 item 都是 Hashmap。 其中,funcs 和 factories 都用来存储被注册的函数。不同之处在于 funcs 注册的都是固定参数个数的函数(目前支持最少参数个数为0,最多参数个数为 5),分为 register_0_arg, register_1_arg 等等。而 factories 注册的都是参数不定长的函数(如 concat),调用 register_function_factory 函数。 由于一个函数可能有多个别名(如 minus 的别名有 subtract 和 neg),因此有了 alias,它的 key 是某个函数的别名,v 是当前的存在的函数名,调用 register_aliases 函数。 另外, 根据不同的功能需求, 我们提供了不同级别的 register api。 函数构成 已知 funcs 的 value 是函数主体,我们来看一下 Function 在 Databend 中是怎么构建的。 pubstructFunction{ pubsignature:FunctionSignature, #[allow(clippy::type_complexity)] pubcalc_domain:Box<dynFn(&[Domain])->Option<Domain>>, #[allow(clippy::type_complexity)] pubeval:Box<dynFn(&[ValueRef<AnyType>],FunctionContext)->Result<Value<AnyType>,String>>, } 其中,signature 包括 函数名,参数类型,返回类型以及函数特性(目前暂未有函数使用特性,仅作为保留位)。要特别注意的是,在注册时函数名需要是小写。而一些 token 会经过 src/query/ast/src/parser/token.rs 转换。 #[allow(non_camel_case_types)] #[derive(Logos,Clone,Copy,Debug,PartialEq,Eq,Hash)] pubenumTokenKind{ ... #[token("+")] Plus, ... } 以实现 `select 1+2` 的加法函数为例子,`+` 被转换为 Plus,而函数名需要小写,因此我们在注册时函数名使用 `plus`。 with_number_mapped_type!(|NUM_TYPE|matchleft{ NumberDataType::NUM_TYPE=>{ registry.register_1_arg::<NumberType<NUM_TYPE>,NumberType<NUM_TYPE>,_,_>( "plus", FunctionProperty::default(), |lhs|Some(lhs.clone()), |a,_|a, ); } }); calc_domain 用来计算输出值的输入值的集合。用数学公式描述的话比如 `y = f(x)` 其中域就是 x 值的集合,可以作为f的参数生成 y 值。这可以使我们在索引数据时轻松过滤掉不在域内的值,极大提升响应效率。 eval 可以理解成函数的具体实现内容。本质是接受一些字符或者数字,将他们解析成表达式,再转换成另外一组值。 示例 目前在 function-v2 中实现的函数有这几类:arithmetric, array, boolean, control, comparison, datetime, math, string, string_mult_args, variant 以 length 的实现为例: length 接受一个 String 类型的值为参数,返回一个 Number 类型。名字为 length,domain 不做限制(因为任何 string 都有长度)最后一个参数是一个闭包函数,作为 length 的 eval 实现部分。 registry.register_1_arg::<StringType,NumberType<u64>,_,_>( "length", FunctionProperty::default(), |_|None, |val,_|val.len()asu64, ); 在 register_1_arg 的实现中,我们看到调用的函数是 register_passthrough_nullable_1_arg,函数名包含一个 nullable。而 eval 被 vectorize_1_arg 调用。 注意:请不要手动修改 register_1_arg 所在的文件 [src/query/expression/src/register.rs](https://github.com/datafuselabs/databend/blob/2aec38605eebb7f0e1717f7f54ec52ae0f2e530b/src/query/expression/src/register.rs) 。因为它是被 [src/query/codegen/src/writes/register.rs](https://github.com/datafuselabs/databend/blob/2aec38605eebb7f0e1717f7f54ec52ae0f2e530b/src/query/codegen/src/writes/register.rs) 生成的。 pubfnregister_1_arg<I1:ArgType,O:ArgType,F,G>( &mutself, name:&'staticstr, property:FunctionProperty, calc_domain:F, func:G, )where F:Fn(&I1::Domain)->Option<O::Domain>+'static+Clone+Copy, G:Fn(I1::ScalarRef<'_>,FunctionContext)->O::Scalar+'static+Clone+Copy, { self.register_passthrough_nullable_1_arg::<I1,O,_,_>( name, property, calc_domain, vectorize_1_arg(func), ) } 这是因为 eval 在实际应用场景中接受的不只是字符或者数字,还可能是 null 或者其他各种类型。而 null 无疑是最特殊的一种。而我们接收的参数也可能是一个列或者一个值。比如 selectlength(null); +--------------+ |length(null)| +--------------+ |NULL| +--------------+ selectlength(id)fromt; +------------+ |length(id)| +------------+ |2| |3| +------------+ 基于此,如果我们在函数中无需对 null 类型的值做特殊处理,直接使用 register_x_arg 即可。如果需要对 null 类型做特殊处理,参考 [try_to_timestamp](https://github.com/datafuselabs/databend/blob/d5e06af03ba0f99afdd6bdc974bf2f5c1c022db8/src/query/functions/src/scalars/datetime.rs)。 而对于需要在 vectorize 中进行特化的函数则需要调用 register_passthrough_nullable_x_arg,对要实现的函数进行特定的向量化优化。 例如 comparison 函数 regexp 的实现:regexp 接收两个 String 类型的值,返回 Bool 值。在向量化执行中,为了进一步优化减少重复正则表达式的解析,引入了 HashMap 结构。因此单独实现了 `vectorize_regexp`。 registry.register_passthrough_nullable_2_arg::<StringType,StringType,BooleanType,_,_>( "regexp", FunctionProperty::default(), |_,_|None, vectorize_regexp(|str,pat,map,_|{ letpattern=ifletSome(pattern)=map.get(pat){ pattern }else{ letre=regexp::build_regexp_from_pattern("regexp",pat,None)?; map.insert(pat.to_vec(),re); map.get(pat).unwrap() }; Ok(pattern.is_match(str)) }), ); 函数测试 Unit Test 函数相关单元测试在 [scalars](https://github.com/datafuselabs/databend/tree/d5e06af03ba0f99afdd6bdc974bf2f5c1c022db8/src/query/functions/tests/it/scalars) 目录中。 Logic Test Functions 相关的 logic 测试在 [02_function](https://github.com/datafuselabs/databend/tree/d5e06af03ba0f99afdd6bdc974bf2f5c1c022db8/tests/sqllogictests/suites/query/02_function) 目录中。 关于 Databend Databend 是一款开源、弹性、低成本,基于对象存储也可以做实时分析的新式数仓。期待您的关注,一起探索云原生数仓解决方案,打造新一代开源 Data Cloud。 Databend 文档:https://databend.rs/ Twitter:https://twitter.com/Datafuse_Labs Slack:https://datafusecloud.slack.com/ Wechat:Databend GitHub :https://github.com/datafuselabs/databend