python實用代碼片段收集貼
獲取一個類的所有子類
def itersubclasses(cls, _seen=None):
"""Generator over all subclasses of a given class in depth first order."""
if not isinstance(cls, type):
raise TypeError(_('itersubclasses must be called with '
'new-style classes, not %.100r') % cls)
_seen = _seen or set()
try:
subs = cls.__subclasses__()
except TypeError: # fails only when cls is type
subs = cls.__subclasses__(cls)
for sub in subs:
if sub not in _seen:
_seen.add(sub)
yield sub
for sub in itersubclasses(sub, _seen):
yield sub
簡單的線程配合
import threading
is_done = threading.Event()
consumer = threading.Thread(
target=self.consume_results,
args=(key, self.task, runner.result_queue, is_done))
consumer.start()
self.duration = runner.run(
name, kw.get("context", {}), kw.get("args", {}))
is_done.set()
consumer.join() #主線程堵塞,直到consumer運行結束
多說一點,threading.Event()也可以被替換為threading.Condition(),condition有notify(), wait(), notifyAll()。解釋如下:
The wait() method releases the lock, and then blocks until it is awakened by a notify() or notifyAll() call for the same condition variable in another thread. Once awakened, it re-acquires the lock and returns. It is also possible to specify a timeout.
The notify() method wakes up one of the threads waiting for the condition variable, if any are waiting. The notifyAll() method wakes up all threads waiting for the condition variable.
Note: the notify() and notifyAll() methods don't release the lock; this means that the thread or threads awakened will not return from their wait() call immediately, but only when the thread that called notify() or notifyAll() finally relinquishes ownership of the lock.
# Consume one item
cv.acquire()
while not an_item_is_available():
cv.wait()
get_an_available_item()
cv.release()
# Produce one item
cv.acquire()
make_an_item_available()
cv.notify()
cv.release()
計算運行時間
class Timer(object):
def __enter__(self):
self.error = None
self.start = time.time()
return self
def __exit__(self, type, value, tb):
self.finish = time.time()
if type:
self.error = (type, value, tb)
def duration(self):
return self.finish - self.start
with Timer() as timer:
func()
return timer.duration()
元類
__new__()方法接收到的參數依次是:
當前準備創(chuàng)建的類的對象;
類的名字;
類繼承的父類集合;
類的方法集合;
class ModelMetaclass(type):
def __new__(cls, name, bases, attrs):
if name=='Model':
return type.__new__(cls, name, bases, attrs)
mappings = dict()
for k, v in attrs.iteritems():
if isinstance(v, Field):
print('Found mapping: %s==>%s' % (k, v))
mappings[k] = v
for k in mappings.iterkeys():
attrs.pop(k)
attrs['__table__'] = name # 假設表名和類名一致
attrs['__mappings__'] = mappings # 保存屬性和列的映射關系
return type.__new__(cls, name, bases, attrs)
class Model(dict):
__metaclass__ = ModelMetaclass
def __init__(self, **kw):
super(Model, self).__init__(**kw)
def __getattr__(self, key):
try:
return self[key]
except KeyError:
raise AttributeError(r"'Model' object has no attribute '%s'" % key)
def __setattr__(self, key, value):
self[key] = value
def save(self):
fields = []
params = []
args = []
for k, v in self.__mappings__.iteritems():
fields.append(v.name)
params.append('?')
args.append(getattr(self, k, None))
sql = 'insert into %s (%s) values (%s)' % (self.__table__, ','.join(fields), ','.join(params))
print('SQL: %s' % sql)
print('ARGS: %s' % str(args))
class Field(object):
def __init__(self, name, column_type):
self.name = name
self.column_type = column_type
def __str__(self):
return '<%s:%s>' % (self.__class__.__name__, self.name)
class StringField(Field):
def __init__(self, name):
super(StringField, self).__init__(name, 'varchar(100)')
class IntegerField(Field):
def __init__(self, name):
super(IntegerField, self).__init__(name, 'bigint')
class User(Model):
# 定義類的屬性到列的映射:
id = IntegerField('id')
name = StringField('username')
email = StringField('email')
password = StringField('password')
# 創(chuàng)建一個實例:
u = User(id=12345, name='Michael', email='test@orm.org', password='my-pwd')
# 保存到數據庫:
u.save()
輸出如下:
Found model: User
Found mapping: email ==> <StringField:email>
Found mapping: password ==> <StringField:password>
Found mapping: id ==> <IntegerField:uid>
Found mapping: name ==> <StringField:username>
SQL: insert into User (password,email,username,uid) values (?,?,?,?)
ARGS: ['my-pwd', 'test@orm.org', 'Michael', 12345]
SQLAlchemy簡單使用
# 導入:
from sqlalchemy import Column, String, create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
# 創(chuàng)建對象的基類:
Base = declarative_base()
# 定義User對象:
class User(Base):
# 表的名字:
__tablename__ = 'user'
# 表的結構:
id = Column(String(20), primary_key=True)
name = Column(String(20))
# 初始化數據庫連接:
engine = create_engine('mysql+mysqlconnector://root:password@localhost:3306/test') # '數據庫類型+數據庫驅動名稱://用戶名:口令@機器地址:端口號/數據庫名'
# 創(chuàng)建DBSession類型:
DBSession = sessionmaker(bind=engine)
# 創(chuàng)建新User對象:
new_user = User(id='5', name='Bob')
# 添加到session:
session.add(new_user)
# 提交即保存到數據庫:
session.commit()
# 創(chuàng)建Query查詢,filter是where條件,最后調用one()返回唯一行,如果調用all()則返回所有行:
user = session.query(User).filter(User.id=='5').one()
# 關閉session:
session.close()
WSGI簡單使用和Web框架Flask的簡單使用
from wsgiref.simple_server import make_server
def application(environ, start_response):
start_response('200 OK', [('Content-Type', 'text/html')])
return '<h1>Hello, web!</h1>'
# 創(chuàng)建一個服務器,IP地址為空,端口是8000,處理函數是application:
httpd = make_server('', 8000, application)
print "Serving HTTP on port 8000..."
# 開始監(jiān)聽HTTP請求:
httpd.serve_forever()
了解了WSGI框架,我們發(fā)現:其實一個Web App,就是寫一個WSGI的處理函數,針對每個HTTP請求進行響應。
但是如何處理HTTP請求不是問題,問題是如何處理100個不同的URL。
一個最簡單和最土的想法是從environ變量里取出HTTP請求的信息,然后逐個判斷。
from flask import Flask
from flask import request
app = Flask(__name__)
@app.route('/', methods=['GET', 'POST'])
def home():
return '<h1>Home</h1>'
@app.route('/signin', methods=['GET'])
def signin_form():
return '''<form action="/signin" method="post">
<p><input name="username"></p>
<p><input name="password" type="password"></p>
<p><button type="submit">Sign In</button></p>
</form>'''
@app.route('/signin', methods=['POST'])
def signin():
# 需要從request對象讀取表單內容:
if request.form['username']=='admin' and request.form['password']=='password':
return '<h3>Hello, admin!</h3>'
return '<h3>Bad username or password.</h3>'
if __name__ == '__main__':
app.run()
格式化顯示json
print(json.dumps(data, indent=4))
# 或者
import pprint
pprint.pprint(data)
實現類似Java或C中的枚舉
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import itertools
import sys
class ImmutableMixin(object):
_inited = False
def __init__(self):
self._inited = True
def __setattr__(self, key, value):
if self._inited:
raise Exception("unsupported action")
super(ImmutableMixin, self).__setattr__(key, value)
class EnumMixin(object):
def __iter__(self):
for k, v in itertools.imap(lambda x: (x, getattr(self, x)), dir(self)):
if not k.startswith('_'):
yield v
class _RunnerType(ImmutableMixin, EnumMixin):
SERIAL = "serial"
CONSTANT = "constant"
CONSTANT_FOR_DURATION = "constant_for_duration"
RPS = "rps"
if __name__=="__main__":
print _RunnerType.CONSTANT
創(chuàng)建文件時指定權限
import os
def write_to_file(path, contents, umask=None):
"""Write the given contents to a file
:param path: Destination file
:param contents: Desired contents of the file
:param umask: Umask to set when creating this file (will be reset)
"""
if umask:
saved_umask = os.umask(umask)
try:
with open(path, 'w') as f:
f.write(contents)
finally:
if umask:
os.umask(saved_umask)
if __name__ == '__main__':
write_to_file('/home/kong/tmp', 'test', 31)
# Then you will see a file is created with permission 640.
# Warning: If the file already exists, its permission will not be changed.
# Note:For file, default all permission is 666, and 777 for directory.
多進程并發(fā)執(zhí)行
import multiprocessing
import time
import os
def run(flag):
print "flag: %s, sleep 2s in run" % flag
time.sleep(2)
print "%s exist" % flag
return flag
if __name__ == '__main__':
pool = multiprocessing.Pool(3)
iter_result = pool.imap(run, xrange(6))
print "sleep 5s\n\n"
time.sleep(5)
for i in range(6):
try:
result = iter_result.next(600)
except multiprocessing.TimeoutError as e:
raise
print result
pool.close()
pool.join()
運行時自動填充函數參數
import decorator
def default_from_global(arg_name, env_name):
def default_from_global(f, *args, **kwargs):
id_arg_index = f.func_code.co_varnames.index(arg_name)
args = list(args)
if args[id_arg_index] is None:
args[id_arg_index] = get_global(env_name)
if not args[id_arg_index]:
print("Missing argument: --%(arg_name)s" % {"arg_name": arg_name})
return(1)
return f(*args, **kwargs)
return decorator.decorator(default_from_global)
# 如下是一個裝飾器,可以用在需要自動填充參數的函數上。功能是:
# 如果沒有傳遞函數的deploy_id參數,那么就從環(huán)境變量中獲?。ㄕ{用自定義的get_global函數)
with_default_deploy_id = default_from_global('deploy_id', ENV_DEPLOYMENT)
嵌套裝飾器
validator函數裝飾func1,func1使用時接收參數(*arg, **kwargs),而func1又裝飾func2(其實就是Rally中的scenario函數),給func2增加validators屬性,是一個函數的列表,函數的接收參數config, clients, task。這些函數最終調用func1,傳入參數(config, clients, task, *args, **kwargs),所以func1定義時參數是(config, clients, task, *arg, **kwargs)
最終實現的效果是,func2有很多裝飾器,每個都會接收自己的參數,做一些校驗工作。
def validator(fn):
"""Decorator that constructs a scenario validator from given function.
Decorated function should return ValidationResult on error.
:param fn: function that performs validation
:returns: rally scenario validator
"""
def wrap_given(*args, **kwargs):
"""Dynamic validation decorator for scenario.
:param args: the arguments of the decorator of the benchmark scenario
ex. @my_decorator("arg1"), then args = ('arg1',)
:param kwargs: the keyword arguments of the decorator of the scenario
ex. @my_decorator(kwarg1="kwarg1"), then kwargs = {"kwarg1": "kwarg1"}
"""
def wrap_validator(config, clients, task):
return (fn(config, clients, task, *args, **kwargs) or
ValidationResult())
def wrap_scenario(scenario):
wrap_validator.permission = getattr(fn, "permission",
consts.EndpointPermission.USER)
if not hasattr(scenario, "validators"):
scenario.validators = []
scenario.validators.append(wrap_validator)
return scenario
return wrap_scenario
return wrap_given
inspect庫的一些常見用法
inspect.getargspec(func) 獲取函數參數的名稱和默認值,返回一個四元組(args, varargs, keywords, defaults),其中:
args是參數名稱的列表;
varargs和keywords是*號和**號的變量名稱;
defaults是參數默認值的列表;
inspect.getcallargs(func[, *args][, **kwds]) 綁定函數參數。返回綁定后函數的入參字典。
python中的私有屬性和函數
Python把以兩個或以上下劃線字符開頭且沒有以兩個或以上下劃線結尾的變量當作私有變量。私有變量會在代碼生成之前被轉換為長格式(變?yōu)楣校?,這個過程叫"Private name mangling",如類A里的__private標識符將被轉換為_A__private,但當類名全部以下劃線命名的時候,Python就不再執(zhí)行軋壓。而且,雖然叫私有變量,仍然有可能被訪問或修改(使用_classname__membername),所以, 總結如下:
無論是單下劃線還是雙下劃線開頭的成員,都是希望外部程序開發(fā)者不要直接使用這些成員變量和這些成員函數,只是雙下劃線從語法上能夠更直接的避免錯誤的使用,但是如果按照_類名__成員名則依然可以訪問到。單下劃線的在動態(tài)調試時可能會方便一些,只要項目組的人都遵守下劃線開頭的成員不直接使用,那使用單下劃線或許會更好。
相關文章
Python實戰(zhàn)實現爬取天氣數據并完成可視化分析詳解
這篇文章主要和大家分享一個用Python實現的小功能:獲取天氣數據,進行可視化分析,帶你直觀了解天氣情況!感興趣的小伙伴可以學習一下2022-06-06