Source code for trafpy.benchmarker.versions.benchmark

import trafpy
from trafpy.generator.src.tools import load_data_from_json, save_data_as_json

import abc
import os
from pathlib import Path
import json
import numpy as np


[docs]class Benchmark(abc.ABC): def __init__(self, benchmark_name, benchmark_version='v001', load_prev_dists=True, jobcentric=False): ''' Args: benchmark_name (str): Name of benchmark (e.g. 'university') benchmark_version (str): TrafPy benchmark version to access (e.g. 'v001'). load_prev_dists (bool): If True, will generate a new benchmark distribution for the network(s) you provide the imported. This is needed if you have a network with a different number of endpoints or with different end point labels. If False, will load the exact same distributions as was previously defined, which is needed if you want to use the exact same benchmark distribution multiple times. jobcentric (bool): Whether or not the benchmark traffic is job-centric (True) or flow-centric (False). ''' self.benchmark_name = benchmark_name self.benchmark_version = benchmark_version self.load_prev_dists = load_prev_dists self.jobcentric = jobcentric trafpy_path = os.path.dirname(trafpy.__file__) # self.benchmark_path = trafpy_path + '/benchmarker/versions/benchmark_{}/data/{}/'.format(benchmark_version, benchmark_name) self.benchmark_path = trafpy_path + '/benchmarker/versions/benchmark_{}/benchmarks/{}/'.format(benchmark_version, benchmark_name) Path(self.benchmark_path).mkdir(exist_ok=True) if load_prev_dists: print('Set to load benchmark {} distribution data from {}'.format(benchmark_name, self.benchmark_path)) else: print('Set to save benchmark {} distribution data to {}'.format(benchmark_name, self.benchmark_path))
[docs] def load_dist(self, benchmark_name, dist_name): '''Loads previously saved distribution data for given benchmark.''' # check if dists already previously saved path = self.benchmark_path+'{}.json'.format(dist_name) if os.path.exists(path): dist_data = json.loads(load_data_from_json(path_to_load=path, print_times=False)) if type(dist_data) == dict: # convert keys (rand var unique values) from str to float dist_data = {float(key): dist_data[key] for key in dist_data.keys()} else: # convert list to numpy array dist_data = np.asarray(dist_data) print('Loaded {} distribution data from {}'.format(dist_name, path)) else: dist_data = None return dist_data, path
[docs] def get_dist_and_path(self, dist_name): '''Gets distribution data and corresponding path. If distribution data does not exist, will return dist=None and the path will be where the dist data should be saved if it is generated. ''' dist_data = None if self.load_prev_dists: # load dist and get path dist saved in dist_data, path = self.load_dist(self.benchmark_name, dist_name=dist_name) else: # just get path for saving path = self.benchmark_path+'{}.json'.format(dist_name) return dist_data, path
[docs] def save_dist(self, dist_data, dist_name): '''Saves distribution data for a given benchmark.''' save_data_as_json(path_to_save=self.benchmark_path+'{}.json'.format(dist_name), data=dist_data, overwrite=True, print_times=False) print('Saved {} distribution data to {}'.format(dist_name, self.benchmark_path))
[docs] @abc.abstractmethod def get_node_dist(self, eps, racks_dict, dist_name='node_dist'): '''Loads previously saved node dist (if it exists). This is an abstract method and therefore must be defined by any child class. Args: eps (list): List of network end points. racks_dict (dict): Dict mapping racks to the corresponding end points contained within each rack. dist_name (str): Name of distribution (determines path to search for previously saved distribution). ''' dist, path = self.get_dist_and_path(dist_name) if dist is not None: if len(eps) != len(dist): raise Exception('You provided len(eps)={} end points but the node distribution used has len(node_dist)={} end points. This is likely because you have left load_prev_dists=True but you are now trying to generate traffic for a network with a different number of end points. Set load_prev_dists=False or ensure len(eps) == len(node_dist)'.format(len(eps), len(dist))) return dist, path
[docs] @abc.abstractmethod def get_interarrival_time_dist(self, dist_name='interarrival_time_dist'): '''Loads previously saved interarrival time dist (if it exists). This is an abstract method and therefore must be defined by any child class. Args: dist_name (str): Name of distribution (determines path to search for previously saved distribution). ''' dist, path = self.get_dist_and_path(dist_name) return dist, path
[docs] @abc.abstractmethod def get_flow_size_dist(self, dist_name='flow_size_dist'): '''Loads previously saved flow size dist (if it exists). This is an abstract method and therefore must be defined by any child class. Args: dist_name (str): Name of distribution (determines path to search for previously saved distribution). ''' dist, path = self.get_dist_and_path(dist_name) return dist, path
[docs] def get_num_ops_dist(self, dist_name='num_ops_dist'): '''Loads previously saved number of operations dist (if it exists). This method only needs to be defined by a child class if jobcentric=True, since flow-centric data have no notion of 'number of operations' (in relation to job DAGs). Args: dist_name (str): Name of distribution (determines path to search for previously saved distribution). ''' if self.jobcentric: # must implement this method if jobcentric raise NotImplementedError('If jobcentric==True, must implement get_num_ops_dist method') else: # no need to implement for flow centric pass dist, path = self.get_dist_and_path(dist_name) return dist, path