Skip to content

Commit

Permalink
Merge pull request #209 from kirkshoop/operatorfixes
Browse files Browse the repository at this point in the history
make operator | work better
  • Loading branch information
Kirk Shoop committed Mar 11, 2016
2 parents 18605f6 + 33ccae2 commit 998e06d
Show file tree
Hide file tree
Showing 11 changed files with 229 additions and 90 deletions.
60 changes: 31 additions & 29 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,23 @@ The Reactive Extensions for Native (__RxCpp__) is a library for composing asynch
Windows: [![Windows Status](http://img.shields.io/appveyor/ci/kirkshoop/RxCpp-446.svg?style=flat-square)](https://ci.appveyor.com/project/kirkshoop/rxcpp-446)
Linux & OSX: [![Linux & Osx Status](http://img.shields.io/travis/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://travis-ci.org/Reactive-Extensions/RxCpp)

[![GitHub license](https://img.shields.io/github/license/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://github.com/Reactive-Extensions/RxCpp)

[![Join in on gitter.im](https://img.shields.io/gitter/room/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://gitter.im/Reactive-Extensions/RxCpp?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
[![doxygen documentation](https://img.shields.io/badge/documentation-latest-brightgreen.svg?style=flat-square)](http://reactive-extensions.github.io/RxCpp)

Github: [![GitHub release](https://img.shields.io/github/release/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://github.com/Reactive-Extensions/RxCpp/releases)
[![GitHub commits](https://img.shields.io/github/commits-since/Reactive-Extensions/RxCpp/v2.2.0.svg?style=flat-square)](https://github.com/Reactive-Extensions/RxCpp)

NuGet: [![NuGet version](http://img.shields.io/nuget/v/RxCpp.svg?style=flat-square)](http://www.nuget.org/packages/RxCpp/)
[![NuGet downloads](http://img.shields.io/nuget/dt/RxCpp.svg?style=flat-square)](http://www.nuget.org/packages/RxCpp/)

[![GitHub license](https://img.shields.io/github/license/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://github.com/Reactive-Extensions/RxCpp)

[![Join in on gitter.im](https://img.shields.io/gitter/room/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://gitter.im/Reactive-Extensions/RxCpp?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
[![doxygen documentation](https://img.shields.io/badge/documentation-latest-brightgreen.svg?style=flat-square)](http://reactive-extensions.github.io/RxCpp)

#Example
Add ```Rx/v2/src``` to the include paths

[![lines from bytes](https://img.shields.io/badge/blog%20post-lines%20from%20bytes-blue.svg?style=flat-square)](http://kirkshoop.github.io/async/rxcpp/c++/2015/07/07/rxcpp_-_parsing_bytes_to_lines_of_text.html)

```cpp

#include "rxcpp/rx.hpp"
using namespace rxcpp;
using namespace rxcpp::sources;
Expand All @@ -31,41 +30,42 @@ using namespace rxcpp::util;
#include <random>
using namespace std;

//using rxcpp::operators::sum;

int main()
{
random_device rd; // non-deterministic generator
mt19937 gen(rd());
uniform_int_distribution<> dist(4, 18);

// produce byte stream that contains lines of text
// for testing purposes, produce byte stream that from lines of text
auto bytes = range(1, 10) |
flat_map([&](int i){
return from(
from((uint8_t)('A' + i)) |
repeat(dist(gen)),
from((uint8_t)'\r')) |
concat();
flat_map([&](int i){
auto body = from((uint8_t)('A' + i)) |
repeat(dist(gen)) |
as_dynamic();
auto delim = from((uint8_t)'\r');
return from(body, delim) | concat();
}) |
window(17) |
flat_map([](observable<uint8_t> w){
flat_map([](observable<uint8_t> w){
return w |
reduce(
vector<uint8_t>(),
vector<uint8_t>(),
[](vector<uint8_t>& v, uint8_t b){
v.push_back(b);
v.push_back(b);
return move(v);
},
[](vector<uint8_t>& v){return move(v);}) |
as_dynamic();
}) |
as_dynamic();
}) |
filter([](vector<uint8_t>& v){
tap([](vector<uint8_t>& v){
// print input packet of bytes
copy(v.begin(), v.end(), ostream_iterator<long>(cout, " "));
cout << endl;
return true;
cout << endl;
});

//
// recover lines of text from byte stream
//

// create strings split on \r
auto strings = bytes |
concat_map([](vector<uint8_t> v){
Expand All @@ -75,6 +75,9 @@ int main()
sregex_token_iterator end;
vector<string> splits(cursor, end);
return iterate(move(splits));
}) |
filter([](string& s){
return !s.empty();
});

// group strings by line
Expand All @@ -83,18 +86,17 @@ int main()
group_by(
[=](string& s) mutable {
return s.back() == '\r' ? group++ : group;
},
[](string& s) { return move(s);});
});

// reduce the strings for a line into one string
auto lines = linewindows |
flat_map([](grouped_observable<int, string> w){
return w | sum();
flat_map([](grouped_observable<int, string> w){
return w | sum();
});

// print result
lines |
subscribe(println(cout));
subscribe<string>(println(cout));

return 0;
}
Expand Down
78 changes: 41 additions & 37 deletions Rx/v2/examples/linesfrombytes/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "rxcpp/rx.hpp"
using namespace rxcpp;
using namespace rxcpp::sources;
using namespace rxcpp::operators;
using namespace rxcpp::util;

#include <regex>
Expand All @@ -14,64 +15,67 @@ int main()
mt19937 gen(rd());
uniform_int_distribution<> dist(4, 18);

// produce byte stream that contains lines of text
auto bytes = range(1, 10).
map([&](int i){
return from((uint8_t)('A' + i)).
repeat(dist(gen)).
concat(from((uint8_t)'\r'));
}).
merge().
window(17).
map([](observable<uint8_t> w){
return w.
// for testing purposes, produce byte stream that from lines of text
auto bytes = range(1, 10) |
flat_map([&](int i){
auto body = from((uint8_t)('A' + i)) |
repeat(dist(gen)) |
as_dynamic();
auto delim = from((uint8_t)'\r');
return from(body, delim) | concat();
}) |
window(17) |
flat_map([](observable<uint8_t> w){
return w |
reduce(
vector<uint8_t>(),
vector<uint8_t>(),
[](vector<uint8_t>& v, uint8_t b){
v.push_back(b);
v.push_back(b);
return move(v);
},
[](vector<uint8_t>& v){return move(v);}).
as_dynamic();
}).
merge().
filter([](vector<uint8_t>& v){
}) |
as_dynamic();
}) |
tap([](vector<uint8_t>& v){
// print input packet of bytes
copy(v.begin(), v.end(), ostream_iterator<long>(cout, " "));
cout << endl;
return true;
cout << endl;
});

//
// recover lines of text from byte stream
//

// create strings split on \r
auto strings = bytes.
map([](vector<uint8_t> v){
auto strings = bytes |
concat_map([](vector<uint8_t> v){
string s(v.begin(), v.end());
regex delim(R"/(\r)/");
sregex_token_iterator cursor(s.begin(), s.end(), delim, {-1, 0});
sregex_token_iterator end;
cregex_token_iterator cursor(&s[0], &s[0] + s.size(), delim, {-1, 0});
cregex_token_iterator end;
vector<string> splits(cursor, end);
return iterate(move(splits));
}).
concat();
}) |
filter([](string& s){
return !s.empty();
});

// group strings by line
int group = 0;
auto linewindows = strings.
auto linewindows = strings |
group_by(
[=](string& s) mutable {
return s.back() == '\r' ? group++ : group;
},
[](string& s) { return move(s);});
});

// reduce the strings for a line into one string
auto lines = linewindows.
map([](grouped_observable<int, string> w){
return w.sum();
}).
merge();
auto lines = linewindows |
flat_map([](grouped_observable<int, string> w) {
return w | sum();
});

// print result
lines.
subscribe(println(cout));
lines |
subscribe<string>(println(cout));

return 0;
}
21 changes: 20 additions & 1 deletion Rx/v2/src/rxcpp/operators/rx-concat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,31 @@ class concat_factory

}

template<class Coordination>
inline auto concat()
-> detail::concat_factory<identity_one_worker> {
return detail::concat_factory<identity_one_worker>(identity_current_thread());
}

template<class Coordination, class Check = typename std::enable_if<is_coordination<Coordination>::value>::type>
auto concat(Coordination&& sf)
-> detail::concat_factory<Coordination> {
return detail::concat_factory<Coordination>(std::forward<Coordination>(sf));
}

template<class O0, class... ON, class Check = typename std::enable_if<is_observable<O0>::value>::type>
auto concat(O0&& o0, ON&&... on)
-> detail::concat_factory<identity_one_worker> {
return detail::concat_factory<identity_one_worker>(identity_current_thread())(from(std::forward<O0>(o0), std::forward<ON>(on)...));
}

template<class Coordination, class O0, class... ON,
class CheckC = typename std::enable_if<is_coordination<Coordination>::value>::type,
class CheckO = typename std::enable_if<is_observable<O0>::value>::type>
auto concat(Coordination&& sf, O0&& o0, ON&&... on)
-> detail::concat_factory<Coordination> {
return detail::concat_factory<Coordination>(std::forward<Coordination>(sf))(from(std::forward<O0>(o0), std::forward<ON>(on)...));
}

}

}
Expand Down
15 changes: 14 additions & 1 deletion Rx/v2/src/rxcpp/operators/rx-concat_map.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ struct concat_traits {

static_assert(!std::is_same<decltype(result_check<source_value_type, collection_value_type, result_selector_type>(0)), tag_not_valid>::value, "concat_map ResultSelector must be a function with the signature concat_map::value_type(concat_map::source_value_type, concat_map::collection_value_type)");

typedef decltype((*(result_selector_type*)nullptr)(*(source_value_type*)nullptr, *(collection_value_type*)nullptr)) value_type;
typedef rxu::decay_t<decltype((*(result_selector_type*)nullptr)(*(source_value_type*)nullptr, *(collection_value_type*)nullptr))> value_type;
};

template<class Observable, class CollectionSelector, class ResultSelector, class Coordination>
Expand Down Expand Up @@ -267,6 +267,19 @@ auto concat_map(CollectionSelector&& s, ResultSelector&& rs, Coordination&& sf)
return detail::concat_map_factory<CollectionSelector, ResultSelector, Coordination>(std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(sf));
}

template<class CollectionSelector, class Coordination, class CheckC = typename std::enable_if<is_coordination<Coordination>::value>::type>
auto concat_map(CollectionSelector&& s, Coordination&& sf)
-> detail::concat_map_factory<CollectionSelector, rxu::detail::take_at<1>, Coordination> {
return detail::concat_map_factory<CollectionSelector, rxu::detail::take_at<1>, Coordination>(std::forward<CollectionSelector>(s), rxu::take_at<1>(), std::forward<Coordination>(sf));
}

template<class CollectionSelector>
auto concat_map(CollectionSelector&& s)
-> detail::concat_map_factory<CollectionSelector, rxu::detail::take_at<1>, identity_one_worker> {
return detail::concat_map_factory<CollectionSelector, rxu::detail::take_at<1>, identity_one_worker>(std::forward<CollectionSelector>(s), rxu::take_at<1>(), identity_current_thread());
}


}

}
Expand Down
18 changes: 15 additions & 3 deletions Rx/v2/src/rxcpp/operators/rx-flat_map.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ struct flat_map_traits {

static_assert(!std::is_same<decltype(collection_check<source_value_type, collection_selector_type>(0)), tag_not_valid>::value, "flat_map CollectionSelector must be a function with the signature observable(flat_map::source_value_type)");

typedef decltype((*(collection_selector_type*)nullptr)((*(source_value_type*)nullptr))) collection_type;
typedef rxu::decay_t<decltype((*(collection_selector_type*)nullptr)((*(source_value_type*)nullptr)))> collection_type;

static_assert(is_observable<collection_type>::value, "flat_map CollectionSelector must return an observable");

Expand All @@ -43,7 +43,7 @@ struct flat_map_traits {

static_assert(!std::is_same<decltype(result_check<source_value_type, collection_value_type, result_selector_type>(0)), tag_not_valid>::value, "flat_map ResultSelector must be a function with the signature flat_map::value_type(flat_map::source_value_type, flat_map::collection_value_type)");

typedef decltype((*(result_selector_type*)nullptr)(*(source_value_type*)nullptr, *(collection_value_type*)nullptr)) value_type;
typedef rxu::decay_t<decltype((*(result_selector_type*)nullptr)(*(source_value_type*)nullptr, *(collection_value_type*)nullptr))> value_type;
};

template<class Observable, class CollectionSelector, class ResultSelector, class Coordination>
Expand Down Expand Up @@ -234,8 +234,20 @@ auto flat_map(CollectionSelector&& s, ResultSelector&& rs, Coordination&& sf)
return detail::flat_map_factory<CollectionSelector, ResultSelector, Coordination>(std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(sf));
}

template<class CollectionSelector, class Coordination, class CheckC = typename std::enable_if<is_coordination<Coordination>::value>::type>
auto flat_map(CollectionSelector&& s, Coordination&& sf)
-> detail::flat_map_factory<CollectionSelector, rxu::detail::take_at<1>, Coordination> {
return detail::flat_map_factory<CollectionSelector, rxu::detail::take_at<1>, Coordination>(std::forward<CollectionSelector>(s), rxu::take_at<1>(), std::forward<Coordination>(sf));
}

template<class CollectionSelector>
auto flat_map(CollectionSelector&& s)
-> detail::flat_map_factory<CollectionSelector, rxu::detail::take_at<1>, identity_one_worker> {
return detail::flat_map_factory<CollectionSelector, rxu::detail::take_at<1>, identity_one_worker>(std::forward<CollectionSelector>(s), rxu::take_at<1>(), identity_current_thread());
}

#endif
}

}

#endif
14 changes: 13 additions & 1 deletion Rx/v2/src/rxcpp/operators/rx-group_by.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ class group_by_factory
template<class Observable>
struct group_by_factory_traits
{
typedef rxu::value_type_t<Observable> value_type;
typedef rxu::value_type_t<rxu::decay_t<Observable>> value_type;
typedef detail::group_by_traits<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate> traits_type;
typedef detail::group_by<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate> group_by_type;
};
Expand All @@ -209,6 +209,18 @@ inline auto group_by(KeySelector ks, MarbleSelector ms, BinaryPredicate p)
return detail::group_by_factory<KeySelector, MarbleSelector, BinaryPredicate>(std::move(ks), std::move(ms), std::move(p));
}

template<class KeySelector, class MarbleSelector>
inline auto group_by(KeySelector ks, MarbleSelector ms)
-> detail::group_by_factory<KeySelector, MarbleSelector, rxu::less> {
return detail::group_by_factory<KeySelector, MarbleSelector, rxu::less>(std::move(ks), std::move(ms), rxu::less());
}

template<class KeySelector>
inline auto group_by(KeySelector ks)
-> detail::group_by_factory<KeySelector, rxu::detail::take_at<0>, rxu::less> {
return detail::group_by_factory<KeySelector, rxu::detail::take_at<0>, rxu::less>(std::move(ks), rxu::take_at<0>(), rxu::less());
}


}

Expand Down
5 changes: 5 additions & 0 deletions Rx/v2/src/rxcpp/operators/rx-merge.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ class merge_factory

}

inline auto merge()
-> detail::merge_factory<identity_one_worker> {
return detail::merge_factory<identity_one_worker>(identity_current_thread());
}

template<class Coordination>
auto merge(Coordination&& sf)
-> detail::merge_factory<Coordination> {
Expand Down
Loading

0 comments on commit 998e06d

Please sign in to comment.