# [メタ情報] # 識別子: ファイルパス変更処理_gasからpython移行_exe # システム名: ファイルパス変更処理_gasからpython移行、M1Mac上で常時稼働 # 技術種別: Misc # 機能名: Misc # 使用言語: Python 自動起動スクリプト.plist # 状態: 実行用 # [/メタ情報] 要約:file_rename_watcher.pyはmmedia/pmediaの移動・リネームを監視し、変更前後パス・JST時刻・更新フラグをスプレッドシートに追記。ヘッダー保持、(A,B)重複は古い方のみ残し、更新日時降順に整形。watchdog/gspread使用。launchdのplistで常駐起動。compare_and_notify.pyはpytest_sheet1とsheet1を基準時刻以降で比較し、差分があればApple Mailで通知。こちらもplistで5分ごとに実行。 Mac mini 起動 └── LaunchAgent(com.xxxxxxxx.filewatcher.plist)起動 └── file_rename_watcher.py 常駐開始 ├── mmedia / pmedia の変更をリアルタイム検出 ├── Google Sheets「sheet1」に即時反映 └── ~/Library/Logs/folder_watch.log に記録 /Users/xxxxxxxxm1/python_scripts/file_rename_watcher.py #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ file_rename_watcher.py - mmedia / pmedia を監視し、ファイル/フォルダの移動・リネーム(MOVED)を検出 - Google スプレッドシートへ [A=変更前, B=変更後, C=JST更新時刻, D=TRUE] を追記 - その後、(A,B)重複は「古い方のみ残す」に正規化して C 降順に整列 - sheet1 は 1 行目にヘッダーを保持 依存: pip install gspread google-auth watchdog """ import os, time, logging from datetime import datetime, timezone, timedelta from pathlib import Path from typing import List, Tuple, Dict # ========= 設定(ここだけ変えればOK) ========= # --- 監視対象ディレクトリ --- WATCH_DIRS = [ "/Volumes/NO3_SSD/Dropbox/dropbox_1/mmedia", "/Volumes/NO3_SSD/Dropbox/dropbox_1/pmedia", ] # --- ログファイル --- # テスト運転: ~/Library/Logs/pytest_folder_watch.log # 本番運転 : ~/Library/Logs/folder_watch.log LOG_FILE = os.path.expanduser("~/Library/Logs/pytest_folder_watch.log") # --- Google Sheets --- SPREADSHEET_ID = "<あなたのID>" # テスト運転: "pytest_sheet1" # 本番運転 : "sheet1" SHEET_NAME = "pytest_sheet1" # シートの1行目にヘッダーがあるか HAS_HEADER = True # ヘッダー内容(本番 sheet1 の1行目) HEADERS = ["変更前パス", "変更後パス", "更新日時", "更新フラグ"] # --- 認証キー(JSON) --- SERVICE_ACCOUNT_JSON = "/Users/xxxxxxxxm1/gcp_keys/f3_rename_sa.json" # --- 無視フィルタ --- IGNORE_SUFFIXES = (".DS_Store",) IGNORE_SUBSTRINGS = ("/.dropbox", "/.Trash", "/.~", "/.TemporaryItems") # ========= ロギング ========= os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True) logging.basicConfig( filename=LOG_FILE, level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s", ) # ========= Google Sheets クライアント ========= import gspread from google.oauth2.service_account import Credentials SCOPES = [ "https://www.googleapis.com/auth/spreadsheets", "https://www.googleapis.com/auth/drive", ] def get_sheet(): creds = Credentials.from_service_account_file(SERVICE_ACCOUNT_JSON, scopes=SCOPES) gc = gspread.authorize(creds) sh = gc.open_by_key(SPREADSHEET_ID) try: ws = sh.worksheet(SHEET_NAME) except gspread.WorksheetNotFound: ws = sh.add_worksheet(title=SHEET_NAME, rows=1000, cols=4) return ws def jst_now_str() -> str: return datetime.now(timezone(timedelta(hours=9))).strftime("%Y-%m-%d %H:%M:%S") # ========= シート整形ユーティリティ ========= def ensure_header(ws): """ヘッダーが必要なら A1 に HEADERS を保証""" if not HAS_HEADER: return vals = ws.get_all_values() if not vals: ws.update("A1", [HEADERS]) return cur = (vals[0] + [""] * 4)[:4] if cur != HEADERS: ws.update("A1", [HEADERS]) def load_sheet(ws) -> Tuple[List[str], List[List[str]]]: """(header, rows) を返す。HAS_HEADER=False のとき header=[]""" vals = ws.get_all_values() if not vals: return (HEADERS if HAS_HEADER else []), [] if HAS_HEADER: header = (vals[0] + [""] * 4)[:4] rows = vals[1:] else: header = [] rows = vals rows = [r for r in rows if any(str(c).strip() for c in r)] return header, rows def _parse_dt(s: str) -> datetime: for f in ("%Y-%m-%d %H:%M:%S", "%Y/%m/%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"): try: return datetime.strptime(s, f) except Exception: pass return datetime(1970, 1, 1) def dedupe_oldest_keep_only(rows: List[List[str]]) -> List[List[str]]: """ (A,B)が同一の重複は『古い方のみ残す』。 最後に C(更新日時) の降順(新しいほど上)に並べる。 """ norm = [(r + [""] * 4)[:4] for r in rows] keep: Dict[Tuple[str, str], Tuple[str, str, datetime, str]] = {} for r in norm: a, b, c, d = r t = _parse_dt(c) key = (a, b) if key not in keep or t < keep[key][2]: keep[key] = (a, b, t, d or "TRUE") out = [[a, b, t.strftime("%Y-%m-%d %H:%M:%S"), d] for (a, b, t, d) in keep.values()] out.sort(key=lambda r: r[2], reverse=True) return out def append_and_cleanup(before_path: str, after_path: str, when_str: str): ws = get_sheet() ensure_header(ws) ws.append_row([before_path, after_path, when_str, "TRUE"], value_input_option="RAW") header, rows = load_sheet(ws) deduped = dedupe_oldest_keep_only(rows) if HAS_HEADER: ws.clear() ws.update("A1", [HEADERS]) if deduped: ws.update("A2", deduped, value_input_option="RAW") else: ws.clear() if deduped: ws.update("A1", deduped, value_input_option="RAW") # ========= 監視パート ========= from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler, FileSystemMovedEvent def is_ignored(path: str) -> bool: if any(s in path for s in IGNORE_SUBSTRINGS): return True if path.endswith(IGNORE_SUFFIXES): return True return False class RenameHandler(FileSystemEventHandler): def on_moved(self, event: FileSystemMovedEvent): try: src = event.src_path dst = event.dest_path if is_ignored(src) or is_ignored(dst): return ts = jst_now_str() logging.info(f"MOVED src={src} dst={dst} ts={ts}") append_and_cleanup(src, dst, ts) except Exception as e: logging.exception(f"on_moved error: {e}") def main(): event_handler = RenameHandler() observer = Observer() scheduled = 0 for d in WATCH_DIRS: if os.path.exists(d): observer.schedule(event_handler, d, recursive=True) scheduled += 1 else: logging.warning(f"watch dir not found: {d}") if scheduled == 0: logging.error("no watch dirs scheduled; exiting") return observer.start() logging.info(f"watch started on {scheduled} dirs") try: while True: time.sleep(1) except KeyboardInterrupt: observer.stop() except Exception as e: logging.exception(f"main loop error: {e}") observer.stop() observer.join() if __name__ == "__main__": main() file_rename_watcher.py自動起動させるplist /Users/xxxxxxxxm1/Library/LaunchAgents/com.xxxxxxxx.filewatcher.plist Label com.xxxxxxxx.filewatcher ProgramArguments /Users/xxxxxxxxm1/venvs/gsheets/bin/python3 /Users/xxxxxxxxm1/python_scripts/file_rename_watcher.py RunAtLoad KeepAlive StandardOutPath /Users/xxxxxxxxm1/Library/Logs/filewatcher.out.log StandardErrorPath /Users/xxxxxxxxm1/Library/Logs/filewatcher.err.log GAS処理方式とPython処理方式の結果に差異があれば異常通知メール /Users/xxxxxxxxm1/python_scripts/compare_and_notify.py #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ compare_and_notify.py - 同一ブックの「pytest_sheet1」(新方式)と「sheet1」(旧方式)の “監視開始以降” の差分を比較 - 差分があればメール通知+ログ、差分がなければログのみ(静か) - 初回実行時に baseline(監視開始時刻)を現在時刻に設定して終了(以降、その時刻以降のみを比較) """ import os, json, subprocess, logging, sys from datetime import datetime, timezone, timedelta from typing import List, Tuple, Dict, Set # ====== 設定 ====== SPREADSHEET_ID = "<あなたのID>" SHEET_A = "pytest_sheet1" # 新方式(Python) SHEET_B = "sheet1" # 旧方式(GAS) SERVICE_ACCOUNT_JSON = "/Users/xxxxxxxxm1/gcp_keys/f3_rename_sa.json" STATE_DIR = os.path.expanduser("~/Library/Application Support/filewatcher") STATE_FILE = os.path.join(STATE_DIR, "compare_state.json") # baseline・最終実行結果など LOG_FILE = os.path.expanduser("~/Library/Logs/compare_filewatcher.log") # 通知先(Apple Mail を使って送る:macの「メール」アプリに設定済みアカウントから送信) MAIL_TO = "bbbbbbbb.yyyyyyyy@gmail.com" # ←宛先(変更) MAIL_FROM = "FileWatcher" # 送信者表示名(任意) MAIL_APP_ACCOUNT = None # 特定アカウントを指定したい場合はアカウント名を入れる。未指定ならデフォルト MAX_DIFF_LIST = 50 # 通知本文に載せる最大件数 # ====== 依存 ====== import gspread from google.oauth2.service_account import Credentials SCOPES = [ "https://www.googleapis.com/auth/spreadsheets", "https://www.googleapis.com/auth/drive", ] # ====== 共通ユーティリティ ====== def jst_now(): return datetime.now(timezone(timedelta(hours=9))) def jst_str(dt: datetime): return dt.astimezone(timezone(timedelta(hours=9))).strftime("%Y-%m-%d %H:%M:%S") def parse_time(s: str) -> datetime: # 受け取りは JST 文字列前提(watcherの出力と同じ) for f in ("%Y-%m-%d %H:%M:%S", "%Y/%m/%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"): try: dt = datetime.strptime(s, f) return dt.replace(tzinfo=timezone(timedelta(hours=9))) except Exception: pass # 解析できなければ十分古い時刻にする return datetime(1970,1,1,tzinfo=timezone(timedelta(hours=9))) def ensure_dirs(): os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True) os.makedirs(STATE_DIR, exist_ok=True) # ====== ロギング ====== ensure_dirs() logging.basicConfig( filename=LOG_FILE, level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s", ) # ====== Apple Mail で送信(パスワード不要) ====== def send_mail_applescript(subject: str, body: str, to_addr: str, from_name: str = "FileWatcher", account_name: str = None): # AppleScript 文字列を普通の文字列結合で生成する mailSubject = subject.replace('"', '\\"') mailContent = body.replace('"', '\\"') recipientAddress = to_addr as_script = ( 'on run\n' f' set mailSubject to "{mailSubject}"\n' f' set mailContent to "{mailContent}"\n' f' set recipientAddress to "{recipientAddress}"\n' ' tell application "Mail"\n' ' set theMessage to make new outgoing message with properties {subject:mailSubject, content:mailContent & "\\n"}\n' ' tell theMessage\n' ' make new to recipient at end of to recipients with properties {address:recipientAddress}\n' ' set visible to false\n' ) if account_name: as_script += f' set sender to "{account_name}"\n' as_script += ( ' end tell\n' ' send theMessage\n' ' end tell\n' 'end run\n' ) try: subprocess.run(["osascript", "-e", as_script], check=True) return True except subprocess.CalledProcessError as e: logging.error(f"osascript send failed: {e}") return False # ====== Google Sheets ====== def get_ws(sheet_name: str): creds = Credentials.from_service_account_file(SERVICE_ACCOUNT_JSON, scopes=SCOPES) gc = gspread.authorize(creds) sh = gc.open_by_key(SPREADSHEET_ID) return sh.worksheet(sheet_name) def load_rows(ws, has_header=True) -> List[List[str]]: vals = ws.get_all_values() if not vals: return [] if has_header and len(vals) >= 2: vals = vals[1:] # 空行は除去 out = [] for r in vals: if any(str(c).strip() for c in r): # 4列に揃える rr = (r + [""]*4)[:4] out.append(rr) return out # ====== コア比較 ====== def filter_since(rows: List[List[str]], baseline: datetime) -> List[List[str]]: out = [] for a,b,c,d in rows: if parse_time(c) >= baseline: out.append([a,b,c,d]) return out def compare_sets(rows_a: List[List[str]], rows_b: List[List[str]]) -> Tuple[Set[Tuple[str,str]], Set[Tuple[str,str]]]: # 比較キーは (before, after)。必要なら timestamp の一致も追加可能 set_a = {(a,b) for a,b,_,_ in rows_a} set_b = {(a,b) for a,b,_,_ in rows_b} only_a = set_a - set_b only_b = set_b - set_a return only_a, only_b def main(): # 1) baseline のロード or 初期化 if os.path.exists(STATE_FILE): with open(STATE_FILE, "r") as f: state = json.load(f) else: state = {} if "baseline_jst" not in state: baseline = jst_now() # ここから監視開始 state["baseline_jst"] = jst_str(baseline) with open(STATE_FILE, "w") as f: json.dump(state, f, ensure_ascii=False, indent=2) logging.info(f"Initialized baseline at {state['baseline_jst']}; first run exits without compare.") print("Baseline initialized. Next run will compare records since:", state["baseline_jst"]) return baseline = parse_time(state["baseline_jst"]) # 2) データ取得 try: ws_a = get_ws(SHEET_A) ws_b = get_ws(SHEET_B) except Exception as e: logging.exception(f"Worksheet open failed: {e}") print("Open failed:", e) return rows_a = filter_since(load_rows(ws_a, has_header=True), baseline) rows_b = filter_since(load_rows(ws_b, has_header=True), baseline) only_a, only_b = compare_sets(rows_a, rows_b) # 3) 結果まとめ summary = (f"[Compare since {jst_str(baseline)}]\n" f"- {SHEET_A}: {len(rows_a)} rows\n" f"- {SHEET_B}: {len(rows_b)} rows\n" f"- only in {SHEET_A}: {len(only_a)}\n" f"- only in {SHEET_B}: {len(only_b)}\n") if not only_a and not only_b: logging.info("OK: no differences. " + summary.replace("\n", " ")) print("OK: no differences.\n" + summary) return # 差分あり → 詳細ログ&メール def fmt_pairs(pairs: Set[Tuple[str,str]], title: str) -> str: lines = [title] for i, (a,b) in enumerate(sorted(pairs)): if i >= MAX_DIFF_LIST: lines.append(f"... and {len(pairs)-MAX_DIFF_LIST} more") break lines.append(f"- {a} -> {b}") return "\n".join(lines) body = summary + "\n" if only_a: body += fmt_pairs(only_a, f"Only in {SHEET_A}:") + "\n" if only_b: body += fmt_pairs(only_b, f"Only in {SHEET_B}:") + "\n" logging.warning("DIFF DETECTED\n" + body.replace("\n"," | ")) subject = "【FileWatcher】差分検出(pytest vs sheet1)" ok = send_mail_applescript(subject, body, MAIL_TO, MAIL_FROM, MAIL_APP_ACCOUNT) print(("Mail sent." if ok else "Mail send failed."), "\n", body) if __name__ == "__main__": main() compare_and_notify.pyを自動起動させるplist /Users/xxxxxxxxm1/Library/LaunchAgents/com.xxxxxxxx.filewatcher.compare.plist Label com.xxxxxxxx.filewatcher.compare ProgramArguments /Users/xxxxxxxxm1/venvs/gsheets/bin/python3 /Users/xxxxxxxxm1/python_scripts/compare_and_notify.py StartInterval 300 RunAtLoad KeepAlive StandardOutPath /Users/xxxxxxxxm1/Library/Logs/compare_filewatcher.out.log StandardErrorPath /Users/xxxxxxxxm1/Library/Logs/compare_filewatcher.err.log