diff options
author | Nathanael Sensfelder <SpamShield0@MultiAgentSystems.org> | 2017-06-19 12:59:19 +0200 |
---|---|---|
committer | Nathanael Sensfelder <SpamShield0@MultiAgentSystems.org> | 2017-06-19 12:59:19 +0200 |
commit | c4798a34b7759d578ea48e80e6a7f1ece22ddbca (patch) | |
tree | 269e81dba880e05ef69d87d2008c977c0fd3f4f9 | |
parent | bf8606c7fc9720afe04eb403d967a352fa92bd0f (diff) | |
download | storage-c4798a34b7759d578ea48e80e6a7f1ece22ddbca.zip storage-c4798a34b7759d578ea48e80e6a7f1ece22ddbca.tar.bz2 |
Attempt at fixing incorrect timedwait usage.
-rw-r--r-- | src/CMakeLists.txt | 1 | ||||
-rw-r--r-- | src/pervasive.h | 8 | ||||
-rw-r--r-- | src/server/server.c | 31 | ||||
-rw-r--r-- | src/server/server_initialize.c | 15 | ||||
-rw-r--r-- | src/server/server_new_connection.c | 2 | ||||
-rw-r--r-- | src/server/server_types.h | 2 | ||||
-rw-r--r-- | src/server/server_worker.c | 18 | ||||
-rw-r--r-- | src/server/server_worker_data_merger.c | 17 | ||||
-rw-r--r-- | src/timespec/CMakeLists.txt | 7 | ||||
-rw-r--r-- | src/timespec/timespec.c | 70 | ||||
-rw-r--r-- | src/timespec/timespec.h | 26 |
11 files changed, 179 insertions, 18 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 0b242bc..f89d3e9 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -2,6 +2,7 @@ add_subdirectory(error) add_subdirectory(core) add_subdirectory(filter) add_subdirectory(server) +add_subdirectory(timespec) add_subdirectory(parameters) set( diff --git a/src/pervasive.h b/src/pervasive.h index 27d832d..b79f9f1 100644 --- a/src/pervasive.h +++ b/src/pervasive.h @@ -16,6 +16,14 @@ #define JH_DEBUG_ALL 0 #endif +#ifndef JH_DEBUG_SERVER_CONNECTIONS + #define JH_DEBUG_SERVER_CONNECTIONS (JH_DEBUG_ALL || 0) +#endif + +#ifndef JH_DEBUG_SERVER_TIMEOUTS + #define JH_DEBUG_SERVER_TIMEOUTS (JH_DEBUG_ALL || 0) +#endif + #define JH__TO_STRING(x) #x #define JH_TO_STRING(x) JH__TO_STRING(x) #define JH_ISOLATE(a) do {a} while (0) diff --git a/src/server/server.c b/src/server/server.c index 640584f..b045cc2 100644 --- a/src/server/server.c +++ b/src/server/server.c @@ -2,6 +2,7 @@ #include <string.h> #include <stdio.h> +#include "../pervasive.h" #include "../parameters/parameters.h" #include "server.h" @@ -15,12 +16,6 @@ int JH_server_main JH_index retries; retries = 0; - /* TODO - if (JH_server_set_signal_handlers < 0) - { - return -1; - } - */ if (JH_server_initialize(&server, params) < 0) { @@ -32,7 +27,7 @@ int JH_server_main switch (JH_server_wait_for_event(&server)) { case 0: /* Timed out or signal'd. */ - JH_S_DEBUG(stderr, 1, "Timed out..."); + JH_S_DEBUG(stderr, JH_DEBUG_SERVER_TIMEOUTS, "Timed out..."); JH_server_handle_joining_threads(&server); retries = 0; @@ -40,7 +35,7 @@ int JH_server_main break; case 1: /* New client attempted connection. */ - JH_S_DEBUG(stderr, 1, "New connection."); + JH_S_DEBUG(stderr, JH_DEBUG_SERVER_CONNECTIONS, "New connection."); JH_server_handle_joining_threads(&server); (void) JH_server_handle_new_connection(&server); @@ -51,6 +46,15 @@ int JH_server_main case -1: /* Something bad happened. */ retries += 1; + JH_WARNING + ( + stderr, + "The server had an issue while waiting for events to occur." + " This way try %d, out of %d.", + retries, + JH_SERVER_MAX_RETRIES + ); + if (retries == JH_SERVER_MAX_RETRIES) { JH_server_finalize(&server); @@ -71,6 +75,8 @@ int JH_server_main } } + retries = 0; + /* Waiting for the threads to join... */ while (server.workers.currently_running > 0) { @@ -84,6 +90,15 @@ int JH_server_main case -1: /* Something bad happened. */ retries += 1; + JH_WARNING + ( + stderr, + "The server had an issue while waiting for threads to join." + " This way try %d out of %d.", + retries, + JH_SERVER_MAX_RETRIES + ); + if (retries == JH_SERVER_MAX_RETRIES) { JH_server_finalize(&server); diff --git a/src/server/server_initialize.c b/src/server/server_initialize.c index 895a833..dd929f5 100644 --- a/src/server/server_initialize.c +++ b/src/server/server_initialize.c @@ -166,7 +166,7 @@ int JH_server_initialize if (initialize_worker_collection(&(server->workers), params) < 0) { - return -1; + return -2; } if @@ -178,12 +178,19 @@ int JH_server_initialize ) < 0 ) { - /* TODO: free "server->workers" */ + JH_server_finalize(server); - return -2; + return -3; } initialize_thread_parameters(server, params); - return JH_server_worker_data_merger_thread_init(server); + if (JH_server_worker_data_merger_thread_init(server) < 0) + { + JH_server_finalize(server); + + return -4; + } + + return 0; } diff --git a/src/server/server_new_connection.c b/src/server/server_new_connection.c index 0734249..023a855 100644 --- a/src/server/server_new_connection.c +++ b/src/server/server_new_connection.c @@ -103,6 +103,8 @@ static int get_new_thread (struct JH_server server [const restrict static 1]) "Reallocation of the threads' data list failed." ); + server->workers.threads_capacity -= 1; + pthread_mutex_unlock(&(server->workers.mutex)); return -1; diff --git a/src/server/server_types.h b/src/server/server_types.h index 3159ca5..7e3272f 100644 --- a/src/server/server_types.h +++ b/src/server/server_types.h @@ -24,6 +24,8 @@ #define JH_SERVER_WORKER_MAX_WAITING_TIME 5 +#define JH_SERVER_WORKER_DATA_MERGER_DELAY 5 + enum JH_server_thread_state { JH_SERVER_JOINING_THREAD, diff --git a/src/server/server_worker.c b/src/server/server_worker.c index faf8c32..1ec4e0b 100644 --- a/src/server/server_worker.c +++ b/src/server/server_worker.c @@ -121,7 +121,16 @@ static int initialize return -1; } - return connect_downstream(worker); + if (connect_downstream(worker) < 0) + { + fclose(worker->storage_file); + + worker->storage_file = (FILE *) NULL; + + return -1; + } + + return 0; } static void finalize @@ -221,7 +230,12 @@ void * JH_server_worker_main (void * input) struct JH_filter filter; struct JH_server_worker worker; - initialize(&worker, input); + if (initialize(&worker, input) < 0) + { + finalize(&worker); + + return NULL; + } if (JH_filter_initialize(&filter) < 0) { diff --git a/src/server/server_worker_data_merger.c b/src/server/server_worker_data_merger.c index a5f5bdf..c0df221 100644 --- a/src/server/server_worker_data_merger.c +++ b/src/server/server_worker_data_merger.c @@ -7,8 +7,11 @@ #include "../core/index.h" #include "../parameters/parameters.h" + #include "../error/error.h" +#include "../timespec/timespec.h" + #include "../filter/filter.h" #include "server.h" @@ -146,10 +149,11 @@ void * JH_server_worker_data_merger_main (void * input) int err; JH_index i; struct JH_server_worker worker; - struct timespec abstime; + struct timespec delay, current_time, target_time; + + memset((void *) &delay, 0, sizeof(struct timespec)); - memset((void *) &abstime, 0, sizeof(struct timespec)); - abstime.tv_sec = 5; + delay.tv_sec = JH_SERVER_WORKER_DATA_MERGER_DELAY; initialize(&worker, input); @@ -157,12 +161,17 @@ void * JH_server_worker_data_merger_main (void * input) while (JH_server_is_running()) { + /* Get current time */ + (void) clock_gettime(CLOCK_REALTIME, ¤t_time); + + JH_timespec_add(¤t_time, &delay, &target_time); + err = pthread_cond_timedwait ( &(worker.params.thread_collection->merger_condition), &(worker.params.thread_collection->merger_mutex), - &abstime + &target_time ); if (err == ETIMEDOUT) diff --git a/src/timespec/CMakeLists.txt b/src/timespec/CMakeLists.txt new file mode 100644 index 0000000..a84f996 --- /dev/null +++ b/src/timespec/CMakeLists.txt @@ -0,0 +1,7 @@ +set( + SRC_FILES ${SRC_FILES} + ${CMAKE_CURRENT_SOURCE_DIR}/timespec.c +) + +set(SRC_FILES ${SRC_FILES} PARENT_SCOPE) + diff --git a/src/timespec/timespec.c b/src/timespec/timespec.c new file mode 100644 index 0000000..5f9584f --- /dev/null +++ b/src/timespec/timespec.c @@ -0,0 +1,70 @@ +#include <time.h> + +#include "timespec.h" + +#define JH_NANOSECONDS_IN_A_SECOND 1000000000 + +/* FIXME NOT OVERFLOW SAFE */ +void JH_timespec_add +( + const struct timespec a [const restrict static 1], + const struct timespec b [const restrict static 1], + struct timespec result [const restrict static 1] +) +{ + long dist_to_sec; + result->tv_sec = (a->tv_sec + b->tv_sec); + + dist_to_sec = (JH_NANOSECONDS_IN_A_SECOND - b->tv_nsec); + + if (dist_to_sec <= a->tv_nsec) + { + result->tv_sec += 1; + result->tv_nsec = (a->tv_nsec - dist_to_sec); + } + else + { + result->tv_nsec = a->tv_nsec + b->tv_nsec; + } +} + +/* FIXME: NOT UNDERFLOW SAFE */ +void JH_timespec_sub +( + const struct timespec a [const restrict static 1], + const struct timespec b [const restrict static 1], + struct timespec result [const restrict static 1] +) +{ + result->tv_sec = (a->tv_sec - b->tv_sec); + + if (b->tv_nsec > a->tv_nsec) + { + result->tv_sec -= 1; + result->tv_nsec = + (JH_NANOSECONDS_IN_A_SECOND - (b->tv_nsec - a->tv_nsec)); + } + else + { + result->tv_nsec = a->tv_nsec - b->tv_nsec; + } +} + +int JH_timespec_greater_than +( + const struct timespec a [const restrict static 1], + const struct timespec b [const restrict static 1] +) +{ + if (a->tv_sec > b->tv_sec) + { + return 1; + } + + if (a->tv_sec < b->tv_sec) + { + return 0; + } + + return (a->tv_nsec > b->tv_nsec); +} diff --git a/src/timespec/timespec.h b/src/timespec/timespec.h new file mode 100644 index 0000000..3e4dd6c --- /dev/null +++ b/src/timespec/timespec.h @@ -0,0 +1,26 @@ +#ifndef _JH_TIMESPEC_H_ +#define _JH_TIMESPEC_H_ + +#include <time.h> + +void JH_timespec_add +( + const struct timespec a [const restrict static 1], + const struct timespec b [const restrict static 1], + struct timespec result [const restrict static 1] +); + +void JH_timespec_sub +( + const struct timespec a [const restrict static 1], + const struct timespec b [const restrict static 1], + struct timespec result [const restrict static 1] +); + +int JH_timespec_greater_than +( + const struct timespec a [const restrict static 1], + const struct timespec b [const restrict static 1] +); + +#endif |