forked from MemesRandomland/telegraph_export_bot
-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy patharchive.py
More file actions
197 lines (165 loc) · 5.96 KB
/
archive.py
File metadata and controls
197 lines (165 loc) · 5.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import dbm
import os
import webpage2telegraph
from html_telegraph_poster import TelegraphPoster
from telegram import MessageEntity, ParseMode
from telegram.ext import Updater, MessageHandler, Filters
from telegram_util import matchKey, log_on_fail, getDisplayUserHtml
import config
def set_proxy():
    """Export the configured outbound proxy and expand the mirror settings.

    When ``config.proxy`` is set, it is written to the ``http_proxy`` and
    ``https_proxy`` environment variables, and ``no_proxy`` is set to the
    Telegram and Telegraph API hosts followed by whichever of
    ``config.jsproxy`` / ``config.siteproxy`` are set.

    Separately, a set ``config.jsproxy`` / ``config.siteproxy`` is rewritten
    in place from its configured value into an ``https://`` URL prefix.
    """
    # NOTE(review): the nesting of the no_proxy additions under the
    # ``config.proxy`` check is reconstructed from the statement order
    # (they read back the value assigned there) -- confirm against upstream.
    js_host = config.jsproxy
    site_host = config.siteproxy
    if config.proxy:
        bypass = ['api.telegram.org,api.telegra.ph']
        bypass.extend(host for host in (js_host, site_host) if host)
        os.environ['http_proxy'] = config.proxy
        os.environ['https_proxy'] = config.proxy
        os.environ['no_proxy'] = ','.join(bypass)
    if js_host:
        config.jsproxy = 'https://%s/-----' % js_host
    if site_host:
        config.siteproxy = 'https://%s/' % site_host
# Apply the proxy configuration first, before the Updater is created.
set_proxy()
tele = Updater(config.api_token, use_context=True)
# Chat that receives the per-request log; also handed to log_on_fail below.
log_chat = tele.bot.get_chat(config.log_chat)
# On-disk key/value stores keyed by str(sender id); flag 'c' opens the
# database read/write and creates it when it does not exist yet.
source_flags = dbm.open('source_flags.db', 'c')
simplify_flags = dbm.open('simplify_flags.db', 'c')
telegraph_tokens = dbm.open('telegraph_tokens.db', 'c')
def get_from(msg):
    """Return ``(id, display name, username)`` for the origin of *msg*.

    The sending user is used when the message has one; otherwise the
    values are taken from the chat the message was posted in.
    """
    sender = msg.from_user
    if not sender:
        # No user attached to the message (e.g. a channel post).
        chat = msg.chat
        return msg.chat_id, chat.title, chat.username
    return sender.id, sender.first_name, sender.username
def send_auth_url(msg, p):
    """Reply to *msg* with the login link of the Telegraph account behind *p*.

    *p* must offer ``get_account_info(fields=...)`` returning a mapping
    that contains ``'auth_url'``.
    """
    auth_url = p.get_account_info(fields=['auth_url'])['auth_url']
    notice = '如果你需要编辑生成的 Telegraph,或者绑定到你的账户以便日后编辑,请在五分钟内点此链接登录:'
    msg.reply_text(notice + auth_url)
def get_telegraph_token(msg):
    """Ensure the sender of *msg* has a stored Telegraph token, then send its login link.

    A sender without an entry in ``telegraph_tokens`` gets a new token
    created from their name and username, which is stored under
    ``str(sender id)``. In both cases the login link is sent as a reply.
    """
    sender_id, display_name, handle = get_from(msg)
    key = str(sender_id)
    if key not in telegraph_tokens:
        poster = TelegraphPoster()
        account = poster.create_api_token(display_name, handle)
        telegraph_tokens[key] = account['access_token']
    else:
        poster = TelegraphPoster(access_token=telegraph_tokens[key])
    send_auth_url(msg, poster)
