====== Quick project: File deduplication [python] ======
{{tag>dev python}}
Some time ago my wife somehow killed the SD card in her phone [//Don't ask me **how** of course// lol]. So, at that time I was trying to recover the family pictures on that card, and I used two different options: one with a GUI tool whose name I don't remember, like Recuva or something? And another option on Linux, reading the raw data with yet another tool that I don't remember either [//Jesus... not sure we can actually call what I have here "a memory"...// 😁].
====== ======
And those 2 options gave me different results:
* On 1 side less files recovered, but with some names on them
* On the other side more files recovered, but just with crazy number names and incorrect creation times.
So here I am now: I need to reconcile those 2 sets of photos and remove the duplicates, and I don't feel like doing this by hand. So let's check what we can do about it with our good old python friend! 😃
I currently have 2 folders for the pictures: I could place them in the same folder, but to stay generic, let's just keep them separated.
So, first step: I start with creating a new minimal component using my [[public:projects:nervland:0010_nvland_management_project|NervProj framework]] (outside of that project in fact, just loaded dynamically as a plugin)
"""FileDedup handling component"""
import logging
from nvp.nvp_component import NVPComponent
from nvp.nvp_context import NVPContext
from nvp.nvp_project import NVPProject
logger = logging.getLogger(__name__)
class FileDedup(NVPComponent):
    """Component providing the "dedup-files" command."""

    def __init__(self, ctx: NVPContext, _proj: NVPProject):
        """Register the "dedup-files" sub-command and its CLI arguments."""
        NVPComponent.__init__(self, ctx)

        # Declare our sub-command on the main parser:
        ctx.define_subparsers("main", {"dedup-files": None})
        psr = ctx.get_parser('main.dedup-files')
        psr.add_argument("--input", dest="input_folder", type=str,
                         help="Input folder to start the search for duplicates.")

    def process_command(self, cmd0):
        """Re-implementation of process_command: handle "dedup-files" only."""
        if cmd0 != 'dedup-files':
            return False

        # Use the folder given on the command line, defaulting to the
        # current working directory when none was provided:
        folder = self.ctx.get_settings().get("input_folder", None)
        if folder is None:
            folder = self.get_cwd()

        self.search_duplicates(folder)
        return True

    def search_duplicates(self, input_dir):
        """Search for duplicate files."""
        logger.info("Should search for duplicate files in %s", input_dir)
And then I just need to write the search_duplicates method itself: def search_duplicates(self, input_dir):
"""Search for duplicate files."""
logger.info("Should search for duplicate files in %s", input_dir)
# get all the files in that input directory:
all_files = self.get_all_files(input_dir)
# iterate on each file to generate an hash:
hashes = {}
for fname in all_files:
file_path = self.get_path(input_dir, fname)
fhh = self.compute_file_hash(file_path)
if fhh in hashes:
hashes[fhh].append(fname)
else:
hashes[fhh] = [fname]
# Keep only the hashes with 2 or more files:
dups = []
for _, files in hashes.items():
if len(files) > 1:
dups.append(files)
# Write that list of duplicate into file:
if len(dups) > 1:
dup_file = self.get_path(input_dir, "duplicates.json")
self.write_json(dups, dup_file)
logger.info("Found duplicates: %s", self.pretty_print(dups))
logger.info("Found %d duplicates.", len(dups))
else:
logger.info("No duplicate found.")
In this method the only important sub-functions are ''get_all_files'' and ''compute_file_hash'', which are respectively defined as follows (in the base **NVPObject** class, so this part is available in the NervProj project on github):
def get_all_files(self, folder, exp=".*", recursive=False):
    """Return the files of *folder* whose name matches the regex *exp*.

    In non-recursive mode the pattern is tested against the bare file
    name; in recursive mode it is tested against the full path, and the
    returned names are relative to *folder*.
    """
    pattern = re.compile(exp)

    if not recursive:
        # Top-level only: filter the direct entries of the folder.
        return [entry for entry in os.listdir(folder)
                if (os.path.isfile(os.path.join(folder, entry))
                    and pattern.search(entry) is not None)]

    # Recursive walk: collect matching files, stripping the folder
    # prefix (and its trailing separator) from each result.
    prefix_len = len(folder) + 1
    matches = []
    for root, _dirs, names in os.walk(folder):
        for name in names:
            full_path = os.path.join(root, name)
            # NOTE: here the pattern is matched against the FULL path,
            # unlike the non-recursive branch above.
            if os.path.isfile(full_path) and pattern.search(full_path) is not None:
                matches.append(full_path[prefix_len:])
    return matches
And: def compute_file_hash(self, fpath, blocksize=65536):
"""Compute the hash for a given file path"""
# cf. https://www.programcreek.com/python/example/111324/xxhash.xxh64
hasher = xxhash.xxh64()
with open(fpath, 'rb') as file:
buf = file.read(blocksize)
# otherwise hash the entire file
while len(buf) > 0:
hasher.update(buf)
buf = file.read(blocksize)
return hasher.intdigest()
And this is it, then I tried to run that command, and disappointingly, I didn't get any duplicate reported at all: kenshin@Saturn /cygdrive/d/Temp/Photos/Flo
$ nvp dedup-files
2022/04/21 20:37:12 [components.file_dedup] INFO: Should search for duplicate files in D:\Temp\Photos\Flo
2022/04/21 20:37:12 [components.file_dedup] INFO: No duplicate found.
But let's just try to manually add a duplicate to ensure it will be found... **wooops** lol not good: I explicitly duplicated a couple of files and still no duplicate found ? 🤔
Ahhh! I know... I always forget about that: I need to explicitly enable recursion in get_all_files() of course.
=> Okay, good! so now using the call ''all_files = self.get_all_files(input_dir, recursive=True)'', and with that, I'm now able to find a lot of duplications: ['recup_flo_03_2021\\10914592.jpg', 'recup_flo_03_2021\\13028032.jpg'],
['recup_flo_03_2021\\10922368.jpg', 'recup_flo_03_2021\\13035808.jpg'],
['recup_flo_03_2021\\10930592.jpg', 'recup_flo_03_2021\\13044032.jpg'],
['recup_flo_03_2021\\10939072.jpg', 'recup_flo_03_2021\\13052512.jpg'],
['recup_flo_03_2021\\10947328.jpg', 'recup_flo_03_2021\\13060768.jpg'],
['recup_flo_03_2021\\10955200.jpg', 'recup_flo_03_2021\\13068640.jpg'],
['recup_flo_03_2021\\10964128.jpg', 'recup_flo_03_2021\\13077568.jpg'],
['recup_flo_03_2021\\10972224.jpg', 'recup_flo_03_2021\\13085664.jpg'],
['recup_flo_03_2021\\10974624.jpg', 'recup_flo_03_2021\\13088064.jpg'],
['recup_flo_03_2021\\10976768.jpg', 'recup_flo_03_2021\\13090208.jpg'],
['recup_flo_03_2021\\10984352.jpg', 'recup_flo_03_2021\\13097792.jpg'],
['recup_flo_03_2021\\10992128.jpg', 'recup_flo_03_2021\\13105568.jpg'],
['recup_flo_03_2021\\10996128.jpg', 'recup_flo_03_2021\\13109568.jpg']]
2022/04/21 20:45:36 [components.file_dedup] INFO: Found 331 duplicates.
Let's check the generated **duplicates.json** file. Hmmm, okay, so, sometimes I can have the same image data 3 times in a single folder, like that: [
"recup_flo_03_2021\\04963744.jpg",
"recup_flo_03_2021\\08526304.jpg",
"recup_flo_03_2021\\08541984.jpg"
],
Or I can have up to 4 copies from 2 separate folders, like that: [
"photos\\IMG_20190712_103534 - Copy.jpg",
"photos\\IMG_20190712_103534.jpg",
"recup_flo_03_2021\\04559904.jpg",
"recup_flo_03_2021\\04569568.jpg"
],
So now, let's add support to resolve those duplications automatically:
* We should give priority to the images in the "photos" folder here => in fact I should rather just rename the folders to ensure the "photos" folder comes first alphabetically.
* And when inside a given folder only we will just sort the filenames alphabetically too.
* => So I simply need to sort the list of files with the same hash and keep only the first one.
And here is the update version of the search_duplicate method: def search_duplicates(self, input_dir):
"""Search for duplicate files."""
logger.info("Should search for duplicate files in %s", input_dir)
# get all the files in that input directory:
all_files = self.get_all_files(input_dir, recursive=True)
resolve = self.get_param("resolve", False)
# iterate on each file to generate an hash:
hashes = {}
for fname in all_files:
file_path = self.get_path(input_dir, fname)
fhh = self.compute_file_hash(file_path)
# Check if this file is a simple copy and delete it in that case:
parts = os.path.splitext(file_path)
if resolve and parts[0].endswith(" - Copy"):
file_path2 = parts[0][:-7]+parts[1]
if self.file_exists(file_path2):
fhh2 = self.compute_file_hash(file_path2)
if fhh == fhh2:
logger.info("Removing simple copy: %s", file_path)
self.remove_file(file_path)
continue
if fhh in hashes:
hashes[fhh].append(fname)
else:
hashes[fhh] = [fname]
# Keep only the hashes with 2 or more files:
dups = []
for _, files in hashes.items():
if len(files) > 1:
dups.append(files)
# Write that list of duplicate into file:
if len(dups) > 1:
dup_file = self.get_path(input_dir, "duplicates.json")
if resolve:
# Try to resolve the duplications:
for flist in dups:
flist.sort()
nfiles = len(flist)
for i in range(1, nfiles):
file_path = self.get_path(input_dir, flist[i])
logger.info("Removing file %s", file_path)
# self.rename_file(file_path, file_path+".dup")
self.remove_file(file_path)
self.remove_file(dup_file)
else:
self.write_json(dups, dup_file)
logger.info("Found duplicates: %s", self.pretty_print(dups))
logger.info("Found %d duplicates.", len(dups))
else:
logger.info("No duplicate found.")
**Note**: There is an additional parameter retrieved now with ''resolve = self.get_param("resolve", False)'': this comes directly from our command line, where we can provide the "resolve" parameter thanks to this new code in the component constructor: psr = ctx.get_parser('main.dedup-files')
psr.add_argument("--input", dest="input_folder", type=str,
help="Input folder to start the search for duplicates.")
# New optional flag: when set, the duplicates are actually removed instead
# of only being reported in duplicates.json:
psr.add_argument("-r", "--resolve", dest="resolve", action="store_true",
help="Resolve the duplications.")
=> And now we are good! I could successfully remove the duplicates, and start classifying the remaining pictures.
As a reminder to myself, the command line to use from a given folder where all the duplicates should be removed is for instance: $ nvp dedup-files -r
===== Bonus content: organizing pictures using the shot time from the EXIF data =====
For the recovered pictures with no date in the filenames anymore I finally realized I could still retrieve the shot time from the jpeg EXIF data: so now I will try to collect that data to organize the images in YYYY/Month subfolders and rename them to contain that shot time.
=> So I created another simple component to handle that task: """PictureHandler handling component"""
import logging
from datetime import datetime

