Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
parity-clib: async C bindings to RPC requests + `subscribe/unsubscr…
Browse files Browse the repository at this point in the history
…ibe to websocket events` (#9920)

* feat(parity-clib asynchronous rpc queries)

* feat(seperate bindings for ws and rpc)

* Subscribing to websockets for the full-client works

* feat(c binding unsubscribe_from_websocket)

* fix(tests): tweak CMake build config

* Enforce C+11

* refactor(parity-cpp-example) : `cpp:ify`

* fix(typedefs) : revert typedefs parity-clib

* docs(nits)

* fix(simplify websocket_unsubscribe)

* refactor(cpp example) : more subscriptions

* fix(callback type) : address grumbles on callback

* Use it the example to avoid using global variables

* docs(nits) - don't mention `arc`

* fix(jni bindings): fix compile errors

* feat(java example and updated java bindings)

* fix(java example) : run both full and light client

* fix(Java shutdown) : unsubscribe to sessions

Forgot to pass the JNIEnv environment since it is an instance method

* feat(return valid JString)

* Remove Java dependency by constructing a valid Java String in the callback

* fix(logger) : remove `rpc` trace log

* fix(format)

* fix(parity-clib): remove needless callback `type`

* fix(parity-clib-examples) : update examples

* `cpp` example pass in a struct instead to determines `callback kind`
* `java` add a instance variable the class `Callback` to determine `callback kind`

* fix(review comments): docs and format

* Update parity-clib/src/java.rs

Co-Authored-By: niklasad1 <niklasadolfsson1@gmail.com>

* fix(bad merge + spelling)

* fix(move examples to parity-clib/examples)
  • Loading branch information
niklasad1 authored and 5chdn committed Jan 2, 2019
1 parent 2bb7961 commit b4f8bba
Show file tree
Hide file tree
Showing 14 changed files with 771 additions and 236 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion parity-clib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ name = "parity"
crate-type = ["cdylib", "staticlib"]

[dependencies]
futures = "0.1.6"
jni = { version = "0.10.1", optional = true }
panic_hook = { path = "../util/panic-hook" }
parity-ethereum = { path = "../", default-features = false }
jni = { version = "0.10.1", optional = true }
tokio = "0.1.11"
tokio-current-thread = "0.1.3"

[features]
default = []
Expand Down
103 changes: 63 additions & 40 deletions parity-clib/Parity.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,44 +20,67 @@
* Interface to the Parity client.
*/
public class Parity {
/**
* Starts the Parity client with the CLI options passed as an array of strings.
*
* Each space-delimited option corresponds to an array entry.
* For example: `["--port", "12345"]`
*
* @param options The CLI options to start Parity with
*/
public Parity(String[] options) {
long config = configFromCli(options);
inner = build(config);
}

/** Performs a synchronous RPC query.
*
* Note that this will block the current thread until the query is finished. You are
* encouraged to create a background thread if you don't want to block.
*
* @param query The JSON-encoded RPC query to perform
* @return A JSON-encoded result
*/
public String rpcQuery(String query) {
return rpcQueryNative(inner, query);
}

@Override
protected void finalize​() {
destroy(inner);
}

static {
System.loadLibrary("parity");
}

private static native long configFromCli(String[] cliOptions);
private static native long build(long config);
private static native void destroy(long inner);
private static native String rpcQueryNative(long inner, String rpc);

private long inner;
/**
* Starts the Parity client with the CLI options passed as an array of strings.
*
* Each space-delimited option corresponds to an array entry.
* For example: `["--port", "12345"]`
*
* @param options The CLI options to start Parity with
*/
public Parity(String[] options) {
long config = configFromCli(options);
inner = build(config);
}

/** Performs an asynchronous RPC query by spawning a background thread that is executed until
* either a response is received or the timeout has been expired.
*
* @param query The JSON-encoded RPC query to perform
* @param timeoutMillis The maximum time in milliseconds that the query will run
* @param callback An instance of class which must have a instance method named `callback` that will be
* invoke when the result is ready
*/
public void rpcQuery(String query, long timeoutMillis, Object callback) {
rpcQueryNative(inner, query, timeoutMillis, callback);
}

/** Subscribes to a specific WebSocket event that will run in a background thread until it is canceled.
*
* @param query The JSON-encoded RPC query to perform
* @param callback An instance of class which must have a instance method named `callback` that will be invoked
* when the result is ready
*
* @return A pointer to the current sessions which can be used to terminate the session later
*/
public long subscribeWebSocket(String query, Object callback) {
return subscribeWebSocketNative(inner, query, callback);
}

/** Unsubscribes to a specific WebSocket event
*
* @param session Pointer the the session to terminate
*/
public void unsubscribeWebSocket(long session) {
unsubscribeWebSocketNative(session);
}

// FIXME: `finalize` is deprecated - https://github.com/paritytech/parity-ethereum/issues/10066
@Override
protected void finalize​() {
destroy(inner);
}

static {
System.loadLibrary("parity");
}

private static native long configFromCli(String[] cliOptions);
private static native long build(long config);
private static native void destroy(long inner);
private static native void rpcQueryNative(long inner, String rpc, long timeoutMillis, Object callback);
private static native long subscribeWebSocketNative(long inner, String rpc, Object callback);
private static native void unsubscribeWebSocketNative(long session);

private long inner;
}
5 changes: 2 additions & 3 deletions parity-clib/examples/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
cmake_minimum_required(VERSION 3.5)
include(ExternalProject)

include_directories("${CMAKE_SOURCE_DIR}/../../../parity-clib")

include_directories("${CMAKE_SOURCE_DIR}/../..")
set (CMAKE_CXX_STANDARD 11) # Enfore C++11
add_executable(parity-example main.cpp)

ExternalProject_Add(
Expand Down
197 changes: 161 additions & 36 deletions parity-clib/examples/cpp/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,44 +14,169 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

#include <cstddef>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <unistd.h>
#include <chrono>
#include <parity.h>
#include <regex>
#include <string>
#include <thread>

void on_restart(void*, const char*, size_t) {}
void* parity_run(std::vector<const char*>);
int parity_subscribe_to_websocket(void*);
int parity_rpc_queries(void*);

const int SUBSCRIPTION_ID_LEN = 18;
const size_t TIMEOUT_ONE_MIN_AS_MILLIS = 60 * 1000;
const unsigned int CALLBACK_RPC = 1;
const unsigned int CALLBACK_WS = 2;

struct Callback {
unsigned int type;
long unsigned int counter;
};

// list of rpc queries
const std::vector<std::string> rpc_queries {
"{\"method\":\"parity_versionInfo\",\"params\":[],\"id\":1,\"jsonrpc\":\"2.0\"}",
"{\"method\":\"eth_getTransactionReceipt\",\"params\":[\"0x444172bef57ad978655171a8af2cfd89baa02a97fcb773067aef7794d6913fff\"],\"id\":1,\"jsonrpc\":\"2.0\"}",
"{\"method\":\"eth_estimateGas\",\"params\":[{\"from\":\"0x0066Dc48bb833d2B59f730F33952B3c29fE926F5\"}],\"id\":1,\"jsonrpc\":\"2.0\"}",
"{\"method\":\"eth_getBalance\",\"params\":[\"0x0066Dc48bb833d2B59f730F33952B3c29fE926F5\"],\"id\":1,\"jsonrpc\":\"2.0\"}"
};

// list of subscriptions
const std::vector<std::string> ws_subscriptions {
"{\"method\":\"parity_subscribe\",\"params\":[\"eth_getBalance\",[\"0xcd2a3d9f938e13cd947ec05abc7fe734df8dd826\",\"latest\"]],\"id\":1,\"jsonrpc\":\"2.0\"}",
"{\"method\":\"parity_subscribe\",\"params\":[\"parity_netPeers\"],\"id\":1,\"jsonrpc\":\"2.0\"}",
"{\"method\":\"eth_subscribe\",\"params\":[\"newHeads\"],\"id\":1,\"jsonrpc\":\"2.0\"}"
};

// callback that gets invoked upon an event
void callback(void* user_data, const char* response, size_t _len) {
Callback* cb = static_cast<Callback*>(user_data);
if (cb->type == CALLBACK_RPC) {
printf("rpc response: %s\r\n", response);
cb->counter -= 1;
} else if (cb->type == CALLBACK_WS) {
printf("websocket response: %s\r\n", response);
std::regex is_subscription ("\\{\"jsonrpc\":\"2.0\",\"result\":\"0[xX][a-fA-F0-9]{16}\",\"id\":1\\}");
if (std::regex_match(response, is_subscription) == true) {
cb->counter -= 1;
}
}
}

int main() {
ParityParams cfg = { 0 };
cfg.on_client_restart_cb = on_restart;

const char* args[] = {"--no-ipc"};
size_t str_lens[] = {8};
if (parity_config_from_cli(args, str_lens, 1, &cfg.configuration) != 0) {
return 1;
}

void* parity;
if (parity_start(&cfg, &parity) != 0) {
return 1;
}

const char* rpc = "{\"method\":\"parity_versionInfo\",\"params\":[],\"id\":1,\"jsonrpc\":\"2.0\"}";
size_t out_len = 256;
char* out = (char*)malloc(out_len + 1);
if (parity_rpc(parity, rpc, strlen(rpc), out, &out_len)) {
return 1;
}
out[out_len] = '\0';
printf("RPC output: %s", out);
free(out);

sleep(5);
if (parity != NULL) {
parity_destroy(parity);
}

return 0;
// run full-client
{
std::vector<const char*> config = {"--no-ipc" , "--jsonrpc-apis=all", "--chain", "kovan"};
void* parity = parity_run(config);
if (parity_rpc_queries(parity)) {
printf("rpc_queries failed\r\n");
return 1;
}

if (parity_subscribe_to_websocket(parity)) {
printf("ws_queries failed\r\n");
return 1;
}

if (parity != nullptr) {
parity_destroy(parity);
}
}

// run light-client
{
std::vector<const char*> light_config = {"--no-ipc", "--light", "--jsonrpc-apis=all", "--chain", "kovan"};
void* parity = parity_run(light_config);

if (parity_rpc_queries(parity)) {
printf("rpc_queries failed\r\n");
return 1;
}

if (parity_subscribe_to_websocket(parity)) {
printf("ws_queries failed\r\n");
return 1;
}

if (parity != nullptr) {
parity_destroy(parity);
}
}
return 0;
}

int parity_rpc_queries(void* parity) {
if (!parity) {
return 1;
}

Callback cb { .type = CALLBACK_RPC, .counter = rpc_queries.size() };

for (auto query : rpc_queries) {
if (parity_rpc(parity, query.c_str(), query.length(), TIMEOUT_ONE_MIN_AS_MILLIS, callback, &cb) != 0) {
return 1;
}
}

while(cb.counter != 0);
return 0;
}


int parity_subscribe_to_websocket(void* parity) {
if (!parity) {
return 1;
}

std::vector<const void*> sessions;

Callback cb { .type = CALLBACK_WS, .counter = ws_subscriptions.size() };

for (auto sub : ws_subscriptions) {
void *const session = parity_subscribe_ws(parity, sub.c_str(), sub.length(), callback, &cb);
if (!session) {
return 1;
}
sessions.push_back(session);
}

while(cb.counter != 0);
std::this_thread::sleep_for(std::chrono::seconds(60));
for (auto session : sessions) {
parity_unsubscribe_ws(session);
}
return 0;
}

void* parity_run(std::vector<const char*> args) {
ParityParams cfg = {
.configuration = nullptr,
.on_client_restart_cb = callback,
.on_client_restart_cb_custom = nullptr
};

std::vector<size_t> str_lens;

for (auto arg: args) {
str_lens.push_back(std::strlen(arg));
}

// make sure no out-of-range access happens here
if (args.empty()) {
if (parity_config_from_cli(nullptr, nullptr, 0, &cfg.configuration) != 0) {
return nullptr;
}
} else {
if (parity_config_from_cli(&args[0], &str_lens[0], args.size(), &cfg.configuration) != 0) {
return nullptr;
}
}

void *parity = nullptr;
if (parity_start(&cfg, &parity) != 0) {
return nullptr;
}

return parity;
}
Loading

0 comments on commit b4f8bba

Please sign in to comment.