Skip to content

Commit

Permalink
Merge pull request #15 from picanumber/MultiplierStage
Browse files Browse the repository at this point in the history
Hatching stage
  • Loading branch information
picanumber authored Oct 15, 2022
2 parents 849ed31 + 0c1126a commit f21aa00
Show file tree
Hide file tree
Showing 14 changed files with 348 additions and 19 deletions.
11 changes: 11 additions & 0 deletions .github/workflows/asan.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,18 @@ on:
branches: [ "main" ]

jobs:

address_sanitizer:
strategy:
matrix:
compiler:
- pkg: g++-11
exe: g++-11
- pkg: g++-10
exe: g++-10

uses: picanumber/sanitizer_workflows/.github/workflows/asan.yml@main
with:
testDir: 'test' # Explicit specification of the test directory.
compiler_package: ${{ matrix.compiler.pkg }}
compiler_name: ${{ matrix.compiler.exe }}
2 changes: 1 addition & 1 deletion .github/workflows/style.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ jobs:
- name: Run clang-format style check for C/C++/Protobuf programs.
uses: jidicula/clang-format-action@v4.6.2
with:
clang-format-version: '14'
clang-format-version: '15'
check-path: ${{ matrix.path }}
17 changes: 17 additions & 0 deletions .github/workflows/ubuntu.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,28 @@ env:
INSTALL_LOCATION: .local

jobs:

build:
timeout-minutes: 15

strategy:
matrix:
compiler:
- pkg: g++-11
exe: g++-11
- pkg: g++-10
exe: g++-10

runs-on: ubuntu-latest
if: "!contains(github.event.head_commit.message, '[skip ci]') && !contains(github.event.head_commit.message, '[ci skip]')"

steps:
- uses: actions/checkout@v3

- name: Install compiler
run: |
sudo apt update
sudo apt install -y ${{ matrix.compiler.pkg }}
- name: cache dependencies
uses: actions/cache@v2
Expand All @@ -37,6 +52,8 @@ jobs:
cmake --build build --target install --config Release
- name: configure
env:
CXX: ${{ matrix.compiler.exe }}
run: cmake -Bbuild -DCMAKE_INSTALL_PREFIX=$GITHUB_WORKSPACE/$INSTALL_LOCATION -DProject_ENABLE_CODE_COVERAGE=1

