Coverage for diffoscope/comparators/fsimage.py: 81%

84 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-04-07 13:38 +0000

1# 

2# diffoscope: in-depth comparison of files, archives, and directories 

3# 

4# Copyright © 2015 Reiner Herrmann <reiner@reiner-h.de> 

5# Copyright © 2016-2021, 2023 Chris Lamb <lamby@debian.org> 

6# 

7# diffoscope is free software: you can redistribute it and/or modify 

8# it under the terms of the GNU General Public License as published by 

9# the Free Software Foundation, either version 3 of the License, or 

10# (at your option) any later version. 

11# 

12# diffoscope is distributed in the hope that it will be useful, 

13# but WITHOUT ANY WARRANTY; without even the implied warranty of 

14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

15# GNU General Public License for more details. 

16# 

17# You should have received a copy of the GNU General Public License 

18# along with diffoscope. If not, see <https://www.gnu.org/licenses/>. 

19 

20import re 

21import logging 

22import os.path 

23 

24from diffoscope.difference import Difference 

25from diffoscope.tools import python_module_missing 

26from diffoscope.profiling import profile 

27from diffoscope.tools import get_comment_for_missing_python_module 

28 

29from .utils.file import File 

30from .utils.archive import Archive 

31 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# guestfs is an optional dependency: when the import fails the module still
# loads, the name is bound to None so callers can test ``if not guestfs``,
# and the missing module is reported through python_module_missing().
try:
    import guestfs
except ImportError:
    guestfs = None
    python_module_missing("guestfs")

39 

40 

class FsImageContainer(Archive):
    """Present a raw filesystem image as an archive by way of guestfs.

    The image is attached read-only to a guestfs handle, its first device is
    mounted at "/", and the whole tree is exposed as a single tar member.
    """

    def open_archive(self):
        """Launch guestfs on the source image and mount its first device.

        Returns:
            ``self`` on success; ``None`` when the ``guestfs`` module is not
            available, the launch fails, no device is listed, or the device
            cannot be mounted.
        """
        if not guestfs:
            return None

        self.g = guestfs.GuestFS(python_return_dict=True)
        try:
            # force a check that LIBGUESTFS_CACHEDIR exists. otherwise guestfs
            # will fall back to /var/tmp, which we don't want
            self.g.set_cachedir(os.environ["LIBGUESTFS_CACHEDIR"])
        except KeyError:
            # Variable not set: leave the handle's cache directory untouched.
            pass
        self.g.add_drive_opts(self.source.path, format="raw", readonly=1)
        try:
            logger.debug("Launching guestfs; this may take some time")
            with profile("command", "guestfs"):
                self.g.launch()
            logger.debug("guestfs successfully launched")
        except RuntimeError:
            logger.exception("guestfs failed to launch")
            logger.error(
                "If memory is too tight for 512 MiB, try running "
                "with LIBGUESTFS_MEMSIZE=256 or lower."
            )
            return None

        devices = self.g.list_devices()
        if not devices:
            # Guard the devices[0] lookups below against an empty list
            # instead of letting an IndexError escape.
            logger.error("guestfs did not list any device for the image")
            return None
        device = devices[0]

        try:
            self.g.mount_options("ro", device, "/")
        except RuntimeError:
            logger.exception("guestfs could not mount image; invalid file?")
            return None
        # Remember the filesystem type reported for the mounted device;
        # FsImageFile.compare_details reads it through the ``fs`` attribute.
        self.fs = self.g.list_filesystems()[device]
        return self

    def close_archive(self):
        """Unmount everything and close the guestfs handle, best-effort."""
        if not guestfs:
            return None

        try:
            self.g.umount_all()
            self.g.close()
        except Exception:  # noqa
            # Deliberately ignored: cleanup must never raise, including when
            # ``self.g`` was never assigned or the handle is already unusable.
            pass

    def get_member_names(self):
        """Return the single member name, ``<image basename>.tar``.

        Returns an empty list when the ``guestfs`` module is not available.
        """
        if not guestfs:
            return []
        return [os.path.basename(self.source.path) + ".tar"]

    def extract(self, member_name, dest_dir):
        """Write the mounted filesystem tree as a tar file into ``dest_dir``.

        Args:
            member_name: File name to create inside ``dest_dir``.
            dest_dir: Directory that receives the tar file.

        Returns:
            The path of the tar file that was written.
        """
        dest_path = os.path.join(dest_dir, member_name)
        logger.debug("filesystem image extracting to %s", dest_path)
        self.g.tar_out("/", dest_path)
        return dest_path

95 

96 

class FsImageFile(File):
    """Comparator for filesystem images, unpacked through FsImageContainer."""

    DESCRIPTION = "ext2/ext3/ext4/btrfs/fat filesystems"
    CONTAINER_CLASSES = [FsImageContainer]
    FILE_TYPE_RE = re.compile(
        r"^(Linux.*filesystem data|BTRFS Filesystem|F2FS filesystem).*"
    )

    @classmethod
    def recognizes(cls, file):
        """Return True if ``file`` looks like a supported filesystem image.

        FAT images are detected by reading the boot sector directly; every
        other type is left to the parent class's ``recognizes``.
        """
        # Avoid DOS / MBR file type as it generate a lot of false positives,
        # manually check "System identifier string" instead.
        #
        # The identifier is an 8-byte field padded with spaces, so the
        # literals must be exactly 8 bytes long to ever equal f.read(8).
        # Offset 54 is checked for FAT12/FAT16 and offset 82 for FAT32.
        fat_signatures = (
            (54, (b"FAT12   ", b"FAT16   ")),
            (82, (b"FAT32   ",)),
        )
        with open(file.path, "rb") as f:
            for offset, labels in fat_signatures:
                f.seek(offset)
                # A file shorter than the offset yields fewer than 8 bytes
                # and simply does not match.
                if f.read(8) in labels:
                    return True
        return super().recognizes(file)

    def compare_details(self, other, source=None):
        """Report a difference in filesystem type between the two images.

        Args:
            other: The file being compared against this one.
            source: Not used by this method.

        Returns:
            A list holding one text Difference labelled "filesystem" when
            the two containers report different ``fs`` values, otherwise an
            empty list. A container without an ``fs`` attribute counts as
            the empty string.
        """
        my_fs = getattr(self.as_container, "fs", "")
        other_fs = getattr(other.as_container, "fs", "")

        differences = []
        if my_fs != other_fs:
            differences.append(
                Difference.from_text(
                    my_fs, other_fs, None, None, source="filesystem"
                )
            )
        if not guestfs:
            # Tell the user why the image contents could not be inspected.
            self.add_comment(get_comment_for_missing_python_module("guestfs"))
        return differences