"""Scheduling agent module (trafpy.manager.src.schedulers.agent)."""

from trafpy.manager.src.schedulers.schedulertoolbox import SchedulerToolbox

import numpy as np
import networkx as nx
import copy
import math
import sys
import matplotlib.pyplot as plt
from collections import defaultdict
import time
import itertools


class Agent(SchedulerToolbox):
    '''
    Tabular Q-table scheduling agent.

    Each time slot the agent repeatedly picks (server, queue, flow_idx)
    actions with an epsilon-greedy policy over ``self.Q_table``. It stops when
    it picks the 'null' action or an invalid/duplicate flow. Each valid pick
    is given the path/channel returned by ``look_for_available_lightpath``.
    ``process_reward`` applies one TD update to every (state, action) pair
    chosen in the previous slot.
    '''

    def __init__(self, Graph, RWA, slot_size, max_F, epsilon, alpha, gamma, agent_type='sarsa_learning',
                 max_record_num_packets=100, max_record_time_in_queue=10):
        '''
        Args:
        - Graph: network graph; Graph.graph['endpoints'] lists the servers
        - RWA: routing and wavelength assignment object (its num_k is used)
        - slot_size: duration of one scheduling time slot
        - max_F: max number of flows per queue represented in the state
        - epsilon: exploration probability of the epsilon-greedy policy
        - alpha: learning rate of the TD update
        - gamma: discount factor of the TD update
        - agent_type: label stored on the agent (not otherwise used in this class)
        - max_record_num_packets: cap on remaining packets recorded in the state
          (above this value all counts look the same); defaults to 100
        - max_record_time_in_queue: cap on time-in-queue recorded in the state;
          defaults to 10
        '''
        super().__init__(Graph, RWA, slot_size)
        self.agent_type = agent_type
        self.N = int(len(self.Graph.graph['endpoints']))  # number of servers
        self.max_F = max_F  # max number of flows per queue
        self.max_record_num_packets = max_record_num_packets  # max to record. above this val, all same
        self.max_record_time_in_queue = max_record_time_in_queue  # max time in queue to record
        self.epsilon = epsilon
        self.alpha = alpha
        self.gamma = gamma

        self.num_edges = len(list(self.Graph.edges))
        self.num_encoded_path_combos = self.RWA.num_k * (2 ** (self.num_edges - 1))
        self.num_poss_times_in_queue = (self.max_record_time_in_queue / self.slot_size) + 1
        self.num_poss_schedule = 2  # can be 1 or 0
        self.num_poss_packets = self.max_record_num_packets + 1
        self.num_poss_flows = self.max_F + 1
        # num params for each flow state, trailing +1 is for the 'scheduled' param
        self.phi = int(self.num_poss_packets + self.num_poss_times_in_queue + self.num_poss_flows
                       + (self.RWA.num_k * self.num_edges) + 1)
        self.num_queues = self.N * (self.N - 1)

        # action 0 is 'null'; actions 1..num_queues*max_F each address one
        # (server, destination queue, flow index) slot
        eps = self.Graph.graph['endpoints']
        flow_slots = ({'server': ep, 'queue': epq, 'flow_idx': flow_idx}
                      for ep in eps for epq in eps if ep != epq
                      for flow_idx in range(self.max_F))
        self.int_to_action = dict(enumerate(flow_slots, start=1))
        self.int_to_action[0] = 'null'  # add null action

        self.reset()
        self.action_space = len(self.int_to_action)
        self.state_space = self.num_queues * self.max_F * self.phi
        print('Action space: {}'.format(self.action_space))
        print('State space: {}'.format(self.state_space))

        # initialise q table; unseen states start with all-zero action values
        self.Q_table = defaultdict(lambda: np.zeros(self.action_space))
        # create policy to follow
        self.policy = self.make_epsilon_greedy_policy(self.Q_table, self.epsilon, self.action_space)

    def make_epsilon_greedy_policy(self, Q_table, epsilon, action_space):
        '''
        Returns policy_func(state) -> probability array over action_space actions.

        Every action gets epsilon / action_space probability. The action with
        the highest value in Q_table[state] (lowest index on ties, via
        np.argmax) gets an extra 1 - epsilon. Q_table is captured by
        reference, so later Q-value updates are seen by the returned policy.
        '''
        def policy_func(state):
            # create uniform probability distribution over all possible actions
            A = np.ones(action_space, dtype=float) * epsilon / action_space
            # find which action has the highest value
            best_action = np.argmax(Q_table[state])
            print('Best action:\n{}'.format(best_action))
            # bias probability of choosing best action
            A[best_action] += (1.0 - epsilon)
            print('Action prob distribution:\n{}'.format(A))
            return A
        return policy_func

    def check_if_end_of_time_slot_decisions(self, action, chosen_flows):
        '''
        If one of the following is true, agent should stop making decisions
        this time slot and this method will return True:

        1) The action chosen is the 'null' action, which is the agent's way of
        explicitly stating that it doesn't want to schedule anymore flows for
        this time slot
        2) The action chosen is a flow that has already been chosen this time
        slot
        3) The action chosen is invalid since, due to previously selected
        actions, there are no paths or channels (lightpaths) available

        Args:
        - action: The action to be checked chosen by the agent
        - chosen_flows: A list of flows already chosen by the agent

        Returns:
        - True if the agent should stop deciding this slot, otherwise False
        '''
        if action == 'null':
            print('Agent doesn\'t want to make any more scheduling decisions')
            return True

        # find flow in network
        flow_dict = self.get_flow_from_network(action['server'], action['queue'], action['flow_idx'])
        if flow_dict == 'N/A':
            print('Flow not in network, therefore action invalid')
            return True

        # NOTE(review): duplicate detection uses `in` (identity/equality) against
        # flow dicts that get_action has already given 'path'/'channel' values;
        # confirm get_flow_from_network returns the same dict objects.
        if flow_dict in chosen_flows:
            print('Chosen flow already chosen this time slot')
            return True

        # flow not yet chosen this time slot, check if lightpath available
        est, _, _ = self.look_for_available_lightpath(flow_dict, chosen_flows)
        if not est:
            print('Lightpath not available, therefore action invalid')
            return True

        # if get to this point, action not null and is valid
        return False

    def get_action(self, observation):
        '''
        Chooses the flows to schedule this time slot.

        Samples actions from self.policy until the 'null' action or an
        invalid/duplicate flow is chosen (see
        check_if_end_of_time_slot_decisions). Each valid flow is assigned the
        path/channel from look_for_available_lightpath.

        Side effects: resets and fills self.chosen_actions and
        self.chosen_action_states (consumed by process_reward). Sets
        self.Q_state_action to the mean estimated Q-value of the chosen
        actions. Stores deep copies in self.curr_observation / self.curr_action.

        Returns:
        - {'chosen_flows': list of flow dicts with 'path' and 'channel' set}
        '''
        chosen_flows = []
        self.chosen_actions = []  # save chosen actions
        self.chosen_action_states = []  # save states agent observed when choosing flows
        self.Q_state_action = 0  # init estimated q state action value
        self.update_network_state(observation)
        state, agent_queues = self.get_agent_state_representation(observation)
        while True:  # keep getting decisions until end of time slot decisions
            a_probs = self.policy(state)
            a = np.random.choice(np.arange(len(a_probs)), p=a_probs)  # choose from dist
            print('Chosen action: {}'.format(a))
            self.chosen_actions.append(a)
            self.chosen_action_states.append(state)
            q_value = self.Q_table[state][a]
            print('Estimated q_value of action: {}'.format(q_value))
            self.Q_state_action += q_value
            a_meaning = self.int_to_action[a]
            print('Action meaning:\n{}'.format(a_meaning))

            # check if chosen action is null or invalid
            if self.check_if_end_of_time_slot_decisions(a_meaning, chosen_flows):
                print('No more scheduling decisions for this time step')
                print('Chosen flows:\n{}'.format(chosen_flows))
                # save current observation and actions chosen
                action = {'chosen_flows': chosen_flows}
                self.curr_observation = copy.deepcopy(observation)
                self.curr_action = copy.deepcopy(action)
                # Q_state_action summed the Q-values of *every* chosen action,
                # including this terminating null/invalid one, and
                # process_reward updates all of them. So take the mean over the
                # same count. It is always >= 1 here, so no divide-by-zero case.
                self.Q_state_action /= len(self.chosen_actions)
                return action

            # agent action is valid. Allocate lightpath with FF and append to chosen flows
            chosen_flow = self.get_flow_from_network(a_meaning['server'], a_meaning['queue'], a_meaning['flow_idx'])
            _, p, c = self.look_for_available_lightpath(chosen_flow, chosen_flows)
            chosen_flow['path'], chosen_flow['channel'] = p, c
            chosen_flows.append(chosen_flow)
            # update agent state as having scheduled this chosen flow
            state, agent_queues = self.update_agent_state(agent_queues, a_meaning)

    def update_agent_state(self, agent_queues, action):
        '''
        Updates flow=action in agent_queues to having scheduled = 1 (in place).

        Returns:
        - (hashable_state, agent_queues) with the state regenerated from the
          updated queues
        '''
        flow = agent_queues[action['server']][action['queue']]['queued_flows'][action['flow_idx']]
        flow['scheduled'] = [int(1)]
        return self._hashable_state(agent_queues), agent_queues

    def process_reward(self, reward, next_observation):
        '''
        Take reward from environment that resulted in action from prev time
        step and use to learn.

        Uses td_target = reward + gamma * (mean Q-value of the actions chosen
        for next_observation). Adds alpha * (td_target - previous mean Q-value)
        to every (state, action) pair chosen in the previous time slot.
        '''
        # Snapshot the previous slot's estimate and (state, action) pairs
        # BEFORE get_action, which overwrites all three attributes; otherwise
        # the update would be applied to the next slot's choices.
        Q_state_action = self.Q_state_action
        prev_actions = list(self.chosen_actions)
        prev_states = list(self.chosen_action_states)

        # get mean value per action for next observation using the current policy
        _ = self.get_action(next_observation)
        Q_nextstate_nextaction = self.Q_state_action

        # calc td delta
        td_target = reward + (self.gamma * Q_nextstate_nextaction)
        td_delta = td_target - Q_state_action

        # go through each action that was chosen last slot and update q value
        for action, state in zip(prev_actions, prev_states):
            self.Q_table[state][action] += self.alpha * td_delta

    @staticmethod
    def _num_decimals(value):
        '''
        Returns the number of decimal places in str(value) (0 for integers).

        Handles ints (e.g. 1) and scientific notation (e.g. 1e-05), for which a
        search for '.' would give -1 and make round() round to the nearest ten.
        '''
        from decimal import Decimal
        exponent = Decimal(str(value)).as_tuple().exponent
        return max(-exponent, 0) if isinstance(exponent, int) else 0

    def _hashable_state(self, agent_queues):
        '''Flattens the state matrix of agent_queues into a tuple usable as a Q-table key.'''
        state = self.gen_state_from_agent_queues(agent_queues)
        return tuple(itertools.chain.from_iterable(state))  # merge list of lists

    def get_agent_state_representation(self, observation):
        '''
        Builds the agent's encoded view of every queued flow in the network.

        For each flow in each queue of each endpoint of self.SchedulerNetwork,
        records binary-encoded remaining packets, time in queue, number of
        flows in the queue and k shortest paths. Also adds a 'scheduled' flag
        initialised to [0].

        Args:
        - observation: observation['slot_dict']['lb_time'] is read as the
          current time when computing time in queue

        Returns:
        - (hashable_state, agent_queues): flattened state tuple (Q-table key)
          and the nested dict agent_queues[server][queue]['queued_flows']
        '''
        eps = self.SchedulerNetwork.graph['endpoints']
        agent_queues = {server: {queue: {'queued_flows': []} for queue in eps if queue != server}
                        for server in eps}
        # loop-invariant: rounding precision matching the slot size
        num_decimals = self._num_decimals(self.slot_size)
        for ep in eps:  # check queues at each end point...
            for ep_queue_key, ep_queue in self.SchedulerNetwork.nodes[ep].items():
                queued_flows = ep_queue['queued_flows']
                encoded_num_flows = self.binary_encode_num_flows_in_queue(len(queued_flows), self.max_F)
                for raw_flow in queued_flows:  # for each flow in the selected queue...
                    flow_dict = self.init_paths_and_packets(raw_flow)
                    # calc time flow has been in queue so far
                    curr_time = observation['slot_dict']['lb_time']
                    time_in_queue = abs(round(curr_time - flow_dict['time_arrived'], num_decimals))
                    encoded_time = self.binary_encode_time_in_queue(time_in_queue, self.max_record_time_in_queue)
                    # calc num flow packets remaining
                    encoded_num_packets = self.binary_encode_num_packets(len(flow_dict['packets']),
                                                                         self.max_record_num_packets)
                    # get binary encoded shortest paths
                    encoded_paths = self.binary_encode_paths(flow_dict['k_shortest_paths'])
                    agent_queues[ep][ep_queue_key]['queued_flows'].append({
                        'num_packets_left': encoded_num_packets,
                        'time_in_queue': encoded_time,
                        'num_flows_in_queue': encoded_num_flows,
                        'k_shortest_paths': encoded_paths,
                        'scheduled': [0],  # init scheduled status for this time slot
                    })
        return self._hashable_state(agent_queues), agent_queues

    def merge_agent_flow_dict(self, agent_flow_dict):
        '''
        Merges flow dict of agent into single flat list, in the order:
        scheduled, num_packets_left, time_in_queue, num_flows_in_queue,
        then the flattened k_shortest_paths encodings. Does not mutate the input.
        '''
        paths = list(itertools.chain.from_iterable(agent_flow_dict['k_shortest_paths']))
        return list(itertools.chain(agent_flow_dict['scheduled'],
                                    agent_flow_dict['num_packets_left'],
                                    agent_flow_dict['time_in_queue'],
                                    agent_flow_dict['num_flows_in_queue'],
                                    paths))

    def gen_state_from_agent_queues(self, agent_queues):
        '''
        Uses agent queues to generate state.

        Each agent flow dict becomes one flat row (see merge_agent_flow_dict).
        Each (server, queue) is padded up to self.max_F rows with an all-zero
        row of length self.phi. agent_queues is not modified.

        Returns:
        - np.ndarray of the rows, ordered server by server, queue by queue
        '''
        # init state for a single absent flow (padding row)
        single_flow_state = list(np.zeros((self.phi,), dtype=int))
        state = []
        for server_queues in agent_queues.values():
            for queue in server_queues.values():
                rows = [self.merge_agent_flow_dict(flow) for flow in queue['queued_flows']]
                # pad out to max_F flows; a non-positive count adds nothing
                rows.extend([single_flow_state] * (self.max_F - len(rows)))
                state.extend(rows)
        return np.asarray(state)

    def get_agent_action(self, state):
        '''Placeholder; not implemented (returns None).'''
        pass
