-
Notifications
You must be signed in to change notification settings - Fork 5
/
ElasticFrameProtocol.cpp
1409 lines (1247 loc) · 68.1 KB
/
ElasticFrameProtocol.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
//
//
// ______ _ _ _ ______
// | ____|| | | | (_) | ____|
// | |__ | | __ _ ___ | |_ _ ___ | |__ _ __ __ _ _ __ ___ ___
// | __| | | / _` |/ __|| __|| | / __|| __|| '__|/ _` || '_ ` _ \ / _ \
// | |____ | || (_| |\__ \| |_ | || (__ | | | | | (_| || | | | | || __/
// |______||_| \__,_||___/ \__||_| \___||_| |_| \__,_||_| |_| |_| \___|
// Protocol
// Copyright Edgeware AB 2020, Agile Content 2021-2022
//
#include "ElasticFrameProtocol.h"
#include "ElasticInternal.h"
#include "logger.h"
// Period of the receiver worker loop in microseconds (1000 * 10 us = 10 ms).
// receiverWorker() advances its time reference by this amount on every pass and sleeps
// for whatever remains of the period.
constexpr int64_t WORKER_THREAD_SLEEP_US = 1000 * 10;
//---------------------------------------------------------------------------------------------------------------------
//
//
// ElasticFrameProtocolReceiver
//
//
//---------------------------------------------------------------------------------------------------------------------
// Creates a receiver.
//
// lBucketTimeoutMasterms - bucket timeout in milliseconds, stored in mBucketTimeoutms.
// lHolTimeoutMasterms    - head-of-line blocking timeout in milliseconds, stored in
//                          mHeadOfLineBlockingTimeoutms (a value of 0 selects the non-HOL delivery path).
// pCTX                   - optional context forwarded to the callbacks; ownership is shared with the caller.
// lReceiverMode          - when EFPReceiverMode::THREADED the receiver and delivery workers are
//                          started as detached threads.
ElasticFrameProtocolReceiver::ElasticFrameProtocolReceiver(uint32_t lBucketTimeoutMasterms,
                                                           uint32_t lHolTimeoutMasterms,
                                                           std::shared_ptr<ElasticFrameProtocolContext> pCTX,
                                                           EFPReceiverMode lReceiverMode) {
    // Throws if the bucket list can't be reserved.
    mBucketList = new Bucket[CIRCULAR_BUFFER_SIZE + 1];
    mCTX = std::move(pCTX);

    // No C callbacks are registered yet. Until the user installs a callback, deliveries end up in gotData().
    c_receiveCallback = nullptr;
    c_receiveEmbeddedDataCallback = nullptr;
    receiveCallback = [this](pFramePtr &rPacket, ElasticFrameProtocolContext *pContext) {
        gotData(rPacket, pContext);
    };

    mBucketTimeoutms = lBucketTimeoutMasterms;
    mHeadOfLineBlockingTimeoutms = lHolTimeoutMasterms;
    mCurrentMode = lReceiverMode;

    if (mCurrentMode != EFPReceiverMode::THREADED) {
        return; // No worker threads in this mode.
    }

    // Raise all the flags before the threads start so the workers see a consistent state.
    mThreadActive = true;
    mIsWorkerThreadActive = true;
    mIsDeliveryThreadActive = true;
    std::thread([this] { receiverWorker(); }).detach();
    std::thread([this] { deliveryWorker(); }).detach();
}
// Stops the workers (if they are running) and releases the bucket list.
ElasticFrameProtocolReceiver::~ElasticFrameProtocolReceiver() {
    // stopReceiver() is only attempted while the workers are flagged as active.
    // A failure is logged; the teardown continues regardless.
    if (mThreadActive && stopReceiver() != ElasticFrameMessages::noError) {
        EFP_LOGGER(true, LOGG_ERROR, "Failed stopping worker thread.")
    }
    // Allocated in the constructor, so this is never a nullptr.
    delete[] mBucketList;
}
// C API callback. Dummy callback if C++.
//
// Forwards a delivered super frame to the registered C callbacks:
//  - If an embedded-data callback is registered and the frame is flagged INLINE_PAYLOAD and is not
//    broken, every embedded block is handed to c_receiveEmbeddedDataCallback first.
//  - The payload (after any embedded data) is then handed to c_receiveCallback.
// If no C receive callback is registered an error is logged and nothing is delivered.
//
// rPacket - the super frame to deliver.
// pCTX    - not used here; the user pointer is taken from the receiver's own context (mCTX).
void ElasticFrameProtocolReceiver::gotData(ElasticFrameProtocolReceiver::pFramePtr &rPacket,
                                           ElasticFrameProtocolContext *pCTX) const
{
    if (!c_receiveCallback) {
        EFP_LOGGER(true, LOGG_ERROR, "Implement the receiveCallback method for the protocol to work.")
        return;
    }

    // The context is optional (the constructor accepts an empty pointer), so never dereference it blindly.
    auto lUserPointer = mCTX ? mCTX->mUnsafePointer : nullptr;

    size_t payloadDataPosition = 0;
    const bool lHasInlinePayload = (rPacket->mFlags & static_cast<uint8_t>(INLINE_PAYLOAD)) != 0;
    if (c_receiveEmbeddedDataCallback && lHasInlinePayload && !rPacket->mBroken) {
        std::vector<std::vector<uint8_t>> embeddedData;
        std::vector<uint8_t> embeddedContentFlag;
        //This method is not optimal since it moves data.. and there is no need to move any data. FIXME.
        ElasticFrameMessages info = extractEmbeddedData(rPacket, &embeddedData, &embeddedContentFlag,
                                                        &payloadDataPosition);
        if (info != ElasticFrameMessages::noError) {
            EFP_LOGGER(true, LOGG_ERROR, "extractEmbeddedData fail")
            return;
        }
        for (size_t x = 0; x < embeddedData.size(); x++) {
            c_receiveEmbeddedDataCallback(embeddedData[x].data(), embeddedData[x].size(), embeddedContentFlag[x],
                                          rPacket->mPts, lUserPointer);
        }
        // The payload starts after the embedded data. Make sure that position is inside the frame
        // before adjusting the pointer and size handed to the payload callback.
        if (rPacket->mFrameSize < payloadDataPosition) {
            EFP_LOGGER(true, LOGG_ERROR, "extractEmbeddedData out of bounds")
            return;
        }
    }

    c_receiveCallback(rPacket->pFrameData + payloadDataPosition, //compensate for the embedded data
                      rPacket->mFrameSize - payloadDataPosition, //compensate for the embedded data
                      rPacket->mDataContent,
                      static_cast<uint8_t>(rPacket->mBroken),
                      rPacket->mPts,
                      rPacket->mDts,
                      rPacket->mCode,
                      rPacket->mStreamID,
                      rPacket->mSource,
                      rPacket->mFlags,
                      lUserPointer);
}
// Expands the wrapping uint16_t super frame number into a uint64_t counter.
// The first call seeds the counter with the given number. Every later call adds the signed 16-bit
// distance between the new number and the previously seen one.
// The largest gap this calculator can handle is INT16_MAX.
// It is not certain that this is enough in all situations; keep an eye on it.
uint64_t ElasticFrameProtocolReceiver::superFrameRecalculator(uint16_t lSuperFrame) {
    if (!mSuperFrameFirstTime) {
        // The narrowing back to int16_t makes a wrap of the 16-bit number look like a small step.
        const int16_t lStep = static_cast<int16_t>(lSuperFrame) - static_cast<int16_t>(mOldSuperFrameNumber);
        mSuperFrameRecalc = mSuperFrameRecalc + static_cast<int64_t>(lStep);
    } else {
        mSuperFrameRecalc = lSuperFrame;
        mSuperFrameFirstTime = false;
    }
    mOldSuperFrameNumber = lSuperFrame;
    return mSuperFrameRecalc;
}
// Unpack method for type1 packets. Type1 packets are the fragments of super frames larger than the MTU.
//
// pSubPacket  - the received fragment, starting with an ElasticFrameType1 header.
// lPacketSize - size of the fragment in bytes, header included.
// lFromSource - source identifier, stored in the bucket when this fragment creates it.
//
// Returns noError when the payload was stored in its bucket. Otherwise the fragment is dropped and the
// reason is returned: tooOldFragment, memoryAllocationError, bufferOutOfResources, bufferOutOfBounds
// or duplicatePacketReceived.
ElasticFrameMessages
ElasticFrameProtocolReceiver::unpackType1(const uint8_t *pSubPacket, size_t lPacketSize, uint8_t lFromSource) {
    std::lock_guard<std::mutex> lock(mNetMtx);

    // A packet shorter than its own header can't be parsed. Rejecting it here also keeps the
    // payload size calculation below from wrapping around.
    if (lPacketSize < sizeof(ElasticFrameType1)) {
        return ElasticFrameMessages::bufferOutOfBounds;
    }
    const auto *lType1Frame = reinterpret_cast<const ElasticFrameType1 *>(pSubPacket);
    const uint8_t *pPayload = pSubPacket + sizeof(ElasticFrameType1);
    const size_t lPayloadSize = lPacketSize - sizeof(ElasticFrameType1);
    Bucket *pThisBucket = &mBucketList[lType1Frame->hSuperFrameNo & CIRCULAR_BUFFER_SIZE];

    // Is this entry in the buffer active? If no, create a new else continue filling the bucket with fragments.
    if (!pThisBucket->mActive) {
        // The buffer reserved below has room for fragment 0..hOfFragmentNo. A fragment number beyond
        // that would be written outside the buffer, so reject it before any state is touched.
        if (lType1Frame->hFragmentNo > lType1Frame->hOfFragmentNo) {
            EFP_LOGGER(true, LOGG_FATAL, "bufferOutOfBounds")
            return ElasticFrameMessages::bufferOutOfBounds;
        }
        uint64_t lDeliveryOrderCandidate = superFrameRecalculator(lType1Frame->hSuperFrameNo);
        // Is this an old fragment where we already delivered the super frame?
        if (lDeliveryOrderCandidate == pThisBucket->mDeliveryOrder) {
            return ElasticFrameMessages::tooOldFragment;
        }
        pThisBucket->mDeliveryOrder = lDeliveryOrderCandidate;
        mBucketMap[pThisBucket->mDeliveryOrder] = pThisBucket;
        pThisBucket->mActive = true;
        pThisBucket->mSource = lFromSource;
        pThisBucket->mFlags = lType1Frame->hFrameType & static_cast<uint8_t>(0xf0);
        pThisBucket->mStream = lType1Frame->hStream;
        // Type1 headers carry neither content type nor code. Use what is stored for the stream.
        Stream *pThisStream = &mStreams[lType1Frame->hStream];
        pThisBucket->mDataContent = pThisStream->mDataContent;
        pThisBucket->mCode = pThisStream->mCode;
        pThisBucket->mSavedSuperFrameNo = lType1Frame->hSuperFrameNo;
        pThisBucket->mHaveReceivedFragment.reset();
        // PTS and DTS stay at UINT64_MAX until a type2 fragment provides them.
        pThisBucket->mPts = UINT64_MAX;
        pThisBucket->mDts = UINT64_MAX;
        pThisBucket->mHaveReceivedFragment[lType1Frame->hFragmentNo] = true;
        // Widen before multiplying so the millisecond to microsecond conversion can't wrap in 32 bits.
        pThisBucket->mTimeout = std::chrono::duration_cast<std::chrono::microseconds>(
                std::chrono::steady_clock::now().time_since_epoch()).count() +
                                static_cast<int64_t>(mBucketTimeoutms) * 1000;
        pThisBucket->mFragmentCounter = 0;
        pThisBucket->mOfFragmentNo = lType1Frame->hOfFragmentNo;
        pThisBucket->mFragmentSize = lPayloadSize;
        size_t lInsertDataPointer = pThisBucket->mFragmentSize * lType1Frame->hFragmentNo;
        pThisBucket->mBucketData = std::make_unique<SuperFrame>(
                pThisBucket->mFragmentSize * (static_cast<size_t>(lType1Frame->hOfFragmentNo) + 1));
        // Provisional size; the final size is set by the type2 or type3 fragment.
        pThisBucket->mBucketData->mFrameSize = pThisBucket->mFragmentSize * lType1Frame->hOfFragmentNo;
        if (pThisBucket->mBucketData->pFrameData == nullptr) {
            mBucketMap.erase(pThisBucket->mDeliveryOrder);
            pThisBucket->mActive = false;
            return ElasticFrameMessages::memoryAllocationError;
        }
        std::copy_n(pPayload, lPayloadSize, pThisBucket->mBucketData->pFrameData + lInsertDataPointer);
        return ElasticFrameMessages::noError;
    }

    // There is a gap in receiving the packets. Increase the bucket size list.. if the
    // bucket size list is == X*UINT16_MAX you will no longer detect any buffer errors
    if (lType1Frame->hSuperFrameNo != pThisBucket->mSavedSuperFrameNo) {
        return ElasticFrameMessages::bufferOutOfResources;
    }

    // I'm getting a packet with data larger than the expected size.
    // This can be generated by wraparound in the bucket list.
    // The notification about more than 50% buffer full level should already
    // be triggered by now.
    // The bucket is invalidated.
    if (pThisBucket->mOfFragmentNo < lType1Frame->hFragmentNo ||
        lType1Frame->hOfFragmentNo != pThisBucket->mOfFragmentNo) {
        EFP_LOGGER(true, LOGG_FATAL, "bufferOutOfBounds")
        mBucketMap.erase(pThisBucket->mDeliveryOrder);
        pThisBucket->mActive = false;
        return ElasticFrameMessages::bufferOutOfBounds;
    }

    // Have I already received this packet before? (duplicate/1+n where n > 0, n can be fractional)
    if (pThisBucket->mHaveReceivedFragment[lType1Frame->hFragmentNo] == 1) {
        return ElasticFrameMessages::duplicatePacketReceived;
    }
    pThisBucket->mHaveReceivedFragment[lType1Frame->hFragmentNo] = true;

    // Increment the fragment counter
    pThisBucket->mFragmentCounter++;

    // Move the data to the correct fragment position in the frame.
    // The bucket holds the information about the frame and a pointer to the frame data, which is a
    // linear array of fragments. lInsertDataPointer is the offset of this fragment in that array.
    size_t lInsertDataPointer = pThisBucket->mFragmentSize * lType1Frame->hFragmentNo;
    std::copy_n(pPayload, lPayloadSize, pThisBucket->mBucketData->pFrameData + lInsertDataPointer);
    return ElasticFrameMessages::noError;
}
// Unpack method for type2 packets. Where we know there are also type1 packets involved and possibly type3.
// Type2 packets are also the parts of frames smaller than the MTU.
// The data IS the last data of a sequence.
//
// pSubPacket  - the received fragment, starting with an ElasticFrameType2 header.
// lPacketSize - size of the fragment in bytes, header included.
// lFromSource - source identifier, stored in the bucket when this fragment creates it.
//
// Returns noError when the fragment was accepted. Otherwise the fragment is dropped and the reason is
// returned: type2FrameOutOfBounds, tooOldFragment, memoryAllocationError, bufferOutOfResources,
// bufferOutOfBounds or duplicatePacketReceived.
ElasticFrameMessages
ElasticFrameProtocolReceiver::unpackType2(const uint8_t *pSubPacket, size_t lPacketSize, uint8_t lFromSource) {
    std::lock_guard<std::mutex> lock(mNetMtx);

    // The header must be complete before any of its fields are read.
    if (lPacketSize < sizeof(ElasticFrameType2)) {
        return ElasticFrameMessages::type2FrameOutOfBounds;
    }
    const auto *lType2Frame = reinterpret_cast<const ElasticFrameType2 *>(pSubPacket);
    // The payload size announced by the header must fit inside the packet.
    if (lPacketSize < (sizeof(ElasticFrameType2) + lType2Frame->hSizeOfData)) {
        return ElasticFrameMessages::type2FrameOutOfBounds;
    }

    const uint8_t *pPayload = pSubPacket + sizeof(ElasticFrameType2);
    // Type 2 is always at the end and is always the highest number fragment.
    const size_t lInsertDataPointer = static_cast<size_t>(lType2Frame->hType1PacketSize) *
                                      static_cast<size_t>(lType2Frame->hOfFragmentNo);
    // A DTS/PTS difference of UINT32_MAX means the DTS is not available.
    const uint64_t lDts = (lType2Frame->hDtsPtsDiff == UINT32_MAX)
                          ? UINT64_MAX
                          : lType2Frame->hPts - static_cast<uint64_t>(lType2Frame->hDtsPtsDiff);

    Bucket *pThisBucket = &mBucketList[lType2Frame->hSuperFrameNo & CIRCULAR_BUFFER_SIZE];

    if (!pThisBucket->mActive) {
        uint64_t lDeliveryOrderCandidate = superFrameRecalculator(lType2Frame->hSuperFrameNo);
        // Is this an old fragment where we already delivered the super frame?
        if (lDeliveryOrderCandidate == pThisBucket->mDeliveryOrder) {
            return ElasticFrameMessages::tooOldFragment;
        }
        pThisBucket->mDeliveryOrder = lDeliveryOrderCandidate;
        mBucketMap[pThisBucket->mDeliveryOrder] = pThisBucket;
        pThisBucket->mActive = true;
        pThisBucket->mSource = lFromSource;
        pThisBucket->mFlags = lType2Frame->hFrameType & static_cast<uint8_t>(0xf0);
        pThisBucket->mStream = lType2Frame->hStreamID;
        // Remember the content type and code for the stream, then copy them into the bucket.
        Stream *pThisStream = &mStreams[lType2Frame->hStreamID];
        pThisStream->mDataContent = lType2Frame->hDataContent;
        pThisStream->mCode = lType2Frame->hCode;
        pThisBucket->mDataContent = pThisStream->mDataContent;
        pThisBucket->mCode = pThisStream->mCode;
        pThisBucket->mSavedSuperFrameNo = lType2Frame->hSuperFrameNo;
        pThisBucket->mHaveReceivedFragment.reset();
        pThisBucket->mPts = lType2Frame->hPts;
        pThisBucket->mDts = lDts;
        pThisBucket->mHaveReceivedFragment[lType2Frame->hOfFragmentNo] = true;
        // Widen before multiplying so the millisecond to microsecond conversion can't wrap in 32 bits.
        pThisBucket->mTimeout = std::chrono::duration_cast<std::chrono::microseconds>(
                std::chrono::steady_clock::now().time_since_epoch()).count() +
                                static_cast<int64_t>(mBucketTimeoutms) * 1000;
        pThisBucket->mOfFragmentNo = lType2Frame->hOfFragmentNo;
        pThisBucket->mFragmentCounter = 0;
        pThisBucket->mFragmentSize = lType2Frame->hType1PacketSize;
        size_t lReserveThis = ((pThisBucket->mFragmentSize * lType2Frame->hOfFragmentNo) +
                               lType2Frame->hSizeOfData);
        pThisBucket->mBucketData = std::make_unique<SuperFrame>(lReserveThis);
        if (pThisBucket->mBucketData->pFrameData == nullptr) {
            mBucketMap.erase(pThisBucket->mDeliveryOrder);
            pThisBucket->mActive = false;
            return ElasticFrameMessages::memoryAllocationError;
        }
        std::copy_n(pPayload, lType2Frame->hSizeOfData,
                    pThisBucket->mBucketData->pFrameData + lInsertDataPointer);
        return ElasticFrameMessages::noError;
    }

    // The slot is occupied by another super frame.
    if (lType2Frame->hSuperFrameNo != pThisBucket->mSavedSuperFrameNo) {
        return ElasticFrameMessages::bufferOutOfResources;
    }

    // The fragment count of this packet must match the bucket (any larger value is also a mismatch).
    // If it doesn't the bucket is invalidated.
    if (lType2Frame->hOfFragmentNo != pThisBucket->mOfFragmentNo) {
        EFP_LOGGER(true, LOGG_FATAL, "bufferOutOfBounds")
        mBucketMap.erase(pThisBucket->mDeliveryOrder);
        pThisBucket->mActive = false;
        return ElasticFrameMessages::bufferOutOfBounds;
    }

    if (pThisBucket->mHaveReceivedFragment[lType2Frame->hOfFragmentNo] == 1) {
        return ElasticFrameMessages::duplicatePacketReceived;
    }
    pThisBucket->mHaveReceivedFragment[lType2Frame->hOfFragmentNo] = true;

    // Type 2 frames contain the pts and code. If for some reason the type2 packet is missing or the frame
    // is delivered before the type2 frame arrives, PTS, DTS and CODE are set to their respective
    // 'illegal' values, meaning you can't use them.
    pThisBucket->mPts = lType2Frame->hPts;
    pThisBucket->mDts = lDts;
    pThisBucket->mFlags = lType2Frame->hFrameType & static_cast<uint8_t>(0xf0);
    pThisBucket->mFragmentCounter++;

    // Set the content type and code, both for the stream and for this bucket.
    pThisBucket->mStream = lType2Frame->hStreamID;
    Stream *thisStream = &mStreams[lType2Frame->hStreamID];
    thisStream->mDataContent = lType2Frame->hDataContent;
    thisStream->mCode = lType2Frame->hCode;
    pThisBucket->mDataContent = thisStream->mDataContent;
    pThisBucket->mCode = thisStream->mCode;

    // Only when the type2 frame is received is the actual size to be delivered known.
    // Now set the real size for the bucket data.
    if (lType2Frame->hSizeOfData) {
        pThisBucket->mBucketData->mFrameSize =
                (pThisBucket->mFragmentSize * lType2Frame->hOfFragmentNo) + lType2Frame->hSizeOfData;
        std::copy_n(pPayload, lType2Frame->hSizeOfData,
                    pThisBucket->mBucketData->pFrameData + lInsertDataPointer);
    }
    return ElasticFrameMessages::noError;
}
// Unpack method for type3 packets. Type3 packets are the parts of frames where the remainder data does
// not fit a type2 packet. Then a type3 is added in front of a type2 packet to catch the data overshoot.
// Type 3 frames MUST be the same header size as type1 headers (FIXME part of the opportunistic data discussion)
//
// pSubPacket  - the received fragment, starting with an ElasticFrameType3 header.
// lPacketSize - size of the fragment in bytes, header included.
// lFromSource - source identifier, stored in the bucket when this fragment creates it.
//
// Returns noError when the payload was stored in its bucket. Otherwise the fragment is dropped and the
// reason is returned: tooOldFragment, memoryAllocationError, bufferOutOfResources, bufferOutOfBounds
// or duplicatePacketReceived.
ElasticFrameMessages
ElasticFrameProtocolReceiver::unpackType3(const uint8_t *pSubPacket, size_t lPacketSize, uint8_t lFromSource) {
    std::lock_guard<std::mutex> lock(mNetMtx);

    // A packet shorter than its own header can't be parsed. Rejecting it here also keeps the
    // payload size calculation below from wrapping around.
    if (lPacketSize < sizeof(ElasticFrameType3)) {
        return ElasticFrameMessages::bufferOutOfBounds;
    }
    const auto *lType3Frame = reinterpret_cast<const ElasticFrameType3 *>(pSubPacket);
    const uint8_t *pPayload = pSubPacket + sizeof(ElasticFrameType3);
    const size_t lPayloadSize = lPacketSize - sizeof(ElasticFrameType3);
    Bucket *pThisBucket = &mBucketList[lType3Frame->hSuperFrameNo & CIRCULAR_BUFFER_SIZE];

    // If there is a type3 frame it's the second last frame.
    // For hOfFragmentNo == 0 this wraps to UINT16_MAX; both paths below reject that case.
    const uint16_t lThisFragmentNo = static_cast<uint16_t>(lType3Frame->hOfFragmentNo - 1);

    // Is this entry in the buffer active? If no, create a new else continue filling the bucket with data.
    if (!pThisBucket->mActive) {
        // A type3 fragment is the second last one, so there must be at least two fragments.
        // Without this check the wrapped fragment number would be used for the buffer size and offset.
        if (lType3Frame->hOfFragmentNo == 0) {
            EFP_LOGGER(true, LOGG_FATAL, "bufferOutOfBounds")
            return ElasticFrameMessages::bufferOutOfBounds;
        }
        uint64_t lDeliveryOrderCandidate = superFrameRecalculator(lType3Frame->hSuperFrameNo);
        // Is this an old fragment where we already delivered the super frame?
        if (lDeliveryOrderCandidate == pThisBucket->mDeliveryOrder) {
            return ElasticFrameMessages::tooOldFragment;
        }
        pThisBucket->mDeliveryOrder = lDeliveryOrderCandidate;
        mBucketMap[pThisBucket->mDeliveryOrder] = pThisBucket;
        pThisBucket->mActive = true;
        pThisBucket->mSource = lFromSource;
        pThisBucket->mFlags = lType3Frame->hFrameType & static_cast<uint8_t>(0xf0);
        pThisBucket->mStream = lType3Frame->hStreamID;
        // The content type and code are not taken from this header. Use what is stored for the stream.
        Stream *thisStream = &mStreams[lType3Frame->hStreamID];
        pThisBucket->mDataContent = thisStream->mDataContent;
        pThisBucket->mCode = thisStream->mCode;
        pThisBucket->mSavedSuperFrameNo = lType3Frame->hSuperFrameNo;
        pThisBucket->mHaveReceivedFragment.reset();
        // PTS and DTS stay at UINT64_MAX until a type2 fragment provides them.
        pThisBucket->mPts = UINT64_MAX;
        pThisBucket->mDts = UINT64_MAX;
        pThisBucket->mHaveReceivedFragment[lThisFragmentNo] = true;
        // Widen before multiplying so the millisecond to microsecond conversion can't wrap in 32 bits.
        pThisBucket->mTimeout = std::chrono::duration_cast<std::chrono::microseconds>(
                std::chrono::steady_clock::now().time_since_epoch()).count() +
                                static_cast<int64_t>(mBucketTimeoutms) * 1000;
        pThisBucket->mFragmentCounter = 0;
        pThisBucket->mOfFragmentNo = lType3Frame->hOfFragmentNo;
        pThisBucket->mFragmentSize = lType3Frame->hType1PacketSize;
        // The type3 payload follows the fragments in front of it and ends the reserved buffer.
        size_t lInsertDataPointer = pThisBucket->mFragmentSize * lThisFragmentNo;
        size_t lReserveThis = lInsertDataPointer + lPayloadSize;
        pThisBucket->mBucketData = std::make_unique<SuperFrame>(lReserveThis);
        if (pThisBucket->mBucketData->pFrameData == nullptr) {
            mBucketMap.erase(pThisBucket->mDeliveryOrder);
            pThisBucket->mActive = false;
            return ElasticFrameMessages::memoryAllocationError;
        }
        std::copy_n(pPayload, lPayloadSize, pThisBucket->mBucketData->pFrameData + lInsertDataPointer);
        return ElasticFrameMessages::noError;
    }

    // There is a gap in receiving the packets. Increase the bucket size list.. if the
    // bucket size list is == X*UINT16_MAX you will no longer detect any buffer errors
    if (lType3Frame->hSuperFrameNo != pThisBucket->mSavedSuperFrameNo) {
        return ElasticFrameMessages::bufferOutOfResources;
    }

    // I'm getting a packet with data larger than the expected size.
    // This can be generated by wraparound in the bucket list.
    // The notification about more than 50% buffer full level should already
    // be triggered by now.
    // I invalidate this bucket to save me but the user should be notified somehow about this state. FIXME
    if (pThisBucket->mOfFragmentNo < lThisFragmentNo || lType3Frame->hOfFragmentNo != pThisBucket->mOfFragmentNo) {
        EFP_LOGGER(true, LOGG_FATAL, "bufferOutOfBounds")
        mBucketMap.erase(pThisBucket->mDeliveryOrder);
        pThisBucket->mActive = false;
        return ElasticFrameMessages::bufferOutOfBounds;
    }

    // Have I already received this packet before? (duplicate?)
    if (pThisBucket->mHaveReceivedFragment[lThisFragmentNo] == 1) {
        return ElasticFrameMessages::duplicatePacketReceived;
    }
    pThisBucket->mHaveReceivedFragment[lThisFragmentNo] = true;

    // Increment the fragment counter
    pThisBucket->mFragmentCounter++;

    // Move the data to the correct fragment position in the frame.
    // The bucket holds the information about the frame and a pointer to the frame data, which is a
    // linear array of fragments. lInsertDataPointer is the offset of this fragment in that array.
    // The frame ends where the type3 payload ends, which gives the frame size.
    size_t lInsertDataPointer = pThisBucket->mFragmentSize * lThisFragmentNo;
    pThisBucket->mBucketData->mFrameSize = lInsertDataPointer + lPayloadSize;
    std::copy_n(pPayload, lPayloadSize, pThisBucket->mBucketData->pFrameData + lInsertDataPointer);
    return ElasticFrameMessages::noError;
}
//mNetMtx is already taken no need to lock anything
void ElasticFrameProtocolReceiver::runToCompletionMethod(
const std::function<void(pFramePtr &rPacket, ElasticFrameProtocolContext *pCTX)> &rReceiveFunction) {
int64_t lTimeNow = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now().time_since_epoch()).count();
std::vector<uint64_t> lDeleteList;
for (const auto &rBucket: mBucketMap) {
if (mHeadOfLineBlockingTimeoutms) {
//HOL mode
if (mDeliveryHOLFirstRun) {
//It's the first run. We are in HOL mode (Run to completion)
//We can't wait for two frames since we don't know when
//we will be here again and we can't time out single frames since
//we are event driven externally. Set the HEAD speculatively and go with that.
mDeliveryHOLFirstRun = false;
mNextExpectedFrameNumber = mBucketMap.begin()->second->mDeliveryOrder;
}
if (rBucket.second->mDeliveryOrder == mNextExpectedFrameNumber &&
rBucket.second->mFragmentCounter == rBucket.second->mOfFragmentNo) {
//We got what we expected. Now deliver.
//Assemble all data for delivery
rBucket.second->mBucketData->mDataContent = rBucket.second->mDataContent;
rBucket.second->mBucketData->mBroken =
rBucket.second->mFragmentCounter != rBucket.second->mOfFragmentNo;
rBucket.second->mBucketData->mPts = rBucket.second->mPts;
rBucket.second->mBucketData->mDts = rBucket.second->mDts;
rBucket.second->mBucketData->mCode = rBucket.second->mCode;
rBucket.second->mBucketData->mStreamID = rBucket.second->mStream;
rBucket.second->mBucketData->mSource = rBucket.second->mSource;
rBucket.second->mBucketData->mFlags = rBucket.second->mFlags;
rBucket.second->mBucketData->mSuperFrameNo = rBucket.second->mSavedSuperFrameNo;
if (rReceiveFunction) {
rReceiveFunction(rBucket.second->mBucketData, mCTX ? mCTX.get() : nullptr);
} else {
receiveCallback(rBucket.second->mBucketData, mCTX ? mCTX.get() : nullptr);
}
lDeleteList.emplace_back(rBucket.first);
mNextExpectedFrameNumber++; //The next expected frame is this frame number + 1
} else if (rBucket.second->mTimeout + (mHeadOfLineBlockingTimeoutms * 1000) <= lTimeNow) {
//We got HOL but the next frame has timed out meaning the time out of the bucket + the HOL timeout
//We need now need to jump ahead and reset the mNextExpectedFrameNumber
//Assemble all data for delivery and reset the HOL pointer.
rBucket.second->mBucketData->mDataContent = rBucket.second->mDataContent;
rBucket.second->mBucketData->mBroken =
rBucket.second->mFragmentCounter != rBucket.second->mOfFragmentNo;
rBucket.second->mBucketData->mPts = rBucket.second->mPts;
rBucket.second->mBucketData->mDts = rBucket.second->mDts;
rBucket.second->mBucketData->mCode = rBucket.second->mCode;
rBucket.second->mBucketData->mStreamID = rBucket.second->mStream;
rBucket.second->mBucketData->mSource = rBucket.second->mSource;
rBucket.second->mBucketData->mFlags = rBucket.second->mFlags;
rBucket.second->mBucketData->mSuperFrameNo = rBucket.second->mSavedSuperFrameNo;
if (rReceiveFunction) {
rReceiveFunction(rBucket.second->mBucketData, mCTX ? mCTX.get() : nullptr);
} else {
receiveCallback(rBucket.second->mBucketData, mCTX ? mCTX.get() : nullptr);
}
lDeleteList.emplace_back(rBucket.first);
mNextExpectedFrameNumber = rBucket.second->mDeliveryOrder + 1;
} else {
//We're blocked (HOL). No frames are complete or timed out
//Look again at the delivery of the next fragment to see the status then.
break;
}
} else {
// We are not in HOL mode..
// This means just deliver as the frames arrive or times out
if (rBucket.second->mTimeout <= lTimeNow ||
rBucket.second->mFragmentCounter == rBucket.second->mOfFragmentNo) {
//Assemble all data for delivery
rBucket.second->mBucketData->mDataContent = rBucket.second->mDataContent;
rBucket.second->mBucketData->mBroken =
rBucket.second->mFragmentCounter != rBucket.second->mOfFragmentNo;
rBucket.second->mBucketData->mPts = rBucket.second->mPts;
rBucket.second->mBucketData->mDts = rBucket.second->mDts;
rBucket.second->mBucketData->mCode = rBucket.second->mCode;
rBucket.second->mBucketData->mStreamID = rBucket.second->mStream;
rBucket.second->mBucketData->mSource = rBucket.second->mSource;
rBucket.second->mBucketData->mFlags = rBucket.second->mFlags;
rBucket.second->mBucketData->mSuperFrameNo = rBucket.second->mSavedSuperFrameNo;
if (rReceiveFunction) {
rReceiveFunction(rBucket.second->mBucketData, mCTX ? mCTX.get() : nullptr);
} else {
receiveCallback(rBucket.second->mBucketData, mCTX ? mCTX.get() : nullptr);
}
lDeleteList.emplace_back(rBucket.first);
}
}
}
for (const auto bucketID: lDeleteList) {
auto lDeleteMe = mBucketMap[bucketID];
mBucketMap.erase(bucketID);
lDeleteMe->mActive = false;
lDeleteMe->mBucketData = nullptr;
}
}
//This thread is delivering the super frames to the host
void ElasticFrameProtocolReceiver::deliveryWorker() {
while (mThreadActive) {
pFramePtr lSuperframe = nullptr;
{
std::unique_lock<std::mutex> lk(mSuperFrameMtx);
if (!mSuperFrameReady)
mSuperFrameDeliveryConditionVariable.wait(lk,
[this] { return mSuperFrameReady; }); //if mSuperFrameReady == true we already got data no need to wait for signal
// We got a signal a frame is ready
// pop until queue is empty
if (!mSuperFrameQueue.empty()) {
lSuperframe = std::move(mSuperFrameQueue.front());
mSuperFrameQueue.pop_front();
}
// If there is more to pop don't close the semaphore else do.
if (mSuperFrameQueue.empty()) {
mSuperFrameReady = false;
}
}
//I want to be outside the scope of the lock when calling the callback. Else the
//callback may lock the internal workers.
if (lSuperframe) {
receiveCallback(lSuperframe, mCTX ? mCTX.get() : nullptr);
lSuperframe = nullptr; //Drop the ownership.
}
}
mIsDeliveryThreadActive = false;
}
// This is the thread going through the buckets to see if they should be delivered to
// the 'user'
void ElasticFrameProtocolReceiver::receiverWorker() {
int64_t lTimeReference = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now().time_since_epoch()).count();
// uint32_t lTimedebuggerPointer = 0;
// int64_t lTimeDebugger[100];
while (mThreadActive) {
lTimeReference += WORKER_THREAD_SLEEP_US;
int64_t lTimeSample = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now().time_since_epoch()).count();
int64_t lTimeCompensation = lTimeReference - lTimeSample;
// lTimeDebugger[lTimedebuggerPointer++]=lTimeCompensation;
// if (! (lTimedebuggerPointer % 100)) {
// std::cout << "Time Debug -> ";
// lTimedebuggerPointer=0;
// int64_t averageTime=0;
// for (int g = 0; g < 100; g++) {
// averageTime += WORKER_THREAD_SLEEP_US-lTimeDebugger[g];
// std::cout << int64_t(WORKER_THREAD_SLEEP_US-lTimeDebugger[g]) << " ";
// }
// std::cout << std::endl;
// averageTime = averageTime / 100;
// std::cout << "Average -> " << signed(averageTime) << std::endl;
// }
if (lTimeCompensation < 0) {
EFP_LOGGER(true, LOGG_WARN, "Worker thread overloaded by " << static_cast<signed>(lTimeCompensation) << " us")
lTimeReference = lTimeSample;
lTimeCompensation = 0;
} else {
std::this_thread::sleep_for(std::chrono::microseconds(lTimeCompensation));
}
mNetMtx.lock();
auto lActiveCount = static_cast<uint32_t>(mBucketMap.size());
if (!lActiveCount) {
mNetMtx.unlock();
continue; //Nothing to process
}
int64_t lTimeNow = lTimeSample + lTimeCompensation;
std::vector<uint64_t> lDeleteList;
for (const auto &rBucket: mBucketMap) {
if (mHeadOfLineBlockingTimeoutms) {
//HOL mode
if (mDeliveryHOLFirstRun) {
//It's the first run. We are in HOL mode
//We need at least two super frames to set the HEAD correct.
//However if a fragment has timed out we need to act on this and start the delivery
bool lHasTimedOut = false;
for (const auto &rBucketInner: mBucketMap) {
//Has the bucket Timed out or are all fragments collected?
if (rBucketInner.second->mTimeout <= lTimeNow) {
//Set the flag signaling at least one frame has timed out
//std::cout << "Time out " << std::endl;
lHasTimedOut = true;
}
}
if (mBucketMap.size() > 1 || lHasTimedOut) {
mDeliveryHOLFirstRun = false;
mNextExpectedFrameNumber = mBucketMap.begin()->second->mDeliveryOrder;
} else {
break; //Nothing to process
}
}
if (rBucket.second->mDeliveryOrder == mNextExpectedFrameNumber &&
rBucket.second->mFragmentCounter == rBucket.second->mOfFragmentNo) {
//We got what we expected. Now deliver.
//Assemble all data for delivery
{
std::lock_guard<std::mutex> lk(mSuperFrameMtx);
rBucket.second->mBucketData->mDataContent = rBucket.second->mDataContent;
rBucket.second->mBucketData->mBroken =
rBucket.second->mFragmentCounter != rBucket.second->mOfFragmentNo;
rBucket.second->mBucketData->mPts = rBucket.second->mPts;
rBucket.second->mBucketData->mDts = rBucket.second->mDts;
rBucket.second->mBucketData->mCode = rBucket.second->mCode;
rBucket.second->mBucketData->mStreamID = rBucket.second->mStream;
rBucket.second->mBucketData->mSource = rBucket.second->mSource;
rBucket.second->mBucketData->mFlags = rBucket.second->mFlags;
rBucket.second->mBucketData->mSuperFrameNo = rBucket.second->mSavedSuperFrameNo;
mSuperFrameQueue.push_back(std::move(rBucket.second->mBucketData));
mSuperFrameReady = true;
}
mSuperFrameDeliveryConditionVariable.notify_one();
lDeleteList.emplace_back(rBucket.first);
mNextExpectedFrameNumber++; //The next expected frame is this frame number + 1
} else if (rBucket.second->mTimeout + (mHeadOfLineBlockingTimeoutms * 1000) <= lTimeNow) {
//We got HOL but the next frame has timed out meaning the time out of the bucket + the HOL timeout
//We need now need to jump ahead and reset the mNextExpectedFrameNumber
//Assemble all data for delivery and reset the HOL pointer.
{
std::lock_guard<std::mutex> lk(mSuperFrameMtx);
rBucket.second->mBucketData->mDataContent = rBucket.second->mDataContent;
rBucket.second->mBucketData->mBroken =
rBucket.second->mFragmentCounter != rBucket.second->mOfFragmentNo;
rBucket.second->mBucketData->mPts = rBucket.second->mPts;
rBucket.second->mBucketData->mDts = rBucket.second->mDts;
rBucket.second->mBucketData->mCode = rBucket.second->mCode;
rBucket.second->mBucketData->mStreamID = rBucket.second->mStream;
rBucket.second->mBucketData->mSource = rBucket.second->mSource;
rBucket.second->mBucketData->mFlags = rBucket.second->mFlags;
rBucket.second->mBucketData->mSuperFrameNo = rBucket.second->mSavedSuperFrameNo;
mSuperFrameQueue.push_back(std::move(rBucket.second->mBucketData));
mSuperFrameReady = true;
}
mSuperFrameDeliveryConditionVariable.notify_one();
lDeleteList.emplace_back(rBucket.first);
mNextExpectedFrameNumber = rBucket.second->mDeliveryOrder + 1;
} else {
//We're blocked (HOL). No frames are complete or timed out
//Look again at the delivery of the next fragment to see the status then.
break;
}
} else {
// We are not in HOL mode..
// This means just deliver as the frames arrive or times out
if (rBucket.second->mTimeout <= lTimeNow ||
rBucket.second->mFragmentCounter == rBucket.second->mOfFragmentNo) {
//Assemble all data for delivery
{
std::lock_guard<std::mutex> lk(mSuperFrameMtx);
rBucket.second->mBucketData->mDataContent = rBucket.second->mDataContent;
rBucket.second->mBucketData->mBroken =
rBucket.second->mFragmentCounter != rBucket.second->mOfFragmentNo;
rBucket.second->mBucketData->mPts = rBucket.second->mPts;
rBucket.second->mBucketData->mDts = rBucket.second->mDts;
rBucket.second->mBucketData->mCode = rBucket.second->mCode;
rBucket.second->mBucketData->mStreamID = rBucket.second->mStream;
rBucket.second->mBucketData->mSource = rBucket.second->mSource;
rBucket.second->mBucketData->mFlags = rBucket.second->mFlags;
rBucket.second->mBucketData->mSuperFrameNo = rBucket.second->mSavedSuperFrameNo;
mSuperFrameQueue.push_back(std::move(rBucket.second->mBucketData));
mSuperFrameReady = true;
}
mSuperFrameDeliveryConditionVariable.notify_one();
lDeleteList.emplace_back(rBucket.first);
}
}
}
for (const auto bucketID: lDeleteList) {
auto lDeleteMe = mBucketMap[bucketID];
mBucketMap.erase(bucketID);
lDeleteMe->mActive = false;
lDeleteMe->mBucketData = nullptr;
}
// ------------------------------------------
mNetMtx.unlock();
// Is more than 75% of the buffer used. //FIXME notify the user in some way
if (lActiveCount > (CIRCULAR_BUFFER_SIZE / 4) * 3) {
EFP_LOGGER(true, LOGG_WARN, "Current active buckets are more than 75% of the circular buffer.")
}
}
mIsWorkerThreadActive = false;
}
// Stop receiver worker thread
/**
 * Requests the receiver threads to stop and waits for them to report inactive.
 *
 * Clears mThreadActive, flags a super frame as ready and notifies the delivery condition
 * variable, then polls mIsWorkerThreadActive / mIsDeliveryThreadActive once per millisecond.
 *
 * @return ElasticFrameMessages::noError when neither activity flag is set,
 *         ElasticFrameMessages::failedStoppingReceiver after 1000 polls (roughly one second).
 */
