亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

PHP網(wǎng)絡(luò)處理模塊FPM源碼分析

 更新時(shí)間:2023年06月05日 10:38:36   作者:ethread  
這篇文章主要為大家介紹了PHP網(wǎng)絡(luò)處理模塊FPM源碼分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

PHP-FPM源碼分析

一個(gè)請(qǐng)求從瀏覽器到達(dá)PHP腳本執(zhí)行中間有個(gè)必要模塊是網(wǎng)絡(luò)處理模塊,F(xiàn)PM是這個(gè)模塊的一部分,配合fastcgi協(xié)議實(shí)現(xiàn)對(duì)請(qǐng)求的從監(jiān)聽(tīng)到轉(zhuǎn)發(fā)到PHP處理,并將結(jié)果返回這條流程。

FPM采用多進(jìn)程模型,就是創(chuàng)建一個(gè)master進(jìn)程,在master進(jìn)程中創(chuàng)建并監(jiān)聽(tīng)socket,然后fork多個(gè)子進(jìn)程,然后子進(jìn)程各自accept請(qǐng)求,子進(jìn)程在啟動(dòng)后阻塞在accept上,有請(qǐng)求到達(dá)后開(kāi)始讀取請(qǐng)求 數(shù)據(jù),讀取完成后開(kāi)始處理然后再返回,在這期間是不會(huì)接收其它請(qǐng)求的,也就是說(shuō)fpm的子進(jìn)程同時(shí)只能響應(yīng) 一個(gè)請(qǐng)求,只有把這個(gè)請(qǐng)求處理完成后才會(huì)accept下一個(gè)請(qǐng)求,這是一種同步阻塞的模型。master進(jìn)程負(fù)責(zé)管理子進(jìn)程,監(jiān)聽(tīng)子進(jìn)程的狀態(tài),控制子進(jìn)程的數(shù)量。master進(jìn)程與worker進(jìn)程之間通過(guò)共享變量同步信息。

從main函數(shù)開(kāi)始

int main(int argc, char *argv[])
{
    zend_signal_startup();
    // 將全局變量sapi_module設(shè)置為cgi_sapi_module
    sapi_startup(&cgi_sapi_module);
    fcgi_init();
    // 獲取命令行參數(shù),其中php-fpm -D、-i等參數(shù)都是在這里被解析出來(lái)的
    // ...
    cgi_sapi_module.startup(&cgi_sapi_module);
    fpm_init(argc, argv, fpm_config ? fpm_config : CGIG(fpm_config), fpm_prefix, fpm_pid, test_conf, php_allow_to_run_as_root, force_daemon, force_stderr);
    // master進(jìn)程會(huì)在這一步死循環(huán),后面的流程都是子進(jìn)程在執(zhí)行。
    fcgi_fd = fpm_run(&max_requests);
    fcgi_fd = fpm_run(&max_requests);
    request = fpm_init_request(fcgi_fd);
    // accept請(qǐng)求
    // ....
}

main()函數(shù)展現(xiàn)了這個(gè)fpm運(yùn)行完整的框架,可見(jiàn)整個(gè)fpm主要分為三個(gè)部分:

  • 1、運(yùn)行前的fpm_init();
  • 2、運(yùn)行函數(shù)fpm_run();
  • 3、子進(jìn)程accept請(qǐng)求處理。

FPM中的事件監(jiān)聽(tīng)機(jī)制

在詳細(xì)了解fpm工作過(guò)程前,我們要先了解fpm中的事件機(jī)制。在fpm中事件的監(jiān)聽(tīng)默認(rèn)使用kqueue來(lái)實(shí)現(xiàn),關(guān)于kqueue的介紹可以看看我之前整理的這篇文章kqueue用法簡(jiǎn)介。

// fpm中的事件結(jié)構(gòu)體
struct fpm_event_s {
    // 事件的句柄
    int fd;
    // 下一次觸發(fā)的事件
    struct timeval timeout;
    // 頻率:多久執(zhí)行一次
    struct timeval frequency;
    // 事件觸發(fā)時(shí)調(diào)用的函數(shù)
    void (*callback)(struct fpm_event_s *, short, void *);
    void *arg;                // 調(diào)用callback時(shí)的參數(shù)
    // FPM_EV_READ:讀;FPM_EV_TIMEOUT:;FPM_EV_PERSIST:;FPM_EV_EDGE:;
    int flags;
    int index;                // 在fd句柄數(shù)組中的索引
    // 事件的類(lèi)型 FPM_EV_READ:讀;FPM_EV_TIMEOUT:計(jì)時(shí)器;FPM_EV_PERSIST:;FPM_EV_EDGE:;
    short which;
};
// 事件隊(duì)列
typedef struct fpm_event_queue_s {
    struct fpm_event_queue_s *prev;
    struct fpm_event_queue_s *next;
    struct fpm_event_s *ev;
} fpm_event_queue;

