diff --git a/sync-core/src/main/java/eu/darken/octi/sync/core/SyncManager.kt b/sync-core/src/main/java/eu/darken/octi/sync/core/SyncManager.kt index 03b01f1b..babf178a 100644 --- a/sync-core/src/main/java/eu/darken/octi/sync/core/SyncManager.kt +++ b/sync-core/src/main/java/eu/darken/octi/sync/core/SyncManager.kt @@ -93,6 +93,7 @@ class SyncManager @Inject constructor( log(TAG) { "sync(options=$options)" } val syncJobs = connectors.first().map { scope.launch { + // TODO error handling sync(it.identifier, options = options) } } diff --git a/syncs-gdrive/src/main/java/eu/darken/octi/syncs/gdrive/core/GDriveAppDataConnector.kt b/syncs-gdrive/src/main/java/eu/darken/octi/syncs/gdrive/core/GDriveAppDataConnector.kt index 087d06eb..29f2297f 100644 --- a/syncs-gdrive/src/main/java/eu/darken/octi/syncs/gdrive/core/GDriveAppDataConnector.kt +++ b/syncs-gdrive/src/main/java/eu/darken/octi/syncs/gdrive/core/GDriveAppDataConnector.kt @@ -25,6 +25,7 @@ import eu.darken.octi.sync.core.SyncConnector import eu.darken.octi.sync.core.SyncConnectorState import eu.darken.octi.sync.core.SyncOptions import eu.darken.octi.sync.core.SyncRead +import eu.darken.octi.sync.core.SyncSettings import eu.darken.octi.sync.core.SyncWrite import eu.darken.octi.syncs.gdrive.core.GDriveEnvironment.Companion.APPDATAFOLDER import kotlinx.coroutines.CoroutineScope @@ -45,15 +46,17 @@ import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.withContext import okio.IOException import java.time.Instant +import com.google.api.services.drive.model.File as GDriveFile class GDriveAppDataConnector @AssistedInject constructor( @Assisted private val client: GoogleClient, @AppScope private val scope: CoroutineScope, - private val dispatcherProvider: DispatcherProvider, + dispatcherProvider: DispatcherProvider, @ApplicationContext private val context: Context, private val networkStateProvider: NetworkStateProvider, private val supportedModuleIds: Set<@JvmSuppressWildcards ModuleId>, + private val syncSettings: SyncSettings, ) : GDriveBaseConnector(dispatcherProvider, context, client), SyncConnector { data class State( @@ -110,10 +113,12 @@ class GDriveAppDataConnector @AssistedInject constructor( override suspend fun resetData(): Unit = withContext(NonCancellable) { log(TAG, INFO) { "resetData()" } runDriveAction("reset-data") { - appDataRoot - .listFiles() - .forEach { it.deleteAll() } - _state.updateBlocking { copy(isDead = true) } + appDataRoot.child(DEVICE_DATA_DIR_NAME) + ?.listFiles() + ?.forEach { file: GDriveFile -> + log(TAG, INFO) { "resetData(): Deleting $file" } + file.deleteAll() + } } } @@ -124,9 +129,12 @@ class GDriveAppDataConnector @AssistedInject constructor( ?.listFiles() ?.onEach { log(TAG, DEBUG) { "deleteDevice(): Checking $it" } } ?.singleOrNull { it.name == deviceId.id } - ?.onEach { log(TAG, WARN) { "deleteDevice(): Deleting $it" } } + ?.onEach { log(TAG, INFO) { "deleteDevice(): Deleting $it" } } ?.deleteAll() - _state.updateBlocking { copy(isDead = true) } + if (deviceId == syncSettings.deviceId) { + log(TAG, WARN) { "We just deleted ourselves, this connector is dead now" } + _state.updateBlocking { copy(isDead = true) } + } } } @@ -153,12 +161,11 @@ class GDriveAppDataConnector @AssistedInject constructor( val deviceFetchJobs = validDeviceDirs.map { deviceDir -> scope.async deviceFetch@{ + log(TAG, DEBUG) { "readDrive(): Reading module data for device: $deviceDir" } val moduleDirs = deviceDir.listFiles().filter { supportedModuleIds.contains(ModuleId(it.name)) } - - log(TAG, VERBOSE) { "readDrive(): Reading module data for device: $deviceDir" } - val moduleFetchJobs = moduleDirs.map { moduleFile -> scope.async moduleFetch@{ + log(TAG, VERBOSE) { "readDrive(): Reading ${moduleFile.name} for ${deviceDir.name}" } val payload = moduleFile.readData() if (payload == null) { @@ -172,11 +179,12 @@ class GDriveAppDataConnector @AssistedInject constructor( moduleId = ModuleId(moduleFile.name), modifiedAt = Instant.ofEpochMilli(moduleFile.modifiedTime.value), payload = payload, - ).also { log(TAG, VERBOSE) { "readDrive(): Module data: $it" } } + ).also { log(TAG, VERBOSE) { "readDrive(): Got module data: $it" } } } } val moduleData = moduleFetchJobs.awaitAll().filterNotNull() + log(TAG, DEBUG) { "readDrive(): Finished ${deviceDir.name}" } GDriveDeviceData( deviceId = DeviceId(deviceDir.name), @@ -186,8 +194,8 @@ class GDriveAppDataConnector @AssistedInject constructor( } val devices = deviceFetchJobs.awaitAll() - log(TAG) { "readDrive() took ${System.currentTimeMillis() - start}ms" } + return GDriveData( connectorId = identifier, devices = devices, @@ -246,7 +254,6 @@ class GDriveAppDataConnector @AssistedInject constructor( } } catch (e: Exception) { log(TAG, ERROR) { "sync(): Failed to read: ${e.asLog()}" } - _state.updateBlocking { copy(lastError = e) } } } @@ -257,7 +264,7 @@ class GDriveAppDataConnector @AssistedInject constructor( } private suspend fun GDriveEnvironment.writeDrive(data: SyncWrite) = withContext(NonCancellable) { - log(TAG, DEBUG) { "writeDrive(): $data)" } + log(TAG, DEBUG) { "writeDrive(): $data" } // TODO cache write data for when we are online again? if (!isInternetAvailable()) { @@ -268,8 +275,9 @@ class GDriveAppDataConnector @AssistedInject constructor( val userDir = appDataRoot.child(DEVICE_DATA_DIR_NAME) ?.also { if (!it.isDirectory) throw IllegalStateException("devices is not a directory: $it") } ?: run { - appDataRoot.createDir(folderName = DEVICE_DATA_DIR_NAME) - .also { log(TAG, INFO) { "write(): Created devices dir $it" } } + appDataRoot.createDir(folderName = DEVICE_DATA_DIR_NAME).also { + log(TAG, INFO) { "write(): Created devices dir $it" } + } } val deviceIdRaw = data.deviceId.id @@ -323,16 +331,18 @@ class GDriveAppDataConnector @AssistedInject constructor( return try { _state.updateBlocking { copy(activeActions = activeActions + 1) } - try { - withDrive { - driveLock.withLock { - block() - } + + withDrive { + driveLock.withLock { + block() } - } catch (e: Exception) { - log(TAG, ERROR) { "runDriveAction($tag) failed: ${e.asLog()}" } - throw e + }.also { + _state.updateBlocking { copy(lastError = null) } } + } catch (e: Exception) { + log(TAG, ERROR) { "runDriveAction($tag) failed: ${e.asLog()}" } + _state.updateBlocking { copy(lastError = e) } + throw e } finally { _state.updateBlocking { log(TAG, VERBOSE) { "runDriveAction($tag) finished" } diff --git a/syncs-kserver/src/main/java/eu/darken/octi/syncs/kserver/core/KServerConnector.kt b/syncs-kserver/src/main/java/eu/darken/octi/syncs/kserver/core/KServerConnector.kt index 93297233..5f34506d 100644 --- a/syncs-kserver/src/main/java/eu/darken/octi/syncs/kserver/core/KServerConnector.kt +++ b/syncs-kserver/src/main/java/eu/darken/octi/syncs/kserver/core/KServerConnector.kt @@ -183,7 +183,6 @@ class KServerConnector @AssistedInject constructor( } } catch (e: Exception) { log(TAG, ERROR) { "Failed to read: ${e.asLog()}" } - _state.updateBlocking { copy(lastError = e) } } } } @@ -258,11 +257,6 @@ class KServerConnector @AssistedInject constructor( log(TAG, VERBOSE) { "writeServer(): Done" } } - private fun getStorageStats(): SyncConnectorState.Quota { - log(TAG, VERBOSE) { "getStorageStats()" } - return SyncConnectorState.Quota() - } - private suspend fun runServerAction( tag: String, block: suspend () -> R, @@ -273,14 +267,13 @@ class KServerConnector @AssistedInject constructor( return try { _state.updateBlocking { copy(activeActions = activeActions + 1) } - try { - serverLock.withLock { - withContext(NonCancellable) { block() } - } - } catch (e: Exception) { - log(TAG, ERROR) { "runServerAction($tag) failed: ${e.asLog()}" } - throw e + serverLock.withLock { + withContext(NonCancellable) { block() } } + } catch (e: Exception) { + log(TAG, ERROR) { "runServerAction($tag) failed: ${e.asLog()}" } + _state.updateBlocking { copy(lastError = e) } + throw e } finally { _state.updateBlocking { log(TAG, VERBOSE) { "runServerAction($tag) finished" }