
Impossible to create pipeline with multiple processors with same name #1810

Closed
ThibautSF opened this issue Oct 19, 2020 · 4 comments · Fixed by #2218

Comments

@ThibautSF
Contributor

ThibautSF commented Oct 19, 2020

Edit 1

Last sample workaround in #1810 (comment)

Original

Hi,

Versions used:
Elasticsearch 7.9.2
Elastica 7.0.0

I would like to create the following pipeline from PHP with Elastica:

PUT _ingest/pipeline/pageattachment
{
    "description": "Extract attachment information",
    "processors": [
        {
            "attachment": {
                "field": "pageDescBinary",
                "indexed_chars": -1,
                "target_field": "desc_attachment"
            }
        },
        {
            "remove": {
                "field": "pageDescBinary"
            }
        },
        {
            "attachment": {
                "field": "pageSpreadsheetBinary",
                "indexed_chars": -1,
                "target_field": "spreadsheet_attachment"
            }
        },
        {
            "remove": {
                "field": "pageSpreadsheetBinary"
            }
        }
    ]
}

Through a manual curl query and some indexing tests it works well, but when I do the same from PHP, every processor type that appears more than once is erased (only the last one added is kept):

use Elastica\Pipeline;
use Elastica\Processor\Attachment;
use Elastica\Processor\Remove;

$pipeline = new Pipeline($client);
$pipeline->setId('pageattachment')->setDescription('Extract attachment information');

//Create attachment processor
$attachproc = new Attachment('pageDescBinary');
$attachproc->setIndexedChars(-1);
$attachproc->setTargetField('desc_attachment');

//Create remove processor 
$removeproc = new Remove('pageDescBinary');

//Create second attachment processor 
$attachproc2 = new Attachment('pageSpreadsheetBinary');
$attachproc2->setIndexedChars(-1);
$attachproc2->setTargetField('spreadsheet_attachment');

//Create second remove processor 
$removeproc2 = new Remove('pageSpreadsheetBinary');

//Add processors to the pipeline
$pipeline->addProcessor($attachproc);
$pipeline->addProcessor($removeproc);
$pipeline->addProcessor($attachproc2);
$pipeline->addProcessor($removeproc2);

$response = $pipeline->create();

But the produced pipeline is:

{
  "pageattachment": {
    "description": "Extract attachment information",
    "processors": [
      {
        "attachment": {
          "field": "pageSpreadsheetBinary",
          "indexed_chars": -1,
          "target_field": "spreadsheet_attachment"
        },
        "remove": {
          "field": "pageSpreadsheetBinary"
        }
      }
    ]
  }
}

As we can see, only the second 'attachment' and 'remove' processors are kept.
The problem is caused by the associative array used by the Pipeline class, which uses the processor type as a key:

Array ( 
	[processors] => Array ( 
		[attachment] => Array ( 
			[field] => pageSpreadsheetBinary 
			[indexed_chars] => -1 
			[target_field] => spreadsheet_attachment 
		) 
		[remove] => Array (
			[field] => pageSpreadsheetBinary 
		)
	)
)
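The overwrite can be reproduced with plain PHP arrays (a minimal sketch, independent of Elastica, to illustrate the problem above):

```php
<?php
// Storing processors under their type name means a second processor of the
// same type overwrites the first: array keys are unique.
$processors = [];
$processors['attachment'] = ['field' => 'pageDescBinary'];
$processors['attachment'] = ['field' => 'pageSpreadsheetBinary']; // clobbers the first entry

// What the Elasticsearch endpoint expects instead is a plain list of
// single-key maps, so duplicate types can coexist:
$expected = [
    ['attachment' => ['field' => 'pageDescBinary']],
    ['attachment' => ['field' => 'pageSpreadsheetBinary']],
];

var_dump(count($processors)); // int(1) — one processor was silently dropped
var_dump(count($expected));   // int(2)
```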

I tried to find a workaround using setRawProcessors:

use Elastica\Pipeline;

$pipeline = new Pipeline($client);
$pipeline->setId('pageattachment')->setDescription('Extract attachment information');

$processors = [
    'processors' => [
        [
            'attachment' => [
                'field' => 'pageDescBinary',
                'indexed_chars' => -1,
                'target_field' => 'desc_attachment',
            ],
        ],
        [
            'remove' => [
                'field' => 'pageDescBinary',
            ],
        ],
        [
            'attachment' => [
                'field' => 'pageSpreadsheetBinary',
                'indexed_chars' => -1,
                'target_field' => 'spreadsheet_attachment',
            ],
        ],
        [
            'remove' => [
                'field' => 'pageSpreadsheetBinary',
            ],
        ],
    ],
];

$pipeline->setRawProcessors($processors);
$response = $pipeline->create();

But this produces an error on the Elasticsearch side (probably because PHP uses 0, 1, 2... as array keys):

[2020-10-19T14:20:04,866][WARN ][r.suppressed             ] [lc-master-1] path: /_ingest/pipeline/pageattachment, params: {id=pageattachment}
java.lang.ClassCastException: class java.util.ArrayList cannot be cast to class java.util.Map (java.util.ArrayList and java.util.Map are in module java.base of loader 'bootstrap')
        at org.elasticsearch.ingest.ConfigurationUtils.readProcessorConfigs(ConfigurationUtils.java:334) ~[elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.ingest.Pipeline.create(Pipeline.java:74) ~[elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.ingest.IngestService.validatePipeline(IngestService.java:435) ~[elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.ingest.IngestService.putPipeline(IngestService.java:340) ~[elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.action.ingest.PutPipelineTransportAction.lambda$masterOperation$0(PutPipelineTransportAction.java:88) ~[elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.action.ActionListener$1.onResponse(ActionListener.java:63) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.action.support.ContextPreservingActionListener.onResponse(ContextPreservingActionListener.java:43) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.action.support.TransportAction$1.onResponse(TransportAction.java:89) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.action.support.TransportAction$1.onResponse(TransportAction.java:83) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.action.support.nodes.TransportNodesAction$AsyncAction.finishHim(TransportNodesAction.java:241) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.action.support.nodes.TransportNodesAction$AsyncAction.onOperation(TransportNodesAction.java:218) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.action.support.nodes.TransportNodesAction$AsyncAction.access$000(TransportNodesAction.java:147) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.action.support.nodes.TransportNodesAction$AsyncAction$1.handleResponse(TransportNodesAction.java:196) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.action.support.nodes.TransportNodesAction$AsyncAction$1.handleResponse(TransportNodesAction.java:188) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.transport.TransportService$6.handleResponse(TransportService.java:632) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.transport.TransportService$ContextRestoreResponseHandler.handleResponse(TransportService.java:1162) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.transport.TransportService$ContextRestoreResponseHandler.handleResponse(TransportService.java:1162) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.transport.TransportService$DirectResponseChannel.processResponse(TransportService.java:1240) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.transport.TransportService$DirectResponseChannel.sendResponse(TransportService.java:1220) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.transport.TaskTransportChannel.sendResponse(TaskTransportChannel.java:52) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.action.support.nodes.TransportNodesAction$NodeTransportHandler.messageReceived(TransportNodesAction.java:249) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.action.support.nodes.TransportNodesAction$NodeTransportHandler.messageReceived(TransportNodesAction.java:245) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.xpack.security.transport.SecurityServerTransportInterceptor$ProfileSecuredRequestHandler$1.doRun(SecurityServerTransportInterceptor.java:257) [x-pack-security-7.9.2.jar:7.9.2]
        at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.xpack.security.transport.SecurityServerTransportInterceptor$ProfileSecuredRequestHandler.messageReceived(SecurityServerTransportInterceptor.java:315) [x-pack-security-7.9.2.jar:7.9.2]
        at org.elasticsearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:72) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.transport.TransportService$8.doRun(TransportService.java:800) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:737) [elasticsearch-7.9.2.jar:7.9.2]
        at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-7.9.2.jar:7.9.2]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
        at java.lang.Thread.run(Thread.java:834) [?:?]
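For context on the 0, 1, 2... keys mentioned above, this sketch (an illustration only, not the actual Elastica serialization path) shows how json_encode() distinguishes the two array shapes; what Elasticsearch receives depends on how the intermediate serialization re-keys the structure:

```php
<?php
// A sequentially (0, 1, 2...) keyed PHP array encodes to a JSON array,
// while a string-keyed array encodes to a JSON object. Elasticsearch
// expects "processors" to be a JSON array of single-key objects.
$asList = [
    ['remove' => ['field' => 'a']],
    ['remove' => ['field' => 'b']],
];
$asMap = ['remove' => ['field' => 'a']];

echo json_encode($asList), "\n"; // [{"remove":{"field":"a"}},{"remove":{"field":"b"}}]
echo json_encode($asMap), "\n";  // {"remove":{"field":"a"}}
```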

I will try to use the Pipeline processor as a second workaround and do nested pipelines.
I'm open to a better workaround, because this one isn't ideal.

@ruflin
Owner

ruflin commented Oct 21, 2020

Probably the best workaround would be to add full, or at least some, support for this "natively" in Elastica?

@ThibautSF
Contributor Author

I think it will need some enhancements to how processors are stored in Pipeline (maybe even in Params) and/or to how Pipeline is serialized before being sent to Elasticsearch.

But I still don't understand enough that part of the code to be sure of how to do that.

@ruflin
Owner

ruflin commented Oct 22, 2020

I saw you opened a PR; that will probably make the discussion easier.

@ThibautSF
Contributor Author

ThibautSF commented Oct 27, 2020

So!

I found a much better workaround (it might be an idea to follow):

use Elastica\Processor\Attachment;
use Elastica\Processor\Lowercase;
use Elastica\Processor\Remove;
use Elastica\Request;

//Create first attachment processor
$attachprocDesc = new Attachment('descBinary');
$attachprocDesc->setIndexedChars(-1);
$attachprocDesc->setTargetField('desc_attachment');

//Create first remove processor
$removeprocDesc = new Remove('descBinary');

//Create second attachment processor (used in a foreach processor)
$attachprocNeweditor = new Attachment('_ingest._value.contentBinary');
$attachprocNeweditor->setIndexedChars(-1);
$attachprocNeweditor->setTargetField('_ingest._value.content');

//Create second remove processor (used in a foreach processor)
$removeprocNeweditor = new Remove('_ingest._value.contentBinary');

$pipelineId = 'mypipeline';
$pipeline = [
    'description' => 'a pipeline',
    'processors' => [
        $attachprocDesc->toArray(), //1st attachment
        $removeprocDesc->toArray(), //1st remove
        [ //1st foreach (manual because not implemented in Elastica)
            'foreach' => [
                'field' => 'subContents',
                'ignore_missing' => true,
                'processor' => $attachprocNeweditor->toArray(), //2nd attachment
            ],
        ],
        [ //2nd foreach (manual because not implemented in Elastica)
            'foreach' => [
                'field' => 'subContents',
                'ignore_missing' => true,
                'processor' => $removeprocNeweditor->toArray(), //2nd remove
            ],
        ],
        (new Lowercase('somefield'))->toArray(),
    ],
];

$path = "_ingest/pipeline/{$pipelineId}";

$client->request($path, Request::PUT, json_encode($pipeline));

This will produce the following pipeline (named 'mypipeline'):

{
    "description": "a pipeline",
    "processors": [
        {
            "attachment": {
                "field": "descBinary",
                "indexed_chars": -1,
                "target_field": "desc_attachment"
            }
        },
        {
            "remove": {
                "field": "descBinary"
            }
        },
        {
            "foreach": {
                "field": "subContents",
                "ignore_missing": true,
                "processor": {
                    "attachment": {
                        "field": "_ingest._value.contentBinary",
                        "indexed_chars": -1,
                        "target_field": "_ingest._value.content"
                    }
                }
            }
        },
        {
            "foreach": {
                "field": "subContents",
                "ignore_missing": true,
                "processor": {
                    "remove": {
                        "field": "_ingest._value.contentBinary"
                    }
                }
            }
        },
        {
            "lowercase": {
                "field": "somefield"
            }
        }
    ]
}

McFistyBuns added a commit to McFistyBuns/Elastica that referenced this issue Jun 25, 2024
Refactored processor handling to more closely resemble what Elasticsearch ingest pipeline endpoint expects.

Fixes ruflin#1810
@ruflin ruflin closed this as completed in 8b1826d Jun 26, 2024