亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

python調用subprocess模塊實現(xiàn)命令行操作控制SVN的方法

 更新時間:2022年09月08日 10:52:43   作者:Logintern09  
這篇文章主要介紹了使用python的subprocess模塊實現(xiàn)對SVN的相關操作,通過設置GitSvn類,在該類下自定義執(zhí)行SVN常規(guī)操作的方法,需要的朋友跟隨小編一起看看吧

使用python的subprocess模塊實現(xiàn)對SVN的相關操作。

設置GitSvn類,在該類下自定義執(zhí)行SVN常規(guī)操作的方法。

SVN的常規(guī)操作包括:
(1)獲取SVN當前版本,通過方法get_version()實現(xiàn);

(2)下載SVN指定倉庫,通過方法download()實現(xiàn),實際是通過調用SVN的命令行操作指令svn checkout實現(xiàn)的下載整個倉庫功能;

(3)獲取SVN某個倉庫下的所有文件列表,通過方法search_dir()實現(xiàn),實際是通過調用SVN的命令行操作指令svn list實現(xiàn)的獲取倉庫文件列表功能;

(4)在SVN指定位置創(chuàng)建新的文件夾,通過方法mkdir_command()實現(xiàn),實際是通過調用SVN的命令行操作指令svn mkdir實現(xiàn)的創(chuàng)建文件夾或者目錄功能;

(5)將本地倉庫文件添加到SVN倉庫,通過方法upload_file()實現(xiàn),實際是通過調用SVN的命令行操作指令svn add實現(xiàn)的添加文件功能;

(6)將本地倉庫已添加的文件提交SVN指定倉庫,通過方法commit_command()實現(xiàn),實際是通過調用SVN的命令行操作指令svn commit實現(xiàn)的提交文件功能;

(7)刪除SVN倉庫的目錄,通過方法delete_url()實現(xiàn),實際是通過調用SVN的命令行操作指令svn delete實現(xiàn)的刪除目錄功能;

(8)鎖定SVN倉庫的文件,通過方法lock_file()實現(xiàn),實際是通過調用SVN的命令行操作指令svn lock實現(xiàn)的鎖定文件功能;

(9)將SVN倉庫的文件解除鎖定,通過方法unlock_file()實現(xiàn),實際是通過調用SVN的命令行操作指令svn unlock實現(xiàn)的解除文件鎖定功能;

(10)查看SVN倉庫文件當前狀態(tài),通過方法check_status()實現(xiàn),實際是通過調用SVN的命令行操作指令svn status實現(xiàn)的查看文件狀態(tài)功能;

(11)更新SVN倉庫的某個文件,通過方法update_file()實現(xiàn),實際是通過調用SVN的命令行操作指令svn up實現(xiàn)的更新文件功能;

GitSvn類定義如下:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author: Logintern09


import os
import time
import subprocess
from log_manage import logger

rq = time.strftime('%Y%m%d', time.localtime(time.time()))
file_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "temp_files")
tempfile = os.path.join(file_path, rq + '.log')