if __name__ == '__main__':
    # Example simulation script.
    # NOTE(review): these modules are imported by bare name, so this only runs
    # from a directory where they are importable. `Agent` imported from
    # `scheduling` shadows the Agent class defined in this module.
    import graphs as g
    from demand import Demand
    from routing import RWA
    from scheduling import SRPT, BASRPT, Agent
    from simulator import DCN
    import pickle
    import networkx as nx

    # config
    max_F = 2  # max number of flows per queue
    num_episodes = 100
    num_k_paths = 1
    slot_size = 0.2
    max_time = 100
    path_figures = 'figures/'
    load_demands = 'pickles/demand/10_uniform_demands.obj'  # path str or None

    # Context manager so the file handle is closed after loading.
    # NOTE: pickle.load can execute arbitrary code; only load trusted demand files.
    with open(load_demands, 'rb') as filehandler:
        demand = pickle.load(filehandler)
    graph = demand.Graph
    edge_dict = nx.get_edge_attributes(graph, 'channels')
    # number of channels is read from the first edge's 'channels' attribute
    first_edge = next(iter(edge_dict))
    num_channels = len(list(edge_dict[first_edge].keys()))

    # initialise routing agent
    rwa = RWA(g.gen_channel_names(num_channels), num_k_paths)

    # initialise scheduling agent
    #scheduler = SRPT(graph, rwa, slot_size)
    #scheduler = BASRPT(graph, rwa, slot_size, V=500)
    scheduler = Agent(graph, rwa, slot_size, max_F, epsilon=0.1, gamma=1.0, alpha=0.5)

    # initialise dcn simulation environment
    env = DCN(demand, scheduler, max_F, max_time)

    # run simulations
    # NOTE(review): reward is never passed to scheduler.process_reward in this
    # loop, so the agent's Q-table is not updated here.
    for episode in range(num_episodes):
        observation = env.reset(load_demands)
        while True:
            print('------------------------------------------------------')
            print('Time: {}'.format(env.curr_time))
            print('Observation:\n{}'.format(observation))
            action = scheduler.get_action(observation)
            print('Action:\n{}'.format(action))
            observation, reward, done, info = env.step(action)
            print('Observation:\n{}'.format(observation))
            if done:
                print('Episode finished.')
                env.get_scheduling_session_summary()
                env.print_scheduling_session_summary()
                env.plot_all_queue_evolution(path_figures+'scheduler/')
                break