Python腳本實現(xiàn)Mysql數(shù)據(jù)遷移
一、為什么要做數(shù)據(jù)遷移
MySQL數(shù)據(jù)庫遷移是指將MySQL數(shù)據(jù)庫中的數(shù)據(jù)和結(jié)構(gòu)遷移到另一個MySQL實例,或者從一個MySQL實例遷移到另一個數(shù)據(jù)庫系統(tǒng)(如從MySQL遷移到MariaDB,或者從本地MySQL遷移到云數(shù)據(jù)庫)。其作用通常包括以下幾個方面:
1. 提升性能與擴展性
- 硬件升級:隨著業(yè)務(wù)的發(fā)展,原來的MySQL數(shù)據(jù)庫可能因為硬件或配置限制無法滿足需求。遷移到更強大的服務(wù)器或云平臺可以提高性能,支持更高的并發(fā)訪問。
- 分布式架構(gòu):對于大型應(yīng)用,將數(shù)據(jù)遷移到分布式數(shù)據(jù)庫架構(gòu)中,能提高系統(tǒng)的擴展性和負(fù)載均衡能力。
2. 降低成本
- 從本地遷移到云:將數(shù)據(jù)庫從本地環(huán)境遷移到云平臺(如AWS RDS、Google Cloud SQL、阿里云等),可以節(jié)省硬件和運維成本,且云平臺提供自動備份、自動擴展等功能。
- 選擇更具成本效益的數(shù)據(jù)庫實例:遷移到性價比更高的MySQL版本或?qū)嵗?,幫助企業(yè)節(jié)省開支。
3. 數(shù)據(jù)恢復(fù)和災(zāi)難恢復(fù)
數(shù)據(jù)遷移有時作為災(zāi)難恢復(fù)的一部分,幫助確保數(shù)據(jù)在主數(shù)據(jù)庫不可用時能迅速恢復(fù)。例如,將數(shù)據(jù)從主數(shù)據(jù)中心遷移到備用數(shù)據(jù)中心,確保業(yè)務(wù)不中斷。
4. 技術(shù)或版本更新
- 隨著MySQL數(shù)據(jù)庫技術(shù)的演進,新的版本(例如MySQL 8.0)提供了更強大的功能、性能優(yōu)化和安全性。遷移到新版本有助于利用這些改進,例如更高效的查詢執(zhí)行、更強的加密功能以及更靈活的配置。
- 兼容性問題:數(shù)據(jù)庫遷移有時是由于原有的MySQL版本或特性與新系統(tǒng)或需求不兼容,遷移到支持更好兼容性的新版本可以解決此問題。
5. 高可用性和負(fù)載均衡
- 遷移到支持高可用性(如主從復(fù)制、Galera Cluster等)和負(fù)載均衡的架構(gòu),可以提高數(shù)據(jù)庫的容錯能力和并發(fā)處理能力,減少單點故障。
- 可以通過遷移到支持集群的MySQL系統(tǒng),配置多個主節(jié)點或從節(jié)點,分?jǐn)倲?shù)據(jù)庫的訪問壓力,保障服務(wù)的穩(wěn)定性。
6. 數(shù)據(jù)備份和清理
- 在進行數(shù)據(jù)庫遷移的過程中,往往也會對數(shù)據(jù)進行清理和備份,去除冗余、無效或過期的數(shù)據(jù),并進行數(shù)據(jù)庫的優(yōu)化(如表結(jié)構(gòu)調(diào)整、索引優(yōu)化等)。這不僅能提高遷移后的數(shù)據(jù)質(zhì)量,也能提升新數(shù)據(jù)庫的性能。
7. 業(yè)務(wù)整合與統(tǒng)一
- 多數(shù)據(jù)庫整合:一些公司可能存在多個MySQL實例,遷移數(shù)據(jù)到統(tǒng)一的數(shù)據(jù)庫平臺可以整合數(shù)據(jù),簡化管理和運維工作,提高數(shù)據(jù)的一致性和完整性。
- 合并系統(tǒng):例如,企業(yè)收購了其他公司,原有的系統(tǒng)中有不同的MySQL數(shù)據(jù)庫,遷移合并到統(tǒng)一的數(shù)據(jù)庫平臺,方便統(tǒng)一管理和分析。
二. 數(shù)據(jù)遷移實戰(zhàn)
1. 需求分析

