Morning the world! After my previous quick project article on the Filtering of corrupted gif files, what I want to build here is a minimal command line utility I could use to perform simple image manipulations, like resizing, cropping, applying simple effects, etc.
I already started to build something in that direction in the article Image overview generation, but I need to make this a bit more generic: it should still support gifs and batch processing of images, but it does not need to create a single overview image at the end.
So let's get to work, shall we!
"""Module for ImageProcessor class definition"""

import logging

from nvp.nvp_context import NVPContext
from nvp.nvp_component import NVPComponent

logger = logging.getLogger(__name__)


class ImageProcessor(NVPComponent):
    """Component exposing the image processing command."""

    def __init__(self, ctx: NVPContext):
        """Build the component on top of the given context."""
        super().__init__(ctx)

    def process_command(self, cmd):
        """Dispatch a command to this component.

        Returns the result of process_images() for the 'process' command,
        and False for any other command.
        """
        if cmd != 'process':
            return False

        return self.process_images()

    def process_images(self):
        """Entry point of the 'process' command (placeholder: it only logs a message)."""
        logger.info("Should apply image processing here.")
        return True


if __name__ == "__main__":
    # Stand-alone entry point: build a context, attach the component to it,
    # declare the command line arguments, then let the component run.
    context = NVPContext()

    processor = ImageProcessor(context)
    comp = context.register_component("ImageProcessor", processor)

    context.define_subparsers("main", {'process': None})

    psr = context.get_parser('main.process')
    psr.add_argument("--output", dest="output_dir", type=str,
                     help="Output dir where to store the valid files")
    psr.add_argument("--input", dest="input_dir", type=str,
                     help="Input dir where to start the filtering")

    comp.run()
"imgs": { "custom_python_env": "media_env", "cmd": "${PYTHON} ${PROJECT_ROOT_DIR}/nvh/media/image_processor.py process", "python_path": ["${PROJECT_ROOT_DIR}", "${NVP_ROOT_DIR}"] }
$ nvp run imgs 2022/05/11 07:15:14 [__main__] INFO: Should apply image processing here.
process_images
# ... method providing support for this:
def process_images(self):
    """Resolve the input/output locations and collect the input files.

    The input location comes from the "input_dir" parameter and defaults to
    the current working directory. The output location comes from the
    "output_dir" parameter and defaults to a sibling of the input named
    "<input name>_filtered". Relative paths are resolved against the current
    working directory.

    Returns:
        True once the files have been collected.

    Raises:
        AssertionError: if the resolved input path does not exist.
    """
    input_dir = self.get_param("input_dir")
    if input_dir is None:
        # Use the current working dir:
        input_dir = self.get_cwd()

    if self.is_relative_path(input_dir):
        # turn this into an absolute path:
        input_dir = self.get_path(self.get_cwd(), input_dir)

    # Validate the input *before* creating anything on disk, so that an
    # invalid input does not leave an empty output folder behind.
    assert self.path_exists(input_dir), f"Invalid input provided {input_dir}"

    output_dir = self.get_param("output_dir")
    if output_dir is None:
        # We use the parent folder of the input:
        folder = self.get_filename(input_dir)
        parent_dir = self.get_parent_folder(input_dir)
        output_dir = self.get_path(parent_dir, f"{folder}_filtered")

    if self.is_relative_path(output_dir):
        # turn this into an absolute path:
        output_dir = self.get_path(self.get_cwd(), output_dir)

    # Create the destination dir:
    self.make_folder(output_dir)

    if self.file_exists(input_dir):
        # The input is a single file:
        all_files = [input_dir]
    else:
        # The input is a folder: collect all the files in there.
        # NOTE(review): the key used to be spelled "recurvise", which looks
        # like a typo for "recursive". The correctly spelled key is read
        # first and the legacy spelling is kept as a fallback - confirm
        # against the argument parser definition.
        legacy_recurv = self.get_param("recurvise", False)
        recurv = self.get_param("recursive", legacy_recurv)
        all_files = self.get_all_files(input_dir, recursive=recurv)

    num_imgs = len(all_files)
    logger.info("Collected %d input images.", num_imgs)

    return True
