diff --git a/src/burst_tests/first/topo b/src/burst_tests/first/topo new file mode 100644 index 0000000..be0ac21 --- /dev/null +++ b/src/burst_tests/first/topo @@ -0,0 +1,7 @@ +desc:Simple configuration with two para link +topoType:MultiIf +leftSubnet:10.0. +rightSubnet:10.1. +#path_x:delay,queueSize(may be calc),bw +path_0:10,10,5 +path_1:1000,40,5 diff --git a/src/burst_tests/first/validation.yml b/src/burst_tests/first/validation.yml new file mode 100644 index 0000000..893c105 --- /dev/null +++ b/src/burst_tests/first/validation.yml @@ -0,0 +1,5 @@ +checkers: + mptcptrace: + tcptrace: +aggregators: + - burst_blocks diff --git a/src/burst_tests/first/xp b/src/burst_tests/first/xp new file mode 100644 index 0000000..ec29c38 --- /dev/null +++ b/src/burst_tests/first/xp @@ -0,0 +1,14 @@ +xpType:ncpv +ncClientPort_0:33400 +clientPcap:yes +pvRateLimit:64k +ddCount:1000 +kpmc:fullmesh + +#kpmc:netlink +#upmc:fullmeshIfNeededRate +#upmc_args: -c 500 -i 1000 -t 60000 + +kpms:fullmesh +pvG:1000000 +pvZ:1000000 diff --git a/src/conf/xp/4_nc b/src/conf/xp/4_nc index 5740ad4..f2cffd1 100644 --- a/src/conf/xp/4_nc +++ b/src/conf/xp/4_nc @@ -1,5 +1,11 @@ -xpType:nc +xpType:ncpv +rmem:300000 300000 300000 ncClientPort_0:33400 clientPcap:yes -ddCount:15000 -rmem:300000 300000 300000 +pvRateLimit:600k +ddCount:10000 +kpm:fullmesh +kpms:netlink +kpmc:netlink +upmc:fullmesh +#upmc_args: -n 5 diff --git a/src/mpBurstBlocks.py b/src/mpBurstBlocks.py new file mode 100644 index 0000000..11a4e7f --- /dev/null +++ b/src/mpBurstBlocks.py @@ -0,0 +1,194 @@ +import numpy as np +import os as os +import matplotlib.pyplot as plt + +class BurstBlocksAggregator: + def __init__(self, yml, test_name, dest_dir): + # csv_file="c2s_seq_1.csv" + # minimum delay to observe to identify the beginning of a block + self.min_block_sep = 0.3 + self.headers = [ "ts", "map_begin", "subflow", "is_seq", "map_end", "is_reinject" ] + self.log = open(dest_dir+"/burst_block_aggregator.log","w") + self.csv_file=dest_dir+"/"+"c2s_seq_1.csv" + self.a = np.genfromtxt (self.csv_file, delimiter=",") + self.blocks=[] + self.times=[] + self.packets_per_flow=[] + self.flows_ratios = [] + self.subflows=[] + self.flows_ratios=[] + self.extract_blocks() + self.extract_times() + self.extract_flows_packets() + self.extract_flows_ratios() + + def c(self,column): + """Return column index corresponding to name passed as argument""" + return self.headers.index(column) +# def extract_blocks(self): +# # beginning of block. First block starts at packet 0 +# b=0 +# # iteration, we can start at packet 1 +# i=1 +# previous=0 +# while i>self.log, "previous seq packet:", "{:10.8f}".format(self.a[previous][self.c("ts")]), "seq:", self.a[previous][self.c("map_begin")] +# print >>self.log, "found block starting at ", "{:10.8f}".format(self.a[i][self.c("ts")]), "seq:", self.a[i][self.c("map_begin")] +# # we know the start of the block and look for its last packet +# elif self.a[i][self.c("ts")]-self.a[previous][self.c("ts")]>self.min_block_sep: +# print >>self.log, "next block:", "{:10.8f}".format(self.a[i+1][self.c("ts")]), "seq:", self.a[i+1][self.c("map_begin")] +# print >>self.log,"--------------------------------------" +# # the ranges we use here are inclusive, ie the range contains both elements. +# self.blocks.append((b,previous)) +# b=i +# # keep track of previous seq packet +# previous=i +# i=i+1 +# self.blocks.append((b,previous)) +# print >>self.log, "# blocks: ", len(self.blocks) +# detect blocks based on number of bytes sent + +#!!!!!!!!!!!!!!!!!!!!!!!!!!! +# buggy code !!!!!!!!!!!!!!! +#!!!!!!!!!!!!!!!!!!!!!!!!!!! + def extract_blocks(self): + # beginning of block. First block starts at packet 0 + b=0 + # iteration, we can start at packet 1 + i=1 + previous_mod=0 + first_seq=self.a[i][self.c("map_begin")] + previous_seq=None + while iself.a[previous_seq][self.c("map_begin")]: + print >>self.log, "found block beginning at ", "{:10.8f}".format(self.a[i][self.c("ts")]), "end seq:", self.a[i][self.c("map_end")] + print >>self.log,"--------------------------------------" + self.blocks.append((b,previous_seq)) + b = i + # keep track of previous seq packet + #print >>self.log, "recording previous seq at time ", "{:10.8f}".format(self.a[i][self.c("ts")]), "and end_seq:", self.a[i][self.c("map_end")] + if self.a[i][self.c("map_begin")]>self.a[previous_seq][self.c("map_begin")]: + previous_mod=(self.a[i][self.c("map_end")]-self.a[0][self.c("map_begin")])%65536 + previous_seq=i + i=i+1 + print "last block previous_seq = " + str(previous_seq) + print >>self.log, "Rcording last block end at time ", "{:10.8f}".format(self.a[previous_seq][self.c("ts")]), "and end_seq:", self.a[previous_seq][self.c("map_end")] + self.blocks.append((b,previous_seq)) + print self.blocks + print >>self.log, "# blocks: ", len(self.blocks) + def extract_times(self): + for i in range(len(self.blocks)): + print >>self.log, "Block " + str(i) + print >>self.log, "---------------------" + first,last = self.blocks[i] + print >>self.log, "first packet[" + str(first) +"] at:", "{:10.6f}".format(self.a[first][self.c("ts")]), "seq:", self.a[first][self.c("map_begin")] + print >>self.log, "last packet [" + str(last) +"] at :", "{:10.6f}".format(self.a[last][self.c("ts")]), "seq:", self.a[last][self.c("map_begin")] + t1 = self.a[first][self.c("ts")] + # +1 because our ranges are inclusive + packets = self.a[first:last+1] + biggest_seq_index=self.find_biggest_seq_in_block(packets) + biggest_seq = packets[biggest_seq_index][self.c("map_end")] + print >>self.log, "biggest_seq = " + str(biggest_seq) + ack_index, ack_packet=self.find_ack_for_seq(biggest_seq, biggest_seq_index) + print >>self.log, "ack time = " + "{:10.6f}".format(self.a[ack_index][self.c("ts")]) + print >>self.log, "ack index = " + str(ack_index) + print >>self.log, "block time = " + "{:10.6f}".format(ack_packet[self.c("ts")] - packets[0][self.c("ts")]) + self.times.append([first, ack_index, ack_packet[self.c("ts")] - packets[0][self.c("ts")] , packets[0][self.c("ts")], ack_packet[self.c("ts")] ]) + print >>self.log, "############################" + print >>self.log, "---------------------------------------------" + print >>self.log, "block times = " + str(self.times) + self.times = np.array(self.times) + np.set_printoptions(precision=6) + block_times= self.times[:,2] + self.block_times=block_times + # this was to drop the smallest and biggest values from the mean + # block_times.sort() + # self.block_times=block_times[1:-2] + def extract_flows_packets(self): + for i in range(len(self.blocks)): + # will hold number of packets per flow for this block + r={} + print >>self.log, "Block " + str(i) + print >>self.log, "---------------------" + first,last = self.blocks[i] + # +1 because our ranges are inclusive + packets = self.a[first:last+1] + for p in packets: + if p[self.c("is_seq")]==0: + continue + flow = int(p[self.c("subflow")]) + if flow in r.keys(): + r[flow]+=1 + else: + r[flow]=1 + self.packets_per_flow.append(r) + print >>self.log, r + print >>self.log, "############################" + print >>self.log, "---------------------------------------------" + # now set values to 0 as needed for block that didn't send on some subflows + sublist = [ h.keys() for h in self.packets_per_flow] + all_subflows = list( set ( [item for sublist in self.packets_per_flow for item in sublist] )) + self.subflows= all_subflows + for h in self.packets_per_flow: + for f in all_subflows: + if not f in h.keys(): + h[f]=0 + + def extract_flows_ratios(self): + # reset value + self.flows_ratios = [] + # for each block compute the ratio + for count in self.packets_per_flow: + total_packets = sum(count.values()) + h = {} + for s in self.subflows: + h[s]=count[s]/float(total_packets) + self.flows_ratios.append(h) + + + def find_ack_for_seq(self, seq, start_index): + i=start_index + while iseq: + return (i,self.a[i]) + i=i+1 + print "Did not find ack for:" + print "seq : " + str(seq) + return None + + def find_biggest_seq_in_block(self, packets): + biggest_seq=-1 + j=0 + while jpackets[biggest_seq][self.c("map_begin")]: + biggest_seq=j + j=j+1 + print >>self.log, "biggest seq in block: " + str(packets[biggest_seq][self.c("map_end")]) + return biggest_seq + + def __del__(self): + self.log.close() + + def __str__(self): + s = str(self.block_times) + "\nmean:\t" + str(self.block_times.mean()) +"\nstd:\t"+ str(self.block_times.std()) + s+= "\nPackets per flow:\n" + s += str(self.packets_per_flow) + s+= "\nRatio of packets per flow:\n" + s += str(self.flows_ratios) + return s + def plot(self): + ratio1 = plt.plot([ h[1] for h in self.flows_ratios ] , label = "flow 1 ratio") + ratio2 = plt.plot([ h[2] for h in self.flows_ratios ] , label = "flow 2 ratio") + times = plt.plot(self.block_times, label = 'block time' ) + plt.legend(["ratio1", "ratio2", "times"]) + plt.show() diff --git a/src/mpIterator.py b/src/mpIterator.py new file mode 100755 index 0000000..66697af --- /dev/null +++ b/src/mpIterator.py @@ -0,0 +1,142 @@ +#!/usr/bin/python + +# apt-get install python-configglue + +import sys, getopt +from mpXpRunner import MpXpRunner +from mpTopo import MpTopo + +from shutil import copy +import os +from subprocess import call + +import datetime +# currently all checkers and validations and defined in this file +from mpValidations import * +from mpBurstBlocks import * + +from yaml import load, dump + +from optparse import OptionParser + + +# Define supported options +parser = OptionParser() +parser.add_option("-t", "--tests", dest="tests_dir", + help="Directory holding tests", metavar="TESTSDIR" , default="./tests") +parser.add_option("-l", "--logs", dest="logs_dir", + help="Directory where to log", metavar="LOGSDIR" , default="./logs") +parser.add_option("-r", "--repeat", dest="repeat", action="store_true", + help="Reuse existing logs", metavar="REPEAT" , default=False) + +(options, args) = parser.parse_args() + +# initialise flags values +tests_dir=options.tests_dir.rstrip("/") +logs_dir=options.logs_dir.rstrip("/") +repeat = options.repeat + +# take timestamp, used as subdirectory in logs_dir +timestamp=datetime.datetime.now().isoformat() + +if repeat: + print "not implemented" + timestamp="2015-06-01T14:57:31.617534" +#timestamp = "2015-05-26T15:42:45.419949" + +for test_name in [name for name in os.listdir(tests_dir) if os.path.isdir(os.path.join(tests_dir, name))]: + # initialise files defining the experience and test + test_dir = tests_dir + "/" + test_name + xpFile = test_dir+"/xp" + topoFile = test_dir+"/topo" + validation_file=test_dir+"/validation.yml" + destDir=logs_dir+"/"+timestamp+"/"+test_name + if not os.path.exists(destDir): + os.makedirs(destDir) + + print "Running " + test_dir + # run the experience + if not repeat: + MpXpRunner(MpTopo.mininetBuilder, topoFile, xpFile) + + #copy xp, topo and validation to log + copy(topoFile,destDir) + copy(xpFile,destDir) + copy(validation_file,destDir) + #copy log files + for l in ["client.pcap" ,"command.log" ,"upmc.log" ,"upms.log" ,"client.pcap" ,"netcat_server_0.log" ,"netcat_client_0.log"]: + copy(l,destDir) + + # Run validations + with open(validation_file, 'r') as f: + validations = load(f) + if validations!=None: + for k in validations["checkers"].keys(): + # Identify checker class + name = k.title().replace("_","")+"Checker" + klass= globals()[name] + # instantiate checker with validations and test_name + checker = klass(validations["checkers"], test_name, destDir) + if checker.check(): + print checker.logs + else: + print checker.logs + for k in validations["aggregators"]: + # Identify checker class + name = k.title().replace("_","")+"Aggregator" + klass= globals()[name] + # instantiate checker with validations and test_name + agg = klass(validations, test_name, destDir) + print agg + + + + + + +#tcptrace_checker = TcptraceChecker(validations, t ) +#print "WILL VALIDATE" +#tcptrace_checker.check() + +#for v in validations["tcptrace"]: +# print dump(v) + +# /usr/local/bin/mptcptrace -f /tmp/dest/client.pcap -G20 -F3 -r7 -s -S -a + + + +# Here are functions that can be used to generate topo and xp files: +#def write_entry(f, key, val): +# f.write("{}:{}\n".format(key,val)) +# +#def generateTopo(): +# path="/tmp/topo" +# f=open(path,"w") +# # delay, queueSize (in packets), bw +# write_entry(f, "path_0", "10,15,5") +# write_entry(f, "path_1", "10,15,5") +# write_entry(f, "topoType", "MultiIf") +# f.close() +# return path +# +#def generateXp(): +# path="/tmp/xp" +# f=open(path,"w") +# write_entry(f, "xpType", "nc") +# write_entry(f, "kpm", "fullmesh") +# write_entry(f, "kpms", "netlink") +# write_entry(f, "kpmc", "netlink") +# write_entry(f, "upmc", "fullmesh") +## write_entry(f, "upmc_args", "-t 600000 -i 500 -c 7800") +# write_entry(f, "ddCount", "10000") +# write_entry(f, "clientPcap", "yes") +# write_entry(f, "ncClientPort_0", "0:33400") +# write_entry(f, "rmem","300000 300000 300000") +# f.close() +# return path + +#topoFile=generateTopo() +#print(topoFile) +#xpFile=generateXp() +#print(xpFile) + diff --git a/src/mpMptcptraceData.py b/src/mpMptcptraceData.py new file mode 100644 index 0000000..e69eaeb --- /dev/null +++ b/src/mpMptcptraceData.py @@ -0,0 +1,47 @@ +#!/usr/bin/python + + +from subprocess import check_call +import csv + +from io import StringIO +import re +import os +import numpy as np + + + + +class MptcptraceData: + def __init__(self, pcap_file): + self.pcap_file=pcap_file + self.base_dir = os.path.dirname(pcap_file) + working_dir = os.getcwd() + + # generate CSVs + os.chdir(self.base_dir) + print self.base_dir + print os.getcwd() + check_call(["sudo" , "/usr/local/bin/mptcptrace" , "-f", os.path.basename(pcap_file) , "-G20", "-F3", "-r7", "-s", "-S", "-a", "-w2"]) + os.chdir(working_dir) + # accessing the attribute corresponding to the filename will parse the csv and return its cells + def __getattr__(self, name): + csv_file = self.base_dir+"/"+name+".csv" + print "opening csv file " + csv_file + if os.path.isfile(csv_file): + a = np.genfromtxt (csv_file, delimiter=",") + setattr(self, name, a) + return getattr(self,name) + else: + raise AttributeError("No csv file for unknown attribute "+name) + + + # gets cell corresponding to flow with header column + # flow 0 = first one, from 1=subflows + def get_csv(self, name): + if hasattr(self,name): + return getattr(self,name) + else: + return self.__get_attr__(name) + + diff --git a/src/mpTcptraceData.py b/src/mpTcptraceData.py new file mode 100755 index 0000000..99fb2da --- /dev/null +++ b/src/mpTcptraceData.py @@ -0,0 +1,27 @@ +#!/usr/bin/python + + +from subprocess import check_output +import csv + +from io import StringIO +import re +import numpy as np + + + + +class TcptraceData: + def __init__(self, pcap_file): + self.pcap_file=pcap_file + csv_content = check_output(["tcptrace", "-l", "--csv", pcap_file]) + tcptrace_reader = csv.reader(filter(lambda l: len(l)>0 and l[0]!="#",csv_content.splitlines())) + cells=np.array(list(tcptrace_reader)) + #drop header row + cells= cells[1:] + self.cells = cells + self.headers=cells[0] + self.flows=cells[1:] + self.number_of_flows=len(self.flows) + def get_csv(self, name): + return self.cells diff --git a/src/mpValidations.py b/src/mpValidations.py new file mode 100644 index 0000000..563fa99 --- /dev/null +++ b/src/mpValidations.py @@ -0,0 +1,174 @@ + + +from mpTcptraceData import * + +import numpy as np + + +# to get a REPL: +#import code +#code.interact(local=locals()) + + + +# A checker runs tests, and a test is made of multiple validations + + +# For a validation, the value to compare to is the target value from the yaml +# The validation takes place in the validate method, which takes +# as argument a value from which to extract the value to compare or the value itself +class Validation: + def __init__(self, yml): + if "target" in yml: + self.compared=yml["target"] + else: + self.compared=None + def name(self): + return self.__class__.__name__ + def validate(self,value): + raise Exception("Method not implemented") + def setup(self): + raise Exception("Method not implemented") + + +# checks a value passed is greater or equal (generic) +class MinValueValidation(Validation): + def validate(self, value): + self.value = value + return self.compared<=value +# checks a value passed is greater or equal (generic) +class MaxValueValidation(Validation): + def validate(self, value): + self.value = value + return self.compared>=value +# checks a value passed is greater or equal (generic) +class ExactValueValidation(Validation): + def validate(self, value): + self.value = value + return self.compared==value + + +# the method get_tested_value of the tester returns the value passed to validate. +# the CsvTester returns an array of values +class MinDifferenceValidation(Validation): + def validate(self, value): + v = value.flatten() + if len(v)>2: + raise Exception("MinDifferenceValidation requires 2 values maximum, not "+ str(len(v))) + self.value = float(v[1])-float(v[0]) + return self.compared<=self.value +class MinRowsValidation(Validation): + def validate(self, value): + self.value = len(value) + return self.compared<=self.value +class MaxRowsValidation(Validation): + def validate(self, value): + self.value = len(value) + return self.compared>=self.value +class ExactRowsValidation(Validation): + def validate(self, value): + self.value = len(value) + return self.compared==self.value +class MaxRatioValidation(Validation): + def validate(self, value): + v = value.flatten() + if len(v)>2: + raise Exception("MinDifferenceValidation requires 2 values maximum, not "+ str(len(v))) + self.value = float(v[1])/(float(v[0])+float(v[1])) + return self.compared>=self.value +# validates all values passed have increasing values +# it is the Tester's get_tested_value method that does the work +# to extract the values list from the trace. +class IncreasingValuesValidation(Validation): + def validate(self, values): + previous = 0 + for i,v in enumerate(values.flatten()): + #print i, "{:10.6f}".format(previous), "{:10.6f}".format(v) + if vb on flow 1 compared to flow 0." diff --git a/src/tests/base/xp b/src/tests/base/xp new file mode 100644 index 0000000..3a15d5a --- /dev/null +++ b/src/tests/base/xp @@ -0,0 +1,11 @@ +xpType:ncpv +rmem:300000 300000 300000 +ncClientPort_0:33400 +clientPcap:yes +pvRateLimit:600k +ddCount:10000 +kpm:fullmesh +kpms:netlink +kpmc:netlink +upmc:delayOpen +upmc_args: -c 2000 -d