{"id":62967,"date":"2024-09-02T13:20:49","date_gmt":"2024-09-02T07:50:49","guid":{"rendered":"https:\/\/www.tothenew.com\/blog\/?p=62967"},"modified":"2024-09-04T09:26:11","modified_gmt":"2024-09-04T03:56:11","slug":"etl-extract-transform-load-with-apache-airflow","status":"publish","type":"post","link":"https:\/\/www.tothenew.com\/blog\/etl-extract-transform-load-with-apache-airflow\/","title":{"rendered":"Automating ETL Workflows with Apache Airflow: A Universal Solution for Data Management"},"content":{"rendered":"<h2>Introduction<\/h2>\n<p>Vast amounts of data are processed daily from sources such as financial transactions, customer interactions, sensors, and research results. For instance, pharmaceutical companies produce millions of data points from clinical trials alone. This volume of data has to be handled accurately and quickly; otherwise, it can lead to complications such as slow regulatory approvals and lost business opportunities.<\/p>\n<h2>Revolutionizing Pharmaceutical Data Workflows with Apache Airflow<\/h2>\n<p>Pharmaceutical companies rely on massive amounts of information that must be carefully managed at several stages, from early research and development through clinical testing to final regulatory submission. Good management and automation of these data processes ensures accuracy and adherence to standards while avoiding costly delays in bringing new drugs to market.<\/p>\n<p>Apache Airflow is an industry-standard open-source workflow orchestration tool, widely used for ETL pipelines, that helps manage complex workflows, which makes it suitable for use within the pharmaceutical industry as well. We will walk through an example of Airflow-based development that helps automate clinical trial data processing.<\/p>\n<h2>Business Use Case: Automated Clinical Trial Data Processing<\/h2>\n<p>A pharmaceutical company is running several clinical trials at the same time. Each trial generates an extensive amount of data. 
It requires systematic processing, cleaning, and analysis. The company intends to use an automated solution to drive ETL (Extract, Transform, Load) for clinical trial data management, thus ensuring timely and precise reporting to regulatory agencies and other stakeholders.<\/p>\n<p><strong>Problems<\/strong>:<\/p>\n<ul>\n<li><span style=\"text-decoration: underline;\">Data Volume and Complexity<\/span>: Reconcile millions of data points without human intervention<\/li>\n<li><span style=\"text-decoration: underline;\">Compliance<\/span>: Ensure data accuracy and time-bound filing with different regulatory authorities<\/li>\n<li><span style=\"text-decoration: underline;\">Resource Allocation<\/span>: Redirect manual effort spent on data processing toward strategic activities<\/li>\n<\/ul>\n<p>By automating the ETL process with Apache Airflow, the company can address these challenges effectively, improving operational efficiency, enhancing data accuracy, and ensuring compliance.<\/p>\n<p>&nbsp;<\/p>\n<div id=\"attachment_62962\" style=\"width: 791px\" class=\"wp-caption aligncenter\"><img aria-describedby=\"caption-attachment-62962\" decoding=\"async\" loading=\"lazy\" class=\"wp-image-62962 size-full\" src=\"https:\/\/www.tothenew.com\/blog\/wp-ttn-blog\/uploads\/2024\/07\/Untitled-Diagram.drawio.png\" alt=\"airflow-workflow-diagram\" width=\"781\" height=\"781\" srcset=\"\/blog\/wp-ttn-blog\/uploads\/2024\/07\/Untitled-Diagram.drawio.png 781w, \/blog\/wp-ttn-blog\/uploads\/2024\/07\/Untitled-Diagram.drawio-300x300.png 300w, \/blog\/wp-ttn-blog\/uploads\/2024\/07\/Untitled-Diagram.drawio-150x150.png 150w, \/blog\/wp-ttn-blog\/uploads\/2024\/07\/Untitled-Diagram.drawio-768x768.png 768w, \/blog\/wp-ttn-blog\/uploads\/2024\/07\/Untitled-Diagram.drawio-624x624.png 624w, \/blog\/wp-ttn-blog\/uploads\/2024\/07\/Untitled-Diagram.drawio-120x120.png 120w, \/blog\/wp-ttn-blog\/uploads\/2024\/07\/Untitled-Diagram.drawio-24x24.png 24w, 
\/blog\/wp-ttn-blog\/uploads\/2024\/07\/Untitled-Diagram.drawio-48x48.png 48w, \/blog\/wp-ttn-blog\/uploads\/2024\/07\/Untitled-Diagram.drawio-96x96.png 96w\" sizes=\"(max-width: 781px) 100vw, 781px\" \/><p id=\"caption-attachment-62962\" class=\"wp-caption-text\">airflow-workflow-diagram<\/p><\/div>\n<p style=\"text-align: center;\">The diagram above shows the workflow for processing clinical data using Airflow.<\/p>\n<p>Now, let&#8217;s walk through how Apache Airflow can be used to automate the clinical trial data processing workflow. We&#8217;re going to create a dummy workflow (DAG) to illustrate the steps with example data.<\/p>\n<h2><b>Implementation of <\/b><b>Clinical Trial Data Processing<\/b><b> with Apache Airflow<\/b><\/h2>\n<h3>What is Apache Airflow?<\/h3>\n<p><span style=\"font-weight: 400;\">Apache Airflow is an open-source platform designed for programmatically authoring, scheduling, and monitoring workflows. Users define workflows as DAGs (Directed Acyclic Graphs), where each node is a task and the edges define dependencies between tasks. Originally developed at Airbnb and later donated to the Apache Software Foundation, Airflow is now used by companies across many sectors to manage complex workflows.<\/span><\/p>\n<h3>Key Features of Apache Airflow<\/h3>\n<ul>\n<li><strong>Dynamic<\/strong>: Airflow workflows are defined in Python, so pipelines can be generated dynamically and can use complex logic during pipeline creation.<\/li>\n<li><strong>Scalable<\/strong>: It can scale to handle an increasing number of tasks by using a distributed execution engine.<\/li>\n<li><strong>Extensible<\/strong>: Airflow&#8217;s architecture is designed for easy addition of new operators, sensors, and hooks, making it highly customizable.<\/li>\n<li><strong>UI<\/strong>: Airflow has a rich user interface so that pipelines can be visualized, progress can be tracked in real time, and issues can be debugged. 
Additionally, Airflow allows both the manual triggering of tasks and easy reruns of failed tasks.<\/li>\n<li><strong>Scheduler<\/strong>: Airflow&#8217;s scheduler runs tasks in the correct order, handles retries, and triggers workflows on a schedule or when certain external events occur.<\/li>\n<li><strong>Integrations<\/strong>: Airflow integrates with a wide variety of services and systems, including databases, cloud storage, data warehouses, and many others.<\/li>\n<\/ul>\n<p><strong>Prerequisite:<\/strong><br \/>\n&#8211; Python 3.8 or later installed on the system (the format='mixed' date parsing used below requires pandas 2.0, which needs Python 3.8 or later)<\/p>\n<p><strong>Step 1: Setting Up Airflow<\/strong><\/p>\n<p>First, install Apache Airflow using the following commands:<\/p>\n<pre>pip install apache-airflow\r\nairflow db init\r\nairflow users create \\\r\n      --username admin \\\r\n      --firstname Admin \\\r\n      --lastname User \\\r\n      --role Admin \\\r\n      --email admin@example.com<\/pre>\n<p>Additionally, install the necessary packages to interact with S3, Postgres, or Redshift:<\/p>\n<pre>pip install boto3 pandas psycopg2-binary<\/pre>\n<p>Start the Airflow webserver and scheduler, each in its own terminal, since both run in the foreground:<\/p>\n<pre>airflow webserver --port 8080\r\nairflow scheduler<\/pre>\n<p><strong>Step 2: Creating the Workflow (DAG)<\/strong><\/p>\n<p>In Airflow, a DAG (Directed Acyclic Graph) represents a workflow: every node is a task, and the edges define dependencies between tasks.<\/p>\n<p>The following is a dummy DAG for the clinical trial data processing workflow:<\/p>\n<pre>from airflow import DAG\r\nfrom airflow.operators.dummy import DummyOperator\r\nfrom airflow.operators.python import PythonOperator\r\nfrom airflow.models import Variable\r\nfrom datetime import datetime\r\nimport boto3\r\nimport pandas as pd\r\nimport psycopg2\r\n\r\n# boto3 session; AWS credentials are read from Airflow Variables\r\nsession = boto3.Session(\r\n    aws_access_key_id=Variable.get('Access key'),\r\n    aws_secret_access_key=Variable.get('Secret access 
key'),\r\n)\r\n\r\n# Configuration for S3 and PostgreSQL\r\n# Demo values only; in a real deployment keep credentials out of the DAG file\r\nS3_BUCKET_NAME = 'airflow-demo-bucket1'\r\nS3_KEY = 'source\/clinical_data.csv'\r\nCLEANED_DATA_KEY = 'source\/cleaned_data\/cleaned_clinical_data.csv'\r\nANALYSIS_SUMMARY_KEY = 'path\/to\/clinical_analysis_summary.csv'\r\nPOSTGRES_HOST = 'localhost'\r\nPOSTGRES_PORT = '5432'\r\nPOSTGRES_DB = 'clinicTrial'\r\nPOSTGRES_USER = 'prince'\r\nPOSTGRES_PASSWORD = '12345'\r\nPOSTGRES_TABLE = 'clinical_trial_data'\r\n\r\ndef clean_column(df, column_name):\r\n    # Replace placeholder values ('.' or a single space) with None\r\n    for x in df.index:\r\n        if df.loc[x, column_name] == '.' or df.loc[x, column_name] == ' ':\r\n            df.loc[x, column_name] = None\r\n\r\ndef extract_clinical_data():\r\n    # Extract clinical trial data from S3\r\n    print(\"Extracting clinical trial data from S3...\")\r\n    s3 = session.client('s3')\r\n    response = s3.get_object(Bucket=S3_BUCKET_NAME, Key=S3_KEY)\r\n    clinical_data = pd.read_csv(response['Body'])\r\n    clinical_data.to_csv('\/tmp\/clinical_data.csv', index=False)\r\n    print(\"Data extraction complete.\")\r\n\r\ndef clean_clinical_data():\r\n    # Clean clinical trial data\r\n    print(\"Cleaning clinical trial data...\")\r\n    clinical_data = pd.read_csv('\/tmp\/clinical_data.csv')\r\n    clinical_data['Specimen_date'] = clinical_data['Specimen_date'].str.strip()\r\n    clinical_data['Specimen_date'] = pd.to_datetime(clinical_data['Specimen_date'], format='mixed')\r\n    clean_column(clinical_data, \"Date_of_Death\")\r\n    clinical_data['Date_of_Death'] = pd.to_datetime(clinical_data['Date_of_Death'], format='mixed')\r\n    clinical_data['Date_of_Last_Follow_Up'] = pd.to_datetime(clinical_data['Date_of_Last_Follow_Up'], format='mixed')\r\n    clinical_data['Time'] = pd.to_numeric(clinical_data['Time'])\r\n    clinical_data['Event'] = clinical_data['Event'].astype(int)\r\n    clinical_data.to_csv('\/tmp\/cleaned_clinical_data.csv', index=False)\r\n    # Upload the cleaned data to S3\r\n    s3 = 
session.client('s3')\r\n    s3.upload_file('\/tmp\/cleaned_clinical_data.csv', S3_BUCKET_NAME, CLEANED_DATA_KEY)\r\n    print(\"Data cleaning complete.\")\r\n\r\ndef analyze_clinical_data():\r\n    # Analyze clinical trial data\r\n    print(\"Analyzing clinical trial data...\")\r\n    s3 = session.client('s3')\r\n    s3.download_file(S3_BUCKET_NAME, CLEANED_DATA_KEY, '\/tmp\/cleaned_clinical_data.csv')\r\n    cleaned_data = pd.read_csv('\/tmp\/cleaned_clinical_data.csv')\r\n    # Calculate survival time for each patient\r\n    cleaned_data['Specimen_date'] = pd.to_datetime(cleaned_data['Specimen_date'], format='mixed')\r\n    cleaned_data['Date_of_Death'] = pd.to_datetime(cleaned_data['Date_of_Death'], format='mixed')\r\n    cleaned_data['Survival_Time'] = (cleaned_data['Date_of_Death'] - cleaned_data['Specimen_date']).dt.days\r\n    summary = cleaned_data.groupby('Stage')['Survival_Time'].mean()\r\n    summary.to_csv('\/tmp\/clinical_analysis_summary.csv')\r\n    # Upload the analysis summary to S3\r\n    s3.upload_file('\/tmp\/clinical_analysis_summary.csv', S3_BUCKET_NAME, ANALYSIS_SUMMARY_KEY)\r\n    print(f\"Data analysis complete. 
Summary:\\n{summary}\")\r\n\r\ndef load_data_to_postgres():\r\n    # Load data into PostgreSQL\r\n    print(\"Loading data into PostgreSQL...\")\r\n    s3 = session.client('s3')\r\n    s3.download_file(S3_BUCKET_NAME, CLEANED_DATA_KEY, '\/tmp\/cleaned_clinical_data.csv')\r\n    cleaned_data = pd.read_csv('\/tmp\/cleaned_clinical_data.csv')\r\n    cleaned_data.dropna(inplace=True)\r\n    conn = psycopg2.connect(\r\n        dbname=POSTGRES_DB,\r\n        user=POSTGRES_USER,\r\n        password=POSTGRES_PASSWORD,\r\n        host=POSTGRES_HOST,\r\n        port=POSTGRES_PORT\r\n    )\r\n    cursor = conn.cursor()\r\n    # Create table if not exists (for simplicity)\r\n    create_table_query = f\"\"\"\r\n        CREATE TABLE IF NOT EXISTS {POSTGRES_TABLE} (\r\n            PatientID INT,\r\n            Specimen_date DATE,\r\n            Dead_or_Alive VARCHAR(5),\r\n            Date_of_Death DATE,\r\n            Date_of_Last_Follow_Up DATE,\r\n            sex VARCHAR(1),\r\n            race VARCHAR(1),\r\n            Stage VARCHAR(10),\r\n            Event INT,\r\n            Time INT\r\n        );\r\n    \"\"\"\r\n    cursor.execute(create_table_query)\r\n    # Insert every row with a parameterized query so the driver escapes the values\r\n    columns = ['PatientID', 'Specimen_date', 'Dead_or_Alive', 'Date_of_Death',\r\n               'Date_of_Last_Follow_Up', 'sex', 'race', 'Stage', 'Event', 'Time']\r\n    insert_query = f\"\"\"\r\n        INSERT INTO {POSTGRES_TABLE} ({', '.join(columns)})\r\n        VALUES ({', '.join(['%s'] * len(columns))});\r\n    \"\"\"\r\n    # itertuples yields plain Python scalars, which psycopg2 can adapt\r\n    rows = list(cleaned_data[columns].itertuples(index=False, name=None))\r\n    cursor.executemany(insert_query, rows)\r\n    conn.commit()\r\n    cursor.close()\r\n    conn.close()\r\n    print(\"Data loaded into PostgreSQL.\")\r\n\r\ndefault_args = {\r\n    'owner': 'airflow',\r\n    'depends_on_past': False,\r\n    'start_date': datetime(2024, 7, 9),\r\n    'retries': 1,\r\n}\r\n\r\nwith DAG('clinical_trial_data_processing', 
default_args=default_args, schedule_interval='@daily', catchup=False) as dag:\r\n    start = DummyOperator(task_id='start')\r\n\r\n    extract = PythonOperator(\r\n        task_id='extract_clinical_data',\r\n        python_callable=extract_clinical_data\r\n    )\r\n\r\n    clean = PythonOperator(\r\n        task_id='clean_clinical_data',\r\n        python_callable=clean_clinical_data\r\n    )\r\n\r\n    analyze = PythonOperator(\r\n        task_id='analyze_clinical_data',\r\n        python_callable=analyze_clinical_data\r\n    )\r\n\r\n    load = PythonOperator(\r\n        task_id='load_data_to_postgres',\r\n        python_callable=load_data_to_postgres\r\n    )\r\n\r\n    end = DummyOperator(task_id='end')\r\n\r\n    # Run the tasks strictly in sequence\r\n    start &gt;&gt; extract &gt;&gt; clean &gt;&gt; analyze &gt;&gt; load &gt;&gt; end<\/pre>\n<p><strong>Step 3: Running the Workflow<\/strong><\/p>\n<ol>\n<li>Create a dags folder in your Airflow home directory, if it does not already exist, and save the DAG file there as clinical_trial_data_processing_dag.py.<\/li>\n<li>Start the Airflow webserver and scheduler if they are not already running:<\/li>\n<li><span style=\"font-weight: 400;\">airflow webserver --port 8080<\/span><\/li>\n<li><span style=\"font-weight: 400;\">airflow scheduler<\/span><\/li>\n<li><span style=\"font-weight: 400;\">Access the Airflow web UI by navigating to <strong>http:\/\/localhost:8080<\/strong> in your web browser.<\/span><\/li>\n<li><span style=\"font-weight: 400;\">Enable the clinical_trial_data_processing DAG from the web UI and monitor its execution.<\/span><\/li>\n<\/ol>\n<p>&nbsp;<\/p>\n<div id=\"attachment_62963\" style=\"width: 1925px\" class=\"wp-caption aligncenter\"><img aria-describedby=\"caption-attachment-62963\" decoding=\"async\" loading=\"lazy\" class=\"size-full wp-image-62963\" 
src=\"https:\/\/www.tothenew.com\/blog\/wp-ttn-blog\/uploads\/2024\/07\/airflow-workflow-web-ui.png\" alt=\"airflow-workflow-web-ui\" width=\"1915\" height=\"871\" srcset=\"\/blog\/wp-ttn-blog\/uploads\/2024\/07\/airflow-workflow-web-ui.png 1915w, \/blog\/wp-ttn-blog\/uploads\/2024\/07\/airflow-workflow-web-ui-300x136.png 300w, \/blog\/wp-ttn-blog\/uploads\/2024\/07\/airflow-workflow-web-ui-1024x466.png 1024w, \/blog\/wp-ttn-blog\/uploads\/2024\/07\/airflow-workflow-web-ui-768x349.png 768w, \/blog\/wp-ttn-blog\/uploads\/2024\/07\/airflow-workflow-web-ui-1536x699.png 1536w, \/blog\/wp-ttn-blog\/uploads\/2024\/07\/airflow-workflow-web-ui-624x284.png 624w\" sizes=\"(max-width: 1915px) 100vw, 1915px\" \/><p id=\"caption-attachment-62963\" class=\"wp-caption-text\"><br \/>airflow-workflow-web-ui.png<\/p><\/div>\n<p>&nbsp;<\/p>\n<p><strong>Benefits gained by the Business:<\/strong><\/p>\n<p>Apache Airflow helps pharmaceutical companies:<\/p>\n<ul>\n<li><strong>Enhance Efficiency:<\/strong> Automate repetitive tasks and free up valuable resources<\/li>\n<li><strong>Improve Accuracy:<\/strong> Reduce human error in data processing<\/li>\n<li><strong>Ensure Compliance:<\/strong> Submit accurate reports to regulatory bodies on time<\/li>\n<li><b>Scale Easily:<\/b><span style=\"font-weight: 400;\"> Handle increasing volumes of data as clinical trials grow in size<\/span><\/li>\n<\/ul>\n<p><strong>Conclusion<\/strong><\/p>\n<p>Apache Airflow is positioned to help pharmaceutical companies transform data management from an overwhelming task into a streamlined system. 
It should be an important component in a company&#8217;s <span style=\"text-decoration: underline;\">Digital Transformation Toolkit<\/span> to drive innovation and achieve operational efficiency.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>Introduction Vast amounts of data are processed daily from sources such as financial transactions, customer interactions, sensors, and research results. For instance, pharmaceutical companies produce millions of data points from clinical trials alone. This huge amount of data has to be handled with accuracy and speed. Otherwise it can result in [&hellip;]<\/p>\n","protected":false},"author":1565,"featured_media":0,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"iawp_total_views":67},"categories":[5879],"tags":[4458,1358],"aioseo_notices":[],"_links":{"self":[{"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/posts\/62967"}],"collection":[{"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/users\/1565"}],"replies":[{"embeddable":true,"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/comments?post=62967"}],"version-history":[{"count":5,"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/posts\/62967\/revisions"}],"predecessor-version":[{"id":65134,"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/posts\/62967\/revisions\/65134"}],"wp:attachment":[{"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/media?parent=62967"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/categories?post=62967"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.tothenew.com\/blog\/wp-json\/wp\/v2\/tags?post=62967"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel
}","templated":true}]}}