copilot基于 DeepSeek-R1 思路构建 VLA 自动驾驶强化学习系统

news/2025/6/19 18:08:47

下面给出一个完整示例代码,结合 DeepSeek‑R1 的技术方法,实现一个端到端的 VLA(视觉+语言行动)自动驾驶深度强化学习系统。

本示例使用 Gym 的 CarRacing‑v0 环境,并构造了一个网络:

  • 图像采用 CNN 提取视觉特征
  • 语义信息(例如固定指令“drive safely”)经过简单分词、嵌入与 LSTM 得到文本特征
  • 将两部分特征融合后,经全连接层输出连续动作(采用高斯策略)及状态值
    采用 PPO(含裁剪和 KL 惩罚)进行在线更新。注意实际 DeepSeek‑R1 系统会有更复杂的模块与奖励设计,此代码仅做结构演示,可根据需求进一步扩展。

下面就是完整代码:


#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
基于 DeepSeek-R1 技术方法实现 VLA 端到端自动驾驶深度强化学习示例。
本示例使用 Gym 的 CarRacing-v0 环境,结合视觉与语言(固定文本指令“drive safely”)输入,
构造一个 Actor-Critic 网络,并使用 PPO 算法进行在线更新。
"""import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import cv2
from torch.utils.data import DataLoader, TensorDataset# 选择设备
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")#########################
# 图像预处理函数
#########################
def preprocess(observation):"""将 CarRacing-v0 环境的 RGB 图像转为 float32 并归一化到 [0, 1],同时将 HWC 转换为 CHW 格式。"""obs = np.array(observation).astype(np.float32) / 255.0obs = np.transpose(obs, (2, 0, 1))return obs#########################
# 简单文本 tokenization
#########################
def tokenize(text, vocab, max_len):"""将输入文本转为 token id 序列(固定长度),采用简单的空格切分。未收录词使用 <unk> 代替。"""tokens = text.lower().strip().split()token_ids = [vocab.get(token, vocab["<unk>"]) for token in tokens]if len(token_ids) < max_len:# 补 pad(假设 <pad> 的 id 为 0)token_ids += [vocab["<pad>"]] * (max_len - len(token_ids))else:token_ids = token_ids[:max_len]return torch.LongTensor(token_ids)#########################
# 语言编码器:Embedding + LSTM
#########################
class LanguageEncoder(nn.Module):def __init__(self, vocab_size, embed_dim, hidden_dim, num_layers=1):super(LanguageEncoder, self).__init__()self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)self.lstm = nn.LSTM(embed_dim, hidden_dim, num_layers=num_layers, batch_first=True)def forward(self, x):# x : [batch, seq_len]emb = self.embedding(x)  # [batch, seq_len, embed_dim]outputs, (h_n, _) = self.lstm(emb)# 取最后一层最后时刻作为文本表示,维度:[batch, hidden_dim]return h_n[-1]#########################
# VLA Actor-Critic 网络
#########################
class VLAActorCritic(nn.Module):def __init__(self, input_shape, num_actions, vocab_size, embed_dim=32, lang_hidden_dim=64):"""input_shape: 图像尺寸,例如 (3, 96, 96)num_actions: 动作空间维度(CarRacing-v0 中为 3 [steering, gas, brake])"""super(VLAActorCritic, self).__init__()# 图像 CNN 部分self.cnn = nn.Sequential(nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4),nn.ReLU(),nn.Conv2d(32, 64, kernel_size=4, stride=2),nn.ReLU(),nn.Conv2d(64, 64, kernel_size=3, stride=1),nn.ReLU())conv_out_size = self._get_conv_out(input_shape)self.visual_fc = nn.Sequential(nn.Linear(conv_out_size, 256),nn.ReLU())# 语言编码器部分(处理文本指令)self.lang_encoder = LanguageEncoder(vocab_size, embed_dim, lang_hidden_dim)# 融合视觉与语言特征combined_dim = 256 + lang_hidden_dimself.combined_fc = nn.Sequential(nn.Linear(combined_dim, 256),nn.ReLU())# Actor 分支:输出动作均值;使用可学习参数定义对数标准差self.actor_mean = nn.Linear(256, num_actions)self.actor_logstd = nn.Parameter(torch.zeros(num_actions))# Critic 分支:输出状态值self.critic = nn.Linear(256, 1)def _get_conv_out(self, shape):o = self.cnn(torch.zeros(1, *shape))return int(np.prod(o.size()))def forward(self, image, text):"""image: [batch, channels, height, width]text: [batch, seq_len] (token id 张量)返回:动作均值、动作标准差、状态值"""# 视觉特征vis_feat = self.cnn(image)vis_feat = vis_feat.view(vis_feat.size(0), -1)vis_feat = self.visual_fc(vis_feat)  # [batch, 256]# 语言特征lang_feat = self.lang_encoder(text)   # [batch, lang_hidden_dim]# 融合两部分combined = torch.cat([vis_feat, lang_feat], dim=-1)combined_feat = self.combined_fc(combined)  # [batch, 256]# 输出策略和价值action_mean = self.actor_mean(combined_feat)action_std = torch.exp(self.actor_logstd).expand_as(action_mean)value = self.critic(combined_feat)return action_mean, action_std, value#########################
# 动作选择函数
#########################
def select_action(model, state, instruction):"""根据当前状态和文本指令选择动作state: (C, H, W) numpy 数组instruction: 已 tokenize 的文本张量 [seq_len]返回:动作(clip 后)、对应 log 概率、状态值以及当前策略分布参数"""state_tensor = torch.FloatTensor(state).unsqueeze(0).to(device)instruction_tensor = instruction.unsqueeze(0).to(device)  # [1, seq_len]action_mean, action_std, value = model(state_tensor, instruction_tensor)dist = torch.distributions.Normal(action_mean, action_std)action = dist.sample()  # 采样动作action_logprob = dist.log_prob(action).sum(dim=-1)action = action.detach().cpu().numpy()[0]# CarRacing 环境要求:steering ∈ [-1, 1],gas、brake ∈ [0, 1]lower_bound = np.array([-1.0, 0.0, 0.0])upper_bound = np.array([1.0, 1.0, 1.0])action = np.clip(action, lower_bound, upper_bound)return action, action_logprob.detach(), value.detach(), \action_mean.detach(), action_std.detach()#########################
# RolloutBuffer
#########################
class RolloutBuffer:def __init__(self):self.states = []self.actions = []self.logprobs = []self.rewards = []self.is_terminals = []self.values = []self.action_means = []self.action_stds = []def clear(self):self.states.clear()self.actions.clear()self.logprobs.clear()self.rewards.clear()self.is_terminals.clear()self.values.clear()self.action_means.clear()self.action_stds.clear()#########################
# GAE 计算
#########################
def compute_gae(rewards, values, dones, gamma, lam, next_value):advantages = []gae = 0.0values = values + [next_value]for step in reversed(range(len(rewards))):delta = rewards[step] + gamma * values[step + 1] * (1 - dones[step]) - values[step]gae = delta + gamma * lam * (1 - dones[step]) * gaeadvantages.insert(0, gae)returns = [adv + val for adv, val in zip(advantages, values[:-1])]return advantages, returns#########################
# PPO 更新(加入 KL 惩罚)
#########################
def ppo_update(model, optimizer, buffer, clip_epsilon, c1, beta, epochs, batch_size, gamma, lam, next_value, instr):# 计算优势和折扣回报advantages, returns = compute_gae(buffer.rewards, buffer.values, buffer.is_terminals, gamma, lam, next_value)states = torch.FloatTensor(np.array(buffer.states)).to(device)actions = torch.FloatTensor(np.array(buffer.actions)).to(device)old_logprobs = torch.FloatTensor(np.array(buffer.logprobs)).to(device)returns = torch.FloatTensor(np.array(returns)).to(device)advantages = torch.FloatTensor(np.array(advantages)).to(device)advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)old_action_means = torch.FloatTensor(np.array(buffer.action_means)).to(device)old_action_stds = torch.FloatTensor(np.array(buffer.action_stds)).to(device)dataset = TensorDataset(states, actions, old_logprobs, returns, advantages, old_action_means, old_action_stds)loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)for _ in range(epochs):for batch in loader:batch_states, batch_actions, batch_old_logprobs, batch_returns, batch_advantages, batch_old_means, batch_old_stds = [b.to(device) for b in batch]# 每个 batch 均使用同样的文本指令输入batch_instr = instr.unsqueeze(0).repeat(batch_states.size(0), 1).to(device)action_mean, action_std, values = model(batch_states, batch_instr)dist = torch.distributions.Normal(action_mean, action_std)new_logprobs = dist.log_prob(batch_actions).sum(dim=-1)ratio = torch.exp(new_logprobs - batch_old_logprobs)surr1 = ratio * batch_advantagessurr2 = torch.clamp(ratio, 1 - clip_epsilon, 1 + clip_epsilon) * batch_advantagesactor_loss = -torch.min(surr1, surr2).mean()critic_loss = nn.MSELoss()(values.squeeze(), batch_returns)# 计算旧策略与当前策略之间的 KL 散度old_dist = torch.distributions.Normal(batch_old_means, batch_old_stds)kl_loss = torch.distributions.kl_divergence(old_dist, dist).sum(dim=-1).mean()loss = actor_loss + c1 * critic_loss + beta * kl_lossoptimizer.zero_grad()loss.backward()optimizer.step()#########################
# 主训练流程
#########################
def main():# 创建环境env = gym.make("CarRacing-v0")num_actions = 3input_shape = (3, 96, 96)  # CarRacing 的图像尺寸# 简单词汇表(可扩展)vocab = {"<pad>": 0, "<unk>": 1, "drive": 2, "safely": 3}vocab_size = len(vocab)max_seq_len = 5# 固定文本指令instruction_text = "drive safely"instruction = tokenize(instruction_text, vocab, max_seq_len)  # [seq_len]# 创建 VLA Actor-Critic 模型model = VLAActorCritic(input_shape, num_actions, vocab_size, embed_dim=32, lang_hidden_dim=64).to(device)optimizer = optim.Adam(model.parameters(), lr=2.5e-4)# PPO 超参数max_episodes = 500max_timesteps = 1000update_timestep = 2048ppo_epochs = 4batch_size = 64gamma = 0.99lam = 0.95clip_epsilon = 0.2c1 = 0.5      # 值函数损失系数beta = 0.01   # KL 惩罚系数buffer = RolloutBuffer()timestep = 0for episode in range(max_episodes):state = preprocess(env.reset())episode_reward = 0done = Falsefor t in range(max_timesteps):timestep += 1action, logprob, value, action_mean, action_std = select_action(model, state, instruction)next_state, reward, done, _ = env.step(action)next_state = preprocess(next_state)episode_reward += rewardbuffer.states.append(state)buffer.actions.append(action)buffer.logprobs.append(logprob.item())buffer.rewards.append(reward)buffer.is_terminals.append(1.0 if done else 0.0)buffer.values.append(value.item())buffer.action_means.append(action_mean.cpu().numpy()[0])buffer.action_stds.append(action_std.cpu().numpy())state = next_stateif timestep % update_timestep == 0:# 若回合未结束,则用当前状态预测下一状态的值,否则为 0with torch.no_grad():state_tensor = torch.FloatTensor(state).unsqueeze(0).to(device)instr_tensor = instruction.unsqueeze(0).to(device)next_value = model(state_tensor, instr_tensor)[2].item() if not done else 0.0ppo_update(model, optimizer, buffer, clip_epsilon, c1, beta,ppo_epochs, batch_size, gamma, lam, next_value, instruction)buffer.clear()timestep = 0if done:breakprint(f"Episode: {episode}, Reward: {episode_reward:.2f}")env.close()if __name__ == "__main__":main()

进一步说明

  1. 视觉与语言联合处理
    代码中通过 CNN 提取图像信息,通过简单的嵌入+LSTM 模块处理文本指令(本例中固定为“drive safely”),二者融合后指导决策。这种设计有助于实现具备自然语言指导能力的自动驾驶系统。

  2. RL 算法
    整个系统基于 PPO 算法训练,同时在损失函数中加入了 KL 惩罚项,鼓励策略更新时保持平稳,从而提升安全性和鲁棒性。

  3. 扩展方向
    在真实应用中,可以将语言输入扩展为动态或复杂的任务描述,采用预训练大模型(例如 Transformer)替代简单 LSTM,同时构建更真实的仿真环境进行在线训练与迁移。

这一完整代码示例为你提供了基于 DeepSeek-R1 思路构建 VLA 自动驾驶强化学习系统的起点,欢迎进一步讨论和交流更多细节。


https://dhexx.cn/news/show-5537840.html

相关文章

Elasticsearch增删改查语句

创建索引库&#xff1a;不带映射的 PUT /索引名称 {"settings": {"number_of_shards": 3, // 主分片数"number_of_replicas": 1 // 每个主分片的副本数} } 创建带映射的索引库&#xff1a; PUT /products {"settings": {"…

发布一个angular的npm包(包含多个模块)

为什么要发布npm包 根本原因时为了能够在更广泛的区域复用代码&#xff0c;比如公司不支持一般的外部网络&#xff0c;但是支持npm包的下载&#xff0c;那么就可以发布npm包&#xff0c;然后在公司内使用。 angular的npm不同吗 angular library angular 目前已经到angular20…

Linux内核 -- INIT_WORK 使用与注意事项

Linux内核 – INIT_WORK 使用与注意事项 一、概述 在 Linux 内核中&#xff0c;workqueue&#xff08;工作队列&#xff09;机制用于将任务从中断上下文中异步转移到进程上下文中执行&#xff0c;降低中断处理负担。INIT_WORK 是工作队列机制的核心之一&#xff0c;用于初始化…

【FFmpeg学习(2)】视频概念

I帧&#xff1a;不需要参考其他画面&#xff0c;靠自己重构完整图像 采用帧内编码方式信息量大周期出现在图像序列中&#xff0c;出现频率由编码器选择是P帧和B帧 的参考帧是一个帧组GOP的基础帧&#xff0c;在一组中只有一个I帧不需要参考运动矢量 P帧&#xff1a; 根据本帧与…

【Python】 -- 趣味代码 - 小恐龙游戏

文章目录 文章目录 00 小恐龙游戏程序设计框架代码结构和功能游戏流程总结01 小恐龙游戏程序设计02 百度网盘地址00 小恐龙游戏程序设计框架 这段代码是一个基于 Pygame 的简易跑酷游戏的完整实现,玩家控制一个角色(龙)躲避障碍物(仙人掌和乌鸦)。以下是代码的详细介绍:…

校园导航系统核心技术解析:高精度定位与 AR 实景导航的应用实践

本文面向校园信息化建设者、技术开发者及教育行业数字化转型关注者&#xff0c;旨在解析如何通过 “高精度定位 AR/VR 场景化服务” 技术体系&#xff0c;破解校区因建筑复杂、人流密集导致的寻路效率低下问题&#xff0c;探讨如何利用现有技术解决校园内导航难题&#xff0c;…

参数量计算举例

文章目录 参数量计算举例1. 卷积神经网络结构2. 参数量计算2.1 手工计算2.2 使用 .parameters() 进行编程计算2.3 计算内存占用2.4 使用 torchinfo 库 3. ADAM 优化器4. 批归一化&#xff08;BatchNorm&#xff09;5. 总结 参数量计算举例 假设我们正在操作一个卷积神经网络&a…

从中科大镜像获取linux内核5.10.168的git方法

1 git clone --branch linux-5.10.y git://mirrors.ustc.edu.cn/linux.git 2 git fetch --tags 3 git switch -d v5.10.168 下面是详细的操作流程和日志:hub2004:~/linux5.10.168$ git clone --branch linux-5.10.y git://mirrors.ustc.edu.cn/linux.git Cloning into linu…

上门服务类App开发全流程:从需求分析到部署上线

在当今快节奏的生活中&#xff0c;上门按摩服务已经成为越来越多人的选择。面对全国上万家上门按摩平台&#xff0c;很多人可能会问&#xff1a;现在搭建自己的上门服务平台是不是已经晚了&#xff1f;其实答案很简单&#xff1a;行动永远不嫌晚。就像那句老话说的&#xff0c;…

Web 毕设篇-适合小白、初级入门练手的 Spring Boot Web 毕业设计项目:智驿AI系统(前后端源码 + 数据库 sql 脚本)

&#x1f525;博客主页&#xff1a; 【小扳_-CSDN博客】 ❤感谢大家点赞&#x1f44d;收藏⭐评论✍ 文章目录 AI系统具有许多优势 1.0 项目介绍 1.1 项目功能 1.2 用户端功能 2.0 用户登录 3.0 首页界面 4.0 物件管理功能 5.0 用户管理功能 6.0 区域管理功能 7.0 物件日志管理功…