Python調用Redis的示例代碼
更新時間:2020年11月24日 11:21:10 作者:LeoZhanggg
這篇文章主要介紹了Python調用Redis的示例代碼,幫助大家更好的理解和學習python,感興趣的朋友可以了解下
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# *************************************
# @Time : 2019/8/12
# @Author : Zhang Fan
# @Desc : Library
# @File : MyRedis.py
# @Update : 2019/8/23
# *************************************
import redis
class MyRedis(object):
"""
===================================================================
===================== MyRedis ========================
===================================================================
"""
def __init__(self):
self.redis_conn = None
self.redis_db = None
def connect_to_redis(self, redis_host, redis_port=6379, db=0, password=None):
"""
連接到Redis服務器
"""
self.redis_db = db
print('Executing : Connect To Redis | host={0}, port={1}, db={2}, password={3}'
.format(redis_host, redis_port, self.redis_db, password))
try:
self.redis_conn = redis.StrictRedis(
host=redis_host, port=redis_port, db=self.redis_db, password=password)
except Exception as ex:
logger.error(str(ex))
raise Exception(str(ex))
def redis_key_should_be_exist(self, name):
"""
驗證redis存在指定鍵
"""
if not self.redis_conn.exists(name):
logger.error(("Redis of db%s doesn't exist in key [ %s ]." % (self.redis_db, name)))
raise AssertionError
def redis_key_should_not_be_exist(self, name):
"""
驗證redis不存在指定鍵
"""
if self.redis_conn.exists(name):
logger.error(("Redis of db%s exist in key [ %s ]." % (self.redis_db, name)))
raise AssertionError
def getkeys_from_redis_bypattern(self, pattern, field=None):
"""
獲取redis所有鍵值
"""
keys_list = list()
print('Executing : Getall Key | %s' % pattern)
if field is None:
return self.redis_conn.keys(pattern)
else:
keys = self.redis_conn.keys(pattern)
for key in keys:
if not self.redis_conn.hget(key, field) is None:
keys_list.append(key)
return keys_list
# ========================== String Type =============================
def get_from_redis(self, name):
"""
獲取redis數據
"""
print('Executing : Get Key | %s' % name)
return self.redis_conn.get(name)
def del_from_redis(self, name):
"""
刪除redis中的任意數據類型
"""
return self.redis_conn.delete(name)
def set_to_redis(self, name, data, expire_time=0):
"""
設置redis執(zhí)行key的值
"""
return self.redis_conn.set(name, data, expire_time)
def append_to_redis(self, name, value):
"""
添加數據到redis
"""
return self.redis_conn.append(name, value)
# ========================== Hash Type ==========================
def hgetall_from_redis(self, name):
"""
獲取redis hash所有數據
"""
print('Executing : Hgetall Key | %s' % name)
return self.redis_conn.hgetall(name)
def hget_from_redis(self, name, key):
"""
獲取redis hash指定key數據
"""
print('Executing : Hget Key | %s' % name)
return self.redis_conn.hget(name, key)
def hset_to_redis(self, name, key, data):
"""
設置redis指定key的值
"""
print(('Executing : Hset Redis | name={0}, key={1}, data={2}'
.format(name, key, data)))
return self.redis_conn.hset(name, key, data)
def hdel_to_redis(self, name, *keys):
"""
刪除redis指定key的值
"""
print('Executing : Hdel Key | ', *keys)
self.redis_conn.hdel(name, *keys)
# ========================= ZSet Type ================================
def get_from_redis_zscore(self, name, values):
"""
獲取name對應有序集合中 value 對應的分數
"""
try:
return int(self.redis_conn.zscore(name, values))
except:
return self.redis_conn.zscore(name, values)
def get_from_redis_zrange(self, name, start=0, end=10):
"""
按照索引范圍獲取name對應的有序集合的元素
"""
return self.redis_conn.zrange(name, start, end, desc=False, withscores=True, score_cast_func=int)
def del_from_redis_zrem(self, name, values):
"""
刪除name對應的有序集合中值是values的成員
"""
return self.redis_conn.zrem(name, values)
def add_from_redis_zadd(self, name, value, score):
"""
在name對應的有序集合中添加一條。若值存在,則修改對應分數。
"""
return self.redis_conn.zadd(name, {value: score})
def count_from_redis_zcard(self, name):
"""
獲取name對應的有序集合元素的數量
"""
return self.redis_conn.zcard(name)
if __name__ == '__main__':
print('This is test.')
mr = MyRedis()
以上就是Python調用Redis的示例代碼的詳細內容,更多關于Python調用Redis的資料請關注腳本之家其它相關文章!

