diff --git a/backend/plugins/github_graphql/tasks/job_collector.go b/backend/plugins/github_graphql/tasks/job_collector.go index 73c914a8b74..af25d589790 100644 --- a/backend/plugins/github_graphql/tasks/job_collector.go +++ b/backend/plugins/github_graphql/tasks/job_collector.go @@ -51,7 +51,11 @@ type GraphqlQueryCheckSuite struct { // equal to Job in rest CheckRuns struct { TotalCount int - Nodes []struct { + PageInfo struct { + EndCursor string `graphql:"endCursor"` + HasNextPage bool `graphql:"hasNextPage"` + } + Nodes []struct { Id string Name string DetailsUrl string @@ -79,7 +83,7 @@ type GraphqlQueryCheckSuite struct { } } `graphql:"steps(first: 50)"` } - } `graphql:"checkRuns(first: 50)"` + } `graphql:"checkRuns(first: $pageSize, after: $skipCursor)"` } `graphql:"... on CheckSuite"` } @@ -95,7 +99,45 @@ var CollectJobsMeta = plugin.SubTaskMeta{ DomainTypes: []string{plugin.DOMAIN_TYPE_CICD}, } -var _ plugin.SubTaskEntryPoint = CollectAccount +var _ plugin.SubTaskEntryPoint = CollectJobs + +func getPageInfo(query interface{}, args *helper.GraphqlCollectorArgs) (*helper.GraphqlQueryPageInfo, error) { + queryWrapper := query.(*GraphqlQueryCheckRunWrapper) + hasNextPage := false + endCursor := "" + for _, node := range queryWrapper.Node { + if node.CheckSuite.CheckRuns.PageInfo.HasNextPage { + hasNextPage = true + endCursor = node.CheckSuite.CheckRuns.PageInfo.EndCursor + break + } + } + return &helper.GraphqlQueryPageInfo{ + EndCursor: endCursor, + HasNextPage: hasNextPage, + }, nil +} + +func buildQuery(reqData *helper.GraphqlRequestData) (interface{}, map[string]interface{}, error) { + query := &GraphqlQueryCheckRunWrapper{} + if reqData == nil { + return query, map[string]interface{}{}, nil + } + workflowRuns := reqData.Input.([]interface{}) + checkSuiteIds := []map[string]interface{}{} + for _, iWorkflowRuns := range workflowRuns { + workflowRun := iWorkflowRuns.(*SimpleWorkflowRun) + checkSuiteIds = append(checkSuiteIds, map[string]interface{}{ + `id`: graphql.ID(workflowRun.CheckSuiteNodeID), + }) + } + variables := map[string]interface{}{ + "node": checkSuiteIds, + "pageSize": graphql.Int(reqData.Pager.Size), + "skipCursor": (*graphql.String)(reqData.Pager.SkipCursor), + } + return query, variables, nil +} func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error { db := taskCtx.GetDal() @@ -137,26 +179,10 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error { err = apiCollector.InitGraphQLCollector(helper.GraphqlCollectorArgs{ Input: iterator, - InputStep: 20, + InputStep: 10, GraphqlClient: data.GraphqlClient, - BuildQuery: func(reqData *helper.GraphqlRequestData) (interface{}, map[string]interface{}, error) { - query := &GraphqlQueryCheckRunWrapper{} - if reqData == nil { - return query, map[string]interface{}{}, nil - } - workflowRuns := reqData.Input.([]interface{}) - checkSuiteIds := []map[string]interface{}{} - for _, iWorkflowRuns := range workflowRuns { - workflowRun := iWorkflowRuns.(*SimpleWorkflowRun) - checkSuiteIds = append(checkSuiteIds, map[string]interface{}{ - `id`: graphql.ID(workflowRun.CheckSuiteNodeID), - }) - } - variables := map[string]interface{}{ - "node": checkSuiteIds, - } - return query, variables, nil - }, + BuildQuery: buildQuery, + GetPageInfo: getPageInfo, ResponseParser: func(queryWrapper any) (messages []json.RawMessage, err errors.Error) { query := queryWrapper.(*GraphqlQueryCheckRunWrapper) for _, node := range query.Node { @@ -168,12 +194,13 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error { if apiCollector.GetSince() != nil && !apiCollector.GetSince().Before(*updatedAt) { return messages, helper.ErrFinishCollect } - messages = append(messages, errors.Must1(json.Marshal(node))) + messages = append(messages, errors.Must1(json.Marshal(checkRun))) } } return }, IgnoreQueryErrors: true, + PageSize: 20, }) if err != nil { return err