【Gray Hat Python】构建自己的windows调试器

news2024/11/25 21:36:52
  • 环境准备
    windows10 64bit
    python3.7 64bit

打开可执行文件,创建进程

定义变量

以下代码用 ctypes 定义了调用 windows API 需要的结构

  • my_debugger_define.py
import ctypes

WORD = ctypes.c_ushort
DWORD = ctypes.c_ulong
LPBYTE = ctypes.POINTER(ctypes.c_ubyte)
LPTSTR = ctypes.POINTER(ctypes.c_char)
HANDLE = ctypes.c_void_p


DEBUG_PROCESS = 0x00000001
CREATE_NEW_CONSOLE = 0x00000010

# 定义CreateProcessA需要的结构体
# typedef struct _STARTUPINFO {  
#     DWORD cb;  
#     LPTSTR lpReserved;  
#     LPTSTR lpDesktop;  
#     LPTSTR lpTitle;  
#     DWORD dwX;  
#     DWORD dwY;  
#     DWORD dwXSize;  
#     DWORD dwYSize;  
#     DWORD dwXCountChars;  
#     DWORD dwYCountChars;  
#     DWORD dwFillAttribute;  
#     DWORD dwFlags;  
#     WORD wShowWindow;  
#     WORD cbReserved2;  
#     LPBYTE lpReserved2;  
#     HANDLE hStdInput;  
#     HANDLE hStdOutput;  
#     HANDLE hStdError;
# } STARTUPINFO,  *LPSTARTUPINFO;

class STARTUPINFO(ctypes.Structure):
    _fields_ = [
        ("cb", DWORD),
        ("lpReserved", LPTSTR),
        ("lpDesktop", LPTSTR),
        ("lpTitle", LPTSTR),
        ("dwX", DWORD),
        ("dwY", DWORD),
        ("dwXSize", DWORD),
        ("dwYSize", DWORD),
        ("dwXcountChars", DWORD),
        ("dwYcountChars", DWORD),
        ("dwFillAttribute", DWORD),
        ("dwFlags", DWORD),
        ("wShowWindow", WORD),
        ("cbReserved2", WORD),
        ("lpReserved2", LPBYTE),
        ("hStdIput", HANDLE),
        ("hStdOutput", HANDLE),
        ("hStdError", HANDLE)   
    ]

# typedef struct _PROCESS_INFORMATION {  
#     HANDLE hProcess;  
#     HANDLE hThread;  
#     DWORD dwProcessId;  
#     DWORD dwThreadId;
# } PROCESS_INFORMATION,  *LPPROCESS_INFORMATION;

class PROCESS_INFORMATION(ctypes.Structure):
    _fields_ = [
        ("hProcess",    HANDLE),
        ("hThread",     HANDLE),
        ("dwProcessId", DWORD),
        ("dwThreadId",  DWORD),
        ]

调用CreateProcess

值得注意的是,原版用的是python2,是ascii编码,这里使用的是python3,是Unicode编码,所以使用 CreateProcessW 函数

  • my_debugger.py
import ctypes
import my_debugger_defines

kernel32 = ctypes.windll.kernel32

class debugger():
    def __init__(self):
        pass

    def load(self, path_to_exe):
        creation_flags = my_debugger_defines.DEBUG_PROCESS

        startupinfo = my_debugger_defines.STARTUPINFO()
        process_information = my_debugger_defines.PROCESS_INFORMATION()

        startupinfo.dwFlags = 0x1
        startupinfo.wShowWindow = 0x0

        startupinfo.cb = ctypes.sizeof(startupinfo)

        if kernel32.CreateProcessW(path_to_exe,
                                  None,
                                  None,
                                  None,
                                  None,
                                  creation_flags,
                                  None,
                                  None,
                                  ctypes.byref(startupinfo),
                                  ctypes.byref(process_information)):
            print("[*] We have successfully lanuched the process!")
            print("[*] PID: %d" % process_information.dwProcessId)
        else:
            print("[*] Error: 0x%08x." % kernel32.GetLastError())
  • my_test.py
import my_debugger

debugger = my_debugger.debugger()

debugger.load("11.exe")

打开pid对应的进程

一些需要的结构体

  • my_debugger_define.py
import ctypes

WORD = ctypes.c_ushort
DWORD = ctypes.c_ulong
LPBYTE = ctypes.POINTER(ctypes.c_ubyte)
LPTSTR = ctypes.POINTER(ctypes.c_char)
HANDLE = ctypes.c_void_p
PVOID = ctypes.c_void_p
UINT_PTR  = ctypes.c_ulong


DEBUG_PROCESS = 0x00000001
CREATE_NEW_CONSOLE = 0x00000010
PROCESS_ALL_ACCESS = 0x001F0FFF
DBG_CONTINUE = 0x00010002
INFINITE = 0x00010002


# 定义CreateProcessA需要的结构体
# typedef struct _STARTUPINFO {  
#     DWORD cb;  
#     LPTSTR lpReserved;  
#     LPTSTR lpDesktop;  
#     LPTSTR lpTitle;  
#     DWORD dwX;  
#     DWORD dwY;  
#     DWORD dwXSize;  
#     DWORD dwYSize;  
#     DWORD dwXCountChars;  
#     DWORD dwYCountChars;  
#     DWORD dwFillAttribute;  
#     DWORD dwFlags;  
#     WORD wShowWindow;  
#     WORD cbReserved2;  
#     LPBYTE lpReserved2;  
#     HANDLE hStdInput;  
#     HANDLE hStdOutput;  
#     HANDLE hStdError;
# } STARTUPINFO,  *LPSTARTUPINFO;

class STARTUPINFO(ctypes.Structure):
    _fields_ = [
        ("cb", DWORD),
        ("lpReserved", LPTSTR),
        ("lpDesktop", LPTSTR),
        ("lpTitle", LPTSTR),
        ("dwX", DWORD),
        ("dwY", DWORD),
        ("dwXSize", DWORD),
        ("dwYSize", DWORD),
        ("dwXcountChars", DWORD),
        ("dwYcountChars", DWORD),
        ("dwFillAttribute", DWORD),
        ("dwFlags", DWORD),
        ("wShowWindow", WORD),
        ("cbReserved2", WORD),
        ("lpReserved2", LPBYTE),
        ("hStdIput", HANDLE),
        ("hStdOutput", HANDLE),
        ("hStdError", HANDLE)   
    ]

# typedef struct _PROCESS_INFORMATION {  
#     HANDLE hProcess;  
#     HANDLE hThread;  
#     DWORD dwProcessId;  
#     DWORD dwThreadId;
# } PROCESS_INFORMATION,  *LPPROCESS_INFORMATION;

class PROCESS_INFORMATION(ctypes.Structure):
    _fields_ = [
        ("hProcess",    HANDLE),
        ("hThread",     HANDLE),
        ("dwProcessId", DWORD),
        ("dwThreadId",  DWORD),
        ]

class EXCEPTION_RECORD(ctypes.Structure):
    pass
EXCEPTION_RECORD._fields_ = [
        ("ExceptionCode",        DWORD),
        ("ExceptionFlags",       DWORD),
        ("ExceptionRecord",      ctypes.POINTER(EXCEPTION_RECORD)),
        ("ExceptionAddress",     PVOID),
        ("NumberParameters",     DWORD),
        ("ExceptionInformation", UINT_PTR * 15),
        ]

class EXCEPTION_DEBUG_INFO(ctypes.Structure):
    _fields_ = [
        ("ExceptionRecord",    EXCEPTION_RECORD),
        ("dwFirstChance",      DWORD),
        ]
  
class DEBUG_EVENT_UNION(ctypes.Union):
    _fields_ = [
        ("Exception",         EXCEPTION_DEBUG_INFO),
#        ("CreateThread",      CREATE_THREAD_DEBUG_INFO),
#        ("CreateProcessInfo", CREATE_PROCESS_DEBUG_INFO),
#        ("ExitThread",        EXIT_THREAD_DEBUG_INFO),
#        ("ExitProcess",       EXIT_PROCESS_DEBUG_INFO),
#        ("LoadDll",           LOAD_DLL_DEBUG_INFO),
#        ("UnloadDll",         UNLOAD_DLL_DEBUG_INFO),
#        ("DebugString",       OUTPUT_DEBUG_STRING_INFO),
#        ("RipInfo",           RIP_INFO),
        ] 

class DEBUG_EVENT(ctypes.Structure):
    _fields_ = [
        ("dwDebugEventCode", DWORD),
        ("dwProcessId",      DWORD),
        ("dwThreadId",       DWORD),
        ("u",                DEBUG_EVENT_UNION),
        ]

要实现附加,需要:
打开进程,获得进程的所有权限。
让进程等待调试事件发生。
调试事件发生,要处理调试事件(这里还没有处理调试事件)。
处理完调试事件,下一步继续执行。
解除附加。

如下代码,有比较详细的注释

  • my_debugger.py
import ctypes
import my_debugger_defines

kernel32 = ctypes.windll.kernel32

class debugger():
    def __init__(self):
        self.h_process = None
        self.pid = None
        self.debugger_active = False

    def load(self, path_to_exe):
        creation_flags = my_debugger_defines.DEBUG_PROCESS

        startupinfo = my_debugger_defines.STARTUPINFO()
        process_information = my_debugger_defines.PROCESS_INFORMATION()

        startupinfo.dwFlags = 0x1
        startupinfo.wShowWindow = 0x0

        startupinfo.cb = ctypes.sizeof(startupinfo)

        if kernel32.CreateProcessW(path_to_exe,
                                  None,
                                  None,
                                  None,
                                  None,
                                  creation_flags,
                                  None,
                                  None,
                                  ctypes.byref(startupinfo),
                                  ctypes.byref(process_information)):
            print("[*] We have successfully lanuched the process!")
            print("[*] PID: %d" % process_information.dwProcessId)

            # 保存上面创建进程的句柄,以供后续进程访问使用
            self.h_process = self.open_process(process_information.dwProcessId)

        else:
            print("[*] Error: 0x%08x." % kernel32.GetLastError())

    # 打开 pid 进程,获得该进程的所有权限
    # 返回该进程句柄
    def open_process(self, pid):
        return kernel32.OpenProcess(my_debugger_defines.PROCESS_ALL_ACCESS, False, pid)
    
    def attach(self, pid):
        self.h_process = self.open_process(pid)
        # DebugActiveProcess 附加进程
        if kernel32.DebugActiveProcess(pid):
            self.debugger_active = True
            self.pid = int(pid)
            self.run()
        else:
            print("[*] Unable to attach to the process.")
    
    # 循环调用 WaitForDebugEvent ,等待调试事件发生
    def run(self):

        while self.debugger_active == True:
            self.get_debug_event()

    def get_debug_event(self):
        debug_event = my_debugger_defines.DEBUG_EVENT()
        continue_status = my_debugger_defines.DBG_CONTINUE
        
        # INFINITE:直到有调试事件发生
        if kernel32.WaitForDebugEvent(ctypes.byref(debug_event), my_debugger_defines.INFINITE):
            input("press a key to continue...")
            self.debugger_active = False
            # 调试事件处理完成后,恢复原执行状态
            # continue_status -> DBG_CONTINUE:下一步继续执行
            kernel32.ContinueDebugEvent(
                debug_event.dwProcessId,
                debug_event.dwThreadId,
                continue_status
            )

    # DebugActiveProcessStop:去除附加
    def detach(self):
        if kernel32.DebugActiveProcessStop(self.pid):
            print("[*] Finished debugging. Exiting...")
            return True
        else:
            print("There was an error")
            return False
  • test.py
import my_debugger

debugger = my_debugger.debugger()

pid = input("Enter the PID of the process to attach to:")

debugger.attach(int(pid))

debugger.detach()

获取寄存器状态信息

这里值得说的是,在define中,CONTEXT应该是64位的版本,如果python是32的,CONTEXT就用老的版本。如果在64位的调式器中使用老版本的话就会得到 context全零的情况。

  • debugger_define.py
import ctypes

BYTE = ctypes.c_ubyte
WORD = ctypes.c_ushort
DWORD = ctypes.c_ulong
LPBYTE = ctypes.POINTER(ctypes.c_ubyte)
LPTSTR = ctypes.POINTER(ctypes.c_char)
HANDLE = ctypes.c_void_p
PVOID = ctypes.c_void_p
UINT_PTR  = ctypes.c_ulong


DEBUG_PROCESS = 0x00000001
CREATE_NEW_CONSOLE = 0x00000010
PROCESS_ALL_ACCESS = 0x001F0FFF
DBG_CONTINUE = 0x00010002
INFINITE = 0x00010002
THREAD_ALL_ACCESS = 0x001F03FF

# snapshot
TH32CS_SNAPTHREAD   = 0x00000004

# Context flags for GetThreadContext()
CONTEXT_FULL                   = 0x00010007
CONTEXT_DEBUG_REGISTERS        = 0x00010010


# 定义CreateProcessA需要的结构体
# typedef struct _STARTUPINFO {  
#     DWORD cb;  
#     LPTSTR lpReserved;  
#     LPTSTR lpDesktop;  
#     LPTSTR lpTitle;  
#     DWORD dwX;  
#     DWORD dwY;  
#     DWORD dwXSize;  
#     DWORD dwYSize;  
#     DWORD dwXCountChars;  
#     DWORD dwYCountChars;  
#     DWORD dwFillAttribute;  
#     DWORD dwFlags;  
#     WORD wShowWindow;  
#     WORD cbReserved2;  
#     LPBYTE lpReserved2;  
#     HANDLE hStdInput;  
#     HANDLE hStdOutput;  
#     HANDLE hStdError;
# } STARTUPINFO,  *LPSTARTUPINFO;

class STARTUPINFO(ctypes.Structure):
    _fields_ = [
        ("cb", DWORD),
        ("lpReserved", LPTSTR),
        ("lpDesktop", LPTSTR),
        ("lpTitle", LPTSTR),
        ("dwX", DWORD),
        ("dwY", DWORD),
        ("dwXSize", DWORD),
        ("dwYSize", DWORD),
        ("dwXcountChars", DWORD),
        ("dwYcountChars", DWORD),
        ("dwFillAttribute", DWORD),
        ("dwFlags", DWORD),
        ("wShowWindow", WORD),
        ("cbReserved2", WORD),
        ("lpReserved2", LPBYTE),
        ("hStdIput", HANDLE),
        ("hStdOutput", HANDLE),
        ("hStdError", HANDLE)   
    ]

# typedef struct _PROCESS_INFORMATION {  
#     HANDLE hProcess;  
#     HANDLE hThread;  
#     DWORD dwProcessId;  
#     DWORD dwThreadId;
# } PROCESS_INFORMATION,  *LPPROCESS_INFORMATION;

class PROCESS_INFORMATION(ctypes.Structure):
    _fields_ = [
        ("hProcess",    HANDLE),
        ("hThread",     HANDLE),
        ("dwProcessId", DWORD),
        ("dwThreadId",  DWORD),
        ]

class EXCEPTION_RECORD(ctypes.Structure):
    pass
EXCEPTION_RECORD._fields_ = [
        ("ExceptionCode",        DWORD),
        ("ExceptionFlags",       DWORD),
        ("ExceptionRecord",      ctypes.POINTER(EXCEPTION_RECORD)),
        ("ExceptionAddress",     PVOID),
        ("NumberParameters",     DWORD),
        ("ExceptionInformation", UINT_PTR * 15),
        ]

class EXCEPTION_DEBUG_INFO(ctypes.Structure):
    _fields_ = [
        ("ExceptionRecord",    EXCEPTION_RECORD),
        ("dwFirstChance",      DWORD),
        ]
  
class DEBUG_EVENT_UNION(ctypes.Union):
    _fields_ = [
        ("Exception",         EXCEPTION_DEBUG_INFO),
#        ("CreateThread",      CREATE_THREAD_DEBUG_INFO),
#        ("CreateProcessInfo", CREATE_PROCESS_DEBUG_INFO),
#        ("ExitThread",        EXIT_THREAD_DEBUG_INFO),
#        ("ExitProcess",       EXIT_PROCESS_DEBUG_INFO),
#        ("LoadDll",           LOAD_DLL_DEBUG_INFO),
#        ("UnloadDll",         UNLOAD_DLL_DEBUG_INFO),
#        ("DebugString",       OUTPUT_DEBUG_STRING_INFO),
#        ("RipInfo",           RIP_INFO),
        ] 

