crazt edited this gist.

1 file changed, 313 insertions

utils.py (file created)

import httpx
from xml.etree import ElementTree as ET
from nonebot.adapters.onebot.v11 import Message, MessageSegment
import re
from bs4 import BeautifulSoup
from datetime import datetime, timezone, timedelta
from nonebot import get_plugin_config
from .config import Config

# Load config at the top level of the module
config = get_plugin_config(Config)
OPENAI_API_BASE = config.openai_api_base
OPENAI_API_KEY = config.openai_api_key

async def fetch_tweet_data(rss_url, original_link):
    """
    Fetches and parses tweet data from the given RSS URL.
    The matching item is found by comparing the user name and tweet ID
    extracted from each item's guid with those from the original link.

    Args:
        rss_url: The RSS feed URL.
        original_link: The original Twitter link to match against.

    Returns:
        A dictionary containing the tweet content, or None if no matching item is found.
    """
    try:
        print(f"Fetching RSS data from: {rss_url}")
        async with httpx.AsyncClient() as client:
            response = await client.get(rss_url)
            response.raise_for_status()

        root = ET.fromstring(response.text)
        items = root.findall(".//item")
        if not items:
            return None

        # Extract user and tweet ID from the original link for comparison
        match = re.search(r"twitter\.com/(\w+)/status/(\d+)", original_link)
        if not match:
            match = re.search(r"x\.com/(\w+)/status/(\d+)", original_link)
        if not match:
            print(f"Could not extract user/tweet ID from original link: {original_link}")
            return None
        original_user, original_tweet_id = match.groups()

        # Iterate items in reverse order
        for item in reversed(items):
            guid = item.find("guid").text
            # Extract user and tweet ID from the guid for comparison
            guid_match = re.search(r"twitter\.com/(\w+)/status/(\d+)", guid)
            if not guid_match:
                continue
            guid_user, guid_tweet_id = guid_match.groups()

            # Compare user and tweet ID
            if guid_user == original_user and guid_tweet_id == original_tweet_id:
                content = item.find("description").text
                pub_date = item.find("pubDate").text
                author = item.find("author").text
                text, image_urls = extract_text_and_images(content)
                video_urls = extract_video_urls(content)
                return {
                    "text": text,
                    "images": image_urls,
                    "videos": video_urls,
                    "pub_date": pub_date,
                    "author": author,
                }

        return None  # No matching item found

    except httpx.HTTPError as e:
        print(f"HTTP error fetching RSS: {e}")
        return None
    except ET.ParseError as e:
        print(f"Error parsing RSS XML: {e}")
        return None

async def translate_text(text, target_language="zh-Hans"):
    """
    Translates the given text to the target language using an OpenAI-compatible API.

    Args:
        text: The text to translate.
        target_language: The target language code (e.g., "zh-Hans" for Simplified Chinese).

    Returns:
        The translated text, or None if an error occurred.
    """
    if not text:
        return None

    if not OPENAI_API_KEY:
        print("Error: OPENAI_API_KEY is not set. Translation will not work.")
        return None

    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {OPENAI_API_KEY}"
    }

    json_data = {
        "model": "llama-3.3-70b-versatile",  # or any other supported model
        "messages": [
            {
                "role": "system",
                "content": (
                    "You are a helpful assistant that translates text from any language into the language specified "
                    "by the user. Please provide a fluent, natural-sounding translation for the entire input text. "
                    "Whenever you encounter '干し物' or '干し芋', translate them as '愿望单'. "
                    "Output only the translated text without any additional explanations."
                )
            },
            {
                "role": "user",
                "content": f"请将以下文本翻译成 {target_language}:\n\n{text}"
            }
        ]
    }

    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{OPENAI_API_BASE}/v1/chat/completions",
                headers=headers,
                json=json_data,
                timeout=30.0
            )
            response.raise_for_status()
            translated_text = response.json()["choices"][0]["message"]["content"].strip()
            return translated_text
    except Exception as e:
        print(f"Error translating text: {e}")
        return None

def extract_text_and_images(content):
    """
    Extracts text and image URLs from the raw RSS content.
    This assumes the content is HTML.

    Args:
        content: The raw HTML content from the RSS feed.

    Returns:
        A tuple containing the text content and a list of image URLs.
    """
    soup = BeautifulSoup(content, "html.parser")

    # Remove unnecessary elements (e.g., links to images and videos)
    for a_tag in soup.find_all("a", href=True):
        if "https://pbs.twimg.com/media/" in a_tag["href"] or "https://video.twimg.com/" in a_tag["href"]:
            a_tag.extract()

    for video_tag in soup.find_all("video", src=True):
        video_tag.extract()

    # Extract text
    text = soup.get_text(separator="\n", strip=True)

    # Extract image URLs
    image_urls = [
        img["src"] for img in soup.find_all("img", src=re.compile(r"^https://pbs\.twimg\.com/media/"))
    ]

    return text, image_urls

def extract_video_urls(content):
    """
    Extracts video URLs from the raw RSS content.
    Handles video URLs in both <a> and <video> tags.

    Args:
        content: The raw HTML content from the RSS feed.

    Returns:
        A list of video URLs.
    """
    soup = BeautifulSoup(content, "html.parser")
    video_urls = []

    # Find video URLs in <a> tags
    for a_tag in soup.find_all("a", href=True):
        if "https://video.twimg.com/" in a_tag["href"]:
            video_urls.append(a_tag["href"])

    # Find video URLs in <video> tags
    for video_tag in soup.find_all("video", src=True):
        if "https://video.twimg.com/" in video_tag["src"]:
            video_urls.append(video_tag["src"])

    return video_urls

def format_pub_date(pub_date_str):
    """
    Converts a GMT-formatted pubDate string to a simplified East Asia (UTC+8) time string.

    Args:
        pub_date_str: A string representing the publication date in GMT format.

    Returns:
        A string representing the date in a simplified format with the East Asia timezone, or None if the input is invalid.
    """
    try:
        # Parse the GMT time string
        pub_date_gmt = datetime.strptime(pub_date_str, "%a, %d %b %Y %H:%M:%S %Z")

        # Convert to East Asia Time (UTC+8)
        pub_date_east_asia = pub_date_gmt.replace(tzinfo=timezone.utc).astimezone(timezone(timedelta(hours=8)))

        # Format the date string as desired
        formatted_date_str = pub_date_east_asia.strftime("%y-%m-%d %H:%M")

        return formatted_date_str
    except ValueError as e:
        print(f"Error parsing pubDate: {e}")
        return None

async def build_message(tweet_data, user_name):
    """
    Builds a message from the tweet data, including translated text and the Twitter user ID.

    Args:
        tweet_data: A dictionary containing the tweet text, image URLs, and video URLs.
        user_name: The Twitter user ID.

    Returns:
        A Message object ready to be sent, or None if there's no content to send.
    """
    message = Message()
    formatted_date = format_pub_date(tweet_data.get('pub_date', ''))

    # Add author and time
    if formatted_date and tweet_data.get('author'):
        message.append(MessageSegment.text(f"{tweet_data['author']}@{user_name} 🕒{formatted_date}\n"))

    # Add tweet content
    if tweet_data.get('text'):
        message.append(MessageSegment.text(f"{tweet_data['text']}\n"))

        # Translate the text and append the translation below the original
        translated_text = await translate_text(tweet_data['text'])
        if translated_text:
            message.append(MessageSegment.text(f"--------\n{translated_text}\n"))

    if tweet_data.get("images"):
        for image_url in tweet_data["images"]:
            message.append(MessageSegment.image(image_url))

    # Video sending is handled separately, so videos are not added to the message here.

    # Check if there's any content to send before returning
    if len(message) > 0:
        return message
    else:
        return None

async def build_message_content_only(tweet_data, user_name):
    """
    Builds a message containing only image content, without text or translation.
    (Videos are handled and sent separately.)

    Args:
        tweet_data: A dictionary containing the tweet text, image URLs, and video URLs.
        user_name: The Twitter user ID.

    Returns:
        A Message object ready to be sent, or None if there's no media content to send.
    """
    message = Message()

    if tweet_data.get("images"):
        for image_url in tweet_data["images"]:
            message.append(MessageSegment.image(image_url))

    # Check if there's any content to send before returning
    if len(message) > 0:
        return message
    else:
        return None

async def build_message_original(tweet_data, user_name):
    """
    Builds a message from the tweet data without translation, including the Twitter user ID.

    Args:
        tweet_data: A dictionary containing the tweet text, image URLs, and video URLs.
        user_name: The Twitter user ID.

    Returns:
        A Message object ready to be sent, or None if there's no content to send.
    """
    message = Message()
    formatted_date = format_pub_date(tweet_data.get('pub_date', ''))

    # Add author and time
    if formatted_date and tweet_data.get('author'):
        message.append(MessageSegment.text(f"{tweet_data['author']}@{user_name} 🕒{formatted_date}\n"))

    # Add tweet content without translation
    if tweet_data.get('text'):
        message.append(MessageSegment.text(f"{tweet_data['text']}\n"))

    if tweet_data.get("images"):
        for image_url in tweet_data["images"]:
            message.append(MessageSegment.image(image_url))

    # Check if there's any content to send before returning
    if len(message) > 0:
        return message
    else:
        return None
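
Note: utils.py reads openai_api_base and openai_api_key from the plugin's Config class, which is not included in this gist. Below is a minimal sketch of what that config.py might look like, assuming a standard pydantic-based NoneBot plugin config; the default values are placeholders, not values from the gist.

# config.py -- hypothetical sketch, not part of the gist
from pydantic import BaseModel

class Config(BaseModel):
    # Base URL of the OpenAI-compatible endpoint; utils.py appends "/v1/chat/completions".
    openai_api_base: str = "https://api.example.com"  # placeholder
    # API key for that endpoint; translation is skipped when this is empty.
    openai_api_key: str = ""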
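
For context, here is a minimal sketch of how these helpers might be wired into a NoneBot 2 handler. The matcher, the RSSHub-style feed URL, and the surrounding plugin structure are assumptions for illustration; only fetch_tweet_data and build_message come from utils.py above.

# Hypothetical handler sketch; the feed URL host and matcher are assumptions.
from nonebot import on_regex
from nonebot.adapters.onebot.v11 import MessageEvent

from .utils import fetch_tweet_data, build_message

tweet_link = on_regex(r"https?://(?:twitter|x)\.com/\w+/status/\d+")

@tweet_link.handle()
async def handle_tweet_link(event: MessageEvent):
    original_link = event.get_plaintext().strip()

    # Derive the user name from the link and build an RSSHub-style feed URL
    # (placeholder host; point this at your own RSS source).
    user_name = original_link.split("/status/")[0].rstrip("/").rsplit("/", 1)[-1]
    rss_url = f"https://rsshub.example.com/twitter/user/{user_name}"

    tweet_data = await fetch_tweet_data(rss_url, original_link)
    if tweet_data is None:
        await tweet_link.finish("No matching tweet found in the feed.")

    message = await build_message(tweet_data, user_name)
    if message:
        await tweet_link.finish(message)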