# [メタ情報] # 識別子: videoembed.json生成_python_exe # システム名: videoembed.json生成_python # 技術種別: Misc # 機能名: Misc # 使用言語: Python # 状態: 実行用 # [/メタ情報] 要約:macOSのLaunchAgentが10秒ごとにkick_flags.pyを起動。F1_2のI列またはF2のB/U列がTRUEで発火。まずsync_video_embedcode.pyでS列を再生成(F2起点は–force)。次にvideoembed.json(必要ならvideoct.json)を生成・アップし、F2のB/UをFALSEへ。syncは所在場所×テンプレ条件で埋め込み作成、変更行のU=TRUE・IをFALSE化。generate_videoembed_jsonは公開行のみJSON化。xserver_uploaderはscp後にサーバ側mvで原子的更新。 ┌────────────────────────────────────────────┐ │ LaunchAgent (macOS, 常駐) │ │ ~/Library/LaunchAgents/com.xxxxxxxx.kick-flags.plist │ └─ 10秒ごとに kick_flags.py を起動/監視 └────────────────────────────────────────────┘ │ ▼ ┌────────────────────────────────────────────┐ │ kick_flags.py │ │ トリガ検出: │ │ • F1_2.I = TRUE(名前関係付け) │ │ • F2.B or F2.U = TRUE(シート1) │ │ 実行シーケンス: │ │ 1) sync_video_embedcode.py [--force] │ │ - F2でTRUEあり → --force で全件S再生成 │ │ - F1_2のみTRUE → 通常モード │ │ 2) JSON生成とアップロード │ │ - (統合版を使うなら) │ │ generate_and_upload_videoembed.py │ │ generate_and_upload_videoct.py(任意)│ │ - (分離運用なら) │ │ generate_videoembed_json.py │ │ └→ xserver_uploader.upload_secure_json() │ 3) F2のB/UをFALSEへ戻す(batch_update) │ └────────────────────────────────────────────┘ │ ▼ ┌────────────────────────────────────────────┐ │ sync_video_embedcode.py(--force対応) │ │ • F1_2(A:所在場所, F:テンプレ, G:条件JSON) │ │ × F2(E:所在場所) で突合し、S列を再生成 │ │ • 変更があった行は F2.U=TRUE に立てる │ │ • F1_2.I=TRUE がヒットしていた場合は FALSEへ │ └────────────────────────────────────────────┘ │ ▼ ┌────────────────────────────────────────────┐ │ generate_videoembed_json.py │ │ • F2の「公開」行だけ抽出 │ │ - A=公開, C=videoid, S=embedCode, W=video2│ │ • JSONファイル出力(tmp→置換で安全) │ │ 例: /Users/xxxxxxxxm1/meta_gd_wp_data/pytest_videoembed.json └────────────────────────────────────────────┘ │ ▼ ┌────────────────────────────────────────────┐ │ xserver_uploader.py │ │ upload_secure_json(local_json_path) │ │ 1) scpでXserverのホームへ一時アップ │ │ 2) sshで _secure/wp_json へ mv(原子的更新) │ │ 3) chmod 640, touch -m, wc -c ログ出力 │ └────────────────────────────────────────────┘ M1 Mac mini /Users/xxxxxxxxm1/python_scripts/kick_flags.py #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ kick_flags.py - 発火条件(どれか1つでも TRUE なら発火) (A) F1_2(名前関係付け)I列 に TRUE が1件以上 (B) F2(シート1) B列(更新状況) or U列(埋め込みコード更新状況) に TRUE が1件以上 - 手順 1) sync_video_embedcode.py で S を再生成 * F2 起点(B/U=TRUE あり)の場合は --force で強制再生成 * F1_2 起点のみの場合は引数なし(従来どおり) * F1_2.I の FALSE 戻しは sync 側に任せる(--force 時は触らない) 2) generate_and_upload_videoembed.py を実行(videoembed.json を生成→アップ) 3) generate_and_upload_videoct.py を実行(videoct.json を生成→アップ) 4) F2 の B/U 列 TRUE を FALSE に戻す(本スクリプトで実施) - 安全策 * ロックファイルで多重起動防止 * 429(write/min)に遭遇したら指数バックオフ再試行 * ログは ~/Library/Logs/kick_flags.log """ import os import sys import time import subprocess from contextlib import contextmanager from typing import List, Tuple import gspread from google.oauth2.service_account import Credentials # ====== 固定設定 ====== SA_JSON_PATH = "/Users/xxxxxxxxm1/keys/service.json" # F2 SPREADSHEET_F2_ID = "17TMSsh8OF8bgcP0NEuM3X67oPfACVSmkGvy8w-DgrZY" SHEET_NAME_F2 = "シート1" F2_COL_B_UPDATED = 1 # B列: 更新状況(0-based) F2_COL_U_EMB_STATUS = 20 # U列: 埋め込みコード更新状況(0-based) # F1_2 SPREADSHEET_F12_ID = "1_7c823FgcV_KTZ4vg_tQ0WPPljqtzX8JY8vfyIdXz2Q" SHEET_NAME_F12 = "名前関係付け" F12_COL_I_UPDATE = 8 # I列: 更新フラグ(0-based) # 実行スクリプト PYTHON_BIN = "/opt/homebrew/opt/python@3.11/bin/python3.11" SYNC_SCRIPT = "/Users/xxxxxxxxm1/python_scripts/sync_video_embedcode.py" GEN_EMBED_SCRIPT = "/Users/xxxxxxxxm1/python_scripts/generate_and_upload_videoembed.py" GEN_VIDEOCT_SCRIPT = "/Users/xxxxxxxxm1/python_scripts/generate_and_upload_videoct.py" # スコープ SC_RO = ["https://www.googleapis.com/auth/spreadsheets.readonly"] SC_RW = ["https://www.googleapis.com/auth/spreadsheets"] # ログ LOG_PATH = os.path.expanduser("~/Library/Logs/kick_flags.log") # ロック LOCK_PATH = "/tmp/kick_flags.lock" LOCK_TTL_SEC = 180 # フェイルセーフで3分後に捨てる def log(msg: str) -> None: s = (msg if msg.endswith("\n") else msg + "\n") sys.stdout.write(s) sys.stdout.flush() try: with open(LOG_PATH, "a", encoding="utf-8") as f: f.write(s) except Exception: pass def _gc(scopes: List[str]) -> gspread.Client: creds = Credentials.from_service_account_file(SA_JSON_PATH, scopes=scopes) return gspread.authorize(creds) def _open(ws_key: str, sheet: str, scopes: List[str]): return _gc(scopes).open_by_key(ws_key).worksheet(sheet) def _rows_true(rows: List[List[str]], col: int) -> List[int]: hits = [] for i, r in enumerate(rows[1:], start=2): v = (r[col].strip().upper() if len(r) > col and r[col] else "") if v == "TRUE": hits.append(i) return hits def _detect_triggers() -> Tuple[List[int], List[int]]: """戻り値: (F12_I_true_rows, F2_B_or_U_true_rows) 各 1-based 行番号のリスト""" ws_f12 = _open(SPREADSHEET_F12_ID, SHEET_NAME_F12, SC_RO) ws_f2 = _open(SPREADSHEET_F2_ID, SHEET_NAME_F2, SC_RO) rows_f12 = ws_f12.get_all_values() rows_f2 = ws_f2.get_all_values() i_hits = _rows_true(rows_f12, F12_COL_I_UPDATE) b_hits = _rows_true(rows_f2, F2_COL_B_UPDATED) u_hits = _rows_true(rows_f2, F2_COL_U_EMB_STATUS) fu_hits = sorted(set(b_hits + u_hits)) return i_hits, fu_hits def _run(cmd: List[str], name: str, retries: int = 3) -> bool: """429などで失敗したら指数バックオフ再試行""" delay = 3 for attempt in range(1, retries + 1): log(f"▶ {' '.join(cmd)}") try: subprocess.run(cmd, check=True) return True except subprocess.CalledProcessError as e: log(f"❌ {name} 失敗 ({attempt}/{retries}): {e}") if attempt == retries: return False time.sleep(delay) delay *= 2 return False def _reset_f2_flags(rows_1based: List[int]) -> int: """F2 の B/U を FALSE に戻す(行リスト指定): batch_update で一括""" if not rows_1based: return 0 ws = _open(SPREADSHEET_F2_ID, SHEET_NAME_F2, SC_RW) updates = [] for r in rows_1based: updates.append({"range": f"B{r}:B{r}", "values": [["FALSE"]]}) updates.append({"range": f"U{r}:U{r}", "values": [["FALSE"]]}) try: ws.batch_update(updates) return len(rows_1based) except Exception as e: log(f"⚠️ F2フラグ戻し失敗(一括): {e}") # フォールバック(個別更新) restored = 0 for r in rows_1based: try: ws.update(f"B{r}:B{r}", [["FALSE"]]) ws.update(f"U{r}:U{r}", [["FALSE"]]) restored += 1 time.sleep(0.15) except Exception as ee: log(f"⚠️ F2フラグ戻し失敗 row={r}: {ee}") return restored @contextmanager def _lock(path: str, ttl: int): now = time.time() if os.path.exists(path): try: st = os.stat(path) if now - st.st_mtime <= ttl: log("⚠️ 他のkick_flagsが実行中。スキップ。") yield False return except Exception: pass try: with open(path, "w") as f: f.write(str(os.getpid())) yield True finally: try: os.remove(path) except Exception: pass def main(): log("===== kick_flags.py: 起動(F12+F2スキャン+ロック+再試行)=====") with _lock(LOCK_PATH, LOCK_TTL_SEC) as acquired: if not acquired: return # 伝播遅延対策(2回読む) i_hits, fu_hits = _detect_triggers() if not (i_hits or fu_hits): time.sleep(1.5) i_hits, fu_hits = _detect_triggers() if not (i_hits or fu_hits): log("フラグなし。終了。") return if i_hits: log(f"✅ F1_2.I TRUE 検出: rows={i_hits}") if fu_hits: log(f"✅ F2.B/U TRUE 検出: rows={fu_hits}") # 1) S再生成 # - F2 側で TRUE を検出した場合は --force を付けて「強制全体再生成」 # - F1_2 側のみの場合は従来どおり引数なし sync_cmd = [PYTHON_BIN, SYNC_SCRIPT] if fu_hits: sync_cmd.append("--force") if not _run(sync_cmd, "sync_video_embedcode.py", retries=4): log("⛔ 同期失敗。終了。") return # 2) embed JSON if not _run([PYTHON_BIN, GEN_EMBED_SCRIPT], "generate_and_upload_videoembed.py", retries=3): log("⚠️ videoembed.json 生成・アップで失敗") # 3) videoct JSON(存在する場合のみ) if os.path.exists(GEN_VIDEOCT_SCRIPT): if not _run([PYTHON_BIN, GEN_VIDEOCT_SCRIPT], "generate_and_upload_videoct.py", retries=3): log("⚠️ videoct.json 生成・アップで失敗") else: log("ℹ️ generate_and_upload_videoct.py が見つかりませんでした(videoct はスキップ)") # 4) F2 の B/U を FALSE に戻す(TRUE を見つけたときのみ) if fu_hits: restored = _reset_f2_flags(fu_hits) log(f"🧹 F2フラグ戻し: {restored} 行を FALSE に更新") log("🎉 全処理完了(F12+F2トリガ対応)") if __name__ == "__main__": main() sync_video_embedcode.py /Users/xxxxxxxxm1/python_scripts/sync_video_embedcode.py #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ sync_video_embedcode.py(高速・バルク更新版 + F2起点対応) - F1_2(名前関係付け)の I列 に TRUE が1件でもあれば、 F2(シート1)の全行を走査して S を一括再生成し、S列を一括上書き。 変更があった行の U を TRUE にする(範囲で一括更新)。 - 再生成ロジック: F2の E列(動画所在場所) と F1_2の A列 をキーに突き合わせ、 F1_2の F列(埋め込みテンプレ)と G列(JSONの対応条件)で @@key@@ を F2の「対応条件で指定された列番号(1-based)」の値に置換する。 - 実行後、F1_2 の I列=TRUE は FALSE に戻す(※ヒットがあったとき)。 - 追加: `--force` を付けると、I=TRUE が無くても強制で全体再生成(F2保存起点用)。 使い分け: - F1_2保存起点 … 引数なしで呼ぶ(従来どおり I=TRUE 検知で動作) - F2保存起点 … `--force` で呼ぶ(I=TRUEが無くても S/U を再生成) """ import argparse import json import re import time from typing import Dict, Any, List, Tuple import gspread from google.oauth2.service_account import Credentials # ====== 設定 ====== SA_JSON_PATH = "/Users/xxxxxxxxm1/keys/service.json" # F2 SPREADSHEET_F2_ID = "17TMSsh8OF8bgcP0NEuM3X67oPfACVSmkGvy8w-DgrZY" SHEET_NAME_F2 = "シート1" # F1_2 SPREADSHEET_F12_ID = "1_7c823FgcV_KTZ4vg_tQ0WPPljqtzX8JY8vfyIdXz2Q" SHEET_NAME_F12 = "名前関係付け" # 列定義(0-based) F2_COL_E_LOCATION = 4 # E: 動画所在場所 F2_COL_S_EMBED = 18 # S: 埋め込みコード F2_COL_U_STATUS = 20 # U: 埋め込みコード更新状況(フラグ) F12_COL_A_LOCATION = 0 # A: 動画所在場所 F12_COL_F_TEMPLATE = 5 # F: 埋め込みテンプレート F12_COL_G_CONDITIONS = 6 # G: 対応条件(JSON) F12_COL_I_UPDATE_FLAG = 8 # I: 更新フラグ(TRUEで走る) # gspread スコープ(編集権限あり) SCOPES = ["https://www.googleapis.com/auth/spreadsheets"] def _auth(): creds = Credentials.from_service_account_file(SA_JSON_PATH, scopes=SCOPES) return gspread.authorize(creds) def _safe(row: List[str], idx: int) -> str: return row[idx] if idx < len(row) and row[idx] is not None else "" def _build_relation_map(rows_f12: List[List[str]]) -> Dict[str, Tuple[str, Dict[str, Any]]]: """A=所在場所 をキーに (F=テンプレ, G=条件dict) を返すマップを作る。""" m: Dict[str, Tuple[str, Dict[str, Any]]] = {} for r in rows_f12[1:]: loc = _safe(r, F12_COL_A_LOCATION).strip() if not loc: continue tmpl = _safe(r, F12_COL_F_TEMPLATE) cond_raw = (_safe(r, F12_COL_G_CONDITIONS) or "").strip() try: cond = json.loads(cond_raw) if cond_raw else {} if not isinstance(cond, dict): cond = {} except Exception: cond = {} m[loc] = (tmpl or "", cond) return m def _render_embed_for_row(row_f2: List[str], relation: Tuple[str, Dict[str, Any]]) -> str: """ F2の1行に対し、(template, conditions) で @@key@@ 置換して返す。 conditions は {"key": "列番号(1-based)"} を想定。 """ template, conditions = relation out = str(template or "") if not conditions: return out for key, idx_str in conditions.items(): col_1based = None try: col_1based = int(str(idx_str).strip()) except Exception: pass repl = "" if col_1based and col_1based >= 1: repl = _safe(row_f2, col_1based - 1) out = re.sub(r"@@{}@@".format(re.escape(str(key))), repl, out) return out def _regenerate_and_update(ws_f2, rows_f2, relmap) -> int: """S/U を再生成して一括更新。戻り値は S 変化行数。""" if not rows_f2: print("F2 が空です。") return 0 n_rows = len(rows_f2) # ヘッダ含む # 現在の S / U 列(比較用) cur_S = [_safe(rows_f2[i], F2_COL_S_EMBED) if i < len(rows_f2) else "" for i in range(1, n_rows)] cur_U = [_safe(rows_f2[i], F2_COL_U_STATUS) if i < len(rows_f2) else "" for i in range(1, n_rows)] # 新しい S と U を行ごとに作成(2行目〜) new_S_list: List[List[str]] = [] new_U_list: List[List[str]] = [] changed = 0 for i, r in enumerate(rows_f2[1:], start=2): loc = _safe(r, F2_COL_E_LOCATION).strip() old_s = _safe(r, F2_COL_S_EMBED) if loc and loc in relmap: new_s = _render_embed_for_row(r, relmap[loc]) else: # 対応ルールなし → 現状維持 new_s = old_s # Sは常に「最新再生成」で上書き new_S_list.append([new_s]) # Uは「Sに変化があった行のみ TRUE」、なければ現状維持 if str(new_s) != str(old_s): new_U_list.append(["TRUE"]) changed += 1 else: new_U_list.append([_safe(r, F2_COL_U_STATUS)]) # 一括更新(S列・U列) if n_rows > 1: s_range = f"S2:S{n_rows}" u_range = f"U2:U{n_rows}" ws_f2.update(s_range, new_S_list, value_input_option="RAW") ws_f2.update(u_range, new_U_list, value_input_option="RAW") print(f"F2: S/U を一括更新(行数={n_rows-1}、S変化あり={changed})") else: print("F2 にデータ行がありません。") return changed def main(): parser = argparse.ArgumentParser(description="Sync WordPress embed codes (F2 S列) from F1_2 mapping.") parser.add_argument("--force", action="store_true", help="I=TRUE が無くても強制で全件再生成(F2保存起点で使う)") args = parser.parse_args() print("=== sync_video_embedcode.py: start ===") gc = _auth() ws_f12 = gc.open_by_key(SPREADSHEET_F12_ID).worksheet(SHEET_NAME_F12) rows_f12 = ws_f12.get_all_values() # I列=TRUE の有無を確認 hits = [ i for i, r in enumerate(rows_f12[1:], start=2) if len(r) > F12_COL_I_UPDATE_FLAG and (r[F12_COL_I_UPDATE_FLAG] or "").strip().upper() == "TRUE" ] if not args.force and not hits: print("I=TRUE が見つからないため、処理をスキップします。(--force で強制実行可)") print("=== sync_video_embedcode.py: done ===") return if not args.force: # AppSheet書き込み直後のズレ対策:少し待ってから F1_2 を再読込 time.sleep(1.5) rows_f12 = ws_f12.get_all_values() # 突き合わせマップ relmap = _build_relation_map(rows_f12) # F2 取得 ws_f2 = gc.open_by_key(SPREADSHEET_F2_ID).worksheet(SHEET_NAME_F2) rows_f2 = ws_f2.get_all_values() # 再生成&一括更新 changed = _regenerate_and_update(ws_f2, rows_f2, relmap) # I=TRUE を FALSE に戻す(ヒットがあった場合のみ) if hits: data = [{"range": f"I{row}:I{row}", "values": [["FALSE"]]} for row in hits] if data: ws_f12.batch_update(data) print(f"F1_2:I back to FALSE: {len(hits)}") print("=== sync_video_embedcode.py: done ===") if __name__ == "__main__": main() generate_videoembed_json.py /Users/xxxxxxxxm1/python_scripts/generate_videoembed_json.py # /Users/xxxxxxxxm1/python_scripts/generate_videoembed_json.py # -*- coding: utf-8 -*- """ 完全版:GAS版 videoembed.json と同一仕様・同一フォーマットで出力 ----------------------------------------------------- - 元仕様: A列 (0): 公開/非公開 → "公開" の行のみ対象 C列 (2): 動画ID → videoid S列 (18): 埋め込みコード → embedCode W列 (22): 低画質URL → video2 (任意) - その他の列は無視 - embedCode, videoid が空行は除外 - 出力: /Users/xxxxxxxxm1/meta_gd_wp_data/pytest_videoembed.json - 構造: JSON配列(各要素 = {videoid, embedCode, (video2)}) - 整形: インデント2、改行あり、UTF-8 / ensure_ascii=False """ import os import json import gspread from google.oauth2.service_account import Credentials # ======= 基本設定 ======= SHEET_ID = "" SHEET_TAB = "" SERVICE_JSON = "/Users/xxxxxxxxm1/keys/service.json" OUTPUT_PATH = "/Users/xxxxxxxxm1/meta_gd_wp_data/pytest_videoembed.json" # ======= 列インデックス (0-based) ======= COL_A_PUBLISH = 0 # 公開/非公開 COL_C_VIDEOID = 2 # 動画ID COL_S_EMBED = 18 # WordPress埋め込みコード COL_W_VIDEO2 = 22 # 低画質動画URL (任意) # ======= 認証 ======= SCOPES = ["https://www.googleapis.com/auth/spreadsheets.readonly"] creds = Credentials.from_service_account_file(SERVICE_JSON, scopes=SCOPES) gc = gspread.authorize(creds) # ======= 関数群 ======= def _safe_get(row, idx): """行データの中から安全にインデックス参照""" return row[idx] if idx < len(row) else "" def _to_int_or_str(v): """GAS同様、数値化可能ならint、そうでなければ文字列""" s = str(v).strip() if not s: return None try: i = int(float(s)) return i except Exception: return s def _is_valid_embed(embed): """GAS版の空・null・空文字チェック""" if embed is None: return False e = str(embed).strip() if e in ("", "null", '""'): return False return True def generate_videoembed_json(): # === シート読み取り === ws = gc.open_by_key(SHEET_ID).worksheet(SHEET_TAB) all_rows = ws.get_all_values() if not all_rows or len(all_rows) <= 1: print("[generate_videoembed_json] no data") with open(OUTPUT_PATH, "w", encoding="utf-8") as f: json.dump([], f, ensure_ascii=False, indent=2) return OUTPUT_PATH # === データ構築 === output = [] for row in all_rows[1:]: # 0行目はヘッダ # 列取得(存在しない列は空文字で補う) publish = _safe_get(row, COL_A_PUBLISH).strip() videoid_raw = _safe_get(row, COL_C_VIDEOID).strip() embed = _safe_get(row, COL_S_EMBED) video2 = _safe_get(row, COL_W_VIDEO2).strip() # 公開判定 if publish != "公開": continue # embedCode / videoid 必須チェック if not _is_valid_embed(embed) or videoid_raw == "": continue videoid_val = _to_int_or_str(videoid_raw) item = { "videoid": videoid_val, "embedCode": embed.strip(), } if video2: item["video2"] = video2 output.append(item) # === JSON出力 === os.makedirs(os.path.dirname(OUTPUT_PATH), exist_ok=True) tmp_path = OUTPUT_PATH + ".tmp" with open(tmp_path, "w", encoding="utf-8") as f: json.dump(output, f, ensure_ascii=False, indent=2) os.replace(tmp_path, OUTPUT_PATH) print(f"[generate_videoembed_json] wrote: {OUTPUT_PATH} (items={len(output)})") return OUTPUT_PATH # ======= メイン実行 ======= if __name__ == "__main__": generate_videoembed_json() /Users/xxxxxxxxm1/python_scripts/xserver_uploader.py # -*- coding: utf-8 -*- import os, shlex, subprocess, uuid, logging, pathlib XS_USER = "xxxxxxxx" XS_HOST = "xxxxxxxx.xsrv.jp" XS_SSH_PORT = 10022 SSH_KEY_PATH = os.path.expanduser("~/.ssh/id_xserver") # 本番の“非公開・配信用”ディレクトリ DEST_DIR = "/home/xxxxxxxx/xxxxxxxx.com/public_html/_secure/wp_json" def _run(cmd): logging.info("RUN: %s", " ".join(shlex.quote(c) for c in cmd)) return subprocess.run(cmd, check=True, capture_output=True, text=True).stdout.strip() def upload_secure_json(local_path: str): """JSON を Xserver へ原子的に反映(ホームへ一旦置いてから mv)""" src = pathlib.Path(local_path) if not src.exists(): raise FileNotFoundError(f"local file not found: {src}") tmp_name = f"__upload_{uuid.uuid4().hex}.tmp" userhost = f"{XS_USER}@{XS_HOST}" remote_tmp_home = f"/home/{XS_USER}/{tmp_name}" remote_final = f"{DEST_DIR}/{src.name}" # ① scp: まずホームへ _run([ "scp", "-P", str(XS_SSH_PORT), "-o", "StrictHostKeyChecking=accept-new", "-o", "UserKnownHostsFile=~/.ssh/known_hosts", "-i", SSH_KEY_PATH, str(src), f"{userhost}:{remote_tmp_home}" ]) # ② サーバ側で原子的に反映(chmod 640、更新時刻も付け替え) move_script = ( f"set -euo pipefail; " f"DEST={shlex.quote(DEST_DIR)}; " f"TMP={shlex.quote(remote_tmp_home)}; " f"mkdir -p \"$DEST\"; " f"[ -f \"$TMP\" ] || {{ echo 'tmp not found' >&2; exit 1; }}; " f"chmod 640 \"$TMP\"; " f"mv -f \"$TMP\" \"$DEST/{src.name}\"; " f"touch -m \"$DEST/{src.name}\"; " f"wc -c < \"$DEST/{src.name}\"; " ) out = _run([ "ssh", "-p", str(XS_SSH_PORT), "-i", SSH_KEY_PATH, "-o", "StrictHostKeyChecking=accept-new", "-o", "UserKnownHostsFile=~/.ssh/known_hosts", userhost, "bash", "-lc", move_script ]) logging.info("uploaded bytes: %s", out.strip()) /Users/xxxxxxxxm1/Library/LaunchAgents/com.xxxxxxxx.kick-flags.plist Disabled EnvironmentVariables PATH /opt/homebrew/bin:/usr/local/bin:/usr/bin:/bin Label com.xxxxxxxx.kick-flags ProgramArguments /opt/homebrew/opt/python@3.11/bin/python3.11 /Users/xxxxxxxxm1/python_scripts/kick_flags.py RunAtLoad StandardErrorPath /Users/xxxxxxxxm1/Library/Logs/kick_flags.err StandardOutPath /Users/xxxxxxxxm1/Library/Logs/kick_flags.log StartInterval 10