-
Notifications
You must be signed in to change notification settings - Fork 11k
Fix nopgadget's NewSwap PR #1579
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
…ionality - Add clone_or_update scripts for cross-platform repo management - Introduce exclude.txt to prevent syncing .git, models, and binary files - Add install and run scripts for macOS/Windows environments - Improve error handling and add docstrings in utilities.py - Enhance robustness in video processing functions - Update core modules (face_analyser, globals, ui, etc.) for consistency The changes implement cross-platform setup automation while improving code quality through better error handling, documentation, and synchronization control. Key modules and scripts were updated to ensure stable execution across different operating systems.
Reviewer's GuideThis PR refactors camera handling and UI to use a unified enumeration function, introduces virtual camera support, enhances face‐swap and face‐enhancer modules with improved blending, error handling and documentation, hardens utility functions and global configuration with type hints, and adds cross-platform setup and Git helper scripts. Sequence diagram for toggling the virtual camera in the UIsequenceDiagram
actor User
participant UI
participant "VirtualCamManager"
participant Status
User->>UI: Clicks 'Toggle Virtual Cam' button
UI->>"VirtualCamManager": Calls start() or stop()
"VirtualCamManager"->>Status: Updates status message
Status-->>User: Shows enabled/disabled message
Sequence diagram for improved face swap process with error handling and blendingsequenceDiagram
participant "FaceSwapper"
participant "FaceAnalyser"
participant "FrameProcessor"
participant "Logger"
"FrameProcessor"->>"FaceAnalyser": get_one_face(temp_frame)
"FrameProcessor"->>"FaceSwapper": swap_face(source_face, target_face, temp_frame)
"FaceSwapper"->>"FaceSwapper": color_transfer (if enabled)
"FaceSwapper"->>"FaceSwapper": Poisson blending (seamlessClone)
"FaceSwapper"->>"FaceSwapper": apply_mouth_area (if enabled)
"FaceSwapper"->>"Logger": Log error if face swap fails
"FaceSwapper"-->>"FrameProcessor": Returns swapped frame
Class diagram for new and updated camera and virtual camera managementclassDiagram
class VideoCapturer {
+device_index: int
+frame_callback
+_current_frame
+_frame_ready
+is_running: bool
+cap
+start(width: int, height: int, fps: int): bool
+read(): (bool, np.ndarray)
+release(): void
+set_frame_callback(callback): void
}
class VirtualCamManager {
+cam
+enabled: bool
+width: int
+height: int
+fps: int
+start(width: int, height: int, fps: int): void
+send(frame): void
+stop(): void
}
VideoCapturer --|> VirtualCamManager : uses (UI toggles)
Class diagram for updated face swapper and face enhancer modulesclassDiagram
class FaceSwapper {
+swap_face(source_face, target_face, temp_frame): Any
+process_frame(source_face, temp_frame): Any
+process_frame_v2(temp_frame, temp_frame_path): Any
+process_frames(source_path, temp_frame_paths, progress): None
+process_image(source_path, target_path, output_path): bool
+color_transfer(source, target): np.ndarray
+create_lower_mouth_mask(face, frame): ...
+draw_mouth_mask_visualization(frame, face, mouth_mask_data): ...
+create_face_mask(face, frame): np.ndarray
}
class FaceEnhancer {
+enhance_face(temp_frame): Any
+process_frame(source_face, temp_frame): Any
+process_frames(source_path, temp_frame_paths, progress): None
+process_image(source_path, target_path, output_path): None
+process_video(source_path, temp_frame_paths): None
+process_frame_v2(temp_frame): Any
}
FaceSwapper <|-- FaceEnhancer : can be used as a frame processor
Class diagram for updated global configuration and typingclassDiagram
class Globals {
+ROOT_DIR: str
+WORKFLOW_DIR: str
+file_types: List[Any]
+source_target_map: List[Dict[str, Any]]
+simple_map: Dict[str, Any]
+source_path: Optional[str]
+target_path: Optional[str]
+output_path: Optional[str]
+frame_processors: List[str]
+keep_fps: bool
+keep_audio: bool
+keep_frames: bool
+many_faces: bool
+map_faces: bool
+color_correction: bool
+nsfw_filter: bool
+video_encoder: Optional[str]
+video_quality: Optional[int]
+live_mirror: bool
+live_resizable: bool
+max_memory: Optional[int]
+execution_providers: List[str]
+execution_threads: Optional[int]
+headless: Optional[bool]
+log_level: str
+fp_ui: Dict[str, bool]
+camera_input_combobox: Any
+webcam_preview_running: bool
+show_fps: bool
+mouth_mask: bool
+show_mouth_mask_box: bool
+mask_feather_ratio: int
+mask_down_size: float
+mask_size: int
}
class Typing {
+Face = InsightFace
+Frame = numpy.ndarray
}
File-Level Changes
Possibly linked issues
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey there - I've reviewed your changes and they look great!
Blocking issues:
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `modules/utilities.py:201-203` </location>
<code_context>
+ ]
def is_image(image_path: str) -> bool:
- if image_path and os.path.isfile(image_path):
- mimetype, _ = mimetypes.guess_type(image_path)
- return bool(mimetype and mimetype.startswith("image/"))
- return False
+ """Check if a file is an image."""
+ return has_image_extension(image_path)
</code_context>
<issue_to_address>
**issue (bug_risk):** is_image now only checks file extension, not file existence or mimetype.
This may cause is_image to return True for non-existent or invalid files, potentially resulting in errors later. Please consider reintroducing the file existence check or ensure error handling where is_image is called.
</issue_to_address>
### Comment 2
<location> `modules/utilities.py:206-208` </location>
<code_context>
+ return has_image_extension(image_path)
def is_video(video_path: str) -> bool:
- if video_path and os.path.isfile(video_path):
- mimetype, _ = mimetypes.guess_type(video_path)
- return bool(mimetype and mimetype.startswith("video/"))
- return False
+ """Check if a file is a video."""
+ return os.path.splitext(video_path)[1].lower() in [
+ ".mp4", ".mkv"
+ ]
</code_context>
<issue_to_address>
**issue (bug_risk):** is_video now only checks file extension, not file existence or mimetype.
This approach may lead to false positives if the file does not exist or is not a valid video. Please consider reintroducing checks for file existence and mimetype, or confirm that downstream logic can handle such cases.
</issue_to_address>
### Comment 3
<location> `modules/utilities.py:183-191` </location>
<code_context>
+ print(f"Error moving temp output: {e}")
def clean_temp(target_path: str) -> None:
+ """Remove temp directory and files for a given target path."""
temp_directory_path = get_temp_directory_path(target_path)
- parent_directory_path = os.path.dirname(temp_directory_path)
- if not modules.globals.keep_frames and os.path.isdir(temp_directory_path):
- shutil.rmtree(temp_directory_path)
- if os.path.exists(parent_directory_path) and not os.listdir(parent_directory_path):
- os.rmdir(parent_directory_path)
+ try:
+ for p in Path(temp_directory_path).glob("*"):
+ p.unlink()
+ os.rmdir(temp_directory_path)
+ except Exception as e:
+ print(f"Error cleaning temp directory: {e}")
</code_context>
<issue_to_address>
**issue (bug_risk):** clean_temp now unconditionally deletes all files in the temp directory.
Consider checking the keep_frames flag before deleting files to avoid removing intermediate results when preservation is intended.
</issue_to_address>
### Comment 4
<location> `modules/processors/frame/face_enhancer.py:85-91` </location>
<code_context>
-def enhance_face(temp_frame: Frame) -> Frame:
+def enhance_face(temp_frame: Any) -> Any:
+ """Enhance a face in the given frame using GFPGAN."""
with THREAD_SEMAPHORE:
- _, _, temp_frame = get_face_enhancer().enhance(temp_frame, paste_back=True)
+ enhancer = get_face_enhancer()
+ if enhancer is None:
+ print("Face enhancer model not loaded.")
+ return temp_frame
+ try:
+ _, _, temp_frame = enhancer.enhance(temp_frame, paste_back=True)
</code_context>
<issue_to_address>
**suggestion (bug_risk):** enhance_face returns the original frame if enhancement fails, which may mask errors.
Consider adding a warning log or a flag to indicate when enhancement is skipped, so downstream code can reliably detect failures.
Suggested implementation:
```python
import warnings
def enhance_face(temp_frame: Any) -> tuple[Any, bool]:
"""Enhance a face in the given frame using GFPGAN.
Returns:
(frame, enhanced): frame is the processed frame, enhanced is True if enhancement succeeded, False otherwise.
"""
with THREAD_SEMAPHORE:
enhancer = get_face_enhancer()
if enhancer is None:
warnings.warn("Face enhancer model not loaded. Enhancement skipped.", RuntimeWarning)
return temp_frame, False
try:
_, _, temp_frame = enhancer.enhance(temp_frame, paste_back=True)
return temp_frame, True
except Exception as e:
warnings.warn(f"Face enhancement failed: {e}. Enhancement skipped.", RuntimeWarning)
return temp_frame, False
```
You will need to update any downstream code that calls `enhance_face` to handle the new return value `(frame, enhanced: bool)`. For example, in `process_frame`, you should unpack the tuple and check the `enhanced` flag to detect if enhancement was skipped or failed.
</issue_to_address>
### Comment 5
<location> `modules/cluster_analysis.py:7-16` </location>
<code_context>
+def find_cluster_centroids(embeddings: List[Any], max_k: int = 10) -> Any:
</code_context>
<issue_to_address>
**issue:** find_cluster_centroids returns empty list if KMeans fails for all k.
Returning an empty list may lead to downstream errors. Consider raising an exception or providing a default centroid to handle this case.
</issue_to_address>
### Comment 6
<location> `modules/processors/frame/core.py:45-54` </location>
<code_context>
+def set_frame_processors_modules_from_ui(frame_processors: List[str]) -> None:
</code_context>
<issue_to_address>
**issue (bug_risk):** set_frame_processors_modules_from_ui does not update modules.globals.frame_processors when removing processors.
Removing a processor from FRAME_PROCESSORS_MODULES should also remove it from modules.globals.frame_processors to prevent state mismatches.
</issue_to_address>
### Comment 7
<location> `modules/utilities.py:35` </location>
<code_context>
subprocess.run(commands, check=True)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 8
<location> `modules/capturer.py:11-20` </location>
<code_context>
def get_video_frame(video_path: str, frame_number: int = 0) -> Any:
"""Extract a specific frame from a video file, with color correction if enabled."""
capture = cv2.VideoCapture(video_path)
try:
# Set MJPEG format to ensure correct color space handling
capture.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'MJPG'))
# Only force RGB conversion if color correction is enabled
if modules.globals.color_correction:
capture.set(cv2.CAP_PROP_CONVERT_RGB, 1)
frame_total = capture.get(cv2.CAP_PROP_FRAME_COUNT)
capture.set(cv2.CAP_PROP_POS_FRAMES, min(frame_total, frame_number - 1))
has_frame, frame = capture.read()
if has_frame and modules.globals.color_correction:
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
return frame if has_frame else None
except Exception as e:
print(f"Error extracting video frame: {e}")
return None
finally:
capture.release()
</code_context>
<issue_to_address>
**issue (code-quality):** Extract code out into function ([`extract-method`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/extract-method/))
</issue_to_address>
### Comment 9
<location> `modules/capturer.py:32-33` </location>
<code_context>
def get_video_frame_total(video_path: str) -> int:
"""Return the total number of frames in a video file."""
capture = cv2.VideoCapture(video_path)
try:
video_frame_total = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
return video_frame_total
except Exception as e:
print(f"Error getting video frame total: {e}")
return 0
finally:
capture.release()
</code_context>
<issue_to_address>
**suggestion (code-quality):** Inline variable that is immediately returned ([`inline-immediately-returned-variable`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/inline-immediately-returned-variable/))
```suggestion
return int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
```
</issue_to_address>
### Comment 10
<location> `modules/face_analyser.py:55-58` </location>
<code_context>
def default_source_face() -> Any:
"""Return the first source face from the global map, if available."""
for map in modules.globals.source_target_map:
if "source" in map:
return map["source"]["face"]
return None
</code_context>
<issue_to_address>
**suggestion (code-quality):** Use the built-in function `next` instead of a for-loop ([`use-next`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/use-next/))
```suggestion
return next(
(
map["source"]["face"]
for map in modules.globals.source_target_map
if "source" in map
),
None,
)
```
</issue_to_address>
### Comment 11
<location> `modules/face_analyser.py:91-97` </location>
<code_context>
def get_unique_faces_from_target_image() -> Any:
"""Extract unique faces from the target image and update the global map."""
try:
modules.globals.source_target_map = []
target_frame = cv2.imread(modules.globals.target_path)
many_faces = get_many_faces(target_frame)
i = 0
for face in many_faces:
modules.globals.source_target_map.append({
'id': i,
'target': {'face': face}
})
i += 1
except Exception as e:
print(f"Error in get_unique_faces_from_target_image: {e}")
return None
</code_context>
<issue_to_address>
**suggestion (code-quality):** We've found these issues:
- Replace manual loop counter with call to enumerate ([`convert-to-enumerate`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/convert-to-enumerate/))
- Replace a for append loop with list extend ([`for-append-to-extend`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/for-append-to-extend/))
```suggestion
modules.globals.source_target_map.extend(
{'id': i, 'target': {'face': face}}
for i, face in enumerate(many_faces)
)
```
</issue_to_address>
### Comment 12
<location> `modules/face_analyser.py:107` </location>
<code_context>
def get_unique_faces_from_target_video() -> Any:
"""Extract unique faces from all frames of the target video and update the global map."""
try:
modules.globals.source_target_map = []
frame_face_embeddings = []
face_embeddings = []
print('Creating temp resources...')
clean_temp(modules.globals.target_path)
create_temp(modules.globals.target_path)
print('Extracting frames...')
extract_frames(modules.globals.target_path)
temp_frame_paths = get_temp_frame_paths(modules.globals.target_path)
i = 0
for temp_frame_path in tqdm(temp_frame_paths, desc="Extracting face embeddings from frames"):
frame = cv2.imread(temp_frame_path)
faces = get_many_faces(frame)
if faces:
for face in faces:
face_embeddings.append(face.normed_embedding)
frame_face_embeddings.append({'frame': temp_frame_path, 'face': face})
centroids = find_cluster_centroids(face_embeddings)
for frame in frame_face_embeddings:
closest_centroid_index, _ = find_closest_centroid(centroids, frame['face'].normed_embedding)
modules.globals.source_target_map.append({
'id': closest_centroid_index,
'target': {'face': frame['face'], 'location': frame['frame']}
})
for i in range(len(centroids)):
pass # Optionally, add more logic here
except Exception as e:
print(f"Error in get_unique_faces_from_target_video: {e}")
return None
</code_context>
<issue_to_address>
**issue (code-quality):** We've found these issues:
- Remove nested block which has no effect ([`remove-empty-nested-block`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/remove-empty-nested-block/))
- Use named expression to simplify assignment and conditional ([`use-named-expression`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/use-named-expression/))
</issue_to_address>
### Comment 13
<location> `modules/face_analyser.py:139-142` </location>
<code_context>
def default_target_face():
"""Return the first target face from the global map, if available."""
for map in modules.globals.source_target_map:
if "target" in map:
return map["target"]["face"]
return None
</code_context>
<issue_to_address>
**suggestion (code-quality):** Use the built-in function `next` instead of a for-loop ([`use-next`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/use-next/))
```suggestion
return next(
(
map["target"]["face"]
for map in modules.globals.source_target_map
if "target" in map
),
None,
)
```
</issue_to_address>
### Comment 14
<location> `modules/face_analyser.py:147-149` </location>
<code_context>
def dump_faces(centroids: Any, frame_face_embeddings: list) -> None:
"""Dump face crops to the temp directory for debugging or visualization."""
temp_directory_path = get_temp_directory_path(modules.globals.target_path)
for i in range(len(centroids)):
pass # Implement as needed
</code_context>
<issue_to_address>
**suggestion (code-quality):** Remove nested block which has no effect ([`remove-empty-nested-block`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/remove-empty-nested-block/))
```suggestion
temp_directory_path = get_temp_directory_path(modules.globals.target_path)
```
</issue_to_address>
### Comment 15
<location> `modules/processors/frame/face_swapper.py:127` </location>
<code_context>
def process_frame_v2(temp_frame: Any, temp_frame_path: str = "") -> Any:
"""Process a frame using mapped faces (for mapped face mode)."""
if is_image(modules.globals.target_path):
if modules.globals.many_faces:
source_face = default_source_face()
for map in modules.globals.source_target_map:
target_face = map["target"]["face"]
temp_frame = swap_face(source_face, target_face, temp_frame)
elif not modules.globals.many_faces:
for map in modules.globals.source_target_map:
if "source" in map:
source_face = map["source"]["face"]
target_face = map["target"]["face"]
temp_frame = swap_face(source_face, target_face, temp_frame)
elif is_video(modules.globals.target_path):
if modules.globals.many_faces:
source_face = default_source_face()
for map in modules.globals.source_target_map:
target_frame = [
f
for f in map["target_faces_in_frame"]
if f["location"] == temp_frame_path
]
for frame in target_frame:
for target_face in frame["faces"]:
temp_frame = swap_face(source_face, target_face, temp_frame)
elif not modules.globals.many_faces:
for map in modules.globals.source_target_map:
if "source" in map:
target_frame = [
f
for f in map["target_faces_in_frame"]
if f["location"] == temp_frame_path
]
source_face = map["source"]["face"]
for frame in target_frame:
for target_face in frame["faces"]:
temp_frame = swap_face(source_face, target_face, temp_frame)
else:
detected_faces = get_many_faces(temp_frame)
if modules.globals.many_faces:
if detected_faces:
source_face = default_source_face()
for target_face in detected_faces:
temp_frame = swap_face(source_face, target_face, temp_frame)
elif not modules.globals.many_faces:
if detected_faces:
if len(detected_faces) <= len(
modules.globals.simple_map["target_embeddings"]
):
for detected_face in detected_faces:
closest_centroid_index, _ = find_closest_centroid(
modules.globals.simple_map["target_embeddings"],
detected_face.normed_embedding,
)
temp_frame = swap_face(
modules.globals.simple_map["source_faces"][
closest_centroid_index
],
detected_face,
temp_frame,
)
else:
detected_faces_centroids = []
for face in detected_faces:
detected_faces_centroids.append(face.normed_embedding)
i = 0
for target_embedding in modules.globals.simple_map[
"target_embeddings"
]:
closest_centroid_index, _ = find_closest_centroid(
detected_faces_centroids, target_embedding
)
temp_frame = swap_face(
modules.globals.simple_map["source_faces"][i],
detected_faces[closest_centroid_index],
temp_frame,
)
i += 1
return temp_frame
</code_context>
<issue_to_address>
**issue (code-quality):** We've found these issues:
- Remove redundant conditional [×3] ([`remove-redundant-if`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/remove-redundant-if/))
- Use named expression to simplify assignment and conditional ([`use-named-expression`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/use-named-expression/))
- Hoist nested repeated code outside conditional statements ([`hoist-similar-statement-from-if`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/hoist-similar-statement-from-if/))
- Low code quality found in process\_frame\_v2 - 13% ([`low-code-quality`](https://docs.sourcery.ai/Reference/Default-Rules/comments/low-code-quality/))
<br/><details><summary>Explanation</summary>
The quality score for this function is below the quality threshold of 25%.
This score is a combination of the method length, cognitive complexity and working memory.
How can you solve this?
It might be worth refactoring this function to make it shorter and more readable.
- Reduce the function length by extracting pieces of functionality out into
their own functions. This is the most important thing you can do - ideally a
function should be less than 10 lines.
- Reduce nesting, perhaps by introducing guard clauses to return early.
- Ensure that variables are tightly scoped, so that code using related concepts
sits together within the function rather than being scattered.</details>
</issue_to_address>
### Comment 16
<location> `modules/processors/frame/face_swapper.py:271` </location>
<code_context>
def process_image(source_path: str, target_path: str, output_path: str) -> bool:
"""Process a single image and return True if successful, False if no face detected."""
if not modules.globals.map_faces:
source_face = get_one_face(cv2.imread(source_path))
if source_face is None:
logging.warning("No face detected in source image. Skipping output.")
return False
target_frame = cv2.imread(target_path)
result = process_frame(source_face, target_frame)
if np.array_equal(result, target_frame):
logging.warning("No face detected in target image. Skipping output.")
return False
cv2.imwrite(output_path, result)
return True
else:
if modules.globals.many_faces:
update_status(
"Many faces enabled. Using first source image. Progressing...", NAME
)
target_frame = cv2.imread(target_path)
result = process_frame_v2(target_frame)
if np.array_equal(result, target_frame):
logging.warning("No face detected in mapped target image. Skipping output.")
return False
cv2.imwrite(output_path, result)
return True
</code_context>
<issue_to_address>
**issue (code-quality):** Hoist repeated code outside conditional statement [×2] ([`hoist-statement-from-if`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/hoist-statement-from-if/))
</issue_to_address>
### Comment 17
<location> `modules/ui.py:146` </location>
<code_context>
def get_available_cameras():
"""Returns a list of available camera names and indices."""
if platform.system() == "Windows":
try:
graph = FilterGraph()
devices = graph.get_input_devices()
# Create list of indices and names
camera_indices = list(range(len(devices)))
camera_names = devices
# If no cameras found through DirectShow, try OpenCV fallback
if not camera_names:
# Try to open camera with index -1 and 0
test_indices = [-1, 0]
working_cameras = []
for idx in test_indices:
cap = cv2.VideoCapture(idx)
if cap.isOpened():
working_cameras.append(f"Camera {idx}")
cap.release()
if working_cameras:
return test_indices[: len(working_cameras)], working_cameras
# If still no cameras found, return empty lists
if not camera_names:
return [], ["No cameras found"]
return camera_indices, camera_names
except Exception as e:
print(f"Error detecting cameras: {str(e)}")
return [], ["No cameras found"]
else:
# Unix-like systems (Linux/Mac) camera detection
camera_indices = []
camera_names = []
if platform.system() == "Darwin": # macOS specific handling
# Try to open the default FaceTime camera first
cams = enumerate_cameras()
for camera_info in cams:
camera_indices.append(camera_info.index)
camera_names.append(camera_info.name)
# cap = cv2.VideoCapture(0)
# On macOS, additional cameras typically use indices 1 and 2
else:
# Linux camera detection - test first 10 indices
for i in range(10):
cap = cv2.VideoCapture(i)
if cap.isOpened():
camera_indices.append(i)
camera_names.append(f"Camera {i}")
cap.release()
if not camera_names:
return [], ["No cameras found"]
return camera_indices, camera_names
</code_context>
<issue_to_address>
**issue (code-quality):** Extract code out into function ([`extract-method`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/extract-method/))
</issue_to_address>
### Comment 18
<location> `modules/ui.py:1015-1018` </location>
<code_context>
def create_webcam_preview(camera_index: int):
global preview_label, PREVIEW
cap = VideoCapturer(camera_index)
if not cap.start(PREVIEW_DEFAULT_WIDTH, PREVIEW_DEFAULT_HEIGHT, 60):
update_status("Failed to start camera")
return
preview_label.configure(width=PREVIEW_DEFAULT_WIDTH, height=PREVIEW_DEFAULT_HEIGHT)
PREVIEW.deiconify()
frame_processors = get_frame_processors_modules(modules.globals.frame_processors)
source_image = None
prev_time = time.time()
fps_update_interval = 0.5
frame_count = 0
fps = 0
face_swap_enabled = True # Toggle for live face swap
last_face_detected = True
no_face_counter = 0
NO_FACE_THRESHOLD = 30 # Number of frames to show warning if no face
def toggle_face_swap():
nonlocal face_swap_enabled
face_swap_enabled = not face_swap_enabled
update_status(f"Face Swap {'Enabled' if face_swap_enabled else 'Disabled'}")
# Optionally, bind a key or button to toggle_face_swap
PREVIEW.bind('<f>', lambda e: toggle_face_swap())
while True:
ret, frame = cap.read()
if not ret:
update_status("Camera frame read failed.")
break
temp_frame = frame.copy()
if modules.globals.live_mirror:
temp_frame = cv2.flip(temp_frame, 1)
if modules.globals.live_resizable:
temp_frame = fit_image_to_size(
temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height()
)
else:
temp_frame = fit_image_to_size(
temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height()
)
face_found = True
if face_swap_enabled:
if not modules.globals.map_faces:
if source_image is None and modules.globals.source_path:
source_image = get_one_face(cv2.imread(modules.globals.source_path))
for frame_processor in frame_processors:
if frame_processor.NAME == "DLC.FACE-ENHANCER":
if modules.globals.fp_ui["face_enhancer"]:
temp_frame = frame_processor.process_frame(None, temp_frame)
else:
# Check if a face is detected before swapping
detected_face = get_one_face(temp_frame)
if detected_face is not None and source_image is not None:
temp_frame = frame_processor.process_frame(source_image, temp_frame)
last_face_detected = True
no_face_counter = 0
else:
face_found = False
no_face_counter += 1
else:
modules.globals.target_path = None
for frame_processor in frame_processors:
if frame_processor.NAME == "DLC.FACE-ENHANCER":
if modules.globals.fp_ui["face_enhancer"]:
temp_frame = frame_processor.process_frame_v2(temp_frame)
else:
temp_frame = frame_processor.process_frame_v2(temp_frame)
else:
# Face swap disabled, just show the frame
pass
# Show warning if no face detected for a while
if not face_found and no_face_counter > NO_FACE_THRESHOLD:
cv2.putText(
temp_frame,
"No face detected!",
(10, 60),
cv2.FONT_HERSHEY_SIMPLEX,
1.2,
(0, 0, 255),
3,
)
elif face_found:
no_face_counter = 0
# Calculate and display FPS
current_time = time.time()
frame_count += 1
if current_time - prev_time >= fps_update_interval:
fps = frame_count / (current_time - prev_time)
frame_count = 0
prev_time = current_time
if modules.globals.show_fps:
cv2.putText(
temp_frame,
f"FPS: {fps:.1f}",
(10, 30),
cv2.FONT_HERSHEY_SIMPLEX,
1,
(0, 255, 0),
2,
)
image = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB)
image = Image.fromarray(image)
image = ImageOps.contain(
image, (temp_frame.shape[1], temp_frame.shape[0]), Image.LANCZOS
)
image = ctk.CTkImage(image, size=image.size)
preview_label.configure(image=image)
ROOT.update()
if PREVIEW.state() == "withdrawn":
break
cap.release()
PREVIEW.withdraw()
update_status("Webcam preview closed.")
</code_context>
<issue_to_address>
**suggestion (code-quality):** Remove redundant pass statement ([`remove-redundant-pass`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/remove-redundant-pass/))
```suggestion
```
</issue_to_address>
### Comment 19
<location> `modules/video_capture.py:34` </location>
<code_context>
def start(self, width: int = 960, height: int = 540, fps: int = 60) -> bool:
"""Initialize and start video capture."""
try:
if platform.system() == "Windows":
capture_methods = [
(self.device_index, cv2.CAP_DSHOW),
(self.device_index, cv2.CAP_ANY),
(-1, cv2.CAP_ANY),
(0, cv2.CAP_ANY),
]
for dev_id, backend in capture_methods:
try:
self.cap = cv2.VideoCapture(dev_id, backend)
if self.cap.isOpened():
break
except Exception as e:
print(f"Error opening camera with backend {backend}: {e}")
else:
self.cap = cv2.VideoCapture(self.device_index)
if not self.cap or not self.cap.isOpened():
raise RuntimeError("Failed to open camera")
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
self.cap.set(cv2.CAP_PROP_FPS, fps)
self.is_running = True
return True
except Exception as e:
print(f"Failed to start capture: {str(e)}")
if self.cap:
self.cap.release()
return False
</code_context>
<issue_to_address>
**issue (code-quality):** Extract code out into method ([`extract-method`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/extract-method/))
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
ZardashtKaya
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fix conflicts
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR addresses camera enumeration issues and introduces several new features including virtual camera support, enhanced face swapping with Poisson blending, and cross-platform setup automation scripts.
Key Changes:
- Fixed camera detection logic across Windows, macOS, and Linux platforms using
cv2_enumerate_cameras - Added virtual camera output support via
pyvirtualcamwith UI toggle and management class - Enhanced face swapping pipeline with Poisson blending, color correction, and improved error handling
Reviewed Changes
Copilot reviewed 25 out of 25 changed files in this pull request and generated 29 comments.
Show a summary per file
| File | Description |
|---|---|
| ui.json | New UI theme configuration defining colors and styling for CustomTkinter components |
| run-cuda.bat | Updated to activate virtual environment before running with CUDA provider |
| run-cuda-macos.sh | New macOS script for CUDA execution with virtual environment activation |
| run-coreml.bat | New Windows reference script for CoreML provider |
| run-coreml-macos.sh | New macOS script for CoreML/Apple Silicon execution |
| push_to_rehanbgmi.sh | New helper script for pushing changes to a specific fork |
| push_to_new_branch.sh | New helper script for creating and pushing feature branches |
| install_windows.bat | New automated Windows setup script with dependency installation |
| install_macos.sh | New automated macOS setup script with Homebrew and Python 3.10 |
| exclude.txt | New exclusion list for repository syncing operations |
| clone_or_update_deep_live_cam.sh | New script for cloning/updating repository with rsync |
| clone_or_update_deep_live_cam.bat | Windows version of repository sync script |
| modules/video_capture.py | Refactored camera initialization with improved error handling and docstrings |
| modules/utilities.py | Enhanced with better error handling, docstrings, and temp file management |
| modules/ui.py | Added camera enumeration function, virtual camera manager, and live preview enhancements |
| modules/typing.py | Simplified type aliases with clearer documentation |
| modules/processors/frame/face_swapper.py | Major refactor with Poisson blending, simplified processing logic, and better error handling |
| modules/processors/frame/face_enhancer.py | Streamlined with TensorRT support detection and consistent error handling |
| modules/processors/frame/core.py | Improved module loading with better error messages and type hints |
| modules/predicter.py | Added exception handling and docstrings for NSFW prediction |
| modules/globals.py | Cleaned up with type hints and better documentation |
| modules/gettext.py | Added type hints and improved docstrings |
| modules/face_analyser.py | Refactored with consistent error handling and simplified map operations |
| modules/cluster_analysis.py | Enhanced with error handling and type hints |
| modules/capturer.py | Improved with try-finally blocks and better error handling |
Comments suppressed due to low confidence (3)
modules/utilities.py:166
- This comment appears to contain commented-out code.
# if not output_path:
# base = os.path.splitext(os.path.basename(target_path))[0]
# return os.path.join(TEMP_DIRECTORY, f"{base}_result.png")
modules/video_capture.py:6
- Import of 'getBackendName' is not used.
from cv2.videoio_registry import getBackendName
modules/video_capture.py:7
- Import of 'supported_backends' is not used.
from cv2_enumerate_cameras import supported_backends
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| git commit -m "$COMMIT_MSG" | ||
| git checkout -b "$BRANCH_NAME" | ||
| git push rehanbgmi "$BRANCH_NAME" | ||
| echo "Pushed to branch $BRANCH_NAME on your fork (rehanbgmi/deeplivceam)." |
Copilot
AI
Nov 12, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typo in echo message: "deeplivceam" should be "deeplivecam" (missing 'e').
| echo "Pushed to branch $BRANCH_NAME on your fork (rehanbgmi/deeplivceam)." | |
| echo "Pushed to branch $BRANCH_NAME on your fork (rehanbgmi/deeplivecam)." |
| for i in range(len(centroids)): | ||
| pass # Implement as needed |
Copilot
AI
Nov 12, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Empty pass statement in a loop serves no purpose. Consider adding actual implementation or removing this placeholder code.
| for i in range(len(centroids)): | |
| pass # Implement as needed | |
| os.makedirs(temp_directory_path, exist_ok=True) | |
| for i in range(len(centroids)): | |
| face = frame_face_embeddings[i] | |
| # Assume face has .bbox and .frame attributes | |
| if hasattr(face, 'bbox') and hasattr(face, 'frame'): | |
| x1, y1, x2, y2 = map(int, face.bbox) | |
| crop = face.frame[y1:y2, x1:x2] | |
| out_path = os.path.join(temp_directory_path, f"face_{i}.jpg") | |
| cv2.imwrite(out_path, crop) | |
| else: | |
| # If face.frame is not available, skip or log | |
| print(f"Face object at index {i} missing 'bbox' or 'frame' attribute.") |
| @@ -0,0 +1,23 @@ | |||
| #!/bin/zsh | |||
| # push_to_rehanbgmi.sh - Commit and push changes to your fork (rehanbgmi/deeplivceam) in Deep-Live-Cam-remote | |||
Copilot
AI
Nov 12, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typo in the comment: "deeplivceam" should be "deeplivecam" (missing 'e').
| except Exception as e: | ||
| print(f"Error opening camera with backend {backend}: {e}") |
Copilot
AI
Nov 12, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After an exception in opening the camera, the code continues to the next iteration without releasing the capture object. This could lead to resource leaks. The self.cap.release() line that was present in the original code has been removed.
| if not os.path.exists(download_directory_path): | ||
| os.makedirs(download_directory_path) | ||
| """Download files from URLs if they do not exist in the directory.""" | ||
| import requests |
Copilot
AI
Nov 12, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The requests library is imported inside the function but is not in the standard library. This import should be moved to the top of the file, and the dependency should be verified in requirements.txt.
| detected_face = get_one_face(temp_frame) | ||
| if detected_face is not None and source_image is not None: | ||
| temp_frame = frame_processor.process_frame(source_image, temp_frame) | ||
| last_face_detected = True |
Copilot
AI
Nov 12, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Variable last_face_detected is not used.
| last_face_detected = True |
| import modules.processors.frame.core | ||
| import torch | ||
| import modules | ||
| import numpy as np |
Copilot
AI
Nov 12, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Import of 'np' is not used.
| import numpy as np |
|
|
||
| TENSORRT_AVAILABLE = False | ||
| try: | ||
| import tensorrt |
Copilot
AI
Nov 12, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Import of 'tensorrt' is not used.
| import ssl | ||
| import subprocess | ||
| import urllib | ||
| import cv2 |
Copilot
AI
Nov 12, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Import of 'cv2' is not used.
| import cv2 |
| PYVIRTUALCAM_AVAILABLE = True | ||
| except ImportError: | ||
| PYVIRTUALCAM_AVAILABLE = False | ||
| print("pyvirtualcam is not installed. Virtual camera support will be disabled.") |
Copilot
AI
Nov 12, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Print statement may execute during import.
This PR fixes the camera. enumeration by using
cv2_enumerate_cameras()correctly and fixing some other bugs in the UI and making the NewSwap Branch runSummary by Sourcery
Improve camera enumeration and introduce virtual camera support; refine face swapping pipeline with advanced blending and error handling; standardize utilities; and add cross-platform setup scripts
New Features:
Bug Fixes:
Enhancements: