Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add transaction message support #51

Merged
merged 5 commits into from
Oct 22, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 69 additions & 38 deletions src/PythonWrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#include "CCommon.h"
#include "CMessage.h"
#include "CMessageExt.h"
#include "CBatchMessage.h"
#include "CSendResult.h"
#include "CProducer.h"
#include "CPushConsumer.h"
Expand All @@ -33,6 +32,8 @@ const char *VERSION =
"PYTHON_CLIENT_VERSION: " PYTHON_CLIENT_VERSION ", BUILD DATE: " PYCLI_BUILD_DATE " ";

map<CPushConsumer *, pair<PyObject *, object>> g_CallBackMap;
map<CProducer *, PyObject *> g_TransactionCheckCallBackMap;


class PyThreadStateLock {
public:
Expand Down Expand Up @@ -96,18 +97,6 @@ int PySetMessageDelayTimeLevel(void *msg, int level) {
return SetDelayTimeLevel((CMessage *) msg, level);
}

//batch message
void *PyCreateBatchMessage() {
return (void *) CreateBatchMessage();
}

int PyAddMessage(void *batchMsg, void *msg) {
return AddMessage((CBatchMessage *) batchMsg, (CMessage *) msg);
}

int PyDestroyBatchMessage(void *batchMsg) {
return DestroyBatchMessage((CBatchMessage *) batchMsg);
}

//messageExt
const char *PyGetMessageTopic(PyMessageExt msgExt) {
Expand All @@ -134,6 +123,27 @@ void *PyCreateProducer(const char *groupId) {
PyEval_InitThreads(); // ensure create GIL, for call Python callback from C.
return (void *) CreateProducer(groupId);
}

void *PyCreateTransactionProducer(const char *groupId, PyObject *localTransactionCheckerCallback) {
PyEval_InitThreads();
CProducer *producer = CreateTransactionProducer(groupId, &PyLocalTransactionCheckerCallback, NULL);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't find CreateTransactionProducer implementation in the cpp repository, only the function prototype was found.

g_TransactionCheckCallBackMap[producer] = localTransactionCheckerCallback;
return producer;
}

CTransactionStatus PyLocalTransactionCheckerCallback(CProducer *producer, CMessageExt *msg, void *data) {
PyThreadStateLock pyThreadLock; // ensure hold GIL, before call python callback
PyMessageExt message = {.pMessageExt = msg};
map<CProducer *, PyObject *>::iterator iter;
iter = g_TransactionCheckCallBackMap.find(producer);
if (iter != g_TransactionCheckCallBackMap.end()) {
PyObject *pCallback = iter->second;
CTransactionStatus status = boost::python::call<CTransactionStatus>(pCallback, message);
return status;
}
return CTransactionStatus::E_UNKNOWN_TRANSACTION;
}

int PyDestroyProducer(void *producer) {
return DestroyProducer((CProducer *) producer);
}
Expand Down Expand Up @@ -190,23 +200,23 @@ int PySendMessageOneway(void *producer, void *msg) {
return SendMessageOneway((CProducer *) producer, (CMessage *) msg);
}

void PySendSuccessCallback(CSendResult result, CMessage *msg, void *pyCallback){
void PySendSuccessCallback(CSendResult result, CMessage *msg, void *pyCallback) {
PyThreadStateLock PyThreadLock; // ensure hold GIL, before call python callback
PySendResult sendResult;
sendResult.sendStatus = result.sendStatus;
sendResult.offset = result.offset;
strncpy(sendResult.msgId, result.msgId, MAX_MESSAGE_ID_LENGTH - 1);
sendResult.msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0;
PyCallback *callback = (PyCallback *)pyCallback;
PyCallback *callback = (PyCallback *) pyCallback;
boost::python::call<void>(callback->successCallback, sendResult, (void *) msg);
delete pyCallback;
}


void PySendExceptionCallback(CMQException e, CMessage *msg, void *pyCallback){
void PySendExceptionCallback(CMQException e, CMessage *msg, void *pyCallback) {
PyThreadStateLock PyThreadLock; // ensure hold GIL, before call python callback
PyMQException exception;
PyCallback *callback = (PyCallback *)pyCallback;
PyCallback *callback = (PyCallback *) pyCallback;
exception.error = e.error;
exception.line = e.line;
strncpy(exception.file, e.file, MAX_EXEPTION_FILE_LENGTH - 1);
Expand All @@ -219,47 +229,58 @@ void PySendExceptionCallback(CMQException e, CMessage *msg, void *pyCallback){
delete pyCallback;
}

int PySendMessageAsync(void *producer, void *msg, PyObject *sendSuccessCallback, PyObject *sendExceptionCallback){
PyCallback* pyCallback = new PyCallback();
int PySendMessageAsync(void *producer, void *msg, PyObject *sendSuccessCallback, PyObject *sendExceptionCallback) {
PyCallback *pyCallback = new PyCallback();
pyCallback->successCallback = sendSuccessCallback;
pyCallback->exceptionCallback = sendExceptionCallback;
return SendAsync((CProducer *) producer, (CMessage *) msg, &PySendSuccessCallback, &PySendExceptionCallback, (void *)pyCallback);
return SendAsync((CProducer *) producer, (CMessage *) msg, &PySendSuccessCallback, &PySendExceptionCallback,
(void *) pyCallback);
}

PySendResult PySendBatchMessage(void *producer, void *batchMessage) {

PySendResult PySendMessageOrderly(void *producer, void *msg, int autoRetryTimes, void *args, PyObject *queueSelector) {
PySendResult ret;
CSendResult result;
SendBatchMessage((CProducer *) producer, (CBatchMessage *) batchMessage, &result);
PyUserData userData = {queueSelector, args};
SendMessageOrderly((CProducer *) producer, (CMessage *) msg, &PyOrderlyCallbackInner, &userData, autoRetryTimes,
&result);
ret.sendStatus = result.sendStatus;
ret.offset = result.offset;
strncpy(ret.msgId, result.msgId, MAX_MESSAGE_ID_LENGTH - 1);
ret.msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0;
return ret;
}

int PyOrderlyCallbackInner(int size, CMessage *msg, void *args) {
PyUserData *userData = (PyUserData *) args;
int index = boost::python::call<int>(userData->pyObject, size, (void *) msg, userData->pData);
return index;
}

PySendResult PySendMessageOrderly(void *producer, void *msg, int autoRetryTimes, void *args, PyObject *queueSelector) {
PySendResult PySendMessageOrderlyByShardingKey(void *producer, void *msg, const char *shardingKey) {
PySendResult ret;
CSendResult result;
PyUserData userData = {queueSelector,args};
SendMessageOrderly((CProducer *) producer, (CMessage *) msg, &PyOrderlyCallbackInner, &userData, autoRetryTimes, &result);
SendMessageOrderlyByShardingKey((CProducer *) producer, (CMessage *) msg, shardingKey, &result);
ret.sendStatus = result.sendStatus;
ret.offset = result.offset;
strncpy(ret.msgId, result.msgId, MAX_MESSAGE_ID_LENGTH - 1);
ret.msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0;
return ret;
}

int PyOrderlyCallbackInner(int size, CMessage *msg, void *args) {
PyUserData *userData = (PyUserData *)args;
int index = boost::python::call<int>(userData->pyObject, size, (void *) msg, userData->pData);
return index;
CTransactionStatus PyLocalTransactionExecuteCallback(CProducer *producer, CMessage *msg, void *data) {
PyUserData *localCallback = (PyUserData *) data;
CTransactionStatus status = boost::python::call<CTransactionStatus>(localCallback->pyObject, (void *) msg,
localCallback->pData);
return status;
}

PySendResult PySendMessageOrderlyByShardingKey(void *producer, void *msg, const char *shardingKey) {
PySendResult PySendMessageInTransaction(void *producer, void *msg, PyObject *localTransactionCallback, void *args) {
PyUserData userData = {localTransactionCallback, args};
PySendResult ret;
CSendResult result;
SendMessageOrderlyByShardingKey((CProducer *) producer, (CMessage *) msg, shardingKey, &result);
SendMessageTransaction((CProducer *) producer, (CMessage *) msg, &PyLocalTransactionExecuteCallback, &userData,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't find SendMessageTransaction function, not even function prototype.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Relative c exporter hasn't released and it will be published ASAP, you can ignore this for now.

&result);
ret.sendStatus = result.sendStatus;
ret.offset = result.offset;
strncpy(ret.msgId, result.msgId, MAX_MESSAGE_ID_LENGTH - 1);
Expand All @@ -286,6 +307,15 @@ int PyDestroyPushConsumer(void *consumer) {
}
return DestroyPushConsumer(consumerInner);
}
int PyDestroyTransactionProducer(void *producer) {
CProducer *producerInner = (CProducer *) producer;
map<CProducer *, PyObject *>::iterator iter;
iter = g_TransactionCheckCallBackMap.find(producerInner);
if (iter != g_TransactionCheckCallBackMap.end()) {
g_TransactionCheckCallBackMap.erase(iter);
}
return DestroyProducer(producerInner);
}
int PyStartPushConsumer(void *consumer) {
return StartPushConsumer((CPushConsumer *) consumer);
}
Expand All @@ -308,7 +338,7 @@ int PyRegisterMessageCallback(void *consumer, PyObject *pCallback, object args)
return RegisterMessageCallback(consumerInner, &PythonMessageCallBackInner);
}

int PyRegisterMessageCallbackOrderly(void *consumer, PyObject *pCallback, object args){
int PyRegisterMessageCallbackOrderly(void *consumer, PyObject *pCallback, object args) {
CPushConsumer *consumerInner = (CPushConsumer *) consumer;
g_CallBackMap[consumerInner] = make_pair(pCallback, std::move(args));
return RegisterMessageCallbackOrderly(consumerInner, &PythonMessageCallBackInner);
Expand Down Expand Up @@ -418,6 +448,10 @@ BOOST_PYTHON_MODULE (librocketmqclientpython) {
.value("E_LOG_LEVEL_TRACE", E_LOG_LEVEL_TRACE)
.value("E_LOG_LEVEL_LEVEL_NUM", E_LOG_LEVEL_LEVEL_NUM);

enum_<CTransactionStatus>("TransactionStatus")
.value("E_COMMIT_TRANSACTION", E_COMMIT_TRANSACTION)
.value("E_ROLLBACK_TRANSACTION", E_ROLLBACK_TRANSACTION)
.value("E_UNKNOWN_TRANSACTION", E_UNKNOWN_TRANSACTION);

//For Message
def("CreateMessage", PyCreateMessage, return_value_policy<return_opaque_pointer>());
Expand All @@ -430,11 +464,6 @@ BOOST_PYTHON_MODULE (librocketmqclientpython) {
def("SetMessageProperty", PySetMessageProperty);
def("SetDelayTimeLevel", PySetMessageDelayTimeLevel);

//For batch message
def("CreateBatchMessage", PyCreateBatchMessage, return_value_policy<return_opaque_pointer>());
def("AddMessage", PyAddMessage);
def("DestroyBatchMessage", PyDestroyBatchMessage);

//For MessageExt
def("GetMessageTopic", PyGetMessageTopic);
def("GetMessageTags", PyGetMessageTags);
Expand All @@ -445,7 +474,9 @@ BOOST_PYTHON_MODULE (librocketmqclientpython) {

//For producer
def("CreateProducer", PyCreateProducer, return_value_policy<return_opaque_pointer>());
def("CreateTransactionProducer", PyCreateTransactionProducer, return_value_policy<return_opaque_pointer>());
def("DestroyProducer", PyDestroyProducer);
def("DestroyTransactionProducer", PyDestroyTransactionProducer);
def("StartProducer", PyStartProducer);
def("ShutdownProducer", PyShutdownProducer);
def("SetProducerNameServerAddress", PySetProducerNameServerAddress);
Expand All @@ -462,11 +493,11 @@ BOOST_PYTHON_MODULE (librocketmqclientpython) {

def("SendMessageSync", PySendMessageSync);
def("SendMessageAsync", PySendMessageAsync);
def("SendBatchMessage", PySendBatchMessage);

def("SendMessageOneway", PySendMessageOneway);
def("SendMessageOrderly", PySendMessageOrderly);
def("SendMessageOrderlyByShardingKey", PySendMessageOrderlyByShardingKey);
def("SendMessageInTransaction", PySendMessageInTransaction);

//For Consumer
def("CreatePushConsumer", PyCreatePushConsumer, return_value_policy<return_opaque_pointer>());
Expand Down
13 changes: 6 additions & 7 deletions src/PythonWrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#include "CCommon.h"
#include "CMessage.h"
#include "CMessageExt.h"
#include "CBatchMessage.h"
#include "CSendResult.h"
#include "CProducer.h"
#include "CPushConsumer.h"
Expand Down Expand Up @@ -91,11 +90,6 @@ int PySetByteMessageBody(void *msg, const char *body, int len);
int PySetMessageProperty(void *msg, const char *key, const char *value);
int PySetMessageDelayTimeLevel(void *msg, int level);

//batch message
void *PyCreateBatchMessage();
int PyAddMessage(void *batchMsg, void *msg);
int PyDestroyBatchMessage(void *batchMsg);

//messageExt
const char *PyGetMessageTopic(PyMessageExt msgExt);
const char *PyGetMessageTags(PyMessageExt msgExt);
Expand All @@ -106,7 +100,12 @@ const char *PyGetMessageId(PyMessageExt msgExt);

//producer
void *PyCreateProducer(const char *groupId);
CTransactionStatus PyLocalTransactionCheckerCallback(CProducer *producer, CMessageExt *msg, void *data);
CTransactionStatus PyLocalTransactionExecuteCallback(CProducer *producer, CMessage *msg, void *data);
void *PyCreateTransactionProducer(const char *groupId, PyObject *localTransactionCheckerCallback);

int PyDestroyProducer(void *producer);
int PyDestroyTransactionProducer(void *producer);
int PyStartProducer(void *producer);
int PyShutdownProducer(void *producer);
int PySetProducerNameServerAddress(void *producer, const char *namesrv);
Expand All @@ -127,9 +126,9 @@ void PySendSuccessCallback(CSendResult result, CMessage *msg, void *pyCallback);
void PySendExceptionCallback(CMQException e, CMessage *msg, void *pyCallback);
int PySendMessageAsync(void *producer, void *msg, PyObject *sendSuccessCallback, PyObject *sendExceptionCallback);

PySendResult PySendBatchMessage(void *producer, void *msg);
PySendResult PySendMessageOrderly(void *producer, void *msg, int autoRetryTimes, void *args, PyObject *queueSelector);
PySendResult PySendMessageOrderlyByShardingKey(void *producer, void *msg, const char *shardingKey);
PySendResult PySendMessageInTransaction(void *producer , void *msg, PyObject *localTransactionExecuteCallback , void *args);

int PyOrderlyCallbackInner(int size, CMessage *msg, void *args);

Expand Down
32 changes: 21 additions & 11 deletions test/TestSendMessages.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,18 @@ def init_producer():
StartProducer(producer)
return producer

def transaction_local_checker(msg):
print 'begin check for msg: ' + GetMessageId(msg)
return TransactionStatus.E_COMMIT_TRANSACTION

producer = init_producer()
def init_transaction_producer():
producer = CreateTransactionProducer('TransactionTestProducer', transaction_local_checker)
SetProducerLogLevel(producer, CLogLevel.E_LOG_LEVEL_INFO)
SetProducerNameServerAddress(producer, name_srv)
StartProducer(producer)
return producer

producer = init_transaction_producer()
tag = 'rmq-tag'
key = 'rmq-key'

Expand Down Expand Up @@ -257,24 +267,24 @@ def send_message_async_fail(msg, exception):
print 'send message failed'
print 'error msg: ' + exception.GetMsg()

def send_batch_message(batch_count):
def send_transaction_message(count):
key = 'rmq-key'
print 'start send batch message'
print 'start send transaction message'
tag = 'test'
batchMsg = CreateBatchMessage()

for n in range(count):
body = 'hi rmq message, now is' + str(n)
msg = CreateMessage(topic)
SetMessageBody(msg, body)
SetMessageKeys(msg, key)
SetMessageTags(msg, tag)
AddMessage(batchMsg, msg)
DestroyMessage(msg)

SendBatchMessage(producer, batchMsg)
DestroyBatchMessage(batchMsg)
print 'send batch message done'
SendMessageInTransaction(producer, msg, transaction_local_execute, None)
print 'send transaction message done'
time.sleep(10000)

def transaction_local_execute(msg, args):
print 'begin execute local transaction'
return TransactionStatus.E_UNKNOWN_TRANSACTION

if __name__ == '__main__':
send_message_async(10)
send_transaction_message(10)