Skip to content

Commit

Permalink
Merge pull request #93 from vertexclique/dynamic-runners
Browse files Browse the repository at this point in the history
Dynamic runners
  • Loading branch information
vertexclique authored Dec 15, 2023
2 parents d31a402 + ccad842 commit ba44f3d
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 4 deletions.
4 changes: 2 additions & 2 deletions examples/postgres-sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ async fn postgres_agent(stream: CStream, _ctx: Context<()>) -> Result<()> {
fn main() {
let mut app = Callysto::new();

app.with_name("elasticsearch-app");
app.agent("elastic-agent", app.topic("example"), postgres_agent);
app.with_name("postgres-sink-app");
app.agent("postgres-agent", app.topic("example"), postgres_agent);

app.run();
}
25 changes: 24 additions & 1 deletion src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,9 @@ where
self
}

fn build_client_config(&self) -> ClientConfig {
///
/// Builds kafka client config
pub fn build_client_config(&self) -> ClientConfig {
let mut cc = ClientConfig::new();

cc.set("bootstrap.servers", &*self.brokers)
Expand Down Expand Up @@ -552,6 +554,27 @@ where
Ok(())
}

///
/// Dynamic runner for [Agent].
/// Allows user to run a dynamic agent while application is running.
/// This agent is not bookkept, instead user can manage the lifecycle
/// of the application with [nuclei::Task] handler.
pub fn run_agent(agent: Arc<dyn Agent<State>>) -> AsyncTask<()> {
info!("Starting dynamic Agent");

nuclei::spawn(async move {
match agent.start().await {
Ok(dep) => dep.await,
_ => panic!(
"Error occurred on start of dynamic Agent with label: {}.",
agent.label().await
),
}

agent.after_start().await;
})
}

pub fn run(self) {
// Load all background workers
self.background_workers();
Expand Down
26 changes: 25 additions & 1 deletion src/kafka/cconsumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use std::thread;
use tracing::error;

use crate::kafka::runtime::NucleiRuntime;
use crate::prelude::CStatistics;

pub struct CConsumer {
pub(super) consumer: Arc<BaseConsumer<CConsumerContext>>,
Expand All @@ -39,11 +40,32 @@ pub struct CConsumer {
pin_project! {
#[derive(Clone)]
pub struct CStream {
pub context: Arc<CConsumerContext>,
#[pin]
rx: ArchPadding<Receiver<Option<OwnedMessage>>>
}
}

impl CStream {
///
/// Get consumer context.
pub fn context(&self) -> Arc<CConsumerContext> {
self.context.clone()
}

///
/// Topic name
pub fn topic_name(&self) -> String {
self.context.topic_name.clone()
}

///
/// Consumer statistics
pub fn stats(&self) -> Arc<Option<CStatistics>> {
self.context.get_stats()
}
}

impl Stream for CStream {
type Item = Option<OwnedMessage>;

Expand Down Expand Up @@ -77,6 +99,8 @@ impl CConsumer {
rx: ArchPadding<Receiver<Option<OwnedMessage>>>,
consumer: Arc<BaseConsumer<CConsumerContext>>,
) -> CStream {
let context = consumer.context().clone();

let handle = thread::Builder::new()
.name("cstream-gen".into())
.spawn(move || {
Expand All @@ -93,6 +117,6 @@ impl CConsumer {
}
});

CStream { rx }
CStream { context, rx }
}
}

0 comments on commit ba44f3d

Please sign in to comment.