# noinspection PyBroadException
def get_telegraph(msg, url):
    """Export *url* to Telegraph on behalf of the sender of *msg*.

    The sender's Telegraph token is created first if none is stored. The
    page is fetched directly; if that fails, it is retried through
    ``config.jsproxy`` and then ``config.siteproxy`` when those are set
    (without the source link, since the fetched address is the mirror's).

    Returns:
        Whatever ``webpage2telegraph.transfer`` returns for the first
        attempt that succeeds.

    Raises:
        Exception: the error of the direct attempt, when every attempt failed.
    """
    from_id, _, _ = get_from(msg)
    fid = str(from_id)
    if fid not in telegraph_tokens:
        get_telegraph_token(msg)
    webpage2telegraph.token = telegraph_tokens[fid]
    # Each preference has its own store: the simplify toggle writes to
    # simplify_flags, so it must be read from there and not from source_flags.
    simplify = fid in simplify_flags
    source = fid in source_flags
    try:
        return webpage2telegraph.transfer(url, source=source, simplify=simplify)
    except Exception:
        if config.jsproxy:
            try:
                return webpage2telegraph.transfer(
                    config.jsproxy + url, source=False, simplify=simplify)
            except Exception:
                # Best-effort fallback: move on to the next mirror.
                pass
        if config.siteproxy:
            # 'http://host/path' -> ['http', '/host/path'], giving the
            # '<siteproxy>http/host/path' address form.
            urls = url.split(':/', 1)
            try:
                return webpage2telegraph.transfer(
                    config.siteproxy + urls[0] + urls[1], source=False, simplify=simplify)
            except Exception:
                # Best-effort fallback: report the original failure below.
                pass
        # Re-raise the failure of the direct attempt with its traceback.
        raise
def transfer(msg):
    """Export every link in *msg* to Telegraph and post each result to the chat.

    This is a generator. For each ``url`` or ``text_link`` entity it exports
    the link through ``get_telegraph``, yields the result, and then sends it
    to the chat of *msg*.

    Skipped entities:
        * entity types other than ``url`` / ``text_link``;
        * text links whose visible label matches 'source' / '原文';
        * links with an explicit scheme that does not start with ``http``.

    A link without any scheme is prefixed with ``http://``.

    Yields:
        The value returned by ``get_telegraph`` for each exported link.

    Raises:
        Whatever ``get_telegraph`` or sending the reply raises; the
        generator stops at the first failure.
    """
    import html  # function-scope: only needed to escape the HTML reply

    for item in msg.entities:
        kind = item['type']
        if kind == 'url':
            # parse_entity accounts for Telegram's UTF-16 based offsets, so
            # text after emoji is still sliced at the right position.
            url = msg.parse_entity(item)
        elif kind == 'text_link':
            if matchKey(msg.parse_entity(item), ['source', '原文']):
                continue
            url = item['url']
        else:
            continue
        if not url:
            # Never turn an empty value into the bare string 'http://'.
            continue
        if '://' not in url:
            url = 'http://' + url
        elif not url.startswith('http'):
            continue
        result = get_telegraph(msg, url)
        yield result
        # NOTE(review): this looks the flag up by chat id, while the flag is
        # stored under the sender id -- they only coincide in private chats;
        # confirm which one is intended.
        if str(msg.chat_id) in source_flags:
            # Both values end up inside HTML markup, so escape them.
            msg.chat.send_message(
                '%s\n<a href="%s">原文</a>' % (html.escape(result), html.escape(url)),
                parse_mode=ParseMode.HTML)
        else:
            msg.chat.send_message(result)