class DEBUG_EVENT(ctypes.Structure):
    _fields_ = [
        ("dwDebugEventCode", DWORD),
        ("dwProcessId",      DWORD),
        ("dwThreadId",       DWORD),
        ("u",                DEBUG_EVENT_UNION),
        ]
    
class THREADENTRY32(ctypes.Structure):
    _fields_ = [
        ("dwSize",             DWORD),
        ("cntUsage",           DWORD),
        ("th32ThreadID",       DWORD),
        ("th32OwnerProcessID", DWORD),
        ("tpBasePri",          DWORD),
        ("tpDeltaPri",         DWORD),
        ("dwFlags",            DWORD),
    ]

class FLOATING_SAVE_AREA(ctypes.Structure):
   _fields_ = [
   
        ("ControlWord", DWORD),
        ("StatusWord", DWORD),
        ("TagWord", DWORD),
        ("ErrorOffset", DWORD),
        ("ErrorSelector", DWORD),
        ("DataOffset", DWORD),
        ("DataSelector", DWORD),
        ("RegisterArea", BYTE * 80),
        ("Cr0NpxState", DWORD),
]

# class CONTEXT(ctypes.Structure):
#     _fields_ = [
    
#         ("ContextFlags", DWORD),
#         ("Dr0", DWORD),
#         ("Dr1", DWORD),
#         ("Dr2", DWORD),
#         ("Dr3", DWORD),
#         ("Dr6", DWORD),
#         ("Dr7", DWORD),
#         ("FloatSave", FLOATING_SAVE_AREA),
#         ("SegGs", DWORD),
#         ("SegFs", DWORD),
#         ("SegEs", DWORD),
#         ("SegDs", DWORD),
#         ("Edi", DWORD),
#         ("Esi", DWORD),
#         ("Ebx", DWORD),
#         ("Edx", DWORD),
#         ("Ecx", DWORD),
#         ("Eax", DWORD),
#         ("Ebp", DWORD),
#         ("Eip", DWORD),
#         ("SegCs", DWORD),
#         ("EFlags", DWORD),
#         ("Esp", DWORD),
#         ("SegSs", DWORD),
#         ("ExtendedRegisters", BYTE * 512),
# ]

DWORD64 = ctypes.c_ulonglong
ULONGLONG = ctypes.c_ulonglong
LONGLONG = ctypes.c_longlong

class M128A(ctypes.Structure):
    _fields_ = [
        ("Low", ULONGLONG),
        ("High", LONGLONG)
    ]

class CONTEXT_UNION(ctypes.Union):
    _fields_ = [
        ("S", DWORD * 32)
    ]
class CONTEXT(ctypes.Structure):
    _fields_ = [
        ("P1Home", DWORD64),
        ("P2Home", DWORD64),
        ("P3Home", DWORD64),
        ("P4Home", DWORD64),
        ("P5Home", DWORD64),
        ("P6Home", DWORD64),
        ("ContextFlags", DWORD),
        ("MxCsr", DWORD),
        ("SegCs", WORD),
        ("SegDs", WORD),
        ("SegEs", WORD),
        ("SegFs", WORD),
        ("SegGs", WORD),
        ("SegSs", WORD),
        ("EFlags", DWORD),
        ("Dr0", DWORD),
        ("Dr1", DWORD),
        ("Dr2", DWORD),
        ("Dr3", DWORD),
        ("Dr6", DWORD),
        ("Dr7", DWORD),
        ("Rax", DWORD64),
        ("Rcx", DWORD64),
        ("Rdx", DWORD64),
        ("Rbx", DWORD64),
        ("Rsp", DWORD64),
        ("Rbp", DWORD64),
        ("Rsi", DWORD64),
        ("Rdi", DWORD64),
        ("R8", DWORD64),
        ("R9", DWORD64),
        ("R10", DWORD64),
        ("R11", DWORD64),
        ("R12", DWORD64),
        ("R13", DWORD64),
        ("R14", DWORD64),
        ("R15", DWORD64),
        ("Rip", DWORD64),
        ("DUMMYUNIONNAME",CONTEXT_UNION),
        ("VectorRegister", M128A * 26),

        ("VectorControl", DWORD64),
        ("DebugControl", DWORD64),
        ("LastBranchToRip", DWORD64),
        ("LastBranchFromRip", DWORD64),
        ("LastExceptionToRip", DWORD64),
        ("LastExceptionFromRip", DWORD64)
]

这里有详细的注释

  • my_debugger.py
import ctypes
import my_debugger_defines

kernel32 = ctypes.windll.kernel32

class debugger():
    def __init__(self):
        self.h_process = None
        self.pid = None
        self.debugger_active = False

    def load(self, path_to_exe):
        creation_flags = my_debugger_defines.DEBUG_PROCESS

        startupinfo = my_debugger_defines.STARTUPINFO()
        process_information = my_debugger_defines.PROCESS_INFORMATION()

        startupinfo.dwFlags = 0x1
        startupinfo.wShowWindow = 0x0

        startupinfo.cb = ctypes.sizeof(startupinfo)

        if kernel32.CreateProcessW(path_to_exe,
                                  None,
                                  None,
                                  None,
                                  None,
                                  creation_flags,
                                  None,
                                  None,
                                  ctypes.byref(startupinfo),
                                  ctypes.byref(process_information)):
            print("[*] We have successfully lanuched the process!")
            print("[*] PID: %d" % process_information.dwProcessId)

            # 保存上面创建进程的句柄,以供后续进程访问使用
            self.h_process = self.open_process(process_information.dwProcessId)

        else:
            print("[*] Error: 0x%08x." % kernel32.GetLastError())

    # 打开 pid 进程,获得该进程的所有权限
    # 返回该进程句柄
    def open_process(self, pid):
        return kernel32.OpenProcess(my_debugger_defines.PROCESS_ALL_ACCESS, False, pid)
    
    def attach(self, pid):
        self.h_process = self.open_process(pid)
        # DebugActiveProcess 附加进程
        if kernel32.DebugActiveProcess(pid):
            self.debugger_active = True
            self.pid = int(pid)
            # self.run()
        else:
            print("[*] Unable to attach to the process.")
    
    # 循环调用 WaitForDebugEvent ,等待调试事件发生
    def run(self):

        while self.debugger_active == True:
            self.get_debug_event()

    def get_debug_event(self):
        debug_event = my_debugger_defines.DEBUG_EVENT()
        continue_status = my_debugger_defines.DBG_CONTINUE
        
        # INFINITE:直到有调试事件发生
        # 一旦有调试事件,WaitForDebugEvent会更新debug_event结构体
        if kernel32.WaitForDebugEvent(ctypes.byref(debug_event), my_debugger_defines.INFINITE):
            input("press a key to continue...")
            self.debugger_active = False
            # 调试事件处理完成后,恢复原执行状态
            # continue_status -> DBG_CONTINUE:下一步继续执行
            kernel32.ContinueDebugEvent(
                debug_event.dwProcessId,
                debug_event.dwThreadId,
                continue_status
            )

    # DebugActiveProcessStop:去除附加
    def detach(self):
        if kernel32.DebugActiveProcessStop(self.pid):
            print("[*] Finished debugging. Exiting...")
            return True
        else:
            print("There was an error")
            return False
    
    def open_thread(self, thread_id):
        h_thread = kernel32.OpenThread(my_debugger_defines.THREAD_ALL_ACCESS, None, thread_id)
        if h_thread is not None:
            return h_thread
        else:
            print("[*] Could not obtain a valid thread handle.")
            return False
    
    def enumerate_threads(self):
        # THREADENTRY32:描述拍摄快照时系统中执行的线程的信息
        thread_entry = my_debugger_defines.THREADENTRY32()
        thread_list = []
        # CreateToolhelp32Snapshot返回一个快照句柄
        # TH32CS_SNAPTHREAD: 枚举快照中系统的所有线程
        # pid只有TH32CS_SNAPHEAPLIST, TH32CS_SNAPMODULE, TH32CS_SNAPMODULE32, or TH32CS_SNAPALL才有意义,所以列举的线程不一定属于这个pid,
        # 需要一一对比
        snapshot = kernel32.CreateToolhelp32Snapshot(my_debugger_defines.TH32CS_SNAPTHREAD, self.pid)
        if snapshot is not None:
            thread_entry.dwSize = ctypes.sizeof(thread_entry)
            # 遍历第一个线程,线程信息写到 THREADENTRY32 这个结构体中
            success = kernel32.Thread32First(snapshot, ctypes.byref(thread_entry))
            while success:
                # 对比该线程的拥有者是不是 pid
                if thread_entry.th32OwnerProcessID == self.pid:
                    # 把该进程的线程放到列表
                    thread_list.append(thread_entry.th32ThreadID)
                # 遍历后面所有线程
                success = kernel32.Thread32Next(snapshot, ctypes.byref(thread_entry))
            kernel32.CloseHandle(snapshot)
            return thread_list
        else:
            return False
    
    # 获取线程上下文context
    def get_thread_context(self, thread_id=None, h_thread=None):
        # 线程上下文结构体为 CONTEXT
        context = my_debugger_defines.CONTEXT()
        context.ContextFlags = my_debugger_defines.CONTEXT_FULL | my_debugger_defines.CONTEXT_DEBUG_REGISTERS

        # 获取线程句柄
        h_thread = self.open_thread(thread_id)
        # 或许线程上下文
        if kernel32.GetThreadContext(h_thread, ctypes.byref(context)):
            kernel32.CloseHandle(h_thread)
            return context
        else:
            return False
        
  • my_test.py
import my_debugger
import win32con
import win32api

debugger = my_debugger.debugger()

pid = input("Enter the PID of the process to attach to:")

debugger.attach(int(pid))

list = debugger.enumerate_threads()
for thread in list:
    thread_context = debugger.get_thread_context(thread)
    print("[*] Dumping registers for thread ID: 0x%08x." % thread)
    print("[*] RIP 0x%08x" % thread_context.Rip)
    print("[*] RSP 0x%08x" % thread_context.Rsp)
    print("[*] RBP 0x%08x" % thread_context.Rbp)
    print("[*] RAX 0x%08x" % thread_context.Rax)
    print("[*] RBX 0x%08x" % thread_context.Rbx)
    print("[*] RCX 0x%08x" % thread_context.Rcx)
    print("[*] RDX 0x%08x" % thread_context.Rdx)

debugger.detach()

实现调试事件

  • my_debugger_define.py
import ctypes

BYTE = ctypes.c_ubyte
WORD = ctypes.c_ushort
DWORD = ctypes.c_ulong
LPBYTE = ctypes.POINTER(ctypes.c_ubyte)
LPTSTR = ctypes.POINTER(ctypes.c_char)
HANDLE = ctypes.c_void_p
PVOID = ctypes.c_void_p
UINT_PTR  = ctypes.c_ulong


DEBUG_PROCESS = 0x00000001
CREATE_NEW_CONSOLE = 0x00000010
PROCESS_ALL_ACCESS = 0x001F0FFF
DBG_CONTINUE = 0x00010002
INFINITE = 0x00010002
THREAD_ALL_ACCESS = 0x001F03FF

# snapshot
TH32CS_SNAPTHREAD   = 0x00000004

# Context flags for GetThreadContext()
CONTEXT_FULL                   = 0x00010007
CONTEXT_DEBUG_REGISTERS        = 0x00010010

# Debug event constants
EXCEPTION_DEBUG_EVENT      =    0x1

# debug exception codes.
EXCEPTION_ACCESS_VIOLATION     = 0xC0000005
EXCEPTION_BREAKPOINT           = 0x80000003
EXCEPTION_GUARD_PAGE           = 0x80000001
EXCEPTION_SINGLE_STEP          = 0x80000004


# 定义CreateProcessA需要的结构体
# typedef struct _STARTUPINFO {  
#     DWORD cb;  
#     LPTSTR lpReserved;  
#     LPTSTR lpDesktop;  
#     LPTSTR lpTitle;  
#     DWORD dwX;  
#     DWORD dwY;  
#     DWORD dwXSize;  
#     DWORD dwYSize;  
#     DWORD dwXCountChars;  
#     DWORD dwYCountChars;  
#     DWORD dwFillAttribute;  
#     DWORD dwFlags;  
#     WORD wShowWindow;  
#     WORD cbReserved2;  
#     LPBYTE lpReserved2;  
#     HANDLE hStdInput;  
#     HANDLE hStdOutput;  
#     HANDLE hStdError;
# } STARTUPINFO,  *LPSTARTUPINFO;

class STARTUPINFO(ctypes.Structure):
    _fields_ = [
        ("cb", DWORD),
        ("lpReserved", LPTSTR),
        ("lpDesktop", LPTSTR),
        ("lpTitle", LPTSTR),
        ("dwX", DWORD),
        ("dwY", DWORD),
        ("dwXSize", DWORD),
        ("dwYSize", DWORD),
        ("dwXcountChars", DWORD),
        ("dwYcountChars", DWORD),
        ("dwFillAttribute", DWORD),
        ("dwFlags", DWORD),
        ("wShowWindow", WORD),
        ("cbReserved2", WORD),
        ("lpReserved2", LPBYTE),
        ("hStdIput", HANDLE),
        ("hStdOutput", HANDLE),
        ("hStdError", HANDLE)   
    ]

# typedef struct _PROCESS_INFORMATION {  
#     HANDLE hProcess;  
#     HANDLE hThread;  
#     DWORD dwProcessId;  
#     DWORD dwThreadId;
# } PROCESS_INFORMATION,  *LPPROCESS_INFORMATION;

class PROCESS_INFORMATION(ctypes.Structure):
    _fields_ = [
        ("hProcess",    HANDLE),
        ("hThread",     HANDLE),
        ("dwProcessId", DWORD),
        ("dwThreadId",  DWORD),
        ]

class EXCEPTION_RECORD(ctypes.Structure):
    pass
EXCEPTION_RECORD._fields_ = [
        ("ExceptionCode",        DWORD),
        ("ExceptionFlags",       DWORD),
        ("ExceptionRecord",      ctypes.POINTER(EXCEPTION_RECORD)),
        ("ExceptionAddress",     PVOID),
        ("NumberParameters",     DWORD),
        ("ExceptionInformation", UINT_PTR * 15),
        ]

class EXCEPTION_DEBUG_INFO(ctypes.Structure):
    _fields_ = [
        ("ExceptionRecord",    EXCEPTION_RECORD),
        ("dwFirstChance",      DWORD),
        ]
  
class DEBUG_EVENT_UNION(ctypes.Union):
    _fields_ = [
        ("Exception",         EXCEPTION_DEBUG_INFO),
#        ("CreateThread",      CREATE_THREAD_DEBUG_INFO),
#        ("CreateProcessInfo", CREATE_PROCESS_DEBUG_INFO),
#        ("ExitThread",        EXIT_THREAD_DEBUG_INFO),
#        ("ExitProcess",       EXIT_PROCESS_DEBUG_INFO),
#        ("LoadDll",           LOAD_DLL_DEBUG_INFO),
#        ("UnloadDll",         UNLOAD_DLL_DEBUG_INFO),
#        ("DebugString",       OUTPUT_DEBUG_STRING_INFO),
#        ("RipInfo",           RIP_INFO),
        ] 

class DEBUG_EVENT(ctypes.Structure):
    _fields_ = [
        ("dwDebugEventCode", DWORD),
        ("dwProcessId",      DWORD),
        ("dwThreadId",       DWORD),
        ("u",                DEBUG_EVENT_UNION),
        ]
    
class THREADENTRY32(ctypes.Structure):
    _fields_ = [
        ("dwSize",             DWORD),
        ("cntUsage",           DWORD),
        ("th32ThreadID",       DWORD),
        ("th32OwnerProcessID", DWORD),
        ("tpBasePri",          DWORD),
        ("tpDeltaPri",         DWORD),
        ("dwFlags",            DWORD),
    ]

class FLOATING_SAVE_AREA(ctypes.Structure):
   _fields_ = [
   
        ("ControlWord", DWORD),
        ("StatusWord", DWORD),
        ("TagWord", DWORD),
        ("ErrorOffset", DWORD),
        ("ErrorSelector", DWORD),
        ("DataOffset", DWORD),
        ("DataSelector", DWORD),
        ("RegisterArea", BYTE * 80),
        ("Cr0NpxState", DWORD),
]

# class CONTEXT(ctypes.Structure):
#     _fields_ = [
    
