diff --git a/conf/jasminegraph-server.properties b/conf/jasminegraph-server.properties
index c03de76e1..7ecc14a19 100644
--- a/conf/jasminegraph-server.properties
+++ b/conf/jasminegraph-server.properties
@@ -77,8 +77,8 @@ org.jasminegraph.scheduler.performancecollector.timing=30
 #--------------------------------------------------------------------------------
 #PerformanceCollector
 #--------------------------------------------------------------------------------
-org.jasminegraph.collector.pushgateway=http://192.168.43.135:9091/
-org.jasminegraph.collector.prometheus=http://192.168.43.135:9090/
+org.jasminegraph.collector.pushgateway=http://172.28.5.1:9091/
+org.jasminegraph.collector.prometheus=http://172.28.5.2:9090/
 
 #--------------------------------------------------------------------------------
 #MetaDB information
diff --git a/conf/prometheus.yaml b/conf/prometheus.yaml
new file mode 100644
index 000000000..8631515d9
--- /dev/null
+++ b/conf/prometheus.yaml
@@ -0,0 +1,14 @@
+global:
+  scrape_interval: 15s
+  external_labels:
+    monitor: "codelab-monitor"
+scrape_configs:
+  - job_name: "prometheus"
+    scrape_interval: 5s
+    static_configs:
+      - targets: ["localhost:9090"]
+
+  - job_name: "pushgateway"
+    scrape_interval: 2s
+    static_configs:
+      - targets: ["172.28.5.3"]
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 000000000..913f01fce
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,45 @@
+services:
+  jasminegraph:
+    image: jasminegraph:latest
+    ports:
+      - '7777:7777'
+      - '7778:7778'
+    volumes:
+      - '/var/run/docker.sock:/var/run/docker.sock:rw'
+      - './env/databases/metadb:/home/ubuntu/software/jasminegraph/metadb'
+      - './env/databases/performancedb:/home/ubuntu/software/jasminegraph/performancedb'
+      - './env/data:/var/tmp/data'
+      - '/tmp/jasminegraph:/tmp/jasminegraph'
+    networks:
+      - jasminenet
+    command: --MODE 1 --MASTERIP 172.28.5.1 --WORKERS 2 --WORKERIP 172.28.5.1 --ENABLE_NMON false
+    depends_on:
+      - prometheus
+  prometheus:
+    image: prom/prometheus:latest
+    ports:
+      - 9090:9090
+    volumes:
+      - './conf/prometheus.yaml:/etc/prometheus/prometheus.yml'
+    networks:
+      jasminenet:
+        ipv4_address: 172.28.5.2
+    depends_on:
+      - pushgateway
+  pushgateway:
+    image: prom/pushgateway
+    ports:
+      - 9091:9091
+    networks:
+      jasminenet:
+        ipv4_address: 172.28.5.3
+networks:
+  jasminenet:
+    external: false
+    name: jasminenet
+    driver: bridge
+    ipam:
+      config:
+        - subnet: 172.28.5.0/24
+          ip_range: 172.28.5.0/24
+          gateway: 172.28.5.1
diff --git a/src/frontend/JasmineGraphFrontEnd.cpp b/src/frontend/JasmineGraphFrontEnd.cpp
index d9d02bdd8..d1d292ac1 100644
--- a/src/frontend/JasmineGraphFrontEnd.cpp
+++ b/src/frontend/JasmineGraphFrontEnd.cpp
@@ -134,7 +134,7 @@ void *frontendservicesesion(void *dummyPt) {
     std::string kafka_server_IP;
     cppkafka::Configuration configs;
     KafkaConnector *kstream;
-    Partitioner graphPartitioner(numberOfPartitions, 1, spt::Algorithms::HASH);
+    Partitioner graphPartitioner(numberOfPartitions, 1, spt::Algorithms::HASH, sqlite);
     vector workerClients;
     bool workerClientsInitialized = false;
 
@@ -1229,14 +1229,13 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c
     // create kafka consumer and graph partitioner
     kstream = new KafkaConnector(configs);
     // Create the Partitioner object.
-    Partitioner graphPartitioner(numberOfPartitions, 0, spt::Algorithms::FENNEL);
+    Partitioner graphPartitioner(numberOfPartitions, 0, spt::Algorithms::FENNEL, sqlite);
     // Create the KafkaConnector object.
     kstream = new KafkaConnector(configs);
     // Subscribe to the Kafka topic.
     kstream->Subscribe(topic_name_s);
     // Create the StreamHandler object.
-    StreamHandler *stream_handler = new StreamHandler(kstream, numberOfPartitions, workerClients);
-
+    StreamHandler *stream_handler = new StreamHandler(kstream, numberOfPartitions, workerClients, sqlite);
     string path = "kafka:\\" + topic_name_s + ":" + group_id;
     std::time_t time = chrono::system_clock::to_time_t(chrono::system_clock::now());
     string uploadStartTime = ctime(&time);
diff --git a/src/partitioner/stream/Partitioner.cpp b/src/partitioner/stream/Partitioner.cpp
index 7eff8eefc..d18de7a46 100644
--- a/src/partitioner/stream/Partitioner.cpp
+++ b/src/partitioner/stream/Partitioner.cpp
@@ -133,6 +133,26 @@ void Partitioner::printStats() {
     }
 }
 
