Skip to content

Instantly share code, notes, and snippets.

@pliablepixels
Created January 31, 2026 09:59
Show Gist options
  • Select an option

  • Save pliablepixels/a90cc821f9c3f2a6ad16671bdba9f55c to your computer and use it in GitHub Desktop.

Select an option

Save pliablepixels/a90cc821f9c3f2a6ad16671bdba9f55c to your computer and use it in GitHub Desktop.
pyzm yolo fix without version checks
import numpy as np
import sys
import cv2
import time
import datetime
import re
from pyzm.helpers.Base import Base
import portalocker
import os
from pyzm.helpers.utils import Timer
import pyzm.helpers.globals as g
import imutils
# cv2 version check for unconnected layers fix
def cv2_version(version=None) -> tuple:
    """Return a version string as a ``(major, minor, patch)`` tuple of ints.

    Tuples compare numerically, so ``(4, 10, 0) > (4, 5, 4)`` holds where a
    plain string comparison would not.

    Args:
        version: Dotted version string to parse. When ``None`` (the default)
            ``cv2.__version__`` is used.

    Returns:
        A 3-tuple of ints. Missing or non-numeric components are reported as 0
        and components beyond the third are ignored.
    """
    if version is None:
        version = cv2.__version__

    numbers = []
    for part in str(version).split('.')[:3]:
        # Only the leading digit run is the version number. Stripping every
        # non-digit would fuse suffix digits into it, e.g. "0rc1" -> 1 or
        # "0+cu118" -> 118.
        leading = re.match(r'\d+', part)
        numbers.append(int(leading.group()) if leading else 0)

    # Pad short versions such as "4" or "4.5" with zeros
    numbers.extend([0] * (3 - len(numbers)))
    return tuple(numbers)
