diff --git a/include/core/jconfiguration.h b/include/core/jconfiguration.h index f65738e0d..8753e5119 100644 --- a/include/core/jconfiguration.h +++ b/include/core/jconfiguration.h @@ -30,6 +30,7 @@ #include #include +#include G_BEGIN_DECLS @@ -43,6 +44,14 @@ struct JConfiguration; typedef struct JConfiguration JConfiguration; +struct JStorageTier +{ + guint64 bandwidth; ///< bandwidth in bytes per second + guint64 latency; ///< latency in ns + guint64 capacity; ///< capacity in bytes +}; +typedef struct JStorageTier JStorageTier; + /** * Returns the configuration. * @@ -116,6 +125,10 @@ guint64 j_configuration_get_stripe_size(JConfiguration*); gchar const* j_configuration_get_checksum(JConfiguration*); +gchar const* j_configuration_get_object_policy_kv_backend(JConfiguration*); +gchar const* j_configuration_get_object_policy_kv_path(JConfiguration*); +gchar const* j_configuration_get_object_policy(JConfiguration*); +gchar const* const* j_configuration_get_object_policy_args(JConfiguration*); G_END_DECLS /** diff --git a/include/core/jhelper.h b/include/core/jhelper.h index b5656f787..9620de762 100644 --- a/include/core/jhelper.h +++ b/include/core/jhelper.h @@ -34,6 +34,69 @@ G_BEGIN_DECLS +/** + * Executes a command which returns true on success. + * On error a warning is written and execution jumps to err_label. The warning has the form: + * EXE failed at FILE:LINE with: + * the printf-style message built from the variadic arguments + * + * \param cmd command to execute + * \param err_label label to jump to, when failed + * \param ... warning message as printf-style format string followed by its arguments + **/ +#define EXE(cmd, err_label, ...) \ + do \ + { \ + if ((cmd) == FALSE) \ + { \ + g_warning("EXE failed at %s:%d with:", __FILE__, __LINE__); \ + g_warning(__VA_ARGS__); \ + goto err_label; \ + } \ + } while (FALSE) + +/** + * Checks if res is a non-negative number; if it is negative, jumps to err_label + * and prints a warning in the form: + * CHECK failed at FILE:LINE with (res): + * the printf-style message built from the variadic arguments + * + * \param res result value + * \param err_label label to jump to at error + * \param ...
warning message as printf-style format string followed by its arguments + **/ +#define CHECK(res, err_label, ...) \ + do \ + { \ + if (res < 0) \ + { \ + g_warning("CHECK failed at %s:%d with (%d):", __FILE__, __LINE__, res); \ + g_warning(__VA_ARGS__); \ + goto err_label; \ + } \ + } while (FALSE) + +/** + * Checks if error is a set GError; if so, prints a warning, frees the error and jumps to err_label. + * The warning has the form: + * CHECK failed at FILE:LINE with: + * the message of the GError + * + * \param error GError + * \param err_label label to jump to at error + * \param ... additional arguments; not used by the current macro body + **/ +#define CHECK_GERROR(error, err_label, ...) \ + do \ + { \ + if (error != NULL) \ + { \ + g_warning("CHECK failed at %s:%d with:\n\t%s", __FILE__, __LINE__, error->message); \ + g_error_free(error); \ + goto err_label; \ + } \ + } while (FALSE) + /** * \defgroup JHelper Helper * diff --git a/include/core/jmanagedbackends.h b/include/core/jmanagedbackends.h new file mode 100644 index 000000000..9b228b3cb --- /dev/null +++ b/include/core/jmanagedbackends.h @@ -0,0 +1,400 @@ +/** + * \file JManagedBackends.h + * \ingroup backend + * + * \copyright + * JULEA - Flexible storage framework + * Copyright (C) 2022 Julian Benda + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program. If not, see .
+ */ + +#ifndef JULEA_MANAGED_BACKENDS_H +#define JULEA_MANAGED_BACKENDS_H + +#if !defined(JULEA_H) && !defined(JULEA_COMPILATION) +#error "Only can be included directly" +#endif + +#include + +G_BEGIN_DECLS + +struct JConfiguration; +typedef struct JConfiguration JConfiguration; + +struct JList; +typedef struct JList JList; + +struct JBackend; +typedef struct JBackend JBackend; + +/** + * \defgroup JManagedBackends + * + * \todo rename to JManagedObjectBackends? + * + * A collection of multiple object backends, which distributes the objects + * controlled by a policy module. + * + * @{ + **/ + +/** + * \defgroup JStorageTier + * + * \todo evaluate characteristics + + * Collection of characteristics of a storage medium, used for distribution + * decisions by the policy. + * + * \attention all values contained in this may only be fixed values + * and as such are not measured! + * + * @{ + **/ + +struct JStorageTier; +typedef struct JStorageTier JStorageTier; + +/** + * Get estimated storage tier bandwidth in bytes per second. + * + * \param[in] this a storage tier + * + * \return mean of bandwidth in bytes per second + **/ +guint64 j_backend_storage_tier_get_bandwidth(const JStorageTier* this); + +/** Get estimated storage tier latency in us. + * + * \param[in] this a storage tier + * + * \return mean of latency in us + **/ +guint64 j_backend_storage_tier_get_latency(const JStorageTier* this); + +/** Get estimated total capacity of storage tier in bytes. + * + * \param[in] this a storage tier + * + * \return storage tier capacity in bytes + **/ +guint64 j_backend_storage_tier_get_capacity(const JStorageTier* this); + +/** + * @} + */ + +struct JManagedBackends; +typedef struct JManagedBackends JManagedBackends; + +/** + * Contains data about an ongoing access to an object.
+ **/ +struct JManagedBackendScope; +typedef struct JManagedBackendScope JManagedBackendScope; + +/** + * Possible access types to log for JObjectBackendPolicy, + * used to signal the usage of their counterpart j_object_backend_ functions + * \sa j_object_backend_policy_process_access + **/ +typedef enum +{ + J_OBJECT_ACCESS_WRITE, + J_OBJECT_ACCESS_READ, + J_OBJECT_ACCESS_DELETE, + J_OBJECT_ACCESS_SYNC, + J_OBJECT_ACCESS_STATUS +} JObjectBackendAccessTypes; + +/** + * Policy to distribute objects between object backends in a JManagedBackends. + * + * process_create, process_access and process_message are used + * to signal the policy on different kinds of events. + * The implementation of these functions should be minimalistic + * to reduce latency for the access, because these functions must terminate + * before the access can continue. + * + * process is executed in a separate thread to provide the resources necessary for + * complex calculations and executing migrations. + **/ +typedef struct +{ + /** + * Policy initializer. + * + * \param[out] policy_data a pointer to store address of policy internal data + * \param[in] args a list of strings passed from the configuration file + * which may be used to parametrise the policy. + * The list itself is NULL terminated + * \param[in] backends access to the object backend instances. + **/ + gboolean (*init)(gpointer* policy_data, gchar const* const* args, JManagedBackends* backends); + + /** + * \param[in] policy_data data at address assigned in init + * \attention do not forget to free your data if needed + **/ + gboolean (*fini)(gpointer policy_data); + + /** + * Function used to signal object access to the policy. + * + * \attention This function is called in the same thread as the access is + * executed, therefore this function should be + * handled as fast as possible to reduce access latency.
+ * + * \param[inout] policy_data pointer usage depends on policy + * \param[in] namespace of the object accessed + * \param[in] path of the object accessed + * \param[in] obj_id continuous index which is unique for the lifetime of an object + * \param[in] tier identifier for the storage tier + * the object is currently stored on + * \param[in] access type of access performed \sa JObjectBackendAccessTypes + * \param[in] data corresponding to the access type. + * For more details see \ref JObjectBackendAccessDetails + **/ + gboolean (*process_access)(gpointer policy_data, const gchar* namespace, const gchar* path, guint obj_id, guint tier, JObjectBackendAccessTypes access, gconstpointer data); + + /** + * Signals a new object creation to the policy, sets initial storage tier. + * + * \attention Ensure a short processing period because it blocks the write process + * + * \param[inout] policy_data pointer usage depends on policy + * \param[in] namespace of the object which will be created + * \param[in] path of the object which will be created + * \param[in] obj_id \ref process_access + * \param[out] storage_tier ID of storage tier to create object on + * + * \sa j_backend_managed_get_tiers + **/ + gboolean (*process_create)(gpointer policy_data, const gchar* namespace, const gchar* path, guint obj_id, guint* storage_tier); + + /** + * Handles messages sent to the policy. + * + * \todo add reply functionality + * + * \param[inout] policy_data pointer usage depends on policy + * \param[in] type message type encoded as 0-terminated string + * \param[inout] data optional message data + * \param[in] length size of data field + **/ + gboolean (*process_message)(gpointer policy_data, const gchar* type, gpointer data, guint length); + + /** + * Function used to pass a dedicated thread for migrations + * and calculations to policy.
+ **/ + gboolean (*process)(gpointer policy_data); + + gpointer data; ///< reference to data allocated in init +} JObjectBackendPolicy; + +JObjectBackendPolicy* backend_policy_info(void); + +/** + * \defgroup JObjectBackendAccessDetails + * + * Structs used to pass additional data on a + * JObjectBackendPolicy.process_access call + * + * @{ + * + **/ + +/** + * Additional data for a read and write access + **/ +struct JObjectBackendRWAccess +{ + guint64 offset; ///< access offset + guint64 length; ///< length of data to be written/read +}; + +/** + * @} + **/ + +/** + * \param[in] config with information about backend performance and policy to use + * \param[in] object_backends a list of instantiated object backends to use, in order as configured in config + * \param[out] instance_ptr address of initialized JManagedBackends + **/ +gboolean j_backend_managed_init(JConfiguration* config, JList* object_backends, JManagedBackends** instance_ptr); + +gboolean j_backend_managed_fini(JManagedBackends* this); + +/** + * Hands current thread to policy as processing resource + * \sa JObjectBackendPolicy + * + * \param[in] this JManagedBackends instance + * \param[out] keep_running signals if the policy processing function should be called again after return + * \todo more useful if different calling patterns get defined + **/ +gboolean j_backend_managed_policy_process(JManagedBackends* this, gboolean* keep_running); + +/** + * Fetches backend where a given object is stored. Also blocks migration for that object. + * Use j_backend_managed_object_close() to allow migration again. + * If the object does not already exist, use j_backend_managed_object_create() to create it + * + * \param[in] this JManagedBackends instance + * \param[in] namespace of the object + * \param[in] path of the object + * \param[out] backend where the object is stored + * \param[out] scope object to track object access, e.g. needed to unlock the object later.
+ **/ +gboolean j_backend_managed_object_open(JManagedBackends* this, const gchar* namespace, const gchar* path, JBackend** backend, JManagedBackendScope** scope); + +/** + * Creates an object placed on a backend defined by the current policy. + * + * \param[in] this JManagedBackends instance + * \param[in] namespace of the object + * \param[in] path of the object + * \param[out] backend where the object should be created + * \param[out] scope \ref j_backend_managed_object_open() + **/ +gboolean j_backend_managed_object_create(JManagedBackends* this, const gchar* namespace, const gchar* path, JBackend** backend, JManagedBackendScope** scope); + +/** + * Removes access flag from object. Needed to re-enable migration, after blocking with j_backend_managed_object_open() or j_backend_managed_object_create(). + **/ +gboolean j_backend_managed_object_close(JManagedBackendScope* scope); + +/** + * Sends a message to the policy. + * \todo add reply channel + * + * \param[in] this JManagedBackends instance + * \param[in] type message type encoded as 0-terminated string + * \param[inout] data optional message data + * \param[in] length size of data in bytes + **/ +gboolean j_backend_managed_policy_message(JManagedBackends* this, const gchar* type, gpointer data, guint length); + +/** + * Get storage tier characteristics, the position in the array corresponds to the tierID. + * \attention \ref JStorageTier + * + * \param[in] this JManagedBackends instance + * \param[inout] tiers array of storage tiers, set to NULL if you are only interested in the length. + * \param[out] length number of storage tiers and length of tiers, if not NULL + **/ +gboolean j_backend_managed_get_tiers(JManagedBackends* this, const JStorageTier* const** tiers, guint* length); + +/** + * Fetch tier the object is currently stored on.
+ * + * \param[in] this JManagedBackends instance + * \param[in] namespace of the object + * \param[in] path of the object + * + * \return tier id of tier the object is stored on + * \retval (guint)-1 on error + **/ +guint j_backend_managed_get_tier(JManagedBackends* this, const gchar* namespace, const gchar* path); + +/** + * Migrates an object from one tier to another. + * + * If the destination tier is equal to the current tier, nothing happens. + * + * The call blocks until all resources for the migration are available and the migration is finished. + * For returning immediately if the resources are busy use j_backend_managed_object_migrate_if_free() + * + * \param[in] this JManagedBackends instance + * \param[in] namespace of the object to migrate + * \param[in] path of the object to migrate + * \param[in] dest tier id of migration destination + * + * \todo add migration command working with object ids? + * \todo add non blocking version + schedule queued accesses + **/ +gboolean j_backend_managed_object_migrate(JManagedBackends* this, const gchar* namespace, const gchar* path, guint dest); + +/** + * If resources are free, migrates the specified object to the destination tier; otherwise returns immediately. + * + * \retval FALSE on error or if resources are busy + **/ +gboolean j_backend_managed_object_migrate_if_free(JManagedBackends* this, const gchar* namespace, const gchar* path, guint dest); + +/** + * Stops all migrations at all backends, used for maintenance. + * Use j_backend_managed_unlock() to re-enable migration + * + * \attention changing object locations or editing them might crash the policy! + * + * \param[in] this JManagedBackends instance + * \param[inout] backends address to store the array of managed backends, set to NULL if not needed + * \param[out] length number of managed backends, only written if backends is not NULL + * \todo remove for simpler code?
+ **/ +gboolean j_backend_managed_lock(JManagedBackends* this, JBackend*** backends, guint* length); + +/** + * Returns backend management to JManagedBackends + **/ +gboolean j_backend_managed_unlock(JManagedBackends* this); + +/** + * Iterate through all existing object paths in a namespace. + * + * \param[in] this JManagedBackends instance + * \param[in] namespace of objects to iterate + * \param[out] iterator used with j_backend_managed_iterate() to access object paths + * + * \sa j_backend_managed_iterate(), j_backend_managed_get_by_prefix() + **/ +gboolean j_backend_managed_get_all(JManagedBackends* this, gchar const* namespace, gpointer* iterator); + +/** + * Iterate all object paths in a namespace with a given prefix + * + * \param[in] this JManagedBackends instance + * \param[in] namespace of objects to iterate + * \param[in] prefix of objects to iterate + * \param[out] iterator used with j_backend_managed_iterate() to access object paths + * + * \sa j_backend_managed_get_all(), j_backend_managed_iterate() + **/ +gboolean j_backend_managed_get_by_prefix(JManagedBackends* this, gchar const* namespace, gchar const* prefix, gpointer* iterator); + +/** + * Advance iterator to get the next path. + * + * \attention Objects added or removed while iterating may invalidate the iterator.
This depends on the used kv-store-backend for the JManagedBackends + * + * \param[in] this JManagedBackends instance + * \param[inout] iterator + * \param[out] path of next object + * + * \sa j_backend_managed_get_all(), j_backend_managed_get_by_prefix() + **/ +gboolean j_backend_managed_iterate(JManagedBackends* this, gpointer iterator, gchar const** path); + +/** + * @} + **/ + +G_END_DECLS + +#endif diff --git a/lib/core/jconfiguration.c b/lib/core/jconfiguration.c index 3f3faf56a..a6710edb2 100644 --- a/lib/core/jconfiguration.c +++ b/lib/core/jconfiguration.c @@ -140,6 +140,14 @@ struct JConfiguration gchar* path; } db; + struct + { + gchar* policy; + gchar const* const* args; + gchar* kv_backend; + gchar* kv_path; + gchar const* log_file; + } object_hsm_policy; guint64 max_operation_size; guint64 max_inject_size; guint16 port; @@ -285,6 +293,10 @@ j_configuration_new_for_data(GKeyFile* key_file) gchar* db_component; gchar* db_path; g_autofree gchar* key_file_str = NULL; + gchar* object_policy_kv_backend; + gchar* object_policy_kv_path; + gchar* object_policy; + gchar** object_policy_args; guint64 max_operation_size; guint64 max_inject_size; guint32 port; @@ -310,6 +322,10 @@ j_configuration_new_for_data(GKeyFile* key_file) db_backend = g_key_file_get_string(key_file, "db", "backend", NULL); db_component = g_key_file_get_string(key_file, "db", "component", NULL); db_path = g_key_file_get_string(key_file, "db", "path", NULL); + object_policy_kv_backend = g_key_file_get_string(key_file, "object.hsm-policy", "kv_backend", NULL); + object_policy_kv_path = g_key_file_get_string(key_file, "object.hsm-policy", "kv_path", NULL); + object_policy = g_key_file_get_string(key_file, "object.hsm-policy", "policy", NULL); + object_policy_args = g_key_file_get_string_list(key_file, "object.hsm-policy", "args", NULL, NULL); /// \todo check value ranges (max_operation_size, port, max_connections, stripe_size) // configuration->port < 0 || configuration->port > 65535 @@ -339,6 
+355,10 @@ j_configuration_new_for_data(GKeyFile* key_file) g_strfreev(servers_object); g_strfreev(servers_kv); g_strfreev(servers_db); + g_free(object_policy_kv_backend); + g_free(object_policy_kv_path); + g_free(object_policy); + g_strfreev(object_policy_args); return NULL; } @@ -359,6 +379,12 @@ j_configuration_new_for_data(GKeyFile* key_file) configuration->db.backend = db_backend; configuration->db.component = db_component; configuration->db.path = db_path; + + configuration->object_hsm_policy.kv_backend = object_policy_kv_backend; + configuration->object_hsm_policy.kv_path = object_policy_kv_path; + configuration->object_hsm_policy.policy = object_policy; + configuration->object_hsm_policy.args = (const char* const*)object_policy_args; + configuration->max_operation_size = max_operation_size; configuration->port = port; configuration->max_inject_size = max_inject_size; @@ -439,6 +465,13 @@ j_configuration_unref(JConfiguration* configuration) g_free(configuration->checksum); + g_free(configuration->object_hsm_policy.kv_backend); + g_free(configuration->object_hsm_policy.kv_path); + g_free(configuration->object_hsm_policy.policy); +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wcast-qual" + g_strfreev((gchar**)configuration->object_hsm_policy.args); +#pragma GCC diagnostic pop g_slice_free(JConfiguration, configuration); } } @@ -616,6 +649,45 @@ j_configuration_get_checksum(JConfiguration* configuration) return configuration->checksum; } +gchar const* +j_configuration_get_object_policy_kv_backend(JConfiguration* configuration) +{ + J_TRACE_FUNCTION(NULL); + + g_return_val_if_fail(configuration != NULL, NULL); + + return configuration->object_hsm_policy.kv_backend; +} + +gchar const* +j_configuration_get_object_policy_kv_path(JConfiguration* configuration) +{ + J_TRACE_FUNCTION(NULL); + + g_return_val_if_fail(configuration != NULL, NULL); + + return configuration->object_hsm_policy.kv_path; +} + +gchar const* 
+j_configuration_get_object_policy(JConfiguration* configuration) +{ + J_TRACE_FUNCTION(NULL); + + g_return_val_if_fail(configuration != NULL, NULL); + + return configuration->object_hsm_policy.policy; +} + +gchar const* const* +j_configuration_get_object_policy_args(JConfiguration* configuration) +{ + J_TRACE_FUNCTION(NULL); + + g_return_val_if_fail(configuration != NULL, NULL); + + return configuration->object_hsm_policy.args; +} /** * @} **/ diff --git a/lib/core/jmanagedbackends.c b/lib/core/jmanagedbackends.c new file mode 100644 index 000000000..69135540b --- /dev/null +++ b/lib/core/jmanagedbackends.c @@ -0,0 +1,772 @@ +/** + * \file JManagedBackends.c + * \ingroup backend + * + * \copyright + * JULEA - Flexible storage framework + * Copyright (C) 2022 Julian Benda + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program. If not, see . 
+ **/ +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include +#include +#include + +/// a read-write spin lock to avoid data access on migration +typedef struct +{ + gint read_access_count; ///< number of read accesses + guint generation; ///< generation count, incremented each time a write lock is acquired + gint write_access; ///< true if a write operation is scheduled or running +} RWSpinLock; + +/// acquire a read lock +/** \public \memberof RWSpinLock + * \return data generation at the time the lock is acquired + * \sa read_unlock */ +guint read_lock(RWSpinLock* this); + +/// unlock a read lock +/** \public \memberof RWSpinLock + * \sa read_lock */ +void read_unlock(RWSpinLock* this); + +/// acquire a write lock +/** \public \memberof RWSpinLock + * this will increase the generation count + * \return generation before the increment + * \sa write_unlock*/ +guint write_lock(RWSpinLock* this); + +/// unlock a write lock +/** \public \memberof RWSpinLock + * \sa write_lock */ +void write_unlock(RWSpinLock* this); + +#define MAX_LOG_LENGTH 8192 +struct JManagedBackends +{ + struct JBackendWrapper** object_backend; + JStorageTier** object_tier_data; + guint32 object_backend_length; + + JSemantics* kv_semantics; + JBackend* kv_store; + GModule* kv_module; + + JObjectBackendPolicy* policy; + GModule* module; + + GArray* rw_spin_locks; + guint array_length; /// used as atomic to ensure unique ids for each object + + RWSpinLock global_lock; +}; + +struct JManagedBackendScope +{ + void* mem; + JManagedBackends* stack; + guint lock_id; + guint tier; /// \TODO include in StorageTier -> less redundancy + const gchar* namespace; + const gchar* path; + /// unsigned client_id; \TODO reenable?
+}; + +struct JBackendWrapper +{ + struct JBackend backend; + struct JBackend* orig; + struct JStorageTier* tier_data; + struct JManagedBackendScope scope; +}; + +struct KVEntry +{ + guint generation; + guint lock_id; + guint backend_id; +}; + +/// puts KVEntry in kv store. +/** \private \memberof JManagedBackends */ +gboolean kv_put(JManagedBackends* this, const gchar* namespace, const gchar* key, + struct KVEntry* entry); + +/// get KVEntry in kv store. +/** \private \memberof JManagedBackends */ +gboolean kv_get(JManagedBackends* this, const gchar* namespace, const gchar* key, + struct KVEntry** entry); + +/// rm KV entry in kv store. +/** \private \memberof JManagedBackends */ +gboolean kv_rm(JManagedBackends* this, const gchar* namespace, const gchar* key); + +#define OPERATION_DEBUG(obj_id, tier, LETTER) \ + g_debug("ACCESS time(%lu) type(%c) obj_id(%u) tier(%u)", g_get_monotonic_time(), LETTER, obj_id, tier) + +#define ACCESS(TYPE, DATA, LETTER) \ + OPERATION_DEBUG(this->scope.lock_id, this->scope.tier, LETTER); \ + this->scope.stack->policy->process_access( \ + this->scope.stack->policy->data, \ + this->scope.namespace, \ + this->scope.path, \ + this->scope.lock_id, \ + this->scope.tier, \ + J_OBJECT_ACCESS_##TYPE, \ + DATA) + +static gboolean +backend_status(gpointer this_raw, gpointer object, gint64* modification_time, guint64* size) +{ + J_TRACE_FUNCTION(NULL); + struct JBackendWrapper* this = (struct JBackendWrapper*)this_raw; + gboolean ret; + ACCESS(STATUS, NULL, 'o'); + + ret = this->orig->object.backend_status( + this->orig, + object, + modification_time, + size); + + return ret; +} +static gboolean +backend_sync(gpointer this_raw, gpointer object) +{ + J_TRACE_FUNCTION(NULL); + struct JBackendWrapper* this = (struct JBackendWrapper*)this_raw; + gboolean ret; + + ACCESS(SYNC, NULL, 'o'); + + ret = this->orig->object.backend_sync(this->orig, object); + + return ret; +} + +static gboolean +backend_read(gpointer this_raw, gpointer object, gpointer 
buffer, guint64 length, guint64 offset, guint64* bytes_read) +{ + J_TRACE_FUNCTION(NULL); + struct JBackendWrapper* this = (struct JBackendWrapper*)this_raw; + gboolean ret; + struct JObjectBackendRWAccess data = { + .length = length, + .offset = offset, + }; + + ACCESS(READ, &data, 'r'); + + ret = this->orig->object.backend_read(this->orig, object, buffer, length, offset, bytes_read); + + return ret; +} + +static gboolean +backend_write(gpointer this_raw, gpointer object, gconstpointer buffer, guint64 length, guint64 offset, guint64* bytes_written) +{ + J_TRACE_FUNCTION(NULL); + struct JBackendWrapper* this = (struct JBackendWrapper*)this_raw; + gboolean ret; + struct JObjectBackendRWAccess data = { + .length = length, + .offset = offset + }; + + ACCESS(WRITE, &data, 'w'); + + ret = this->orig->object.backend_write(this->orig, object, buffer, length, offset, bytes_written); + + return ret; +} + +static gboolean +backend_delete(gpointer this_raw, gpointer object) +{ + J_TRACE_FUNCTION(NULL); + struct JBackendWrapper* this = (struct JBackendWrapper*)this_raw; + gboolean ret; + + ACCESS(DELETE, NULL, 'd'); + + /// \todo reuse lock + kv_rm(this->scope.stack, this->scope.namespace, this->scope.path); + ret = this->orig->object.backend_delete(this->orig, object); + + return ret; +} + +guint +read_lock(RWSpinLock* this) +{ + while (true) + { + while (this->write_access) + ; + g_atomic_int_inc(&this->read_access_count); + if (this->write_access) + { + g_atomic_int_dec_and_test(&this->read_access_count); + } + else + { + return this->generation; + } + } +} + +void +read_unlock(RWSpinLock* this) +{ + g_atomic_int_dec_and_test(&this->read_access_count); +} + +guint +write_lock(RWSpinLock* this) +{ + while (!g_atomic_int_compare_and_exchange(&this->write_access, 0, 1)) + ; + while (this->read_access_count) + ; + return this->generation++; +} + +void +write_unlock(RWSpinLock* this) +{ + g_atomic_int_set(&this->write_access, 0); +} + +/// proxy function +gchar const* const* 
j_configuration_get_object_tiers(JConfiguration* config); +gchar const* const* +j_configuration_get_object_tiers(JConfiguration* config) +{ + (void)config; + return NULL; +} + +gboolean +j_backend_managed_init(JConfiguration* config, JList* object_backends, JManagedBackends** instance_ptr) +{ + JListIterator* itr; + gchar const* const* tier_itr; + JManagedBackends* this; + struct JBackendWrapper** b_itr; + struct JStorageTier** t_itr; + const gchar* policy_name = j_configuration_get_object_policy(config); + char const* const* policy_args = j_configuration_get_object_policy_args(config); + JObjectBackendPolicy* (*module_backend_policy_info)(void) = NULL; + JObjectBackendPolicy* tmp_policy; + *instance_ptr = malloc(sizeof(JManagedBackends)); + this = *instance_ptr; + + // init spin lock array + this->rw_spin_locks = g_array_sized_new(false, false, sizeof(RWSpinLock), 100); + this->array_length = 0; + + // setup backends + this->object_backend_length = j_list_length(object_backends); + this->object_backend = malloc(sizeof(struct JBackendWrapper*) * this->object_backend_length); + this->object_tier_data = malloc(sizeof(JStorageTier*) * this->object_backend_length); + itr = j_list_iterator_new(object_backends); + tier_itr = j_configuration_get_object_tiers(config); + b_itr = this->object_backend; + t_itr = this->object_tier_data; + while (j_list_iterator_next(itr)) + { + *b_itr = malloc(sizeof(struct JBackendWrapper)); + memcpy(*b_itr, j_list_iterator_get(itr), sizeof(JBackend)); + (*b_itr)->backend.object.backend_write = backend_write; + (*b_itr)->backend.object.backend_read = backend_read; + (*b_itr)->backend.object.backend_status = backend_status; + (*b_itr)->backend.object.backend_sync = backend_sync; + (*b_itr)->backend.object.backend_delete = backend_delete; + (*b_itr)->orig = j_list_iterator_get(itr); + (*b_itr)->scope = (struct JManagedBackendScope){ 0 }; + (*b_itr)->tier_data = malloc(sizeof(JStorageTier)); + if (tier_itr && *tier_itr) + 
memcpy((*b_itr)->tier_data, tier_itr++, sizeof(JStorageTier)); + else + memset((*b_itr)->tier_data, 0, sizeof(JStorageTier)); + + *t_itr = (*b_itr)->tier_data; + ++t_itr; + ++b_itr; + } + g_assert_true(tier_itr == FALSE || *tier_itr == FALSE); + j_list_iterator_free(itr); + + // load policy + this->module = NULL; + { + const gchar* module_name = g_strdup_printf("policy-object-%s", policy_name); + gchar* path = g_module_build_path(JULEA_BACKEND_PATH, module_name); + this->module = g_module_open(path, G_MODULE_BIND_LOCAL); + if (this->module == NULL) + { + g_warning("Could not load policy module: %s.", path); + goto end; + } + g_free(path); + } + g_module_symbol(this->module, "backend_policy_info", (gpointer*)&module_backend_policy_info); + if (module_backend_policy_info == NULL) + { + g_warning("unable to find entry point in backend policy module!"); + goto end; + } + tmp_policy = module_backend_policy_info(); + if (tmp_policy == NULL) + { + g_warning("failed to get policy info!"); + goto end; + } + if (tmp_policy->process_create == NULL + || tmp_policy->process_message == NULL + || tmp_policy->process_access == NULL + || tmp_policy->init == NULL + || tmp_policy->process == NULL) + { + g_warning("Policy don't provide all functions necessary"); + goto end; + } + this->policy = malloc(sizeof(JObjectBackendPolicy)); + memcpy(this->policy, tmp_policy, sizeof(JObjectBackendPolicy)); + + /// \todo use helper list + this->policy->init(&this->policy->data, policy_args, this); + + // setup kv + EXE(j_backend_load_server(j_configuration_get_object_policy_kv_backend(config), + "server", + J_BACKEND_TYPE_KV, + &this->kv_module, + &this->kv_store), + end, + "failed to create kv for backend manager!"); + EXE(this->kv_store->kv.backend_init(j_configuration_get_object_policy_kv_path(config), + &this->kv_store->data), + end, + "failed to init kv for backend manager!"); + this->kv_semantics = j_semantics_new(J_SEMANTICS_TEMPLATE_DEFAULT); + + return TRUE; +end: + return FALSE; +} + 
+gboolean +j_backend_managed_fini(JManagedBackends* this) +{ + this->policy->fini(this->policy->data); + free(this->policy); + free(this->object_backend); + g_array_unref(this->rw_spin_locks); + g_module_close(this->module); + j_backend_kv_fini(this->kv_store); + free(this); + + return TRUE; +} + +/// \TODO optimize for only one backend +/// \TODO remove scope return <- just complete virtual +gboolean +j_backend_managed_object_open(JManagedBackends* this, + const gchar* namespace, + const gchar* path, + JBackend** backend, + JManagedBackendScope** scope) +{ + struct KVEntry* entry; + struct JBackendWrapper* wrapper; + gboolean ret = FALSE; + guint generation; + + if (!kv_get(this, namespace, path, &entry)) + { + goto end; + } + + generation = read_lock(&g_array_index(this->rw_spin_locks, RWSpinLock, entry->lock_id)); + if (generation != entry->generation) + { + EXE(kv_get(this, namespace, path, &entry), end, "failed to fetch kvEntry for open"); + } + + *backend = g_memdup2(this->object_backend[entry->backend_id], sizeof(struct JBackendWrapper)); + wrapper = (void*)*backend; + wrapper->scope = (JManagedBackendScope){ + .mem = wrapper, + .lock_id = entry->lock_id, + .tier = entry->backend_id, + .stack = this, + .namespace = namespace, + .path = path, + }; + *scope = &wrapper->scope; + ret = TRUE; +end: + return ret; +} + +gboolean +j_backend_managed_object_create(JManagedBackends* this, + const gchar* namespace, + const gchar* path, + JBackend** backend, + JManagedBackendScope** scope) +{ + gboolean ret = FALSE; + struct JBackendWrapper* wrapper; + struct KVEntry entry; + guint lock_id; + guint tier; + RWSpinLock newLock; + + // allocate new spinlock with read lock + newLock = (RWSpinLock){ .generation = 0, .read_access_count = 1, .write_access = 0 }; + g_array_append_val(this->rw_spin_locks, newLock); + lock_id = g_atomic_int_add(&this->array_length, 1); + + // ask policy for tier + EXE(this->policy->process_create(this->policy->data, namespace, path, lock_id, 
&tier), + end, + "failed to match storage tier for new object!"); + *backend = g_memdup2(this->object_backend[tier], sizeof(struct JBackendWrapper)); + wrapper = (void*)*backend; + + entry = (struct KVEntry){ + .lock_id = lock_id, + .backend_id = tier, + .generation = 0 + }; + + EXE(kv_put(this, namespace, path, &entry), + end, + "failed to store new object in kv"); + + wrapper->scope = (JManagedBackendScope){ + .mem = wrapper, + .lock_id = entry.lock_id, + .stack = this, + .tier = entry.backend_id, + .namespace = namespace, + .path = path + }; + *scope = &wrapper->scope; + + OPERATION_DEBUG(wrapper->scope.lock_id, wrapper->scope.tier, 'c'); + + ret = TRUE; +end: + return ret; +} + +gboolean +j_backend_managed_object_close(JManagedBackendScope* this) +{ + read_unlock(&g_array_index(this->stack->rw_spin_locks, RWSpinLock, this->lock_id)); + free(this->mem); + return TRUE; +} + +gboolean +j_backend_managed_policy_message(JManagedBackends* this, + const gchar* type, gpointer data, guint length) +{ + if (this->policy->process_message) + { + return this->policy->process_message(this->policy->data, type, data, length); + } + return TRUE; +} + +gboolean +j_backend_managed_get_tiers(JManagedBackends* this, JStorageTier const* const** tiers, guint* length) +{ + if (tiers) + { + *tiers = (JStorageTier const* const*)this->object_tier_data; + } + *length = this->object_backend_length; + return TRUE; +} + +guint +j_backend_managed_get_tier(JManagedBackends* this, + const gchar* namespace, const gchar* path) +{ + struct KVEntry* entry; + EXE(kv_get(this, namespace, path, &entry), + end, + "failed to fetch kventry"); + return entry->backend_id; +end: + return -1; +} + +gboolean +kv_put(JManagedBackends* this, const gchar* namespace, const gchar* key, + struct KVEntry* entry) +{ + gpointer batch; + EXE(j_backend_kv_batch_start(this->kv_store, namespace, this->kv_semantics, &batch), + end, + "failed do start kv batch"); + EXE(j_backend_kv_put(this->kv_store, batch, key, entry, 
sizeof(struct KVEntry)), + end, + "failed batch put value command"); + EXE(j_backend_kv_batch_execute(this->kv_store, batch), + end, + "failed to execute put value batch"); + return TRUE; +end: + return FALSE; +} + +gboolean +kv_get(JManagedBackends* this, const gchar* namespace, const gchar* key, + struct KVEntry** entry) +{ + gpointer batch; + guint32 len; + gboolean ret; + EXE(j_backend_kv_batch_start(this->kv_store, namespace, this->kv_semantics, &batch), + end, + "failed do start kv batch"); + ret = j_backend_kv_get(this->kv_store, batch, key, (gpointer*)entry, &len); + EXE(j_backend_kv_batch_execute(this->kv_store, batch), + end, + "failed to execute get value batch"); +#ifndef NDEBUG + if (len != sizeof(struct KVEntry)) + { + ret = FALSE; + } +#endif + return ret; +end: + return FALSE; +} + +gboolean +kv_rm(JManagedBackends* this, const gchar* namespace, const gchar* key) +{ + gpointer batch; + gboolean ret = FALSE; + + EXE(j_backend_kv_batch_start(this->kv_store, namespace, this->kv_semantics, &batch), + end, + "failed to start kv backend"); + ret = j_backend_kv_delete(this->kv_store, batch, key); + EXE(j_backend_kv_batch_execute(this->kv_store, batch), + end, + "failed to execute delete kv value"); +end: + return ret; +} + +gboolean +j_backend_managed_lock(JManagedBackends* this, + JBackend*** backends, + guint* length) +{ + write_lock(&this->global_lock); + if (backends != NULL) + { + *backends = (void*)this->object_backend; + *length = this->object_backend_length; + } + return TRUE; +} + +/// returns management to stack +gboolean +j_backend_managed_unlock(JManagedBackends* this) +{ + (void)this; + write_unlock(&this->global_lock); + return TRUE; +} + +gboolean +j_backend_managed_get_all(JManagedBackends* this, gchar const* namespace, gpointer* iterator) +{ + J_TRACE_FUNCTION(NULL); + + g_return_val_if_fail(this != NULL, FALSE); + + return j_backend_kv_get_all(this->kv_store, namespace, iterator); +} + +gboolean 
+j_backend_managed_get_by_prefix(JManagedBackends* this, gchar const* namespace, gchar const* prefix, gpointer* iterator) +{ + J_TRACE_FUNCTION(NULL); + + g_return_val_if_fail(this != NULL, FALSE); + + return j_backend_kv_get_by_prefix(this->kv_store, namespace, prefix, iterator); +} + +gboolean +j_backend_managed_iterate(JManagedBackends* this, gpointer iterator, gchar const** name) +{ + guint32 length; + gconstpointer data; + + J_TRACE_FUNCTION(NULL); + + g_return_val_if_fail(this != NULL, FALSE); + + return j_backend_kv_iterate(this->kv_store, iterator, name, &data, &length); +} + +guint64 +j_backend_storage_tier_get_bandwidth(const JStorageTier* this) +{ + return this->bandwidth; +} + +guint64 +j_backend_storage_tier_get_latency(const JStorageTier* this) +{ + return this->latency; +} + +guint64 +j_backend_storage_tier_get_capacity(const JStorageTier* this) +{ + return this->capacity; +} + +gboolean +j_backend_managed_object_migrate(JManagedBackends* this, + const gchar* namespace, + const gchar* path, + guint dest) +{ + struct KVEntry* entry; + struct KVEntry new_entry; + gpointer object_from, object_to; + gboolean ret = FALSE; + gboolean lock = FALSE; + JBackend* from; + JBackend* to = this->object_backend[dest]->orig; + gpointer data = NULL; + guint generation; + + read_lock(&this->global_lock); + EXE(kv_get(this, namespace, path, &entry), end, "Unable to migrate, because entry not found!"); + if (entry->backend_id == dest) + { + ret = TRUE; + goto end; + } + generation = write_lock(&g_array_index(this->rw_spin_locks, RWSpinLock, entry->lock_id)); + lock = TRUE; + if (generation != entry->generation) + { + EXE(kv_get(this, namespace, path, &entry), end, "failed open kvEntry again for migration"); + } + from = this->object_backend[entry->backend_id]->orig; + OPERATION_DEBUG(entry->lock_id, dest, 'm'); + + { + const guint64 max_chunk_size = 1024 * 1024 * 64; ///< 64MB max chunk size for transfer \todo make configurable + guint64 size; + gint64 mod_time; + 
guint64 offset = 0; + if (!j_backend_object_open(from, namespace, path, &object_from)) + { + /// \todo remove data (object was deleted!) + ret = TRUE; + goto end; + } + EXE(j_backend_object_status(from, object_from, &mod_time, &size), + end, + "Failed to get object size"); + data = malloc(MIN(size, max_chunk_size)); + + EXE(j_backend_object_create(to, namespace, path, &object_to), + end, + "Failed to create new object"); + while (size > 0) + { + guint64 written, transfer = MIN(size, max_chunk_size); + EXE(j_backend_object_read(from, object_from, data, transfer, offset, &written) && transfer == written, + end, + "Failed to read object for transmission"); + EXE(j_backend_object_write(to, object_to, data, transfer, offset, &written) && written == transfer, + end, + "Failed to write migrated object"); + size -= written; + offset += written; + } + + EXE(j_backend_object_delete(from, object_from), + end, + "Failed to delete original object"); + EXE(j_backend_object_close(to, object_to), + end, + "Failed to close new object"); + } + + new_entry = *entry; + ++new_entry.generation; + new_entry.backend_id = dest; + EXE(kv_put(this, namespace, path, &new_entry), end, "failed to store new migrated entry"); + ret = TRUE; +end: + + if (data != NULL) + free(data); + if (lock) + { + write_unlock(&g_array_index(this->rw_spin_locks, RWSpinLock, entry->lock_id)); + } + read_unlock(&this->global_lock); + return ret; +} + +gboolean +j_backend_managed_policy_process(JManagedBackends* this, gboolean* keep_running) +{ + gboolean ret = TRUE; + g_message("start process"); + do + { + ret &= this->policy->process(this->policy->data); + } while (ret && *keep_running); + return ret; +} diff --git a/lib/core/jnetwork.c b/lib/core/jnetwork.c index eb72951e9..1d0fffa6d 100644 --- a/lib/core/jnetwork.c +++ b/lib/core/jnetwork.c @@ -36,8 +36,8 @@ #include #include -#include #include +#include /** * \addtogroup JNetwork Network @@ -192,37 +192,6 @@ enum JNetworkConnectionEvents typedef enum 
JNetworkConnectionEvents JNetworkConnectionEvents; -#define EXE(cmd, ...) \ - do \ - { \ - if ((cmd) == FALSE) \ - { \ - g_warning(__VA_ARGS__); \ - goto end; \ - } \ - } while (FALSE) - -#define CHECK(msg) \ - do \ - { \ - if (res < 0) \ - { \ - g_warning("%s: " msg "\t(%s:%d)\nDetails:\t%s", "??TODO??", __FILE__, __LINE__, fi_strerror(-res)); \ - goto end; \ - } \ - } while (FALSE) - -#define G_CHECK(msg) \ - do \ - { \ - if (error != NULL) \ - { \ - g_warning(msg "\n\tWith:%s", error->message); \ - g_error_free(error); \ - goto end; \ - } \ - } while (FALSE) - static void free_dangling_infos(struct fi_info* info) { @@ -272,7 +241,7 @@ j_network_connection_sread_event(JNetworkConnection* connection, gint timeout, J do { res = fi_eq_readerr(connection->eq, &error, 0); - CHECK("Failed to read error!"); + CHECK(res, end, "Failed to read error!"); g_warning("event queue contains following error (%s|c:%p):\n\t%s", fi_strerror(FI_EAVAIL), error.context, fi_eq_strerror(connection->eq, error.prov_errno, error.err_data, NULL, 0)); } while (res > 0); @@ -280,7 +249,7 @@ j_network_connection_sread_event(JNetworkConnection* connection, gint timeout, J goto end; } - CHECK("Failed to read event of connection!"); + CHECK(res, end, "Failed to read event of connection!"); switch (fi_event) { @@ -395,7 +364,7 @@ j_network_fabric_sread_event(JNetworkFabric* fabric, gint timeout, JNetworkFabri *event = J_FABRIC_EVENT_ERROR; res = fi_eq_readerr(fabric->pep_eq, &error, 0); - CHECK("Failed to read error!"); + CHECK(res, end, "Failed to read error!"); g_warning("event queue contains following error (%s|c:%p):\n\t%s", fi_strerror(FI_EAVAIL), error.context, fi_eq_strerror(fabric->pep_eq, error.prov_errno, error.err_data, NULL, 0)); @@ -403,7 +372,7 @@ j_network_fabric_sread_event(JNetworkFabric* fabric, gint timeout, JNetworkFabri goto end; } - CHECK("failed to read pep event queue!"); + CHECK(res, end, "failed to read pep event queue!"); switch (fi_event) { @@ -450,40 +419,40 @@ 
j_network_fabric_init_server(JConfiguration* configuration) hints->fabric_attr->prov_name = NULL; res = fi_getinfo(FI_VERSION(1, 11), NULL, NULL, 0, hints, &fabric->info); - CHECK("Failed to find fabric for server!"); + CHECK(res, end, "Failed to find fabric for server!"); free_dangling_infos(fabric->info); // no context needed, because we are in sync res = fi_fabric(fabric->info->fabric_attr, &fabric->fabric, NULL); - CHECK("Failed to open server fabric!"); + CHECK(res, end, "Failed to open server fabric!"); // wait obj defined to use synced actions res = fi_eq_open(fabric->fabric, &(struct fi_eq_attr){ .wait_obj = FI_WAIT_UNSPEC }, &fabric->pep_eq, NULL); - CHECK("failed to create eq for fabric!"); + CHECK(res, end, "failed to create eq for fabric!"); res = fi_passive_ep(fabric->fabric, fabric->info, &fabric->pep, NULL); - CHECK("failed to create pep for fabric!"); + CHECK(res, end, "failed to create pep for fabric!"); res = fi_pep_bind(fabric->pep, &fabric->pep_eq->fid, 0); - CHECK("failed to bind event queue to pep!"); + CHECK(res, end, "failed to bind event queue to pep!"); // initialize addr field! 
res = fi_getname(&fabric->pep->fid, NULL, &addrlen); if (res != -FI_ETOOSMALL) { - CHECK("failed to fetch address len from libfabirc!"); + CHECK(res, end, "failed to fetch address len from libfabirc!"); } fabric->fabric_addr_network.addr_len = addrlen; fabric->fabric_addr_network.addr = g_malloc(fabric->fabric_addr_network.addr_len); res = fi_getname(&fabric->pep->fid, fabric->fabric_addr_network.addr, &addrlen); - CHECK("failed to fetch address from libfabric!"); + CHECK(res, end, "failed to fetch address from libfabric!"); res = fi_listen(fabric->pep); - CHECK("failed to start listening on pep!"); + CHECK(res, end, "failed to start listening on pep!"); fabric->fabric_addr_network.addr_len = htonl(fabric->fabric_addr_network.addr_len); fabric->fabric_addr_network.addr_format = htonl(fabric->info->addr_format); @@ -538,12 +507,12 @@ j_network_fabric_init_client(JConfiguration* configuration, JNetworkFabricAddr* fabric->hints->dest_addrlen = addr->addr_len; res = fi_getinfo(FI_VERSION(1, 11), NULL, NULL, 0, fabric->hints, &fabric->info); - CHECK("Failed to find fabric!"); + CHECK(res, end, "Failed to find fabric!"); free_dangling_infos(fabric->info); res = fi_fabric(fabric->info->fabric_attr, &fabric->fabric, NULL); - CHECK("failed to initelize client fabric!"); + CHECK(res, end, "failed to initelize client fabric!"); return fabric; @@ -574,16 +543,16 @@ j_network_fabric_fini(JNetworkFabric* fabric) if (fabric->con_side == JF_SERVER) { res = fi_close(&fabric->pep->fid); - CHECK("Failed to close PEP!"); + CHECK(res, end, "Failed to close PEP!"); fabric->pep = NULL; res = fi_close(&fabric->pep_eq->fid); - CHECK("Failed to close EQ for PEP!"); + CHECK(res, end, "Failed to close EQ for PEP!"); fabric->pep_eq = NULL; } res = fi_close(&fabric->fabric->fid); - CHECK("failed to close fabric!"); + CHECK(res, end, "failed to close fabric!"); fabric->fabric = NULL; if (fabric->hints) @@ -647,7 +616,7 @@ j_network_connection_create_memory_resources(JNetworkConnection* 
connection) connection->memory.tx_prefix_size = tx_prefix * prefix_size; res = fi_mr_reg(connection->domain, connection->memory.buffer, connection->memory.buffer_size, FI_SEND | FI_RECV, 0, 0, 0, &connection->memory.mr, NULL); - CHECK("Failed to register memory for msg communication!"); + CHECK(res, end, "Failed to register memory for msg communication!"); } else { @@ -687,33 +656,33 @@ j_network_connection_init(JNetworkConnection* connection) connection->next_key = KEY_MIN; res = fi_eq_open(connection->fabric->fabric, &(struct fi_eq_attr){ .wait_obj = FI_WAIT_UNSPEC }, &connection->eq, NULL); - CHECK("Failed to open event queue for connection!"); + CHECK(res, end, "Failed to open event queue for connection!"); res = fi_domain(connection->fabric->fabric, connection->info, &connection->domain, NULL); - CHECK("Failed to open connection domain!"); + CHECK(res, end, "Failed to open connection domain!"); connection->inject_size = connection->fabric->info->tx_attr->inject_size; - EXE(j_network_connection_create_memory_resources(connection), "Failed to create memory resources for connection!"); + EXE(j_network_connection_create_memory_resources(connection), end, "Failed to create memory resources for connection!"); res = fi_cq_open(connection->domain, &(struct fi_cq_attr){ .wait_obj = FI_WAIT_UNSPEC, .format = FI_CQ_FORMAT_CONTEXT, .size = connection->info->tx_attr->size }, &connection->cq.tx, &connection->cq.tx); res = fi_cq_open(connection->domain, &(struct fi_cq_attr){ .wait_obj = FI_WAIT_UNSPEC, .format = FI_CQ_FORMAT_CONTEXT, .size = connection->info->rx_attr->size }, &connection->cq.rx, &connection->cq.rx); - CHECK("Failed to create completion queue!"); + CHECK(res, end, "Failed to create completion queue!"); res = fi_endpoint(connection->domain, connection->info, &connection->ep, NULL); - CHECK("Failed to open endpoint for connection!"); + CHECK(res, end, "Failed to open endpoint for connection!"); res = fi_ep_bind(connection->ep, &connection->eq->fid, 0); - 
CHECK("Failed to bind event queue to endpoint!"); + CHECK(res, end, "Failed to bind event queue to endpoint!"); res = fi_ep_bind(connection->ep, &connection->cq.tx->fid, FI_TRANSMIT); - CHECK("Failed to bind tx completion queue to endpoint!"); + CHECK(res, end, "Failed to bind tx completion queue to endpoint!"); res = fi_ep_bind(connection->ep, &connection->cq.rx->fid, FI_RECV); - CHECK("Failed to bind rx completion queue to endpoint!"); + CHECK(res, end, "Failed to bind rx completion queue to endpoint!"); res = fi_enable(connection->ep); - CHECK("Failed to enable connection!"); + CHECK(res, end, "Failed to enable connection!"); connection->closed = FALSE; @@ -745,7 +714,7 @@ j_network_connection_init_client(JConfiguration* configuration, JBackendType bac socket_client = g_socket_client_new(); server = j_configuration_get_server(configuration, backend, index); socket_connection = g_socket_client_connect_to_host(socket_client, server, j_configuration_get_port(configuration), NULL, &error); - G_CHECK("Failed to build gsocket connection to host"); + CHECK_GERROR(error, end, "Failed to build gsocket connection to host"); if (socket_connection == NULL) { @@ -758,22 +727,22 @@ j_network_connection_init_client(JConfiguration* configuration, JBackendType bac input_stream = g_io_stream_get_input_stream(G_IO_STREAM(socket_connection)); g_input_stream_read(input_stream, &jf_addr.addr_format, sizeof(jf_addr.addr_format), NULL, &error); - G_CHECK("Failed to read addr format from socket_connection!"); + CHECK_GERROR(error, end, "Failed to read addr format from socket_connection!"); jf_addr.addr_format = ntohl(jf_addr.addr_format); g_input_stream_read(input_stream, &jf_addr.addr_len, sizeof(jf_addr.addr_len), NULL, &error); - G_CHECK("Failed to read addr len from socket_connection!"); + CHECK_GERROR(error, end, "Failed to read addr len from socket_connection!"); jf_addr.addr_len = ntohl(jf_addr.addr_len); jf_addr.addr = g_malloc(jf_addr.addr_len); 
g_input_stream_read(input_stream, jf_addr.addr, jf_addr.addr_len, NULL, &error); - G_CHECK("Failed to read addr from socket_connection!"); + CHECK_GERROR(error, end, "Failed to read addr from socket_connection!"); g_input_stream_close(input_stream, NULL, &error); - G_CHECK("Failed to close input stream!"); + CHECK_GERROR(error, end, "Failed to close input stream!"); g_io_stream_close(G_IO_STREAM(socket_connection), NULL, &error); - G_CHECK("Failed to close gsocket!"); + CHECK_GERROR(error, end, "Failed to close gsocket!"); connection->fabric = j_network_fabric_init_client(configuration, &jf_addr); @@ -785,14 +754,14 @@ j_network_connection_init_client(JConfiguration* configuration, JBackendType bac connection->info = connection->fabric->info; - EXE(j_network_connection_init(connection), "Failed to initelze connection!"); + EXE(j_network_connection_init(connection), end, "Failed to initelze connection!"); res = fi_connect(connection->ep, jf_addr.addr, NULL, 0); - CHECK("Failed to fire connection request!"); + CHECK(res, end, "Failed to fire connection request!"); do { - EXE(j_network_connection_sread_event(connection, 1, &event), "Failed to read event queue, waiting for CONNECTED signal!"); + EXE(j_network_connection_sread_event(connection, 1, &event), end, "Failed to read event queue, waiting for CONNECTED signal!"); } while (event == J_CONNECTION_EVENT_TIMEOUT); if (event != J_CONNECTION_EVENT_CONNECTED) @@ -829,20 +798,20 @@ j_network_connection_init_server(JNetworkFabric* fabric, GSocketConnection* gcon // send addr output_stream = g_io_stream_get_output_stream(G_IO_STREAM(gconnection)); g_output_stream_write(output_stream, &addr->addr_format, sizeof(addr->addr_format), NULL, &error); - G_CHECK("Failed to write addr_format to stream!"); + CHECK_GERROR(error, end, "Failed to write addr_format to stream!"); g_output_stream_write(output_stream, &addr->addr_len, sizeof(addr->addr_len), NULL, &error); - G_CHECK("Failed to write addr_len to stream!"); + 
CHECK_GERROR(error, end, "Failed to write addr_len to stream!"); g_output_stream_write(output_stream, addr->addr, ntohl(addr->addr_len), NULL, &error); - G_CHECK("Failed to write addr to stream!"); + CHECK_GERROR(error, end, "Failed to write addr to stream!"); g_output_stream_close(output_stream, NULL, &error); - G_CHECK("Failed to close output stream!"); + CHECK_GERROR(error, end, "Failed to close output stream!"); do { - EXE(j_network_fabric_sread_event(fabric, 2, &event, &request), "Failed to wait for connection request"); + EXE(j_network_fabric_sread_event(fabric, 2, &event, &request), end, "Failed to wait for connection request"); } while (event == J_FABRIC_EVENT_TIMEOUT); if (event != J_FABRIC_EVENT_CONNECTION_REQUEST) @@ -854,12 +823,12 @@ j_network_connection_init_server(JNetworkFabric* fabric, GSocketConnection* gcon connection->fabric = fabric; connection->info = request.info; - EXE(j_network_connection_init(connection), "Failed to initelize connection server side!"); + EXE(j_network_connection_init(connection), end, "Failed to initelize connection server side!"); res = fi_accept(connection->ep, NULL, 0); - CHECK("Failed to accept connection!"); + CHECK(res, end, "Failed to accept connection!"); - EXE(j_network_connection_sread_event(connection, 2, &con_event), "Failed to verify connection!"); + EXE(j_network_connection_sread_event(connection, 2, &con_event), end, "Failed to verify connection!"); if (con_event != J_CONNECTION_EVENT_CONNECTED) { @@ -894,7 +863,7 @@ j_network_connection_send(JNetworkConnection* connection, gpointer data, gsize d res = fi_inject(connection->ep, data, data_len, 0); } while (res == -FI_EAGAIN); - CHECK("Failed to inject data!"); + CHECK(res, end, "Failed to inject data!"); ret = TRUE; goto end; @@ -914,7 +883,7 @@ j_network_connection_send(JNetworkConnection* connection, gpointer data, gsize d res = fi_send(connection->ep, segment, size, fi_mr_desc(connection->memory.mr), 0, context); } while (res == -FI_EAGAIN); - 
CHECK("Failed to initelize sending!"); + CHECK(res, end, "Failed to initelize sending!"); connection->memory.used += size; @@ -958,7 +927,7 @@ j_network_connection_recv(JNetworkConnection* connection, gsize data_len, gpoint size = data_len + connection->memory.rx_prefix_size; res = fi_recv(connection->ep, segment, size, connection->memory.active ? fi_mr_desc(connection->memory.mr) : NULL, 0, segment); - CHECK("Failed to initelized receiving!"); + CHECK(res, end, "Failed to initelized receiving!"); if (connection->memory.active) { @@ -1037,7 +1006,7 @@ j_network_connection_wait_for_completion(JNetworkConnection* connection) } res = fi_cq_readerr(rx ? connection->cq.rx : connection->cq.tx, &err_entry, 0); - CHECK("Failed to read error of cq!"); + CHECK(res, end, "Failed to read error of cq!"); g_warning("Failed to read completion queue\nWidth:\t%s", fi_cq_strerror(rx ? connection->cq.rx : connection->cq.tx, err_entry.prov_errno, err_entry.err_data, NULL, 0)); @@ -1045,7 +1014,7 @@ j_network_connection_wait_for_completion(JNetworkConnection* connection) } else { - CHECK("Failed to read completion queue!"); + CHECK(res, end, "Failed to read completion queue!"); } for (i = 0; i <= connection->running_actions.msg_len; i++) @@ -1054,7 +1023,7 @@ j_network_connection_wait_for_completion(JNetworkConnection* connection) { // If there is no match -> it's an rma transafer -> context = memory region res = fi_close(&((struct fid_mr*)entry.op_context)->fid); - CHECK("Failed to free receiving memory!"); + CHECK(res, end, "Failed to free receiving memory!"); connection->running_actions.rma_len--; } @@ -1093,7 +1062,7 @@ j_network_connection_rma_register(JNetworkConnection* connection, gconstpointer gint res; res = fi_mr_reg(connection->domain, data, data_len, FI_REMOTE_READ, 0, connection->next_key, 0, &handle->memory_region, NULL); - CHECK("Failed to register memory region!"); + CHECK(res, end, "Failed to register memory region!"); handle->addr = 
(connection->info->domain_attr->mr_mode & FI_MR_VIRT_ADDR) ? (guint64)data : 0; handle->size = data_len; @@ -1115,7 +1084,7 @@ j_network_connection_rma_unregister(JNetworkConnection* connection, JNetworkConn connection->next_key = KEY_MIN; res = fi_close(&handle->memory_region->fid); - CHECK("Failed to unregistrer rma memory!"); + CHECK(res, end, "Failed to unregistrer rma memory!"); return TRUE; @@ -1149,7 +1118,7 @@ j_network_connection_rma_read(JNetworkConnection* connection, const JNetworkConn static unsigned key = 0; res = fi_mr_reg(connection->domain, data, memoryID->size, FI_READ, 0, ++key, 0, &mr, 0); - CHECK("Failed to register receiving memory!"); + CHECK(res, end, "Failed to register receiving memory!"); do { @@ -1162,7 +1131,7 @@ j_network_connection_rma_read(JNetworkConnection* connection, const JNetworkConn } } while (res == -FI_EAGAIN); - CHECK("Failed to initiate reading"); + CHECK(res, end, "Failed to initiate reading"); connection->running_actions.rma_len++; @@ -1187,29 +1156,29 @@ j_network_connection_fini(JNetworkConnection* connection) gint res; res = fi_shutdown(connection->ep, 0); - CHECK("failed to send shutdown signal"); + CHECK(res, end, "failed to send shutdown signal"); if (connection->memory.active) { res = fi_close(&connection->memory.mr->fid); - CHECK("failed to free memory region!"); + CHECK(res, end, "failed to free memory region!"); g_free(connection->memory.buffer); } res = fi_close(&connection->ep->fid); - CHECK("failed to close endpoint!"); + CHECK(res, end, "failed to close endpoint!"); res = fi_close(&connection->cq.tx->fid); - CHECK("failed to close tx cq!"); + CHECK(res, end, "failed to close tx cq!"); res = fi_close(&connection->cq.rx->fid); - CHECK("failed to close rx cq!"); + CHECK(res, end, "failed to close rx cq!"); res = fi_close(&connection->eq->fid); - CHECK("failed to close event queue!"); + CHECK(res, end, "failed to close event queue!"); res = fi_close(&connection->domain->fid); - CHECK("failed to close domain!"); 
+ CHECK(res, end, "failed to close domain!"); if (connection->fabric->con_side == JF_CLIENT) { diff --git a/meson.build b/meson.build index db2a56675..3d3462a38 100644 --- a/meson.build +++ b/meson.build @@ -366,6 +366,7 @@ julea_srcs = files([ 'lib/core/distribution/single-server.c', 'lib/core/distribution/weighted.c', 'lib/core/jbackend.c', + 'lib/core/jmanagedbackends.c', 'lib/core/jbackend-operation.c', 'lib/core/jbackground-operation.c', 'lib/core/jbatch.c', @@ -758,6 +759,7 @@ endif julea_client_hdrs = { 'core': files([ 'include/core/jbackend.h', + 'include/core/jmanagedbackends.h', 'include/core/jbackend-operation.h', 'include/core/jbackground-operation.h', 'include/core/jbatch.h', diff --git a/test/core/configuration.c b/test/core/configuration.c index ff3c90635..7eab9981d 100644 --- a/test/core/configuration.c +++ b/test/core/configuration.c @@ -61,6 +61,9 @@ test_configuration_new_for_data(void) g_key_file_set_string(key_file, "db", "backend", "null"); g_key_file_set_string(key_file, "db", "component", "server"); g_key_file_set_string(key_file, "db", "path", ""); + g_key_file_set_string(key_file, "object.hsm-policy", "kv_backend", "null"); + g_key_file_set_string(key_file, "object.hsm-policy", "kv_path", "null"); + g_key_file_set_string(key_file, "object.hsm-policy", "policy", "null"); configuration = j_configuration_new_for_data(key_file); g_assert_true(configuration != NULL); @@ -93,6 +96,9 @@ test_configuration_get(void) g_key_file_set_string(key_file, "db", "backend", "null3"); g_key_file_set_string(key_file, "db", "component", "client"); g_key_file_set_string(key_file, "db", "path", "NULL3"); + g_key_file_set_string(key_file, "object.hsm-policy", "kv_backend", "NULL4"); + g_key_file_set_string(key_file, "object.hsm-policy", "kv_path", "NULL5"); + g_key_file_set_string(key_file, "object.hsm-policy", "policy", "NULL6"); configuration = j_configuration_new_for_data(key_file); g_assert_true(configuration != NULL); @@ -120,6 +126,10 @@ 
test_configuration_get(void) g_assert_cmpstr(j_configuration_get_backend_component(configuration, J_BACKEND_TYPE_DB), ==, "client"); g_assert_cmpstr(j_configuration_get_backend_path(configuration, J_BACKEND_TYPE_DB), ==, "NULL3"); + g_assert_cmpstr(j_configuration_get_object_policy_kv_backend(configuration), ==, "NULL4"); + g_assert_cmpstr(j_configuration_get_object_policy_kv_path(configuration), ==, "NULL5"); + g_assert_cmpstr(j_configuration_get_object_policy(configuration), ==, "NULL6"); + j_configuration_unref(configuration); g_key_file_free(key_file); diff --git a/test/core/distribution.c b/test/core/distribution.c index f6a5a85e8..4ff67256e 100644 --- a/test/core/distribution.c +++ b/test/core/distribution.c @@ -45,6 +45,9 @@ test_distribution_fixture_setup(JConfiguration** configuration, gconstpointer da g_key_file_set_string(key_file, "db", "backend", "null"); g_key_file_set_string(key_file, "db", "component", "server"); g_key_file_set_string(key_file, "db", "path", ""); + g_key_file_set_string(key_file, "object.hsm-policy", "kv_backend", "lmdb"); + g_key_file_set_string(key_file, "object.hsm-policy", "kv_path", ""); + g_key_file_set_string(key_file, "object.hsm-policy", "policy", "dummy"); *configuration = j_configuration_new_for_data(key_file); diff --git a/tools/config.c b/tools/config.c index 3ec1e6c7c..fd79e487d 100644 --- a/tools/config.c +++ b/tools/config.c @@ -46,6 +46,10 @@ static gint64 opt_max_inject_size = 0; static gint opt_port = 0; static gint opt_max_connections = 0; static gint64 opt_stripe_size = 0; +static gchar const* opt_object_policy_kv_backend = NULL; +static gchar const* opt_object_policy_kv_path = NULL; +static gchar const* opt_object_policy = NULL; +static gchar const* opt_object_policy_args = NULL; static gchar** string_split(gchar const* string) @@ -100,10 +104,15 @@ write_config(gchar* path) g_auto(GStrv) servers_object = NULL; g_auto(GStrv) servers_kv = NULL; g_auto(GStrv) servers_db = NULL; + g_auto(GStrv) object_policy_args 
= NULL; servers_object = string_split(opt_servers_object); servers_kv = string_split(opt_servers_kv); servers_db = string_split(opt_servers_db); + if (opt_object_policy_args) + { + object_policy_args = string_split(opt_object_policy_args); + } key_file = g_key_file_new(); g_key_file_set_int64(key_file, "core", "max-operation-size", opt_max_operation_size); @@ -123,6 +132,22 @@ write_config(gchar* path) g_key_file_set_string(key_file, "db", "backend", opt_db_backend); g_key_file_set_string(key_file, "db", "component", opt_db_component); g_key_file_set_string(key_file, "db", "path", opt_db_path); + if (opt_object_policy_kv_backend) + { + g_key_file_set_string(key_file, "object.hsm-policy", "kv_backend", opt_object_policy_kv_backend); + } + if (opt_object_policy_kv_path) + { + g_key_file_set_string(key_file, "object.hsm-policy", "kv_path", opt_object_policy_kv_path); + } + if (opt_object_policy) + { + g_key_file_set_string(key_file, "object.hsm-policy", "policy", opt_object_policy); + } + if (opt_object_policy_args) + { + g_key_file_set_string_list(key_file, "object.hsm-policy", "args", (gchar const* const*)object_policy_args, g_strv_length(object_policy_args)); + } key_file_data = g_key_file_to_data(key_file, &key_file_data_len, NULL); if (path != NULL) @@ -173,6 +198,10 @@ main(gint argc, gchar** argv) { "port", 0, 0, G_OPTION_ARG_INT, &opt_port, "Default network port", "0" }, { "max-connections", 0, 0, G_OPTION_ARG_INT, &opt_max_connections, "Maximum number of connections", "0" }, { "stripe-size", 0, 0, G_OPTION_ARG_INT64, &opt_stripe_size, "Default stripe size", "0" }, + { "object-policy-kv-backend", 0, 0, G_OPTION_ARG_STRING, &opt_object_policy_kv_backend, "Key-value backend to use managed object backends.", "leveldb" }, + { "object-policy-kv_path", 0, 0, G_OPTION_ARG_STRING, &opt_object_policy_kv_path, "Key-value path to use", "/path/to/storage" }, + { "object-policy", 0, 0, G_OPTION_ARG_STRING, &opt_object_policy, "Policy for managed object backends", "dummy" 
}, + { "object-policy-args", 0, 0, G_OPTION_ARG_STRING, &opt_object_policy_args, "Arguments passed to policy for initialisation", "arg1;arg2;arg3;" }, { NULL, 0, 0, 0, NULL, NULL, NULL } };