# [メタ情報] # 識別子: Vimeo動画処理パイプライン_py_exe # システム名: 動画VimeoWorker_gasからpython移行 # 技術種別: Misc # 機能名: Misc # 使用言語: Python plist # 状態: 公開用 # [/メタ情報] 要約: 提供されたテキストは、`vimeo_worker.py`というPythonスクリプトと、その自動実行を設定する`com.XXXXXX.vimeo-worker.plist`ファイルに関するものです。 `vimeo_worker.py`は、Vimeo動画処理パイプライン(動画のアップロード/置換、サムネイル設定、字幕アップロード、埋め込みJSON生成)を単一ファイルで実行するPythonスクリプトです。Google Sheetsをタスク管理に利用し、Vimeo APIと連携して各ステップを処理します。一度に最大1件のタスクを短時間で処理し、`launchd`のような短周期トリガで繰り返し呼び出されることを想定しています。排他制御により多重起動を防ぎ、生成された埋め込み用JSONファイルを本番環境のデータと比較し、差分がない場合または合格した場合のみ本番サーバへPOSTしてデータの整合性を保ちます。 `com.XXXXXX.vimeo-worker.plist`は、macOSの`launchd`エージェント設定ファイルで、上記の`vimeo_worker.py`スクリプトを自動的に実行するためのものです。この設定により、スクリプトは1分間隔で起動され、`--max-items 3 --poll-secs 20`の引数で実行されます。VimeoトークンやGoogleサービスアカウントのJSONパスなどの環境変数を指定し、スクリプトが予期せず終了した場合でも自動的に再起動するよう設定されています。作業ディレクトリやエラーログの出力先も定義されており、安定したバックグラウンド処理を実現します。 <任意のフォルダパス>/vimeo_worker.py ``` #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Python VimeoWorker: runPipelinePoll(動画→サムネ→字幕→embed JSON)互換の単一ファイル実装 - 1回の実行で最大1件のみ処理(短時間・再入可能) - launchd などの短周期トリガで繰り返し呼び出し - pytest_videoembed.json を生成し、本番と軽比較→合格時のみPOST 準備: 1) pip install requests google-api-python-client google-auth google-auth-oauthlib 2) Google API 認証(OAuthクライアント or サービスアカウント)。 3) 下部 CONFIG を編集。 実行例: python3 vimeo_worker.py --max-items 1 --poll-secs 20 """ from __future__ import annotations import os, re, sys, json, time, argparse, fcntl, pathlib, logging from typing import Dict, Any, List, Optional, Tuple import requests from google.oauth2.service_account import Credentials from googleapiclient.discovery import build from dotenv import load_dotenv # ============================= # CONFIG # ============================= load_dotenv("<任意のフォルダパス>") CONFIG: Dict[str, Any] = { # --- Google Sheets --- "GOOGLE_SERVICE_ACCOUNT_JSON": os.environ.get("GOOGLE_SERVICE_ACCOUNT_JSON", "service_account.json"), "PKG_SHEET_ID": os.environ.get("VIMEO_PKG_SHEET_ID"), "PKG_SHEET_NAME": "シート1", # 見出し(列名) "COL": { "JOB": "更新状況", # B: pending→...→step3_finished "VID": "動画id", # C: Vimeo videoId "TITLE": "題名", # D "LOC": "動画所在場所", # E (vimeo/dynamic/youtube) "THUMB_URL": "初期画像ファイルURL", # F(rd.php可) "VIDEO_URL": "動画ファイルURL", # H(rd.php = WP再生URL) "SUB_URL": "字幕ファイルURL_vtt", # J(rd.php可) "NOTE": "補足説明", # Q(進捗・ログ) "PLAY_URL": "再生URL", # R(vm5?movid=...) "CUSTOM": "WordPress埋め込みコード", # S "REPL": "差し替えステータス", # AA "PUB": "publishable", # AB }, # --- Vimeo --- "VIMEO_TOKEN": os.environ.get("VIMEO_TOKEN", ""), "EMBED_DOMAIN": "XXXXXX.com", # --- JSON 出力/比較 --- "PYTEST_OUTDIR": str(pathlib.Path.home()/"pytest_out"), "PROD_EMBED_GET_URL": "https://XXXXXX.com/wp-content/uploads/gd_wp_data/videoembed.json", # 本番の取得先 "EMBED_JSON_POST_URL": os.environ.get("EMBED_JSON_ENDPOINT", ""), # 本番のPOST先(Xserver php) # --- 動作 --- "LOCK_PATH": "/tmp/XXXXXX_vimeo_worker.lock", "LOG_PATH": str(pathlib.Path.home()/"Library/Logs/XXXXXX_vimeo-worker.out"), } # 明示上書き & 環境変数での上書きを許可 CONFIG["PROD_EMBED_GET_URL"] = os.environ.get("PROD_EMBED_GET_URL", CONFIG["PROD_EMBED_GET_URL"]) # ============================= # LOGGING # ============================= pathlib.Path(CONFIG["LOG_PATH"]).parent.mkdir(parents=True, exist_ok=True) logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", handlers=[logging.FileHandler(CONFIG["LOG_PATH"]), logging.StreamHandler(sys.stdout)] ) log = logging.getLogger("worker") # ============================= # Google Sheets I/F # ============================= SCOPES = ["https://www.googleapis.com/auth/spreadsheets"] def sheets_service(): cred_path = CONFIG["GOOGLE_SERVICE_ACCOUNT_JSON"] if not pathlib.Path(cred_path).exists(): raise FileNotFoundError(f"Service account file not found: {cred_path}") creds = Credentials.from_service_account_file(cred_path, scopes=SCOPES) return build("sheets", "v4", credentials=creds, cache_discovery=False) def read_sheet_all(sheet_id: str, sheet_name: str) -> List[List[Any]]: svc = sheets_service() rng = f"{sheet_name}!A1:ZZ" resp = svc.spreadsheets().values().get(spreadsheetId=sheet_id, range=rng).execute() values = resp.get("values", []) return values def _a1_col(col_1based: int) -> str: """1-based列番号をA1書式の列文字(A, B, ... AA)へ""" n = col_1based s = "" while n > 0: n, rem = divmod(n-1, 26) s = chr(65 + rem) + s return s def write_cell(sheet_id: str, sheet_name: str, row_1based: int, col_0based: int, value: Any): """単一セル更新。col_0based は0始まり(ヘッダ配列のindex想定)""" svc = sheets_service() col_letter = _a1_col(col_0based + 1) rng = f"{sheet_name}!{col_letter}{row_1based}" svc.spreadsheets().values().update( spreadsheetId=sheet_id, range=rng, valueInputOption="RAW", body={"values": [[value]]} ).execute() # ============================= # NOTE(補足説明)互換ユーティリティ # ============================= META_RE = re.compile(r"\[\[(.+?)::([^\]]+)\]\]") def now_ts() -> str: import datetime JST = datetime.timezone(datetime.timedelta(hours=9)) return datetime.datetime.now(JST).strftime("%Y/%m/%d %H:%M:%S") def esc_reg(s: str) -> str: return re.sub(r"[.*+?^${}()|[\\]\\]", lambda m: "\\" + m.group(0), s) def q_set_meta(text: str, key: str, val: str) -> str: re_key = re.compile(r"\[\[" + esc_reg(key) + r"::[^\]]*\]\]") t = re_key.sub("", text or "").strip() return (t + (" " if t else "") + f"[[{key}::{val}]]").strip() def q_get_meta(text: str, key: str) -> str: m = re.search(r"\[\[" + esc_reg(key) + r"::([^\]]+)\]\]", text or "") return m.group(1) if m else "" def q_clear_meta_keys(text: str, keys: List[str]) -> str: t = text or "" for k in keys: t = re.sub(r"\[\[" + esc_reg(k) + r"::[^\]]*\]\]", "", t).strip() return t def progress_label(status: str) -> str: if status == "reading": return "読み込み中" if status == "error": return "エラー" if status == "done": return "完了" return "-" def render_progress_head(note: str) -> str: def g(k): return q_get_meta(note, k) s1 = progress_label(g("s1_status")); t1 = g("s1_time") or "-" s2 = progress_label(g("s2_status")); t2 = g("s2_time") or "-" s3 = progress_label(g("s3_status")); t3 = g("s3_time") or "-" return f"Step1: {s1} ({t1}) Step2: {s2} ({t2}) Step3: {s3} ({t3})" def note_set_step_status(head: str, step_no: int, status: str) -> str: ts = now_ts() head = q_set_meta(head, f"s{step_no}_status", status) head = q_set_meta(head, f"s{step_no}_time", ts) return render_progress_head(head) def note_append(head: str, history: str, msg: str) -> Tuple[str, str]: ts = now_ts() add = f"[{ts}] {msg}" new_hist = (add + ("\n" + history if history else "")) return head, new_hist # ============================= # URL 解決(rd.php / Dropbox) # ============================= DROPBOX_HOST = "dropbox.com" def to_dropbox_direct(u: str) -> str: if not u or DROPBOX_HOST not in u: return u s = u.replace("://www.dropbox.com", "://dl.dropboxusercontent.com") s = re.sub(r"[?&](dl|raw)=[01]", "", s) return s + ("&" if "?" in s else "?") + "dl=1" def dereference_once(url: str) -> str: try: r = requests.get(url, allow_redirects=False, timeout=8) if 300 <= r.status_code < 400: return r.headers.get("Location", "") except Exception: pass return "" def resolve_to_raw(url: str) -> str: if not url: return "" s = url.strip() if re.search(r"\.(mp4|mov|m4v|webm)(\?|$)", s, re.I): return to_dropbox_direct(s) if DROPBOX_HOST in s else s if "/rd.php?" in s or "/vm5?" in s: loc = dereference_once(s) if DROPBOX_HOST in loc: loc = to_dropbox_direct(loc) return loc or s if f"{DROPBOX_HOST}/" in s: return to_dropbox_direct(s) return s def parse_vid_from_playurl(play: str) -> str: m = re.search(r"[?&]movid=(\d{6,})", play or "") return m.group(1) if m else "" # ============================= # Vimeo API # ============================= VIMEO_API = "https://api.vimeo.com" class Vimeo: def __init__(self, token: str): if not token: raise RuntimeError("VIMEO_TOKEN not set.") self.h = { "Authorization": f"bearer {token}", "Accept": "application/vnd.vimeo.*+json;version=3.4" } def upload_pull(self, link: str, name: str, desc: str = "") -> str: p = { "file_name": _safe_filename(link, "upload.mp4"), "upload": {"approach": "pull", "link": link}, "name": name, "description": desc, "privacy": {"view": "anybody", "embed": "whitelist"} } r = requests.post(f"{VIMEO_API}/me/videos", headers={**self.h, "Content-Type":"application/json"}, json=p, timeout=30) _raise_with_body(r) return (r.json().get("uri", "").replace("/videos/", "")) def replace_pull(self, video_id: str, link: str): p = {"file_name": _safe_filename(link, f"{video_id}.mp4"), "upload": {"approach":"pull", "link":link}} r = requests.post(f"{VIMEO_API}/videos/{video_id}/versions", headers={**self.h, "Content-Type":"application/json"}, json=p, timeout=30) _raise_with_body(r) def is_playable(self, video_id: str) -> bool: r = requests.get(f"{VIMEO_API}/videos/{video_id}?fields=is_playable", headers=self.h, timeout=10) _raise_with_body(r) return bool(r.json().get("is_playable") is True) def ensure_embed_domain(self, video_id: str, domain: str = "XXXXXX.com", attempts: int = 5, wait: float = 3.0): for i in range(attempts): try: r1 = requests.patch(f"{VIMEO_API}/videos/{video_id}", headers={**self.h, "Content-Type":"application/json"}, json={"privacy": {"embed":"whitelist", "view":"anybody"}}, timeout=15) _raise_with_body(r1) r2 = requests.put(f"{VIMEO_API}/videos/{video_id}/privacy/domains/{domain}", headers=self.h, timeout=10) if r2.status_code < 300: return except Exception as e: if i == attempts - 1: log.warning("ensure_embed_domain exhausted: %s", e) return time.sleep(wait) # Thumbnail def create_picture(self, video_id: str) -> Dict[str, Any]: r = requests.post(f"{VIMEO_API}/videos/{video_id}/pictures", headers={**self.h, "Content-Type":"application/json"}, json={"active": False, "type": "custom"}, timeout=15) _raise_with_body(r) return r.json() def activate_picture(self, uri: str): r = requests.patch(f"{VIMEO_API}{uri}", headers={**self.h, "Content-Type":"application/json"}, json={"active": True}, timeout=10) _raise_with_body(r) # Subtitles def list_texttracks(self, video_id: str) -> List[Dict[str, Any]]: r = requests.get(f"{VIMEO_API}/videos/{video_id}/texttracks", headers=self.h, timeout=15) _raise_with_body(r); j = r.json() return j.get("data", j if isinstance(j, list) else []) def delete_ja_subs(self, video_id: str): for t in self.list_texttracks(video_id): lang = (t.get("language") or "").lower() name = (t.get("name") or "") if t.get("type") == "subtitles" and (lang == "ja" or "日本語" in name or "japanese" in name) and t.get("uri"): requests.delete(f"{VIMEO_API}{t['uri']}", headers=self.h, timeout=10) def create_sub_container(self, video_id: str) -> str: r = requests.post(f"{VIMEO_API}/videos/{video_id}/texttracks", headers={**self.h, "Content-Type":"application/json"}, json={"type":"subtitles","language":"ja","active":True,"name":"Japanese"}, timeout=15) _raise_with_body(r); j = r.json() up = j.get("upload_link") or j.get("link") or j.get("upload_link_secure") if not up: raise RuntimeError("upload_link not found") return up def put_subtitle(self, upload_link: str, vtt_text: str, video_id: str): r = requests.put(upload_link, data=vtt_text.encode("utf-8"), headers={"Content-Type":"text/vtt"}, timeout=30) if r.status_code >= 300: r = requests.put(upload_link, data=vtt_text.encode("utf-8"), headers={"Content-Type":"text/plain"}, timeout=30) _raise_with_body(r) # helpers SAFE_NAME_RE = re.compile(r"[^A-Za-z0-9_.-]") def _safe_filename(url: str, fallback: str) -> str: base = (url or "").split("?")[0].split("/")[-1] or fallback if not re.search(r"\.[A-Za-z0-9]{2,5}$", base): base += ".bin" name = SAFE_NAME_RE.sub("_", base) if len(name) > 64: ext = ("." + name.split(".")[-1]) if "." in name else "" name = name[:64-len(ext)] + ext return name def _raise_with_body(r: requests.Response): if r.status_code >= 300: body = (r.text or "")[:800] raise RuntimeError(f"HTTP {r.status_code}: {body}") # ============================= # VTT sanitize # ============================= TS_RE = r"(?:\d{2}:)?\d{2}:\d{2}\.\d{3}" CUE_RE = re.compile(rf"^\s*(?:.*\S.*\n)?\s*{TS_RE}\s+-->\s+{TS_RE}(?:\s+.*)?$", re.M) def sanitize_vtt(raw: str) -> str: vtt = (raw or "").lstrip("\ufeff").replace("\r\n", "\n").replace("\r", "\n").lstrip() first = vtt.split("\n")[0] if vtt else "" if not re.match(r"^WEBVTT(?:[ \t].*)?$", first): if "-->" in vtt: vtt = "WEBVTT\n\n" + vtt else: raise ValueError("VTT形式ではない可能性(WEBVTT も --> も無し)") else: vtt = re.sub(r"^(WEBVTT[^\n]*\n)(?!\n)", r"\1\n", vtt, count=1, flags=re.M) if not CUE_RE.search(vtt): raise ValueError("有効なタイムスタンプ行が見つかりません") if not vtt.endswith("\n"): vtt += "\n" return vtt # ============================= # 埋め込み JSON: pytest生成・比較・POST # ============================= def build_pytest_embed_json(rows: List[Dict[str, Any]], outdir: str) -> str: data = [] for r in rows: pub = str(r.get("PUB", "")).upper() vid = str(r.get("VID", "")).strip() ply = str(r.get("PLAY_URL", "")).strip() if not vid or not ply: continue if pub == "N": continue data.append({ "videoid": vid, "title": r.get("TITLE", ""), "playUrl": ply, "embedCode": r.get("CUSTOM", "") }) p = pathlib.Path(outdir); p.mkdir(parents=True, exist_ok=True) dst = p/"pytest_videoembed.json" dst.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8") return str(dst) def compare_sets(py_path: str, prod_url: str) -> Tuple[bool, Dict[str, Any]]: py = json.loads(pathlib.Path(py_path).read_text(encoding="utf-8")) try: prod = requests.get(prod_url, timeout=15).json() except Exception: prod = [] s_py = {x.get("videoid") for x in py} s_pr = {x.get("videoid") for x in prod} if isinstance(prod, list) else set() added = sorted(s_py - s_pr) removed = sorted(s_pr - s_py) ok = (s_py == s_pr) return ok, {"added": added, "removed": removed, "py_count": len(s_py), "pr_count": len(s_pr)} def post_embed_json(py_path: str, post_url: str): body = pathlib.Path(py_path).read_text(encoding="utf-8") r = requests.post(post_url, data=body.encode("utf-8"), headers={"Content-Type":"application/json"}, timeout=30) _raise_with_body(r) # ============================= # 行選定・列アクセス # ============================= def header_index(header: List[Any], name: str) -> int: want = (name or "").strip() for i, v in enumerate(header): if (v or "").strip() == want: return i return -1 # ============================= # メイン処理(1回で最大1行) # ============================= def process_once(max_items: int, poll_secs: int) -> bool: # ロック取得 fd = os.open(CONFIG["LOCK_PATH"], os.O_CREAT | os.O_RDWR, 0o644) try: fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) except BlockingIOError: log.info("another instance is running; exit") return True try: values = read_sheet_all(CONFIG["PKG_SHEET_ID"], CONFIG["PKG_SHEET_NAME"]) if not values or len(values) < 2: log.info("sheet empty") return True hd = values[0] col = CONFIG["COL"] iJOB = header_index(hd, col["JOB"]) iVID = header_index(hd, col["VID"]) iTTL = header_index(hd, col["TITLE"]) iLOC = header_index(hd, col["LOC"]) iF = header_index(hd, col["THUMB_URL"]) iH = header_index(hd, col["VIDEO_URL"]) iJ = header_index(hd, col["SUB_URL"]) iQ = header_index(hd, col["NOTE"]) iR = header_index(hd, col["PLAY_URL"]) iS = header_index(hd, col["CUSTOM"]) iPUB = header_index(hd, col["PUB"]) needed = [iJOB,iVID,iTTL,iLOC,iF,iH,iJ,iQ,iR,iS,iPUB] if any(x < 0 for x in needed): raise RuntimeError("header not found; please confirm column names") # 対象行選定 target = -1; stage = "" for r in range(1, len(values)): row = values[r] job = str(row[iJOB] or "").lower() loc = str(row[iLOC] or "").lower() if loc != "vimeo": continue if job in ("pending", "step1_pending", "uploading_step1", "encoding_step1"): target = r; stage = "step1"; break if job in ("step1_finished",): target = r; stage = "step2"; break if job in ("step2_finished", "step3_creating", "step3_putting"): target = r; stage = "step3"; break # 仕事がない場合でも pytest を更新 if target < 0: rows = _rows_for_embed(values, hd, iVID, iTTL, iR, iS, iPUB) py_path = build_pytest_embed_json(rows, CONFIG["PYTEST_OUTDIR"]) ok, diff = compare_sets(py_path, CONFIG["PROD_EMBED_GET_URL"]) log.info("no target; pytest=%s prod=%s ok=%s diff=%s", diff.get("py_count"), diff.get("pr_count"), ok, diff) if ok and CONFIG["EMBED_JSON_POST_URL"]: try: post_embed_json(py_path, CONFIG["EMBED_JSON_POST_URL"]) log.info("POST done (noop if same on server side)") except Exception as e: log.warning("POST failed: %s", e) return True R = target + 1 # 1-based row = values[target] # NOTEヘッダ/履歴 cur_note = str(row[iQ] or "") parts = cur_note.split("\n") head = parts[0] if parts else "" hist = "\n".join(parts[1:]) if len(parts)>1 else "" # Vimeoクライアント vimeo = Vimeo(CONFIG["VIMEO_TOKEN"]) if CONFIG["VIMEO_TOKEN"] else None if not vimeo: raise RuntimeError("VIMEO_TOKEN is empty") # === Step1 === if stage == "step1": head = note_set_step_status(head, 1, "reading") head, hist = note_append(head, hist, "Step1: start") write_cell(CONFIG["PKG_SHEET_ID"], CONFIG["PKG_SHEET_NAME"], R, iQ, head + ("\n"+hist if hist else "")) try: src_play = str(row[iH] or "") raw = resolve_to_raw(src_play) head, hist = note_append(head, hist, f"Step1: resolved URL = {raw}") write_cell(CONFIG["PKG_SHEET_ID"], CONFIG["PKG_SHEET_NAME"], R, iQ, head + "\n" + hist) exist_vid = parse_vid_from_playurl(str(row[iR] or "")) if exist_vid: vimeo.replace_pull(exist_vid, raw) vid = exist_vid else: vid = vimeo.upload_pull(raw, name=str(row[iTTL] or "video")) new_play = f"https://XXXXXX.com/vm5?movid={vid}" write_cell(CONFIG["PKG_SHEET_ID"], CONFIG["PKG_SHEET_NAME"], R, iVID, vid) write_cell(CONFIG["PKG_SHEET_ID"], CONFIG["PKG_SHEET_NAME"], R, iR, new_play) vimeo.ensure_embed_domain(vid, CONFIG["EMBED_DOMAIN"]) # playable poll until = time.time() + max(3, poll_secs) ok = False while time.time() < until: try: if vimeo.is_playable(vid): ok = True; break except Exception: pass time.sleep(3) if ok: head = note_set_step_status(head, 1, "done") write_cell(CONFIG["PKG_SHEET_ID"], CONFIG["PKG_SHEET_NAME"], R, iJOB, "step1_finished") else: write_cell(CONFIG["PKG_SHEET_ID"], CONFIG["PKG_SHEET_NAME"], R, iJOB, "encoding_step1") write_cell(CONFIG["PKG_SHEET_ID"], CONFIG["PKG_SHEET_NAME"], R, iQ, head + ("\n"+hist if hist else "")) return True except Exception as e: head = note_set_step_status(head, 1, "error") head, hist = note_append(head, hist, f"Step1 error: {str(e)[:500]}") write_cell(CONFIG["PKG_SHEET_ID"], CONFIG["PKG_SHEET_NAME"], R, iJOB, "error_step1") write_cell(CONFIG["PKG_SHEET_ID"], CONFIG["PKG_SHEET_NAME"], R, iQ, head + "\n" + hist) return True # === Step2 === if stage == "step2": try: vid = str(row[iVID] or "") or parse_vid_from_playurl(str(row[iR] or "")) if not vid: write_cell(CONFIG["PKG_SHEET_ID"], CONFIG["PKG_SHEET_NAME"], R, iJOB, "error_step2") head = note_set_step_status(head, 2, "error"); head, hist = note_append(head, hist, "videoId 不明") write_cell(CONFIG["PKG_SHEET_ID"], CONFIG["PKG_SHEET_NAME"], R, iQ, head + "\n" + hist) return True thumb_src = str(row[iF] or "").strip() if not thumb_src: write_cell(CONFIG["PKG_SHEET_ID"], CONFIG["PKG_SHEET_NAME"], R, iJOB, "step2_finished") head = note_set_step_status(head, 2, "done"); head, hist = note_append(head, hist, "サムネURLなし(スキップ)") write_cell(CONFIG["PKG_SHEET_ID"], CONFIG["PKG_SHEET_NAME"], R, iQ, head + "\n" + hist) return True head = note_set_step_status(head, 2, "reading") raw_img = resolve_to_raw(thumb_src) head, hist = note_append(head, hist, f"Step2: resolved thumb = {raw_img or '(empty)'}") pic = vimeo.create_picture(vid) up = pic.get("upload_link") or pic.get("link") or pic.get("upload_link_secure") if up: img = requests.get(raw_img, timeout=15); img.raise_for_status() ct = img.headers.get("Content-Type", "image/jpeg") r = requests.put(up, data=img.content, headers={"Content-Type": ct}, timeout=30) _raise_with_body(r) if pic.get("uri"): vimeo.activate_picture(pic["uri"]) write_cell(CONFIG["PKG_SHEET_ID"], CONFIG["PKG_SHEET_NAME"], R, iJOB, "step2_finished") head = note_set_step_status(head, 2, "done") write_cell(CONFIG["PKG_SHEET_ID"], CONFIG["PKG_SHEET_NAME"], R, iQ, head + "\n" + hist) return True except Exception as e: head = note_set_step_status(head, 2, "error") head, hist = note_append(head, hist, f"Step2 error: {str(e)[:500]}") write_cell(CONFIG["PKG_SHEET_ID"], CONFIG["PKG_SHEET_NAME"], R, iJOB, "error_step2") write_cell(CONFIG["PKG_SHEET_ID"], CONFIG["PKG_SHEET_NAME"], R, iQ, head + "\n" + hist) return True # === Step3 === if stage == "step3": try: vid = str(row[iVID] or "") or parse_vid_from_playurl(str(row[iR] or "")) if not vid: write_cell(CONFIG["PKG_SHEET_ID"], CONFIG["PKG_SHEET_NAME"], R, iJOB, "error_step3") head = note_set_step_status(head, 3, "error"); head, hist = note_append(head, hist, "videoId 不明") write_cell(CONFIG["PKG_SHEET_ID"], CONFIG["PKG_SHEET_NAME"], R, iQ, head + "\n" + hist) return True sub_src = str(row[iJ] or "").strip() if not sub_src: write_cell(CONFIG["PKG_SHEET_ID"], CONFIG["PKG_SHEET_NAME"], R, iJOB, "step3_finished") head = note_set_step_status(head, 3, "done"); head, hist = note_append(head, hist, "字幕URLなし(スキップ)") write_cell(CONFIG["PKG_SHEET_ID"], CONFIG["PKG_SHEET_NAME"], R, iQ, head + "\n" + hist) _mark_publishable_y(R, hd) return True head = note_set_step_status(head, 3, "reading") sub_url = resolve_to_raw(sub_src) if DROPBOX_HOST in sub_url: sub_url = to_dropbox_direct(sub_url) vimeo.delete_ja_subs(vid) up = vimeo.create_sub_container(vid) vtt_text = requests.get(sub_url, timeout=20).text vtt = sanitize_vtt(vtt_text) try: vimeo.put_subtitle(up, vtt, vid) except Exception: up2 = vimeo.create_sub_container(vid) vimeo.put_subtitle(up2, vtt, vid) write_cell(CONFIG["PKG_SHEET_ID"], CONFIG["PKG_SHEET_NAME"], R, iJOB, "step3_finished") head = note_set_step_status(head, 3, "done") write_cell(CONFIG["PKG_SHEET_ID"], CONFIG["PKG_SHEET_NAME"], R, iQ, head + "\n" + hist) _mark_publishable_y(R, hd) return True except Exception as e: head = note_set_step_status(head, 3, "error") head, hist = note_append(head, hist, f"Step3 error: {str(e)[:500]}") write_cell(CONFIG["PKG_SHEET_ID"], CONFIG["PKG_SHEET_NAME"], R, iJOB, "error_step3") write_cell(CONFIG["PKG_SHEET_ID"], CONFIG["PKG_SHEET_NAME"], R, iQ, head + "\n" + hist) return True return True finally: try: fcntl.flock(fd, fcntl.LOCK_UN) os.close(fd) except Exception: pass def _rows_for_embed(values: List[List[Any]], hd: List[Any], iVID: int, iTTL: int, iR: int, iS: int, iPUB: int) -> List[Dict[str, Any]]: rows = [] for r in range(1, len(values)): row = values[r] vid = str((row[iVID] if iVID>=0 and iVID=0 and iTTL=0 and iR =0 and iS =0 and iPUB= 0: write_cell(CONFIG["PKG_SHEET_ID"], CONFIG["PKG_SHEET_NAME"], row_1based, iPUB, "Y") # ============================= # CLI # ============================= def main(): ap = argparse.ArgumentParser() ap.add_argument("--max-items", type=int, default=1) ap.add_argument("--poll-secs", type=int, default=20) args = ap.parse_args() ok = process_once(args.max_items, args.poll_secs) # 最後の比較/POSTは「差分がある時のみ」ログ&POST(重複ログ抑制) try: values = read_sheet_all(CONFIG["PKG_SHEET_ID"], CONFIG["PKG_SHEET_NAME"]) if values and len(values) >= 2: hd = values[0] col = CONFIG["COL"] iVID = header_index(hd, col["VID"]) iTTL = header_index(hd, col["TITLE"]) iR = header_index(hd, col["PLAY_URL"]) iS = header_index(hd, col["CUSTOM"]) iPUB = header_index(hd, col["PUB"]) rows = _rows_for_embed(values, hd, iVID, iTTL, iR, iS, iPUB) py_path = build_pytest_embed_json(rows, CONFIG["PYTEST_OUTDIR"]) ok2, diff = compare_sets(py_path, CONFIG["PROD_EMBED_GET_URL"]) if not ok2: log.info("compare result ok=%s diff=%s", ok2, diff) if CONFIG["EMBED_JSON_POST_URL"]: try: post_embed_json(py_path, CONFIG["EMBED_JSON_POST_URL"]) log.info("POST done") except Exception as e: log.warning("POST failed: %s", e) # ok=True のときは静かにスルー(必要ならDEBUGに切替) # else: # log.debug("final verify ok=True; no action") except Exception as e: log.warning("post/compare phase failed: %s", e) if __name__ == "__main__": main() ``` 自動起動plist <任意のフォルダパス>/com.XXXXXX.vimeo-worker.plist ``` Labelcom.XXXXXX.vimeo-worker ProgramArguments <任意のフォルダパス> <任意のフォルダパス>/vimeo_worker.py --max-items3 --poll-secs20 EnvironmentVariables VIMEO_TOKEN EMBED_JSON_ENDPOINT GOOGLE_SERVICE_ACCOUNT_JSON <任意のフォルダパス>/service_account.json PROD_EMBED_GET_URL https://XXXXXX.com/wp-content/uploads/gd_wp_data/videoembed.json StartInterval60 RunAtLoad KeepAlive SuccessfulExit WorkingDirectory <任意のフォルダパス> StandardOutPath/dev/null StandardErrorPath<任意のフォルダパス>/XXXXXX_vimeo-worker.err ```