Apache Airflow 快速入門教程應(yīng)用場景分析
Apache Airflow已經(jīng)成為Python生態(tài)系統(tǒng)中管道編排的事實(shí)上的庫。與類似的解決方案相反,由于它的簡單性和可擴(kuò)展性,它已經(jīng)獲得了普及。在本文中,我將嘗試概述它的主要概念,并讓您清楚地了解何時以及如何使用它。
Airflow應(yīng)用場景
想象一下,你想要構(gòu)建一個機(jī)器學(xué)習(xí)管道,它由以下幾個步驟組成:
- 從基于云的存儲中讀取圖像數(shù)據(jù)集
- 處理圖像
- 使用下載的圖像訓(xùn)練深度學(xué)習(xí)模型
- 將訓(xùn)練好的模型上傳到云端
- 部署模型
你將如何安排和自動化這個工作流程?Cron作業(yè)是一個簡單的解決方案,但它也帶來了許多問題。最重要的是,它們不允許你有效地擴(kuò)展。Airflow提供了輕松調(diào)度和擴(kuò)展復(fù)雜數(shù)據(jù)流程編排的能力,另一方面,它還能夠在故障后自動重新運(yùn)行它們,管理它們的依賴關(guān)系,并使用日志和儀表板監(jiān)視它們。
在構(gòu)建上述數(shù)據(jù)流之前,讓我們先了解Apache Airflow 的基本概念。
Airflow 簡介
Apache Airflow 是一個開源的平臺,用于編排、調(diào)度和監(jiān)控工作流,工作流是由一系列任務(wù)(Tasks)組成的,這些任務(wù)可以是數(shù)據(jù)處理、數(shù)據(jù)分析、機(jī)器學(xué)習(xí)模型訓(xùn)練、文件傳輸?shù)雀鞣N操作。因此,它是ETL和MLOps用例的理想解決方案。示例用例包括:
- 從多個數(shù)據(jù)源提取數(shù)據(jù),對其進(jìn)行聚合、轉(zhuǎn)換,并將其存儲在數(shù)據(jù)倉庫中。
- 從數(shù)據(jù)中提取見解并將其顯示在分析儀表板中
- 訓(xùn)練、驗(yàn)證和部署機(jī)器學(xué)習(xí)模型
核心組件
在默認(rèn)版本中安裝Apache Airflow 時,你將看到四個不同的組件。
- Webserver: Webserver是Airflow的用戶界面(UI),它允許您在不需要CLI或API的情況下與之交互。從那里可以執(zhí)行和監(jiān)視管道,創(chuàng)建與外部系統(tǒng)的連接,檢查它們的數(shù)據(jù)集等等。
- 執(zhí)行器:執(zhí)行器是管道運(yùn)行的機(jī)制。有許多不同類型的管道在本地運(yùn)行,在單個機(jī)器中運(yùn)行,或者以分布式方式運(yùn)行。一些例子是LocalExecutor, SequentialExecutor, CeleryExecutor和KubernetesExecutor
- 調(diào)度器:調(diào)度器負(fù)責(zé)在正確的時間執(zhí)行不同的任務(wù),重新運(yùn)行管道,回填數(shù)據(jù),確保任務(wù)完成等。
- PostgreSQL:存儲所有管道元數(shù)據(jù)的數(shù)據(jù)庫。這通常是Postgres,但也支持其他SQL數(shù)據(jù)庫。
安裝Airflow最簡單的方法是使用docker compose。你可以從這里下載官方的docker撰寫文件:
$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml'
基本概念
要學(xué)習(xí)Apache Airflow,必須熟悉它的主要概念,這些概念可能有點(diǎn)難理解,讓我們試著揭開它們的神秘面紗。
DAGs
所有管道都定義為有向無環(huán)圖(dag)。每次執(zhí)行DAG時,都會創(chuàng)建一個單獨(dú)的運(yùn)行。每個DAG運(yùn)行都是獨(dú)立的,并且包含一個關(guān)于DAG執(zhí)行階段的狀態(tài)。這意味著相同的dag可以并行執(zhí)行多次。
要實(shí)例化DAG,可以使用DAG函數(shù)或與上下文管理器一起使用,如下所示:
from airflow import DAG with DAG( "mlops", default_args={ "retries": 1, }, schedule=timedelta(days=1), start_date=datetime(2023, 1, 1) ) as dag: # dag code goes here
上下文管理器接受一些關(guān)于DAG的全局變量和一些默認(rèn)參數(shù)。默認(rèn)參數(shù)被傳遞到所有任務(wù)中,并且可以在每個任務(wù)的基礎(chǔ)上重寫。完整的參數(shù)列表可以在官方文檔中找到。
在本例中,我們定義DAG將從2023年1月1日開始,并且每天執(zhí)行一次。retries參數(shù)確保在可能出現(xiàn)故障后重新運(yùn)行一次。
task(任務(wù))
DAG的每個節(jié)點(diǎn)表示一個Task,即一段單獨(dú)的代碼。每個任務(wù)可能有一些上游和下游依賴項。這些依賴關(guān)系表示任務(wù)如何相互關(guān)聯(lián)以及它們應(yīng)該以何種順序執(zhí)行。每當(dāng)初始化一個新的DAG運(yùn)行時,所有任務(wù)都初始化為Task實(shí)例。這意味著每個Task實(shí)例都是給定任務(wù)的特定運(yùn)行。
operator(任務(wù)模板)
操作符可以被視為預(yù)定義任務(wù)的模板,因?yàn)樗鼈兎庋b了樣板代碼并抽象了它們的大部分邏輯。常見的操作符有BashOperator、PythonOperator、MySqlOperator、S3FileTransformOperator。我們看到,操作符可以定義遵循特定模式的任務(wù)。例如,MySqlOperator創(chuàng)建任務(wù)來執(zhí)行SQL查詢,而BashOperator執(zhí)行bash腳本。
操作符在DAG上下文管理器中定義,如下所示。下面的代碼創(chuàng)建了兩個任務(wù),一個執(zhí)行bash命令,另一個執(zhí)行MySQL查詢。
with DAG( "tutorial" ) as dag: task1 = BashOperator( task_id="print_date", bash_command="date", ) task2 = MySqlOperator( task_id="load_table", sql="/scripts/load_table.sql" )
任務(wù)依賴
為了形成DAG的結(jié)構(gòu),我們需要定義每個任務(wù)之間的依賴關(guān)系。一種方法是使用>>符號,如下所示:
task1 >> task2 >> task3 # 一個任務(wù)有多個依賴 task1 >> [task2, task3] # 也可以使用set_downstream, set_upstream t1.set_downstream([t2, t3])
xcom
xcom,或相互通信,負(fù)責(zé)任務(wù)之間的通信。xcom對象可以在任務(wù)之間推拉數(shù)據(jù)。更具體地說,它們將數(shù)據(jù)推入元數(shù)據(jù)數(shù)據(jù)庫,其他任務(wù)可以從中提取數(shù)據(jù)。這就是為什么可以通過它們傳遞的數(shù)據(jù)量是有限的。但是,如果需要傳輸大數(shù)據(jù),則可以使用合適的外部數(shù)據(jù)存儲,例如對象存儲或NoSQL數(shù)據(jù)庫。
看看下面的代碼。這兩個任務(wù)使用ti參數(shù)(任務(wù)實(shí)例的縮寫)通過xcom進(jìn)行通信。train_model任務(wù)將model_path推入元數(shù)據(jù)數(shù)據(jù)庫,元數(shù)據(jù)由deploy_model任務(wù)拉出。
dag = DAG( 'mlops_dag', ) def train_model(ti): model_path = train_and_save_model() ti.xcom_push(key='model_path', value=model_path) def deploy_model(ti): model_path = ti.xcom_pull(key='model_path', task_ids='train_model') deploy_trained_model(model_path) train_model_task = PythonOperator( task_id='train_model', python_callable=train_model, dag=dag ) deploy_model_task = PythonOperator( task_id='deploy_model', python_callable=deploy_model, dag=dag ) train_model_task >> deploy_model_task
Taskflow
Taskflow API是一種使用Python裝飾器@task來定義任務(wù)的簡單方法。如果所有任務(wù)的邏輯都可以用Python編寫,那么一個簡單的注釋就可以定義一個新任務(wù)。Taskflow自動管理其他任務(wù)之間的依賴關(guān)系和通信。
使用Taskflow API,我們可以用@dag裝飾器初始化DAG。下面是使用Tashflow示例:
@dag( start_date=datetime(2023, 1, 1), schedule_interval='@daily' ) def mlops(): @task def load_data(): . . . return df @task def preprocessing(data): . . . return data @task def fit(data): return None df = load_data() data = preprocessing(df) model = fit(data) dag = mlops()
注意,任務(wù)之間的依賴關(guān)系是通過每個函數(shù)參數(shù)隱含的。這里我們是簡單的連接順序,但實(shí)際可以變得復(fù)雜得多。Taskflow API還解決了任務(wù)之間的通信問題,因此使用xcom的需求有限。
調(diào)度
作業(yè)調(diào)度是Airflow的核心功能之一。這可以使用schedule_interval參數(shù)完成,該參數(shù)接收cron表達(dá)式,表示日期時間對象,或預(yù)定義變量,如@hour, @daily等。更靈活的方法是使用最近添加的時間表,它支持使用Python定義自定義時間表。
下面是如何使用schedule_interval參數(shù)的示例。以下DAG將每天執(zhí)行。
@dag( start_date=datetime(2023,1,1), schedule_interval = '@daily', catchup =False ) def my_dag(): pass
關(guān)于調(diào)度,需要了解兩個非常重要的概念:回填(backfill)和追趕(catchup)。
一旦我們定義了DAG,我們就設(shè)置了開始日期和計劃間隔。如果catchup=True,則Airflow 將為從開始日期到當(dāng)前日期的所有計劃間隔創(chuàng)建DAG運(yùn)行。如果catchup=False,氣流將只從當(dāng)前日期調(diào)度運(yùn)行。
回填擴(kuò)展了這個想法,使我們能夠在CLI中創(chuàng)建過去的運(yùn)行,而不管catchup參數(shù)的值:
$ airflow backfill -s <START_DATE> -e <END_DATE> <DAG_NAME>
連接
Airflow 提供了一種簡單的方法來配置與外部系統(tǒng)或服務(wù)的連接??梢允褂肬I、作為環(huán)境變量或通過配置文件創(chuàng)建連接。它們通常需要URL、身份驗(yàn)證信息和唯一id。鉤子(Hooks )是一種API,它抽象了與這些外部系統(tǒng)的通信。例如,我們可以通過如下的UI定義一個PostgreSQL連接:
然后使用PostgresHook來建立連接并執(zhí)行我們的查詢:
pg_hook = PostgresHook(postgres_conn_id='postgres_custom') conn = pg_hook.get_conn() cursor = conn.cursor() cursor.execute('create table _mytable (ModelID int, ModelName varchar(255)') cursor.close() conn.close()
高級概念
為了使本教程盡可能完整,我需要提到一些更高級的概念。我不會詳細(xì)介紹每一個,但我強(qiáng)烈建議你看看他們,如果你想深入掌握Airflow 。
- 分支:分支允許你將任務(wù)劃分為許多不同的任務(wù),如:支持條件處理不同任務(wù)的工作流。最常見的方法是BranchPythonOperator。
- 任務(wù)組:任務(wù)組可以在單個組中組織多個任務(wù)。它是簡化圖形視圖和重復(fù)模式的好工具。
- 動態(tài)包:包和任務(wù)也可以以動態(tài)的方式構(gòu)造。從Airflow 2.3開始,可以在運(yùn)行時創(chuàng)建包和任務(wù),這對于并行和依賴輸入的任務(wù)來說是理想的。氣流也支持Jinja模板,并且是對動態(tài)包非常有用的補(bǔ)充。
- 單元測試和日志記錄:氣流具有運(yùn)行單元測試和記錄信息的專用功能.
Airflow最佳實(shí)踐
在我們看到實(shí)際操作的示例之前,讓我們討論一下大多數(shù)從業(yè)者使用的一些最佳實(shí)踐。
- 分支:分支允許你將任務(wù)劃分為許多不同的任務(wù),如:支持條件處理不同任務(wù)的工作流。最常見的方法是BranchPythonOperator。
- 任務(wù)組:任務(wù)組可以在單個組中組織多個任務(wù)。它是簡化圖形視圖和重復(fù)模式的好工具。
- 動態(tài)包:包和任務(wù)也可以以動態(tài)的方式構(gòu)造。從Airflow 2.3開始,可以在運(yùn)行時創(chuàng)建包和任務(wù),這對于并行和依賴輸入的任務(wù)來說是理想的。氣流也支持Jinja模板,并且是對動態(tài)包非常有用的補(bǔ)充。
- 單元測試和日志記錄:氣流具有運(yùn)行單元測試和記錄信息的專用功能.
到此這篇關(guān)于Apache Airflow 快速入門教程的文章就介紹到這了,更多相關(guān)Apache Airflow 入門內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
linux輸入yum后提示: -bash: /usr/bin/yum: No such file or director
在本篇文章里小編給大家整理的是關(guān)于linux輸入yum后提示: -bash: /usr/bin/yum: No such file or directory的解決方法,有需要的朋友們參考下。2019-11-11Ubuntu環(huán)境編譯安裝PHP和Nginx的方法
這篇文章主要介紹了Ubuntu環(huán)境編譯安裝PHP和Nginx的方法,較為詳細(xì)的分析了Ubuntu環(huán)境編譯安裝PHP和Nginx的具體步驟、相關(guān)命令與操作技巧,需要的朋友可以參考下2019-08-08centos7.2.1511安裝jdk1.8.0_151及mysql5.6.38的方法
這篇文章主要介紹了centos7.2.1511安裝jdk1.8.0_151及mysql5.6.38的方法,較為詳細(xì)的講述了centos7.2.1511安裝jdk1.8.0_151及mysql5.6.38的具體步驟與相關(guān)設(shè)置技巧,需要的朋友可以參考下2018-01-01淺析centos 7 自帶的 php 5.4升級為 5.6的方法
這篇文章主要介紹了centos 7 自帶的 php 5.4升級為 5.6的方法,需要的朋友可以參考下2018-12-12安裝redhat 8.0紅帽系統(tǒng)的圖文教程(小白必備)
這篇文章主要介紹了安裝redhat 8.0紅帽系統(tǒng)的圖文教程(小白必備),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-12-12