Skip to content

Commit

Permalink
评审会: 支持「未分配管控区域」- agent安装整体改造 (closed TencentBlueKing#2025)
Browse files Browse the repository at this point in the history
  • Loading branch information
wyyalt committed May 7, 2024
1 parent 0e623a6 commit dcb4af9
Show file tree
Hide file tree
Showing 11 changed files with 253 additions and 37 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ repos:
name: black
language: python
- repo: https://github.com/pycqa/isort
rev: 5.5.4
rev: 5.13.0
hooks:
- id: isort
args: [ "--profile", "black" ]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@ def search_business(bk_biz_id: int) -> Dict[str, Any]:
return CCApi.search_business(search_business_params)["info"][0]

def static_ip_selector(
self, sub_inst: models.SubscriptionInstanceRecord, cmdb_hosts: List[Dict[str, Any]]
self,
sub_inst: models.SubscriptionInstanceRecord,
cmdb_hosts: List[Dict[str, Any]],
unassigned_bk_host_ids: List[Optional[int]],
) -> SelectorResult:
"""
静态 IP 处理器
Expand Down Expand Up @@ -113,6 +116,19 @@ def static_ip_selector(
),
)
return SelectorResult(is_add=False, is_skip=True, sub_inst=sub_inst)
elif sub_inst.instance_info["host"].get("bk_host_id") in unassigned_bk_host_ids:
self.move_insts_to_failed(
[sub_inst.id],
log_content=_(
"主机期望被分配到业务【ID:{except_bk_biz_id}】,但实际业务【ID: {actual_biz_id}】"
"中已存在相同IP, host_id: {bk_host_id},请选择对应主机或者更改寻址方式为动态"
).format(
except_bk_biz_id=except_bk_biz_id,
actual_biz_id=cmdb_host["bk_biz_id"],
bk_host_id=cmdb_host["bk_host_id"],
),
)
return SelectorResult(is_add=False, is_skip=True, sub_inst=sub_inst)
else:
# 同业务下视为更新
sub_inst.instance_info["host"]["bk_host_id"] = cmdb_host["bk_host_id"]
Expand All @@ -126,7 +142,10 @@ def mark_and_replace_multi_inner_ip(self, exist_cmdb_hosts: List[Dict[str, Any]]
cmdb_host["bk_host_innerip"] = bk_host_inner_ip[0]

def dynamic_ip_selector(
self, sub_inst: models.SubscriptionInstanceRecord, cmdb_hosts: List[Dict[str, Any]]
self,
sub_inst: models.SubscriptionInstanceRecord,
cmdb_hosts: List[Dict[str, Any]],
unassigned_bk_host_ids: List[Optional[int]],
) -> SelectorResult:
"""
动态 IP 处理器
Expand Down Expand Up @@ -324,6 +343,47 @@ def handle_update_cmdb_hosts_case(
)
return sub_inst_ids

@controller.ConcurrentController(
    data_list_name="sub_insts",
    batch_call_func=concurrent.batch_call,
    get_config_dict_func=core.get_config_dict,
    get_config_dict_kwargs={"config_name": core.ServiceCCConfigName.HOST_WRITE.value},
)
def handle_update_cmdb_hosts_cloud_id_case(self, sub_insts: List[models.SubscriptionInstanceRecord]):
    """
    Batch-update the control area (bk_cloud_id) and related properties of hosts in CMDB.

    For every subscription instance, a payload with the host's bk_cloud_id,
    bk_addressing and bk_os_type is built (None / empty-string values are
    dropped), logged, and finally sent to CMDB in one request. The
    instances' ``update_time`` is refreshed and persisted afterwards.

    :param sub_insts: subscription instance records to process
    :return: IDs of the subscription instances that were included in the update
    """
    # Nothing to do for an empty batch; skip the CMDB call and the DB write.
    if not sub_insts:
        return []

    handled_ids: List[int] = []
    payloads: List[Dict[str, Any]] = []

    for record in sub_insts:
        handled_ids.append(record.id)
        record.update_time = timezone.now()

        host: Dict[str, Any] = record.instance_info["host"]
        candidate_props: Dict[str, Any] = {
            "bk_cloud_id": host["bk_cloud_id"],
            # Addressing falls back to STATIC when the host info does not carry it.
            "bk_addressing": host.get("bk_addressing", constants.CmdbAddressingType.STATIC.value),
            "bk_os_type": constants.BK_OS_TYPE[host["os_type"]],
        }

        # Only properties that carry a real value are submitted.
        effective_props: Dict[str, Any] = {}
        for prop_name, prop_value in candidate_props.items():
            if prop_value is not None and prop_value != "":
                effective_props[prop_name] = prop_value

        payload: Dict[str, Any] = {
            "bk_host_ids": [host["bk_host_id"]],
            "properties": effective_props,
        }
        self.log_info(
            sub_inst_ids=record.id,
            log_content=_("更新 CMDB 主机信息:\n {params}").format(params=json.dumps(payload, indent=2)),
        )
        payloads.append(payload)

    # One CMDB request for the whole batch, then persist the touched records.
    CCApi.batch_update_host_all_properties({"update": payloads})
    models.SubscriptionInstanceRecord.objects.bulk_update(
        sub_insts, fields=["instance_info", "update_time"], batch_size=self.batch_size
    )
    return handled_ids

@controller.ConcurrentController(
data_list_name="sub_insts",
batch_call_func=concurrent.batch_call,
Expand Down Expand Up @@ -500,6 +560,7 @@ def _execute(self, data, parent_data, common_data: CommonData):

sub_insts_to_be_added: List[models.SubscriptionInstanceRecord] = []
sub_insts_to_be_updated: List[models.SubscriptionInstanceRecord] = []
sub_insts_to_be_updated_cloud_id: List[models.SubscriptionInstanceRecord] = []
id__sub_inst_obj_map: Dict[int, models.SubscriptionInstanceRecord] = {}
# 获取已存在于 CMDB 的主机信息
exist_cmdb_hosts: List[Dict[str, Any]] = self.query_hosts(subscription_instances)
Expand All @@ -509,6 +570,15 @@ def _execute(self, data, parent_data, common_data: CommonData):
# 按 IpKey 聚合主机信息
# IpKey:ip(v4 or v6)+ bk_addressing(寻值方式)+ bk_cloud_id(管控区域)
cmdb_host_infos_gby_ip_key: Dict[str, List[Dict[str, Any]]] = self.get_host_infos_gby_ip_key(exist_cmdb_hosts)

# 查询未分配管控区域ID的主机
unassigned_cloud_id: List[Optional[int]] = models.GlobalSettings.get_config(
key=models.GlobalSettings.KeyEnum.UNASSIGNED_BK_CLOUD_ID.value, default=[]
)
unassigned_bk_host_ids: List[Optional[int]] = [
host.bk_host_id for host in common_data.host_id_obj_map.values() if host.bk_cloud_id in unassigned_cloud_id
]

for sub_inst in subscription_instances:
id__sub_inst_obj_map[sub_inst.id] = sub_inst
host_info: Dict[str, Any] = sub_inst.instance_info["host"]
Expand All @@ -523,13 +593,19 @@ def _execute(self, data, parent_data, common_data: CommonData):

# 按照寻址方式通过不同的选择器,选择更新或新增主机到 CMDB
if bk_addressing == constants.CmdbAddressingType.DYNAMIC.value:
selector_result: SelectorResult = self.dynamic_ip_selector(sub_inst, cmdb_hosts_with_the_same_ips)
selector_result: SelectorResult = self.dynamic_ip_selector(
sub_inst, cmdb_hosts_with_the_same_ips, unassigned_bk_host_ids
)
else:
selector_result: SelectorResult = self.static_ip_selector(sub_inst, cmdb_hosts_with_the_same_ips)
selector_result: SelectorResult = self.static_ip_selector(
sub_inst, cmdb_hosts_with_the_same_ips, unassigned_bk_host_ids
)

if selector_result.is_skip:
# 选择器已处理,跳过
continue
elif host_info.get("bk_host_id") and host_info["bk_host_id"] in unassigned_bk_host_ids:
sub_insts_to_be_updated_cloud_id.append(selector_result.sub_inst)
elif selector_result.is_add:
sub_insts_to_be_added.append(selector_result.sub_inst)
else:
Expand All @@ -540,9 +616,16 @@ def _execute(self, data, parent_data, common_data: CommonData):
successfully_updated_sub_inst_ids: List[int] = self.handle_update_cmdb_hosts_case(
sub_insts=sub_insts_to_be_updated, host_ids_with_mutil_inner_ip=host_ids_with_mutil_inner_ip
)
successfully_updated_cloud_id_sub_inst_ids: List[int] = self.handle_update_cmdb_hosts_cloud_id_case(
sub_insts=sub_insts_to_be_updated_cloud_id
)

# 2 - 对操作成功的实例更新本地数据
succeed_sub_insts: List[models.SubscriptionInstanceRecord] = []
for sub_inst_id in successfully_added_sub_inst_ids + successfully_updated_sub_inst_ids:
for sub_inst_id in (
successfully_added_sub_inst_ids
+ successfully_updated_sub_inst_ids
+ successfully_updated_cloud_id_sub_inst_ids
):
succeed_sub_insts.append(id__sub_inst_obj_map[sub_inst_id])
self.handle_update_db(succeed_sub_insts)
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import importlib
import random
import typing
from typing import Dict, List, Optional, Set
from typing import Any, Dict, List, Optional, Set

import mock

Expand Down Expand Up @@ -151,6 +151,10 @@ def init_mock_clients(self):
return_type=mock_data_utils.MockReturnType.SIDE_EFFECT.value,
return_obj=self.add_host_to_business_idle_func,
),
batch_update_host_all_properties_return=mock_data_utils.MockReturn(
return_type=mock_data_utils.MockReturnType.RETURN_VALUE.value,
return_obj={},
),
)

