亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Python讀取Hive數(shù)據(jù)庫實(shí)現(xiàn)代碼詳解

 更新時(shí)間:2023年03月01日 08:33:49   作者:是豬哥不是諸葛  
這篇文章主要介紹了Python讀取Hive數(shù)據(jù)庫實(shí)現(xiàn)代碼,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下

背景:

在這篇文章之前,我讀取數(shù)據(jù)庫的數(shù)據(jù)沒有形成規(guī)范,并且代碼擴(kuò)展性不好,使用率不高,而且比較混亂。數(shù)據(jù)庫信息的替換也比較混亂。壞習(xí)慣包括:連接數(shù)據(jù)庫之后就開始讀數(shù),讀完就結(jié)束,數(shù)據(jù)的存放也沒有規(guī)范,而且容易重復(fù)讀取。

現(xiàn)在將代碼分為幾層,一層是底層,就是單獨(dú)連接數(shù)據(jù)庫,在這基礎(chǔ)上封裝第二個(gè)類別,加上了線程鎖和時(shí)間表,用于確保讀數(shù)的穩(wěn)定和超時(shí)錯(cuò)誤提醒。第三層才是真正的業(yè)務(wù),第三層的類里面封裝了很多讀取不同數(shù)據(jù)表的方法,每一個(gè)方法就是讀一個(gè)表,然后將數(shù)據(jù)緩存起來,并且設(shè)置好更新數(shù)據(jù)緩存的時(shí)間(例如24小時(shí)),和維護(hù)多線程讀數(shù)。

第四層也就是簡單的調(diào)用第三層即可,然后所有的數(shù)據(jù)都可以讀取然后緩存到我們在配置項(xiàng)中指定的文件夾目錄了

實(shí)際業(yè)務(wù)讀取hive數(shù)據(jù)庫的代碼

import logging
import pandas as pd
from impala.dbapi import connect
import sqlalchemy
from sqlalchemy.orm import sessionmaker
import os
import time
import os
import datetime
from dateutil.relativedelta import relativedelta
from typing import Dict, List
import logging
import threading
import pandas as pd
import pickle
class HiveHelper(object):
    def __init__(
        self,
        host='10.2.32.22',
        port=21051,
        database='ur_ai_dw',
        auth_mechanism='LDAP',
        user='urbi',
        password='Ur#730xd',
        logger:logging.Logger=None
        ):
        self.host = host
        self.port = port
        self.database = database
        self.auth_mechanism = auth_mechanism
        self.user = user
        self.password = password
        self.logger = logger
        self.impala_conn = None
        self.conn = None
        self.cursor = None
        self.engine = None
        self.session = None
    def create_table_code(self, file_name):
        '''創(chuàng)建表類代碼'''
        os.system(f'sqlacodegen {self.connection_str} > {file_name}')
        return self.conn
    def get_conn(self):
        '''創(chuàng)建連接或獲取連接'''
        if self.conn is None:
            engine = self.get_engine()
            self.conn = engine.connect()
        return self.conn
    def get_impala_conn(self):
        '''創(chuàng)建連接或獲取連接'''
        if self.impala_conn is None:
            self.impala_conn = connect(
                host=self.host,
                port=self.port,
                database=self.database,
                auth_mechanism=self.auth_mechanism,
                user=self.user,
                password=self.password
                )
        return self.impala_conn
    def get_engine(self):
        '''創(chuàng)建連接或獲取連接'''
        if self.engine is None:
            self.engine = sqlalchemy.create_engine('impala://', creator=self.get_impala_conn)
        return self.engine
    def get_cursor(self):
        '''創(chuàng)建連接或獲取連接'''
        if self.cursor is None:
            self.cursor = self.conn.cursor()
        return self.cursor
    def get_session(self) -> sessionmaker:
        '''創(chuàng)建連接或獲取連接'''
        if self.session is None:
            engine = self.get_engine()
            Session = sessionmaker(bind=engine)
            self.session = Session()
        return self.session
    def close_conn(self):
        '''關(guān)閉連接'''
        if self.conn is not None:
            self.conn.close()
            self.conn = None
        self.dispose_engine()
        self.close_impala_conn()
    def close_impala_conn(self):
        '''關(guān)閉impala連接'''
        if self.impala_conn is not None:
            self.impala_conn.close()
            self.impala_conn = None
    def close_session(self):
        '''關(guān)閉連接'''
        if self.session is not None:
            self.session.close()
            self.session = None
        self.dispose_engine()
    def dispose_engine(self):
        '''釋放engine'''
        if self.engine is not None:
            # self.engine.dispose(close=False)
            self.engine.dispose()
            self.engine = None
    def close_cursor(self):
        '''關(guān)閉cursor'''
        if self.cursor is not None:
            self.cursor.close()
            self.cursor = None
    def get_data(self, sql, auto_close=True) -> pd.DataFrame:
        '''查詢數(shù)據(jù)'''
        conn = self.get_conn()
        data = None
        try:
            # 異常重試3次
            for i in range(3):
                try:
                    data = pd.read_sql(sql, conn)
                    break
                except Exception as ex:
                    if i == 2:
                        raise ex # 往外拋出異常
                    time.sleep(60) # 一分鐘后重試
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外拋出異常
        finally:
            if auto_close:
                self.close_conn()
        return data
pass
class VarsHelper():
    def __init__(self, save_dir, auto_save=True):
        self.save_dir = save_dir
        self.auto_save = auto_save
        self.values = {}
        if not os.path.exists(os.path.dirname(self.save_dir)):
            os.makedirs(os.path.dirname(self.save_dir))
        if os.path.exists(self.save_dir):
            with open(self.save_dir, 'rb') as f:
                self.values = pickle.load(f)
                f.close()
    def set_value(self, key, value):
        self.values[key] = value
        if self.auto_save:
            self.save_file()
    def get_value(self, key):
        return self.values[key]
    def has_key(self, key):
        return key in self.values.keys()
    def save_file(self):
        with open(self.save_dir, 'wb') as f:
            pickle.dump(self.values, f)
            f.close()
pass
class GlobalShareArgs():
    args = {
        "debug": False
    }
    def get_args():
        return GlobalShareArgs.args
    def set_args(args):
        GlobalShareArgs.args = args
    def set_args_value(key, value):
        GlobalShareArgs.args[key] = value
    def get_args_value(key, default_value=None):
        return GlobalShareArgs.args.get(key, default_value)
    def contain_key(key):
        return key in GlobalShareArgs.args.keys()
    def update(args):
        GlobalShareArgs.args.update(args)
