published on in Python

网络篇・并发端口扫描

返回教程主页

上篇 网络篇・端口扫描

上一节我们实现了一个简单的端口扫描工具,接下来我们把它升级成为多线程版本。

批量扫描

虽然端口扫描属于IO密集型操作,但是由于我们的测试目标是本机,所以有必要对线程切换导致的效率损失问题进行处理。在这里我们使用批量分配任务到每一个线程的方法来进行处理。

我们首先定义一个批量处理函数:

from socket import socket, AF_INET, SOCK_STREAM
from threading import Thread


def batch_test(host:str, ports:range):
    for port in ports:
        s = socket(AF_INET, SOCK_STREAM)
        s.settimeout(10)
        if not s.connect_ex((host, port)):
            print(f'Opened Port: {port}')

分批并发配

将任务进行等分「近似」并分发给每一个线程:

def test_ports(host:str, ports:range, thread_num:int):
    count = ports.stop - ports.start
    batchsize = count // thread_num
    works = []
    for thread_id in range(thread_num):
        start = ports.start + thread_id * batchsize
        stop = start + batchsize
        if thread_id == thread_num - 1:
            stop = ports.stop
        t = Thread(target=batch_test, args=(host, range(start, stop)))
        t.start()
        works.append(t)
    for work in works:
        work.join()

程序入口

根据用户输入设置函数参数并运行程序:

if __name__ == "__main__":
    import sys

    host       = sys.argv[1]
    start      = int(sys.argv[2])
    stop       = int(sys.argv[3])
    thread_num = int(sys.argv[4])
    test_ports(host, range(start, stop), thread_num)

下篇 隐写术・操作环境准备