/* (c) Copyright 2001-2009 The world wide DirectFB Open Source Community (directfb.org) (c) Copyright 2000-2004 Convergence (integrated media) GmbH All rights reserved. Written by Denis Oliver Kropp , Andreas Hundt , Sven Neumann , Ville Syrjälä and Claudio Ciccani . This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this library; if not, write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "fusion_internal.h" #include #include #if FUSION_BUILD_MULTI D_DEBUG_DOMAIN( Fusion_Main, "Fusion/Main", "Fusion - High level IPC" ); D_DEBUG_DOMAIN( Fusion_Main_Dispatch, "Fusion/Main/Dispatch", "Fusion - High level IPC Dispatch" ); /**********************************************************************************************************************/ static void *fusion_dispatch_loop ( DirectThread *thread, void *arg ); /**********************************************************************************************************************/ static void fusion_fork_handler_prepare( void ); static void fusion_fork_handler_parent( void ); static void fusion_fork_handler_child( void ); /**********************************************************************************************************************/ static FusionWorld *fusion_worlds[FUSION_MAX_WORLDS]; static pthread_mutex_t fusion_worlds_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_once_t fusion_init_once = PTHREAD_ONCE_INIT; /**********************************************************************************************************************/ int _fusion_fd( const FusionWorldShared *shared ) { int index; FusionWorld *world; // D_MAGIC_ASSERT( shared, FusionWorldShared ); index = shared->world_index; D_ASSERT( index >= 0 ); D_ASSERT( index < FUSION_MAX_WORLDS ); world = fusion_worlds[index]; D_MAGIC_ASSERT( world, FusionWorld ); return world->fusion_fd; } FusionID _fusion_id( const FusionWorldShared *shared ) { int index; FusionWorld *world; D_MAGIC_ASSERT( shared, FusionWorldShared ); index = shared->world_index; D_ASSERT( index >= 0 ); D_ASSERT( index < FUSION_MAX_WORLDS ); world = fusion_worlds[index]; D_MAGIC_ASSERT( world, FusionWorld ); return world->fusion_id; } FusionWorld * _fusion_world( const FusionWorldShared *shared ) { int index; FusionWorld *world; D_MAGIC_ASSERT( shared, FusionWorldShared ); index = shared->world_index; D_ASSERT( index >= 0 ); D_ASSERT( index < FUSION_MAX_WORLDS ); world = fusion_worlds[index]; D_MAGIC_ASSERT( world, FusionWorld ); return world; } /**********************************************************************************************************************/ static void init_once( void ) { struct utsname uts; int i, j, k, l; pthread_atfork( fusion_fork_handler_prepare, fusion_fork_handler_parent, fusion_fork_handler_child ); if (uname( &uts ) < 0) { D_PERROR( "Fusion/Init: uname() failed!\n" ); return; } #if !FUSION_BUILD_KERNEL D_INFO( "Fusion/Init: " "Builtin Implementation is still experimental! Crash/Deadlocks might occur!\n" ); #endif if (fusion_config->madv_remove_force) { if (fusion_config->madv_remove) D_INFO( "Fusion/SHM: Using MADV_REMOVE (forced)\n" ); else D_INFO( "Fusion/SHM: Not using MADV_REMOVE (forced)!\n" ); } else { switch (sscanf( uts.release, "%d.%d.%d.%d", &i, &j, &k, &l )) { case 3: l = 0; case 4: if (((i << 24) | (j << 16) | (k << 8) | l) >= 0x02061302) fusion_config->madv_remove = true; break; default: D_WARN( "could not parse kernel version '%s'", uts.release ); } if (fusion_config->madv_remove) D_INFO( "Fusion/SHM: Using MADV_REMOVE (%d.%d.%d.%d >= 2.6.19.2)\n", i, j, k, l ); else D_INFO( "Fusion/SHM: NOT using MADV_REMOVE (%d.%d.%d.%d < 2.6.19.2)! [0x%08x]\n", i, j, k, l, (i << 24) | (j << 16) | (k << 8) | l ); } } /**********************************************************************************************************************/ #if FUSION_BUILD_KERNEL static void fusion_world_fork( FusionWorld *world ) { int fd; char buf1[20]; char buf2[20]; FusionEnter enter; FusionFork fork; FusionWorldShared *shared; D_MAGIC_ASSERT( world, FusionWorld ); shared = world->shared; D_MAGIC_ASSERT( shared, FusionWorldShared ); snprintf( buf1, sizeof(buf1), "/dev/fusion%d", shared->world_index ); snprintf( buf2, sizeof(buf2), "/dev/fusion/%d", shared->world_index ); /* Open Fusion Kernel Device. */ fd = direct_try_open( buf1, buf2, O_RDWR | O_NONBLOCK, true ); if (fd < 0) { D_PERROR( "Fusion/Main: Reopening fusion device (world %d) failed!\n", shared->world_index ); raise(5); } /* Drop "identity" when running another program. */ if (fcntl( fd, F_SETFD, FD_CLOEXEC ) < 0) D_PERROR( "Fusion/Init: Setting FD_CLOEXEC flag failed!\n" ); /* Fill enter information. */ enter.api.major = FUSION_API_MAJOR_REQUIRED; enter.api.minor = FUSION_API_MINOR_REQUIRED; enter.fusion_id = 0; /* Clear for check below. */ /* Enter the fusion world. */ while (ioctl( fd, FUSION_ENTER, &enter )) { if (errno != EINTR) { D_PERROR( "Fusion/Init: Could not reenter world '%d'!\n", shared->world_index ); raise(5); } } /* Check for valid Fusion ID. */ if (!enter.fusion_id) { D_ERROR( "Fusion/Init: Got no ID from FUSION_ENTER! Kernel module might be too old.\n" ); raise(5); } D_DEBUG_AT( Fusion_Main, " -> Fusion ID 0x%08lx\n", enter.fusion_id ); /* Fill fork information. */ fork.fusion_id = world->fusion_id; /* Fork within the fusion world. */ while (ioctl( fd, FUSION_FORK, &fork )) { if (errno != EINTR) { D_PERROR( "Fusion/Main: Could not fork in world '%d'!\n", shared->world_index ); raise(5); } } D_DEBUG_AT( Fusion_Main, " -> Fusion ID 0x%08lx\n", fork.fusion_id ); /* Get new fusion id back. */ world->fusion_id = fork.fusion_id; /* Close old file descriptor. */ close( world->fusion_fd ); /* Write back new file descriptor. */ world->fusion_fd = fd; D_DEBUG_AT( Fusion_Main, " -> restarting dispatcher loop...\n" ); /* Restart the dispatcher thread. FIXME: free old struct */ world->dispatch_loop = direct_thread_create( DTT_MESSAGING, fusion_dispatch_loop, world, "Fusion Dispatch" ); if (!world->dispatch_loop) raise(5); } static void fusion_fork_handler_prepare( void ) { int i; D_DEBUG_AT( Fusion_Main, "%s()\n", __FUNCTION__ ); for (i=0; ifork_callback) world->fork_callback( world->fork_action, FFS_PREPARE ); } } static void fusion_fork_handler_parent( void ) { int i; D_DEBUG_AT( Fusion_Main, "%s()\n", __FUNCTION__ ); for (i=0; ishared; D_MAGIC_ASSERT( shared, FusionWorldShared ); if (world->fork_callback) world->fork_callback( world->fork_action, FFS_PARENT ); if (world->fork_action == FFA_FORK) { /* Increase the shared reference counter. */ if (fusion_master( world )) shared->refs++; } } } static void fusion_fork_handler_child( void ) { int i; D_DEBUG_AT( Fusion_Main, "%s()\n", __FUNCTION__ ); for (i=0; ifork_callback) world->fork_callback( world->fork_action, FFS_CHILD ); switch (world->fork_action) { default: D_BUG( "unknown fork action %d", world->fork_action ); case FFA_CLOSE: D_DEBUG_AT( Fusion_Main, " -> closing world %d\n", i ); /* Remove world from global list. */ fusion_worlds[i] = NULL; /* Unmap shared area. */ munmap( world->shared, sizeof(FusionWorldShared) ); /* Close Fusion Kernel Device. */ close( world->fusion_fd ); /* Free local world data. */ D_MAGIC_CLEAR( world ); D_FREE( world ); break; case FFA_FORK: D_DEBUG_AT( Fusion_Main, " -> forking in world %d\n", i ); fusion_world_fork( world ); break; } } } /**********************************************************************************************************************/ /* * Enters a fusion world by joining or creating it. * * If world is negative, the next free index is used to create a new world. * Otherwise the world with the specified index is joined or created. */ DirectResult fusion_enter( int world_index, int abi_version, FusionEnterRole role, FusionWorld **ret_world ) { DirectResult ret = DR_OK; int fd = -1; FusionWorld *world = NULL; FusionWorldShared *shared = NULL; FusionEnter enter; char buf1[20]; char buf2[20]; D_DEBUG_AT( Fusion_Main, "%s( %d, %d, %p )\n", __FUNCTION__, world_index, abi_version, ret_world ); D_ASSERT( ret_world != NULL ); if (world_index >= FUSION_MAX_WORLDS) { D_ERROR( "Fusion/Init: World index %d exceeds maximum index %d!\n", world_index, FUSION_MAX_WORLDS - 1 ); return DR_INVARG; } pthread_once( &fusion_init_once, init_once ); if (fusion_config->force_slave) role = FER_SLAVE; direct_initialize(); pthread_mutex_lock( &fusion_worlds_lock ); if (world_index < 0) { if (role == FER_SLAVE) { D_ERROR( "Fusion/Init: Slave role and a new world (index -1) was requested!\n" ); pthread_mutex_unlock( &fusion_worlds_lock ); return DR_INVARG; } for (world_index=0; world_indexrefs > 0 ); /* Check the role again. */ switch (role) { case FER_MASTER: if (world->fusion_id != FUSION_ID_MASTER) { D_ERROR( "Fusion/Init: Master role requested for a world (%d) " "we're already slave in!\n", world_index ); ret = DR_UNSUPPORTED; goto error; } break; case FER_SLAVE: if (world->fusion_id == FUSION_ID_MASTER) { D_ERROR( "Fusion/Init: Slave role requested for a world (%d) " "we're already master in!\n", world_index ); ret = DR_UNSUPPORTED; goto error; } break; case FER_ANY: break; } shared = world->shared; D_MAGIC_ASSERT( shared, FusionWorldShared ); if (shared->world_abi != abi_version) { D_ERROR( "Fusion/Init: World ABI (%d) of world '%d' doesn't match own (%d)!\n", shared->world_abi, world_index, abi_version ); ret = DR_VERSIONMISMATCH; goto error; } world->refs++; pthread_mutex_unlock( &fusion_worlds_lock ); D_DEBUG_AT( Fusion_Main, " -> using existing world %p [%d]\n", world, world_index ); /* Return the world. */ *ret_world = world; return DR_OK; } if (fd < 0) { D_PERROR( "Fusion/Init: Opening fusion device (world %d) as '%s' failed!\n", world_index, role == FER_ANY ? "any" : (role == FER_MASTER ? "master" : "slave") ); ret = DR_INIT; goto error; } /* Drop "identity" when running another program. */ if (fcntl( fd, F_SETFD, FD_CLOEXEC ) < 0) D_PERROR( "Fusion/Init: Setting FD_CLOEXEC flag failed!\n" ); /* Fill enter information. */ enter.api.major = FUSION_API_MAJOR_REQUIRED; enter.api.minor = FUSION_API_MINOR_REQUIRED; enter.fusion_id = 0; /* Clear for check below. */ /* Enter the fusion world. */ while (ioctl( fd, FUSION_ENTER, &enter )) { if (errno != EINTR) { D_PERROR( "Fusion/Init: Could not enter world '%d'!\n", world_index ); ret = DR_INIT; goto error; } } /* Check for valid Fusion ID. */ if (!enter.fusion_id) { D_ERROR( "Fusion/Init: Got no ID from FUSION_ENTER! Kernel module might be too old.\n" ); ret = DR_INIT; goto error; } D_DEBUG_AT( Fusion_Main, " -> Fusion ID 0x%08lx\n", enter.fusion_id ); /* Check slave role only, master is handled by O_EXCL earlier. */ if (role == FER_SLAVE && enter.fusion_id == FUSION_ID_MASTER) { D_PERROR( "Fusion/Init: Entering world '%d' as a slave failed!\n", world_index ); ret = DR_UNSUPPORTED; goto error; } /* Map shared area. */ shared = mmap( (void*) 0x20000000 + 0x2000 * world_index, sizeof(FusionWorldShared), PROT_READ | PROT_WRITE, MAP_FIXED | MAP_SHARED, fd, 0 ); if (shared == MAP_FAILED) { D_PERROR( "Fusion/Init: Mapping shared area failed!\n" ); goto error; } D_DEBUG_AT( Fusion_Main, " -> shared area at %p, size %zu\n", shared, sizeof(FusionWorldShared) ); /* Initialize shared data. */ if (enter.fusion_id == FUSION_ID_MASTER) { /* Initialize reference counter. */ shared->refs = 1; /* Set ABI version. */ shared->world_abi = abi_version; /* Set the world index. */ shared->world_index = world_index; /* Set start time of world clock. */ direct_monotonic_gettimeofday( &shared->start_time ); D_MAGIC_SET( shared, FusionWorldShared ); } else { D_MAGIC_ASSERT( shared, FusionWorldShared ); /* Check ABI version. */ if (shared->world_abi != abi_version) { D_ERROR( "Fusion/Init: World ABI (%d) doesn't match own (%d)!\n", shared->world_abi, abi_version ); ret = DR_VERSIONMISMATCH; goto error; } } /* Synchronize to world clock. */ direct_clock_set_start( &shared->start_time ); /* Allocate local data. */ world = D_CALLOC( 1, sizeof(FusionWorld) ); if (!world) { ret = D_OOM(); goto error; } /* Initialize local data. */ world->refs = 1; world->shared = shared; world->fusion_fd = fd; world->fusion_id = enter.fusion_id; D_MAGIC_SET( world, FusionWorld ); fusion_worlds[world_index] = world; /* Initialize shared memory part. */ ret = fusion_shm_init( world ); if (ret) goto error2; D_DEBUG_AT( Fusion_Main, " -> initializing other parts...\n" ); /* Initialize other parts. */ if (enter.fusion_id == FUSION_ID_MASTER) { fusion_skirmish_init( &shared->arenas_lock, "Fusion Arenas", world ); fusion_skirmish_init( &shared->reactor_globals, "Fusion Reactor Globals", world ); /* Create the main pool. */ ret = fusion_shm_pool_create( world, "Fusion Main Pool", 0x100000, fusion_config->debugshm, &shared->main_pool ); if (ret) goto error3; } D_DEBUG_AT( Fusion_Main, " -> starting dispatcher loop...\n" ); /* Start the dispatcher thread. */ world->dispatch_loop = direct_thread_create( DTT_MESSAGING, fusion_dispatch_loop, world, "Fusion Dispatch" ); if (!world->dispatch_loop) { ret = DR_FAILURE; goto error4; } /* Let others enter the world. */ if (enter.fusion_id == FUSION_ID_MASTER) { D_DEBUG_AT( Fusion_Main, " -> unblocking world...\n" ); while (ioctl( fd, FUSION_UNBLOCK )) { if (errno != EINTR) { D_PERROR( "Fusion/Init: Could not unblock world!\n" ); ret = DR_FUSION; goto error4; } } } D_DEBUG_AT( Fusion_Main, " -> done. (%p)\n", world ); pthread_mutex_unlock( &fusion_worlds_lock ); /* Return the fusion world. */ *ret_world = world; return DR_OK; error4: if (world->dispatch_loop) direct_thread_destroy( world->dispatch_loop ); if (enter.fusion_id == FUSION_ID_MASTER) fusion_shm_pool_destroy( world, shared->main_pool ); error3: if (enter.fusion_id == FUSION_ID_MASTER) { fusion_skirmish_destroy( &shared->arenas_lock ); fusion_skirmish_destroy( &shared->reactor_globals ); } fusion_shm_deinit( world ); error2: fusion_worlds[world_index] = world; D_MAGIC_CLEAR( world ); D_FREE( world ); error: if (shared && shared != MAP_FAILED) { if (enter.fusion_id == FUSION_ID_MASTER) D_MAGIC_CLEAR( shared ); munmap( shared, sizeof(FusionWorldShared) ); } if (fd != -1) close( fd ); pthread_mutex_unlock( &fusion_worlds_lock ); direct_shutdown(); return ret; } DirectResult fusion_stop_dispatcher( FusionWorld *world, bool emergency ) { if (!emergency) { fusion_sync( world ); direct_thread_lock( world->dispatch_loop ); } world->dispatch_stop = true; if (!emergency) { direct_thread_unlock( world->dispatch_loop ); fusion_sync( world ); } return DR_OK; } /* * Exits the fusion world. * * If 'emergency' is true the function won't join but kill the dispatcher thread. */ DirectResult fusion_exit( FusionWorld *world, bool emergency ) { FusionWorldShared *shared; D_DEBUG_AT( Fusion_Main, "%s( %p, %semergency )\n", __FUNCTION__, world, emergency ? "" : "no " ); D_MAGIC_ASSERT( world, FusionWorld ); shared = world->shared; D_MAGIC_ASSERT( shared, FusionWorldShared ); pthread_mutex_lock( &fusion_worlds_lock ); D_ASSERT( world->refs > 0 ); if (--world->refs) { pthread_mutex_unlock( &fusion_worlds_lock ); return DR_OK; } if (!emergency) { int foo; FusionSendMessage msg; /* Wake up the read loop thread. */ msg.fusion_id = world->fusion_id; msg.msg_id = 0; msg.msg_data = &foo; msg.msg_size = sizeof(foo); while (ioctl( world->fusion_fd, FUSION_SEND_MESSAGE, &msg ) < 0) { if (errno != EINTR) { D_PERROR( "FUSION_SEND_MESSAGE" ); direct_thread_cancel( world->dispatch_loop ); break; } } /* Wait for its termination. */ direct_thread_join( world->dispatch_loop ); } direct_thread_destroy( world->dispatch_loop ); /* Master has to deinitialize shared data. */ if (fusion_master( world )) { shared->refs--; if (shared->refs == 0) { fusion_skirmish_destroy( &shared->reactor_globals ); fusion_skirmish_destroy( &shared->arenas_lock ); fusion_shm_pool_destroy( world, shared->main_pool ); /* Deinitialize shared memory. */ fusion_shm_deinit( world ); } } else { /* Leave shared memory. */ fusion_shm_deinit( world ); } /* Reset local dispatch nodes. */ _fusion_reactor_free_all( world ); /* Remove world from global list. */ fusion_worlds[shared->world_index] = NULL; /* Unmap shared area. */ if (fusion_master( world ) && shared->refs == 0) D_MAGIC_CLEAR( shared ); munmap( shared, sizeof(FusionWorldShared) ); /* Close Fusion Kernel Device. */ close( world->fusion_fd ); /* Free local world data. */ D_MAGIC_CLEAR( world ); D_FREE( world ); pthread_mutex_unlock( &fusion_worlds_lock ); direct_shutdown(); return DR_OK; } /* * Sends a signal to one or more fusionees and optionally waits * for their processes to terminate. * * A fusion_id of zero means all fusionees but the calling one. * A timeout of zero means infinite waiting while a negative value * means no waiting at all. */ DirectResult fusion_kill( FusionWorld *world, FusionID fusion_id, int signal, int timeout_ms ) { FusionKill param; D_MAGIC_ASSERT( world, FusionWorld ); param.fusion_id = fusion_id; param.signal = signal; param.timeout_ms = timeout_ms; while (ioctl( world->fusion_fd, FUSION_KILL, ¶m )) { switch (errno) { case EINTR: continue; case ETIMEDOUT: return DR_TIMEOUT; default: break; } D_PERROR ("FUSION_KILL"); return DR_FAILURE; } return DR_OK; } /**********************************************************************************************************************/ static void * fusion_dispatch_loop( DirectThread *thread, void *arg ) { int len = 0; int result; char buf[FUSION_MESSAGE_SIZE]; fd_set set; FusionWorld *world = arg; D_DEBUG_AT( Fusion_Main_Dispatch, "%s() running...\n", __FUNCTION__ ); while (true) { char *buf_p = buf; D_MAGIC_ASSERT( world, FusionWorld ); FD_ZERO( &set ); FD_SET( world->fusion_fd, &set ); result = select( world->fusion_fd + 1, &set, NULL, NULL, NULL ); if (result < 0) { switch (errno) { case EINTR: continue; default: D_PERROR( "Fusion/Dispatcher: select() failed!\n" ); return NULL; } } D_MAGIC_ASSERT( world, FusionWorld ); if (FD_ISSET( world->fusion_fd, &set )) { len = read( world->fusion_fd, buf, FUSION_MESSAGE_SIZE ); if (len < 0) break; D_DEBUG_AT( Fusion_Main_Dispatch, " -> got %d bytes...\n", len ); direct_thread_lock( world->dispatch_loop ); if (world->dispatch_stop) { D_DEBUG_AT( Fusion_Main_Dispatch, " -> IGNORING (dispatch_stop!)\n" ); } else { while (buf_p < buf + len) { FusionReadMessage *header = (FusionReadMessage*) buf_p; void *data = buf_p + sizeof(FusionReadMessage); if (world->dispatch_stop) { D_DEBUG_AT( Fusion_Main_Dispatch, " -> ABORTING (dispatch_stop!)\n" ); break; } D_MAGIC_ASSERT( world, FusionWorld ); D_ASSERT( (buf + len - buf_p) >= sizeof(FusionReadMessage) ); switch (header->msg_type) { case FMT_SEND: D_DEBUG_AT( Fusion_Main_Dispatch, " -> FMT_SEND!\n" ); break; case FMT_CALL: D_DEBUG_AT( Fusion_Main_Dispatch, " -> FMT_CALL...\n" ); _fusion_call_process( world, header->msg_id, data ); break; case FMT_REACTOR: D_DEBUG_AT( Fusion_Main_Dispatch, " -> FMT_REACTOR...\n" ); _fusion_reactor_process_message( world, header->msg_id, header->msg_channel, data ); break; case FMT_SHMPOOL: D_DEBUG_AT( Fusion_Main_Dispatch, " -> FMT_SHMPOOL...\n" ); _fusion_shmpool_process( world, header->msg_id, data ); break; default: D_DEBUG( "Fusion/Receiver: discarding message of unknown type '%d'\n", header->msg_type ); break; } buf_p = data + ((header->msg_size + 3) & ~3); } } direct_thread_unlock( world->dispatch_loop ); if (!world->refs) { D_DEBUG_AT( Fusion_Main_Dispatch, " -> good bye!\n" ); return NULL; } } } D_PERROR( "Fusion/Receiver: reading from fusion device failed!\n" ); return NULL; } /**********************************************************************************************************************/ #else /* FUSION_BUILD_KERNEL */ #include #include typedef struct { DirectLink link; FusionRef *ref; int count; } __FusioneeRef; typedef struct { DirectLink link; FusionID id; pid_t pid; DirectLink *refs; } __Fusionee; /**********************************************************************************************************************/ static DirectResult _fusion_add_fusionee( FusionWorld *world, FusionID fusion_id ) { DirectResult ret; FusionWorldShared *shared; __Fusionee *fusionee; D_DEBUG_AT( Fusion_Main, "%s( %p, %lu )\n", __FUNCTION__, world, fusion_id ); D_MAGIC_ASSERT( world, FusionWorld ); shared = world->shared; D_MAGIC_ASSERT( shared, FusionWorldShared ); fusionee = SHCALLOC( shared->main_pool, 1, sizeof(__Fusionee) ); if (!fusionee) return D_OOSHM(); fusionee->id = fusion_id; fusionee->pid = direct_gettid(); ret = fusion_skirmish_prevail( &shared->fusionees_lock ); if (ret) { SHFREE( shared->main_pool, fusionee ); return ret; } direct_list_append( &shared->fusionees, &fusionee->link ); fusion_skirmish_dismiss( &shared->fusionees_lock ); /* Set local pointer. */ world->fusionee = fusionee; return DR_OK; } void _fusion_add_local( FusionWorld *world, FusionRef *ref, int add ) { FusionWorldShared *shared; __Fusionee *fusionee; __FusioneeRef *fusionee_ref; //D_DEBUG_AT( Fusion_Main, "%s( %p, %p, %d )\n", __FUNCTION__, world, ref, add ); D_ASSERT( ref != NULL ); D_MAGIC_ASSERT( world, FusionWorld ); shared = world->shared; D_MAGIC_ASSERT( shared, FusionWorldShared ); fusionee = world->fusionee; D_ASSERT( fusionee != NULL ); direct_list_foreach (fusionee_ref, fusionee->refs) { if (fusionee_ref->ref == ref) break; } if (fusionee_ref) { fusionee_ref->count += add; //D_DEBUG_AT( Fusion_Main, " -> refs = %d\n", fusionee_ref->count ); if (fusionee_ref->count == 0) { direct_list_remove( &fusionee->refs, &fusionee_ref->link ); SHFREE( shared->main_pool, fusionee_ref ); } } else { if (add <= 0) /* called from _fusion_remove_fusionee() */ return; //D_DEBUG_AT( Fusion_Main, " -> new ref\n" ); fusionee_ref = SHCALLOC( shared->main_pool, 1, sizeof(__FusioneeRef) ); if (!fusionee_ref) { D_OOSHM(); return; } fusionee_ref->ref = ref; fusionee_ref->count = add; direct_list_prepend( &fusionee->refs, &fusionee_ref->link ); } } void _fusion_check_locals( FusionWorld *world, FusionRef *ref ) { FusionWorldShared *shared; __Fusionee *fusionee; __FusioneeRef *fusionee_ref, *temp; DirectLink *list = NULL; D_DEBUG_AT( Fusion_Main, "%s( %p, %p )\n", __FUNCTION__, world, ref ); D_ASSERT( ref != NULL ); D_MAGIC_ASSERT( world, FusionWorld ); shared = world->shared; D_MAGIC_ASSERT( shared, FusionWorldShared ); if (fusion_skirmish_prevail( &shared->fusionees_lock )) return; direct_list_foreach (fusionee, shared->fusionees) { if (fusionee->id == world->fusion_id) continue; direct_list_foreach (fusionee_ref, fusionee->refs) { if (fusionee_ref->ref == ref) { if (kill( fusionee->pid, 0 ) < 0 && errno == ESRCH) { direct_list_remove( &fusionee->refs, &fusionee_ref->link ); direct_list_append( &list, &fusionee_ref->link ); } break; } } } fusion_skirmish_dismiss( &shared->fusionees_lock ); direct_list_foreach_safe (fusionee_ref, temp, list) { _fusion_ref_change( ref, -fusionee_ref->count, false ); SHFREE( shared->main_pool, fusionee_ref ); } } void _fusion_remove_all_locals( FusionWorld *world, const FusionRef *ref ) { FusionWorldShared *shared; __Fusionee *fusionee; __FusioneeRef *fusionee_ref, *temp; D_DEBUG_AT( Fusion_Main, "%s( %p, %p )\n", __FUNCTION__, world, ref ); D_ASSERT( ref != NULL ); D_MAGIC_ASSERT( world, FusionWorld ); shared = world->shared; D_MAGIC_ASSERT( shared, FusionWorldShared ); if (fusion_skirmish_prevail( &shared->fusionees_lock )) return; direct_list_foreach (fusionee, shared->fusionees) { direct_list_foreach_safe (fusionee_ref, temp, fusionee->refs) { if (fusionee_ref->ref == ref) { direct_list_remove( &fusionee->refs, &fusionee_ref->link ); SHFREE( shared->main_pool, fusionee_ref ); } } } fusion_skirmish_dismiss( &shared->fusionees_lock ); } static void _fusion_remove_fusionee( FusionWorld *world, FusionID fusion_id ) { FusionWorldShared *shared; __Fusionee *fusionee; __FusioneeRef *fusionee_ref, *temp; D_DEBUG_AT( Fusion_Main, "%s( %p, %lu )\n", __FUNCTION__, world, fusion_id ); D_MAGIC_ASSERT( world, FusionWorld ); shared = world->shared; D_MAGIC_ASSERT( shared, FusionWorldShared ); fusion_skirmish_prevail( &shared->fusionees_lock ); if (fusion_id == world->fusion_id) { fusionee = world->fusionee; } else { direct_list_foreach (fusionee, shared->fusionees) { if (fusionee->id == fusion_id) break; } } if (!fusionee) { D_DEBUG_AT( Fusion_Main, "Fusionee %lu not found!\n", fusion_id ); fusion_skirmish_dismiss( &shared->fusionees_lock ); return; } direct_list_remove( &shared->fusionees, &fusionee->link ); fusion_skirmish_dismiss( &shared->fusionees_lock ); direct_list_foreach_safe (fusionee_ref, temp, fusionee->refs) { direct_list_remove( &fusionee->refs, &fusionee_ref->link ); _fusion_ref_change( fusionee_ref->ref, -fusionee_ref->count, false ); SHFREE( shared->main_pool, fusionee_ref ); } SHFREE( shared->main_pool, fusionee ); } /**********************************************************************************************************************/ DirectResult _fusion_send_message( int fd, const void *msg, size_t msg_size, struct sockaddr_un *addr ) { socklen_t len = sizeof(struct sockaddr_un); D_ASSERT( msg != NULL ); if (!addr) { addr = alloca( sizeof(struct sockaddr_un) ); getsockname( fd, (struct sockaddr*)addr, &len ); } while (sendto( fd, msg, msg_size, 0, (struct sockaddr*)addr, len ) < 0) { switch (errno) { case EINTR: continue; case ECONNREFUSED: return DR_FUSION; default: break; } D_PERROR( "Fusion: sendto()\n" ); return DR_IO; } return DR_OK; } DirectResult _fusion_recv_message( int fd, void *msg, size_t msg_size, struct sockaddr_un *addr ) { socklen_t len = addr ? sizeof(struct sockaddr_un) : 0; D_ASSERT( msg != NULL ); while (recvfrom( fd, msg, msg_size, 0, (struct sockaddr*)addr, &len ) < 0) { switch (errno) { case EINTR: continue; case ECONNREFUSED: return DR_FUSION; default: break; } D_PERROR( "Fusion: recvfrom()\n" ); return DR_IO; } return DR_OK; } /**********************************************************************************************************************/ static void fusion_fork_handler_prepare( void ) { int i; D_DEBUG_AT( Fusion_Main, "%s()\n", __FUNCTION__ ); for (i=0; ifork_callback) world->fork_callback( world->fork_action, FFS_PREPARE ); } } static void fusion_fork_handler_parent( void ) { int i; D_DEBUG_AT( Fusion_Main, "%s()\n", __FUNCTION__ ); for (i=0; ishared; D_MAGIC_ASSERT( shared, FusionWorldShared ); if (world->fork_callback) world->fork_callback( world->fork_action, FFS_PARENT ); if (world->fork_action == FFA_FORK) { /* Increase the shared reference counter. */ if (fusion_master( world )) shared->refs++; /* Cancel the dispatcher to prevent conflicts. */ direct_thread_cancel( world->dispatch_loop ); } } } static void fusion_fork_handler_child( void ) { int i; D_DEBUG_AT( Fusion_Main, "%s()\n", __FUNCTION__ ); for (i=0; ishared; D_MAGIC_ASSERT( shared, FusionWorldShared ); if (world->fork_callback) world->fork_callback( world->fork_action, FFS_CHILD ); switch (world->fork_action) { default: D_BUG( "unknown fork action %d", world->fork_action ); case FFA_CLOSE: D_DEBUG_AT( Fusion_Main, " -> closing world %d\n", i ); /* Remove world from global list. */ fusion_worlds[i] = NULL; /* Unmap shared area. */ munmap( world->shared, sizeof(FusionWorldShared) ); /* Close socket. */ close( world->fusion_fd ); /* Free local world data. */ D_MAGIC_CLEAR( world ); D_FREE( world ); break; case FFA_FORK: { __Fusionee *fusionee; __FusioneeRef *fusionee_ref; D_DEBUG_AT( Fusion_Main, " -> forking in world %d\n", i ); fusionee = world->fusionee; D_DEBUG_AT( Fusion_Main, " -> duplicating fusion id %lu\n", world->fusion_id ); fusion_skirmish_prevail( &shared->fusionees_lock ); if (_fusion_add_fusionee( world, world->fusion_id )) { fusion_skirmish_dismiss( &shared->fusionees_lock ); raise( SIGTRAP ); } D_DEBUG_AT( Fusion_Main, " -> duplicating local refs...\n" ); direct_list_foreach (fusionee_ref, fusionee->refs) { __FusioneeRef *new_ref; new_ref = SHCALLOC( shared->main_pool, 1, sizeof(__FusioneeRef) ); if (!new_ref) { D_OOSHM(); fusion_skirmish_dismiss( &shared->fusionees_lock ); raise( SIGTRAP ); } new_ref->ref = fusionee_ref->ref; new_ref->count = fusionee_ref->count; /* Avoid locking. */ new_ref->ref->multi.builtin.local += new_ref->count; direct_list_append( &((__Fusionee*)world->fusionee)->refs, &new_ref->link ); } fusion_skirmish_dismiss( &shared->fusionees_lock ); D_DEBUG_AT( Fusion_Main, " -> restarting dispatcher loop...\n" ); /* Restart the dispatcher thread. FIXME: free old struct */ world->dispatch_loop = direct_thread_create( DTT_MESSAGING, fusion_dispatch_loop, world, "Fusion Dispatch" ); if (!world->dispatch_loop) raise( SIGTRAP ); } break; } } } /**********************************************************************************************************************/ /* * Enters a fusion world by joining or creating it. * * If world is negative, the next free index is used to create a new world. * Otherwise the world with the specified index is joined or created. */ DirectResult fusion_enter( int world_index, int abi_version, FusionEnterRole role, FusionWorld **ret_world ) { DirectResult ret = DR_OK; int fd = -1; FusionID id = -1; FusionWorld *world = NULL; FusionWorldShared *shared = MAP_FAILED; struct sockaddr_un addr; char buf[128]; int len, err; D_DEBUG_AT( Fusion_Main, "%s( %d, %d, %p )\n", __FUNCTION__, world_index, abi_version, ret_world ); D_ASSERT( ret_world != NULL ); if (world_index >= FUSION_MAX_WORLDS) { D_ERROR( "Fusion/Init: World index %d exceeds maximum index %d!\n", world_index, FUSION_MAX_WORLDS - 1 ); return DR_INVARG; } if (fusion_config->force_slave) role = FER_SLAVE; pthread_once( &fusion_init_once, init_once ); direct_initialize(); fd = socket( PF_LOCAL, SOCK_RAW, 0 ); if (fd < 0) { D_PERROR( "Fusion/Init: Error creating local socket!\n" ); return DR_IO; } /* Set close-on-exec flag. */ if (fcntl( fd, F_SETFD, FD_CLOEXEC ) < 0) D_PERROR( "Fusion/Init: Couldn't set close-on-exec flag!\n" ); pthread_mutex_lock( &fusion_worlds_lock ); addr.sun_family = AF_UNIX; if (world_index < 0) { if (role == FER_SLAVE) { D_ERROR( "Fusion/Init: Slave role and a new world (index -1) was requested!\n" ); pthread_mutex_unlock( &fusion_worlds_lock ); close( fd ); return DR_INVARG; } for (world_index=0; world_indexshmfile_gid != (gid_t)-1) chown( addr.sun_path, -1, fusion_config->shmfile_gid ); } snprintf( addr.sun_path+len, sizeof(addr.sun_path)-len, "%lx", FUSION_ID_MASTER ); /* Bind to address. */ err = bind( fd, (struct sockaddr*)&addr, sizeof(addr) ); if (err == 0) { chmod( addr.sun_path, 0660 ); /* Change group, if requested. */ if (fusion_config->shmfile_gid != (gid_t)-1) chown( addr.sun_path, -1, fusion_config->shmfile_gid ); id = FUSION_ID_MASTER; break; } } } else { world = fusion_worlds[world_index]; if (!world) { len = snprintf( addr.sun_path, sizeof(addr.sun_path), "/tmp/.fusion-%d/", world_index ); /* Make socket directory if it doesn't exits. */ if (mkdir( addr.sun_path, 0775 ) == 0) { chmod( addr.sun_path, 0775 ); if (fusion_config->shmfile_gid != (gid_t)-1) chown( addr.sun_path, -1, fusion_config->shmfile_gid ); } /* Check wether we are master. */ snprintf( addr.sun_path+len, sizeof(addr.sun_path)-len, "%lx", FUSION_ID_MASTER ); err = bind( fd, (struct sockaddr*)&addr, sizeof(addr) ); if (err < 0) { if (role == FER_MASTER) { D_ERROR( "Fusion/Main: Couldn't start session as master! Remove %s.\n", addr.sun_path ); ret = DR_INIT; goto error; } /* Auto generate slave id. */ for (id = FUSION_ID_MASTER+1; id < (FusionID)-1; id++) { snprintf( addr.sun_path+len, sizeof(addr.sun_path)-len, "%lx", id ); err = bind( fd, (struct sockaddr*)&addr, sizeof(addr) ); if (err == 0) { chmod( addr.sun_path, 0660 ); /* Change group, if requested. */ if (fusion_config->shmfile_gid != (gid_t)-1) chown( addr.sun_path, -1, fusion_config->shmfile_gid ); break; } } } else if (err == 0 && role != FER_SLAVE) { chmod( addr.sun_path, 0660 ); /* Change group, if requested. */ if (fusion_config->shmfile_gid != (gid_t)-1) chown( addr.sun_path, -1, fusion_config->shmfile_gid ); id = FUSION_ID_MASTER; } } } /* Enter a world again? */ if (world) { D_MAGIC_ASSERT( world, FusionWorld ); D_ASSERT( world->refs > 0 ); /* Check the role again. */ switch (role) { case FER_MASTER: if (world->fusion_id != FUSION_ID_MASTER) { D_ERROR( "Fusion/Init: Master role requested for a world (%d) " "we're already slave in!\n", world_index ); ret = DR_UNSUPPORTED; goto error; } break; case FER_SLAVE: if (world->fusion_id == FUSION_ID_MASTER) { D_ERROR( "Fusion/Init: Slave role requested for a world (%d) " "we're already master in!\n", world_index ); ret = DR_UNSUPPORTED; goto error; } break; case FER_ANY: break; } shared = world->shared; D_MAGIC_ASSERT( shared, FusionWorldShared ); if (shared->world_abi != abi_version) { D_ERROR( "Fusion/Init: World ABI (%d) of world '%d' doesn't match own (%d)!\n", shared->world_abi, world_index, abi_version ); ret = DR_VERSIONMISMATCH; goto error; } world->refs++; pthread_mutex_unlock( &fusion_worlds_lock ); D_DEBUG_AT( Fusion_Main, " -> using existing world %p [%d]\n", world, world_index ); close( fd ); /* Return the world. */ *ret_world = world; return DR_OK; } if (id == (FusionID)-1) { D_ERROR( "Fusion/Init: Opening fusion socket (world %d) as '%s' failed!\n", world_index, role == FER_ANY ? "any" : (role == FER_MASTER ? "master" : "slave") ); ret = DR_INIT; goto error; } D_DEBUG_AT( Fusion_Main, " -> Fusion ID 0x%08lx\n", id ); if (id == FUSION_ID_MASTER) { int shared_fd; snprintf( buf, sizeof(buf), "%s/fusion.%d.core", fusion_config->tmpfs ? : "/dev/shm", world_index ); /* Open shared memory file. */ shared_fd = open( buf, O_RDWR | O_CREAT | O_TRUNC, 0660 ); if (shared_fd < 0) { D_PERROR( "Fusion/Init: Couldn't open shared memory file!\n" ); ret = DR_INIT; goto error; } if (fusion_config->shmfile_gid != (gid_t)-1) { if (fchown( shared_fd, -1, fusion_config->shmfile_gid ) != 0) D_INFO( "Fusion/Init: Changing owner on %s failed... continuing on.\n", buf ); } fchmod( shared_fd, 0660 ); ftruncate( shared_fd, sizeof(FusionWorldShared) ); /* Map shared area. */ shared = mmap( (void*) 0x20000000 + 0x2000 * world_index, sizeof(FusionWorldShared), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_FIXED, shared_fd, 0 ); if (shared == MAP_FAILED) { D_PERROR( "Fusion/Init: Mapping shared area failed!\n" ); close( shared_fd ); ret = DR_INIT; goto error; } close( shared_fd ); D_DEBUG_AT( Fusion_Main, " -> shared area at %p, size %zu\n", shared, sizeof(FusionWorldShared) ); /* Initialize reference counter. */ shared->refs = 1; /* Set ABI version. */ shared->world_abi = abi_version; /* Set the world index. */ shared->world_index = world_index; /* Set pool allocation base/max. */ shared->pool_base = (void*)0x20000000 + 0x2000 * FUSION_MAX_WORLDS + 0x8000000 * world_index; shared->pool_max = shared->pool_base + 0x8000000 - 1; /* Set start time of world clock. */ direct_monotonic_gettimeofday( &shared->start_time ); D_MAGIC_SET( shared, FusionWorldShared ); } else { FusionEnter enter; int shared_fd; /* Fill enter information. */ enter.type = FMT_ENTER; enter.fusion_id = id; snprintf( addr.sun_path, sizeof(addr.sun_path), "/tmp/.fusion-%d/%lx", world_index, FUSION_ID_MASTER ); /* Send enter message (used to sync with the master) */ ret = _fusion_send_message( fd, &enter, sizeof(FusionEnter), &addr ); if (ret == DR_OK) { ret = _fusion_recv_message( fd, &enter, sizeof(FusionEnter), NULL ); if (ret == DR_OK && enter.type != FMT_ENTER) { D_ERROR( "Fusion/Init: Expected message ENTER, got '%d'!\n", enter.type ); ret = DR_FUSION; } } if (ret) { D_ERROR( "Fusion/Init: Could not enter world '%d'!\n", world_index ); goto error; } snprintf( buf, sizeof(buf), "%s/fusion.%d.core", fusion_config->tmpfs ? : "/dev/shm", world_index ); /* Open shared memory file. */ shared_fd = open( buf, O_RDWR ); if (shared_fd < 0) { D_PERROR( "Fusion/Init: Couldn't open shared memory file!\n" ); ret = DR_INIT; goto error; } /* Map shared area. */ shared = mmap( (void*) 0x20000000 + 0x2000 * world_index, sizeof(FusionWorldShared), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_FIXED, shared_fd, 0 ); if (shared == MAP_FAILED) { D_PERROR( "Fusion/Init: Mapping shared area failed!\n" ); close( shared_fd ); ret = DR_INIT; goto error; } close( shared_fd ); D_DEBUG_AT( Fusion_Main, " -> shared area at %p, size %zu\n", shared, sizeof(FusionWorldShared) ); D_MAGIC_ASSERT( shared, FusionWorldShared ); /* Check ABI version. */ if (shared->world_abi != abi_version) { D_ERROR( "Fusion/Init: World ABI (%d) doesn't match own (%d)!\n", shared->world_abi, abi_version ); ret = DR_VERSIONMISMATCH; goto error; } } /* Synchronize to world clock. */ direct_clock_set_start( &shared->start_time ); /* Allocate local data. */ world = D_CALLOC( 1, sizeof(FusionWorld) ); if (!world) { ret = D_OOM(); goto error; } /* Initialize local data. */ world->refs = 1; world->shared = shared; world->fusion_fd = fd; world->fusion_id = id; D_MAGIC_SET( world, FusionWorld ); fusion_worlds[world_index] = world; /* Initialize shared memory part. */ ret = fusion_shm_init( world ); if (ret) goto error2; D_DEBUG_AT( Fusion_Main, " -> initializing other parts...\n" ); /* Initialize other parts. */ if (world->fusion_id == FUSION_ID_MASTER) { fusion_skirmish_init( &shared->arenas_lock, "Fusion Arenas", world ); fusion_skirmish_init( &shared->reactor_globals, "Fusion Reactor Globals", world ); fusion_skirmish_init( &shared->fusionees_lock, "Fusionees", world ); /* Create the main pool. */ ret = fusion_shm_pool_create( world, "Fusion Main Pool", 0x100000, fusion_config->debugshm, &shared->main_pool ); if (ret) goto error3; } /* Add ourselves to the list of fusionees. */ ret = _fusion_add_fusionee( world, id ); if (ret) goto error4; D_DEBUG_AT( Fusion_Main, " -> starting dispatcher loop...\n" ); /* Start the dispatcher thread. */ world->dispatch_loop = direct_thread_create( DTT_MESSAGING, fusion_dispatch_loop, world, "Fusion Dispatch" ); if (!world->dispatch_loop) { ret = DR_FAILURE; goto error5; } D_DEBUG_AT( Fusion_Main, " -> done. (%p)\n", world ); pthread_mutex_unlock( &fusion_worlds_lock ); /* Return the fusion world. */ *ret_world = world; return DR_OK; error5: if (world->dispatch_loop) direct_thread_destroy( world->dispatch_loop ); _fusion_remove_fusionee( world, id ); error4: if (world->fusion_id == FUSION_ID_MASTER) fusion_shm_pool_destroy( world, shared->main_pool ); error3: if (world->fusion_id == FUSION_ID_MASTER) { fusion_skirmish_destroy( &shared->arenas_lock ); fusion_skirmish_destroy( &shared->reactor_globals ); fusion_skirmish_destroy( &shared->fusionees_lock ); } fusion_shm_deinit( world ); error2: fusion_worlds[world_index] = world; D_MAGIC_CLEAR( world ); D_FREE( world ); error: if (shared != MAP_FAILED) { if (id == FUSION_ID_MASTER) D_MAGIC_CLEAR( shared ); munmap( shared, sizeof(FusionWorldShared) ); } if (fd != -1) { /* Unbind. */ socklen_t len = sizeof(addr); if (getsockname( fd, (struct sockaddr*)&addr, &len ) == 0) unlink( addr.sun_path ); close( fd ); } pthread_mutex_unlock( &fusion_worlds_lock ); direct_shutdown(); return ret; } DirectResult fusion_stop_dispatcher( FusionWorld *world, bool emergency ) { if (!emergency) { fusion_sync( world ); direct_thread_lock( world->dispatch_loop ); } world->dispatch_stop = true; if (!emergency) { direct_thread_unlock( world->dispatch_loop ); fusion_sync( world ); } return DR_OK; } /* * Exits the fusion world. * * If 'emergency' is true the function won't join but kill the dispatcher thread. */ DirectResult fusion_exit( FusionWorld *world, bool emergency ) { FusionWorldShared *shared; int world_index; bool clear = false; D_DEBUG_AT( Fusion_Main, "%s( %p, %semergency )\n", __FUNCTION__, world, emergency ? "" : "no " ); D_MAGIC_ASSERT( world, FusionWorld ); shared = world->shared; D_MAGIC_ASSERT( shared, FusionWorldShared ); world_index = shared->world_index; pthread_mutex_lock( &fusion_worlds_lock ); D_ASSERT( world->refs > 0 ); if (--world->refs) { pthread_mutex_unlock( &fusion_worlds_lock ); return DR_OK; } if (!emergency) { FusionMessageType msg = FMT_SEND; /* Wakeup dispatcher. */ if (_fusion_send_message( world->fusion_fd, &msg, sizeof(msg), NULL )) direct_thread_cancel( world->dispatch_loop ); /* Wait for its termination. */ direct_thread_join( world->dispatch_loop ); } direct_thread_destroy( world->dispatch_loop ); /* Remove ourselves from list. */ if (!emergency || fusion_master( world )) { _fusion_remove_fusionee( world, world->fusion_id ); } else { struct sockaddr_un addr; FusionLeave leave; addr.sun_family = AF_UNIX; snprintf( addr.sun_path, sizeof(addr.sun_path), "/tmp/.fusion-%d/%lx", world_index, FUSION_ID_MASTER ); leave.type = FMT_LEAVE; leave.fusion_id = world->fusion_id; _fusion_send_message( world->fusion_fd, &leave, sizeof(FusionLeave), &addr ); } /* Master has to deinitialize shared data. */ if (fusion_master( world )) { shared->refs--; if (shared->refs == 0) { fusion_skirmish_destroy( &shared->reactor_globals ); fusion_skirmish_destroy( &shared->arenas_lock ); fusion_skirmish_destroy( &shared->fusionees_lock ); fusion_shm_pool_destroy( world, shared->main_pool ); /* Deinitialize shared memory. */ fusion_shm_deinit( world ); clear = true; } } else { /* Leave shared memory. */ fusion_shm_deinit( world ); } /* Reset local dispatch nodes. */ _fusion_reactor_free_all( world ); /* Remove world from global list. */ fusion_worlds[shared->world_index] = NULL; /* Unmap shared area. */ if (clear) D_MAGIC_CLEAR( shared ); munmap( shared, sizeof(FusionWorldShared) ); /* Close socket. */ close( world->fusion_fd ); if (clear) { DIR *dir; char buf[128]; int len; /* Remove core shmfile. */ snprintf( buf, sizeof(buf), "%s/fusion.%d.core", fusion_config->tmpfs ? : "/dev/shm", world_index ); D_DEBUG_AT( Fusion_Main, "Removing shmfile %s.\n", buf ); unlink( buf ); /* Cleanup socket directory. */ len = snprintf( buf, sizeof(buf), "/tmp/.fusion-%d/", world_index ); dir = opendir( buf ); if (dir) { struct dirent *entry = NULL; struct dirent tmp; while (readdir_r( dir, &tmp, &entry ) == 0 && entry) { if (entry->d_name[0] != '.') { struct stat st; direct_snputs( buf+len, entry->d_name, sizeof(buf)-len ); if (stat( buf, &st ) == 0 && S_ISSOCK(st.st_mode)) { D_DEBUG_AT( Fusion_Main, "Removing socket %s.\n", buf ); unlink( buf ); } } } closedir( dir ); } else { D_PERROR( "Fusion/Main: Couldn't open socket directory %s", buf ); } } /* Free local world data. */ D_MAGIC_CLEAR( world ); D_FREE( world ); D_DEBUG_AT( Fusion_Main, "%s( %p ) done.\n", __FUNCTION__, world ); pthread_mutex_unlock( &fusion_worlds_lock ); direct_shutdown(); return DR_OK; } /* * Sends a signal to one or more fusionees and optionally waits * for their processes to terminate. * * A fusion_id of zero means all fusionees but the calling one. * A timeout of zero means infinite waiting while a negative value * means no waiting at all. */ DirectResult fusion_kill( FusionWorld *world, FusionID fusion_id, int signal, int timeout_ms ) { FusionWorldShared *shared; __Fusionee *fusionee, *temp; int result; D_DEBUG_AT( Fusion_Main, "%s( %p, %lu, %d, %d )\n", __FUNCTION__, world, fusion_id, signal, timeout_ms ); D_MAGIC_ASSERT( world, FusionWorld ); shared = world->shared; D_MAGIC_ASSERT( shared, FusionWorldShared ); fusion_skirmish_prevail( &shared->fusionees_lock ); direct_list_foreach_safe (fusionee, temp, shared->fusionees) { if (fusion_id == 0 && fusionee->id == world->fusion_id) continue; if (fusion_id != 0 && fusionee->id != fusion_id) continue; D_DEBUG_AT( Fusion_Main, " -> killing fusionee %lu (%d)...\n", fusionee->id, fusionee->pid ); result = kill( fusionee->pid, signal ); if (result == 0 && timeout_ms >= 0) { pid_t pid = fusionee->pid; long long stop = timeout_ms ? (direct_clock_get_micros() + timeout_ms*1000) : 0; fusion_skirmish_dismiss( &shared->fusionees_lock ); while (kill( pid, 0 ) == 0) { usleep( 1000 ); if (timeout_ms && direct_clock_get_micros() >= stop) break; }; fusion_skirmish_prevail( &shared->fusionees_lock ); } else if (result < 0) { if (errno == ESRCH) { D_DEBUG_AT( Fusion_Main, " ... fusionee %lu exited without removing itself!\n", fusionee->id ); _fusion_remove_fusionee( world, fusionee->id ); } else { D_PERROR( "Fusion/Main: kill(%d, %d)\n", fusionee->pid, signal ); } } } fusion_skirmish_dismiss( &shared->fusionees_lock ); return DR_OK; } /**********************************************************************************************************************/ static void * fusion_dispatch_loop( DirectThread *self, void *arg ) { FusionWorld *world = arg; struct sockaddr_un addr; socklen_t addr_len = sizeof(addr); fd_set set; char buf[FUSION_MESSAGE_SIZE]; D_DEBUG_AT( Fusion_Main_Dispatch, "%s() running...\n", __FUNCTION__ ); while (true) { int result; D_MAGIC_ASSERT( world, FusionWorld ); FD_ZERO( &set ); FD_SET( world->fusion_fd, &set ); result = select( world->fusion_fd + 1, &set, NULL, NULL, NULL ); if (result < 0) { switch (errno) { case EINTR: continue; default: D_PERROR( "Fusion/Dispatcher: select() failed!\n" ); return NULL; } } D_MAGIC_ASSERT( world, FusionWorld ); if (FD_ISSET( world->fusion_fd, &set ) && recvfrom( world->fusion_fd, buf, sizeof(buf), 0, (struct sockaddr*)&addr, &addr_len ) > 0) { FusionMessage *msg = (FusionMessage*)buf; pthread_setcancelstate( PTHREAD_CANCEL_DISABLE, NULL ); D_DEBUG_AT( Fusion_Main_Dispatch, " -> message from '%s'...\n", addr.sun_path ); direct_thread_lock( world->dispatch_loop ); if (world->dispatch_stop) { D_DEBUG_AT( Fusion_Main_Dispatch, " -> IGNORING (dispatch_stop!)\n" ); } else { switch (msg->type) { case FMT_SEND: D_DEBUG_AT( Fusion_Main_Dispatch, " -> FMT_SEND...\n" ); break; case FMT_ENTER: D_DEBUG_AT( Fusion_Main_Dispatch, " -> FMT_ENTER...\n" ); if (!fusion_master( world )) { D_ERROR( "Fusion/Dispatch: Got ENTER request, but I'm not master!\n" ); break; } if (msg->enter.fusion_id == world->fusion_id) { D_ERROR( "Fusion/Dispatch: Received ENTER request from myself!\n" ); break; } /* Nothing to do here. Send back message. */ _fusion_send_message( world->fusion_fd, msg, sizeof(FusionEnter), &addr ); break; case FMT_LEAVE: D_DEBUG_AT( Fusion_Main_Dispatch, " -> FMT_LEAVE...\n" ); if (!fusion_master( world )) { D_ERROR( "Fusion/Dispatch: Got LEAVE request, but I'm not master!\n" ); break; } if (msg->leave.fusion_id == world->fusion_id) { D_ERROR( "Fusion/Dispatch: Received LEAVE request from myself!\n" ); break; } _fusion_remove_fusionee( world, msg->leave.fusion_id ); break; case FMT_CALL: D_DEBUG_AT( Fusion_Main_Dispatch, " -> FMT_CALL...\n" ); _fusion_call_process( world, msg->call.call_id, &msg->call ); break; case FMT_REACTOR: D_DEBUG_AT( Fusion_Main_Dispatch, " -> FMT_REACTOR...\n" ); _fusion_reactor_process_message( world, msg->reactor.id, msg->reactor.channel, &buf[sizeof(FusionReactorMessage)] ); if (msg->reactor.ref) { fusion_ref_down( msg->reactor.ref, true ); if (fusion_ref_zero_trylock( msg->reactor.ref ) == DR_OK) { fusion_ref_destroy( msg->reactor.ref ); SHFREE( world->shared->main_pool, msg->reactor.ref ); } } break; default: D_BUG( "unexpected message type (%d)", msg->type ); break; } } direct_thread_unlock( world->dispatch_loop ); if (!world->refs) { D_DEBUG_AT( Fusion_Main_Dispatch, " -> good bye!\n" ); return NULL; } D_DEBUG_AT( Fusion_Main_Dispatch, " ...done\n" ); pthread_setcancelstate( PTHREAD_CANCEL_ENABLE, NULL ); } } return NULL; } /**********************************************************************************************************************/ #endif /* FUSION_BUILD_KERNEL */ /* * Wait until all pending messages are processed. */ DirectResult fusion_sync( const FusionWorld *world ) { int result; fd_set set; struct timeval tv; int loops = 200; D_MAGIC_ASSERT( world, FusionWorld ); D_DEBUG_AT( Fusion_Main, "%s( %p )\n", __FUNCTION__, world ); D_DEBUG_AT( Fusion_Main, "syncing with fusion device...\n" ); while (loops--) { FD_ZERO( &set ); FD_SET( world->fusion_fd, &set ); tv.tv_sec = 0; tv.tv_usec = 20000; result = select( world->fusion_fd + 1, &set, NULL, NULL, &tv ); D_DEBUG_AT( Fusion_Main, " -> select() returned %d...\n", result ); switch (result) { case -1: if (errno == EINTR) return DR_OK; D_PERROR( "Fusion/Sync: select() failed!\n"); return DR_FAILURE; default: D_DEBUG_AT( Fusion_Main, " -> FD_ISSET %d...\n", FD_ISSET( world->fusion_fd, &set ) ); if (FD_ISSET( world->fusion_fd, &set )) { usleep( 20000 ); break; } case 0: D_DEBUG_AT( Fusion_Main, " -> synced.\n"); return DR_OK; } } D_DEBUG_AT( Fusion_Main, " -> timeout!\n"); D_ERROR( "Fusion/Main: Timeout waiting for empty message queue!\n" ); return DR_TIMEOUT; } /* * Sets the fork() action of the calling Fusionee within the world. */ void fusion_world_set_fork_action( FusionWorld *world, FusionForkAction action ) { D_MAGIC_ASSERT( world, FusionWorld ); world->fork_action = action; } /* * Gets the current fork() action. */ FusionForkAction fusion_world_get_fork_action( FusionWorld *world ) { D_MAGIC_ASSERT( world, FusionWorld ); return world->fork_action; } /* * Registers a callback called upon fork(). */ void fusion_world_set_fork_callback( FusionWorld *world, FusionForkCallback callback ) { D_MAGIC_ASSERT( world, FusionWorld ); world->fork_callback = callback; } /* * Return the index of the specified world. */ int fusion_world_index( const FusionWorld *world ) { FusionWorldShared *shared; D_MAGIC_ASSERT( world, FusionWorld ); shared = world->shared; D_MAGIC_ASSERT( shared, FusionWorldShared ); return shared->world_index; } /* * Return the own Fusion ID within the specified world. */ FusionID fusion_id( const FusionWorld *world ) { D_MAGIC_ASSERT( world, FusionWorld ); return world->fusion_id; } /* * Return if the world is a multi application world. */ bool fusion_is_multi( const FusionWorld *world ) { D_MAGIC_ASSERT( world, FusionWorld ); return true; } /* * Return the thread ID of the Fusion Dispatcher within the specified world. */ pid_t fusion_dispatcher_tid( const FusionWorld *world ) { D_MAGIC_ASSERT( world, FusionWorld ); return direct_thread_get_tid( world->dispatch_loop ); } /* * Return true if this process is the master. */ bool fusion_master( const FusionWorld *world ) { D_MAGIC_ASSERT( world, FusionWorld ); return world->fusion_id == FUSION_ID_MASTER; } /* * Check if a pointer points to the shared memory. */ bool fusion_is_shared( FusionWorld *world, const void *ptr ) { int i; DirectResult ret; FusionSHM *shm; FusionSHMShared *shared; D_MAGIC_ASSERT( world, FusionWorld ); shm = &world->shm; D_MAGIC_ASSERT( shm, FusionSHM ); shared = shm->shared; D_MAGIC_ASSERT( shared, FusionSHMShared ); if (ptr >= (void*) world->shared && ptr < (void*) world->shared + sizeof(FusionWorldShared)) return true; ret = fusion_skirmish_prevail( &shared->lock ); if (ret) return false; for (i=0; ipools[i].active) { shmalloc_heap *heap; FusionSHMPoolShared *pool = &shared->pools[i]; D_MAGIC_ASSERT( pool, FusionSHMPoolShared ); heap = pool->heap; D_MAGIC_ASSERT( heap, shmalloc_heap ); if (ptr >= pool->addr_base && ptr < pool->addr_base + heap->size) { fusion_skirmish_dismiss( &shared->lock ); return true; } } } fusion_skirmish_dismiss( &shared->lock ); return false; } #else /* FUSION_BUILD_MULTI */ /* * Enters a fusion world by joining or creating it. * * If world_index is negative, the next free index is used to create a new world. * Otherwise the world with the specified index is joined or created. */ DirectResult fusion_enter( int world_index, int abi_version, FusionEnterRole role, FusionWorld **ret_world ) { DirectResult ret; FusionWorld *world = NULL; D_ASSERT( ret_world != NULL ); ret = direct_initialize(); if (ret) return ret; world = D_CALLOC( 1, sizeof(FusionWorld) ); if (!world) { ret = D_OOM(); goto error; } world->shared = D_CALLOC( 1, sizeof(FusionWorldShared) ); if (!world->shared) { ret = D_OOM(); goto error; } /* Create the main pool. */ ret = fusion_shm_pool_create( world, "Fusion Main Pool", 0x100000, fusion_config->debugshm, &world->shared->main_pool ); if (ret) goto error; D_MAGIC_SET( world, FusionWorld ); D_MAGIC_SET( world->shared, FusionWorldShared ); *ret_world = world; return DR_OK; error: if (world) { if (world->shared) D_FREE( world->shared ); D_FREE( world ); } direct_shutdown(); return ret; } DirectResult fusion_stop_dispatcher( FusionWorld *world, bool emergency ) { return DR_OK; } /* * Exits the fusion world. * * If 'emergency' is true the function won't join but kill the dispatcher thread. */ DirectResult fusion_exit( FusionWorld *world, bool emergency ) { D_MAGIC_ASSERT( world, FusionWorld ); D_MAGIC_ASSERT( world->shared, FusionWorldShared ); fusion_shm_pool_destroy( world, world->shared->main_pool ); D_MAGIC_CLEAR( world->shared ); D_FREE( world->shared ); D_MAGIC_CLEAR( world ); D_FREE( world ); direct_shutdown(); return DR_OK; } /* * Sets the fork() action of the calling Fusionee within the world. */ void fusion_world_set_fork_action( FusionWorld *world, FusionForkAction action ) { D_MAGIC_ASSERT( world, FusionWorld ); } /* * Gets the current fork() action. */ FusionForkAction fusion_world_get_fork_action( FusionWorld *world ) { D_MAGIC_ASSERT( world, FusionWorld ); return world->fork_action; } /* * Registers a callback called upon fork(). */ void fusion_world_set_fork_callback( FusionWorld *world, FusionForkCallback callback ) { D_MAGIC_ASSERT( world, FusionWorld ); } /* * Return the index of the specified world. */ int fusion_world_index( const FusionWorld *world ) { D_MAGIC_ASSERT( world, FusionWorld ); return 0; } /* * Return true if this process is the master. */ bool fusion_master( const FusionWorld *world ) { D_MAGIC_ASSERT( world, FusionWorld ); return true; } /* * Sends a signal to one or more fusionees and optionally waits * for their processes to terminate. * * A fusion_id of zero means all fusionees but the calling one. * A timeout of zero means infinite waiting while a negative value * means no waiting at all. */ DirectResult fusion_kill( FusionWorld *world, FusionID fusion_id, int signal, int timeout_ms ) { D_MAGIC_ASSERT( world, FusionWorld ); return DR_OK; } /* * Return the own Fusion ID within the specified world. */ FusionID fusion_id( const FusionWorld *world ) { D_MAGIC_ASSERT( world, FusionWorld ); return 1; } /* * Return if the world is a multi application world. */ bool fusion_is_multi( const FusionWorld *world ) { D_MAGIC_ASSERT( world, FusionWorld ); return false; } /* * Wait until all pending messages are processed. */ DirectResult fusion_sync( const FusionWorld *world ) { D_MAGIC_ASSERT( world, FusionWorld ); return DR_OK; } /* Check if a pointer points to the shared memory. */ bool fusion_is_shared( FusionWorld *world, const void *ptr ) { D_MAGIC_ASSERT( world, FusionWorld ); return true; } #endif