pass
class ShareArgs():
    args = {
        "labels_dir":"./hjx/shop_group/month_w_amt/data/labels", # 標(biāo)簽?zāi)夸?
        "labels_output_dir":"./hjx/shop_group/month_w_amt/data/labels_output", # 聚類導(dǎo)出標(biāo)簽?zāi)夸?
        "common_datas_dir":"./hjx/data", # 共用數(shù)據(jù)目錄。ur_bi_dw的公共
        "only_predict": False, # 只識別,不訓(xùn)練
        "delete_model": True, # 先刪除模型,僅在訓(xùn)練時(shí)使用
        "export_excel": False, # 導(dǎo)出excel
        "classes": 12, # 聚類數(shù)
        "batch_size": 16,
        "hidden_size": 32,
        "max_nrof_epochs": 100,
        "learning_rate": 0.0005,
        "loss_type": "categorical_crossentropy",
        "avg_model_num": 10,
        "steps_per_epoch": 4.0, # 4.0
        "lr_callback_patience": 4, 
        "lr_callback_cooldown": 1,
        "early_stopping_callback_patience": 6,
        "get_data": True,
    }
    def get_args():
        return ShareArgs.args
    def set_args(args):
        ShareArgs.args = args
    def set_args_value(key, value):
        ShareArgs.args[key] = value
    def get_args_value(key, default_value=None):
        return ShareArgs.args.get(key, default_value)
    def contain_key(key):
        return key in ShareArgs.args.keys()
    def update(args):
        ShareArgs.args.update(args)
pass
class UrBiGetDatasBase():
    # 線程鎖列表,同保存路徑共用鎖
    lock_dict:Dict[str, threading.Lock] = {}
    # 時(shí)間列表,用于判斷是否超時(shí)
    time_dict:Dict[str, datetime.datetime] = {}
    # 用于記錄是否需要更新超時(shí)時(shí)間
    get_data_timeout_dict:Dict[str, bool] = {}
    def __init__(
        self,
        host='10.2.32.22',
        port=21051,
        database='ur_ai_dw',
        auth_mechanism='LDAP',
        user='urbi',
        password='Ur#730xd',
        save_dir=None,
        logger:logging.Logger=None,
        ):
        self.save_dir = save_dir
        self.logger = logger
        self.db_helper = HiveHelper(
            host=host,
            port=port,
            database=database,
            auth_mechanism=auth_mechanism,
            user=user,
            password=password,
            logger=logger
            )
        # 創(chuàng)建子目錄
        if self.save_dir is not None and not os.path.exists(self.save_dir):
            os.makedirs(self.save_dir)
        self.vars_helper = None
        if GlobalShareArgs.get_args_value('debug'):
            self.vars_helper = VarsHelper('./hjx/data/vars/UrBiGetDatas') 
    def close(self):
        '''關(guān)閉連接'''
        self.db_helper.close_conn()
    def get_last_time(self, key_name) -> bool:
        '''獲取是否超時(shí)'''
        # 轉(zhuǎn)靜態(tài)路徑,確保唯一性
        key_name = os.path.abspath(key_name)
        if self.vars_helper is not None and self.vars_helper.has_key('UrBiGetDatasBase.time_list'):
            UrBiGetDatasBase.time_dict = self.vars_helper.get_value('UrBiGetDatasBase.time_list')
        timeout = 12 # 12小時(shí)
        if GlobalShareArgs.get_args_value('debug'):
            timeout = 24 # 24小時(shí)
        get_data_timeout = False
        if key_name not in UrBiGetDatasBase.time_dict.keys() or (datetime.datetime.today() - UrBiGetDatasBase.time_dict[key_name]).total_seconds()>(timeout*60*60):
            self.logger.info('超時(shí)%d小時(shí),重新查數(shù)據(jù):%s', timeout, key_name)
            # UrBiGetDatasBase.time_list[key_name] = datetime.datetime.today()
            get_data_timeout = True
        else:
            self.logger.info('未超時(shí)%d小時(shí),跳過查數(shù)據(jù):%s', timeout, key_name)
        # if self.vars_helper is not None :
        #     self.vars_helper.set_value('UrBiGetDatasBase.time_list', UrBiGetDatasBase.time_list)
        UrBiGetDatasBase.get_data_timeout_dict[key_name] = get_data_timeout
        return get_data_timeout
    def save_last_time(self, key_name):
        '''更新狀態(tài)超時(shí)'''
        # 轉(zhuǎn)靜態(tài)路徑,確保唯一性
        key_name = os.path.abspath(key_name)
        if UrBiGetDatasBase.get_data_timeout_dict[key_name]:
            UrBiGetDatasBase.time_dict[key_name] = datetime.datetime.today()
        if self.vars_helper is not None :
            UrBiGetDatasBase.time_dict[key_name] = datetime.datetime.today()
            self.vars_helper.set_value('UrBiGetDatasBase.time_list', UrBiGetDatasBase.time_dict)
    def get_lock(self, key_name) -> threading.Lock:
        '''獲取鎖'''
        # 轉(zhuǎn)靜態(tài)路徑,確保唯一性
        key_name = os.path.abspath(key_name)
        if key_name not in UrBiGetDatasBase.lock_dict.keys():
            UrBiGetDatasBase.lock_dict[key_name] = threading.Lock()
        return UrBiGetDatasBase.lock_dict[key_name]
    def get_data_of_date(
        self,
        save_dir,
        sql,
        sort_columns:List[str],
        del_index_list=[-1], # 刪除最后下標(biāo)
        start_date = datetime.datetime(2017, 1, 1), # 開始時(shí)間
        offset = relativedelta(months=3), # 時(shí)間間隔
        date_format_fun = lambda d: '%04d%02d01' % (d.year, d.month), # 查詢語句中替代時(shí)間參數(shù)的格式化
        filename_format_fun = lambda d: '%04d%02d.csv' % (d.year, d.month), # 查詢語句中替代時(shí)間參數(shù)的格式化
        stop_date = '20700101', # 超過時(shí)間則停止
        data_format_fun = None, # 格式化數(shù)據(jù)
        ):
        '''分時(shí)間增量讀取數(shù)據(jù)'''
        # 創(chuàng)建文件夾
        if not os.path.exists(save_dir):
            os.makedirs(save_dir)
        else:
            #刪除最后一個(gè)文件
            file_list = os.listdir(save_dir)
            if len(file_list)>0:
                file_list.sort()
                for del_index in del_index_list:
                    os.remove(os.path.join(save_dir,file_list[del_index]))
                    print('刪除最后一個(gè)文件:', file_list[del_index])
        select_index = -1
        # start_date = datetime.datetime(2017, 1, 1)
        while True:
            end_date = start_date + offset
            start_date_str = date_format_fun(start_date)
            end_date_str = date_format_fun(end_date)
            self.logger.info('date: %s-%s', start_date_str, end_date_str)
            file_path = os.path.join(save_dir, filename_format_fun(start_date))
            # self.logger.info('file_path: %s', file_path)
            if not os.path.exists(file_path):
                data:pd.DataFrame = self.db_helper.get_data(sql % (start_date_str, end_date_str))
                if data is None:
                    break
                self.logger.info('data: %d', len(data))
                # self.logger.info('data: %d', data.columns)
                if len(data)>0:
                    select_index+=1
                    if data_format_fun is not None:
                        data = data_format_fun(data)
                    # 排序
                    data = data.sort_values(sort_columns)
                    data.to_csv(file_path)
                elif select_index!=-1:
                    break
                elif stop_date < start_date_str:
                    raise Exception("讀取數(shù)據(jù)異常,時(shí)間超出最大值!")
            start_date = end_date