+void Partitioner::updateMetaDB() {
+    double vertexCount = 0;
+    double edgesCount = 0;
+    double edgeCutsCount = 0;
+
+    for (auto partition : this->partitions) {
+        vertexCount += partition.getVertextCount();
+        edgesCount += partition.getEdgesCount();
+        edgeCutsCount += partition.edgeCutsCount();
+    }
+
+    double numberOfEdges = edgesCount + edgeCutsCount/2;
+    std::string sqlStatement = "UPDATE graph SET vertexcount = '" + std::to_string(vertexCount) +
+                               "' ,centralpartitioncount = '" + std::to_string(this->numberOfPartitions) +
+                               "' ,edgecount = '" + std::to_string(numberOfEdges) +
+                               "' WHERE idgraph = '" + std::to_string(this->graphID) + "'";
+    this->sqlite->runUpdate(sqlStatement);
+    streaming_partitioner_logger.info("Successfully updated metaDB");
+}
+
 /**
  * Greedy vertex assignment objectives of minimizing the number of cut edges and balancing of the partition sizes.
 Assign the vertext to partition P that maximize the partition score
diff --git a/src/partitioner/stream/Partitioner.h b/src/partitioner/stream/Partitioner.h
index 55b1ad5ab..75570ee7e 100644
--- a/src/partitioner/stream/Partitioner.h
+++ b/src/partitioner/stream/Partitioner.h
@@ -15,6 +15,7 @@
 #include
 
 #include "./Partition.h"
+#include "../../metadb/SQLiteDBInterface.h"
 
 typedef std::vector> partitionedEdge;
 namespace spt { // spt : Streaming Partitioner
@@ -29,12 +30,13 @@ class Partitioner {
     long totalEdges = 0;
     int graphID;
     spt::Algorithms algorithmInUse;
+    SQLiteDBInterface *sqlite;
     // perPartitionCap is : Number of vertices that can be store in this partition, This is a dynamic shared pointer
     // containing a value depending on the whole graph size and # of partitions
 
  public:
-    Partitioner(int numberOfPartitions, int graphID, spt::Algorithms alog)
-        : numberOfPartitions(numberOfPartitions), graphID(graphID), algorithmInUse(alog) {
+    Partitioner(int numberOfPartitions, int graphID, spt::Algorithms alog, SQLiteDBInterface* sqlite)
+        : numberOfPartitions(numberOfPartitions), graphID(graphID), algorithmInUse(alog), sqlite(sqlite) {
         for (size_t i = 0; i < numberOfPartitions; i++) {
             this->partitions.push_back(Partition(i, numberOfPartitions));
         };
@@ -47,6 +49,8 @@ class Partitioner {
     partitionedEdge fennelPartitioning(std::pair edge);
     partitionedEdge ldgPartitioning(std::pair edge);
     static std::pair deserialize(std::string data);
+    void updateMetaDB();
+    void setGraphID(int graphId){this->graphID = graphId;}
 };
 
 #endif // !JASMINE_PARTITIONER_HEADER
diff --git a/src/server/JasmineGraphInstanceService.cpp b/src/server/JasmineGraphInstanceService.cpp
index 10e897860..1e638048c 100644
--- a/src/server/JasmineGraphInstanceService.cpp
+++ b/src/server/JasmineGraphInstanceService.cpp
@@ -13,6 +13,7 @@ limitations under the License.
#include "JasmineGraphInstanceService.h" +#include #include #include @@ -1980,6 +1981,8 @@ static inline void close_command(int connFd, bool *loop_exit_p) { static inline void shutdown_command(int connFd) { close(connFd); + pid_t ppid = getppid(); + kill(ppid, SIGTERM); exit(0); } diff --git a/src/util/Utils.cpp b/src/util/Utils.cpp index 7cddf831d..07c1bbcf4 100644 --- a/src/util/Utils.cpp +++ b/src/util/Utils.cpp @@ -168,11 +168,19 @@ std::vector Utils::getHostListFromProperties() { } static inline std::string trim_right_copy(const std::string &s, const std::string &delimiters) { - return s.substr(0, s.find_last_not_of(delimiters) + 1); + size_t end = s.find_last_not_of(delimiters); + if (end == std::string::npos) { + return ""; // Return empty string if all characters are delimiters + } + return s.substr(0, end + 1); } static inline std::string trim_left_copy(const std::string &s, const std::string &delimiters) { - return s.substr(s.find_first_not_of(delimiters)); + size_t start = s.find_first_not_of(delimiters); + if (start == std::string::npos) { + return ""; // Return empty string if all characters are delimiters + } + return s.substr(start); } std::string Utils::trim_copy(const std::string &s, const std::string &delimiters) { diff --git a/src/util/kafka/StreamHandler.cpp b/src/util/kafka/StreamHandler.cpp index bf788ea94..44a63c2c2 100644 --- a/src/util/kafka/StreamHandler.cpp +++ b/src/util/kafka/StreamHandler.cpp @@ -27,10 +27,10 @@ using namespace std::chrono; Logger stream_handler_logger; StreamHandler::StreamHandler(KafkaConnector *kstream, int numberOfPartitions, - vector &workerClients) + vector &workerClients, SQLiteDBInterface* sqlite) : kstream(kstream), workerClients(workerClients), - graphPartitioner(numberOfPartitions, 0, spt::Algorithms::HASH), + graphPartitioner(numberOfPartitions, 0, spt::Algorithms::HASH, sqlite), stream_topic_name("stream_topic_name") { } @@ -82,6 +82,9 @@ void StreamHandler::listen_to_kafka_topic() { stream_handler_logger.error("Edge Rejected. Streaming edge should Include the Graph ID."); continue; } + auto prop = edgeJson["properties"]; + auto graphID = std::string(prop["graphId"]); + graphPartitioner.setGraphID(stoi(graphID)); auto sourceJson = edgeJson["source"]; auto destinationJson = edgeJson["destination"]; string sId = std::string(sourceJson["id"]); @@ -114,6 +117,6 @@ void StreamHandler::listen_to_kafka_topic() { workerClients.at(temp_d)->publish(obj.dump()); } } - + graphPartitioner.updateMetaDB(); graphPartitioner.printStats(); } diff --git a/src/util/kafka/StreamHandler.h b/src/util/kafka/StreamHandler.h index 7190be4f0..1a7059266 100644 --- a/src/util/kafka/StreamHandler.h +++ b/src/util/kafka/StreamHandler.h @@ -23,7 +23,8 @@ limitations under the License. 
 
 class StreamHandler {
  public:
-    StreamHandler(KafkaConnector *kstream, int numberOfPartitions, std::vector &workerClients);
+    StreamHandler(KafkaConnector *kstream, int numberOfPartitions,
+                  std::vector &workerClients, SQLiteDBInterface* sqlite);
     void listen_to_kafka_topic();
     cppkafka::Message pollMessage();
     bool isErrorInMessage(const cppkafka::Message &msg);
diff --git a/src_python/fl_client.py b/src_python/fl_client.py
index 89c037ae4..36edb8297 100644
--- a/src_python/fl_client.py
+++ b/src_python/fl_client.py
@@ -49,6 +49,7 @@ class Client:
     (Without partition sheduling)
     """
 
+    #pylint: disable=too-many-positional-arguments
     def __init__(self, model, graph_params, weights_path, graph_id, partition_id, epochs=10,
                  ip=socket.gethostname(), port=5000, header_length=10):
diff --git a/src_python/fl_client_shed.py b/src_python/fl_client_shed.py
index a821bf77e..ed42283e7 100644
--- a/src_python/fl_client_shed.py
+++ b/src_python/fl_client_shed.py
@@ -53,6 +53,7 @@ class Client:
     on a given GCN model (With partition sheduling)
     """
 
+    #pylint: disable=too-many-positional-arguments
     def __init__(self, client_id, weights_path, graph_id, partition_ids, epochs=10,
                  ip=socket.gethostname(), port=5000, header_length=10):
diff --git a/src_python/fl_server.py b/src_python/fl_server.py
index 7a8a85d7e..076baecce 100644
--- a/src_python/fl_server.py
+++ b/src_python/fl_server.py
@@ -36,6 +36,7 @@ class Server:
     learning process (Without partition sheduling)
     """
 
+    #pylint: disable=too-many-positional-arguments
     def __init__(self, model, rounds, weights_path, graph_id, max_conn=2,
                  ip=socket.gethostname(), port=5000, header_length=10):
diff --git a/src_python/fl_server_shed.py b/src_python/fl_server_shed.py
index 33779de23..ac15e57b6 100644
--- a/src_python/fl_server_shed.py
+++ b/src_python/fl_server_shed.py
@@ -35,6 +35,7 @@ class Server:
     learning process (With partition sheduling)
     """
 
+    #pylint: disable=too-many-positional-arguments
     def __init__(self, model_weights, rounds, weights_path, graph_id, num_clients=2,
                  ip=socket.gethostname(), port=5000, header_length=10):
diff --git a/src_python/org_agg.py b/src_python/org_agg.py
index aabf83625..13c6a9df5 100644
--- a/src_python/org_agg.py
+++ b/src_python/org_agg.py
@@ -38,6 +38,7 @@ class Aggregator:
     """Aggregator
     """
 
+    #pylint: disable=too-many-positional-arguments
     def __init__(self, model, rounds, num_orgs, ip, port):
 
         # Parameters
diff --git a/src_python/org_server.py b/src_python/org_server.py
index ecc2ec886..5d5683db8 100644
--- a/src_python/org_server.py
+++ b/src_python/org_server.py
@@ -39,6 +39,7 @@ class Server:
     """Server
     """
 
+    #pylint: disable=too-many-positional-arguments
     def __init__(self, org_id, model_weights, rounds, num_clients, ip, port):
 
         # Parameters
diff --git a/start-docker.sh b/start-docker.sh
new file mode 100755
index 000000000..83082e18e
--- /dev/null
+++ b/start-docker.sh
@@ -0,0 +1,2 @@
+#!/bin/bash
+docker compose up -d jasminegraph prometheus &
diff --git a/stop-docker.sh b/stop-docker.sh
new file mode 100755
index 000000000..56881c99b
--- /dev/null
+++ b/stop-docker.sh
@@ -0,0 +1,16 @@
+#!/bin/bash
+
+# Variables
+HOST="localhost"
+PORT="7777"
+
+# Establish telnet connection and send command
+(
+    sleep 5
+    echo "shdn"
+    sleep 5
+    echo "exit"
+    sleep 5
+) | telnet "$HOST" "$PORT"
+
+docker compose stop prometheus pushgateway &
diff --git a/test-docker.sh b/test-docker.sh
index 1abd2635e..34dad2512 100755
--- a/test-docker.sh
+++ b/test-docker.sh
@@ -169,4 +169,7 @@
 fi
 stop_and_remove_containers
 force_remove "${TEST_ROOT}/env" "${WORKER_LOG_DIR}"
+if [ "$exit_code" = '0' ]; then
+    docker tag jasminegraph:test jasminegraph:latest
+fi
 exit "$exit_code"