images/blog-posts

密码篇・并行密码猜解

返回教程主页

上篇 密码篇・密码猜解

在上一节中我们实现了简单的ZIP压缩包密码猜解。为了充分利用计算机的性能我们可以同时使用多个CPU核心参与破译,这一节中我们将实现多核心版本或者称为并行版本的密码猜解程序。

代码清单在末尾,我们先看一下运行效果,然后对程序的各部分进行介绍:

python3 zip_decrypt_parallel.py password.dict myzipfile.zip 4
password is admin

整体结构

import zipfile
from multiprocessing import Queue, Process

def load_passwords(filename:str):
	# 省略
	
def check_passwords(filename:str, passwords:str, queue:Queue):
	# 省略

def main(password_dict:str, zip_filename:str, process_num:int):
	# 省略

if __name__ == '__main__':
	# 省略

首先引入zipfile模块以及multiprocessing模块中的QueueProcess类:

  • zipfile用于处理ZIP文件
  • Queue用于处理进程中的数据传输
  • Process用于启动进程来运行函数

然后定义了三个函数:

  1. load_passwords函数用于加载密码字典;
  2. check_passwords函数用于猜解密码;
  3. main函数作为程序的主体函数;

最后通过判断该脚本是否为入口来运行程序。

程序入口

if __name__ == '__main__':
    import sys

    password_dict = sys.argv[1]
    zip_filename  = sys.argv[2]
    process_num   = int(sys.argv[3])
    
    password = main(password_dict, zip_filename, process_num)
    if password:
        print('password is', password)
    else:
        print('no find password')

我们通过变量__name__判断当前脚本是否为程序入口:

  • 如果__name__ == '__main__'True则是入口;
  • 如果__name__ == '__main__'False则不是;

引入sys模块,使用sys.argv来获取参数:

  1. sys.argv[0]脚本文件名;
  2. sys.argv[1]第一个参数,用于指定字典文件;
  3. sys.argv[2]第二个参数,用于指定ZIP文件;
  4. sys.argv[3]第三个参数,用于设置进程数量;

获得参数后我们将其传入主体函数main中进行调用,然后根据main函数的返回结果password来打印输出。

载入字典并启动进程

def main(password_dict:str, zip_filename:str, process_num:int):
    passwords  = load_passwords(password_dict)
    queue      = Queue(maxsize=process_num)
    batch_size = len(passwords) // process_num
    processes  = []
    for i in range(process_num):
        limit_l = i * batch_size
        limit_r = limit_l + batch_size
        limit_r = limit_r if i < process_num - 1 else None
        batches = passwords[limit_l:limit_r]
        process = Process(
            target=check_passwords,
            args=(zip_filename, batches, queue))
        process.start()
        processes.append(process)

	# 省略

进入main函数后,我们使用load_passwords载入字典文件;然后创建一个队列queue「队列的最大长度可以设置为进程数量process_num」;将密码均分到每一个进程,得到单个进程应该负担的密码数量为batch_size

使用一个for启动进程来猜解密码。将passwords进行切片得到batches。创建Process对象并使用start方法启动进程。

密码猜解函数

def check_passwords(filename:str, passwords:str, queue:Queue):
    with zipfile.ZipFile(filename) as zf:
        for password in passwords:
            try:
                zf.extractall(pwd=password.encode())
            except RuntimeError:
                continue
            queue.put(password)
            return
    queue.put('')

逐一验证密码是否正确,如果正确则在队列中放入密码queue.put(password)并退出函数;如果最终没有找到密码则在队列中放入空字符串queue.put('')并结束函数。

从队列中获取消息

def main(password_dict:str, zip_filename:str, process_num:int):
	# 省略

    password = ''
    count = 0
    while count < process_num:
        password = queue.get()
        count += 1
        if password:
            break
    for process in processes:
        if process.is_alive():
            process.terminate()
    return password

使用队列的queue.get方法可以取出队列中的消息,且当队列中没有消息时,该方法将持续等待,直到有新消息被放入队列。调用queue.get的次数不能大于process_num否则如果没有在字典中发现正确的密码程序将不会退出。

最后我们需要使用process.terminate方法结束仍然在运行的进程。

代码清单:

import zipfile
from multiprocessing import Queue, Process


def load_passwords(filename:str):
    with open(filename) as f:
        passwords = f.read().splitlines()
    return passwords


def check_passwords(filename:str, passwords:str, queue:Queue):
    with zipfile.ZipFile(filename) as zf:
        for password in passwords:
            try:
                zf.extractall(pwd=password.encode())
            except RuntimeError:
                continue
            queue.put(password)
            return
    queue.put('')


def main(password_dict:str, zip_filename:str, process_num:int):
    passwords  = load_passwords(password_dict)
    queue      = Queue(maxsize=process_num)
    batch_size = len(passwords) // process_num
    processes  = []
    for i in range(process_num):
        limit_l = i * batch_size
        limit_r = limit_l + batch_size
        limit_r = limit_r if i < process_num - 1 else None
        batches = passwords[limit_l:limit_r]
        process = Process(
            target=check_passwords,
            args=(zip_filename, batches, queue))
        process.start()
        processes.append(process)

    password = ''
    count = 0
    while count < process_num:
        password = queue.get()
        count += 1
        if password:
            break
    for process in processes:
        if process.is_alive():
            process.terminate()
    return password


if __name__ == '__main__':
    import sys

    password_dict = sys.argv[1]
    zip_filename  = sys.argv[2]
    process_num   = int(sys.argv[3])
    
    password = main(password_dict, zip_filename, process_num)
    if password:
        print('password is', password)
    else:
        print('no find password')

下篇 密码篇・PyCryptodome示例「扩展」

SUBSCRIBE


🔒 No spam. Unsubscribe any time.

About kk

kk

Vincenzo Antedoro is an engineer who helps those who want to invest in renewables. For the rest he enjoys teaching with the method of learning by doing..

» More about kk