Python 调用 Ghidra 实现反汇编

在逆向工程、二进制安全分析与漏洞挖掘领域,反汇编是拆解二进制程序、还原其底层执行逻辑的核心技术手段。Ghidra是由 美国国家安全局(NSA) 开源的跨平台、全功能逆向工程套件,也是当前逆向分析、二进制反汇编领域最主流、最强大的工具之一,完全免费且开源可定制。

它的核心能力覆盖二进制程序全生命周期分析:支持 x86、ARM、MIPS 等几乎所有主流处理器架构的反汇编、反编译(可直接将机器码还原为类 C 伪代码)、程序流程分析、数据结构识别、漏洞定位等核心功能,同时提供可视化的交互界面,兼顾手动逆向分析与脚本化批量处理需求。其唯一的「小短板」在于:原生交互以 GUI 操作为主,高阶批量分析、定制化反汇编逻辑需要依托脚本能力。因此先后出现了 pyhidra 和 pyghidra 两个 Python 包来提供支持。目前 pyhidra 已经停止更新,建议优先使用 pyghidra。

安装 Ghidra 和使用其 Python API

安装 Ghidra

Ghidra 的安装非常简单:首先到 Ghidra 仓库下载发行包。本文使用 Ubuntu 22.04 环境,下载的是 12.0 版本,该版本需要 Java 21;如果你使用其他版本的 Ghidra,请按其要求安装对应版本的 Java。

sudo apt update
sudo apt install -y openjdk-21-jdk openjdk-21-jre

然后把你下载的 Ghidra 压缩包解压到希望的地方

# Extract into ~/tools so the result matches the GHIDRA_INSTALL_DIR
# (~/tools/ghidra) exported in the next step
mkdir -p ~/tools
unzip ghidra_12.0_PUBLIC_20251205.zip -d ~/tools

# Rename to a shorter directory name; the archive unpacks to
# ghidra_12.0_PUBLIC (the zip name without the date suffix)
mv ~/tools/ghidra_12.0_PUBLIC ~/tools/ghidra

设置环境变量

# 添加Ghidra到PATH
echo 'export GHIDRA_INSTALL_DIR=~/tools/ghidra' >> ~/.bashrc
echo 'export PATH=$GHIDRA_INSTALL_DIR:$PATH' >> ~/.bashrc

source ~/.bashrc

测试 Ghidra GUI 启动(可选)

cd ~/tools/ghidra
./ghidraRun

# 或者如果设置了环境变量
ghidraRun

安装 PyGhidra

然后是PyGhidra,有两种安装方法,第一种是直接使用在线下载安装

pip install pyghidra

如果担心版本兼容性之类的问题,也可以离线安装 Ghidra 自带的 pypkg(位于 Ghidra 安装目录下,把下面命令中的 <GhidraInstallDir> 替换为你的实际安装路径):

python3 -m pip install --no-index -f <GhidraInstallDir>/Ghidra/Features/PyGhidra/pypkg/dist pyghidra

简单的 PyGhidra 调用例程

1. 初始化 PyGhidra 环境

第一步是启动 PyGhidra 的 JVM 环境。PyGhidra 会自动定位 Ghidra 安装目录,或者你可以手动指定:

# 检查 pyghidra 是否已启动
if not pyghidra.started():
    print(f"Starting pyghidra with install_dir={ghidra_path}")
    pyghidra.start(install_dir=ghidra_path)
else:
    print("pyghidra already started")

关键点:

  • pyghidra.started(): 检查 JVM 是否已经启动,避免重复初始化
  • pyghidra.start(): 启动 Ghidra 的 JVM 环境,可选参数 install_dir 指定 Ghidra 安装路径
  • 如果没有设置 install_dir,PyGhidra 会自动查找 GHIDRA_INSTALL_DIR 环境变量

2. 创建 Ghidra 项目

Ghidra 使用项目(Project)来组织分析结果。我们创建一个临时项目用于演示:

import tempfile

# 创建临时项目目录
project_dir = tempfile.mkdtemp(prefix='ghidra_demo_')
project_name = 'demo_project'

print(f"Creating project at: {project_dir}/{project_name}")

