From 434109c2c204a752afc23c790f13bf2f31d07128 Mon Sep 17 00:00:00 2001 From: MishkaRogachev Date: Fri, 28 Jul 2023 14:44:21 +0400 Subject: [PATCH] feat: implement both strategies for fetching community metrics --- protocol/messenger_community_metrics.go | 84 +++++-- protocol/messenger_community_metrics_test.go | 216 +++++++++--------- protocol/persistence_metrics.go | 77 +++---- .../requests/community_metrics_request.go | 28 +-- 4 files changed, 221 insertions(+), 184 deletions(-) diff --git a/protocol/messenger_community_metrics.go b/protocol/messenger_community_metrics.go index c7860399cfb..687d4c52666 100644 --- a/protocol/messenger_community_metrics.go +++ b/protocol/messenger_community_metrics.go @@ -2,52 +2,94 @@ package protocol import ( "errors" + "sort" "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/protocol/requests" ) +type MetricsIntervalResponse struct { + StartTimestamp uint64 `json:"startTimestamp"` + EndTimestamp uint64 `json:"endTimestamp"` + Timestamps []uint64 `json:"timestamps"` + Count int `json:"count"` +} + type CommunityMetricsResponse struct { Type requests.CommunityMetricsRequestType `json:"type"` CommunityID types.HexBytes `json:"communityId"` - Entries map[uint64]uint `json:"entries"` + Intervals []MetricsIntervalResponse `json:"intervals"` } -func floorToRange(value uint64, start uint64, end uint64, step uint64) uint64 { - for timestamp := start + step; timestamp < end; timestamp += step { - if value <= timestamp { - return timestamp - } +func (m *Messenger) getChatIdsForCommunity(communityID types.HexBytes) ([]string, error) { + community, err := m.GetCommunityByID(communityID) + if err != nil { + return []string{}, err + } + + if community == nil { + return []string{}, errors.New("no community found") } - return end + return community.ChatIDs(), nil } -func (m *Messenger) collectCommunityMessagesMetrics(request *requests.CommunityMetricsRequest) (*CommunityMetricsResponse, error) { - community, err := m.GetCommunityByID(request.CommunityID) +func (m *Messenger) collectCommunityMessagesTimestamps(request *requests.CommunityMetricsRequest) (*CommunityMetricsResponse, error) { + chatIDs, err := m.getChatIdsForCommunity(request.CommunityID) if err != nil { return nil, err } - if community == nil { - return nil, errors.New("no community found") + intervals := []MetricsIntervalResponse{} + for _, sourceInterval := range request.Intervals { + // TODO: messages count should be stored in special table, not calculated here + timestamps, err := m.persistence.SelectMessagesTimestampsForChatsByPeriod(chatIDs, sourceInterval.StartTimestamp, sourceInterval.EndTimestamp) + if err != nil { + return nil, err + } + + // ther is no built-in sort for uint64 + sort.Slice(timestamps, func(i, j int) bool { return timestamps[i] < timestamps[j] }) + + intervals = append(intervals, MetricsIntervalResponse{ + StartTimestamp: sourceInterval.StartTimestamp, + EndTimestamp: sourceInterval.EndTimestamp, + Timestamps: timestamps, + }) + } + + response := &CommunityMetricsResponse{ + Type: request.Type, + CommunityID: request.CommunityID, + Intervals: intervals, } - // TODO: timestamp summary should be stored in special table, not calculated here - timestamps, err := m.persistence.FetchMessageTimestampsForChatsByPeriod(community.ChatIDs(), request.StartTimestamp, request.EndTimestamp) + return response, nil +} + +func (m *Messenger) collectCommunityMessagesCount(request *requests.CommunityMetricsRequest) (*CommunityMetricsResponse, error) { + chatIDs, err := m.getChatIdsForCommunity(request.CommunityID) if err != nil { return nil, err } - entries := map[uint64]uint{} - for _, timestamp := range timestamps { - value := floorToRange(timestamp, request.StartTimestamp, request.EndTimestamp, request.StepTimestamp) - entries[value] += 1 + intervals := []MetricsIntervalResponse{} + for _, sourceInterval := range request.Intervals { + // TODO: messages count should be stored in special table, not calculated here + count, err := m.persistence.SelectMessagesCountForChatsByPeriod(chatIDs, sourceInterval.StartTimestamp, sourceInterval.EndTimestamp) + if err != nil { + return nil, err + } + intervals = append(intervals, MetricsIntervalResponse{ + StartTimestamp: sourceInterval.StartTimestamp, + EndTimestamp: sourceInterval.EndTimestamp, + Count: count, + }) } response := &CommunityMetricsResponse{ Type: request.Type, CommunityID: request.CommunityID, - Entries: entries, + Intervals: intervals, } return response, nil @@ -59,8 +101,10 @@ func (m *Messenger) CollectCommunityMetrics(request *requests.CommunityMetricsRe } switch request.Type { - case requests.CommunityMetricsRequestMessages: - return m.collectCommunityMessagesMetrics(request) + case requests.CommunityMetricsRequestMessagesTimestamps: + return m.collectCommunityMessagesTimestamps(request) + case requests.CommunityMetricsRequestMessagesCount: + return m.collectCommunityMessagesCount(request) default: return nil, errors.New("metrics is not implemented yet") } diff --git a/protocol/messenger_community_metrics_test.go b/protocol/messenger_community_metrics_test.go index 1ce1f8afad0..1b0deabdf81 100644 --- a/protocol/messenger_community_metrics_test.go +++ b/protocol/messenger_community_metrics_test.go @@ -21,7 +21,7 @@ type MessengerCommunityMetricsSuite struct { MessengerBaseTestSuite } -func (s *MessengerCommunityMetricsSuite) prepareCommunity() *communities.Community { +func (s *MessengerCommunityMetricsSuite) prepareCommunityAndChatIDs() (*communities.Community, []string) { description := &requests.CreateCommunity{ Membership: protobuf.CommunityPermissions_NO_MEMBERSHIP, Name: "status", @@ -29,12 +29,63 @@ func (s *MessengerCommunityMetricsSuite) prepareCommunity() *communities.Communi Description: "status community description", } response, err := s.m.CreateCommunity(description, true) + s.Require().NoError(err) + s.Require().NotNil(response) + + s.Require().Len(response.Communities(), 1) + community := response.Communities()[0] + s.Require().Len(community.ChatIDs(), 1) + chatIDs := community.ChatIDs() + + // Create another chat + chat := &protobuf.CommunityChat{ + Permissions: &protobuf.CommunityPermissions{ + Access: protobuf.CommunityPermissions_NO_MEMBERSHIP, + }, + Identity: &protobuf.ChatIdentity{ + DisplayName: "status", + Emoji: "👍", + Description: "status community chat", + }, + } + response, err = s.m.CreateCommunityChat(community.ID(), chat) s.Require().NoError(err) s.Require().NotNil(response) s.Require().Len(response.Communities(), 1) + s.Require().Len(response.Chats(), 1) + + chatIDs = append(chatIDs, response.Chats()[0].ID) + + return community, chatIDs +} + +func (s *MessengerCommunityMetricsSuite) prepareCommunityChatMessages(communityID string, chatIDs []string) { + s.generateMessages(chatIDs[0], communityID, []uint64{ + // out ouf range messages in the begining + 1690162000, + // 1st column, 1 message + 1690372200, + // 2nd column, 1 message + 1690372800, + // 3rd column, 1 message + 1690373000, + // out ouf range messages in the end + 1690373100, + }) - return response.Communities()[0] + s.generateMessages(chatIDs[1], communityID, []uint64{ + // out ouf range messages in the begining + 1690151000, + // 1st column, 2 messages + 1690372000, + 1690372100, + // 2nd column, 1 message + 1690372700, + // 3rd column empty + // out ouf range messages in the end + 1690373100, + }) } func (s *MessengerCommunityMetricsSuite) generateMessages(chatID string, communityID string, timestamps []uint64) { @@ -65,15 +116,12 @@ func (s *MessengerCommunityMetricsSuite) generateMessages(chatID string, communi s.Require().NoError(err) } -func (s *MessengerCommunityMetricsSuite) TestCollectCommunityMessagesMetricsEmpty() { - community := s.prepareCommunity() +func (s *MessengerCommunityMetricsSuite) TestCollectCommunityMetricsEmptyInterval() { + community, _ := s.prepareCommunityAndChatIDs() request := &requests.CommunityMetricsRequest{ - CommunityID: community.ID(), - Type: requests.CommunityMetricsRequestMessages, - StartTimestamp: 1690279200, - EndTimestamp: 1690282800, // one hour - StepTimestamp: 100, + CommunityID: community.ID(), + Type: requests.CommunityMetricsRequestMessagesTimestamps, } // Expect empty metrics @@ -82,125 +130,87 @@ func (s *MessengerCommunityMetricsSuite) TestCollectCommunityMessagesMetricsEmpt s.Require().NotNil(resp) // Entries count should be empty - s.Require().Len(resp.Entries, 0) + s.Require().Len(resp.Intervals, 0) } -func (s *MessengerCommunityMetricsSuite) TestCollectCommunityMessagesMetricsOneChat() { - community := s.prepareCommunity() - - s.Require().Len(community.ChatIDs(), 1) - chatId := community.ChatIDs()[0] +func (s *MessengerCommunityMetricsSuite) TestCollectCommunityMessagesTimestamps() { + community, chatIDs := s.prepareCommunityAndChatIDs() - s.generateMessages(chatId, string(community.ID()), []uint64{ - // out ouf range messages in the begining - 1690162000, - 1690371999, - // 1st column, 3 message - 1690372000, - 1690372100, - 1690372200, - // 2nd column, 2 messages - 1690372700, - 1690372800, - // 3rd column, 1 message - 1690373000, - // out ouf range messages in the end - 1690373100, - 1690374000, - 1690383000, - }) + s.prepareCommunityChatMessages(string(community.ID()), chatIDs) // Request metrics request := &requests.CommunityMetricsRequest{ - CommunityID: community.ID(), - Type: requests.CommunityMetricsRequestMessages, - StartTimestamp: 1690372000, - EndTimestamp: 1690373000, // one hour - StepTimestamp: 300, + CommunityID: community.ID(), + Type: requests.CommunityMetricsRequestMessagesTimestamps, + Intervals: []requests.MetricsIntervalRequest{ + requests.MetricsIntervalRequest{ + StartTimestamp: 1690372000, + EndTimestamp: 1690372300, + }, + requests.MetricsIntervalRequest{ + StartTimestamp: 1690372400, + EndTimestamp: 1690372800, + }, + requests.MetricsIntervalRequest{ + StartTimestamp: 1690372900, + EndTimestamp: 1690373000, + }, + }, } resp, err := s.m.CollectCommunityMetrics(request) s.Require().NoError(err) s.Require().NotNil(resp) - // floor(1000 / 300) == 3 - s.Require().Len(resp.Entries, 3) + s.Require().Len(resp.Intervals, 3) - s.Require().Equal(resp.Entries[1690372300], uint(3)) - // No entries for 1690372600 - s.Require().Equal(resp.Entries[1690372900], uint(2)) - s.Require().Equal(resp.Entries[1690373000], uint(1)) -} - -func (s *MessengerCommunityMetricsSuite) TestCollectCommunityMessagesMetricsMultipleChats() { - community := s.prepareCommunity() - - s.Require().Len(community.ChatIDs(), 1) - chatIds := community.ChatIDs() - - // Create another chat - chat := &protobuf.CommunityChat{ - Permissions: &protobuf.CommunityPermissions{ - Access: protobuf.CommunityPermissions_NO_MEMBERSHIP, - }, - Identity: &protobuf.ChatIdentity{ - DisplayName: "status", - Emoji: "👍", - Description: "status community chat", - }, + for i := 0; i < 3; i++ { + s.Require().Equal(resp.Intervals[i].StartTimestamp, request.Intervals[i].StartTimestamp) + s.Require().Equal(resp.Intervals[i].EndTimestamp, request.Intervals[i].EndTimestamp) } - response, err := s.m.CreateCommunityChat(community.ID(), chat) - s.Require().NoError(err) - s.Require().NotNil(response) - s.Require().Len(response.Communities(), 1) - s.Require().Len(response.Chats(), 1) - chatIds = append(chatIds, response.Chats()[0].ID) + s.Require().Equal(resp.Intervals[0].Timestamps, []uint64{1690372000, 1690372100, 1690372200}) + s.Require().Equal(resp.Intervals[1].Timestamps, []uint64{1690372700, 1690372800}) + s.Require().Equal(resp.Intervals[2].Timestamps, []uint64{1690373000}) +} - s.generateMessages(chatIds[0], string(community.ID()), []uint64{ - // out ouf range messages in the begining - 1690162000, - // 1st column, 1 message - 1690372200, - // 2nd column, 1 message - 1690372800, - // 3rd column, 1 message - 1690373000, - // out ouf range messages in the end - 1690373100, - }) +func (s *MessengerCommunityMetricsSuite) TestCollectCommunityMessagesCount() { + community, chatIDs := s.prepareCommunityAndChatIDs() - s.generateMessages(chatIds[1], string(community.ID()), []uint64{ - // out ouf range messages in the begining - 1690152000, - // 1st column, 2 messages - 1690372000, - 1690372100, - // 2nd column, 1 message - 1690372700, - // 3rd column empty - // out ouf range messages in the end - 1690373100, - }) + s.prepareCommunityChatMessages(string(community.ID()), chatIDs) // Request metrics request := &requests.CommunityMetricsRequest{ - CommunityID: community.ID(), - Type: requests.CommunityMetricsRequestMessages, - StartTimestamp: 1690372000, - EndTimestamp: 1690373000, // one hour - StepTimestamp: 300, + CommunityID: community.ID(), + Type: requests.CommunityMetricsRequestMessagesCount, + Intervals: []requests.MetricsIntervalRequest{ + requests.MetricsIntervalRequest{ + StartTimestamp: 1690372000, + EndTimestamp: 1690372300, + }, + requests.MetricsIntervalRequest{ + StartTimestamp: 1690372400, + EndTimestamp: 1690372800, + }, + requests.MetricsIntervalRequest{ + StartTimestamp: 1690372900, + EndTimestamp: 1690373000, + }, + }, } resp, err := s.m.CollectCommunityMetrics(request) s.Require().NoError(err) s.Require().NotNil(resp) - // floor(1000 / 300) == 3 - s.Require().Len(resp.Entries, 3) + s.Require().Len(resp.Intervals, 3) + + for i := 0; i < 3; i++ { + s.Require().Equal(resp.Intervals[i].StartTimestamp, request.Intervals[i].StartTimestamp) + s.Require().Equal(resp.Intervals[i].EndTimestamp, request.Intervals[i].EndTimestamp) + } - s.Require().Equal(resp.Entries[1690372300], uint(3)) - // No entries for 1690372600 - s.Require().Equal(resp.Entries[1690372900], uint(2)) - s.Require().Equal(resp.Entries[1690373000], uint(1)) + s.Require().Equal(resp.Intervals[0].Count, 3) + s.Require().Equal(resp.Intervals[1].Count, 2) + s.Require().Equal(resp.Intervals[2].Count, 1) } diff --git a/protocol/persistence_metrics.go b/protocol/persistence_metrics.go index 510e1474307..c4478062095 100644 --- a/protocol/persistence_metrics.go +++ b/protocol/persistence_metrics.go @@ -1,20 +1,28 @@ package protocol import ( - "context" "database/sql" + "fmt" + "strings" ) -func (db sqlitePersistence) fetchMessagesTimestampsForPeriod(tx *sql.Tx, chatID string, startTimestamp uint64, endTimestamp uint64) ([]uint64, error) { - rows, err := tx.Query(` - SELECT whisper_timestamp FROM user_messages - WHERE local_chat_id = ? AND - whisper_timestamp >= ? AND - whisper_timestamp <= ?`, - chatID, - startTimestamp, - endTimestamp, - ) +func querySeveralChats(chatIDs []string) string { + if len(chatIDs) == 0 { + return "" + } + + var conditions []string + for _, chatID := range chatIDs { + conditions = append(conditions, fmt.Sprintf("local_chat_id = '%s'", chatID)) + } + return fmt.Sprintf("(%s) AND", strings.Join(conditions, " OR ")) +} + +func (db sqlitePersistence) SelectMessagesTimestampsForChatsByPeriod(chatIDs []string, startTimestamp uint64, endTimestamp uint64) ([]uint64, error) { + query := fmt.Sprintf("SELECT whisper_timestamp FROM user_messages WHERE %s whisper_timestamp >= ? AND whisper_timestamp <= ?", + querySeveralChats(chatIDs)) + + rows, err := db.db.Query(query, startTimestamp, endTimestamp) if err != nil { return []uint64{}, err } @@ -33,44 +41,17 @@ func (db sqlitePersistence) fetchMessagesTimestampsForPeriod(tx *sql.Tx, chatID return timestamps, nil } -func (db sqlitePersistence) FetchMessageTimestampsForChatByPeriod(chatID string, startTimestamp uint64, endTimestamp uint64) ([]uint64, error) { - tx, err := db.db.BeginTx(context.Background(), &sql.TxOptions{}) - if err != nil { - return []uint64{}, err - } - defer func() { - if err == nil { - err = tx.Commit() - return - } - // don't shadow original error - _ = tx.Rollback() - }() +func (db sqlitePersistence) SelectMessagesCountForChatsByPeriod(chatIDs []string, startTimestamp uint64, endTimestamp uint64) (int, error) { + query := fmt.Sprintf("SELECT COUNT(*) FROM user_messages WHERE %s whisper_timestamp >= ? AND whisper_timestamp <= ?", + querySeveralChats(chatIDs)) - return db.fetchMessagesTimestampsForPeriod(tx, chatID, startTimestamp, endTimestamp) -} - -func (db sqlitePersistence) FetchMessageTimestampsForChatsByPeriod(chatIDs []string, startTimestamp uint64, endTimestamp uint64) ([]uint64, error) { - tx, err := db.db.BeginTx(context.Background(), &sql.TxOptions{}) - if err != nil { - return []uint64{}, err - } - defer func() { - if err == nil { - err = tx.Commit() - return + var count int + if err := db.db.QueryRow(query, startTimestamp, endTimestamp).Scan(&count); err != nil { + if err == sql.ErrNoRows { + return 0, nil } - // don't shadow original error - _ = tx.Rollback() - }() - - var timestamps []uint64 - for _, chatID := range chatIDs { - chatTimestamps, err := db.fetchMessagesTimestampsForPeriod(tx, chatID, startTimestamp, endTimestamp) - if err != nil { - return []uint64{}, err - } - timestamps = append(timestamps, chatTimestamps...) + return 0, err } - return timestamps, nil + + return count, nil } diff --git a/protocol/requests/community_metrics_request.go b/protocol/requests/community_metrics_request.go index d1ceb24b428..b5ca58da31d 100644 --- a/protocol/requests/community_metrics_request.go +++ b/protocol/requests/community_metrics_request.go @@ -7,23 +7,26 @@ import ( ) var ErrNoCommunityId = errors.New("community metrics request has no community id") -var ErrInvalidTimestampInterval = errors.New("community metrics request invalid time interval") -var ErrInvalidTimestampStep = errors.New("community metrics request invalid time step") +var ErrInvalidTimestampIntervals = errors.New("community metrics request invalid time intervals") type CommunityMetricsRequestType uint const ( - CommunityMetricsRequestMessages CommunityMetricsRequestType = iota + 1 + CommunityMetricsRequestMessagesTimestamps CommunityMetricsRequestType = iota + 1 + CommunityMetricsRequestMessagesCount CommunityMetricsRequestType = iota + 1 CommunityMetricsRequestMembers CommunityMetricsRequestControlNodeUptime ) +type MetricsIntervalRequest struct { + StartTimestamp uint64 `json:"startTimestamp"` + EndTimestamp uint64 `json:"endTimestamp"` +} + type CommunityMetricsRequest struct { - CommunityID types.HexBytes `json:"communityId"` - Type CommunityMetricsRequestType `json:"type"` - StartTimestamp uint64 `json:"startTimestamp"` - EndTimestamp uint64 `json:"endTimestamp"` - StepTimestamp uint64 `json:"stepTimestamp"` + CommunityID types.HexBytes `json:"communityId"` + Type CommunityMetricsRequestType `json:"type"` + Intervals []MetricsIntervalRequest `json:"intervals"` } func (r *CommunityMetricsRequest) Validate() error { @@ -31,12 +34,11 @@ func (r *CommunityMetricsRequest) Validate() error { return ErrNoCommunityId } - if r.StartTimestamp == 0 || r.EndTimestamp == 0 || r.StartTimestamp >= r.EndTimestamp { - return ErrInvalidTimestampInterval + for _, interval := range r.Intervals { + if interval.StartTimestamp == 0 || interval.EndTimestamp == 0 || interval.StartTimestamp >= interval.EndTimestamp { + return ErrInvalidTimestampIntervals + } } - if r.StepTimestamp < 1 || r.StepTimestamp > (r.EndTimestamp-r.StartTimestamp) { - return ErrInvalidTimestampStep - } return nil }