登录
首页 >  文章 >  python教程

Python递归合并多级CSV文件教程

时间:2025-11-09 20:00:38 491浏览 收藏

本教程旨在指导读者利用Python高效合并多层子文件夹中的CSV文件,适用于数据分析和处理场景。通过结合`pathlib`模块的递归遍历功能和`pandas`库的数据处理能力,我们将学习如何定位、读取并整合分散在复杂目录结构中的CSV文件。教程将详细讲解如何使用`pathlib`简化文件路径操作,并利用`pandas`的`read_csv()`和`concat()`函数实现高效的数据合并。文中提供实际代码示例,展示了如何将散落在多个子目录下的CSV文件合并成一个统一的、便于分析的数据集,并探讨了处理大数据集时的内存管理策略和常见错误处理方法,助力读者提升数据处理效率。

Python教程:递归查找并合并多个子文件夹中的CSV文件

本教程将指导您如何利用Python的`pathlib`模块递归遍历复杂目录结构,并结合`pandas`库高效地将多个子文件夹中的CSV文件合并成一个统一的CSV文件。我们将通过一个实际示例,展示如何定位、读取并整合分散的数据,最终生成一个便于分析的汇总数据集。

理解需求:多层目录下的CSV文件合并挑战

在数据处理的日常工作中,我们经常会遇到数据文件分散存储在多层子目录中的情况。例如,日志文件、传感器数据或实验结果可能按照日期、类型或批次等维度,被组织成类似 Sessions/day1/weather/weather1.csv、Sessions/day2/weather/weather2.csv 这样的结构。当需要对所有这些分散的数据进行统一分析时,首要任务就是将它们合并成一个完整的数据集。

传统的文件操作方法,如使用 os.walk 遍历目录树,或者 glob 模块进行模式匹配,虽然能够实现文件查找,但在处理复杂路径和递归查找时,代码可能显得冗长且不够直观。结合 pandas 进行数据合并,则需要一个高效且简洁的机制来获取所有目标文件的路径。

核心工具:pathlib与pandas

Python标准库中的 pathlib 模块提供了一种面向对象的方式来处理文件系统路径,极大地简化了路径操作。其 Path.rglob() 方法尤其适用于递归地查找符合特定模式的文件。而 pandas 库作为数据分析的核心工具,提供了 read_csv() 用于读取CSV文件,以及 concat() 函数用于将多个DataFrame对象高效地合并在一起。

本教程将展示如何巧妙地结合这两个库,以简洁高效的方式完成多层子目录下的CSV文件合并任务。

实现步骤与代码示例

我们将以一个典型的场景为例:假设有一个名为 Sessions 的父目录,其中包含多个 dayX 子目录,每个 dayX 子目录又包含一个 weather 子目录,最终在 weather 目录中存放着 weatherX.csv 文件。我们的目标是将所有 weatherX.csv 文件合并到 Sessions 目录下的一个 weather_all.csv 文件中。

步骤1:导入必要库

首先,我们需要导入 pathlib 用于路径操作,以及 pandas 用于数据处理。

from pathlib import Path
import pandas as pd

步骤2:定义目标父目录

指定包含所有子目录和CSV文件的父目录路径。在本例中,我们假设当前脚本与 Sessions 目录处于同一级别,或者提供 Sessions 的绝对路径。

parent_directory = Path('Sessions') # 相对路径,如果Sessions在当前工作目录下
# 或者 parent_directory = Path('/path/to/your/Sessions') # 绝对路径,请替换为实际路径

步骤3:递归查找并收集CSV文件路径

使用 Path.rglob('*.csv') 方法,我们可以递归地在 parent_directory 及其所有子目录中查找所有扩展名为 .csv 的文件。

csv_files = list(parent_directory.rglob('*.csv'))
print(f"找到 {len(csv_files)} 个CSV文件。")
if csv_files:
    for file in csv_files[:5]: # 打印前5个文件路径作为示例
        print(file)

步骤4:逐一读取CSV文件并存储到列表中

遍历找到的所有CSV文件路径,使用 pd.read_csv() 将每个文件读取为一个 pandas.DataFrame,并将这些DataFrame对象收集到一个列表中。

dfs_to_combine = []
for file_path in csv_files:
    try:
        df = pd.read_csv(file_path)
        dfs_to_combine.append(df)
    except pd.errors.EmptyDataError:
        print(f"警告: 文件 '{file_path}' 为空,跳过。")
    except Exception as e:
        print(f"错误: 读取文件 '{file_path}' 失败: {e},跳过。")

步骤5:合并所有数据框并保存

当所有DataFrame都读取完毕并存储在 dfs_to_combine 列表中后,使用 pd.concat() 函数将它们一次性合并成一个大的DataFrame。最后,将合并后的DataFrame保存为一个新的CSV文件。

