====== Quick project: image processing tool [python] ======

{{tag>dev python nervproj}}

Morning the world! After my previous quick project article on the [[blog:2022:0510_nervproj_filtering_corrupted_gifs|Filtering of corrupted gif files]], what I want to build here is a minimal command line utility I can use to perform simple image manipulations: resizing, cropping, applying simple effects, etc. I already started building something in that direction in the article [[blog:2022:0423_image_overview_generation|Image overview generation]], but I need to make this a bit more generic: it should still support gifs and batch processing of images, but without creating a single overview image at the end. So let's get to work, shall we!

===== Preparing the initial version of the utility component =====

  * As usual, we start with building the skeleton of the dedicated component. Actually, that's something I might consider automating someday: creating a simple component skeleton with a single command ;-)
  * So here is the initial minimal file:

<code python>
"""Module for ImageProcessor class definition"""
import logging

from nvp.nvp_context import NVPContext
from nvp.nvp_component import NVPComponent

logger = logging.getLogger(__name__)


class ImageProcessor(NVPComponent):
    """ImageProcessor component class"""

    def __init__(self, ctx: NVPContext):
        """class constructor"""
        NVPComponent.__init__(self, ctx)

    def process_command(self, cmd):
        """Check if this component can process the given command"""
        if cmd == 'process':
            return self.process_images()

        return False

    def process_images(self):
        """Process the input images from a given folder"""
        logger.info("Should apply image processing here.")
        return True


if __name__ == "__main__":
    # Create the context:
    context = NVPContext()

    # Add our component:
    comp = context.register_component("ImageProcessor", ImageProcessor(context))

    context.define_subparsers("main", {
        'process': None,
    })

    psr = context.get_parser('main.process')
    psr.add_argument("--output", dest="output_dir", type=str,
                     help="Output dir where to store the valid files")
    psr.add_argument("--input", dest="input_dir", type=str,
                     help="Input dir where to start the filtering")

    comp.run()
</code>

  * Here is the new script added to run this utility:

<code json>
"imgs": {
    "custom_python_env": "media_env",
    "cmd": "${PYTHON} ${PROJECT_ROOT_DIR}/nvh/media/image_processor.py process",
    "python_path": ["${PROJECT_ROOT_DIR}", "${NVP_ROOT_DIR}"]
}
</code>

  * And this works just as expected:

<code>
$ nvp run imgs
2022/05/11 07:15:14 [__main__] INFO: Should apply image processing here.
</code>

===== Collecting input files & configuring output =====

  * The utility should be flexible enough to support the following behaviors:
    * If no input is specified, we use the current working dir as input and process all the images in there recursively.
    * If no output is specified, we put all the resulting images in a sibling folder of the input folder.
    * If the input is an image file, we process only that file.
    * If the input is a folder, we may process the files recursively or not.
  * => Basically, this is similar to the mechanism I started building in [[blog:2022:0510_nervproj_filtering_corrupted_gifs|Filtering of corrupted gif files]].
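The defaulting rules above can be sketched with plain ''os.path'' calls; this is a standalone approximation (the actual component relies on NVPComponent helpers such as ''get_parent_folder'' and ''get_path''), with the ''resolve_dirs'' name and the ''cwd'' parameter being purely illustrative:

```python
import os

def resolve_dirs(input_dir=None, output_dir=None, cwd="/work"):
    """Apply the defaulting rules: cwd as input fallback, and a
    sibling '<input>_filtered' folder as output fallback."""
    if input_dir is None:
        # Use the current working dir:
        input_dir = cwd
    if not os.path.isabs(input_dir):
        # Turn this into an absolute path:
        input_dir = os.path.join(cwd, input_dir)

    if output_dir is None:
        # Build a sibling folder of the input folder:
        folder = os.path.basename(input_dir)
        parent = os.path.dirname(input_dir)
        output_dir = os.path.join(parent, f"{folder}_filtered")
    if not os.path.isabs(output_dir):
        output_dir = os.path.join(cwd, output_dir)

    return input_dir, output_dir
```

With this sketch, ''resolve_dirs("/data/images")'' yields ''/data/images'' as input and the sibling ''/data/images_filtered'' as output.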
  * So here is the updated ''process_images'' method providing support for this:

<code python>
def process_images(self):
    """Process the input images from a given folder"""

    input_dir = self.get_param("input_dir")
    if input_dir is None:
        # Use the current working dir:
        input_dir = self.get_cwd()

    if self.is_relative_path(input_dir):
        # Turn this into an absolute path:
        input_dir = self.get_path(self.get_cwd(), input_dir)

    output_dir = self.get_param("output_dir")
    if output_dir is None:
        # We use the parent folder of the input:
        folder = self.get_filename(input_dir)
        parent_dir = self.get_parent_folder(input_dir)
        output_dir = self.get_path(parent_dir, f"{folder}_filtered")

    if self.is_relative_path(output_dir):
        # Turn this into an absolute path:
        output_dir = self.get_path(self.get_cwd(), output_dir)

    # Create the destination dir:
    self.make_folder(output_dir)

    # If the input is a folder, then we collect all the files in there:
    assert self.path_exists(input_dir), f"Invalid input provided {input_dir}"

    if self.file_exists(input_dir):
        all_files = [input_dir]
    else:
        # Collect all the files in that input folder:
        recurv = self.get_param("recursive", False)
        all_files = self.get_all_files(input_dir, recursive=recurv)

    num_imgs = len(all_files)
    logger.info("Collected %d input images.", num_imgs)
    return True
</code>

  * => And this is finding the input files as desired:

<code>
$ nvp run imgs
2022/05/11 07:27:32 [__main__] INFO: Collected 44 input images.
</code>
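The recursive collection step can be reproduced with the standard library alone; here is a minimal standalone sketch (the ''collect_images'' name and the ''IMAGE_EXTS'' set are illustrative, not the actual ''get_all_files'' helper), including a tiny demo folder built on the fly:

```python
import os
import tempfile

# Extension-based filter, mirroring the check described in this post:
IMAGE_EXTS = {".gif", ".jpg", ".png"}

def collect_images(input_dir, recursive=True):
    """Collect image files under input_dir, as paths relative to it."""
    results = []
    if recursive:
        for root, _dirs, files in os.walk(input_dir):
            for name in files:
                if os.path.splitext(name)[1].lower() in IMAGE_EXTS:
                    full = os.path.join(root, name)
                    results.append(os.path.relpath(full, input_dir))
    else:
        for name in os.listdir(input_dir):
            path = os.path.join(input_dir, name)
            if os.path.isfile(path) and os.path.splitext(name)[1].lower() in IMAGE_EXTS:
                results.append(name)
    return sorted(results)

# Small self-contained demo on a temporary folder:
base = tempfile.mkdtemp()
os.makedirs(os.path.join(base, "sub"))
for rel in ["a.jpg", "b.txt", os.path.join("sub", "c.gif")]:
    open(os.path.join(base, rel), "w").close()
```

Calling ''collect_images(base)'' then returns the two image files (and skips ''b.txt''), while ''recursive=False'' only returns the top-level one.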
===== Boilerplate code to process each image file =====

  * Once we have a list of input files, we can start processing them one by one, just adding this code at the end of the ''process_images'' method:

<code python>
for i in range(num_imgs):
    fname = all_files[i]
    if self.is_image_file(fname):
        logger.info("%d/%d: Processing %s...", i+1, num_imgs, fname)
        self.process_image(fname, input_dir, output_dir)
</code>

  * And then I added the following 2 helper functions referenced above:

<code python>
def is_image_file(self, fname):
    """Check if a given file is a valid image file, simply checking
    the extension of the file"""
    ext = self.get_path_extension(fname)
    return ext.lower() in [".gif", ".jpg", ".png"]

def process_image(self, fname, input_dir, output_dir):
    """Process a single image file."""
    file_path = self.get_path(input_dir, fname)
    logger.info("Should process image file %s here.", file_path)
</code>

  * => Now we can focus on processing a single image in ''process_image''.

===== First processing stage: applying effects =====

  * So let's write something useful in ''process_image'' now, starting with some image processing effects:

<code python>
def process_image(self, fname, input_dir, output_dir):
    """Process a single image file."""
    file_path = self.get_path(input_dir, fname)
    logger.info("Should process image file %s here.", file_path)

    # We start with opening the image:
    try:
        img = Image.open(file_path)

        # Apply the orientation change as required:
        img = ImageProcessor.apply_exif_orientation(img)

        # Collect the frames from the input image:
        frames = []
        for frame in ImageSequence.Iterator(img):
            frames.append(self.process_frame(frame))

        # For now use the same output format as the input:
        ext = self.get_path_extension(fname)
        out_file = self.get_path(output_dir, fname)
        dest_folder = self.get_parent_folder(out_file)
        self.make_folder(dest_folder)

        if ext == ".gif":
            # Write a gif image from the frames,
            # using the original frame duration:
            frame_dur = getattr(img, 'frame_duration', 50)
            frames[0].save(out_file, save_all=True, append_images=frames[1:],
                           optimize=True, duration=frame_dur, loop=0)
        elif ext == ".jpg":
            # Save as a regular simple image:
            nframes = len(frames)
            if nframes == 1:
                frames[0].save(out_file, quality=90)
            else:
                out_base = self.remove_file_extension(out_file)
                for i in range(nframes):
                    frames[i].save(f"{out_base}_frame{i:03d}{ext}", quality=90)

    except (UnidentifiedImageError, Image.DecompressionBombError) as err:
        logger.error("Cannot process file %s: %s", file_path, str(err))
</code>

  * The method above mostly handles processing each "frame" of a given image with the ''process_frame()'' method (we can have multiple frames in a given image when handling gifs for instance).
  * And for the processing itself I currently just added this:

<code python>
@staticmethod
def apply_frame_effect(frame, effect):
    """Apply a given effect on a frame"""
    if effect == "bw":
        return ImageOps.grayscale(frame)
    if effect == "contour":
        return frame.filter(ImageFilter.CONTOUR)
    if effect == "invert":
        return ImageOps.invert(frame)
    if effect == "detail":
        return frame.filter(ImageFilter.DETAIL)
    if effect == "edge":
        return frame.filter(ImageFilter.EDGE_ENHANCE)
    if effect == "edge2":
        return frame.filter(ImageFilter.EDGE_ENHANCE_MORE)
    if effect == "find_edges":
        return frame.filter(ImageFilter.FIND_EDGES)
    if effect == "sharpen":
        return frame.filter(ImageFilter.SHARPEN)

    if effect.startswith("sketch"):
        # cf. https://towardsdatascience.com/generate-pencil-sketch-from-photo-in-python-7c56802d8acb
        # cf. https://www.askpython.com/python/examples/images-to-pencil-sketch
        args = effect.split(":")
        radius = int(args[1])

        # # step 1: convert image to grayscale:
        # gray_img = ImageOps.grayscale(frame)
        # # step 2: invert the grayscale image:
        # inv_img = ImageOps.invert(gray_img)
        # # step 3: apply gaussian blur with given radius:
        # inv_blur_img = inv_img.filter(ImageFilter.GaussianBlur(radius=radius))
        # # step 4: invert the blurred image:
        # blur_img = ImageOps.invert(inv_blur_img)

        # Convert to opencv:
        img = np.asarray(frame)
        # grey_img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        grey_img = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
        invert_img = cv2.bitwise_not(grey_img)
        blur_img = cv2.GaussianBlur(invert_img, (radius, radius), 0)
        invblur_img = cv2.bitwise_not(blur_img)
        sketch_img = cv2.divide(grey_img, invblur_img, scale=256.0)
        col_img = cv2.cvtColor(sketch_img, cv2.COLOR_GRAY2RGB)

        # Convert back to a PIL image:
        return Image.fromarray(col_img)

    logger.warning("Cannot apply unknown frame effect '%s'", effect)
    return frame

def process_frame(self, frame):
    """Apply the required processing on a single image frame"""
    for effect in self.effects:
        frame = ImageProcessor.apply_frame_effect(frame, effect)

    # Return the final frame:
    return frame
</code>

  * Then of course I had to introduce support to specify the effects on the command line:

<code python>
psr.add_argument("--fx", dest="effects", type=str,
                 help="List of effects to apply on the images")
</code>

  * So let's give it a try on some test images:

<code>
$ nvp run imgs --fx sketch:21
2022/05/11 10:55:50 [__main__] INFO: Collected 6 input images.
2022/05/11 10:55:50 [__main__] INFO: 1/6: Processing IMG_20220215_094740.jpg...
2022/05/11 10:55:50 [__main__] INFO: Should process image file D:\Temp\images\IMG_20220215_094740.jpg here.
2022/05/11 10:55:52 [__main__] INFO: 2/6: Processing IMG_20220310_214814.jpg...
2022/05/11 10:55:52 [__main__] INFO: Should process image file D:\Temp\images\IMG_20220310_214814.jpg here.
2022/05/11 10:55:54 [__main__] INFO: 3/6: Processing music-teacher-jack-black.gif...
2022/05/11 10:55:54 [__main__] INFO: Should process image file D:\Temp\images\music-teacher-jack-black.gif here.
Traceback (most recent call last):
  File "D:\Projects\NervHome\nvh\media\image_processor.py", line 245, in <module>
    comp.run()
  File "D:\Projects\NervProj\nvp\nvp_component.py", line 69, in run
    res = self.process_command(cmd)
  File "D:\Projects\NervHome\nvh\media\image_processor.py", line 28, in process_command
    return self.process_images()
  File "D:\Projects\NervHome\nvh\media\image_processor.py", line 83, in process_images
    self.process_image(fname, input_dir, output_dir)
  File "D:\Projects\NervHome\nvh\media\image_processor.py", line 195, in process_image
    frames.append(self.process_frame(frame))
  File "D:\Projects\NervHome\nvh\media\image_processor.py", line 175, in process_frame
    frame = ImageProcessor.apply_frame_effect(frame, effect)
  File "D:\Projects\NervHome\nvh\media\image_processor.py", line 158, in apply_frame_effect
    grey_img = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
cv2.error: OpenCV(4.5.5) d:\a\opencv-python\opencv-python\opencv\modules\imgproc\src\color.simd_helpers.hpp:92: error: (-2:Unspecified error) in function '__cdecl cv::impl::`anonymous-namespace'::CvtHelper<struct cv::impl::A0x7123906f::Set<1,-1,-1>,struct cv::impl::A0x7123906f::Set<0,2,5>,2>::CvtHelper(const class cv::_InputArray &,const class cv::_OutputArray &,int)'
> Invalid number of channels in input image:
>     'VScn::contains(scn)'
> where
>     'scn' is 1
</code>

  * Hmmm... okay, so I have a problem with the number of channels apparently 🤔...
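A quick way to confirm the diagnosis is to look at the shape of the array a frame decodes to: palette ("P") gif frames come out as a single-channel 2D array of palette indices, which is exactly what an RGB-to-gray conversion rejects. Here is a small standalone sketch (the ''frame_channels'' helper name is purely illustrative):

```python
import numpy as np

def frame_channels(arr):
    """Return the number of channels of a decoded frame array:
    a 2D array is a single-channel ("P"/"L" style) frame."""
    return 1 if arr.ndim == 2 else arr.shape[-1]

# A palette gif frame decodes to a 2D array of palette indices:
palette_frame = np.zeros((64, 64), dtype=np.uint8)
# An RGBA frame decodes to a 3D array with 4 channels:
rgba_frame = np.zeros((64, 64, 4), dtype=np.uint8)
```

So ''frame_channels(palette_frame)'' is 1, matching the ''"'scn' is 1"'' part of the OpenCV error above.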
  * => Ok, fixed by correctly handling the case of the "P" frame mode in gif files:

<code python>
frames = []
for frame in ImageSequence.Iterator(img):
    # logger.info("Frame mode: %s", frame.mode)
    if frame.mode == "P":
        frame = frame.convert("RGBA")
    frames.append(self.process_frame(frame))
</code>

  * So now it works just fine:

<code>
$ nvp run imgs --fx sketch:21
2022/05/11 11:20:37 [__main__] INFO: Collected 6 input images.
2022/05/11 11:20:37 [__main__] INFO: 1/6: Processing IMG_20220215_094740.jpg...
2022/05/11 11:20:38 [__main__] INFO: 2/6: Processing IMG_20220310_214814.jpg...
2022/05/11 11:20:39 [__main__] INFO: 3/6: Processing music-teacher-jack-black.gif...
2022/05/11 11:20:40 [__main__] INFO: 4/6: Processing oh-hey-oh-hey-there.gif...
2022/05/11 11:20:40 [__main__] INFO: 5/6: Processing oh-hey-ryan-reynolds.gif...
2022/05/11 11:20:41 [__main__] INFO: 6/6: Processing school-of-rock-jack-black.gif...
</code>

  * And the results are just as I expected them to be:

{{ blog:2022:0511:oh_hey.gif }}

  * => Cooool ✌😎!

===== Second processing stage: reducing dimensions =====

  * The next common processing operation I need to support is reducing the image dimensions: most of the time, my images are too big to be usable for anything 'online' => So I need to reduce their size while also keeping the aspect ratio.
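The resize math itself is tiny: scale both dimensions by ''max_size'' over the largest one. As a standalone sketch (the ''fit_max_size'' helper name is illustrative; note it also covers the square case where width equals height):

```python
def fit_max_size(width, height, max_size):
    """Scale the largest dimension down to max_size, preserving the
    aspect ratio; return the original size when it already fits."""
    largest = max(width, height)
    if largest <= max_size:
        return width, height
    scale = max_size / largest
    return int(width * scale), int(height * scale)
```

For instance a 4000x3000 photo with a max size of 1280 comes out as 1280x960.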
  * Let's simply add one command line argument here: the **max-size**, which will be applied either on the width or the height of the image (depending on which one is the largest).
  * Here are the updated processing steps:

<code python>
def apply_max_size(self, frame, max_size):
    """Apply a max_size transform on the frame"""
    width = frame.width
    height = frame.height
    new_width = None
    new_height = None
    # logger.info("Frame size: %dx%d, maxsize: %d", width, height, max_size)

    if width > height and width > max_size:
        # We have to reduce the width:
        new_width = max_size
        new_height = int(height * max_size/width)
    elif height > width and height > max_size:
        # We have to reduce the height:
        new_height = max_size
        new_width = int(width * max_size/height)

    if new_width is not None and new_height is not None:
        logger.debug("New frame size: %dx%d", new_width, new_height)
        frame = frame.resize((new_width, new_height))

    return frame

def process_frame(self, frame):
    """Apply the required processing on a single image frame"""

    # Check if we need to resize that frame:
    if self.max_size is not None:
        frame = self.apply_max_size(frame, self.max_size)

    for effect in self.effects:
        frame = ImageProcessor.apply_frame_effect(frame, effect)

    # Return the final frame:
    return frame
</code>

  * And reading the "max_size" from the command line of course:

<code python>
# Somewhere in our code:
self.max_size = self.get_param("max_size", None)

# Somewhere else in our code:
psr = context.get_parser('main.process')
psr.add_argument("--output", dest="output_dir", type=str,
                 help="Output dir where to store the valid files")
psr.add_argument("--input", dest="input_dir", type=str,
                 help="Input dir where to start the filtering")
psr.add_argument("--fx", dest="effects", type=str,
                 help="List of effects to apply on the images")
psr.add_argument("--maxsize", dest="max_size", type=int,
                 help="Maximum size that the resulting image should have.")
</code>

  * And that's working alright again:

<code>
$ nvp run imgs --fx sketch:21 --maxsize 1280
2022/05/11 11:54:12 [__main__] INFO: Collected 6 input images.
2022/05/11 11:54:12 [__main__] INFO: 1/6: Processing IMG_20220215_094740.jpg...
2022/05/11 11:54:12 [__main__] INFO: 2/6: Processing IMG_20220310_214814.jpg...
2022/05/11 11:54:13 [__main__] INFO: 3/6: Processing music-teacher-jack-black.gif...
2022/05/11 11:54:14 [__main__] INFO: 4/6: Processing oh-hey-oh-hey-there.gif...
2022/05/11 11:54:14 [__main__] INFO: 5/6: Processing oh-hey-ryan-reynolds.gif...
2022/05/11 11:54:15 [__main__] INFO: 6/6: Processing school-of-rock-jack-black.gif...
</code>

  * Here is a resized image for instance:

{{ blog:2022:0511:resized_image.jpg?800 }}

===== Third processing stage: gif duration multiplier =====

  * This one is more of a little detail, but I was just thinking about it, so why not? => I could specify another command line argument to control the playback speed of a given gif image: I provide a multiplier, and then use it to scale the frame duration of the final output.
  * => Let's add that 😝
  * Easy enough: we just need to apply the new "speed_multiplier" value I introduced just before writing the output gif file:

<code python>
if ext == ".gif":
    # Write a gif image from the frames,
    # using the original frame duration scaled by the speed multiplier:
    frame_dur = int(img.info.get('duration', 50)/self.speed_multiplier)
    frames[0].save(out_file, save_all=True, append_images=frames[1:],
                   optimize=True, duration=frame_dur, loop=0)
</code>

===== Processing stage 4: gif denoising with temporal filtering =====

  * One last thing I would like to add now is "multi frame denoising" support, which could be handy when creating animated sketches as shown above (we can notice some noise from frame to frame).
  * => I initially found some documentation about that on this page: https://docs.opencv.org/4.x/d5/d69/tutorial_py_non_local_means.html (see also this tutorial on the non-local means denoising sketch filter: https://opencv24-python-tutorials.readthedocs.io/en/latest/py_tutorials/py_photo/py_non_local_means/py_non_local_means.html)
  * So I added this code section to handle this denoising process:

<code python>
# Apply denoising effect if needed:
if self.multiframes_denoise is not None:
    args = self.multiframes_denoise.split(":")
    temporal = int(args[1]) if len(args) >= 2 else 3
    hhh = int(args[2]) if len(args) >= 3 else 10
    template = int(args[3]) if len(args) >= 4 else 7
    wsize = int(args[4]) if len(args) >= 5 else 35

    # We should apply multi frame denoising here:
    logger.info("Applying overall denoising...")
    arrs = [np.asarray(frame) for frame in frames]
    nframes = len(frames)
    flist = []
    woff = int((temporal-1)/2)
    for i in range(nframes):
        logger.info("Denoising frame %d/%d...", i+1, nframes)
        if i < woff or i > (nframes-woff-1):
            # Single frame denoising here:
            dst = cv2.fastNlMeansDenoising(arrs[i])
        else:
            dst = cv2.fastNlMeansDenoisingColoredMulti(
                arrs, i, temporalWindowSize=temporal, h=hhh,
                templateWindowSize=template, searchWindowSize=wsize)
        flist.append(dst)

    frames = [Image.fromarray(arr) for arr in flist]
</code>

  * It looks "not too bad", but not especially great either, and it's pretty slow for "large" images, so I'm not sure I'm going to use it that much:

<code>
$ nvp run imgs --fx sketch:21,edge2 -d -s 0.7 -o ../sketch2
2022/05/12 09:40:10 [__main__] INFO: Collected 6 input images.
2022/05/12 09:40:10 [__main__] INFO: 1/6: Processing IMG_20220215_094740.jpg...
2022/05/12 09:40:12 [__main__] INFO: Applying overall denoising...
2022/05/12 09:40:13 [__main__] INFO: Denoising frame 1/1...
2022/05/12 09:41:31 [__main__] INFO: 2/6: Processing IMG_20220310_214814.jpg...
2022/05/12 09:41:33 [__main__] INFO: Applying overall denoising...
2022/05/12 09:41:33 [__main__] INFO: Denoising frame 1/1...
2022/05/12 09:42:43 [__main__] INFO: 3/6: Processing music-teacher-jack-black.gif...
2022/05/12 09:42:43 [__main__] INFO: Applying overall denoising...
2022/05/12 09:42:43 [__main__] INFO: Denoising frame 1/35...
2022/05/12 09:42:44 [__main__] INFO: Denoising frame 2/35...
2022/05/12 09:42:50 [__main__] INFO: Denoising frame 3/35...
2022/05/12 09:42:56 [__main__] INFO: Denoising frame 4/35...
2022/05/12 09:43:01 [__main__] INFO: Denoising frame 5/35...
2022/05/12 09:43:06 [__main__] INFO: Denoising frame 6/35...
2022/05/12 09:43:12 [__main__] INFO: Denoising frame 7/35...
2022/05/12 09:43:17 [__main__] INFO: Denoising frame 8/35...
2022/05/12 09:43:23 [__main__] INFO: Denoising frame 9/35...
2022/05/12 09:43:28 [__main__] INFO: Denoising frame 10/35...
</code>

  * **Note**: If one day I want to reconsider this, one thing that is not so great here is that the first/last frames are not denoised with the same algorithm, because we need a window of frames before and after the frame we want to process. One way around that could be to artificially add the sibling frames in 'reverse time' before the first frame and after the last one (i.e. in the array ''arrs'' in the code above).
  * Arrff 😅, actually, I wanted to make this right, so I just implemented the mechanism I mentioned in the note just above:

<code python>
# Apply denoising effect if needed:
if self.multiframes_denoise is not None:
    args = self.multiframes_denoise.split(":")
    temporal = int(args[1]) if len(args) >= 2 else 5
    hhh = int(args[2]) if len(args) >= 3 else 6
    template = int(args[3]) if len(args) >= 4 else 7
    wsize = int(args[4]) if len(args) >= 5 else 35

    # We should apply multi frame denoising here:
    logger.info("Applying overall denoising: %d:%d:%d:%d...", temporal, hhh, template, wsize)
    arrs = [np.asarray(frame) for frame in frames]
    nframes = len(frames)
    flist = []
    woff = int((temporal-1)/2)

    if nframes == 1:
        # Apply simple denoising here:
        dst = cv2.fastNlMeansDenoising(arrs[0])
        flist.append(dst)
    else:
        # Add the additional starting/ending frames:
        before = []
        after = []
        logger.info("Adding %d additional sibling frames.", woff)
        for i in range(woff):
            idx = min(i+1, nframes-1)
            before.insert(0, arrs[idx])
            idx = max(nframes-1-i-1, 0)
            after.append(arrs[idx])

        arrs = before+arrs+after

        for i in range(nframes):
            logger.info("Denoising frame %d/%d...", i+1, nframes)
            dst = cv2.fastNlMeansDenoisingColoredMulti(
                arrs, woff+i, temporalWindowSize=temporal, h=hhh,
                templateWindowSize=template, searchWindowSize=wsize)
            flist.append(dst)

    frames = [Image.fromarray(arr) for arr in flist]
</code>

  * => And indeed, this helps reduce the artifacts on the first/last frames 👍! So this is good. But the denoising process itself is still really slow 😖. Arrf, never mind.

===== Conclusion =====

  * And now I think this is good enough for a starting version of this utility: there is a lot more I could add here, but, well, as usual, the idea was to keep this post as short as possible, so let's just say we'll get back to this tool when there is a need for it 👍! See yaaa!