#         ("ContextFlags", DWORD),
#         ("Dr0", DWORD),
#         ("Dr1", DWORD),
#         ("Dr2", DWORD),
#         ("Dr3", DWORD),
#         ("Dr6", DWORD),
#         ("Dr7", DWORD),
#         ("FloatSave", FLOATING_SAVE_AREA),
#         ("SegGs", DWORD),
#         ("SegFs", DWORD),
#         ("SegEs", DWORD),
#         ("SegDs", DWORD),
#         ("Edi", DWORD),
#         ("Esi", DWORD),
#         ("Ebx", DWORD),
#         ("Edx", DWORD),
#         ("Ecx", DWORD),
#         ("Eax", DWORD),
#         ("Ebp", DWORD),
#         ("Eip", DWORD),
#         ("SegCs", DWORD),
#         ("EFlags", DWORD),
#         ("Esp", DWORD),
#         ("SegSs", DWORD),
#         ("ExtendedRegisters", BYTE * 512),
# ]

DWORD64 = ctypes.c_ulonglong
ULONGLONG = ctypes.c_ulonglong
LONGLONG = ctypes.c_longlong

class M128A(ctypes.Structure):
    _fields_ = [
        ("Low", ULONGLONG),
        ("High", LONGLONG)
    ]

class CONTEXT_UNION(ctypes.Union):
    _fields_ = [
        ("S", DWORD * 32)
    ]
class CONTEXT(ctypes.Structure):
    _fields_ = [
        ("P1Home", DWORD64),
        ("P2Home", DWORD64),
        ("P3Home", DWORD64),
        ("P4Home", DWORD64),
        ("P5Home", DWORD64),
        ("P6Home", DWORD64),
        ("ContextFlags", DWORD),
        ("MxCsr", DWORD),
        ("SegCs", WORD),
        ("SegDs", WORD),
        ("SegEs", WORD),
        ("SegFs", WORD),
        ("SegGs", WORD),
        ("SegSs", WORD),
        ("EFlags", DWORD),
        ("Dr0", DWORD),
        ("Dr1", DWORD),
        ("Dr2", DWORD),
        ("Dr3", DWORD),
        ("Dr6", DWORD),
        ("Dr7", DWORD),
        ("Rax", DWORD64),
        ("Rcx", DWORD64),
        ("Rdx", DWORD64),
        ("Rbx", DWORD64),
        ("Rsp", DWORD64),
        ("Rbp", DWORD64),
        ("Rsi", DWORD64),
        ("Rdi", DWORD64),
        ("R8", DWORD64),
        ("R9", DWORD64),
        ("R10", DWORD64),
        ("R11", DWORD64),
        ("R12", DWORD64),
        ("R13", DWORD64),
        ("R14", DWORD64),
        ("R15", DWORD64),
        ("Rip", DWORD64),
        ("DUMMYUNIONNAME",CONTEXT_UNION),
        ("VectorRegister", M128A * 26),

        ("VectorControl", DWORD64),
        ("DebugControl", DWORD64),
        ("LastBranchToRip", DWORD64),
        ("LastBranchFromRip", DWORD64),
        ("LastExceptionToRip", DWORD64),
        ("LastExceptionFromRip", DWORD64)
]
  • my_debugger.py
import ctypes
import my_debugger_defines

kernel32 = ctypes.windll.kernel32

class debugger():
    def __init__(self):
        self.h_process = None
        self.pid = None
        self.debugger_active = False
        # 实现调试事件处理功能
        self.h_thread = None
        self.context = None
        self.exception = None
        self.exception_address = None

    def load(self, path_to_exe):
        creation_flags = my_debugger_defines.DEBUG_PROCESS

        startupinfo = my_debugger_defines.STARTUPINFO()
        process_information = my_debugger_defines.PROCESS_INFORMATION()

        startupinfo.dwFlags = 0x1
        startupinfo.wShowWindow = 0x0

        startupinfo.cb = ctypes.sizeof(startupinfo)

        if kernel32.CreateProcessW(path_to_exe,
                                  None,
                                  None,
                                  None,
                                  None,
                                  creation_flags,
                                  None,
                                  None,
                                  ctypes.byref(startupinfo),
                                  ctypes.byref(process_information)):
            print("[*] We have successfully lanuched the process!")
            print("[*] PID: %d" % process_information.dwProcessId)

            # 保存上面创建进程的句柄,以供后续进程访问使用
            self.h_process = self.open_process(process_information.dwProcessId)

        else:
            print("[*] Error: 0x%08x." % kernel32.GetLastError())

    # 打开 pid 进程,获得该进程的所有权限
    # 返回该进程句柄
    def open_process(self, pid):
        return kernel32.OpenProcess(my_debugger_defines.PROCESS_ALL_ACCESS, False, pid)
    
    def attach(self, pid):
        self.h_process = self.open_process(pid)
        # DebugActiveProcess 附加进程
        if kernel32.DebugActiveProcess(pid):
            self.debugger_active = True
            self.pid = int(pid)
            # self.run()
        else:
            print("[*] Unable to attach to the process.")
    
    # 循环调用 WaitForDebugEvent ,等待调试事件发生
    def run(self):
        while self.debugger_active == True:
            self.get_debug_event()

    def get_debug_event(self):
        debug_event = my_debugger_defines.DEBUG_EVENT()
        continue_status = my_debugger_defines.DBG_CONTINUE
        
        # INFINITE:直到有调试事件发生
        # 一旦有调试事件,WaitForDebugEvent会更新debug_event结构体
        if kernel32.WaitForDebugEvent(ctypes.byref(debug_event), my_debugger_defines.INFINITE):
        # 处理指定异常事件
            # 获取线程句柄
            self.h_thread = self.open_thread(debug_event.dwThreadId)
            self.context = self.get_thread_context(h_thread=self.h_thread)
            # 打印调试事件类型和线程ID
            print("Event Code: %d Thread ID: %d" % (debug_event.dwDebugEventCode, debug_event.dwThreadId))

            if debug_event.dwDebugEventCode == my_debugger_defines.EXCEPTION_DEBUG_EVENT:
                # 获取异常代码
                exception = debug_event.u.Exception.ExceptionRecord.ExceptionCode
                # 异常地址
                self.exception_address = debug_event.u.Exception.ExceptionRecord.ExceptionAddress
                # 内存违法访问
                if exception == my_debugger_defines.EXCEPTION_ACCESS_VIOLATION:
                    print("Acess Violation Detected.")
                # 遇到一个断点
                elif exception == my_debugger_defines.EXCEPTION_BREAKPOINT:
                    continue_status = self.exception_handler_breakpoint()
                # 内存断点
                elif exception == my_debugger_defines.EXCEPTION_GUARD_PAGE:
                    print("Guard Page Access Detected.")
                # 硬件断点
                elif exception == my_debugger_defines.EXCEPTION_SINGLE_STEP:
                    print("Single Stepping.")

            # input("press a key to continue...")
            # self.debugger_active = False

            # 调试事件处理完成后,恢复原执行状态
            # continue_status -> DBG_CONTINUE:下一步继续执行
            kernel32.ContinueDebugEvent(
                debug_event.dwProcessId,
                debug_event.dwThreadId,
                continue_status
            )
    def exception_handler_breakpoint(self):
        print("[*] Inside the breakpoint handler.")
        print("Exception address: 0x%08x" % self.exception_address)
        return my_debugger_defines.DBG_CONTINUE

    # DebugActiveProcessStop:去除附加
    def detach(self):
        if kernel32.DebugActiveProcessStop(self.pid):
            print("[*] Finished debugging. Exiting...")
            return True
        else:
            print("There was an error")
            return False
    
    # 通过线程ID打开线程,获取线程句柄
    def open_thread(self, thread_id):
        h_thread = kernel32.OpenThread(my_debugger_defines.THREAD_ALL_ACCESS, None, thread_id)
        if h_thread is not None:
            return h_thread
        else:
            print("[*] Could not obtain a valid thread handle.")
            return False
    
    # 获取属于pid进程的线程的ID
    def enumerate_threads(self):
        # THREADENTRY32:描述拍摄快照时系统中执行的线程的信息
        thread_entry = my_debugger_defines.THREADENTRY32()
        thread_list = []
        # CreateToolhelp32Snapshot返回一个快照句柄
        # TH32CS_SNAPTHREAD: 枚举快照中系统的所有线程
        # pid只有TH32CS_SNAPHEAPLIST, TH32CS_SNAPMODULE, TH32CS_SNAPMODULE32, or TH32CS_SNAPALL才有意义,所以列举的线程不一定属于这个pid,
        # 需要一一对比
        snapshot = kernel32.CreateToolhelp32Snapshot(my_debugger_defines.TH32CS_SNAPTHREAD, self.pid)
        if snapshot is not None:
            thread_entry.dwSize = ctypes.sizeof(thread_entry)
            # 遍历第一个线程,线程信息写到 THREADENTRY32 这个结构体中
            success = kernel32.Thread32First(snapshot, ctypes.byref(thread_entry))
            while success:
                # 对比该线程的拥有者是不是 pid
                if thread_entry.th32OwnerProcessID == self.pid:
                    # 把该进程的线程ID放到列表
                    thread_list.append(thread_entry.th32ThreadID)
                # 遍历后面所有线程
                success = kernel32.Thread32Next(snapshot, ctypes.byref(thread_entry))
            kernel32.CloseHandle(snapshot)
            return thread_list
        else:
            return False
    
    # 通过线程ID,获取线程上下文context
    def get_thread_context(self, thread_id=None, h_thread=None):
        # 线程上下文结构体为 CONTEXT
        context = my_debugger_defines.CONTEXT()
        context.ContextFlags = my_debugger_defines.CONTEXT_FULL | my_debugger_defines.CONTEXT_DEBUG_REGISTERS

        # 通过线程ID,获取线程句柄
        if h_thread is None:
            self.h_thread = self.open_thread(thread_id)
        # 通过线程句柄,线程上下文
        if kernel32.GetThreadContext(self.h_thread, ctypes.byref(context)):
            kernel32.CloseHandle(h_thread)
            return context
        else:
            return False
  • my_test.py
import my_debugger

debugger = my_debugger.debugger()

pid = input("Enter the PID of the process to attach to:")

debugger.attach(int(pid))

debugger.run()

debugger.detach()

要学习的内容:

DEBUG_EVENT 结构中的联合体和 dwDebugEventCode 的关系:
在这里插入图片描述get_debug_event 函数中实现了当 dwDebugEventCode 等于 0x1的情况。

值得说的是:在DEBUG_EVENT 联合体中 Exception.ExceptionRecord.ExceptionCode又很多取值,有一个是EXCEPTION_GUARD_PAGE,但这个取值在 微软学习手册里没有找到,在VS中的minwinbase.h头文件中有定义

#define EXCEPTION_GUARD_PAGE STATUS_GUARD_PAGE_VIOLATION

软断点(INT3)

这个节坑太多,原文中显示源代码和运行的都不一样。需要看源代码文件去慢慢领悟。
还有就是迁移到python3有很多需要注意的地方,例如语法不同、ctypes调用API的方法不同

  • my_debugger_define.py
import ctypes

BYTE = ctypes.c_ubyte
WORD = ctypes.c_ushort
DWORD = ctypes.c_ulong
LPBYTE = ctypes.POINTER(ctypes.c_ubyte)
LPTSTR = ctypes.POINTER(ctypes.c_wchar)
LPCSTR = ctypes.POINTER(ctypes.c_char)
HANDLE = ctypes.c_void_p
PVOID = ctypes.c_void_p
UINT_PTR  = ctypes.c_ulong


DEBUG_PROCESS = 0x00000001
CREATE_NEW_CONSOLE = 0x00000010
PROCESS_ALL_ACCESS = 0x001F0FFF
DBG_CONTINUE = 0x00010002
INFINITE = 0x00010002
THREAD_ALL_ACCESS = 0x001F03FF

# snapshot
TH32CS_SNAPTHREAD   = 0x00000004

# Context flags for GetThreadContext()
CONTEXT_FULL                   = 0x00010007
CONTEXT_DEBUG_REGISTERS        = 0x00010010

# Debug event constants
EXCEPTION_DEBUG_EVENT      =    0x1

# debug exception codes.
EXCEPTION_ACCESS_VIOLATION     = 0xC0000005
EXCEPTION_BREAKPOINT           = 0x80000003
EXCEPTION_GUARD_PAGE           = 0x80000001
EXCEPTION_SINGLE_STEP          = 0x80000004

# Memory page permissions, used by VirtualProtect()
PAGE_EXECUTE_READWRITE         = 0x00000040

# 定义CreateProcessA需要的结构体
# typedef struct _STARTUPINFO {  
#     DWORD cb;  
#     LPTSTR lpReserved;  
#     LPTSTR lpDesktop;  
#     LPTSTR lpTitle;  
#     DWORD dwX;  
#     DWORD dwY;  
#     DWORD dwXSize;  
#     DWORD dwYSize;  
#     DWORD dwXCountChars;  
#     DWORD dwYCountChars;  
#     DWORD dwFillAttribute;  
#     DWORD dwFlags;  
#     WORD wShowWindow;  
#     WORD cbReserved2;  
#     LPBYTE lpReserved2;  
#     HANDLE hStdInput;  
#     HANDLE hStdOutput;  
#     HANDLE hStdError;
# } STARTUPINFO,  *LPSTARTUPINFO;

class STARTUPINFO(ctypes.Structure):
    _fields_ = [
        ("cb", DWORD),
        ("lpReserved", LPTSTR),
        ("lpDesktop", LPTSTR),
        ("lpTitle", LPTSTR),
        ("dwX", DWORD),
        ("dwY", DWORD),
        ("dwXSize", DWORD),
        ("dwYSize", DWORD),
        ("dwXcountChars", DWORD),
        ("dwYcountChars", DWORD),
        ("dwFillAttribute", DWORD),
        ("dwFlags", DWORD),
        ("wShowWindow", WORD),
        ("cbReserved2", WORD),
        ("lpReserved2", LPBYTE),
        ("hStdIput", HANDLE),
        ("hStdOutput", HANDLE),
        ("hStdError", HANDLE)   
    ]

# typedef struct _PROCESS_INFORMATION {  
#     HANDLE hProcess;  
#     HANDLE hThread;  
#     DWORD dwProcessId;  
#     DWORD dwThreadId;
# } PROCESS_INFORMATION,  *LPPROCESS_INFORMATION;

class PROCESS_INFORMATION(ctypes.Structure):
    _fields_ = [
        ("hProcess",    HANDLE),
        ("hThread",     HANDLE),
        ("dwProcessId", DWORD),
        ("dwThreadId",  DWORD),
        ]

class EXCEPTION_RECORD(ctypes.Structure):
    pass
EXCEPTION_RECORD._fields_ = [
        ("ExceptionCode",        DWORD),
        ("ExceptionFlags",       DWORD),
        ("ExceptionRecord",      ctypes.POINTER(EXCEPTION_RECORD)),
        ("ExceptionAddress",     PVOID),
        ("NumberParameters",     DWORD),
        ("ExceptionInformation", UINT_PTR * 15),
        ]

class EXCEPTION_DEBUG_INFO(ctypes.Structure):
    _fields_ = [
        ("ExceptionRecord",    EXCEPTION_RECORD),
        ("dwFirstChance",      DWORD),
        ]
  
class DEBUG_EVENT_UNION(ctypes.Union):
    _fields_ = [
        ("Exception",         EXCEPTION_DEBUG_INFO),
#        ("CreateThread",      CREATE_THREAD_DEBUG_INFO),
#        ("CreateProcessInfo", CREATE_PROCESS_DEBUG_INFO),
#        ("ExitThread",        EXIT_THREAD_DEBUG_INFO),
#        ("ExitProcess",       EXIT_PROCESS_DEBUG_INFO),
#        ("LoadDll",           LOAD_DLL_DEBUG_INFO),
#        ("UnloadDll",         UNLOAD_DLL_DEBUG_INFO),
#        ("DebugString",       OUTPUT_DEBUG_STRING_INFO),
#        ("RipInfo",           RIP_INFO),
        ] 

class DEBUG_EVENT(ctypes.Structure):
    _fields_ = [
        ("dwDebugEventCode", DWORD),
        ("dwProcessId",      DWORD),
        ("dwThreadId",       DWORD),
        ("u",                DEBUG_EVENT_UNION),
        ]
    
class THREADENTRY32(ctypes.Structure):
    _fields_ = [
        ("dwSize",             DWORD),
        ("cntUsage",           DWORD),
        ("th32ThreadID",       DWORD),
        ("th32OwnerProcessID", DWORD),
        ("tpBasePri",          DWORD),
        ("tpDeltaPri",         DWORD),
        ("dwFlags",            DWORD),
    ]

