测试realsense安装:
import pyrealsense2 as rs
import numpy as np
import cv2
def main():
# 配置RealSense管道
pipeline = rs.pipeline()
config = rs.config()
# 启用RGB和彩色深度流
config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
# 启动流
pipeline.start(config)
try:
while True:
# 等待对齐的帧
frames = pipeline.wait_for_frames()
color_frame = frames.get_color_frame()
depth_frame = frames.get_depth_frame()
if not color_frame or not depth_frame:
continue
# 将帧转换为NumPy数组
color_image = np.asanyarray(color_frame.get_data())
depth_image = np.asanyarray(depth_frame.get_data())
# 归一化深度图用于显示
depth_colormap = cv2.applyColorMap(cv2.convertScaleAbs(depth_image, alpha=0.03), cv2.COLORMAP_JET)
# 显示RGB和彩色深度图
cv2.imshow('RealSense RGB', color_image)
cv2.imshow('RealSense Depth', depth_colormap)
# 按 'q' 退出
if cv2.waitKey(1) & 0xFF == ord('q'):
break
finally:
# 停止管道
pipeline.stop()
cv2.destroyAllWindows()
if __name__ == "__main__":
main()
使用ZMQ发送数据:camera_server.py
import pyrealsense2 as rs
import numpy as np
import cv2
import zmq
# 设置ZMQ
context = zmq.Context()
socket = context.socket(zmq.PUB) # 使用发布订阅模式
socket.bind("tcp://*:5555") # 绑定到本机5555端口
def main():
# 配置RealSense
pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
pipeline.start(config)
try:
while True:
frames = pipeline.wait_for_frames()
color_frame = frames.get_color_frame()
depth_frame = frames.get_depth_frame()
if not color_frame or not depth_frame:
continue
# 转换数据
color_image = np.asanyarray(color_frame.get_data())
depth_image = np.asanyarray(depth_frame.get_data())
# 压缩RGB图像
_, color_encoded = cv2.imencode('.jpg', color_image, [cv2.IMWRITE_JPEG_QUALITY, 80])
# 压缩深度图(使用PNG无损压缩)
_, depth_encoded = cv2.imencode('.png', depth_image)
# 发送数据(RGB前缀 "RGB", 深度前缀 "DEPTH")
socket.send_multipart([b"RGB", color_encoded.tobytes()])
socket.send_multipart([b"DEPTH", depth_encoded.tobytes()])
finally:
pipeline.stop()
if __name__ == "__main__":
main()
接收图像:
import zmq
import cv2
import numpy as np
# 设置ZMQ
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://<服务器IP>:5555") # 替换为服务器IP
socket.setsockopt_string(zmq.SUBSCRIBE, "RGB")
socket.setsockopt_string(zmq.SUBSCRIBE, "DEPTH")
def main():
while True:
topic, img_data = socket.recv_multipart()
if topic == b"RGB":
# 解码RGB图像
color_image = cv2.imdecode(np.frombuffer(img_data, dtype=np.uint8), cv2.IMREAD_COLOR)
cv2.imshow("Received RGB", color_image)
elif topic == b"DEPTH":
# 解码深度图像
depth_image = cv2.imdecode(np.frombuffer(img_data, dtype=np.uint8), cv2.IMREAD_UNCHANGED)
depth_colormap = cv2.applyColorMap(cv2.convertScaleAbs(depth_image, alpha=0.03), cv2.COLORMAP_JET)
cv2.imshow("Received Depth", depth_colormap)
# 退出条件
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cv2.destroyAllWindows()
if __name__ == "__main__":
main()
在数梅派中设置自动运行脚本:
sudo nano /etc/systemd/system/camera.service
[Unit]
Description=RealSense Camera Service
After=network.target
[Service]
ExecStart=/home/raspi/miniforge3/envs/go1/bin/python /home/raspi/camera_server.py
WorkingDirectory=/home/raspi
StandardOutput=inherit
StandardError=inherit
Restart=always
User=raspi
[Install]
WantedBy=multi-user.target
启动服务
sudo systemctl daemon-reload
sudo systemctl enable camera.service
sudo systemctl start camera.service
检查服务:
sudo systemctl status camera.service
停止或禁用服务:
sudo systemctl stop camera.service # 停止服务
sudo systemctl disable camera.service # 禁用开机自启