This project provides an intelligent, two-stage agentic system built with the Google Cloud Agent Development Kit (ADK) to automate the migration of Apache Airflow DAGs from a source version to a target version.
The system leverages Gemini models, Vertex AI Search for Retrieval-Augmented Generation (RAG), and a custom-built knowledge base in BigQuery to provide accurate, context-aware code conversions.
The solution is composed of two primary agents that orchestrate a complex workflow:
- Knowledge Base Agent (knowledge_base_updater): This agent is responsible for the one-time, intensive process of building the migration knowledge base. It's a three-step pipeline:
  - Parse DAGs: Scans a GCS folder of your existing DAGs to identify the unique set of Airflow operators being used.
  - Research & Scrape: For each operator, it uses Vertex AI Search (with Web Search datastore) to find relevant migration guides and documentation, then scrapes the content from these web pages.
  - Structure & Store: It uses a Gemini model to analyze the scraped content, structure it into a predefined JSON schema (e.g., parameter changes, deprecation status, code examples), and stores this structured data in a BigQuery table.
- Migration Assistant Agent (airflow_migration_assistant): This is the main user-facing agent.
  - It first asks the user if the knowledge base has been built.
  - If yes, it proceeds to the migration step.
  - If no, it invokes the knowledge_base_updater agent to build the knowledge base first.
  - For migration, it reads each source DAG file from GCS and uses a Gemini model with RAG (grounded on the BigQuery knowledge base via Vertex AI Search) to generate the migrated code, which is then saved to a destination GCS folder.
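To make the "Parse DAGs" step more concrete, here is a minimal sketch of how the unique set of operators could be extracted from a DAG's source text with Python's standard `ast` module. It is an illustration only, not the agent's actual implementation: the function name `find_operators`, the sample DAG, and the heuristic of matching class names that end in `Operator` or `Sensor` are all assumptions.

```python
import ast


def find_operators(dag_source: str) -> set[str]:
    """Return names of called classes that look like Airflow operators or sensors.

    Hypothetical helper: the name-suffix heuristic is an assumption, not the
    project's real detection logic.
    """
    tree = ast.parse(dag_source)
    found: set[str] = set()
    for node in ast.walk(tree):
        if isinstance(node, ast.Call):
            func = node.func
            # Handles both `BashOperator(...)` and `module.BashOperator(...)`.
            name = func.id if isinstance(func, ast.Name) else getattr(func, "attr", None)
            if name and name.endswith(("Operator", "Sensor")):
                found.add(name)
    return found


# Illustrative Airflow 1.10-style DAG; it is only parsed, never executed.
SAMPLE_DAG = '''
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

dag = DAG("example", schedule_interval="@daily")
t1 = BashOperator(task_id="hello", bash_command="echo hi", dag=dag)
t2 = PythonOperator(task_id="world", python_callable=print, dag=dag)
t3 = BashOperator(task_id="again", bash_command="echo again", dag=dag)
'''

if __name__ == "__main__":
    print(sorted(find_operators(SAMPLE_DAG)))
```

Because the source is parsed rather than imported, a sketch like this does not need Airflow installed, which matters when the DAGs target an older Airflow version than the local environment.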
[User] -> [Migration Assistant Agent (Cloud Run)]
|
+ - Q: KB Exists?
|
+ - (No) -> [Knowledge Base Agent]
| |
| 1. Parse DAGs from [GCS]
| 2. Scrape Web Content
| 3. Structure with [Gemini] -> Store in [BigQuery]
| |
| +-> [Vertex AI Search] indexes BigQuery table
|
+ - (Yes) -> [DAG Converter Tool]
|
1. Read DAG from [GCS Source]
2. Generate migrated code with [Gemini + RAG via Vertex AI Search]
3. Write new DAG to [GCS Destination]
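The branching in the diagram above can be sketched as a small piece of control flow. This is a simplified illustration under stated assumptions: `run_migration` and its callback parameters are hypothetical names, and the real agents delegate these decisions to ADK and Gemini rather than to plain function calls.

```python
from typing import Callable


def run_migration(
    kb_exists: bool,
    build_knowledge_base: Callable[[], None],
    list_source_dags: Callable[[], list[str]],
    convert_dag: Callable[[str], str],
) -> dict[str, str]:
    """Follow the diagram: build the knowledge base if needed, then convert each DAG."""
    if not kb_exists:
        # (No) branch: parse, scrape, structure, and store before migrating.
        build_knowledge_base()
    # (Yes) branch: read each source DAG and generate its migrated version.
    return {path: convert_dag(path) for path in list_source_dags()}


if __name__ == "__main__":
    calls: list[str] = []
    result = run_migration(
        kb_exists=False,
        build_knowledge_base=lambda: calls.append("build"),
        list_source_dags=lambda: ["dags/a.py", "dags/b.py"],
        convert_dag=lambda path: f"# migrated from {path}",
    )
    print(calls, result)
```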
Before you begin, ensure you have the following:
- A Google Cloud Project with billing enabled.
- Google Cloud SDK (gcloud) installed and authenticated.
- Terraform CLI installed.
- Python 3.11+ installed.
- Permissions: We assume that all necessary Google Cloud APIs and Service Account permissions have been provisioned by an administrator. Specifically, the following APIs must be enabled:
  - run.googleapis.com
  - iam.googleapis.com
  - iamcredentials.googleapis.com
  - cloudbuild.googleapis.com
  - artifactregistry.googleapis.com
  - bigquery.googleapis.com
  - aiplatform.googleapis.com
  - storage-component.googleapis.com
  - discoveryengine.googleapis.com
- Additionally, ensure your deployment user/service account has the following IAM roles:
  - Cloud Build Editor (roles/cloudbuild.builds.editor)
  - Artifact Registry Admin (roles/artifactregistry.admin)
  - Cloud Run Admin (roles/run.admin)
  - Create Service Accounts (roles/iam.serviceAccountCreator)
  - Project IAM Admin (roles/resourcemanager.projectIamAdmin)
  - Service Account User (roles/iam.serviceAccountUser)
  - Service Usage Consumer (roles/serviceusage.serviceUsageConsumer)
  - Storage Admin (roles/storage.admin)
  - BigQuery Data Editor (roles/bigquery.dataEditor)
  - BigQuery Job User (roles/bigquery.jobUser)
  - Discovery Engine Editor (roles/discoveryengine.user)
This guide assumes your user has permissions on all of these APIs and resources. If not, the provided setup should grant the required access to your user and the service account.
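One way to confirm that the required APIs are enabled before provisioning is to compare them against the output of `gcloud services list --enabled`. The sketch below is an optional convenience and not part of the project; the helper names `list_enabled_apis` and `missing_apis` are hypothetical, and it assumes `gcloud` is installed and authenticated.

```python
import subprocess

# The APIs listed in the prerequisites above.
REQUIRED_APIS = {
    "run.googleapis.com",
    "iam.googleapis.com",
    "iamcredentials.googleapis.com",
    "cloudbuild.googleapis.com",
    "artifactregistry.googleapis.com",
    "bigquery.googleapis.com",
    "aiplatform.googleapis.com",
    "storage-component.googleapis.com",
    "discoveryengine.googleapis.com",
}


def list_enabled_apis(project_id: str) -> set[str]:
    """Ask gcloud for the enabled services (requires an authenticated gcloud CLI)."""
    result = subprocess.run(
        [
            "gcloud", "services", "list", "--enabled",
            f"--project={project_id}",
            "--format=value(config.name)",
        ],
        check=True,
        capture_output=True,
        text=True,
    )
    return set(result.stdout.split())


def missing_apis(enabled: set[str]) -> list[str]:
    """Return the required APIs that are not in the enabled set, sorted by name."""
    return sorted(REQUIRED_APIS - enabled)


if __name__ == "__main__":
    # Example with a made-up enabled set; call list_enabled_apis("<project-id>")
    # to check a real project instead.
    print(missing_apis({"run.googleapis.com", "iam.googleapis.com"}))
```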
Follow these steps to set up your local environment and provision the required infrastructure.
- Clone the Repository:
  git clone <your-repo-url>
  cd <your-repo-folder>
- Install Dependencies: We recommend using uv for dependency management. Run the following command to install dependencies:
  uv sync --dev
  (Alternatively, you can use pip with a virtual environment.)
- Configure Environment: Create a .env file in the project root based on .env.example and fill in the values with your project details:
  - PROJECT_ID: Your Google Cloud Project ID.
  - USER_EMAIL: Your Google Cloud user email (used for IAM bindings).
  - TF_STATE_BUCKET_NAME: A unique name for the GCS bucket to store Terraform state.
  - Other variables can be left as defaults or customized.
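Before provisioning, it can help to verify that the three variables named above are actually filled in. The following is a minimal sketch using only the standard library; `parse_env` and `missing_keys` are hypothetical helpers, and the parser handles only simple `KEY=value` lines, not the full shell syntax a real `.env` file may contain.

```python
REQUIRED_KEYS = ("PROJECT_ID", "USER_EMAIL", "TF_STATE_BUCKET_NAME")


def parse_env(text: str) -> dict[str, str]:
    """Parse simple KEY=value lines, skipping blanks and comments."""
    values: dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        # Tolerate an optional leading `export ` and surrounding quotes.
        key = key.strip().removeprefix("export ").strip()
        values[key] = value.strip().strip("\"'")
    return values


def missing_keys(values: dict[str, str]) -> list[str]:
    """Return required keys that are absent or empty."""
    return [key for key in REQUIRED_KEYS if not values.get(key)]


if __name__ == "__main__":
    # Made-up example content; in practice read the text from your .env file.
    example = 'PROJECT_ID="my-project"\nUSER_EMAIL=\nTF_STATE_BUCKET_NAME=my-tf-state\n'
    print(missing_keys(parse_env(example)))
```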
We provide a Makefile to automate the provisioning of all required Google Cloud infrastructure and Vertex AI Search resources for local testing.
Prerequisites:
- Ensure you have authenticated with Google Cloud:
gcloud auth application-default login
- Ensure you have the required permissions listed in the Prerequisites section.
To deploy the infrastructure, run the following command from the project root:
make deploy
This command will:
- Source your .env file.
- Check if the Terraform state bucket exists, and create it if it does not.
- Initialize and apply Terraform to create:
  - GCS buckets for source and destination DAGs.
  - BigQuery dataset and table for the migration corpus.
  - Sample Airflow 1.10 DAGs in the source bucket for testing.
  - Necessary IAM role bindings for the default Compute service account and your user account.
- Run a Python script to automate the creation of Vertex AI Search resources:
  - Create a Website Data Store with predefined Airflow documentation sources.
  - Create a BigQuery Data Store pointing to the newly created table.
  - Create a Search App and link it to the Website Data Store.
Upon completion, you will see a summary of the created resources.
Note: This automated setup is intended for local development and testing. For production deployments, refer to the Deployment to Cloud Run section.
You can also use the Agent Starter Pack to create a production-ready version of this agent with additional deployment options:
# Create and activate a virtual environment
python -m venv .venv && source .venv/bin/activate # On Windows: .venv\Scripts\activate
# Install the starter pack and create your project
pip install --upgrade agent-starter-pack
agent-starter-pack create my-airflow-migration-agent -a adk@airflow_version_upgrade_agent
⚡️ Alternative: Using uv
If you have uv installed, you can create and set up your project with a single command:
uvx agent-starter-pack create my-airflow-migration-agent -a adk@airflow_version_upgrade_agent
This command handles creating the project without needing to pre-install the package into a virtual environment.
The starter pack will prompt you to select deployment options and provides additional production-ready features including automated CI/CD deployment scripts.
Authenticate your local environment and run the ADK server:
# Authenticate
gcloud auth application-default login
# Run adk web from the agents/airflow_agents directory
adk web
You can now interact with your agent in the ADK web UI, typically at http://127.0.0.1:8000.
This agent is designed to be deployed as a containerized service on Cloud Run via the Agent Development Kit (ADK).
If you plan to deploy this agent to Cloud Run (e.g., using adk deploy), ensure the deploying user or service account has the following additional permissions:
- Cloud Build Editor (roles/cloudbuild.builds.editor)
- Artifact Registry Admin (roles/artifactregistry.admin)
- Cloud Run Admin (roles/run.admin)
- Service Account User (roles/iam.serviceAccountUser) on the runtime service account (e.g., the default Compute Engine service account if used).
These permissions are required because adk deploy typically builds a container image using Cloud Build, pushes it to Artifact Registry, and then creates a Cloud Run service.
Note: These permissions are not included in the core Terraform scripts because they are only needed for remote deployment, and the agent is assumed to be running in a Google Cloud environment (like Cloud Shell) with sufficient permissions for local execution and testing.
- Set Deployment Variables:
The variables needed for the deployment are captured in the .env.example file.
Copy the example file to .env and configure your variables:
cp .env.example .env
# Open .env and fill in your details (PROJECT_ID, GOOGLE_CLOUD_LOCATION, SERVICE_NAME, etc.)
# Make sure to source your env variables before proceeding, or let your deployment pipeline load them.
source .env
- Deploy to Cloud Run:
Deploy the container, passing configuration as environment variables.
# Run this command from your agents directory
adk deploy cloud_run \
--project=$PROJECT_ID \
--region=$GOOGLE_CLOUD_DEPLOY_LOCATION \
--service_name=$CLOUD_RUN_SERVICE_NAME \
--app_name=$CLOUD_RUN_APP_NAME \
--with_ui \
--port 8000 \
$AGENT_PATH
During the deployment process, you might be prompted: Allow unauthenticated invocations to [your-service-name] (y/N)?
- Enter y to allow public access to your agent's API endpoint without authentication.
- Enter N (or press Enter for the default) to require authentication.
Upon successful execution, the command will deploy your agent to Cloud Run and provide the URL of the deployed service.
export APP_URL="YOUR_CLOUD_RUN_SERVICE_URL"
export TOKEN=$(gcloud auth print-identity-token)
curl -X GET -H "Authorization: Bearer $TOKEN" $APP_URL/list-apps
Initialize or update the state for a specific user and session. Replace $APP_NAME with your actual app name if different.
curl -X POST -H "Authorization: Bearer $TOKEN" \
$APP_URL/apps/$APP_NAME/users/user_123/sessions/session_abcd \
-H "Content-Type: application/json" \
-d '{"state": {"preferred_language": "English", "visit_count": 5}}'
Send a prompt to your agent. Adjust the user/session IDs and prompt as needed.
curl -X POST -H "Authorization: Bearer $TOKEN" \
$APP_URL/run_sse \
-H "Content-Type: application/json" \
-d '{
"app_name": "airflow_version_upgrade_app",
"user_id": "user_123",
"session_id": "session_abcd",
"new_message": {
"role": "user",
"parts": [{
"text": "Help me migrate airflow"
}]
},
"streaming": false
}'
Set "streaming": true if you want to receive Server-Sent Events (SSE).
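For scripted testing, the same prompt request can be built in Python instead of curl. This is a minimal sketch using only the standard library; `build_run_request` is a hypothetical helper, and the payload shape simply mirrors the curl example above rather than any additional API guarantees.

```python
import json
import urllib.request


def build_run_request(
    app_url: str,
    token: str,
    app_name: str,
    user_id: str,
    session_id: str,
    text: str,
    streaming: bool = False,
) -> urllib.request.Request:
    """Build (but do not send) a POST to /run_sse with the same body as the curl example."""
    payload = {
        "app_name": app_name,
        "user_id": user_id,
        "session_id": session_id,
        "new_message": {"role": "user", "parts": [{"text": text}]},
        "streaming": streaming,
    }
    return urllib.request.Request(
        url=f"{app_url.rstrip('/')}/run_sse",
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


if __name__ == "__main__":
    # Placeholder URL and token; substitute your Cloud Run URL and identity token.
    request = build_run_request(
        "https://example.invalid",
        "YOUR_TOKEN",
        "airflow_version_upgrade_app",
        "user_123",
        "session_abcd",
        "Help me migrate airflow",
    )
    print(request.full_url)
    # To actually send it: urllib.request.urlopen(request).read()
```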
- Navigate to your Cloud Run Services in the Google Cloud Console.
- Click on the deployed application URL for the airflow-migration-agent.
- This will open the ADK web interface and start a new chat session.
- Begin a conversation with the agent using the chat interface!
Here is a step-by-step example of a successful interactive migration session:
🧑 User:
Hi
🤖 Agent:
[Introduces itself and asks for details to upgrade to the next version]
- Do you have a knowledge base created already?
- If not, I can create it for you with the required inputs.
🧑 User:
No, help me create one.
🤖 Agent:
[Asks for the details required to create a knowledge base] Please provide:
- GCS URI where your input DAGs reside
- Source Airflow version
- Destination Airflow version
- Your Google Cloud Project ID
🧑 User:
source_airflow_version="1.10" target_airflow_version="2.10.5" source_gcs_uri="gs://<your-source-bucket-name>/dags/" project_id="<project-id>"
🤖 Agent:
[Invokes the 3-stage knowledge-building process (detailed in the operational guide). Once completed, it verifies the outcome] The knowledge base has been built! Would you like to proceed with the migration, or stop here?
🧑 User:
Yes, let's proceed with migration.
destination_gcs_uri="gs://<your-destination-bucket-name>/output/" project_id="<project-id>" collection_id="<collection-id>" data_store_id="<datastore-id>"
🤖 Agent:
[Executes the DAG conversion using RAG and grounded responses] Migration complete! The output GCS bucket now contains your migrated DAGs.
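The session above passes GCS locations as `gs://` URIs. As a small illustration of how such a URI breaks down into a bucket and an object prefix, here is a sketch with a hypothetical `parse_gcs_uri` helper; it is not taken from the agent's code, and the bucket name in the example is made up.

```python
from urllib.parse import urlparse


def parse_gcs_uri(uri: str) -> tuple[str, str]:
    """Split a gs:// URI into (bucket, prefix); raise ValueError for anything else."""
    parsed = urlparse(uri)
    if parsed.scheme != "gs" or not parsed.netloc:
        raise ValueError(f"Not a GCS URI: {uri!r}")
    # The path keeps its leading slash, which is not part of the object prefix.
    return parsed.netloc, parsed.path.lstrip("/")


if __name__ == "__main__":
    print(parse_gcs_uri("gs://my-source-bucket/dags/"))
```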
