/* vim: set expandtab ts=4 sw=4: */
/*
* You may redistribute this program and/or modify it under the terms of
* the GNU General Public License as published by the Free Software Foundation,
* either version 3 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see .
*/
#include "crypto/AddressCalc.h"
#include "crypto/Key.h"
#include "dht/dhtcore/ReplySerializer.h"
#include "subnode/SupernodeHunter.h"
#include "subnode/AddrSet.h"
#include "util/Identity.h"
#include "util/platform/Sockaddr.h"
#include "util/events/Timeout.h"
#include "util/AddrTools.h"
#include "util/events/Time.h"
#include "switch/LabelSplicer.h"
#include
#define CYCLE_MS 3000
struct SupernodeHunter_pvt
{
struct SupernodeHunter pub;
/** Nodes which are authorized to be our supernode. */
struct AddrSet* authorizedSnodes;
/** Our peers, DO NOT TOUCH, changed from in SubnodePathfinder. */
struct AddrSet* myPeerAddrs;
struct AddrSet* blacklist;
// Number of the next peer to ping in the peers AddrSet
int nextPeer;
// Will be set to the best known supernode possibility
struct Address snodeCandidate;
bool snodePathUpdated;
struct Allocator* alloc;
struct Log* log;
struct MsgCore* msgCore;
EventBase_t* base;
struct SwitchPinger* sp;
struct Address* myAddress;
String* selfAddrStr;
struct ReachabilityCollector* rc;
Identity
};
struct Query
{
struct SupernodeHunter_pvt* snp;
// If this is a findNode request, this is the search target, if it's a getPeers it's null.
struct Address* searchTar;
int64_t sendTime;
bool isGetRoute;
Identity
};
int SupernodeHunter_addSnode(struct SupernodeHunter* snh, struct Address* snodeAddr)
{
struct SupernodeHunter_pvt* snp = Identity_check((struct SupernodeHunter_pvt*) snh);
int length0 = snp->authorizedSnodes->length;
AddrSet_add(snp->authorizedSnodes, snodeAddr, AddrSet_Match_ADDRESS_ONLY);
if (snp->authorizedSnodes->length == length0) {
return SupernodeHunter_addSnode_EXISTS;
}
return 0;
}
int SupernodeHunter_listSnodes(struct SupernodeHunter* snh,
struct Address*** outP,
struct Allocator* alloc)
{
struct SupernodeHunter_pvt* snp = Identity_check((struct SupernodeHunter_pvt*) snh);
struct Address** out = Allocator_calloc(alloc, sizeof(char*), snp->authorizedSnodes->length);
for (int i = 0; i < snp->authorizedSnodes->length; i++) {
out[i] = AddrSet_get(snp->authorizedSnodes, i);
}
*outP = out;
return snp->authorizedSnodes->length;
}
int SupernodeHunter_removeSnode(struct SupernodeHunter* snh, struct Address* toRemove)
{
struct SupernodeHunter_pvt* snp = Identity_check((struct SupernodeHunter_pvt*) snh);
int length0 = snp->authorizedSnodes->length;
AddrSet_remove(snp->authorizedSnodes, toRemove, AddrSet_Match_ADDRESS_ONLY);
if (snp->authorizedSnodes->length == length0) {
return SupernodeHunter_removeSnode_NONEXISTANT;
}
return 0;
}
static struct Address* getPeerByNpn(struct SupernodeHunter_pvt* snp, int npn)
{
npn = npn % snp->myPeerAddrs->length;
int i = npn;
do {
struct Address* peer = AddrSet_get(snp->myPeerAddrs, i);
if (peer && peer->protocolVersion > 19) { return peer; }
i = (i + 1) % snp->myPeerAddrs->length;
} while (i != npn);
return NULL;
}
static void adoptSupernode2(Dict* msg, struct Address* src, struct MsgCore_Promise* prom)
{
struct Query* q = Identity_check((struct Query*) prom->userData);
struct SupernodeHunter_pvt* snp = Identity_check(q->snp);
if (!src) {
Log_debug(snp->log, "timeout sending to %s",
Address_toString(prom->target, prom->alloc)->bytes);
// If we're in this state and it doesn't work, we're going to drop the snode and
// go back to the beginning because while there's a possibility of a lost packet,
// it's a bigger possibility that we don't have a working path and we'd better
// try another one.
AddrSet_add(snp->blacklist, prom->target, AddrSet_Match_BOTH);
Bits_memset(&snp->snodeCandidate, 0, Address_SIZE);
snp->snodePathUpdated = false;
return;
}
Log_debug(snp->log, "Reply from %s", Address_toString(src, prom->alloc)->bytes);
int64_t* snodeRecvTime = Dict_getIntC(msg, "recvTime");
if (!snodeRecvTime) {
Log_info(snp->log, "getRoute reply with no timeStamp, bad snode");
return;
}
Log_debug(snp->log, "\n\nSupernode location confirmed [%s]\n\n",
Address_toString(src, prom->alloc)->bytes);
if (snp->pub.snodeIsReachable) {
// If while we were searching, the outside code declared that indeed the snode
// is reachable, we will not try to change their snode.
} else if (snp->pub.onSnodeChange) {
Bits_memcpy(&snp->pub.snodeAddr, src, Address_SIZE);
snp->pub.snodeIsReachable = (
AddrSet_indexOf(snp->authorizedSnodes, src, AddrSet_Match_ADDRESS_ONLY) != -1
) ? 2 : 1;
snp->pub.onSnodeChange(&snp->pub, q->sendTime, *snodeRecvTime);
} else {
Log_warn(snp->log, "onSnodeChange is not set");
}
}
static void adoptSupernode(struct SupernodeHunter_pvt* snp, struct Address* candidate)
{
struct MsgCore_Promise* qp = MsgCore_createQuery(snp->msgCore, 0, snp->alloc);
struct Query* q = Allocator_calloc(qp->alloc, sizeof(struct Query), 1);
Identity_set(q);
q->snp = snp;
q->sendTime = Time_currentTimeMilliseconds();
Dict* msg = qp->msg = Dict_new(qp->alloc);
qp->cb = adoptSupernode2;
qp->userData = q;
qp->target = Address_clone(candidate, qp->alloc);
// NOTE: we don't immediately request a path because the RS doesn't know about us
// quite yet, so it will tell us it doesn't know a path, so we need to ping it
// and take it on faith until we get some announcements announced.
Log_debug(snp->log, "Pinging snode [%s]", Address_toString(qp->target, qp->alloc)->bytes);
Dict_putStringCC(msg, "sq", "pn", qp->alloc);
Assert_true(AddressCalc_validAddress(candidate->ip6.bytes));
return;
}
static void updateSnodePath2(Dict* msg, struct Address* src, struct MsgCore_Promise* prom)
{
struct Query* q = Identity_check((struct Query*) prom->userData);
struct SupernodeHunter_pvt* snp = Identity_check(q->snp);
if (!src) {
String* addrStr = Address_toString(prom->target, prom->alloc);
Log_debug(snp->log, "timeout sending to %s", addrStr->bytes);
return;
}
int64_t* snodeRecvTime = Dict_getIntC(msg, "recvTime");
if (!snodeRecvTime) {
Log_info(snp->log, "getRoute reply with no timeStamp, bad snode");
return;
}
struct Address_List* al = ReplySerializer_parse(src, msg, snp->log, false, prom->alloc);
if (!al || al->length == 0) {
Log_debug(snp->log, "Requesting route to snode [%s], it doesn't know one",
Address_toString(prom->target, prom->alloc)->bytes);
return;
}
Log_debug(snp->log, "Supernode path updated with [%s]",
Address_toString(&al->elems[0], prom->alloc)->bytes);
snp->snodePathUpdated = true;
if (!Bits_memcmp(&snp->pub.snodeAddr, &al->elems[0], Address_SIZE)) {
Log_debug(snp->log, "Requestes route to snode [%s], the one we have is fine",
Address_toString(prom->target, prom->alloc)->bytes);
return;
}
Bits_memcpy(&snp->pub.snodeAddr, &al->elems[0], Address_SIZE);
Bits_memcpy(&snp->snodeCandidate, &al->elems[0], Address_SIZE);
AddrSet_flush(snp->blacklist);
if (snp->pub.onSnodeChange) {
snp->pub.snodeIsReachable = (
AddrSet_indexOf(snp->authorizedSnodes, src, AddrSet_Match_ADDRESS_ONLY) != -1
) ? 2 : 1;
snp->pub.onSnodeChange(&snp->pub, q->sendTime, *snodeRecvTime);
}
}
static void updateSnodePath(struct SupernodeHunter_pvt* snp)
{
struct MsgCore_Promise* qp = MsgCore_createQuery(snp->msgCore, 0, snp->alloc);
struct Query* q = Allocator_calloc(qp->alloc, sizeof(struct Query), 1);
Identity_set(q);
q->snp = snp;
q->sendTime = Time_currentTimeMilliseconds();
Dict* msg = qp->msg = Dict_new(qp->alloc);
qp->cb = updateSnodePath2;
qp->userData = q;
qp->target = Address_clone(&snp->pub.snodeAddr, qp->alloc);
Log_debug(snp->log, "Update snode [%s] path", Address_toString(qp->target, qp->alloc)->bytes);
Dict_putStringCC(msg, "sq", "gr", qp->alloc);
String* src = String_newBinary(snp->myAddress->ip6.bytes, 16, qp->alloc);
Dict_putStringC(msg, "src", src, qp->alloc);
String* target = String_newBinary(snp->pub.snodeAddr.ip6.bytes, 16, qp->alloc);
Dict_putStringC(msg, "tar", target, qp->alloc);
}
static void queryForAuthorized(struct SupernodeHunter_pvt* snp, struct Address* snode)
{
/*
struct MsgCore_Promise* qp = MsgCore_createQuery(snp->msgCore, 0, snp->alloc);
struct Query* q = Allocator_calloc(qp->alloc, sizeof(struct Query), 1);
Identity_set(q);
q->snp = snp;
q->sendTime = Time_currentTimeMilliseconds();
Dict* msg = qp->msg = Dict_new(qp->alloc);
qp->cb = onReply;
qp->userData = q;
qp->target = candidate;
Log_debug(snp->log, "Pinging snode [%s]", Address_toString(qp->target, qp->alloc)->bytes);
Dict_putStringCC(msg, "sq", "gr", qp->alloc);
*/
}
static void peerResponseOK(struct SwitchPinger_Response* resp, struct SupernodeHunter_pvt* snp)
{
ReachabilityCollector_lagSample(snp->rc, resp->label, resp->milliseconds);
struct Address snode = {0};
Bits_memcpy(&snode, &resp->snode, sizeof(struct Address));
if (!snode.path) {
uint8_t label[20];
AddrTools_printPath(label, resp->label);
Log_debug(snp->log, "Peer [%s] reports no supernode", label);
return;
}
uint64_t path = LabelSplicer_splice(snode.path, resp->label);
if (path == UINT64_MAX) {
Log_debug(snp->log, "Supernode path could not be spliced");
return;
}
snode.path = path;
struct Address peerAddr = { .path = resp->label };
int i = AddrSet_indexOf(snp->myPeerAddrs, &peerAddr, AddrSet_Match_LABEL_ONLY);
if (i == -1) {
Log_info(snp->log, "We got a snode reply from a node which is not in peer list");
return;
}
struct Address* peer = AddrSet_get(snp->myPeerAddrs, i);
struct Address* firstPeer = getPeerByNpn(snp, 0);
if (!firstPeer) {
Log_info(snp->log, "All peers have gone away while packet was outstanding");
return;
}
// 1.
// If we have looped around and queried all of our peers returning to the first and we have
// still not found an snode in our authorized snodes list, we should simply accept this one.
if (!snp->pub.snodeIsReachable &&
snp->myPeerAddrs->length > 1 &&
snp->nextPeer >= snp->myPeerAddrs->length &&
Address_isSameIp(firstPeer, peer))
{
if (!snp->snodeCandidate.path) {
Log_info(snp->log, "No snode candidate found [%s]",
Address_toStringKey(&snp->snodeCandidate, resp->ping->pingAlloc)->bytes);
snp->nextPeer = 0;
AddrSet_flush(snp->blacklist);
return;
}
Log_debug(snp->log, "Peer [%s] has proposed we use supernode [%s] we will accept it",
Address_toString(peer, resp->ping->pingAlloc)->bytes,
Address_toString(&snp->snodeCandidate, resp->ping->pingAlloc)->bytes);
adoptSupernode(snp, &snp->snodeCandidate);
return;
}
// 2.
// If this snode is one of our authorized snodes OR if we have none defined, accept this one.
if (AddrSet_indexOf(snp->blacklist, &snode, AddrSet_Match_BOTH) > -1) {
Log_debug(snp->log, "Peer [%s] [%" PRIx64 "] has proposed supernode [%s] "
"but it is blacklisted, continue",
Address_toString(peer, resp->ping->pingAlloc)->bytes,
resp->label,
Address_toString(&snode, resp->ping->pingAlloc)->bytes);
} else if (!snp->authorizedSnodes->length ||
AddrSet_indexOf(snp->authorizedSnodes, &snode, AddrSet_Match_ADDRESS_ONLY) > -1)
{
Address_getPrefix(&snode);
Log_debug(snp->log, "Peer [%s] has proposed supernode [%s] and %s so we will use it",
Address_toString(peer, resp->ping->pingAlloc)->bytes,
Address_toString(&snode, resp->ping->pingAlloc)->bytes,
(snp->authorizedSnodes->length) ? "it is authorized" : "we have none authorized");
adoptSupernode(snp, &snode);
return;
} else if (!snp->snodeCandidate.path) {
Log_debug(snp->log, "Peer [%s] has proposed supernode [%s], we're not using it yet "
"but we will store it as a candidate.",
Address_toString(peer, resp->ping->pingAlloc)->bytes,
Address_toString(&snp->snodeCandidate, resp->ping->pingAlloc)->bytes);
Bits_memcpy(&snp->snodeCandidate, &snode, sizeof(struct Address));
Address_getPrefix(&snp->snodeCandidate);
}
// 3.
// If this snode is not one of our authorized snodes, query it for all of our authorized snodes.
queryForAuthorized(snp, &snode);
}
static void peerResponse(struct SwitchPinger_Response* resp, void* userData)
{
struct SupernodeHunter_pvt* snp = Identity_check((struct SupernodeHunter_pvt*) userData);
char* err = "";
switch (resp->res) {
case SwitchPinger_Result_OK: peerResponseOK(resp, snp); return;
case SwitchPinger_Result_LABEL_MISMATCH: err = "LABEL_MISMATCH"; break;
case SwitchPinger_Result_WRONG_DATA: err = "WRONG_DATA"; break;
case SwitchPinger_Result_ERROR_RESPONSE: err = "ERROR_RESPONSE"; break;
case SwitchPinger_Result_LOOP_ROUTE: err = "LOOP_ROUTE"; break;
case SwitchPinger_Result_TIMEOUT: err = "TIMEOUT"; break;
default: err = "unknown error"; break;
}
Log_debug(snp->log, "Error sending snp query to peer [%" PRIx64 "] [%s]",
resp->label, err);
}
static void probePeerCycle(void* vsn)
{
struct SupernodeHunter_pvt* snp = Identity_check((struct SupernodeHunter_pvt*) vsn);
if (snp->pub.snodeIsReachable && !snp->snodePathUpdated) {
updateSnodePath(snp);
}
if (snp->pub.snodeIsReachable > 1) { return; }
if (snp->pub.snodeIsReachable && !snp->authorizedSnodes->length) { return; }
if (!snp->myPeerAddrs->length) { return; }
//Log_debug(snp->log, "probePeerCycle()");
if (AddrSet_indexOf(snp->authorizedSnodes, snp->myAddress, AddrSet_Match_ADDRESS_ONLY) != -1) {
Log_info(snp->log, "Self is specified as supernode, pinging...");
adoptSupernode(snp, snp->myAddress);
return;
}
struct Address* peer = getPeerByNpn(snp, snp->nextPeer);
if (!peer) {
Log_info(snp->log, "No peer found who is version >= 20");
return;
}
struct SwitchPinger_Ping* p =
SwitchPinger_newPing(peer->path, String_CONST(""), 3000, peerResponse, snp->alloc, snp->sp);
Assert_true(p);
Log_debug(snp->log, "Querying peer [%s] [%d] total [%d], blacklist size [%d]",
Address_toString(peer, p->pingAlloc)->bytes,
snp->nextPeer,
snp->myPeerAddrs->length,
snp->blacklist->length);
snp->nextPeer++;
p->type = SwitchPinger_Type_GETSNODE;
if (snp->pub.snodeIsReachable) {
Bits_memcpy(&p->snode, &snp->pub.snodeAddr, sizeof(struct Address));
}
p->onResponseContext = snp;
}
static void onSnodeUnreachable(struct SupernodeHunter* snh,
int64_t sendTime,
int64_t snodeRecvTime)
{
struct SupernodeHunter_pvt* snp = Identity_check((struct SupernodeHunter_pvt*) snh);
Log_debug(snp->log, "Supernode unreachable.");
snp->snodePathUpdated = false;
// Snode unreachable, we need also reset peer snode candidate
Bits_memset(&snp->snodeCandidate, 0, Address_SIZE);
}
struct SupernodeHunter* SupernodeHunter_new(struct Allocator* allocator,
struct Log* log,
EventBase_t* base,
struct SwitchPinger* sp,
struct AddrSet* peers,
struct MsgCore* msgCore,
struct Address* myAddress,
struct ReachabilityCollector* rc)
{
struct Allocator* alloc = Allocator_child(allocator);
struct SupernodeHunter_pvt* out =
Allocator_calloc(alloc, sizeof(struct SupernodeHunter_pvt), 1);
Identity_set(out);
out->authorizedSnodes = AddrSet_new(alloc);
out->blacklist = AddrSet_new(alloc);
out->myPeerAddrs = peers;
out->base = base;
//out->rand = rand;
//out->nodes = AddrSet_new(alloc);
//out->timeSnodeCalled = Time_currentTimeMilliseconds();
//out->snodeCandidates = AddrSet_new(alloc);
out->log = log;
out->alloc = alloc;
out->msgCore = msgCore;
out->myAddress = myAddress;
out->rc = rc;
out->selfAddrStr = String_newBinary(myAddress->ip6.bytes, 16, alloc);
out->sp = sp;
out->snodePathUpdated = false;
out->pub.onSnodeUnreachable = onSnodeUnreachable;
Timeout_setInterval(probePeerCycle, out, CYCLE_MS, base, alloc);
return &out->pub;
}