|
| 1 | +import unittest |
| 2 | +import os |
| 3 | +import time |
| 4 | +import threading |
| 5 | +import random |
| 6 | +from mooncake.store import MooncakeDistributedStore |
| 7 | + |
# Lease TTL (milliseconds) for KV objects. This must match the value the
# master is configured with, otherwise the cleanup sleeps in the tests
# will not line up with the server-side lease expiry.
DEFAULT_DEFAULT_KV_LEASE_TTL = 5000  # 5000 milliseconds
# An environment override takes precedence over the built-in default.
default_kv_lease_ttl = int(os.getenv("DEFAULT_KV_LEASE_TTL", DEFAULT_DEFAULT_KV_LEASE_TTL))
| 13 | + |
| 14 | + |
def get_client(store, local_buffer_size_param=None):
    """Set up *store* (a distributed store client) from environment settings.

    Connection parameters are read from the environment with sensible
    defaults; ``local_buffer_size_param`` overrides the 512 MB default
    local buffer when given.  Raises ``RuntimeError`` if the underlying
    ``setup`` call reports a non-zero return code.
    """
    proto = os.getenv("PROTOCOL", "tcp")
    device = os.getenv("DEVICE_NAME", "ibp6s0")
    hostname = os.getenv("LOCAL_HOSTNAME", "localhost")
    metadata = os.getenv("MC_METADATA_SERVER", "http://127.0.0.1:8080/metadata")

    segment_size = 3200 * 1024 * 1024  # 3200 MB global segment
    if local_buffer_size_param is None:
        buffer_size = 512 * 1024 * 1024  # 512 MB default local buffer
    else:
        buffer_size = local_buffer_size_param
    client_address = "127.0.0.1:50052"

    rc = store.setup(
        hostname,
        metadata,
        segment_size,
        buffer_size,
        proto,
        device,
        client_address,
        use_dummy_client=True,
    )
    if rc:
        raise RuntimeError(f"Failed to setup store client. Return code: {rc}")
