# [メタ情報]
# 識別子: DAS棚田管理_メタデータ処理py_sh_exe
# システム名: DAS棚田管理_メタデータ処理py_sh
# 技術種別: Misc
# 機能名: Misc
# 使用言語: Python ShellScript
# 状態: 実行用
# [/メタ情報]

Summary: A metadata-only pipeline. It syncs the Google Sheets "tanada" sheet to a config CSV (tanada.csv), walks everything under column D ("転出パス", the move-out paths) to build a generation SQLite (scan DB), and then combines the scan DB with tanada.csv into a single all-items plan SQLite (plan DB) that fixes each file's action (KEEP/MOVE/EXCLUDE) and its old and new paths (abs_path/to_abs_path). Column D may hold several paths in one cell, split on "," or line breaks, and duplicates are removed with the deeper root taking precedence. The analyze step enforces, per 処理id, exactly one move-out path, exactly one move-in path, and any number of exclusion paths; it accumulates files in descending mtime order within the limit minus the excluded volume to decide each file's new tier, and the pipeline rejects reverse flow (move-out tier >= move-in tier). Finally the plan DB is aggregated and a "from → to / count / size" report is sent via Mail.app (print-only is also possible). das_runner.sh is the launcher that runs export → pipeline → notify in one shot.

/Users/xxxxxxxxm1/python_scripts/tanada_export.py

tanada_export.py

```
#!/usr/bin/env python3
"""
tanada_export.py
Google Sheets の tanada シートを CSV に書き出す(設定同期専用)
- 実体ファイルは一切触らない
- DB は一切触らない
- 何度実行してもよい(idempotent)
"""

from __future__ import annotations

import argparse
import csv
import os
import sys
from pathlib import Path
from datetime import datetime

# Google Sheets 用
import gspread
from google.oauth2.service_account import Credentials


def now_iso():
    return datetime.now().astimezone().isoformat(timespec="seconds")


def main() -> int:
    home = Path.home()
    default_out = home / "das" / "config" / "tanada.csv"

    ap = argparse.ArgumentParser()
    ap.add_argument("--cred-json", required=True, help="Google Service Account の credential JSON")
    ap.add_argument("--spreadsheet-id", required=True, help="tanada を含む Spreadsheet ID")
    ap.add_argument("--sheet-name", default="tanada", help="シート名(既定: tanada)")
    ap.add_argument("--out-csv", default=str(default_out), help=f"出力CSV(既定: {default_out})")
    args = ap.parse_args()

    out_csv = Path(os.path.expanduser(args.out_csv))
    out_csv.parent.mkdir(parents=True, exist_ok=True)

    scopes = ["https://www.googleapis.com/auth/spreadsheets.readonly"]
    creds = Credentials.from_service_account_file(args.cred_json, scopes=scopes)
    gc = gspread.authorize(creds)

    sh = gc.open_by_key(args.spreadsheet_id)
    ws = sh.worksheet(args.sheet_name)
    rows = ws.get_all_values()
    if not rows:
        print("[ERROR] sheet is empty", file=sys.stderr)
        return 2

    # CSV 書き出し(完全写像)
    with out_csv.open("w", encoding="utf-8", newline="") as f:
        w = csv.writer(f)
        for row in rows:
            w.writerow(row)

    print("[OK] tanada exported")
    print(f"[OK] sheet={args.sheet_name}")
    print(f"[OK] rows={len(rows)-1} (excluding header)")
    print(f"[OK] out_csv={out_csv}")
    print(f"[OK] time={now_iso()}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
```
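For orientation, here is a minimal sketch of the one-button flow that the summary attributes to das_runner.sh (export → pipeline → notify), expressed as a Python driver rather than the actual shell script. The credential path and spreadsheet ID are placeholders, and the plan DB name simply mirrors the convention used in tanada_pipeline.py; the real runner's configuration may differ.

```
#!/usr/bin/env python3
# Hypothetical driver illustrating the das_runner.sh order: export -> pipeline -> notify.
# CRED_JSON and SHEET_ID are placeholders, not values from the original runner.
import subprocess
from datetime import datetime
from pathlib import Path

PY = Path("/Users/xxxxxxxxm1/python_scripts")
DAS = Path("/Users/xxxxxxxxm1/das")
CRED_JSON = "/path/to/service_account.json"   # placeholder
SHEET_ID = "SPREADSHEET_ID"                   # placeholder

run_id = datetime.now().strftime("%Y%m%d_%H%M%S")

# (1) Sync the tanada sheet to /Users/xxxxxxxxm1/das/config/tanada.csv
subprocess.run(["python3", str(PY / "tanada_export.py"),
                "--cred-json", CRED_JSON, "--spreadsheet-id", SHEET_ID], check=True)

# (2) Scan + analyze (metadata only); plan DB name follows tanada_pipeline.py's convention
subprocess.run(["python3", str(PY / "tanada_pipeline.py"),
                "--run-id", run_id, "--simulate-only"], check=True)
plan_db = DAS / "meta" / f"plan_{run_id}__tanada_items_ALL.sqlite"

# (3) Summarize the plan DB and notify (print-only here; drop the flag to send mail)
subprocess.run(["python3", str(PY / "tanada_notify.py"),
                "--plan-db", str(plan_db), "--print-only"], check=True)
```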
/Users/xxxxxxxxm1/python_scripts/tanada_scan.py

tanada_scan.py

```
#!/usr/bin/env python3
# tanada_scan.py
# tanada CSV の D列(転出パス)配下を全走査し、世代SQLite(棚卸しDB)を生成する。
#
# 仕様準拠:
# - 対象範囲: tanadaシート D列「転出パス」すべての配下(網羅性最優先)
# - E列(転出除外パス)はDB生成時には使わない(除外判断は後工程)
# - 最低限の記録: abs_path/root_path/rel_path/filename/size_bytes/mtime/mtime_iso/scan_time
#
# 運用上の正式配置(絶対パス固定):
# - tanada.csv は /Users/xxxxxxxxm1/das/config/tanada.csv を正式配置とする
# - 本スクリプトは --tanada-csv 未指定時、上記をデフォルト採用する
#
# 注意:
# - 実体ファイルは一切変更しない(読み取りのみ)
# - D列の転出パスが重複・内包する場合は「より深いrootを優先」してroot_pathを付与しつつ、
#   同一abs_pathは二重登録しない(重複排除)
#
# 追加(今回の修正点):
# - D列1セルに複数パスが入る場合、"," と改行 "\n" の両方を区切りとして分解してroot化する
#   例: "/a/b,/c/d" / "/a/b\n/c/d" / "/a/b,\n/c/d" すべて対応
#
# ★重要(揺らぎ対策):
# - 走査結果が 0 件だった場合は異常として非0で終了する(0TB要約を作らせない)
# - PermissionError / FileNotFoundError 等の発生回数を表示する((1) の成否を毎回可視化)
#
# ★追加(同一パス内容揺らぎ検知):
# - --with-sha256 指定時のみ sha256 を計算して記録(重いので通常はOFF推奨)
# - sha256 は「前後stat一致」のときだけ確定(観測の原子性を担保)
# - 途中で変化した場合は hash_status=unstable とし、sha256はNULL(確定しない)

from __future__ import annotations

import argparse
import csv
import hashlib
import os
import re
import sqlite3
import sys
import time
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Dict, Iterator, List, Set, Tuple

# ===== 正式配置(絶対パス)=====
DAS_BASE = Path("/Users/xxxxxxxxm1/das")
DEFAULT_TANADA_CSV = DAS_BASE / "config" / "tanada.csv"


@dataclass(frozen=True)
class Root:
    root_path: str  # D列の起点パス(フルパス)
    norm: str       # 正規化(末尾スラッシュ除去など)


@dataclass(frozen=True)
class StatSnap:
    size: int
    mtime_ns: int
    inode: int
    dev: int


def now_iso_local() -> str:
    return datetime.now().astimezone().isoformat(timespec="seconds")


def ts_iso_local(epoch: float) -> str:
    return datetime.fromtimestamp(epoch).astimezone().isoformat(timespec="seconds")


def norm_dir_path(p: str) -> str:
    p = (p or "").strip()
    if not p:
        return ""
    # 末尾スラッシュ除去
    while p.endswith("/"):
        p = p[:-1]
    # 末尾CRなども削る(念のため)
    p = p.strip()
    return p


def split_multi_paths(cell: str) -> List[str]:
    """
    1セルに複数パスが入っている場合に分解する。
    対応区切り: カンマ , / 改行 \n / その混在
    """
    if cell is None:
        return []
    s = str(cell)
    # 区切りをまとめてsplit(カンマ or 改行)
    parts = re.split(r"[,\n]+", s)
    out: List[str] = []
    for part in parts:
        p = norm_dir_path(part)
        if p:
            out.append(p)
    return out


def read_roots_from_tanada_csv(csv_path: Path, d_col_index_1based: int) -> List[Root]:
    roots: List[Root] = []
    with csv_path.open("r", encoding="utf-8", newline="") as f:
        r = csv.reader(f)
        rows = list(r)
    if not rows:
        return []
    for row in rows[1:]:  # 1行目はヘッダ想定
        if len(row) < d_col_index_1based:
            continue
        raw_cell = row[d_col_index_1based - 1]
        # ★セル内の複数パスを分解
        for p in split_multi_paths(raw_cell):
            roots.append(Root(root_path=p, norm=p))
    # 重複除去(順序保持)
    seen: Set[str] = set()
    uniq: List[Root] = []
    for rt in roots:
        if rt.norm in seen:
            continue
        seen.add(rt.norm)
        uniq.append(rt)
    # “より深いrootを優先”するため、長い順に並べる
    uniq.sort(key=lambda x: len(x.norm), reverse=True)
    return uniq


def iter_files_under(root_dir: str, stats: Dict[str, int]) -> Iterator[Tuple[str, os.stat_result]]:
    """
    root_dir 配下を再帰走査してファイルをyieldする。
    stats にはエラー回数などを加算する(握り潰さず可視化)。
    """
    stack: List[str] = [root_dir]
    while stack:
        d = stack.pop()
        try:
            with os.scandir(d) as it:
                stats["dirs_scanned"] += 1
                for entry in it:
                    try:
                        if entry.is_symlink():
                            continue
                        if entry.is_dir(follow_symlinks=False):
                            stack.append(entry.path)
                        elif entry.is_file(follow_symlinks=False):
                            st = entry.stat(follow_symlinks=False)
                            yield entry.path, st
                    except FileNotFoundError:
                        stats["fnf_errors"] += 1
                        continue
                    except PermissionError:
                        stats["perm_errors"] += 1
                        continue
                    except OSError:
                        stats["os_errors"] += 1
                        continue
        except FileNotFoundError:
            stats["fnf_errors"] += 1
            continue
        except PermissionError:
            stats["perm_errors"] += 1
            continue
        except OSError:
            stats["os_errors"] += 1
            continue


def stat_snap_nofollow(path: str) -> StatSnap:
    st = os.stat(path, follow_symlinks=False)
    return StatSnap(
        size=int(st.st_size),
        mtime_ns=int(getattr(st, "st_mtime_ns", int(st.st_mtime * 1_000_000_000))),
        inode=int(getattr(st, "st_ino", 0)),
        dev=int(getattr(st, "st_dev", 0)),
    )


def sha256_file(path: str, chunk_size: int = 1024 * 1024) -> str:
    h = hashlib.sha256()
    with open(path, "rb") as f:
        while True:
            b = f.read(chunk_size)
            if not b:
                break
            h.update(b)
    return h.hexdigest()


def stable_sha256(path: str, retries: int = 2, sleep_sec: float = 0.2):
    """
    returns: (sha256_or_none, pre, post, status)
      status:
        ok       : 前後stat一致、sha256確定
        unstable : 途中で変化(書き換え/差し替え/同期中)
        missing  : 観測中に消えた
        error    : その他エラー
    """
    for i in range(retries + 1):
        try:
            pre = stat_snap_nofollow(path)
        except FileNotFoundError:
            return None, None, None, "missing"
        except Exception:
            return None, None, None, "error"
        try:
            h =
sha256_file(path) except FileNotFoundError: return None, pre, None, "missing" except Exception: return None, pre, None, "error" try: post = stat_snap_nofollow(path) except FileNotFoundError: return None, pre, None, "missing" except Exception: return None, pre, None, "error" if pre == post: return h, pre, post, "ok" if i < retries: time.sleep(sleep_sec) continue return None, pre, post, "unstable" def ensure_db(db_path: Path) -> sqlite3.Connection: db_path.parent.mkdir(parents=True, exist_ok=True) con = sqlite3.connect(str(db_path)) con.execute("PRAGMA journal_mode=WAL;") con.execute("PRAGMA synchronous=NORMAL;") con.execute("PRAGMA temp_store=MEMORY;") con.execute(""" CREATE TABLE IF NOT EXISTS scan_runs ( scan_id TEXT PRIMARY KEY, scan_time TEXT NOT NULL, tanada_csv TEXT NOT NULL, roots_text TEXT NOT NULL, created_at TEXT NOT NULL ); """) # 新規DBは拡張スキーマで作成 con.execute(""" CREATE TABLE IF NOT EXISTS scan_files ( abs_path TEXT PRIMARY KEY, root_path TEXT NOT NULL, rel_path TEXT NOT NULL, filename TEXT NOT NULL, size_bytes INTEGER NOT NULL, mtime INTEGER NOT NULL, mtime_ns INTEGER NOT NULL, mtime_iso TEXT NOT NULL, inode INTEGER NOT NULL, dev INTEGER NOT NULL, sha256 TEXT, hash_status TEXT NOT NULL, scan_time TEXT NOT NULL ); """) # 既存DB互換:旧スキーマに列が無い場合は追加(壊さない) def add_col(sql: str) -> None: try: con.execute(sql) except sqlite3.OperationalError: pass add_col("ALTER TABLE scan_files ADD COLUMN mtime_ns INTEGER NOT NULL DEFAULT 0;") add_col("ALTER TABLE scan_files ADD COLUMN inode INTEGER NOT NULL DEFAULT 0;") add_col("ALTER TABLE scan_files ADD COLUMN dev INTEGER NOT NULL DEFAULT 0;") add_col("ALTER TABLE scan_files ADD COLUMN sha256 TEXT;") add_col("ALTER TABLE scan_files ADD COLUMN hash_status TEXT NOT NULL DEFAULT 'skipped';") con.execute("CREATE INDEX IF NOT EXISTS idx_scan_files_root ON scan_files(root_path);") con.execute("CREATE INDEX IF NOT EXISTS idx_scan_files_mtime ON scan_files(mtime);") con.execute("CREATE INDEX IF NOT EXISTS idx_scan_files_hash_status ON scan_files(hash_status);") return con def main() -> int: ap = argparse.ArgumentParser() ap.add_argument( "--tanada-csv", default=str(DEFAULT_TANADA_CSV), help=f"tanadaシートをexportしたCSVのパス(既定: {DEFAULT_TANADA_CSV})", ) ap.add_argument("--d-col", type=int, default=4, help="転出パス列(1-based)。既定=4(D列)") ap.add_argument( "--scan-id", default=None, help="スキャン世代ID(例: 2025Q4, 20251226_0500)。省略時は日時で自動生成", ) ap.add_argument( "--out-db", required=True, help="出力SQLiteパス(例: /Users/xxxxxxxxm1/das/meta/scan_20251226.sqlite)", ) ap.add_argument("--limit-files", type=int, default=0, help="テスト用。0=無制限") # ★sha256(任意・重い) ap.add_argument("--with-sha256", action="store_true", help="sha256 を計算してDBに記録(重い)") ap.add_argument("--sha256-retries", type=int, default=2, help="sha256安定確認のリトライ回数") ap.add_argument("--sha256-sleep", type=float, default=0.2, help="安定確認リトライ時の待ち秒") args = ap.parse_args() tanada_csv = Path(os.path.expanduser(args.tanada_csv)).resolve() out_db = Path(os.path.expanduser(args.out_db)).resolve() if not tanada_csv.exists(): print(f"[ERROR] tanada csv not found: {tanada_csv}", file=sys.stderr) print(f" (expected default location: {DEFAULT_TANADA_CSV})", file=sys.stderr) return 2 scan_time = now_iso_local() scan_id = args.scan_id or datetime.now().strftime("%Y%m%d_%H%M%S") roots = read_roots_from_tanada_csv(tanada_csv, args.d_col) if not roots: print("[ERROR] no roots found in tanada CSV (D column).", file=sys.stderr) return 3 # ルート存在チェック(存在しないものは警告してスキップ) live_roots: List[Root] = [] missing_roots: List[str] = [] for rt in roots: if 
os.path.isdir(rt.norm): live_roots.append(rt) else: missing_roots.append(rt.norm) for p in missing_roots: print(f"[WARN] root not found or not dir: {p}", file=sys.stderr) if not live_roots: print("[ERROR] no existing root directories to scan.", file=sys.stderr) return 4 roots_text = "\n".join([r.norm for r in live_roots]) con = ensure_db(out_db) cur = con.cursor() # 同scan_idの上書きは禁止(世代固定) try: cur.execute( "INSERT INTO scan_runs(scan_id, scan_time, tanada_csv, roots_text, created_at) VALUES (?,?,?,?,?)", (scan_id, scan_time, str(tanada_csv), roots_text, now_iso_local()), ) except sqlite3.IntegrityError: print(f"[ERROR] scan_id already exists in DB: {scan_id}", file=sys.stderr) print(" choose another --scan-id or a new --out-db", file=sys.stderr) return 5 con.commit() inserted = 0 skipped_dup = 0 scanned_roots = 0 seen_paths: Set[str] = set() limit_files = int(args.limit_files or 0) scan_time_fixed = scan_time # この走査世代のscan_timeは固定 stats: Dict[str, int] = { "dirs_scanned": 0, "perm_errors": 0, "fnf_errors": 0, "os_errors": 0, } # hash_status 集計 hash_ok = 0 hash_unstable = 0 hash_missing = 0 hash_error = 0 hash_skipped = 0 for rt in live_roots: scanned_roots += 1 root_dir = rt.norm for abs_path, st in iter_files_under(root_dir, stats): abs_norm = abs_path if abs_norm in seen_paths: skipped_dup += 1 continue seen_paths.add(abs_norm) try: rel_path = os.path.relpath(abs_norm, root_dir) except ValueError: rel_path = abs_norm filename = os.path.basename(abs_norm) size_bytes = int(st.st_size) mtime = int(st.st_mtime) mtime_ns = int(getattr(st, "st_mtime_ns", int(st.st_mtime * 1_000_000_000))) inode = int(getattr(st, "st_ino", 0)) dev = int(getattr(st, "st_dev", 0)) mtime_iso = ts_iso_local(st.st_mtime) sha256 = None hash_status = "skipped" if args.with_sha256: sha256, pre, post, hash_status = stable_sha256( abs_norm, retries=int(args.sha256_retries), sleep_sec=float(args.sha256_sleep), ) # sha256は ok の時だけ確定、それ以外はNULLのまま if hash_status == "ok": hash_ok += 1 elif hash_status == "unstable": hash_unstable += 1 elif hash_status == "missing": hash_missing += 1 elif hash_status == "error": hash_error += 1 else: hash_skipped += 1 cur.execute( "INSERT OR REPLACE INTO scan_files(" "abs_path, root_path, rel_path, filename, " "size_bytes, mtime, mtime_ns, mtime_iso, inode, dev, " "sha256, hash_status, scan_time" ") VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)", ( abs_norm, root_dir, rel_path, filename, size_bytes, mtime, mtime_ns, mtime_iso, inode, dev, sha256, hash_status, scan_time_fixed ), ) inserted += 1 if inserted % 5000 == 0: con.commit() if limit_files and inserted >= limit_files: break con.commit() if limit_files and inserted >= limit_files: break con.commit() con.close() print(f"[OK] scan_id={scan_id}") print(f"[OK] db={out_db}") print(f"[OK] scan_time={scan_time_fixed}") print(f"[OK] roots_total={len(roots)}") print(f"[OK] roots_live={len(live_roots)}") print(f"[OK] roots_missing={len(missing_roots)}") print(f"[OK] roots_scanned={scanned_roots}") print(f"[OK] dirs_scanned={stats['dirs_scanned']}") print(f"[OK] files_inserted={inserted}") print(f"[OK] dup_skipped={skipped_dup}") print(f"[OK] perm_errors={stats['perm_errors']}") print(f"[OK] fnf_errors={stats['fnf_errors']}") print(f"[OK] os_errors={stats['os_errors']}") print(f"[OK] hash_ok={hash_ok}") print(f"[OK] hash_unstable={hash_unstable}") print(f"[OK] hash_missing={hash_missing}") print(f"[OK] hash_error={hash_error}") print(f"[OK] hash_skipped={hash_skipped}") # ★0件は異常((1)未完遂) if inserted == 0: print("[ERROR] scan_files inserted=0 (metadata scope(1) 
failed).", file=sys.stderr) print(" This prevents 0TB summary. Check volume mount / permissions / Dropbox availability.", file=sys.stderr) return 6 return 0 if __name__ == "__main__": raise SystemExit(main()) ``` /Users/xxxxxxxxm1/python_scripts/tanada_analyze.py tanada_analyze.py ``` #!/usr/bin/env python3 # tanada_analyze.py (REVISED: conforms to the latest "DAS棚田処理_メタデータ更新アルゴリズム") # # scan世代DB + tanada.csv を読み、 # 「ALL正本の計画DB(Plan DB)」を1つ生成する(メタデータ更新=シミュレーション)。 # # この版の追加要件: # - items_all に「現ファイルパス(abs_path)」と「新ファイルパス(to_abs_path)」を必ず併記する # - items_all に action(KEEP / MOVE / EXCLUDE)を必ず書く # - 1つの処理idにおいて、転出パスは1つ、転入パスは1つ、転出除外パスは複数可 を強制する # # 新ファイルパスの決定則(重要): # - rel_path は「その段の転出パス(root_path)配下の相対パス(ファイル名含む)」として scan_db に保存されている # - よって移動先の絶対パスは必ず: # to_root_path = stage_root_map[new_stage] # to_abs_path = to_root_path + "/" + rel_path # となる(filename を二重に付けない) # from __future__ import annotations import argparse import csv import os import re import sqlite3 from dataclasses import dataclass from datetime import datetime from pathlib import Path from typing import Dict, List, Optional, Tuple DAS_BASE = Path("/Users/xxxxxxxxm1/das") DEFAULT_TANADA_CSV = DAS_BASE / "config" / "tanada.csv" TB_BYTES = 1024 ** 4 # 1TiB # tanada列名(名寄せは read 側で行う) COL_PID = "処理id" COL_FROM_STAGE = "転出段" COL_FROM_PATHS = "転出パス" COL_EXCLUDE = "転出除外パス" COL_TO_STAGE = "転入段" COL_DST_BASE = "転入パス" COL_LIMIT_TB = "転出パスに含まれる使用量に対する限度_TB" # ---------- util ---------- def now_iso_local() -> str: return datetime.now().astimezone().isoformat(timespec="seconds") def norm_dir_path(p: str) -> str: p = (p or "").strip() while p.endswith("/"): p = p[:-1] return p def split_paths(cell: str) -> List[str]: """ tanadaのD/E列は「カンマ / 改行」混在を許容。 ただし、このスクリプトでは最終的に「転出パス(D列)は1つ」を強制する(後段で検証)。 """ if cell is None: return [] s = str(cell) parts = re.split(r"[,\n]+", s) out: List[str] = [] for part in parts: p = norm_dir_path(part) if p: out.append(p) # 重複除去(順序保持)→深い順 seen = set() uniq: List[str] = [] for p in out: if p in seen: continue seen.add(p) uniq.append(p) uniq.sort(key=len, reverse=True) return uniq def is_under(path: str, prefix: str) -> bool: return path == prefix or path.startswith(prefix + "/") def stage_to_n(s: str) -> int: s = (s or "").strip() if not s.startswith("T"): raise ValueError(s) return int(s[1:]) def _normalize_header(h: str) -> str: """ CSVヘッダの揺れ対策(空白・全角空白・BOM等)。 """ if h is None: return "" s = str(h) s = s.replace("\ufeff", "") # BOM s = s.replace(" ", " ") # 全角空白→半角 s = s.strip() s = re.sub(r"\s+", "", s) # 余計な空白を除去 return s def _build_header_map(headers: List[str]) -> Dict[str, str]: """ 正規化ヘッダ -> 元のヘッダ を作る(先勝ち) """ m: Dict[str, str] = {} for h in headers: nh = _normalize_header(h) if not nh: continue if nh not in m: m[nh] = h return m def _get(row: Dict[str, str], hmap: Dict[str, str], want: str) -> str: """ want(想定列名)から、実CSVの列名揺れを吸収して値を取得する。 """ key_norm = _normalize_header(want) real = hmap.get(key_norm, "") if real and real in row: return (row.get(real) or "").strip() if want in row: return (row.get(want) or "").strip() for k in row.keys(): if _normalize_header(k) == key_norm: return (row.get(k) or "").strip() return "" def parse_limit_tb(s: str) -> float: """ I列の値を堅牢にfloat化。 """ if s is None: return 0.0 t = str(s).strip() if not t: return 0.0 t = t.replace(",", ",").replace(".", ".").replace(",", ".") m = re.search(r"[-+]?\d+(?:\.\d+)?", t) if not m: return 0.0 try: return float(m.group(0)) except Exception: return 0.0 def join_path(root: str, rel: str) -> str: """ root + "/" + rel を安全に結合する(// を作らない、末尾/も潰す) """ 
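    # Illustration (added comment, not part of the original source): expected results of
    # join_path with hypothetical roots, consistent with norm_dir_path() and lstrip("/") below.
    #   join_path("/Volumes/T1/", "photos/a.jpg")  -> "/Volumes/T1/photos/a.jpg"
    #   join_path("/Volumes/T1",  "/photos/a.jpg") -> "/Volumes/T1/photos/a.jpg"
    #   join_path("",             "photos/a.jpg")  -> "photos/a.jpg"
    # This is how analyze builds the MOVE destination:
    #   to_abs_path = join_path(stage_root_map[new_stage], rel_path)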
root = norm_dir_path(root) rel = (rel or "").lstrip("/") if not root: return rel if not rel: return root return f"{root}/{rel}" # ---------- model ---------- @dataclass(frozen=True) class ProcessRow: process_id: str src_stage: str src_paths: List[str] # 転出パス(roots)※最終的に1つを強制 exclude_paths: List[str] # 転出除外パス(prefix)※複数可 dst_stage: str # 転入段(互換のため保持) dst_base: str # 転入パス(互換のため保持)※最終的に1つ(非空)を強制 limit_tb: float # I列(TB) # ---------- read tanada ---------- def read_tanada_csv(path: Path) -> List[ProcessRow]: """ tanada.csv の列名揺れを吸収して読む。 """ with path.open("r", encoding="utf-8", newline="") as f: r = csv.DictReader(f) if not r.fieldnames: raise SystemExit("[ERROR] tanada.csv header is empty") hmap = _build_header_map(list(r.fieldnames)) rows: List[ProcessRow] = [] for row_i, row in enumerate(r, start=2): pid = _get(row, hmap, COL_PID) if not pid: continue src_stage = _get(row, hmap, COL_FROM_STAGE) dst_stage = _get(row, hmap, COL_TO_STAGE) if not src_stage or not dst_stage: raise SystemExit(f"[ERROR] tanada 必須列が空: row={row_i} pid={pid} {src_stage}->{dst_stage}") try: _ = stage_to_n(src_stage) _ = stage_to_n(dst_stage) except Exception: raise SystemExit(f"[ERROR] 段表記が不正: row={row_i} pid={pid} {src_stage}->{dst_stage}") src_paths = split_paths(_get(row, hmap, COL_FROM_PATHS)) exc_paths = split_paths(_get(row, hmap, COL_EXCLUDE)) limit_s = _get(row, hmap, COL_LIMIT_TB) limit_tb = parse_limit_tb(limit_s) dst_base = norm_dir_path(_get(row, hmap, COL_DST_BASE)) rows.append( ProcessRow( process_id=pid, src_stage=src_stage, src_paths=src_paths, exclude_paths=exc_paths, dst_stage=dst_stage, dst_base=dst_base, limit_tb=limit_tb, ) ) return rows # ---------- DB helpers ---------- def _table_columns(con: sqlite3.Connection, table: str) -> List[str]: try: rows = con.execute(f"PRAGMA table_info({table})").fetchall() return [str(r[1]) for r in rows] except Exception: return [] def _add_column_if_missing(con: sqlite3.Connection, table: str, col: str, ddl: str) -> None: cols = set(_table_columns(con, table)) if col in cols: return con.execute(f"ALTER TABLE {table} ADD COLUMN {ddl}") con.commit() def ensure_plan_db(db_path: Path) -> sqlite3.Connection: db_path.parent.mkdir(parents=True, exist_ok=True) con = sqlite3.connect(str(db_path)) con.execute("PRAGMA journal_mode=WAL;") con.execute("PRAGMA synchronous=NORMAL;") con.execute("PRAGMA temp_store=MEMORY;") con.execute(""" CREATE TABLE IF NOT EXISTS process_config ( plan_id TEXT NOT NULL, process_id TEXT NOT NULL, src_stage TEXT NOT NULL, dst_stage TEXT NOT NULL, src_paths TEXT NOT NULL, exclude_paths TEXT NOT NULL, limit_tb REAL NOT NULL DEFAULT 0, PRIMARY KEY (plan_id, process_id) ); """) con.execute("CREATE INDEX IF NOT EXISTS idx_process_config_pid ON process_config(plan_id, process_id);") con.execute(""" CREATE TABLE IF NOT EXISTS plan_runs ( plan_id TEXT PRIMARY KEY, created_time TEXT NOT NULL, scan_db TEXT NOT NULL, tanada_csv TEXT NOT NULL, process_ids TEXT NOT NULL ); """) con.execute(""" CREATE TABLE IF NOT EXISTS items_all ( plan_id TEXT NOT NULL, process_id TEXT NOT NULL, abs_path TEXT NOT NULL, -- 現ファイルパス(絶対) root_path TEXT NOT NULL, -- 現ルート(=転出パス) rel_path TEXT NOT NULL, -- 現ルート配下の相対(ファイル名含む) filename TEXT NOT NULL, size_bytes INTEGER NOT NULL, mtime INTEGER NOT NULL, mtime_iso TEXT NOT NULL, current_stage TEXT NOT NULL, new_stage TEXT NOT NULL, PRIMARY KEY (plan_id, abs_path) ); """) con.execute("CREATE INDEX IF NOT EXISTS idx_items_all_proc ON items_all(plan_id, process_id);") con.execute("CREATE INDEX IF NOT EXISTS idx_items_all_mtime ON 
items_all(plan_id, process_id, mtime);") # 追加列(無ければ追加) _add_column_if_missing(con, "items_all", "exclude_flag", "exclude_flag INTEGER NOT NULL DEFAULT 1") _add_column_if_missing(con, "items_all", "rank", "rank INTEGER NOT NULL DEFAULT 0") _add_column_if_missing(con, "items_all", "accums", "accums INTEGER NOT NULL DEFAULT 0") # ★今回の要件: 新ファイルパス&アクション _add_column_if_missing(con, "items_all", "to_root_path", "to_root_path TEXT NOT NULL DEFAULT ''") _add_column_if_missing(con, "items_all", "to_abs_path", "to_abs_path TEXT NOT NULL DEFAULT ''") _add_column_if_missing(con, "items_all", "action", "action TEXT NOT NULL DEFAULT ''") con.execute(""" CREATE TABLE IF NOT EXISTS excluded_items ( plan_id TEXT NOT NULL, process_id TEXT NOT NULL, abs_path TEXT NOT NULL, root_path TEXT, reason TEXT NOT NULL, matched_rule TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')), PRIMARY KEY (plan_id, process_id, abs_path) ); """) con.execute("CREATE INDEX IF NOT EXISTS idx_excluded_items_proc ON excluded_items(plan_id, process_id);") con.execute(""" CREATE TABLE IF NOT EXISTS excluded_summary ( plan_id TEXT NOT NULL, current_stage TEXT NOT NULL, ex_count INTEGER NOT NULL, ex_accums INTEGER NOT NULL, PRIMARY KEY (plan_id, current_stage) ); """) con.execute(""" CREATE TABLE IF NOT EXISTS stage_change_items ( plan_id TEXT NOT NULL, process_id TEXT NOT NULL, abs_path TEXT NOT NULL, size_bytes INTEGER NOT NULL, from_stage TEXT NOT NULL, to_stage TEXT NOT NULL, PRIMARY KEY (plan_id, process_id, abs_path) ); """) con.execute(""" CREATE TABLE IF NOT EXISTS summary_transition ( plan_id TEXT NOT NULL, process_id TEXT NOT NULL, from_stage TEXT NOT NULL, to_stage TEXT NOT NULL, file_count INTEGER NOT NULL, total_bytes INTEGER NOT NULL, PRIMARY KEY (plan_id, process_id, from_stage, to_stage) ); """) con.execute(""" CREATE TABLE IF NOT EXISTS plan_summary ( plan_id TEXT NOT NULL, process_id TEXT NOT NULL, current_stage TEXT, dst_stage TEXT, limit_bytes INTEGER NOT NULL DEFAULT 0, keep_bytes_total INTEGER NOT NULL DEFAULT 0, effective_limit_bytes INTEGER NOT NULL DEFAULT 0, stage_change_files INTEGER NOT NULL DEFAULT 0, stage_change_bytes INTEGER NOT NULL DEFAULT 0, total_files INTEGER NOT NULL DEFAULT 0, total_bytes INTEGER NOT NULL DEFAULT 0, PRIMARY KEY (plan_id, process_id) ); """) con.execute("CREATE INDEX IF NOT EXISTS idx_plan_summary_pid ON plan_summary(plan_id, process_id);") con.commit() return con # ---------- scan read ---------- def open_scan_db(scan_db: Path) -> sqlite3.Connection: if not scan_db.exists(): raise SystemExit(f"[ERROR] scan db not found: {scan_db}") return sqlite3.connect(str(scan_db)) def fetch_scan_files_for_roots( scan_con: sqlite3.Connection, roots: List[str], ) -> List[Tuple[str, str, str, str, int, int, str]]: if not roots: return [] q_marks = ",".join(["?"] * len(roots)) sql = f""" SELECT abs_path, root_path, rel_path, filename, size_bytes, mtime, mtime_iso FROM scan_files WHERE root_path IN ({q_marks}) """ return scan_con.execute(sql, roots).fetchall() # ---------- core ---------- def match_exclude(abs_path: str, exclude_prefixes: List[str]) -> Optional[str]: for pref in exclude_prefixes: if pref and is_under(abs_path, pref): return pref return None def _sorted_stages(processes: List[ProcessRow]) -> List[str]: stages = sorted({p.src_stage for p in processes}, key=stage_to_n) return stages def enforce_process_constraints(processes_use: List[ProcessRow]) -> None: """ 1つの処理idにおいて、 - 転出パスは1つ - 転入パスは1つ(非空) - 転出除外パスは複数可 を強制する。 """ for pr in processes_use: if len(pr.src_paths) != 1: raise 
SystemExit( f"[ERROR] 制約違反: 転出パスは1つのみ許容です: pid={pr.process_id} src_stage={pr.src_stage} src_paths={pr.src_paths}" ) if not pr.dst_base: raise SystemExit( f"[ERROR] 制約違反: 転入パスは空にできません(1つ必要): pid={pr.process_id} src_stage={pr.src_stage}" ) def build_stage_root_map(processes_use: List[ProcessRow]) -> Dict[str, str]: """ 段位 -> 物理ルート(転出パス) を 1対1 で作る。 このマップが「新段位での to_root_path をどこに置くか」を決める。 """ m: Dict[str, str] = {} for pr in processes_use: root = pr.src_paths[0] if pr.src_stage in m and m[pr.src_stage] != root: raise SystemExit( f"[ERROR] 段位ルートが一意ではありません: stage={pr.src_stage} {m[pr.src_stage]} vs {root}" ) m[pr.src_stage] = root # どの new_stage にも必ず root が必要(MOVE の to_root_path を決めるため) stages = sorted(m.keys(), key=stage_to_n) if not stages: raise SystemExit("[ERROR] stage_root_map is empty") return m def main() -> int: ap = argparse.ArgumentParser() ap.add_argument("--scan-db", required=True, help="scan SQLite(tanada_scan.py の出力)") ap.add_argument("--out-plan-db", required=True, help="出力 plan SQLite") ap.add_argument("--tanada-csv", default=str(DEFAULT_TANADA_CSV), help="tanada.csv(既定: /Users/xxxxxxxxm1/das/config/tanada.csv)") ap.add_argument("--plan-id", default="", help="plan_id(省略時は日時)") ap.add_argument("--process-ids", default="", help="処理idをカンマ指定(デバッグ用)") args = ap.parse_args() scan_db = Path(os.path.expanduser(args.scan_db)).resolve() out_plan_db = Path(os.path.expanduser(args.out_plan_db)).resolve() tanada_csv = Path(os.path.expanduser(args.tanada_csv)).resolve() plan_id = (args.plan_id or "").strip() or datetime.now().strftime("%Y%m%d_%H%M%S") if not tanada_csv.exists(): raise SystemExit(f"[ERROR] tanada csv not found: {tanada_csv}") processes = read_tanada_csv(tanada_csv) if not processes: raise SystemExit("[ERROR] no process rows in tanada.csv") allow_pids: Optional[set[str]] = None if args.process_ids.strip(): allow_pids = {p.strip() for p in args.process_ids.split(",") if p.strip()} processes_use = [p for p in processes if (allow_pids is None or p.process_id in allow_pids)] if not processes_use: raise SystemExit("[ERROR] no matched process rows (process-ids filter)") # ★制約を強制(ここで止める) enforce_process_constraints(processes_use) # ★段位 -> 物理ルート(転出パス) を決定 stage_root_map = build_stage_root_map(processes_use) scan_con = open_scan_db(scan_db) plan_con = ensure_plan_db(out_plan_db) cur = plan_con.cursor() pids_text = ",".join([p.process_id for p in processes_use]) cur.execute( "INSERT OR REPLACE INTO plan_runs(plan_id, created_time, scan_db, tanada_csv, process_ids) VALUES (?,?,?,?,?)", (plan_id, now_iso_local(), str(scan_db), str(tanada_csv), pids_text), ) # 既存の同 plan_id を消して作り直し(安全) cur.execute("DELETE FROM items_all WHERE plan_id=?", (plan_id,)) cur.execute("DELETE FROM excluded_items WHERE plan_id=?", (plan_id,)) cur.execute("DELETE FROM excluded_summary WHERE plan_id=?", (plan_id,)) cur.execute("DELETE FROM stage_change_items WHERE plan_id=?", (plan_id,)) cur.execute("DELETE FROM summary_transition WHERE plan_id=?", (plan_id,)) cur.execute("DELETE FROM plan_summary WHERE plan_id=?", (plan_id,)) cur.execute("DELETE FROM process_config WHERE plan_id=?", (plan_id,)) plan_con.commit() # --- (1)(2)(3): items_all 生成 + current_stage + exclude_flag --- all_rows: List[Tuple] = [] excluded_rows: List[Tuple] = [] # 段位ごとの limit_bytes(L(i)) limit_by_stage: Dict[str, int] = {} for pr in processes_use: limit_by_stage[pr.src_stage] = int(pr.limit_tb * TB_BYTES) for pr in processes_use: # process_config 保存(再現用) cur.execute( """INSERT OR REPLACE INTO process_config( plan_id, process_id, src_stage, 
dst_stage, src_paths, exclude_paths, limit_tb ) VALUES (?,?,?,?,?,?,?)""", ( plan_id, pr.process_id, pr.src_stage, pr.dst_stage, "\n".join(pr.src_paths), "\n".join(pr.exclude_paths), float(pr.limit_tb), ), ) rows = fetch_scan_files_for_roots(scan_con, pr.src_paths) for (abs_path, root_path, rel_path, filename, size_bytes, mtime, mtime_iso) in rows: rule = match_exclude(abs_path, pr.exclude_paths) exclude_flag = 2 if rule else 1 # 初期値(あとで通常データに対して rank/accums/new_stage を埋める) all_rows.append(( plan_id, pr.process_id, str(abs_path), str(root_path), str(rel_path), str(filename), int(size_bytes), int(mtime), str(mtime_iso), pr.src_stage, pr.src_stage, int(exclude_flag), 0, 0, "", "", "" # to_root_path, to_abs_path, action(後で確定) )) if exclude_flag == 2: excluded_rows.append(( plan_id, pr.process_id, str(abs_path), str(root_path), "EXCLUDE_PREFIX", rule or "" )) cur.executemany( "INSERT OR REPLACE INTO items_all(" "plan_id, process_id, abs_path, root_path, rel_path, filename," "size_bytes, mtime, mtime_iso, current_stage, new_stage," "exclude_flag, rank, accums," "to_root_path, to_abs_path, action" ") VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", all_rows ) if excluded_rows: cur.executemany( "INSERT OR REPLACE INTO excluded_items(plan_id, process_id, abs_path, root_path, reason, matched_rule) " "VALUES (?,?,?,?,?,?)", excluded_rows ) plan_con.commit() # --- (4)(5): 除外集計 ex_count/ex_accums(段位=現段位) --- cur.execute(""" INSERT OR REPLACE INTO excluded_summary(plan_id, current_stage, ex_count, ex_accums) SELECT plan_id, current_stage, COUNT(*) AS ex_count, COALESCE(SUM(size_bytes),0) AS ex_accums FROM items_all WHERE plan_id=? AND exclude_flag=2 GROUP BY plan_id, current_stage """, (plan_id,)) plan_con.commit() ex_by_stage: Dict[str, int] = {} for (stg, ex_acc) in cur.execute( "SELECT current_stage, ex_accums FROM excluded_summary WHERE plan_id=?", (plan_id,) ).fetchall(): ex_by_stage[str(stg)] = int(ex_acc) # --- (8): cap(i)=L(i)-ex_accums(i) の負値チェック + LA計算 --- stages = _sorted_stages(processes_use) # T0,T1,T2,... cap_by_stage: Dict[str, int] = {} for stg in stages: L = int(limit_by_stage.get(stg, 0)) ex = int(ex_by_stage.get(stg, 0)) cap = L - ex if cap < 0: raise SystemExit( f"[ERROR] 限度超過: stage={stg} L={L} ex_accums={ex} cap=L-ex={cap} (許容限度を見直してください)" ) cap_by_stage[stg] = cap # LA境界(上限値)を stage順に積み上げる la_upper: List[Tuple[str, int]] = [] run_sum = 0 for stg in stages: run_sum += int(cap_by_stage.get(stg, 0)) la_upper.append((stg, run_sum)) def decide_new_stage_by_accums(accums: int) -> str: lower = 0 for stg, upper in la_upper: if lower <= accums < upper: return stg lower = upper return stages[-1] if stages else "" # --- (6)(7)(8): 通常データ(exclude_flag=1)を mtime DESC で rank/accums/新段位 --- normal_items = cur.execute(""" SELECT abs_path, size_bytes, mtime FROM items_all WHERE plan_id=? AND exclude_flag=1 ORDER BY mtime DESC, abs_path ASC """, (plan_id,)).fetchall() updates: List[Tuple[str, int, int, str]] = [] # (new_stage, rank, accums, abs_path) acc = 0 rank = 0 for (abs_path, size_bytes, _mtime) in normal_items: rank += 1 acc += int(size_bytes) ns = decide_new_stage_by_accums(acc) updates.append((ns, rank, acc, str(abs_path))) cur.executemany( "UPDATE items_all SET new_stage=?, rank=?, accums=? WHERE plan_id=? AND abs_path=?", [(ns, r, a, plan_id, p) for (ns, r, a, p) in updates] ) # --- (9): 除外データは new_stage=current_stage(すでに初期値で一致) --- cur.execute( "UPDATE items_all SET new_stage=current_stage, rank=0, accums=0 WHERE plan_id=? 
AND exclude_flag=2", (plan_id,) ) plan_con.commit() # ------------------------------------------------------------------ # ★新ファイルパス(to_abs_path) と action の確定(ここが今回の肝) # - EXCLUDE: action=EXCLUDE, to_abs_path=abs_path(移動しない) # - KEEP : action=KEEP, to_abs_path=abs_path(移動しない) # - MOVE : action=MOVE, to_abs_path=stage_root_map[new_stage] + "/" + rel_path # ------------------------------------------------------------------ rows_for_path = cur.execute(""" SELECT abs_path, root_path, rel_path, current_stage, new_stage, exclude_flag FROM items_all WHERE plan_id=? """, (plan_id,)).fetchall() path_updates: List[Tuple[str, str, str, str]] = [] # (to_root_path, to_abs_path, action, abs_path) for (abs_path, root_path, rel_path, cur_stage, new_stage, ex_flag) in rows_for_path: abs_path = str(abs_path) root_path = str(root_path) rel_path = str(rel_path) cur_stage = str(cur_stage) new_stage = str(new_stage) ex_flag = int(ex_flag) if ex_flag == 2: action = "EXCLUDE" to_root = root_path to_abs = abs_path else: if new_stage == cur_stage: action = "KEEP" to_root = root_path to_abs = abs_path else: action = "MOVE" if new_stage not in stage_root_map: raise SystemExit(f"[ERROR] new_stage の物理ルートが未定義です: new_stage={new_stage} abs_path={abs_path}") to_root = stage_root_map[new_stage] to_abs = join_path(to_root, rel_path) if not to_abs: raise SystemExit(f"[ERROR] to_abs_path が空です(必ず埋める): abs_path={abs_path}") path_updates.append((to_root, to_abs, action, abs_path)) cur.executemany( "UPDATE items_all SET to_root_path=?, to_abs_path=?, action=? WHERE plan_id=? AND abs_path=?", [(tr, ta, ac, plan_id, ap) for (tr, ta, ac, ap) in path_updates] ) plan_con.commit() # --- stage_change_items / summary_transition --- stage_change = cur.execute(""" SELECT process_id, abs_path, size_bytes, current_stage, new_stage FROM items_all WHERE plan_id=? AND new_stage != current_stage """, (plan_id,)).fetchall() cur.executemany( "INSERT OR REPLACE INTO stage_change_items(plan_id, process_id, abs_path, size_bytes, from_stage, to_stage) " "VALUES (?,?,?,?,?,?)", [(plan_id, pid, p, int(sz), fs, ts) for (pid, p, sz, fs, ts) in stage_change] ) cur.execute(""" INSERT OR REPLACE INTO summary_transition(plan_id, process_id, from_stage, to_stage, file_count, total_bytes) SELECT plan_id, process_id, current_stage AS from_stage, new_stage AS to_stage, COUNT(*) AS file_count, COALESCE(SUM(size_bytes),0) AS total_bytes FROM items_all WHERE plan_id=? GROUP BY plan_id, process_id, current_stage, new_stage """, (plan_id,)) # plan_summary(process別 + __ALL__) total_all_files = int(cur.execute("SELECT COUNT(*) FROM items_all WHERE plan_id=?", (plan_id,)).fetchone()[0]) total_all_bytes = int(cur.execute("SELECT COALESCE(SUM(size_bytes),0) FROM items_all WHERE plan_id=?", (plan_id,)).fetchone()[0]) for pr in processes_use: files = int(cur.execute( "SELECT COUNT(*) FROM items_all WHERE plan_id=? AND process_id=?", (plan_id, pr.process_id) ).fetchone()[0]) total_bytes = int(cur.execute( "SELECT COALESCE(SUM(size_bytes),0) FROM items_all WHERE plan_id=? AND process_id=?", (plan_id, pr.process_id) ).fetchone()[0]) ex_bytes = int(cur.execute( "SELECT COALESCE(SUM(size_bytes),0) FROM items_all WHERE plan_id=? AND process_id=? AND exclude_flag=2", (plan_id, pr.process_id) ).fetchone()[0]) ch_files = int(cur.execute( "SELECT COUNT(*) FROM items_all WHERE plan_id=? AND process_id=? AND new_stage!=current_stage", (plan_id, pr.process_id) ).fetchone()[0]) ch_bytes = int(cur.execute( "SELECT COALESCE(SUM(size_bytes),0) FROM items_all WHERE plan_id=? AND process_id=? 
AND new_stage!=current_stage", (plan_id, pr.process_id) ).fetchone()[0]) limit_bytes = int(pr.limit_tb * TB_BYTES) cap = int(cap_by_stage.get(pr.src_stage, 0)) cur.execute( """INSERT OR REPLACE INTO plan_summary( plan_id, process_id, current_stage, dst_stage, limit_bytes, keep_bytes_total, effective_limit_bytes, stage_change_files, stage_change_bytes, total_files, total_bytes ) VALUES (?,?,?,?,?,?,?,?,?,?,?)""", ( plan_id, pr.process_id, pr.src_stage, pr.dst_stage, int(limit_bytes), int(ex_bytes), int(cap), int(ch_files), int(ch_bytes), int(files), int(total_bytes), ), ) sum_limit_bytes = sum(int(p.limit_tb * TB_BYTES) for p in processes_use) sum_ex_bytes = int(cur.execute( "SELECT COALESCE(SUM(size_bytes),0) FROM items_all WHERE plan_id=? AND exclude_flag=2", (plan_id,) ).fetchone()[0]) sum_ch_files = int(cur.execute( "SELECT COUNT(*) FROM items_all WHERE plan_id=? AND new_stage!=current_stage", (plan_id,) ).fetchone()[0]) sum_ch_bytes = int(cur.execute( "SELECT COALESCE(SUM(size_bytes),0) FROM items_all WHERE plan_id=? AND new_stage!=current_stage", (plan_id,) ).fetchone()[0]) sum_cap = sum(cap_by_stage.values()) cur.execute( """INSERT OR REPLACE INTO plan_summary( plan_id, process_id, current_stage, dst_stage, limit_bytes, keep_bytes_total, effective_limit_bytes, stage_change_files, stage_change_bytes, total_files, total_bytes ) VALUES (?,?,?,?,?,?,?,?,?,?,?)""", ( plan_id, "__ALL__", "", "", int(sum_limit_bytes), int(sum_ex_bytes), int(sum_cap), int(sum_ch_files), int(sum_ch_bytes), int(total_all_files), int(total_all_bytes), ), ) plan_con.commit() scan_con.close() plan_con.close() print(f"[OK] plan_id={plan_id}") print(f"[OK] out_plan_db={out_plan_db}") print(f"[OK] time={now_iso_local()}") return 0 if __name__ == "__main__": raise SystemExit(main()) ``` /Users/xxxxxxxxm1/python_scripts/tanada_pipeline.py tanada_pipeline.py ``` #!/usr/bin/env python3 # tanada_pipeline.py # # 棚田パイプライン実行制御(メタデータ側) # - tanada.csv を唯一の地形定義とする # - scan は1回(tanada 転出パス ALL を棚卸し) # - analyze は1回(ALL正本 plan DB を生成) # - 物理法則(重力): 転出段 < 転入段(逆流NG) ※ここが唯一の検証箇所 # # simulate_only: # True : scan/analyze は実行(=メタデータDB生成)/実体操作はしない # False : scan/analyze は実行(将来 notify/apply まで含める) from __future__ import annotations import argparse import csv import subprocess import sys from datetime import datetime from pathlib import Path from typing import List, Tuple DAS_BASE = Path("/Users/xxxxxxxxm1/das") CONFIG_TANADA_CSV = DAS_BASE / "config" / "tanada.csv" PY_BASE = Path("/Users/xxxxxxxxm1/python_scripts") SCAN_PY = PY_BASE / "tanada_scan.py" ANALYZE_PY = PY_BASE / "tanada_analyze.py" COL_PID = "処理id" COL_FROM = "転出段" COL_TO = "転入段" def stage_to_n(s: str) -> int: s = (s or "").strip() if not s.startswith("T"): raise ValueError(s) return int(s[1:]) def read_pipeline_from_tanada(csv_path: Path) -> List[Tuple[int, int, str]]: items: List[Tuple[int, int, str]] = [] with csv_path.open("r", encoding="utf-8", newline="") as f: r = csv.DictReader(f) for row_i, row in enumerate(r, start=2): pid = (row.get(COL_PID) or "").strip() fr = (row.get(COL_FROM) or "").strip() to = (row.get(COL_TO) or "").strip() if not pid: continue if not fr or not to: raise SystemExit(f"[ERROR] 転出段/転入段が空です: row={row_i} pid={pid}") try: fn = stage_to_n(fr) tn = stage_to_n(to) except Exception: raise SystemExit(f"[ERROR] 段表記が不正です: row={row_i} pid={pid} {fr}->{to}") # ---- 唯一の物理法則 ---- if fn >= tn: raise SystemExit(f"[ERROR] 重力違反: row={row_i} pid={pid} {fr}->{to}") items.append((fn, row_i, pid)) if not items: raise SystemExit("[ERROR] 有効な処理行が tanada.csv に存在しません") 
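    # Note (added for clarity, not in the original source): the single "physics" check above
    # rejects any row whose 転出段 is not strictly below its 転入段. With hypothetical rows:
    #   T0 -> T1 : accepted (fn=0 < tn=1)
    #   T2 -> T1 : rejected as 重力違反 (fn=2 >= tn=1)
    #   T1 -> T1 : rejected as well (equal tiers count as reverse flow)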
items.sort(key=lambda x: (x[0], x[1])) return items def run(cmd: List[str]): print("[RUN]", " ".join(cmd)) subprocess.run(cmd, check=True) def default_run_id() -> str: return datetime.now().strftime("%Y%m%d_%H%M%S") def main() -> int: ap = argparse.ArgumentParser() ap.add_argument("--run-id", default="", help="実行ID(省略時は日時)") ap.add_argument("--simulate-only", action="store_true", help="scan/analyze は実行するが、将来のapplyはしない") ap.add_argument("--skip-scan", action="store_true", help="既存 scan_db を使う(デバッグ用)") ap.add_argument("--scan-db", default="", help="--skip-scan 時に使う scan DB(必須)") ap.add_argument("--process-ids", default="", help="デバッグ用: process_id をカンマ指定(任意)") args = ap.parse_args() if not CONFIG_TANADA_CSV.exists(): raise SystemExit(f"[ERROR] tanada.csv not found: {CONFIG_TANADA_CSV}") run_id = (args.run_id or "").strip() or default_run_id() # 物理法則チェック(表の整合性をここで担保) _pipeline = read_pipeline_from_tanada(CONFIG_TANADA_CSV) print("[INFO] pipeline order:", [pid for (_, _, pid) in _pipeline]) # scan_db / plan_db は集合名が分かる命名に if args.skip_scan: if not args.scan_db.strip(): raise SystemExit("[ERROR] --skip-scan requires --scan-db") scan_db = Path(args.scan_db).expanduser().resolve() if not scan_db.exists(): raise SystemExit(f"[ERROR] scan db not found: {scan_db}") print("[INFO] skip scan, use existing scan_db:", scan_db) else: scan_db = DAS_BASE / "meta" / f"scan_{run_id}__tanada_roots_ALL.sqlite" run(["python3", str(SCAN_PY), "--scan-id", run_id, "--out-db", str(scan_db)]) plan_db = DAS_BASE / "meta" / f"plan_{run_id}__tanada_items_ALL.sqlite" analyze_cmd = [ "python3", str(ANALYZE_PY), "--scan-db", str(scan_db), "--out-plan-db", str(plan_db), "--tanada-csv", str(CONFIG_TANADA_CSV), "--plan-id", run_id, ] if args.process_ids.strip(): analyze_cmd += ["--process-ids", args.process_ids.strip()] run(analyze_cmd) print("[OK] pipeline completed", "(simulate_only)" if args.simulate_only else "") print(f"[OK] scan_db={scan_db}") print(f"[OK] plan_db={plan_db}") return 0 if __name__ == "__main__": raise SystemExit(main()) ``` /Users/xxxxxxxxm1/python_scripts/tanada_notify.py tanada_notify.py ``` #!/usr/bin/env python3 # tanada_notify.py # # Move plan DB(plan_*.sqlite) を読み、 # 1) 詳細レポートをstdoutに出す # 2) 5段階評価つき「要約」を作る(今回合意した書式) # 3) action_result シートへ要約を出す(A6 など) # 4) メール通知(要約 + DBパス)を送る # # 重要: # - 要約は必ず stdout にも出す(print-onlyでも見失わない) # - 段位(T0/T1/...)はDBから動的取得(静的固定しない) # - 「現状/結果」の段位別合計は items_all から集計する(summary_transition が “移動のみ” でも崩れない) # # DB差異吸収: # - summary_transition: (current_stage,new_stage) 版 / (from_stage,to_stage) 版 両対応 # - plan_summary: # - wide形式(process_id列あり)にも # - KV形式(plan_id,k,v の3列)にも対応する # # 注意: # - KV形式 plan_summary では limit/keep が存在しないことがあるため、 # 評価は「暫定」として扱う(設定限度が無ければ厳密評価はできない)。 # # ★今回の修正ポイント(あなたの要望) # - 要約内の「設定限度なし」表現を廃止し、 # 「設定限度は tanada シートに設定した通りとし」と表記する。 # - plan_summary(wide) から limit/KEEP を取得できる場合は評価にも利用する。 # - summary_transition の列差異(from_stage/to_stage)でも WHERE 句が壊れないよう修正。 from __future__ import annotations import argparse import os import re import sqlite3 import subprocess import sys from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Tuple, Optional DEFAULT_MAIL_TO = "bbbbbbbb.yyyyyyyy@gmail.com" MAIL_TIMEOUT_SEC = 180 RISK_LEVELS = ("問題なし", "軽微な注意", "注意", "警戒", "非常に危険") def now_str() -> str: return datetime.now().strftime("%Y-%m-%d %H:%M:%S") def human_bytes(n: int) -> str: units = ["B", "KB", "MB", "GB", "TB", "PB", "EB"] x = float(int(n or 0)) for u in units: if x < 1024: return f"{x:.2f}{u}" x /= 1024 return f"{x:.2f}ZB" def 
to_tb_str(n_bytes: int) -> str: tb = float(int(n_bytes or 0)) / (1024.0 ** 4) return f"{tb:.2f}TB" def safe_int(v: Any) -> int: try: return int(v or 0) except Exception: return 0 def send_mail(to_addr: str, subject: str, body: str) -> None: """ Mail.app で送信(osascript)。 """ script = r''' on run argv set toAddr to item 1 of argv set subj to item 2 of argv set contentText to item 3 of argv with timeout of {TIMEOUT} seconds tell application "Mail" set m to make new outgoing message with properties {{subject:subj, content:contentText, visible:false}} tell m make new to recipient at end of to recipients with properties {{address:toAddr}} send end tell end tell end timeout return true end run '''.replace("{TIMEOUT}", str(MAIL_TIMEOUT_SEC)).strip() subprocess.run( ["/usr/bin/osascript", "-e", script, to_addr, subject, body], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, ) def has_table(conn: sqlite3.Connection, name: str) -> bool: r = conn.execute( "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?", (name,), ).fetchone() return r is not None def _table_columns(conn: sqlite3.Connection, table: str) -> List[str]: try: rows = conn.execute(f"PRAGMA table_info({table})").fetchall() return [str(r[1]) for r in rows] except Exception: return [] def detect_plan_id(conn: sqlite3.Connection) -> str: if not has_table(conn, "plan_runs"): return "" cols = _table_columns(conn, "plan_runs") order_col = "created_time" if "created_time" in cols else "rowid" row = conn.execute(f"SELECT plan_id FROM plan_runs ORDER BY {order_col} DESC LIMIT 1").fetchone() return str(row[0]) if row else "" def _count_table(conn: sqlite3.Connection, table: str, plan_id: str = "") -> int: if not has_table(conn, table): return 0 cols = _table_columns(conn, table) if plan_id and "plan_id" in cols: row = conn.execute(f"SELECT COUNT(*) FROM {table} WHERE plan_id=?", (plan_id,)).fetchone() else: row = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone() return int(row[0] or 0) if row else 0 def _sum_bytes_items_all(conn: sqlite3.Connection, plan_id: str = "") -> int: if not has_table(conn, "items_all"): return 0 cols = _table_columns(conn, "items_all") if plan_id and "plan_id" in cols: row = conn.execute("SELECT SUM(size_bytes) FROM items_all WHERE plan_id=?", (plan_id,)).fetchone() else: row = conn.execute("SELECT SUM(size_bytes) FROM items_all").fetchone() return int(row[0] or 0) if row and row[0] is not None else 0 def get_scope_stats(conn: sqlite3.Connection, plan_id: str) -> Dict[str, int]: st: Dict[str, int] = {} st["items_all_cnt"] = _count_table(conn, "items_all", plan_id=plan_id) st["items_all_bytes"] = _sum_bytes_items_all(conn, plan_id=plan_id) st["keep_items_cnt"] = _count_table(conn, "keep_items", plan_id=plan_id) st["excluded_items_cnt"] = _count_table(conn, "excluded_items", plan_id=plan_id) st["stage_change_items_cnt"] = _count_table(conn, "stage_change_items", plan_id=plan_id) st["summary_transition_cnt"] = _count_table(conn, "summary_transition", plan_id=plan_id) st["plan_summary_cnt"] = _count_table(conn, "plan_summary", plan_id=plan_id) st["process_config_cnt"] = _count_table(conn, "process_config", plan_id=plan_id) return st def format_scope_text(plan_id: str, plan_db: Path, st: Dict[str, int]) -> str: return ( "(1)スコープ確認\n" f"plan_id={plan_id}\n" f"items_all={st.get('items_all_cnt',0):,}件 / {to_tb_str(st.get('items_all_bytes',0))}\n" f"keep_items={st.get('keep_items_cnt',0):,}件\n" f"excluded_items={st.get('excluded_items_cnt',0):,}件\n" 
f"stage_change_items={st.get('stage_change_items_cnt',0):,}件\n" f"summary_transition={st.get('summary_transition_cnt',0):,}行\n" f"plan_summary={st.get('plan_summary_cnt',0):,}行\n" f"process_config={st.get('process_config_cnt',0):,}行\n" f"DB={str(plan_db)}" ) def build_error_message(plan_id: str, plan_db: Path, scope: Dict[str, int], extra: str) -> str: return ( "ERROR: メタデータ計算((1)スコープ)が未完了のため要約を生成しません。\n" f"plan_id={plan_id}\n" f"items_all={scope.get('items_all_cnt',0):,}件 / {to_tb_str(scope.get('items_all_bytes',0))}\n" f"summary_transition={scope.get('summary_transition_cnt',0):,}行\n" f"plan_summary={scope.get('plan_summary_cnt',0):,}行\n" f"DB={str(plan_db)}\n" f"原因候補: {extra}" ) # ---------------- plan_summary(wide / KV) ---------------- def load_plan_summary_kv(conn: sqlite3.Connection, plan_id: str) -> Dict[str, Any]: """ plan_summary が (plan_id,k,v) のKV形式の場合に読む。 返り値: {"total_bytes": ..., "total_files": ..., ...} """ if not has_table(conn, "plan_summary"): return {} cols = _table_columns(conn, "plan_summary") if not ("plan_id" in cols and "k" in cols and "v" in cols): return {} rows = conn.execute( "SELECT k, v FROM plan_summary WHERE plan_id=?", (plan_id,), ).fetchall() out: Dict[str, Any] = {} for k, v in rows: out[str(k)] = v return out def load_plan_summary_wide(conn: sqlite3.Connection, plan_id: str) -> Dict[str, Dict[str, Any]]: """ plan_summary が wide形式(process_id列などを持つ)場合に読む。 返り値: {process_id: rowdict} """ if not has_table(conn, "plan_summary"): return {} cols = _table_columns(conn, "plan_summary") pid_col = "" for cand in ["process_id", "process", "pid", "process_name"]: if cand in cols: pid_col = cand break if not pid_col: return {} if plan_id and "plan_id" in cols: rows = conn.execute("SELECT * FROM plan_summary WHERE plan_id=?", (plan_id,)).fetchall() else: rows = conn.execute("SELECT * FROM plan_summary").fetchall() if not rows: return {} desc = conn.execute("SELECT * FROM plan_summary LIMIT 1").description colnames = [d[0] for d in desc] out: Dict[str, Dict[str, Any]] = {} for row in rows: d = dict(zip(colnames, row)) pid = str(d.get(pid_col) or "UNKNOWN") out[pid] = d return out def extract_limits_keeps_from_ps_wide(ps_wide: Dict[str, Dict[str, Any]]) -> Tuple[Dict[str, int], Dict[str, int]]: """ plan_summary(wide) がある場合: - current_stage ごとに limit_bytes / keep_bytes_total を集約して返す """ limits: Dict[str, int] = {} keeps: Dict[str, int] = {} if not ps_wide: return limits, keeps for pid, row in ps_wide.items(): if str(pid) == "__ALL__": continue cs = str(row.get("current_stage") or "").strip() if not cs: continue lim = safe_int(row.get("limit_bytes")) keep = safe_int(row.get("keep_bytes_total")) if lim > 0: limits[cs] = limits.get(cs, 0) + lim if keep > 0: keeps[cs] = keeps.get(cs, 0) + keep return limits, keeps # ---------------- transitions(変更のみ) ---------------- def load_transition_changes(conn: sqlite3.Connection, plan_id: str = "") -> List[Tuple[str, str, str, int, int]]: """ 変更のみ(cs!=ns)の集計を返す。 優先: summary_transition(列名差異吸収) fallback: items_all を group by return: (process_id, current_stage, new_stage, file_count, total_bytes) """ if has_table(conn, "summary_transition"): cols = _table_columns(conn, "summary_transition") has_current = ("current_stage" in cols and "new_stage" in cols) has_from = ("from_stage" in cols and "to_stage" in cols) if has_current: sel = "process_id, current_stage, new_stage, file_count, total_bytes" where_diff = "(current_stage != new_stage)" elif has_from: sel = "process_id, from_stage as current_stage, to_stage as new_stage, file_count, 
total_bytes" where_diff = "(from_stage != to_stage)" else: sel = "" where_diff = "" if sel: if plan_id and "plan_id" in cols: rows = conn.execute( f"""SELECT {sel} FROM summary_transition WHERE plan_id=? AND {where_diff} ORDER BY process_id, current_stage, new_stage""", (plan_id,), ).fetchall() else: rows = conn.execute( f"""SELECT {sel} FROM summary_transition WHERE {where_diff} ORDER BY process_id, current_stage, new_stage""" ).fetchall() return [(str(r[0]), str(r[1]), str(r[2]), int(r[3] or 0), int(r[4] or 0)) for r in rows] # fallback: items_all から変更のみを集計 if not has_table(conn, "items_all"): return [] cols = _table_columns(conn, "items_all") if not ("process_id" in cols and "current_stage" in cols and "new_stage" in cols and "size_bytes" in cols): return [] if plan_id and "plan_id" in cols: rows = conn.execute( """SELECT process_id, current_stage, new_stage, COUNT(*) as c, SUM(size_bytes) as b FROM items_all WHERE plan_id=? AND current_stage != new_stage GROUP BY process_id, current_stage, new_stage ORDER BY process_id, current_stage, new_stage""", (plan_id,), ).fetchall() else: rows = conn.execute( """SELECT process_id, current_stage, new_stage, COUNT(*) as c, SUM(size_bytes) as b FROM items_all WHERE current_stage != new_stage GROUP BY process_id, current_stage, new_stage ORDER BY process_id, current_stage, new_stage""" ).fetchall() return [(str(r[0]), str(r[1]), str(r[2]), int(r[3] or 0), int(r[4] or 0)) for r in rows] def load_stage_totals_from_items_all(conn: sqlite3.Connection, plan_id: str) -> Tuple[Dict[str, Dict[str, int]], Dict[str, Dict[str, int]]]: """ items_all から “現状(current_stage)” と “結果(new_stage)” を段位別に集計。 """ cur: Dict[str, Dict[str, int]] = {} res: Dict[str, Dict[str, int]] = {} if not has_table(conn, "items_all"): return cur, res cols = _table_columns(conn, "items_all") if not ("current_stage" in cols and "new_stage" in cols and "size_bytes" in cols): return cur, res use_plan = ("plan_id" in cols and bool(plan_id)) where = "WHERE plan_id=?" 
if use_plan else "" params = (plan_id,) if use_plan else () rows = conn.execute( f"""SELECT current_stage, COUNT(*) as c, SUM(size_bytes) as b FROM items_all {where} GROUP BY current_stage""", params, ).fetchall() for st, c, b in rows: cur[str(st)] = {"files": int(c or 0), "bytes": int(b or 0)} rows = conn.execute( f"""SELECT new_stage, COUNT(*) as c, SUM(size_bytes) as b FROM items_all {where} GROUP BY new_stage""", params, ).fetchall() for st, c, b in rows: res[str(st)] = {"files": int(c or 0), "bytes": int(b or 0)} return cur, res # ---------------- レポート/要約 ---------------- def stage_key(s: str) -> Tuple[int, str]: m = re.fullmatch(r"T(\d+)", str(s).strip()) if m: return (int(m.group(1)), str(s)) return (10**9, str(s)) def format_stage_line(stage: str, d: Dict[str, int]) -> str: return f"{stage}: {to_tb_str(d.get('bytes',0))}({d.get('files',0):,}件)" def build_move_list(changes: List[Tuple[str, str, str, int, int]]) -> List[Tuple[str, str, int, int]]: agg: Dict[Tuple[str, str], Dict[str, int]] = {} for _pid, cs, ns, c, b in changes: if cs == ns: continue k = (cs, ns) agg.setdefault(k, {"files": 0, "bytes": 0}) agg[k]["files"] += int(c or 0) agg[k]["bytes"] += int(b or 0) items = [(a, b, d["files"], d["bytes"]) for (a, b), d in agg.items()] items.sort(key=lambda x: x[3], reverse=True) return items def evaluate_5level(total_move_bytes: int, total_move_files: int, limits: Dict[str, int], keeps: Dict[str, int]) -> str: # limits が取れない場合は暫定評価 if not limits: if total_move_files <= 0 or total_move_bytes <= 0: return "問題なし" return "注意" if total_move_files <= 0 or total_move_bytes <= 0: return "問題なし" # 判定対象限度(limit - KEEP)の最小を見て、残余が薄いのに移動量が大きい場合は強く警告 eff_list = [] for st, lim in limits.items(): eff = max(int(lim or 0) - int(keeps.get(st, 0) or 0), 0) if lim > 0: eff_list.append(eff) if eff_list: min_eff = min(eff_list) if min_eff <= 20 * (1024**3) and total_move_bytes >= 300 * (1024**3): return "非常に危険" if min_eff <= 50 * (1024**3) and total_move_bytes >= 200 * (1024**3): return "警戒" base_lim = max(limits.values()) if limits else 0 if base_lim > 0: ratio = total_move_bytes / base_lim if ratio > 1.0: return "警戒" if ratio > 0.3: return "注意" return "軽微な注意" return "注意" def _trim_ja_sentence_safe(text: str, max_len: int) -> str: if len(text) <= max_len: return text cut = text[:max_len] i = cut.rfind("。") if i >= 0 and i >= max_len - 140: return cut[: i + 1] return cut.rstrip()[:-1] + "…" def enforce_len_ja(summary: str, min_len: int = 300, max_len: int = 400) -> str: pad = "次にやることは、除外パスの妥当性確認→再simulate→問題なければapplyです。" if len(summary) < min_len: summary = summary.rstrip() + "\n" + pad summary = _trim_ja_sentence_safe(summary, max_len) if summary and (not summary.endswith("。")) and (not summary.endswith("…")): summary += "。" return summary def build_summary_template( cur_by_stage: Dict[str, Dict[str, int]], res_by_stage: Dict[str, Dict[str, int]], move_list: List[Tuple[str, str, int, int]], limits: Dict[str, int], keeps: Dict[str, int], ) -> str: stages = sorted(set(cur_by_stage.keys()) | set(res_by_stage.keys()), key=stage_key) cur_total_b = sum(d.get("bytes", 0) for d in cur_by_stage.values()) cur_total_f = sum(d.get("files", 0) for d in cur_by_stage.values()) res_total_b = sum(d.get("bytes", 0) for d in res_by_stage.values()) res_total_f = sum(d.get("files", 0) for d in res_by_stage.values()) move_total_b = sum(bb for _a, _b2, _c, bb in move_list) move_total_f = sum(c for _a, _b2, c, _bb in move_list) risk = evaluate_5level(move_total_b, move_total_f, limits, keeps) lines: List[str] = [] 
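    # Quick reference (added comment, not in the original source) for how evaluate_5level()
    # above grades the total move volume when limit/KEEP are available from plan_summary(wide):
    #   no moves                                       -> 問題なし
    #   min(limit-KEEP) <= 20GiB and moves >= 300GiB   -> 非常に危険
    #   min(limit-KEEP) <= 50GiB and moves >= 200GiB   -> 警戒
    #   otherwise moves/max(limit) > 1.0 -> 警戒, > 0.3 -> 注意, else 軽微な注意
    # Without limits the grade is provisional: 問題なし if nothing moves, 注意 otherwise.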
lines.append(f"【評価:{risk}】") lines.append("現在、対象とする全ファイルは") cur_lines = "、".join([format_stage_line(st, cur_by_stage.get(st, {"bytes": 0, "files": 0})) for st in stages]) lines.append(f"{cur_lines}の合計 {to_tb_str(cur_total_b)}({cur_total_f:,}件)存在します。") # ★表記修正:「設定限度は tanada シートに設定した通り」と明言(“設定限度なし”を出さない) lines.append( "シミュレーション条件として、設定限度は tanada シートに設定した通りとし、" "ファイルの変更日時を基準に古いものから下段へ、転出除外パスを除外して新段位を割り付けた結果、" ) res_lines = "、".join([format_stage_line(st, res_by_stage.get(st, {"bytes": 0, "files": 0})) for st in stages]) lines.append(f"{res_lines}の合計 {to_tb_str(res_total_b)}({res_total_f:,}件)となりました。") if move_list: a, b, c, bb = move_list[0] lines.append( f"なお、この過程で {a}→{b} に {to_tb_str(bb)}({c:,}件)の移動が発生します(他段位間の移動はありません)。" ) else: lines.append("なお、この過程で段位間の移動は発生しません。") # 末尾の評価コメント if limits: lines.append("評価としては、設定限度とKEEPの差(判定対象限度)により移動量が増減するため、限度値と除外パスの妥当性を確認してください。") else: lines.append("評価としては、設定限度(limit/KEEP)がDBから取得できないため暫定です。移動量(何件/何TB)が妥当かで判断してください。") summary = "\n".join(lines) return enforce_len_ja(summary, min_len=300, max_len=400) def build_detail_report( plan_id: str, ps_wide: Dict[str, Dict[str, Any]], ps_kv: Dict[str, Any], changes: List[Tuple[str, str, str, int, int]], ) -> str: lines: List[str] = [] lines.append("DAS 棚田管理 / 段位シミュレーション(メタデータ)") lines.append(f"生成日時: {now_str()}") if plan_id: lines.append(f"plan_id: {plan_id}") lines.append("") if ps_kv and not ps_wide: lines.append("■ plan_summary(KV)") for k in sorted(ps_kv.keys()): v = ps_kv[k] if k.endswith("_bytes"): lines.append(f" {k}: {human_bytes(safe_int(v))}") else: lines.append(f" {k}: {v}") lines.append("") elif ps_wide: pids = sorted(ps_wide.keys()) for pid in pids: s = ps_wide.get(pid, {}) cur_total = safe_int(s.get("total_bytes")) limit_bytes = safe_int(s.get("limit_bytes")) keep_bytes = safe_int(s.get("keep_bytes_total")) eff = safe_int(s.get("effective_limit_bytes")) chg_files = safe_int(s.get("stage_change_files")) chg_bytes = safe_int(s.get("stage_change_bytes")) lines.append(f"■ {pid}") lines.append( f" total={human_bytes(cur_total)} / limit(I)={human_bytes(limit_bytes)} / KEEP(E)={human_bytes(keep_bytes)} / 判定限度(I−E)={human_bytes(eff)}" ) lines.append(f" 段位変更: {chg_files} 件 / {human_bytes(chg_bytes)}") lines.append("") else: lines.append("■ plan_summary なし") lines.append("") if changes: by_pid: Dict[str, List[Tuple[str, str, str, int, int]]] = {} for t in changes: by_pid.setdefault(t[0], []).append(t) for pid in sorted(by_pid.keys()): lines.append(f"■ {pid}") lines.append(" 変更(どこ→どこへ 何件 何サイズ):") for _, cs, ns, c, b in by_pid[pid]: lines.append(f" - {cs} → {ns} : {c} 件 / {human_bytes(b)}") lines.append("") else: lines.append("■ 変更") lines.append(" 変更: 0(移動なし)") lines.append("") return "\n".join(lines) # ---------------- Sheets ---------------- def sheets_update_cell_v4(cred_json: Path, spreadsheet_id: str, a1_range: str, value: str) -> None: try: from google.oauth2.service_account import Credentials from googleapiclient.discovery import build except Exception as e: raise RuntimeError("Google API ライブラリが見つかりません(google-auth / google-api-python-client が必要)") from e scopes = ["https://www.googleapis.com/auth/spreadsheets"] creds = Credentials.from_service_account_file(str(cred_json), scopes=scopes) service = build("sheets", "v4", credentials=creds, cache_discovery=False) body = {"values": [[value]]} service.spreadsheets().values().update( spreadsheetId=spreadsheet_id, range=a1_range, valueInputOption="RAW", body=body, ).execute() # ---------------- main ---------------- def main() -> int: ap = 
argparse.ArgumentParser() ap.add_argument("--plan-db", required=True) ap.add_argument("--plan-id", default="", help="省略時はplan_runsから最新を自動選択") ap.add_argument("--print-only", action="store_true", help="メール送信しない(stdout/シートは実行可)") ap.add_argument("--mode", default="simulate_only", choices=["simulate_only", "apply"], help="表示上のモード") ap.add_argument("--write-sheet", action="store_true", help="Google Sheets(action_result)へ要約を書き込む") ap.add_argument("--cred-json", default="", help="Service Account JSON(省略時は env:DAS_CRED_JSON)") ap.add_argument("--spreadsheet-id", default="", help="Spreadsheet ID(省略時は env:DAS_SHEET_ID)") ap.add_argument("--sheet-name", default="action_result", help="出力先シート名") ap.add_argument("--summary-cell", default="A6", help="要約セル(A1)") ap.add_argument("--dbpath-cell", default="A8", help="DBパスセル(A1)") ap.add_argument("--mail-to", default="", help="送信先(省略時は固定 or env:DAS_MAIL_TO)") ap.add_argument("--mail-subject", default="【棚田管理】要約通知(段位シミュレーション)", help="メール件名") args = ap.parse_args() plan_db = Path(args.plan_db).expanduser().resolve() if not plan_db.exists(): print("[ERROR] plan-db not found", file=sys.stderr) return 1 conn = sqlite3.connect(str(plan_db)) try: plan_id = (args.plan_id or "").strip() or detect_plan_id(conn) if not plan_id: plan_id = "UNKNOWN" scope = get_scope_stats(conn, plan_id) print(format_scope_text(plan_id, plan_db, scope)) print("") if scope.get("items_all_cnt", 0) <= 0: err = build_error_message( plan_id, plan_db, scope, extra="items_all が 0 件。tanada_analyze の出力DBを確認してください。" ) print(err, file=sys.stderr) return 6 ps_kv = load_plan_summary_kv(conn, plan_id) ps_wide = load_plan_summary_wide(conn, plan_id) changes = load_transition_changes(conn, plan_id=plan_id) cur_by_stage, res_by_stage = load_stage_totals_from_items_all(conn, plan_id) if not cur_by_stage or not res_by_stage: err = build_error_message( plan_id, plan_db, scope, extra="items_all から段位別合計を取得できません(DBスキーマ不一致)。" ) print(err, file=sys.stderr) return 7 finally: conn.close() detail = build_detail_report(plan_id, ps_wide=ps_wide, ps_kv=ps_kv, changes=changes) print(detail) move_list = build_move_list(changes) # ★ここが重要:wideがあれば limit/KEEP を取得して評価に使う limits: Dict[str, int] = {} keeps: Dict[str, int] = {} if ps_wide: limits, keeps = extract_limits_keeps_from_ps_wide(ps_wide) summary = build_summary_template( cur_by_stage=cur_by_stage, res_by_stage=res_by_stage, move_list=move_list, limits=limits, keeps=keeps, ) print("\n---\n[SUMMARY]\n" + summary) env_sid = os.environ.get("DAS_SHEET_ID", "").strip() env_cred = os.environ.get("DAS_CRED_JSON", "").strip() spreadsheet_id = (args.spreadsheet_id or env_sid).strip() cred_json_s = (args.cred_json or env_cred).strip() do_write_sheet = bool(args.write_sheet or (spreadsheet_id and cred_json_s)) if do_write_sheet: if not spreadsheet_id or not cred_json_s: print("[ERROR] sheet write requires spreadsheet-id and cred-json (or env DAS_SHEET_ID / DAS_CRED_JSON)", file=sys.stderr) return 3 cred_json = Path(cred_json_s).expanduser().resolve() if not cred_json.exists(): print(f"[ERROR] cred-json not found: {cred_json}", file=sys.stderr) return 3 summary_range = f"{args.sheet_name}!{args.summary_cell}" db_range = f"{args.sheet_name}!{args.dbpath_cell}" try: sheets_update_cell_v4(cred_json, spreadsheet_id, summary_range, summary) sheets_update_cell_v4(cred_json, spreadsheet_id, db_range, str(plan_db)) print("[OK] sheet write: summary->" + summary_range + ", dbpath->" + db_range) except Exception as e: print(f"[ERROR] sheet write failed: {e}", file=sys.stderr) return 4 if not 
args.print_only: env_mail_to = os.environ.get("DAS_MAIL_TO", "").strip() mail_to = (args.mail_to or env_mail_to or DEFAULT_MAIL_TO).strip() subject = args.mail_subject body = summary + "\n\n" + str(plan_db) try: send_mail(mail_to, subject, body) print("[OK] mail sent to:", mail_to) except subprocess.CalledProcessError as e: print("[ERROR] Mail.app send failed", file=sys.stderr) if e.stderr: print(e.stderr, file=sys.stderr) return 2 return 0 if __name__ == "__main__": raise SystemExit(main()) ``` /Users/xxxxxxxxm1/das/tanada_button_daemon.py ``` #!/usr/bin/env python3 from __future__ import annotations import os import sys import time import subprocess from pathlib import Path from datetime import datetime, time as dtime SHEET_NAME = "action_result" REQ_CELL = "B2" STATUS_CELL = "B3" POLL_SEC = 8 # ===== 静音時間帯設定 ===== QUIET_START = dtime(0, 55) QUIET_END = dtime(2, 10) QUIET_SLEEP = 60 # 静音中は60秒単位で待機 # ========================= def ts() -> str: return datetime.now().strftime("%Y-%m-%d %H:%M:%S") def in_quiet_hours() -> bool: now = datetime.now().time() return QUIET_START <= now <= QUIET_END def sheets_read(cred_json: Path, sid: str, a1: str) -> str: from google.oauth2.service_account import Credentials from googleapiclient.discovery import build scopes = ["https://www.googleapis.com/auth/spreadsheets"] creds = Credentials.from_service_account_file(str(cred_json), scopes=scopes) service = build("sheets", "v4", credentials=creds, cache_discovery=False) r = service.spreadsheets().values().get(spreadsheetId=sid, range=a1).execute() vals = r.get("values", []) if not vals or not vals[0]: return "" return str(vals[0][0]) def sheets_write(cred_json: Path, sid: str, a1: str, value: str) -> None: from google.oauth2.service_account import Credentials from googleapiclient.discovery import build scopes = ["https://www.googleapis.com/auth/spreadsheets"] creds = Credentials.from_service_account_file(str(cred_json), scopes=scopes) service = build("sheets", "v4", credentials=creds, cache_discovery=False) service.spreadsheets().values().update( spreadsheetId=sid, range=a1, valueInputOption="RAW", body={"values": [[value]]}, ).execute() def run_runner(req_id: str) -> tuple[int, str]: """ ボタン押下(req_id)でのみ runner を起動する。 - MANUAL_RUN=1 を必ず付与(das_runner.sh の手動ガードを通す) - RUN_ID=req_id を付与(plan DB 名がボタンIDと一致する) - PRINT_ONLY=0 を強制(ボタン実行は通知あり) """ env = os.environ.copy() env["MANUAL_RUN"] = "1" env["RUN_ID"] = req_id env["PRINT_ONLY"] = "0" p = subprocess.run( ["bash", str(Path.home() / "das" / "das_runner.sh")], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, env=env, ) return p.returncode, p.stdout def main() -> int: sid = os.environ.get("DAS_SHEET_ID", "").strip() cred = os.environ.get("DAS_CRED_JSON", "").strip() if not sid or not cred: print("[ERROR] env missing: DAS_SHEET_ID / DAS_CRED_JSON", file=sys.stderr, flush=True) return 2 cred_json = Path(cred).expanduser().resolve() if not cred_json.exists(): print(f"[ERROR] cred json not found: {cred_json}", file=sys.stderr, flush=True) return 2 a1_req = f"{SHEET_NAME}!{REQ_CELL}" a1_status = f"{SHEET_NAME}!{STATUS_CELL}" last_req = "" print(f"[INFO] {ts()} tanada_button_daemon started", flush=True) print(f"[INFO] watching: {sid} {a1_req} poll={POLL_SEC}s", flush=True) while True: try: # ===== 静音時間帯ガード ===== if in_quiet_hours(): time.sleep(QUIET_SLEEP) continue # =========================== req = sheets_read(cred_json, sid, a1_req).strip() if req and req != last_req: # ★ここが根治点: # いったん拾ったら、即B2を空に戻す(再起動で再実行しない) last_req = req sheets_write(cred_json, sid, 
a1_status, f"RUNNING: {req}") try: sheets_write(cred_json, sid, a1_req, "") except Exception: # B2クリア失敗でも処理は続行(ただし再発の可能性は残る) pass code, out = run_runner(req) if code == 0: sheets_write(cred_json, sid, a1_status, f"DONE: {req}") else: head = "\n".join(out.splitlines()[:25]) sheets_write(cred_json, sid, a1_status, f"ERROR: {req}\n{head}") time.sleep(POLL_SEC) except Exception as e: try: sheets_write(cred_json, sid, a1_status, f"DAEMON_ERROR: {e}") except Exception: pass time.sleep(POLL_SEC) return 0 if __name__ == "__main__": raise SystemExit(main()) ``` /Users/xxxxxxxxm1/das/das_runner.sh ``` #!/bin/bash # das_runner.sh # # 目的: # - 「ボタン1発」で simulate を回すためのランチャ # - export(tanada sheet) → scan/analyze(ALL plan DB) → notify(メール/表示/シート) # # 必須: # - DAS_SHEET_ID : Spreadsheet ID # - DAS_CRED_JSON : Service Account credential JSON のフルパス # # 任意: # - RUN_ID : 実行ID(省略時は日時) # - PRINT_ONLY=1 : メール送信しない(シート出力は行う) # # 追加: # - MANUAL_RUN=1 : 手動実行フラグ(これが無い場合は即終了=自動実行を遮断) set -euo pipefail # ===== 手動実行限定ガード ===== # 明示的に MANUAL_RUN=1 が指定されていない場合は即終了 if [[ "${MANUAL_RUN:-0}" != "1" ]]; then echo "[ABORT] das_runner.sh is manual-only. Set MANUAL_RUN=1 to run." >&2 exit 0 fi # =============================== : "${DAS_SHEET_ID:?Set DAS_SHEET_ID}" : "${DAS_CRED_JSON:?Set DAS_CRED_JSON}" BASE_DIR="$HOME/das" LOG_DIR="$BASE_DIR/logs" LOG_FILE="$LOG_DIR/das_runner.log" mkdir -p "$LOG_DIR" ts() { date "+%Y-%m-%d %H:%M:%S"; } log() { echo "[$(ts)] $*" | tee -a "$LOG_FILE" >/dev/null; } RUN_ID="${RUN_ID:-$(date "+%Y%m%d_%H%M%S")}" PRINT_ONLY="${PRINT_ONLY:-0}" PY_EXPORT="$HOME/python_scripts/tanada_export.py" PY_PIPE="$HOME/python_scripts/tanada_pipeline.py" PY_NOTIFY="$HOME/python_scripts/tanada_notify.py" # 任意の補助処理(無ければスキップ) KEEP10_SH="/Users/xxxxxxxxm1/scripts/tanada_keep10_plan.sh" TANADA_CSV="$HOME/das/config/tanada.csv" PLAN_DB="$HOME/das/meta/plan_${RUN_ID}__tanada_items_ALL.sqlite" main() { log "START run_id=$RUN_ID" # 1) tanada sheet -> tanada.csv log "EXPORT tanada.csv" python3 "$PY_EXPORT" \ --cred-json "$DAS_CRED_JSON" \ --spreadsheet-id "$DAS_SHEET_ID" \ --sheet-name "tanada" \ --out-csv "$TANADA_CSV" # 2) scan/analyze (ALL) log "PIPELINE simulate (metadata only)" python3 "$PY_PIPE" --run-id "$RUN_ID" --simulate-only # 2.5) 任意: plan DB の世代整理(無ければスキップ) if [[ -x "$KEEP10_SH" ]]; then log "KEEP10 plan DB (optional): $KEEP10_SH" "$KEEP10_SH" || log "[WARN] keep10 returned non-zero (ignored)" else log "[WARN] keep10 script not found or not executable, skip: $KEEP10_SH" fi # 3) notify (sheet + mail) log "NOTIFY (write sheet + mail)" if [[ "$PRINT_ONLY" == "1" ]]; then python3 "$PY_NOTIFY" \ --plan-db "$PLAN_DB" \ --print-only \ --mode simulate_only \ --write-sheet \ --cred-json "$DAS_CRED_JSON" \ --spreadsheet-id "$DAS_SHEET_ID" \ --sheet-name "action_result" \ --summary-cell "A6" \ --dbpath-cell "A8" else python3 "$PY_NOTIFY" \ --plan-db "$PLAN_DB" \ --mode simulate_only \ --write-sheet \ --cred-json "$DAS_CRED_JSON" \ --spreadsheet-id "$DAS_SHEET_ID" \ --sheet-name "action_result" \ --summary-cell "A6" \ --dbpath-cell "A8" fi log "END" } main ```
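das_runner.sh is guarded twice: it aborts unless MANUAL_RUN=1 is set, and it requires DAS_SHEET_ID / DAS_CRED_JSON in the environment. A minimal manual invocation might look like the sketch below; the spreadsheet ID and credential path are placeholders, and PRINT_ONLY=1 suppresses the mail while the action_result sheet is still updated.

```bash
# Mail-free, manual run of the whole export -> pipeline -> notify chain.
# MANUAL_RUN=1 is mandatory: without it das_runner.sh exits immediately.
# The spreadsheet ID and credential path below are placeholders.
export DAS_SHEET_ID="1AbCdEfGh_placeholder"
export DAS_CRED_JSON="$HOME/das/config/service_account.json"   # placeholder path
MANUAL_RUN=1 PRINT_ONLY=1 RUN_ID="$(date +%Y%m%d_%H%M%S)" \
  bash "$HOME/das/das_runner.sh"
tail -n 20 "$HOME/das/logs/das_runner.log"   # expect START / EXPORT / PIPELINE / NOTIFY / END
```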
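Since tanada_notify.py only reads the plan DB, a summary can be regenerated later without re-running the scan. The sketch below is a read-only re-run under two assumptions: the plan DB name follows the pattern das_runner.sh uses (the RUN_ID shown is illustrative), and DAS_SHEET_ID / DAS_CRED_JSON are not exported, so the automatic sheet write stays off. The script exits non-zero (1, 6, or 7) when the DB is missing, empty, or has an unexpected schema.

```bash
# Re-render the summary from an existing plan DB: no mail (--print-only) and,
# as long as DAS_SHEET_ID / DAS_CRED_JSON are not exported, no sheet write either.
# The RUN_ID in the file name is illustrative; substitute one from a real run.
python3 "$HOME/python_scripts/tanada_notify.py" \
  --plan-db "$HOME/das/meta/plan_20250101_120000__tanada_items_ALL.sqlite" \
  --print-only \
  --mode simulate_only
```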
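tanada_button_daemon.py is a plain long-running process; how it is supervised (launchd, a login item, tmux, and so on) is left to the operator. A minimal background sketch, assuming the same two environment variables and writing the log and PID file to paths chosen for this example only:

```bash
# Run the button daemon in the background; it polls action_result!B2 every 8 s,
# writes RUNNING/DONE/ERROR into B3, and sleeps through the 00:55-02:10 quiet window.
# The log and PID file locations are this example's choice, not fixed by the scripts.
export DAS_SHEET_ID="1AbCdEfGh_placeholder"
export DAS_CRED_JSON="$HOME/das/config/service_account.json"   # placeholder path
nohup python3 "$HOME/das/tanada_button_daemon.py" \
  >> "$HOME/das/logs/tanada_button_daemon.log" 2>&1 &
echo $! > "$HOME/das/tanada_button_daemon.pid"
```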