Build a data pipeline that connects to a remote REST API in 5 minutes

What do 90% of data pipelines have in common? You've guessed it - extracting and transforming data from REST APIs. If you're an avid Apache Airflow user, there are multiple ways you can approach this. In an earlier article, you saw how to handle API calls with the PythonOperator, but I mentioned it's not a recommended method.

How should you approach this task, then? Well, Airflow has a dedicated SimpleHttpOperator built in, and it allows you to communicate with external APIs effectively. Today you'll learn all about it. Don't feel like reading? Watch my video instead.

How to Configure Airflow for Communicating with REST APIs

To start, you'll have to install the HTTP provider for Airflow using the following command: pip install 'apache-airflow-providers-http'

You won't see it straight away on the Airflow homepage, so you'll have to restart both the webserver and the scheduler. After doing so, create a new Python file in the dags folder - I've named mine api_dag.py. It will contain the logic for the DAG we'll write today. For now, paste in the boilerplate shown below to set up the DAG.
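This is a minimal reconstruction of that boilerplate, written for Airflow 2.x with the HTTP provider installed. The dag_id matches the api_dag used by the test commands later in the article; the schedule, start date, and catchup flag are assumptions - adjust them to fit your environment.

```python
import json
from datetime import datetime

from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.http.sensors.http import HttpSensor

# dag_id matches the test commands used later (airflow tasks test api_dag ...).
# Schedule, start date, and catchup are assumptions - change them as needed.
with DAG(
    dag_id='api_dag',
    schedule_interval='@daily',
    start_date=datetime(2022, 3, 1),
    catchup=False,
) as dag:
    pass  # the tasks will be declared here
```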
The next step is to open the Airflow homepage and go under Admin - Connections. Click on the plus sign to add a new connection, and specify the connection parameters as shown on the image below:

Image 1 - Define the HTTP connection in Airflow (image by author)

The DAG will extract posts JSON data from the .in website, which serves as a dummy REST API for testing purposes. I think we can agree its documentation is not the best - pretty messy, very limited endpoints, and for some of them you don't even know exactly what you'll get in return. Feel free to change the connection ID and description, but leave the connection type and host as shown on the image. You now have everything needed to extract the data, so let's do that next.

A good practice when working with external APIs is to check if they are available first. Sometimes the website you're connecting to is down, and you should always check for it. Why? The reason is simple - by dividing the logic into two tasks (one checks if the API is available and the other fetches the data), you know whether the DAG failed because the API wasn't up or because there was an error in your code. Not checking if the API is available could result in you searching for bugs in the wrong places.

The first task is an HttpSensor named is_api_available - it checks that the API behind the connection declared earlier is responding on the given endpoint, posts/. I'll describe the remaining tasks the same way; the code for all three is collected in the consolidated sketch at the end of the article. Test the sensor with the following command: airflow tasks test api_dag is_api_available

Image 2 - Testing the HttpSensor Airflow task (image by author)

The API is up and running, which means we can extract the data next. The SimpleHttpOperator takes care of that. The get_posts task makes a GET request to the posts/ endpoint and returns the result as parsed JSON, courtesy of response_filter=lambda response: json.loads(response.text). Let's test this task as well: airflow tasks test api_dag get_posts

Image 3 - Collecting data from a REST API in Airflow (image by author)

You can see a JSON array with multiple JSON objects, indicating the data was fetched successfully. How can we access it? That's the best part - Airflow stores the data in XComs behind the scenes (Admin - XComs):

Image 4 - Posts pushed to XComs (image by author)

This means we can now write another task that uses the PythonOperator to save the data locally in JSON format. The save_posts task calls a small Python function that pulls the posts from XComs and writes them to /Users/dradecic/airflow/data/posts.json - again, see the sketch at the end of the article for both the task and the function. Use the following shell command to test the task: airflow tasks test api_dag save_posts

Image 5 - Testing the task for saving posts (image by author)

It looks like everything went well, which means you should see a posts.json file in the data folder. Here's what it contains:

Image 6 - Saved posts in JSON format (image by author)

All of the fetched posts were saved to the JSON file, which means all of our tasks work as advertised. Let's make a brief summary before wrapping things up.

Today you've learned how to communicate with REST APIs in Apache Airflow. The best practice is to check if the API is available first, so you don't think there's something wrong with the parsing logic if it isn't. I'll leave declaring the task dependencies and running the DAG through the Airflow webserver up to you - you know how to do it by now, and it will serve as good practice. Stay tuned to the following article, in which we'll discuss connecting Airflow to Amazon S3.
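For reference, here's a consolidated sketch of the whole api_dag.py file built throughout the article - the boilerplate plus the three tasks and one possible dependency chain. The task IDs, the posts/ endpoint, the response_filter lambda, and the output path come straight from the article; the connection ID, the scheduling parameters, and the exact XCom-pulling logic are assumptions, so treat it as a starting point rather than the definitive implementation.

```python
import json
from datetime import datetime

from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.http.sensors.http import HttpSensor


def save_posts(ti) -> None:
    """Pull the posts fetched by get_posts from XComs and save them locally."""
    posts = ti.xcom_pull(task_ids='get_posts')
    with open('/Users/dradecic/airflow/data/posts.json', 'w') as f:
        json.dump(posts, f)


with DAG(
    dag_id='api_dag',
    schedule_interval='@daily',       # assumption
    start_date=datetime(2022, 3, 1),  # assumption
    catchup=False,
) as dag:
    # Task 1 - check that the API responds on the posts/ endpoint
    task_is_api_available = HttpSensor(
        task_id='is_api_available',
        http_conn_id='api',  # assumption - use the connection ID you defined under Admin - Connections
        endpoint='posts/',
    )

    # Task 2 - GET posts/ and push the parsed JSON response to XComs
    task_get_posts = SimpleHttpOperator(
        task_id='get_posts',
        http_conn_id='api',  # same connection as above
        endpoint='posts/',
        method='GET',
        response_filter=lambda response: json.loads(response.text),
        log_response=True,
    )

    # Task 3 - save the fetched posts to a local JSON file
    task_save_posts = PythonOperator(
        task_id='save_posts',
        python_callable=save_posts,
    )

    # One possible dependency chain - the article leaves this step up to you
    task_is_api_available >> task_get_posts >> task_save_posts
```

You can test each task individually with airflow tasks test api_dag <task_id>, exactly as shown above, or trigger the whole DAG from the webserver once you're happy with the dependencies.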