aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNathanael Sensfelder <SpamShield0@MultiAgentSystems.org>2017-06-19 12:59:19 +0200
committerNathanael Sensfelder <SpamShield0@MultiAgentSystems.org>2017-06-19 12:59:19 +0200
commitc4798a34b7759d578ea48e80e6a7f1ece22ddbca (patch)
tree269e81dba880e05ef69d87d2008c977c0fd3f4f9
parentbf8606c7fc9720afe04eb403d967a352fa92bd0f (diff)
downloadstorage-c4798a34b7759d578ea48e80e6a7f1ece22ddbca.zip
storage-c4798a34b7759d578ea48e80e6a7f1ece22ddbca.tar.bz2
Attempt at fixing incorrect timedwait usage.
-rw-r--r--src/CMakeLists.txt1
-rw-r--r--src/pervasive.h8
-rw-r--r--src/server/server.c31
-rw-r--r--src/server/server_initialize.c15
-rw-r--r--src/server/server_new_connection.c2
-rw-r--r--src/server/server_types.h2
-rw-r--r--src/server/server_worker.c18
-rw-r--r--src/server/server_worker_data_merger.c17
-rw-r--r--src/timespec/CMakeLists.txt7
-rw-r--r--src/timespec/timespec.c70
-rw-r--r--src/timespec/timespec.h26
11 files changed, 179 insertions, 18 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 0b242bc..f89d3e9 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -2,6 +2,7 @@ add_subdirectory(error)
add_subdirectory(core)
add_subdirectory(filter)
add_subdirectory(server)
+add_subdirectory(timespec)
add_subdirectory(parameters)
set(
diff --git a/src/pervasive.h b/src/pervasive.h
index 27d832d..b79f9f1 100644
--- a/src/pervasive.h
+++ b/src/pervasive.h
@@ -16,6 +16,14 @@
#define JH_DEBUG_ALL 0
#endif
+#ifndef JH_DEBUG_SERVER_CONNECTIONS
+ #define JH_DEBUG_SERVER_CONNECTIONS (JH_DEBUG_ALL || 0)
+#endif
+
+#ifndef JH_DEBUG_SERVER_TIMEOUTS
+ #define JH_DEBUG_SERVER_TIMEOUTS (JH_DEBUG_ALL || 0)
+#endif
+
#define JH__TO_STRING(x) #x
#define JH_TO_STRING(x) JH__TO_STRING(x)
#define JH_ISOLATE(a) do {a} while (0)
diff --git a/src/server/server.c b/src/server/server.c
index 640584f..b045cc2 100644
--- a/src/server/server.c
+++ b/src/server/server.c
@@ -2,6 +2,7 @@
#include <string.h>
#include <stdio.h>
+#include "../pervasive.h"
#include "../parameters/parameters.h"
#include "server.h"
@@ -15,12 +16,6 @@ int JH_server_main
JH_index retries;
retries = 0;
- /* TODO
- if (JH_server_set_signal_handlers < 0)
- {
- return -1;
- }
- */
if (JH_server_initialize(&server, params) < 0)
{
@@ -32,7 +27,7 @@ int JH_server_main
switch (JH_server_wait_for_event(&server))
{
case 0: /* Timed out or signal'd. */
- JH_S_DEBUG(stderr, 1, "Timed out...");
+ JH_S_DEBUG(stderr, JH_DEBUG_SERVER_TIMEOUTS, "Timed out...");
JH_server_handle_joining_threads(&server);
retries = 0;
@@ -40,7 +35,7 @@ int JH_server_main
break;
case 1: /* New client attempted connection. */
- JH_S_DEBUG(stderr, 1, "New connection.");
+ JH_S_DEBUG(stderr, JH_DEBUG_SERVER_CONNECTIONS, "New connection.");
JH_server_handle_joining_threads(&server);
(void) JH_server_handle_new_connection(&server);
@@ -51,6 +46,15 @@ int JH_server_main
case -1: /* Something bad happened. */
retries += 1;
+ JH_WARNING
+ (
+ stderr,
+ "The server had an issue while waiting for events to occur."
+ " This way try %d, out of %d.",
+ retries,
+ JH_SERVER_MAX_RETRIES
+ );
+
if (retries == JH_SERVER_MAX_RETRIES)
{
JH_server_finalize(&server);
@@ -71,6 +75,8 @@ int JH_server_main
}
}
+ retries = 0;
+
/* Waiting for the threads to join... */
while (server.workers.currently_running > 0)
{
@@ -84,6 +90,15 @@ int JH_server_main
case -1: /* Something bad happened. */
retries += 1;
+ JH_WARNING
+ (
+ stderr,
+ "The server had an issue while waiting for threads to join."
+ " This way try %d out of %d.",
+ retries,
+ JH_SERVER_MAX_RETRIES
+ );
+
if (retries == JH_SERVER_MAX_RETRIES)
{
JH_server_finalize(&server);
diff --git a/src/server/server_initialize.c b/src/server/server_initialize.c
index 895a833..dd929f5 100644
--- a/src/server/server_initialize.c
+++ b/src/server/server_initialize.c
@@ -166,7 +166,7 @@ int JH_server_initialize
if (initialize_worker_collection(&(server->workers), params) < 0)
{
- return -1;
+ return -2;
}
if
@@ -178,12 +178,19 @@ int JH_server_initialize
) < 0
)
{
- /* TODO: free "server->workers" */
+ JH_server_finalize(server);
- return -2;
+ return -3;
}
initialize_thread_parameters(server, params);
- return JH_server_worker_data_merger_thread_init(server);
+ if (JH_server_worker_data_merger_thread_init(server) < 0)
+ {
+ JH_server_finalize(server);
+
+ return -4;
+ }
+
+ return 0;
}
diff --git a/src/server/server_new_connection.c b/src/server/server_new_connection.c
index 0734249..023a855 100644
--- a/src/server/server_new_connection.c
+++ b/src/server/server_new_connection.c
@@ -103,6 +103,8 @@ static int get_new_thread (struct JH_server server [const restrict static 1])
"Reallocation of the threads' data list failed."
);
+ server->workers.threads_capacity -= 1;
+
pthread_mutex_unlock(&(server->workers.mutex));
return -1;
diff --git a/src/server/server_types.h b/src/server/server_types.h
index 3159ca5..7e3272f 100644
--- a/src/server/server_types.h
+++ b/src/server/server_types.h
@@ -24,6 +24,8 @@
#define JH_SERVER_WORKER_MAX_WAITING_TIME 5
+#define JH_SERVER_WORKER_DATA_MERGER_DELAY 5
+
enum JH_server_thread_state
{
JH_SERVER_JOINING_THREAD,
diff --git a/src/server/server_worker.c b/src/server/server_worker.c
index faf8c32..1ec4e0b 100644
--- a/src/server/server_worker.c
+++ b/src/server/server_worker.c
@@ -121,7 +121,16 @@ static int initialize
return -1;
}
- return connect_downstream(worker);
+ if (connect_downstream(worker) < 0)
+ {
+ fclose(worker->storage_file);
+
+ worker->storage_file = (FILE *) NULL;
+
+ return -1;
+ }
+
+ return 0;
}
static void finalize
@@ -221,7 +230,12 @@ void * JH_server_worker_main (void * input)
struct JH_filter filter;
struct JH_server_worker worker;
- initialize(&worker, input);
+ if (initialize(&worker, input) < 0)
+ {
+ finalize(&worker);
+
+ return NULL;
+ }
if (JH_filter_initialize(&filter) < 0)
{
diff --git a/src/server/server_worker_data_merger.c b/src/server/server_worker_data_merger.c
index a5f5bdf..c0df221 100644
--- a/src/server/server_worker_data_merger.c
+++ b/src/server/server_worker_data_merger.c
@@ -7,8 +7,11 @@
#include "../core/index.h"
#include "../parameters/parameters.h"
+
#include "../error/error.h"
+#include "../timespec/timespec.h"
+
#include "../filter/filter.h"
#include "server.h"
@@ -146,10 +149,11 @@ void * JH_server_worker_data_merger_main (void * input)
int err;
JH_index i;
struct JH_server_worker worker;
- struct timespec abstime;
+ struct timespec delay, current_time, target_time;
+
+ memset((void *) &delay, 0, sizeof(struct timespec));
- memset((void *) &abstime, 0, sizeof(struct timespec));
- abstime.tv_sec = 5;
+ delay.tv_sec = JH_SERVER_WORKER_DATA_MERGER_DELAY;
initialize(&worker, input);
@@ -157,12 +161,17 @@ void * JH_server_worker_data_merger_main (void * input)
while (JH_server_is_running())
{
+ /* Get current time */
+ (void) clock_gettime(CLOCK_REALTIME, &current_time);
+
+ JH_timespec_add(&current_time, &delay, &target_time);
+
err =
pthread_cond_timedwait
(
&(worker.params.thread_collection->merger_condition),
&(worker.params.thread_collection->merger_mutex),
- &abstime
+ &target_time
);
if (err == ETIMEDOUT)
diff --git a/src/timespec/CMakeLists.txt b/src/timespec/CMakeLists.txt
new file mode 100644
index 0000000..a84f996
--- /dev/null
+++ b/src/timespec/CMakeLists.txt
@@ -0,0 +1,7 @@
+set(
+ SRC_FILES ${SRC_FILES}
+ ${CMAKE_CURRENT_SOURCE_DIR}/timespec.c
+)
+
+set(SRC_FILES ${SRC_FILES} PARENT_SCOPE)
+
diff --git a/src/timespec/timespec.c b/src/timespec/timespec.c
new file mode 100644
index 0000000..5f9584f
--- /dev/null
+++ b/src/timespec/timespec.c
@@ -0,0 +1,70 @@
+#include <time.h>
+
+#include "timespec.h"
+
+#define JH_NANOSECONDS_IN_A_SECOND 1000000000
+
+/* FIXME NOT OVERFLOW SAFE */
+void JH_timespec_add
+(
+ const struct timespec a [const restrict static 1],
+ const struct timespec b [const restrict static 1],
+ struct timespec result [const restrict static 1]
+)
+{
+ long dist_to_sec;
+ result->tv_sec = (a->tv_sec + b->tv_sec);
+
+ dist_to_sec = (JH_NANOSECONDS_IN_A_SECOND - b->tv_nsec);
+
+ if (dist_to_sec <= a->tv_nsec)
+ {
+ result->tv_sec += 1;
+ result->tv_nsec = (a->tv_nsec - dist_to_sec);
+ }
+ else
+ {
+ result->tv_nsec = a->tv_nsec + b->tv_nsec;
+ }
+}
+
+/* FIXME: NOT UNDERFLOW SAFE */
+void JH_timespec_sub
+(
+ const struct timespec a [const restrict static 1],
+ const struct timespec b [const restrict static 1],
+ struct timespec result [const restrict static 1]
+)
+{
+ result->tv_sec = (a->tv_sec - b->tv_sec);
+
+ if (b->tv_nsec > a->tv_nsec)
+ {
+ result->tv_sec -= 1;
+ result->tv_nsec =
+ (JH_NANOSECONDS_IN_A_SECOND - (b->tv_nsec - a->tv_nsec));
+ }
+ else
+ {
+ result->tv_nsec = a->tv_nsec - b->tv_nsec;
+ }
+}
+
+int JH_timespec_greater_than
+(
+ const struct timespec a [const restrict static 1],
+ const struct timespec b [const restrict static 1]
+)
+{
+ if (a->tv_sec > b->tv_sec)
+ {
+ return 1;
+ }
+
+ if (a->tv_sec < b->tv_sec)
+ {
+ return 0;
+ }
+
+ return (a->tv_nsec > b->tv_nsec);
+}
diff --git a/src/timespec/timespec.h b/src/timespec/timespec.h
new file mode 100644
index 0000000..3e4dd6c
--- /dev/null
+++ b/src/timespec/timespec.h
@@ -0,0 +1,26 @@
+#ifndef _JH_TIMESPEC_H_
+#define _JH_TIMESPEC_H_
+
+#include <time.h>
+
+void JH_timespec_add
+(
+ const struct timespec a [const restrict static 1],
+ const struct timespec b [const restrict static 1],
+ struct timespec result [const restrict static 1]
+);
+
+void JH_timespec_sub
+(
+ const struct timespec a [const restrict static 1],
+ const struct timespec b [const restrict static 1],
+ struct timespec result [const restrict static 1]
+);
+
+int JH_timespec_greater_than
+(
+ const struct timespec a [const restrict static 1],
+ const struct timespec b [const restrict static 1]
+);
+
+#endif