Source code for trafpy.manager.src.schedulers.random_agent

from trafpy.manager.src.schedulers.schedulertoolbox import SchedulerToolbox, SchedulerToolbox_v2

import random
import json


[docs]class RandomAgent: def __init__(self, Graph, RWA, slot_size, packet_size=300, time_multiplexing=True, debug_mode=False, scheduler_name='Rand'): self.debug_mode = debug_mode self.toolbox = SchedulerToolbox_v2(Graph=Graph, RWA=RWA, slot_size=slot_size, packet_size=packet_size, time_multiplexing=time_multiplexing, debug_mode=debug_mode) self.scheduler_name = scheduler_name self.resolution_strategy = 'random'
[docs] def get_action(self, observation, print_processing_time=False): chosen_flows = self.get_scheduler_action(observation) return {'chosen_flows': chosen_flows}
[docs] def get_scheduler_action(self, observation): # update network state self.toolbox.update_network_state(observation, hide_child_dependency_flows=True) # collect useful flow info dicts for making scheduling decisions flow_info = self.toolbox.collect_flow_info_dicts(path_channel_assignment_strategy='fair_share_num_flows', cost_function=None) # allocate flows by order of cost (lowest cost flows prioritised first) scheduling_info, cost_info = self.toolbox.allocate_available_bandwidth(flow_info, resolution_strategy=self.resolution_strategy) # collect chosen flows and corresponding packets to schedule for the chosen flows chosen_flows = [] for flow_id in flow_info['queued_flows'].keys(): if flow_id not in scheduling_info['flow_id_to_packets_to_schedule_per_edge'] or len(scheduling_info['flow_id_to_packets_to_schedule_per_edge'][flow_id]) == 0: # flow was not chosen to be scheduled on any edge pass else: # flow was chosen to be scheduled on an edge flow = flow_info['queued_flows'][flow_id] # packets to schedule for each flow limited by the edge in their path with lowest number of schedulable packets flow['packets_this_slot'] = min(scheduling_info['flow_id_to_packets_to_schedule_per_edge'][flow_id]) # check that flow was also selected to be scheduled on a bandwidth-limited end point link (if it wasn't, cannot schedule this flow) info_to_transfer_this_slot = flow['packets_this_slot'] * flow['packet_size'] lowest_edge_bandwidth = self.toolbox.get_lowest_edge_bandwidth(flow['path']) bandwidth_requested = info_to_transfer_this_slot / self.toolbox.slot_size # info units of this flow transferred this time slot == capacity used on each channel in flow's path this time slot if bandwidth_requested > lowest_edge_bandwidth: # flow must only have been selected for bandwidth-limiting links and not been given any bandwidth-limited capacity, do not schedule flow pass else: # flow must have been allocated bandwidth on at least one end point link, check for contentions and try to establish flow chosen_flows = self.toolbox.resolve_contentions_and_set_up_flow(flow, chosen_flows, flow_info, scheduling_info, cost_info, resolution_strategy=self.resolution_strategy) # DEBUG if self.debug_mode: print('~~~ Final Choices ~~~') print('chosen flows:\n{}'.format(chosen_flows)) edge_to_chosen_flows = {edge: [] for edge in flow_info['requested_edges'].keys()} for flow in chosen_flows: edges = self.toolbox.get_path_edges(flow['path']) for edge in edges: edge_to_chosen_flows[json.dumps(edge)].append(flow['flow_id']) for edge in self.toolbox.network.edges: for channel in self.toolbox.rwa.channel_names: # src-dst bw = self.toolbox.get_channel_bandwidth(edge, channel) try: print('edge: {} | channel: {} | chosen flows: {} | bandwidth remaining: {}'.format(edge, channel, edge_to_chosen_flows[json.dumps(edge)], bw)) except KeyError: # no flows chosen on this edge print('edge: {} | channel: {} | bandwidth remaining: {}'.format(edge, channel, bw)) # dst-src edge = edge[::-1] bw = self.toolbox.get_channel_bandwidth(edge, channel) try: print('edge: {} | channel: {} | chosen flows: {} | bandwidth remaining: {}'.format(edge, channel, edge_to_chosen_flows[json.dumps(edge)], bw)) except KeyError: # no flows chosen on this edge print('edge: {} | channel: {} | bandwidth remaining: {}'.format(edge, channel, bw)) return chosen_flows
# OLD (DON'T DELETE: This uses machine_readable_network observation -> might be useful when implementing RL stuff # import numpy as np # import json # import tensorflow as tf # class RandomAgent(SchedulerToolbox): # def __init__(self, Graph, RWA, slot_size, packet_size=300, env=None, scheduler_name='random'): # super().__init__(Graph, RWA, slot_size, packet_size) # self.env = env # self.scheduler_name = scheduler_name # def update_avail_actions(self, *chosen_actions): # self.action_assignments = np.array([0.] * self.action_space.n.numpy()) # self.action_mask = np.array([0.] * self.action_space.n.numpy()) # # any actions with a path and channel that has already been chosen cannot be reselected # chosen_flows = [] # for action in chosen_actions: # flow = self.conv_chosen_action_index_to_chosen_flow(action, chosen_flows) # establish_flow, path, channel = self.look_for_available_lightpath(flow, chosen_flows, search_k_shortest=False) # if not establish_flow: # raise Exception('Error: Trying to establish flow {} which is not available given chosen flows {}'.format(flow, chosen_flows)) # flow['channel'] = channel # chosen_flows.append(flow) # for action in self.obs['machine_readable_network'].keys(): # if self.obs['machine_readable_network'][action]['flow_present'] == 1 and self.obs['machine_readable_network'][action]['selected'] == 0 and self.obs['machine_readable_network'][action]['null_action'] == 0: # # flow present, not yet selected and currently registered as available, check if is available given chosen actions # flow = self.conv_chosen_action_index_to_chosen_flow(action, chosen_flows) # establish_flow, path, channel = self.look_for_available_lightpath(flow, chosen_flows, search_k_shortest=False) # if not establish_flow: # # no way to establish flow, register as null action # self.obs['machine_readable_network'][action]['null_action'] = 1 # # get indices of available actions and create action mask # self.avail_action_indices = self.get_indices_of_available_actions(self.obs['machine_readable_network']) # for i in self.avail_action_indices: # self.action_mask[i] = 1 # def get_indices_of_available_actions(self, action_dict): # indices = [] # for index in action_dict.keys(): # if action_dict[index]['flow_present'] == 1 and action_dict[index]['selected'] == 0 and action_dict[index]['null_action'] == 0: # # flow present and not yet selected # indices.append(index) # else: # # no flow present at this placeholder action # pass # return indices # def get_scheduler_action(self, obs, choose_multiple_actions=True): # self.obs = obs # self.update_network_state(obs, hide_child_dependency_flows=True) # self.chosen_actions = [] # if choose_multiple_actions: # while True: # # choose flows # self.update_avail_actions(*self.chosen_actions) # if len(self.avail_action_indices) > 0: # # still have available actions to choose from # action = np.random.choice(self.avail_action_indices) # # add sampled action to chosen actions # self.chosen_actions.append(action) # # update action as having been chosen # self.obs['machine_readable_network'][action]['selected'] = 1 # self.obs['machine_readable_network'][action]['null_action'] = 1 # else: # # no available actions left # break # else: # self.update_avail_actions(*self.chosen_actions) # action = np.random.choice(self.avail_action_indices) # self.chosen_actions.append(action) # self.obs['machine_readable_network'][action]['selected'] = 1 # self.obs['machine_readable_network'][action]['null_action'] = 1 # self.chosen_flows = [] # for action in self.chosen_actions: # self.chosen_flows.append(self.conv_chosen_action_index_to_chosen_flow(action)) # return self.chosen_flows # def conv_chosen_action_index_to_chosen_flow(self, action, chosen_flows=None): # if chosen_flows is None: # chosen_flows = self.chosen_flows # else: # pass # # find src and dst of chosen action flow # src_rep = self.obs['machine_readable_network'][action]['src'] # dst_rep = self.obs['machine_readable_network'][action]['dst'] # src = self.env.repgen.index_to_endpoint[src_rep.numpy()] # dst = self.env.repgen.index_to_endpoint[dst_rep.numpy()] # # find other unique chars of flow # time_arrived = self.obs['machine_readable_network'][action]['time_arrived'] # size = self.obs['machine_readable_network'][action]['size'] # # find queued flows at this src-dst queue # queued_flows = self.SchedulerNetwork.nodes[src][dst]['queued_flows'] # found_flow = False # for flow in queued_flows: # if flow['src'] == src and flow['dst'] == dst and flow['time_arrived'] == time_arrived and flow['size'] == size: # found_flow = True # if flow['packets'] is None: # flow = self.init_paths_and_packets(flow) # if flow['channel'] is None: # establish_flow, path, channel = self.look_for_available_lightpath(flow, chosen_flows, search_k_shortest=False) # if not establish_flow: # raise Exception('Error: Trying to establish flow {} which is not available given chosen flows {}'.format(flow, self.chosen_flows)) # flow['channel'] = channel # return flow # else: # # not this flow # pass # if not found_flow: # raise Exception('Unable to find action {} in queue flows {}'.format(self.obs['machine_readable_network'][action], queued_flows)) # def register_env(self, env): # self.env = env # self.action_space = self.env.action_space # def get_action(self, obs): # if self.env is None: # raise Exception('Must call register_env(env) method or instantiate scheduler with env != None before getting action from scheduler.') # chosen_flows = self.get_scheduler_action(obs) # action = {'chosen_flows': chosen_flows} # return action