以fpm_run()中master進(jìn)程注冊(cè)的一個(gè)sp[0]的可讀事件為例:

void fpm_event_loop(int err)
{
    static struct fpm_event_s signal_fd_event;
    // 創(chuàng)建一個(gè)事件:管道sp[0]可讀時(shí)觸發(fā)
    fpm_event_set(&signal_fd_event, fpm_signals_get_fd(), FPM_EV_READ, &fpm_got_signal, NULL);
    // 將事件添加進(jìn)queue
    fpm_event_add(&signal_fd_event, 0);
    // 處理定時(shí)器等邏輯
    // 以阻塞的方式獲取事件
    // module->wait()是一個(gè)接口定義的方法簽名,下面展示kqueue的實(shí)現(xiàn)
    ret = module->wait(fpm_event_queue_fd, timeout);
}
int fpm_event_add(struct fpm_event_s *ev, unsigned long int frequency)
{
    // ...
    // 如果事件是觸發(fā)事件則之間添加進(jìn)queue中
    // 對(duì)于定時(shí)器事件先根據(jù)事件的frequency設(shè)置事件的觸發(fā)頻率和下一次觸發(fā)的事件
    if (fpm_event_queue_add(&fpm_event_queue_timer, ev) != 0) {
        return -1;
    }
    return 0;
}
static int fpm_event_queue_add(struct fpm_event_queue_s **queue, struct fpm_event_s *ev)
{
    // ...
    // 構(gòu)建并將當(dāng)前事件插入事件隊(duì)列queue中
    if (*queue == fpm_event_queue_fd && module->add) {
        // module->add(ev)是一個(gè)接口定義的方法簽名,下面展示kqueue的實(shí)現(xiàn)
        module->add(ev);
    }
    return 0;
}
// kqueue關(guān)于添加事件到kqueue的實(shí)現(xiàn)
static int fpm_event_kqueue_add(struct fpm_event_s *ev) /* {{{ */
{
    struct kevent k;
    int flags = EV_ADD;
    if (ev->flags & FPM_EV_EDGE) {
            flags = flags | EV_CLEAR;
    }
    EV_SET(&k, ev->fd, EVFILT_READ, flags, 0, 0, (void *)ev);
    if (kevent(kfd, &k, 1, NULL, 0, NULL) < 0) {
        zlog(ZLOG_ERROR, "kevent: unable to add event");
        return -1;
    }
    /* mark the event as registered */
    ev->index = ev->fd;
    return 0;
}

FPM中關(guān)于kqueue的實(shí)現(xiàn)

// kqueue關(guān)于從kqueue中監(jiān)聽(tīng)事件的實(shí)現(xiàn)
static int fpm_event_kqueue_wait(struct fpm_event_queue_s *queue, unsigned long int timeout) /* {{{ */
{
    struct timespec t;
    int ret, i;
    /* ensure we have a clean kevents before calling kevent() */
    memset(kevents, 0, sizeof(struct kevent) * nkevents);
    /* convert ms to timespec struct */
    t.tv_sec = timeout / 1000;
    t.tv_nsec = (timeout % 1000) * 1000 * 1000;
    /* wait for incoming event or timeout */
    ret = kevent(kfd, NULL, 0, kevents, nkevents, &t);
    if (ret == -1) {
        /* trigger error unless signal interrupt */
        if (errno != EINTR) {
            zlog(ZLOG_WARNING, "epoll_wait() returns %d", errno);
            return -1;
        }
    }
    /* fire triggered events */
    for (i = 0; i < ret; i++) {
        if (kevents[i].udata) {
            struct fpm_event_s *ev = (struct fpm_event_s *)kevents[i].udata;
            fpm_event_fire(ev);
            /* sanity check */
            if (fpm_globals.parent_pid != getpid()) {
                return -2;
            }
        }
    }
    return ret;
}

fpm_init

fpm_init()負(fù)責(zé)啟動(dòng)前的初始化工作,包括注冊(cè)各個(gè)模塊的銷(xiāo)毀時(shí)用于清理變量的callback。下面只介紹幾個(gè)重要的init。

fpm_conf_init_main

負(fù)責(zé)解析php-fpm.conf配置文件,分配worker pool內(nèi)存結(jié)構(gòu)并保存到全局變量fpm_worker_all_pools中,各worker pool配置解析到 fpm_worker_pool_s->config 中。

所謂worker pool 是fpm可以同時(shí)監(jiān)聽(tīng)多個(gè)端口,每個(gè)端口對(duì)應(yīng)一個(gè)worker pool。

fpm_scoreboard_init_main