$ nvp run imgs 2022/05/11 07:27:32 [__main__] INFO: Collected 44 input images.
process_images
method: for i in range(num_imgs): fname = all_files[i] if self.is_image_file(fname): logger.info("%d/%d: Processing %s...", i+1, num_imgs, fname) self.process_image(fname, input_dir, output_dir)
def is_image_file(self, fname):
    """Tell whether a file looks like an image, judging only by its extension.

    The comparison is case-insensitive and accepts .gif, .jpg and .png.
    """
    suffix = self.get_path_extension(fname).lower()
    return suffix in (".gif", ".jpg", ".png")


def process_image(self, fname, input_dir, output_dir):
    """Process a single image file (placeholder).

    Only builds the full path of the file from input_dir and fname and logs
    it; output_dir is not used yet.
    """
    file_path = self.get_path(input_dir, fname)
    logger.info("Should process image file %s here.", file_path)
process_image
def process_image(self, fname, input_dir, output_dir):
    """Process a single image file and write the result in the output folder.

    The image is opened, re-oriented with apply_exif_orientation(), and each
    of its frames goes through process_frame(). The result is written under
    output_dir with the same relative name and the same format as the input:

    - .gif inputs are written back as a (possibly animated) gif;
    - any other input is written as a regular image; if it holds several
      frames, one "<name>_frameNNN<ext>" file is written per frame.

    Args:
        fname: file name, relative to input_dir.
        input_dir: folder containing the input file.
        output_dir: folder receiving the processed file.

    Files that Pillow cannot identify, or that trigger its decompression
    bomb protection, are reported with an error log and skipped. Other
    errors propagate to the caller.
    """
    file_path = self.get_path(input_dir, fname)
    logger.info("Should process image file %s here.", file_path)

    try:
        # The context manager releases the file handle once all the frames
        # have been extracted (important when processing large batches).
        with Image.open(file_path) as src:
            # Apply the orientation change as required:
            img = ImageProcessor.apply_exif_orientation(src)

            # ImageSequence.Iterator seeks inside the image and hands back
            # the very same object for every frame, so each frame is copied
            # before processing. Otherwise all the entries would alias one
            # image whenever process_frame() returns its input unchanged.
            frames = [self.process_frame(frame.copy())
                      for frame in ImageSequence.Iterator(img)]

            # Pillow reports the gif frame duration (in ms) in the info
            # dictionary; fall back to 50ms when it is not available.
            frame_dur = img.info.get('duration', 50)

        # For now use the same output format as the input:
        ext = self.get_path_extension(fname)
        out_file = self.get_path(output_dir, fname)
        dest_folder = self.get_parent_folder(out_file)
        self.make_folder(dest_folder)

        # Compare the extension case-insensitively, like is_image_file()
        # does, so that "IMG.JPG" or "ANIM.GIF" are not silently dropped.
        if ext.lower() == ".gif":
            # Write a gif image from the frames, using the original frame duration:
            frames[0].save(out_file, save_all=True, append_images=frames[1:],
                           optimize=True, duration=frame_dur, loop=0)
        elif len(frames) == 1:
            # Save as a regular simple image (this also covers .png inputs):
            frames[0].save(out_file, quality=90)
        else:
            # Several frames in a non-gif image: write one file per frame.
            out_base = self.remove_file_extension(out_file)
            for idx, frame in enumerate(frames):
                frame.save(f"{out_base}_frame{idx:03d}{ext}", quality=90)

    except (UnidentifiedImageError, Image.DecompressionBombError) as err:
        logger.error("Cannot process file %s: %s", file_path, str(err))
process_frame()
# ... method (we can have multiple frame in a given image if handling gifs for instance)
@staticmethod
def apply_frame_effect(frame, effect):
    """Apply a single named effect on a frame and return the resulting frame.

    Supported effects: "bw", "invert", "contour", "detail", "edge", "edge2",
    "find_edges", "sharpen" and "sketch[:radius]".

    For "sketch" the optional radius is the size of the gaussian blur kernel
    (21 when omitted). OpenCV only accepts odd kernel sizes, so an even
    value is bumped to the next odd one.

    An unknown effect is reported with a warning and the frame is returned
    unchanged.
    """
    if effect == "bw":
        return ImageOps.grayscale(frame)

    if effect == "invert":
        return ImageOps.invert(frame)

    # Effects that map directly onto a predefined Pillow filter:
    pil_filters = {
        "contour": ImageFilter.CONTOUR,
        "detail": ImageFilter.DETAIL,
        "edge": ImageFilter.EDGE_ENHANCE,
        "edge2": ImageFilter.EDGE_ENHANCE_MORE,
        "find_edges": ImageFilter.FIND_EDGES,
        "sharpen": ImageFilter.SHARPEN,
    }
    if effect in pil_filters:
        return frame.filter(pil_filters[effect])

    if effect.startswith("sketch"):
        # cf. https://towardsdatascience.com/generate-pencil-sketch-from-photo-in-python-7c56802d8acb
        # cf. https://www.askpython.com/python/examples/images-to-pencil-sketch
        args = effect.split(":")

        # The radius is optional: "sketch" alone used to raise an IndexError.
        radius = int(args[1]) if len(args) > 1 and args[1] else 21

        # cv2.GaussianBlur requires a positive and odd kernel size:
        radius = max(radius, 1) | 1

        # Palette/greyscale frames (e.g. gif frames) give a single-channel
        # array, which cvtColor(RGB2GRAY) rejects with "Invalid number of
        # channels", so they are converted to RGB first.
        if frame.mode not in ("RGB", "RGBA"):
            frame = frame.convert("RGB")

        # Convert to opencv:
        img = np.asarray(frame)

        # Pencil sketch: grey / inverted(blurred(inverted(grey)))
        grey_img = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
        invert_img = cv2.bitwise_not(grey_img)
        blur_img = cv2.GaussianBlur(invert_img, (radius, radius), 0)
        invblur_img = cv2.bitwise_not(blur_img)
        sketch_img = cv2.divide(grey_img, invblur_img, scale=256.0)
        col_img = cv2.cvtColor(sketch_img, cv2.COLOR_GRAY2RGB)

        # Convert back to PIL image:
        return Image.fromarray(col_img)

    logger.warning("Cannot apply unknown frame effect '%s'", effect)
    return frame


def process_frame(self, frame):
    """Apply the required processing on a single image frame.

    Every entry of self.effects is applied in order, each one on the result
    of the previous one.
    NOTE(review): this assumes self.effects is an iterable of effect names
    (not the raw "--fx" string) - confirm where it is assigned.
    """
    for effect in self.effects:
        frame = ImageProcessor.apply_frame_effect(frame, effect)

    # return the final frame:
    return frame
