added burst block aggregator

Raphael Bauduin 2015-06-01 16:17:47 -07:00
parent ded9226e2a
commit e2fefd0067
5 changed files with 107 additions and 3 deletions


@ -0,0 +1,7 @@
desc:Simple configuration with two parallel links
topoType:MultiIf
leftSubnet:10.0.
rightSubnet:10.1.
#path_x:delay,queueSize(may be calc),bw
path_0:10,10,5
path_1:1000,40,5
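
As the comment above notes, each path_x line gives the delay (ms), the queue size in packets ("may be calc", i.e. it can be computed rather than fixed), and the bandwidth (Mbps). Below is a minimal Python sketch of one such computation, sizing the queue from the bandwidth-delay product; the 1500-byte MTU and the factor of 2 are assumptions, not values taken from this commit.

# Hypothetical helper: derive a queue size (in packets) from a path's delay
# and bandwidth, one possible reading of "queueSize (may be calc)" above.
# The MTU and the safety factor are assumptions, not values from this repo.
def queue_size_packets(delay_ms, bw_mbps, mtu_bytes=1500, factor=2):
    # bandwidth-delay product in bytes
    bdp_bytes = (bw_mbps * 1e6 / 8.0) * (delay_ms / 1000.0)
    # keep at least a few packets of buffering
    return max(3, int(factor * bdp_bytes / mtu_bytes))

# path_0:10,10,5 -> 10 ms delay, 5 Mbps bandwidth
print(queue_size_packets(10, 5))  # 8 packets under these assumptions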


@ -0,0 +1,5 @@
checkers:
  mptcptrace:
  tcptrace:
aggregators:
  - burst_blocks
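
The runner (see the diff to the main script further down) maps each entry of this file to a class by camel-casing the key and appending Checker or Aggregator, then looks the class up in globals(). A small sketch of that mapping, using only names that appear in this commit:

# Name resolution as done by the runner below:
# "burst_blocks" -> BurstBlocksAggregator, "mptcptrace" -> MptcptraceChecker
def to_class_name(key, suffix):
    return key.title().replace("_", "") + suffix

print(to_class_name("burst_blocks", "Aggregator"))  # BurstBlocksAggregator
print(to_class_name("mptcptrace", "Checker"))       # MptcptraceChecker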

src/burst_tests/first/xp (Normal file, 14 lines)

@ -0,0 +1,14 @@
xpType:ncpv
ncClientPort_0:33400
clientPcap:yes
pvRateLimit:64k
ddCount:1000
kpmc:fullmesh
#kpmc:netlink
#upmc:fullmeshIfNeededRate
#upmc_args: -c 500 -i 1000 -t 60000
kpms:fullmesh
pvG:1000000
pvZ:1000000

src/mpBurstBlocks.py (Normal file, 63 lines)

@ -0,0 +1,63 @@
import numpy as np

class BurstBlocksAggregator:
    def __init__(self, yml, test_name, dest_dir):
        # minimum gap (in seconds) between two packets that marks the start of a new block
        self.min_block_sep = 0.1
        # column layout of the mptcptrace c2s_seq_1.csv file
        self.headers = ["ts", "map_begin", "subflow", "is_seq", "map_end", "is_reinject"]
        self.log = open(dest_dir + "/burst_block_aggregator.log", "w")
        self.csv_file = dest_dir + "/c2s_seq_1.csv"
        self.a = np.genfromtxt(self.csv_file, delimiter=",")
        self.blocks = []
        self.times = []
        self.extract_blocks()
        self.extract_times()

    def c(self, column):
        """Return the column index corresponding to the name passed as argument."""
        return self.headers.index(column)

    def extract_blocks(self):
        """Split the trace into blocks separated by at least min_block_sep seconds."""
        # beginning of block; the first block starts at packet 0
        b = 0
        # iteration can start at packet 1
        i = 1
        while i < len(self.a):
            if self.a[i][self.c("ts")] - self.a[i-1][self.c("ts")] > self.min_block_sep:
                print >>self.log, "previous block:", "{:10.8f}".format(self.a[i-1][self.c("ts")]), "seq:", self.a[i-1][self.c("map_begin")]
                print >>self.log, "found block starting at ", "{:10.8f}".format(self.a[i][self.c("ts")]), "seq:", self.a[i][self.c("map_begin")]
                if i + 1 < len(self.a):
                    print >>self.log, "next block:", "{:10.8f}".format(self.a[i+1][self.c("ts")]), "seq:", self.a[i+1][self.c("map_begin")]
                print >>self.log, "--------------------------------------"
                # the ranges we use here are inclusive, ie the range contains both elements
                self.blocks.append((b, i-1))
                b = i
            i = i + 1
        self.blocks.append((b, i-1))
        print >>self.log, "# blocks: ", len(self.blocks)

    def extract_times(self):
        """For each block, compute the time between its first packet and its highest ack."""
        for i in range(len(self.blocks)):
            first, last = self.blocks[i]
            # +1 because our ranges are inclusive
            packets = self.a[first:last+1]
            j = 0
            # index (inside the block) of the ack with the highest map_begin;
            # we assume each block contains at least one ack (is_seq == 0)
            biggest_ack = -1
            while j < len(packets):
                if packets[j][self.c("is_seq")] == 0:
                    if biggest_ack == -1:
                        biggest_ack = j
                    elif packets[j][self.c("map_begin")] > packets[biggest_ack][self.c("map_begin")]:
                        biggest_ack = j
                j = j + 1
            self.times.append([first, first + biggest_ack,
                               packets[biggest_ack][self.c("ts")] - packets[0][self.c("ts")],
                               packets[0][self.c("ts")], packets[biggest_ack][self.c("ts")]])
        self.times = np.array(self.times)
        np.set_printoptions(precision=6)
        # keep only the block completion times, sorted, dropping the smallest
        # and the two largest values
        block_times = self.times[:, 2]
        block_times.sort()
        self.block_times = block_times[1:-2]

    def __del__(self):
        self.log.close()

    def __str__(self):
        return str(self.block_times) + "\nmean:\t" + str(self.block_times.mean()) + "\nstd:\t" + str(self.block_times.std())
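
A possible standalone use of the class above, assuming a destination directory that already contains the c2s_seq_1.csv produced by mptcptrace; the directory name is a placeholder, and the yml argument is unused by the class:

# Hypothetical standalone run; "logs/first" is a placeholder directory that
# must already hold the c2s_seq_1.csv trace.
from mpBurstBlocks import BurstBlocksAggregator

agg = BurstBlocksAggregator(None, "first", "logs/first")
# __str__ shows the retained block completion times plus their mean and std
print(agg)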


@ -13,6 +13,7 @@ from subprocess import call
import datetime
# currently all checkers and validations are defined in this file
from mpValidations import *
from mpBurstBlocks import *
from yaml import load, dump
@ -25,16 +26,22 @@ parser.add_option("-t", "--tests", dest="tests_dir",
help="Directory holding tests", metavar="TESTSDIR" , default="./tests")
parser.add_option("-l", "--logs", dest="logs_dir",
help="Directory where to log", metavar="LOGSDIR" , default="./logs")
parser.add_option("-r", "--repeat", dest="repeat", action="store_true",
help="Reuse existing logs", metavar="REPEAT" , default=False)
(options, args) = parser.parse_args()
# initialise flags values
tests_dir=options.tests_dir.rstrip("/")
logs_dir=options.logs_dir.rstrip("/")
repeat = options.repeat
# take timestamp, used as subdirectory in logs_dir
timestamp=datetime.datetime.now().isoformat()
if repeat:
print "not implemented"
timestamp="2015-06-01T14:57:31.617534"
#timestamp = "2015-05-26T15:42:45.419949"
for test_name in [name for name in os.listdir(tests_dir) if os.path.isdir(os.path.join(tests_dir, name))]:
@ -49,7 +56,8 @@ for test_name in [name for name in os.listdir(tests_dir) if os.path.isdir(os.pat
print "Running " + test_dir
# run the experience
MpXpRunner(MpTopo.mininetBuilder, topoFile, xpFile)
if not repeat:
MpXpRunner(MpTopo.mininetBuilder, topoFile, xpFile)
#copy xp, topo and validation to log
copy(topoFile,destDir)
@ -63,16 +71,23 @@ for test_name in [name for name in os.listdir(tests_dir) if os.path.isdir(os.pat
    with open(validation_file, 'r') as f:
        validations = load(f)
    if validations!=None:
        for k in validations.keys():
        for k in validations["checkers"].keys():
            # Identify checker class
            name = k.title().replace("_","")+"Checker"
            klass= globals()[name]
            # instantiate checker with validations and test_name
            checker = klass(validations, test_name, destDir)
            checker = klass(validations["checkers"], test_name, destDir)
            if checker.check():
                print checker.logs
            else:
                print checker.logs
        for k in validations["aggregators"]:
            # Identify aggregator class
            name = k.title().replace("_","")+"Aggregator"
            klass= globals()[name]
            # instantiate aggregator with validations and test_name
            agg = klass(validations, test_name, destDir)
            print agg
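
One caveat with the loops above: validation files written before this commit may lack an aggregators (or checkers) section, in which case validations["aggregators"] raises a KeyError. A defensive variant, sketched here and not part of the commit, could fall back to an empty list:

# Sketch only: tolerate validation files without an "aggregators" section.
for k in validations.get("aggregators", []):
    name = k.title().replace("_", "") + "Aggregator"
    agg = globals()[name](validations, test_name, destDir)
    print(agg)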