class FLOATING_SAVE_AREA(ctypes.Structure):
   _fields_ = [
   
        ("ControlWord", DWORD),
        ("StatusWord", DWORD),
        ("TagWord", DWORD),
        ("ErrorOffset", DWORD),
        ("ErrorSelector", DWORD),
        ("DataOffset", DWORD),
        ("DataSelector", DWORD),
        ("RegisterArea", BYTE * 80),
        ("Cr0NpxState", DWORD),
]

# class CONTEXT(ctypes.Structure):
#     _fields_ = [
    
#         ("ContextFlags", DWORD),
#         ("Dr0", DWORD),
#         ("Dr1", DWORD),
#         ("Dr2", DWORD),
#         ("Dr3", DWORD),
#         ("Dr6", DWORD),
#         ("Dr7", DWORD),
#         ("FloatSave", FLOATING_SAVE_AREA),
#         ("SegGs", DWORD),
#         ("SegFs", DWORD),
#         ("SegEs", DWORD),
#         ("SegDs", DWORD),
#         ("Edi", DWORD),
#         ("Esi", DWORD),
#         ("Ebx", DWORD),
#         ("Edx", DWORD),
#         ("Ecx", DWORD),
#         ("Eax", DWORD),
#         ("Ebp", DWORD),
#         ("Eip", DWORD),
#         ("SegCs", DWORD),
#         ("EFlags", DWORD),
#         ("Esp", DWORD),
#         ("SegSs", DWORD),
#         ("ExtendedRegisters", BYTE * 512),
# ]

DWORD64 = ctypes.c_ulonglong
ULONGLONG = ctypes.c_ulonglong
LONGLONG = ctypes.c_longlong

class M128A(ctypes.Structure):
    _fields_ = [
        ("Low", ULONGLONG),
        ("High", LONGLONG)
    ]

class CONTEXT_UNION(ctypes.Union):
    _fields_ = [
        ("S", DWORD * 32)
    ]
class CONTEXT(ctypes.Structure):
    _fields_ = [
        ("P1Home", DWORD64),
        ("P2Home", DWORD64),
        ("P3Home", DWORD64),
        ("P4Home", DWORD64),
        ("P5Home", DWORD64),
        ("P6Home", DWORD64),
        ("ContextFlags", DWORD),
        ("MxCsr", DWORD),
        ("SegCs", WORD),
        ("SegDs", WORD),
        ("SegEs", WORD),
        ("SegFs", WORD),
        ("SegGs", WORD),
        ("SegSs", WORD),
        ("EFlags", DWORD),
        ("Dr0", DWORD),
        ("Dr1", DWORD),
        ("Dr2", DWORD),
        ("Dr3", DWORD),
        ("Dr6", DWORD),
        ("Dr7", DWORD),
        ("Rax", DWORD64),
        ("Rcx", DWORD64),
        ("Rdx", DWORD64),
        ("Rbx", DWORD64),
        ("Rsp", DWORD64),
        ("Rbp", DWORD64),
        ("Rsi", DWORD64),
        ("Rdi", DWORD64),
        ("R8", DWORD64),
        ("R9", DWORD64),
        ("R10", DWORD64),
        ("R11", DWORD64),
        ("R12", DWORD64),
        ("R13", DWORD64),
        ("R14", DWORD64),
        ("R15", DWORD64),
        ("Rip", DWORD64),
        ("DUMMYUNIONNAME",CONTEXT_UNION),
        ("VectorRegister", M128A * 26),

        ("VectorControl", DWORD64),
        ("DebugControl", DWORD64),
        ("LastBranchToRip", DWORD64),
        ("LastBranchFromRip", DWORD64),
        ("LastExceptionToRip", DWORD64),
        ("LastExceptionFromRip", DWORD64)
]
    
LPCONTEXT = ctypes.POINTER(CONTEXT)
  • my_debugger.py
import ctypes
import my_debugger_defines

kernel32 = ctypes.windll.kernel32

class debugger():
    def __init__(self):
        self.h_process = None
        self.pid = None
        self.debugger_active = False
        # 实现调试事件处理功能
        self.h_thread = None
        self.context = None
        self.exception = None
        self.exception_address = None
        # 软断点
        self.breakpoints = {}
        self.first_breakpoint = True

    def load(self, path_to_exe):
        creation_flags = my_debugger_defines.DEBUG_PROCESS

        startupinfo = my_debugger_defines.STARTUPINFO()
        process_information = my_debugger_defines.PROCESS_INFORMATION()

        startupinfo.dwFlags = 0x1
        startupinfo.wShowWindow = 0x0

        startupinfo.cb = ctypes.sizeof(startupinfo)

        if kernel32.CreateProcessW(path_to_exe,
                                  None,
                                  None,
                                  None,
                                  None,
                                  creation_flags,
                                  None,
                                  None,
                                  ctypes.byref(startupinfo),
                                  ctypes.byref(process_information)):
            print("[*] We have successfully lanuched the process!")
            print("[*] PID: %d" % process_information.dwProcessId)

            # 保存上面创建进程的句柄,以供后续进程访问使用
            self.h_process = self.open_process(process_information.dwProcessId)

        else:
            print("[*] Error: 0x%08x." % kernel32.GetLastError())

    # 打开 pid 进程,获得该进程的所有权限
    # 返回该进程句柄
    def open_process(self, pid):
        return kernel32.OpenProcess(my_debugger_defines.PROCESS_ALL_ACCESS, False, pid)
    
    def attach(self, pid):
        self.h_process = self.open_process(pid)
        # DebugActiveProcess 附加进程
        if kernel32.DebugActiveProcess(pid):
            self.debugger_active = True
            self.pid = int(pid)
            # self.run()
        else:
            print("[*] Unable to attach to the process.")
    
    # 循环调用 WaitForDebugEvent ,等待调试事件发生
    def run(self):
        while self.debugger_active == True:
            self.get_debug_event()

    def get_debug_event(self):
        debug_event = my_debugger_defines.DEBUG_EVENT()
        continue_status = my_debugger_defines.DBG_CONTINUE
        
        # INFINITE:直到有调试事件发生
        # 一旦有调试事件,WaitForDebugEvent会更新debug_event结构体
        if kernel32.WaitForDebugEvent(ctypes.byref(debug_event), my_debugger_defines.INFINITE):
        # 处理指定异常事件
            # 获取线程句柄
            self.h_thread = self.open_thread(debug_event.dwThreadId)
            self.context = self.get_thread_context(h_thread=self.h_thread)
            # 打印调试事件类型和线程ID
            print("Event Code: %d Thread ID: %d" % (debug_event.dwDebugEventCode, debug_event.dwThreadId))

            # Event Code = 0x1
            if debug_event.dwDebugEventCode == my_debugger_defines.EXCEPTION_DEBUG_EVENT:
                # 获取异常代码
                exception = debug_event.u.Exception.ExceptionRecord.ExceptionCode
                # 异常地址
                self.exception_address = debug_event.u.Exception.ExceptionRecord.ExceptionAddress
                # 内存违法访问
                if exception == my_debugger_defines.EXCEPTION_ACCESS_VIOLATION:
                    print("Acess Violation Detected.")
                # 遇到一个断点
                elif exception == my_debugger_defines.EXCEPTION_BREAKPOINT:
                    continue_status = self.exception_handler_breakpoint()
                # 内存断点
                elif exception == my_debugger_defines.EXCEPTION_GUARD_PAGE:
                    print("Guard Page Access Detected.")
                # 硬件断点
                elif exception == my_debugger_defines.EXCEPTION_SINGLE_STEP:
                    print("Single Stepping.")

            # input("press a key to continue...")
            # self.debugger_active = False

            # 调试事件处理完成后,恢复原执行状态
            # continue_status -> DBG_CONTINUE:下一步继续执行
            kernel32.ContinueDebugEvent(
                debug_event.dwProcessId,
                debug_event.dwThreadId,
                continue_status
            )
    def exception_handler_breakpoint(self):
        print("[*] Exception address: 0x%08x" % self.exception_address)
        # 检查断点是否为我们设置的断点
        if not (self.exception_address in self.breakpoints):           
                # 如果它是第一个Windows驱动的断点
                # 就继续执行
                if self.first_breakpoint == True:
                   self.first_breakpoint = False
                   print("[*] Hit the first breakpoint.")
                   return my_debugger_defines.DBG_CONTINUE
               
        else:
            print("[*] Hit user defined breakpoint.")
            # 处理设置的断点
            # 先还原原始数据(没设断点之前)
            self.write_process_memory(self.exception_address, self.breakpoints[self.exception_address])

            # 获取新的context
            # 将EIP重置回原始字节处            
            self.context = self.get_thread_context(h_thread=self.h_thread)
            if self.context:
                self.context.Rip -= 1
                # 用新的RIP值设置线程的上下文记录
                kernel32.SetThreadContext(self.h_thread,ctypes.byref(self.context))
                self.debugger_active = True
            else:
                self.debugger_active = False
                
            continue_status = my_debugger_defines.DBG_CONTINUE


        return continue_status

    # DebugActiveProcessStop:去除附加
    def detach(self):
        if kernel32.DebugActiveProcessStop(self.pid):
            print("[*] Finished debugging. Exiting...")
            return True
        else:
            print("There was an error")
            return False
    
    # 通过线程ID打开线程,获取线程句柄
    def open_thread(self, thread_id):
        h_thread = kernel32.OpenThread(my_debugger_defines.THREAD_ALL_ACCESS, None, thread_id)
        if h_thread is not None:
            return h_thread
        else:
            print("[*] Could not obtain a valid thread handle.")
            return False
    
    # 获取属于pid进程的线程的ID
    def enumerate_threads(self):
        # THREADENTRY32:描述拍摄快照时系统中执行的线程的信息
        thread_entry = my_debugger_defines.THREADENTRY32()
        thread_list = []
        # CreateToolhelp32Snapshot返回一个快照句柄
        # TH32CS_SNAPTHREAD: 枚举快照中系统的所有线程
        # pid只有TH32CS_SNAPHEAPLIST, TH32CS_SNAPMODULE, TH32CS_SNAPMODULE32, or TH32CS_SNAPALL才有意义,所以列举的线程不一定属于这个pid,
        # 需要一一对比
        snapshot = kernel32.CreateToolhelp32Snapshot(my_debugger_defines.TH32CS_SNAPTHREAD, self.pid)
        if snapshot is not None:
            thread_entry.dwSize = ctypes.sizeof(thread_entry)
            # 遍历第一个线程,线程信息写到 THREADENTRY32 这个结构体中
            success = kernel32.Thread32First(snapshot, ctypes.byref(thread_entry))
            while success:
                # 对比该线程的拥有者是不是 pid
                if thread_entry.th32OwnerProcessID == self.pid:
                    # 把该进程的线程ID放到列表
                    thread_list.append(thread_entry.th32ThreadID)
                # 遍历后面所有线程
                success = kernel32.Thread32Next(snapshot, ctypes.byref(thread_entry))
            kernel32.CloseHandle(snapshot)
            return thread_list
        else:
            return False
    
    # 通过线程ID,获取线程上下文context
    def get_thread_context(self, thread_id=None, h_thread=None):
        # 线程上下文结构体为 CONTEXT
        context = my_debugger_defines.CONTEXT()
        context.ContextFlags = my_debugger_defines.CONTEXT_FULL | my_debugger_defines.CONTEXT_DEBUG_REGISTERS

        # 通过线程ID,获取线程句柄
        if h_thread is None:
            self.h_thread = self.open_thread(thread_id)
        # 通过线程句柄,线程上下文
        kernel32.GetThreadContext.argtypes = [ctypes.c_void_p, my_debugger_defines.LPCONTEXT]
        kernel32.GetThreadContext.restype = ctypes.c_bool
        if kernel32.GetThreadContext(self.h_thread, ctypes.byref(context)):
            kernel32.CloseHandle(h_thread)
            return context
        else:
            # return my_debugger_defines.CONTEXT()
            return False
        
    # 读取address处的内容length个字节
    # 返回读取到的内容
    def read_process_memory(self, address, length):
        data = b""
        read_buf = ctypes.create_string_buffer(length)
        count = ctypes.c_ulong(0)
        kernel32.ReadProcessMemory.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_ulong, ctypes.POINTER(ctypes.c_ulong)]
        kernel32.ReadProcessMemory.restype = ctypes.c_bool
        if not kernel32.ReadProcessMemory(self.h_process, address, read_buf, length, ctypes.byref(count)):
            return False
        else:
            data += read_buf.raw
            return data
    
    # 把data数据写入到address处
    def write_process_memory(self, address, data):
        count = ctypes.c_ulong(0)
        length = len(data)
        c_data = ctypes.c_char_p(data[count.value:])
        kernel32.WriteProcessMemory.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_ulong, ctypes.POINTER(ctypes.c_ulong)]
        kernel32.WriteProcessMemory.restype = ctypes.c_bool
        if not kernel32.WriteProcessMemory(self.h_process, address, c_data, length, ctypes.byref(count)):
            return False
        else:
            return True
    
    def bp_set(self, address):
        print("[*] Setting breakpoint at: 0x%08x" % address)
        if not (address in self.breakpoints):
            # 设置断点前,先保存之前的数据
            old_protect = ctypes.c_ulong(0)
            kernel32.VirtualProtectEx.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_ulong, my_debugger_defines.DWORD, ctypes.POINTER(ctypes.c_ulong)]
            kernel32.VirtualProtectEx.restype = ctypes.c_bool
            kernel32.VirtualProtectEx(self.h_process, address, 1, my_debugger_defines.PAGE_EXECUTE_READWRITE, ctypes.byref(old_protect))
            
            # 读取原始1个字节
            original_byte = self.read_process_memory(address, 1)
            if original_byte != False:
                # 写入 INT3 opcode
                if self.write_process_memory(address, b"\xCC"):
                    # 在字典中注册断点
                    self.breakpoints[address] = original_byte
                    return True
                else:
                    return False
    
    # 解析模块中函数的地址
    def func_resolve(self, dll, function):

        # HMODULE GetModuleHandleW(
        #     [in, optional] LPCWSTR lpModuleName
        # );
        kernel32.GetModuleHandleW.argtypes = [ctypes.c_wchar_p]
        kernel32.GetModuleHandleW.restype = ctypes.c_void_p
        # FARPROC GetProcAddress(
        #     [in] HMODULE hModule,
        #     [in] LPCSTR  lpProcName
        # );
        kernel32.GetProcAddress.argtypes = [ctypes.c_void_p, ctypes.c_char_p]
        kernel32.GetProcAddress.restype = ctypes.c_void_p

        handle = kernel32.GetModuleHandleW(dll)

        address = kernel32.GetProcAddress(handle, function)

        kernel32.CloseHandle.argtypes = [ctypes.c_void_p]
        kernel32.CloseHandle.restype = ctypes.c_bool
        kernel32.CloseHandle(handle)
        return address
  • my_test.py
import my_debugger

debugger = my_debugger.debugger()

pid = input("Enter the PID of the process to attach to:")

debugger.attach(int(pid))

printf_address = debugger.func_resolve("msvcrt.dll", b"printf")

print("[*] Address of printf: 0x%08x" % printf_address)

debugger.bp_set(printf_address)

debugger.run()

debugger.detach()
  • printf_loop.py

这个是被调试程序

import ctypes
import time

msvcrt = ctypes.cdll.msvcrt

counter = 0

while True:
    msvcrt.printf(b"Loop iteration %d!\n", counter)
    time.sleep(2)
    counter += 1

运行结果正好符合预期

Event Code: 6 Thread ID: 18744
Event Code: 6 Thread ID: 18744
Event Code: 6 Thread ID: 18744
Event Code: 2 Thread ID: 18880
Event Code: 1 Thread ID: 18880
[*] Exception address: 0x7ffe68e10b10
[*] Hit the first breakpoint.
Event Code: 4 Thread ID: 18880
Event Code: 1 Thread ID: 18744
[*] Exception address: 0x7ffe68b88b50
[*] Hit user defined breakpoint.
[*] Finished debugging. Exiting...

硬件断点

这里define修改了CONTEXT结构体,之前没有影响,在硬件断点出现故障了

  • my_debugger_define.py
import ctypes

BYTE = ctypes.c_ubyte
WORD = ctypes.c_ushort
DWORD = ctypes.c_ulong
LPBYTE = ctypes.POINTER(ctypes.c_ubyte)
LPTSTR = ctypes.POINTER(ctypes.c_wchar)
LPCSTR = ctypes.POINTER(ctypes.c_char)
HANDLE = ctypes.c_void_p
PVOID = ctypes.c_void_p
UINT_PTR  = ctypes.c_ulong


