- 环境准备
windows10 64bit
python3.7 64bit
打开可执行文件,创建进程
定义变量
以下代码用 ctypes 定义了调用 windows API 需要的结构
- my_debugger_defines.py(文件名需与后面 `import my_debugger_defines` 保持一致)
import ctypes
# ctypes spellings of the Win32 scalar and pointer types used by the
# structures in this module.
WORD = ctypes.c_ushort
DWORD = ctypes.c_ulong
HANDLE = ctypes.c_void_p
LPBYTE = ctypes.POINTER(ctypes.c_ubyte)
LPTSTR = ctypes.POINTER(ctypes.c_char)

# Values for the creation-flags argument of CreateProcess.
DEBUG_PROCESS = 0x00000001
CREATE_NEW_CONSOLE = 0x00000010
# 定义CreateProcessA需要的结构体
# typedef struct _STARTUPINFO {
# DWORD cb;
# LPTSTR lpReserved;
# LPTSTR lpDesktop;
# LPTSTR lpTitle;
# DWORD dwX;
# DWORD dwY;
# DWORD dwXSize;
# DWORD dwYSize;
# DWORD dwXCountChars;
# DWORD dwYCountChars;
# DWORD dwFillAttribute;
# DWORD dwFlags;
# WORD wShowWindow;
# WORD cbReserved2;
# LPBYTE lpReserved2;
# HANDLE hStdInput;
# HANDLE hStdOutput;
# HANDLE hStdError;
# } STARTUPINFO, *LPSTARTUPINFO;
class STARTUPINFO(ctypes.Structure):
    """ctypes layout of the Win32 STARTUPINFO structure passed to CreateProcess.

    The attribute names ``dwXcountChars``, ``dwYcountChars`` and ``hStdIput``
    differ from the Win32 spellings (``dwXCountChars``, ``dwYCountChars``,
    ``hStdInput``); they are kept as-is so existing attribute access keeps
    working.
    """

    _fields_ = [
        ("cb", DWORD),
        ("lpReserved", LPTSTR),
        ("lpDesktop", LPTSTR),
        ("lpTitle", LPTSTR),
        # Window position, size and console geometry hints.
        ("dwX", DWORD),
        ("dwY", DWORD),
        ("dwXSize", DWORD),
        ("dwYSize", DWORD),
        ("dwXcountChars", DWORD),
        ("dwYcountChars", DWORD),
        ("dwFillAttribute", DWORD),
        ("dwFlags", DWORD),
        ("wShowWindow", WORD),
        ("cbReserved2", WORD),
        ("lpReserved2", LPBYTE),
        # Standard handle slots.
        ("hStdIput", HANDLE),
        ("hStdOutput", HANDLE),
        ("hStdError", HANDLE),
    ]
# typedef struct _PROCESS_INFORMATION {
# HANDLE hProcess;
# HANDLE hThread;
# DWORD dwProcessId;
# DWORD dwThreadId;
# } PROCESS_INFORMATION, *LPPROCESS_INFORMATION;
class PROCESS_INFORMATION(ctypes.Structure):
    """ctypes layout of the Win32 PROCESS_INFORMATION structure.

    CreateProcess fills it with the handles and ids of the new process and
    of its first thread.
    """

    _fields_ = [
        ("hProcess", HANDLE),
        ("hThread", HANDLE),
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
    ]
调用CreateProcess
值得注意的是,原版用的是python2,是ascii编码,这里使用的是python3,是Unicode编码,所以使用 CreateProcessW 函数
- my_debugger.py
import ctypes
import my_debugger_defines
kernel32 = ctypes.windll.kernel32
class debugger():
    """First iteration of the debugger: it can only launch a debuggee."""

    def __init__(self):
        pass

    def load(self, path_to_exe):
        """Start ``path_to_exe`` with the DEBUG_PROCESS flag and report the outcome.

        Prints the new process id on success, or the value returned by
        GetLastError on failure. Nothing is returned.
        """
        startup = my_debugger_defines.STARTUPINFO()
        proc_info = my_debugger_defines.PROCESS_INFORMATION()

        startup.dwFlags = 0x1
        startup.wShowWindow = 0x0
        startup.cb = ctypes.sizeof(startup)

        launched = kernel32.CreateProcessW(
            path_to_exe,
            None,
            None,
            None,
            None,
            my_debugger_defines.DEBUG_PROCESS,
            None,
            None,
            ctypes.byref(startup),
            ctypes.byref(proc_info),
        )
        if not launched:
            print("[*] Error: 0x%08x." % kernel32.GetLastError())
            return

        print("[*] We have successfully lanuched the process!")
        print("[*] PID: %d" % proc_info.dwProcessId)
- my_test.py
import my_debugger
# Launch the sample executable under the debugger.
debugger = my_debugger.debugger()
debugger.load(path_to_exe="11.exe")
打开pid对应的进程
一些需要的结构体
- my_debugger_defines.py(文件名需与后面 `import my_debugger_defines` 保持一致)
import ctypes
# ctypes spellings of the Win32 scalar and pointer types used below.
WORD = ctypes.c_ushort
DWORD = ctypes.c_ulong
LPBYTE = ctypes.POINTER(ctypes.c_ubyte)
LPTSTR = ctypes.POINTER(ctypes.c_char)
HANDLE = ctypes.c_void_p
PVOID = ctypes.c_void_p
# Pointer-sized unsigned integer. c_ulong is only 4 bytes on 64-bit Windows,
# which made EXCEPTION_RECORD (and therefore DEBUG_EVENT) smaller than the
# buffer WaitForDebugEvent writes into.
UINT_PTR = ctypes.c_size_t

# Process creation flags.
DEBUG_PROCESS = 0x00000001
CREATE_NEW_CONSOLE = 0x00000010
# Access mask passed to OpenProcess.
PROCESS_ALL_ACCESS = 0x001F0FFF
# Continue status passed to ContinueDebugEvent.
DBG_CONTINUE = 0x00010002
# "Wait forever" timeout. The previous value 0x00010002 was a copy of
# DBG_CONTINUE and actually meant a finite timeout of 65538 ms.
INFINITE = 0xFFFFFFFF
# 定义CreateProcessA需要的结构体
# typedef struct _STARTUPINFO {
# DWORD cb;
# LPTSTR lpReserved;
# LPTSTR lpDesktop;
# LPTSTR lpTitle;
# DWORD dwX;
# DWORD dwY;
# DWORD dwXSize;
# DWORD dwYSize;
# DWORD dwXCountChars;
# DWORD dwYCountChars;
# DWORD dwFillAttribute;
# DWORD dwFlags;
# WORD wShowWindow;
# WORD cbReserved2;
# LPBYTE lpReserved2;
# HANDLE hStdInput;
# HANDLE hStdOutput;
# HANDLE hStdError;
# } STARTUPINFO, *LPSTARTUPINFO;
class STARTUPINFO(ctypes.Structure):
    """ctypes layout of the Win32 STARTUPINFO structure passed to CreateProcess.

    The attribute names ``dwXcountChars``, ``dwYcountChars`` and ``hStdIput``
    differ from the Win32 spellings (``dwXCountChars``, ``dwYCountChars``,
    ``hStdInput``); they are kept as-is so existing attribute access keeps
    working.
    """

    _fields_ = [
        ("cb", DWORD),
        ("lpReserved", LPTSTR),
        ("lpDesktop", LPTSTR),
        ("lpTitle", LPTSTR),
        # Window position, size and console geometry hints.
        ("dwX", DWORD),
        ("dwY", DWORD),
        ("dwXSize", DWORD),
        ("dwYSize", DWORD),
        ("dwXcountChars", DWORD),
        ("dwYcountChars", DWORD),
        ("dwFillAttribute", DWORD),
        ("dwFlags", DWORD),
        ("wShowWindow", WORD),
        ("cbReserved2", WORD),
        ("lpReserved2", LPBYTE),
        # Standard handle slots.
        ("hStdIput", HANDLE),
        ("hStdOutput", HANDLE),
        ("hStdError", HANDLE),
    ]
# typedef struct _PROCESS_INFORMATION {
# HANDLE hProcess;
# HANDLE hThread;
# DWORD dwProcessId;
# DWORD dwThreadId;
# } PROCESS_INFORMATION, *LPPROCESS_INFORMATION;
class PROCESS_INFORMATION(ctypes.Structure):
    """ctypes layout of the Win32 PROCESS_INFORMATION structure.

    CreateProcess fills it with the handles and ids of the new process and
    of its first thread.
    """

    _fields_ = [
        ("hProcess", HANDLE),
        ("hThread", HANDLE),
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
    ]
class EXCEPTION_RECORD(ctypes.Structure):
    """ctypes layout of the Win32 EXCEPTION_RECORD structure.

    The record contains a pointer to another EXCEPTION_RECORD, so the class
    is declared first and its fields are attached right after.
    """


EXCEPTION_RECORD._fields_ = [
    ("ExceptionCode", DWORD),
    ("ExceptionFlags", DWORD),
    # Self-referential link, hence the two-step definition.
    ("ExceptionRecord", ctypes.POINTER(EXCEPTION_RECORD)),
    ("ExceptionAddress", PVOID),
    ("NumberParameters", DWORD),
    ("ExceptionInformation", UINT_PTR * 15),
]
class EXCEPTION_DEBUG_INFO(ctypes.Structure):
    """Exception member of the debug-event union: the record plus the first-chance field."""

    _fields_ = [
        ("ExceptionRecord", EXCEPTION_RECORD),
        ("dwFirstChance", DWORD),
    ]
class DEBUG_EVENT_UNION(ctypes.Union):
    """Event-specific payload of DEBUG_EVENT.

    Only the ``Exception`` member is modelled; the remaining members are
    listed below as comments because their structures are not defined here.
    """

    _fields_ = [
        ("Exception", EXCEPTION_DEBUG_INFO),
        # ("CreateThread", CREATE_THREAD_DEBUG_INFO),
        # ("CreateProcessInfo", CREATE_PROCESS_DEBUG_INFO),
        # ("ExitThread", EXIT_THREAD_DEBUG_INFO),
        # ("ExitProcess", EXIT_PROCESS_DEBUG_INFO),
        # ("LoadDll", LOAD_DLL_DEBUG_INFO),
        # ("UnloadDll", UNLOAD_DLL_DEBUG_INFO),
        # ("DebugString", OUTPUT_DEBUG_STRING_INFO),
        # ("RipInfo", RIP_INFO),
    ]
class DEBUG_EVENT(ctypes.Structure):
    """ctypes layout of DEBUG_EVENT, the structure WaitForDebugEvent fills in."""

    _fields_ = [
        # Selects which member of ``u`` is meaningful.
        ("dwDebugEventCode", DWORD),
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
        ("u", DEBUG_EVENT_UNION),
    ]
要实现附加,需要:
打开进程,获得进程的所有权限。
让调试器循环等待被调试进程的调试事件发生(WaitForDebugEvent)。
调试事件发生,要处理调试事件(这里还没有处理调试事件)。
处理完调试事件,下一步继续执行。
解除附加。
如下代码,有比较详细的注释
- my_debugger.py
import ctypes
import my_debugger_defines
kernel32 = ctypes.windll.kernel32
class debugger():
    """Small Win32 debugger that can launch or attach to a process and wait for a debug event."""

    def __init__(self):
        # Handle returned by OpenProcess for the debuggee.
        self.h_process = None
        # Process id of the debuggee.
        self.pid = None
        # run() keeps polling for debug events while this is True.
        self.debugger_active = False

    def load(self, path_to_exe):
        """Launch ``path_to_exe`` with DEBUG_PROCESS and remember it as the debuggee.

        On success the pid, an OpenProcess handle and the active flag are
        stored on the instance so that run() and detach() work the same way
        after load() as they do after attach(). On failure the GetLastError
        code is printed. Nothing is returned.
        """
        creation_flags = my_debugger_defines.DEBUG_PROCESS
        startupinfo = my_debugger_defines.STARTUPINFO()
        process_information = my_debugger_defines.PROCESS_INFORMATION()
        startupinfo.dwFlags = 0x1
        startupinfo.wShowWindow = 0x0
        startupinfo.cb = ctypes.sizeof(startupinfo)

        if not kernel32.CreateProcessW(path_to_exe,
                                       None,
                                       None,
                                       None,
                                       None,
                                       creation_flags,
                                       None,
                                       None,
                                       ctypes.byref(startupinfo),
                                       ctypes.byref(process_information)):
            print("[*] Error: 0x%08x." % kernel32.GetLastError())
            return

        print("[*] We have successfully lanuched the process!")
        print("[*] PID: %d" % process_information.dwProcessId)
        # Previously only h_process was saved, so detach() after load() was
        # called with pid None and run() never entered its loop.
        self.pid = int(process_information.dwProcessId)
        self.h_process = self.open_process(process_information.dwProcessId)
        self.debugger_active = True
        # We keep our own OpenProcess handle, so the handles CreateProcess
        # returned are no longer needed and must be closed to avoid a leak.
        for handle in (process_information.hThread, process_information.hProcess):
            if handle:
                kernel32.CloseHandle(ctypes.c_void_p(handle))

    def open_process(self, pid):
        """Open ``pid`` with PROCESS_ALL_ACCESS and return the value OpenProcess gives back."""
        return kernel32.OpenProcess(my_debugger_defines.PROCESS_ALL_ACCESS, False, pid)

    def attach(self, pid):
        """Attach to the running process ``pid`` and immediately enter the event loop."""
        self.h_process = self.open_process(pid)
        # DebugActiveProcess performs the actual attach.
        if kernel32.DebugActiveProcess(pid):
            self.debugger_active = True
            self.pid = int(pid)
            self.run()
        else:
            print("[*] Unable to attach to the process.")

    def run(self):
        """Poll for debug events until ``debugger_active`` is cleared."""
        while self.debugger_active:
            self.get_debug_event()

    def get_debug_event(self):
        """Wait for one debug event, pause for the user, then resume the debuggee."""
        debug_event = my_debugger_defines.DEBUG_EVENT()
        continue_status = my_debugger_defines.DBG_CONTINUE
        # Wait with the module's INFINITE timeout for the next event.
        if kernel32.WaitForDebugEvent(ctypes.byref(debug_event), my_debugger_defines.INFINITE):
            input("press a key to continue...")
            # No event handling yet: stop the loop after the first event.
            self.debugger_active = False
            # Let the thread that reported the event continue executing.
            kernel32.ContinueDebugEvent(
                debug_event.dwProcessId,
                debug_event.dwThreadId,
                continue_status
            )

    def detach(self):
        """Stop debugging ``self.pid``; return True on success, False otherwise."""
        if kernel32.DebugActiveProcessStop(self.pid):
            print("[*] Finished debugging. Exiting...")
            return True
        else:
            print("There was an error")
            return False
- test.py
import my_debugger
# Attach to a process chosen by the user, then detach again.
debugger = my_debugger.debugger()
pid = input("Enter the PID of the process to attach to:")
target_pid = int(pid)
debugger.attach(target_pid)
debugger.detach()
获取寄存器状态信息
这里值得说的是,在 defines 中,CONTEXT 应该使用 64 位的版本;如果 python 是 32 位的,CONTEXT 就用老的(x86)版本。如果在 64 位的调试器中使用老版本的话,就会得到 context 全零的情况。
- my_debugger_defines.py(文件名需与后面 `import my_debugger_defines` 保持一致)
import ctypes
# ctypes spellings of the Win32 scalar and pointer types used below.
BYTE = ctypes.c_ubyte
WORD = ctypes.c_ushort
DWORD = ctypes.c_ulong
LPBYTE = ctypes.POINTER(ctypes.c_ubyte)
LPTSTR = ctypes.POINTER(ctypes.c_char)
HANDLE = ctypes.c_void_p
PVOID = ctypes.c_void_p
# Pointer-sized unsigned integer. c_ulong is only 4 bytes on 64-bit Windows,
# which made EXCEPTION_RECORD (and therefore DEBUG_EVENT) smaller than the
# buffer WaitForDebugEvent writes into.
UINT_PTR = ctypes.c_size_t

# Process creation flags.
DEBUG_PROCESS = 0x00000001
CREATE_NEW_CONSOLE = 0x00000010
# Access masks for OpenProcess / OpenThread.
PROCESS_ALL_ACCESS = 0x001F0FFF
THREAD_ALL_ACCESS = 0x001F03FF
# Continue status passed to ContinueDebugEvent.
DBG_CONTINUE = 0x00010002
# "Wait forever" timeout. The previous value 0x00010002 was a copy of
# DBG_CONTINUE and actually meant a finite timeout of 65538 ms.
INFINITE = 0xFFFFFFFF

# snapshot
TH32CS_SNAPTHREAD = 0x00000004

# Context flags for GetThreadContext(). The flag values carry an
# architecture bit, so they must match the CONTEXT layout in use: the x64
# values (CONTEXT_AMD64 = 0x00100000) for a 64-bit interpreter, the x86
# values (CONTEXT_i386 = 0x00010000) for a 32-bit one.
_IS_64BIT = ctypes.sizeof(ctypes.c_void_p) == 8
CONTEXT_FULL = 0x0010000B if _IS_64BIT else 0x00010007
CONTEXT_DEBUG_REGISTERS = 0x00100010 if _IS_64BIT else 0x00010010
# 定义CreateProcessA需要的结构体
# typedef struct _STARTUPINFO {
# DWORD cb;
# LPTSTR lpReserved;
# LPTSTR lpDesktop;
# LPTSTR lpTitle;
# DWORD dwX;
# DWORD dwY;
# DWORD dwXSize;
# DWORD dwYSize;
# DWORD dwXCountChars;
# DWORD dwYCountChars;
# DWORD dwFillAttribute;
# DWORD dwFlags;
# WORD wShowWindow;
# WORD cbReserved2;
# LPBYTE lpReserved2;
# HANDLE hStdInput;
# HANDLE hStdOutput;
# HANDLE hStdError;
# } STARTUPINFO, *LPSTARTUPINFO;
class STARTUPINFO(ctypes.Structure):
    """ctypes layout of the Win32 STARTUPINFO structure passed to CreateProcess.

    The attribute names ``dwXcountChars``, ``dwYcountChars`` and ``hStdIput``
    differ from the Win32 spellings (``dwXCountChars``, ``dwYCountChars``,
    ``hStdInput``); they are kept as-is so existing attribute access keeps
    working.
    """

    _fields_ = [
        ("cb", DWORD),
        ("lpReserved", LPTSTR),
        ("lpDesktop", LPTSTR),
        ("lpTitle", LPTSTR),
        # Window position, size and console geometry hints.
        ("dwX", DWORD),
        ("dwY", DWORD),
        ("dwXSize", DWORD),
        ("dwYSize", DWORD),
        ("dwXcountChars", DWORD),
        ("dwYcountChars", DWORD),
        ("dwFillAttribute", DWORD),
        ("dwFlags", DWORD),
        ("wShowWindow", WORD),
        ("cbReserved2", WORD),
        ("lpReserved2", LPBYTE),
        # Standard handle slots.
        ("hStdIput", HANDLE),
        ("hStdOutput", HANDLE),
        ("hStdError", HANDLE),
    ]
# typedef struct _PROCESS_INFORMATION {
# HANDLE hProcess;
# HANDLE hThread;
# DWORD dwProcessId;
# DWORD dwThreadId;
# } PROCESS_INFORMATION, *LPPROCESS_INFORMATION;
class PROCESS_INFORMATION(ctypes.Structure):
    """ctypes layout of the Win32 PROCESS_INFORMATION structure.

    CreateProcess fills it with the handles and ids of the new process and
    of its first thread.
    """

    _fields_ = [
        ("hProcess", HANDLE),
        ("hThread", HANDLE),
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
    ]
class EXCEPTION_RECORD(ctypes.Structure):
    """ctypes layout of the Win32 EXCEPTION_RECORD structure.

    The record contains a pointer to another EXCEPTION_RECORD, so the class
    is declared first and its fields are attached right after.
    """


EXCEPTION_RECORD._fields_ = [
    ("ExceptionCode", DWORD),
    ("ExceptionFlags", DWORD),
    # Self-referential link, hence the two-step definition.
    ("ExceptionRecord", ctypes.POINTER(EXCEPTION_RECORD)),
    ("ExceptionAddress", PVOID),
    ("NumberParameters", DWORD),
    ("ExceptionInformation", UINT_PTR * 15),
]
class EXCEPTION_DEBUG_INFO(ctypes.Structure):
    """Exception member of the debug-event union: the record plus the first-chance field."""

    _fields_ = [
        ("ExceptionRecord", EXCEPTION_RECORD),
        ("dwFirstChance", DWORD),
    ]
class DEBUG_EVENT_UNION(ctypes.Union):
    """Event-specific payload of DEBUG_EVENT.

    Only the ``Exception`` member is modelled; the remaining members are
    listed below as comments because their structures are not defined here.
    """

    _fields_ = [
        ("Exception", EXCEPTION_DEBUG_INFO),
        # ("CreateThread", CREATE_THREAD_DEBUG_INFO),
        # ("CreateProcessInfo", CREATE_PROCESS_DEBUG_INFO),
        # ("ExitThread", EXIT_THREAD_DEBUG_INFO),
        # ("ExitProcess", EXIT_PROCESS_DEBUG_INFO),
        # ("LoadDll", LOAD_DLL_DEBUG_INFO),
        # ("UnloadDll", UNLOAD_DLL_DEBUG_INFO),
        # ("DebugString", OUTPUT_DEBUG_STRING_INFO),
        # ("RipInfo", RIP_INFO),
    ]
class DEBUG_EVENT(ctypes.Structure):
    """ctypes layout of DEBUG_EVENT, the structure WaitForDebugEvent fills in."""

    _fields_ = [
        # Selects which member of ``u`` is meaningful.
        ("dwDebugEventCode", DWORD),
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
        ("u", DEBUG_EVENT_UNION),
    ]
class THREADENTRY32(ctypes.Structure):
    """One thread entry of a Toolhelp snapshot, as filled in by Thread32First/Thread32Next."""

    _fields_ = [
        # Must hold sizeof(THREADENTRY32) before the first Thread32First call.
        ("dwSize", DWORD),
        ("cntUsage", DWORD),
        ("th32ThreadID", DWORD),
        # Compared against the debuggee pid to filter the system-wide list.
        ("th32OwnerProcessID", DWORD),
        ("tpBasePri", DWORD),
        ("tpDeltaPri", DWORD),
        ("dwFlags", DWORD),
    ]
class FLOATING_SAVE_AREA(ctypes.Structure):
    """Floating-point save area referenced by the (commented-out) 32-bit CONTEXT."""

    _fields_ = [
        ("ControlWord", DWORD),
        ("StatusWord", DWORD),
        ("TagWord", DWORD),
        ("ErrorOffset", DWORD),
        ("ErrorSelector", DWORD),
        ("DataOffset", DWORD),
        ("DataSelector", DWORD),
        ("RegisterArea", BYTE * 80),
        ("Cr0NpxState", DWORD),
    ]
# class CONTEXT(ctypes.Structure):
# _fields_ = [
# ("ContextFlags", DWORD),
# ("Dr0", DWORD),
# ("Dr1", DWORD),
# ("Dr2", DWORD),
# ("Dr3", DWORD),
# ("Dr6", DWORD),
# ("Dr7", DWORD),
# ("FloatSave", FLOATING_SAVE_AREA),
# ("SegGs", DWORD),
# ("SegFs", DWORD),
# ("SegEs", DWORD),
# ("SegDs", DWORD),
# ("Edi", DWORD),
# ("Esi", DWORD),
# ("Ebx", DWORD),
# ("Edx", DWORD),
# ("Ecx", DWORD),
# ("Eax", DWORD),
# ("Ebp", DWORD),
# ("Eip", DWORD),
# ("SegCs", DWORD),
# ("EFlags", DWORD),
# ("Esp", DWORD),
# ("SegSs", DWORD),
# ("ExtendedRegisters", BYTE * 512),
# ]
DWORD64 = ctypes.c_ulonglong
ULONGLONG = ctypes.c_ulonglong
LONGLONG = ctypes.c_longlong


class M128A(ctypes.Structure):
    """128-bit value stored as a low and a high 64-bit half."""

    _fields_ = [
        ("Low", ULONGLONG),
        ("High", LONGLONG),
    ]


class CONTEXT_UNION(ctypes.Union):
    """Floating-point / SSE save area embedded in the x64 CONTEXT.

    winnt.h declares this union around XMM_SAVE_AREA32, which is 512 bytes
    long. It is exposed here as one opaque array; the member keeps the name
    ``S`` and its DWORD element type so existing access keeps working.
    """

    _fields_ = [
        # 128 four-byte DWORDs = 512 bytes. The previous 32 entries covered
        # only 128 bytes, which shifted every later field and left the
        # structure shorter than what GetThreadContext fills in.
        ("S", DWORD * 128),
    ]


class CONTEXT(ctypes.Structure):
    """Thread context for 64-bit (x64) Windows, following winnt.h.

    With 4-byte DWORDs (as on Windows) the structure is 1232 bytes long.
    NOTE(review): winnt.h also requires 16-byte alignment of the structure;
    this definition does not enforce it and relies on the allocator.
    """

    _fields_ = [
        # Home space for register parameters.
        ("P1Home", DWORD64),
        ("P2Home", DWORD64),
        ("P3Home", DWORD64),
        ("P4Home", DWORD64),
        ("P5Home", DWORD64),
        ("P6Home", DWORD64),
        # Control flags.
        ("ContextFlags", DWORD),
        ("MxCsr", DWORD),
        # Segment registers and processor flags.
        ("SegCs", WORD),
        ("SegDs", WORD),
        ("SegEs", WORD),
        ("SegFs", WORD),
        ("SegGs", WORD),
        ("SegSs", WORD),
        ("EFlags", DWORD),
        # Debug registers are 64 bits wide on x64. Declaring them as DWORD
        # moved Rax..Rip 24 bytes too early, so they read the wrong memory.
        ("Dr0", DWORD64),
        ("Dr1", DWORD64),
        ("Dr2", DWORD64),
        ("Dr3", DWORD64),
        ("Dr6", DWORD64),
        ("Dr7", DWORD64),
        # Integer registers.
        ("Rax", DWORD64),
        ("Rcx", DWORD64),
        ("Rdx", DWORD64),
        ("Rbx", DWORD64),
        ("Rsp", DWORD64),
        ("Rbp", DWORD64),
        ("Rsi", DWORD64),
        ("Rdi", DWORD64),
        ("R8", DWORD64),
        ("R9", DWORD64),
        ("R10", DWORD64),
        ("R11", DWORD64),
        ("R12", DWORD64),
        ("R13", DWORD64),
        ("R14", DWORD64),
        ("R15", DWORD64),
        # Program counter.
        ("Rip", DWORD64),
        # Floating-point state.
        ("DUMMYUNIONNAME", CONTEXT_UNION),
        # Vector registers.
        ("VectorRegister", M128A * 26),
        ("VectorControl", DWORD64),
        # Special debug control registers.
        ("DebugControl", DWORD64),
        ("LastBranchToRip", DWORD64),
        ("LastBranchFromRip", DWORD64),
        ("LastExceptionToRip", DWORD64),
        ("LastExceptionFromRip", DWORD64),
    ]
这里有详细的注释
- my_debugger.py
import ctypes
import my_debugger_defines
kernel32 = ctypes.windll.kernel32
class debugger():
    """Small Win32 debugger: launch/attach, enumerate threads and read their registers."""

    def __init__(self):
        # Handle returned by OpenProcess for the debuggee.
        self.h_process = None
        # Process id of the debuggee.
        self.pid = None
        # run() keeps polling for debug events while this is True.
        self.debugger_active = False

    def load(self, path_to_exe):
        """Launch ``path_to_exe`` with DEBUG_PROCESS and remember it as the debuggee.

        On success the pid, an OpenProcess handle and the active flag are
        stored on the instance so that run(), enumerate_threads() and
        detach() work the same way after load() as after attach(). On
        failure the GetLastError code is printed. Nothing is returned.
        """
        creation_flags = my_debugger_defines.DEBUG_PROCESS
        startupinfo = my_debugger_defines.STARTUPINFO()
        process_information = my_debugger_defines.PROCESS_INFORMATION()
        startupinfo.dwFlags = 0x1
        startupinfo.wShowWindow = 0x0
        startupinfo.cb = ctypes.sizeof(startupinfo)

        if not kernel32.CreateProcessW(path_to_exe,
                                       None,
                                       None,
                                       None,
                                       None,
                                       creation_flags,
                                       None,
                                       None,
                                       ctypes.byref(startupinfo),
                                       ctypes.byref(process_information)):
            print("[*] Error: 0x%08x." % kernel32.GetLastError())
            return

        print("[*] We have successfully lanuched the process!")
        print("[*] PID: %d" % process_information.dwProcessId)
        # Previously only h_process was saved, so the pid-based methods
        # were called with pid None after load().
        self.pid = int(process_information.dwProcessId)
        self.h_process = self.open_process(process_information.dwProcessId)
        self.debugger_active = True
        # We keep our own OpenProcess handle, so the handles CreateProcess
        # returned are no longer needed and must be closed to avoid a leak.
        for handle in (process_information.hThread, process_information.hProcess):
            if handle:
                kernel32.CloseHandle(ctypes.c_void_p(handle))

    def open_process(self, pid):
        """Open ``pid`` with PROCESS_ALL_ACCESS and return the value OpenProcess gives back."""
        return kernel32.OpenProcess(my_debugger_defines.PROCESS_ALL_ACCESS, False, pid)

    def attach(self, pid):
        """Attach to the running process ``pid``; the caller starts the event loop itself."""
        self.h_process = self.open_process(pid)
        # DebugActiveProcess performs the actual attach.
        if kernel32.DebugActiveProcess(pid):
            self.debugger_active = True
            self.pid = int(pid)
            # self.run()
        else:
            print("[*] Unable to attach to the process.")

    def run(self):
        """Poll for debug events until ``debugger_active`` is cleared."""
        while self.debugger_active:
            self.get_debug_event()

    def get_debug_event(self):
        """Wait for one debug event, pause for the user, then resume the debuggee."""
        debug_event = my_debugger_defines.DEBUG_EVENT()
        continue_status = my_debugger_defines.DBG_CONTINUE
        # WaitForDebugEvent fills debug_event once an event arrives.
        if kernel32.WaitForDebugEvent(ctypes.byref(debug_event), my_debugger_defines.INFINITE):
            input("press a key to continue...")
            # No event handling yet: stop the loop after the first event.
            self.debugger_active = False
            # Let the thread that reported the event continue executing.
            kernel32.ContinueDebugEvent(
                debug_event.dwProcessId,
                debug_event.dwThreadId,
                continue_status
            )

    def detach(self):
        """Stop debugging ``self.pid``; return True on success, False otherwise."""
        if kernel32.DebugActiveProcessStop(self.pid):
            print("[*] Finished debugging. Exiting...")
            return True
        else:
            print("There was an error")
            return False

    def open_thread(self, thread_id):
        """Return a THREAD_ALL_ACCESS handle for ``thread_id``, or False on failure."""
        h_thread = kernel32.OpenThread(my_debugger_defines.THREAD_ALL_ACCESS, None, thread_id)
        # OpenThread signals failure with a NULL handle, which ctypes returns
        # as 0 and never as None, so the old ``is not None`` test could not fail.
        if h_thread:
            return h_thread
        print("[*] Could not obtain a valid thread handle.")
        return False

    def enumerate_threads(self):
        """Return the ids of all threads owned by ``self.pid``, or False if no snapshot could be taken."""
        # THREADENTRY32 receives the description of one thread per call.
        thread_entry = my_debugger_defines.THREADENTRY32()
        thread_list = []
        # TH32CS_SNAPTHREAD snapshots every thread in the system; the pid
        # argument is ignored for this flag, so entries are filtered below.
        snapshot = kernel32.CreateToolhelp32Snapshot(my_debugger_defines.TH32CS_SNAPTHREAD, self.pid)
        # Failure is INVALID_HANDLE_VALUE (-1, or all bits set when read as a
        # pointer), never None, so the old ``is not None`` test could not fail.
        if snapshot in (None, 0, -1, ctypes.c_void_p(-1).value):
            return False

        thread_entry.dwSize = ctypes.sizeof(thread_entry)
        success = kernel32.Thread32First(snapshot, ctypes.byref(thread_entry))
        while success:
            # Keep only the threads that belong to the debuggee.
            if thread_entry.th32OwnerProcessID == self.pid:
                thread_list.append(thread_entry.th32ThreadID)
            success = kernel32.Thread32Next(snapshot, ctypes.byref(thread_entry))
        kernel32.CloseHandle(snapshot)
        return thread_list

    def get_thread_context(self, thread_id=None, h_thread=None):
        """Return the CONTEXT of a thread, or False if it cannot be read.

        Pass either ``thread_id`` (a handle is opened and closed internally)
        or an already open ``h_thread`` (used as-is and left open, because
        the caller owns it). Previously ``h_thread`` was ignored and the
        internally opened handle leaked when GetThreadContext failed.
        """
        context = my_debugger_defines.CONTEXT()
        context.ContextFlags = my_debugger_defines.CONTEXT_FULL | my_debugger_defines.CONTEXT_DEBUG_REGISTERS

        owns_handle = h_thread is None
        if owns_handle:
            h_thread = self.open_thread(thread_id)
        if not h_thread:
            return False

        try:
            if kernel32.GetThreadContext(h_thread, ctypes.byref(context)):
                return context
            return False
        finally:
            if owns_handle:
                kernel32.CloseHandle(h_thread)
- my_test.py
import my_debugger
import win32con
import win32api
# Attach to a process, dump a few registers of each of its threads, detach.
debugger = my_debugger.debugger()
pid = input("Enter the PID of the process to attach to:")
debugger.attach(int(pid))
# enumerate_threads() returns False when no snapshot could be taken;
# iterating over that would raise, so fall back to an empty list.
thread_ids = debugger.enumerate_threads() or []
for thread in thread_ids:
    thread_context = debugger.get_thread_context(thread)
    print("[*] Dumping registers for thread ID: 0x%08x." % thread)
    if not thread_context:
        # get_thread_context() returns False on failure; there is nothing
        # to print for this thread.
        print("[*] Could not read the thread context.")
        continue
    print("[*] RIP 0x%08x" % thread_context.Rip)
    print("[*] RSP 0x%08x" % thread_context.Rsp)
    print("[*] RBP 0x%08x" % thread_context.Rbp)
    print("[*] RAX 0x%08x" % thread_context.Rax)
    print("[*] RBX 0x%08x" % thread_context.Rbx)
    print("[*] RCX 0x%08x" % thread_context.Rcx)
    print("[*] RDX 0x%08x" % thread_context.Rdx)
debugger.detach()
实现调试事件
- my_debugger_defines.py(文件名需与后面 `import my_debugger_defines` 保持一致)
import ctypes
# ctypes spellings of the Win32 scalar and pointer types used below.
BYTE = ctypes.c_ubyte
WORD = ctypes.c_ushort
DWORD = ctypes.c_ulong
LPBYTE = ctypes.POINTER(ctypes.c_ubyte)
LPTSTR = ctypes.POINTER(ctypes.c_char)
HANDLE = ctypes.c_void_p
PVOID = ctypes.c_void_p
# Pointer-sized unsigned integer. c_ulong is only 4 bytes on 64-bit Windows,
# which made EXCEPTION_RECORD (and therefore DEBUG_EVENT) smaller than the
# buffer WaitForDebugEvent writes into.
UINT_PTR = ctypes.c_size_t

# Process creation flags.
DEBUG_PROCESS = 0x00000001
CREATE_NEW_CONSOLE = 0x00000010
# Access masks for OpenProcess / OpenThread.
PROCESS_ALL_ACCESS = 0x001F0FFF
THREAD_ALL_ACCESS = 0x001F03FF
# Continue status passed to ContinueDebugEvent.
DBG_CONTINUE = 0x00010002
# "Wait forever" timeout. The previous value 0x00010002 was a copy of
# DBG_CONTINUE and actually meant a finite timeout of 65538 ms.
INFINITE = 0xFFFFFFFF

# snapshot
TH32CS_SNAPTHREAD = 0x00000004

# Context flags for GetThreadContext(). The flag values carry an
# architecture bit, so they must match the CONTEXT layout in use: the x64
# values (CONTEXT_AMD64 = 0x00100000) for a 64-bit interpreter, the x86
# values (CONTEXT_i386 = 0x00010000) for a 32-bit one.
_IS_64BIT = ctypes.sizeof(ctypes.c_void_p) == 8
CONTEXT_FULL = 0x0010000B if _IS_64BIT else 0x00010007
CONTEXT_DEBUG_REGISTERS = 0x00100010 if _IS_64BIT else 0x00010010

# Debug event constants
EXCEPTION_DEBUG_EVENT = 0x1

# debug exception codes.
EXCEPTION_ACCESS_VIOLATION = 0xC0000005
EXCEPTION_BREAKPOINT = 0x80000003
EXCEPTION_GUARD_PAGE = 0x80000001
EXCEPTION_SINGLE_STEP = 0x80000004
# 定义CreateProcessA需要的结构体
# typedef struct _STARTUPINFO {
# DWORD cb;
# LPTSTR lpReserved;
# LPTSTR lpDesktop;
# LPTSTR lpTitle;
# DWORD dwX;
# DWORD dwY;
# DWORD dwXSize;
# DWORD dwYSize;
# DWORD dwXCountChars;
# DWORD dwYCountChars;
# DWORD dwFillAttribute;
# DWORD dwFlags;
# WORD wShowWindow;
# WORD cbReserved2;
# LPBYTE lpReserved2;
# HANDLE hStdInput;
# HANDLE hStdOutput;
# HANDLE hStdError;
# } STARTUPINFO, *LPSTARTUPINFO;
class STARTUPINFO(ctypes.Structure):
    """ctypes layout of the Win32 STARTUPINFO structure passed to CreateProcess.

    The attribute names ``dwXcountChars``, ``dwYcountChars`` and ``hStdIput``
    differ from the Win32 spellings (``dwXCountChars``, ``dwYCountChars``,
    ``hStdInput``); they are kept as-is so existing attribute access keeps
    working.
    """

    _fields_ = [
        ("cb", DWORD),
        ("lpReserved", LPTSTR),
        ("lpDesktop", LPTSTR),
        ("lpTitle", LPTSTR),
        # Window position, size and console geometry hints.
        ("dwX", DWORD),
        ("dwY", DWORD),
        ("dwXSize", DWORD),
        ("dwYSize", DWORD),
        ("dwXcountChars", DWORD),
        ("dwYcountChars", DWORD),
        ("dwFillAttribute", DWORD),
        ("dwFlags", DWORD),
        ("wShowWindow", WORD),
        ("cbReserved2", WORD),
        ("lpReserved2", LPBYTE),
        # Standard handle slots.
        ("hStdIput", HANDLE),
        ("hStdOutput", HANDLE),
        ("hStdError", HANDLE),
    ]
