亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Python使用Redis實現(xiàn)作業(yè)調(diào)度系統(tǒng)(超簡單)

 更新時間:2016年03月22日 10:22:09   作者:kongxx  
Redis作為內(nèi)存數(shù)據(jù)庫的一個典型代表,已經(jīng)在很多應(yīng)用場景中被使用,這里僅就Redis的pub/sub功能來說說怎樣通過此功能來實現(xiàn)一個簡單的作業(yè)調(diào)度系統(tǒng)。這里只是想展現(xiàn)一個簡單的想法,所以還是有很多需要考慮的東西沒有包括在這個例子中,比如錯誤處理,持久化等

概述

Redis是一個開源,先進的key-value存儲,并用于構(gòu)建高性能,可擴展的Web應(yīng)用程序的完美解決方案。

Redis從它的許多競爭繼承來的三個主要特點:

Redis數(shù)據(jù)庫完全在內(nèi)存中,使用磁盤僅用于持久性。

相比許多鍵值數(shù)據(jù)存儲,Redis擁有一套較為豐富的數(shù)據(jù)類型。

Redis可以將數(shù)據(jù)復(fù)制到任意數(shù)量的從服務(wù)器。

Redis 優(yōu)勢

異??焖伲篟edis的速度非常快,每秒能執(zhí)行約11萬集合,每秒約81000+條記錄。

支持豐富的數(shù)據(jù)類型:Redis支持最大多數(shù)開發(fā)人員已經(jīng)知道像列表,集合,有序集合,散列數(shù)據(jù)類型。這使得它非常容易解決各種各樣的問題,因為我們知道哪些問題是可以處理通過它的數(shù)據(jù)類型更好。

操作都是原子性:所有Redis操作是原子的,這保證了如果兩個客戶端同時訪問的Redis服務(wù)器將獲得更新后的值。

多功能實用工具:Redis是一個多實用的工具,可以在多個用例如緩存,消息,隊列使用(Redis原生支持發(fā)布/訂閱),任何短暫的數(shù)據(jù),應(yīng)用程序,如Web應(yīng)用程序會話,網(wǎng)頁命中計數(shù)等。

步入主題:

Redis作為內(nèi)存數(shù)據(jù)庫的一個典型代表,已經(jīng)在很多應(yīng)用場景中被使用,這里僅就Redis的pub/sub功能來說說怎樣通過此功能來實現(xiàn)一個簡單的作業(yè)調(diào)度系統(tǒng)。這里只是想展現(xiàn)一個簡單的想法,所以還是有很多需要考慮的東西沒有包括在這個例子中,比如錯誤處理,持久化等。

下面是實現(xiàn)上的想法

MyMaster:集群的master節(jié)點程序,負責(zé)產(chǎn)生作業(yè),派發(fā)作業(yè)和獲取執(zhí)行結(jié)果。

MySlave:集群的計算節(jié)點程序,每個計算節(jié)點一個,負責(zé)獲取作業(yè)并運行,并將結(jié)果發(fā)送會master節(jié)點。

channel CHANNEL_DISPATCH:每個slave節(jié)點訂閱一個channel,比如“CHANNEL_DISPATCH_[idx或機器名]”,master會向此channel中publish被dispatch的作業(yè)。

channel CHANNEL_RESULT:用來保存作業(yè)結(jié)果的channel,master和slave共享此channel,master訂閱此channel來獲取作業(yè)運行結(jié)果,每個slave負責(zé)將作業(yè)執(zhí)行結(jié)果發(fā)布到此channel中。

Master代碼

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time
import threading
import random
import redis
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_DB = 0
CHANNEL_DISPATCH = 'CHANNEL_DISPATCH'
CHANNEL_RESULT = 'CHANNEL_RESULT'
class MyMaster():
def __init__(self):
pass
def start(self):
MyServerResultHandleThread().start()
MyServerDispatchThread().start()
class MyServerDispatchThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
for i in range(1, 100):
channel = CHANNEL_DISPATCH + '_' + str(random.randint(1, 3))
print("Dispatch job %s to %s" % (str(i), channel))
ret = r.publish(channel, str(i))
if ret == 0:
print("Dispatch job %s failed." % str(i))
time.sleep(5)
class MyServerResultHandleThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
p = r.pubsub()
p.subscribe(CHANNEL_RESULT)
for message in p.listen():
if message['type'] != 'message':
continue
print("Received finished job %s" % message['data'])
if __name__ == "__main__":
MyMaster().start()
time.sleep(10000)

說明

MyMaster類 - master主程序,用來啟動dispatch和resulthandler的線程

MyServerDispatchThread類 - 派發(fā)作業(yè)線程,產(chǎn)生作業(yè)并派發(fā)到計算節(jié)點

MyServerResultHandleThread類 - 作業(yè)運行結(jié)果處理線程,從channel里獲取作業(yè)結(jié)果并顯示

Slave代碼

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from datetime import datetime
import time
import threading
import random
import redis
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_DB = 0
CHANNEL_DISPATCH = 'CHANNEL_DISPATCH'
CHANNEL_RESULT = 'CHANNEL_RESULT'
class MySlave():
def __init__(self):
pass
def start(self):
for i in range(1, 4):
MyJobWorkerThread(CHANNEL_DISPATCH + '_' + str(i)).start()
class MyJobWorkerThread(threading.Thread):
def __init__(self, channel):
threading.Thread.__init__(self)
self.channel = channel
def run(self):
r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
p = r.pubsub()
p.subscribe(self.channel)
for message in p.listen():
if message['type'] != 'message':
continue
print("%s: Received dispatched job %s " % (self.channel, message['data']))
print("%s: Run dispatched job %s " % (self.channel, message['data']))
time.sleep(2)
print("%s: Send finished job %s " % (self.channel, message['data']))
ret = r.publish(CHANNEL_RESULT, message['data'])
if ret == 0:
print("%s: Send finished job %s failed." % (self.channel, message['data']))
if __name__ == "__main__":
MySlave().start()
time.sleep(10000)

說明

MySlave類 - slave節(jié)點主程序,用來啟動MyJobWorkerThread的線程

MyJobWorkerThread類 - 從channel里獲取派發(fā)的作業(yè)并將運行結(jié)果發(fā)送回master

測試

首先運行MySlave來定義派發(fā)作業(yè)channel。

然后運行MyMaster派發(fā)作業(yè)并顯示執(zhí)行結(jié)果。

有關(guān)Python使用Redis實現(xiàn)作業(yè)調(diào)度系統(tǒng)(超簡單),小編就給大家介紹這么多,希望對大家有所幫助!

相關(guān)文章

最新評論