import PIL
# "import PIL" alone does not guarantee the Image submodule is loaded;
# import it explicitly so PIL.Image.open is always available:
import PIL.Image

from nvp.nvp_component import NVPComponent
from nvp.nvp_context import NVPContext
from nvp.nvp_project import NVPProject

logger = logging.getLogger(__name__)
class PictureHandler(NVPComponent):
    """Component providing the "classify-pics" command."""

    def __init__(self, ctx: NVPContext, _proj: NVPProject):
        """Register the "classify-pics" sub-command and its CLI arguments."""
        NVPComponent.__init__(self, ctx)

        desc = {
            "classify-pics": None,
        }
        ctx.define_subparsers("main", desc)
        psr = ctx.get_parser('main.classify-pics')
        psr.add_argument("--input", dest="input_folder", type=str,
                         help="Input folder to start the search for duplicates.")
        psr.add_argument("-f", "--folder", dest="sub_folder", type=str,
                         help="Sub folder where to put the images in each YYYY/Month folder")
        psr.add_argument("-r", "--rename", dest="rename", action="store_true",
                         help="Rename the picture files.")

    def process_command(self, cmd0):
        """Re-implementation of process_command: handle "classify-pics" only."""
        if cmd0 != 'classify-pics':
            return False

        settings = self.ctx.get_settings()
        input_dir = settings.get("input_folder", None)
        if input_dir is None:
            input_dir = self.get_cwd()
        self.classify_pics(input_dir)
        return True

    def classify_pics(self, input_dir):
        """Classify the pictures of *input_dir* into YYYY/Month sub-folders.

        The shot time is read from the EXIF data of each image; files with
        no EXIF time are left in place (a warning is emitted for them).
        """
        # FIX: the docstring/log line used to say "search for duplicate
        # files" — a copy/paste leftover from the FileDedup component.
        logger.info("Should classify the pictures in %s", input_dir)

        # Get all the files in that input directory (top level only):
        all_files = self.get_all_files(input_dir, recursive=False)
        rename = self.get_param("rename", False)
        subfolder = self.get_param("sub_folder", None)

        # French month names used to build the destination folders:
        months = ["Janvier", "Fevrier", "Mars", "Avril", "Mai", "Juin", "Juillet",
                  "Aout", "Septembre", "Octobre", "Novembre", "Decembre"]

        for fname in all_files:
            file_path = self.get_path(input_dir, fname)

            # Read the EXIF data; use a context manager so the underlying
            # file handle is released even if reading fails:
            with PIL.Image.open(file_path) as img:
                exif = img.getexif()

            # ctime = exif.get(36867)
            ctime = exif.get(306)
            if ctime is None:
                logger.warning("No EXIF creation time in %s, available data: %s", file_path, exif)
                continue

            logger.info("Found creation time: %s", ctime)
            cdate = datetime.strptime(ctime, '%Y:%m:%d %H:%M:%S')

            # Build the destination folder <input>/<YYYY>/<Month>[/<subfolder>]:
            dest_dir = self.get_path(input_dir, str(cdate.year), months[cdate.month - 1])
            if subfolder is not None:
                dest_dir = self.get_path(dest_dir, subfolder)
            self.make_folder(dest_dir)

            if rename:
                # NOTE(review): two pictures taken the same second map to the
                # same name here — confirm rename_file does not overwrite.
                filename = f"{cdate.year:04d}{cdate.month:02d}{cdate.day:02d}_{cdate.hour:02d}{cdate.minute:02d}{cdate.second:02d}.jpg"
            else:
                filename = fname

            dest_file = self.get_path(dest_dir, filename)
            self.rename_file(file_path, dest_file)
On the web I found that the EXIF slot that should be used to retrieve the creation time is **36867**, but in fact this didn't work for me: printing the content of the "exif" dictionary above, I realized I should use the slot key **306** instead. (This is probably because tag 306 is the base-IFD ''DateTime'' tag, while tag 36867 ''DateTimeOriginal'' lives in the EXIF sub-IFD, which PIL's ''getexif()'' does not expose at the top level — to be confirmed.) Never mind, it's working at least 👍
And again, as a reminder to myself, the command line I used in my case here was: $ nvp classify-pics -r -f "Taken by Flo"
=> And now let's say this is it. The idea was to keep it short and quick anyway 😅