ElasticFrameMessages ElasticFrameProtocolReceiver::stopReceiver() {
    std::lock_guard<std::mutex> lReceiveGuard(mReceiveMtx);

    // Lower the run flag the threads are watching.
    mThreadActive = false;

    // Flag "ready" under the queue mutex and notify; presumably this lets a delivery thread that is
    // waiting on the condition variable wake up and observe the stop request.
    {
        std::lock_guard<std::mutex> lQueueGuard(mSuperFrameMtx);
        mSuperFrameReady = true;
    }
    mSuperFrameDeliveryConditionVariable.notify_one();

    // Poll until both threads report inactive, giving up after 1000 x 1 ms.
    for (uint32_t lPollsLeft = 1000; mIsWorkerThreadActive || mIsDeliveryThreadActive;) {
        std::this_thread::sleep_for(std::chrono::microseconds(1000));
        if (--lPollsLeft == 0) {
            // The budget is spent; report the failure to the caller.
            EFP_LOGGER(true, LOGG_FATAL, "Threads not stopping. Now crash and burn baby!!")
            return ElasticFrameMessages::failedStoppingReceiver;
        }
    }
    return ElasticFrameMessages::noError;
}
/**
 * Vector convenience overload: hands the vector's storage to receiveFragmentFromPtr.
 *
 * @param rSubPacket       The received fragment.
 * @param lFromSource      Source identifier forwarded unchanged.
 * @param rReceiveFunction Callback forwarded unchanged.
 * @return Whatever receiveFragmentFromPtr returns.
 */
