亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Node.js + Redis Sorted Set實(shí)現(xiàn)任務(wù)隊(duì)列

 更新時(shí)間:2016年09月19日 08:39:54   投稿:hebedich  
本文給大家分享的是使用Node.js + Redis Sorted Set實(shí)現(xiàn)任務(wù)隊(duì)列的方法示例,非常的實(shí)用,有需要的小伙伴可以參考下

需求:功能 A 需要調(diào)用第三方 API 獲取數(shù)據(jù),而第三方 API 自身是異步處理方式,在調(diào)用后會(huì)返回?cái)?shù)據(jù)與狀態(tài) { data: "查詢結(jié)果", "status": "正在異步處理中" } ,這樣就需要間隔一段時(shí)間后再去調(diào)用第三方 API 獲取數(shù)據(jù)。為了用戶在使用功能 A 時(shí)不會(huì)因?yàn)榈谌?API 正在異步處理中而必須等待,將用戶請(qǐng)求加入任務(wù)隊(duì)列中,返回部分?jǐn)?shù)據(jù)并關(guān)閉請(qǐng)求。然后定時(shí)從任務(wù)隊(duì)列里中取出任務(wù)調(diào)用第三方 API,若返回狀態(tài)為”異步處理中“,將該任務(wù)再次加入任務(wù)隊(duì)列,若返回狀態(tài)為”已處理完畢“,將返回?cái)?shù)據(jù)入庫(kù)。

根據(jù)以上問(wèn)題,想到使用 Node.js + Redis sorted set 來(lái)實(shí)現(xiàn)任務(wù)隊(duì)列。Node.js 實(shí)現(xiàn)自身應(yīng)用 API 用來(lái)接受用戶請(qǐng)求,合并數(shù)據(jù)庫(kù)已存數(shù)據(jù)與 API 返回的部分?jǐn)?shù)據(jù)返回給用戶,并將任務(wù)加入到任務(wù)隊(duì)列中。利用 Node.js child process 與 cron 定時(shí)從任務(wù)隊(duì)列中取出任務(wù)執(zhí)行。

在設(shè)計(jì)任務(wù)隊(duì)列的過(guò)程中需要考慮到的幾個(gè)問(wèn)題

  • 并行執(zhí)行多個(gè)任務(wù)
  • 任務(wù)唯一性
  • 任務(wù)成功或失敗后的處理

針對(duì)以上問(wèn)題的解決方案

  • 并行執(zhí)行多個(gè)任務(wù)利用 Promise.all 來(lái)實(shí)現(xiàn)
  • 任務(wù)唯一性利用 Redis sorted set 來(lái)實(shí)現(xiàn)。使用時(shí)間戳作為分值可以實(shí)現(xiàn)將 sorted set 作為 list 來(lái)使用,在加入任務(wù)時(shí)判斷任務(wù)是否已經(jīng)存在,在取出任務(wù)執(zhí)行時(shí)將該任務(wù)分值設(shè)置為 0,每次取出分值大于 0 的任務(wù)來(lái)執(zhí)行,可以避免重復(fù)執(zhí)行任務(wù)。
  • 執(zhí)行任務(wù)成功后刪除任務(wù),執(zhí)行任務(wù)失敗后將任務(wù)分值更新為當(dāng)前時(shí)間時(shí)間戳,這樣就可以將失敗的任務(wù)重新加入任務(wù)隊(duì)列尾部

示例代碼

// remote_api.js 模擬第三方 API
'use strict';

const app = require('express')();

app.get('/', (req, res) => {
  setTimeout(() => {
    let arr = [200, 300]; // 200 代表成功,300 代表失敗需要重新請(qǐng)求
    res.status(200).send({ 'status': arr[parseInt(Math.random() * 2)] });
  }, 3000);
});

app.listen('9001', () => {
  console.log('API 服務(wù)監(jiān)聽(tīng)端口:9001');
});
// producer.js 自身應(yīng)用 API,用來(lái)接受用戶請(qǐng)求并將任務(wù)加入任務(wù)隊(duì)列
'use strict';

