| // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. |
| |
| #pragma once |
| |
| #if !defined(RXCPP_RX_SUBSCRIBER_HPP) |
| #define RXCPP_RX_SUBSCRIBER_HPP |
| |
| #include "rx-includes.hpp" |
| |
| namespace rxcpp { |
| |
| template<class T> |
| struct subscriber_base : public observer_base<T>, public subscription_base |
| { |
| typedef tag_subscriber subscriber_tag; |
| }; |
| |
| /*! |
| \brief binds an observer that consumes values with a composite_subscription that controls lifetime. |
| |
| \ingroup group-core |
| |
| */ |
| template<class T, class Observer = observer<T>> |
| class subscriber : public subscriber_base<T> |
| { |
| static_assert(!is_subscriber<Observer>::value, "not allowed to nest subscribers"); |
| static_assert(is_observer<Observer>::value, "subscriber must contain an observer<T, ...>"); |
| typedef subscriber<T, Observer> this_type; |
| typedef rxu::decay_t<Observer> observer_type; |
| |
| composite_subscription lifetime; |
| observer_type destination; |
| trace_id id; |
| |
| struct nextdetacher |
| { |
| ~nextdetacher() |
| { |
| trace_activity().on_next_return(*that); |
| if (do_unsubscribe) { |
| that->unsubscribe(); |
| } |
| } |
| nextdetacher(const this_type* that) |
| : that(that) |
| , do_unsubscribe(true) |
| { |
| } |
| template<class U> |
| void operator()(U u) { |
| trace_activity().on_next_enter(*that, u); |
| RXCPP_TRY { |
| that->destination.on_next(std::move(u)); |
| do_unsubscribe = false; |
| } RXCPP_CATCH(...) { |
| auto ex = rxu::current_exception(); |
| trace_activity().on_error_enter(*that, ex); |
| that->destination.on_error(std::move(ex)); |
| trace_activity().on_error_return(*that); |
| } |
| } |
| const this_type* that; |
| volatile bool do_unsubscribe; |
| }; |
| |
| struct errordetacher |
| { |
| ~errordetacher() |
| { |
| trace_activity().on_error_return(*that); |
| that->unsubscribe(); |
| } |
| errordetacher(const this_type* that) |
| : that(that) |
| { |
| } |
| inline void operator()(rxu::error_ptr ex) { |
| trace_activity().on_error_enter(*that, ex); |
| that->destination.on_error(std::move(ex)); |
| } |
| const this_type* that; |
| }; |
| |
| struct completeddetacher |
| { |
| ~completeddetacher() |
| { |
| trace_activity().on_completed_return(*that); |
| that->unsubscribe(); |
| } |
| completeddetacher(const this_type* that) |
| : that(that) |
| { |
| } |
| inline void operator()() { |
| trace_activity().on_completed_enter(*that); |
| that->destination.on_completed(); |
| } |
| const this_type* that; |
| }; |
| |
| subscriber(); |
| public: |
| typedef typename composite_subscription::weak_subscription weak_subscription; |
| |
| subscriber(const this_type& o) |
| : lifetime(o.lifetime) |
| , destination(o.destination) |
| , id(o.id) |
| { |
| } |
| subscriber(this_type&& o) |
| : lifetime(std::move(o.lifetime)) |
| , destination(std::move(o.destination)) |
| , id(std::move(o.id)) |
| { |
| } |
| |
| template<class U, class O> |
| friend class subscriber; |
| |
| template<class O> |
| subscriber( |
| const subscriber<T, O>& o, |
| typename std::enable_if< |
| !std::is_same<O, observer<T>>::value && |
| std::is_same<Observer, observer<T>>::value, void**>::type = nullptr) |
| : lifetime(o.lifetime) |
| , destination(o.destination.as_dynamic()) |
| , id(o.id) |
| { |
| } |
| |
| template<class U> |
| subscriber(trace_id id, composite_subscription cs, U&& o) |
| : lifetime(std::move(cs)) |
| , destination(std::forward<U>(o)) |
| , id(std::move(id)) |
| { |
| static_assert(!is_subscriber<U>::value, "cannot nest subscribers"); |
| static_assert(is_observer<U>::value, "must pass observer to subscriber"); |
| trace_activity().create_subscriber(*this); |
| } |
| |
| this_type& operator=(this_type o) { |
| lifetime = std::move(o.lifetime); |
| destination = std::move(o.destination); |
| id = std::move(o.id); |
| return *this; |
| } |
| |
| const observer_type& get_observer() const { |
| return destination; |
| } |
| observer_type& get_observer() { |
| return destination; |
| } |
| const composite_subscription& get_subscription() const { |
| return lifetime; |
| } |
| composite_subscription& get_subscription() { |
| return lifetime; |
| } |
| trace_id get_id() const { |
| return id; |
| } |
| |
| subscriber<T> as_dynamic() const { |
| return subscriber<T>(id, lifetime, destination.as_dynamic()); |
| } |
| |
| // observer |
| // |
| template<class V> |
| void on_next(V&& v) const { |
| if (!is_subscribed()) { |
| return; |
| } |
| nextdetacher protect(this); |
| protect(std::forward<V>(v)); |
| } |
| void on_error(rxu::error_ptr e) const { |
| if (!is_subscribed()) { |
| return; |
| } |
| errordetacher protect(this); |
| protect(std::move(e)); |
| } |
| void on_completed() const { |
| if (!is_subscribed()) { |
| return; |
| } |
| completeddetacher protect(this); |
| protect(); |
| } |
| |
| // composite_subscription |
| // |
| bool is_subscribed() const { |
| return lifetime.is_subscribed(); |
| } |
| weak_subscription add(subscription s) const { |
| return lifetime.add(std::move(s)); |
| } |
| template<class F> |
| auto add(F f) const |
| -> typename std::enable_if<detail::is_unsubscribe_function<F>::value, weak_subscription>::type { |
| return lifetime.add(make_subscription(std::move(f))); |
| } |
| void remove(weak_subscription w) const { |
| return lifetime.remove(std::move(w)); |
| } |
| void clear() const { |
| return lifetime.clear(); |
| } |
| void unsubscribe() const { |
| return lifetime.unsubscribe(); |
| } |
| |
| }; |
| |
| template<class T, class Observer> |
| auto make_subscriber( |
| subscriber<T, Observer> o) |
| -> subscriber<T, Observer> { |
| return subscriber<T, Observer>(std::move(o)); |
| } |
| |
| // observer |
| // |
| |
| template<class T> |
| auto make_subscriber() |
| -> typename std::enable_if< |
| detail::is_on_next_of<T, detail::OnNextEmpty<T>>::value, |
| subscriber<T, observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>>>::type { |
| return subscriber<T, observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>>(trace_id::make_next_id_subscriber(), composite_subscription(), |
| observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>(detail::OnNextEmpty<T>())); |
| } |
| |
| template<class T, class I> |
| auto make_subscriber( |
| const observer<T, I>& o) |
| -> subscriber<T, observer<T, I>> { |
| return subscriber<T, observer<T, I>>(trace_id::make_next_id_subscriber(), composite_subscription(), o); |
| } |
| template<class T, class Observer> |
| auto make_subscriber(const Observer& o) |
| -> typename std::enable_if< |
| is_observer<Observer>::value && |
| !is_subscriber<Observer>::value, |
| subscriber<T, Observer>>::type { |
| return subscriber<T, Observer>(trace_id::make_next_id_subscriber(), composite_subscription(), o); |
| } |
| template<class T, class Observer> |
| auto make_subscriber(const Observer& o) |
| -> typename std::enable_if< |
| !detail::is_on_next_of<T, Observer>::value && |
| !is_subscriber<Observer>::value && |
| !is_subscription<Observer>::value && |
| !is_observer<Observer>::value, |
| subscriber<T, observer<T, Observer>>>::type { |
| return subscriber<T, observer<T, Observer>>(trace_id::make_next_id_subscriber(), composite_subscription(), o); |
| } |
| template<class T, class OnNext> |
| auto make_subscriber(const OnNext& on) |
| -> typename std::enable_if< |
| detail::is_on_next_of<T, OnNext>::value, |
| subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>>::type { |
| return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>(trace_id::make_next_id_subscriber(), composite_subscription(), |
| observer<T, detail::stateless_observer_tag, OnNext>(on)); |
| } |
| template<class T, class OnNext, class OnError> |
| auto make_subscriber(const OnNext& on, const OnError& oe) |
| -> typename std::enable_if< |
| detail::is_on_next_of<T, OnNext>::value && |
| detail::is_on_error<OnError>::value, |
| subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>>::type { |
| return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>(trace_id::make_next_id_subscriber(), composite_subscription(), |
| observer<T, detail::stateless_observer_tag, OnNext, OnError>(on, oe)); |
| } |
| template<class T, class OnNext, class OnCompleted> |
| auto make_subscriber(const OnNext& on, const OnCompleted& oc) |
| -> typename std::enable_if< |
| detail::is_on_next_of<T, OnNext>::value && |
| detail::is_on_completed<OnCompleted>::value, |
| subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type { |
| return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>(trace_id::make_next_id_subscriber(), composite_subscription(), |
| observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc)); |
| } |
| template<class T, class OnNext, class OnError, class OnCompleted> |
| auto make_subscriber(const OnNext& on, const OnError& oe, const OnCompleted& oc) |
| -> typename std::enable_if< |
| detail::is_on_next_of<T, OnNext>::value && |
| detail::is_on_error<OnError>::value && |
| detail::is_on_completed<OnCompleted>::value, |
| subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>>::type { |
| return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>(trace_id::make_next_id_subscriber(), composite_subscription(), |
| observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(on, oe, oc)); |
| } |
| |
| // explicit lifetime |
| // |
| |
| template<class T> |
| auto make_subscriber(const composite_subscription& cs) |
| -> subscriber<T, observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>> { |
| return subscriber<T, observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>>(trace_id::make_next_id_subscriber(), cs, |
| observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>(detail::OnNextEmpty<T>())); |
| } |
| |
| template<class T, class I> |
| auto make_subscriber(const composite_subscription& cs, |
| const observer<T, I>& o) |
| -> subscriber<T, observer<T, I>> { |
| return subscriber<T, observer<T, I>>(trace_id::make_next_id_subscriber(), cs, o); |
| } |
| template<class T, class I> |
| auto make_subscriber(const composite_subscription& cs, |
| const subscriber<T, I>& s) |
| -> subscriber<T, I> { |
| return subscriber<T, I>(trace_id::make_next_id_subscriber(), cs, s.get_observer()); |
| } |
| template<class T, class Observer> |
| auto make_subscriber(const composite_subscription& cs, const Observer& o) |
| -> typename std::enable_if< |
| !is_subscriber<Observer>::value && |
| is_observer<Observer>::value, |
| subscriber<T, Observer>>::type { |
| return subscriber<T, Observer>(trace_id::make_next_id_subscriber(), cs, o); |
| } |
| template<class T, class Observer> |
| auto make_subscriber(const composite_subscription& cs, const Observer& o) |
| -> typename std::enable_if< |
| !detail::is_on_next_of<T, Observer>::value && |
| !is_subscriber<Observer>::value && |
| !is_subscription<Observer>::value && |
| !is_observer<Observer>::value, |
| subscriber<T, observer<T, Observer>>>::type { |
| return subscriber<T, observer<T, Observer>>(trace_id::make_next_id_subscriber(), cs, make_observer<T>(o)); |
| } |
| template<class T, class OnNext> |
| auto make_subscriber(const composite_subscription& cs, const OnNext& on) |
| -> typename std::enable_if< |
| detail::is_on_next_of<T, OnNext>::value, |
| subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>>::type { |
| return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>(trace_id::make_next_id_subscriber(), cs, |
| observer<T, detail::stateless_observer_tag, OnNext>(on)); |
| } |
| template<class T, class OnNext, class OnError> |
| auto make_subscriber(const composite_subscription& cs, const OnNext& on, const OnError& oe) |
| -> typename std::enable_if< |
| detail::is_on_next_of<T, OnNext>::value && |
| detail::is_on_error<OnError>::value, |
| subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>>::type { |
| return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>(trace_id::make_next_id_subscriber(), cs, |
| observer<T, detail::stateless_observer_tag, OnNext, OnError>(on, oe)); |
| } |
| template<class T, class OnNext, class OnCompleted> |
| auto make_subscriber(const composite_subscription& cs, const OnNext& on, const OnCompleted& oc) |
| -> typename std::enable_if< |
| detail::is_on_next_of<T, OnNext>::value && |
| detail::is_on_completed<OnCompleted>::value, |
| subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type { |
| return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>(trace_id::make_next_id_subscriber(), cs, |
| observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc)); |
| } |
| template<class T, class OnNext, class OnError, class OnCompleted> |
| auto make_subscriber(const composite_subscription& cs, const OnNext& on, const OnError& oe, const OnCompleted& oc) |
| -> typename std::enable_if< |
| detail::is_on_next_of<T, OnNext>::value && |
| detail::is_on_error<OnError>::value && |
| detail::is_on_completed<OnCompleted>::value, |
| subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>>::type { |
| return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>(trace_id::make_next_id_subscriber(), cs, |
| observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(on, oe, oc)); |
| } |
| |
| // explicit id |
| // |
| |
| template<class T> |
| auto make_subscriber(trace_id id) |
| -> subscriber<T, observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>> { |
| return subscriber<T, observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>>(std::move(id), composite_subscription(), |
| observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>(detail::OnNextEmpty<T>())); |
| } |
| |
| template<class T> |
| auto make_subscriber(trace_id id, const composite_subscription& cs) |
| -> subscriber<T, observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>> { |
| return subscriber<T, observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>>(std::move(id), cs, |
| observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>(detail::OnNextEmpty<T>())); |
| } |
| |
| template<class T, class I> |
| auto make_subscriber(trace_id id, |
| const observer<T, I>& o) |
| -> subscriber<T, observer<T, I>> { |
| return subscriber<T, observer<T, I>>(std::move(id), composite_subscription(), o); |
| } |
| template<class T, class I> |
| auto make_subscriber(trace_id id, const composite_subscription& cs, |
| const observer<T, I>& o) |
| -> subscriber<T, observer<T, I>> { |
| return subscriber<T, observer<T, I>>(std::move(id), cs, o); |
| } |
| template<class T, class Observer> |
| auto make_subscriber(trace_id id, const Observer& o) |
| -> typename std::enable_if< |
| is_observer<Observer>::value, |
| subscriber<T, Observer>>::type { |
| return subscriber<T, Observer>(std::move(id), composite_subscription(), o); |
| } |
| template<class T, class Observer> |
| auto make_subscriber(trace_id id, const composite_subscription& cs, const Observer& o) |
| -> typename std::enable_if< |
| is_observer<Observer>::value, |
| subscriber<T, Observer>>::type { |
| return subscriber<T, Observer>(std::move(id), cs, o); |
| } |
| template<class T, class Observer> |
| auto make_subscriber(trace_id id, const Observer& o) |
| -> typename std::enable_if< |
| !detail::is_on_next_of<T, Observer>::value && |
| !is_subscriber<Observer>::value && |
| !is_subscription<Observer>::value && |
| !is_observer<Observer>::value, |
| subscriber<T, observer<T, Observer>>>::type { |
| return subscriber<T, observer<T, Observer>>(std::move(id), composite_subscription(), o); |
| } |
| template<class T, class Observer> |
| auto make_subscriber(trace_id id, const composite_subscription& cs, const Observer& o) |
| -> typename std::enable_if< |
| !detail::is_on_next_of<T, Observer>::value && |
| !is_subscriber<Observer>::value && |
| !is_subscription<Observer>::value && |
| !is_observer<Observer>::value, |
| subscriber<T, observer<T, Observer>>>::type { |
| return subscriber<T, observer<T, Observer>>(std::move(id), cs, o); |
| } |
| template<class T, class OnNext> |
| auto make_subscriber(trace_id id, const OnNext& on) |
| -> typename std::enable_if< |
| detail::is_on_next_of<T, OnNext>::value, |
| subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>>::type { |
| return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>(std::move(id), composite_subscription(), |
| observer<T, detail::stateless_observer_tag, OnNext>(on)); |
| } |
| template<class T, class OnNext> |
| auto make_subscriber(trace_id id, const composite_subscription& cs, const OnNext& on) |
| -> typename std::enable_if< |
| detail::is_on_next_of<T, OnNext>::value, |
| subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>>::type { |
| return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>(std::move(id), cs, |
| observer<T, detail::stateless_observer_tag, OnNext>(on)); |
| } |
| template<class T, class OnNext, class OnError> |
| auto make_subscriber(trace_id id, const OnNext& on, const OnError& oe) |
| -> typename std::enable_if< |
| detail::is_on_next_of<T, OnNext>::value && |
| detail::is_on_error<OnError>::value, |
| subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>>::type { |
| return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>(std::move(id), composite_subscription(), |
| observer<T, detail::stateless_observer_tag, OnNext, OnError>(on, oe)); |
| } |
| template<class T, class OnNext, class OnError> |
| auto make_subscriber(trace_id id, const composite_subscription& cs, const OnNext& on, const OnError& oe) |
| -> typename std::enable_if< |
| detail::is_on_next_of<T, OnNext>::value && |
| detail::is_on_error<OnError>::value, |
| subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>>::type { |
| return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>(std::move(id), cs, |
| observer<T, detail::stateless_observer_tag, OnNext, OnError>(on, oe)); |
| } |
| template<class T, class OnNext, class OnCompleted> |
| auto make_subscriber(trace_id id, const OnNext& on, const OnCompleted& oc) |
| -> typename std::enable_if< |
| detail::is_on_next_of<T, OnNext>::value && |
| detail::is_on_completed<OnCompleted>::value, |
| subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type { |
| return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>(std::move(id), composite_subscription(), |
| observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc)); |
| } |
| template<class T, class OnNext, class OnCompleted> |
| auto make_subscriber(trace_id id, const composite_subscription& cs, const OnNext& on, const OnCompleted& oc) |
| -> typename std::enable_if< |
| detail::is_on_next_of<T, OnNext>::value && |
| detail::is_on_completed<OnCompleted>::value, |
| subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type { |
| return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>(std::move(id), cs, |
| observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc)); |
| } |
| template<class T, class OnNext, class OnError, class OnCompleted> |
| auto make_subscriber(trace_id id, const OnNext& on, const OnError& oe, const OnCompleted& oc) |
| -> typename std::enable_if< |
| detail::is_on_next_of<T, OnNext>::value && |
| detail::is_on_error<OnError>::value && |
| detail::is_on_completed<OnCompleted>::value, |
| subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>>::type { |
| return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>(std::move(id), composite_subscription(), |
| observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(on, oe, oc)); |
| } |
| template<class T, class OnNext, class OnError, class OnCompleted> |
| auto make_subscriber(trace_id id, const composite_subscription& cs, const OnNext& on, const OnError& oe, const OnCompleted& oc) |
| -> typename std::enable_if< |
| detail::is_on_next_of<T, OnNext>::value && |
| detail::is_on_error<OnError>::value && |
| detail::is_on_completed<OnCompleted>::value, |
| subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>>::type { |
| return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>(std::move(id), cs, |
| observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(on, oe, oc)); |
| } |
| |
| // chain defaults from subscriber |
| // |
| |
| template<class T, class OtherT, class OtherObserver, class I> |
| auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, |
| const observer<T, I>& o) |
| -> subscriber<T, observer<T, I>> { |
| auto r = subscriber<T, observer<T, I>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), o); |
| trace_activity().connect(r, scbr); |
| return r; |
| } |
| template<class T, class OtherT, class OtherObserver, class I> |
| auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, |
| const observer<T, I>& o) |
| -> subscriber<T, observer<T, I>> { |
| auto r = subscriber<T, observer<T, I>>(std::move(id), scbr.get_subscription(), o); |
| trace_activity().connect(r, scbr); |
| return r; |
| } |
| template<class T, class OtherT, class OtherObserver, class Observer> |
| auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const Observer& o) |
| -> typename std::enable_if< |
| is_observer<Observer>::value, |
| subscriber<T, Observer>>::type { |
| auto r = subscriber<T, Observer>(std::move(id), scbr.get_subscription(), o); |
| trace_activity().connect(r, scbr); |
| return r; |
| } |
| template<class T, class OtherT, class OtherObserver, class Observer> |
| auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const Observer& o) |
| -> typename std::enable_if< |
| !is_subscription<Observer>::value && |
| is_observer<Observer>::value, |
| subscriber<T, Observer>>::type { |
| auto r = subscriber<T, Observer>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), o); |
| trace_activity().connect(r, scbr); |
| return r; |
| } |
| template<class T, class OtherT, class OtherObserver, class Observer> |
| auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const Observer& o) |
| -> typename std::enable_if< |
| !detail::is_on_next_of<T, Observer>::value && |
| !is_subscriber<Observer>::value && |
| !is_subscription<Observer>::value && |
| !is_observer<Observer>::value, |
| subscriber<T, observer<T, Observer>>>::type { |
| auto r = subscriber<T, observer<T, Observer>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), make_observer<T>(o)); |
| trace_activity().connect(r, scbr); |
| return r; |
| } |
| template<class T, class OtherT, class OtherObserver, class Observer> |
| auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const Observer& o) |
| -> typename std::enable_if< |
| !detail::is_on_next_of<T, Observer>::value && |
| !is_subscriber<Observer>::value && |
| !is_subscription<Observer>::value && |
| !is_observer<Observer>::value, |
| subscriber<T, observer<T, Observer>>>::type { |
| auto r = subscriber<T, observer<T, Observer>>(std::move(id), scbr.get_subscription(), o); |
| trace_activity().connect(r, scbr); |
| return r; |
| } |
| template<class T, class OtherT, class OtherObserver, class OnNext> |
| auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext& on) |
| -> typename std::enable_if< |
| detail::is_on_next_of<T, OnNext>::value, |
| subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>>::type { |
| auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), |
| observer<T, detail::stateless_observer_tag, OnNext>(on)); |
| trace_activity().connect(r, scbr); |
| return r; |
| } |
| template<class T, class OtherT, class OtherObserver, class OnNext> |
| auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const OnNext& on) |
| -> typename std::enable_if< |
| detail::is_on_next_of<T, OnNext>::value, |
| subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>>::type { |
| auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>(std::move(id), scbr.get_subscription(), |
| observer<T, detail::stateless_observer_tag, OnNext>(on)); |
| trace_activity().connect(r, scbr); |
| return r; |
| } |
| template<class T, class OtherT, class OtherObserver, class OnNext, class OnError> |
| auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext& on, const OnError& oe) |
| -> typename std::enable_if< |
| detail::is_on_next_of<T, OnNext>::value && |
| detail::is_on_error<OnError>::value, |
| subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>>::type { |
| auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), |
| observer<T, detail::stateless_observer_tag, OnNext, OnError>(on, oe)); |
| trace_activity().connect(r, scbr); |
| return r; |
| } |
| template<class T, class OtherT, class OtherObserver, class OnNext, class OnError> |
| auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const OnNext& on, const OnError& oe) |
| -> typename std::enable_if< |
| detail::is_on_next_of<T, OnNext>::value && |
| detail::is_on_error<OnError>::value, |
| subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>>::type { |
| auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>(std::move(id), scbr.get_subscription(), |
| observer<T, detail::stateless_observer_tag, OnNext, OnError>(on, oe)); |
| trace_activity().connect(r, scbr); |
| return r; |
| } |
| template<class T, class OtherT, class OtherObserver, class OnNext, class OnCompleted> |
| auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext& on, const OnCompleted& oc) |
| -> typename std::enable_if< |
| detail::is_on_next_of<T, OnNext>::value && |
| detail::is_on_completed<OnCompleted>::value, |
| subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type { |
| auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), |
| observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc)); |
| trace_activity().connect(r, scbr); |
| return r; |
| } |
| template<class T, class OtherT, class OtherObserver, class OnNext, class OnCompleted> |
| auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const OnNext& on, const OnCompleted& oc) |
| -> typename std::enable_if< |
| detail::is_on_next_of<T, OnNext>::value && |
| detail::is_on_completed<OnCompleted>::value, |
| subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type { |
| auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>(std::move(id), scbr.get_subscription(), |
| observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc)); |
| trace_activity().connect(r, scbr); |
| return r; |
| } |
| template<class T, class OtherT, class OtherObserver, class OnNext, class OnError, class OnCompleted> |
| auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext& on, const OnError& oe, const OnCompleted& oc) |
| -> typename std::enable_if< |
| detail::is_on_next_of<T, OnNext>::value && |
| detail::is_on_error<OnError>::value && |
| detail::is_on_completed<OnCompleted>::value, |
| subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>>::type { |
| auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), |
| observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(on, oe, oc)); |
| trace_activity().connect(r, scbr); |
| return r; |
| } |
| template<class T, class OtherT, class OtherObserver, class OnNext, class OnError, class OnCompleted> |
| auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const OnNext& on, const OnError& oe, const OnCompleted& oc) |
| -> typename std::enable_if< |
| detail::is_on_next_of<T, OnNext>::value && |
| detail::is_on_error<OnError>::value && |
| detail::is_on_completed<OnCompleted>::value, |
| subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>>::type { |
| auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>(std::move(id), scbr.get_subscription(), |
| observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(on, oe, oc)); |
| trace_activity().connect(r, scbr); |
| return r; |
| } |
| |
| template<class T, class OtherT, class OtherObserver, class I> |
| auto make_subscriber(const subscriber<OtherT, OtherObserver>& , const composite_subscription& cs, |
| const observer<T, I>& o) |
| -> subscriber<T, observer<T, I>> { |
| return subscriber<T, observer<T, I>>(trace_id::make_next_id_subscriber(), cs, o); |
| } |
| template<class T, class OtherT, class OtherObserver, class I> |
| auto make_subscriber(const subscriber<OtherT, OtherObserver>&, trace_id id, const composite_subscription& cs, |
| const observer<T, I>& o) |
| -> subscriber<T, observer<T, I>> { |
| return subscriber<T, observer<T, I>>(std::move(id), cs, o); |
| } |
| template<class T, class OtherT, class OtherObserver, class Observer> |
| auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const Observer& o) |
| -> typename std::enable_if< |
| is_observer<Observer>::value, |
| subscriber<T, Observer>>::type { |
| auto r = subscriber<T, Observer>(trace_id::make_next_id_subscriber(), cs, o); |
| trace_activity().connect(r, scbr); |
| return r; |
| } |
| template<class T, class OtherT, class OtherObserver, class Observer> |
| auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const Observer& o) |
| -> typename std::enable_if< |
| is_observer<Observer>::value, |
| subscriber<T, Observer>>::type { |
| auto r = subscriber<T, Observer>(std::move(id), cs, o); |
| trace_activity().connect(r, scbr); |
| return r; |
| } |
| /*! |
| \brief makes a subscriber<T, observer<T, Observer>> from \a o and the lifetime \a cs. |
| |
| Only participates in overload resolution when \a Observer is rejected by all of |
| detail::is_on_next_of<T, Observer>, is_subscriber, is_subscription and is_observer. |
| The result gets its id from trace_id::make_next_id_subscriber() and is passed, with \a scbr, |
| to trace_activity().connect() before it is returned. |
| */ |
| template<class T, class OtherT, class OtherObserver, class Observer> |
| auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const Observer& o) |
|     -> typename std::enable_if< |
|         !detail::is_on_next_of<T, Observer>::value && |
|         !is_subscriber<Observer>::value && |
|         !is_subscription<Observer>::value && |
|         !is_observer<Observer>::value, |
|         subscriber<T, observer<T, Observer>>>::type { |
|     typedef subscriber<T, observer<T, Observer>> result_type; |
|     result_type made(trace_id::make_next_id_subscriber(), cs, o); |
|     trace_activity().connect(made, scbr); |
|     return made; |
| } |
| /*! |
| \brief makes a subscriber<T, observer<T, Observer>> from \a o and the lifetime \a cs, using the caller-supplied trace id. |
| |
| Only participates in overload resolution when \a Observer is rejected by all of |
| detail::is_on_next_of<T, Observer>, is_subscriber, is_subscription and is_observer. |
| \a id is moved into the result, which is then passed, with \a scbr, to trace_activity().connect() |
| before it is returned. |
| */ |
| template<class T, class OtherT, class OtherObserver, class Observer> |
| auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const Observer& o) |
|     -> typename std::enable_if< |
|         !detail::is_on_next_of<T, Observer>::value && |
|         !is_subscriber<Observer>::value && |
|         !is_subscription<Observer>::value && |
|         !is_observer<Observer>::value, |
|         subscriber<T, observer<T, Observer>>>::type { |
|     typedef subscriber<T, observer<T, Observer>> result_type; |
|     result_type made(std::move(id), cs, o); |
|     trace_activity().connect(made, scbr); |
|     return made; |
| } |
| /*! |
| \brief makes a subscriber around a stateless observer built from the on_next callable \a on, with lifetime \a cs. |
| |
| Only participates in overload resolution when detail::is_on_next_of<T, OnNext>::value is true. |
| The result gets its id from trace_id::make_next_id_subscriber() and is passed, with \a scbr, |
| to trace_activity().connect() before it is returned. |
| */ |
| template<class T, class OtherT, class OtherObserver, class OnNext> |
| auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const OnNext& on) |
|     -> typename std::enable_if< |
|         detail::is_on_next_of<T, OnNext>::value, |
|         subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>>::type { |
|     typedef observer<T, detail::stateless_observer_tag, OnNext> observer_type; |
|     typedef subscriber<T, observer_type> result_type; |
|     result_type made(trace_id::make_next_id_subscriber(), cs, observer_type(on)); |
|     trace_activity().connect(made, scbr); |
|     return made; |
| } |
| /*! |
| \brief makes a subscriber around a stateless observer built from the on_next callable \a on, with lifetime \a cs and the caller-supplied trace id. |
| |
| Only participates in overload resolution when detail::is_on_next_of<T, OnNext>::value is true. |
| \a id is moved into the result, which is then passed, with \a scbr, to trace_activity().connect() |
| before it is returned. |
| */ |
| template<class T, class OtherT, class OtherObserver, class OnNext> |
| auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const OnNext& on) |
|     -> typename std::enable_if< |
|         detail::is_on_next_of<T, OnNext>::value, |
|         subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>>::type { |
|     typedef observer<T, detail::stateless_observer_tag, OnNext> observer_type; |
|     typedef subscriber<T, observer_type> result_type; |
|     result_type made(std::move(id), cs, observer_type(on)); |
|     trace_activity().connect(made, scbr); |
|     return made; |
| } |
| /*! |
| \brief makes a subscriber around a stateless observer built from the callables \a on and \a oe, with lifetime \a cs. |
| |
| Only participates in overload resolution when detail::is_on_next_of<T, OnNext>::value and |
| detail::is_on_error<OnError>::value are both true. |
| The result gets its id from trace_id::make_next_id_subscriber() and is passed, with \a scbr, |
| to trace_activity().connect() before it is returned. |
| */ |
| template<class T, class OtherT, class OtherObserver, class OnNext, class OnError> |
| auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const OnNext& on, const OnError& oe) |
|     -> typename std::enable_if< |
|         detail::is_on_next_of<T, OnNext>::value && |
|         detail::is_on_error<OnError>::value, |
|         subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>>::type { |
|     typedef observer<T, detail::stateless_observer_tag, OnNext, OnError> observer_type; |
|     typedef subscriber<T, observer_type> result_type; |
|     result_type made(trace_id::make_next_id_subscriber(), cs, observer_type(on, oe)); |
|     trace_activity().connect(made, scbr); |
|     return made; |
| } |
| /*! |
| \brief makes a subscriber around a stateless observer built from the callables \a on and \a oe, with lifetime \a cs and the caller-supplied trace id. |
| |
| Only participates in overload resolution when detail::is_on_next_of<T, OnNext>::value and |
| detail::is_on_error<OnError>::value are both true. |
| \a id is moved into the result, which is then passed, with \a scbr, to trace_activity().connect() |
| before it is returned. |
| */ |
| template<class T, class OtherT, class OtherObserver, class OnNext, class OnError> |
| auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const OnNext& on, const OnError& oe) |
|     -> typename std::enable_if< |
|         detail::is_on_next_of<T, OnNext>::value && |
|         detail::is_on_error<OnError>::value, |
|         subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>>::type { |
|     typedef observer<T, detail::stateless_observer_tag, OnNext, OnError> observer_type; |
|     typedef subscriber<T, observer_type> result_type; |
|     result_type made(std::move(id), cs, observer_type(on, oe)); |
|     trace_activity().connect(made, scbr); |
|     return made; |
| } |
| /*! |
| \brief makes a subscriber around a stateless observer built from the callables \a on and \a oc, with lifetime \a cs. |
| |
| Only participates in overload resolution when detail::is_on_next_of<T, OnNext>::value and |
| detail::is_on_completed<OnCompleted>::value are both true. |
| A default-constructed detail::OnErrorEmpty is supplied as the observer's error handler. |
| The result gets its id from trace_id::make_next_id_subscriber() and is passed, with \a scbr, |
| to trace_activity().connect() before it is returned. |
| */ |
| template<class T, class OtherT, class OtherObserver, class OnNext, class OnCompleted> |
| auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const OnNext& on, const OnCompleted& oc) |
|     -> typename std::enable_if< |
|         detail::is_on_next_of<T, OnNext>::value && |
|         detail::is_on_completed<OnCompleted>::value, |
|         subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type { |
|     typedef observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted> observer_type; |
|     typedef subscriber<T, observer_type> result_type; |
|     result_type made(trace_id::make_next_id_subscriber(), cs, observer_type(on, detail::OnErrorEmpty(), oc)); |
|     trace_activity().connect(made, scbr); |
|     return made; |
| } |
| /*! |
| \brief makes a subscriber around a stateless observer built from the callables \a on and \a oc, with lifetime \a cs and the caller-supplied trace id. |
| |
| Only participates in overload resolution when detail::is_on_next_of<T, OnNext>::value and |
| detail::is_on_completed<OnCompleted>::value are both true. |
| A default-constructed detail::OnErrorEmpty is supplied as the observer's error handler. |
| \a id is moved into the result, which is then passed, with \a scbr, to trace_activity().connect() |
| before it is returned. |
| */ |
| template<class T, class OtherT, class OtherObserver, class OnNext, class OnCompleted> |
| auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const OnNext& on, const OnCompleted& oc) |
|     -> typename std::enable_if< |
|         detail::is_on_next_of<T, OnNext>::value && |
|         detail::is_on_completed<OnCompleted>::value, |
|         subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type { |
|     typedef observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted> observer_type; |
|     typedef subscriber<T, observer_type> result_type; |
|     result_type made(std::move(id), cs, observer_type(on, detail::OnErrorEmpty(), oc)); |
|     trace_activity().connect(made, scbr); |
|     return made; |
| } |
| /*! |
| \brief makes a subscriber around a stateless observer built from the callables \a on, \a oe and \a oc, with lifetime \a cs. |
| |
| Only participates in overload resolution when detail::is_on_next_of<T, OnNext>::value, |
| detail::is_on_error<OnError>::value and detail::is_on_completed<OnCompleted>::value are all true. |
| The result gets its id from trace_id::make_next_id_subscriber() and is passed, with \a scbr, |
| to trace_activity().connect() before it is returned. |
| */ |
| template<class T, class OtherT, class OtherObserver, class OnNext, class OnError, class OnCompleted> |
| auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const OnNext& on, const OnError& oe, const OnCompleted& oc) |
|     -> typename std::enable_if< |
|         detail::is_on_next_of<T, OnNext>::value && |
|         detail::is_on_error<OnError>::value && |
|         detail::is_on_completed<OnCompleted>::value, |
|         subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>>::type { |
|     typedef observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted> observer_type; |
|     typedef subscriber<T, observer_type> result_type; |
|     result_type made(trace_id::make_next_id_subscriber(), cs, observer_type(on, oe, oc)); |
|     trace_activity().connect(made, scbr); |
|     return made; |
| } |
| /*! |
| \brief makes a subscriber around a stateless observer built from the callables \a on, \a oe and \a oc, with lifetime \a cs and the caller-supplied trace id. |
| |
| Only participates in overload resolution when detail::is_on_next_of<T, OnNext>::value, |
| detail::is_on_error<OnError>::value and detail::is_on_completed<OnCompleted>::value are all true. |
| \a id is moved into the result, which is then passed, with \a scbr, to trace_activity().connect() |
| before it is returned. |
| */ |
| template<class T, class OtherT, class OtherObserver, class OnNext, class OnError, class OnCompleted> |
| auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const OnNext& on, const OnError& oe, const OnCompleted& oc) |
|     -> typename std::enable_if< |
|         detail::is_on_next_of<T, OnNext>::value && |
|         detail::is_on_error<OnError>::value && |
|         detail::is_on_completed<OnCompleted>::value, |
|         subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>>::type { |
|     typedef observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted> observer_type; |
|     typedef subscriber<T, observer_type> result_type; |
|     result_type made(std::move(id), cs, observer_type(on, oe, oc)); |
|     trace_activity().connect(made, scbr); |
|     return made; |
| } |
| |
| /*! |
| \brief makes a subscriber of the same type as \a scbr that keeps scbr's id and observer but uses the lifetime \a cs. |
| |
| The id comes from scbr.get_id() and the observer from scbr.get_observer(). The result is passed, |
| with \a scbr, to trace_activity().connect() before it is returned. |
| */ |
| template<class T, class Observer> |
| auto make_subscriber(const subscriber<T, Observer>& scbr, const composite_subscription& cs) |
|     -> subscriber<T, Observer> { |
|     typedef subscriber<T, Observer> result_type; |
|     result_type made(scbr.get_id(), cs, scbr.get_observer()); |
|     trace_activity().connect(made, scbr); |
|     return made; |
| } |
| /*! |
| \brief makes a subscriber of the same type as \a scbr that keeps scbr's observer but uses the trace id \a id and the lifetime \a cs. |
| |
| \a id is moved into the result and the observer comes from scbr.get_observer(). The result is passed, |
| with \a scbr, to trace_activity().connect() before it is returned. |
| */ |
| template<class T, class Observer> |
| auto make_subscriber(const subscriber<T, Observer>& scbr, trace_id id, const composite_subscription& cs) |
|     -> subscriber<T, Observer> { |
|     typedef subscriber<T, Observer> result_type; |
|     result_type made(std::move(id), cs, scbr.get_observer()); |
|     trace_activity().connect(made, scbr); |
|     return made; |
| } |
| |
| /*! |
| \brief makes a subscriber of the same type as \a scbr that keeps scbr's subscription and observer but uses the trace id \a id. |
| |
| \a id is moved into the result; the lifetime comes from scbr.get_subscription() and the observer from |
| scbr.get_observer(). The result is passed, with \a scbr, to trace_activity().connect() before it is returned. |
| */ |
| template<class T, class Observer> |
| auto make_subscriber(const subscriber<T, Observer>& scbr, trace_id id) |
|     -> subscriber<T, Observer> { |
|     typedef subscriber<T, Observer> result_type; |
|     result_type made(std::move(id), scbr.get_subscription(), scbr.get_observer()); |
|     trace_activity().connect(made, scbr); |
|     return made; |
| } |
| |
| } |
| |
| #endif |