# [メタ情報]# 識別子: XXXXXXブログ全集自動化_exe# システム名: 未分類# 技術種別: Misc# 機能名: Misc# 使用言語: Python ShellScript# 状態: 実行用# [/メタ情報] 要約:このテキストは、cronジョブで毎日自動実行されるブログ記事データ収集・処理システムについて記述しています。主要なプロセスは`update_all.sh`シェルスクリプトによって制御されます。まず、`XXXXXX_full_auto.py`が起動し、指定されたサイトマップから全てのブログ記事URLを探索。各記事のタイトル、URL、本文などを抽出し、DOCX形式でローカルフォルダに保存します。既存ファイルは最新版に更新されます。次に、`docx_to_clean_csv.py`が実行され、これらのDOCXファイルから記事ID、タイトル、取得日、URL、本文、クリーンアップされたトランスクリプトといった情報を抽出し、一つの整形されたCSVファイルとして出力します。CSVの先頭にはシステムの更新日時も記録されます。最終的に、`update_all.sh`は生成されたCSVファイルを、`.env` で設定したGoogleドライブの指定パスへ自動的にアップロードします。このシステムは、ブログ記事データを定期的に自動収集し、加工してGoogleドライブへ同期する、一貫したワークフローを提供します。 Macの「プライバシーとセキュリティ」フルディスクアクセスでcronをオンにすること。 XXXXXX@yyyyyyyy-macminim1 ~ % crontab -l 0 4 * * * /usr/bin/python3 /Users/XXXXXX/python_scripts/XXXXXX_full_auto.py XXXXXX@yyyyyyyy-macminim1 ~ % XXXXXX@yyyyyyyy-macminim1 ~ % echo "30 4 * * * /bin/sh /Users/XXXXXX/python_scripts/update_all.sh" | crontab - XXXXXX@yyyyyyyy-macminim1 ~ % /Users/XXXXXX/python_scripts/update_all.sh update_all.sh ``` #!/bin/bash # ========================================== # 【設定】.env からパスを読み込む # ========================================== # XXXXXX_BLOG_DRIVE_PATH を /Users/XXXXXX/python_scripts/.env から取得 export $(grep -v '^#' /Users/XXXXXX/python_scripts/.env | xargs) DRIVE_PATH="$XXXXXX_BLOG_DRIVE_PATH" # ========================================== # 処理開始 # ========================================== # ★ cron対策:cd も絶対パスにする cd /Users/XXXXXX/python_scripts echo "--- [1] ブログ記事の取得 (XXXXXX_full_auto.py) ---" # ★ cron対策:python3 も絶対パスにする /usr/bin/python3 XXXXXX_full_auto.py echo "--- [2] CSVデータの作成 (docx_to_clean_csv.py) ---" /usr/bin/python3 docx_to_clean_csv.py echo "--- [3] Googleドライブへのアップロード ---" # ★ cron対策:$HOME ではなく絶対パスにする CSV_FILE="/Users/XXXXXX/XXXXXX_blog_data/XXXXXX_clean_database.csv" # フォルダがあるか確認してからコピー if [ -d "$DRIVE_PATH" ]; then cp "$CSV_FILE" "$DRIVE_PATH/" echo "✅ 成功!Googleドライブに最新版を配置しました。" echo "保存先: $DRIVE_PATH" else echo "❌ エラー: 指定されたGoogleドライブの場所が見つかりません。" echo "設定したパス: $DRIVE_PATH" fi echo "--- 全処理完了 ---" ``` /Users/XXXXXX/python_scripts/XXXXXX_full_auto.py XXXXXX_full_auto.py ``` import os import requests import re from datetime import datetime from bs4 import BeautifulSoup from docx import Document # ========================================== # 設定エリア # ========================================== BASE_DIR = os.path.expanduser('~/XXXXXX_blog_data') DOCX_DIR = os.path.join(BASE_DIR, 'docx') # 探索の入り口(親玉サイトマップ) # ※ここからリンクされているものは、ファイル名やドメインを問わず全て見に行きます ROOT_SITEMAP = 'https://XXXXXX.com/sitemap.xml' def setup_folders(): if not os.path.exists(DOCX_DIR): os.makedirs(DOCX_DIR) def sanitize_filename(title): # ファイル名に使えない文字を消す return "".join([c for c in title if c.isalnum() or c in (' ', '-', '_', '【', '】', 'id', 'ID', '(', ')')]).strip() def fetch_all_urls(target_url, visited=None): """ サイトマップ(XML)を読み込み、中にあるタグを全て回収する。 .xmlならさらに潜る(再帰)。それ以外なら記事として扱う。 """ if visited is None: visited = set() if target_url in visited: return set() visited.add(target_url) urls = set() print(f"📡 読込中: {target_url}") try: headers = {'User-Agent': 'Mozilla/5.0'} response = requests.get(target_url, headers=headers, timeout=15) if response.status_code != 200: print(f" ❌ アクセス失敗 ({response.status_code})") return set() # 正規表現で の中身を無条件に引っこ抜く # ドメインチェックも /entry/ チェックもしない(どんなURLでも拾う) raw_locs = re.findall(r'\s*(.*?)\s*', response.text, re.DOTALL) for loc in raw_locs: loc = loc.strip() # 除外条件(画像や動画のサイトマップは無視) if 'image' in loc or 'video' in loc: continue # 子サイトマップ(.xml)なら、中に入ってさらに探す if loc.endswith('.xml'): print(f" ↳ 子マップへ移動: {loc.split('/')[-1]}") sub_urls = fetch_all_urls(loc, visited) urls.update(sub_urls) else: # それ以外は「記事」とみなしてリストに入れる urls.add(loc) except Exception as e: print(f" ⚠️ エラー: {e}") return urls def save_as_docx(url): try: # 記事IDをURLから無理やり作る(最後のスラッシュより後ろを使う) # 例: .../entry/12345 -> 12345 # 例: .../2024/02/07/hello -> hello clean_url = url.rstrip('/') article_id = clean_url.split('/')[-1] # 【修正版】ID生成ルールの変更 # IDが短すぎる(3桁未満)、あるいは数字だけで短い(5桁未満)場合は、もう一つ前もくっつける # 5桁以上の数字(22551など)はユニークなIDとみなして、そのまま採用する(余計な日付をつけない) if len(article_id) < 3 or (article_id.isdigit() and len(article_id) < 5): parts = clean_url.split('/') if len(parts) > 2: article_id = f"{parts[-2]}-{parts[-1]}" # 既存チェック&更新対応 # 同じIDを持つファイルが既に存在する場合は、タイトルが変わっている可能性があるため削除する existing_files = [f for f in os.listdir(DOCX_DIR) if f"ID-{article_id}】" in f] for old_file in existing_files: try: os.remove(os.path.join(DOCX_DIR, old_file)) # print(f"♻️ 更新のため旧ファイルを削除: {old_file}") except Exception as e: print(f"⚠️ 旧ファイル削除エラー: {e}") # --- 新規ダウンロード --- response = requests.get(url, timeout=15) soup = BeautifulSoup(response.content, 'html.parser') # タイトル取得(h1を優先、なければtitleタグ) title_tag = soup.find('h1') if not title_tag: title_tag = soup.find('title') title = title_tag.get_text(strip=True) if title_tag else article_id # ファイル名作成 filename = f"{sanitize_filename(title)}【ID-{article_id}】.docx" filepath = os.path.join(DOCX_DIR, filename) doc = Document() doc.add_heading(title, 0) doc.add_paragraph(f"記事ID: {article_id}") doc.add_paragraph(f"記事URL: {url}") doc.add_paragraph(f"取得日: {datetime.now().strftime('%Y-%m-%d')}") doc.add_paragraph("---") # 本文取得(entry-contentがあればそこ、なければbody全体からpタグを拾う) # ※これでどんなブログサービスでもだいたい本文が取れます content_div = soup.find('div', class_='entry-content') # Hatena/WordPress if not content_div: content_div = soup.find('article') # HTML5 if not content_div: content_div = soup.find('main') # HTML5 if not content_div: content_div = soup.find('body') # 最終手段 if content_div: for p in content_div.find_all(['p', 'h2', 'h3', 'li']): text = p.get_text(strip=True) # メニューやフッターのノイズを除去(簡易的) if len(text) > 10: doc.add_paragraph(text) doc.save(filepath) print(f"🆕 保存: {filename}") return True except Exception as e: print(f"エラー ({url}): {e}") return False def main(): setup_folders() print("--- サイトマップ全探索モード ---") # 全URL収集 urls = sorted(list(fetch_all_urls(ROOT_SITEMAP))) print(f"\n🔍 検出されたURL数: {len(urls)} 件") if len(urls) == 0: print("⚠️ 警告: URLが1つも見つかりません。sitemap.xmlの中身が空の可能性があります。") return # 保存処理 new_count = 0 print("ライブラリと比較中...") for i, url in enumerate(urls): if save_as_docx(url): new_count += 1 if (i+1) % 50 == 0: print(f"... {i+1} / {len(urls)} 完了") print(f"\n✅ 結果: 新規保存 {new_count} 件") print("--- 完了 ---") if __name__ == "__main__": main() ``` /Users/XXXXXX/python_scripts/docx_to_clean_csv.py docx_to_clean_csv.py ``` import os import csv import re from docx import Document from datetime import datetime # 時計機能を追加 # ========================================== # 設定エリア # ========================================== SOURCE_DIR = os.path.expanduser('~/XXXXXX_blog_data/docx') OUTPUT_FILE = os.path.expanduser('~/XXXXXX_blog_data/XXXXXX_clean_database.csv') def clean_transcript(text): text = re.sub(r'\d{2}:\d{2}:\d{2}\.\d{3} --> \d{2}:\d{2}:\d{2}\.\d{3}.*\n?', '', text) text = re.sub(r'^\d{1,2}:\d{2}\s*', '', text, flags=re.MULTILINE) text = text.replace('WEBVTT', '').replace('Kind: captions', '').replace('Language: ja', '') text = re.sub(r'\n\s*\n', '\n', text) return text.strip() def extract_data(doc, filename): data = {'ID': '', 'Title': '', 'Date': '', 'URL': '', 'Body': '', 'Transcript': ''} base_name = os.path.splitext(filename)[0] if '【ID-' in base_name: parts = base_name.split('【ID-') data['Title'] = parts[0].strip() data['ID'] = parts[1].replace('】', '').strip() else: data['Title'] = base_name lines = [p.text.strip() for p in doc.paragraphs if p.text.strip()] body_lines = [] transcript_lines = [] current_mode = 'body' for line in lines: if line.startswith('記事URL:'): data['URL'] = line.replace('記事URL:', '').strip() continue if line.startswith('記事ID:') and not data['ID']: data['ID'] = line.replace('記事ID:', '').strip() continue if line.startswith('取得日:'): data['Date'] = line.replace('取得日:', '').strip() continue if '字幕データ' in line or '.vtt' in line or '.txt' in line: current_mode = 'transcript' continue if line == '---' or line == '---------------------------': continue if current_mode == 'body': body_lines.append(line) elif current_mode == 'transcript': transcript_lines.append(line) data['Body'] = "\n".join(body_lines).strip() raw_transcript = "\n".join(transcript_lines) data['Transcript'] = clean_transcript(raw_transcript) return data def main(): print(f"処理開始: {SOURCE_DIR}") if not os.path.exists(SOURCE_DIR): print("エラー: docxフォルダが見つかりません") return with open(OUTPUT_FILE, 'w', newline='', encoding='utf-8') as csvfile: fieldnames = ['ID', 'Title', 'Date', 'URL', 'Body', 'Transcript'] writer = csv.DictWriter(csvfile, fieldnames=fieldnames) writer.writeheader() # --- ここが新機能:システム更新情報の書き込み --- now_str = datetime.now().strftime('%Y年%m月%d日 %H:%M') writer.writerow({ 'ID': 'SYSTEM_UPDATE', 'Title': 'データベース更新情報', 'Date': now_str, 'URL': 'https://XXXXXX.com/', 'Body': f'このデータベースは {now_str} に更新されました。最新の記事データが含まれています。', 'Transcript': '' }) print(f"✅ 更新日時を記録しました: {now_str}") # --------------------------------------------- files = sorted([f for f in os.listdir(SOURCE_DIR) if f.endswith('.docx')]) print(f"対象ファイル: {len(files)} 件") for i, filename in enumerate(files): try: doc = Document(os.path.join(SOURCE_DIR, filename)) row_data = extract_data(doc, filename) writer.writerow(row_data) if (i+1) % 50 == 0: print(f"... {i+1} 件 処理中") except Exception as e: print(f"スキップ ({filename}): {e}") print(f"\n完了!作成ファイル: {OUTPUT_FILE}") if __name__ == "__main__": main() ```