在ROS2中开发计算机视觉模块
共 6196字,需浏览 13分钟
·
2023-09-21 21:47
点击下方卡片,关注“新机器视觉”公众号
视觉/图像重磅干货,第一时间送达
一些废话:本人在就读研究生期间主修深度学习与计算机视觉,毕业后进入制造业某德企工作,关注工业领域的视觉项目开发。最近对机器人操作系统(ROS)很感兴趣,其在工业领域应用很广泛,还可以开发无人机,无人驾驶等应用。虽然机械臂的价格比较昂贵,但公司有现成的UR机械臂可以在后期供我上机学习,算是一个优势。
其实机器人这个名词对于大家来说早已不陌生了,但随着近5年人工智能(AI)技术的快速发展与渐渐落地成熟,接下来势必会为机器人的智能水平引入一个新的阶段。且在中国人口出生率不断下降的情况下,机器人的需求在未来几十年会很大。可以说是技术条件与社会需求并存。
正文:本文应用场景为机械臂开发。供初学者参考,以及方便自己复用部分源码,如有疑问或者建议欢迎交流。
一、视觉模块架构设计
考虑的几个方面:
传感器主要使用话题(topic)通信机制持续向外部发布图像信息;
图像接受与处理、以及发送处理结果节点有两种形式,一种是使用服务(service)通信机制,一种是使用话题机制,两者均可。本人在网上搜集了一些信息,并且参考了chatgpt的意见,得到了一个不错的结果:话题速度更快,并且实现更简单,开发中一般默认使用话题,如果随着开发的进行,话题不再满足我们的需求,可以转至service机制。由于工业机械臂中的传感器(camera)和AI模型(一般一个机械臂只会用到一个)并不复杂,所以我选择topic通信机制开发图像数据的接受处理、结果发送模块。
使用订阅者来接受CV model的处理结果。
[sensor publisher (camera_pub.py)] --> [subscriber and publisher node (cam_sub_and_detection_pub.py)] --> [subscriber (detection_results_sub.py)]
二、代码编写
一、新建工作空间
1. 创建src文件夹以存放源码;
2. 在src目录下新建cv_devel_pkg与interfaces_pkg,分别存放视觉开发模块的源码与topic数据接口(interface)文件;
2.1 interfaces_pkg编写
需要注意的是,在新建interface pkg时,build type暂时只能选择c++(信息来源:ros官方文档),并且我们需要修改cmakelists.txt与package.xml:
cmakelists.txt:
新增:
# find_package(<dependency> REQUIRED)
find_package(geometry_msgs REQUIRED)
find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/DetectionResults.msg"
s DEPENDENCIES geometry_msgs
)
package.xml:
新增:
<!-- added -->
<depend>geometry_msgs</depend>
<buildtool_depend>rosidl_default_generators</buildtool_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
<!-- end -->
其中,DetectionResults.msg中存放的信息是结果中心坐标的msg:
int32 position_x
int32 position_y
至此,interface pkg代码编写结束。
2.2 cv_devel_pkg编写
(1) camera_pub.py
import rclpy
from rclpy.node import Node
from cv_bridge import CvBridge
from sensor_msgs.msg import Image
import cv2
class CameraPubNode(Node):
def __init__(self, name):
super().__init__(name)
self.pub = self.create_publisher(Image, 'image_raw', 10)
self.timer = self.create_timer(0.5, self.timer_callback)
self.cap = cv2.VideoCapture(0)
self.cv_bridge = CvBridge()
def timer_callback(self):
ret = self.cap.grab()
if ret:
flag, frame = self.cap.retrieve()
if flag:
self.pub.publish(self.cv_bridge.cv2_to_imgmsg(frame, 'bgr8'))
self.get_logger().info('Publish image successfully!')
else:
self.get_logger().info('Did not get image info!')
def main(args=None):
rclpy.init(args=args)
node = CameraPubNode('CameraPubNode')
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
编写完成后,在pkg目录下的setup.py中注册节点,并分别执行colcon build、source install/local_setup.sh、ros2 run cv_devel_pkg camera_pub。
如图,正常运行:
(2) cam_sub_and_detection_pub.py
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from interfaces_pkg.msg import DetectionResults
from cv_bridge import CvBridge
import cv2
import numpy as np
class CamSubAndDetectionPubNode(Node):
def __init__(self, name):
super().__init__(name)
self.sub = self.create_subscription(Image, 'image_raw', self.listen_callback, 10)
self.pub = self.create_publisher(DetectionResults, 'detection_results', 10)
self.cv_bridge = CvBridge()
self.position_x = 0
self.position_y = 0
def listen_callback(self, data):
self.get_logger().info('Get image! I will process it!')
image = self.cv_bridge.imgmsg_to_cv2(data, 'bgr8')
self.detect(image)
position = DetectionResults()
position.position_x = self.position_x
position.position_y = self.position_y
self.get_logger().info('Position is: ({}, {})'.format(self.position_x, self.position_y))
self.pub.publish(position)
def detect(self, image):
pass # 这里可以嵌入自己的机器视觉或者AI视觉代码
def main(args=None):
rclpy.init(args=args)
node = CamSubAndDetectionPubNode('CamSubAndDetectionPubNode')
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
(3)detection_results_sub.py
import rclpy
from rclpy.node import Node
from interfaces_pkg.msg import DetectionResults
class DetectionResultsSubNode(Node):
def __init__(self, name):
super().__init__(name)
self.sub = self.create_subscription(DetectionResults, 'detection_results', self.listen_callback, 10)
def listen_callback(self, data):
self.get_logger().info('I get the position: ({},{})'.format(data.position_x, data.position_y))
def main(args=None):
rclpy.init(args=args)
node = DetectionResultsSubNode('detection_results_sub_node')
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
三、完工
cv_devel_pkg中的节点代码全部编写完成后,在setup.py中注册,然后build & run。
检测结果展示:
三个节点可正常运行:
声明:部分内容来源于网络,仅供读者学习、交流之目的。文章版权归原作者所有。如有不妥,请联系删除。