36 #include <sys/types.h> 37 #include <sys/socket.h> 39 #include <sys/ioctl.h> 40 #include <netinet/in.h> 48 #include <arpa/inet.h> 55 #include <qb/qbipc_common.h> 63 #define MESSAGE_REQ_SYNC_BARRIER 0 64 #define MESSAGE_REQ_SYNC_SERVICE_BUILD 1 80 const unsigned int *trans_list,
81 size_t trans_list_entries,
82 const unsigned int *member_list,
83 size_t member_list_entries,
98 struct qb_ipc_request_header header __attribute__((aligned(8)));
105 struct qb_ipc_request_header header __attribute__((aligned(8)));
106 struct memb_ring_id ring_id __attribute__((aligned(8)));
113 static int my_processing_idx = 0;
123 static size_t my_member_list_entries = 0;
125 static size_t my_trans_list_entries = 0;
127 static int my_processor_list_entries = 0;
129 static struct service_entry my_service_list[SERVICES_COUNT_MAX];
131 static int my_service_list_entries = 0;
133 static void (*sync_synchronization_completed) (void);
135 static void sync_deliver_fn (
138 unsigned int msg_len,
139 int endian_conversion_required);
141 static int schedwrk_processor (
const void *context);
143 static void sync_process_enter (
void);
145 static void sync_process_call_init (
void);
152 static void *sync_group_handle;
159 int (*sync_callbacks_retrieve) (
162 void (*synchronization_completed) (
void))
172 "Couldn't initialize groups interface.");
185 sync_synchronization_completed = synchronization_completed;
191 static void sync_barrier_handler (
unsigned int nodeid,
const void *msg)
195 int barrier_reached = 1;
197 if (memcmp (&my_ring_id, &req_exec_barrier_message->ring_id,
203 for (i = 0; i < my_processor_list_entries; i++) {
204 if (my_processor_list[i].nodeid == nodeid) {
208 for (i = 0; i < my_processor_list_entries; i++) {
209 if (my_processor_list[i].received == 0) {
213 if (barrier_reached) {
215 my_service_list[my_processing_idx].
name);
222 my_processing_idx += 1;
223 if (my_service_list_entries == my_processing_idx) {
224 sync_synchronization_completed ();
226 sync_process_enter ();
231 static void dummy_sync_abort (
void)
235 static int dummy_sync_process (
void)
240 static void dummy_sync_activate (
void)
244 static int service_entry_compare (
const void *a,
const void *b)
252 static void sync_service_build_handler (
unsigned int nodeid,
const void *msg)
256 int barrier_reached = 1;
258 int qsort_trigger = 0;
260 if (memcmp (&my_ring_id, &req_exec_service_build_message->ring_id,
265 for (i = 0; i < req_exec_service_build_message->service_list_entries; i++) {
268 for (j = 0; j < my_service_list_entries; j++) {
269 if (req_exec_service_build_message->service_list[i] ==
276 my_service_list[my_service_list_entries].
state =
PROCESS;
277 my_service_list[my_service_list_entries].
service_id =
278 req_exec_service_build_message->service_list[i];
279 sprintf (my_service_list[my_service_list_entries].
name,
280 "Unknown External Service (id = %d)\n",
281 req_exec_service_build_message->service_list[i]);
282 my_service_list[my_service_list_entries].
sync_init =
284 my_service_list[my_service_list_entries].
sync_abort =
290 my_service_list_entries += 1;
296 qsort (my_service_list, my_service_list_entries,
299 for (i = 0; i < my_processor_list_entries; i++) {
300 if (my_processor_list[i].nodeid == nodeid) {
304 for (i = 0; i < my_processor_list_entries; i++) {
305 if (my_processor_list[i].received == 0) {
309 if (barrier_reached) {
311 sync_process_enter ();
315 static void sync_deliver_fn (
318 unsigned int msg_len,
319 int endian_conversion_required)
321 struct qb_ipc_request_header *
header = (
struct qb_ipc_request_header *)msg;
323 switch (header->id) {
325 sync_barrier_handler (nodeid, msg);
328 sync_service_build_handler (nodeid, msg);
333 static void barrier_message_transmit (
void)
338 memset(&req_exec_barrier_message, 0,
sizeof(req_exec_barrier_message));
340 req_exec_barrier_message.header.size =
sizeof (
struct req_exec_barrier_message);
343 memcpy (&req_exec_barrier_message.ring_id, &my_ring_id,
346 iovec.iov_base = (
char *)&req_exec_barrier_message;
347 iovec.iov_len =
sizeof (req_exec_barrier_message);
360 memcpy (&service_build_message->ring_id, &my_ring_id,
363 iovec.iov_base = (
void *)service_build_message;
370 static void sync_barrier_enter (
void)
373 barrier_message_transmit ();
376 static void sync_process_call_init (
void)
379 size_t old_trans_list_entries = 0;
383 memcpy (old_trans_list, my_trans_list, my_trans_list_entries *
384 sizeof (
unsigned int));
385 old_trans_list_entries = my_trans_list_entries;
387 my_trans_list_entries = 0;
388 for (o = 0; o < old_trans_list_entries; o++) {
389 for (m = 0; m < my_member_list_entries; m++) {
390 if (old_trans_list[o] == my_member_list[m]) {
391 my_trans_list[my_trans_list_entries] = my_member_list[m];
392 my_trans_list_entries++;
398 for (i = 0; i < my_service_list_entries; i++) {
400 my_service_list[i].
sync_init (my_trans_list,
401 my_trans_list_entries, my_member_list,
402 my_member_list_entries,
408 static void sync_process_enter (
void)
417 if (my_service_list_entries == 0) {
419 sync_synchronization_completed ();
422 for (i = 0; i < my_processor_list_entries; i++) {
431 static void sync_servicelist_build_enter (
432 const unsigned int *member_list,
433 size_t member_list_entries,
441 memset(&service_build, 0,
sizeof(service_build));
444 for (i = 0; i < member_list_entries; i++) {
445 my_processor_list[i].
nodeid = member_list[i];
448 my_processor_list_entries = member_list_entries;
450 memcpy (my_member_list, member_list,
451 member_list_entries *
sizeof (
unsigned int));
452 my_member_list_entries = member_list_entries;
454 my_processing_idx = 0;
456 memset(my_service_list, 0,
sizeof (
struct service_entry) * SERVICES_COUNT_MAX);
457 my_service_list_entries = 0;
467 my_service_list[my_service_list_entries].
state =
PROCESS;
468 my_service_list[my_service_list_entries].
service_id = i;
470 assert(strlen(sync_callbacks.
name) <
sizeof(my_service_list[my_service_list_entries].
name));
472 strcpy (my_service_list[my_service_list_entries].
name,
473 sync_callbacks.
name);
478 my_service_list_entries += 1;
481 for (i = 0; i < my_service_list_entries; i++) {
482 service_build.service_list[i] =
485 service_build.service_list_entries = my_service_list_entries;
487 service_build_message_transmit (&service_build);
490 sync_process_call_init ();
493 static int schedwrk_processor (
const void *context)
497 if (my_service_list[my_processing_idx].
state ==
PROCESS) {
499 res = my_service_list[my_processing_idx].
sync_process ();
504 sync_barrier_enter();
513 const unsigned int *member_list,
514 size_t member_list_entries,
518 memcpy (&my_ring_id, ring_id,
sizeof (
struct memb_ring_id));
520 sync_servicelist_build_enter (member_list, member_list_entries,
525 const unsigned int *member_list,
526 size_t member_list_entries,
530 memcpy (my_trans_list, member_list, member_list_entries *
531 sizeof (
unsigned int));
532 my_trans_list_entries = member_list_entries;
541 my_service_list[my_processing_idx].
sync_abort ();
void sync_start(const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id)
Totem Single Ring Protocol.
void(* sync_init)(const unsigned int *trans_list, size_t trans_list_entries, const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id)
void(* sync_activate)(void)
int totempg_groups_initialize(void **instance, void(*deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required), void(*confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id))
Initialize a groups instance.
#define log_printf(level, format, args...)
void(* sync_activate)(void)
void schedwrk_destroy(hdb_handle_t handle)
void(* sync_init)(const unsigned int *trans_list, size_t trans_list_entries, const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id)
#define LOGSYS_LEVEL_ERROR
void sync_save_transitional(const unsigned int *member_list, size_t member_list_entries, const struct memb_ring_id *ring_id)
int totempg_groups_mcast_joined(void *instance, const struct iovec *iovec, unsigned int iov_len, int guarantee)
#define LOGSYS_LEVEL_DEBUG
struct totem_message_header header
enum sync_process_state state
#define MESSAGE_REQ_SYNC_SERVICE_BUILD
#define PROCESSOR_COUNT_MAX
#define MESSAGE_REQ_SYNC_BARRIER
#define SERVICES_COUNT_MAX
int(* my_sync_callbacks_retrieve)(int service_id, struct sync_callbacks *callbacks)
int totempg_groups_join(void *instance, const struct totempg_group *groups, size_t group_cnt)
LOGSYS_DECLARE_SUBSYS("SYNC")
struct memb_ring_id ring_id
int(* sync_process)(void)
int schedwrk_create(hdb_handle_t *handle, int(schedwrk_fn)(const void *), const void *context)
int(* sync_process)(void)