from trafpy.manager.src.schedulers.schedulertoolbox import SchedulerToolbox, SchedulerToolbox_v2
import numpy as np
import networkx as nx
import copy
import math
import sys
import matplotlib.pyplot as plt
from collections import defaultdict
import time
import itertools
import time
import pandas as pd
from tabulate import tabulate
import json
[docs]class SRPT(SchedulerToolbox):
def __init__(self, Graph, RWA, slot_size, packet_size=300, time_multiplexing=True, debug_mode=False, scheduler_name='srpt'):
super().__init__(Graph, RWA, slot_size, packet_size, time_multiplexing, debug_mode)
self.scheduler_name = scheduler_name
[docs] def get_scheduler_action(self, observation):
'''
Uses observation and chosen rwa action(s) to construct schedule for this
timeslot
'''
# update scheduler network & new flow states
self.update_network_state(observation, hide_child_dependency_flows=True)
# choose which flows to schedule for this time slot
chosen_flows = []
for ep in self.SchedulerNetwork.graph['endpoints']:
queues = self.SchedulerNetwork.nodes[ep]
for queue in queues.keys():
queued_flows = queues[queue]['queued_flows']
completion_times = queues[queue]['completion_times']
num_queued_flows = len(queues[queue]['queued_flows'])
if num_queued_flows == 0:
# no flows queued, move to next queue
continue
else:
# queued flows present
chosen_flow, _ = self.find_shortest_flow_in_queue(queued_flows,completion_times)
if self.debug_mode:
print('\nAttempting to establish flow {}'.format(chosen_flow))
# check for contentions
contending_flows = [None]
contending_flow = None
establish_flow = False
if len(chosen_flows) != 0:
establish_flow, p, c, packets_this_slot = self.look_for_available_lightpath(chosen_flow,chosen_flows)
chosen_flow['path'], chosen_flow['channel'], chosen_flow['packets_this_slot'] = p, c, packets_this_slot
if not establish_flow:
contending_flows,contending_flow,contending_flow_fct,p,c,packets_this_slot = self.find_contending_flow(chosen_flow,chosen_flows)
chosen_flow['path'], chosen_flow['channel'], chosen_flow['packets_this_slot'] = p, c, packets_this_slot
comp_time, _ = self.estimate_time_to_completion(chosen_flow)
if contending_flow_fct > comp_time:
# new choice has lower fct that established flow
establish_flow = True
else:
# established flow has lower fct, do not choose
pass
else:
# rwa was completed
pass
else:
# no existing chosen flows yet, can choose flow
establish_flow = True
if establish_flow:
if self.debug_mode:
print('Chosen flow {} can be established'.format(chosen_flow))
for contending_flow in contending_flows:
try:
chosen_flows.remove(contending_flow)
self.take_down_connection(contending_flow)
except (NameError, ValueError):
# already not present
pass
chosen_flows.append(chosen_flow)
self.set_up_connection(chosen_flow)
else:
if self.debug_mode:
print('Chosen flow could not be established')
# contention was found and lost
pass
if self.debug_mode:
print('Channel capacity status after finished choosing actions:')
net = self.SchedulerNetwork
for edge in net.edges:
for channel in net[edge[0]][edge[1]]['channels']:
# reset channel capacity
chan_cap_available = self.get_channel_bandwidth(edge, channel)
chan_cap_max = net[edge[0]][edge[1]]['max_channel_capacity']
print('Available {} capacity for {}: {} / {}'.format(channel, edge, chan_cap_available, chan_cap_max))
return chosen_flows
[docs] def get_action(self, observation, print_processing_time=False):
# scheduler action
self.time_get_action_start = time.time()
chosen_flows = self.get_scheduler_action(observation)
action = {'chosen_flows': chosen_flows}
self.time_get_action_end = time.time()
if print_processing_time:
self.display_get_action_processing_time()
return action
[docs] def display_get_action_processing_time(self, num_decimals=8):
get_action_time = self.time_get_action_end - self.time_get_action_start
# create table
summary_dict = {'Get Action': [round(get_action_time, num_decimals)]}
df = pd.DataFrame(summary_dict)
print('')
print(tabulate(df, showindex=False, headers='keys', tablefmt='psql'))
[docs] def find_contending_flow(self, chosen_flow, chosen_flows):
'''
Goes through chosen flow possible path & channel combinations &
compares to path-channel combinations in chosen flows. Saves all
contentions that arise. When all possible contentions have been checked,
finds the 'most contentious' (i.e. shortest flow completion time) in
chosen_flows and returns this as the contending flow (since other
flows in contending_flows will have a higher FCT than this most contentious flow
and therefore if the chosen flow has a lower FCT than the most contentious flow,
it will also have a lower FCT than all competing flows and therefore should
replace all contending flows)
'''
contending_flows = self.find_all_contending_flows(chosen_flow, chosen_flows, cost_metric='fct')
chosen_path, chosen_channel, contending_flow, contending_flow_cost, contending_flows_list = self.choose_channel_and_path_using_contending_flows(contending_flows)
# find number of packets that would be able to schedule for chosen flow if successful
# can schedule up to the number of packets removed by removing contending flows
# OR up to lowest non-0 bandwidth edge on chosen flow's path (whichever is lowest
# is the limiting factor determining the number of packets which can be scheduled)
# 1. if all edges were empty, what would be the maximum possible number of packets that could transfer this time slot?
max_packets_available_if_all_edges_empty = self.get_maximum_packets_available_if_all_edges_empty(chosen_flow, chosen_path)
# 2. how many packets can be transferred on chosen path edges which are not in the contending path edges (and therefore will not change even if contending flows are dropped)?
packets_available_outside_contending_edges = self.get_packets_available_outside_contending_edges(chosen_flow, chosen_path, chosen_channel, contending_flows_list)
# 3. if all contending flows were dropped, how many packets would be made AVAILABLE to be scheduled for the chosen flow?
packets_available_if_drop_all_contending_flows = self.get_packets_available_if_drop_all_contending_flows(chosen_flow, chosen_path, chosen_channel, contending_flows_list)
# 4. given the constraints of 1. the flow's remaining packets and 2. the bandwidth constraints of edges outside the contending edges, what is the maximum number of packets that the flow could schedule this time slot?
max_packets_requested_by_chosen_flow = self.get_maximum_packets_requestable_by_flow(chosen_flow, max_packets_available_if_all_edges_empty, packets_available_outside_contending_edges)
# 5. if drop all contending flows and schedule this chosen flow, how many packets would actually end up being SCHEDULED for this flow?
packets_scheduled_if_drop_all_contending_flows = min(max_packets_available_if_all_edges_empty, packets_available_if_drop_all_contending_flows)
# print('Max packets requested: {} | Packets that would be scheduled if drop all contending flows: {}'.format(max_packets_requested_by_chosen_flow, packets_scheduled_if_drop_all_contending_flows))
# to make scheduler maximise its utility of available channel bandwidth, make sure
# that would only drop the minimum number of necessery contending flows to allow the chosen flow to
# be scheduled
if max_packets_requested_by_chosen_flow < packets_scheduled_if_drop_all_contending_flows:
contending_flow, contending_flow_cost, contending_flows_list, packets_scheduled_if_drop_all_contending_flows = self.select_minimum_number_of_contending_flows_to_drop(chosen_flow, chosen_path, chosen_channel, contending_flows_list, max_packets_requested_by_chosen_flow, max_packets_available_if_all_edges_empty, 'fct')
num_packets_this_slot = min(max_packets_requested_by_chosen_flow, packets_scheduled_if_drop_all_contending_flows)
return contending_flows_list, contending_flow, contending_flow_cost, chosen_path, chosen_channel, num_packets_this_slot
[docs]class SRPT_v2:
def __init__(self,
Graph,
RWA,
slot_size,
packet_size=300,
time_multiplexing=True,
debug_mode=False,
scheduler_name='SRPT'):
# DEBUG
# debug_mode = True
self.debug_mode = debug_mode
self.toolbox = SchedulerToolbox_v2(Graph=Graph,
RWA=RWA,
slot_size=slot_size,
packet_size=packet_size,
time_multiplexing=time_multiplexing,
debug_mode=debug_mode)
self.scheduler_name = scheduler_name
self.resolution_strategy = 'cost'
[docs] def get_action(self, observation, print_processing_time=False):
chosen_flows = self.get_scheduler_action(observation)
return {'chosen_flows': chosen_flows}
[docs] def cost_function(self, flow):
'''SRPT cost function.'''
return self.toolbox.estimate_time_to_completion(flow)
[docs] def get_scheduler_action(self, observation, reset_channel_capacities=True, path_channel_assignment_strategy='fair_share_num_flows'):
if self.debug_mode:
print('\n\n\n---------------- GET SCHEDULER ACTION -------------------')
# update network state
self.toolbox.update_network_state(observation, reset_channel_capacities=reset_channel_capacities, hide_child_dependency_flows=True)
# collect useful flow info dicts for making scheduling decisions
flow_info = self.toolbox.collect_flow_info_dicts(path_channel_assignment_strategy=path_channel_assignment_strategy, cost_function=self.cost_function)
# allocate flows by order of cost (lowest cost flows prioritised first)
scheduling_info, cost_info = self.toolbox.allocate_available_bandwidth(flow_info, resolution_strategy=self.resolution_strategy)
# collect chosen flows and corresponding packets to schedule for the chosen flows
chosen_flows = []
for flow_id in flow_info['queued_flows'].keys():
if flow_id not in scheduling_info['flow_id_to_packets_to_schedule_per_edge'] or len(scheduling_info['flow_id_to_packets_to_schedule_per_edge'][flow_id]) == 0:
# flow was not chosen to be scheduled on any edge
pass
else:
# flow was chosen to be scheduled on an edge
flow = flow_info['queued_flows'][flow_id]
# packets to schedule for each flow limited by the edge in their path with lowest number of schedulable packets
flow['packets_this_slot'] = min(scheduling_info['flow_id_to_packets_to_schedule_per_edge'][flow_id])
# check that flow was also selected to be scheduled on a bandwidth-limited end point link (if it wasn't, cannot schedule this flow)
lowest_edge_bandwidth = self.toolbox.get_lowest_edge_bandwidth(flow['path'])
info_to_transfer_this_slot = flow['packets_this_slot'] * flow['packet_size']
bandwidth_requested = info_to_transfer_this_slot / self.toolbox.slot_size # info units of this flow transferred this time slot == capacity used on each channel in flow's path this time slot
if bandwidth_requested > lowest_edge_bandwidth:
# flow must only have been selected for bandwidth-limiting links and not been given any bandwidth-limited capacity, do not schedule flow
pass
else:
# flow must have been allocated bandwidth on at least one end point link, check for contentions and try to establish flow
chosen_flows = self.toolbox.resolve_contentions_and_set_up_flow(flow, chosen_flows, flow_info, scheduling_info, cost_info, resolution_strategy=self.resolution_strategy)
# DEBUG
if self.debug_mode:
print('~~~ Final Choices ~~~')
print('chosen flows:\n{}'.format(chosen_flows))
edge_to_chosen_flows = {edge: [] for edge in flow_info['requested_edges'].keys()}
for flow in chosen_flows:
edges = self.toolbox.get_path_edges(flow['path'])
for edge in edges:
edge_to_chosen_flows[json.dumps(edge)].append(flow['flow_id'])
for edge in self.toolbox.network.edges:
for channel in self.toolbox.rwa.channel_names:
# src-dst
bw = self.toolbox.get_channel_bandwidth(edge, channel)
try:
print('edge: {} | channel: {} | chosen flows: {} | bandwidth remaining: {}'.format(edge, channel, edge_to_chosen_flows[json.dumps(edge)], bw))
except KeyError:
# no flows chosen on this edge
print('edge: {} | channel: {} | bandwidth remaining: {}'.format(edge, channel, bw))
# dst-src
edge = edge[::-1]
bw = self.toolbox.get_channel_bandwidth(edge, channel)
try:
print('edge: {} | channel: {} | chosen flows: {} | bandwidth remaining: {}'.format(edge, channel, edge_to_chosen_flows[json.dumps(edge)], bw))
except KeyError:
# no flows chosen on this edge
print('edge: {} | channel: {} | bandwidth remaining: {}'.format(edge, channel, bw))
return chosen_flows