pass
class UrBiGetDatas(UrBiGetDatasBase):
    def __init__(
        self,
        host='10.2.32.22',
        port=21051,
        database='ur_ai_dw',
        auth_mechanism='LDAP',
        user='urbi',
        password='Ur#730xd',
        save_dir='./hjx/data/ur_bi_dw_data',
        logger:logging.Logger=None
        ):
        self.save_dir = save_dir
        self.logger = logger
        super().__init__(
            host=host,
            port=port,
            database=database,
            auth_mechanism=auth_mechanism,
            user=user,
            password=password,
            save_dir=save_dir,
            logger=logger
            )
    def get_dim_date(self):
        '''日期數(shù)據(jù)'''
        file_path = os.path.join(self.save_dir,'ur_bi_dw.dim_date.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加鎖
        try:
            # 設(shè)置超時(shí)4小時(shí)才重新查數(shù)據(jù)
            if not self.get_last_time(file_path):
                return
            sql = 'SELECT * FROM ur_bi_dw.dim_date'
            data:pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c:'dim_date.'+c for c in columns}
            data = data.rename(columns=columns)
            data = data.sort_values(['dim_date.date_key'])
            data.to_csv(file_path)
            # 更新超時(shí)時(shí)間
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外拋出異常
        finally:
            now_lock.release() # 釋放鎖
    def get_dim_shop(self):
        '''店鋪數(shù)據(jù)'''
        file_path = os.path.join(self.save_dir,'ur_bi_dw.dim_shop.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加鎖
        try:
            # 設(shè)置超時(shí)4小時(shí)才重新查數(shù)據(jù)
            if not self.get_last_time(file_path):
                return
            sql = 'SELECT * FROM ur_bi_dw.dim_shop'
            data:pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c:'dim_shop.'+c for c in columns}
            data = data.rename(columns=columns)
            data = data.sort_values(['dim_shop.shop_no'])
            data.to_csv(file_path)
            # 更新超時(shí)時(shí)間
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外拋出異常
        finally:
            now_lock.release() # 釋放鎖
    def get_dim_vip(self):
        '''會(huì)員數(shù)據(jù)'''
        sub_dir = os.path.join(self.save_dir,'vip_no')
        now_lock = self.get_lock(sub_dir)
        now_lock.acquire() # 加鎖
        try:
            # 設(shè)置超時(shí)4小時(shí)才重新查數(shù)據(jù)
            if not self.get_last_time(sub_dir):
                return
            sql = '''SELECT dv.*, dd.date_key, dd.date_name2 
            FROM ur_bi_dw.dim_vip as dv
            INNER JOIN ur_bi_dw.dim_date as dd
            ON dv.card_create_date=dd.date_name2 
            where dd.date_key >= %s
            and dd.date_key < %s'''
            # data:pd.DataFrame = self.db_helper.get_data(sql)
            sort_columns = ['dv.vip_no']
            # TODO:
            self.get_data_of_date(
                save_dir=sub_dir,
                sql=sql,
                sort_columns=sort_columns,
                start_date=datetime.datetime(2017, 1, 1), # 開始時(shí)間
                offset=relativedelta(years=1)
            )
            # 更新超時(shí)時(shí)間
            self.save_last_time(sub_dir)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外拋出異常
        finally:
            now_lock.release() # 釋放鎖
    def get_weather(self):
        '''天氣數(shù)據(jù)'''
        sub_dir = os.path.join(self.save_dir,'weather')
        now_lock = self.get_lock(sub_dir)
        now_lock.acquire() # 加鎖
        try:
            # 設(shè)置超時(shí)4小時(shí)才重新查數(shù)據(jù)
            if not self.get_last_time(sub_dir):
                return
            sql = """
            select weather.* from ur_bi_ods.ods_base_weather_data_1200 as weather
            where weather.date_key>=%s and weather.date_key<%s
            """
            sort_columns = ['weather.date_key','weather.areaid']
            def data_format_fun(data):
                columns = list(data.columns)
                columns = {c:'weather.'+c for c in columns}
                data = data.rename(columns=columns)
                return data
            self.get_data_of_date(
                save_dir=sub_dir,
                sql=sql,
                sort_columns=sort_columns,
                del_index_list=[-2, -1], # 刪除最后下標(biāo)
                data_format_fun=data_format_fun,
            )
            # 更新超時(shí)時(shí)間
            self.save_last_time(sub_dir)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外拋出異常
        finally:
            now_lock.release() # 釋放鎖
    def get_weather_city(self):
        '''天氣城市數(shù)據(jù)'''
        file_path = os.path.join(self.save_dir,'ur_bi_dw.weather_city.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加鎖
        try:
            # 設(shè)置超時(shí)4小時(shí)才重新查數(shù)據(jù)
            if not self.get_last_time(file_path):
                return
            sql = 'SELECT * FROM ur_bi_dw.dim_weather_city as weather_city'
            data:pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c:'weather_city.'+c for c in columns}
            data = data.rename(columns=columns)
            data.to_csv(file_path)
            # 更新超時(shí)時(shí)間
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外拋出異常
        finally:
            now_lock.release() # 釋放鎖
    def get_dim_goods(self):
        '''貨品數(shù)據(jù)'''
        file_path = os.path.join(self.save_dir,'ur_bi_dw.dim_goods.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加鎖
        try:
            # 設(shè)置超時(shí)4小時(shí)才重新查數(shù)據(jù)
            if not self.get_last_time(file_path):
                return
            sql = 'SELECT * FROM ur_bi_dw.dim_goods'
            data:pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c:'dim_goods.'+c for c in columns}
            data = data.rename(columns=columns)
            data.to_csv(file_path)
            # 更新超時(shí)時(shí)間
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外拋出異常
        finally:
            now_lock.release() # 釋放鎖
    def get_dim_goods_market_shop_date(self):
        '''店鋪商品生命周期數(shù)據(jù)'''
        file_path = os.path.join(self.save_dir,'ur_bi_dw.dim_goods_market_shop_date.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加鎖
        try:
            # 設(shè)置超時(shí)4小時(shí)才重新查數(shù)據(jù)
            if not self.get_last_time(file_path):
                return
            # sql = 'SELECT * FROM ur_bi_dw.dim_goods_market_shop_date as goods_shop_date'
            sql = '''
            select shop_no, sku_no, shop_market_date, lifecycle_end_date, lifecycle_days
            FROM ur_bi_dw.dim_goods_market_shop_date
            where lifecycle_end_date is not null
            '''
            data:pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c:c.replace('lifecycle_end_date.','') for c in columns}
            data = data.rename(columns=columns)
            data = data.sort_values(['shop_market_date'])
            data.to_csv(file_path, index=False)
            # 更新超時(shí)時(shí)間
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外拋出異常
        finally:
            now_lock.release() # 釋放鎖
    def get_dim_goods_market_date(self):
        '''全國商品生命周期數(shù)據(jù)'''
        file_path = os.path.join(self.save_dir,'ur_bi_dw.dim_goods_market_date.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加鎖
        try:
            # 設(shè)置超時(shí)4小時(shí)才重新查數(shù)據(jù)
            if not self.get_last_time(file_path):
                return
            sql = '''
            select * FROM ur_bi_dw.dim_goods_market_date
            '''
            data:pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c:'dim_goods_market_date.'+c for c in columns}
            data = data.rename(columns=columns)
            data = data.sort_values(['dim_goods_market_date.sku_no'])
            data.to_csv(file_path, index=False)
            # 更新超時(shí)時(shí)間
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外拋出異常
        finally:
            now_lock.release() # 釋放鎖
    def get_dim_goods_color_dev_sizes(self):
        '''商品開發(fā)碼數(shù)數(shù)據(jù)'''
        file_path = os.path.join(self.save_dir,'dim_goods_color_dev_sizes.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加鎖
        try:
            # 設(shè)置超時(shí)4小時(shí)才重新查數(shù)據(jù)
            if not self.get_last_time(file_path):
                return
            # sql = 'SELECT * FROM ur_bi_dw.dim_goods_market_shop_date as goods_shop_date'
            sql = 'SELECT * FROM ur_bi_dm.dim_goods_color_dev_sizes'
            data:pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c:c.replace('dim_goods_color_dev_sizes.','') for c in columns}
            data = data.rename(columns=columns)
            data.to_csv(file_path, index=False)
            # 更新超時(shí)時(shí)間
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外拋出異常
        finally:
            now_lock.release() # 釋放鎖
    def get_dwd_daily_sales_size(self):
        '''實(shí)際銷售金額'''
        sub_dir = os.path.join(self.save_dir,'dwd_daily_sales_size_all')
        now_lock = self.get_lock(sub_dir)
        now_lock.acquire() # 加鎖
        try:
            # 設(shè)置超時(shí)4小時(shí)才重新查數(shù)據(jù)
            if not self.get_last_time(sub_dir):
                return
            sql = """
            select shop_no,sku_no,date_key,`size`,
                sum(tag_price) as `tag_price`,
                sum(sales_qty) as `sales_qty`,
                sum(sales_tag_amt) as `sales_tag_amt`,
                sum(sales_amt) as `sales_amt`,
                count(0) as `sales_count`
            from ur_bi_dw.dwd_daily_sales_size as sales
            where sales.date_key>=%s and sales.date_key<%s
                and sales.currency_code='CNY'
            group by shop_no,sku_no,date_key,`size`
            """
            sort_columns = ['date_key','shop_no','sku_no']
            self.get_data_of_date(
                save_dir=sub_dir,
                sql=sql,
                sort_columns=sort_columns,
                start_date=datetime.datetime(2017, 1, 1), # 開始時(shí)間
            )
            # 更新超時(shí)時(shí)間
            self.save_last_time(sub_dir)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外拋出異常
        finally:
            now_lock.release() # 釋放鎖
    def get_dwd_daily_delivery_size(self):
        '''實(shí)際配貨金額'''
        sub_dir = os.path.join(self.save_dir,'dwd_daily_delivery_size_all')
        now_lock = self.get_lock(sub_dir)
        now_lock.acquire() # 加鎖
        try:
            # 設(shè)置超時(shí)4小時(shí)才重新查數(shù)據(jù)
            if not self.get_last_time(sub_dir):
                return
            sql = """
            select shop_no,sku_no,date_key,`size`,
                sum(delivery.shop_distr_received_qty) as `shop_distr_received_qty`,
                sum(delivery.shop_distr_received_amt) as `shop_distr_received_amt`,
                sum(delivery.online_distr_received_qty) as `online_distr_received_qty`,
                sum(delivery.online_distr_received_amt) as `online_distr_received_amt`,
                sum(delivery.pr_received_qty) as `pr_received_qty`,
                count(0) as `delivery_count`
            from ur_bi_dw.dwd_daily_delivery_size as delivery
            where delivery.date_key>=%s and delivery.date_key<%s
                and delivery.currency_code='CNY'
            group by shop_no,sku_no,date_key,`size`
            """
            sort_columns = ['date_key','shop_no','sku_no']
            self.get_data_of_date(
                save_dir=sub_dir,
                sql=sql,
                sort_columns=sort_columns,
                start_date=datetime.datetime(2017, 1, 1), # 開始時(shí)間
            )
            # 更新超時(shí)時(shí)間
            self.save_last_time(sub_dir)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外拋出異常
        finally:
            now_lock.release() # 釋放鎖
    def get_v_last_nation_sales_status(self):
        '''商品暢滯銷數(shù)據(jù)'''
        file_path = os.path.join(self.save_dir,'v_last_nation_sales_status.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加鎖
        try:
            # 設(shè)置超時(shí)4小時(shí)才重新查數(shù)據(jù)
            if not self.get_last_time(file_path):
                return
            sql = 'SELECT * FROM ur_bi_dw.v_last_nation_sales_status'
            data:pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c:c.replace('v_last_nation_sales_status.','') for c in columns}
            data = data.rename(columns=columns)
            data.to_csv(file_path, index=False)
            # 更新超時(shí)時(shí)間
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外拋出異常
        finally:
            now_lock.release() # 釋放鎖
    def get_dwd_daily_finacial_goods(self):
        '''商品成本價(jià)數(shù)據(jù)'''
        file_path = os.path.join(self.save_dir,'dwd_daily_finacial_goods.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加鎖
        try:
            # 設(shè)置超時(shí)4小時(shí)才重新查數(shù)據(jù)
            if not self.get_last_time(file_path):
                return
            sql = """
            select t1.sku_no,t1.`size`,t1.cost_tax_incl from ur_bi_dw.dwd_daily_finacial_goods as t1
            inner join (
                select sku_no,`size`,max(date_key) as date_key
                from ur_bi_dw.dwd_daily_finacial_goods
                where currency_code='CNY' and country_code='CN'
                group by sku_no,`size`
            ) as t2
            on t2.sku_no=t1.sku_no
                and t2.`size`=t1.`size`
                and t2.date_key=t1.date_key
            where t1.currency_code='CNY' and t1.country_code='CN'
            """
            data:pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c:c.replace('t1.','') for c in columns}
            data = data.rename(columns=columns)
            data.to_csv(file_path, index=False)
            # 更新超時(shí)時(shí)間
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外拋出異常
        finally:
            now_lock.release() # 釋放鎖
    def get_dim_size_group(self):
        '''尺碼映射數(shù)據(jù)'''
        file_path = os.path.join(self.save_dir,'dim_size_group.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加鎖
        try:
            # 設(shè)置超時(shí)4小時(shí)才重新查數(shù)據(jù)
            if not self.get_last_time(file_path):
                return
            sql = """select * from ur_bi_dw.dim_size_group"""
            data:pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c:c.replace('dim_size_group.','') for c in columns}
            data = data.rename(columns=columns)
            data.to_csv(file_path, index=False)
            # 更新超時(shí)時(shí)間
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外拋出異常
        finally:
            now_lock.release() # 釋放鎖
