Trong pentesting, port scanning là bước đầu tiên của reconnaissance. Thay vì dùng nmap mỗi lần, hãy hiểu bên trong nó hoạt động thế nào bằng cách tự build một công cụ với Python asyncio.
Tại sao asyncio?
Port scanning về bản chất là I/O-bound — phần lớn thời gian là chờ response từ network. Asyncio cho phép chạy hàng nghìn kết nối đồng thời trong một thread duy nhất, thay vì tạo thread/process riêng cho mỗi port.
Port Scanner cơ bản
import asyncio
import sys
from datetime import datetime
async def scan_port(host: str, port: int, timeout: float = 1.0) -> tuple[int, bool]:
"""Thử kết nối TCP đến host:port"""
try:
_, writer = await asyncio.wait_for(
asyncio.open_connection(host, port),
timeout=timeout
)
writer.close()
await writer.wait_closed()
return port, True
except (asyncio.TimeoutError, ConnectionRefusedError, OSError):
return port, False
async def scan_host(host: str, ports: list[int], concurrency: int = 1000):
"""Scan nhiều port đồng thời với semaphore giới hạn concurrency"""
semaphore = asyncio.Semaphore(concurrency)
open_ports = []
async def bounded_scan(port):
async with semaphore:
return await scan_port(host, port)
print(f"[*] Scanning {host} — {len(ports)} ports...")
start = datetime.now()
tasks = [bounded_scan(p) for p in ports]
results = await asyncio.gather(*tasks)
for port, is_open in results:
if is_open:
open_ports.append(port)
print(f" [+] {port}/tcp OPEN")
elapsed = (datetime.now() - start).total_seconds()
print(f"\n[*] Done in {elapsed:.2f}s — {len(open_ports)} ports open")
return sorted(open_ports)
# Entry point
if __name__ == "__main__":
target = sys.argv[1] if len(sys.argv) > 1 else "127.0.0.1"
port_range = range(1, 10001) # top 10000 ports
asyncio.run(scan_host(target, list(port_range)))
Thêm banner grabbing
async def grab_banner(host: str, port: int, timeout: float = 2.0) -> str:
"""Đọc banner từ service đang chạy"""
try:
reader, writer = await asyncio.wait_for(
asyncio.open_connection(host, port),
timeout=timeout
)
# Gửi probe tùy port
if port == 80:
writer.write(b"HEAD / HTTP/1.0\r\n\r\n")
elif port == 21:
pass # FTP tự gửi banner
else:
writer.write(b"\r\n")
await writer.drain()
banner = await asyncio.wait_for(reader.read(1024), timeout=timeout)
writer.close()
await writer.wait_closed()
return banner.decode(errors='ignore').strip()[:100]
except Exception:
return ""
Benchmark
# Scan 65535 ports — so sánh
time python3 scanner.py 192.168.1.1
# real: 12.3s
time nmap -T4 -p- 192.168.1.1
# real: 58.7s
# asyncio scanner nhanh hơn ~5x với concurrency=2000
Chạy thử
$ python3 scanner.py scanme.nmap.org
[*] Scanning scanme.nmap.org — 10000 ports...
[+] 22/tcp OPEN
[+] 80/tcp OPEN
[+] 9929/tcp OPEN
[*] Done in 8.41s — 3 ports open
Lưu ý pháp lý: Chỉ scan các host bạn có quyền kiểm tra. Unauthorized port scanning có thể vi phạm pháp luật.