官方接单发单平台上线!有接单发单需求的请直接发布需求,或注册接单!点击此处查看详情!

pytorch也写了判断用cpu还是gpu,但是源代码里面的参数有问题。源码里面写的是cvrp,tsp等等,你改的话在cvrp基础上改就可以了,就把vrptw里面的容量和时间作为输入attention

时间:2023-09-12 浏览:575 分类:Python程序代做

91代做网-专注各种程序代做

包括但不限于:各类毕设课设、作业辅导、代码答疑、报告论文、商业程序开发、论文复现和小程序开发等。

也欢迎各行业程序员加入我们,具体请联系客服详聊:QQ号:,微信号:,接单Q群:

源代码里面的代码可以直接改,pytorch也写了判断用cpu还是gpu,但是源代码里面的参数有问题。源码里面写的是cvrp,tsp等等,你改的话在cvrp基础上改就可以了,就把vrptw里面的容量和时间作为输入attention中的一部分,用于计算mask,mask是用来判断下一个可选点,之后再把路径长度和时间惩罚值加权和作为Actor的奖励函数,原代码里面只是将路径长度作为奖励函数。Critic里面已经写了,主要是用来估计价值函数,帮助智能体改进策略。你再把运行部分改成异步并行的,每个agent都是独立的,都含有Actor和Critic,每个agent里面的Actor是一样的,每个agent里面的Critic网络结构也是一样的,只是每个agent里面AC参数不一样,这个你自己设置就可以了。同时写一个全局Critic,这个全局Critic和每个Agent中的Critic的网络结构是一样的,同样也是参数设置不一样,是作为baseline的一部分计算优势函数的。

  1. 将vrptw里面的容量和时间作为输入attention中的一部分,用于计算mask,以限制可选点。

  2. 将路径长度和时间惩罚值加权和作为Actor的奖励函数。

  3. 修改运行部分,将其改为异步并行模式,每个agent都是独立的,包含Actor和Critic,并设置全局Critic。


import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np

class SharedActorCriticNet(nn.Module):
    def __init__(self, input_dim, hidden_dim):
        super(SharedActorCriticNet, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.actor_head = nn.Linear(hidden_dim, num_actions)
        self.critic_head = nn.Linear(hidden_dim, 1)

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return x

    def get_actor_output(self, x):
        x = self.forward(x)
        x = self.actor_head(x)

        # Apply softmax to get probability distribution of actions
        action_probs = F.softmax(x, dim=1)
        return action_probs

    def get_critic_output(self, x):
        x = self.forward(x)
        value_estimate = self.critic_head(x)
        return value_estimate


class ACActor:
    def __init__(self, input_dim, hidden_dim, num_actions):
        self.model = SharedActorCriticNet(input_dim, hidden_dim).to(device)
        self.optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)
        self.num_actions = num_actions
        ...
        
class ACCritic:
    def __init__(self, input_dim, hidden_dim):
        self.model = SharedActorCriticNet(input_dim, hidden_dim).to(device)
        self.optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)
        ...

class GlobalCritic:
    def __init__(self, input_dim, hidden_dim):
        self.model = SharedActorCriticNet(input_dim, hidden_dim).to(device)
        self.optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)

    def calculate_advantages(self, rewards, values, dones, next_value):
        # Compute the advantage function for each state in the rollout
        td_errors = rewards + (1 - dones) * gamma * next_value - values
        advantages = []
        advantage = 0

        # Backward sweep through the rollout to compute advantages
        for error in td_errors[::-1]:
            advantage = gamma * gae_lambda * advantage + error
            advantages.append(advantage)
        advantages.reverse()
        return torch.tensor(advantages).to(device)


def train(actor, critic, global_critic, rollout, device):
    states, actions, log_probs, values, rewards, masks = rollout

    # Update actor and critic networks
    next_state = states[-1]
    next_value = global_critic.model.get_critic_output(torch.FloatTensor(next_state).to(device)).item()

    advantages = global_critic.calculate_advantages(rewards, values, masks, next_value)
    returns = advantages + torch.FloatTensor(values).to(device)

    actor_loss = (-log_probs * advantages.detach()).mean()
    critic_loss = F.smooth_l1_loss(torch.FloatTensor(values).to(device), returns.detach())

    actor.optimizer.zero_grad()
    actor_loss.backward()
    actor.optimizer.step()

    critic.optimizer.zero_grad()
    critic_loss.backward()
    critic.optimizer.step()

    # Update global critic network
    global_critic_loss = critic_loss.item()
    global_critic.optimizer.zero_grad()
    global_critic_loss.backward()
    global_critic.optimizer.step()


def run_agents(env, agents, global_critic, num_episodes, rollout_length, device):
    for episode in range(num_episodes):
        rollouts = []
        for i, agent in enumerate(agents):
            state = env.reset()
            done = False
            episode_reward = 0

            while not done:
                action, log_prob, value = agent.act(state)
                next_state, reward, done, info = env.step(action)

                # Mask the next_state based on constraints
                # Calculate time and capacity violations
                # Add penalty to reward function
                mask = ...
                reward += ...

                rollouts[i].append((state, action, log_prob, value, reward, mask))
                state = next_state
                episode_reward += reward

                if len(rollouts[i]) == rollout_length:
                    train(agent.actor, agent.critic, global_critic, rollouts[i], device)

        print(f"Episode {episode} reward: {episode_reward}")


客服