# [メタ情報]
# 識別子: XXXXXXブログgem作成パイプライン_exe
# 補足:
# [/メタ情報]
要約:
このテキスト群は、XXXXXX.comのブログ記事を自動的に取得、加工、そして複数のプラットフォームへ同期させる一連のシステムを記述しています。
メインの`update_all.sh`スクリプトは、WordPress APIから記事のID、タイトル、更新日、本文などを取得(`XXXXXX_full_auto.py`)し、不要なHTMLタグを除去した「ラベル付きブロック形式」の統合TXTデータ(`XXXXXX_clean_database.txt`)を作成(`docx_to_clean_csv.py`)後、Googleドライブへアップロードします。
さらに、`split_db.py`は、この統合TXTを更新日とIDでソートし、指定文字数ごとに最大10個のローカル分割ファイルとして出力します。また、`sync_blog_docs.py`は同様の処理を行いながら、ソートされた記事データを直接10個の指定されたGoogleドキュメントに上書き同期します。
これらの自動化処理は、macOSのLaunchAgent設定ファイル(`com.XXXXXX.updateall.plist`と`com.XXXXXX.syncblog.plist`)により、それぞれ毎日午前4時と午前5時に自動実行されるようにスケジュールされており、ブログ記事データの継続的な収集、整理、複数サービスへの反映を目的としています。
/Users/XXXXXX/python_scripts/update_all.sh
update_all.sh
```
#!/bin/bash
# ==========================================
# 【設定】.env からパスを読み込む
# ==========================================
export $(grep -v '^#' /Users/XXXXXX/python_scripts/.env | xargs)
DRIVE_PATH="$XXXXXX_BLOG_DRIVE_PATH"
# ==========================================
# 処理開始
# ==========================================
cd /Users/XXXXXX/python_scripts
echo "--- ブログ記事の取得 (XXXXXX_full_auto.py) ---"
/usr/bin/python3 XXXXXX_full_auto.py
echo "--- 統合TXTデータの作成 (docx_to_clean_csv.py) ---"
/usr/bin/python3 docx_to_clean_csv.py
echo "--- Googleドライブへのアップロード ---"
# ★ ここを修正:csv ではなく txt に変更
DB_FILE="/Users/XXXXXX/XXXXXX_blog_data/XXXXXX_clean_database.txt"
# フォルダがあるか確認してからコピー
if [ -d "$DRIVE_PATH" ]; then
cp "$DB_FILE" "$DRIVE_PATH/"
echo "✅ 成功!Googleドライブに最新の統合TXTを配置しました。"
echo "保存先: $DRIVE_PATH"
else
echo "❌ エラー: 指定されたGoogleドライブの場所が見つかりません。"
echo "設定したパス: $DRIVE_PATH"
fi
echo "--- 全処理完了 ---"
```
/Users/XXXXXX/python_scripts/XXXXXX_full_auto.py
XXXXXX_full_auto.py
```
import os
import requests
import re
import time
# ==========================================
# 【修正・確実版】XXXXXX_full_auto.py
# ==========================================
SITE_URL = "https://XXXXXX.com"
SAVE_DIR = os.path.expanduser('~/XXXXXX_blog_data/articles')
def sync_articles_with_true_date():
api_url = f"{SITE_URL}/wp-json/wp/v2/posts"
page = 1
total_synced = 0
if not os.path.exists(SAVE_DIR):
os.makedirs(SAVE_DIR)
print("🚀 サーバーから『真の更新日』を取得中...")
while True:
try:
# サーバー負荷を抑えるため20件ずつ
params = {'per_page': 20, 'page': page}
response = requests.get(api_url, params=params, timeout=60)
if response.status_code != 200:
if response.status_code == 400: # ページがなくなったら終了
break
print(f"\n⚠️ サーバーエラー: {response.status_code}")
break
except Exception as e:
print(f"\n⚠️ 接続エラー: {e}")
break
posts = response.json()
if not posts:
break
for post in posts:
post_id = post['id']
# --- 新しい書き方(エラーが起きにくい方式) ---
# modified の先頭10文字 (2026-05-12) を取って、ハイフンを点に変える
true_update_date = post['modified'][:10].replace('-', '.')
pub_date = post['date'][:10].replace('-', '.')
# ------------------------------------------
title = post['title']['rendered']
safe_title = re.sub(r'[\\/:*?"<>|]', '_', title)
# ファイル名は [ID-番号] 形式
filename = f"[ID-{post_id}] {safe_title}.txt"
filepath = os.path.join(SAVE_DIR, filename)
lines = [
f"タイトル: {title}",
f"[ID-{post_id}]",
f"更新日: {true_update_date}",
f"公開日: {pub_date}",
f"URL: {post['link']}",
"-" * 30,
post['content']['rendered']
]
with open(filepath, 'w', encoding='utf-8') as f:
f.write("\n".join(lines))
total_synced += 1
print(f"✅ {page}ページ目(累計 {total_synced}件)を取得完了...", end='\r')
page += 1
time.sleep(0.5) # サーバーをいたわる休憩
if total_synced > 0:
print(f"\n\n✨ 成功!合計 {total_synced} 件のファイルを『正確な更新日』で同期しました。")
else:
print("\n\n🤔 取得できませんでした。")
if __name__ == "__main__":
sync_articles_with_true_date()
```
/Users/XXXXXX/python_scripts/check_dates.py
```
import requests
from bs4 import BeautifulSoup
import re
import time
# URLの末尾に「?t=現在時刻」をつけて、サーバーキャッシュを100%貫通します
target_url = "https://XXXXXX.com/2026/04/18/28865/"
buster_url = f"{target_url}?t={time.time()}"
print(f"🧨 サーバーキャッシュ強制貫通テストを開始します...\n")
print(f"アクセス先: {buster_url}\n")
try:
res = requests.get(buster_url, headers={'User-Agent': 'Mozilla/5.0'}, timeout=10)
res.encoding = 'utf-8'
soup = BeautifulSoup(res.text, 'html.parser')
print(" [メタデータ(検索エンジン用)]")
metas = soup.find_all('meta', property=re.compile(r'(published_time|modified_time)'))
for m in metas:
print(f" - {m.get('property')}: {m.get('content')}")
except Exception as e:
print(f" エラーが発生しました: {e}")
```
/Users/XXXXXX/python_scripts/docx_to_clean_csv.py
docx_to_clean_csv.py
生成しているのはtxtファイル名なので後日、ファイル名を変える予定
```
import os
import re
from datetime import datetime
# --- 設定 ---
SOURCE_DIR = '/Users/XXXXXX/XXXXXX_blog_data/articles'
OUTPUT_FILE = '/Users/XXXXXX/XXXXXX_blog_data/XXXXXX_clean_database.txt'
def main():
if not os.path.exists(SOURCE_DIR):
print(f"❌ フォルダが見つかりません: {SOURCE_DIR}")
return
files = sorted([f for f in os.listdir(SOURCE_DIR) if f.endswith('.txt')])
print(f"🚀 {len(files)}件を「ラベル付きブロック形式」で統合中...")
with open(OUTPUT_FILE, 'w', encoding='utf-8') as f_out:
for filename in files:
try:
with open(os.path.join(SOURCE_DIR, filename), 'r', encoding='utf-8') as f_in:
content = f_in.read()
# --- 項目抽出(f-stringの外で計算することでエラーを回避) ---
# ID抽出
id_match = re.search(r'\[ID-(.*?)\]', filename)
id_val = id_match.group(1) if id_match else 'N/A'
# タイトル抽出(ここで計算しておく)
# 正規表現 \[ID-.*?\] を使ってファイル名からIDタグを消す
clean_title = re.sub(r'\[ID-.*?\]', '', filename).replace('.txt', '').strip()
# 更新日・公開日
u_match = re.search(r'更新日:\s*([\d\.]+)', content)
u_date = u_match.group(1) if u_match else 'N/A'
p_match = re.search(r'公開日:\s*([\d\.]+)', content)
p_date = p_match.group(1) if p_match else 'N/A'
# URL
url_match = re.search(r'URL:\s*(https?://[^\s\n]+)', content)
url_val = url_match.group(1) if url_match else 'N/A'
# 本文抽出(線より後)
if "------------------------------" in content:
# 分割を1回に制限し、最初の区切り線より後ろをすべて取得する
parts = content.split("------------------------------", 1)
pure_body = parts[1].strip() if len(parts) > 1 else content
else:
pure_body = content
# --- 新規追加: 不要なタグのクリーニング処理 ---
# を中身ごと削除(大文字小文字区別なし、改行をまたぐ)
pure_body = re.sub(r'.*?', '', pure_body, flags=re.DOTALL | re.IGNORECASE)
# を中身ごと削除
pure_body = re.sub(r'.*?', '', pure_body, flags=re.DOTALL | re.IGNORECASE)
# --- 書き出し(f-stringの中身をシンプルに) ---
f_out.write("[[[ARTICLE_START]]]\n")
f_out.write(f"ID:[ID-{id_val}]\n")
f_out.write(f"TITLE:{clean_title}\n")
f_out.write(f"UPDATE:{u_date}\n")
f_out.write(f"POST:{p_date}\n")
f_out.write(f"URL:{url_val}\n")
f_out.write("BODY_START:\n")
f_out.write(pure_body + "\n")
f_out.write("[[[ARTICLE_END]]]\n\n")
except Exception as e:
print(f"⚠️ スキップ: {filename} ({e})")
print(f"✅ 統合完了: {OUTPUT_FILE}")
if __name__ == "__main__":
main()
```
/Users/XXXXXX/python_scripts/split_db.py
```
import re
import os
# --- 設定 ---
# 読み込む元ファイル(M1 Macのフルパスを指定)
input_file = '/Users/XXXXXX/XXXXXX_blog_data/XXXXXX_clean_database.txt'
# 出力するファイル名のベース(M1 Macのフルパスを指定)
output_prefix = '/Users/XXXXXX/XXXXXX_blog_data/XXXXXX_DB_Part'
# 1ファイルあたりの目安文字数(約26万文字に設定)
chars_per_file = 260000
# 生成する最大ファイル数(公開スクリプトgemの仕様に統一)
MAX_FILES = 10
def process_database():
print("データの読み込みと解析を開始します...")
# 1. ファイルの読み込み
try:
with open(input_file, 'r', encoding='utf-8') as f:
content = f.read()
except FileNotFoundError:
print(f"エラー: '{input_file}' が見つかりません。パスが正しいか確認してください。")
return
# 2. 記事ブロックの抽出
pattern = r'(\[\[\[ARTICLE_START\]\]\].*?\[\[\[ARTICLE_END\]\]\])'
blocks = re.findall(pattern, content, re.DOTALL)
articles = []
for block in blocks:
update_match = re.search(r'UPDATE:\s*([0-9\.]+)', block)
id_match = re.search(r'ID:\[.*?([0-9]+)\]', block)
update_val = update_match.group(1) if update_match else "0000.00.00"
id_val = int(id_match.group(1)) if id_match else 0
articles.append({
'update': update_val,
'id': id_val,
'content': block
})
# 3. ソート(並び替え)
articles.sort(key=lambda x: (x['update'], x['id']), reverse=True)
# 4. 目次番号の付与とファイル分割
current_file_index = 1
current_char_count = 0
current_file_content = []
for i, article in enumerate(articles):
index_no = i + 1
index_str = f"INDEX: No.{index_no:03}\n"
modified_content = re.sub(r'(ID:\[.*?\])', rf'{index_str}\1', article['content'], count=1)
current_file_content.append(modified_content)
current_char_count += len(modified_content)
# 5. 文字数が目安を超え、かつMAX_FILES未満なら新しいファイルへ
if current_char_count >= chars_per_file and current_file_index < MAX_FILES:
write_file(current_file_index, current_file_content)
current_file_index += 1
current_char_count = 0
current_file_content = []
# 6. 余った最後のブロックを書き出し
if current_file_content:
write_file(current_file_index, current_file_content)
current_file_index += 1
# 7. MAX_FILES(10個)に満たない場合は、空のファイルを生成して埋める
while current_file_index <= MAX_FILES:
write_file(current_file_index, ["(このドキュメントに割り当てられるファイルはありません)"])
current_file_index += 1
print(f"\n🎉 完了しました!仕様通り、全 {MAX_FILES} 個のファイルを出力しました。")
def write_file(index, content_list):
filename = f"{output_prefix}{index}.txt"
with open(filename, 'w', encoding='utf-8') as f:
f.write("\n\n".join(content_list))
char_len = sum(len(c) for c in content_list)
if content_list == ["(このドキュメントに割り当てられるファイルはありません)"]:
print(f" -> {filename} を作成しました(※空ファイルとして処理)")
else:
print(f" -> {filename} を作成しました(約 {char_len:,} 文字)")
if __name__ == "__main__":
process_database()
```
/Users/XXXXXX/python_scripts/sync_blog_docs.py
```
import os
import re
import socket
from googleapiclient.discovery import build
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
# --- 設定エリア ---
socket.setdefaulttimeout(600)
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
# 既存のAPI認証ファイルを再利用
CLIENT_SECRET_FILE = os.path.join(SCRIPT_DIR, 'credentials.json')
TOKEN_FILE = os.path.join(SCRIPT_DIR, 'token.json')
SCOPES = ['https://www.googleapis.com/auth/documents']
# 読み込む元データ
INPUT_FILE = '/Users/XXXXXX/XXXXXX_blog_data/XXXXXX_clean_database.txt'
CHARS_PER_FILE = 260000 # 1ドキュメントあたりの上限目安文字数
# ご提示いただいた10個のブログ用ドキュメントID
TARGET_DOC_IDS = [
"[Part1のID]", # Part 1
"[Part2のID]", # Part 2
"[Part3のID]", # Part 3
"[Part4のID]", # Part 4
"[Part5のID]", # Part 5
"[Part6のID]", # Part 6
"[Part7のID]", # Part 7
"[Part8のID]", # Part 8
"[Part9のID]", # Part 9
"[Part10のID]" # Part 10
]
def get_credentials():
creds = None
if os.path.exists(TOKEN_FILE):
creds = Credentials.from_authorized_user_file(TOKEN_FILE, SCOPES)
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
print("🔄 認証トークンを更新中...")
creds.refresh(Request())
else:
print("🌍 ブラウザを開いてログインしてください...")
flow = InstalledAppFlow.from_client_secrets_file(CLIENT_SECRET_FILE, SCOPES)
creds = flow.run_local_server(port=0)
with open(TOKEN_FILE, 'w') as token:
token.write(creds.to_json())
return creds
def update_google_doc(service, target_doc_id, text_content):
short_id = target_doc_id[-5:]
print(f" -> Google Docs APIに接続中... (ID: ...{short_id})")
try:
doc = service.documents().get(documentId=target_doc_id, fields="body(content(endIndex))").execute()
content = doc.get('body').get('content')
end_index = content[-1]['endIndex'] - 1 if content else 1
requests = []
if end_index > 1:
requests.append({'deleteContentRange': {'range': {'startIndex': 1, 'endIndex': end_index}}})
requests.append({'insertText': {'location': {'index': 1}, 'text': text_content}})
service.documents().batchUpdate(documentId=target_doc_id, body={'requests': requests}).execute()
print(f" ✅ 更新完了 (ID: ...{short_id})")
except Exception as e:
print(f" ❌ APIエラー (ID: ...{short_id}): {e}")
def process_and_sync():
print("データの読み込みと解析を開始します...")
try:
with open(INPUT_FILE, 'r', encoding='utf-8') as f:
content = f.read()
except FileNotFoundError:
print(f"エラー: '{INPUT_FILE}' が見つかりません。")
return
# 記事ブロックの抽出
pattern = r'(\[\[\[ARTICLE_START\]\]\].*?\[\[\[ARTICLE_END\]\]\])'
blocks = re.findall(pattern, content, re.DOTALL)
articles = []
for block in blocks:
update_match = re.search(r'UPDATE:\s*([0-9\.]+)', block)
id_match = re.search(r'ID:\[.*?([0-9]+)\]', block)
articles.append({
'update': update_match.group(1) if update_match else "0000.00.00",
'id': int(id_match.group(1)) if id_match else 0,
'content': block
})
# ソート(UPDATE降順 > ID降順)
articles.sort(key=lambda x: (x['update'], x['id']), reverse=True)
# API接続準備
creds = get_credentials()
service = build('docs', 'v1', credentials=creds)
# 分割と書き込み
current_doc_index = 0
current_buffer = []
current_char_count = 0
overall_count = 1
print("\nGoogleドキュメントへの同期を開始します...")
for art in articles:
index_str = f"INDEX: No.{overall_count:03}\n"
modified_content = re.sub(r'(ID:\[.*?\])', rf'{index_str}\1', art['content'], count=1)
overall_count += 1
if (current_char_count + len(modified_content) > CHARS_PER_FILE) and (current_doc_index < 9):
text_to_write = "\n\n".join(current_buffer)
print(f"\n--- Doc #{current_doc_index + 1} の更新 ---")
update_google_doc(service, TARGET_DOC_IDS[current_doc_index], text_to_write)
current_doc_index += 1
current_buffer = []
current_char_count = 0
current_buffer.append(modified_content)
current_char_count += len(modified_content)
# 残りのデータ書き込み
print(f"\n--- Doc #{current_doc_index + 1} の更新 ---")
update_google_doc(service, TARGET_DOC_IDS[current_doc_index], "\n\n".join(current_buffer) or "(このドキュメントに割り当てられるファイルはありません)")
current_doc_index += 1
# 余った枠を空ファイルとして処理
for i in range(current_doc_index, 10):
print(f"\n--- Doc #{i + 1} の更新 (空ファイル) ---")
update_google_doc(service, TARGET_DOC_IDS[i], "(このドキュメントに割り当てられるファイルはありません)")
print("\n🎉 すべての同期が完了しました!")
if __name__ == "__main__":
process_and_sync()
```
/Users/XXXXXX/Library/LaunchAgents/com.XXXXXX.updateall.plist
```
Label
com.XXXXXX.updateall
ProgramArguments
/bin/bash
/Users/XXXXXX/python_scripts/update_all.sh
StartCalendarInterval
Hour
4
Minute
0
StandardOutPath
/Users/XXXXXX/Library/Logs/update_all.log
StandardErrorPath
/Users/XXXXXX/Library/Logs/update_all.err
```
/Users/XXXXXX/Library/LaunchAgents/com.XXXXXX.syncblog.plist
```
Label
com.XXXXXX.syncblog
ProgramArguments
/usr/bin/python3
-u
/Users/XXXXXX/python_scripts/sync_blog_docs.py
StartCalendarInterval
Hour
5
Minute
0
StandardOutPath
/Users/XXXXXX/Library/Logs/sync_blog_docs.log
StandardErrorPath
/Users/XXXXXX/Library/Logs/sync_blog_docs.err
```