Python腳本實(shí)現(xiàn)datax全量同步mysql到hive
前言
在我們構(gòu)建離線數(shù)倉時(shí)或者遷移數(shù)據(jù)時(shí),通常選用sqoop和datax等工具進(jìn)行操作,sqoop和datax各有優(yōu)點(diǎn),datax優(yōu)點(diǎn)也很明顯,基于內(nèi)存,所以速度上很快,那么在進(jìn)行全量同步時(shí)編寫json文件是一項(xiàng)很繁瑣的事,是否可以編寫腳本來把繁瑣事來簡單化,接下來我將分享這樣一個(gè)mysql全量同步到hive自動生成json文件的python腳本。
一、展示腳本
# coding=utf-8 import json import getopt import os import sys import pymysql # MySQL 相關(guān)配置,需根據(jù)實(shí)際情況作出修改 mysql_host = "XXXXXX" mysql_port = "XXXX" mysql_user = "XXX" mysql_passwd = "XXXXXX" # HDFS NameNode 相關(guān)配置,需根據(jù)實(shí)際情況作出修改 hdfs_nn_host = "XXXXXX" hdfs_nn_port = "XXXX" # 生成配置文件的目標(biāo)路徑,可根據(jù)實(shí)際情況作出修改 output_path = "/XXX/XXX/XXX" def get_connection(): return pymysql.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, password=mysql_passwd) def get_mysql_meta(database, table): connection = get_connection() cursor = connection.cursor() sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION" cursor.execute(sql, [database, table]) fetchall = cursor.fetchall() cursor.close() connection.close() return fetchall def get_mysql_columns(database, table): return list(map(lambda x: x[0], get_mysql_meta(database, table))) def get_hive_columns(database, table): def type_mapping(mysql_type): mappings = { "bigint": "bigint", "int": "bigint", "smallint": "bigint", "tinyint": "bigint", "decimal": "string", "double": "double", "float": "float", "binary": "string", "char": "string", "varchar": "string", "datetime": "string", "time": "string", "timestamp": "string", "date": "string", "text": "string" } return mappings[mysql_type] meta = get_mysql_meta(database, table) return list(map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, meta)) def generate_json(source_database, source_table): job = { "job": { "setting": { "speed": { "channel": 3 }, "errorLimit": { "record": 0, "percentage": 0.02 } }, "content": [{ "reader": { "name": "mysqlreader", "parameter": { "username": mysql_user, "password": mysql_passwd, "column": get_mysql_columns(source_database, source_table), "splitPk": "", "connection": [{ "table": [source_table], "jdbcUrl": ["jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + source_database] }] } }, "writer": { "name": "hdfswriter", "parameter": { "defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port, "fileType": "text", "path": "${targetdir}", "fileName": source_table, "column": get_hive_columns(source_database, source_table), "writeMode": "append", "fieldDelimiter": "\t", "compress": "gzip" } } }] } } if not os.path.exists(output_path): os.makedirs(output_path) with open(os.path.join(output_path, ".".join([source_database, source_table, "json"])), "w") as f: json.dump(job, f) def main(args): source_database = "" source_table = "" options, arguments = getopt.getopt(args, '-d:-t:', ['sourcedb=', 'sourcetbl=']) for opt_name, opt_value in options: if opt_name in ('-d', '--sourcedb'): source_database = opt_value if opt_name in ('-t', '--sourcetbl'): source_table = opt_value generate_json(source_database, source_table) if __name__ == '__main__': main(sys.argv[1:])
二、使用準(zhǔn)備
1、安裝python環(huán)境
這里我安裝的是python3環(huán)境
sudo yum install -y python3
2、安裝EPEL
EPEL(Extra Packages for Enterprise Linux)是一個(gè)由 Fedora Special Interest Group 維護(hù)的軟件倉庫,提供了大量在官方 RHEL 或 CentOS 軟件倉庫中沒有的軟件包。當(dāng)你在 CentOS 或 RHEL 系統(tǒng)上需要安裝一些不在官方軟件倉庫中的軟件時(shí),通常會先安裝epel - release
sudo yum install -y epel-release
3、安裝腳本執(zhí)行需要的第三方模塊
pip3 install pymysql pip3 install cryptography
這里可能由于斑紋問題cryptography安裝不上去更新一下pip和setuptools
pip3 install --upgrade pip pip3 install --upgrade setuptools
重新安裝cryptography
pip3 install cryptography
三、腳本使用方法
1、配置腳本
首先根據(jù)自己服務(wù)器修改腳本相關(guān)配置
2、創(chuàng)建.py文件
vim /xxx/xxx/xxx/gen_import_config.py
3、執(zhí)行腳本
python3 /腳本路徑/gen_import_config.py -d 數(shù)據(jù)庫名 -t 表名
4、測試生成json文件是否可用
datax.py -p"-Dtargetdir=/表在hdfs存放路徑" /生成的json文件路徑
執(zhí)行時(shí)首先要確保targetdir目標(biāo)地址在hdfs上存在,如果沒有需要?jiǎng)?chuàng)建后再次執(zhí)行
到此這篇關(guān)于Python腳本實(shí)現(xiàn)datax全量同步mysql到hive的文章就介紹到這了,更多相關(guān)Python datax全量同步mysql到hive內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解sklearn?Preprocessing?數(shù)據(jù)預(yù)處理功能
這篇文章主要介紹了sklearn?Preprocessing?數(shù)據(jù)預(yù)處理功能,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-08-08使用 Python 創(chuàng)建一個(gè)基于規(guī)則的聊天機(jī)器人
這篇文章主要介紹了使用 Python 創(chuàng)建一個(gè)基于規(guī)則的聊天機(jī)器人,使用 Python 創(chuàng)建一個(gè)簡單的基于規(guī)則的聊天機(jī)器人 聊天機(jī)器人本身是一種機(jī)器或軟件,它通過文本或句子模仿人類交互。 簡而言之,可以使用類似于與人類對話的軟件進(jìn)行聊天。2021-10-10Python PIL實(shí)現(xiàn)GIF壓縮工具
本文將結(jié)合wxPython的GUI框架和PIL(Python Imaging Library)的圖像處理能力編寫一個(gè)GIF壓縮工具,并提供了兩種壓縮方式,感興趣的小伙伴可以了解下2024-10-10Python實(shí)現(xiàn)剪刀石頭布小游戲(與電腦對戰(zhàn))
這篇文章給大家分享Python基礎(chǔ)實(shí)現(xiàn)與電腦對戰(zhàn)的剪刀石頭布小游戲,練習(xí)if while輸入和輸出,代碼簡單易懂,非常不錯(cuò),具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2019-12-12pycharm配置anaconda環(huán)境時(shí)找不到python.exe的兩種解決辦法
如果你在Anaconda中創(chuàng)建了虛擬環(huán)境,但是無法找到python.exe,可能是因?yàn)樘摂M環(huán)境的Python路徑?jīng)]有添加到系統(tǒng)環(huán)境變量中,這篇文章主要給大家介紹了關(guān)于pycharm配置anaconda環(huán)境時(shí)找不到python.exe的兩種解決辦法,需要的朋友可以參考下2024-07-07Pandas數(shù)據(jù)分析之pandas文本處理
這篇文章主要介紹了Pandas數(shù)據(jù)分析之pandas文本處理,pandas對文本數(shù)據(jù)也有很多便捷處理方法,可以不用寫循環(huán),向量化操作運(yùn)算速度快,還可以進(jìn)行高級的正則表達(dá)式,各種復(fù)雜的邏輯篩選和匹配提取信息2022-08-08Python3.x+迅雷x 自動下載高分電影的實(shí)現(xiàn)方法
這篇文章主要介紹了Python3.x+迅雷x 自動下載高分電影的實(shí)現(xiàn)方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-01-01