====== Quick project: File deduplication [python] ======

{{tag>dev python}}

Some time ago my wife somehow killed the SD card in her phone [//Don't ask me **how**, of course// lol]. At the time, I tried to recover the family pictures on that card using two different options: one was a GUI tool whose name I don't remember (Recuva or something?), and the other was a Linux tool reading the raw data, whose name I don't remember either [//Jesus... not sure we can actually call what I have here "a memory"...// 😁].

And those two options gave me different results:
  * On one side, fewer files recovered, but with some proper names on them,
  * On the other side, more files recovered, but with meaningless numeric names and incorrect creation times.

So here I am now: I need to reconcile those two sets of photos and remove the duplicates, and I don't feel like doing this by hand. So let's check what we can do about it with our good old Python friend! 😃

I currently have the pictures in two folders: I could place them in a single folder, but to stay generic, let's just keep them separated.

So, first step: I start by creating a new minimal component using my [[public:projects:nervland:0010_nvland_management_project|NervProj framework]] (outside of that project in fact, just loaded dynamically as a plugin):

<code python>
"""FileDedup handling component"""
import logging

from nvp.nvp_component import NVPComponent
from nvp.nvp_context import NVPContext
from nvp.nvp_project import NVPProject

logger = logging.getLogger(__name__)


class FileDedup(NVPComponent):
    """FileDedup component class"""

    def __init__(self, ctx: NVPContext, _proj: NVPProject):
        """Component constructor"""
        NVPComponent.__init__(self, ctx)

        desc = {
            "dedup-files": None,
        }
        ctx.define_subparsers("main", desc)
        psr = ctx.get_parser('main.dedup-files')
        psr.add_argument("--input", dest="input_folder", type=str,
                         help="Input folder to start the search for duplicates.")

    def process_command(self, cmd0):
        """Re-implementation of process_command"""
        if cmd0 != 'dedup-files':
            return False

        settings = self.ctx.get_settings()
        input_dir = settings.get("input_folder", None)
        if input_dir is None:
            input_dir = self.get_cwd()

        self.search_duplicates(input_dir)
        return True

    def search_duplicates(self, input_dir):
        """Search for duplicate files."""
        logger.info("Should search for duplicate files in %s", input_dir)
</code>

And then I just need to write the ''search_duplicates'' method itself (note the ''len(dups) > 0'' test: using ''> 1'' here would silently skip the case where exactly one duplicate group is found):

<code python>
    def search_duplicates(self, input_dir):
        """Search for duplicate files."""
        logger.info("Should search for duplicate files in %s", input_dir)

        # Get all the files in that input directory:
        all_files = self.get_all_files(input_dir)

        # Iterate on each file to generate a hash, grouping file names by hash:
        hashes = {}
        for fname in all_files:
            file_path = self.get_path(input_dir, fname)
            fhh = self.compute_file_hash(file_path)
            if fhh in hashes:
                hashes[fhh].append(fname)
            else:
                hashes[fhh] = [fname]

        # Keep only the hashes with 2 or more files:
        dups = []
        for _, files in hashes.items():
            if len(files) > 1:
                dups.append(files)

        # Write that list of duplicates into a file:
        if len(dups) > 0:
            dup_file = self.get_path(input_dir, "duplicates.json")
            self.write_json(dups, dup_file)
            logger.info("Found duplicates: %s", self.pretty_print(dups))
            logger.info("Found %d duplicates.", len(dups))
        else:
            logger.info("No duplicate found.")
</code>
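As a small aside, the ''if fhh in hashes'' dance above could also be avoided with ''collections.defaultdict'' from the standard library. A minimal sketch of just the grouping part, reusing the same method names as above:

<code python>
from collections import defaultdict

# Group file names by their content hash; missing keys start as empty lists:
hashes = defaultdict(list)
for fname in all_files:
    file_path = self.get_path(input_dir, fname)
    hashes[self.compute_file_hash(file_path)].append(fname)

# Keep only the groups with 2 or more files:
dups = [files for files in hashes.values() if len(files) > 1]
</code>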
In this method, the only important helper functions are ''get_all_files'' and ''compute_file_hash'', which are respectively defined as follows (in the base **NVPObject** class, so this part is available in the NervProj project on github; they rely on the ''os'', ''re'' and ''xxhash'' modules being imported at the top of that file):

<code python>
    def get_all_files(self, folder, exp=".*", recursive=False):
        """Get all the files matching a given pattern in a folder."""
        # Prepare a pattern:
        p = re.compile(exp)
        num = len(folder) + 1

        if recursive:
            # Retrieve all files in the given folder recursively:
            res = []
            for root, _directories, filenames in os.walk(folder):
                for filename in filenames:
                    fname = os.path.join(root, filename)
                    if os.path.isfile(fname) and p.search(fname) is not None:
                        # We should remove the folder prefix here:
                        res.append(fname[num:])

            return res

        return [f for f in os.listdir(folder)
                if os.path.isfile(os.path.join(folder, f)) and p.search(f) is not None]
</code>

And:

<code python>
    def compute_file_hash(self, fpath, blocksize=65536):
        """Compute the hash for a given file path"""
        # cf. https://www.programcreek.com/python/example/111324/xxhash.xxh64
        hasher = xxhash.xxh64()
        with open(fpath, 'rb') as file:
            # Hash the entire file block by block:
            buf = file.read(blocksize)
            while len(buf) > 0:
                hasher.update(buf)
                buf = file.read(blocksize)

        return hasher.intdigest()
</code>

And this is it! Then I tried to run that command, and disappointingly, I didn't get any duplicate reported at all:

<code>
kenshin@Saturn /cygdrive/d/Temp/Photos/Flo
$ nvp dedup-files
2022/04/21 20:37:12 [components.file_dedup] INFO: Should search for duplicate files in D:\Temp\Photos\Flo
2022/04/21 20:37:12 [components.file_dedup] INFO: No duplicate found.
</code>

But let's just manually add a duplicate to ensure it will be found... **wooops** lol, not good: I explicitly duplicated a couple of files and still no duplicate found? 🤔

Ahhh! I know... I always forget about that: I need to explicitly enable recursion in ''get_all_files()'' of course. => Okay, good! So now, using the call ''all_files = self.get_all_files(input_dir, recursive=True)'', I'm able to find a lot of duplications:

<code>
['recup_flo_03_2021\\10914592.jpg', 'recup_flo_03_2021\\13028032.jpg'],
['recup_flo_03_2021\\10922368.jpg', 'recup_flo_03_2021\\13035808.jpg'],
['recup_flo_03_2021\\10930592.jpg', 'recup_flo_03_2021\\13044032.jpg'],
['recup_flo_03_2021\\10939072.jpg', 'recup_flo_03_2021\\13052512.jpg'],
['recup_flo_03_2021\\10947328.jpg', 'recup_flo_03_2021\\13060768.jpg'],
['recup_flo_03_2021\\10955200.jpg', 'recup_flo_03_2021\\13068640.jpg'],
['recup_flo_03_2021\\10964128.jpg', 'recup_flo_03_2021\\13077568.jpg'],
['recup_flo_03_2021\\10972224.jpg', 'recup_flo_03_2021\\13085664.jpg'],
['recup_flo_03_2021\\10974624.jpg', 'recup_flo_03_2021\\13088064.jpg'],
['recup_flo_03_2021\\10976768.jpg', 'recup_flo_03_2021\\13090208.jpg'],
['recup_flo_03_2021\\10984352.jpg', 'recup_flo_03_2021\\13097792.jpg'],
['recup_flo_03_2021\\10992128.jpg', 'recup_flo_03_2021\\13105568.jpg'],
['recup_flo_03_2021\\10996128.jpg', 'recup_flo_03_2021\\13109568.jpg']]
2022/04/21 20:45:36 [components.file_dedup] INFO: Found 331 duplicates.
</code>

Let's check the generated **duplicates.json** file.
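By the way, if you'd rather avoid the third-party ''xxhash'' dependency, the same block-wise loop works with the standard library's ''hashlib''. A minimal sketch (the function name ''compute_file_hash_stdlib'' is hypothetical, and ''blake2b'' is just one reasonable choice of fast stdlib hash):

<code python>
import hashlib

def compute_file_hash_stdlib(fpath, blocksize=65536):
    """Variant of compute_file_hash using only the standard library."""
    hasher = hashlib.blake2b()
    with open(fpath, 'rb') as file:
        # Read and hash the file block by block to keep memory usage low:
        for buf in iter(lambda: file.read(blocksize), b''):
            hasher.update(buf)

    return hasher.hexdigest()
</code>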
Hmmm, okay. So sometimes I can have 3 copies of the same image data in a single folder, like this:

<code json>
[
    "recup_flo_03_2021\\04963744.jpg",
    "recup_flo_03_2021\\08526304.jpg",
    "recup_flo_03_2021\\08541984.jpg"
],
</code>

Or I can have up to 4 copies spread over 2 separate folders, like this:

<code json>
[
    "photos\\IMG_20190712_103534 - Copy.jpg",
    "photos\\IMG_20190712_103534.jpg",
    "recup_flo_03_2021\\04559904.jpg",
    "recup_flo_03_2021\\04569568.jpg"
],
</code>

So now, let's add support to resolve those duplications automatically:
  * We should give priority to the images in the "photos" folder here => in fact I should rather just rename the folders to ensure the "photos" folder comes first alphabetically.
  * And inside a given folder, we will just sort the filenames alphabetically too.
  * => So I simply need to sort each list of files sharing the same hash and keep only the first one.

And here is the updated version of the ''search_duplicates'' method (note that the component module now also needs ''import os'' at the top):

<code python>
    def search_duplicates(self, input_dir):
        """Search for duplicate files."""
        logger.info("Should search for duplicate files in %s", input_dir)

        # Get all the files in that input directory:
        all_files = self.get_all_files(input_dir, recursive=True)

        resolve = self.get_param("resolve", False)

        # Iterate on each file to generate a hash:
        hashes = {}
        for fname in all_files:
            file_path = self.get_path(input_dir, fname)
            fhh = self.compute_file_hash(file_path)

            # Check if this file is a simple " - Copy" of another file
            # and delete it directly in that case:
            parts = os.path.splitext(file_path)
            if resolve and parts[0].endswith(" - Copy"):
                file_path2 = parts[0][:-7] + parts[1]
                if self.file_exists(file_path2):
                    fhh2 = self.compute_file_hash(file_path2)
                    if fhh == fhh2:
                        logger.info("Removing simple copy: %s", file_path)
                        self.remove_file(file_path)
                        continue

            if fhh in hashes:
                hashes[fhh].append(fname)
            else:
                hashes[fhh] = [fname]

        # Keep only the hashes with 2 or more files:
        dups = []
        for _, files in hashes.items():
            if len(files) > 1:
                dups.append(files)

        # Write that list of duplicates into a file:
        if len(dups) > 0:
            dup_file = self.get_path(input_dir, "duplicates.json")

            if resolve:
                # Try to resolve the duplications: sort each group and
                # remove every file except the first one:
                for flist in dups:
                    flist.sort()
                    nfiles = len(flist)
                    for i in range(1, nfiles):
                        file_path = self.get_path(input_dir, flist[i])
                        logger.info("Removing file %s", file_path)
                        # self.rename_file(file_path, file_path+".dup")
                        self.remove_file(file_path)

                self.remove_file(dup_file)
            else:
                self.write_json(dups, dup_file)
                logger.info("Found duplicates: %s", self.pretty_print(dups))
                logger.info("Found %d duplicates.", len(dups))
        else:
            logger.info("No duplicate found.")
</code>

**Note**: There is an additional parameter retrieved now with ''resolve = self.get_param("resolve", False)'': this comes directly from our command line, where we can provide the "resolve" flag thanks to this new code in the component constructor:

<code python>
        psr = ctx.get_parser('main.dedup-files')
        psr.add_argument("--input", dest="input_folder", type=str,
                         help="Input folder to start the search for duplicates.")
        psr.add_argument("-r", "--resolve", dest="resolve", action="store_true",
                         help="Resolve the duplications.")
</code>

=> And now we are good! I could successfully remove the duplicates and start classifying the remaining pictures.
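Since deleting files based purely on a hash match is a bit scary, a softer variant would be to move the duplicates into a quarantine folder instead of removing them outright (the commented-out ''rename_file'' call above hints at the same idea). A stand-alone sketch of that strategy using only the standard library; the helper name and the ''_duplicates'' folder are hypothetical, not part of NervProj:

<code python>
import os
import shutil

def quarantine_duplicates(input_dir, dups, trash_name="_duplicates"):
    """Move every file except the first of each sorted group to a trash folder."""
    trash_dir = os.path.join(input_dir, trash_name)
    os.makedirs(trash_dir, exist_ok=True)

    for flist in dups:
        flist.sort()
        # Keep flist[0], move the rest, flattening the relative paths
        # so that files from sub-folders don't collide on plain names:
        for fname in flist[1:]:
            src = os.path.join(input_dir, fname)
            dst = os.path.join(trash_dir, fname.replace(os.sep, "_"))
            shutil.move(src, dst)
</code>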
As a reminder to myself, the command line to use from a given folder where all the duplicates should be removed is for instance:

<code bash>
$ nvp dedup-files -r
</code>

===== Bonus content: organizing pictures using the shot time from the EXIF data =====

For the recovered pictures that no longer have a date in their filenames, I finally realized I could still retrieve the shot time from the JPEG EXIF data: so now I will try to collect that data to organize the images into YYYY/Month subfolders and rename them to contain that shot time.

=> So I created another simple component to handle that task:

<code python>
"""PictureHandler handling component"""
import logging
from datetime import datetime

from PIL import Image

from nvp.nvp_component import NVPComponent
from nvp.nvp_context import NVPContext
from nvp.nvp_project import NVPProject

logger = logging.getLogger(__name__)


class PictureHandler(NVPComponent):
    """PictureHandler component class"""

    def __init__(self, ctx: NVPContext, _proj: NVPProject):
        """Component constructor"""
        NVPComponent.__init__(self, ctx)

        desc = {
            "classify-pics": None,
        }
        ctx.define_subparsers("main", desc)
        psr = ctx.get_parser('main.classify-pics')
        psr.add_argument("--input", dest="input_folder", type=str,
                         help="Input folder to start the search for pictures.")
        psr.add_argument("-f", "--folder", dest="sub_folder", type=str,
                         help="Sub folder where to put the images in each YYYY/Month folder")
        psr.add_argument("-r", "--rename", dest="rename", action="store_true",
                         help="Rename the picture files.")

    def process_command(self, cmd0):
        """Re-implementation of process_command"""
        if cmd0 != 'classify-pics':
            return False

        settings = self.ctx.get_settings()
        input_dir = settings.get("input_folder", None)
        if input_dir is None:
            input_dir = self.get_cwd()

        self.classify_pics(input_dir)
        return True

    def classify_pics(self, input_dir):
        """Classify the pictures using their EXIF creation time."""
        logger.info("Should classify the pictures in %s", input_dir)

        # Get all the files in that input directory:
        all_files = self.get_all_files(input_dir, recursive=False)

        rename = self.get_param("rename", False)
        subfolder = self.get_param("sub_folder", None)

        # French month names, used for the destination folders:
        months = ["Janvier", "Fevrier", "Mars", "Avril", "Mai", "Juin", "Juillet",
                  "Aout", "Septembre", "Octobre", "Novembre", "Decembre"]

        for fname in all_files:
            file_path = self.get_path(input_dir, fname)

            # Open that file and read its EXIF data:
            img = Image.open(file_path)
            exif = img.getexif()
            img.close()

            # ctime = exif.get(36867)
            ctime = exif.get(306)
            if ctime is not None:
                logger.info("Found creation time: %s", ctime)
                cdate = datetime.strptime(ctime, '%Y:%m:%d %H:%M:%S')

                year = cdate.year
                month = months[cdate.month - 1]
                dest_dir = self.get_path(input_dir, str(year), month)
                if subfolder is not None:
                    dest_dir = self.get_path(dest_dir, subfolder)

                self.make_folder(dest_dir)
                if rename:
                    filename = f"{cdate.year:04d}{cdate.month:02d}{cdate.day:02d}_{cdate.hour:02d}{cdate.minute:02d}{cdate.second:02d}.jpg"
                else:
                    filename = fname

                dest_file = self.get_path(dest_dir, filename)
                self.rename_file(file_path, dest_file)
            else:
                logger.warning("No EXIF creation time in %s, available data: %s", file_path, exif)
</code>

On the web I found that the EXIF tag to use to retrieve the creation time should be **36867**, but in fact this didn't work for me: printing the content of the "exif" dictionary above, I realized I should use the tag **306** instead. Not quite sure why (I didn't look any further 😊): maybe an old EXIF version or some customization or something. Never mind, it's working at least 👍
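For what it's worth, my understanding is that tag 36867 (//DateTimeOriginal//) lives in a separate EXIF sub-IFD, while ''Image.getexif()'' in Pillow only returns the top-level IFD, where tag 306 is the plain //DateTime//. If that's right, the original tag should be reachable through ''get_ifd()'' on a reasonably recent Pillow. An untested sketch (the file name is hypothetical):

<code python>
from PIL import Image

img = Image.open("some_picture.jpg")
exif = img.getexif()
img.close()

# Tag 306 ("DateTime") sits in the top-level IFD:
print(exif.get(306))

# Tag 36867 ("DateTimeOriginal") should be in the EXIF sub-IFD,
# reached through the pointer tag 0x8769:
exif_ifd = exif.get_ifd(0x8769)
print(exif_ifd.get(36867))
</code>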
And again, as a reminder to myself, the command line I used in my case here was:

<code bash>
$ nvp classify-pics -r -f "Taken by Flo"
</code>

=> And now, let's say this is it: the idea was to keep it short and quick anyway 😅