python基于mysql實(shí)現(xiàn)的簡(jiǎn)單隊(duì)列以及跨進(jìn)程鎖實(shí)例詳解
通常在我們進(jìn)行多進(jìn)程應(yīng)用開(kāi)發(fā)的過(guò)程中,不可避免的會(huì)遇到多個(gè)進(jìn)程訪問(wèn)同一個(gè)資源(臨界資源)的狀況,這時(shí)候必須通過(guò)加一個(gè)全局性的鎖,來(lái)實(shí)現(xiàn)資源的同步訪問(wèn)(即:同一時(shí)間里只能有一個(gè)進(jìn)程訪問(wèn)資源)。
舉個(gè)例子如下:
假設(shè)我們用mysql來(lái)實(shí)現(xiàn)一個(gè)任務(wù)隊(duì)列,實(shí)現(xiàn)的過(guò)程如下:
1. 在Mysql中創(chuàng)建Job表,用于儲(chǔ)存隊(duì)列任務(wù),如下:
create table jobs( id auto_increment not null primary key, message text not null, job_status not null default 0 );
message 用來(lái)存儲(chǔ)任務(wù)信息,job_status用來(lái)標(biāo)識(shí)任務(wù)狀態(tài),假設(shè)只有兩種狀態(tài),0:在隊(duì)列中, 1:已出隊(duì)列
2. 有一個(gè)生產(chǎn)者進(jìn)程,往job表中放新的數(shù)據(jù),進(jìn)行排隊(duì):
insert into jobs(message) values('msg1');
3.假設(shè)有多個(gè)消費(fèi)者進(jìn)程,從job表中取排隊(duì)信息,要做的操作如下:
select * from jobs where job_status=0 order by id asc limit 1; update jobs set job_status=1 where id = ?; -- id為剛剛?cè)〉玫挠涗沬d
4. 如果沒(méi)有跨進(jìn)程的鎖,兩個(gè)消費(fèi)者進(jìn)程有可能同時(shí)取到重復(fù)的消息,導(dǎo)致一個(gè)消息被消費(fèi)多次。這種情況是我們不希望看到的,于是,我們需要實(shí)現(xiàn)一個(gè)跨進(jìn)程的鎖。
=========================分割線=======================================
說(shuō)到跨進(jìn)程的鎖實(shí)現(xiàn),我們主要有幾種實(shí)現(xiàn)方式:
(1)信號(hào)量
(2)文件鎖fcntl
(3)socket(端口號(hào)綁定)
(4)signal
這幾種方式各有利弊,總體來(lái)說(shuō)前2種方式可能多一點(diǎn),這里我就不詳細(xì)說(shuō)了,大家可以去查閱資料。
查資料的時(shí)候發(fā)現(xiàn)mysql中有鎖的實(shí)現(xiàn),適用于對(duì)于性能要求不是很高的應(yīng)用場(chǎng)景,大并發(fā)的分布式訪問(wèn)可能會(huì)有瓶頸.
對(duì)此用python實(shí)現(xiàn)了一個(gè)demo,如下:
文件名:glock.py
#!/usr/bin/env python2.7 # # -*- coding:utf-8 -*- # # Desc : # import logging, time import MySQLdb class Glock: def __init__(self, db): self.db = db def _execute(self, sql): cursor = self.db.cursor() try: ret = None cursor.execute(sql) if cursor.rowcount != 1: logging.error("Multiple rows returned in mysql lock function.") ret = None else: ret = cursor.fetchone() cursor.close() return ret except Exception, ex: logging.error("Execute sql \"%s\" failed! Exception: %s", sql, str(ex)) cursor.close() return None def lock(self, lockstr, timeout): sql = "SELECT GET_LOCK('%s', %s)" % (lockstr, timeout) ret = self._execute(sql) if ret[0] == 0: logging.debug("Another client has previously locked '%s'.", lockstr) return False elif ret[0] == 1: logging.debug("The lock '%s' was obtained successfully.", lockstr) return True else: logging.error("Error occurred!") return None def unlock(self, lockstr): sql = "SELECT RELEASE_LOCK('%s')" % (lockstr) ret = self._execute(sql) if ret[0] == 0: logging.debug("The lock '%s' the lock is not released(the lock was not established by this thread).", lockstr) return False elif ret[0] == 1: logging.debug("The lock '%s' the lock was released.", lockstr) return True else: logging.error("The lock '%s' did not exist.", lockstr) return None #Init logging def init_logging(): sh = logging.StreamHandler() logger = logging.getLogger() logger.setLevel(logging.DEBUG) formatter = logging.Formatter('%(asctime)s -%(module)s:%(filename)s-L%(lineno)d-%(levelname)s: %(message)s') sh.setFormatter(formatter) logger.addHandler(sh) logging.info("Current log level is : %s",logging.getLevelName(logger.getEffectiveLevel())) def main(): init_logging() db = MySQLdb.connect(host='localhost', user='root', passwd='') lock_name = 'queue' l = Glock(db) ret = l.lock(lock_name, 10) if ret != True: logging.error("Can't get lock! exit!") quit() time.sleep(10) logging.info("You can do some synchronization work across processes!") ##TODO ## you can do something in here ## l.unlock(lock_name) if __name__ == "__main__": main()
在main函數(shù)里:
l.lock(lock_name, 10) 中,10是表示timeout的時(shí)間是10秒,如果10秒還獲取不了鎖,就返回,執(zhí)行后面的操作。
在這個(gè)demo中,在標(biāo)記TODO的地方,可以將消費(fèi)者從job表中取消息的邏輯放在這里。即分割線以上的.
2.假設(shè)有多個(gè)消費(fèi)者進(jìn)程,從job表中取排隊(duì)信息,要做的操作如下:
select * from jobs where job_status=0 order by id asc limit 1; update jobs set job_status=1 where id = ?; -- id為剛剛?cè)〉玫挠涗沬d
這樣,就能保證多個(gè)進(jìn)程訪問(wèn)臨界資源時(shí)同步進(jìn)行了,保證數(shù)據(jù)的一致性。
測(cè)試的時(shí)候,啟動(dòng)兩個(gè)glock.py, 結(jié)果如下:
[@tj-10-47 test]# ./glock.py 2014-03-14 17:08:40,277 -glock:glock.py-L70-INFO: Current log level is : DEBUG 2014-03-14 17:08:40,299 -glock:glock.py-L43-DEBUG: The lock 'queue' was obtained successfully. 2014-03-14 17:08:50,299 -glock:glock.py-L81-INFO: You can do some synchronization work across processes! 2014-03-14 17:08:50,299 -glock:glock.py-L56-DEBUG: The lock 'queue' the lock was released.
可以看到第一個(gè)glock.py是 17:08:50解鎖的,下面的glock.py是在17:08:50獲取鎖的,可以證實(shí)這樣是完全可行的。
[@tj-10-47 test]# ./glock.py 2014-03-14 17:08:46,873 -glock:glock.py-L70-INFO: Current log level is : DEBUG 2014-03-14 17:08:50,299 -glock:glock.py-L43-DEBUG: The lock 'queue' was obtained successfully. 2014-03-14 17:09:00,299 -glock:glock.py-L81-INFO: You can do some synchronization work across processes! 2014-03-14 17:09:00,300 -glock:glock.py-L56-DEBUG: The lock 'queue' the lock was released. [@tj-10-47 test]#
相關(guān)文章
深入解析NumPy中的Broadcasting廣播機(jī)制
在吳恩達(dá)老師的深度學(xué)習(xí)專項(xiàng)課程中,老師有提到NumPy中的廣播機(jī)制,同時(shí)那一周的測(cè)驗(yàn)也有涉及到廣播機(jī)制的題目。那么,到底什么是NumPy中的廣播機(jī)制?本文就來(lái)介紹一下2021-05-05使用pandas庫(kù)對(duì)csv文件進(jìn)行篩選保存
這篇文章主要介紹了使用pandas庫(kù)對(duì)csv文件進(jìn)行篩選保存,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-05-05Python實(shí)現(xiàn)的繪制三維雙螺旋線圖形功能示例
這篇文章主要介紹了Python實(shí)現(xiàn)的繪制三維雙螺旋線圖形功能,結(jié)合實(shí)例形式分析了Python使用matplotlib、numpy模塊進(jìn)行數(shù)值運(yùn)算及圖形繪制相關(guān)操作技巧,需要的朋友可以參考下2018-06-06Python?程序通過(guò)可執(zhí)行文件部署方式
這篇文章主要介紹了Python?程序通過(guò)可執(zhí)行文件部署方式,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-04-04卸載tensorflow-cpu重裝tensorflow-gpu操作
這篇文章主要介紹了卸載tensorflow-cpu重裝tensorflow-gpu操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-06-06Python?pandas的八個(gè)生命周期總結(jié)
這篇文章主要從八個(gè)pandas的數(shù)據(jù)處理生命周期,整理匯總出pandas框架在整個(gè)數(shù)據(jù)處理過(guò)程中都是如何處理數(shù)據(jù)的,感興趣的小伙伴可以了解一下2022-10-10Python利用Selenium實(shí)現(xiàn)自動(dòng)觀看學(xué)習(xí)通視頻
Selenium是一個(gè)用于Web應(yīng)用程序測(cè)試的工具。Selenium測(cè)試直接運(yùn)行在瀏覽器中,就像真正的用戶在操作一樣。本文主要介紹了利用Selenium實(shí)現(xiàn)自動(dòng)觀看學(xué)習(xí)通視頻,需要的同學(xué)可以參考一下2021-12-12