# [メタ情報] # 識別子: F1_2更新によりjson生成_python移行_exe # システム名: F1_2(動画関係付け)更新によりjson生成_python移行 # 技術種別: Misc # 機能名: Misc # 使用言語: Python # 状態: 実行用 # [/メタ情報] M1 Mac mini [F1_2 名前関係付け] ──(K列=TRUE or /tmp/force_kick)──▶ kick_flags_embed.py │ │(1. トリガ検知・ロック管理・順次実行制御) ▼ ┌───────────────────────────────┐ │ sync_video_embedcode.py │ │ - F1_2 → F2 embedCode反映 │ │ - 「F2反映完了: changed=...」│ └───────────────────────────────┘ │ ▼ ┌───────────────────────────────┐ │ generate_videoembed_json.py │ │ - F2 → pytest_videoembed.json│ │ - Mac側でファイル生成 │ └───────────────────────────────┘ │ ▼ ┌───────────────────────────────┐ │ upload_to_xserver.py │ │ - scpで .tmp転送 → mv -f確定 │ │ - touch -mでmtime更新 │ │ - ls -lhで検証ログを残す │ └───────────────────────────────┘ kick_flags_embed.py /Users/xxxxxxxxm1/python_scripts/kick_flags_embed.py # -*- coding: utf-8 -*- """ kick_flags_embed.py - F1_2(シート: 名前関係付け)を 20 秒ごとに監視(短縮) - K列(PY更新フラグ)が TRUE の行が1つでもあれば着火 - 実行順: 1) sync_video_embedcode() … F1_2→F2 へ embedCode を反映 2) generate_videoembed_json() … F2 から pytest_videoembed.json を生成 3) upload_to_xserver() … pytest_videoembed.json を Xserver に反映 - 依存: pip install gspread google-auth requests - サービスアカウントを関連スプレッドシートに共有(F1_2:閲覧, F2:編集)しておく """ import warnings warnings.filterwarnings("ignore", category=UserWarning) import os import sys import time import random import logging import fcntl import gspread from google.oauth2.service_account import Credentials from requests.exceptions import RequestException # ========= 設定 ========= SA_JSON_PATH = "/Users/xxxxxxxxm1/keys/service.json" SCOPES = ["https://www.googleapis.com/auth/spreadsheets.readonly"] F12_SPREADSHEET_ID = "" F12_SHEET_NAME = "" FLAG_COL_INDEX_1BASED = 11 # K列 = 11(PY更新フラグ) INTERVAL_SECONDS = int(os.environ.get("INTERVAL_SECONDS", "20")) # ← 60秒 → 20秒に短縮 LOG_PATH = os.path.expanduser("~/Library/Logs/kick_flags_embed.log") LOCK_PATH = "/tmp/kick_flags_embed.lock" # ========= 外部モジュール ========= from sync_video_embedcode import sync_video_embedcode from generate_videoembed_json import generate_videoembed_json from upload_to_xserver import upload_to_xserver # ========= ユーティリティ ========= def _normalize_bool(s: str) -> str: if s is None: return "" return str(s).strip().lower().replace(" ", "") TRUE_TOKENS = {"true", "1", "yes", "on", "y", "t", "✓", "✔"} def fetch_f12_rows(gc=None): for attempt in range(3): try: if gc is None: creds = Credentials.from_service_account_file(SA_JSON_PATH, scopes=SCOPES) gc = gspread.authorize(creds) ws = gc.open_by_key(F12_SPREADSHEET_ID).worksheet(F12_SHEET_NAME) rows = ws.get_all_values() if not rows: raise ValueError("シートが空です。") return rows except Exception as e: logging.warning(f"シート取得失敗(試行 {attempt+1}/3): {e}") time.sleep(5) raise RuntimeError("F1_2 の取得に3回失敗しました。") def _any_flag_true(rows, flag_col_1based: int) -> bool: if not rows: return False flag_ix = flag_col_1based - 1 for r in rows[1:]: if len(r) <= flag_ix: continue if _normalize_bool(r[flag_ix]) in TRUE_TOKENS: return True return False def _acquire_lock(lock_path: str): fd = os.open(lock_path, os.O_RDWR | os.O_CREAT, 0o644) fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) return fd # ========= メイン処理 ========= _logged_header_once = False def run_once(gc): global _logged_header_once rows = fetch_f12_rows(gc) force = os.path.exists("/tmp/force_kick") if not _logged_header_once: logging.info(f"debug: header_cols={len(rows[0]) if rows else 0}") _logged_header_once = True if not _any_flag_true(rows, FLAG_COL_INDEX_1BASED) and not force: logging.info("(フラグなし)") return if force: logging.info("⚡ 強制キック: /tmp/force_kick を検出しました。") try: os.remove("/tmp/force_kick") except Exception: pass logging.info("✅ 処理開始:F1_2→F2反映→JSON生成→Xserver反映") try: changed = sync_video_embedcode() logging.info(f"F2反映完了: changed={changed}") except Exception as e: logging.exception(f"❌ sync_video_embedcode() 失敗: {e}") return try: json_path = generate_videoembed_json() logging.info(f"JSON生成完了: {json_path}") except Exception as e: logging.exception(f"❌ JSON生成失敗: {e}") return try: upload_to_xserver(json_path) logging.info("✅ Xserver反映完了") except Exception as e: logging.exception(f"❌ Xserverアップロード失敗: {e}") # ========= ループ ========= def main_loop(): try: lock_fd = _acquire_lock(LOCK_PATH) except OSError: return creds = Credentials.from_service_account_file(SA_JSON_PATH, scopes=SCOPES) gc = gspread.authorize(creds) try: while True: try: run_once(gc) except RequestException as e: logging.warning(f"通信エラー: {e}") except Exception as e: logging.exception(f"エラー: {e}") jitter = random.uniform(-3, 3) time.sleep(max(5, INTERVAL_SECONDS + jitter)) finally: try: os.close(lock_fd) os.remove(LOCK_PATH) except Exception: pass # ========= 実行部 ========= if __name__ == "__main__": os.makedirs(os.path.dirname(LOG_PATH), exist_ok=True) logging.basicConfig( filename=LOG_PATH, level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s", ) logging.info("=== kick_flags_embed 起動 ===") logging.info(f"python: {sys.executable}") main_loop() sync_video_embedcode.py /Users/xxxxxxxxm1/python_scripts/sync_video_embedcode.py # ~/python_scripts/sync_video_embedcode.py # -*- coding: utf-8 -*- """ F1_2(名前関係付け) の K列(PY更新フラグ) が TRUE の行を拾い、 対応する「動画所在場所」グループの埋め込みコード(F列)を F2(シート1) の S列(WordPress埋め込みコード)へ反映する。 完了後、F1_2 の当該行: - K列(PY更新フラグ)を FALSE に戻す - J列(ステータス)に "PY DONE @ " を書く 戻り値: 変更があれば True, なければ False """ from __future__ import annotations import datetime as _dt import gspread from google.oauth2.service_account import Credentials # === Google Sheets 定義 === SA_JSON_PATH = "/Users/xxxxxxxxm1/keys/service.json" SCOPES = ["https://www.googleapis.com/auth/spreadsheets"] # F1_2 F12_SPREADSHEET_ID = "" F12_SHEET_NAME = "" # 列インデックス(1-based) F12_COL_A_GROUP = 1 # A: 動画所在場所 F12_COL_F_EMBED = 6 # F: 埋め込みコード F12_COL_J_STATUS = 10 # J: ステータス F12_COL_K_PYFLAG = 11 # K: PY更新フラグ # F2 F2_SPREADSHEET_ID = "" F2_SHEET_NAME = "" # 列インデックス(1-based) F2_COL_E_GROUP = 5 # E: 動画所在場所 F2_COL_S_EMBED = 19 # S: WordPress埋め込みコード def _auth(): creds = Credentials.from_service_account_file(SA_JSON_PATH, scopes=SCOPES) return gspread.authorize(creds) def sync_video_embedcode() -> bool: gc = _auth() # --- F1_2 読み込み --- ws_f12 = gc.open_by_key(F12_SPREADSHEET_ID).worksheet(F12_SHEET_NAME) f12_values = ws_f12.get_all_values() if not f12_values: return False # 1行目はヘッダ想定 # K列が TRUE の行だけ抽出(空白/短行はスキップ) targets = [] # (row_idx, group, embed) for i, row in enumerate(f12_values[1:], start=2): # シート上の行番号 = i if len(row) < max(F12_COL_A_GROUP, F12_COL_F_EMBED, F12_COL_K_PYFLAG): continue pyflag = str(row[F12_COL_K_PYFLAG-1]).strip().lower() if pyflag in ("true", "1", "yes", "on"): group = row[F12_COL_A_GROUP-1].strip() embed = row[F12_COL_F_EMBED-1] if group: targets.append((i, group, embed)) if not targets: return False # --- F2 読み込み --- ws_f2 = gc.open_by_key(F2_SPREADSHEET_ID).worksheet(F2_SHEET_NAME) f2_values = ws_f2.get_all_values() # F2 側の「動画所在場所(E列)」→ 行番号のインデックスを構築 group_to_rows_f2 = {} for j, row in enumerate(f2_values[1:], start=2): if len(row) >= F2_COL_E_GROUP: g = row[F2_COL_E_GROUP-1].strip() if g: group_to_rows_f2.setdefault(g, []).append(j) # --- 反映計画を作る --- updates_f2 = [] # (row, col, value) updates_f12 = [] # (row, col, value) : K を FALSE, J にステータス timestamp = _dt.datetime.now().astimezone().isoformat(timespec="seconds") for row_idx_f12, group, embed in targets: rows_in_f2 = group_to_rows_f2.get(group, []) if not rows_in_f2: # F2 に該当グループが無い場合はスキップ(必要ならログ化) # ここでは F1_2 のフラグは下げないでおく方が安全 continue # F2 の該当行すべての S列を更新 for r in rows_in_f2: updates_f2.append((r, F2_COL_S_EMBED, embed)) # F1_2 のフラグを落とし、J列にDONEを書く updates_f12.append((row_idx_f12, F12_COL_K_PYFLAG, "FALSE")) updates_f12.append((row_idx_f12, F12_COL_J_STATUS, f"PY DONE @ {timestamp}")) if not updates_f2 and not updates_f12: return False # --- バッチ更新 --- def _to_a1(r, c): # 1-based → A1 表記 # 例: c=1 -> A, 26 -> Z, 27 -> AA letters = "" while c: c, rem = divmod(c-1, 26) letters = chr(65 + rem) + letters return f"{letters}{r}" # F2 更新 if updates_f2: data = [] for r, c, v in updates_f2: rng = f"{_to_a1(r, c)}:{_to_a1(r, c)}" data.append({"range": rng, "values": [[v]]}) ws_f2.batch_update(data) # F1_2 更新(K=FALSE と J=ステータス) if updates_f12: data = [] for r, c, v in updates_f12: rng = f"{_to_a1(r, c)}:{_to_a1(r, c)}" data.append({"range": rng, "values": [[v]]}) ws_f12.batch_update(data) return True generate_videoembed_json.py /Users/xxxxxxxxm1/python_scripts/generate_videoembed_json.py # /Users/xxxxxxxxm1/python_scripts/generate_videoembed_json.py # -*- coding: utf-8 -*- """ 完全版:GAS版 videoembed.json と同一仕様・同一フォーマットで出力 ----------------------------------------------------- - 元仕様: A列 (0): 公開/非公開 → "公開" の行のみ対象 C列 (2): 動画ID → videoid S列 (18): 埋め込みコード → embedCode W列 (22): 低画質URL → video2 (任意) - その他の列は無視 - embedCode, videoid が空行は除外 - 出力: /Users/xxxxxxxxm1/meta_gd_wp_data/pytest_videoembed.json - 構造: JSON配列(各要素 = {videoid, embedCode, (video2)}) - 整形: インデント2、改行あり、UTF-8 / ensure_ascii=False """ import os import json import gspread from google.oauth2.service_account import Credentials # ======= 基本設定 ======= SHEET_ID = "" SHEET_TAB = "" SERVICE_JSON = "/Users/xxxxxxxxm1/keys/service.json" OUTPUT_PATH = "/Users/xxxxxxxxm1/meta_gd_wp_data/pytest_videoembed.json" # ======= 列インデックス (0-based) ======= COL_A_PUBLISH = 0 # 公開/非公開 COL_C_VIDEOID = 2 # 動画ID COL_S_EMBED = 18 # WordPress埋め込みコード COL_W_VIDEO2 = 22 # 低画質動画URL (任意) # ======= 認証 ======= SCOPES = ["https://www.googleapis.com/auth/spreadsheets.readonly"] creds = Credentials.from_service_account_file(SERVICE_JSON, scopes=SCOPES) gc = gspread.authorize(creds) # ======= 関数群 ======= def _safe_get(row, idx): """行データの中から安全にインデックス参照""" return row[idx] if idx < len(row) else "" def _to_int_or_str(v): """GAS同様、数値化可能ならint、そうでなければ文字列""" s = str(v).strip() if not s: return None try: i = int(float(s)) return i except Exception: return s def _is_valid_embed(embed): """GAS版の空・null・空文字チェック""" if embed is None: return False e = str(embed).strip() if e in ("", "null", '""'): return False return True def generate_videoembed_json(): # === シート読み取り === ws = gc.open_by_key(SHEET_ID).worksheet(SHEET_TAB) all_rows = ws.get_all_values() if not all_rows or len(all_rows) <= 1: print("[generate_videoembed_json] no data") with open(OUTPUT_PATH, "w", encoding="utf-8") as f: json.dump([], f, ensure_ascii=False, indent=2) return OUTPUT_PATH # === データ構築 === output = [] for row in all_rows[1:]: # 0行目はヘッダ # 列取得(存在しない列は空文字で補う) publish = _safe_get(row, COL_A_PUBLISH).strip() videoid_raw = _safe_get(row, COL_C_VIDEOID).strip() embed = _safe_get(row, COL_S_EMBED) video2 = _safe_get(row, COL_W_VIDEO2).strip() # 公開判定 if publish != "公開": continue # embedCode / videoid 必須チェック if not _is_valid_embed(embed) or videoid_raw == "": continue videoid_val = _to_int_or_str(videoid_raw) item = { "videoid": videoid_val, "embedCode": embed.strip(), } if video2: item["video2"] = video2 output.append(item) # === JSON出力 === os.makedirs(os.path.dirname(OUTPUT_PATH), exist_ok=True) tmp_path = OUTPUT_PATH + ".tmp" with open(tmp_path, "w", encoding="utf-8") as f: json.dump(output, f, ensure_ascii=False, indent=2) os.replace(tmp_path, OUTPUT_PATH) print(f"[generate_videoembed_json] wrote: {OUTPUT_PATH} (items={len(output)})") return OUTPUT_PATH # ======= メイン実行 ======= if __name__ == "__main__": generate_videoembed_json() upload_to_xserver.py /Users/xxxxxxxxm1/python_scripts/upload_to_xserver.py # /Users/xxxxxxxxm1/python_scripts/upload_to_xserver.py # -*- coding: utf-8 -*- import os import shlex import logging import subprocess import time from pathlib import Path # ==== あなたの環境 ==== XS_USER = "xxxxxxxx" XS_HOST = "xxxxxxxx.xsrv.jp" XS_SSH_PORT = 10022 SSH_KEY_PATH = os.path.expanduser("~/.ssh/id_xserver") # パスフレーズ無し鍵 DEST_DIR = "/home/xxxxxxxx/xxxxxxxx.com/public_html/wp-content/uploads/gd_wp_data" # ===================== def _ssh_base() -> list[str]: """ssh 共通オプション(鍵明示、known_hosts 自動登録)""" cmd = [ "ssh", "-p", str(XS_SSH_PORT), "-o", "StrictHostKeyChecking=accept-new", "-o", "UserKnownHostsFile=~/.ssh/known_hosts", ] if SSH_KEY_PATH: cmd += ["-i", SSH_KEY_PATH] return cmd def _run_quiet(cmd: list[str]) -> None: """成功時は無音、失敗時は例外""" logging.info("RUN: %s", " ".join(shlex.quote(c) for c in cmd)) subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) def _run_capture(cmd: list[str]) -> str: """標準出力を取得(検証用)""" logging.info("RUN: %s", " ".join(shlex.quote(c) for c in cmd)) out = subprocess.check_output(cmd, stderr=subprocess.DEVNULL) return out.decode("utf-8", errors="replace").strip() def upload_to_xserver(local_json_path: str, remote_name: str | None = None) -> None: """ 常に上書き(内容同一でも転送する)。 1) scpで {name}.tmp にアップロード 2) サーバ側で mv -f tmp → 本番名 + touch -m でmtime更新(原子的反映) 3) 反映後にサーバ側のサイズと時刻を取得し、ログへ出力(検証) """ src = Path(local_json_path) if not src.exists(): raise FileNotFoundError(f"local file not found: {src}") if SSH_KEY_PATH and not Path(SSH_KEY_PATH).exists(): raise FileNotFoundError(f"SSH key not found: {SSH_KEY_PATH}") base = src.name if remote_name is None else remote_name tmp_name = base + ".tmp" final_name = base userhost = f"{XS_USER}@{XS_HOST}" remote_tmp = f"{DEST_DIR}/{tmp_name}" remote_final = f"{DEST_DIR}/{final_name}" # 1) 転送先ディレクトリを必ず作成 _run_quiet(_ssh_base() + [userhost, "mkdir", "-p", DEST_DIR]) # 2) 毎回 scp で tmp に上書き転送(3回までリトライ) scp_cmd = [ "scp", "-P", str(XS_SSH_PORT), "-q", "-o", "StrictHostKeyChecking=accept-new", "-o", "UserKnownHostsFile=~/.ssh/known_hosts", ] if SSH_KEY_PATH: scp_cmd += ["-i", SSH_KEY_PATH] scp_cmd += [str(src), f"{userhost}:{remote_tmp}"] for attempt, delay in enumerate((1, 3, 7), start=1): try: _run_quiet(scp_cmd) break except subprocess.CalledProcessError as e: logging.warning(f"scp failed (attempt {attempt}): {e}") time.sleep(delay) else: raise RuntimeError("scp failed after retries") # 3) 原子的置換 + mtime 更新 ssh_mv_touch = _ssh_base() + [ userhost, "bash", "-lc", f"set -euo pipefail; " f"mv -f {shlex.quote(remote_tmp)} {shlex.quote(remote_final)}; " f"touch -m {shlex.quote(remote_final)}" ] _run_quiet(ssh_mv_touch) # 4) 反映検証(サイズ&時刻) BSD/Linux両対応 verify_cmd = _ssh_base() + [ userhost, "bash", "-lc", ( f"FILE={shlex.quote(remote_final)};" f"if stat --version >/dev/null 2>&1; then " f" stat -c '%n %s %y' \"$FILE\";" f"elif stat -f '%N %z %Sm' -t '%Y-%m-%d %H:%M:%S' \"$FILE\" >/dev/null 2>&1; then " f" stat -f '%N %z %Sm' -t '%Y-%m-%d %H:%M:%S' \"$FILE\";" f"else " f" ls -lhT \"$FILE\";" f"fi" ), ] result = _run_capture(verify_cmd) logging.info(f"✅ upload done: {src} → {XS_HOST}:{remote_final}") logging.info(f"🧪 remote file after upload: {result}") /Users/xxxxxxxxm1/Library/LaunchAgents/com.xxxxxxxx.kickflags.plist Label com.xxxxxxxx.kickflags ProgramArguments /opt/homebrew/bin/python3.11 /Users/xxxxxxxxm1/python_scripts/kick_flags_embed.py RunAtLoad KeepAlive StandardOutPath /Users/xxxxxxxxm1/Library/Logs/kick_flags_embed.log StandardErrorPath /Users/xxxxxxxxm1/Library/Logs/kick_flags_embed.log EnvironmentVariables INTERVAL_SECONDS 20