EDRシステム構築中 PortScan_detectorについてメモ

未分類

本ページは広告が含まれています。気になる広告をクリック頂けますと、サーバ運営費になります(^^

サーバがポートスキャンを受けていないかを確認する

サーバにおいて、自分自身がポートスキャンを受けていないかを検知するpythonコードです。

import os
import subprocess
import tempfile
import signal
import sys
import shutil
import re

# --- グローバル変数定義 ---
TEMP_PACKET_FILE = None
TEMP_PORT_LIST = None
TEMP_SCAN_COUNTS = None

# --- 設定値 ---
DURATION = 10
THRESHOLD = 20
TEMP_DIR = "/tmp"
TEMP_PREFIX = "tcpdump_scan_detect_"
LOCAL_IP = None
INTERFACE = None

# --- 関数定義 ---

def usage():
    print(f"使用法: {sys.argv[0]} <インターフェース名>")
    print("  <インターフェース名>: 監視するネットワークインターフェース (例: eth0, enp0s3)")
    print("")
    print(f"例: {sys.argv[0]} eth0")
    sys.exit(1)

def error_exit(message):
    print(f"エラー: {message}", file=sys.stderr)
    sys.exit(1)

def cleanup(signum=None, frame=None):
    print("スキャン検出を停止し、一時ファイルをクリーンアップしています...")
    global TEMP_PACKET_FILE, TEMP_PORT_LIST, TEMP_SCAN_COUNTS
    files_to_clean = [TEMP_PACKET_FILE, TEMP_PORT_LIST, TEMP_SCAN_COUNTS]
    for temp_file in files_to_clean:
        if temp_file and os.path.isfile(temp_file):
            print(f"{temp_file} を削除しています。")
            try:
                os.remove(temp_file)
            except OSError as e:
                print(f"{temp_file} の削除中にエラーが発生しました: {e}", file=sys.stderr)
    print("クリーンアップが完了しました。終了します。")
    sys.exit(0)

def get_local_ip(interface_to_check):
    try:
        result = subprocess.run(
            ["ip", "addr", "show", "dev", interface_to_check],
            capture_output=True,
            text=True,
            check=True
        )
        for line in result.stdout.splitlines():
            if "inet " in line:
                return line.split()[1].split('/')[0]
    except subprocess.CalledProcessError as e:
        print(f"{interface_to_check} のIPアドレス取得に失敗しました: {e}", file=sys.stderr)
        if e.stderr:
            print(f"  ip addr show エラー: {e.stderr.strip()}", file=sys.stderr)
    except FileNotFoundError:
        error_exit("'ip' コマンドが見つかりません。iproute2パッケージがインストールされているか確認してください。")
    return None

def capture_packets(interface_to_capture, duration_sec):
    global TEMP_PACKET_FILE

    temp_file_obj = tempfile.NamedTemporaryFile(
        prefix=TEMP_PREFIX + "packets.pcap.",
        dir=TEMP_DIR,
        delete=False
    )
    TEMP_PACKET_FILE = os.path.abspath(temp_file_obj.name)
    temp_file_obj.close()

    command = [
        "tcpdump", "-i", interface_to_capture, "-n",
        "-G", str(duration_sec), "-W", "1", "-w", TEMP_PACKET_FILE,
        f"(tcp[13] & 2) != 0 or ((tcp[13] & 16) != 0 and (tcp[13] & 2) != 0) and src host not {LOCAL_IP}"
    ]
    print(f"{duration_sec}秒間、{os.path.basename(TEMP_PACKET_FILE)} へパケットをキャプチャしています...")

    try:
        subprocess.run(command, stderr=subprocess.PIPE, check=True)

        # ファイルの所有者を root に変更
        chown_result = subprocess.run(["chown", "root:root", TEMP_PACKET_FILE], capture_output=True, text=True, check=True)
        if chown_result.stderr:
            print(f"chown エラー: {chown_result.stderr.strip()}", file=sys.stderr)

        # ファイルの権限をすべてのユーザーが読み取れるように変更
        chmod_result = subprocess.run(["chmod", "a+r", TEMP_PACKET_FILE], capture_output=True, text=True, check=True)
        if chmod_result.stderr:
            print(f"chmod エラー: {chmod_result.stderr.strip()}", file=sys.stderr)

    except subprocess.CalledProcessError as e:
        print(f"tcpdumpコマンドが終了ステータス {e.returncode} で失敗しました: {e.stderr.decode(errors='ignore') if e.stderr else 'N/A'}", file=sys.stderr)
    except FileNotFoundError:
        error_exit("'tcpdump' コマンドが見つかりません。インストールしてください。")
    return TEMP_PACKET_FILE

def analyze_packets(packet_file_path):
    global TEMP_PORT_LIST, TEMP_SCAN_COUNTS

    temp_port_list_obj = tempfile.NamedTemporaryFile(prefix=TEMP_PREFIX + "ports.txt.", dir=TEMP_DIR, delete=False)
    TEMP_PORT_LIST = os.path.abspath(temp_port_list_obj.name)
    temp_port_list_obj.close()

    temp_scan_counts_obj = tempfile.NamedTemporaryFile(prefix=TEMP_PREFIX + "counts.txt.", dir=TEMP_DIR, delete=False)
    TEMP_SCAN_COUNTS = os.path.abspath(temp_scan_counts_obj.name)
    temp_scan_counts_obj.close()

    if not packet_file_path or not os.path.exists(packet_file_path) or os.path.getsize(packet_file_path) == 0:
        print(f"警告: パケットファイル {os.path.basename(packet_file_path) if packet_file_path else 'None'} が空か見つかりません。解析をスキップします。")
        open(TEMP_PORT_LIST, 'w').close()
        open(TEMP_SCAN_COUNTS, 'w').close()
        return TEMP_PORT_LIST, TEMP_SCAN_COUNTS

    print(f"{os.path.basename(packet_file_path)} からパケットを解析しています...")
    try:
        cmd_read_filter = ["tcpdump", "-r", packet_file_path, "-n"]
        p_tcpdump = subprocess.Popen(cmd_read_filter, stdout=subprocess.PIPE, text=True, stderr=subprocess.PIPE)

        with open(TEMP_PORT_LIST, 'w') as f_ports:
            for line in p_tcpdump.stdout:
                print(f"解析中の行: {line.strip()}")  # デバッグ出力
                if ' Flags [S]' in line or ' Flags [S.' in line:
                    parts = line.split()
                    try:
                        ip_keyword_index = parts.index("IP")
                        src_full_address = parts[ip_keyword_index + 1]
                        dst_full_address_with_colon = parts[ip_keyword_index + 3]

                        src_ip_parts = src_full_address.split('.')
                        if len(src_ip_parts) == 5:
                            source_ip = ".".join(src_ip_parts[:4])

                            dst_full_address = dst_full_address_with_colon.rstrip(':')
                            dst_ip_parts = dst_full_address.split('.')
                            if len(dst_ip_parts) == 5:
                                dest_port = dst_ip_parts[4]
                                print(f"  検出 - 送信元IP: {source_ip}, 宛先ポート: {dest_port}")  # デバッグ出力
                                f_ports.write(f"{source_ip} {dest_port}\n")
                    except (ValueError, IndexError):
                        print(f"  解析エラー: {line.strip()}")  # デバッグ出力
                        continue

        p_tcpdump.stdout.close()
        stderr_output = p_tcpdump.stderr.read()
        p_tcpdump.stderr.close()
        p_tcpdump.wait()
        if p_tcpdump.returncode != 0:
            print(f"tcpdump -r コマンドが失敗しました。標準エラー出力: {stderr_output}", file=sys.stderr)

    except FileNotFoundError:
        error_exit("'tcpdump' コマンドが見つかりません。インストールしてください。")
        return TEMP_PORT_LIST, TEMP_SCAN_COUNTS

    if not os.path.exists(TEMP_PORT_LIST) or os.path.getsize(TEMP_PORT_LIST) == 0:
        print("関連するSYN/SYN-ACKパケットが見つからなかったか、ポートリストへの解析に失敗しました。")
        open(TEMP_SCAN_COUNTS, 'w').close()
        return TEMP_PORT_LIST, TEMP_SCAN_COUNTS

    count_command = f"sort '{TEMP_PORT_LIST}' | uniq | awk '{{print $1}}' | uniq -c | sort -nr > '{TEMP_SCAN_COUNTS}'"
    try:
        subprocess.run(count_command, shell=True, check=True, stderr=subprocess.PIPE)
    except subprocess.CalledProcessError as e:
        print(f"スキャンカウント処理中にエラーが発生しました: {e.stderr.decode(errors='ignore') if e.stderr else 'N/A'}", file=sys.stderr)
        open(TEMP_SCAN_COUNTS, 'w').close()
    except FileNotFoundError as e:
        error_exit(f"カウント処理パイプラインのコマンド (sort, uniq, awk) が見つかりません: {e}")

    return TEMP_PORT_LIST, TEMP_SCAN_COUNTS

def check_for_scans(scan_counts_file_path):
    if not os.path.exists(scan_counts_file_path) or os.path.getsize(scan_counts_file_path) == 0:
        return

    print("ポートスキャンをチェックしています...")
    with open(scan_counts_file_path, 'r') as f:
        for line in f:
            try:
                parts = line.strip().split()
                if len(parts) == 2:
                    count_str, ip = parts
                    count = int(count_str)
                    if count >= THRESHOLD:
                        print("--- !!! ポートスキャン警告 !!! ---")
                        print(f"IPアドレス: {ip} からのポートスキャンの可能性があります。")
                        print(f"過去 {DURATION} 秒間に {count} 個の異なるポートへアクセスがありました。")
                        print("-------------------------------")
            except ValueError:
                print(f"警告: スキャンカウントファイルの行の解析に失敗しました: {line.strip()}", file=sys.stderr)

# --- メイン処理 ---
if __name__ == "__main__":
    if os.geteuid() != 0:
        error_exit("このスクリプトはroot権限で実行する必要があります。 (例: sudo python3 script.py eth0)")

    required_commands = ["tcpdump", "ip", "awk", "sort", "uniq"]
    for cmd in required_commands:
        if not shutil.which(cmd):
            error_exit(f"'{cmd}' コマンドが見つかりません。インストールされているか確認してください。")

    if len(sys.argv) < 2:
        usage()
    else:
        INTERFACE = sys.argv[1]

    try:
        # インターフェースの存在を確認し、詳細な情報を表示
        result = subprocess.run(
            ["ip", "link", "show", INTERFACE],
            capture_output=True,
            text=True,
            check=True
        )
        print(f"インターフェース '{INTERFACE}' の情報:\n{result.stdout}")
    except subprocess.CalledProcessError as e:
        error_exit(f"インターフェース '{INTERFACE}' が見つからないか、確認中にエラーが発生しました: {e.stderr.strip() if e.stderr else str(e)}")
    except FileNotFoundError:
        error_exit("'ip' コマンドが見つかりません。iproute2パッケージがインストールされているか確認してください。")

    LOCAL_IP = get_local_ip(INTERFACE)
    if not LOCAL_IP:
        error_exit(f"インターフェース '{INTERFACE}' のローカルIPアドレスを取得できませんでした。設定を確認してください。")

    print(f"監視インターフェース: {INTERFACE}")
    print(f"ローカルIPアドレス: {LOCAL_IP}")
    print(f"パケットキャプチャ時間: {DURATION} 秒")
    print(f"スキャン閾値: {THRESHOLD} 個の異なるポート / {DURATION} 秒 / 1送信元IP")
    print("Ctrl+C を押すと停止します。")

    signal.signal(signal.SIGINT, cleanup)
    signal.signal(signal.SIGTERM, cleanup)

    try:
        # メインループ:パケットキャプチャと解析を繰り返す
        while True:
            print("--- 新しい監視インターバルを開始 ---")

            # 各インターバルの開始時にグローバルな一時ファイル変数をリセット
            # これにより、万が一クリーンアップハンドラが不完全な状態で呼ばれた場合でも、
            # 誤ったファイルを参照するのを防ぐ。
            TEMP_PACKET_FILE = None
            TEMP_PORT_LIST = None
            TEMP_SCAN_COUNTS = None

            # 1. パケットキャプチャ実行
            #    capture_packets内でグローバル変数 TEMP_PACKET_FILE が設定される
            current_packet_file = capture_packets(INTERFACE, DURATION)

            # 2. キャプチャしたパケットの解析
            #    analyze_packets内でグローバル変数 TEMP_PORT_LIST と TEMP_SCAN_COUNTS が設定される
            current_port_list, current_scan_counts = analyze_packets(current_packet_file)

            # 3. ポートスキャンのチェック
            #    スキャンカウントファイルが実際に存在し、中身がある場合のみチェックを実行
            if os.path.isfile(current_scan_counts) and os.path.getsize(current_scan_counts) > 0:
                check_for_scans(current_scan_counts)
            else:
                print("このインターバルではスキャンデータがありませんでした。")

            # 4. このインターバルで使用した一時ファイルをクリーンアップ
            #    Ctrl+Cで中断されなかった場合の後始末。
            #    中断された場合はシグナルハンドラのcleanup関数が処理する。
            iter_files_to_clean = [current_packet_file, current_port_list, current_scan_counts]
            for temp_f in iter_files_to_clean:
                if temp_f and os.path.isfile(temp_f):
                    try:
                        os.remove(temp_f)
                    except OSError as e:
                        print(f"イテレーション中の一次ファイル {temp_f} の削除エラー: {e}", file=sys.stderr)

            # 次のイテレーションに備えて、グローバル変数もクリア(より安全に)
            TEMP_PACKET_FILE, TEMP_PORT_LIST, TEMP_SCAN_COUNTS = None, None, None

    except KeyboardInterrupt:
        # Ctrl+C が押された場合 (通常はsignal_handlerが処理するが、念のため)
        print("\nキーボード割り込みを受信しました。クリーンアップ処理を実行します...")
        cleanup()  # クリーンアップ関数を呼び出し
    except Exception as e:
        # その他の予期せぬエラーが発生した場合
        print(f"予期せぬエラーが発生しました: {e}", file=sys.stderr)
        cleanup()  # クリーンアップ処理を試みる
        sys.exit(1) # エラーコード1で終了

これをデーモンで動かす時にジャーナルにログが吐き出されるようにしたのが以下のコードになります。

import os
import subprocess
import tempfile
import signal
import sys
import shutil
import re
import time # time.sleep() を使用するため追加

#Version 14 (Modified for Systemd logging and tcpdump filter)

# --- グローバル変数定義 ---
TEMP_PACKET_FILE = None
TEMP_PORT_LIST = None
TEMP_SCAN_COUNTS = None

# --- 設定値 ---
DURATION = 10
THRESHOLD = 20
TEMP_DIR = "/tmp"
TEMP_PREFIX = "tcpdump_scan_detect_"
LOCAL_IP = None
INTERFACE = None

# --- 関数定義 ---

def usage():
    print(f"使用法: {sys.argv[0]} <インターフェース名>")
    sys.stdout.flush()
    print("  <インターフェース名>: 監視するネットワークインターフェース (例: eth0, enp0s3)")
    sys.stdout.flush()
    print("")
    sys.stdout.flush()
    print(f"例: {sys.argv[0]} eth0")
    sys.stdout.flush()
    sys.exit(1)

def error_exit(message):
    print(f"エラー: {message}", file=sys.stderr)
    sys.stderr.flush()
    sys.exit(1)

def cleanup(signum=None, frame=None):
    print("スキャン検出を停止し、一時ファイルをクリーンアップしています...")
    sys.stdout.flush()
    global TEMP_PACKET_FILE, TEMP_PORT_LIST, TEMP_SCAN_COUNTS
    files_to_clean = [TEMP_PACKET_FILE, TEMP_PORT_LIST, TEMP_SCAN_COUNTS]
    for temp_file in files_to_clean:
        if temp_file and os.path.isfile(temp_file):
            print(f"{temp_file} を削除しています。")
            sys.stdout.flush()
            try:
                os.remove(temp_file)
            except OSError as e:
                print(f"{temp_file} の削除中にエラーが発生しました: {e}", file=sys.stderr)
                sys.stderr.flush()
    print("クリーンアップが完了しました。終了します。")
    sys.stdout.flush()
    sys.exit(0)

def get_local_ip(interface_to_check):
    try:
        result = subprocess.run(
            ["ip", "addr", "show", "dev", interface_to_check],
            capture_output=True,
            text=True,
            check=True
        )
        for line in result.stdout.splitlines():
            if "inet " in line:
                return line.split()[1].split('/')[0]
    except subprocess.CalledProcessError as e:
        print(f"{interface_to_check} のIPアドレス取得に失敗しました: {e}", file=sys.stderr)
        sys.stderr.flush()
        if e.stderr:
            print(f"  ip addr show エラー: {e.stderr.strip()}", file=sys.stderr)
            sys.stderr.flush()
    except FileNotFoundError:
        error_exit("'ip' コマンドが見つかりません。iproute2パッケージがインストールされているか確認してください。")
    return None

def capture_packets(interface_to_capture, duration_sec):
    global TEMP_PACKET_FILE

    temp_file_obj = tempfile.NamedTemporaryFile(
        prefix=TEMP_PREFIX + "packets.pcap.",
        dir=TEMP_DIR,
        delete=False
    )
    TEMP_PACKET_FILE = os.path.abspath(temp_file_obj.name)
    temp_file_obj.close()

    # --- tcpdump のフィルタリング条件を修正 ---
    # "(tcp[13] & 2) != 0" はSYNフラグがセットされたパケットを捕捉します。
    # これでSYNスキャンは捕捉できるはずです。
    # 元のフィルタにあった `src host not {LOCAL_IP}` は、自分自身からのスキャンを
    # 検出しないようにするためのものですが、意図しないフィルタリングになる可能性があるので、
    # より基本的なSYNパケット捕捉フィルタにしました。
    command = [
        "tcpdump", "-i", interface_to_capture, "-n",
        "-G", str(duration_sec), "-W", "1", "-w", TEMP_PACKET_FILE,
        "(tcp[13] & 2) != 0" # SYN フラグがセットされたTCPパケットのみを捕捉
    ]
    
    print(f"{duration_sec}秒間、{os.path.basename(TEMP_PACKET_FILE)} へパケットをキャプチャしています...")
    sys.stdout.flush()

    try:
        subprocess.run(command, stderr=subprocess.PIPE, check=True)

        # ファイルの所有者を root に変更
        chown_result = subprocess.run(["chown", "root:root", TEMP_PACKET_FILE], capture_output=True, text=True, check=True)
        if chown_result.stderr:
            print(f"chown エラー: {chown_result.stderr.strip()}", file=sys.stderr)
            sys.stderr.flush()

        # ファイルの権限をすべてのユーザーが読み取れるように変更
        chmod_result = subprocess.run(["chmod", "a+r", TEMP_PACKET_FILE], capture_output=True, text=True, check=True)
        if chmod_result.stderr:
            print(f"chmod エラー: {chmod_result.stderr.strip()}", file=sys.stderr)
            sys.stderr.flush()

    except subprocess.CalledProcessError as e:
        print(f"tcpdumpコマンドが終了ステータス {e.returncode} で失敗しました: {e.stderr.decode(errors='ignore') if e.stderr else 'N/A'}", file=sys.stderr)
        sys.stderr.flush()
    except FileNotFoundError:
        error_exit("'tcpdump' コマンドが見つかりません。インストールしてください。")
    return TEMP_PACKET_FILE

def analyze_packets(packet_file_path):
    global TEMP_PORT_LIST, TEMP_SCAN_COUNTS

    temp_port_list_obj = tempfile.NamedTemporaryFile(prefix=TEMP_PREFIX + "ports.txt.", dir=TEMP_DIR, delete=False)
    TEMP_PORT_LIST = os.path.abspath(temp_port_list_obj.name)
    temp_port_list_obj.close()

    temp_scan_counts_obj = tempfile.NamedTemporaryFile(prefix=TEMP_PREFIX + "counts.txt.", dir=TEMP_DIR, delete=False)
    TEMP_SCAN_COUNTS = os.path.abspath(temp_scan_counts_obj.name)
    temp_scan_counts_obj.close()

    if not packet_file_path or not os.path.exists(packet_file_path) or os.path.getsize(packet_file_path) == 0:
        print(f"警告: パケットファイル {os.path.basename(packet_file_path) if packet_file_path else 'None'} が空か見つかりません。解析をスキップします。")
        sys.stdout.flush()
        open(TEMP_PORT_LIST, 'w').close()
        open(TEMP_SCAN_COUNTS, 'w').close()
        return TEMP_PORT_LIST, TEMP_SCAN_COUNTS

    print(f"{os.path.basename(packet_file_path)} からパケットを解析しています...")
    sys.stdout.flush()
    try:
        cmd_read_filter = ["tcpdump", "-r", packet_file_path, "-n"]
        p_tcpdump = subprocess.Popen(cmd_read_filter, stdout=subprocess.PIPE, text=True, stderr=subprocess.PIPE)

        with open(TEMP_PORT_LIST, 'w') as f_ports:
            for line in p_tcpdump.stdout:
                # print(f"解析中の行: {line.strip()}")  # デバッグ出力(必要であればコメント解除)
                # sys.stdout.flush()
                if ' Flags [S]' in line or ' Flags [S.' in line: # SYNパケットを検出
                    parts = line.split()
                    try:
                        ip_keyword_index = parts.index("IP")
                        src_full_address = parts[ip_keyword_index + 1]
                        dst_full_address_with_colon = parts[ip_keyword_index + 3]

                        src_ip_parts = src_full_address.split('.')
                        if len(src_ip_parts) == 5: # 例: 192.168.0.56.55013
                            source_ip = ".".join(src_ip_parts[:4])

                            dst_full_address = dst_full_address_with_colon.rstrip(':')
                            dst_ip_parts = dst_full_address.split('.')
                            if len(dst_ip_parts) == 5: # 例: 192.168.0.7.1721:
                                dest_port = dst_ip_parts[4]
                                # print(f"  検出 - 送信元IP: {source_ip}, 宛先ポート: {dest_port}")  # デバッグ出力
                                # sys.stdout.flush()
                                f_ports.write(f"{source_ip} {dest_port}\n")
                    except (ValueError, IndexError):
                        # print(f"  解析エラー: {line.strip()}")  # デバッグ出力
                        # sys.stdout.flush()
                        continue

        p_tcpdump.stdout.close()
        stderr_output = p_tcpdump.stderr.read()
        p_tcpdump.stderr.close()
        p_tcpdump.wait()
        if p_tcpdump.returncode != 0:
            print(f"tcpdump -r コマンドが失敗しました。標準エラー出力: {stderr_output}", file=sys.stderr)
            sys.stderr.flush()

    except FileNotFoundError:
        error_exit("'tcpdump' コマンドが見つかりません。インストールしてください。")
        # return TEMP_PORT_LIST, TEMP_SCAN_COUNTS # error_exitはsys.exitを呼ぶので到達しない

    if not os.path.exists(TEMP_PORT_LIST) or os.path.getsize(TEMP_PORT_LIST) == 0:
        print("関連するSYN/SYN-ACKパケットが見つからなかったか、ポートリストへの解析に失敗しました。")
        sys.stdout.flush()
        open(TEMP_SCAN_COUNTS, 'w').close()
        return TEMP_PORT_LIST, TEMP_SCAN_COUNTS

    # temp_port_list をソートし、uniq でユニークな IP とポートの組み合わせを数え、
    # awk で IP だけを抽出し、再度 uniq -c で IP ごとのユニークなポート数を数え、
    # 最後に降順ソートして temp_scan_counts に保存
    # このコマンドは、シェルによって解釈される必要があるため shell=True が必要
    count_command = f"sort '{TEMP_PORT_LIST}' | uniq | awk '{{print $1}}' | uniq -c | sort -nr > '{TEMP_SCAN_COUNTS}'"
    try:
        subprocess.run(count_command, shell=True, check=True, stderr=subprocess.PIPE)
    except subprocess.CalledProcessError as e:
        print(f"スキャンカウント処理中にエラーが発生しました: {e.stderr.decode(errors='ignore') if e.stderr else 'N/A'}", file=sys.stderr)
        sys.stderr.flush()
        open(TEMP_SCAN_COUNTS, 'w').close() # エラー時にもファイルは空にする
    except FileNotFoundError as e:
        # shell=Trueの場合、これは通常発生しない。コマンドが見つからないエラーはshellがハンドルする
        error_exit(f"カウント処理パイプラインのコマンド (sort, uniq, awk) が見つかりません: {e}")

    return TEMP_PORT_LIST, TEMP_SCAN_COUNTS

def check_for_scans(scan_counts_file_path):
    if not os.path.exists(scan_counts_file_path) or os.path.getsize(scan_counts_file_path) == 0:
        return

    print("ポートスキャンをチェックしています...")
    sys.stdout.flush()
    with open(scan_counts_file_path, 'r') as f:
        for line in f:
            try:
                parts = line.strip().split()
                if len(parts) == 2:
                    count_str, ip = parts
                    count = int(count_str)
                    if count >= THRESHOLD:
                        print("--- !!! ポートスキャン警告 !!! ---")
                        sys.stdout.flush()
                        print(f"IPアドレス: {ip} からのポートスキャンの可能性があります。")
                        sys.stdout.flush()
                        print(f"過去 {DURATION} 秒間に {count} 個の異なるポートへアクセスがありました。")
                        sys.stdout.flush()
                        print("-------------------------------")
                        sys.stdout.flush()
            except ValueError:
                print(f"警告: スキャンカウントファイルの行の解析に失敗しました: {line.strip()}", file=sys.stderr)
                sys.stderr.flush()

# --- メイン処理 ---
if __name__ == "__main__":
    if os.geteuid() != 0:
        error_exit("このスクリプトはroot権限で実行する必要があります。 (例: sudo python3 script.py eth0)")

    required_commands = ["tcpdump", "ip", "awk", "sort", "uniq"]
    for cmd in required_commands:
        if not shutil.which(cmd):
            error_exit(f"'{cmd}' コマンドが見つかりません。インストールされているか確認してください。")

    if len(sys.argv) < 2:
        usage()
    else:
        INTERFACE = sys.argv[1]

    try:
        # インターフェースの存在を確認し、詳細な情報を表示
        result = subprocess.run(
            ["ip", "link", "show", INTERFACE],
            capture_output=True,
            text=True,
            check=True
        )
        print(f"インターフェース '{INTERFACE}' の情報:\n{result.stdout}")
        sys.stdout.flush()
    except subprocess.CalledProcessError as e:
        error_exit(f"インターフェース '{INTERFACE}' が見つからないか、確認中にエラーが発生しました: {e.stderr.strip() if e.stderr else str(e)}")
    except FileNotFoundError:
        error_exit("'ip' コマンドが見つかりません。iproute2パッケージがインストールされているか確認してください。")

    LOCAL_IP = get_local_ip(INTERFACE)
    if not LOCAL_IP:
        error_exit(f"インターフェース '{INTERFACE}' のローカルIPアドレスを取得できませんでした。設定を確認してください。")

    print(f"監視インターフェース: {INTERFACE}")
    sys.stdout.flush()
    print(f"ローカルIPアドレス: {LOCAL_IP}")
    sys.stdout.flush()
    print(f"パケットキャプチャ時間: {DURATION} 秒")
    sys.stdout.flush()
    print(f"スキャン閾値: {THRESHOLD} 個の異なるポート / {DURATION} 秒 / 1送信元IP")
    sys.stdout.flush()
    print("Ctrl+C を押すと停止します。")
    sys.stdout.flush()

    signal.signal(signal.SIGINT, cleanup)
    signal.signal(signal.SIGTERM, cleanup)

    try:
        # メインループ:パケットキャプチャと解析を繰り返す
        while True:
            print("--- 新しい監視インターバルを開始 ---")
            sys.stdout.flush()
            time.sleep(1) # インターバル開始の出力がジャーナルに反映されるのを少し待つ

            # 各インターバルの開始時にグローバルな一時ファイル変数をリセット
            # これにより、万が一クリーンアップハンドラが不完全な状態で呼ばれた場合でも、
            # 誤ったファイルを参照するのを防ぐ。
            TEMP_PACKET_FILE = None
            TEMP_PORT_LIST = None
            TEMP_SCAN_COUNTS = None

            # 1. パケットキャプチャ実行
            #    capture_packets内でグローバル変数 TEMP_PACKET_FILE が設定される
            current_packet_file = capture_packets(INTERFACE, DURATION)

            # 2. キャプチャしたパケットの解析
            #    analyze_packets内でグローバル変数 TEMP_PORT_LIST と TEMP_SCAN_COUNTS が設定される
            current_port_list, current_scan_counts = analyze_packets(current_packet_file)

            # 3. ポートスキャンのチェック
            #    スキャンカウントファイルが実際に存在し、中身がある場合のみチェックを実行
            if os.path.isfile(current_scan_counts) and os.path.getsize(current_scan_counts) > 0:
                check_for_scans(current_scan_counts)
            else:
                print("このインターバルではスキャンデータがありませんでした。")
                sys.stdout.flush()

            # 4. このインターバルで使用した一時ファイルをクリーンアップ
            #    Ctrl+Cで中断されなかった場合の後始末。
            #    中断された場合はシグナルハンドラのcleanup関数が処理する。
            iter_files_to_clean = [current_packet_file, current_port_list, current_scan_counts]
            for temp_f in iter_files_to_clean:
                if temp_f and os.path.isfile(temp_f):
                    try:
                        os.remove(temp_f)
                    except OSError as e:
                        print(f"イテレーション中の一次ファイル {temp_f} の削除エラー: {e}", file=sys.stderr)
                        sys.stderr.flush()

            # 次のイテレーションに備えて、グローバル変数もクリア(より安全に)
            TEMP_PACKET_FILE, TEMP_PORT_LIST, TEMP_SCAN_COUNTS = None, None, None

    except KeyboardInterrupt:
        # Ctrl+C が押された場合 (通常はsignal_handlerが処理するが、念のため)
        print("\nキーボード割り込みを受信しました。クリーンアップ処理を実行します...")
        sys.stdout.flush()
        cleanup()  # クリーンアップ関数を呼び出し
    except Exception as e:
        # その他の予期せぬエラーが発生した場合
        print(f"予期せぬエラーが発生しました: {e}", file=sys.stderr)
        sys.stderr.flush()
        cleanup()  # クリーンアップ処理を試みる
        sys.exit(1) # エラーコード1で終了

デーモン化

上記示したpythonスクリプトを、opt/scan_detector に配置

# mkdir /opt/scan_detector
# cp scan_detector.py /opt/scan_detector/
# chmod +x /opt/scan_detector/scan_detector.py

サービスファイル作成

/etc/systemd/system/scan-detector.service

中身 eno1 は、環境に合わせて変更する必要があります。

[Unit]
Description=Port Scan Detector Service
After=network-online.target

[Service]
ExecStart=/usr/bin/python3 /opt/scan_detector/scan_detector.py eno1
WorkingDirectory=/opt/scan_detector/
Restart=on-failure
User=root
Group=root
StandardOutput=journal
StandardError=journal

[Install]
WantedBy=multi-user.target

新しいサービスファイルを認識させるために、systemd の設定をリロードします。

sudo systemctl daemon-reload

サービスを有効化

# systemctl enable scan-detector.service
Created symlink /etc/systemd/system/multi-user.target.wants/scan-detector.service → /etc/systemd/system/scan-detector.service.

サービスを起動

systemctl start scan-detector.service

サービス起動状況の確認

systemctl status scan-detector.service

root@TestDB:/home# systemctl status scan-detector.service
● scan-detector.service - Port Scan Detector Service
     Loaded: loaded (/etc/systemd/system/scan-detector.service; enabled; preset: enabled)
     Active: active (running) since Wed 2025-05-21 09:35:04 JST; 33s ago
   Main PID: 396726 (python3)
      Tasks: 2 (limit: 18955)
     Memory: 7.2M
        CPU: 25ms
     CGroup: /system.slice/scan-detector.service
             tq396726 /usr/bin/python3 /opt/scan_detector/scan_detector.py eno1
             mq396729 tcpdump -i eno1 -n -G 10 -W 1 -w /tmp/tcpdump_scan_detect_packets.pcap.qk92qj3k "(tcp[13] & 2) !=>

 5月 21 09:35:04 TestDB systemd[1]: Started scan-detector.service - Port Scan Detector Service.

ポートスキャンを検知してみる

# journalctl -u scan-detector.service -f
 5月 21 09:56:48 TestDB python3[397428]: 2: eno1: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq state UP mode DEFAULT group default qlen 1000
 5月 21 09:56:48 TestDB python3[397428]:     link/ether 5c:ba:2c:59:bc:80 brd ff:ff:ff:ff:ff:ff
 5月 21 09:56:48 TestDB python3[397428]:     altname enp2s0f0
 5月 21 09:56:48 TestDB python3[397428]: 監視インターフェース: eno1
 5月 21 09:56:48 TestDB python3[397428]: ローカルIPアドレス: 192.168.0.7
 5月 21 09:56:48 TestDB python3[397428]: パケットキャプチャ時間: 10 秒
 5月 21 09:56:48 TestDB python3[397428]: スキャン閾値: 20 個の異なるポート / 10 秒 / 1送信元IP
 5月 21 09:56:48 TestDB python3[397428]: Ctrl+C を押すと停止します。
 5月 21 09:56:48 TestDB python3[397428]: --- 新しい監視インターバルを開始 ---
 5月 21 09:56:49 TestDB python3[397428]: 10秒間、tcpdump_scan_detect_packets.pcap.pvpadoav へパケットをキャプチャしてい ます...
 5月 21 09:57:17 TestDB python3[397428]: tcpdump_scan_detect_packets.pcap.pvpadoav からパケットを解析しています...
 5月 21 09:57:17 TestDB python3[397428]: 関連するSYN/SYN-ACKパケットが見つからなかったか、ポートリストへの解析に失敗しました。
 5月 21 09:57:17 TestDB python3[397428]: このインターバルではスキャンデータがありませんでした。
 5月 21 09:57:17 TestDB python3[397428]: --- 新しい監視インターバルを開始 ---
 5月 21 09:57:18 TestDB python3[397428]: 10秒間、tcpdump_scan_detect_packets.pcap.tj7rkmxj へパケットをキャプチャしてい ます...
 5月 21 09:57:31 TestDB python3[397428]: tcpdump_scan_detect_packets.pcap.tj7rkmxj からパケットを解析しています...
 5月 21 09:57:31 TestDB python3[397428]: ポートスキャンをチェックしています...
 5月 21 09:57:31 TestDB python3[397428]: --- !!! ポートスキャン警告 !!! ---
 5月 21 09:57:31 TestDB python3[397428]: IPアドレス: 192.168.0.56 からのポートスキャンの可能性があります。
 5月 21 09:57:31 TestDB python3[397428]: 過去 10 秒間に 1000 個の異なるポートへアクセスがありました。
 5月 21 09:57:31 TestDB python3[397428]: -------------------------------

スクリプトを調整する

スクリプトを調整した際にデーモンを再起動する方法です

# systemctl restart scan-detector.servic

過去ログの確認方法

以下のコマンドにより過去のログを探る事ができます。

# journalctl -u scan-detector.service

気になるログを発見したので、他にも同じスキャンを受けているか確認

# journalctl -u scan-detector.service | grep "192.168.0.7 からのポートスキャン"
 5月 21 14:47:05 TestDB python3[397428]: IPアドレス: 192.168.0.7 からのポートスキャンの可能性があります。
 5月 22 09:52:25 TestDB python3[397428]: IPアドレス: 192.168.0.7 からのポートスキャンの可能性があります。
 5月 22 10:12:53 TestDB python3[397428]: IPアドレス: 192.168.0.7 からのポートスキャンの可能性があります。
 5月 22 15:17:08 TestDB python3[397428]: IPアドレス: 192.168.0.7 からのポートスキャンの可能性があります。
 5月 22 16:15:04 TestDB python3[397428]: IPアドレス: 192.168.0.7 からのポートスキャンの可能性があります。
 5月 22 17:40:50 TestDB python3[397428]: IPアドレス: 192.168.0.7 からのポートスキャンの可能性があります。
 5月 23 09:13:10 TestDB python3[397428]: IPアドレス: 192.168.0.7 からのポートスキャンの可能性があります。
 5月 23 09:54:02 TestDB python3[397428]: IPアドレス: 192.168.0.7 からのポートスキャンの可能性があります。
タイトルとURLをコピーしました