Skip to content
This repository has been archived by the owner on Sep 15, 2021. It is now read-only.

feat: add opentelemetry to report tensor ready order #42

Merged
merged 37 commits into from
Aug 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
87af336
feat: bagua core cbind
shjwudp Jun 19, 2021
9a5ace2
Merge branch 'master' of https://github.com/shjwudp/bagua-core
shjwudp Jun 21, 2021
aa643ec
Merge branch 'master' of https://github.com/shjwudp/bagua-core
shjwudp Jul 13, 2021
c5caeb0
Merge branch 'master' of https://github.com/shjwudp/bagua-core
shjwudp Jul 21, 2021
17a4447
feat: telemetry trace on tensor mark communication
shjwudp Jul 26, 2021
26a851c
..
shjwudp Jul 26, 2021
00b9dea
Format Rust code using rustfmt
github-actions[bot] Jul 26, 2021
d8a4b9f
..
shjwudp Jul 26, 2021
2dff063
Merge branch 'telemetry' of https://github.com/shjwudp/bagua-core int…
shjwudp Jul 26, 2021
203c0cd
Format Rust code using rustfmt
github-actions[bot] Jul 26, 2021
cb9f24e
..
shjwudp Jul 27, 2021
1eef37e
..
shjwudp Jul 27, 2021
569cfd5
..
shjwudp Jul 27, 2021
b05fcba
Format Rust code using rustfmt
github-actions[bot] Jul 27, 2021
b90d4ee
..
shjwudp Jul 27, 2021
770a685
..
shjwudp Jul 27, 2021
8b53bf8
..
shjwudp Jul 27, 2021
b7c7791
..
shjwudp Jul 27, 2021
f088848
..
shjwudp Jul 27, 2021
b967fcc
Format Rust code using rustfmt
github-actions[bot] Jul 27, 2021
305ceca
..
shjwudp Jul 27, 2021
1b39a54
Merge branch 'telemetry' of https://github.com/shjwudp/bagua-core int…
shjwudp Jul 27, 2021
d0eb17b
Format Rust code using rustfmt
github-actions[bot] Jul 27, 2021
f21e219
..
shjwudp Jul 27, 2021
a7b63f0
Merge branch 'telemetry' of https://github.com/shjwudp/bagua-core int…
shjwudp Jul 27, 2021
0aa4082
..
shjwudp Jul 27, 2021
443681f
..
shjwudp Jul 27, 2021
6b2da25
..
shjwudp Jul 27, 2021
5a80dca
..
shjwudp Jul 28, 2021
8ee12f7
..
shjwudp Jul 28, 2021
9a93c2a
Format Rust code using rustfmt
github-actions[bot] Jul 28, 2021
7f5d795
..
shjwudp Jul 28, 2021
c31e088
Merge branch 'telemetry' of https://github.com/shjwudp/bagua-core int…
shjwudp Jul 28, 2021
26e621f
Merge branch 'master' into telemetry
shjwudp Jul 28, 2021
c0c2363
..
shjwudp Jul 28, 2021
50d64dc
Merge branch 'telemetry' of https://github.com/shjwudp/bagua-core int…
shjwudp Jul 28, 2021
8dda2fa
..
shjwudp Jul 28, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,025 changes: 1,000 additions & 25 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
workspace = {members = ["bagua-core-internal", "bagua-core-py", "bagua-core-c"],exclude = []}
workspace = { members = [
"bagua-core-internal",
"bagua-core-py",
"bagua-opentelemetry",
], exclude = [] }
2 changes: 2 additions & 0 deletions bagua-core-internal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ ureq = "2.1"
num-traits = "0.2"
num-derive = "0.3"
display_utils = "0.4.0"
opentelemetry = { version = "0.15", features = ["serialize", "metrics"] }
bagua-opentelemetry = { path = "../bagua-opentelemetry" }

