Luchtstroom: minder bekende tips, trucs en praktische tips

Er zijn bepaalde dingen met alle tools die u gebruikt, die u zelfs na lange tijd niet zult weten. En als je het eenmaal weet, zeg je: "Ik wou dat ik dit eerder wist", omdat je je klant al had verteld dat het niet beter kan. Luchtstroom zoals andere tools is niet anders, er zijn een aantal verborgen pareltjes die je leven gemakkelijk kunnen maken en de ontwikkeling van DAG leuk maken.

Je kent ze misschien al en als je ze allemaal kent - nou dan ben je een PRO PRO.

(1) DAG met contextbeheer

Was je geïrriteerd aan jezelf toen je vergat om dag = dag toe te voegen aan je taak en Airflow-foutmelding? Ja, het is gemakkelijk om te vergeten het voor elke taak toe te voegen. Het is ook overbodig om dezelfde parameter toe te voegen zoals in het volgende voorbeeld (file example_dag.py):

Het bovenstaande voorbeeld (voorbeeld_dag.py-bestand) heeft slechts 2 taken, maar als u 10 of meer hebt, wordt de redundantie duidelijker. Om dit te voorkomen, kunt u Airflow DAG's als contextmanagers gebruiken om automatisch nieuwe operators aan die DAG toe te wijzen, zoals getoond in het bovenstaande voorbeeld (example_dag_with_context.py) met de instructie statement.

(2) Lijst gebruiken om taakafhankelijkheden in te stellen

Als u de DAG wilt maken die lijkt op die in de onderstaande afbeelding, moet u taaknamen herhalen bij het instellen van taakafhankelijkheden.

Zoals in het bovenstaande codefragment wordt getoond, zou het gebruik van onze normale manier om taakafhankelijkheden in te stellen betekenen dat task_two en end 3 keer worden herhaald. Dit kan worden vervangen met behulp van python-lijsten om hetzelfde resultaat op een elegantere manier te bereiken.

(3) Gebruik standaardargumenten om herhaling van argumenten te voorkomen

Luchtstroom waardoor een woordenboek met parameters kan worden doorgegeven dat beschikbaar zou zijn voor alle taken in die DAG.

Bij DataReply gebruiken we bijvoorbeeld BigQuery voor al onze DataWareshouse-gerelateerde DAG's en in plaats van parameters zoals labels, bigquery_conn_id door te geven aan elke taak, geven we het gewoon indefault_args-woordenboek door zoals getoond in de onderstaande DAG.

Dit is ook handig als u meldingen wilt over afzonderlijke taakstoringen in plaats van alleen DAG-storingen die ik al heb genoemd in mijn laatste blogpost over de integratie van slack-meldingen in Airflow.

(4) Het argument "params"

"Params" is een woordenboek met parameters op DAG-niveau die toegankelijk zijn gemaakt in sjablonen. Deze params kunnen op taakniveau worden opgeheven.

Dit is een uiterst nuttig argument en ik heb het persoonlijk veel gebruikt, omdat het toegankelijk is in het sjabloonveld met jinja-sjabloonplanning met behulp van params.param_name. Een voorbeeldgebruik is als volgt:

Het maakt het gemakkelijk voor u om geparametreerde DAG te schrijven in plaats van hardcoderende waarden. Zoals in de bovenstaande voorbeelden wordt getoond, kan het woordenboek van paramen op 3 plaatsen worden gedefinieerd: (1) In DAG-object (2) In woordenboek default_args (3) Elke taak.

(5) Gevoelige gegevens opslaan in Verbindingen

De meeste gebruikers zijn hiervan op de hoogte, maar ik heb nog steeds wachtwoorden in platte tekst in de DAG gezien. In godsnaam, doe dat niet. U moet uw DAG's zo schrijven dat u voldoende vertrouwen hebt om uw DAG's in een openbare repository op te slaan.

Standaard bewaart Airflow de wachtwoorden voor de verbinding in platte tekst in de metadatadatabase. Het cryptopakket wordt ten zeerste aanbevolen tijdens de installatie van Airflow en kan eenvoudig worden gedaan door pip install apache-airflow [crypto].