# typedef struct _PROCESS_INFORMATION {
# HANDLE hProcess;
# HANDLE hThread;
# DWORD dwProcessId;
# DWORD dwThreadId;
# } PROCESS_INFORMATION, *LPPROCESS_INFORMATION;
class PROCESS_INFORMATION(ctypes.Structure):
    """ctypes layout of the Win32 PROCESS_INFORMATION structure.

    CreateProcess fills it with the handles and ids of the new process and
    of its first thread.
    """

    _fields_ = [
        ("hProcess", HANDLE),
        ("hThread", HANDLE),
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
    ]
class EXCEPTION_RECORD(ctypes.Structure):
    """ctypes layout of the Win32 EXCEPTION_RECORD structure.

    The record contains a pointer to another EXCEPTION_RECORD, so the class
    is declared first and its fields are attached right after.
    """


EXCEPTION_RECORD._fields_ = [
    ("ExceptionCode", DWORD),
    ("ExceptionFlags", DWORD),
    # Self-referential link, hence the two-step definition.
    ("ExceptionRecord", ctypes.POINTER(EXCEPTION_RECORD)),
    ("ExceptionAddress", PVOID),
    ("NumberParameters", DWORD),
    ("ExceptionInformation", UINT_PTR * 15),
]
class EXCEPTION_DEBUG_INFO(ctypes.Structure):
    """Exception member of the debug-event union: the record plus the first-chance field."""

    _fields_ = [
        ("ExceptionRecord", EXCEPTION_RECORD),
        ("dwFirstChance", DWORD),
    ]
class DEBUG_EVENT_UNION(ctypes.Union):
    """Event-specific payload of DEBUG_EVENT.

    Only the ``Exception`` member is modelled; the remaining members are
    listed below as comments because their structures are not defined here.
    """

    _fields_ = [
        ("Exception", EXCEPTION_DEBUG_INFO),
        # ("CreateThread", CREATE_THREAD_DEBUG_INFO),
        # ("CreateProcessInfo", CREATE_PROCESS_DEBUG_INFO),
        # ("ExitThread", EXIT_THREAD_DEBUG_INFO),
        # ("ExitProcess", EXIT_PROCESS_DEBUG_INFO),
        # ("LoadDll", LOAD_DLL_DEBUG_INFO),
        # ("UnloadDll", UNLOAD_DLL_DEBUG_INFO),
        # ("DebugString", OUTPUT_DEBUG_STRING_INFO),
        # ("RipInfo", RIP_INFO),
    ]
class DEBUG_EVENT(ctypes.Structure):
    """ctypes layout of DEBUG_EVENT, the structure WaitForDebugEvent fills in."""

    _fields_ = [
        # Selects which member of ``u`` is meaningful.
        ("dwDebugEventCode", DWORD),
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
        ("u", DEBUG_EVENT_UNION),
    ]
class THREADENTRY32(ctypes.Structure):
    """One thread entry of a Toolhelp snapshot, as filled in by Thread32First/Thread32Next."""

    _fields_ = [
        # Must hold sizeof(THREADENTRY32) before the first Thread32First call.
        ("dwSize", DWORD),
        ("cntUsage", DWORD),
        ("th32ThreadID", DWORD),
        # Compared against the debuggee pid to filter the system-wide list.
        ("th32OwnerProcessID", DWORD),
        ("tpBasePri", DWORD),
        ("tpDeltaPri", DWORD),
        ("dwFlags", DWORD),
    ]
class FLOATING_SAVE_AREA(ctypes.Structure):
    """Floating-point save area referenced by the (commented-out) 32-bit CONTEXT."""

    _fields_ = [
        ("ControlWord", DWORD),
        ("StatusWord", DWORD),
        ("TagWord", DWORD),
        ("ErrorOffset", DWORD),
        ("ErrorSelector", DWORD),
        ("DataOffset", DWORD),
        ("DataSelector", DWORD),
        ("RegisterArea", BYTE * 80),
        ("Cr0NpxState", DWORD),
    ]
# class CONTEXT(ctypes.Structure):
# _fields_ = [
# ("ContextFlags", DWORD),
# ("Dr0", DWORD),
# ("Dr1", DWORD),
# ("Dr2", DWORD),
# ("Dr3", DWORD),
# ("Dr6", DWORD),
# ("Dr7", DWORD),
# ("FloatSave", FLOATING_SAVE_AREA),
# ("SegGs", DWORD),
# ("SegFs", DWORD),
# ("SegEs", DWORD),
# ("SegDs", DWORD),
# ("Edi", DWORD),
# ("Esi", DWORD),
# ("Ebx", DWORD),
# ("Edx", DWORD),
# ("Ecx", DWORD),
# ("Eax", DWORD),
# ("Ebp", DWORD),
# ("Eip", DWORD),
# ("SegCs", DWORD),
# ("EFlags", DWORD),
# ("Esp", DWORD),
# ("SegSs", DWORD),
# ("ExtendedRegisters", BYTE * 512),
# ]
DWORD64 = ctypes.c_ulonglong
ULONGLONG = ctypes.c_ulonglong
LONGLONG = ctypes.c_longlong


class M128A(ctypes.Structure):
    """128-bit value stored as a low and a high 64-bit half."""

    _fields_ = [
        ("Low", ULONGLONG),
        ("High", LONGLONG),
    ]


class CONTEXT_UNION(ctypes.Union):
    """Floating-point / SSE save area embedded in the x64 CONTEXT.

    winnt.h declares this union around XMM_SAVE_AREA32, which is 512 bytes
    long. It is exposed here as one opaque array; the member keeps the name
    ``S`` and its DWORD element type so existing access keeps working.
    """

    _fields_ = [
        # 128 four-byte DWORDs = 512 bytes. The previous 32 entries covered
        # only 128 bytes, which shifted every later field and left the
        # structure shorter than what GetThreadContext fills in.
        ("S", DWORD * 128),
    ]


class CONTEXT(ctypes.Structure):
    """Thread context for 64-bit (x64) Windows, following winnt.h.

    With 4-byte DWORDs (as on Windows) the structure is 1232 bytes long.
    NOTE(review): winnt.h also requires 16-byte alignment of the structure;
    this definition does not enforce it and relies on the allocator.
    """

    _fields_ = [
        # Home space for register parameters.
        ("P1Home", DWORD64),
        ("P2Home", DWORD64),
        ("P3Home", DWORD64),
        ("P4Home", DWORD64),
        ("P5Home", DWORD64),
        ("P6Home", DWORD64),
        # Control flags.
        ("ContextFlags", DWORD),
        ("MxCsr", DWORD),
        # Segment registers and processor flags.
        ("SegCs", WORD),
        ("SegDs", WORD),
        ("SegEs", WORD),
        ("SegFs", WORD),
        ("SegGs", WORD),
        ("SegSs", WORD),
        ("EFlags", DWORD),
        # Debug registers are 64 bits wide on x64. Declaring them as DWORD
        # moved Rax..Rip 24 bytes too early, so they read the wrong memory.
        ("Dr0", DWORD64),
        ("Dr1", DWORD64),
        ("Dr2", DWORD64),
        ("Dr3", DWORD64),
        ("Dr6", DWORD64),
        ("Dr7", DWORD64),
        # Integer registers.
        ("Rax", DWORD64),
        ("Rcx", DWORD64),
        ("Rdx", DWORD64),
        ("Rbx", DWORD64),
        ("Rsp", DWORD64),
        ("Rbp", DWORD64),
        ("Rsi", DWORD64),
        ("Rdi", DWORD64),
        ("R8", DWORD64),
        ("R9", DWORD64),
        ("R10", DWORD64),
        ("R11", DWORD64),
        ("R12", DWORD64),
        ("R13", DWORD64),
        ("R14", DWORD64),
        ("R15", DWORD64),
        # Program counter.
        ("Rip", DWORD64),
        # Floating-point state.
        ("DUMMYUNIONNAME", CONTEXT_UNION),
        # Vector registers.
        ("VectorRegister", M128A * 26),
        ("VectorControl", DWORD64),
        # Special debug control registers.
        ("DebugControl", DWORD64),
        ("LastBranchToRip", DWORD64),
        ("LastBranchFromRip", DWORD64),
        ("LastExceptionToRip", DWORD64),
        ("LastExceptionFromRip", DWORD64),
    ]
- my_debugger.py
import ctypes
import my_debugger_defines
kernel32 = ctypes.windll.kernel32
class debugger():
    """Small Win32 debugger that attaches to a process and classifies its debug events."""

    def __init__(self):
        # Handle returned by OpenProcess for the debuggee.
        self.h_process = None
        # Process id of the debuggee.
        self.pid = None
        # run() keeps polling for debug events while this is True.
        self.debugger_active = False
        # State of the debug event currently being handled.
        self.h_thread = None
        self.context = None
        self.exception = None
        self.exception_address = None

    def load(self, path_to_exe):
        """Launch ``path_to_exe`` with DEBUG_PROCESS and remember it as the debuggee.

        On success the pid, an OpenProcess handle and the active flag are
        stored on the instance so that run() and detach() work the same way
        after load() as after attach(). On failure the GetLastError code is
        printed. Nothing is returned.
        """
        creation_flags = my_debugger_defines.DEBUG_PROCESS
        startupinfo = my_debugger_defines.STARTUPINFO()
        process_information = my_debugger_defines.PROCESS_INFORMATION()
        startupinfo.dwFlags = 0x1
        startupinfo.wShowWindow = 0x0
        startupinfo.cb = ctypes.sizeof(startupinfo)

        if not kernel32.CreateProcessW(path_to_exe,
                                       None,
                                       None,
                                       None,
                                       None,
                                       creation_flags,
                                       None,
                                       None,
                                       ctypes.byref(startupinfo),
                                       ctypes.byref(process_information)):
            print("[*] Error: 0x%08x." % kernel32.GetLastError())
            return

        print("[*] We have successfully lanuched the process!")
        print("[*] PID: %d" % process_information.dwProcessId)
        # Previously only h_process was saved, so the pid-based methods
        # were called with pid None after load().
        self.pid = int(process_information.dwProcessId)
        self.h_process = self.open_process(process_information.dwProcessId)
        self.debugger_active = True
        # We keep our own OpenProcess handle, so the handles CreateProcess
        # returned are no longer needed and must be closed to avoid a leak.
        for handle in (process_information.hThread, process_information.hProcess):
            if handle:
                kernel32.CloseHandle(ctypes.c_void_p(handle))

    def open_process(self, pid):
        """Open ``pid`` with PROCESS_ALL_ACCESS and return the value OpenProcess gives back."""
        return kernel32.OpenProcess(my_debugger_defines.PROCESS_ALL_ACCESS, False, pid)

    def attach(self, pid):
        """Attach to the running process ``pid``; the caller starts the event loop itself."""
        self.h_process = self.open_process(pid)
        # DebugActiveProcess performs the actual attach.
        if kernel32.DebugActiveProcess(pid):
            self.debugger_active = True
            self.pid = int(pid)
            # self.run()
        else:
            print("[*] Unable to attach to the process.")

    def run(self):
        """Poll for debug events until ``debugger_active`` is cleared."""
        while self.debugger_active:
            self.get_debug_event()

    def get_debug_event(self):
        """Wait for one debug event, report it, then resume the debuggee.

        For exception events the exception code and address are stored on
        the instance and the breakpoint handler decides the continue status;
        all other cases continue with DBG_CONTINUE.
        """
        debug_event = my_debugger_defines.DEBUG_EVENT()
        continue_status = my_debugger_defines.DBG_CONTINUE
        # WaitForDebugEvent fills debug_event once an event arrives.
        if not kernel32.WaitForDebugEvent(ctypes.byref(debug_event), my_debugger_defines.INFINITE):
            return

        # Capture the reporting thread and its registers for the handlers.
        self.h_thread = self.open_thread(debug_event.dwThreadId)
        self.context = self.get_thread_context(h_thread=self.h_thread)
        print("Event Code: %d Thread ID: %d" % (debug_event.dwDebugEventCode, debug_event.dwThreadId))

        if debug_event.dwDebugEventCode == my_debugger_defines.EXCEPTION_DEBUG_EVENT:
            record = debug_event.u.Exception.ExceptionRecord
            exception = record.ExceptionCode
            # Also publish the code on the instance; the attribute existed
            # but was never assigned.
            self.exception = exception
            self.exception_address = record.ExceptionAddress
            if exception == my_debugger_defines.EXCEPTION_ACCESS_VIOLATION:
                # Invalid memory access.
                print("Acess Violation Detected.")
            elif exception == my_debugger_defines.EXCEPTION_BREAKPOINT:
                # Breakpoint hit.
                continue_status = self.exception_handler_breakpoint()
            elif exception == my_debugger_defines.EXCEPTION_GUARD_PAGE:
                # Memory breakpoint.
                print("Guard Page Access Detected.")
            elif exception == my_debugger_defines.EXCEPTION_SINGLE_STEP:
                # Hardware breakpoint.
                print("Single Stepping.")

        # Let the thread that reported the event continue executing.
        kernel32.ContinueDebugEvent(
            debug_event.dwProcessId,
            debug_event.dwThreadId,
            continue_status
        )
        # The thread handle was opened for this event only; release it so a
        # long debugging session does not leak one handle per event.
        if self.h_thread:
            kernel32.CloseHandle(self.h_thread)
        self.h_thread = None

    def exception_handler_breakpoint(self):
        """Report the breakpoint address and tell the caller to continue execution."""
        print("[*] Inside the breakpoint handler.")
        # ExceptionAddress is a c_void_p field, which reads back as None for
        # a NULL pointer; '%x' would raise on None.
        print("Exception address: 0x%08x" % (self.exception_address or 0))
        return my_debugger_defines.DBG_CONTINUE

    def detach(self):
        """Stop debugging ``self.pid``; return True on success, False otherwise."""
        if kernel32.DebugActiveProcessStop(self.pid):
            print("[*] Finished debugging. Exiting...")
            return True
        else:
            print("There was an error")
            return False

    def open_thread(self, thread_id):
        """Return a THREAD_ALL_ACCESS handle for ``thread_id``, or False on failure."""
        h_thread = kernel32.OpenThread(my_debugger_defines.THREAD_ALL_ACCESS, None, thread_id)
        # OpenThread signals failure with a NULL handle, which ctypes returns
        # as 0 and never as None, so the old ``is not None`` test could not fail.
        if h_thread:
            return h_thread
        print("[*] Could not obtain a valid thread handle.")
        return False

    def enumerate_threads(self):
        """Return the ids of all threads owned by ``self.pid``, or False if no snapshot could be taken."""
        # THREADENTRY32 receives the description of one thread per call.
        thread_entry = my_debugger_defines.THREADENTRY32()
        thread_list = []
        # TH32CS_SNAPTHREAD snapshots every thread in the system; the pid
        # argument is ignored for this flag, so entries are filtered below.
        snapshot = kernel32.CreateToolhelp32Snapshot(my_debugger_defines.TH32CS_SNAPTHREAD, self.pid)
        # Failure is INVALID_HANDLE_VALUE (-1, or all bits set when read as a
        # pointer), never None, so the old ``is not None`` test could not fail.
        if snapshot in (None, 0, -1, ctypes.c_void_p(-1).value):
            return False

        thread_entry.dwSize = ctypes.sizeof(thread_entry)
        success = kernel32.Thread32First(snapshot, ctypes.byref(thread_entry))
        while success:
            # Keep only the threads that belong to the debuggee.
            if thread_entry.th32OwnerProcessID == self.pid:
                thread_list.append(thread_entry.th32ThreadID)
            success = kernel32.Thread32Next(snapshot, ctypes.byref(thread_entry))
        kernel32.CloseHandle(snapshot)
        return thread_list

    def get_thread_context(self, thread_id=None, h_thread=None):
        """Return the CONTEXT of a thread, or False if it cannot be read.

        Pass either ``thread_id`` (a handle is opened and closed internally)
        or an already open ``h_thread`` (used as-is and left open, because
        the caller owns it). Previously the method queried ``self.h_thread``
        regardless of the argument, closed the caller's handle on success
        and leaked the internally opened one.
        """
        context = my_debugger_defines.CONTEXT()
        context.ContextFlags = my_debugger_defines.CONTEXT_FULL | my_debugger_defines.CONTEXT_DEBUG_REGISTERS

        owns_handle = h_thread is None
        if owns_handle:
            h_thread = self.open_thread(thread_id)
        if not h_thread:
            return False

        try:
            if kernel32.GetThreadContext(h_thread, ctypes.byref(context)):
                return context
            return False
        finally:
            if owns_handle:
                kernel32.CloseHandle(h_thread)
- my_test.py
import my_debugger
# Attach to a process chosen by the user, pump debug events, then detach.
debugger = my_debugger.debugger()
pid = input("Enter the PID of the process to attach to:")
target_pid = int(pid)
debugger.attach(target_pid)
debugger.run()
debugger.detach()
要学习的内容:
DEBUG_EVENT 结构中的联合体和 dwDebugEventCode 的关系:
在get_debug_event
函数中实现了当 dwDebugEventCode 等于 0x1
的情况。
值得说的是:在 DEBUG_EVENT 联合体中,Exception.ExceptionRecord.ExceptionCode
有很多取值,其中一个是 EXCEPTION_GUARD_PAGE
,但这个取值在 微软学习手册里没有找到,在VS中的minwinbase.h
头文件中有定义
#define EXCEPTION_GUARD_PAGE STATUS_GUARD_PAGE_VIOLATION
软断点(INT3)
这个节坑太多,原文中显示源代码和运行的都不一样。需要看源代码文件去慢慢领悟。
还有就是迁移到python3有很多需要注意的地方,例如语法不同、ctypes调用API的方法不同
- my_debugger_defines.py(文件名需与后面 `import my_debugger_defines` 保持一致)
import ctypes
# ctypes spellings of the Win32 scalar and pointer types used below.
BYTE = ctypes.c_ubyte
WORD = ctypes.c_ushort
DWORD = ctypes.c_ulong
LPBYTE = ctypes.POINTER(ctypes.c_ubyte)
LPTSTR = ctypes.POINTER(ctypes.c_wchar)
LPCSTR = ctypes.POINTER(ctypes.c_char)
HANDLE = ctypes.c_void_p
PVOID = ctypes.c_void_p
# Pointer-sized unsigned integer. c_ulong is only 4 bytes on 64-bit Windows,
# which made EXCEPTION_RECORD (and therefore DEBUG_EVENT) smaller than the
# buffer WaitForDebugEvent writes into.
UINT_PTR = ctypes.c_size_t