ElasticFrameMessages
ElasticFrameProtocolReceiver::receiveFragment(const std::vector<uint8_t> &rSubPacket, uint8_t lFromSource,
                                              const std::function<void(pFramePtr &rPacket,
                                                                       ElasticFrameProtocolContext *pCTX)> &rReceiveFunction) {
    const uint8_t *pFragmentData = rSubPacket.data();
    const size_t lFragmentSize = rSubPacket.size();
    return receiveFragmentFromPtr(pFragmentData, lFragmentSize, lFromSource, rReceiveFunction);
}
// Unpack method. We received a fragment of data or a full frame. Let's unpack it.
/**
 * Classifies a received fragment by the low nibble of its first byte and unpacks it.
 *
 * @param pSubPacket       Pointer to the received fragment.
 * @param lPacketSize      Number of bytes available at pSubPacket.
 * @param lFromSource      Source identifier passed on to the unpack methods.
 * @param rReceiveFunction Callback handed to runToCompletionMethod when the receiver is in
 *                         RUN_TO_COMPLETION mode.
 * @return receiverNotRunning if the receiver is in THREADED mode and not both threads are active,
 *         frameSizeMismatch if the packet is null/empty or shorter than the header of its type,
 *         type0Frame for type 0 packets, unknownFrameType for any other unrecognised type,
 *         otherwise the result of the matching unpackTypeN call.
 */
