Skip to content

Commit

Permalink
[update] StackFlow subscriber_event_call decode_stream, delete simdjson…
Browse files Browse the repository at this point in the history
…. Fix decode_base64.
  • Loading branch information
dianjixz committed Nov 26, 2024
1 parent 6881a5a commit 225ab5b
Show file tree
Hide file tree
Showing 19 changed files with 31 additions and 48 deletions.
39 changes: 12 additions & 27 deletions ext_components/StackFlow/stackflow/StackFlow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
*/
#include "StackFlow.h"
#include "sample_log.h"
#include <simdjson.h>

using namespace StackFlows;

Expand Down Expand Up @@ -33,34 +32,21 @@ llm_channel_obj::~llm_channel_obj()
void llm_channel_obj::subscriber_event_call(const std::function<void(const std::string &, const std::string &)> &call,
                                            pzmq *_pzmq, const std::string &raw)
{
    // NOTE(review): reconstructed from a diff view that had lost its +/- markers;
    // this is the post-simdjson (string-search) version — confirm against the
    // committed file before merging.
    //
    // Scan the raw JSON text for an unescaped "action" key. If one is found,
    // refresh the channel's push URL (when "zmq_com" is non-empty), request id
    // and work id from the same message, then dispatch the payload to `call`.
    static const std::string action_key = "\"action\"";
    std::size_t pos = raw.find(action_key);
    while (pos != std::string::npos) {
        // pos > 0 guarantees raw[pos - 1] is readable; a preceding backslash
        // means the quote is escaped inside a string value, so keep scanning.
        if ((pos > 0) && (raw[pos - 1] != '\\')) {
            std::string zmq_com = sample_json_str_get(raw, "zmq_com");
            if (!zmq_com.empty()) set_push_url(zmq_com);
            request_id_ = sample_json_str_get(raw, "request_id");
            work_id_    = sample_json_str_get(raw, "work_id");
            break;
        }
        // BUG FIX: the diff advanced by sizeof(const char *) — the size of the
        // pointer, not of the key. Advance by the key's actual length instead.
        pos = raw.find(action_key, pos + action_key.size());
    }
    call(sample_json_str_get(raw, "object"), sample_json_str_get(raw, "data"));
}

