def main(params):
    """Run face detection and recognition on a video source and record the result.

    Every frame read from ``params.source`` is passed through
    ``frame_processor``, written to ``output_video.mp4`` and shown in a window
    named "Frame". Processing stops at end of stream or when "q" is pressed.

    Args:
        params: Configuration namespace. This function reads ``log_level``,
            ``det_weight``, ``rec_weight``, ``confidence_thresh`` and
            ``source``; the helpers it calls read further attributes.

    Raises:
        Exception: If the video source cannot be opened.
    """
    setup_logging(params.log_level)

    # STEP 1: Instantiate face recognition and face detection models
    detector = SCRFD(params.det_weight, input_size=(640, 640), conf_thres=params.confidence_thresh)
    recognizer = ArcFace(params.rec_weight)

    # STEP 2: Build target face database from face dataset directory
    targets = build_targets(detector, recognizer, params)

    # random.randint includes both endpoints, so the upper bound must be 255
    # to stay inside the valid 8-bit channel range.
    colors = {
        name: (random.randint(0, 255), random.randint(0, 255), random.randint(0, 255))
        for _, name in targets
    }

    # STEP 3: Initialize video input (file or webcam)
    cap = cv2.VideoCapture(params.source)
    if not cap.isOpened():
        raise Exception("Could not open video or webcam")

    out = None
    try:
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Sources that cannot report a frame rate return 0 (or NaN); writing
        # with that value produces an unplayable file, so fall back to 30 FPS.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            fps = 30.0

        # Initialize the video writer for the annotated output
        out = cv2.VideoWriter("output_video.mp4", cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height))
        if not out.isOpened():
            # write() on an unopened writer silently drops frames; make that visible.
            logging.warning("Could not open output_video.mp4 for writing; output will not be saved")

        # Create a resizable window
        cv2.namedWindow("Frame", cv2.WINDOW_NORMAL)
        cv2.resizeWindow("Frame", width, height)

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # STEP 4: Process each frame from the video input
            frame = frame_processor(frame, detector, recognizer, targets, colors, params)
            out.write(frame)
            cv2.imshow("Frame", frame)

            if cv2.waitKey(1) & 0xFF == ord("q"):
                break
    finally:
        # Release the capture, writer and window even if processing raised,
        # so the output file is finalized and the camera is freed.
        cap.release()
        if out is not None:
            out.release()
        cv2.destroyAllWindows()
class SCRFD:
    """SCRFD face detector run through OpenCV's DNN module.

    NOTE(review): this listing is abridged. The per-stride decoding in
    ``detect`` and the final selection step in ``_postprocess`` are marked as
    omitted, so the class is not runnable as shown.
    """

    def __init__(self, model_file=None, input_size=(640, 640), conf_thres=0.5):
        """
        Initialize SCRFD face detector
        Args:
        model_file: Path to ONNX model file
        input_size: Network input size (width, height)
        conf_thres: Confidence threshold for detections
        """
        self.net = cv2.dnn.readNet(model_file)
        self.input_size = input_size
        self.conf_threshold = conf_thres
        # Overlap threshold passed to cv2.dnn.NMSBoxes in _postprocess
        self.nms_threshold = 0.4
        # Request the CUDA backend and target.
        # NOTE(review): the request is unconditional; no availability check is
        # made here. Confirm how OpenCV builds without CUDA support behave.
        self.net.setPreferableBackend(cv2.dnn.DNN_BACKEND_CUDA)
        self.net.setPreferableTarget(cv2.dnn.DNN_TARGET_CUDA)
        # Stride configuration for multi-scale detection
        self.strides = [8, 16, 32]
        # Number of stride levels
        self.fmc = len(self.strides)

    def detect(self, img, max_num=0):
        """
        Detect faces in an image
        Args:
        img: Input image
        max_num: Maximum number of faces to detect (0 for unlimited)
        Returns:
        bboxes: Array of bounding boxes with confidence scores
        kpss: Array of facial keypoints
        """
        # Preprocess image
        img_height, img_width = img.shape[:2]
        # NOTE(review): __init__ documents input_size as (width, height), so
        # these two names look swapped. Neither is used in the visible code.
        input_height, input_width = self.input_size
        # Resize to the network input size, subtract the 127.5 mean, scale by
        # 1/128 and swap the R and B channels.
        blob = cv2.dnn.blobFromImage(
        img, 1.0/128, self.input_size, (127.5, 127.5, 127.5), swapRB=True
        )
        # Forward pass
        self.net.setInput(blob)
        outputs = self.net.forward(self.net.getUnconnectedOutLayersNames())
        # Process outputs
        scores_list = []
        bboxes_list = []
        kpss_list = []
        # Process each feature map (multi-scale outputs)
        # NOTE(review): the loop body is elided in this listing, so the three
        # lists above are never filled by any visible code.
        for idx, stride in enumerate(self.strides):
            # Decode outputs to get face detections
            # Implementation details omitted for brevity
        # Apply non-maximum suppression
        return self._postprocess(scores_list, bboxes_list, kpss_list,
        img_height, img_width, max_num)

    def _postprocess(self, scores, bboxes, kpss, img_height, img_width, max_num):
        """
        Post-processing of detections

        Stacks the per-stride lists and runs non-maximum suppression using
        self.conf_threshold and self.nms_threshold. img_height, img_width and
        max_num are accepted but not used by the visible code.
        """
        # Convert to numpy arrays
        # NOTE(review): np.vstack raises ValueError on an empty list; only
        # kpss is guarded against that below, scores and bboxes are not.
        scores = np.vstack(scores)
        bboxes = np.vstack(bboxes)
        kpss = np.vstack(kpss) if len(kpss) > 0 else np.zeros((0, 0, 2))
        # Apply NMS
        # NOTE(review): OpenCV documents NMSBoxes boxes as (x, y, w, h)
        # rectangles; confirm the decoded bboxes use that layout and not
        # (x1, y1, x2, y2).
        indices = cv2.dnn.NMSBoxes(
        bboxes, scores.flatten(), self.conf_threshold, self.nms_threshold
        )
        # Extract top detections
        # Implementation details omitted for brevity
        # NOTE(review): final_bboxes and final_kpss are not assigned anywhere
        # in the visible code; they belong to the omitted step above.
        return final_bboxes, final_kpss
class ArcFace:
    """ArcFace face recognition model run through OpenCV's DNN module."""

    # Canonical ArcFace landmark positions for a 112x112 aligned crop.
    # The x-coordinates include the +8 pixel offset that adapts the older
    # 96x112 template to a 112-pixel-wide crop; without it the face is
    # shifted left in the crop fed to the network.
    _REFERENCE_LANDMARKS = np.array([
        [38.2946, 51.6963],  # left eye
        [73.5318, 51.5014],  # right eye
        [56.0252, 71.7366],  # nose
        [41.5493, 92.3655],  # left mouth
        [70.7299, 92.2041],  # right mouth
    ], dtype=np.float32)

    def __init__(self, model_file=None):
        """
        Initialize ArcFace face recognition model
        Args:
            model_file: Path to ONNX model file
        """
        self.model = cv2.dnn.readNet(model_file)
        # Request the CUDA backend and target for GPU acceleration.
        self.model.setPreferableBackend(cv2.dnn.DNN_BACKEND_CUDA)
        self.model.setPreferableTarget(cv2.dnn.DNN_TARGET_CUDA)

    def __call__(self, img, kps):
        """
        Generate face embedding
        Args:
            img: Input image
            kps: Facial keypoints (5 points: eyes, nose, mouth corners)
        Returns:
            embedding: L2-normalized face embedding as a 1-D vector
        Raises:
            ValueError: If no alignment transform can be estimated from kps
        """
        # Align face using keypoints
        aligned_face = self._align_face(img, kps)

        # Create blob from aligned face
        blob = cv2.dnn.blobFromImage(
            aligned_face, 1.0 / 128, (112, 112), (127.5, 127.5, 127.5), swapRB=True
        )

        # Forward pass
        self.model.setInput(blob)
        # The network output carries a leading batch axis; flatten it so that
        # callers get a plain vector that works with np.dot.
        embedding = self.model.forward().flatten()

        # Normalize embedding (L2 normalization); leave an all-zero output
        # untouched rather than dividing by zero.
        norm = np.linalg.norm(embedding)
        if norm > 0:
            embedding = embedding / norm
        return embedding

    def _align_face(self, img, kps):
        """
        Align face using facial keypoints
        Args:
            img: Input image
            kps: Facial keypoints, five (x, y) pairs in the same order as the
                reference landmarks
        Returns:
            aligned_face: Aligned 112x112 face image
        Raises:
            ValueError: If the similarity transform cannot be estimated
        """
        # estimateAffinePartial2D needs matching float point sets of shape (N, 2)
        src = np.asarray(kps, dtype=np.float32).reshape(-1, 2)

        # Calculate transformation matrix (rotation, uniform scale, translation)
        M = cv2.estimateAffinePartial2D(src, self._REFERENCE_LANDMARKS)[0]
        if M is None:
            # Degenerate keypoints give no transform; fail with a clear message
            # instead of an opaque error inside warpAffine.
            raise ValueError("Could not estimate face alignment transform from keypoints")

        # Apply transformation
        aligned_face = cv2.warpAffine(img, M, (112, 112))
        return aligned_face
def compute_similarity(embedding1, embedding2):
    """
    Compute cosine similarity between two face embeddings
    Args:
        embedding1: First face embedding (any shape; flattened before use)
        embedding2: Second face embedding (any shape; flattened before use)
    Returns:
        similarity: Cosine similarity score as a float (higher means more
            similar); 0.0 if either embedding has zero magnitude
    """
    # Flatten so that (1, N) network outputs and plain (N,) vectors are both
    # accepted; np.dot on two (1, N) arrays would raise a shape error.
    vec1 = np.asarray(embedding1, dtype=np.float64).ravel()
    vec2 = np.asarray(embedding2, dtype=np.float64).ravel()

    # Product of the two magnitudes
    denominator = np.linalg.norm(vec1) * np.linalg.norm(vec2)
    if denominator == 0:
        # The angle to a zero vector is undefined; report "no similarity"
        # instead of propagating nan/inf into threshold comparisons.
        return 0.0

    # Cosine similarity = dot product / product of magnitudes
    return float(np.dot(vec1, vec2) / denominator)
