From: Felix Fietkau Date: Sat, 10 Sep 2022 06:43:22 +0000 (+0200) Subject: add DHT discovery service X-Git-Url: http://git.lede-project.org./?a=commitdiff_plain;h=e58a5669713197278e4317a1128b9a7be0818d5a;p=project%2Funetd.git add DHT discovery service This uses the BitTorrent 'Mainline' DHT in order to find peers. It operates on the global PEX port, in order to allow exchanging network data through double NAT. Only the IPv4 DHT is used at the moment. Signed-off-by: Felix Fietkau --- diff --git a/.gitignore b/.gitignore index 3fc95a3..95a8b81 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ CMakeFiles *.dylib unetd unet-tool +unet-dht diff --git a/CMakeLists.txt b/CMakeLists.txt index 8c0e5e8..806e3bf 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -36,6 +36,7 @@ ENDIF() IF(UBUS_SUPPORT) SET(SOURCES ${SOURCES} ubus.c) + SET(DHT_SOURCES ${DHT_SOURCES} udht-ubus.c) ADD_DEFINITIONS(-DUBUS_SUPPORT=1) FIND_LIBRARY(ubus ubus) ELSE() @@ -51,7 +52,10 @@ TARGET_LINK_LIBRARIES(unetd unet ubox ${ubus} blobmsg_json ${libjson} ${nl} ${bp ADD_EXECUTABLE(unet-tool cli.c) TARGET_LINK_LIBRARIES(unet-tool unet blobmsg_json ${libjson} ubox) -INSTALL(TARGETS unetd unet unet-tool +ADD_EXECUTABLE(unet-dht dht.c udht.c ${DHT_SOURCES}) +TARGET_LINK_LIBRARIES(unet-dht unet ${ubus} ubox) + +INSTALL(TARGETS unetd unet unet-tool unet-dht RUNTIME DESTINATION sbin LIBRARY DESTINATION lib ) diff --git a/dht.c b/dht.c new file mode 100644 index 0000000..2f9b01d --- /dev/null +++ b/dht.c @@ -0,0 +1,3070 @@ +/* +Copyright (c) 2009-2011 by Juliusz Chroboczek + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +/* Please, please, please. + + You are welcome to integrate this code in your favourite Bittorrent + client. Please remember, however, that it is meant to be usable by + others, including myself. This means no C++, no relicensing, and no + gratuitious changes to the coding style. And please send back any + improvements to the author. */ + +/* For memmem. */ +#define _GNU_SOURCE + +#include +#include +#include +#include +#include + +#if !defined(_WIN32) || defined(__MINGW32__) +#include +#endif + +#ifndef _WIN32 +#include +#include + +#include +#include +#include +#include +#else +#ifndef _WIN32_WINNT +#define _WIN32_WINNT 0x0501 /* Windows XP */ +#endif +#ifndef WINVER +#define WINVER _WIN32_WINNT +#endif +#include +#include +#endif + +#include "dht.h" + +#ifndef HAVE_MEMMEM +#ifdef __GLIBC__ +#define HAVE_MEMMEM +#endif +#endif + +#ifndef MSG_CONFIRM +#define MSG_CONFIRM 0 +#endif + +#if !defined(_WIN32) || defined(__MINGW32__) +#define dht_gettimeofday(_ts, _tz) gettimeofday((_ts), (_tz)) +#else +extern int dht_gettimeofday(struct timeval *tv, struct timezone *tz); +#endif + +#ifdef _WIN32 + +#undef EAFNOSUPPORT +#define EAFNOSUPPORT WSAEAFNOSUPPORT + +static int +random(void) +{ + return rand(); +} + +/* Windows Vista and later already provide the implementation. */ +#if _WIN32_WINNT < 0x0600 +extern const char *inet_ntop(int, const void *, char *, socklen_t); +#endif + +#ifdef _MSC_VER +/* There is no snprintf in MSVCRT. */ +#define snprintf _snprintf +#endif + +#endif + +/* We set sin_family to 0 to mark unused slots. */ +#if AF_INET == 0 || AF_INET6 == 0 +#error You lose +#endif + +#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 199901L +/* nothing */ +#elif defined(__GNUC__) +#define inline __inline +#if (__GNUC__ >= 3) +#define restrict __restrict +#else +#define restrict /**/ +#endif +#else +#define inline /**/ +#define restrict /**/ +#endif + +#define MAX(x, y) ((x) >= (y) ? (x) : (y)) +#define MIN(x, y) ((x) <= (y) ? (x) : (y)) + +struct node { + unsigned char id[20]; + struct sockaddr_storage ss; + int sslen; + time_t time; /* time of last message received */ + time_t reply_time; /* time of last correct reply received */ + time_t pinged_time; /* time of last request */ + int pinged; /* how many requests we sent since last reply */ + struct node *next; +}; + +struct bucket { + int af; + unsigned char first[20]; + int count; /* number of nodes */ + int max_count; /* max number of nodes for this bucket */ + time_t time; /* time of last reply in this bucket */ + struct node *nodes; + struct sockaddr_storage cached; /* the address of a likely candidate */ + int cachedlen; + struct bucket *next; +}; + +struct search_node { + unsigned char id[20]; + struct sockaddr_storage ss; + int sslen; + time_t request_time; /* the time of the last unanswered request */ + time_t reply_time; /* the time of the last reply */ + int pinged; + unsigned char token[40]; + int token_len; + int replied; /* whether we have received a reply */ + int acked; /* whether they acked our announcement */ +}; + +/* When performing a search, we search for up to SEARCH_NODES closest nodes + to the destination, and use the additional ones to backtrack if any of + the target 8 turn out to be dead. */ +#define SEARCH_NODES 14 + +struct search { + unsigned short tid; + int af; + time_t step_time; /* the time of the last search_step */ + unsigned char id[20]; + unsigned short port; /* 0 for pure searches */ + int done; + struct search_node nodes[SEARCH_NODES]; + int numnodes; + struct search *next; +}; + +struct peer { + time_t time; + unsigned char ip[16]; + unsigned short len; + unsigned short port; +}; + +/* The maximum number of peers we store for a given hash. */ +#ifndef DHT_MAX_PEERS +#define DHT_MAX_PEERS 2048 +#endif + +/* The maximum number of hashes we're willing to track. */ +#ifndef DHT_MAX_HASHES +#define DHT_MAX_HASHES 16384 +#endif + +/* The maximum number of searches we keep data about. */ +#ifndef DHT_MAX_SEARCHES +#define DHT_MAX_SEARCHES 1024 +#endif + +/* The time after which we consider a search to be expirable. */ +#ifndef DHT_SEARCH_EXPIRE_TIME +#define DHT_SEARCH_EXPIRE_TIME (62 * 60) +#endif + +/* The maximum number of in-flight queries per search. */ +#ifndef DHT_INFLIGHT_QUERIES +#define DHT_INFLIGHT_QUERIES 4 +#endif + +/* The retransmit timeout when performing searches. */ +#ifndef DHT_SEARCH_RETRANSMIT +#define DHT_SEARCH_RETRANSMIT 10 +#endif + +struct storage { + unsigned char id[20]; + int numpeers, maxpeers; + struct peer *peers; + struct storage *next; +}; + +static struct storage * find_storage(const unsigned char *id); +static void flush_search_node(struct search_node *n, struct search *sr); + +static int send_ping(const struct sockaddr *sa, int salen, + const unsigned char *tid, int tid_len); +static int send_pong(const struct sockaddr *sa, int salen, + const unsigned char *tid, int tid_len); +static int send_find_node(const struct sockaddr *sa, int salen, + const unsigned char *tid, int tid_len, + const unsigned char *target, int want, int confirm); +static int send_nodes_peers(const struct sockaddr *sa, int salen, + const unsigned char *tid, int tid_len, + const unsigned char *nodes, int nodes_len, + const unsigned char *nodes6, int nodes6_len, + int af, struct storage *st, + const unsigned char *token, int token_len); +static int send_closest_nodes(const struct sockaddr *sa, int salen, + const unsigned char *tid, int tid_len, + const unsigned char *id, int want, + int af, struct storage *st, + const unsigned char *token, int token_len); +static int send_get_peers(const struct sockaddr *sa, int salen, + unsigned char *tid, int tid_len, + unsigned char *infohash, int want, int confirm); +static int send_announce_peer(const struct sockaddr *sa, int salen, + unsigned char *tid, int tid_len, + unsigned char *infohas, unsigned short port, + unsigned char *token, int token_len, int confirm); +static int send_peer_announced(const struct sockaddr *sa, int salen, + unsigned char *tid, int tid_len); +static int send_error(const struct sockaddr *sa, int salen, + unsigned char *tid, int tid_len, + int code, const char *message); + +static void +add_search_node(const unsigned char *id, const struct sockaddr *sa, int salen); + +#define ERROR 0 +#define REPLY 1 +#define PING 2 +#define FIND_NODE 3 +#define GET_PEERS 4 +#define ANNOUNCE_PEER 5 + +#define WANT4 1 +#define WANT6 2 + +#define PARSE_TID_LEN 16 +#define PARSE_TOKEN_LEN 128 +#define PARSE_NODES_LEN (26 * 16) +#define PARSE_NODES6_LEN (38 * 16) +#define PARSE_VALUES_LEN 2048 +#define PARSE_VALUES6_LEN 2048 + +struct parsed_message { + unsigned char tid[PARSE_TID_LEN]; + unsigned short tid_len; + unsigned char id[20]; + unsigned char info_hash[20]; + unsigned char target[20]; + unsigned short port; + unsigned short implied_port; + unsigned char token[PARSE_TOKEN_LEN]; + unsigned short token_len; + unsigned char nodes[PARSE_NODES_LEN]; + unsigned short nodes_len; + unsigned char nodes6[PARSE_NODES6_LEN]; + unsigned short nodes6_len; + unsigned char values[PARSE_VALUES_LEN]; + unsigned short values_len; + unsigned char values6[PARSE_VALUES6_LEN]; + unsigned short values6_len; + unsigned short want; +}; + +static int parse_message(const unsigned char *buf, int buflen, + struct parsed_message *m); + +static const unsigned char zeroes[20] = {0}; +static const unsigned char v4prefix[16] = { + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xFF, 0xFF, 0, 0, 0, 0 +}; + +static int dht_socket = -1; +static int dht_socket6 = -1; + +static time_t search_time; +static time_t confirm_nodes_time; +static time_t rotate_secrets_time; + +static unsigned char myid[20]; +static int have_v = 0; +static unsigned char my_v[9]; +static unsigned char secret[8]; +static unsigned char oldsecret[8]; + +static struct bucket *buckets = NULL; +static struct bucket *buckets6 = NULL; +static struct storage *storage; +static int numstorage; + +static struct search *searches = NULL; +static int numsearches; +static unsigned short search_id; + +/* The maximum number of nodes that we snub. There is probably little + reason to increase this value. */ +#ifndef DHT_MAX_BLACKLISTED +#define DHT_MAX_BLACKLISTED 10 +#endif +static struct sockaddr_storage blacklist[DHT_MAX_BLACKLISTED]; +int next_blacklisted; + +static struct timeval now; +static time_t mybucket_grow_time, mybucket6_grow_time; +static time_t expire_stuff_time; + +#define MAX_TOKEN_BUCKET_TOKENS 400 +static time_t token_bucket_time; +static int token_bucket_tokens; + +FILE *dht_debug = NULL; + +#ifdef __GNUC__ + __attribute__ ((format (printf, 1, 2))) +#endif +static void +debugf(const char *format, ...) +{ + va_list args; + va_start(args, format); + if(dht_debug) + vfprintf(dht_debug, format, args); + va_end(args); + if(dht_debug) + fflush(dht_debug); +} + +static void +debug_printable(const unsigned char *buf, int buflen) +{ + int i; + if(dht_debug) { + for(i = 0; i < buflen; i++) + putc(buf[i] >= 32 && buf[i] <= 126 ? buf[i] : '.', dht_debug); + } +} + +static void +print_hex(FILE *f, const unsigned char *buf, int buflen) +{ + int i; + for(i = 0; i < buflen; i++) + fprintf(f, "%02x", buf[i]); +} + +static int +is_martian(const struct sockaddr *sa) +{ + switch(sa->sa_family) { + case AF_INET: { + struct sockaddr_in *sin = (struct sockaddr_in*)sa; + const unsigned char *address = (const unsigned char*)&sin->sin_addr; + return sin->sin_port == 0 || + (address[0] == 0) || + (address[0] == 127) || + ((address[0] & 0xE0) == 0xE0); + } + case AF_INET6: { + struct sockaddr_in6 *sin6 = (struct sockaddr_in6*)sa; + const unsigned char *address = (const unsigned char*)&sin6->sin6_addr; + return sin6->sin6_port == 0 || + (address[0] == 0xFF) || + (address[0] == 0xFE && (address[1] & 0xC0) == 0x80) || + (memcmp(address, zeroes, 15) == 0 && + (address[15] == 0 || address[15] == 1)) || + (memcmp(address, v4prefix, 12) == 0); + } + + default: + return 0; + } +} + +/* Forget about the ``XOR-metric''. An id is just a path from the + root of the tree, so bits are numbered from the start. */ + +static int +id_cmp(const unsigned char *restrict id1, const unsigned char *restrict id2) +{ + /* Memcmp is guaranteed to perform an unsigned comparison. */ + return memcmp(id1, id2, 20); +} + +/* Find the lowest 1 bit in an id. */ +static int +lowbit(const unsigned char *id) +{ + int i, j; + for(i = 19; i >= 0; i--) + if(id[i] != 0) + break; + + if(i < 0) + return -1; + + for(j = 7; j >= 0; j--) + if((id[i] & (0x80 >> j)) != 0) + break; + + return 8 * i + j; +} + +/* Find how many bits two ids have in common. */ +static int +common_bits(const unsigned char *id1, const unsigned char *id2) +{ + int i, j; + unsigned char xor; + for(i = 0; i < 20; i++) { + if(id1[i] != id2[i]) + break; + } + + if(i == 20) + return 160; + + xor = id1[i] ^ id2[i]; + + j = 0; + while((xor & 0x80) == 0) { + xor <<= 1; + j++; + } + + return 8 * i + j; +} + +/* Determine whether id1 or id2 is closer to ref */ +static int +xorcmp(const unsigned char *id1, const unsigned char *id2, + const unsigned char *ref) +{ + int i; + for(i = 0; i < 20; i++) { + unsigned char xor1, xor2; + if(id1[i] == id2[i]) + continue; + xor1 = id1[i] ^ ref[i]; + xor2 = id2[i] ^ ref[i]; + if(xor1 < xor2) + return -1; + else + return 1; + } + return 0; +} + +/* We keep buckets in a sorted linked list. A bucket b ranges from + b->first inclusive up to b->next->first exclusive. */ +static int +in_bucket(const unsigned char *id, struct bucket *b) +{ + return id_cmp(b->first, id) <= 0 && + (b->next == NULL || id_cmp(id, b->next->first) < 0); +} + +static struct bucket * +find_bucket(unsigned const char *id, int af) +{ + struct bucket *b = af == AF_INET ? buckets : buckets6; + + if(b == NULL) + return NULL; + + while(1) { + if(b->next == NULL) + return b; + if(id_cmp(id, b->next->first) < 0) + return b; + b = b->next; + } +} + +static struct bucket * +previous_bucket(struct bucket *b) +{ + struct bucket *p = b->af == AF_INET ? buckets : buckets6; + + if(b == p) + return NULL; + + while(1) { + if(p->next == NULL) + return NULL; + if(p->next == b) + return p; + p = p->next; + } +} + +/* Every bucket contains an unordered list of nodes. */ +static struct node * +find_node(const unsigned char *id, int af) +{ + struct bucket *b = find_bucket(id, af); + struct node *n; + + if(b == NULL) + return NULL; + + n = b->nodes; + while(n) { + if(id_cmp(n->id, id) == 0) + return n; + n = n->next; + } + return NULL; +} + +/* Return a random node in a bucket. */ +static struct node * +random_node(struct bucket *b) +{ + struct node *n; + int nn; + + if(b->count == 0) + return NULL; + + nn = random() % b->count; + n = b->nodes; + while(nn > 0 && n) { + n = n->next; + nn--; + } + return n; +} + +/* Return the middle id of a bucket. */ +static int +bucket_middle(struct bucket *b, unsigned char *id_return) +{ + int bit1 = lowbit(b->first); + int bit2 = b->next ? lowbit(b->next->first) : -1; + int bit = MAX(bit1, bit2) + 1; + + if(bit >= 160) + return -1; + + memcpy(id_return, b->first, 20); + id_return[bit / 8] |= (0x80 >> (bit % 8)); + return 1; +} + +/* Return a random id within a bucket. */ +static int +bucket_random(struct bucket *b, unsigned char *id_return) +{ + int bit1 = lowbit(b->first); + int bit2 = b->next ? lowbit(b->next->first) : -1; + int bit = MAX(bit1, bit2) + 1; + int i; + + if(bit >= 160) { + memcpy(id_return, b->first, 20); + return 1; + } + + memcpy(id_return, b->first, bit / 8); + id_return[bit / 8] = b->first[bit / 8] & (0xFF00 >> (bit % 8)); + id_return[bit / 8] |= random() & 0xFF >> (bit % 8); + for(i = bit / 8 + 1; i < 20; i++) + id_return[i] = random() & 0xFF; + return 1; +} + +/* This is our definition of a known-good node. */ +static int +node_good(struct node *node) +{ + return + node->pinged <= 2 && + node->reply_time >= now.tv_sec - 7200 && + node->time >= now.tv_sec - 900; +} + +/* Our transaction-ids are 4-bytes long, with the first two bytes identi- + fying the kind of request, and the remaining two a sequence number in + host order. */ + +static void +make_tid(unsigned char *tid_return, const char *prefix, unsigned short seqno) +{ + tid_return[0] = prefix[0] & 0xFF; + tid_return[1] = prefix[1] & 0xFF; + memcpy(tid_return + 2, &seqno, 2); +} + +static int +tid_match(const unsigned char *tid, const char *prefix, + unsigned short *seqno_return) +{ + if(tid[0] == (prefix[0] & 0xFF) && tid[1] == (prefix[1] & 0xFF)) { + if(seqno_return) + memcpy(seqno_return, tid + 2, 2); + return 1; + } else + return 0; +} + +/* Every bucket caches the address of a likely node. Ping it. */ +static int +send_cached_ping(struct bucket *b) +{ + unsigned char tid[4]; + int rc; + /* We set family to 0 when there's no cached node. */ + if(b->cached.ss_family == 0) + return 0; + + debugf("Sending ping to cached node.\n"); + make_tid(tid, "pn", 0); + rc = send_ping((struct sockaddr*)&b->cached, b->cachedlen, tid, 4); + b->cached.ss_family = 0; + b->cachedlen = 0; + return rc; +} + +/* Called whenever we send a request to a node, increases the ping count + and, if that reaches 3, sends a ping to a new candidate. */ +static void +pinged(struct node *n, struct bucket *b) +{ + n->pinged++; + n->pinged_time = now.tv_sec; + if(n->pinged >= 3) + send_cached_ping(b ? b : find_bucket(n->id, n->ss.ss_family)); +} + +/* The internal blacklist is an LRU cache of nodes that have sent + incorrect messages. */ +static void +blacklist_node(const unsigned char *id, const struct sockaddr *sa, int salen) +{ + int i; + + debugf("Blacklisting broken node.\n"); + + if(id) { + struct node *n; + struct search *sr; + /* Make the node easy to discard. */ + n = find_node(id, sa->sa_family); + if(n) { + n->pinged = 3; + pinged(n, NULL); + } + /* Discard it from any searches in progress. */ + sr = searches; + while(sr) { + for(i = 0; i < sr->numnodes; i++) + if(id_cmp(sr->nodes[i].id, id) == 0) + flush_search_node(&sr->nodes[i], sr); + sr = sr->next; + } + } + /* And make sure we don't hear from it again. */ + memcpy(&blacklist[next_blacklisted], sa, salen); + next_blacklisted = (next_blacklisted + 1) % DHT_MAX_BLACKLISTED; +} + +static int +node_blacklisted(const struct sockaddr *sa, int salen) +{ + int i; + + if((unsigned)salen > sizeof(struct sockaddr_storage)) + abort(); + + if(dht_blacklisted(sa, salen)) + return 1; + + for(i = 0; i < DHT_MAX_BLACKLISTED; i++) { + if(memcmp(&blacklist[i], sa, salen) == 0) + return 1; + } + + return 0; +} + +static struct node * +append_nodes(struct node *n1, struct node *n2) +{ + struct node *n; + + if(n1 == NULL) + return n2; + + if(n2 == NULL) + return n1; + + n = n1; + while(n->next != NULL) + n = n->next; + + n->next = n2; + return n1; +} + +/* Insert a new node into a bucket, don't check for duplicates. + Returns 1 if the node was inserted, 0 if a bucket must be split. */ +static int +insert_node(struct node *node, struct bucket **split_return) +{ + struct bucket *b = find_bucket(node->id, node->ss.ss_family); + + if(b == NULL) + return -1; + + if(b->count >= b->max_count) { + *split_return = b; + return 0; + } + node->next = b->nodes; + b->nodes = node; + b->count++; + return 1; +} + +/* Splits a bucket, and returns the list of nodes that must be reinserted + into the routing table. */ +static int +split_bucket_helper(struct bucket *b, struct node **nodes_return) +{ + struct bucket *new; + int rc; + unsigned char new_id[20]; + + if(!in_bucket(myid, b)) { + debugf("Attempted to split wrong bucket.\n"); + return -1; + } + + rc = bucket_middle(b, new_id); + if(rc < 0) + return -1; + + new = calloc(1, sizeof(struct bucket)); + if(new == NULL) + return -1; + + send_cached_ping(b); + + new->af = b->af; + memcpy(new->first, new_id, 20); + new->time = b->time; + + *nodes_return = b->nodes; + b->nodes = NULL; + b->count = 0; + new->next = b->next; + b->next = new; + + if(in_bucket(myid, b)) { + new->max_count = b->max_count; + b->max_count = MAX(b->max_count / 2, 8); + } else { + new->max_count = MAX(b->max_count / 2, 8); + } + + return 1; +} + +static int +split_bucket(struct bucket *b) +{ + int rc; + struct node *nodes = NULL; + struct node *n = NULL; + + debugf("Splitting.\n"); + rc = split_bucket_helper(b, &nodes); + if(rc < 0) { + debugf("Couldn't split bucket"); + return -1; + } + + while(n != NULL || nodes != NULL) { + struct bucket *split = NULL; + if(n == NULL) { + n = nodes; + nodes = nodes->next; + n->next = NULL; + } + rc = insert_node(n, &split); + if(rc < 0) { + debugf("Couldn't insert node.\n"); + free(n); + n = NULL; + } else if(rc > 0) { + n = NULL; + } else if(!in_bucket(myid, split)) { + free(n); + n = NULL; + } else { + struct node *insert = NULL; + debugf("Splitting (recursive).\n"); + rc = split_bucket_helper(split, &insert); + if(rc < 0) { + debugf("Couldn't split bucket.\n"); + free(n); + n = NULL; + } else { + nodes = append_nodes(nodes, insert); + } + } + } + return 1; +} + +/* We just learnt about a node, not necessarily a new one. Confirm is 1 if + the node sent a message, 2 if it sent us a reply. */ +static struct node * +new_node(const unsigned char *id, const struct sockaddr *sa, int salen, + int confirm) +{ + struct bucket *b; + struct node *n; + int mybucket; + + again: + + b = find_bucket(id, sa->sa_family); + if(b == NULL) + return NULL; + + if(id_cmp(id, myid) == 0) + return NULL; + + if(is_martian(sa) || node_blacklisted(sa, salen)) + return NULL; + + mybucket = in_bucket(myid, b); + + if(confirm == 2) + b->time = now.tv_sec; + + n = b->nodes; + while(n) { + if(id_cmp(n->id, id) == 0) { + if(confirm || n->time < now.tv_sec - 15 * 60) { + /* Known node. Update stuff. */ + memcpy((struct sockaddr*)&n->ss, sa, salen); + if(confirm) + n->time = now.tv_sec; + if(confirm >= 2) { + n->reply_time = now.tv_sec; + n->pinged = 0; + n->pinged_time = 0; + } + } + if(confirm == 2) + add_search_node(id, sa, salen); + return n; + } + n = n->next; + } + + /* New node. */ + + if(mybucket) { + if(sa->sa_family == AF_INET) + mybucket_grow_time = now.tv_sec; + else + mybucket6_grow_time = now.tv_sec; + } + + /* First, try to get rid of a known-bad node. */ + n = b->nodes; + while(n) { + if(n->pinged >= 3 && n->pinged_time < now.tv_sec - 15) { + memcpy(n->id, id, 20); + memcpy((struct sockaddr*)&n->ss, sa, salen); + n->time = confirm ? now.tv_sec : 0; + n->reply_time = confirm >= 2 ? now.tv_sec : 0; + n->pinged_time = 0; + n->pinged = 0; + if(confirm == 2) + add_search_node(id, sa, salen); + return n; + } + n = n->next; + } + + if(b->count >= b->max_count) { + /* Bucket full. Ping a dubious node */ + int dubious = 0; + n = b->nodes; + while(n) { + /* Pick the first dubious node that we haven't pinged in the + last 15 seconds. This gives nodes the time to reply, but + tends to concentrate on the same nodes, so that we get rid + of bad nodes fast. */ + if(!node_good(n)) { + dubious = 1; + if(n->pinged_time < now.tv_sec - 15) { + unsigned char tid[4]; + debugf("Sending ping to dubious node.\n"); + make_tid(tid, "pn", 0); + send_ping((struct sockaddr*)&n->ss, n->sslen, + tid, 4); + n->pinged++; + n->pinged_time = now.tv_sec; + break; + } + } + n = n->next; + } + + if(mybucket && !dubious) { + int rc; + rc = split_bucket(b); + if(rc > 0) + goto again; + return NULL; + } + + /* No space for this node. Cache it away for later. */ + if(confirm || b->cached.ss_family == 0) { + memcpy(&b->cached, sa, salen); + b->cachedlen = salen; + } + + if(confirm == 2) + add_search_node(id, sa, salen); + return NULL; + } + + /* Create a new node. */ + n = calloc(1, sizeof(struct node)); + if(n == NULL) + return NULL; + memcpy(n->id, id, 20); + memcpy(&n->ss, sa, salen); + n->sslen = salen; + n->time = confirm ? now.tv_sec : 0; + n->reply_time = confirm >= 2 ? now.tv_sec : 0; + n->next = b->nodes; + b->nodes = n; + b->count++; + if(confirm == 2) + add_search_node(id, sa, salen); + return n; +} + +/* Called periodically to purge known-bad nodes. Note that we're very + conservative here: broken nodes in the table don't do much harm, we'll + recover as soon as we find better ones. */ +static int +expire_buckets(struct bucket *b) +{ + while(b) { + struct node *n, *p; + int changed = 0; + + while(b->nodes && b->nodes->pinged >= 4) { + n = b->nodes; + b->nodes = n->next; + b->count--; + changed = 1; + free(n); + } + + p = b->nodes; + while(p) { + while(p->next && p->next->pinged >= 4) { + n = p->next; + p->next = n->next; + b->count--; + changed = 1; + free(n); + } + p = p->next; + } + + if(changed) + send_cached_ping(b); + + b = b->next; + } + expire_stuff_time = now.tv_sec + 120 + random() % 240; + return 1; +} + +/* While a search is in progress, we don't necessarily keep the nodes being + walked in the main bucket table. A search in progress is identified by + a unique transaction id, a short (and hence small enough to fit in the + transaction id of the protocol packets). */ + +static struct search * +find_search(unsigned short tid, int af) +{ + struct search *sr = searches; + while(sr) { + if(sr->tid == tid && sr->af == af) + return sr; + sr = sr->next; + } + return NULL; +} + +/* A search contains a list of nodes, sorted by decreasing distance to the + target. We just got a new candidate, insert it at the right spot or + discard it. */ + +static struct search_node* +insert_search_node(const unsigned char *id, + const struct sockaddr *sa, int salen, + struct search *sr, int replied, + unsigned char *token, int token_len) +{ + struct search_node *n; + int i, j; + + if(sa->sa_family != sr->af) { + debugf("Attempted to insert node in the wrong family.\n"); + return NULL; + } + + for(i = 0; i < sr->numnodes; i++) { + if(id_cmp(id, sr->nodes[i].id) == 0) { + n = &sr->nodes[i]; + goto found; + } + if(xorcmp(id, sr->nodes[i].id, sr->id) < 0) + break; + } + + if(i == SEARCH_NODES) + return NULL; + + if(sr->numnodes < SEARCH_NODES) + sr->numnodes++; + + for(j = sr->numnodes - 1; j > i; j--) { + sr->nodes[j] = sr->nodes[j - 1]; + } + + n = &sr->nodes[i]; + + memset(n, 0, sizeof(struct search_node)); + memcpy(n->id, id, 20); + +found: + memcpy(&n->ss, sa, salen); + n->sslen = salen; + + if(replied) { + n->replied = 1; + n->reply_time = now.tv_sec; + n->request_time = 0; + n->pinged = 0; + } + if(token) { + if(token_len >= 40) { + debugf("Eek! Overlong token.\n"); + } else { + memcpy(n->token, token, token_len); + n->token_len = token_len; + } + } + + return n; +} + +static void +flush_search_node(struct search_node *n, struct search *sr) +{ + int i = n - sr->nodes, j; + for(j = i; j < sr->numnodes - 1; j++) + sr->nodes[j] = sr->nodes[j + 1]; + sr->numnodes--; +} + +static void +expire_searches(dht_callback_t *callback, void *closure) +{ + struct search *sr = searches, *previous = NULL; + + while(sr) { + struct search *next = sr->next; + if(sr->step_time < now.tv_sec - DHT_SEARCH_EXPIRE_TIME) { + if(previous) + previous->next = next; + else + searches = next; + numsearches--; + if (!sr->done) { + if(callback) + (*callback)(closure, + sr->af == AF_INET ? + DHT_EVENT_SEARCH_DONE : DHT_EVENT_SEARCH_DONE6, + sr->id, NULL, 0); + } + free(sr); + } else { + previous = sr; + } + sr = next; + } +} + +/* This must always return 0 or 1, never -1, not even on failure (see below). */ +static int +search_send_get_peers(struct search *sr, struct search_node *n) +{ + struct node *node; + unsigned char tid[4]; + + if(n == NULL) { + int i; + for(i = 0; i < sr->numnodes; i++) { + if(sr->nodes[i].pinged < 3 && !sr->nodes[i].replied && + sr->nodes[i].request_time < now.tv_sec - DHT_SEARCH_RETRANSMIT) + n = &sr->nodes[i]; + } + } + + if(!n || n->pinged >= 3 || n->replied || + n->request_time >= now.tv_sec - DHT_SEARCH_RETRANSMIT) + return 0; + + debugf("Sending get_peers.\n"); + make_tid(tid, "gp", sr->tid); + send_get_peers((struct sockaddr*)&n->ss, n->sslen, tid, 4, sr->id, -1, + n->reply_time >= now.tv_sec - DHT_SEARCH_RETRANSMIT); + n->pinged++; + n->request_time = now.tv_sec; + /* If the node happens to be in our main routing table, mark it + as pinged. */ + node = find_node(n->id, n->ss.ss_family); + if(node) pinged(node, NULL); + return 1; +} + +/* Insert a new node into any incomplete search. */ +static void +add_search_node(const unsigned char *id, const struct sockaddr *sa, int salen) +{ + struct search *sr; + for(sr = searches; sr; sr = sr->next) { + if(sr->af == sa->sa_family && sr->numnodes < SEARCH_NODES) { + struct search_node *n = + insert_search_node(id, sa, salen, sr, 0, NULL, 0); + if(n) + search_send_get_peers(sr, n); + } + } +} + +/* When a search is in progress, we periodically call search_step to send + further requests. */ +static void +search_step(struct search *sr, dht_callback_t *callback, void *closure) +{ + int i, j; + int all_done = 1; + + /* Check if the first 8 live nodes have replied. */ + j = 0; + for(i = 0; i < sr->numnodes && j < 8; i++) { + struct search_node *n = &sr->nodes[i]; + if(n->pinged >= 3) + continue; + if(!n->replied) { + all_done = 0; + break; + } + j++; + } + + if(all_done) { + if(sr->port == 0) { + goto done; + } else { + int all_acked = 1; + j = 0; + for(i = 0; i < sr->numnodes && j < 8; i++) { + struct search_node *n = &sr->nodes[i]; + struct node *node; + unsigned char tid[4]; + if(n->pinged >= 3) + continue; + /* A proposed extension to the protocol consists in + omitting the token when storage tables are full. While + I don't think this makes a lot of sense -- just sending + a positive reply is just as good --, let's deal with it. */ + if(n->token_len == 0) + n->acked = 1; + if(!n->acked) { + all_acked = 0; + debugf("Sending announce_peer.\n"); + make_tid(tid, "ap", sr->tid); + send_announce_peer((struct sockaddr*)&n->ss, + sizeof(struct sockaddr_storage), + tid, 4, sr->id, sr->port, + n->token, n->token_len, + n->reply_time >= now.tv_sec - 15); + n->pinged++; + n->request_time = now.tv_sec; + node = find_node(n->id, n->ss.ss_family); + if(node) pinged(node, NULL); + } + j++; + } + if(all_acked) + goto done; + } + sr->step_time = now.tv_sec; + return; + } + + if(sr->step_time + DHT_SEARCH_RETRANSMIT >= now.tv_sec) + return; + + j = 0; + for(i = 0; i < sr->numnodes; i++) { + j += search_send_get_peers(sr, &sr->nodes[i]); + if(j >= DHT_INFLIGHT_QUERIES) + break; + } + sr->step_time = now.tv_sec; + return; + + done: + sr->done = 1; + if(callback) + (*callback)(closure, + sr->af == AF_INET ? + DHT_EVENT_SEARCH_DONE : DHT_EVENT_SEARCH_DONE6, + sr->id, NULL, 0); + sr->step_time = now.tv_sec; +} + +static struct search * +new_search(void) +{ + struct search *sr, *oldest = NULL; + + /* Find the oldest done search */ + sr = searches; + while(sr) { + if(sr->done && + (oldest == NULL || oldest->step_time > sr->step_time)) + oldest = sr; + sr = sr->next; + } + + /* The oldest slot is expired. */ + if(oldest && oldest->step_time < now.tv_sec - DHT_SEARCH_EXPIRE_TIME) + return oldest; + + /* Allocate a new slot. */ + if(numsearches < DHT_MAX_SEARCHES) { + sr = calloc(1, sizeof(struct search)); + if(sr != NULL) { + sr->next = searches; + searches = sr; + numsearches++; + return sr; + } + } + + /* Return oldest slot if it's done. */ + if(oldest && oldest->done) + return oldest; + + /* No available slots found, return NULL. */ + return NULL; +} + +/* Insert the contents of a bucket into a search structure. */ +static void +insert_search_bucket(struct bucket *b, struct search *sr) +{ + struct node *n; + n = b->nodes; + while(n) { + insert_search_node(n->id, (struct sockaddr*)&n->ss, n->sslen, + sr, 0, NULL, 0); + n = n->next; + } +} + +/* Start a search. If port is non-zero, perform an announce when the + search is complete. */ +int +dht_search(const unsigned char *id, int port, int af, + dht_callback_t *callback, void *closure) +{ + struct search *sr; + struct storage *st; + struct bucket *b = find_bucket(id, af); + + if(b == NULL) { + errno = EAFNOSUPPORT; + return -1; + } + + /* Try to answer this search locally. In a fully grown DHT this + is very unlikely, but people are running modified versions of + this code in private DHTs with very few nodes. What's wrong + with flooding? */ + if(callback) { + st = find_storage(id); + if(st) { + unsigned short swapped; + unsigned char buf[18]; + int i; + + debugf("Found local data (%d peers).\n", st->numpeers); + + for(i = 0; i < st->numpeers; i++) { + swapped = htons(st->peers[i].port); + if(st->peers[i].len == 4) { + memcpy(buf, st->peers[i].ip, 4); + memcpy(buf + 4, &swapped, 2); + (*callback)(closure, DHT_EVENT_VALUES, id, + (void*)buf, 6); + } else if(st->peers[i].len == 16) { + memcpy(buf, st->peers[i].ip, 16); + memcpy(buf + 16, &swapped, 2); + (*callback)(closure, DHT_EVENT_VALUES6, id, + (void*)buf, 18); + } + } + } + } + + sr = searches; + while(sr) { + if(sr->af == af && id_cmp(sr->id, id) == 0) + break; + sr = sr->next; + } + + int sr_duplicate = sr && !sr->done; + + if(sr) { + /* We're reusing data from an old search. Reusing the same tid + means that we can merge replies for both searches. */ + int i; + sr->done = 0; + again: + for(i = 0; i < sr->numnodes; i++) { + struct search_node *n; + n = &sr->nodes[i]; + /* Discard any doubtful nodes. */ + if(n->pinged >= 3 || n->reply_time < now.tv_sec - 7200) { + flush_search_node(n, sr); + goto again; + } + n->pinged = 0; + n->token_len = 0; + n->replied = 0; + n->acked = 0; + } + } else { + sr = new_search(); + if(sr == NULL) { + errno = ENOSPC; + return -1; + } + sr->af = af; + sr->tid = search_id++; + sr->step_time = 0; + memcpy(sr->id, id, 20); + sr->done = 0; + sr->numnodes = 0; + } + + sr->port = port; + + insert_search_bucket(b, sr); + + if(sr->numnodes < SEARCH_NODES) { + struct bucket *p = previous_bucket(b); + if(b->next) + insert_search_bucket(b->next, sr); + if(p) + insert_search_bucket(p, sr); + } + if(sr->numnodes < SEARCH_NODES) + insert_search_bucket(find_bucket(myid, af), sr); + + search_step(sr, callback, closure); + search_time = now.tv_sec; + if(sr_duplicate) { + return 0; + } else { + return 1; + } +} + +/* A struct storage stores all the stored peer addresses for a given info + hash. */ + +static struct storage * +find_storage(const unsigned char *id) +{ + struct storage *st = storage; + + while(st) { + if(id_cmp(id, st->id) == 0) + break; + st = st->next; + } + return st; +} + +static int +storage_store(const unsigned char *id, + const struct sockaddr *sa, unsigned short port) +{ + int i, len; + struct storage *st; + unsigned char *ip; + + if(sa->sa_family == AF_INET) { + struct sockaddr_in *sin = (struct sockaddr_in*)sa; + ip = (unsigned char*)&sin->sin_addr; + len = 4; + } else if(sa->sa_family == AF_INET6) { + struct sockaddr_in6 *sin6 = (struct sockaddr_in6*)sa; + ip = (unsigned char*)&sin6->sin6_addr; + len = 16; + } else { + return -1; + } + + st = find_storage(id); + + if(st == NULL) { + if(numstorage >= DHT_MAX_HASHES) + return -1; + st = calloc(1, sizeof(struct storage)); + if(st == NULL) return -1; + memcpy(st->id, id, 20); + st->next = storage; + storage = st; + numstorage++; + } + + for(i = 0; i < st->numpeers; i++) { + if(st->peers[i].port == port && st->peers[i].len == len && + memcmp(st->peers[i].ip, ip, len) == 0) + break; + } + + if(i < st->numpeers) { + /* Already there, only need to refresh */ + st->peers[i].time = now.tv_sec; + return 0; + } else { + struct peer *p; + if(i >= st->maxpeers) { + /* Need to expand the array. */ + struct peer *new_peers; + int n; + if(st->maxpeers >= DHT_MAX_PEERS) + return 0; + n = st->maxpeers == 0 ? 2 : 2 * st->maxpeers; + n = MIN(n, DHT_MAX_PEERS); + new_peers = realloc(st->peers, n * sizeof(struct peer)); + if(new_peers == NULL) + return -1; + st->peers = new_peers; + st->maxpeers = n; + } + p = &st->peers[st->numpeers++]; + p->time = now.tv_sec; + p->len = len; + memcpy(p->ip, ip, len); + p->port = port; + return 1; + } +} + +static int +expire_storage(void) +{ + struct storage *st = storage, *previous = NULL; + while(st) { + int i = 0; + while(i < st->numpeers) { + if(st->peers[i].time < now.tv_sec - 32 * 60) { + if(i != st->numpeers - 1) + st->peers[i] = st->peers[st->numpeers - 1]; + st->numpeers--; + } else { + i++; + } + } + + if(st->numpeers == 0) { + free(st->peers); + if(previous) + previous->next = st->next; + else + storage = st->next; + free(st); + if(previous) + st = previous->next; + else + st = storage; + numstorage--; + if(numstorage < 0) { + debugf("Eek... numstorage became negative.\n"); + numstorage = 0; + } + } else { + previous = st; + st = st->next; + } + } + return 1; +} + +static int +rotate_secrets(void) +{ + int rc; + + rotate_secrets_time = now.tv_sec + 900 + random() % 1800; + + memcpy(oldsecret, secret, sizeof(secret)); + rc = dht_random_bytes(secret, sizeof(secret)); + + if(rc < 0) + return -1; + + return 1; +} + +#ifndef TOKEN_SIZE +#define TOKEN_SIZE 8 +#endif + +static void +make_token(const struct sockaddr *sa, int old, unsigned char *token_return) +{ + void *ip; + int iplen; + unsigned short port; + + if(sa->sa_family == AF_INET) { + struct sockaddr_in *sin = (struct sockaddr_in*)sa; + ip = &sin->sin_addr; + iplen = 4; + port = htons(sin->sin_port); + } else if(sa->sa_family == AF_INET6) { + struct sockaddr_in6 *sin6 = (struct sockaddr_in6*)sa; + ip = &sin6->sin6_addr; + iplen = 16; + port = htons(sin6->sin6_port); + } else { + abort(); + } + + dht_hash(token_return, TOKEN_SIZE, + old ? oldsecret : secret, sizeof(secret), + ip, iplen, (unsigned char*)&port, 2); +} +static int +token_match(const unsigned char *token, int token_len, + const struct sockaddr *sa) +{ + unsigned char t[TOKEN_SIZE]; + if(token_len != TOKEN_SIZE) + return 0; + make_token(sa, 0, t); + if(memcmp(t, token, TOKEN_SIZE) == 0) + return 1; + make_token(sa, 1, t); + if(memcmp(t, token, TOKEN_SIZE) == 0) + return 1; + return 0; +} + +int +dht_nodes(int af, int *good_return, int *dubious_return, int *cached_return, + int *incoming_return) +{ + int good = 0, dubious = 0, cached = 0, incoming = 0; + struct bucket *b = af == AF_INET ? buckets : buckets6; + + while(b) { + struct node *n = b->nodes; + while(n) { + if(node_good(n)) { + good++; + if(n->time > n->reply_time) + incoming++; + } else { + dubious++; + } + n = n->next; + } + if(b->cached.ss_family > 0) + cached++; + b = b->next; + } + if(good_return) + *good_return = good; + if(dubious_return) + *dubious_return = dubious; + if(cached_return) + *cached_return = cached; + if(incoming_return) + *incoming_return = incoming; + return good + dubious; +} + +static void +dump_bucket(FILE *f, struct bucket *b) +{ + struct node *n = b->nodes; + fprintf(f, "Bucket "); + print_hex(f, b->first, 20); + fprintf(f, " count %d/%d age %d%s%s:\n", + b->count, b->max_count, (int)(now.tv_sec - b->time), + in_bucket(myid, b) ? " (mine)" : "", + b->cached.ss_family ? " (cached)" : ""); + while(n) { + char buf[512]; + unsigned short port; + fprintf(f, " Node "); + print_hex(f, n->id, 20); + if(n->ss.ss_family == AF_INET) { + struct sockaddr_in *sin = (struct sockaddr_in*)&n->ss; + inet_ntop(AF_INET, &sin->sin_addr, buf, 512); + port = ntohs(sin->sin_port); + } else if(n->ss.ss_family == AF_INET6) { + struct sockaddr_in6 *sin6 = (struct sockaddr_in6*)&n->ss; + inet_ntop(AF_INET6, &sin6->sin6_addr, buf, 512); + port = ntohs(sin6->sin6_port); + } else { + snprintf(buf, 512, "unknown(%d)", n->ss.ss_family); + port = 0; + } + + if(n->ss.ss_family == AF_INET6) + fprintf(f, " [%s]:%d ", buf, port); + else + fprintf(f, " %s:%d ", buf, port); + if(n->time != n->reply_time) + fprintf(f, "age %ld, %ld", + (long)(now.tv_sec - n->time), + (long)(now.tv_sec - n->reply_time)); + else + fprintf(f, "age %ld", (long)(now.tv_sec - n->time)); + if(n->pinged) + fprintf(f, " (%d)", n->pinged); + if(node_good(n)) + fprintf(f, " (good)"); + fprintf(f, "\n"); + n = n->next; + } + +} + +void +dht_dump_tables(FILE *f) +{ + int i; + struct bucket *b; + struct storage *st = storage; + struct search *sr = searches; + + fprintf(f, "My id "); + print_hex(f, myid, 20); + fprintf(f, "\n"); + + b = buckets; + while(b) { + dump_bucket(f, b); + b = b->next; + } + + fprintf(f, "\n"); + + b = buckets6; + while(b) { + dump_bucket(f, b); + b = b->next; + } + + while(sr) { + fprintf(f, "\nSearch%s id ", sr->af == AF_INET6 ? " (IPv6)" : ""); + print_hex(f, sr->id, 20); + fprintf(f, " age %d%s\n", (int)(now.tv_sec - sr->step_time), + sr->done ? " (done)" : ""); + for(i = 0; i < sr->numnodes; i++) { + struct search_node *n = &sr->nodes[i]; + fprintf(f, "Node %d id ", i); + print_hex(f, n->id, 20); + fprintf(f, " bits %d age ", common_bits(sr->id, n->id)); + if(n->request_time) + fprintf(f, "%d, ", (int)(now.tv_sec - n->request_time)); + fprintf(f, "%d", (int)(now.tv_sec - n->reply_time)); + if(n->pinged) + fprintf(f, " (%d)", n->pinged); + fprintf(f, "%s%s.\n", + find_node(n->id, sr->af) ? " (known)" : "", + n->replied ? " (replied)" : ""); + } + sr = sr->next; + } + + while(st) { + fprintf(f, "\nStorage "); + print_hex(f, st->id, 20); + fprintf(f, " %d/%d nodes:", st->numpeers, st->maxpeers); + for(i = 0; i < st->numpeers; i++) { + char buf[100]; + if(st->peers[i].len == 4) { + inet_ntop(AF_INET, st->peers[i].ip, buf, 100); + } else if(st->peers[i].len == 16) { + buf[0] = '['; + inet_ntop(AF_INET6, st->peers[i].ip, buf + 1, 98); + strcat(buf, "]"); + } else { + strcpy(buf, "???"); + } + fprintf(f, " %s:%u (%ld)", + buf, st->peers[i].port, + (long)(now.tv_sec - st->peers[i].time)); + } + st = st->next; + } + + fprintf(f, "\n\n"); + fflush(f); +} + +int +dht_init(int s, int s6, const unsigned char *id, const unsigned char *v) +{ + int rc; + + if(dht_socket >= 0 || dht_socket6 >= 0 || buckets || buckets6) { + errno = EBUSY; + return -1; + } + + searches = NULL; + numsearches = 0; + + storage = NULL; + numstorage = 0; + + if(s >= 0) { + buckets = calloc(1, sizeof(struct bucket)); + if(buckets == NULL) + return -1; + buckets->max_count = 128; + buckets->af = AF_INET; + } + + if(s6 >= 0) { + buckets6 = calloc(1, sizeof(struct bucket)); + if(buckets6 == NULL) + return -1; + buckets6->max_count = 128; + buckets6->af = AF_INET6; + } + + memcpy(myid, id, 20); + if(v) { + memcpy(my_v, "1:v4:", 5); + memcpy(my_v + 5, v, 4); + have_v = 1; + } else { + have_v = 0; + } + + dht_gettimeofday(&now, NULL); + + mybucket_grow_time = now.tv_sec; + mybucket6_grow_time = now.tv_sec; + confirm_nodes_time = now.tv_sec + random() % 3; + + search_id = random() & 0xFFFF; + search_time = 0; + + next_blacklisted = 0; + + token_bucket_time = now.tv_sec; + token_bucket_tokens = MAX_TOKEN_BUCKET_TOKENS; + + memset(secret, 0, sizeof(secret)); + rc = rotate_secrets(); + if(rc < 0) + goto fail; + + dht_socket = s; + dht_socket6 = s6; + + expire_buckets(buckets); + expire_buckets(buckets6); + + return 1; + + fail: + free(buckets); + buckets = NULL; + free(buckets6); + buckets6 = NULL; + return -1; +} + +int +dht_uninit() +{ + if(dht_socket < 0 && dht_socket6 < 0) { + errno = EINVAL; + return -1; + } + + dht_socket = -1; + dht_socket6 = -1; + + while(buckets) { + struct bucket *b = buckets; + buckets = b->next; + while(b->nodes) { + struct node *n = b->nodes; + b->nodes = n->next; + free(n); + } + free(b); + } + + while(buckets6) { + struct bucket *b = buckets6; + buckets6 = b->next; + while(b->nodes) { + struct node *n = b->nodes; + b->nodes = n->next; + free(n); + } + free(b); + } + + while(storage) { + struct storage *st = storage; + storage = storage->next; + free(st->peers); + free(st); + } + + while(searches) { + struct search *sr = searches; + searches = searches->next; + free(sr); + } + + return 1; +} + +/* Rate control for requests we receive. */ + +static int +token_bucket(void) +{ + if(token_bucket_tokens == 0) { + token_bucket_tokens = MIN(MAX_TOKEN_BUCKET_TOKENS, + 100 * (now.tv_sec - token_bucket_time)); + token_bucket_time = now.tv_sec; + } + + if(token_bucket_tokens == 0) + return 0; + + token_bucket_tokens--; + return 1; +} + +static int +neighbourhood_maintenance(int af) +{ + unsigned char id[20]; + struct bucket *b = find_bucket(myid, af); + struct bucket *q; + struct node *n; + + if(b == NULL) + return 0; + + memcpy(id, myid, 20); + id[19] = random() & 0xFF; + q = b; + if(q->next && (q->count == 0 || (random() & 7) == 0)) + q = b->next; + if(q->count == 0 || (random() & 7) == 0) { + struct bucket *r; + r = previous_bucket(b); + if(r && r->count > 0) + q = r; + } + + if(q) { + /* Since our node-id is the same in both DHTs, it's probably + profitable to query both families. */ + int want = dht_socket >= 0 && dht_socket6 >= 0 ? (WANT4 | WANT6) : -1; + n = random_node(q); + if(n) { + unsigned char tid[4]; + debugf("Sending find_node for%s neighborhood maintenance.\n", + af == AF_INET6 ? " IPv6" : ""); + make_tid(tid, "fn", 0); + send_find_node((struct sockaddr*)&n->ss, n->sslen, + tid, 4, id, want, + n->reply_time >= now.tv_sec - 15); + pinged(n, q); + } + return 1; + } + return 0; +} + +static int +bucket_maintenance(int af) +{ + struct bucket *b; + + b = af == AF_INET ? buckets : buckets6; + + while(b) { + /* 10 minutes for an 8-node bucket */ + int to = MAX(600 / (b->max_count / 8), 30); + struct bucket *q; + if(b->time < now.tv_sec - to) { + /* This bucket hasn't seen any positive confirmation for a long + time. Pick a random id in this bucket's range, and send + a request to a random node. */ + unsigned char id[20]; + struct node *n; + int rc; + + rc = bucket_random(b, id); + if(rc < 0) + memcpy(id, b->first, 20); + + q = b; + /* If the bucket is empty, we try to fill it from a neighbour. + We also sometimes do it gratuitiously to recover from + buckets full of broken nodes. */ + if(q->next && (q->count == 0 || (random() & 7) == 0)) + q = b->next; + if(q->count == 0 || (random() & 7) == 0) { + struct bucket *r; + r = previous_bucket(b); + if(r && r->count > 0) + q = r; + } + + if(q) { + n = random_node(q); + if(n) { + unsigned char tid[4]; + int want = -1; + + if(dht_socket >= 0 && dht_socket6 >= 0) { + struct bucket *otherbucket; + otherbucket = + find_bucket(id, af == AF_INET ? AF_INET6 : AF_INET); + if(otherbucket && + otherbucket->count < otherbucket->max_count) + /* The corresponding bucket in the other family + is not full -- querying both is useful. */ + want = WANT4 | WANT6; + else if(random() % 37 == 0) + /* Most of the time, this just adds overhead. + However, it might help stitch back one of + the DHTs after a network collapse, so query + both, but only very occasionally. */ + want = WANT4 | WANT6; + } + + debugf("Sending find_node for%s bucket maintenance.\n", + af == AF_INET6 ? " IPv6" : ""); + make_tid(tid, "fn", 0); + send_find_node((struct sockaddr*)&n->ss, n->sslen, + tid, 4, id, want, + n->reply_time >= now.tv_sec - 15); + pinged(n, q); + /* In order to avoid sending queries back-to-back, + give up for now and reschedule us soon. */ + return 1; + } + } + } + b = b->next; + } + return 0; +} + +int +dht_periodic(const void *buf, size_t buflen, + const struct sockaddr *from, int fromlen, + time_t *tosleep, + dht_callback_t *callback, void *closure) +{ + dht_gettimeofday(&now, NULL); + + if(buflen > 0) { + int message; + struct parsed_message m; + unsigned short ttid; + + if(is_martian(from)) + goto dontread; + + if(node_blacklisted(from, fromlen)) { + debugf("Received packet from blacklisted node.\n"); + goto dontread; + } + + if(((char*)buf)[buflen] != '\0') { + debugf("Unterminated message.\n"); + errno = EINVAL; + return -1; + } + + memset(&m, 0, sizeof(m)); + message = parse_message(buf, buflen, &m); + + if(message < 0 || message == ERROR || id_cmp(m.id, zeroes) == 0) { + debugf("Unparseable message: "); + debug_printable(buf, buflen); + debugf("\n"); + goto dontread; + } + + if(id_cmp(m.id, myid) == 0) { + debugf("Received message from self.\n"); + goto dontread; + } + + if(message > REPLY) { + /* Rate limit requests. */ + if(!token_bucket()) { + debugf("Dropping request due to rate limiting.\n"); + goto dontread; + } + } + + switch(message) { + case REPLY: + if(m.tid_len != 4) { + debugf("Broken node truncates transaction ids: "); + debug_printable(buf, buflen); + debugf("\n"); + /* This is really annoying, as it means that we will + time-out all our searches that go through this node. + Kill it. */ + blacklist_node(m.id, from, fromlen); + goto dontread; + } + if(tid_match(m.tid, "pn", NULL)) { + debugf("Pong!\n"); + new_node(m.id, from, fromlen, 2); + } else if(tid_match(m.tid, "fn", NULL) || + tid_match(m.tid, "gp", NULL)) { + int gp = 0; + struct search *sr = NULL; + if(tid_match(m.tid, "gp", &ttid)) { + gp = 1; + sr = find_search(ttid, from->sa_family); + } + debugf("Nodes found (%d+%d)%s!\n", + m.nodes_len/26, m.nodes6_len/38, + gp ? " for get_peers" : ""); + if(m.nodes_len % 26 != 0 || m.nodes6_len % 38 != 0) { + debugf("Unexpected length for node info!\n"); + blacklist_node(m.id, from, fromlen); + } else if(gp && sr == NULL) { + debugf("Unknown search!\n"); + new_node(m.id, from, fromlen, 1); + } else { + int i; + new_node(m.id, from, fromlen, 2); + for(i = 0; i < m.nodes_len / 26; i++) { + unsigned char *ni = m.nodes + i * 26; + struct sockaddr_in sin; + if(id_cmp(ni, myid) == 0) + continue; + memset(&sin, 0, sizeof(sin)); + sin.sin_family = AF_INET; + memcpy(&sin.sin_addr, ni + 20, 4); + memcpy(&sin.sin_port, ni + 24, 2); + new_node(ni, (struct sockaddr*)&sin, sizeof(sin), 0); + if(sr && sr->af == AF_INET) { + insert_search_node(ni, + (struct sockaddr*)&sin, + sizeof(sin), + sr, 0, NULL, 0); + } + } + for(i = 0; i < m.nodes6_len / 38; i++) { + unsigned char *ni = m.nodes6 + i * 38; + struct sockaddr_in6 sin6; + if(id_cmp(ni, myid) == 0) + continue; + memset(&sin6, 0, sizeof(sin6)); + sin6.sin6_family = AF_INET6; + memcpy(&sin6.sin6_addr, ni + 20, 16); + memcpy(&sin6.sin6_port, ni + 36, 2); + new_node(ni, (struct sockaddr*)&sin6, sizeof(sin6), 0); + if(sr && sr->af == AF_INET6) { + insert_search_node(ni, + (struct sockaddr*)&sin6, + sizeof(sin6), + sr, 0, NULL, 0); + } + } + if(sr) + /* Since we received a reply, the number of + requests in flight has decreased. Let's push + another request. */ + search_send_get_peers(sr, NULL); + } + if(sr) { + insert_search_node(m.id, from, fromlen, sr, + 1, m.token, m.token_len); + if(m.values_len > 0 || m.values6_len > 0) { + debugf("Got values (%d+%d)!\n", + m.values_len / 6, m.values6_len / 18); + if(callback) { + if(m.values_len > 0) + (*callback)(closure, DHT_EVENT_VALUES, sr->id, + (void*)m.values, m.values_len); + + if(m.values6_len > 0) + (*callback)(closure, DHT_EVENT_VALUES6, sr->id, + (void*)m.values6, m.values6_len); + } + } + } + } else if(tid_match(m.tid, "ap", &ttid)) { + struct search *sr; + debugf("Got reply to announce_peer.\n"); + sr = find_search(ttid, from->sa_family); + if(!sr) { + debugf("Unknown search!\n"); + new_node(m.id, from, fromlen, 1); + } else { + int i; + new_node(m.id, from, fromlen, 2); + for(i = 0; i < sr->numnodes; i++) + if(id_cmp(sr->nodes[i].id, m.id) == 0) { + sr->nodes[i].request_time = 0; + sr->nodes[i].reply_time = now.tv_sec; + sr->nodes[i].acked = 1; + sr->nodes[i].pinged = 0; + break; + } + /* See comment for gp above. */ + search_send_get_peers(sr, NULL); + } + } else { + debugf("Unexpected reply: "); + debug_printable(buf, buflen); + debugf("\n"); + } + break; + case PING: + debugf("Ping (%d)!\n", m.tid_len); + new_node(m.id, from, fromlen, 1); + debugf("Sending pong.\n"); + send_pong(from, fromlen, m.tid, m.tid_len); + break; + case FIND_NODE: + debugf("Find node!\n"); + new_node(m.id, from, fromlen, 1); + debugf("Sending closest nodes (%d).\n", m.want); + send_closest_nodes(from, fromlen, + m.tid, m.tid_len, m.target, m.want, + 0, NULL, NULL, 0); + break; + case GET_PEERS: + debugf("Get_peers!\n"); + new_node(m.id, from, fromlen, 1); + if(id_cmp(m.info_hash, zeroes) == 0) { + debugf("Eek! Got get_peers with no info_hash.\n"); + send_error(from, fromlen, m.tid, m.tid_len, + 203, "Get_peers with no info_hash"); + break; + } else { + struct storage *st = find_storage(m.info_hash); + unsigned char token[TOKEN_SIZE]; + make_token(from, 0, token); + if(st && st->numpeers > 0) { + debugf("Sending found%s peers.\n", + from->sa_family == AF_INET6 ? " IPv6" : ""); + send_closest_nodes(from, fromlen, + m.tid, m.tid_len, + m.info_hash, m.want, + from->sa_family, st, + token, TOKEN_SIZE); + } else { + debugf("Sending nodes for get_peers.\n"); + send_closest_nodes(from, fromlen, + m.tid, m.tid_len, m.info_hash, m.want, + 0, NULL, token, TOKEN_SIZE); + } + } + break; + case ANNOUNCE_PEER: + debugf("Announce peer!\n"); + new_node(m.id, from, fromlen, 1); + if(id_cmp(m.info_hash, zeroes) == 0) { + debugf("Announce_peer with no info_hash.\n"); + send_error(from, fromlen, m.tid, m.tid_len, + 203, "Announce_peer with no info_hash"); + break; + } + if(!token_match(m.token, m.token_len, from)) { + debugf("Incorrect token for announce_peer.\n"); + send_error(from, fromlen, m.tid, m.tid_len, + 203, "Announce_peer with wrong token"); + break; + } + if(m.implied_port != 0) { + /* Do this even if port > 0. That's what the spec says. */ + switch(from->sa_family) { + case AF_INET: + m.port = htons(((struct sockaddr_in*)from)->sin_port); + break; + case AF_INET6: + m.port = htons(((struct sockaddr_in6*)from)->sin6_port); + break; + } + } + if(m.port == 0) { + debugf("Announce_peer with forbidden port %d.\n", m.port); + send_error(from, fromlen, m.tid, m.tid_len, + 203, "Announce_peer with forbidden port number"); + break; + } + storage_store(m.info_hash, from, m.port); + /* Note that if storage_store failed, we lie to the requestor. + This is to prevent them from backtracking, and hence + polluting the DHT. */ + debugf("Sending peer announced.\n"); + send_peer_announced(from, fromlen, m.tid, m.tid_len); + } + } + + dontread: + if(now.tv_sec >= rotate_secrets_time) + rotate_secrets(); + + if(now.tv_sec >= expire_stuff_time) { + expire_buckets(buckets); + expire_buckets(buckets6); + expire_storage(); + expire_searches(callback, closure); + } + + if(search_time > 0 && now.tv_sec >= search_time) { + struct search *sr; + sr = searches; + while(sr) { + if(!sr->done && + sr->step_time + DHT_SEARCH_RETRANSMIT / 2 + 1 <= now.tv_sec) { + search_step(sr, callback, closure); + } + sr = sr->next; + } + + search_time = 0; + + sr = searches; + while(sr) { + if(!sr->done) { + time_t tm = sr->step_time + + DHT_SEARCH_RETRANSMIT + random() % DHT_SEARCH_RETRANSMIT; + if(search_time == 0 || search_time > tm) + search_time = tm; + } + sr = sr->next; + } + } + + if(now.tv_sec >= confirm_nodes_time) { + int soon = 0; + + soon |= bucket_maintenance(AF_INET); + soon |= bucket_maintenance(AF_INET6); + + if(!soon) { + if(mybucket_grow_time >= now.tv_sec - 150) + soon |= neighbourhood_maintenance(AF_INET); + if(mybucket6_grow_time >= now.tv_sec - 150) + soon |= neighbourhood_maintenance(AF_INET6); + } + + /* Given the timeouts in bucket_maintenance, with a 22-bucket + table, worst case is a ping every 18 seconds (22 buckets plus + 11 buckets overhead for the larger buckets). Keep the "soon" + case within 15 seconds, which gives some margin for neighbourhood + maintenance. */ + + if(soon) + confirm_nodes_time = now.tv_sec + 5 + random() % 10; + else + confirm_nodes_time = now.tv_sec + 60 + random() % 120; + } + + if(confirm_nodes_time > now.tv_sec) + *tosleep = confirm_nodes_time - now.tv_sec; + else + *tosleep = 0; + + if(search_time > 0) { + if(search_time <= now.tv_sec) + *tosleep = 0; + else if(*tosleep > search_time - now.tv_sec) + *tosleep = search_time - now.tv_sec; + } + + return 1; +} + +int +dht_get_nodes(struct sockaddr_in *sin, int *num, + struct sockaddr_in6 *sin6, int *num6) +{ + int i, j; + struct bucket *b; + struct node *n; + + i = 0; + + /* For restoring to work without discarding too many nodes, the list + must start with the contents of our bucket. */ + b = find_bucket(myid, AF_INET); + if(b == NULL) + goto no_ipv4; + + n = b->nodes; + while(n && i < *num) { + if(node_good(n)) { + sin[i] = *(struct sockaddr_in*)&n->ss; + i++; + } + n = n->next; + } + + b = buckets; + while(b && i < *num) { + if(!in_bucket(myid, b)) { + n = b->nodes; + while(n && i < *num) { + if(node_good(n)) { + sin[i] = *(struct sockaddr_in*)&n->ss; + i++; + } + n = n->next; + } + } + b = b->next; + } + + no_ipv4: + + j = 0; + + b = find_bucket(myid, AF_INET6); + if(b == NULL) + goto no_ipv6; + + n = b->nodes; + while(n && j < *num6) { + if(node_good(n)) { + sin6[j] = *(struct sockaddr_in6*)&n->ss; + j++; + } + n = n->next; + } + + b = buckets6; + while(b && j < *num6) { + if(!in_bucket(myid, b)) { + n = b->nodes; + while(n && j < *num6) { + if(node_good(n)) { + sin6[j] = *(struct sockaddr_in6*)&n->ss; + j++; + } + n = n->next; + } + } + b = b->next; + } + + no_ipv6: + + *num = i; + *num6 = j; + return i + j; +} + +int +dht_insert_node(const unsigned char *id, struct sockaddr *sa, int salen) +{ + struct node *n; + + if(sa->sa_family != AF_INET && sa->sa_family != AF_INET6) { + errno = EAFNOSUPPORT; + return -1; + } + + n = new_node(id, sa, salen, 0); + return !!n; +} + +int +dht_ping_node(const struct sockaddr *sa, int salen) +{ + unsigned char tid[4]; + + debugf("Sending ping.\n"); + make_tid(tid, "pn", 0); + return send_ping(sa, salen, tid, 4); +} + +/* We could use a proper bencoding printer and parser, but the format of + DHT messages is fairly stylised, so this seemed simpler. */ + +#define CHECK(offset, delta, size) \ + if(delta < 0 || offset + delta > size) goto fail + +#define INC(offset, delta, size) \ + CHECK(offset, delta, size); \ + offset += delta + +#define COPY(buf, offset, src, delta, size) \ + CHECK(offset, delta, size); \ + memcpy(buf + offset, src, delta); \ + offset += delta; + +#define ADD_V(buf, offset, size) \ + if(have_v) { \ + COPY(buf, offset, my_v, sizeof(my_v), size); \ + } + +static int +dht_send(const void *buf, size_t len, int flags, + const struct sockaddr *sa, int salen) +{ + int s; + + if(salen == 0) + abort(); + + if(node_blacklisted(sa, salen)) { + debugf("Attempting to send to blacklisted node.\n"); + errno = EPERM; + return -1; + } + + if(sa->sa_family == AF_INET) + s = dht_socket; + else if(sa->sa_family == AF_INET6) + s = dht_socket6; + else + s = -1; + + if(s < 0) { + errno = EAFNOSUPPORT; + return -1; + } + + return dht_sendto(s, buf, len, flags, sa, salen); +} + +int +send_ping(const struct sockaddr *sa, int salen, + const unsigned char *tid, int tid_len) +{ + char buf[512]; + int i = 0, rc; + rc = snprintf(buf + i, 512 - i, "d1:ad2:id20:"); INC(i, rc, 512); + COPY(buf, i, myid, 20, 512); + rc = snprintf(buf + i, 512 - i, "e1:q4:ping1:t%d:", tid_len); + INC(i, rc, 512); + COPY(buf, i, tid, tid_len, 512); + ADD_V(buf, i, 512); + rc = snprintf(buf + i, 512 - i, "1:y1:qe"); INC(i, rc, 512); + return dht_send(buf, i, 0, sa, salen); + + fail: + errno = ENOSPC; + return -1; +} + +int +send_pong(const struct sockaddr *sa, int salen, + const unsigned char *tid, int tid_len) +{ + char buf[512]; + int i = 0, rc; + rc = snprintf(buf + i, 512 - i, "d1:rd2:id20:"); INC(i, rc, 512); + COPY(buf, i, myid, 20, 512); + rc = snprintf(buf + i, 512 - i, "e1:t%d:", tid_len); INC(i, rc, 512); + COPY(buf, i, tid, tid_len, 512); + ADD_V(buf, i, 512); + rc = snprintf(buf + i, 512 - i, "1:y1:re"); INC(i, rc, 512); + return dht_send(buf, i, 0, sa, salen); + + fail: + errno = ENOSPC; + return -1; +} + +int +send_find_node(const struct sockaddr *sa, int salen, + const unsigned char *tid, int tid_len, + const unsigned char *target, int want, int confirm) +{ + char buf[512]; + int i = 0, rc; + rc = snprintf(buf + i, 512 - i, "d1:ad2:id20:"); INC(i, rc, 512); + COPY(buf, i, myid, 20, 512); + rc = snprintf(buf + i, 512 - i, "6:target20:"); INC(i, rc, 512); + COPY(buf, i, target, 20, 512); + if(want > 0) { + rc = snprintf(buf + i, 512 - i, "4:wantl%s%se", + (want & WANT4) ? "2:n4" : "", + (want & WANT6) ? "2:n6" : ""); + INC(i, rc, 512); + } + rc = snprintf(buf + i, 512 - i, "e1:q9:find_node1:t%d:", tid_len); + INC(i, rc, 512); + COPY(buf, i, tid, tid_len, 512); + ADD_V(buf, i, 512); + rc = snprintf(buf + i, 512 - i, "1:y1:qe"); INC(i, rc, 512); + return dht_send(buf, i, confirm ? MSG_CONFIRM : 0, sa, salen); + + fail: + errno = ENOSPC; + return -1; +} + +int +send_nodes_peers(const struct sockaddr *sa, int salen, + const unsigned char *tid, int tid_len, + const unsigned char *nodes, int nodes_len, + const unsigned char *nodes6, int nodes6_len, + int af, struct storage *st, + const unsigned char *token, int token_len) +{ + char buf[2048]; + int i = 0, rc, j0, j, k, len; + + rc = snprintf(buf + i, 2048 - i, "d1:rd2:id20:"); INC(i, rc, 2048); + COPY(buf, i, myid, 20, 2048); + if(nodes_len > 0) { + rc = snprintf(buf + i, 2048 - i, "5:nodes%d:", nodes_len); + INC(i, rc, 2048); + COPY(buf, i, nodes, nodes_len, 2048); + } + if(nodes6_len > 0) { + rc = snprintf(buf + i, 2048 - i, "6:nodes6%d:", nodes6_len); + INC(i, rc, 2048); + COPY(buf, i, nodes6, nodes6_len, 2048); + } + if(token_len > 0) { + rc = snprintf(buf + i, 2048 - i, "5:token%d:", token_len); + INC(i, rc, 2048); + COPY(buf, i, token, token_len, 2048); + } + + if(st && st->numpeers > 0) { + /* We treat the storage as a circular list, and serve a randomly + chosen slice. In order to make sure we fit within 1024 octets, + we limit ourselves to 50 peers. */ + + len = af == AF_INET ? 4 : 16; + j0 = random() % st->numpeers; + j = j0; + k = 0; + + rc = snprintf(buf + i, 2048 - i, "6:valuesl"); INC(i, rc, 2048); + do { + if(st->peers[j].len == len) { + unsigned short swapped; + swapped = htons(st->peers[j].port); + rc = snprintf(buf + i, 2048 - i, "%d:", len + 2); + INC(i, rc, 2048); + COPY(buf, i, st->peers[j].ip, len, 2048); + COPY(buf, i, &swapped, 2, 2048); + k++; + } + j = (j + 1) % st->numpeers; + } while(j != j0 && k < 50); + rc = snprintf(buf + i, 2048 - i, "e"); INC(i, rc, 2048); + } + + rc = snprintf(buf + i, 2048 - i, "e1:t%d:", tid_len); INC(i, rc, 2048); + COPY(buf, i, tid, tid_len, 2048); + ADD_V(buf, i, 2048); + rc = snprintf(buf + i, 2048 - i, "1:y1:re"); INC(i, rc, 2048); + + return dht_send(buf, i, 0, sa, salen); + + fail: + errno = ENOSPC; + return -1; +} + +static int +insert_closest_node(unsigned char *nodes, int numnodes, + const unsigned char *id, struct node *n) +{ + int i, size; + + if(n->ss.ss_family == AF_INET) + size = 26; + else if(n->ss.ss_family == AF_INET6) + size = 38; + else + abort(); + + for(i = 0; i< numnodes; i++) { + if(id_cmp(n->id, nodes + size * i) == 0) + return numnodes; + if(xorcmp(n->id, nodes + size * i, id) < 0) + break; + } + + if(i == 8) + return numnodes; + + if(numnodes < 8) + numnodes++; + + if(i < numnodes - 1) + memmove(nodes + size * (i + 1), nodes + size * i, + size * (numnodes - i - 1)); + + if(n->ss.ss_family == AF_INET) { + struct sockaddr_in *sin = (struct sockaddr_in*)&n->ss; + memcpy(nodes + size * i, n->id, 20); + memcpy(nodes + size * i + 20, &sin->sin_addr, 4); + memcpy(nodes + size * i + 24, &sin->sin_port, 2); + } else if(n->ss.ss_family == AF_INET6) { + struct sockaddr_in6 *sin6 = (struct sockaddr_in6*)&n->ss; + memcpy(nodes + size * i, n->id, 20); + memcpy(nodes + size * i + 20, &sin6->sin6_addr, 16); + memcpy(nodes + size * i + 36, &sin6->sin6_port, 2); + } else { + abort(); + } + + return numnodes; +} + +static int +buffer_closest_nodes(unsigned char *nodes, int numnodes, + const unsigned char *id, struct bucket *b) +{ + struct node *n = b->nodes; + while(n) { + if(node_good(n)) + numnodes = insert_closest_node(nodes, numnodes, id, n); + n = n->next; + } + return numnodes; +} + +int +send_closest_nodes(const struct sockaddr *sa, int salen, + const unsigned char *tid, int tid_len, + const unsigned char *id, int want, + int af, struct storage *st, + const unsigned char *token, int token_len) +{ + unsigned char nodes[8 * 26]; + unsigned char nodes6[8 * 38]; + int numnodes = 0, numnodes6 = 0; + struct bucket *b; + + if(want <= 0) + want = sa->sa_family == AF_INET ? WANT4 : WANT6; + + if((want & WANT4)) { + b = find_bucket(id, AF_INET); + if(b) { + numnodes = buffer_closest_nodes(nodes, numnodes, id, b); + if(b->next) + numnodes = buffer_closest_nodes(nodes, numnodes, id, b->next); + b = previous_bucket(b); + if(b) + numnodes = buffer_closest_nodes(nodes, numnodes, id, b); + } + } + + if((want & WANT6)) { + b = find_bucket(id, AF_INET6); + if(b) { + numnodes6 = buffer_closest_nodes(nodes6, numnodes6, id, b); + if(b->next) + numnodes6 = + buffer_closest_nodes(nodes6, numnodes6, id, b->next); + b = previous_bucket(b); + if(b) + numnodes6 = buffer_closest_nodes(nodes6, numnodes6, id, b); + } + } + debugf(" (%d+%d nodes.)\n", numnodes, numnodes6); + + return send_nodes_peers(sa, salen, tid, tid_len, + nodes, numnodes * 26, + nodes6, numnodes6 * 38, + af, st, token, token_len); +} + +int +send_get_peers(const struct sockaddr *sa, int salen, + unsigned char *tid, int tid_len, unsigned char *infohash, + int want, int confirm) +{ + char buf[512]; + int i = 0, rc; + + rc = snprintf(buf + i, 512 - i, "d1:ad2:id20:"); INC(i, rc, 512); + COPY(buf, i, myid, 20, 512); + rc = snprintf(buf + i, 512 - i, "9:info_hash20:"); INC(i, rc, 512); + COPY(buf, i, infohash, 20, 512); + if(want > 0) { + rc = snprintf(buf + i, 512 - i, "4:wantl%s%se", + (want & WANT4) ? "2:n4" : "", + (want & WANT6) ? "2:n6" : ""); + INC(i, rc, 512); + } + rc = snprintf(buf + i, 512 - i, "e1:q9:get_peers1:t%d:", tid_len); + INC(i, rc, 512); + COPY(buf, i, tid, tid_len, 512); + ADD_V(buf, i, 512); + rc = snprintf(buf + i, 512 - i, "1:y1:qe"); INC(i, rc, 512); + return dht_send(buf, i, confirm ? MSG_CONFIRM : 0, sa, salen); + + fail: + errno = ENOSPC; + return -1; +} + +int +send_announce_peer(const struct sockaddr *sa, int salen, + unsigned char *tid, int tid_len, + unsigned char *infohash, unsigned short port, + unsigned char *token, int token_len, int confirm) +{ + char buf[512]; + int i = 0, rc; + + rc = snprintf(buf + i, 512 - i, "d1:ad2:id20:"); INC(i, rc, 512); + COPY(buf, i, myid, 20, 512); + rc = snprintf(buf + i, 512 - i, "12:implied_porti1e9:info_hash20:"); INC(i, rc, 512); + COPY(buf, i, infohash, 20, 512); + rc = snprintf(buf + i, 512 - i, "4:porti%ue5:token%d:", (unsigned)port, + token_len); + INC(i, rc, 512); + COPY(buf, i, token, token_len, 512); + rc = snprintf(buf + i, 512 - i, "e1:q13:announce_peer1:t%d:", tid_len); + INC(i, rc, 512); + COPY(buf, i, tid, tid_len, 512); + ADD_V(buf, i, 512); + rc = snprintf(buf + i, 512 - i, "1:y1:qe"); INC(i, rc, 512); + + return dht_send(buf, i, confirm ? 0 : MSG_CONFIRM, sa, salen); + + fail: + errno = ENOSPC; + return -1; +} + +static int +send_peer_announced(const struct sockaddr *sa, int salen, + unsigned char *tid, int tid_len) +{ + char buf[512]; + int i = 0, rc; + + rc = snprintf(buf + i, 512 - i, "d1:rd2:id20:"); INC(i, rc, 512); + COPY(buf, i, myid, 20, 512); + rc = snprintf(buf + i, 512 - i, "e1:t%d:", tid_len); + INC(i, rc, 512); + COPY(buf, i, tid, tid_len, 512); + ADD_V(buf, i, 512); + rc = snprintf(buf + i, 512 - i, "1:y1:re"); INC(i, rc, 512); + return dht_send(buf, i, 0, sa, salen); + + fail: + errno = ENOSPC; + return -1; +} + +static int +send_error(const struct sockaddr *sa, int salen, + unsigned char *tid, int tid_len, + int code, const char *message) +{ + char buf[512]; + int i = 0, rc, message_len; + + message_len = strlen(message); + rc = snprintf(buf + i, 512 - i, "d1:eli%de%d:", code, message_len); + INC(i, rc, 512); + COPY(buf, i, message, message_len, 512); + rc = snprintf(buf + i, 512 - i, "e1:t%d:", tid_len); INC(i, rc, 512); + COPY(buf, i, tid, tid_len, 512); + ADD_V(buf, i, 512); + rc = snprintf(buf + i, 512 - i, "1:y1:ee"); INC(i, rc, 512); + return dht_send(buf, i, 0, sa, salen); + + fail: + errno = ENOSPC; + return -1; +} + +#undef CHECK +#undef INC +#undef COPY +#undef ADD_V + +#ifdef HAVE_MEMMEM + +static void * +dht_memmem(const void *haystack, size_t haystacklen, + const void *needle, size_t needlelen) +{ + return memmem(haystack, haystacklen, needle, needlelen); +} + +#else + +static void * +dht_memmem(const void *haystack, size_t haystacklen, + const void *needle, size_t needlelen) +{ + const char *h = haystack; + const char *n = needle; + size_t i; + + /* size_t is unsigned */ + if(needlelen > haystacklen) + return NULL; + + for(i = 0; i <= haystacklen - needlelen; i++) { + if(memcmp(h + i, n, needlelen) == 0) + return (void*)(h + i); + } + return NULL; +} + +#endif + +static int +parse_message(const unsigned char *buf, int buflen, + struct parsed_message *m) +{ + const unsigned char *p; + + /* This code will happily crash if the buffer is not NUL-terminated. */ + if(buf[buflen] != '\0') { + debugf("Eek! parse_message with unterminated buffer.\n"); + return -1; + } + + +#define CHECK(ptr, len) \ + if(((unsigned char*)ptr) + (len) > (buf) + (buflen)) goto overflow; + + p = dht_memmem(buf, buflen, "1:t", 3); + if(p) { + long l; + char *q; + l = strtol((char*)p + 3, &q, 10); + if(q && *q == ':' && l > 0 && l < PARSE_TID_LEN) { + CHECK(q + 1, l); + memcpy(m->tid, q + 1, l); + m->tid_len = l; + } + } + + p = dht_memmem(buf, buflen, "2:id20:", 7); + if(p) { + CHECK(p + 7, 20); + memcpy(m->id, p + 7, 20); + } + + p = dht_memmem(buf, buflen, "9:info_hash20:", 14); + if(p) { + CHECK(p + 14, 20); + memcpy(m->info_hash, p + 14, 20); + } + + p = dht_memmem(buf, buflen, "4:porti", 7); + if(p) { + long l; + char *q; + l = strtol((char*)p + 7, &q, 10); + if(q && *q == 'e' && l > 0 && l < 0x10000) + m->port = l; + } + + p = dht_memmem(buf, buflen, "12:implied_porti", 16); + if(p) { + long l; + char *q; + l = strtol((char*)p + 16, &q, 10); + if(q && *q == 'e' && l > 0 && l < 0x10000) + m->implied_port = l; + } + + p = dht_memmem(buf, buflen, "6:target20:", 11); + if(p) { + CHECK(p + 11, 20); + memcpy(m->target, p + 11, 20); + } + + p = dht_memmem(buf, buflen, "5:token", 7); + if(p) { + long l; + char *q; + l = strtol((char*)p + 7, &q, 10); + if(q && *q == ':' && l > 0 && l < PARSE_TOKEN_LEN) { + CHECK(q + 1, l); + memcpy(m->token, q + 1, l); + m->token_len = l; + } + } + + p = dht_memmem(buf, buflen, "5:nodes", 7); + if(p) { + long l; + char *q; + l = strtol((char*)p + 7, &q, 10); + if(q && *q == ':' && l > 0 && l <= PARSE_NODES_LEN) { + CHECK(q + 1, l); + memcpy(m->nodes, q + 1, l); + m->nodes_len = l; + } + } + + p = dht_memmem(buf, buflen, "6:nodes6", 8); + if(p) { + long l; + char *q; + l = strtol((char*)p + 8, &q, 10); + if(q && *q == ':' && l > 0 && l <= PARSE_NODES6_LEN) { + CHECK(q + 1, l); + memcpy(m->nodes6, q + 1, l); + m->nodes6_len = l; + } + } + + p = dht_memmem(buf, buflen, "6:valuesl", 9); + if(p) { + int i = p - buf + 9; + int j = 0, j6 = 0; + while(1) { + long l; + char *q; + l = strtol((char*)buf + i, &q, 10); + if(q && *q == ':' && l > 0) { + CHECK(q + 1, l); + i = q + 1 + l - (char*)buf; + if(l == 6) { + if(j + l > PARSE_VALUES_LEN) + continue; + memcpy((char*)m->values + j, q + 1, l); + j += l; + } else if(l == 18) { + if(j6 + l > PARSE_VALUES6_LEN) + continue; + memcpy((char*)m->values6 + j6, q + 1, l); + j6 += l; + } else { + debugf("Received weird value -- %d bytes.\n", (int)l); + } + } else { + break; + } + } + if(i >= buflen || buf[i] != 'e') + debugf("eek... unexpected end for values.\n"); + m->values_len = j; + m->values6_len = j6; + } + + p = dht_memmem(buf, buflen, "4:wantl", 7); + if(p) { + int i = p - buf + 7; + m->want = 0; + while(buf[i] > '0' && buf[i] <= '9' && buf[i + 1] == ':' && + i + 2 + buf[i] - '0' < buflen) { + CHECK(buf + i + 2, buf[i] - '0'); + if(buf[i] == '2' && memcmp(buf + i + 2, "n4", 2) == 0) + m->want |= WANT4; + else if(buf[i] == '2' && memcmp(buf + i + 2, "n6", 2) == 0) + m->want |= WANT6; + else + debugf("eek... unexpected want flag (%c)\n", buf[i]); + i += 2 + buf[i] - '0'; + } + if(i >= buflen || buf[i] != 'e') + debugf("eek... unexpected end for want.\n"); + } + +#undef CHECK + + if(dht_memmem(buf, buflen, "1:y1:r", 6)) + return REPLY; + if(dht_memmem(buf, buflen, "1:y1:e", 6)) + return ERROR; + if(!dht_memmem(buf, buflen, "1:y1:q", 6)) + return -1; + if(dht_memmem(buf, buflen, "1:q4:ping", 9)) + return PING; + if(dht_memmem(buf, buflen, "1:q9:find_node", 14)) + return FIND_NODE; + if(dht_memmem(buf, buflen, "1:q9:get_peers", 14)) + return GET_PEERS; + if(dht_memmem(buf, buflen, "1:q13:announce_peer", 19)) + return ANNOUNCE_PEER; + return -1; + + overflow: + debugf("Truncated message.\n"); + return -1; +} diff --git a/dht.h b/dht.h new file mode 100644 index 0000000..31d7c46 --- /dev/null +++ b/dht.h @@ -0,0 +1,68 @@ +/* +Copyright (c) 2009-2011 by Juliusz Chroboczek + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +#ifdef __cplusplus +extern "C" { +#endif + +typedef void +dht_callback_t(void *closure, int event, + const unsigned char *info_hash, + const void *data, size_t data_len); + +#define DHT_EVENT_NONE 0 +#define DHT_EVENT_VALUES 1 +#define DHT_EVENT_VALUES6 2 +#define DHT_EVENT_SEARCH_DONE 3 +#define DHT_EVENT_SEARCH_DONE6 4 + +extern FILE *dht_debug; + +int dht_init(int s, int s6, const unsigned char *id, const unsigned char *v); +int dht_insert_node(const unsigned char *id, struct sockaddr *sa, int salen); +int dht_ping_node(const struct sockaddr *sa, int salen); +int dht_periodic(const void *buf, size_t buflen, + const struct sockaddr *from, int fromlen, time_t *tosleep, + dht_callback_t *callback, void *closure); +int dht_search(const unsigned char *id, int port, int af, + dht_callback_t *callback, void *closure); +int dht_nodes(int af, + int *good_return, int *dubious_return, int *cached_return, + int *incoming_return); +void dht_dump_tables(FILE *f); +int dht_get_nodes(struct sockaddr_in *sin, int *num, + struct sockaddr_in6 *sin6, int *num6); +int dht_uninit(void); + +/* This must be provided by the user. */ +int dht_sendto(int sockfd, const void *buf, int len, int flags, + const struct sockaddr *to, int tolen); +int dht_blacklisted(const struct sockaddr *sa, int salen); +void dht_hash(void *hash_return, int hash_size, + const void *v1, int len1, + const void *v2, int len2, + const void *v3, int len3); +int dht_random_bytes(void *buf, size_t size); + +#ifdef __cplusplus +} +#endif diff --git a/udht-ubus.c b/udht-ubus.c new file mode 100644 index 0000000..2f0eab7 --- /dev/null +++ b/udht-ubus.c @@ -0,0 +1,143 @@ +#include +#include "udht.h" +#include "curve25519.h" + +static struct ubus_auto_conn conn; +static struct blob_buf b; + +static void +udht_ubus_network_add(struct blob_attr *data, int seq) +{ + static const struct blobmsg_policy net_policy = + { "config", BLOBMSG_TYPE_TABLE }; + enum { + CONFIG_ATTR_AUTH_KEY, + CONFIG_ATTR_DHT, + __CONFIG_ATTR_MAX + }; + static const struct blobmsg_policy policy[__CONFIG_ATTR_MAX] = { + [CONFIG_ATTR_AUTH_KEY] = { "auth_key", BLOBMSG_TYPE_STRING }, + [CONFIG_ATTR_DHT] = { "dht", BLOBMSG_TYPE_BOOL }, + }; + struct blob_attr *tb[__CONFIG_ATTR_MAX]; + struct blob_attr *config, *cur; + uint8_t auth_key[CURVE25519_KEY_SIZE]; + + blobmsg_parse(&net_policy, 1, &config, blobmsg_data(data), blobmsg_len(data)); + + if (!config) + return; + + blobmsg_parse(policy, __CONFIG_ATTR_MAX, tb, blobmsg_data(config), blobmsg_len(config)); + if ((cur = tb[CONFIG_ATTR_DHT]) == NULL || !blobmsg_get_u8(cur)) + return; + + if ((cur = tb[CONFIG_ATTR_AUTH_KEY]) == NULL || + (b64_decode(blobmsg_get_string(cur), auth_key, CURVE25519_KEY_SIZE) != + CURVE25519_KEY_SIZE)) + return; + + udht_network_add(auth_key, seq); +} + + +static void +udht_ubus_network_cb(struct ubus_request *req, int type, + struct blob_attr *msg) +{ + static const struct blobmsg_policy policy = + { "networks", BLOBMSG_TYPE_TABLE }; + struct blob_attr *networks, *cur; + int *seq = req->priv; + int rem; + + blobmsg_parse(&policy, 1, &networks, blobmsg_data(msg), blobmsg_len(msg)); + + if (!networks) + return; + + blobmsg_for_each_attr(cur, networks, rem) + udht_ubus_network_add(cur, *seq); +} + +static void +udht_ubus_update_networks(struct ubus_context *ctx) +{ + static int seq; + uint32_t id; + + seq++; + + if (ubus_lookup_id(ctx, "unetd", &id) == 0) + ubus_invoke(ctx, id, "network_get", b.head, udht_ubus_network_cb, &seq, 5000); + + udht_network_flush(seq); +} + +static int +udht_ubus_unetd_cb(struct ubus_context *ctx, struct ubus_object *obj, + struct ubus_request_data *req, const char *method, + struct blob_attr *msg) +{ + udht_ubus_update_networks(ctx); + + return 0; +} + +static void +udht_subscribe_unetd(struct ubus_context *ctx) +{ + static struct ubus_subscriber sub = { + .cb = udht_ubus_unetd_cb + }; + uint32_t id; + + if (!sub.obj.id && ubus_register_subscriber(ctx, &sub)) + return; + + if (ubus_lookup_id(ctx, "unetd", &id)) + return; + + ubus_subscribe(ctx, &sub, id); + + /* ensure that unetd's socket is ready by testing if it's reachable over ubus */ + if (ubus_invoke(ctx, id, "network_get", b.head, NULL, NULL, 10000)) + return; + + udht_reconnect(); + udht_ubus_update_networks(ctx); +} + +static void +udht_ubus_event_cb(struct ubus_context *ctx, struct ubus_event_handler *ev, + const char *type, struct blob_attr *msg) +{ + static const struct blobmsg_policy policy = + { "path", BLOBMSG_TYPE_STRING }; + struct blob_attr *attr; + + blobmsg_parse(&policy, 1, &attr, blobmsg_data(msg), blobmsg_len(msg)); + if (!attr) + return; + + if (!strcmp(blobmsg_get_string(attr), "unetd")) + udht_subscribe_unetd(ctx); +} + +static void +ubus_connect_handler(struct ubus_context *ctx) +{ + static struct ubus_event_handler ev = { + .cb = udht_ubus_event_cb, + }; + + ubus_register_event_handler(ctx, &ev, "ubus.object.add"); + udht_subscribe_unetd(ctx); +} + +void udht_ubus_init(void) +{ + blob_buf_init(&b, 0); + conn.cb = ubus_connect_handler; + ubus_auto_connect(&conn); +} diff --git a/udht.c b/udht.c new file mode 100644 index 0000000..b29d9aa --- /dev/null +++ b/udht.c @@ -0,0 +1,689 @@ +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "curve25519.h" +#include "siphash.h" +#include "sha512.h" +#include "dht.h" +#include "udht.h" +#include "pex-msg.h" + +static struct uloop_timeout periodic_timer, peer_timer, status_timer, disconnect_timer; +static struct uloop_fd dht_fd; +static int dht_unix_fd; +static LIST_HEAD(bootstrap_peers); +static LIST_HEAD(networks); +static struct blob_buf b; +static uint8_t local_id[20]; +static const char *node_file; +static const char *unix_path; +static bool udht_connected; + +static struct { + unsigned int tick; + unsigned int peer_count; + bool bootstrap_added; + bool dht_ready; +} state; + +struct network_entry { + struct list_head list; + uint8_t auth_key[CURVE25519_KEY_SIZE]; + uint8_t id[20]; + struct uloop_timeout search_timer; + int search_count; + int seq; +}; + +struct peer_entry { + struct list_head list; + + struct sockaddr_storage sa; + int sa_len; +}; + +void dht_hash(void *hash_return, int hash_size, + const void *v1, int len1, + const void *v2, int len2, + const void *v3, int len3) +{ + siphash_key_t key = {}; + + if (hash_size != 8) + abort(); + + key.key[0] = siphash(v1, len1, &key); + key.key[1] = siphash(v2, len2, &key); + siphash_to_le64(hash_return, v3, len3, &key); +} + +int dht_sendto(int sockfd, const void *buf, int len, int flags, + const struct sockaddr *to, int tolen) +{ + struct iovec iov[2] = { + { .iov_base = (void *)to }, + { .iov_base = (void *)buf, .iov_len = len }, + }; + struct msghdr msg = { + .msg_iov = iov, + .msg_iovlen = ARRAY_SIZE(iov), + }; + int ret; + + if (to->sa_family == AF_INET) + iov[0].iov_len = sizeof(struct sockaddr_in); + else if (to->sa_family == AF_INET6) + iov[0].iov_len = sizeof(struct sockaddr_in6); + else + return -1; + + ret = sendmsg(sockfd, &msg, flags); + if (ret < 0) { + perror("send"); + if (errno == ECONNRESET || errno == EDESTADDRREQ || + errno == ENOTCONN || errno == ECONNREFUSED) + uloop_timeout_set(&disconnect_timer, 1); + } + return ret; +} + +int dht_blacklisted(const struct sockaddr *sa, int salen) +{ + return 0; +} + +int dht_random_bytes(void *buf, size_t size) +{ + int fd, rc, save; + + fd = open("/dev/urandom", O_RDONLY); + if(fd < 0) + return -1; + + rc = read(fd, buf, size); + + save = errno; + close(fd); + errno = save; + + return rc; +} + +static void +udht_start_search(void) +{ + struct network_entry *n; + + if (!state.dht_ready) + return; + + list_for_each_entry(n, &networks, list) { + if (n->search_timer.pending) + continue; + + uloop_timeout_set(&n->search_timer, 1); + } +} + +static void +udht_send_v4_node(const void *id, const void *data) +{ + struct network_entry *n; + + struct { + struct sockaddr sa; + struct pex_msg_local_control local; + } msg = { + .sa = { + .sa_family = AF_LOCAL + }, + .local = { + .ep.in = { + .sin_family = AF_INET, + .sin_addr = *(const struct in_addr *)data, + .sin_port = *(const uint16_t *)(data + 4), + }, + .timeout = 15 * 60, + } + }; + + list_for_each_entry(n, &networks, list) { + if (memcmp(n->id, id, sizeof(n->id)) != 0) + continue; + + memcpy(&msg.local.auth_id, n->auth_key, sizeof(msg.local.auth_id)); + goto found; + } + +found: + send(dht_unix_fd, &msg, sizeof(msg), 0); +} + +static void +udht_cb(void *closure, int event, const unsigned char *info_hash, + const void *data, size_t data_len) +{ + char addrbuf[INET6_ADDRSTRLEN]; + int i; + + if (!udht_connected) + return; + + if (event == DHT_EVENT_SEARCH_DONE) { + printf("Search done.\n"); + udht_start_search(); + } else if (event == DHT_EVENT_SEARCH_DONE6) { + printf("IPv6 search done.\n"); + } else if (event == DHT_EVENT_VALUES) { + printf("Received %d values.\n", (int)(data_len / 6)); + for (i = 0; i < data_len / 6; i++) { + fprintf(stderr, "Node: %s:%d\n", inet_ntop(AF_INET, data, addrbuf, sizeof(addrbuf)), ntohs(*(uint16_t *)(data + 4))); + udht_send_v4_node(info_hash, data); + data += 6; + } + } + else if (event == DHT_EVENT_VALUES6) + printf("Received %d IPv6 values.\n", (int)(data_len / 18)); + else + printf("Unknown DHT event %d.\n", event); +} + +static void +udht_search_timer_cb(struct uloop_timeout *t) +{ + struct network_entry *n = container_of(t, struct network_entry, search_timer); + char id_str[42]; + int i; + + for (i = 0; i < sizeof(n->id); i++) + snprintf(&id_str[i * 2], sizeof(id_str) - i * 2, "%02x", n->id[i]); + + fprintf(stderr, "Start search for network, id=%s\n", id_str); + dht_search(n->id, UNETD_GLOBAL_PEX_PORT, AF_INET, udht_cb, NULL); + + if (++n->search_count > 2) + uloop_timeout_set(&n->search_timer, 30 * 1000); +} + +static void +udht_timer_cb(struct uloop_timeout *t) +{ + time_t tosleep = 1; + + dht_periodic(NULL, 0, NULL, 0, &tosleep, udht_cb, NULL); + if (!tosleep) + tosleep = 1; + uloop_timeout_set(t, tosleep * 1000); +} + +static void +udht_fd_cb(struct uloop_fd *fd, unsigned int events) +{ + static char buf[4096]; + struct sockaddr *sa = (struct sockaddr *)buf; + time_t tosleep = 1; + int len; + + while (1) { + socklen_t fromlen; + + len = recv(fd->fd, buf, sizeof(buf) - 1, 0); + if (len < 0) { + if (errno == EINTR) + continue; + + if (errno == EAGAIN) + break; + + perror("recvfrom"); + uloop_timeout_set(&disconnect_timer, 1); + return; + } + + if (len <= sizeof(struct sockaddr)) + continue; + + if (sa->sa_family == AF_INET) + fromlen = sizeof(struct sockaddr_in); + else if (sa->sa_family == AF_INET6) + fromlen = sizeof(struct sockaddr_in6); + else + continue; + + if (len <= fromlen) + continue; + + buf[len] = 0; + dht_periodic(buf + fromlen, len - fromlen, sa, fromlen, + &tosleep, udht_cb, NULL); + if (!tosleep) + tosleep = 1; + uloop_timeout_set(&periodic_timer, tosleep * 1000); + } +} + +static int +udht_open_socket(const char *unix_path) +{ + uint8_t fd_buf[CMSG_SPACE(sizeof(int))] = { 0 }; + static struct sockaddr sa = { + .sa_family = AF_LOCAL, + }; + static struct iovec iov = { + .iov_base = &sa, + .iov_len = sizeof(sa), + }; + struct msghdr msg = { + .msg_iov = &iov, + .msg_iovlen = 1, + .msg_control = fd_buf, + .msg_controllen = CMSG_LEN(sizeof(int)), + }; + struct cmsghdr *cmsg; + int sfd[2]; + int fd; + + fd = usock(USOCK_UNIX | USOCK_UDP, unix_path, NULL); + if (fd < 0) + return -1; + + if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sfd) < 0) + close(fd); + + dht_unix_fd = fd; + dht_fd.fd = sfd[1]; + dht_fd.cb = udht_fd_cb; + uloop_fd_add(&dht_fd, ULOOP_READ); + + cmsg = CMSG_FIRSTHDR(&msg); + cmsg->cmsg_type = SCM_RIGHTS; + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_len = CMSG_LEN(sizeof(int)); + *(int *)CMSG_DATA(cmsg) = sfd[0]; + + sendmsg(dht_unix_fd, &msg, 0); + close(sfd[0]); + + return 0; +} + +static void +udht_close_socket(void) +{ + uloop_fd_delete(&dht_fd); + close(dht_fd.fd); + close(dht_unix_fd); +} + +static void udht_id_hash(uint8_t *dest, const void *data, int len) +{ + struct sha512_state s; + uint8_t hash[SHA512_HASH_SIZE]; + + sha512_init(&s); + sha512_add(&s, data, len); + sha512_final(&s, hash); + memcpy(dest, hash, 20); +} + +static void udht_add_peer(const void *data, int len) +{ + const struct sockaddr *sa = data; + struct peer_entry *p; + + p = calloc(1, sizeof(*p)); + memcpy(&p->sa, sa, len); + p->sa_len = len; + list_add_tail(&p->list, &bootstrap_peers); + + if (!peer_timer.pending) + uloop_timeout_set(&peer_timer, 1); +} + +static void udht_add_bootstrap_peer(void) +{ + const struct addrinfo hints = { + .ai_family = AF_INET, + .ai_socktype = SOCK_DGRAM, + .ai_flags = AI_ADDRCONFIG, + }; + struct addrinfo *res, *cur; + static const char * const bootstrap_hosts[] = { + "router.bittorrent.com", + "router.utorrent.com", + }; + int i; + + for (i = 0; i < ARRAY_SIZE(bootstrap_hosts); i++) { + if (getaddrinfo(bootstrap_hosts[i], "6881", &hints, &res) < 0) + continue; + + for (cur = res; cur; cur = cur->ai_next) + udht_add_peer(cur->ai_addr, cur->ai_addrlen); + + freeaddrinfo(res); + } + + state.bootstrap_added = true; +} + +static void udht_peer_timer_cb(struct uloop_timeout *t) +{ + struct peer_entry *p; + struct sockaddr_in *sin; + char buf[INET6_ADDRSTRLEN]; + + if (list_empty(&bootstrap_peers)) { + if (!state.peer_count && !state.bootstrap_added) + udht_add_bootstrap_peer(); + + return; + } + + p = list_first_entry(&bootstrap_peers, struct peer_entry, list); + list_del(&p->list); + sin = (struct sockaddr_in *)&p->sa; + fprintf(stderr, "Ping node %s\n", inet_ntop(sin->sin_family, &sin->sin_addr, buf, sizeof(buf))); + dht_ping_node((struct sockaddr *)&p->sa, p->sa_len); + free(p); + + if (state.peer_count++ < 8) + uloop_timeout_set(t, 2000); + else + uloop_timeout_set(t, 15000); +} + +void udht_network_add(const uint8_t *auth_key, int seq) +{ + struct network_entry *n; + + list_for_each_entry(n, &networks, list) { + if (memcmp(n->auth_key, auth_key, sizeof(n->auth_key)) != 0) + continue; + + goto out; + } + + n = calloc(1, sizeof(*n)); + n->search_timer.cb = udht_search_timer_cb; + memcpy(n->auth_key, auth_key, sizeof(n->auth_key)); + udht_id_hash(n->id, n->auth_key, sizeof(n->auth_key)); + list_add_tail(&n->list, &networks); + + if (state.dht_ready) + uloop_timeout_set(&n->search_timer, 1); + +out: + n->seq = seq; +} + +void udht_network_flush(int seq) +{ + struct network_entry *n, *tmp; + + list_for_each_entry_safe(n, tmp, &networks, list) { + if (seq >= 0 && (n->seq < 0 || n->seq == seq)) + continue; + + list_del(&n->list); + uloop_timeout_cancel(&n->search_timer); + free(n); + } +} + + +static void +udht_status_check(struct uloop_timeout *t) +{ + int good = 0, dubious = 0, incoming = 0; + static int prev_good, prev_dubious, prev_incoming; + + state.tick++; + uloop_timeout_set(t, 1000); + + dht_nodes(AF_INET, &good, &dubious, NULL, &incoming); + if (good != prev_good || dubious != prev_dubious || incoming != prev_incoming) + fprintf(stderr, "DHT status: good=%d, dubious=%d, incoming=%d\n", good, dubious, incoming); + + prev_good = good; + prev_dubious = dubious; + prev_incoming = incoming; + + if (state.dht_ready) + return; + + if (good < 4 || good + dubious < 8) { + if (state.tick > 45 && !state.bootstrap_added) + udht_add_bootstrap_peer(); + + return; + } + + state.dht_ready = true; + fprintf(stderr, "DHT is ready\n"); + udht_start_search(); +} + +static void +udht_load_nodes(const char *filename) +{ + struct blob_attr *data, *cur; + size_t len; + FILE *f; + int rem; + + f = fopen(filename, "r"); + if (!f) + return; + + data = malloc(sizeof(struct blob_attr)); + if (fread(data, sizeof(struct blob_attr), 1, f) != 1) + goto out; + + len = blob_pad_len(data); + if (len <= sizeof(struct blob_attr)) + goto out; + + if (len >= 256 * 1024) + goto out; + + data = realloc(data, len); + if (fread(data + 1, len - sizeof(struct blob_attr), 1, f) != 1) + goto out; + + blob_for_each_attr(cur, data, rem) { + void *entry = blob_data(cur); + + if (blob_len(cur) == 6) { + struct sockaddr_in sin = { + .sin_family = AF_INET, + .sin_addr = *(struct in_addr *)entry, + .sin_port = *(uint16_t *)(entry + 4), + }; + udht_add_peer(&sin, sizeof(sin)); + } else { + continue; + } + } + +out: + free(data); +} + +static void +udht_save_nodes(const char *filename) +{ + struct sockaddr_in sin[128]; + struct sockaddr_in6 sin6[128]; + int n_sin = ARRAY_SIZE(sin); + int n_sin6 = ARRAY_SIZE(sin6); + FILE *f; + int i; + + if (!filename) + return; + + if (dht_get_nodes(sin, &n_sin, sin6, &n_sin6) <= 0) + return; + + if (n_sin < 8) + return; + + blob_buf_init(&b, 0); + for (i = 0; i < n_sin; i++) { + struct { + struct in_addr addr; + uint16_t port; + } __attribute__((packed)) data = { + .addr = sin[i].sin_addr, + .port = sin[i].sin_port, + }; + blob_put(&b, 4, &data, sizeof(data)); + } + + f = fopen(filename, "w"); + if (!f) + return; + + fwrite(b.head, blob_pad_len(b.head), 1, f); + + fclose(f); +} + +static int usage(const char *progname) +{ + fprintf(stderr, "Usage: %s [] \n" + "Options:\n" + " -d Enable debug mode\n" + " -n Set node filename\n" + " -N Add network key\n" + "\n", + progname); + return 1; +} + +static void udht_disconnect(struct uloop_timeout *t) +{ + struct peer_entry *p, *tmp; + + if (!udht_connected) + return; + + list_for_each_entry_safe(p, tmp, &bootstrap_peers, list) { + list_del(&p->list); + free(p); + } + + uloop_timeout_cancel(&disconnect_timer); + udht_connected = false; + udht_network_flush(-1); + + uloop_timeout_cancel(&peer_timer); + uloop_timeout_cancel(&status_timer); + uloop_timeout_cancel(&periodic_timer); + + udht_save_nodes(node_file); + dht_uninit(); + + memset(&state, 0, sizeof(state)); + + udht_close_socket(); + +#ifndef UBUS_SUPPORT + uloop_end(); +#endif +} + +int udht_reconnect(void) +{ + udht_disconnect(&disconnect_timer); + + if (udht_open_socket(unix_path) < 0) + return -1; + + if (dht_init(dht_unix_fd, -1, local_id, NULL) < 0) { + udht_close_socket(); + return -1; + } + + udht_connected = true; + fprintf(stderr, "DHT connected\n"); + + udht_load_nodes(node_file); + + uloop_timeout_set(&peer_timer, 1); + uloop_timeout_set(&status_timer, 1000); + + return 0; +} + +int main(int argc, char **argv) +{ + const char *progname = argv[0]; + uint8_t auth_key[CURVE25519_KEY_SIZE]; + int ch; + + while ((ch = getopt(argc, argv, "dN:n:u:")) != -1) { + switch (ch) { + case 'N': + if (b64_decode(optarg, auth_key, CURVE25519_KEY_SIZE) != CURVE25519_KEY_SIZE) { + fprintf(stderr, "Invalid network key\n"); + return 1; + } + + udht_network_add(auth_key, -1); + break; + case 'n': + node_file = optarg; + break; + case 'd': + dht_debug = stderr; + break; + case 'u': + unix_path = optarg; + break; + default: + return usage(progname); + } + } + + argv += optind; + argc -= optind; + + if (argc != 1) + return usage(progname); + + udht_id_hash(local_id, argv[0], strlen(argv[0])); + + status_timer.cb = udht_status_check; + periodic_timer.cb = udht_timer_cb; + peer_timer.cb = udht_peer_timer_cb; + disconnect_timer.cb = udht_disconnect; + uloop_init(); + +#ifdef UBUS_SUPPORT + udht_ubus_init(); +#else + if (udht_reconnect() < 0) { + fprintf(stderr, "Failed to connect to unetd\n"); + return 1; + } +#endif + + uloop_run(); + uloop_done(); + + udht_disconnect(&disconnect_timer); + blob_buf_free(&b); + + return 0; +} diff --git a/udht.h b/udht.h new file mode 100644 index 0000000..c85a1e6 --- /dev/null +++ b/udht.h @@ -0,0 +1,17 @@ +#ifndef __UDHT_H +#define __UDHT_H + +int udht_reconnect(void); +void udht_network_add(const uint8_t *auth_key, int seq); +void udht_network_flush(int seq); + +#ifdef UBUS_SUPPORT +void udht_ubus_init(void); +#else +static inline void udht_ubus_init(void) +{ +} +#endif + + +#endif