# [メタ情報] # 識別子: DAS棚田管理_実データ更新処理_exe # システム名: DAS棚田管理_実データ更新処理 # 技術種別: Misc # 機能名: Misc # 使用言語: Python ShellScript # 状態: 実行用 # [/メタ情報] 要約: このシステムは、Google Sheetsをインターフェースとしたファイル管理の自動化パイプラインです。`tanada_apply_button_daemon.py`は、Google Sheets上の手動ボタン(A10)によるAPPLY要求のみを監視し、深夜時間帯を除いて`tanada_apply.py`を起動します。`apply_runner.sh`は、SheetsのA8セルからファイル移動計画DBのパスを読み込み、それを固定名の適用DBにコピーし、`tanada_apply.py`を実行するためのシェルスクリプトです。コアロジックを担う`tanada_apply.py`は、適用DBに記録されたアクション(主にファイルの移動)をDRYRUNまたはAPPLYモードで実行します。このスクリプトは、「現段位がT0ではないファイルを新段位T0へ移動する」ポリシー違反を最優先でチェックし、違反があれば直ちに処理を停止する安全機構を備えています。処理結果はSheetsの指定セルにフィードバックされます。`tanada_make_apply_plan_db.py`は、シミュレーション結果DBから、`tanada_apply.py`が使用する固定名の適用計画DBを生成し、古いDBは世代管理してアーカイブします。`com.xxxxxxxxm1.tanada_apply_button_daemon.plist`は、デーモンをmacOSのLaunchAgentとして自動起動・常駐させ、必要な環境変数を設定します。このパイプラインは、手動トリガーと厳格な安全ポリシーにより、ファイル移動の誤操作を防ぎながらも自動化を実現しています。 /Users/xxxxxxxxm1/das/tanada_apply_button_daemon.py ``` #!/usr/bin/env python3 from __future__ import annotations """ 【最重要:apply 起動制約(手動ボタンのみ)】 このデーモンは、Google Sheets の action_result シート上の 「A10に重ねた手動ボタン」が書き込む APPLY 要求セルだけを監視し、 そのセルが更新された場合に限り tanada_apply.py を起動する。 - 禁止:時間トリガー、定期実行、他セル(B2等)からの混線起動 - 許可:ユーザーが action_result シートの A10ボタンを手動で押した場合のみ 追加要件: - 「古すぎる plan_id を使った apply」を拒否して表示する(例:1時間以上前は中止) → ★固定の apply_plan DB ではなく、Sheets に表示されている “最新 plan DB(A8)” を見て判定する 【追加(不親切な勝手表示の防止)】 - REQUESTED が古い(例:前日など)場合は「押していない残骸」とみなし、 CANCEL表示を出さずに REQUESTED を黙って自動クリアする。 - 実行結果が DONE / CANCEL / ERROR のいずれでも、最後に REQUESTED を必ずクリアする。 """ import os import sys import time import subprocess import sqlite3 from pathlib import Path from datetime import datetime, time as dtime # ====== apply専用(simulateと混線させない)====== SHEET_NAME = "action_result" APPLY_REQ = "B10" # A10ボタンが書く要求セル(例:REQUESTED:20260113_140050) APPLY_STATUS = "A11" # 状態表示(短文) PLAN_PATH_CELL = "A8" # ★simulate が出した最新 plan DB パス(/Users/.../plan_....sqlite) POLL_SEC = 5 # =============================================== # ===== 静音時間帯設定(深夜は実行しない)===== QUIET_START = dtime(0, 55) QUIET_END = dtime(2, 10) QUIET_SLEEP = 60 # ============================================ # ===== 実行する apply 本体 ===== PYTHON = "/usr/bin/python3" APPLY_SCRIPT = "/Users/xxxxxxxxm1/python_scripts/tanada_apply.py" # ============================== # ===== 古すぎる plan を拒否する閾値(秒)===== MAX_PLAN_AGE_SEC = 3600 # 1時間 # ========================================== # ===== 古いREQUESTEDは無視して黙って消す(秒)===== MAX_REQUEST_AGE_SEC = 600 # 10分(これより古いREQUESTEDは「押していない残骸」扱い) # =============================================== def in_quiet_hours() -> bool: now = datetime.now().time() return QUIET_START <= now <= QUIET_END def sheets_read(cred_json: Path, sid: str, a1: str) -> str: from google.oauth2.service_account import Credentials from googleapiclient.discovery import build scopes = ["https://www.googleapis.com/auth/spreadsheets"] creds = Credentials.from_service_account_file(str(cred_json), scopes=scopes) service = build("sheets", "v4", credentials=creds, cache_discovery=False) r = service.spreadsheets().values().get(spreadsheetId=sid, range=a1).execute() vals = r.get("values", []) if not vals or not vals[0]: return "" return str(vals[0][0]) def sheets_write(cred_json: Path, sid: str, a1: str, value: str) -> None: from google.oauth2.service_account import Credentials from googleapiclient.discovery import build scopes = ["https://www.googleapis.com/auth/spreadsheets"] creds = Credentials.from_service_account_file(str(cred_json), scopes=scopes) service = build("sheets", "v4", credentials=creds, cache_discovery=False) service.spreadsheets().values().update( spreadsheetId=sid, range=a1, valueInputOption="RAW", body={"values": [[value]]}, ).execute() def parse_nonce(req: str) -> str: # 許可:APPLY:YYYYMMDD_HHMMSS / REQUESTED:YYYYMMDD_HHMMSS req = (req or "").strip() if req.startswith("APPLY:"): return req.split(":", 1)[1].strip() if req.startswith("REQUESTED:"): return req.split(":", 1)[1].strip() return "" def nonce_age_sec(nonce: str) -> int | None: """ nonce(YYYYMMDD_HHMMSS) の年齢(秒)を返す。読めなければ None。 """ nonce = (nonce or "").strip() try: dt = datetime.strptime(nonce, "%Y%m%d_%H%M%S") return int((datetime.now() - dt).total_seconds()) except Exception: return None def _parse_created_time_iso(s: str) -> datetime | None: s = (s or "").strip() if not s: return None # 例: 2026-01-13T13:05:21+09:00 try: return datetime.fromisoformat(s) except Exception: return None def get_latest_plan_age_sec(plan_db_path: str) -> int | None: """ plan_runs.created_time から最新 plan の年齢(秒)を返す。 """ p = Path(plan_db_path) if not p.exists(): return None con = None try: con = sqlite3.connect(str(p)) con.row_factory = sqlite3.Row row = con.execute( "SELECT plan_id, created_time FROM plan_runs ORDER BY created_time DESC LIMIT 1" ).fetchone() if not row: return None ct = _parse_created_time_iso(str(row["created_time"])) if ct is None: return None now = datetime.now().astimezone() # created_time に tz が無い場合はローカルとみなす if ct.tzinfo is None: ct = ct.replace(tzinfo=now.tzinfo) return int((now - ct).total_seconds()) except Exception: return None finally: try: if con is not None: con.close() except Exception: pass def count_planned_moves(plan_db_path: str) -> int | None: """ items_all の action='MOVE' 件数(計画MOVE件数)を返す。 """ p = Path(plan_db_path) if not p.exists(): return None con = None try: con = sqlite3.connect(str(p)) row = con.execute("SELECT COUNT(*) FROM items_all WHERE action='MOVE'").fetchone() return int(row[0]) if row else 0 except Exception: return None finally: try: if con is not None: con.close() except Exception: pass def run_apply(plan_db_path: str) -> tuple[int, str]: """ A10ボタン由来でのみ apply を起動する。 - MANUAL_RUN=1 を必ず付与(手動起動の印) - ★Sheets(A8)の最新 plan DB を --plan-db に渡す(固定DBは使わない) """ env = os.environ.copy() env["MANUAL_RUN"] = "1" p = subprocess.run( [ PYTHON, APPLY_SCRIPT, "--plan-db", plan_db_path, "--apply", "--sheet-id", os.environ.get("DAS_SHEET_ID", "").strip(), "--sheet-name", SHEET_NAME, "--status-cell", APPLY_STATUS, # A12は使わない "--comment-cell", "", ], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, env=env, ) return p.returncode, p.stdout def main() -> int: sid = os.environ.get("DAS_SHEET_ID", "").strip() cred = os.environ.get("DAS_CRED_JSON", "").strip() if not sid or not cred: print("[ERROR] env missing: DAS_SHEET_ID / DAS_CRED_JSON", file=sys.stderr, flush=True) return 2 cred_json = Path(cred).expanduser().resolve() if not cred_json.exists(): print(f"[ERROR] cred json not found: {cred_json}", file=sys.stderr, flush=True) return 2 a1_req = f"{SHEET_NAME}!{APPLY_REQ}" a1_status = f"{SHEET_NAME}!{APPLY_STATUS}" a1_plan_path = f"{SHEET_NAME}!{PLAN_PATH_CELL}" last_req = "" def clear_request() -> None: nonlocal last_req try: sheets_write(cred_json, sid, a1_req, "") except Exception: pass last_req = "" while True: try: if in_quiet_hours(): time.sleep(QUIET_SLEEP) continue req = sheets_read(cred_json, sid, a1_req).strip() if req and req != last_req: last_req = req nonce = parse_nonce(req) if not nonce: sheets_write(cred_json, sid, a1_status, f"CANCEL: bad request ({req})") clear_request() time.sleep(POLL_SEC) continue # ★古いREQUESTEDは「押していない残骸」扱い:表示せず黙って消す req_age = nonce_age_sec(nonce) if req_age is None: # nonceが読めない場合も誤作動防止で黙って消す(表示しない) clear_request() time.sleep(POLL_SEC) continue if req_age > MAX_REQUEST_AGE_SEC: # 前日などの残骸で勝手に表示されるのを防ぐ:無表示で消す clear_request() time.sleep(POLL_SEC) continue # ★Sheets に表示されている最新 plan DB を取得 plan_db_path = sheets_read(cred_json, sid, a1_plan_path).strip() if not plan_db_path: sheets_write(cred_json, sid, a1_status, f"CANCEL: plan path empty (nonce={nonce})") clear_request() time.sleep(POLL_SEC) continue plan_db_path = str(Path(plan_db_path).expanduser()) # ★古すぎる plan を拒否(A8の plan を見て判定) age = get_latest_plan_age_sec(plan_db_path) if age is None: sheets_write(cred_json, sid, a1_status, f"CANCEL: plan unreadable (nonce={nonce})") clear_request() time.sleep(POLL_SEC) continue if age > MAX_PLAN_AGE_SEC: # これは「押したが、simulateから時間が経ちすぎ」なので表示は出す(親切) sheets_write( cred_json, sid, a1_status, f"CANCEL: plan too old ({age}s) nonce={nonce}" ) clear_request() time.sleep(POLL_SEC) continue planned = count_planned_moves(plan_db_path) if planned is None: planned = -1 sheets_write(cred_json, sid, a1_status, f"RUNNING(APPLY): {nonce}") code, out = run_apply(plan_db_path) if code == 0: # ★ここでは “DRYRUN” とは書かない。apply を実行した事実を表示する。 if planned >= 0: sheets_write( cred_json, sid, a1_status, f"DONE(APPLY): {nonce}\n計画MOVE: {planned}" ) else: sheets_write( cred_json, sid, a1_status, f"DONE(APPLY): {nonce}" ) else: head = "\n".join(out.splitlines()[:25]) sheets_write(cred_json, sid, a1_status, f"ERROR(APPLY): {nonce}\n{head}") # ★どんな結果でもREQUESTEDは残さない(翌朝の勝手表示を根絶) clear_request() time.sleep(POLL_SEC) except Exception as e: # ★修正: シート(DAEMON_ERROR)には書き込まず、ログにのみ残してスルーする # これにより一時的な通信エラー等がシートに表示されなくなる print(f"[WARN] Loop error (ignored in sheet): {e}", file=sys.stderr, flush=True) # 異常時も要求が残っていると再反応し続ける可能性があるのでクリアを試みる try: clear_request() except Exception: pass time.sleep(POLL_SEC) return 0 if __name__ == "__main__": raise SystemExit(main()) ``` /Users/xxxxxxxxm1/das/apply_runner.sh ``` #!/bin/bash set -euo pipefail export PATH="/usr/bin:/bin:/usr/sbin:/sbin" LOCK_DIR="$HOME/das/locks" mkdir -p "$LOCK_DIR" LOCK_FILE="$LOCK_DIR/apply_pipeline.lock" # ---- 設定(固定名DB)---- APPLY_PLAN_DB="$HOME/das/meta/apply_plan__tanada_items_ALL.sqlite" TANADA_APPLY_PY="$HOME/python_scripts/tanada_apply.py" # ------------------------ # Sheets 参照(環境変数は LaunchAgent から渡される想定) : "${DAS_SHEET_ID:?missing DAS_SHEET_ID}" : "${DAS_CRED_JSON:?missing DAS_CRED_JSON}" SHEET_NAME="${SHEET_NAME:-action_result}" PLAN_PATH_CELL="${PLAN_PATH_CELL:-A8}" # ★simulateが書く「planパス」 APPLY_STATUS_CELL="${APPLY_STATUS_CELL:-A11}" APPLY_COMMENT_CELL="${APPLY_COMMENT_CELL:-A12}" # 二重起動だけ防止(macOS: flock無し → mkdirロック) LOCK_DIR_PATH="${LOCK_DIR}/apply_pipeline.lockdir" if ! /bin/mkdir "${LOCK_DIR_PATH}" 2>/dev/null; then echo "[INFO] apply_runner: already running (lock=${LOCK_DIR_PATH})" >&2 exit 0 fi cleanup() { /bin/rmdir "${LOCK_DIR_PATH}" 2>/dev/null || true; } trap cleanup EXIT INT TERM # A8 から plan DB パスを取得(Pythonで読む) PLAN_DB=$(/usr/bin/python3 - <<'PY' import os from pathlib import Path from google.oauth2.service_account import Credentials from googleapiclient.discovery import build sid = os.environ["DAS_SHEET_ID"].strip() cred = os.environ["DAS_CRED_JSON"].strip() sheet_name = os.environ.get("SHEET_NAME", "action_result") cell = os.environ.get("PLAN_PATH_CELL", "A8") scopes = ["https://www.googleapis.com/auth/spreadsheets.readonly"] creds = Credentials.from_service_account_file(cred, scopes=scopes) svc = build("sheets","v4",credentials=creds,cache_discovery=False) a1 = f"{sheet_name}!{cell}" r = svc.spreadsheets().values().get(spreadsheetId=sid, range=a1).execute() vals = r.get("values", []) v = (vals[0][0] if vals and vals[0] else "").strip() print(v) PY ) if [ -z "${PLAN_DB}" ]; then echo "[ERROR] apply_runner: plan path is empty (cell=${SHEET_NAME}!${PLAN_PATH_CELL})" >&2 exit 2 fi # A8 が "…plan_XXXX.sqlite" そのものではなく説明文入りでも拾えるように最後のパスっぽいものを抜く if [ ! -f "$PLAN_DB" ]; then # 文字列中から /Users/...sqlite を抽出 EXTRACTED=$(echo "$PLAN_DB" | /usr/bin/perl -ne 'if(m#(/[^ ]+plan_[0-9_]+__tanada_items_ALL\.sqlite)#){print $1}') if [ -n "${EXTRACTED:-}" ] && [ -f "$EXTRACTED" ]; then PLAN_DB="$EXTRACTED" fi fi if [ ! -f "$PLAN_DB" ]; then echo "[ERROR] apply_runner: plan db not found: $PLAN_DB" >&2 exit 2 fi # apply_plan を更新(固定名へコピー) /bin/cp -a "$PLAN_DB" "$APPLY_PLAN_DB" || exit 2 [ -f "${PLAN_DB}-wal" ] && /bin/cp -a "${PLAN_DB}-wal" "${APPLY_PLAN_DB}-wal" || true [ -f "${PLAN_DB}-shm" ] && /bin/cp -a "${PLAN_DB}-shm" "${APPLY_PLAN_DB}-shm" || true # apply 実行(いまは安全のため self-check 付きのまま) /usr/bin/python3 "$TANADA_APPLY_PY" \ --plan-db "$APPLY_PLAN_DB" \ --apply \ --self-check \ --sheet-id "$DAS_SHEET_ID" \ --sheet-name "$SHEET_NAME" \ --status-cell "$APPLY_STATUS_CELL" \ --comment-cell "$APPLY_COMMENT_CELL" ``` /Users/xxxxxxxxm1/python_scripts/tanada_apply.py ``` #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ tanada_apply.py - plan-db(apply_plan__tanada_items_ALL.sqlite) を読み、 items_all の action に従って DRYRUN / APPLY を行う。 - 方針: 「T0戻り禁止」を最優先で検査。違反があれば即終了。 - Sheets への表示先セルは --status-cell / --comment-cell で指定可能。 (例: --status-cell B10 --comment-cell A12) 前提: - sqlite の items_all に: plan_id, process_id, abs_path, to_abs_path, size_bytes, current_stage, new_stage, action が入っていること """ from __future__ import annotations import argparse import os import sys import sqlite3 import shutil from dataclasses import dataclass from datetime import datetime from pathlib import Path from typing import Optional, List, Tuple # ---------------------------- # Utilities # ---------------------------- def ts() -> str: return datetime.now().strftime("%Y-%m-%d %H:%M:%S") def human_gb(num_bytes: int) -> str: return f"{num_bytes / (1024**3):.2f}GB" def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs) # ---------------------------- # Google Sheets (optional) # ---------------------------- def try_write_sheets(sheet_id: str, sheet_name: str, status_cell: str, comment_cell: str, status_text: str, comment_text: str) -> bool: """ gspread を使って書き込み。環境が無ければ False。 必要条件の例: - pip install gspread google-auth - GOOGLE_APPLICATION_CREDENTIALS が service account json を指す """ try: import gspread # type: ignore from google.oauth2.service_account import Credentials # type: ignore except Exception: return False cred_path = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS", "") if not cred_path or not Path(cred_path).exists(): return False try: scopes = ["https://www.googleapis.com/auth/spreadsheets"] creds = Credentials.from_service_account_file(cred_path, scopes=scopes) gc = gspread.authorize(creds) sh = gc.open_by_key(sheet_id) ws = sh.worksheet(sheet_name) ws.update_acell(status_cell, status_text) ws.update_acell(comment_cell, comment_text) return True except Exception: return False # ---------------------------- # DB access # ---------------------------- @dataclass class MoveItem: abs_path: str to_abs_path: str size_bytes: int current_stage: str new_stage: str action: str process_id: str def db_connect(db_path: Path) -> sqlite3.Connection: con = sqlite3.connect(str(db_path)) con.row_factory = sqlite3.Row return con def detect_plan_id(con: sqlite3.Connection, forced_plan_id: Optional[str]) -> str: if forced_plan_id: return forced_plan_id # plan_runs があれば最新(=唯一想定)を取る try: row = con.execute("SELECT plan_id FROM plan_runs ORDER BY created_time DESC LIMIT 1").fetchone() if row and row["plan_id"]: return str(row["plan_id"]) except Exception: pass # items_all から推測 row = con.execute("SELECT plan_id FROM items_all LIMIT 1").fetchone() if row and row["plan_id"]: return str(row["plan_id"]) raise RuntimeError("plan_id を特定できません。--plan-id を指定してください。") def fetch_t0_return_violations(con: sqlite3.Connection, plan_id: str) -> Tuple[int, int, List[str]]: """ ルール: 現段位が T0 ではないのに、新段位が T0 で action=MOVE は禁止 """ q = """ SELECT abs_path, size_bytes FROM items_all WHERE plan_id = ? AND UPPER(action) = 'MOVE' AND current_stage != 'T0' AND new_stage = 'T0' ORDER BY size_bytes DESC, abs_path """ rows = con.execute(q, (plan_id,)).fetchall() count = len(rows) total = sum(int(r["size_bytes"]) for r in rows) examples = [str(r["abs_path"]) for r in rows[:10]] return count, total, examples def fetch_move_items(con: sqlite3.Connection, plan_id: str, process_id: Optional[str]) -> List[MoveItem]: q = """ SELECT abs_path, to_abs_path, size_bytes, current_stage, new_stage, action, process_id FROM items_all WHERE plan_id = ? AND UPPER(action) = 'MOVE' AND to_abs_path IS NOT NULL AND to_abs_path != '' """ params: List[object] = [plan_id] if process_id: q += " AND process_id = ?" params.append(process_id) q += " ORDER BY abs_path" rows = con.execute(q, tuple(params)).fetchall() out: List[MoveItem] = [] for r in rows: out.append( MoveItem( abs_path=str(r["abs_path"]), to_abs_path=str(r["to_abs_path"]), size_bytes=int(r["size_bytes"]), current_stage=str(r["current_stage"]), new_stage=str(r["new_stage"]), action=str(r["action"]), process_id=str(r["process_id"]), ) ) return out # ---------------------------- # Apply (filesystem move) # ---------------------------- def ensure_parent_dir(dst: Path): dst.parent.mkdir(parents=True, exist_ok=True) def safe_move(src: Path, dst: Path): """ 同一ボリューム跨ぎも考慮して shutil.move を使用。 """ ensure_parent_dir(dst) shutil.move(str(src), str(dst)) # ---------------------------- # Main # ---------------------------- def main() -> int: ap = argparse.ArgumentParser() ap.add_argument("--plan-db", required=True, help="apply_plan__tanada_items_ALL.sqlite のパス") ap.add_argument("--plan-id", default="", help="plan_id を明示したい場合のみ") ap.add_argument("--process-id", default="", help="特定 process_id のみ対象にしたい場合") ap.add_argument("--dryrun", action="store_true", help="DRYRUN(実体移動なし)") ap.add_argument("--apply", action="store_true", help="APPLY(実体移動あり)") ap.add_argument("--self-check", action="store_true", help="方針チェックのみ(DBや実体は触らない)") # Sheets (optional) ap.add_argument("--sheet-id", default=os.environ.get("TANADA_SHEET_ID", ""), help="Google Sheet ID") ap.add_argument("--sheet-name", default=os.environ.get("TANADA_SHEET_NAME", "action_result"), help="Sheet tab name") ap.add_argument("--status-cell", default=os.environ.get("TANADA_STATUS_CELL", "B10"), help="status表示セル") ap.add_argument("--comment-cell", default=os.environ.get("TANADA_COMMENT_CELL", "A12"), help="comment表示セル") args = ap.parse_args() if args.apply and args.dryrun: eprint("ERROR: --apply と --dryrun は同時指定できません。") return 2 if not args.apply and not args.dryrun: # 互換: 何も指定されない時は dryrun 扱い args.dryrun = True mode = "APPLY" if args.apply else "DRYRUN" plan_db = Path(args.plan_db).expanduser() if not plan_db.exists(): eprint(f"ERROR: plan-db not found: {plan_db}") return 2 forced_plan_id = args.plan_id.strip() or None proc = args.process_id.strip() or None print("────────────────────────────────────────") print("[tanada_apply] START") print(f" time : {ts()}") print(f" mode : {mode}") print("────────────────────────────────────────") con = db_connect(plan_db) try: plan_id = detect_plan_id(con, forced_plan_id) # 1) Policy check: T0戻り禁止 v_count, v_total, v_examples = fetch_t0_return_violations(con, plan_id) if v_count > 0: # status/comment 文章(短め) status_text = f"ERROR: {plan_id} T0戻り禁止違反 {v_count}件 {human_gb(v_total)}" comment_lines = [ "⛔️ T0への戻りは禁止です(現!=T0 かつ 新=T0 の MOVE)", f"- plan_id: {plan_id}", f"- 検出: {v_count}件 / {human_gb(v_total)}", "- 代表例:", ] + [f" * {p}" for p in v_examples] comment_text = "\n".join(comment_lines) print("────────────────────────────────────────") print("⛔️ [POLICY] T0戻り禁止違反を検出しました") print("────────────────────────────────────────") print(comment_text) wrote = False if args.sheet_id: wrote = try_write_sheets( sheet_id=args.sheet_id, sheet_name=args.sheet_name, status_cell=args.status_cell, comment_cell=args.comment_cell, status_text=status_text, comment_text=comment_text, ) if not wrote: print("⚠️ [SHEETS] status/comment の書き込みに失敗(または未設定)でした。") print("⛔️ [POLICY] DRYRUN/APPLYともに、ここで終了します。DB出力・実体移動は行いません。") return 1 # self-check だけならここで終わり if args.self_check: status_text = f"OK: {plan_id} policy check passed" comment_text = "OK: T0戻り禁止違反なし" wrote = False if args.sheet_id: wrote = try_write_sheets( sheet_id=args.sheet_id, sheet_name=args.sheet_name, status_cell=args.status_cell, comment_cell=args.comment_cell, status_text=status_text, comment_text=comment_text, ) if wrote: print("✅ [SHEETS] policy check OK を書き込みました。") else: print("✅ policy check OK(Sheets未使用)") return 0 # 2) MOVE 実行(DRYRUNなら表示のみ) items = fetch_move_items(con, plan_id, proc) total_bytes = sum(i.size_bytes for i in items) print("────────────────────────────────────────") print(f"[PLAN] plan_id={plan_id} process_id={proc or '*'}") print(f"[PLAN] MOVE items: {len(items)} total: {human_gb(total_bytes)}") print("────────────────────────────────────────") if args.dryrun: # 最大30件だけ表示(画面爆発防止) max_show = 30 for i, it in enumerate(items[:max_show], start=1): print(f"[DRYRUN] {i:03d} {it.abs_path} -> {it.to_abs_path}") if len(items) > max_show: print(f"... ({len(items)-max_show} 件は省略)") status_text = f"OK: {plan_id} DRYRUN MOVE={len(items)} {human_gb(total_bytes)}" comment_text = "DRYRUN 完了(実体移動なし)" else: # APPLY moved = 0 failed = 0 for it in items: src = Path(it.abs_path) dst = Path(it.to_abs_path) try: if not src.exists(): failed += 1 continue safe_move(src, dst) moved += 1 except Exception: failed += 1 status_text = f"OK: {plan_id} APPLY moved={moved} failed={failed}" comment_text = f"APPLY 完了: moved={moved} failed={failed}" wrote = False if args.sheet_id: wrote = try_write_sheets( sheet_id=args.sheet_id, sheet_name=args.sheet_name, status_cell=args.status_cell, comment_cell=args.comment_cell, status_text=status_text, comment_text=comment_text, ) if wrote: print("✅ [SHEETS] 結果を書き込みました。") else: print("⚠️ [SHEETS] 未使用(または書き込み失敗)") print("────────────────────────────────────────") print("[tanada_apply] END") print(f" time : {ts()}") print("────────────────────────────────────────") return 0 finally: con.close() if __name__ == "__main__": raise SystemExit(main()) ``` /Users/xxxxxxxxm1/python_scripts/tanada_make_apply_plan_db.py ``` #!/usr/bin/env python3 # tanada_make_apply_plan_db.py # # simulate結果DB(items_ALL相当)を “完全コピー” して、実データ更新用DBとして確定する。 # # 重要方針: # - DBレイアウトは変えない(simulate DBをそのままコピー) # - 危険なDBなので、常に固定名に上書きで「現役は1つ」にする # - 必要なら旧版を _archive に退避(keep-archive 世代だけ保持) # # 探索: # - meta_dir 配下から「*tanada_items_ALL.sqlite」を優先的に探す # - 見つからない場合は、いくつかの互換パターンでも探す # # 出力: # - /Users/xxxxxxxxm1/das/meta/apply_plan__tanada_items_ALL.sqlite (固定名) # - 退避時: /Users/xxxxxxxxm1/das/meta/_archive/apply_plan__tanada_items_ALL__ARCHIVED_YYYYmmdd_HHMMSS.sqlite # from __future__ import annotations import argparse import shutil from dataclasses import dataclass from datetime import datetime from pathlib import Path from typing import List, Optional, Tuple # 既定 DEFAULT_META_DIR = Path("/Users/xxxxxxxxm1/das/meta") APPLY_PLAN_FIXED_NAME = "apply_plan__tanada_items_ALL.sqlite" ARCHIVE_DIR_NAME = "_archive" def ts() -> str: return datetime.now().strftime("%Y-%m-%d %H:%M:%S") def now_tag() -> str: return datetime.now().strftime("%Y%m%d_%H%M%S") def _glob_sorted(meta_dir: Path, patterns: List[str]) -> List[Path]: hits: List[Path] = [] for pat in patterns: hits.extend(meta_dir.glob(pat)) # 新しい順(mtime desc) hits = [p for p in hits if p.is_file()] hits.sort(key=lambda p: p.stat().st_mtime, reverse=True) return hits def find_latest_simulate_items_all_db(meta_dir: Path, debug: bool = False) -> Optional[Path]: """ simulate側の items_ALL相当DB を探す。 あなたの運用上、prefixが items_ とは限らないので、まず “末尾一致” を最優先。 """ # 最優先: “末尾が __tanada_items_ALL.sqlite” patterns_primary = [ "*__tanada_items_ALL.sqlite", # items_/plan_/analyze_ など prefix不問で拾う "*tanada_items_ALL.sqlite", # 念のため(微妙な命名差) ] # 互換(過去の想定) patterns_compat = [ "items_*__tanada_items_ALL.sqlite", "items_ALL.sqlite", ] hits = _glob_sorted(meta_dir, patterns_primary) if hits: if debug: print("[DEBUG] simulate items_ALL DB candidates (primary):") for p in hits[:10]: print(f" - {p.name}") return hits[0] hits = _glob_sorted(meta_dir, patterns_compat) if hits: if debug: print("[DEBUG] simulate items_ALL DB candidates (compat):") for p in hits[:10]: print(f" - {p.name}") return hits[0] if debug: print("[DEBUG] no simulate items_ALL DB found by any pattern.") return None def rotate_archives(archive_dir: Path, keep: int, debug: bool = False) -> None: """ archive_dir 内の apply_plan__tanada_items_ALL__ARCHIVED_*.sqlite を世代管理する。 keep=0 なら削除しない(無制限) """ if keep <= 0: return pats = ["apply_plan__tanada_items_ALL__ARCHIVED_*.sqlite"] olds = _glob_sorted(archive_dir, pats) if len(olds) <= keep: return for p in olds[keep:]: try: if debug: print(f"[DEBUG] archive prune: {p.name}") p.unlink() except Exception: pass def archive_existing(dest_db: Path, keep_archive: int, debug: bool = False) -> Optional[Path]: """ dest_db が既にある場合、_archiveへ退避して世代数を整える。 """ if not dest_db.exists(): return None archive_dir = dest_db.parent / ARCHIVE_DIR_NAME archive_dir.mkdir(parents=True, exist_ok=True) archived = archive_dir / f"apply_plan__tanada_items_ALL__ARCHIVED_{now_tag()}.sqlite" if debug: print(f"[DEBUG] archive: {dest_db.name} -> {archived.name}") # まずコピー(安全側:元を残す)→ その後、destは上書きされる shutil.copy2(str(dest_db), str(archived)) rotate_archives(archive_dir, keep=keep_archive, debug=debug) return archived def ensure_dir(p: Path) -> None: p.mkdir(parents=True, exist_ok=True) def main() -> int: ap = argparse.ArgumentParser() ap.add_argument("--meta-dir", default=str(DEFAULT_META_DIR), help="metaディレクトリ(既定: /Users/xxxxxxxxm1/das/meta)") ap.add_argument("--src-db", default="", help="simulate items_ALL DB を明示指定(指定すれば探索しない)") ap.add_argument("--archive", action="store_true", help="既存の apply_plan DB を _archive に退避してから上書き") ap.add_argument("--keep-archive", type=int, default=3, help="_archive に残す世代数(既定:3 / 0で無制限)") ap.add_argument("--debug", action="store_true", help="デバッグ表示") args = ap.parse_args() meta_dir = Path(args.meta_dir).expanduser().resolve() if not meta_dir.exists(): print(f"[ERROR] meta_dir not found: {meta_dir}") return 2 # 出力先は固定(現役は1つ) dest_db = meta_dir / APPLY_PLAN_FIXED_NAME # 入力DBを決定 src_db: Optional[Path] = None if args.src_db.strip(): src_db = Path(args.src_db).expanduser().resolve() if not src_db.exists(): print(f"[ERROR] --src-db not found: {src_db}") return 2 else: src_db = find_latest_simulate_items_all_db(meta_dir, debug=bool(args.debug)) if not src_db: print(f"[ERROR] simulate items_ALL.sqlite not found in: {meta_dir}") print(" 探索パターン: *__tanada_items_ALL.sqlite, *tanada_items_ALL.sqlite, items_*__tanada_items_ALL.sqlite, items_ALL.sqlite") print(" まず次を実行して “__tanada_items_ALL.sqlite” があるか確認してください:") print(f" ls -1t {meta_dir}/**/*__tanada_items_ALL.sqlite 2>/dev/null | head") print(f" ls -1t {meta_dir}/*__tanada_items_ALL.sqlite 2>/dev/null | head") return 2 if args.debug: print("────────────────────────────────────────") print("[tanada_make_apply_plan_db] DEBUG") print("────────────────────────────────────────") print(f"time : {ts()}") print(f"meta_dir : {meta_dir}") print(f"src_db : {src_db}") print(f"dest_db : {dest_db}") print("────────────────────────────────────────") # 退避(任意) if args.archive: _ = archive_existing(dest_db, keep_archive=int(args.keep_archive), debug=bool(args.debug)) # “完全コピー” で確定(レイアウト保持) try: shutil.copy2(str(src_db), str(dest_db)) except Exception as e: print(f"[ERROR] copy failed: {e}") return 3 print("────────────────────────────────────────") print("[tanada_make_apply_plan_db] OK") print("────────────────────────────────────────") print(f"src : {src_db}") print(f"dest: {dest_db}") print("note: simulate DB を完全コピーして apply_plan DB として確定しました(レイアウト変更なし)") print("────────────────────────────────────────") return 0 if __name__ == "__main__": raise SystemExit(main()) ``` /Users/xxxxxxxxm1/Library/LaunchAgents/com.xxxxxxxxm1.tanada_apply_button_daemon.plist ``` Label com.xxxxxxxxm1.tanada_apply_button_daemon ProgramArguments /usr/bin/python3 /Users/xxxxxxxxm1/das/tanada_apply_button_daemon.py RunAtLoad KeepAlive StandardOutPath /Users/xxxxxxxxm1/das/logs/tanada_apply_button_daemon.out.log StandardErrorPath /Users/xxxxxxxxm1/das/logs/tanada_apply_button_daemon.err.log EnvironmentVariables DAS_SHEET_ID 1i6fBlPSORhyCbwv6a7K1x4Jk3rsoKrLkJlGZDGWjj5c DAS_CRED_JSON /Users/xxxxxxxxm1/secrets/service_account.json PRINT_ONLY 0 ```