diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 5122b45e..8c3c6a4a 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -9,7 +9,7 @@ repos:
   - hooks:
       - id: isort
     repo: https://github.com/timothycrosley/isort
-    rev: 5.10.1
+    rev: 5.12.0
   - hooks:
       - id: black
     repo: https://github.com/psf/black
diff --git a/reactivex/__init__.py b/reactivex/__init__.py
index a707f978..9a2761fd 100644
--- a/reactivex/__init__.py
+++ b/reactivex/__init__.py
@@ -232,6 +232,43 @@ def combine_latest(*__sources: Observable[Any]) -> Observable[Any]:
     return combine_latest_(*__sources)
 
 
+def combine_throttle(*args: Observable[Any]) -> Observable[Tuple[Any, ...]]:
+    """Merges the specified observable sequences into one observable
+    sequence by creating a result whenever all of the
+    observable sequences have produced an element at a corresponding
+    index. Faster observables, that emits events more frequently, are
+    throttled, so that they match speed of slower observables.
+    Speed of emitting items matches speed of slowest source observable.
+    It is somewhat similar to :func:`reactivex.zip` operator, returns tuple,
+    but items of faster observables are dropped, so that only latest values are
+    at each index.
+    It is also similar to :func:`reactivex.combine_latest`, but emits new item
+    only when all sources produce new item. Only latest items are included in
+    resulting tuple, others are dropped, similar to :func:`reactivex.with_latest_from`.
+
+    .. marble::
+        :alt: combine_throttle
+
+        --1---2-3--------4---|
+        -a-------b--c-d------|
+        [ combine_throttle() ]
+        --1,a----3,b-----4,d-|
+
+    Example:
+        >>> res = combine_throttle(obs1, obs2)
+
+    Args:
+        args: Observable sources to combine.
+
+    Returns:
+        An observable sequence containing the result of combining
+        elements of the sources as a tuple.
+    """
+    from .observable.combinethrottle import combine_throttle_
+
+    return combine_throttle_(*args)
+
+
 def concat(*sources: Observable[_T]) -> Observable[_T]:
     """Concatenates all of the specified observable sequences.
 
@@ -1303,6 +1340,7 @@ def zip(*args: Observable[Any]) -> Observable[Tuple[Any, ...]]:
     "catch_with_iterable",
     "create",
     "combine_latest",
+    "combine_throttle",
     "compose",
     "concat",
     "concat_with_iterable",
diff --git a/reactivex/observable/combinethrottle.py b/reactivex/observable/combinethrottle.py
new file mode 100644
index 00000000..9569f4b1
--- /dev/null
+++ b/reactivex/observable/combinethrottle.py
@@ -0,0 +1,76 @@
+from asyncio import Future
+from threading import RLock
+from typing import Any, Callable, List, Optional, Tuple
+
+from reactivex import Observable, abc, from_future
+from reactivex.disposable import CompositeDisposable, SingleAssignmentDisposable
+from reactivex.internal import synchronized
+
+
+def combine_throttle_(*args: Observable[Any]) -> Observable[Tuple[Any, ...]]:
+    """Merges the specified observable sequences into one observable
+    sequence by creating a tuple whenever all of the observable sequences
+    have produced an element at a corresponding index.
+
+    Example:
+        >>> res = combine_throttle(source)
+
+    Args:
+        args: Observable sources to combine_throttle.
+
+    Returns:
+        An observable sequence containing the result of combining
+        elements of the sources as a tuple.
+    """
+
+    n = len(args)
+
+    sources = list(args)
+
+    def subscribe(
+        observer: abc.ObserverBase[Any], scheduler: Optional[abc.SchedulerBase] = None
+    ) -> CompositeDisposable:
+
+        flags = (1 << (n - 1)) & 0  # Reserve n zero bits.
+        full_mask = 1 << (n - 1)
+        full_mask |= full_mask - 1  # Create mask with n 1 bits.
+        lock = RLock()
+
+        results: List[None] = [None] * n
+
+        def create_on_next(i: int) -> Callable[[Any], None]:
+            @synchronized(lock)
+            def on_next(item: Any) -> None:
+                nonlocal flags
+                results[i] = item
+                flags |= 1 << i
+                if flags == full_mask:
+                    flags = 0
+                    observer.on_next(tuple(results))
+
+            return on_next
+
+        subscriptions: List[abc.DisposableBase] = []
+
+        for i, source in enumerate(sources):
+            if isinstance(source, Future):
+                source = from_future(source)
+
+            sad = SingleAssignmentDisposable()
+
+            sad.disposable = source.subscribe(
+                create_on_next(i),
+                observer.on_error,
+                observer.on_completed,
+                scheduler=scheduler,
+            )
+
+            subscriptions.append(sad)
+
+        return CompositeDisposable(subscriptions)
+
+    return Observable(subscribe=subscribe)
+
+
+__all__ = ["combine_throttle_"]
diff --git a/reactivex/operators/__init__.py b/reactivex/operators/__init__.py
index 0983dfa2..9f7e53b2 100644
--- a/reactivex/operators/__init__.py
+++ b/reactivex/operators/__init__.py
@@ -430,6 +430,45 @@ def combine_latest(
     return combine_latest_(*others)
 
 
+def combine_throttle(
+    *args: Observable[Any],
+) -> Callable[[Observable[Any]], Observable[Any]]:
+    """Merges the specified observable sequences into one observable
+    sequence by creating a result whenever all of the
+    observable sequences have produced an element at a corresponding
+    index. Faster observables, that emits events more frequently, are
+    throttled, so that they match speed of slower observables.
+    Speed of emitting items matches speed of slowest source observable.
+    It is somewhat similar to :func:`reactivex.zip` operator, returns tuple,
+    but items of faster observables are dropped, so that only latest values are
+    at each index.
+    It is also similar to :func:`reactivex.combine_latest`, but emits new item
+    only when all sources produce new item. Only latest items are included in
+    resulting tuple, others are dropped, similar to :func:`reactivex.with_latest_from`.
+
+    .. marble::
+        :alt: combine_throttle
+
+        --1---2-3--------4---|
+        -a-------b--c-d------|
+        [ combine_throttle() ]
+        --1,a----3,b-----4,d-|
+
+    Example:
+        >>> res = combine_throttle(obs1, obs2)
+
+    Args:
+        args: Other observable sources to combine with the piped source.
+
+    Returns:
+        An observable sequence containing the result of combining
+        elements of the sources as a tuple.
+    """
+    from ._combinethrottle import combine_throttle_
+
+    return combine_throttle_(*args)
+
+
 def concat(*sources: Observable[_T]) -> Callable[[Observable[_T]], Observable[_T]]:
     """Concatenates all the observable sequences.
 
diff --git a/reactivex/operators/_combinethrottle.py b/reactivex/operators/_combinethrottle.py
new file mode 100644
index 00000000..0ca2e8d3
--- /dev/null
+++ b/reactivex/operators/_combinethrottle.py
@@ -0,0 +1,31 @@
+from typing import Any, Callable, Tuple
+
+import reactivex
+from reactivex import Observable
+
+
+def combine_throttle_(
+    *args: Observable[Any],
+) -> Callable[[Observable[Any]], Observable[Tuple[Any, ...]]]:
+    def _combine_throttle(source: Observable[Any]) -> Observable[Tuple[Any, ...]]:
+        """Merges the specified observable sequences into one observable
+        sequence by creating a tuple whenever all of the observable sequences
+        have produced an element at a corresponding index.
+
+        Example:
+            >>> res = combine_throttle(source)
+
+        Args:
+            args: Observable sources to combine_throttle.
+
+        Returns:
+            An observable sequence containing the result of combining
+            elements of the sources as a tuple.
+        """
+
+        return reactivex.combine_throttle(source, *args)
+
+    return _combine_throttle
+
+
+__all__ = ["combine_throttle_"]
diff --git a/tests/test_observable/test_combinethrottle.py b/tests/test_observable/test_combinethrottle.py
new file mode 100644
index 00000000..51f67808
--- /dev/null
+++ b/tests/test_observable/test_combinethrottle.py
@@ -0,0 +1,288 @@
+import unittest
+from typing import List
+
+import reactivex
+from reactivex import operators as ops
+from reactivex.observable.observable import Observable
+from reactivex.testing import ReactiveTest, TestScheduler
+from reactivex.testing.recorded import Recorded
+
+on_next = ReactiveTest.on_next
+on_completed = ReactiveTest.on_completed
+on_error = ReactiveTest.on_error
+subscribe = ReactiveTest.subscribe
+subscribed = ReactiveTest.subscribed
+disposed = ReactiveTest.disposed
+created = ReactiveTest.created
+
+
+class TestCombineThrottle(unittest.TestCase):
+    def test_combine_throttle_never_never(self):
+        scheduler = TestScheduler()
+        o1 = reactivex.never()
+        o2 = reactivex.never()
+
+        def create():
+            return o1.pipe(ops.combine_throttle(o2))
+
+        results = scheduler.start(create)
+        assert results.messages == []
+
+    def test_combine_throttle_never_empty(self):
+        scheduler = TestScheduler()
+        msgs = [on_next(150, 1), on_completed(210)]
+        o1 = reactivex.never()
+        o2 = scheduler.create_hot_observable(msgs)
+
+        def create():
+            return o1.pipe(ops.combine_throttle(o2))
+
+        results = scheduler.start(create)
+        assert results.messages == [on_completed(210)]
+
+    def test_combine_throttle_empty_empty(self):
+        scheduler = TestScheduler()
+        msgs1 = [on_next(150, 1), on_completed(210)]
+        msgs2 = [on_next(150, 1), on_completed(210)]
+        e1 = scheduler.create_hot_observable(msgs1)
+        e2 = scheduler.create_hot_observable(msgs2)
+
+        def create():
+            return e1.pipe(ops.combine_throttle(e2))
+
+        results = scheduler.start(create)
+        assert results.messages == [on_completed(210)]
+
+    def test_combine_throttle_empty_non_empty(self):
+        scheduler = TestScheduler()
+        msgs1 = [on_next(150, 1), on_completed(210)]
+        msgs2 = [on_next(150, 1), on_next(215, 2), on_completed(220)]
+        e1 = scheduler.create_hot_observable(msgs1)
+        e2 = scheduler.create_hot_observable(msgs2)
+
+        def create():
+            return e1.pipe(ops.combine_throttle(e2))
+
+        results = scheduler.start(create)
+        assert results.messages == [on_completed(210)]
+
+    def test_combine_throttle_non_empty_empty(self):
+        scheduler = TestScheduler()
+        msgs1 = [on_next(150, 1), on_completed(210)]
+        msgs2 = [on_next(150, 1), on_next(215, 2), on_completed(220)]
+        e1 = scheduler.create_hot_observable(msgs1)
+        e2 = scheduler.create_hot_observable(msgs2)
+
+        def create():
+            return e2.pipe(ops.combine_throttle(e1))
+
+        results = scheduler.start(create)
+        assert results.messages == [on_completed(210)]
+
+    def test_combine_throttle_never_non_empty(self):
+        scheduler = TestScheduler()
+        msgs = [on_next(150, 1), on_next(215, 2), on_completed(220)]
+        e1 = scheduler.create_hot_observable(msgs)
+        e2 = reactivex.never()
+
+        def create():
+            return e2.pipe(ops.combine_throttle(e1))
+
+        results = scheduler.start(create)
+        assert results.messages == [on_completed(220)]
+
+    def test_combine_throttle_non_empty_never(self):
+        scheduler = TestScheduler()
+        msgs = [on_next(150, 1), on_next(215, 2), on_completed(220)]
+        e1 = scheduler.create_hot_observable(msgs)
+        e2 = reactivex.never()
+
+        def create():
+            return e1.pipe(ops.combine_throttle(e2))
+
+        results = scheduler.start(create)
+        assert results.messages == [on_completed(220)]
+
+    def test_combine_throttle_non_empty_non_empty(self):
+        scheduler = TestScheduler()
+        msgs1 = [on_next(150, 1), on_next(215, 2), on_completed(230)]
+        msgs2 = [on_next(150, 1), on_next(220, 3), on_completed(240)]
+        e1 = scheduler.create_hot_observable(msgs1)
+        e2 = scheduler.create_hot_observable(msgs2)
+
+        def create():
+            return e1.pipe(ops.combine_throttle(e2))
+
+        results = scheduler.start(create)
+        assert results.messages == [on_next(220, (2, 3)), on_completed(230)]
+
+    def test_combine_throttle_empty_error(self):
+        ex = Exception("ex")
+        scheduler = TestScheduler()
+        msgs1 = [on_next(150, 1), on_completed(230)]
+        msgs2 = [on_next(150, 1), on_error(220, ex)]
+        e1 = scheduler.create_hot_observable(msgs1)
+        e2 = scheduler.create_hot_observable(msgs2)
+
+        def create():
+            return e1.pipe(ops.combine_throttle(e2))
+
+        results = scheduler.start(create)
+        assert results.messages == [on_error(220, ex)]
+
+    def test_combine_throttle_error_empty(self):
+        ex = Exception("ex")
+        scheduler = TestScheduler()
+        msgs1 = [on_next(150, 1), on_completed(230)]
+        msgs2 = [on_next(150, 1), on_error(220, ex)]
+        e1 = scheduler.create_hot_observable(msgs1)
+        e2 = scheduler.create_hot_observable(msgs2)
+
+        def create():
+            return e2.pipe(ops.combine_throttle(e1))
+
+        results = scheduler.start(create)
+        assert results.messages == [on_error(220, ex)]
+
+    def test_combine_throttle_never_error(self):
+        ex = Exception("ex")
+        scheduler = TestScheduler()
+        msgs2 = [on_next(150, 1), on_error(220, ex)]
+        e1 = reactivex.never()
+        e2 = scheduler.create_hot_observable(msgs2)
+
+        def create():
+            return e1.pipe(ops.combine_throttle(e2))
+
+        results = scheduler.start(create)
+        assert results.messages == [on_error(220, ex)]
+
+    def test_combine_throttle_error_never(self):
+        ex = Exception("ex")
+        scheduler = TestScheduler()
+        msgs2 = [on_next(150, 1), on_error(220, ex)]
+        e1 = reactivex.never()
+        e2 = scheduler.create_hot_observable(msgs2)
+
+        def create():
+            return e2.pipe(ops.combine_throttle(e1))
+
+        results = scheduler.start(create)
+        assert results.messages == [on_error(220, ex)]
+
+    def test_combine_throttle_error_error(self):
+        ex1 = Exception("ex1")
+        ex2 = Exception("ex2")
+        scheduler = TestScheduler()
+        msgs1 = [on_next(150, 1), on_error(230, ex1)]
+        msgs2 = [on_next(150, 1), on_error(220, ex2)]
+        e1 = scheduler.create_hot_observable(msgs1)
+        e2 = scheduler.create_hot_observable(msgs2)
+
+        def create():
+            return e2.pipe(ops.combine_throttle(e1))
+
+        results = scheduler.start(create)
+        assert results.messages == [on_error(220, ex2)]
+
+    def test_combine_throttle_some_error(self):
+        ex = Exception("ex")
+        scheduler = TestScheduler()
+        msgs1 = [on_next(150, 1), on_next(215, 2), on_completed(230)]
+        msgs2 = [on_next(150, 1), on_error(220, ex)]
+        e1 = scheduler.create_hot_observable(msgs1)
+        e2 = scheduler.create_hot_observable(msgs2)
+
+        def create():
+            return e1.pipe(ops.combine_throttle(e2))
+
+        results = scheduler.start(create)
+        assert results.messages == [on_error(220, ex)]
+
+    def test_combine_throttle_error_some(self):
+        ex = Exception("ex")
+        scheduler = TestScheduler()
+        msgs1 = [on_next(150, 1), on_next(215, 2), on_completed(230)]
+        msgs2 = [on_next(150, 1), on_error(220, ex)]
+        e1 = scheduler.create_hot_observable(msgs1)
+        e2 = scheduler.create_hot_observable(msgs2)
+
+        def create():
+            return e2.pipe(ops.combine_throttle(e1))
+
+        results = scheduler.start(create)
+        assert results.messages == [on_error(220, ex)]
+
+    def test_combine_throttle_different_speeds(self):
+        scheduler = TestScheduler()
+        msgs1 = [
+            on_next(150, 1),
+            on_next(215, 2),
+            on_next(230, 3),
+            on_next(240, 4),
+            on_next(290, 5),
+            on_completed(310),
+        ]
+        msgs2 = [
+            on_next(150, "a"),
+            on_next(210, "b"),
+            on_next(250, "c"),
+            on_next(270, "d"),
+            on_next(280, "e"),
+            on_completed(300),
+        ]
+        e1 = scheduler.create_hot_observable(msgs1)
+        e2 = scheduler.create_hot_observable(msgs2)
+
+        def create():
+            return e1.pipe(ops.combine_throttle(e2))
+
+        results = scheduler.start(create)
+        assert e1.subscriptions == [subscribe(200, 300)]
+        assert results.messages == [
+            on_next(215, (2, "b")),
+            on_next(250, (4, "c")),
+            on_next(290, (5, "e")),
+            on_completed(300),
+        ]
+
+    def test_combine_throttle_one_after_other(self):
+        scheduler = TestScheduler()
+        msgs1 = [on_next(150, 1), on_next(215, 2), on_next(230, 3), on_completed(240)]
+        msgs2 = [on_next(250, "a"), on_next(260, "b"), on_completed(270)]
+        e1 = scheduler.create_hot_observable(msgs1)
+        e2 = scheduler.create_hot_observable(msgs2)
+
+        def create():
+            return e1.pipe(ops.combine_throttle(e2))
+
+        results = scheduler.start(create)
+        assert e1.subscriptions == [subscribe(200, 240)]
+        assert results.messages == [on_completed(240)]
+
+    def test_combine_throttle_100_observables_with_linearly_increased_speeds(self):
+        scheduler = TestScheduler()
+
+        observables: List[Observable[int]] = []
+        all_msgs: List[List[Recorded[int]]] = []
+
+        for i in range(1, 101):
+            msgs: List[Recorded[int]] = []
+            for j in range(0, 200, i):
+                msgs.append(on_next(201 + j, i))
+
+            msgs.append(on_completed(500))
+
+            observables.append(scheduler.create_hot_observable(msgs))
+            all_msgs.append(msgs)
+
+        def create():
+            return observables[0].pipe(ops.combine_throttle(*observables[1:]))
+
+        results = scheduler.start(create)
+
+        assert results.messages == [
+            on_next(201, tuple(range(1, 101))),
+            on_next(301, tuple(range(1, 101))),
+            on_completed(500),
+        ]
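One more test that could be worth adding (a sketch, not part of the patch): it replays the marble diagram from the docstring on the virtual-time scheduler, using the same conventions as the cases above. Shown here as a self-contained module; the timestamps are chosen to reproduce the diagram's ordering.

import unittest

from reactivex import operators as ops
from reactivex.testing import ReactiveTest, TestScheduler

on_next = ReactiveTest.on_next
on_completed = ReactiveTest.on_completed


class TestCombineThrottleMarble(unittest.TestCase):
    def test_combine_throttle_docstring_marble(self):
        scheduler = TestScheduler()
        # --1---2-3--------4---|
        msgs1 = [
            on_next(220, 1),
            on_next(260, 2),
            on_next(280, 3),
            on_next(370, 4),
            on_completed(410),
        ]
        # -a-------b--c-d------|
        msgs2 = [
            on_next(210, "a"),
            on_next(290, "b"),
            on_next(320, "c"),
            on_next(340, "d"),
            on_completed(410),
        ]
        e1 = scheduler.create_hot_observable(msgs1)
        e2 = scheduler.create_hot_observable(msgs2)

        def create():
            return e1.pipe(ops.combine_throttle(e2))

        results = scheduler.start(create)
        # --1,a----3,b-----4,d-|
        assert results.messages == [
            on_next(220, (1, "a")),
            on_next(290, (3, "b")),
            on_next(370, (4, "d")),
            on_completed(410),
        ]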