forked from go-telegram/bot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
get_updates.go
87 lines (73 loc) · 1.75 KB
/
get_updates.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package bot
import (
"context"
"errors"
"sync"
"sync/atomic"
"time"
"github.com/go-telegram/bot/models"
)
// maxTimeoutAfterError is the ceiling incErrTimeout applies to the delay
// between retries after a failed getUpdates request.
const maxTimeoutAfterError = 5 * time.Second
// getUpdatesParams is the JSON request body passed to the "getUpdates" raw
// request. Every field carries omitempty, so zero values are left out of the
// encoded payload.
type getUpdatesParams struct {
	// Offset is encoded as "offset".
	Offset int64 `json:"offset,omitempty"`

	// Limit is encoded as "limit".
	Limit int `json:"limit,omitempty"`

	// Timeout is encoded as "timeout".
	Timeout int `json:"timeout,omitempty"`

	// AllowedUpdates is encoded as "allowed_updates".
	AllowedUpdates []string `json:"allowed_updates,omitempty"`
}
// getUpdates runs the polling loop for the Telegram getUpdates method
// (https://core.telegram.org/bots/api#getupdates) until ctx is done.
//
// Every update of a successful response is handed to b.updates, and
// b.lastUpdateID is advanced so that the next request uses lastUpdateID+1 as
// its offset. A failed request is logged through b.error and retried after a
// delay that grows via incErrTimeout; the delay is cleared by the next
// successful request. A request error that wraps context.Canceled ends the
// loop silently. wg.Done is called when the loop exits.
func (b *Bot) getUpdates(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()

	var timeoutAfterError time.Duration

	for {
		// Cheap cancellation check before starting another iteration.
		select {
		case <-ctx.Done():
			return
		default:
		}

		if timeoutAfterError > 0 {
			if b.isDebug {
				b.debugHandler("wait after error, %v", timeoutAfterError)
			}

			// An explicit timer (rather than time.After) lets us release it
			// immediately when the context is cancelled during the back-off.
			timer := time.NewTimer(timeoutAfterError)
			select {
			case <-ctx.Done():
				timer.Stop()
				return
			case <-timer.C:
			}
		}

		params := &getUpdatesParams{
			// The requested timeout is one second shorter than b.pollTimeout.
			// NOTE(review): presumably so the server answers before a
			// client-side timeout of pollTimeout fires — confirm.
			Timeout: int((b.pollTimeout - time.Second).Seconds()),
			Offset:  atomic.LoadInt64(&b.lastUpdateID) + 1,
		}

		var updates []*models.Update

		errRequest := b.rawRequest(ctx, "getUpdates", params, &updates)
		if errRequest != nil {
			if errors.Is(errRequest, context.Canceled) {
				return
			}
			b.error("error get updates, %v", errRequest)
			timeoutAfterError = incErrTimeout(timeoutAfterError)
			continue
		}

		timeoutAfterError = 0

		for _, upd := range updates {
			atomic.StoreInt64(&b.lastUpdateID, upd.ID)

			// Block until the update is accepted or the context ends. A
			// non-blocking send would drop the update whenever the channel is
			// full, and because lastUpdateID has already moved past it, the
			// update would never be requested again.
			select {
			case <-ctx.Done():
				return
			case b.updates <- upd:
			}
		}
	}
}
// incErrTimeout computes the next retry delay from the current one: a zero
// delay becomes the initial 100ms step, any other delay is doubled, and a
// doubled value above maxTimeoutAfterError is replaced by that maximum.
func incErrTimeout(timeout time.Duration) time.Duration {
	const firstTimeout = 100 * time.Millisecond

	if timeout == 0 {
		return firstTimeout
	}

	if doubled := 2 * timeout; doubled <= maxTimeoutAfterError {
		return doubled
	}

	return maxTimeoutAfterError
}