diff --git a/src/subscription_manager/model/__init__.py b/src/subscription_manager/model/__init__.py index a7bfbd6925..d2d15debb9 100644 --- a/src/subscription_manager/model/__init__.py +++ b/src/subscription_manager/model/__init__.py @@ -15,6 +15,7 @@ import logging from rhsm.certificate2 import CONTENT_ACCESS_CERT_TYPE +from rhsmlib.facts.hwprobe import HardwareCollector log = logging.getLogger(__name__) @@ -89,10 +90,12 @@ def __getitem__(self, key): return self._entitlements[key] -def find_content(ent_source, content_type=None): +def find_content(ent_source, content_type=None, use_os_release_product=False): """ Scan all entitlements looking for content of the given type. (string) Type will be compared case insensitive. + If use_os_release_product is enabled, generate a os-product-version tag and + add it ot the list of product_tags Returns a list of model.Content. """ @@ -101,10 +104,15 @@ def find_content(ent_source, content_type=None): content_labels = set() log.debug("Searching for content of type: %s" % content_type) for entitlement in ent_source: + if use_os_release_product: + all_product_tags = add_os_product_tags(ent_source.product_tags) + else: + all_product_tags = ent_source.product_tags + for content in entitlement.contents: # this is basically matching_content from repolib if content.content_type.lower() == content_type.lower() and content_tag_match( - content.tags, ent_source.product_tags + content.tags, all_product_tags ): if entitlement.entitlement_type == CONTENT_ACCESS_CERT_TYPE: content_access_entitlement_content[content.label] = content @@ -119,6 +127,19 @@ def find_content(ent_source, content_type=None): return entitled_content +def add_os_product_tags(product_tags): + """Add [os-id]-[os-version] tag to product tag list based on /etc/os-release + + Returns the product_tags including the os-product tag + """ + all_tags = product_tags + + dist_info = HardwareCollector().get_distribution() + os_product = "{name}-{version}".format(name=dist_info[4], version=dist_info[1]) + all_tags.append(os_product) + return all_tags + + def content_tag_match(content_tags, product_tags): """See if content required tags are provided by installed products. diff --git a/src/subscription_manager/repolib.py b/src/subscription_manager/repolib.py index 59d6e020a5..f5387fb939 100644 --- a/src/subscription_manager/repolib.py +++ b/src/subscription_manager/repolib.py @@ -516,13 +516,31 @@ def get_unique_content(self) -> Iterable[Repo]: # assumes items in content_list are hashable return set(content_list) + def use_os_release_product_enabled(self): + try: + use_os_release_product = conf["rhsm"].get_int("use_os_release_product") + except ValueError as e: + log.exception(e) + return False + except configparser.Error as e: + log.exception(e) + return False + else: + if use_os_release_product is None: + return False + return bool(use_os_release_product) + # Expose as public API for RepoActionInvoker.is_managed, since that # is used by Openshift tooling. # See https://bugzilla.redhat.com/show_bug.cgi?id=1223038 def matching_content(self) -> List["Content"]: content = [] for content_type in ALLOWED_CONTENT_TYPES: - content += model.find_content(self.ent_source, content_type=content_type) + content += model.find_content( + self.ent_source, + content_type=content_type, + use_os_release_product=self.use_os_release_product_enabled(), + ) return content def get_all_content(self, baseurl: str, ca_cert: str) -> List[Repo]: diff --git a/test/test_model.py b/test/test_model.py index 672cc6f279..1dde0f49f9 100644 --- a/test/test_model.py +++ b/test/test_model.py @@ -1,4 +1,5 @@ from unittest import mock +from unittest.mock import patch from . import fixture @@ -80,6 +81,29 @@ def ent_source(self): return es +class TestAddOsProductTags(fixture.SubManFixture): + @patch( + "rhsmlib.facts.hwprobe.HardwareCollector.get_distribution", + return_value=("", "12", "", "", "debian", []), + ) + def test_add_os_product_tags_empty(self, mock_get_distribution): + product_tags = [] + all_tags = model.add_os_product_tags(product_tags) + self.assertEqual(len(all_tags), 1) + self.assertEqual(all_tags[0], "debian-12") + + @patch( + "rhsmlib.facts.hwprobe.HardwareCollector.get_distribution", + return_value=("", "12", "", "", "debian", []), + ) + def test_add_os_product_tags_with_product_tags(self, mock_get_distribution): + product_tags = ["awesomeproduct-1"] + all_tags = model.add_os_product_tags(product_tags) + self.assertEqual(len(all_tags), 2) + self.assertEqual(all_tags[0], "awesomeproduct-1") + self.assertEqual(all_tags[1], "debian-12") + + class TestContentTagMatch(fixture.SubManFixture): def test_empty_content_empty_product(self): content_tags = [] @@ -220,6 +244,24 @@ def test_product_tags_and_content_tags_no_match(self): self.assertEqual(len(res), 0) + @patch( + "rhsmlib.facts.hwprobe.HardwareCollector.get_distribution", + return_value=("", "12", "", "", "debian", []), + ) + def test_product_tags_and_os_product_tags(self, mock_get_distribution): + content1 = create_mock_content(tags=["debian-12"], content_type="ostree") + content2 = create_mock_content(name="more-test-content", tags=["ubuntu-18"], content_type="ostree") + content_list = [content1, content2] + + entitlement = model.Entitlement(contents=content_list) + es = model.EntitlementSource() + es._entitlements = [entitlement] + + res = model.find_content(es, content_type="ostree", use_os_release_product=True) + + self.assertEqual(len(res), 1) + self.assertEqual(res[0], content1) + def test_product_tags_and_content_tags_no_match_no_product_tags(self): content1 = create_mock_content(tags=["awesomeos-ostree-23"], content_type="ostree") content2 = create_mock_content(