# [メタ情報] # 識別子: 備忘録のnotebookLMほか全自動同期パイプライン_exe # 補足: # [/メタ情報] 要約: このシステムは、ローカルの指定フォルダにあるテキストファイルをGoogleドキュメントへ自動同期するものです。主要なコンポーネントは以下の通りです。 1. `sync_note_sum.py`: ローカルフォルダ内の`.txt`ファイルを更新日時順に取得し、複数のGoogleドキュメントに目次付きで分割して同期(上書き)します。Google Docs APIを利用し、認証情報を管理しながら実行します。 2. `watcher_note_sum.py`: `sync_note_sum.py`と連携し、監視対象フォルダ内の`.txt`ファイルの変更をリアルタイムで検知します。変更後、5秒のクールダウン期間を経て`sync_note_sum.py`を自動実行し、Googleドキュメントの同期をトリガーします。これにより、ファイル更新時に自動で同期が行われます。 3. `com.XXXXXX.notesum-updater.plist`: macOSのLaunchAgents設定ファイルで、ユーザーログイン時に`watcher_note_sum.py`を自動的にバックグラウンドで起動・常駐させます。これにより、システムは常に稼働し、ローカルのメモをGoogleドキュメントに最新の状態に保つ自動化された環境を提供します。 /Users/XXXXXX/python_scripts/sync_note_sum.py ``` import os import os.path import unicodedata import socket from googleapiclient.discovery import build from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import InstalledAppFlow from dotenv import load_dotenv # デフォルトのタイムアウト設定(10分) socket.setdefaulttimeout(600) SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) # ================= 設定エリア ================= load_dotenv("/Users/XXXXXX/python_scripts/.env") SOURCE_FOLDER_PATH = "/Users/XXXXXX/note_sum/" CLIENT_SECRET_FILE = os.path.join(SCRIPT_DIR, 'credentials.json') TOKEN_FILE = os.path.join(SCRIPT_DIR, 'token.json') # 【設定】書き込み先のGoogleドキュメントIDリスト(.envから読み込み) TARGET_DOC_IDS = [ os.environ.get("NOTE_SUM_DOC_ID_1"), os.environ.get("NOTE_SUM_DOC_ID_2"), os.environ.get("NOTE_SUM_DOC_ID_3"), os.environ.get("NOTE_SUM_DOC_ID_4"), os.environ.get("NOTE_SUM_DOC_ID_5"), os.environ.get("NOTE_SUM_DOC_ID_6"), os.environ.get("NOTE_SUM_DOC_ID_7"), os.environ.get("NOTE_SUM_DOC_ID_8"), os.environ.get("NOTE_SUM_DOC_ID_9"), os.environ.get("NOTE_SUM_DOC_ID_10") ] # 1つのドキュメントに含めるファイル数 FILES_PER_DOC = 30 # 対象とする拡張子のリスト(.txt) TARGET_SUFFIXES = ['.txt'] # 除外ファイル EXCLUDE_FILES = ['.DS_Store'] EXCLUDE_DIRS = ['.git', '__pycache__', '.idea', 'venv', 'node_modules'] SCOPES = ['https://www.googleapis.com/auth/documents'] # ============================================ def get_sorted_files_list(root_folder): """ ディレクトリを探索し、対象ファイル(*.txt)の情報をリスト化して返す(更新日時降順) """ collected_files = [] skipped_files = 0 print(f"📂 探索開始: {root_folder}") if not os.path.exists(root_folder): print(f" [エラー] フォルダが存在しません: {root_folder}") return collected_files, skipped_files for root, dirs, files in os.walk(root_folder): dirs[:] = [d for d in dirs if d not in EXCLUDE_DIRS] for file_name in files: # 除外ファイルチェック if file_name in EXCLUDE_FILES: skipped_files += 1 continue # いずれかの対象サフィックスにマッチするかチェック if not any(file_name.endswith(suffix) for suffix in TARGET_SUFFIXES): continue file_path = os.path.join(root, file_name) relative_path = os.path.relpath(file_path, root_folder) normalized_path = unicodedata.normalize('NFC', relative_path) try: mtime = os.path.getmtime(file_path) with open(file_path, 'r', encoding='utf-8') as f: content = f.read() collected_files.append({ 'path': normalized_path, 'content': content, 'mtime': mtime }) except Exception as e: print(f" [エラー] 読み込み失敗: {relative_path} ({e})") # 更新日時(mtime)の新しい順(降順)に並び替え collected_files.sort(key=lambda x: x['mtime'], reverse=True) return collected_files, skipped_files def format_text_for_chunk(file_chunk, start_index): """ 分割されたファイルリストから、目次付きテキストを生成する """ if not file_chunk: return "" toc_lines = ["[目次]"] for i, file_data in enumerate(file_chunk, start_index): toc_lines.append(f"{i}. {file_data['path']}") toc_text = "\n".join(toc_lines) body_text_parts = [] separator_line = "=" * 80 for file_data in file_chunk: header = f"\n{separator_line}\n\n" header += f"=== {file_data['path']} ===\n\n" body_text_parts.append(header + file_data['content'] + "\n") return toc_text + "\n\n" + "".join(body_text_parts) def get_credentials(): creds = None if os.path.exists(TOKEN_FILE): creds = Credentials.from_authorized_user_file(TOKEN_FILE, SCOPES) if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: print("🔄 認証トークンを更新中...") creds.refresh(Request()) else: print("🌍 ブラウザを開いてログインしてください...") if not os.path.exists(CLIENT_SECRET_FILE): raise FileNotFoundError(f"エラー: {CLIENT_SECRET_FILE} が見つかりません。") flow = InstalledAppFlow.from_client_secrets_file(CLIENT_SECRET_FILE, SCOPES) creds = flow.run_local_server(port=0) with open(TOKEN_FILE, 'w') as token: token.write(creds.to_json()) return creds def update_google_doc(target_doc_id, text_content): """ 指定されたIDのドキュメントを更新する """ if not target_doc_id: print("⚠️ ドキュメントIDが設定されていないため、スキップします。") return short_id = target_doc_id[-5:] print(f"Google Docs APIに接続中... (ID: ...{short_id})") if not text_content: text_content = "(このドキュメントに割り当てられるファイルはありません)" try: creds = get_credentials() service = build('docs', 'v1', credentials=creds) doc = service.documents().get( documentId=target_doc_id, fields="body(content(endIndex))" ).execute() content = doc.get('body').get('content') if not content: end_index = 1 else: end_index = content[-1]['endIndex'] - 1 requests = [] if end_index > 1: requests.append({'deleteContentRange': {'range': {'startIndex': 1, 'endIndex': end_index}}}) requests.append({'insertText': {'location': {'index': 1}, 'text': text_content}}) service.documents().batchUpdate(documentId=target_doc_id, body={'requests': requests}).execute() print(f"✅ 更新完了 (ID: ...{short_id})") except Exception as e: print(f"❌ APIエラー (ID: ...{short_id}): {e}") def main_sync(): """メイン同期処理""" all_files, skipped = get_sorted_files_list(SOURCE_FOLDER_PATH) total_files = len(all_files) print("-" * 30) print(f"読み込み成功: {total_files} ファイル") print(f"除外スキップ: {skipped} ファイル") for i, doc_id in enumerate(TARGET_DOC_IDS): start_idx = i * FILES_PER_DOC end_idx = start_idx + FILES_PER_DOC chunk_files = all_files[start_idx:end_idx] toc_start_num = start_idx + 1 file_count_in_chunk = len(chunk_files) if file_count_in_chunk > 0: current_range = f"{toc_start_num} - {start_idx + file_count_in_chunk}" else: current_range = "データなし・待機中" print(f"\n--- Doc #{i+1} (Files {current_range}) ---") chunk_text = format_text_for_chunk(chunk_files, toc_start_num) update_google_doc(doc_id, chunk_text) if total_files > len(TARGET_DOC_IDS) * FILES_PER_DOC: limit = len(TARGET_DOC_IDS) * FILES_PER_DOC print(f"\n⚠️ 注意: ファイル数が上限({limit})を超えています。残りのファイルは同期されません。") if __name__ == "__main__": main_sync() ``` /Users/XXXXXX/python_scripts/watcher_note_sum.py ``` import time import sys import os import unicodedata from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler sys.path.append(os.path.dirname(os.path.abspath(__file__))) import sync_note_sum class ChangeHandler(FileSystemEventHandler): """変更を検知するクラス""" def __init__(self): self.last_modified = time.time() # 確実にエディタの保存(ディスク書き込み)が完了するよう、待機を5秒に設定 self.cooldown_seconds = 5 self.file_last_seen = {} def process_event(self, src_path): """イベントの共通処理""" filename = os.path.basename(src_path) # 隠しファイルやスクリプト自身は無視 if filename.startswith('.') or filename in ['watcher_note_sum.py', 'sync_note_sum.py']: return normalized_name = unicodedata.normalize('NFC', filename) if normalized_name in sync_note_sum.EXCLUDE_FILES: return # 対象のサフィックス(.txt)を持っている場合のみトリガー if any(normalized_name.endswith(suffix) for suffix in sync_note_sum.TARGET_SUFFIXES): current_time = time.time() last_seen = self.file_last_seen.get(src_path, 0) # 連射ガード(10秒) if current_time - last_seen < 10: return self.file_last_seen[src_path] = current_time print(f"👀 検知: {filename} の変更を捕捉しました(5秒後に同期を開始します)") self.last_modified = time.time() def on_modified(self, event): if not event.is_directory: self.process_event(event.src_path) def on_created(self, event): if not event.is_directory: self.process_event(event.src_path) def on_moved(self, event): if not event.is_directory: self.process_event(event.dest_path) def run_sync_process(): """同期実行""" print("\n" + "="*30) print("🔄 Googleドキュメント同期プロセスを開始...") try: # スクリプトのキャッシュをクリアして確実に最新のファイルを読み込ませる import importlib importlib.reload(sync_note_sum) sync_note_sum.main_sync() except Exception as e: print(f"❌ 重大エラー: {e}") print("="*30 + "\n") print(f"監視中... 対象: {sync_note_sum.SOURCE_FOLDER_PATH}") if __name__ == "__main__": target_path = sync_note_sum.SOURCE_FOLDER_PATH if not os.path.exists(target_path): print(f"❌ エラー: 監視対象のフォルダが見つかりません: {target_path}") sys.exit(1) event_handler = ChangeHandler() observer = Observer() observer.schedule(event_handler, target_path, recursive=True) observer.start() print(f"🚀 監視を開始しました: {target_path}") print("終了するには 'Ctrl + C' を押してください。") try: last_triggered = 0 while True: time.sleep(1) if event_handler.last_modified > last_triggered: # 最後に変更を検知してから指定秒数(5秒)以上経過したら実行 if time.time() - event_handler.last_modified > event_handler.cooldown_seconds: run_sync_process() last_triggered = time.time() except KeyboardInterrupt: observer.stop() print("\n🛑 終了します") observer.join() ``` /Users/XXXXXX/Library/LaunchAgents/com.XXXXXX.notesum-updater.plist ``` Label com.XXXXXX.notesum-updater ProgramArguments /usr/bin/python3 -u /Users/XXXXXX/python_scripts/watcher_note_sum.py WorkingDirectory /Users/XXXXXX/python_scripts RunAtLoad StandardOutPath /tmp/notesum-updater.out StandardErrorPath /tmp/notesum-updater.err ```