From 3962ba22c79aecd5b5c1fda2f886a5958c3bf249 Mon Sep 17 00:00:00 2001 From: itsusinn Date: Sun, 6 Feb 2022 00:39:12 +0800 Subject: [PATCH] refactor: change teloxide to dev branch cancel support of webhook --- Cargo.toml | 19 ++--- src/bot.rs | 8 +-- src/command.rs | 84 ++++++++++++----------- src/config.rs | 2 +- src/despatch.rs | 118 -------------------------------- src/dispatch.rs | 30 ++++++++ src/main.rs | 24 ++----- src/message/handler.rs | 7 -- src/message/handlers/receive.rs | 7 +- src/message/handlers/send.rs | 60 +++++----------- src/message/mod.rs | 5 -- src/net.rs | 1 - src/webhook.rs | 79 --------------------- 13 files changed, 112 insertions(+), 332 deletions(-) delete mode 100644 src/despatch.rs create mode 100644 src/dispatch.rs delete mode 100644 src/message/handler.rs delete mode 100644 src/webhook.rs diff --git a/Cargo.toml b/Cargo.toml index cb8b5d0..8d6ac4c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,8 +7,8 @@ edition = "2021" [dependencies] # bot -teloxide ={ version = "0.5.3",default-features = false,features = ["frunk", "macros", "auto-send","ctrlc_handler","rustls"] } -teloxide-core = { version = "0.3.3",default-features = false, features = ["rustls"] } +teloxide ={ rev = "25f8634", default-features = false,features = ["frunk", "macros", "auto-send","ctrlc_handler","rustls","dispatching2"],git = "https://github.com/teloxide/teloxide.git"} +teloxide-core = { version = "0.4.0", default-features = false, features = ["rustls","cache_me"] } teloxide-macros = "0.4.1" # logging @@ -26,7 +26,6 @@ async-std = "1.10.0" tokio = { version = "1.15.0",default-features = false,features = ["rt-multi-thread", "macros","signal"] } tokio-util = "0.6.8" tokio-stream = "0.1.7" -smol = "1.2.5" futures = "0.3.17" # error handling @@ -46,7 +45,6 @@ once_cell = "1.8.0" educe = "0.4.18" num-bigint = "0.4.2" arcstr = { version = "1.1.1", features = ["serde"] } - # webhook warp = "0.3.1" @@ -58,20 +56,11 @@ sled = "0.34.7" # mesagisto nats = "0.17.0" -# mesagisto-client = { rev = "414e33", git = "https://github.com/MeowCat-Studio/mesagisto-client-rs.git"} -mesagisto-client = { path = "../../mesagisto-client/rust" } +mesagisto-client = { rev = "414e33", git = "https://github.com/MeowCat-Studio/mesagisto-client-rs.git"} +# mesagisto-client = { path = "../../mesagisto-client/rust" } automatic-config = { git = "https://github.com/Itsusinn/automatic-config-rs.git" } singleton = { git = "https://github.com/Itsusinn/singleton-rs.git", rev = "bee321c" } # singleton = { path = "/home/itsusinn/Workspace/Code/singleton-rs" } either = "1.6.1" # hex = "0.4" base64-url = "1.4.10" - -[dependencies.pyo3] -version = "0.15.1" -optional = true -features = ["auto-initialize"] - -[features] -# default = ["yinglish"] -yinglish = ["pyo3"] diff --git a/src/bot.rs b/src/bot.rs index 781b6b3..feda28a 100644 --- a/src/bot.rs +++ b/src/bot.rs @@ -1,4 +1,4 @@ -use std::{ops::Deref, sync::Arc}; +use std::ops::Deref; use crate::config::CONFIG; use arcstr::ArcStr; @@ -7,10 +7,10 @@ use teloxide::{adaptors::AutoSend, prelude::Requester, types::File as TgFile, Bo #[derive(Singleton, Default)] pub struct TgBot { - inner: LateInit>>, + inner: LateInit>, } impl TgBot { - pub fn init(&self, bot: Arc>) { + pub fn init(&self, bot: AutoSend) { self.inner.init(bot) } // fixme use this-error @@ -36,7 +36,7 @@ impl TgBot { } } impl Deref for TgBot { - type Target = Arc>; + type Target = AutoSend; fn deref(&self) -> &Self::Target { &self.inner } diff --git a/src/command.rs b/src/command.rs index fcc776d..a350210 100644 --- a/src/command.rs +++ b/src/command.rs @@ -1,54 +1,56 @@ use crate::config::CONFIG; -use std::sync::Arc; -use teloxide::dispatching::UpdateWithCx; -use teloxide::prelude::*; + +use teloxide::prelude2::*; use teloxide::utils::command::BotCommand; -#[derive(BotCommand)] +#[derive(BotCommand, Clone)] #[command(rename = "lowercase", description = "信使Bot支持以下命令")] pub enum Command { #[command(description = "显示命令帮助")] Help, - #[command(description = "启用消息转发")] - Enable, - #[command(description = "禁用消息转发")] - Disable, + // #[command(description = "启用消息转发")] + // Enable, + // #[command(description = "禁用消息转发")] + // Disable, #[command(description = "设置当前Group的转发地址", parse_with = "split")] SetAddress { address: String }, } - -pub async fn answer( - cx: Arc, Message>>, - command: Command, -) -> anyhow::Result<()> { - match command { - Command::Help => { - cx.answer(Command::descriptions()).await?; - } - Command::Enable => { - cx.answer("Mesagisto信使已启用").await?; - } - Command::Disable => { - cx.answer("Mesagisto信使已禁用").await?; - } - Command::SetAddress { address } => { - let sender_id = cx.update.from().unwrap().id; - let chat_id = cx.chat_id(); - let admins = cx.requester.get_chat_administrators(chat_id).await?; - let mut is_admin = false; - for admin in admins { - if admin.user.id == sender_id { - is_admin = true; - break; - } +impl Command { + pub async fn answer(msg: Message, bot: AutoSend, cmd: Command) -> anyhow::Result<()> { + match cmd { + Command::Help => { + let chat_id = msg.chat_id(); + bot.send_message(chat_id, Command::descriptions()).await?; } - if is_admin { - CONFIG.target_address_mapper.insert(chat_id, address.into()); - cx.answer("成功设置当前Group的信使地址").await?; - } else { - cx.answer("权限不足,拒绝设置信使频道").await?; + // Command::Enable => { + // cx.answer("Mesagisto信使已启用").await?; + // } + // Command::Disable => { + // cx.answer("Mesagisto信使已禁用").await?; + // } + Command::SetAddress { address } => { + let sender_id = msg.from().unwrap().id; + let chat_id = msg.chat_id(); + let admins = bot.get_chat_administrators(chat_id).await?; + let mut is_admin = false; + for admin in admins { + if admin.user.id == sender_id { + is_admin = true; + break; + } + } + if is_admin { + CONFIG.target_address_mapper.insert(chat_id, address.into()); + bot + .send_message(chat_id, "成功设置当前Group的信使地址") + .await?; + } else { + bot + .send_message(chat_id, "权限不足,拒绝设置信使频道") + .await?; + } } - } - }; - Ok(()) + }; + Ok(()) + } } diff --git a/src/config.rs b/src/config.rs index 85ceab2..84563d9 100644 --- a/src/config.rs +++ b/src/config.rs @@ -78,5 +78,5 @@ pub struct WebhookConfig { #[basic_derive] pub struct FormatConfig { - pub msg: ArcStr + pub msg: ArcStr, } diff --git a/src/despatch.rs b/src/despatch.rs deleted file mode 100644 index 2a7abeb..0000000 --- a/src/despatch.rs +++ /dev/null @@ -1,118 +0,0 @@ -use crate::config::CONFIG; -use crate::webhook::webhook; -use std::fmt::Debug; -use std::future::Future; -use std::sync::Arc; -use teloxide::dispatching::update_listeners::UpdateListener; -use teloxide::dispatching::{update_listeners, Dispatcher, DispatcherHandlerRx, UpdateWithCx}; -use teloxide::error_handlers::{ErrorHandler, OnError}; -use teloxide::prelude::*; -use teloxide::utils::command::BotCommand; -use teloxide_core::types::Message; -use tokio_stream::wrappers::UnboundedReceiverStream; - -pub async fn cmd_or_msg_repl( - bot: &AutoSend, - bot_name: N, - cmd_handler: CH, - msg_handler: MH, -) where - Cmd: BotCommand + Send + 'static, - - CH: Fn(Arc, Message>>, Cmd) -> FutC + Send + Sync + 'static, - FutC: Future> + Send + 'static, - - MH: Fn(Arc, Message>>) -> FutM + Send + Sync + 'static, - FutM: Future> + Send + 'static, - - N: Into + Send + Clone + 'static, -{ - if CONFIG.telegram.webhook.enable { - let listener = webhook(&bot).await; - cmd_or_msg_repl_with_listener(&bot, bot_name, cmd_handler, msg_handler, listener).await; - } else { - bot - .delete_webhook() - .await - .expect("Failed to delete previous webhook"); - let listener = update_listeners::polling_default(bot.clone()).await; - cmd_or_msg_repl_with_listener(&bot, bot_name, cmd_handler, msg_handler, listener).await; - }; -} - -pub async fn cmd_or_msg_repl_with_listener( - bot: &AutoSend, - bot_name: N, - cmd_handler: CH, - msg_handler: MH, - listener: L, -) where - Cmd: BotCommand + Send + 'static, - - CH: Fn(Arc, Message>>, Cmd) -> FutC + Send + Sync + 'static, - FutC: Future> + Send + 'static, - - MH: Fn(Arc, Message>>) -> FutM + Send + Sync + 'static, - FutM: Future> + Send + 'static, - - N: Into + Send + Clone + 'static, - - L: UpdateListener + Send, - ListenerE: Debug + Send, -{ - let cmd_handler = Arc::new(cmd_handler); - let msg_handler = Arc::new(msg_handler); - - log::info!("Mesagisto-Bot启动成功"); - Dispatcher::new(bot.clone()) - .messages_handler(move |rx: DispatcherHandlerRx, Message>| { - UnboundedReceiverStream::new(rx).for_each_concurrent(None, move |cx| { - let msg_handler = Arc::clone(&msg_handler); - let cmd_handler = Arc::clone(&cmd_handler); - let clone_bot_name = bot_name.clone(); - let cx = Arc::new(cx); - async move { - if let Some(text_content) = cx.clone().update.text() { - let parse = Cmd::parse(&*text_content, clone_bot_name); - match parse { - Ok(cmd) => cmd_handler(cx.to_owned(), cmd).await.log_on_error().await, - Err(_) => msg_handler(cx).await.log_on_error().await, - } - } else { - msg_handler(cx).await.log_on_error().await - }; - } - }) - }) - .setup_ctrlc_handler() - .dispatch_with_listener( - listener, - LoggingErrorHandler::with_custom_text("An error from the update listener"), - ) - .await -} - -struct TracingErrorHandler { - text: String, -} -impl TracingErrorHandler { - #[allow(unused)] - pub fn with_custom_text(text: T) -> Arc - where - T: Into, - { - Arc::new(Self { text: text.into() }) - } -} -use futures::future::BoxFuture; -impl ErrorHandler for TracingErrorHandler { - fn handle_error(self: Arc, error: anyhow::Error) -> BoxFuture<'static, ()> { - log::error!( - "{}:{}, \n Backtrace {}", - self.text, - error, - error.backtrace() - ); - Box::pin(async {}) - } -} diff --git a/src/dispatch.rs b/src/dispatch.rs new file mode 100644 index 0000000..9dd2e49 --- /dev/null +++ b/src/dispatch.rs @@ -0,0 +1,30 @@ +use teloxide::prelude2::*; +use tracing::info; + +use crate::{command::Command, message::handlers}; + +pub async fn start(bot: &AutoSend) { + let handler = Update::filter_message() + .branch( + dptree::entry() + .filter_command::() + .endpoint(Command::answer), + ) + .branch( + dptree::filter(|msg: Message| msg.chat.is_group() || msg.chat.is_supergroup()) + .endpoint(handlers::send::answer_common), + ); + info!("Mesagisto-Bot启动成功"); + Dispatcher::builder(bot.clone(), handler) + .default_handler(|upd| async move { + log::warn!("Unhandled update: {:?}", upd); + }) + // If the dispatcher fails for some reason, execute this handler. + .error_handler(LoggingErrorHandler::with_custom_text( + "An error has occurred in the dispatcher", + )) + .build() + .setup_ctrlc_handler() + .dispatch() + .await; +} diff --git a/src/main.rs b/src/main.rs index 454dd2f..c36cc0d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,12 +3,10 @@ use futures::FutureExt; use mesagisto_client::MesagistoConfig; -use std::sync::Arc; -use teloxide::{prelude::*, Bot}; +use teloxide::{prelude2::*, Bot}; use bot::TG_BOT; use config::CONFIG; -use despatch::cmd_or_msg_repl; #[macro_use] extern crate educe; @@ -19,11 +17,10 @@ extern crate singleton; mod bot; mod command; mod config; -mod despatch; +mod dispatch; pub mod ext; mod message; mod net; -mod webhook; #[tokio::main] async fn main() { @@ -32,9 +29,9 @@ async fn main() { #[allow(unused_must_use)] async fn run() -> Result<(), anyhow::Error> { let env = tracing_subscriber::EnvFilter::from("warn") - .add_directive("teloxide=info".parse()?) - .add_directive("telegram_message_source=info".parse()?) - .add_directive("mesagisto_client=info".parse()?); + .add_directive("teloxide=info".parse()?) + .add_directive("telegram_message_source=info".parse()?) + .add_directive("mesagisto_client=info".parse()?); tracing_subscriber::fmt().with_env_filter(env).init(); if !CONFIG.enable { @@ -71,15 +68,8 @@ async fn run() -> Result<(), anyhow::Error> { let bot = Bot::with_client(CONFIG.telegram.token.clone(), net::client_from_config()).auto_send(); - TG_BOT.init(Arc::new(bot)); - - cmd_or_msg_repl( - &*TG_BOT, - &*CONFIG.telegram.bot_name, - command::answer, - message::handler::answer_msg, - ) - .await; + TG_BOT.init(bot); + dispatch::start(&TG_BOT).await; CONFIG.save(); log::info!("Mesagisto Bot is going to shut down"); diff --git a/src/message/handler.rs b/src/message/handler.rs deleted file mode 100644 index f5bb229..0000000 --- a/src/message/handler.rs +++ /dev/null @@ -1,7 +0,0 @@ -use std::sync::Arc; -use teloxide::prelude::*; - -use super::handlers::send::answer_common; -pub async fn answer_msg(cx: Arc, Message>>) -> anyhow::Result<()> { - answer_common(cx).await -} diff --git a/src/message/handlers/receive.rs b/src/message/handlers/receive.rs index e24d8f9..95d374c 100644 --- a/src/message/handlers/receive.rs +++ b/src/message/handlers/receive.rs @@ -13,7 +13,10 @@ use teloxide::prelude::Requester; use teloxide::types::InputFile; -pub async fn receive_from_server(message: nats::asynk::Message, target: Vec) -> anyhow::Result<()> { +pub async fn receive_from_server( + message: nats::asynk::Message, + target: Vec, +) -> anyhow::Result<()> { let target = i64::from_be_bytes(target.try_into().unwrap()); log::trace!("接收到来自目标{}的消息", target); let packet = Packet::from_cbor(&message.data)?; @@ -62,7 +65,7 @@ pub async fn handle_receive_message(mut message: Message, target: i64) -> anyhow .send_message(target, format!("{}:", sender_name)) .await?; DB.put_msg_id_ir_2(&target, &receipt.id, &message.id)?; - let receipt = TG_BOT.send_photo(target, InputFile::File(path)).await?; + let receipt = TG_BOT.send_photo(target, InputFile::file(path)).await?; DB.put_msg_id_1(&target, &message.id, &receipt.id)?; } } diff --git a/src/message/handlers/send.rs b/src/message/handlers/send.rs index b4604f7..93a5456 100644 --- a/src/message/handlers/send.rs +++ b/src/message/handlers/send.rs @@ -2,47 +2,22 @@ use crate::bot::TG_BOT; use crate::config::CONFIG; use crate::ext::db::DbExt; use crate::message::handlers::receive::receive_from_server; -use crate::message::Cx; + use mesagisto_client::data::message::{MessageType, Profile}; use mesagisto_client::data::{message, Packet}; use mesagisto_client::db::DB; use mesagisto_client::res::RES; use mesagisto_client::server::SERVER; use mesagisto_client::EitherExt; -use std::sync::Arc; -use teloxide::prelude::*; - -#[cfg(feature = "yinglish")] -pub fn yinglish(text: String) -> String { - use pyo3::{types::PyModule, Python}; - - Python::with_gil(|py| { - let yinglish = PyModule::import(py, "yinglish").unwrap(); - let res: String = yinglish - .call_method1("chs2yin", (text,)) - .unwrap() - .extract() - .unwrap(); - res - }) -} - -pub async fn answer_common(cx: Arc) -> anyhow::Result<()> { - let udp = &cx.update; - #[cfg(feature = "yinglish")] - if text.starts_with("!") { - let mut content = text.clone().to_string(); - content.remove(0); - let reply_content = yinglish(content); - cx.reply_to(reply_content).await?; - } +use teloxide::prelude2::*; - let target = cx.chat_id(); +pub async fn answer_common(msg: Message, bot: AutoSend) -> anyhow::Result<()> { + let target = msg.chat_id(); if !CONFIG.target_address_mapper.contains_key(&target) { return Ok(()); } let address = CONFIG.target_address_mapper.get(&target).unwrap().clone(); - let sender = match cx.update.from() { + let sender = match msg.from() { Some(v) => v, //fixme None => return Ok(()), @@ -51,25 +26,21 @@ pub async fn answer_common(cx: Arc) -> anyhow::Result<()> { let profile = Profile { id: sender.id.to_be_bytes().into(), username: sender.username.clone(), - nick: Some( - sender - .full_name() - .replace(|c: char| !c.is_alphanumeric(), ""), - ), + nick: Some(sender.full_name()), }; let mut chain = Vec::::new(); - if let Some(text) = udp.text() { + if let Some(text) = msg.text() { chain.push(MessageType::Text { content: text.to_string(), }); - } else if let Some(image) = udp.photo() { + } else if let Some(image) = msg.photo() { let photo = image.last().unwrap(); let file_id: Vec = photo.file_id.as_bytes().to_vec(); let uid: Vec = photo.file_unique_id.as_bytes().to_vec(); RES.put_image_id(&uid, file_id.clone()); TG_BOT.file(&uid, &file_id).await?; chain.push(MessageType::Image { id: uid, url: None }) - } else if let Some(sticker) = udp.sticker() { + } else if let Some(sticker) = msg.sticker() { let file_id: Vec = sticker.file_id.as_bytes().to_vec(); let uid: Vec = sticker.file_unique_id.as_bytes().to_vec(); RES.put_image_id(&uid, file_id.clone()); @@ -77,24 +48,29 @@ pub async fn answer_common(cx: Arc) -> anyhow::Result<()> { chain.push(MessageType::Image { id: uid, url: None }) } - let reply = match udp.reply_to_message() { + let reply = match msg.reply_to_message() { Some(v) => { let local_id = v.id.to_be_bytes().to_vec(); DB.get_msg_id_2(&target, &local_id).unwrap_or(None) } None => None, }; - DB.put_msg_id_0(&udp.chat_id(), &udp.id, &udp.id)?; + DB.put_msg_id_0(&msg.chat_id(), &msg.id, &msg.id)?; let message = message::Message { profile, - id: udp.id.to_be_bytes().to_vec(), + id: msg.id.to_be_bytes().to_vec(), chain, reply, }; let packet = Packet::from(message.tl())?; SERVER - .send_and_receive(target.to_be_bytes().to_vec(), address, packet, receive_from_server) + .send_and_receive( + target.to_be_bytes().to_vec(), + address, + packet, + receive_from_server, + ) .await?; Ok(()) } diff --git a/src/message/mod.rs b/src/message/mod.rs index 4f820d8..c3d4495 100644 --- a/src/message/mod.rs +++ b/src/message/mod.rs @@ -1,6 +1 @@ -use teloxide::{adaptors::AutoSend, prelude::UpdateWithCx, types::Message, Bot}; - -pub mod handler; pub mod handlers; - -type Cx = UpdateWithCx, Message>; diff --git a/src/net.rs b/src/net.rs index 99359ee..d2d2c41 100644 --- a/src/net.rs +++ b/src/net.rs @@ -1,7 +1,6 @@ use std::time::Duration; fn default_reqwest_settings() -> reqwest::ClientBuilder { - let connect_timeout = Duration::from_secs(10); let timeout = connect_timeout + Duration::from_secs(24); diff --git a/src/webhook.rs b/src/webhook.rs deleted file mode 100644 index 82bd1e3..0000000 --- a/src/webhook.rs +++ /dev/null @@ -1,79 +0,0 @@ -use log::info; -use reqwest::{StatusCode, Url}; -use teloxide::{ - dispatching::{ - stop_token::AsyncStopToken, - update_listeners::{self, StatefulListener}, - }, - prelude::*, - types::Update, -}; - -use std::{convert::Infallible, env, net::SocketAddr}; -use tokio::sync::mpsc; -use tokio_stream::wrappers::UnboundedReceiverStream; -use warp::Filter; - -use crate::config::CONFIG; - -pub async fn webhook(bot: &AutoSend) -> impl update_listeners::UpdateListener { - // Heroku auto defines a port value - let port: u16 = if CONFIG.telegram.webhook.heroku { - env::var("PORT") - .expect("PORT env variable missing") - .parse() - .expect("PORT value to be integer") - } else { - CONFIG.telegram.webhook.port - }; - info!("The port of webhook is {}", &port); - // Heroku host example .: "heroku-ping-pong-bot.herokuapp.com" - let host = CONFIG.telegram.webhook.host.to_string(); - let path = format!("bot{}", CONFIG.telegram.token); - let url = Url::parse(&format!("https://{}/{}", host, path)).unwrap(); - - info!("Webhook is being setup"); - bot.set_webhook(url).await.expect("Cannot setup a webhook"); - - let (tx, rx) = mpsc::unbounded_channel(); - - let server = warp::post() - .and(warp::path(path)) - .and(warp::body::json()) - .map(move |json: serde_json::Value| { - if let Ok(update) = Update::try_parse(&json) { - tx.send(Ok(update)) - .expect("Cannot send an incoming update from the webhook") - } - - StatusCode::OK - }) - .recover(handle_rejection); - - let (stop_token, stop_flag) = AsyncStopToken::new_pair(); - - let addr = format!("0.0.0.0:{}", port).parse::().unwrap(); - let server = warp::serve(server); - let (_addr, fut) = server.bind_with_graceful_shutdown(addr, stop_flag); - - // You might want to use serve.key_path/serve.cert_path methods here to - // setup a self-signed TLS certificate. - - tokio::spawn(fut); - let stream = UnboundedReceiverStream::new(rx); - - fn streamf(state: &mut (S, T)) -> &mut S { - &mut state.0 - } - - StatefulListener::new( - (stream, stop_token), - streamf, - |state: &mut (_, AsyncStopToken)| state.1.clone(), - ) -} - -async fn handle_rejection(error: warp::Rejection) -> Result { - log::error!("Cannot process the request due to: {:?}", error); - Ok(StatusCode::INTERNAL_SERVER_ERROR) -}