DEBUG_PROCESS = 0x00000001
CREATE_NEW_CONSOLE = 0x00000010
PROCESS_ALL_ACCESS = 0x001F0FFF

DBG_CONTINUE = 0x00010002
DBG_EXCEPTION_NOT_HANDLED = 0x80010001

INFINITE = 0x00010002
THREAD_ALL_ACCESS = 0x001F03FF

# snapshot
TH32CS_SNAPTHREAD   = 0x00000004

# Context flags for GetThreadContext()
CONTEXT_FULL                   = 0x00010007
CONTEXT_DEBUG_REGISTERS        = 0x00010010

# Debug event constants
EXCEPTION_DEBUG_EVENT      =    0x1

# debug exception codes.
EXCEPTION_ACCESS_VIOLATION     = 0xC0000005
EXCEPTION_BREAKPOINT           = 0x80000003
EXCEPTION_GUARD_PAGE           = 0x80000001
EXCEPTION_SINGLE_STEP          = 0x80000004

# Memory page permissions, used by VirtualProtect()
PAGE_EXECUTE_READWRITE         = 0x00000040

# Hardware breakpoint conditions
HW_ACCESS                      = 0x00000003
HW_EXECUTE                     = 0x00000000
HW_WRITE                       = 0x00000001

# 定义CreateProcessA需要的结构体
# typedef struct _STARTUPINFO {  
#     DWORD cb;  
#     LPTSTR lpReserved;  
#     LPTSTR lpDesktop;  
#     LPTSTR lpTitle;  
#     DWORD dwX;  
#     DWORD dwY;  
#     DWORD dwXSize;  
#     DWORD dwYSize;  
#     DWORD dwXCountChars;  
#     DWORD dwYCountChars;  
#     DWORD dwFillAttribute;  
#     DWORD dwFlags;  
#     WORD wShowWindow;  
#     WORD cbReserved2;  
#     LPBYTE lpReserved2;  
#     HANDLE hStdInput;  
#     HANDLE hStdOutput;  
#     HANDLE hStdError;
# } STARTUPINFO,  *LPSTARTUPINFO;

class STARTUPINFO(ctypes.Structure):
    _fields_ = [
        ("cb", DWORD),
        ("lpReserved", LPTSTR),
        ("lpDesktop", LPTSTR),
        ("lpTitle", LPTSTR),
        ("dwX", DWORD),
        ("dwY", DWORD),
        ("dwXSize", DWORD),
        ("dwYSize", DWORD),
        ("dwXcountChars", DWORD),
        ("dwYcountChars", DWORD),
        ("dwFillAttribute", DWORD),
        ("dwFlags", DWORD),
        ("wShowWindow", WORD),
        ("cbReserved2", WORD),
        ("lpReserved2", LPBYTE),
        ("hStdIput", HANDLE),
        ("hStdOutput", HANDLE),
        ("hStdError", HANDLE)   
    ]

# typedef struct _PROCESS_INFORMATION {  
#     HANDLE hProcess;  
#     HANDLE hThread;  
#     DWORD dwProcessId;  
#     DWORD dwThreadId;
# } PROCESS_INFORMATION,  *LPPROCESS_INFORMATION;

class PROCESS_INFORMATION(ctypes.Structure):
    _fields_ = [
        ("hProcess",    HANDLE),
        ("hThread",     HANDLE),
        ("dwProcessId", DWORD),
        ("dwThreadId",  DWORD),
        ]

class EXCEPTION_RECORD(ctypes.Structure):
    pass
EXCEPTION_RECORD._fields_ = [
        ("ExceptionCode",        DWORD),
        ("ExceptionFlags",       DWORD),
        ("ExceptionRecord",      ctypes.POINTER(EXCEPTION_RECORD)),
        ("ExceptionAddress",     PVOID),
        ("NumberParameters",     DWORD),
        ("ExceptionInformation", UINT_PTR * 15),
        ]

class EXCEPTION_DEBUG_INFO(ctypes.Structure):
    _fields_ = [
        ("ExceptionRecord",    EXCEPTION_RECORD),
        ("dwFirstChance",      DWORD),
        ]
  
class DEBUG_EVENT_UNION(ctypes.Union):
    _fields_ = [
        ("Exception",         EXCEPTION_DEBUG_INFO),
#        ("CreateThread",      CREATE_THREAD_DEBUG_INFO),
#        ("CreateProcessInfo", CREATE_PROCESS_DEBUG_INFO),
#        ("ExitThread",        EXIT_THREAD_DEBUG_INFO),
#        ("ExitProcess",       EXIT_PROCESS_DEBUG_INFO),
#        ("LoadDll",           LOAD_DLL_DEBUG_INFO),
#        ("UnloadDll",         UNLOAD_DLL_DEBUG_INFO),
#        ("DebugString",       OUTPUT_DEBUG_STRING_INFO),
#        ("RipInfo",           RIP_INFO),
        ] 

class DEBUG_EVENT(ctypes.Structure):
    _fields_ = [
        ("dwDebugEventCode", DWORD),
        ("dwProcessId",      DWORD),
        ("dwThreadId",       DWORD),
        ("u",                DEBUG_EVENT_UNION),
        ]
    
class THREADENTRY32(ctypes.Structure):
    _fields_ = [
        ("dwSize",             DWORD),
        ("cntUsage",           DWORD),
        ("th32ThreadID",       DWORD),
        ("th32OwnerProcessID", DWORD),
        ("tpBasePri",          DWORD),
        ("tpDeltaPri",         DWORD),
        ("dwFlags",            DWORD),
    ]

class FLOATING_SAVE_AREA(ctypes.Structure):
   _fields_ = [
   
        ("ControlWord", DWORD),
        ("StatusWord", DWORD),
        ("TagWord", DWORD),
        ("ErrorOffset", DWORD),
        ("ErrorSelector", DWORD),
        ("DataOffset", DWORD),
        ("DataSelector", DWORD),
        ("RegisterArea", BYTE * 80),
        ("Cr0NpxState", DWORD),
]

# class CONTEXT(ctypes.Structure):
#     _fields_ = [
    
#         ("ContextFlags", DWORD),
#         ("Dr0", DWORD),
#         ("Dr1", DWORD),
#         ("Dr2", DWORD),
#         ("Dr3", DWORD),
#         ("Dr6", DWORD),
#         ("Dr7", DWORD),
#         ("FloatSave", FLOATING_SAVE_AREA),
#         ("SegGs", DWORD),
#         ("SegFs", DWORD),
#         ("SegEs", DWORD),
#         ("SegDs", DWORD),
#         ("Edi", DWORD),
#         ("Esi", DWORD),
#         ("Ebx", DWORD),
#         ("Edx", DWORD),
#         ("Ecx", DWORD),
#         ("Eax", DWORD),
#         ("Ebp", DWORD),
#         ("Eip", DWORD),
#         ("SegCs", DWORD),
#         ("EFlags", DWORD),
#         ("Esp", DWORD),
#         ("SegSs", DWORD),
#         ("ExtendedRegisters", BYTE * 512),
# ]

DWORD64 = ctypes.c_ulonglong
ULONGLONG = ctypes.c_ulonglong
LONGLONG = ctypes.c_longlong

class M128A(ctypes.Structure):
    _fields_ = [
        ("Low", ULONGLONG),
        ("High", LONGLONG)
    ]

class CONTEXT_UNION(ctypes.Union):
    _fields_ = [
        ("S", DWORD * 32)
    ]
class CONTEXT(ctypes.Structure):
    _fields_ = [
        ("P1Home", DWORD64),
        ("P2Home", DWORD64),
        ("P3Home", DWORD64),
        ("P4Home", DWORD64),
        ("P5Home", DWORD64),
        ("P6Home", DWORD64),
        ("ContextFlags", DWORD),
        ("MxCsr", DWORD),
        ("SegCs", WORD),
        ("SegDs", WORD),
        ("SegEs", WORD),
        ("SegFs", WORD),
        ("SegGs", WORD),
        ("SegSs", WORD),
        ("EFlags", DWORD),
        ("Dr0", DWORD64),
        ("Dr1", DWORD64),
        ("Dr2", DWORD64),
        ("Dr3", DWORD64),
        ("Dr6", DWORD64),
        ("Dr7", DWORD64),
        ("Rax", DWORD64),
        ("Rcx", DWORD64),
        ("Rdx", DWORD64),
        ("Rbx", DWORD64),
        ("Rsp", DWORD64),
        ("Rbp", DWORD64),
        ("Rsi", DWORD64),
        ("Rdi", DWORD64),
        ("R8", DWORD64),
        ("R9", DWORD64),
        ("R10", DWORD64),
        ("R11", DWORD64),
        ("R12", DWORD64),
        ("R13", DWORD64),
        ("R14", DWORD64),
        ("R15", DWORD64),
        ("Rip", DWORD64),
        ("DUMMYUNIONNAME",CONTEXT_UNION),
        ("VectorRegister", M128A * 26),

        ("VectorControl", DWORD64),
        ("DebugControl", DWORD64),
        ("LastBranchToRip", DWORD64),
        ("LastBranchFromRip", DWORD64),
        ("LastExceptionToRip", DWORD64),
        ("LastExceptionFromRip", DWORD64)
]
    
LPCONTEXT = ctypes.POINTER(CONTEXT)
  • my_debugger.py
import ctypes
import my_debugger_defines

kernel32 = ctypes.windll.kernel32

