/**
 * Callback slot taking a shared node handle and an integer argument.
 * The constructor in this file accepts a matching `onNewNode` parameter.
 * NOTE(review): the name suggests it is fired when a node is discovered or
 * confirmed; the meaning of the int is not visible in this view — confirm.
 */
140 std::function<void(
const Sp<Node>&,
int)> onNewNode;
198 const std::vector<Sp<Value>>&,
199 const time_point&)> onAnnounce {};
211 const Value::Id&)> onRefresh {};
/**
 * Callable type receiving the originating Request (read-only) together with
 * a RequestAnswer passed by rvalue reference.
 * It is the type of the `on_done` parameters of the send* methods below.
 */
214 using RequestCb = std::function<void(
const Request&,
RequestAnswer&&)>;
/**
 * Callable type receiving the originating Request (read-only) and a bool.
 * It is the type of the `on_expired` parameters of the send* methods below.
 * NOTE(review): the bool presumably tells whether the expiration is final —
 * confirm against the implementation.
 */
216 using RequestExpiredCb = std::function<void(
const Request&,
bool)>;
221 std::unique_ptr<DatagramSocket>&& sock,
222 const Sp<Logger>& log,
225 decltype(NetworkEngine::onError)&& onError,
226 decltype(NetworkEngine::onNewNode)&& onNewNode,
227 decltype(NetworkEngine::onReportedAddr)&& onReportedAddr,
228 decltype(NetworkEngine::onPing)&& onPing,
229 decltype(NetworkEngine::onFindNode)&& onFindNode,
230 decltype(NetworkEngine::onGetValues)&& onGetValues,
231 decltype(NetworkEngine::onListen)&& onListen,
232 decltype(NetworkEngine::onAnnounce)&& onAnnounce,
233 decltype(NetworkEngine::onRefresh)&& onRefresh);
257 std::vector<Sp<Node>>&& nodes, std::vector<Sp<Node>>&& nodes6,
258 std::vector<Sp<Value>>&& values,
const Query& q,
int version);
/**
 * Declaration only; the definition lives outside this view.
 *
 * @param n          target node.
 * @param socket_id  transaction/socket identifier (type Tid).
 * @param hash       InfoHash the notification refers to.
 * @param ntoken     token blob sent along with the notification.
 * @param values     identifiers of the values concerned.
 * @param version    integer version argument.
 *
 * NOTE(review): by its name this notifies a listening node that the given
 * values were refreshed — confirm in the implementation file.
 */
260 void tellListenerRefreshed(
const Sp<Node>& n, Tid socket_id,
const InfoHash& hash,
const Blob& ntoken,
const std::vector<Value::Id>& values,
int version);
/**
 * Declaration only; the definition lives outside this view.
 * Takes the same parameter list as tellListenerRefreshed.
 *
 * @param n          target node.
 * @param socket_id  transaction/socket identifier (type Tid).
 * @param hash       InfoHash the notification refers to.
 * @param ntoken     token blob sent along with the notification.
 * @param values     identifiers of the values concerned.
 * @param version    integer version argument.
 *
 * NOTE(review): by its name this notifies a listening node that the given
 * values expired — confirm in the implementation file.
 */
261 void tellListenerExpired(
const Sp<Node>& n, Tid socket_id,
const InfoHash& hash,
const Blob& ntoken,
const std::vector<Value::Id>& values,
int version);
/**
 * Const query taking an address family (sa_family_t) and returning a bool.
 * NOTE(review): presumably reports whether the engine is operational for
 * that family — the definition is not visible here.
 */
263 bool isRunning(sa_family_t af)
const;
264 inline want_t want ()
const {
return dht_socket->hasIPv4() and dht_socket->hasIPv6() ? (WANT4 | WANT6) : -1; }
/**
 * Declaration only; takes the address family concerned.
 * NOTE(review): presumably called when network connectivity changes for
 * that family — effects are defined outside this view.
 */
