跳到主要内容

fenic

fenic 是一个受 PySpark 启发的 DataFrame 框架,专为构建生产 AI 和代理应用程序而设计。fenic 提供从 Hugging Face Hub 直接读取数据集的支持。

开始使用

要开始使用,请使用 pip 安装 fenic

pip install fenic

创建会话

使用默认配置实例化 fenic 会话(足以读取数据集和其他非语义操作):

import fenic as fc

session = fc.Session.get_or_create(fc.SessionConfig())

概述

fenic 是一个有主见的数据处理框架,结合了:

  • DataFrame API:受 PySpark 启发的操作,用于熟悉的数据操作
  • 语义操作:内置的 AI/LLM 操作,包括语义函数、嵌入和聚类
  • 模型集成:原生支持 AI 提供程序(Anthropic、OpenAI、Cohere、Google)
  • 查询优化:通过逻辑计划转换自动优化

从 Hugging Face Hub 读取

fenic 可以使用 hf:// 协议直接从 Hugging Face Hub 读取数据集。此功能内置于 fenic 的 DataFrameReader 接口中。

支持的格式

fenic 支持从 Hugging Face 读取以下格式:

  • Parquet 文件.parquet
  • CSV 文件.csv

读取数据集

要从 Hugging Face Hub 读取数据集:

import fenic as fc

session = fc.Session.get_or_create(fc.SessionConfig())

# 从公共数据集读取 CSV 文件
df = session.read.csv("hf://datasets/datasets-examples/doc-formats-csv-1/data.csv")

# 使用 glob 模式读取 Parquet 文件
df = session.read.parquet("hf://datasets/cais/mmlu/astronomy/*.parquet")

# 从特定数据集修订版本读取
df = session.read.parquet("hf://datasets/datasets-examples/doc-formats-csv-1@~parquet/**/*.parquet")

使用架构管理读取

# 使用架构合并读取多个 CSV 文件
df = session.read.csv("hf://datasets/username/dataset_name/*.csv", merge_schemas=True)

# 使用架构合并读取多个 Parquet 文件
df = session.read.parquet("hf://datasets/username/dataset_name/*.parquet", merge_schemas=True)

注意: 在 fenic 中,架构是列名及其数据类型的集合。当你启用 merge_schemas 时,fenic 尝试通过用 null 填充缺失列并在可能的情况下扩大类型来协调文件之间的差异。某些布局仍然无法合并——请查阅 fenic 文档以了解 CSV 架构合并限制Parquet 架构合并限制

身份验证

要读取私有数据集,你需要将 Hugging Face 令牌设置为环境变量:

export HF_TOKEN="your_hugging_face_token_here"

路径格式

fenic 中的 Hugging Face 路径格式遵循以下结构:

hf://{repo_type}/{repo_id}/{path_to_file}

你还可以指定数据集修订版本或版本:

hf://{repo_type}/{repo_id}@{revision}/{path_to_file}

功能:

  • 支持 glob 模式(***
  • 使用 @ 表示法的数据集修订版本/版本:
    • 特定提交:@d50d8923b5934dc8e74b66e6e4b0e2cd85e9142e
    • 分支:@refs/convert/parquet
    • 分支别名:@~parquet
  • 私有数据集需要 HF_TOKEN 环境变量

混合数据源

fenic 允许你在单个读取操作中组合多个数据源,包括混合不同的协议:

# 在一个读取调用中混合 HF 和本地文件
df = session.read.parquet([
"hf://datasets/cais/mmlu/astronomy/*.parquet",
"file:///local/data/*.parquet",
"./relative/path/data.parquet"
])

这种灵活性允许你在数据处理管道中无缝组合来自 Hugging Face Hub 和本地文件的数据。

处理来自 Hugging Face 的数据

从 Hugging Face 加载后,你可以使用 fenic 的完整 DataFrame API:

基本 DataFrame 操作

import fenic as fc

session = fc.Session.get_or_create(fc.SessionConfig())

# 从 Hugging Face 加载 IMDB 数据集
df = session.read.parquet("hf://datasets/imdb/plain_text/train-*.parquet")

# 筛选和选择
positive_reviews = df.filter(fc.col("label") == 1).select("text", "label")

# 分组和聚合
label_counts = df.group_by("label").agg(
fc.count("*").alias("count")
)

AI 驱动的操作

要使用语义和嵌入操作,请在 SessionConfig 中配置语言和嵌入模型。配置后:

import fenic as fc

# 需要设置 OPENAI_API_KEY 以进行语言和嵌入调用
session = fc.Session.get_or_create(
fc.SessionConfig(
semantic=fc.SemanticConfig(
language_models={
"gpt-4o-mini": fc.OpenAILanguageModel(
model_name="gpt-4o-mini",
rpm=60,
tpm=60000,
)
},
embedding_models={
"text-embedding-3-small": fc.OpenAIEmbeddingModel(
model_name="text-embedding-3-small",
rpm=60,
tpm=60000,
)
},
)
)
)

# 从 Hugging Face 加载文本数据集
df = session.read.parquet("hf://datasets/imdb/plain_text/train-00000-of-00001.parquet")

# 向文本列添加嵌入
df_with_embeddings = df.select(
"*",
fc.semantic.embed(fc.col("text")).alias("embedding")
)

# 应用语义函数进行情感分析
df_analyzed = df_with_embeddings.select(
"*",
fc.semantic.analyze_sentiment(
fc.col("text"),
model_alias="gpt-4o-mini", # 可选:指定模型
).alias("sentiment")
)

示例:分析 MMLU 数据集

import fenic as fc

# 需要设置 OPENAI_API_KEY 以进行语义调用
session = fc.Session.get_or_create(
fc.SessionConfig(
semantic=fc.SemanticConfig(
language_models={
"gpt-4o-mini": fc.OpenAILanguageModel(
model_name="gpt-4o-mini",
rpm=60,
tpm=60000,
)
},
)
)
)

# 从 Hugging Face 加载 MMLU 天文学子集
df = session.read.parquet("hf://datasets/cais/mmlu/astronomy/*.parquet")

# 处理数据
processed_df = (df
# 筛选特定条件
.filter(fc.col("subject") == "astronomy")
# 选择相关列
.select("question", "choices", "answer")
# 使用 semantic.map 添加难度分析
.select(
"*",
fc.semantic.map(
"Rate the difficulty of this question from 1-5: {{question}}",
question=fc.col("question"),
model_alias="gpt-4o-mini" # 可选:指定模型
).alias("difficulty")
)
)

# 显示结果
processed_df.show()

资源