class debugger():
    def __init__(self):
        self.h_process = None
        self.pid = None
        self.debugger_active = False
        # 实现调试事件处理功能
        self.h_thread = None
        self.context = None
        self.exception = None
        self.exception_address = None
        # 软断点
        self.breakpoints = {}
        self.first_breakpoint = True
        # 硬中断
        self.hardware_breakpoints = {}

    def load(self, path_to_exe):
        creation_flags = my_debugger_defines.DEBUG_PROCESS

        startupinfo = my_debugger_defines.STARTUPINFO()
        process_information = my_debugger_defines.PROCESS_INFORMATION()

        startupinfo.dwFlags = 0x1
        startupinfo.wShowWindow = 0x0

        startupinfo.cb = ctypes.sizeof(startupinfo)

        if kernel32.CreateProcessW(path_to_exe,
                                  None,
                                  None,
                                  None,
                                  None,
                                  creation_flags,
                                  None,
                                  None,
                                  ctypes.byref(startupinfo),
                                  ctypes.byref(process_information)):
            print("[*] We have successfully lanuched the process!")
            print("[*] PID: %d" % process_information.dwProcessId)

            # 保存上面创建进程的句柄,以供后续进程访问使用
            self.h_process = self.open_process(process_information.dwProcessId)

        else:
            print("[*] Error: 0x%08x." % kernel32.GetLastError())

    # 打开 pid 进程,获得该进程的所有权限
    # 返回该进程句柄
    def open_process(self, pid):
        return kernel32.OpenProcess(my_debugger_defines.PROCESS_ALL_ACCESS, False, pid)
    
    def attach(self, pid):
        self.h_process = self.open_process(pid)
        # DebugActiveProcess 附加进程
        if kernel32.DebugActiveProcess(pid):
            self.debugger_active = True
            self.pid = int(pid)
            # self.run()
        else:
            print("[*] Unable to attach to the process.")
    
    # 循环调用 WaitForDebugEvent ,等待调试事件发生
    def run(self):
        while self.debugger_active == True:
            self.get_debug_event()

    def get_debug_event(self):
        debug_event = my_debugger_defines.DEBUG_EVENT()
        continue_status = my_debugger_defines.DBG_CONTINUE
        
        # INFINITE:直到有调试事件发生
        # 一旦有调试事件,WaitForDebugEvent会更新debug_event结构体
        if kernel32.WaitForDebugEvent(ctypes.byref(debug_event), my_debugger_defines.INFINITE):
        # 处理指定异常事件
            # 获取线程句柄
            self.h_thread = self.open_thread(debug_event.dwThreadId)
            self.context = self.get_thread_context(h_thread=self.h_thread)
            # 打印调试事件类型和线程ID
            print("Event Code: %d Thread ID: %d" % (debug_event.dwDebugEventCode, debug_event.dwThreadId))

            # Event Code = 0x1
            if debug_event.dwDebugEventCode == my_debugger_defines.EXCEPTION_DEBUG_EVENT:
                # 获取异常代码
                exception = debug_event.u.Exception.ExceptionRecord.ExceptionCode
                # print("0x%08x" % exception)
                # 异常地址
                self.exception_address = debug_event.u.Exception.ExceptionRecord.ExceptionAddress
                # 内存违法访问
                if exception == my_debugger_defines.EXCEPTION_ACCESS_VIOLATION:
                    print("Acess Violation Detected.")
                # 遇到一个断点
                elif exception == my_debugger_defines.EXCEPTION_BREAKPOINT:
                    continue_status = self.exception_handler_breakpoint()
                # 内存断点
                elif exception == my_debugger_defines.EXCEPTION_GUARD_PAGE:
                    print("Guard Page Access Detected.")
                # 硬件断点
                elif exception == my_debugger_defines.EXCEPTION_SINGLE_STEP:
                    print("Single Stepping.")
                    self.exception_handler_single_step()

            # input("press a key to continue...")
            # self.debugger_active = False

            # 调试事件处理完成后,恢复原执行状态
            # continue_status -> DBG_CONTINUE:下一步继续执行
            kernel32.ContinueDebugEvent(
                debug_event.dwProcessId,
                debug_event.dwThreadId,
                continue_status
            )
    def exception_handler_breakpoint(self):
        print("[*] Exception address: 0x%08x" % self.exception_address)
        # 检查断点是否为我们设置的断点
        if not (self.exception_address in self.breakpoints):           
                # 如果它是第一个Windows驱动的断点
                # 就继续执行
                if self.first_breakpoint == True:
                   self.first_breakpoint = False
                   print("[*] Hit the first breakpoint.")
                   return my_debugger_defines.DBG_CONTINUE
               
        else:
            print("[*] Hit user defined breakpoint.")
            # 处理设置的断点
            # 先还原原始数据(没设断点之前)
            self.write_process_memory(self.exception_address, self.breakpoints[self.exception_address])

            # 获取新的context
            # 将EIP重置回原始字节处            
            self.context = self.get_thread_context(h_thread=self.h_thread)
            if self.context:
                self.context.Rip -= 1
                # 用新的RIP值设置线程的上下文记录
                kernel32.SetThreadContext(self.h_thread,ctypes.byref(self.context))
                self.debugger_active = True
            else:
                self.debugger_active = False
                
            continue_status = my_debugger_defines.DBG_CONTINUE

        return continue_status
    
    def exception_handler_single_step(self):
        if self.context.Dr6 & 0x1 and (0 in self.hardware_breakpoints):
            slot = 0
        elif self.context.Dr6 & 0x2 and (1 in self.hardware_breakpoints):
            slot = 1
        elif self.context.Dr6 & 0x4 and (2 in self.hardware_breakpoints):
            slot = 2
        elif self.context.Dr6 & 0x8 and (3 in self.hardware_breakpoints):
            slot = 3
        else:
            continue_status = my_debugger_defines.DBG_EXCEPTION_NOT_HANDLED

        # 从断点字典中删除断点
        if self.bp_del_hw(slot):
            continue_status = my_debugger_defines.DBG_CONTINUE

        print("[*] Hardware breakpoint removed.")
        return continue_status
    
    def bp_del_hw(self, slot):
        for thread_id in self.enumerate_threads():

            context = self.get_thread_context(thread_id=thread_id)
            
            # 给所有线程删除断点
            context.Dr7 &= ~(1 << (slot * 2))

            # 将断点地址清零
            if   slot == 0: 
                context.Dr0 = 0x0000000000000000
            elif slot == 1: 
                context.Dr1 = 0x0000000000000000
            elif slot == 2: 
                context.Dr2 = 0x0000000000000000
            elif slot == 3: 
                context.Dr3 = 0x0000000000000000

            # 移除Dr7中的触发断点标志位
            context.Dr7 &= ~(3 << ((slot * 4) + 16))

            # 移除断点长度标志位
            context.Dr7 &= ~(3 << ((slot * 4) + 18))

            # 提交移除断点后的线程context
            h_thread = self.open_thread(thread_id)
            kernel32.SetThreadContext(h_thread, ctypes.byref(context))
            
        # 把该断点从字典中移除
        del self.hardware_breakpoints[slot]

        return True


    # DebugActiveProcessStop:去除附加
    def detach(self):
        if kernel32.DebugActiveProcessStop(self.pid):
            print("[*] Finished debugging. Exiting...")
            return True
        else:
            print("There was an error")
            return False
    
    # 通过线程ID打开线程,获取线程句柄
    def open_thread(self, thread_id):
        h_thread = kernel32.OpenThread(my_debugger_defines.THREAD_ALL_ACCESS, None, thread_id)
        if h_thread is not None:
            return h_thread
        else:
            print("[*] Could not obtain a valid thread handle.")
            return False
    
    # 获取属于pid进程的线程的ID
    def enumerate_threads(self):
        # THREADENTRY32:描述拍摄快照时系统中执行的线程的信息
        thread_entry = my_debugger_defines.THREADENTRY32()
        thread_list = []
        # CreateToolhelp32Snapshot返回一个快照句柄
        # TH32CS_SNAPTHREAD: 枚举快照中系统的所有线程
        # pid只有TH32CS_SNAPHEAPLIST, TH32CS_SNAPMODULE, TH32CS_SNAPMODULE32, or TH32CS_SNAPALL才有意义,所以列举的线程不一定属于这个pid,
        # 需要一一对比
        snapshot = kernel32.CreateToolhelp32Snapshot(my_debugger_defines.TH32CS_SNAPTHREAD, self.pid)
        if snapshot is not None:
            thread_entry.dwSize = ctypes.sizeof(thread_entry)
            # 遍历第一个线程,线程信息写到 THREADENTRY32 这个结构体中
            success = kernel32.Thread32First(snapshot, ctypes.byref(thread_entry))
            while success:
                # 对比该线程的拥有者是不是 pid
                if thread_entry.th32OwnerProcessID == self.pid:
                    # 把该进程的线程ID放到列表
                    thread_list.append(thread_entry.th32ThreadID)
                # 遍历后面所有线程
                success = kernel32.Thread32Next(snapshot, ctypes.byref(thread_entry))
            kernel32.CloseHandle(snapshot)
            return thread_list
        else:
            return False
    
    # 通过线程ID,获取线程上下文context
    def get_thread_context(self, thread_id=None, h_thread=None):
        # 线程上下文结构体为 CONTEXT
        context = my_debugger_defines.CONTEXT()
        context.ContextFlags = my_debugger_defines.CONTEXT_FULL | my_debugger_defines.CONTEXT_DEBUG_REGISTERS

        # 通过线程ID,获取线程句柄
        if h_thread is None:
            self.h_thread = self.open_thread(thread_id)
        # 通过线程句柄,线程上下文
        kernel32.GetThreadContext.argtypes = [ctypes.c_void_p, my_debugger_defines.LPCONTEXT]
        kernel32.GetThreadContext.restype = ctypes.c_bool
        if kernel32.GetThreadContext(self.h_thread, ctypes.byref(context)):
            kernel32.CloseHandle(h_thread)
            return context
        else:
            # return my_debugger_defines.CONTEXT()
            return False
        
    # 读取address处的内容length个字节
    # 返回读取到的内容
    def read_process_memory(self, address, length):
        data = b""
        read_buf = ctypes.create_string_buffer(length)
        count = ctypes.c_ulong(0)
        kernel32.ReadProcessMemory.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_ulong, ctypes.POINTER(ctypes.c_ulong)]
        kernel32.ReadProcessMemory.restype = ctypes.c_bool
        if not kernel32.ReadProcessMemory(self.h_process, address, read_buf, length, ctypes.byref(count)):
            return False
        else:
            data += read_buf.raw
            return data
    
    # 把data数据写入到address处
    def write_process_memory(self, address, data):
        count = ctypes.c_ulong(0)
        length = len(data)
        c_data = ctypes.c_char_p(data[count.value:])
        kernel32.WriteProcessMemory.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_ulong, ctypes.POINTER(ctypes.c_ulong)]
        kernel32.WriteProcessMemory.restype = ctypes.c_bool
        if not kernel32.WriteProcessMemory(self.h_process, address, c_data, length, ctypes.byref(count)):
            return False
        else:
            return True
    
    def bp_set(self, address):
        print("[*] Setting breakpoint at: 0x%08x" % address)
        if not (address in self.breakpoints):
            # 设置断点前,先保存之前的数据
            old_protect = ctypes.c_ulong(0)
            kernel32.VirtualProtectEx.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_ulong, my_debugger_defines.DWORD, ctypes.POINTER(ctypes.c_ulong)]
            kernel32.VirtualProtectEx.restype = ctypes.c_bool
            kernel32.VirtualProtectEx(self.h_process, address, 1, my_debugger_defines.PAGE_EXECUTE_READWRITE, ctypes.byref(old_protect))
            
            # 读取原始1个字节
            original_byte = self.read_process_memory(address, 1)
            if original_byte != False:
                # 写入 INT3 opcode
                if self.write_process_memory(address, b"\xCC"):
                    # 在字典中注册断点
                    self.breakpoints[address] = original_byte
                    return True
                else:
                    return False
    # 硬件断点
    def bp_set_hw(self, address, length, condition):
        # 检测硬件断点长度是否有效
        if length not in (1, 2, 4):
            return False
        else:
            length -= 1

        # 检测硬件断点的触发调试是否有效
        if condition not in (my_debugger_defines.HW_ACCESS, my_debugger_defines.HW_EXECUTE, my_debugger_defines.HW_WRITE):
            return False
        # 检测是否存在空置的调试器槽
        if not (0 in self.hardware_breakpoints):
            available = 0
        elif not (1 in self.hardware_breakpoints):
            available = 1
        elif not (2 in self.hardware_breakpoints):
            available = 2
        elif not (3 in self.hardware_breakpoints):
            available = 3
        else:
            return False
        
        # 在每个线程环境下设置调试寄存器
        for thread_id in self.enumerate_threads():
            context = self.get_thread_context(thread_id=thread_id)
            # 设置DR7中对应的标志位,激活硬件断点
            context.Dr7 |= 1 << (available * 2)
            # 在空闲的DR0 ~ 3寄存器中写入断点地址
            if available == 0:
                context.Dr0 = address
                # print("Dr0 0x%08x" % context.Dr0)
            elif available == 1:
                context.Dr1 = address
                # print("Dr1 0x%08x" % context.Dr1)
            elif available == 2:
                context.Dr2 = address
                # print("Dr2 0x%08x" % context.Dr2)
            elif available == 3:
                context.Dr3 = address
                # print("Dr3 0x%08x" % context.Dr3)
            # 设置触发条件
            context.Dr7 |= condition << ((available * 4) + 16)

            # 设置触发长度
            context.Dr7 |= length << ((available * 4) + 18)

            # 提交改动后的context
            h_thread = self.open_thread(thread_id)
            kernel32.SetThreadContext(h_thread, ctypes.byref(context))

        # 更新内部硬件断点列表
        self.hardware_breakpoints[available] = (address, length, condition)
        return True


    # 解析模块中函数的地址
    def func_resolve(self, dll, function):

        # HMODULE GetModuleHandleW(
        #     [in, optional] LPCWSTR lpModuleName
        # );
        kernel32.GetModuleHandleW.argtypes = [ctypes.c_wchar_p]
        kernel32.GetModuleHandleW.restype = ctypes.c_void_p
        # FARPROC GetProcAddress(
        #     [in] HMODULE hModule,
        #     [in] LPCSTR  lpProcName
        # );
        kernel32.GetProcAddress.argtypes = [ctypes.c_void_p, ctypes.c_char_p]
        kernel32.GetProcAddress.restype = ctypes.c_void_p

        handle = kernel32.GetModuleHandleW(dll)

        address = kernel32.GetProcAddress(handle, function)

        kernel32.CloseHandle.argtypes = [ctypes.c_void_p]
        kernel32.CloseHandle.restype = ctypes.c_bool
        kernel32.CloseHandle(handle)
        return address

  • printf_loop.py
import ctypes
import time

msvcrt = ctypes.cdll.msvcrt

counter = 0

while True:
    msvcrt.printf(b"Loop iteration %d!\n", counter)
    time.sleep(2)
    counter += 1
  • my_test.py
import my_debugger
from my_debugger_defines import *

debugger = my_debugger.debugger()

pid = input("Enter the PID of the process to attach to:")

debugger.attach(int(pid))

printf_address = debugger.func_resolve("msvcrt.dll", b"printf")

print("[*] Address of printf: 0x%08x" % printf_address)

debugger.bp_set_hw(printf_address, 1, HW_EXECUTE)

debugger.run()

debugger.detach()

运行结果(这里显示了一些调试的print)

Event Code: 2 Thread ID: 9928
Event Code: 1 Thread ID: 9928
0x80000003
[*] Exception address: 0x7ffe68e10b10
[*] Hit the first breakpoint.
Event Code: 4 Thread ID: 9928
Event Code: 1 Thread ID: 18456
0x80000004
Single Stepping.
[*] Hardware breakpoint removed.
Event Code: 2 Thread ID: 21584

内存断点

内存断点本质上不是真正的断点。当一个调试器设置一个内存断点时,调试器实质上所做的是改变一个内存区域或–个内存页的访问权限。内存页是操作系统可以一次处理的最小内存块。操作系统每分配一个内存页时,都会为这个内存页设置访问权限,该权限决定了这个内存页被访问的方式。以下是几个不同的内存页访问权限的例子:

  • 页可执行:允许进程执行页上的代码,但是如果进程试图读写这个页将导致非法内存操作异常;
  • 页可读:进程只能从这个内存页中读取数据;任何企图写入数据或者执行代码的操作会导致非法内存操作异常;
  • 页可写:允许进程在这个内存页上写入数据;
  • 保护页:对保护页任何类型的访问将导致一次性异常, 之后这个内存页会恢复到之前的状态。

主要过程

VirtualQueryEx :找内存断点区域的内存页
VirtualProtectEx :修改内存页属性为保护页

  • my_debugger_define.py
import ctypes

BYTE = ctypes.c_ubyte
WORD = ctypes.c_ushort
DWORD = ctypes.c_ulong
LPBYTE = ctypes.POINTER(ctypes.c_ubyte)
LPTSTR = ctypes.POINTER(ctypes.c_wchar)
LPCSTR = ctypes.POINTER(ctypes.c_char)
LPVOID    = ctypes.c_void_p
HANDLE = ctypes.c_void_p
PVOID = ctypes.c_void_p
UINT_PTR  = ctypes.c_ulong
SIZE_T    = ctypes.c_ulong


DEBUG_PROCESS = 0x00000001
CREATE_NEW_CONSOLE = 0x00000010
PROCESS_ALL_ACCESS = 0x001F0FFF

DBG_CONTINUE = 0x00010002
DBG_EXCEPTION_NOT_HANDLED = 0x80010001

INFINITE = 0x00010002
THREAD_ALL_ACCESS = 0x001F03FF

# snapshot
TH32CS_SNAPTHREAD   = 0x00000004

# Context flags for GetThreadContext()
CONTEXT_FULL                   = 0x00010007
CONTEXT_DEBUG_REGISTERS        = 0x00010010

# Debug event constants
EXCEPTION_DEBUG_EVENT      =    0x1

# debug exception codes.
EXCEPTION_ACCESS_VIOLATION     = 0xC0000005
EXCEPTION_BREAKPOINT           = 0x80000003
EXCEPTION_GUARD_PAGE           = 0x80000001
EXCEPTION_SINGLE_STEP          = 0x80000004

# Memory page permissions, used by VirtualProtect()
PAGE_EXECUTE_READWRITE         = 0x00000040

# Hardware breakpoint conditions
HW_ACCESS                      = 0x00000003
HW_EXECUTE                     = 0x00000000
HW_WRITE                       = 0x00000001

# Memory page permissions, used by VirtualProtect()
PAGE_NOACCESS                  = 0x00000001
PAGE_READONLY                  = 0x00000002
PAGE_READWRITE                 = 0x00000004
PAGE_WRITECOPY                 = 0x00000008
PAGE_EXECUTE                   = 0x00000010
PAGE_EXECUTE_READ              = 0x00000020
PAGE_EXECUTE_READWRITE         = 0x00000040
PAGE_EXECUTE_WRITECOPY         = 0x00000080
PAGE_GUARD                     = 0x00000100
PAGE_NOCACHE                   = 0x00000200
PAGE_WRITECOMBINE              = 0x00000400

# 定义CreateProcessA需要的结构体
# typedef struct _STARTUPINFO {  
#     DWORD cb;  
#     LPTSTR lpReserved;  
#     LPTSTR lpDesktop;  
#     LPTSTR lpTitle;  
#     DWORD dwX;  
#     DWORD dwY;  
#     DWORD dwXSize;  
#     DWORD dwYSize;  
#     DWORD dwXCountChars;  
#     DWORD dwYCountChars;  
#     DWORD dwFillAttribute;  
#     DWORD dwFlags;  
#     WORD wShowWindow;  
#     WORD cbReserved2;  
#     LPBYTE lpReserved2;  
#     HANDLE hStdInput;  
#     HANDLE hStdOutput;  
#     HANDLE hStdError;
# } STARTUPINFO,  *LPSTARTUPINFO;

class STARTUPINFO(ctypes.Structure):
    _fields_ = [
        ("cb", DWORD),
        ("lpReserved", LPTSTR),
        ("lpDesktop", LPTSTR),
        ("lpTitle", LPTSTR),
        ("dwX", DWORD),
        ("dwY", DWORD),
        ("dwXSize", DWORD),
        ("dwYSize", DWORD),
        ("dwXcountChars", DWORD),
        ("dwYcountChars", DWORD),
        ("dwFillAttribute", DWORD),
        ("dwFlags", DWORD),
        ("wShowWindow", WORD),
        ("cbReserved2", WORD),
        ("lpReserved2", LPBYTE),
        ("hStdIput", HANDLE),
        ("hStdOutput", HANDLE),
        ("hStdError", HANDLE)   
    ]

# typedef struct _PROCESS_INFORMATION {  
#     HANDLE hProcess;  
#     HANDLE hThread;  
#     DWORD dwProcessId;  
#     DWORD dwThreadId;
# } PROCESS_INFORMATION,  *LPPROCESS_INFORMATION;

class PROCESS_INFORMATION(ctypes.Structure):
    _fields_ = [
        ("hProcess",    HANDLE),
        ("hThread",     HANDLE),
        ("dwProcessId", DWORD),
        ("dwThreadId",  DWORD),
        ]

class EXCEPTION_RECORD(ctypes.Structure):
    pass
EXCEPTION_RECORD._fields_ = [
        ("ExceptionCode",        DWORD),
        ("ExceptionFlags",       DWORD),
        ("ExceptionRecord",      ctypes.POINTER(EXCEPTION_RECORD)),
        ("ExceptionAddress",     PVOID),
        ("NumberParameters",     DWORD),
        ("ExceptionInformation", UINT_PTR * 15),
        ]

class EXCEPTION_DEBUG_INFO(ctypes.Structure):
    _fields_ = [
        ("ExceptionRecord",    EXCEPTION_RECORD),
        ("dwFirstChance",      DWORD),
        ]
  
class DEBUG_EVENT_UNION(ctypes.Union):
    _fields_ = [
        ("Exception",         EXCEPTION_DEBUG_INFO),
#        ("CreateThread",      CREATE_THREAD_DEBUG_INFO),
#        ("CreateProcessInfo", CREATE_PROCESS_DEBUG_INFO),
#        ("ExitThread",        EXIT_THREAD_DEBUG_INFO),
#        ("ExitProcess",       EXIT_PROCESS_DEBUG_INFO),
#        ("LoadDll",           LOAD_DLL_DEBUG_INFO),
#        ("UnloadDll",         UNLOAD_DLL_DEBUG_INFO),
#        ("DebugString",       OUTPUT_DEBUG_STRING_INFO),
#        ("RipInfo",           RIP_INFO),
        ] 

class DEBUG_EVENT(ctypes.Structure):
    _fields_ = [
        ("dwDebugEventCode", DWORD),
        ("dwProcessId",      DWORD),
        ("dwThreadId",       DWORD),
        ("u",                DEBUG_EVENT_UNION),
        ]
    
class THREADENTRY32(ctypes.Structure):
    _fields_ = [
        ("dwSize",             DWORD),
        ("cntUsage",           DWORD),
        ("th32ThreadID",       DWORD),
        ("th32OwnerProcessID", DWORD),
        ("tpBasePri",          DWORD),
        ("tpDeltaPri",         DWORD),
        ("dwFlags",            DWORD),
    ]