只篩選,源表中S_SMSSendLogs的字段RobotTaskId = 源表T_RobotTasks字段Id且字段TenantId = 17409350669509才需要遷移
2. Python腳本
import mysql.connector
import json
# 數(shù)據(jù)庫連接配置
source_db_config = {
'host': 'localhost',
'port': 3306,
'user': 'CallSystemProd',
'password': '******',
'database': ''******',' # 源數(shù)據(jù)庫名稱
}
target_db_config = {
'host': 'localhost',
'port': 3306,
'user': 'CallSystemTest',
'password': ''******',',
'database': ''******',' # 目標(biāo)數(shù)據(jù)庫名稱
}
# 連接到源數(shù)據(jù)庫
try:
source_conn = mysql.connector.connect(**source_db_config)
source_cursor = source_conn.cursor(dictionary=True)
print("成功連接到源數(shù)據(jù)庫")
except mysql.connector.Error as err:
print(f"連接源數(shù)據(jù)庫失敗: {err}")
exit()
# 連接到目標(biāo)數(shù)據(jù)庫
try:
target_conn = mysql.connector.connect(**target_db_config)
target_cursor = target_conn.cursor()
print("成功連接到目標(biāo)數(shù)據(jù)庫")
except mysql.connector.Error as err:
print(f"連接目標(biāo)數(shù)據(jù)庫失敗: {err}")
exit()
# 獲取源表數(shù)據(jù)(S_SMSSendLogs)
source_cursor.execute("""
SELECT Id, SMSId, Phone, Status, IsSucceed, Trade_id, RequestStr,
ResponseStr, RobotTaskId, ExtendCode, CreateTime, UpdateTime,
CreateUserId, UpdateUserId, IsDelete, TaskCallResultId, ProviderType
FROM S_SMSSendLogs
""")
columns = source_cursor.fetchall()
# 獲取源表數(shù)據(jù)(S_SMSTemplates),用于RequestStr中的替換
source_cursor.execute("""
SELECT SMSId, Content, Id
FROM S_SMSTemplates
""")
templates = {template['SMSId']: template for template in source_cursor.fetchall()}
# 獲取與TaskCallResultId相關(guān)的數(shù)據(jù)(t_taskcallresults和t_repeattaskcallresults)
source_cursor.execute("""
SELECT id, CreateUserId, UpdateUserId, ResidentsId, CustomerName
FROM T_TaskCallResults
""")
taskcallresults = {result['id']: result for result in source_cursor.fetchall()}
source_cursor.execute("""
SELECT id, CreateUserId, UpdateUserId, ResidentsId, CustomerName
FROM T_RepeatTaskCallResults
""")
repeattaskcallresults = {result['id']: result for result in source_cursor.fetchall()}
# 獲取RobotTaskId和OrgId相關(guān)的表數(shù)據(jù)(t_taskcallresults, t_robotrepeattasks, sysuser)
source_cursor.execute("""
SELECT id, CreateUserId
FROM T_TaskCallResults
""")
taskcallresults_user_map = {result['id']: result['CreateUserId'] for result in source_cursor.fetchall()}
source_cursor.execute("""
SELECT id, CreateUserId
FROM T_RobotRepeatTasks
""")
robotrepeattasks_user_map = {result['id']: result['CreateUserId'] for result in source_cursor.fetchall()}
source_cursor.execute("""
SELECT Id, OrgId
FROM SysUser
""")
sysuser_map = {user['Id']: user['OrgId'] for user in source_cursor.fetchall()}
# 構(gòu)造批量插入語句
insert_query = """
INSERT INTO S_SMSSendLogs (Id, SMSId, Phone, Status, IsSucceed, Trade_id, RequestStr,
ResponseStr, RobotTaskId, ExtendCode, CreateTime, UpdateTime,
CreateUserId, UpdateUserId, IsDelete, TaskCallResultId,
ProviderType, OrgId, Content, CharCount, BillingCount, ResidentsId,
CustomerName, TemplateId)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
# 構(gòu)造插入的值(批量數(shù)據(jù)),應(yīng)用轉(zhuǎn)換規(guī)則
insert_values = []
for column in columns:
# Status 字段直接遷移,無需轉(zhuǎn)換
status = column['Status']
# 將 IsSucceed 轉(zhuǎn)換為 tinyint類型的 0
is_succeed = 0 if column['IsSucceed'] == 1 else column['IsSucceed']
# RequestStr 替換操作
request_str = column['RequestStr']
template = templates.get(column['SMSId'])
if template:
template_content = template['Content']
try:
# 獲取 RequestStr 中的變量并替換
request_values = json.loads(request_str).get('arguments', {})
for key, value in request_values.items():
template_content = template_content.replace(f"#{key}#", str(value) if value is not None else "")
except json.JSONDecodeError:
pass # 如果 JSON 格式不正確,跳過替換
request_str = template_content
# 計算字符數(shù)
char_count = len(request_str)
# 計算計費條數(shù)
billing_count = (char_count + 66) // 67 # 每條超過67個字算第二條
# 獲取相關(guān)的 UserId 和 OrgId
create_user_id = taskcallresults.get(column['TaskCallResultId'], {}).get('CreateUserId') or \
repeattaskcallresults.get(column['TaskCallResultId'], {}).get('CreateUserId')
update_user_id = taskcallresults.get(column['TaskCallResultId'], {}).get('UpdateUserId') or \
repeattaskcallresults.get(column['TaskCallResultId'], {}).get('UpdateUserId')
# ProviderType 轉(zhuǎn)換
provider_type = column['ProviderType']
if provider_type == 1:
provider_type = 0
elif provider_type == 2:
provider_type = 3
# OrgId 通過 CreateUserId 獲取
create_user_id_for_org = taskcallresults_user_map.get(column['RobotTaskId']) or \
robotrepeattasks_user_map.get(column['RobotTaskId'])
org_id = sysuser_map.get(create_user_id_for_org, None)
# 構(gòu)造數(shù)據(jù)插入
insert_values.append((
column['Id'], column['SMSId'], column['Phone'], status, is_succeed, column['Trade_id'],
request_str, column['ResponseStr'], column['RobotTaskId'], column['ExtendCode'],
column['CreateTime'], column['UpdateTime'], create_user_id, update_user_id,
column['IsDelete'], column['TaskCallResultId'], provider_type, org_id, request_str, char_count,
billing_count, taskcallresults.get(column['TaskCallResultId'], {}).get('ResidentsId'),
taskcallresults.get(column['TaskCallResultId'], {}).get('CustomerName'),
template['Id'] if template else None
))
# 執(zhí)行批量插入
target_cursor.executemany(insert_query, insert_values)
target_conn.commit()
print(f"成功遷移了 {len(insert_values)} 條數(shù)據(jù)")
# 關(guān)閉數(shù)據(jù)庫連接
source_cursor.close()
source_conn.close()
target_cursor.close()
target_conn.close()以上就是Python腳本實現(xiàn)Mysql數(shù)據(jù)遷移的詳細(xì)內(nèi)容,更多關(guān)于Mysql數(shù)據(jù)遷移的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Python+OpenCV進行不規(guī)則多邊形ROI區(qū)域提取
ROI即感興趣區(qū)域。機器視覺、圖像處理中,從被處理的圖像以方框、圓、橢圓、不規(guī)則多邊形等方式勾勒出需要處理的區(qū)域,稱為感興趣區(qū)域,ROI。本文將利用Python和OpenCV實現(xiàn)不規(guī)則多邊形ROI區(qū)域提取,需要的可以參考一下2022-03-03
Python實現(xiàn)疫情通定時自動填寫功能(附代碼)
這篇文章主要介紹了Python實現(xiàn)疫情通定時自動填寫功能,本文通過實例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-05-05

