セキュリティキャンプ全国大会2025のCクラスに受かったので応募課題をさらしてみようと思います。
自分的にはブログに書いた方がやりやすかったので、応募する際はブログへのリンクと内容を書きました。
急いで書いたのでけっこう文のつながりがおかしかったりします。(本当に読みにくい)
私の個人的な問を晒しても面白くないのでそれ以外を晒します。
問5
1.わかったこと
pandaとkiteshieldというリポジトリをフォークしている。
pikaxというリポジトリを3月17日から作成しており、Takeru Ito と clwomkvがコミットをしている。
clwomkvのメールアドレスは、c.seccamp@proton.meであり、2025/03/17 12:30:45に作成されており、セキュリティキャンプ用に作った匿名のアカウントである。
また、Takeru Itoは、itotx1993+work@gmail.comというアドレスを使用しており、
2025/03/17 04:23:17にプロフィール画像を変更している。
こちらもセキュリティキャンプ用に作った空想の人物だと考えられる。
2.どのように調べたか
まず、自作のリポジトリはpikaxのみだったので、そのコミット履歴を調べました。
すると、Takeru Itoというアカウントがコミットをしていることがわかりました。
また、githubではコミットIDのあとに.patchをつけることでより詳細な情報を見ることができるので、各コミットのURLの最後に.patchをつけることでメールアドレスなどが判明しました。
clwomkvのメールアドレスはproton mail という匿名のメールサービスを使用していました。
proton mailにはProtOSINTというツールがあることが分かったので、それでc.seccamp@proton.meというアドレスを調べてみると、2025/03/17 12:30:45に作成されたことが分かったのでセキュリティキャンプ用に作られたものだと判断しました。
itotx1993はgmailなのでGHUNT等のツールで調べてみましたが、コミットする40分前に、プロフィールを変更していたことや、アイコンが初期画像ということしか主にわかりませんでした。
つまり、この課題用に、直前に作ったアカウント、もしくは、個人情報保護のためにプロフィール画像を初期画像に変更したものと推測されます。
itotx1993で調べてもほかのアカウントは見つからず、1993年生まれであろうことしか予想できませんでした。
また、sherlockといったツールでclwomkvやitotx1993というユーザ名を調べてみてもヒットしませんでした。
なので、Takeru Itoとgoogleで検索し、1993年生まれの人を全探索しました。
しかし、なかなかヒットしません。
なので、セキュリティキャンプ用に作られたものと推測しました。
3.気を付けたこと
このようないわゆるOSINTと呼ばれるような行為を行う際は、こちらは勝手に調査を行っているので相手に迷惑がかからないように気を付けています。
例えば、今回の例ではメールアドレスが出ましたが、決してそのメールアドレスには調査目的で連絡してはいけないと考えています。
セキュリティの調査を行う際はこのような倫理観も大切だと考えているのでとても気を付けています。
問6
問6-1
Ghidraで解析をしてみると、AESMainという怪しい関数があります。
GhidraのReference機能で呼び出しもとをたどってみると、呼び出し元の関数でdata.txtに対して上書きをしているようです。
そして、最終的にmainで呼び出されているCheckTimestamp関数内で呼び出されていました。
CheckTimestamp関数では、GetSystemTimeで日付を取得し、2025/8/11の場合にのみスレッドを作成し先ほどの関数を呼び出していました。
よって、data.txtを暗号化するためにはシステムの日付を2025/8/11に設定する必要があります。
実際に実験してみたところ確かに、2025/8/11の場合にdata.txtが暗号化されていることが確認できました。
問6-2
私の環境ではAESMainを呼び出して、data.txtに書き込みをしている関数名がUndefinedFunction_140001719なので、以後そのように記載します。
まずは、UndefinedFunction_140001719関数内で
1.data.txtをOPEN
2.ファイル長Lを取得
3.malloc(L+1)でbuf実体
4.ReaFileでbuf ← data.txtの内容
5.AESMain(buf,&outBuf,L)
6.SetFilePointer 先頭へ
7.WriteFile(outBuf,Lバイト分)
8.CloseHandle
といったフローで実行されており、AESMainは
1.DecAesKey() 32バイト鍵key[]を復元
2.makeIV() 16バイト IV[]を準備
3.BCryptOpenAlgorithmProvider(&algo, L”AES”, NULL, 0)
4.BCryptSetProperty(algo, L”ChainingMode”, L”CBC”, …)
5.BCryptGetProperty(algo, L”ObjectLength”, &objLen, …)
6.malloc(objLen) → keyObject バッファ確保
7.BCryptGenerateSymmetricKey(algo, &keyHandle, keyObject,objLen, &key, 32, 0)
8.BCryptEncrypt(keyHandle, buf, L, NULL, &IV, 16, NULL, 0,&neededSize, BCRYPT_BLOCK_PADDING) // 出力長取得
9.malloc(neededSize) → outBuf 確保
10.BCryptEncrypt(keyHandle, buf, L, NULL, &IV, 16, outBuf,neededSize, &writtenSize, BCRYPT_BLOCK_PADDING)
11.*param_2 = outBuf
となっています。
まとめ
data.txtを読み込み平文バッファを得る。それをAESMainに渡して暗号化。鍵生成をし、IVを生成してから、AES-CBC暗号化本体を行っています。
DecAesKeyは既存のデータから計算をしてkeyを復元しているだけで、makeIVは時間をシードにとり、乱数でIVを生成しています。
つまり、keyは手動で復元可能で、IVもシードの時間を合わせれば疑似乱数なので乱数を再現できます。
そして、暗号化された生データを同じファイルに書き戻しています。
問6-3
利用した複合プログラムは次です。
import datetime
import os
import sys
from Crypto.Cipher import AES
#IDA及びGhidraから抽出
encTable_static = [0x3A, 0x7F, 0xC2, 0x9B]
encKey = [
0xD6,0x37,0x0C,0xB9, 0x3A,0x19,0x86,0x61,
0xE2,0x55,0xCA,0xF5, 0x76,0x9D,0x02,0xBD,
0x3E,0x05,0x9A,0x71, 0xF2,0x51,0xCE,0xF9,
0x7A,0x99,0x06,0xA1, 0x22,0x15,0x8A,0x79
]
def derive_key():
tbl = [(b ^ 0xAA) for b in encTable_static]
key = bytearray(32)
for i in range(32):
key[i] = ((encKey[i] ^ tbl[i % 4]) >> 1) & 0xFF
return bytes(key)
class MSVCRT_Rand: #Cのrandを再現
def __init__(self, seed):
self.hold = seed & 0xFFFFFFFF
def rand(self):
self.hold = (self.hold * 214013 + 2531011) & 0xFFFFFFFF
return (self.hold >> 16) & 0x7FFF
def make_iv(seed):
rng = MSVCRT_Rand(seed)
return bytes((rng.rand() & 0xFF) for _ in range(16))
def printable_ratio(data: bytes) -> float:
printable = sum(1 for b in data if 32 <= b < 127 or b in (9,10,13))
return printable / len(data)
def main():
if len(sys.argv) != 2:
print(f"Usage: {sys.argv[0]} data.txt", file=sys.stderr)
sys.exit(1)
raw = open(sys.argv[1],"rb").read()
orig_size = len(raw)
aligned = (orig_size // 16) * 16
cipher = raw[:aligned]
key = derive_key()
print("[*] Derived AES-256 key:", key.hex())
import datetime, time
utc0 = int(datetime.datetime(2025,8,11,0,0,0).timestamp())
utc1 = int(datetime.datetime(2025,8,11,23,59,59).timestamp())
jst0 = int(datetime.datetime(2025,8,10,15,0,0).timestamp())
jst1 = int(datetime.datetime(2025,8,11,14,59,59).timestamp())
ranges = [("UTC", utc0, utc1), ("JST→UTC", jst0, jst1)]
outdir = "candidates"
os.makedirs(outdir, exist_ok=True)
# 閾値(可読率)
threshold = 1.00
print(f"[*] Searching seeds, keeping printable_ratio ≥ {threshold:.2f}")
for label, lo, hi in ranges:
print(f"[*] Range {label}: seeds {lo}–{hi}")
for seed in range(lo, hi+1):
iv = make_iv(seed)
plain_padded = AES.new(key, AES.MODE_CBC, iv).decrypt(cipher)
plain = plain_padded[:orig_size] # パディング除去せずに切り出し
ratio = printable_ratio(plain)
if ratio >= threshold:
fname = os.path.join(outdir, f"candidate_{seed}.bin")
with open(fname, "wb") as wf:
wf.write(plain)
print(f"[+] Seed={seed} IV={iv.hex()} printable={ratio:.2%} → {fname}")
print("[*] Done. Check the `candidates/` directory for outputs.")
if __name__ == "__main__":
main()
復号結果は次です。
Decoding Ransomware: Unraveling the Mind of Cybercriminals
Ransomware is malicious malware that encrypts data and demands a ransom to decrypt it. It has become a major risk for individuals and organizations. This lecture aims to provide students with an understanding of the basic principles of ransomware operation and infection routes. In addition to this, the course also touches on the psychology of cybercriminals who launch ransomware attacks, their motives, and attack strategies.
The lecture will begin with a basic overview of ransomware and the relationship between threat actors. Then, through static and dynamic analysis, students will learn basic analysis techniques using debuggers. The goal of this lecture is to understand why ransomware systems are created and why ransomware is prevalent after mastering basic ransomware analysi
最後の分は「ransomware analysis techniques.」で終わるはずなので完全には復号できていません。
しかし、それは仕方ありません。
hexeditorで暗号前のファイルを除いてみるとわかりやすいでしょう。
analysisまでで、0x340の行にいますが、s techniques.からは0x350の行に行き、8バイト分だけあふれています。ちょうど16バイトで終わっていません。
なぜでしょうか?
私のプログラムでaligned = (orig_size // 16) * 16をしているからですが、なぜこうするを得なかったのでしょう?
まずAESMainではちゃんとAES暗号化の処理を行っています。これから、AESMainの一部をGhidraで逆コンパイルしたものを抽出します。
今回はBCryptSetProperty(local_18,L”ChainingMode”,(PUCHAR)L”ChainingModeCBC”,0x20,0);と、CBCモードで暗号化を行っています。
CBCのようなモードでは入力を必ず16バイト(128ビット)のブロックに区切って暗号化します。
なので、16バイト未満のものはパディング(主にPKCS#7パディング方法)という手法が使われ強制的に16バイトのブロックになるようにしています。
逆に16バイトのブロックがそろっていなければ復号できません。
ここまでは全然大丈夫ですが、一度UndefinedFunction_140001719(core)の逆コンパイル結果を見ると、
nNumberOfBytesToRead = GetFileSize(hFile, 0LL);
if ( nNumberOfBytesToRead != -1 )
{
lpBuffer = malloc(nNumberOfBytesToRead + 1);
ReadFile(hFile, lpBuffer, nNumberOfBytesToRead, &NumberOfBytesRead, 0LL);
if ( lpBuffer )
AESMain(lpBuffer, &v3, nNumberOfBytesToRead);
SetFilePointer(hFile, 0, 0LL, 0);
WriteFile(hFile, v3, nNumberOfBytesToRead, &NumberOfBytesRead, 0LL);
}
このような実装になっていて、暗号化前のdata.txtのファイルサイズ(nNumberOfBytesToRead)をそのままWriteFileの第三引数、つまりファイルに書き込むバイト数に代入しています。
これは、せっかくAESMainで暗号文にパディングをつけて16バイトずつに分けたのに結局書き込まれるバイト数は暗号化する前のバイト数なので、パディング分が消えてしまいます。
16バイトの倍数にしないとAES-CBCは復号できないので、最後の行が消えてしまうというわけです。
パディングが消えているということに気づくのには沼りました。
ずっとエラーがでてしまい、IDAでkeyやivの値も確認しながら一致していることを確認していたのですごく混乱しました。最終的にたどり着けて良かったです。
追記:
あとから見てみてもまじで何書いてるか分かりにくかったのでまとめると、せっかくパディングつけたのにパディング前のファイルサイズで書き込んでいるので最後の16バイトのブロックになっていないところは復号できないよねという話です。
問7
問7-1
import struct
import io
import sys
import os
import hashlib
from typing import List, Tuple, Optional
# ----------------------------
# [1] 定数定義(ext4 on-disk フォーマット)
# ----------------------------
EXT4_SUPERBLOCK_OFFSET = 1024
EXT4_SUPERBLOCK_SIZE = 1024 # バイト数
# ext4 inode フラグ (i_flags) 内
EXT4_EXTENTS_FL = 0x80000 # extents フォーマットが使われているかを示す
# extent ヘッダ内のマジック
EXT4_EXTENT_MAGIC = 0xF30A
# ----------------------------
# [2] スーパーブロック構造から必要フィールドを読むフォーマット
# ----------------------------
# オフセット 24: s_log_block_size (4 バイト)
# オフセット 32: s_blocks_per_group (4 バイト)
# オフセット 40: s_inodes_per_group (4 バイト)
# オフセット 88: s_inode_size (2 バイト)
# オフセット 20: s_first_data_block (4 バイト)
# → これらを struct.unpack_from で抜き取る
# ----------------------------
# ----------------------------
# [3] グループディスクリプタ構造体(on-disk)
# ----------------------------
# bg_inode_table_lo: オフセット 8, 4 バイト
# bg_inode_table_hi: オフセット 36, 4 バイト
# (1 グループあたり 64 バイトとして扱う)
# ----------------------------
# GROUP_DESC_STRUCT_FMT = "<III I I I H H H H H I" # (全部読む必要はなく、offset + struct.unpack_from だけで可)
# ----------------------------
# [4] inode 構造体から最初の 128 バイト分を読むフォーマット
# ----------------------------
INODE_STRUCT_FMT = "<H H I I I I I H H I I 15I"
# H: i_mode
# H: i_uid
# I: i_size_lo
# I: i_atime
# I: i_ctime
# I: i_mtime
# I: i_dtime
# H: i_gid
# H: i_links_count
# I: i_blocks_lo
# I: i_flags
# I: i_osd1
# 15I: i_block[15] (bytes 40-99 の部分)
# ----------------------------
# [5] extent ヘッダ/エントリ/インデックス構造
# ----------------------------
EXTENT_HEADER_FMT = "<H H H H I"
# eh_magic (2) | eh_entries (2) | eh_max (2) | eh_depth (2) | eh_generation (4)
EXTENT_INDEX_FMT = "<I I H H"
# ei_block (4) | ei_leaf_lo (4) | ei_leaf_hi (2) | ei_unused (2)
EXTENT_LEAF_FMT = "<I H H I"
# ee_block (4) | ee_len (2) | ee_start_hi (2) | ee_start_lo (4)
# ----------------------------
# [6] ディレクトリエントリ構造体(on-disk)
# 各エントリは variable-length:
# - inode (4 bytes)
# - rec_len (2 bytes)
# - name_len (1 byte)
# - file_type (1 byte)
# - name (name_len bytes)
# - padding (rec_len - (8 + name_len)) bytes
# ----------------------------
# ----------------------------
# [7] ユーティリティ関数群
# ----------------------------
def read_superblock(f: io.BufferedReader) -> dict:
"""
スーパーブロックを読み、以下の情報を辞書で返す:
- block_size (int)
- inodes_per_group (int)
- inode_size (int)
- first_data_block (int)
"""
f.seek(EXT4_SUPERBLOCK_OFFSET)
sb_data = f.read(EXT4_SUPERBLOCK_SIZE)
s_log_block_size = struct.unpack_from("<I", sb_data, 24)[0]
s_blocks_per_group = struct.unpack_from("<I", sb_data, 32)[0]
s_inodes_per_group = struct.unpack_from("<I", sb_data, 40)[0]
s_inode_size = struct.unpack_from("<H", sb_data, 88)[0]
s_first_data_block = struct.unpack_from("<I", sb_data, 20)[0]
block_size = 1024 << s_log_block_size
return {
"block_size": block_size,
"blocks_per_group": s_blocks_per_group,
"inodes_per_group": s_inodes_per_group,
"inode_size": s_inode_size,
"first_data_block": s_first_data_block,
}
def read_group_descriptor(f: io.BufferedReader, gd_table_offset: int, group_index: int) -> dict:
"""
指定したグループインデックスの「グループディスクリプタ」を読み、
- inode_table_lo (lower 32bit)
- inode_table_hi (upper 32bit)
を辞書で返す。
"""
size_per_gd = 64 # ext4 では少なくとも 64 bytes
f.seek(gd_table_offset + group_index * size_per_gd)
raw = f.read(size_per_gd)
inode_table_lo = struct.unpack_from("<I", raw, 8)[0]
inode_table_hi = struct.unpack_from("<I", raw, 36)[0]
return {
"inode_table_lo": inode_table_lo,
"inode_table_hi": inode_table_hi,
}
def read_inode(f: io.BufferedReader,
inode_number: int,
superblock: dict,
gd_table_offset: int) -> Tuple[dict, bytes]:
"""
指定 inode_number の inode 構造を読み込み、
- inode_meta (dict)
- raw_i_block (bytes: 15×4 = 60 bytes)
を返す。
"""
block_size = superblock["block_size"]
inodes_per_group = superblock["inodes_per_group"]
inode_size = superblock["inode_size"]
# グループインデックス算出
group_index = (inode_number - 1) // inodes_per_group
local_index = (inode_number - 1) % inodes_per_group
# そのグループの inode テーブル先頭ブロック番号を取得
gd = read_group_descriptor(f, gd_table_offset, group_index)
inode_table_block = (gd["inode_table_hi"] << 32) | gd["inode_table_lo"]
# inode テーブル領域先頭バイトオフセット
inode_table_offset = inode_table_block * block_size
inode_offset = inode_table_offset + local_index * inode_size
# inode 構造を最初の 128 bytes 分読む
f.seek(inode_offset)
raw_inode = f.read(max(inode_size, 128))
# 必要なフィールドを unpack
unpacked = struct.unpack_from(INODE_STRUCT_FMT, raw_inode, 0)
(
i_mode, i_uid, i_size_lo, i_atime, i_ctime, i_mtime, i_dtime,
i_gid, i_links_count, i_blocks_lo, i_flags, i_osd1,
*i_block_list
) = unpacked
# 大きいファイル用に i_size_high を読む (offset 108、4 bytes)
i_size_high = struct.unpack_from("<I", raw_inode, 108)[0]
full_size = (i_size_high << 32) | i_size_lo
inode_meta = {
"i_mode": i_mode,
"i_uid": i_uid,
"i_size": full_size,
"i_blocks_lo": i_blocks_lo,
"i_flags": i_flags,
"i_links_count": i_links_count,
}
# raw_i_block 部分 (60 bytes) をそのまま保持
raw_i_block = raw_inode[40:40 + 15*4]
return inode_meta, raw_i_block
def parse_extents_tree(f: io.BufferedReader,
raw_root: bytes,
block_size: int) -> List[Tuple[int,int]]:
"""
inode の raw_i_block (最初の 60 bytes) に含まれる「extent ヘッダ/index」を読む。
戻り値: [(logical_block, physical_block), ...] のリスト(すべての leaf extents をフラットに)
"""
eh_magic, eh_entries, eh_max, eh_depth, eh_generation = struct.unpack_from(EXTENT_HEADER_FMT, raw_root, 0)
if eh_magic != EXT4_EXTENT_MAGIC:
raise RuntimeError(f"Invalid extent magic: 0x{eh_magic:04x}")
# [1] depth=0 → 葉ノード
if eh_depth == 0:
ret: List[Tuple[int,int]] = []
offset = struct.calcsize(EXTENT_HEADER_FMT) # 12 bytes
for _ in range(eh_entries):
ee_block, ee_len, ee_start_hi, ee_start_lo = struct.unpack_from(EXTENT_LEAF_FMT, raw_root, offset)
physical = (ee_start_hi << 32) | ee_start_lo
# 連続ブロックをすべて追加 (logical: ee_block + j, physical: physical + j)
for j in range(ee_len):
ret.append((ee_block + j, physical + j))
offset += struct.calcsize(EXTENT_LEAF_FMT)
return ret
# [2] depth>0 → index ノード
results: List[Tuple[int,int]] = []
offset = struct.calcsize(EXTENT_HEADER_FMT)
for _ in range(eh_entries):
ei_block, ei_leaf_lo, ei_leaf_hi, ei_unused = struct.unpack_from(EXTENT_INDEX_FMT, raw_root, offset)
leaf_block = (ei_leaf_hi << 32) | ei_leaf_lo
# 下位ノードを block_size bytes 読み出し
data_offset = leaf_block * block_size
f.seek(data_offset)
page = f.read(block_size)
# 再帰的に同じ parse_extents_tree を呼ぶ (leaf ノードの header 部分は先頭から)
results.extend(parse_extents_tree(f, page[:60], block_size))
offset += struct.calcsize(EXTENT_INDEX_FMT)
return results
def extract_file_data_and_mapping(f: io.BufferedReader,
inode_meta: dict,
raw_i_block: bytes,
superblock: dict) -> Tuple[bytes, List[Tuple[int,int]]]:
"""
- i_flags に EXT4_EXTENTS_FL が立っている想定でのみ使う
- extent ツリーをたどり、ファイル全体をバイト列で返す (truncate も含む)
戻り値: (data_bytes, mapping)
- data_bytes: 実際のファイルサイズぶんだけ詰めた bytes
- mapping: [(logical_block, physical_block), ...]
"""
block_size = superblock["block_size"]
mapping = parse_extents_tree(f, raw_i_block, block_size)
# mapping→physical ブロック順にソート
mapping.sort(key=lambda x: x[0])
buffer = bytearray()
for logical, physical in mapping:
f.seek(physical * block_size)
buffer.extend(f.read(block_size))
# あふれ分はファイルサイズで切り詰め
data_bytes = bytes(buffer[: inode_meta["i_size"]])
return data_bytes, mapping
def extract_file_data_direct_and_mapping(f: io.BufferedReader,
raw_i_block: bytes,
inode_meta: dict,
superblock: dict) -> Tuple[bytes, List[Tuple[int,int]]]:
"""
ext4 で extent フラグが立っていない場合 (direct/indirect) の簡易処理
- direct ブロックのみ対応 (i_block[0..11])
戻り値: (data_bytes, mapping)
"""
block_size = superblock["block_size"]
# raw_i_block: 60 bytes → struct.unpack して 15 個の 32bit を得る
i_block_vals = list(struct.unpack("<15I", raw_i_block))
direct_blocks = i_block_vals[:12]
buffer = bytearray()
mapping: List[Tuple[int,int]] = []
for idx, pb in enumerate(direct_blocks):
if pb == 0:
break
f.seek(pb * block_size)
buffer.extend(f.read(block_size))
mapping.append((idx, pb))
data_bytes = bytes(buffer[: inode_meta["i_size"]])
return data_bytes, mapping
def parse_directory_data(data: bytes) -> List[Tuple[int, str]]:
"""
生のディレクトリコンテンツ (bytes) から、(inode_number, name) のリストを返す。
ext4 のディレクトリエントリ形式に従い、1 エントリずつ走査する。
"""
entries: List[Tuple[int, str]] = []
offset = 0
total = len(data)
while offset < total:
# 最低 8 bytes は確保されていないと無効
if offset + 8 > total:
break
inode_num = struct.unpack_from("<I", data, offset)[0]
rec_len = struct.unpack_from("<H", data, offset + 4)[0]
name_len = data[offset + 6]
file_type = data[offset + 7] # 必要なら使う (1=ファイル, 2=ディレクトリ, etc)
if inode_num == 0 or rec_len < 8:
break
name = data[offset + 8 : offset + 8 + name_len].decode("utf-8", errors="ignore")
entries.append((inode_num, name))
offset += rec_len
return entries
def find_filename_by_inode(f: io.BufferedReader,
target_inode: int,
superblock: dict,
gd_table_offset: int) -> Optional[str]:
"""
ルートディレクトリ (inode=2) から再帰的にディレクトリをたどり、
target_inode を指すディレクトリエントリの「ファイル名」を返す。
見つからなければ None。
"""
visited_dirs = set()
def recurse(current_inode: int, path_prefix: str) -> Optional[str]:
"""
current_inode (ディレクトリ) の下を探索。見つかればファイル名を返す。
path_prefix は「dirname/...」のように途中で辿ってきたパス (今は使わず)。
"""
if current_inode in visited_dirs:
return None
visited_dirs.add(current_inode)
# [1] current_inode の inode 構造を読む
inode_meta, raw_i_block = read_inode(f, current_inode, superblock, gd_table_offset)
# ディレクトリでなければ無効
# i_mode の上位 4 bit が 0b0100 (0x4000) ならディレクトリ
if (inode_meta["i_mode"] & 0xF000) != 0x4000:
return None
# [2] extent からディレクトリデータを取り出す
if inode_meta["i_flags"] & EXT4_EXTENTS_FL:
dir_data, _ = extract_file_data_and_mapping(f, inode_meta, raw_i_block, superblock)
else:
dir_data, _ = extract_file_data_direct_and_mapping(f, raw_i_block, inode_meta, superblock)
# [3] parse_directory_data して (inode, name) を列挙
entries = parse_directory_data(dir_data)
for inode_num, name in entries:
if inode_num == 0:
continue
# "." と ".." はスキップ
if name in (".", ".."):
continue
# [4] ターゲット inode を発見したら name を返す
if inode_num == target_inode:
return name
# [5] 見つからなかったら、サブディレクトリを再帰探索
for inode_num, name in entries:
if inode_num == 0:
continue
if name in (".", ".."):
continue
# サブディレクトリだったら再帰
# i_mode をチェックするため、その inode のメタを読む
sub_meta, _ = read_inode(f, inode_num, superblock, gd_table_offset)
if (sub_meta["i_mode"] & 0xF000) == 0x4000:
found = recurse(inode_num, path_prefix + "/" + name)
if found:
return found
return None
# ルートディレクトリ inode は常に 2
return recurse(2, "")
# ----------------------------
# [8] メイン処理: イメージから inode=14 のファイルを取り出し、
# ファイル名・物理アドレス・ハッシュチェック まで行う
# ----------------------------
def main(image_path: str, target_inode: int):
# (A) まずイメージファイルを開く
with open(image_path, "rb") as f:
# 1) スーパーブロックを取得
sb = read_superblock(f)
block_size = sb["block_size"]
# 2) グループディスクリプタテーブル先頭バイトオフセットを計算
if sb["first_data_block"] == 0:
gd_table_offset = block_size
else:
gd_table_offset = (sb["first_data_block"] + 1) * block_size
# 3) target_inode の inode メタ & raw_i_block を取得
inode_meta, raw_i_block = read_inode(f, target_inode, sb, gd_table_offset)
# 4) ファイル名を探す (ルートディレクトリから再帰)
filename = find_filename_by_inode(f, target_inode, sb, gd_table_offset)
if filename is None:
print(f"⚠️ inode={target_inode} を指すファイル名が見つかりませんでした。")
filename = f"inode_{target_inode}"
else:
print(f"✅ inode={target_inode} のファイル名: '{filename}'")
# 5) extent フラグをチェックし、(data_bytes, mapping) を得る
if inode_meta["i_flags"] & EXT4_EXTENTS_FL:
data_bytes, mapping = extract_file_data_and_mapping(f, inode_meta, raw_i_block, sb)
else:
data_bytes, mapping = extract_file_data_direct_and_mapping(f, raw_i_block, inode_meta, sb)
if not mapping:
print("⚠️ 物理ブロックマッピングが空でした。")
else:
# mapping は (logical, physical) のリスト。最初の要素の physical を使う
_, first_phys = mapping[0]
phys_offset = first_phys * block_size
print(f"✅ ファイルデータの先頭物理ブロック番号: {first_phys}")
print(f"✅ 物理アドレス(バイトオフセット) の先頭: {phys_offset} bytes")
# 6) 出力先ディレクトリはイメージファイルと同じ
image_dir = os.path.dirname(os.path.abspath(image_path))
output_path = os.path.join(image_dir, filename)
# 7) ファイルデータを書き出す
with open(output_path, "wb") as out_f:
out_f.write(data_bytes)
print(f"✅ ファイルを '{output_path}' として保存しました。")
# 8) SHA-256 ハッシュを計算し、指定の値と比較
h = hashlib.sha256()
h.update(data_bytes)
digest = h.hexdigest()
expected = "fdf7c291905cf475f22a434f1f1c188a13496eff7429e6b15dd2887845c3558f"
print(f"🔑 出力ファイルの SHA-256: {digest}")
if digest == expected:
print("✅ ハッシュ値が一致しました。")
else:
print("❌ ハッシュ値が一致しません!")
print(f" 期待値: {expected}")
print(f" 実際: {digest}")
sys.exit(1)
if __name__ == "__main__":
image_path = "questions.dd"
main(image_path, target_inode=14)
問7-2
"""
問7-1と内容は同じファイルです
"""
import struct
import io
import sys
import os
import hashlib
from typing import List, Tuple, Optional
# ----------------------------
# [1] 定数定義(ext4 on-disk フォーマット)
# ----------------------------
EXT4_SUPERBLOCK_OFFSET = 1024
EXT4_SUPERBLOCK_SIZE = 1024 # バイト数
# ext4 inode フラグ (i_flags) 内
EXT4_EXTENTS_FL = 0x80000 # extents フォーマットが使われているかを示す
# extent ヘッダ内のマジック
EXT4_EXTENT_MAGIC = 0xF30A
# ----------------------------
# [2] スーパーブロック構造から必要フィールドを読むフォーマット
# ----------------------------
# オフセット 24: s_log_block_size (4 バイト)
# オフセット 32: s_blocks_per_group (4 バイト)
# オフセット 40: s_inodes_per_group (4 バイト)
# オフセット 88: s_inode_size (2 バイト)
# オフセット 20: s_first_data_block (4 バイト)
# → これらを struct.unpack_from で抜き取る
# ----------------------------
# ----------------------------
# [3] グループディスクリプタ構造体(on-disk)
# ----------------------------
# bg_inode_table_lo: オフセット 8, 4 バイト
# bg_inode_table_hi: オフセット 36, 4 バイト
# (1 グループあたり 64 バイトとして扱う)
# ----------------------------
# GROUP_DESC_STRUCT_FMT = "<III I I I H H H H H I" # (全部読む必要はなく、offset + struct.unpack_from だけで可)
# ----------------------------
# [4] inode 構造体から最初の 128 バイト分を読むフォーマット
# ----------------------------
INODE_STRUCT_FMT = "<H H I I I I I H H I I 15I"
# H: i_mode
# H: i_uid
# I: i_size_lo
# I: i_atime
# I: i_ctime
# I: i_mtime
# I: i_dtime
# H: i_gid
# H: i_links_count
# I: i_blocks_lo
# I: i_flags
# I: i_osd1
# 15I: i_block[15] (bytes 40-99 の部分)
# ----------------------------
# [5] extent ヘッダ/エントリ/インデックス構造
# ----------------------------
EXTENT_HEADER_FMT = "<H H H H I"
# eh_magic (2) | eh_entries (2) | eh_max (2) | eh_depth (2) | eh_generation (4)
EXTENT_INDEX_FMT = "<I I H H"
# ei_block (4) | ei_leaf_lo (4) | ei_leaf_hi (2) | ei_unused (2)
EXTENT_LEAF_FMT = "<I H H I"
# ee_block (4) | ee_len (2) | ee_start_hi (2) | ee_start_lo (4)
# ----------------------------
# [6] ディレクトリエントリ構造体(on-disk)
# 各エントリは variable-length:
# - inode (4 bytes)
# - rec_len (2 bytes)
# - name_len (1 byte)
# - file_type (1 byte)
# - name (name_len bytes)
# - padding (rec_len - (8 + name_len)) bytes
# ----------------------------
# ----------------------------
# [7] ユーティリティ関数群
# ----------------------------
def read_superblock(f: io.BufferedReader) -> dict:
"""
スーパーブロックを読み、以下の情報を辞書で返す:
- block_size (int)
- inodes_per_group (int)
- inode_size (int)
- first_data_block (int)
"""
f.seek(EXT4_SUPERBLOCK_OFFSET)
sb_data = f.read(EXT4_SUPERBLOCK_SIZE)
s_log_block_size = struct.unpack_from("<I", sb_data, 24)[0]
s_blocks_per_group = struct.unpack_from("<I", sb_data, 32)[0]
s_inodes_per_group = struct.unpack_from("<I", sb_data, 40)[0]
s_inode_size = struct.unpack_from("<H", sb_data, 88)[0]
s_first_data_block = struct.unpack_from("<I", sb_data, 20)[0]
block_size = 1024 << s_log_block_size
return {
"block_size": block_size,
"blocks_per_group": s_blocks_per_group,
"inodes_per_group": s_inodes_per_group,
"inode_size": s_inode_size,
"first_data_block": s_first_data_block,
}
def read_group_descriptor(f: io.BufferedReader, gd_table_offset: int, group_index: int) -> dict:
"""
指定したグループインデックスの「グループディスクリプタ」を読み、
- inode_table_lo (lower 32bit)
- inode_table_hi (upper 32bit)
を辞書で返す。
"""
size_per_gd = 64 # ext4 では少なくとも 64 bytes
f.seek(gd_table_offset + group_index * size_per_gd)
raw = f.read(size_per_gd)
inode_table_lo = struct.unpack_from("<I", raw, 8)[0]
inode_table_hi = struct.unpack_from("<I", raw, 36)[0]
return {
"inode_table_lo": inode_table_lo,
"inode_table_hi": inode_table_hi,
}
def read_inode(f: io.BufferedReader,
inode_number: int,
superblock: dict,
gd_table_offset: int) -> Tuple[dict, bytes]:
"""
指定 inode_number の inode 構造を読み込み、
- inode_meta (dict)
- raw_i_block (bytes: 15×4 = 60 bytes)
を返す。
"""
block_size = superblock["block_size"]
inodes_per_group = superblock["inodes_per_group"]
inode_size = superblock["inode_size"]
# グループインデックス算出
group_index = (inode_number - 1) // inodes_per_group
local_index = (inode_number - 1) % inodes_per_group
# そのグループの inode テーブル先頭ブロック番号を取得
gd = read_group_descriptor(f, gd_table_offset, group_index)
inode_table_block = (gd["inode_table_hi"] << 32) | gd["inode_table_lo"]
# inode テーブル領域先頭バイトオフセット
inode_table_offset = inode_table_block * block_size
inode_offset = inode_table_offset + local_index * inode_size
# inode 構造を最初の 128 bytes 分読む
f.seek(inode_offset)
raw_inode = f.read(max(inode_size, 128))
# 必要なフィールドを unpack
unpacked = struct.unpack_from(INODE_STRUCT_FMT, raw_inode, 0)
(
i_mode, i_uid, i_size_lo, i_atime, i_ctime, i_mtime, i_dtime,
i_gid, i_links_count, i_blocks_lo, i_flags, i_osd1,
*i_block_list
) = unpacked
# 大きいファイル用に i_size_high を読む (offset 108、4 bytes)
i_size_high = struct.unpack_from("<I", raw_inode, 108)[0]
full_size = (i_size_high << 32) | i_size_lo
inode_meta = {
"i_mode": i_mode,
"i_uid": i_uid,
"i_size": full_size,
"i_blocks_lo": i_blocks_lo,
"i_flags": i_flags,
"i_links_count": i_links_count,
}
# raw_i_block 部分 (60 bytes) をそのまま保持
raw_i_block = raw_inode[40:40 + 15*4]
return inode_meta, raw_i_block
def parse_extents_tree(f: io.BufferedReader,
raw_root: bytes,
block_size: int) -> List[Tuple[int,int]]:
"""
inode の raw_i_block (最初の 60 bytes) に含まれる「extent ヘッダ/index」を読む。
戻り値: [(logical_block, physical_block), ...] のリスト(すべての leaf extents をフラットに)
"""
eh_magic, eh_entries, eh_max, eh_depth, eh_generation = struct.unpack_from(EXTENT_HEADER_FMT, raw_root, 0)
if eh_magic != EXT4_EXTENT_MAGIC:
raise RuntimeError(f"Invalid extent magic: 0x{eh_magic:04x}")
# [1] depth=0 → 葉ノード
if eh_depth == 0:
ret: List[Tuple[int,int]] = []
offset = struct.calcsize(EXTENT_HEADER_FMT) # 12 bytes
for _ in range(eh_entries):
ee_block, ee_len, ee_start_hi, ee_start_lo = struct.unpack_from(EXTENT_LEAF_FMT, raw_root, offset)
physical = (ee_start_hi << 32) | ee_start_lo
# 連続ブロックをすべて追加 (logical: ee_block + j, physical: physical + j)
for j in range(ee_len):
ret.append((ee_block + j, physical + j))
offset += struct.calcsize(EXTENT_LEAF_FMT)
return ret
# [2] depth>0 → index ノード
results: List[Tuple[int,int]] = []
offset = struct.calcsize(EXTENT_HEADER_FMT)
for _ in range(eh_entries):
ei_block, ei_leaf_lo, ei_leaf_hi, ei_unused = struct.unpack_from(EXTENT_INDEX_FMT, raw_root, offset)
leaf_block = (ei_leaf_hi << 32) | ei_leaf_lo
# 下位ノードを block_size bytes 読み出し
data_offset = leaf_block * block_size
f.seek(data_offset)
page = f.read(block_size)
# 再帰的に同じ parse_extents_tree を呼ぶ (leaf ノードの header 部分は先頭から)
results.extend(parse_extents_tree(f, page[:60], block_size))
offset += struct.calcsize(EXTENT_INDEX_FMT)
return results
def extract_file_data_and_mapping(f: io.BufferedReader,
inode_meta: dict,
raw_i_block: bytes,
superblock: dict) -> Tuple[bytes, List[Tuple[int,int]]]:
"""
- i_flags に EXT4_EXTENTS_FL が立っている想定でのみ使う
- extent ツリーをたどり、ファイル全体をバイト列で返す (truncate も含む)
戻り値: (data_bytes, mapping)
- data_bytes: 実際のファイルサイズぶんだけ詰めた bytes
- mapping: [(logical_block, physical_block), ...]
"""
block_size = superblock["block_size"]
mapping = parse_extents_tree(f, raw_i_block, block_size)
# mapping→physical ブロック順にソート
mapping.sort(key=lambda x: x[0])
buffer = bytearray()
for logical, physical in mapping:
f.seek(physical * block_size)
buffer.extend(f.read(block_size))
# あふれ分はファイルサイズで切り詰め
data_bytes = bytes(buffer[: inode_meta["i_size"]])
return data_bytes, mapping
def extract_file_data_direct_and_mapping(f: io.BufferedReader,
raw_i_block: bytes,
inode_meta: dict,
superblock: dict) -> Tuple[bytes, List[Tuple[int,int]]]:
"""
ext4 で extent フラグが立っていない場合 (direct/indirect) の簡易処理
- direct ブロックのみ対応 (i_block[0..11])
戻り値: (data_bytes, mapping)
"""
block_size = superblock["block_size"]
# raw_i_block: 60 bytes → struct.unpack して 15 個の 32bit を得る
i_block_vals = list(struct.unpack("<15I", raw_i_block))
direct_blocks = i_block_vals[:12]
buffer = bytearray()
mapping: List[Tuple[int,int]] = []
for idx, pb in enumerate(direct_blocks):
if pb == 0:
break
f.seek(pb * block_size)
buffer.extend(f.read(block_size))
mapping.append((idx, pb))
data_bytes = bytes(buffer[: inode_meta["i_size"]])
return data_bytes, mapping
def parse_directory_data(data: bytes) -> List[Tuple[int, str]]:
"""
生のディレクトリコンテンツ (bytes) から、(inode_number, name) のリストを返す。
ext4 のディレクトリエントリ形式に従い、1 エントリずつ走査する。
"""
entries: List[Tuple[int, str]] = []
offset = 0
total = len(data)
while offset < total:
# 最低 8 bytes は確保されていないと無効
if offset + 8 > total:
break
inode_num = struct.unpack_from("<I", data, offset)[0]
rec_len = struct.unpack_from("<H", data, offset + 4)[0]
name_len = data[offset + 6]
file_type = data[offset + 7] # 必要なら使う (1=ファイル, 2=ディレクトリ, etc)
if inode_num == 0 or rec_len < 8:
break
name = data[offset + 8 : offset + 8 + name_len].decode("utf-8", errors="ignore")
entries.append((inode_num, name))
offset += rec_len
return entries
def find_filename_by_inode(f: io.BufferedReader,
target_inode: int,
superblock: dict,
gd_table_offset: int) -> Optional[str]:
"""
ルートディレクトリ (inode=2) から再帰的にディレクトリをたどり、
target_inode を指すディレクトリエントリの「ファイル名」を返す。
見つからなければ None。
"""
visited_dirs = set()
def recurse(current_inode: int, path_prefix: str) -> Optional[str]:
"""
current_inode (ディレクトリ) の下を探索。見つかればファイル名を返す。
path_prefix は「dirname/...」のように途中で辿ってきたパス (今は使わず)。
"""
if current_inode in visited_dirs:
return None
visited_dirs.add(current_inode)
# [1] current_inode の inode 構造を読む
inode_meta, raw_i_block = read_inode(f, current_inode, superblock, gd_table_offset)
# ディレクトリでなければ無効
# i_mode の上位 4 bit が 0b0100 (0x4000) ならディレクトリ
if (inode_meta["i_mode"] & 0xF000) != 0x4000:
return None
# [2] extent からディレクトリデータを取り出す
if inode_meta["i_flags"] & EXT4_EXTENTS_FL:
dir_data, _ = extract_file_data_and_mapping(f, inode_meta, raw_i_block, superblock)
else:
dir_data, _ = extract_file_data_direct_and_mapping(f, raw_i_block, inode_meta, superblock)
# [3] parse_directory_data して (inode, name) を列挙
entries = parse_directory_data(dir_data)
for inode_num, name in entries:
if inode_num == 0:
continue
# "." と ".." はスキップ
if name in (".", ".."):
continue
# [4] ターゲット inode を発見したら name を返す
if inode_num == target_inode:
return name
# [5] 見つからなかったら、サブディレクトリを再帰探索
for inode_num, name in entries:
if inode_num == 0:
continue
if name in (".", ".."):
continue
# サブディレクトリだったら再帰
# i_mode をチェックするため、その inode のメタを読む
sub_meta, _ = read_inode(f, inode_num, superblock, gd_table_offset)
if (sub_meta["i_mode"] & 0xF000) == 0x4000:
found = recurse(inode_num, path_prefix + "/" + name)
if found:
return found
return None
# ルートディレクトリ inode は常に 2
return recurse(2, "")
# ----------------------------
# [8] メイン処理: イメージから inode=14 のファイルを取り出し、
# ファイル名・物理アドレス・ハッシュチェック まで行う
# ----------------------------
def main(image_path: str, target_inode: int):
# (A) まずイメージファイルを開く
with open(image_path, "rb") as f:
# 1) スーパーブロックを取得
sb = read_superblock(f)
block_size = sb["block_size"]
# 2) グループディスクリプタテーブル先頭バイトオフセットを計算
if sb["first_data_block"] == 0:
gd_table_offset = block_size
else:
gd_table_offset = (sb["first_data_block"] + 1) * block_size
# 3) target_inode の inode メタ & raw_i_block を取得
inode_meta, raw_i_block = read_inode(f, target_inode, sb, gd_table_offset)
# 4) ファイル名を探す (ルートディレクトリから再帰)
filename = find_filename_by_inode(f, target_inode, sb, gd_table_offset)
if filename is None:
print(f"⚠️ inode={target_inode} を指すファイル名が見つかりませんでした。")
filename = f"inode_{target_inode}"
else:
print(f"✅ inode={target_inode} のファイル名: '{filename}'")
# 5) extent フラグをチェックし、(data_bytes, mapping) を得る
if inode_meta["i_flags"] & EXT4_EXTENTS_FL:
data_bytes, mapping = extract_file_data_and_mapping(f, inode_meta, raw_i_block, sb)
else:
data_bytes, mapping = extract_file_data_direct_and_mapping(f, raw_i_block, inode_meta, sb)
if not mapping:
print("⚠️ 物理ブロックマッピングが空でした。")
else:
# mapping は (logical, physical) のリスト。最初の要素の physical を使う
_, first_phys = mapping[0]
phys_offset = first_phys * block_size
print(f"✅ ファイルデータの先頭物理ブロック番号: {first_phys}")
print(f"✅ 物理アドレス(バイトオフセット) の先頭: {phys_offset} bytes")
# 6) 出力先ディレクトリはイメージファイルと同じ
image_dir = os.path.dirname(os.path.abspath(image_path))
output_path = os.path.join(image_dir, filename)
# 7) ファイルデータを書き出す
with open(output_path, "wb") as out_f:
out_f.write(data_bytes)
print(f"✅ ファイルを '{output_path}' として保存しました。")
# 8) SHA-256 ハッシュを計算し、指定の値と比較
h = hashlib.sha256()
h.update(data_bytes)
digest = h.hexdigest()
expected = "9f964b89facdc3671569c812ba8b3fadd20f9dfcf1873b06fe672761392e7523"
print(f"🔑 出力ファイルの SHA-256: {digest}")
if digest == expected:
print("✅ ハッシュ値が一致しました。")
else:
print("❌ ハッシュ値が一致しません!")
print(f" 期待値: {expected}")
print(f" 実際: {digest}")
sys.exit(1)
if __name__ == "__main__":
image_path = "questions.dd"
main(image_path, target_inode=16)
問7-3
import struct
import io
import sys
import os
import hashlib
from typing import List, Tuple, Optional
# ----------------------------
# 定数定義(ext4 on-disk フォーマット)
# ----------------------------
EXT4_SUPERBLOCK_OFFSET = 1024
EXT4_SUPERBLOCK_SIZE = 1024 # bytes
# ext4 inode フラグ (i_flags) 内
EXT4_EXTENTS_FL = 0x80000 # extents フォーマットが使われているかどうか
# extent ヘッダ内のマジック
EXT4_EXTENT_MAGIC = 0xF30A
# ----------------------------
# inode 構造体から最初の 128 バイトを unpack するフォーマット
# ----------------------------
INODE_STRUCT_FMT = "<H H I I I I I H H I I 15I"
# H: i_mode
# H: i_uid
# I: i_size_lo
# I: i_atime
# I: i_ctime
# I: i_mtime
# I: i_dtime
# H: i_gid
# H: i_links_count
# I: i_blocks_lo
# I: i_flags
# I: i_osd1
# 15I: i_block[15] (60 bytes, offset=40~99)
# ----------------------------
# extent ヘッダ/エントリ/インデックスの on-disk 構造
# ----------------------------
EXTENT_HEADER_FMT = "<H H H H I"
# eh_magic (2) | eh_entries (2) | eh_max (2) | eh_depth (2) | eh_generation (4)
EXTENT_INDEX_FMT = "<I I H H"
# ei_block (4) | ei_leaf_lo (4) | ei_leaf_hi (2) | ei_unused (2)
EXTENT_LEAF_FMT = "<I H H I"
# ee_block (4) | ee_len (2) | ee_start_hi (2) | ee_start_lo (4)
# ----------------------------
# ディレクトリエントリの on-disk フォーマット
# ----------------------------
# - inode (4 bytes)
# - rec_len (2 bytes)
# - name_len (1 byte)
# - file_type (1 byte)
# - name (name_len bytes)
# - padding (rec_len - (8 + name_len)) bytes
# ----------------------------
# ユーティリティ関数群
# ----------------------------
def read_superblock(f: io.BufferedReader) -> dict:
"""
スーパーブロックを読み込み、必要なフィールドを辞書で返す:
- block_size (int)
- inodes_per_group (int)
- inode_size (int)
- first_data_block (int)
"""
f.seek(EXT4_SUPERBLOCK_OFFSET)
sb_data = f.read(EXT4_SUPERBLOCK_SIZE)
s_log_block_size = struct.unpack_from("<I", sb_data, 24)[0]
s_blocks_per_group = struct.unpack_from("<I", sb_data, 32)[0]
s_inodes_per_group = struct.unpack_from("<I", sb_data, 40)[0]
s_inode_size = struct.unpack_from("<H", sb_data, 88)[0]
s_first_data_block = struct.unpack_from("<I", sb_data, 20)[0]
block_size = 1024 << s_log_block_size # 例: s_log_block_size=2 → block_size=4096
return {
"block_size": block_size,
"blocks_per_group": s_blocks_per_group,
"inodes_per_group": s_inodes_per_group,
"inode_size": s_inode_size,
"first_data_block": s_first_data_block,
}
def read_group_descriptor(f: io.BufferedReader, gd_table_offset: int, group_index: int) -> dict:
"""
グループインデックスを指定して、そのグループのディスクリプタを読み込む。
必要なのは inode_table_lo, inode_table_hi のみ。
戻り値:
- inode_table_lo (int)
- inode_table_hi (int)
"""
size_per_gd = 64 # ext4 では少なくとも 64 バイト
f.seek(gd_table_offset + group_index * size_per_gd)
raw = f.read(size_per_gd)
inode_table_lo = struct.unpack_from("<I", raw, 8)[0]
inode_table_hi = struct.unpack_from("<I", raw, 36)[0]
return {
"inode_table_lo": inode_table_lo,
"inode_table_hi": inode_table_hi,
}
def read_inode(f: io.BufferedReader,
inode_number: int,
superblock: dict,
gd_table_offset: int) -> Tuple[dict, bytes]:
"""
指定した inode_number の inode 構造を読み込む。
返り値:
- inode_meta (dict) → { i_mode, i_uid, i_size, i_blocks_lo, i_flags, i_links_count }
- raw_i_block (bytes) → i_block[15] 部分 (60 bytes)
"""
block_size = superblock["block_size"]
inodes_per_group = superblock["inodes_per_group"]
inode_size = superblock["inode_size"]
# グループインデックスとグループ内相対インデックスを計算
group_index = (inode_number - 1) // inodes_per_group
local_index = (inode_number - 1) % inodes_per_group
# そのグループの inode テーブル先頭ブロックを取得
gd = read_group_descriptor(f, gd_table_offset, group_index)
inode_table_block = (gd["inode_table_hi"] << 32) | gd["inode_table_lo"]
# inode テーブルの先頭バイトオフセット
inode_table_offset = inode_table_block * block_size
# ターゲット inode のディスク上オフセット
inode_offset = inode_table_offset + local_index * inode_size
# 最低 128 bytes は読んでおく (inode_size が 128 より大きいケースもあるが、
# i_block[15] は最初の 60 bytes(offset=40~99)にあるので 128 もあれば十分)
f.seek(inode_offset)
raw_inode = f.read(max(inode_size, 128))
# unpack
unpacked = struct.unpack_from(INODE_STRUCT_FMT, raw_inode, 0)
(
i_mode, i_uid, i_size_lo, i_atime, i_ctime, i_mtime, i_dtime,
i_gid, i_links_count, i_blocks_lo, i_flags, i_osd1,
*i_block_list
) = unpacked
# 大きなファイルの上位 32bit を取得 (i_size_high は offset=108~111 にある)
i_size_high = struct.unpack_from("<I", raw_inode, 108)[0]
full_size = (i_size_high << 32) | i_size_lo # 実際のファイルサイズ (バイト数)
inode_meta = {
"i_mode": i_mode,
"i_uid": i_uid,
"i_size": full_size,
"i_blocks_lo": i_blocks_lo,
"i_flags": i_flags,
"i_links_count": i_links_count,
}
# raw_i_block (60 bytes) は offset=40 から取得
raw_i_block = raw_inode[40 : 40 + 15*4]
return inode_meta, raw_i_block
def parse_extents_records(f: io.BufferedReader,
raw_root: bytes,
block_size: int) -> List[Tuple[int, int, int]]:
"""
扱いやすい「extent レコード」のリストを返す:
[(logical_block_start, length_blocks, physical_block_start), ...]
- raw_root: i_block 領域の最初の 60 bytes
- block_size: ブロックサイズ (例: 4096)
ext4 の extents ツリーをたどり、
- leaf ノードならそのまま (ee_block, ee_len & 0x7FFF, ee_physical_start) を取得
- index ノードなら子ノードを再帰的に読み込んでフラットにまとめる
"""
# ヘッダ部 (12 bytes) を unpack
eh_magic, eh_entries, eh_max, eh_depth, eh_generation = struct.unpack_from(EXTENT_HEADER_FMT, raw_root, 0)
if eh_magic != EXT4_EXTENT_MAGIC:
raise RuntimeError(f"Invalid extent magic: 0x{eh_magic:04x}")
# [1] depth = 0 → ここは leaf ノード。本体に eh_entries 個分の ext4_extent 構造 (12 bytes × eh_entries) が並んでいる
if eh_depth == 0:
records: List[Tuple[int, int, int]] = []
offset = struct.calcsize(EXTENT_HEADER_FMT) # 12 bytes
for _ in range(eh_entries):
ee_block, ee_len_raw, ee_start_hi, ee_start_lo = struct.unpack_from(EXTENT_LEAF_FMT, raw_root, offset)
# 上位ビット (0x8000) は uninitialized フラグなので、下位 15 ビットだけ取る
length_blocks = ee_len_raw & 0x7FFF
ee_physical = (ee_start_hi << 32) | ee_start_lo
records.append((ee_block, length_blocks, ee_physical))
offset += struct.calcsize(EXTENT_LEAF_FMT)
return records
# [2] depth > 0 → index ノード。本体に eh_entries 個の ext4_extent_idx (12 bytes × eh_entries) が並ぶ
all_records: List[Tuple[int, int, int]] = []
offset = struct.calcsize(EXTENT_HEADER_FMT) # 12 bytes
for _ in range(eh_entries):
ei_block, ei_leaf_lo, ei_leaf_hi, ei_unused = struct.unpack_from(EXTENT_INDEX_FMT, raw_root, offset)
leaf_block = (ei_leaf_hi << 32) | ei_leaf_lo
# 物理ブロック leaf_block の先頭 (leaf_block * block_size) から block_size bytes 読む
data_offset = leaf_block * block_size
f.seek(data_offset)
page = f.read(block_size)
# 再帰的にこのブロックを parse_extents_records で扱う
all_records.extend(parse_extents_records(f, page, block_size))
offset += struct.calcsize(EXTENT_INDEX_FMT)
return all_records
def extract_file_data_with_holes(f: io.BufferedReader,
inode_meta: dict,
raw_i_block: bytes,
superblock: dict) -> bytes:
"""
- extent フラグ (i_flags & EXT4_EXTENTS_FL) が立っている場合のファイルデータ抽出。
- ホール部分はゼロバイトで埋める。
- 戻り値: ファイル本体を示す bytes (inode_meta["i_size"] 分)
"""
block_size = superblock["block_size"]
file_size = inode_meta["i_size"]
# (1) extent レコードを取得 [(logical_block, length_blocks, physical_block), ...]
records = parse_extents_records(f, raw_i_block, block_size)
# 論理ブロック順にソートしておく
records.sort(key=lambda x: x[0])
# (2) ファイル全体長分のバッファをゼロで初期化
buffer = bytearray(b"\x00") * file_size
# (3) 各レコードをまとめて読み込み、対応するファイル内オフセットにコピーする
for logical_block, length_blocks, physical_start in records:
# ファイル内バイトオフセット
dest_offset = logical_block * block_size
# 読み込むバイト数 (extent 長 × block_size) ただしファイル長を超えたら truncate
read_len = length_blocks * block_size
if dest_offset + read_len > file_size:
read_len = file_size - dest_offset
# ディスクからまとめて読み出す
f.seek(physical_start * block_size)
data = f.read(read_len)
# 読み出した bytes をバッファにコピー
buffer[dest_offset : dest_offset + read_len] = data
return bytes(buffer)
def extract_file_data_direct_and_mapping(f: io.BufferedReader,
raw_i_block: bytes,
inode_meta: dict,
superblock: dict) -> bytes:
"""
- extent フラグが立っていない (direct/indirect) 場合の簡易取り出し。
- 今回は direct ブロックのみ対応 (i_block[0..11])。間接ブロックは未実装。
- ホールは考慮せず、直接ブロックしか使わない想定。
- 戻り値: ファイル本体を示す bytes (inode_meta["i_size"] 分, 直後に truncate)
"""
block_size = superblock["block_size"]
file_size = inode_meta["i_size"]
i_block_vals = list(struct.unpack("<15I", raw_i_block))
direct_blocks = i_block_vals[:12]
buffer = bytearray(b"\x00") * file_size
read_so_far = 0
for idx, pb in enumerate(direct_blocks):
if pb == 0 or read_so_far >= file_size:
break
to_read = min(block_size, file_size - read_so_far)
f.seek(pb * block_size)
data = f.read(to_read)
buffer[idx * block_size : idx * block_size + to_read] = data
read_so_far += to_read
return bytes(buffer)
def parse_directory_data(data: bytes) -> List[Tuple[int, str]]:
"""
生のディレクトリバイト列 (data) から、(inode_number, name) のリストを返す。
ext4 の directory entry フォーマットに従う。
"""
entries: List[Tuple[int, str]] = []
offset = 0
total = len(data)
while offset < total:
if offset + 8 > total:
break
inode_num = struct.unpack_from("<I", data, offset)[0]
rec_len = struct.unpack_from("<H", data, offset + 4)[0]
name_len = data[offset + 6]
_file_type = data[offset + 7]
if inode_num == 0 or rec_len < 8:
break
name = data[offset + 8 : offset + 8 + name_len].decode("utf-8", errors="ignore")
entries.append((inode_num, name))
offset += rec_len
return entries
def find_filename_by_inode(f: io.BufferedReader,
target_inode: int,
superblock: dict,
gd_table_offset: int) -> Optional[str]:
"""
ルートディレクトリ (inode=2) から再帰的にディレクトリをたどり、
target_inode を指すディレクトリエントリの「ファイル名」を返す。
なければ None。
"""
visited_dirs = set()
def recurse(current_inode: int) -> Optional[str]:
if current_inode in visited_dirs:
return None
visited_dirs.add(current_inode)
inode_meta, raw_i_block = read_inode(f, current_inode, superblock, gd_table_offset)
# i_mode の上位ビット (0xF000) が 0x4000 → ディレクトリ
if (inode_meta["i_mode"] & 0xF000) != 0x4000:
return None
# ディレクトリデータを読む
if inode_meta["i_flags"] & EXT4_EXTENTS_FL:
dir_data = extract_file_data_with_holes(f, inode_meta, raw_i_block, superblock)
else:
dir_data = extract_file_data_direct_and_mapping(f, raw_i_block, inode_meta, superblock)
entries = parse_directory_data(dir_data)
for inode_num, name in entries:
if inode_num == 0 or name in (".", ".."):
continue
if inode_num == target_inode:
return name
for inode_num, name in entries:
if inode_num == 0 or name in (".", ".."):
continue
sub_meta, _ = read_inode(f, inode_num, superblock, gd_table_offset)
if (sub_meta["i_mode"] & 0xF000) == 0x4000:
found = recurse(inode_num)
if found:
return found
return None
return recurse(2) # ルートディレクトリ inode=2 から開始
def main(image_path: str, target_inode: int):
# イメージファイルを開く
with open(image_path, "rb") as f:
# 1) スーパーブロックを読む
sb = read_superblock(f)
block_size = sb["block_size"]
# 2) グループディスクリプタテーブル先頭バイトオフセットを計算
if sb["first_data_block"] == 0:
gd_table_offset = block_size
else:
gd_table_offset = (sb["first_data_block"] + 1) * block_size
# 3) target_inode の inode メタ & raw_i_block を読む
inode_meta, raw_i_block = read_inode(f, target_inode, sb, gd_table_offset)
# 4) ファイル名を探す
filename = find_filename_by_inode(f, target_inode, sb, gd_table_offset)
if filename is None:
print(f"⚠️ inode={target_inode} を指すファイル名が見つかりませんでした。")
filename = f"inode_{target_inode}"
else:
print(f"✅ inode={target_inode} のファイル名: '{filename}'")
# 5) ファイルデータを取り出す
if inode_meta["i_flags"] & EXT4_EXTENTS_FL:
data_bytes = extract_file_data_with_holes(f, inode_meta, raw_i_block, sb)
else:
data_bytes = extract_file_data_direct_and_mapping(f, raw_i_block, inode_meta, sb)
# 6) 物理アドレス(先頭物理ブロック&バイトオフセット)を表示
if inode_meta["i_flags"] & EXT4_EXTENTS_FL:
# extent レコードを取得して最初のものを使う
records = parse_extents_records(f, raw_i_block, block_size)
if records:
first_logical, first_len, first_phys = sorted(records, key=lambda x: x[0])[0]
phys_offset = first_phys * block_size
print(f"✅ ファイルデータの先頭物理ブロック番号: {first_phys}")
print(f"✅ 物理アドレス(バイトオフセット) の先頭: {phys_offset} bytes")
else:
# direct モードなら i_block[0] を使う
i_block_vals = list(struct.unpack("<15I", raw_i_block))
first_phys = i_block_vals[0] if i_block_vals else 0
phys_offset = first_phys * block_size
print(f"✅ ファイルデータの先頭物理ブロック番号: {first_phys}")
print(f"✅ 物理アドレス(バイトオフセット) の先頭: {phys_offset} bytes")
# 7) イメージと同じディレクトリにファイルを保存
image_dir = os.path.dirname(os.path.abspath(image_path))
output_path = os.path.join(image_dir, filename)
with open(output_path, "wb") as out_f:
out_f.write(data_bytes)
print(f"✅ ファイルを '{output_path}' として保存しました。")
# 8) SHA-256 ハッシュを計算して検証
h = hashlib.sha256()
h.update(data_bytes)
digest = h.hexdigest()
expected = "31055b7421279896ad6e0b8d2f6993ee219c2ba88d758917ac2f662e39dba32e"
print(f"🔑 出力ファイルの SHA-256: {digest}")
if digest == expected:
print("✅ ハッシュ値が一致しました。")
else:
print("❌ ハッシュ値が一致しません!")
print(f" 期待値: {expected}")
print(f" 実際: {digest}")
sys.exit(1)
if __name__ == "__main__":
image_path = "questions.dd"
main(image_path, target_inode=18)
問7-4
初めにautopsyで中身を確認していたので、inode16にはinode14と違って、jpgファイルが含まれていることがわかりましたが、いずれにしろ、 バイナリの中身を取ってきているので、拡張子を変えればjpgでも大丈夫だろうと処理は変えていません。
問7-5
autopsyで中身を見たときは、empty fileとなっていますが、ファイルサイズが大きく、開始アドレスの欄もエラーになっていることからめんどくさそうだなと、とりあえずinode14,16のファイルで実行してみたところ次のようなエラーが出ました。
struct.error: unpack_from requires a buffer of at least 72 bytes for unpacking 12 bytes at offset 60 (actual buffer size is 60)
まずは、エラー文から処理をしていこうとpage[:60]と実装していた部分をpage全体を渡すようにしました。
そして、今まではインデックスノード内の全エントリ(12バイト×eh_entries)を含められず、unpack_fromが足りないバッファ長を要求してクラッシュしていました。
なので、古ブロックを渡すことで、インデックス/リーフのオードどちらかも必要量を確実に読み込めるようにしました。
また、ファイルサイズが大きかったので、リーフノードの各ee_len * ee_blockに対してループし、ブロック単位で(論理、物理)ペアを大量に展開していたのを、レコード単位でまとめて高速化しました。
これで、一応取り出すことはできたのですが、ハッシュ値が合いませんでした。
ためしに、pytsk3でleaf(問題ファイルの名前)を取り出してみると、ハッシュ値が一致したので、私のプログラムがなにかおかしいことになります。
まず、stringsコマンドで調べてみると、leaf1からleaf100000までが出てきました。
hexeditorで二つの差異を調べてみると、正解のほうは、leaf1からleaf2までのオフセットが0x2000だったのに対し、私の作ったほうでは、0x1000でした。
その間はすべて0で埋められているので何かを見落としているはずです。
ファイルサイズが大きいという点が肝だと思うので、いろいろ調べていると、https://www.kernel.org/doc/html/latest/filesystems/ext4/dynamic.htmlに次のような記載がありました。
Number of blocks covered by extent. If the value of this field is <= 32768, the extent is initialized. If the value of the field is > 32768, the extent is uninitialized and the actual extent length is ee_len – 32768. Therefore, the maximum length of a initialized extent is 32768 blocks, and the maximum length of an uninitialized extent is 32767. |
なので、一応length_blocks = ee_len_raw & 0x7FFFとしましたが、のちのち試してみるとこれはあまり関係ないことがわかりました。
オフセットの計算がダメなのでそれについていろいろ調べていると、ee_blockによる計算を飛ばしていることがわかりました。
旧実装では、そのまま単に順番に.extend()していたのでオフセットが正しくありませんでしたが、ee_block * block_sizeとすることで正しいオフセットにすることができました。
問7-6
LLMはライブラリを使った作成は得意ですが、今回のように一から実装するというものは得意ではないと思うので、ext4の構造について詳しく書かれた記事等を集めさせるのに利用しました。
また、コードの作成の部分では、課題を丸投げしても絶対に無理なので、pythonの文法について聞いたり、私が構造をみてこのような実装をしたいと思ったときに詳しくこのような実装をしたいと伝えると、いい感じに実装してくれるのでそれも利用しました。