class FLOATING_SAVE_AREA(ctypes.Structure):
   _fields_ = [
   
        ("ControlWord", DWORD),
        ("StatusWord", DWORD),
        ("TagWord", DWORD),
        ("ErrorOffset", DWORD),
        ("ErrorSelector", DWORD),
        ("DataOffset", DWORD),
        ("DataSelector", DWORD),
        ("RegisterArea", BYTE * 80),
        ("Cr0NpxState", DWORD),
]

# class CONTEXT(ctypes.Structure):
#     _fields_ = [
    
#         ("ContextFlags", DWORD),
#         ("Dr0", DWORD),
#         ("Dr1", DWORD),
#         ("Dr2", DWORD),
#         ("Dr3", DWORD),
#         ("Dr6", DWORD),
#         ("Dr7", DWORD),
#         ("FloatSave", FLOATING_SAVE_AREA),
#         ("SegGs", DWORD),
#         ("SegFs", DWORD),
#         ("SegEs", DWORD),
#         ("SegDs", DWORD),
#         ("Edi", DWORD),
#         ("Esi", DWORD),
#         ("Ebx", DWORD),
#         ("Edx", DWORD),
#         ("Ecx", DWORD),
#         ("Eax", DWORD),
#         ("Ebp", DWORD),
#         ("Eip", DWORD),
#         ("SegCs", DWORD),
#         ("EFlags", DWORD),
#         ("Esp", DWORD),
#         ("SegSs", DWORD),
#         ("ExtendedRegisters", BYTE * 512),
# ]

DWORD64 = ctypes.c_ulonglong
ULONGLONG = ctypes.c_ulonglong
LONGLONG = ctypes.c_longlong

class M128A(ctypes.Structure):
    _fields_ = [
        ("Low", ULONGLONG),
        ("High", LONGLONG)
    ]

# class DUMMYUNIONNAME(Union):
#     _fields_=[
#               ("FltSave", XMM_SAVE_AREA32),
#               ("DummyStruct", DUMMYSTRUCTNAME)
#               ]
 
# class DUMMYSTRUCTNAME(Structure):
#     _fields_=[
#               ("Header", M128A * 2),
#               ("Legacy", M128A * 8),
#               ("Xmm0", M128A),
#               ("Xmm1", M128A),
#               ("Xmm2", M128A),
#               ("Xmm3", M128A),
#               ("Xmm4", M128A),
#               ("Xmm5", M128A),
#               ("Xmm6", M128A),
#               ("Xmm7", M128A),
#               ("Xmm8", M128A),
#               ("Xmm9", M128A),
#               ("Xmm10", M128A),
#               ("Xmm11", M128A),
#               ("Xmm12", M128A),
#               ("Xmm13", M128A),
#               ("Xmm14", M128A),
#               ("Xmm15", M128A)
#               ]
 
# class XMM_SAVE_AREA32(Structure):
#     _pack_ = 1 
#     _fields_ = [  
#                 ('ControlWord', WORD), 
#                 ('StatusWord', WORD), 
#                 ('TagWord', BYTE), 
#                 ('Reserved1', BYTE), 
#                 ('ErrorOpcode', WORD), 
#                 ('ErrorOffset', DWORD), 
#                 ('ErrorSelector', WORD), 
#                 ('Reserved2', WORD), 
#                 ('DataOffset', DWORD), 
#                 ('DataSelector', WORD), 
#                 ('Reserved3', WORD), 
#                 ('MxCsr', DWORD), 
#                 ('MxCsr_Mask', DWORD), 
#                 ('FloatRegisters', M128A * 8), 
#                 ('XmmRegisters', M128A * 16), 
#                 ('Reserved4', BYTE * 96)
#                 ] 

class CONTEXT_UNION(ctypes.Union):
    _fields_ = [
        ("S", DWORD * 32)
    ]
class CONTEXT(ctypes.Structure):
    _fields_ = [
        ("P1Home", DWORD64),
        ("P2Home", DWORD64),
        ("P3Home", DWORD64),
        ("P4Home", DWORD64),
        ("P5Home", DWORD64),
        ("P6Home", DWORD64),
        ("ContextFlags", DWORD),
        ("MxCsr", DWORD),
        ("SegCs", WORD),
        ("SegDs", WORD),
        ("SegEs", WORD),
        ("SegFs", WORD),
        ("SegGs", WORD),
        ("SegSs", WORD),
        ("EFlags", DWORD),
        ("Dr0", DWORD64),
        ("Dr1", DWORD64),
        ("Dr2", DWORD64),
        ("Dr3", DWORD64),
        ("Dr6", DWORD64),
        ("Dr7", DWORD64),
        ("Rax", DWORD64),
        ("Rcx", DWORD64),
        ("Rdx", DWORD64),
        ("Rbx", DWORD64),
        ("Rsp", DWORD64),
        ("Rbp", DWORD64),
        ("Rsi", DWORD64),
        ("Rdi", DWORD64),
        ("R8", DWORD64),
        ("R9", DWORD64),
        ("R10", DWORD64),
        ("R11", DWORD64),
        ("R12", DWORD64),
        ("R13", DWORD64),
        ("R14", DWORD64),
        ("R15", DWORD64),
        ("Rip", DWORD64),
        ("DUMMYUNIONNAME",CONTEXT_UNION),
        ("VectorRegister", M128A * 26),

        ("VectorControl", DWORD64),
        ("DebugControl", DWORD64),
        ("LastBranchToRip", DWORD64),
        ("LastBranchFromRip", DWORD64),
        ("LastExceptionToRip", DWORD64),
        ("LastExceptionFromRip", DWORD64)
]
    
LPCONTEXT = ctypes.POINTER(CONTEXT)


class PROC_STRUCT(ctypes.Structure):
    _fields_ = [
        ("wProcessorArchitecture",    WORD),
        ("wReserved",                 WORD),
]
class SYSTEM_INFO_UNION(ctypes.Union):
    _fields_ = [
        ("dwOemId",    DWORD),
        ("sProcStruc", PROC_STRUCT),
]
class SYSTEM_INFO(ctypes.Structure):
    _fields_ = [
        ("uSysInfo", SYSTEM_INFO_UNION),
        ("dwPageSize", DWORD),
        ("lpMinimumApplicationAddress", LPVOID),
        ("lpMaximumApplicationAddress", LPVOID),
        ("dwActiveProcessorMask", DWORD),
        ("dwNumberOfProcessors", DWORD),
        ("dwProcessorType", DWORD),
        ("dwAllocationGranularity", DWORD),
        ("wProcessorLevel", WORD),
        ("wProcessorRevision", WORD),
]
    

# class MEMORY_BASIC_INFORMATION(ctypes.Structure):
#     _fields_ = [
#         ("BaseAddress", PVOID),
#         ("AllocationBase", PVOID),
#         ("AllocationProtect", DWORD),
#         ("PartitionId", WORD),
#         ("RegionSize", SIZE_T),
#         ("State", DWORD),
#         ("Protect", DWORD),
#         ("Type", DWORD),
# ]
class MEMORY_BASIC_INFORMATION (ctypes.Structure):
    _fields_ = [
        ("BaseAddress", ULONGLONG),
        ("AllocationBase", ULONGLONG),
        ("AllocationProtect", DWORD),
        ("__alignment1", DWORD),
        ("RegionSize", ULONGLONG),
        ("State", DWORD),
        ("Protect", DWORD),
        ("Type", DWORD),
        ("__alignment2", DWORD)
        ]
PMEMORY_BASIC_INFORMATION = ctypes.POINTER(MEMORY_BASIC_INFORMATION)
  • my_debugger.py
import ctypes
import my_debugger_defines

kernel32 = ctypes.windll.kernel32

class debugger():
    def __init__(self):
        self.h_process = None
        self.pid = None
        self.debugger_active = False
        # 实现调试事件处理功能
        self.h_thread = None
        self.context = None
        self.exception = None
        self.exception_address = None
        # 软断点
        self.breakpoints = {}
        self.first_breakpoint = True
        # 硬件断点
        self.hardware_breakpoints = {}

        # 内存断点
        self.guarded_pages = []
        self.memory_breakpoints = {}
        # 获取当前系统默认的内存页的大小设定
        system_info = my_debugger_defines.SYSTEM_INFO()
        kernel32.GetSystemInfo(ctypes.byref(system_info))
        self.page_size = system_info.dwPageSize

    def load(self, path_to_exe):
        creation_flags = my_debugger_defines.DEBUG_PROCESS

        startupinfo = my_debugger_defines.STARTUPINFO()
        process_information = my_debugger_defines.PROCESS_INFORMATION()

        startupinfo.dwFlags = 0x1
        startupinfo.wShowWindow = 0x0

        startupinfo.cb = ctypes.sizeof(startupinfo)

        if kernel32.CreateProcessW(path_to_exe,
                                  None,
                                  None,
                                  None,
                                  None,
                                  creation_flags,
                                  None,
                                  None,
                                  ctypes.byref(startupinfo),
                                  ctypes.byref(process_information)):
            print("[*] We have successfully lanuched the process!")
            print("[*] PID: %d" % process_information.dwProcessId)

            # 保存上面创建进程的句柄,以供后续进程访问使用
            self.h_process = self.open_process(process_information.dwProcessId)

        else:
            print("[*] Error: 0x%08x." % kernel32.GetLastError())

    # 打开 pid 进程,获得该进程的所有权限
    # 返回该进程句柄
    def open_process(self, pid):
        return kernel32.OpenProcess(my_debugger_defines.PROCESS_ALL_ACCESS, False, pid)
    
    def attach(self, pid):
        self.h_process = self.open_process(pid)
        # DebugActiveProcess 附加进程
        if kernel32.DebugActiveProcess(pid):
            self.debugger_active = True
            self.pid = int(pid)
            # self.run()
        else:
            print("[*] Unable to attach to the process.")
    
    # 循环调用 WaitForDebugEvent ,等待调试事件发生
    def run(self):
        while self.debugger_active == True:
            self.get_debug_event()

    def get_debug_event(self):
        debug_event = my_debugger_defines.DEBUG_EVENT()
        continue_status = my_debugger_defines.DBG_CONTINUE
        
        # INFINITE:直到有调试事件发生
        # 一旦有调试事件,WaitForDebugEvent会更新debug_event结构体
        if kernel32.WaitForDebugEvent(ctypes.byref(debug_event), my_debugger_defines.INFINITE):
        # 处理指定异常事件
            # 获取线程句柄
            self.h_thread = self.open_thread(debug_event.dwThreadId)
            self.context = self.get_thread_context(h_thread=self.h_thread)
            # 打印调试事件类型和线程ID
            print("Event Code: %d Thread ID: %d" % (debug_event.dwDebugEventCode, debug_event.dwThreadId))

            # Event Code = 0x1
            if debug_event.dwDebugEventCode == my_debugger_defines.EXCEPTION_DEBUG_EVENT:
                # 获取异常代码
                exception = debug_event.u.Exception.ExceptionRecord.ExceptionCode
                print("0x%08x" % exception)
                # 异常地址
                self.exception_address = debug_event.u.Exception.ExceptionRecord.ExceptionAddress
                # 内存违法访问
                if exception == my_debugger_defines.EXCEPTION_ACCESS_VIOLATION:
                    print("Acess Violation Detected.")
                # 遇到一个断点
                elif exception == my_debugger_defines.EXCEPTION_BREAKPOINT:
                    continue_status = self.exception_handler_breakpoint()
                # 内存断点
                elif exception == my_debugger_defines.EXCEPTION_GUARD_PAGE:
                    print("Guard Page Access Detected.")
                # 硬件断点
                elif exception == my_debugger_defines.EXCEPTION_SINGLE_STEP:
                    print("Single Stepping.")
                    self.exception_handler_single_step()

            # input("press a key to continue...")
            # self.debugger_active = False

            # 调试事件处理完成后,恢复原执行状态
            # continue_status -> DBG_CONTINUE:下一步继续执行
            kernel32.ContinueDebugEvent(
                debug_event.dwProcessId,
                debug_event.dwThreadId,
                continue_status
            )
    def exception_handler_breakpoint(self):
        print("[*] Exception address: 0x%08x" % self.exception_address)
        # 检查断点是否为我们设置的断点
        if not (self.exception_address in self.breakpoints):           
                # 如果它是第一个Windows驱动的断点
                # 就继续执行
                if self.first_breakpoint == True:
                   self.first_breakpoint = False
                   print("[*] Hit the first breakpoint.")
                   return my_debugger_defines.DBG_CONTINUE
               
        else:
            print("[*] Hit user defined breakpoint.")
            # 处理设置的断点
            # 先还原原始数据(没设断点之前)
            self.write_process_memory(self.exception_address, self.breakpoints[self.exception_address])

            # 获取新的context
            # 将EIP重置回原始字节处            
            self.context = self.get_thread_context(h_thread=self.h_thread)
            if self.context:
                self.context.Rip -= 1
                # 用新的RIP值设置线程的上下文记录
                kernel32.SetThreadContext(self.h_thread,ctypes.byref(self.context))
                self.debugger_active = True
            else:
                self.debugger_active = False
                
            continue_status = my_debugger_defines.DBG_CONTINUE

        return continue_status
    
    def exception_handler_single_step(self):
        if self.context.Dr6 & 0x1 and (0 in self.hardware_breakpoints):
            slot = 0
        elif self.context.Dr6 & 0x2 and (1 in self.hardware_breakpoints):
            slot = 1
        elif self.context.Dr6 & 0x4 and (2 in self.hardware_breakpoints):
            slot = 2
        elif self.context.Dr6 & 0x8 and (3 in self.hardware_breakpoints):
            slot = 3
        else:
            continue_status = my_debugger_defines.DBG_EXCEPTION_NOT_HANDLED

        # 从断点字典中删除断点
        if self.bp_del_hw(slot):
            continue_status = my_debugger_defines.DBG_CONTINUE

        print("[*] Hardware breakpoint removed.")
        return continue_status
    
    def bp_del_hw(self, slot):
        for thread_id in self.enumerate_threads():

            context = self.get_thread_context(thread_id=thread_id)
            
            # 给所有线程删除断点
            context.Dr7 &= ~(1 << (slot * 2))

            # 将断点地址清零
            if   slot == 0: 
                context.Dr0 = 0x0000000000000000
            elif slot == 1: 
                context.Dr1 = 0x0000000000000000
            elif slot == 2: 
                context.Dr2 = 0x0000000000000000
            elif slot == 3: 
                context.Dr3 = 0x0000000000000000

            # 移除Dr7中的触发断点标志位
            context.Dr7 &= ~(3 << ((slot * 4) + 16))

            # 移除断点长度标志位
            context.Dr7 &= ~(3 << ((slot * 4) + 18))

            # 提交移除断点后的线程context
            h_thread = self.open_thread(thread_id)
            kernel32.SetThreadContext(h_thread, ctypes.byref(context))
            
        # 把该断点从字典中移除
        del self.hardware_breakpoints[slot]

        return True


    # DebugActiveProcessStop:去除附加
    def detach(self):
        if kernel32.DebugActiveProcessStop(self.pid):
            print("[*] Finished debugging. Exiting...")
            return True
        else:
            print("There was an error")
            return False
    
    # 通过线程ID打开线程,获取线程句柄
    def open_thread(self, thread_id):
        h_thread = kernel32.OpenThread(my_debugger_defines.THREAD_ALL_ACCESS, None, thread_id)
        if h_thread is not None:
            return h_thread
        else:
            print("[*] Could not obtain a valid thread handle.")
            return False
    
    # 获取属于pid进程的线程的ID
    def enumerate_threads(self):
        # THREADENTRY32:描述拍摄快照时系统中执行的线程的信息
        thread_entry = my_debugger_defines.THREADENTRY32()
        thread_list = []
        # CreateToolhelp32Snapshot返回一个快照句柄
        # TH32CS_SNAPTHREAD: 枚举快照中系统的所有线程
        # pid只有TH32CS_SNAPHEAPLIST, TH32CS_SNAPMODULE, TH32CS_SNAPMODULE32, or TH32CS_SNAPALL才有意义,所以列举的线程不一定属于这个pid,
        # 需要一一对比
        snapshot = kernel32.CreateToolhelp32Snapshot(my_debugger_defines.TH32CS_SNAPTHREAD, self.pid)
        if snapshot is not None:
            thread_entry.dwSize = ctypes.sizeof(thread_entry)
            # 遍历第一个线程,线程信息写到 THREADENTRY32 这个结构体中
            success = kernel32.Thread32First(snapshot, ctypes.byref(thread_entry))
            while success:
                # 对比该线程的拥有者是不是 pid
                if thread_entry.th32OwnerProcessID == self.pid:
                    # 把该进程的线程ID放到列表
                    thread_list.append(thread_entry.th32ThreadID)
                # 遍历后面所有线程
                success = kernel32.Thread32Next(snapshot, ctypes.byref(thread_entry))
            kernel32.CloseHandle(snapshot)
            return thread_list
        else:
            return False
    
    # 通过线程ID,获取线程上下文context
    def get_thread_context(self, thread_id=None, h_thread=None):
        # 线程上下文结构体为 CONTEXT
        context = my_debugger_defines.CONTEXT()
        context.ContextFlags = my_debugger_defines.CONTEXT_FULL | my_debugger_defines.CONTEXT_DEBUG_REGISTERS

        # 通过线程ID,获取线程句柄
        if h_thread is None:
            self.h_thread = self.open_thread(thread_id)
        # 通过线程句柄,线程上下文
        kernel32.GetThreadContext.argtypes = [ctypes.c_void_p, my_debugger_defines.LPCONTEXT]
        kernel32.GetThreadContext.restype = ctypes.c_bool
        if kernel32.GetThreadContext(self.h_thread, ctypes.byref(context)):
            kernel32.CloseHandle(h_thread)
            return context
        else:
            # return my_debugger_defines.CONTEXT()
            return False
        
    # 读取address处的内容length个字节
    # 返回读取到的内容
    def read_process_memory(self, address, length):
        data = b""
        read_buf = ctypes.create_string_buffer(length)
        count = ctypes.c_ulong(0)
        kernel32.ReadProcessMemory.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_ulong, ctypes.POINTER(ctypes.c_ulong)]
        kernel32.ReadProcessMemory.restype = ctypes.c_bool
        if not kernel32.ReadProcessMemory(self.h_process, address, read_buf, length, ctypes.byref(count)):
            return False
        else:
            data += read_buf.raw
            return data
    
    # 把data数据写入到address处
    def write_process_memory(self, address, data):
        count = ctypes.c_ulong(0)
        length = len(data)
        c_data = ctypes.c_char_p(data[count.value:])
        kernel32.WriteProcessMemory.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_ulong, ctypes.POINTER(ctypes.c_ulong)]
        kernel32.WriteProcessMemory.restype = ctypes.c_bool
        if not kernel32.WriteProcessMemory(self.h_process, address, c_data, length, ctypes.byref(count)):
            return False
        else:
            return True
    
    def bp_set(self, address):
        print("[*] Setting breakpoint at: 0x%08x" % address)
        if not (address in self.breakpoints):
            # 设置断点前,先保存之前的数据
            old_protect = ctypes.c_ulong(0)
            kernel32.VirtualProtectEx.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_ulong, my_debugger_defines.DWORD, ctypes.POINTER(ctypes.c_ulong)]
            kernel32.VirtualProtectEx.restype = ctypes.c_bool
            kernel32.VirtualProtectEx(self.h_process, address, 1, my_debugger_defines.PAGE_EXECUTE_READWRITE, ctypes.byref(old_protect))
            
            # 读取原始1个字节
            original_byte = self.read_process_memory(address, 1)
            if original_byte != False:
                # 写入 INT3 opcode
                if self.write_process_memory(address, b"\xCC"):
                    # 在字典中注册断点
                    self.breakpoints[address] = original_byte
                    return True
                else:
                    return False
    # 硬件断点
    def bp_set_hw(self, address, length, condition):
        # 检测硬件断点长度是否有效
        if length not in (1, 2, 4):
            return False
        else:
            length -= 1

        # 检测硬件断点的触发调试是否有效
        if condition not in (my_debugger_defines.HW_ACCESS, my_debugger_defines.HW_EXECUTE, my_debugger_defines.HW_WRITE):
            return False
        # 检测是否存在空置的调试器槽
        if not (0 in self.hardware_breakpoints):
            available = 0
        elif not (1 in self.hardware_breakpoints):
            available = 1
        elif not (2 in self.hardware_breakpoints):
            available = 2
        elif not (3 in self.hardware_breakpoints):
            available = 3
        else:
            return False
        
        # 在每个线程环境下设置调试寄存器
        for thread_id in self.enumerate_threads():
            context = self.get_thread_context(thread_id=thread_id)
            # 设置DR7中对应的标志位,激活硬件断点
            context.Dr7 |= 1 << (available * 2)
            # 在空闲的DR0 ~ 3寄存器中写入断点地址
            if available == 0:
                context.Dr0 = address
                # print("Dr0 0x%08x" % context.Dr0)
            elif available == 1:
                context.Dr1 = address
                # print("Dr1 0x%08x" % context.Dr1)
            elif available == 2:
                context.Dr2 = address
                # print("Dr2 0x%08x" % context.Dr2)
            elif available == 3:
                context.Dr3 = address
                # print("Dr3 0x%08x" % context.Dr3)
            # 设置触发条件
            context.Dr7 |= condition << ((available * 4) + 16)

            # 设置触发长度
            context.Dr7 |= length << ((available * 4) + 18)

            # 提交改动后的context
            h_thread = self.open_thread(thread_id)
            kernel32.SetThreadContext(h_thread, ctypes.byref(context))

        # 更新内部硬件断点列表
        self.hardware_breakpoints[available] = (address, length, condition)
        return True
    
    def bp_set_mem(self, address, size):

        # 调用VirtualQueryEx, 返回一个MEMORY_BASIC_INFORMATION结构体
        # 从而获取内存页的起始地址(MEMORY_BASIC_INFORMATION.BaseAddress)
        mbi = my_debugger_defines.MEMORY_BASIC_INFORMATION()

        kernel32.VirtualQueryEx.argtypes = [ctypes.c_void_p, ctypes.c_void_p, my_debugger_defines.PMEMORY_BASIC_INFORMATION, ctypes.c_size_t]
        kernel32.VirtualQueryEx.restype = ctypes.c_size_t
        # print(address)
        length = kernel32.VirtualQueryEx(self.h_process, address, ctypes.byref(mbi), ctypes.sizeof(mbi))
        print("[*] Error with error code %d." % kernel32.GetLastError())
        length_mbi = ctypes.sizeof(mbi)
        if length < length_mbi:
            return False        
        
        current_page = mbi.BaseAddress # 当前页的起始地址

        # 把内存断点涉及的内存页都设置成保护页
        while current_page <= address + size:
            self.guarded_pages.append(current_page)
            old_protection = ctypes.c_ulong(0)
            # 设置内存页的访问权限
            kernel32.VirtualProtectEx.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_ulong, ctypes.c_ulong, ctypes.POINTER(ctypes.c_ulong)]
            kernel32.VirtualProtectEx.restype = ctypes.c_bool
            if not kernel32.VirtualProtectEx(self.h_process, current_page, size,
                                             mbi.Protect | my_debugger_defines.PAGE_GUARD, ctypes.byref(old_protection)):
                return False
            
            current_page += self.page_size
        
        # 将该内存断点记录在全局字典中
        self.memory_breakpoints[address] = (address, size, mbi)
        return True



    # 解析模块中函数的地址
    def func_resolve(self, dll, function):

        # HMODULE GetModuleHandleW(
        #     [in, optional] LPCWSTR lpModuleName
        # );
        kernel32.GetModuleHandleW.argtypes = [ctypes.c_wchar_p]
        kernel32.GetModuleHandleW.restype = ctypes.c_void_p        
        handle = kernel32.GetModuleHandleW(dll)

        # FARPROC GetProcAddress(
        #     [in] HMODULE hModule,
        #     [in] LPCSTR  lpProcName
        # );
        kernel32.GetProcAddress.argtypes = [ctypes.c_void_p, ctypes.c_char_p]
        kernel32.GetProcAddress.restype = ctypes.c_void_p
        address = kernel32.GetProcAddress(handle, function)

        kernel32.CloseHandle.argtypes = [ctypes.c_void_p]
        kernel32.CloseHandle.restype = ctypes.c_bool
        kernel32.CloseHandle(handle)
        return address
  • printf_loop.py
