亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Python實(shí)現(xiàn)將MySQL中所有表的數(shù)據(jù)都導(dǎo)出為CSV文件并壓縮

 更新時(shí)間:2025年03月26日 15:58:20   作者:weixin_30777913  
這篇文章主要為大家詳細(xì)介紹了如何使用Python將MySQL數(shù)據(jù)庫中所有表的數(shù)據(jù)都導(dǎo)出為CSV文件到一個(gè)目錄,并壓縮為zip文件到另一個(gè)目錄下,需要的可以了解下

Python將MySQL數(shù)據(jù)庫中所有表的數(shù)據(jù)都導(dǎo)出為CSV文件到一個(gè)目錄,并壓縮為zip文件到另一個(gè)目錄下,然后解壓縮這個(gè)目錄中的所有zip文件到第三個(gè)目錄下。不使用Pandas庫,需要考慮SQL結(jié)果集是大數(shù)據(jù)量分批數(shù)據(jù)導(dǎo)出的情況,通過多線程和異步操作來提高程序性能,程序需要異常處理和輸出,輸出出錯(cuò)時(shí)的錯(cuò)誤信息,每次每個(gè)查詢導(dǎo)出數(shù)據(jù)的運(yùn)行狀態(tài)和表數(shù)據(jù)行數(shù)以及運(yùn)行時(shí)間戳,導(dǎo)出時(shí)間,輸出每個(gè)文件記錄數(shù)量的日志。

該腳本已在考慮大數(shù)據(jù)量、異常處理和性能優(yōu)化的基礎(chǔ)上進(jìn)行了全面設(shè)計(jì),能夠處理大多數(shù)常見場(chǎng)景。根據(jù)具體需求可進(jìn)一步調(diào)整批量大?。╞atch_size)和線程數(shù)(max_workers)以獲得最佳性能。

