Tasks in Apache Airflow cannot share results through global variables; instead, Airflow provides Variables and XCom (cross-communication) for this purpose. A value returned by one task often needs to be read by another, so Airflow stores returned values externally, as key-value pairs in the metadata database.
XCom and Variables are similar overall: both are stored externally in a key-value format. The main difference is that 'Variables do not store conditions that led to a value being produced', whereas every XCom entry is tied to the task instance that created it.
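To make the Variable side concrete, here is a minimal sketch using Airflow's `Variable` model; the key name and values are made up for illustration:

```python
from airflow.models import Variable

# Store a global, context-free setting (hypothetical key and value)
Variable.set("data_dir", "/data/incoming")

# Read it back anywhere; default_var is returned if the key is missing
data_dir = Variable.get("data_dir", default_var="/tmp")
```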
XCom should not be used for passing large data sets between tasks. The maximum size of an XCom value is determined by which metadata database you are using:
- Postgres: 1 GB
- SQLite: 2 GB
- MySQL: 64 KB
What if the data we need to pass is larger?
The answer is to use intermediary data storage: save the data to some system external to Airflow at the end of one task, then read it back from that system in the next task. This is commonly done with cloud file storage such as S3, GCS, or Azure Blob Storage, but it could also be done by loading the data into a temporary or persistent database table.
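As a sketch of the S3 variant, assuming Airflow 1.x's `S3Hook`, a preconfigured connection id `my_aws_conn`, and a made-up bucket and object key:

```python
from airflow.hooks.S3_hook import S3Hook  # Airflow 1.x import path

BUCKET = "my-intermediary-bucket"   # assumed bucket name
KEY = "staging/my_dag/payload.csv"  # assumed object key

def produce(**kwargs):
    # End of task 1: write the (possibly large) data to S3 instead of XCom
    data = "col_a,col_b\n1,2\n"  # stand-in for a dataset too big for XCom
    S3Hook(aws_conn_id="my_aws_conn").load_string(
        string_data=data, key=KEY, bucket_name=BUCKET, replace=True
    )

def consume(**kwargs):
    # Start of task 2: read the same data back from S3
    data = S3Hook(aws_conn_id="my_aws_conn").read_key(key=KEY, bucket_name=BUCKET)
    print(len(data))
```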
Scenario: Let's assume we have a function that calculates a multiplication, and we need to write its result to a flat file.
Solution: a dictionary-valued **kwargs argument should be added to both the pusher and the puller function. In the pusher function, named 'calculate' in our scenario, **kwargs is declared alongside the main arguments; inside the function we call the task instance's .xcom_push() method to produce a key-value pair and store it in the XCom table of the metadata database.
Afterward, in the puller function, named 'result2file' in our scenario, the same **kwargs argument is used, and inside the function the .xcom_pull() method is called to fetch the key's value from the XCom table. The PythonOperator that runs the puller function needs no op_kwargs of its own, as shown in the sketch below.
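Here is a minimal sketch of the whole DAG. The function and task names (`calculate`, `result2file`, `write_to_file`) follow the scenario; the DAG id, the operands, and the output path `/tmp/result.txt` are assumptions for illustration, and the import path matches Airflow 1.x:

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator  # Airflow 1.x path

def calculate(x, y, **kwargs):
    # Pusher: store the result under an explicit key via the task instance
    kwargs['ti'].xcom_push(key='multiplication_result', value=x * y)

def result2file(**kwargs):
    # Puller: fetch the value pushed by the 'calculate' task, write it to a file
    value = kwargs['ti'].xcom_pull(task_ids='calculate', key='multiplication_result')
    with open('/tmp/result.txt', 'w') as f:  # assumed output path
        f.write(str(value))

dag = DAG(
    dag_id='xcom_multiplication',  # assumed DAG id
    start_date=datetime(2020, 1, 1),
    schedule_interval=None,
)

calculate_task = PythonOperator(
    task_id='calculate',
    python_callable=calculate,
    op_kwargs={'x': 3, 'y': 4},  # assumed operands
    provide_context=True,  # Airflow 1.x: pass the task context into **kwargs
    dag=dag,
)

write_to_file = PythonOperator(
    task_id='write_to_file',
    python_callable=result2file,
    provide_context=True,  # needed here too; see the note at the end of the post
    dag=dag,
)

calculate_task >> write_to_file
```

Pushing under an explicit key keeps the pull unambiguous; if the pusher simply returned the value instead, it would land in XCom under the default return_value key.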
After initializing the Airflow database (airflow initdb in Airflow 1.x) and executing the tasks, we can preview the produced XCom values under the Admin > XComs menu.
Finally, here is the executed DAG preview and the calculation result written to the flat file.
Note: provide_context=True must be set on write_to_file = PythonOperator(...) (and on the pusher's operator as well); in Airflow 1.x this is what passes the task context, including the task instance, into **kwargs.