Source code for trafpy.manager.src.simulators.env_analyser

import numpy as np
from sqlitedict import SqliteDict
import os
import shutil
import time
import _pickle as cPickle

class EnvAnalyser:

    def __init__(self, env, time_units='a.u.', info_units='a.u.', subject_class_name=None):
        '''
        Args:
            env (obj): Environment/simulation object to analyse.
            time_units (str): Units of time in env simulation (e.g. us).
            info_units (str): Units of information in simulation (e.g. B).
            subject_class_name (str): Name of test subject class. Useful when
                you later e.g. plot multiple envs with EnvsPlotter and want to
                group analysers into the classes/subject names being tested
                (e.g. test subject 'scheduler_1' vs. test subject
                'scheduler_2') across an arbitrary number of tests (e.g. 10
                different network loads).
        '''
        self.env = env
        if subject_class_name is None:
            self.subject_class_name = env.sim_name
        else:
            self.subject_class_name = subject_class_name
        self.computed_metrics = False
        self.time_units = time_units
        self.info_units = info_units
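    # A minimal instantiation sketch (illustrative only; `scheduler_1_envs`
    # below is an assumed list of completed simulation objects, not part of
    # this module). Analysers for the same test subject across several loads
    # can share a subject_class_name so that EnvsPlotter can group them:
    #
    #   analysers = [EnvAnalyser(env, time_units='us', info_units='B',
    #                            subject_class_name='scheduler_1')
    #                for env in scheduler_1_envs]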
    def compute_metrics(self,
                        measurement_start_time=None,
                        measurement_end_time=None,
                        env_analyser_database_path=None,
                        overwrite=False,
                        print_summary=False):
        '''
        Args:
            measurement_start_time (int, float): Simulation time at which to
                begin recording metrics etc.; i.e. the end of the warm-up
                period.
            measurement_end_time (int, float): Simulation time at which to
                stop recording metrics etc.; i.e. the start of the cool-down
                period.

        If overwrite is False and an analyser object already exists in
        env_analyser_database_path, will load the previously saved analyser
        object rather than re-computing everything. To overwrite this
        previously saved analyser, set overwrite=True.

        If env_analyser_database_path is not None, will store data in the
        specified path. This can help with memory errors, since it avoids
        holding everything in RAM.
        '''
        print('\nComputing metrics for env {}...'.format(self.env.sim_name))
        start = time.time()

        self.computed_metrics = True
        self.measurement_start_time = measurement_start_time
        self.measurement_end_time = measurement_end_time

        load_prev = False
        if env_analyser_database_path is not None:
            # using databases to store in external memory, init database dir
            env_analyser_database_path += '/env_analyser_database'
            if os.path.exists(env_analyser_database_path + '/analyser'):
                if overwrite:
                    print('Overwriting {}...'.format(env_analyser_database_path))
                    shutil.rmtree(env_analyser_database_path)
                    os.mkdir(env_analyser_database_path)
                else:
                    load_prev = True
                    print('{} exists and overwrite is False. Loading previously completed analysis...'.format(env_analyser_database_path))
            else:
                if os.path.exists(env_analyser_database_path):
                    shutil.rmtree(env_analyser_database_path)
        if not load_prev and env_analyser_database_path is not None and not os.path.exists(env_analyser_database_path):
            os.mkdir(env_analyser_database_path)
        self.env_analyser_database_path = env_analyser_database_path

        if load_prev and env_analyser_database_path is not None:
            self._load_self(path=self.env_analyser_database_path)
            end = time.time()
            print('Loaded previously saved analyser object from {} in {} s.'.format(self.env_analyser_database_path, end - start))
        else:
            if self.env.job_centric:
                self._compute_job_summary()
            self._compute_flow_summary()
            self._compute_general_summary()

            if self.env_analyser_database_path is not None:
                print('Saving analyser object for env {}...'.format(self.env.sim_name))
                s = time.time()
                self._save_self(path=self.env_analyser_database_path)
                e = time.time()
                print('Saved analyser object to {} in {} s.'.format(self.env_analyser_database_path, e - s))

            end = time.time()
            print('Computed metrics for env {} in {} s.'.format(self.env.sim_name, end - start))

        if print_summary:
            print('\n-=-=-=-=-=-=--= Summary -=-=-=-=-=-=-=-')
            self._print_general_summary()
            self._print_flow_summary()
            if self.env.job_centric:
                self._print_job_summary()
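    # A minimal usage sketch (illustrative only; the database path and the
    # 'auto' window are example arguments, not defaults). With 'auto', the
    # measurement window runs from 10% of the last arrival time to the end of
    # the simulation; a database path keeps the flow/job dicts out of RAM:
    #
    #   analyser = EnvAnalyser(env)
    #   analyser.compute_metrics(measurement_start_time='auto',
    #                            measurement_end_time='auto',
    #                            env_analyser_database_path='/tmp/analysis',
    #                            overwrite=True,
    #                            print_summary=True)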
    ####################################### GENERAL ##########################################

    def _print_general_summary(self):
        print('\n ~* General Information *~')
        print('Simulation name: \'{}\''.format(self.env.sim_name))
        print('Measurement duration: {} {} (Start time: {} {} | End time: {} {})'.format(self.measurement_duration, self.time_units, self.measurement_start_time, self.time_units, self.measurement_end_time, self.time_units))
        print('Total number of generated demands (jobs or flows) passed to env: {}'.format(self.env.num_demands))
        print('Total info arrived: {} {}'.format(self.total_info_arrived, self.info_units))
        print('Total info transported: {} {}'.format(self.total_info_transported, self.info_units))
        print('Load (abs): {} {}/{}'.format(self.load_abs, self.info_units, self.time_units))
        print('Load (frac): {} fraction of network capacity requested.'.format(self.load_frac))
        print('Throughput (abs): {} {}/{}'.format(self.throughput_abs, self.info_units, self.time_units))
        print('Throughput (frac): {} fraction of arrived info successfully transported.'.format(self.throughput_frac))
        print('T-Score: {}'.format(self.t_score))

    def _compute_general_summary(self):
        self.total_info_arrived = self._calc_total_info_arrived()
        self.total_info_transported = self._calc_total_info_transported()
        self.load_abs = self._calc_network_load_abs()
        self.load_frac = self._calc_network_load_frac()
        self.throughput_abs = self._calc_throughput_abs()
        self.throughput_frac = self._calc_throughput_frac()
        self.t_score = self._compute_t_score()

    def _calc_total_info_arrived(self):
        if self.env_analyser_database_path is not None:
            with SqliteDict(self.arrived_flow_dicts) as arrived_flow_dicts:
                arrived_flows = list(arrived_flow_dicts.values())
        else:
            arrived_flows = list(self.arrived_flow_dicts.values())
        return sum([arrived_flows[i]['size'] for i in range(len(arrived_flows))])

    def _calc_total_info_transported(self):
        if self.env_analyser_database_path is not None:
            with SqliteDict(self.completed_flow_dicts) as completed_flow_dicts:
                completed_flows = list(completed_flow_dicts.values())
        else:
            completed_flows = list(self.completed_flow_dicts.values())
        return sum([completed_flows[i]['size'] for i in range(len(completed_flows))])

    def _calc_network_load_abs(self):
        '''Calc absolute network load (i.e. the load rate during the measurement period).'''
        return self.total_info_arrived / self.measurement_duration

    def _calc_network_load_frac(self):
        '''Calc fractional network load (i.e. the fraction of network capacity requested during the measurement period).'''
        return self.load_abs / self.env.network.graph['max_nw_capacity']

    def _calc_throughput_abs(self):
        return self.total_info_transported / self.measurement_duration

    def _calc_throughput_frac(self):
        return self.total_info_transported / self.total_info_arrived

    ################################## FLOW ################################################

    def _print_flow_summary(self):
        print('\n ~* Flow Information *~')
        print('Total number of generated flows passed to env (src != dst, dependency_type == \'data_dep\'): {}'.format(self.env.num_flows))
        print('Total number of these flows which arrived during measurement period: {}'.format(self.num_arrived_flows))
        print('Time first flow arrived: {} {}'.format(self.time_first_flow_arrived, self.time_units))
        print('Time last flow arrived: {} {}'.format(self.time_last_flow_arrived, self.time_units))
        print('Total number of flows that were completed: {}'.format(self.num_completed_flows))
        print('Total number of flows that were left in queue at end of measurement period: {}'.format(self.num_queued_flows))
        print('Total number of flows that were dropped (dropped + left in queue at end of measurement period): {}'.format(self.num_dropped_flows))
        print('Fraction of arrived flows dropped: {}'.format(self.dropped_flow_frac))
        print('Mean flow completion time (FCT): {} {}'.format(self.mean_fct, self.time_units))
        print('99th percentile FCT: {} {}'.format(self.nn_fct, self.time_units))

    def _compute_t_score(self):
        '''Returns the TrafPy overall T-score.'''
        if not self.computed_metrics:
            raise Exception('Must first run compute_metrics() method.')

        # FCT COMPONENT
        # collect flow sizes of arrived flows
        if self.env_analyser_database_path is not None:
            with SqliteDict(self.arrived_flow_dicts) as arrived_flow_dicts:
                arrived_flows = list(arrived_flow_dicts.values())
        else:
            arrived_flows = list(self.arrived_flow_dicts.values())
        self.flow_sizes = [flow['size'] for flow in arrived_flows]
        mean_fct = self.mean_fct
        std_fct = self.std_fct
        mean_fct_factor = np.mean(self.flow_sizes) / self.env.network.graph['ep_link_capacity']
        std_fct_factor = np.std(self.flow_sizes) / self.env.network.graph['ep_link_capacity']
        mean_fct_component = mean_fct_factor / mean_fct
        std_fct_component = std_fct_factor / std_fct
        self.fct_component = mean_fct_component + std_fct_component

        # DROPPED FLOWS COMPONENT
        num_eps = len(self.env.network.graph['endpoints'])
        num_queues = num_eps * (num_eps - 1)
        max_num_flows_in_network = self.env.max_flows * num_queues
        self.dropped_component = 1 - (self.dropped_flow_frac * max_num_flows_in_network)

        # THROUGHPUT COMPONENT
        self.throughput_component = self.throughput_abs / self.env.network.graph['max_nw_capacity']

        # T-SCORE
        t_score = self.fct_component + self.dropped_component + self.throughput_component

        return t_score

    def _save_self(self, path):
        with open(path + '/analyser', 'wb') as f:
            cPickle.dump(self.__dict__, f, 2)

    def _load_self(self, path):
        with open(path + '/analyser', 'rb') as f:
            tmp_dict = cPickle.load(f)
        self.__dict__.update(tmp_dict)

    def _compute_flow_summary(self):
        self._compute_flow_arrival_metrics()
        self._compute_flow_completion_metrics()
        self._compute_flow_queued_metrics()
        self._compute_flow_dropped_metrics()
        if self.env.track_grid_slot_evolution:
            self._generate_grid_demands_numpy_array()

    def _generate_grid_demands_numpy_array(self):
        # collect ep link channel demand info into numpy array grid
        self.grid_demands = []
        for ep in self.env.grid_slot_dict.keys():
            for channel in self.env.grid_slot_dict[ep].keys():
                self.grid_demands.append(self.env.grid_slot_dict[ep][channel]['demands'])
        self.grid_demands = np.array([np.array(xi) for xi in self.grid_demands])  # unpack and conv to numpy array

        # conv grid demands to unique integer ids (for colour coding)
        unique_id_counter = 0
        demand_to_id = {}
        for demand_idx in range(self.grid_demands.shape[0]):
            for time_idx in range(self.grid_demands.shape[1]):
                d = self.grid_demands[demand_idx][time_idx]
                if d not in demand_to_id.keys():
                    # not yet encountered demand id, update demand_to_id dict
                    demand_to_id[d] = unique_id_counter
                    unique_id_counter += 1
                # update grid_demands
                self.grid_demands[demand_idx][time_idx] = demand_to_id[d]

        # conv grid elements to ints
        self.grid_demands = self.grid_demands.astype(int)

    def _calc_flow_completion_times(self, flow_completion_times):
        if len(flow_completion_times) == 0:
            mean_fct, ninetyninth_percentile_fct, max_fct, standard_deviation_fct = float('inf'), float('inf'), float('inf'), float('inf')
        else:
            mean_fct = np.average(np.asarray(flow_completion_times))
            ninetyninth_percentile_fct = np.percentile(np.asarray(flow_completion_times), 99)
            max_fct = np.max(np.asarray(flow_completion_times))
            standard_deviation_fct = np.std(flow_completion_times)
        return mean_fct, ninetyninth_percentile_fct, max_fct, standard_deviation_fct

    def _init_flow_arrival_metrics(self):
        if self.env_analyser_database_path is not None:
            # init database
            self.arrived_flow_dicts = self.env_analyser_database_path + '/arrived_flow_dicts.sqlite'
            if os.path.exists(self.arrived_flow_dicts):
                os.remove(self.arrived_flow_dicts)
            times_arrived = []
            with SqliteDict(self.arrived_flow_dicts) as arrived_flow_dicts:
                for key, val in self._get_flows_arrived_in_measurement_period().items():
                    arrived_flow_dicts[key] = val
                    times_arrived.append(val['time_arrived'])
                arrived_flow_dicts.commit()
        else:
            # load into memory
            self.arrived_flow_dicts = self._get_flows_arrived_in_measurement_period()
            arrived_flows = list(self.arrived_flow_dicts.values())
            times_arrived = [arrived_flows[i]['time_arrived'] for i in range(len(arrived_flows))]

        self.num_arrived_flows = len(times_arrived)
        self.time_first_flow_arrived = min(times_arrived)
        self.time_last_flow_arrived = max(times_arrived)

    def _compute_flow_arrival_metrics(self):
        print('Computing flow arrival metrics for env {}...'.format(self.env.sim_name))
        start = time.time()

        self._init_flow_arrival_metrics()
        if not self.env.job_centric:
            # compute flow-centric measurement times
            self.measurement_duration, self.measurement_start_time, self.measurement_end_time = self._get_measurement_times()
        else:
            # demands are jobs, therefore measurement times are computed from job arrivals
            pass

        end = time.time()
        print('Computed flow arrival metrics for env {} in {} s.'.format(self.env.sim_name, end - start))

    def _compute_flow_completion_metrics(self):
        print('Computing flow completion metrics for env {}...'.format(self.env.sim_name))
        start = time.time()

        fcts = []
        if self.env_analyser_database_path is not None:
            # init database
            self.completed_flow_dicts = self.env_analyser_database_path + '/completed_flow_dicts.sqlite'
            if os.path.exists(self.completed_flow_dicts):
                os.remove(self.completed_flow_dicts)
            with SqliteDict(self.completed_flow_dicts) as completed_flow_dicts:
                for key, val in self._get_flows_completed_in_measurement_period().items():
                    completed_flow_dicts[key] = val
                    time_arrived, time_completed = val['time_arrived'], val['time_completed']
                    fct = time_completed - time_arrived
                    fcts.append(fct)
                completed_flow_dicts.commit()
        else:
            # load into memory
            self.completed_flow_dicts = self._get_flows_completed_in_measurement_period()
            for flow in self.completed_flow_dicts.values():
                fct = flow['time_completed'] - flow['time_arrived']
                fcts.append(fct)

        self.num_completed_flows = len(fcts)
        self.mean_fct, self.nn_fct, self.max_fct, self.std_fct = self._calc_flow_completion_times(fcts)

        end = time.time()
        print('Computed flow completion metrics for env {} in {} s.'.format(self.env.sim_name, end - start))

    def _compute_flow_dropped_metrics(self):
        print('Computing flow dropped metrics for env {}...'.format(self.env.sim_name))
        start = time.time()

        dropped_flows = self._get_flows_dropped_in_measurement_period()
        if self.env_analyser_database_path is not None:
            self.dropped_flow_dicts = self.env_analyser_database_path + '/dropped_flow_dicts.sqlite'
            with SqliteDict(self.dropped_flow_dicts) as dropped_flow_dicts:
                for key, val in dropped_flows.items():
                    dropped_flow_dicts[key] = val
                dropped_flow_dicts.commit()
        else:
            self.dropped_flow_dicts = dropped_flows

        self.num_dropped_flows = len(list(dropped_flows.keys()))
        self.dropped_flow_frac = self.num_dropped_flows / self.num_arrived_flows

        self.total_info_dropped = 0
        for flow in dropped_flows.values():
            self.total_info_dropped += flow['size']
        self.dropped_info_frac = self.total_info_dropped / self._calc_total_info_arrived()

        end = time.time()
        print('Computed flow dropped metrics for env {} in {} s.'.format(self.env.sim_name, end - start))

    def _compute_flow_queued_metrics(self):
        print('Computing flow queued metrics for env {}...'.format(self.env.sim_name))
        start = time.time()

        queued_flows = self._get_flows_remaining_in_queue_at_end_of_measurement_period()
        if self.env_analyser_database_path is not None:
            self.queued_flow_dicts = self.env_analyser_database_path + '/queued_flow_dicts.sqlite'
            with SqliteDict(self.queued_flow_dicts) as queued_flow_dicts:
                for key, val in queued_flows.items():
                    queued_flow_dicts[key] = val
                queued_flow_dicts.commit()
        else:
            self.queued_flow_dicts = queued_flows
        self.num_queued_flows = len(list(queued_flows.keys()))

        end = time.time()
        print('Computed flow queued metrics for env {} in {} s.'.format(self.env.sim_name, end - start))

    def _get_measurement_times(self):
        if self.measurement_start_time is None:
            if self.env.job_centric:
                measurement_start_time = self.time_first_job_arrived
            else:
                measurement_start_time = self.time_first_flow_arrived
        elif self.measurement_start_time == 'auto' and self.measurement_end_time == 'auto':
            if self.env.job_centric:
                # both start and end must be assigned simultaneously
                self.measurement_start_time = 0.1 * self.time_last_job_arrived
                self.measurement_end_time = self.env.curr_time + self.env.slot_size
                self._init_job_arrival_metrics()
            else:
                # both start and end must be assigned simultaneously
                self.measurement_start_time = 0.1 * self.time_last_flow_arrived
                self.measurement_end_time = self.env.curr_time + self.env.slot_size
                # update arrived demands to be within measurement duration
                self._init_flow_arrival_metrics()
            measurement_start_time = self.measurement_start_time
            measurement_end_time = self.measurement_end_time
        elif self.measurement_start_time == 'auto' and self.measurement_end_time != 'auto':
            if self.env.job_centric:
                self.measurement_start_time = 0.1 * self.time_last_job_arrived
                self._init_job_arrival_metrics()
            else:
                self.measurement_start_time = 0.1 * self.time_last_flow_arrived
                self._init_flow_arrival_metrics()
            measurement_start_time = self.measurement_start_time
        else:
            measurement_start_time = self.measurement_start_time

        if self.measurement_end_time is None:
            measurement_end_time = self.env.curr_time + self.env.slot_size  # time sim was ended
            self.measurement_end_time = measurement_end_time
        elif self.measurement_end_time == 'auto' and self.measurement_start_time != 'auto':
            if self.env.job_centric:
                self.measurement_end_time = self.time_last_job_arrived
                self._init_job_arrival_metrics()
            else:
                self.measurement_end_time = self.time_last_flow_arrived
                self._init_flow_arrival_metrics()
            measurement_end_time = self.measurement_end_time
        else:
            measurement_end_time = self.measurement_end_time

        measurement_duration = measurement_end_time - measurement_start_time

        return measurement_duration, measurement_start_time, measurement_end_time

    # def _get_flow_measurement_times(self):
    #     if self.measurement_start_time is None:
    #         measurement_start_time = self.time_first_flow_arrived
    #     elif self.measurement_start_time == 'auto' and self.measurement_end_time == 'auto':
    #         # both start and end must be assigned simultaneously
    #         self.measurement_start_time = 0.1 * self.time_last_flow_arrived
    #         self.measurement_end_time = self.time_last_flow_arrived
    #         # update arrived flows to be within measurement duration
    #         self._init_flow_arrival_metrics()
    #         measurement_start_time = self.measurement_start_time
    #         measurement_end_time = self.measurement_end_time
    #     elif self.measurement_start_time == 'auto' and self.measurement_end_time != 'auto':
    #         self.measurement_start_time = 0.1 * self.time_last_flow_arrived
    #         self._init_flow_arrival_metrics()
    #         measurement_start_time = self.measurement_start_time
    #     else:
    #         measurement_start_time = self.measurement_start_time
    #
    #     if self.measurement_end_time is None:
    #         measurement_end_time = self.env.curr_time  # time sim was ended
    #     elif self.measurement_end_time == 'auto' and self.measurement_start_time != 'auto':
    #         self.measurement_start_time = self.time_last_flow_arrived
    #         self._init_flow_arrival_metrics()
    #         measurement_start_time = self.measurement_start_time
    #     else:
    #         measurement_end_time = self.measurement_end_time
    #
    #     measurement_duration = measurement_end_time - measurement_start_time
    #     return measurement_duration, measurement_start_time, measurement_end_time

    def _get_flows_remaining_in_queue_at_end_of_measurement_period(self):
        queued_flow_dicts = {}

        # get flows that were dropped during measurement period -> these won't be in queue
        dropped_flow_dicts = self._get_flows_dropped_in_measurement_period(count_flows_left_in_queue=False)

        # create dicts to enable efficient hash searching
        if self.env_analyser_database_path is not None:
            with SqliteDict(self.completed_flow_dicts) as completed_flow_dicts:
                completed_flows = list(completed_flow_dicts.values())
        else:
            completed_flows = list(self.completed_flow_dicts.values())
        if 'unique_id' in completed_flows[0]:
            completed_flow_ids = {completed_flows[i]['unique_id']: i for i in range(len(completed_flows))}
            dropped_flows = list(dropped_flow_dicts.values())
            dropped_flow_ids = {dropped_flows[i]['unique_id']: i for i in range(len(dropped_flows))}
        else:
            # no unique id included in flow dict
            completed_flow_ids = {completed_flows[i]['flow_id']: i for i in range(len(completed_flows))}
            dropped_flows = list(dropped_flow_dicts.values())
            dropped_flow_ids = {dropped_flows[i]['flow_id']: i for i in range(len(dropped_flows))}

        if self.env_analyser_database_path is not None:
            arrived_flow_dicts = SqliteDict(self.arrived_flow_dicts)
        else:
            arrived_flow_dicts = self.arrived_flow_dicts
        for flow_id, flow in arrived_flow_dicts.items():
            if flow_id not in completed_flow_ids and flow_id not in dropped_flow_ids:
                queued_flow_dicts[flow_id] = flow
        if self.env_analyser_database_path is not None:
            arrived_flow_dicts.close()

        return queued_flow_dicts

    def _get_flows_dropped_in_measurement_period(self, count_flows_left_in_queue=True):
        '''Find all flows which arrived during the measurement period and were dropped.

        If count_flows_left_in_queue, will count flows left in the queue at the
        end of the measurement period as having been dropped.
        '''
        if type(self.env.dropped_flow_dicts) is str:
            # load database
            env_dropped_flow_dicts = SqliteDict(self.env.dropped_flow_dicts)
        else:
            env_dropped_flow_dicts = self.env.dropped_flow_dicts

        dropped_flow_dicts = {}
        if self.measurement_start_time is None and self.measurement_end_time is None:
            return env_dropped_flow_dicts
        elif self.measurement_start_time is None and self.measurement_end_time is not None:
            for flow_id, flow in env_dropped_flow_dicts.items():
                arr_time = flow['time_arrived']
                if arr_time <= self.measurement_end_time:
                    dropped_flow_dicts[flow_id] = flow
                else:
                    # cooling down
                    pass
        elif self.measurement_start_time is not None and self.measurement_end_time is None:
            for flow_id, flow in env_dropped_flow_dicts.items():
                arr_time = flow['time_arrived']
                if arr_time >= self.measurement_start_time:
                    dropped_flow_dicts[flow_id] = flow
                else:
                    # warming up
                    pass
        else:
            for flow_id, flow in env_dropped_flow_dicts.items():
                arr_time = flow['time_arrived']
                if arr_time < self.measurement_start_time:
                    # warming up
                    pass
                elif arr_time >= self.measurement_start_time and arr_time <= self.measurement_end_time:
                    # measure
                    dropped_flow_dicts[flow_id] = flow
                else:
                    # cooling down
                    pass

        if count_flows_left_in_queue:
            if self.env_analyser_database_path is not None:
                with SqliteDict(self.queued_flow_dicts) as queued_flow_dicts:
                    for flow in queued_flow_dicts.values():
                        if 'unique_id' in flow:
                            dropped_flow_dicts[flow['unique_id']] = flow
                        else:
                            # no unique id included in flow dict
                            dropped_flow_dicts[flow['flow_id']] = flow
            else:
                for flow in self.queued_flow_dicts.values():
                    if 'unique_id' in flow:
                        dropped_flow_dicts[flow['unique_id']] = flow
                    else:
                        # no unique id included in flow dict
                        dropped_flow_dicts[flow['flow_id']] = flow

        if type(self.env.dropped_flow_dicts) is str:
            env_dropped_flow_dicts.close()

        return dropped_flow_dicts

    def _get_flows_completed_in_measurement_period(self):
        '''Find all flows which arrived during the measurement period and were completed.'''
        if type(self.env.completed_flow_dicts) is str:
            # load database
            env_completed_flow_dicts = SqliteDict(self.env.completed_flow_dicts)
        else:
            env_completed_flow_dicts = self.env.completed_flow_dicts

        completed_flow_dicts = {}
        if self.measurement_start_time is None and self.measurement_end_time is None:
            return env_completed_flow_dicts
        elif self.measurement_start_time is None and self.measurement_end_time is not None:
            for flow_id, flow in env_completed_flow_dicts.items():
                comp_time = flow['time_completed']
                if comp_time <= self.measurement_end_time:
                    completed_flow_dicts[flow_id] = flow
                else:
                    # cooling down
                    pass
        elif self.measurement_start_time is not None and self.measurement_end_time is None:
            for flow_id, flow in env_completed_flow_dicts.items():
                arr_time, comp_time = flow['time_arrived'], flow['time_completed']
                if comp_time >= self.measurement_start_time and arr_time >= self.measurement_start_time:
                    completed_flow_dicts[flow_id] = flow
                else:
                    # warming up
                    pass
        else:
            for flow_id, flow in env_completed_flow_dicts.items():
                arr_time, comp_time = flow['time_arrived'], flow['time_completed']
                if comp_time < self.measurement_start_time or arr_time < self.measurement_start_time:
                    # warming up
                    pass
                elif comp_time >= self.measurement_start_time and comp_time <= self.measurement_end_time and arr_time >= self.measurement_start_time:
                    # measure
                    completed_flow_dicts[flow_id] = flow
                elif comp_time > self.measurement_end_time:
                    # cooling down
                    pass

        if type(self.env.completed_flow_dicts) is str:
            env_completed_flow_dicts.close()

        return completed_flow_dicts

    def _get_flows_arrived_in_measurement_period(self):
        '''Find flows which arrived during the measurement period.'''
        if type(self.env.arrived_flow_dicts) is str:
            # load database
            env_arrived_flow_dicts = SqliteDict(self.env.arrived_flow_dicts)
        else:
            env_arrived_flow_dicts = self.env.arrived_flow_dicts

        flows_arrived = {}
        if self.measurement_start_time is None and self.measurement_end_time is None:
            return env_arrived_flow_dicts
        elif self.measurement_start_time == 'auto' and self.measurement_end_time == 'auto':
            # assume all arrived for now, will update later
            return env_arrived_flow_dicts
        elif self.measurement_start_time is None and self.measurement_end_time is not None:
            for flow_id, flow in env_arrived_flow_dicts.items():
                arr_time = flow['time_arrived']
                if arr_time <= self.measurement_end_time:
                    flows_arrived[flow_id] = flow
                else:
                    # cooling down
                    pass
        elif self.measurement_start_time is not None and self.measurement_end_time is None:
            for flow_id, flow in env_arrived_flow_dicts.items():
                arr_time = flow['time_arrived']
                if arr_time >= self.measurement_start_time:
                    flows_arrived[flow_id] = flow
                else:
                    # warming up
                    pass
        else:
            for flow_id, flow in env_arrived_flow_dicts.items():
                arr_time = flow['time_arrived']
                if arr_time < self.measurement_start_time:
                    # warming up
                    pass
                elif arr_time >= self.measurement_start_time and arr_time <= self.measurement_end_time:
                    # measure
                    flows_arrived[flow_id] = flow
                elif arr_time > self.measurement_end_time:
                    # cooling down
                    pass

        if type(self.env.arrived_flow_dicts) is str:
            env_arrived_flow_dicts.close()

        return flows_arrived

    #################################### JOB ##################################

    def _print_job_summary(self):
        print('\n ~* Job Information *~')
        print('Total number of generated jobs passed to env: {}'.format(self.env.num_demands))
        print('Total number of these jobs which arrived during measurement period: {}'.format(self.num_arrived_jobs))
        print('Time first job arrived: {} {}'.format(self.time_first_job_arrived, self.time_units))
        print('Time last job arrived: {} {}'.format(self.time_last_job_arrived, self.time_units))
        print('Total number of jobs that were completed: {}'.format(self.num_completed_jobs))
        print('Total number of jobs that were left in queue at end of measurement period: {}'.format(self.num_queued_jobs))
        print('Total number of jobs that were dropped (dropped + left in queue at end of measurement period): {}'.format(self.num_dropped_jobs))
        print('Fraction of arrived jobs dropped: {}'.format(self.dropped_job_frac))
        print('Mean job completion time (JCT): {} {}'.format(self.mean_jct, self.time_units))
        print('99th percentile JCT: {} {}'.format(self.nn_jct, self.time_units))

    def _compute_job_summary(self):
        self._compute_job_arrival_metrics()
        self._compute_job_completion_metrics()
        self._compute_job_queued_metrics()
        self._compute_job_dropped_metrics()

    def _compute_job_arrival_metrics(self):
        print('Computing job arrival metrics for env {}...'.format(self.env.sim_name))
        start = time.time()

        self._init_job_arrival_metrics()
        self.measurement_duration, self.measurement_start_time, self.measurement_end_time = self._get_measurement_times()

        end = time.time()
        print('Computed job arrival metrics for env {} in {} s.'.format(self.env.sim_name, end - start))

    def _compute_job_completion_metrics(self):
        print('Computing job completion metrics for env {}...'.format(self.env.sim_name))
        start = time.time()

        jcts = []
        if self.env_analyser_database_path is not None:
            # init database
            self.completed_job_dicts = self.env_analyser_database_path + '/completed_job_dicts.sqlite'
            if os.path.exists(self.completed_job_dicts):
                os.remove(self.completed_job_dicts)
            with SqliteDict(self.completed_job_dicts) as completed_job_dicts:
                for key, val in self._get_jobs_completed_in_measurement_period().items():
                    completed_job_dicts[key] = val
                    time_arrived, time_completed = val['time_arrived'], val['time_completed']
                    jct = time_completed - time_arrived
                    jcts.append(jct)
                completed_job_dicts.commit()
        else:
            # load into memory
            self.completed_job_dicts = self._get_jobs_completed_in_measurement_period()
            for job in self.completed_job_dicts.values():
                jct = job['time_completed'] - job['time_arrived']
                jcts.append(jct)

        self.num_completed_jobs = len(jcts)
        self.mean_jct, self.nn_jct, self.max_jct, self.std_jct = self._calc_job_completion_times(jcts)

        end = time.time()
        print('Computed job completion metrics for env {} in {} s.'.format(self.env.sim_name, end - start))

    def _compute_job_dropped_metrics(self):
        print('Computing job dropped metrics for env {}...'.format(self.env.sim_name))
        start = time.time()

        dropped_jobs = self._get_jobs_dropped_in_measurement_period()
        if self.env_analyser_database_path is not None:
            self.dropped_job_dicts = self.env_analyser_database_path + '/dropped_job_dicts.sqlite'
            with SqliteDict(self.dropped_job_dicts) as dropped_job_dicts:
                for key, val in dropped_jobs.items():
                    dropped_job_dicts[key] = val
                dropped_job_dicts.commit()
        else:
            self.dropped_job_dicts = dropped_jobs

        self.num_dropped_jobs = len(list(dropped_jobs.keys()))
        self.dropped_job_frac = self.num_dropped_jobs / self.num_arrived_jobs

        end = time.time()
        print('Computed job dropped metrics for env {} in {} s.'.format(self.env.sim_name, end - start))

    def _compute_job_queued_metrics(self):
        print('Computing job queued metrics for env {}...'.format(self.env.sim_name))
        start = time.time()

        queued_jobs = self._get_jobs_remaining_in_queue_at_end_of_measurement_period()
        if self.env_analyser_database_path is not None:
            self.queued_job_dicts = self.env_analyser_database_path + '/queued_job_dicts.sqlite'
            with SqliteDict(self.queued_job_dicts) as queued_job_dicts:
                for key, val in queued_jobs.items():
                    queued_job_dicts[key] = val
                queued_job_dicts.commit()
        else:
            self.queued_job_dicts = queued_jobs
        self.num_queued_jobs = len(list(queued_jobs.keys()))

        end = time.time()
        print('Computed job queued metrics for env {} in {} s.'.format(self.env.sim_name, end - start))

    def _get_jobs_remaining_in_queue_at_end_of_measurement_period(self):
        queued_job_dicts = {}

        # get jobs that were dropped during measurement period -> these won't be in queue
        dropped_job_dicts = self._get_jobs_dropped_in_measurement_period(count_jobs_left_in_queue=False)

        # create dicts to enable efficient hash searching
        if self.env_analyser_database_path is not None:
            with SqliteDict(self.completed_job_dicts) as completed_job_dicts:
                completed_jobs = list(completed_job_dicts.values())
        else:
            completed_jobs = list(self.completed_job_dicts.values())
        if 'unique_id' in completed_jobs[0]:
            completed_job_ids = {completed_jobs[i]['unique_id']: i for i in range(len(completed_jobs))}
            dropped_jobs = list(dropped_job_dicts.values())
            dropped_job_ids = {dropped_jobs[i]['unique_id']: i for i in range(len(dropped_jobs))}
        else:
            # no unique id included in job dict
            completed_job_ids = {completed_jobs[i]['job_id']: i for i in range(len(completed_jobs))}
            dropped_jobs = list(dropped_job_dicts.values())
            dropped_job_ids = {dropped_jobs[i]['job_id']: i for i in range(len(dropped_jobs))}

        if self.env_analyser_database_path is not None:
            arrived_job_dicts = SqliteDict(self.arrived_job_dicts)
        else:
            arrived_job_dicts = self.arrived_job_dicts
        for job_id, job in arrived_job_dicts.items():
            if job_id not in completed_job_ids and job_id not in dropped_job_ids:
                queued_job_dicts[job_id] = job
        if self.env_analyser_database_path is not None:
            arrived_job_dicts.close()

        return queued_job_dicts

    def _get_jobs_arrived_in_measurement_period(self):
        '''Find jobs which arrived during the measurement period.'''
        if type(self.env.arrived_job_dicts) is str:
            # load database
            env_arrived_job_dicts = SqliteDict(self.env.arrived_job_dicts)
        else:
            env_arrived_job_dicts = self.env.arrived_job_dicts

        jobs_arrived = {}
        if self.measurement_start_time is None and self.measurement_end_time is None:
            return env_arrived_job_dicts
        elif self.measurement_start_time == 'auto' and self.measurement_end_time == 'auto':
            # assume all arrived for now, will update later
            return env_arrived_job_dicts
        elif self.measurement_start_time is None and self.measurement_end_time is not None:
            for job_id, job in env_arrived_job_dicts.items():
                arr_time = job['time_arrived']
                if arr_time <= self.measurement_end_time:
                    jobs_arrived[job_id] = job
                else:
                    # cooling down
                    pass
        elif self.measurement_start_time is not None and self.measurement_end_time is None:
            for job_id, job in env_arrived_job_dicts.items():
                arr_time = job['time_arrived']
                if arr_time >= self.measurement_start_time:
                    jobs_arrived[job_id] = job
                else:
                    # warming up
                    pass
        else:
            for job_id, job in env_arrived_job_dicts.items():
                arr_time = job['time_arrived']
                if arr_time < self.measurement_start_time:
                    # warming up
                    pass
                elif arr_time >= self.measurement_start_time and arr_time <= self.measurement_end_time:
                    # measure
                    jobs_arrived[job_id] = job
                elif arr_time > self.measurement_end_time:
                    # cooling down
                    pass

        if type(self.env.arrived_job_dicts) is str:
            env_arrived_job_dicts.close()

        return jobs_arrived

    def _get_jobs_completed_in_measurement_period(self):
        '''Find all jobs which arrived during the measurement period and were completed.'''
        if type(self.env.completed_job_dicts) is str:
            # load database
            env_completed_job_dicts = SqliteDict(self.env.completed_job_dicts)
        else:
            env_completed_job_dicts = self.env.completed_job_dicts

        completed_job_dicts = {}
        if self.measurement_start_time is None and self.measurement_end_time is None:
            return env_completed_job_dicts
        elif self.measurement_start_time is None and self.measurement_end_time is not None:
            for job_id, job in env_completed_job_dicts.items():
                comp_time = job['time_completed']
                if comp_time <= self.measurement_end_time:
                    completed_job_dicts[job_id] = job
                else:
                    # cooling down
                    pass
        elif self.measurement_start_time is not None and self.measurement_end_time is None:
            for job_id, job in env_completed_job_dicts.items():
                arr_time, comp_time = job['time_arrived'], job['time_completed']
                if comp_time >= self.measurement_start_time and arr_time >= self.measurement_start_time:
                    completed_job_dicts[job_id] = job
                else:
                    # warming up
                    pass
        else:
            for job_id, job in env_completed_job_dicts.items():
                arr_time, comp_time = job['time_arrived'], job['time_completed']
                if comp_time < self.measurement_start_time or arr_time < self.measurement_start_time:
                    # warming up
                    pass
                elif comp_time >= self.measurement_start_time and comp_time <= self.measurement_end_time and arr_time >= self.measurement_start_time:
                    # measure
                    completed_job_dicts[job_id] = job
                elif comp_time > self.measurement_end_time:
                    # cooling down
                    pass

        if type(self.env.completed_job_dicts) is str:
            env_completed_job_dicts.close()

        return completed_job_dicts

    def _calc_job_completion_times(self, job_completion_times):
        if len(job_completion_times) == 0:
            mean_jct, ninetyninth_percentile_jct, max_jct, standard_deviation_jct = float('inf'), float('inf'), float('inf'), float('inf')
        else:
            mean_jct = np.average(np.asarray(job_completion_times))
            ninetyninth_percentile_jct = np.percentile(np.asarray(job_completion_times), 99)
            max_jct = np.max(np.asarray(job_completion_times))
            standard_deviation_jct = np.std(job_completion_times)
        return mean_jct, ninetyninth_percentile_jct, max_jct, standard_deviation_jct

    def _get_jobs_dropped_in_measurement_period(self, count_jobs_left_in_queue=True):
        '''Find all jobs which arrived during the measurement period and were dropped.

        If count_jobs_left_in_queue, will count jobs left in the queue at the
        end of the measurement period as having been dropped.
        '''
        if type(self.env.dropped_job_dicts) is str:
            # load database
            env_dropped_job_dicts = SqliteDict(self.env.dropped_job_dicts)
        else:
            env_dropped_job_dicts = self.env.dropped_job_dicts

        dropped_job_dicts = {}
        if self.measurement_start_time is None and self.measurement_end_time is None:
            return env_dropped_job_dicts
        elif self.measurement_start_time is None and self.measurement_end_time is not None:
            for job_id, job in env_dropped_job_dicts.items():
                arr_time = job['time_arrived']
                if arr_time <= self.measurement_end_time:
                    dropped_job_dicts[job_id] = job
                else:
                    # cooling down
                    pass
        elif self.measurement_start_time is not None and self.measurement_end_time is None:
            for job_id, job in env_dropped_job_dicts.items():
                arr_time = job['time_arrived']
                if arr_time >= self.measurement_start_time:
                    dropped_job_dicts[job_id] = job
                else:
                    # warming up
                    pass
        else:
            for job_id, job in env_dropped_job_dicts.items():
                arr_time = job['time_arrived']
                if arr_time < self.measurement_start_time:
                    # warming up
                    pass
                elif arr_time >= self.measurement_start_time and arr_time <= self.measurement_end_time:
                    # measure
                    dropped_job_dicts[job_id] = job
                else:
                    # cooling down
                    pass

        if count_jobs_left_in_queue:
            if self.env_analyser_database_path is not None:
                with SqliteDict(self.queued_job_dicts) as queued_job_dicts:
                    for job in queued_job_dicts.values():
                        if 'unique_id' in job:
                            dropped_job_dicts[job['unique_id']] = job
                        else:
                            # no unique id included in job dict
                            dropped_job_dicts[job['job_id']] = job
            else:
                for job in self.queued_job_dicts.values():
                    if 'unique_id' in job:
                        dropped_job_dicts[job['unique_id']] = job
                    else:
                        # no unique id included in job dict
                        dropped_job_dicts[job['job_id']] = job

        if type(self.env.dropped_job_dicts) is str:
            env_dropped_job_dicts.close()

        return dropped_job_dicts

    def _init_job_arrival_metrics(self):
        if self.env_analyser_database_path is not None:
            # init database
            self.arrived_job_dicts = self.env_analyser_database_path + '/arrived_job_dicts.sqlite'
            if os.path.exists(self.arrived_job_dicts):
                os.remove(self.arrived_job_dicts)
            times_arrived = []
            with SqliteDict(self.arrived_job_dicts) as arrived_job_dicts:
                for key, val in self._get_jobs_arrived_in_measurement_period().items():
                    arrived_job_dicts[key] = val
                    times_arrived.append(val['time_arrived'])
                arrived_job_dicts.commit()
        else:
            # load into memory
            self.arrived_job_dicts = self._get_jobs_arrived_in_measurement_period()
            arrived_jobs = list(self.arrived_job_dicts.values())
            times_arrived = [arrived_jobs[i]['time_arrived'] for i in range(len(arrived_jobs))]

        self.num_arrived_jobs = len(times_arrived)
        self.time_first_job_arrived = min(times_arrived)
        self.time_last_job_arrived = max(times_arrived)
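
    # The _get_*_in_measurement_period helpers above all apply the same
    # warm-up/cool-down window to a demand's arrival (and, for completions,
    # completion) time. A hypothetical helper distilling that rule (a sketch,
    # not part of this module; a None bound is treated as unbounded):
    #
    #   def _in_measurement_window(t, start, end):
    #       return (start is None or t >= start) and (end is None or t <= end)
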
    # def _get_job_measurement_times(self):
    #     if self.measurement_start_time is None:
    #         measurement_start_time = self.time_first_job_arrived
    #     elif self.measurement_start_time == 'auto' and self.measurement_end_time == 'auto':
    #         # both start and end must be assigned simultaneously
    #         self.measurement_start_time = 0.1 * self.time_last_job_arrived
    #         self.measurement_end_time = self.time_last_job_arrived
    #         # update arrived jobs to be within measurement duration
    #         self._init_job_arrival_metrics()
    #         measurement_start_time = self.measurement_start_time
    #         measurement_end_time = self.measurement_end_time
    #     elif self.measurement_start_time == 'auto' and self.measurement_end_time != 'auto':
    #         self.measurement_start_time = 0.1 * self.time_last_job_arrived
    #         self._init_job_arrival_metrics()
    #         measurement_start_time = self.measurement_start_time
    #     else:
    #         measurement_start_time = self.measurement_start_time
    #
    #     if self.measurement_end_time is None:
    #         measurement_end_time = self.env.curr_time  # time sim ended
    #     elif self.measurement_end_time == 'auto' and self.measurement_start_time != 'auto':
    #         self.measurement_start_time = self.time_last_job_arrived
    #         self._init_job_arrival_metrics()
    #         measurement_start_time = self.measurement_start_time
    #     else:
    #         measurement_end_time = self.measurement_end_time
    #
    #     measurement_duration = measurement_end_time - measurement_start_time
    #     print('measurement end time: {} | curr sim time: {}'.format(self.measurement_end_time, self.env.curr_time))
    #     return measurement_duration, measurement_start_time, measurement_end_time
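
# Recap of the T-score assembled in _compute_t_score above (notation only, no
# new behaviour): with sizes the arrived flow sizes and ep_cap the endpoint
# link capacity ('ep_link_capacity'),
#
#   fct_component        = (mean(sizes)/ep_cap) / mean_fct
#                          + (std(sizes)/ep_cap) / std_fct
#   dropped_component    = 1 - dropped_flow_frac * max_flows * num_eps * (num_eps - 1)
#   throughput_component = throughput_abs / max_nw_capacity
#   t_score              = fct_component + dropped_component + throughput_component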