返回教程主页
上篇 网络篇・端口扫描
上一节我们实现了一个简单的端口扫描工具,接下来我们把它升级成为多线程版本。
批量扫描
虽然端口扫描属于IO密集型操作,但是由于我们的测试目标是本机,所以有必要对线程切换导致的效率损失问题进行处理。在这里我们使用批量分配任务到每一个线程的方法来进行处理。
我们首先定义一个批量处理函数:
from socket import socket, AF_INET, SOCK_STREAM
from threading import Thread
def batch_test(host:str, ports:range):
for port in ports:
s = socket(AF_INET, SOCK_STREAM)
s.settimeout(10)
if not s.connect_ex((host, port)):
print(f'Opened Port: {port}')
分批并发配
将任务进行等分「近似」并分发给每一个线程:
def test_ports(host:str, ports:range, thread_num:int):
count = ports.stop - ports.start
batchsize = count // thread_num
works = []
for thread_id in range(thread_num):
start = ports.start + thread_id * batchsize
stop = start + batchsize
if thread_id == thread_num - 1:
stop = ports.stop
t = Thread(target=batch_test, args=(host, range(start, stop)))
t.start()
works.append(t)
for work in works:
work.join()
程序入口
根据用户输入设置函数参数并运行程序:
if __name__ == "__main__":
import sys
host = sys.argv[1]
start = int(sys.argv[2])
stop = int(sys.argv[3])
thread_num = int(sys.argv[4])
test_ports(host, range(start, stop), thread_num)
下篇 隐写术・操作环境准备