import os
import csv
import zipfile
import logging
import mysql.connector
from datetime import datetime
import time
import concurrent.futures
import glob

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('data_export.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

def export_table_to_csv(table_name, csv_path, db_config, batch_size=1000):
    """導(dǎo)出單個(gè)表的數(shù)據(jù)到CSV文件,分批處理"""
    conn = None
    cursor = None
    total_rows = 0
    try:
        conn = mysql.connector.connect(**db_config)
        cursor = conn.cursor()

        # 獲取數(shù)據(jù)并寫入CSV
        with open(csv_path, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)
            
            # 執(zhí)行查詢并獲取列名
            cursor.execute(f"SELECT * FROM `{table_name}`")
            columns = [col[0] for col in cursor.description]
            writer.writerow(columns)
            
            # 分批獲取數(shù)據(jù)
            while True:
                rows = cursor.fetchmany(batch_size)
                if not rows:
                    break
                writer.writerows(rows)
                total_rows += len(rows)
                logger.debug(f"{table_name} 已導(dǎo)出 {total_rows} 行")

        logger.info(f"{table_name} CSV導(dǎo)出完成,總行數(shù):{total_rows}")
        return total_rows

    except Exception as e:
        logger.error(f"導(dǎo)出表 {table_name} 失敗: {str(e)}", exc_info=True)
        raise
    finally:
        if cursor:
            cursor.close()
        if conn and conn.is_connected():
            conn.close()

def compress_to_zip(source_path, zip_path):
    """壓縮文件為ZIP格式"""
    try:
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            zipf.write(source_path, arcname=os.path.basename(source_path))
        logger.info(f"成功壓縮 {source_path} 到 {zip_path}")
    except Exception as e:
        logger.error(f"壓縮 {source_path} 失敗: {str(e)}", exc_info=True)
        raise

def process_table(table_name, db_config, csv_dir, zip_dir):
    """處理單個(gè)表的導(dǎo)出和壓縮"""
    start_time = time.time()
    logger.info(f"開始處理表: {table_name}")
    status = "成功"
    rows_exported = 0

    try:
        # 定義文件路徑
        csv_filename = f"{table_name}.csv"
        zip_filename = f"{table_name}.zip"
        csv_path = os.path.join(csv_dir, csv_filename)
        zip_path = os.path.join(zip_dir, zip_filename)

        # 導(dǎo)出CSV
        rows_exported = export_table_to_csv(table_name, csv_path, db_config)
        
        # 壓縮文件
        compress_to_zip(csv_path, zip_path)

    except Exception as e:
        status = f"失敗: {str(e)}"
        # 清理可能存在的中間文件
        for path in [csv_path, zip_path]:
            if path and os.path.exists(path):
                try:
                    os.remove(path)
                    logger.warning(f"已清理文件: {path}")
                except Exception as clean_error:
                    logger.error(f"清理文件失敗: {clean_error}")

    finally:
        duration = time.time() - start_time
        log_message = (
            f"表處理完成 - 表名: {table_name}, "
            f"狀態(tài): {status}, "
            f"導(dǎo)出行數(shù): {rows_exported}, "
            f"耗時(shí): {duration:.2f}秒"
        )
        logger.info(log_message)

def unzip_files(zip_dir, unzip_dir):
    """解壓指定目錄中的所有ZIP文件"""
    zip_files = glob.glob(os.path.join(zip_dir, '*.zip'))
    if not zip_files:
        logger.warning("未找到ZIP文件,跳過解壓")
        return

    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = []
        for zip_path in zip_files:
            futures.append(executor.submit(
                lambda: extract_zip(zip_path, unzip_dir)
            ))
        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()
            except Exception as e:
                logger.error(f"解壓過程中發(fā)生錯(cuò)誤: {str(e)}")

def extract_zip(zip_path, unzip_dir):
    """解壓?jiǎn)蝹€(gè)ZIP文件"""
    try:
        start_time = time.time()
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(unzip_dir)
        duration = time.time() - start_time
        logger.info(f"解壓完成: {zip_path} => {unzip_dir} (耗時(shí): {duration:.2f}秒)")
    except Exception as e:
        logger.error(f"解壓 {zip_path} 失敗: {str(e)}", exc_info=True)
        raise

def main():
    # 配置參數(shù)
    db_config = {
        'host': 'localhost',
        'user': 'your_username',
        'password': 'your_password',
        'database': 'your_database'
    }
    
    # 目錄配置
    base_dir = os.path.dirname(os.path.abspath(__file__))
    csv_dir = os.path.join(base_dir, 'csv_exports')
    zip_dir = os.path.join(base_dir, 'zip_archives')
    unzip_dir = os.path.join(base_dir, 'unzipped_files')

    # 創(chuàng)建目錄
    for dir_path in [csv_dir, zip_dir, unzip_dir]:
        os.makedirs(dir_path, exist_ok=True)
        logger.info(f"目錄已準(zhǔn)備: {dir_path}")

    # 獲取所有表名
    try:
        conn = mysql.connector.connect(**db_config)
        cursor = conn.cursor()
        cursor.execute("SHOW TABLES")
        tables = [table[0] for table in cursor.fetchall()]
        logger.info(f"發(fā)現(xiàn) {len(tables)} 個(gè)需要處理的表")
    except Exception as e:
        logger.error(f"獲取數(shù)據(jù)庫表失敗: {str(e)}", exc_info=True)
        return
    finally:
        if 'cursor' in locals():
            cursor.close()
        if 'conn' in locals() and conn.is_connected():
            conn.close()

    # 處理所有表(多線程導(dǎo)出和壓縮)
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        futures = []
        for table in tables:
            futures.append(executor.submit(
                process_table,
                table,
                db_config,
                csv_dir,
                zip_dir
            ))

        # 處理任務(wù)結(jié)果
        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()
            except Exception as e:
                logger.error(f"表處理異常: {str(e)}")

    # 解壓所有ZIP文件(多線程解壓)
    logger.info("開始解壓所有ZIP文件")
    unzip_files(zip_dir, unzip_dir)
    logger.info("全部處理流程完成")

if __name__ == "__main__":
    main()

關(guān)鍵特性說明:

1.分批處理大數(shù)據(jù):

  • 使用fetchmany(batch_size)分批獲取數(shù)據(jù)(默認(rèn)每批1000行)
  • 流式處理減少內(nèi)存占用

2.多線程處理:

  • 使用ThreadPoolExecutor并行處理不同表的導(dǎo)出和壓縮
  • 獨(dú)立的數(shù)據(jù)庫連接池(每個(gè)線程有自己的連接)
  • 并行解壓處理

3.異常處理:

  • 全面的try-except塊覆蓋所有關(guān)鍵操作
  • 自動(dòng)清理失敗時(shí)產(chǎn)生的中間文件
  • 詳細(xì)的錯(cuò)誤日志記錄(包含堆棧跟蹤)

4.日志記錄:

  • 同時(shí)輸出到文件和終端
  • 記錄時(shí)間戳、操作類型、狀態(tài)、耗時(shí)等關(guān)鍵信息
  • 包含每個(gè)表的處理結(jié)果統(tǒng)計(jì)

5.文件管理:

  • 自動(dòng)創(chuàng)建所需目錄
  • 使用ZIP_DEFLATED進(jìn)行高效壓縮
  • 安全的文件路徑處理

6.性能優(yōu)化:

  • 使用服務(wù)器端游標(biāo)避免內(nèi)存過載
  • 可配置的批量大小和線程數(shù)
  • 異步I/O操作

使用說明:

安裝依賴:

pip install mysql-connector-python

修改配置:

更新db_config中的數(shù)據(jù)庫連接信息

根據(jù)需要調(diào)整目錄路徑(csv_dir, zip_dir, unzip_dir)

運(yùn)行腳本:

python script.py

查看日志:

實(shí)時(shí)終端輸出

詳細(xì)日志文件data_export.log

擴(kuò)展建議:

通過命令行參數(shù)接受數(shù)據(jù)庫配置和目錄路徑

添加郵件通知功能(處理完成或失敗時(shí)通知)

實(shí)現(xiàn)斷點(diǎn)續(xù)傳功能

添加文件校驗(yàn)(MD5校驗(yàn)和)

支持配置文件(YAML/JSON格式)

添加進(jìn)度條顯示

到此這篇關(guān)于Python實(shí)現(xiàn)將MySQL中所有表的數(shù)據(jù)都導(dǎo)出為CSV文件并壓縮的文章就介紹到這了,更多相關(guān)Python MySQL數(shù)據(jù)導(dǎo)出為CSV內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • python實(shí)現(xiàn)動(dòng)態(tài)創(chuàng)建類的方法分析

    python實(shí)現(xiàn)動(dòng)態(tài)創(chuàng)建類的方法分析

    這篇文章主要介紹了python實(shí)現(xiàn)動(dòng)態(tài)創(chuàng)建類的方法,結(jié)合實(shí)例形式分析了Python動(dòng)態(tài)創(chuàng)建類的原理、實(shí)現(xiàn)方法及相關(guān)操作技巧,需要的朋友可以參考下
    2019-06-06
  • python多線程并發(fā)讓兩個(gè)LED同時(shí)亮的方法

    python多線程并發(fā)讓兩個(gè)LED同時(shí)亮的方法

    今天小編就為大家分享一篇python多線程并發(fā)讓兩個(gè)LED同時(shí)亮的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2019-02-02
  • Python任意字符串轉(zhuǎn)16, 32, 64進(jìn)制的方法

    Python任意字符串轉(zhuǎn)16, 32, 64進(jìn)制的方法

    今天小編就為大家分享一篇Python任意字符串轉(zhuǎn)16, 32, 64進(jìn)制的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2019-06-06
  • 將自己的數(shù)據(jù)集制作成TFRecord格式教程

    將自己的數(shù)據(jù)集制作成TFRecord格式教程

    今天小編就為大家分享一篇將自己的數(shù)據(jù)集制作成TFRecord格式教程,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2020-02-02
  • Python爬蟲之爬取二手房信息

    Python爬蟲之爬取二手房信息

    這篇文章主要介紹了Python爬蟲之爬取二手房信息,文中有非常詳細(xì)的代碼示例,對(duì)正在學(xué)習(xí)python爬蟲的小伙伴們有非常好的幫助,需要的朋友可以參考下
    2021-04-04
  • python?lambda?表達(dá)式形式分析

    python?lambda?表達(dá)式形式分析

    這篇文章主要介紹了python?lambda?表達(dá)式形式分析,?lambda??表達(dá)式會(huì)創(chuàng)建一個(gè)函數(shù)對(duì)象,可以對(duì)其賦值并如同普通函數(shù)一樣使用,下面通過定義了一個(gè)求平方的?lambda?表達(dá)式展開主題內(nèi)容,需要的朋友可以參考一下
    2022-04-04
  • Python 通過requests實(shí)現(xiàn)騰訊新聞抓取爬蟲的方法

    Python 通過requests實(shí)現(xiàn)騰訊新聞抓取爬蟲的方法

    今天小編就為大家分享一篇Python 通過requests實(shí)現(xiàn)騰訊新聞抓取爬蟲的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2019-02-02
  • python使用redis實(shí)現(xiàn)消息隊(duì)列(異步)的實(shí)現(xiàn)完整例程

    python使用redis實(shí)現(xiàn)消息隊(duì)列(異步)的實(shí)現(xiàn)完整例程

    本文主要介紹了python使用redis實(shí)現(xiàn)消息隊(duì)列(異步)的實(shí)現(xiàn)完整例程,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-01-01
  • Python安裝第三方庫及常見問題處理方法匯總

    Python安裝第三方庫及常見問題處理方法匯總

    本文給大家匯總介紹了Python安裝第三方庫及常見問題處理方法,非常的簡(jiǎn)單使用,有需要的小伙伴可以參考下
    2016-09-09
  • Python實(shí)現(xiàn)爬取馬云的微博功能示例

    Python實(shí)現(xiàn)爬取馬云的微博功能示例

    這篇文章主要介紹了Python實(shí)現(xiàn)爬取馬云的微博功能,結(jié)合實(shí)例形式較為詳細(xì)的分析了Python模擬ajax請(qǐng)求爬取馬云微博的相關(guān)操作技巧與注意事項(xiàng),需要的朋友可以參考下
    2019-02-02

最新評(píng)論