ElasticFrameMessages
ElasticFrameProtocolReceiver::receiveFragmentFromPtr(const uint8_t *pSubPacket, size_t lPacketSize, uint8_t lFromSource,
                                                     const std::function<void(pFramePtr &rPacket,
                                                                              ElasticFrameProtocolContext *pCTX)> &rReceiveFunction) {
    // Type 0 packet. Discard and continue
    // Type 0 packets can be used to fill with user data outside efp protocol packets just put a uint8_t = Frametype::type0 at position 0 and then any data.
    // Type 1 are frames larger than MTU
    // Type 2 are frames smaller than MTU
    // Type 2 packets are also used at the end of Type 1 packet superFrames
    // Type 3 frames carry the remainder of data when it's too large for type2 to carry.
    std::lock_guard<std::mutex> lock(mReceiveMtx);

    if (!(mIsWorkerThreadActive && mIsDeliveryThreadActive) && mCurrentMode == EFPReceiverMode::THREADED) {
        EFP_LOGGER(true, LOGG_ERROR, "Receiver not running")
        return ElasticFrameMessages::receiverNotRunning;
    }

    // The frame type lives in byte 0, so there must be at least one readable byte.
    // Without this guard an empty packet would be read out of bounds.
    if (pSubPacket == nullptr || lPacketSize == 0) {
        return ElasticFrameMessages::frameSizeMismatch;
    }

    // Only the low nibble carries the frame type; the high nibble is masked away here.
    const uint8_t lFrameType = pSubPacket[0] & static_cast<uint8_t>(0x0f);

    if (lFrameType == Frametype::type0) {
        return ElasticFrameMessages::type0Frame;
    }

    ElasticFrameMessages lMessage;
    if (lFrameType == Frametype::type1) {
        if (lPacketSize < sizeof(ElasticFrameType1)) {
            return ElasticFrameMessages::frameSizeMismatch;
        }
        lMessage = unpackType1(pSubPacket, lPacketSize, lFromSource);
    } else if (lFrameType == Frametype::type2) {
        if (lPacketSize < sizeof(ElasticFrameType2)) {
            return ElasticFrameMessages::frameSizeMismatch;
        }
        lMessage = unpackType2(pSubPacket, lPacketSize, lFromSource);
    } else if (lFrameType == Frametype::type3) {
        if (lPacketSize < sizeof(ElasticFrameType3)) {
            return ElasticFrameMessages::frameSizeMismatch;
        }
        lMessage = unpackType3(pSubPacket, lPacketSize, lFromSource);
    } else {
        // Did not catch anything I understand
        return ElasticFrameMessages::unknownFrameType;
    }

    // In run-to-completion mode the delivery step is executed right here on the caller's thread.
    if (mCurrentMode == EFPReceiverMode::RUN_TO_COMPLETION) {
        runToCompletionMethod(rReceiveFunction);
    }
    return lMessage;
}
/**
 * Walks the embedded-data sections at the start of a frame and copies each one out.
 *
 * Starting at *pPayloadDataPosition, each section is an ElasticEmbeddedHeader followed by
 * mSize bytes. The loop stops after the first section whose type byte has bit 0x80 set.
 *
 * @param rPacket              Frame whose pFrameData / mFrameSize are parsed.
 * @param pEmbeddedDataList    Receives one byte vector per extracted section.
 * @param pDataContent         Receives the section type (low 7 bits) per extracted section.
 * @param pPayloadDataPosition In: offset of the first embedded header. Out: offset just past the
 *                             last section that was consumed.
 * @return noError on success, illegalEmbeddedData if a header carries the illegal type,
 *         bufferOutOfBounds if a header or its data does not fit inside the frame, or if the
 *         position reaches or passes mFrameSize after a section has been consumed.
 */
