Havo oqimi: kamroq ma'lum maslahatlar, fokuslar va eng yaxshi amaliyotlar

Siz ishlatadigan barcha vositalar bilan ma'lum narsalar mavjud, ular uzoq vaqt ishlatilgan bo'lsa ham, siz bilmay qolasiz. Va buni bilganingizdan so'ng, siz o'z mijozingizga "bundan oldin buni yaxshiroq qilish mumkin emasligini" aytganingizdek, "oldin buni bilgan bo'lar edim" ga o'xshaysiz. Boshqa vosita kabi havo oqimi farq qilmaydi, sizning hayotingizni osonlashtiradigan va DAG rivojlanishini qiziqarli qiladigan ba'zi yashirin toshlar mavjud.

Siz ulardan ba'zilarini bilishingiz mumkin va agar siz ularni bilsangiz, demak siz PROsizsiz.

(1) DAG kontekst menejeri bilan

Vazifaga dag = dag qo'shishni unutganingizda va havo oqimi bilan bog'liq xato qilsangiz, o'zingizdan g'azablanganmisiz? Ha, har bir vazifa uchun qo'shishni unutish oson. Shuningdek, quyidagi misolda ko'rsatilgan parametrni qo'shish ortiqcha (masalan_dag.py fayli):

Yuqoridagi misolda (example_dag.py fayli) atigi 2 ta vazifa mavjud, ammo agar sizda 10 yoki undan ko'p bo'lsa, qisqartirish yanada ravshan bo'ladi. Buning oldini olish uchun Airflow DAG-ni kontekst menejerlar sifatida yuqoridagi misolda ko'rsatilgan (masalan_dag_with_context.py) yuqoridagi misolda ko'rsatilgandek avtomatik ravishda yangi DAG-ga yangi operatorlarni tayinlash uchun foydalanishingiz mumkin.

(2) Vazifaga bog'liqlikni o'rnatish uchun ro'yxat yordamida

Quyidagi rasmda ko'rsatilgandek DAG yaratmoqchi bo'lsangiz, vazifa bog'liqligini belgilashda vazifa nomlarini takrorlashingiz kerak bo'ladi.

Yuqoridagi kod parchasida ko'rsatilgandek, vazifa bog'liqligini o'rnatishning odatiy usulidan foydalanish task_two va end uch marta takrorlanganligini bildiradi. Xuddi shu natijaga yanada oqlangan tarzda erishish uchun buni python listlari yordamida almashtirish mumkin.

(3) Takrorlashdan saqlanish uchun odatiy dalillardan foydalaning

Havo oqimi, DAGdagi barcha vazifalar uchun mavjud bo'lishi mumkin bo'lgan parametrlar lug'atini berishga imkon beradi.

Masalan, DataReply-da biz barcha DataWareshouse-ga tegishli DAG-lar uchun BigQuery-dan foydalanamiz va har bir vazifaga teg, bigquery_conn_id kabi parametrlarni berish o'rniga shunchaki indefault_args lug'atini quyidagi DAG-da ko'rsatamiz.

Bu shunchaki DAG ishlamay qolishi o'rniga shaxsiy vazifalar muvaffaqiyatsizligi to'g'risida ogohlantirishni xohlaganingizda foydalidir, bu haqda so'nggi blogimda Airflow-dagi sustlik ogohlantirishlarini integratsiyalash to'g'risidagi blogimda aytib o'tgan edim.

(4) "params" argumenti

"Parametrlar" - bu shablonlarda kirish mumkin bo'lgan DAG darajasi parametrlarining lug'ati. Ushbu parametrlarni vazifa darajasida bekor qilish mumkin.

Bu juda foydali dalil va men shaxsan uni juda ko'p ishlatganman, chunki unga params.param_name-dan foydalanib, jinja shablonlari yordamida andazali maydonga kirish mumkin. Bunga misol quyidagicha:

Qattiq kodlangan qiymatlar o'rniga parametrlangan DAGni yozishni osonlashtiradi. Yuqoridagi misollarda ko'rsatilgandek parametrlar lug'atini 3 joyda aniqlash mumkin: (1) DAG ob'ektida (2) default_args lug'atida (3) Har bir vazifa.

(5) Ulanishlarda sezgir ma'lumotlarni saqlash

Ko'pgina foydalanuvchilar buni bilishadi, lekin men DAG ichidagi oddiy matnda saqlangan parollarni ko'rdim. Yaxshilik uchun - buni qilmang. Siz DAG-laringizni umumiy omborxonada saqlash uchun etarli darajada ishonch bilan yozishingiz kerak.

Odatiy bo'lib, havo oqimi ulanish uchun parollarni metadata ma'lumotlar bazasida oddiy matnda saqlaydi. Havo oqimini o'rnatish paytida kripto paketi juda tavsiya etiladi va uni apache-airflow [crypto] pip o'rnatish orqali amalga oshirish mumkin.

Keyin unga quyidagicha osongina kirishingiz mumkin:

airflow.hooks.base_hook-dan import BaseHook-dan
slack_token = BaseHook.get_connection ('sustlik'). parol

(6) DAG-dagi havo oqimi o'zgaruvchilar sonini cheklang

Havo oqimi o'zgaruvchilari Metadata ma'lumotlar bazasida saqlanadi, shuning uchun har qanday o'zgaruvchiga murojaat qilish Metadata ma'lumotlar bazasiga ulanishni anglatadi. Sizning DAG fayllaringiz har X soniyada tahlil qilinadi. O'zingizning DAG-da ko'p o'zgaruvchidan foydalanish (va yomonroq default_args), sizning ma'lumotlar bazasiga ruxsat berilgan ulanishlar sonini to'ydirishingiz mumkin degan ma'noni anglatadi.

Bunday vaziyatni oldini olish uchun siz JSON qiymatiga ega bitta Airflow o'zgaruvchisidan foydalanishingiz mumkin. Havo oqimi o'zgaruvchisi JSON qiymatini o'z ichiga olishi mumkinligi sababli, siz DAG konfiguratsiyasini quyidagi rasmda ko'rsatilgandek bitta o'zgaruvchida saqlashingiz mumkin:

Ushbu ekran rasmida ko'rsatilgandek siz qiymatlarni alohida havo oqimi parametrlarida yoki bitta havo oqimi o'zgaruvchisi ostida JSON maydoni sifatida saqlashingiz mumkin.

Keyin ularga quyida ko'rsatilgandek, tavsiya etilgan usul ostida kirishingiz mumkin:

(7) "Kontekst" lug'ati

Foydalanuvchilar PythonOperator-ni chaqirish funktsiyasidan foydalanganda ko'pincha kontekst lug'atining tarkibini unutishadi.

Kontekstda vazifa instansiyasi bilan bog'liq ob'ektlarga havolalar mavjud va ular API makros bo'limi ostida hujjatlashtirilgan, chunki ular andoza qilingan maydonlarda ham mavjud.

{
      'dag': task.dag,
      "ds": ds,
      'next_ds': next_ds,
      'next_ds_nodash': next_ds_nodash,
      "prev_ds": oldingi,
      'prev_ds_nodash': prev_ds_nodash,
      'ds_nodash': ds_nodash,
      'ts': ts,
      'ts_nodash': ts_nodash,
      'ts_nodash_with_tz': ts_nodash_with_tz,
      'kecha_ds': kecha_desh,
      'kecha_ds_nodash': kecha_ds_nodash,
      'ertaga_ds': ertaga_ds,
      'ertaga_ds_nodash': ertaga_ds_nodash,
      "END_DATE": ds,
      'tugash kuni': ds,
      'dag_run': dag_run,
      'run_id': run_id,
      'bajarish_kana': self.exmissions_date,
      'prev_exedom_date': prev_exedom_date,
      'next_ex ijro__ kuni': next_ex ijro__ sanasi,
      "so'nggi_ kun": ds,
      'makros': makroslar,
      "params": paramlar,
      'jadval': jadvallar,
      'task': vazifa,
      'task_instance': o'zini,
      'ti': o'zini,
      'task_instance_key_str': ti_key_str,
      'conf': konfiguratsiya,
      'test_mode': self.test_mode,
      'var': {
          'value': VariableAccessor (),
          'json': O'zgaruvchanJsonAccessor ()
      },
      'inlets': task.inlets,
      'outs': task.outlets,
}

(8) Havo oqimining dinamik vazifalari

Men StackOverflow-da dinamik vazifalarni qanday yaratish haqida ko'plab savollarga javob berdim. Javob oddiy, siz barcha vazifalaringiz uchun noyob task_id yaratishingiz kerak. Quyida bunga qanday erishishning ikkita misoli keltirilgan:

(9) "havo oqimi tashabbusi" o'rniga "havo oqimi yangilangan" ni ishga tushiring

Birinchi Apache Airflow London Uchrashuvidagi nutqida aytilgani uchun Ash Berlinga minnatdorchilik bildiraman.

airflow initdb barcha standart ulanishlarni, jadvallarni va boshqalarni yaratadi, biz foydalana olmaymiz va ishlab chiqarish ma'lumotlar bazamizda istamaymiz. havo oqimi yangilangan shunchaki ma'lumotlar bazasi jadvaliga mavjud bo'lmagan migratsiyalarni amalga oshiradi. (shu jumladan, etishmayotgan jadvallarni yaratish va hk) Shuningdek, har safar ishlatish xavfsiz, u qanday ko'chirilganligini kuzatadi (Alembik modulidan foydalangan holda).

Agar siz ushbu blog postiga qo'shishga arziydigan narsani bilsangiz, menga quyidagi izohlar bo'limida xabar bering. Baxtli havo oqimi :-)