[dependencies.pyo3]
version = "0.14.1"
Expand Down
7 changes: 0 additions & 7 deletions bagua-core-internal/src/datatypes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::comm_ops::python_ffi_op::PythonFFIOp;
use crate::comm_ops::CommOpTrait;
use crate::communicators::{BaguaCommunicator, BaguaSingleCommunicator};
use crate::resource_pool::{CudaMemory, CUDA_DEVICE_MEMORY_POOL};
use crate::telemetry::TELEMETRY;
use crate::torch_ffi::root::c10::{DeviceType, StorageImpl, TensorImpl};
use crate::{kernels, BaguaCoreError};
use itertools::Itertools;
Expand Down Expand Up @@ -769,12 +768,6 @@ impl BaguaTensor {
if cuda_event_ptr == 0 {
tracing::info!("mark comm ready with an event 0, ignoring event");
}
match TELEMETRY.as_ref() {
None => {}
Some(ref x) => {
x.lock().new_tensor_ready(self.inner.read().name.as_str());
}
}
let mut guard = self.inner.write();
guard.ready_for_comm = true;
guard.ready_cuda_event_ptr = cuda_event_ptr;
Expand Down
51 changes: 26 additions & 25 deletions bagua-core-internal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,20 @@ pub mod datatypes;
pub mod events;
pub mod kernels;
pub mod resource_pool;
pub mod telemetry;
mod torch_ffi;

use crate::comm_ops::CommOpTrait;
use crate::telemetry::{SCHEDULED_THREAD_POOL, TELEMETRY};
use bagua_opentelemetry;
use cpp::cpp;
use datatypes::{BaguaBucket, BaguaTensor};
use events::BaguaEventChannel;
use flume::RecvTimeoutError;
use hashbrown::{HashMap, HashSet};
use opentelemetry::{
global,
trace::{Span, Tracer},
KeyValue,
};
use std::collections::VecDeque;
use std::fmt::Debug;
use std::sync::Arc;
Expand Down Expand Up @@ -167,6 +171,8 @@ impl BaguaCommBackend {
}
}

static TELEMETRY_INIT_ONCE: std::sync::Once = std::sync::Once::new();

