跳转至

Unison MTP

We start from the instance itself.

C
1
2
3
4
5
6
7
8
9
// src/mtp/examples/fat-tree-mtp.cc

// 1) initialize mtp (conf::thread is passed as the thread count)
MtpInterface::Enable(conf::thread);

// 2) after Enable(), the simulator implementation behind Simulator is swapped
Simulator::Run(); // <-- simulation starts here
// the call is no longer served by the default simulator implementation
// but by the multithreaded one, i.e. it is equivalent to:
// MultithreadedSimulatorImpl::Run()

Since this call now ends up in MultithreadedSimulatorImpl::Run(), we move on to src/mtp/model/multithreaded-simulator-impl.cc.

C
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// src/mtp/model/multithreaded-simulator-impl.cc

// Runs the simulation: optionally partitions first, then delegates to
// MtpInterface::Run().
void
MultithreadedSimulatorImpl::Run()
{
    NS_LOG_FUNCTION(this);
    // auto partition: Partition() is only called when m_partition is set
    if (m_partition)
    {
        Partition();
    }
    // everything else is delegated to the MTP interface
    MtpInterface::Run();
}

Partition(): see Section 4.2, "Fine-Grained Partition", of the paper.

Run() then calls MtpInterface::Run(), so we move on to src/mtp/model/mtp-interface.cc.

C
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// src/mtp/model/mtp-interface.cc

// Main driver: calls RunBefore() once, repeats rounds until g_globalFinished
// is true, then calls RunAfter().
void
MtpInterface::Run()
{
    RunBefore(); // calculate lookahead and create threads
    while (!g_globalFinished)
    {
        ProcessOneRound(); // everything in a round
        // get the smallest time; this call also recomputes g_globalFinished,
        // which is what eventually ends the loop
        CalculateSmallestTime();
    }
    RunAfter();
}

First we take a look at ProcessOneRound():

C
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// TODO(bxhu): refer to paper - spatial partition
// Executes one round in three stages:
//   1. every LP listed in g_sortedSystemIndices runs ProcessOneRound();
//   2. the public LP, g_systems[0], runs ProcessOneRound();
//   3. every listed LP runs ReceiveMessages().
// Stages 1 and 3 each end with a busy-wait until g_finishedSystemCount
// equals g_systemCount.
// NOTE(review): presumably the worker threads created in RunBefore() claim
// indices from the same g_systemIndex counter; their loop is not shown here.
void
MtpInterface::ProcessOneRound()
{
    // assign logical process to threads

    // determine the priority of logical processes
    // (g_round is only incremented when a sort function is set, because of
    // short-circuit evaluation; the sort runs when the pre-increment value of
    // g_round is a multiple of g_period)
    if (g_sortFunc != nullptr && g_round++ % g_period == 0)
    {
        std::sort(g_sortedSystemIndices, g_sortedSystemIndices + g_systemCount, g_sortFunc);
    }

    // == LPs are now ordered by priority and can be processed in that order. ==

    // stage 1: process events
    // NOTE(review): g_recvMsgStage presumably tells the other threads which
    // stage is running - confirm against the worker loop.
    g_recvMsgStage = false;
    g_finishedSystemCount.store(0, std::memory_order_relaxed);
    g_systemIndex.store(0, std::memory_order_release);
    // main thread also needs to process an LP to reduce an extra thread overhead
    while (true)
    {
        // claim the next position in the sorted order (atomic increment)
        uint32_t index = g_systemIndex.fetch_add(1, std::memory_order_acquire);

        // if index is out of range, every position has been claimed: stop
        if (index >= g_systemCount)
        {
            break;
        }
        LogicalProcess* system = &g_systems[g_sortedSystemIndices[index]]; // current LP
        system->ProcessOneRound(); // process current LP

        // increase the finished system count
        g_finishedSystemCount.fetch_add(1, std::memory_order_release);
    }

    // logical process barrier synchronization:
    // spin until all g_systemCount LPs have been counted as finished
    while (g_finishedSystemCount.load(std::memory_order_acquire) != g_systemCount)
    {
    };

    // stage 2: process the public LP (index 0), after the stage 1 barrier
    g_systems[0].ProcessOneRound();

    // stage 3: receive messages
    // principle here is the same as stage 1.
    g_recvMsgStage = true;
    g_finishedSystemCount.store(0, std::memory_order_relaxed);
    g_systemIndex.store(0, std::memory_order_release);
    while (true)
    {
        // claim the next position, exactly as in stage 1
        uint32_t index = g_systemIndex.fetch_add(1, std::memory_order_acquire);
        if (index >= g_systemCount)
        {
            break;
        }
        LogicalProcess* system = &g_systems[g_sortedSystemIndices[index]];
        system->ReceiveMessages();
        g_finishedSystemCount.fetch_add(1, std::memory_order_release);
    }

    // logical process barrier synchronization
    while (g_finishedSystemCount.load(std::memory_order_acquire) != g_systemCount)
    {
    };
}

MtpInterface::ProcessOneRound(): see Section 4.1, "System Overview", and Section 4.2, "Spatial Partition", of the paper.

Then we come to CalculateSmallestTime():

C
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Recomputes three globals after a round:
//   - g_smallestTime:   minimum of Next() over g_systems[0..g_systemCount]
//   - g_nextPublicTime: Next() of the public LP, g_systems[0]
//   - g_globalFinished: true only if every LP reports isLocalFinished()
void
MtpInterface::CalculateSmallestTime()
{
    // update globally smallest time
    // NOTE(review): the starting value is Time::Max() / 2 rather than
    // Time::Max(); presumably headroom for later additions - confirm.
    g_smallestTime = Time::Max() / 2;
    // the bound is inclusive: index 0 (public LP) up to index g_systemCount
    // NOTE(review): assumes g_systems holds g_systemCount + 1 entries - confirm
    for (uint32_t i = 0; i <= g_systemCount; i++)
    {
        Time nextTime = g_systems[i].Next();
        if (nextTime < g_smallestTime)
        {
            g_smallestTime = nextTime;
        }
    }
    g_nextPublicTime = g_systems[0].Next();

    // TODO(bxhu): refer to paper - LBTS equation
    // 1. g_smallestTime: $\min_i N_i$ (only the minimum is stored here; the
    //    lookahead term $t_{look\_ahead\_time}$ is presumably added where the
    //    LBTS is evaluated - see the link below, confirm)
    // 2. g_nextPublicTime: $N_{pub}$
    // LINK ./logical-process.cc#LBTS

    // test if global finished: a single unfinished LP keeps the result false
    bool globalFinished = true;
    for (uint32_t i = 0; i <= g_systemCount; i++)
    {
        globalFinished &= g_systems[i].isLocalFinished();
    }
    g_globalFinished = globalFinished;
}

MtpInterface::CalculateSmallestTime(): see Section 4.2, "Temporal Partition (LBTS)", of the paper.

Why the simulator implementation is switched automatically in ns-3

Recall this:

C
1
2
3
4
5
6
7
// 1) initialize mtp (conf::thread is passed as the thread count)
MtpInterface::Enable(conf::thread);

// 2) after Enable(), the simulator implementation behind Simulator is swapped
Simulator::Run(); // <-- simulation starts here
// the call is no longer served by the default simulator implementation
// but by the multithreaded one, i.e. it is equivalent to:
// MultithreadedSimulatorImpl::Run()

Why does Simulator::Run() get replaced automatically? The answer is in MtpInterface::Enable():

C
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// src/mtp/model/mtp-interface.cc
// Selects the simulator implementation by binding the
// "SimulatorImplementationType" global value, then sets g_enabled.
// With NS3_MPI defined the hybrid implementation is chosen, otherwise the
// multithreaded one.
void
MtpInterface::Enable()
{
#ifdef NS3_MPI
    GlobalValue::Bind("SimulatorImplementationType", StringValue("ns3::HybridSimulatorImpl")); // mpi
#else
    GlobalValue::Bind("SimulatorImplementationType",
                    StringValue("ns3::MultithreadedSimulatorImpl")); // mtp
#endif
    g_enabled = true; // signal: enable mtp
}

// Overload taking a thread count: sets threadCount as the default of the
// "MaxThreads" attribute on whichever implementation the build selects, then
// calls the parameterless Enable().
void
MtpInterface::Enable(const uint32_t threadCount)
{
#ifdef NS3_MPI
    Config::SetDefault("ns3::HybridSimulatorImpl::MaxThreads", UintegerValue(threadCount));
#else
    Config::SetDefault("ns3::MultithreadedSimulatorImpl::MaxThreads", UintegerValue(threadCount));
#endif
    // binds the implementation type and sets g_enabled
    MtpInterface::Enable();
}

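GlobalValue::Bind sets the SimulatorImplementationType global value to ns3::MultithreadedSimulatorImpl (or to ns3::HybridSimulatorImpl when NS3_MPI is defined), which is why the later Simulator::Run() call is served by that implementation.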

For the details of how this works, see the implementation of GlobalValue::Bind(name, value) in ns-3.