import ctypes
import time

msvcrt = ctypes.cdll.msvcrt

counter = 0

while True:
    msvcrt.printf(b"Loop iteration %d!\n", counter)
    time.sleep(2)
    counter += 1
  • my_test.py
import my_debugger
from my_debugger_defines import *

debugger = my_debugger.debugger()

pid = input("Enter the PID of the process to attach to:")

debugger.attach(int(pid))

printf_address = debugger.func_resolve("msvcrt.dll", b"printf")

print("[*] Address of printf: 0x%08x" % printf_address)

print(debugger.bp_set_mem(printf_address, 10))

debugger.run()

debugger.detach()

完结完结!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/794510.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

微软5年敏捷转型策略:成功的16个关键

许多管理者怀疑规模化敏捷组织是否可行。微软成功地实现了为期五年的大规模敏捷转型表明&#xff0c;答案是肯定的。微软已不是一艘巨型军舰&#xff0c;而更像是同步行进的快艇组成的舰队&#xff1a;数百个团队中以协调的方式进行敏捷和Scrum。依赖关系如何处理&#xff1f;团…

常用自动化测试工具有哪些?

1、Appium AppUI自动化测试 Appium 是一个移动端自动化测试开源工具&#xff0c;支持iOS 和Android 平台&#xff0c;支持Python、Java 等语言&#xff0c;即同一套Java 或Python 脚本可以同时运行在iOS 和Android平台&#xff0c;Appium 是一个C/S 架构&#xff0c;核心是一…

Java Swing(C/S模式)特效雨滴系统界面

调节不同参数&#xff0c;生成不同特效: ------------------界面截图--------------------- package org.jd.data.netty.big.window.chat.frame.ui.controller.center; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.*;/*** 设计模式: 单例模式* * 自定义线…

Docker 数据管理及网络通信 Dockerfile

一、Docker 的数据管理 管理 Docker 容器中数据主要有两种方式&#xff1a;数据卷&#xff08;Data Volumes&#xff09;和数据卷容器&#xff08;DataVolumes Containers&#xff09;。 1、数据卷 数据卷是一个供容器使用的特殊目录&#xff0c;位于容器中。可将宿主机的目…

华为OD机试真题 Java 实现【阿里巴巴找黄金宝箱(II)】【2023 B卷 100分】,附详细解题思路

目录 专栏导读一、题目描述二、输入描述三、输出描述四、解题思路1、题目关键点&#xff1a;2、大白话的意思就是3、比如4、思路这不就来了 五、Java算法源码六、效果展示1、输入2、输出3、说明 华为OD机试 2023B卷题库疯狂收录中&#xff0c;刷题点这里 专栏导读 本专栏收录于…

STM32单片机蓝牙APP语音识别智能记忆汽车按摩座椅加热通风儿童座椅

实践制作DIY- GC00160---智能记忆汽车按摩座椅 一、功能说明&#xff1a; 基于STM32单片机设计---智能记忆汽车按摩座椅 二、功能说明&#xff1a; 电路组成&#xff1a;STM32F103CXT6最小系统LD3322语音识别模块OLED显示3个ULN2003步进电机&#xff08;分别对应 前后距离、座…

小学期笔记——天天酷跑1

文件快照&#xff08;File snapshot&#xff09;通常是指对文件系统中某个特定时间点的文件或文件夹的快照或副本。它记录了文件或文件夹在某一时刻的状态&#xff0c;包括文件的内容、属性、权限、位置等信息。 文件快照通常用于数据备份、恢复和版本控制等目的。通过捕捉文件…

Stable-Diffusion-Webui部署SDXL0.9报错参数shape不匹配解决

问题 已经在model/stable-diffusion文件夹下放进去了sdxl0.9的safetensor文件&#xff0c;但是在切换model的时候&#xff0c;会报错model的shape不一致。 解决方法 git pullupdate一些web-ui项目就可以&#xff0c;因为当前项目太老了&#xff0c;没有使用最新的版本。

Windows10 任务栏图标的控制

文章目录 前言一、任务栏系统图标设置二、任务栏应用软件图标设置总结前言 在windows系统中,有一个常用功能,那就是在任务栏上图标化加载一些应用。为特殊目的,我们可以把一些常用软件启动后以图标形式摆放任务栏的右下角(例如QQ,微信)。不同的Window版本有不同的任务栏…

最受欢迎的12个Python开源框架,还没用过你就OUT了!!!

今天给大家带来了12个在GitHub等开源网站中最受欢迎的Python开源框架。如果你正在学习python&#xff0c;那么这12个开源框架&#xff0c;千万别错过&#xff0c;这些框架包括事件I/O&#xff0c;OLAP&#xff0c;Web开发&#xff0c;高性能网络通信&#xff0c;测试&#xff0…

【Redis】如何实现一个合格的分布式锁

文章目录 参考1、概述2、Redis粗糙实现3、遗留问题3.1、误删情况3.2、原子性保证3.3、超时自动解决3.4、总结 4、Redis实现优缺5、集群问题5.1、主从集群5.2、集群脑裂 6、RedLock7、Redisson7.1、简单实现7.2、看门狗机制 参考 Redisson实现Redis分布式锁的N种姿势 (qq.com)小…

第六章 复合查询

第六章 复合查询 一、前言二、笛卡尔积三、多表查询1、多表查询的理解2、笛卡尔积与多表拼接3、多表查询示例&#xff08;1&#xff09;显示雇员名、雇员工资以及所在部门的名字&#xff08;2&#xff09;显示部门号为10的部门名&#xff0c;员工名和工资&#xff08;3&#xf…

力扣热门100题之缺失的第一个正数【困难】

题目描述 给你一个未排序的整数数组 nums &#xff0c;请你找出其中没有出现的最小的正整数。 请你实现时间复杂度为 O(n) 并且只使用常数级别额外空间的解决方案。 示例 1&#xff1a; 输入&#xff1a;nums [1,2,0] 输出&#xff1a;3 示例 2&#xff1a; 输入&#xff1…

Linux学习之while循环和until循环

while while的格式如下&#xff1a; while 条件表达式 do指令集 done若是条件表达式为真&#xff0c;那么才能执行do和done之间的指令集。若是第一次都不符合条件&#xff0c;就不会执行指令集。每次循环都会判断条件表达式&#xff0c;只要不符合&#xff0c;就会退出循环。…

【前端学JAVA】java的基础语法

theme: cyanosis 作为一个前端程序员&#xff0c;其发展前途是远不及后端程序员的。因此&#xff0c;只有了解后端&#xff0c;才能让自己更加具备核心竞争力。本系列教程将以一个前端程序员的角度快速学习JAVA。 新建项目 开发JAVA程序&#xff0c;我们第一步是使用IDEA新建…

VAE-根据李宏毅视频总结的最通俗理解

1.VAE的直观理解 先简单了解一下自编码器&#xff0c;也就是常说的Auto-Encoder。Auto-Encoder包括一个编码器&#xff08;Encoder&#xff09;和一个解码器&#xff08;Decoder&#xff09;。其结构如下&#xff1a; 自编码器是一种先把输入数据压缩为某种编码, 后仅通过该编…

CMU 15-445 -- Timestamp Ordering Concurrency Control - 15

CMU 15-445 -- Timestamp Ordering Concurrency Control - 15 引言Basic T/OBasic T/O ReadsBasic T/O WritesBasic T/O - Example #1Basic T/O - Example #2 Basic T/O SummaryRecoverable Schedules Optimistic Concurrency Control (OCC)OCC - ExampleSERIAL VALIDATIONOCC …

Linux:ELK:日志分析系统(使用elasticsearch集群)

原理 1. 将日志进行集中化管理&#xff08;beats&#xff09; 2. 将日志格式化&#xff08;logstash&#xff09; 将其安装在那个上面就对那个进行监控 3. 对格式化后的数据进行索引和存储&#xff08;elasticsearch&#xff09; 4. 前端数据的展示&#xff08;kibana&…

【MySQL】基本查询(表的增删查改)

目录 一、插入操作 --- insert1.1 单行指定列插入&&单行全列插入1.2 多行指定列插入&&多行全列插入1.3 插入否则更新 duplicate key update1.4 删除并替换 replace 二、查询操作 --- select2.1 基本查询2.2 where条件2.3 案例演示2.4 排序&#xff08;order by…

HDFS异构存储详解

异构存储 HDFS异构存储类型什么是异构存储异构存储类型如何让HDFS知道集群中的数据存储目录是那种类型存储介质 块存储选择策略选择策略说明选择策略的命令 案例&#xff1a;冷热温数据异构存储对应步骤 HDFS内存存储策略支持-- LAZY PERSIST介绍执行使用 HDFS异构存储类型 冷…