class GitSvn(object):
    def __init__(self, bill_addr):
        self.bill_addr = bill_addr

    def get_version(self):
        cmd = "svn info %s" % self.bill_addr
        logger.info(cmd)
        output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        (result, error) = output.communicate()
        result = result.decode(encoding="gbk")
        logger.info(result)
        error = error.decode(encoding="gbk")
        logger.error(error)
        with open(tempfile, "w") as f:
            f.write(result)
        if error.strip() != "":
            if error.startswith("svn: E170013:"):
                res = "網(wǎng)絡異常,暫時連接不上系統(tǒng),請檢查網(wǎng)絡或其他配置!"
                raise SystemError(res)

    def download(self, dir_path):
        with open(tempfile, "r") as f:
            result = f.readlines()
            for line in result:
                if line.startswith("Revision"):
                    laster_version = int(line.split(":")[-1].strip())
                    logger.info(line)
                    break
        cur_path = os.path.dirname(os.path.realpath(__file__))
        id_path = os.path.join(cur_path, default_sys_name)
        cmd = "svn checkout %s %s -r r%s" % (dir_path, id_path, laster_version)
        logger.info(cmd)
        output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        (result, error) = output.communicate()
        result = result.decode(encoding="gbk")
        error = error.decode(encoding="gbk")
        logger.info(result)
        logger.error(error)
        if error.strip() != "":
            if error.startswith("svn: E170013:"):
                res = "網(wǎng)絡異常,暫時連接不上系統(tǒng),請檢查網(wǎng)絡或其他配置!"
                raise SystemError(res)
            if (error.startswith("svn: E155004:")) or (error.startswith("svn: E155037:")):
                cur_path = os.path.dirname(os.path.realpath(__file__))
                file_path = os.path.join(cur_path, default_sys_name)
                cmd = "svn cleanup"
                logger.info(cmd)
                output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                          cwd=file_path)
                (result, error) = output.communicate()
                result = result.decode(encoding="gbk")
                error = error.decode(encoding="gbk")
                logger.info(result)
                logger.error(error)

    def search_dir(self):
        cmd = "svn list %s" % self.bill_addr
        logger.info(cmd)
        output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        (result, error) = output.communicate()
        result = result.decode(encoding="gbk")
        error = error.decode(encoding="gbk")
        logger.info(result)
        logger.error(error)
        with open(tempfile, "w") as f:
            f.write(result)
        if error.strip() != "":
            if error.startswith("svn: E170013:"):
                res = "網(wǎng)絡異常,暫時連接不上系統(tǒng),請檢查網(wǎng)絡或其他配置!"
                raise SystemError(res)

    def mkdir_command(self, id_num):
        cmd = "svn mkdir %s" % id_num
        logger.info(cmd)
        cur_path = os.path.dirname(os.path.realpath(__file__))
        file_path = os.path.join(cur_path, default_sys_name)
        output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                  cwd=file_path)
        (result, error) = output.communicate()
        result = result.decode(encoding="gbk")
        error = error.decode(encoding="gbk")
        logger.info(result)
        logger.error(error)
        if error.strip() != "":
            if error.startswith("svn: E170013:"):
                res = "網(wǎng)絡異常,暫時連接不上系統(tǒng),請檢查網(wǎng)絡或其他配置!"
                raise SystemError(res)

    def upload_file(self, file_path):
        cmd = "svn add %s --force" % file_path
        root_path, file_name = os.path.split(file_path)
        logger.info(cmd)
        output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                  cwd=root_path)
        (result, error) = output.communicate()
        result = result.decode(encoding="gbk")
        error = error.decode(encoding="gbk")
        logger.info(result)
        logger.error(error)
        if error.strip() != "":
            if error.startswith("svn: E170013:"):
                res = "網(wǎng)絡異常,暫時連接不上系統(tǒng),請檢查網(wǎng)絡或其他配置!"
                raise SystemError(res)

    def commit_command(self, file):
        # 執(zhí)行了鎖定的用戶執(zhí)行了提交操作(提交操作將自動解鎖)
        if os.path.isfile(file):
            file_path, file_name = os.path.split(file)
        else:
            file_path = file
        cmd = "svn commit %s -m 'update_files'" % file
        logger.info(cmd)
        output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                  cwd=file_path)
        (result, error) = output.communicate()
        result = result.decode(encoding="gbk")
        error = error.decode(encoding="gbk")
        logger.info(result)
        logger.error(error)
        if error.strip() != "":
            if error.startswith("svn: E170013:"):
                res = "網(wǎng)絡異常,暫時連接不上系統(tǒng),請檢查網(wǎng)絡或其他配置!"
                raise SystemError(res)
            elif error.startswith("svn: E730053:"):
                res = "數(shù)據(jù)上傳失敗,請重新提交數(shù)據(jù)?。?!"
                raise SystemError(res)
            else:
                res = "數(shù)據(jù)上傳失敗,請重新提交數(shù)據(jù)!?。?
                raise SystemError(res)

    def delete_url(self, url):
        cmd = "svn delete %s" % url
        logger.info(cmd)
        output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        (result, error) = output.communicate()
        result = result.decode(encoding="gbk")
        error = error.decode(encoding="gbk")
        logger.info(result)
        logger.error(error)
        if error.strip() != "":
            if error.startswith("svn: E170013:"):
                res = "網(wǎng)絡異常,暫時連接不上系統(tǒng),請檢查網(wǎng)絡或其他配置!"
                raise SystemError(res)

    def lock_file(self, file_name):
        cur_path = os.path.dirname(os.path.realpath(__file__))
        id_path = os.path.join(cur_path, default_sys_name)
        file_path = os.path.join(id_path, file_name)
        cmd = "svn lock %s" % file_path
        logger.info(cmd)
        output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        (result, error) = output.communicate()
        result = result.decode(encoding="gbk")
        error = error.decode(encoding="gbk")
        logger.info(result)
        logger.error(error)
        if error.strip() != "":
            if error.startswith("svn: E170013:"):
                res = "網(wǎng)絡異常,暫時連接不上系統(tǒng),請檢查網(wǎng)絡或其他配置!"
                raise SystemError(res)
            elif error.startswith("svn: warning: W160042:"):
                res = "系統(tǒng)資源已被其他用戶鎖定,請稍后重試!"
                raise SystemError(res)

    def unlock_file(self, file_name):
        # 不使用--force 參數(shù) 可以解鎖被自己鎖定的文集 即普通的release lock
        cur_path = os.path.dirname(os.path.realpath(__file__))
        id_path = os.path.join(cur_path, default_sys_name)
        file_path = os.path.join(id_path, file_name)
        cmd = "svn unlock %s --force" % file_path
        logger.info(cmd)
        output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        (result, error) = output.communicate()
        result = result.decode(encoding="gbk")
        error = error.decode(encoding="gbk")
        logger.info(result)
        logger.error(error)
        if error.strip() != "":
            if error.startswith("svn: E170013:"):
                res = "網(wǎng)絡異常,暫時連接不上系統(tǒng),請檢查網(wǎng)絡或其他配置!"
                raise SystemError(res)

    def check_status(self, file_name):
        # ?:不在svn的控制中;M:內容被修改;C:發(fā)生沖突;A:預定加入到版本庫;K:被鎖定
        cur_path = os.path.dirname(os.path.realpath(__file__))
        id_path = os.path.join(cur_path, default_sys_name)
        file_path = os.path.join(id_path, file_name)
        cmd = "svn status -v %s" % file_path
        logger.info(cmd)
        output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        (result, error) = output.communicate()
        result = result.decode(encoding="gbk")
        error = error.decode(encoding="gbk")
        logger.info(result)
        logger.error(error)
        # 獲取狀態(tài)結果解析
        status_list = result.split(" ")
        status_flag_list = []
        for i in status_list:
            if i.isalpha():
                status_flag_list.append(i)
            else:
                break
        if result.strip() != "":
            if error.startswith("svn: E170013:"):
                res = "網(wǎng)絡異常,暫時連接不上系統(tǒng),請檢查網(wǎng)絡或其他配置!"
                raise SystemError(res)
        elif "C" in status_flag_list:
            return "C"
        elif "K" in status_flag_list:
            return "K"

    def update_file(self, file_name):
        cur_path = os.path.dirname(os.path.realpath(__file__))
        id_path = os.path.join(cur_path, default_sys_name)
        file_path = os.path.join(id_path, file_name)
        cmd = "svn up %s" % file_path
        logger.info(cmd)
        output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        (result, error) = output.communicate()
        result = result.decode(encoding="gbk")
        error = error.decode(encoding="gbk")
        logger.info(result)
        logger.error(error)
        with open(tempfile, "w") as f:
            f.write(result)
        with open(tempfile, "r") as f:
            result = f.readlines()
            count = -1
            for line in result:
                count += 1
                if (line.strip() != "") and (count == 1):
                    if line.startswith("C"):
                        res = "更新系統(tǒng)資源時發(fā)現(xiàn)存在沖突,請稍后重試!"
                        raise SystemError(res)
        if error.strip() != "":
            if error.startswith("svn: E170013:"):
                res = "網(wǎng)絡異常,暫時連接不上系統(tǒng),請檢查網(wǎng)絡或其他配置!"
                raise SystemError(res)
            elif (error.startswith("svn: E155037:")) or (error.startswith("svn: E155004:")):
                cur_path = os.path.dirname(os.path.realpath(__file__))
                file_path = os.path.join(cur_path, default_sys_name)
                cmd = "svn cleanup"
                logger.info(cmd)
                output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                          cwd=file_path)
                (result, error) = output.communicate()
                result = result.decode(encoding="gbk")
                error = error.decode(encoding="gbk")
                logger.info(result)
                logger.error(error)