pass
def get_common_datas(
    host='10.2.32.22',
    port=21051,
    database='ur_ai_dw',
    auth_mechanism='LDAP',
    user='urbi',
    password='Ur#730xd',
    logger:logging.Logger=None):
    # 共用文件
    common_datas_dir = ShareArgs.get_args_value('common_datas_dir')
    common_ur_bi_dir = os.path.join(common_datas_dir, 'ur_bi_data')
    ur_bi_get_datas = UrBiGetDatas(
        host=host,
        port=port,
        database=database,
        auth_mechanism=auth_mechanism,
        user=user,
        password=password,
        save_dir=common_ur_bi_dir,
        logger=logger
    )
    try:
        logger.info('正在查詢?nèi)掌跀?shù)據(jù)...')
        ur_bi_get_datas.get_dim_date()
        logger.info('查詢?nèi)掌跀?shù)據(jù)完成!')
        logger.info('正在查詢店鋪數(shù)據(jù)...')
        ur_bi_get_datas.get_dim_shop()
        logger.info('查詢店鋪數(shù)據(jù)完成!')
        logger.info('正在查詢天氣數(shù)據(jù)...')
        ur_bi_get_datas.get_weather()
        logger.info('查詢天氣數(shù)據(jù)完成!')
        logger.info('正在查詢天氣城市數(shù)據(jù)...')
        ur_bi_get_datas.get_weather_city()
        logger.info('查詢天氣城市數(shù)據(jù)完成!')
        logger.info('正在查詢貨品數(shù)據(jù)...')
        ur_bi_get_datas.get_dim_goods()
        logger.info('查詢貨品數(shù)據(jù)完成!')
        logger.info('正在查詢實(shí)際銷量數(shù)據(jù)...')
        ur_bi_get_datas.get_dwd_daily_sales_size()
        logger.info('查詢實(shí)際銷量數(shù)據(jù)完成!')
    except Exception as ex:
        logger.exception(ex)
        raise ex # 往外拋出異常
    finally:
        ur_bi_get_datas.close()
