// src/mtp/examples/fat-tree-mtp.cc// 1) initialize mtpMtpInterface::Enable(conf::thread);// 2) then baseClass changesSimulator::Run();// <-- simulation starts here// from ClassSimulator to ClassMultithreadedSimulator// equals to: MultithreadedSimulator::Run()
Because this call resolves to MultithreadedSimulator::Run(), execution continues in src/mtp/model/multithreaded-simulator-impl.cc.
C
1 2 3 4 5 6 7 8 910111213
// src/mtp/model/multithreaded-simulator-impl.ccvoidMultithreadedSimulatorImpl::Run(){NS_LOG_FUNCTION(this);// auto partitionif(m_partition){Partition();}MtpInterface::Run();}
Partition(): 4.2 Fine-Grained Partition
Next, MtpInterface::Run() is called; it is defined in src/mtp/model/mtp-interface.cc.
C
1 2 3 4 5 6 7 8 910111213
// src/mtp/model/mtp-interface.ccvoidMtpInterface::Run(){RunBefore();// calculate lookahead and create threadswhile(!g_globalFinished){ProcessOneRound();// everything in a roundCalculateSmallestTime();// get the smallest time}RunAfter();}
// TODO(bxhu): refer to paper - spatial partitionvoidMtpInterface::ProcessOneRound(){// assign logical process to threads// determine the priority of logical processesif(g_sortFunc!=nullptr&&g_round++%g_period==0){std::sort(g_sortedSystemIndices,g_sortedSystemIndices+g_systemCount,g_sortFunc);}// == LPs are sorted well by priority. Now we can process them in order. ==// stage 1: process eventsg_recvMsgStage=false;g_finishedSystemCount.store(0,std::memory_order_relaxed);g_systemIndex.store(0,std::memory_order_release);// main thread also needs to process an LP to reduce an extra thread overheadwhile(true){// generate increasing indexuint32_tindex=g_systemIndex.fetch_add(1,std::memory_order_acquire);// if index is out of range, break the loopif(index>=g_systemCount){break;}LogicalProcess*system=&g_systems[g_sortedSystemIndices[index]];// current LPsystem->ProcessOneRound();// process current LP// increase the finished system countg_finishedSystemCount.fetch_add(1,std::memory_order_release);}// logical process barriar synchronizationwhile(g_finishedSystemCount.load(std::memory_order_acquire)!=g_systemCount){};// stage 2: process the public LPg_systems[0].ProcessOneRound();// stage 3: receive messages// principle here is the same as stage 1.g_recvMsgStage=true;g_finishedSystemCount.store(0,std::memory_order_relaxed);g_systemIndex.store(0,std::memory_order_release);while(true){uint32_tindex=g_systemIndex.fetch_add(1,std::memory_order_acquire);if(index>=g_systemCount){break;}LogicalProcess*system=&g_systems[g_sortedSystemIndices[index]];system->ReceiveMessages();g_finishedSystemCount.fetch_add(1,std::memory_order_release);}// logical process barriar synchronizationwhile(g_finishedSystemCount.load(std::memory_order_acquire)!=g_systemCount){};}
MtpInterface::ProcessOneRound(): 4.1 System Overview and 4.2 Spatial Partition
voidMtpInterface::CalculateSmallestTime(){// update globally smallest timeg_smallestTime=Time::Max()/2;for(uint32_ti=0;i<=g_systemCount;i++){TimenextTime=g_systems[i].Next();if(nextTime<g_smallestTime){g_smallestTime=nextTime;}}g_nextPublicTime=g_systems[0].Next();// TODO(bxhu): refer to paper - LBTS equation// 1. g_smallestTime: $min{N_i} + t_{look_ahead_time}$// 2. g_nextPublicTime: $N_{pub}$// LINK ./logical-process.cc#LBTS// test if global finishedboolglobalFinished=true;for(uint32_ti=0;i<=g_systemCount;i++){globalFinished&=g_systems[i].isLocalFinished();}g_globalFinished=globalFinished;}
// 1) initialize mtpMtpInterface::Enable(conf::thread);// 2) then baseClass changesSimulator::Run();// <-- simulation starts here// from ClassSimulator to ClassMultithreadedSimulator// equals to: MultithreadedSimulator::Run()