psr.add_argument("--fx", dest="effects", type=str, help="List of effects to apply on the images")
$ nvp run imgs --fx sketch:21 2022/05/11 10:55:50 [__main__] INFO: Collected 6 input images. 2022/05/11 10:55:50 [__main__] INFO: 1/6: Processing IMG_20220215_094740.jpg... 2022/05/11 10:55:50 [__main__] INFO: Should process image file D:\Temp\images\IMG_20220215_094740.jpg here. 2022/05/11 10:55:52 [__main__] INFO: 2/6: Processing IMG_20220310_214814.jpg... 2022/05/11 10:55:52 [__main__] INFO: Should process image file D:\Temp\images\IMG_20220310_214814.jpg here. 2022/05/11 10:55:54 [__main__] INFO: 3/6: Processing music-teacher-jack-black.gif... 2022/05/11 10:55:54 [__main__] INFO: Should process image file D:\Temp\images\music-teacher-jack-black.gif here. Traceback (most recent call last): File "D:\Projects\NervHome\nvh\media\image_processor.py", line 245, in <module> comp.run() File "D:\Projects\NervProj\nvp\nvp_component.py", line 69, in run res = self.process_command(cmd) File "D:\Projects\NervHome\nvh\media\image_processor.py", line 28, in process_command return self.process_images() File "D:\Projects\NervHome\nvh\media\image_processor.py", line 83, in process_images self.process_image(fname, input_dir, output_dir) File "D:\Projects\NervHome\nvh\media\image_processor.py", line 195, in process_image frames.append(self.process_frame(frame)) File "D:\Projects\NervHome\nvh\media\image_processor.py", line 175, in process_frame frame = ImageProcessor.apply_frame_effect(frame, effect) File "D:\Projects\NervHome\nvh\media\image_processor.py", line 158, in apply_frame_effect grey_img = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY) cv2.error: OpenCV(4.5.5) d:\a\opencv-python\opencv-python\opencv\modules\imgproc\src\color.simd_helpers.hpp:92: error: (-2:Unspecified error) in function '__cdecl cv::impl::`anonymous-namespace'::CvtHelper<struct cv::impl::`anonymous namespace'::Set<3,4,-1>,struct cv: :impl::A0x7123906f::Set<1,-1,-1>,struct cv::impl::A0x7123906f::Set<0,2,5>,2>::CvtHelper(const class cv::_InputArray &,const class cv:: _OutputArray &,int)' > Invalid number of 
channels in input image: > 'VScn::contains(scn)' > where > 'scn' is 1
frames = [] for frame in ImageSequence.Iterator(img): # logger.info("Frame mode: %s", frame.mode) if frame.mode == "P": frame = frame.convert("RGBA") frames.append(self.process_frame(frame))
$ nvp run imgs --fx sketch:21 2022/05/11 11:20:37 [__main__] INFO: Collected 6 input images. 2022/05/11 11:20:37 [__main__] INFO: 1/6: Processing IMG_20220215_094740.jpg... 2022/05/11 11:20:38 [__main__] INFO: 2/6: Processing IMG_20220310_214814.jpg... 2022/05/11 11:20:39 [__main__] INFO: 3/6: Processing music-teacher-jack-black.gif... 2022/05/11 11:20:40 [__main__] INFO: 4/6: Processing oh-hey-oh-hey-there.gif... 2022/05/11 11:20:40 [__main__] INFO: 5/6: Processing oh-hey-ryan-reynolds.gif... 2022/05/11 11:20:41 [__main__] INFO: 6/6: Processing school-of-rock-jack-black.gif...
def apply_max_size(self, frame, max_size):
    """Shrink a frame so that its largest dimension does not exceed max_size.

    The aspect ratio is preserved. A frame that already fits is returned
    unchanged (frames are never enlarged).

    Args:
        frame: the image frame to resize (must expose width, height and resize()).
        max_size: the maximum width/height allowed, in pixels.

    Returns:
        The resized frame, or the input frame itself when no resize is needed.
    """
    width = frame.width
    height = frame.height
    # logger.info("Frame size: %dx%d, maxsize: %d", width, height, max_size)

    if max(width, height) <= max_size:
        # Already small enough:
        return frame

    # ">=" (and not ">") so that square frames are resized too: with strict
    # comparisons on both sides a frame with width == height was never reduced.
    if width >= height:
        # We have to reduce the width:
        new_width = max_size
        new_height = int(height * max_size/width)
    else:
        # we have to reduce the height:
        new_height = max_size
        new_width = int(width * max_size/height)

    # Keep at least one pixel on the short side for extreme aspect ratios:
    new_width = max(new_width, 1)
    new_height = max(new_height, 1)

    logger.debug("New frame size: %dx%d", new_width, new_height)
    return frame.resize((new_width, new_height))


def process_frame(self, frame):
    """Apply the required processing on a single image frame.

    The frame is first reduced with apply_max_size() when self.max_size is
    set, then every entry of self.effects is applied in order.
    """
    # Check if we need to resize that frame:
    if self.max_size is not None:
        frame = self.apply_max_size(frame, self.max_size)

    for effect in self.effects:
        frame = ImageProcessor.apply_frame_effect(frame, effect)

    # return the final frame:
    return frame
#Somewhere in our code: self.max_size = self.get_param("max_size", None) # Somewhere else in our code: psr = context.get_parser('main.process') psr.add_argument("--output", dest="output_dir", type=str, help="Output dir where to store the valid files") psr.add_argument("--input", dest="input_dir", type=str, help="Input dir where to start the filtering") psr.add_argument("--fx", dest="effects", type=str, help="List of effects to apply on the images") psr.add_argument("--maxsize", dest="max_size", type=int, help="Maximum size that the resulting image should have.")
$ nvp run imgs --fx sketch:21 --maxsize 1280 2022/05/11 11:54:12 [__main__] INFO: Collected 6 input images. 2022/05/11 11:54:12 [__main__] INFO: 1/6: Processing IMG_20220215_094740.jpg... 2022/05/11 11:54:12 [__main__] INFO: 2/6: Processing IMG_20220310_214814.jpg... 2022/05/11 11:54:13 [__main__] INFO: 3/6: Processing music-teacher-jack-black.gif... 2022/05/11 11:54:14 [__main__] INFO: 4/6: Processing oh-hey-oh-hey-there.gif... 2022/05/11 11:54:14 [__main__] INFO: 5/6: Processing oh-hey-ryan-reynolds.gif... 2022/05/11 11:54:15 [__main__] INFO: 6/6: Processing school-of-rock-jack-black.gif...
if ext == ".gif": # Write a gif image from the frames: # Use the original frame duration: frame_dur = int(img.info.get('duration', 50)/self.speed_multiplier) frames[0].save(out_file, save_all=True, append_images=frames[1:], optimize=True, duration=frame_dur, loop=0)
# Apply denoising effect if needed: if self.multiframes_denoise is not None: args = self.multiframes_denoise.split(":") temporal = int(args[1]) if len(args) >= 2 else 3 hhh = int(args[2]) if len(args) >= 3 else 10 template = int(args[3]) if len(args) >= 4 else 7 wsize = int(args[4]) if len(args) >= 5 else 35 # We should apply multi frame denoising here: logger.info("Applying overall denoising...") arrs = [np.asarray(frame) for frame in frames] nframes = len(frames) flist = [] woff = int((temporal-1)/2) for i in range(nframes): logger.info("Denoising frame %d/%d...", i+1, nframes) if i < woff or i > (nframes-woff-1): # Single frame denoising here: dst = cv2.fastNlMeansDenoising(arrs[i]) else: dst = cv2.fastNlMeansDenoisingColoredMulti( arrs, i, temporalWindowSize=temporal, h=hhh, templateWindowSize=template, searchWindowSize=wsize) flist.append(dst) frames = [Image.fromarray(arr) for arr in flist]
$ nvp run imgs --fx sketch:21,edge2 -d -s 0.7 -o ../sketch2 2022/05/12 09:40:10 [__main__] INFO: Collected 6 input images. 2022/05/12 09:40:10 [__main__] INFO: 1/6: Processing IMG_20220215_094740.jpg... 2022/05/12 09:40:12 [__main__] INFO: Applying overall denoising... 2022/05/12 09:40:13 [__main__] INFO: Denoising frame 1/1... 2022/05/12 09:41:31 [__main__] INFO: 2/6: Processing IMG_20220310_214814.jpg... 2022/05/12 09:41:33 [__main__] INFO: Applying overall denoising... 2022/05/12 09:41:33 [__main__] INFO: Denoising frame 1/1... 2022/05/12 09:42:43 [__main__] INFO: 3/6: Processing music-teacher-jack-black.gif... 2022/05/12 09:42:43 [__main__] INFO: Applying overall denoising... 2022/05/12 09:42:43 [__main__] INFO: Denoising frame 1/35... 2022/05/12 09:42:44 [__main__] INFO: Denoising frame 2/35... 2022/05/12 09:42:50 [__main__] INFO: Denoising frame 3/35... 2022/05/12 09:42:56 [__main__] INFO: Denoising frame 4/35... 2022/05/12 09:43:01 [__main__] INFO: Denoising frame 5/35... 2022/05/12 09:43:06 [__main__] INFO: Denoising frame 6/35... 2022/05/12 09:43:12 [__main__] INFO: Denoising frame 7/35... 2022/05/12 09:43:17 [__main__] INFO: Denoising frame 8/35... 2022/05/12 09:43:23 [__main__] INFO: Denoising frame 9/35... 2022/05/12 09:43:28 [__main__] INFO: Denoising frame 10/35...
# Apply denoising effect if needed: if self.multiframes_denoise is not None: args = self.multiframes_denoise.split(":") temporal = int(args[1]) if len(args) >= 2 else 5 hhh = int(args[2]) if len(args) >= 3 else 6 template = int(args[3]) if len(args) >= 4 else 7 wsize = int(args[4]) if len(args) >= 5 else 35 # We should apply multi frame denoising here: logger.info("Applying overall denoising: %d:%d:%d:%d...", temporal, hhh, template, wsize) arrs = [np.asarray(frame) for frame in frames] nframes = len(frames) flist = [] woff = int((temporal-1)/2) if nframes == 1: # Apply simple denoising here: dst = cv2.fastNlMeansDenoising(arrs[0]) flist.append(dst) else: # Add the additional starting/ending frames: before = [] after = [] logger.info("Adding %d additional sibling frames.", woff) for i in range(woff): idx = min(i+1, nframes-1) before.insert(0, arrs[idx]) idx = max(nframes-1-i-1, 0) after.append(arrs[idx]) arrs = before+arrs+after for i in range(nframes): logger.info("Denoising frame %d/%d...", i+1, nframes) dst = cv2.fastNlMeansDenoisingColoredMulti( arrs, woff+i, temporalWindowSize=temporal, h=hhh, templateWindowSize=template, searchWindowSize=wsize) flist.append(dst) frames = [Image.fromarray(arr) for arr in flist]