亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Python大數(shù)據(jù)量文本文件高效解析方案代碼實(shí)現(xiàn)全過(guò)程

 更新時(shí)間:2022年12月19日 07:27:15   作者:授客  
在數(shù)據(jù)分析中,有時(shí)數(shù)據(jù)源會(huì)是超大的文本文件(幾G,或在幾十G),需要從中提取需要的信息,下面這篇文章主要給大家介紹了關(guān)于Python大數(shù)據(jù)量文本文件高效解析方案代碼實(shí)現(xiàn)的相關(guān)資料,需要的朋友可以參考下

測(cè)試環(huán)境

Python 3.6.2

Win 10 內(nèi)存 8G,CPU I5 1.6 GHz

背景描述

這個(gè)作品來(lái)源于一個(gè)日志解析工具的開(kāi)發(fā),這個(gè)開(kāi)發(fā)過(guò)程中遇到的一個(gè)痛點(diǎn),就是日志文件多,日志數(shù)據(jù)量大,解析耗時(shí)長(zhǎng)。在這種情況下,尋思一種高效解析數(shù)據(jù)解析方案。

解決方案描述

1、采用多線程讀取文件

2、采用按塊讀取文件替代按行讀取文件

由于日志文件都是文本文件,需要讀取其中每一行進(jìn)行解析,所以一開(kāi)始會(huì)很自然想到采用按行讀取,后面發(fā)現(xiàn)合理配置下,按塊讀取,會(huì)比按行讀取更高效。

按塊讀取來(lái)的問(wèn)題就是,可能導(dǎo)致完整的數(shù)據(jù)行分散在不同數(shù)據(jù)塊中,那怎么解決這個(gè)問(wèn)題呢?解答如下:

將數(shù)據(jù)塊按換行符\n切分得到日志行列表,列表第一個(gè)元素可能是一個(gè)完整的日志行,也可能是上一個(gè)數(shù)據(jù)塊末尾日志行的組成部分,列表最后一個(gè)元素可能是不完整的日志行(即下一個(gè)數(shù)據(jù)塊開(kāi)頭日志行的組成部分),也可能是空字符串(日志塊中的日志行數(shù)據(jù)全部是完整的),根據(jù)這個(gè)規(guī)律,得出以下公式,通過(guò)該公式,可以得到一個(gè)新的數(shù)據(jù)塊,對(duì)該數(shù)據(jù)塊二次切分,可以得到數(shù)據(jù)完整的日志行

上一個(gè)日志塊首部日志行 +\n + 尾部日志行 + 下一個(gè)數(shù)據(jù)塊首部日志行 + \n + 尾部日志行 + ...

3、將數(shù)據(jù)解析操作拆分為可并行解析部分和不可并行解析部分

數(shù)據(jù)解析往往涉及一些不可并行的操作,比如數(shù)據(jù)求和,最值統(tǒng)計(jì)等,如果不進(jìn)行拆分,并行解析時(shí)勢(shì)必需要添加互斥鎖,避免數(shù)據(jù)覆蓋,這樣就會(huì)大大降低執(zhí)行的效率,特別是不可并行操作占比較大的情況下。

對(duì)數(shù)據(jù)解析操作進(jìn)行拆分后,可并行解析操作部分不用加鎖。考慮到Python GIL的問(wèn)題,不可并行解析部分替換為單進(jìn)程解析。

4、采用多進(jìn)程解析替代多線程解析

采用多進(jìn)程解析替代多線程解析,可以避開(kāi)Python GIL全局解釋鎖帶來(lái)的執(zhí)行效率問(wèn)題,從而提高解析效率。

5、采用隊(duì)列實(shí)現(xiàn)“協(xié)同”效果

引入隊(duì)列機(jī)制,實(shí)現(xiàn)一邊讀取日志,一邊進(jìn)行數(shù)據(jù)解析:

  • 日志讀取線程將日志塊存儲(chǔ)到隊(duì)列,解析進(jìn)程從隊(duì)列獲取已讀取日志塊,執(zhí)行可并行解析操作
  • 并行解析操作進(jìn)程將解析后的結(jié)果存儲(chǔ)到另一個(gè)隊(duì)列,另一個(gè)解析進(jìn)程從隊(duì)列獲取數(shù)據(jù),執(zhí)行不可并行解析操作。

代碼實(shí)現(xiàn)

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import re
import time
from datetime import datetime
from joblib import Parallel, delayed, parallel_backend
from collections import deque
from multiprocessing import cpu_count
import threading