def fetch_succeeded_sub_inst_ids(self) -> List[int]:
Expand Down Expand Up @@ -325,3 +329,79 @@ def assert_in_teardown(self):
)
)
super().assert_in_teardown()


class UnassignedHostsTestCase(AddOrUpdateHostsTestCase):
    """Test case: the hosts to be updated sit in a control area configured as "unassigned" (ID 900001)."""

    @classmethod
    def get_default_case_name(cls) -> str:
        """Return the display name of this test case."""
        return "未分配管控区域主机"

    @classmethod
    def structure_cmdb_mock_data(cls):
        """
        Build the mocked CMDB responses.

        Reuses the parent's mock data, then makes the "list hosts without biz"
        result empty.
        :return:
        """
        super().structure_cmdb_mock_data()
        cls.list_hosts_without_biz_result = {"count": 0, "info": []}

    @classmethod
    def adjust_test_data_in_db(cls):
        """Register 900001 as an unassigned control area and move the to-be-updated hosts into it."""
        unassigned_cloud_id = 900001
        models.GlobalSettings.set_config(
            models.GlobalSettings.KeyEnum.UNASSIGNED_BK_CLOUD_ID.value, [unassigned_cloud_id]
        )
        hosts_to_move = models.Host.objects.filter(bk_host_id__in=cls.to_be_updated_host_ids)
        hosts_to_move.update(bk_cloud_id=unassigned_cloud_id)

    def assert_in_teardown(self):
        """Verify that none of the to-be-updated hosts still has the unassigned cloud ID."""
        still_unassigned = models.Host.objects.filter(
            bk_host_id__in=self.to_be_updated_host_ids, bk_cloud_id=900001
        )
        # Every host must have left the unassigned control area.
        self.assertEqual(still_unassigned.count(), 0)