- name: build
Expand Down
60 changes: 56 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
- [Topology](#Topology)
- [Filter](#Filter)
- [Farm](#Farm)
- [Hatch](#Hatch)
- [Utilities](#Utilities)
- [Consumer](#Consumer)
- [Examples](#Examples)
Expand Down Expand Up @@ -202,9 +203,9 @@ The `std::optional` type was not used directly, since explicit use cases exist f
To __provide explicit syntax to your pipeline declaration__, a helper `Filter` caller can be used. This is a "call forwarding" wrapper that can use either `std::optional` or `yap::Filtered` return types:

```cpp
auto oddPrinter = yap::Pipeline{}
| gen
| yap::Filter(s2) // Explicitly declared filtering stage.
auto oddPrinter = yap::Pipeline{}
| gen
| yap::Filter(s2) // Explicitly declared filtering stage.
| intPrinter{};
```

Expand All @@ -214,6 +215,57 @@ __A stage following a filter__ should accept a `yap::Filtered<T>` input. It can

![farmed_stage](assets/farm_pattern.png)

### [Hatch](https://github.com/picanumber/yapp/blob/main/examples/basic/use_hatched.cpp)

A _hatching stage_ is one that can produce more than one outputs for a single input. In the diagram below, `S2` is such a stage:

![hatching_stage](assets/hatch_pattern.png)

Note that this process is conceptually different to producing a collection of objects. __A typical example where you might want to hatch your input is when processing text files__, say line by line. If the stage that produced the lines was to scan the whole text file and output a vector of text lines (strings) then you'd face the following deficiencies:

1. Extraneous memory used to hold the entirety of the text file. The program only needs a single line "in-flight" to do its processing.
2. The next stage has to wait until the whole file has been read. The "vector of lines" implies that text processing can only begin after the text file has been read.

Such a situation can be greatly improved if the "text reader" stage produces its output in a piece wise fashion: Each line that is ready, gets immediately pushed to the next stage for processing.

__To create a hatching stage__ use a callable that accepts `yap::Hatchable` objects as input, a class similar in logic to `yap::Filtered` that conveys how the stage does its processing:

1. The `yap::Hatchable` is convertible to `bool`. `true` means new input while `false` (empty optional) means you're still producing output from the previous input.
2. The hatching stage outputs an object that is convertible to `bool`, e.g. an `std::optional` or again `yap::Hatchable`. __The pipeline stops processing the same input when the output is `false`__, alternatively it keeps invoking the stage with an empty `yap::Hatchable` to produce more output from the same input.

```cpp
auto exampleHatchingStage = [](yap::Hatchable<int> input)
{
std::optional<char> ret;

if (val)
{
// New Input from previous stage. Input data is non empty.
std::optional<int> &curInput = input.data;
assert(curInput);
}
else
{
// Keep processing the last input from previous stage. Input data is empty.
assert(!input.data);
}

return ret; // Returning a contextually "false" object, here empty
// optional, means the input won't be hatched any more and
// the stage can process new values produced from the
// previous stage.
};
```

To __provide explicit syntax to your pipeline declaration__, a helper `OutputHatchable` caller can be used to denote the stage producing input for the hatching stage. This is a "call forwarding" wrapper that wraps the output of a stage into `yap::Hatchable` return types:

```cpp
auto hp = yap::Pipeline{}
| yap::OutputHatchable(generator) // The previous stage can be annotated.
| exampleHatchingStage
| sinkStage{};
```

## Utilities

Utilities that accompany the library are described here. Creating a huge suite of accompanying tools is a non-goal for this library, however there should be provision for patterns that are often encountered. In that spirit, the following tools are made.
Expand All @@ -232,7 +284,7 @@ Usage on a container `c` is pretty straightforward:
auto p1 = yap::Pipeline{} | yap::Consume(c.begin(), c.end()) | ...

// Input values are moved into the pipeline. Container has "moved-from" objects.
auto p2 = yap::Pipeline{} |
auto p2 = yap::Pipeline{} |
yap::Consume(std::make_move_iterator(c.begin()), std::make_move_iterator(c.end())) | ...
```
Expand Down
Binary file added assets/hatch_pattern.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
3 changes: 3 additions & 0 deletions examples/basic/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,6 @@ target_link_libraries(nthreaded_stages ${USED_LIBS})

add_executable(use_filtered use_filtered.cpp)
target_link_libraries(use_filtered ${USED_LIBS})

add_executable(use_hatched use_hatched.cpp)
target_link_libraries(use_hatched ${USED_LIBS})
56 changes: 56 additions & 0 deletions examples/basic/use_hatched.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#include "yap/pipeline.h"
#include "yap/runtime_utilities.h"
#include "yap/topology.h"

#include <chrono>
#include <iostream>
#include <optional>

using namespace std::chrono_literals;

int main()
{
std::vector<int> input{0, 1, 2, 3, 4, 5, 6, 7, 8, 9};

// Accepts Hatchable values, which means each input can produce multiple
// outputs. Such a production happens in a "piecewise" manner meaning that
// the stage does not return a collection of values; instead every piece of
// output is immediately pushed to the next stage and the stage is invoked
// again to produce the rest of the output.
auto hatchingTransform = [curVal = 0,
curChar = 'a'](yap::Hatchable<int> val) mutable {
std::optional<char> ret;

if (val)
{
// New Input from previous stage.
curChar = 'a' + *val.data;
curVal = *val.data;
ret.emplace(curChar);
}
else if (curVal-- > 0)
{
// Keep processing the last input from previous stage.
ret.emplace(curChar);
}

return ret; // Returning a contextually "false" object, here empty
// optional, means the input won't be hatched any more and
// the stage can process new values produced from the
// previous stage.
};

auto sink = [](std::optional<char> val) {
std::cout << "Output: " << val.value() << std::endl;
};

auto pl = yap::Pipeline{} |
yap::OutputHatchable(yap::Consume(input.begin(), input.end())) |
hatchingTransform | sink;

std::cout << "Processing\n";
pl.consume();
std::cout << "Finished\n";

return 0;
}
27 changes: 27 additions & 0 deletions include/yap/compile_time_utilities.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// © 2022 Nikolaos Athanasiou, github.com/picanumber
#pragma once

#include <type_traits>

namespace yap
{

namespace detail
{

template <class T, template <class...> class Template>
struct is_instantiation : std::false_type
{
};

template <template <class...> class Template, class... Args>
struct is_instantiation<Template<Args...>, Template> : std::true_type
{
};

} // namespace detail

template <class T, template <class...> class G>
concept instantiation_of = detail::is_instantiation<T, G>::value;

} // namespace yap
2 changes: 1 addition & 1 deletion include/yap/pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
#pragma once

#include "buffer_queue.h"
#include "pipeline_types_utilities.h"
#include "stage.h"
#include "type_utilities.h"

#include <memory>
#include <mutex>
Expand Down
File renamed without changes.
47 changes: 41 additions & 6 deletions include/yap/stage.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#pragma once

#include "buffer_queue.h"
#include "compile_time_utilities.h"
#include "runtime_utilities.h"
#include "topology.h"

Expand All @@ -22,9 +23,9 @@ namespace detail

// Process a transformation stage. Returns whether to keep processing.
template <class IN, class OUT>
std::enable_if_t<not detail::kIsFiltered<OUT>, bool> process(
Callable<IN, OUT> &op, std::shared_ptr<BufferQueue<std::future<IN>>> &input,
std::shared_ptr<BufferQueue<std::future<OUT>>> &output)
bool process(Callable<IN, OUT> &op,
std::shared_ptr<BufferQueue<std::future<IN>>> &input,
std::shared_ptr<BufferQueue<std::future<OUT>>> &output)
{
try
{
Expand All @@ -50,9 +51,10 @@ std::enable_if_t<not detail::kIsFiltered<OUT>, bool> process(

// Process a filtering transformation stage. Returns whether to keep processing.
template <class IN, class OUT>
std::enable_if_t<detail::kIsFiltered<OUT>, bool> process(
Callable<IN, OUT> &op, std::shared_ptr<BufferQueue<std::future<IN>>> &input,
std::shared_ptr<BufferQueue<std::future<OUT>>> &output)
requires(instantiation_of<OUT, Filtered>) bool
process(Callable<IN, OUT> &op,
std::shared_ptr<BufferQueue<std::future<IN>>> &input,
std::shared_ptr<BufferQueue<std::future<OUT>>> &output)
{
try
{
Expand All @@ -78,6 +80,39 @@ std::enable_if_t<detail::kIsFiltered<OUT>, bool> process(
}
}

// Process a hatching transformation stage. Returns whether to keep processing.
template <class IN, class OUT>
requires(instantiation_of<IN, Hatchable>) bool
process(Callable<IN, OUT> &op,
std::shared_ptr<BufferQueue<std::future<IN>>> &input,
std::shared_ptr<BufferQueue<std::future<OUT>>> &output)
{
try
{
auto result = op(input->pop().get());
while (result)
{
output->push(make_ready_future<OUT>(std::move(result)));
result = op(IN{});
}
return true;
}
catch (detail::ClosedError &e)
{
return false;
}
catch (GeneratorExit &e)
{
output->push(make_exceptional_future<OUT>(e));
return false;
}
catch (...)
{
// Op threw an exception. No point in propagating the data.
return true;
}
}

// Process a generator stage.
template <class OUT>
bool process(Callable<void, OUT> &op,
Expand Down
Loading

0 comments on commit f21aa00

Please sign in to comment.