登录
首页 >  文章 >  python教程

Pandas条件滚动累加技巧分享

时间:2025-09-03 20:49:06 408浏览 收藏

在Pandas DataFrame中进行条件滚动累加计算,传统迭代方法效率低下,尤其是在处理大数据集时。本文提出了一种高效的解决方案,即利用Pandas的`groupby_rolling`函数进行向量化优化。通过将条件计数转换为数值,并确保数据按分组键和时间列排序,我们可以轻松地对指定分组内的历史数据在特定时间窗内进行条件求和。本文详细介绍了`groupby_rolling`的使用方法,并强调了数据预处理、排序以及时间窗口定义等关键注意事项。通过示例代码,展示了如何避免低效的循环,从而实现高性能的数据分析,显著提升计算效率,为数据分析师提供了一种更优的实践方案。

Pandas中条件滚动累加的向量化实现

本文旨在解决Pandas DataFrame中基于条件和时间窗口进行累加计算的效率问题。通过详细分析迭代方法的局限性,并引入Pandas groupby_rolling函数,展示了如何高效地对指定分组内的历史数据在特定时间窗内进行条件求和。教程提供了示例代码,并强调了数据预处理、排序及窗口定义等关键注意事项,以实现高性能的数据分析。

1. 问题背景与低效实现

在数据分析中,我们经常需要对DataFrame中的每一行,根据其特定属性(如商店ID)和时间戳,回溯查询满足特定条件的历史数据并进行聚合计算。例如,对于每一笔交易,我们可能需要计算在过去15天内同一商店某种特定商品的出现次数。

传统的实现方式通常涉及使用循环(如iterrows())遍历DataFrame的每一行,然后在循环内部对DataFrame进行过滤和计算。这种方法虽然直观,但对于大型数据集而言,其性能瓶颈非常明显,因为它涉及大量的重复数据筛选操作,且Python循环的效率远低于底层优化的Pandas操作。

考虑以下一个典型的低效实现示例:

import pandas as pd
import numpy as np
from tqdm import tqdm

# 示例数据
data = pd.DataFrame({
    'shop': [1, 2, 1, 2, 1, 2, 1, 2, 1, 2] * 3,
    'execution_date': pd.to_datetime(pd.date_range(start='2023-11-01', periods=30, freq='D')),
    'item': np.random.choice(['stuff', 'other', 'another'], size=30)
})
data.loc[data.index % 3 == 0, 'item'] = 'stuff'
data.loc[data.index % 7 == 0, 'item'] = 'stuff'

cum_items = []
data.execution_date = pd.to_datetime(data.execution_date, errors="coerce")
subset = data[["shop", "execution_date", "item"]]

# 预处理:将需要计数的条件转换为数值(1或0)
# 假设我们要计算 'item' 列中 'stuff' 的出现次数
subset['is_stuff'] = (subset['item'] == 'stuff').astype(int)

for i, row in tqdm(subset.iterrows(), total=len(subset)):
    # 过滤条件:同一商店,且日期严格早于当前日期减去15天
    fdf = (subset
           .loc[(subset.shop == row["shop"])]
           .loc[subset.execution_date < (row["execution_date"] - pd.Timedelta(15, unit='d'))]
          )

    if len(fdf) == 0: 
        cum_items.append(np.nan)
    else: 
        cum_items.append(fdf['is_stuff'].sum()) # 计算 'stuff' 的出现次数

data["cum_items_iter"] = cum_items
print("迭代计算结果 (部分):")
print(data[['shop', 'execution_date', 'item', 'cum_items_iter']].head(10))

上述代码通过逐行遍历,为每行构建一个子DataFrame并执行过滤和求和操作。对于包含数万甚至数百万行的数据集,这种方法将导致极长的运行时间。

2. 使用 groupby_rolling 进行向量化优化

为了显著提升性能,我们需要利用Pandas的向量化操作。groupby_rolling是处理此类分组-时间窗口聚合问题的强大工具。它允许我们对DataFrame进行分组,并在每个组内定义一个滑动时间窗口进行计算,从而避免显式循环。

核心思想:

  1. 数据预处理: 将需要计数的条件(如item == 'stuff')转换为数值类型(1或0),以便进行求和。
  2. 排序: 确保数据按分组键和时间列进行排序,这是rolling操作正确定义时间窗口的前提。
  3. 分组与滚动: 使用groupby()指定分组键,然后使用rolling()定义时间窗口,并指定基于哪一列进行滚动。
  4. 聚合: 在每个滚动窗口内执行聚合操作(如sum())。
  5. 结果对齐: 将rolling操作的结果重新对齐到原始DataFrame的索引。

下面是使用groupby_rolling实现上述需求的向量化代码:

import pandas as pd
import numpy as np

