Skip to content

Commit

Permalink
Improve vertex sampling (#40)
Browse files Browse the repository at this point in the history
* fix CompactSubgraph.

* fix neighbor sampler.

* speed up sampling.

* reorganize the code of neighbor sampling.

* parallelize neighbor sampling.

* outputs the number of sampled vertices.

* speed up sampling.

* remove comment.

* add TODO
  • Loading branch information
zheng-da authored Nov 12, 2018
1 parent 461f6e8 commit a5a3c3f
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 113 deletions.
244 changes: 144 additions & 100 deletions src/operator/contrib/csr_neighborhood_sample-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,11 @@ struct ver_node {

// How to set the default value?
struct NeighborSampleParam : public dmlc::Parameter<NeighborSampleParam> {
int num_args;
dgl_id_t num_hops, num_neighbor, max_num_vertices;
DMLC_DECLARE_PARAMETER(NeighborSampleParam) {
DMLC_DECLARE_FIELD(num_args).set_lower_bound(2)
.describe("Number of input arguments.");
DMLC_DECLARE_FIELD(num_hops)
.set_default(1)
.describe("Number of hops.");
Expand All @@ -89,19 +92,23 @@ static bool CSRNeighborSampleStorageType(const nnvm::NodeAttrs& attrs,
DispatchMode* dispatch_mode,
std::vector<int> *in_attrs,
std::vector<int> *out_attrs) {
CHECK_EQ(in_attrs->size(), 2);
CHECK_EQ(out_attrs->size(), 2);
const NeighborSampleParam& params = nnvm::get<NeighborSampleParam>(attrs.parsed);
size_t num_subgraphs = params.num_args - 1;
CHECK_EQ(out_attrs->size(), 2 * num_subgraphs);

CHECK_EQ(in_attrs->at(0), mxnet::kCSRStorage);
CHECK_EQ(in_attrs->at(1), mxnet::kDefaultStorage);
for (size_t i = 0; i < num_subgraphs; i++)
CHECK_EQ(in_attrs->at(1 + i), mxnet::kDefaultStorage);

bool success = true;
if (!type_assign(&(*out_attrs)[0], mxnet::kDefaultStorage)) {
success = false;
}
if (!type_assign(&(*out_attrs)[1], mxnet::kCSRStorage)) {
success = false;
}
for (size_t i = 0; i < num_subgraphs; i++)
if (!type_assign(&(*out_attrs)[i], mxnet::kDefaultStorage)) {
success = false;
}
for (size_t i = 0; i < num_subgraphs; i++)
if (!type_assign(&(*out_attrs)[i + num_subgraphs], mxnet::kCSRStorage)) {
success = false;
}

*dispatch_mode = DispatchMode::kFComputeEx;

Expand All @@ -111,42 +118,54 @@ static bool CSRNeighborSampleStorageType(const nnvm::NodeAttrs& attrs,
static bool CSRNeighborSampleShape(const nnvm::NodeAttrs& attrs,
std::vector<TShape> *in_attrs,
std::vector<TShape> *out_attrs) {
CHECK_EQ(in_attrs->size(), 2);
CHECK_EQ(out_attrs->size(), 2);
const NeighborSampleParam& params = nnvm::get<NeighborSampleParam>(attrs.parsed);
size_t num_subgraphs = params.num_args - 1;
CHECK_EQ(out_attrs->size(), 2 * num_subgraphs);

CHECK_EQ(in_attrs->at(0).ndim(), 2U);
CHECK_EQ(in_attrs->at(1).ndim(), 1U);
// Check the graph shape
for (size_t i = 0; i < num_subgraphs; i++) {
CHECK_EQ(in_attrs->at(1 + i).ndim(), 1U);
}
CHECK_EQ(in_attrs->at(0)[0], in_attrs->at(0)[1]);

const NeighborSampleParam& params =
nnvm::get<NeighborSampleParam>(attrs.parsed);

TShape out_shape(1);
out_shape[0] = params.max_num_vertices;
SHAPE_ASSIGN_CHECK(*out_attrs, 0, out_shape);
// We use the last element to store the actual number of vertices in the
// subgraph.
out_shape[0] = params.max_num_vertices + 1;
bool success = true;
for (size_t i = 0; i < num_subgraphs; i++) {
SHAPE_ASSIGN_CHECK(*out_attrs, i, out_shape);
success = success && out_attrs->at(i).ndim() != 0U &&
out_attrs->at(i).Size() != 0U;
}

TShape out_csr_shape(2);
out_csr_shape[0] = params.max_num_vertices;
out_csr_shape[1] = in_attrs->at(0)[1];
SHAPE_ASSIGN_CHECK(*out_attrs, 1, out_csr_shape);
for (size_t i = 0; i < num_subgraphs; i++) {
SHAPE_ASSIGN_CHECK(*out_attrs, i + num_subgraphs, out_csr_shape);
success = success && out_attrs->at(i + num_subgraphs).ndim() != 0U &&
out_attrs->at(i + num_subgraphs).Size() != 0U;
}

return out_attrs->at(0).ndim() != 0U &&
out_attrs->at(0).Size() != 0U &&
out_attrs->at(1).ndim() != 0U &&
out_attrs->at(1).Size() != 0U;
return success;
}

static bool CSRNeighborSampleType(const nnvm::NodeAttrs& attrs,
std::vector<int> *in_attrs,
std::vector<int> *out_attrs) {
CHECK_EQ(in_attrs->size(), 2);
CHECK_EQ(out_attrs->size(), 2);
const NeighborSampleParam& params = nnvm::get<NeighborSampleParam>(attrs.parsed);
size_t num_subgraphs = params.num_args - 1;
CHECK_EQ(out_attrs->size(), 2 * num_subgraphs);

TYPE_ASSIGN_CHECK(*out_attrs, 0, in_attrs->at(1));
TYPE_ASSIGN_CHECK(*out_attrs, 1, in_attrs->at(0));
bool success = true;
for (size_t i = 0; i < num_subgraphs; i++) {
TYPE_ASSIGN_CHECK(*out_attrs, i, in_attrs->at(1));
TYPE_ASSIGN_CHECK(*out_attrs, i + num_subgraphs, in_attrs->at(0));
success = success && out_attrs->at(i) != -1 && out_attrs->at(i + num_subgraphs) != -1;
}

return out_attrs->at(0) != -1;
return success;
}

static void GetSrcList(const dgl_id_t* val_list,
Expand Down Expand Up @@ -175,78 +194,78 @@ static void GetSample(std::vector<dgl_id_t>& ver_list,
}
return;
}
// Make sample
std::unordered_map<size_t, bool> mp;
size_t sample_count = 0;
for (;;) {
// rand_num = [0, ver_list.size()-1]
size_t rand_num = rand() % ver_list.size();
auto got = mp.find(rand_num);
if (got != mp.end() && mp[rand_num]) {
// re-sample
continue;
}
mp[rand_num] = true;
out.push_back(ver_list[rand_num]);
out_edge.push_back(edge_list[rand_num]);
sample_count++;
if (sample_count == max_num_neighbor) {
break;
// If we just sample a small number of elements from a large neighbor list.
std::vector<size_t> sorted_idxs(max_num_neighbor);
if (ver_list.size() > max_num_neighbor * 10) {
std::unordered_set<size_t> sampled_idxs;
while (sampled_idxs.size() < max_num_neighbor) {
// rand_num = [0, ver_list.size()-1]
size_t rand_num = rand() % ver_list.size();
sampled_idxs.insert(rand_num);
}
size_t i = 0;
for (auto it = sampled_idxs.begin(); it != sampled_idxs.end(); it++, i++)
sorted_idxs[i] = *it;
} else {
// The vertex list is relatively small. We just shuffle the list and
// take the first few.
std::vector<size_t> idxs(ver_list.size());
for (size_t i = 0; i < idxs.size(); i++) idxs[i] = i;
std::random_shuffle(idxs.begin(), idxs.end());
for (size_t i = 0; i < max_num_neighbor; i++)
sorted_idxs[i] = idxs[i];
}
}

static void CSRNeighborSampleComputeExCPU(const nnvm::NodeAttrs& attrs,
const OpContext& ctx,
const std::vector<NDArray>& inputs,
const std::vector<OpReqType>& req,
const std::vector<NDArray>& outputs) {
CHECK_EQ(inputs.size(), 2U);
CHECK_EQ(outputs.size(), 2U);

const NeighborSampleParam& params =
nnvm::get<NeighborSampleParam>(attrs.parsed);
std::sort(sorted_idxs.begin(), sorted_idxs.end());

// set seed for random sampling
srand(time(nullptr));

dgl_id_t num_hops = params.num_hops;
dgl_id_t num_neighbor = params.num_neighbor;
dgl_id_t max_num_vertices = params.max_num_vertices;
for (auto idx : sorted_idxs) {
out.push_back(ver_list[idx]);
out_edge.push_back(edge_list[idx]);
}
}

size_t seed_num = inputs[1].data().Size();
struct neigh_list {
std::vector<dgl_id_t> neighs;
std::vector<dgl_id_t> edges;
neigh_list(const std::vector<dgl_id_t> &_neighs,
const std::vector<dgl_id_t> &_edges): neighs(_neighs), edges(_edges) {
}
};

CHECK_GE(max_num_vertices, seed_num);
static void SampleSubgraph(const NDArray &csr, const NDArray &seed_arr,
const NDArray &sub_csr, const NDArray &sampled_ids,
dgl_id_t num_hops, dgl_id_t num_neighbor,
dgl_id_t max_num_vertices) {
size_t num_seeds = seed_arr.shape().Size();

const dgl_id_t* val_list = inputs[0].data().dptr<dgl_id_t>();
const dgl_id_t* col_list = inputs[0].aux_data(csr::kIdx).dptr<dgl_id_t>();
const dgl_id_t* indptr = inputs[0].aux_data(csr::kIndPtr).dptr<dgl_id_t>();
const dgl_id_t* seed = inputs[1].data().dptr<dgl_id_t>();
CHECK_GE(max_num_vertices, num_seeds);

dgl_id_t* out = outputs[0].data().dptr<dgl_id_t>();
const dgl_id_t* val_list = csr.data().dptr<dgl_id_t>();
const dgl_id_t* col_list = csr.aux_data(csr::kIdx).dptr<dgl_id_t>();
const dgl_id_t* indptr = csr.aux_data(csr::kIndPtr).dptr<dgl_id_t>();
const dgl_id_t* seed = seed_arr.data().dptr<dgl_id_t>();
dgl_id_t* out = sampled_ids.data().dptr<dgl_id_t>();

// BFS traverse the graph and sample vertices
dgl_id_t sub_vertices_count = 0;
std::map<dgl_id_t, bool> sub_ver_mp;
std::unordered_set<dgl_id_t> sub_ver_mp;
std::queue<ver_node> node_queue;
// add seed vertices
for (size_t i = 0; i < seed_num; ++i) {
for (size_t i = 0; i < num_seeds; ++i) {
ver_node node;
node.vertex_id = seed[i];
node.level = 0;
node_queue.push(node);
sub_ver_mp[node.vertex_id] = true;
sub_ver_mp.insert(node.vertex_id);
sub_vertices_count++;
}

std::vector<dgl_id_t> tmp_src_list;
std::vector<dgl_id_t> tmp_edge_list;
std::vector<dgl_id_t> tmp_sampled_src_list;
std::vector<dgl_id_t> tmp_sampled_edge_list;
std::unordered_map<dgl_id_t, neigh_list> neigh_mp;

std::map<dgl_id_t, std::vector<dgl_id_t> > ver_mp;
std::map<dgl_id_t, std::vector<dgl_id_t> > edge_mp;

size_t num_edges = 0;
while (!node_queue.empty()) {
ver_node& cur_node = node_queue.front();
if (cur_node.level < num_hops) {
Expand All @@ -269,70 +288,76 @@ static void CSRNeighborSampleComputeExCPU(const nnvm::NodeAttrs& attrs,
num_neighbor,
tmp_sampled_src_list,
tmp_sampled_edge_list);

ver_mp[dst_id] = tmp_sampled_src_list;
edge_mp[dst_id] = tmp_sampled_edge_list;
neigh_mp.insert(std::pair<dgl_id_t, neigh_list>(dst_id,
neigh_list(tmp_sampled_src_list,
tmp_sampled_edge_list)));
num_edges += tmp_sampled_src_list.size();

// TODO The code doesn't limit the maximal number of vertices correctly.
sub_vertices_count++;
if (sub_vertices_count == max_num_vertices) {
break;
}

for (size_t i = 0; i < tmp_sampled_src_list.size(); ++i) {
auto got = sub_ver_mp.find(tmp_sampled_src_list[i]);
if (got == sub_ver_mp.end()) {
sub_ver_mp[tmp_sampled_src_list[i]] = true;
auto ret = sub_ver_mp.insert(tmp_sampled_src_list[i]);
if (ret.second) {
sub_vertices_count++;
ver_node new_node;
new_node.vertex_id = tmp_sampled_src_list[i];
new_node.level = cur_node.level + 1;
node_queue.push(new_node);
if (new_node.level < num_hops)
node_queue.push(new_node);
}
}
}
node_queue.pop();
}

// Copy sub_ver_mp to output[0]
dgl_id_t idx = 0;
size_t idx = 0;
for (auto& data: sub_ver_mp) {
if (data.second) {
*(out+idx) = data.first;
idx++;
}
*(out+idx) = data;
idx++;
}
size_t num_vertices = sub_ver_mp.size();
std::sort(out, out + num_vertices);
// The rest data will be set to -1
for (dgl_id_t i = idx; i < max_num_vertices; ++i) {
*(out+i) = -1;
}
// The last element stores the actual number of vertices in the subgraph.
out[max_num_vertices] = sub_ver_mp.size();

// Construct sub_csr_graph
// TODO reduce the memory copy
std::vector<dgl_id_t> sub_val;
std::vector<dgl_id_t> sub_col_list;
std::vector<dgl_id_t> sub_indptr(max_num_vertices+1, 0);

size_t index = 1;
for (auto& data: sub_ver_mp) {
dgl_id_t dst_id = data.first;
auto edge = edge_mp.find(dst_id);
auto vert = ver_mp.find(dst_id);
if (edge != edge_mp.end() && vert != ver_mp.end()) {
CHECK_EQ(edge->second.size(), vert->second.size());
for (auto& val : edge->second) {
sub_val.reserve(num_edges);
sub_col_list.reserve(num_edges);

for (size_t i = 0, index = 1; i < num_vertices; i++) {
dgl_id_t dst_id = *(out + i);
auto it = neigh_mp.find(dst_id);
if (it != neigh_mp.end()) {
const auto &edges = it->second.edges;
const auto &neighs = it->second.neighs;
CHECK_EQ(edges.size(), neighs.size());
for (auto& val : edges) {
sub_val.push_back(val);
}
for (auto& val : vert->second) {
for (auto& val : neighs) {
sub_col_list.push_back(val);
}
sub_indptr[index] = sub_indptr[index-1] + edge->second.size();
sub_indptr[index] = sub_indptr[index-1] + edges.size();
} else {
sub_indptr[index] = sub_indptr[index-1];
}
index++;
}

// Copy sub_csr_graph to output[1]
const NDArray& sub_csr = outputs[1];
TShape shape_1(1);
TShape shape_2(1);
shape_1[0] = sub_val.size();
Expand All @@ -351,6 +376,25 @@ static void CSRNeighborSampleComputeExCPU(const nnvm::NodeAttrs& attrs,
std::copy(sub_indptr.begin(), sub_indptr.end(), indptr_out);
}

static void CSRNeighborSampleComputeExCPU(const nnvm::NodeAttrs& attrs,
const OpContext& ctx,
const std::vector<NDArray>& inputs,
const std::vector<OpReqType>& req,
const std::vector<NDArray>& outputs) {
const NeighborSampleParam& params =
nnvm::get<NeighborSampleParam>(attrs.parsed);
size_t num_subgraphs = inputs.size() - 1;
CHECK_EQ(outputs.size(), 2 * num_subgraphs);

// set seed for random sampling
srand(time(nullptr));

#pragma omp parallel for
for (size_t i = 0; i < num_subgraphs; i++)
SampleSubgraph(inputs[0], inputs[i + 1], outputs[i + num_subgraphs], outputs[i],
params.num_hops, params.num_neighbor, params.max_num_vertices);
}

} // op
} // mxnet

Expand Down
Loading

0 comments on commit a5a3c3f

Please sign in to comment.