Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update ControlMessage to hold arbitrary Python objects & update MessageMeta to copy & slice #1637

Merged
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
b11984f
add py_object getter/setter for cm
yczhang-nv Apr 16, 2024
7450404
add pybind and python test for cm.py_obj
yczhang-nv Apr 16, 2024
554d2d9
add copy functions to meta & fix meta ut segfault
yczhang-nv Apr 17, 2024
fd8b7f6
add longer array to test
yczhang-nv Apr 17, 2024
8fb01fb
add test to slice and copy
yczhang-nv Apr 17, 2024
ee2ebf5
remove whitespace
yczhang-nv Apr 17, 2024
858246a
fix format
yczhang-nv Apr 17, 2024
321731e
fix format
yczhang-nv Apr 17, 2024
ebca9b3
add Python test to meta slicing
yczhang-nv Apr 18, 2024
433078d
update CM to use JSONValues
yczhang-nv Apr 24, 2024
ba17ddd
fix JSONValues bug
yczhang-nv Apr 24, 2024
ffe0b25
clean up
yczhang-nv Apr 24, 2024
b176874
clean up
yczhang-nv Apr 24, 2024
108ccd3
fix bug
yczhang-nv Apr 24, 2024
da1683f
fix prototypes
yczhang-nv Apr 25, 2024
5a38423
added some todos in comments
yczhang-nv Apr 25, 2024
9632715
use json_t
yczhang-nv May 2, 2024
61dede7
add support to json_t
yczhang-nv May 3, 2024
2d13968
Merge branch 'branch-24.06' into yuchen-update-ControlMessage
yczhang-nv May 3, 2024
7c0229a
add tests
yczhang-nv May 3, 2024
5d9473b
update doc
yczhang-nv May 3, 2024
dec46b6
fix format
yczhang-nv May 3, 2024
5394fc1
update doc
yczhang-nv May 3, 2024
58dc37f
Use PyHolder
yczhang-nv May 3, 2024
67dd047
Use nlohmann::json::subtype
yczhang-nv May 3, 2024
97beb13
add unit tests
yczhang-nv May 7, 2024
de5b8df
fix import bugs
yczhang-nv May 8, 2024
6ac3a9b
add tests
yczhang-nv May 8, 2024
1036b57
Merge remote-tracking branch 'upstream/branch-24.06' into yuchen-upda…
yczhang-nv May 8, 2024
8d942f9
fix comments
yczhang-nv May 9, 2024
3899e46
Merge remote-tracking branch 'upstream/branch-24.06' into yuchen-upda…
yczhang-nv May 9, 2024
5b3c8a4
revert test
yczhang-nv May 9, 2024
ec4ca78
fix doc
yczhang-nv May 9, 2024
93e3f08
delete whitespace
yczhang-nv May 9, 2024
65ceacf
fix_all does not break
yczhang-nv May 9, 2024
5e67913
passed python check
yczhang-nv May 9, 2024
cc4f695
did not break before modifying module.cpp
yczhang-nv May 9, 2024
eb73aad
fix ci
yczhang-nv May 9, 2024
a5b4e1a
Merge branch 'branch-24.06' into yuchen-update-ControlMessage
yczhang-nv May 9, 2024
96d570b
fix tests
yczhang-nv May 9, 2024
86693df
Merge remote-tracking branch 'origin/yuchen-update-ControlMessage' in…
yczhang-nv May 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions morpheus/_lib/include/morpheus/messages/control.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include <nlohmann/json.hpp> // for json, basic_json
#include <pybind11/pytypes.h> // for object, dict, list, none
#include <pymrc/utilities/json_values.hpp>

#include <chrono> // for system_clock, time_point
#include <map> // for map
Expand Down Expand Up @@ -328,6 +329,20 @@ class ControlMessage
*/
void task_type(ControlMessageType task_type);

/**
* @brief Set a Python object at a specific path
* @param path the path in the JSON object where the value should be set
* @param value the Python object to set
*/
void set_py_object(const std::string& path, const pybind11::object& value);

/**
* @brief Get the Python object at a specific path
* @param path Path to the specified object
* @return The Python representation of the object at the specified path
*/
pybind11::object get_py_object(const std::string& path) const;

