Skip to content

Commit

Permalink
wrapper for rd_kafka_query_watermark_offsets. Ref #94
Browse files Browse the repository at this point in the history
  • Loading branch information
sshanks-kx authored Dec 20, 2022
1 parent c5fb5d0 commit 1017415
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 1 deletion.
18 changes: 18 additions & 0 deletions kfk.c
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,24 @@ EXP K1(kfkUnsub){
return knk(0);
}

EXP K4(kfkqueryWatermark){
rd_kafka_t *rk;
rd_kafka_resp_err_t err;
int64_t low,high;
K w;
if(!checkType("isjj",x,y,z,r))
return KNL;
if(!(rk= clientIndex(x)))
return KNL;
err= rd_kafka_query_watermark_offsets(rk,y->s,z->j,&low,&high,r->j);
if(KFK_OK != err)
return krr((S) rd_kafka_err2str(err));
w=ktn(KJ,2);
kJ(w)[0]=low;
kJ(w)[1]=high;
return w;
}

// https://github.com/edenhill/librdkafka/wiki/Manually-setting-the-consumer-start-offset
EXP K3(kfkassignOffsets){
rd_kafka_t *rk;
Expand Down
4 changes: 3 additions & 1 deletion kfk.q
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ funcs:(
(`kfkPositionOffsets;3);
// .kfk.CommittedOffsets[client_id:i;topic:s;partition_offsets:I!J]:partition_offsets
(`kfkCommittedOffsets;3);
// .kfk.assignOffsets[client_id:i;topic:s;partition_offsets:I!J]:()
// .kfk.QueryWatermark[client_id:i:topic:s;partition:j;timeout:j]:_
(`kfkqueryWatermark;4);
// .kfk.AssignOffsets[client_id:i;topic:s;partition_offsets:I!J]:()
(`kfkassignOffsets;3);
// .kfk.Threadcount[]:i
(`kfkThreadCount;1);
Expand Down

0 comments on commit 1017415

Please sign in to comment.