
Quick project: image processing tool [python]

Morning the world! After my previous quick project article on the Filtering of corrupted gif files, what I want to build here is a minimal command line utility I could use to perform simple image manipulations, like resizing, cropping, applying simple effects, etc.

I already started to build something in that direction in the article Image overview generation, but I need to make this a bit more generic: it should still support gifs and batch processing of images, but it shouldn't be tied to producing a single overview image at the end.

So let's get to work, shall we?

  • As usual, we start by building the skeleton for the dedicated component.
Actually that's something I might consider automating someday: the creation of a simple component skeleton with a single command ;-)
  • So here is the initial minimal file:
    """Module for ImageProcessor class definition"""
    
    import logging
    
    from nvp.nvp_context import NVPContext
    from nvp.nvp_component import NVPComponent
    
    logger = logging.getLogger(__name__)
    
    
    class ImageProcessor(NVPComponent):
        """ImageProcessor component class"""
    
        def __init__(self, ctx: NVPContext):
            """class constructor"""
            NVPComponent.__init__(self, ctx)
    
        def process_command(self, cmd):
            """Check if this component can process the given command"""
    
            if cmd == 'process':
                return self.process_images()
    
            return False
    
        def process_images(self):
            """Process the input images from a given folder"""
    
            logger.info("Should apply image processing here.")
    
            return True
    
    
    if __name__ == "__main__":
        # Create the context:
        context = NVPContext()
    
        # Add our component:
        comp = context.register_component("ImageProcessor", ImageProcessor(context))
    
        context.define_subparsers("main", {
            'process': None,
        })
    
        psr = context.get_parser('main.process')
        psr.add_argument("--output", dest="output_dir", type=str,
                         help="Output dir where to store the valid files")
        psr.add_argument("--input", dest="input_dir", type=str,
                         help="Input dir where to start the filtering")
    
        comp.run()
    
  • Here is the new script added to run this utility:
        "imgs": {
          "custom_python_env": "media_env",
          "cmd": "${PYTHON} ${PROJECT_ROOT_DIR}/nvh/media/image_processor.py process",
          "python_path": ["${PROJECT_ROOT_DIR}", "${NVP_ROOT_DIR}"]
        }
  • And this works just as expected:
    $ nvp run imgs
    2022/05/11 07:15:14 [__main__] INFO: Should apply image processing here.
  • The utility should be flexible enough to support the following behaviors:
    • If no input is specified, we use the current working dir as input and process all the images in there recursively
    • If no output is specified, we put all the resulting images in a sibling folder to the input folder
    • If the input is an image file, we process only that file
    • If the input is a folder, we may process the files recursively or not.
  • ⇒ Basically, this is similar to the mechanism I started to build in Filtering of corrupted gif files.
  • So here is the updated process_images method providing support for this:
        def process_images(self):
            """Process the input images from a given folder"""
    
            input_dir = self.get_param("input_dir")
            if input_dir is None:
                # Use the current working dir:
                input_dir = self.get_cwd()
    
            if self.is_relative_path(input_dir):
                # turn this into an absolute path:
                input_dir = self.get_path(self.get_cwd(), input_dir)
    
            output_dir = self.get_param("output_dir")
            if output_dir is None:
                # We use the parent folder of the input:
                folder = self.get_filename(input_dir)
                parent_dir = self.get_parent_folder(input_dir)
                output_dir = self.get_path(parent_dir, f"{folder}_filtered")
    
            if self.is_relative_path(output_dir):
                # turn this into an absolute path:
                output_dir = self.get_path(self.get_cwd(), output_dir)
    
            # Create the destination dir:
            self.make_folder(output_dir)
    
            # If the input is a folder, then we collect all the files in there:
            assert self.path_exists(input_dir), f"Invalid input provided {input_dir}"
    
            if self.file_exists(input_dir):
                all_files = [input_dir]
            else:
                # Collect all the files in that input folder:
                recurv = self.get_param("recursive", False)
                all_files = self.get_all_files(input_dir, recursive=recurv)
    
            num_imgs = len(all_files)
            logger.info("Collected %d input images.", num_imgs)
    
            return True
  • ⇒ And this is finding the input files as desired:
    $ nvp run imgs
    2022/05/11 07:27:32 [__main__] INFO: Collected 44 input images.
  • Once we have a list of input files, we can start processing them one by one, simply adding this code at the end of the process_images method:
            for i in range(num_imgs):
                fname = all_files[i]
                if self.is_image_file(fname):
                    logger.info("%d/%d: Processing %s...", i+1, num_imgs, fname)
                    self.process_image(fname, input_dir, output_dir)
    
  • And then I added the following 2 helper functions referenced above:
        def is_image_file(self, fname):
            """Check if a given file is a valid image file
            simply checking the extension of the file"""
            ext = self.get_path_extension(fname)
            return ext.lower() in [".gif", ".jpg", ".png"]
    
        def process_image(self, fname, input_dir, output_dir):
            """Process a single image file."""
            file_path = self.get_path(input_dir, fname)
            logger.info("Should process image file %s here.", file_path)
  • ⇒ Now we can focus on processing a single image in process_image
  • So let's write something useful in process_image now, starting with some image processing effects:
        def process_image(self, fname, input_dir, output_dir):
            """Process a single image file."""
            file_path = self.get_path(input_dir, fname)
            logger.info("Should process image file %s here.", file_path)
    
            # We start with opening the image:
            try:
                img = Image.open(file_path)
    
                # Apply the orientation change as required:
                img = ImageProcessor.apply_exif_orientation(img)
    
                # Collect the frames from the input image:
                frames = []
                for frame in ImageSequence.Iterator(img):
                    frames.append(self.process_frame(frame))
    
                # For now use the same output format as the input:
                ext = self.get_path_extension(fname)
    
                out_file = self.get_path(output_dir, fname)
                dest_folder = self.get_parent_folder(out_file)
                self.make_folder(dest_folder)
    
                if ext == ".gif":
                    # Write a gif image from the frames:
    
                    # Use the original frame duration:
                    frame_dur = img.info.get('duration', 50)
                    frames[0].save(out_file, save_all=True, append_images=frames[1:],
                                   optimize=True, duration=frame_dur, loop=0)
                elif ext == ".jpg":
                    # Save as a regular simple image:
                    nframes = len(frames)
                    if nframes == 1:
                        frames[0].save(out_file, quality=90)
                    else:
                        out_base = self.remove_file_extension(out_file)
    
                        for i in range(nframes):
                            frames[i].save(f"{out_base}_frame{i:03d}{ext}", quality=90)
    
            except (UnidentifiedImageError, Image.DecompressionBombError) as err:
                logger.error("Cannot process file %s: %s", file_path, str(err))
  • The method above mostly delegates the processing of each “frame” of a given image to the process_frame() method (an image may contain multiple frames when handling gifs, for instance).
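  • Note that the apply_exif_orientation() helper used above is not reproduced in this post; a minimal sketch of it, assuming we simply rely on PIL's ImageOps.exif_transpose, could look like this:
        @staticmethod
        def apply_exif_orientation(img):
            """Rotate/flip the image according to its EXIF orientation tag (sketch)."""
            # exif_transpose returns a transposed copy when an orientation tag is
            # present, and leaves the image untouched otherwise:
            return ImageOps.exif_transpose(img)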
  • And for the processing itself I currently just added this:
        @staticmethod
        def apply_frame_effect(frame, effect):
            """Apply a given effect on a frame"""
    
            if effect == "bw":
                return ImageOps.grayscale(frame)
            if effect == "contour":
                return frame.filter(ImageFilter.CONTOUR)
            if effect == "invert":
                return ImageOps.invert(frame)
            if effect == "detail":
                return frame.filter(ImageFilter.DETAIL)
            if effect == "edge":
                return frame.filter(ImageFilter.EDGE_ENHANCE)
            if effect == "edge2":
                return frame.filter(ImageFilter.EDGE_ENHANCE_MORE)
            if effect == "find_edges":
                return frame.filter(ImageFilter.FIND_EDGES)
            if effect == "sharpen":
                return frame.filter(ImageFilter.SHARPEN)
            if effect.startswith("sketch"):
                # cf. https://towardsdatascience.com/generate-pencil-sketch-from-photo-in-python-7c56802d8acb
                # cf.https://www.askpython.com/python/examples/images-to-pencil-sketch
                args = effect.split(":")
                radius = int(args[1])
    
                # # step 1: convert image to grayscale:
                # gray_img = ImageOps.grayscale(frame)
                # # step 2: invert the grayscale image:
                # inv_img = ImageOps.invert(gray_img)
                # # step 3: Apply gaussian blur with given radius:
                # inv_blur_img = inv_img.filter(ImageFilter.GaussianBlur(radius=radius))
                # # step 4: Invert blur image:
                # blur_img = ImageOps.invert(inv_blur_img)
    
                # Convert to opencv:
                img = np.asarray(frame)
                # grey_img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
                grey_img = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
                invert_img = cv2.bitwise_not(grey_img)
                blur_img = cv2.GaussianBlur(invert_img, (radius, radius), 0)
                invblur_img = cv2.bitwise_not(blur_img)
                sketch_img = cv2.divide(grey_img, invblur_img, scale=256.0)
                col_img = cv2.cvtColor(sketch_img, cv2.COLOR_GRAY2RGB)
    
                # Convert back to PIL image:
                return Image.fromarray(col_img)
    
            logger.warning("Cannot apply unknown frame effect '%s'", effect)
            return frame
    
        def process_frame(self, frame):
            """Apply the required processing on a single image frame"""
    
            for effect in self.effects:
                frame = ImageProcessor.apply_frame_effect(frame, effect)
    
            # return the final frame:
            return frame
  • Then I of course had to introduce support for specifying the effects on the command line:
        psr.add_argument("--fx", dest="effects", type=str,
                         help="List of effects to apply on the images")
  • So let's give it a try on some test images:
    $ nvp run imgs --fx sketch:21
    2022/05/11 10:55:50 [__main__] INFO: Collected 6 input images.
    2022/05/11 10:55:50 [__main__] INFO: 1/6: Processing IMG_20220215_094740.jpg...
    2022/05/11 10:55:50 [__main__] INFO: Should process image file D:\Temp\images\IMG_20220215_094740.jpg here.
    2022/05/11 10:55:52 [__main__] INFO: 2/6: Processing IMG_20220310_214814.jpg...
    2022/05/11 10:55:52 [__main__] INFO: Should process image file D:\Temp\images\IMG_20220310_214814.jpg here.
    2022/05/11 10:55:54 [__main__] INFO: 3/6: Processing music-teacher-jack-black.gif...
    2022/05/11 10:55:54 [__main__] INFO: Should process image file D:\Temp\images\music-teacher-jack-black.gif here.
    Traceback (most recent call last):
      File "D:\Projects\NervHome\nvh\media\image_processor.py", line 245, in <module>
        comp.run()
      File "D:\Projects\NervProj\nvp\nvp_component.py", line 69, in run
        res = self.process_command(cmd)
      File "D:\Projects\NervHome\nvh\media\image_processor.py", line 28, in process_command
        return self.process_images()
      File "D:\Projects\NervHome\nvh\media\image_processor.py", line 83, in process_images
        self.process_image(fname, input_dir, output_dir)
      File "D:\Projects\NervHome\nvh\media\image_processor.py", line 195, in process_image
        frames.append(self.process_frame(frame))
      File "D:\Projects\NervHome\nvh\media\image_processor.py", line 175, in process_frame
        frame = ImageProcessor.apply_frame_effect(frame, effect)
      File "D:\Projects\NervHome\nvh\media\image_processor.py", line 158, in apply_frame_effect
        grey_img = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
    cv2.error: OpenCV(4.5.5) d:\a\opencv-python\opencv-python\opencv\modules\imgproc\src\color.simd_helpers.hpp:92: error: (-2:Unspecified error) in function '__cdecl cv::impl::`anonymous-namespace'::CvtHelper<struct cv::impl::`anonymous namespace'::Set<3,4,-1>,struct cv::impl::A0x7123906f::Set<1,-1,-1>,struct cv::impl::A0x7123906f::Set<0,2,5>,2>::CvtHelper(const class cv::_InputArray &,const class cv::_OutputArray &,int)'
    > Invalid number of channels in input image:
    >     'VScn::contains(scn)'
    > where
    >     'scn' is 1
    
  • Hmmm… okay so I have a problem with the number of channels apparently 🤔…
  • ⇒ Ok, fixed by correctly handling the case of the “P” (palette) frame mode found in gif files:
                frames = []
                for frame in ImageSequence.Iterator(img):
                    # logger.info("Frame mode: %s", frame.mode)
                    if frame.mode == "P":
                        frame = frame.convert("RGBA")
                    frames.append(self.process_frame(frame))
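  • To see why that conversion matters, here is a tiny standalone check (not part of the tool itself): a “P” (palette) frame converts to a single-channel numpy array, which is exactly the 'scn' is 1 situation reported above, while the RGBA version has the 3 or 4 channels that cvtColor expects:
    from PIL import Image
    import numpy as np

    pal = Image.new("P", (4, 4))                  # palette mode, as found in gif frames
    print(np.asarray(pal).shape)                  # -> (4, 4): single channel
    print(np.asarray(pal.convert("RGBA")).shape)  # -> (4, 4, 4): 4 channels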
  • So now it works just fine:
    $ nvp run imgs --fx sketch:21
    2022/05/11 11:20:37 [__main__] INFO: Collected 6 input images.
    2022/05/11 11:20:37 [__main__] INFO: 1/6: Processing IMG_20220215_094740.jpg...
    2022/05/11 11:20:38 [__main__] INFO: 2/6: Processing IMG_20220310_214814.jpg...
    2022/05/11 11:20:39 [__main__] INFO: 3/6: Processing music-teacher-jack-black.gif...
    2022/05/11 11:20:40 [__main__] INFO: 4/6: Processing oh-hey-oh-hey-there.gif...
    2022/05/11 11:20:40 [__main__] INFO: 5/6: Processing oh-hey-ryan-reynolds.gif...
    2022/05/11 11:20:41 [__main__] INFO: 6/6: Processing school-of-rock-jack-black.gif...
  • And the results are just as I expected them to be:

  • ⇒ Cooool ✌😎!
  • The next common processing operation I need to support is reducing the image dimensions: most of the time, my images are too big to be usable for anything 'online' ⇒ So I need to reduce their size while keeping the aspect ratio.
  • Let's try to simply add one command line argument here: the max size, which will be applied either to the width or the height of the image (depending on which one is the largest).
  • Here are the updated processing steps:
        def apply_max_size(self, frame, max_size):
            """Apply a max_size transform on the frame"""
            width = frame.width
            height = frame.height
            new_width = None
            new_height = None
    
            # logger.info("Frame size: %dx%d, maxsize: %d", width, height, max_size)
            if width >= height and width > max_size:
                # We have to reduce the width (this also covers square frames):
                new_width = max_size
                new_height = int(height * max_size/width)
            elif height > width and height > max_size:
                # we have to reduce the height:
                new_height = max_size
                new_width = int(width * max_size/height)
    
            if new_width is not None and new_height is not None:
                logger.debug("New frame size: %dx%d", new_width, new_height)
                frame = frame.resize((new_width, new_height))
    
            return frame
    
        def process_frame(self, frame):
            """Apply the required processing on a single image frame"""
    
            # Check if we need to resize that frame:
            if self.max_size is not None:
                frame = self.apply_max_size(frame, self.max_size)
    
            for effect in self.effects:
                frame = ImageProcessor.apply_frame_effect(frame, effect)
    
            # return the final frame:
            return frame
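  • As a side note, PIL's built-in Image.thumbnail() method does something very similar (it only ever shrinks, works in place, and preserves the aspect ratio), so the helper above could arguably be reduced to a one-liner if needed:
        # Rough equivalent using the built-in helper (modifies the frame in place):
        frame.thumbnail((max_size, max_size))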
  • And reading the “max_size” from the command line of course:
        # Somewhere in our code:
        self.max_size = self.get_param("max_size", None)
        
        # Somewhere else in our code: 
        psr = context.get_parser('main.process')
        psr.add_argument("--output", dest="output_dir", type=str,
                         help="Output dir where to store the valid files")
        psr.add_argument("--input", dest="input_dir", type=str,
                         help="Input dir where to start the filtering")
        psr.add_argument("--fx", dest="effects", type=str,
                         help="List of effects to apply on the images")
        psr.add_argument("--maxsize", dest="max_size", type=int,
                         help="Maximum size that the resulting image should have.")
    
  • And that's working alright again:
    $ nvp run imgs --fx sketch:21 --maxsize 1280
    2022/05/11 11:54:12 [__main__] INFO: Collected 6 input images.
    2022/05/11 11:54:12 [__main__] INFO: 1/6: Processing IMG_20220215_094740.jpg...
    2022/05/11 11:54:12 [__main__] INFO: 2/6: Processing IMG_20220310_214814.jpg...
    2022/05/11 11:54:13 [__main__] INFO: 3/6: Processing music-teacher-jack-black.gif...
    2022/05/11 11:54:14 [__main__] INFO: 4/6: Processing oh-hey-oh-hey-there.gif...
    2022/05/11 11:54:14 [__main__] INFO: 5/6: Processing oh-hey-ryan-reynolds.gif...
    2022/05/11 11:54:15 [__main__] INFO: 6/6: Processing school-of-rock-jack-black.gif...
  • Here is a resized image for instance:

resized_image.jpg

  • This one is more of a little detail, but I was just thinking about it, so why not? ⇒ I'm thinking I could specify another command line argument to control the playback speed of a given gif image: I could provide a multiplier, and then use it to scale the frame duration of the final output.
  • ⇒ Let's add that 😝
  • Easy enough: we just need to apply the new “speed_multiplier” value right before writing the output gif file:
                if ext == ".gif":
                    # Write a gif image from the frames:
    
                    # Use the original frame duration:
                    frame_dur = int(img.info.get('duration', 50)/self.speed_multiplier)
    
                    frames[0].save(out_file, save_all=True, append_images=frames[1:],
                                   optimize=True, duration=frame_dur, loop=0)
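  • For completeness, the speed multiplier itself comes from yet another command line argument stored on the component; the exact spelling below is an assumption (the “-s 0.7” call further down suggests a short flag), but it would be along these lines:
        # Hypothetical argument declaration for the playback speed multiplier:
        psr.add_argument("-s", "--speed", dest="speed_multiplier", type=float, default=1.0,
                         help="Playback speed multiplier to apply on gif outputs")

        # And when initializing the processing:
        self.speed_multiplier = self.get_param("speed_multiplier", 1.0)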
  • Next, I also wanted to experiment with denoising the frames (using OpenCV's non-local means functions), so I added this code section to handle this denoising process:
                # Apply denoising effect if needed:
                if self.multiframes_denoise is not None:
                    args = self.multiframes_denoise.split(":")
                    temporal = int(args[1]) if len(args) >= 2 else 3
                    hhh = int(args[2]) if len(args) >= 3 else 10
                    template = int(args[3]) if len(args) >= 4 else 7
                    wsize = int(args[4]) if len(args) >= 5 else 35
    
                    # We should apply multi frame denoising here:
                    logger.info("Applying overall denoising...")
                    arrs = [np.asarray(frame) for frame in frames]
                    nframes = len(frames)
                    flist = []
                    woff = int((temporal-1)/2)
    
                    for i in range(nframes):
                        logger.info("Denoising frame %d/%d...", i+1, nframes)
    
                        if i < woff or i > (nframes-woff-1):
                            # Single frame denoising here:
                            dst = cv2.fastNlMeansDenoising(arrs[i])
                        else:
                            dst = cv2.fastNlMeansDenoisingColoredMulti(
                                arrs, i, temporalWindowSize=temporal,
                                h=hhh,
                                templateWindowSize=template,
                                searchWindowSize=wsize)
    
                        flist.append(dst)
    
                    frames = [Image.fromarray(arr) for arr in flist]
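  • Similarly, the multiframes_denoise value comes from another command line flag (the “-d” visible in the calls below); the actual declaration is not shown in this post, but one possible sketch, using nargs="?" so that a bare “-d” falls back to the default parameters, would be:
        # Hypothetical argument declaration for the multi-frame denoising settings,
        # following the "name:temporal:h:template:wsize" format parsed above:
        psr.add_argument("-d", "--denoise", dest="multiframes_denoise", type=str,
                         nargs="?", const="nlm", default=None,
                         help="Denoising settings as 'name:temporal:h:template:wsize'")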
  • The result is not “too bad”, but not especially great either, and it's pretty slow for “large” images, so I'm not sure I'm going to use that very much:
    $ nvp run imgs --fx sketch:21,edge2 -d -s 0.7 -o ../sketch2
    2022/05/12 09:40:10 [__main__] INFO: Collected 6 input images.
    2022/05/12 09:40:10 [__main__] INFO: 1/6: Processing IMG_20220215_094740.jpg...
    2022/05/12 09:40:12 [__main__] INFO: Applying overall denoising...
    2022/05/12 09:40:13 [__main__] INFO: Denoising frame 1/1...
    2022/05/12 09:41:31 [__main__] INFO: 2/6: Processing IMG_20220310_214814.jpg...
    2022/05/12 09:41:33 [__main__] INFO: Applying overall denoising...
    2022/05/12 09:41:33 [__main__] INFO: Denoising frame 1/1...
    2022/05/12 09:42:43 [__main__] INFO: 3/6: Processing music-teacher-jack-black.gif...
    2022/05/12 09:42:43 [__main__] INFO: Applying overall denoising...
    2022/05/12 09:42:43 [__main__] INFO: Denoising frame 1/35...
    2022/05/12 09:42:44 [__main__] INFO: Denoising frame 2/35...
    2022/05/12 09:42:50 [__main__] INFO: Denoising frame 3/35...
    2022/05/12 09:42:56 [__main__] INFO: Denoising frame 4/35...
    2022/05/12 09:43:01 [__main__] INFO: Denoising frame 5/35...
    2022/05/12 09:43:06 [__main__] INFO: Denoising frame 6/35...
    2022/05/12 09:43:12 [__main__] INFO: Denoising frame 7/35...
    2022/05/12 09:43:17 [__main__] INFO: Denoising frame 8/35...
    2022/05/12 09:43:23 [__main__] INFO: Denoising frame 9/35...
    2022/05/12 09:43:28 [__main__] INFO: Denoising frame 10/35...
    
If one day I want to reconsider this: one thing that is not so great here is that the first/last frames are not denoised with the same algorithm, because we need a window of frames before and after the frame we want to process. But one way around that could be to artificially add the sibling frames in 'reverse time' before the first frame and after the last one (i.e. in the array 'arrs' in the code above).
  • Arrff 😅, actually, I wanted to make this right, so I just implemented this mechanism I mentioned in the note just above:
                # Apply denoising effect if needed:
                if self.multiframes_denoise is not None:
                    args = self.multiframes_denoise.split(":")
                    temporal = int(args[1]) if len(args) >= 2 else 5
                    hhh = int(args[2]) if len(args) >= 3 else 6
                    template = int(args[3]) if len(args) >= 4 else 7
                    wsize = int(args[4]) if len(args) >= 5 else 35
    
                    # We should apply multi frame denoising here:
                    logger.info("Applying overall denoising: %d:%d:%d:%d...",
                                temporal, hhh, template, wsize)
                    arrs = [np.asarray(frame) for frame in frames]
                    nframes = len(frames)
                    flist = []
                    woff = int((temporal-1)/2)
    
                    if nframes == 1:
                        # Apply simple denoising here:
                        dst = cv2.fastNlMeansDenoising(arrs[0])
                        flist.append(dst)
                    else:
                        # Add the additional starting/ending frames:
                        before = []
                        after = []
                        logger.info("Adding %d additional sibling frames.", woff)
                        for i in range(woff):
                            idx = min(i+1, nframes-1)
                            before.insert(0, arrs[idx])
    
                            idx = max(nframes-1-i-1, 0)
                            after.append(arrs[idx])
    
                        arrs = before+arrs+after
    
                        for i in range(nframes):
                            logger.info("Denoising frame %d/%d...", i+1, nframes)
    
                            dst = cv2.fastNlMeansDenoisingColoredMulti(
                                arrs, woff+i, temporalWindowSize=temporal,
                                h=hhh,
                                templateWindowSize=template,
                                searchWindowSize=wsize)
    
                            flist.append(dst)
    
                    frames = [Image.fromarray(arr) for arr in flist]
  • ⇒ And indeed, this helps reduce the artefacts on the first/last frames 👍! So this is good. But the denoising process itself is still really slow 😖. Arrf, never mind.
  • And now I think this is good enough for a first version of this utility: there is a lot more I could add here, but, well, as usual, the idea was to keep this post as short as possible, so let's just say we'll get back to this tool when there is a need for it 👍! See yaaa!