const app = require('express')();
const redisClient = require('redis').createClient();

const QUEUE_NAME = 'queue:example';

function addTaskToQueue(taskName, callback) {
  // 先判斷任務(wù)是否已經(jīng)存在,存在:跳過(guò),不存在:加入任務(wù)隊(duì)列
  redisClient.zscore(QUEUE_NAME, taskName, (error, task) => {
    if (error) {
      console.log(error);
    } else {
      if (task) {
        console.log('任務(wù)已存在,不新增相同任務(wù)');
        callback(null, task);
      } else {
        redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => {
          if (error) {
            callback(error);
          } else {
            callback(null, result);
          }
        });
      }
    }
  });
}

app.get('/', (req, res) => {
  let taskName = req.query['task-name'];
  addTaskToQueue(taskName, (error, result) => {
    if (error) {
      console.log(error);
    } else {
      res.status(200).send('正在查詢中......');
    }
  });
});

app.listen(9002, () => {
  console.log('生產(chǎn)者服務(wù)監(jiān)聽(tīng)端口:9002');
});
// consumer.js 定時(shí)獲取任務(wù)并執(zhí)行
'use strict';

const redisClient = require('redis').createClient();
const request = require('request');
const schedule = require('node-schedule');

const QUEUE_NAME = 'queue:expmple';
const PARALLEL_TASK_NUMBER = 2; // 并行執(zhí)行任務(wù)數(shù)量

function getTasksFromQueue(callback) {
  // 獲取多個(gè)任務(wù)
  redisClient.zrangebyscore([QUEUE_NAME, 1, new Date().getTime(), 'LIMIT', 0, PARALLEL_TASK_NUMBER], (error, tasks) => {
    if (error) {
      callback(error);
    } else {
      // 將任務(wù)分值設(shè)置為 0,表示正在處理
      if (tasks.length > 0) {
        let tmp = [];
        tasks.forEach((task) => {
          tmp.push(0);
          tmp.push(task);
        });
        redisClient.zadd([QUEUE_NAME].concat(tmp), (error, result) => {
          if (error) {
            callback(error);
          } else {
            callback(null, tasks)
          }
        });
      }
    }
  });
}

function addFailedTaskToQueue(taskName, callback) {
  redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => {
    if (error) {
      callback(error);
    } else {
      callback(null, result);
    }
  });
}

function removeSucceedTaskFromQueue(taskName, callback) {
  redisClient.zrem(QUEUE_NAME, taskName, (error, result) => {
    if (error) {
      callback(error);
    } else {
      callback(null, result);
    }
  })
}

function execTask(taskName) {
  return new Promise((resolve, reject) => {
    let requestOptions = {
      'url': 'http://127.0.0.1:9001',
      'method': 'GET',
      'timeout': 5000
    };
    request(requestOptions, (error, response, body) => {
      if (error) {
        resolve('failed');
        console.log(error);
        addFailedTaskToQueue(taskName, (error) => {
          if (error) {
            console.log(error);
          } else {

          }
        });
      } else {
        try {
          body = typeof body !== 'object' ? JSON.parse(body) : body;
        } catch (error) {
          resolve('failed');
          console.log(error);
          addFailedTaskToQueue(taskName, (error, result) => {
            if (error) {
              console.log(error);
            } else {

            }
          });
          return;
        }
        if (body.status !== 200) {
          resolve('failed');
          addFailedTaskToQueue(taskName, (error, result) => {
            if (error) {
              console.log(error);
            } else {

            }
          });
        } else {
          resolve('succeed');
          removeSucceedTaskFromQueue(taskName, (error, result) => {
            if (error) {
              console.log(error);
            } else {

            }
          });
        }
      }
    });
  });
}

