published on in Python

密码篇・并行密码猜解

返回教程主页

上篇 密码篇・密码猜解

在上一节中我们实现了简单的ZIP压缩包密码猜解。为了充分利用计算机的性能我们可以同时使用多个CPU核心参与破译,这一节中我们将实现多核心版本或者称为并行版本的密码猜解程序。

代码清单在末尾,我们先看一下运行效果,然后对程序的各部分进行介绍:

python3 zip_decrypt_parallel.py password.dict myzipfile.zip 4
password is admin

整体结构

import zipfile
from multiprocessing import Queue, Process

def load_passwords(filename:str):
	# 省略
	
def check_passwords(filename:str, passwords:str, queue:Queue):
	# 省略

def main(password_dict:str, zip_filename:str, process_num:int):
	# 省略

if __name__ == '__main__':
	# 省略

首先引入zipfile模块以及multiprocessing模块中的QueueProcess类:

  • zipfile用于处理ZIP文件
  • Queue用于处理进程中的数据传输
  • Process用于启动进程来运行函数

然后定义了三个函数:

  1. load_passwords函数用于加载密码字典;
  2. check_passwords函数用于猜解密码;
  3. main函数作为程序的主体函数;

最后通过判断该脚本是否为入口来运行程序。

程序入口

if __name__ == '__main__':
    import sys

    password_dict = sys.argv[1]
    zip_filename  = sys.argv[2]
    process_num   = int(sys.argv[3])
    
    password = main(password_dict, zip_filename, process_num)
    if password:
        print('password is', password)
    else:
        print('no find password')

我们通过变量__name__判断当前脚本是否为程序入口:

  • 如果__name__ == '__main__'True则是入口;
  • 如果__name__ == '__main__'False则不是;

引入sys模块,使用sys.argv来获取参数:

  1. sys.argv[0]脚本文件名;
  2. sys.argv[1]第一个参数,用于指定字典文件;
  3. sys.argv[2]第二个参数,用于指定ZIP文件;
  4. sys.argv[3]第三个参数,用于设置进程数量;

获得参数后我们将其传入主体函数main中进行调用,然后根据main函数的返回结果password来打印输出。

载入字典并启动进程

def main(password_dict:str, zip_filename:str, process_num:int):
    passwords  = load_passwords(password_dict)
    queue      = Queue(maxsize=process_num)
    batch_size = len(passwords) // process_num
    processes  = []
    for i in range(process_num):
        limit_l = i * batch_size
        limit_r = limit_l + batch_size
        limit_r = limit_r if i < process_num - 1 else None
        batches = passwords[limit_l:limit_r]
        process = Process(
            target=check_passwords,
            args=(zip_filename, batches, queue))
        process.start()
        processes.append(process)

	# 省略

进入main函数后,我们使用load_passwords载入字典文件;然后创建一个队列queue「队列的最大长度可以设置为进程数量process_num」;将密码均分到每一个进程,得到单个进程应该负担的密码数量为batch_size

使用一个for启动进程来猜解密码。将passwords进行切片得到batches。创建Process对象并使用start方法启动进程。

密码猜解函数

def check_passwords(filename:str, passwords:str, queue:Queue):
    with zipfile.ZipFile(filename) as zf:
        for password in passwords:
            try:
                zf.extractall(pwd=password.encode())
            except RuntimeError:
                continue
            queue.put(password)
            return
    queue.put('')

逐一验证密码是否正确,如果正确则在队列中放入密码queue.put(password)并退出函数;如果最终没有找到密码则在队列中放入空字符串queue.put('')并结束函数。

从队列中获取消息

def main(password_dict:str, zip_filename:str, process_num:int):
	# 省略

    password = ''
    count = 0
    while count < process_num:
        password = queue.get()
        count += 1
        if password:
            break
    for process in processes:
        if process.is_alive():
            process.terminate()
    return password

使用队列的queue.get方法可以取出队列中的消息,且当队列中没有消息时,该方法将持续等待,直到有新消息被放入队列。调用queue.get的次数不能大于process_num否则如果没有在字典中发现正确的密码程序将不会退出。

最后我们需要使用process.terminate方法结束仍然在运行的进程。

代码清单:

import zipfile
from multiprocessing import Queue, Process


def load_passwords(filename:str):
    with open(filename) as f:
        passwords = f.read().splitlines()
    return passwords


def check_passwords(filename:str, passwords:str, queue:Queue):
    with zipfile.ZipFile(filename) as zf:
        for password in passwords:
            try:
                zf.extractall(pwd=password.encode())
            except RuntimeError:
                continue
            queue.put(password)
            return
    queue.put('')


def main(password_dict:str, zip_filename:str, process_num:int):
    passwords  = load_passwords(password_dict)
    queue      = Queue(maxsize=process_num)
    batch_size = len(passwords) // process_num
    processes  = []
    for i in range(process_num):
        limit_l = i * batch_size
        limit_r = limit_l + batch_size
        limit_r = limit_r if i < process_num - 1 else None
        batches = passwords[limit_l:limit_r]
        process = Process(
            target=check_passwords,
            args=(zip_filename, batches, queue))
        process.start()
        processes.append(process)

    password = ''
    count = 0
    while count < process_num:
        password = queue.get()
        count += 1
        if password:
            break
    for process in processes:
        if process.is_alive():
            process.terminate()
    return password


if __name__ == '__main__':
    import sys

    password_dict = sys.argv[1]
    zip_filename  = sys.argv[2]
    process_num   = int(sys.argv[3])
    
    password = main(password_dict, zip_filename, process_num)
    if password:
        print('password is', password)
    else:
        print('no find password')

下篇 密码篇・PyCryptodome示例「扩展」