
[jvm-packages] Fix #3489: Spark repartitionForData can potentially sh… #3654

Merged — 1 commit merged into dmlc:master on Oct 3, 2018

Conversation

@weitian (Contributor) commented Aug 30, 2018

Ranker training needs grouped data, and the order of LabeledPoints within a group has to be preserved for evaluation.
For distributed training, repartitioning the training data is important to keep the workload balanced. But the RDD repartition here shuffles all data, which breaks the groups and loses the ordering required for ranking.

Randomly splitting the test data out of the training data here also breaks the groups.

The fix is to group the training data as RDD[Array[XGBLabeledPoint]] before repartitioning and splitting, as sketched below.
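A rough sketch of that idea (illustrative only, not the exact code in this PR; the variable names and the 0.8/0.2 ratio are placeholders): once whole groups are the RDD elements, repartitioning and splitting can never cut a group apart or reorder the points inside it.

// groups: RDD[Array[XGBLabeledPoint]], one array per query group (assumed already built)
val repartitioned = groups.repartition(nWorkers)                      // whole groups move together
val Array(train, test) = repartitioned.randomSplit(Array(0.8, 0.2))   // split keeps each group intact
val trainPoints = train.flatMap(group => group)                       // flatten back; intra-group order preserved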

While loading training data into RDD partitions, we cannot control where each partition ends, so the LabeledPoints belonging to one group may be split across two RDD partitions. These chunks are stitched back together by group id.

Another challenge is to conserve JVM heap memory, so we can leave more memory to the XGBoost native library for training. A Scala Iterator over a collection is "view based" rather than "builder based":
it is lazily evaluated and does not load all the data into memory.
So we need to keep all transformation steps lazy (Spark transformations are lazy by default). This is the design idea behind LabeledPointGroupIterator, sketched below.
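A minimal sketch of that lazy-grouping idea (not the PR's actual LabeledPointGroupIterator): wrap the partition iterator and emit one array per consecutive run of equal group ids, so only a single group is materialized at a time and the surrounding mapPartitions stays lazy.

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

// generic run-grouping iterator; for XGBLabeledPoint the key function would be `_.group`
class RunGroupingIterator[T: ClassTag, K](base: Iterator[T], key: T => K) extends Iterator[Array[T]] {
  private val it = base.buffered
  override def hasNext: Boolean = it.hasNext
  override def next(): Array[T] = {
    val first = it.next()
    val buf = ArrayBuffer(first)
    while (it.hasNext && key(it.head) == key(first)) buf += it.next()
    buf.toArray   // only the current group is held in memory
  }
}

// usage sketch: rdd.mapPartitions(it => new RunGroupingIterator(it, (p: XGBLabeledPoint) => p.group))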

For the same reason, duplicating an Iterator here is not very memory efficient, since the gap between the two iterators is cached in memory.
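For illustration (standard Scala library behavior, not code from this PR): Iterator.duplicate returns two independent iterators, but whatever one copy has consumed ahead of the other is buffered until the slower copy catches up.

val (itA, itB) = Iterator.range(0, 1000000).duplicate
itA.foreach(_ => ())   // draining itA first forces all one million elements to be buffered for itB
println(itB.size)      // itB still sees every element, but only because they were kept on the heap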

@hcho3 hcho3 changed the title Fix #3489: [jvm-packages] Spark repartitionForData can potentially sh… [jvm-packages] Fix #3489: Spark repartitionForData can potentially sh… Aug 30, 2018
@CodingCat CodingCat self-requested a review August 30, 2018 21:21
@weitian weitian closed this Aug 30, 2018
@weitian weitian reopened this Aug 30, 2018
@weitian weitian closed this Aug 31, 2018
@weitian weitian reopened this Aug 31, 2018
@weitian (Contributor Author) commented Aug 31, 2018

The build passed the first time, but it failed after I closed and reopened the PR.

@CodingCat (Member) commented:

Something went wrong when downloading a file from GitHub... retriggered the test.

@CodingCat (Member) left a review comment:

I took a first round of review and have some questions about the correctness of the algorithm.

@CodingCat (Member) commented:

Do the new changes lead to some test case getting stuck?

@weitian (Contributor Author) commented Sep 11, 2018

> Do the new changes lead to some test case getting stuck?

Hmm, all tests passed in my local build.

The missing tests start from "distributed training with the specified worker number", and that test does not have grouped data.

Here is my local log:
Tracker started, with env={DMLC_NUM_SERVER=0, DMLC_TRACKER_URI=10.0.2.15, DMLC_TRACKER_PORT=9091, DMLC_NUM_WORKER=3}
[14:54:14] SparsePage::Writer Finished writing to /tmp/1-cache-13075541539431860144/train.row.page
[14:54:14] SparsePageSource: Finished writing to /tmp/1-cache-13075541539431860144/train
[14:54:14] SparsePage::Writer Finished writing to /tmp/1-cache-13075541539431860144/test.row.page
[14:54:14] SparsePageSource: Finished writing to /tmp/1-cache-13075541539431860144/test
[14:54:14] SparsePage::Writer Finished writing to /tmp/1-cache-28775758788172922589/train.row.page
[14:54:14] SparsePageSource: Finished writing to /tmp/1-cache-28775758788172922589/train
[14:54:14] SparsePage::Writer Finished writing to /tmp/1-cache-28775758788172922589/test.row.page
[14:54:14] SparsePageSource: Finished writing to /tmp/1-cache-28775758788172922589/test
[14:54:14] SparsePage::Writer Finished writing to /tmp/1-cache-02848682310356378771/train.row.page
[14:54:14] SparsePageSource: Finished writing to /tmp/1-cache-02848682310356378771/train
[14:54:14] SparsePage::Writer Finished writing to /tmp/1-cache-02848682310356378771/test.row.page
[14:54:14] SparsePageSource: Finished writing to /tmp/1-cache-02848682310356378771/test
[14:54:14] SparsePage::Writer Finished writing to /tmp/1-cache-13075541539431860144/train.col.page
[14:54:14] SparsePage::Writer Finished writing to /tmp/1-cache-02848682310356378771/train.col.page
[14:54:14] SparsePage::Writer Finished writing to /tmp/1-cache-28775758788172922589/train.col.page

  • distributed training with the specified worker number

Compared with the Travis log:
Tracker started, with env={DMLC_NUM_SERVER=0, DMLC_TRACKER_URI=10.20.0.139, DMLC_TRACKER_PORT=9091, DMLC_NUM_WORKER=3}
[23:10:31] SparsePage::Writer Finished writing to /tmp/1-cache-18689073119010624543/train.row.page
[23:10:31] SparsePageSource: Finished writing to /tmp/1-cache-18689073119010624543/train
[23:10:31] SparsePage::Writer Finished writing to /tmp/1-cache-18689073119010624543/test.row.page
[23:10:31] SparsePageSource: Finished writing to /tmp/1-cache-18689073119010624543/test
[23:10:31] SparsePage::Writer Finished writing to /tmp/1-cache-08996467644075568956/train.row.page
[23:10:31] SparsePageSource: Finished writing to /tmp/1-cache-08996467644075568956/train
[23:10:31] SparsePage::Writer Finished writing to /tmp/1-cache-08996467644075568956/test.row.page
[23:10:31] SparsePageSource: Finished writing to /tmp/1-cache-08996467644075568956/test
No output has been received in the last 10m0s, this potentially indicates a stalled build or something wrong with the build itself.

It looks like the cache writing did not finish (with "useExternalMemory = true")?

@weitian weitian closed this Sep 12, 2018
@weitian (Contributor Author) commented Sep 12, 2018

Re-triggering the build.

@weitian weitian reopened this Sep 12, 2018
@weitian (Contributor Author) commented Sep 12, 2018

@CodingCat the reason the Java test hung was that I set "nWorkers = 3" in the test, which exceeded the CPU cores in the testing container. I changed it to "nWorkers = numWorkers".

@weitian weitian closed this Sep 12, 2018
@weitian weitian reopened this Sep 12, 2018
@weitian weitian closed this Sep 13, 2018
@weitian weitian reopened this Sep 13, 2018
@weitian (Contributor Author) commented Sep 13, 2018

@CodingCat I need some help here. Do you have any idea why the Python test failed?

@hcho3 (Collaborator) commented Sep 13, 2018

@weitian I restarted the test. It had failed due to a random out-of-memory error.

@weitian weitian closed this Sep 14, 2018
@weitian weitian reopened this Sep 14, 2018
@weitian (Contributor Author) commented Sep 14, 2018

@hcho3 the pre-merge build still failed. Any idea?

@hcho3 (Collaborator) commented Sep 14, 2018

@weitian Tests have been flaky lately. I'll need to take a look.

@weitian weitian closed this Sep 14, 2018
@weitian weitian reopened this Sep 14, 2018
@hcho3 (Collaborator) commented Sep 14, 2018

Why did you restart the tests? Jenkins was still running

@weitian weitian closed this Sep 17, 2018
@weitian weitian reopened this Sep 17, 2018
@weitian (Contributor Author) commented Sep 18, 2018

@hcho3 @CodingCat any comments?

@hcho3 (Collaborator) commented Sep 21, 2018

@weitian I have reset the Jenkins environment to get rid of residual files that were causing conflicts.

@CodingCat Does it look good to you? Should we merge it once tests are fixed?

@CodingCat (Member) commented:

I haven't had the bandwidth to take a second round on this; maybe @yanboliang can help with reviewing.

@CodingCat (Member) commented:

I took a quick look at the current approach; the assumptions here still raise some concerns for me.

How about we make it as simple as:

  1. if it's a ranking training application, we just do not reshuffle the input RDD but instead start N workers for the user (N being the number of partitions of the input RDD); if N doesn't match the nWorkers set by the user, we can give a WARN log (see the sketch below)

OR

  1. if it's a ranking training application, we just do not reshuffle the input RDD; if the user's configured nWorkers doesn't match the partition count of the input RDD, we stop the application and give the reason to the user.
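A hypothetical sketch of the first option (illustrative only, not part of this PR's diff; isRankingTask, logger, trainingData, and nWorkers are assumed names):

val inputPartitions = trainingData.getNumPartitions
val effectiveWorkers =
  if (isRankingTask && inputPartitions != nWorkers) {
    logger.warn(s"nWorkers ($nWorkers) does not match the input RDD partition count " +
      s"($inputPartitions); using $inputPartitions workers so that group boundaries are preserved")
    inputPartitions
  } else {
    nWorkers
  }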

val edgeGroups = trainingData.mapPartitions(
new LabeledPointGroupIterator(_)).filter(_._1).map(r => (TaskContext.getPartitionId(), r._2))
// the edgeGroups should be small enough to be processed in one partition
val stitchedGroup = edgeGroups.coalesce(1)
@CodingCat (Member) commented on the diff above:

Is this assumption true? What if we have 4 partitions, each with millions of records belonging to a single group?

@weitian (Contributor Author) replied:

Let's do some math here.

Assume we have 100GB of training data and the HDFS block size is 128MB, so we get 800 partitions when loading the data into an RDD.
For each RDD partition there are two edge groups, the first group and the last group, so the total number of edge groups is 800 * 2, which is small enough to be coalesced onto one executor and processed.

What is an edge group? Say a group has 100 LabeledPoints: 30 of them may land in one RDD partition and the other 70 in the next RDD partition. So there are only two edge groups in each partition.

The millions of groups other than the first and last groups in an RDD partition are normal groups, and we do not need to stitch them.

@CodingCat (Member) replied:

How do you guarantee that

  1. the data is loaded as a plain HDFS file, and

  2. the user didn't apply any additional transformation, including repartition, before feeding it in as training data?

@weitian (Contributor Author) replied:

I just used HDFS as an example. Since we only collect two groups from each partition, the total size of the edge groups is relatively small, so this is actually a performance optimization rather than a cause of performance issues.

Or we can use an RDD groupBy, which does the same logic but in a distributed fashion (see the sketch below).
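An illustrative sketch of that groupBy variant (hypothetical names, not the exact code in this PR; edgeGroups is assumed to be the RDD[(partitionId, Array[XGBLabeledPoint])] from the hunk above): key each partial edge group by its group id, group the partials together, and concatenate them in partition order so the original ordering inside the group survives.

val stitchedGroups = edgeGroups
  .map { case (pid, pts) => (pts.head.group, (pid, pts)) }   // key each partial group by its group id
  .groupByKey()
  .map { case (_, partials) =>
    partials.toSeq.sortBy(_._1).flatMap(_._2).toArray        // stitch partials back in partition order
  }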

@CodingCat (Member) replied:

You still didn't answer my previous question: what about a user passing in an RDD containing 4 partitions, each of which contains only 1/2 group of data?

Will all groups be collected into a single partition?

@weitian (Contributor Author) replied:

Yes, all the data is collected into one partition, which then produces a list of groups in that single partition.

@hcho3 (Collaborator) commented Sep 23, 2018

@weitian Please ignore the failed test continuous-integration/jenkins/pr-head. (This was due to a misconfiguration of the Jenkins CI server.) Only continuous-integration/jenkins/pr-merge is relevant.

@weitian (Contributor Author) commented Sep 23, 2018

@CodingCat Pushing the group-partition logic onto the user may not be a good idea.

Sharing my own experience: when I started to use the xgboost-spark package, I just called the XGBoostRegressor API and there was no error or exception. I only found this bug when I saw metrics that were very different from the results of a model trained on a single node.

Since evenly partitioned data is critical for parallel training, the RDD has to be repartitioned, and the user would have to deal with groups crossing partition borders, which is very tricky.

Another issue is that, even if the data is pre-partitioned, the current logic for splitting the training and testing sets still messes up the groups.

I know this PR has a lot of changes, but logically all the fixes in it are connected, and it is hard to break them into multiple PRs.

@CodingCat (Member) commented:

@weitian it's not that this PR has many changes; it's that some assumptions this PR is based on either produce potentially incorrect results (the previous group-dropping version) or a performance bottleneck (the current coalesce(1) version).

My suggestion is:

  1. disable the training/test split for the ranking task

  2. validate whether the input RDD follows the constraints for the ranking task (see the sketch after this list)

2.1) if it does not, by default, stop the application and give the reason

2.2) the user can switch to the group-dropping version if they know what they are doing
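A hypothetical illustration of item 2 (not code from this PR; trainingData and the exact check are assumptions, and a group split across two partitions would still need the edge-group handling discussed above): verify that within each partition the points of a group are stored consecutively, and fail fast with an explanation otherwise.

import scala.collection.mutable

// true only if, in every partition, a group id never re-appears after a different group was seen
val eachPartitionGrouped = trainingData.mapPartitions { it =>
  val seen = mutable.Set.empty[Int]
  var prev = Int.MinValue
  var ok = true
  it.foreach { p =>
    if (p.group != prev) {
      if (seen.contains(p.group)) ok = false   // group re-appears after a gap: partition is not grouped
      seen += p.group
      prev = p.group
    }
  }
  Iterator.single(ok)
}.fold(true)(_ && _)

if (!eachPartitionGrouped) {
  throw new IllegalArgumentException(
    "for ranking, the points of a group must be stored consecutively within each partition")
}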

@weitian (Contributor Author) commented Sep 24, 2018

@CodingCat

  1. I changed coalesce(1) to use a standard RDD groupBy. There should be no performance issue.
  2. Disabling the training/test split gives the user a bad experience, since the metrics on the test split are needed.
  3. It is very hard to explain to the user that there are so many restrictions if they want to train a ranker, and the user will be confused about how to prepare the training data.
  4. Without groups, the user cannot do pairwise ranking.

@hcho3 (Collaborator) commented Sep 24, 2018

I have filed issue #3720 to address the flaky tests once and for all.

@CodingCat (Member) commented:

Generally LGTM, a few more comments though.

assert(groups(15)._2(0).group == 16)
}

test("distributed training with group data") {
@CodingCat (Member) commented on the diff above:

Is there any sensible but small dataset we can use for the ranking unit tests? I think the current unit tests regarding ranking only check whether the training can finish.

@weitian (Contributor Author) replied on Oct 1, 2018:

This PR is to make sure that, after the training data is repartitioned and split, the training groups are still correct and the order of LabeledPoints within a group is kept.

My changed unit test has good coverage of the corner cases: test("repartitionForTrainingGroup with group data").

Yes, it only tests whether training can finish. I cannot use my training data from Apple, so let's see if we can find a sample dataset for ranking.

…uffle all data and lose ordering required for ranking objectives
@weitian (Contributor Author) commented Oct 1, 2018

@hcho3 there are still some failures in the AppVeyor build. Any idea?

@weitian weitian closed this Oct 2, 2018
@weitian weitian reopened this Oct 2, 2018
@hcho3 (Collaborator) commented Oct 2, 2018

@weitian That was a weird failure. I think it was due to a spurious download failure for R packages.

@CodingCat (Member) commented:

Thanks, merged!

@CodingCat CodingCat merged commit efc4f85 into dmlc:master Oct 3, 2018
alois-bissuel pushed a commit to criteo-forks/xgboost that referenced this pull request Dec 4, 2018
…y shuffle all data and lose ordering required for ranking objectives (dmlc#3654)
@lock lock bot locked as resolved and limited conversation to collaborators Jan 1, 2019