// 定時(shí),每隔 5 秒獲取新的任務(wù)來(lái)執(zhí)行
let job = schedule.scheduleJob('*/5 * * * * *', () => {
  console.log('獲取新任務(wù)');
  getTasksFromQueue((error, tasks) => {
    if (error) {
      console.log(error);
    } else {
      if (tasks.length > 0) {
        console.log(tasks);

        Promise.all(tasks.map(execTask))
        .then((results) => {
          console.log(results);
        })
        .catch((error) => {
          console.log(error);
        });
        
      }
    }
  });
});

相關(guān)文章

  • 一文詳解NPM如何換源

    一文詳解NPM如何換源

    在每一次的實(shí)際開(kāi)發(fā)過(guò)程中我們都會(huì)下載相關(guān)的依賴包,最官方的是 npm,但是該服務(wù)器對(duì)于國(guó)內(nèi)開(kāi)發(fā)者來(lái)說(shuō),下載起來(lái)是比較慢的,所以我們需要換源,下面這篇文章主要給大家介紹了關(guān)于NPM如何換源的相關(guān)資料,需要的朋友可以參考下
    2023-02-02
  • Nodejs 構(gòu)建Cluster集群多線程Worker threads

    Nodejs 構(gòu)建Cluster集群多線程Worker threads

    這篇文章主要為大家介紹了Nodejs 構(gòu)建Cluster集群多線程Worker threads示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-10-10
  • Nodejs express框架一個(gè)工程中同時(shí)使用ejs模版和jade模版

    Nodejs express框架一個(gè)工程中同時(shí)使用ejs模版和jade模版

    這篇文章主要介紹了Nodejs express框架一個(gè)工程中同時(shí)使用ejs模版和jade模版 的相關(guān)資料,需要的朋友可以參考下
    2015-12-12
  • node如何實(shí)現(xiàn)cmd彈窗交互之inquirer

    node如何實(shí)現(xiàn)cmd彈窗交互之inquirer

    這篇文章主要介紹了node如何實(shí)現(xiàn)cmd彈窗交互之inquirer問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-10-10
  • node刪除、復(fù)制文件或文件夾示例代碼

    node刪除、復(fù)制文件或文件夾示例代碼

    這篇文章主要給大家介紹了關(guān)于node刪除、復(fù)制文件或文件夾的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用node具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2019-08-08
  • Node.js中Swagger的使用指南詳解

    Node.js中Swagger的使用指南詳解

    Swagger(目前用OpenAPI?Specification代替)是一個(gè)用于設(shè)計(jì)、構(gòu)建、記錄和使用REST?API的強(qiáng)大工具,本文將探討使用Swagger的一些關(guān)鍵技巧,需要的可以參考一下
    2024-01-01
  • nodejs實(shí)現(xiàn)用戶登錄路由功能

    nodejs實(shí)現(xiàn)用戶登錄路由功能

    這篇文章主要介紹了nodejs中實(shí)現(xiàn)用戶登錄路由功能,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2019-05-05
  • NodeJS和瀏覽器中this關(guān)鍵字的不同之處

    NodeJS和瀏覽器中this關(guān)鍵字的不同之處

    這篇文章主要給大家介紹了關(guān)于NodeJS和瀏覽器中this關(guān)鍵字不同的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2021-03-03
  • node.js下when.js 的異步編程實(shí)踐

    node.js下when.js 的異步編程實(shí)踐

    這篇文章主要介紹了node.js下when.js 的異步編程實(shí)踐,需要的朋友可以參考下
    2014-12-12
  • nvm管理node版本的詳細(xì)圖文教程

    nvm管理node版本的詳細(xì)圖文教程

    nvm全英文也叫node.js version management,是一個(gè)nodejs的版本管理工具,下面這篇文章主要給大家介紹了關(guān)于nvm管理node版本的詳細(xì)圖文教程,文中通過(guò)圖文介紹的非常詳細(xì),需要的朋友可以參考下
    2022-12-12

最新評(píng)論