# [メタ情報] # 識別子: DAS棚田管理_実データ更新処理_exe # システム名: DAS棚田管理_実データ更新処理 # 技術種別: Misc # 機能名: Misc # 使用言語: Python ShellScript # 状態: 実行用 # [/メタ情報] 要約: このシステムは、Google Sheetsをインターフェースとしたファイル管理の自動化パイプラインです。`tanada_apply_button_daemon.py`は、Google Sheets上の手動ボタン(A10)によるAPPLY要求のみを監視し、深夜時間帯を除いて`tanada_apply.py`を起動します。`apply_runner.sh`は、SheetsのA8セルからファイル移動計画DBのパスを読み込み、それを固定名の適用DBにコピーし、`tanada_apply.py`を実行するためのシェルスクリプトです。コアロジックを担う`tanada_apply.py`は、適用DBに記録されたアクション(主にファイルの移動)をDRYRUNまたはAPPLYモードで実行します。このスクリプトは、「現段位がT0ではないファイルを新段位T0へ移動する」ポリシー違反を最優先でチェックし、違反があれば直ちに処理を停止する安全機構を備えています。処理結果はSheetsの指定セルにフィードバックされます。`tanada_make_apply_plan_db.py`は、シミュレーション結果DBから、`tanada_apply.py`が使用する固定名の適用計画DBを生成し、古いDBは世代管理してアーカイブします。`com.xxxxxxxxm1.tanada_apply_button_daemon.plist`は、デーモンをmacOSのLaunchAgentとして自動起動・常駐させ、必要な環境変数を設定します。このパイプラインは、手動トリガーと厳格な安全ポリシーにより、ファイル移動の誤操作を防ぎながらも自動化を実現しています。 /Users/xxxxxxxxm1/das/tanada_apply_button_daemon.py ``` #!/usr/bin/env python3 from __future__ import annotations """ 【最重要:apply 起動制約(手動ボタンのみ)】 このデーモンは、Google Sheets の action_result シート上の 「A10に重ねた手動ボタン」が書き込む APPLY 要求セルだけを監視し、 そのセルが更新された場合に限り tanada_apply.py を起動する。 - 禁止:時間トリガー、定期実行、他セル(B2等)からの混線起動 - 許可:ユーザーが action_result シートの A10ボタンを手動で押した場合のみ """ import os import sys import time import subprocess from pathlib import Path from datetime import datetime, time as dtime # ====== apply専用(simulateと混線させない)====== SHEET_NAME = "action_result" APPLY_REQ = "B10" # A10ボタンが書く要求セル(例:APPLY:20260111_123456) APPLY_STATUS = "A11" # 状態表示 POLL_SEC = 5 # =============================================== # ===== 静音時間帯設定(深夜は実行しない)===== QUIET_START = dtime(0, 55) QUIET_END = dtime(2, 10) QUIET_SLEEP = 60 # ============================================ # ===== 実行する apply 本体 ===== PYTHON = "/usr/bin/python3" APPLY_SCRIPT = "/Users/xxxxxxxxm1/python_scripts/tanada_apply.py" # ============================== def ts() -> str: return datetime.now().strftime("%Y-%m-%d %H:%M:%S") def in_quiet_hours() -> bool: now = datetime.now().time() return QUIET_START <= now <= QUIET_END def sheets_read(cred_json: Path, sid: str, a1: str) -> str: from google.oauth2.service_account import Credentials from googleapiclient.discovery import build scopes = ["https://www.googleapis.com/auth/spreadsheets"] creds = Credentials.from_service_account_file(str(cred_json), scopes=scopes) service = build("sheets", "v4", credentials=creds, cache_discovery=False) r = service.spreadsheets().values().get(spreadsheetId=sid, range=a1).execute() vals = r.get("values", []) if not vals or not vals[0]: return "" return str(vals[0][0]) def sheets_write(cred_json: Path, sid: str, a1: str, value: str) -> None: from google.oauth2.service_account import Credentials from googleapiclient.discovery import build scopes = ["https://www.googleapis.com/auth/spreadsheets"] creds = Credentials.from_service_account_file(str(cred_json), scopes=scopes) service = build("sheets", "v4", credentials=creds, cache_discovery=False) service.spreadsheets().values().update( spreadsheetId=sid, range=a1, valueInputOption="RAW", body={"values": [[value]]}, ).execute() def run_apply(nonce: str) -> tuple[int, str]: """ A10ボタン由来のnonceでのみ apply を起動する。 - MANUAL_RUN=1 を必ず付与(手動起動の印として固定) - ここではターミナル操作は不要:デーモンが直接起動する """ env = os.environ.copy() env["MANUAL_RUN"] = "1" p = subprocess.run( [ PYTHON, APPLY_SCRIPT, "--plan-db", "/Users/xxxxxxxxm1/das/meta/apply_plan__tanada_items_ALL.sqlite", "--apply", "--self-check", "--sheet-id", os.environ.get("DAS_SHEET_ID","").strip(), "--sheet-name", SHEET_NAME, "--status-cell", APPLY_STATUS, "--comment-cell", "A12", ], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, env=env, ) return p.returncode, p.stdout def parse_nonce(req: str) -> str: # 許可:APPLY:YYYYMMDD_HHMMSS / REQUESTED:YYYYMMDD_HHMMSS req = (req or "").strip() if req.startswith("APPLY:"): return req.split(":", 1)[1].strip() if req.startswith("REQUESTED:"): return req.split(":", 1)[1].strip() return "" def main() -> int: sid = os.environ.get("DAS_SHEET_ID", "").strip() cred = os.environ.get("DAS_CRED_JSON", "").strip() if not sid or not cred: print("[ERROR] env missing: DAS_SHEET_ID / DAS_CRED_JSON", file=sys.stderr, flush=True) return 2 cred_json = Path(cred).expanduser().resolve() if not cred_json.exists(): print(f"[ERROR] cred json not found: {cred_json}", file=sys.stderr, flush=True) return 2 a1_req = f"{SHEET_NAME}!{APPLY_REQ}" a1_status = f"{SHEET_NAME}!{APPLY_STATUS}" last_req = "" print(f"[INFO] {ts()} tanada_apply_button_daemon started", flush=True) print(f"[INFO] watching: {sid} {a1_req} poll={POLL_SEC}s", flush=True) while True: try: # 深夜は一切処理しない(勝手に動いたと見える事故を避ける) if in_quiet_hours(): time.sleep(QUIET_SLEEP) continue req = sheets_read(cred_json, sid, a1_req).strip() if req and req != last_req: last_req = req nonce = parse_nonce(req) if not nonce: sheets_write(cred_json, sid, a1_status, f"REJECT: {req}") time.sleep(POLL_SEC) continue sheets_write(cred_json, sid, a1_status, f"RUNNING: {nonce}") code, out = run_apply(nonce) if code == 0: sheets_write(cred_json, sid, a1_status, f"DONE: {nonce}") else: head = "\n".join(out.splitlines()[:25]) sheets_write(cred_json, sid, a1_status, f"ERROR: {nonce}\n{head}") time.sleep(POLL_SEC) except Exception as e: try: sheets_write(cred_json, sid, a1_status, f"DAEMON_ERROR: {e}") except Exception: pass time.sleep(POLL_SEC) return 0 if __name__ == "__main__": raise SystemExit(main()) ``` /Users/xxxxxxxxm1/das/apply_runner.sh ``` #!/bin/bash set -euo pipefail export PATH="/usr/bin:/bin:/usr/sbin:/sbin" LOCK_DIR="$HOME/das/locks" mkdir -p "$LOCK_DIR" LOCK_FILE="$LOCK_DIR/apply_pipeline.lock" # ---- 設定(固定名DB)---- APPLY_PLAN_DB="$HOME/das/meta/apply_plan__tanada_items_ALL.sqlite" TANADA_APPLY_PY="$HOME/python_scripts/tanada_apply.py" # ------------------------ # Sheets 参照(環境変数は LaunchAgent から渡される想定) : "${DAS_SHEET_ID:?missing DAS_SHEET_ID}" : "${DAS_CRED_JSON:?missing DAS_CRED_JSON}" SHEET_NAME="${SHEET_NAME:-action_result}" PLAN_PATH_CELL="${PLAN_PATH_CELL:-A8}" # ★simulateが書く「planパス」 APPLY_STATUS_CELL="${APPLY_STATUS_CELL:-A11}" APPLY_COMMENT_CELL="${APPLY_COMMENT_CELL:-A12}" # 二重起動だけ防止(macOS: flock無し → mkdirロック) LOCK_DIR_PATH="${LOCK_DIR}/apply_pipeline.lockdir" if ! /bin/mkdir "${LOCK_DIR_PATH}" 2>/dev/null; then echo "[INFO] apply_runner: already running (lock=${LOCK_DIR_PATH})" >&2 exit 0 fi cleanup() { /bin/rmdir "${LOCK_DIR_PATH}" 2>/dev/null || true; } trap cleanup EXIT INT TERM # A8 から plan DB パスを取得(Pythonで読む) PLAN_DB=$(/usr/bin/python3 - <<'PY' import os from pathlib import Path from google.oauth2.service_account import Credentials from googleapiclient.discovery import build sid = os.environ["DAS_SHEET_ID"].strip() cred = os.environ["DAS_CRED_JSON"].strip() sheet_name = os.environ.get("SHEET_NAME", "action_result") cell = os.environ.get("PLAN_PATH_CELL", "A8") scopes = ["https://www.googleapis.com/auth/spreadsheets.readonly"] creds = Credentials.from_service_account_file(cred, scopes=scopes) svc = build("sheets","v4",credentials=creds,cache_discovery=False) a1 = f"{sheet_name}!{cell}" r = svc.spreadsheets().values().get(spreadsheetId=sid, range=a1).execute() vals = r.get("values", []) v = (vals[0][0] if vals and vals[0] else "").strip() print(v) PY ) if [ -z "${PLAN_DB}" ]; then echo "[ERROR] apply_runner: plan path is empty (cell=${SHEET_NAME}!${PLAN_PATH_CELL})" >&2 exit 2 fi # A8 が "…plan_XXXX.sqlite" そのものではなく説明文入りでも拾えるように最後のパスっぽいものを抜く if [ ! -f "$PLAN_DB" ]; then # 文字列中から /Users/...sqlite を抽出 EXTRACTED=$(echo "$PLAN_DB" | /usr/bin/perl -ne 'if(m#(/[^ ]+plan_[0-9_]+__tanada_items_ALL\.sqlite)#){print $1}') if [ -n "${EXTRACTED:-}" ] && [ -f "$EXTRACTED" ]; then PLAN_DB="$EXTRACTED" fi fi if [ ! -f "$PLAN_DB" ]; then echo "[ERROR] apply_runner: plan db not found: $PLAN_DB" >&2 exit 2 fi # apply_plan を更新(固定名へコピー) /bin/cp -a "$PLAN_DB" "$APPLY_PLAN_DB" || exit 2 [ -f "${PLAN_DB}-wal" ] && /bin/cp -a "${PLAN_DB}-wal" "${APPLY_PLAN_DB}-wal" || true [ -f "${PLAN_DB}-shm" ] && /bin/cp -a "${PLAN_DB}-shm" "${APPLY_PLAN_DB}-shm" || true # apply 実行(いまは安全のため self-check 付きのまま) /usr/bin/python3 "$TANADA_APPLY_PY" \ --plan-db "$APPLY_PLAN_DB" \ --apply \ --self-check \ --sheet-id "$DAS_SHEET_ID" \ --sheet-name "$SHEET_NAME" \ --status-cell "$APPLY_STATUS_CELL" \ --comment-cell "$APPLY_COMMENT_CELL" ``` /Users/xxxxxxxxm1/python_scripts/tanada_apply.py ``` #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ tanada_apply.py - plan-db(apply_plan__tanada_items_ALL.sqlite) を読み、 items_all の action に従って DRYRUN / APPLY を行う。 - 方針: 「T0戻り禁止」を最優先で検査。違反があれば即終了。 - Sheets への表示先セルは --status-cell / --comment-cell で指定可能。 (例: --status-cell B10 --comment-cell A12) 前提: - sqlite の items_all に: plan_id, process_id, abs_path, to_abs_path, size_bytes, current_stage, new_stage, action が入っていること """ from __future__ import annotations import argparse import os import sys import sqlite3 import shutil from dataclasses import dataclass from datetime import datetime from pathlib import Path from typing import Optional, List, Tuple # ---------------------------- # Utilities # ---------------------------- def ts() -> str: return datetime.now().strftime("%Y-%m-%d %H:%M:%S") def human_gb(num_bytes: int) -> str: return f"{num_bytes / (1024**3):.2f}GB" def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs) # ---------------------------- # Google Sheets (optional) # ---------------------------- def try_write_sheets(sheet_id: str, sheet_name: str, status_cell: str, comment_cell: str, status_text: str, comment_text: str) -> bool: """ gspread を使って書き込み。環境が無ければ False。 必要条件の例: - pip install gspread google-auth - GOOGLE_APPLICATION_CREDENTIALS が service account json を指す """ try: import gspread # type: ignore from google.oauth2.service_account import Credentials # type: ignore except Exception: return False cred_path = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS", "") if not cred_path or not Path(cred_path).exists(): return False try: scopes = ["https://www.googleapis.com/auth/spreadsheets"] creds = Credentials.from_service_account_file(cred_path, scopes=scopes) gc = gspread.authorize(creds) sh = gc.open_by_key(sheet_id) ws = sh.worksheet(sheet_name) ws.update_acell(status_cell, status_text) ws.update_acell(comment_cell, comment_text) return True except Exception: return False # ---------------------------- # DB access # ---------------------------- @dataclass class MoveItem: abs_path: str to_abs_path: str size_bytes: int current_stage: str new_stage: str action: str process_id: str def db_connect(db_path: Path) -> sqlite3.Connection: con = sqlite3.connect(str(db_path)) con.row_factory = sqlite3.Row return con def detect_plan_id(con: sqlite3.Connection, forced_plan_id: Optional[str]) -> str: if forced_plan_id: return forced_plan_id # plan_runs があれば最新(=唯一想定)を取る try: row = con.execute("SELECT plan_id FROM plan_runs ORDER BY created_time DESC LIMIT 1").fetchone() if row and row["plan_id"]: return str(row["plan_id"]) except Exception: pass # items_all から推測 row = con.execute("SELECT plan_id FROM items_all LIMIT 1").fetchone() if row and row["plan_id"]: return str(row["plan_id"]) raise RuntimeError("plan_id を特定できません。--plan-id を指定してください。") def fetch_t0_return_violations(con: sqlite3.Connection, plan_id: str) -> Tuple[int, int, List[str]]: """ ルール: 現段位が T0 ではないのに、新段位が T0 で action=MOVE は禁止 """ q = """ SELECT abs_path, size_bytes FROM items_all WHERE plan_id = ? AND UPPER(action) = 'MOVE' AND current_stage != 'T0' AND new_stage = 'T0' ORDER BY size_bytes DESC, abs_path """ rows = con.execute(q, (plan_id,)).fetchall() count = len(rows) total = sum(int(r["size_bytes"]) for r in rows) examples = [str(r["abs_path"]) for r in rows[:10]] return count, total, examples def fetch_move_items(con: sqlite3.Connection, plan_id: str, process_id: Optional[str]) -> List[MoveItem]: q = """ SELECT abs_path, to_abs_path, size_bytes, current_stage, new_stage, action, process_id FROM items_all WHERE plan_id = ? AND UPPER(action) = 'MOVE' AND to_abs_path IS NOT NULL AND to_abs_path != '' """ params: List[object] = [plan_id] if process_id: q += " AND process_id = ?" params.append(process_id) q += " ORDER BY abs_path" rows = con.execute(q, tuple(params)).fetchall() out: List[MoveItem] = [] for r in rows: out.append( MoveItem( abs_path=str(r["abs_path"]), to_abs_path=str(r["to_abs_path"]), size_bytes=int(r["size_bytes"]), current_stage=str(r["current_stage"]), new_stage=str(r["new_stage"]), action=str(r["action"]), process_id=str(r["process_id"]), ) ) return out # ---------------------------- # Apply (filesystem move) # ---------------------------- def ensure_parent_dir(dst: Path): dst.parent.mkdir(parents=True, exist_ok=True) def safe_move(src: Path, dst: Path): """ 同一ボリューム跨ぎも考慮して shutil.move を使用。 """ ensure_parent_dir(dst) shutil.move(str(src), str(dst)) # ---------------------------- # Main # ---------------------------- def main() -> int: ap = argparse.ArgumentParser() ap.add_argument("--plan-db", required=True, help="apply_plan__tanada_items_ALL.sqlite のパス") ap.add_argument("--plan-id", default="", help="plan_id を明示したい場合のみ") ap.add_argument("--process-id", default="", help="特定 process_id のみ対象にしたい場合") ap.add_argument("--dryrun", action="store_true", help="DRYRUN(実体移動なし)") ap.add_argument("--apply", action="store_true", help="APPLY(実体移動あり)") ap.add_argument("--self-check", action="store_true", help="方針チェックのみ(DBや実体は触らない)") # Sheets (optional) ap.add_argument("--sheet-id", default=os.environ.get("TANADA_SHEET_ID", ""), help="Google Sheet ID") ap.add_argument("--sheet-name", default=os.environ.get("TANADA_SHEET_NAME", "action_result"), help="Sheet tab name") ap.add_argument("--status-cell", default=os.environ.get("TANADA_STATUS_CELL", "B10"), help="status表示セル") ap.add_argument("--comment-cell", default=os.environ.get("TANADA_COMMENT_CELL", "A12"), help="comment表示セル") args = ap.parse_args() if args.apply and args.dryrun: eprint("ERROR: --apply と --dryrun は同時指定できません。") return 2 if not args.apply and not args.dryrun: # 互換: 何も指定されない時は dryrun 扱い args.dryrun = True mode = "APPLY" if args.apply else "DRYRUN" plan_db = Path(args.plan_db).expanduser() if not plan_db.exists(): eprint(f"ERROR: plan-db not found: {plan_db}") return 2 forced_plan_id = args.plan_id.strip() or None proc = args.process_id.strip() or None print("────────────────────────────────────────") print("[tanada_apply] START") print(f" time : {ts()}") print(f" mode : {mode}") print("────────────────────────────────────────") con = db_connect(plan_db) try: plan_id = detect_plan_id(con, forced_plan_id) # 1) Policy check: T0戻り禁止 v_count, v_total, v_examples = fetch_t0_return_violations(con, plan_id) if v_count > 0: # status/comment 文章(短め) status_text = f"ERROR: {plan_id} T0戻り禁止違反 {v_count}件 {human_gb(v_total)}" comment_lines = [ "⛔️ T0への戻りは禁止です(現!=T0 かつ 新=T0 の MOVE)", f"- plan_id: {plan_id}", f"- 検出: {v_count}件 / {human_gb(v_total)}", "- 代表例:", ] + [f" * {p}" for p in v_examples] comment_text = "\n".join(comment_lines) print("────────────────────────────────────────") print("⛔️ [POLICY] T0戻り禁止違反を検出しました") print("────────────────────────────────────────") print(comment_text) wrote = False if args.sheet_id: wrote = try_write_sheets( sheet_id=args.sheet_id, sheet_name=args.sheet_name, status_cell=args.status_cell, comment_cell=args.comment_cell, status_text=status_text, comment_text=comment_text, ) if not wrote: print("⚠️ [SHEETS] status/comment の書き込みに失敗(または未設定)でした。") print("⛔️ [POLICY] DRYRUN/APPLYともに、ここで終了します。DB出力・実体移動は行いません。") return 1 # self-check だけならここで終わり if args.self_check: status_text = f"OK: {plan_id} policy check passed" comment_text = "OK: T0戻り禁止違反なし" wrote = False if args.sheet_id: wrote = try_write_sheets( sheet_id=args.sheet_id, sheet_name=args.sheet_name, status_cell=args.status_cell, comment_cell=args.comment_cell, status_text=status_text, comment_text=comment_text, ) if wrote: print("✅ [SHEETS] policy check OK を書き込みました。") else: print("✅ policy check OK(Sheets未使用)") return 0 # 2) MOVE 実行(DRYRUNなら表示のみ) items = fetch_move_items(con, plan_id, proc) total_bytes = sum(i.size_bytes for i in items) print("────────────────────────────────────────") print(f"[PLAN] plan_id={plan_id} process_id={proc or '*'}") print(f"[PLAN] MOVE items: {len(items)} total: {human_gb(total_bytes)}") print("────────────────────────────────────────") if args.dryrun: # 最大30件だけ表示(画面爆発防止) max_show = 30 for i, it in enumerate(items[:max_show], start=1): print(f"[DRYRUN] {i:03d} {it.abs_path} -> {it.to_abs_path}") if len(items) > max_show: print(f"... ({len(items)-max_show} 件は省略)") status_text = f"OK: {plan_id} DRYRUN MOVE={len(items)} {human_gb(total_bytes)}" comment_text = "DRYRUN 完了(実体移動なし)" else: # APPLY moved = 0 failed = 0 for it in items: src = Path(it.abs_path) dst = Path(it.to_abs_path) try: if not src.exists(): failed += 1 continue safe_move(src, dst) moved += 1 except Exception: failed += 1 status_text = f"OK: {plan_id} APPLY moved={moved} failed={failed}" comment_text = f"APPLY 完了: moved={moved} failed={failed}" wrote = False if args.sheet_id: wrote = try_write_sheets( sheet_id=args.sheet_id, sheet_name=args.sheet_name, status_cell=args.status_cell, comment_cell=args.comment_cell, status_text=status_text, comment_text=comment_text, ) if wrote: print("✅ [SHEETS] 結果を書き込みました。") else: print("⚠️ [SHEETS] 未使用(または書き込み失敗)") print("────────────────────────────────────────") print("[tanada_apply] END") print(f" time : {ts()}") print("────────────────────────────────────────") return 0 finally: con.close() if __name__ == "__main__": raise SystemExit(main()) ``` /Users/xxxxxxxxm1/python_scripts/tanada_make_apply_plan_db.py ``` #!/usr/bin/env python3 # tanada_make_apply_plan_db.py # # simulate結果DB(items_ALL相当)を “完全コピー” して、実データ更新用DBとして確定する。 # # 重要方針: # - DBレイアウトは変えない(simulate DBをそのままコピー) # - 危険なDBなので、常に固定名に上書きで「現役は1つ」にする # - 必要なら旧版を _archive に退避(keep-archive 世代だけ保持) # # 探索: # - meta_dir 配下から「*tanada_items_ALL.sqlite」を優先的に探す # - 見つからない場合は、いくつかの互換パターンでも探す # # 出力: # - /Users/xxxxxxxxm1/das/meta/apply_plan__tanada_items_ALL.sqlite (固定名) # - 退避時: /Users/xxxxxxxxm1/das/meta/_archive/apply_plan__tanada_items_ALL__ARCHIVED_YYYYmmdd_HHMMSS.sqlite # from __future__ import annotations import argparse import shutil from dataclasses import dataclass from datetime import datetime from pathlib import Path from typing import List, Optional, Tuple # 既定 DEFAULT_META_DIR = Path("/Users/xxxxxxxxm1/das/meta") APPLY_PLAN_FIXED_NAME = "apply_plan__tanada_items_ALL.sqlite" ARCHIVE_DIR_NAME = "_archive" def ts() -> str: return datetime.now().strftime("%Y-%m-%d %H:%M:%S") def now_tag() -> str: return datetime.now().strftime("%Y%m%d_%H%M%S") def _glob_sorted(meta_dir: Path, patterns: List[str]) -> List[Path]: hits: List[Path] = [] for pat in patterns: hits.extend(meta_dir.glob(pat)) # 新しい順(mtime desc) hits = [p for p in hits if p.is_file()] hits.sort(key=lambda p: p.stat().st_mtime, reverse=True) return hits def find_latest_simulate_items_all_db(meta_dir: Path, debug: bool = False) -> Optional[Path]: """ simulate側の items_ALL相当DB を探す。 あなたの運用上、prefixが items_ とは限らないので、まず “末尾一致” を最優先。 """ # 最優先: “末尾が __tanada_items_ALL.sqlite” patterns_primary = [ "*__tanada_items_ALL.sqlite", # items_/plan_/analyze_ など prefix不問で拾う "*tanada_items_ALL.sqlite", # 念のため(微妙な命名差) ] # 互換(過去の想定) patterns_compat = [ "items_*__tanada_items_ALL.sqlite", "items_ALL.sqlite", ] hits = _glob_sorted(meta_dir, patterns_primary) if hits: if debug: print("[DEBUG] simulate items_ALL DB candidates (primary):") for p in hits[:10]: print(f" - {p.name}") return hits[0] hits = _glob_sorted(meta_dir, patterns_compat) if hits: if debug: print("[DEBUG] simulate items_ALL DB candidates (compat):") for p in hits[:10]: print(f" - {p.name}") return hits[0] if debug: print("[DEBUG] no simulate items_ALL DB found by any pattern.") return None def rotate_archives(archive_dir: Path, keep: int, debug: bool = False) -> None: """ archive_dir 内の apply_plan__tanada_items_ALL__ARCHIVED_*.sqlite を世代管理する。 keep=0 なら削除しない(無制限) """ if keep <= 0: return pats = ["apply_plan__tanada_items_ALL__ARCHIVED_*.sqlite"] olds = _glob_sorted(archive_dir, pats) if len(olds) <= keep: return for p in olds[keep:]: try: if debug: print(f"[DEBUG] archive prune: {p.name}") p.unlink() except Exception: pass def archive_existing(dest_db: Path, keep_archive: int, debug: bool = False) -> Optional[Path]: """ dest_db が既にある場合、_archiveへ退避して世代数を整える。 """ if not dest_db.exists(): return None archive_dir = dest_db.parent / ARCHIVE_DIR_NAME archive_dir.mkdir(parents=True, exist_ok=True) archived = archive_dir / f"apply_plan__tanada_items_ALL__ARCHIVED_{now_tag()}.sqlite" if debug: print(f"[DEBUG] archive: {dest_db.name} -> {archived.name}") # まずコピー(安全側:元を残す)→ その後、destは上書きされる shutil.copy2(str(dest_db), str(archived)) rotate_archives(archive_dir, keep=keep_archive, debug=debug) return archived def ensure_dir(p: Path) -> None: p.mkdir(parents=True, exist_ok=True) def main() -> int: ap = argparse.ArgumentParser() ap.add_argument("--meta-dir", default=str(DEFAULT_META_DIR), help="metaディレクトリ(既定: /Users/xxxxxxxxm1/das/meta)") ap.add_argument("--src-db", default="", help="simulate items_ALL DB を明示指定(指定すれば探索しない)") ap.add_argument("--archive", action="store_true", help="既存の apply_plan DB を _archive に退避してから上書き") ap.add_argument("--keep-archive", type=int, default=3, help="_archive に残す世代数(既定:3 / 0で無制限)") ap.add_argument("--debug", action="store_true", help="デバッグ表示") args = ap.parse_args() meta_dir = Path(args.meta_dir).expanduser().resolve() if not meta_dir.exists(): print(f"[ERROR] meta_dir not found: {meta_dir}") return 2 # 出力先は固定(現役は1つ) dest_db = meta_dir / APPLY_PLAN_FIXED_NAME # 入力DBを決定 src_db: Optional[Path] = None if args.src_db.strip(): src_db = Path(args.src_db).expanduser().resolve() if not src_db.exists(): print(f"[ERROR] --src-db not found: {src_db}") return 2 else: src_db = find_latest_simulate_items_all_db(meta_dir, debug=bool(args.debug)) if not src_db: print(f"[ERROR] simulate items_ALL.sqlite not found in: {meta_dir}") print(" 探索パターン: *__tanada_items_ALL.sqlite, *tanada_items_ALL.sqlite, items_*__tanada_items_ALL.sqlite, items_ALL.sqlite") print(" まず次を実行して “__tanada_items_ALL.sqlite” があるか確認してください:") print(f" ls -1t {meta_dir}/**/*__tanada_items_ALL.sqlite 2>/dev/null | head") print(f" ls -1t {meta_dir}/*__tanada_items_ALL.sqlite 2>/dev/null | head") return 2 if args.debug: print("────────────────────────────────────────") print("[tanada_make_apply_plan_db] DEBUG") print("────────────────────────────────────────") print(f"time : {ts()}") print(f"meta_dir : {meta_dir}") print(f"src_db : {src_db}") print(f"dest_db : {dest_db}") print("────────────────────────────────────────") # 退避(任意) if args.archive: _ = archive_existing(dest_db, keep_archive=int(args.keep_archive), debug=bool(args.debug)) # “完全コピー” で確定(レイアウト保持) try: shutil.copy2(str(src_db), str(dest_db)) except Exception as e: print(f"[ERROR] copy failed: {e}") return 3 print("────────────────────────────────────────") print("[tanada_make_apply_plan_db] OK") print("────────────────────────────────────────") print(f"src : {src_db}") print(f"dest: {dest_db}") print("note: simulate DB を完全コピーして apply_plan DB として確定しました(レイアウト変更なし)") print("────────────────────────────────────────") return 0 if __name__ == "__main__": raise SystemExit(main()) ``` /Users/xxxxxxxxm1/Library/LaunchAgents/com.xxxxxxxxm1.tanada_apply_button_daemon.plist ``` Label com.xxxxxxxxm1.tanada_apply_button_daemon ProgramArguments /usr/bin/python3 /Users/xxxxxxxxm1/das/tanada_apply_button_daemon.py RunAtLoad KeepAlive StandardOutPath /Users/xxxxxxxxm1/das/logs/tanada_apply_button_daemon.out.log StandardErrorPath /Users/xxxxxxxxm1/das/logs/tanada_apply_button_daemon.err.log EnvironmentVariables DAS_SHEET_ID 1i6fBlPSORhyCbwv6a7K1x4Jk3rsoKrLkJlGZDGWjj5c DAS_CRED_JSON /Users/xxxxxxxxm1/secrets/service_account.json PRINT_ONLY 0 ```