# Process creation flags.
DEBUG_PROCESS = 0x00000001
CREATE_NEW_CONSOLE = 0x00000010
# Access masks for OpenProcess / OpenThread.
PROCESS_ALL_ACCESS = 0x001F0FFF
THREAD_ALL_ACCESS = 0x001F03FF
# Continue status passed to ContinueDebugEvent.
DBG_CONTINUE = 0x00010002
# "Wait forever" timeout. The previous value 0x00010002 was a copy of
# DBG_CONTINUE and actually meant a finite timeout of 65538 ms.
INFINITE = 0xFFFFFFFF

# snapshot
TH32CS_SNAPTHREAD = 0x00000004

# Context flags for GetThreadContext(). The flag values carry an
# architecture bit, so they must match the CONTEXT layout in use: the x64
# values (CONTEXT_AMD64 = 0x00100000) for a 64-bit interpreter, the x86
# values (CONTEXT_i386 = 0x00010000) for a 32-bit one.
_IS_64BIT = ctypes.sizeof(ctypes.c_void_p) == 8
CONTEXT_FULL = 0x0010000B if _IS_64BIT else 0x00010007
CONTEXT_DEBUG_REGISTERS = 0x00100010 if _IS_64BIT else 0x00010010

# Debug event constants
EXCEPTION_DEBUG_EVENT = 0x1

# debug exception codes.
EXCEPTION_ACCESS_VIOLATION = 0xC0000005
EXCEPTION_BREAKPOINT = 0x80000003
EXCEPTION_GUARD_PAGE = 0x80000001
EXCEPTION_SINGLE_STEP = 0x80000004

# Memory page permissions, used by VirtualProtect()
PAGE_EXECUTE_READWRITE = 0x00000040
# 定义CreateProcessA需要的结构体
# typedef struct _STARTUPINFO {
# DWORD cb;
# LPTSTR lpReserved;
# LPTSTR lpDesktop;
# LPTSTR lpTitle;
# DWORD dwX;
# DWORD dwY;
# DWORD dwXSize;
# DWORD dwYSize;
# DWORD dwXCountChars;
# DWORD dwYCountChars;
# DWORD dwFillAttribute;
# DWORD dwFlags;
# WORD wShowWindow;
# WORD cbReserved2;
# LPBYTE lpReserved2;
# HANDLE hStdInput;
# HANDLE hStdOutput;
# HANDLE hStdError;
# } STARTUPINFO, *LPSTARTUPINFO;
class STARTUPINFO(ctypes.Structure):
    """ctypes layout of the Win32 STARTUPINFO structure passed to CreateProcess.

    The attribute names ``dwXcountChars``, ``dwYcountChars`` and ``hStdIput``
    differ from the Win32 spellings (``dwXCountChars``, ``dwYCountChars``,
    ``hStdInput``); they are kept as-is so existing attribute access keeps
    working.
    """

    _fields_ = [
        ("cb", DWORD),
        ("lpReserved", LPTSTR),
        ("lpDesktop", LPTSTR),
        ("lpTitle", LPTSTR),
        # Window position, size and console geometry hints.
        ("dwX", DWORD),
        ("dwY", DWORD),
        ("dwXSize", DWORD),
        ("dwYSize", DWORD),
        ("dwXcountChars", DWORD),
        ("dwYcountChars", DWORD),
        ("dwFillAttribute", DWORD),
        ("dwFlags", DWORD),
        ("wShowWindow", WORD),
        ("cbReserved2", WORD),
        ("lpReserved2", LPBYTE),
        # Standard handle slots.
        ("hStdIput", HANDLE),
        ("hStdOutput", HANDLE),
        ("hStdError", HANDLE),
    ]
# typedef struct _PROCESS_INFORMATION {
# HANDLE hProcess;
# HANDLE hThread;
# DWORD dwProcessId;
# DWORD dwThreadId;
# } PROCESS_INFORMATION, *LPPROCESS_INFORMATION;
class PROCESS_INFORMATION(ctypes.Structure):
    """ctypes layout of the Win32 PROCESS_INFORMATION structure.

    CreateProcess fills it with the handles and ids of the new process and
    of its first thread.
    """

    _fields_ = [
        ("hProcess", HANDLE),
        ("hThread", HANDLE),
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
    ]
class EXCEPTION_RECORD(ctypes.Structure):
    """ctypes layout of the Win32 EXCEPTION_RECORD structure.

    The record contains a pointer to another EXCEPTION_RECORD, so the class
    is declared first and its fields are attached right after.
    """


EXCEPTION_RECORD._fields_ = [
    ("ExceptionCode", DWORD),
    ("ExceptionFlags", DWORD),
    # Self-referential link, hence the two-step definition.
    ("ExceptionRecord", ctypes.POINTER(EXCEPTION_RECORD)),
    ("ExceptionAddress", PVOID),
    ("NumberParameters", DWORD),
    ("ExceptionInformation", UINT_PTR * 15),
]
class EXCEPTION_DEBUG_INFO(ctypes.Structure):
    """Exception member of the debug-event union: the record plus the first-chance field."""

    _fields_ = [
        ("ExceptionRecord", EXCEPTION_RECORD),
        ("dwFirstChance", DWORD),
    ]
class DEBUG_EVENT_UNION(ctypes.Union):
    """Event-specific payload of DEBUG_EVENT.

    Only the ``Exception`` member is modelled; the remaining members are
    listed below as comments because their structures are not defined here.
    """

    _fields_ = [
        ("Exception", EXCEPTION_DEBUG_INFO),
        # ("CreateThread", CREATE_THREAD_DEBUG_INFO),
        # ("CreateProcessInfo", CREATE_PROCESS_DEBUG_INFO),
        # ("ExitThread", EXIT_THREAD_DEBUG_INFO),
        # ("ExitProcess", EXIT_PROCESS_DEBUG_INFO),
        # ("LoadDll", LOAD_DLL_DEBUG_INFO),
        # ("UnloadDll", UNLOAD_DLL_DEBUG_INFO),
        # ("DebugString", OUTPUT_DEBUG_STRING_INFO),
        # ("RipInfo", RIP_INFO),
    ]
class DEBUG_EVENT(ctypes.Structure):
    """ctypes layout of DEBUG_EVENT, the structure WaitForDebugEvent fills in."""

    _fields_ = [
        # Selects which member of ``u`` is meaningful.
        ("dwDebugEventCode", DWORD),
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
        ("u", DEBUG_EVENT_UNION),
    ]
class THREADENTRY32(ctypes.Structure):
    """One thread entry of a Toolhelp snapshot (the structure used with Thread32First/Thread32Next)."""

    _fields_ = [
        ("dwSize", DWORD),
        ("cntUsage", DWORD),
        ("th32ThreadID", DWORD),
        ("th32OwnerProcessID", DWORD),
        ("tpBasePri", DWORD),
        ("tpDeltaPri", DWORD),
        ("dwFlags", DWORD),
    ]
class FLOATING_SAVE_AREA(ctypes.Structure):
    """Floating-point save area referenced by the (commented-out) 32-bit CONTEXT."""

    _fields_ = [
        ("ControlWord", DWORD),
        ("StatusWord", DWORD),
        ("TagWord", DWORD),
        ("ErrorOffset", DWORD),
        ("ErrorSelector", DWORD),
        ("DataOffset", DWORD),
        ("DataSelector", DWORD),
        ("RegisterArea", BYTE * 80),
        ("Cr0NpxState", DWORD),
    ]
# class CONTEXT(ctypes.Structure):
# _fields_ = [
# ("ContextFlags", DWORD),
# ("Dr0", DWORD),
# ("Dr1", DWORD),
# ("Dr2", DWORD),
# ("Dr3", DWORD),
# ("Dr6", DWORD),
# ("Dr7", DWORD),
# ("FloatSave", FLOATING_SAVE_AREA),
# ("SegGs", DWORD),
# ("SegFs", DWORD),
# ("SegEs", DWORD),
# ("SegDs", DWORD),
# ("Edi", DWORD),
# ("Esi", DWORD),
# ("Ebx", DWORD),
# ("Edx", DWORD),
# ("Ecx", DWORD),
# ("Eax", DWORD),
# ("Ebp", DWORD),
# ("Eip", DWORD),
# ("SegCs", DWORD),
# ("EFlags", DWORD),
# ("Esp", DWORD),
# ("SegSs", DWORD),
# ("ExtendedRegisters", BYTE * 512),
# ]
DWORD64 = ctypes.c_ulonglong
ULONGLONG = ctypes.c_ulonglong
LONGLONG = ctypes.c_longlong
class M128A(ctypes.Structure):
_fields_ = [
("Low", ULONGLONG),
("High", LONGLONG)
]
class CONTEXT_UNION(ctypes.Union):
    """Floating-point / XMM save area embedded in the x64 CONTEXT.

    In the Windows x64 CONTEXT this union holds an XMM_SAVE_AREA32, which is
    512 bytes long.  The original single ``S`` member covered only 128 bytes,
    which shifted every field after the union and left the CONTEXT buffer
    shorter than the structure the kernel reads and writes.  ``S`` is kept
    for compatibility; ``FltSave`` gives the union its full size.
    """

    _fields_ = [
        ("S", DWORD * 32),
        # Opaque view of the whole 512-byte save area.
        ("FltSave", ctypes.c_ubyte * 512),
    ]
class CONTEXT(ctypes.Structure):
    """Thread context for 64-bit (AMD64) Windows.

    The debug registers Dr0-Dr3, Dr6 and Dr7 are 64 bits wide in the x64
    layout.  Declaring them as DWORD moved Rax..Rip 24 bytes too early, so
    register reads and writes (for example ``Rip -= 1``) touched the wrong
    memory.  The Windows structure is 1232 bytes long; this declaration
    reaches that size only when CONTEXT_UNION spans 512 bytes.
    """

    _fields_ = [
        # Home space for the first register parameters.
        ("P1Home", DWORD64),
        ("P2Home", DWORD64),
        ("P3Home", DWORD64),
        ("P4Home", DWORD64),
        ("P5Home", DWORD64),
        ("P6Home", DWORD64),
        # Control flags.
        ("ContextFlags", DWORD),
        ("MxCsr", DWORD),
        # Segment registers and processor flags.
        ("SegCs", WORD),
        ("SegDs", WORD),
        ("SegEs", WORD),
        ("SegFs", WORD),
        ("SegGs", WORD),
        ("SegSs", WORD),
        ("EFlags", DWORD),
        # Debug registers (64-bit on x64).
        ("Dr0", DWORD64),
        ("Dr1", DWORD64),
        ("Dr2", DWORD64),
        ("Dr3", DWORD64),
        ("Dr6", DWORD64),
        ("Dr7", DWORD64),
        # Integer registers.
        ("Rax", DWORD64),
        ("Rcx", DWORD64),
        ("Rdx", DWORD64),
        ("Rbx", DWORD64),
        ("Rsp", DWORD64),
        ("Rbp", DWORD64),
        ("Rsi", DWORD64),
        ("Rdi", DWORD64),
        ("R8", DWORD64),
        ("R9", DWORD64),
        ("R10", DWORD64),
        ("R11", DWORD64),
        ("R12", DWORD64),
        ("R13", DWORD64),
        ("R14", DWORD64),
        ("R15", DWORD64),
        # Program counter.
        ("Rip", DWORD64),
        # Floating-point state and vector registers.
        ("DUMMYUNIONNAME", CONTEXT_UNION),
        ("VectorRegister", M128A * 26),
        ("VectorControl", DWORD64),
        # Special debug control registers.
        ("DebugControl", DWORD64),
        ("LastBranchToRip", DWORD64),
        ("LastBranchFromRip", DWORD64),
        ("LastExceptionToRip", DWORD64),
        ("LastExceptionFromRip", DWORD64),
    ]


LPCONTEXT = ctypes.POINTER(CONTEXT)
- my_debugger.py
import ctypes
import my_debugger_defines
kernel32 = ctypes.windll.kernel32
class debugger():
    """Small Win32 debugger driven through kernel32 with ctypes.

    It can launch or attach to a process, pump debug events and manage
    software (INT3) breakpoints.  Progress is reported with ``print``.
    """

    def __init__(self):
        """Create empty debugger state and declare the kernel32 prototypes."""
        self.h_process = None
        self.pid = None
        self.debugger_active = False
        # State of the debug event currently being handled.
        self.h_thread = None
        self.context = None
        self.exception = None
        self.exception_address = None
        # Software breakpoints: address -> original byte replaced by INT3.
        self.breakpoints = {}
        self.first_breakpoint = True
        self._declare_prototypes()

    @staticmethod
    def _declare_prototypes():
        """Declare types for kernel32 calls that carry handles, pointers or SIZE_T.

        Without these, ctypes passes and returns such values as 32-bit C
        ints, which truncates them in a 64-bit process.  BOOL results are
        declared as ``c_int`` (BOOL is a 4-byte int).  The declarations are
        stored on the shared ``kernel32`` function objects.
        """
        handle = ctypes.c_void_p
        pointer = ctypes.c_void_p
        size_t = ctypes.c_size_t
        win_bool = ctypes.c_int
        dword = my_debugger_defines.DWORD
        context_ptr = my_debugger_defines.LPCONTEXT
        entry_ptr = ctypes.POINTER(my_debugger_defines.THREADENTRY32)
        prototypes = {
            "OpenProcess": (handle, [dword, win_bool, dword]),
            "OpenThread": (handle, [dword, win_bool, dword]),
            "CloseHandle": (win_bool, [handle]),
            "CreateToolhelp32Snapshot": (handle, [dword, dword]),
            "Thread32First": (win_bool, [handle, entry_ptr]),
            "Thread32Next": (win_bool, [handle, entry_ptr]),
            "GetThreadContext": (win_bool, [handle, context_ptr]),
            "SetThreadContext": (win_bool, [handle, context_ptr]),
            "ReadProcessMemory": (
                win_bool, [handle, pointer, pointer, size_t, ctypes.POINTER(size_t)]),
            "WriteProcessMemory": (
                win_bool, [handle, pointer, pointer, size_t, ctypes.POINTER(size_t)]),
            "VirtualProtectEx": (
                win_bool, [handle, pointer, size_t, dword, ctypes.POINTER(dword)]),
            "GetModuleHandleW": (handle, [ctypes.c_wchar_p]),
            "GetProcAddress": (pointer, [handle, ctypes.c_char_p]),
        }
        for name, (restype, argtypes) in prototypes.items():
            function = getattr(kernel32, name)
            function.restype = restype
            function.argtypes = argtypes

    def load(self, path_to_exe):
        """Launch ``path_to_exe`` under the debugger.

        On success the PID and a process handle are stored and the debugger
        is marked active so that ``run()`` pumps events.  On failure the
        error code is printed.
        """
        creation_flags = my_debugger_defines.DEBUG_PROCESS
        startupinfo = my_debugger_defines.STARTUPINFO()
        process_information = my_debugger_defines.PROCESS_INFORMATION()
        # Ask for wShowWindow to be honoured (dwFlags 0x1) and hide the window.
        startupinfo.dwFlags = 0x1
        startupinfo.wShowWindow = 0x0
        startupinfo.cb = ctypes.sizeof(startupinfo)
        created = kernel32.CreateProcessW(path_to_exe,
                                          None,
                                          None,
                                          None,
                                          None,
                                          creation_flags,
                                          None,
                                          None,
                                          ctypes.byref(startupinfo),
                                          ctypes.byref(process_information))
        if not created:
            print("[*] Error: 0x%08x." % kernel32.GetLastError())
            return
        print("[*] We have successfully lanuched the process!")
        print("[*] PID: %d" % process_information.dwProcessId)
        # Keep our own handle to the new process for later memory access.
        self.pid = process_information.dwProcessId
        self.h_process = self.open_process(self.pid)
        self.debugger_active = True
        # The handles returned by CreateProcessW are not used; release them.
        for unused in (process_information.hThread, process_information.hProcess):
            if unused:
                kernel32.CloseHandle(unused)

    def open_process(self, pid):
        """Open process ``pid`` with PROCESS_ALL_ACCESS and return the handle.

        Returns a falsy value when the process cannot be opened.
        """
        return kernel32.OpenProcess(my_debugger_defines.PROCESS_ALL_ACCESS, False, pid)

    def attach(self, pid):
        """Attach the debugger to the running process ``pid``."""
        pid = int(pid)
        self.h_process = self.open_process(pid)
        if kernel32.DebugActiveProcess(pid):
            self.debugger_active = True
            self.pid = pid
        else:
            print("[*] Unable to attach to the process.")

    def run(self):
        """Pump debug events for as long as the debugger is active."""
        while self.debugger_active:
            self.get_debug_event()

    def get_debug_event(self):
        """Wait for one debug event, dispatch it and resume the debuggee."""
        debug_event = my_debugger_defines.DEBUG_EVENT()
        continue_status = my_debugger_defines.DBG_CONTINUE
        # WaitForDebugEvent fills in debug_event once an event arrives.
        if not kernel32.WaitForDebugEvent(ctypes.byref(debug_event),
                                          my_debugger_defines.INFINITE):
            return
        # The handle stays open for the handlers and is closed below.
        self.h_thread = self.open_thread(debug_event.dwThreadId)
        self.context = self.get_thread_context(h_thread=self.h_thread)
        print("Event Code: %d Thread ID: %d" % (debug_event.dwDebugEventCode, debug_event.dwThreadId))
        if debug_event.dwDebugEventCode == my_debugger_defines.EXCEPTION_DEBUG_EVENT:
            record = debug_event.u.Exception.ExceptionRecord
            exception = record.ExceptionCode
            self.exception = exception
            self.exception_address = record.ExceptionAddress
            if exception == my_debugger_defines.EXCEPTION_ACCESS_VIOLATION:
                print("Acess Violation Detected.")
            elif exception == my_debugger_defines.EXCEPTION_BREAKPOINT:
                continue_status = self.exception_handler_breakpoint()
            elif exception == my_debugger_defines.EXCEPTION_GUARD_PAGE:
                # Memory breakpoints surface as guard-page exceptions.
                print("Guard Page Access Detected.")
            elif exception == my_debugger_defines.EXCEPTION_SINGLE_STEP:
                # Hardware breakpoints surface as single-step exceptions.
                print("Single Stepping.")
        # Let the debuggee continue with the chosen status.
        kernel32.ContinueDebugEvent(
            debug_event.dwProcessId,
            debug_event.dwThreadId,
            continue_status
        )
        if self.h_thread:
            kernel32.CloseHandle(self.h_thread)
        self.h_thread = None

    def exception_handler_breakpoint(self):
        """Handle a breakpoint exception and return the continue status.

        For a breakpoint registered in ``self.breakpoints`` the original byte
        is written back and Rip is moved back by one so the restored
        instruction executes.  The entry stays registered, so the breakpoint
        is not re-armed afterwards.
        """
        print("[*] Exception address: 0x%08x" % self.exception_address)
        continue_status = my_debugger_defines.DBG_CONTINUE
        if self.exception_address not in self.breakpoints:
            # The first breakpoint is raised by Windows itself; just go on.
            if self.first_breakpoint:
                self.first_breakpoint = False
                print("[*] Hit the first breakpoint.")
            return continue_status
        print("[*] Hit user defined breakpoint.")
        # Put back the byte that INT3 replaced.
        self.write_process_memory(self.exception_address, self.breakpoints[self.exception_address])
        # Re-read the context and rewind Rip onto the restored byte.
        self.context = self.get_thread_context(h_thread=self.h_thread)
        if self.context:
            self.context.Rip -= 1
            kernel32.SetThreadContext(self.h_thread, ctypes.byref(self.context))
            self.debugger_active = True
        else:
            # Without a context the thread cannot be rewound; stop debugging.
            self.debugger_active = False
        return continue_status

    def detach(self):
        """Stop debugging ``self.pid``; return True on success, else False."""
        if kernel32.DebugActiveProcessStop(self.pid):
            print("[*] Finished debugging. Exiting...")
            return True
        print("There was an error")
        return False

    def open_thread(self, thread_id):
        """Open thread ``thread_id`` with THREAD_ALL_ACCESS.

        Returns the handle, or False (after printing a message) when
        OpenThread fails.  The caller owns the handle and must close it.
        """
        h_thread = kernel32.OpenThread(my_debugger_defines.THREAD_ALL_ACCESS, False, thread_id)
        if h_thread:
            return h_thread
        print("[*] Could not obtain a valid thread handle.")
        return False

    def enumerate_threads(self):
        """Return the IDs of all threads owned by ``self.pid``.

        Returns False when the thread snapshot cannot be created.
        """
        # A thread snapshot covers the whole system, so every entry is
        # compared against our PID below.
        snapshot = kernel32.CreateToolhelp32Snapshot(
            my_debugger_defines.TH32CS_SNAPTHREAD, self.pid or 0)
        # Failure is reported as INVALID_HANDLE_VALUE (-1), not NULL.
        if not snapshot or snapshot == ctypes.c_void_p(-1).value:
            return False
        thread_entry = my_debugger_defines.THREADENTRY32()
        thread_entry.dwSize = ctypes.sizeof(thread_entry)
        thread_list = []
        try:
            success = kernel32.Thread32First(snapshot, ctypes.byref(thread_entry))
            while success:
                if thread_entry.th32OwnerProcessID == self.pid:
                    thread_list.append(thread_entry.th32ThreadID)
                success = kernel32.Thread32Next(snapshot, ctypes.byref(thread_entry))
        finally:
            kernel32.CloseHandle(snapshot)
        return thread_list

    def get_thread_context(self, thread_id=None, h_thread=None):
        """Return the CONTEXT of a thread, or False when it cannot be read.

        Pass ``h_thread`` to use an already open handle, which is left open
        for the caller.  Otherwise the thread is opened from ``thread_id``
        and that temporary handle is closed before returning.
        """
        context = my_debugger_defines.CONTEXT()
        context.ContextFlags = my_debugger_defines.CONTEXT_FULL | my_debugger_defines.CONTEXT_DEBUG_REGISTERS
        opened_here = False
        if h_thread is None:
            if thread_id is None:
                return False
            h_thread = self.open_thread(thread_id)
            opened_here = True
        if not h_thread:
            return False
        try:
            succeeded = kernel32.GetThreadContext(h_thread, ctypes.byref(context))
        finally:
            if opened_here:
                kernel32.CloseHandle(h_thread)
        return context if succeeded else False

    def read_process_memory(self, address, length):
        """Read ``length`` bytes at ``address`` in the debuggee.

        Returns the bytes read, or False when ReadProcessMemory fails.
        """
        read_buf = ctypes.create_string_buffer(length)
        count = ctypes.c_size_t(0)
        if not kernel32.ReadProcessMemory(self.h_process, address, read_buf, length, ctypes.byref(count)):
            return False
        return read_buf.raw

    def write_process_memory(self, address, data):
        """Write the bytes ``data`` at ``address``; return True on success."""
        count = ctypes.c_size_t(0)
        c_data = ctypes.c_char_p(data)
        return bool(kernel32.WriteProcessMemory(
            self.h_process, address, c_data, len(data), ctypes.byref(count)))

    def bp_set(self, address):
        """Place a software breakpoint (INT3) at ``address``.

        Returns True when the breakpoint is in place (including when it was
        already registered) and False when the memory could not be read or
        written.
        """
        print("[*] Setting breakpoint at: 0x%08x" % address)
        if address in self.breakpoints:
            return True
        # Make the page writable before patching it.
        old_protect = my_debugger_defines.DWORD(0)
        kernel32.VirtualProtectEx(self.h_process, address, 1,
                                  my_debugger_defines.PAGE_EXECUTE_READWRITE,
                                  ctypes.byref(old_protect))
        # Save the original byte so it can be restored when the breakpoint hits.
        original_byte = self.read_process_memory(address, 1)
        if original_byte is False:
            return False
        if not self.write_process_memory(address, b"\xCC"):
            return False
        self.breakpoints[address] = original_byte
        return True

    def func_resolve(self, dll, function):
        """Return the address of ``function`` exported by ``dll``, or None.

        The lookup runs in the debugger's own process with GetModuleHandleW
        and GetProcAddress, so ``dll`` must already be loaded here.
        NOTE(review): the address is assumed to be valid in the debuggee as
        well; confirm for the DLL in question.  ``function`` may be bytes or
        an ASCII str.
        """
        if isinstance(function, str):
            function = function.encode("ascii")
        handle = kernel32.GetModuleHandleW(dll)
        if not handle:
            return None
        # A module handle is not a kernel object handle: it must not be
        # passed to CloseHandle.
        return kernel32.GetProcAddress(handle, function)
