亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Apache Airflow 快速入門教程應(yīng)用場景分析

 更新時間:2024年12月03日 12:07:18   作者:夢想畫家  
ApacheAirflow是一個用于編排、調(diào)度和監(jiān)控工作流的開源平臺,適用于ETL和MLOps用例,它通過有向無環(huán)圖(DAG)定義管道,支持任務(wù)依賴關(guān)系、調(diào)度、錯誤處理和日志記錄,本文介紹Apache Airflow 快速入門教程,感興趣的朋友一起看看吧

Apache Airflow已經(jīng)成為Python生態(tài)系統(tǒng)中管道編排的事實(shí)上的庫。與類似的解決方案相反,由于它的簡單性和可擴(kuò)展性,它已經(jīng)獲得了普及。在本文中,我將嘗試概述它的主要概念,并讓您清楚地了解何時以及如何使用它。

Airflow應(yīng)用場景

想象一下,你想要構(gòu)建一個機(jī)器學(xué)習(xí)管道,它由以下幾個步驟組成:

  • 從基于云的存儲中讀取圖像數(shù)據(jù)集
  • 處理圖像
  • 使用下載的圖像訓(xùn)練深度學(xué)習(xí)模型
  • 將訓(xùn)練好的模型上傳到云端
  • 部署模型

你將如何安排和自動化這個工作流程?Cron作業(yè)是一個簡單的解決方案,但它也帶來了許多問題。最重要的是,它們不允許你有效地擴(kuò)展。Airflow提供了輕松調(diào)度和擴(kuò)展復(fù)雜數(shù)據(jù)流程編排的能力,另一方面,它還能夠在故障后自動重新運(yùn)行它們,管理它們的依賴關(guān)系,并使用日志和儀表板監(jiān)視它們。

在構(gòu)建上述數(shù)據(jù)流之前,讓我們先了解Apache Airflow 的基本概念。

Airflow 簡介

Apache Airflow 是一個開源的平臺,用于編排、調(diào)度和監(jiān)控工作流,工作流是由一系列任務(wù)(Tasks)組成的,這些任務(wù)可以是數(shù)據(jù)處理、數(shù)據(jù)分析、機(jī)器學(xué)習(xí)模型訓(xùn)練、文件傳輸?shù)雀鞣N操作。因此,它是ETL和MLOps用例的理想解決方案。示例用例包括:

  • 從多個數(shù)據(jù)源提取數(shù)據(jù),對其進(jìn)行聚合、轉(zhuǎn)換,并將其存儲在數(shù)據(jù)倉庫中。
  • 從數(shù)據(jù)中提取見解并將其顯示在分析儀表板中
  • 訓(xùn)練、驗(yàn)證和部署機(jī)器學(xué)習(xí)模型

核心組件

在默認(rèn)版本中安裝Apache Airflow 時,你將看到四個不同的組件。

  • Webserver: Webserver是Airflow的用戶界面(UI),它允許您在不需要CLI或API的情況下與之交互。從那里可以執(zhí)行和監(jiān)視管道,創(chuàng)建與外部系統(tǒng)的連接,檢查它們的數(shù)據(jù)集等等。
  • 執(zhí)行器:執(zhí)行器是管道運(yùn)行的機(jī)制。有許多不同類型的管道在本地運(yùn)行,在單個機(jī)器中運(yùn)行,或者以分布式方式運(yùn)行。一些例子是LocalExecutor, SequentialExecutor, CeleryExecutor和KubernetesExecutor
  • 調(diào)度器:調(diào)度器負(fù)責(zé)在正確的時間執(zhí)行不同的任務(wù),重新運(yùn)行管道,回填數(shù)據(jù),確保任務(wù)完成等。
  • PostgreSQL:存儲所有管道元數(shù)據(jù)的數(shù)據(jù)庫。這通常是Postgres,但也支持其他SQL數(shù)據(jù)庫。

安裝Airflow最簡單的方法是使用docker compose。你可以從這里下載官方的docker撰寫文件:

$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml'

基本概念

要學(xué)習(xí)Apache Airflow,必須熟悉它的主要概念,這些概念可能有點(diǎn)難理解,讓我們試著揭開它們的神秘面紗。

DAGs

所有管道都定義為有向無環(huán)圖(dag)。每次執(zhí)行DAG時,都會創(chuàng)建一個單獨(dú)的運(yùn)行。每個DAG運(yùn)行都是獨(dú)立的,并且包含一個關(guān)于DAG執(zhí)行階段的狀態(tài)。這意味著相同的dag可以并行執(zhí)行多次。

要實(shí)例化DAG,可以使用DAG函數(shù)或與上下文管理器一起使用,如下所示:

from airflow import DAG
with DAG(
    "mlops",
    default_args={
        "retries": 1,
     },
    schedule=timedelta(days=1),
    start_date=datetime(2023, 1, 1)
) as dag:
# dag code goes here

上下文管理器接受一些關(guān)于DAG的全局變量和一些默認(rèn)參數(shù)。默認(rèn)參數(shù)被傳遞到所有任務(wù)中,并且可以在每個任務(wù)的基礎(chǔ)上重寫。完整的參數(shù)列表可以在官方文檔中找到。

在本例中,我們定義DAG將從2023年1月1日開始,并且每天執(zhí)行一次。retries參數(shù)確保在可能出現(xiàn)故障后重新運(yùn)行一次。

task(任務(wù))

DAG的每個節(jié)點(diǎn)表示一個Task,即一段單獨(dú)的代碼。每個任務(wù)可能有一些上游和下游依賴項。這些依賴關(guān)系表示任務(wù)如何相互關(guān)聯(lián)以及它們應(yīng)該以何種順序執(zhí)行。每當(dāng)初始化一個新的DAG運(yùn)行時,所有任務(wù)都初始化為Task實(shí)例。這意味著每個Task實(shí)例都是給定任務(wù)的特定運(yùn)行。

operator(任務(wù)模板)

操作符可以被視為預(yù)定義任務(wù)的模板,因?yàn)樗鼈兎庋b了樣板代碼并抽象了它們的大部分邏輯。常見的操作符有BashOperator、PythonOperator、MySqlOperator、S3FileTransformOperator。我們看到,操作符可以定義遵循特定模式的任務(wù)。例如,MySqlOperator創(chuàng)建任務(wù)來執(zhí)行SQL查詢,而BashOperator執(zhí)行bash腳本。

操作符在DAG上下文管理器中定義,如下所示。下面的代碼創(chuàng)建了兩個任務(wù),一個執(zhí)行bash命令,另一個執(zhí)行MySQL查詢。

with DAG(
	"tutorial"
) as dag:
    task1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )
    task2 = MySqlOperator(
        task_id="load_table",
        sql="/scripts/load_table.sql"
    )

任務(wù)依賴

為了形成DAG的結(jié)構(gòu),我們需要定義每個任務(wù)之間的依賴關(guān)系。一種方法是使用>>符號,如下所示:

task1 >> task2 >> task3
# 一個任務(wù)有多個依賴
task1 >> [task2, task3]
# 也可以使用set_downstream, set_upstream
t1.set_downstream([t2, t3])

xcom

xcom,或相互通信,負(fù)責(zé)任務(wù)之間的通信。xcom對象可以在任務(wù)之間推拉數(shù)據(jù)。更具體地說,它們將數(shù)據(jù)推入元數(shù)據(jù)數(shù)據(jù)庫,其他任務(wù)可以從中提取數(shù)據(jù)。這就是為什么可以通過它們傳遞的數(shù)據(jù)量是有限的。但是,如果需要傳輸大數(shù)據(jù),則可以使用合適的外部數(shù)據(jù)存儲,例如對象存儲或NoSQL數(shù)據(jù)庫。

看看下面的代碼。這兩個任務(wù)使用ti參數(shù)(任務(wù)實(shí)例的縮寫)通過xcom進(jìn)行通信。train_model任務(wù)將model_path推入元數(shù)據(jù)數(shù)據(jù)庫,元數(shù)據(jù)由deploy_model任務(wù)拉出。

dag = DAG(
    'mlops_dag',
)
def train_model(ti):
    model_path = train_and_save_model()
    ti.xcom_push(key='model_path', value=model_path)
def deploy_model(ti):
    model_path = ti.xcom_pull(key='model_path', task_ids='train_model')
    deploy_trained_model(model_path)
train_model_task = PythonOperator(
    task_id='train_model',
    python_callable=train_model,
    dag=dag
)
deploy_model_task = PythonOperator(
    task_id='deploy_model',
    python_callable=deploy_model,
    dag=dag
)
train_model_task >> deploy_model_task

Taskflow

Taskflow API是一種使用Python裝飾器@task來定義任務(wù)的簡單方法。如果所有任務(wù)的邏輯都可以用Python編寫,那么一個簡單的注釋就可以定義一個新任務(wù)。Taskflow自動管理其他任務(wù)之間的依賴關(guān)系和通信。

使用Taskflow API,我們可以用@dag裝飾器初始化DAG。下面是使用Tashflow示例:

@dag(
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily'
)
def mlops():
    @task
    def load_data():
        . . .
        return df
    @task
    def preprocessing(data):
       . . .
       return data
    @task
    def fit(data): 
        return None
    df = load_data()
    data = preprocessing(df)
    model = fit(data)
dag = mlops()

注意,任務(wù)之間的依賴關(guān)系是通過每個函數(shù)參數(shù)隱含的。這里我們是簡單的連接順序,但實(shí)際可以變得復(fù)雜得多。Taskflow API還解決了任務(wù)之間的通信問題,因此使用xcom的需求有限。

調(diào)度

作業(yè)調(diào)度是Airflow的核心功能之一。這可以使用schedule_interval參數(shù)完成,該參數(shù)接收cron表達(dá)式,表示日期時間對象,或預(yù)定義變量,如@hour, @daily等。更靈活的方法是使用最近添加的時間表,它支持使用Python定義自定義時間表。

下面是如何使用schedule_interval參數(shù)的示例。以下DAG將每天執(zhí)行。

@dag(
    start_date=datetime(2023,1,1),
    schedule_interval = '@daily',
    catchup =False
)
def my_dag():
    pass

關(guān)于調(diào)度,需要了解兩個非常重要的概念:回填(backfill)和追趕(catchup)。

一旦我們定義了DAG,我們就設(shè)置了開始日期和計劃間隔。如果catchup=True,則Airflow 將為從開始日期到當(dāng)前日期的所有計劃間隔創(chuàng)建DAG運(yùn)行。如果catchup=False,氣流將只從當(dāng)前日期調(diào)度運(yùn)行。

回填擴(kuò)展了這個想法,使我們能夠在CLI中創(chuàng)建過去的運(yùn)行,而不管catchup參數(shù)的值:

$ airflow backfill  -s <START_DATE> -e <END_DATE> <DAG_NAME>

連接

Airflow 提供了一種簡單的方法來配置與外部系統(tǒng)或服務(wù)的連接??梢允褂肬I、作為環(huán)境變量或通過配置文件創(chuàng)建連接。它們通常需要URL、身份驗(yàn)證信息和唯一id。鉤子(Hooks )是一種API,它抽象了與這些外部系統(tǒng)的通信。例如,我們可以通過如下的UI定義一個PostgreSQL連接:

然后使用PostgresHook來建立連接并執(zhí)行我們的查詢:

pg_hook = PostgresHook(postgres_conn_id='postgres_custom')
conn = pg_hook.get_conn()
cursor = conn.cursor()
cursor.execute('create table _mytable (ModelID int, ModelName varchar(255)')
cursor.close()
conn.close()

高級概念

為了使本教程盡可能完整,我需要提到一些更高級的概念。我不會詳細(xì)介紹每一個,但我強(qiáng)烈建議你看看他們,如果你想深入掌握Airflow 。

  • 分支:分支允許你將任務(wù)劃分為許多不同的任務(wù),如:支持條件處理不同任務(wù)的工作流。最常見的方法是BranchPythonOperator。
  • 任務(wù)組:任務(wù)組可以在單個組中組織多個任務(wù)。它是簡化圖形視圖和重復(fù)模式的好工具。
  • 動態(tài)包:包和任務(wù)也可以以動態(tài)的方式構(gòu)造。從Airflow 2.3開始,可以在運(yùn)行時創(chuàng)建包和任務(wù),這對于并行和依賴輸入的任務(wù)來說是理想的。氣流也支持Jinja模板,并且是對動態(tài)包非常有用的補(bǔ)充。
  • 單元測試和日志記錄:氣流具有運(yùn)行單元測試和記錄信息的專用功能.

Airflow最佳實(shí)踐

在我們看到實(shí)際操作的示例之前,讓我們討論一下大多數(shù)從業(yè)者使用的一些最佳實(shí)踐。

  • 分支:分支允許你將任務(wù)劃分為許多不同的任務(wù),如:支持條件處理不同任務(wù)的工作流。最常見的方法是BranchPythonOperator。
  • 任務(wù)組:任務(wù)組可以在單個組中組織多個任務(wù)。它是簡化圖形視圖和重復(fù)模式的好工具。
  • 動態(tài)包:包和任務(wù)也可以以動態(tài)的方式構(gòu)造。從Airflow 2.3開始,可以在運(yùn)行時創(chuàng)建包和任務(wù),這對于并行和依賴輸入的任務(wù)來說是理想的。氣流也支持Jinja模板,并且是對動態(tài)包非常有用的補(bǔ)充。
  • 單元測試和日志記錄:氣流具有運(yùn)行單元測試和記錄信息的專用功能.

到此這篇關(guān)于Apache Airflow 快速入門教程的文章就介紹到這了,更多相關(guān)Apache Airflow 入門內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • linux輸入yum后提示: -bash: /usr/bin/yum: No such file or directory的解決方法

    linux輸入yum后提示: -bash: /usr/bin/yum: No such file or director

    在本篇文章里小編給大家整理的是關(guān)于linux輸入yum后提示: -bash: /usr/bin/yum: No such file or directory的解決方法,有需要的朋友們參考下。
    2019-11-11
  • Linux下“/”和“~”的區(qū)別詳解

    Linux下“/”和“~”的區(qū)別詳解

    這篇文章主要介紹了Linux下“/”和“~”的區(qū)別詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-09-09
  • Ubuntu環(huán)境編譯安裝PHP和Nginx的方法

    Ubuntu環(huán)境編譯安裝PHP和Nginx的方法

    這篇文章主要介紹了Ubuntu環(huán)境編譯安裝PHP和Nginx的方法,較為詳細(xì)的分析了Ubuntu環(huán)境編譯安裝PHP和Nginx的具體步驟、相關(guān)命令與操作技巧,需要的朋友可以參考下
    2019-08-08
  • centos7.2.1511安裝jdk1.8.0_151及mysql5.6.38的方法

    centos7.2.1511安裝jdk1.8.0_151及mysql5.6.38的方法

    這篇文章主要介紹了centos7.2.1511安裝jdk1.8.0_151及mysql5.6.38的方法,較為詳細(xì)的講述了centos7.2.1511安裝jdk1.8.0_151及mysql5.6.38的具體步驟與相關(guān)設(shè)置技巧,需要的朋友可以參考下
    2018-01-01
  • apache虛擬主機(jī)的配置指南

    apache虛擬主機(jī)的配置指南

    本文介紹了apache虛擬主機(jī)的配置的方法,要配置apache的虛擬主機(jī),我們需要分以下幾步進(jìn)行:檢查apache虛擬主機(jī)模塊,開啟apache虛擬主機(jī)功能,httpd-vhosts.conf文件詳解,根據(jù)IP配置虛擬主機(jī),根據(jù)端口配置虛擬主機(jī),根據(jù)域名配置虛擬主機(jī),有需要的小伙伴參考下
    2015-01-01
  • 什么是Linux軟鏈接和Linux硬鏈接

    什么是Linux軟鏈接和Linux硬鏈接

    這篇文章主要為大家詳細(xì)介紹了什么是Linux軟鏈接和Linux硬鏈接,在默認(rèn)情況下,ln命令產(chǎn)生硬鏈接,感興趣的小伙伴們可以參考一下
    2016-05-05
  • 在 Ubuntu 上安裝 Protobuf 3 的教程詳解

    在 Ubuntu 上安裝 Protobuf 3 的教程詳解

    這篇文章主要介紹了在 Ubuntu 上安裝 Protobuf 3遇到問題及解決方法,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價值 ,需要的朋友可以參考下
    2019-06-06
  • 解決Unixbench安裝報錯信息的問題

    解決Unixbench安裝報錯信息的問題

    下面小編就為大家分享一篇解決Unixbench安裝報錯信息的問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2017-12-12
  • 淺析centos 7 自帶的 php 5.4升級為 5.6的方法

    淺析centos 7 自帶的 php 5.4升級為 5.6的方法

    這篇文章主要介紹了centos 7 自帶的 php 5.4升級為 5.6的方法,需要的朋友可以參考下
    2018-12-12
  • 安裝redhat 8.0紅帽系統(tǒng)的圖文教程(小白必備)

    安裝redhat 8.0紅帽系統(tǒng)的圖文教程(小白必備)

    這篇文章主要介紹了安裝redhat 8.0紅帽系統(tǒng)的圖文教程(小白必備),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-12-12

最新評論