Zuletzt aktiv 1739418963

utils.py Originalformat
import re
from datetime import datetime, timezone, timedelta
from email.utils import parsedate_to_datetime
from xml.etree import ElementTree as ET

import httpx
from bs4 import BeautifulSoup
from nonebot import get_plugin_config
from nonebot.adapters.onebot.v11 import Message, MessageSegment

from .config import Config
9
# Load the plugin config once at module import time; the translation helpers
# below read the OpenAI-compatible endpoint settings from it.
config = get_plugin_config(Config)
OPENAI_API_BASE = config.openai_api_base
OPENAI_API_KEY = config.openai_api_key
14
async def fetch_tweet_data(rss_url, original_link):
    """
    Fetch and parse tweet data from the given RSS URL.

    The matching <item> is located by comparing the user name and tweet ID
    extracted from the item's <guid> against those in ``original_link``.

    Args:
        rss_url: The RSS feed URL.
        original_link: The original Twitter/X status link to match against.

    Returns:
        A dict with keys "text", "images", "videos", "pub_date" and "author",
        or None if no matching item was found or an error occurred.
    """
    # One pattern covers both twitter.com and x.com status links; the original
    # code only accepted twitter.com in guids, which silently skipped x.com items.
    status_re = re.compile(r"(?:twitter|x)\.com/(\w+)/status/(\d+)")

    try:
        print(f"Fetching RSS data from: {rss_url}")
        async with httpx.AsyncClient() as client:
            response = await client.get(rss_url)
            response.raise_for_status()

        root = ET.fromstring(response.text)
        items = root.findall(".//item")
        if not items:
            return None

        # Extract user and tweet ID from the original link for comparison.
        match = status_re.search(original_link)
        if not match:
            print(f"Could not extract user/tweet ID from original link: {original_link}")
            return None
        original_user, original_tweet_id = match.groups()

        # Iterate items in reverse order (oldest first), as before.
        for item in reversed(items):
            guid_el = item.find("guid")
            if guid_el is None or not guid_el.text:
                continue  # malformed item: skip instead of raising AttributeError
            guid_match = status_re.search(guid_el.text)
            if not guid_match:
                continue

            if guid_match.groups() == (original_user, original_tweet_id):
                # Guard every optional element; feeds are not guaranteed to
                # populate description/pubDate/author.
                desc_el = item.find("description")
                pub_date_el = item.find("pubDate")
                author_el = item.find("author")
                content = desc_el.text if desc_el is not None and desc_el.text else ""
                text, image_urls = extract_text_and_images(content)
                video_urls = extract_video_urls(content)
                return {
                    "text": text,
                    "images": image_urls,
                    "videos": video_urls,
                    "pub_date": pub_date_el.text if pub_date_el is not None else None,
                    "author": author_el.text if author_el is not None else None,
                }

        return None  # No matching item found

    except httpx.HTTPError as e:
        print(f"HTTP error fetching RSS: {e}")
        return None
    except ET.ParseError as e:
        print(f"Error parsing RSS XML: {e}")
        return None
79
async def translate_text(text, target_language="zh-Hans"):
    """
    Translate the given text via an OpenAI-compatible chat-completions API.

    Args:
        text: The text to translate.
        target_language: Target language code (e.g. "zh-Hans" for Simplified Chinese).

    Returns:
        The translated text, or None if the input is empty, the API key is
        missing, or the request/response failed.
    """
    if not text:
        return None

    if not OPENAI_API_KEY:
        print("Error: OPENAI_API_KEY is not set. Translation will not work.")
        return None

    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {OPENAI_API_KEY}",
    }

    json_data = {
        "model": "llama-3.3-70b-versatile",  # or any other supported model
        "messages": [
            {
                "role": "system",
                "content": (
                    "You are a helpful assistant that translates text from any language into the language specified "
                    "by the user. Please provide a fluent, natural-sounding translation for the entire input text. "
                    "Whenever you encounter '干し物' or '干し芋', translate them as '愿望单'. "
                    "Output only the translated text without any additional explanations."
                )
            },
            {
                "role": "user",
                "content": f"请将以下文本翻译成 {target_language}\n\n{text}"
            }
        ]
    }

    # rstrip guards against a double slash when the configured base URL
    # already ends with "/".
    url = f"{OPENAI_API_BASE.rstrip('/')}/v1/chat/completions"

    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                url,
                headers=headers,
                json=json_data,
                timeout=30.0,
            )
            response.raise_for_status()
        return response.json()["choices"][0]["message"]["content"].strip()
    except httpx.HTTPError as e:
        print(f"Error translating text: {e}")
        return None
    except (KeyError, IndexError, ValueError, AttributeError) as e:
        # Response body did not have the expected chat-completions shape.
        print(f"Error parsing translation response: {e}")
        return None
137
def extract_text_and_images(content):
    """
    Extract the plain text and image URLs from raw RSS item content (HTML).

    Args:
        content: The raw HTML content from the RSS feed.

    Returns:
        A (text, image_urls) tuple: the visible text with media links removed,
        and the list of pbs.twimg.com image URLs.
    """
    media_markers = ("https://pbs.twimg.com/media/", "https://video.twimg.com/")
    soup = BeautifulSoup(content, "html.parser")

    # Drop anchors that merely link to media, and all inline <video> elements,
    # so they do not pollute the extracted text.
    for anchor in soup.find_all("a", href=True):
        if any(marker in anchor["href"] for marker in media_markers):
            anchor.extract()
    for video in soup.find_all("video", src=True):
        video.extract()

    text = soup.get_text(separator="\n", strip=True)

    media_img = re.compile(r"^https://pbs\.twimg\.com/media/")
    image_urls = [tag["src"] for tag in soup.find_all("img", src=media_img)]

    return text, image_urls
168
def extract_video_urls(content):
    """
    Extract video URLs from raw RSS item content (HTML).

    Handles video URLs appearing in both <a href> and <video src> attributes.

    Args:
        content: The raw HTML content from the RSS feed.

    Returns:
        A list of video.twimg.com URLs (anchor hits first, then video tags).
    """
    video_host = "https://video.twimg.com/"
    soup = BeautifulSoup(content, "html.parser")

    from_anchors = [
        a["href"] for a in soup.find_all("a", href=True) if video_host in a["href"]
    ]
    from_video_tags = [
        v["src"] for v in soup.find_all("video", src=True) if video_host in v["src"]
    ]

    return from_anchors + from_video_tags
194
def format_pub_date(pub_date_str):
    """
    Convert an RFC 2822 pubDate string to a short East Asia (UTC+8) time string.

    Args:
        pub_date_str: Publication date string from the RSS <pubDate> element,
            e.g. "Wed, 12 Feb 2025 10:00:00 GMT" or "... +0000".

    Returns:
        The date formatted as "yy-mm-dd HH:MM" in UTC+8, or None if the input
        cannot be parsed.
    """
    try:
        # parsedate_to_datetime handles the full RFC 2822 grammar ("GMT",
        # numeric offsets like "+0000", ...), unlike the previous fixed
        # strptime("%Z") format which only accepted literal GMT/UTC.
        pub_date = parsedate_to_datetime(pub_date_str)

        # A date without an explicit zone parses as naive; treat it as UTC,
        # matching the original GMT assumption.
        if pub_date.tzinfo is None:
            pub_date = pub_date.replace(tzinfo=timezone.utc)

        # Convert to East Asia Time (UTC+8) and format compactly.
        pub_date_east_asia = pub_date.astimezone(timezone(timedelta(hours=8)))
        return pub_date_east_asia.strftime("%y-%m-%d %H:%M")
    except (TypeError, ValueError) as e:
        print(f"Error parsing pubDate: {e}")
        return None
219
async def build_message(tweet_data, user_name):
    """
    Build a message from the tweet data, including translated text and the
    Twitter user ID.

    Args:
        tweet_data: Dict with keys "text", "images", "videos", "pub_date",
            "author" (as produced by fetch_tweet_data).
        user_name: The Twitter user ID (screen name).

    Returns:
        A Message object ready to be sent, or None if there's no content.
    """
    message = Message()
    formatted_date = format_pub_date(tweet_data.get('pub_date', ''))

    # Header line: display name, @handle and publication time.
    if formatted_date and tweet_data.get('author'):
        message.append(MessageSegment.text(f"{tweet_data['author']}@{user_name} 🕒{formatted_date}\n"))

    # Tweet text, followed by its translation when available.
    # Use .get() consistently — the original indexed tweet_data["text"]
    # directly here, which raised KeyError when the key was absent.
    text = tweet_data.get('text')
    if text:
        message.append(MessageSegment.text(f"{text}\n"))
        translated_text = await translate_text(text)
        if translated_text:
            message.append(MessageSegment.text(f"--------\n{translated_text}\n"))

    for image_url in tweet_data.get("images") or []:
        message.append(MessageSegment.image(image_url))

    # Videos are sent separately by the caller, so they are not added here.

    # Only return a message that actually has content.
    return message if len(message) > 0 else None
259
async def build_message_content_only(tweet_data, user_name):
    """
    Build a message containing only image content, without text or translation.

    Args:
        tweet_data: Dict with tweet text, image URLs, and video URLs.
        user_name: The Twitter user ID (unused; kept for interface parity).

    Returns:
        A Message object ready to be sent, or None if there's no media content.
    """
    message = Message()

    for image_url in tweet_data.get("images") or []:
        message.append(MessageSegment.image(image_url))

    # Only return a message that actually has content.
    return message if len(message) > 0 else None
282
async def build_message_original(tweet_data, user_name):
    """
    Build a message from the tweet data without translation, including the
    Twitter user ID.

    Args:
        tweet_data: Dict with tweet text, image URLs, and video URLs.
        user_name: The Twitter user ID (screen name).

    Returns:
        A Message object ready to be sent, or None if there's no content.
    """
    message = Message()

    # Header line: display name, @handle and publication time.
    date_str = format_pub_date(tweet_data.get('pub_date', ''))
    author = tweet_data.get('author')
    if date_str and author:
        message.append(MessageSegment.text(f"{author}@{user_name} 🕒{date_str}\n"))

    # Untranslated tweet text.
    text = tweet_data.get('text')
    if text:
        message.append(MessageSegment.text(f"{text}\n"))

    for url in tweet_data.get("images") or []:
        message.append(MessageSegment.image(url))

    # Only return a message that actually has content.
    return message if len(message) > 0 else None