-
Notifications
You must be signed in to change notification settings - Fork 1
/
blob_detector
119 lines (94 loc) · 3.92 KB
/
blob_detector
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#!/usr/bin/env python
import cv2
import rospy
from courserun.msg import BlobDetections####need to make color field a STRING
from std_msgs.msg import Float64, String
from geometry_msgs.msg import Point
from sensor_msgs.msg import Image
from cv_bridge import CvBridge, CvBridgeError
import threading
import numpy as np
global colors, color_map
colors = ["red", "green", "yellow"]
color_map = {"red": (np.array([0,190,100]), np.array([15,255,255])),\
"green": (np.array([35,100,200]), np.array([80,255,255])),\
"yellow": (np.array([20,200,150]), np.array([30,255,255]))}
class blob_detector:
def __init__(self):
self.node_name = "blob_detector"
self.thread_lock = threading.Lock()
self.sub_image = rospy.Subscriber("/camera/rgb/image_rect_color",\
Image, self.cbImage, queue_size=1)
self.pub_image = rospy.Publisher("/blob_image", Image, queue_size=1)
self.pub_msg = rospy.Publisher("/blob_detection", BlobDetections, queue_size=1)
self.bridge = CvBridge()
rospy.loginfo("[%s] Initialized." %(self.node_name))
self.message = BlobDetections()
def cbImage(self,image_msg):
thread = threading.Thread(target=self.processImage,args=(image_msg,))
thread.setDaemon(True)
thread.start()
def processImage(self, image_msg):
if not self.thread_lock.acquire(False):
return
image_cv = self.bridge.imgmsg_to_cv2(image_msg)
width, height, cha = image_cv.shape
lowthresarea = 500
# Convert BGR to HSV
hsv = cv2.cvtColor(image_cv, cv2.COLOR_BGR2HSV)
Arr = []
BoxArr = []
cent = Point()
for c in colors:
#Create mask
(lower, upper) = color_map[c]
mask = cv2.inRange(hsv, lower, upper)
#Create contours
contours = cv2.findContours(mask, cv2.cv.CV_RETR_EXTERNAL, cv2.cv.CV_CHAIN_APPROX_NONE)[0]
for co in contours:
rect = cv2.minAreaRect(co)
(w,h) = rect[1]
area = w*h
if area < lowthresarea: continue
#angle = rect[2]
#if angle < -30 and angle > -60: continue#too angled
#p = max(w/h,h/w)
#if p > 2: continue#the rectangle probably does not enclose the paper
center = rect[0]
(y,x) = center
cent.x = x/width
cent.y = y/height
box = cv2.cv.BoxPoints(rect)
box = np.int0(box)
Arr.append((Float64(area),cent,String(c)))
BoxArr.append(box)#does not matter the order
'''
c_size = cv2.contourArea(i)
if c_size < 500: continue
x,y,w,h = cv2.boundingRect(i) #Draw a box around the contour
cv2.rectangle(image_cv, (x,y) , (x+w, y+h), (0,0,255),2)
centre = cv2.moments(i)
blob_color = hsv[int(centre['m01'] / centre['m00']), int(centre['m10'] / centre['m00']), 0]
if (blob_color < 12):
self.pub_color.publish(True)
print "The blob is red"
else:
self.pub_color.publish(False)
print error
'''
Arr.sort()
Arr = Arr[::-1]
self.message.sizes = [o[0] for o in Arr]
self.message.locations = [o[1] for o in Arr]
self.message.colors = [o[2] for o in Arr]
cv2.drawContours(image_cv,BoxArr,0,(255,255,255),4)
try:
self.pub_image.publish(self.bridge.cv2_to_imgmsg(image_cv, "bgr8"))
self.pub_msg.publish(self.message)
except CvBridgeError as e:
print(e)
self.thread_lock.release()
if __name__=="__main__":
rospy.init_node('blob_detector')
e = blob_detector()
rospy.spin()