ElasticFrameMessages ElasticFrameProtocolReceiver::extractEmbeddedData(ElasticFrameProtocolReceiver::pFramePtr &rPacket,
                                                                       std::vector<std::vector<uint8_t>> *pEmbeddedDataList,
                                                                       std::vector<uint8_t> *pDataContent,
                                                                       size_t *pPayloadDataPosition) {
    const size_t lHeaderSize = sizeof(ElasticFrameContentNamespace::ElasticEmbeddedHeader);
    const size_t lFrameSize = rPacket->mFrameSize;
    bool lIsLastSection;
    do {
        // The header itself must lie inside the frame before it may be read.
        // Written as subtractions so the comparison cannot wrap around.
        if (*pPayloadDataPosition > lFrameSize || lFrameSize - *pPayloadDataPosition < lHeaderSize) {
            return ElasticFrameMessages::bufferOutOfBounds;
        }
        ElasticFrameContentNamespace::ElasticEmbeddedHeader lEmbeddedHeader =
                *reinterpret_cast<ElasticFrameContentNamespace::ElasticEmbeddedHeader *>(rPacket->pFrameData +
                                                                                         *pPayloadDataPosition);
        if (lEmbeddedHeader.mEmbeddedFrameType == ElasticEmbeddedFrameContent::illegal) {
            return ElasticFrameMessages::illegalEmbeddedData;
        }

        // The advertised section size comes from the wire; make sure it fits before copying.
        const size_t lSectionSize = lEmbeddedHeader.mSize;
        if (lSectionSize > lFrameSize - *pPayloadDataPosition - lHeaderSize) {
            return ElasticFrameMessages::bufferOutOfBounds;
        }

        // Low 7 bits are the section type; bit 0x80 marks the final section.
        pDataContent->emplace_back((lEmbeddedHeader.mEmbeddedFrameType & static_cast<uint8_t>(0x7f)));
        const uint8_t *pSectionStart = rPacket->pFrameData + *pPayloadDataPosition + lHeaderSize;
        pEmbeddedDataList->emplace_back(pSectionStart, pSectionStart + lSectionSize);
        lIsLastSection = (lEmbeddedHeader.mEmbeddedFrameType & static_cast<uint8_t>(0x80)) != 0;

        *pPayloadDataPosition += (lSectionSize + lHeaderSize);
        // Reaching the end of the frame here means nothing is left after the embedded data.
        if (*pPayloadDataPosition >= lFrameSize) {
            return ElasticFrameMessages::bufferOutOfBounds;
        }
    } while (!lIsLastSection);
    return ElasticFrameMessages::noError;
}
//---------------------------------------------------------------------------------------------------------------------
//
//
// ElasticFrameProtocolSender
//
//
//---------------------------------------------------------------------------------------------------------------------
// Constructor setting the MTU
// Limit the MTU to uint16_t MAX and UINT8_MAX min.
// The lower limit is actually type2frameSize+1, keep it at 255 for now
/**
 * Builds a sender for the given MTU.
 *
 * @param lSetMTU Requested MTU. Values below UINT8_MAX are rejected with an error log and
 *                replaced by UINT8_MAX.
 * @param pCTX    Context object stored in mCTX.
 */
