Python定時(shí)從Mysql提取數(shù)據(jù)存入Redis的實(shí)現(xiàn)
設(shè)計(jì)思路:
1.程序一旦run起來,python會(huì)把mysql中最近一段時(shí)間的數(shù)據(jù)全部提取出來
2.然后實(shí)例化redis類,將數(shù)據(jù)簡單解析后逐條傳入redis隊(duì)列
3.定時(shí)器設(shè)計(jì)每天凌晨12點(diǎn)開始跑
ps:redis是個(gè)內(nèi)存數(shù)據(jù)庫,做后臺消息隊(duì)列的緩存時(shí)有很大的用處,有興趣的小伙伴可以去查看相關(guān)的文檔。
# -*- coding:utf-8 -*-
import MySQLdb
import schedule
import time
import datetime
import random
import string
import redis
# get the data from mysql
class FromSql(object):
def __init__(self, conn):
self.conn = conn
def acquire(self):
cursor = self.conn.cursor()
try:
sql = "SELECT * FROM test WHERE TO_DAYS(NOW()) - TO_DAYS(t) <= 1"
cursor.execute(sql)
rs = cursor.fetchall()
#print (rs)
for eve in rs:
print('%s, %s, %s, %s' % eve)
copy_rs = rs
cursor.close()
return copy_rs
except Exception as e:
print("The error: %s" % e)
class RedisQueue(object):
def __init__(self, name, namespace='queue', **redis_kwargs):
"""The default connection parameters are: host='localhost', port=6379, db=0"""
self.__db= redis.Redis(**redis_kwargs)
self.key = '%s:%s' %(namespace, name)
def qsize(self):
return self.__db.llen(self.key)
def put(self, item):
self.__db.rpush(self.key, item)
def get(self, block=True, timeout=None):
if block:
item = self.__db.blpop(self.key, timeout=timeout)
else:
item = self.__db.lpop(self.key)
if item:
item = item[1]
return item
def get_nowait(self):
return self.get(False)
if __name__ == "__main__":
# connect mysqldb
conn_sql = MySQLdb.connect(
host = '127.0.0.1',
port = 3306,
user = 'root',
passwd = '',
db = 'test',
charset = 'utf8'
)
def job_for_redis():
get_data = FromSql(conn_sql)
data = get_data.acquire()
q = RedisQueue('test',host='localhost', port=6379, db=0)
for single_data in data:
for meta_data in single_data:
q.put(meta_data)
print(meta_data)
print("All data had been inserted.")
"""
try:
schedule.every().day.at("00:00").do(job_for_redis)
except Exception as e:
print('Error: %s'% e)
# finally:
# conn.close()
while True:
schedule.run_pending()
time.sleep(1)
"""
補(bǔ)充知識:python定時(shí)獲取匯率存入數(shù)據(jù)庫
python定時(shí)任務(wù):
我們可以使用 輕量級的第三方模塊schedule。首先先安裝:pip install schedule
定時(shí)任務(wù)的的小測試:
import schedule
import time
def job():
print("I'm working...")
schedule.every(10).minutes.do(job) # 每隔10分鐘執(zhí)行一次任務(wù)
schedule.every().hour.do(job) # 每隔一小時(shí)執(zhí)行一次任務(wù)
schedule.every().day.at("10:30").do(job) # 每天10:30執(zhí)行一次任務(wù)
schedule.every(5).to(10).days.do(job) # 每5-10天執(zhí)行一次任務(wù)
schedule.every().monday.do(job) # 每周一的這個(gè)時(shí)候執(zhí)行一次任務(wù)
schedule.every().wednesday.at("13:15").do(job) # 每周三13:15執(zhí)行一次任務(wù)
while True:
schedule.run_pending()
獲取數(shù)據(jù)存入數(shù)據(jù)庫:(格式可能不太對,還有一些符號。自己修改一下即可)
import pymysql
import schedule
import time
import requests
import pandas
from sqlalchemy import create_engine
#獲取美元的所有外匯
def job():
content = '美元'
url = 'http://www.boc.cn/sourcedb/whpj/index.html' #外匯數(shù)據(jù)地址
html = requests.get(url).content.decode('utf-8')
index = html.index('<td>' + content + '</td>')
str = html[index:index+300]
result = re.findall('<td>(.*?)</td>',str)
print("幣種:" + result[0])
print("現(xiàn)匯買入價(jià):" + result[1])
print("現(xiàn)鈔買入價(jià):" + result[2])
print("現(xiàn)匯賣出價(jià):" + result[3])
print("現(xiàn)鈔賣出價(jià):" + result[4])
print("中行結(jié)算價(jià):" + result[5])
print("發(fā)布時(shí)間:" + result[6] + ' ' + result[7])
#本地地址 數(shù)據(jù)庫賬號 密碼 數(shù)據(jù)庫名
db = pymysql.connect('localhost','root','root','pinyougoudb')
cursor = db.cursor()
#sql語句
sql = "update tb_money set huiBuy = %s,chaoBuy = %s,huiSale = %s,chaoSale = %s,centerResult= %s,publishTime = '%s' where typeId = '%s'" % (result[1], result[2], result[3], result[4], result[5], result[6] + ' ' + result[7], result[0])
cursor.execute(sql)
db.commit()
print('success')
# 查詢語句,將存入的數(shù)據(jù)查出來
# sqlalchemy 進(jìn)行數(shù)據(jù)庫初始化
engine = create_engine('mysql+pymysql://root:root@localhost:3306/pinyougoudb')
sql = '''select * from tb_money'''
# pandas 進(jìn)行數(shù)據(jù)庫讀寫
df = pandas.read_sql_query(sql,engine)
print(df)
db.commit()
# 每隔幾分中刷新一次
#schedule.every(0.1).minutes.do(job)
#每天什么時(shí)候刷新
schedule.every().day.at("09:29").do(job)
schedule.every().day.at("09:30").do(job)
#一直循環(huán) 知道滿足條件執(zhí)行
while True:
schedule.run_pending()
以上這篇Python定時(shí)從Mysql提取數(shù)據(jù)存入Redis的實(shí)現(xiàn)就是小編分享給大家的全部內(nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
- Redis定時(shí)監(jiān)控與數(shù)據(jù)處理的實(shí)踐指南
- Redis拓展之定時(shí)消息通知實(shí)現(xiàn)詳解
- Redis?延時(shí)任務(wù)實(shí)現(xiàn)及與定時(shí)任務(wù)區(qū)別詳解
- Spring boot詳解緩存redis實(shí)現(xiàn)定時(shí)過期方法
- Redis定時(shí)任務(wù)原理的實(shí)現(xiàn)
- Spring Boot監(jiān)聽Redis Key失效事件實(shí)現(xiàn)定時(shí)任務(wù)的示例
- 基于redis實(shí)現(xiàn)定時(shí)任務(wù)的方法詳解
- Springboot使用Redis實(shí)現(xiàn)定時(shí)任務(wù)的三種方式
相關(guān)文章
使用python爬蟲實(shí)現(xiàn)抓取動(dòng)態(tài)加載數(shù)據(jù)
這篇文章主要給大家介紹了如何用python爬蟲抓取豆瓣電影“分類排行榜”中的電影數(shù)據(jù),比如輸入“犯罪”則會(huì)輸出所有犯罪影片的電影名稱、評分,文中通過代碼示例和圖文介紹的非常詳細(xì),需要的朋友可以參考下2024-01-01
Python獲取當(dāng)前函數(shù)名稱方法實(shí)例分享
這篇文章主要介紹了Python獲取當(dāng)前函數(shù)名稱方法實(shí)例分享,具有一定借鑒價(jià)值2018-01-01
Python一行代碼對話ChatGPT實(shí)現(xiàn)詳解
這篇文章主要為大家介紹了Python一行代碼對話ChatGPT實(shí)現(xiàn)詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-03-03
Python的speech_recognition庫如何將聲音轉(zhuǎn)為文字
這篇文章主要介紹了通過Python的speech_recognition庫將聲音轉(zhuǎn)為文字,將聲音轉(zhuǎn)為文字,除了speech_recognition庫,還要依賴pyaudio庫,而且mac用戶需要安裝PortAudio,本文給大家介紹的非常詳細(xì),需要的朋友可以參考下2023-05-05
Python?matplotlib?繪制散點(diǎn)圖詳解建議收藏
在數(shù)據(jù)統(tǒng)計(jì)圖表中,有一種圖表是散列點(diǎn)分布在坐標(biāo)中,反應(yīng)數(shù)據(jù)隨著自變量變化的趨勢。這篇文章主要介紹了如何通過matplotlib繪制散點(diǎn)圖,需要的朋友可以參考一下2021-12-12
Python自動(dòng)化測試ConfigParser模塊讀寫配置文件
本文主要介紹Python自動(dòng)化測試,這里詳細(xì)說明了ConfigParser模塊讀寫配置文件,有興趣的小伙伴可以參考下2016-08-08
Python中線程的MQ消息隊(duì)列實(shí)現(xiàn)以及消息隊(duì)列的優(yōu)點(diǎn)解析
消息隊(duì)列(MQ,Message Queue)在消息數(shù)據(jù)傳輸中的保存作用為數(shù)據(jù)通信提供了保障和實(shí)時(shí)處理上的便利,這里我們就來看一下Python中線程的MQ消息隊(duì)列實(shí)現(xiàn)以及消息隊(duì)列的優(yōu)點(diǎn)解析2016-06-06