# Class to handle Yolo based detection
class Yolo(Base):
    """YOLO object detection on top of OpenCV's DNN module.

    The network is loaded lazily on the first call to ``detect``. Unless
    ``disable_locks`` is ``'yes'``, a named ``portalocker.BoundedSemaphore``
    is created and ``detect`` acquires it around model loading and inference
    (when ``auto_lock`` is true).

    Option keys read by this class: ``object_processor``,
    ``<processor>_max_processes``, ``<processor>_max_lock_wait``,
    ``disable_locks``, ``model_height``, ``model_width``, ``object_labels``,
    ``object_weights``, ``object_config``, ``name``, ``max_size``,
    ``auto_lock`` and ``object_min_confidence``.

    opencv DNN code credit: https://github.com/arunponnusamy/cvlib
    """

    def __init__(self, options=None):
        """Store the options and prepare (but do not acquire) the lock.

        Args:
            options: dict of settings; see the class docstring for the keys
                that are read. ``None`` is treated as an empty dict.
        """
        # Build a fresh dict per instance instead of sharing one mutable
        # default object between every Yolo created without options.
        if options is None:
            options = {}
        self.net = None
        self.classes = None
        self.options = options
        self.is_locked = False
        self.processor = self.options.get('object_processor') or 'cpu'
        self.lock_maximum = int(options.get(self.processor + '_max_processes') or 1)
        self.lock_timeout = int(options.get(self.processor + '_max_lock_wait') or 120)
        # Kept for backward compatibility; set by load_model() on OpenCV >= 4.5.4
        self.is_get_unconnected_api_list = False
        self.lock_name = 'pyzm_uid{}_{}_lock'.format(os.getuid(), self.processor)
        self.disable_locks = options.get('disable_locks', 'no')
        self.lock = None
        # acquire_lock()/release_lock() only bypass the semaphore when the
        # value is exactly 'yes', so the semaphore must exist for every
        # other value, not just for 'no'.
        if self.disable_locks != 'yes':
            g.logger.Debug(2, 'portalock: max:{}, name:{}, timeout:{}'.format(self.lock_maximum, self.lock_name,
                                                                              self.lock_timeout))
            self.lock = portalocker.BoundedSemaphore(maximum=self.lock_maximum, name=self.lock_name,
                                                     timeout=self.lock_timeout)
        self.model_height = self.options.get('model_height', 416)
        self.model_width = self.options.get('model_width', 416)

    def acquire_lock(self):
        """Acquire the semaphore unless locks are disabled or already held.

        Raises:
            ValueError: if ``portalocker.AlreadyLocked`` is raised while
                waiting for the lock.
        """
        if self.disable_locks == 'yes':
            return
        if self.is_locked:
            g.logger.Debug(2, '{} portalock already acquired'.format(self.lock_name))
            return
        try:
            g.logger.Debug(2, 'Waiting for {} portalock...'.format(self.lock_name))
            self.lock.acquire()
            g.logger.Debug(2, 'Got {} portalock'.format(self.lock_name))
            self.is_locked = True
        except portalocker.AlreadyLocked:
            g.logger.Error('Timeout waiting for {} portalock for {} seconds'.format(self.lock_name, self.lock_timeout))
            raise ValueError(
                'Timeout waiting for {} portalock for {} seconds'.format(self.lock_name, self.lock_timeout))

    def release_lock(self):
        """Release the semaphore if this instance currently holds it."""
        if self.disable_locks == 'yes':
            return
        if not self.is_locked:
            g.logger.Debug(2, '{} portalock already released'.format(self.lock_name))
            return
        self.lock.release()
        self.is_locked = False
        g.logger.Debug(2, 'Released {} portalock'.format(self.lock_name))

    def get_options(self):
        """Return the options dict this instance was created with."""
        return self.options

    def populate_class_labels(self):
        """Read the labels file (``object_labels``), one class name per line."""
        class_file_abs_path = self.options.get('object_labels')
        # The context manager closes the file even if reading fails
        with open(class_file_abs_path, 'r') as f:
            self.classes = [line.strip() for line in f]

    def get_classes(self):
        """Return the loaded class names, or ``None`` before the model is loaded."""
        return self.classes

    def load_model(self):
        """Load the network from ``object_weights``/``object_config``.

        Also selects the CUDA backend when the processor is ``'gpu'`` and
        OpenCV is at least 4.2.0 (falling back to ``'cpu'`` otherwise), and
        loads the class labels.
        """
        name = self.options.get('name') or 'Yolo'
        g.logger.Debug(1, '|--------- Loading "{}" model from disk -------------|'.format(name))
        t = Timer()
        self.net = cv2.dnn.readNet(self.options.get('object_weights'),
                                   self.options.get('object_config'))
        diff_time = t.stop_and_get_ms()

        cv2_ver = cv2_version()
        if cv2_ver >= (4, 5, 4):
            # see https://github.com/opencv/opencv/issues/20923
            # we need to modify Yolo code not to expect a nested structure
            g.logger.Debug(1, 'You are using OpenCV >= 4.5.4, making sure we fix getUnconnectedOutLayers() API')
            self.is_get_unconnected_api_list = True

        g.logger.Debug(
            1, 'perf: processor:{} Yolo initialization (loading {} model from disk) took: {}'
            .format(self.processor, self.options.get('object_weights'), diff_time))

        if self.processor == 'gpu':
            if cv2_ver < (4, 2, 0):
                g.logger.Error('Not setting CUDA backend for OpenCV DNN')
                g.logger.Error(
                    'You are using OpenCV version {} which does not support CUDA for DNNs. A minimum of 4.2 is required. See https://www.pyimagesearch.com/2020/02/03/how-to-use-opencvs-dnn-module-with-nvidia-gpus-cuda-and-cudnn/ on how to compile and install openCV 4.2'
                    .format(cv2.__version__))
                self.processor = 'cpu'
        else:
            g.logger.Debug(1, 'Using CPU for detection')

        # Re-checked because the version test above may have switched to 'cpu'
        if self.processor == 'gpu':
            g.logger.Debug(2, 'Setting CUDA backend for OpenCV')
            g.logger.Debug(3,
                           'If you did not set your CUDA_ARCH_BIN correctly during OpenCV compilation, you will get errors during detection related to invalid device/make_policy')
            self.net.setPreferableBackend(cv2.dnn.DNN_BACKEND_CUDA)
            self.net.setPreferableTarget(cv2.dnn.DNN_TARGET_CUDA)

        self.populate_class_labels()

    def get_output_layers(self):
        """Return the names of the network's unconnected output layers."""
        layer_names = self.net.getLayerNames()
        # getUnconnectedOutLayers() returns nested [[i], ...] before OpenCV
        # 4.5.4 and flat [i, ...] from then on. Flattening handles both and
        # does not depend on load_model() having set the version flag.
        out_ids = np.array(self.net.getUnconnectedOutLayers()).flatten()
        # Layer ids are 1-based
        return [layer_names[i - 1] for i in out_ids]

    def detect(self, image=None):
        """Run detection on ``image`` and return the boxes kept by NMS.

        Args:
            image: image array; only ``image.shape[:2]`` (height, width) and
                the pixel data passed to OpenCV are used here.

        Returns:
            A 4-tuple ``(bbox, label, conf, model_names)`` of equal-length
            lists: ``bbox`` holds ``[x1, y1, x2, y2]`` ints in the coordinate
            space of the image passed in, ``label`` the class names, ``conf``
            the confidences and ``model_names`` the string ``'yolo'`` repeated.

        Raises:
            ValueError: propagated from ``acquire_lock`` on lock timeout.
        """
        Height, Width = image.shape[:2]
        g.logger.Debug(2, 'detect extracted image dimensions as: {}wx{}h'.format(Width, Height))

        max_size = self.options.get('max_size', Width)
        if Width > max_size:
            g.logger.Debug(2, 'Scaling image down to max size: {}'.format(max_size))
            # Rebinds the local name only; the caller's array is left alone,
            # so no backup copy of the original is needed.
            image = imutils.resize(image, width=max_size)

        auto_lock = self.options.get('auto_lock', True)
        if auto_lock:
            self.acquire_lock()
        try:
            if not self.net:
                self.load_model()

            g.logger.Debug(
                1, '|---------- YOLO (input image: {}w*{}h, model resize dimensions: {}w*{}h) ----------|'
                .format(Width, Height, self.model_width, self.model_height))

            scale = 0.00392  # 1/255, really. Normalize inputs.
            t = Timer()
            ln = self.get_output_layers()
            blob = cv2.dnn.blobFromImage(image,
                                         scale, (self.model_width, self.model_height), (0, 0, 0),
                                         True,
                                         crop=False)
            self.net.setInput(blob)
            outs = self.net.forward(ln)
        finally:
            # Release on success and on any error alike
            if auto_lock:
                self.release_lock()

        diff_time = t.stop_and_get_ms()
        g.logger.Debug(
            1, 'perf: processor:{} Yolo detection took: {}'.format(self.processor, diff_time))

        class_ids = []
        confidences = []
        boxes = []

        nms_threshold = 0.4
        conf_threshold = 0.2
        # NMS uses a confidence floor of 0.2, or the configured minimum if
        # that is lower. A missing setting keeps the 0.2 default.
        # NOTE(review): nothing in this method filters again by
        # object_min_confidence afterwards; presumably the caller does,
        # confirm.
        min_confidence = self.options.get('object_min_confidence')
        if min_confidence is not None and float(min_confidence) < conf_threshold:
            conf_threshold = float(min_confidence)

        for out in outs:
            for detection in out:
                scores = detection[5:]
                class_id = np.argmax(scores)
                confidence = scores[class_id]
                # The first four values are treated as fractions of the frame,
                # so multiplying by the ORIGINAL width/height gives boxes in
                # the caller's coordinate space even when the network was fed
                # a downscaled copy. Scaling again would misplace the boxes.
                center_x = int(detection[0] * Width)
                center_y = int(detection[1] * Height)
                w = int(detection[2] * Width)
                h = int(detection[3] * Height)
                x = center_x - w / 2
                y = center_y - h / 2
                class_ids.append(class_id)
                confidences.append(float(confidence))
                boxes.append([x, y, w, h])

        t = Timer()
        indices = cv2.dnn.NMSBoxes(boxes, confidences, conf_threshold,
                                   nms_threshold)
        diff_time = t.stop_and_get_ms()
        g.logger.Debug(
            2, 'perf: processor:{} Yolo NMS filtering took: {}'.format(self.processor, diff_time))

        bbox = []
        label = []
        conf = []

        # NMSBoxes returns flat indices in OpenCV >= 4.5.4, nested [[i]] before that
        indices = np.array(indices).flatten()
        for i in indices:
            x, y, w, h = boxes[i]
            bbox.append([
                int(round(x)),
                int(round(y)),
                int(round(x + w)),
                int(round(y + h))
            ])
            label.append(str(self.classes[class_ids[i]]))
            conf.append(confidences[i])

        return bbox, label, conf, ['yolo'] * len(label)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment