Ауа ағыны: Бір суретте мыңдаған тапсырмаларды жоспарлау үшін пайдалану әдісі

Бұл лауазымда мен қалайша мың бір тапсырманы қалай жоспарлауға болатынын талқыламақпын. Мен Airflow деген не және оны қалай орнатуға болатындығына назар аударғым келмейді, бірақ оның орнына бір дагдың ​​ішінде көптеген тапсырмаларды қалай жоспарлауға болатыны туралы сөйлесемін.

Негізінде, ауа ағындары бірнеше DAG болуы үшін жасалған және DAG ішінде жүздеген немесе мың тапсырмалар болуы мүмкін. Сонымен, біз көптеген тапсырмаларды жоспарлағымыз келгенде не болады, бұл шамамен 60000 немесе одан да көп. Бұл мен осы блогта түсіндіргенім.

Мен жұмыс ағындарын автоматтандыру үшін Airflow-мен жұмыс жасаймын. Бірақ менің компаниямда бізде өте үлкен деректер бар, мен Airflow-ның әртүрлі нұсқаларын қолданып көрдім және өте үлкен мәліметтер болғандықтан, менде бір DAG ішінде 70000-ге жуық тапсырмалар бар. Мен Airflow нұсқасының әртүрлі нұсқаларын қолдандым, ал соңғы нұсқасы 5000 тапсырманы жоспарлаушы бола алады, бірақ егер біз одан да көп жоспарлауды жоспарласақ, жоспарлаусыз жұмыс күйінде қалады. Мен кез-келген мәселені тауып, оны қалай шешуге болатынын және нақты себеп не екенін, ақыры осы блогты жазуға болатындығын білдім.

Бұл бір DAG ішінде мыңдаған тапсырмалар болған кезде ауа ағынының қолданылуының бірі. Алдымен біз ауа ағынының 1.10.3 нұсқасын қолдануымыз керек, содан кейін тапсырмалардың көпшілігіне назар аудармаймыз, сондықтан Airflow 1.10.3 нұсқасын қолдануымыз керек. Бұл нұсқаны орнату үшін келесі әрекеттерді орындаңыз:

  • Алдымен біз жаңа пәрменді қолданып жаңа орта құрып, сол ортаны іске қосуымыз керек:
conda жасау -n ауа ағынын_3
Конда ауа ағынын белсендіреді_3
  • 1.10.3 нұсқасымен ауа ағынын орнату үшін келесі пәрменді пайдаланыңыз:
conda install -c conda-forge ауа ағыны == 1.10.3
  • Бұл нұсқа flask≥1.0.9-пен жұмыс істемейтін кейбір нақты талаптарға көз жеткізіңіз, сондықтан егер сізде осы нұсқадан үлкен колбасы болса, келесі пәрменді қолданыңыз:
құбыр орнату колбасы == 1.0.4
pip install funcsigs == 1.0.0 (бұл орнатылуы қажет басқа талаптар)
  • Біз өте көп мөлшерде жұмыс жасағанда сельдерей өндеушісін пайдалану ұсынылады, өйткені біз осы міндеттерді параллельдеуіміз керек және оған сельдерей орындаушыны қолдану арқылы қол жеткізуге болады. Сельдерейді орнату үшін келесі пәрменді қолданыңыз:
тамшуырға балдыркөк салыңыз
  • Сіз жұмысшыларды пайдаланып, балдыркөк орындаушысын пайдалану үшін брокерді белгілеуіңіз керек, мен RabbitMQ-ді брокер ретінде қолданамын. TO орнату брокерінің URL мекен-жайы келесі құрылымды қолдана алады:
broker_url = amqp: // «пайдаланушы аты»: «пароль» @ «host_name»: «порт» /

Мысалға

брокер_url = amqp: // қонақ: қонақ @ localhost: 5672 /
  • Сельдерей Орындаушысының пайдаланушы интерфейсін көру үшін біз Гүлді қолдана отырып, келесі пәрменді орнатамыз:
conda install -c conda-forge гүлі
  • Осыдан кейін мың параллельді орындау үшін бірнеше конфигурацияны өзгерту керек және мыңдаған тапсырмаларды бір суретте жоспарлау керек.
[өзегі]
орындаушы = Сельдерей Эксплорасы параллелизм = 200000 емес__таск_слот_көлем = 100000 dag_concurrency = 100000 max_active_runs_per_dag = 2
[жоспарлаушы]
max_thread = 10 (Бағдарламаңыз бойынша ағындарды көбейту немесе азайту арқылы қолдана аласыз)

Міне, бірнеше тапсырманы бір суретке жоспарлағыңыз келсе, бұл негізгі параметрлер. Сіз оны параллельде қанша максималды DAG жұмыс істегіңіз келетінін және бір DAG ішінде қанша тапсырманың бар екеніне қарай реттеуіңіз керек.

Басты параметр - «Non_pooled_task_slot_count», ол Airflow 1.10.4 нұсқасынан алынып тасталды, сондықтан мен 1.10.3-ті қолданамын, өйткені бұл параметр тапсырмаларды жоспарлауда өте маңызды рөл атқарады.

«Non_pooled_task_slot_count» алып тастағаннан кейінгі негізгі айырмашылық - бұл әдепкі бойынша 128 мәніне орнатылған default_pool қолданады (талапқа сәйкес көбейте алады). «Non_pooled_task_slot_count» негізгі жұмысы - бұл тапсырмаларды жоспарлау және ол default_pool немесе дерекқордағы кез-келген басқа қосылуларға қосылмаған, сондықтан біз оны сіз қалағанша көбейте аламыз, бірақ егер сіз «default_pool» -да ұялар санын көбейтсеңіз. онда ол сізде бар дерекқор қосылымдарына қосылады және бір уақытта параллель жұмыс істейтін 100000 дерекқорға қосыла алмайсыз. Негізінен, «non_pooled_task_slot_count» «default_pool» пайдасына алынып тасталды.

Бұл хабарламада жоспарлаушы неліктен қиын болады, ол кептеліп қалды, көп жұмыс жоспарламайды немесе ештеңе істемей күні бойы жұмыс істейді деген сұраққа жауап бар. Бұл жауаптың бәрінде Airflow 1.10.3 нұсқасын қолдануға бір жауап бар.

Airflow 1.10.3 пайдаланған кезде біз DAG қандай бассейнді пайдалану керектігін көрсетуіміз керек, өйткені ол әдепкі бойынша «default_pool» қолданбайды, сондықтан тапсырмаларды құру кезінде біз para mater бассейнінен = 'defautl_pool' өтуіміз керек. UI (Admin -> бассейндер) көмегімен 'default_pool' құруға болады немесе пәрмен жолымен орындалады:

ауа ағыны-default_pool 128 'әдепкі бассейн'.

DAG үлгісінің мысалы:

datetime импорттау датасынан импорттау, timedelta ауа ағынын DAG ауа ағынынан импорттау. airflow.operators.dummy_operator импорттау DummyOperator
default_args = {'иесі': 'Ауа ағыны', 'байланысты_он_паст': Жалған, 'басталу_күні': airflow.utils.dates.days_ago (2), 'қайталау': 1, 'қайталау_делу': timedelta (минут = 1),}
dag = DAG ('dummy_try1', default_args = default_args, кесте_интервалы = Ешқайсысы)
(50000) диапазондағы i үшін: тапсырмалар = DummyOperator (task_id = '{}'. формат (i), dag = dag, бассейн = 'default_pool)

Барлық нұсқалар арасындағы айырмашылықты төмендегі сілтемеден тексере аласыз:

  • https://github.com/apache/airflow/blob/master/UPDATING.md#airflow-1104