From 54659b9af71aef872f05b28b520f66ab8f5afbf3 Mon Sep 17 00:00:00 2001 From: Romain Ruaud Date: Fri, 22 Mar 2024 16:49:38 +0100 Subject: [PATCH] Add Ingest Pipeline support. --- .../Api/Client/ClientInterface.php | 18 +++ .../Api/Index/Ingest/PipelineInterface.php | 44 ++++++++ .../Index/Ingest/PipelineManagerInterface.php | 48 ++++++++ .../Ingest/PipelineProcessorInterface.php | 31 ++++++ .../PipelineProcessorProviderInterface.php | 33 ++++++ .../Client/Client.php | 16 +++ .../Index/AsyncIndexOperation.php | 5 +- .../Index/IndexOperation.php | 23 +++- .../Index/Ingest/Pipeline.php | 68 ++++++++++++ .../Index/Ingest/PipelineManager.php | 103 ++++++++++++++++++ .../Ingest/PipelineProcessorProvider.php | 60 ++++++++++ .../Test/Unit/Index/IndexOperationTest.php | 6 +- src/module-elasticsuite-core/etc/di.xml | 10 ++ 13 files changed, 459 insertions(+), 6 deletions(-) create mode 100644 src/module-elasticsuite-core/Api/Index/Ingest/PipelineInterface.php create mode 100644 src/module-elasticsuite-core/Api/Index/Ingest/PipelineManagerInterface.php create mode 100644 src/module-elasticsuite-core/Api/Index/Ingest/PipelineProcessorInterface.php create mode 100644 src/module-elasticsuite-core/Api/Index/Ingest/PipelineProcessorProviderInterface.php create mode 100644 src/module-elasticsuite-core/Index/Ingest/Pipeline.php create mode 100644 src/module-elasticsuite-core/Index/Ingest/PipelineManager.php create mode 100644 src/module-elasticsuite-core/Index/Ingest/PipelineProcessorProvider.php diff --git a/src/module-elasticsuite-core/Api/Client/ClientInterface.php b/src/module-elasticsuite-core/Api/Client/ClientInterface.php index 25c747e5f..3b4512291 100644 --- a/src/module-elasticsuite-core/Api/Client/ClientInterface.php +++ b/src/module-elasticsuite-core/Api/Client/ClientInterface.php @@ -228,4 +228,22 @@ public function deleteByQuery(array $params): array; * @return array */ public function updateByQuery(array $params): array; + + /** + * Run a putPipeline request. 
+ * + * @param array $params Pipeline params. + * + * @return array + */ + public function putPipeline(array $params): array; + + /** + * Run a getPipeline request. + * + * @param string $name Pipeline name. + * + * @return array + */ + public function getPipeline(string $name): array; } diff --git a/src/module-elasticsuite-core/Api/Index/Ingest/PipelineInterface.php b/src/module-elasticsuite-core/Api/Index/Ingest/PipelineInterface.php new file mode 100644 index 000000000..9628698cc --- /dev/null +++ b/src/module-elasticsuite-core/Api/Index/Ingest/PipelineInterface.php @@ -0,0 +1,44 @@ + + * @copyright 2024 Smile + * @license Open Software License ("OSL") v. 3.0 + */ +namespace Smile\ElasticsuiteCore\Api\Index\Ingest; + +/** + * Pipeline Interface + * + * @category Smile + * @package Smile\ElasticsuiteCore + * @author Pierre Gauthier + */ +interface PipelineInterface +{ + /** + * Get name + * + * @return string + */ + public function getName(): string; + + /** + * Get description + * + * @return string + */ + public function getDescription(): string; + + /** + * Get processors + * + * @return array + */ + public function getProcessors(): array; +} diff --git a/src/module-elasticsuite-core/Api/Index/Ingest/PipelineManagerInterface.php b/src/module-elasticsuite-core/Api/Index/Ingest/PipelineManagerInterface.php new file mode 100644 index 000000000..9e690fcca --- /dev/null +++ b/src/module-elasticsuite-core/Api/Index/Ingest/PipelineManagerInterface.php @@ -0,0 +1,48 @@ + + * @copyright 2024 Smile + * @license Open Software License ("OSL") v. 3.0 + */ +namespace Smile\ElasticsuiteCore\Api\Index\Ingest; + +use Smile\ElasticsuiteCore\Index\Ingest\Pipeline; + +/** + * Pipeline Manager + * + * @category Smile + * @package Smile\ElasticsuiteCore + * @author Pierre Gauthier + */ +interface PipelineManagerInterface +{ + /** + * Create ingest pipeline. 
+ * + * @param string $name Pipeline name + * @param string $description Pipeline description + * @param array $processors Pipeline processors + */ + public function create(string $name, string $description, array $processors): ?Pipeline; + + /** + * Get Pipeline by name. + * + * @param string $name Pipeline name. + */ + public function get(string $name): ?Pipeline; + + /** + * Create Pipeline by index identifier. + * + * @param string $identifier Index identifier + */ + public function createByIndexIdentifier(string $identifier): ?Pipeline; +} diff --git a/src/module-elasticsuite-core/Api/Index/Ingest/PipelineProcessorInterface.php b/src/module-elasticsuite-core/Api/Index/Ingest/PipelineProcessorInterface.php new file mode 100644 index 000000000..597f19de6 --- /dev/null +++ b/src/module-elasticsuite-core/Api/Index/Ingest/PipelineProcessorInterface.php @@ -0,0 +1,31 @@ + + * @copyright 2024 Smile + * @license Open Software License ("OSL") v. 3.0 + */ + +namespace Smile\ElasticsuiteCore\Api\Index\Ingest; + +/** + * Pipeline Processor + * + * @category Smile + * @package Smile\ElasticsuiteCore + * @author Pierre Gauthier + */ +interface PipelineProcessorInterface +{ + /** + * Get Pipeline processors. + * + * @return array + */ + public function getProcessors(): array; +} diff --git a/src/module-elasticsuite-core/Api/Index/Ingest/PipelineProcessorProviderInterface.php b/src/module-elasticsuite-core/Api/Index/Ingest/PipelineProcessorProviderInterface.php new file mode 100644 index 000000000..86976f477 --- /dev/null +++ b/src/module-elasticsuite-core/Api/Index/Ingest/PipelineProcessorProviderInterface.php @@ -0,0 +1,33 @@ + + * @copyright 2024 Smile + * @license Open Software License ("OSL") v. 
3.0 + */ + +namespace Smile\ElasticsuiteCore\Api\Index\Ingest; + +/** + * Pipeline Processor Provider + * + * @category Smile + * @package Smile\ElasticsuiteCore + * @author Pierre Gauthier + */ +interface PipelineProcessorProviderInterface +{ + /** + * Get Pipeline processors by index identifier. + * + * @param string $indexIdentifier Index Identifier + * + * @return \Smile\ElasticsuiteCore\Api\Index\Ingest\PipelineProcessorInterface[] + */ + public function getProcessors(string $indexIdentifier) : array; +} diff --git a/src/module-elasticsuite-core/Client/Client.php b/src/module-elasticsuite-core/Client/Client.php index aaab3eca6..f22ae5c68 100644 --- a/src/module-elasticsuite-core/Client/Client.php +++ b/src/module-elasticsuite-core/Client/Client.php @@ -261,6 +261,22 @@ public function updateByQuery(array $params): array return $this->getEsClient()->updateByQuery($params); } + /** + * {@inheritDoc} + */ + public function putPipeline(array $params): array + { + return $this->getEsClient()->ingest()->putPipeline($params); + } + + /** + * {@inheritDoc} + */ + public function getPipeline(string $name): array + { + return $this->getEsClient()->ingest()->getPipeline(['id' => $name]); + } + /** * @return \OpenSearch\Client */ diff --git a/src/module-elasticsuite-core/Index/AsyncIndexOperation.php b/src/module-elasticsuite-core/Index/AsyncIndexOperation.php index 9c4287348..229a21cba 100644 --- a/src/module-elasticsuite-core/Index/AsyncIndexOperation.php +++ b/src/module-elasticsuite-core/Index/AsyncIndexOperation.php @@ -15,6 +15,7 @@ use Smile\ElasticsuiteCore\Api\Client\ClientConfigurationInterface; use Smile\ElasticsuiteCore\Api\Index\AsyncIndexOperationInterface; +use Smile\ElasticsuiteCore\Api\Index\Ingest\PipelineManagerInterface; /** * Asynchronous Index Operations interface @@ -47,6 +48,7 @@ class AsyncIndexOperation extends IndexOperation implements AsyncIndexOperationI * @param \Smile\ElasticsuiteCore\Api\Client\ClientInterface $client ES client. 
* @param \Smile\ElasticsuiteCore\Api\Client\ClientConfigurationInterface $clientConfiguration ES client configuration. * @param \Smile\ElasticsuiteCore\Api\Index\IndexSettingsInterface $indexSettings ES settings. + * @param PipelineManagerInterface $pipelineManager Ingest Pipeline Manager. * @param \Psr\Log\LoggerInterface $logger Logger access. */ public function __construct( @@ -54,11 +56,12 @@ public function __construct( \Smile\ElasticsuiteCore\Api\Client\ClientInterface $client, \Smile\ElasticsuiteCore\Api\Client\ClientConfigurationInterface $clientConfiguration, \Smile\ElasticsuiteCore\Api\Index\IndexSettingsInterface $indexSettings, + PipelineManagerInterface $pipelineManager, \Psr\Log\LoggerInterface $logger ) { $this->client = $client; $this->parallelHandles = $clientConfiguration->getMaxParallelHandles(); - parent::__construct($objectManager, $client, $indexSettings, $logger); + parent::__construct($objectManager, $client, $indexSettings, $pipelineManager, $logger); } /** diff --git a/src/module-elasticsuite-core/Index/IndexOperation.php b/src/module-elasticsuite-core/Index/IndexOperation.php index d2773b4d8..45c2d2ffc 100644 --- a/src/module-elasticsuite-core/Index/IndexOperation.php +++ b/src/module-elasticsuite-core/Index/IndexOperation.php @@ -16,6 +16,7 @@ use Smile\ElasticsuiteCore\Api\Index\Bulk\BulkResponseInterface; use Smile\ElasticsuiteCore\Api\Index\IndexOperationInterface; +use Smile\ElasticsuiteCore\Api\Index\Ingest\PipelineManagerInterface; /** * Default implementation of operation on indices (\Smile\ElasticsuiteCore\Api\Index\IndexOperationInterface). @@ -51,6 +52,11 @@ class IndexOperation implements IndexOperationInterface */ private $client; + /** + * @var \Smile\ElasticsuiteCore\Api\Index\Ingest\PipelineManagerInterface + */ + private $pipelineManager; + /** * @var \Psr\Log\LoggerInterface */ @@ -59,21 +65,24 @@ class IndexOperation implements IndexOperationInterface /** * Instanciate the index operation manager. 
* - * @param \Magento\Framework\ObjectManagerInterface $objectManager Object manager. - * @param \Smile\ElasticsuiteCore\Api\Client\ClientInterface $client ES client. - * @param \Smile\ElasticsuiteCore\Api\Index\IndexSettingsInterface $indexSettings ES settings. - * @param \Psr\Log\LoggerInterface $logger Logger access. + * @param \Magento\Framework\ObjectManagerInterface $objectManager Object manager. + * @param \Smile\ElasticsuiteCore\Api\Client\ClientInterface $client ES client. + * @param \Smile\ElasticsuiteCore\Api\Index\IndexSettingsInterface $indexSettings ES settings. + * @param PipelineManagerInterface $pipelineManager Ingest Pipeline Manager. + * @param \Psr\Log\LoggerInterface $logger Logger access. */ public function __construct( \Magento\Framework\ObjectManagerInterface $objectManager, \Smile\ElasticsuiteCore\Api\Client\ClientInterface $client, \Smile\ElasticsuiteCore\Api\Index\IndexSettingsInterface $indexSettings, + PipelineManagerInterface $pipelineManager, \Psr\Log\LoggerInterface $logger ) { $this->objectManager = $objectManager; $this->client = $client; $this->indexSettings = $indexSettings; $this->indicesConfiguration = $indexSettings->getIndicesConfig(); + $this->pipelineManager = $pipelineManager; $this->logger = $logger; } @@ -138,6 +147,12 @@ public function createIndex($indexIdentifier, $store) // @codingStandardsIgnoreEnd $indexSettings['settings']['analysis'] = $this->indexSettings->getAnalysisSettings($store); + // Add (and create, if needed) default pipeline. 
+ $pipeline = $this->pipelineManager->createByIndexIdentifier($indexIdentifier); + if ($pipeline !== null) { + $indexSettings['settings']['default_pipeline'] = $pipeline->getName(); + } + $this->client->createIndex($index->getName(), $indexSettings); $this->client->putMapping($index->getName(), $index->getMapping()->asArray()); diff --git a/src/module-elasticsuite-core/Index/Ingest/Pipeline.php b/src/module-elasticsuite-core/Index/Ingest/Pipeline.php new file mode 100644 index 000000000..7af08a7d4 --- /dev/null +++ b/src/module-elasticsuite-core/Index/Ingest/Pipeline.php @@ -0,0 +1,68 @@ + + * @copyright 2024 Smile + * @license Open Software License ("OSL") v. 3.0 + */ + +namespace Smile\ElasticsuiteCore\Index\Ingest; + +use Smile\ElasticsuiteCore\Api\Index\Ingest\PipelineInterface; + +/** + * Ingest Pipeline Implementation + * + * @category Smile + * @package Smile\ElasticsuiteCore + * @author Pierre Gauthier + */ +class Pipeline implements PipelineInterface +{ + /** + * @param string $name Pipeline name + * @param string $description Pipeline description + * @param array $processors Pipeline processors + */ + public function __construct( + protected string $name, + protected string $description, + protected array $processors + ) { + } + + /** + * Get name + * + * @return string + */ + public function getName(): string + { + return $this->name; + } + + /** + * Get description + * + * @return string + */ + public function getDescription(): string + { + return $this->description; + } + + /** + * Get processors + * + * @return array + */ + public function getProcessors(): array + { + return $this->processors; + } +} diff --git a/src/module-elasticsuite-core/Index/Ingest/PipelineManager.php b/src/module-elasticsuite-core/Index/Ingest/PipelineManager.php new file mode 100644 index 000000000..e0b638abf --- /dev/null +++ b/src/module-elasticsuite-core/Index/Ingest/PipelineManager.php @@ -0,0 +1,103 @@ + + * @copyright 2024 Smile + * @license Open Software License ("OSL") 
v. 3.0 + */ + +namespace Smile\ElasticsuiteCore\Index\Ingest; + +use Smile\ElasticsuiteCore\Api\Index\Ingest\PipelineManagerInterface; +use Smile\ElasticsuiteCore\Api\Index\Ingest\PipelineProcessorProviderInterface; +use Smile\ElasticsuiteCore\Api\Index\Ingest\PipelineInterfaceFactory; +use Smile\ElasticsuiteCore\Api\Client\ClientInterface; + +/** + * Pipeline Manager + * + * @category Smile + * @package Smile\ElasticsuiteCore + * @author Pierre Gauthier + */ +class PipelineManager implements PipelineManagerInterface +{ + /** + * @param ClientInterface $client Elasticsuite Client + * @param PipelineProcessorProviderInterface $processorProvider Processor Provider + * @param PipelineInterfaceFactory $pipelineFactory Pipeline Factory + * @param string $pipelinePrefix Pipeline Prefix + */ + public function __construct( + private ClientInterface $client, + private PipelineProcessorProviderInterface $processorProvider, + private PipelineInterfaceFactory $pipelineFactory, + private string $pipelinePrefix = "es-llm-pipeline-" + ) { + } + + /** + * {@inheritDoc} + */ + public function create(string $name, string $description, array $processors): ?Pipeline + { + $result = null; + + if (!empty($processors)) { + $query = [ + 'id' => $name, + 'body' => [ + 'description' => $description, + 'processors' => $processors, + ], + ]; + $this->client->putPipeline($query); + + $result = $this->pipelineFactory->create(['name' => $name, 'description' => $description, 'processors' => $processors]); + } + + return $result; + } + + /** + * {@inheritDoc} + */ + public function get(string $name): ?Pipeline + { + try { + $data = $this->client->getPipeline($name); + $description = ''; + if (\array_key_exists('description', $data[$name])) { + $description = $data[$name]['description']; + } + + return $this->pipelineFactory->create( + ['name' => $name, 'description' => $description, 'processors' => $data[$name]['processors']] + ); + } catch (\Exception $exception) { + return null; + } + } + + 
/** + * {@inheritDoc} + * + * @throws \Exception + */ + public function createByIndexIdentifier(string $identifier): ?Pipeline + { + $pipelineName = $this->pipelinePrefix . $identifier; + $processors = []; + + foreach ($this->processorProvider->getProcessors($identifier) as $processor) { + $processors = array_merge_recursive($processors, $processor->getProcessors()); + } + + return $this->create($pipelineName, $pipelineName, $processors); + } +} diff --git a/src/module-elasticsuite-core/Index/Ingest/PipelineProcessorProvider.php b/src/module-elasticsuite-core/Index/Ingest/PipelineProcessorProvider.php new file mode 100644 index 000000000..6c72a53bc --- /dev/null +++ b/src/module-elasticsuite-core/Index/Ingest/PipelineProcessorProvider.php @@ -0,0 +1,60 @@ + + * @copyright 2024 Smile + * @license Open Software License ("OSL") v. 3.0 + */ + +namespace Smile\ElasticsuiteCore\Index\Ingest; + +use Smile\ElasticsuiteCore\Api\Index\Ingest\PipelineProcessorProviderInterface; + +/** + * Pipeline Processor Provider + * + * @category Smile + * @package Smile\ElasticsuiteCore + * @author Pierre Gauthier + */ +class PipelineProcessorProvider implements PipelineProcessorProviderInterface +{ + /** + * @var array + */ + private $processors; + + /** + * PipelineProcessorProvider constructor. + * + * @param array $processors The processors (from DI). + */ + public function __construct(array $processors = []) + { + $this->processors = $processors; + } + + /** + * {@inheritDoc} + */ + public function getProcessors(string $indexIdentifier) : array + { + $processors = []; + + foreach ($this->processors[$indexIdentifier] ?? [] as $name => $processor) { + if (!$processor instanceof \Smile\ElasticsuiteCore\Api\Index\Ingest\PipelineProcessorInterface) { + throw new \InvalidArgumentException( + 'Processor must implement ' . 
\Smile\ElasticsuiteCore\Api\Index\Ingest\PipelineProcessorInterface::class + ); + } + $processors[$name] = $processor; + } + + return $processors; + } +} diff --git a/src/module-elasticsuite-core/Test/Unit/Index/IndexOperationTest.php b/src/module-elasticsuite-core/Test/Unit/Index/IndexOperationTest.php index edc1d1b58..fc3e4cf15 100644 --- a/src/module-elasticsuite-core/Test/Unit/Index/IndexOperationTest.php +++ b/src/module-elasticsuite-core/Test/Unit/Index/IndexOperationTest.php @@ -13,6 +13,7 @@ */ namespace Smile\ElasticsuiteCore\Test\Unit\Index; +use Smile\ElasticsuiteCore\Api\Index\Ingest\PipelineManagerInterface; use Smile\ElasticsuiteCore\Index\IndexOperation; /** @@ -52,9 +53,12 @@ protected function setUp(): void $objectManager = $this->getObjectManagerMock(); $indexSettings = $this->getIndexSettingsMock(); + $pipelineManager = $this->getMockBuilder(PipelineManagerInterface::class) + ->disableOriginalConstructor() + ->getMock(); $logger = $this->getLoggerMock(); - $this->indexOperation = new IndexOperation($objectManager, $this->clientMock, $indexSettings, $logger); + $this->indexOperation = new IndexOperation($objectManager, $this->clientMock, $indexSettings, $pipelineManager, $logger); } /** diff --git a/src/module-elasticsuite-core/etc/di.xml b/src/module-elasticsuite-core/etc/di.xml index 9a5fa6cb6..2149afad3 100644 --- a/src/module-elasticsuite-core/etc/di.xml +++ b/src/module-elasticsuite-core/etc/di.xml @@ -82,6 +82,16 @@ + + + + + + +