int llm_channel_obj::subscriber_work_id(const std::string &work_id,
Expand Down Expand Up @@ -88,7 +74,6 @@ int llm_channel_obj::subscriber_work_id(const std::string &work_id,
zmq_[id_num] = std::make_shared<pzmq>(
subscriber_url, ZMQ_SUB,
std::bind(&llm_channel_obj::subscriber_event_call, this, call, std::placeholders::_1, std::placeholders::_2));
zmq_[id_num]->newContextPtr<simdjson::ondemand::parser>();
return 0;
}

Expand Down
7 changes: 3 additions & 4 deletions ext_components/StackFlow/stackflow/StackFlowUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
#include "StackFlowUtil.h"
#include <vector>
#include "pzmq.hpp"
#include <simdjson.h>

std::string StackFlows::sample_json_str_get(const std::string &json_str, const std::string &json_key)
{
Expand Down Expand Up @@ -185,9 +184,9 @@ bool StackFlows::decode_stream(const std::string &in, std::string &out,
{
int index = std::stoi(StackFlows::sample_json_str_get(in, "index"));
std::string finish = StackFlows::sample_json_str_get(in, "finish");
std::string delta = StackFlows::sample_json_str_get(in, "delta");
stream_buff[index] = delta;
if (finish.find("true") != std::string::npos) {
stream_buff[index] = StackFlows::sample_json_str_get(in, "delta");
// sample find flage: false:true
if (finish.find("f") == std::string::npos) {
for (size_t i = 0; i < stream_buff.size(); i++) {
out += stream_buff.at(i);
}
Expand Down
2 changes: 1 addition & 1 deletion projects/llm_framework/main/SConstruct
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ STATIC_FILES += [AFile('../static_lib/sherpa/ncnn/libsherpa-ncnn-core.so'),
AFile('../static_lib/sherpa/ncnn/libkaldi-native-fbank-core.so'),
AFile('../static_lib/libonnxruntime.so.1.14.0')
]
REQUIREMENTS += ['simdjson_component']

env['COMPONENTS'].append({'target':'static_file',
'SRCS':SRCS,
'INCLUDE':INCLUDE,
Expand Down
2 changes: 1 addition & 1 deletion projects/llm_framework/main/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ class llm_llm : public StackFlow {
std::string tmp_msg2;
if (object.find("base64") != std::string::npos) {
ret = decode_base64((*next_data), tmp_msg2);
if (!ret) {
if (ret == -1) {
error_body["code"] = -23;
error_body["message"] = "Base64 decoding error.";
send("None", "None", error_body, unit_name_);
Expand Down
1 change: 0 additions & 1 deletion projects/llm_framework/main_asr/SConstruct
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ REQUIREMENTS += ['ncnn', 'sherpa-ncnn-core']

STATIC_FILES += [AFile('sherpa-ncnn-streaming-zipformer-20M-2023-02-17.json'), AFile('sherpa-ncnn-streaming-zipformer-zh-14M-2023-02-23.json')]

REQUIREMENTS += ['simdjson_component']

env['COMPONENTS'].append({'target':'llm_asr',
'SRCS':SRCS,
Expand Down
2 changes: 1 addition & 1 deletion projects/llm_framework/main_asr/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ class llm_asr : public StackFlow {
std::string tmp_msg2;
if (object.find("base64") != std::string::npos) {
ret = decode_base64((*next_data), tmp_msg2);
if (!ret) {
if (ret == -1) {
error_body["code"] = -23;
error_body["message"] = "Base64 decoding error.";
send("None", "None", error_body, unit_name_);
Expand Down
2 changes: 1 addition & 1 deletion projects/llm_framework/main_audio/SConstruct
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ REQUIREMENTS += ['ax_sys', 'ax_interpreter', 'ax_audio', 'ax_audio_3a', 'ax_fdk'
REQUIREMENTS += ['tinyalsa', 'opus', 'samplerate', 'fdk-aac']

STATIC_FILES += [AFile('audio.json')]
REQUIREMENTS += ['simdjson_component']

env['COMPONENTS'].append({'target':'llm_audio',
'SRCS':SRCS,
'INCLUDE':INCLUDE,
Expand Down
2 changes: 1 addition & 1 deletion projects/llm_framework/main_kws/SConstruct
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ LDFLAGS += ['-l:libcargs.a', '-l:libonnxruntime.a',

STATIC_FILES += [AFile('sherpa-onnx-kws-zipformer-gigaspeech-3.3M-2024-01-01.json'),
AFile('sherpa-onnx-kws-zipformer-wenetspeech-3.3M-2024-01-01.json')]
REQUIREMENTS += ['simdjson_component']

env['COMPONENTS'].append({'target':'llm_kws',
'SRCS':SRCS,
'INCLUDE':INCLUDE,
Expand Down
2 changes: 1 addition & 1 deletion projects/llm_framework/main_kws/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ class llm_kws : public StackFlow {
std::string tmp_msg2;
if (object.find("base64") != std::string::npos) {
ret = decode_base64((*next_data), tmp_msg2);
if (!ret) {
if (ret == -1) {
error_body["code"] = -23;
error_body["message"] = "Base64 decoding error.";
send("None", "None", error_body, unit_name_);
Expand Down
2 changes: 1 addition & 1 deletion projects/llm_framework/main_llm/SConstruct
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ STATIC_FILES += [AFile('qwen2.5-0.5B-prefill-20e.json'),
AFile('openbuddy-llama3.2-1b-ax630c_tokenizer.py'),
AFile('qwen2.5-coder-0.5B-ax630c_tokenizer.py')
]
REQUIREMENTS += ['simdjson_component']

env['COMPONENTS'].append({'target':'llm_llm',
'SRCS':SRCS,
'INCLUDE':INCLUDE,
Expand Down
2 changes: 1 addition & 1 deletion projects/llm_framework/main_llm/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ class llm_llm : public StackFlow {
std::string tmp_msg2;
if (object.find("base64") != std::string::npos) {
ret = decode_base64((*next_data), tmp_msg2);
if (!ret) {
if (ret == -1) {
error_body["code"] = -23;
error_body["message"] = "Base64 decoding error.";
send("None", "None", error_body, unit_name_);
Expand Down
2 changes: 1 addition & 1 deletion projects/llm_framework/main_melotts/SConstruct
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ INCLUDE += [ADir('../include')]
INCLUDE += [ADir('src/runner'), ADir('../include/onnxruntime/core/session')]

STATIC_FILES += [AFile('melotts_zh-cn.json')]
REQUIREMENTS += ['simdjson_component']

env['COMPONENTS'].append({'target':'llm_melotts',
'SRCS':SRCS,
'INCLUDE':INCLUDE,
Expand Down
2 changes: 1 addition & 1 deletion projects/llm_framework/main_melotts/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ class llm_tts : public StackFlow {
std::string tmp_msg2;
if (enbase64) {
ret = decode_base64((*next_data), tmp_msg2);
if (!ret) {
if (ret == -1) {
return;
}
next_data = &tmp_msg2;
Expand Down
2 changes: 1 addition & 1 deletion projects/llm_framework/main_tts/SConstruct
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ INCLUDE += [ADir('src/runner/eigen-3.4.0'), ADir('src/runner/src/tn/header'), AD
STATIC_FILES += [AFile('single_speaker_english_fast.json'),
AFile('single_speaker_fast.json')
]
REQUIREMENTS += ['simdjson_component']

env['COMPONENTS'].append({'target':'llm_tts',
'SRCS':SRCS,
'INCLUDE':INCLUDE,
Expand Down
2 changes: 1 addition & 1 deletion projects/llm_framework/main_tts/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ class llm_tts : public StackFlow {
std::string tmp_msg2;
if (enbase64) {
ret = decode_base64((*next_data), tmp_msg2);
if (!ret) {
if (ret == -1) {
return;
}
next_data = &tmp_msg2;
Expand Down
2 changes: 1 addition & 1 deletion projects/llm_framework/main_vlm/SConstruct
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ STATIC_LIB += static_file * 4
STATIC_FILES += [AFile('internvl2-1b-ax630c.json'),
AFile('internvl2-1b-ax630c_tokenizer.py')
]
REQUIREMENTS += ['simdjson_component']

env['COMPONENTS'].append({'target':'llm_vlm',
'SRCS':SRCS,
'INCLUDE':INCLUDE,
Expand Down
2 changes: 1 addition & 1 deletion projects/llm_framework/main_vlm/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ class llm_llm : public StackFlow {
std::string tmp_msg2;
if (object.find("base64") != std::string::npos) {
ret = decode_base64((*next_data), tmp_msg2);
if (!ret) {
if (ret == -1) {
error_body["code"] = -23;
error_body["message"] = "Base64 decoding error.";
send("None", "None", error_body, unit_name_);
Expand Down
2 changes: 1 addition & 1 deletion projects/llm_framework/main_yolo/SConstruct
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ STATIC_LIB += static_file * 2

STATIC_FILES += [AFile('yolo11s.json'), AFile('yolo11s-seg.json'), AFile('yolo11s-pose.json')]

REQUIREMENTS += ['simdjson_component']

env['COMPONENTS'].append({'target':'llm_yolo',
'SRCS':SRCS,
'INCLUDE':INCLUDE,
Expand Down
2 changes: 1 addition & 1 deletion projects/llm_framework/main_yolo/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ class llm_yolo : public StackFlow {
// must encode base64
std::string tmp_msg2;
ret = decode_base64((*next_data), tmp_msg2);
if (!ret) {
if (ret == -1) {
error_body["code"] = -23;
error_body["message"] = "Base64 decoding error.";
send("None", "None", error_body, unit_name_);
Expand Down

0 comments on commit 225ab5b

Please sign in to comment.