Feathr is an enterprise-grade, high-performance feature store open-sourced by LinkedIn.
Features:
- Define features on top of raw data sources using simple APIs
- Get those features by name during model training and model inference
- Share features across your team and company
Feathr automatically computes your feature values and joins them to your training data, using point-in-time-correct semantics to avoid data leakage, and supports materializing and deploying your features for use in production.
Install the Feathr client locally
If you are not using a Jupyter notebook and want to install the Feathr client locally, use:
pip install feathr
Or install the latest code from GitHub:
pip install git+https://github.com/linkedin/feathr.git#subdirectory=feathr_project
Highlights
Define features with transformations
from feathr import Feature, FeatureAnchor, BOOLEAN, FLOAT, INT32

features = [
    Feature(name="f_trip_distance",                        # Ingest feature data as-is
            feature_type=FLOAT),
    Feature(name="f_is_long_trip_distance",
            feature_type=BOOLEAN,
            transform="cast_float(trip_distance)>30"),     # SQL-like syntax to transform raw data into a feature
    Feature(name="f_day_of_week",
            feature_type=INT32,
            transform="dayofweek(lpep_dropoff_datetime)")  # Built-in transformation function
]

anchor = FeatureAnchor(name="request_features",            # Features anchored on the same source
                       source=batch_source,
                       features=features)
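The query in the "Access features" section below asks for f_location_avg_fare, which is an aggregation feature rather than a row-level one. A hedged sketch of how such a feature could be defined, assuming Feathr's WindowAggTransformation and the location_id TypedKey shown later in this README; the anchor name and the 90-day window are illustrative:

from feathr import Feature, FeatureAnchor, WindowAggTransformation, FLOAT

agg_features = [
    Feature(name="f_location_avg_fare",
            key=location_id,                                   # TypedKey defined under "Access features" below
            feature_type=FLOAT,
            transform=WindowAggTransformation(agg_expr="cast_float(fare_amount)",
                                              agg_func="AVG",  # average fare per location
                                              window="90d"))   # illustrative 90-day aggregation window
]

agg_anchor = FeatureAnchor(name="aggregationFeatures",
                           source=batch_source,
                           features=agg_features)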
Rich UDF support
Feathr has highly customizable UDFs with native PySpark and Spark SQL integration to lower the learning curve for data scientists:
from feathr import HdfsSource
from pyspark.sql import DataFrame
from pyspark.sql.functions import dayofweek

def add_new_dropoff_and_fare_amount_column(df: DataFrame):
    # Derive the day of week from the drop-off time and express the fare in cents.
    df = df.withColumn("f_day_of_week", dayofweek("lpep_dropoff_datetime"))
    df = df.withColumn("fare_amount_cents", df.fare_amount.cast('double') * 100)
    return df

batch_source = HdfsSource(name="nycTaxiBatchSource",
                          path="abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/demo_data/green_tripdata_2020-04.csv",
                          preprocessing=add_new_dropoff_and_fare_amount_column,
                          event_timestamp_column="lpep_dropoff_datetime",
                          timestamp_format="yyyy-MM-dd HH:mm:ss")
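Before features can be joined offline or materialized online, the anchors are registered with a Feathr client. A minimal sketch, assuming a feathr_config.yaml in the working directory (the path is illustrative) and the build_features API:

from feathr import FeathrClient

feathr_client = FeathrClient(config_path="./feathr_config.yaml")  # config path is illustrative

# Register the anchors defined above so their features can be queried below.
feathr_client.build_features(anchor_list=[anchor, agg_anchor])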
Access features
from feathr import TypedKey, ValueType, FeatureQuery, ObservationSettings

# Define the key for your feature
location_id = TypedKey(key_column="DOLocationID",
                       key_column_type=ValueType.INT32,
                       description="location id in NYC",
                       full_name="nyc_taxi.location_id")

# Features requested to be joined
feature_query = FeatureQuery(feature_list=["f_location_avg_fare"], key=[location_id])

# Observation dataset settings
settings = ObservationSettings(
    observation_path="abfss://green_tripdata_2020-04.csv",  # Path to your observation data
    event_timestamp_column="lpep_dropoff_datetime",         # Event timestamp field for your data, optional
    timestamp_format="yyyy-MM-dd HH:mm:ss")                 # Event timestamp format, optional
# Prepare training data by joining features to the input (observation) data.
# feature-join.conf and features.conf are detected and used automatically.
feathr_client.get_offline_features(observation_settings=settings,
                                   output_path="abfss://output.avro",
                                   feature_query=feature_query)
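get_offline_features submits a Spark job; a sketch of waiting for it to finish and loading the joined result, assuming wait_job_to_finish and the get_result_df helper from feathr.utils.job_utils (available in recent Feathr releases):

from feathr.utils.job_utils import get_result_df

# Block until the point-in-time join job completes; the timeout is illustrative.
feathr_client.wait_job_to_finish(timeout_sec=500)

# Load the joined training data written to output_path into a DataFrame.
training_df = get_result_df(feathr_client)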
Deployment
from feathr import FeathrClient, MaterializationSettings, RedisSink

client = FeathrClient()
redisSink = RedisSink(table_name="nycTaxiDemoFeature")
# Materialize two features into a Redis table.
settings = MaterializationSettings("nycTaxiMaterializationJob",
                                   sinks=[redisSink],
                                   feature_names=["f_location_avg_fare", "f_location_max_fare"])
client.materialize_features(settings)
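Materialization can also backfill a historical window instead of only the latest values; a sketch assuming Feathr's BackfillTime helper, with illustrative dates:

from datetime import datetime, timedelta
from feathr import BackfillTime

backfill_time = BackfillTime(start=datetime(2020, 5, 10),
                             end=datetime(2020, 5, 20),
                             step=timedelta(days=1))  # one materialization run per day
settings = MaterializationSettings("nycTaxiMaterializationJob",
                                   sinks=[redisSink],
                                   feature_names=["f_location_avg_fare", "f_location_max_fare"],
                                   backfill_time=backfill_time)
client.materialize_features(settings)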
And fetch the features from the online store:
# Get features for a locationId (key)
client.get_online_features(feature_table="nycTaxiDemoFeature",
                           key="265",
                           feature_names=['f_location_avg_fare', 'f_location_max_fare'])
# Batch get for multiple locationIds (keys)
client.multi_get_online_features(feature_table="nycTaxiDemoFeature",
                                 key=["239", "265"],
                                 feature_names=['f_location_avg_fare', 'f_location_max_fare'])