mirror of
https://github.com/iperov/DeepFaceLab.git
synced 2024-03-22 13:10:55 +08:00
846 lines
38 KiB
Python
846 lines
38 KiB
Python
import traceback
|
|
import math
|
|
import multiprocessing
|
|
import operator
|
|
import os
|
|
import shutil
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import cv2
|
|
import numpy as np
|
|
from numpy import linalg as npla
|
|
|
|
import facelib
|
|
from core import imagelib
|
|
from core import mathlib
|
|
from facelib import FaceType, LandmarksProcessor
|
|
from core.interact import interact as io
|
|
from core.joblib import Subprocessor
|
|
from core.leras import nn
|
|
from core import pathex
|
|
from core.cv2ex import *
|
|
from DFLIMG import *
|
|
|
|
DEBUG = False
|
|
|
|
class ExtractSubprocessor(Subprocessor):
|
|
class Data(object):
|
|
def __init__(self, filepath=None, rects=None, landmarks = None, landmarks_accurate=True, manual=False, force_output_path=None, final_output_files = None):
|
|
self.filepath = filepath
|
|
self.rects = rects or []
|
|
self.rects_rotation = 0
|
|
self.landmarks_accurate = landmarks_accurate
|
|
self.manual = manual
|
|
self.landmarks = landmarks or []
|
|
self.force_output_path = force_output_path
|
|
self.final_output_files = final_output_files or []
|
|
self.faces_detected = 0
|
|
|
|
class Cli(Subprocessor.Cli):
|
|
|
|
#override
|
|
def on_initialize(self, client_dict):
|
|
self.type = client_dict['type']
|
|
self.image_size = client_dict['image_size']
|
|
self.jpeg_quality = client_dict['jpeg_quality']
|
|
self.face_type = client_dict['face_type']
|
|
self.max_faces_from_image = client_dict['max_faces_from_image']
|
|
self.device_idx = client_dict['device_idx']
|
|
self.cpu_only = client_dict['device_type'] == 'CPU'
|
|
self.final_output_path = client_dict['final_output_path']
|
|
self.output_debug_path = client_dict['output_debug_path']
|
|
|
|
#transfer and set stdin in order to work code.interact in debug subprocess
|
|
stdin_fd = client_dict['stdin_fd']
|
|
if stdin_fd is not None and DEBUG:
|
|
sys.stdin = os.fdopen(stdin_fd)
|
|
|
|
if self.cpu_only:
|
|
device_config = nn.DeviceConfig.CPU()
|
|
place_model_on_cpu = True
|
|
else:
|
|
device_config = nn.DeviceConfig.GPUIndexes ([self.device_idx])
|
|
place_model_on_cpu = device_config.devices[0].total_mem_gb < 4
|
|
|
|
if self.type == 'all' or 'rects' in self.type or 'landmarks' in self.type:
|
|
nn.initialize (device_config)
|
|
|
|
self.log_info (f"Running on {client_dict['device_name'] }")
|
|
|
|
if self.type == 'all' or self.type == 'rects-s3fd' or 'landmarks' in self.type:
|
|
self.rects_extractor = facelib.S3FDExtractor(place_model_on_cpu=place_model_on_cpu)
|
|
|
|
if self.type == 'all' or 'landmarks' in self.type:
|
|
# for head type, extract "3D landmarks"
|
|
self.landmarks_extractor = facelib.FANExtractor(landmarks_3D=self.face_type >= FaceType.HEAD,
|
|
place_model_on_cpu=place_model_on_cpu)
|
|
|
|
self.cached_image = (None, None)
|
|
|
|
#override
|
|
def process_data(self, data):
|
|
if 'landmarks' in self.type and len(data.rects) == 0:
|
|
return data
|
|
|
|
filepath = data.filepath
|
|
cached_filepath, image = self.cached_image
|
|
if cached_filepath != filepath:
|
|
image = cv2_imread( filepath )
|
|
if image is None:
|
|
self.log_err (f'Failed to open {filepath}, reason: cv2_imread() fail.')
|
|
return data
|
|
image = imagelib.normalize_channels(image, 3)
|
|
image = imagelib.cut_odd_image(image)
|
|
self.cached_image = ( filepath, image )
|
|
|
|
h, w, c = image.shape
|
|
|
|
if 'rects' in self.type or self.type == 'all':
|
|
data = ExtractSubprocessor.Cli.rects_stage (data=data,
|
|
image=image,
|
|
max_faces_from_image=self.max_faces_from_image,
|
|
rects_extractor=self.rects_extractor,
|
|
)
|
|
|
|
if 'landmarks' in self.type or self.type == 'all':
|
|
data = ExtractSubprocessor.Cli.landmarks_stage (data=data,
|
|
image=image,
|
|
landmarks_extractor=self.landmarks_extractor,
|
|
rects_extractor=self.rects_extractor,
|
|
)
|
|
|
|
if self.type == 'final' or self.type == 'all':
|
|
data = ExtractSubprocessor.Cli.final_stage(data=data,
|
|
image=image,
|
|
face_type=self.face_type,
|
|
image_size=self.image_size,
|
|
jpeg_quality=self.jpeg_quality,
|
|
output_debug_path=self.output_debug_path,
|
|
final_output_path=self.final_output_path,
|
|
)
|
|
return data
|
|
|
|
@staticmethod
|
|
def rects_stage(data,
|
|
image,
|
|
max_faces_from_image,
|
|
rects_extractor,
|
|
):
|
|
h,w,c = image.shape
|
|
if min(h,w) < 128:
|
|
# Image is too small
|
|
data.rects = []
|
|
else:
|
|
for rot in ([0, 90, 270, 180]):
|
|
if rot == 0:
|
|
rotated_image = image
|
|
elif rot == 90:
|
|
rotated_image = image.swapaxes( 0,1 )[:,::-1,:]
|
|
elif rot == 180:
|
|
rotated_image = image[::-1,::-1,:]
|
|
elif rot == 270:
|
|
rotated_image = image.swapaxes( 0,1 )[::-1,:,:]
|
|
rects = data.rects = rects_extractor.extract (rotated_image, is_bgr=True)
|
|
if len(rects) != 0:
|
|
data.rects_rotation = rot
|
|
break
|
|
if max_faces_from_image is not None and \
|
|
max_faces_from_image > 0 and \
|
|
len(data.rects) > 0:
|
|
data.rects = data.rects[0:max_faces_from_image]
|
|
return data
|
|
|
|
|
|
@staticmethod
|
|
def landmarks_stage(data,
|
|
image,
|
|
landmarks_extractor,
|
|
rects_extractor,
|
|
):
|
|
h, w, ch = image.shape
|
|
|
|
if data.rects_rotation == 0:
|
|
rotated_image = image
|
|
elif data.rects_rotation == 90:
|
|
rotated_image = image.swapaxes( 0,1 )[:,::-1,:]
|
|
elif data.rects_rotation == 180:
|
|
rotated_image = image[::-1,::-1,:]
|
|
elif data.rects_rotation == 270:
|
|
rotated_image = image.swapaxes( 0,1 )[::-1,:,:]
|
|
|
|
data.landmarks = landmarks_extractor.extract (rotated_image, data.rects, rects_extractor if (data.landmarks_accurate) else None, is_bgr=True)
|
|
if data.rects_rotation != 0:
|
|
for i, (rect, lmrks) in enumerate(zip(data.rects, data.landmarks)):
|
|
new_rect, new_lmrks = rect, lmrks
|
|
(l,t,r,b) = rect
|
|
if data.rects_rotation == 90:
|
|
new_rect = ( t, h-l, b, h-r)
|
|
if lmrks is not None:
|
|
new_lmrks = lmrks[:,::-1].copy()
|
|
new_lmrks[:,1] = h - new_lmrks[:,1]
|
|
elif data.rects_rotation == 180:
|
|
if lmrks is not None:
|
|
new_rect = ( w-l, h-t, w-r, h-b)
|
|
new_lmrks = lmrks.copy()
|
|
new_lmrks[:,0] = w - new_lmrks[:,0]
|
|
new_lmrks[:,1] = h - new_lmrks[:,1]
|
|
elif data.rects_rotation == 270:
|
|
new_rect = ( w-b, l, w-t, r )
|
|
if lmrks is not None:
|
|
new_lmrks = lmrks[:,::-1].copy()
|
|
new_lmrks[:,0] = w - new_lmrks[:,0]
|
|
data.rects[i], data.landmarks[i] = new_rect, new_lmrks
|
|
|
|
return data
|
|
|
|
@staticmethod
|
|
def final_stage(data,
|
|
image,
|
|
face_type,
|
|
image_size,
|
|
jpeg_quality,
|
|
output_debug_path=None,
|
|
final_output_path=None,
|
|
):
|
|
data.final_output_files = []
|
|
filepath = data.filepath
|
|
rects = data.rects
|
|
landmarks = data.landmarks
|
|
|
|
if output_debug_path is not None:
|
|
debug_image = image.copy()
|
|
|
|
face_idx = 0
|
|
for rect, image_landmarks in zip( rects, landmarks ):
|
|
if image_landmarks is None:
|
|
continue
|
|
|
|
rect = np.array(rect)
|
|
|
|
if face_type == FaceType.MARK_ONLY:
|
|
image_to_face_mat = None
|
|
face_image = image
|
|
face_image_landmarks = image_landmarks
|
|
else:
|
|
image_to_face_mat = LandmarksProcessor.get_transform_mat (image_landmarks, image_size, face_type)
|
|
|
|
face_image = cv2.warpAffine(image, image_to_face_mat, (image_size, image_size), cv2.INTER_LANCZOS4)
|
|
face_image_landmarks = LandmarksProcessor.transform_points (image_landmarks, image_to_face_mat)
|
|
|
|
landmarks_bbox = LandmarksProcessor.transform_points ( [ (0,0), (0,image_size-1), (image_size-1, image_size-1), (image_size-1,0) ], image_to_face_mat, True)
|
|
|
|
rect_area = mathlib.polygon_area(np.array(rect[[0,2,2,0]]).astype(np.float32), np.array(rect[[1,1,3,3]]).astype(np.float32))
|
|
landmarks_area = mathlib.polygon_area(landmarks_bbox[:,0].astype(np.float32), landmarks_bbox[:,1].astype(np.float32) )
|
|
|
|
if not data.manual and face_type <= FaceType.FULL_NO_ALIGN and landmarks_area > 4*rect_area: #get rid of faces which umeyama-landmark-area > 4*detector-rect-area
|
|
continue
|
|
|
|
if output_debug_path is not None:
|
|
LandmarksProcessor.draw_rect_landmarks (debug_image, rect, image_landmarks, face_type, image_size, transparent_mask=True)
|
|
|
|
output_path = final_output_path
|
|
if data.force_output_path is not None:
|
|
output_path = data.force_output_path
|
|
|
|
output_filepath = output_path / f"{filepath.stem}_{face_idx}.jpg"
|
|
cv2_imwrite(output_filepath, face_image, [int(cv2.IMWRITE_JPEG_QUALITY), jpeg_quality ] )
|
|
|
|
dflimg = DFLJPG.load(output_filepath)
|
|
dflimg.set_face_type(FaceType.toString(face_type))
|
|
dflimg.set_landmarks(face_image_landmarks.tolist())
|
|
dflimg.set_source_filename(filepath.name)
|
|
dflimg.set_source_rect(rect)
|
|
dflimg.set_source_landmarks(image_landmarks.tolist())
|
|
dflimg.set_image_to_face_mat(image_to_face_mat)
|
|
dflimg.save()
|
|
|
|
data.final_output_files.append (output_filepath)
|
|
face_idx += 1
|
|
data.faces_detected = face_idx
|
|
|
|
if output_debug_path is not None:
|
|
cv2_imwrite( output_debug_path / (filepath.stem+'.jpg'), debug_image, [int(cv2.IMWRITE_JPEG_QUALITY), 50] )
|
|
|
|
return data
|
|
|
|
#overridable
|
|
def get_data_name (self, data):
|
|
#return string identificator of your data
|
|
return data.filepath
|
|
|
|
@staticmethod
|
|
def get_devices_for_config (type, device_config):
|
|
devices = device_config.devices
|
|
cpu_only = len(devices) == 0
|
|
|
|
if 'rects' in type or \
|
|
'landmarks' in type or \
|
|
'all' in type:
|
|
|
|
if not cpu_only:
|
|
if type == 'landmarks-manual':
|
|
devices = [devices.get_best_device()]
|
|
|
|
result = []
|
|
|
|
for device in devices:
|
|
count = 1
|
|
|
|
if count == 1:
|
|
result += [ (device.index, 'GPU', device.name, device.total_mem_gb) ]
|
|
else:
|
|
for i in range(count):
|
|
result += [ (device.index, 'GPU', f"{device.name} #{i}", device.total_mem_gb) ]
|
|
|
|
return result
|
|
else:
|
|
if type == 'landmarks-manual':
|
|
return [ (0, 'CPU', 'CPU', 0 ) ]
|
|
else:
|
|
return [ (i, 'CPU', 'CPU%d' % (i), 0 ) for i in range( min(8, multiprocessing.cpu_count() // 2) ) ]
|
|
|
|
elif type == 'final':
|
|
return [ (i, 'CPU', 'CPU%d' % (i), 0 ) for i in (range(min(8, multiprocessing.cpu_count())) if not DEBUG else [0]) ]
|
|
|
|
def __init__(self, input_data, type, image_size=None, jpeg_quality=None, face_type=None, output_debug_path=None, manual_window_size=0, max_faces_from_image=0, final_output_path=None, device_config=None):
|
|
if type == 'landmarks-manual':
|
|
for x in input_data:
|
|
x.manual = True
|
|
|
|
self.input_data = input_data
|
|
|
|
self.type = type
|
|
self.image_size = image_size
|
|
self.jpeg_quality = jpeg_quality
|
|
self.face_type = face_type
|
|
self.output_debug_path = output_debug_path
|
|
self.final_output_path = final_output_path
|
|
self.manual_window_size = manual_window_size
|
|
self.max_faces_from_image = max_faces_from_image
|
|
self.result = []
|
|
|
|
self.devices = ExtractSubprocessor.get_devices_for_config(self.type, device_config)
|
|
|
|
super().__init__('Extractor', ExtractSubprocessor.Cli,
|
|
999999 if type == 'landmarks-manual' or DEBUG else 120)
|
|
|
|
#override
|
|
def on_clients_initialized(self):
|
|
if self.type == 'landmarks-manual':
|
|
self.wnd_name = 'Manual pass'
|
|
io.named_window(self.wnd_name)
|
|
io.capture_mouse(self.wnd_name)
|
|
io.capture_keys(self.wnd_name)
|
|
|
|
self.cache_original_image = (None, None)
|
|
self.cache_image = (None, None)
|
|
self.cache_text_lines_img = (None, None)
|
|
self.hide_help = False
|
|
self.landmarks_accurate = True
|
|
self.force_landmarks = False
|
|
|
|
self.landmarks = None
|
|
self.x = 0
|
|
self.y = 0
|
|
self.rect_size = 100
|
|
self.rect_locked = False
|
|
self.extract_needed = True
|
|
|
|
self.image = None
|
|
self.image_filepath = None
|
|
|
|
io.progress_bar (None, len (self.input_data))
|
|
|
|
#override
|
|
def on_clients_finalized(self):
|
|
if self.type == 'landmarks-manual':
|
|
io.destroy_all_windows()
|
|
|
|
io.progress_bar_close()
|
|
|
|
#override
|
|
def process_info_generator(self):
|
|
base_dict = {'type' : self.type,
|
|
'image_size': self.image_size,
|
|
'jpeg_quality' : self.jpeg_quality,
|
|
'face_type': self.face_type,
|
|
'max_faces_from_image':self.max_faces_from_image,
|
|
'output_debug_path': self.output_debug_path,
|
|
'final_output_path': self.final_output_path,
|
|
'stdin_fd': sys.stdin.fileno() }
|
|
|
|
|
|
for (device_idx, device_type, device_name, device_total_vram_gb) in self.devices:
|
|
client_dict = base_dict.copy()
|
|
client_dict['device_idx'] = device_idx
|
|
client_dict['device_name'] = device_name
|
|
client_dict['device_type'] = device_type
|
|
yield client_dict['device_name'], {}, client_dict
|
|
|
|
#override
|
|
def get_data(self, host_dict):
|
|
if self.type == 'landmarks-manual':
|
|
need_remark_face = False
|
|
while len (self.input_data) > 0:
|
|
data = self.input_data[0]
|
|
filepath, data_rects, data_landmarks = data.filepath, data.rects, data.landmarks
|
|
is_frame_done = False
|
|
|
|
if self.image_filepath != filepath:
|
|
self.image_filepath = filepath
|
|
if self.cache_original_image[0] == filepath:
|
|
self.original_image = self.cache_original_image[1]
|
|
else:
|
|
self.original_image = imagelib.normalize_channels( cv2_imread( filepath ), 3 )
|
|
|
|
self.cache_original_image = (filepath, self.original_image )
|
|
|
|
(h,w,c) = self.original_image.shape
|
|
self.view_scale = 1.0 if self.manual_window_size == 0 else self.manual_window_size / ( h * (16.0/9.0) )
|
|
|
|
if self.cache_image[0] == (h,w,c) + (self.view_scale,filepath):
|
|
self.image = self.cache_image[1]
|
|
else:
|
|
self.image = cv2.resize (self.original_image, ( int(w*self.view_scale), int(h*self.view_scale) ), interpolation=cv2.INTER_LINEAR)
|
|
self.cache_image = ( (h,w,c) + (self.view_scale,filepath), self.image )
|
|
|
|
(h,w,c) = self.image.shape
|
|
|
|
sh = (0,0, w, min(100, h) )
|
|
if self.cache_text_lines_img[0] == sh:
|
|
self.text_lines_img = self.cache_text_lines_img[1]
|
|
else:
|
|
self.text_lines_img = (imagelib.get_draw_text_lines ( self.image, sh,
|
|
[ '[L Mouse click] - lock/unlock selection. [Mouse wheel] - change rect',
|
|
'[R Mouse Click] - manual face rectangle',
|
|
'[Enter] / [Space] - confirm / skip frame',
|
|
'[,] [.]- prev frame, next frame. [Q] - skip remaining frames',
|
|
'[a] - accuracy on/off (more fps)',
|
|
'[h] - hide this help'
|
|
], (1, 1, 1) )*255).astype(np.uint8)
|
|
|
|
self.cache_text_lines_img = (sh, self.text_lines_img)
|
|
|
|
if need_remark_face: # need remark image from input data that already has a marked face?
|
|
need_remark_face = False
|
|
if len(data_rects) != 0: # If there was already a face then lock the rectangle to it until the mouse is clicked
|
|
self.rect = data_rects.pop()
|
|
self.landmarks = data_landmarks.pop()
|
|
data_rects.clear()
|
|
data_landmarks.clear()
|
|
|
|
self.rect_locked = True
|
|
self.rect_size = ( self.rect[2] - self.rect[0] ) / 2
|
|
self.x = ( self.rect[0] + self.rect[2] ) / 2
|
|
self.y = ( self.rect[1] + self.rect[3] ) / 2
|
|
self.redraw()
|
|
|
|
if len(data_rects) == 0:
|
|
(h,w,c) = self.image.shape
|
|
while True:
|
|
io.process_messages(0.0001)
|
|
|
|
if not self.force_landmarks:
|
|
new_x = self.x
|
|
new_y = self.y
|
|
|
|
new_rect_size = self.rect_size
|
|
|
|
mouse_events = io.get_mouse_events(self.wnd_name)
|
|
for ev in mouse_events:
|
|
(x, y, ev, flags) = ev
|
|
if ev == io.EVENT_MOUSEWHEEL and not self.rect_locked:
|
|
mod = 1 if flags > 0 else -1
|
|
diff = 1 if new_rect_size <= 40 else np.clip(new_rect_size / 10, 1, 10)
|
|
new_rect_size = max (5, new_rect_size + diff*mod)
|
|
elif ev == io.EVENT_LBUTTONDOWN:
|
|
if self.force_landmarks:
|
|
self.x = new_x
|
|
self.y = new_y
|
|
self.force_landmarks = False
|
|
self.rect_locked = True
|
|
self.redraw()
|
|
else:
|
|
self.rect_locked = not self.rect_locked
|
|
self.extract_needed = True
|
|
elif ev == io.EVENT_RBUTTONDOWN:
|
|
self.force_landmarks = not self.force_landmarks
|
|
if self.force_landmarks:
|
|
self.rect_locked = False
|
|
elif not self.rect_locked:
|
|
new_x = np.clip (x, 0, w-1) / self.view_scale
|
|
new_y = np.clip (y, 0, h-1) / self.view_scale
|
|
|
|
key_events = io.get_key_events(self.wnd_name)
|
|
key, chr_key, ctrl_pressed, alt_pressed, shift_pressed = key_events[-1] if len(key_events) > 0 else (0,0,False,False,False)
|
|
|
|
if key == ord('\r') or key == ord('\n'):
|
|
#confirm frame
|
|
is_frame_done = True
|
|
data_rects.append (self.rect)
|
|
data_landmarks.append (self.landmarks)
|
|
break
|
|
elif key == ord(' '):
|
|
#confirm skip frame
|
|
is_frame_done = True
|
|
break
|
|
elif key == ord(',') and len(self.result) > 0:
|
|
#go prev frame
|
|
|
|
if self.rect_locked:
|
|
self.rect_locked = False
|
|
# Only save the face if the rect is still locked
|
|
data_rects.append (self.rect)
|
|
data_landmarks.append (self.landmarks)
|
|
|
|
|
|
self.input_data.insert(0, self.result.pop() )
|
|
io.progress_bar_inc(-1)
|
|
need_remark_face = True
|
|
|
|
break
|
|
elif key == ord('.'):
|
|
#go next frame
|
|
|
|
if self.rect_locked:
|
|
self.rect_locked = False
|
|
# Only save the face if the rect is still locked
|
|
data_rects.append (self.rect)
|
|
data_landmarks.append (self.landmarks)
|
|
|
|
need_remark_face = True
|
|
is_frame_done = True
|
|
break
|
|
elif key == ord('q'):
|
|
#skip remaining
|
|
|
|
if self.rect_locked:
|
|
self.rect_locked = False
|
|
data_rects.append (self.rect)
|
|
data_landmarks.append (self.landmarks)
|
|
|
|
while len(self.input_data) > 0:
|
|
self.result.append( self.input_data.pop(0) )
|
|
io.progress_bar_inc(1)
|
|
|
|
break
|
|
|
|
elif key == ord('h'):
|
|
self.hide_help = not self.hide_help
|
|
break
|
|
elif key == ord('a'):
|
|
self.landmarks_accurate = not self.landmarks_accurate
|
|
break
|
|
|
|
if self.force_landmarks:
|
|
pt2 = np.float32([new_x, new_y])
|
|
pt1 = np.float32([self.x, self.y])
|
|
|
|
pt_vec_len = npla.norm(pt2-pt1)
|
|
pt_vec = pt2-pt1
|
|
if pt_vec_len != 0:
|
|
pt_vec /= pt_vec_len
|
|
|
|
self.rect_size = pt_vec_len
|
|
self.rect = ( int(self.x-self.rect_size),
|
|
int(self.y-self.rect_size),
|
|
int(self.x+self.rect_size),
|
|
int(self.y+self.rect_size) )
|
|
|
|
if pt_vec_len > 0:
|
|
lmrks = np.concatenate ( (np.zeros ((17,2), np.float32), LandmarksProcessor.landmarks_2D), axis=0 )
|
|
lmrks -= lmrks[30:31,:]
|
|
mat = cv2.getRotationMatrix2D( (0, 0), -np.arctan2( pt_vec[1], pt_vec[0] )*180/math.pi , pt_vec_len)
|
|
mat[:, 2] += (self.x, self.y)
|
|
self.landmarks = LandmarksProcessor.transform_points(lmrks, mat )
|
|
|
|
|
|
self.redraw()
|
|
|
|
elif self.x != new_x or \
|
|
self.y != new_y or \
|
|
self.rect_size != new_rect_size or \
|
|
self.extract_needed:
|
|
self.x = new_x
|
|
self.y = new_y
|
|
self.rect_size = new_rect_size
|
|
self.rect = ( int(self.x-self.rect_size),
|
|
int(self.y-self.rect_size),
|
|
int(self.x+self.rect_size),
|
|
int(self.y+self.rect_size) )
|
|
|
|
return ExtractSubprocessor.Data (filepath, rects=[self.rect], landmarks_accurate=self.landmarks_accurate)
|
|
|
|
else:
|
|
is_frame_done = True
|
|
|
|
if is_frame_done:
|
|
self.result.append ( data )
|
|
self.input_data.pop(0)
|
|
io.progress_bar_inc(1)
|
|
self.extract_needed = True
|
|
self.rect_locked = False
|
|
else:
|
|
if len (self.input_data) > 0:
|
|
return self.input_data.pop(0)
|
|
|
|
return None
|
|
|
|
#override
|
|
def on_data_return (self, host_dict, data):
|
|
if not self.type != 'landmarks-manual':
|
|
self.input_data.insert(0, data)
|
|
|
|
def redraw(self):
|
|
(h,w,c) = self.image.shape
|
|
|
|
if not self.hide_help:
|
|
image = cv2.addWeighted (self.image,1.0,self.text_lines_img,1.0,0)
|
|
else:
|
|
image = self.image.copy()
|
|
|
|
view_rect = (np.array(self.rect) * self.view_scale).astype(np.int).tolist()
|
|
view_landmarks = (np.array(self.landmarks) * self.view_scale).astype(np.int).tolist()
|
|
|
|
if self.rect_size <= 40:
|
|
scaled_rect_size = h // 3 if w > h else w // 3
|
|
|
|
p1 = (self.x - self.rect_size, self.y - self.rect_size)
|
|
p2 = (self.x + self.rect_size, self.y - self.rect_size)
|
|
p3 = (self.x - self.rect_size, self.y + self.rect_size)
|
|
|
|
wh = h if h < w else w
|
|
np1 = (w / 2 - wh / 4, h / 2 - wh / 4)
|
|
np2 = (w / 2 + wh / 4, h / 2 - wh / 4)
|
|
np3 = (w / 2 - wh / 4, h / 2 + wh / 4)
|
|
|
|
mat = cv2.getAffineTransform( np.float32([p1,p2,p3])*self.view_scale, np.float32([np1,np2,np3]) )
|
|
image = cv2.warpAffine(image, mat,(w,h) )
|
|
view_landmarks = LandmarksProcessor.transform_points (view_landmarks, mat)
|
|
|
|
landmarks_color = (255,255,0) if self.rect_locked else (0,255,0)
|
|
LandmarksProcessor.draw_rect_landmarks (image, view_rect, view_landmarks, self.face_type, self.image_size, landmarks_color=landmarks_color)
|
|
self.extract_needed = False
|
|
|
|
io.show_image (self.wnd_name, image)
|
|
|
|
|
|
#override
|
|
def on_result (self, host_dict, data, result):
|
|
if self.type == 'landmarks-manual':
|
|
filepath, landmarks = result.filepath, result.landmarks
|
|
|
|
if len(landmarks) != 0 and landmarks[0] is not None:
|
|
self.landmarks = landmarks[0]
|
|
|
|
self.redraw()
|
|
else:
|
|
self.result.append ( result )
|
|
io.progress_bar_inc(1)
|
|
|
|
|
|
|
|
#override
|
|
def get_result(self):
|
|
return self.result
|
|
|
|
|
|
class DeletedFilesSearcherSubprocessor(Subprocessor):
|
|
class Cli(Subprocessor.Cli):
|
|
#override
|
|
def on_initialize(self, client_dict):
|
|
self.debug_paths_stems = client_dict['debug_paths_stems']
|
|
return None
|
|
|
|
#override
|
|
def process_data(self, data):
|
|
input_path_stem = Path(data[0]).stem
|
|
return any ( [ input_path_stem == d_stem for d_stem in self.debug_paths_stems] )
|
|
|
|
#override
|
|
def get_data_name (self, data):
|
|
#return string identificator of your data
|
|
return data[0]
|
|
|
|
#override
|
|
def __init__(self, input_paths, debug_paths ):
|
|
self.input_paths = input_paths
|
|
self.debug_paths_stems = [ Path(d).stem for d in debug_paths]
|
|
self.result = []
|
|
super().__init__('DeletedFilesSearcherSubprocessor', DeletedFilesSearcherSubprocessor.Cli, 60)
|
|
|
|
#override
|
|
def process_info_generator(self):
|
|
for i in range(min(multiprocessing.cpu_count(), 8)):
|
|
yield 'CPU%d' % (i), {}, {'debug_paths_stems' : self.debug_paths_stems}
|
|
|
|
#override
|
|
def on_clients_initialized(self):
|
|
io.progress_bar ("Searching deleted files", len (self.input_paths))
|
|
|
|
#override
|
|
def on_clients_finalized(self):
|
|
io.progress_bar_close()
|
|
|
|
#override
|
|
def get_data(self, host_dict):
|
|
if len (self.input_paths) > 0:
|
|
return [self.input_paths.pop(0)]
|
|
return None
|
|
|
|
#override
|
|
def on_data_return (self, host_dict, data):
|
|
self.input_paths.insert(0, data[0])
|
|
|
|
#override
|
|
def on_result (self, host_dict, data, result):
|
|
if result == False:
|
|
self.result.append( data[0] )
|
|
io.progress_bar_inc(1)
|
|
|
|
#override
|
|
def get_result(self):
|
|
return self.result
|
|
|
|
def main(detector=None,
|
|
input_path=None,
|
|
output_path=None,
|
|
output_debug=None,
|
|
manual_fix=False,
|
|
manual_output_debug_fix=False,
|
|
manual_window_size=1368,
|
|
face_type='full_face',
|
|
max_faces_from_image=None,
|
|
image_size=None,
|
|
jpeg_quality=None,
|
|
cpu_only = False,
|
|
force_gpu_idxs = None,
|
|
):
|
|
|
|
if not input_path.exists():
|
|
io.log_err ('Input directory not found. Please ensure it exists.')
|
|
return
|
|
|
|
if not output_path.exists():
|
|
output_path.mkdir(parents=True, exist_ok=True)
|
|
|
|
if face_type is not None:
|
|
face_type = FaceType.fromString(face_type)
|
|
|
|
if face_type is None:
|
|
if manual_output_debug_fix:
|
|
files = pathex.get_image_paths(output_path)
|
|
if len(files) != 0:
|
|
dflimg = DFLIMG.load(Path(files[0]))
|
|
if dflimg is not None and dflimg.has_data():
|
|
face_type = FaceType.fromString ( dflimg.get_face_type() )
|
|
|
|
input_image_paths = pathex.get_image_unique_filestem_paths(input_path, verbose_print_func=io.log_info)
|
|
output_images_paths = pathex.get_image_paths(output_path)
|
|
output_debug_path = output_path.parent / (output_path.name + '_debug')
|
|
|
|
continue_extraction = False
|
|
if not manual_output_debug_fix and len(output_images_paths) > 0:
|
|
if len(output_images_paths) > 128:
|
|
continue_extraction = io.input_bool ("Continue extraction?", True, help_message="Extraction can be continued, but you must specify the same options again.")
|
|
|
|
if len(output_images_paths) > 128 and continue_extraction:
|
|
try:
|
|
input_image_paths = input_image_paths[ [ Path(x).stem for x in input_image_paths ].index ( Path(output_images_paths[-128]).stem.split('_')[0] ) : ]
|
|
except:
|
|
io.log_err("Error in fetching the last index. Extraction cannot be continued.")
|
|
return
|
|
elif input_path != output_path:
|
|
io.input(f"\n WARNING !!! \n {output_path} contains files! \n They will be deleted. \n Press enter to continue.\n")
|
|
for filename in output_images_paths:
|
|
Path(filename).unlink()
|
|
|
|
device_config = nn.DeviceConfig.GPUIndexes( force_gpu_idxs or nn.ask_choose_device_idxs(choose_only_one=detector=='manual', suggest_all_gpu=True) ) \
|
|
if not cpu_only else nn.DeviceConfig.CPU()
|
|
|
|
if face_type is None:
|
|
face_type = io.input_str ("Face type", 'wf', ['f','wf','head'], help_message="Full face / whole face / head. 'Whole face' covers full area of face include forehead. 'head' covers full head, but requires XSeg for src and dst faceset.").lower()
|
|
face_type = {'f' : FaceType.FULL,
|
|
'wf' : FaceType.WHOLE_FACE,
|
|
'head' : FaceType.HEAD}[face_type]
|
|
|
|
if max_faces_from_image is None:
|
|
max_faces_from_image = io.input_int(f"Max number of faces from image", 0, help_message="If you extract a src faceset that has frames with a large number of faces, it is advisable to set max faces to 3 to speed up extraction. 0 - unlimited")
|
|
|
|
if image_size is None:
|
|
image_size = io.input_int(f"Image size", 512 if face_type < FaceType.HEAD else 768, valid_range=[256,2048], help_message="Output image size. The higher image size, the worse face-enhancer works. Use higher than 512 value only if the source image is sharp enough and the face does not need to be enhanced.")
|
|
|
|
if jpeg_quality is None:
|
|
jpeg_quality = io.input_int(f"Jpeg quality", 90, valid_range=[1,100], help_message="Jpeg quality. The higher jpeg quality the larger the output file size.")
|
|
|
|
if detector is None:
|
|
io.log_info ("Choose detector type.")
|
|
io.log_info ("[0] S3FD")
|
|
io.log_info ("[1] manual")
|
|
detector = {0:'s3fd', 1:'manual'}[ io.input_int("", 0, [0,1]) ]
|
|
|
|
|
|
if output_debug is None:
|
|
output_debug = io.input_bool (f"Write debug images to {output_debug_path.name}?", False)
|
|
|
|
if output_debug:
|
|
output_debug_path.mkdir(parents=True, exist_ok=True)
|
|
|
|
if manual_output_debug_fix:
|
|
if not output_debug_path.exists():
|
|
io.log_err(f'{output_debug_path} not found. Re-extract faces with "Write debug images" option.')
|
|
return
|
|
else:
|
|
detector = 'manual'
|
|
io.log_info('Performing re-extract frames which were deleted from _debug directory.')
|
|
|
|
input_image_paths = DeletedFilesSearcherSubprocessor (input_image_paths, pathex.get_image_paths(output_debug_path) ).run()
|
|
input_image_paths = sorted (input_image_paths)
|
|
io.log_info('Found %d images.' % (len(input_image_paths)))
|
|
else:
|
|
if not continue_extraction and output_debug_path.exists():
|
|
for filename in pathex.get_image_paths(output_debug_path):
|
|
Path(filename).unlink()
|
|
|
|
images_found = len(input_image_paths)
|
|
faces_detected = 0
|
|
if images_found != 0:
|
|
if detector == 'manual':
|
|
io.log_info ('Performing manual extract...')
|
|
data = ExtractSubprocessor ([ ExtractSubprocessor.Data(Path(filename)) for filename in input_image_paths ], 'landmarks-manual', image_size, jpeg_quality, face_type, output_debug_path if output_debug else None, manual_window_size=manual_window_size, device_config=device_config).run()
|
|
|
|
io.log_info ('Performing 3rd pass...')
|
|
data = ExtractSubprocessor (data, 'final', image_size, jpeg_quality, face_type, output_debug_path if output_debug else None, final_output_path=output_path, device_config=device_config).run()
|
|
|
|
else:
|
|
io.log_info ('Extracting faces...')
|
|
data = ExtractSubprocessor ([ ExtractSubprocessor.Data(Path(filename)) for filename in input_image_paths ],
|
|
'all',
|
|
image_size,
|
|
jpeg_quality,
|
|
face_type,
|
|
output_debug_path if output_debug else None,
|
|
max_faces_from_image=max_faces_from_image,
|
|
final_output_path=output_path,
|
|
device_config=device_config).run()
|
|
|
|
faces_detected += sum([d.faces_detected for d in data])
|
|
|
|
if manual_fix:
|
|
if all ( np.array ( [ d.faces_detected > 0 for d in data] ) == True ):
|
|
io.log_info ('All faces are detected, manual fix not needed.')
|
|
else:
|
|
fix_data = [ ExtractSubprocessor.Data(d.filepath) for d in data if d.faces_detected == 0 ]
|
|
io.log_info ('Performing manual fix for %d images...' % (len(fix_data)) )
|
|
fix_data = ExtractSubprocessor (fix_data, 'landmarks-manual', image_size, jpeg_quality, face_type, output_debug_path if output_debug else None, manual_window_size=manual_window_size, device_config=device_config).run()
|
|
fix_data = ExtractSubprocessor (fix_data, 'final', image_size, jpeg_quality, face_type, output_debug_path if output_debug else None, final_output_path=output_path, device_config=device_config).run()
|
|
faces_detected += sum([d.faces_detected for d in fix_data])
|
|
|
|
|
|
io.log_info ('-------------------------')
|
|
io.log_info ('Images found: %d' % (images_found) )
|
|
io.log_info ('Faces detected: %d' % (faces_detected) )
|
|
io.log_info ('-------------------------')
|