- my_test.py
import my_debugger
# Attach to a running process chosen by the user, break on printf, then detach.
dbg = my_debugger.debugger()
target_pid = int(input("Enter the PID of the process to attach to:"))
dbg.attach(target_pid)

# Resolve printf inside msvcrt.dll and place a software breakpoint on it.
printf_address = dbg.func_resolve("msvcrt.dll", b"printf")
print("[*] Address of printf: 0x%08x" % printf_address)
dbg.bp_set(printf_address)

# Pump debug events until the debugger deactivates itself.
dbg.run()
dbg.detach()
- printf_loop.py
这个是被调试程序
import ctypes
import time
# Debuggee: call msvcrt's printf every two seconds with a growing counter.
c_runtime = ctypes.cdll.msvcrt
iteration = 0
while True:
    c_runtime.printf(b"Loop iteration %d!\n", iteration)
    time.sleep(2)
    iteration = iteration + 1
运行结果正好符合预期
Event Code: 6 Thread ID: 18744
Event Code: 6 Thread ID: 18744
Event Code: 6 Thread ID: 18744
Event Code: 2 Thread ID: 18880
Event Code: 1 Thread ID: 18880
[*] Exception address: 0x7ffe68e10b10
[*] Hit the first breakpoint.
Event Code: 4 Thread ID: 18880
Event Code: 1 Thread ID: 18744
[*] Exception address: 0x7ffe68b88b50
[*] Hit user defined breakpoint.
[*] Finished debugging. Exiting...
硬件断点
这里修改了 my_debugger_defines.py 中的 CONTEXT 结构体:之前把 Dr0~Dr7 定义成了 32 位的 DWORD,在软断点的例子里没有暴露问题,但设置硬件断点时会出错,所以改为 64 位的 DWORD64。
- my_debugger_defines.py
import ctypes
# Win32 type aliases.
BYTE = ctypes.c_ubyte
WORD = ctypes.c_ushort
DWORD = ctypes.c_ulong
LPBYTE = ctypes.POINTER(ctypes.c_ubyte)
LPTSTR = ctypes.POINTER(ctypes.c_wchar)
LPCSTR = ctypes.POINTER(ctypes.c_char)
HANDLE = ctypes.c_void_p
PVOID = ctypes.c_void_p
# Pointer-sized unsigned integer.  c_ulong is only 32 bits on 64-bit
# Windows, which made EXCEPTION_RECORD (and therefore DEBUG_EVENT) smaller
# than the structure WaitForDebugEvent writes.
UINT_PTR = ctypes.c_size_t

# Process creation flags.
DEBUG_PROCESS = 0x00000001
CREATE_NEW_CONSOLE = 0x00000010

# Access rights.
PROCESS_ALL_ACCESS = 0x001F0FFF
THREAD_ALL_ACCESS = 0x001F03FF

# Continue statuses for ContinueDebugEvent().
DBG_CONTINUE = 0x00010002
DBG_EXCEPTION_NOT_HANDLED = 0x80010001

# Wait without a timeout (previously this held DBG_CONTINUE's value, which
# is a timeout of about 65 seconds).
INFINITE = 0xFFFFFFFF

# snapshot
TH32CS_SNAPTHREAD = 0x00000004

# Context flags for GetThreadContext().  The CONTEXT structure in this file
# is the AMD64 layout, so the AMD64 flag values apply:
# CONTEXT_AMD64 (0x00100000) | CONTROL (0x1) | INTEGER (0x2) | FLOATING_POINT (0x8)
CONTEXT_FULL = 0x0010000B
# CONTEXT_AMD64 (0x00100000) | DEBUG_REGISTERS (0x10)
CONTEXT_DEBUG_REGISTERS = 0x00100010

# Debug event constants
EXCEPTION_DEBUG_EVENT = 0x1

# debug exception codes.
EXCEPTION_ACCESS_VIOLATION = 0xC0000005
EXCEPTION_BREAKPOINT = 0x80000003
EXCEPTION_GUARD_PAGE = 0x80000001
EXCEPTION_SINGLE_STEP = 0x80000004

# Memory page permissions, used by VirtualProtect()
PAGE_EXECUTE_READWRITE = 0x00000040

# Hardware breakpoint conditions
HW_ACCESS = 0x00000003
HW_EXECUTE = 0x00000000
HW_WRITE = 0x00000001
# 定义CreateProcessA需要的结构体
# typedef struct _STARTUPINFO {
# DWORD cb;
# LPTSTR lpReserved;
# LPTSTR lpDesktop;
# LPTSTR lpTitle;
# DWORD dwX;
# DWORD dwY;
# DWORD dwXSize;
# DWORD dwYSize;
# DWORD dwXCountChars;
# DWORD dwYCountChars;
# DWORD dwFillAttribute;
# DWORD dwFlags;
# WORD wShowWindow;
# WORD cbReserved2;
# LPBYTE lpReserved2;
# HANDLE hStdInput;
# HANDLE hStdOutput;
# HANDLE hStdError;
# } STARTUPINFO, *LPSTARTUPINFO;
class STARTUPINFO(ctypes.Structure):
    """Start-up parameters handed to CreateProcessW.

    ``cb`` must hold ``ctypes.sizeof`` of the structure.  The field names
    (including the spellings ``dwXcountChars`` and ``hStdIput``) are kept
    exactly as callers may already use them.
    """

    _fields_ = [
        ("cb", DWORD),
        ("lpReserved", LPTSTR),
        ("lpDesktop", LPTSTR),
        ("lpTitle", LPTSTR),
        # Window position and size.
        ("dwX", DWORD),
        ("dwY", DWORD),
        ("dwXSize", DWORD),
        ("dwYSize", DWORD),
        ("dwXcountChars", DWORD),
        ("dwYcountChars", DWORD),
        ("dwFillAttribute", DWORD),
        # dwFlags selects which of the other members are honoured.
        ("dwFlags", DWORD),
        ("wShowWindow", WORD),
        ("cbReserved2", WORD),
        ("lpReserved2", LPBYTE),
        # Standard handles.
        ("hStdIput", HANDLE),
        ("hStdOutput", HANDLE),
        ("hStdError", HANDLE),
    ]
# typedef struct _PROCESS_INFORMATION {
# HANDLE hProcess;
# HANDLE hThread;
# DWORD dwProcessId;
# DWORD dwThreadId;
# } PROCESS_INFORMATION, *LPPROCESS_INFORMATION;
class PROCESS_INFORMATION(ctypes.Structure):
    """Handles and IDs of the new process, filled in by CreateProcessW."""

    _fields_ = [
        # Handles to the new process and its primary thread.
        ("hProcess", HANDLE),
        ("hThread", HANDLE),
        # Numeric identifiers of the same process and thread.
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
    ]
class EXCEPTION_RECORD(ctypes.Structure):
    """Description of one exception; may point to a nested record."""


# The fields are assigned after the class statement because the structure
# contains a pointer to its own type.
EXCEPTION_RECORD._fields_ = [
    ("ExceptionCode", DWORD),
    ("ExceptionFlags", DWORD),
    ("ExceptionRecord", ctypes.POINTER(EXCEPTION_RECORD)),
    ("ExceptionAddress", PVOID),
    ("NumberParameters", DWORD),
    ("ExceptionInformation", UINT_PTR * 15),
]
class EXCEPTION_DEBUG_INFO(ctypes.Structure):
    """Exception payload stored in the ``Exception`` member of DEBUG_EVENT_UNION."""

    _fields_ = [
        # Full record describing the exception (code, address, parameters).
        ("ExceptionRecord", EXCEPTION_RECORD),
        # First-chance indicator reported with the event.
        ("dwFirstChance", DWORD),
    ]
class DEBUG_EVENT_UNION(ctypes.Union):
    """Event-specific payload of a DEBUG_EVENT.

    Only the exception member is declared; the other members are left
    commented out.
    """

    _fields_ = [
        ("Exception", EXCEPTION_DEBUG_INFO),
        # ("CreateThread", CREATE_THREAD_DEBUG_INFO),
        # ("CreateProcessInfo", CREATE_PROCESS_DEBUG_INFO),
        # ("ExitThread", EXIT_THREAD_DEBUG_INFO),
        # ("ExitProcess", EXIT_PROCESS_DEBUG_INFO),
        # ("LoadDll", LOAD_DLL_DEBUG_INFO),
        # ("UnloadDll", UNLOAD_DLL_DEBUG_INFO),
        # ("DebugString", OUTPUT_DEBUG_STRING_INFO),
        # ("RipInfo", RIP_INFO),
    ]
class DEBUG_EVENT(ctypes.Structure):
    """Record that the debugger passes to WaitForDebugEvent to be filled in."""

    _fields_ = [
        # Kind of debugging event that occurred.
        ("dwDebugEventCode", DWORD),
        # Process and thread the event belongs to.
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
        # Event-specific payload.
        ("u", DEBUG_EVENT_UNION),
    ]
class THREADENTRY32(ctypes.Structure):
    """Thread description filled in by Thread32First / Thread32Next.

    ``dwSize`` must be set to ``ctypes.sizeof(THREADENTRY32)`` before the
    first call.  The two priority members are LONG (signed) in the Win32
    header, so they are declared with ``ctypes.c_long``; the size and layout
    of the structure are unchanged.
    """

    _fields_ = [
        ("dwSize", DWORD),
        ("cntUsage", DWORD),
        ("th32ThreadID", DWORD),
        ("th32OwnerProcessID", DWORD),
        ("tpBasePri", ctypes.c_long),
        ("tpDeltaPri", ctypes.c_long),
        ("dwFlags", DWORD),
    ]
class FLOATING_SAVE_AREA(ctypes.Structure):
    """Floating-point save area used by the (commented-out) 32-bit CONTEXT layout."""

    _fields_ = [
        ("ControlWord", DWORD),
        ("StatusWord", DWORD),
        ("TagWord", DWORD),
        ("ErrorOffset", DWORD),
        ("ErrorSelector", DWORD),
        ("DataOffset", DWORD),
        ("DataSelector", DWORD),
        # Raw register bytes.
        ("RegisterArea", BYTE * 80),
        ("Cr0NpxState", DWORD),
    ]
# class CONTEXT(ctypes.Structure):
# _fields_ = [
# ("ContextFlags", DWORD),
# ("Dr0", DWORD),
# ("Dr1", DWORD),
# ("Dr2", DWORD),
# ("Dr3", DWORD),
# ("Dr6", DWORD),
# ("Dr7", DWORD),
# ("FloatSave", FLOATING_SAVE_AREA),
# ("SegGs", DWORD),
# ("SegFs", DWORD),
# ("SegEs", DWORD),
# ("SegDs", DWORD),
# ("Edi", DWORD),
# ("Esi", DWORD),
# ("Ebx", DWORD),
# ("Edx", DWORD),
# ("Ecx", DWORD),
# ("Eax", DWORD),
# ("Ebp", DWORD),
# ("Eip", DWORD),
# ("SegCs", DWORD),
# ("EFlags", DWORD),
# ("Esp", DWORD),
# ("SegSs", DWORD),
# ("ExtendedRegisters", BYTE * 512),
# ]
# 64-bit integer aliases used by the x64 CONTEXT layout.
DWORD64 = ULONGLONG = ctypes.c_ulonglong
LONGLONG = ctypes.c_longlong


class M128A(ctypes.Structure):
    """128-bit value stored as an unsigned low half and a signed high half."""

    _fields_ = [
        ("Low", ULONGLONG),
        ("High", LONGLONG),
    ]
class CONTEXT_UNION(ctypes.Union):
    """Floating-point / XMM save area embedded in the x64 CONTEXT.

    In the Windows x64 CONTEXT this union holds an XMM_SAVE_AREA32, which is
    512 bytes long.  The original single ``S`` member covered only 128 bytes,
    which shifted every field after the union and left the CONTEXT buffer
    shorter than the structure the kernel reads and writes.  ``S`` is kept
    for compatibility; ``FltSave`` gives the union its full size.
    """

    _fields_ = [
        ("S", DWORD * 32),
        # Opaque view of the whole 512-byte save area.
        ("FltSave", ctypes.c_ubyte * 512),
    ]
class CONTEXT(ctypes.Structure):
    """Thread context for 64-bit (AMD64) Windows.

    The field list is assembled group by group; the resulting order is the
    layout order of the structure.
    """

    _fields_ = (
        # Home space for the first register parameters.
        [("P%dHome" % index, DWORD64) for index in range(1, 7)]
        # Control flags.
        + [("ContextFlags", DWORD), ("MxCsr", DWORD)]
        # Segment registers and processor flags.
        + [(name, WORD) for name in (
            "SegCs", "SegDs", "SegEs", "SegFs", "SegGs", "SegSs")]
        + [("EFlags", DWORD)]
        # Debug registers, integer registers and the program counter.
        + [(name, DWORD64) for name in (
            "Dr0", "Dr1", "Dr2", "Dr3", "Dr6", "Dr7",
            "Rax", "Rcx", "Rdx", "Rbx", "Rsp", "Rbp", "Rsi", "Rdi",
            "R8", "R9", "R10", "R11", "R12", "R13", "R14", "R15",
            "Rip")]
        # Floating-point state and vector registers.
        + [("DUMMYUNIONNAME", CONTEXT_UNION), ("VectorRegister", M128A * 26)]
        # Vector control and special debug control registers.
        + [(name, DWORD64) for name in (
            "VectorControl", "DebugControl",
            "LastBranchToRip", "LastBranchFromRip",
            "LastExceptionToRip", "LastExceptionFromRip")]
    )


LPCONTEXT = ctypes.POINTER(CONTEXT)
- my_debugger.py
import ctypes
import my_debugger_defines
kernel32 = ctypes.windll.kernel32
class debugger():
    """Small Win32 debugger driven through kernel32 with ctypes.

    It can launch or attach to a process, pump debug events and manage
    software (INT3) and hardware (debug-register) breakpoints.  Progress is
    reported with ``print``.
    """

    # CONTEXT fields that hold the four hardware breakpoint addresses.
    _ADDRESS_REGISTERS = ("Dr0", "Dr1", "Dr2", "Dr3")

    def __init__(self):
        """Create empty debugger state and declare the kernel32 prototypes."""
        self.h_process = None
        self.pid = None
        self.debugger_active = False
        # State of the debug event currently being handled.
        self.h_thread = None
        self.context = None
        self.exception = None
        self.exception_address = None
        # Software breakpoints: address -> original byte replaced by INT3.
        self.breakpoints = {}
        self.first_breakpoint = True
        # Hardware breakpoints: slot (0-3) -> (address, encoded length, condition).
        self.hardware_breakpoints = {}
        self._declare_prototypes()

    @staticmethod
    def _declare_prototypes():
        """Declare types for kernel32 calls that carry handles, pointers or SIZE_T.

        Without these, ctypes passes and returns such values as 32-bit C
        ints, which truncates them in a 64-bit process.  BOOL results are
        declared as ``c_int`` (BOOL is a 4-byte int).  The declarations are
        stored on the shared ``kernel32`` function objects.
        """
        handle = ctypes.c_void_p
        pointer = ctypes.c_void_p
        size_t = ctypes.c_size_t
        win_bool = ctypes.c_int
        dword = my_debugger_defines.DWORD
        context_ptr = my_debugger_defines.LPCONTEXT
        entry_ptr = ctypes.POINTER(my_debugger_defines.THREADENTRY32)
        prototypes = {
            "OpenProcess": (handle, [dword, win_bool, dword]),
            "OpenThread": (handle, [dword, win_bool, dword]),
            "CloseHandle": (win_bool, [handle]),
            "CreateToolhelp32Snapshot": (handle, [dword, dword]),
            "Thread32First": (win_bool, [handle, entry_ptr]),
            "Thread32Next": (win_bool, [handle, entry_ptr]),
            "GetThreadContext": (win_bool, [handle, context_ptr]),
            "SetThreadContext": (win_bool, [handle, context_ptr]),
            "ReadProcessMemory": (
                win_bool, [handle, pointer, pointer, size_t, ctypes.POINTER(size_t)]),
            "WriteProcessMemory": (
                win_bool, [handle, pointer, pointer, size_t, ctypes.POINTER(size_t)]),
            "VirtualProtectEx": (
                win_bool, [handle, pointer, size_t, dword, ctypes.POINTER(dword)]),
            "GetModuleHandleW": (handle, [ctypes.c_wchar_p]),
            "GetProcAddress": (pointer, [handle, ctypes.c_char_p]),
        }
        for name, (restype, argtypes) in prototypes.items():
            function = getattr(kernel32, name)
            function.restype = restype
            function.argtypes = argtypes

    def load(self, path_to_exe):
        """Launch ``path_to_exe`` under the debugger.

        On success the PID and a process handle are stored and the debugger
        is marked active so that ``run()`` pumps events.  On failure the
        error code is printed.
        """
        creation_flags = my_debugger_defines.DEBUG_PROCESS
        startupinfo = my_debugger_defines.STARTUPINFO()
        process_information = my_debugger_defines.PROCESS_INFORMATION()
        # Ask for wShowWindow to be honoured (dwFlags 0x1) and hide the window.
        startupinfo.dwFlags = 0x1
        startupinfo.wShowWindow = 0x0
        startupinfo.cb = ctypes.sizeof(startupinfo)
        created = kernel32.CreateProcessW(path_to_exe,
                                          None,
                                          None,
                                          None,
                                          None,
                                          creation_flags,
                                          None,
                                          None,
                                          ctypes.byref(startupinfo),
                                          ctypes.byref(process_information))
        if not created:
            print("[*] Error: 0x%08x." % kernel32.GetLastError())
            return
        print("[*] We have successfully lanuched the process!")
        print("[*] PID: %d" % process_information.dwProcessId)
        # Keep our own handle to the new process for later memory access.
        self.pid = process_information.dwProcessId
        self.h_process = self.open_process(self.pid)
        self.debugger_active = True
        # The handles returned by CreateProcessW are not used; release them.
        for unused in (process_information.hThread, process_information.hProcess):
            if unused:
                kernel32.CloseHandle(unused)

    def open_process(self, pid):
        """Open process ``pid`` with PROCESS_ALL_ACCESS and return the handle.

        Returns a falsy value when the process cannot be opened.
        """
        return kernel32.OpenProcess(my_debugger_defines.PROCESS_ALL_ACCESS, False, pid)

    def attach(self, pid):
        """Attach the debugger to the running process ``pid``."""
        pid = int(pid)
        self.h_process = self.open_process(pid)
        if kernel32.DebugActiveProcess(pid):
            self.debugger_active = True
            self.pid = pid
        else:
            print("[*] Unable to attach to the process.")

    def run(self):
        """Pump debug events for as long as the debugger is active."""
        while self.debugger_active:
            self.get_debug_event()

    def get_debug_event(self):
        """Wait for one debug event, dispatch it and resume the debuggee."""
        debug_event = my_debugger_defines.DEBUG_EVENT()
        continue_status = my_debugger_defines.DBG_CONTINUE
        # WaitForDebugEvent fills in debug_event once an event arrives.
        if not kernel32.WaitForDebugEvent(ctypes.byref(debug_event),
                                          my_debugger_defines.INFINITE):
            return
        # The handle stays open for the handlers and is closed below.
        self.h_thread = self.open_thread(debug_event.dwThreadId)
        self.context = self.get_thread_context(h_thread=self.h_thread)
        print("Event Code: %d Thread ID: %d" % (debug_event.dwDebugEventCode, debug_event.dwThreadId))
        if debug_event.dwDebugEventCode == my_debugger_defines.EXCEPTION_DEBUG_EVENT:
            record = debug_event.u.Exception.ExceptionRecord
            exception = record.ExceptionCode
            self.exception = exception
            self.exception_address = record.ExceptionAddress
            if exception == my_debugger_defines.EXCEPTION_ACCESS_VIOLATION:
                print("Acess Violation Detected.")
            elif exception == my_debugger_defines.EXCEPTION_BREAKPOINT:
                continue_status = self.exception_handler_breakpoint()
            elif exception == my_debugger_defines.EXCEPTION_GUARD_PAGE:
                # Memory breakpoints surface as guard-page exceptions.
                print("Guard Page Access Detected.")
            elif exception == my_debugger_defines.EXCEPTION_SINGLE_STEP:
                # Hardware breakpoints surface as single-step exceptions;
                # the handler decides how the debuggee is resumed.
                print("Single Stepping.")
                continue_status = self.exception_handler_single_step()
        # Let the debuggee continue with the chosen status.
        kernel32.ContinueDebugEvent(
            debug_event.dwProcessId,
            debug_event.dwThreadId,
            continue_status
        )
        if self.h_thread:
            kernel32.CloseHandle(self.h_thread)
        self.h_thread = None

    def exception_handler_breakpoint(self):
        """Handle a breakpoint exception and return the continue status.

        For a breakpoint registered in ``self.breakpoints`` the original byte
        is written back and Rip is moved back by one so the restored
        instruction executes.  The entry stays registered, so the breakpoint
        is not re-armed afterwards.
        """
        print("[*] Exception address: 0x%08x" % self.exception_address)
        continue_status = my_debugger_defines.DBG_CONTINUE
        if self.exception_address not in self.breakpoints:
            # The first breakpoint is raised by Windows itself; just go on.
            if self.first_breakpoint:
                self.first_breakpoint = False
                print("[*] Hit the first breakpoint.")
            return continue_status
        print("[*] Hit user defined breakpoint.")
        # Put back the byte that INT3 replaced.
        self.write_process_memory(self.exception_address, self.breakpoints[self.exception_address])
        # Re-read the context and rewind Rip onto the restored byte.
        self.context = self.get_thread_context(h_thread=self.h_thread)
        if self.context:
            self.context.Rip -= 1
            kernel32.SetThreadContext(self.h_thread, ctypes.byref(self.context))
            self.debugger_active = True
        else:
            # Without a context the thread cannot be rewound; stop debugging.
            self.debugger_active = False
        return continue_status

    def exception_handler_single_step(self):
        """Handle a single-step exception and return the continue status.

        The low four bits of Dr6 tell which hardware breakpoint slot fired.
        When one of our registered slots fired, the breakpoint is removed and
        DBG_CONTINUE is returned; otherwise the exception is reported as not
        handled.
        """
        slot = None
        if self.context:
            for candidate in range(4):
                fired = self.context.Dr6 & (1 << candidate)
                if fired and candidate in self.hardware_breakpoints:
                    slot = candidate
                    break
        continue_status = my_debugger_defines.DBG_EXCEPTION_NOT_HANDLED
        if slot is None:
            # Not caused by one of our hardware breakpoints.
            return continue_status
        if self.bp_del_hw(slot):
            continue_status = my_debugger_defines.DBG_CONTINUE
            print("[*] Hardware breakpoint removed.")
        return continue_status

    def _update_thread_contexts(self, mutate):
        """Apply ``mutate(context)`` to every debuggee thread and commit it.

        Threads that cannot be opened or whose context cannot be read are
        skipped.  Each thread handle is closed again.
        """
        for thread_id in self.enumerate_threads() or []:
            h_thread = self.open_thread(thread_id)
            if not h_thread:
                continue
            try:
                context = self.get_thread_context(h_thread=h_thread)
                if context:
                    mutate(context)
                    kernel32.SetThreadContext(h_thread, ctypes.byref(context))
            finally:
                kernel32.CloseHandle(h_thread)

    def bp_del_hw(self, slot):
        """Remove the hardware breakpoint in ``slot`` (0-3) from all threads.

        Returns True once the slot has been cleared, or False when ``slot``
        is not a valid slot number.
        """
        if slot not in (0, 1, 2, 3):
            return False

        def disarm(context):
            # Clear the enable bit of the slot.
            context.Dr7 &= ~(1 << (slot * 2))
            # Zero the breakpoint address.
            setattr(context, self._ADDRESS_REGISTERS[slot], 0)
            # Clear the condition bits, then the length bits.
            context.Dr7 &= ~(3 << ((slot * 4) + 16))
            context.Dr7 &= ~(3 << ((slot * 4) + 18))

        self._update_thread_contexts(disarm)
        self.hardware_breakpoints.pop(slot, None)
        return True

    def detach(self):
        """Stop debugging ``self.pid``; return True on success, else False."""
        if kernel32.DebugActiveProcessStop(self.pid):
            print("[*] Finished debugging. Exiting...")
            return True
        print("There was an error")
        return False

    def open_thread(self, thread_id):
        """Open thread ``thread_id`` with THREAD_ALL_ACCESS.

        Returns the handle, or False (after printing a message) when
        OpenThread fails.  The caller owns the handle and must close it.
        """
        h_thread = kernel32.OpenThread(my_debugger_defines.THREAD_ALL_ACCESS, False, thread_id)
        if h_thread:
            return h_thread
        print("[*] Could not obtain a valid thread handle.")
        return False

    def enumerate_threads(self):
        """Return the IDs of all threads owned by ``self.pid``.

        Returns False when the thread snapshot cannot be created.
        """
        # A thread snapshot covers the whole system, so every entry is
        # compared against our PID below.
        snapshot = kernel32.CreateToolhelp32Snapshot(
            my_debugger_defines.TH32CS_SNAPTHREAD, self.pid or 0)
        # Failure is reported as INVALID_HANDLE_VALUE (-1), not NULL.
        if not snapshot or snapshot == ctypes.c_void_p(-1).value:
            return False
        thread_entry = my_debugger_defines.THREADENTRY32()
        thread_entry.dwSize = ctypes.sizeof(thread_entry)
        thread_list = []
        try:
            success = kernel32.Thread32First(snapshot, ctypes.byref(thread_entry))
            while success:
                if thread_entry.th32OwnerProcessID == self.pid:
                    thread_list.append(thread_entry.th32ThreadID)
                success = kernel32.Thread32Next(snapshot, ctypes.byref(thread_entry))
        finally:
            kernel32.CloseHandle(snapshot)
        return thread_list

    def get_thread_context(self, thread_id=None, h_thread=None):
        """Return the CONTEXT of a thread, or False when it cannot be read.

        Pass ``h_thread`` to use an already open handle, which is left open
        for the caller.  Otherwise the thread is opened from ``thread_id``
        and that temporary handle is closed before returning.
        """
        context = my_debugger_defines.CONTEXT()
        context.ContextFlags = my_debugger_defines.CONTEXT_FULL | my_debugger_defines.CONTEXT_DEBUG_REGISTERS
        opened_here = False
        if h_thread is None:
            if thread_id is None:
                return False
            h_thread = self.open_thread(thread_id)
            opened_here = True
        if not h_thread:
            return False
        try:
            succeeded = kernel32.GetThreadContext(h_thread, ctypes.byref(context))
        finally:
            if opened_here:
                kernel32.CloseHandle(h_thread)
        return context if succeeded else False

    def read_process_memory(self, address, length):
        """Read ``length`` bytes at ``address`` in the debuggee.

        Returns the bytes read, or False when ReadProcessMemory fails.
        """
        read_buf = ctypes.create_string_buffer(length)
        count = ctypes.c_size_t(0)
        if not kernel32.ReadProcessMemory(self.h_process, address, read_buf, length, ctypes.byref(count)):
            return False
        return read_buf.raw

    def write_process_memory(self, address, data):
        """Write the bytes ``data`` at ``address``; return True on success."""
        count = ctypes.c_size_t(0)
        c_data = ctypes.c_char_p(data)
        return bool(kernel32.WriteProcessMemory(
            self.h_process, address, c_data, len(data), ctypes.byref(count)))

    def bp_set(self, address):
        """Place a software breakpoint (INT3) at ``address``.

        Returns True when the breakpoint is in place (including when it was
        already registered) and False when the memory could not be read or
        written.
        """
        print("[*] Setting breakpoint at: 0x%08x" % address)
        if address in self.breakpoints:
            return True
        # Make the page writable before patching it.
        old_protect = my_debugger_defines.DWORD(0)
        kernel32.VirtualProtectEx(self.h_process, address, 1,
                                  my_debugger_defines.PAGE_EXECUTE_READWRITE,
                                  ctypes.byref(old_protect))
        # Save the original byte so it can be restored when the breakpoint hits.
        original_byte = self.read_process_memory(address, 1)
        if original_byte is False:
            return False
        if not self.write_process_memory(address, b"\xCC"):
            return False
        self.breakpoints[address] = original_byte
        return True

    def bp_set_hw(self, address, length, condition):
        """Set a hardware breakpoint on every thread of the debuggee.

        ``length`` must be 1, 2 or 4 bytes and ``condition`` one of
        HW_ACCESS, HW_EXECUTE or HW_WRITE.  Returns True when a free slot was
        found and programmed, False for invalid arguments or when all four
        slots are in use.
        """
        if length not in (1, 2, 4):
            return False
        # Dr7 encodes the length as length - 1.
        length -= 1
        valid_conditions = (my_debugger_defines.HW_ACCESS,
                            my_debugger_defines.HW_EXECUTE,
                            my_debugger_defines.HW_WRITE)
        if condition not in valid_conditions:
            return False
        # Pick the first debug register slot that is not in use.
        available = next(
            (slot for slot in range(4) if slot not in self.hardware_breakpoints),
            None)
        if available is None:
            return False

        def arm(context):
            # Enable the slot in Dr7 and store the address in its register.
            context.Dr7 |= 1 << (available * 2)
            setattr(context, self._ADDRESS_REGISTERS[available], address)
            # Drop stale condition/length bits, then set the new ones.
            context.Dr7 &= ~(0xF << ((available * 4) + 16))
            context.Dr7 |= condition << ((available * 4) + 16)
            context.Dr7 |= length << ((available * 4) + 18)

        self._update_thread_contexts(arm)
        self.hardware_breakpoints[available] = (address, length, condition)
        return True

    def func_resolve(self, dll, function):
        """Return the address of ``function`` exported by ``dll``, or None.

        The lookup runs in the debugger's own process with GetModuleHandleW
        and GetProcAddress, so ``dll`` must already be loaded here.
        NOTE(review): the address is assumed to be valid in the debuggee as
        well; confirm for the DLL in question.  ``function`` may be bytes or
        an ASCII str.
        """
        if isinstance(function, str):
            function = function.encode("ascii")
        handle = kernel32.GetModuleHandleW(dll)
        if not handle:
            return None
        # A module handle is not a kernel object handle: it must not be
        # passed to CloseHandle.
        return kernel32.GetProcAddress(handle, function)
- printf_loop.py
import ctypes
import time
# Debuggee: call msvcrt's printf every two seconds with a growing counter.
c_runtime = ctypes.cdll.msvcrt
iteration = 0
while True:
    c_runtime.printf(b"Loop iteration %d!\n", iteration)
    time.sleep(2)
    iteration = iteration + 1
- my_test.py
import my_debugger
from my_debugger_defines import *
# Attach to a running process chosen by the user, break on printf, then detach.
dbg = my_debugger.debugger()
target_pid = int(input("Enter the PID of the process to attach to:"))
dbg.attach(target_pid)

# Resolve printf inside msvcrt.dll and put an execute hardware breakpoint on it.
printf_address = dbg.func_resolve("msvcrt.dll", b"printf")
print("[*] Address of printf: 0x%08x" % printf_address)
dbg.bp_set_hw(printf_address, 1, HW_EXECUTE)

# Pump debug events until the debugger deactivates itself.
dbg.run()
dbg.detach()
运行结果(这里显示了一些调试的print)
Event Code: 2 Thread ID: 9928
Event Code: 1 Thread ID: 9928
0x80000003
[*] Exception address: 0x7ffe68e10b10
[*] Hit the first breakpoint.
Event Code: 4 Thread ID: 9928
Event Code: 1 Thread ID: 18456
0x80000004
Single Stepping.
[*] Hardware breakpoint removed.
Event Code: 2 Thread ID: 21584
内存断点
内存断点本质上不是真正的断点。当一个调试器设置一个内存断点时,调试器实质上所做的是改变一个内存区域或一个内存页的访问权限。内存页是操作系统可以一次处理的最小内存块。操作系统每分配一个内存页时,都会为这个内存页设置访问权限,该权限决定了这个内存页被访问的方式。以下是几个不同的内存页访问权限的例子:
- 页可执行:允许进程执行页上的代码,但是如果进程试图读写这个页将导致非法内存操作异常;
- 页可读:进程只能从这个内存页中读取数据;任何企图写入数据或者执行代码的操作会导致非法内存操作异常;
- 页可写:允许进程在这个内存页上写入数据;
- 保护页:对保护页任何类型的访问将导致一次性异常, 之后这个内存页会恢复到之前的状态。
主要过程
VirtualQueryEx :找内存断点区域的内存页
VirtualProtectEx :修改内存页属性为保护页
- my_debugger_defines.py
import ctypes
# Win32 type aliases.
BYTE = ctypes.c_ubyte
WORD = ctypes.c_ushort
DWORD = ctypes.c_ulong
LPBYTE = ctypes.POINTER(ctypes.c_ubyte)
LPTSTR = ctypes.POINTER(ctypes.c_wchar)
LPCSTR = ctypes.POINTER(ctypes.c_char)
LPVOID = ctypes.c_void_p
HANDLE = ctypes.c_void_p
PVOID = ctypes.c_void_p
# Pointer-sized unsigned integers.  c_ulong is only 32 bits on 64-bit
# Windows, which made EXCEPTION_RECORD (and therefore DEBUG_EVENT) smaller
# than the structure WaitForDebugEvent writes, and truncated sizes.
UINT_PTR = ctypes.c_size_t
SIZE_T = ctypes.c_size_t