class LogParser(object):
    def __init__(self, chunk_size=1024*1024*10, process_num_for_log_parsing=cpu_count()):
        self.log_unparsed_queue = deque() # 用于存儲(chǔ)未解析日志
        self.log_line_parsed_queue = deque()  # 用于存儲(chǔ)已解析日志行
        self.is_all_files_read = False  # 標(biāo)識(shí)是否已讀取所有日志文件
        self.process_num_for_log_parsing = process_num_for_log_parsing # 并發(fā)解析日志文件進(jìn)程數(shù)
        self.chunk_size = chunk_size # 每次讀取日志的日志塊大小
        self.files_read_list = [] # 存放已讀取日志文件
        self.log_parsing_finished = False # 標(biāo)識(shí)是否完成日志解析


    def read_in_chunks(self, filePath, chunk_size=1024*1024):
        """
        惰性函數(shù)(生成器),用于逐塊讀取文件。
        默認(rèn)區(qū)塊大小:1M
        """

        with open(filePath, 'r', encoding='utf-8') as f:            
            while True:
                chunk_data = f.read(chunk_size)
                if not chunk_data:
                    break
                yield chunk_data


    def read_log_file(self, logfile_path):
        '''
        讀取日志文件
        這里假設(shè)日志文件都是文本文件,按塊讀取后,可按換行符進(jìn)行二次切分,以便獲取行日志
        '''

        temp_list = []  # 二次切分后,頭,尾行日志可能是不完整的,所以需要將日志塊頭尾行日志相連接,進(jìn)行拼接
        for chunk in self.read_in_chunks(logfile_path, self.chunk_size):
            log_chunk = chunk.split('\n')
            temp_list.extend([log_chunk[0], '\n'])
            temp_list.append(log_chunk[-1])
            self.log_unparsed_queue.append(log_chunk[1:-1])
        self.log_unparsed_queue.append(''.join(temp_list).split('\n'))
        self.files_read_list.remove(logfile_path)


    def start_processes_for_log_parsing(self):
        '''啟動(dòng)日志解析進(jìn)程'''

        with parallel_backend("multiprocessing", n_jobs=self.process_num_for_log_parsing):
            Parallel(require='sharedmem')(delayed(self.parse_logs)() for i in range(self.process_num_for_log_parsing))

        self.log_parsing_finished = True

    def parse_logs(self):
        '''解析日志'''

        method_url_re_pattern = re.compile('(HEAD|POST|GET)\s+([^\s]+?)\s+',re.DOTALL)
        url_time_taken_extractor = re.compile('HTTP/1\.1.+\|(.+)\|\d+\|', re.DOTALL)

        while self.log_unparsed_queue or self.files_read_list:
            if not self.log_unparsed_queue:
                continue
            log_line_list = self.log_unparsed_queue.popleft()
            for log_line in log_line_list:
                #### do something with log_line
                if not log_line.strip():
                    continue

                res = method_url_re_pattern.findall(log_line)
                if not res:
                    print('日志未匹配到請(qǐng)求URL,已忽略:\n%s' % log_line)
                    continue
                method = res[0][0]
                url = res[0][1].split('?')[0]  # 去掉了 ?及后面的url參數(shù)

                # 提取耗時(shí)
                res = url_time_taken_extractor.findall(log_line)
                if res:
                    time_taken = float(res[0])
                else:
                    print('未從日志提取到請(qǐng)求耗時(shí),已忽略日志:\n%s' % log_line)
                    continue

                # 存儲(chǔ)解析后的日志信息
                self.log_line_parsed_queue.append({'method': method,
                                                   'url': url,
                                                   'time_taken': time_taken,
                                                   })


    def collect_statistics(self):
        '''收集統(tǒng)計(jì)數(shù)據(jù)'''

        def _collect_statistics():
            while self.log_line_parsed_queue or not self.log_parsing_finished:
                if not self.log_line_parsed_queue:
                    continue
                log_info = self.log_line_parsed_queue.popleft()
                # do something with log_info
       
        with parallel_backend("multiprocessing", n_jobs=1):
            Parallel()(delayed(_collect_statistics)() for i in range(1))

    def run(self, file_path_list):
        # 多線程讀取日志文件
        for file_path in file_path_list:
            thread = threading.Thread(target=self.read_log_file,
                                      name="read_log_file",
                                      args=(file_path,))
            thread.start()
            self.files_read_list.append(file_path)

        # 啟動(dòng)日志解析進(jìn)程
        thread = threading.Thread(target=self.start_processes_for_log_parsing, name="start_processes_for_log_parsing")
        thread.start()

        # 啟動(dòng)日志統(tǒng)計(jì)數(shù)據(jù)收集進(jìn)程
        thread = threading.Thread(target=self.collect_statistics, name="collect_statistics")
        thread.start()

        start = datetime.now()
        while threading.active_count() > 1:
            print('程序正在努力解析日志...')
            time.sleep(0.5)

        end = datetime.now()
        print('解析完成', 'start', start, 'end', end, '耗時(shí)', end - start)

if __name__ == "__main__":
    log_parser = LogParser()
    log_parser.run(['access.log', 'access2.log'])

注意:

需要合理的配置單次讀取文件數(shù)據(jù)塊的大小,不能過(guò)大,或者過(guò)小,否則都可能會(huì)導(dǎo)致數(shù)據(jù)讀取速度變慢。筆者實(shí)踐環(huán)境下,發(fā)現(xiàn)10M~15M每次是一個(gè)比較高效的配置。

總結(jié)

到此這篇關(guān)于Python大數(shù)據(jù)量文本文件高效解析方案代碼實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)Python大數(shù)據(jù)量文本文件解析內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 分析Python list操作為什么會(huì)錯(cuò)誤

    分析Python list操作為什么會(huì)錯(cuò)誤

    這篇文章主要介紹了分析Python list操作為什么會(huì)錯(cuò)誤,python搞數(shù)據(jù)分析,在很多方面python有著比Matlab更大的優(yōu)勢(shì),下面來(lái)看看文章具體介紹的相關(guān)內(nèi)容吧,需要的朋友可以參考一下
    2021-11-11
  • Python操作注冊(cè)表詳細(xì)步驟介紹

    Python操作注冊(cè)表詳細(xì)步驟介紹

    Python編程語(yǔ)言最大的特點(diǎn)在于其簡(jiǎn)單易用,可以大大方便開(kāi)發(fā)人員的程序開(kāi)發(fā)。在這里我們就一起來(lái)了解一下有關(guān)Python操作注冊(cè)表的相關(guān)應(yīng)用技術(shù)。Python操作注冊(cè)表相關(guān)的函數(shù)可以分為打開(kāi)注冊(cè)表、關(guān)閉注冊(cè)表、讀取項(xiàng)值、c添加項(xiàng)值、添加項(xiàng),以及刪除項(xiàng)等幾類
    2020-02-02
  • python愛(ài)心表白 每天都是浪漫七夕!

    python愛(ài)心表白 每天都是浪漫七夕!

    每天都是浪漫七夕!這篇文章主要為大家詳細(xì)介紹了python愛(ài)心表白,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2018-08-08
  • python 使用shutil復(fù)制圖片的例子

    python 使用shutil復(fù)制圖片的例子

    今天小編就為大家分享一篇python 使用shutil復(fù)制圖片的例子,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2019-12-12
  • python imutils包基本概念及使用

    python imutils包基本概念及使用

    python imutils包可以很簡(jiǎn)潔的調(diào)用opencv接口,輕松實(shí)現(xiàn)圖像的平移,旋轉(zhuǎn),縮放,骨架化等操作,對(duì)python imutils包基本概念及使用方法感興趣的朋友一起看看吧
    2021-07-07
  • Python滲透測(cè)試入門之Scapy庫(kù)的使用詳解

    Python滲透測(cè)試入門之Scapy庫(kù)的使用詳解

    Scapy?是一個(gè)用來(lái)解析底層網(wǎng)絡(luò)數(shù)據(jù)包的Python模塊和交互式程序,該程序?qū)Φ讓影幚磉M(jìn)行了抽象打包,使得對(duì)網(wǎng)絡(luò)數(shù)據(jù)包的處理非常簡(jiǎn)便。本文就來(lái)聊聊它的具體使用,希望對(duì)大家有所幫助
    2023-03-03
  • Python矩陣常見(jiàn)運(yùn)算操作實(shí)例總結(jié)

    Python矩陣常見(jiàn)運(yùn)算操作實(shí)例總結(jié)

    這篇文章主要介紹了Python矩陣常見(jiàn)運(yùn)算操作,結(jié)合實(shí)例形式總結(jié)分析了Python矩陣的創(chuàng)建以及相乘、求逆、轉(zhuǎn)置等相關(guān)操作實(shí)現(xiàn)方法,需要的朋友可以參考下
    2017-09-09
  • 詳解Python里使用正則表達(dá)式的ASCII模式

    詳解Python里使用正則表達(dá)式的ASCII模式

    ASCII(American Standard Code for Information Interchange),是一種單字節(jié)的編碼。這篇文章主要介紹了Python里使用正則表達(dá)式的ASCII模式,需要的朋友可以參考下
    2017-11-11
  • 音頻處理 windows10下python三方庫(kù)librosa安裝教程

    音頻處理 windows10下python三方庫(kù)librosa安裝教程

    這篇文章主要介紹了音頻處理 windows10下python三方庫(kù)librosa安裝方法,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-06-06
  • Python如何新建三維數(shù)組并賦值

    Python如何新建三維數(shù)組并賦值

    本文詳細(xì)介紹了如何使用Python和numpy庫(kù)建立三維數(shù)組并對(duì)其進(jìn)行賦值。首先,通過(guò)numpy創(chuàng)建一個(gè)3x3x3的三維數(shù)組,其次,將自定義的二維數(shù)組賦值到三維數(shù)組中。本文還解釋了相關(guān)參數(shù)的含義,使讀者能夠更好地理解和應(yīng)用到其他多維矩陣的操作中
    2024-09-09

最新評(píng)論