# 打开或创建项目
project = pyghidra.open_project(project_dir, project_name, create=True)

3. 导入二进制文件

将二进制文件导入到 Ghidra 项目中:

# 获取二进制文件名
program_name = os.path.basename(binary_path)

print(f"Importing binary: {binary_path}")

# 创建加载器并配置
loader = pyghidra.program_loader()
loader = loader.project(project).source(binary_path).name(program_name)

# 加载二进制文件
with loader.load() as load_results:
    load_results.save(pyghidra.task_monitor())
    loaded_program = load_results.getPrimary()
    if loaded_program:
        program_path = f"/{loaded_program.getName()}"
        print(f"Program saved to: {program_path}")

4. 获取程序并执行自动分析

从项目中获取程序对象,并执行 Ghidra 的自动分析(注意:下面的 try 块在完整代码中与 finally: program.release(consumer) 配对,步骤 5–9 的代码都位于这个 try 块内部):

# 从项目中获取程序
program, consumer = pyghidra.consume_program(project, program_path)

try:
    # 检查是否需要分析
    from ghidra.program.util import GhidraProgramUtilities
    
    if GhidraProgramUtilities.shouldAskToAnalyze(program):
        print("Analyzing program...")
        analysis_log = pyghidra.analyze(program, pyghidra.task_monitor())
        program.save("Analyzed", pyghidra.task_monitor())
        print("Analysis complete")
    else:
        print("Program already analyzed")

5. 提取程序架构信息

获取二进制文件的架构信息:

# 获取架构信息
lang = program.getLanguage()
processor = lang.getProcessor().toString()

print(f"\nArchitecture: {processor}")
print(f"Language ID: {lang.getLanguageID()}")

6. 遍历函数

获取程序中的所有函数:

# 获取函数管理器
func_manager = program.getFunctionManager()
functions = list(func_manager.getFunctions(True))

print(f"\nFound {len(functions)} functions")

# 遍历函数
for function in functions:
    func_name = function.getName()
    entry_point = function.getEntryPoint()
    
    print(f"\n--- Function: {func_name} ---")
    print(f"Entry point: {entry_point}")
    print(f"Is thunk: {function.isThunk()}")

7. 分析基本块和控制流

这是示例代码的核心部分,展示如何提取函数的控制流图:

from ghidra.program.model.block import BasicBlockModel

# 创建基本块模型
bb_model = BasicBlockModel(program)
body = function.getBody()
monitor = pyghidra.task_monitor()

# 获取函数体内的所有基本块
blocks = list(bb_model.getCodeBlocksContaining(body, monitor))
print(f"Basic blocks: {len(blocks)}")

# 遍历每个基本块
for bb_idx, block in enumerate(blocks[:5]):  # 只显示前5个块
    print(f"\n  Block {bb_idx + 1}:")
    print(f"    Start: {block.getMinAddress()}")
    print(f"    End: {block.getMaxAddress()}")

8. 提取指令

遍历基本块中的指令:

# 获取程序列表(包含指令信息)
listing = program.getListing()
addr = block.getMinAddress()
ins_count = 0
ins_list = []

# 遍历基本块中的指令
while addr and addr <= block.getMaxAddress():
    ins = listing.getInstructionAt(addr)
    if ins is None:
        break
    
    # 构建指令字符串
    if ins_count < 5:  # 只显示前5条指令
        op_rep = ""
        if ins.getNumOperands() > 0:
            op_rep = ins.getDefaultOperandRepresentation(0)
            if ins.getNumOperands() > 1:
                op_rep += ", " + ins.getDefaultOperandRepresentation(1)
        ins_list.append(f"{ins.getMnemonicString()} {op_rep}")
    
    ins_count += 1
    addr = addr.add(ins.getLength())

print(f"    Instructions ({ins_count} total):")
for ins_str in ins_list:
    print(f"      {ins_str}")

9. 分析控制流边

获取基本块的后继和前驱:

# 获取后继块(出边)
try:
    destinations = block.getDestinations(monitor)
    dest_list = []
    while destinations.hasNext():
        dest = destinations.next()
        dest_list.append(dest)
    
    print(f"    Successors: {len(dest_list)}")
    for dest in dest_list[:3]:
        print(f"      -> {dest.getDestinationAddress()}")
