Implement mergeDelayError #205

Open
bartvanhoutte opened this issue Feb 25, 2020 · 2 comments

@bartvanhoutte
Contributor

Is there any way to implement mergeDelayError using the existing operators? If not, it would be very handy to have this operator.
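
To make the requested behaviour concrete: values from every source should keep flowing, and an error from one source should only be surfaced once all the others have finished. Below is a minimal sketch of that idea in plain PHP, using generators in place of observables. It does not use RxPHP, and `mergeDelayingErrors` is a hypothetical helper name invented for this illustration; round-robin pulling stands in for real time-based interleaving.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not part of RxPHP): pulls one value at a time from
 * each generator in turn. A generator that throws is dropped, and its
 * exception is held back until every other generator has finished.
 *
 * @param \Generator[] $sources
 * @return array [collected values, first delayed error or null]
 */
function mergeDelayingErrors(array $sources): array
{
    $values = [];
    $errors = [];

    while ($sources !== []) {
        // foreach iterates over a copy, so unsetting entries here is safe.
        foreach ($sources as $key => $source) {
            try {
                if (!$source->valid()) {
                    unset($sources[$key]); // this source completed normally
                    continue;
                }
                $values[] = $source->current();
                $source->next();
            } catch (\Throwable $e) {
                $errors[] = $e;            // remember the error, do not rethrow yet
                unset($sources[$key]);
            }
        }
    }

    return [$values, $errors[0] ?? null];
}

$numbers = (function () {
    yield 4;
    yield 2;
    yield 3;
    yield 1;
})();

$words = (function () {
    yield 'foo';
    yield 'bar';
    throw new \RuntimeException('boom');
})();

[$values, $error] = mergeDelayingErrors([$numbers, $words]);
// $values is [4, 'foo', 2, 'bar', 3, 1]; $error is the RuntimeException('boom')
```

The failing source stops contributing values after it throws, but the remaining source runs to completion before the error is reported, which is the behaviour I am after for observables.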

@bartvanhoutte
Contributor Author

Something like this perhaps?

final class MergeDelayErrorOperator implements OperatorInterface
{

    private ObservableInterface $subsequentObservable;

    /**
     * MergeDelayErrorOperator constructor.
     *
     * @param ObservableInterface $subsequentObservable
     */
    public function __construct(ObservableInterface $subsequentObservable)
    {
        $this->subsequentObservable = $subsequentObservable;
    }

    public function __invoke(ObservableInterface $observable, ObserverInterface $observer): DisposableInterface
    {
        $errors = [];

        return $observable
          ->materialize()
          ->merge($this->subsequentObservable->materialize())
          ->filter(static function ($event) use (&$errors) {
              $class = get_class($event);

              if ($class === OnErrorNotification::class) {
                  $errors[] = $event;
                  return false;
              } elseif ($class === OnCompletedNotification::class) {
                  return false;
              }

              return true;
          })
          ->dematerialize()
          ->subscribe(
            [$observer, 'onNext'],
            [$observer, 'onError'],
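            static function () use ($observer, &$errors) {
                // Both sources are done: surface the first delayed error, if any.
                // An observer must not receive onCompleted after onError.
                if ($errors !== []) {
                    $errors[0]->accept($observer);
                    return;
                }
                $observer->onCompleted();
            }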
          );
    }

}

@mbonneau
Member

mbonneau commented Mar 4, 2020

@bartvanhoutte I have tested a couple of ideas. Here is my preferred solution:

Observable::fromArray([$observable1, $observable2])
                ->reduce(function ($a, Observable $o) {
                    $s = new Subject();
                    return [
                        $a[0]->merge($o->catch(function (\Throwable $e) use ($s) {
                            $s->onError($e);
                            return Observable::empty();
                        })),
                        $a[1]->merge($s)
                    ];
                }, [Observable::empty(), Observable::empty()])
                ->flatMap(function ($a) {
                    return $a[0]->concat($a[1]);
                });

Here it is implemented as a function in the Observable class:

    public function mergeDelayError(Observable $o) : Observable {
        return Observable::fromArray([$this, $o])
            ->reduce(function ($a, Observable $o) {
                $s = new Subject();
                return [
                    $a[0]->merge($o->catch(function (\Throwable $e) use ($s) {
                        $s->onError($e);
                        return Observable::empty();
                    })),
                    $a[1]->merge($s)
                ];
            }, [Observable::empty(), Observable::empty()])
            ->flatMap(function ($a) {
                return $a[0]->concat($a[1]);
            });
    }

Below is a test of the solution:

<?php


namespace Rx\Functional\Operator;


use Rx\Functional\FunctionalTestCase;
use Rx\Observable;
use Rx\Subject\Subject;

class MergeDelayErrorTest extends FunctionalTestCase
{
    /**
     * @test
     */
    public function it_waits_for_complete_before_emitting_error()
    {
        $xs = $this->createColdObservable(array(
                                              onNext(100, 4),
                                              onNext(200, 2),
                                              onNext(300, 3),
                                              onNext(400, 1),
                                              onCompleted(500)
                                          ));

        $ys = $this->createColdObservable(array(
                                              onNext(50, 'foo'),
                                              onNext(100, 'bar'),
                                              onNext(150, 'baz'),
                                              onError(160, new \Exception()),
                                              onNext(200, 'qux'),
                                              onCompleted(250)
                                          ));

        $results = $this->scheduler->startWithCreate(function() use ($xs, $ys) {
            // The factory must return the observable under test.
            return $xs->mergeDelayError($ys);
        });

        $this->assertMessages(array(
                                  onNext(250, 'foo'),
                                  onNext(300, 4),
                                  onNext(300, 'bar'),
                                  onNext(350, 'baz'),
                                  onNext(400, 2),
                                  onNext(500, 3),
                                  onNext(600, 1),
                                  onError(700, new \Exception())
                              ), $results->getMessages());

        $this->assertSubscriptions(array(subscribe(200, 700)), $xs->getSubscriptions());
        $this->assertSubscriptions(array(subscribe(200, 360)), $ys->getSubscriptions());
    }
}
