本ページは広告が含まれています。気になる広告をクリック頂けますと、サーバ運営費になります(^^
サーバがポートスキャンを受けていないかを確認する
サーバにおいて、自分自身がポートスキャンを受けていないかを検知するpythonコードです。
import os
import subprocess
import tempfile
import signal
import sys
import shutil
import re
# --- グローバル変数定義 ---
TEMP_PACKET_FILE = None
TEMP_PORT_LIST = None
TEMP_SCAN_COUNTS = None
# --- 設定値 ---
DURATION = 10
THRESHOLD = 20
TEMP_DIR = "/tmp"
TEMP_PREFIX = "tcpdump_scan_detect_"
LOCAL_IP = None
INTERFACE = None
# --- 関数定義 ---
def usage():
print(f"使用法: {sys.argv[0]} <インターフェース名>")
print(" <インターフェース名>: 監視するネットワークインターフェース (例: eth0, enp0s3)")
print("")
print(f"例: {sys.argv[0]} eth0")
sys.exit(1)
def error_exit(message):
print(f"エラー: {message}", file=sys.stderr)
sys.exit(1)
def cleanup(signum=None, frame=None):
print("スキャン検出を停止し、一時ファイルをクリーンアップしています...")
global TEMP_PACKET_FILE, TEMP_PORT_LIST, TEMP_SCAN_COUNTS
files_to_clean = [TEMP_PACKET_FILE, TEMP_PORT_LIST, TEMP_SCAN_COUNTS]
for temp_file in files_to_clean:
if temp_file and os.path.isfile(temp_file):
print(f"{temp_file} を削除しています。")
try:
os.remove(temp_file)
except OSError as e:
print(f"{temp_file} の削除中にエラーが発生しました: {e}", file=sys.stderr)
print("クリーンアップが完了しました。終了します。")
sys.exit(0)
def get_local_ip(interface_to_check):
try:
result = subprocess.run(
["ip", "addr", "show", "dev", interface_to_check],
capture_output=True,
text=True,
check=True
)
for line in result.stdout.splitlines():
if "inet " in line:
return line.split()[1].split('/')[0]
except subprocess.CalledProcessError as e:
print(f"{interface_to_check} のIPアドレス取得に失敗しました: {e}", file=sys.stderr)
if e.stderr:
print(f" ip addr show エラー: {e.stderr.strip()}", file=sys.stderr)
except FileNotFoundError:
error_exit("'ip' コマンドが見つかりません。iproute2パッケージがインストールされているか確認してください。")
return None
def capture_packets(interface_to_capture, duration_sec):
global TEMP_PACKET_FILE
temp_file_obj = tempfile.NamedTemporaryFile(
prefix=TEMP_PREFIX + "packets.pcap.",
dir=TEMP_DIR,
delete=False
)
TEMP_PACKET_FILE = os.path.abspath(temp_file_obj.name)
temp_file_obj.close()
command = [
"tcpdump", "-i", interface_to_capture, "-n",
"-G", str(duration_sec), "-W", "1", "-w", TEMP_PACKET_FILE,
f"(tcp[13] & 2) != 0 or ((tcp[13] & 16) != 0 and (tcp[13] & 2) != 0) and src host not {LOCAL_IP}"
]
print(f"{duration_sec}秒間、{os.path.basename(TEMP_PACKET_FILE)} へパケットをキャプチャしています...")
try:
subprocess.run(command, stderr=subprocess.PIPE, check=True)
# ファイルの所有者を root に変更
chown_result = subprocess.run(["chown", "root:root", TEMP_PACKET_FILE], capture_output=True, text=True, check=True)
if chown_result.stderr:
print(f"chown エラー: {chown_result.stderr.strip()}", file=sys.stderr)
# ファイルの権限をすべてのユーザーが読み取れるように変更
chmod_result = subprocess.run(["chmod", "a+r", TEMP_PACKET_FILE], capture_output=True, text=True, check=True)
if chmod_result.stderr:
print(f"chmod エラー: {chmod_result.stderr.strip()}", file=sys.stderr)
except subprocess.CalledProcessError as e:
print(f"tcpdumpコマンドが終了ステータス {e.returncode} で失敗しました: {e.stderr.decode(errors='ignore') if e.stderr else 'N/A'}", file=sys.stderr)
except FileNotFoundError:
error_exit("'tcpdump' コマンドが見つかりません。インストールしてください。")
return TEMP_PACKET_FILE
def analyze_packets(packet_file_path):
global TEMP_PORT_LIST, TEMP_SCAN_COUNTS
temp_port_list_obj = tempfile.NamedTemporaryFile(prefix=TEMP_PREFIX + "ports.txt.", dir=TEMP_DIR, delete=False)
TEMP_PORT_LIST = os.path.abspath(temp_port_list_obj.name)
temp_port_list_obj.close()
temp_scan_counts_obj = tempfile.NamedTemporaryFile(prefix=TEMP_PREFIX + "counts.txt.", dir=TEMP_DIR, delete=False)
TEMP_SCAN_COUNTS = os.path.abspath(temp_scan_counts_obj.name)
temp_scan_counts_obj.close()
if not packet_file_path or not os.path.exists(packet_file_path) or os.path.getsize(packet_file_path) == 0:
print(f"警告: パケットファイル {os.path.basename(packet_file_path) if packet_file_path else 'None'} が空か見つかりません。解析をスキップします。")
open(TEMP_PORT_LIST, 'w').close()
open(TEMP_SCAN_COUNTS, 'w').close()
return TEMP_PORT_LIST, TEMP_SCAN_COUNTS
print(f"{os.path.basename(packet_file_path)} からパケットを解析しています...")
try:
cmd_read_filter = ["tcpdump", "-r", packet_file_path, "-n"]
p_tcpdump = subprocess.Popen(cmd_read_filter, stdout=subprocess.PIPE, text=True, stderr=subprocess.PIPE)
with open(TEMP_PORT_LIST, 'w') as f_ports:
for line in p_tcpdump.stdout:
print(f"解析中の行: {line.strip()}") # デバッグ出力
if ' Flags [S]' in line or ' Flags [S.' in line:
parts = line.split()
try:
ip_keyword_index = parts.index("IP")
src_full_address = parts[ip_keyword_index + 1]
dst_full_address_with_colon = parts[ip_keyword_index + 3]
src_ip_parts = src_full_address.split('.')
if len(src_ip_parts) == 5:
source_ip = ".".join(src_ip_parts[:4])
dst_full_address = dst_full_address_with_colon.rstrip(':')
dst_ip_parts = dst_full_address.split('.')
if len(dst_ip_parts) == 5:
dest_port = dst_ip_parts[4]
print(f" 検出 - 送信元IP: {source_ip}, 宛先ポート: {dest_port}") # デバッグ出力
f_ports.write(f"{source_ip} {dest_port}\n")
except (ValueError, IndexError):
print(f" 解析エラー: {line.strip()}") # デバッグ出力
continue
p_tcpdump.stdout.close()
stderr_output = p_tcpdump.stderr.read()
p_tcpdump.stderr.close()
p_tcpdump.wait()
if p_tcpdump.returncode != 0:
print(f"tcpdump -r コマンドが失敗しました。標準エラー出力: {stderr_output}", file=sys.stderr)
except FileNotFoundError:
error_exit("'tcpdump' コマンドが見つかりません。インストールしてください。")
return TEMP_PORT_LIST, TEMP_SCAN_COUNTS
if not os.path.exists(TEMP_PORT_LIST) or os.path.getsize(TEMP_PORT_LIST) == 0:
print("関連するSYN/SYN-ACKパケットが見つからなかったか、ポートリストへの解析に失敗しました。")
open(TEMP_SCAN_COUNTS, 'w').close()
return TEMP_PORT_LIST, TEMP_SCAN_COUNTS
count_command = f"sort '{TEMP_PORT_LIST}' | uniq | awk '{{print $1}}' | uniq -c | sort -nr > '{TEMP_SCAN_COUNTS}'"
try:
subprocess.run(count_command, shell=True, check=True, stderr=subprocess.PIPE)
except subprocess.CalledProcessError as e:
print(f"スキャンカウント処理中にエラーが発生しました: {e.stderr.decode(errors='ignore') if e.stderr else 'N/A'}", file=sys.stderr)
open(TEMP_SCAN_COUNTS, 'w').close()
except FileNotFoundError as e:
error_exit(f"カウント処理パイプラインのコマンド (sort, uniq, awk) が見つかりません: {e}")
return TEMP_PORT_LIST, TEMP_SCAN_COUNTS
def check_for_scans(scan_counts_file_path):
if not os.path.exists(scan_counts_file_path) or os.path.getsize(scan_counts_file_path) == 0:
return
print("ポートスキャンをチェックしています...")
with open(scan_counts_file_path, 'r') as f:
for line in f:
try:
parts = line.strip().split()
if len(parts) == 2:
count_str, ip = parts
count = int(count_str)
if count >= THRESHOLD:
print("--- !!! ポートスキャン警告 !!! ---")
print(f"IPアドレス: {ip} からのポートスキャンの可能性があります。")
print(f"過去 {DURATION} 秒間に {count} 個の異なるポートへアクセスがありました。")
print("-------------------------------")
except ValueError:
print(f"警告: スキャンカウントファイルの行の解析に失敗しました: {line.strip()}", file=sys.stderr)
# --- メイン処理 ---
if __name__ == "__main__":
if os.geteuid() != 0:
error_exit("このスクリプトはroot権限で実行する必要があります。 (例: sudo python3 script.py eth0)")
required_commands = ["tcpdump", "ip", "awk", "sort", "uniq"]
for cmd in required_commands:
if not shutil.which(cmd):
error_exit(f"'{cmd}' コマンドが見つかりません。インストールされているか確認してください。")
if len(sys.argv) < 2:
usage()
else:
INTERFACE = sys.argv[1]
try:
# インターフェースの存在を確認し、詳細な情報を表示
result = subprocess.run(
["ip", "link", "show", INTERFACE],
capture_output=True,
text=True,
check=True
)
print(f"インターフェース '{INTERFACE}' の情報:\n{result.stdout}")
except subprocess.CalledProcessError as e:
error_exit(f"インターフェース '{INTERFACE}' が見つからないか、確認中にエラーが発生しました: {e.stderr.strip() if e.stderr else str(e)}")
except FileNotFoundError:
error_exit("'ip' コマンドが見つかりません。iproute2パッケージがインストールされているか確認してください。")
LOCAL_IP = get_local_ip(INTERFACE)
if not LOCAL_IP:
error_exit(f"インターフェース '{INTERFACE}' のローカルIPアドレスを取得できませんでした。設定を確認してください。")
print(f"監視インターフェース: {INTERFACE}")
print(f"ローカルIPアドレス: {LOCAL_IP}")
print(f"パケットキャプチャ時間: {DURATION} 秒")
print(f"スキャン閾値: {THRESHOLD} 個の異なるポート / {DURATION} 秒 / 1送信元IP")
print("Ctrl+C を押すと停止します。")
signal.signal(signal.SIGINT, cleanup)
signal.signal(signal.SIGTERM, cleanup)
try:
# メインループ:パケットキャプチャと解析を繰り返す
while True:
print("--- 新しい監視インターバルを開始 ---")
# 各インターバルの開始時にグローバルな一時ファイル変数をリセット
# これにより、万が一クリーンアップハンドラが不完全な状態で呼ばれた場合でも、
# 誤ったファイルを参照するのを防ぐ。
TEMP_PACKET_FILE = None
TEMP_PORT_LIST = None
TEMP_SCAN_COUNTS = None
# 1. パケットキャプチャ実行
# capture_packets内でグローバル変数 TEMP_PACKET_FILE が設定される
current_packet_file = capture_packets(INTERFACE, DURATION)
# 2. キャプチャしたパケットの解析
# analyze_packets内でグローバル変数 TEMP_PORT_LIST と TEMP_SCAN_COUNTS が設定される
current_port_list, current_scan_counts = analyze_packets(current_packet_file)
# 3. ポートスキャンのチェック
# スキャンカウントファイルが実際に存在し、中身がある場合のみチェックを実行
if os.path.isfile(current_scan_counts) and os.path.getsize(current_scan_counts) > 0:
check_for_scans(current_scan_counts)
else:
print("このインターバルではスキャンデータがありませんでした。")
# 4. このインターバルで使用した一時ファイルをクリーンアップ
# Ctrl+Cで中断されなかった場合の後始末。
# 中断された場合はシグナルハンドラのcleanup関数が処理する。
iter_files_to_clean = [current_packet_file, current_port_list, current_scan_counts]
for temp_f in iter_files_to_clean:
if temp_f and os.path.isfile(temp_f):
try:
os.remove(temp_f)
except OSError as e:
print(f"イテレーション中の一次ファイル {temp_f} の削除エラー: {e}", file=sys.stderr)
# 次のイテレーションに備えて、グローバル変数もクリア(より安全に)
TEMP_PACKET_FILE, TEMP_PORT_LIST, TEMP_SCAN_COUNTS = None, None, None
except KeyboardInterrupt:
# Ctrl+C が押された場合 (通常はsignal_handlerが処理するが、念のため)
print("\nキーボード割り込みを受信しました。クリーンアップ処理を実行します...")
cleanup() # クリーンアップ関数を呼び出し
except Exception as e:
# その他の予期せぬエラーが発生した場合
print(f"予期せぬエラーが発生しました: {e}", file=sys.stderr)
cleanup() # クリーンアップ処理を試みる
sys.exit(1) # エラーコード1で終了
これをデーモンで動かす時にジャーナルにログが吐き出されるようにしたのが以下のコードになります。
import os
import subprocess
import tempfile
import signal
import sys
import shutil
import re
import time # time.sleep() を使用するため追加
#Version 14 (Modified for Systemd logging and tcpdump filter)
# --- グローバル変数定義 ---
TEMP_PACKET_FILE = None
TEMP_PORT_LIST = None
TEMP_SCAN_COUNTS = None
# --- 設定値 ---
DURATION = 10
THRESHOLD = 20
TEMP_DIR = "/tmp"
TEMP_PREFIX = "tcpdump_scan_detect_"
LOCAL_IP = None
INTERFACE = None
# --- 関数定義 ---
def usage():
print(f"使用法: {sys.argv[0]} <インターフェース名>")
sys.stdout.flush()
print(" <インターフェース名>: 監視するネットワークインターフェース (例: eth0, enp0s3)")
sys.stdout.flush()
print("")
sys.stdout.flush()
print(f"例: {sys.argv[0]} eth0")
sys.stdout.flush()
sys.exit(1)
def error_exit(message):
print(f"エラー: {message}", file=sys.stderr)
sys.stderr.flush()
sys.exit(1)
def cleanup(signum=None, frame=None):
print("スキャン検出を停止し、一時ファイルをクリーンアップしています...")
sys.stdout.flush()
global TEMP_PACKET_FILE, TEMP_PORT_LIST, TEMP_SCAN_COUNTS
files_to_clean = [TEMP_PACKET_FILE, TEMP_PORT_LIST, TEMP_SCAN_COUNTS]
for temp_file in files_to_clean:
if temp_file and os.path.isfile(temp_file):
print(f"{temp_file} を削除しています。")
sys.stdout.flush()
try:
os.remove(temp_file)
except OSError as e:
print(f"{temp_file} の削除中にエラーが発生しました: {e}", file=sys.stderr)
sys.stderr.flush()
print("クリーンアップが完了しました。終了します。")
sys.stdout.flush()
sys.exit(0)
def get_local_ip(interface_to_check):
try:
result = subprocess.run(
["ip", "addr", "show", "dev", interface_to_check],
capture_output=True,
text=True,
check=True
)
for line in result.stdout.splitlines():
if "inet " in line:
return line.split()[1].split('/')[0]
except subprocess.CalledProcessError as e:
print(f"{interface_to_check} のIPアドレス取得に失敗しました: {e}", file=sys.stderr)
sys.stderr.flush()
if e.stderr:
print(f" ip addr show エラー: {e.stderr.strip()}", file=sys.stderr)
sys.stderr.flush()
except FileNotFoundError:
error_exit("'ip' コマンドが見つかりません。iproute2パッケージがインストールされているか確認してください。")
return None
def capture_packets(interface_to_capture, duration_sec):
global TEMP_PACKET_FILE
temp_file_obj = tempfile.NamedTemporaryFile(
prefix=TEMP_PREFIX + "packets.pcap.",
dir=TEMP_DIR,
delete=False
)
TEMP_PACKET_FILE = os.path.abspath(temp_file_obj.name)
temp_file_obj.close()
# --- tcpdump のフィルタリング条件を修正 ---
# "(tcp[13] & 2) != 0" はSYNフラグがセットされたパケットを捕捉します。
# これでSYNスキャンは捕捉できるはずです。
# 元のフィルタにあった `src host not {LOCAL_IP}` は、自分自身からのスキャンを
# 検出しないようにするためのものですが、意図しないフィルタリングになる可能性があるので、
# より基本的なSYNパケット捕捉フィルタにしました。
command = [
"tcpdump", "-i", interface_to_capture, "-n",
"-G", str(duration_sec), "-W", "1", "-w", TEMP_PACKET_FILE,
"(tcp[13] & 2) != 0" # SYN フラグがセットされたTCPパケットのみを捕捉
]
print(f"{duration_sec}秒間、{os.path.basename(TEMP_PACKET_FILE)} へパケットをキャプチャしています...")
sys.stdout.flush()
try:
subprocess.run(command, stderr=subprocess.PIPE, check=True)
# ファイルの所有者を root に変更
chown_result = subprocess.run(["chown", "root:root", TEMP_PACKET_FILE], capture_output=True, text=True, check=True)
if chown_result.stderr:
print(f"chown エラー: {chown_result.stderr.strip()}", file=sys.stderr)
sys.stderr.flush()
# ファイルの権限をすべてのユーザーが読み取れるように変更
chmod_result = subprocess.run(["chmod", "a+r", TEMP_PACKET_FILE], capture_output=True, text=True, check=True)
if chmod_result.stderr:
print(f"chmod エラー: {chmod_result.stderr.strip()}", file=sys.stderr)
sys.stderr.flush()
except subprocess.CalledProcessError as e:
print(f"tcpdumpコマンドが終了ステータス {e.returncode} で失敗しました: {e.stderr.decode(errors='ignore') if e.stderr else 'N/A'}", file=sys.stderr)
sys.stderr.flush()
except FileNotFoundError:
error_exit("'tcpdump' コマンドが見つかりません。インストールしてください。")
return TEMP_PACKET_FILE
def analyze_packets(packet_file_path):
global TEMP_PORT_LIST, TEMP_SCAN_COUNTS
temp_port_list_obj = tempfile.NamedTemporaryFile(prefix=TEMP_PREFIX + "ports.txt.", dir=TEMP_DIR, delete=False)
TEMP_PORT_LIST = os.path.abspath(temp_port_list_obj.name)
temp_port_list_obj.close()
temp_scan_counts_obj = tempfile.NamedTemporaryFile(prefix=TEMP_PREFIX + "counts.txt.", dir=TEMP_DIR, delete=False)
TEMP_SCAN_COUNTS = os.path.abspath(temp_scan_counts_obj.name)
temp_scan_counts_obj.close()
if not packet_file_path or not os.path.exists(packet_file_path) or os.path.getsize(packet_file_path) == 0:
print(f"警告: パケットファイル {os.path.basename(packet_file_path) if packet_file_path else 'None'} が空か見つかりません。解析をスキップします。")
sys.stdout.flush()
open(TEMP_PORT_LIST, 'w').close()
open(TEMP_SCAN_COUNTS, 'w').close()
return TEMP_PORT_LIST, TEMP_SCAN_COUNTS
print(f"{os.path.basename(packet_file_path)} からパケットを解析しています...")
sys.stdout.flush()
try:
cmd_read_filter = ["tcpdump", "-r", packet_file_path, "-n"]
p_tcpdump = subprocess.Popen(cmd_read_filter, stdout=subprocess.PIPE, text=True, stderr=subprocess.PIPE)
with open(TEMP_PORT_LIST, 'w') as f_ports:
for line in p_tcpdump.stdout:
# print(f"解析中の行: {line.strip()}") # デバッグ出力(必要であればコメント解除)
# sys.stdout.flush()
if ' Flags [S]' in line or ' Flags [S.' in line: # SYNパケットを検出
parts = line.split()
try:
ip_keyword_index = parts.index("IP")
src_full_address = parts[ip_keyword_index + 1]
dst_full_address_with_colon = parts[ip_keyword_index + 3]
src_ip_parts = src_full_address.split('.')
if len(src_ip_parts) == 5: # 例: 192.168.0.56.55013
source_ip = ".".join(src_ip_parts[:4])
dst_full_address = dst_full_address_with_colon.rstrip(':')
dst_ip_parts = dst_full_address.split('.')
if len(dst_ip_parts) == 5: # 例: 192.168.0.7.1721:
dest_port = dst_ip_parts[4]
# print(f" 検出 - 送信元IP: {source_ip}, 宛先ポート: {dest_port}") # デバッグ出力
# sys.stdout.flush()
f_ports.write(f"{source_ip} {dest_port}\n")
except (ValueError, IndexError):
# print(f" 解析エラー: {line.strip()}") # デバッグ出力
# sys.stdout.flush()
continue
p_tcpdump.stdout.close()
stderr_output = p_tcpdump.stderr.read()
p_tcpdump.stderr.close()
p_tcpdump.wait()
if p_tcpdump.returncode != 0:
print(f"tcpdump -r コマンドが失敗しました。標準エラー出力: {stderr_output}", file=sys.stderr)
sys.stderr.flush()
except FileNotFoundError:
error_exit("'tcpdump' コマンドが見つかりません。インストールしてください。")
# return TEMP_PORT_LIST, TEMP_SCAN_COUNTS # error_exitはsys.exitを呼ぶので到達しない
if not os.path.exists(TEMP_PORT_LIST) or os.path.getsize(TEMP_PORT_LIST) == 0:
print("関連するSYN/SYN-ACKパケットが見つからなかったか、ポートリストへの解析に失敗しました。")
sys.stdout.flush()
open(TEMP_SCAN_COUNTS, 'w').close()
return TEMP_PORT_LIST, TEMP_SCAN_COUNTS
# temp_port_list をソートし、uniq でユニークな IP とポートの組み合わせを数え、
# awk で IP だけを抽出し、再度 uniq -c で IP ごとのユニークなポート数を数え、
# 最後に降順ソートして temp_scan_counts に保存
# このコマンドは、シェルによって解釈される必要があるため shell=True が必要
count_command = f"sort '{TEMP_PORT_LIST}' | uniq | awk '{{print $1}}' | uniq -c | sort -nr > '{TEMP_SCAN_COUNTS}'"
try:
subprocess.run(count_command, shell=True, check=True, stderr=subprocess.PIPE)
except subprocess.CalledProcessError as e:
print(f"スキャンカウント処理中にエラーが発生しました: {e.stderr.decode(errors='ignore') if e.stderr else 'N/A'}", file=sys.stderr)
sys.stderr.flush()
open(TEMP_SCAN_COUNTS, 'w').close() # エラー時にもファイルは空にする
except FileNotFoundError as e:
# shell=Trueの場合、これは通常発生しない。コマンドが見つからないエラーはshellがハンドルする
error_exit(f"カウント処理パイプラインのコマンド (sort, uniq, awk) が見つかりません: {e}")
return TEMP_PORT_LIST, TEMP_SCAN_COUNTS
def check_for_scans(scan_counts_file_path):
if not os.path.exists(scan_counts_file_path) or os.path.getsize(scan_counts_file_path) == 0:
return
print("ポートスキャンをチェックしています...")
sys.stdout.flush()
with open(scan_counts_file_path, 'r') as f:
for line in f:
try:
parts = line.strip().split()
if len(parts) == 2:
count_str, ip = parts
count = int(count_str)
if count >= THRESHOLD:
print("--- !!! ポートスキャン警告 !!! ---")
sys.stdout.flush()
print(f"IPアドレス: {ip} からのポートスキャンの可能性があります。")
sys.stdout.flush()
print(f"過去 {DURATION} 秒間に {count} 個の異なるポートへアクセスがありました。")
sys.stdout.flush()
print("-------------------------------")
sys.stdout.flush()
except ValueError:
print(f"警告: スキャンカウントファイルの行の解析に失敗しました: {line.strip()}", file=sys.stderr)
sys.stderr.flush()
# --- メイン処理 ---
if __name__ == "__main__":
if os.geteuid() != 0:
error_exit("このスクリプトはroot権限で実行する必要があります。 (例: sudo python3 script.py eth0)")
required_commands = ["tcpdump", "ip", "awk", "sort", "uniq"]
for cmd in required_commands:
if not shutil.which(cmd):
error_exit(f"'{cmd}' コマンドが見つかりません。インストールされているか確認してください。")
if len(sys.argv) < 2:
usage()
else:
INTERFACE = sys.argv[1]
try:
# インターフェースの存在を確認し、詳細な情報を表示
result = subprocess.run(
["ip", "link", "show", INTERFACE],
capture_output=True,
text=True,
check=True
)
print(f"インターフェース '{INTERFACE}' の情報:\n{result.stdout}")
sys.stdout.flush()
except subprocess.CalledProcessError as e:
error_exit(f"インターフェース '{INTERFACE}' が見つからないか、確認中にエラーが発生しました: {e.stderr.strip() if e.stderr else str(e)}")
except FileNotFoundError:
error_exit("'ip' コマンドが見つかりません。iproute2パッケージがインストールされているか確認してください。")
LOCAL_IP = get_local_ip(INTERFACE)
if not LOCAL_IP:
error_exit(f"インターフェース '{INTERFACE}' のローカルIPアドレスを取得できませんでした。設定を確認してください。")
print(f"監視インターフェース: {INTERFACE}")
sys.stdout.flush()
print(f"ローカルIPアドレス: {LOCAL_IP}")
sys.stdout.flush()
print(f"パケットキャプチャ時間: {DURATION} 秒")
sys.stdout.flush()
print(f"スキャン閾値: {THRESHOLD} 個の異なるポート / {DURATION} 秒 / 1送信元IP")
sys.stdout.flush()
print("Ctrl+C を押すと停止します。")
sys.stdout.flush()
signal.signal(signal.SIGINT, cleanup)
signal.signal(signal.SIGTERM, cleanup)
try:
# メインループ:パケットキャプチャと解析を繰り返す
while True:
print("--- 新しい監視インターバルを開始 ---")
sys.stdout.flush()
time.sleep(1) # インターバル開始の出力がジャーナルに反映されるのを少し待つ
# 各インターバルの開始時にグローバルな一時ファイル変数をリセット
# これにより、万が一クリーンアップハンドラが不完全な状態で呼ばれた場合でも、
# 誤ったファイルを参照するのを防ぐ。
TEMP_PACKET_FILE = None
TEMP_PORT_LIST = None
TEMP_SCAN_COUNTS = None
# 1. パケットキャプチャ実行
# capture_packets内でグローバル変数 TEMP_PACKET_FILE が設定される
current_packet_file = capture_packets(INTERFACE, DURATION)
# 2. キャプチャしたパケットの解析
# analyze_packets内でグローバル変数 TEMP_PORT_LIST と TEMP_SCAN_COUNTS が設定される
current_port_list, current_scan_counts = analyze_packets(current_packet_file)
# 3. ポートスキャンのチェック
# スキャンカウントファイルが実際に存在し、中身がある場合のみチェックを実行
if os.path.isfile(current_scan_counts) and os.path.getsize(current_scan_counts) > 0:
check_for_scans(current_scan_counts)
else:
print("このインターバルではスキャンデータがありませんでした。")
sys.stdout.flush()
# 4. このインターバルで使用した一時ファイルをクリーンアップ
# Ctrl+Cで中断されなかった場合の後始末。
# 中断された場合はシグナルハンドラのcleanup関数が処理する。
iter_files_to_clean = [current_packet_file, current_port_list, current_scan_counts]
for temp_f in iter_files_to_clean:
if temp_f and os.path.isfile(temp_f):
try:
os.remove(temp_f)
except OSError as e:
print(f"イテレーション中の一次ファイル {temp_f} の削除エラー: {e}", file=sys.stderr)
sys.stderr.flush()
# 次のイテレーションに備えて、グローバル変数もクリア(より安全に)
TEMP_PACKET_FILE, TEMP_PORT_LIST, TEMP_SCAN_COUNTS = None, None, None
except KeyboardInterrupt:
# Ctrl+C が押された場合 (通常はsignal_handlerが処理するが、念のため)
print("\nキーボード割り込みを受信しました。クリーンアップ処理を実行します...")
sys.stdout.flush()
cleanup() # クリーンアップ関数を呼び出し
except Exception as e:
# その他の予期せぬエラーが発生した場合
print(f"予期せぬエラーが発生しました: {e}", file=sys.stderr)
sys.stderr.flush()
cleanup() # クリーンアップ処理を試みる
sys.exit(1) # エラーコード1で終了
デーモン化
上記示したpythonスクリプトを、opt/scan_detector に配置
# mkdir /opt/scan_detector
# cp scan_detector.py /opt/scan_detector/
# chmod +x /opt/scan_detector/scan_detector.py
サービスファイル作成
/etc/systemd/system/scan-detector.service
中身 eno1 は、環境に合わせて変更する必要があります。
[Unit]
Description=Port Scan Detector Service
After=network-online.target
[Service]
ExecStart=/usr/bin/python3 /opt/scan_detector/scan_detector.py eno1
WorkingDirectory=/opt/scan_detector/
Restart=on-failure
User=root
Group=root
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
新しいサービスファイルを認識させるために、systemd の設定をリロードします。
sudo systemctl daemon-reload
サービスを有効化
# systemctl enable scan-detector.service
Created symlink /etc/systemd/system/multi-user.target.wants/scan-detector.service → /etc/systemd/system/scan-detector.service.
サービスを起動
systemctl start scan-detector.service
サービス起動状況の確認
systemctl status scan-detector.service
root@TestDB:/home# systemctl status scan-detector.service
● scan-detector.service - Port Scan Detector Service
Loaded: loaded (/etc/systemd/system/scan-detector.service; enabled; preset: enabled)
Active: active (running) since Wed 2025-05-21 09:35:04 JST; 33s ago
Main PID: 396726 (python3)
Tasks: 2 (limit: 18955)
Memory: 7.2M
CPU: 25ms
CGroup: /system.slice/scan-detector.service
tq396726 /usr/bin/python3 /opt/scan_detector/scan_detector.py eno1
mq396729 tcpdump -i eno1 -n -G 10 -W 1 -w /tmp/tcpdump_scan_detect_packets.pcap.qk92qj3k "(tcp[13] & 2) !=>
5月 21 09:35:04 TestDB systemd[1]: Started scan-detector.service - Port Scan Detector Service.
ポートスキャンを検知してみる
# journalctl -u scan-detector.service -f
5月 21 09:56:48 TestDB python3[397428]: 2: eno1: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq state UP mode DEFAULT group default qlen 1000
5月 21 09:56:48 TestDB python3[397428]: link/ether 5c:ba:2c:59:bc:80 brd ff:ff:ff:ff:ff:ff
5月 21 09:56:48 TestDB python3[397428]: altname enp2s0f0
5月 21 09:56:48 TestDB python3[397428]: 監視インターフェース: eno1
5月 21 09:56:48 TestDB python3[397428]: ローカルIPアドレス: 192.168.0.7
5月 21 09:56:48 TestDB python3[397428]: パケットキャプチャ時間: 10 秒
5月 21 09:56:48 TestDB python3[397428]: スキャン閾値: 20 個の異なるポート / 10 秒 / 1送信元IP
5月 21 09:56:48 TestDB python3[397428]: Ctrl+C を押すと停止します。
5月 21 09:56:48 TestDB python3[397428]: --- 新しい監視インターバルを開始 ---
5月 21 09:56:49 TestDB python3[397428]: 10秒間、tcpdump_scan_detect_packets.pcap.pvpadoav へパケットをキャプチャしてい ます...
5月 21 09:57:17 TestDB python3[397428]: tcpdump_scan_detect_packets.pcap.pvpadoav からパケットを解析しています...
5月 21 09:57:17 TestDB python3[397428]: 関連するSYN/SYN-ACKパケットが見つからなかったか、ポートリストへの解析に失敗しました。
5月 21 09:57:17 TestDB python3[397428]: このインターバルではスキャンデータがありませんでした。
5月 21 09:57:17 TestDB python3[397428]: --- 新しい監視インターバルを開始 ---
5月 21 09:57:18 TestDB python3[397428]: 10秒間、tcpdump_scan_detect_packets.pcap.tj7rkmxj へパケットをキャプチャしてい ます...
5月 21 09:57:31 TestDB python3[397428]: tcpdump_scan_detect_packets.pcap.tj7rkmxj からパケットを解析しています...
5月 21 09:57:31 TestDB python3[397428]: ポートスキャンをチェックしています...
5月 21 09:57:31 TestDB python3[397428]: --- !!! ポートスキャン警告 !!! ---
5月 21 09:57:31 TestDB python3[397428]: IPアドレス: 192.168.0.56 からのポートスキャンの可能性があります。
5月 21 09:57:31 TestDB python3[397428]: 過去 10 秒間に 1000 個の異なるポートへアクセスがありました。
5月 21 09:57:31 TestDB python3[397428]: -------------------------------
スクリプトを調整する
スクリプトを調整した際にデーモンを再起動する方法です
# systemctl restart scan-detector.servic
過去ログの確認方法
以下のコマンドにより過去のログを探る事ができます。
# journalctl -u scan-detector.service
気になるログを発見したので、他にも同じスキャンを受けているか確認
# journalctl -u scan-detector.service | grep "192.168.0.7 からのポートスキャン"
5月 21 14:47:05 TestDB python3[397428]: IPアドレス: 192.168.0.7 からのポートスキャンの可能性があります。
5月 22 09:52:25 TestDB python3[397428]: IPアドレス: 192.168.0.7 からのポートスキャンの可能性があります。
5月 22 10:12:53 TestDB python3[397428]: IPアドレス: 192.168.0.7 からのポートスキャンの可能性があります。
5月 22 15:17:08 TestDB python3[397428]: IPアドレス: 192.168.0.7 からのポートスキャンの可能性があります。
5月 22 16:15:04 TestDB python3[397428]: IPアドレス: 192.168.0.7 からのポートスキャンの可能性があります。
5月 22 17:40:50 TestDB python3[397428]: IPアドレス: 192.168.0.7 からのポートスキャンの可能性があります。
5月 23 09:13:10 TestDB python3[397428]: IPアドレス: 192.168.0.7 からのポートスキャンの可能性があります。
5月 23 09:54:02 TestDB python3[397428]: IPアドレス: 192.168.0.7 からのポートスキャンの可能性があります。