Skip to content

Commit

Permalink
refactor: modular
Browse files Browse the repository at this point in the history
  • Loading branch information
itsusinn committed Oct 17, 2021
1 parent 9604c97 commit d826a7c
Showing 1 changed file with 75 additions and 132 deletions.
207 changes: 75 additions & 132 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,145 +1,88 @@
use std::env;
#![allow(incomplete_features)]
#![feature(backtrace,capture_disjoint_fields)]

use std::sync::Arc;
use nats::asynk::Connection;
use teloxide::{prelude::*, Bot};
use std::{
collections::{HashMap, HashSet}
};
use nats::Headers;

use arcstr::ArcStr;
use futures::FutureExt;
use log::{info, warn};
use mesagisto_client::{OptionExt, cache::CACHE, cipher::CIPHER, db::DB, res::RES, server::SERVER};
use teloxide::{Bot, prelude::*};

use bot::TG_BOT;
use config::CONFIG;
use despatch::cmd_or_msg_repl;

#[macro_use]
extern crate educe;

#[macro_use]
extern crate automatic_config;
#[macro_use]
extern crate singleton;
mod bot;
mod command;
mod config;
mod data;
mod message;
mod webhook;
mod despatch;
mod message;
mod net;
mod webhook;

use config::CONFIG;
use data::DATA;
use despatch::cmd_or_msg_repl;

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
run().await
fn main() {
std::env::set_var("RUST_BACKTRACE", "1");
std::backtrace::Backtrace::force_capture();
env_logger::builder()
.write_style(env_logger::WriteStyle::Auto)
.filter(None, log::LevelFilter::Error)
.format_timestamp(None)
.filter(Some("telegram_mesaga_fonto"), log::LevelFilter::Trace)
.filter(Some("mesagisto_client"), log::LevelFilter::Trace)
.filter(Some("teloxide"), log::LevelFilter::Info)
.init();
tokio::runtime::Builder::new_multi_thread()
// fixme: how many do we need
.worker_threads(5)
.enable_all()
.build()
.unwrap()
.block_on(async {
run().await.unwrap();
});
}

#[allow(unused_must_use)]
async fn run() -> Result<(), anyhow::Error> {

teloxide::enable_logging!();

if !CONFIG.enabled {
log::info!("Mesagisto-Bot is not enabled and is about to exit the program");
return Ok(());
}

log::info!("Mesagisto-Bot is starting up");

let opts = nats::asynk::Options::new();

log::info!("Connecting to nats server");
let nc = opts
.with_name("telegram client")
.connect(&CONFIG.forwarding.address).await?;

let cid = nc.client_id().to_string();
log::info!("Connected sucessfully,the client id is {}",&cid);
let nats_header = {
let mut inner = HashMap::default();
let entry = inner.entry("cid".to_string()).or_insert_with(HashSet::default);
entry.insert(cid.clone());
Arc::new(Headers { inner })
};

let bot = Bot::with_client(
CONFIG.telegram.token.clone(),
net::client_from_config()
).auto_send();

let clone_header = nats_header.clone();
let clone_cid = Arc::new(cid);
let clone_bot = Arc::new(bot.clone());
let clone_nc = nc.clone();

cmd_or_msg_repl(
bot,
&*CONFIG.telegram.bot_name,
command::answer,
move |cx,msg| {

let clone_header = clone_header.clone();
let clone_cid = clone_cid.clone();
let clone_nc = clone_nc.clone();
let clone_bot = clone_bot.clone();

async move {
if !message::answer_msg(cx.clone(), &msg).await? {
let target = Arc::new(cx.chat_id().to_string());
if CONFIG.target_address_mapper.contains_key(&target) {
let address = CONFIG.target_address_mapper.get(&target).unwrap().clone();
let sender = cx.update.from().unwrap();
let sender_name = if sender.username.is_none() {
sender.full_name().replace(|c: char| !c.is_alphanumeric(),"")
} else {
sender.username.clone().unwrap()
};
let content = format!("{}: {}", sender_name,msg);
clone_nc.publish_with_reply_or_headers(
address.as_str(),
None,
Some(&*clone_header),
content).await.unwrap();
try_create_endpoint(clone_nc,target, address,clone_cid,clone_bot).await;
}
};
respond(())
}
},
).await;

CONFIG.save();
log::info!("Mesagisto Bot is going to shut down");
Ok(())
}

async fn try_create_endpoint(
nc:Connection,
target:Arc<String>,
address:Arc<String>,
cid:Arc<String>,
bot:Arc<AutoSend<Bot>>
){
log::info!("Trying to create sub for {}",*target);
if !DATA.active_endpoint.contains_key(&*target) {
DATA.active_endpoint.insert(target.clone(), true);
log::info!("Creating sub for {}",target);
let sub = nc.subscribe(address.as_str()).await.unwrap();

tokio::spawn( async move {
loop {
let next = sub.next().await;
if next.is_none() { continue; }
let next = next.unwrap();

let headers = next.headers;
if headers.is_none() { continue; }
let headers = headers.unwrap();

let cid_set = headers.get("cid");
if cid_set.is_none() {continue;}
let cid_set = cid_set.unwrap();

if cid_set.contains(&*cid) {continue;}

if let Err(err) = bot.send_message(
target.as_str().parse::<i64>().unwrap(), String::from_utf8_lossy(&next.data)
).await{
log::error!("Teloxide error {}",&err);
}
}
});
}
if !CONFIG.enabled {
warn!("Mesagisto-Bot is not enabled and is about to exit the program.");
warn!("To enable it, please modify the configuration file.");
warn!("Mesagisto-Bot未被启用,即将退出程序。");
warn!("若要启用,请修改配置文件。");
return Ok(());
}
CIPHER.init(&"this is an example key".to_string());
info!("Mesagisto-Bot is starting up");
info!("Mesagisto-Bot正在启动");
DB.init(ArcStr::from("tg").some());
RES.init().await;
RES.resolve_photo_url(|id_pair| {
async {
let file_path = TG_BOT.get_file(id_pair.1.as_str()).await.unwrap().file_path;
Ok(TG_BOT.get_url_by_path(file_path))
}.boxed()
});
SERVER.init(&CONFIG.nats.address).await;
CACHE.init();
let bot = Bot::with_client(CONFIG.telegram.token.clone(), net::client_from_config()).auto_send();

TG_BOT.init(Arc::new(bot));

cmd_or_msg_repl(
&*TG_BOT,
&*CONFIG.telegram.bot_name,
command::answer,
message::handler::answer_msg,
).await;

CONFIG.save();
log::info!("Mesagisto Bot is going to shut down");
Ok(())
}

0 comments on commit d826a7c

Please sign in to comment.