# Process creation flags.
DEBUG_PROCESS = 0x00000001
CREATE_NEW_CONSOLE = 0x00000010

# Access rights.
PROCESS_ALL_ACCESS = 0x001F0FFF
THREAD_ALL_ACCESS = 0x001F03FF

# Continue statuses for ContinueDebugEvent().
DBG_CONTINUE = 0x00010002
DBG_EXCEPTION_NOT_HANDLED = 0x80010001

# Wait without a timeout (previously this held DBG_CONTINUE's value, which
# is a timeout of about 65 seconds).
INFINITE = 0xFFFFFFFF

# snapshot
TH32CS_SNAPTHREAD = 0x00000004

# Context flags for GetThreadContext().  The CONTEXT structure in this file
# is the AMD64 layout, so the AMD64 flag values apply:
# CONTEXT_AMD64 (0x00100000) | CONTROL (0x1) | INTEGER (0x2) | FLOATING_POINT (0x8)
CONTEXT_FULL = 0x0010000B
# CONTEXT_AMD64 (0x00100000) | DEBUG_REGISTERS (0x10)
CONTEXT_DEBUG_REGISTERS = 0x00100010

# Debug event constants
EXCEPTION_DEBUG_EVENT = 0x1

# debug exception codes.
EXCEPTION_ACCESS_VIOLATION = 0xC0000005
EXCEPTION_BREAKPOINT = 0x80000003
EXCEPTION_GUARD_PAGE = 0x80000001
EXCEPTION_SINGLE_STEP = 0x80000004

# Hardware breakpoint conditions
HW_ACCESS = 0x00000003
HW_EXECUTE = 0x00000000
HW_WRITE = 0x00000001

# Memory page permissions, used by VirtualProtect()
PAGE_NOACCESS = 0x00000001
PAGE_READONLY = 0x00000002
PAGE_READWRITE = 0x00000004
PAGE_WRITECOPY = 0x00000008
PAGE_EXECUTE = 0x00000010
PAGE_EXECUTE_READ = 0x00000020
PAGE_EXECUTE_READWRITE = 0x00000040
PAGE_EXECUTE_WRITECOPY = 0x00000080
PAGE_GUARD = 0x00000100
PAGE_NOCACHE = 0x00000200
PAGE_WRITECOMBINE = 0x00000400
# 定义CreateProcessA需要的结构体
# typedef struct _STARTUPINFO {
# DWORD cb;
# LPTSTR lpReserved;
# LPTSTR lpDesktop;
# LPTSTR lpTitle;
# DWORD dwX;
# DWORD dwY;
# DWORD dwXSize;
# DWORD dwYSize;
# DWORD dwXCountChars;
# DWORD dwYCountChars;
# DWORD dwFillAttribute;
# DWORD dwFlags;
# WORD wShowWindow;
# WORD cbReserved2;
# LPBYTE lpReserved2;
# HANDLE hStdInput;
# HANDLE hStdOutput;
# HANDLE hStdError;
# } STARTUPINFO, *LPSTARTUPINFO;
class STARTUPINFO(ctypes.Structure):
    """Start-up parameters handed to CreateProcessW.

    ``cb`` must hold ``ctypes.sizeof`` of the structure.  The field names
    (including the spellings ``dwXcountChars`` and ``hStdIput``) are kept
    exactly as callers may already use them.
    """

    _fields_ = [
        ("cb", DWORD),
        ("lpReserved", LPTSTR),
        ("lpDesktop", LPTSTR),
        ("lpTitle", LPTSTR),
        # Window position and size.
        ("dwX", DWORD),
        ("dwY", DWORD),
        ("dwXSize", DWORD),
        ("dwYSize", DWORD),
        ("dwXcountChars", DWORD),
        ("dwYcountChars", DWORD),
        ("dwFillAttribute", DWORD),
        # dwFlags selects which of the other members are honoured.
        ("dwFlags", DWORD),
        ("wShowWindow", WORD),
        ("cbReserved2", WORD),
        ("lpReserved2", LPBYTE),
        # Standard handles.
        ("hStdIput", HANDLE),
        ("hStdOutput", HANDLE),
        ("hStdError", HANDLE),
    ]
# typedef struct _PROCESS_INFORMATION {
# HANDLE hProcess;
# HANDLE hThread;
# DWORD dwProcessId;
# DWORD dwThreadId;
# } PROCESS_INFORMATION, *LPPROCESS_INFORMATION;
class PROCESS_INFORMATION(ctypes.Structure):
    """Handles and IDs of the new process, filled in by CreateProcessW."""

    _fields_ = [
        # Handles to the new process and its primary thread.
        ("hProcess", HANDLE),
        ("hThread", HANDLE),
        # Numeric identifiers of the same process and thread.
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
    ]
class EXCEPTION_RECORD(ctypes.Structure):
    """Description of one exception; may point to a nested record."""


# The fields are assigned after the class statement because the structure
# contains a pointer to its own type.
EXCEPTION_RECORD._fields_ = [
    ("ExceptionCode", DWORD),
    ("ExceptionFlags", DWORD),
    ("ExceptionRecord", ctypes.POINTER(EXCEPTION_RECORD)),
    ("ExceptionAddress", PVOID),
    ("NumberParameters", DWORD),
    ("ExceptionInformation", UINT_PTR * 15),
]
class EXCEPTION_DEBUG_INFO(ctypes.Structure):
    """Exception payload stored in the ``Exception`` member of DEBUG_EVENT_UNION."""

    _fields_ = [
        # Full record describing the exception (code, address, parameters).
        ("ExceptionRecord", EXCEPTION_RECORD),
        # First-chance indicator reported with the event.
        ("dwFirstChance", DWORD),
    ]
class DEBUG_EVENT_UNION(ctypes.Union):
    """Event-specific payload of a DEBUG_EVENT.

    Only the exception member is declared; the other members are left
    commented out.
    """

    _fields_ = [
        ("Exception", EXCEPTION_DEBUG_INFO),
        # ("CreateThread", CREATE_THREAD_DEBUG_INFO),
        # ("CreateProcessInfo", CREATE_PROCESS_DEBUG_INFO),
        # ("ExitThread", EXIT_THREAD_DEBUG_INFO),
        # ("ExitProcess", EXIT_PROCESS_DEBUG_INFO),
        # ("LoadDll", LOAD_DLL_DEBUG_INFO),
        # ("UnloadDll", UNLOAD_DLL_DEBUG_INFO),
        # ("DebugString", OUTPUT_DEBUG_STRING_INFO),
        # ("RipInfo", RIP_INFO),
    ]
class DEBUG_EVENT(ctypes.Structure):
    """Record describing one debugging event: its code, origin and payload."""

    _fields_ = [
        # Kind of debugging event that occurred.
        ("dwDebugEventCode", DWORD),
        # Process and thread the event belongs to.
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
        # Event-specific payload.
        ("u", DEBUG_EVENT_UNION),
    ]
class THREADENTRY32(ctypes.Structure):
    """Thread description filled in by Thread32First / Thread32Next.

    ``dwSize`` must be set to ``ctypes.sizeof(THREADENTRY32)`` before the
    first call.  The two priority members are LONG (signed) in the Win32
    header, so they are declared with ``ctypes.c_long``; the size and layout
    of the structure are unchanged.
    """

    _fields_ = [
        ("dwSize", DWORD),
        ("cntUsage", DWORD),
        ("th32ThreadID", DWORD),
        ("th32OwnerProcessID", DWORD),
        ("tpBasePri", ctypes.c_long),
        ("tpDeltaPri", ctypes.c_long),
        ("dwFlags", DWORD),
    ]
class FLOATING_SAVE_AREA(ctypes.Structure):
    """ctypes mirror of FLOATING_SAVE_AREA (x87 state of the 32-bit CONTEXT)."""

    _fields_ = [
        ("ControlWord", DWORD),
        ("StatusWord", DWORD),
        ("TagWord", DWORD),
        ("ErrorOffset", DWORD),
        ("ErrorSelector", DWORD),
        ("DataOffset", DWORD),
        ("DataSelector", DWORD),
        # Raw register stack image: 80 bytes.
        ("RegisterArea", BYTE * 80),
        ("Cr0NpxState", DWORD),
    ]
# class CONTEXT(ctypes.Structure):
# _fields_ = [
# ("ContextFlags", DWORD),
# ("Dr0", DWORD),
# ("Dr1", DWORD),
# ("Dr2", DWORD),
# ("Dr3", DWORD),
# ("Dr6", DWORD),
# ("Dr7", DWORD),
# ("FloatSave", FLOATING_SAVE_AREA),
# ("SegGs", DWORD),
# ("SegFs", DWORD),
# ("SegEs", DWORD),
# ("SegDs", DWORD),
# ("Edi", DWORD),
# ("Esi", DWORD),
# ("Ebx", DWORD),
# ("Edx", DWORD),
# ("Ecx", DWORD),
# ("Eax", DWORD),
# ("Ebp", DWORD),
# ("Eip", DWORD),
# ("SegCs", DWORD),
# ("EFlags", DWORD),
# ("Esp", DWORD),
# ("SegSs", DWORD),
# ("ExtendedRegisters", BYTE * 512),
# ]
# Fixed-width 64-bit aliases used by the x64 structures below.
DWORD64 = ctypes.c_ulonglong
ULONGLONG = ctypes.c_ulonglong
LONGLONG = ctypes.c_longlong


class M128A(ctypes.Structure):
    """128-bit value stored as an unsigned low half and a signed high half."""

    _fields_ = [
        ("Low", ULONGLONG),
        ("High", LONGLONG),
    ]
# class DUMMYUNIONNAME(Union):
# _fields_=[
# ("FltSave", XMM_SAVE_AREA32),
# ("DummyStruct", DUMMYSTRUCTNAME)
# ]
# class DUMMYSTRUCTNAME(Structure):
# _fields_=[
# ("Header", M128A * 2),
# ("Legacy", M128A * 8),
# ("Xmm0", M128A),
# ("Xmm1", M128A),
# ("Xmm2", M128A),
# ("Xmm3", M128A),
# ("Xmm4", M128A),
# ("Xmm5", M128A),
# ("Xmm6", M128A),
# ("Xmm7", M128A),
# ("Xmm8", M128A),
# ("Xmm9", M128A),
# ("Xmm10", M128A),
# ("Xmm11", M128A),
# ("Xmm12", M128A),
# ("Xmm13", M128A),
# ("Xmm14", M128A),
# ("Xmm15", M128A)
# ]
# class XMM_SAVE_AREA32(Structure):
# _pack_ = 1
# _fields_ = [
# ('ControlWord', WORD),
# ('StatusWord', WORD),
# ('TagWord', BYTE),
# ('Reserved1', BYTE),
# ('ErrorOpcode', WORD),
# ('ErrorOffset', DWORD),
# ('ErrorSelector', WORD),
# ('Reserved2', WORD),
# ('DataOffset', DWORD),
# ('DataSelector', WORD),
# ('Reserved3', WORD),
# ('MxCsr', DWORD),
# ('MxCsr_Mask', DWORD),
# ('FloatRegisters', M128A * 8),
# ('XmmRegisters', M128A * 16),
# ('Reserved4', BYTE * 96)
# ]
class CONTEXT_UNION(ctypes.Union):
    """Floating-point/SSE save area embedded in the x64 CONTEXT structure.

    In winnt.h this anonymous union holds an XMM_SAVE_AREA32 (``FltSave``),
    which is 512 bytes. The previous definition only declared ``S`` as
    ``DWORD * 32`` (128 bytes), which made every CONTEXT member after the
    union sit at the wrong offset and made the whole CONTEXT too small for
    what GetThreadContext may write. ``FltSave`` pins the union to the native
    512-byte size; ``S`` is kept so existing attribute access keeps working.
    """

    _fields_ = [
        # Raw image of XMM_SAVE_AREA32; fixes the union size at 512 bytes.
        ("FltSave", ctypes.c_ubyte * 512),
        # Legacy member kept for backward compatibility.
        ("S", DWORD * 32),
    ]
class CONTEXT(ctypes.Structure):
    """ctypes mirror of the x64 thread CONTEXT used by Get/SetThreadContext.

    Member order follows winnt.h. The offsets of everything after
    ``DUMMYUNIONNAME`` only match the native layout when that union is
    512 bytes wide (the size of XMM_SAVE_AREA32).
    """

    _fields_ = [
        # Home space for the register parameters.
        ("P1Home", DWORD64),
        ("P2Home", DWORD64),
        ("P3Home", DWORD64),
        ("P4Home", DWORD64),
        ("P5Home", DWORD64),
        ("P6Home", DWORD64),
        # Control flags.
        ("ContextFlags", DWORD),
        ("MxCsr", DWORD),
        # Segment registers and processor flags.
        ("SegCs", WORD),
        ("SegDs", WORD),
        ("SegEs", WORD),
        ("SegFs", WORD),
        ("SegGs", WORD),
        ("SegSs", WORD),
        ("EFlags", DWORD),
        # Debug registers.
        ("Dr0", DWORD64),
        ("Dr1", DWORD64),
        ("Dr2", DWORD64),
        ("Dr3", DWORD64),
        ("Dr6", DWORD64),
        ("Dr7", DWORD64),
        # Integer registers.
        ("Rax", DWORD64),
        ("Rcx", DWORD64),
        ("Rdx", DWORD64),
        ("Rbx", DWORD64),
        ("Rsp", DWORD64),
        ("Rbp", DWORD64),
        ("Rsi", DWORD64),
        ("Rdi", DWORD64),
        ("R8", DWORD64),
        ("R9", DWORD64),
        ("R10", DWORD64),
        ("R11", DWORD64),
        ("R12", DWORD64),
        ("R13", DWORD64),
        ("R14", DWORD64),
        ("R15", DWORD64),
        # Program counter.
        ("Rip", DWORD64),
        # Floating-point / SSE state.
        ("DUMMYUNIONNAME", CONTEXT_UNION),
        # Vector registers.
        ("VectorRegister", M128A * 26),
        ("VectorControl", DWORD64),
        # Special debug control registers.
        ("DebugControl", DWORD64),
        ("LastBranchToRip", DWORD64),
        ("LastBranchFromRip", DWORD64),
        ("LastExceptionToRip", DWORD64),
        ("LastExceptionFromRip", DWORD64),
    ]


# Pointer type used in the GetThreadContext/SetThreadContext prototypes.
LPCONTEXT = ctypes.POINTER(CONTEXT)
class PROC_STRUCT(ctypes.Structure):
    """Structured view of SYSTEM_INFO's first member: architecture + reserved word."""

    _fields_ = [
        ("wProcessorArchitecture", WORD),
        ("wReserved", WORD),
    ]
class SYSTEM_INFO_UNION(ctypes.Union):
    """First member of SYSTEM_INFO: either the OEM id or the PROC_STRUCT view."""

    _fields_ = [
        ("dwOemId", DWORD),
        ("sProcStruc", PROC_STRUCT),
    ]
class SYSTEM_INFO(ctypes.Structure):
    """ctypes mirror of the Win32 SYSTEM_INFO structure filled by GetSystemInfo.

    ``dwActiveProcessorMask`` is a DWORD_PTR in the Windows headers, i.e. it is
    pointer-sized. Declaring it as a 4-byte DWORD (as before) shifts every
    following member by four bytes on 64-bit Windows, so
    ``dwNumberOfProcessors``, ``dwProcessorType``, ``dwAllocationGranularity``
    and the two processor words were read from the wrong offsets.
    ``ctypes.c_size_t`` has the correct width on both 32- and 64-bit builds.
    """

    _fields_ = [
        ("uSysInfo", SYSTEM_INFO_UNION),
        ("dwPageSize", DWORD),
        ("lpMinimumApplicationAddress", LPVOID),
        ("lpMaximumApplicationAddress", LPVOID),
        # DWORD_PTR: pointer-sized bit mask of configured processors.
        ("dwActiveProcessorMask", ctypes.c_size_t),
        ("dwNumberOfProcessors", DWORD),
        ("dwProcessorType", DWORD),
        ("dwAllocationGranularity", DWORD),
        ("wProcessorLevel", WORD),
        ("wProcessorRevision", WORD),
    ]
# class MEMORY_BASIC_INFORMATION(ctypes.Structure):
# _fields_ = [
# ("BaseAddress", PVOID),
# ("AllocationBase", PVOID),
# ("AllocationProtect", DWORD),
# ("PartitionId", WORD),
# ("RegionSize", SIZE_T),
# ("State", DWORD),
# ("Protect", DWORD),
# ("Type", DWORD),
# ]
class MEMORY_BASIC_INFORMATION(ctypes.Structure):
    """64-bit layout of MEMORY_BASIC_INFORMATION as returned by VirtualQueryEx.

    Addresses and the region size are declared as 64-bit integers, and the
    two ``__alignment`` members spell out the padding explicitly.
    """

    _fields_ = [
        ("BaseAddress", ULONGLONG),
        ("AllocationBase", ULONGLONG),
        ("AllocationProtect", DWORD),
        # Explicit padding before the 8-byte RegionSize.
        ("__alignment1", DWORD),
        ("RegionSize", ULONGLONG),
        ("State", DWORD),
        ("Protect", DWORD),
        ("Type", DWORD),
        # Explicit tail padding.
        ("__alignment2", DWORD),
    ]


