Python使用Redis實現(xiàn)作業(yè)調(diào)度系統(tǒng)(超簡單)
概述
Redis是一個開源,先進的key-value存儲,并用于構(gòu)建高性能,可擴展的Web應(yīng)用程序的完美解決方案。
Redis從它的許多競爭繼承來的三個主要特點:
Redis數(shù)據(jù)庫完全在內(nèi)存中,使用磁盤僅用于持久性。
相比許多鍵值數(shù)據(jù)存儲,Redis擁有一套較為豐富的數(shù)據(jù)類型。
Redis可以將數(shù)據(jù)復(fù)制到任意數(shù)量的從服務(wù)器。
Redis 優(yōu)勢
異??焖伲篟edis的速度非常快,每秒能執(zhí)行約11萬集合,每秒約81000+條記錄。
支持豐富的數(shù)據(jù)類型:Redis支持最大多數(shù)開發(fā)人員已經(jīng)知道像列表,集合,有序集合,散列數(shù)據(jù)類型。這使得它非常容易解決各種各樣的問題,因為我們知道哪些問題是可以處理通過它的數(shù)據(jù)類型更好。
操作都是原子性:所有Redis操作是原子的,這保證了如果兩個客戶端同時訪問的Redis服務(wù)器將獲得更新后的值。
多功能實用工具:Redis是一個多實用的工具,可以在多個用例如緩存,消息,隊列使用(Redis原生支持發(fā)布/訂閱),任何短暫的數(shù)據(jù),應(yīng)用程序,如Web應(yīng)用程序會話,網(wǎng)頁命中計數(shù)等。
步入主題:
Redis作為內(nèi)存數(shù)據(jù)庫的一個典型代表,已經(jīng)在很多應(yīng)用場景中被使用,這里僅就Redis的pub/sub功能來說說怎樣通過此功能來實現(xiàn)一個簡單的作業(yè)調(diào)度系統(tǒng)。這里只是想展現(xiàn)一個簡單的想法,所以還是有很多需要考慮的東西沒有包括在這個例子中,比如錯誤處理,持久化等。
下面是實現(xiàn)上的想法
MyMaster:集群的master節(jié)點程序,負責(zé)產(chǎn)生作業(yè),派發(fā)作業(yè)和獲取執(zhí)行結(jié)果。
MySlave:集群的計算節(jié)點程序,每個計算節(jié)點一個,負責(zé)獲取作業(yè)并運行,并將結(jié)果發(fā)送會master節(jié)點。
channel CHANNEL_DISPATCH:每個slave節(jié)點訂閱一個channel,比如“CHANNEL_DISPATCH_[idx或機器名]”,master會向此channel中publish被dispatch的作業(yè)。
channel CHANNEL_RESULT:用來保存作業(yè)結(jié)果的channel,master和slave共享此channel,master訂閱此channel來獲取作業(yè)運行結(jié)果,每個slave負責(zé)將作業(yè)執(zhí)行結(jié)果發(fā)布到此channel中。
Master代碼
#!/usr/bin/env python # -*- coding: utf-8 -*- import time import threading import random import redis REDIS_HOST = 'localhost' REDIS_PORT = 6379 REDIS_DB = 0 CHANNEL_DISPATCH = 'CHANNEL_DISPATCH' CHANNEL_RESULT = 'CHANNEL_RESULT' class MyMaster(): def __init__(self): pass def start(self): MyServerResultHandleThread().start() MyServerDispatchThread().start() class MyServerDispatchThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB) for i in range(1, 100): channel = CHANNEL_DISPATCH + '_' + str(random.randint(1, 3)) print("Dispatch job %s to %s" % (str(i), channel)) ret = r.publish(channel, str(i)) if ret == 0: print("Dispatch job %s failed." % str(i)) time.sleep(5) class MyServerResultHandleThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB) p = r.pubsub() p.subscribe(CHANNEL_RESULT) for message in p.listen(): if message['type'] != 'message': continue print("Received finished job %s" % message['data']) if __name__ == "__main__": MyMaster().start() time.sleep(10000)
說明
MyMaster類 - master主程序,用來啟動dispatch和resulthandler的線程
MyServerDispatchThread類 - 派發(fā)作業(yè)線程,產(chǎn)生作業(yè)并派發(fā)到計算節(jié)點
MyServerResultHandleThread類 - 作業(yè)運行結(jié)果處理線程,從channel里獲取作業(yè)結(jié)果并顯示
Slave代碼
#!/usr/bin/env python # -*- coding: utf-8 -*- from datetime import datetime import time import threading import random import redis REDIS_HOST = 'localhost' REDIS_PORT = 6379 REDIS_DB = 0 CHANNEL_DISPATCH = 'CHANNEL_DISPATCH' CHANNEL_RESULT = 'CHANNEL_RESULT' class MySlave(): def __init__(self): pass def start(self): for i in range(1, 4): MyJobWorkerThread(CHANNEL_DISPATCH + '_' + str(i)).start() class MyJobWorkerThread(threading.Thread): def __init__(self, channel): threading.Thread.__init__(self) self.channel = channel def run(self): r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB) p = r.pubsub() p.subscribe(self.channel) for message in p.listen(): if message['type'] != 'message': continue print("%s: Received dispatched job %s " % (self.channel, message['data'])) print("%s: Run dispatched job %s " % (self.channel, message['data'])) time.sleep(2) print("%s: Send finished job %s " % (self.channel, message['data'])) ret = r.publish(CHANNEL_RESULT, message['data']) if ret == 0: print("%s: Send finished job %s failed." % (self.channel, message['data'])) if __name__ == "__main__": MySlave().start() time.sleep(10000)
說明
MySlave類 - slave節(jié)點主程序,用來啟動MyJobWorkerThread的線程
MyJobWorkerThread類 - 從channel里獲取派發(fā)的作業(yè)并將運行結(jié)果發(fā)送回master
測試
首先運行MySlave來定義派發(fā)作業(yè)channel。
然后運行MyMaster派發(fā)作業(yè)并顯示執(zhí)行結(jié)果。
有關(guān)Python使用Redis實現(xiàn)作業(yè)調(diào)度系統(tǒng)(超簡單),小編就給大家介紹這么多,希望對大家有所幫助!
- Python讀寫Redis數(shù)據(jù)庫操作示例
- Python與Redis的連接教程
- Python的Flask框架使用Redis做數(shù)據(jù)緩存的配置方法
- python操作redis的方法
- python連接MySQL、MongoDB、Redis、memcache等數(shù)據(jù)庫的方法
- Python操作Redis之設(shè)置key的過期時間實例代碼
- Python獲取Redis所有Key以及內(nèi)容的方法
- python操作redis方法總結(jié)
- python安裝與使用redis的方法
- python中redis的安裝和使用
- Python連接Redis的基本配置方法
- python實現(xiàn)與redis交互操作詳解
相關(guān)文章
python實現(xiàn)隨機調(diào)用一個瀏覽器打開網(wǎng)頁
下面小編就為大家分享一篇python實現(xiàn)隨機調(diào)用一個瀏覽器打開網(wǎng)頁,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-04-04Pandas的read_csv函數(shù)參數(shù)分析詳解
這篇文章主要介紹了Pandas的read_csv函數(shù)參數(shù)分析詳解,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-07-07Python實現(xiàn)常見數(shù)據(jù)格式轉(zhuǎn)換的方法詳解
這篇文章主要為大家詳細介紹了Python實現(xiàn)常見數(shù)據(jù)格式轉(zhuǎn)換的方法,主要是xml_to_csv和csv_to_tfrecord,感興趣的小伙伴可以了解一下2022-09-09Python中免驗證跳轉(zhuǎn)到內(nèi)容頁的實例代碼
在本篇文章里小編給大家整理的是一篇關(guān)于Python中免驗證跳轉(zhuǎn)到內(nèi)容頁的實例代碼,有興趣的朋友們可以學(xué)習(xí)分享下。2020-10-10pandas apply 函數(shù) 實現(xiàn)多進程的示例講解
下面小編就為大家分享一篇pandas apply 函數(shù) 實現(xiàn)多進程的示例講解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-04-04