pass
class CustomUrBiGetDatas(UrBiGetDatasBase):
    def __init__(
        self,
        host='10.2.32.22',
        port=21051,
        database='ur_ai_dw',
        auth_mechanism='LDAP',
        user='urbi',
        password='Ur#730xd',
        save_dir='./hjx/data/ur_bi_data',
        logger:logging.Logger=None
        ):
        self.save_dir = save_dir
        self.logger = logger
        super().__init__(
            host=host,
            port=port,
            database=database,
            auth_mechanism=auth_mechanism,
            user=user,
            password=password,
            save_dir=save_dir,
            logger=logger
            )
    def get_sales_goal_amt(self):
        '''銷售目標(biāo)金額'''
        file_path = os.path.join(self.save_dir,'month_of_year_sales_goal_amt.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加鎖
        try:
            # 設(shè)置超時(shí)4小時(shí)才重新查數(shù)據(jù)
            if not self.get_last_time(file_path):
                return
            sql = '''
            select sales_goal.shop_no,
                if(sales_goal.serial='Y','W',sales_goal.serial) as `sales_goal.serial`,
                dates.month_of_year,
                sum(sales_goal.sales_goal_amt) as sales_goal_amt
            from ur_bi_dw.dwd_sales_goal_west as sales_goal
            inner join ur_bi_dw.dim_date as dates
                on sales_goal.date_key = dates.date_key
            group by sales_goal.shop_no,
                if(sales_goal.serial='Y','W',sales_goal.serial),
                dates.month_of_year
            '''
            data:pd.DataFrame = self.db_helper.get_data(sql)
            data = data.rename(columns={
                'shop_no':'sales_goal.shop_no',
                'serial':'sales_goal.serial',
                'month_of_year':'dates.month_of_year',
            })
            # 排序
            data = data.sort_values(['sales_goal.shop_no','sales_goal.serial','dates.month_of_year'])
            data.to_csv(file_path)
            # 更新超時(shí)時(shí)間
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外拋出異常
        finally:
            now_lock.release() # 釋放鎖
    def get_shop_serial_area(self):
        '''店-系列面積'''
        file_path = os.path.join(self.save_dir,'shop_serial_area.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加鎖
        try:
            # 設(shè)置超時(shí)4小時(shí)才重新查數(shù)據(jù)
            if not self.get_last_time(file_path):
                return
            sql = '''
            select shop_serial_area.shop_no,
                if(shop_serial_area.serial='Y','W',shop_serial_area.serial) as `shop_serial_area.serial`,
                shop_serial_area.month_of_year,
                sum(shop_serial_area.area) as `shop_serial_area.area`
            from ur_bi_dw.dwd_shop_serial_area as shop_serial_area
            where shop_serial_area.area is not null
            group by shop_serial_area.shop_no,if(shop_serial_area.serial='Y','W',shop_serial_area.serial),shop_serial_area.month_of_year
            '''
            data:pd.DataFrame = self.db_helper.get_data(sql)
            data = data.rename(columns={
                'shop_no':'shop_serial_area.shop_no',
                'serial':'shop_serial_area.serial',
                'month_of_year':'shop_serial_area.month_of_year',
                'area':'shop_serial_area.area',
            })
            # 排序
            data = data.sort_values(['shop_serial_area.shop_no','shop_serial_area.serial','shop_serial_area.month_of_year'])
            data.to_csv(file_path)
            # 更新超時(shí)時(shí)間
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外拋出異常
        finally:
            now_lock.release() # 釋放鎖
pass
def get_datas(
    host='10.2.32.22',
    port=21051,
    database='ur_ai_dw',
    auth_mechanism='LDAP',
    user='urbi',
    password='Ur#730xd',
    save_dir='./data/sales_forecast/ur_bi_dw_data',
    logger:logging.Logger=None):
    ur_bi_get_datas = CustomUrBiGetDatas(
        host=host,
        port=port,
        database=database,
        auth_mechanism=auth_mechanism,
        user=user,
        password=password,
        save_dir=save_dir,
        logger=logger
    )
    try:
        # 店,系列,品類,年月,銷售目標(biāo)金額
        logger.info('正在查詢年月銷售目標(biāo)金額數(shù)據(jù)...')
        ur_bi_get_datas.get_sales_goal_amt()
        logger.info('查詢年月銷售目標(biāo)金額數(shù)據(jù)完成!')
    except Exception as ex:
        logger.exception(ex)
        raise ex # 往外拋出異常
    finally:
        ur_bi_get_datas.close()
pass
def getdata_ur_bi_dw(
    host='10.2.32.22',
    port=21051,
    database='ur_ai_dw',
    auth_mechanism='LDAP',
    user='urbi',
    password='Ur#730xd',
    save_dir='./data/sales_forecast/ur_bi_dw_data',
    logger=None
):
    get_common_datas(
        host=host,
        port=port,
        database=database,
        auth_mechanism=auth_mechanism,
        user=user,
        password=password,
        logger=logger
    )
    get_datas(
        host=host,
        port=port,
        database=database,
        auth_mechanism=auth_mechanism,
        user=user,
        password=password,
        save_dir=save_dir,
        logger=logger
    )
pass
# 代碼入口
# getdata_ur_bi_dw(
#     host=ur_bi_dw_host,
#     port=ur_bi_dw_port,
#     database=ur_bi_dw_database,
#     auth_mechanism=ur_bi_dw_auth_mechanism,
#     user=ur_bi_dw_user,
#     password=ur_bi_dw_password,
#     save_dir=ur_bi_dw_save_dir,
#     logger=logger
#     )

代碼說明和領(lǐng)悟

每個(gè)類的具體作用說明,代碼需要根據(jù)下面的文字說明進(jìn)行“食用”:

(第一層)HiveHelper完成了連接數(shù)據(jù)庫、關(guān)閉數(shù)據(jù)庫連接、生成事務(wù)、執(zhí)行、引擎、連接等功能

VarsHelper提供了一個(gè)簡單的持久化功能,可以將對象以文件的形式存放在磁盤上。并提供設(shè)置值、獲取值、判斷值是否存在的方法

GlobalShareArgs提供了一個(gè)字典,并且提供了獲取字典、設(shè)置字典、設(shè)置字典鍵值對、設(shè)置字典鍵的值、判斷鍵是否在字典中、更新字典等方法

ShareArgs跟GlobalShareArgs類似,只是一開始字典的初始化的鍵值對比較多

(第二層)UrBiGetDataBase類,提供了線程鎖字典、時(shí)間字典、超時(shí)判斷字典,都是類變量;使用了HiveHelper類,但注意,不是繼承。在具體的sql讀數(shù)時(shí),提供了線程固定和時(shí)間判斷

(第三層)UrBiGetDatas類,獲取hive數(shù)據(jù)庫那邊的日期數(shù)據(jù)、店鋪數(shù)據(jù)、會(huì)員數(shù)據(jù)、天氣數(shù)據(jù)、天氣城市數(shù)據(jù)、商品數(shù)據(jù)、店鋪生命周期數(shù)據(jù)、全國商品生命周期數(shù)據(jù)、商品開發(fā)碼數(shù)數(shù)據(jù)、實(shí)際銷售金額、實(shí)際配貨金額、商品暢滯銷數(shù)據(jù)、商品成本價(jià)數(shù)據(jù)、尺碼映射數(shù)據(jù)等。

(第四層)get_common_data函數(shù),使用URBiGetData類讀取日期、店鋪、天氣、天氣城市、貨品、實(shí)際銷量數(shù)據(jù),并緩存到文件夾./yongjian/data/ur_bi_data下面

CustomUrBiGetData類,繼承了UrBiGetDatasBase類,讀取銷售目標(biāo)金額、點(diǎn)系列面積數(shù)據(jù)。

(這個(gè)也是第四層)get_datas函數(shù),通過CustomUrBiGetData類,讀取年月銷售目標(biāo)金額。

總的函數(shù):(這個(gè)是總的調(diào)用入口函數(shù))get_data_ur_bi_dw函數(shù),調(diào)用了get_common_data和get_datas函數(shù)進(jìn)行讀取數(shù)據(jù),然后將數(shù)據(jù)保存到某個(gè)文件夾目錄下面。

舉一反三,如果你不是hive數(shù)據(jù)庫,你可以將第一層這個(gè)底層更換成mysql。主頁有解釋如果進(jìn)行更換。第二層不需要改變,第三層就是你想要進(jìn)行讀取的數(shù)據(jù)表,不同的數(shù)據(jù)庫你想要讀取的數(shù)據(jù)表也不同,所以sql需要你在這里寫,套用里面的方法即可,基本上就是修改sql就好了。

這種方法的好處在于,數(shù)據(jù)不會(huì)重復(fù)讀取,并且讀取的數(shù)據(jù)都可以得到高效的使用。

后續(xù)附上修改成mysql的一個(gè)例子代碼

import logging
import pandas as pd
from impala.dbapi import connect
import sqlalchemy
from sqlalchemy.orm import sessionmaker
import os
import time
import os
import datetime
from dateutil.relativedelta import relativedelta
from typing import Dict, List
import logging
import threading
import pandas as pd
import pickle
class MySqlHelper(object):
    def __init__(
        self,
        host='192.168.15.144',
        port=3306,
        database='test_ims',
        user='spkjz_writer',
        password='7cmoP3QDtueVJQj2q4Az',
        logger:logging.Logger=None
        ):
        self.host = host
        self.port = port
        self.database = database
        self.user = user
        self.password = password
        self.logger = logger
        self.connection_str = 'mysql+pymysql://%s:%s@%s:%d/%s' %(
            self.user, self.password, self.host, self.port, self.database
        )
        self.conn = None
        self.cursor = None
        self.engine = None
        self.session = None
    def create_table_code(self, file_name):
        '''創(chuàng)建表類代碼'''
        os.system(f'sqlacodegen {self.connection_str} > {file_name}')
        return self.conn
    def get_conn(self):
        '''創(chuàng)建連接或獲取連接'''
        if self.conn is None:
            engine = self.get_engine()
            self.conn = engine.connect()
        return self.conn
    def get_engine(self):
        '''創(chuàng)建連接或獲取連接'''
        if self.engine is None:
            self.engine = sqlalchemy.create_engine(self.connection_str)
        return self.engine
    def get_cursor(self):
        '''創(chuàng)建連接或獲取連接'''
        if self.cursor is None:
            self.cursor = self.conn.cursor()
        return self.cursor
    def get_session(self) -> sessionmaker:
        '''創(chuàng)建連接或獲取連接'''
        if self.session is None:
            engine = self.get_engine()
            Session = sessionmaker(bind=engine)
            self.session = Session()
        return self.session
    def close_conn(self):
        '''關(guān)閉連接'''
        if self.conn is not None:
            self.conn.close()
            self.conn = None
        self.dispose_engine()
    def close_session(self):
        '''關(guān)閉連接'''
        if self.session is not None:
            self.session.close()
            self.session = None
        self.dispose_engine()
    def dispose_engine(self):
        '''釋放engine'''
        if self.engine is not None:
            # self.engine.dispose(close=False)
            self.engine.dispose()
            self.engine = None
    def close_cursor(self):
        '''關(guān)閉cursor'''
        if self.cursor is not None:
            self.cursor.close()
            self.cursor = None
    def get_data(self, sql, auto_close=True) -> pd.DataFrame:
        '''查詢數(shù)據(jù)'''
        conn = self.get_conn()
        data = None
        try:
            # 異常重試3次
            for i in range(3):
                try:
                    data = pd.read_sql(sql, conn)
                    break
                except Exception as ex:
                    if i == 2:
                        raise ex # 往外拋出異常
                    time.sleep(60) # 一分鐘后重試
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外拋出異常
        finally:
            if auto_close:
                self.close_conn()
        return data
pass
class VarsHelper():
    def __init__(self, save_dir, auto_save=True):
        self.save_dir = save_dir
        self.auto_save = auto_save
        self.values = {}
        if not os.path.exists(os.path.dirname(self.save_dir)):
            os.makedirs(os.path.dirname(self.save_dir))
        if os.path.exists(self.save_dir):
            with open(self.save_dir, 'rb') as f:
                self.values = pickle.load(f)
                f.close()
    def set_value(self, key, value):
        self.values[key] = value
        if self.auto_save:
            self.save_file()
    def get_value(self, key):
        return self.values[key]
    def has_key(self, key):
        return key in self.values.keys()
    def save_file(self):
        with open(self.save_dir, 'wb') as f:
            pickle.dump(self.values, f)
            f.close()
pass
class GlobalShareArgs():
    args = {
        "debug": False
    }
    def get_args():
        return GlobalShareArgs.args
    def set_args(args):
        GlobalShareArgs.args = args
    def set_args_value(key, value):
        GlobalShareArgs.args[key] = value
    def get_args_value(key, default_value=None):
        return GlobalShareArgs.args.get(key, default_value)
    def contain_key(key):
        return key in GlobalShareArgs.args.keys()
    def update(args):
        GlobalShareArgs.args.update(args)
pass
class ShareArgs():
    args = {
        "labels_dir":"./hjx/shop_group/month_w_amt/data/labels", # 標(biāo)簽?zāi)夸?
        "labels_output_dir":"./hjx/shop_group/month_w_amt/data/labels_output", # 聚類導(dǎo)出標(biāo)簽?zāi)夸?
        "common_datas_dir":"./hjx/data", # 共用數(shù)據(jù)目錄。ur_bi_dw的公共
        "only_predict": False, # 只識別,不訓(xùn)練
        "delete_model": True, # 先刪除模型,僅在訓(xùn)練時(shí)使用
        "export_excel": False, # 導(dǎo)出excel
        "classes": 12, # 聚類數(shù)
        "batch_size": 16,
        "hidden_size": 32,
        "max_nrof_epochs": 100,
        "learning_rate": 0.0005,
        "loss_type": "categorical_crossentropy",
        "avg_model_num": 10,
        "steps_per_epoch": 4.0, # 4.0
        "lr_callback_patience": 4, 
        "lr_callback_cooldown": 1,
        "early_stopping_callback_patience": 6,
        "get_data": True,
    }
    def get_args():
        return ShareArgs.args
    def set_args(args):
        ShareArgs.args = args
    def set_args_value(key, value):
        ShareArgs.args[key] = value
    def get_args_value(key, default_value=None):
        return ShareArgs.args.get(key, default_value)
    def contain_key(key):
        return key in ShareArgs.args.keys()
    def update(args):
        ShareArgs.args.update(args)
pass
class IMSGetDatasBase():
    # 線程鎖列表,同保存路徑共用鎖
    lock_dict:Dict[str, threading.Lock] = {}
    # 時(shí)間列表,用于判斷是否超時(shí)
    time_dict:Dict[str, datetime.datetime] = {}
    # 用于記錄是否需要更新超時(shí)時(shí)間
    get_data_timeout_dict:Dict[str, bool] = {}
    def __init__(
        self,
        host='192.168.15.144',
        port=3306,
        database='test_ims',
        user='spkjz_writer',
        password='Ur#7cmoP3QDtueVJQj2q4Az',
        save_dir=None,
        logger:logging.Logger=None,
        ):
        self.save_dir = save_dir
        self.logger = logger
        self.db_helper = MySqlHelper(
            host=host,
            port=port,
            database=database,
            user=user,
            password=password,
            logger=logger
            )
        # 創(chuàng)建子目錄
        if self.save_dir is not None and not os.path.exists(self.save_dir):
            os.makedirs(self.save_dir)
        self.vars_helper = None
        if GlobalShareArgs.get_args_value('debug'):
            self.vars_helper = VarsHelper('./hjx/data/vars/IMSGetDatas') # 把超時(shí)時(shí)間保存到文件,注釋該行即可停掉,只用于調(diào)試
    def close(self):
        '''關(guān)閉連接'''
        self.db_helper.close_conn()
    def get_last_time(self, key_name) -> bool:
        '''獲取是否超時(shí)'''
        # 轉(zhuǎn)靜態(tài)路徑,確保唯一性
        key_name = os.path.abspath(key_name)
        if self.vars_helper is not None and self.vars_helper.has_key('IMSGetDatasBase.time_list'):
            IMSGetDatasBase.time_dict = self.vars_helper.get_value('IMSGetDatasBase.time_list')
        timeout = 12 # 12小時(shí)
        if GlobalShareArgs.get_args_value('debug'):
            timeout = 24 # 24小時(shí)
        get_data_timeout = False
        if key_name not in IMSGetDatasBase.time_dict.keys() or (datetime.datetime.today() - IMSGetDatasBase.time_dict[key_name]).total_seconds()>(4*60*60):
            self.logger.info('超時(shí)%d小時(shí),重新查數(shù)據(jù):%s', timeout, key_name)
            # IMSGetDatasBase.time_list[key_name] = datetime.datetime.today()
            get_data_timeout = True
        else:
            self.logger.info('未超時(shí)%d小時(shí),跳過查數(shù)據(jù):%s', timeout, key_name)
        # if self.vars_helper is not None :
        #     self.vars_helper.set_value('IMSGetDatasBase.time_list', IMSGetDatasBase.time_list)
        IMSGetDatasBase.get_data_timeout_dict[key_name] = get_data_timeout
        return get_data_timeout
    def save_last_time(self, key_name):
        '''更新狀態(tài)超時(shí)'''
        # 轉(zhuǎn)靜態(tài)路徑,確保唯一性
        key_name = os.path.abspath(key_name)
        if IMSGetDatasBase.get_data_timeout_dict[key_name]:
            IMSGetDatasBase.time_dict[key_name] = datetime.datetime.today()
        if self.vars_helper is not None :
            IMSGetDatasBase.time_dict[key_name] = datetime.datetime.today()
            self.vars_helper.set_value('IMSGetDatasBase.time_list', IMSGetDatasBase.time_dict)
    def get_lock(self, key_name) -> threading.Lock:
        '''獲取鎖'''
        # 轉(zhuǎn)靜態(tài)路徑,確保唯一性
        key_name = os.path.abspath(key_name)
        if key_name not in IMSGetDatasBase.lock_dict.keys():
            IMSGetDatasBase.lock_dict[key_name] = threading.Lock()
        return IMSGetDatasBase.lock_dict[key_name]
    def get_data_of_date(
        self,
        save_dir,
        sql,
        sort_columns:List[str],
        del_index_list=[-1], # 刪除最后下標(biāo)
        start_date = datetime.datetime(2017, 1, 1), # 開始時(shí)間
        offset = relativedelta(months=3), # 時(shí)間間隔
        date_format_fun = lambda d: '%04d%02d01' % (d.year, d.month), # 查詢語句中替代時(shí)間參數(shù)的格式化
        filename_format_fun = lambda d: '%04d%02d.csv' % (d.year, d.month), # 查詢語句中替代時(shí)間參數(shù)的格式化
        stop_date = '20700101', # 超過時(shí)間則停止
        ):
        '''分時(shí)間增量讀取數(shù)據(jù)'''
        # 創(chuàng)建文件夾
        if not os.path.exists(save_dir):
            os.makedirs(save_dir)
        else:
            #刪除最后一個(gè)文件
            file_list = os.listdir(save_dir)
            if len(file_list)>0:
                file_list.sort()
                for del_index in del_index_list:
                    os.remove(os.path.join(save_dir,file_list[del_index]))
                    print('刪除最后一個(gè)文件:', file_list[del_index])
        select_index = -1
        # start_date = datetime.datetime(2017, 1, 1)
        while True:
            end_date = start_date + offset
            start_date_str = date_format_fun(start_date)
            end_date_str = date_format_fun(end_date)
            self.logger.info('date: %s-%s', start_date_str, end_date_str)
            file_path = os.path.join(save_dir, filename_format_fun(start_date))
            # self.logger.info('file_path: %s', file_path)
            if not os.path.exists(file_path):
                data:pd.DataFrame = self.db_helper.get_data(sql % (start_date_str, end_date_str))
                if data is None:
                    break
                self.logger.info('data: %d', len(data))
                # self.logger.info('data: %d', data.columns)
                if len(data)>0:
                    select_index+=1
                    # 排序
                    data = data.sort_values(sort_columns)
                    data.to_csv(file_path)
                elif select_index!=-1:
                    break
                elif stop_date < start_date_str:
                    raise Exception("讀取數(shù)據(jù)異常,時(shí)間超出最大值!")
            start_date = end_date
pass
class CustomIMSGetDatas(IMSGetDatasBase):
    def __init__(
        self,
        host='192.168.13.134',
        port=4000,
        database='test_ims',
        user='root',
        password='rootimmsadmin',
        save_dir='./hjx/data/export_ims_data',
        logger:logging.Logger=None
        ):
        self.save_dir = save_dir
        self.logger = logger
        super().__init__(
            host=host,
            port=port,
            database=database,
            user=user,
            password=password,
            save_dir=save_dir,
            logger=logger
            )
    def get_ims_w_amt_pro(self):
        '''年月系列占比數(shù)據(jù)'''
        file_path = os.path.join(self.save_dir,'ims_w_amt_pro.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加鎖
        try:
            # 設(shè)置超時(shí)4小時(shí)才重新查數(shù)據(jù)
            # if not self.get_last_time(file_path):
            #     return
            sql = 'SELECT * FROM ims_w_amt_pro'
            data:pd.DataFrame = self.db_helper.get_data(sql)
            data = data.rename(columns={
                'serial_forecast_proportion': 'forecast_proportion',
            })
            data.to_csv(file_path)
            # # 更新超時(shí)時(shí)間
            # self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外拋出異常
        finally:
            now_lock.release() # 釋放鎖
pass
def get_datas(
    host='192.168.13.134',
    port=4000,
    database='test_ims',
    user='root',
    password='rootimmsadmin',
    save_dir='./hjx/data/export_ims_data',
    logger:logging.Logger=None
    ):
    ur_bi_get_datas = CustomIMSGetDatas(
        host=host,
        port=port,
        database=database,
        user=user,
        password=password,
        save_dir=save_dir,
        logger=logger
    )
    try:
        # 年月系列占比數(shù)據(jù)
        logger.info('正在查詢年月系列占比數(shù)據(jù)...')
        ur_bi_get_datas.get_ims_w_amt_pro()
        logger.info('查詢年月系列占比數(shù)據(jù)完成!')
    except Exception as ex:
        logger.exception(ex)
        raise ex # 往外拋出異常
    finally:
        ur_bi_get_datas.close()
pass
def getdata_export_ims(
    host='192.168.13.134',
    port=4000,
    database='test_ims',
    user='root',
    password='rootimmsadmin',
    save_dir='./hjx/data/export_ims_data',
    logger:logging.Logger=None
    ):
    get_datas(
        host=host,
        port=port,
        database=database,
        user=user,
        password=password,
        save_dir=save_dir,
        logger=logger
    )
pass

到此這篇關(guān)于Python讀取Hive數(shù)據(jù)庫實(shí)現(xiàn)代碼詳解的文章就介紹到這了,更多相關(guān)Python讀取Hive數(shù)據(jù)庫內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Python celery原理及運(yùn)行流程解析

    Python celery原理及運(yùn)行流程解析

    這篇文章主要介紹了Python celery原理及運(yùn)行流程解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-06-06
  • python 切片和range()用法說明

    python 切片和range()用法說明

    首先需要明白,可迭代對象,按照正數(shù)索引(正序)是從0開始的,按照負(fù)數(shù)索引(逆序)是從-1開始的
    2013-03-03
  • Django 項(xiàng)目重命名的實(shí)現(xiàn)步驟解析

    Django 項(xiàng)目重命名的實(shí)現(xiàn)步驟解析

    這篇文章主要介紹了Django 項(xiàng)目重命名的實(shí)現(xiàn)步驟,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-08-08
  • Pytest命令行選項(xiàng)的具體使用

    Pytest命令行選項(xiàng)的具體使用

    pytest是一個(gè)流行的Python測試框架,它提供了許多命令行選項(xiàng),本文主要介紹了Pytest命令行選項(xiàng)的具體使用,具有一定的參考價(jià)值,感興趣的可以了解一下
    2023-11-11
  • 五個(gè)Pandas?實(shí)戰(zhàn)案例帶你分析操作數(shù)據(jù)

    五個(gè)Pandas?實(shí)戰(zhàn)案例帶你分析操作數(shù)據(jù)

    pandas是基于NumPy的一種工具,該工具是為了解決數(shù)據(jù)分析任務(wù)而創(chuàng)建的。Pandas納入了大量庫和一些標(biāo)準(zhǔn)的數(shù)據(jù)模型,提供了高效操作大型數(shù)據(jù)集的工具。pandas提供大量快速便捷地處理數(shù)據(jù)的函數(shù)和方法。你很快就會(huì)發(fā)現(xiàn),它是使Python強(qiáng)大而高效的數(shù)據(jù)分析環(huán)境的重要因素之一
    2022-01-01
  • Python實(shí)現(xiàn)按逗號分隔列表的方法

    Python實(shí)現(xiàn)按逗號分隔列表的方法

    今天小編就為大家分享一篇Python實(shí)現(xiàn)按逗號分隔列表的方法,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2018-10-10
  • Python制作微信機(jī)器人教程詳解

    Python制作微信機(jī)器人教程詳解

    這篇文章主要介紹了Python如何實(shí)現(xiàn)微信機(jī)器人,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2021-12-12
  • Python實(shí)現(xiàn)聊天機(jī)器人的示例代碼

    Python實(shí)現(xiàn)聊天機(jī)器人的示例代碼

    這篇文章主要介紹了Python實(shí)現(xiàn)聊天機(jī)器人,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2018-07-07
  • 使用Python通過win32 COM打開Excel并添加Sheet的方法

    使用Python通過win32 COM打開Excel并添加Sheet的方法

    今天小編就為大家分享一篇使用Python通過win32 COM打開Excel并添加Sheet的方法,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2018-05-05
  • 基于python繪制科赫雪花

    基于python繪制科赫雪花

    這篇文章主要為大家詳細(xì)介紹了基于python繪制科赫雪花,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2018-06-06

最新評論