亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Python腳本實(shí)現(xiàn)datax全量同步mysql到hive

 更新時(shí)間:2024年10月23日 10:28:48   作者:大數(shù)據(jù)編程之光  
這篇文章主要和大家分享一下mysql全量同步到hive自動生成json文件的python腳本,文中的示例代碼講解詳細(xì),有需要的小伙伴可以參加一下

前言

在我們構(gòu)建離線數(shù)倉時(shí)或者遷移數(shù)據(jù)時(shí),通常選用sqoop和datax等工具進(jìn)行操作,sqoop和datax各有優(yōu)點(diǎn),datax優(yōu)點(diǎn)也很明顯,基于內(nèi)存,所以速度上很快,那么在進(jìn)行全量同步時(shí)編寫json文件是一項(xiàng)很繁瑣的事,是否可以編寫腳本來把繁瑣事來簡單化,接下來我將分享這樣一個(gè)mysql全量同步到hive自動生成json文件的python腳本。

一、展示腳本

# coding=utf-8
import json
import getopt
import os
import sys
import pymysql

# MySQL 相關(guān)配置,需根據(jù)實(shí)際情況作出修改
mysql_host = "XXXXXX"
mysql_port = "XXXX"
mysql_user = "XXX"
mysql_passwd = "XXXXXX"

# HDFS NameNode 相關(guān)配置,需根據(jù)實(shí)際情況作出修改
hdfs_nn_host = "XXXXXX"
hdfs_nn_port = "XXXX"

# 生成配置文件的目標(biāo)路徑,可根據(jù)實(shí)際情況作出修改
output_path = "/XXX/XXX/XXX"


def get_connection():
    return pymysql.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, password=mysql_passwd)


def get_mysql_meta(database, table):
    connection = get_connection()
    cursor = connection.cursor()
    sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
    cursor.execute(sql, [database, table])
    fetchall = cursor.fetchall()
    cursor.close()
    connection.close()
    return fetchall


def get_mysql_columns(database, table):
    return list(map(lambda x: x[0], get_mysql_meta(database, table)))


def get_hive_columns(database, table):
    def type_mapping(mysql_type):
        mappings = {
            "bigint": "bigint",
            "int": "bigint",
            "smallint": "bigint",
            "tinyint": "bigint",
            "decimal": "string",
            "double": "double",
            "float": "float",
            "binary": "string",
            "char": "string",
            "varchar": "string",
            "datetime": "string",
            "time": "string",
            "timestamp": "string",
            "date": "string",
            "text": "string"
        }
        return mappings[mysql_type]

    meta = get_mysql_meta(database, table)
    return list(map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, meta))


def generate_json(source_database, source_table):
    job = {
        "job": {
            "setting": {
                "speed": {
                    "channel": 3
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [{
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": mysql_user,
                        "password": mysql_passwd,
                        "column": get_mysql_columns(source_database, source_table),
                        "splitPk": "",
                        "connection": [{
                            "table": [source_table],
                            "jdbcUrl": ["jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + source_database]
                        }]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,
                        "fileType": "text",
                        "path": "${targetdir}",
                        "fileName": source_table,
                        "column": get_hive_columns(source_database, source_table),
                        "writeMode": "append",
                        "fieldDelimiter": "\t",
                        "compress": "gzip"
                    }
                }
            }]
        }
    }
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    with open(os.path.join(output_path, ".".join([source_database, source_table, "json"])), "w") as f:
        json.dump(job, f)


def main(args):
    source_database = ""
    source_table = ""

    options, arguments = getopt.getopt(args, '-d:-t:', ['sourcedb=', 'sourcetbl='])
    for opt_name, opt_value in options:
        if opt_name in ('-d', '--sourcedb'):
            source_database = opt_value
        if opt_name in ('-t', '--sourcetbl'):
            source_table = opt_value

    generate_json(source_database, source_table)


if __name__ == '__main__':
    main(sys.argv[1:])

二、使用準(zhǔn)備

1、安裝python環(huán)境

這里我安裝的是python3環(huán)境

sudo yum install -y python3

2、安裝EPEL

EPEL(Extra Packages for Enterprise Linux)是一個(gè)由 Fedora Special Interest Group 維護(hù)的軟件倉庫,提供了大量在官方 RHEL 或 CentOS 軟件倉庫中沒有的軟件包。當(dāng)你在 CentOS 或 RHEL 系統(tǒng)上需要安裝一些不在官方軟件倉庫中的軟件時(shí),通常會先安裝epel - release

sudo yum install -y epel-release

3、安裝腳本執(zhí)行需要的第三方模塊

pip3 install pymysql
pip3 install cryptography

這里可能由于斑紋問題cryptography安裝不上去更新一下pip和setuptools

pip3 install --upgrade pip
pip3 install --upgrade setuptools

重新安裝cryptography

pip3 install cryptography

三、腳本使用方法

1、配置腳本

首先根據(jù)自己服務(wù)器修改腳本相關(guān)配置

2、創(chuàng)建.py文件

vim /xxx/xxx/xxx/gen_import_config.py

3、執(zhí)行腳本

python3 /腳本路徑/gen_import_config.py -d 數(shù)據(jù)庫名 -t 表名

4、測試生成json文件是否可用

datax.py -p"-Dtargetdir=/表在hdfs存放路徑" /生成的json文件路徑

執(zhí)行時(shí)首先要確保targetdir目標(biāo)地址在hdfs上存在,如果沒有需要?jiǎng)?chuàng)建后再次執(zhí)行

