กลับมาอีกครั้งกับ Blog ของผมแค่จะมาลง Blog โดยการที่ลองนำโจทย์การแข่งขัน TCTT หรือ Thailand Top Talent ของปี 2025 ให้ chatgpt solve โจทย์หมวด Network
โจทย์หมวด network มีทั้งหมด 4 ข้อดังนี้
- Meet the New Router 100 คะแนน
- Meet the Upgraded Router 100 คะแนน
- Whispers in the wire 200 คะแนน
- Custom Protocol v2 300 คะแนน
มาเริ่มที่ข้อแรก
Meet the New Router
- ไฟล์ให้วิเคราะห์:
thctt2025_open_netsec1_new-router.pcapng - ประเภทโจทย์: Forensics / Network (PCAP)
- เป้าหมาย: ไล่ดึงค่า Data จากทุกแพ็กเก็ต ICMPv6 Echo Request (type=128) แล้วเรียงต่อกันเป็นข้อความ จากนั้นดึง flag ในรูปแบบ
flag{32-hex}(MD5)
จากสกรีนช็อตจะเห็นว่า:
- โปรโตคอล: ICMPv6
- Type = 128 (Echo request)
- มีฟิลด์ Data (1 byte) เช่นตัวอย่าง “Data: 66”
- Identifier =
0x1337, Sequence ไล่เพิ่มทีละ 1
-> น่าจะเป็นการส่งข้อความทีละ 1 ไบต์ ผ่าน Echo Request หลายๆ แพ็กเก็ต
วิธีทำ (สั้นที่สุดด้วย TShark one-liner)
ใช้ tshark ดึงฟิลด์ไบต์ดิบของ Data จากทุก Echo request แล้วแปลงเป็นข้อความ:
tshark -r thctt2025_open_netsec1_new-router.pcapng \ -Y 'icmpv6 && icmpv6.type==128 && data' \ -T fields -e data.data \| tr -d ':\r\n' \| xxd -r -p \| grep -aoE 'flag\{[0-9a-f]{32}\}'
คำอธิบาย:
-Y 'icmpv6 && icmpv6.type==128 && data'กรองเฉพาะ Echo Request ที่มีเลเยอร์ Data-T fields -e data.dataดึงค่าเฮกซ์ของ Data (Wireshark ใช้ฟิลด์data.data)tr -d ':\r\n'ลบ:และขึ้นบรรทัดใหม่ เพื่อให้เหลือเฮกซ์ติดกันxxd -r -pแปลงจากเฮกซ์ → ไบต์จริงgrep -aoE 'flag\{[0-9a-f]{32}\}'หาแพตเทิร์น flag{md5}
ผลลัพธ์ที่ได้:
Meet the Upgraded Router
ดูเหมือนว่าแฮกเกอร์จะอัปเกรด C2 (Command & Control) ที่ใช้ขโมยข้อมูลเป็นเวอร์ชันใหม่แล้ว ไม่แน่ว่า… payload ที่ส่งออกมาอาจจะไม่ได้มาแบบธรรมดา ๆ เหมือนรอบที่แล้ว… XOR สักหน่อยดีไหม?
- ไฟล์ให้วิเคราะห์:
thctt2025_open_netsec2_upgraded-router.pcapng - โปรโตคอล: ICMPv6 Echo (type 128/129)
- เทคนิค: รวม payload ตาม
Identifier/Sequence, ลอง raw / zlib / gzip / XOR (1-byte) - สตรีมที่ใช่: type = 128 (Echo Request), ident = 4919
ภาพรวมโจทย์
ไฟล์ PCAP ที่ได้มามีการซ่อนข้อมูลในทราฟฟิก ICMPv6 echo (ping v6)
ซึ่งเป็นแชนเนลที่ “ดูไม่ผิดปกติ” มากนักสำหรับระบบเฝ้าระวังทั่วไป ผู้ทำโจทย์ต้อง:
- แยก flow ตาม
(ICMPv6 type, Identifier) - ต่อ payload ตาม
Sequence - ทดลองถอดรหัส/บีบอัด/ถอด XOR แบบเบา ๆ
- หารูปแบบ
flag{…}
ขั้นตอนละเอียด (Step-by-Step)
1) ส่องทราฟฟิกให้รู้ว่า “เล่นที่ไหน”
เปิดด้วย Wireshark แล้วกรอง:
icmpv6.type == 128 || icmpv6.type == 129
- 128 = Echo Request
- 129 = Echo Reply
ลองเพิ่มคีย์กรอง icmpv6.echo.identifier == 4919 เพื่อเจาะ flow ที่น่าสงสัย:
icmpv6 && icmpv6.echo.identifier == 4919
เช็คแถบ Packet Details → ICMPv6 Echo → มี Identifier, Sequence number, และ Data (payload) อยู่
2) ดึง payload ออกมา (tshark)
ถ้าถนัด CLI ใช้ tshark ดึง data ของสตรีมที่สนใจเป็นเฮกซ์:
# กรองเฉพาะ echo req ที่ ident=4919 แล้วเอา data เป็น hex ต่อๆ กันทีละบรรทัดtshark -r capture.pcapng \ -Y "icmpv6.type == 128 && icmpv6.echo.identifier == 4919 && data" \ -T fields -e data > echo_hex.txt
ต่อเฮกซ์ให้เป็นบัฟเฟอร์ไบต์:
tr -d '\n' < echo_hex.txt > one_line_hex.txtxxd -r -p one_line_hex.txt > echo_payload.bin
หมายเหตุ: ถ้ามีหลายSequenceมักเรียงเวลาใน pcap อยู่แล้ว แต่ถ้าสุ่มหลุด ก็ sort ตาม frame number ก่อนcut/sort/awkค่อยแปะรวมกัน
3) ลองถอดหลายแบบเร็ว ๆ
เช็ค plain ก่อน:
strings -n 6 echo_payload.bin | head# หรือ grep หาแพทเทิร์นstrings -n 6 echo_payload.bin | grep -i "flag{"
ลองบีบอัดยอดฮิต:
# zlib (raw/with header) + gzippython3 - << 'PY'import zlib,sys,reb=open("echo_payload.bin","rb").read()cands=[]for w in (zlib.MAX_WBITS, -zlib.MAX_WBITS, 16+zlib.MAX_WBITS): try: out=zlib.decompress(b,w) cands.append(out) except: passfor i,buf in enumerate(cands): m=re.search(rb'flag\{[^}]+\}', buf) if m: print("hit zlib mode",i,":", m.group(0).decode())PY
ถัดมา ลอง XOR คีย์เดี่ยว (0..255):
python3 - << 'PY'import re,sysdata=open("echo_payload.bin","rb").read()for k in range(256): dec=bytes(b ^ k for b in data) m=re.search(rb'flag\{[^}]+\}', dec, re.I) if m: print(f"Found with XOR key=0x{k:02x}:", m.group(0).decode()) breakPY
ในเคสนี้ เจอ flag โผล่จากสตรีม (type=128, ident=4919) แบบชัดเจนโดยไม่ต้องทำอะไรซับซ้อน (raw scan ก็เห็น) แต่การเช็ค zlib/gzip/XOR เป็น safety net เผื่อคู่กรณีแอบอัด/เข้ารหัสมาด้วย
4) สคริปต์รวมแบบอัตโนมัติ (Scapy-style)
ถ้าอยากทำให้ reproducible ในขวดเดียว:
# test.pyfrom scapy.all import PcapNgReader, IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, rawimport re, zlib, syspcap = sys.argv[1] if len(sys.argv)>1 else "capture.pcapng"target_ident = 4919# 1) เก็บ payload ตาม seq เฉพาะ ident ที่สนใจseq2data = {}with PcapNgReader(pcap) as rd: for pkt in rd: if not pkt.haslayer(IPv6): continue if pkt.haslayer(ICMPv6EchoRequest) or pkt.haslayer(ICMPv6EchoReply): icmp = pkt.getlayer(ICMPv6EchoRequest) or pkt.getlayer(ICMPv6EchoReply) if getattr(icmp, "id", None) != target_ident: continue # Scapy: icmp.data เป็น bytes ของ payload seq2data[getattr(icmp, "seq", 0)] = bytes(icmp.data)# 2) ต่อ payload ตามลำดับ seqbuf = b"".join(v for k,v in sorted(seq2data.items()))def hunt(b: bytes): # plain m = re.search(rb'flag\{[^}]+\}', b, re.I) if m: return ("plain", m.group(0).decode()) # zlib/gzip for w in (zlib.MAX_WBITS, -zlib.MAX_WBITS, 16+zlib.MAX_WBITS): try: out = zlib.decompress(b, w) m = re.search(rb'flag\{[^}]+\}', out, re.I) if m: return (f"zlib({w})", m.group(0).decode()) except: pass # XOR 1-byte for k in range(256): dec = bytes(x ^ k for x in b) m = re.search(rb'flag\{[^}]+\}', dec, re.I) if m: return (f"xor 0x{k:02x}", m.group(0).decode()) return (None, None)how, flag = hunt(buf)print("Found via:", how)print("Flag:", flag)
รัน:
python3 test1.py thctt2025_open_netsec2_upgraded-router.pcapng
5) Validation
- รูปแบบแฟล็ก:
flag{+ 32 hex +}→ ตรงตามที่เจอ - ลอง cross-check ด้วยการค้นหาซ้ำในไฟล์ที่ประกอบแล้ว (
grep -a) เพื่อมั่นใจว่าไม่มี false positive
Pitfalls / Tips
- ICMPv6 echo มีทั้ง Request/Reply: ถ้าอีกฝั่ง “สะท้อน” payload กลับมา อาจเจอข้อมูลซ้ำสองชุด
- ลำดับ packet ใน PCAP ส่วนใหญ่เรียงตามเวลาอยู่แล้ว แต่ถ้าดรอป/สลับ ให้เรียงตาม
Sequence - ถ้าไม่เจอใน raw: ลอง
zlib,gzip, และ XOR single-byte (เป็นเทคนิคซ่อนที่ย่อยง่ายและเร็ว) - ท้ายสุด ถ้าผ่านทุกอย่างแล้วยังว่างเปล่า แปลว่าอาจมี layer ซ้อนอีกชั้น (เช่น CBOR/Protobuf/custom framing) ให้ลองชิ้นส่วนที่คาดว่าจะเป็น header ก่อน
ผลลัพธ์ที่ได้:
Whispers in the wire
ฐานทัพไซเบอร์แห่งใหม่เพิ่งถูกสร้างขึ้นโดยประเทศเพื่อนบ้าน พวกเขาใช้ระบบควบคุมแบบ gRPC over HTTP/2 เพื่อซ่อนคำสั่งลับและข้อมูลสำคัญระหว่าง นักวิจัยและเซิร์ฟเวอร์ทดสอบอาวุธ แต่โชคดี ที่หน่วยไซเบอร์ของเราได้ แอบเก็บ ไฟล์ PCAP ที่บันทึกการสื่อสารได้มาแล้ว 🎧 แรก ๆ มันดูเหมือนข้อมูลไร้ค่า ที่อ่านไม่ออกทันที, ข้อความยังดูเหมือนอาจจะถูกบีบอัดด้วย zlib ไว้ อีกต่างหาก… อย่างไรก็ตาม มีข่าวลือว่า ข้อความตอบกลับจาก ซ่อน flag เอาไว้! 🏴☠️ ภารกิจของคุณคือแกะรอยจากไฟล์ PCAP เพื่อดึง flag ที่ถูกซ่อนอยู่ให้ได้ โจทย์
Overview
- ไฟล์ที่ให้วิเคราะห์
thctt2025_open_netsec3_whisper-in-the-wire.pcapng thctt2025_open_netsec3_whisper-in-the-wire.proto - พาร์ส PCAPNG → TCP reassembly → HTTP/2 (cleartext)
- ดึง DATA frames (type=0x0) ตาม
stream_id - แตก gRPC messages (ฟอร์แมต
compressed_flag(1B) + length(4B, BE) + body) - ถ้า
compressed_flag==1ให้ลอง zlib/gzip - ค้นหา
flag{...}ในเพย์โหลด
🔎 Step-by-Step Walkthrough
1) ตรวจว่าทราฟฟิกเป็น HTTP/2 แบบเคลียร์ (h2c)
- เปิด Wireshark แล้วค้นหา Client Connection Preface:
PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n- หรือฟิลเตอร์:
http2
(ถ้าดูเป็น raw TCP ก็ยังเห็น frame header 9 ไบต์ของ HTTP/2)
2) รวบรวม TCP flows และประกอบสตรีม
- จับคู่ flow ตาม
(src, sport, dst, dport) - เรียงตาม TCP sequence แล้วประกอบบัฟเฟอร์สองทิศทาง (client→server / server→client)
3) พาร์เซ HTTP/2 frames
- เฮดเดอร์ 9 ไบต์/เฟรม:
len(3B BE) | type(1B) | flags(1B) | stream_id(31b)- เก็บเฉพาะ DATA frames (type=0x0) และ stream_id != 0
- ต่อ payload ของ DATA เฟรมตามลำดับที่พบ (ภายใน stream เดียวกัน)
4) แตก gRPC message framing
- gRPC บน HTTP/2 มี framing เพิ่ม 5 ไบต์ต่อ message:
compressed_flag(1B) | msg_length(4B BE) | msg_body(msg_length)- ถ้า
compressed_flag == 1ให้ลอง zlib/gzip (แม้ปกติ gRPC จะขึ้นกับค่าgrpc-encodingใน HEADERS แต่ในแคปเจอร์ CTF มักตรงไปตรงมา)
5) ค้นหา flag
- สแกน
msg_bodyหรือdecompressed_bodyด้วย regex: flag\{[^}]+\}
🧪 Quick commands (ถ้าชอบแนวเครื่องมือ)
- ดู HTTP/2 ใน Wireshark:
http2 - แยก field (เช่น
http2.streamid,http2.data.data) ด้วย tshark ก็ได้ แต่สำหรับกรณีนี้ ข้อดีของสคริปต์คือต่อและแตก message ให้อัตโนมัติ
🧰 โค้ดพร้อมใช้ซ้ำ (Python 3, no extra deps)
บันทึกเป็นไฟล์ grpc_h2_flag_extractor.py แล้วรันด้วย python3 grpc_h2_flag_extractor.py your.pcapng
#!/usr/bin/env python3import struct, zlib, re, sysfrom collections import defaultdictPCAPNG_SHB = 0x0A0D0D0APCAPNG_IDB = 0x00000001PCAPNG_EPB = 0x00000006PCAPNG_SPB = 0x00000003def read_pcapng(path): data = open(path, "rb").read() off = 0 interfaces = {} packets = [] while off + 12 <= len(data): btype, blen = struct.unpack_from("<II", data, off) if blen < 12 or off + blen > len(data): break block = data[off:off+blen] if btype == PCAPNG_IDB: if len(block) >= 20: linktype = struct.unpack_from("<H", block, 8)[0] iface_id = len(interfaces) interfaces[iface_id] = linktype elif btype == PCAPNG_EPB: if len(block) >= 32: iface_id = struct.unpack_from("<I", block, 8)[0] cap_len = struct.unpack_from("<I", block, 20)[0] pkt_data = block[28:28+cap_len] packets.append((interfaces.get(iface_id,1), pkt_data)) elif btype == PCAPNG_SPB: if len(block) >= 12: cap_len = struct.unpack_from("<I", block, 8)[0] pkt_data = block[12:12+cap_len] packets.append((1, pkt_data)) off += blen return packetsdef ipv6_addr(b): return ":".join(f"{int.from_bytes(b[i:i+2],'big'):x}" for i in range(0,16,2))def parse_ip_tcp(pkt): if len(pkt) < 14: return None eth_type = struct.unpack("!H", pkt[12:14])[0] if eth_type == 0x0800: # IPv4 ip = pkt[14:] if len(ip) < 20: return None ihl = (ip[0] & 0x0F) * 4 proto = ip[9] if proto != 6: return None total = struct.unpack("!H", ip[2:4])[0] body = ip[ihl:total] if total <= len(ip) else ip[ihl:] if len(body) < 20: return None sport, dport, seq, ack, off_flags = struct.unpack("!HHIIH", body[:14]) doff = (off_flags >> 12) * 4 return ("IPv4", sport, dport, seq, body[doff:]) elif eth_type == 0x86DD: # IPv6 ip = pkt[14:] if len(ip) < 40: return None plen = struct.unpack("!H", ip[4:6])[0] nh = ip[6] if nh != 6: return None body = ip[40:40+plen] if len(body) < 20: return None sport, dport, seq, ack, off_flags = struct.unpack("!HHIIH", body[:14]) doff = (off_flags >> 12) * 4 return ("IPv6", sport, dport, seq, body[doff:]) return Nonedef reassemble_tcp(segments): # segments: list[(seq, payload)] segs = sorted(segments, key=lambda x: x[0]) out = bytearray() cur = None for seq, blob in segs: if not blob: continue if cur is None: cur = seq if seq > cur: out.extend(b"\x00" * (seq - cur)) cur = seq overlap = cur - seq if overlap < 0: overlap = 0 add = blob[overlap:] out.extend(add) cur += len(add) return bytes(out)def parse_h2_frames(stream_bytes): frames = [] i = 0 preface = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" if stream_bytes.startswith(preface): i = len(preface) n = len(stream_bytes) while i + 9 <= n: length = int.from_bytes(stream_bytes[i:i+3], "big") ftype = stream_bytes[i+3] flags = stream_bytes[i+4] sid = int.from_bytes(stream_bytes[i+5:i+9], "big") & 0x7fffffff i += 9 if i + length > n: break payload = stream_bytes[i:i+length] i += length frames.append((ftype, flags, sid, payload)) return framesdef scan_grpc_messages(buf): hits = [] i = 0 n = len(buf) while i + 5 <= n: comp = buf[i] size = int.from_bytes(buf[i+1:i+5], "big") i += 5 if size < 0 or i + size > n: break body = buf[i:i+size] i += size cands = [body] if comp == 1: for w in (zlib.MAX_WBITS, -zlib.MAX_WBITS, 16+zlib.MAX_WBITS): try: cands.append(zlib.decompress(body, w)) except: pass for c in cands: m = re.search(rb'flag\{[^}]+\}', c, re.I) if m: hits.append(m.group(0).decode()) return hitsdef main(pcapng_path): packets = read_pcapng(pcapng_path) flows = defaultdict(list) # key: (src,sport,dst,dport) but we only need ports here for linktype, pkt in packets: p = parse_ip_tcp(pkt) if not p: continue _, sport, dport, seq, payload = p # สนใจเฉพาะ clear HTTP/2 (พอร์ตใดก็ได้) - ไม่จำกัดพอร์ตเพื่อความทั่วไป # เก็บเฉพาะมี payload if payload: flows[(sport, dport)].append((seq, payload)) # reassemble per direction, then parse h2 frames; collect DATA per stream_id DATA = 0x0 flags_found = set() for key, segs in flows.items(): stream = reassemble_tcp(segs) frames = parse_h2_frames(stream) data_by_sid = defaultdict(bytearray) for ftype, flags, sid, payload in frames: if ftype == DATA and sid != 0: data_by_sid[sid].extend(payload) for sid, data in data_by_sid.items(): for flag in scan_grpc_messages(bytes(data)): flags_found.add(flag) if not flags_found: print("No flag found.") else: for f in flags_found: print(f)if __name__ == "__main__": pcap = sys.argv[1] if len(sys.argv)>1 else "input.pcapng" main(pcap)
ใช้งาน
python3 test.py thctt2025_open_netsec3_whisper-in-the-wire.pcapng
🧷 Notes & Pitfalls
- gRPC compression มีทั้งระดับ message (ดู
compressed_flag) และระดับเชื่อมต่อ (grpc-encodingใน HEADERS) — สคริปต์นี้ลอง zlib/gzip อัตโนมัติให้แล้ว - HTTP/2 อาจแทรก HEADERS/SETTINGS/CONTINUATION — โค้ดนี้โฟกัส DATA เพียว ๆ (พอสำหรับ CTF/แคปเจอร์นี้)
- ถ้าสตรีมยาวมาก: สามารถเพิ่มการตัดเฉพาะ
sidที่มี HEADERS:pathเป้าหมายได้ (แต่ต้องพาร์เซ HEADERS/Hpack เพิ่ม)
ผลลัพธ์หลังจากการรัน
Custom Protocol v2
Custom Protocol v2 ถูกใช้งานเข้ารหัสลับข้อมูลของศัตรู แต่ทางหน่วยข่าวกรองได้เอกสาร Protocol Specification มา จงใช้ Gen AI ช่วยถอดรหัส Custom Protocol นี้ให้หน่อยนะ
สรุปผล
- ไฟล์ที่ให้วิเคราะห์ thctt2025_open_netsec4_custom-protocol-v2.pcap
- โปรโตคอล: STH v2 บน UDP:31337
- กลไกสำคัญ: Header 23 ไบต์ + Options + Payload, CRC-32 ตรวจครบ, ข้อมูลแตกเป็นชิ้น DATA หลายชิ้นแล้วสลับลำดับด้วย LCG, มี reverse/base64 ตาม
flags2, จากนั้น XOR ด้วย keystream (SHA-256 counter-mode) และสุดท้าย zlib
🔍 โครงสร้างโปรโตคอล (สรุปที่ใช้จริง)
STH packet (บน UDP)
'STH'(3) | ver(1=0x02) | type(1) | flags(1) | reserved(1)session_id(4, BE) | seq(4, BE) | payload_len(2, BE) | options_len(2, BE) | crc32(4, BE)[options (options_len)] [payload (payload_len)]
- CRC-32 คำนวณจาก:
header[:19] + options + payload
HELLO (payload)
client_nonce(8) | name_len(1) | name(name_len bytes)
WELCOME (payload)
server_nonce(8) | salt(8) | chunk_size(2) | total_chunks(2) |a(4) | c(4) | seed(4) | hint(2) | flags2(1) | reserved(1)
flags2:- bit0 (
0x01) → มี reverse (กลับบัฟเฟอร์) - bit1 (
0x02) → มี base64 hintใช้เช็ค/กรอง session (เช่นค่าพิเศษที่คนเขียนโจทย์ตั้งไว้)
DATA
- DATA แต่ละชิ้นอยู่ใน
payloadตรง ๆ (ไม่มีส่วนหัวเพิ่ม) - ต้องเอาชิ้นที่มี
seq = 0..n-1มาเรียงใหม่ด้วย LCG
Reordering (LCG)
- ให้
n = total_chunks - ลำดับจริงของ ตำแหน่ง ชิ้นคือ
x0 = seed % n x_{k+1} = (a * x_k + c) % n
- จาก ชิ้นที่ส่งมาด้วย index/seq = j ให้ไปวางที่ตำแหน่ง i = x_j
Keystream & XOR
- สร้าง keystream ยาวเท่าข้อมูล โดยวนคอนเตอร์
k = 0,1,2,...: block_k = SHA256( session_id(4,BE) || client_nonce(8) || server_nonce(8) || salt(8) || k(4,BE) )- นำมาวางต่อกัน แล้ว XOR กับ ciphertext (หลัง undo base64/reverse)
Decompress
zlib.decompress(...)(ลองMAX_WBITS / raw / gzipเผื่อ)
🧭 วิธีทำ (Step-by-Step)
- ดึงทุก UDP packet ที่พอร์ต 31337
- แยก STH packets ด้วยการเช็ค
'STH'+ver=0x02+ CRC-32 ถูกต้อง
- จับคู่ session จาก
session_id: - เก็บ
HELLO(เอาclient_nonce) - เก็บ
WELCOME(ดึงพารามิเตอร์server_nonce, salt, chunk_size, total_chunks, a, c, seed, flags2, hint) - เก็บ
DATAใส่ map ตามseq
ประกอบข้อมูล:
- ใช้ LCG
(a,c,seed,n)สร้างลำดับตำแหน่งx_j - วาง
chunk[ x_j ] = DATA[ j ]แล้ว concat ->ciphertext
ถอด transform ตาม flags2:
- ถ้ามี
0x02→ base64 decode - ถ้ามี
0x01→ reverse (กลับบัฟเฟอร์)
XOR ด้วย keystream (SHA-256 counter-mode) → ได้ compressed plaintext
zlib decompress → ได้ข้อความจริง แล้ว regex หา flag{...}
🛠️ โค้ดพร้อมรัน (Python 3, no external deps)
บันทึกเป็นไฟล์ sth_v2_decoder.py:
#!/usr/bin/env python3import struct, binascii, base64, zlib, re, sysfrom collections import defaultdict, namedtuple# ---------- PCAP (libpcap) reader (ipv4/ipv6 + udp) ----------def read_pcap(path): data = open(path, "rb").read() if len(data) < 24: raise ValueError("PCAP too short") magic = struct.unpack("<I", data[:4])[0] if magic == 0xa1b2c3d4: endian, ts_nsec = "<", False elif magic == 0xd4c3b2a1: endian, ts_nsec = ">", False elif magic == 0xa1b23c4d: endian, ts_nsec = "<", True elif magic == 0x4d3cb2a1: endian, ts_nsec = ">", True else: raise ValueError("Unknown PCAP magic")off = 24 while off + 16 <= len(data): ts_sec, ts_usec, incl_len, orig_len = struct.unpack(endian+"IIII", data[off:off+16]) off += 16 pkt = data[off:off+incl_len] off += incl_len yield ts_sec, ts_usec, pktdef parse_eth(pkt): if len(pkt) < 14: return None ether_type = struct.unpack("!H", pkt[12:14])[0] return ether_type, pkt[14:]def parse_ipv4(buf): if len(buf) < 20: return None ihl = (buf[0] & 0x0F) * 4 if len(buf) < ihl: return None proto = buf[9] total_len = struct.unpack("!H", buf[2:4])[0] payload = buf[ihl: total_len] if total_len <= len(buf) else buf[ihl:] src = ".".join(map(str, buf[12:16])) dst = ".".join(map(str, buf[16:20])) return ("IPv4", proto, src, dst, payload)def parse_ipv6(buf): if len(buf) < 40: return None ver = (buf[0] >> 4) & 0xF if ver != 6: return None plen = struct.unpack("!H", buf[4:6])[0] nh = buf[6] payload = buf[40:40+plen] src = ":".join(f"{int.from_bytes(buf[8+i:10+i],'big'):x}" for i in range(0,16,2)) dst = ":".join(f"{int.from_bytes(buf[24+i:26+i],'big'):x}" for i in range(0,16,2)) return ("IPv6", nh, src, dst, payload)def parse_udp(payload): if len(payload) < 8: return None sport, dport, length, csum = struct.unpack("!HHHH", payload[:8]) data = payload[8:length] if length <= len(payload) else payload[8:] return sport, dport, data# ---------- STH v2 ----------STH_MAGIC = b"STH"TYPE_HELLO = 0x01TYPE_WELCOME = 0x02TYPE_DATA = 0x10TYPE_ACK = 0x11TYPE_BYE = 0x20Welcome = namedtuple("Welcome", "session_id server_nonce salt chunk_size total_chunks a c seed hint flags2")Hello = namedtuple("Hello", "client_nonce client_name")def crc32(buf): return binascii.crc32(buf) & 0xffffffffdef parse_sth(buf): if len(buf) < 23: return None if buf[:3] != STH_MAGIC or buf[3] != 0x02: return None t = buf[4] flags = buf[5] # buf[6] reserved sid = struct.unpack("!I", buf[7:11])[0] seq = struct.unpack("!I", buf[11:15])[0] payL = struct.unpack("!H", buf[15:17])[0] optL = struct.unpack("!H", buf[17:19])[0] crc = struct.unpack("!I", buf[19:23])[0] need = 23 + optL + payL if len(buf) < need: return None options = buf[23:23+optL] payload = buf[23+optL:23+optL+payL] if crc32(buf[:19] + options + payload) != crc: return None return {"type":t,"flags":flags,"session_id":sid,"seq":seq, "options":options,"payload":payload, "raw":buf[:need]}def parse_tlv(options_bytes): tlvs=[]; i=0 while i+2 <= len(options_bytes): typ = options_bytes[i]; ln = options_bytes[i+1]; i+=2 tlvs.append((typ, options_bytes[i:i+ln])); i+=ln return tlvsdef parse_hello(p): if len(p) < 9: return None client_nonce = p[:8] name_len = p[8] name = p[9:9+name_len].decode("utf-8","ignore") return Hello(client_nonce, name)def parse_welcome(p, sid): if len(p) < 36: return None server_nonce = p[0:8] salt = p[8:16] chunk_size = struct.unpack("!H", p[16:18])[0] total_chunks = struct.unpack("!H", p[18:20])[0] a = struct.unpack("!I", p[20:24])[0] c = struct.unpack("!I", p[24:28])[0] seed = struct.unpack("!I", p[28:32])[0] hint = struct.unpack("!H", p[32:34])[0] flags2 = p[34] # p[35] reserved return Welcome(sid, server_nonce, salt, chunk_size, total_chunks, a, c, seed, hint, flags2)def lcg_order(a,c,seed,n): order=[]; x = seed % n; seen=set() for _ in range(n): order.append(x); seen.add(x) x = (a*x + c) % n if len(set(order)) != n: # fallback make unique uniq=[]; seen=set() for x in order: if x not in seen: uniq.append(x); seen.add(x) for i in range(n): if i not in seen: uniq.append(i) order = uniq[:n] return orderdef keystream(session_id_be4, client_nonce, server_nonce, salt, L): base = session_id_be4 + client_nonce + server_nonce + salt out = bytearray(); k=0 import hashlib while len(out) < L: blk = base + struct.pack("!I", k) out.extend(hashlib.sha256(blk).digest()) k += 1 return bytes(out[:L])def decode_session(sess): hello = sess.get("hello") wel = sess.get("welcome") if not hello or not wel: return None n = wel.total_chunks if len(sess["data"]) != n: return None # Reorder by LCG order = lcg_order(wel.a, wel.c, wel.seed, n) chunks = [b""]*n for j in range(n): if j not in sess["data"]: return None i = order[j] chunks[i] = sess["data"][j] ciphertext = b"".join(chunks) # Undo transforms buf = ciphertext if (wel.flags2 & 0x02) != 0: # base64 try: buf = base64.b64decode(buf, validate=False) except Exception: try: buf = base64.b64decode(buf + b"==") except: pass if (wel.flags2 & 0x01) != 0: # reverse buf = buf[::-1] # XOR keystream sid_be4 = struct.pack("!I", wel.session_id) ks = keystream(sid_be4, hello.client_nonce, wel.server_nonce, wel.salt, len(buf)) comp = bytes(a ^ b for a,b in zip(buf, ks)) # Decompress plain = None for w in (zlib.MAX_WBITS, -zlib.MAX_WBITS, 16+zlib.MAX_WBITS): try: plain = zlib.decompress(comp, w); break except: pass if plain is None: plain = comp # fallback m = re.search(rb'flag\{[^}]+\}', plain, re.I) return m.group(0).decode() if m else Nonedef main(pcap_path): sessions = defaultdict(lambda: {"hello":None, "welcome":None, "data":{}}) last_hello = None for _,_,pkt in read_pcap(pcap_path): L2 = parse_eth(pkt); if not L2: continue ether_type, l3 = L2 L3 = None if ether_type == 0x0800: L3 = parse_ipv4(l3) elif ether_type == 0x86DD: L3 = parse_ipv6(l3) if not L3: continue proto = L3[1] if proto != 17: # UDP only continue udp = parse_udp(L3[4]) if not udp: continue sport, dport, udp_payload = udp if sport != 31337 and dport != 31337: continue # ภายใน UDP อาจมีหลาย STH frame (เผื่อคนส่ง pack) i=0 while i + 23 <= len(udp_payload): if udp_payload[i:i+3] != STH_MAGIC or udp_payload[i+3] != 0x02: i += 1; continue # peek length payL = struct.unpack("!H", udp_payload[i+15:i+17])[0] optL = struct.unpack("!H", udp_payload[i+17:i+19])[0] need = 23 + optL + payL if i + need > len(udp_payload): break frame = udp_payload[i:i+need] i += need p = parse_sth(frame) if not p: continue sid = p["session_id"]; t = p["type"] if t == TYPE_HELLO: h = parse_hello(p["payload"]) if h: sessions[sid]["hello"] = h; last_hello = h elif t == TYPE_WELCOME: w = parse_welcome(p["payload"], sid) if w: sessions[sid]["welcome"] = w if sessions[sid]["hello"] is None and last_hello: sessions[sid]["hello"] = last_hello elif t == TYPE_DATA: sessions[sid]["data"][p["seq"]] = p["payload"] # ACK/BYE not needed for decode # ลองถอดทุก session ที่ข้อมูลครบ results=[] for sid, S in sessions.items(): w = S.get("welcome") if not w: continue if S["data"] and len(S["data"]) == w.total_chunks: flag = decode_session(S) if flag: results.append((sid, flag)) if not results: print("No flag found.") return 1 for sid, flag in results: print(f"[session {sid}] {flag}") return 0if __name__ == "__main__": pcap = sys.argv[1] if len(sys.argv)>1 else "input.pcap" sys.exit(main(pcap))
ใช้งาน
python3 test2.py thctt2025_open_netsec4_custom-protocol-v2.pcap
ถ้าต้องการรองรับไฟล์ pcapng ด้วย ให้แปลงเป็น pcap ก่อน (เช่น editcap -F pcap input.pcapng out.pcap) หรือเพิ่มตัวอ่าน pcapng เข้าไปตามต้องการ
🧪 เช็กลิสต์ดีบักเร็ว
- CRC-32 ไม่ผ่าน → header ชำรุดหรืออ่าน options/payload ผิด offset
DATAไม่ครบtotal_chunks→ dump รายการseqที่ได้มา, อาจมีชิ้นตกหล่น- ไม่พบ
flag{…}→ ลองดูplainที่ได้ก่อน decompress เผื่อflags2สลับลำดับ (แต่จากสเปก: base64 → reverse → XOR → zlib)
ภาพรวมสั้น ๆ
สคริปต์นี้ทำ 6 ขั้นตอนหลัก:
- อ่าน PCAP (libpcap) → แยก L2/L3/L4 จนถึง UDP:31337
- สแกนและตัด STH v2 frame ออกจาก UDP payload (1 datagram อาจมีหลาย frame)
- ตรวจ CRC-32 และแยกตาม session_id + เก็บ HELLO / WELCOME / DATA
- ใช้พารามิเตอร์จาก WELCOME (a,c,seed,n) จัดเรียง DATA ด้วย LCG
- คลาย transform ตาม
flags2(base64, reverse) + สร้าง keystream → XOR zlib.decompress→ หาflag{...}
ไล่ code ข้างต้น
1) ส่วน import และตัวช่วยอ่าน PCAP
import struct, binascii, base64, zlib, re, sysfrom collections import defaultdict, namedtuple
structใช้อ่านเลขแบบไบนารี (big-endian/ little-endian)binascii.crc32คำนวณ CRC-32base64,zlibถอดบีบอัด/คำรหัสตามที่โปรโตคอลกำหนดreหาแพทเทิร์นflag{...}defaultdictช่วยเก็บแพ็กเก็ตแบ่งตาม session
ฟังก์ชันอ่าน PCAP (ไม่พึ่ง lib ภายนอก)
def read_pcap(path): ...
- ตรวจ magic number เพื่อรู้ endianness ของไฟล์ pcap
- loop อ่าน packet header (เวลา+ความยาว) แล้ว yield เนื้อแพ็กเก็ตดิบ
เลือกใช้ “reader แบบเบา” เพื่อเลี่ยง dependency — คุม offset เองทั้งหมด
แยกชั้น Ethernet / IPv4 / IPv6 / UDP
parse_eth→ คืนether_typeและ payload ที่ชั้นบนparse_ipv4→ คืน(type, proto, src, dst, payload)โดยproto==17คือ UDPparse_ipv6→ คล้ายกัน แต่ใช้โครง IPv6 (อ่านplen,nh)parse_udp→ คืน(sport, dport, data)(ตัดหัว UDP 8 ไบต์ออก)
จุดสำคัญคือเราสนใจเฉพาะ UDP:31337 เท่านั้น
2) โครงสร้าง/พาร์เซ STH v2
คงค่าคงที่ประเภท frame:
STH_MAGIC = b"STH"TYPE_HELLO=0x01; TYPE_WELCOME=0x02; TYPE_DATA=0x10; ...
โครงสร้างผลพาร์เซ:
Welcome = namedtuple(...)Hello = namedtuple(...)
parse_sth(buf)
- ตรวจ
'STH'และver==0x02 - ดึง
type, flags, session_id, seq, payload_len, options_len, crc32 - คำนวณ CRC-32 จาก
header[:19] + options + payload - คืน dict สำหรับ frame นี้
parse_hello(payload)
- ตัด
client_nonce(8)และname_len + name
parse_welcome(payload, sid)
- ดึง
server_nonce, salt, chunk_size, total_chunks, a, c, seed, hint, flags2 - สร้าง
Welcome(...)
3) Reordering ด้วย LCG
def lcg_order(a,c,seed,n): x0 = seed % n x_{k+1} = (a*x_k + c) % n
- คืนรายการลำดับ ตำแหน่ง ของชิ้น (ยาว n)
- ถ้าเจอกรณีวนลูปไม่ครบ n (เซฟตี้) จะ “บังคับ” ทำให้ครบ (fallback)
ประกอบข้อมูล
หลังเก็บ DATA เป็น sess["data"][seq] = payload:
order = lcg_order(...)chunks[i] = data[j] # โดย i = order[j]ciphertext = b"".join(chunks)
ตรงตามสเปค: ค่าseqที่ส่งมาเป็น “ดัชนี j” ของ LCG; ตำแหน่งที่ควรไปอยู่คือi = x_j
4) Transforms + Keystream XOR
flags2
if flags2 & 0x02: base64.b64decode(...)if flags2 & 0x01: buf = buf[::-1]
- bit1 (
0x02) = มี base64 - bit0 (
0x01) = reverse
Keystream (SHA-256 counter)
def keystream(session_id_be4, client_nonce, server_nonce, salt, L): base = sid||client||server||salt for k=0.. : SHA256(base || k_be32) ต่อกันจนยาว >= L
- เหมือน CTR-mode ง่าย ๆ → XOR กับบัฟเฟอร์หลัง transform
5) Decompress + หา flag
พยายาม zlib หลายโหมด:
for w in (zlib.MAX_WBITS, -zlib.MAX_WBITS, 16+zlib.MAX_WBITS): try: plain = zlib.decompress(comp, w)
- ถ้าไม่สำเร็จเลย → ใช้
compตรง ๆ (fallback) re.search(rb'flag\{[^}]+\}', plain)แล้วดึงสตริง
6) main(): วนอ่าน pcap → รวมผล → ถอดทุก session
sessions = defaultdict(lambda: {"hello":None, "welcome":None, "data":{}})last_hello = None
- ระหว่างเดินแพ็กเก็ต:
- HELLO → เก็บ
client_nonce; จำlast_helloเผื่อจับคู่ WELCOME ที่ส่งตามมา - WELCOME → เก็บพารามิเตอร์; ถ้ายังไม่มี hello ให้ผูก
last_helloเป็น default - DATA → เก็บเป็น map ตาม
seq
เงื่อนไขถอด:
- ต้องมี
welcome - จำนวน
DATAครบtotal_chunks - จากนั้น
decode_session(sess)→ คืน flag หรือNone
พิมพ์ผล:
จุดที่ควรรู้ / ปรับแต่งได้
- ถ้าไฟล์เป็น pcapng:
- ทางลัด:
editcap -F pcap in.pcapng out.pcapแล้วใช้สคริปต์นี้ - หรือเพิ่มตัวอ่าน pcapng (ในเวอร์ชันก่อนหน้านี้ผมมีตัวอย่าง parser mini ๆ อยู่แล้ว)
- ถ้าบาง session DATA ไม่ครบ:
- ลองพิมพ์เช็ก
len(sess["data"])กับtotal_chunksเพื่อวินิจฉัย - ถ้าไม่เจอ flag:
- ดู
plain[:200]ช่วยวิเคราะห์ว่าถูกขั้นไหน (base64/reverse/keystream/zlib) - ประสิทธิภาพ:
- สคริปต์ทำ O(#packets) ในการ parse + O(n) สำหรับ LCG + O(len(buf)) สำหรับ keystream/XOR/decompress — ปกติเร็วมาก
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
ขอบคุณทุกคนที่เข้ามาอ่านผมทำ blog นี้เพื่อให้เห็นในอีกมุมนึงเท่านั่นว่าหากเรานำเทคโนโลยีเข้ามาช่วยเหลือมันก็จะช่วยเราว่ามารถผ่านปัญหาได้ โดยทั้ง blog
นี้ผมก็ได้ให้ ChatGPT ช่วยเขียนมันขึ้นมาเช่นกัน