class UnassignedDuplicateHostsTestCase(UnassignedHostsTestCase):
    """
    Test case: hosts sit in an unassigned control area and CMDB already
    reports a host matching one of them (a STATIC-addressed one).
    """

    @classmethod
    def get_default_case_name(cls) -> str:
        """Return the display name of this test case."""
        return "未分配管控区域主机,存在相同主机"

    @classmethod
    def structure_cmdb_mock_data(cls):
        """
        Build the mocked CMDB responses.

        On top of the parent's data, the "list hosts without biz" result is
        made to return one host copied from the first STATIC-addressed host
        among the to-be-updated hosts. The instance ID derived from that host
        is remembered in ``cls.error_instance_id``.
        :return:
        """
        super().structure_cmdb_mock_data()
        host = models.Host.objects.filter(bk_host_id__in=cls.to_be_updated_host_ids, bk_addressing="STATIC").first()
        host_info: Dict = copy.deepcopy(api_mkd.cmdb.unit.CMDB_HOST_INFO)
        host_info.update(
            {
                "bk_host_id": host.bk_host_id,
                "bk_addressing": host.bk_addressing,
                "bk_host_innerip": host.inner_ip,
                # IPv4 outer address; the IPv6 one belongs to bk_host_outerip_v6 below.
                "bk_host_outerip": host.outer_ip,
                "bk_host_innerip_v6": host.inner_ipv6,
                "bk_host_outerip_v6": host.outer_ipv6,
            }
        )
        cls.list_hosts_without_biz_result = {"count": 1, "info": [host_info]}
        cls.error_instance_id = f"host|instance|host|{host.inner_ip}-{host.bk_cloud_id}-0"

    def assert_in_teardown(self):
        """Verify that exactly one to-be-updated host still has the unassigned cloud ID."""
        not_updated_count = models.Host.objects.filter(
            bk_host_id__in=self.to_be_updated_host_ids, bk_cloud_id=900001
        ).count()
        # Exactly one host is expected to remain in the unassigned control area.
        self.assertEqual(not_updated_count, 1)

    def structure_common_inputs(self) -> Dict[str, Any]:
        """
        Build the common inputs of the component; the base input data follows
        apps/backend/components/collections/base.py inputs_format.

        The subscription instance whose instance_id equals
        ``error_instance_id`` is left out of ``subscription_instance_ids``.
        :return:
        """
        inputs = super().structure_common_inputs()
        subscription_instance_ids = [
            sub_inst_obj.id
            for sub_inst_obj in self.obj_factory.sub_inst_record_objs
            if sub_inst_obj.instance_id != self.error_instance_id
        ]
        inputs["subscription_instance_ids"] = subscription_instance_ids
        return inputs
