# utils.py

import httpx
from xml.etree import ElementTree as ET
from nonebot.adapters.onebot.v11 import Message, MessageSegment
import re
from bs4 import BeautifulSoup
from datetime import datetime, timezone, timedelta
from nonebot import get_plugin_config
from .config import Config
# Load config at the top level of the module
config = get_plugin_config(Config)
OPENAI_API_BASE = config.openai_api_base
OPENAI_API_KEY = config.openai_api_key
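
# A minimal .env sketch for the two settings above (illustrative only: the
# exact variable names depend on how the fields are declared on Config, and
# the URL is just an example of an OpenAI-compatible endpoint):
#
#   OPENAI_API_BASE=https://api.example.com/openai
#   OPENAI_API_KEY=sk-...
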
async def fetch_tweet_data(rss_url, original_link):
"""
Fetches and parses tweet data from the given RSS URL.
Now it finds the matching item by comparing a portion of guid from the end.
Args:
rss_url: The RSS feed URL.
original_link: The original Twitter link to match against.
Returns:
A dictionary containing the tweet content, or None if no matching item found.
"""
try:
print(f"Fetching RSS data from: {rss_url}")
async with httpx.AsyncClient() as client:
response = await client.get(rss_url)
response.raise_for_status()
root = ET.fromstring(response.text)
items = root.findall(".//item")
if not items:
return None
        # Extract user and tweet ID from the original link (twitter.com or x.com)
        match = re.search(r"(?:twitter|x)\.com/(\w+)/status/(\d+)", original_link)
        if not match:
            print(f"Could not extract user/tweet ID from original link: {original_link}")
            return None
original_user, original_tweet_id = match.groups()
# Iterate items in reverse order
        for item in reversed(items):
            guid = item.findtext("guid")
            if not guid:
                continue  # skip items without a guid
            # Extract user and tweet ID from the guid for comparison
            guid_match = re.search(r"twitter\.com/(\w+)/status/(\d+)", guid)
            if not guid_match:
                continue
            guid_user, guid_tweet_id = guid_match.groups()
# Compare user and tweet ID
if guid_user == original_user and guid_tweet_id == original_tweet_id:
content = item.find("description").text
pub_date = item.find("pubDate").text
author = item.find("author").text
text, image_urls = extract_text_and_images(content)
video_urls = extract_video_urls(content)
return {
"text": text,
"images": image_urls,
"videos": video_urls,
"pub_date": pub_date,
"author": author,
}
return None # No matching item found
except httpx.HTTPError as e:
print(f"HTTP error fetching RSS: {e}")
return None
except ET.ParseError as e:
print(f"Error parsing RSS XML: {e}")
return None
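
# Usage sketch (the feed URL and tweet link below are hypothetical; any
# RSS feed whose item guids are twitter.com status URLs should work):
#
#   data = await fetch_tweet_data(
#       "https://rsshub.example.com/twitter/user/someone",
#       "https://x.com/someone/status/1234567890",
#   )
#   if data:
#       print(data["text"], data["images"], data["videos"])
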
async def translate_text(text, target_language="zh-Hans"):
"""
Translates the given text to the target language using a compatible OpenAI API.
Args:
text: The text to translate.
target_language: The target language code (e.g., "zh-Hans" for Simplified Chinese).
Returns:
The translated text, or None if an error occurred.
"""
if not text:
return None
if not OPENAI_API_KEY:
print("Error: OPENAI_API_KEY is not set. Translation will not work.")
return None
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {OPENAI_API_KEY}"
}
json_data = {
"model": "llama-3.3-70b-versatile", # 或者其他支持的模型
"messages": [
{
"role": "system",
"content": (
"You are a helpful assistant that translates text from any language into the language specified "
"by the user. Please provide a fluent, natural-sounding translation for the entire input text. "
"Whenever you encounter '干し物' or '干し芋', translate them as '愿望单'. "
"Output only the translated text without any additional explanations."
)
},
{
"role": "user",
"content": f"请将以下文本翻译成 {target_language}:\n\n{text}"
}
]
}
try:
async with httpx.AsyncClient() as client:
response = await client.post(
f"{OPENAI_API_BASE}/v1/chat/completions",
headers=headers,
json=json_data,
timeout=30.0
)
response.raise_for_status()
translated_text = response.json()["choices"][0]["message"]["content"].strip()
return translated_text
except Exception as e:
print(f"Error translating text: {e}")
return None
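
# Usage sketch (assumes OPENAI_API_KEY is configured; the default target
# language is Simplified Chinese):
#
#   translated = await translate_text("Hello, world!")
#   if translated:
#       print(translated)
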
def extract_text_and_images(content):
"""
Extracts text and image URLs from the raw RSS content.
This assumes the content is HTML.
Args:
content: The raw HTML content from the RSS feed.
Returns:
A tuple containing the text content and a list of image URLs.
"""
soup = BeautifulSoup(content, "html.parser")
# Remove unnecessary elements (e.g., links to images and videos)
for a_tag in soup.find_all("a", href=True):
if "https://pbs.twimg.com/media/" in a_tag["href"] or "https://video.twimg.com/" in a_tag["href"]:
a_tag.extract()
for video_tag in soup.find_all("video", src=True):
video_tag.extract()
# Extract text
text = soup.get_text(separator="\n", strip=True)
# Extract image URLs
image_urls = [
img["src"] for img in soup.find_all("img", src=re.compile(r"^https://pbs\.twimg\.com/media/"))
]
return text, image_urls
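
# Example with a hand-written HTML snippet (illustrative, not from a real feed):
#
#   extract_text_and_images('<p>Hello</p><img src="https://pbs.twimg.com/media/abc.jpg">')
#   # -> ("Hello", ["https://pbs.twimg.com/media/abc.jpg"])
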
def extract_video_urls(content):
"""
Extracts video URLs from the raw RSS content.
Now it can handle video URLs in both <a> and <video> tags.
Args:
content: The raw HTML content from the RSS feed.
Returns:
A list of video URLs.
"""
soup = BeautifulSoup(content, "html.parser")
video_urls = []
# Find video URLs in <a> tags
for a_tag in soup.find_all("a", href=True):
if "https://video.twimg.com/" in a_tag["href"]:
video_urls.append(a_tag["href"])
# Find video URLs in <video> tags
for video_tag in soup.find_all("video", src=True):
if "https://video.twimg.com/" in video_tag["src"]:
video_urls.append(video_tag["src"])
    # Deduplicate while preserving order (the same URL may appear in both tag types)
    return list(dict.fromkeys(video_urls))
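
# Example (illustrative snippet):
#
#   extract_video_urls('<video src="https://video.twimg.com/vid.mp4"></video>')
#   # -> ["https://video.twimg.com/vid.mp4"]
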
def format_pub_date(pub_date_str):
"""
Converts a GMT formatted pubDate string to a simplified East Asia time string.
Args:
pub_date_str: A string representing the publication date in GMT format.
Returns:
A string representing the date in a simplified format with East Asia timezone, or None if the input is invalid.
"""
try:
# Parse the GMT time string
pub_date_gmt = datetime.strptime(pub_date_str, "%a, %d %b %Y %H:%M:%S %Z")
# Convert to East Asia Time (UTC+8)
pub_date_east_asia = pub_date_gmt.replace(tzinfo=timezone.utc).astimezone(timezone(timedelta(hours=8)))
# Format the date string as desired
formatted_date_str = pub_date_east_asia.strftime("%y-%m-%d %H:%M")
return formatted_date_str
except ValueError as e:
print(f"Error parsing pubDate: {e}")
return None
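
# Example (6 Jan 2025 is a Monday; 03:04 GMT is 11:04 at UTC+8):
#
#   format_pub_date("Mon, 06 Jan 2025 03:04:05 GMT")
#   # -> "25-01-06 11:04"
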
async def build_message(tweet_data, user_name):
"""
Builds a message from the tweet data, including translated text and Twitter user ID.
Args:
tweet_data: A dictionary containing the tweet text, image URLs, and video URLs.
user_name: The Twitter user ID.
Returns:
A Message object ready to be sent, or None if there's no content to send.
"""
message = Message()
    formatted_date = format_pub_date(tweet_data.get('pub_date', ''))
# Add author and time
if formatted_date and tweet_data.get('author'):
message.append(MessageSegment.text(f"{tweet_data['author']}@{user_name} 🕒{formatted_date}\n"))
    # Add tweet content, then append a translation when one is available
    if tweet_data.get('text'):
        message.append(MessageSegment.text(f"{tweet_data['text']}\n"))
        translated_text = await translate_text(tweet_data["text"])
        if translated_text:
            message.append(MessageSegment.text(f"--------\n{translated_text}\n"))
if tweet_data.get("images"):
for image_url in tweet_data["images"]:
message.append(MessageSegment.image(image_url))
# We will handle video sending separately, so we don't add it to the message here.
    # Return the message only if it has content
    return message if message else None
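
# Call-site sketch (hypothetical: 'bot' and 'event' come from a NoneBot
# matcher, not from this module):
#
#   msg = await build_message(tweet_data, "someone")
#   if msg:
#       await bot.send(event, msg)
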
async def build_message_content_only(tweet_data, user_name):
"""
Builds a message containing only image and video content, without translation.
Args:
tweet_data: A dictionary containing the tweet text, image URLs, and video URLs.
user_name: The Twitter user ID.
Returns:
A Message object ready to be sent, or None if there's no media content to send.
"""
message = Message()
if tweet_data.get("images"):
for image_url in tweet_data["images"]:
message.append(MessageSegment.image(image_url))
    # Return the message only if it has content
    return message if message else None
async def build_message_original(tweet_data, user_name):
"""
Builds a message from the tweet data without translation, including Twitter user ID.
Args:
tweet_data: A dictionary containing the tweet text, image URLs, and video URLs.
user_name: The Twitter user ID.
Returns:
A Message object ready to be sent, or None if there's no content to send.
"""
message = Message()
    formatted_date = format_pub_date(tweet_data.get('pub_date', ''))
# Add author and time
if formatted_date and tweet_data.get('author'):
message.append(MessageSegment.text(f"{tweet_data['author']}@{user_name} 🕒{formatted_date}\n"))
# Add tweet content without translation
if tweet_data.get('text'):
message.append(MessageSegment.text(f"{tweet_data['text']}\n"))
if tweet_data.get("images"):
for image_url in tweet_data["images"]:
message.append(MessageSegment.image(image_url))
    # Return the message only if it has content
    return message if message else None