OpenSync
0.22
|
00001 /* 00002 * libosengine - A synchronization engine for the opensync framework 00003 * Copyright (C) 2004-2005 Armin Bauer <armin.bauer@opensync.org> 00004 * 00005 * This library is free software; you can redistribute it and/or 00006 * modify it under the terms of the GNU Lesser General Public 00007 * License as published by the Free Software Foundation; either 00008 * version 2.1 of the License, or (at your option) any later version. 00009 * 00010 * This library is distributed in the hope that it will be useful, 00011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 00013 * Lesser General Public License for more details. 00014 * 00015 * You should have received a copy of the GNU Lesser General Public 00016 * License along with this library; if not, write to the Free Software 00017 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 00018 * 00019 */ 00020 00021 #include "engine.h" 00022 00023 #include <errno.h> 00024 #include <sys/stat.h> 00025 #include <sys/types.h> 00026 00027 #include <glib.h> 00028 00029 #include <opensync/opensync_support.h> 00030 #include "opensync/opensync_message_internals.h" 00031 #include "opensync/opensync_queue_internals.h" 00032 #include "opensync/opensync_format_internals.h" 00033 00034 #include "engine_internals.h" 00035 #include <opensync/opensync_user_internals.h> 00036 00037 OSyncMappingEntry *osengine_mappingtable_find_entry(OSyncMappingTable *table, const char *uid, const char *objtype, long long int memberid); 00056 00057 void _new_change_receiver(OSyncEngine *engine, OSyncClient *client, OSyncChange *change) 00058 { 00059 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, engine, client, change); 00060 00061 OSyncError *error = NULL; 00062 OSyncChangeType change_type = osync_change_get_changetype(change); 00063 OSyncFormatEnv *format_env = osync_group_get_format_env(engine->group); 00064 OSyncObjType *objtype = osync_change_get_objtype(change); 00065 const char* uid = osync_change_get_uid(change); 00066 OSyncObjFormat *objformat = osync_change_get_objformat(change); 00067 00068 osync_change_set_member(change, client->member); 00069 00070 osync_trace(TRACE_INTERNAL, "Handling new change with uid %s, changetype %i, objtype %s and format %s from member %lli", uid, change_type, 00071 objtype ? osync_objtype_get_name(objtype) : "None", osync_change_get_objformat(change) ? osync_objformat_get_name(osync_change_get_objformat(change)) : "None", 00072 osync_member_get_id(client->member)); 00073 00074 00081 if ( (change_type != CHANGE_DELETED) && 00082 (osync_change_has_data(change))) { 00083 osync_bool is_file_objformat = FALSE; 00084 if(objformat) 00085 is_file_objformat = 00086 ((!strcmp(objformat->name, "file"))?(TRUE):(FALSE)); 00087 if ( (!objtype) || (!objformat) || 00088 (!strcmp(osync_objtype_get_name(objtype), "data")) || 00089 (!strcmp(objformat->name, "plain"))) { 00090 OSyncObjType *objtype_test = osync_change_detect_objtype_full(format_env, change, &error); 00091 objtype = (objtype_test)?(objtype_test):(objtype); 00092 } 00093 if (objtype) { 00094 osync_trace(TRACE_INTERNAL, "Detected the object to be of type %s", osync_objtype_get_name(objtype)); 00095 00096 osync_change_set_objtype(change, objtype); 00097 00102 if ( ( (osync_group_get_slow_sync(engine->group, 00103 osync_objtype_get_name(objtype))) || 00104 ( (!is_file_objformat) && 00105 (!osengine_mappingtable_find_entry( 00106 engine->maptable, uid, 00107 osync_objtype_get_name(objtype), 00108 osync_member_get_id(client->member))) ) 00109 ) && (change_type == CHANGE_MODIFIED) ){ 00110 osync_change_set_changetype(change, CHANGE_ADDED); 00111 change_type = osync_change_get_changetype(change); 00112 } 00113 } 00114 } else 00115 if (change_type == CHANGE_DELETED){ 00121 if ( !objtype || 00122 (( !strcmp(osync_objtype_get_name(objtype), "data") ) && 00123 ( !osengine_mappingtable_find_entry( 00124 engine->maptable, uid, 00125 osync_objtype_get_name(objtype), 00126 osync_member_get_id(client->member)) )) ){ 00127 00128 OSyncMappingEntry *entry = 00129 osengine_mappingtable_find_entry( 00130 engine->maptable, uid, NULL, 00131 osync_member_get_id(client->member) 00132 ); 00133 if (entry) { 00134 osync_change_set_objtype(change, 00135 osync_change_get_objtype( 00136 entry->change)); 00137 objtype=osync_change_get_objtype(change); 00138 } else { 00139 osync_error_set(&error, OSYNC_ERROR_GENERIC, 00140 "Could not find one entry with UID=%s to delete.", uid); 00141 goto error; 00142 } 00143 } 00144 } else { 00145 osync_trace(TRACE_INTERNAL, "Change has no data!"); 00146 } 00147 00148 osync_trace(TRACE_INTERNAL, "Handling new change with uid %s, changetype %i, data %p, size %i, objtype %s and format %s from member %lli", uid, change_type, osync_change_get_data(change), osync_change_get_datasize(change), objtype ? osync_objtype_get_name(objtype) : "None", osync_change_get_objformat(change) ? osync_objformat_get_name(osync_change_get_objformat(change)) : "None", osync_member_get_id(client->member)); 00149 00150 if (!objtype){ 00151 osync_error_set(&error, OSYNC_ERROR_GENERIC, 00152 "ObjType not set for uid %s.", uid); 00153 goto error; 00154 } 00155 00156 00157 OSyncMappingEntry *entry = osengine_mappingtable_store_change(engine->maptable, change); 00158 change = entry->change; 00159 if (!osync_change_save(change, TRUE, &error)) { 00160 osync_error_duplicate(&engine->error, &error); 00161 osync_status_update_change(engine, change, CHANGE_RECV_ERROR, &error); 00162 osync_error_update(&engine->error, "Unable to receive one or more objects"); 00163 osync_flag_unset(entry->fl_has_data); 00164 goto error; 00165 } 00166 00167 osync_group_remove_changelog(engine->group, change, &error); 00168 00169 //We convert to the common format here to make sure we always pass it 00170 osync_change_convert_to_common(change, NULL); 00171 00172 if (!entry->mapping) { 00173 osync_flag_attach(entry->fl_mapped, engine->cmb_entries_mapped); 00174 osync_flag_unset(entry->fl_mapped); 00175 osync_debug("ENG", 3, "+It has no mapping"); 00176 } else { 00177 osync_debug("ENG", 3, "+It has mapping"); 00178 osync_flag_set(entry->fl_mapped); 00179 osync_flag_unset(entry->mapping->fl_solved); 00180 osync_flag_unset(entry->mapping->fl_chkconflict); 00181 osync_flag_unset(entry->mapping->fl_multiplied); 00182 } 00183 00184 if (osync_change_has_data(change)) { 00185 osync_debug("ENG", 3, "+It has data"); 00186 osync_flag_set(entry->fl_has_data); 00187 osync_status_update_change(engine, change, CHANGE_RECEIVED, NULL); 00188 } else { 00189 osync_debug("ENG", 3, "+It has no data"); 00190 osync_flag_unset(entry->fl_has_data); 00191 osync_status_update_change(engine, change, CHANGE_RECEIVED_INFO, NULL); 00192 } 00193 00194 if (osync_change_get_changetype(change) == CHANGE_DELETED) 00195 osync_flag_set(entry->fl_deleted); 00196 00197 osync_flag_set(entry->fl_has_info); 00198 osync_flag_unset(entry->fl_synced); 00199 00200 osengine_mappingentry_decider(engine, entry); 00201 00202 osync_trace(TRACE_EXIT, "%s", __func__); 00203 return; 00204 00205 error: 00206 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(&error)); 00207 osync_error_free(&error); 00208 return; 00209 } 00210 00211 OSyncClient *osengine_get_client(OSyncEngine *engine, long long int memberId) 00212 { 00213 GList *c = NULL; 00214 for (c = engine->clients; c; c = c->next) { 00215 OSyncClient *client = c->data; 00216 if (osync_member_get_id(client->member) == memberId) 00217 return client; 00218 } 00219 return NULL; 00220 } 00221 00222 00223 void send_engine_changed(OSyncEngine *engine) 00224 { 00225 if (!engine->is_initialized) 00226 return; 00227 00228 OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_ENGINE_CHANGED, 0, NULL); 00229 /*FIXME: Handle errors here */ 00230 00231 osync_debug("ENG", 4, "Sending message %p:\"ENGINE_CHANGED\"", message); 00232 osync_queue_send_message(engine->commands_to_self, NULL, message, NULL); 00233 } 00234 00235 void send_mapping_changed(OSyncEngine *engine, OSyncMapping *mapping) 00236 { 00237 OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_MAPPING_CHANGED, sizeof(long long), NULL); 00238 osync_message_write_long_long_int(message, mapping->id); 00239 /*FIXME: Handle errors here */ 00240 00241 osync_queue_send_message(engine->commands_to_self, NULL, message, NULL); 00242 /*FIXME: Handle errors here, too */ 00243 } 00244 00245 void send_mappingentry_changed(OSyncEngine *engine, OSyncMappingEntry *entry) 00246 { 00247 OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_MAPPINGENTRY_CHANGED, sizeof(long long)*2, NULL); 00248 00249 /*FIXME: don't pass a pointer through the messaging system */ 00250 long long ptr = (long long)(long)entry; 00251 osync_message_write_long_long_int(message, ptr); 00252 /*FIXME: Handle errors here */ 00253 00254 osync_queue_send_message(engine->commands_to_self, NULL, message, NULL); 00255 /*FIXME: Handle errors here, too */ 00256 } 00257 00265 static void engine_message_handler(OSyncMessage *message, OSyncEngine *engine) 00266 { 00267 osync_trace(TRACE_ENTRY, "engine_message_handler(%p:%lli-%i, %p)", message, message->id1, message->id2, engine); 00268 00269 OSyncChange *change = NULL; 00270 00271 osync_trace(TRACE_INTERNAL, "engine received command %i", osync_message_get_command(message)); 00272 00273 switch (osync_message_get_command(message)) { 00274 case OSYNC_MESSAGE_SYNCHRONIZE: 00275 osync_trace(TRACE_INTERNAL, "all deciders"); 00276 osengine_client_all_deciders(engine); 00277 break; 00278 case OSYNC_MESSAGE_NEW_CHANGE: 00279 osync_demarshal_change(message, osync_group_get_format_env(engine->group), &change); 00280 00281 long long int member_id = 0; 00282 osync_message_read_long_long_int(message, &member_id); 00283 OSyncClient *sender = osengine_get_client(engine, member_id); 00284 00285 _new_change_receiver(engine, sender, change); 00286 break; 00287 case OSYNC_MESSAGE_ENGINE_CHANGED: 00288 osengine_client_all_deciders(engine); 00289 osengine_mapping_all_deciders(engine); 00290 GList *u; 00291 for (u = engine->maptable->unmapped; u; u = u->next) { 00292 OSyncMappingEntry *unmapped = u->data; 00293 send_mappingentry_changed(engine, unmapped); 00294 } 00295 break; 00296 case OSYNC_MESSAGE_MAPPING_CHANGED: 00297 { 00298 long long id; 00299 osync_message_read_long_long_int(message, &id); 00300 /*FIXME: check errors by read_long_long_int */ 00301 OSyncMapping *mapping = osengine_mappingtable_mapping_from_id(engine->maptable, id); 00302 00303 if (!g_list_find(engine->maptable->mappings, mapping)) { 00304 osync_trace(TRACE_EXIT, "%s: Mapping %p is dead", __func__, mapping); 00305 return; 00306 } 00307 00308 osengine_mapping_decider(engine, mapping); 00309 } 00310 break; 00311 case OSYNC_MESSAGE_MAPPINGENTRY_CHANGED: 00312 { 00313 long long ptr; 00314 osync_message_read_long_long_int(message, &ptr); 00315 OSyncMappingEntry *entry = (OSyncMappingEntry*)(long)ptr; 00316 00317 if (!g_list_find(engine->maptable->entries, entry) && !g_list_find(engine->maptable->unmapped, entry)) { 00318 osync_trace(TRACE_EXIT, "%s: Entry %p is dead", __func__, entry); 00319 return; 00320 } 00321 00322 osengine_mappingentry_decider(engine, entry); 00323 } 00324 break; 00325 case OSYNC_MESSAGE_SYNC_ALERT: 00326 if (engine->allow_sync_alert) 00327 osync_flag_set(engine->fl_running); 00328 else 00329 osync_trace(TRACE_INTERNAL, "Sync Alert not allowed"); 00330 break; 00331 00332 default: 00333 break; 00334 } 00335 00336 /*TODO: Implement handling of the messages listed below, on commented code */ 00337 00338 /* 00339 if (osync_message_is_signal (message, "CLIENT_CHANGED")) { 00340 OSyncClient *client = osync_message_get_data(message, "client"); 00341 00342 if (!g_list_find(engine->clients, client)) { 00343 osync_trace(TRACE_EXIT, "%s: Client %p is dead", __func__, client); 00344 return; 00345 } 00346 00347 osengine_client_decider(engine, client); 00348 osync_trace(TRACE_EXIT, "engine_message_handler"); 00349 return; 00350 } 00351 00352 if (osync_message_is_signal (message, "PLUGIN_MESSAGE")) { 00353 char *name = osync_message_get_data(message, "name"); 00354 void *data = osync_message_get_data(message, "data"); 00355 engine->plgmsg_callback(engine, sender, name, data, engine->plgmsg_userdata); 00356 osync_trace(TRACE_EXIT, "engine_message_handler"); 00357 return; 00358 } 00359 00360 osync_debug("ENG", 0, "Unknown message \"%s\"", osync_message_get_msgname(message)); 00361 osync_trace(TRACE_EXIT_ERROR, "engine_message_handler: Unknown message"); 00362 g_assert_not_reached();*/ 00363 osync_trace(TRACE_EXIT, "%s", __func__); 00364 } 00365 00366 static void trigger_clients_sent_changes(OSyncEngine *engine) 00367 { 00368 osync_trace(TRACE_ENTRY, "%s(%p)", __func__, engine); 00369 osync_status_update_engine(engine, ENG_ENDPHASE_READ, NULL); 00370 00371 g_mutex_lock(engine->info_received_mutex); 00372 g_cond_signal(engine->info_received); 00373 g_mutex_unlock(engine->info_received_mutex); 00374 00375 //Load the old mappings 00376 osengine_mappingtable_inject_changes(engine->maptable); 00377 00378 send_engine_changed(engine); 00379 osync_trace(TRACE_EXIT, "%s", __func__); 00380 } 00381 00382 static void trigger_clients_read_all(OSyncEngine *engine) 00383 { 00384 osync_trace(TRACE_ENTRY, "%s(%p)", __func__, engine); 00385 00386 send_engine_changed(engine); 00387 osync_trace(TRACE_EXIT, "%s", __func__); 00388 } 00389 00390 static void trigger_status_end_conflicts(OSyncEngine *engine) 00391 { 00392 osync_trace(TRACE_ENTRY, "%s(%p)", __func__, engine); 00393 osync_status_update_engine(engine, ENG_END_CONFLICTS, NULL); 00394 00395 osync_trace(TRACE_EXIT, "%s", __func__); 00396 } 00397 00398 static void trigger_clients_connected(OSyncEngine *engine) 00399 { 00400 osync_trace(TRACE_ENTRY, "%s(%p)", __func__, engine); 00401 osync_status_update_engine(engine, ENG_ENDPHASE_CON, NULL); 00402 osengine_client_all_deciders(engine); 00403 00404 osync_trace(TRACE_EXIT, "%s", __func__); 00405 } 00406 00407 static void trigger_clients_comitted_all(OSyncEngine *engine) 00408 { 00409 osync_trace(TRACE_ENTRY, "%s(%p)", __func__, engine); 00410 osync_status_update_engine(engine, ENG_ENDPHASE_WRITE, NULL); 00411 00412 osync_trace(TRACE_EXIT, "%s", __func__); 00413 } 00414 00415 00416 /*void send_engine_committed_all(OSyncEngine *engine) 00417 { 00418 osync_trace(TRACE_ENTRY, "%s(%p)", __func__, engine); 00419 00420 engine->committed_all_sent = TRUE; 00421 00422 osync_trace(TRACE_INTERNAL, "++++ ENGINE COMMAND: Committed all ++++"); 00423 00424 GList *c = NULL; 00425 for (c = engine->clients; c; c = c->next) { 00426 OSyncClient *client = c->data; 00427 if (osync_flag_is_not_set(client->fl_committed_all)) 00428 send_committed_all(client, engine); 00429 } 00430 00431 osync_trace(TRACE_EXIT, "%s", __func__); 00432 } 00433 00434 static void trigger_engine_committed_all(OSyncEngine *engine) 00435 { 00436 osync_trace(TRACE_ENTRY, "%s(%p)", __func__, engine); 00437 00438 if (osync_flag_is_not_set(engine->cmb_multiplied)) { 00439 osync_trace(TRACE_EXIT, "%s: Not multiplied yet", __func__); 00440 return; 00441 } 00442 00443 send_engine_committed_all(engine); 00444 00445 osync_trace(TRACE_EXIT, "%s", __func__); 00446 }*/ 00447 00448 static gboolean startupfunc(gpointer data) 00449 { 00450 OSyncEngine *engine = data; 00451 osync_trace(TRACE_INTERNAL, "+++++++++ This is the engine of group \"%s\" +++++++++", osync_group_get_name(engine->group)); 00452 00453 OSyncError *error = NULL; 00454 if (!osengine_mappingtable_load(engine->maptable, &error)) { 00455 osync_error_duplicate(&engine->error, &error); 00456 osync_status_update_engine(engine, ENG_ERROR, &error); 00457 osync_error_update(&engine->error, "Unable to connect one of the members"); 00458 osync_flag_set(engine->fl_stop); 00459 } 00460 00461 g_mutex_lock(engine->started_mutex); 00462 g_cond_signal(engine->started); 00463 g_mutex_unlock(engine->started_mutex); 00464 return FALSE; 00465 } 00466 00478 00489 osync_bool osengine_reset(OSyncEngine *engine, OSyncError **error) 00490 { 00491 //FIXME Check if engine is running 00492 osync_trace(TRACE_ENTRY, "osengine_reset(%p, %p)", engine, error); 00493 GList *c = NULL; 00494 for (c = engine->clients; c; c = c->next) { 00495 OSyncClient *client = c->data; 00496 osync_client_reset(client); 00497 } 00498 00499 osync_flag_set_state(engine->fl_running, FALSE); 00500 osync_flag_set_state(engine->fl_stop, FALSE); 00501 osync_flag_set_state(engine->cmb_sent_changes, FALSE); 00502 osync_flag_set_state(engine->cmb_entries_mapped, TRUE); 00503 osync_flag_set_state(engine->cmb_synced, TRUE); 00504 osync_flag_set_state(engine->cmb_chkconflict, TRUE); 00505 osync_flag_set_state(engine->cmb_finished, FALSE); 00506 osync_flag_set_state(engine->cmb_connected, FALSE); 00507 osync_flag_set_state(engine->cmb_read_all, TRUE); 00508 osync_flag_set_state(engine->cmb_committed_all, TRUE); 00509 osync_flag_set_state(engine->cmb_committed_all_sent, FALSE); 00510 00511 osync_status_update_engine(engine, ENG_ENDPHASE_DISCON, NULL); 00512 00513 engine->committed_all_sent = FALSE; 00514 00515 osengine_mappingtable_reset(engine->maptable); 00516 00517 if (engine->error) { 00518 //FIXME We might be leaking memory here 00519 OSyncError *newerror = NULL; 00520 osync_error_duplicate(&newerror, &engine->error); 00521 osync_status_update_engine(engine, ENG_ERROR, &newerror); 00522 osync_group_set_slow_sync(engine->group, "data", TRUE); 00523 } else { 00524 osync_status_update_engine(engine, ENG_SYNC_SUCCESSFULL, NULL); 00525 osync_group_reset_slow_sync(engine->group, "data"); 00526 } 00527 00528 osync_trace(TRACE_INTERNAL, "engine error is %p", engine->error); 00529 00530 g_mutex_lock(engine->syncing_mutex); 00531 g_cond_signal(engine->syncing); 00532 g_mutex_unlock(engine->syncing_mutex); 00533 00534 osync_trace(TRACE_EXIT, "osengine_reset"); 00535 return TRUE; 00536 } 00537 00538 /* Implementation of g_mkdir_with_parents() 00539 * 00540 * This function overwrite the contents of the 'dir' parameter 00541 */ 00542 static int __mkdir_with_parents(char *dir, int mode) 00543 { 00544 if (g_file_test(dir, G_FILE_TEST_IS_DIR)) 00545 return 0; 00546 00547 char *slash = strrchr(dir, '/'); 00548 if (slash && slash != dir) { 00549 /* Create parent directory if needed */ 00550 00551 /* This is a trick: I don't want to allocate a new string 00552 * for the parent directory. So, just put a NUL char 00553 * in the last slash, and restore it after creating the 00554 * parent directory 00555 */ 00556 *slash = '\0'; 00557 if (__mkdir_with_parents(dir, mode) < 0) 00558 return -1; 00559 *slash = '/'; 00560 } 00561 00562 if (mkdir(dir, mode) < 0) 00563 return -1; 00564 00565 return 0; 00566 } 00567 00568 static int mkdir_with_parents(const char *dir, int mode) 00569 { 00570 int r; 00571 char *mydir = strdup(dir); 00572 if (!mydir) 00573 return -1; 00574 00575 r = __mkdir_with_parents(mydir, mode); 00576 free(mydir); 00577 return r; 00578 } 00579 00589 OSyncEngine *osengine_new(OSyncGroup *group, OSyncError **error) 00590 { 00591 osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, group, error); 00592 00593 g_assert(group); 00594 OSyncEngine *engine = g_malloc0(sizeof(OSyncEngine)); 00595 osync_group_set_data(group, engine); 00596 00597 if (!g_thread_supported ()) 00598 g_thread_init (NULL); 00599 00600 engine->context = g_main_context_new(); 00601 engine->syncloop = g_main_loop_new(engine->context, FALSE); 00602 engine->group = group; 00603 00604 OSyncUserInfo *user = osync_user_new(error); 00605 if (!user) 00606 goto error; 00607 00608 char *enginesdir = g_strdup_printf("%s/engines", osync_user_get_confdir(user)); 00609 char *path = g_strdup_printf("%s/enginepipe", enginesdir); 00610 00611 if (mkdir_with_parents(enginesdir, 0755) < 0) { 00612 osync_error_set(error, OSYNC_ERROR_GENERIC, "Couldn't create engines directory: %s", strerror(errno)); 00613 goto error_free_paths; 00614 } 00615 00616 engine->syncing_mutex = g_mutex_new(); 00617 engine->info_received_mutex = g_mutex_new(); 00618 engine->syncing = g_cond_new(); 00619 engine->info_received = g_cond_new(); 00620 engine->started_mutex = g_mutex_new(); 00621 engine->started = g_cond_new(); 00622 00623 //Set the default start flags 00624 engine->fl_running = osync_flag_new(NULL); 00625 osync_flag_set_pos_trigger(engine->fl_running, (OSyncFlagTriggerFunc)osengine_client_all_deciders, engine, NULL); 00626 00627 engine->fl_sync = osync_flag_new(NULL); 00628 engine->fl_stop = osync_flag_new(NULL); 00629 osync_flag_set_pos_trigger(engine->fl_stop, (OSyncFlagTriggerFunc)osengine_client_all_deciders, engine, NULL); 00630 00631 //The combined flags 00632 engine->cmb_sent_changes = osync_comb_flag_new(FALSE, FALSE); 00633 osync_flag_set_pos_trigger(engine->cmb_sent_changes, (OSyncFlagTriggerFunc)trigger_clients_sent_changes, engine, NULL); 00634 00635 engine->cmb_read_all = osync_comb_flag_new(FALSE, TRUE); 00636 osync_flag_set_pos_trigger(engine->cmb_read_all, (OSyncFlagTriggerFunc)trigger_clients_read_all, engine, NULL); 00637 00638 engine->cmb_entries_mapped = osync_comb_flag_new(FALSE, FALSE); 00639 osync_flag_set_pos_trigger(engine->cmb_entries_mapped, (OSyncFlagTriggerFunc)send_engine_changed, engine, NULL); 00640 00641 00642 engine->cmb_synced = osync_comb_flag_new(FALSE, TRUE); 00643 osync_flag_set_pos_trigger(engine->cmb_synced, (OSyncFlagTriggerFunc)send_engine_changed, engine, NULL); 00644 00645 00646 engine->cmb_finished = osync_comb_flag_new(FALSE, TRUE); 00647 osync_flag_set_pos_trigger(engine->cmb_finished, (OSyncFlagTriggerFunc)osengine_reset, engine, NULL); 00648 00649 engine->cmb_connected = osync_comb_flag_new(FALSE, FALSE); 00650 osync_flag_set_pos_trigger(engine->cmb_connected, (OSyncFlagTriggerFunc)trigger_clients_connected, engine, NULL); 00651 00652 engine->cmb_chkconflict = osync_comb_flag_new(FALSE, TRUE); 00653 osync_flag_set_pos_trigger(engine->cmb_chkconflict, (OSyncFlagTriggerFunc)trigger_status_end_conflicts, engine, NULL); 00654 00655 engine->cmb_multiplied = osync_comb_flag_new(FALSE, TRUE); 00656 00657 engine->cmb_committed_all = osync_comb_flag_new(FALSE, TRUE); 00658 osync_flag_set_pos_trigger(engine->cmb_committed_all, (OSyncFlagTriggerFunc)send_engine_changed, engine, NULL); 00659 00660 00661 engine->cmb_committed_all_sent = osync_comb_flag_new(FALSE, TRUE); 00662 osync_flag_set_pos_trigger(engine->cmb_committed_all_sent, (OSyncFlagTriggerFunc)trigger_clients_comitted_all, engine, NULL); 00663 00664 osync_flag_set(engine->fl_sync); 00665 00666 int i; 00667 for (i = 0; i < osync_group_num_members(group); i++) { 00668 OSyncMember *member = osync_group_nth_member(group, i); 00669 if (!osync_client_new(engine, member, error)) 00670 goto error_free_paths; 00671 } 00672 00673 engine->maptable = osengine_mappingtable_new(engine); 00674 00675 osync_trace(TRACE_EXIT, "osengine_new: %p", engine); 00676 return engine; 00677 00678 error_free_paths: 00679 g_free(path); 00680 g_free(enginesdir); 00681 error: 00682 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error)); 00683 return NULL; 00684 } 00685 00693 void osengine_free(OSyncEngine *engine) 00694 { 00695 osync_trace(TRACE_ENTRY, "osengine_free(%p)", engine); 00696 00697 GList *c = NULL; 00698 for (c = engine->clients; c; c = c->next) { 00699 OSyncClient *client = c->data; 00700 osync_client_free(client); 00701 } 00702 00703 osengine_mappingtable_free(engine->maptable); 00704 engine->maptable = NULL; 00705 00706 osync_flag_free(engine->fl_running); 00707 osync_flag_free(engine->fl_sync); 00708 osync_flag_free(engine->fl_stop); 00709 osync_flag_free(engine->cmb_sent_changes); 00710 osync_flag_free(engine->cmb_entries_mapped); 00711 osync_flag_free(engine->cmb_synced); 00712 osync_flag_free(engine->cmb_chkconflict); 00713 osync_flag_free(engine->cmb_finished); 00714 osync_flag_free(engine->cmb_connected); 00715 osync_flag_free(engine->cmb_read_all); 00716 osync_flag_free(engine->cmb_multiplied); 00717 osync_flag_free(engine->cmb_committed_all); 00718 osync_flag_free(engine->cmb_committed_all_sent); 00719 00720 g_list_free(engine->clients); 00721 g_main_loop_unref(engine->syncloop); 00722 00723 g_main_context_unref(engine->context); 00724 00725 g_mutex_free(engine->syncing_mutex); 00726 g_mutex_free(engine->info_received_mutex); 00727 g_cond_free(engine->syncing); 00728 g_cond_free(engine->info_received); 00729 g_mutex_free(engine->started_mutex); 00730 g_cond_free(engine->started); 00731 00732 g_free(engine); 00733 osync_trace(TRACE_EXIT, "osengine_free"); 00734 } 00735 00745 void osengine_set_conflict_callback(OSyncEngine *engine, void (* function) (OSyncEngine *, OSyncMapping *, void *), void *user_data) 00746 { 00747 engine->conflict_callback = function; 00748 engine->conflict_userdata = user_data; 00749 } 00750 00760 void osengine_set_changestatus_callback(OSyncEngine *engine, void (* function) (OSyncEngine *, OSyncChangeUpdate *, void *), void *user_data) 00761 { 00762 engine->changestat_callback = function; 00763 engine->changestat_userdata = user_data; 00764 } 00765 00775 void osengine_set_mappingstatus_callback(OSyncEngine *engine, void (* function) (OSyncMappingUpdate *, void *), void *user_data) 00776 { 00777 engine->mapstat_callback = function; 00778 engine->mapstat_userdata = user_data; 00779 } 00780 00790 void osengine_set_enginestatus_callback(OSyncEngine *engine, void (* function) (OSyncEngine *, OSyncEngineUpdate *, void *), void *user_data) 00791 { 00792 engine->engstat_callback = function; 00793 engine->engstat_userdata = user_data; 00794 } 00795 00805 void osengine_set_memberstatus_callback(OSyncEngine *engine, void (* function) (OSyncMemberUpdate *, void *), void *user_data) 00806 { 00807 engine->mebstat_callback = function; 00808 engine->mebstat_userdata = user_data; 00809 } 00810 00820 void osengine_set_message_callback(OSyncEngine *engine, void *(* function) (OSyncEngine *, OSyncClient *, const char *, void *, void *), void *user_data) 00821 { 00822 engine->plgmsg_callback = function; 00823 engine->plgmsg_userdata = user_data; 00824 } 00825 00837 osync_bool osengine_init(OSyncEngine *engine, OSyncError **error) 00838 { 00839 osync_trace(TRACE_ENTRY, "osengine_init(%p, %p)", engine, error); 00840 00841 if (engine->is_initialized) { 00842 osync_error_set(error, OSYNC_ERROR_MISCONFIGURATION, "This engine was already initialized"); 00843 osync_trace(TRACE_EXIT_ERROR, "osengine_init: %s", osync_error_print(error)); 00844 return FALSE; 00845 } 00846 00847 switch (osync_group_lock(engine->group)) { 00848 case OSYNC_LOCKED: 00849 osync_error_set(error, OSYNC_ERROR_LOCKED, "Group is locked"); 00850 osync_trace(TRACE_EXIT_ERROR, "osengine_init: %s", osync_error_print(error)); 00851 return FALSE; 00852 case OSYNC_LOCK_STALE: 00853 osync_debug("ENG", 1, "Detected stale lock file. Slow-syncing"); 00854 osync_status_update_engine(engine, ENG_PREV_UNCLEAN, NULL); 00855 osync_group_set_slow_sync(engine->group, "data", TRUE); 00856 break; 00857 default: 00858 break; 00859 } 00860 00861 osync_flag_set(engine->cmb_entries_mapped); 00862 osync_flag_set(engine->cmb_synced); 00863 engine->allow_sync_alert = TRUE; 00864 00865 //OSyncMember *member = NULL; 00866 OSyncGroup *group = engine->group; 00867 00868 if (osync_group_num_members(group) < 2) { 00869 //Not enough members! 00870 osync_error_set(error, OSYNC_ERROR_MISCONFIGURATION, "You only configured %i members, but at least 2 are needed", osync_group_num_members(group)); 00871 osync_group_unlock(engine->group, TRUE); 00872 osync_trace(TRACE_EXIT_ERROR, "osengine_init: %s", osync_error_print(error)); 00873 return FALSE; 00874 } 00875 00876 engine->is_initialized = TRUE; 00877 00878 osync_trace(TRACE_INTERNAL, "Spawning clients"); 00879 GList *c = NULL; 00880 for (c = engine->clients; c; c = c->next) { 00881 OSyncClient *client = c->data; 00882 osync_queue_create(client->commands_from_osplugin, NULL); 00883 00884 if (!osync_client_spawn(client, engine, error)) { 00885 osync_group_unlock(engine->group, TRUE); 00886 osync_trace(TRACE_EXIT_ERROR, "osengine_init: %s", osync_error_print(error)); 00887 return FALSE; 00888 } 00889 00890 osync_queue_set_message_handler(client->commands_from_osplugin, (OSyncMessageHandler)engine_message_handler, engine); 00891 if (!(engine->man_dispatch)) 00892 osync_queue_setup_with_gmainloop(client->commands_from_osplugin, engine->context); 00893 osync_trace(TRACE_INTERNAL, "opening client queue"); 00894 if (!osync_queue_connect(client->commands_from_osplugin, OSYNC_QUEUE_RECEIVER, 0 )) { 00895 osync_group_unlock(engine->group, TRUE); 00896 osync_trace(TRACE_EXIT_ERROR, "osengine_init: %s", osync_error_print(error)); 00897 return FALSE; 00898 } 00899 } 00900 00901 osync_trace(TRACE_INTERNAL, "opening engine queue"); 00902 if (!osync_queue_new_pipes(&engine->commands_from_self, &engine->commands_to_self, error)) { 00903 osync_group_unlock(engine->group, TRUE); 00904 osync_trace(TRACE_EXIT_ERROR, "osengine_init: %s", osync_error_print(error)); 00905 return FALSE; 00906 } 00907 00908 if (!osync_queue_connect(engine->commands_from_self, OSYNC_QUEUE_RECEIVER, 0 )) { 00909 osync_group_unlock(engine->group, TRUE); 00910 osync_trace(TRACE_EXIT_ERROR, "osengine_init: %s", osync_error_print(error)); 00911 return FALSE; 00912 } 00913 00914 if (!osync_queue_connect(engine->commands_to_self, OSYNC_QUEUE_SENDER, 0 )) { 00915 osync_group_unlock(engine->group, TRUE); 00916 osync_trace(TRACE_EXIT_ERROR, "osengine_init: %s", osync_error_print(error)); 00917 return FALSE; 00918 } 00919 00920 osync_queue_set_message_handler(engine->commands_from_self, (OSyncMessageHandler)engine_message_handler, engine); 00921 if (!(engine->man_dispatch)) 00922 osync_queue_setup_with_gmainloop(engine->commands_from_self, engine->context); 00923 00924 osync_trace(TRACE_INTERNAL, "initializing clients"); 00925 for (c = engine->clients; c; c = c->next) { 00926 OSyncClient *client = c->data; 00927 if (!osync_client_init(client, engine, error)) { 00928 osengine_finalize(engine); 00929 osync_group_unlock(engine->group, TRUE); 00930 osync_trace(TRACE_EXIT_ERROR, "osengine_init: %s", osync_error_print(error)); 00931 return FALSE; 00932 } 00933 } 00934 00935 osync_debug("ENG", 3, "Running the main loop"); 00936 00937 //Now we can run the main loop 00938 //We protect the startup by a g_cond 00939 g_mutex_lock(engine->started_mutex); 00940 GSource *idle = g_idle_source_new(); 00941 g_source_set_priority(idle, G_PRIORITY_HIGH); 00942 g_source_set_callback(idle, startupfunc, engine, NULL); 00943 g_source_attach(idle, engine->context); 00944 engine->thread = g_thread_create ((GThreadFunc)g_main_loop_run, engine->syncloop, TRUE, NULL); 00945 g_cond_wait(engine->started, engine->started_mutex); 00946 g_mutex_unlock(engine->started_mutex); 00947 00948 osync_trace(TRACE_EXIT, "osengine_init"); 00949 return TRUE; 00950 } 00951 00960 void osengine_finalize(OSyncEngine *engine) 00961 { 00962 //FIXME check if engine is running 00963 osync_trace(TRACE_ENTRY, "osengine_finalize(%p)", engine); 00964 00965 if (!engine->is_initialized) { 00966 osync_trace(TRACE_EXIT_ERROR, "osengine_finalize: Not initialized"); 00967 return; 00968 } 00969 00970 g_assert(engine); 00971 osync_debug("ENG", 3, "finalizing engine %p", engine); 00972 00973 if (engine->thread) { 00974 g_main_loop_quit(engine->syncloop); 00975 g_thread_join(engine->thread); 00976 } 00977 00978 GList *c = NULL; 00979 for (c = engine->clients; c; c = c->next) { 00980 OSyncClient *client = c->data; 00981 osync_queue_disconnect(client->commands_from_osplugin, NULL); 00982 osync_client_finalize(client, NULL); 00983 } 00984 00985 osync_queue_disconnect(engine->commands_from_self, NULL); 00986 osync_queue_disconnect(engine->commands_to_self, NULL); 00987 00988 osync_queue_free(engine->commands_from_self); 00989 engine->commands_from_self = NULL; 00990 osync_queue_free(engine->commands_to_self); 00991 engine->commands_to_self = NULL; 00992 00993 osengine_mappingtable_close(engine->maptable); 00994 00995 if (engine->error) { 00996 /* If the error occured during connect, we 00997 * dont want to trigger a slow-sync the next 00998 * time. In the case the we have a slow-sync 00999 * right in the beginning, we also dont remove 01000 * the lockfile to trigger a slow-sync again 01001 * next time */ 01002 if (!osync_flag_is_set(engine->cmb_connected) && !engine->slowsync) 01003 osync_group_unlock(engine->group, TRUE); 01004 else 01005 osync_group_unlock(engine->group, FALSE); 01006 } else 01007 osync_group_unlock(engine->group, TRUE); 01008 01009 engine->is_initialized = FALSE; 01010 osync_trace(TRACE_EXIT, "osengine_finalize"); 01011 } 01012 01023 osync_bool osengine_synchronize(OSyncEngine *engine, OSyncError **error) 01024 { 01025 osync_trace(TRACE_INTERNAL, "synchronize now"); 01026 osync_trace(TRACE_ENTRY, "%s(%p)", __func__, engine); 01027 g_assert(engine); 01028 01029 if (!engine->is_initialized) { 01030 osync_error_set(error, OSYNC_ERROR_GENERIC, "osengine_synchronize: Not initialized"); 01031 goto error; 01032 } 01033 01034 /* We now remember if slow-sync is set right from the start. 01035 * If it is, we dont remove the lock file in the case of 01036 * a error during connect. */ 01037 if (osync_group_get_slow_sync(engine->group, "data")) { 01038 engine->slowsync = TRUE; 01039 } else { 01040 engine->slowsync = FALSE; 01041 } 01042 01043 engine->wasted = 0; 01044 engine->alldeciders = 0; 01045 01046 osync_flag_set(engine->fl_running); 01047 01048 OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_SYNCHRONIZE, 0, error); 01049 if (!message) 01050 goto error; 01051 01052 if (!osync_queue_send_message(engine->commands_to_self, NULL, message, error)) 01053 goto error_free_message; 01054 01055 osync_message_unref(message); 01056 01057 osync_trace(TRACE_EXIT, "%s", __func__); 01058 return TRUE; 01059 01060 error_free_message: 01061 osync_message_unref(message); 01062 error: 01063 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error)); 01064 return FALSE; 01065 } 01066 01074 void osengine_flag_only_info(OSyncEngine *engine) 01075 { 01076 osync_flag_unset(engine->fl_sync); 01077 } 01078 01086 void osengine_flag_manual(OSyncEngine *engine) 01087 { 01088 if (engine->syncloop) { 01089 g_warning("Unable to flag manual since engine is already initialized\n"); 01090 } 01091 engine->man_dispatch = TRUE; 01092 } 01093 01100 void osengine_pause(OSyncEngine *engine) 01101 { 01102 osync_flag_unset(engine->fl_running); 01103 } 01104 01112 void osengine_abort(OSyncEngine *engine) 01113 { 01114 osync_flag_set(engine->fl_stop); 01115 } 01116 01123 void osengine_allow_sync_alert(OSyncEngine *engine) 01124 { 01125 engine->allow_sync_alert = TRUE; 01126 } 01127 01134 void osengine_deny_sync_alert(OSyncEngine *engine) 01135 { 01136 engine->allow_sync_alert = FALSE; 01137 } 01138 01149 osync_bool osengine_sync_and_block(OSyncEngine *engine, OSyncError **error) 01150 { 01151 osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, engine, error); 01152 01153 g_mutex_lock(engine->syncing_mutex); 01154 01155 if (!osengine_synchronize(engine, error)) { 01156 g_mutex_unlock(engine->syncing_mutex); 01157 goto error; 01158 } 01159 01160 g_cond_wait(engine->syncing, engine->syncing_mutex); 01161 g_mutex_unlock(engine->syncing_mutex); 01162 01163 if (engine->error) { 01164 osync_error_duplicate(error, &(engine->error)); 01165 goto error; 01166 } 01167 01168 osync_trace(TRACE_EXIT, "%s", __func__); 01169 return TRUE; 01170 01171 error: 01172 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error)); 01173 return FALSE; 01174 } 01175 01186 osync_bool osengine_wait_sync_end(OSyncEngine *engine, OSyncError **error) 01187 { 01188 g_mutex_lock(engine->syncing_mutex); 01189 g_cond_wait(engine->syncing, engine->syncing_mutex); 01190 g_mutex_unlock(engine->syncing_mutex); 01191 01192 if (engine->error) { 01193 osync_error_duplicate(error, &(engine->error)); 01194 return FALSE; 01195 } 01196 return TRUE; 01197 } 01198 01205 void osengine_wait_info_end(OSyncEngine *engine) 01206 { 01207 g_mutex_lock(engine->info_received_mutex); 01208 g_cond_wait(engine->info_received, engine->info_received_mutex); 01209 g_mutex_unlock(engine->info_received_mutex); 01210 } 01211 01216 void osengine_one_iteration(OSyncEngine *engine) 01217 { 01218 /*TODO: Reimplement support to stepping mode on engine */ 01219 abort();//osync_queue_dispatch(engine->incoming); 01220 } 01221 01228 OSyncMapping *osengine_mapping_from_id(OSyncEngine *engine, long long int id) 01229 { 01230 return osengine_mappingtable_mapping_from_id(engine->maptable, id); 01231 } 01232