Feathr领英开源的企业级特征存储
Feathr 是领英(LinkedIn)开源的企业级高性能特征存储。
特性:
- 定义特征:使用简单的 API,基于原始数据源
- 按名称获取这些特征:在模型训练和模型推理期间
- 共享特征:在你的团队和公司中
Feathr 会自动计算特征值并将它们加入你的训练数据,使用正确的时间点语义来避免数据泄漏,并支持实现和部署你的特征以在生产中使用。
在本地安装 Feathr 客户端
如果没有使用 Jupyter Notebook 并且想在本地安装 Feathr 客户端,使用这个:
pip install -U feathr
或者使用来自 GitHub 的最新代码:
pip install git+https://github.com/linkedin/feathr.git#subdirectory=feathr_project
亮点
使用转换定义特征
features = [ Feature(name="f_trip_distance", # Ingest feature data as-is feature_type=FLOAT), Feature(name="f_is_long_trip_distance", feature_type=BOOLEAN, transform="cast_float(trip_distance)>30"), # SQL-like syntax to transform raw data into feature Feature(name="f_day_of_week", feature_type=INT32, transform="dayofweek(lpep_dropoff_datetime)") # Provides built-in transformation ] anchor = FeatureAnchor(name="request_features", # Features anchored on same source source=batch_source, features=features)
丰富的 UDF 支持
Feathr 具有高度可定制的 UDF,具有原生 PySpark 和 Spark SQL 集成,可降低数据科学家的学习曲线:
def add_new_dropoff_and_fare_amount_column(df: DataFrame): df = df.withColumn("f_day_of_week", dayofweek("lpep_dropoff_datetime")) df = df.withColumn("fare_amount_cents", df.fare_amount.cast('double') * 100) return df batch_source = HdfsSource(name="nycTaxiBatchSource", path="abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/demo_data/green_tripdata_2020-04.csv", preprocessing=add_new_dropoff_and_fare_amount_column, event_timestamp_column="new_lpep_dropoff_datetime", timestamp_format="yyyy-MM-dd HH🇲🇲ss")
访问特征
# Requested features to be joined # Define the key for your feature location_id = TypedKey(key_column="DOLocationID", key_column_type=ValueType.INT32, description="location id in NYC", full_name="nyc_taxi.location_id") feature_query = FeatureQuery(feature_list=["f_location_avg_fare"], key=[location_id]) # Observation dataset settings settings = ObservationSettings( observation_path="abfss://green_tripdata_2020-04.csv", # Path to your observation data event_timestamp_column="lpep_dropoff_datetime", # Event timepstamp field for your data, optional timestamp_format="yyyy-MM-dd HH🇲🇲ss") # Event timestamp format, optional # Prepare training data by joining features to the input (observation) data. # feature-join.conf and features.conf are detected and used automatically. feathr_client.get_offline_features(observation_settings=settings, output_path="abfss://output.avro", feature_query=feature_query)
部署
client = FeathrClient() redisSink = RedisSink(table_name="nycTaxiDemoFeature") # Materialize two features into a redis table. settings = MaterializationSettings("nycTaxiMaterializationJob", sinks=[redisSink], feature_names=["f_location_avg_fare", "f_location_max_fare"]) client.materialize_features(settings)
并从在线存储获取特征:
# Get features for a locationId (key) client.get_online_features(feature_table = "agg_features", key = "265", feature_names = ['f_location_avg_fare', 'f_location_max_fare']) # Batch get for multiple locationIds (keys) client.multi_get_online_features(feature_table = "agg_features", key = ["239", "265"], feature_names = ['f_location_avg_fare', 'f_location_max_fare'])
评论