typedef void* (*rpc_function)(); typedef struct rpc_call { char *method; rpc_function function; uint64_t function_hash; } rpc_call; #define RPC_SIGNATURE_i_iii UINT64_C(0x78409099752fa48a) #define RPC_SIGNATURE_i_ii UINT64_C(0x258290edf43985a5) #define RPC_SIGNATURE_i_s UINT64_C(0xf7b73162829ed667) #define RPC_SIGNATURE_s_s UINT64_C(0x97deedd17d9afb12) #define RPC_SIGNATURE_s_v UINT64_C(0x09c16a1242049b80) #define RPC_SIGNATURE_v_s UINT64_C(0xc1746990ab73ed24) static rpc_call rpc_new_call(const char *signature, rpc_function function) { if( signature && function ) { array(char*)tokens = strsplit(signature, "(,)"); if( array_count(tokens) >= 1 ) { char *method = strrchr(tokens[0], ' ')+1; char *rettype = va("%.*s", (int)(method - tokens[0] - 1), tokens[0]); int num_args = array_count(tokens) - 1; char* hash_sig = va("%s(%s)", rettype, num_args ? (array_pop_front(tokens), strjoin(tokens, ",")) : "void"); uint64_t hash = hash_str(hash_sig); method = va("%s%d", method, num_args ); #if RPC_DEBUG printf("%p %p %s `%s` %s(", function, (void*)hash, rettype, hash_sig, method); for(int i = 0, end = array_count(tokens); i < end; ++i) printf("%s%s", tokens[i], i == (end-1)? "":", "); puts(");"); #endif return (rpc_call) { STRDUP(method), function, hash }; // LEAK } } return (rpc_call) {0}; } static map(char*, rpc_call) rpc_calls = 0; static void rpc_insert(const char *signature, void *function ) { rpc_call call = rpc_new_call(signature, function); if( call.method ) { if( !rpc_calls ) map_init(rpc_calls, less_str, hash_str); if( map_find(rpc_calls, call.method)) { map_erase(rpc_calls, call.method); } map_insert(rpc_calls, call.method, call); } } static char *rpc_full(unsigned id, const char* method, unsigned num_args, char *args[]) { #if RPC_DEBUG printf("id:%x method:%s args:", id, method ); for( int i = 0; i < num_args; ++i ) printf("%s,", args[i]); puts(""); #endif method = va("%s%d", method, num_args); rpc_call *found = map_find(rpc_calls, (char*)method); if( found ) { switch(found->function_hash) { case RPC_SIGNATURE_i_iii: return va("%d %d", id, (int)(intptr_t)found->function(atoi(args[0]), atoi(args[1]), atoi(args[2])) ); case RPC_SIGNATURE_i_ii: return va("%d %d", id, (int)(intptr_t)found->function(atoi(args[0]), atoi(args[1])) ); case RPC_SIGNATURE_i_s: return va("%d %d", id, (int)(intptr_t)found->function(args[0]) ); case RPC_SIGNATURE_s_s: return va("%d %s", id, (char*)found->function(args[0]) ); case RPC_SIGNATURE_s_v: return va("%d %s", id, (char*)found->function() ); case RPC_SIGNATURE_v_s: return found->function(args[0]), va("%d", id); default: break; } } return va("%d -1", id); } static array(char*) rpc_parse_args( const char *cmdline, bool quote_whitespaces ) { // parse cmdline arguments. must array_free() after use // - supports quotes: "abc" "abc def" "abc \"def\"" "abc \"def\"""ghi" etc. // - #comments removed array(char*) args = 0; // LEAK for( int i = 0; cmdline[i]; ) { char buf[256] = {0}, *ptr = buf; while(cmdline[i] && isspace(cmdline[i])) ++i; bool quoted = cmdline[i] == '\"'; if( quoted ) { while(cmdline[++i]) { char ch = cmdline[i]; /**/ if (ch == '\\' && cmdline[i + 1] == '\"') *ptr++ = '\"', ++i; else if (ch == '\"' && cmdline[i + 1] == '\"') ++i; else if (ch == '\"' && (!cmdline[i + 1] || isspace(cmdline[i + 1]))) { ++i; break; } else *ptr++ = ch; } } else { while(cmdline[i] && !isspace(cmdline[i])) *ptr++ = cmdline[i++]; } if (buf[0] && buf[0] != '#') { // exclude empty args + comments if( quote_whitespaces && quoted ) array_push(args, va("\"%s\"",buf)); else array_push(args, va("%s",buf)); } } return args; } static char* rpc(unsigned id, const char* cmdline) { array(char*) args = rpc_parse_args(cmdline, false); int num_args = array_count(args); char *ret = num_args ? rpc_full(id, args[0], num_args - 1, &args[1]) : rpc_full(id, "", 0, NULL); array_free(args); return ret; } static void enet_quit(void) { do_once { // enet_deinitialize(); } } static void enet_init() { do_once { if( enet_initialize() != 0 ) { PANIC("cannot initialize enet"); } atexit( enet_quit ); } } struct peer_node_t { int64_t id; struct peer_node_t *next; }; static ENetHost *Server; static map(ENetPeer *, int64_t) clients; static map(int64_t, ENetPeer *) peers; static int64_t next_client_id = 1; // assumes ID 0 is server static struct peer_node_t *next_free_id = NULL; enum { MSG_INIT, MSG_BUF, MSG_RPC, MSG_RPC_RESP }; bool server_bind(int max_clients, int port) { map_init(clients, less_64, hash_64); map_init(peers, less_64, hash_64); assert(port == 0 || (port > 1024 && port < 65500)); ENetAddress address = {0}; address.host = ENET_HOST_ANY; address.port = port; Server = enet_host_create(&address, max_clients, 2 /*channels*/, 0 /*in bandwidth*/, 0 /*out bandwidth*/); return Server != NULL; } static void server_drop_client(int64_t handle) { map_erase(clients, *(ENetPeer **)map_find(peers, handle)); map_erase(peers, *(int64_t *)handle); } static void server_drop_client_peer(ENetPeer *peer) { struct peer_node_t *node = C_CAST(struct peer_node_t *, CALLOC(sizeof(struct peer_node_t), 1)); node->id = *(int64_t *)map_find(clients, peer); if (!next_free_id) { next_free_id = node; } else { node->next = next_free_id; next_free_id = node; } map_erase(peers, *(int64_t *)map_find(clients, peer)); map_erase(clients, peer); } void server_broadcast_bin_flags(const void *msg, int len, uint64_t flags) { ENetPacket *packet = enet_packet_create(msg, len, flags&NETWORK_UNRELIABLE ? ENET_PACKET_FLAG_UNRELIABLE_FRAGMENT : ENET_PACKET_FLAG_RELIABLE | flags&(NETWORK_UNRELIABLE|NETWORK_UNORDERED) ? ENET_PACKET_FLAG_UNSEQUENCED : 0); enet_host_broadcast(Server, 0, packet); } void server_broadcast_bin(const void *msg, int len) { ENetPacket *packet = enet_packet_create(msg, len, ENET_PACKET_FLAG_RELIABLE); enet_host_broadcast(Server, 0, packet); //enet_host_flush(Server); // flush if needed } void server_broadcast_flags(const char *msg, uint64_t flags) { server_broadcast_bin_flags(msg, strlen(msg)+1, flags); } void server_broadcast(const char *msg) { server_broadcast_bin(msg, strlen(msg)+1); } void server_terminate() { enet_host_destroy(Server); Server = 0; } volatile int client_join_connected = 0; static int client_join_threaded(void *userdata) { ENetHost *host = (ENetHost *)userdata; ENetPacket *packet = enet_packet_create("", 1, ENET_PACKET_FLAG_RELIABLE); enet_host_broadcast(Server, 0, packet); /* Wait up to 5 seconds for the connection attempt to succeed. */ ENetEvent event; client_join_connected = 0; client_join_connected = enet_host_service(host, &event, 5000) > 0 && event.type == ENET_EVENT_TYPE_CONNECT; return 0; } int64_t client_join(const char *ip, int port) { assert(port > 1024 && port < 65500); ENetAddress address = {0}; enet_address_set_host(&address, !strcmp(ip, "localhost") ? "127.0.0.1" : ip); address.port = port; ENetHost *host = enet_host_create(NULL, 1 /*outgoing connections*/, 2 /*channels*/, 0 /*in bandwidth*/, 0 /*out bandwidth*/); if(!host) return -1; ENetPeer *peer = enet_host_connect(host, &address, 2, 0); if(!peer) return -1; Server = host; ENetEvent event; bool client_join_connected = enet_host_service(host, &event, 5000) > 0 && event.type == ENET_EVENT_TYPE_CONNECT; if(!client_join_connected) { enet_peer_reset(peer); return -1; } // wait for the response bool msg_received = enet_host_service(host, &event, 5000) > 0 && event.type == ENET_EVENT_TYPE_RECEIVE; if (!msg_received) { enet_peer_reset(peer); return -1; } char *ptr = (char *)event.packet->data; int64_t cid = -1; // decapsulate incoming packet. uint32_t mid = *(uint32_t*)(ptr + 0); ptr += 4; switch (mid) { case MSG_INIT: cid = *(int64_t*)ptr; break; default: enet_peer_reset(peer); enet_packet_destroy( event.packet ); return -1; } /* Clean up the packet now that we're done using it. */ enet_packet_destroy( event.packet ); return cid; } void server_drop(int64_t handle) { enet_peer_disconnect_now(*(ENetPeer **)map_find(peers, handle), 0); server_drop_client(handle); } void server_send_bin(int64_t handle, const void *ptr, int len) { ENetPacket *packet = enet_packet_create(ptr, len, ENET_PACKET_FLAG_RELIABLE); enet_peer_send(*(ENetPeer **)map_find(peers, handle), 0, packet); } void server_send(int64_t handle, const char *msg) { server_send_bin(handle, msg, strlen(msg)+1); } // --- typedef struct netbuffer_t { int64_t owner; void *ptr; unsigned sz; uint64_t flags; } netbuffer_t; static array(char*) events; // @todo: make event 128 bytes max? static array(int64_t) values; // @todo: map instead? static map( int64_t, array(netbuffer_t) ) buffers; // map> static double msg_send_cooldown = 0.0; static double network_dt = 0.0; static double last_netsync = 0.0; void network_create(unsigned max_clients, const char *ip, const char *port_, unsigned flags) { if (buffers) map_clear(buffers); do_once { array_resize(values, 128); map_init(buffers, less_64, hash_64); enet_init(); } ip = ip ? ip : "0.0.0.0"; int port = atoi(port_ ? port_ : "1234"); // network_put(NETWORK_IP, 0x7F000001); // 127.0.0.1 network_put(NETWORK_PORT, port); network_put(NETWORK_LIVE, -1); network_put(NETWORK_COUNT, 0); network_put(NETWORK_CAPACITY, max_clients); network_put(NETWORK_BUF_CLEAR_ON_JOIN, 1); if( !(flags&NETWORK_CONNECT) || flags&NETWORK_BIND ) { // server, else client PRINTF("Trying to bind server, else we connect as a client...\n"); network_put(NETWORK_RANK, 0); if( server_bind(max_clients, port) ) { network_put(NETWORK_LIVE, 1); PRINTF("Server bound\n"); } else { network_put(NETWORK_RANK, -1); /* unassigned until we connect successfully */ int64_t socket = client_join(ip, port); if( socket >= 0 ) { PRINTF("Client connected, id %lld\n", socket); network_put(NETWORK_LIVE, 1); network_put(NETWORK_RANK, socket); } else { PRINTF("!Client conn failed\n"); network_put(NETWORK_LIVE, 0); if (!(flags&NETWORK_NOFAIL)) PANIC("cannot neither connect to %s:%d, nor create a server", ip, port); } } } else { // client only PRINTF("Connecting to server...\n"); network_put(NETWORK_RANK, -1); /* unassigned until we connect successfully */ int64_t socket = client_join(ip, port); if( socket > 0 ) { PRINTF("Client connected, id %lld\n", socket); network_put(NETWORK_LIVE, 1); network_put(NETWORK_RANK, socket); } else { PRINTF("!Client conn failed\n"); network_put(NETWORK_LIVE, 0); if (!(flags&NETWORK_NOFAIL)) PANIC("cannot connect to server %s:%d", ip, port); } } PRINTF("Network rank:%lld ip:%s port:%lld\n", network_get(NETWORK_RANK), ip, network_get(NETWORK_PORT)); } int64_t network_put(uint64_t key, int64_t value) { int64_t *found = key < array_count(values) ? &values[key] : NULL; if(found) *found = value; return value; } int64_t network_get(uint64_t key) { int64_t *found = key < array_count(values) ? &values[key] : NULL; return found ? *found : 0; } void* network_buffer(void *ptr, unsigned sz, uint64_t flags, int64_t rank) { assert(flags); array(netbuffer_t) *found = map_find_or_add(buffers, rank, NULL); netbuffer_t nb; nb.owner = rank; nb.ptr = ptr; nb.sz = sz; nb.flags = flags; array_push(*found, nb); return ptr; } static int enet_event_to_netsync(int ev) { switch (ev) { case ENET_EVENT_TYPE_CONNECT: return NETWORK_EVENT_CONNECT; case ENET_EVENT_TYPE_DISCONNECT: return NETWORK_EVENT_DISCONNECT; case ENET_EVENT_TYPE_RECEIVE: return NETWORK_EVENT_RECEIVE; case ENET_EVENT_TYPE_DISCONNECT_TIMEOUT: return NETWORK_EVENT_DISCONNECT_TIMEOUT; } /* passthrough for our own events */ return ev; } char** network_sync(unsigned timeout_ms) { int64_t whoami = network_get(NETWORK_RANK); bool is_server = whoami == 0; bool is_client = !is_server; if(timeout_ms < 2) timeout_ms = 2; network_dt = time_ss() - last_netsync; last_netsync = time_ss(); // Split buffers into clients @todo // clients need to do this before network polling; servers should do this after polling. if (msg_send_cooldown <= 0.0) { map_foreach(buffers, int64_t, rank, array(netbuffer_t), list) { for(int i = 0, end = array_count(list); i < end; ++i) { netbuffer_t *nb = &list[i]; if (!is_server && !(nb->flags & NETWORK_SEND)) continue; static array(char) encapsulate; array_resize(encapsulate, nb->sz + 28); uint32_t *mid = (uint32_t*)&encapsulate[0]; *mid = MSG_BUF; uint64_t *st = (uint64_t*)&encapsulate[4]; *st = nb->flags; uint32_t *idx = (uint32_t*)&encapsulate[12]; *idx = i; uint32_t *len = (uint32_t*)&encapsulate[16]; *len = nb->sz; uint64_t *who = (uint64_t*)&encapsulate[20]; *who = nb->owner; // PRINTF("sending %llx %u %lld %u\n", *st, *idx, *who, *len); memcpy(&encapsulate[28], nb->ptr, nb->sz); server_broadcast_bin(&encapsulate[0], nb->sz + 28); } } msg_send_cooldown = (double)network_get(NETWORK_SEND_MS)/1000.0; } else { msg_send_cooldown -= network_dt; } if (is_server) { return server_poll(timeout_ms); } else { return client_poll(timeout_ms); } } char** server_poll(unsigned timeout_ms) { array_clear(events); if(timeout_ms < 2) timeout_ms = 2; // network poll for( ENetEvent event; Server && enet_host_service(Server, &event, timeout_ms) > 0; ) { char *msg = 0; char ip[128]; enet_peer_get_ip(event.peer, ip, 128); switch (event.type) { default: // case ENET_EVENT_TYPE_NONE: break; case ENET_EVENT_TYPE_CONNECT:; /* ensure we have free slot for client */ if (map_count(clients) >= network_get(NETWORK_CAPACITY)-1) { msg = va("%d Server is at maximum capacity, disconnecting the peer (::%s:%u)...", 1, ip, event.peer->address.port); enet_peer_disconnect_now(event.peer, 1); break; } int64_t client_id = -1; if (next_free_id) { struct peer_node_t *node = next_free_id; client_id = next_free_id->id; next_free_id = next_free_id->next; FREE(node); } else client_id = next_client_id++; // if (network_get(NETWORK_BUF_CLEAR_ON_JOIN)) { // array(netbuffer_t) *list = map_find(buffers, client_id); // if (list) // for(int i = 0, end = array_count(list); i < end; ++i) { // netbuffer_t *nb = &list[i]; // memset(nb->ptr, 0, nb->sz); // } // } map_find_or_add(clients, event.peer, client_id); map_find_or_add(peers, client_id, event.peer); network_put(NETWORK_COUNT, network_get(NETWORK_COUNT)+1); // send server slot char init_msg[12]; *(uint32_t*)&init_msg[0] = MSG_INIT; *(int64_t*)&init_msg[4] = client_id; server_send_bin(client_id, init_msg, 12); PRINTF("Client rank %lld for peer ::%s:%u\n", client_id, ip, event.peer->address.port); msg = va( "%d new client rank:%lld from ::%s:%u", 0, client_id, ip, event.peer->address.port ); event.peer->data = (void*)client_id; break; case ENET_EVENT_TYPE_RECEIVE: { char *dbg = (char *)event.peer->data; char *ptr = (char *)event.packet->data; unsigned sz = (unsigned)event.packet->dataLength; unsigned id = (unsigned)event.channelID; // debug // puts(dbg); // hexdump(ptr, sz); // decapsulate incoming packet. uint32_t mid = *(uint32_t*)(ptr + 0); ptr += 4; switch (mid) { case MSG_BUF: { uint64_t *flags = (uint64_t*)(ptr + 0); uint32_t *idx = (uint32_t*)(ptr + 8); uint32_t *len = (uint32_t*)(ptr + 12); uint64_t *who = (uint64_t*)(ptr + 16); ptr += 24; // validate if peer owns the buffer int64_t *cid = map_find(clients, event.peer); uint8_t client_valid = cid ? *cid == *who : 0; // apply incoming packet. if( client_valid ) { array(netbuffer_t) *list = map_find(buffers, *who); assert( list ); assert( *idx < array_count(*list) ); netbuffer_t *nb = &(*list)[*idx]; assert( *len == nb->sz ); memcpy(nb->ptr, ptr, *len); } } break; case MSG_RPC: { event.type = NETWORK_EVENT_RPC; unsigned id = *(uint32_t*)ptr; ptr += 4; char *cmdline = ptr; char *resp = rpc(id, cmdline); char *resp_msg = va("%*.s%s", 4, "", resp); *(uint32_t*)&resp_msg[0] = MSG_RPC_RESP; ENetPacket *packet = enet_packet_create(resp_msg, strlen(resp) + 5, ENET_PACKET_FLAG_RELIABLE); enet_peer_send(event.peer, 0, packet); msg = va("%d req:%s res:%s", 0, cmdline, resp); } break; case MSG_RPC_RESP: { event.type = NETWORK_EVENT_RPC_RESP; msg = va("%d %s", 0, va("%s", ptr)); } break; default: msg = va("%d unk msg len:%u from rank:%lld ::%s:%u", -1, sz, (uint64_t)event.peer->data, ip, event.peer->address.port); /* @TODO: hexdump? */ break; } /* Clean up the packet now that we're done using it. */ enet_packet_destroy( event.packet ); } break; case ENET_EVENT_TYPE_DISCONNECT: msg = va( "%d disconnect rank:%lld", 0, (uint64_t)event.peer->data); /* Reset the peer's client information. */ FREE(event.peer->data); event.peer->data = NULL; server_drop_client_peer(event.peer); network_put(NETWORK_COUNT, network_get(NETWORK_COUNT)-1); break; case ENET_EVENT_TYPE_DISCONNECT_TIMEOUT: msg = va( "%d timeout rank:%lld", 0, (uint64_t)event.peer->data); FREE(event.peer->data); event.peer->data = NULL; server_drop_client_peer(event.peer); network_put(NETWORK_COUNT, network_get(NETWORK_COUNT)-1); break; } if(msg) array_push(events, va("%d %s", enet_event_to_netsync(event.type), msg)); } array_push(events, NULL); return events; } char** client_poll(unsigned timeout_ms) { array_clear(events); int64_t whoami = network_get(NETWORK_RANK); if(timeout_ms < 2) timeout_ms = 2; // network poll for( ENetEvent event; Server && enet_host_service(Server, &event, timeout_ms) > 0; ) { char *msg = 0; char ip[128]; enet_peer_get_ip(event.peer, ip, 128); switch (event.type) { default: // case ENET_EVENT_TYPE_NONE: break; case ENET_EVENT_TYPE_CONNECT: break; case ENET_EVENT_TYPE_RECEIVE: { char *dbg = (char *)event.peer->data; char *ptr = (char *)event.packet->data; unsigned sz = (unsigned)event.packet->dataLength; unsigned id = (unsigned)event.channelID; // decapsulate incoming packet. uint32_t mid = *(uint32_t*)(ptr + 0); ptr += 4; switch (mid) { case MSG_INIT: /* handled by client_join */ break; case MSG_BUF: { uint64_t *flags = (uint64_t*)(ptr + 0); uint32_t *idx = (uint32_t*)(ptr + 8); uint32_t *len = (uint32_t*)(ptr + 12); uint64_t *who = (uint64_t*)(ptr + 16); ptr += 24; // apply incoming packet. if( *who != whoami ) { array(netbuffer_t) *list = map_find(buffers, *who); assert( list ); assert( *idx < array_count(*list) ); netbuffer_t *nb = &(*list)[*idx]; assert( *len == nb->sz ); memcpy(nb->ptr, ptr, *len); } } break; case MSG_RPC: { event.type = NETWORK_EVENT_RPC; unsigned id = *(uint32_t*)ptr; ptr += 4; char *cmdline = ptr; char *resp = rpc(id, cmdline); char *resp_msg = va("%*.s%s", 4, "", resp); *(uint32_t*)&resp_msg[0] = MSG_RPC_RESP; ENetPacket *packet = enet_packet_create(resp_msg, strlen(resp) + 5, ENET_PACKET_FLAG_RELIABLE); enet_peer_send(event.peer, 0, packet); msg = va("%d req:%s res:%s", 0, cmdline, resp); } break; case MSG_RPC_RESP: { event.type = NETWORK_EVENT_RPC_RESP; msg = va("%d %s", 0, ptr); } break; default: msg = va("%d unk msg len:%u from server", -1, sz); /* @TODO: hexdump? */ break; } /* Clean up the packet now that we're done using it. */ enet_packet_destroy( event.packet ); } break; case ENET_EVENT_TYPE_DISCONNECT: msg = va( "%d disconnect", 0 ); /* Reset the peer's client information. */ FREE(event.peer->data); event.peer->data = NULL; network_put(NETWORK_RANK, -1); network_put(NETWORK_LIVE, 0); break; case ENET_EVENT_TYPE_DISCONNECT_TIMEOUT: msg = va( "%d timeout", 0); FREE(event.peer->data); event.peer->data = NULL; network_put(NETWORK_RANK, -1); network_put(NETWORK_LIVE, 0); break; } if(msg) array_push(events, va("%d %s", enet_event_to_netsync(event.type), msg)); } array_push(events, NULL); return events; } int network_event(const char *msg, int *errcode, char **errstr) { int evid = -1; int err = 0; char errbuf[128] = {0}; sscanf(msg, "%d %d %127[^\r\n]", &evid, &err, errbuf); if (errcode) *errcode = err; if (errstr) *errstr = va("%s", errbuf); return evid; } void network_rpc(const char *signature, void *function) { rpc_insert(signature, function); } void network_rpc_send_to(int64_t rank, unsigned id, const char *cmdline) { assert(network_get(NETWORK_RANK) == 0); /* must be a host */ char *msg = va("%*.s%s", 8, "", cmdline); unsigned sz = strlen(cmdline) + 9; *(uint32_t*)&msg[0] = MSG_RPC; *(uint32_t*)&msg[4] = id; server_send_bin(rank, msg, sz); } void network_rpc_send(unsigned id, const char *cmdline) { char *msg = va("%*.s%s", 8, "", cmdline); unsigned sz = strlen(cmdline) + 9; *(uint32_t*)&msg[0] = MSG_RPC; *(uint32_t*)&msg[4] = id; server_broadcast_bin(msg, sz); }