# [メタ情報] # 識別子: XXXXXXブログgem作成パイプライン_exe # 補足: # [/メタ情報] 要約: このテキスト群は、XXXXXX.comのブログ記事を自動的に取得、加工、そして複数のプラットフォームへ同期させる一連のシステムを記述しています。 メインの`update_all.sh`スクリプトは、WordPress APIから記事のID、タイトル、更新日、本文などを取得(`XXXXXX_full_auto.py`)し、不要なHTMLタグを除去した「ラベル付きブロック形式」の統合TXTデータ(`XXXXXX_clean_database.txt`)を作成(`docx_to_clean_csv.py`)後、Googleドライブへアップロードします。 さらに、`split_db.py`は、この統合TXTを更新日とIDでソートし、指定文字数ごとに最大10個のローカル分割ファイルとして出力します。また、`sync_blog_docs.py`は同様の処理を行いながら、ソートされた記事データを直接10個の指定されたGoogleドキュメントに上書き同期します。 これらの自動化処理は、macOSのLaunchAgent設定ファイル(`com.XXXXXX.updateall.plist`と`com.XXXXXX.syncblog.plist`)により、それぞれ毎日午前4時と午前5時に自動実行されるようにスケジュールされており、ブログ記事データの継続的な収集、整理、複数サービスへの反映を目的としています。 /Users/XXXXXX/python_scripts/update_all.sh update_all.sh ``` #!/bin/bash # ========================================== # 【設定】.env からパスを読み込む # ========================================== export $(grep -v '^#' /Users/XXXXXX/python_scripts/.env | xargs) DRIVE_PATH="$XXXXXX_BLOG_DRIVE_PATH" # ========================================== # 処理開始 # ========================================== cd /Users/XXXXXX/python_scripts echo "--- ブログ記事の取得 (XXXXXX_full_auto.py) ---" /usr/bin/python3 XXXXXX_full_auto.py echo "--- 統合TXTデータの作成 (docx_to_clean_csv.py) ---" /usr/bin/python3 docx_to_clean_csv.py echo "--- Googleドライブへのアップロード ---" # ★ ここを修正:csv ではなく txt に変更 DB_FILE="/Users/XXXXXX/XXXXXX_blog_data/XXXXXX_clean_database.txt" # フォルダがあるか確認してからコピー if [ -d "$DRIVE_PATH" ]; then cp "$DB_FILE" "$DRIVE_PATH/" echo "✅ 成功!Googleドライブに最新の統合TXTを配置しました。" echo "保存先: $DRIVE_PATH" else echo "❌ エラー: 指定されたGoogleドライブの場所が見つかりません。" echo "設定したパス: $DRIVE_PATH" fi echo "--- 全処理完了 ---" ``` /Users/XXXXXX/python_scripts/XXXXXX_full_auto.py XXXXXX_full_auto.py ``` import os import requests import re import time # ========================================== # 【修正・確実版】XXXXXX_full_auto.py # ========================================== SITE_URL = "https://XXXXXX.com" SAVE_DIR = os.path.expanduser('~/XXXXXX_blog_data/articles') def sync_articles_with_true_date(): api_url = f"{SITE_URL}/wp-json/wp/v2/posts" page = 1 total_synced = 0 if not os.path.exists(SAVE_DIR): os.makedirs(SAVE_DIR) print("🚀 サーバーから『真の更新日』を取得中...") while True: try: # サーバー負荷を抑えるため20件ずつ params = {'per_page': 20, 'page': page} response = requests.get(api_url, params=params, timeout=60) if response.status_code != 200: if response.status_code == 400: # ページがなくなったら終了 break print(f"\n⚠️ サーバーエラー: {response.status_code}") break except Exception as e: print(f"\n⚠️ 接続エラー: {e}") break posts = response.json() if not posts: break for post in posts: post_id = post['id'] # --- 新しい書き方(エラーが起きにくい方式) --- # modified の先頭10文字 (2026-05-12) を取って、ハイフンを点に変える true_update_date = post['modified'][:10].replace('-', '.') pub_date = post['date'][:10].replace('-', '.') # ------------------------------------------ title = post['title']['rendered'] safe_title = re.sub(r'[\\/:*?"<>|]', '_', title) # ファイル名は [ID-番号] 形式 filename = f"[ID-{post_id}] {safe_title}.txt" filepath = os.path.join(SAVE_DIR, filename) lines = [ f"タイトル: {title}", f"[ID-{post_id}]", f"更新日: {true_update_date}", f"公開日: {pub_date}", f"URL: {post['link']}", "-" * 30, post['content']['rendered'] ] with open(filepath, 'w', encoding='utf-8') as f: f.write("\n".join(lines)) total_synced += 1 print(f"✅ {page}ページ目(累計 {total_synced}件)を取得完了...", end='\r') page += 1 time.sleep(0.5) # サーバーをいたわる休憩 if total_synced > 0: print(f"\n\n✨ 成功!合計 {total_synced} 件のファイルを『正確な更新日』で同期しました。") else: print("\n\n🤔 取得できませんでした。") if __name__ == "__main__": sync_articles_with_true_date() ``` /Users/XXXXXX/python_scripts/check_dates.py ``` import requests from bs4 import BeautifulSoup import re import time # URLの末尾に「?t=現在時刻」をつけて、サーバーキャッシュを100%貫通します target_url = "https://XXXXXX.com/2026/04/18/28865/" buster_url = f"{target_url}?t={time.time()}" print(f"🧨 サーバーキャッシュ強制貫通テストを開始します...\n") print(f"アクセス先: {buster_url}\n") try: res = requests.get(buster_url, headers={'User-Agent': 'Mozilla/5.0'}, timeout=10) res.encoding = 'utf-8' soup = BeautifulSoup(res.text, 'html.parser') print(" [メタデータ(検索エンジン用)]") metas = soup.find_all('meta', property=re.compile(r'(published_time|modified_time)')) for m in metas: print(f" - {m.get('property')}: {m.get('content')}") except Exception as e: print(f" エラーが発生しました: {e}") ``` /Users/XXXXXX/python_scripts/docx_to_clean_csv.py docx_to_clean_csv.py 生成しているのはtxtファイル名なので後日、ファイル名を変える予定 ``` import os import re from datetime import datetime # --- 設定 --- SOURCE_DIR = '/Users/XXXXXX/XXXXXX_blog_data/articles' OUTPUT_FILE = '/Users/XXXXXX/XXXXXX_blog_data/XXXXXX_clean_database.txt' def main(): if not os.path.exists(SOURCE_DIR): print(f"❌ フォルダが見つかりません: {SOURCE_DIR}") return files = sorted([f for f in os.listdir(SOURCE_DIR) if f.endswith('.txt')]) print(f"🚀 {len(files)}件を「ラベル付きブロック形式」で統合中...") with open(OUTPUT_FILE, 'w', encoding='utf-8') as f_out: for filename in files: try: with open(os.path.join(SOURCE_DIR, filename), 'r', encoding='utf-8') as f_in: content = f_in.read() # --- 項目抽出(f-stringの外で計算することでエラーを回避) --- # ID抽出 id_match = re.search(r'\[ID-(.*?)\]', filename) id_val = id_match.group(1) if id_match else 'N/A' # タイトル抽出(ここで計算しておく) # 正規表現 \[ID-.*?\] を使ってファイル名からIDタグを消す clean_title = re.sub(r'\[ID-.*?\]', '', filename).replace('.txt', '').strip() # 更新日・公開日 u_match = re.search(r'更新日:\s*([\d\.]+)', content) u_date = u_match.group(1) if u_match else 'N/A' p_match = re.search(r'公開日:\s*([\d\.]+)', content) p_date = p_match.group(1) if p_match else 'N/A' # URL url_match = re.search(r'URL:\s*(https?://[^\s\n]+)', content) url_val = url_match.group(1) if url_match else 'N/A' # 本文抽出(線より後) if "------------------------------" in content: # 分割を1回に制限し、最初の区切り線より後ろをすべて取得する parts = content.split("------------------------------", 1) pure_body = parts[1].strip() if len(parts) > 1 else content else: pure_body = content # --- 新規追加: 不要なタグのクリーニング処理 --- # を中身ごと削除(大文字小文字区別なし、改行をまたぐ) pure_body = re.sub(r'.*?', '', pure_body, flags=re.DOTALL | re.IGNORECASE) # を中身ごと削除 pure_body = re.sub(r'.*?', '', pure_body, flags=re.DOTALL | re.IGNORECASE) # --- 書き出し(f-stringの中身をシンプルに) --- f_out.write("[[[ARTICLE_START]]]\n") f_out.write(f"ID:[ID-{id_val}]\n") f_out.write(f"TITLE:{clean_title}\n") f_out.write(f"UPDATE:{u_date}\n") f_out.write(f"POST:{p_date}\n") f_out.write(f"URL:{url_val}\n") f_out.write("BODY_START:\n") f_out.write(pure_body + "\n") f_out.write("[[[ARTICLE_END]]]\n\n") except Exception as e: print(f"⚠️ スキップ: {filename} ({e})") print(f"✅ 統合完了: {OUTPUT_FILE}") if __name__ == "__main__": main() ``` /Users/XXXXXX/python_scripts/split_db.py ``` import re import os # --- 設定 --- # 読み込む元ファイル(M1 Macのフルパスを指定) input_file = '/Users/XXXXXX/XXXXXX_blog_data/XXXXXX_clean_database.txt' # 出力するファイル名のベース(M1 Macのフルパスを指定) output_prefix = '/Users/XXXXXX/XXXXXX_blog_data/XXXXXX_DB_Part' # 1ファイルあたりの目安文字数(約26万文字に設定) chars_per_file = 260000 # 生成する最大ファイル数(公開スクリプトgemの仕様に統一) MAX_FILES = 10 def process_database(): print("データの読み込みと解析を開始します...") # 1. ファイルの読み込み try: with open(input_file, 'r', encoding='utf-8') as f: content = f.read() except FileNotFoundError: print(f"エラー: '{input_file}' が見つかりません。パスが正しいか確認してください。") return # 2. 記事ブロックの抽出 pattern = r'(\[\[\[ARTICLE_START\]\]\].*?\[\[\[ARTICLE_END\]\]\])' blocks = re.findall(pattern, content, re.DOTALL) articles = [] for block in blocks: update_match = re.search(r'UPDATE:\s*([0-9\.]+)', block) id_match = re.search(r'ID:\[.*?([0-9]+)\]', block) update_val = update_match.group(1) if update_match else "0000.00.00" id_val = int(id_match.group(1)) if id_match else 0 articles.append({ 'update': update_val, 'id': id_val, 'content': block }) # 3. ソート(並び替え) articles.sort(key=lambda x: (x['update'], x['id']), reverse=True) # 4. 目次番号の付与とファイル分割 current_file_index = 1 current_char_count = 0 current_file_content = [] for i, article in enumerate(articles): index_no = i + 1 index_str = f"INDEX: No.{index_no:03}\n" modified_content = re.sub(r'(ID:\[.*?\])', rf'{index_str}\1', article['content'], count=1) current_file_content.append(modified_content) current_char_count += len(modified_content) # 5. 文字数が目安を超え、かつMAX_FILES未満なら新しいファイルへ if current_char_count >= chars_per_file and current_file_index < MAX_FILES: write_file(current_file_index, current_file_content) current_file_index += 1 current_char_count = 0 current_file_content = [] # 6. 余った最後のブロックを書き出し if current_file_content: write_file(current_file_index, current_file_content) current_file_index += 1 # 7. MAX_FILES(10個)に満たない場合は、空のファイルを生成して埋める while current_file_index <= MAX_FILES: write_file(current_file_index, ["(このドキュメントに割り当てられるファイルはありません)"]) current_file_index += 1 print(f"\n🎉 完了しました!仕様通り、全 {MAX_FILES} 個のファイルを出力しました。") def write_file(index, content_list): filename = f"{output_prefix}{index}.txt" with open(filename, 'w', encoding='utf-8') as f: f.write("\n\n".join(content_list)) char_len = sum(len(c) for c in content_list) if content_list == ["(このドキュメントに割り当てられるファイルはありません)"]: print(f" -> {filename} を作成しました(※空ファイルとして処理)") else: print(f" -> {filename} を作成しました(約 {char_len:,} 文字)") if __name__ == "__main__": process_database() ``` /Users/XXXXXX/python_scripts/sync_blog_docs.py ``` import os import re import socket from googleapiclient.discovery import build from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import InstalledAppFlow # --- 設定エリア --- socket.setdefaulttimeout(600) SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) # 既存のAPI認証ファイルを再利用 CLIENT_SECRET_FILE = os.path.join(SCRIPT_DIR, 'credentials.json') TOKEN_FILE = os.path.join(SCRIPT_DIR, 'token.json') SCOPES = ['https://www.googleapis.com/auth/documents'] # 読み込む元データ INPUT_FILE = '/Users/XXXXXX/XXXXXX_blog_data/XXXXXX_clean_database.txt' CHARS_PER_FILE = 260000 # 1ドキュメントあたりの上限目安文字数 # ご提示いただいた10個のブログ用ドキュメントID TARGET_DOC_IDS = [ "[Part1のID]", # Part 1 "[Part2のID]", # Part 2 "[Part3のID]", # Part 3 "[Part4のID]", # Part 4 "[Part5のID]", # Part 5 "[Part6のID]", # Part 6 "[Part7のID]", # Part 7 "[Part8のID]", # Part 8 "[Part9のID]", # Part 9 "[Part10のID]" # Part 10 ] def get_credentials(): creds = None if os.path.exists(TOKEN_FILE): creds = Credentials.from_authorized_user_file(TOKEN_FILE, SCOPES) if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: print("🔄 認証トークンを更新中...") creds.refresh(Request()) else: print("🌍 ブラウザを開いてログインしてください...") flow = InstalledAppFlow.from_client_secrets_file(CLIENT_SECRET_FILE, SCOPES) creds = flow.run_local_server(port=0) with open(TOKEN_FILE, 'w') as token: token.write(creds.to_json()) return creds def update_google_doc(service, target_doc_id, text_content): short_id = target_doc_id[-5:] print(f" -> Google Docs APIに接続中... (ID: ...{short_id})") try: doc = service.documents().get(documentId=target_doc_id, fields="body(content(endIndex))").execute() content = doc.get('body').get('content') end_index = content[-1]['endIndex'] - 1 if content else 1 requests = [] if end_index > 1: requests.append({'deleteContentRange': {'range': {'startIndex': 1, 'endIndex': end_index}}}) requests.append({'insertText': {'location': {'index': 1}, 'text': text_content}}) service.documents().batchUpdate(documentId=target_doc_id, body={'requests': requests}).execute() print(f" ✅ 更新完了 (ID: ...{short_id})") except Exception as e: print(f" ❌ APIエラー (ID: ...{short_id}): {e}") def process_and_sync(): print("データの読み込みと解析を開始します...") try: with open(INPUT_FILE, 'r', encoding='utf-8') as f: content = f.read() except FileNotFoundError: print(f"エラー: '{INPUT_FILE}' が見つかりません。") return # 記事ブロックの抽出 pattern = r'(\[\[\[ARTICLE_START\]\]\].*?\[\[\[ARTICLE_END\]\]\])' blocks = re.findall(pattern, content, re.DOTALL) articles = [] for block in blocks: update_match = re.search(r'UPDATE:\s*([0-9\.]+)', block) id_match = re.search(r'ID:\[.*?([0-9]+)\]', block) articles.append({ 'update': update_match.group(1) if update_match else "0000.00.00", 'id': int(id_match.group(1)) if id_match else 0, 'content': block }) # ソート(UPDATE降順 > ID降順) articles.sort(key=lambda x: (x['update'], x['id']), reverse=True) # API接続準備 creds = get_credentials() service = build('docs', 'v1', credentials=creds) # 分割と書き込み current_doc_index = 0 current_buffer = [] current_char_count = 0 overall_count = 1 print("\nGoogleドキュメントへの同期を開始します...") for art in articles: index_str = f"INDEX: No.{overall_count:03}\n" modified_content = re.sub(r'(ID:\[.*?\])', rf'{index_str}\1', art['content'], count=1) overall_count += 1 if (current_char_count + len(modified_content) > CHARS_PER_FILE) and (current_doc_index < 9): text_to_write = "\n\n".join(current_buffer) print(f"\n--- Doc #{current_doc_index + 1} の更新 ---") update_google_doc(service, TARGET_DOC_IDS[current_doc_index], text_to_write) current_doc_index += 1 current_buffer = [] current_char_count = 0 current_buffer.append(modified_content) current_char_count += len(modified_content) # 残りのデータ書き込み print(f"\n--- Doc #{current_doc_index + 1} の更新 ---") update_google_doc(service, TARGET_DOC_IDS[current_doc_index], "\n\n".join(current_buffer) or "(このドキュメントに割り当てられるファイルはありません)") current_doc_index += 1 # 余った枠を空ファイルとして処理 for i in range(current_doc_index, 10): print(f"\n--- Doc #{i + 1} の更新 (空ファイル) ---") update_google_doc(service, TARGET_DOC_IDS[i], "(このドキュメントに割り当てられるファイルはありません)") print("\n🎉 すべての同期が完了しました!") if __name__ == "__main__": process_and_sync() ``` /Users/XXXXXX/Library/LaunchAgents/com.XXXXXX.updateall.plist ``` Label com.XXXXXX.updateall ProgramArguments /bin/bash /Users/XXXXXX/python_scripts/update_all.sh StartCalendarInterval Hour 4 Minute 0 StandardOutPath /Users/XXXXXX/Library/Logs/update_all.log StandardErrorPath /Users/XXXXXX/Library/Logs/update_all.err ``` /Users/XXXXXX/Library/LaunchAgents/com.XXXXXX.syncblog.plist ``` Label com.XXXXXX.syncblog ProgramArguments /usr/bin/python3 -u /Users/XXXXXX/python_scripts/sync_blog_docs.py StartCalendarInterval Hour 5 Minute 0 StandardOutPath /Users/XXXXXX/Library/Logs/sync_blog_docs.log StandardErrorPath /Users/XXXXXX/Library/Logs/sync_blog_docs.err ```