為每個(gè)worker pool分配一個(gè)fpm_scoreboard_s結(jié)構(gòu)的內(nèi)存空間scoreboard,用于記錄worker進(jìn)程運(yùn)行信息。

// fpm_scoreboard_s 結(jié)構(gòu)
struct fpm_scoreboard_s {
    union {
        atomic_t lock;
        char dummy[16];
    };
    char pool[32];
    int pm;                    // 進(jìn)程的管理方式 static、dynamic、ondemand
    time_t start_epoch;
    int idle;                // 空閑的worker進(jìn)程數(shù)
    int active;                // 繁忙的worker進(jìn)程數(shù)
    int active_max;            // 最大繁忙進(jìn)程數(shù)
    unsigned long int requests;
    unsigned int max_children_reached;
    int lq;
    int lq_max;
    unsigned int lq_len;
    unsigned int nprocs;
    int free_proc;
    unsigned long int slow_rq;
    struct fpm_scoreboard_proc_s *procs[];
};

fpm_signals_init_main

fpm注冊(cè)自己的信號(hào)量,并設(shè)置監(jiān)聽(tīng)函數(shù)的處理邏輯。

int fpm_signals_init_main() /* {{{ */
{
    struct sigaction act;
    // 創(chuàng)建一個(gè)全雙工套接字
    // 全雙工的套接字是一個(gè)可以讀、寫(xiě)的socket通道[0]和[1],每個(gè)進(jìn)程固定一個(gè)管道。
    // 寫(xiě)數(shù)據(jù)時(shí):管道不滿不會(huì)被阻塞;讀數(shù)據(jù)時(shí):管道里沒(méi)有數(shù)據(jù)會(huì)阻塞(可設(shè)置)
    // 向sp[0]寫(xiě)入數(shù)據(jù)時(shí),sp[0]的讀取將會(huì)被阻塞,sp[1]的寫(xiě)管道會(huì)被阻塞,sp[1]中此時(shí)讀取sp[0]的數(shù)據(jù)
    if (0 > socketpair(AF_UNIX, SOCK_STREAM, 0, sp)) {
        zlog(ZLOG_SYSERROR, "failed to init signals: socketpair()");
        return -1;
    }
    if (0 > fd_set_blocked(sp[0], 0) || 0 > fd_set_blocked(sp[1], 0)) {
        zlog(ZLOG_SYSERROR, "failed to init signals: fd_set_blocked()");
        return -1;
    }
    if (0 > fcntl(sp[0], F_SETFD, FD_CLOEXEC) || 0 > fcntl(sp[1], F_SETFD, FD_CLOEXEC)) {
        zlog(ZLOG_SYSERROR, "falied to init signals: fcntl(F_SETFD, FD_CLOEXEC)");
        return -1;
    }
    memset(&act, 0, sizeof(act));
    act.sa_handler = sig_handler;       // 監(jiān)聽(tīng)到信號(hào)調(diào)用這個(gè)函數(shù)
    sigfillset(&act.sa_mask);
    if (0 > sigaction(SIGTERM,  &act, 0) ||
        0 > sigaction(SIGINT,   &act, 0) ||
        0 > sigaction(SIGUSR1,  &act, 0) ||
        0 > sigaction(SIGUSR2,  &act, 0) ||
        0 > sigaction(SIGCHLD,  &act, 0) ||
        0 > sigaction(SIGQUIT,  &act, 0)) {
        zlog(ZLOG_SYSERROR, "failed to init signals: sigaction()");
        return -1;
    }
    return 0;
}
// 所有信號(hào)共用同一個(gè)處理函數(shù)
static void sig_handler(int signo) /* {{{ */
{
    static const char sig_chars[NSIG + 1] = {
        [SIGTERM] = 'T',
        [SIGINT]  = 'I',
        [SIGUSR1] = '1',
        [SIGUSR2] = '2',
        [SIGQUIT] = 'Q',
        [SIGCHLD] = 'C'
    };
    char s;
    int saved_errno;
    if (fpm_globals.parent_pid != getpid()) {
        return;
    }
    saved_errno = errno;
    s = sig_chars[signo];
    zend_quiet_write(sp[1], &s, sizeof(s)); // 將信息對(duì)應(yīng)的字節(jié)寫(xiě)進(jìn)管道sp[1]端,此時(shí)sp[1]端的讀數(shù)據(jù)會(huì)阻塞;數(shù)據(jù)可以從sp[0]端讀取
    errno = saved_errno;
}

fpm_sockets_init_main

每個(gè)worker pool 開(kāi)啟一個(gè)socket套接字。

fpm_event_init_main

