4GB GPU的DeepSeek-Coder 1.3B模型,并且它已经被量化或优化过。以下是具体的步骤:
安装必要的依赖项:
pip install transformers torch grpcio googleapis-common-protos
创建一个新的ROS 2包:
cd ~/ros2_ws/src
ros2 pkg create --build-type ament_python llm_ros2_node --dependencies rclpy std_msgs grpcio googleapis-common-protos torch transformers
编辑setup.py文件以包含所需的依赖项:
from setuptools import setup
package_name = 'llm_ros2_node'
setup(
name=package_name,
version='0.0.0',
packages=[package_name],
data_files=[
('share/ament_index/resource_index/packages', ['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='your_name',
maintainer_email='your_email@example.com',
description='TODO: Package description',
license='Apache License 2.0',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'llm_node = llm_ros2_node.llm_node:main',
],
},
)
编写ROS 2节点代码:在这个节点中,我们将订阅一个话题并发送消息到本地的大语言模型,然后将结果发布到另一个话题。
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch
class LLMNode(Node):
def __init__(self):
super().__init__('llm_node')
self.subscription = self.create_subscription(
String,
'input_text',
self.listener_callback,
10)
self.publisher_ = self.create_publisher(String, 'output_text', 10)
# Load the DeepSeek-Coder model and tokenizer
self.model_name_or_path = "path/to/deepseek-coder-1.3b-optimized"
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.tokenizer = AutoTokenizer.from_pretrained(self.model_name_or_path)
self.model = AutoModelForCausalLM.from_pretrained(self.model_name_or_path).to(self.device)
self.model.eval()
def listener_callback(self, msg):
self.get_logger().info(f'Received input text: {msg.data}')
response = self.call_llm(msg.data)
self.publisher_.publish(String(data=response))
def call_llm(self, prompt):
inputs = self.tokenizer.encode(prompt, return_tensors="pt").to(self.device)
outputs = self.model.generate(inputs, max_length=50, num_return_sequences=1)
reply = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
return reply
def main(args=None):
rclpy.init(args=args)
llm_node = LLMNode()
rclpy.spin(llm_node)
llm_node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()