亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Python實現數據庫表的監(jiān)控警告的項目實踐

 更新時間:2024年05月24日 10:01:37   作者:Taerge0110  
本文主要介紹了使用Python 實現對數據庫表的監(jiān)控告警功能, 并將告警信息通過釘釘機器人發(fā)送到釘釘群,具有一定的參考價值,感興趣的可以了解一下

簡介

使用Python 實現對數據庫表的監(jiān)控告警功能, 并將告警信息通過釘釘機器人發(fā)送到釘釘群

實現DataWorks中數據質量的基本功能, 當然 DW的數據質量的規(guī)則類型很多, 用起來比較方便, 這里只簡單實現了其中兩個規(guī)則類型的功能, 僅供參考;

初次使用Python, 請多指教

使用工具: MaxCompute

1. 創(chuàng)建表

1. tmp_monitor_tbl_info

CREATE TABLE IF NOT EXISTS puture_bigdata.tmp_monitor_tbl_info (
      `id`					STRING COMMENT '表編號id'
	, `tbl_name`			STRING COMMENT '表名'
	, `pt_format`			STRING COMMENT '分區(qū)格式: yyyy-MM-dd,yyyyMMdd 等'
	, `val_type`			STRING COMMENT '值類型: 表行數,周期值等'
    , `monitor_flag` 		int COMMENT '監(jiān)控標識: 0:不監(jiān)控, 1:監(jiān)控;'
    , `rule_code` 			int COMMENT '規(guī)則編碼: 1:表行數,上周期差值, 2:表行數,固定值 等'
    , `rule_type`			STRING COMMENT '規(guī)則類型: 表行數,上周期差值; 表行數,固定值; 與固定值比較 等'
    , `expect_val` 			int COMMENT '期望值'
    , `tbl_sort_code`       int COMMENT '表類型編碼: 0:其它(維表類), 1:亞馬遜, 2:中小平臺, 3:市場數據 等'
    , `tbl_sort_name`       STRING COMMENT '表類型名字: 0:其它(維表類), 1:亞馬遜, 2:中小平臺, 3:市場數據 等'
    , `pt_num`				INT COMMENT '分區(qū)日期差值'
) COMMENT '數據監(jiān)控表信息' 
tblproperties ("transactional"="true") 
;
-- 插入數據
INSERT INTO TABLE puture_bigdata_dev.tmp_monitor_tbl_info
SELECT * FROM (
  VALUES  (1 , 'ods_amazon_amz_customer_returns_df',              'yyyyMMdd', '表行數', 1, 1, '表行數,上周期差值', 0,      1, '亞馬遜' , -1)     
        , (2 , 'ods_amazon_amz_flat_file_all_orders_df',          'yyyyMMdd', '表行數', 1, 1, '表行數,上周期差值', 0,      1, '亞馬遜' , -1)         
        , (3 , 'dim_sys_salesman_info_df',                        'yyyyMMdd', '表行數', 1, 1, '表行數,上周期差值', 0,      0, '其它' , -1)  
) AS table_name(id, tbl_name, pt_format, val_type, monitor_flag, rule_code, rule_type, expect_val, tbl_sort_code, tbl_sort_name, pt_num) ;

2. tmp_monitor_tbl_info_log_di

CREATE TABLE IF NOT EXISTS puture_bigdata_dev.tmp_monitor_tbl_info_log_di (
	  `id`					STRING COMMENT '監(jiān)控id編碼:md5(表名_分區(qū))_小時'
	, `tbl_name`			STRING COMMENT '表名'
	, `stat_time`			STRING COMMENT '統(tǒng)計時間'
	, `pt_format`			STRING COMMENT '分區(qū)格式: yyyy-MM-dd,yyyyMMdd 等'
	, `stat_pt`				STRING COMMENT '統(tǒng)計分區(qū)'
	, `val_type`			STRING COMMENT '值類型: 表行數,周期值等'
    , `val` 				int COMMENT '統(tǒng)計值'
    , `rule_code` 			int COMMENT '規(guī)則編碼: 1:表行數,上周期差值, 2:表行數,固定值 等'
    , `rule_type`			STRING COMMENT '規(guī)則類型: 表行數,上周期差值; 表行數,固定值; 與固定值比較 等'
    , `expect_val` 			int COMMENT '期望值'
    , `is_exc` 				int COMMENT '是否異常: 0:否,1:是,默認值0'
    , `tbl_sort_code`       int COMMENT '表類型編碼: 0:其它(維表類), 1:亞馬遜, 2:中小平臺, 3:市場數據 等'
    , `tbl_sort_name`       STRING COMMENT '表類型名字: 0:其它(維表類), 1:亞馬遜, 2:中小平臺, 3:市場數據 等'
) COMMENT '數據監(jiān)控信息記錄表'
PARTITIONED BY (pt STRING COMMENT '數據日期, yyyy-MM-dd') ;

2. 程序開發(fā)

1. 數據檢查程序

'''PyODPS 3
請確保不要使用從 MaxCompute下載數據來處理。下載數據操作常包括Table/Instance的open_reader以及 DataFrame的to_pandas方法。 
推薦使用 PyODPS DataFrame(從 MaxCompute 表創(chuàng)建)和MaxCompute SQL來處理數據。
更詳細的內容可以參考:https://help.aliyun.com/document_detail/90481.html
'''

import os
from odps import ODPS, DataFrame
from datetime import datetime, timedelta
from dateutil import parser
options.tunnel.use_instance_tunnel = True

# 獲取當前時間
now_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(now_time)
pt = args['date']
print(pt)
date = datetime.strptime(pt, "%Y-%m-%d") 

# 監(jiān)控表列表 tbl_sort_code -> 0:其它(維表類), 1:亞馬遜, 2:中小平臺, 3:市場數據
sql_tbl_info = """
SELECT * FROM puture_bigdata.tmp_monitor_tbl_info
WHERE monitor_flag = 1 AND tbl_sort_code = 3
"""

# 結果表
res_tbl_name = "puture_bigdata.tmp_monitor_tbl_info_log_di"

# 統(tǒng)計sql代碼 -- 表行數,上周期差值
def sql_upper_period_diff():
    sql = f"""
    set odps.sql.hive.compatible=true ;

    INSERT INTO TABLE {res_tbl_name} PARTITION (pt='{pt}')
    SELECT 
          a.id
        , a.tbl_name
        , a.stat_time
        , a.pt_format
        , a.stat_pt
        , a.val_type
        , a.val
        , a.rule_code
        , a.rule_type
        , a.expect_val
        , IF (a.val = 0, 1, (IF ((a.val - NVL(b.val,0)) >= {expect_val}, 0, 1 ))) AS is_exc
        , a.tbl_sort_code
        , a.tbl_sort_name 
    FROM (
        SELECT 
              concat( md5(concat('{tbl_name}', '_', date_format('{date_str}' ,'{pt_format}')) ), '_', {rule_code}, '_', HOUR('{now_time}') ) AS id
            , '{tbl_name}' AS tbl_name
            , '{now_time}' AS stat_time
            , '{pt_format}' AS pt_format
            , date_format('{date_str}' ,'{pt_format}') AS stat_pt
            , '{val_type}' AS val_type
            , COUNT(1) AS val 
            , '{rule_code}' AS rule_code
            , '{rule_type}' AS rule_type
            , {expect_val} AS expect_val
            , {tbl_sort_code} AS tbl_sort_code
            , '{tbl_sort_name}' AS tbl_sort_name
        FROM puture_bigdata.{tbl_name}
        WHERE pt = date_format('{date_str}' ,'{pt_format}')
    ) a 
    LEFT JOIN 
    (
        SELECT tbl_name, val FROM (
            SELECT tbl_name, val
                , ROW_NUMBER() OVER(PARTITION BY tbl_name ORDER BY stat_time DESC ) AS rn 
            FROM {res_tbl_name}
            WHERE pt = DATE_ADD('{date_str}', -1)
        ) WHERE rn = 1
    ) b
    ON a.tbl_name = b.tbl_name
    ;
    """
    return sql

# 表行數, 固定值
def sql_line_fixed_val():
    sql = f"""
    set odps.sql.hive.compatible=true ;

    INSERT INTO TABLE {res_tbl_name} PARTITION (pt='{pt}')
    SELECT 
          concat( md5(concat('{tbl_name}', '_', date_format('{date_str}' ,'{pt_format}')) ), '_', {rule_code}, '_', HOUR('{now_time}') ) AS id
        , '{tbl_name}' AS tbl_name
        , '{now_time}' AS stat_time
        , '{pt_format}' AS pt_format
        , date_format('{date_str}' ,'{pt_format}') AS stat_pt
        , '{val_type}' AS val_type
        , COUNT(1) AS val 
        , '{rule_code}' AS rule_code
        , '{rule_type}' AS rule_type
        , {expect_val} AS expect_val
        , IF (COUNT(1) >= {expect_val}, 0, 1 ) AS is_exc
        , {tbl_sort_code} AS tbl_sort_code
        , '{tbl_sort_name}' AS tbl_sort_name
    FROM puture_bigdata.{tbl_name}
    WHERE pt = date_format('{date_str}' ,'{pt_format}') ;
    """
    return sql

# 執(zhí)行監(jiān)控統(tǒng)計代碼
def ex_monitor(sql: str):
    try :
        # print (sql)
        o.execute_sql(sql, hints={'odps.sql.hive.compatible': True , "odps.sql.submit.mode":"script"})
        print("{}: 運行成功".format(tbl_name) )
    except Exception as e:
        print('{}: 運行異常 ======> '.format(tbl_name) + str(e))


if __name__ == '__main__':
    try :
        with o.execute_sql(sql_tbl_info, hints={'odps.sql.hive.compatible': True}).open_reader() as reader:

            for row_record in reader:
                # print(row_record) # 打印一條數據值
                tbl_name = row_record.tbl_name
                pt_format = row_record.pt_format
                val_type = row_record.val_type
                monitor_flag = row_record.monitor_flag
                rule_code = row_record.rule_code
                rule_type = row_record.rule_type
                expect_val = row_record.expect_val
                tbl_sort_code = row_record.tbl_sort_code
                tbl_sort_name = row_record.tbl_sort_name
                pt_num = row_record.pt_num
                date_str = (date + timedelta(days=pt_num)).strftime('%Y-%m-%d')
                
                if rule_code == 1 :
                    ex_monitor(sql_upper_period_diff())
                elif rule_code == 2 :
                    ex_monitor(sql_line_fixed_val())
                else :
                    print("未知規(guī)則!!!")
                           
    except Exception as e:
        print('異常 ======> ' + str(e))

2. 告警信息推送程序

'''PyODPS 3
請確保不要使用從 MaxCompute下載數據來處理。下載數據操作常包括Table/Instance的open_reader以及 DataFrame的to_pandas方法。 
推薦使用 PyODPS DataFrame(從 MaxCompute 表創(chuàng)建)和MaxCompute SQL來處理數據。
更詳細的內容可以參考:https://help.aliyun.com/document_detail/90481.html
'''

import json
import requests 
from datetime import datetime
import os
from odps import ODPS, DataFrame

date_str = args['date']

# 接口地址和token信息
url = 'https://oapi.dingtalk.com/robot/send?access_token=***********************'

now_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print (now_time)

sql_query = f"""
SELECT tbl_name, stat_time, stat_pt, val_type, val, rule_type, expect_val, is_exc
FROM (
    SELECT tbl_name, stat_time, stat_pt, val_type, val, rule_type, expect_val, is_exc
        , ROW_NUMBER() OVER(PARTITION BY tbl_name ORDER BY stat_time DESC) AS rn 
    FROM puture_bigdata_dev.tmp_monitor_tbl_info_log_di 
    WHERE pt = '{date_str}' 
         AND tbl_sort_code = 1 -- 表種類
) a
WHERE rn = 1 AND is_exc = 1 
"""

# 釘釘機器人,發(fā)送消息
def dd_robot(url:str, content: str):
  HEADERS = {"Content-Type": "application/json;charset=utf-8"}
  #content里面要設置關鍵字
  data_info = {
    "msgtype": "text",
    "text": {
    "content": content
    },
    "isAtAll": False
    #這是配置需要@的人
     # ,"at": {"atMobiles": ["15xxxxxx06",'18xxxxxx1']}
  }
  value = json.dumps(data_info)
  response = requests.post(url,data=value,headers=HEADERS)
  if response.json()['errmsg']!='ok':
    print(response.text)

