# [メタ情報] # 識別子: マイライブラリに基づく4jsonのM1MacからXserverへの更新_exe # システム名: 未分類 # 技術種別: Misc # 機能名: Misc # 使用言語: Python plist # 状態: 公開用 # [/メタ情報] 要約: 提示されたファイル群は、Google DriveとGoogleスプレッドシートからWordPress用メタデータを生成し、リモートサーバーへ自動同期させるシステムを構築しています。 具体的には、`sync_dropbox_wp_library.py`がGoogle DriveからJSONファイルを、`generate_videoct_json.py`、`generate_videoembed_json.py`、`generate_alt_map_json.py`がGoogleスプレッドシートから動画関連情報(コンテンツ、埋め込みコード、代替ID)をそれぞれ抽出し、ローカルにJSONファイルを生成します。 これらのJSONファイルは、`upload_secure_json.py`によって内容の差分がチェックされ、変更があった場合のみSSH/SCPでリモートサーバーの`_secure/wp_json`ディレクトリにセキュアにアップロードされます。アップロードされたファイルはWordPressが読み取れるよう適切な権限(644)に設定されます。 macOSのLaunchAgent設定ファイル`com.XXXXXX.push-meta.plist`は、この一連のプロセスを起動する`upload_secure_json.py`を30秒ごとに自動実行し、WordPress環境のメタデータを常に最新の状態に保つ役割を担っています。 <任意のフォルダパス>/sync_dropbox_wp_library.py sync_dropbox_wp_library.py ``` #!/usr/bin/env python3 import os, sys, io, json, tempfile from google.oauth2 import service_account from googleapiclient.discovery import build from googleapiclient.http import MediaIoBaseDownload from dotenv import load_dotenv load_dotenv("<任意のフォルダパス>") FILE_ID = os.environ.get("PUSH_META_LIB_JSON_ID") META_DIR = os.environ.get("META_DIR", os.path.expanduser("~/meta_gd_wp_data")) OUT_PATH = os.path.join(META_DIR, "dropbox_wp_library.json") def main(): if not FILE_ID: print("[sync] PUSH_META_LIB_JSON_ID が未設定です。", file=sys.stderr); sys.exit(2) creds_path = os.environ.get("PUSH_META_SA_JSON") or os.environ.get("<固有のランダム文字列>") if not creds_path or not os.path.isfile(creds_path): print("[sync] PUSH_META_SA_JSON (or GOOGLE_APPLICATION_CREDENTIALS) が不正です。", file=sys.stderr); sys.exit(2) os.makedirs(META_DIR, exist_ok=True) creds = service_account.Credentials.from_service_account_file( creds_path, scopes=["https://www.googleapis.com/auth/drive.readonly"] ) svc = build("drive", "v3", credentials=creds, cache_discovery=False) meta = svc.files().get(fileId=FILE_ID, fields="id,name,modifiedTime,md5Checksum").execute() stamp_path = OUT_PATH + ".stamp.json" old = {} if os.path.exists(stamp_path): try: with open(stamp_path) as f: old = json.load(f) except Exception: pass if old.get("md5Checksum") == meta.get("md5Checksum"): print("[sync] 変更なし:", meta["modifiedTime"]); return req = svc.files().get_media(fileId=FILE_ID) buf = io.BytesIO() downloader = MediaIoBaseDownload(buf, req, chunksize=1024*1024) done = False while not done: status, done = downloader.next_chunk() if status: print(f"[sync] downloading {int(status.progress()*100)}%") data = buf.getvalue() try: json.loads(data.decode("utf-8")) except Exception as e: print("[sync] JSON decode error:", e, file=sys.stderr); sys.exit(1) fd, tmp = tempfile.mkstemp(prefix="libjson_", suffix=".json", dir=META_DIR) with os.fdopen(fd, "wb") as f: f.write(data) os.replace(tmp, OUT_PATH) with open(stamp_path, "w") as f: json.dump({"md5Checksum": meta.get("md5Checksum"), "modifiedTime": meta.get("modifiedTime")}, f, ensure_ascii=False) print("[sync] 更新完了:", OUT_PATH, meta.get("modifiedTime")) if __name__ == "__main__": main() ``` <任意のフォルダパス>/generate_videoct_json.py generate_videoct_json.py ``` #!/usr/bin/env python3 # -*- coding: utf-8 -*- import os, json, gspread from google.oauth2.service_account import Credentials from dotenv import load_dotenv load_dotenv("<任意のフォルダパス>") OUTPUT_PATH = "<任意のフォルダパス>/videoct.json" def generate_videoct_json(): creds = Credentials.from_service_account_file(os.getenv("PUSH_META_SA_JSON"), scopes=["https://www.googleapis.com/auth/spreadsheets.readonly"]) gc = gspread.authorize(creds) ws = gc.open_by_key(os.getenv("PUSH_META_SHEET_ID")).worksheet("シート1") all_rows = ws.get_all_records() output = [] for row in all_rows: vid = str(row.get("動画id", "")).strip() # ★ 必要な項目だけを厳選(これ以外の列がシートで更新されても無視されます) if row.get("公開状況") == "公開" and vid: item = { "videoid": vid, "image": str(row.get("初期画像ファイルURL", "")), "video": str(row.get("動画ファイルURL", "")), "video2": str(row.get("低画質動画ファイルURL", "")), "subtitle": str(row.get("字幕ファイルURL_vtt", "")), "subtitle_list": str(row.get("字幕一覧ファイルURL_txt", "")), "explain_line": str(row.get("説明ファイルURL_txt", "")) } output.append(item) # ID順に並び替え output.sort(key=lambda x: x["videoid"]) os.makedirs(os.path.dirname(OUTPUT_PATH), exist_ok=True) # ★ 1. 新しく書き込む予定のJSONテキストをメモリ上に作成 new_json_str = json.dumps(output, ensure_ascii=False, indent=2, sort_keys=True) # ★ 2. 既存のファイルの中身を読み込む old_json_str = "" if os.path.exists(OUTPUT_PATH): with open(OUTPUT_PATH, "r", encoding="utf-8") as f: old_json_str = f.read() # ★ 3. 中身に変化があった時だけ「上書き保存」を実行する if new_json_str != old_json_str: with open(OUTPUT_PATH, "w", encoding="utf-8") as f: f.write(new_json_str) print(f"[videoct] wrote: {OUTPUT_PATH} ({len(output)} items)") else: print(f"[videoct] no changes. skipped.") if __name__ == "__main__": generate_videoct_json() ``` geberate_videoembed_json.py ``` #!/usr/bin/env python3 # -*- coding: utf-8 -*- import os, json, gspread from google.oauth2.service_account import Credentials from dotenv import load_dotenv load_dotenv("<任意のフォルダパス>") OUTPUT_PATH = "<任意のフォルダパス>/videoembed.json" def generate_videoembed_json(): creds = Credentials.from_service_account_file(os.getenv("PUSH_META_SA_JSON"), scopes=["https://www.googleapis.com/auth/spreadsheets.readonly"]) gc = gspread.authorize(creds) ws = gc.open_by_key(os.getenv("PUSH_META_SHEET_ID")).worksheet("シート1") all_data = ws.get_all_values() if not all_data or len(all_data) <= 1: return output = [] header = all_data[0] # 列の位置を特定 def get_idx(name): return header.index(name) if name in header else -1 idx_pub = get_idx("公開状況") idx_vid = get_idx("動画id") idx_emb = get_idx("WordPress埋め込みコード") idx_v2 = get_idx("低画質動画ファイルURL") for row in all_data[1:]: if idx_pub == -1 or row[idx_pub] != "公開": continue vid = str(row[idx_vid]).strip() if idx_vid != -1 else "" emb = str(row[idx_emb]).strip() if idx_emb != -1 else "" if vid and emb: item = {"videoid": vid, "embedCode": emb} if idx_v2 != -1 and row[idx_v2].strip(): item["video2"] = str(row[idx_v2]).strip() output.append(item) # ソートして固定 output.sort(key=lambda x: x["videoid"]) os.makedirs(os.path.dirname(OUTPUT_PATH), exist_ok=True) # ★ 1. 新しく書き込む予定のJSONテキストをメモリ上に作成 new_json_str = json.dumps(output, ensure_ascii=False, indent=2, sort_keys=True) # ★ 2. 既存のファイルの中身を読み込む old_json_str = "" if os.path.exists(OUTPUT_PATH): with open(OUTPUT_PATH, "r", encoding="utf-8") as f: old_json_str = f.read() # ★ 3. 中身に変化があった時だけ「上書き保存」を実行する if new_json_str != old_json_str: with open(OUTPUT_PATH, "w", encoding="utf-8") as f: f.write(new_json_str) print(f"[videoembed] wrote: {OUTPUT_PATH}") else: print(f"[videoembed] no changes. skipped.") if __name__ == "__main__": generate_videoembed_json() ``` upload_secure_json.py ``` #!/usr/bin/env python3 # -*- coding: utf-8 -*- import os, sys, shlex, json, hashlib, random, string import subprocess as sp from pathlib import Path from typing import Optional, Tuple from dotenv import load_dotenv load_dotenv("<任意のフォルダパス>") HOME = Path.home() META_DIR = Path(os.environ.get("META_DIR", str(HOME / "meta_gd_wp_data"))) REMOTE_DIR = "<任意のフォルダパス>/XXXXXX.com/public_html/_secure/wp_json" SSH_HOST = os.environ.get("SSH_HOST", "xsrv") SSH_OPTS = shlex.split(os.environ.get("SSH_OPTS", "").strip()) # ★ すり替え表を追加 TARGETS = [ "videoembed.json", "videoct.json", "alt_video_map.json", "dropbox_wp_library.json", ] # --- 以下、内部処理用関数 --- def _run(cmd: list[str]) -> int: try: res = sp.run(cmd, check=True) return res.returncode except sp.CalledProcessError as e: print(f"[error] コマンド失敗: {e}", file=sys.stderr) return e.returncode def _run_capture(cmd: list[str]) -> Tuple[int, str, str]: try: res = sp.run(cmd, check=True, stdout=sp.PIPE, stderr=sp.PIPE, text=True) return res.returncode, res.stdout.strip(), res.stderr.strip() except sp.CalledProcessError as e: return e.returncode, (e.stdout or "").strip(), (e.stderr or "").strip() def _ssh(cmd_str: str) -> int: cmd = ["ssh", *SSH_OPTS, SSH_HOST, f"LC_ALL=C LANG=C {cmd_str}"] rc, out, err = _run_capture(cmd) return rc def _ssh_out(cmd_str: str) -> Tuple[int, str]: cmd = ["ssh", *SSH_OPTS, SSH_HOST, f"LC_ALL=C LANG=C {cmd_str}"] rc, out, err = _run_capture(cmd) return rc, out def _scp(local: Path, remote_tmp: str) -> int: cmd = ["scp", *SSH_OPTS, str(local), f"{SSH_HOST}:{remote_tmp}"] return _run(cmd) def sha256_local(path: Path) -> str: h = hashlib.sha256() with open(path, "rb") as f: for chunk in iter(lambda: f.read(1024*1024), b""): h.update(chunk) return h.hexdigest() def sha256_remote(base_name: str) -> Optional[str]: remote_path = f"{REMOTE_DIR}/{base_name}" cmd = f"f={shlex.quote(remote_path)}; if [ ! -f \"$f\" ]; then echo '__NOFILE__'; exit 0; fi; sha256sum \"$f\" | awk '{{print $1}}'" rc, out = _ssh_out(cmd) if rc != 0 or out.strip() == "__NOFILE__": return None return out.strip() def upload_one(local_file: Path, base_name: str) -> bool: local_hash = sha256_local(local_file) remote_hash = sha256_remote(base_name) if remote_hash == local_hash: return False tag = "".join(random.choices(string.digits, k=5)) remote_tmp = f"{REMOTE_DIR}/.__tmp_{tag}_{base_name}" remote_final = f"{REMOTE_DIR}/{base_name}" if _scp(local_file, remote_tmp) == 0: # ★ 反映+権限設定 (chmod 600 から 644 へ変更) # これにより、WordPress(PHP)がファイルを読み取れるようになります _ssh(f"mv -f {shlex.quote(remote_tmp)} {shlex.quote(remote_final)} && chmod 644 {shlex.quote(remote_final)}") print(f"[write] {base_name} updated.") return True return False # --- ★ 大動脈を流すための全生成処理 --- def generate_all_jsons(): scripts = [ "sync_dropbox_wp_library.py", "generate_videoct_json.py", "generate_videoembed_json.py", "generate_alt_map_json.py" ] py_bin = "/usr/bin/python3" script_dir = HOME / "python_scripts" for s in scripts: path = script_dir / s if path.exists(): print(f"[gen] Running {s}...") _run([py_bin, str(path)]) def main() -> int: # 1. すべての最新JSONを生成・取得 generate_all_jsons() # 2. 差分をチェックしてアップロード updated = 0 for name in TARGETS: p = META_DIR / name if p.exists() and upload_one(p, name): updated += 1 print(f"[done] Update complete. ({updated} files changed)") return 0 if __name__ == "__main__": sys.exit(main()) ``` <任意のフォルダパス>/generate_alt_map_json.py ``` #!/usr/bin/env python3 # -*- coding: utf-8 -*- import os, json, gspread from google.oauth2.service_account import Credentials from dotenv import load_dotenv load_dotenv("<任意のフォルダパス>") OUTPUT_PATH = "<任意のフォルダパス>/alt_video_map.json" def generate_alt_map_json(): creds = Credentials.from_service_account_file(os.getenv("PUSH_META_SA_JSON"), scopes=["https://www.googleapis.com/auth/spreadsheets.readonly"]) gc = gspread.authorize(creds) ws = gc.open_by_key(os.getenv("PUSH_META_SHEET_ID")).worksheet("動画id対照表") data = ws.get_all_records() output = [] for row in data: vid = str(row.get("動画id", "")).strip() aid = str(row.get("代替動画id", "")).strip() if vid and aid: output.append({"動画id": vid, "代替動画id": aid}) output.sort(key=lambda x: x["動画id"]) os.makedirs(os.path.dirname(OUTPUT_PATH), exist_ok=True) # ★ 1. 新しく書き込む予定のJSONテキストをメモリ上に作成 new_json_str = json.dumps(output, ensure_ascii=False, indent=2, sort_keys=True) # ★ 2. 既存のファイルの中身を読み込む old_json_str = "" if os.path.exists(OUTPUT_PATH): with open(OUTPUT_PATH, "r", encoding="utf-8") as f: old_json_str = f.read() # ★ 3. 中身に変化があった時だけ「上書き保存」を実行する if new_json_str != old_json_str: with open(OUTPUT_PATH, "w", encoding="utf-8") as f: f.write(new_json_str) print(f"[alt_map] wrote: {OUTPUT_PATH}") else: print(f"[alt_map] no changes. skipped.") if __name__ == "__main__": generate_alt_map_json() ``` com.XXXXXX.push-meta.plist <任意のフォルダパス>/com.XXXXXX.push-meta.plist ``` EnvironmentVariables PUSH_META_SA_JSON <任意のフォルダパス>/service.json PUSH_META_LIB_JSON_ID META_DIR <任意のフォルダパス> PATH /usr/local/bin:/opt/homebrew/bin:/usr/bin:/bin:/usr/sbin:/sbin:/Library/Developer/CommandLineTools/usr/bin SSH_OPTS SYNC_GDRIVE true Label com.XXXXXX.push-meta ProgramArguments /bin/zsh -lc /usr/bin/python3 <任意のフォルダパス>/upload_secure_json.py RunAtLoad StandardErrorPath <任意のフォルダパス>/push_meta.err.log StandardOutPath <任意のフォルダパス>/push_meta.out.log StartInterval 30 WorkingDirectory <任意のフォルダパス> ```