U kunt het dan als volgt gemakkelijk openen:

vanuit airflow.hooks.base_hook import BaseHook
slack_token = BaseHook.get_connection ('slack'). wachtwoord

(6) Beperk het aantal luchtstroomvariabelen in uw DAG

Luchtstroomvariabelen worden opgeslagen in Metadata-database, dus elke aanroep van variabelen zou een verbinding met Metadata DB betekenen. Uw DAG-bestanden worden elke X seconden geparseerd. Het gebruik van een groot aantal variabelen in uw DAG (en erger in default_args) kan betekenen dat u mogelijk het aantal toegestane verbindingen met uw database verzadigt.

Om deze situatie te voorkomen, kunt u een enkele Airflow-variabele met JSON-waarde gebruiken. Aangezien een Airflow-variabele JSON-waarde kan bevatten, kunt u al uw DAG-configuratie opslaan in een enkele variabele, zoals weergegeven in de onderstaande afbeelding:

Zoals getoond in deze screenshot kunt u waarden opslaan in afzonderlijke Airflow-variabelen of onder een enkele Airflow-variabele als een JSON-veld

U kunt ze vervolgens openen zoals hieronder weergegeven onder Aanbevolen manier:

(7) Het woordenboek "context"

Gebruikers vergeten vaak de inhoud van het contextwoordenboek wanneer ze PythonOperator gebruiken met een opvraagbare functie.

De context bevat verwijzingen naar gerelateerde objecten voor de taakinstantie en is gedocumenteerd onder de macrosectie van de API, aangezien deze ook beschikbaar zijn voor het sjabloonveld.

{
      'dag': task.dag,
      'ds': ds,
      'next_ds': next_ds,
      'next_ds_nodash': next_ds_nodash,
      'prev_ds': prev_ds,
      'prev_ds_nodash': prev_ds_nodash,
      'ds_nodash': ds_nodash,
      'ts': ts,
      'ts_nodash': ts_nodash,
      'ts_nodash_with_tz': ts_nodash_with_tz,
      'yesterday_ds': gisteren_ds,
      'yesterday_ds_nodash': yesterday_ds_nodash,
      'tomorrow_ds': tomorrow_ds,
      'tomorrow_ds_nodash': tomorrow_ds_nodash,
      'END_DATE': ds,
      'einddatum': ds,
      'dag_run': dag_run,
      'run_id': run_id,
      'execution_date': self.execution_date,
      'vorige_uitvoeringsdatum': vorige_uitvoeringsdatum,
      'next_execution_date': next_execution_date,
      'laatste_datum': ds,
      'macro's': macro's,
      'params': params,
      'tabellen': tabellen,
      'taak': taak,
      'task_instance': zelf,
      'ti': zelf,
      'task_instance_key_str': ti_key_str,
      'conf': configuratie,
      'test_mode': self.test_mode,
      'var': {
          'waarde': VariableAccessor (),
          'json': VariableJsonAccessor ()
      },
      'inlaten': task.inlets,
      'verkooppunten': task.outlets,
}

(8) Dynamische luchtstroomtaken genereren

Ik heb veel vragen beantwoord over StackOverflow over hoe dynamische taken te maken. Het antwoord is eenvoudig, u hoeft alleen een unieke taak-id te genereren voor al uw taken. Hieronder staan ​​2 voorbeelden om dit te bereiken:

(9) Voer “airflow upgradedb” uit in plaats van “airflow initdb”

Dank aan Ash Berlin voor deze tip in zijn lezing in de First Apache Airflow London Meetup.

airflow initdb maakt alle standaardverbindingen, grafieken enz. die we misschien niet gebruiken en die we niet in onze productiedatabase willen. airflow upgradedb past in plaats daarvan ontbrekende migraties gewoon toe op de databasetabel. (inclusief het maken van ontbrekende tabellen enz.) Het is ook veilig om elke keer uit te voeren, het volgt welke migraties al zijn toegepast (met behulp van de Alembic-module).

Laat het me in het commentaar hieronder weten als je iets weet dat het waard is om in dit blogbericht toe te voegen. Happy Airflow’ing :-)