# Pointer type used in the VirtualQueryEx prototype.
PMEMORY_BASIC_INFORMATION = ctypes.POINTER(MEMORY_BASIC_INFORMATION)
- my_debugger.py
import ctypes
import my_debugger_defines
kernel32 = ctypes.windll.kernel32
class debugger():
    """Small user-mode debugger for 64-bit Windows targets, driven through kernel32.

    One instance tracks one debuggee: its process handle and pid, the thread
    handle and register context of the debug event currently being handled,
    and three breakpoint tables (software INT3 breakpoints, hardware
    debug-register breakpoints and guard-page memory breakpoints).

    Failure is reported the way the original code did: methods return
    ``False`` (or ``None`` for ``func_resolve``) instead of raising.
    """

    # Attribute names of the four debug address registers, indexed by slot.
    _DEBUG_ADDRESS_REGISTERS = ("Dr0", "Dr1", "Dr2", "Dr3")

    def __init__(self):
        """Initialise empty debugger state and cache the system page size."""
        self.h_process = None
        self.pid = None
        self.debugger_active = False
        # State of the debug event currently being processed.
        self.h_thread = None
        self.context = None
        self.exception = None
        self.exception_address = None
        # Software breakpoints: address -> original byte.
        self.breakpoints = {}
        self.first_breakpoint = True
        # Hardware breakpoints: slot -> (address, encoded length, condition).
        self.hardware_breakpoints = {}
        # Memory breakpoints.
        self.guarded_pages = []
        self.memory_breakpoints = {}
        # Query the default page size of the running system.
        system_info = my_debugger_defines.SYSTEM_INFO()
        kernel32.GetSystemInfo(ctypes.byref(system_info))
        self.page_size = system_info.dwPageSize

    def _close_handle(self, handle):
        """Close a kernel handle; empty values (None, 0, False) are ignored."""
        if handle:
            kernel32.CloseHandle.argtypes = [ctypes.c_void_p]
            kernel32.CloseHandle.restype = ctypes.c_bool
            kernel32.CloseHandle(handle)

    def _set_thread_context(self, h_thread, context):
        """Write ``context`` back to the thread behind ``h_thread``; return success."""
        kernel32.SetThreadContext.argtypes = [ctypes.c_void_p, my_debugger_defines.LPCONTEXT]
        kernel32.SetThreadContext.restype = ctypes.c_bool
        return kernel32.SetThreadContext(h_thread, ctypes.byref(context))

    def load(self, path_to_exe):
        """Start ``path_to_exe`` under the debugger with CreateProcessW.

        On success the pid and a full-access process handle are stored and the
        debugger is marked active so that ``run()`` starts pumping events.
        On failure the Win32 error code is printed.
        """
        creation_flags = my_debugger_defines.DEBUG_PROCESS
        startupinfo = my_debugger_defines.STARTUPINFO()
        process_information = my_debugger_defines.PROCESS_INFORMATION()
        # 0x1 is STARTF_USESHOWWINDOW: make wShowWindow (0 = hidden) take effect.
        startupinfo.dwFlags = 0x1
        startupinfo.wShowWindow = 0x0
        startupinfo.cb = ctypes.sizeof(startupinfo)
        created = kernel32.CreateProcessW(path_to_exe,
                                          None,
                                          None,
                                          None,
                                          None,
                                          creation_flags,
                                          None,
                                          None,
                                          ctypes.byref(startupinfo),
                                          ctypes.byref(process_information))
        if not created:
            print("[*] Error: 0x%08x." % kernel32.GetLastError())
            return
        print("[*] We have successfully lanuched the process!")
        print("[*] PID: %d" % process_information.dwProcessId)
        # Remember the debuggee so run()/enumerate_threads()/detach() work
        # after load() just as they do after attach().
        self.pid = process_information.dwProcessId
        self.h_process = self.open_process(self.pid)
        self.debugger_active = True
        # The handles returned by CreateProcess are owned by us and are not
        # used again (we keep the OpenProcess handle), so release them.
        self._close_handle(process_information.hThread)
        self._close_handle(process_information.hProcess)

    def open_process(self, pid):
        """Open process ``pid`` with PROCESS_ALL_ACCESS and return its handle.

        Returns a falsy value when the process cannot be opened.
        """
        # Declare the result as pointer-sized so the handle is never truncated.
        kernel32.OpenProcess.restype = ctypes.c_void_p
        return kernel32.OpenProcess(my_debugger_defines.PROCESS_ALL_ACCESS, False, pid)

    def attach(self, pid):
        """Attach to the running process ``pid`` with DebugActiveProcess."""
        pid = int(pid)
        self.h_process = self.open_process(pid)
        if kernel32.DebugActiveProcess(pid):
            self.debugger_active = True
            self.pid = pid
        else:
            print("[*] Unable to attach to the process.")

    def run(self):
        """Pump debug events until ``debugger_active`` is cleared."""
        while self.debugger_active:
            self.get_debug_event()

    def get_debug_event(self):
        """Wait for one debug event, dispatch it and resume the debuggee."""
        debug_event = my_debugger_defines.DEBUG_EVENT()
        continue_status = my_debugger_defines.DBG_CONTINUE
        # INFINITE: block until an event arrives; the call fills debug_event.
        if not kernel32.WaitForDebugEvent(ctypes.byref(debug_event), my_debugger_defines.INFINITE):
            return
        # Grab the thread that raised the event and its register state.
        self.h_thread = self.open_thread(debug_event.dwThreadId)
        self.context = self.get_thread_context(h_thread=self.h_thread)
        print("Event Code: %d Thread ID: %d" % (debug_event.dwDebugEventCode, debug_event.dwThreadId))
        if debug_event.dwDebugEventCode == my_debugger_defines.EXCEPTION_DEBUG_EVENT:
            record = debug_event.u.Exception.ExceptionRecord
            self.exception = record.ExceptionCode
            print("0x%08x" % self.exception)
            self.exception_address = record.ExceptionAddress
            if self.exception == my_debugger_defines.EXCEPTION_ACCESS_VIOLATION:
                print("Acess Violation Detected.")
            elif self.exception == my_debugger_defines.EXCEPTION_BREAKPOINT:
                continue_status = self.exception_handler_breakpoint()
            elif self.exception == my_debugger_defines.EXCEPTION_GUARD_PAGE:
                print("Guard Page Access Detected.")
            elif self.exception == my_debugger_defines.EXCEPTION_SINGLE_STEP:
                print("Single Stepping.")
                # Honour the handler's verdict instead of discarding it.
                continue_status = self.exception_handler_single_step()
        # Let the debuggee continue with the status chosen above.
        kernel32.ContinueDebugEvent(
            debug_event.dwProcessId,
            debug_event.dwThreadId,
            continue_status
        )
        # The per-event thread handle is no longer needed; do not leak it.
        self._close_handle(self.h_thread)
        self.h_thread = None

    def exception_handler_breakpoint(self):
        """Handle an INT3 exception and return the continue status.

        For a breakpoint set through ``bp_set`` the original byte is restored
        and the instruction pointer is moved back by one so the original
        instruction executes. Unknown breakpoints (including the initial one
        raised by Windows) are simply continued.
        """
        print("[*] Exception address: 0x%08x" % self.exception_address)
        if self.exception_address not in self.breakpoints:
            if self.first_breakpoint:
                # The very first breakpoint is raised by Windows itself.
                self.first_breakpoint = False
                print("[*] Hit the first breakpoint.")
            return my_debugger_defines.DBG_CONTINUE
        print("[*] Hit user defined breakpoint.")
        # Put back the byte that the INT3 opcode replaced.
        self.write_process_memory(self.exception_address, self.breakpoints[self.exception_address])
        # Fetch a fresh context and rewind RIP over the one-byte INT3.
        self.context = self.get_thread_context(h_thread=self.h_thread)
        if self.context:
            self.context.Rip -= 1
            self._set_thread_context(self.h_thread, self.context)
            self.debugger_active = True
        else:
            self.debugger_active = False
        return my_debugger_defines.DBG_CONTINUE

    def exception_handler_single_step(self):
        """Handle a single-step exception raised by one of our hardware breakpoints.

        Returns DBG_CONTINUE when the triggering hardware breakpoint was found
        and removed, DBG_EXCEPTION_NOT_HANDLED otherwise (no usable context,
        or no registered slot flagged in Dr6).
        """
        slot = None
        if self.context:
            # The low four bits of Dr6 flag which debug register fired.
            for candidate in range(4):
                if self.context.Dr6 & (1 << candidate) and candidate in self.hardware_breakpoints:
                    slot = candidate
                    break
        if slot is None or not self.bp_del_hw(slot):
            return my_debugger_defines.DBG_EXCEPTION_NOT_HANDLED
        print("[*] Hardware breakpoint removed.")
        return my_debugger_defines.DBG_CONTINUE

    def bp_del_hw(self, slot):
        """Remove hardware breakpoint ``slot`` from every thread of the debuggee.

        Threads whose handle or context cannot be obtained are skipped.
        Always returns True, as before.
        """
        register = self._DEBUG_ADDRESS_REGISTERS[slot] if slot in range(4) else None
        for thread_id in self.enumerate_threads() or []:
            h_thread = self.open_thread(thread_id)
            if not h_thread:
                continue
            context = self.get_thread_context(h_thread=h_thread)
            if context:
                # Clear the enable bit of this slot.
                context.Dr7 &= ~(1 << (slot * 2))
                # Zero the breakpoint address.
                if register is not None:
                    setattr(context, register, 0x0000000000000000)
                # Clear the condition bits, then the length bits.
                context.Dr7 &= ~(3 << ((slot * 4) + 16))
                context.Dr7 &= ~(3 << ((slot * 4) + 18))
                self._set_thread_context(h_thread, context)
            self._close_handle(h_thread)
        # Forget the breakpoint; tolerate a slot that was never registered.
        self.hardware_breakpoints.pop(slot, None)
        return True

    def detach(self):
        """Stop debugging the attached process; return True on success."""
        if kernel32.DebugActiveProcessStop(self.pid):
            print("[*] Finished debugging. Exiting...")
            return True
        print("There was an error")
        return False

    def open_thread(self, thread_id):
        """Open thread ``thread_id`` with THREAD_ALL_ACCESS.

        Returns the handle, or False (after printing a message) when the
        thread cannot be opened. OpenThread signals failure with NULL, which
        ctypes never reports as ``None`` for the default result type, so the
        check is on truthiness.
        """
        kernel32.OpenThread.restype = ctypes.c_void_p
        h_thread = kernel32.OpenThread(my_debugger_defines.THREAD_ALL_ACCESS, False, thread_id)
        if h_thread:
            return h_thread
        print("[*] Could not obtain a valid thread handle.")
        return False

    def enumerate_threads(self):
        """Return the ids of all threads owned by the debuggee.

        Returns a list of thread ids, or False when the snapshot cannot be
        created. A thread snapshot covers the whole system, so every entry is
        filtered by its owning pid.
        """
        kernel32.CreateToolhelp32Snapshot.restype = ctypes.c_void_p
        snapshot = kernel32.CreateToolhelp32Snapshot(my_debugger_defines.TH32CS_SNAPTHREAD, self.pid)
        # Failure is reported as INVALID_HANDLE_VALUE (-1), not as NULL/None.
        if snapshot is None or snapshot == ctypes.c_void_p(-1).value:
            return False
        entry_pointer = ctypes.POINTER(my_debugger_defines.THREADENTRY32)
        kernel32.Thread32First.argtypes = [ctypes.c_void_p, entry_pointer]
        kernel32.Thread32Next.argtypes = [ctypes.c_void_p, entry_pointer]
        thread_entry = my_debugger_defines.THREADENTRY32()
        thread_entry.dwSize = ctypes.sizeof(thread_entry)
        thread_list = []
        success = kernel32.Thread32First(snapshot, ctypes.byref(thread_entry))
        while success:
            if thread_entry.th32OwnerProcessID == self.pid:
                thread_list.append(thread_entry.th32ThreadID)
            success = kernel32.Thread32Next(snapshot, ctypes.byref(thread_entry))
        self._close_handle(snapshot)
        return thread_list

    def get_thread_context(self, thread_id=None, h_thread=None):
        """Return the CONTEXT of a thread, or False on failure.

        Pass either ``h_thread`` (an open handle, which stays open and remains
        owned by the caller) or ``thread_id`` (a handle is opened for the call
        and closed again before returning).
        """
        context = my_debugger_defines.CONTEXT()
        context.ContextFlags = my_debugger_defines.CONTEXT_FULL | my_debugger_defines.CONTEXT_DEBUG_REGISTERS
        opened_here = h_thread is None
        if opened_here:
            h_thread = self.open_thread(thread_id)
        if not h_thread:
            return False
        kernel32.GetThreadContext.argtypes = [ctypes.c_void_p, my_debugger_defines.LPCONTEXT]
        kernel32.GetThreadContext.restype = ctypes.c_bool
        try:
            succeeded = kernel32.GetThreadContext(h_thread, ctypes.byref(context))
        finally:
            # Only release handles this method opened itself.
            if opened_here:
                self._close_handle(h_thread)
        return context if succeeded else False

    def read_process_memory(self, address, length):
        """Read ``length`` bytes at ``address`` in the debuggee.

        Returns the bytes read, or False when ReadProcessMemory fails.
        """
        read_buf = ctypes.create_string_buffer(length)
        # SIZE_T is pointer-sized; a 4-byte counter would be overrun on x64.
        count = ctypes.c_size_t(0)
        kernel32.ReadProcessMemory.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p,
                                               ctypes.c_size_t, ctypes.POINTER(ctypes.c_size_t)]
        kernel32.ReadProcessMemory.restype = ctypes.c_bool
        if not kernel32.ReadProcessMemory(self.h_process, address, read_buf, length, ctypes.byref(count)):
            return False
        return read_buf.raw

    def write_process_memory(self, address, data):
        """Write the bytes ``data`` to ``address`` in the debuggee; return success."""
        # SIZE_T is pointer-sized; a 4-byte counter would be overrun on x64.
        count = ctypes.c_size_t(0)
        c_data = ctypes.c_char_p(data)
        kernel32.WriteProcessMemory.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p,
                                                ctypes.c_size_t, ctypes.POINTER(ctypes.c_size_t)]
        kernel32.WriteProcessMemory.restype = ctypes.c_bool
        return bool(kernel32.WriteProcessMemory(self.h_process, address, c_data, len(data), ctypes.byref(count)))

    def bp_set(self, address):
        """Install a software (INT3) breakpoint at ``address``.

        Returns True only when a new breakpoint byte was written. Returns
        False when the address is already registered or when reading or
        writing the target byte fails.
        """
        print("[*] Setting breakpoint at: 0x%08x" % address)
        if address in self.breakpoints:
            return False
        # Make the page writable before patching it.
        old_protect = ctypes.c_ulong(0)
        kernel32.VirtualProtectEx.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_size_t,
                                              ctypes.c_ulong, ctypes.POINTER(ctypes.c_ulong)]
        kernel32.VirtualProtectEx.restype = ctypes.c_bool
        kernel32.VirtualProtectEx(self.h_process, address, 1,
                                  my_debugger_defines.PAGE_EXECUTE_READWRITE, ctypes.byref(old_protect))
        # Save the original byte so it can be restored when the breakpoint hits.
        original_byte = self.read_process_memory(address, 1)
        if original_byte is False:
            return False
        if not self.write_process_memory(address, b"\xCC"):
            return False
        self.breakpoints[address] = original_byte
        return True

    def bp_set_hw(self, address, length, condition):
        """Install a hardware breakpoint on every thread of the debuggee.

        ``length`` must be 1, 2 or 4 and ``condition`` one of HW_ACCESS,
        HW_EXECUTE or HW_WRITE. Returns False for invalid arguments or when
        all four debug registers are taken, True otherwise. Threads whose
        handle or context cannot be obtained are skipped.
        """
        if length not in (1, 2, 4):
            return False
        # Dr7 encodes the length as (bytes - 1).
        length -= 1
        if condition not in (my_debugger_defines.HW_ACCESS,
                             my_debugger_defines.HW_EXECUTE,
                             my_debugger_defines.HW_WRITE):
            return False
        # Pick the first debug register slot that is still free.
        available = next((slot for slot in range(4) if slot not in self.hardware_breakpoints), None)
        if available is None:
            return False
        register = self._DEBUG_ADDRESS_REGISTERS[available]
        for thread_id in self.enumerate_threads() or []:
            h_thread = self.open_thread(thread_id)
            if not h_thread:
                continue
            context = self.get_thread_context(h_thread=h_thread)
            if context:
                # Enable the slot, store the address, then condition and length.
                context.Dr7 |= 1 << (available * 2)
                setattr(context, register, address)
                context.Dr7 |= condition << ((available * 4) + 16)
                context.Dr7 |= length << ((available * 4) + 18)
                self._set_thread_context(h_thread, context)
            self._close_handle(h_thread)
        self.hardware_breakpoints[available] = (address, length, condition)
        return True

    def bp_set_mem(self, address, size):
        """Install a memory breakpoint by guarding the pages behind a byte range.

        Every page that overlaps ``[address, address + size)`` gets PAGE_GUARD
        added to its protection. Returns True on success and False when the
        region cannot be queried or a page cannot be re-protected.
        """
        mbi = my_debugger_defines.MEMORY_BASIC_INFORMATION()
        kernel32.VirtualQueryEx.argtypes = [ctypes.c_void_p, ctypes.c_void_p,
                                            my_debugger_defines.PMEMORY_BASIC_INFORMATION, ctypes.c_size_t]
        kernel32.VirtualQueryEx.restype = ctypes.c_size_t
        length = kernel32.VirtualQueryEx(self.h_process, address, ctypes.byref(mbi), ctypes.sizeof(mbi))
        if length < ctypes.sizeof(mbi):
            # Report the error code only when the query actually failed.
            print("[*] Error with error code %d." % kernel32.GetLastError())
            return False
        kernel32.VirtualProtectEx.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_size_t,
                                              ctypes.c_ulong, ctypes.POINTER(ctypes.c_ulong)]
        kernel32.VirtualProtectEx.restype = ctypes.c_bool
        # Start at the page containing `address`; stop before the first page
        # that begins at or after the end of the range.
        current_page = mbi.BaseAddress
        while current_page < address + size:
            old_protection = ctypes.c_ulong(0)
            # Guard exactly one page per iteration.
            if not kernel32.VirtualProtectEx(self.h_process, current_page, self.page_size,
                                             mbi.Protect | my_debugger_defines.PAGE_GUARD,
                                             ctypes.byref(old_protection)):
                return False
            self.guarded_pages.append(current_page)
            current_page += self.page_size
        self.memory_breakpoints[address] = (address, size, mbi)
        return True

    def func_resolve(self, dll, function):
        """Return the address of ``function`` exported by the loaded module ``dll``.

        ``function`` may be bytes or an ASCII str. Returns None when the
        module is not loaded in this process or the export is not found.
        The lookup happens in the debugger's own process.
        NOTE(review): callers use the result as an address in the debuggee,
        which assumes the module is mapped at the same base there - confirm.
        """
        if isinstance(function, str):
            function = function.encode("ascii")
        kernel32.GetModuleHandleW.argtypes = [ctypes.c_wchar_p]
        kernel32.GetModuleHandleW.restype = ctypes.c_void_p
        handle = kernel32.GetModuleHandleW(dll)
        if not handle:
            return None
        kernel32.GetProcAddress.argtypes = [ctypes.c_void_p, ctypes.c_char_p]
        kernel32.GetProcAddress.restype = ctypes.c_void_p
        # A module handle from GetModuleHandle is not reference counted and
        # is not a kernel object handle, so it must not be passed to CloseHandle.
        return kernel32.GetProcAddress(handle, function)
- printf_loop.py
import ctypes
import time
# Debuggee used for the breakpoint experiments: prints a counter through the
# C runtime's printf every two seconds, forever.
msvcrt = ctypes.cdll.msvcrt
counter = 0
while True:
    msvcrt.printf(b"Loop iteration %d!\n", counter)
    counter += 1
    time.sleep(2)
- my_test.py
import my_debugger
from my_debugger_defines import *
# Attach to a running process, guard the memory behind msvcrt's printf and
# pump debug events until the debugger stops.
debugger = my_debugger.debugger()
pid = input("Enter the PID of the process to attach to:")
debugger.attach(int(pid))
printf_address = debugger.func_resolve("msvcrt.dll", b"printf")
if printf_address is None:
    # func_resolve yields None when the module or export cannot be found;
    # formatting None with %x below would otherwise raise a TypeError.
    raise SystemExit("[*] Could not resolve the address of printf in msvcrt.dll.")
print("[*] Address of printf: 0x%08x" % printf_address)
print(debugger.bp_set_mem(printf_address, 10))
debugger.run()
debugger.detach()
完结完结!!!