這里啟動(dòng)master的事件管理器。用于管理IO、定時(shí)事件,其中IO事件通過(guò)kqueue、epoll、 poll、select等管理,定時(shí)事件就是定時(shí)器,一定時(shí)間后觸發(fā)某個(gè)事件。同樣,我們以kqueue的實(shí)現(xiàn)為例看下源碼。

int fpm_event_init_main()
{
    // ...
    if (module->init(max) < 0) {
        zlog(ZLOG_ERROR, "Unable to initialize the event module %s", module->name);
        return -1;
    }
    // ...
}
// max用于指定kqueue事件數(shù)組的大小
static int fpm_event_kqueue_init(int max) /* {{{ */
{
    if (max < 1) {
        return 0;
    }
    kfd = kqueue();
    if (kfd < 0) {
        zlog(ZLOG_ERROR, "kqueue: unable to initialize");
        return -1;
    }
    kevents = malloc(sizeof(struct kevent) * max);
    if (!kevents) {
        zlog(ZLOG_ERROR, "epoll: unable to allocate %d events", max);
        return -1;
    }
    memset(kevents, 0, sizeof(struct kevent) * max);
    nkevents = max;
    return 0;
}

fpm_run

fpm_init到此結(jié)束,下面進(jìn)入fpm_run階段,在這個(gè)階段master進(jìn)程會(huì)根據(jù)配置fork出多個(gè)子進(jìn)程然后master進(jìn)程會(huì)進(jìn)入fpm_event_loop(0)函數(shù),并在這個(gè)函數(shù)內(nèi)部死循環(huán),也就是說(shuō)master進(jìn)程將不再執(zhí)行后面的代碼,后面的邏輯全部是子進(jìn)程執(zhí)行的操作。

master進(jìn)程在fpm_event_loop里通過(guò)管道sp來(lái)監(jiān)聽(tīng)子進(jìn)程的各個(gè)事件,同時(shí)也要處理自身產(chǎn)生的一些事件、定時(shí)器等任務(wù),來(lái)響應(yīng)的管理子進(jìn)程。內(nèi)部的邏輯在介紹事件監(jiān)聽(tīng)機(jī)制時(shí)已經(jīng)詳細(xì)說(shuō)過(guò)。

int fpm_run(int *max_requests) /* {{{ */
{
    struct fpm_worker_pool_s *wp;
    /* create initial children in all pools */
    for (wp = fpm_worker_all_pools; wp; wp = wp->next) {
        int is_parent;
        is_parent = fpm_children_create_initial(wp);
        if (!is_parent) {
            goto run_child;
        }
    }
    /* run event loop forever */
    fpm_event_loop(0);
run_child: /* only workers reach this point */
    fpm_cleanups_run(FPM_CLEANUP_CHILD);
    *max_requests = fpm_globals.max_requests;
    return fpm_globals.listening_socket;
}

子進(jìn)程處理請(qǐng)求

回到main函數(shù),fpm_run后面的邏輯都是子進(jìn)程在運(yùn)行。首先會(huì)初始化一個(gè)fpm的request結(jié)構(gòu)的變量,然后子進(jìn)程會(huì)阻塞在fcgi_accept_request(request)函數(shù)上等待請(qǐng)求。關(guān)于fcgi_accept_request函數(shù)就是死循環(huán)一個(gè)socket編程的accept函數(shù)來(lái)接收請(qǐng)求,并將請(qǐng)求數(shù)據(jù)全部取出。

...
// 初始化request
request = fpm_init_request(fcgi_fd);
zend_first_try {
    // accept接收請(qǐng)求
    while (EXPECTED(fcgi_accept_request(request) >= 0)) {
        init_request_info();
        fpm_request_info();
        if (UNEXPECTED(php_request_startup() == FAILURE)) {
            // ...
        }
        if (UNEXPECTED(fpm_status_handle_request())) {
            goto fastcgi_request_done;
        }
        ...
        // 打開(kāi)配置文件中DOCUMENT_ROOT設(shè)置的腳本
        if (UNEXPECTED(php_fopen_primary_script(&file_handle) == FAILURE)) {
            ...
        }
        fpm_request_executing();
        // 執(zhí)行腳本
        php_execute_script(&file_handle);
        ...
    }
    // 銷(xiāo)毀請(qǐng)求request
    fcgi_destroy_request(request);
    // fcgi退出
    fcgi_shutdown();
    if (cgi_sapi_module.php_ini_path_override) {
        free(cgi_sapi_module.php_ini_path_override);
    }
    if (cgi_sapi_module.ini_entries) {
        free(cgi_sapi_module.ini_entries);
    }
} zend_catch {
    ...
} zend_end_try();

以上就是PHP網(wǎng)絡(luò)處理模塊FPM源碼分析的詳細(xì)內(nèi)容,更多關(guān)于PHP網(wǎng)絡(luò)處理模塊FPM的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

最新評(píng)論