import os
import re
import ast

from avocado.utils import process

from virttest import data_dir
from virttest import libvirt_version
from virttest import virsh
from virttest.libvirt_xml import vm_xml
from virttest.utils_test import libvirt

from provider.snapshot import snapshot_base
from provider.virtual_disk import disk_base


# With ignore_status=False, every failing virsh call raises an exception,
# so the test relies on exceptions instead of explicit return-code checks.
virsh_dargs = {"debug": True, "ignore_status": False}


def prepare_guest(params, test, disk_obj):
    """
    Prepare guest with additional disks.

    :param params: dict, test parameters
    :param test: test object
    :param disk_obj: disk object for adding disks to VM
    :return: dict of disk paths {disk_name: disk_path}
    """
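    # Hypothetical cfg values, shown only to illustrate the shapes this
    # helper expects (actual values come from the test cfg):
    #   image_size = "100M"
    #   disk_list = "['vda', 'vdb', 'vdc']"
    #   disk_dict = "{'target': {'dev': '%s', 'bus': 'virtio'}}"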
    image_size = params.get("image_size")
    disk_type = params.get("disk_type", "file")
    disk_list = ast.literal_eval(params.get("disk_list", "[]"))
    vm_name = params.get("main_vm")
    disk_paths = {}

    vm_xml_obj = vm_xml.VMXML.new_from_dumpxml(vm_name)
    disk_sources = disk_base.DiskBase.get_source_list(vm_xml_obj, "file", "vda")
    disk_paths["vda"] = disk_sources[0] if disk_sources else None
    disk_paths["vdb"] = os.path.join(data_dir.get_tmp_dir(), "test1.qcow2")
    disk_paths["vdc"] = os.path.join(data_dir.get_tmp_dir(), "test2.qcow2")

    for disk_name in ["vdb", "vdc"]:
        img_path = disk_paths[disk_name]
        libvirt.create_local_disk(disk_type, path=img_path, size=image_size, disk_format="qcow2")
        test.log.debug(f"Created image {img_path} with size {image_size}")

    for target_disk in disk_list:
        if target_disk == "vda":
            # vda is the guest's original system disk and is already present
            continue
        disk_dict = ast.literal_eval(params.get("disk_dict") % target_disk)
        disk_obj.add_vm_disk(disk_type, disk_dict, disk_paths[target_disk])

    return disk_paths


def create_snapshots(params, test, disk_paths):
    """
    Create manual snapshots with diskspec options.

    :param params: dict, test parameters
    :param test: test object
    :param disk_paths: dict of disk paths {disk_name: disk_path}
    """
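    # Hypothetical cfg values illustrating the command assembled below
    # (actual values come from the test cfg):
    #   snap_name = "snap1"
    #   snap_options = "--disk-only"
    #   diskspec_options = "--diskspec vda,snapshot=manual"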
    test.log.info("TEST_STEP3: Create snapshots based on the snapshot command.")
    vm_name = params.get("main_vm")
    snap_name = params.get("snap_name")
    snap_options = params.get("snap_options")
    diskspec_options = params.get("diskspec_options")

    options_parts = [snap_name, snap_options, diskspec_options]
    full_snap_options = " ".join(part for part in options_parts if part)
    virsh.snapshot_create_as(vm_name, full_snap_options, **virsh_dargs)
    test.log.debug(f"Created snapshot {snap_name} with options: {full_snap_options}")


def check_bitmap_auto_flag(result, checkpoint_name):
    """
    Check whether the checkpoint's bitmap carries the auto flag in
    qemu-img info output.

    :param result: qemu-img info output string
    :param checkpoint_name: name of the checkpoint to check
    :return: True if the auto flag is found, False otherwise
    """
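    # Representative (version-dependent) layout of the bitmaps section this
    # parser expects; the "flags" entries precede the "name" key:
    #
    #     bitmaps:
    #         [0]:
    #             flags:
    #                 [0]: auto
    #             name: <checkpoint_name>
    #             granularity: 65536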
    lines = result.split('\n')
    in_bitmaps_section = False
    current_bitmap_has_auto = False
    current_bitmap_name = None

    for line in lines:
        line = line.strip()

        # Detect the start of the bitmaps section
        if line == "bitmaps:":
            in_bitmaps_section = True
            continue

        # Skip everything before the bitmaps section
        if not in_bitmaps_section:
            continue

        # A bare "[N]:" line starts a new bitmap entry
        if re.match(r'^\[\d+\]:$', line):
            # Reset state for the new bitmap
            current_bitmap_has_auto = False
            current_bitmap_name = None
        # A flag entry, e.g. "[0]: auto" (matched precisely so a bitmap
        # whose *name* merely contains "auto" is not miscounted)
        elif re.match(r'^\[\d+\]:\s+auto$', line):
            current_bitmap_has_auto = True
        # The name entry follows the flags
        elif line.startswith("name:"):
            current_bitmap_name = line.split("name:", 1)[1].strip()
            # The target checkpoint's bitmap must carry the auto flag
            if current_bitmap_name == checkpoint_name and current_bitmap_has_auto:
                return True
    return False


def check_image_info(params, test, disk_list, disk_paths):
    """
    Check test results for bitmap flags in qemu images.

    Expected results:
    - only_one_manual=yes: only the bitmaps of vdb and vdc carry the auto
      flag in the qemu-img info output
    - with_multiple_manual=yes: the bitmaps of vda, vdb and vdc all carry
      the auto flag in the qemu-img info output

    :param params: dict, test parameters
    :param test: test object
    :param disk_list: list of disk names from the test cfg
    :param disk_paths: dict of disk paths {disk_name: disk_path}
    """
    checkpoint_name = params.get("checkpoint_name")
    only_one_manual = params.get("only_one_manual", "no") == "yes"
    with_multiple_manual = params.get("with_multiple_manual", "no") == "yes"

    # Determine which disks are expected to carry an auto bitmap flag
    # in this test variant
    if only_one_manual:
        expected_auto_disks = disk_list[1:]
    elif with_multiple_manual:
        expected_auto_disks = disk_list
    else:
        expected_auto_disks = []

    for disk in expected_auto_disks:
        img_path = disk_paths.get(disk)
        if not img_path:
            test.fail(f"Could not find path for disk {disk} in disk_paths")
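        # -U (--force-share) lets qemu-img read the image metadata while the
        # running guest still holds the qemu write lock on the file.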
        cmd = f"qemu-img info {img_path} -U"
        result = process.run(cmd, ignore_status=False).stdout_text
        test.log.debug(f"Image info for {disk} ({img_path}): {result}")

        if checkpoint_name not in result:
            test.fail(f"Expected checkpoint {checkpoint_name} in image info for disk {disk}, but not found")

        if not check_bitmap_auto_flag(result, checkpoint_name):
            test.fail(f"Expected 'auto' flag in bitmap for disk {disk}, but not found")
        test.log.info(f"For disk {disk}: Found expected 'auto' flag in bitmap")

    test.log.info("Test results validation completed successfully")


def check_test_result(params, test, disk_paths):
    """
    Check test results including bitmap flags and VM state transitions.

    :param params: dict, test parameters
    :param test: test object
    :param disk_paths: dict of disk paths {disk_name: disk_path}
    """
    test.log.info("TEST_STEP4: Check the test results for bitmap flags.")
    vm_name = params.get("main_vm")
    disk_list = ast.literal_eval(params.get("disk_list", "[]"))

    check_image_info(params, test, disk_list, disk_paths)
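
    # A disk marked snapshot=manual makes libvirt pause the guest once the
    # snapshot is taken so the disk can be copied out of band; the domain
    # stays paused until it is explicitly resumed.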
    test.log.info("TEST_STEP5: Check the guest status (should be paused).")
    domstate = virsh.domstate(vm_name, **virsh_dargs)
    if "paused" not in domstate.stdout.lower():
        test.fail(f"Expected VM to be paused after snapshot, but got state: {domstate.stdout}")
    test.log.debug("VM is correctly paused after snapshot creation")

    test.log.info("TEST_STEP6: Resume the guest and check the status again.")
    virsh.resume(vm_name, **virsh_dargs)

    domstate = virsh.domstate(vm_name, **virsh_dargs)
    if "running" not in domstate.stdout.lower():
        test.fail(f"Expected VM to be running after resume, but got state: {domstate.stdout}")
    test.log.debug("VM is correctly running after resume")


def run(test, params, env):
    """
    Create manual snapshot test.
    """
    def run_test():
        """
        Create a manual snapshot and verify the checkpoint bitmap flags.
        """
        nonlocal disk_paths
        test.log.info("TEST_STEP1: Create 2 new images and start guest with them.")
        disk_paths = prepare_guest(params, test, disk_obj)
        if not vm.is_alive():
            virsh.start(vm_name, **virsh_dargs)
        vm.wait_for_login().close()

        test.log.info("TEST_STEP2: Create a checkpoint and restart guest.")
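        # A libvirt checkpoint is backed by a qcow2 dirty bitmap named after
        # the checkpoint in each of the guest's images; check_image_info()
        # later looks for that bitmap's "auto" (recording) flag.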
        virsh.checkpoint_create_as(vm_name, checkpoint_name, **virsh_dargs)
        test.log.debug(f"Created checkpoint {checkpoint_name}")

        virsh.destroy(vm_name, **virsh_dargs)
        virsh.start(vm_name, **virsh_dargs)
        vm.wait_for_login().close()

        create_snapshots(params, test, disk_paths)
        check_test_result(params, test, disk_paths)

    def teardown_test():
        """
        Clean up the test environment.
        """
        test.log.info("TEST_TEARDOWN: Clean up env.")
        try:
            checkpoints = virsh.checkpoint_list(vm_name)
            if checkpoint_name in checkpoints.stdout:
                virsh.checkpoint_delete(vm_name, checkpoint_name)
        except Exception as e:
            test.log.debug(f"Error cleaning checkpoint: {e}")

        # Remove only the images created for the new disks; vda is the
        # guest's original system disk and must be kept
        for disk_name in disk_list[1:]:
            img_path = disk_paths.get(disk_name)
            if img_path and os.path.exists(img_path):
                try:
                    os.remove(img_path)
                except Exception as e:
                    test.log.debug(f"Error removing image {img_path}: {e}")
        test_obj.teardown_test()

    libvirt_version.is_libvirt_feature_supported(params)
    vm_name = params.get("main_vm")
    checkpoint_name = params.get("checkpoint_name")
    snap_name = params.get("snap_name")
    disk_list = ast.literal_eval(params.get("disk_list", "[]"))
    original_xml = vm_xml.VMXML.new_from_inactive_dumpxml(vm_name)
    params['backup_vmxml'] = original_xml.copy()
    vm = env.get_vm(vm_name)

    test_obj = snapshot_base.SnapshotTest(vm, test, params)
    disk_obj = disk_base.DiskBase(test, vm, params)
    disk_paths = {}

    try:
        run_test()
    finally:
        teardown_test()