Skip to content

Commit

Permalink
wrap http client
Browse files Browse the repository at this point in the history
  • Loading branch information
jiacai2050 committed Mar 15, 2024
1 parent 7138124 commit 79e6d1a
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 74 deletions.
17 changes: 8 additions & 9 deletions horaectl/src/cmd/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
use anyhow::Result;
use clap::Subcommand;

use crate::operation::cluster::{
clusters_diagnose, clusters_list, clusters_schedule_get, clusters_schedule_set,
};
use crate::operation::cluster::ClusterOp;

#[derive(Subcommand)]
pub enum ClusterCommand {
Expand Down Expand Up @@ -50,18 +48,19 @@ pub enum ScheduleCommand {
}

pub async fn run(cmd: ClusterCommand) -> Result<()> {
let op = ClusterOp::try_new()?;
match cmd {
ClusterCommand::List => clusters_list().await,
ClusterCommand::Diagnose => clusters_diagnose().await,
ClusterCommand::List => op.list().await,
ClusterCommand::Diagnose => op.diagnose().await,
ClusterCommand::Schedule { cmd } => {
if let Some(cmd) = cmd {
match cmd {
ScheduleCommand::Get => clusters_schedule_get().await,
ScheduleCommand::On => clusters_schedule_set(true).await,
ScheduleCommand::Off => clusters_schedule_set(false).await,
ScheduleCommand::Get => op.get_schedule_status().await,
ScheduleCommand::On => op.update_schedule_status(true).await,
ScheduleCommand::Off => op.update_schedule_status(false).await,
}
} else {
clusters_schedule_get().await
op.get_schedule_status().await
}
}
}
Expand Down
149 changes: 84 additions & 65 deletions horaectl/src/operation/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
// specific language governing permissions and limitations
// under the License.

use std::time::Duration;

use anyhow::Result;
use prettytable::row;
use reqwest::Client;

use crate::{
operation::{
Expand Down Expand Up @@ -53,76 +56,92 @@ fn schedule_url() -> String {
+ "/enableSchedule"
}

pub async fn clusters_list() -> Result<()> {
let res = reqwest::get(list_url()).await?;
let response: ClusterResponse = res.json().await?;

let mut table = table_writer(&CLUSTERS_LIST_HEADER);
for cluster in response.data {
table.add_row(row![
cluster.id,
cluster.name,
cluster.shard_total.to_string(),
cluster.topology_type,
cluster.procedure_executing_batch_size.to_string(),
format_time_milli(cluster.created_at),
format_time_milli(cluster.modified_at)
]);
pub struct ClusterOp {
http_client: Client,
}

impl ClusterOp {
pub fn try_new() -> Result<Self> {
let hc = Client::builder()
.timeout(Duration::from_secs(30))
.user_agent("horaectl")
.build()?;

Ok(Self { http_client: hc })
}
table.printstd();

Ok(())
}
pub async fn list(&self) -> Result<()> {
let res = self.http_client.get(list_url()).send().await?;
let response: ClusterResponse = res.json().await?;

let mut table = table_writer(&CLUSTERS_LIST_HEADER);
for cluster in response.data {
table.add_row(row![
cluster.id,
cluster.name,
cluster.shard_total.to_string(),
cluster.topology_type,
cluster.procedure_executing_batch_size.to_string(),
format_time_milli(cluster.created_at),
format_time_milli(cluster.modified_at)
]);
}
table.printstd();

pub async fn clusters_diagnose() -> Result<()> {
let res = reqwest::get(diagnose_url()).await?;
let response: DiagnoseShardResponse = res.json().await?;
let mut table = table_writer(&CLUSTERS_DIAGNOSE_HEADER);
table.add_row(row![response
.data
.unregistered_shards
.iter()
.map(|shard_id| shard_id.to_string())
.collect::<Vec<_>>()
.join(", ")]);
for (shard_id, data) in response.data.unready_shards {
table.add_row(row!["", shard_id, data.node_name, data.status]);
Ok(())
}
table.printstd();

Ok(())
}
pub async fn diagnose(&self) -> Result<()> {
let res = self.http_client.get(diagnose_url()).send().await?;
let response: DiagnoseShardResponse = res.json().await?;
let mut table = table_writer(&CLUSTERS_DIAGNOSE_HEADER);
table.add_row(row![response
.data
.unregistered_shards
.iter()
.map(|shard_id| shard_id.to_string())
.collect::<Vec<_>>()
.join(", ")]);
for (shard_id, data) in response.data.unready_shards {
table.add_row(row!["", shard_id, data.node_name, data.status]);
}
table.printstd();

pub async fn clusters_schedule_get() -> Result<()> {
let res = reqwest::get(schedule_url()).await?;
let response: EnableScheduleResponse = res.json().await?;
let mut table = table_writer(&CLUSTERS_ENABLE_SCHEDULE_HEADER);
let row = match response.data {
Some(data) => row![data],
None => row!["topology should in dynamic mode"],
};
table.add_row(row);
table.printstd();

Ok(())
}
Ok(())
}

pub async fn get_schedule_status(&self) -> Result<()> {
let res = self.http_client.get(schedule_url()).send().await?;
let response: EnableScheduleResponse = res.json().await?;
let mut table = table_writer(&CLUSTERS_ENABLE_SCHEDULE_HEADER);
let row = match response.data {
Some(data) => row![data],
None => row!["topology should in dynamic mode"],
};
table.add_row(row);
table.printstd();

Ok(())
}

pub async fn clusters_schedule_set(enable: bool) -> Result<()> {
let request = EnableScheduleRequest { enable };

let res = reqwest::Client::new()
.put(schedule_url())
.json(&request)
.send()
.await?;
let response: EnableScheduleResponse = res.json().await?;
let mut table = table_writer(&CLUSTERS_ENABLE_SCHEDULE_HEADER);
let row = match response.data {
Some(data) => row![data],
None => row!["topology should in dynamic mode"],
};
table.add_row(row);
table.printstd();

Ok(())
pub async fn update_schedule_status(&self, enable: bool) -> Result<()> {
let request = EnableScheduleRequest { enable };

let res = self
.http_client
.put(schedule_url())
.json(&request)
.send()
.await?;
let response: EnableScheduleResponse = res.json().await?;
let mut table = table_writer(&CLUSTERS_ENABLE_SCHEDULE_HEADER);
let row = match response.data {
Some(data) => row![data],
None => row!["topology should in dynamic mode"],
};
table.add_row(row);
table.printstd();

Ok(())
}
}

0 comments on commit 79e6d1a

Please sign in to comment.