大量のGmailデータをナレッジとして活用するための方法
SHOGAKU
23 days ago
大量のGmailデータをナレッジとして活用するための方法
Google Apps Script (GAS)の制限を回避する方法
バッチ処理によるデータ取得
- 日付やラベルで分割して少量ずつ取得する方法があります
- 一度に全部ではなく、期間を区切って取得するスクリプトを作成できます
Gmail APIを直接使用
- Pythonを使用してGmail APIに直接アクセスする方法
- Google Colabと組み合わせるとより多くのデータを処理できます
Python + Google Colabを使った例を示します:
import os
import pickle
import pandas as pd
import base64
import datetime
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
import time
# Gmail APIの設定
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']
def authenticate_gmail():
"""Gmailの認証を行い、サービスオブジェクトを返す"""
creds = None
# トークンがあれば読み込む
if os.path.exists('token.pickle'):
with open('token.pickle', 'rb') as token:
creds = pickle.load(token)
# 有効な認証情報がなければ、ユーザー認証を行う
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(
'credentials.json', SCOPES)
creds = flow.run_local_server(port=0)
# 次回のために認証情報を保存
with open('token.pickle', 'wb') as token:
pickle.dump(creds, token)
# Gmail APIサービスを構築
service = build('gmail', 'v1', credentials=creds)
return service
def get_email_data(service, query='', max_results=500):
"""
指定したクエリに一致するメールを取得し、データフレームに格納
query: Gmailの検索クエリ (例: 'after:2023/01/01 before:2023/12/31')
max_results: 一度に取得する最大メール数
"""
results = service.users().messages().list(userId='me', q=query, maxResults=max_results).execute()
messages = results.get('messages', [])
email_data = []
batch_size = 100 # 1バッチ当たりの処理メール数
for i in range(0, len(messages), batch_size):
batch = messages[i:i+batch_size]
print(f"処理中: {i+1}から{i+len(batch)}まで (全{len(messages)}件)")
for message in batch:
try:
msg = service.users().messages().get(userId='me', id=message['id'], format='full').execute()
# ヘッダー情報を取得
headers = msg['payload']['headers']
subject = next((h['value'] for h in headers if h['name'] == 'Subject'), 'No Subject')
sender = next((h['value'] for h in headers if h['name'] == 'From'), 'Unknown')
date_str = next((h['value'] for h in headers if h['name'] == 'Date'), 'Unknown')
# 本文を取得(シンプルな実装、実際には複雑な場合がある)
if 'parts' in msg['payload']:
body = get_body_from_parts(msg['payload']['parts'])
else:
body = get_body(msg['payload'])
# スレッドIDを取得
thread_id = msg['threadId']
# ラベルを取得
labels = msg['labelIds'] if 'labelIds' in msg else []
# メールID
email_id = msg['id']
# データを追加
email_data.append({
'id': email_id,
'thread_id': thread_id,
'date': date_str,
'from': sender,
'subject': subject,
'body': body,
'labels': ','.join(labels)
})
except Exception as e:
print(f"エラー発生: {e}")
# API制限回避のため少し待機
time.sleep(0.05)
# 各バッチ後に少し長めに待機
time.sleep(1)
return pd.DataFrame(email_data)
def get_body(payload):
"""メッセージボディを取得"""
if 'body' in payload and 'data' in payload['body']:
body = base64.urlsafe_b64decode(payload['body']['data']).decode('utf-8', errors='ignore')
return body
return ""
def get_body_from_parts(parts):
"""メッセージのパートから本文を取得"""
body = ""
for part in parts:
if part['mimeType'] == 'text/plain' and 'body' in part and 'data' in part['body']:
body += base64.urlsafe_b64decode(part['body']['data']).decode('utf-8', errors='ignore')
elif 'parts' in part:
body += get_body_from_parts(part['parts'])
return body
def batch_download_emails(start_date, end_date, output_file='gmail_data.csv', batch_days=30):
"""日付範囲を指定して、バッチで少しずつメールをダウンロード"""
service = authenticate_gmail()
# 日付範囲を作成
start = datetime.datetime.strptime(start_date, '%Y/%m/%d')
end = datetime.datetime.strptime(end_date, '%Y/%m/%d')
all_emails = pd.DataFrame()
# バッチ期間ごとにメールを取得
current_start = start
while current_start < end:
# バッチの終了日を計算
current_end = current_start + datetime.timedelta(days=batch_days)
if current_end > end:
current_end = end
# 日付範囲を文字列に変換
start_str = current_start.strftime('%Y/%m/%d')
end_str = current_end.strftime('%Y/%m/%d')
print(f"\n{start_str}から{end_str}までのメールを取得中...")
query = f'after:{start_str} before:{end_str}'
# メールを取得
batch_emails = get_email_data(service, query=query)
# 結果を追加
all_emails = pd.concat([all_emails, batch_emails], ignore_index=True)
print(f"{len(batch_emails)}件のメールを取得しました。累計: {len(all_emails)}件")
# 現在のバッチを保存
batch_filename = f"gmail_data_{start_str.replace('/', '')}_to_{end_str.replace('/', '')}.csv"
batch_emails.to_csv(batch_filename, index=False, encoding='utf-8-sig')
print(f"中間ファイル {batch_filename} を保存しました。")
# 次のバッチの開始日を設定
current_start = current_end
# API制限を回避するために待機
time.sleep(2)
# 全てのデータを保存
all_emails.to_csv(output_file, index=False, encoding='utf-8-sig')
print(f"\n全てのメールデータを {output_file} に保存しました。合計: {len(all_emails)}件")
return all_emails
# 使用例
if __name__ == "__main__":
# 取得したいメールの日付範囲を指定
start_date = '2020/01/01' # 開始日
end_date = '2023/12/31' # 終了日
# メールをダウンロード
emails_df = batch_download_emails(start_date, end_date, output_file='all_gmail_data.csv', batch_days=90)
# 取得したデータを表示
print(f"\nデータサンプル (最初の5件):")
print(emails_df.head())
Gmail APIの制限とその他の代替案
Gmail APIにも以下の制限があります:
- 1日あたりのクォータ(1日に250MB〜1GBのデータ)
- 1秒あたりのリクエスト数制限
他の選択肢も検討してみましょう:
Google Takeoutを使用
- Googleのデータエクスポートサービス
- Gmailデータを含むすべてのGoogleデータをMBOXファイル形式でダウンロード可能
- 大量データでも一括取得可能
MBOXをCSVやスプレッドシートに変換
- Google Takeoutで取得したMBOXファイルを解析するPythonスクリプト
import mailbox
import csv
import email
from email.header import decode_header
import quopri
import base64
import html
import re
import pandas as pd
import os
from tqdm import tqdm
import argparse
def decode_email_header(header):
"""メールヘッダをデコード"""
if header is None:
return ""
decoded_parts = []
for part, encoding in decode_header(header):
if isinstance(part, bytes):
try:
if encoding is not None:
decoded_parts.append(part.decode(encoding))
else:
decoded_parts.append(part.decode('utf-8', errors='replace'))
except:
decoded_parts.append(part.decode('utf-8', errors='replace'))
else:
decoded_parts.append(part)
return ''.join(decoded_parts)
def get_email_body(message):
"""メール本文を取得"""
body = ""
# 本文を取得
if message.is_multipart():
for part in message.get_payload():
# プレーンテキストの部分を探す
if part.get_content_type() == 'text/plain':
body_part = part.get_payload(decode=True)
charset = part.get_content_charset()
if charset:
try:
body += body_part.decode(charset, errors='replace')
except:
body += body_part.decode('utf-8', errors='replace')
else:
body += body_part.decode('utf-8', errors='replace')
else:
# シングルパートのメール
payload = message.get_payload(decode=True)
charset = message.get_content_charset()
if charset:
try:
body = payload.decode(charset, errors='replace')
except:
body = payload.decode('utf-8', errors='replace')
else:
body = payload.decode('utf-8', errors='replace')
# HTMLタグを削除
body = re.sub(r'<[^>]+>', ' ', body)
# 余分な空白を削除
body = re.sub(r'\s+', ' ', body).strip()
return body
def convert_mbox_to_csv(mbox_file, output_csv, max_emails=None):
"""MBOXファイルをCSVに変換"""
# MBOXファイルを開く
print(f"MBOXファイル '{mbox_file}' を読み込んでいます...")
mbox = mailbox.mbox(mbox_file)
# 処理するメール数を決定
total_emails = len(mbox)
if max_emails is None or max_emails > total_emails:
max_emails = total_emails
print(f"合計 {total_emails} メールのうち、{max_emails} メールを処理します...")
# CSVファイルを準備
email_data = []
# プログレスバーを表示
for i, message in enumerate(tqdm(mbox, total=max_emails)):
if i >= max_emails:
break
try:
# 基本的なメール情報を取得
subject = decode_email_header(message['subject'])
from_addr = decode_email_header(message['from'])
to_addr = decode_email_header(message['to'])
date = message['date']
message_id = message['message-id']
# 本文を取得
body = get_email_body(message)
# データを追加
email_data.append({
'message_id': message_id,
'date': date,
'from': from_addr,
'to': to_addr,
'subject': subject,
'body': body,
'labels': message.get('X-Gmail-Labels', '')
})
# メモリ使用量を抑えるため、一定間隔でデータをファイルに書き出す
if len(email_data) >= 10000:
pd.DataFrame(email_data).to_csv(
output_csv,
index=False,
mode='a',
header=not os.path.exists(output_csv),
encoding='utf-8-sig'
)
email_data = []
except Exception as e:
print(f"メールID {i} の処理中にエラーが発生しました: {e}")
# 残りのデータを書き出し
if email_data:
pd.DataFrame(email_data).to_csv(
output_csv,
index=False,
mode='a',
header=not os.path.exists(output_csv),
encoding='utf-8-sig'
)
print(f"変換完了! データは '{output_csv}' に保存されました。")
def split_large_csv(input_csv, output_prefix, rows_per_file=100000):
"""大きなCSVファイルを複数の小さなファイルに分割"""
print(f"CSVファイル '{input_csv}' を分割しています...")
# CSVファイルを読み込む
df = pd.read_csv(input_csv, encoding='utf-8-sig')
total_rows = len(df)
# ファイルを分割
for i in range(0, total_rows, rows_per_file):
end_idx = min(i + rows_per_file, total_rows)
part_df = df.iloc[i:end_idx]
# 出力ファイル名
output_file = f"{output_prefix}_part{i//rows_per_file + 1}.csv"
# ファイルに保存
part_df.to_csv(output_file, index=False, encoding='utf-8-sig')
print(f"部分ファイル '{output_file}' を作成しました ({len(part_df)} 行)")
print(f"分割完了! {total_rows} 行のデータを {(total_rows + rows_per_file - 1) // rows_per_file} ファイルに分割しました。")
def main():
parser = argparse.ArgumentParser(description='MBOXファイルをCSVに変換するツール')
parser.add_argument('mbox_file', help='変換するMBOXファイルのパス')
parser.add_argument('--output', '-o', default='gmail_data.csv', help='出力CSVファイルのパス (デフォルト: gmail_data.csv)')
parser.add_argument('--max', '-m', type=int, help='処理する最大メール数 (オプション)')
parser.add_argument('--split', '-s', action='store_true', help='大きなCSVファイルを分割するかどうか')
parser.add_argument('--rows', '-r', type=int, default=100000, help='分割する場合の1ファイルあたりの行数 (デフォルト: 100000)')
args = parser.parse_args()
# MBOXをCSVに変換
convert_mbox_to_csv(args.mbox_file, args.output, args.max)
# 必要に応じてCSVを分割
if args.split:
output_prefix = os.path.splitext(args.output)[0]
split_large_csv(args.output, output_prefix, args.rows)
if __name__ == "__main__":
main()
Google Takeoutの利用手順
- Google Takeoutへアクセス: https://takeout.google.com/
- Gmailのみを選択(他のサービスはチェックを外す)
- 必要に応じてフィルタ(ラベルなど)を設定可能
- エクスポートをリクエスト
- 準備ができたらダウンロードリンクがメールで届く
PDFでの取得
Gmail全体をPDFとして保存する方法も検討できます:
import os
import pickle
import base64
import datetime
import time
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
from reportlab.lib.pagesizes import letter
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib import colors
import html
import re
# Gmail APIの設定
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']
def authenticate_gmail():
"""Gmailの認証を行い、サービスオブジェクトを返す"""
creds = None
# トークンがあれば読み込む
if os.path.exists('token.pickle'):
with open('token.pickle', 'rb') as token:
creds = pickle.load(token)
# 有効な認証情報がなければ、ユーザー認証を行う
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(
'credentials.json', SCOPES)
creds = flow.run_local_server(port=0)
# 次回のために認証情報を保存
with open('token.pickle', 'wb') as token:
pickle.dump(creds, token)
# Gmail APIサービスを構築
service = build('gmail', 'v1', credentials=creds)
return service
def get_body(payload):
"""メッセージボディを取得"""
if 'body' in payload and 'data' in payload['body']:
body = base64.urlsafe_b64decode(payload['body']['data']).decode('utf-8', errors='ignore')
return body
return ""
def get_body_from_parts(parts):
"""メッセージのパートから本文を取得"""
body = ""
for part in parts:
if part['mimeType'] == 'text/plain' and 'body' in part and 'data' in part['body']:
body += base64.urlsafe_b64decode(part['body']['data']).decode('utf-8', errors='ignore')
elif 'parts' in part:
body += get_body_from_parts(part['parts'])
return body
def clean_text_for_pdf(text):
"""PDFに表示するためのテキストをクリーニング"""
# HTMLタグを削除
text = re.sub(r'<[^>]+>', ' ', text)
# HTMLエンティティをデコード
text = html.unescape(text)
# 制御文字を削除
text = re.sub(r'[\x00-\x1F\x7F]', '', text)
# 複数の空白を1つにまとめる
text = re.sub(r'\s+', ' ', text)
return text
def export_emails_to_pdf(service, query='', max_results=500, output_dir='gmail_pdfs'):
"""指定したクエリに一致するメールをPDFとして保存"""
# 出力ディレクトリを作成
os.makedirs(output_dir, exist_ok=True)
# Gmailメッセージを取得
results = service.users().messages().list(userId='me', q=query, maxResults=max_results).execute()
messages = results.get('messages', [])
if not messages:
print('メッセージが見つかりませんでした。')
return
print(f'{len(messages)}件のメールを処理します...')
# サマリーPDFの準備
summary_pdf = os.path.join(output_dir, 'email_summary.pdf')
doc = SimpleDocTemplate(summary_pdf, pagesize=letter)
styles = getSampleStyleSheet()
story = []
# タイトルを追加
title_style = styles['Heading1']
story.append(Paragraph(f"Gmail メール概要 - {query}", title_style))
story.append(Spacer(1, 12))
# サマリーテーブル用のデータ
table_data = [['#', '日付', '差出人', '件名', 'ファイル名']]
for i, message in enumerate(messages):
try:
msg = service.users().messages().get(userId='me', id=message['id'], format='full').execute()
# ヘッダー情報を取得
headers = msg['payload']['headers']
subject = next((h['value'] for h in headers if h['name'] == 'Subject'), 'No Subject')
sender = next((h['value'] for h in headers if h['name'] == 'From'), 'Unknown')
date_str = next((h['value'] for h in headers if h['name'] == 'Date'), 'Unknown')
# 本文を取得
if 'parts' in msg['payload']:
body = get_body_from_parts(msg['payload']['parts'])
else:
body = get_body(msg['payload'])
# テキストをクリーニング
clean_subject = clean_text_for_pdf(subject)
clean_sender = clean_text_for_pdf(sender)
clean_body = clean_text_for_pdf(body)
# 個別のメールPDF用のファイル名を作成(件名をファイル名に適した形式に変更)
safe_subject = re.sub(r'[\\/*?:"<>|]', '_', clean_subject)
# ファイル名の長さを制限
if len(safe_subject) > 50:
safe_subject = safe_subject[:47] + '...'
email_filename = f"email_{i+1:04d}_{safe_subject}.pdf"
email_pdf_path = os.path.join(output_dir, email_filename)
# 個別のメールPDFを作成
email_doc = SimpleDocTemplate(email_pdf_path, pagesize=letter)
email_story = []
# ヘッダー情報を追加
header_style = ParagraphStyle(
'Header',
parent=styles['Normal'],
fontName='Helvetica-Bold',
fontSize=12,
spaceAfter=6
)
email_story.append(Paragraph(f"件名: {clean_subject}", header_style))
email_story.append(Paragraph(f"差出人: {clean_sender}", header_style))
email_story.append(Paragraph(f"日付: {date_str}", header_style))
email_story.append(Spacer(1, 12))
# 本文を追加
body_style = styles['Normal']
email_story.append(Paragraph(clean_body, body_style))
# PDFを生成
email_doc.build(email_story)
print(f"メール {i+1}/{len(messages)}: {clean_subject} -> {email_filename}")
# サマリーテーブルにデータを追加
table_data.append([
str(i+1),
date_str[:10] if len(date_str) > 10 else date_str,
clean_sender[:30] + '...' if len(clean_sender) > 30 else clean_sender,
clean_subject[:50] + '...' if len(clean_subject) > 50 else clean_subject,
email_filename
])
except Exception as e:
print(f"メール {i+1} の処理中にエラーが発生しました: {e}")
# API制限回避のため少し待機
time.sleep(0.05)
# サマリーテーブルをPDFに追加
table = Table(table_data, repeatRows=1)
table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (-1, 0), colors.grey),
('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
('ALIGN', (0, 0), (-1, -1), 'CENTER'),
('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
('BOTTOMPADDING', (0, 0), (-1, 0), 12),
('BACKGROUND', (0, 1), (-1, -1), colors.beige),
('GRID', (0, 0), (-1, -1), 1, colors.black),
]))
story.append(table)
doc.build(story)
print(f"\n処理完了! {len(messages)} 件のメールを個別PDFとして保存し、サマリーを作成しました。")
print(f"出力ディレクトリ: {output_dir}")
print(f"サマリーファイル: {summary_pdf}")
def batch_export_by_date(start_date, end_date, batch_days=30, output_dir='gmail_pdfs'):
"""日付範囲を指定して、バッチで少しずつメールをPDFに変換"""
service = authenticate_gmail()
# 日付範囲を作成
start = datetime.datetime.strptime(start_date, '%Y/%m/%d')
end = datetime.datetime.strptime(end_date, '%Y/%m/%d')
# バッチ期間ごとにメールを取得
current_start = start
batch_num = 1
while current_start < end:
# バッチの終了日を計算
current_end = current_start + datetime.timedelta(days=batch_days)
if current_end > end:
current_end = end
# 日付範囲を文字列に変換
start_str = current_start.strftime('%Y/%m/%d')
end_str = current_end.strftime('%Y/%m/%d')
print(f"\nバッチ {batch_num}: {start_str}から{end_str}までのメールを処理中...")
query = f'after:{start_str} before:{end_str}'
# バッチごとの出力ディレクトリ
batch_dir = os.path.join(output_dir, f"batch_{batch_num:03d}_{start_str.replace('/', '')}_to_{end_str.replace('/', '')}")
# メールをPDFに変換
export_emails_to_pdf(service, query=query, max_results=500, output_dir=batch_dir)
# 次のバッチの開始日を設定
current_start = current_end
batch_num += 1
# API制限を回避するために待機
print("APIレート制限を避けるため10秒待機します...")
time.sleep(10)
def batch_export_by_label(label_name, output_dir='gmail_pdfs_by_label'):
"""特定のラベルが付いたメールをPDFとして保存"""
service = authenticate_gmail()
print(f"\n'{label_name}'ラベルが付いたメールを処理します...")
query = f'label:{label_name}'
# ラベル用の出力ディレクトリ
label_dir = os.path.join(output_dir, f"label_{label_name}")
# メールをPDFに変換
export_emails_to_pdf(service, query=query, max_results=1000, output_dir=label_dir)
if __name__ == "__main__":
# 使用例1: 2023年1月から2023年12月までのメールを30日ごとのバッチで処理
# batch_export_by_date("2023/01/01", "2023/12/31", batch_days=30, output_dir="gmail_pdfs_2023")
# 使用例2: 特定のラベルが付いたメールを処理
# batch_export_by_label("重要", output_dir="gmail_pdfs_by_label")
# 使用例3: 特定の検索クエリに一致するメールを処理する
service = authenticate_gmail()
export_emails_to_pdf(service, query="from:example@gmail.com", max_results=100, output_dir="specific_sender_pdfs")
print("\nすべての処理が完了しました。")
コメント
いいね
投げ銭
最新順
人気順
コメント
いいね
投げ銭
最新順
人気順