# noinspection PyBroadException
@log_on_fail(log_chat)
def archive(update, context):
    """Handle a message containing links: export them and log the outcome.

    Edited messages and messages forwarded from 'CNArchiveBot' are ignored.
    A progress message is posted while the export runs and is deleted
    afterwards. A summary (sender, original text, error and/or results) is
    always sent to ``log_chat``, whether the export succeeded or not.

    Raises:
        Exception: any export error is re-raised after it has been
        reported to the chat, so the ``log_on_fail`` wrapper sees it too.
    """
    import html  # function-scope: only needed to escape text for the HTML log

    if update.edited_message or update.edited_channel_post:
        return
    msg = update.effective_message
    if msg.forward_from and msg.forward_from.username == 'CNArchiveBot':
        return
    try:
        process_msg = msg.chat.send_message('正在存档…')
    except Exception:
        # Cannot post to this chat at all; nothing useful can be done.
        return
    error = ''
    result = []
    try:
        # Collect one by one so links exported before a failure still
        # show up in the log.
        for link in transfer(msg):
            result.append(link)
    except Exception as e:
        # Fall back to repr() for exceptions that carry no message.
        error = str(e) or repr(e)
        try:
            msg.chat.send_message(error)
        except Exception:  # e.g. raised while the bot is being flood-limited
            pass
        raise
    finally:
        try:
            if msg.from_user:
                who = '%s (%d):' % (getDisplayUserHtml(msg.from_user), msg.from_user.id)
            else:
                # No sending user (e.g. a channel post): describe the chat.
                from_id, name, _ = get_from(msg)
                who = '%s (%d):' % (html.escape(name or ''), from_id)
            log = [who, msg.text_html]
            if error:
                log.append('\nError:')
                # The log is sent as HTML; raw '<' or '&' would break parsing.
                log.append(html.escape(error))
            if result:
                log.append('\nResult:')
                log.append('\n'.join(result))
            log_chat.send_message('\n'.join(log),
                                  parse_mode=ParseMode.HTML,
                                  disable_web_page_preview=True)
        finally:
            # Remove the progress message even when logging failed.
            process_msg.delete()
def switch_source_flag(msg):
    """Toggle the sender's entry in ``source_flags`` and confirm by reply.

    An existing entry is removed; a missing one is created with ``b'1'``.
    """
    key = str(get_from(msg)[0])
    was_enabled = key in source_flags
    if was_enabled:
        del source_flags[key]
    else:
        source_flags[key] = b'1'
    msg.reply_text('将隐藏原文链接' if was_enabled else '将展示原文链接')
def switch_simplify_flag(msg):
    """Toggle the sender's entry in ``simplify_flags`` and confirm by reply.

    An existing entry is removed; a missing one is created with ``b'1'``.
    """
    key = str(get_from(msg)[0])
    was_enabled = key in simplify_flags
    if was_enabled:
        del simplify_flags[key]
    else:
        simplify_flags[key] = b'1'
    msg.reply_text('将不再繁简转换' if was_enabled else '将进行繁简转换')
# Help text, read once at import time. The encoding is given explicitly so
# the result does not depend on the locale of the host.
# NOTE(review): assumes help.md is stored as UTF-8 -- confirm.
with open('help.md', encoding='utf-8') as f:
    help_message = f.read()
@log_on_fail(log_chat)
def command(update, context):
    """Dispatch a bot command by keyword, falling back to the help text.

    Keywords are checked in this order against the message text:
    'auth' / 'token' sends the Telegraph login link, 'source' toggles the
    source-link flag, 'simplify' toggles the simplify flag. Anything else
    is answered with the help text, but only when the chat id is positive.

    Edited updates are ignored, matching the link handler, so editing a
    command does not toggle a flag a second time.
    """
    if update.edited_message or update.edited_channel_post:
        return
    # effective_message also covers channel posts, for which
    # update.message is None.
    msg = update.effective_message
    if msg is None:
        return
    if matchKey(msg.text, ['auth', 'token']):
        return get_telegraph_token(msg)
    if matchKey(msg.text, ['source']):
        return switch_source_flag(msg)
    if matchKey(msg.text, ['simplify']):
        return switch_simplify_flag(msg)
    if msg.chat_id > 0:  # from private
        msg.reply_text(help_message)
# Text messages that contain a URL or a text-link entity are handled by archive().
tele.dispatcher.add_handler(MessageHandler(
Filters.text &
(Filters.entity(MessageEntity.URL) | Filters.entity(MessageEntity.TEXT_LINK)),
archive)
)
# Messages matching Filters.command are handled by command(); this handler is
# registered after the link handler above.
tele.dispatcher.add_handler(MessageHandler(Filters.command, command))
# Start polling for updates, then call idle() on the updater.
tele.start_polling()
tele.idle()