# [メタ情報] # 識別子: ファイルパス変更処理_gasからpython移行_exe # システム名: ファイルパス変更処理_gasからpython移行、M1Mac上で常時稼働 # 技術種別: Misc # 機能名: Misc # 使用言語: Python 自動起動スクリプト.plist # 状態: 実行用 # [/メタ情報] 要約:mmedia・pmedia フォルダを常時監視し、ファイルやフォルダの移動・リネーム(MOVED)を検出して記録する Python スクリプト。検出時には変更前・変更後パス、JST の更新時刻、更新フラグ TRUE を Google スプレッドシート(sheet1)へ追記し、その後シート全体を読み込んで (A,B) が同じ重複記録は「最も古いものだけ残す」形に正規化し、更新日時の降順で再整列して書き戻す。scripts_pub や .dropbox など特定パスは無視するフィルタを搭載。watchdog によりディレクトリを監視し続け、ログは ~/Library/Logs に記録。LaunchAgent(com.xxxxxxxx.filewatcher.plist)を用いて macOS 起動時に自動実行される構成となっている。 Mac mini 起動 └── LaunchAgent(com.xxxxxxxx.filewatcher.plist)起動 └── file_rename_watcher.py 常駐開始 ├── mmedia / pmedia の変更をリアルタイム検出 ├── Google Sheets「sheet1」に即時反映 └── ~/Library/Logs/folder_watch.log に記録 /Users/xxxxxxxxm1/python_scripts/file_rename_watcher.py #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ file_rename_watcher.py - mmedia / pmedia を監視し、ファイル/フォルダの移動・リネーム(MOVED)を検出 - Google スプレッドシートへ [A=変更前, B=変更後, C=JST更新時刻, D=TRUE] を追記 - その後、(A,B)重複は「古い方のみ残す」に正規化して C 降順に整列 - sheet1 は 1 行目にヘッダーを保持 依存: pip install gspread google-auth watchdog """ import os, time, logging from datetime import datetime, timezone, timedelta from pathlib import Path from typing import List, Tuple, Dict # ========= 設定(ここだけ変えればOK) ========= # --- 監視対象ディレクトリ --- WATCH_DIRS = [ "/Volumes/NO3_SSD/Dropbox/dropbox_1/mmedia", "/Volumes/NO3_SSD/Dropbox/dropbox_1/pmedia", ] # --- ログファイル --- # テスト運転: ~/Library/Logs/pytest_folder_watch.log # 本番運転 : ~/Library/Logs/folder_watch.log LOG_FILE = os.path.expanduser("~/Library/Logs/folder_watch.log") # --- Google Sheets --- SPREADSHEET_ID = "<あなたのID>" # テスト運転: "pytest_sheet1" # 本番運転 : "sheet1" SHEET_NAME = "sheet1" # シートの1行目にヘッダーがあるか HAS_HEADER = True # ヘッダー内容(本番 sheet1 の1行目) HEADERS = ["変更前パス", "変更後パス", "更新日時", "更新フラグ"] # --- 認証キー(JSON) --- SERVICE_ACCOUNT_JSON = "/Users/xxxxxxxxm1/gcp_keys/f3_rename_sa.json" # --- 無視フィルタ --- IGNORE_SUFFIXES = (".DS_Store",) IGNORE_SUBSTRINGS = ( "/.dropbox", "/.Trash", "/.~", "/.TemporaryItems", "/pmedia/scripts_pub/", # ★ scripts_pub は F3 対象外 ) # ========= ロギング ========= os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True) logging.basicConfig( filename=LOG_FILE, level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s", ) # ========= Google Sheets クライアント ========= import gspread from google.oauth2.service_account import Credentials SCOPES = [ "https://www.googleapis.com/auth/spreadsheets", "https://www.googleapis.com/auth/drive", ] def get_sheet(): creds = Credentials.from_service_account_file(SERVICE_ACCOUNT_JSON, scopes=SCOPES) gc = gspread.authorize(creds) sh = gc.open_by_key(SPREADSHEET_ID) try: ws = sh.worksheet(SHEET_NAME) except gspread.WorksheetNotFound: ws = sh.add_worksheet(title=SHEET_NAME, rows=1000, cols=4) return ws def jst_now_str() -> str: return datetime.now(timezone(timedelta(hours=9))).strftime("%Y-%m-%d %H:%M:%S") # ========= シート整形ユーティリティ ========= def ensure_header(ws): """ヘッダーが必要なら A1 に HEADERS を保証""" if not HAS_HEADER: return vals = ws.get_all_values() if not vals: # ★ 新しい書式:values, range_name をキーワード引数で指定 ws.update(values=[HEADERS], range_name="A1") return cur = (vals[0] + [""] * 4)[:4] if cur != HEADERS: ws.update(values=[HEADERS], range_name="A1") def load_sheet(ws) -> Tuple[List[str], List[List[str]]]: """(header, rows) を返す。HAS_HEADER=False のとき header=[]""" vals = ws.get_all_values() if not vals: return (HEADERS if HAS_HEADER else []), [] if HAS_HEADER: header = (vals[0] + [""] * 4)[:4] rows = vals[1:] else: header = [] rows = vals # 空行を除外 rows = [r for r in rows if any(str(c).strip() for c in r)] return header, rows def _parse_dt(s: str) -> datetime: for f in ("%Y-%m-%d %H:%M:%S", "%Y/%m/%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"): try: return datetime.strptime(s, f) except Exception: pass return datetime(1970, 1, 1) def dedupe_oldest_keep_only(rows: List[List[str]]) -> List[List[str]]: """ (A,B)が同一の重複は『古い方のみ残す』。 最後に C(更新日時) の降順(新しいほど上)に並べる。 """ norm = [(r + [""] * 4)[:4] for r in rows] keep: Dict[Tuple[str, str], Tuple[str, str, datetime, str]] = {} for r in norm: a, b, c, d = r t = _parse_dt(c) key = (a, b) if key not in keep or t < keep[key][2]: keep[key] = (a, b, t, d or "TRUE") out = [[a, b, t.strftime("%Y-%m-%d %H:%M:%S"), d] for (a, b, t, d) in keep.values()] out.sort(key=lambda r: r[2], reverse=True) return out def append_and_cleanup(before_path: str, after_path: str, when_str: str): ws = get_sheet() ensure_header(ws) # 新規行を末尾に追加 ws.append_row([before_path, after_path, when_str, "TRUE"], value_input_option="RAW") # 全体を読み込み → 正規化 → 書き戻し header, rows = load_sheet(ws) deduped = dedupe_oldest_keep_only(rows) if HAS_HEADER: ws.clear() # ★ ここも新しい書式に変更 ws.update(values=[HEADERS], range_name="A1") if deduped: ws.update(values=deduped, range_name="A2", value_input_option="RAW") else: ws.clear() if deduped: ws.update(values=deduped, range_name="A1", value_input_option="RAW") # ========= 監視パート ========= from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler, FileSystemMovedEvent def is_ignored(path: str) -> bool: if any(s in path for s in IGNORE_SUBSTRINGS): return True if path.endswith(IGNORE_SUFFIXES): return True return False class RenameHandler(FileSystemEventHandler): def on_moved(self, event: FileSystemMovedEvent): try: src = event.src_path dst = event.dest_path if is_ignored(src) or is_ignored(dst): return ts = jst_now_str() logging.info(f"MOVED src={src} dst={dst} ts={ts}") append_and_cleanup(src, dst, ts) except Exception as e: logging.exception(f"on_moved error: {e}") def main(): event_handler = RenameHandler() observer = Observer() scheduled = 0 for d in WATCH_DIRS: if os.path.exists(d): observer.schedule(event_handler, d, recursive=True) scheduled += 1 else: logging.warning(f"watch dir not found: {d}") if scheduled == 0: logging.error("no watch dirs scheduled; exiting") return observer.start() logging.info(f"watch started on {scheduled} dirs") try: while True: time.sleep(1) except KeyboardInterrupt: observer.stop() except Exception as e: logging.exception(f"main loop error: {e}") observer.stop() observer.join() if __name__ == "__main__": main() /Users/xxxxxxxxm1/Library/LaunchAgents/com.xxxxxxxx.filewatcher.plist Label com.xxxxxxxx.filewatcher ProgramArguments /usr/bin/python3 /Users/xxxxxxxxm1/python_scripts/file_rename_watcher.py RunAtLoad StandardOutPath /Users/xxxxxxxxm1/Library/Logs/filewatcher.log StandardErrorPath /Users/xxxxxxxxm1/Library/Logs/filewatcher_error.log