# [メタ情報] # 識別子: scripts_pubフォルダからgドキュメントを生成_exe # システム名: 未分類 # 技術種別: Misc # 機能名: 260205 # 使用言語: [] # 状態: 実行用 # [/メタ情報] 要約: このシステムは、ローカルのスクリプトファイルをGoogleドキュメントに自動同期し、公開するためのものです。 `sync_docs.py`は、設定されたローカルフォルダから特定の拡張子を持つファイルを読み込み、更新日時順に整理します。これらのファイル内容を目次と共に整形し、Google Docs API経由で、指定された複数のGoogleドキュメントに分割して書き込みます(1ドキュメントあたり最大50ファイル)。既存内容を認証後に更新する機能を持ちます。 `watcher.py`は`sync_docs.py`と連携し、監視対象フォルダ内のファイル変更をリアルタイムで検知します。変更後クールダウン期間を経て、自動的に`sync_docs.main_sync()`を呼び出し、Googleドキュメントへの同期処理を実行。ローカルファイルの更新を自動的にクラウドへ反映させます。 `com.xxxxxxxx.docsupdater.plist`はmacOSのLaunchAgents設定ファイルで、システムの起動時に`watcher.py`を自動実行し、バックグラウンドでファイル監視とGoogleドキュメント同期プロセスを常駐させる役割を担います。 全体として、このシステムはローカルで管理するスクリプトやテキストファイルを常に最新の状態でGoogleドキュメント上に保ち、ウェブ公開可能な自動化ワークフローを提供します。 ------------------------ /Users/xxxxxxxxm1/python_scripts/sync_docs.py ``` import os import os.path import urllib.parse import unicodedata import socket from googleapiclient.discovery import build from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import InstalledAppFlow # デフォルトのタイムアウト設定(10分) socket.setdefaulttimeout(600) SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) # ================= 設定エリア ================= SOURCE_FOLDER_PATH = "/Users/xxxxxxxxm1/scripts_pub/" BASE_URL = "https://xxxxxxxx.com/script_list/?pubtxt=" CLIENT_SECRET_FILE = os.path.join(SCRIPT_DIR, 'credentials.json') TOKEN_FILE = os.path.join(SCRIPT_DIR, 'token.json') # 【設定】書き込み先のGoogleドキュメントIDリスト TARGET_DOC_IDS = [ '1NFNA-7xUSbCd4JX1sNbwtaS-Dg47Q_M698n-meEdXik', # 1-50 '1jyq43Kry3vmXMS1wzH92QYJufK38Ic_Sp9J4fAjodLE', # 51-100 '1LyJGlq4aeHYuBC4WBqUam5Wvii6DUniZHNkBymF5OVA', # 101-150 '1i84BoKKA3AiQPbHqONTt8ecvmWanmiB57HxdMHAEKE4', # 151-200 ] # 1つのドキュメントに含めるファイル数 FILES_PER_DOC = 50 TARGET_EXTENSIONS = [ '.txt', '.py', '.log', '.sh', '.md', '.json', '.yaml', '.html', '.css', '.js', '.php' ] EXCLUDE_FILES = [ 'key_patterns.txt', '.DS_Store', 'credentials.json', 'token.json', 'sync_docs.py', 'watcher.py' ] EXCLUDE_DIRS = ['.git', '__pycache__', '.idea', 'venv', 'node_modules'] SCOPES = ['https://www.googleapis.com/auth/documents'] # ============================================ def get_sorted_files_list(root_folder): """ ディレクトリを探索し、対象ファイルの情報をリスト化して返す(更新日時降順) """ collected_files = [] skipped_files = 0 print(f"📂 探索開始: {root_folder}") for root, dirs, files in os.walk(root_folder): dirs[:] = [d for d in dirs if d not in EXCLUDE_DIRS] for file_name in files: if file_name in EXCLUDE_FILES: skipped_files += 1 continue _, ext = os.path.splitext(file_name) if ext.lower() not in TARGET_EXTENSIONS: continue file_path = os.path.join(root, file_name) relative_path = os.path.relpath(file_path, root_folder) normalized_path = unicodedata.normalize('NFC', relative_path) url_path_str = normalized_path.replace(os.sep, '/') file_url = f"{BASE_URL}{url_path_str}" try: mtime = os.path.getmtime(file_path) with open(file_path, 'r', encoding='utf-8') as f: content = f.read() collected_files.append({ 'path': normalized_path, 'url': file_url, 'content': content, 'mtime': mtime }) except Exception as e: print(f" [エラー] 読み込み失敗: {relative_path} ({e})") # 更新日時(mtime)の新しい順(降順)に並び替え collected_files.sort(key=lambda x: x['mtime'], reverse=True) return collected_files, skipped_files def format_text_for_chunk(file_chunk, start_index): """ 分割されたファイルリストから、目次付きテキストを生成する """ if not file_chunk: return "" toc_lines = ["[目次]"] # 目次番号を通し番号にする for i, file_data in enumerate(file_chunk, start_index): toc_lines.append(f"{i}. {file_data['path']}") toc_text = "\n".join(toc_lines) body_text_parts = [] separator_line = "=" * 80 for file_data in file_chunk: header = f"\n{separator_line}\n\n" header += f"=== {file_data['path']} ===\n\n" header += f"URL: {file_data['url']}\n" body_text_parts.append(header + file_data['content'] + "\n") return toc_text + "\n\n" + "".join(body_text_parts) def get_credentials(): creds = None if os.path.exists(TOKEN_FILE): creds = Credentials.from_authorized_user_file(TOKEN_FILE, SCOPES) if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: print("🔄 認証トークンを更新中...") creds.refresh(Request()) else: print("🌍 ブラウザを開いてログインしてください...") if not os.path.exists(CLIENT_SECRET_FILE): raise FileNotFoundError(f"エラー: {CLIENT_SECRET_FILE} が見つかりません。") flow = InstalledAppFlow.from_client_secrets_file(CLIENT_SECRET_FILE, SCOPES) creds = flow.run_local_server(port=0) with open(TOKEN_FILE, 'w') as token: token.write(creds.to_json()) return creds def update_google_doc(target_doc_id, text_content): """ 指定されたIDのドキュメントを更新する """ if not target_doc_id: return # IDの末尾5文字だけ表示(ログ用) short_id = target_doc_id[-5:] print(f"Google Docs APIに接続中... (ID: ...{short_id})") if not text_content: text_content = "(このドキュメントに割り当てられるファイルはありません)" try: creds = get_credentials() service = build('docs', 'v1', credentials=creds) # 末尾のインデックスだけ取得(高速化) doc = service.documents().get( documentId=target_doc_id, fields="body(content(endIndex))" ).execute() content = doc.get('body').get('content') if not content: end_index = 1 else: end_index = content[-1]['endIndex'] - 1 requests = [] # 既存の内容を削除 if end_index > 1: requests.append({'deleteContentRange': {'range': {'startIndex': 1, 'endIndex': end_index}}}) # 新しい内容を挿入 requests.append({'insertText': {'location': {'index': 1}, 'text': text_content}}) service.documents().batchUpdate(documentId=target_doc_id, body={'requests': requests}).execute() print(f"✅ 更新完了 (ID: ...{short_id})") except Exception as e: print(f"❌ APIエラー (ID: ...{short_id}): {e}") def main_sync(): """メイン同期処理""" all_files, skipped = get_sorted_files_list(SOURCE_FOLDER_PATH) total_files = len(all_files) print("-" * 30) print(f"読み込み成功: {total_files} ファイル") print(f"除外スキップ: {skipped} ファイル") # 設定されたドキュメントID順に、50ファイルずつ処理 for i, doc_id in enumerate(TARGET_DOC_IDS): start_idx = i * FILES_PER_DOC end_idx = start_idx + FILES_PER_DOC # リストをスライス chunk_files = all_files[start_idx:end_idx] # 目次番号の開始値 toc_start_num = start_idx + 1 # ログ表示用 file_count_in_chunk = len(chunk_files) current_range = f"{toc_start_num} - {start_idx + file_count_in_chunk}" print(f"\n--- Doc #{i+1} (Files {current_range}) ---") # テキスト整形 chunk_text = format_text_for_chunk(chunk_files, toc_start_num) # API更新実行 update_google_doc(doc_id, chunk_text) if total_files > len(TARGET_DOC_IDS) * FILES_PER_DOC: limit = len(TARGET_DOC_IDS) * FILES_PER_DOC print(f"\n⚠️ 注意: ファイル数が上限({limit})を超えています。残りのファイルは同期されません。") if __name__ == "__main__": main_sync() ``` /Users/xxxxxxxxm1/python_scripts/watcher.py ``` import time import sys import os import unicodedata from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler # 同じフォルダにある sync_docs.py を読み込む設定 sys.path.append(os.path.dirname(os.path.abspath(__file__))) import sync_docs class ChangeHandler(FileSystemEventHandler): """変更を検知するクラス""" def __init__(self): self.last_modified = time.time() self.cooldown_seconds = 2 self.file_last_seen = {} def on_any_event(self, event): if event.is_directory: return src_path = event.src_path filename = os.path.basename(src_path) # 隠しファイルや自分自身は無視 if filename.startswith('.') or filename == 'watcher.py' or filename == 'sync_docs.py': return # 連射ガード(10秒) current_time = time.time() last_seen = self.file_last_seen.get(src_path, 0) if current_time - last_seen < 10: return self.file_last_seen[src_path] = current_time normalized_name = unicodedata.normalize('NFC', filename) if normalized_name in sync_docs.EXCLUDE_FILES: return print(f"👀 検知: {filename} が変更されました") self.last_modified = time.time() def run_sync_process(): """同期実行""" print("\n" + "="*30) print("🔄 Googleドキュメント同期プロセスを開始...") try: # 分割同期に対応したメイン関数を実行 sync_docs.main_sync() except Exception as e: print(f"❌ 重大なエラー: {e}") print("="*30 + "\n") print(f"監視中... 対象: {sync_docs.SOURCE_FOLDER_PATH}") if __name__ == "__main__": target_path = sync_docs.SOURCE_FOLDER_PATH if not os.path.exists(target_path): print(f"❌ エラー: 監視対象のフォルダが見つかりません: {target_path}") sys.exit(1) event_handler = ChangeHandler() observer = Observer() observer.schedule(event_handler, target_path, recursive=True) observer.start() print(f"🚀 監視を開始しました: {target_path}") print(f"プログラム場所: {os.getcwd()}") print("終了するには 'Ctrl + C' を押してください。") try: last_triggered = 0 while True: time.sleep(1) # 変更検知後に少し待ってから実行(cooldown) if event_handler.last_modified > last_triggered: if time.time() - event_handler.last_modified > event_handler.cooldown_seconds: run_sync_process() last_triggered = time.time() except KeyboardInterrupt: observer.stop() print("\n🛑 終了します") observer.join() ``` /Users/xxxxxxxxm1/Library/LaunchAgents/com.xxxxxxxx.docsupdater.plist ``` Label com.xxxxxxxx.docsupdater ProgramArguments /usr/bin/python3 -u /Users/xxxxxxxxm1/python_scripts/watcher.py WorkingDirectory /Users/xxxxxxxxm1/python_scripts RunAtLoad StandardOutPath /tmp/docsupdater.out StandardErrorPath /tmp/docsupdater.err ```