跳到主要内容

Dask

Dask 是一个并行和分布式计算库,可扩展现有的 Python 和 PyData 生态系统。

特别是,我们可以使用 Dask DataFrame 来扩展 pandas 工作流。Dask DataFrame 并行化 pandas 以处理大型表格数据。它密切镜像 pandas API,使从在单个数据集上测试到处理完整数据集的转换变得简单。Dask 在处理 Parquet(Hugging Face Datasets 的默认格式)时特别有效,因为它支持丰富的数据类型、高效的列式筛选和压缩。

Dask 的一个很好的实际用例是以分布式方式在数据集上运行数据处理或模型推理。例如,参见 Coiled 关于使用 Hugging Face + Dask 扩展基于 AI 的数据处理的优秀博客文章。

读取和写入

由于 Dask 使用 fsspec 读取和写入远程数据,你可以使用 Hugging Face 路径(hf://)在 Hub 上读取和写入数据。

首先,你需要使用你的 Hugging Face 账户登录,例如使用:

hf auth login

然后你可以创建数据集仓库,例如使用:

from huggingface_hub import HfApi

HfApi().create_repo(repo_id="username/my_dataset", repo_type="dataset")

最后,你可以在 Dask 中使用 Hugging Face 路径。 Dask DataFrame 支持在 Hugging Face 上分布式写入 Parquet,它使用提交来跟踪数据集更改:

import dask.dataframe as dd

df.to_parquet("hf://datasets/username/my_dataset")

# 或者如果数据集有训练/验证/测试切分,则写入单独的目录
df_train.to_parquet("hf://datasets/username/my_dataset/train")
df_valid.to_parquet("hf://datasets/username/my_dataset/validation")
df_test .to_parquet("hf://datasets/username/my_dataset/test")

由于这为每个文件创建一个提交,建议在上传后压缩历史记录:

from huggingface_hub import HfApi

HfApi().super_squash_history(repo_id=repo_id, repo_type="dataset")

这将创建一个包含 Parquet 格式的 Dask 数据集的数据集仓库 username/my_dataset。 你可以稍后重新加载它:

import dask.dataframe as dd

df = dd.read_parquet("hf://datasets/username/my_dataset")

# 或者如果数据集有训练/验证/测试切分,则从单独的目录读取
df_train = dd.read_parquet("hf://datasets/username/my_dataset/train")
df_valid = dd.read_parquet("hf://datasets/username/my_dataset/validation")
df_test = dd.read_parquet("hf://datasets/username/my_dataset/test")

有关 Hugging Face 路径及其实现方式的更多信息,请参阅客户端库关于 HfFileSystem 的文档

处理数据

要使用 Dask 并行处理数据集,你可以首先为 pandas DataFrame 或 Series 定义数据处理函数,然后使用 Dask map_partitions 函数将此函数并行应用于数据集的所有分区:

def dummy_count_words(texts):
return pd.Series([len(text.split(" ")) for text in texts])

或使用 pandas 字符串方法的类似函数(更快):

def dummy_count_words(texts):
return texts.str.count(" ")

在 pandas 中,你可以在文本列上使用此函数:

# pandas API
df["num_words"] = dummy_count_words(df.text)

在 Dask 中,你可以在每个分区上运行此函数:

# Dask API:在每个分区上运行函数
df["num_words"] = df.text.map_partitions(dummy_count_words, meta=int)

请注意,你还需要提供 meta,这是函数输出中 pandas Series 或 DataFrame 的类型。 这是必需的,因为 Dask DataFrame 使用延迟 API。由于 Dask 只会在调用 .compute() 时运行数据处理,它需要 meta 参数来在此期间知道新列的类型。

谓词和投影下推

从 Hugging Face 读取 Parquet 数据时,Dask 会自动利用 Parquet 文件中的元数据来跳过不需要的整个文件或行组。例如,如果你在 Parquet 格式的 Hugging Face 数据集上应用筛选器(谓词),或者如果你选择列的子集(投影),Dask 将读取 Parquet 文件的元数据以丢弃不需要的部分,而无需下载它们。

这得益于重新实现的 Dask DataFrame API 以支持查询优化,这使得 Dask 更快、更稳健。

例如,FineWeb-Edu 的这个子集包含许多 Parquet 文件。如果你可以筛选数据集以保留来自最近 CC 转储的文本,Dask 将跳过大多数文件,只下载与筛选器匹配的数据:

import dask.dataframe as dd

df = dd.read_parquet("hf://datasets/HuggingFaceFW/fineweb-edu/sample/10BT/*.parquet")

# Dask 将跳过不匹配查询的文件或行组,而无需下载它们。
df = df[df.dump >= "CC-MAIN-2023"]

Dask 还将只读取计算所需的列,并跳过其余列。 例如,如果你在代码中稍后删除列,如果不需要,它不会在管道的早期加载它。 当你想要操作列的子集或进行分析时,这很有用:

# Dask 将下载筛选和计算所需的 'dump' 和 'token_count',并跳过其他列。
df.token_count.mean().compute()

客户端

dask 中的大多数功能都针对集群或本地 Client 进行了优化,以启动并行计算:

import dask.dataframe as dd
from distributed import Client

if __name__ == "__main__": # 创建新进程所需
client = Client()
df = dd.read_parquet(...)
...

对于本地使用,Client 默认使用带有多进程的 Dask LocalCluster。你可以手动配置 LocalCluster 的多进程:

from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=8, threads_per_worker=8)
client = Client(cluster)

请注意,如果你在本地使用默认的线程调度器而不使用 Client,DataFrame 在某些操作后可能会变慢(更多详细信息这里)。

部署 Dask 文档中查找有关设置本地或云集群的更多信息。