00001 00032 #include <itpp/protocol/tcp.h> 00033 #include <itpp/base/itfile.h> 00034 #include <limits> 00035 #include <cstdlib> 00036 #include <ctime> 00037 00039 00040 #ifdef _MSC_VER 00041 #pragma warning(disable:4355) 00042 #endif 00043 00044 namespace itpp 00045 { 00046 00047 // -------------------- Default parameters ---------------------------------- 00048 00049 // TCP sender and receiver 00050 00051 #define TCP_HEADERLENGTH 40 00052 00053 // TCP sender 00054 00055 #define TCP_VERSION kReno 00056 #define TCP_SMSS 1460 00057 #define TCP_INITIALCWNDREL 2 // related to MSS 00058 #define TCP_INITIALSSTHRESHREL 1 // related to MaxCWnd 00059 #define TCP_MAXCWNDREL 32 // related to MSS 00060 #define TCP_DUPACKS 3 00061 #define TCP_INITIALRTT 1 00062 const double TCP_STIMERGRAN = 0.2; 00063 const double TCP_SWSATIMERVALUE = 0.2; 00064 #define TCP_MAXBACKOFF 64 00065 const double TCP_MAXRTO = std::numeric_limits<double>::max(); 00066 #define TCP_IMMEDIATEBACKOFFRESET false 00067 #define TCP_TIMESTAMPS false 00068 #define TCP_KARN true 00069 #define TCP_NAGLE false 00070 #define TCP_GOBACKN true 00071 #define TCP_FLIGHTSIZERECOVERY false 00072 #define TCP_RENOCONSERVATION true 00073 #define TCP_CAREFULSSTHRESHREDUCTION true 00074 #define TCP_IGNOREDUPACKONTORECOVERY true 00075 #define TCP_CAREFULMULFASTRTXAVOIDANCE true 00076 #define TCP_RESTARTAFTERIDLE true 00077 00078 // TCP receiver 00079 00080 #define TCP_RMSS 1460 00081 const int TCP_BUFFERSIZE = std::numeric_limits<int>::max() / 4; 00082 #define TCP_DELAYEDACK true 00083 const double TCP_ACKDELAYTIME = 0.2; 00084 #define TCP_SENDPERIODICACKS false 00085 #define TCP_STRICTPERIODICACKS false 00086 #define TCP_PERIODICACKINTERVAL 1 00087 #define TCP_ACKSCHEDULINGDELAY 0 00088 #define TCP_ACKBUFFERWRITE false 00089 #define TCP_ACKBUFFERREAD true 00090 const int TCP_MAXUSERBLOCKSIZE = std::numeric_limits<int>::max() / 4; 00091 #define TCP_MINUSERBLOCKSIZE 1 00092 #define TCP_USERBLOCKPROCDELAY 0 00093 00094 // TCP generator 00095 00096 #define TCPGEN_BLOCKSIZE 1460 00097 00098 // TCP applications 00099 00100 #define TCPAPP_MAXNOOFACTIVEAPPS 500 00101 #define TCPAPP_DISTSTATARRAYSIZE 100 00102 #define TCPAPP_DISTSTATMAXGOODPUT 1000 00103 #define TCPAPP_DISTSTATMAXTRANSFERTIME 10000 00104 #define TCPAPP_CONDMEANSTATARRAYSIZE 100 00105 #define TCPAPP_CONDMEANSTATMAXREQSIZE 100000 00106 00107 00108 00109 inline int min(int opd1, int opd2) 00110 { 00111 return (opd1 < opd2) ? opd1 : opd2; 00112 } 00113 00114 00115 inline int max(int opd1, int opd2) 00116 { 00117 return (opd1 > opd2) ? opd1 : opd2; 00118 } 00119 00120 00121 // round is used to map a double value (e.g. RTO in TTCPSender) to the 00122 // next higher value of a certain granularity (e.g. timer granularity). 00123 inline double round(const double value, const double granularity) 00124 { 00125 return (std::ceil(value / granularity) * granularity); 00126 } 00127 00128 // -------------------- TCP_Segment ---------------------------------------- 00129 00130 TCP_Segment::TCP_Segment() : 00131 seq_begin(), 00132 seq_end() 00133 { 00134 } 00135 00136 TCP_Segment::TCP_Segment(const Sequence_Number &sn_begin, const Sequence_Number &sn_end) : 00137 seq_begin(sn_begin), 00138 seq_end(sn_end) 00139 { 00140 it_assert(seq_begin <= seq_end, "TCP_Segment::TCP_Segment, end byte " + to_str(seq_end.value()) + 00141 " < begin byte " + to_str(seq_begin.value())); 00142 } 00143 00144 00145 TCP_Segment::TCP_Segment(const TCP_Segment &segment) : 00146 seq_begin(segment.seq_begin), 00147 seq_end(segment.seq_end) 00148 { 00149 } 00150 00151 00152 TCP_Segment &TCP_Segment::operator=(const TCP_Segment &segment) 00153 { 00154 this->seq_begin = segment.seq_begin; 00155 this->seq_end = segment.seq_end; 00156 00157 return *this; 00158 } 00159 00160 00161 void TCP_Segment::combine(const TCP_Segment &segment) 00162 { 00163 it_assert(can_be_combined(segment), "TCP_Segment::CombineWith, segments cannot be combined"); 00164 00165 seq_begin = min(seq_begin, segment.seq_begin); 00166 seq_end = max(seq_end, segment.seq_end); 00167 } 00168 00169 00170 std::ostream & operator<<(std::ostream &os, const TCP_Segment &segment) 00171 { 00172 os << "(" << segment.seq_begin << "," << segment.seq_end << ")"; 00173 return os; 00174 } 00175 00176 00177 // -------------------- TCP_Packet ---------------------------------------- 00178 TCP_Packet::TCP_Packet() : 00179 fSegment(), 00180 fACK(), 00181 fWnd(0), 00182 fSessionId(0), 00183 fInfo(0) 00184 { 00185 } 00186 00187 00188 TCP_Packet::TCP_Packet(const TCP_Packet &packet) : 00189 fSegment(packet.fSegment), 00190 fACK(packet.fACK), 00191 fWnd(packet.fWnd), 00192 fSessionId(packet.fSessionId), 00193 fInfo(0) 00194 { 00195 std::cout << "TCP_Packet::TCP_Packet ############" << " "; 00196 00197 if (packet.fInfo != 0) { 00198 std::cout << "TCP_Packet::TCP_Packet rhs.fInfo ###########" << " "; 00199 fInfo = new TDebugInfo(*packet.fInfo); 00200 } 00201 } 00202 00203 00204 TCP_Packet::~TCP_Packet() 00205 { 00206 delete fInfo; 00207 } 00208 00209 00210 TCP_Packet & TCP_Packet::clone() const 00211 { 00212 return *new TCP_Packet(*this); 00213 } 00214 00215 00216 void TCP_Packet::set_info(unsigned ssThresh, unsigned recWnd, unsigned cWnd, 00217 double estRTT, Sequence_Number sndUna, 00218 Sequence_Number sndNxt, bool isRtx) 00219 { 00220 if (fInfo == 0) { 00221 fInfo = new TDebugInfo; 00222 } 00223 00224 fInfo->fSSThresh = ssThresh; 00225 fInfo->fRecWnd = recWnd; 00226 fInfo->fCWnd = cWnd; 00227 fInfo->fRTTEstimate = estRTT; 00228 fInfo->fSndUna = sndUna; 00229 fInfo->fSndNxt = sndNxt; 00230 fInfo->fRtxFlag = isRtx; 00231 } 00232 00233 00234 void TCP_Packet::print_header(std::ostream &) const 00235 { 00236 std::cout << "Hello!\n"; 00237 00238 std::cout << "Ses = " << get_session_id() << " "; 00239 00240 std::cout << "Segment = " << get_segment() << " " 00241 << "ACK = " << get_ACK() << " " 00242 << "Wnd = " << get_wnd() << " "; 00243 00244 std::cout << "DestPort = " << fDestinationPort << " " 00245 << "SourcePort = " << fSourcePort << " "; 00246 00247 00248 if (fInfo != 0) { 00249 std::cout << "SndSSThresh = " << fInfo->fSSThresh << " "; 00250 std::cout << "RecWnd = " << fInfo->fRecWnd << " "; 00251 std::cout << "SndCWnd = " << fInfo->fCWnd << " "; 00252 std::cout << "RTTEstimate = " << fInfo->fRTTEstimate << " "; 00253 std::cout << "RtxFlag = " << fInfo->fRtxFlag; 00254 } 00255 else 00256 std::cout << "fInfo = " << fInfo << " "; 00257 00258 std::cout << std::endl; 00259 00260 } 00261 00262 00263 00264 std::ostream & operator<<(std::ostream & out, TCP_Packet & msg) 00265 { 00266 msg.print_header(out); 00267 return out; 00268 } 00269 00270 00271 // -------------------- TCP_Sender ---------------------------------------- 00272 TCP_Sender::TCP_Sender(int label) : 00273 fLabel(label), 00274 fTCPVersion(TCP_VERSION), 00275 fMSS(TCP_SMSS), 00276 fTCPIPHeaderLength(TCP_HEADERLENGTH), 00277 fInitialRTT(TCP_INITIALRTT), 00278 fInitialCWnd(0), // default initialization see below 00279 fInitialSSThresh(0), // default initialization see below 00280 fMaxCWnd(0), // default initialization see below 00281 fDupACKThreshold(TCP_DUPACKS), 00282 fTimerGranularity(TCP_STIMERGRAN), 00283 fMaxRTO(TCP_MAXRTO), 00284 fMaxBackoff(TCP_MAXBACKOFF), 00285 fImmediateBackoffReset(TCP_IMMEDIATEBACKOFFRESET), 00286 fKarn(TCP_KARN), 00287 fGoBackN(TCP_GOBACKN), 00288 fFlightSizeRecovery(TCP_FLIGHTSIZERECOVERY), 00289 fRenoConservation(TCP_RENOCONSERVATION), 00290 fCarefulSSThreshReduction(TCP_CAREFULSSTHRESHREDUCTION), 00291 fIgnoreDupACKOnTORecovery(TCP_IGNOREDUPACKONTORECOVERY), 00292 fCarefulMulFastRtxAvoidance(TCP_CAREFULMULFASTRTXAVOIDANCE), 00293 fNagle(TCP_NAGLE), 00294 fSWSATimerValue(TCP_SWSATIMERVALUE), 00295 fRestartAfterIdle(TCP_RESTARTAFTERIDLE), 00296 fDebug(false), 00297 fTrace(false), 00298 fSessionId(0), 00299 fRtxTimer(*this, &TCP_Sender::HandleRtxTimeout), 00300 fSWSATimer(*this, &TCP_Sender::HandleSWSATimeout)/*,*/ 00301 { 00302 00303 // default values and parameter check for MaxCWND, InitCWND, InitSSThresh 00304 if (fMaxCWnd == 0) { 00305 fMaxCWnd = (unsigned)(TCP_MAXCWNDREL * fMSS); 00306 } 00307 else if (fMaxCWnd < fMSS) { 00308 // throw (UL_CException("TCP_Sender::TCP_Sender", 00309 // "MaxCWnd must be >= MSS")); 00310 } 00311 00312 if (fInitialCWnd == 0) { 00313 fInitialCWnd = (unsigned)(TCP_INITIALCWNDREL * fMSS); 00314 } 00315 else if ((fInitialCWnd < fMSS) || (fInitialCWnd > fMaxCWnd)) { 00316 // throw (UL_CException("TCP_Sender::TCP_Sender", 00317 // "initial CWnd must be >= MSS and <= MaxCWnd")); 00318 } 00319 00320 if ((fInitialSSThresh == 0) && (fMaxCWnd >= 2 * fMSS)) { 00321 fInitialSSThresh = (unsigned)(TCP_INITIALSSTHRESHREL * fMaxCWnd); 00322 } 00323 else if ((fInitialSSThresh < 2*fMSS) || (fInitialCWnd > fMaxCWnd)) { 00324 // throw (UL_CException("TCP_Sender::TCP_Sender", 00325 // "initial CWnd must be >= 2*MSS and <= MaxCWnd")); 00326 } 00327 00328 setup(); 00329 00330 InitStatistics(); 00331 00332 00333 tcp_send.set_name("TCP Send"); 00334 tcp_receive_ack.forward(this, &TCP_Sender::ReceiveMessageFromNet); 00335 tcp_receive_ack.set_name("TCP ACK"); 00336 tcp_socket_write.forward(this, &TCP_Sender::HandleUserMessageIndication); 00337 tcp_socket_write.set_name("SocketWrite"); 00338 tcp_release.forward(this, &TCP_Sender::release); 00339 tcp_release.set_name("Release"); 00340 00341 } 00342 00343 00344 TCP_Sender::~TCP_Sender() 00345 { 00346 } 00347 00348 void TCP_Sender::set_debug(const bool enable_debug) 00349 { 00350 fDebug = enable_debug; 00351 tcp_send.set_debug(enable_debug); 00352 } 00353 00354 void TCP_Sender::set_debug(bool enable_debug, bool enable_signal_debug) 00355 { 00356 fDebug = enable_debug; 00357 tcp_send.set_debug(enable_signal_debug); 00358 } 00359 00360 void TCP_Sender::set_trace(const bool enable_trace) 00361 { 00362 fTrace = enable_trace; 00363 } 00364 00365 void TCP_Sender::set_label(int label) 00366 { 00367 fLabel = label; 00368 } 00369 00370 void TCP_Sender::setup() 00371 { 00372 fSndUna = 0; 00373 fSndNxt = 0; 00374 fSndMax = 0; 00375 fMaxRecWnd = 0; 00376 fRecWnd = fMaxCWnd; 00377 fUserNxt = 0; 00378 fCWnd = fInitialCWnd; 00379 fSSThresh = fInitialSSThresh; 00380 fRecoveryDupACK = 0; 00381 fRecoveryTO = 0; 00382 fDupACKCnt = 0; 00383 00384 // timers 00385 fBackoff = 1; 00386 fPendingBackoffReset = false; 00387 fLastSendTime = Event_Queue::now(); 00388 00389 // RTT measurement 00390 fTimUna = 0; 00391 fSRTT = 0; 00392 fRTTVar = 0; 00393 fRTTEstimate = fInitialRTT; 00394 fRTTMPending = false; 00395 fRTTMByte = 0; 00396 00397 CWnd_val.set_size(1000); 00398 CWnd_val.zeros(); 00399 CWnd_time.set_size(1000); 00400 CWnd_time.zeros(); 00401 CWnd_val(0) = fInitialCWnd; 00402 CWnd_time(0) = 0; 00403 CWnd_index = 1; 00404 00405 SSThresh_val.set_size(1000); 00406 SSThresh_val.zeros(); 00407 SSThresh_time.set_size(1000); 00408 SSThresh_time.zeros(); 00409 SSThresh_val(0) = fInitialSSThresh; 00410 SSThresh_time(0) = 0; 00411 SSThresh_index = 1; 00412 00413 sent_seq_num_val.set_size(1000); 00414 sent_seq_num_val.zeros(); 00415 sent_seq_num_time.set_size(1000); 00416 sent_seq_num_time.zeros(); 00417 sent_seq_num_val(0) = 0; 00418 sent_seq_num_time(0) = 0; 00419 sent_seq_num_index = 1; 00420 00421 sender_recv_ack_seq_num_val.set_size(1000); 00422 sender_recv_ack_seq_num_val.zeros(); 00423 sender_recv_ack_seq_num_time.set_size(1000); 00424 sender_recv_ack_seq_num_time.zeros(); 00425 sender_recv_ack_seq_num_val(0) = 0; 00426 sender_recv_ack_seq_num_time(0) = 0; 00427 sender_recv_ack_seq_num_index = 1; 00428 00429 RTTEstimate_val.set_size(1000); 00430 RTTEstimate_val.zeros(); 00431 RTTEstimate_time.set_size(1000); 00432 RTTEstimate_time.zeros(); 00433 RTTEstimate_val(0) = fInitialRTT; 00434 RTTEstimate_time(0) = 0; 00435 RTTEstimate_index = 1; 00436 00437 RTTsample_val.set_size(1000); 00438 RTTsample_val.zeros(); 00439 RTTsample_time.set_size(1000); 00440 RTTsample_time.zeros(); 00441 RTTsample_val(0) = 0; 00442 RTTsample_time(0) = 0; 00443 RTTsample_index = 1; 00444 00445 } 00446 00447 std::string TCP_Sender::GenerateFilename() 00448 { 00449 time_t rawtime; 00450 struct tm *timeinfo; 00451 timeinfo = localtime(&rawtime); 00452 std::ostringstream filename_stream; 00453 filename_stream << "trace_tcp_sender_u" << fLabel 00454 << "_" << 1900 + timeinfo->tm_year 00455 << "_" << timeinfo->tm_mon 00456 << "_" << timeinfo->tm_mday 00457 << "__" << timeinfo->tm_hour 00458 << "_" << timeinfo->tm_min 00459 << "_" << timeinfo->tm_sec 00460 << "_.it"; 00461 return filename_stream.str(); 00462 } 00463 00464 00465 void TCP_Sender::release(std::string file) 00466 { 00467 std::string filename; 00468 fSessionId++; 00469 00470 fRtxTimer.Reset(); 00471 fSWSATimer.Reset(); 00472 00473 if (fTrace) { 00474 if (file == "") 00475 filename = GenerateFilename(); 00476 else 00477 filename = file; 00478 00479 save_trace(filename); 00480 } 00481 } 00482 00483 00484 void TCP_Sender::InitStatistics() 00485 { 00486 fNumberOfTimeouts = 0; 00487 fNumberOfIdleTimeouts = 0; 00488 fNumberOfFastRetransmits = 0; 00489 fNumberOfRTTMeasurements = 0; 00490 fNumberOfReceivedACKs = 0; 00491 } 00492 00493 00494 void TCP_Sender::StopTransientPhase() 00495 { 00496 InitStatistics(); 00497 } 00498 00499 00500 void TCP_Sender::HandleUserMessageIndication(itpp::Packet *user_data_p) 00501 { 00502 if (fDebug) { 00503 std::cout << "TCP_Sender::HandleUserMessageIndication" 00504 << " byte_size=" << user_data_p->bit_size() / 8 00505 << " ptr=" << user_data_p 00506 << " time=" << Event_Queue::now() << std::endl; 00507 } 00508 00509 SocketWriteQueue.push(user_data_p); 00510 00511 SendNewData(); // will call GetMessage (via GetNextSegmentSize) 00512 // if new data can be sent 00513 } 00514 00515 00516 void TCP_Sender::ReceiveMessageFromNet(itpp::Packet *msg) 00517 { 00518 TCP_Packet & packet = (TCP_Packet &) * msg; 00519 00520 if (fDebug) { 00521 std::cout << "TCP_Sender::ReceiveMessageFromNet" 00522 << " byte_size=" << msg->bit_size() / 8 00523 << " ptr=" << msg 00524 << " time=" << Event_Queue::now() << std::endl; 00525 } 00526 00527 if ((packet.get_session_id() == fSessionId) && // ACK of current session 00528 (packet.get_ACK() >= fSndUna)) { // ACK is OK 00529 HandleACK(packet); 00530 } 00531 00532 delete &packet; 00533 } 00534 00535 00536 void TCP_Sender::HandleACK(TCP_Packet &msg) 00537 { 00538 it_assert(msg.get_ACK() <= fSndMax, "TCP_Sender::HandleACK, received ACK > SndMax at "); 00539 00540 fNumberOfReceivedACKs++; 00541 00542 if (fTrace) { 00543 TraceACKedSeqNo(msg.get_ACK()); 00544 } 00545 00546 if (fDebug) { 00547 std::cout << "sender " << fLabel << ": " 00548 << "receive ACK: " 00549 << " t = " << Event_Queue::now() << ", " 00550 << msg << std::endl; 00551 } 00552 00553 // update receiver advertised window size 00554 fRecWnd = msg.get_wnd(); 00555 fMaxRecWnd = max(fRecWnd, fMaxRecWnd); 00556 00557 if (msg.get_ACK() == fSndUna) { // duplicate ACK 00558 00559 bool ignoreDupACK = (fSndMax == fSndUna); // no outstanding data 00560 00561 if (fIgnoreDupACKOnTORecovery) { 00562 // don't count dupacks during TO recovery! 00563 if (fCarefulMulFastRtxAvoidance) { // see RFC 2582, Section 5 00564 // like in Solaris 00565 ignoreDupACK = ignoreDupACK || (fSndUna <= fRecoveryTO); 00566 } 00567 else { 00568 // like in ns 00569 ignoreDupACK = ignoreDupACK || (fSndUna < fRecoveryTO); 00570 } 00571 } 00572 00573 if (!ignoreDupACK) { 00574 fDupACKCnt++; // count the number of duplicate ACKs 00575 00576 if (fDupACKCnt == fDupACKThreshold) { 00577 // dupack threshold is reached 00578 fNumberOfFastRetransmits++; 00579 00580 fRecoveryDupACK = fSndMax; 00581 00582 ReduceSSThresh(); // halve ssthresh (in most cases) 00583 00584 if ((fTCPVersion == kReno) || (fTCPVersion == kNewReno)) { 00585 fCWnd = fSSThresh; 00586 } 00587 else if (fTCPVersion == kTahoe) { 00588 fCWnd = fMSS; 00589 } 00590 00591 if (fTCPVersion == kReno || fTCPVersion == kNewReno) { 00592 // conservation of packets: 00593 if (fRenoConservation) { 00594 fCWnd += fDupACKThreshold * fMSS; 00595 } 00596 } 00597 else if (fTCPVersion == kTahoe) { 00598 if (fGoBackN) { 00599 fSndNxt = fSndUna; // Go-Back-N (like in ns) 00600 } 00601 } 00602 00603 UnaRetransmit(); // initiate retransmission 00604 } 00605 else if (fDupACKCnt > fDupACKThreshold) { 00606 if (fTCPVersion == kReno || fTCPVersion == kNewReno) { 00607 // conservation of packets 00608 // CWnd may exceed MaxCWnd during fast recovery, 00609 // however, the result of SendWindow() is always <= MaxCwnd 00610 if (fRenoConservation) { 00611 fCWnd += fMSS; 00612 } 00613 } 00614 } 00615 } 00616 } 00617 else { // new ACK 00618 Sequence_Number oldSndUna = fSndUna; // required for NewReno partial ACK 00619 fSndUna = msg.get_ACK(); 00620 fSndNxt = max(fSndNxt, fSndUna); // required in case of "Go-Back-N" 00621 00622 // reset retransmission timer 00623 00624 if ((fSndUna > fTimUna) && fRtxTimer.IsPending()) { 00625 // seq. no. for which rtx timer is running has been received 00626 fRtxTimer.Reset(); 00627 } 00628 00629 // backoff reset 00630 00631 if (fImmediateBackoffReset) { 00632 fBackoff = 1; 00633 } 00634 else { 00635 if (fPendingBackoffReset) { 00636 fBackoff = 1; 00637 fPendingBackoffReset = false; 00638 } 00639 else if (fBackoff > 1) { 00640 // reset backoff counter only on next new ACK (this is probably 00641 // the way to operate intended by Karn) 00642 fPendingBackoffReset = true; 00643 } 00644 } 00645 00646 // RTT measurement 00647 00648 if ((fSndUna > fRTTMByte) && fRTTMPending) { 00649 UpdateRTTVariables(Event_Queue::now() - fRTTMStartTime); 00650 fRTTMPending = false; 00651 } 00652 00653 // update CWnd and reset dupack counter 00654 00655 if (fDupACKCnt >= fDupACKThreshold) { 00656 // we are in fast recovery 00657 if (fTCPVersion == kNewReno && fSndUna < fRecoveryDupACK) { 00658 // New Reno partial ACK handling 00659 if (fRenoConservation) { 00660 fCWnd = max(fMSS, fCWnd - (fSndUna - oldSndUna) + fMSS); 00661 } 00662 UnaRetransmit(); // start retransmit immediately 00663 } 00664 else { 00665 FinishFastRecovery(); 00666 } 00667 } 00668 else { 00669 // no fast recovery 00670 fDupACKCnt = 0; 00671 if (fCWnd < fSSThresh) { 00672 // slow start phase 00673 fCWnd = min(fCWnd + fMSS, fMaxCWnd); 00674 } 00675 else { 00676 // congestion avoidance phase 00677 fCWnd += max(fMSS * fMSS / fCWnd, 1); // RFC 2581 00678 fCWnd = min(fCWnd, fMaxCWnd); 00679 } 00680 } 00681 } // new ACK 00682 00683 SendNewData(); // try to send new data (even in the case that a retransmit 00684 // had to be performed) 00685 00686 if (fTrace) { 00687 TraceCWnd(); 00688 } 00689 } 00690 00691 00692 void TCP_Sender::SendNewData(bool skipSWSA) 00693 { 00694 unsigned nextSegmentSize; 00695 00696 it_assert(fSndUna <= fSndNxt, "TCP_Sender::SendNewData, SndUna > SndNxt in sender " + to_str(fLabel) + "!"); 00697 00698 if (fRestartAfterIdle) { 00699 IdleCheck(); 00700 } 00701 00702 bool sillyWindowAvoidanceFailed = false; 00703 00704 while (!sillyWindowAvoidanceFailed && 00705 ((nextSegmentSize = GetNextSegmentSize(fSndNxt)) > 0)) { 00706 // there is new data to send and window is large enough 00707 00708 // SWSA and Nagle (RFC 1122): assume PUSH to be set 00709 unsigned queuedUnsent = fUserNxt - fSndNxt; 00710 unsigned usableWindow = max(0, (fSndUna + SendWindow()) - fSndNxt); 00711 00712 if (((unsigned)min(queuedUnsent, usableWindow) >= fMSS) || 00713 ((!fNagle || (fSndUna == fSndNxt)) && 00714 ((queuedUnsent <= usableWindow) || // Silly W. A. 00715 ((unsigned)min(queuedUnsent, usableWindow) >= fMaxRecWnd / 2) 00716 ) 00717 ) || 00718 skipSWSA 00719 ) { 00720 // Silly Window Syndrome Avoidance (SWSA) and Nagle passed 00721 00722 TCP_Segment nextSegment(fSndNxt, fSndNxt + nextSegmentSize); 00723 TCP_Packet & msg = * new TCP_Packet(); 00724 00725 msg.set_segment(nextSegment); 00726 msg.set_session_id(fSessionId); 00727 msg.set_destination_port(fLabel); // The dest and src port are set to the same 00728 msg.set_source_port(fLabel); // number for simplicity. 00729 msg.set_bit_size(8 * (nextSegmentSize + fTCPIPHeaderLength)); 00730 00731 if (fDebug) { 00732 std::cout << "TCP_Sender::SendNewData," 00733 << " nextSegmentSize=" << nextSegmentSize 00734 << " fTCPIPHeaderLength=" << fTCPIPHeaderLength 00735 << " byte_size=" << msg.bit_size() / 8 00736 << " ptr=" << &msg 00737 << " time=" << Event_Queue::now() << std::endl; 00738 } 00739 00740 // no RTT measurement for retransmitted segments 00741 // changes on Dec. 13. 2002 (Ga, Bo, Scharf) 00742 00743 if (!fRTTMPending && fSndNxt >= fSndMax) { // ##Bo## 00744 fRTTMStartTime = Event_Queue::now(); 00745 fRTTMByte = nextSegment.begin(); 00746 fRTTMPending = true; 00747 } 00748 00749 fSndNxt += nextSegmentSize; 00750 fSndMax = max(fSndNxt, fSndMax); 00751 00752 // reset SWSA timer if necessary 00753 if (skipSWSA) { 00754 skipSWSA = false; 00755 } 00756 else if (fSWSATimer.IsPending()) { 00757 fSWSATimer.Reset(); 00758 } 00759 00760 // set rtx timer if necessary 00761 if (!fRtxTimer.IsPending()) { 00762 SetRtxTimer(); 00763 } 00764 00765 00766 if (fDebug) { 00767 msg.set_info(fSSThresh, fRecWnd, fCWnd, fRTTEstimate, 00768 fSndUna, fSndNxt, false); 00769 std::cout << "sender " << fLabel 00770 << ": send new data: " 00771 << " t = " << Event_Queue::now() << ", " 00772 << msg << std::endl; 00773 } 00774 00775 SendMsg(msg); 00776 } 00777 else { 00778 sillyWindowAvoidanceFailed = true; 00779 // set SWSA timer 00780 if (!fSWSATimer.IsPending()) { 00781 fSWSATimer.Set(fSWSATimerValue); 00782 } 00783 } 00784 } 00785 00786 // set timers in case that no new data could have been sent 00787 if (!fRtxTimer.IsPending()) { 00788 if (fSndMax > fSndUna) { // there is outstanding data 00789 if (!fImmediateBackoffReset && fPendingBackoffReset) { 00790 // backoff is reset if no new data could have been sent since last 00791 // (successfull) retransmission; this is useful in case of 00792 // Reno recovery and multiple losses to avoid that in 00793 // the (unavoidable) series of timeouts the timer value 00794 // increases exponentially as this is not the intention 00795 // of the delayed backoff reset in Karn's algorithm 00796 fBackoff = 1; 00797 fPendingBackoffReset = false; 00798 } 00799 SetRtxTimer(); 00800 } 00801 } 00802 } 00803 00804 00805 void TCP_Sender::UnaRetransmit() 00806 { 00807 // resend after timeout or fast retransmit 00808 unsigned nextSegmentSize = GetNextSegmentSize(fSndUna); 00809 00810 if (nextSegmentSize > 0) { 00811 TCP_Segment nextSegment(fSndUna, fSndUna + nextSegmentSize); 00812 TCP_Packet & msg = *new TCP_Packet(); 00813 msg.set_segment(nextSegment); 00814 msg.set_session_id(fSessionId); 00815 msg.set_destination_port(fLabel); // The dest and src port are set to the same 00816 msg.set_source_port(fLabel); // number for simplicity. 00817 msg.set_bit_size(8 * (nextSegmentSize + fTCPIPHeaderLength)); 00818 00819 fSndNxt = max(fSndNxt, fSndUna + nextSegmentSize); 00820 fSndMax = max(fSndNxt, fSndMax); 00821 00822 // The RTT measurement is cancelled if the RTTM byte has a sequence 00823 // number higher or equal than the first retransmitted byte as 00824 // the ACK for the RTTM byte will be delayed by the rtx for at least 00825 // one round 00826 if (fKarn && (nextSegment.begin() <= fRTTMByte) && fRTTMPending) { 00827 fRTTMPending = false; 00828 } 00829 00830 SetRtxTimer(); 00831 00832 if (fDebug) { 00833 msg.set_info(fSSThresh, fRecWnd, fCWnd, fRTTEstimate, 00834 fSndUna, fSndNxt, true); 00835 std::cout << "sender " << fLabel; 00836 if (fDupACKCnt >= fDupACKThreshold) { 00837 std::cout << ": fast rtx: "; 00838 } 00839 else { 00840 std::cout << ": TO rtx: "; 00841 } 00842 std::cout << " t = " << Event_Queue::now() << ", " 00843 << msg << std::endl; 00844 } 00845 00846 SendMsg(msg); 00847 } 00848 else { 00849 // throw(UL_CException("TCP_Sender::UnaRetransmit", "no bytes to send")); 00850 } 00851 } 00852 00853 00854 void TCP_Sender::FinishFastRecovery() 00855 { 00856 if (fTCPVersion == kTahoe) { 00857 fDupACKCnt = 0; 00858 } 00859 else if (fTCPVersion == kReno) { 00860 // Reno fast recovery 00861 fDupACKCnt = 0; 00862 if (fFlightSizeRecovery) { 00863 fCWnd = min(fSndMax - fSndUna + fMSS, fSSThresh); 00864 } 00865 else { 00866 fCWnd = fSSThresh; 00867 } 00868 } 00869 else if (fTCPVersion == kNewReno) { 00870 // New Reno fast recovery 00871 // "Set CWnd to ... min (ssthresh, FlightSize + MSS) 00872 // ... or ssthresh" (RFC 2582) 00873 if (fFlightSizeRecovery) { 00874 fCWnd = min(fSndMax - fSndUna + fMSS, fSSThresh); 00875 } 00876 else { 00877 fCWnd = fSSThresh; 00878 } 00879 fDupACKCnt = 0; 00880 } 00881 } 00882 00883 00884 void TCP_Sender::ReduceSSThresh() 00885 { 00886 if (fCarefulSSThreshReduction) { 00887 // If Reno conservation is enabled the amount of 00888 // outstanding data ("flight size") might be rather large 00889 // and even larger than twice the old ssthresh value; 00890 // so this corresponds more to the ns behaviour where always cwnd is 00891 // taken instead of flight size. 00892 fSSThresh = max(2 * fMSS, 00893 min(min(fCWnd, fSndMax - fSndUna), fRecWnd) / 2); 00894 } 00895 else { 00896 // use filght size / 2 as recommended in RFC 2581 00897 fSSThresh = max(2 * fMSS, min(fSndMax - fSndUna, fRecWnd) / 2); 00898 } 00899 00900 it_assert(fSSThresh <= fMaxCWnd, "TCP_Sender::HandleACK, internal error: SndSSThresh is > MaxCWnd"); 00901 00902 if (fTrace) { 00903 TraceSSThresh(); 00904 } 00905 } 00906 00907 00908 void TCP_Sender::SendMsg(TCP_Packet &msg) 00909 { 00910 if (fTrace) { 00911 TraceSentSeqNo(msg.get_segment().end()); 00912 } 00913 00914 if (fRestartAfterIdle) { 00915 fLastSendTime = Event_Queue::now(); // needed for idle detection 00916 } 00917 00918 tcp_send(&msg); 00919 } 00920 00921 00922 void TCP_Sender::IdleCheck() 00923 { 00924 // idle detection according to Jacobson, SIGCOMM, 1988: 00925 // sender is currently idle and nothing has been send since RTO 00926 00927 if (fSndMax == fSndUna && Event_Queue::now() - fLastSendTime > CalcRTOValue()) { 00928 fCWnd = fInitialCWnd; // see RFC2581 00929 00930 fNumberOfIdleTimeouts++; 00931 00932 if (fTrace) { 00933 TraceCWnd(); 00934 } 00935 00936 if (fDebug) { 00937 std::cout << "sender " << fLabel 00938 << ": idle timeout: " 00939 << "t = " << Event_Queue::now() 00940 << ", SndNxt = " << fSndNxt 00941 << ", SndUna = " << fSndUna 00942 << ", Backoff = " << fBackoff 00943 << std::endl; 00944 } 00945 } 00946 } 00947 00948 00949 void TCP_Sender::HandleRtxTimeout(Ttype) 00950 { 00951 fNumberOfTimeouts++; 00952 00953 // update backoff 00954 fBackoff = min(fMaxBackoff, fBackoff * 2); 00955 if (!fImmediateBackoffReset) { 00956 fPendingBackoffReset = false; 00957 } 00958 00959 if (fDupACKCnt >= fDupACKThreshold) { 00960 FinishFastRecovery(); // reset dup ACK cnt and CWnd 00961 } 00962 else if (fDupACKCnt > 0) { 00963 fDupACKCnt = 0; // don't allow dupack action during TO recovery 00964 } 00965 00966 // update CWnd and SSThresh 00967 ReduceSSThresh(); // halve ssthresh (in most cases) 00968 fCWnd = fMSS; // not initial CWnd, see RFC 2581 00969 00970 it_assert(fSSThresh <= fMaxCWnd, "TCP_Sender::HandleRtxTimeout, internal error: SndSSThresh is > MaxCWnd"); 00971 00972 fRecoveryTO = fSndMax; 00973 00974 if (fGoBackN) { 00975 // go back N is mainly relevant in the case of multiple losses 00976 // which would lead to a series of timeouts without resetting sndnxt 00977 fSndNxt = fSndUna; 00978 } 00979 00980 if (fDebug) { 00981 std::cout << "sender " << fLabel 00982 << ": rtx timeout: " 00983 << "t = " << Event_Queue::now() 00984 << ", SndNxt = " << fSndNxt 00985 << ", SndUna = " << fSndUna 00986 << std::endl; 00987 } 00988 00989 if (fTrace) { 00990 TraceCWnd(); 00991 } 00992 00993 UnaRetransmit(); // initiate retransmission 00994 } 00995 00996 00997 void TCP_Sender::HandleSWSATimeout(Ttype) 00998 { 00999 SendNewData(true); 01000 } 01001 01002 01003 unsigned TCP_Sender::GetNextSegmentSize(const Sequence_Number & begin) 01004 { 01005 // try to get new user messages if available and necessary 01006 while ((fUserNxt < begin + fMSS) && (!SocketWriteQueue.empty())) { 01007 itpp::Packet *packet_p = SocketWriteQueue.front(); 01008 SocketWriteQueue.pop(); 01009 fUserNxt += (unsigned) packet_p->bit_size() / 8; 01010 delete packet_p; 01011 } 01012 01013 Sequence_Number end = min(min(fUserNxt, begin + fMSS), 01014 fSndUna + SendWindow()); 01015 01016 if (fDebug) { 01017 std::cout << "TCP_Sender::GetNextSegmentSize," 01018 << " fUserNxt=" << fUserNxt 01019 << " begin_seq_num=" << begin 01020 << " fMSS=" << fMSS 01021 << " fSndUna=" << fSndUna 01022 << " SendWindow()=" << SendWindow() 01023 << " end_seq_num=" << end 01024 << " time=" << Event_Queue::now() << std::endl; 01025 } 01026 01027 return max(0, end - begin); 01028 } 01029 01030 01031 unsigned TCP_Sender::SendWindow() const 01032 { 01033 return min(fRecWnd, min(fMaxCWnd, fCWnd)); 01034 } 01035 01036 01037 double TCP_Sender::CalcRTOValue() const 01038 { 01039 static const double factor = 1 + 1e-8; 01040 // to avoid "simultaneous" TO/receive ACK events in case of const. RTT 01041 01042 double rto = fBackoff * fRTTEstimate * factor; 01043 01044 if (rto > fMaxRTO) { 01045 rto = fMaxRTO; 01046 } 01047 01048 return rto; 01049 } 01050 01051 01052 void TCP_Sender::SetRtxTimer() 01053 { 01054 double rto = CalcRTOValue(); 01055 fRtxTimer.Set(rto); 01056 fTimUna = fSndUna; 01057 if (fDebug) { 01058 std::cout << "sender " << fLabel 01059 << ": set rtx timer: " 01060 << "t = " << Event_Queue::now() 01061 << ", RTO = " << rto 01062 << ", Backoff = " << fBackoff 01063 << ", TimUna = " << fTimUna 01064 << std::endl; 01065 } 01066 } 01067 01068 01069 void TCP_Sender::UpdateRTTVariables(double sampleRTT) 01070 { 01071 if (fSRTT == 0) { 01072 fSRTT = sampleRTT; 01073 fRTTVar = sampleRTT / 2; 01074 } 01075 else { 01076 // see, e.g., Comer for the values used as weights 01077 fSRTT = 0.875 * fSRTT + 0.125 * sampleRTT; 01078 fRTTVar = 0.75 * fRTTVar + 0.25 * fabs(sampleRTT - fSRTT); 01079 } 01080 01081 fRTTEstimate = round(fSRTT + 4 * fRTTVar, fTimerGranularity); 01082 01083 if (fTrace) { 01084 TraceRTTVariables(sampleRTT); 01085 } 01086 01087 fNumberOfRTTMeasurements++; 01088 } 01089 01090 01091 void TCP_Sender::TraceRTTVariables(double sampleRTT) 01092 { 01093 if (fDebug) { 01094 std::cout << "sender " << fLabel 01095 << ": RTT update: " 01096 << "t = " << Event_Queue::now() 01097 << ", sample = " << sampleRTT 01098 << ", SRTT = " << fSRTT 01099 << ", RTTVar = " << fRTTVar 01100 << ", RTTEstimate = " << fRTTEstimate 01101 << std::endl; 01102 } 01103 01104 if (RTTsample_index >= RTTsample_time.size()) { 01105 RTTsample_time.set_size(2*RTTsample_time.size(), true); 01106 RTTsample_val.set_size(2*RTTsample_val.size(), true); 01107 } 01108 RTTsample_val(RTTsample_index) = sampleRTT; 01109 RTTsample_time(RTTsample_index) = Event_Queue::now(); 01110 RTTsample_index++; 01111 01112 if (RTTEstimate_index >= RTTEstimate_time.size()) { 01113 RTTEstimate_time.set_size(2*RTTEstimate_time.size(), true); 01114 RTTEstimate_val.set_size(2*RTTEstimate_val.size(), true); 01115 } 01116 RTTEstimate_val(RTTEstimate_index) = fRTTEstimate; 01117 RTTEstimate_time(RTTEstimate_index) = Event_Queue::now(); 01118 RTTEstimate_index++; 01119 } 01120 01121 01122 void TCP_Sender::TraceCWnd() 01123 { 01124 if (fDebug) { 01125 std::cout << "sender " << fLabel 01126 << " t = " << Event_Queue::now() 01127 << " cwnd = " << fCWnd << std::endl; 01128 } 01129 if (CWnd_index >= CWnd_time.size()) { 01130 CWnd_time.set_size(2*CWnd_time.size(), true); 01131 CWnd_val.set_size(2*CWnd_val.size(), true); 01132 } 01133 CWnd_val(CWnd_index) = fCWnd; 01134 CWnd_time(CWnd_index) = Event_Queue::now(); 01135 CWnd_index++; 01136 01137 } 01138 01139 void TCP_Sender::TraceSSThresh() 01140 { 01141 if (fDebug) { 01142 std::cout << "sender " << fLabel 01143 << " t = " << Event_Queue::now() 01144 << " cwnd = " << fSSThresh << std::endl; 01145 } 01146 if (SSThresh_index >= SSThresh_time.size()) { 01147 SSThresh_time.set_size(2*SSThresh_time.size(), true); 01148 SSThresh_val.set_size(2*SSThresh_val.size(), true); 01149 } 01150 SSThresh_val(SSThresh_index) = fSSThresh; 01151 SSThresh_time(SSThresh_index) = Event_Queue::now(); 01152 SSThresh_index++; 01153 01154 } 01155 01156 void TCP_Sender::TraceSentSeqNo(const Sequence_Number sn) 01157 { 01159 if (fDebug) { 01160 std::cout << "sender " << fLabel 01161 << " t = " << Event_Queue::now() 01162 << " sent = " << sn 01163 << std::endl; 01164 } 01165 if (sent_seq_num_index >= sent_seq_num_time.size()) { 01166 sent_seq_num_time.set_size(2*sent_seq_num_time.size(), true); 01167 sent_seq_num_val.set_size(2*sent_seq_num_val.size(), true); 01168 } 01169 sent_seq_num_val(sent_seq_num_index) = sn.value(); 01170 sent_seq_num_time(sent_seq_num_index) = Event_Queue::now(); 01171 sent_seq_num_index++; 01172 } 01173 01174 01175 void TCP_Sender::TraceACKedSeqNo(const Sequence_Number sn) 01176 { 01177 if (fDebug) { 01178 std::cout << "sender " << fLabel 01179 << " t = " << Event_Queue::now() 01180 << " ACK = " << sn 01181 << std::endl; 01182 } 01183 01184 if (sender_recv_ack_seq_num_index >= sender_recv_ack_seq_num_time.size()) { 01185 sender_recv_ack_seq_num_time.set_size(2*sender_recv_ack_seq_num_time.size(), true); 01186 sender_recv_ack_seq_num_val.set_size(2*sender_recv_ack_seq_num_val.size(), true); 01187 } 01188 sender_recv_ack_seq_num_val(sender_recv_ack_seq_num_index) = sn.value(); 01189 sender_recv_ack_seq_num_time(sender_recv_ack_seq_num_index) = Event_Queue::now(); 01190 sender_recv_ack_seq_num_index++; 01191 } 01192 01193 01194 void TCP_Sender::save_trace(std::string filename) 01195 { 01196 01197 CWnd_val.set_size(CWnd_index, true); 01198 CWnd_time.set_size(CWnd_index, true); 01199 01200 SSThresh_val.set_size(SSThresh_index, true); 01201 SSThresh_time.set_size(SSThresh_index, true); 01202 01203 sent_seq_num_val.set_size(sent_seq_num_index, true); 01204 sent_seq_num_time.set_size(sent_seq_num_index, true); 01205 01206 sender_recv_ack_seq_num_val.set_size(sender_recv_ack_seq_num_index, true); 01207 sender_recv_ack_seq_num_time.set_size(sender_recv_ack_seq_num_index, true); 01208 01209 RTTEstimate_val.set_size(RTTEstimate_index, true); 01210 RTTEstimate_time.set_size(RTTEstimate_index, true); 01211 01212 RTTsample_val.set_size(RTTsample_index, true); 01213 RTTsample_time.set_size(RTTsample_index, true); 01214 01215 if (fDebug) { 01216 std::cout << "CWnd_val" << CWnd_val << std::endl; 01217 std::cout << "CWnd_time" << CWnd_time << std::endl; 01218 std::cout << "CWnd_index" << CWnd_index << std::endl; 01219 01220 std::cout << "SSThresh_val" << SSThresh_val << std::endl; 01221 std::cout << "SSThresh_time" << SSThresh_time << std::endl; 01222 std::cout << "SSThresh_index" << SSThresh_index << std::endl; 01223 01224 std::cout << "sent_seq_num_val" << sent_seq_num_val << std::endl; 01225 std::cout << "sent_seq_num_time" << sent_seq_num_time << std::endl; 01226 std::cout << "sent_seq_num_index" << sent_seq_num_index << std::endl; 01227 01228 std::cout << "sender_recv_ack_seq_num_val" << sender_recv_ack_seq_num_val << std::endl; 01229 std::cout << "sender_recv_ack_seq_num_time" << sender_recv_ack_seq_num_time << std::endl; 01230 std::cout << "sender_recv_ack_seq_num_index" << sender_recv_ack_seq_num_index << std::endl; 01231 01232 std::cout << "RTTEstimate_val" << RTTEstimate_val << std::endl; 01233 std::cout << "RTTEstimate_time" << RTTEstimate_time << std::endl; 01234 std::cout << "RTTEstimate_index" << RTTEstimate_index << std::endl; 01235 01236 std::cout << "RTTsample_val" << RTTsample_val << std::endl; 01237 std::cout << "RTTsample_time" << RTTsample_time << std::endl; 01238 std::cout << "RTTsample_index" << RTTsample_index << std::endl; 01239 01240 std::cout << "TCP_Sender::saving to file: " << filename << std::endl; 01241 } 01242 01243 it_file ff2; 01244 ff2.open(filename); 01245 01246 ff2 << Name("CWnd_val") << CWnd_val; 01247 ff2 << Name("CWnd_time") << CWnd_time; 01248 ff2 << Name("CWnd_index") << CWnd_index; 01249 01250 ff2 << Name("SSThresh_val") << SSThresh_val; 01251 ff2 << Name("SSThresh_time") << SSThresh_time; 01252 ff2 << Name("SSThresh_index") << SSThresh_index; 01253 01254 ff2 << Name("sent_seq_num_val") << sent_seq_num_val; 01255 ff2 << Name("sent_seq_num_time") << sent_seq_num_time; 01256 ff2 << Name("sent_seq_num_index") << sent_seq_num_index; 01257 01258 ff2 << Name("sender_recv_ack_seq_num_val") << sender_recv_ack_seq_num_val; 01259 ff2 << Name("sender_recv_ack_seq_num_time") << sender_recv_ack_seq_num_time; 01260 ff2 << Name("sender_recv_ack_seq_num_index") << sender_recv_ack_seq_num_index; 01261 01262 ff2 << Name("RTTEstimate_val") << RTTEstimate_val; 01263 ff2 << Name("RTTEstimate_time") << RTTEstimate_time; 01264 ff2 << Name("RTTEstimate_index") << RTTEstimate_index; 01265 01266 ff2 << Name("RTTsample_val") << RTTsample_val; 01267 ff2 << Name("RTTsample_time") << RTTsample_time; 01268 ff2 << Name("RTTsample_index") << RTTsample_index; 01269 01270 ff2.flush(); 01271 ff2.close(); 01272 } 01273 01274 01275 void TCP_Sender::print_item(std::ostream &, const std::string & keyword) 01276 { 01277 if (keyword == "Label") { 01278 std::cout << fLabel; 01279 } 01280 else if (keyword == "CWnd") { 01281 std::cout << fCWnd; 01282 } 01283 else if (keyword == "SSThresh") { 01284 std::cout << fSSThresh; 01285 } 01286 else if (keyword == "SRTT") { 01287 std::cout << fSRTT; 01288 } 01289 else if (keyword == "RTTvar") { 01290 std::cout << fRTTVar; 01291 } 01292 else if (keyword == "Backoff") { 01293 std::cout << fBackoff; 01294 } 01295 else if (keyword == "RTO") { 01296 std::cout << CalcRTOValue(); 01297 } 01298 else if (keyword == "NoOfFastRets") { 01299 std::cout << fNumberOfFastRetransmits; 01300 } 01301 else if (keyword == "NoOfRetTOs") { 01302 std::cout << fNumberOfTimeouts; 01303 } 01304 else if (keyword == "NoOfIdleTOs") { 01305 std::cout << fNumberOfIdleTimeouts; 01306 } 01307 else if (keyword == "NoOfRTTMs") { 01308 std::cout << fNumberOfRTTMeasurements; 01309 } 01310 else if (keyword == "NoOfRecACKs") { 01311 std::cout << fNumberOfReceivedACKs; 01312 } 01313 else { 01314 } 01315 } 01316 01317 01318 // -------------------- TCP_Receiver_Buffer ---------------------------------------- 01319 TCP_Receiver_Buffer::TCP_Receiver_Buffer() : 01320 fFirstByte() 01321 { 01322 } 01323 01324 01325 TCP_Receiver_Buffer::TCP_Receiver_Buffer(const TCP_Receiver_Buffer & rhs) : 01326 fFirstByte(rhs.fFirstByte), 01327 fBufList(rhs.fBufList) 01328 { 01329 } 01330 01331 01332 void TCP_Receiver_Buffer::reset() 01333 { 01334 fBufList.clear(); 01335 fFirstByte = 0; 01336 } 01337 01338 01339 TCP_Receiver_Buffer::~TCP_Receiver_Buffer() 01340 { 01341 } 01342 01343 01344 void TCP_Receiver_Buffer::write(TCP_Segment newBlock) 01345 { 01346 // error cases 01347 it_assert(newBlock.begin() <= newBlock.end(), "TCP_Receiver_Buffer::Write, no valid segment"); 01348 01349 // cut blocks beginning before fFirstByte 01350 if (newBlock.begin() < fFirstByte) { 01351 if (newBlock.end() > fFirstByte) { 01352 newBlock.set_begin(fFirstByte); 01353 } 01354 else { 01355 return; //// TODO: Is this strange? 01356 } 01357 } 01358 01359 if (newBlock.length() == 0) { // empty block, nothing to do 01360 return; 01361 } 01362 01363 if (fBufList.empty() || (newBlock.begin() > fBufList.back().end())) { 01364 // new block is behind last block in buffer 01365 fBufList.push_back(newBlock); 01366 } 01367 else { 01368 // skip list entries if beginning of newBlock > end of current one 01369 // (search for correct list position) 01370 std::list<TCP_Segment>::iterator iter; 01371 iter = fBufList.begin(); 01372 while (newBlock.begin() > iter->end()) { 01373 iter++; 01374 it_assert(iter != fBufList.end(), "TCP_Receiver_Buffer::Write, internal error"); 01375 } 01376 01377 TCP_Segment & exBlock = *iter; 01378 01379 if (exBlock.can_be_combined(newBlock)) { 01380 // overlapping or contiguous blocks -> combine 01381 exBlock.combine(newBlock); 01382 01383 // check following blocks 01384 iter++; 01385 while ((iter != fBufList.end()) && 01386 exBlock.can_be_combined(*iter)) { 01387 exBlock.combine(*iter); 01388 iter = fBufList.erase(iter); 01389 } 01390 } 01391 else { 01392 // no overlap, newBlock lies between two existing list entries 01393 // new list entry has to be created 01394 01395 fBufList.insert(iter, newBlock); 01396 } 01397 } 01398 01399 it_assert(!fBufList.empty() && fBufList.front().begin() >= fFirstByte, "TCP_Receiver_Buffer::Write, internal error"); 01400 01401 } 01402 01403 01404 // The amount of data read from the buffer is given as parameter. It has 01405 // to be less than or equal to the size of the first block stored. This 01406 // mean the caller of Read should first check how much data is available 01407 // by calling FirstBlockSize. 01408 void TCP_Receiver_Buffer::read(unsigned noOfBytes) 01409 { 01410 it_assert(first_block_size() > 0, "TCP_Receiver_Buffer::Read, No block to read"); 01411 it_assert(noOfBytes <= first_block_size(), "TCP_Receiver_Buffer::Read, submitted block size not valid"); 01412 01413 01414 if (noOfBytes < first_block_size()) { 01415 fBufList.front().set_begin(fBufList.front().begin() + noOfBytes); 01416 } 01417 else { // first block will be read completely 01418 fBufList.pop_front(); 01419 } 01420 fFirstByte += noOfBytes; 01421 01422 it_assert(fBufList.empty() || fBufList.front().begin() >= fFirstByte, "TCP_Receiver_Buffer::Read, internal error"); 01423 } 01424 01425 01426 // FirstBlockSize returns the size of the first block stored in the 01427 // buffer or 0 if the buffer is empty 01428 unsigned TCP_Receiver_Buffer::first_block_size() const 01429 { 01430 if (!fBufList.empty() && (fBufList.front().begin() == fFirstByte)) { 01431 return fBufList.front().length(); 01432 } 01433 else { 01434 return 0; 01435 } 01436 } 01437 01438 01439 std::ostream & TCP_Receiver_Buffer::info(std::ostream &os, int detail) const 01440 { 01441 os << "receiver buffer information" << std::endl 01442 << "number of blocks: " << fBufList.size() << std::endl 01443 << "first byte stored: " << fFirstByte << std::endl 01444 << "last byte stored +1: " << last_byte() << std::endl 01445 << "next byte expected: " << next_expected() << std::endl; 01446 01447 if (detail > 0) { 01448 os << "segments in receiver buffer:" << std::endl; 01449 01450 typedef std::list<TCP_Segment>::const_iterator LI; 01451 for (LI i = fBufList.begin(); i != fBufList.end(); ++i) { 01452 const TCP_Segment & block = *i; 01453 os << ". segment: " << block << std::endl; 01454 } 01455 01456 } 01457 01458 return os; 01459 } 01460 01461 01462 // -------------------- TCP_Receiver ---------------------------------------- 01463 TCP_Receiver::TCP_Receiver(int label) : 01464 fReceiverBuffer(), 01465 fLabel(label), 01466 fTCPIPHeaderLength(TCP_HEADERLENGTH), 01467 fMSS(TCP_RMSS), 01468 fBufferSize(TCP_BUFFERSIZE), 01469 fDelayedACK(TCP_DELAYEDACK), 01470 fACKDelayTime(TCP_ACKDELAYTIME), 01471 fSendPeriodicACKs(TCP_SENDPERIODICACKS), 01472 fStrictPeriodicACKs(TCP_STRICTPERIODICACKS), 01473 fPeriodicACKInterval(TCP_PERIODICACKINTERVAL), 01474 fACKSchedulingDelay(TCP_ACKSCHEDULINGDELAY), 01475 fACKOnBufferWrite(TCP_ACKBUFFERWRITE), 01476 fACKOnBufferRead(TCP_ACKBUFFERREAD), 01477 fMaxUserBlockSize(TCP_MAXUSERBLOCKSIZE), 01478 fMinUserBlockSize(TCP_MINUSERBLOCKSIZE), 01479 fUserBlockProcDelay(TCP_USERBLOCKPROCDELAY), 01480 fTrace(false), 01481 fDebug(false), 01482 fSessionId(0), 01483 fDelayedACKTimer(*this, &TCP_Receiver::DelayedACKHandler), 01484 fPeriodicACKTimer(*this, &TCP_Receiver::PeriodicACKHandler), 01485 fACKSchedulingTimer(*this, &TCP_Receiver::SendACKMessage), 01486 fWaitingACKMsg(0), 01487 fUserBlockProcTimer(*this, &TCP_Receiver::HandleEndOfProcessing) 01488 { 01489 fUserMessage = NULL; 01490 01491 01492 if (!fACKOnBufferRead && !fACKOnBufferWrite) { 01493 // throw(UL_CException("TCP_Receiver::TCP_Receiver", 01494 // "ACKs must be sent on buffer read or write or both")); 01495 } 01496 01497 setup(); 01498 01499 tcp_receive.forward(this, &TCP_Receiver::ReceiveMessageFromNet); 01500 tcp_receive.set_name("TCP Receive"); 01501 tcp_send_ack.set_name("TCP send ACK"); 01502 tcp_new_data.set_name("TCP New Data"); 01503 tcp_release.forward(this, &TCP_Receiver::release); 01504 tcp_release.set_name("TCP Release"); 01505 01506 } 01507 01508 01509 TCP_Receiver::~TCP_Receiver() 01510 { 01511 delete fWaitingACKMsg; 01512 delete fUserMessage; 01513 } 01514 01515 01516 void TCP_Receiver::set_debug(const bool enable_debug) 01517 { 01518 fDebug = enable_debug; 01519 tcp_send_ack.set_debug(enable_debug); 01520 tcp_new_data.set_debug(); 01521 } 01522 01523 void TCP_Receiver::set_debug(bool enable_debug, bool enable_signal_debug) 01524 { 01525 fDebug = enable_debug; 01526 tcp_send_ack.set_debug(enable_signal_debug); 01527 tcp_new_data.set_debug(); 01528 } 01529 01530 void TCP_Receiver::set_trace(const bool enable_trace) 01531 { 01532 fTrace = enable_trace; 01533 } 01534 01535 01536 01537 void TCP_Receiver::setup() 01538 { 01539 fAdvRcvWnd = 0; 01540 fAdvRcvNxt = 0; 01541 01542 if (fSendPeriodicACKs) { 01543 fPeriodicACKTimer.Set(fPeriodicACKInterval); 01544 } 01545 01546 fReceiverBuffer.reset(); 01547 01548 received_seq_num_val.set_size(1000); 01549 received_seq_num_val.zeros(); 01550 received_seq_num_time.set_size(1000); 01551 received_seq_num_time.zeros(); 01552 received_seq_num_val(0) = 0; 01553 received_seq_num_time(0) = 0; 01554 received_seq_num_index = 1; 01555 } 01556 01557 std::string TCP_Receiver::GenerateFilename() 01558 { 01559 time_t rawtime; 01560 struct tm *timeinfo; 01561 timeinfo = localtime(&rawtime); 01562 std::ostringstream filename_stream; 01563 filename_stream << "trace_tcp_receiver_u" << fLabel 01564 << "_" << 1900 + timeinfo->tm_year 01565 << "_" << timeinfo->tm_mon 01566 << "_" << timeinfo->tm_mday 01567 << "__" << timeinfo->tm_hour 01568 << "_" << timeinfo->tm_min 01569 << "_" << timeinfo->tm_sec 01570 << "_.it"; 01571 return filename_stream.str(); 01572 } 01573 01574 void TCP_Receiver::release(std::string file) 01575 { 01576 std::string filename; 01577 fSessionId++; 01578 01579 if (fWaitingACKMsg != 0) { 01580 delete fWaitingACKMsg; 01581 fWaitingACKMsg = 0; 01582 } 01583 if (fUserMessage != 0) { 01584 delete fUserMessage; 01585 fUserMessage = 0; 01586 } 01587 01588 fUserBlockProcTimer.Reset(); 01589 fDelayedACKTimer.Reset(); 01590 fPeriodicACKTimer.Reset(); 01591 fACKSchedulingTimer.Reset(); 01592 01593 if (fTrace) { 01594 if (file == "") 01595 filename = GenerateFilename(); 01596 else 01597 filename = file; 01598 01599 save_trace(filename); 01600 } 01601 } 01602 01603 01604 void TCP_Receiver::ReceiveMessageFromNet(itpp::Packet *msg) 01605 { 01606 TCP_Packet & packet = (TCP_Packet &) * msg; 01607 if (packet.get_destination_port() == fLabel) { 01608 if (packet.get_session_id() == fSessionId) { 01609 ReceiveDataPacket(packet); 01610 } 01611 else { 01612 it_warning("Received a TCP packet with wrong SessionId"); 01613 std::cout << "TCP_Receiver::ReceiveMessageFromNet, " 01614 << "fLabel= " << fLabel 01615 << "fSessionId= " << fSessionId << std::endl; 01616 std::cout << "packet=" << packet 01617 << ", next exp. = " << fReceiverBuffer.next_expected() 01618 << std::endl; 01619 exit(0); 01620 } 01621 } 01622 else { 01623 it_warning("Received a TCP packet with label"); 01624 exit(0); 01625 } 01626 } 01627 01628 01629 void TCP_Receiver::ReceiveDataPacket(TCP_Packet &msg) 01630 { 01631 TCP_Segment segment = msg.get_segment(); 01632 01633 bool isOutOfOrder = (segment.begin() > fReceiverBuffer.next_expected()) || 01634 (segment.end() <= fReceiverBuffer.next_expected()); 01635 01636 if (fDebug) { 01637 std::cout << "TCP_Receiver::ReceiveDataPacket receiver: " << fLabel << ": " 01638 << "receive msg: " 01639 << "t = " << Event_Queue::now() 01640 << ", next exp. = " << fReceiverBuffer.next_expected() 01641 << ", " << msg << std::endl; 01642 } 01643 01644 if (fTrace) { 01645 TraceReceivedSeqNo(segment.end()); 01646 } 01647 01648 it_assert(segment.end() <= fReceiverBuffer.first_byte() + fBufferSize, "TCP_Receiver::ReceiveTCPPacket, packet exceeds window at "); 01649 it_assert(segment.begin() < segment.end(), "TCP_Receiver::ReceiveTCPPacket, silly packet received at "); 01650 01651 fReceiverBuffer.write(segment); 01652 01653 if (isOutOfOrder) { 01654 SendACK(true); // create dupack conditionless 01655 } 01656 else { 01657 if (fACKOnBufferWrite) { 01658 SendACK(false); 01659 } 01660 IndicateUserMessage(); 01661 } 01662 01663 delete &msg; 01664 } 01665 01666 01667 void TCP_Receiver::IndicateUserMessage() 01668 { 01669 if (fUserMessage == 0) { 01670 // receive a block 01671 unsigned noOfBytes = min(fReceiverBuffer.first_block_size(), 01672 fMaxUserBlockSize); 01673 01674 if (fDebug) { 01675 std::cout << "TCP_Receiver::IndicateUserMessage " 01676 << "t = " << Event_Queue::now() 01677 << " noOfBytes = " << noOfBytes 01678 << " firstBlock = " << fReceiverBuffer.first_block_size() 01679 << std::endl; 01680 } 01681 01682 if (noOfBytes >= fMinUserBlockSize) { 01683 fUserMessage = new Packet(); 01684 fUserMessage->set_bit_size(8*noOfBytes); 01685 fUserBlockProcTimer.Set(fUserBlockProcDelay); 01686 } 01687 } 01688 } 01689 01690 01691 bool TCP_Receiver::is_user_message_available() 01692 { 01693 if (fUserMessage != 0) { 01694 return true; 01695 } 01696 01697 unsigned noOfBytes = min(fReceiverBuffer.first_block_size(), 01698 fMaxUserBlockSize); 01699 01700 if (noOfBytes >= fMinUserBlockSize) { 01701 fUserMessage = new Packet(); 01702 fUserMessage->set_bit_size(8*noOfBytes); 01703 return true; 01704 } 01705 else { 01706 return false; 01707 } 01708 } 01709 01710 01711 itpp::Packet & TCP_Receiver::get_user_message() 01712 { 01713 it_assert(fUserMessage != 0, "TCP_Receiver::GetUserMessage, no message available"); 01714 if (fDebug) { 01715 std::cout << "TCP_Receiver::GetUserMessage " 01716 << "receiver: " << fLabel << ": " 01717 << "read from buffer: " 01718 << "t = " << Event_Queue::now() 01719 << ", user msg length = " << (fUserMessage->bit_size() / 8) 01720 << ", first byte = " << fReceiverBuffer.first_byte() 01721 << ", first block size = " << fReceiverBuffer.first_block_size() 01722 << std::endl; 01723 } 01724 01725 fReceiverBuffer.read(fUserMessage->bit_size() / 8); 01726 if (fACKOnBufferRead) { 01727 SendACK(false); // send acknowledgement 01728 } 01729 01730 itpp::Packet & msg = *fUserMessage; 01731 fUserMessage = 0; 01732 01733 if (fReceiverBuffer.first_block_size() > 0) { 01734 IndicateUserMessage(); 01735 } 01736 01737 return msg; 01738 } 01739 01740 01741 01742 void TCP_Receiver::HandleEndOfProcessing(Ttype) 01743 { 01744 it_assert(fUserMessage != 0, "TCP_Receiver::HandleEndOfProcessing, no message available"); 01745 01746 01747 tcp_new_data(fLabel); 01748 } 01749 01750 01751 void TCP_Receiver::DelayedACKHandler(Ttype) 01752 { 01753 if (fDebug) { 01754 std::cout << "TCP_Receiver::DelayedACKHandler " 01755 << "receiver " << fLabel 01756 << ": delACK TO: " 01757 << "t = " << Event_Queue::now() << std::endl; 01758 } 01759 01760 SendACK(true); 01761 } 01762 01763 01764 void TCP_Receiver::PeriodicACKHandler(Ttype) 01765 { 01766 if (fDebug) { 01767 std::cout << "TCP_Receiver::PeriodicACKHandler" 01768 << "receiver " << fLabel 01769 << ": periodicACK TO: " 01770 << "t = " << Event_Queue::now() << std::endl; 01771 } 01772 01773 SendACK(true); 01774 } 01775 01776 01777 void TCP_Receiver::SendACK(bool sendConditionless) 01778 { 01779 // sendConditionless is set 01780 // ... if packet was received out of order or 01781 // ... if delayed ACK timer has expired 01782 01783 // Bei eingeschaltetem "delayed ACK" wird ein ACK nur 01784 // gesendet, wenn das Fenster um 2MSS oder 35% der 01785 // maximalen Fenstergroesse verschoben worden ist 01786 // ... oder nach delayed ACK Timeout 01787 // ... oder wenn es das ACK fur ein Out of Order Segment ist 01788 // ... oder (in der Realitat), wenn ich auch was zu senden habe. 01789 01790 if (sendConditionless || !fDelayedACK || 01791 (fReceiverBuffer.next_expected() - fAdvRcvNxt >= (int)(2 * fMSS)) || 01792 (fReceiverBuffer.next_expected() - fAdvRcvNxt >= 01793 (int)(0.35 * fBufferSize))) { 01794 // Remark: RFC2581 recommends to acknowledge every second 01795 // packet conditionless (without setting this as a requirement) 01796 // in order to avoid excessive ack delays when the receiver MSS 01797 // is larger than the sender MSS. In this uni-directional 01798 // implementation, the receiver's MSS is not actively 01799 // used for sending but only for deciding when acknowledgments 01800 // have to be returned. Thus, the best solution to account for 01801 // RFC2581 is to set the receiver's MSS always equal to the 01802 // sender's MSS. 01803 01804 // Receiver Silly Window Syndrome Avoidance: 01805 01806 if (fAdvRcvNxt + fAdvRcvWnd + min(fBufferSize / 2, fMSS) 01807 <= fReceiverBuffer.first_byte() + fBufferSize) { 01808 // Die rechte Grenze des Empfangerfensters wird nur anders angezeigt 01809 // als beim letzten ACK, wenn sie sich seither um mindestens 01810 // min (BufferSize/ 2, MSS) geandert hat. 01811 fAdvRcvWnd = fBufferSize - fReceiverBuffer.first_block_size(); 01812 } 01813 else { 01814 fAdvRcvWnd = fAdvRcvNxt + fAdvRcvWnd - fReceiverBuffer.next_expected(); 01815 } 01816 01817 fAdvRcvNxt = fReceiverBuffer.next_expected(); 01818 01819 if (fSendPeriodicACKs && 01820 (!fStrictPeriodicACKs || !fPeriodicACKTimer.IsPending())) { 01821 fPeriodicACKTimer.Set(fPeriodicACKInterval); 01822 } 01823 01824 if (fDelayedACK && fDelayedACKTimer.IsPending()) { 01825 fDelayedACKTimer.Reset(); 01826 } 01827 01828 ScheduleACKMessage(); 01829 } 01830 else { 01831 if (!fDelayedACKTimer.IsPending()) { 01832 fDelayedACKTimer.Set(fACKDelayTime); 01833 if (fDebug) { 01834 std::cout << "TCP_Receiver::SendACK" 01835 << "receiver " << fLabel 01836 << ": set delACK timer: " 01837 << "t = " << Event_Queue::now() << std::endl; 01838 } 01839 } 01840 } 01841 } 01842 01843 01844 void TCP_Receiver::ScheduleACKMessage() 01845 { 01846 if (fWaitingACKMsg == 0) { 01847 fWaitingACKMsg = new TCP_Packet; 01848 } 01849 01850 fWaitingACKMsg->set_ACK(fAdvRcvNxt); 01851 fWaitingACKMsg->set_wnd(fAdvRcvWnd); 01852 fWaitingACKMsg->set_session_id(fSessionId); 01853 fWaitingACKMsg->set_destination_port(fLabel); 01854 fWaitingACKMsg->set_source_port(fLabel); 01855 fWaitingACKMsg->set_bit_size(8*fTCPIPHeaderLength); 01856 01857 if (fACKSchedulingDelay > 0) { 01858 if (!fACKSchedulingTimer.IsPending()) { 01859 fACKSchedulingTimer.Set(fACKSchedulingDelay); 01860 } 01861 } 01862 else { 01863 SendACKMessage(Event_Queue::now()); 01864 } 01865 } 01866 01867 01868 void TCP_Receiver::SendACKMessage(Ttype) 01869 { 01870 it_assert(fWaitingACKMsg != 0, "TCP_Receiver::SendACKMessage, no ACK message waiting"); 01871 01872 if (fDebug) { 01873 std::cout << "TCP_Receiver::SendACKMessage Ack sent" 01874 << "receiver " << fLabel 01875 << ": send ACK: " 01876 << "t = " << Event_Queue::now() 01877 << ", " << (*fWaitingACKMsg) 01878 << " byte_size=" << fWaitingACKMsg->bit_size() / 8 01879 << " ptr=" << fWaitingACKMsg << std::endl; 01880 } 01881 01882 tcp_send_ack(fWaitingACKMsg); 01883 01884 fWaitingACKMsg = 0; 01885 } 01886 01887 01888 void TCP_Receiver::TraceReceivedSeqNo(const Sequence_Number &sn) 01889 { 01890 if (fDebug) { 01891 std::cout << "TCP_Receiver::TraceReceivedSeqNo " 01892 << "receiver " << fLabel 01893 << " t = " << Event_Queue::now() 01894 << " sn = " << sn << std::endl; 01895 } 01896 if (received_seq_num_index >= received_seq_num_time.size()) { 01897 received_seq_num_time.set_size(2*received_seq_num_time.size(), true); 01898 received_seq_num_val.set_size(2*received_seq_num_val.size(), true); 01899 } 01900 received_seq_num_val(received_seq_num_index) = sn.value(); 01901 received_seq_num_time(received_seq_num_index) = Event_Queue::now(); 01902 received_seq_num_index++; 01903 } 01904 01905 01906 void TCP_Receiver::save_trace(std::string filename) 01907 { 01908 01909 received_seq_num_val.set_size(received_seq_num_index, true); 01910 received_seq_num_time.set_size(received_seq_num_index, true); 01911 01912 if (fDebug) { 01913 std::cout << "received_seq_num_val" << received_seq_num_val << std::endl; 01914 std::cout << "received_seq_num_time" << received_seq_num_time << std::endl; 01915 std::cout << "received_seq_num_index" << received_seq_num_index << std::endl; 01916 std::cout << "TCP_Receiver::saving to file: " << filename << std::endl; 01917 } 01918 01919 it_file ff2; 01920 ff2.open(filename); 01921 01922 ff2 << Name("received_seq_num_val") << received_seq_num_val; 01923 ff2 << Name("received_seq_num_time") << received_seq_num_time; 01924 ff2 << Name("received_seq_num_index") << received_seq_num_index; 01925 01926 ff2.flush(); 01927 ff2.close(); 01928 } 01929 01930 01931 } //namespace itpp 01932 01933 #ifdef _MSC_VER 01934 #pragma warning(default:4355) 01935 #endif 01936
Generated on Wed Jul 27 2011 16:27:05 for IT++ by Doxygen 1.7.4