# 示例数据(与上文相同)
data = pd.DataFrame({
    'shop': [1, 2, 1, 2, 1, 2, 1, 2, 1, 2] * 3,
    'execution_date': pd.to_datetime(pd.date_range(start='2023-11-01', periods=30, freq='D')),
    'item': np.random.choice(['stuff', 'other', 'another'], size=30)
})
data.loc[data.index % 3 == 0, 'item'] = 'stuff'
data.loc[data.index % 7 == 0, 'item'] = 'stuff'

# 1. 确保日期列为datetime类型
data.execution_date = pd.to_datetime(data.execution_date, errors="coerce")

# 2. 预处理:将需要计数的条件转换为数值(1或0)
# 假设我们要计算 'item' 列中 'stuff' 的出现次数
data['is_stuff'] = (data['item'] == 'stuff').astype(int)

# 3. 排序:按商店和日期排序,这对于滚动操作至关重要
# 注意:groupby_rolling 可能会打乱原始顺序,因此在计算前排序并保留原始索引是好习惯
subset_sorted = data.sort_values(['shop', 'execution_date']).copy()

# 4. 使用 groupby_rolling 进行向量化计算
# '15D' 定义了一个15天的窗口,包括当前日期并向前追溯15天
# on='execution_date' 指定了滚动基于的日期列
data['cum_items_vec'] = (subset_sorted.groupby('shop')
                                      .rolling('15D', on='execution_date')['is_stuff']
                                      .sum()
                                      .reset_index(level=0, drop=True) # 移除groupby引入的shop级别索引
                                      .reindex(data.index) # 将结果重新对齐到原始DataFrame的索引
                                     )

# 由于滚动窗口默认包含当前点,如果需要排除当前点,可以进一步处理
# 例如,如果'is_stuff'在当前日期为1,则减去它
# data['cum_items_vec'] = data['cum_items_vec'] - data['is_stuff']

print("\n向量化计算结果 (部分):")
print(data[['shop', 'execution_date', 'item', 'is_stuff', 'cum_items_vec']].head(10))

# 比较迭代和向量化结果(如果窗口定义一致)
# 注意:本例中,迭代方法和向量化方法对时间窗口的定义略有不同
# 迭代方法:D' < D - 15天 (严格早于15天前)
# 向量化方法:[D - 15天, D] (过去15天,包含当前日期)
# 因此,结果通常不会完全相同,但向量化方法解决了“过去N天”的常见需求

关于时间窗口的说明: 原问题中迭代方法的条件是 subset.execution_date < (row["execution_date"] - pd.Timedelta(15, unit='d')),这意味着它查找严格早于当前日期15天前的所有记录。而rolling('15D', on='execution_date')所定义的窗口是[当前日期 - 15天, 当前日期],即包含当前日期在内的过去15天。虽然这两种窗口定义略有不同,但groupby_rolling提供的“过去N天”窗口计算是时间序列分析中更常见且高效的向量化模式。如果严格需要原问题中的窗口定义,可能需要更复杂的rolling配置或结合其他操作。

3. 关键注意事项

  • 数据类型: 确保用于定义时间窗口的列是Pandas的datetime类型。如果不是,需要使用pd.to_datetime()进行转换。
  • 数据排序: 在应用groupby_rolling之前,务必根据分组键和时间列对DataFrame进行排序。rolling操作依赖于数据的顺序来正确地构建时间窗口。
  • 窗口定义: 理解rolling()函数中时间偏移字符串(如'15D')的含义。它定义了一个以当前点为终点的固定长度时间窗口。例如,'15D'表示从当前日期开始,向前追溯15天的窗口,包括当前日期。
  • 结果对齐: groupby_rolling操作会生成一个带有MultiIndex的Series。为了将结果添加回原始DataFrame,需要使用reset_index(level=0, drop=True)移除分组键作为索引级别,然后使用reindex(data.index)将结果重新对齐到原始DataFrame的索引。
  • 条件计数转换为数值: 如果需要对特定条件(如字符串值)进行计数,应先将该条件转换为布尔值(True/False),再转换为整数(1/0),然后进行sum()操作。

4. 总结

通过将低效的iterrows()循环替换为Pandas的groupby_rolling函数,我们能够实现对DataFrame中条件滚动累加计算的显著性能提升。这种向量化方法不仅代码更简洁,而且利用了Pandas底层C语言优化,极大地加快了大数据集的处理速度。掌握groupby_rolling是进行高效时间序列数据分析的关键技能之一。

以上就是本文的全部内容了,是否有顺利帮助你解决问题?若是能给你带来学习上的帮助,请大家多多支持golang学习网!更多关于文章的相关知识,也可关注golang学习网公众号。

相关阅读
更多>
最新阅读
更多>
课程推荐
更多>