ElasticFrameProtocolSender::ElasticFrameProtocolSender(uint16_t lSetMTU,
                                                       std::shared_ptr<ElasticFrameProtocolContext> pCTX) {
    mCTX = std::move(pCTX);
    c_sendCallback = nullptr;

    // Settle the effective MTU first so everything below is sized from the same value.
    if (lSetMTU < UINT8_MAX) {
        EFP_LOGGER(true, LOGG_ERROR, "MTU lower than " << unsigned(UINT8_MAX) << " is not accepted.")
        mCurrentMTU = UINT8_MAX;
    } else {
        mCurrentMTU = lSetMTU;
    }

    // Size the send buffers from the effective MTU, not the requested one. Sizing from a rejected
    // small lSetMTU would leave mSendBufferFixed shorter than the fragments built from mCurrentMTU.
    mSendBufferEnd.reserve(mCurrentMTU);
    mSendBufferFixed.resize(mCurrentMTU);

    // Default send path: route to the sendData member.
    sendCallback = std::bind(&ElasticFrameProtocolSender::sendData, this, std::placeholders::_1, std::placeholders::_2,
                             std::placeholders::_3);
}
// Destructor: compiler-generated, no custom teardown is performed here.
ElasticFrameProtocolSender::~ElasticFrameProtocolSender() = default;
// Dummy callback for transmitter
/**
 * Default send callback: forwards the fragment to the C-style callback when one is set.
 *
 * @param rSubPacket Fragment to transmit.
 * @param lStreamID  Stream identifier passed on to the callback.
 * @param pCTX       Not used by this implementation; the user pointer is taken from mCTX.
 *
 * Logs an error when no c_sendCallback has been set.
 */
