// Crypto++
00001 // network.cpp - written and placed in the public domain by Wei Dai 00002 00003 #include "pch.h" 00004 #include "network.h" 00005 #include "wait.h" 00006 00007 #define CRYPTOPP_TRACE_NETWORK 0 00008 00009 NAMESPACE_BEGIN(CryptoPP) 00010 00011 #ifdef HIGHRES_TIMER_AVAILABLE 00012 00013 lword LimitedBandwidth::ComputeCurrentTransceiveLimit() 00014 { 00015 if (!m_maxBytesPerSecond) 00016 return ULONG_MAX; 00017 00018 double curTime = GetCurTimeAndCleanUp(); 00019 lword total = 0; 00020 for (OpQueue::size_type i=0; i!=m_ops.size(); ++i) 00021 total += m_ops[i].second; 00022 return SaturatingSubtract(m_maxBytesPerSecond, total); 00023 } 00024 00025 double LimitedBandwidth::TimeToNextTransceive() 00026 { 00027 if (!m_maxBytesPerSecond) 00028 return 0; 00029 00030 if (!m_nextTransceiveTime) 00031 ComputeNextTransceiveTime(); 00032 00033 return SaturatingSubtract(m_nextTransceiveTime, m_timer.ElapsedTimeAsDouble()); 00034 } 00035 00036 void LimitedBandwidth::NoteTransceive(lword size) 00037 { 00038 if (m_maxBytesPerSecond) 00039 { 00040 double curTime = GetCurTimeAndCleanUp(); 00041 m_ops.push_back(std::make_pair(curTime, size)); 00042 m_nextTransceiveTime = 0; 00043 } 00044 } 00045 00046 void LimitedBandwidth::ComputeNextTransceiveTime() 00047 { 00048 double curTime = GetCurTimeAndCleanUp(); 00049 lword total = 0; 00050 for (unsigned int i=0; i!=m_ops.size(); ++i) 00051 total += m_ops[i].second; 00052 m_nextTransceiveTime = 00053 (total < m_maxBytesPerSecond) ? 
curTime : m_ops.front().first + 1000; 00054 } 00055 00056 double LimitedBandwidth::GetCurTimeAndCleanUp() 00057 { 00058 if (!m_maxBytesPerSecond) 00059 return 0; 00060 00061 double curTime = m_timer.ElapsedTimeAsDouble(); 00062 while (m_ops.size() && (m_ops.front().first + 1000 < curTime)) 00063 m_ops.pop_front(); 00064 return curTime; 00065 } 00066 00067 void LimitedBandwidth::GetWaitObjects(WaitObjectContainer &container, const CallStack &callStack) 00068 { 00069 double nextTransceiveTime = TimeToNextTransceive(); 00070 if (nextTransceiveTime) 00071 container.ScheduleEvent(nextTransceiveTime, CallStack("LimitedBandwidth::GetWaitObjects()", &callStack)); 00072 } 00073 00074 // ************************************************************* 00075 00076 size_t NonblockingSource::GeneralPump2( 00077 lword& byteCount, bool blockingOutput, 00078 unsigned long maxTime, bool checkDelimiter, byte delimiter) 00079 { 00080 m_blockedBySpeedLimit = false; 00081 00082 if (!GetMaxBytesPerSecond()) 00083 { 00084 size_t ret = DoPump(byteCount, blockingOutput, maxTime, checkDelimiter, delimiter); 00085 m_doPumpBlocked = (ret != 0); 00086 return ret; 00087 } 00088 00089 bool forever = (maxTime == INFINITE_TIME); 00090 unsigned long timeToGo = maxTime; 00091 Timer timer(Timer::MILLISECONDS, forever); 00092 lword maxSize = byteCount; 00093 byteCount = 0; 00094 00095 timer.StartTimer(); 00096 00097 while (true) 00098 { 00099 lword curMaxSize = UnsignedMin(ComputeCurrentTransceiveLimit(), maxSize - byteCount); 00100 00101 if (curMaxSize || m_doPumpBlocked) 00102 { 00103 if (!forever) timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime()); 00104 size_t ret = DoPump(curMaxSize, blockingOutput, timeToGo, checkDelimiter, delimiter); 00105 m_doPumpBlocked = (ret != 0); 00106 if (curMaxSize) 00107 { 00108 NoteTransceive(curMaxSize); 00109 byteCount += curMaxSize; 00110 } 00111 if (ret) 00112 return ret; 00113 } 00114 00115 if (maxSize != ULONG_MAX && byteCount >= maxSize) 00116 break; 
00117 00118 if (!forever) 00119 { 00120 timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime()); 00121 if (!timeToGo) 00122 break; 00123 } 00124 00125 double waitTime = TimeToNextTransceive(); 00126 if (!forever && waitTime > timeToGo) 00127 { 00128 m_blockedBySpeedLimit = true; 00129 break; 00130 } 00131 00132 WaitObjectContainer container; 00133 LimitedBandwidth::GetWaitObjects(container, CallStack("NonblockingSource::GeneralPump2() - speed limit", 0)); 00134 container.Wait((unsigned long)waitTime); 00135 } 00136 00137 return 0; 00138 } 00139 00140 size_t NonblockingSource::PumpMessages2(unsigned int &messageCount, bool blocking) 00141 { 00142 if (messageCount == 0) 00143 return 0; 00144 00145 messageCount = 0; 00146 00147 lword byteCount; 00148 do { 00149 byteCount = LWORD_MAX; 00150 RETURN_IF_NONZERO(Pump2(byteCount, blocking)); 00151 } while(byteCount == LWORD_MAX); 00152 00153 if (!m_messageEndSent && SourceExhausted()) 00154 { 00155 RETURN_IF_NONZERO(AttachedTransformation()->Put2(NULL, 0, GetAutoSignalPropagation(), true)); 00156 m_messageEndSent = true; 00157 messageCount = 1; 00158 } 00159 return 0; 00160 } 00161 00162 lword NonblockingSink::TimedFlush(unsigned long maxTime, size_t targetSize) 00163 { 00164 m_blockedBySpeedLimit = false; 00165 00166 size_t curBufSize = GetCurrentBufferSize(); 00167 if (curBufSize <= targetSize && (targetSize || !EofPending())) 00168 return 0; 00169 00170 if (!GetMaxBytesPerSecond()) 00171 return DoFlush(maxTime, targetSize); 00172 00173 bool forever = (maxTime == INFINITE_TIME); 00174 unsigned long timeToGo = maxTime; 00175 Timer timer(Timer::MILLISECONDS, forever); 00176 lword totalFlushed = 0; 00177 00178 timer.StartTimer(); 00179 00180 while (true) 00181 { 00182 size_t flushSize = UnsignedMin(curBufSize - targetSize, ComputeCurrentTransceiveLimit()); 00183 if (flushSize || EofPending()) 00184 { 00185 if (!forever) timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime()); 00186 size_t ret = 
(size_t)DoFlush(timeToGo, curBufSize - flushSize); 00187 if (ret) 00188 { 00189 NoteTransceive(ret); 00190 curBufSize -= ret; 00191 totalFlushed += ret; 00192 } 00193 } 00194 00195 if (curBufSize <= targetSize && (targetSize || !EofPending())) 00196 break; 00197 00198 if (!forever) 00199 { 00200 timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime()); 00201 if (!timeToGo) 00202 break; 00203 } 00204 00205 double waitTime = TimeToNextTransceive(); 00206 if (!forever && waitTime > timeToGo) 00207 { 00208 m_blockedBySpeedLimit = true; 00209 break; 00210 } 00211 00212 WaitObjectContainer container; 00213 LimitedBandwidth::GetWaitObjects(container, CallStack("NonblockingSink::TimedFlush() - speed limit", 0)); 00214 container.Wait((unsigned long)waitTime); 00215 } 00216 00217 return totalFlushed; 00218 } 00219 00220 bool NonblockingSink::IsolatedFlush(bool hardFlush, bool blocking) 00221 { 00222 TimedFlush(blocking ? INFINITE_TIME : 0); 00223 return hardFlush && (!!GetCurrentBufferSize() || EofPending()); 00224 } 00225 00226 // ************************************************************* 00227 00228 NetworkSource::NetworkSource(BufferedTransformation *attachment) 00229 : NonblockingSource(attachment), m_buf(1024*16) 00230 , m_waitingForResult(false), m_outputBlocked(false) 00231 , m_dataBegin(0), m_dataEnd(0) 00232 { 00233 } 00234 00235 unsigned int NetworkSource::GetMaxWaitObjectCount() const 00236 { 00237 return LimitedBandwidth::GetMaxWaitObjectCount() 00238 + GetReceiver().GetMaxWaitObjectCount() 00239 + AttachedTransformation()->GetMaxWaitObjectCount(); 00240 } 00241 00242 void NetworkSource::GetWaitObjects(WaitObjectContainer &container, CallStack const& callStack) 00243 { 00244 if (BlockedBySpeedLimit()) 00245 LimitedBandwidth::GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - speed limit", &callStack)); 00246 else if (!m_outputBlocked) 00247 { 00248 if (m_dataBegin == m_dataEnd) 00249 AccessReceiver().GetWaitObjects(container, 
CallStack("NetworkSource::GetWaitObjects() - no data", &callStack)); 00250 else 00251 container.SetNoWait(CallStack("NetworkSource::GetWaitObjects() - have data", &callStack)); 00252 } 00253 00254 AttachedTransformation()->GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - attachment", &callStack)); 00255 } 00256 00257 size_t NetworkSource::DoPump(lword &byteCount, bool blockingOutput, unsigned long maxTime, bool checkDelimiter, byte delimiter) 00258 { 00259 NetworkReceiver &receiver = AccessReceiver(); 00260 00261 lword maxSize = byteCount; 00262 byteCount = 0; 00263 bool forever = maxTime == INFINITE_TIME; 00264 Timer timer(Timer::MILLISECONDS, forever); 00265 BufferedTransformation *t = AttachedTransformation(); 00266 00267 if (m_outputBlocked) 00268 goto DoOutput; 00269 00270 while (true) 00271 { 00272 if (m_dataBegin == m_dataEnd) 00273 { 00274 if (receiver.EofReceived()) 00275 break; 00276 00277 if (m_waitingForResult) 00278 { 00279 if (receiver.MustWaitForResult() && 00280 !receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()), 00281 CallStack("NetworkSource::DoPump() - wait receive result", 0))) 00282 break; 00283 00284 unsigned int recvResult = receiver.GetReceiveResult(); 00285 #if CRYPTOPP_TRACE_NETWORK 00286 OutputDebugString((IntToString((unsigned int)this) + ": Received " + IntToString(recvResult) + " bytes\n").c_str()); 00287 #endif 00288 m_dataEnd += recvResult; 00289 m_waitingForResult = false; 00290 00291 if (!receiver.MustWaitToReceive() && !receiver.EofReceived() && m_dataEnd != m_buf.size()) 00292 goto ReceiveNoWait; 00293 } 00294 else 00295 { 00296 m_dataEnd = m_dataBegin = 0; 00297 00298 if (receiver.MustWaitToReceive()) 00299 { 00300 if (!receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()), 00301 CallStack("NetworkSource::DoPump() - wait receive", 0))) 00302 break; 00303 00304 receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd); 00305 m_waitingForResult = true; 00306 } 00307 else 00308 { 00309 
ReceiveNoWait: 00310 m_waitingForResult = true; 00311 // call Receive repeatedly as long as data is immediately available, 00312 // because some receivers tend to return data in small pieces 00313 #if CRYPTOPP_TRACE_NETWORK 00314 OutputDebugString((IntToString((unsigned int)this) + ": Receiving " + IntToString(m_buf.size()-m_dataEnd) + " bytes\n").c_str()); 00315 #endif 00316 while (receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd)) 00317 { 00318 unsigned int recvResult = receiver.GetReceiveResult(); 00319 #if CRYPTOPP_TRACE_NETWORK 00320 OutputDebugString((IntToString((unsigned int)this) + ": Received " + IntToString(recvResult) + " bytes\n").c_str()); 00321 #endif 00322 m_dataEnd += recvResult; 00323 if (receiver.EofReceived() || m_dataEnd > m_buf.size() /2) 00324 { 00325 m_waitingForResult = false; 00326 break; 00327 } 00328 } 00329 } 00330 } 00331 } 00332 else 00333 { 00334 m_putSize = UnsignedMin(m_dataEnd - m_dataBegin, maxSize - byteCount); 00335 00336 if (checkDelimiter) 00337 m_putSize = std::find(m_buf+m_dataBegin, m_buf+m_dataBegin+m_putSize, delimiter) - (m_buf+m_dataBegin); 00338 00339 DoOutput: 00340 size_t result = t->PutModifiable2(m_buf+m_dataBegin, m_putSize, 0, forever || blockingOutput); 00341 if (result) 00342 { 00343 if (t->Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()), 00344 CallStack("NetworkSource::DoPump() - wait attachment", 0))) 00345 goto DoOutput; 00346 else 00347 { 00348 m_outputBlocked = true; 00349 return result; 00350 } 00351 } 00352 m_outputBlocked = false; 00353 00354 byteCount += m_putSize; 00355 m_dataBegin += m_putSize; 00356 if (checkDelimiter && m_dataBegin < m_dataEnd && m_buf[m_dataBegin] == delimiter) 00357 break; 00358 if (maxSize != ULONG_MAX && byteCount == maxSize) 00359 break; 00360 // once time limit is reached, return even if there is more data waiting 00361 // but make 0 a special case so caller can request a large amount of data to be 00362 // pumped as long as it is immediately available 00363 if 
(maxTime > 0 && timer.ElapsedTime() > maxTime) 00364 break; 00365 } 00366 } 00367 00368 return 0; 00369 } 00370 00371 // ************************************************************* 00372 00373 NetworkSink::NetworkSink(unsigned int maxBufferSize, unsigned int autoFlushBound) 00374 : m_maxBufferSize(maxBufferSize), m_autoFlushBound(autoFlushBound) 00375 , m_needSendResult(false), m_wasBlocked(false), m_eofState(EOF_NONE) 00376 , m_buffer(STDMIN(16U*1024U+256, maxBufferSize)), m_skipBytes(0) 00377 , m_speedTimer(Timer::MILLISECONDS), m_byteCountSinceLastTimerReset(0) 00378 , m_currentSpeed(0), m_maxObservedSpeed(0) 00379 { 00380 } 00381 00382 float NetworkSink::ComputeCurrentSpeed() 00383 { 00384 if (m_speedTimer.ElapsedTime() > 1000) 00385 { 00386 m_currentSpeed = m_byteCountSinceLastTimerReset * 1000 / m_speedTimer.ElapsedTime(); 00387 m_maxObservedSpeed = STDMAX(m_currentSpeed, m_maxObservedSpeed * 0.98f); 00388 m_byteCountSinceLastTimerReset = 0; 00389 m_speedTimer.StartTimer(); 00390 // OutputDebugString(("max speed: " + IntToString((int)m_maxObservedSpeed) + " current speed: " + IntToString((int)m_currentSpeed) + "\n").c_str()); 00391 } 00392 return m_currentSpeed; 00393 } 00394 00395 float NetworkSink::GetMaxObservedSpeed() const 00396 { 00397 lword m = GetMaxBytesPerSecond(); 00398 return m ? 
STDMIN(m_maxObservedSpeed, float(CRYPTOPP_VC6_INT64 m)) : m_maxObservedSpeed; 00399 } 00400 00401 unsigned int NetworkSink::GetMaxWaitObjectCount() const 00402 { 00403 return LimitedBandwidth::GetMaxWaitObjectCount() + GetSender().GetMaxWaitObjectCount(); 00404 } 00405 00406 void NetworkSink::GetWaitObjects(WaitObjectContainer &container, CallStack const& callStack) 00407 { 00408 if (BlockedBySpeedLimit()) 00409 LimitedBandwidth::GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - speed limit", &callStack)); 00410 else if (m_wasBlocked) 00411 AccessSender().GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - was blocked", &callStack)); 00412 else if (!m_buffer.IsEmpty()) 00413 AccessSender().GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - buffer not empty", &callStack)); 00414 else if (EofPending()) 00415 AccessSender().GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - EOF pending", &callStack)); 00416 } 00417 00418 size_t NetworkSink::Put2(const byte *inString, size_t length, int messageEnd, bool blocking) 00419 { 00420 if (m_eofState == EOF_DONE) 00421 { 00422 if (length || messageEnd) 00423 throw Exception(Exception::OTHER_ERROR, "NetworkSink::Put2() being called after EOF had been sent"); 00424 00425 return 0; 00426 } 00427 00428 if (m_eofState > EOF_NONE) 00429 goto EofSite; 00430 00431 { 00432 if (m_skipBytes) 00433 { 00434 assert(length >= m_skipBytes); 00435 inString += m_skipBytes; 00436 length -= m_skipBytes; 00437 } 00438 00439 m_buffer.Put(inString, length); 00440 00441 if (!blocking || m_buffer.CurrentSize() > m_autoFlushBound) 00442 TimedFlush(0, 0); 00443 00444 size_t targetSize = messageEnd ? 
0 : m_maxBufferSize; 00445 if (blocking) 00446 TimedFlush(INFINITE_TIME, targetSize); 00447 00448 if (m_buffer.CurrentSize() > targetSize) 00449 { 00450 assert(!blocking); 00451 m_wasBlocked = true; 00452 m_skipBytes += length; 00453 size_t blockedBytes = UnsignedMin(length, m_buffer.CurrentSize() - targetSize); 00454 return STDMAX<size_t>(blockedBytes, 1); 00455 } 00456 00457 m_wasBlocked = false; 00458 m_skipBytes = 0; 00459 } 00460 00461 if (messageEnd) 00462 { 00463 m_eofState = EOF_PENDING_SEND; 00464 00465 EofSite: 00466 TimedFlush(blocking ? INFINITE_TIME : 0, 0); 00467 if (m_eofState != EOF_DONE) 00468 return 1; 00469 } 00470 00471 return 0; 00472 } 00473 00474 lword NetworkSink::DoFlush(unsigned long maxTime, size_t targetSize) 00475 { 00476 NetworkSender &sender = AccessSender(); 00477 00478 bool forever = maxTime == INFINITE_TIME; 00479 Timer timer(Timer::MILLISECONDS, forever); 00480 unsigned int totalFlushSize = 0; 00481 00482 while (true) 00483 { 00484 if (m_buffer.CurrentSize() <= targetSize) 00485 break; 00486 00487 if (m_needSendResult) 00488 { 00489 if (sender.MustWaitForResult() && 00490 !sender.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()), 00491 CallStack("NetworkSink::DoFlush() - wait send result", 0))) 00492 break; 00493 00494 unsigned int sendResult = sender.GetSendResult(); 00495 #if CRYPTOPP_TRACE_NETWORK 00496 OutputDebugString((IntToString((unsigned int)this) + ": Sent " + IntToString(sendResult) + " bytes\n").c_str()); 00497 #endif 00498 m_buffer.Skip(sendResult); 00499 totalFlushSize += sendResult; 00500 m_needSendResult = false; 00501 00502 if (!m_buffer.AnyRetrievable()) 00503 break; 00504 } 00505 00506 unsigned long timeOut = maxTime ? 
SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0; 00507 if (sender.MustWaitToSend() && !sender.Wait(timeOut, CallStack("NetworkSink::DoFlush() - wait send", 0))) 00508 break; 00509 00510 size_t contiguousSize = 0; 00511 const byte *block = m_buffer.Spy(contiguousSize); 00512 00513 #if CRYPTOPP_TRACE_NETWORK 00514 OutputDebugString((IntToString((unsigned int)this) + ": Sending " + IntToString(contiguousSize) + " bytes\n").c_str()); 00515 #endif 00516 sender.Send(block, contiguousSize); 00517 m_needSendResult = true; 00518 00519 if (maxTime > 0 && timeOut == 0) 00520 break; // once time limit is reached, return even if there is more data waiting 00521 } 00522 00523 m_byteCountSinceLastTimerReset += totalFlushSize; 00524 ComputeCurrentSpeed(); 00525 00526 if (m_buffer.IsEmpty() && !m_needSendResult) 00527 { 00528 if (m_eofState == EOF_PENDING_SEND) 00529 { 00530 sender.SendEof(); 00531 m_eofState = sender.MustWaitForEof() ? EOF_PENDING_DELIVERY : EOF_DONE; 00532 } 00533 00534 while (m_eofState == EOF_PENDING_DELIVERY) 00535 { 00536 unsigned long timeOut = maxTime ? SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0; 00537 if (!sender.Wait(timeOut, CallStack("NetworkSink::DoFlush() - wait EOF", 0))) 00538 break; 00539 00540 if (sender.EofSent()) 00541 m_eofState = EOF_DONE; 00542 } 00543 } 00544 00545 return totalFlushSize; 00546 } 00547 00548 #endif // #ifdef HIGHRES_TIMER_AVAILABLE 00549 00550 NAMESPACE_END