Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/vcnt #263

Closed
wants to merge 12 commits into from
4 changes: 2 additions & 2 deletions conf/jasminegraph-server.properties
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ org.jasminegraph.scheduler.performancecollector.timing=30
#--------------------------------------------------------------------------------
#PerformanceCollector
#--------------------------------------------------------------------------------
org.jasminegraph.collector.pushgateway=http://192.168.43.135:9091/
org.jasminegraph.collector.prometheus=http://192.168.43.135:9090/
org.jasminegraph.collector.pushgateway=http://172.28.5.1:9091/
org.jasminegraph.collector.prometheus=http://172.28.5.2:9090/

#--------------------------------------------------------------------------------
#MetaDB information
Expand Down
14 changes: 14 additions & 0 deletions conf/prometheus.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
global:
scrape_interval: 15s
external_labels:
monitor: "codelab-monitor"
scrape_configs:
- job_name: "prometheus"
scrape_interval: 5s
static_configs:
- targets: ["localhost:9090"]

- job_name: "pushgateway"
scrape_interval: 2s
static_configs:
- targets: ["172.28.5.3"]
45 changes: 45 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
services:
jasminegraph:
image: jasminegraph:latest
ports:
- '7777:7777'
- '7778:7778'
volumes:
- '/var/run/docker.sock:/var/run/docker.sock:rw'
- './env/databases/metadb:/home/ubuntu/software/jasminegraph/metadb'
- './env/databases/performancedb:/home/ubuntu/software/jasminegraph/performancedb'
- './env/data:/var/tmp/data'
- '/tmp/jasminegraph:/tmp/jasminegraph'
networks:
- jasminenet
command: --MODE 1 --MASTERIP 172.28.5.1 --WORKERS 2 --WORKERIP 172.28.5.1 --ENABLE_NMON false
depends_on:
- prometheus
prometheus:
image: prom/prometheus:latest
ports:
- 9090:9090
volumes:
- './conf/prometheus.yaml:/etc/prometheus/prometheus.yml'
networks:
jasminenet:
ipv4_address: 172.28.5.2
depends_on:
- pushgateway
pushgateway:
image: prom/pushgateway
ports:
- 9091:9091
networks:
jasminenet:
ipv4_address: 172.28.5.3
networks:
jasminenet:
external: false
name: jasminenet
driver: bridge
ipam:
config:
- subnet: 172.28.5.0/24
ip_range: 172.28.5.0/24
gateway: 172.28.5.1
7 changes: 3 additions & 4 deletions src/frontend/JasmineGraphFrontEnd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ void *frontendservicesesion(void *dummyPt) {
std::string kafka_server_IP;
cppkafka::Configuration configs;
KafkaConnector *kstream;
Partitioner graphPartitioner(numberOfPartitions, 1, spt::Algorithms::HASH);
Partitioner graphPartitioner(numberOfPartitions, 1, spt::Algorithms::HASH, sqlite);

vector<DataPublisher *> workerClients;
bool workerClientsInitialized = false;
Expand Down Expand Up @@ -1229,14 +1229,13 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c
// create kafka consumer and graph partitioner
kstream = new KafkaConnector(configs);
// Create the Partitioner object.
Partitioner graphPartitioner(numberOfPartitions, 0, spt::Algorithms::FENNEL);
Partitioner graphPartitioner(numberOfPartitions, 0, spt::Algorithms::FENNEL, sqlite);
// Create the KafkaConnector object.
kstream = new KafkaConnector(configs);
// Subscribe to the Kafka topic.
kstream->Subscribe(topic_name_s);
// Create the StreamHandler object.
StreamHandler *stream_handler = new StreamHandler(kstream, numberOfPartitions, workerClients);

StreamHandler *stream_handler = new StreamHandler(kstream, numberOfPartitions, workerClients, sqlite);
string path = "kafka:\\" + topic_name_s + ":" + group_id;
std::time_t time = chrono::system_clock::to_time_t(chrono::system_clock::now());
string uploadStartTime = ctime(&time);
Expand Down
20 changes: 20 additions & 0 deletions src/partitioner/stream/Partitioner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,26 @@ void Partitioner::printStats() {
}
}

