Skip to content

Commit

Permalink
In progress adding children to Object classes
Browse files Browse the repository at this point in the history
  • Loading branch information
mdemoret-nv committed Jul 12, 2023
1 parent 7cbfe8e commit 0178730
Show file tree
Hide file tree
Showing 12 changed files with 531 additions and 108 deletions.
41 changes: 41 additions & 0 deletions cpp/mrc/include/mrc/node/node_parent.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <algorithm>
#include <functional>
#include <limits>
#include <memory>
#include <mutex>
#include <numeric>
#include <tuple>
#include <type_traits>
#include <utility>

namespace mrc::node {

template <typename... TypesT>
class NodeParent
{
public:
using child_types_t = std::tuple<TypesT...>;

virtual std::tuple<std::pair<std::string, std::reference_wrapper<TypesT>>...> get_children_refs() const = 0;
};

} // namespace mrc::node
34 changes: 29 additions & 5 deletions cpp/mrc/include/mrc/node/operators/zip.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@
#include "mrc/channel/buffered_channel.hpp"
#include "mrc/channel/channel.hpp"
#include "mrc/channel/status.hpp"
#include "mrc/node/node_parent.hpp"
#include "mrc/node/sink_properties.hpp"
#include "mrc/node/source_properties.hpp"
#include "mrc/types.hpp"
#include "mrc/utils/string_utils.hpp"
#include "mrc/utils/tuple_utils.hpp"
#include "mrc/utils/type_utils.hpp"

Expand All @@ -42,8 +45,16 @@

namespace mrc::node {

class ZipBase
{
public:
virtual ~ZipBase() = default;
};

template <typename... TypesT>
class Zip : public WritableAcceptor<std::tuple<TypesT...>>
class Zip : public ZipBase,
public WritableAcceptor<std::tuple<TypesT...>>,
public NodeParent<edge::IWritableProvider<TypesT>...>
{
template <typename T>
using queue_t = BufferedChannel<T>;
Expand All @@ -63,6 +74,13 @@ class Zip : public WritableAcceptor<std::tuple<TypesT...>>
return std::make_tuple(std::make_unique<queue_t<TypesT>>(channel_size)...);
}

template <std::size_t... Is>
static std::tuple<std::pair<std::string, std::reference_wrapper<edge::IWritableProvider<TypesT>>>...>
build_child_pairs(Zip* self, std::index_sequence<Is...> /*unused*/)
{
return std::make_tuple(std::make_pair(MRC_CONCAT_STR("sink[" << Is << "]"), std::ref(self->get_sink<Is>()))...);
}

template <std::size_t I = 0>
channel::Status tuple_pop_each(queues_tuple_type& queues_tuple, output_t& output_tuple)
{
Expand All @@ -89,12 +107,18 @@ class Zip : public WritableAcceptor<std::tuple<TypesT...>>
m_queue_counts.fill(0);
}

virtual ~Zip() = default;
~Zip() override = default;

template <size_t N>
std::shared_ptr<edge::IWritableProvider<NthTypeOf<N, TypesT...>>> get_sink() const
edge::IWritableProvider<NthTypeOf<N, TypesT...>>& get_sink() const
{
return *std::get<N>(m_upstream_holders);
}

std::tuple<std::pair<std::string, std::reference_wrapper<edge::IWritableProvider<TypesT>>>...> get_children_refs()
const override
{
return std::get<N>(m_upstream_holders);
return build_child_pairs(const_cast<Zip*>(this), std::index_sequence_for<TypesT...>{});
}

protected:
Expand Down Expand Up @@ -242,7 +266,7 @@ class Zip : public WritableAcceptor<std::tuple<TypesT...>>
}
}

boost::fibers::mutex m_mutex;
mutable Mutex m_mutex;

// Once an upstream is closed, this is set representing the max number of values in a queue before its closed
size_t m_max_queue_count{std::numeric_limits<size_t>::max()};
Expand Down
8 changes: 7 additions & 1 deletion cpp/mrc/include/mrc/segment/component.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,13 @@ template <typename ResourceT>
class Component final : public Object<ResourceT>
{
public:
Component(std::unique_ptr<ResourceT> resource) : m_resource(std::move(resource)) {}
Component(std::unique_ptr<ResourceT> resource) :
ObjectProperties(Object<ResourceT>::build_state()),
Object<ResourceT>(),
m_resource(std::move(resource))
{
this->init_children();
}
~Component() final = default;

private:
Expand Down
6 changes: 5 additions & 1 deletion cpp/mrc/include/mrc/segment/egress_port.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,14 @@ class EgressPort final : public Object<node::RxSinkBase<T>>,

public:
EgressPort(SegmentAddress address, PortName name) :
ObjectProperties(Object<node::RxSinkBase<T>>::build_state()),
m_segment_address(address),
m_port_name(std::move(name)),
m_sink(std::make_unique<node::RxNode<T>>())
{}
{
// Must call after constructing Object<T>
this->init_children();
}

private:
node::RxSinkBase<T>* get_object() const final
Expand Down
6 changes: 5 additions & 1 deletion cpp/mrc/include/mrc/segment/ingress_port.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,14 @@ class IngressPort : public Object<node::RxSourceBase<T>>, public IngressPortBase

public:
IngressPort(SegmentAddress address, PortName name) :
ObjectProperties(Object<node::RxSourceBase<T>>::build_state()),
m_segment_address(address),
m_port_name(std::move(name)),
m_source(std::make_unique<node::RxNode<T>>())
{}
{
// Must call after constructing Object<T>
this->init_children();
}

private:
node::RxSourceBase<T>* get_object() const final
Expand Down
Loading

0 comments on commit 0178730

Please sign in to comment.