Exporting data from Hadoop HDFS to a relational database with Python
Exporting from HDFS to a relational database (MySQL, Oracle, PostgreSQL) with Python
A complete walkthrough of exporting data from Hadoop HDFS and loading it into a relational database with the DataX tool.

Steps
1. Define parameters and variables
sql=$1 # SQL statement that selects the data to export
s_tablename=$2 # source table name
ds_name=$3 # target database name
t_tablename=$4 # target table name
temptable="h2o_"`date +%s%N | md5sum | head -c 16` # temporary table name derived from the current timestamp
filename=${s_tablename}_${temptable} # file name
path="hdfs://prdhdfs/tmp/hdfs_to_rdb/$filename/" # HDFS path
local_path="/data02/dcadmin/scripts/dataos_scripts/data_exp" # local script path
flag=$5 # flag that decides whether to TRUNCATE the target table
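For example, the wrapper might be invoked like this (the script name hdfs_to_rdb.sh and the argument values are illustrative, not from the original; the five positional arguments map to the variables above):
sh hdfs_to_rdb.sh "select id,name from dw.app_user_d" dw.app_user_d mysql_dw_01 app_user_d T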
2. Build the SQL query and submit it to Hive
echo "$sql"
sql1=`echo "$sql"|cut -d ";" -f2` # part after the semicolon
sql0=`echo "$sql"|cut -d ";" -f1` # part before the semicolon
sql0="$sql0;insert overwrite directory '${path}' stored as ORC $sql1" # build the final SQL
echo "$sql0"
kinit -kt /data02/dcadmin/keytab_shengchan/dcadmin.keytab dcadmin@SC.COM
beeline -u "jdbc:hive2://prdnn1.yxbdprd.sc.ctc.com:2181, ... ,prddb1.yxbdprd.sc.ctc.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2" -e "set tez.queue.name=offline;$sql0 distribute by rand()"
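For example (input SQL assumed), an input of the form "settings;query" is rewritten so the query's result lands in the HDFS directory as ORC:
sql='set hive.execution.engine=tez;select id,name from dw.app_user_d'
# sql0 becomes:
# set hive.execution.engine=tez;insert overwrite directory 'hdfs://prdhdfs/tmp/hdfs_to_rdb/<filename>/' stored as ORC select id,name from dw.app_user_d
Note that cut -f1/-f2 only honors the first semicolon, so an input with more than two statements would silently lose the rest.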
3. Fetch the target database connection info and parse it into variables
Query the PostgreSQL metadata database for the target database's connection info and parse the result into shell variables.
re=$(PGPASSWORD=... psql -h 10.251.110.104 -p 18921 -U dacp -d dacp -t <<EOF
SELECT CASE WHEN ds_type = 'mysql' THEN CONCAT ('jdbc:mysql://' ,ds_inst_loc,'/',(ds_conf::json ->>'physicalDbName'),'?characterEncoding=UTF-8')
WHEN ds_type = 'oracle' THEN ds_conf::json ->> 'url'
WHEN ds_type = 'pg' THEN CONCAT ('jdbc:postgresql://',ds_inst_loc,'/',(ds_conf::json ->>'physicalDbName')) END jdbc_url
,ds_acct
,ds_auth
,CASE WHEN ds_type = 'mysql' THEN 'mysqlwriter'
WHEN ds_type = 'oracle' THEN 'oraclewriter'
WHEN ds_type = 'pg' THEN 'postgresqlwriter' END ds_type
FROM dacp_dev.dacp_meta_datasource
WHERE ds_type IN ('mysql', 'oracle', 'pg')
AND upper(trim(ds_name)) = upper(trim('$ds_name'))
EOF
)
eval $(echo $re| awk '{printf("jdbc_url=%s; ds_acct=%s; ds_auth=%s; ds_type=%s",$1,$3,$5,$7)}')
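With -t, psql prints the single result row as "value | value | value | value"; after whitespace splitting in awk, the pipe separators occupy fields $2, $4 and $6, so the values land in $1, $3, $5 and $7. A sample (values assumed):
# echo $re
# jdbc:mysql://10.1.1.1:3306/dw?characterEncoding=UTF-8 | dacp_user | 9f8e7d... | mysqlwriter
This only works because none of the values contain whitespace.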
4. Obtain the target database password
Decrypt the database password by running a Java program:
pw=`java -Dpwd=${ds_auth} -jar $local_path/AesCipher-1.0.jar`
5. Preprocess the SQL
Depending on the flag variable, decide whether to issue a TRUNCATE:
preSQL="select * from $t_tablename where 1=-1"
if [ "$flag" = "T" ];then
preSQL="truncate table $t_tablename"
fi
echo "preSQL=$preSQL"
6. Export the data and load it into the target database
Use DataX to run the HDFS-to-RDBMS load:
python $local_path/datax/bin/datax.py -p "-Dpath=$path -Dwriter=$ds_type -Drdb_user=$ds_acct -Drdb_pass=\"$pw\" -Drdb_jdbc=\"$jdbc_url\" -Drdb_table=$t_tablename -DpreSql=\"$preSQL\"" $local_path/hdfs_to_rdb.json
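The job template hdfs_to_rdb.json is not shown in the source. A minimal sketch of what it might look like, assuming DataX's hdfsreader and ${variable} placeholders filled in by the -p "-D..." options above (the structure and field names are an assumption, not the author's actual file):
{
  "job": {
    "setting": { "speed": { "channel": 4 } },
    "content": [{
      "reader": {
        "name": "hdfsreader",
        "parameter": {
          "path": "${path}",
          "defaultFS": "hdfs://prdhdfs",
          "fileType": "orc",
          "column": ["*"]
        }
      },
      "writer": {
        "name": "${writer}",
        "parameter": {
          "username": "${rdb_user}",
          "password": "${rdb_pass}",
          "column": ["*"],
          "preSql": ["${preSql}"],
          "connection": [{ "jdbcUrl": "${rdb_jdbc}", "table": ["${rdb_table}"] }]
        }
      }
    }]
  }
}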
7. The Python code in detail
The bulk of the work is done by a Python script that handles the data conversion and transfer. The highlights:
1. Initialization and dependencies
Load the required packages and initialize variables.
import time
import datetime
import os
import threadpool
import commands
import calendar
import random
import pymssql
import pymysql
import cx_Oracle
import psycopg2
import socket
from pyhdfs import HdfsClient
from hashlib import md5
# other initialization
2. Connect to the database and execute SQL
Two helper functions connect to a database and run SQL:
def connect_database_to_select(conn, sql):
    cursor = conn.cursor()
    try:
        cursor.execute(sql)
        result = cursor.fetchall()
        conn.commit()
        return result
    except Exception as e:
        print('SQL execution error: {}, SQL: {}'.format(str(e), sql))
        sys.exit(2)
    finally:
        cursor.close()
def connect_database_to_commit(exe_type, conn, sql, insert_list):
    cursor = conn.cursor()
    try:
        if exe_type.lower() in ('delete', 'insert'):
            cursor.execute(sql)
            conn.commit()
        elif exe_type.lower() == 'insertmany':
            cursor.executemany(sql, insert_list)
            conn.commit()
    except Exception as e:
        print('SQL execution error: {}, SQL: {}'.format(str(e), sql))
        sys.exit(2)
    finally:
        cursor.close()
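A quick usage sketch of these helpers (connection parameters are placeholders, not from the original):
conn = pymysql.connect(host='10.0.0.1', user='dw', passwd='***', port=3306, database='dw', charset='utf8')
rows = connect_database_to_select(conn, 'select count(*) from app_user_d')  # returns cursor.fetchall()
connect_database_to_commit('insert', conn, 'truncate table app_user_d_bak', '')  # exe_type picks execute vs executemany
conn.close()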
3. Export processing
Run the export SQL against Hive and stage the files:
def produce_exe_data(sql, s_tablename):
    global local_path_name
    local_path_01 = local_path_list[random.randrange(len(local_path_list))] + '/dataos_exp'
    local_path_name = "h2o_{0}_{1}".format(s_tablename, get_md5_str()).lower()
    local_path = local_path_01 + '/' + local_path_name
    if os.path.exists(local_path):
        cmd = 'rm -rf {}'.format(local_path)
        exe_system_cmd(cmd)
    os.mkdir(local_path)
    hdfs_path = "hdfs://prdhdfs/tmp/hdfs_to_rdb/{}".format(local_path_name)
    sql = sql.strip().strip(';')
    sql_list = sql.split(';')
    hive_conn = hive_connect()
    compress_sql = 'set hive.exec.compress.output=false'
    connect_database_to_commit('insert', hive_conn, compress_sql, '')
    for i in range(len(sql_list)):
        sql_str = sql_list[i]
        if i == len(sql_list)-1:  # the last statement becomes the INSERT OVERWRITE DIRECTORY
            sql_str = '''INSERT OVERWRITE DIRECTORY '{0}'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\u0001' COLLECTION ITEMS TERMINATED BY '\\n' MAP KEYS TERMINATED BY ':'
{1} '''.format(hdfs_path, sql_str)
        connect_database_to_commit('insert', hive_conn, sql_str, '')
    if hive_conn:
        hive_conn.close()
    cmd = ''' hdfs dfs -get {}/* {} '''.format(hdfs_path, local_path)
    exe_system_cmd(cmd)
    return local_path, hdfs_path
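For example (input assumed), a two-statement input is split on ';' and only the final statement is wrapped:
sql = "set tez.queue.name=offline; select id, name from dw.app_user_d"
# sql_list[0] runs as-is; sql_list[-1] is rewritten to:
# INSERT OVERWRITE DIRECTORY 'hdfs://prdhdfs/tmp/hdfs_to_rdb/h2o_dw.app_user_d_<md5>'
# ROW FORMAT DELIMITED FIELDS TERMINATED BY '\u0001' ... select id, name from dw.app_user_d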
4. Multithreaded data transfer
A thread pool speeds up the transfer:
def thread_exe_exchange_data(g_tablename, flag, local_path, hdfs_path):
    global rdb_conn
    rdb_conn = get_rdb_database_conn()
    if flag.upper() == 'T':
        presql = 'truncate table {}'.format(g_tablename)
        connect_database_to_commit('insert', rdb_conn, presql, '')
    if ds_type.lower() == 'oracle':
        global oracle_table_field
        oracle_table_field = get_oracle_table_fields()
        localtime = str(time.strftime("%Y%m%d", time.localtime()))
        ora_dir = "/data03/datafile/sqlldrdata/{0}/".format(localtime)
        if not os.path.exists(ora_dir):
            os.mkdir(ora_dir)
    file_lists = os.listdir(local_path)
    global exp_num_list
    global log_list
    global exception_list
    exp_num_list = []
    log_list = []
    exception_list = []
    thread_list = []
    for file_name in file_lists:
        thread_list.append(local_path + '/' + file_name)
    pool = threadpool.ThreadPool(5)
    requests = threadpool.makeRequests(exchange_data, thread_list)
    [pool.putRequest(req) for req in requests]
    pool.wait()
    if exception_list:
        delete_local_path(local_path, hdfs_path)
        sys.exit(2)
    print('Export finished, total rows exported: {}'.format(sum(exp_num_list)))
Complete scripts
The shell script (the DataX-based flow) comes first:
#!/bin/bash
sql=$1 # SQL that selects the data to export
s_tablename=$2 # source table
ds_name=$3 # target database
t_tablename=$4 # target table
temptable="h2o_"`date +%s%N | md5sum | head -c 16` # timestamp-based temp name
filename=${s_tablename}_${temptable} # file name
path="hdfs://prdhdfs/tmp/hdfs_to_rdb/$filename/"
local_path="/data02/dcadmin/scripts/dataos_scripts/data_exp"
flag=$5 # T = TRUNCATE
#hadoop fs -mkdir $path
# sql0 holds the SQL to execute
echo "$sql"
sql1=`echo "$sql"|cut -d ";" -f2`
sql0=`echo "$sql"|cut -d ";" -f1`
sql0="$sql0;insert overwrite directory '${path}' stored as ORC $sql1"
echo "$sql0"
# Submit the HQL to Hive
kinit -kt /data02/dcadmin/keytab_shengchan/dcadmin.keytab dcadmin@SC.COM
#beeline <<EOF
#!connect jdbc:hive2://devdataosambari:2181,devdataosnn1:2181,devdataosnn2:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2
#$sql0
beeline -u "jdbc:hive2://prdnn1.yxbdprd.sc.ctc.com:2181,prdnn2.yxbdprd.sc.ctc.com:2181,prdrm1.yxbdprd.sc.ctc.com:2181,prddb2.yxbdprd.sc.ctc.com:2181,prddb1.yxbdprd.sc.ctc.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2" -e "set tez.queue.name=offline;$sql0 distribute by rand()"
# Fetch the target data source address
#eval $(mysql -h 10.251.88.71 -udacp -pdacp123456 dacp_dev -e "select case when ds_type = 'mysql' then concat('jdbc:mysql://', ds_inst_loc, '/',json_unquote(json_extract(ds_conf,'$.physicalDbName')),'?characterEncoding=UTF-8')
#when ds_type = 'oracle' then concat('jdbc:oracle:thin:@', ds_inst_loc, '/',json_unquote(json_extract(ds_conf,'$.physicalDbName')))
#when ds_type = 'pg' then concat('jdbc:postgresql://', ds_inst_loc, '/',json_unquote(json_extract(ds_conf,'$.physicalDbName'))) end jdbc_url,
#ds_acct, ds_auth, case when ds_type = 'mysql' then 'mysqlwriter' when ds_type = 'oracle' then 'oraclewriter' when ds_type = 'pg' then 'postgresqlwriter' end ds_type
#from dacp_meta_datasource
#where ds_type in ('mysql','oracle','pg')
#and ds_name = '$ds_name'" | awk 'NR== 2 {printf("jdbc_url=%s; ds_acct=%s; ds_auth=%s; ds_type=%s",$1,$2,$3,$4)}')
re=$(PGPASSWORD=jxFgCKv9GJw2ohS3 psql -h 10.251.110.104 -p 18921 -U dacp -d dacp -t <<EOF
SELECT CASE WHEN ds_type = 'mysql' THEN CONCAT ('jdbc:mysql://' ,ds_inst_loc,'/',(ds_conf::json ->>'physicalDbName'),'?characterEncoding=UTF-8')
WHEN ds_type = 'oracle' THEN ds_conf::json ->> 'url'
WHEN ds_type = 'pg' THEN CONCAT ('jdbc:postgresql://',ds_inst_loc,'/',(ds_conf::json ->>'physicalDbName')) END jdbc_url
,ds_acct
,ds_auth
,CASE WHEN ds_type = 'mysql' THEN 'mysqlwriter'
WHEN ds_type = 'oracle' THEN 'oraclewriter'
WHEN ds_type = 'pg' THEN 'postgresqlwriter' END ds_type
FROM dacp_dev.dacp_meta_datasource
WHERE ds_type IN ('mysql', 'oracle', 'pg')
AND upper(trim(ds_name)) = upper(trim('$ds_name'))
EOF
)
eval $(echo $re| awk '{printf("jdbc_url=%s; ds_acct=%s; ds_auth=%s; ds_type=%s",$1,$3,$5,$7)}')
#eval $(java -jar /data01/etl/scripts/exec_aes.jar $ds_auth | awk ' {printf("pw=%s;",$1)}')
#pw=`java -jar $local_path/exec_aes.jar $ds_auth`
pw=`java -Dpwd=${ds_auth} -jar $local_path/AesCipher-1.0.jar`
preSQL="select * from $t_tablename where 1=-1"
if [ "$flag" = "T" ];then
preSQL="truncate table $t_tablename"
fi
echo "preSQL=$preSQL"
python $local_path/datax/bin/datax.py -p "-Dpath=$path -Dwriter=$ds_type -Drdb_user=$ds_acct -Drdb_pass=\"$pw\" -Drdb_jdbc=\"$jdbc_url\" -Drdb_table=$t_tablename -DpreSql=\"$preSQL\"" $local_path/hdfs_to_rdb.json
# ===== the Python script (multithreaded loader) begins here =====
# -*- coding:utf-8 -*-
import time
import datetime
import os
import threadpool
import commands
import calendar
import random
import pymssql
import pymysql
import cx_Oracle
import psycopg2
import socket
from pyhdfs import HdfsClient
from hashlib import md5
import sys
reload(sys)
sys.setdefaultencoding('utf8')
sys.path.append('/data02/dcadmin/scripts/common')
from connect_postgresql import postgresql_connect
from connect_hive import hive_connect
pg_conn_str='dataos_71_pg_dev'
# local disk directories; one is picked at random for the export files
local_path_list=['/data01','/data02','/data03','/data04','/data05']
def close_datadb_conn():
    if rdb_conn:
        rdb_conn.close()
def connect_database_to_select(conn, sql):
    # print('\r\nExecuting SQL: {}'.format(sql))
    cursor = conn.cursor()
    try:
        cursor.execute(sql)
        # cursor.execute(sql.decode('utf-8').encode('gbk'))
        result = cursor.fetchall()
        conn.commit()
        return result
    except Exception as e:
        print('SQL execution error: {}, SQL: {}'.format(str(e), sql))
        sys.exit(2)
    finally:
        cursor.close()
def connect_database_to_commit(exe_type, conn, sql, insert_list):
    # print('\r\nExecuting SQL: {}'.format(sql))
    cursor = conn.cursor()
    try:
        if exe_type.lower() in ('delete', 'insert'):
            cursor.execute(sql)
            conn.commit()
            print('Executing SQL: {}'.format(sql))
        elif exe_type.lower() == 'insertmany':
            cursor.executemany(sql, insert_list)
            conn.commit()
    except Exception as e:
        print('SQL execution error: {}, SQL: {}'.format(str(e), sql))
        print(sql)
        sys.exit(2)
    finally:
        cursor.close()
# Run a system command
def exe_system_cmd(cmd):
    status, output = commands.getstatusoutput(cmd)
    if status != 0:
        print('Command {} failed, please check!'.format(cmd))
        print('Failure log: {}'.format(output))
        sys.exit(2)
    return output
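# Note: the commands module is Python 2 only (this script targets Python 2, see reload(sys) above).
# A Python 3 port of exe_system_cmd might look like this sketch (an assumption, not part of the original script):
# import subprocess
# def exe_system_cmd(cmd):
#     proc = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True)
#     if proc.returncode != 0:
#         print('Command {} failed, please check!'.format(cmd))
#         print('Failure log: {}'.format(proc.stdout))
#         sys.exit(2)
#     return proc.stdout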
# Return an MD5 string
def get_md5_str():
    # timestamp
    ts = calendar.timegm(time.gmtime())
    md5_str = md5(str(ts).encode(encoding='utf-8')).hexdigest()
    return md5_str
# Validate the input arguments
def judge_input_parameters_num():
    if len(sys.argv) != 6:
        print('Invalid arguments, please check!')
        print(sys.argv)
        sys.exit(2)
    else:
        sql = sys.argv[1]  # SQL that selects the data to export
        s_tablename = sys.argv[2]  # source table
        ds_name = sys.argv[3]  # target database
        g_tablename = sys.argv[4]  # target table
        flag = sys.argv[5]  # A: append, T: truncate
        return sql, s_tablename, ds_name, g_tablename, flag
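# Example invocation (script name and argument values assumed):
#   python hdfs_to_rdb.py "select id,name from dw.app_user_d" dw.app_user_d mysql_dw_01 app_user_d T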
# Run the SQL and generate the HDFS files
def produce_exe_data(sql, s_tablename):
    global local_path_name
    # 1. create the local directory
    # pick a disk directory at random from '/data01','/data02','/data03','/data04','/data05'
    local_path_01 = local_path_list[random.randrange(len(local_path_list))] + '/dataos_exp'  # e.g. /data01/dataos_exp
    local_path_name = "h2o_{0}_{1}".format(s_tablename, get_md5_str()).lower()
    # local_path_name = 'h2o_app_hub_resource_value_level_d_e6963bad13299e939a3a4cc2b2a26a47'
    local_path = local_path_01 + '/' + local_path_name
    if os.path.exists(local_path):
        cmd = 'rm -rf {}'.format(local_path)
        exe_system_cmd(cmd)
    os.mkdir(local_path)
    # the HDFS directory
    hdfs_path = "hdfs://prdhdfs/tmp/hdfs_to_rdb/{}".format(local_path_name)
    # clean the SQL: strip surrounding whitespace, then surrounding semicolons
    sql = sql.strip().strip(';')
    sql_list = sql.split(';')
    # run the split statements in order
    hive_conn = hive_connect()  # connect to production Hive
    compress_sql = 'set hive.exec.compress.output=false'
    print('Executing SQL: {}'.format(compress_sql))
    connect_database_to_commit('insert', hive_conn, compress_sql, '')
    for i in range(len(sql_list)):
        sql_str = sql_list[i]
        if i == len(sql_list)-1:  # the last statement becomes the INSERT OVERWRITE DIRECTORY
            sql_str = '''INSERT OVERWRITE DIRECTORY '{0}'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\u0001' COLLECTION ITEMS TERMINATED BY '\\n' MAP KEYS TERMINATED BY ':'
{1} '''.format(hdfs_path, sql_str)
        print('Executing SQL: {}'.format(sql_str))
        connect_database_to_commit('insert', hive_conn, sql_str, '')
    # close the Hive connection
    if hive_conn:
        hive_conn.close()
    # get the files from hdfs_path into local_path
    cmd = ''' hdfs dfs -get {}/* {} '''.format(hdfs_path, local_path)
    exe_system_cmd(cmd)
    print('Files fetched, host: {}, temp data directory: {}'.format(socket.gethostname(), local_path))
    return local_path, hdfs_path
# Fetch the target-side connection info
def get_rdb_conn_msg(ds_name):
    global ds_type
    global ds_acct
    global ds_auth
    global host
    global port
    global database
    global jdbc_url
    sql = '''
    SELECT ds_name
          ,ds_type
          ,ds_acct
          ,ds_auth
          ,split_part(ds_inst_loc,':',1) as host
          ,case when split_part(ds_inst_loc,':',2)='' and ds_type='oracle' then '1521' else split_part(ds_inst_loc,':',2) end as port
          ,case when lower(ds_type)='oracle' then split_part(replace(replace(replace(ds_conf::json->>'url','jdbc:oracle:thin:@',''),':1521',''),'/',':'),':',2) else ds_conf::json->>'physicalDbName' end as database
          ,CASE WHEN ds_type = 'mysql' THEN CONCAT ('jdbc:mysql://' ,ds_inst_loc,'/',(ds_conf::json ->>'physicalDbName'),'?characterEncoding=UTF-8')
                WHEN ds_type = 'oracle' THEN ds_conf::json ->>'url'
                WHEN ds_type = 'pg' THEN CONCAT ('jdbc:postgresql://',ds_inst_loc,'/',(ds_conf::json ->>'physicalDbName')) END as jdbc_url
    FROM dacp_dev.dacp_meta_datasource
    WHERE ds_type IN ('mysql', 'oracle', 'pg')
    AND upper(trim(ds_name)) = upper(trim('{}')) '''.format(ds_name)
    pg_conn = postgresql_connect(pg_conn_str)
    results = connect_database_to_select(pg_conn, sql)
    # print(results)
    if not results:
        print('No database connection info found, please check. DS_NAME: {}'.format(ds_name))
        sys.exit(2)
    # close the database connection
    if pg_conn:
        pg_conn.close()
    # decrypt the password
    cmd = '''java -Dpwd='{0}' -jar /data02/dcadmin/scripts/common/AesCipher-1.0.jar'''.format(results[0][3])
    pw = exe_system_cmd(cmd).replace('\r', '').replace('\n', '')
    ds_type = results[0][1]
    ds_acct = results[0][2]
    ds_auth = pw
    host = results[0][4]
    port = int(results[0][5])
    database = results[0][6]
    jdbc_url = results[0][7]
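# Worked example of the Oracle branch in the SQL above (sample URL assumed):
#   ds_conf->>'url' = 'jdbc:oracle:thin:@10.0.0.1:1521/orcl'
#   strip 'jdbc:oracle:thin:@' and ':1521', replace '/' with ':'  ->  '10.0.0.1:orcl'
#   split_part(..., ':', 2)                                       ->  database = 'orcl'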
# Determine the target database type and return a connection
def get_rdb_database_conn():
    dbms_conn = None
    try:
        if ds_type.upper() == 'SQLSERVER':
            dbms_conn = pymssql.connect(host=host, user=ds_acct, password=ds_auth, port=port, database=database, charset='utf8')
        elif ds_type.upper() == 'MYSQL':
            dbms_conn = pymysql.connect(host=host, user=ds_acct, passwd=ds_auth, port=port, database=database, charset='utf8', local_infile=True)
        elif ds_type.upper() == 'ORACLE':
            listener = '{0}:{1}/{2}'.format(host, port, database)
            print('listener: {}'.format(listener))
            dbms_conn = cx_Oracle.connect(ds_acct, ds_auth, listener, encoding='utf-8')
        elif ds_type.upper() in ('POSTGRESQL', 'PG'):
            dbms_conn = psycopg2.connect(host=host, user=ds_acct, password=ds_auth, port=port, database=database, client_encoding='utf8')
        else:
            print('Unknown source database type {}, please check!'.format(ds_type.upper()))
            sys.exit(2)
    except Exception as e:
        print('{0}, {1} database connection failed, please check!'.format(ds_type, ds_name))
        print('Error log: {}'.format(e))
        print(host)
        print(ds_acct)
        print(ds_auth)
        print(port)
        print(database)
        sys.exit(2)
    return dbms_conn
def thread_exe_exchange_data(g_tablename, flag, local_path, hdfs_path):
    global rdb_conn
    rdb_conn = get_rdb_database_conn()
    # run the pre-SQL
    if flag.upper() == 'T':
        presql = 'truncate table {}'.format(g_tablename)
        print('Executing SQL: {}'.format(presql))
        connect_database_to_commit('insert', rdb_conn, presql, '')
    # fetch the Oracle table structure
    if ds_type.lower() in ('oracle'):
        global oracle_table_field  # oracle table structure
        oracle_table_field = get_oracle_table_fields()
        # create the directory for ctl, bad and log files
        global ora_dir
        localtime = str(time.strftime("%Y%m%d", time.localtime()))
        ora_dir = "/data03/datafile/sqlldrdata/{0}/".format(localtime)
        if not os.path.exists(ora_dir):
            os.mkdir(ora_dir)
    # file list
    file_lists = os.listdir(local_path)
    # multithreaded loading
    global exp_num_list  # row counts per file
    global log_list  # per-thread log messages
    global exception_list  # per-thread exceptions
    exp_num_list = []
    log_list = []
    exception_list = []
    thread_list = []  # thread tasks
    for file_name in file_lists:
        thread_list.append(local_path + '/' + file_name)
    # create the thread pool
    pool = threadpool.ThreadPool(5)
    # build the task list
    requests = threadpool.makeRequests(exchange_data, thread_list)
    [pool.putRequest(req) for req in requests]
    pool.wait()
    # handle exceptions
    if exception_list:
        # a load failed; delete the files
        delete_local_path(local_path, hdfs_path)
        print('Export failed, exception logs:')
        for except_msg in exception_list:
            print(except_msg)
        sys.exit(2)
    # print the thread logs
    # log_list.sort()
    # for log in log_list:
    #     print(log)
    # print the result
    print('Export finished, total rows exported: {}'.format(sum(exp_num_list)))
# Fetch the Oracle table structure
def get_oracle_table_fields():
    sql = '''
    SELECT COLUMN_NAME || SUFFIX AS AA
    FROM (SELECT A.COLUMN_NAME
                ,A.COLUMN_ID
                ,CASE WHEN UPPER(A.DATA_TYPE) LIKE '%DATE%' THEN ' DATE "yyyy-mm-dd hh24:mi:ss"'
                      WHEN UPPER(A.DATA_TYPE) LIKE '%TIMESTAMP%' THEN ' DATE "yyyy-mm-dd hh24:mi:ss.ff"'
                      WHEN UPPER(A.DATA_TYPE) LIKE '%VARCHAR%' THEN ' CHAR(3000)'
                      ELSE '' END AS SUFFIX
          FROM ALL_TAB_COLUMNS A
          WHERE UPPER(A.OWNER||'.'||A.TABLE_NAME) = UPPER(TRIM('{0}'))
          ORDER BY A.COLUMN_ID) '''
    if '.' in g_tablename:
        sql = sql.format(g_tablename)
    else:
        sql = sql.format(database + '.' + g_tablename)
    oracle_table_fields = connect_database_to_select(rdb_conn, sql)
    if not oracle_table_fields:
        print('Table structure not found, table: {}'.format(g_tablename))
        sys.exit(2)
    oracle_table_field = ",\n".join([str(row[0]) for row in oracle_table_fields])
    return oracle_table_field
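# For a hypothetical table T(ID NUMBER, NAME VARCHAR2(50), CREATED DATE), the generated
# sqlldr field list would be:
#   ID,
#   NAME CHAR(3000),
#   CREATED DATE "yyyy-mm-dd hh24:mi:ss"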
# Run a single load task
def exchange_data(file_path):
    try:
        output = ''
        # run the load
        if ds_type.lower() in ('pg', 'postgresql', 'telpg', 'antdb'):
            cmd = '''psql "port={0} host={1} user={2} dbname={3} password={4} " -c "\copy {5} from '{6}' DELIMITER AS E'\u0001' " '''
            cmd = cmd.format(port, host, ds_acct, database, ds_auth, g_tablename, file_path)
            status, output = commands.getstatusoutput(cmd)
            if status != 0:
                exception_list.append('Command {} failed, please check! Failure log: {}'.format(cmd, output))
        elif ds_type.lower() in ('mysql', 'teldb'):
            mysql_conn = pymysql.connect(host=host, user=ds_acct, passwd=ds_auth, port=port, database=database, charset='utf8', local_infile=True)
            mysql_cursor = mysql_conn.cursor()
            sql = 'SET NAMES UTF8'
            mysql_cursor.execute(sql)
            sql = '''load data local infile '{}' into table {} fields terminated by X'01' lines terminated by '\\n' '''.format(file_path, g_tablename)
            # print(sql)
            output = mysql_cursor.execute(sql)
            mysql_conn.commit()
            mysql_conn.close()
            # cmd='''mysql -h {} -P {} -u {} -p{} -D {} -e "SET NAMES UTF8;load data local infile '{}' into table {} fields terminated by X'01' lines terminated by '\\n'" '''
            # cmd=cmd.format(host,port,ds_acct,ds_auth,database,file_path,g_tablename)
        elif ds_type.lower() in ('oracle'):
            tns = '''\'{}/"{}"\'@{}:1521/{}'''.format(ds_acct, ds_auth, host, database)
            ora_file_name = file_path.replace(local_path + '/', '')
            ora_file_path = ora_dir + '/' + local_path_name + '_' + ora_file_name
            control_file = ora_file_path + ".ctl"
            log_file = ora_file_path + ".log"
            bad_file = ora_file_path + ".bad"
            dis_file = ora_file_path + ".dis"
            content = '''
            UNRECOVERABLE LOAD DATA CHARACTERSET AL32UTF8
            APPEND INTO TABLE {0} FIELDS TERMINATED BY x'01'
            TRAILING NULLCOLS ({1}) '''.format(g_tablename, oracle_table_field)
            # delete the control file if it already exists
            if os.path.exists(control_file):
                cmd = 'rm -rf {}'.format(control_file)
                exe_system_cmd(cmd)
            # then create the control file
            with open(control_file, "w") as file:
                file.write(content)
            cmd = '''export ORACLE_HOME=/data03/apps/db_1;export LD_LIBRARY_PATH=$ORACLE_HOME/lib:$LD_LIBRARY_PATH;cat {0} | /data03/apps/db_1/bin/sqlldr userid={1} control={2} data=\\"-\\" log={3} bad={4} discard={5} errors=0 direct=true parallel=true multithreading=true columnarrayrows=100000 STREAMSIZE=20971520 readsize=20971520 bindsize=20971520 date_cache=0 '''
            cmd = cmd.format(file_path, tns, control_file, log_file, bad_file, dis_file)
            status, output = commands.getstatusoutput(cmd)
            if status != 0:
                exception_list.append('Command {} failed, please check! Failure log: {}'.format(cmd, output))
        else:
            exception_list.append('Target database type {} is not supported yet!'.format(ds_type.lower()))
        # count the loaded rows
        if ds_type.lower() in ('pg', 'postgresql', 'telpg', 'antdb'):
            file_row_num = int(output.split('COPY ')[1].strip())
            exp_num_list.append(file_row_num)
        elif ds_type.lower() in ('oracle'):
            try:
                output = output.decode('gbk')
            except:
                output = output
            # parses sqlldr's Chinese-locale log output ("邏輯記錄計數" = logical record count); do not translate the marker
            file_row_num = int(output.split('邏輯記錄計數 ')[1].replace('。', '').strip())
            exp_num_list.append(file_row_num)
        elif ds_type.lower() in ('mysql', 'teldb'):
            exp_num_list.append(output)
        # append the log
        log_list.append(output)
    except Exception as e:
        exception_list.append(e)
def delete_local_path(local_path, hdfs_path):
    cmd = 'rm -rf {}'.format(local_path)
    exe_system_cmd(cmd)
    print('Local directory deleted.')
    cmd = 'hdfs dfs -rm -r {}'.format(hdfs_path)
    exe_system_cmd(cmd)
    print('HDFS directory deleted.')
if __name__ == '__main__':
    starttime = datetime.datetime.now()
    print('Start time: {0}'.format(starttime.strftime('%Y-%m-%d %H:%M:%S')))
    # 1. validate the input arguments
    sql, s_tablename, ds_name, g_tablename, flag = judge_input_parameters_num()
    # 2. run the SQL, generate the files and return the local directory
    local_path, hdfs_path = produce_exe_data(sql, s_tablename)
    hdfs_time = datetime.datetime.now()
    # print('Current time: {}'.format(hdfs_time))
    print("Generating the HDFS files took {0} seconds".format((hdfs_time - starttime).seconds))
    # 3. fetch the target connection info (host, port, etc.)
    get_rdb_conn_msg(ds_name)
    # 4. run the load tasks
    thread_exe_exchange_data(g_tablename, flag, local_path, hdfs_path)
    # 5. delete the local directory
    delete_local_path(local_path, hdfs_path)
    # 6. close the database connection
    close_datadb_conn()
    endtime = datetime.datetime.now()
    print('End time: {0}'.format(endtime.strftime('%Y-%m-%d %H:%M:%S')))
    print('Data load took {0} seconds'.format((endtime - hdfs_time).seconds))
    print("Total time: {0} seconds".format((endtime - starttime).seconds))
Summary
The scripts implement an end-to-end migration from HDFS to a relational database while keeping the data complete and consistent: the data is first exported through Hive, then loaded into the target database with a thread pool and the DataX tool. Staging files on local disk and loading them in parallel keeps the process efficient, which suits large datasets and data warehouse migrations.
Be sure to adjust the scripts' parameters and configuration to fit your own environment and data architecture.
That concludes this walkthrough of exporting data from Hadoop HDFS to a relational database with Python. For more on exporting HDFS data to a database with Python, see the other related articles on 腳本之家 (Script Home)!