Skip to content

Commit 0821f9c

Browse files
committed
CP-54481: support DMV RPU plugin
add unit test for DMV code Signed-off-by: Chunjie Zhu <[email protected]>
1 parent 2bb35de commit 0821f9c

File tree

2 files changed

+313
-49
lines changed

2 files changed

+313
-49
lines changed

tests/test_dmv.py

Lines changed: 280 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,280 @@
1+
"""tests/test_dmv.py: Unit test for xcp/dmv.py"""
2+
import unittest
3+
from unittest import mock
4+
import types
5+
import json
6+
import errno
7+
from typing import Dict, Union
8+
from xcp import dmv
9+
10+
11+
class TestDMV(unittest.TestCase):
12+
@mock.patch("os.listdir")
13+
def test_get_all_kabi_dirs(self, m_listdir):
14+
m_listdir.return_value = ["4.19.0", "5.10.0"]
15+
dirs = dmv.get_all_kabi_dirs()
16+
self.assertIn(("4.19.0", "/lib/modules/4.19.0/updates", "/lib/modules/4.19.0/dmv"), dirs)
17+
self.assertIn(("5.10.0", "/lib/modules/5.10.0/updates", "/lib/modules/5.10.0/dmv"), dirs)
18+
19+
def test_note_offset(self):
20+
self.assertEqual(dmv.note_offset(1), 3)
21+
self.assertEqual(dmv.note_offset(4), 0)
22+
self.assertEqual(dmv.note_offset(5), 3)
23+
24+
def test_id_matches(self):
25+
self.assertTrue(dmv.id_matches("*", "1234"))
26+
self.assertTrue(dmv.id_matches("1234", "*"))
27+
self.assertTrue(dmv.id_matches("1234", "1234"))
28+
self.assertFalse(dmv.id_matches("1234", "5678"))
29+
30+
def test_pci_matches_true(self):
31+
present = {"vendor": "14e4", "device": "163c", "subvendor": "*", "subdevice": "*"}
32+
driver_pci_ids = {
33+
"abc.ko": [
34+
{"vendor_id": "14e4", "device_id": "163c", "subvendor_id": "*", "subdevice_id": "*"}
35+
]
36+
}
37+
self.assertTrue(dmv.pci_matches(present, driver_pci_ids))
38+
39+
def test_pci_matches_false(self):
40+
present = {"vendor": "abcd", "device": "9999", "subvendor": "*", "subdevice": "*"}
41+
driver_pci_ids = {
42+
"abc.ko": [
43+
{"vendor_id": "14e4", "device_id": "163c", "subvendor_id": "*", "subdevice_id": "*"}
44+
]
45+
}
46+
self.assertFalse(dmv.pci_matches(present, driver_pci_ids))
47+
48+
@mock.patch("re.compile")
49+
def test_hardware_present_true(self, m_compile):
50+
m = mock.Mock()
51+
m.finditer.return_value = [
52+
mock.Mock(groupdict=lambda: {"vendor": "14e4", "device": "163c", "subvendor": "*", "subdevice": "*"})
53+
]
54+
m_compile.return_value = m
55+
pci_ids = {
56+
"abc.ko": [
57+
{"vendor_id": "14e4", "device_id": "163c", "subvendor_id": "*", "subdevice_id": "*"}
58+
]
59+
}
60+
self.assertTrue(dmv.hardware_present("dummy", pci_ids))
61+
62+
@mock.patch("re.compile")
63+
def test_hardware_present_false_01(self, m_compile):
64+
self.assertFalse(dmv.hardware_present("dummy", None))
65+
66+
@mock.patch("re.compile")
67+
def test_hardware_present_false_02(self, m_compile):
68+
m = mock.Mock()
69+
m.finditer.return_value = [
70+
mock.Mock(groupdict=lambda: {"vendor": "abcd", "device": "9999", "subvendor": "*", "subdevice": "*"})
71+
]
72+
m_compile.return_value = m
73+
pci_ids = {
74+
"abc.ko": [
75+
{"vendor_id": "14e4", "device_id": "163c", "subvendor_id": "*", "subdevice_id": "*"}
76+
]
77+
}
78+
self.assertFalse(dmv.hardware_present("dummy", pci_ids))
79+
80+
@mock.patch("os.path.isfile")
81+
@mock.patch("builtins.open", new_callable=mock.mock_open)
82+
@mock.patch("struct.calcsize")
83+
@mock.patch("struct.unpack")
84+
def test_get_active_variant(self, m_unpack, m_calcsize, m_open, m_isfile):
85+
m_isfile.return_value = True
86+
m_calcsize.return_value = 12
87+
m_unpack.return_value = (9, 3, 1)
88+
fake_file = mock.Mock()
89+
fake_file.read.side_effect = [
90+
b"x"*12, # header
91+
b"", # offset
92+
b"XenServer", b"", b"v1\x00", b"", # vendor, offset, content, offset
93+
]
94+
m_open.return_value.__enter__.return_value = fake_file
95+
result = dmv.get_active_variant(["foo.ko"])
96+
self.assertEqual(result, "v1")
97+
98+
m_isfile.return_value = False
99+
result = dmv.get_active_variant(["foo.ko"])
100+
self.assertEqual(result, None)
101+
102+
@mock.patch("os.path.isfile")
103+
def test_get_loaded_modules(self, m_isfile):
104+
m_isfile.side_effect = lambda path: "foo" in path
105+
result = dmv.get_loaded_modules(["foo.ko", "bar.ko"])
106+
self.assertEqual(result, ["foo.ko"])
107+
108+
@mock.patch("os.path.islink")
109+
@mock.patch("os.path.realpath")
110+
@mock.patch("os.path.dirname")
111+
@mock.patch("builtins.open", new_callable=mock.mock_open, read_data='{"variant": "v1"}')
112+
@mock.patch("json.load")
113+
def test_variant_selected(self, m_json_load, m_open, m_dirname, m_realpath, m_islink):
114+
m_islink.return_value = True
115+
m_realpath.return_value = "/some/dir"
116+
m_dirname.return_value = "/some/dir"
117+
m_json_load.return_value = {"variant": "v1"}
118+
d = dmv.DriverMultiVersion("/updates", None)
119+
result = d.variant_selected(["foo.ko"])
120+
self.assertEqual(result, "v1")
121+
122+
m_islink.return_value = False
123+
d = dmv.DriverMultiVersion("/updates", None)
124+
result = d.variant_selected(["foo.ko"])
125+
self.assertEqual(result, None)
126+
127+
@mock.patch("xcp.dmv.open_with_codec_handling")
128+
@mock.patch("xcp.dmv.hardware_present")
129+
def test_parse_dmv_info01(self, m_hw_present, m_open_codec):
130+
m_hw_present.return_value = True
131+
info_json = {
132+
"category": "net",
133+
"name": "foo",
134+
"description": "desc",
135+
"variant": "v1",
136+
"version": "1.0",
137+
"priority": 1,
138+
"status": "ok",
139+
"pci_ids": {
140+
"foo.ko": [
141+
{"vendor_id": "14e4", "device_id": "163c", "subvendor_id": "*", "subdevice_id": "*"}
142+
]
143+
}
144+
}
145+
m_open_codec.return_value.__enter__.return_value = mock.Mock(
146+
spec=["read"], read=lambda: json.dumps(info_json)
147+
)
148+
with mock.patch("json.load", return_value=info_json):
149+
lspci_out = types.SimpleNamespace(stdout="dummy")
150+
d = dmv.DriverMultiVersion("", lspci_out)
151+
json_data, json_formatted = d.parse_dmv_info("dummy")
152+
self.assertEqual(json_data["name"], "foo")
153+
self.assertEqual(json_formatted["type"], "net")
154+
self.assertTrue(json_formatted["variants"]["v1"]["hardware_present"])
155+
156+
@mock.patch("xcp.dmv.open_with_codec_handling")
157+
@mock.patch("xcp.dmv.hardware_present")
158+
@mock.patch("xcp.dmv.get_active_variant")
159+
def test_parse_dmv_info02(self, m_active_variant, m_hw_present, m_open_codec):
160+
m_active_variant.return_value = "foo"
161+
m_hw_present.return_value = True
162+
info_json = {
163+
"category": "net",
164+
"name": "foo",
165+
"description": "desc",
166+
"variant": "v1",
167+
"version": "1.0",
168+
"priority": 1,
169+
"status": "ok",
170+
"pci_ids": {
171+
"foo.ko": [
172+
{"vendor_id": "14e4", "device_id": "163c", "subvendor_id": "*", "subdevice_id": "*"}
173+
]
174+
}
175+
}
176+
m_open_codec.return_value.__enter__.return_value = mock.Mock(
177+
spec=["read"], read=lambda: json.dumps(info_json)
178+
)
179+
with mock.patch("json.load", return_value=info_json):
180+
lspci_out = types.SimpleNamespace(stdout="dummy")
181+
d = dmv.DriverMultiVersion("", lspci_out, runtime=True)
182+
json_data, json_formatted = d.parse_dmv_info("dummy")
183+
self.assertEqual(json_data["name"], "foo")
184+
self.assertEqual(json_formatted["type"], "net")
185+
self.assertTrue(json_formatted["variants"]["v1"]["hardware_present"])
186+
self.assertEqual(json_formatted["active"], "foo")
187+
188+
def test_merge_jsondata(self):
189+
oldone = {
190+
"type": "net",
191+
"friendly_name": "foo",
192+
"description": "desc",
193+
"info": "foo",
194+
"variants": {"v1": {"version": "1.0"}},
195+
"selected": "v1",
196+
"active": "v1",
197+
"loaded modules": ["foo.ko"]
198+
}
199+
newone = {
200+
"type": "net",
201+
"friendly_name": "foo",
202+
"description": "desc",
203+
"info": "foo",
204+
"variants": {"v2": {"version": "2.0"}},
205+
"selected": None,
206+
"active": None,
207+
"loaded modules": ["bar.ko"]
208+
}
209+
mgr = dmv.DriverMultiVersionManager(runtime=True)
210+
mgr.merge_jsondata(oldone, newone)
211+
merged = mgr.dmv_list["drivers"]["foo"]
212+
self.assertIn("v1", merged["variants"])
213+
self.assertIn("v2", merged["variants"])
214+
self.assertEqual(merged["selected"], "v1")
215+
self.assertEqual(merged["active"], "v1")
216+
self.assertEqual(merged["loaded modules"], ["foo.ko", "bar.ko"])
217+
218+
oldobj: Dict[str, Union[str, Dict, None] = {
219+
"type": "storage",
220+
"friendly_name": "foo",
221+
"description": "desc",
222+
"info": "foo",
223+
"variants": {"v1": {"version": "1.0"}},
224+
"selected": None,
225+
"active": None,
226+
"loaded modules": ["foo.ko"]
227+
}
228+
newobj = {
229+
"type": "storage",
230+
"friendly_name": "foo",
231+
"description": "desc",
232+
"info": "foo",
233+
"variants": {"v2": {"version": "2.0"}},
234+
"selected": "v2",
235+
"active": "v2",
236+
"loaded modules": ["bar.ko"]
237+
}
238+
mgr = dmv.DriverMultiVersionManager(runtime=True)
239+
mgr.merge_jsondata(oldobj, newobj)
240+
merged = mgr.dmv_list["drivers"]["foo"]
241+
self.assertIn("v1", merged["variants"])
242+
self.assertIn("v2", merged["variants"])
243+
self.assertEqual(merged["selected"], "v2")
244+
self.assertEqual(merged["active"], "v2")
245+
self.assertEqual(merged["loaded modules"], ["foo.ko", "bar.ko"])
246+
247+
def test_process_dmv_data(self):
248+
mgr = dmv.DriverMultiVersionManager()
249+
json_data = {"name": "foo"}
250+
json_formatted = {"type": "net"}
251+
mgr.process_dmv_data(json_data, json_formatted)
252+
self.assertEqual(mgr.dmv_list["drivers"]["foo"], json_formatted)
253+
254+
@mock.patch("xcp.dmv.subprocess.run")
255+
@mock.patch("xcp.dmv.glob.glob")
256+
@mock.patch("xcp.dmv.os.makedirs")
257+
@mock.patch("xcp.dmv.os.symlink")
258+
@mock.patch("xcp.dmv.os.rename")
259+
@mock.patch("xcp.dmv.os.path.join", side_effect=lambda *args: "/".join(args))
260+
@mock.patch("xcp.dmv.get_all_kabi_dirs")
261+
def test_create_dmv_symlink(self, m_get_dirs, m_join, m_rename, m_symlink, m_makedirs, m_glob, m_run):
262+
m_get_dirs.return_value = [("5.10.0", "/lib/modules/5.10.0/updates", "/lib/modules/5.10.0/dmv")]
263+
m_glob.return_value = ["/lib/modules/5.10.0/dmv/foo/1.0/bar.ko"]
264+
m_symlink.side_effect = None
265+
m_rename.side_effect = None
266+
m_run.side_effect = None
267+
268+
mgr = dmv.DriverMultiVersionManager()
269+
result = mgr.create_dmv_symlink("foo", "1.0")
270+
self.assertTrue(result)
271+
m_makedirs.assert_called_with("/lib/modules/5.10.0/updates", exist_ok=True)
272+
m_symlink.assert_called()
273+
m_rename.assert_called()
274+
275+
def test_get_set_error(self):
276+
mgr = dmv.DriverMultiVersionManager()
277+
mgr.set_dmv_error(errno.ENOENT)
278+
err = mgr.get_dmv_error()
279+
self.assertEqual(err["exit_code"], errno.ENOENT)
280+
self.assertIn("No such file", err["message"])

xcp/dmv.py

Lines changed: 33 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import struct
2929
import glob
3030
import errno
31+
from typing import Any, Dict
3132

3233
from .compat import open_with_codec_handling
3334

@@ -91,37 +92,36 @@ def id_matches(id1, id2):
9192
return True
9293
return id1 == id2
9394

94-
'''
95-
driver_pci_ids example:
96-
{
97-
"abc.ko": [
98-
{
99-
"vendor_id": "14e4",
100-
"device_id": "163c",
101-
"subvendor_id": "*",
102-
"subdevice_id": "*"
103-
},
104-
{
105-
"vendor_id": "14e4",
106-
"device_id": "163b",
107-
"subvendor_id": "*",
108-
"subdevice_id": "*"
109-
}],
110-
"de.ko": [
111-
{
112-
"vendor_id": "eees",
113-
"device_id": "163c",
114-
"subvendor_id": "*",
115-
"subdevice_id": "*"
116-
},
117-
{
118-
"vendor_id": "14f4",
119-
"device_id": "16db",
120-
"subvendor_id": "2123",
121-
"subdevice_id": "1123"
122-
}]
123-
}
124-
'''
95+
96+
# driver_pci_ids example:
97+
# {
98+
# "abc.ko": [
99+
# {
100+
# "vendor_id": "14e4",
101+
# "device_id": "163c",
102+
# "subvendor_id": "*",
103+
# "subdevice_id": "*"
104+
# },
105+
# {
106+
# "vendor_id": "14e4",
107+
# "device_id": "163b",
108+
# "subvendor_id": "*",
109+
# "subdevice_id": "*"
110+
# }],
111+
# "de.ko": [
112+
# {
113+
# "vendor_id": "eees",
114+
# "device_id": "163c",
115+
# "subvendor_id": "*",
116+
# "subdevice_id": "*"
117+
# },
118+
# {
119+
# "vendor_id": "14f4",
120+
# "device_id": "16db",
121+
# "subvendor_id": "2123",
122+
# "subdevice_id": "1123"
123+
# }]
124+
# }
125125
def pci_matches(present_pci_id, driver_pci_ids):
126126
"""Check if present PCI ID matches any of the driver PCI IDs."""
127127
merged_driver_pci_id_list = []
@@ -170,22 +170,6 @@ def hardware_present(lspci_out, pci_ids):
170170
return True
171171
return False
172172

173-
def variant_selected(modules, updates_dir):
174-
"""Check and return which driver is selected"""
175-
# Check if any module in the modules is selected
176-
for module in modules:
177-
slink_file = os.path.join(updates_dir, module)
178-
if os.path.islink(slink_file):
179-
module_path = os.path.realpath(slink_file)
180-
module_dir = os.path.dirname(module_path)
181-
info_file = os.path.join(module_dir, "info.json")
182-
with open(info_file, "r", encoding="ascii") as json_file:
183-
json_data = json.load(json_file)
184-
variant = json_data["variant"]
185-
186-
return variant
187-
return None
188-
189173
class DriverMultiVersion(object):
190174
def __init__(self, updates_dir, lspci_out, runtime=False):
191175
self.updates_dir = updates_dir
@@ -242,7 +226,7 @@ def __init__(self, runtime=False):
242226
"protocol": {"version": dmv_proto_ver},
243227
"operation": {"reboot": False},
244228
"drivers": {}
245-
}
229+
} # type: Dict[str, Any]
246230
self.errors_list = {
247231
"version": err_proto_ver,
248232
"exit_code": 0,
@@ -296,7 +280,7 @@ def parse_dmv_list(self):
296280
for _, updates_dir, dmv_dir in get_all_kabi_dirs():
297281
if not os.path.isdir(dmv_dir):
298282
continue
299-
283+
300284
for path, _, files in os.walk(dmv_dir):
301285
if "info.json" not in files:
302286
continue

0 commit comments

Comments
 (0)