void Partitioner::updateMetaDB() {
double vertexCount = 0;
double edgesCount = 0;
double edgeCutsCount = 0;

for (auto partition : this->partitions) {
vertexCount += partition.getVertextCount();
edgesCount += partition.getEdgesCount();
edgeCutsCount += partition.edgeCutsCount();
}

double numberOfEdges = edgesCount + edgeCutsCount/2;
std::string sqlStatement = "UPDATE graph SET vertexcount = '" + std::to_string(vertexCount) +
"' ,centralpartitioncount = '" + std::to_string(this->numberOfPartitions) +
"' ,edgecount = '" + std::to_string(numberOfEdges) +
"' WHERE idgraph = '" + std::to_string(this->graphID) + "'";
this->sqlite->runUpdate(sqlStatement);
streaming_partitioner_logger.info("Successfully updated metaDB");
}

/**
* Greedy vertex assignment objectives of minimizing the number of cut edges
and balancing of the partition sizes. Assign the vertext to partition P that maximize the partition score
Expand Down
8 changes: 6 additions & 2 deletions src/partitioner/stream/Partitioner.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <vector>

#include "./Partition.h"
#include "../../metadb/SQLiteDBInterface.h"

typedef std::vector<std::pair<std::string, long>> partitionedEdge;
namespace spt { // spt : Streaming Partitioner
Expand All @@ -29,12 +30,13 @@ class Partitioner {
long totalEdges = 0;
int graphID;
spt::Algorithms algorithmInUse;
SQLiteDBInterface *sqlite;
// perPartitionCap is : Number of vertices that can be store in this partition, This is a dynamic shared pointer
// containing a value depending on the whole graph size and # of partitions

public:
Partitioner(int numberOfPartitions, int graphID, spt::Algorithms alog)
: numberOfPartitions(numberOfPartitions), graphID(graphID), algorithmInUse(alog) {
Partitioner(int numberOfPartitions, int graphID, spt::Algorithms alog, SQLiteDBInterface* sqlite)
: numberOfPartitions(numberOfPartitions), graphID(graphID), algorithmInUse(alog), sqlite(sqlite) {
for (size_t i = 0; i < numberOfPartitions; i++) {
this->partitions.push_back(Partition(i, numberOfPartitions));
};
Expand All @@ -47,6 +49,8 @@ class Partitioner {
partitionedEdge fennelPartitioning(std::pair<std::string, std::string> edge);
partitionedEdge ldgPartitioning(std::pair<std::string, std::string> edge);
static std::pair<long, long> deserialize(std::string data);
void updateMetaDB();
void setGraphID(int graphId){this->graphID = graphId;}
};

#endif // !JASMINE_PARTITIONER_HEADER
3 changes: 3 additions & 0 deletions src/server/JasmineGraphInstanceService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ limitations under the License.

#include "JasmineGraphInstanceService.h"

#include <signal.h>
#include <stdio.h>
#include <unistd.h>

Expand Down Expand Up @@ -1980,6 +1981,8 @@ static inline void close_command(int connFd, bool *loop_exit_p) {

static inline void shutdown_command(int connFd) {
close(connFd);
pid_t ppid = getppid();
kill(ppid, SIGTERM);
exit(0);
}

Expand Down
12 changes: 10 additions & 2 deletions src/util/Utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,19 @@ std::vector<std::string> Utils::getHostListFromProperties() {
}

static inline std::string trim_right_copy(const std::string &s, const std::string &delimiters) {
return s.substr(0, s.find_last_not_of(delimiters) + 1);
size_t end = s.find_last_not_of(delimiters);
if (end == std::string::npos) {
return ""; // Return empty string if all characters are delimiters
}
return s.substr(0, end + 1);
}

static inline std::string trim_left_copy(const std::string &s, const std::string &delimiters) {
return s.substr(s.find_first_not_of(delimiters));
size_t start = s.find_first_not_of(delimiters);
if (start == std::string::npos) {
return ""; // Return empty string if all characters are delimiters
}
return s.substr(start);
}

std::string Utils::trim_copy(const std::string &s, const std::string &delimiters) {
Expand Down
9 changes: 6 additions & 3 deletions src/util/kafka/StreamHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ using namespace std::chrono;
Logger stream_handler_logger;

StreamHandler::StreamHandler(KafkaConnector *kstream, int numberOfPartitions,
vector<DataPublisher *> &workerClients)
vector<DataPublisher *> &workerClients, SQLiteDBInterface* sqlite)
: kstream(kstream),
workerClients(workerClients),
graphPartitioner(numberOfPartitions, 0, spt::Algorithms::HASH),
graphPartitioner(numberOfPartitions, 0, spt::Algorithms::HASH, sqlite),
stream_topic_name("stream_topic_name") { }


Expand Down Expand Up @@ -82,6 +82,9 @@ void StreamHandler::listen_to_kafka_topic() {
stream_handler_logger.error("Edge Rejected. Streaming edge should Include the Graph ID.");
continue;
}
auto prop = edgeJson["properties"];
auto graphID = std::string(prop["graphId"]);
graphPartitioner.setGraphID(stoi(graphID));
auto sourceJson = edgeJson["source"];
auto destinationJson = edgeJson["destination"];
string sId = std::string(sourceJson["id"]);
Expand Down Expand Up @@ -114,6 +117,6 @@ void StreamHandler::listen_to_kafka_topic() {
workerClients.at(temp_d)->publish(obj.dump());
}
}

graphPartitioner.updateMetaDB();
graphPartitioner.printStats();
}
3 changes: 2 additions & 1 deletion src/util/kafka/StreamHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ limitations under the License.

class StreamHandler {
public:
StreamHandler(KafkaConnector *kstream, int numberOfPartitions, std::vector<DataPublisher *> &workerClients);
StreamHandler(KafkaConnector *kstream, int numberOfPartitions,
std::vector<DataPublisher *> &workerClients, SQLiteDBInterface* sqlite);
void listen_to_kafka_topic();
cppkafka::Message pollMessage();
bool isErrorInMessage(const cppkafka::Message &msg);
Expand Down
1 change: 1 addition & 0 deletions src_python/fl_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class Client:
(Without partition sheduling)
"""

#pylint: disable=too-many-positional-arguments
def __init__(self, model, graph_params, weights_path, graph_id, partition_id, epochs=10,
ip=socket.gethostname(), port=5000, header_length=10):

Expand Down
1 change: 1 addition & 0 deletions src_python/fl_client_shed.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class Client:
on a given GCN model (With partition sheduling)
"""

#pylint: disable=too-many-positional-arguments
def __init__(self, client_id, weights_path, graph_id, partition_ids, epochs=10,
ip=socket.gethostname(), port=5000, header_length=10):

Expand Down
1 change: 1 addition & 0 deletions src_python/fl_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class Server:
learning process (Without partition sheduling)
"""

#pylint: disable=too-many-positional-arguments
def __init__(self, model, rounds, weights_path, graph_id, max_conn=2, ip=socket.gethostname(),
port=5000, header_length=10):

Expand Down
1 change: 1 addition & 0 deletions src_python/fl_server_shed.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class Server:
learning process (With partition sheduling)
"""

#pylint: disable=too-many-positional-arguments
def __init__(self, model_weights, rounds, weights_path, graph_id, num_clients=2,
ip=socket.gethostname(), port=5000, header_length=10):

Expand Down
1 change: 1 addition & 0 deletions src_python/org_agg.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class Aggregator:
"""Aggregator
"""

#pylint: disable=too-many-positional-arguments
def __init__(self, model, rounds, num_orgs, ip, port):

# Parameters
Expand Down
1 change: 1 addition & 0 deletions src_python/org_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class Server:
"""Server
"""

#pylint: disable=too-many-positional-arguments
def __init__(self, org_id, model_weights, rounds, num_clients, ip, port):

# Parameters
Expand Down
2 changes: 2 additions & 0 deletions start-docker.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#!/bin/bash
docker compose up -d jasminegraph prometheus &
16 changes: 16 additions & 0 deletions stop-docker.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#!/bin/bash

# Variables
HOST="localhost"
PORT="7777"

# Establish telnet connection and send command
(
sleep 5
echo "shdn"
sleep 5
echo "exit"
sleep 5
) | telnet "$HOST" "$PORT"

docker compose stop prometheus pushgateway &
3 changes: 3 additions & 0 deletions test-docker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -169,4 +169,7 @@ fi

stop_and_remove_containers
force_remove "${TEST_ROOT}/env" "${WORKER_LOG_DIR}"
if [ "$exit_code" = '0' ]; then
docker tag jasminegraph:test jasminegraph:latest
fi
exit "$exit_code"