def frame_processor(
    frame: np.ndarray,
    detector: SCRFD,
    recognizer: ArcFace,
    targets: List[Tuple[np.ndarray, str]],
    colors: dict,
    params: argparse.Namespace
) -> np.ndarray:
    """
    Process a single frame for face detection and recognition
    Args:
        frame: Input frame (annotated in place)
        detector: SCRFD face detector
        recognizer: ArcFace face recognizer
        targets: List of (embedding, name) tuples for known faces
        colors: Dict mapping names to colors for visualization
        params: Configuration parameters; reads ``max_num`` and
            ``similarity_thresh``
    Returns:
        processed_frame: The same frame object with annotated faces
    """
    # Detect faces in the frame
    bboxes, kpss = detector.detect(frame, params.max_num)

    # Process each detected face
    for bbox, kps in zip(bboxes, kpss):
        # Keep only the four box coordinates. The trailing confidence score is
        # not needed here, and casting it to int would truncate it anyway.
        # tolist() yields plain Python ints for the drawing calls.
        box = bbox[:4].astype(np.int32).tolist()

        # Generate embedding for the detected face
        embedding = recognizer(frame, kps)

        # Find best match in targets: the highest similarity that also
        # exceeds the configured threshold.
        max_similarity = 0
        best_match_name = "Unknown"
        for target, name in targets:
            similarity = compute_similarity(target, embedding)
            if similarity > max_similarity and similarity > params.similarity_thresh:
                max_similarity = similarity
                best_match_name = name

        # Draw bounding box with name and similarity score
        if best_match_name != "Unknown":
            color = colors[best_match_name]
            draw_bbox_info(frame, box, similarity=max_similarity,
                           name=best_match_name, color=color)
        else:
            # No target passed the threshold: plain green box without a label
            draw_bbox(frame, box, (0, 255, 0))

    return frame
def draw_bbox_info(img, bbox, similarity, name, color=(0, 255, 0)):
    """
    Draw bounding box with name and similarity score
    Args:
        img: Input image (modified in place)
        bbox: Bounding box coordinates [x1, y1, x2, y2]
        similarity: Similarity score
        name: Person name
        color: Box color
    """
    # Coerce to plain ints so numpy scalars or floats are accepted by the
    # drawing calls.
    x1, y1, x2, y2 = (int(v) for v in bbox)

    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 0.5
    text_thickness = 1
    padding = 10

    # Draw bounding box
    cv2.rectangle(img, (x1, y1), (x2, y2), color, 2)

    # Create label with name and similarity
    label = f"{name}: {similarity:.2f}"
    (text_w, text_h), _ = cv2.getTextSize(label, font, font_scale, text_thickness)

    # Place the label above the box when it fits. If the box touches the top
    # of the frame the label would be clipped, so put it just inside the box.
    if y1 - text_h - padding >= 0:
        label_top, label_bottom = y1 - text_h - padding, y1
    else:
        label_top, label_bottom = y1, y1 + text_h + padding

    # Draw label background
    cv2.rectangle(img, (x1, label_top), (x1 + text_w, label_bottom), color, -1)

    # Draw label text, baseline 5 px above the bottom of the background
    cv2.putText(img, label, (x1, label_bottom - 5), font, font_scale, (0, 0, 0), text_thickness)
def build_targets(detector, recognizer, params: argparse.Namespace) -> List[Tuple[np.ndarray, str]]:
    """
    Build target database from face images
    Args:
        detector: SCRFD face detector
        recognizer: ArcFace face recognizer
        params: Configuration parameters; reads ``faces_dir``
    Returns:
        targets: List of (embedding, name) tuples, where name is the image
            file name without its extension. Unreadable files and images
            with no detected face are skipped with a warning.
    """
    targets = []

    # Process each entry in the dataset directory; sorted so that the target
    # order does not depend on the filesystem's listing order.
    for filename in sorted(os.listdir(params.faces_dir)):
        # splitext handles extensions of any length (".jpeg", ".webp", ...),
        # unlike slicing off a fixed four characters.
        name, _ = os.path.splitext(filename)
        image_path = os.path.join(params.faces_dir, filename)

        # Read image; imread returns None for anything it cannot decode
        # (sub-directories, non-image files, corrupt images).
        image = cv2.imread(image_path)
        if image is None:
            logging.warning("Could not read image %s. Skipping...", image_path)
            continue

        # Detect face
        _, kpss = detector.detect(image, max_num=1)

        # Skip if no face detected (len() because kpss is an array, whose
        # truth value is ambiguous)
        if len(kpss) == 0:
            logging.warning("No face detected in %s. Skipping...", image_path)
            continue

        # Generate embedding
        embedding = recognizer(image, kpss[0])
        targets.append((embedding, name))

    return targets