except Exception as e:
    print(f"    Successors: Error - {e}")

# 获取前驱块(入边)
try:
    sources = block.getSources(monitor)
    src_list = []
    while sources.hasNext():
        src = sources.next()
        src_list.append(src)
    
    if len(src_list) > 0:
        print(f"    Sources: {len(src_list)}")
        for src in src_list[:2]:
            print(f"      <- {src.getSourceAddress()}")
except Exception as e:
    print(f"    Sources: Error - {e}")

完整代码:

#!/usr/bin/env python3
"""
Demo script to test pyghidra disassembly and observe the structure of results.
"""
import os
import sys
import pyghidra
from pathlib import Path

# Add lib to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'lib'))

def _format_instruction(ins):
    """Render one Ghidra instruction as ``MNEMONIC op0, op1, ...``.

    Every operand is included (not only the first two), and an instruction
    without operands yields the bare mnemonic with no trailing space.
    """
    operands = ", ".join(
        str(ins.getDefaultOperandRepresentation(i))
        for i in range(ins.getNumOperands())
    )
    return f"{ins.getMnemonicString()} {operands}".rstrip()


def _drain(java_iterator):
    """Collect a Java ``hasNext()``/``next()`` style iterator into a list."""
    items = []
    while java_iterator.hasNext():
        items.append(java_iterator.next())
    return items


def _print_block(index, block, listing, monitor, max_instructions):
    """Print the address range, leading instructions and edges of one block."""
    print(f"\n  Block {index}:")
    print(f"    Start: {block.getMinAddress()}")
    print(f"    End: {block.getMaxAddress()}")

    # Let the listing enumerate the instructions inside the block's address
    # set instead of stepping addresses by hand.
    ins_count = 0
    shown = []
    for ins in listing.getInstructions(block, True):
        if ins_count < max_instructions:
            shown.append(_format_instruction(ins))
        ins_count += 1
    print(f"    Instructions ({ins_count} total):")
    for ins_str in shown:
        print(f"      {ins_str}")

    # Edge lookups are best-effort: a failure is reported and the demo
    # continues with the next block.
    try:
        dest_list = _drain(block.getDestinations(monitor))
        print(f"    Successors: {len(dest_list)}")
        for dest in dest_list[:3]:  # Show first 3 successors
            print(f"      -> {dest.getDestinationAddress()}")
    except Exception as e:
        print(f"    Successors: Error getting destinations - {e}")

    try:
        src_list = _drain(block.getSources(monitor))
        if src_list:
            print(f"    Sources: {len(src_list)}")
            for src in src_list[:2]:  # Show first 2 sources
                print(f"      <- {src.getSourceAddress()}")
    except Exception as e:
        print(f"    Sources: Error getting sources - {e}")


def _print_function(index, function, bb_model, listing, monitor,
                    max_blocks, max_instructions):
    """Print a function's header and up to ``max_blocks`` of its basic blocks."""
    print(f"\n--- Function {index}: {function.getName()} ---")
    print(f"Entry point: {function.getEntryPoint()}")
    print(f"Is thunk: {function.isThunk()}")

    blocks = list(bb_model.getCodeBlocksContaining(function.getBody(), monitor))
    print(f"Basic blocks: {len(blocks)}")

    for bb_idx, block in enumerate(blocks[:max(0, max_blocks)], start=1):
        _print_block(bb_idx, block, listing, monitor, max_instructions)


def test_ghidra_disassembly(binary_path, ghidra_path=None, *,
                            max_functions=3, max_blocks=5,
                            max_instructions=5):
    """Import a binary into a throw-away Ghidra project and print a summary.

    Starts PyGhidra if it is not running yet, creates a temporary project,
    imports and (if needed) analyzes the binary, then prints the processor,
    the language ID and, for the first few functions, their basic blocks,
    leading instructions and control-flow edges.

    Args:
        binary_path: Path of the binary file to import.
        ghidra_path: Ghidra installation directory passed to
            ``pyghidra.start(install_dir=...)``; may be ``None``.
        max_functions: Number of functions to print in detail.
        max_blocks: Number of basic blocks to print per function.
        max_instructions: Number of instructions to print per basic block.

    The temporary project directory is not deleted; its path is printed
    when the project is closed.
    """
    print(f"Testing Ghidra disassembly on: {binary_path}")

    # Start pyghidra only once per process
    if not pyghidra.started():
        print(f"Starting pyghidra with install_dir={ghidra_path}")
        pyghidra.start(install_dir=ghidra_path)
    else:
        print("pyghidra already started")

    # Create a temporary project
    import tempfile
    project_dir = tempfile.mkdtemp(prefix='ghidra_demo_')
    project_name = 'demo_project'

    print(f"Creating project at: {project_dir}/{project_name}")

    project = pyghidra.open_project(project_dir, project_name, create=True)

    try:
        program_name = os.path.basename(binary_path)
        # Default project path; replaced below by the name the loader reports
        program_path = f"/{program_name}"

        print(f"Importing binary: {binary_path}")

        loader = pyghidra.program_loader()
        loader = loader.project(project).source(binary_path).name(program_name)

        with loader.load() as load_results:
            load_results.save(pyghidra.task_monitor())
            loaded_program = load_results.getPrimary()
            if loaded_program is not None:
                program_path = f"/{loaded_program.getName()}"
                print(f"Program saved to: {program_path}")
            else:
                print(f"Program saved to: {program_path} (using specified name)")

        program, consumer = pyghidra.consume_program(project, program_path)

        try:
            # Guard only the "is analysis needed?" check. A failure of the
            # analysis itself must propagate instead of being reported as a
            # failed check and silently retried.
            try:
                # Ghidra classes are importable only after pyghidra.start()
                from ghidra.program.util import GhidraProgramUtilities
                needs_analysis = GhidraProgramUtilities.shouldAskToAnalyze(program)
            except Exception as e:
                print(f"Analysis check failed, attempting analysis: {e}")
                needs_analysis = True

            if needs_analysis:
                print("Analyzing program...")
                pyghidra.analyze(program, pyghidra.task_monitor())
                program.save("Analyzed", pyghidra.task_monitor())
                print("Analysis complete")
            else:
                print("Program already analyzed")

            # Architecture
            lang = program.getLanguage()
            processor = lang.getProcessor().toString()
            print(f"\nArchitecture: {processor}")
            print(f"Language ID: {lang.getLanguageID()}")

            # Functions
            functions = list(program.getFunctionManager().getFunctions(True))
            print(f"\nFound {len(functions)} functions")

            # Shared by every function/block, so build them once
            from ghidra.program.model.block import BasicBlockModel
            bb_model = BasicBlockModel(program)
            listing = program.getListing()
            monitor = pyghidra.task_monitor()

            shown = functions[:max(0, max_functions)]
            for func_idx, function in enumerate(shown, start=1):
                _print_function(func_idx, function, bb_model, listing,
                                monitor, max_blocks, max_instructions)

            # Count what was actually skipped, not a loop counter that
            # overshoots by one when the loop exits early
            print(f"\n... and {len(functions) - len(shown)} more functions")

        finally:
            program.release(consumer)

    finally:
        # Closing is best-effort so a close failure does not mask an
        # earlier exception
        try:
            project.close()
            print(f"\nProject closed. Project directory: {project_dir}")
        except Exception as e:
            print(f"\nError closing project: {e}")

if __name__ == "__main__":
    # Command line: <binary_path> [ghidra_path]. Diagnostics go to stderr so
    # stdout carries only the disassembly report.
    if len(sys.argv) < 2:
        print("Usage: python test_ghidra_demo.py <binary_path> [ghidra_path]",
              file=sys.stderr)
        sys.exit(1)

    binary_path = sys.argv[1]
    # Fall back to the environment when no install dir is given explicitly
    ghidra_path = sys.argv[2] if len(sys.argv) > 2 else os.environ.get('GHIDRA_INSTALL_DIR')

    # isfile (not exists): a directory is not an importable binary
    if not os.path.isfile(binary_path):
        print(f"Error: Binary file not found: {binary_path}", file=sys.stderr)
        sys.exit(1)

    test_ghidra_disassembly(binary_path, ghidra_path)

最后修改于 2025-12-30