上述類GitSvn的應用實例如下:

if __name__ == '__main__':
    default_url = svn:***  # 用戶本地SVN客戶端的URL地址
    git_class = GitSvn(default_url)
    git_class.get_version()
    git_class.download()
    # 驗證查看目錄文件列表功能
    git_class.search_dir()
    # 驗證刪除目錄功能
    cur_path = os.path.dirname(os.path.realpath(__file__))
    file_path = os.path.join(cur_path, default_sys_name)
    id_path = os.path.join(file_path, 'SCR202202100002')
    git_class.delete_url(id_path)
    # 驗證文件加鎖功能
    git_class.lock_file(file_name="單號總表.xlsx")
    # 驗證文件解鎖功能
    git_class.unlock_file(file_name="單號總表.xlsx")
    # 檢查文件狀態(tài)
    git_class.check_status(file_name="單號總表.xlsx")
    # 驗證創(chuàng)建目錄功能
    git_class.mkdir_command("SCR202203280001")
    # 驗證提交文件功能
    cur_path = os.path.dirname(os.path.realpath(__file__))
    sys_path = os.path.join(cur_path, default_sys_name)
    target_dir = os.path.join(sys_path, "SCR202203280001")
    git_class.upload_file(target_dir)
    git_class.commit_command(target_dir)

到此這篇關于python調用subprocess模塊實現(xiàn)命令行操作控制SVN的文章就介紹到這了,更多相關python subprocess模塊內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

最新評論