Skip to content

Commit

Permalink
kafka convert json to producer message.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Oct 23, 2015
1 parent 8e344f1 commit 76cd3f8
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 9 deletions.
109 changes: 101 additions & 8 deletions trunk/src/protocol/srs_kafka_stack.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ using namespace std;

#ifdef SRS_AUTO_KAFKA

#define SRS_KAFKA_PRODUCER_MESSAGE_TIMEOUT_MS 300000

SrsKafkaString::SrsKafkaString()
{
_size = -1;
Expand All @@ -45,11 +47,10 @@ SrsKafkaString::SrsKafkaString()

SrsKafkaString::SrsKafkaString(string v)
{
_size = (int16_t)v.length();
_size = -1;
data = NULL;

srs_assert(_size > 0);
data = new char[_size];
memcpy(data, v.data(), _size);
set_value(v);
}

SrsKafkaString::~SrsKafkaString()
Expand All @@ -76,6 +77,19 @@ string SrsKafkaString::to_str()
return ret;
}

void SrsKafkaString::set_value(string v)
{
// free previous data.
srs_freep(data);

// copy new value to data.
_size = (int16_t)v.length();

srs_assert(_size > 0);
data = new char[_size];
memcpy(data, v.data(), _size);
}

int SrsKafkaString::nb_bytes()
{
return _size == -1? 2 : 2 + _size;
Expand Down Expand Up @@ -149,11 +163,10 @@ SrsKafkaBytes::SrsKafkaBytes()

SrsKafkaBytes::SrsKafkaBytes(const char* v, int nb_v)
{
_size = (int16_t)nb_v;
_size = -1;
data = NULL;

srs_assert(_size > 0);
data = new char[_size];
memcpy(data, v, _size);
set_value(v, nb_v);
}

SrsKafkaBytes::~SrsKafkaBytes()
Expand All @@ -171,6 +184,24 @@ bool SrsKafkaBytes::empty()
return _size <= 0;
}

void SrsKafkaBytes::set_value(string v)
{
set_value(v.data(), (int)v.length());
}

void SrsKafkaBytes::set_value(const char* v, int nb_v)
{
// free previous data.
srs_freep(data);

// copy new value to data.
_size = (int16_t)nb_v;

srs_assert(_size > 0);
data = new char[_size];
memcpy(data, v, _size);
}

int SrsKafkaBytes::nb_bytes()
{
return 4 + (_size == -1? 0 : _size);
Expand Down Expand Up @@ -479,6 +510,32 @@ SrsKafkaRawMessage::~SrsKafkaRawMessage()
srs_freep(value);
}

int SrsKafkaRawMessage::create(SrsJsonObject* obj)
{
int ret = ERROR_SUCCESS;

// current must be 0.
magic_byte = 0;

// no compression codec.
attributes = 0;

// dumps the json to string.
value->set_value(obj->dumps());

// TODO: FIXME: implements it.
crc = 0;

message_size = raw_message_size();

return ret;
}

int SrsKafkaRawMessage::raw_message_size()
{
return 4 + 1 + 1 + key->nb_bytes() + value->nb_bytes();
}

int SrsKafkaRawMessage::nb_bytes()
{
return 8 + 4 + 4 + 1 + 1 + key->nb_bytes() + value->nb_bytes();
Expand Down Expand Up @@ -1352,6 +1409,42 @@ int SrsKafkaClient::fetch_metadata(string topic, SrsKafkaTopicMetadataResponse**
int SrsKafkaClient::write_messages(std::string topic, int32_t partition, vector<SrsJsonObject*>& msgs)
{
int ret = ERROR_SUCCESS;

SrsKafkaProducerRequest* req = new SrsKafkaProducerRequest();

// 0 the server will not send any response.
req->required_acks = 0;
// timeout of producer message.
req->timeout = SRS_KAFKA_PRODUCER_MESSAGE_TIMEOUT_MS;

// create the topic and partition to write message to.
SrsKafkaProducerTopicMessages* topics = new SrsKafkaProducerTopicMessages();
SrsKafkaProducerPartitionMessages* partitions = new SrsKafkaProducerPartitionMessages();

topics->partitions.append(partitions);
req->topics.append(topics);

topics->topic_name.set_value(topic);
partitions->partition = partition;

// convert json objects to kafka raw messages.
vector<SrsJsonObject*>::iterator it;
for (it = msgs.begin(); it != msgs.end(); ++it) {
SrsJsonObject* obj = *it;
SrsKafkaRawMessage* msg = new SrsKafkaRawMessage();

if ((ret = msg->create(obj)) != ERROR_SUCCESS) {
srs_freep(msg);
srs_freep(req);
srs_error("kafka write messages failed. ret=%d", ret);
return ret;
}

partitions->messages.append(msg);
}

partitions->message_set_size = partitions->messages.nb_bytes();

// TODO: FIXME: implements it.
return ret;
}
Expand Down
15 changes: 14 additions & 1 deletion trunk/src/protocol/srs_kafka_stack.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class SrsKafkaString : public ISrsCodec
virtual bool null();
virtual bool empty();
virtual std::string to_str();
virtual void set_value(std::string v);
// interface ISrsCodec
public:
virtual int nb_bytes();
Expand All @@ -103,6 +104,8 @@ class SrsKafkaBytes : public ISrsCodec
public:
virtual bool null();
virtual bool empty();
virtual void set_value(std::string v);
virtual void set_value(const char* v, int nb_v);
// interface ISrsCodec
public:
virtual int nb_bytes();
Expand Down Expand Up @@ -531,6 +534,16 @@ struct SrsKafkaRawMessage : public ISrsCodec
public:
SrsKafkaRawMessage();
virtual ~SrsKafkaRawMessage();
public:
/**
* create message from json object.
*/
virtual int create(SrsJsonObject* obj);
private:
/**
* get the raw message, bytes after the message_size.
*/
virtual int raw_message_size();
// interface ISrsCodec
public:
virtual int nb_bytes();
Expand Down Expand Up @@ -768,7 +781,7 @@ struct SrsKafkaProducerTopicMessages : public ISrsCodec
*/
class SrsKafkaProducerRequest : public SrsKafkaRequest
{
private:
public:
/**
* This field indicates how many acknowledgements the servers should receive
* before responding to the request. If it is 0 the server will not send any
Expand Down

0 comments on commit 76cd3f8

Please sign in to comment.