基于redis樂觀鎖實現(xiàn)并發(fā)排隊
有個需求場景是這樣的,使用redis控制scrapy運行的數(shù)量。當系統(tǒng)的后臺設置為4時,只允許scapry啟動4個任務,多余的任務則進行排隊。
概況
最近做了一個django + scrapy + celery + redis 的爬蟲系統(tǒng),客戶購買的主機除了跑其他程序外,還要跑我開發(fā)的這套程序,所以需要手動控制scrapy的實例數(shù)量,避免過多的爬蟲給系統(tǒng)造成負擔。
流程設計
1、爬蟲任務由用戶以請求的方式發(fā)起,所有的用戶的請求統(tǒng)一進入到celery進行排隊;
2、任務數(shù)量控制的執(zhí)行就交給reids,經(jīng)由celery保存到redis,包含了爬蟲啟動所需要的必要信息,從redis取一條信息即可啟動一個爬蟲;
3、通過scrapyd的接口來獲取當前在運行的爬蟲數(shù)量,以便決定下一步流程:如果小于4,則從redis中取相應數(shù)量的信息來啟動爬蟲,如果大于等于4,則繼續(xù)等待;
4、如果在運行爬蟲的數(shù)量有所減少,則及時從reids中取相應數(shù)量的信息來啟動爬蟲。
代碼實現(xiàn)
業(yè)務代碼有點復雜和啰嗦,此處使用偽代碼來演示
import redis # 實例化一個redis連接池 pool = redis.ConnectionPool(host='127.0.0.1', port=6379, decode_responses=True, db=4, password='') r = redis.Redis(connection_pool=pool) # 爬蟲實例限制為4 即只允許4個scrapy實例在運行 limited = 4 # 聲明redis的樂觀鎖 lock = r.Lock() # lock.acquire中有while循環(huán),即它會線程阻塞,直到當前線程獲得redis的lock,才會繼續(xù)往下執(zhí)行代碼 if lock.acquire(): # 1、從reids中取一條爬蟲信息 info = redis.get() # 2、while循環(huán)監(jiān)聽爬蟲運行的數(shù)量 while True: req = requests.get('http://127.0.0.1:6800/daemonstatus.json').json() # 統(tǒng)計當前有多少個爬蟲在運行 running = req.get('running') + req.get('pending') # 3、判斷是否等待還是要增加爬蟲數(shù)量 # 3.1 如果在運行的數(shù)量大于等于設置到量 則繼續(xù)等待 if running >= limited: continue # 3.2 如果小于 則啟動爬蟲 start_scrapy(info) # 3.3 將info從redis中刪除 redis.delete(info) # 3.4 釋放鎖 lock.release() break
當前,這只是偽代碼而已,實際的業(yè)務邏輯可能是非常復雜的,如:
@shared_task def scrapy_control(key_uuid): r = redis.Redis(connection_pool=pool) db = MysqlDB() speed_limited = db.fetch_config('REPTILE_SPEED') speed_limited = int(speed_limited[0]) keywords_num = MysqlDB().fetch_config('SEARCH_RANDOM') keywords_num = int(keywords_num[0]) # while True: lock = r.lock('lock') with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 進入處理環(huán)節(jié)' + '\n') try: # acquire默認阻塞 如果獲取不到鎖時 會一直阻塞在這個函數(shù)的while循環(huán)中 if lock.acquire(): with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 獲得鎖' + '\n') # 1 從redis中獲取信息 redis_obj = json.loads(r.get(key_uuid)) user_id = redis_obj.get('user_id') contents = redis_obj.get('contents') # 2 使用while循環(huán)處理核心邏輯 is_hold_print = True while True: req = requests.get('http://127.0.0.1:6800/daemonstatus.json').json() running = req.get('running') + req.get('pending') # 3 如果仍然有足夠的爬蟲在運行 則hold住redis鎖,等待有空余的爬蟲位置讓出 if running >= speed_limited: if is_hold_print: with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 爬蟲在運行,線程等待中' + '\n') is_hold_print = False time.sleep(1) continue # 4 有空余的爬蟲位置 則往下走 # 4.1 處理完所有的內(nèi)容后 釋放鎖 if len(contents) == 0: r.delete(key_uuid) with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 任務已完成,從redis中刪除' + '\n') lock.release() with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 釋放鎖' + '\n') break # 4.2 創(chuàng)建task任務 task_uuid = str(uuid.uuid4()) article_obj = contents.pop() article_id = article_obj.get('article_id') article = article_obj.get('content') try: Task.objects.create( task_uuid = task_uuid, user_id = user_id, article_id = article_id, content = article ) except Exception as e: with open('log/celery/error.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + '->' + str(task_uuid) + ' 創(chuàng)建Task出錯: ' + str(e) + '\n') # finally: # 4.3 啟動爬蟲任務 即便創(chuàng)建task失敗也會啟動 try: task_chain(user_id, article, task_uuid, keywords_num) except Exception as e: with open('log/celery/error.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 啟動任務鏈失敗: ' + str(e) + '\n') # 加入sleep 防止代碼執(zhí)行速度快于爬蟲啟動速度而導致當前線程啟動額外的爬蟲 time.sleep(5) except Exception as e: with open('log/celery/error.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 獲得鎖之后的操作出錯: ' + str(e) + '\n') lock.release()
小坑
scrapy啟動速度相對較慢,所以while循環(huán)中,代碼中執(zhí)行到了爬蟲的啟動,需要sleep一下再去通過scrapyd接口獲取爬蟲運行的數(shù)量,如果立刻讀取,可能會造成誤判。
到此這篇關于基于redis樂觀鎖實現(xiàn)并發(fā)排隊的文章就介紹到這了,更多相關基于redis樂觀鎖實現(xiàn)并發(fā)排隊內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!