PowerJob的GridFsManager工作流程源碼解讀
序
本文主要研究一下PowerJob的GridFsManager
GridFsManager
tech/powerjob/server/persistence/mongodb/GridFsManager.java
@Slf4j
@Service
public class GridFsManager implements InitializingBean {
private final Environment environment;
private final MongoDatabase db;
private boolean available;
private final Map<String, GridFSBucket> bucketCache = Maps.newConcurrentMap();
public static final String LOG_BUCKET = "log";
public static final String CONTAINER_BUCKET = "container";
public GridFsManager(Environment environment, @Autowired(required = false) MongoTemplate mongoTemplate) {
this.environment = environment;
if (mongoTemplate != null) {
this.db = mongoTemplate.getDb();
} else {
this.db = null;
}
}
/**
* 是否可用
* @return true:可用;false:不可用
*/
public boolean available() {
return available;
}
//......
private GridFSBucket getBucket(String bucketName) {
return bucketCache.computeIfAbsent(bucketName, ignore -> GridFSBuckets.create(db, bucketName));
}
@Override
public void afterPropertiesSet() throws Exception {
String enable = environment.getProperty(PowerJobServerConfigKey.MONGODB_ENABLE, Boolean.FALSE.toString());
available = Boolean.TRUE.toString().equals(enable) && db != null;
log.info("[GridFsManager] available: {}, db: {}", available, db);
}
}GridFsManager實現(xiàn)了InitializingBean接口,其afterPropertiesSet從environment讀取oms.mongodb.enable配置,默認(rèn)為false;其構(gòu)造器注入mongoTemplate,若為null則available為false;其getBucket方法則根據(jù)bucketName緩存到bucketCache,若不存在則通過GridFSBuckets.create創(chuàng)建
store
/**
* 存儲文件到 GridFS
* @param localFile 本地文件
* @param bucketName 桶名稱
* @param fileName GirdFS中的文件名稱
* @throws IOException 異常
*/
public void store(File localFile, String bucketName, String fileName) throws IOException {
if (available()) {
GridFSBucket bucket = getBucket(bucketName);
try (BufferedInputStream bis = new BufferedInputStream(new FileInputStream(localFile))) {
bucket.uploadFromStream(fileName, bis);
}
}
}store方法先獲取bucket,再讀取localFile,通過bucket.uploadFromStream上傳
download
/**
* 從 GridFS 下載文件
* @param targetFile 下載的目標(biāo)文件(本地文件)
* @param bucketName 桶名稱
* @param fileName GirdFS中的文件名稱
* @throws IOException 異常
*/
public void download(File targetFile, String bucketName, String fileName) throws IOException {
if (available()) {
GridFSBucket bucket = getBucket(bucketName);
try (GridFSDownloadStream gis = bucket.openDownloadStream(fileName);
BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(targetFile))
) {
byte[] buffer = new byte[1024];
int bytes = 0;
while ((bytes = gis.read(buffer)) != -1) {
bos.write(buffer, 0, bytes);
}
bos.flush();
}
}
}download方法則先獲取bucket,再通過bucket.openDownloadStream獲取GridFSDownloadStream,最后寫到targetFile
deleteBefore
/**
* 刪除幾天前的文件
* @param bucketName 桶名稱
* @param day 日期偏移量,單位 天
*/
public void deleteBefore(String bucketName, int day) {
Stopwatch sw = Stopwatch.createStarted();
Date date = DateUtils.addDays(new Date(), -day);
GridFSBucket bucket = getBucket(bucketName);
Bson filter = Filters.lt("uploadDate", date);
// 循環(huán)刪除性能很差?我猜你肯定沒看過官方實現(xiàn)[狗頭]:org.springframework.data.mongodb.gridfs.GridFsTemplate.delete
bucket.find(filter).forEach((Consumer<GridFSFile>) gridFSFile -> {
ObjectId objectId = gridFSFile.getObjectId();
try {
bucket.delete(objectId);
log.info("[GridFsManager] deleted {}#{}", bucketName, objectId);
}catch (Exception e) {
log.error("[GridFsManager] deleted {}#{} failed.", bucketName, objectId, e);
}
});
log.info("[GridFsManager] clean bucket({}) successfully, delete all files before {}, using {}.", bucketName, date, sw.stop());
}deleteBefore主要通過bucket.find(Filters.lt("uploadDate", date))找到GridFSFile,再挨個執(zhí)行delete
exists
public boolean exists(String bucketName, String fileName) {
GridFSBucket bucket = getBucket(bucketName);
GridFSFindIterable files = bucket.find(Filters.eq("filename", fileName));
try {
GridFSFile first = files.first();
return first != null;
}catch (Exception ignore) {
}
return false;
}exists方法則通過bucket.find(Filters.eq("filename", fileName))來進(jìn)行查找
sync
tech/powerjob/server/core/instance/InstanceLogService.java
@Async(PJThreadPool.BACKGROUND_POOL)
public void sync(Long instanceId) {
Stopwatch sw = Stopwatch.createStarted();
try {
// 先持久化到本地文件
File stableLogFile = genStableLogFile(instanceId);
// 將文件推送到 MongoDB
if (gridFsManager.available()) {
try {
gridFsManager.store(stableLogFile, GridFsManager.LOG_BUCKET, genMongoFileName(instanceId));
log.info("[InstanceLog-{}] push local instanceLogs to mongoDB succeed, using: {}.", instanceId, sw.stop());
}catch (Exception e) {
log.warn("[InstanceLog-{}] push local instanceLogs to mongoDB failed.", instanceId, e);
}
}
}catch (Exception e) {
log.warn("[InstanceLog-{}] sync local instanceLogs failed.", instanceId, e);
}
// 刪除本地數(shù)據(jù)庫數(shù)據(jù)
try {
instanceId2LastReportTime.remove(instanceId);
CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.deleteByInstanceId(instanceId));
log.info("[InstanceLog-{}] delete local instanceLog successfully.", instanceId);
}catch (Exception e) {
log.warn("[InstanceLog-{}] delete local instanceLog failed.", instanceId, e);
}
}InstanceLogService的sync方法先持久化到本地文件,再將文件推送到 MongoDB
小結(jié)
GridFsManager實現(xiàn)了InitializingBean接口,其afterPropertiesSet從environment讀取oms.mongodb.enable配置,默認(rèn)為false;其構(gòu)造器注入mongoTemplate,若為null則available為false;其store和download方法都先判斷是否available,為false則空操作。
以上就是PowerJob的GridFsManager工作流程源碼解讀的詳細(xì)內(nèi)容,更多關(guān)于PowerJob GridFsManager的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java泛型之協(xié)變與逆變及extends與super選擇
這篇文章主要介紹了Java泛型之協(xié)變與逆變及extends與super選擇,文章圍繞主題內(nèi)容展開詳細(xì)內(nèi)容介紹,需要的小伙伴可以參考一下2022-05-05
java利用easyexcel實現(xiàn)導(dǎo)入與導(dǎo)出功能
這篇文章主要介紹了java利用easyexcel實現(xiàn)導(dǎo)入與導(dǎo)出功能,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價值,感興趣的小伙伴可以參考一下,希望對你的學(xué)習(xí)有所幫助2022-09-09
java?HttpURLConnection類的disconnect方法與http長連接詳解
這篇文章主要介紹了java?HttpURLConnection類的disconnect方法與http長連接,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-04-04
解決程序包org.springframework.test.context不存在
這篇文章主要介紹了解決程序包org.springframework.test.context不存在的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-09-09
java如何在項目中實現(xiàn)excel導(dǎo)入導(dǎo)出功能
這篇文章主要介紹了java如何在項目中實現(xiàn)excel導(dǎo)入導(dǎo)出功能的相關(guān)資料,EasyExcel是一個基于Apache?POI開發(fā)的開源Java庫,用于簡化Excel文件的讀寫操作,文中將用法介紹的非常詳細(xì),需要的朋友可以參考下2024-10-10