| 41 | + |
class TestDistributedObjectStoreSingleStore(unittest.TestCase):
    """Test class for single store operations (no replication).

    All tests share one ``MooncakeDistributedStore`` client, created once in
    ``setUpClass``.  Tests that remove keys first sleep for the KV lease TTL
    so the master's lease on the object has expired before ``remove``.
    """

    @classmethod
    def setUpClass(cls):
        """Initialize the store once for all tests."""
        cls.store = MooncakeDistributedStore()
        get_client(cls.store)

    def test_basic_put_get_exist_operations(self):
        """Test basic Put/Get/Exist operations through the Python interface."""
        test_data = b"Hello, World!"
        key = "test_basic_key"

        # Test Put operation
        self.assertEqual(self.store.put(key, test_data), 0)

        # Verify data through Get operation
        self.assertEqual(self.store.get_size(key), len(test_data))
        retrieved_data = self.store.get(key)
        self.assertEqual(retrieved_data, test_data)

        # Put again with the same key, should succeed
        self.assertEqual(self.store.put(key, test_data), 0)

        # Remove the key (wait for the lease to expire first, or remove
        # would fail with a lease-held error).
        time.sleep(default_kv_lease_ttl / 1000)
        self.assertEqual(self.store.remove(key), 0)

    def test_batch_is_exist_operations(self):
        """Test batch is_exist operations through the Python interface."""
        batch_size = 20
        test_data = b"Hello, Batch World!"

        # Create test keys
        keys = [f"test_batch_exist_key_{i}" for i in range(batch_size)]

        # Put only the first half of the keys
        existing_keys = keys[:batch_size // 2]
        for key in existing_keys:
            self.assertEqual(self.store.put(key, test_data), 0)

        # Test batch_is_exist with mixed existing and non-existing keys
        results = self.store.batch_is_exist(keys)

        # Verify results: one entry per queried key, in order
        self.assertEqual(len(results), len(keys))

        # First half should exist (result = 1)
        for i in range(batch_size // 2):
            self.assertEqual(results[i], 1, f"Key {keys[i]} should exist but got {results[i]}")

        # Second half should not exist (result = 0)
        for i in range(batch_size // 2, batch_size):
            self.assertEqual(results[i], 0, f"Key {keys[i]} should not exist but got {results[i]}")

        # Test with empty keys list
        empty_results = self.store.batch_is_exist([])
        self.assertEqual(len(empty_results), 0)

        # Test with single key
        single_result = self.store.batch_is_exist([existing_keys[0]])
        self.assertEqual(len(single_result), 1)
        self.assertEqual(single_result[0], 1)

        # Test with non-existent key
        non_existent_result = self.store.batch_is_exist(["non_existent_key"])
        self.assertEqual(len(non_existent_result), 1)
        self.assertEqual(non_existent_result[0], 0)

        # Clean up (wait for lease expiry before removal)
        time.sleep(default_kv_lease_ttl / 1000)
        for key in existing_keys:
            self.assertEqual(self.store.remove(key), 0)

    def test_batch_get_into_operations(self):
        """Test batch_get_into operations for multiple keys."""
        import ctypes

        # Test data
        batch_size = 3
        test_data = [
            b"Hello, Batch World 1! " * 100,  # ~2.3KB
            b"Hello, Batch World 2! " * 200,  # ~4.6KB
            b"Hello, Batch World 3! " * 150,  # ~3.5KB
        ]
        keys = [f"test_batch_get_into_key_{i}" for i in range(batch_size)]

        # First, put the test data using regular put operations
        for i, (key, data) in enumerate(zip(keys, test_data)):
            result = self.store.put(key, data)
            self.assertEqual(result, 0, f"Failed to put data for key {key}")

        # Use a large spacing between buffers to avoid any overlap detection
        buffer_spacing = 1024 * 1024  # 1MB spacing between buffers

        # Allocate one large buffer with significant spacing
        total_buffer_size = buffer_spacing * batch_size
        large_buffer_ptr = self.store.alloc_from_mem_pool(total_buffer_size)
        # ctypes view over the raw pool memory so we can inspect it from Python
        large_buffer = (ctypes.c_char * total_buffer_size).from_address(large_buffer_ptr)

        # Register the entire large buffer once
        result = self.store.register_buffer(large_buffer_ptr, total_buffer_size)
        self.assertEqual(result, 0, "Buffer registration should succeed")

        # Create individual buffer views within the large buffer with spacing
        buffers = []
        buffer_ptrs = []
        buffer_sizes = []

        for i, data in enumerate(test_data):
            # Calculate offset with large spacing to avoid any overlap issues
            offset = i * buffer_spacing
            buffer_ptr = large_buffer_ptr + offset

            buffers.append(large_buffer)  # Keep reference to prevent GC
            buffer_ptrs.append(buffer_ptr)
            buffer_sizes.append(buffer_spacing)  # Use full spacing as buffer size

        # Test batch_get_into
        results = self.store.batch_get_into(keys, buffer_ptrs, buffer_sizes)

        # Verify results: positive value = number of bytes read
        self.assertEqual(len(results), batch_size, "Should return result for each key")

        for i, (expected_data, result) in enumerate(zip(test_data, results)):
            self.assertGreater(result, 0, f"batch_get_into should succeed for key {keys[i]}")
            self.assertEqual(result, len(expected_data), f"Should read correct number of bytes for key {keys[i]}")

            # Verify data integrity - read from the correct offset in the large buffer
            offset = i * buffer_spacing
            read_data = bytes(large_buffer[offset:offset + result])
            self.assertEqual(read_data, expected_data, f"Data should match for key {keys[i]}")

        # Test error cases
        # Test with mismatched array sizes
        mismatched_results = self.store.batch_get_into(keys[:2], buffer_ptrs[:3], buffer_sizes[:3])
        self.assertEqual(len(mismatched_results), 2, "Should return results for provided keys")
        for result in mismatched_results:
            self.assertLess(result, 0, "Should fail with mismatched array sizes")

        # Test with empty arrays
        empty_results = self.store.batch_get_into([], [], [])
        self.assertEqual(len(empty_results), 0, "Should return empty results for empty input")

        # Cleanup (wait for lease expiry before removal)
        time.sleep(default_kv_lease_ttl / 1000)
        self.assertEqual(self.store.unregister_buffer(large_buffer_ptr), 0, "Buffer unregistration should succeed")
        for key in keys:
            self.assertEqual(self.store.remove(key), 0)

    def test_batch_put_from_operations(self):
        """Test batch_put_from operations for multiple keys."""
        import ctypes

        # Test data
        batch_size = 3
        test_data = [
            b"Batch Put Data 1! " * 100,  # ~1.8KB
            b"Batch Put Data 2! " * 200,  # ~3.6KB
            b"Batch Put Data 3! " * 150,  # ~2.7KB
        ]
        keys = [f"test_batch_put_from_key_{i}" for i in range(batch_size)]

        # Use a large spacing between buffers to avoid any overlap detection
        buffer_spacing = 1024 * 1024  # 1MB spacing between buffers

        # Allocate one large buffer with significant spacing
        total_buffer_size = buffer_spacing * batch_size
        large_buffer_ptr = self.store.alloc_from_mem_pool(total_buffer_size)
        large_buffer = (ctypes.c_char * total_buffer_size).from_address(large_buffer_ptr)

        # Register the entire large buffer once
        result = self.store.register_buffer(large_buffer_ptr, total_buffer_size)
        self.assertEqual(result, 0, "Buffer registration should succeed")

        # Create individual buffer views within the large buffer with spacing
        buffers = []
        buffer_ptrs = []
        buffer_sizes = []

        for i, data in enumerate(test_data):
            # Calculate offset with large spacing to avoid any overlap issues
            offset = i * buffer_spacing
            buffer_ptr = large_buffer_ptr + offset

            # Copy test data to buffer
            ctypes.memmove(ctypes.c_void_p(buffer_ptr), data, len(data))

            buffers.append(large_buffer)  # Keep reference to prevent GC
            buffer_ptrs.append(buffer_ptr)
            buffer_sizes.append(len(data))  # Use actual data size for put_from

        # Test batch_put_from
        results = self.store.batch_put_from(keys, buffer_ptrs, buffer_sizes)

        # Verify results: 0 = success per key
        self.assertEqual(len(results), batch_size, "Should return result for each key")

        for i, result in enumerate(results):
            self.assertEqual(result, 0, f"batch_put_from should succeed for key {keys[i]}")

        # Verify data was stored correctly using regular get
        for i, (key, expected_data) in enumerate(zip(keys, test_data)):
            retrieved_data = self.store.get(key)
            self.assertEqual(retrieved_data, expected_data, f"Data should match after batch_put_from for key {key}")

        # Test error cases
        # Test with mismatched array sizes
        mismatched_results = self.store.batch_put_from(keys[:2], buffer_ptrs[:3], buffer_sizes[:3])
        self.assertEqual(len(mismatched_results), 2, "Should return results for provided keys")
        for result in mismatched_results:
            self.assertLess(result, 0, "Should fail with mismatched array sizes")

        # Test with empty arrays
        empty_results = self.store.batch_put_from([], [], [])
        self.assertEqual(len(empty_results), 0, "Should return empty results for empty input")

        # Cleanup (wait for lease expiry before removal)
        time.sleep(default_kv_lease_ttl / 1000)
        self.assertEqual(self.store.unregister_buffer(large_buffer_ptr), 0, "Buffer unregistration should succeed")
        for key in keys:
            self.assertEqual(self.store.remove(key), 0)

    # NOTE: the method must start with "test" or unittest never discovers it;
    # the "zzz" infix keeps it alphabetically last so it runs after the others.
    # (The previous name "zzz_test_..." was silently skipped by discovery.)
    def test_zzz_dict_fuzz_e2e(self):
        """End-to-end fuzz test comparing distributed store behavior with dict.

        Performs ~1000 random operations (put, get, remove) with random value
        sizes between 1 byte and 64MB, mirroring every successful mutation
        into a plain dict and asserting the store agrees with it.
        After testing, all keys are removed.
        """
        # Local reference dict to simulate expected dict behavior
        reference = {}
        operations = 1000
        # Use a pool of keys to limit memory consumption
        keys_pool = [f"key_{i}" for i in range(100)]
        # Track which keys have values assigned to ensure consistency
        key_values = {}
        # Fuzz record for debugging in case of errors
        fuzz_record = []
        try:
            for i in range(operations):
                op = random.choice(["put", "get", "remove"])
                key = random.choice(keys_pool)
                if op == "put":
                    # If key already exists, use the same value to ensure consistency
                    if key in key_values:
                        value = key_values[key]
                        size = len(value)
                    else:
                        size = random.randint(1, 64 * 1024 * 1024)
                        value = os.urandom(size)
                        key_values[key] = value

                    fuzz_record.append(f"{i}: put {key} [size: {size}]")
                    error_code = self.store.put(key, value)
                    if error_code == -200:
                        # The space is not enough, continue to next operation
                        continue
                    elif error_code == 0:
                        reference[key] = value
                    else:
                        raise RuntimeError(f"Put operation failed for key {key}. Error code: {error_code}")
                elif op == "get":
                    fuzz_record.append(f"{i}: get {key}")
                    retrieved = self.store.get(key)
                    if retrieved != b"":  # Otherwise the key may have been evicted
                        expected = reference.get(key, b"")
                        self.assertEqual(retrieved, expected)
                elif op == "remove":
                    fuzz_record.append(f"{i}: remove {key}")
                    error_code = self.store.remove(key)
                    # if remove did not fail due to the key has a lease
                    if error_code != -706:
                        reference.pop(key, None)
                        # Also remove from key_values to allow new value if key is reused
                        key_values.pop(key, None)
        except Exception as e:
            # Dump the full operation log before re-raising so the failing
            # sequence can be replayed.
            print(f"Error: {e}")
            print('\nFuzz record (operations so far):')
            for record in fuzz_record:
                print(record)
            raise  # bare raise preserves the original traceback
        # Cleanup: ensure all remaining keys are removed
        time.sleep(default_kv_lease_ttl / 1000)
        for key in list(reference.keys()):
            self.store.remove(key)

    def test_replicate_config_creation_and_properties(self):
        """Test ReplicateConfig class creation and property access."""
        from mooncake.store import ReplicateConfig

        # Test default constructor
        config = ReplicateConfig()
        self.assertEqual(config.replica_num, 1)
        self.assertEqual(config.with_soft_pin, False)
        self.assertEqual(config.preferred_segment, "")

        # Test property assignment
        config.replica_num = 3
        config.with_soft_pin = True
        config.preferred_segment = "node1:12345"

        self.assertEqual(config.replica_num, 3)
        self.assertEqual(config.with_soft_pin, True)
        self.assertEqual(config.preferred_segment, "node1:12345")

        # Test string representation
        config_str = str(config)
        self.assertIsInstance(config_str, str)
        self.assertIn("3", config_str)  # Should contain replica_num
| 353 | + |
if __name__ == '__main__':
    # Verbose output shows each test as it runs; stop the suite at the
    # first failure.
    unittest.main(failfast=True, verbosity=2)