code: added enet server loop + small file reorg on server

isolation_bkp/dynres
Vladyslav Hrytsenko 2021-01-17 09:03:06 +02:00
parent 6345eb5fa6
commit eafc2af26e
18 changed files with 25634 additions and 15735 deletions

View File

@@ -1,17 +1,17 @@
add_executable(eco2d-server
source/main.c
source/network.c
source/perlin.c
source/options.c
source/world.c
source/blocks.c
source/utils/options.c
source/world/perlin.c
source/world/world.c
source/world/blocks.c
header/network.h
header/perlin.h
header/options.h
header/world.h
header/blocks.h
header/blocks_info.h
header/utils/options.h
header/world/perlin.h
header/world/world.h
header/world/blocks.h
header/world/blocks_info.h
)
include_directories(eco2d-server header)

View File

@@ -1 +1,9 @@
#pragma once
int32_t network_init(void);
int32_t network_destroy(void);
int32_t network_server_start(const char *host, uint16_t port);
int32_t network_server_stop(void);
int32_t network_server_tick(void);
void network_server_update(void *data);

View File

@@ -1,16 +1,20 @@
#define ZPL_IMPL
#include "zpl.h"
#define LIBRG_IMPL
#define LIBRG_CUSTOM_ZPL
#include "librg.h"
#include "system.h"
#include "options.h"
#include "network.h"
#include "utils/options.h"
#define DEFAULT_WORLD_SEED 302097
#define DEFAULT_WORLD_DIMS 32
#define IF(call) do { \
if (call != 0) { \
zpl_printf("[ERROR] A call to a function %s failed\n", #call); \
return 1; \
} \
} while (0)
int main(int argc, char** argv) {
zpl_opts opts={0};
zpl_opts_init(&opts, zpl_heap(), argv[0]);
@@ -42,6 +46,15 @@ int main(int argc, char** argv) {
return 0;
}
printf("hello world\n");
IF(network_init());
IF(network_server_start("0.0.0.0", 27000));
while (true) {
network_server_tick();
}
IF(network_server_stop());
IF(network_destroy());
return 0;
}
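Note that the while (true) loop above never returns, so the stop/destroy calls after it are unreachable. A minimal sketch of a shutdown-aware variant, using a hypothetical SIGINT flag that is not part of this commit:

#include <signal.h>
#include <stdint.h>
#include "network.h"

static volatile sig_atomic_t keep_running = 1;

static void handle_sigint(int sig) {
    (void)sig;
    keep_running = 0;
}

int main(void) {
    signal(SIGINT, handle_sigint);
    if (network_init() != 0) return 1;
    if (network_server_start("0.0.0.0", 27000) != 0) return 1;
    while (keep_running) { /* Ctrl+C flips the flag and lets the loop exit */
        network_server_tick();
    }
    network_server_stop();
    network_destroy();
    return 0;
}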

View File

@@ -0,0 +1,155 @@
#include "zpl.h"
#define ENET_IMPLEMENTATION
#include "enet.h"
#define LIBRG_IMPL
#define LIBRG_CUSTOM_ZPL
#include "librg.h"
#include "system.h"
#include "network.h"
#define NETWORK_UPDATE_DELAY 0.100
#define NETWORK_MAX_CLIENTS 32
static ENetHost *server = NULL;
static librg_world *server_world = NULL;
static zpl_timer nettimer = {0};
int32_t network_init(void) {
zpl_timer_set(&nettimer, NETWORK_UPDATE_DELAY, -1, network_server_update);
return enet_initialize() != 0;
}
int32_t network_destroy(void) {
enet_deinitialize();
return 0;
}
int32_t network_server_start(const char *host, uint16_t port) {
zpl_unused(host);
zpl_timer_start(&nettimer, NETWORK_UPDATE_DELAY);
ENetAddress address = {0};
address.host = ENET_HOST_ANY; /* Bind the server to all available interfaces. */
address.port = port; /* Bind the server to port. */
/* create a server */
server = enet_host_create(&address, NETWORK_MAX_CLIENTS, 2, 0, 0);
if (server == NULL) {
zpl_printf("[ERROR] An error occurred while trying to create an ENet server host.\n");
return 1;
}
zpl_printf("[INFO] Started an ENet server...\n");
server_world = librg_world_create();
if (server_world == NULL) {
zpl_printf("[ERROR] An error occurred while trying to create a server world.\n");
return 1;
}
zpl_printf("[INFO] Created a new server world\n");
/* store our host to the userdata */
librg_world_userdata_set(server_world, server);
/* config our world grid */
librg_config_chunksize_set(server_world, 16, 16, 16);
librg_config_chunkamount_set(server_world, 9, 9, 9);
librg_config_chunkoffset_set(server_world, LIBRG_OFFSET_MID, LIBRG_OFFSET_MID, LIBRG_OFFSET_MID);
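/* 16^3-unit chunks in a 9x9x9 grid give a 144x144x144-unit world, centered on the origin by LIBRG_OFFSET_MID. */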
// librg_event_set(server_world, LIBRG_WRITE_UPDATE, server_write_update);
// librg_event_set(server_world, LIBRG_READ_UPDATE, server_read_update);
return 0;
}
int32_t network_server_stop(void) {
zpl_timer_stop(&nettimer);
enet_host_destroy(server);
librg_world_destroy(server_world);
server_world = NULL;
server = NULL;
return 0;
}
int32_t network_server_tick(void) {
ENetEvent event = {0};
while (enet_host_service(server, &event, 1) > 0) {
switch (event.type) {
case ENET_EVENT_TYPE_CONNECT: {
zpl_printf("[INFO] A new user %d connected.\n", event.peer->incomingPeerID);
int64_t entity_id = event.peer->incomingPeerID;
// /* we create an entity for our client */
// /* in our case it is going to have same id as owner id */
// /* since we do not really plan on adding more entities per client for now */
// /* and place his entity right in the center of the world */
// librg_entity_track(server_world, entity_id);
// librg_entity_owner_set(server_world, entity_id, event.peer->incomingPeerID);
// librg_entity_chunk_set(server_world, entity_id, 1);
// librg_entity_radius_set(server_world, entity_id, 2); /* 2 chunk radius visibility */
// librg_entity_userdata_set(server_world, entity_id, event.peer); /* save ptr to peer */
} break;
case ENET_EVENT_TYPE_DISCONNECT:
case ENET_EVENT_TYPE_DISCONNECT_TIMEOUT: {
zpl_printf("[INFO] A user %d disconnected.\n", event.peer->incomingPeerID);
int64_t entity_id = event.peer->incomingPeerID;
// librg_entity_untrack(server_world, entity_id);
} break;
case ENET_EVENT_TYPE_RECEIVE: {
// /* handle a newly received event */
// librg_world_read(
// server_world,
// event.peer->incomingPeerID,
// (char *)event.packet->data,
// event.packet->dataLength,
// NULL
// );
/* Clean up the packet now that we're done using it. */
enet_packet_destroy(event.packet);
} break;
case ENET_EVENT_TYPE_NONE: break;
}
}
zpl_timer_update(&nettimer);
return 0;
}
void network_server_update(void *data) {
// /* iterate peers and send them updates */
// ENetPeer *currentPeer;
// for (currentPeer = server->peers; currentPeer < &server->peers[server->peerCount]; ++currentPeer) {
// if (currentPeer->state != ENET_PEER_STATE_CONNECTED) {
// continue;
// }
// char buffer[1024] = {0};
// size_t buffer_length = 1024;
// /* serialize peer's the world view to a buffer */
// librg_world_write(
// server_world,
// currentPeer->incomingPeerID,
// buffer,
// &buffer_length,
// NULL
// );
// /* create packet with actual length, and send it */
// ENetPacket *packet = enet_packet_create(buffer, buffer_length, ENET_PACKET_FLAG_RELIABLE);
// enet_peer_send(currentPeer, 0, packet);
// }
}

View File

@@ -1,8 +1,9 @@
#include "options.h"
#include "world.h"
#include "blocks.h"
#include <stdio.h>
#include "world/world.h"
#include "world/blocks.h"
#include "utils/options.h"
void generate_minimap(int32_t seed, int32_t world_size) {
world_init(seed, world_size, world_size);
uint8_t const *world;

View File

@@ -1,5 +1,5 @@
#include "blocks.h"
#include "zpl.h"
#include "world/blocks.h"
// todo: csv parsing + utils

View File

@@ -1,5 +1,5 @@
#include "blocks.h"
#include "atlas_shared.h"
#include "world/blocks.h"
static block blocks[] = {
{.tex_id = ATLAS_XY(0, 0), .name = "base-ground", .flags = 0, .kind = BLOCK_KIND_GROUND, .biome = 0, .symbol = '.'},

View File

@@ -1,6 +1,7 @@
#include <math.h>
#include "world/perlin.h"
// adapted from: https://gist.github.com/nowl/828013#gistcomment-2807232
#include "perlin.h"
#include "math.h"
static const uint8_t PERLIN_PERM_TABLE[] = {
208,34,231,213,32,248,233,56,161,78,24,140,71,48,140,254,245,255,247,247,40,

View File

@@ -1,5 +1,5 @@
#include "world.h"
#include "zpl.h"
#include "world/world.h"
typedef struct {
uint8_t *data;

View File

@@ -1,11 +1,12 @@
#include "world.h"
#include "blocks.h"
#include "perlin.h"
#include "zpl.h"
#include <math.h>
#include <stdlib.h>
#include "world/world.h"
#include "world/blocks.h"
#include "world/perlin.h"
#define WORLD_BLOCK_OBSERVER(name) uint32_t name(uint32_t id, uint32_t block_idx)
typedef WORLD_BLOCK_OBSERVER(world_block_observer_proc);

code/vendors/enet.h vendored 100644 (5932 changed lines)

File diff suppressed because it is too large.

code/vendors/librg.h vendored (7013 changed lines)

File diff suppressed because it is too large.

code/vendors/zpl.h vendored (345 changed lines)
View File

@@ -27,6 +27,10 @@ GitHub:
https://github.com/zpl-c/zpl
Version History:
12.0.0 - New jobs system
11.0.0 - Rewrite the timer module
10.13.0 - Initial ARM threading support
10.12.1 - Fix missing zpl_alloc_str
10.12.0 - Add zpl_crc64
@@ -4082,24 +4086,6 @@ ZPL_END_C_DECLS
//
// Instantiated Circular buffer
//
/*
int main()
{
zpl_ring_zpl_u32 pad={0};
zpl_ring_zpl_u32_init(&pad, zpl_heap(), 3);
zpl_ring_zpl_u32_append(&pad, 1);
zpl_ring_zpl_u32_append(&pad, 2);
zpl_ring_zpl_u32_append(&pad, 3);
while (!zpl_ring_zpl_u32_empty(&pad)) {
zpl_printf("Result is %d\n", *zpl_ring_zpl_u32_get(&pad));
}
zpl_ring_zpl_u32_free(&pad);
return 0;
}
*/
#ifdef ZPL_EDITOR
#include <zpl.h>
@@ -4107,27 +4093,27 @@ ZPL_END_C_DECLS
ZPL_BEGIN_C_DECLS
#define ZPL_RING_DECLARE(type) \
#define ZPL_RING_DECLARE(prefix,type) \
typedef struct { \
zpl_allocator backing; \
zpl_buffer(type) buf; \
zpl_usize head, tail; \
zpl_usize capacity; \
} ZPL_JOIN2(zpl_ring_, type); \
} ZPL_JOIN2(prefix, type); \
\
ZPL_DEF void ZPL_JOIN3(zpl_ring_, type, _init)(ZPL_JOIN2(zpl_ring_, type) * pad, zpl_allocator a, zpl_isize max_size); \
ZPL_DEF void ZPL_JOIN3(zpl_ring_, type, _free)(ZPL_JOIN2(zpl_ring_, type) * pad); \
ZPL_DEF zpl_b32 ZPL_JOIN3(zpl_ring_, type, _full)(ZPL_JOIN2(zpl_ring_, type) * pad); \
ZPL_DEF zpl_b32 ZPL_JOIN3(zpl_ring_, type, _empty)(ZPL_JOIN2(zpl_ring_, type) * pad); \
ZPL_DEF void ZPL_JOIN3(zpl_ring_, type, _append)(ZPL_JOIN2(zpl_ring_, type) * pad, type data); \
ZPL_DEF void ZPL_JOIN3(zpl_ring_, type, _append_array)(ZPL_JOIN2(zpl_ring_, type) * pad, zpl_array(type) data); \
ZPL_DEF type *ZPL_JOIN3(zpl_ring_, type, _get)(ZPL_JOIN2(zpl_ring_, type) * pad); \
ZPL_DEF void ZPL_JOIN2(prefix, init)(ZPL_JOIN2(prefix, type) * pad, zpl_allocator a, zpl_isize max_size); \
ZPL_DEF void ZPL_JOIN2(prefix, free)(ZPL_JOIN2(prefix, type) * pad); \
ZPL_DEF zpl_b32 ZPL_JOIN2(prefix, full)(ZPL_JOIN2(prefix, type) * pad); \
ZPL_DEF zpl_b32 ZPL_JOIN2(prefix, empty)(ZPL_JOIN2(prefix, type) * pad); \
ZPL_DEF void ZPL_JOIN2(prefix, append)(ZPL_JOIN2(prefix, type) * pad, type data); \
ZPL_DEF void ZPL_JOIN2(prefix, append_array)(ZPL_JOIN2(prefix, type) * pad, zpl_array(type) data); \
ZPL_DEF type *ZPL_JOIN2(prefix, get)(ZPL_JOIN2(prefix, type) * pad); \
ZPL_DEF zpl_array(type) \
ZPL_JOIN3(zpl_ring_, type, _get_array)(ZPL_JOIN2(zpl_ring_, type) * pad, zpl_usize max_size, zpl_allocator a);
ZPL_JOIN2(prefix, get_array)(ZPL_JOIN2(prefix, type) * pad, zpl_usize max_size, zpl_allocator a);
#define ZPL_RING_DEFINE(type) \
void ZPL_JOIN3(zpl_ring_, type, _init)(ZPL_JOIN2(zpl_ring_, type) * pad, zpl_allocator a, zpl_isize max_size) { \
ZPL_JOIN2(zpl_ring_, type) pad_ = { 0 }; \
#define ZPL_RING_DEFINE(prefix,type) \
void ZPL_JOIN2(prefix, init)(ZPL_JOIN2(prefix, type) * pad, zpl_allocator a, zpl_isize max_size) { \
ZPL_JOIN2(prefix, type) pad_ = { 0 }; \
*pad = pad_; \
\
pad->backing = a; \
@@ -4135,30 +4121,30 @@ ZPL_END_C_DECLS
pad->capacity = max_size + 1; \
pad->head = pad->tail = 0; \
} \
void ZPL_JOIN3(zpl_ring_, type, _free)(ZPL_JOIN2(zpl_ring_, type) * pad) { \
void ZPL_JOIN2(prefix, free)(ZPL_JOIN2(prefix, type) * pad) { \
zpl_buffer_free(pad->buf); \
} \
\
zpl_b32 ZPL_JOIN3(zpl_ring_, type, _full)(ZPL_JOIN2(zpl_ring_, type) * pad) { \
zpl_b32 ZPL_JOIN2(prefix, full)(ZPL_JOIN2(prefix, type) * pad) { \
return ((pad->head + 1) % pad->capacity) == pad->tail; \
} \
\
zpl_b32 ZPL_JOIN3(zpl_ring_, type, _empty)(ZPL_JOIN2(zpl_ring_, type) * pad) { return pad->head == pad->tail; } \
zpl_b32 ZPL_JOIN2(prefix, empty)(ZPL_JOIN2(prefix, type) * pad) { return pad->head == pad->tail; } \
\
void ZPL_JOIN3(zpl_ring_, type, _append)(ZPL_JOIN2(zpl_ring_, type) * pad, type data) { \
void ZPL_JOIN2(prefix, append)(ZPL_JOIN2(prefix, type) * pad, type data) { \
pad->buf[pad->head] = data; \
pad->head = (pad->head + 1) % pad->capacity; \
\
if (pad->head == pad->tail) { pad->tail = (pad->tail + 1) % pad->capacity; } \
} \
\
void ZPL_JOIN3(zpl_ring_, type, _append_array)(ZPL_JOIN2(zpl_ring_, type) * pad, zpl_array(type) data) { \
void ZPL_JOIN2(prefix, append_array)(ZPL_JOIN2(prefix, type) * pad, zpl_array(type) data) { \
zpl_usize c = zpl_array_count(data); \
for (zpl_usize i = 0; i < c; ++i) { ZPL_JOIN3(zpl_ring_, type, _append)(pad, data[i]); } \
for (zpl_usize i = 0; i < c; ++i) { ZPL_JOIN2(prefix, append)(pad, data[i]); } \
} \
\
type *ZPL_JOIN3(zpl_ring_, type, _get)(ZPL_JOIN2(zpl_ring_, type) * pad) { \
if (ZPL_JOIN3(zpl_ring_, type, _empty)(pad)) { return NULL; } \
type *ZPL_JOIN2(prefix, get)(ZPL_JOIN2(prefix, type) * pad) { \
if (ZPL_JOIN2(prefix, empty)(pad)) { return NULL; } \
\
type *data = &pad->buf[pad->tail]; \
pad->tail = (pad->tail + 1) % pad->capacity; \
@@ -4167,11 +4153,11 @@ ZPL_END_C_DECLS
} \
\
zpl_array(type) \
ZPL_JOIN3(zpl_ring_, type, _get_array)(ZPL_JOIN2(zpl_ring_, type) * pad, zpl_usize max_size, zpl_allocator a) { \
ZPL_JOIN2(prefix, get_array)(ZPL_JOIN2(prefix, type) * pad, zpl_usize max_size, zpl_allocator a) { \
zpl_array(type) vals = 0; \
zpl_array_init(vals, a); \
while (--max_size && !ZPL_JOIN3(zpl_ring_, type, _empty)(pad)) { \
zpl_array_append(vals, *ZPL_JOIN3(zpl_ring_, type, _get)(pad)); \
while (--max_size && !ZPL_JOIN2(prefix, empty)(pad)) { \
zpl_array_append(vals, *ZPL_JOIN2(prefix, get)(pad)); \
} \
return vals; \
}
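With the ring container now parameterized by an explicit prefix, the old instantiated example translates roughly to the sketch below (my_ring_ is a hypothetical prefix; the jobs system further down instantiates it as zpl__jobs_ring_ over zpl_thread_job):

/* Illustrative instantiation of the prefix-based ring macros (hypothetical names). */
ZPL_RING_DECLARE(my_ring_, zpl_u32);
ZPL_RING_DEFINE(my_ring_, zpl_u32);

void my_ring_demo(void) {
    my_ring_zpl_u32 pad = {0};
    my_ring_init(&pad, zpl_heap(), 3);
    my_ring_append(&pad, 1);
    my_ring_append(&pad, 2);
    my_ring_append(&pad, 3);
    while (!my_ring_empty(&pad)) {
        zpl_printf("Result is %d\n", *my_ring_get(&pad));
    }
    my_ring_free(&pad);
}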
@@ -5790,7 +5776,10 @@ ZPL_END_C_DECLS
//! Perform timer pool update.
//! Traverse over all timers and update them accordingly. Should be called by Main Thread in a tight loop.
ZPL_DEF void zpl_timer_update(zpl_timer_pool pool);
ZPL_DEF void zpl_timer_update_array(zpl_timer_pool pool);
ZPL_DEF void zpl_timer_update(zpl_timer *timer);
//! Set up timer.
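For reference, network.c above already uses the per-timer form of the rewritten API; in isolation it looks roughly like this sketch (on_tick is a placeholder callback):

/* Sketch: single-timer usage of the rewritten API (placeholder callback). */
static void on_tick(void *user_data) { zpl_unused(user_data); }

void timer_demo(void) {
    zpl_timer t = {0};
    zpl_timer_set(&t, 0.100, -1, on_tick); /* 0.1s period, -1 repeats forever */
    zpl_timer_start(&t, 0.100);            /* schedule the first fire */
    zpl_timer_update(&t);                  /* call this once per main-loop iteration */
    /* zpl_timer_update_array(pool) remains for zpl_array-backed timer pools. */
}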
@@ -7367,6 +7356,14 @@ ZPL_END_C_DECLS
#define ZPL_INVALID_JOB ZPL_U32_MAX
#ifndef ZPL_JOBS_MAX_QUEUE
#define ZPL_JOBS_MAX_QUEUE 100
#endif
#ifdef ZPL_JOBS_ENABLE_DEBUG
#define ZPL_JOBS_DEBUG
#endif
typedef enum {
ZPL_JOBS_STATUS_READY,
ZPL_JOBS_STATUS_BUSY,
@@ -7374,42 +7371,73 @@ ZPL_END_C_DECLS
ZPL_JOBS_STATUS_TERM,
} zpl_jobs_status;
typedef enum {
ZPL_JOBS_PRIORITY_REALTIME,
ZPL_JOBS_PRIORITY_HIGH,
ZPL_JOBS_PRIORITY_NORMAL,
ZPL_JOBS_PRIORITY_LOW,
ZPL_JOBS_PRIORITY_IDLE,
ZPL_JOBS_MAX_PRIORITIES,
} zpl_jobs_priority;
typedef struct {
zpl_jobs_proc proc;
void *data;
zpl_f32 priority;
} zpl_thread_job;
ZPL_RING_DECLARE(zpl__jobs_ring_, zpl_thread_job);
typedef struct {
zpl_thread thread;
zpl_atomic32 status;
zpl_u32 jobid;
void *pool;
zpl_thread_job job;
#ifdef ZPL_JOBS_DEBUG
zpl_u32 hits;
zpl_u32 idle;
#endif
} zpl_thread_worker;
typedef struct {
zpl__jobs_ring_zpl_thread_job jobs; ///< zpl_ring
zpl_u32 chance;
#ifdef ZPL_JOBS_DEBUG
zpl_u32 hits;
#endif
} zpl_thread_queue;
typedef struct {
zpl_allocator alloc;
zpl_u32 max_threads;
zpl_f32 job_spawn_treshold;
zpl_mutex access;
zpl_u32 max_threads, max_jobs, counter;
zpl_thread_worker *workers; ///< zpl_buffer
zpl_thread_job *jobs; ///< zpl_array
zpl_u32 *queue; ///< zpl_array
zpl_u32 *available; ///< zpl_array
zpl_thread_queue queues[ZPL_JOBS_MAX_PRIORITIES];
} zpl_thread_pool;
//! Initialize thread pool with specified amount of fixed threads.
ZPL_DEF void zpl_jobs_init(zpl_thread_pool *pool, zpl_allocator a, zpl_u32 max_threads);
//! Initialize thread pool with specified amount of fixed threads and custom job limit.
ZPL_DEF void zpl_jobs_init_with_limit(zpl_thread_pool *pool, zpl_allocator a, zpl_u32 max_threads, zpl_u32 max_jobs);
//! Release the resources use by thread pool.
ZPL_DEF void zpl_jobs_free(zpl_thread_pool *pool);
//! Enqueue a job with specified data.
ZPL_DEF void zpl_jobs_enqueue(zpl_thread_pool *pool, zpl_jobs_proc proc, void *data);
//! Enqueue a job with specified data and custom priority.
ZPL_DEF zpl_b32 zpl_jobs_enqueue_with_priority(zpl_thread_pool *pool, zpl_jobs_proc proc, void *data, zpl_jobs_priority priority);
//! Enqueue a job with specific priority with specified data.
ZPL_DEF void zpl_jobs_enqueue_with_priority(zpl_thread_pool *pool, zpl_jobs_proc proc, void *data, zpl_f32 priority);
//! Enqueue a job with specified data.
ZPL_DEF zpl_b32 zpl_jobs_enqueue(zpl_thread_pool *pool, zpl_jobs_proc proc, void *data);
//! Check if the work queue is empty.
ZPL_DEF zpl_b32 zpl_jobs_empty(zpl_thread_pool *pool, zpl_jobs_priority priority);
ZPL_DEF zpl_b32 zpl_jobs_empty_all(zpl_thread_pool *pool);
ZPL_DEF zpl_b32 zpl_jobs_full_all(zpl_thread_pool *pool);
//! Check if the work queue is full.
ZPL_DEF zpl_b32 zpl_jobs_full(zpl_thread_pool *pool, zpl_jobs_priority priority);
//! Check if all workers are done.
ZPL_DEF zpl_b32 zpl_jobs_done(zpl_thread_pool *pool);
//! Process all jobs and check all threads. Should be called by Main Thread in a tight loop.
ZPL_DEF zpl_b32 zpl_jobs_process(zpl_thread_pool *pool);
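The declarations above replace the old weight-based enqueue with fixed priority rings; a minimal usage sketch, assuming a hypothetical my_job callback and a 4-thread pool:

/* Sketch: driving the reworked priority job queues (hypothetical job body and thread count). */
static void my_job(void *data) {
    zpl_printf("job payload: %s\n", (char *)data);
}

void jobs_demo(void) {
    zpl_thread_pool pool = {0};
    zpl_jobs_init(&pool, zpl_heap(), 4); /* 4 worker threads, ZPL_JOBS_MAX_QUEUE slots per priority */

    zpl_jobs_enqueue(&pool, my_job, "normal"); /* lands in ZPL_JOBS_PRIORITY_NORMAL */
    zpl_jobs_enqueue_with_priority(&pool, my_job, "urgent", ZPL_JOBS_PRIORITY_REALTIME);

    while (!zpl_jobs_done(&pool)) { /* pump queues and workers from the main thread */
        zpl_jobs_process(&pool);
    }

    zpl_jobs_free(&pool);
}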
@@ -10964,7 +10992,9 @@ ZPL_END_C_DECLS
zpl_isize zpl_random_gen_isize(zpl_random *r) {
zpl_u64 u = zpl_random_gen_u64(r);
return *cast(zpl_isize *)&u;
zpl_isize i;
zpl_memcopy(&i, &u, zpl_size_of(u));
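/* copying the bits with zpl_memcopy avoids the strict-aliasing pointer cast used previously */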
return i;
}
@@ -10979,7 +11009,8 @@ ZPL_END_C_DECLS
zpl_isize zpl_random_range_isize(zpl_random *r, zpl_isize lower_inc, zpl_isize higher_inc) {
zpl_u64 u = zpl_random_gen_u64(r);
zpl_isize i = *cast(zpl_isize *)&u;
zpl_isize i;
zpl_memcopy(&i, &u, zpl_size_of(u));
zpl_isize diff = higher_inc-lower_inc+1;
i %= diff;
i += lower_inc;
@@ -11398,29 +11429,32 @@ ZPL_END_C_DECLS
t->enabled = false;
}
void zpl_timer_update(zpl_timer_pool pool) {
ZPL_ASSERT(pool);
void zpl_timer_update(zpl_timer *timer) {
zpl_f64 now = zpl_time_rel();
if (timer->enabled) {
if (timer->remaining_calls > 0 || timer->initial_calls == -1) {
if (timer->next_call_ts <= now) {
if (timer->initial_calls != -1) { --timer->remaining_calls; }
zpl_f64 now = zpl_time_rel( );
if (timer->remaining_calls == 0) {
timer->enabled = false;
} else {
timer->next_call_ts = now + timer->duration;
}
timer->callback(timer->user_data);
}
}
}
}
void zpl_timer_update_array(zpl_timer_pool pool) {
ZPL_ASSERT(pool);
for (zpl_isize i = 0; i < zpl_array_count(pool); ++i) {
zpl_timer *t = pool + i;
if (t->enabled) {
if (t->remaining_calls > 0 || t->initial_calls == -1) {
if (t->next_call_ts <= now) {
if (t->initial_calls != -1) { --t->remaining_calls; }
if (t->remaining_calls == 0) {
t->enabled = false;
} else {
t->next_call_ts = now + t->duration;
}
t->callback(t->user_data);
}
}
}
zpl_timer_update(t);
}
}
@@ -16803,9 +16837,14 @@ ZPL_END_C_DECLS
ZPL_BEGIN_C_DECLS
ZPL_RING_DEFINE(zpl__jobs_ring_, zpl_thread_job);
zpl_global const zpl_u32 zpl__jobs_chances[ZPL_JOBS_MAX_PRIORITIES] = {
2, 3, 5, 7, 11
};
zpl_isize zpl__jobs_entry(struct zpl_thread *thread) {
zpl_thread_worker *tw = (zpl_thread_worker *)thread->user_data;
zpl_thread_pool *pool = (zpl_thread_pool *)tw->pool;
for (;;) {
zpl_u32 status = zpl_atomic32_load(&tw->status);
@@ -16813,18 +16852,19 @@ ZPL_END_C_DECLS
switch (status) {
case ZPL_JOBS_STATUS_READY: {
zpl_atomic32_store(&tw->status, ZPL_JOBS_STATUS_BUSY);
tw->job.proc(tw->job.data);
zpl_atomic32_compare_exchange(&tw->status, ZPL_JOBS_STATUS_BUSY, ZPL_JOBS_STATUS_WAITING);
zpl_mutex_lock(&pool->access);
zpl_thread_job *job = pool->jobs + tw->jobid;
zpl_mutex_unlock(&pool->access);
job->proc(job->data);
zpl_atomic32_store(&tw->status, ZPL_JOBS_STATUS_WAITING);
#ifdef ZPL_JOBS_DEBUG
++tw->hits;
#endif
} break;
case ZPL_JOBS_STATUS_WAITING: {
zpl_yield( );
#ifdef ZPL_JOBS_DEBUG
++tw->idle;
#endif
zpl_yield();
} break;
case ZPL_JOBS_STATUS_TERM: {
@@ -16837,21 +16877,25 @@ ZPL_END_C_DECLS
}
void zpl_jobs_init(zpl_thread_pool *pool, zpl_allocator a, zpl_u32 max_threads) {
zpl_jobs_init_with_limit(pool, a, max_threads, ZPL_JOBS_MAX_QUEUE);
}
void zpl_jobs_init_with_limit(zpl_thread_pool *pool, zpl_allocator a, zpl_u32 max_threads, zpl_u32 max_jobs) {
zpl_thread_pool pool_ = { 0 };
*pool = pool_;
zpl_mutex_init(&pool->access);
pool->alloc = a;
pool->max_threads = max_threads;
// NOTE: Spawn a new job slot when number of available slots is below 25%
// compared to the total number of slots.
pool->job_spawn_treshold = 0.25;
pool->max_jobs = max_jobs;
pool->counter = 0;
zpl_buffer_init(pool->workers, a, max_threads);
zpl_array_init(pool->jobs, a);
zpl_array_init(pool->queue, a);
zpl_array_init(pool->available, a);
for (zpl_usize i = 0; i < ZPL_JOBS_MAX_PRIORITIES; ++i) {
zpl_thread_queue *q = &pool->queues[i];
zpl__jobs_ring_init(&q->jobs, a, max_jobs);
q->chance = zpl__jobs_chances[i];
}
for (zpl_usize i = 0; i < max_threads; ++i) {
zpl_thread_worker worker_ = { 0 };
@@ -16860,8 +16904,6 @@ ZPL_END_C_DECLS
zpl_thread_init(&tw->thread);
zpl_atomic32_store(&tw->status, ZPL_JOBS_STATUS_WAITING);
tw->pool = pool;
tw->jobid = ZPL_INVALID_JOB;
zpl_thread_start(&tw->thread, zpl__jobs_entry, (void *)tw);
}
}
@@ -16875,79 +16917,97 @@ ZPL_END_C_DECLS
}
zpl_buffer_free(pool->workers);
zpl_array_free(pool->jobs);
zpl_array_free(pool->queue);
zpl_array_free(pool->available);
for (zpl_usize i = 0; i < ZPL_JOBS_MAX_PRIORITIES; ++i) {
zpl_thread_queue *q = &pool->queues[i];
zpl__jobs_ring_free(&q->jobs);
}
}
void zpl_jobs_enqueue_with_priority(zpl_thread_pool *pool, zpl_jobs_proc proc, void *data, zpl_f32 priority) {
zpl_b32 zpl_jobs_enqueue_with_priority(zpl_thread_pool *pool, zpl_jobs_proc proc, void *data, zpl_jobs_priority priority) {
ZPL_ASSERT_NOT_NULL(proc);
zpl_f32 treshold = 0.0f;
if (zpl_array_count(pool->queue) > 0) {
treshold = (zpl_array_count(pool->available) / (zpl_f32)zpl_array_count(pool->jobs));
}
if (treshold <= pool->job_spawn_treshold) {
zpl_thread_job job = { 0 };
ZPL_ASSERT(priority >= 0 && priority < ZPL_JOBS_MAX_PRIORITIES);
zpl_thread_job job = {0};
job.proc = proc;
job.data = data;
job.priority = priority;
zpl_array_append(pool->jobs, job);
zpl_u32 jobid = (zpl_u32)zpl_array_count(pool->jobs) - 1;
zpl_array_append(pool->queue, jobid);
} else {
zpl_u32 jobid = zpl_array_back(pool->available);
zpl_thread_job *jp = pool->jobs + jobid;
zpl_array_pop(pool->available);
if (!zpl_jobs_full(pool, priority)) {
zpl__jobs_ring_append(&pool->queues[priority].jobs, job);
return true;
}
return false;
}
jp->proc = proc;
jp->data = data;
jp->priority = priority;
zpl_b32 zpl_jobs_enqueue(zpl_thread_pool *pool, zpl_jobs_proc proc, void *data) {
return zpl_jobs_enqueue_with_priority(pool, proc, data, ZPL_JOBS_PRIORITY_NORMAL);
}
zpl_array_append(pool->queue, jobid);
zpl_b32 zpl_jobs_empty(zpl_thread_pool *pool, zpl_jobs_priority priority) {
return zpl__jobs_ring_empty(&pool->queues[priority].jobs);
}
zpl_b32 zpl_jobs_full(zpl_thread_pool *pool, zpl_jobs_priority priority) {
return zpl__jobs_ring_full(&pool->queues[priority].jobs);
}
zpl_b32 zpl_jobs_done(zpl_thread_pool *pool) {
for (zpl_usize i = 0; i < pool->max_threads; ++i) {
zpl_thread_worker *tw = pool->workers + i;
if (zpl_atomic32_load(&tw->status) != ZPL_JOBS_STATUS_WAITING) {
return false;
}
}
void zpl_jobs_enqueue(zpl_thread_pool *pool, zpl_jobs_proc proc, void *data) {
ZPL_ASSERT_NOT_NULL(proc);
zpl_jobs_enqueue_with_priority(pool, proc, data, 1.0f);
return zpl_jobs_empty_all(pool);
}
zpl_thread_local zpl_thread_pool *zpl__thread_pool;
ZPL_COMPARE_PROC(zpl___jobs_cmp) {
zpl_thread_job *p = (zpl_thread_job *)(zpl__thread_pool->jobs + *(zpl_u32 *)a);
zpl_thread_job *q = (zpl_thread_job *)(zpl__thread_pool->jobs + *(zpl_u32 *)b);
return cast(zpl_i32)(p->priority < q->priority ? 1.0f : p->priority > q->priority);
zpl_b32 zpl_jobs_empty_all(zpl_thread_pool *pool) {
for (zpl_usize i = 0; i < ZPL_JOBS_MAX_PRIORITIES; ++i) {
if (!zpl_jobs_empty(pool, (zpl_jobs_priority)i)) {
return false;
}
}
return true;
}
ZPL_COMPARE_PROC_PTR(zpl__jobs_cmp(zpl_thread_pool *pool)) {
zpl__thread_pool = pool;
return &zpl___jobs_cmp;
zpl_b32 zpl_jobs_full_all(zpl_thread_pool *pool) {
for (zpl_usize i = 0; i < ZPL_JOBS_MAX_PRIORITIES; ++i) {
if (!zpl_jobs_full(pool, (zpl_jobs_priority)i)) {
return false;
}
}
return true;
}
zpl_b32 zpl_jobs_process(zpl_thread_pool *pool) {
// NOTE: Sort the queue based on the job priority
if (zpl_array_count(pool->queue)) {
zpl_sort_array(pool->queue, zpl_array_count(pool->queue), zpl__jobs_cmp(pool));
if (zpl_jobs_empty_all(pool)) {
return false;
}
// NOTE: Process the jobs
for (zpl_usize i = 0; i < pool->max_threads; ++i) {
zpl_thread_worker *tw = pool->workers + i;
if (zpl_array_count(pool->queue) == 0) return false;
zpl_u32 status = zpl_atomic32_load(&tw->status);
zpl_b32 last_empty = false;
if (status == ZPL_JOBS_STATUS_WAITING) {
if (tw->jobid != ZPL_INVALID_JOB) { zpl_array_append(pool->available, tw->jobid); }
for (zpl_usize i = 0; i < ZPL_JOBS_MAX_PRIORITIES; ++i) {
zpl_thread_queue *q = &pool->queues[i];
if (zpl_jobs_empty(pool, (zpl_jobs_priority)i)) {
last_empty = true;
continue;
}
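/* Probabilistic round-robin: a queue only runs when counter hits a multiple of its prime "chance"
   (2,3,5,7,11 from realtime to idle), so higher priorities are picked more often; if the
   next-higher queue was empty, the chance check is bypassed. */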
if (!last_empty && ((pool->counter++ % q->chance) != 0)) {
continue;
}
zpl_u32 jobid = *pool->queue;
zpl_array_remove_at(pool->queue, 0);
tw->jobid = jobid;
last_empty = false;
tw->job = *zpl__jobs_ring_get(&q->jobs);
zpl_atomic32_store(&tw->status, ZPL_JOBS_STATUS_READY);
#ifdef ZPL_JOBS_DEBUG
++q->hits;
#endif
break;
}
}
}
@@ -17221,3 +17281,4 @@ ZPL_END_C_DECLS
// source/threading/affinity.c
// source/jobs.c
// source/coroutines.c