diff --git a/packages/sync/src/class-listener.php b/packages/sync/src/class-listener.php index a5b9c668d307f..8073e11b87f79 100644 --- a/packages/sync/src/class-listener.php +++ b/packages/sync/src/class-listener.php @@ -211,14 +211,6 @@ public function action_handler( ...$args ) { public function bulk_enqueue_full_sync_actions( $action_name, $args_array ) { $queue = $this->get_full_sync_queue(); - /* - * Periodically check the size of the queue, and disable adding to it if - * it exceeds some limit AND the oldest item exceeds the age limit (i.e. sending has stopped). - */ - if ( ! $this->can_add_to_queue( $queue ) ) { - return; - } - /* * If we add any items to the queue, we should try to ensure that our script * can't be killed before they are sent. diff --git a/packages/sync/src/class-lock.php b/packages/sync/src/class-lock.php new file mode 100644 index 0000000000000..84d87bc87e965 --- /dev/null +++ b/packages/sync/src/class-lock.php @@ -0,0 +1,65 @@ +continue_enqueuing( $full_sync_config, $enqueue_status ); + $this->continue_enqueuing( $full_sync_config ); return true; } @@ -174,78 +174,108 @@ public function start( $module_configs = null ) { * @access public * * @param array $configs Full sync configuration for all sync modules. - * @param array $enqueue_status Current status of the queue, indexed by sync modules. */ - public function continue_enqueuing( $configs = null, $enqueue_status = null ) { - if ( ! $this->is_started() || $this->get_status_option( 'queue_finished' ) ) { + public function continue_enqueuing( $configs = null ) { + if ( ! $this->is_started() || ! ( new Lock() )->attempt( self::ENQUEUE_LOCK_NAME ) || $this->get_status_option( 'queue_finished' ) ) { return; } - // If full sync queue is full, don't enqueue more items. - $max_queue_size_full_sync = Settings::get_setting( 'max_queue_size_full_sync' ); - $full_sync_queue = new Queue( 'full_sync' ); + $this->enqueue( $configs ); - $available_queue_slots = $max_queue_size_full_sync - $full_sync_queue->size(); + ( new Lock() )->remove( self::ENQUEUE_LOCK_NAME ); + } - if ( $available_queue_slots <= 0 ) { - return; - } else { - $remaining_items_to_enqueue = min( Settings::get_setting( 'max_enqueue_full_sync' ), $available_queue_slots ); - } + /** + * Get Modules that are configured to Full Sync and haven't finished enqueuing + * + * @param array $configs Full sync configuration for all sync modules. + * + * @return array + */ + public function get_remaining_modules_to_enqueue( $configs ) { + $enqueue_status = $this->get_enqueue_status(); + return array_filter( + Modules::get_modules(), + /** + * Select configured and not finished modules. + * + * @var $module Module + * @return bool + */ + function ( $module ) use ( $configs, $enqueue_status ) { + // Skip module if not configured for this sync or module is done. + if ( ! isset( $configs[ $module->name() ] ) ) { + return false; + } + if ( ! $configs[ $module->name() ] ) { + return false; + } + if ( isset( $enqueue_status[ $module->name() ][2] ) ) { + if ( true === $enqueue_status[ $module->name() ][2] ) { + return false; + } + } + return true; + } + ); + } + + /** + * Enqueue the next items to sync. + * + * @access public + * + * @param array $configs Full sync configuration for all sync modules. + */ + public function enqueue( $configs = null ) { if ( ! $configs ) { $configs = $this->get_config(); } - if ( ! $enqueue_status ) { - $enqueue_status = $this->get_enqueue_status(); + $enqueue_status = $this->get_enqueue_status(); + $full_sync_queue = new Queue( 'full_sync' ); + $available_queue_slots = Settings::get_setting( 'max_queue_size_full_sync' ) - $full_sync_queue->size(); + + if ( $available_queue_slots <= 0 ) { + return; } - $modules = Modules::get_modules(); - $modules_processed = 0; - foreach ( $modules as $module ) { - $module_name = $module->name(); - - // Skip module if not configured for this sync or module is done. - if ( ! isset( $configs[ $module_name ] ) - || // No module config. - ! $configs[ $module_name ] - || // No enqueue status. - ! $enqueue_status[ $module_name ] - || // Finished enqueuing this module. - true === $enqueue_status[ $module_name ][2] ) { - $modules_processed ++; - continue; - } + $remaining_items_to_enqueue = min( Settings::get_setting( 'max_enqueue_full_sync' ), $available_queue_slots ); - list( $items_enqueued, $next_enqueue_state ) = $module->enqueue_full_sync_actions( $configs[ $module_name ], $remaining_items_to_enqueue, $enqueue_status[ $module_name ][2] ); + /** + * If a module exits early (e.g. because it ran out of full sync queue slots, or we ran out of request time) + * then it should exit early + */ + foreach ( $this->get_remaining_modules_to_enqueue( $configs ) as $module ) { + list( $items_enqueued, $next_enqueue_state ) = $module->enqueue_full_sync_actions( $configs[ $module->name() ], $remaining_items_to_enqueue, $enqueue_status[ $module->name() ][2] ); - $enqueue_status[ $module_name ][2] = $next_enqueue_state; + $enqueue_status[ $module->name() ][2] = $next_enqueue_state; // If items were processed, subtract them from the limit. if ( ! is_null( $items_enqueued ) && $items_enqueued > 0 ) { - $enqueue_status[ $module_name ][1] += $items_enqueued; - $remaining_items_to_enqueue -= $items_enqueued; + $enqueue_status[ $module->name() ][1] += $items_enqueued; + $remaining_items_to_enqueue -= $items_enqueued; } - if ( true === $next_enqueue_state ) { - $modules_processed ++; - } - // Stop processing if we've reached our limit of items to enqueue. - if ( 0 >= $remaining_items_to_enqueue ) { - break; + if ( 0 >= $remaining_items_to_enqueue || true !== $next_enqueue_state ) { + $this->set_enqueue_status( $enqueue_status ); + return; } } + $this->queue_full_sync_end( $configs ); $this->set_enqueue_status( $enqueue_status ); + } - if ( count( $modules ) > $modules_processed ) { - return; - } - - // Setting autoload to true means that it's faster to check whether we should continue enqueuing. - $this->update_status_option( 'queue_finished', time(), true ); - + /** + * Enqueue 'jetpack_full_sync_end' and update 'queue_finished' status. + * + * @access public + * + * @param array $configs Full sync configuration for all sync modules. + */ + public function queue_full_sync_end( $configs ) { $range = $this->get_content_range( $configs ); /** @@ -259,6 +289,9 @@ public function continue_enqueuing( $configs = null, $enqueue_status = null ) { * @param array $range Range of the sync items, containing min and max IDs for some item types. */ do_action( 'jetpack_full_sync_end', '', $range ); + + // Setting autoload to true means that it's faster to check whether we should continue enqueuing. + $this->update_status_option( 'queue_finished', time(), true ); } /** @@ -537,6 +570,7 @@ public function clear_status() { public function reset_data() { $this->clear_status(); $this->delete_config(); + ( new Lock() )->remove( self::ENQUEUE_LOCK_NAME ); $listener = Listener::get_instance(); $listener->get_full_sync_queue()->reset(); @@ -636,68 +670,4 @@ private function get_config() { return \Jetpack_Options::get_raw_option( 'jetpack_sync_full_config' ); } - /** - * Update an option manually to bypass filters and caching. - * - * @access private - * - * @param string $name Option name. - * @param mixed $value Option value. - * @return int The number of updated rows in the database. - */ - private function write_option( $name, $value ) { - // We write our own option updating code to bypass filters/caching/etc on set_option/get_option. - global $wpdb; - $serialized_value = maybe_serialize( $value ); - - /** - * Try updating, if no update then insert - * TODO: try to deal with the fact that unchanged values can return updated_num = 0 - * below we used "insert ignore" to at least suppress the resulting error. - */ - $updated_num = $wpdb->query( - $wpdb->prepare( - "UPDATE $wpdb->options SET option_value = %s WHERE option_name = %s", - $serialized_value, - $name - ) - ); - - if ( ! $updated_num ) { - $updated_num = $wpdb->query( - $wpdb->prepare( - "INSERT IGNORE INTO $wpdb->options ( option_name, option_value, autoload ) VALUES ( %s, %s, 'no' )", - $name, - $serialized_value - ) - ); - } - return $updated_num; - } - - /** - * Update an option manually to bypass filters and caching. - * - * @access private - * - * @param string $name Option name. - * @param mixed $default Default option value. - * @return mixed Option value. - */ - private function read_option( $name, $default = null ) { - global $wpdb; - $value = $wpdb->get_var( - $wpdb->prepare( - "SELECT option_value FROM $wpdb->options WHERE option_name = %s LIMIT 1", - $name - ) - ); - $value = maybe_unserialize( $value ); - - if ( null === $value && null !== $default ) { - return $default; - } - - return $value; - } } diff --git a/tests/php/sync/test_class.jetpack-sync-lock.php b/tests/php/sync/test_class.jetpack-sync-lock.php new file mode 100644 index 0000000000000..1d6740b73c6ab --- /dev/null +++ b/tests/php/sync/test_class.jetpack-sync-lock.php @@ -0,0 +1,50 @@ +assertTrue( ( new Lock() )->attempt( 'test' ) ); + $this->assertFalse( ( new Lock() )->attempt( 'test' ) ); + } + + /** + * Test remove lock + */ + public function test_remove_lock() { + $this->assertTrue( ( new Lock() )->attempt( 'test' ) ); + ( new Lock() )->remove( 'test' ); + $this->assertTrue( ( new Lock() )->attempt( 'test' ) ); + } + + /** + * Test two locks with different name + */ + public function test_two_locks_different_name() { + $this->assertTrue( ( new Lock() )->attempt( 'test' ) ); + $this->assertTrue( ( new Lock() )->attempt( 'test2' ) ); + } + + /** + * Test two locks with different name remove one lock + */ + public function test_two_locks_different_name_remove_one_lock() { + $this->assertTrue( ( new Lock() )->attempt( 'test' ) ); + $this->assertTrue( ( new Lock() )->attempt( 'test2' ) ); + ( new Lock() )->remove( 'test' ); + $this->assertFalse( ( new Lock() )->attempt( 'test2' ) ); + $this->assertTrue( ( new Lock() )->attempt( 'test' ) ); + } +}