From bd19ad786703e4ebba76697cc05fe33c945ea6d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Zieli=C5=84ski?= Date: Tue, 27 Aug 2024 10:06:54 +0200 Subject: [PATCH] Add a "main loop" that processes each stage of the pipeline explicitly --- pipes-controller.php | 103 ++++++++++++++++++++++++++++++++++++++----- 1 file changed, 91 insertions(+), 12 deletions(-) diff --git a/pipes-controller.php b/pipes-controller.php index d70c772..5fc8510 100644 --- a/pipes-controller.php +++ b/pipes-controller.php @@ -599,6 +599,11 @@ function (ZipStreamReader $zip_reader, ByteStreamState $state) { } } +// ---------------------------------------------------------------------------- +// Here's a stream-based pipeline that fetches a ZIP file from a remote server, +// unzips it, skips the first file, processes the XML files, and uppercases the +// output. +// ---------------------------------------------------------------------------- $chain = new StreamChain( [ 'http' => HTTP_Client::stream([ @@ -624,18 +629,92 @@ function (ZipStreamReader $zip_reader, ByteStreamState $state) { // var_dump([$chain->next_chunk(), strlen($chain->get_bytes()), $chain->get_last_error()]); // Or like this: -$chain->stop_on_errors(true); -foreach($chain as $chunk) { - switch($chunk->get_chunk_type()) { - case '#error': - echo "Error: " . $chunk->get_last_error() . "\n"; - break; - case '#bytes': - var_dump([ - $chunk->get_bytes(), - 'zip file_id' => isset($chain['zip']) ? $chain['zip']->get_file_id() : null - ]); +// $chain->stop_on_errors(true); +// foreach($chain as $chunk) { +// switch($chunk->get_chunk_type()) { +// case '#error': +// echo "Error: " . $chunk->get_last_error() . "\n"; +// break; +// case '#bytes': +// var_dump([ +// $chunk->get_bytes(), +// 'zip file_id' => isset($chain['zip']) ? $chain['zip']->get_file_id() : null +// ]); +// break; +// } +// } + + +// ---------------------------------------------------------- +// And here's a loop-based pipeline that does the same thing: +// ---------------------------------------------------------- + +$client = new Client(); +$client->enqueue([ + new Request('http://127.0.0.1:9864/export.wxr.zip'), + new Request('http://127.0.0.1:9865') +]); + +$zip_readers = []; +$xml_processors = []; +$xml_tokens_found = []; +while ($client->await_next_event()) { + // Fetch HTTP data + $request = $client->get_request(); + switch ($client->get_event()) { + case Client::EVENT_BODY_CHUNK_AVAILABLE: + // Continue to the next stage break; + case Client::EVENT_FAILED: + error_log('Request failed: ' . $request->error); + default: + continue 2; + } + + // Unzip the file + $zip_reader = $zip_readers[$request->id] ?? new ZipStreamReader(); + $zip_reader->append_bytes($client->get_response_body_chunk()); + while ($zip_reader->next()) { + switch ($zip_reader->get_state()) { + case ZipStreamReader::STATE_FILE_ENTRY: + // Continue to the next stage + break; + default: + continue 2; + } + + if($zip_reader->get_file_path() === 'export.wxr') { + continue; + } + + // Process the XML + $xml_processor = $xml_processors[$request->id] ?? new WP_XML_Processor('', [], WP_XML_Processor::IN_PROLOG_CONTEXT); + $xml_processor->append_bytes($zip_reader->get_file_body_chunk()); + + $xml_tokens_found[$request->id] ??= 0; + while ($xml_processor->next_token()) { + ++$xml_tokens_found[$request->id]; + // Process the XML + } + + $buffer = ''; + if ($xml_tokens_found[$request->id] > 0) { + $buffer .= $xml_processor->get_updated_xml(); + } else if ( + $xml_tokens_found[$request->id] === 0 && + !$xml_processor->is_paused_at_incomplete_input() && + $xml_processor->get_current_depth() === 0 + ) { + // We've reached the end of the document, let's finish up. + // @TODO: Fix this so it doesn't return the entire XML + $buffer .= $xml_processor->get_unprocessed_xml(); + } + + if (!strlen($buffer)) { + continue; + } + + // Uppercase the output + echo strtoupper($buffer); } } -