void ElasticFrameProtocolSender::sendData(const std::vector<uint8_t> &rSubPacket, uint8_t lStreamID,
                                          ElasticFrameProtocolContext *pCTX) const
{
    if (c_sendCallback) {
        // mCTX may be empty; hand the callback a null user pointer instead of dereferencing it.
        c_sendCallback(rSubPacket.data(), rSubPacket.size(), lStreamID, mCTX ? mCTX->mUnsafePointer : nullptr);
    } else {
        EFP_LOGGER(true, LOGG_ERROR, "Implement the sendCallback method for the protocol to work.")
    }
}
// Pack data method. Fragments the data and calls the sendCallback method at the host level.
/**
 * Vector convenience overload: hands the vector's storage to packAndSendFromPtr.
 *
 * @param rPacket       Data to fragment and send.
 * @param lDataContent  Content type, forwarded unchanged.
 * @param lPts          Presentation timestamp, forwarded unchanged.
 * @param lDts          Decode timestamp, forwarded unchanged.
 * @param lCode         Code value, forwarded unchanged.
 * @param lStreamID     Stream identifier, forwarded unchanged.
 * @param lFlags        Flags, forwarded unchanged.
 * @param rSendFunction Send function, forwarded unchanged.
 * @return Whatever packAndSendFromPtr returns.
 */