266 void connectivityChanged(sa_family_t);
282 sendPing(
const Sp<Node>& n, RequestCb&& on_done, RequestExpiredCb&& on_expired);
296 return sendPing(std::make_shared<Node>(InfoHash::zero(), std::move(sa), rd),
297 std::forward<RequestCb>(on_done),
298 std::forward<RequestExpiredCb>(on_expired));
316 RequestCb&& on_done = {},
317 RequestExpiredCb&& on_expired = {});
337 RequestExpiredCb&& on_expired);
367 RequestExpiredCb&& on_expired);
387 RequestExpiredCb&& on_expired);
403 const Value::Id& vid,
406 RequestErrorCb&& on_error,
407 RequestExpiredCb&& on_expired);
422 std::vector<Sp<Value>>&& values,
428 std::vector<Sp<Value>>::iterator begin,
429 std::vector<Sp<Value>>::iterator end,
446 auto n = cache.getNode(
id, addr, scheduler.
time(), 0);
451 std::vector<unsigned> getNodeMessageStats(
bool in) {
452 auto& st = in ? in_stats : out_stats;
453 std::vector<unsigned> stats {st.ping, st.find, st.get, st.listen, st.put};
/**
 * Declaration only; takes the node to blacklist.
 * NOTE(review): presumably records the node's address in the `blacklist`
 * set declared further down — confirm in the implementation file.
 */
458 void blacklistNode(
const Sp<Node>& n);
460 std::vector<Sp<Node>> getCachedNodes(
const InfoHash&
id, sa_family_t sa_f,
size_t count) {
461 return cache.getCachedNodes(
id, sa_f, count);
464 size_t getNodeCacheSize()
const {
467 size_t getNodeCacheSize(sa_family_t af)
const {
468 return cache.size(af);
471 size_t getRateLimiterSize()
const {
472 return address_rate_limiter.size();
475 size_t getPartialCount()
const {
476 return partial_messages.size();
// Forward declaration; PartialMessage is the mapped type of the
// `partial_messages` map declared below and is defined elsewhere.
481 struct PartialMessage;
// Byte length of one serialized IPv4 node entry: hash + IPv4 address + port.
487 static const constexpr size_t NODE4_INFO_BUF_LEN {HASH_LEN +
sizeof(in_addr) +
sizeof(in_port_t)};
// Byte length of one serialized IPv6 node entry: hash + IPv6 address + port.
489 static const constexpr size_t NODE6_INFO_BUF_LEN {HASH_LEN +
sizeof(in6_addr) +
sizeof(in_port_t)};
// 15 seconds. NOTE(review): presumably the delay allowed for a UDP reply — confirm.
491 static constexpr std::chrono::seconds UDP_REPLY_TIME {15};
// 10 seconds. NOTE(review): presumably the maximum time to receive a full
// multi-part packet — confirm.
494 static constexpr std::chrono::seconds RX_MAX_PACKET_TIME {10};
// 3 seconds. NOTE(review): presumably the receive timeout between parts — confirm.
496 static constexpr std::chrono::seconds RX_TIMEOUT {3};
// Limit of 10. NOTE(review): presumably the maximum number of blacklisted
// entries kept — confirm.
499 static constexpr unsigned BLACKLISTED_MAX {10};
// 1280 bytes (this equals the IPv6 minimum link MTU; presumably the reason
// for the value — confirm).
501 static constexpr size_t MTU {1280};
// 600 bytes. NOTE(review): presumably the largest value payload placed in a
// single packet — confirm.
502 static constexpr size_t MAX_PACKET_VALUE_SIZE {600};
// 56 KiB. NOTE(review): presumably the largest value payload accepted for a
// whole (possibly multi-part) message — confirm.
503 static constexpr size_t MAX_MESSAGE_VALUE_SIZE {56 * 1024};
/**
 * Declaration only. Takes ownership of a parsed message (unique_ptr passed
 * by rvalue reference) along with the address it was received from.
 */
505 void process(std::unique_ptr<ParsedMessage>&&,
const SockAddr& from);
// Address-based predicates; all are declarations only.

// NOTE(review): presumably applies the rate limiters declared below to
// `addr`; what the returned bool means (accepted vs. limited) is not
// visible here — confirm.
507 bool rateLimit(
const SockAddr& addr);
// Static predicate on an address. NOTE(review): "martian" usually denotes an
// invalid or non-routable source address — confirm the exact criteria.
509 static bool isMartian(
const SockAddr& addr);
// Const predicate on an address. NOTE(review): presumably a lookup in the
// `blacklist` set declared below — confirm.
510 bool isNodeBlacklisted(
const SockAddr& addr)
const;
// Declaration only; receives the request by value (shared pointer copy).
// NOTE(review): presumably advances the request's send/retry state — confirm.
512 void requestStep(Sp<Request> req);
// Declaration only; receives the request by const reference.
// NOTE(review): presumably registers and transmits the request — confirm.
518 void sendRequest(
const Sp<Request>& request);
520 struct MessageStats {
526 unsigned refresh {0};
527 unsigned updateValue {0};
/**
 * Declaration only.
 *
 * @param addr       destination address.
 * @param buf        pointer to the bytes to send.
 * @param len        number of bytes at `buf`.
 * @param confirmed  defaults to false; semantics defined by the
 *                   implementation (not visible here).
 * @return an int. NOTE(review): presumably a status/errno-style code —
 *         confirm.
 */
532 int send(
const SockAddr& addr,
const char *buf,
size_t len,
bool confirmed =
false);
// Declaration only. Takes a transaction id, a list of serialized values
// (blobs) and a destination address.
// NOTE(review): presumably sends each blob in pieces to `addr` — confirm.
534 void sendValueParts(Tid tid,
const std::vector<Blob>& svals,
const SockAddr& addr);
// Declaration only. Iterator-range overload: takes the output msgpack buffer
// and a [begin, end) range of values, and returns a vector of blobs.
// The vector overload that follows forwards to this one.
535 std::vector<Blob> packValueHeader(msgpack::sbuffer&, std::vector<Sp<Value>>::const_iterator, std::vector<Sp<Value>>::const_iterator)
const;
536 std::vector<Blob> packValueHeader(msgpack::sbuffer& buf,
const std::vector<Sp<Value>>& values)
const {
537 return packValueHeader(buf, values.begin(), values.end());
// Declaration only; keyed by transaction id.
// NOTE(review): presumably expires or cleans the partial-message entry for
// `tid` — confirm.
539 void maintainRxBuffer(Tid tid);
// Declaration only; takes the destination address and the transaction id.
// NOTE(review): presumably the reply to a ping — confirm.
545 void sendPong(
const SockAddr& addr, Tid tid);
547 void sendNodesValues(
const SockAddr& addr,
551 const std::vector<Sp<Value>>& st,
/**
 * Declaration only (single address-family overload).
 *
 * @param af     address family.
 * @param id     reference hash.
 * @param nodes  node list, taken by non-const reference (may be modified).
 * @return a Blob. NOTE(review): presumably the serialized node entries
 *         (see NODE4_INFO_BUF_LEN / NODE6_INFO_BUF_LEN) — confirm.
 */
554 Blob bufferNodes(sa_family_t af,
const InfoHash&
id, std::vector<Sp<Node>>& nodes);
556 std::pair<Blob, Blob> bufferNodes(sa_family_t af,
559 std::vector<Sp<Node>>& nodes,
560 std::vector<Sp<Node>>& nodes6);
// Declaration only; takes the destination address and the transaction id.
// NOTE(review): presumably acknowledges a listen request — confirm.
562 void sendListenConfirmation(
const SockAddr& addr, Tid tid);
// Declaration only; takes the destination address, a transaction id and a
// value identifier.
// NOTE(review): presumably acknowledges an announced value — confirm.
564 void sendValueAnnounced(
const SockAddr& addr, Tid, Value::Id);
566 void sendError(
const SockAddr& addr,
569 const std::string& message,
570 bool include_id=
false);
// Declaration only. Takes the parsed message by non-const reference (so it
// may be modified) and the sender's address.
// NOTE(review): presumably decodes the node lists carried by `msg` — confirm.
572 void deserializeNodes(ParsedMessage& msg,
const SockAddr& from);
// Reference to an InfoHash owned elsewhere; it must outlive this object.
// NOTE(review): presumably the local node identifier — confirm.
575 const InfoHash& myid;
// Immutable configuration, value-initialized by default.
576 const NetworkConfig config {};
// Socket owned by this object; the pointer itself is const. want() queries
// it for IPv4/IPv6 support.
577 const std::unique_ptr<DatagramSocket> dht_socket;
// Rate-limiting state.
584 using IpLimiter = RateLimiter;
// One limiter per address; keys are ordered with SockAddr::ipCmp.
585 using IpLimiterMap = std::map<SockAddr, IpLimiter, SockAddr::ipCmp>;
// Per-address limiters; getRateLimiterSize() reports this map's size.
586 IpLimiterMap address_rate_limiter;
// A single limiter not keyed by address.
// NOTE(review): presumably the global limit — confirm.
587 RateLimiter rate_limiter;
// Signed counter starting at 0.
// NOTE(review): presumably schedules periodic cleanup of
// address_rate_limiter — confirm.
588 ssize_t limiter_maintenance {0};
// Requests indexed by transaction id.
591 std::map<Tid, Sp<Request>> requests {};
// Partial messages indexed by transaction id; getPartialCount() reports
// this map's size.
592 std::map<Tid, PartialMessage> partial_messages;
// Message counters; getNodeMessageStats(in) reads in_stats when `in` is
// true and out_stats otherwise.
594 MessageStats in_stats {}, out_stats {};
// Set of blacklisted addresses.
595 std::set<SockAddr> blacklist {};
// Scheduler owned elsewhere (held by reference); its time() is used when
// looking up nodes in the cache.
597 Scheduler& scheduler;
// Off by default.
// NOTE(review): presumably enables logging of incoming messages — confirm.
599 bool logIncoming_ {
false};