if dfs_to_combine: # 确保列表不为空
    combined_df = pd.concat(dfs_to_combine, ignore_index=True)
    destination_path = parent_directory / 'weather_all.csv' # 合并后的文件路径

    # 确保输出目录存在,如果destination_path包含子目录
    destination_path.parent.mkdir(parents=True, exist_ok=True)

    combined_df.to_csv(destination_path, index=False, encoding='utf-8-sig')
    print(f"所有CSV文件已成功合并并保存到: {destination_path}")
else:
    print("没有可合并的数据。")

ignore_index=True 参数在合并时会重置索引,这对于新的合并数据集通常是期望的行为。encoding='utf-8-sig' 确保了在不同系统上的兼容性,尤其是在处理包含特殊字符的数据时。

完整代码示例

将上述步骤整合在一起,形成一个完整的脚本:

from pathlib import Path
import pandas as pd

def combine_nested_csvs(parent_dir_path, output_filename='weather_all.csv'):
    """
    递归查找指定父目录及其子目录中的所有CSV文件,
    并将它们合并成一个单一的CSV文件。

    Args:
        parent_dir_path (str or Path): 包含CSV文件的父目录路径。
        output_filename (str): 合并后CSV文件的名称。
    """
    parent_directory = Path(parent_dir_path)

    if not parent_directory.is_dir():
        print(f"错误: 目录 '{parent_dir_path}' 不存在。")
        return

    csv_files = list(parent_directory.rglob('*.csv'))

    if not csv_files:
        print(f"在 '{parent_dir_path}' 中没有找到任何CSV文件。")
        return

    print(f"找到 {len(csv_files)} 个CSV文件,开始读取和合并...")

    dfs_to_combine = []
    for file_path in csv_files:
        try:
            # 尝试读取文件,可以根据需要添加更多pd.read_csv的参数
            df = pd.read_csv(file_path)
            dfs_to_combine.append(df)
        except pd.errors.EmptyDataError:
            print(f"警告: 文件 '{file_path}' 为空,跳过。")
        except Exception as e:
            print(f"错误: 读取文件 '{file_path}' 失败: {e},跳过。")

    if dfs_to_combine:
        combined_df = pd.concat(dfs_to_combine, ignore_index=True)
        destination_path = parent_directory / output_filename

        # 确保输出目录存在
        destination_path.parent.mkdir(parents=True, exist_ok=True)

        combined_df.to_csv(destination_path, index=False, encoding='utf-8-sig')
        print(f"所有CSV文件已成功合并并保存到: {destination_path}")
    else:
        print("没有可合并的数据。")

# 示例调用
# 假设你的 'Sessions' 目录与你的脚本在同一目录下
# combine_nested_csvs('Sessions', 'weather_all.csv')

# 如果 'Sessions' 在其他位置,请提供完整路径
# combine_nested_csvs('/Users/YourUser/Documents/Sessions', 'weather_all.csv')

注意事项与最佳实践

  • 列名一致性: pd.concat() 在合并时会尝试对齐列。如果所有CSV文件的列名和顺序完全一致,合并将非常顺利。如果列名不一致,pd.concat 会创建所有列的并集,并在缺失值处填充 NaN。在合并前,建议检查并标准化所有CSV文件的列结构。
  • 内存管理: 对于数量巨大或单个文件很大的CSV数据集,将所有DataFrame一次性加载到内存中可能会导致内存溢出。在这种情况下,可以考虑以下策略:
    • 分块读取: 使用 pd.read_csv(..., chunksize=...) 分块读取并处理数据。
    • 逐步写入: 读取一个文件,写入输出文件(追加模式),然后读取下一个。
    • 使用 dask.dataframe: 对于超大数据集,dask 提供了类似于 pandas 的API,但能够处理超出内存限制的数据。
  • 错误处理: 在读取CSV文件时,应考虑文件可能为空 (pd.errors.EmptyDataError)、文件损坏或编码问题等异常情况。在示例代码中已加入了基本的 try-except 块来处理这些情况。
  • 文件编码: 确保 pd.read_csv() 和 df.to_csv() 使用正确的编码(如 'utf-8' 或 'utf-8-sig'),以避免乱码问题。
  • 文件复制: 如果除了合并之外,你确实需要将每个单独的CSV文件复制到一个目标目录,可以在读取文件的循环中添加 `shutil

终于介绍完啦!小伙伴们,这篇关于《Python递归合并多级CSV文件教程》的介绍应该让你收获多多了吧!欢迎大家收藏或分享给更多需要学习的朋友吧~golang学习网公众号也会发布文章相关知识,快来关注吧!

相关阅读
更多>
最新阅读
更多>
课程推荐
更多>