到此這篇關(guān)于Python腳本實(shí)現(xiàn)datax全量同步mysql到hive的文章就介紹到這了,更多相關(guān)Python datax全量同步mysql到hive內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • python驗(yàn)證碼圖片處理(二值化)

    python驗(yàn)證碼圖片處理(二值化)

    這篇文章主要介紹了python驗(yàn)證碼圖片處理(二值化),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-11-11
  • 詳解sklearn?Preprocessing?數(shù)據(jù)預(yù)處理功能

    詳解sklearn?Preprocessing?數(shù)據(jù)預(yù)處理功能

    這篇文章主要介紹了sklearn?Preprocessing?數(shù)據(jù)預(yù)處理功能,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2023-08-08
  • Python利用prettytable庫輸出好看的表格

    Python利用prettytable庫輸出好看的表格

    prettytable庫就是這么一個(gè)工具,prettytable可以打印出美觀的表格,并且對中文支持相當(dāng)好。本文將介紹如何通過prettytable輸出好看的表格,需要的可以參考一下
    2022-01-01
  • 使用 Python 創(chuàng)建一個(gè)基于規(guī)則的聊天機(jī)器人

    使用 Python 創(chuàng)建一個(gè)基于規(guī)則的聊天機(jī)器人

    這篇文章主要介紹了使用 Python 創(chuàng)建一個(gè)基于規(guī)則的聊天機(jī)器人,使用 Python 創(chuàng)建一個(gè)簡單的基于規(guī)則的聊天機(jī)器人 聊天機(jī)器人本身是一種機(jī)器或軟件,它通過文本或句子模仿人類交互。 簡而言之,可以使用類似于與人類對話的軟件進(jìn)行聊天。
    2021-10-10
  • Python PIL實(shí)現(xiàn)GIF壓縮工具

    Python PIL實(shí)現(xiàn)GIF壓縮工具

    本文將結(jié)合wxPython的GUI框架和PIL(Python Imaging Library)的圖像處理能力編寫一個(gè)GIF壓縮工具,并提供了兩種壓縮方式,感興趣的小伙伴可以了解下
    2024-10-10
  • maven沖突問題解決

    maven沖突問題解決

    這篇文章主要介紹了maven沖突問題解決,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-03-03
  • Python實(shí)現(xiàn)剪刀石頭布小游戲(與電腦對戰(zhàn))

    Python實(shí)現(xiàn)剪刀石頭布小游戲(與電腦對戰(zhàn))

    這篇文章給大家分享Python基礎(chǔ)實(shí)現(xiàn)與電腦對戰(zhàn)的剪刀石頭布小游戲,練習(xí)if while輸入和輸出,代碼簡單易懂,非常不錯(cuò),具有一定的參考借鑒價(jià)值,需要的朋友參考下吧
    2019-12-12
  • pycharm配置anaconda環(huán)境時(shí)找不到python.exe的兩種解決辦法

    pycharm配置anaconda環(huán)境時(shí)找不到python.exe的兩種解決辦法

    如果你在Anaconda中創(chuàng)建了虛擬環(huán)境,但是無法找到python.exe,可能是因?yàn)樘摂M環(huán)境的Python路徑?jīng)]有添加到系統(tǒng)環(huán)境變量中,這篇文章主要給大家介紹了關(guān)于pycharm配置anaconda環(huán)境時(shí)找不到python.exe的兩種解決辦法,需要的朋友可以參考下
    2024-07-07
  • Pandas數(shù)據(jù)分析之pandas文本處理

    Pandas數(shù)據(jù)分析之pandas文本處理

    這篇文章主要介紹了Pandas數(shù)據(jù)分析之pandas文本處理,pandas對文本數(shù)據(jù)也有很多便捷處理方法,可以不用寫循環(huán),向量化操作運(yùn)算速度快,還可以進(jìn)行高級的正則表達(dá)式,各種復(fù)雜的邏輯篩選和匹配提取信息
    2022-08-08
  • Python3.x+迅雷x 自動下載高分電影的實(shí)現(xiàn)方法

    Python3.x+迅雷x 自動下載高分電影的實(shí)現(xiàn)方法

    這篇文章主要介紹了Python3.x+迅雷x 自動下載高分電影的實(shí)現(xiàn)方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-01-01

最新評論