/**
* @brief Sets a timestamp for a specific key.
*
Expand Down Expand Up @@ -374,6 +389,7 @@ class ControlMessage

nlohmann::json m_tasks{};
nlohmann::json m_config{};
yczhang-nv marked this conversation as resolved.
Show resolved Hide resolved
mrc::pymrc::JSONValues m_py_objects;

std::map<std::string, time_point_t> m_timestamps{};
};
Expand Down
34 changes: 34 additions & 0 deletions morpheus/_lib/include/morpheus/messages/meta.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,23 @@ class MessageMeta
*/
virtual std::optional<std::string> ensure_sliceable_index();

/**
* @brief Creates a deep copy of DataFrame with the specified ranges.
*
* @param ranges the tensor index ranges to copy
* @return std::shared_ptr<MessageMeta> the deep copy of the specified ranges
*/
virtual std::shared_ptr<MessageMeta> copy_ranges(const std::vector<RangeType>& ranges) const;

/**
* @brief Get a slice of the underlying DataFrame by creating a deep copy
*
* @param start the tensor index of the start of the copy
* @param stop the tensor index of the end of the copy
* @return std::shared_ptr<MessageMeta> the deep copy of the speicifed slice
*/
virtual std::shared_ptr<MessageMeta> get_slice(TensorIndex start, TensorIndex stop) const;

/**
* @brief Create MessageMeta cpp object from a python object
*
Expand Down Expand Up @@ -297,6 +314,23 @@ struct MessageMetaInterfaceProxy
* @return std::string The name of the column with the old index or nullopt if no changes were made.
*/
static std::optional<std::string> ensure_sliceable_index(MessageMeta& self);

/**
* @brief Creates a deep copy of DataFrame with the specified ranges.
*
* @param ranges the tensor index ranges to copy
* @return std::shared_ptr<MessageMeta> the deep copy of the specified ranges
*/
static std::shared_ptr<MessageMeta> copy_ranges(MessageMeta& self, const std::vector<RangeType>& ranges);

/**
* @brief Get a slice of the underlying DataFrame by creating a deep copy
*
* @param start the tensor index of the start of the copy
* @param stop the tensor index of the end of the copy
* @return std::shared_ptr<MessageMeta> the deep copy of the speicifed slice
*/
static std::shared_ptr<MessageMeta> get_slice(MessageMeta& self, TensorIndex start, TensorIndex stop);
};

