Commit 52db149e authored by WillBrennan's avatar WillBrennan

a much needed cleanup

parent 102eedc8
#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = 'Will Brennan'
# Built-in Modules
import time
import argparse
import logging
# Standard Modules
import cv2
import numpy
# Custom Modules
logger = logging.getLogger('main')
class SkinDetector(object):
def __init__(self, args):
assert isinstance(args, argparse.Namespace), 'args must be of type argparse.Namespace'
self.args = args
self.mask = None
logger.debug('SkinDetector initialised')
@staticmethod
def assert_image(img, grey=False):
logger.debug('Applying assertions...')
depth = 3
if grey:
depth = 2
assert isinstance(img, numpy.ndarray), 'image must be a numpy array'
assert len(img.shape) == depth, 'skin detection can only work on color images'
assert img.size > 100, 'seriously... you thought this would work?'
def get_mask_hsv(self, img):
logger.debug('Applying hsv threshold')
self.assert_image(img)
lower_thresh = numpy.array([0, 50, 0], dtype=numpy.uint8)
upper_thresh = numpy.array([120, 150, 255], dtype=numpy.uint8)
img_hsv = cv2.cvtColor(img, cv2.COLOR_RGB2HSV)
msk_hsv = cv2.inRange(img_hsv, lower_thresh, upper_thresh)
if self.args.debug:
scripts.display('input', img)
scripts.display('mask_hsv', msk_hsv)
self.add_mask(msk_hsv)
def get_mask_rgb(self, img):
logger.debug('Applying rgb thresholds')
lower_thresh = numpy.array([45, 52, 108], dtype=numpy.uint8)
upper_thresh = numpy.array([255, 255, 255], dtype=numpy.uint8)
mask_a = cv2.inRange(img, lower_thresh, upper_thresh)
mask_b = 255*((img[:, :, 2]-img[:, :, 1])/20)
logger.debug('mask_b unique: {0}'.format(numpy.unique(mask_b)))
mask_c = 255*((numpy.max(img, axis=2)-numpy.min(img, axis=2))/20)
logger.debug('mask_d unique: {0}'.format(numpy.unique(mask_c)))
msk_rgb = cv2.bitwise_and(mask_a, mask_b)
msk_rgb = cv2.bitwise_and(mask_c, msk_rgb)
if self.args.debug:
scripts.display('input', img)
scripts.display('mask_rgb', msk_rgb)
self.add_mask(msk_rgb)
def get_mask_ycrcb(self, img):
self.assert_image(img)
lower_thresh = numpy.array([90, 100, 130], dtype=numpy.uint8)
upper_thresh = numpy.array([230, 120, 180], dtype=numpy.uint8)
img_ycrcb = cv2.cvtColor(img, cv2.COLOR_RGB2YCR_CB)
msk_ycrcb = cv2.inRange(img_ycrcb, lower_thresh, upper_thresh)
if self.args.debug:
scripts.display('input', img)
scripts.display('mask_ycrcb', msk_ycrcb)
self.add_mask(msk_ycrcb)
def grab_cut_mask(self, img_col, mask):
kernel = numpy.ones((50, 50), numpy.float32)/(50*50)
dst = cv2.filter2D(mask, -1, kernel)
dst[dst != 0] = 255
free = numpy.array(cv2.bitwise_not(dst), dtype=numpy.uint8)
if self.args.debug:
scripts.display('not skin', free)
scripts.display('grabcut input', mask)
grab_mask = numpy.zeros(mask.shape, dtype=numpy.uint8)
grab_mask[:, :] = 2
grab_mask[mask == 255] = 1
grab_mask[free == 255] = 0
print numpy.unique(grab_mask)
if numpy.unique(grab_mask).tolist() == [0, 1]:
logger.debug('conducting grabcut')
bgdModel = numpy.zeros((1, 65), numpy.float64)
fgdModel = numpy.zeros((1, 65), numpy.float64)
if img_col.size != 0:
mask, bgdModel, fgdModel = cv2.grabCut(img_col, grab_mask, None, bgdModel, fgdModel, 5, cv2.GC_INIT_WITH_MASK)
mask = numpy.where((mask == 2) | (mask == 0), 0, 1).astype('uint8')
else:
logger.warning('img_col is empty')
return mask
@staticmethod
def closing(msk):
assert isinstance(msk, numpy.ndarray), 'msk must be a numpy array'
assert msk.ndim == 2, 'msk must be a greyscale image'
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
msk = cv2.morphologyEx(msk, cv2.MORPH_CLOSE, kernel)
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))
msk = cv2.morphologyEx(msk, cv2.MORPH_OPEN, kernel, iterations=2)
return msk
def process(self, img):
logger.debug('Initialising process')
dt = time.time()
self.assert_image(img)
logger.debug('Generating mean-color image')
#img = mean_color.img_mean(img)
logger.debug('Conducting thresholding')
self.n_mask = 0
self.mask = numpy.zeros(img.shape[:2], dtype=numpy.uint8)
self.get_mask_hsv(img)
self.get_mask_rgb(img)
self.get_mask_ycrcb(img)
logger.debug('Thresholding sum of masks')
self.threshold(self.args.thresh)
if self.args.debug:
scripts.display('skin_mask', self.mask)
scripts.display('input_img', img)
dt = round(time.time()-dt, 2)
hz = round(1/dt, 2)
logger.debug('Conducted processing in {0}s ({1}Hz)'.format(dt, hz))
self.mask = self.closing(self.mask)
self.mask = self.grab_cut_mask(img, self.mask)
return self.mask
def add_mask(self, img):
logger.debug('normalising mask')
self.assert_image(img, grey=True)
img[img < 128] = 0
img[img >= 128] = 1
logger.debug('normalisation complete')
logger.debug('adding mask to total mask')
self.mask += img
self.n_mask += 1
logger.debug('add mask complete')
def threshold(self, threshold):
assert isinstance(threshold, float), 'threshold must be a float (current type - {0})'.format(type(threshold))
assert 0 <= threshold <= 1, 'threshold must be between 0 & 1 (current value - {0})'.format(threshold)
assert self.n_mask > 0, 'Number of masks must be greater than 0 [n_mask ({0}) = {1}]'.format(type(self.n_mask), self.n_mask)
logger.debug('Threshold Value - {0}%'.format(int(100*threshold)))
logger.debug('Number of Masks - {0}'.format(self.n_mask))
self.mask /= self.n_mask
self.mask[self.mask < threshold] = 0
self.mask[self.mask >= threshold] = 255
logger.debug('{0}% of the image is skin'.format(int((100.0/255.0)*numpy.sum(self.mask)/(self.mask.size))))
return self.mask
\ No newline at end of file
#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = 'Will Brennan'
# Built-in Modules
import logging
# Standard Modules
import cv2
import numpy
# Custom Modules
import mean_color
logger = logging.getLogger('main')
class SuperContour(object):
def __init__(self, width=32):
self.width = width
def grid_contours(self, frame, contours, heir):
result_cont, result_heir, result_rois = [], [], []
logger.debug('segmenting contours with grid')
for contour in contours:
msk = numpy.zeros(frame.shape, dtype=frame.dtype)
cv2.drawContours(msk, [contour], -1, 255, -1)
bbox = cv2.boundingRect(contour)
w0, h0 = bbox[0]//self.width, bbox[1]//self.width
n_w = max(1, ((bbox[0]+bbox[2])//self.width) - w0)
n_h = max(1, ((bbox[1]+bbox[3])//self.width) - h0)
for i in range(n_w):
for j in range(n_h):
grid_msk = numpy.zeros(frame.shape, dtype=frame.dtype)
grid_box = numpy.array([[(w0+i)*self.width, (h0+j)*self.width],
[(w0+i+1)*self.width, (h0+j)*self.width],
[(w0+i)*self.width, (h0+j+1)*self.width],
[(w0+i+1)*self.width, (h0+j+1)*self.width]],
dtype=numpy.uint8)
cv2.drawContours(grid_msk, [grid_box], -1, 255, -1)
grid_msk = cv2.bitwise_and(grid_msk, msk)
result_cont.append(grid_msk)
# todo: work out stats of new contour!
# todo: mix grid with contours to form super pixels!
contours = result_cont
logger.debug('checking and removing overlap...')
msk_all = numpy.zeros(frame.shape[:2], dtype=frame.dtype)
for msk in contours:
msk = cv2.bitwise_and(msk, cv2.bitwise_not(msk_all))
if msk.sum() != 0:
result_cont.append(msk)
msk_all = numpy.min(255, cv2.add(msk_all, msk))
logger.debug('grid contours complete')
contours = result_cont
return contours, heir, rois
def process(self, frame):
frame_gry = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
contours, heir = cv2.findContours(frame_gry, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
contours, heir, rois = self.grid_contours(frame, contours, heir)
for i in range(len(contours)):
roi, contour = rois[i], contours[i]
mask = numpy.zeros(frame.shape[:2], dtype=frame.dtype)
cv2.drawContours(mask, [contour], -1, 255, -1)
yield roi, contour
\ No newline at end of file
...@@ -4,22 +4,41 @@ __author__ = 'Will Brennan' ...@@ -4,22 +4,41 @@ __author__ = 'Will Brennan'
# Built-in Modules # Built-in Modules
import argparse
import logging import logging
# Standard Modules # Standard Modules
import cv2 import cv2
import numpy
# Custom Modules # Custom Modules
import main
import scripts import scripts
from SkinDetector import SkinDetector
if __name__ == '__main__': if __name__ == '__main__':
args = scripts.get_args(from_file=False) parser = argparse.ArgumentParser(description=__doc__)
logger = scripts.get_logger(quite=args.quite, debug=args.debug) parser.add_argument('image_paths', type=str, nargs='+', help="paths to one or more images or image directories")
parser.add_argument('-b', '--debug', dest='debug', action='store_true', help='enable debug logging')
parser.add_argument('-q', '--quite', dest='quite', action='store_true', help='disable all logging')
parser.add_argument('-t', '--thresh', dest='thresh', default=0.5, type=float, help='threshold for skin mask')
args = parser.parse_args()
logger = logging.getLogger('main')
if not args.quite:
if args.debug:
level = logging.DEBUG
else:
level = logging.INFO
ch = logging.StreamHandler()
ch.setLevel(level=level)
formatter = logging.Formatter('%(asctime)s - %(funcName)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
detector = SkinDetector(args)
cam = cv2.VideoCapture(0) cam = cv2.VideoCapture(0)
while True: while True:
ret, img_col = cam.read() ret, img_col = cam.read()
img_msk = main.process(img_col, args=args) img_msk = detector.process(img_col)
if not args.display: if not args.display:
scripts.display('img_col', img_col) scripts.display('img_col', img_col)
scripts.display('img_msk', img_msk) scripts.display('img_msk', img_msk)
......
#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = 'Will Brennan'
# Built-in Modules
# Standard Modules
# Custom Modules
from main import *
import scripts
\ No newline at end of file
#!/usr/bin/env python
"""Bootstrap setuptools installation
To use setuptools in your package's setup.py, include this
file in the same directory and add this to the top of your setup.py::
from ez_setup import use_setuptools
use_setuptools()
To require a specific version of setuptools, set a download
mirror, or use an alternate download directory, simply supply
the appropriate options to ``use_setuptools()``.
This file can also be run as a script to install or upgrade setuptools.
"""
import os
import shutil
import sys
import tempfile
import zipfile
import optparse
import subprocess
import platform
import textwrap
import contextlib
from distutils import log
try:
from urllib.request import urlopen
except ImportError:
from urllib2 import urlopen
try:
from site import USER_SITE
except ImportError:
USER_SITE = None
DEFAULT_VERSION = "7.0"
DEFAULT_URL = "https://pypi.python.org/packages/source/s/setuptools/"
def _python_cmd(*args):
"""
Return True if the command succeeded.
"""
args = (sys.executable,) + args
return subprocess.call(args) == 0
def _install(archive_filename, install_args=()):
with archive_context(archive_filename):
# installing
log.warn('Installing Setuptools')
if not _python_cmd('setup.py', 'install', *install_args):
log.warn('Something went wrong during the installation.')
log.warn('See the error message above.')
# exitcode will be 2
return 2
def _build_egg(egg, archive_filename, to_dir):
with archive_context(archive_filename):
# building an egg
log.warn('Building a Setuptools egg in %s', to_dir)
_python_cmd('setup.py', '-q', 'bdist_egg', '--dist-dir', to_dir)
# returning the result
log.warn(egg)
if not os.path.exists(egg):
raise IOError('Could not build the egg.')
class ContextualZipFile(zipfile.ZipFile):
"""
Supplement ZipFile class to support context manager for Python 2.6
"""
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.close()
def __new__(cls, *args, **kwargs):
"""
Construct a ZipFile or ContextualZipFile as appropriate
"""
if hasattr(zipfile.ZipFile, '__exit__'):
return zipfile.ZipFile(*args, **kwargs)
return super(ContextualZipFile, cls).__new__(cls)
@contextlib.contextmanager
def archive_context(filename):
# extracting the archive
tmpdir = tempfile.mkdtemp()
log.warn('Extracting in %s', tmpdir)
old_wd = os.getcwd()
try:
os.chdir(tmpdir)
with ContextualZipFile(filename) as archive:
archive.extractall()
# going in the directory
subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0])
os.chdir(subdir)
log.warn('Now working in %s', subdir)
yield
finally:
os.chdir(old_wd)
shutil.rmtree(tmpdir)
def _do_download(version, download_base, to_dir, download_delay):
egg = os.path.join(to_dir, 'setuptools-%s-py%d.%d.egg'
% (version, sys.version_info[0], sys.version_info[1]))
if not os.path.exists(egg):
archive = download_setuptools(version, download_base,
to_dir, download_delay)
_build_egg(egg, archive, to_dir)
sys.path.insert(0, egg)
# Remove previously-imported pkg_resources if present (see
# https://bitbucket.org/pypa/setuptools/pull-request/7/ for details).
if 'pkg_resources' in sys.modules:
del sys.modules['pkg_resources']
import setuptools
setuptools.bootstrap_install_from = egg
def use_setuptools(version=DEFAULT_VERSION, download_base=DEFAULT_URL,
to_dir=os.curdir, download_delay=15):
to_dir = os.path.abspath(to_dir)
rep_modules = 'pkg_resources', 'setuptools'
imported = set(sys.modules).intersection(rep_modules)
try:
import pkg_resources
except ImportError:
return _do_download(version, download_base, to_dir, download_delay)
try:
pkg_resources.require("setuptools>=" + version)
return
except pkg_resources.DistributionNotFound:
return _do_download(version, download_base, to_dir, download_delay)
except pkg_resources.VersionConflict as VC_err:
if imported:
msg = textwrap.dedent("""
The required version of setuptools (>={version}) is not available,
and can't be installed while this script is running. Please
install a more recent version first, using
'easy_install -U setuptools'.
(Currently using {VC_err.args[0]!r})
""").format(VC_err=VC_err, version=version)
sys.stderr.write(msg)
sys.exit(2)
# otherwise, reload ok
del pkg_resources, sys.modules['pkg_resources']
return _do_download(version, download_base, to_dir, download_delay)
def _clean_check(cmd, target):
"""
Run the command to download target. If the command fails, clean up before
re-raising the error.
"""
try:
subprocess.check_call(cmd)
except subprocess.CalledProcessError:
if os.access(target, os.F_OK):
os.unlink(target)
raise
def download_file_powershell(url, target):
"""
Download the file at url to target using Powershell (which will validate
trust). Raise an exception if the command cannot complete.
"""
target = os.path.abspath(target)
ps_cmd = (
"[System.Net.WebRequest]::DefaultWebProxy.Credentials = "
"[System.Net.CredentialCache]::DefaultCredentials; "
"(new-object System.Net.WebClient).DownloadFile(%(url)r, %(target)r)"
% vars()
)
cmd = [
'powershell',
'-Command',
ps_cmd,
]
_clean_check(cmd, target)
def has_powershell():
if platform.system() != 'Windows':
return False
cmd = ['powershell', '-Command', 'echo test']
with open(os.path.devnull, 'wb') as devnull:
try:
subprocess.check_call(cmd, stdout=devnull, stderr=devnull)
except Exception:
return False
return True
download_file_powershell.viable = has_powershell
def download_file_curl(url, target):
cmd = ['curl', url, '--silent', '--output', target]
_clean_check(cmd, target)
def has_curl():
cmd = ['curl', '--version']
with open(os.path.devnull, 'wb') as devnull:
try:
subprocess.check_call(cmd, stdout=devnull, stderr=devnull)
except Exception:
return False
return True
download_file_curl.viable = has_curl
def download_file_wget(url, target):
cmd = ['wget', url, '--quiet', '--output-document', target]
_clean_check(cmd, target)
def has_wget():
cmd = ['wget', '--version']
with open(os.path.devnull, 'wb') as devnull:
try:
subprocess.check_call(cmd, stdout=devnull, stderr=devnull)
except Exception:
return False
return True
download_file_wget.viable = has_wget
def download_file_insecure(url, target):
"""
Use Python to download the file, even though it cannot authenticate the
connection.
"""
src = urlopen(url)
try:
# Read all the data in one block.
data = src.read()
finally:
src.close()
# Write all the data in one block to avoid creating a partial file.
with open(target, "wb") as dst:
dst.write(data)
download_file_insecure.viable = lambda: True
def get_best_downloader():
downloaders = (
download_file_powershell,
download_file_curl,
download_file_wget,
download_file_insecure,
)
viable_downloaders = (dl for dl in downloaders if dl.viable())
return next(viable_downloaders, None)
def download_setuptools(version=DEFAULT_VERSION, download_base=DEFAULT_URL,
to_dir=os.curdir, delay=15, downloader_factory=get_best_downloader):
"""
Download setuptools from a specified location and return its filename
`version` should be a valid setuptools version number that is available
as an sdist for download under the `download_base` URL (which should end
with a '/'). `to_dir` is the directory where the egg will be downloaded.
`delay` is the number of seconds to pause before an actual download
attempt.
``downloader_factory`` should be a function taking no arguments and
returning a function for downloading a URL to a target.
"""
# making sure we use the absolute path
to_dir = os.path.abspath(to_dir)
zip_name = "setuptools-%s.zip" % version
url = download_base + zip_name
saveto = os.path.join(to_dir, zip_name)
if not os.path.exists(saveto): # Avoid repeated downloads
log.warn("Downloading %s", url)
downloader = downloader_factory()
downloader(url, saveto)
return os.path.realpath(saveto)
def _build_install_args(options):
"""
Build the arguments to 'python setup.py install' on the setuptools package
"""
return ['--user'] if options.user_install else []
def _parse_args():
"""
Parse the command line for options
"""
parser = optparse.OptionParser()
parser.add_option(
'--user', dest='user_install', action='store_true', default=False,
help='install in user site package (requires Python 2.6 or later)')
parser.add_option(
'--download-base', dest='download_base', metavar="URL",
default=DEFAULT_URL,
help='alternative URL from where to download the setuptools package')
parser.add_option(
'--insecure', dest='downloader_factory', action='store_const',
const=lambda: download_file_insecure, default=get_best_downloader,
help='Use internal, non-validating downloader'
)
parser.add_option(
'--version', help="Specify which version to download",
default=DEFAULT_VERSION,
)
options, args = parser.parse_args()
# positional arguments are ignored
return options
def main():
"""Install or upgrade setuptools and EasyInstall"""
options = _parse_args()
archive = download_setuptools(
version=options.version,
download_base=options.download_base,
downloader_factory=options.downloader_factory,
)
return _install(archive, _build_install_args(options))
if __name__ == '__main__':
sys.exit(main())
...@@ -4,7 +4,7 @@ __author__ = 'Will Brennan' ...@@ -4,7 +4,7 @@ __author__ = 'Will Brennan'
# Built-in Modules # Built-in Modules
import time import os
import argparse import argparse
import logging import logging
# Standard Modules # Standard Modules
...@@ -12,181 +12,63 @@ import cv2 ...@@ -12,181 +12,63 @@ import cv2
import numpy import numpy
# Custom Modules # Custom Modules
import scripts import scripts
import SpeedySuperPixels from SkinDetector import SkinDetector
logger = logging.getLogger('main') logger = logging.getLogger('main')
class SkinDetector(object): def find_images(path, recursive=False, ignore=True):
def __init__(self, args): if os.path.exists(path):
assert isinstance(args, argparse.Namespace), 'args must be of type argparse.Namespace' yield path
self.args = args elif os.path.isdir(path):
self.mask = None assert os.path.isdir(path), 'FileIO - get_images: Directory does not exist'
logger.debug('SkinDetector initialised') assert isinstance(recursive, bool), 'FileIO - get_images: recursive must be a boolean variable'
ext, result = ['png', 'jpg', 'jpeg'], []
@staticmethod for path_a in os.listdir(path):
def assert_image(img, grey=False): path_a = path + '/' +path_a
logger.debug('Applying assertions...') if os.path.isdir(path_a) and recursive:
depth = 3 for path_b in find_images(path_a):
if grey: yield path_b
depth = 2 check_a = path_a.split('.')[-1] in ext
assert isinstance(img, numpy.ndarray), 'image must be a numpy array' check_b = ignore or ('-' not in path_a.split('/')[-1])
assert len(img.shape) == depth, 'skin detection can only work on color images' if check_a and check_b:
assert img.size > 100, 'seriously... you thought this would work?' yield path_a
def get_mask_hsv(self, img):
logger.debug('Applying hsv threshold')
self.assert_image(img)
lower_thresh = numpy.array([0, 50, 0], dtype=numpy.uint8)
upper_thresh = numpy.array([120, 150, 255], dtype=numpy.uint8)
img_hsv = cv2.cvtColor(img, cv2.COLOR_RGB2HSV)
msk_hsv = cv2.inRange(img_hsv, lower_thresh, upper_thresh)
if self.args.debug:
scripts.display('input', img)
scripts.display('mask_hsv', msk_hsv)
self.add_mask(msk_hsv)
def get_mask_rgb(self, img):
logger.debug('Applying rgb thresholds')
lower_thresh = numpy.array([45, 52, 108], dtype=numpy.uint8)
upper_thresh = numpy.array([255, 255, 255], dtype=numpy.uint8)
mask_a = cv2.inRange(img, lower_thresh, upper_thresh)
mask_b = 255*((img[:, :, 2]-img[:, :, 1])/20)
logger.debug('mask_b unique: {0}'.format(numpy.unique(mask_b)))
mask_c = 255*((numpy.max(img, axis=2)-numpy.min(img, axis=2))/20)
logger.debug('mask_d unique: {0}'.format(numpy.unique(mask_c)))
msk_rgb = cv2.bitwise_and(mask_a, mask_b)
msk_rgb = cv2.bitwise_and(mask_c, msk_rgb)
if self.args.debug:
scripts.display('input', img)
scripts.display('mask_rgb', msk_rgb)
self.add_mask(msk_rgb)
def get_mask_ycrcb(self, img):
self.assert_image(img)
lower_thresh = numpy.array([90, 100, 130], dtype=numpy.uint8)
upper_thresh = numpy.array([230, 120, 180], dtype=numpy.uint8)
img_ycrcb = cv2.cvtColor(img, cv2.COLOR_RGB2YCR_CB)
msk_ycrcb = cv2.inRange(img_ycrcb, lower_thresh, upper_thresh)
if self.args.debug:
scripts.display('input', img)
scripts.display('mask_ycrcb', msk_ycrcb)
self.add_mask(msk_ycrcb)
def grab_cut_mask(self, img_col, mask):
kernel = numpy.ones((50, 50), numpy.float32)/(50*50)
dst = cv2.filter2D(mask, -1, kernel)
dst[dst != 0] = 255
free = numpy.array(cv2.bitwise_not(dst), dtype=numpy.uint8)
if self.args.debug:
cv2.imshow('not skin', free)
cv2.imshow('grabcut input', mask)
grab_mask = numpy.zeros(mask.shape, dtype=numpy.uint8)
grab_mask[:, :] = 2
grab_mask[mask == 255] = 1
grab_mask[free == 255] = 0
print numpy.unique(grab_mask)
if numpy.unique(grab_mask).tolist() == [0, 1]:
logger.debug('conducting grabcut')
bgdModel = numpy.zeros((1, 65), numpy.float64)
fgdModel = numpy.zeros((1, 65), numpy.float64)
if img_col.size != 0:
mask, bgdModel, fgdModel = cv2.grabCut(img_col, grab_mask, None, bgdModel, fgdModel, 5, cv2.GC_INIT_WITH_MASK)
mask = numpy.where((mask == 2) | (mask == 0), 0, 1).astype('uint8')
else: else:
logger.warning('img_col is empty') raise ValueError('error! path is not a valid path or directory')
return mask
@staticmethod
def closing(msk):
assert isinstance(msk, numpy.ndarray), 'msk must be a numpy array'
assert msk.ndim == 2, 'msk must be a greyscale image'
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
msk = cv2.morphologyEx(msk, cv2.MORPH_CLOSE, kernel)
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))
msk = cv2.morphologyEx(msk, cv2.MORPH_OPEN, kernel, iterations=2)
return msk
def process(self, img):
logger.debug('Initialising process')
dt = time.time()
self.assert_image(img)
logger.debug('Generating mean-color image')
#img = mean_color.img_mean(img)
logger.debug('Conducting thresholding')
self.n_mask = 0
self.mask = numpy.zeros(img.shape[:2], dtype=numpy.uint8)
#self.get_mask_hsv(img)
self.get_mask_rgb(img)
#self.get_mask_ycrcb(img)
logger.debug('Thresholding sum of masks')
self.threshold(self.args.thresh)
if self.args.debug:
cv2.imshow('skin_mask', self.mask)
cv2.imshow('input_img', img)
dt = round(time.time()-dt, 2)
hz = round(1/dt, 2)
logger.debug('Conducted processing in {0}s ({1}Hz)'.format(dt, hz))
self.mask = self.closing(self.mask)
self.mask = self.grab_cut_mask(img, self.mask)
return self.mask
def add_mask(self, img):
logger.debug('normalising mask')
self.assert_image(img, grey=True)
img[img < 128] = 0
img[img >= 128] = 1
logger.debug('normalisation complete')
logger.debug('adding mask to total mask')
self.mask += img
self.n_mask += 1
logger.debug('add mask complete')
def threshold(self, threshold):
assert isinstance(threshold, float), 'threshold must be a float (current type - {0})'.format(type(threshold))
assert 0 <= threshold <= 1, 'threshold must be between 0 & 1 (current value - {0})'.format(threshold)
assert self.n_mask > 0, 'Number of masks must be greater than 0 [n_mask ({0}) = {1}]'.format(type(self.n_mask), self.n_mask)
logger.debug('Threshold Value - {0}%'.format(int(100*threshold)))
logger.debug('Number of Masks - {0}'.format(self.n_mask))
self.mask /= self.n_mask
self.mask[self.mask < threshold] = 0
self.mask[self.mask >= threshold] = 255
logger.debug('{0}% of the image is skin'.format(int((100.0/255.0)*numpy.sum(self.mask)/(self.mask.size))))
return self.mask
if __name__ == '__main__':
def process(image, save=False, display=False, args=None, segment=False): parser = argparse.ArgumentParser(description=__doc__)
assert isinstance(image, numpy.ndarray) parser.add_argument('image_paths', type=str, nargs='+', help="paths to one or more images or image directories")
if not args: parser.add_argument('-b', '--debug', dest='debug', action='store_true', help='enable debug logging')
args = scripts.gen_args() parser.add_argument('-q', '--quite', dest='quite', action='store_true', help='disable all logging')
else: parser.add_argument('-d', '--display', dest='display', action='store_true', help="display result")
assert isinstance(args, argparse.Namespace), 'args must be an argparse.Namespace' parser.add_argument('-s', '--save', dest='save', action='store_true', help="save result to file")
args.save = save parser.add_argument('-t', '--thresh', dest='thresh', default=0.5, type=float, help='threshold for skin mask')
args.display = display args = parser.parse_args()
detector = SkinDetector(args)
if segment: logger = logging.getLogger('main')
slic = SpeedySuperPixels.SuperContour() if not args.quite:
skin = numpy.zeros(image.shape, dtype=image.dtype) if args.debug:
for roi, contour in slic.process(image): level = logging.DEBUG
pxl = cv2.bitwise_and(image, image, mask=contour)
msk = detector.process(pxl)
ret = msk.sum()/contour.sum()
if ret > 0.8:
skin = numpy.min(255, cv2.add(skin, contour))
return skin
else: else:
return detector.process(image) level = logging.INFO
ch = logging.StreamHandler()
ch.setLevel(level=level)
formatter = logging.Formatter('%(asctime)s - %(funcName)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
detector = SkinDetector(args)
if __name__ == '__main__': for image_arg in args.image_paths:
args = scripts.get_args() for image_path in find_images(image_arg):
logger = scripts.get_logger(quite=args.quite, debug=args.debug) logging.info("loading image from {0}".format(image_path))
args.image_paths = scripts.find_images(args.image_paths[0])
for image_path in args.image_paths:
img_col = cv2.imread(image_path, 1) img_col = cv2.imread(image_path, 1)
img_msk = process(img_col, args=args)
if not args.display: img_msk = detector.process(img_col)
if args.display:
scripts.display('img_col', img_col) scripts.display('img_col', img_col)
scripts.display('img_msk', img_msk) scripts.display('img_msk', img_msk)
scripts.display('img_skn', cv2.bitwise_and(img_col, img_col, mask=img_msk)) scripts.display('img_skn', cv2.bitwise_and(img_col, img_col, mask=img_msk))
cv2.waitKey(1) cv2.waitKey(0)
\ No newline at end of file \ No newline at end of file
#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = 'Will Brennan'
# Built-in modules
import time
import logging
# Standard modules
import numpy
import sklearn.utils
import sklearn.cluster as cluster
# Custom modules
logger = logging.getLogger('main')
def img_mean(frame, n_clusters=64):
result = frame.copy()
flat_frame = frame.reshape(-1, 3)
frame = sklearn.utils.shuffle(flat_frame)[:min(1000, frame.shape[0]*frame.shape[1])]
logger.debug('frame shape: {0}'.format(frame.shape))
logger.debug('starting training...')
t0 = time.time()
kmeans = cluster.KMeans(n_clusters=n_clusters, random_state=0).fit(frame)
logger.debug('training took {0}s'.format(round(time.time()-t0, 2)))
lookup = kmeans.predict(flat_frame)
label_idx = 0
for i in range(result.shape[0]):
for j in range(result.shape[1]):
result[i][j] = kmeans.cluster_centers_[lookup[label_idx]]
label_idx += 1
logger.debug('label shape: {0}'.format(result.shape))
result.astype(dtype=numpy.uint8)
return result
\ No newline at end of file
...@@ -4,95 +4,11 @@ __author__ = 'Will Brennan' ...@@ -4,95 +4,11 @@ __author__ = 'Will Brennan'
# Built-in Modules # Built-in Modules
import os
import sys
import argparse
import logging
# Standard Modules # Standard Modules
import cv2 import cv2
import numpy import numpy
# Custom Modules # Custom Modules
def get_logger(level=logging.INFO, quite=False, debug=False, to_file=''):
"""
This function initialises a logger to stdout.
:return: logger
"""
assert level in [logging.DEBUG, logging.INFO, logging.WARNING, logging.CRITICAL]
logger = logging.getLogger('main')
formatter = logging.Formatter('%(asctime)s - %(funcName)s - %(levelname)s - %(message)s')
if debug:
level = logging.DEBUG
logger.setLevel(level=level)
if not quite:
if to_file:
fh = logging.FileHandler(to_file)
fh.setLevel(level=level)
fh.setFormatter(formatter)
logger.addHandler(fh)
else:
ch = logging.StreamHandler()
ch.setLevel(level=level)
ch.setFormatter(formatter)
logger.addHandler(ch)
return logger
def get_args(default=None, args_string='', from_file=True):
"""
This function gets the command line arguments and passes any unknown arguments to ALE.
:param default: dictionary of default arguments with keys as `dest`
:return: command line arguments
"""
if not default:
default = {}
parser = argparse.ArgumentParser(description=__doc__)
if from_file:
parser.add_argument('image_paths', type=str, nargs='+', help="Filepath for input images or folder containing images")
parser.add_argument('-n', '--name', dest='name', default='DEFAULT_NAME', type=str, help='Basename of all export files')
parser.add_argument('-b', '--debug', dest='debug', action='store_true', help='Lower logging level to debug')
parser.add_argument('-q', '--quite', dest='quite', action='store_true', help='Disable all logging entirely')
parser.add_argument('-d', '--display', dest='display', action='store_true', help="Display Game while learning and testing")
parser.add_argument('-s', '--save', dest='save', action='store_true', help="If parsed saves the input image and mask with random file name, records name to logger")
parser.add_argument('-t', '--thresh', dest='thresh', default=0.5, type=float, help='thresholding for skin mask')
if args_string:
args_string = args_string.split(' ')
args = parser.parse_args(args_string)
else:
args = parser.parse_args()
return args
def gen_args():
return get_args(args_string='USED_GEN_ARGS')
def find_images(path, recursive=True):
if os.path.isdir(path):
return list(xfind_images(path, recursive=recursive))
elif os.path.exists(path):
return [path]
else:
raise ValueError('path is not a valid path or directory')
def xfind_images(directory, recursive=False, ignore=True):
assert os.path.isdir(directory), 'FileIO - get_images: Directory does not exist'
assert isinstance(recursive, bool), 'FileIO - get_images: recursive must be a boolean variable'
ext, result = ['png', 'jpg', 'jpeg'], []
for path_a in os.listdir(directory):
path_a = directory+'/'+path_a
if os.path.isdir(path_a) and recursive:
for path_b in xfind_images(path_a):
yield path_b
check_a = path_a.split('.')[-1] in ext
check_b = ignore or ('-' not in path_a.split('/')[-1])
if check_a and check_b:
yield path_a
def display(title, img, max_size=200000): def display(title, img, max_size=200000):
assert isinstance(img, numpy.ndarray), 'img must be a numpy array' assert isinstance(img, numpy.ndarray), 'img must be a numpy array'
assert isinstance(title, str), 'title must be a string' assert isinstance(title, str), 'title must be a string'
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment