Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ADN Network Functions #242

Merged
merged 25 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions eval/policy/admission-control/attach1.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
addon_engine = "HelloAclSenderEngine"
tx_channels_replacements = [
[
"MrpcEngine",
"HelloAclSenderEngine",
0,
0,
],
[
"HelloAclSenderEngine",
"TcpRpcAdapterEngine",
0,
0,
],
]
rx_channels_replacements = [
[
"TcpRpcAdapterEngine",
"HelloAclSenderEngine",
0,
0,
],
[
"HelloAclSenderEngine",
"MrpcEngine",
0,
0,
],
]
group = ["MrpcEngine", "TcpRpcAdapterEngine"]
op = "attach"
config_string = '''
'''
33 changes: 33 additions & 0 deletions eval/policy/admission-control/attach2.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
addon_engine = "AdmissionControlEngine"
tx_channels_replacements = [
[
"MrpcEngine",
"AdmissionControlEngine",
0,
0,
],
[
"AdmissionControlEngine",
"HelloAclSenderEngine",
0,
0,
],
]
rx_channels_replacements = [
[
"HelloAclSenderEngine",
"AdmissionControlEngine",
0,
0,
],
[
"AdmissionControlEngine",
"MrpcEngine",
0,
0,
],
]
group = ["MrpcEngine", "HelloAclSenderEngine", "TcpRpcAdapterEngine"]
op = "attach"
config_string = '''
'''
4 changes: 4 additions & 0 deletions eval/policy/admission-control/detach.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
addon_engine = "HelloAclSenderEngine"
tx_channels_replacements = [["MrpcEngine", "TcpRpcAdapterEngine", 0, 0]]
rx_channels_replacements = []
op = "detach"
33 changes: 33 additions & 0 deletions eval/policy/fault-server/attach.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
addon_engine = "FaultServerEngine"
tx_channels_replacements = [
[
"MrpcEngine",
"FaultServerEngine",
0,
0,
],
[
"FaultServerEngine",
"TcpRpcAdapterEngine",
0,
0,
],
]
rx_channels_replacements = [
[
"TcpRpcAdapterEngine",
"FaultServerEngine",
0,
0,
],
[
"FaultServerEngine",
"MrpcEngine",
0,
0,
],
]
group = ["MrpcEngine", "TcpRpcAdapterEngine"]
op = "attach"
config_string = '''
'''
47 changes: 47 additions & 0 deletions eval/policy/fault-server/collect.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#!/usr/bin/env python3
from typing import List
import glob
import sys

OD = "/tmp/mrpc-eval"
if len(sys.argv) >= 2:
OD = sys.argv[1]


def convert_msg_size(s: str) -> int:
if s.endswith('gb'):
return int(s[:-2]) * 1024 * 1024 * 1024
if s.endswith('mb'):
return int(s[:-2]) * 1024 * 1024
if s.endswith('kb'):
return int(s[:-2]) * 1024
if s.endswith('b'):
return int(s[:-1])

raise ValueError(f"unknown input: {s}")


def get_rate(path: str) -> List[float]:
rates = []
with open(path, 'r') as fin:
for line in fin:
words = line.strip().split(' ')
if words[-3] == 'rps,':
rate = float(words[-4])
rates.append(rate)
return rates[1:]


def load_result(sol_before, sol_after, f: str):
# print(f)
rates = get_rate(f)
before = rates[5:25]
after = rates[-25:-5]
for r in before:
print(f'{round(r/1000,2)},{sol_before},w/o Fault')
for r in after:
print(f'{round(r/1000,2)},{sol_after},w/ Fault')


for f in glob.glob(OD+"/policy/fault/rpc_bench_tput_32b/rpc_bench_client_danyang-04.stdout"):
load_result('mRPC', 'ADN+mRPC', f)
9 changes: 9 additions & 0 deletions eval/policy/fault-server/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
workdir = "~/nfs/Developing/livingshade/phoenix/experimental/mrpc"

