# [メタ情報]
# 識別子: ファイルパス変更処理_gasからpython移行_exe
# システム名: ファイルパス変更処理_gasからpython移行、M1Mac上で常時稼働
# 技術種別: Misc
# 機能名: Misc
# 使用言語: Python 自動起動スクリプト.plist
# 状態: 実行用
# [/メタ情報]
要約:file_rename_watcher.pyはmmedia/pmediaの移動・リネームを監視し、変更前後パス・JST時刻・更新フラグをスプレッドシートに追記。ヘッダー保持、(A,B)重複は古い方のみ残し、更新日時降順に整形。watchdog/gspread使用。launchdのplistで常駐起動。compare_and_notify.pyはpytest_sheet1とsheet1を基準時刻以降で比較し、差分があればApple Mailで通知。こちらもplistで5分ごとに実行。
Mac mini 起動
└── LaunchAgent(com.xxxxxxxx.filewatcher.plist)起動
└── file_rename_watcher.py 常駐開始
├── mmedia / pmedia の変更をリアルタイム検出
├── Google Sheets「sheet1」に即時反映
└── ~/Library/Logs/folder_watch.log に記録
/Users/xxxxxxxxm1/python_scripts/file_rename_watcher.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
file_rename_watcher.py
- mmedia / pmedia を監視し、ファイル/フォルダの移動・リネーム(MOVED)を検出
- Google スプレッドシートへ [A=変更前, B=変更後, C=JST更新時刻, D=TRUE] を追記
- その後、(A,B)重複は「古い方のみ残す」に正規化して C 降順に整列
- sheet1 は 1 行目にヘッダーを保持
依存:
pip install gspread google-auth watchdog
"""
import os, time, logging
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import List, Tuple, Dict
# ========= 設定(ここだけ変えればOK) =========
# --- 監視対象ディレクトリ ---
WATCH_DIRS = [
"/Volumes/NO3_SSD/Dropbox/dropbox_1/mmedia",
"/Volumes/NO3_SSD/Dropbox/dropbox_1/pmedia",
]
# --- ログファイル ---
# テスト運転: ~/Library/Logs/pytest_folder_watch.log
# 本番運転 : ~/Library/Logs/folder_watch.log
LOG_FILE = os.path.expanduser("~/Library/Logs/pytest_folder_watch.log")
# --- Google Sheets ---
SPREADSHEET_ID = "<あなたのID>"
# テスト運転: "pytest_sheet1"
# 本番運転 : "sheet1"
SHEET_NAME = "pytest_sheet1"
# シートの1行目にヘッダーがあるか
HAS_HEADER = True
# ヘッダー内容(本番 sheet1 の1行目)
HEADERS = ["変更前パス", "変更後パス", "更新日時", "更新フラグ"]
# --- 認証キー(JSON) ---
SERVICE_ACCOUNT_JSON = "/Users/xxxxxxxxm1/gcp_keys/f3_rename_sa.json"
# --- 無視フィルタ ---
IGNORE_SUFFIXES = (".DS_Store",)
IGNORE_SUBSTRINGS = ("/.dropbox", "/.Trash", "/.~", "/.TemporaryItems")
# ========= ロギング =========
os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)
logging.basicConfig(
filename=LOG_FILE,
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
)
# ========= Google Sheets クライアント =========
import gspread
from google.oauth2.service_account import Credentials
SCOPES = [
"https://www.googleapis.com/auth/spreadsheets",
"https://www.googleapis.com/auth/drive",
]
def get_sheet():
creds = Credentials.from_service_account_file(SERVICE_ACCOUNT_JSON, scopes=SCOPES)
gc = gspread.authorize(creds)
sh = gc.open_by_key(SPREADSHEET_ID)
try:
ws = sh.worksheet(SHEET_NAME)
except gspread.WorksheetNotFound:
ws = sh.add_worksheet(title=SHEET_NAME, rows=1000, cols=4)
return ws
def jst_now_str() -> str:
return datetime.now(timezone(timedelta(hours=9))).strftime("%Y-%m-%d %H:%M:%S")
# ========= シート整形ユーティリティ =========
def ensure_header(ws):
"""ヘッダーが必要なら A1 に HEADERS を保証"""
if not HAS_HEADER:
return
vals = ws.get_all_values()
if not vals:
ws.update("A1", [HEADERS])
return
cur = (vals[0] + [""] * 4)[:4]
if cur != HEADERS:
ws.update("A1", [HEADERS])
def load_sheet(ws) -> Tuple[List[str], List[List[str]]]:
"""(header, rows) を返す。HAS_HEADER=False のとき header=[]"""
vals = ws.get_all_values()
if not vals:
return (HEADERS if HAS_HEADER else []), []
if HAS_HEADER:
header = (vals[0] + [""] * 4)[:4]
rows = vals[1:]
else:
header = []
rows = vals
rows = [r for r in rows if any(str(c).strip() for c in r)]
return header, rows
def _parse_dt(s: str) -> datetime:
for f in ("%Y-%m-%d %H:%M:%S", "%Y/%m/%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"):
try:
return datetime.strptime(s, f)
except Exception:
pass
return datetime(1970, 1, 1)
def dedupe_oldest_keep_only(rows: List[List[str]]) -> List[List[str]]:
"""
(A,B)が同一の重複は『古い方のみ残す』。
最後に C(更新日時) の降順(新しいほど上)に並べる。
"""
norm = [(r + [""] * 4)[:4] for r in rows]
keep: Dict[Tuple[str, str], Tuple[str, str, datetime, str]] = {}
for r in norm:
a, b, c, d = r
t = _parse_dt(c)
key = (a, b)
if key not in keep or t < keep[key][2]:
keep[key] = (a, b, t, d or "TRUE")
out = [[a, b, t.strftime("%Y-%m-%d %H:%M:%S"), d] for (a, b, t, d) in keep.values()]
out.sort(key=lambda r: r[2], reverse=True)
return out
def append_and_cleanup(before_path: str, after_path: str, when_str: str):
ws = get_sheet()
ensure_header(ws)
ws.append_row([before_path, after_path, when_str, "TRUE"], value_input_option="RAW")
header, rows = load_sheet(ws)
deduped = dedupe_oldest_keep_only(rows)
if HAS_HEADER:
ws.clear()
ws.update("A1", [HEADERS])
if deduped:
ws.update("A2", deduped, value_input_option="RAW")
else:
ws.clear()
if deduped:
ws.update("A1", deduped, value_input_option="RAW")
# ========= 監視パート =========
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, FileSystemMovedEvent
def is_ignored(path: str) -> bool:
if any(s in path for s in IGNORE_SUBSTRINGS):
return True
if path.endswith(IGNORE_SUFFIXES):
return True
return False
class RenameHandler(FileSystemEventHandler):
def on_moved(self, event: FileSystemMovedEvent):
try:
src = event.src_path
dst = event.dest_path
if is_ignored(src) or is_ignored(dst):
return
ts = jst_now_str()
logging.info(f"MOVED src={src} dst={dst} ts={ts}")
append_and_cleanup(src, dst, ts)
except Exception as e:
logging.exception(f"on_moved error: {e}")
def main():
event_handler = RenameHandler()
observer = Observer()
scheduled = 0
for d in WATCH_DIRS:
if os.path.exists(d):
observer.schedule(event_handler, d, recursive=True)
scheduled += 1
else:
logging.warning(f"watch dir not found: {d}")
if scheduled == 0:
logging.error("no watch dirs scheduled; exiting")
return
observer.start()
logging.info(f"watch started on {scheduled} dirs")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
except Exception as e:
logging.exception(f"main loop error: {e}")
observer.stop()
observer.join()
if __name__ == "__main__":
main()
file_rename_watcher.py自動起動させるplist
/Users/xxxxxxxxm1/Library/LaunchAgents/com.xxxxxxxx.filewatcher.plist
Label
com.xxxxxxxx.filewatcher
ProgramArguments
/Users/xxxxxxxxm1/venvs/gsheets/bin/python3
/Users/xxxxxxxxm1/python_scripts/file_rename_watcher.py
RunAtLoad
KeepAlive
StandardOutPath
/Users/xxxxxxxxm1/Library/Logs/filewatcher.out.log
StandardErrorPath
/Users/xxxxxxxxm1/Library/Logs/filewatcher.err.log
GAS処理方式とPython処理方式の結果に差異があれば異常通知メール
/Users/xxxxxxxxm1/python_scripts/compare_and_notify.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
compare_and_notify.py
- 同一ブックの「pytest_sheet1」(新方式)と「sheet1」(旧方式)の “監視開始以降” の差分を比較
- 差分があればメール通知+ログ、差分がなければログのみ(静か)
- 初回実行時に baseline(監視開始時刻)を現在時刻に設定して終了(以降、その時刻以降のみを比較)
"""
import os, json, subprocess, logging, sys
from datetime import datetime, timezone, timedelta
from typing import List, Tuple, Dict, Set
# ====== 設定 ======
SPREADSHEET_ID = "<あなたのID>"
SHEET_A = "pytest_sheet1" # 新方式(Python)
SHEET_B = "sheet1" # 旧方式(GAS)
SERVICE_ACCOUNT_JSON = "/Users/xxxxxxxxm1/gcp_keys/f3_rename_sa.json"
STATE_DIR = os.path.expanduser("~/Library/Application Support/filewatcher")
STATE_FILE = os.path.join(STATE_DIR, "compare_state.json") # baseline・最終実行結果など
LOG_FILE = os.path.expanduser("~/Library/Logs/compare_filewatcher.log")
# 通知先(Apple Mail を使って送る:macの「メール」アプリに設定済みアカウントから送信)
MAIL_TO = "bbbbbbbb.yyyyyyyy@gmail.com" # ←宛先(変更)
MAIL_FROM = "FileWatcher" # 送信者表示名(任意)
MAIL_APP_ACCOUNT = None # 特定アカウントを指定したい場合はアカウント名を入れる。未指定ならデフォルト
MAX_DIFF_LIST = 50 # 通知本文に載せる最大件数
# ====== 依存 ======
import gspread
from google.oauth2.service_account import Credentials
SCOPES = [
"https://www.googleapis.com/auth/spreadsheets",
"https://www.googleapis.com/auth/drive",
]
# ====== 共通ユーティリティ ======
def jst_now():
return datetime.now(timezone(timedelta(hours=9)))
def jst_str(dt: datetime):
return dt.astimezone(timezone(timedelta(hours=9))).strftime("%Y-%m-%d %H:%M:%S")
def parse_time(s: str) -> datetime:
# 受け取りは JST 文字列前提(watcherの出力と同じ)
for f in ("%Y-%m-%d %H:%M:%S", "%Y/%m/%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"):
try:
dt = datetime.strptime(s, f)
return dt.replace(tzinfo=timezone(timedelta(hours=9)))
except Exception:
pass
# 解析できなければ十分古い時刻にする
return datetime(1970,1,1,tzinfo=timezone(timedelta(hours=9)))
def ensure_dirs():
os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)
os.makedirs(STATE_DIR, exist_ok=True)
# ====== ロギング ======
ensure_dirs()
logging.basicConfig(
filename=LOG_FILE,
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
)
# ====== Apple Mail で送信(パスワード不要) ======
def send_mail_applescript(subject: str, body: str, to_addr: str, from_name: str = "FileWatcher", account_name: str = None):
# AppleScript 文字列を普通の文字列結合で生成する
mailSubject = subject.replace('"', '\\"')
mailContent = body.replace('"', '\\"')
recipientAddress = to_addr
as_script = (
'on run\n'
f' set mailSubject to "{mailSubject}"\n'
f' set mailContent to "{mailContent}"\n'
f' set recipientAddress to "{recipientAddress}"\n'
' tell application "Mail"\n'
' set theMessage to make new outgoing message with properties {subject:mailSubject, content:mailContent & "\\n"}\n'
' tell theMessage\n'
' make new to recipient at end of to recipients with properties {address:recipientAddress}\n'
' set visible to false\n'
)
if account_name:
as_script += f' set sender to "{account_name}"\n'
as_script += (
' end tell\n'
' send theMessage\n'
' end tell\n'
'end run\n'
)
try:
subprocess.run(["osascript", "-e", as_script], check=True)
return True
except subprocess.CalledProcessError as e:
logging.error(f"osascript send failed: {e}")
return False
# ====== Google Sheets ======
def get_ws(sheet_name: str):
creds = Credentials.from_service_account_file(SERVICE_ACCOUNT_JSON, scopes=SCOPES)
gc = gspread.authorize(creds)
sh = gc.open_by_key(SPREADSHEET_ID)
return sh.worksheet(sheet_name)
def load_rows(ws, has_header=True) -> List[List[str]]:
vals = ws.get_all_values()
if not vals:
return []
if has_header and len(vals) >= 2:
vals = vals[1:]
# 空行は除去
out = []
for r in vals:
if any(str(c).strip() for c in r):
# 4列に揃える
rr = (r + [""]*4)[:4]
out.append(rr)
return out
# ====== コア比較 ======
def filter_since(rows: List[List[str]], baseline: datetime) -> List[List[str]]:
out = []
for a,b,c,d in rows:
if parse_time(c) >= baseline:
out.append([a,b,c,d])
return out
def compare_sets(rows_a: List[List[str]], rows_b: List[List[str]]) -> Tuple[Set[Tuple[str,str]], Set[Tuple[str,str]]]:
# 比較キーは (before, after)。必要なら timestamp の一致も追加可能
set_a = {(a,b) for a,b,_,_ in rows_a}
set_b = {(a,b) for a,b,_,_ in rows_b}
only_a = set_a - set_b
only_b = set_b - set_a
return only_a, only_b
def main():
# 1) baseline のロード or 初期化
if os.path.exists(STATE_FILE):
with open(STATE_FILE, "r") as f:
state = json.load(f)
else:
state = {}
if "baseline_jst" not in state:
baseline = jst_now() # ここから監視開始
state["baseline_jst"] = jst_str(baseline)
with open(STATE_FILE, "w") as f:
json.dump(state, f, ensure_ascii=False, indent=2)
logging.info(f"Initialized baseline at {state['baseline_jst']}; first run exits without compare.")
print("Baseline initialized. Next run will compare records since:", state["baseline_jst"])
return
baseline = parse_time(state["baseline_jst"])
# 2) データ取得
try:
ws_a = get_ws(SHEET_A)
ws_b = get_ws(SHEET_B)
except Exception as e:
logging.exception(f"Worksheet open failed: {e}")
print("Open failed:", e)
return
rows_a = filter_since(load_rows(ws_a, has_header=True), baseline)
rows_b = filter_since(load_rows(ws_b, has_header=True), baseline)
only_a, only_b = compare_sets(rows_a, rows_b)
# 3) 結果まとめ
summary = (f"[Compare since {jst_str(baseline)}]\n"
f"- {SHEET_A}: {len(rows_a)} rows\n"
f"- {SHEET_B}: {len(rows_b)} rows\n"
f"- only in {SHEET_A}: {len(only_a)}\n"
f"- only in {SHEET_B}: {len(only_b)}\n")
if not only_a and not only_b:
logging.info("OK: no differences. " + summary.replace("\n", " "))
print("OK: no differences.\n" + summary)
return
# 差分あり → 詳細ログ&メール
def fmt_pairs(pairs: Set[Tuple[str,str]], title: str) -> str:
lines = [title]
for i, (a,b) in enumerate(sorted(pairs)):
if i >= MAX_DIFF_LIST:
lines.append(f"... and {len(pairs)-MAX_DIFF_LIST} more")
break
lines.append(f"- {a} -> {b}")
return "\n".join(lines)
body = summary + "\n"
if only_a:
body += fmt_pairs(only_a, f"Only in {SHEET_A}:") + "\n"
if only_b:
body += fmt_pairs(only_b, f"Only in {SHEET_B}:") + "\n"
logging.warning("DIFF DETECTED\n" + body.replace("\n"," | "))
subject = "【FileWatcher】差分検出(pytest vs sheet1)"
ok = send_mail_applescript(subject, body, MAIL_TO, MAIL_FROM, MAIL_APP_ACCOUNT)
print(("Mail sent." if ok else "Mail send failed."), "\n", body)
if __name__ == "__main__":
main()
compare_and_notify.pyを自動起動させるplist
/Users/xxxxxxxxm1/Library/LaunchAgents/com.xxxxxxxx.filewatcher.compare.plist
Label
com.xxxxxxxx.filewatcher.compare
ProgramArguments
/Users/xxxxxxxxm1/venvs/gsheets/bin/python3
/Users/xxxxxxxxm1/python_scripts/compare_and_notify.py
StartInterval
300
RunAtLoad
KeepAlive
StandardOutPath
/Users/xxxxxxxxm1/Library/Logs/compare_filewatcher.out.log
StandardErrorPath
/Users/xxxxxxxxm1/Library/Logs/compare_filewatcher.err.log