/* vim: set expandtab ts=4 sw=4: */ /* * You may redistribute this program and/or modify it under the terms of * the GNU General Public License as published by the Free Software Foundation, * either version 3 of the License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #include "crypto/AddressCalc.h" #include "crypto/Ca.h" #include "interface/Iface.h" #include "net/InterfaceController.h" #include "memory/Allocator.h" #include "net/SwitchPinger.h" #include "wire/PFChan.h" #include "net/EventEmitter.h" #include "util/Base32.h" #include "util/Bits.h" #include "util/events/Time.h" #include "util/events/Timeout.h" #include "util/Identity.h" #include "util/version/Version.h" #include "util/AddrTools.h" #include "util/Defined.h" #include "util/Checksum.h" #include "util/Hex.h" #include "util/Kbps.h" #include "wire/Error.h" #include "wire/Message.h" #include "wire/Headers.h" #include "wire/Metric.h" #include "wire/CryptoHeader.h" /** After this number of milliseconds, a node will be regarded as unresponsive. */ #define UNRESPONSIVE_AFTER_MILLISECONDS (20*1024) /** * After this number of milliseconds without a valid incoming message, * a peer is "lazy" and should be pinged. */ #define PING_AFTER_MILLISECONDS (3*1024) /** How often to ping "lazy" peers, "unresponsive" peers are only pinged 20% of the time. */ #define PING_INTERVAL_MILLISECONDS 1024 /** The number of milliseconds to wait for a ping response. */ #define TIMEOUT_MILLISECONDS (2*1024) /** * The number of seconds to wait before an unresponsive peer * making an incoming connection is forgotten. */ #define FORGET_AFTER_MILLISECONDS (256*1024) /** Wait 32 seconds between sending beacon messages. */ #define BEACON_INTERVAL 32768 /** Every 3 seconds inform the pathfinder of the current link states. */ #define LINKSTATE_UPDATE_INTERVAL 3000 // Recommended by boringtun documentation #define HANDSHAKE_CYCLE_INTERVAL 100 // ---------------- Map ---------------- #define Map_NAME EndpointsBySockaddr #define Map_ENABLE_HANDLES #define Map_KEY_TYPE struct Sockaddr* #define Map_VALUE_TYPE struct Peer* #define Map_USE_HASH #define Map_USE_COMPARATOR #include "util/Map.h" static inline uint32_t Map_EndpointsBySockaddr_hash(struct Sockaddr** key) { return Sockaddr_hash(*key); } static inline int Map_EndpointsBySockaddr_compare(struct Sockaddr** keyA, struct Sockaddr** keyB) { return Sockaddr_compare(*keyA, *keyB); } // ---------------- EndMap ---------------- #define ArrayList_TYPE struct InterfaceController_Iface_pvt #define ArrayList_NAME OfIfaces #include "util/ArrayList.h" struct InterfaceController_pvt; struct InterfaceController_Iface_pvt { struct InterfaceController_Iface pub; struct Map_EndpointsBySockaddr peerMap; /** The number of the next peer to try pinging, this iterates through the list of peers. */ uint32_t lastPeerPinged; struct InterfaceController_pvt* ic; struct Allocator* alloc; Identity }; struct Peer { /** The interface which is registered with the switch. */ struct Iface switchIf; struct Iface plaintext; struct Iface ciphertext; struct Allocator* alloc; Ca_Session_t* caSession; struct Kbps sendBw; struct Kbps recvBw; /** The interface which this peer belongs to. */ struct InterfaceController_Iface_pvt* ici; /** The address within the interface of this peer. */ struct Sockaddr* lladdr; struct Address addr; /** Milliseconds since the epoch when the last *valid* message was received. */ uint64_t timeOfLastMessage; /** Time when the last switch ping response was received from this node. */ uint64_t timeOfLastPing; /** A counter to allow for 3/4 of all pings to be skipped when a node is definitely down. */ uint32_t pingCount; /** The handle which can be used to look up this endpoint in the endpoint set. */ uint32_t handle; /** True if we should forget about the peer if they do not respond. */ bool isIncomingConnection; /** * If InterfaceController_PeerState_UNAUTHENTICATED, no permanent state will be kept. * During transition from HANDSHAKE to ESTABLISHED, a check is done for a registeration of a * node which is already registered in a different switch slot, if there is one and the * handshake completes, it will be moved. */ enum InterfaceController_PeerState state; /** * The number of lost packets last time we checked. * _lastDrops and _lastPackets are the direct readings off of the ReplayProtector * so they will be reset to zero when the session resets. lastDrops and lastPackets * are monotonic and so probably what you want. */ uint64_t _lastDrops; uint64_t _lastPackets; uint64_t lastDrops; uint64_t lastPackets; // traffic counters uint64_t bytesOut; uint64_t bytesIn; Identity }; struct InterfaceController_pvt { /** Public functions and fields for this ifcontroller. */ struct InterfaceController pub; struct Allocator* const alloc; Ca_t* const ca; /** Switch for adding nodes when they are discovered. */ struct SwitchCore* const switchCore; struct Random* const rand; struct Log* const logger; struct EventBase* const eventBase; /** For communicating with the Pathfinder. */ struct Iface eventEmitterIf; /** After this number of milliseconds, a neoghbor will be regarded as unresponsive. */ uint32_t unresponsiveAfterMilliseconds; /** The number of milliseconds to wait before pinging. */ uint32_t pingAfterMilliseconds; /** The number of milliseconds to let a ping go before timing it out. */ uint32_t timeoutMilliseconds; /** After this number of milliseconds, an incoming connection is forgotten entirely. */ uint32_t forgetAfterMilliseconds; /** How often to send beacon messages (milliseconds). */ uint32_t beaconInterval; // Whether or not to create sessions using the noise protocol const bool enableNoise; /** The timeout event to use for pinging potentially unresponsive neighbors. */ struct Timeout* const pingInterval; /** The timeout event for updating the link state to the pathfinders. */ struct Timeout* const linkStateInterval; // Timeout for noise re-handshakes struct Timeout* const noiseHandshakeInterval; /** For pinging lazy/unresponsive nodes. */ struct SwitchPinger* const switchPinger; struct ArrayList_OfIfaces* icis; /** Temporary allocator for allocating timeouts for sending beacon messages. */ struct Allocator* beaconTimeoutAlloc; /** A password which is generated per-startup and sent out in beacon messages. */ uint8_t beaconPassword[Headers_Beacon_PASSWORD_LEN]; struct Headers_Beacon beacon; uint8_t ourPubKey[32]; Identity }; static bool knownIncompatibleVersion(uint32_t version) { if (!version) { return false; } else if (Defined(SUBNODE) && version < 21) { // Subnode doesn't talk to peers with less than v21 return true; } return !Version_isCompatible(version, Version_CURRENT_PROTOCOL); } static void sendPeer(uint32_t pathfinderId, enum PFChan_Core ev, struct Peer* peer, uint16_t latency) { if (!peer->addr.protocolVersion || knownIncompatibleVersion(peer->addr.protocolVersion)) { // Don't know the protocol version, never add them return; } struct InterfaceController_pvt* ic = Identity_check(peer->ici->ic); struct Allocator* alloc = Allocator_child(ic->alloc); struct Message* msg = Message_new(PFChan_Node_SIZE, 512, alloc); struct PFChan_Node* node = (struct PFChan_Node*) msg->msgbytes; Bits_memcpy(node->ip6, peer->addr.ip6.bytes, 16); Bits_memcpy(node->publicKey, peer->addr.key, 32); node->path_be = Endian_hostToBigEndian64(peer->addr.path); node->version_be = Endian_hostToBigEndian32(peer->addr.protocolVersion); if (ev != PFChan_Core_PEER_GONE) { Assert_true(peer->addr.protocolVersion); node->metric_be = Endian_hostToBigEndian32(Metric_IC_PEER | (latency & Metric_IC_PEER_MASK)); } else { node->metric_be = Endian_hostToBigEndian32(Metric_DEAD_LINK); } Er_assert(Message_epush32be(msg, pathfinderId)); Er_assert(Message_epush32be(msg, ev)); Iface_send(&ic->eventEmitterIf, msg); Allocator_free(alloc); } static void onPingResponse(struct SwitchPinger_Response* resp, void* onResponseContext) { if (SwitchPinger_Result_OK != resp->res) { return; } struct Peer* ep = Identity_check((struct Peer*) onResponseContext); struct InterfaceController_pvt* ic = Identity_check(ep->ici->ic); ep->addr.protocolVersion = resp->version; if (Defined(Log_DEBUG)) { String* addr = Address_toString(&ep->addr, resp->ping->pingAlloc); if (knownIncompatibleVersion(resp->version)) { Log_debug(ic->logger, "got switch pong from node [%s] with incompatible version", addr->bytes); } else if (ep->addr.path != resp->label) { uint8_t sl[20]; AddrTools_printPath(sl, resp->label); Log_debug(ic->logger, "got switch pong from node [%s] mismatch label [%s]", addr->bytes, sl); } else { Log_debug(ic->logger, "got switch pong from node [%s]", addr->bytes); } } if (knownIncompatibleVersion(resp->version) || ep->addr.path != resp->label) { ep->state = InterfaceController_PeerState_INCOMPATIBLE; return; } if (ep->state == InterfaceController_PeerState_ESTABLISHED) { sendPeer(0xffffffff, PFChan_Core_PEER, ep, resp->milliseconds); } ep->timeOfLastPing = Time_currentTimeMilliseconds(); if (Defined(Log_DEBUG)) { String* addr = Address_toString(&ep->addr, resp->ping->pingAlloc); Log_debug(ic->logger, "Received [%s] from lazy endpoint [%s], state: [%s]", SwitchPinger_resultString(resp->res)->bytes, addr->bytes, InterfaceController_stateString(ep->state)); } } /* * Send a ping packet to one of the endpoints. */ static void sendPing(struct Peer* ep) { struct InterfaceController_pvt* ic = Identity_check(ep->ici->ic); ep->pingCount++; struct SwitchPinger_Ping* ping = SwitchPinger_newPing(ep->addr.path, String_CONST(""), ic->timeoutMilliseconds, onPingResponse, ep->alloc, ic->switchPinger); if (!ping) { struct Allocator* alloc = Allocator_child(ep->alloc); Log_debug(ic->logger, "Sending switch ping to [%s] failed, out of ping slots", Address_toString(&ep->addr, alloc)->bytes); Allocator_free(alloc); } else { Log_debug(ic->logger, "Sending switch ping to [%s]", Address_toString(&ep->addr, ping->pingAlloc)->bytes); } if (ping) { ping->onResponseContext = ep; } } static void linkState(void* vic) { struct InterfaceController_pvt* ic = Identity_check((struct InterfaceController_pvt*) vic); uint32_t msgLen = 64; for (int i = 0; i < ic->icis->length; i++) { struct InterfaceController_Iface_pvt* ici = ArrayList_OfIfaces_get(ic->icis, i); msgLen += PFChan_LinkState_Entry_SIZE * ici->peerMap.count; } struct Allocator* alloc = Allocator_child(ic->alloc); struct Message* msg = Message_new(0, msgLen, alloc); for (int i = 0; i < ic->icis->length; i++) { struct InterfaceController_Iface_pvt* ici = ArrayList_OfIfaces_get(ic->icis, i); for (uint32_t i = 0; i < ici->peerMap.count; i++) { struct Peer* ep = ici->peerMap.values[i]; RTypes_CryptoStats_t stats; Ca_stats(ep->caSession, &stats); uint64_t newDrops = 0; // Prevents invalid number when the session resets if (stats.lost_packets > ep->_lastDrops) { newDrops = stats.lost_packets - ep->_lastDrops; } ep->_lastDrops = stats.lost_packets; ep->lastDrops += newDrops; uint64_t newPackets = 0; if (stats.received_packets > ep->_lastPackets) { newPackets = stats.received_packets - ep->_lastPackets; } ep->_lastPackets = stats.received_packets; ep->lastPackets += newPackets; struct PFChan_LinkState_Entry e = { .peerLabel = ep->addr.path, .sumOfPackets = ep->lastPackets, .sumOfDrops = ep->lastDrops, .sumOfKb = (ep->bytesIn >> 10), }; Er_assert(Message_epush(msg, &e, PFChan_LinkState_Entry_SIZE)); } } if (Message_getLength(msg)) { Er_assert(Message_epush32be(msg, 0xffffffff)); Er_assert(Message_epush32be(msg, PFChan_Core_LINK_STATE)); Iface_send(&ic->eventEmitterIf, msg); } Allocator_free(alloc); } static void iciPing(struct InterfaceController_Iface_pvt* ici, struct InterfaceController_pvt* ic) { if (!ici->peerMap.count) { return; } uint64_t now = Time_currentTimeMilliseconds(); // scan for endpoints have not sent anything recently. uint32_t startAt = ici->lastPeerPinged = (ici->lastPeerPinged + 1) % ici->peerMap.count; for (uint32_t i = startAt, count = 0; count < ici->peerMap.count;) { i = (i + 1) % ici->peerMap.count; count++; struct Peer* ep = ici->peerMap.values[i]; if (knownIncompatibleVersion(ep->addr.protocolVersion)) { // This is a version mismatch, we have nothing to do with this node // but we keep the session in INCOMPATIBLE state to keep track of the // fact that we don't want to talk to it. ep->state = InterfaceController_PeerState_INCOMPATIBLE; continue; } uint8_t ipIfDebug[40]; if (Defined(Log_DEBUG)) { Address_printIp(ipIfDebug, &ep->addr); } if (ep->addr.protocolVersion && now < ep->timeOfLastMessage + ic->pingAfterMilliseconds) { // It's sending traffic so leave it alone. // wait just a minute here ! // There is a risk that the NodeStore somehow forgets about our peers while the peers // are still happily sending traffic. To break this bad cycle lets just send a PEER // message once per second for whichever peer is the first that we address. if (count == 1 && ep->state == InterfaceController_PeerState_ESTABLISHED) { // noisy //Log_debug(ic->logger, "Notifying about peer number [%d/%d] [%s]", // i, ici->peerMap.count, ipIfDebug); sendPeer(0xffffffff, PFChan_Core_PEER, ep, 0xffff); } continue; } if (now < ep->timeOfLastPing + ic->pingAfterMilliseconds) { // Possibly an out-of-date node which is mangling packets, don't ping too often // because it causes the RumorMill to be filled with this node over and over. continue; } if (ep->isIncomingConnection && now > ep->timeOfLastMessage + ic->forgetAfterMilliseconds) { Log_debug(ic->logger, "Unresponsive peer [%s] has not responded in [%u] " "seconds, dropping connection", ipIfDebug, ic->forgetAfterMilliseconds / 1024); sendPeer(0xffffffff, PFChan_Core_PEER_GONE, ep, 0xffff); Allocator_free(ep->alloc); continue; } bool unresponsive = (now > ep->timeOfLastMessage + ic->unresponsiveAfterMilliseconds); if (unresponsive) { // our link to the peer is broken... // Lets skip 87% of pings when they're really down. if (ep->pingCount % 8) { ep->pingCount++; continue; } sendPeer(0xffffffff, PFChan_Core_PEER_GONE, ep, 0xffff); ep->state = InterfaceController_PeerState_UNRESPONSIVE; } Log_debug(ic->logger, "Pinging %s peer [%s] lag [%u]", (unresponsive ? "unresponsive" : "lazy"), ipIfDebug, (uint32_t)((now - ep->timeOfLastMessage) / 1024)); sendPing(ep); // we only ping one node return; } } /** * Check the table for nodes which might need to be pinged, ping a node if necessary. * If a node has not responded in unresponsiveAfterMilliseconds then mark them as unresponsive * and if the connection is incoming and the node has not responded in forgetAfterMilliseconds * then drop them entirely. * This is called every PING_INTERVAL_MILLISECONDS */ static void pingCycle(void* vic) { struct InterfaceController_pvt* ic = Identity_check((struct InterfaceController_pvt*) vic); for (int i = 0; i < ic->icis->length; i++) { struct InterfaceController_Iface_pvt* ici = ArrayList_OfIfaces_get(ic->icis, i); iciPing(ici, ic); } } static Iface_DEFUN afterEncrypt(struct Message* msg, struct Iface* ciphertext) { struct Peer* ep = Identity_containerOf(ciphertext, struct Peer, ciphertext); // push the lladdr... Er_assert(Message_epush(msg, ep->lladdr, ep->lladdr->addrLen)); // very noisy if (Defined(Log_DEBUG) && false) { char* printedAddr = Hex_print(&ep->lladdr[1], ep->lladdr->addrLen - Sockaddr_OVERHEAD, Message_getAlloc(msg)); Log_debug(ep->ici->ic->logger, "Outgoing message to [%s]", printedAddr); } return Iface_send(&ep->ici->pub.addrIf, msg); } // This is directly called from SwitchCore, message is not encrypted. static Iface_DEFUN sendFromSwitch(struct Message* msg, struct Iface* switchIf) { struct Peer* ep = Identity_check((struct Peer*) switchIf); // Once we know it to be an incompetible version, we quarentine it if (knownIncompatibleVersion(ep->addr.protocolVersion)) { if (Defined(Log_DEBUG)) { Log_debug(ep->ici->ic->logger, "[%s] DROP msg to node with incompat version [%d] ", Address_toString(&ep->addr, Message_getAlloc(msg))->bytes, ep->addr.protocolVersion); } ep->state = InterfaceController_PeerState_INCOMPATIBLE; return Error(msg, "UNHANDLED"); } ep->bytesOut += Message_getLength(msg); Kbps_accumulate(&ep->sendBw, Time_currentTimeMilliseconds(), Message_getLength(msg)); return Iface_next(&ep->plaintext, msg); // --> afterEncrypt } static int closeInterface(struct Allocator_OnFreeJob* job) { struct Peer* toClose = Identity_check((struct Peer*) job->userData); int index = Map_EndpointsBySockaddr_indexForHandle(toClose->handle, &toClose->ici->peerMap); if (index < 0 || toClose->ici->peerMap.values[index] != toClose) { // Happens if the ep was created as a result of handleUnexpectedIncoming return 0; } sendPeer(0xffffffff, PFChan_Core_PEER_GONE, toClose, 0xffff); Log_debug(toClose->ici->ic->logger, "Closing interface [%d] with handle [%u]", index, toClose->handle); Map_EndpointsBySockaddr_remove(index, &toClose->ici->peerMap); return 0; } static Iface_DEFUN afterDecrypt(struct Message* msg, struct Iface* plaintext); static struct Peer* mkEp( const struct Sockaddr* lladdr, struct InterfaceController_Iface_pvt* ici, uint8_t publicKey[32], bool authNeeded, const char* name, bool useNoise ) { struct Allocator* epAlloc = Allocator_child(ici->alloc); struct Peer* ep = Allocator_calloc(epAlloc, sizeof(struct Peer), 1); Identity_set(ep); ep->ici = ici; ep->lladdr = Sockaddr_clone(lladdr, epAlloc); ep->alloc = epAlloc; ep->state = InterfaceController_PeerState_UNAUTHENTICATED; ep->isIncomingConnection = false; ep->switchIf.send = sendFromSwitch; ep->ciphertext.send = afterEncrypt; ep->plaintext.send = afterDecrypt; useNoise = useNoise && ici->ic->enableNoise; ep->caSession = Ca_newSession(ici->ic->ca, epAlloc, publicKey, authNeeded, name, useNoise); Iface_plumb(ep->caSession->ciphertext, &ep->ciphertext); Iface_plumb(ep->caSession->plaintext, &ep->plaintext); Bits_memcpy(ep->addr.key, publicKey, 32); Address_getPrefix(&ep->addr); Allocator_onFree(epAlloc, closeInterface, ep); return ep; } static struct Peer* epFromSess( const struct Sockaddr* lladdr, struct InterfaceController_Iface_pvt* ici, Ca_Session_t* sess, Allocator_t* alloc ) { struct Peer* ep = Allocator_calloc(alloc, sizeof(struct Peer), 1); Identity_set(ep); ep->ici = ici; ep->lladdr = Sockaddr_clone(lladdr, alloc); ep->alloc = alloc; ep->state = InterfaceController_PeerState_UNAUTHENTICATED; ep->isIncomingConnection = true; ep->switchIf.send = sendFromSwitch; ep->ciphertext.send = afterEncrypt; ep->plaintext.send = afterDecrypt; ep->caSession = sess; Iface_plumb(ep->caSession->ciphertext, &ep->ciphertext); Iface_plumb(ep->caSession->plaintext, &ep->plaintext); Ca_getHerPubKey(sess, ep->addr.key); ep->addr.protocolVersion = Rffi_CryptoAuth2_cjdnsVer(sess); Address_getPrefix(&ep->addr); Allocator_onFree(alloc, closeInterface, ep); return ep; } /** * Expects [ struct LLAddress ][ beacon ] */ static Iface_DEFUN handleBeacon( struct Message* msg, struct InterfaceController_Iface_pvt* ici, struct Sockaddr* lladdr) { struct InterfaceController_pvt* ic = ici->ic; if (!ici->pub.beaconState) { // accepting beacons disabled. Log_debug(ic->logger, "[%s] Dropping beacon because beaconing is disabled", ici->pub.name->bytes); return NULL; } if (Message_getLength(msg) < Headers_Beacon_SIZE) { Log_debug(ic->logger, "[%s] Dropping runt beacon", ici->pub.name->bytes); return Error(msg, "RUNT"); } // clear the bcast flag lladdr->flags = 0; struct Headers_Beacon beacon; Er_assert(Message_epop(msg, &beacon, Headers_Beacon_SIZE)); if (Defined(Log_DEBUG)) { char* content = Hex_print(&beacon, Headers_Beacon_SIZE, Message_getAlloc(msg)); Log_debug(ici->ic->logger, "RECV BEACON CONTENT[%s]", content); } struct Address addr = {0}; Bits_memcpy(addr.key, beacon.publicKey, 32); addr.protocolVersion = Endian_bigEndianToHost32(beacon.version_be); Address_getPrefix(&addr); String* printedAddr = NULL; if (Defined(Log_DEBUG)) { printedAddr = Address_toString(&addr, Message_getAlloc(msg)); } if (!AddressCalc_validAddress(addr.ip6.bytes)) { Log_debug(ic->logger, "handleBeacon invalid key [%s]", printedAddr->bytes); return Error(msg, "INVALID"); } else if (!Bits_memcmp(ic->ourPubKey, addr.key, 32)) { // receive beacon from self, drop silent return NULL; } if (knownIncompatibleVersion(addr.protocolVersion)) { if (Defined(Log_DEBUG)) { Log_debug(ic->logger, "[%s] DROP beacon from [%s] which was version [%d] " "our version is [%d] making them incompatable", ici->pub.name->bytes, printedAddr->bytes, addr.protocolVersion, Version_CURRENT_PROTOCOL); } return Error(msg, "UNHANDLED"); } String* beaconPass = String_newBinary(beacon.password, Headers_Beacon_PASSWORD_LEN, Message_getAlloc(msg)); int epIndex = Map_EndpointsBySockaddr_indexForKey(&lladdr, &ici->peerMap); if (epIndex > -1) { // The password might have changed! struct Peer* ep = ici->peerMap.values[epIndex]; Ca_setAuth(beaconPass, String_CONST("Local Peers"), ep->caSession); return NULL; } bool useNoise = addr.protocolVersion >= 22; struct Peer* ep = mkEp(lladdr, ici, beacon.publicKey, false, "beacon_peer", useNoise); int setIndex = Map_EndpointsBySockaddr_put(&ep->lladdr, &ep, &ici->peerMap); ep->handle = ici->peerMap.handles[setIndex]; // We make the connection ourselves but we still consider // it "incoming" because we replied to a beacon ep->isIncomingConnection = true; ep->addr.protocolVersion = addr.protocolVersion; Ca_setAuth(beaconPass, String_CONST("Local Peers"), ep->caSession); if (SwitchCore_addInterface(ic->switchCore, &ep->switchIf, ep->alloc, &ep->addr.path)) { Log_debug(ic->logger, "handleBeacon() SwitchCore out of space"); Allocator_free(ep->alloc); return Error(msg, "UNHANDLED"); } // We want the node to immedietly be pinged but we don't want it to appear unresponsive because // the pinger will only ping every (PING_INTERVAL * 8) so we set timeOfLastMessage to // (now - pingAfterMilliseconds - 1) so it will be considered a "lazy node". ep->timeOfLastMessage = Time_currentTimeMilliseconds() - ic->pingAfterMilliseconds - 1; Log_info(ic->logger, "Added peer [%s] from beacon", Address_toString(&ep->addr, Message_getAlloc(msg))->bytes); // Ping them immediately, this prevents beacon tests from taking 1 second each sendPing(ep); return NULL; } static Iface_DEFUN handleIncomingFromWire(struct Message* msg, struct Iface* addrIf) { struct InterfaceController_Iface_pvt* ici = Identity_containerOf(addrIf, struct InterfaceController_Iface_pvt, pub.addrIf); struct Sockaddr_storage lladdrStore; struct Sockaddr* lladdr = (struct Sockaddr*) &lladdrStore; { struct Sockaddr* lladdr0 = (struct Sockaddr*) msg->msgbytes; if (Message_getLength(msg) < Sockaddr_OVERHEAD || Message_getLength(msg) < lladdr0->addrLen) { Log_debug(ici->ic->logger, "DROP runt"); return Error(msg, "RUNT"); } Er_assert(Message_epop(msg, lladdr, lladdr0->addrLen)); } Assert_true(!((uintptr_t)msg->msgbytes % 4) && "alignment fault"); Assert_true(!((uintptr_t)lladdr->addrLen % 4) && "alignment fault"); char* printedAddr = ""; if (Defined(Log_DEBUG)) { printedAddr = Hex_print(&lladdr[1], lladdr->addrLen - Sockaddr_OVERHEAD, Message_getAlloc(msg)); } // noisy if (Defined(Log_DEBUG) && false) { Log_debug(ici->ic->logger, "Incoming message from [%s]", printedAddr); } if (lladdr->flags & Sockaddr_flags_BCAST) { return handleBeacon(msg, ici, lladdr); } if (Message_getLength(msg) < 4) { return Error(msg, "RUNT"); } int epIndex = Map_EndpointsBySockaddr_indexForKey(&lladdr, &ici->peerMap); if (epIndex == -1) { // noise control message Er_assert(Message_epush(msg, NULL, 16)); Sockaddr_asIp6(msg->msgbytes, lladdr); RTypes_CryptoAuth2_TryHandshake_Ret_t ret = { .code = 0 }; Rffi_CryptoAuth2_tryHandshake(ici->ic->ca, msg, ici->alloc, true, &ret); if (ret.sess) { // We have a new session, setup the endpoint struct Peer* ep = epFromSess(lladdr, ici, ret.sess, ret.alloc); if (SwitchCore_addInterface(ici->ic->switchCore, &ep->switchIf, ep->alloc, &ep->addr.path)) { Log_debug(ici->ic->logger, "handleUnexpectedIncoming() SwitchCore out of space"); Allocator_free(ep->alloc); return Error(msg, "UNHANDLED"); } Assert_true(Map_EndpointsBySockaddr_indexForKey(&ep->lladdr, &ici->peerMap) == -1); int index = Map_EndpointsBySockaddr_put(&ep->lladdr, &ep, &ici->peerMap); Assert_true(index >= 0); ep->handle = ici->peerMap.handles[index]; // We want the node to immedietly be pinged but we don't want it to appear unresponsive because // the pinger will only ping every (PING_INTERVAL * 8) so we set timeOfLastMessage to // (now - pingAfterMilliseconds - 1) so it will be considered a "lazy node". ep->timeOfLastMessage = Time_currentTimeMilliseconds() - ici->ic->pingAfterMilliseconds - 1; Log_info(ici->ic->logger, "Added peer [%s] from incoming message", Address_toString(&ep->addr, Message_getAlloc(msg))->bytes); if (ep->addr.protocolVersion) { // This will only work if the other end sent us their version (WG mode) sendPeer(0xffffffff, PFChan_Core_PEER, ep, 0xffff); } else { // We don't know their version, ping them to find out sendPing(ep); } if (ret.code == RTypes_CryptoAuth2_TryHandshake_Code_t_RecvPlaintext) { // receive the packet return afterDecrypt(msg, &ep->plaintext); } } if (ret.code == RTypes_CryptoAuth2_TryHandshake_Code_t_ReplyToPeer) { // Send back a reply to the node who sent us this packet Er_assert(Message_epush(msg, lladdr, lladdr->addrLen)); return Iface_next(&ici->pub.addrIf, msg); } if (ret.code == RTypes_CryptoAuth2_TryHandshake_Code_t_Error) { Log_debug(ici->ic->logger, "Error on unexpected packet from [%s]: [%d]", printedAddr, ret.err); return Error(msg, "DECRYPT"); } if (ret.code == RTypes_CryptoAuth2_TryHandshake_Code_t_Done) { // Nothing to do return NULL; } Assert_failure("Rffi_CryptoAuth2_tryHandshake() replied [%d]", ret.code); } struct Peer* ep = Identity_check((struct Peer*) ici->peerMap.values[epIndex]); // Once we know it to be an incompetible version, we quarentine it if (knownIncompatibleVersion(ep->addr.protocolVersion)) { if (Defined(Log_DEBUG)) { Log_debug(ici->ic->logger, "[%s] DROP msg from node with incompat version [%d] ", Address_toString(&ep->addr, Message_getAlloc(msg))->bytes, ep->addr.protocolVersion); } ep->state = InterfaceController_PeerState_INCOMPATIBLE; return NULL; } Ca_resetIfTimeout(ep->caSession); Er_assert(Message_epush(msg, NULL, 16)); Sockaddr_asIp6(msg->msgbytes, lladdr); return Iface_next(&ep->ciphertext, msg); // -> afterDecrypt } // Expects result of CryptoAuth decrypt static Iface_DEFUN afterDecrypt(struct Message* msg, struct Iface* plaintext) { struct Peer* ep = Identity_containerOf(plaintext, struct Peer, plaintext); struct InterfaceController_Iface_pvt* ici = Identity_check(ep->ici); struct InterfaceController_pvt* ic = Identity_check(ici->ic); enum Ca_DecryptErr err = Er_assert(Message_epop32h(msg)); if (err) { return Error(msg, "AUTHENTICATION"); } Kbps_accumulate(&ep->recvBw, Time_currentTimeMilliseconds(), Message_getLength(msg)); ep->bytesIn += Message_getLength(msg); int caState = Ca_getState(ep->caSession); if (caState != Ca_State_ESTABLISHED) { // prevent some kinds of nasty things which could be done with packet replay. // This is checking the message switch header and will drop it unless the label // directs it to *this* router. if (Message_getLength(msg) < 8 || msg->msgbytes[7] != 1) { Log_info(ic->logger, "DROP message because CA is not established."); return Error(msg, "UNHANDLED"); } else { // When a "server" gets a new connection from a "client" the router doesn't // know about that client so if the client sends a packet to the server, the // server will be unable to handle it until the client has sent inter-router // communication to the server. Here we will ping the client so when the // server gets the ping response, it will insert the client into its table // and know its version. // prevent DoS by limiting the number of times this can be called per second // limit it to 7, this will affect innocent packets but it doesn't matter much // since this is mostly just an optimization and for keeping the tests happy. if ((ep->pingCount + 1) % 7) { sendPing(ep); } } } else { if (ep->state != caState) { sendPeer(0xffffffff, PFChan_Core_PEER, ep, 0xffff); } ep->timeOfLastMessage = Time_currentTimeMilliseconds(); } ep->state = caState; Identity_check(ep); return Iface_next(&ep->switchIf, msg); } int InterfaceController_ifaceCount(struct InterfaceController* ifc) { struct InterfaceController_pvt* ic = Identity_check((struct InterfaceController_pvt*) ifc); return ic->icis->length; } struct InterfaceController_Iface* InterfaceController_getIface(struct InterfaceController* ifc, int ifNum) { struct InterfaceController_pvt* ic = Identity_check((struct InterfaceController_pvt*) ifc); struct InterfaceController_Iface_pvt* ici = ArrayList_OfIfaces_get(ic->icis, ifNum); return (ici) ? &ici->pub : NULL; } struct InterfaceController_Iface* InterfaceController_newIface(struct InterfaceController* ifc, String* name, struct Allocator* alloc) { struct InterfaceController_pvt* ic = Identity_check((struct InterfaceController_pvt*) ifc); struct InterfaceController_Iface_pvt* ici = Allocator_calloc(alloc, sizeof(struct InterfaceController_Iface_pvt), 1); ici->pub.name = String_clone(name, alloc); ici->peerMap.allocator = alloc; ici->ic = ic; ici->alloc = alloc; ici->pub.addrIf.send = handleIncomingFromWire; ici->pub.ifNum = ArrayList_OfIfaces_add(ic->icis, ici); Identity_set(ici); return &ici->pub; } static void sendBeacon(struct InterfaceController_Iface_pvt* ici, struct Allocator* tempAlloc) { if (ici->pub.beaconState < InterfaceController_beaconState_newState_SEND) { Log_debug(ici->ic->logger, "sendBeacon(%s) -> beaconing disabled", ici->pub.name->bytes); return; } Log_debug(ici->ic->logger, "sendBeacon(%s)", ici->pub.name->bytes); struct Message* msg = Message_new(0, 128, tempAlloc); Er_assert(Message_epush(msg, &ici->ic->beacon, Headers_Beacon_SIZE)); if (Defined(Log_DEBUG)) { char* content = Hex_print(msg->msgbytes, Message_getLength(msg), tempAlloc); Log_debug(ici->ic->logger, "SEND BEACON CONTENT[%s]", content); } struct Sockaddr sa = { .addrLen = Sockaddr_OVERHEAD, .flags = Sockaddr_flags_BCAST }; Er_assert(Message_epush(msg, &sa, Sockaddr_OVERHEAD)); Iface_send(&ici->pub.addrIf, msg); } static void handshakeCycle(void* vInterfaceController) { struct InterfaceController_pvt* ic = Identity_check((struct InterfaceController_pvt*) vInterfaceController); struct Allocator* alloc = Allocator_child(ic->alloc); for (int i = 0; i < ic->icis->length; i++) { struct InterfaceController_Iface_pvt* ici = ArrayList_OfIfaces_get(ic->icis, i); for (uint32_t j = 0; j < ici->peerMap.count; j++) { struct Peer* ep = Identity_check((struct Peer*) ici->peerMap.values[j]); Message_t* msg = Rffi_CryptoAuth2_noiseTick(ep->caSession, alloc); if (msg != NULL) { Er_assert(Message_epush(msg, ep->lladdr, ep->lladdr->addrLen)); Iface_send(&ep->ici->pub.addrIf, msg); } } } Allocator_free(alloc); } static void beaconInterval(void* vInterfaceController) { struct InterfaceController_pvt* ic = Identity_check((struct InterfaceController_pvt*) vInterfaceController); struct Allocator* alloc = Allocator_child(ic->alloc); for (int i = 0; i < ic->icis->length; i++) { struct InterfaceController_Iface_pvt* ici = ArrayList_OfIfaces_get(ic->icis, i); sendBeacon(ici, alloc); } Allocator_free(alloc); if (ic->beaconTimeoutAlloc) { Allocator_free(ic->beaconTimeoutAlloc); } ic->beaconTimeoutAlloc = Allocator_child(ic->alloc); Timeout_setTimeout( beaconInterval, ic, ic->beaconInterval, ic->eventBase, ic->beaconTimeoutAlloc); } int InterfaceController_beaconState(struct InterfaceController* ifc, int interfaceNumber, int newState) { struct InterfaceController_pvt* ic = Identity_check((struct InterfaceController_pvt*) ifc); struct InterfaceController_Iface_pvt* ici = ArrayList_OfIfaces_get(ic->icis, interfaceNumber); if (!ici) { return InterfaceController_beaconState_NO_SUCH_IFACE; } char* val = NULL; switch (newState) { default: return InterfaceController_beaconState_INVALID_STATE; case InterfaceController_beaconState_newState_OFF: val = "OFF"; break; case InterfaceController_beaconState_newState_ACCEPT: val = "ACCEPT"; break; case InterfaceController_beaconState_newState_SEND: val = "SEND"; break; } Log_debug(ic->logger, "InterfaceController_beaconState(%s, %s)", ici->pub.name->bytes, val); ici->pub.beaconState = newState; if (newState == InterfaceController_beaconState_newState_SEND) { // Send out a beacon right away so we don't have to wait. struct Allocator* alloc = Allocator_child(ici->alloc); sendBeacon(ici, alloc); Allocator_free(alloc); } return 0; } int InterfaceController_bootstrapPeer(struct InterfaceController* ifc, int interfaceNumber, uint8_t* herPublicKey, const struct Sockaddr* lladdrParm, String* password, String* login, String* user, int version) { struct InterfaceController_pvt* ic = Identity_check((struct InterfaceController_pvt*) ifc); Assert_true(herPublicKey); Assert_true(password); struct InterfaceController_Iface_pvt* ici = ArrayList_OfIfaces_get(ic->icis, interfaceNumber); if (!ici) { return InterfaceController_bootstrapPeer_BAD_IFNUM; } Log_debug(ic->logger, "bootstrapPeer total [%u]", ici->peerMap.count); uint8_t ip6[16]; AddressCalc_addressForPublicKey(ip6, herPublicKey); if (!AddressCalc_validAddress(ip6) || !Bits_memcmp(ic->ourPubKey, herPublicKey, 32)) { return InterfaceController_bootstrapPeer_BAD_KEY; } // We often don't know this, but in that case we will fallback to the old peering bool useNoise = version >= 22; struct Peer* ep = mkEp(lladdrParm, ici, herPublicKey, false, user ? user->bytes : NULL, useNoise); ep->addr.protocolVersion = version; int index = Map_EndpointsBySockaddr_put(&ep->lladdr, &ep, &ici->peerMap); Assert_true(index >= 0); ep->handle = ici->peerMap.handles[index]; Ca_setAuth(password, login, ep->caSession); if (SwitchCore_addInterface(ic->switchCore, &ep->switchIf, ep->alloc, &ep->addr.path)) { Log_debug(ic->logger, "bootstrapPeer() SwitchCore out of space"); Allocator_free(ep->alloc); return InterfaceController_bootstrapPeer_OUT_OF_SPACE; } // We want the node to immedietly be pinged but we don't want it to appear unresponsive because // the pinger will only ping every (PING_INTERVAL * 8) so we set timeOfLastMessage to // (now - pingAfterMilliseconds - 1) so it will be considered a "lazy node". ep->timeOfLastMessage = Time_currentTimeMilliseconds() - ic->pingAfterMilliseconds - 1; if (Defined(Log_INFO)) { struct Allocator* tempAlloc = Allocator_child(ep->alloc); String* addrStr = Address_toString(&ep->addr, tempAlloc); Log_info(ic->logger, "Adding peer [%s] from bootstrapPeer()", addrStr->bytes); Allocator_free(tempAlloc); } // We can't just add the node directly to the routing table because we do not know // the version. We'll send it a switch ping and when it responds, we will know it's // key (if we don't already) and version number. sendPing(ep); return 0; } int InterfaceController_getPeerStats(struct InterfaceController* ifController, struct Allocator* alloc, struct InterfaceController_PeerStats** statsOut) { struct InterfaceController_pvt* ic = Identity_check((struct InterfaceController_pvt*) ifController); int count = 0; for (int i = 0; i < ic->icis->length; i++) { struct InterfaceController_Iface_pvt* ici = ArrayList_OfIfaces_get(ic->icis, i); count += ici->peerMap.count; } struct InterfaceController_PeerStats* stats = Allocator_calloc(alloc, sizeof(struct InterfaceController_PeerStats), count); uint32_t now = Time_currentTimeMilliseconds(); int xcount = 0; for (int j = 0; j < ic->icis->length; j++) { struct InterfaceController_Iface_pvt* ici = ArrayList_OfIfaces_get(ic->icis, j); for (int i = 0; i < (int)ici->peerMap.count; i++) { struct Peer* peer = Identity_check((struct Peer*) ici->peerMap.values[i]); struct InterfaceController_PeerStats* s = &stats[xcount]; xcount++; s->ifNum = ici->pub.ifNum; s->lladdr = Sockaddr_clone(peer->lladdr, alloc); Bits_memcpy(&s->addr, &peer->addr, sizeof(struct Address)); s->bytesOut = peer->bytesOut; s->bytesIn = peer->bytesIn; s->timeOfLastMessage = peer->timeOfLastMessage; s->state = peer->state; s->isIncomingConnection = peer->isIncomingConnection; s->user = Ca_getName(peer->caSession, alloc); RTypes_CryptoStats_t stats; Ca_stats(peer->caSession, &stats); s->duplicates = stats.duplicate_packets; s->receivedOutOfRange = stats.received_unexpected; s->noiseProto = stats.noise_proto; s->recvKbps = Kbps_accumulate(&peer->recvBw, now, Kbps_accumulate_NO_PACKET); s->sendKbps = Kbps_accumulate(&peer->sendBw, now, Kbps_accumulate_NO_PACKET); s->receivedPackets = peer->lastPackets; s->lostPackets = peer->lastDrops; } } Assert_true(xcount == count); *statsOut = stats; return count; } void InterfaceController_resetPeering(struct InterfaceController* ifController, uint8_t herPublicKey[32]) { struct InterfaceController_pvt* ic = Identity_check((struct InterfaceController_pvt*) ifController); for (int j = 0; j < ic->icis->length; j++) { struct InterfaceController_Iface_pvt* ici = ArrayList_OfIfaces_get(ic->icis, j); for (int i = 0; i < (int)ici->peerMap.count; i++) { struct Peer* peer = ici->peerMap.values[i]; if (!herPublicKey || !Bits_memcmp(herPublicKey, peer->addr.key, 32)) { Ca_reset(peer->caSession); } } } } int InterfaceController_disconnectPeer(struct InterfaceController* ifController, uint8_t herPublicKey[32]) { struct InterfaceController_pvt* ic = Identity_check((struct InterfaceController_pvt*) ifController); int count = 0; for (int j = 0; j < ic->icis->length; j++) { struct InterfaceController_Iface_pvt* ici = ArrayList_OfIfaces_get(ic->icis, j); for (int i = 0; i < (int)ici->peerMap.count; i++) { struct Peer* peer = ici->peerMap.values[i]; if (!Bits_memcmp(herPublicKey, peer->addr.key, 32)) { Allocator_free(peer->alloc); count++; } } } return count; } static Iface_DEFUN incomingFromEventEmitterIf(struct Message* msg, struct Iface* eventEmitterIf) { struct InterfaceController_pvt* ic = Identity_containerOf(eventEmitterIf, struct InterfaceController_pvt, eventEmitterIf); uint32_t peers = Er_assert(Message_epop32be(msg)); Assert_true(peers == PFChan_Pathfinder_PEERS); uint32_t pathfinderId = Er_assert(Message_epop32be(msg)); Assert_true(!Message_getLength(msg)); for (int j = 0; j < ic->icis->length; j++) { struct InterfaceController_Iface_pvt* ici = ArrayList_OfIfaces_get(ic->icis, j); for (int i = 0; i < (int)ici->peerMap.count; i++) { struct Peer* peer = Identity_check((struct Peer*) ici->peerMap.values[i]); if (peer->state != InterfaceController_PeerState_ESTABLISHED) { continue; } sendPeer(pathfinderId, PFChan_Core_PEER, peer, 0xffff); } } return NULL; } struct InterfaceController* InterfaceController_new(Ca_t* ca, struct SwitchCore* switchCore, struct Log* logger, struct EventBase* eventBase, struct SwitchPinger* switchPinger, struct Random* rand, struct Allocator* allocator, struct EventEmitter* ee, bool enableNoise) { struct Allocator* alloc = Allocator_child(allocator); struct InterfaceController_pvt* out = Allocator_calloc(alloc, sizeof(struct InterfaceController_pvt), 1); Bits_memcpy(out, (&(struct InterfaceController_pvt) { .alloc = alloc, .ca = ca, .rand = rand, .switchCore = switchCore, .logger = logger, .eventBase = eventBase, .switchPinger = switchPinger, .unresponsiveAfterMilliseconds = UNRESPONSIVE_AFTER_MILLISECONDS, .pingAfterMilliseconds = PING_AFTER_MILLISECONDS, .timeoutMilliseconds = TIMEOUT_MILLISECONDS, .forgetAfterMilliseconds = FORGET_AFTER_MILLISECONDS, .beaconInterval = BEACON_INTERVAL, .enableNoise = enableNoise, .linkStateInterval = Timeout_setInterval( linkState, out, LINKSTATE_UPDATE_INTERVAL, eventBase, alloc), .pingInterval = (switchPinger) ? Timeout_setInterval(pingCycle, out, PING_INTERVAL_MILLISECONDS, eventBase, alloc) : NULL, .noiseHandshakeInterval = Timeout_setInterval( handshakeCycle, out, HANDSHAKE_CYCLE_INTERVAL, eventBase, alloc) }), sizeof(struct InterfaceController_pvt)); Identity_set(out); out->icis = ArrayList_OfIfaces_new(alloc); out->eventEmitterIf.send = incomingFromEventEmitterIf; EventEmitter_regCore(ee, &out->eventEmitterIf, PFChan_Pathfinder_PEERS); // Add the beaconing password. Random_base32(rand, out->beacon.password, Headers_Beacon_PASSWORD_LEN); String strPass = { .bytes=(char*)out->beacon.password, .len=Headers_Beacon_PASSWORD_LEN }; int ret = Ca_addUser(&strPass, String_CONST("Local Peers"), ca); if (ret) { Log_warn(logger, "Ca_addUser() returned [%d]", ret); } Ca_getPubKey(ca, out->ourPubKey); Bits_memcpy(out->beacon.publicKey, out->ourPubKey, 32); if (enableNoise) { out->beacon.version_be = Endian_hostToBigEndian32(Version_CURRENT_PROTOCOL); } else { // this is mostly here for testing, we have to lie about our protocol version out->beacon.version_be = Endian_hostToBigEndian32(21); } Timeout_setTimeout(beaconInterval, out, BEACON_INTERVAL, eventBase, alloc); return &out->pub; }