[env]
RUST_BACKTRACE = "1"
RUST_LOG_STYLE = "never"
CARGO_TERM_COLOR = "never"
PHOENIX_LOG = "info"
PROTOC = "/usr/bin/protoc"
PHOENIX_PREFIX = "/tmp/phoenix"
4 changes: 4 additions & 0 deletions eval/policy/fault-server/detach.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
addon_engine = "FaultServerEngine"
tx_channels_replacements = [["MrpcEngine", "TcpRpcAdapterEngine", 0, 0]]
rx_channels_replacements = [["TcpRpcAdapterEngine", "MrpcEngine", 0, 0]]
op = "detach"
15 changes: 15 additions & 0 deletions eval/policy/fault-server/rpc_bench_tput_32b.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
name = "policy/fault/rpc_bench_tput_32b"
description = "Run rpc_bench benchmark"
group = "fault"
timeout_secs = 70

[[worker]]
host = "danyang-06"
bin = "rpc_bench_server"
args = "--port 5002 -l info --transport tcp"

[[worker]]
host = "danyang-04"
bin = "rpc_bench_client"
args = "--transport tcp -c rdma0.danyang-06 --concurrency 128 --req-size 32 --duration 65 -i 1 --port 5002 -l error"
dependencies = [0]
29 changes: 29 additions & 0 deletions eval/policy/fault-server/start_traffic.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#!/usr/bin/env bash

trap "trap - SIGTERM && kill -- -$$" SIGINT SIGTERM SIGHUP EXIT

