# [メタ情報] # 識別子: scripts_exeフォルダからgdocを生成_exe # 補足: # [/メタ情報] 要約: このシステムは、macOS上でローカルのテキストファイルをGoogleドキュメントへ自動同期するものです。`sync_exe_docs.py`は、指定フォルダ内の`_exe.txt`や`_wip.txt`ファイルを更新日時順に収集し、それらを複数のGoogleドキュメントに、目次と内容を含めて書き込みます。Google Docs APIのOAuth2認証を利用します。 `watcher_exe.py`は、`sync_exe_docs.py`が対象とするローカルフォルダをリアルタイムで監視します。ファイル作成、変更、移動を検知すると、一定のクールダウン期間(5秒)を経て`sync_exe_docs.py`を呼び出し、同期処理を自動実行します。 `com.XXXXXX.exe-docsupdater.plist`は、macOSのLaunch Agentとして`watcher_exe.py`をシステム起動時に自動起動し、バックグラウンドで常駐させる設定ファイルです。これにより、ユーザー操作なしに常にローカルファイルとGoogleドキュメント間の同期が維持されます。標準出力とエラー出力は`/tmp/`にリダイレクトされます。 /Users/XXXXXX/python_scripts/sync_exe_docs.py ``` import os import os.path import unicodedata import socket from googleapiclient.discovery import build from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import InstalledAppFlow from dotenv import load_dotenv # デフォルトのタイムアウト設定(10分) socket.setdefaulttimeout(600) SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) # ================= 設定エリア ================= load_dotenv("/Users/XXXXXX/python_scripts/.env") SOURCE_FOLDER_PATH = "/Users/XXXXXX/scripts_exe/" CLIENT_SECRET_FILE = os.path.join(SCRIPT_DIR, 'credentials.json') TOKEN_FILE = os.path.join(SCRIPT_DIR, 'token.json') # 【設定】書き込み先のGoogleドキュメントIDリスト TARGET_DOC_IDS = [ "1CRt4GyegH22Lv8nXW_I7YuRUEv73gkfsuoVDiT1vdq0", # 生成コード1 (1-15件目) "1O2tr4Jqryi9G92gJ0mk0Q-4PMJS_MJf65Tv8A79qfPg", # 生成コード2 (16-30件目) "1GD0JHg-cq79Ao33Uj3cspeslqhV3hfUSHriizs9lJ0g", # 生成コード3 (31-45件目) "1nXwTb4R1GA8iisiR_zxuR4VGorA94prZs7MUy_ACjJo", # 生成コード4 (46-60件目) "1nJatI-jfvtv2dPBp_aYA_3_pf9W2Y7WfTukrw8PxIKg", # 生成コード5 (61-75件目) "1_nd1GDHAHXpgk7Shx0HPDPkVK0ykCgXSYv29DG9qcKI", # 生成コード6 (76-90件目) "1OINrlumP5B_u9-EzrywntT2rTYQnGwt9r57po4YlOUs", # 生成コード7 (91-105件目) "19ph_9khDM11dcYuKve7OU1iDhh0jYNFkWEwA-UR6AaQ", # 生成コード8 (106-120件目) "1bbwgZwnXLZpfgpxXDILo7tJfI4e5d-_v4Siz-4l-K4s", # 生成コード9 (121-135件目) "174wkQFIudpwc7DiWIpm1zoAlfmHPtZbj37JU69qwJhk", # 生成コード10 (136-150件目) ] # 1つのドキュメントに含めるファイル数 FILES_PER_DOC = 15 # 対象とするサフィックスのリスト(_exe.txt と _wip.txt) TARGET_SUFFIXES = ['_exe.txt', '_wip.txt'] # 除外ファイル(必要に応じて '空_wip.txt' なども追加可能) EXCLUDE_FILES = ['空_exe.txt', '空_wip.txt', '.DS_Store'] EXCLUDE_DIRS = ['.git', '__pycache__', '.idea', 'venv', 'node_modules'] SCOPES = ['https://www.googleapis.com/auth/documents'] # ============================================ def get_sorted_files_list(root_folder): """ ディレクトリを探索し、対象ファイル(*_exe.txt, *_wip.txt)の情報をリスト化して返す(更新日時降順) """ collected_files = [] skipped_files = 0 print(f"📂 探索開始: {root_folder}") if not os.path.exists(root_folder): print(f" [エラー] フォルダが存在しません: {root_folder}") return collected_files, skipped_files for root, dirs, files in os.walk(root_folder): dirs[:] = [d for d in dirs if d not in EXCLUDE_DIRS] for file_name in files: # 除外ファイルチェック if file_name in EXCLUDE_FILES: skipped_files += 1 continue # いずれかの対象サフィックスにマッチするかチェック if not any(file_name.endswith(suffix) for suffix in TARGET_SUFFIXES): continue file_path = os.path.join(root, file_name) relative_path = os.path.relpath(file_path, root_folder) normalized_path = unicodedata.normalize('NFC', relative_path) try: mtime = os.path.getmtime(file_path) with open(file_path, 'r', encoding='utf-8') as f: content = f.read() collected_files.append({ 'path': normalized_path, 'content': content, 'mtime': mtime }) except Exception as e: print(f" [エラー] 読み込み失敗: {relative_path} ({e})") # 更新日時(mtime)の新しい順(降順)に並び替え collected_files.sort(key=lambda x: x['mtime'], reverse=True) return collected_files, skipped_files def format_text_for_chunk(file_chunk, start_index): """ 分割されたファイルリストから、目次付きテキストを生成する """ if not file_chunk: return "" toc_lines = ["[目次]"] for i, file_data in enumerate(file_chunk, start_index): toc_lines.append(f"{i}. {file_data['path']}") toc_text = "\n".join(toc_lines) body_text_parts = [] separator_line = "=" * 80 for file_data in file_chunk: header = f"\n{separator_line}\n\n" header += f"=== {file_data['path']} ===\n\n" body_text_parts.append(header + file_data['content'] + "\n") return toc_text + "\n\n" + "".join(body_text_parts) def get_credentials(): creds = None if os.path.exists(TOKEN_FILE): creds = Credentials.from_authorized_user_file(TOKEN_FILE, SCOPES) if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: print("🔄 認証トークンを更新中...") creds.refresh(Request()) else: print("🌍 ブラウザを開いてログインしてください...") if not os.path.exists(CLIENT_SECRET_FILE): raise FileNotFoundError(f"エラー: {CLIENT_SECRET_FILE} が見つかりません。") flow = InstalledAppFlow.from_client_secrets_file(CLIENT_SECRET_FILE, SCOPES) creds = flow.run_local_server(port=0) with open(TOKEN_FILE, 'w') as token: token.write(creds.to_json()) return creds def update_google_doc(target_doc_id, text_content): """ 指定されたIDのドキュメントを更新する """ if not target_doc_id: return short_id = target_doc_id[-5:] print(f"Google Docs APIに接続中... (ID: ...{short_id})") if not text_content: text_content = "(このドキュメントに割り当てられるファイルはありません)" try: creds = get_credentials() service = build('docs', 'v1', credentials=creds) doc = service.documents().get( documentId=target_doc_id, fields="body(content(endIndex))" ).execute() content = doc.get('body').get('content') if not content: end_index = 1 else: end_index = content[-1]['endIndex'] - 1 requests = [] if end_index > 1: requests.append({'deleteContentRange': {'range': {'startIndex': 1, 'endIndex': end_index}}}) requests.append({'insertText': {'location': {'index': 1}, 'text': text_content}}) service.documents().batchUpdate(documentId=target_doc_id, body={'requests': requests}).execute() print(f"✅ 更新完了 (ID: ...{short_id})") except Exception as e: print(f"❌ APIエラー (ID: ...{short_id}): {e}") def main_sync(): """メイン同期処理""" all_files, skipped = get_sorted_files_list(SOURCE_FOLDER_PATH) total_files = len(all_files) print("-" * 30) print(f"読み込み成功: {total_files} ファイル") print(f"除外スキップ: {skipped} ファイル") for i, doc_id in enumerate(TARGET_DOC_IDS): start_idx = i * FILES_PER_DOC end_idx = start_idx + FILES_PER_DOC chunk_files = all_files[start_idx:end_idx] toc_start_num = start_idx + 1 file_count_in_chunk = len(chunk_files) if file_count_in_chunk > 0: current_range = f"{toc_start_num} - {start_idx + file_count_in_chunk}" else: current_range = "データなし・待機中" print(f"\n--- Doc #{i+1} (Files {current_range}) ---") chunk_text = format_text_for_chunk(chunk_files, toc_start_num) update_google_doc(doc_id, chunk_text) if total_files > len(TARGET_DOC_IDS) * FILES_PER_DOC: limit = len(TARGET_DOC_IDS) * FILES_PER_DOC print(f"\n⚠️ 注意: ファイル数が上限({limit})を超えています。残りのファイルは同期されません。") if __name__ == "__main__": main_sync() ``` /Users/XXXXXX/python_scripts/watcher_exe.py ``` import time import sys import os import unicodedata from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler sys.path.append(os.path.dirname(os.path.abspath(__file__))) import sync_exe_docs class ChangeHandler(FileSystemEventHandler): """変更を検知するクラス""" def __init__(self): self.last_modified = time.time() # 確実にエディタの保存(ディスク書き込み)が完了するよう、待機を5秒に延長 self.cooldown_seconds = 5 self.file_last_seen = {} def process_event(self, src_path): """イベントの共通処理""" filename = os.path.basename(src_path) # 隠しファイルやスクリプト自身は無視 if filename.startswith('.') or filename in ['watcher_exe.py', 'sync_exe_docs.py']: return normalized_name = unicodedata.normalize('NFC', filename) if normalized_name in sync_exe_docs.EXCLUDE_FILES: return # 対象のサフィックス(_exe.txt または _wip.txt)を持っている場合のみトリガー if any(normalized_name.endswith(suffix) for suffix in sync_exe_docs.TARGET_SUFFIXES): current_time = time.time() last_seen = self.file_last_seen.get(src_path, 0) # 連射ガード(10秒) if current_time - last_seen < 10: return self.file_last_seen[src_path] = current_time print(f"👀 検知: {filename} の変更を捕捉しました(5秒後に同期を開始します)") self.last_modified = time.time() def on_modified(self, event): if not event.is_directory: self.process_event(event.src_path) def on_created(self, event): if not event.is_directory: self.process_event(event.src_path) def on_moved(self, event): if not event.is_directory: self.process_event(event.dest_path) def run_sync_process(): """同期実行""" print("\n" + "="*30) print("🔄 Googleドキュメント同期プロセスを開始...") try: # スクリプトのキャッシュをクリアして確実に最新のファイルを読み込ませる import importlib importlib.reload(sync_exe_docs) sync_exe_docs.main_sync() except Exception as e: print(f"❌ 重大エラー: {e}") print("="*30 + "\n") print(f"監視中... 対象: {sync_exe_docs.SOURCE_FOLDER_PATH}") if __name__ == "__main__": target_path = sync_exe_docs.SOURCE_FOLDER_PATH if not os.path.exists(target_path): print(f"❌ エラー: 監視対象のフォルダが見つかりません: {target_path}") sys.exit(1) event_handler = ChangeHandler() observer = Observer() observer.schedule(event_handler, target_path, recursive=True) observer.start() print(f"🚀 監視を開始しました: {target_path}") print("終了するには 'Ctrl + C' を押してください。") try: last_triggered = 0 while True: time.sleep(1) if event_handler.last_modified > last_triggered: # 最後に変更を検知してから指定秒数(5秒)以上経過したら実行 if time.time() - event_handler.last_modified > event_handler.cooldown_seconds: run_sync_process() last_triggered = time.time() except KeyboardInterrupt: observer.stop() print("\n🛑 終了します") observer.join() ``` /Users/XXXXXX/Library/LaunchAgents/com.XXXXXX.exe-docsupdater.plist ``` Label com.XXXXXX.exe-docsupdater ProgramArguments /usr/bin/python3 -u /Users/XXXXXX/python_scripts/watcher_exe.py WorkingDirectory /Users/XXXXXX/python_scripts RunAtLoad StandardOutPath /tmp/exe-docsupdater.out StandardErrorPath /tmp/exe-docsupdater.err ```