impl BaguaCommBackend {
pub fn new(schedule_channel_cap: usize, device_id: usize) -> BaguaCommBackend {
unsafe {
Expand All @@ -181,6 +187,20 @@ impl BaguaCommBackend {
let (monitor_op_finish_channel_sender, monitor_op_finish_channel_receiver) =
flume::unbounded();

TELEMETRY_INIT_ONCE.call_once(|| {
match std::env::var("AUTO_TUNE_SERVER_ADDR") {
Ok(server_addr) => {
tracing::info!("detected auto tuning server, connecting");
bagua_opentelemetry::init_tracer(&server_addr);
}
Err(_) => {
tracing::warn!(
"auto tuning server not detected, may experience degraded performance"
);
}
};
});

BaguaCommBackend {
ordered_buckets: Default::default(),
bucket_mapping: Default::default(),
Expand Down Expand Up @@ -282,6 +302,10 @@ impl BaguaCommBackend {
tensor: &BaguaTensor,
ready_cuda_event_ptr: u64,
) -> Result<(), BaguaCoreError> {
let tracer = global::tracer("bagua-core");
let mut span = tracer.start("tensor_ready");
span.set_attribute(KeyValue::new("tensor_name", tensor.name()));

tensor.mark_comm_ready(ready_cuda_event_ptr);
while self.should_schedule()? {
let bucket = self.ordered_buckets.pop_front().unwrap();
Expand Down Expand Up @@ -311,27 +335,4 @@ impl BaguaCommBackend {
}
}
}

pub fn start_upload_telemetry(&self, skip: bool) -> Result<(), BaguaCoreError> {
NOBLES5E marked this conversation as resolved.
Show resolved Hide resolved
SCHEDULED_THREAD_POOL.execute(move || match TELEMETRY.as_ref() {
None => {}
Some(x) => {
let mut guard = x.lock();
match skip {
true => {
guard.clear();
}
false => {
match guard.push_payload_and_clear() {
Ok(_) => {}
Err(x) => {
tracing::error!("{:?}", x)
}
};
}
}
}
});
Ok(())
}
}
82 changes: 0 additions & 82 deletions bagua-core-internal/src/telemetry/mod.rs

This file was deleted.

5 changes: 0 additions & 5 deletions bagua-core-py/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,11 +339,6 @@ impl BaguaCommBackendPy {
py.allow_threads(|| self.inner.wait_pending_comm_ops())
.map_err(|e| PyRuntimeError::new_err(format!("{:?}", e)))
}

pub fn start_upload_telemetry(&self, skip: bool, py: Python) -> PyResult<()> {
py.allow_threads(|| self.inner.start_upload_telemetry(skip))
.map_err(|e| PyRuntimeError::new_err(format!("{:?}", e)))
}
}

#[pyclass(dict)]
Expand Down
22 changes: 22 additions & 0 deletions bagua-opentelemetry/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[package]
name = "bagua-opentelemetry"
version = "0.1.0"
edition = "2018"
publish = ["private"]

[dependencies]
tracing = "0.1"
async-std = { version = "1.6", features = ["attributes", "tokio1"] }
async-trait = { version = "0.1" }
hyper = { version = "0.14", features = ["full"] }
opentelemetry = { version = "0.15", default-features = false, features = [
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why using a different runtime from bagua-core-internal's opentelemetry?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tokio does not work, removed

"trace",
"rt-async-std",
] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
reqwest = { version = "0.11", features = ["json"] }
tokio = { version = "1", features = ["full"] }
futures = { version = "0.3" }

tokio-stream = { version = "0.1" }
44 changes: 44 additions & 0 deletions bagua-opentelemetry/src/exporter/agent.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Clone, Debug, Hash)]
pub struct BaguaSpan {
pub trace_id: u128,
pub action: String,
pub tensor_name: String,
pub start_time: u128,
pub end_time: u128,
}

#[derive(Serialize, Deserialize, Clone, Debug, Hash)]
pub struct BaguaBatch {
pub spans: Vec<BaguaSpan>,
}

#[derive(Debug)]
pub struct AgentAsyncClientHTTP {
server_addr: String,
client: reqwest::Client,
}

impl AgentAsyncClientHTTP {
pub fn new(server_addr: String) -> AgentAsyncClientHTTP {
Self {
server_addr: server_addr,
client: reqwest::Client::new(),
}
}

pub async fn emit_batch(
&mut self,
batch: BaguaBatch,
) -> Result<reqwest::Response, reqwest::Error> {
let uri = format!(
"http://{}/api/v1/report_tensor_execution_order",
self.server_addr
);

let resp = self.client.post(uri).json(&batch).send().await?;

Ok(resp)
}
}
62 changes: 62 additions & 0 deletions bagua-opentelemetry/src/exporter/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
pub mod agent;

use crate::exporter::agent::{AgentAsyncClientHTTP, BaguaBatch, BaguaSpan};
use async_trait::async_trait;
use opentelemetry::{sdk::export::trace, Key};
use reqwest::StatusCode;
use std::time::UNIX_EPOCH;

#[derive(Debug)]
pub struct Exporter {
pub uploader: AgentAsyncClientHTTP,
}

#[async_trait]
impl trace::SpanExporter for Exporter {
async fn export(&mut self, batch: Vec<trace::SpanData>) -> trace::ExportResult {
NOBLES5E marked this conversation as resolved.
Show resolved Hide resolved
let mut bagua_spans = Vec::new();
for span in batch {
let bagua_span = BaguaSpan {
trace_id: span.span_context.trace_id().to_u128(),
action: span.name.into_owned(),
tensor_name: span
.attributes
.get(&Key::new("tensor_name"))
.unwrap()
.as_str()
.to_string(),
start_time: span
.start_time
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis(),
end_time: span
.end_time
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis(),
};

bagua_spans.push(bagua_span);
}

let resp = self
.uploader
.emit_batch(BaguaBatch { spans: bagua_spans })
.await;
match resp {
Ok(resp) => {
if resp.status() != StatusCode::OK {
tracing::warn!("upload bagua span failed, resp={:?}", resp);
}
}
Err(err) => {
tracing::warn!("upload bagua span failed, err={:?}", err);
}
}

Ok(())
}

fn shutdown(&mut self) {}
}
21 changes: 21 additions & 0 deletions bagua-opentelemetry/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
pub mod exporter;

use crate::exporter::agent::AgentAsyncClientHTTP;
use crate::exporter::Exporter;
use opentelemetry;
use opentelemetry::{global, sdk, trace::Tracer, trace::TracerProvider};

pub fn init_tracer(autotune_server_addr: &str) -> impl Tracer {
let exporter = Exporter {
uploader: AgentAsyncClientHTTP::new(autotune_server_addr.to_string()),
};

let builder = sdk::trace::TracerProvider::builder()
.with_batch_exporter(exporter, opentelemetry::runtime::AsyncStd);

let tracer_provider = builder.build();
let tracer = tracer_provider.get_tracer("bagua-opentelemetry", Some(env!("CARGO_PKG_VERSION")));
let _ = global::set_tracer_provider(tracer_provider);

tracer
}
5 changes: 4 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,9 @@ def install_dependency_library():
],
author="Kuaishou AI Platform & DS3 Lab",
author_email="admin@mail.xrlian.com",
install_requires=[],
install_requires=[
"setuptools_rust",
"colorama",
],
zip_safe=False,
)