在上一节中我们实现了简单的ZIP压缩包密码猜解。为了充分利用计算机的性能我们可以同时使用多个CPU核心参与破译,这一节中我们将实现多核心版本或者称为并行版本的密码猜解程序。
代码清单在末尾,我们先看一下运行效果,然后对程序的各部分进行介绍:
python3 zip_decrypt_parallel.py password.dict myzipfile.zip 4
password is admin
整体结构
import zipfile
from multiprocessing import Queue, Process
def load_passwords(filename:str):
# 省略
def check_passwords(filename:str, passwords:str, queue:Queue):
# 省略
def main(password_dict:str, zip_filename:str, process_num:int):
# 省略
if __name__ == '__main__':
# 省略
首先引入zipfile
模块以及multiprocessing
模块中的Queue
与Process
类:
zipfile
用于处理ZIP文件Queue
用于处理进程中的数据传输Process
用于启动进程来运行函数
然后定义了三个函数:
load_passwords
函数用于加载密码字典;check_passwords
函数用于猜解密码;main
函数作为程序的主体函数;
最后通过判断该脚本是否为入口来运行程序。
程序入口
if __name__ == '__main__':
import sys
password_dict = sys.argv[1]
zip_filename = sys.argv[2]
process_num = int(sys.argv[3])
password = main(password_dict, zip_filename, process_num)
if password:
print('password is', password)
else:
print('no find password')
我们通过变量__name__
判断当前脚本是否为程序入口:
- 如果
__name__ == '__main__'
为True
则是入口; - 如果
__name__ == '__main__'
为False
则不是;
引入sys
模块,使用sys.argv
来获取参数:
sys.argv[0]
脚本文件名;sys.argv[1]
第一个参数,用于指定字典文件;sys.argv[2]
第二个参数,用于指定ZIP文件;sys.argv[3]
第三个参数,用于设置进程数量;
获得参数后我们将其传入主体函数main
中进行调用,然后根据main
函数的返回结果password
来打印输出。
载入字典并启动进程
def main(password_dict:str, zip_filename:str, process_num:int):
passwords = load_passwords(password_dict)
queue = Queue(maxsize=process_num)
batch_size = len(passwords) // process_num
processes = []
for i in range(process_num):
limit_l = i * batch_size
limit_r = limit_l + batch_size
limit_r = limit_r if i < process_num - 1 else None
batches = passwords[limit_l:limit_r]
process = Process(
target=check_passwords,
args=(zip_filename, batches, queue))
process.start()
processes.append(process)
# 省略
进入main
函数后,我们使用load_passwords
载入字典文件;然后创建一个队列queue
「队列的最大长度可以设置为进程数量process_num
」;将密码均分到每一个进程,得到单个进程应该负担的密码数量为batch_size
。
使用一个for
启动进程来猜解密码。将passwords
进行切片得到batches
。创建Process
对象并使用start
方法启动进程。
密码猜解函数
def check_passwords(filename:str, passwords:str, queue:Queue):
with zipfile.ZipFile(filename) as zf:
for password in passwords:
try:
zf.extractall(pwd=password.encode())
except RuntimeError:
continue
queue.put(password)
return
queue.put('')
逐一验证密码是否正确,如果正确则在队列中放入密码queue.put(password)
并退出函数;如果最终没有找到密码则在队列中放入空字符串queue.put('')
并结束函数。
从队列中获取消息
def main(password_dict:str, zip_filename:str, process_num:int):
# 省略
password = ''
count = 0
while count < process_num:
password = queue.get()
count += 1
if password:
break
for process in processes:
if process.is_alive():
process.terminate()
return password
使用队列的queue.get
方法可以取出队列中的消息,且当队列中没有消息时,该方法将持续等待,直到有新消息被放入队列。调用queue.get
的次数不能大于process_num
否则如果没有在字典中发现正确的密码程序将不会退出。
最后我们需要使用process.terminate
方法结束仍然在运行的进程。
代码清单:
import zipfile
from multiprocessing import Queue, Process
def load_passwords(filename:str):
with open(filename) as f:
passwords = f.read().splitlines()
return passwords
def check_passwords(filename:str, passwords:str, queue:Queue):
with zipfile.ZipFile(filename) as zf:
for password in passwords:
try:
zf.extractall(pwd=password.encode())
except RuntimeError:
continue
queue.put(password)
return
queue.put('')
def main(password_dict:str, zip_filename:str, process_num:int):
passwords = load_passwords(password_dict)
queue = Queue(maxsize=process_num)
batch_size = len(passwords) // process_num
processes = []
for i in range(process_num):
limit_l = i * batch_size
limit_r = limit_l + batch_size
limit_r = limit_r if i < process_num - 1 else None
batches = passwords[limit_l:limit_r]
process = Process(
target=check_passwords,
args=(zip_filename, batches, queue))
process.start()
processes.append(process)
password = ''
count = 0
while count < process_num:
password = queue.get()
count += 1
if password:
break
for process in processes:
if process.is_alive():
process.terminate()
return password
if __name__ == '__main__':
import sys
password_dict = sys.argv[1]
zip_filename = sys.argv[2]
process_num = int(sys.argv[3])
password = main(password_dict, zip_filename, process_num)
if password:
print('password is', password)
else:
print('no find password')