4 changes: 4 additions & 0 deletions apps/mock_data/api_mkd/cmdb/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def __init__(
find_host_biz_relations_return=None,
push_host_identifier_return=None,
find_host_identifier_push_result_return=None,
batch_update_host_all_properties_return=None,
):
super(CCApiMockClient, self).__init__()
self.search_business = self.generate_magic_mock(mock_return_obj=search_business_return)
Expand All @@ -63,6 +64,9 @@ def __init__(
self.find_host_identifier_push_result = self.generate_magic_mock(
mock_return_obj=find_host_identifier_push_result_return
)
self.batch_update_host_all_properties = self.generate_magic_mock(
mock_return_obj=batch_update_host_all_properties_return
)

# 记录接口调用
self.bind_host_agent = self.call_recorder.start(self.bind_host_agent, key=CCApi.bind_host_agent)
Expand Down
9 changes: 8 additions & 1 deletion apps/node_man/handlers/host.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
specific language governing permissions and limitations under the License.
"""
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Set
from typing import Any, Dict, Iterable, List, Optional, Set

from django.db.models import Count, Q
from django.utils import timezone
Expand All @@ -33,6 +33,7 @@
from apps.node_man.models import (
AccessPoint,
Cloud,
GlobalSettings,
Host,
IdentityData,
InstallChannel,
Expand Down Expand Up @@ -237,6 +238,11 @@ def list(self, params: dict, username: str):
# 获得拓扑结构数据
topology = CmdbHandler().cmdb_or_cache_topo(username, user_biz, biz_host_id_map)

# 获取未分配管控区域
unassigned_bk_cloud_id: List[Optional(int)] = GlobalSettings.get_config(
key=GlobalSettings.KeyEnum.UNASSIGNED_BK_CLOUD_ID.value, default=[]
)

# 汇总
for hs in hosts_status:
hs["status_display"] = const.PROC_STATUS_CHN.get(hs["status"], "")
Expand All @@ -247,6 +253,7 @@ def list(self, params: dict, username: str):
hs["job_result"] = host_id_job_status.get(hs["bk_host_id"], {})
hs["topology"] = topology.get(hs["bk_host_id"], [])
hs["operate_permission"] = hs["bk_biz_id"] in agent_operate_bizs
hs["is_unassigned"] = True if int(hs["bk_cloud_id"]) in unassigned_bk_cloud_id else False

result = {"total": hosts_status_count, "list": hosts_status}

Expand Down
30 changes: 29 additions & 1 deletion apps/node_man/handlers/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,13 @@
ProxyNotAvaliableError,
)
from apps.node_man.handlers.cloud import CloudHandler
from apps.node_man.models import AccessPoint, Host, IdentityData, ProcessStatus
from apps.node_man.models import (
AccessPoint,
GlobalSettings,
Host,
IdentityData,
ProcessStatus,
)


def check_available_proxy():
Expand Down Expand Up @@ -378,13 +384,17 @@ def operate_ip_checker(
error_host: typing.Dict[str, typing.Dict],
op_type: str,
node_type: str,
unassigned_bk_host_ids: typing.List[typing.Optional[int]],
):

host_infos_with_the_same_ips: typing.List[
typing.Dict[str, typing.Any]
] = tools.HostV2Tools.get_host_infos_with_the_same_ips(
host_infos_gby_ip_key=host_infos_gby_ip_key, host_info=host_info, ip_field_names=["inner_ip", "inner_ipv6"]
)
bk_host_id: int = host_info.get("bk_host_id")
if bk_host_id and bk_host_id in unassigned_bk_host_ids:
return True

if not host_infos_with_the_same_ips:
error_host["msg"] = _("尚未被安装,无法执行 {op_type} 操作").format(op_type=const.JOB_TYPE_DICT[op_type + "_" + node_type])
Expand Down Expand Up @@ -479,6 +489,14 @@ def install_validate(
else:
host_id__agent_state_info_map = {}

# 查询未分配主机
unassigned_cloud_ids: typing.List[typing.Optional[int]] = GlobalSettings.get_config(
key=GlobalSettings.KeyEnum.UNASSIGNED_BK_CLOUD_ID.value, default=[]
)
unassigned_bk_host_ids: typing.List[typing.Optional[int]] = Host.objects.filter(
bk_host_id__in=[host.get("bk_host_id") for host in hosts], bk_cloud_id__in=unassigned_cloud_ids
).values_list("bk_host_id", flat=True)

for host in hosts:
ap_id = host.get("ap_id")
bk_biz_id = host["bk_biz_id"]
Expand Down Expand Up @@ -509,6 +527,15 @@ def install_validate(
if bk_cloud_id != const.DEFAULT_CLOUD and bk_cloud_id not in cloud_info:
raise CloudNotExistError(_("管控区域(ID:{bk_cloud_id}) 不存在").format(bk_cloud_id=bk_cloud_id))

# 检查:管控区域是否为未分配
if bk_cloud_id in unassigned_cloud_ids:
error_host["msg"] = _("管控区域(ID:{bk_cloud_id}) 为【未分配】的管控区域不允许安装Agent, 再选择可用管控区域").format(
bk_cloud_id=bk_cloud_id
)
error_host["exception"] = "cloud_unassigned"
ip_filter_list.append(error_host)
continue

# 检查:直连区域不允许安装 PROXY
if bk_cloud_id == const.DEFAULT_CLOUD and node_type == const.NodeType.PROXY:
raise ProxyNotAvaliableError(
Expand Down Expand Up @@ -556,6 +583,7 @@ def install_validate(
error_host=error_host,
op_type=op_type,
node_type=node_type,
unassigned_bk_host_ids=unassigned_bk_host_ids,
)

if is_check_pass:
Expand Down
Loading

0 comments on commit dcb4af9

Please sign in to comment.