# 主函數
if __name__ == '__main__': # py3可以省略
    try :
        with o.execute_sql(sql_query, hints={'odps.sql.hive.compatible': True}).open_reader() as reader:
            result_rows = list(reader) # 讀取所有的結果行
            result_count = len(result_rows) # 獲取結果條數
            #print("結果條數:", result_count) # 打印結果條數

            if result_count > 0 :
                for row in result_rows:
                    tbl_name = row.tbl_name
                    stat_time = row.stat_time
                    stat_pt = row.stat_pt
                    val_type = row.val_type
                    val = row.val
                    rule_type = row.rule_type
                    expect_val = row.expect_val
                    #print (tbl_name)
                    content = "數據質量(DQC)校驗告警 \n  "
                    content = content + "【對象名稱】:" + tbl_name + " \n  "
                    content = content + "【實際分區(qū)】:pt=" + stat_pt + " \n  "
                    content = content + "【觸發(fā)規(guī)則】: " + rule_type + " | 當前樣本值: " + val + " | 閾值: " + expect_val + " \n  "
                    content = content + now_time  + " \n  "
                    dd_robot(url, content)
            else :
                print ("無異常情況;")
    except Exception as e:
        print ('異常 ========>' + str(e) )

3. 告警樣例

數據質量(DQC)校驗告警 
  【對象名稱】:dws_amazon_market_sales_stat_di 
  【實際分區(qū)】:pt=20240103 
  【觸發(fā)規(guī)則】: 表行數,固定值 | 當前樣本值: 617 | 閾值: 650 
  2024-01-04 02:54:44 

到此這篇關于Python實現數據庫表的監(jiān)控警告的項目實踐的文章就介紹到這了,更多相關Python 數據庫表監(jiān)控警告內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • Python使用pymupdf實現PDF加密

    Python使用pymupdf實現PDF加密

    這篇文章主要介紹了如何使用 Python 和 wxPython 庫創(chuàng)建一個簡單的圖形用戶界面(GUI)應用程序,用于對 PDF 文件進行加密,感興趣的小伙伴可以了解下
    2023-08-08
  • selenium切換標簽頁解決get超時問題的完整代碼

    selenium切換標簽頁解決get超時問題的完整代碼

    這篇文章主要給大家介紹了關于selenium切換標簽頁解決get超時問題的相關資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-08-08
  • python開根號實例講解

    python開根號實例講解

    在本篇文章里小編給大家整理的是關于python開根號實例講解內容,有需要的朋友們可以參考下。
    2020-08-08
  • 在Python中字典根據多項規(guī)則排序的方法

    在Python中字典根據多項規(guī)則排序的方法

    今天小編就為大家分享一篇在Python中字典根據多項規(guī)則排序的方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2019-01-01
  • Python實現Windows上氣泡提醒效果的方法

    Python實現Windows上氣泡提醒效果的方法

    這篇文章主要介紹了Python實現Windows上氣泡提醒效果的方法,涉及Python針對windows窗口操作的相關技巧,需要的朋友可以參考下
    2015-06-06
  • pycharm 使用心得(五)斷點調試

    pycharm 使用心得(五)斷點調試

    PyCharm 作為IDE,斷點調試是必須有的功能。否則,我們還真不如用純編輯器寫的快。
    2014-06-06
  • Python 可視化神器Plotly詳解

    Python 可視化神器Plotly詳解

    這篇文章主要介紹了Python 可視化神器Plotly詳解,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-12-12
  • 用Python輸出一個楊輝三角的例子

    用Python輸出一個楊輝三角的例子

    這篇文章主要介紹了用Python和erlang輸出一個楊輝三角的例子,同時還提供了一個erlang版楊輝三角,需要的朋友可以參考下
    2014-06-06
  • OpenCV實現從灰度圖像切出Mask前景區(qū)域

    OpenCV實現從灰度圖像切出Mask前景區(qū)域

    本文主要介紹了如何利用OpenCV實現從灰度圖像,根據閾值,切出多個前景區(qū)域,過濾面積太小的圖像。文中的示例代碼講解詳細,需要的可以參考一下
    2022-06-06
  • Python實現服務端渲染SSR的示例代碼

    Python實現服務端渲染SSR的示例代碼

    服務端渲染是一種常見的技術策略,特別是在需要改善網站的搜索引擎優(yōu)化(SEO)和首屏加載時間的場景下,本文將介紹如何利用?Python?實現?SSR,感興趣的可以了解下
    2024-02-02

最新評論