ElasticFrameMessages
ElasticFrameProtocolSender::packAndSend(const std::vector<uint8_t> &rPacket, ElasticFrameContent lDataContent,
                                        uint64_t lPts,
                                        uint64_t lDts,
                                        uint32_t lCode, uint8_t lStreamID, uint8_t lFlags,
                                        const std::function<void(const std::vector<uint8_t> &rSubPacket,
                                                                 uint8_t streamID)> &rSendFunction) {
    const uint8_t *pPayload = rPacket.data();
    const size_t lPayloadSize = rPacket.size();
    return packAndSendFromPtr(pPayload, lPayloadSize, lDataContent, lPts, lDts, lCode, lStreamID, lFlags,
                              rSendFunction);
}
// Pack data method. Fragments the data and calls the sendCallback method at the host level.
ElasticFrameMessages
ElasticFrameProtocolSender::packAndSendFromPtr(const uint8_t *pPacket, size_t lPacketSize,
ElasticFrameContent lDataContent,
uint64_t lPts, uint64_t lDts,
uint32_t lCode, uint8_t lStreamID, uint8_t lFlags,
const std::function<void(const std::vector<uint8_t> &rSubPacket,
uint8_t streamID)> &rSendFunction) {
std::lock_guard<std::mutex> lock(mSendMtx);
if (sizeof(ElasticFrameType1) != sizeof(ElasticFrameType3)) {
return ElasticFrameMessages::type1And3SizeError;
}
if (lPts == UINT64_MAX) {
return ElasticFrameMessages::reservedPTSValue;
}
if (lDts == UINT64_MAX) {
return ElasticFrameMessages::reservedDTSValue;
}
if (lCode == UINT32_MAX) {
return ElasticFrameMessages::reservedCodeValue;
}
if (lStreamID == 0 && lDataContent != ElasticFrameContent::efpsig) {
return ElasticFrameMessages::reservedStreamValue;
}
uint64_t lPtsDtsDiff = lPts - lDts;
if (lPtsDtsDiff >= UINT32_MAX) {
return ElasticFrameMessages::dtsptsDiffToLarge;
}
lFlags &= static_cast<uint8_t>(0xf0);
// Will the data fit?
// We know that we can send uint16_t max (65535) packets
// The last packet will be a type2 packet.. so check against current MTU multiplied with uint16_t max subtracting the space the protocol needs for the headers
if (lPacketSize
> (((mCurrentMTU - sizeof(ElasticFrameType1)) * (std::numeric_limits<uint16_t>::max() - 1)) + (mCurrentMTU - sizeof(ElasticFrameType2)))) {
return ElasticFrameMessages::tooLargeFrame;
}
if ((lPacketSize + sizeof(ElasticFrameType2)) <= mCurrentMTU) {
mSendBufferEnd.resize(sizeof(ElasticFrameType2) + lPacketSize);
auto *pType2Frame = reinterpret_cast<ElasticFrameType2*>(mSendBufferEnd.data());
pType2Frame->hFrameType = Frametype::type2 | lFlags;
pType2Frame->hStreamID = lStreamID;
pType2Frame->hDataContent = lDataContent;
pType2Frame->hSizeOfData = static_cast<uint16_t>(lPacketSize);
pType2Frame->hSuperFrameNo = mSuperFrameNoGenerator;
pType2Frame->hOfFragmentNo = 0;
pType2Frame->hType1PacketSize = static_cast<uint16_t>(lPacketSize);
pType2Frame->hPts = lPts;
pType2Frame->hDtsPtsDiff = static_cast<uint32_t>(lPtsDtsDiff);
pType2Frame->hCode = lCode;
std::copy_n(pPacket, lPacketSize, mSendBufferEnd.data() + sizeof(ElasticFrameType2));
if (rSendFunction) {
rSendFunction(mSendBufferEnd, lStreamID);
} else {
sendCallback(mSendBufferEnd, lStreamID, mCTX ? mCTX.get() : nullptr);
}
mSuperFrameNoGenerator++;
return ElasticFrameMessages::noError;
}
uint16_t lFragmentNo = 0;
// The size is known for type1 packets no need to write it in any header.
size_t lDataPayloadType1 = static_cast<uint16_t>(mCurrentMTU - sizeof(ElasticFrameType1));
size_t lDataPayloadType2 = static_cast<uint16_t>(mCurrentMTU - sizeof(ElasticFrameType2));
uint64_t lDataPointer = 0;
auto lOfFragmentNo = static_cast<uint16_t>(floor(
static_cast<double>(lPacketSize) / static_cast<double>(mCurrentMTU - sizeof(ElasticFrameType1))));
uint16_t lOfFragmentNoType1 = lOfFragmentNo;
bool lType3needed = false;
size_t lReminderData = lPacketSize - (lOfFragmentNo * lDataPayloadType1);
if (lReminderData > lDataPayloadType2) {
// We need a type3 frame. The reminder is too large for a type2 frame
lType3needed = true;
lOfFragmentNo++;
}
auto *pType1Frame = reinterpret_cast<ElasticFrameType1*>(mSendBufferFixed.data());
pType1Frame->hFrameType = Frametype::type1 | lFlags;
pType1Frame->hStream = lStreamID;
pType1Frame->hSuperFrameNo = mSuperFrameNoGenerator;
pType1Frame->hOfFragmentNo = lOfFragmentNo;
while (lFragmentNo < lOfFragmentNoType1) {
pType1Frame->hFragmentNo = lFragmentNo++;
std::copy_n(pPacket + lDataPointer, lDataPayloadType1, mSendBufferFixed.data() + sizeof(ElasticFrameType1));