#pragma GCC visibility pop
Expand Down
4 changes: 4 additions & 0 deletions morpheus/_lib/messages/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class ControlMessage():
Retrieve timestamps matching a regex filter within a given group.
"""
def get_metadata(self, key: object = None, default_value: object = None) -> object: ...
def get_py_object(self, path: str) -> object: ...
def get_tasks(self) -> dict: ...
def get_timestamp(self, key: str, fail_if_nonexist: bool = False) -> object:
"""
Expand All @@ -70,6 +71,7 @@ class ControlMessage():
def payload(self, meta: object) -> None: ...
def remove_task(self, task_type: str) -> dict: ...
def set_metadata(self, key: str, value: object) -> None: ...
def set_py_object(self, path: str, value: object) -> None: ...
def set_timestamp(self, key: str, timestamp: object) -> None:
"""
Set a timestamp for a given key and group.
Expand Down Expand Up @@ -182,6 +184,7 @@ class InferenceMemoryNLP(InferenceMemory, TensorMemory):
class MessageMeta():
def __init__(self, df: object) -> None: ...
def copy_dataframe(self) -> object: ...
def copy_ranges(self, ranges: typing.List[typing.Tuple[int, int]]) -> MessageMeta: ...
def ensure_sliceable_index(self) -> typing.Optional[str]: ...
def get_column_names(self) -> typing.List[str]: ...
@typing.overload
Expand All @@ -192,6 +195,7 @@ class MessageMeta():
def get_data(self, columns: str) -> object: ...
@typing.overload
def get_data(self, columns: typing.List[str]) -> object: ...
def get_slice(self, start: int, stop: int) -> MessageMeta: ...
def has_sliceable_index(self) -> bool: ...
@staticmethod
def make_from_file(arg0: str) -> MessageMeta: ...
Expand Down
6 changes: 5 additions & 1 deletion morpheus/_lib/messages/module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,8 @@ PYBIND11_MODULE(messages, _module)
.def("mutable_dataframe", &MessageMetaInterfaceProxy::mutable_dataframe, py::return_value_policy::move)
.def("has_sliceable_index", &MessageMetaInterfaceProxy::has_sliceable_index)
.def("ensure_sliceable_index", &MessageMetaInterfaceProxy::ensure_sliceable_index)
.def("copy_ranges", &MessageMetaInterfaceProxy::copy_ranges, py::return_value_policy::move, py::arg("ranges"))
.def("get_slice", &MessageMetaInterfaceProxy::get_slice, py::return_value_policy::move, py::arg("start"), py::arg("stop"))
.def_static("make_from_file", &MessageMetaInterfaceProxy::init_cpp);

py::class_<MultiMessage, std::shared_ptr<MultiMessage>>(_module, "MultiMessage")
Expand Down Expand Up @@ -427,7 +429,9 @@ PYBIND11_MODULE(messages, _module)
.def("set_metadata", &ControlMessageProxy::set_metadata, py::arg("key"), py::arg("value"))
.def("task_type", pybind11::overload_cast<>(&ControlMessage::task_type))
.def(
"task_type", pybind11::overload_cast<ControlMessageType>(&ControlMessage::task_type), py::arg("task_type"));
"task_type", pybind11::overload_cast<ControlMessageType>(&ControlMessage::task_type), py::arg("task_type"))
.def("set_py_object", &ControlMessage::set_py_object, py::arg("path"), py::arg("value"))
.def("get_py_object", &ControlMessage::get_py_object, py::arg("path"));
yczhang-nv marked this conversation as resolved.
Show resolved Hide resolved

py::class_<LoaderRegistry, std::shared_ptr<LoaderRegistry>>(_module, "DataLoaderRegistry")
.def_static("contains", &LoaderRegistry::contains, py::arg("name"))
Expand Down
10 changes: 10 additions & 0 deletions morpheus/_lib/src/messages/control.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,16 @@ nlohmann::json ControlMessage::remove_task(const std::string& task_type)
throw std::runtime_error("No tasks of type " + task_type + " found");
}

void ControlMessage::set_py_object(const std::string& path, const pybind11::object& value)
{
m_py_objects = std::move(m_py_objects.set_value(path, value));
}

pybind11::object ControlMessage::get_py_object(const std::string& path) const {
auto abs_path = "/" + path;
return m_py_objects.get_python(abs_path);
}

void ControlMessage::set_timestamp(const std::string& key, time_point_t timestamp_ns)
{
// Insert or update the timestamp in the map
Expand Down
60 changes: 56 additions & 4 deletions morpheus/_lib/src/messages/meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@

#include <cuda_runtime.h> // for cudaMemcpy, cudaMemcpy2D, cudaMemcpyKind
#include <cudf/column/column_view.hpp> // for column_view
#include <cudf/concatenate.hpp>
#include <cudf/copying.hpp>
#include <cudf/io/types.hpp>
#include <cudf/types.hpp> // for type_id, data_type, size_type
#include <glog/logging.h>
Expand Down Expand Up @@ -84,7 +86,9 @@ TableInfo MessageMeta::get_info(const std::vector<std::string>& column_names) co

void MessageMeta::set_data(const std::string& col_name, TensorObject tensor)
{
this->set_data({col_name}, {tensor});
// This causes a segfault in copy ctor of TensorObject, when shared_ptr<MemoryDescriptor> increases the ref count
// this->set_data({col_name}, {tensor});
this->set_data({col_name}, std::vector<TensorObject>{tensor});
}

void MessageMeta::set_data(const std::vector<std::string>& column_names, const std::vector<TensorObject>& tensors)
Expand All @@ -111,16 +115,13 @@ void MessageMeta::set_data(const std::vector<std::string>& column_names, const s
const auto tensor_type = DType(tensors[i].dtype());
const auto tensor_type_id = tensor_type.cudf_type_id();
const auto row_stride = tensors[i].stride(0);

CHECK(tensors[i].count() == cv.size() &&
(table_type_id == tensor_type_id ||
(table_type_id == cudf::type_id::BOOL8 && tensor_type_id == cudf::type_id::UINT8)));

const auto item_size = tensors[i].dtype().item_size();

// Dont use cv.data<>() here since that does not account for the size of each element
auto data_start = const_cast<uint8_t*>(cv.head<uint8_t>()) + cv.offset() * item_size;

if (row_stride == 1)
{
// column major just use cudaMemcpy
Expand Down Expand Up @@ -193,6 +194,41 @@ bool MessageMeta::has_sliceable_index() const
return table.has_sliceable_index();
}

std::shared_ptr<MessageMeta> MessageMeta::copy_ranges(const std::vector<RangeType>& ranges) const
{
// copy ranges into a sequntial list of values
// https://github.com/rapidsai/cudf/issues/11223
std::vector<TensorIndex> cudf_ranges;
for (const auto& p : ranges)
{
// Append the message offset to the range here
cudf_ranges.push_back(p.first);
cudf_ranges.push_back(p.second);
}
auto table_info = this->get_info();
auto column_names = table_info.get_column_names();
auto metadata = cudf::io::table_metadata{};

metadata.schema_info.reserve(column_names.size() + 1);
metadata.schema_info.emplace_back("");

for (auto column_name : column_names)
{
metadata.schema_info.emplace_back(column_name);
}

auto table_view = table_info.get_view();
auto sliced_views = cudf::slice(table_view, cudf_ranges);
cudf::io::table_with_metadata table = {cudf::concatenate(sliced_views), std::move(metadata)};

return MessageMeta::create_from_cpp(std::move(table), 1);
}

std::shared_ptr<MessageMeta> MessageMeta::get_slice(TensorIndex start, TensorIndex stop) const
{
return this->copy_ranges({{start, stop}});
}

std::optional<std::string> MessageMeta::ensure_sliceable_index()
{
auto table = this->get_mutable_info();
Expand Down Expand Up @@ -462,6 +498,20 @@ std::optional<std::string> MessageMetaInterfaceProxy::ensure_sliceable_index(Mes
return self.ensure_sliceable_index();
}

std::shared_ptr<MessageMeta> MessageMetaInterfaceProxy::copy_ranges(MessageMeta& self, const std::vector<RangeType>& ranges)
{
pybind11::gil_scoped_release no_gil;

return self.copy_ranges(ranges);
}

std::shared_ptr<MessageMeta> MessageMetaInterfaceProxy::get_slice(MessageMeta& self, TensorIndex start, TensorIndex stop)
{
pybind11::gil_scoped_release no_gil;

return self.get_slice(start, stop);
}

SlicedMessageMeta::SlicedMessageMeta(std::shared_ptr<MessageMeta> other,
TensorIndex start,
TensorIndex stop,
Expand Down Expand Up @@ -492,4 +542,6 @@ std::optional<std::string> SlicedMessageMeta::ensure_sliceable_index()
throw std::runtime_error{"Unable to set a new index on the DataFrame from a partial view of the columns/rows."};
}



} // namespace morpheus
1 change: 1 addition & 0 deletions morpheus/_lib/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ add_morpheus_test(
messages/test_control_message.cpp
messages/test_dev_doc_ex3.cpp
messages/test_sliced_message_meta.cpp
messages/test_message_meta.cpp
)

add_morpheus_test(
Expand Down
33 changes: 33 additions & 0 deletions morpheus/_lib/tests/messages/test_control_message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
#include <stdexcept> // for runtime_error
#include <string> // for operator<=>, string, char_traits, basic_string
#include <vector> // for vector
#include <pybind11/pybind11.h>
#include <pybind11/stl.h> // IWYU pragma: keep

using namespace morpheus;
using namespace morpheus::test;
Expand All @@ -41,6 +43,11 @@ using clock_type_t = std::chrono::system_clock;

using TestControlMessage = morpheus::test::TestMessages; // NOLINT(readability-identifier-naming)

namespace py = pybind11;
using namespace pybind11::literals;
using namespace std::string_literals;


TEST_F(TestControlMessage, InitializationTest)
{
auto msg_one = ControlMessage();
Expand Down Expand Up @@ -334,3 +341,29 @@ TEST_F(TestControlMessage, GetTensorMemoryWhenNoneSet)
// Verify that the retrieved tensor memory is nullptr
EXPECT_EQ(nullptr, retrievedTensorMemory);
}

TEST_F(TestControlMessage, SetAndGetPyObject)
{
auto msg = ControlMessage();

std::array<std::string, 3> alphabet = {"a", "b", "c"};
auto py_dict = py::dict("this"_a = py::dict("is"_a = "a test"s),
"alphabet"_a = py::cast(alphabet),
"ncc"_a = 1701,
"cost"_a = 47.47);

// <path, expected_result>
std::vector<std::pair<std::string, py::object>> tests = {{"", py_dict},
{"this", py::dict("is"_a = "a test"s)},
{"this/is", py::str("a test"s)},
{"alphabet", py_dict["alphabet"]},
{"ncc", py::int_(1701)},
{"cost", py::float_(47.47)}};

for (auto& [path, expected_object] : tests)
{
msg.set_py_object(path, expected_object);
auto object = msg.get_py_object(path);
EXPECT_TRUE(object.equal(expected_object));
}
}
Loading
Loading