OD=/tmp/mrpc-eval
if [[ $# -ge 1 ]]; then
OD=$1
fi

WORKDIR=$(dirname $(realpath $0))
cd $WORKDIR

# concurrency = 128
cargo rr --bin launcher -- --output-dir ${OD} --timeout=120 --benchmark ./rpc_bench_tput_32b.toml --configfile ./config.toml &

sleep 30

LIST_OUTPUT="${OD}"/policy/list.json
cargo rr --bin list -- --dump "${LIST_OUTPUT}" # Need to specifiy PHOENIX_PREFIX
cat "${LIST_OUTPUT}"
ARG_PID=$(cat "${LIST_OUTPUT}" | jq '.[] | select(.service == "Mrpc") | .pid')
ARG_SID=$(cat "${LIST_OUTPUT}" | jq '.[] | select(.service == "Mrpc") | .sid')
echo $ARG_SID

sleep 1

cargo run --bin addonctl -- --config ./attach.toml --pid ${ARG_PID} --sid ${ARG_SID} # Need to specifiy PHOENIX_PREFIX

wait
2 changes: 1 addition & 1 deletion eval/policy/hello-acl-receiver/detach.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
addon_engine = "HelloAclReceiverEngine"
tx_channels_replacements = [["MrpcEngine", "TcpRpcAdapterEngine", 0, 0]]
rx_channels_replacements = []
rx_channels_replacements = [["TcpRpcAdapterEngine", "MrpcEngine", 0, 0]]
op = "detach"
4 changes: 4 additions & 0 deletions eval/policy/logging-server/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
If you want to measure latency

change `--concurrency 128` to `--concurrency 1` and append
`--log-latency` to the end of the args in `rpc_bench_tput_32b.toml`.
31 changes: 31 additions & 0 deletions eval/policy/logging-server/attach.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
addon_engine = "LoggingServerEngine"
tx_channels_replacements = [
[
"MrpcEngine",
"LoggingServerEngine",
0,
0,
],
[
"LoggingServerEngine",
"TcpRpcAdapterEngine",
0,
0,
],
]
rx_channels_replacements = [
[
"TcpRpcAdapterEngine",
"LoggingServerEngine",
0,
0,
],
[
"LoggingServerEngine",
"MrpcEngine",
0,
0,
],
]
group = ["MrpcEngine", "TcpRpcAdapterEngine"]
op = "attach"
33 changes: 33 additions & 0 deletions eval/policy/logging-server/collect.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#!/usr/bin/env python3
from typing import List
import glob
import sys

OD = "/tmp/mrpc-eval"
if len(sys.argv) >= 2:
OD = sys.argv[1]


def get_rate(path: str) -> List[float]:
rates = []
with open(path, 'r') as fin:
for line in fin:
words = line.strip().split(' ')
if words[-3] == 'rps,':
rate = float(words[-4])
rates.append(rate)
return rates[1:]


def load_result(sol_before, sol_after, f: str):
rates = get_rate(f)
before = rates[5:25]
after = rates[-25:-5]
for r in before:
print(f'{round(r/1000,2)},{sol_before},w/o Logging')
for r in after:
print(f'{round(r/1000,2)},{sol_after},w/ Logging')


for f in glob.glob(OD+"/policy/logging/rpc_bench_tput_32b/rpc_bench_client_danyang-04.stdout"):
load_result('mRPC', 'Native mRPC', f)
9 changes: 9 additions & 0 deletions eval/policy/logging-server/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
workdir = "~/nfs/Developing/livingshade/phoenix/experimental/mrpc"

[env]
RUST_BACKTRACE = "1"
RUST_LOG_STYLE = "never"
CARGO_TERM_COLOR = "never"
PHOENIX_LOG = "info"
PROTOC = "/usr/bin/protoc"
PHOENIX_PREFIX = "/tmp/phoenix"
4 changes: 4 additions & 0 deletions eval/policy/logging-server/detach.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
addon_engine = "LoggingServerEngine"
tx_channels_replacements = [["MrpcEngine", "TcpRpcAdapterEngine", 0, 0]]
rx_channels_replacements = [["TcpRpcAdapterEngine", "MrpcEngine", 0, 0]]
op = "detach"
15 changes: 15 additions & 0 deletions eval/policy/logging-server/rpc_bench_tput_32b.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
name = "policy/logging/rpc_bench_tput_32b"
description = "Run rpc_bench benchmark"
group = "logging"
timeout_secs = 70

[[worker]]
host = "danyang-06"
bin = "rpc_bench_server"
args = "--port 5002 -l info --transport tcp"

[[worker]]
host = "danyang-04"
bin = "rpc_bench_client"
args = "--transport tcp -c rdma0.danyang-06 --concurrency 128 --req-size 32 --duration 65 -i 1 --port 5002 -l info"
dependencies = [0]
58 changes: 58 additions & 0 deletions eval/policy/logging-server/run_policy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#!/usr/bin/env python3
import json
import subprocess
import pathlib
import os
from os.path import dirname
import time
import toml
import sys

OD = "/tmp/mrpc-eval"
if len(sys.argv) >= 2:
OD = sys.argv[1]

SCRIPTDIR = pathlib.Path(__file__).parent.resolve()
CONFIG_PATH = os.path.join(SCRIPTDIR, "config.toml")

config = toml.load(CONFIG_PATH)
workdir = config["workdir"]
workdir = dirname(dirname(os.path.expanduser(workdir)))
env = {**os.environ, **config['env']}

os.chdir(workdir)
os.makedirs(f"{OD}/policy/null", exist_ok=True)

cmd = f'''cargo run --release -p benchmark --bin launcher -- -o {OD} --timeout=120
--benchmark {os.path.join(SCRIPTDIR, 'rpc_bench_tput_32b.toml')}
--configfile { os.path.join(SCRIPTDIR, 'config.toml')}'''
workload = subprocess.Popen(cmd.split())
time.sleep(30)

list_cmd = f"cargo run --release --bin list -- --dump {OD}/policy/list.json"
subprocess.run(list_cmd.split(), env=env)

with open(f"{OD}/policy/list.json", "r") as fin:
content = fin.read()
print(content)
data = json.loads(content)
mrpc_pid = None
mrpc_sid = None
for subscription in data:
pid = subscription["pid"]
sid = subscription["sid"]
engines = [x[1] for x in subscription["engines"]]
if "MrpcEngine" in engines:
mrpc_pid = pid
mrpc_sid = sid

print("Start to attach policy")
attach_config = os.path.join(SCRIPTDIR, "attach.toml")
attach_cmd = f"cargo run --release --bin addonctl -- --config {attach_config} --pid {mrpc_pid} --sid {mrpc_sid}"
subprocess.run(attach_cmd.split(), env=env)

subprocess.run(list_cmd.split(), env=env)
with open(f"{OD}/policy/list.json", "r") as fin:
print(fin.read())

workload.wait()
Loading
Loading