
Initial support for Data Maintenance #9

Merged

merged 21 commits into from Jun 23, 2022

Conversation

wjxiz1992
Collaborator

@wjxiz1992 wjxiz1992 commented May 26, 2022

Signed-off-by: Allen Xu allxu@nvidia.com

This PR adds initial support for part of the Data Maintenance work.

Data Maintenance requires ACID operations such as INSERT and DELETE, and Spark currently doesn't provide native support for them, so we choose Iceberg as the data source metadata manager.

With this change, we will:

  • support generating UPDATE data for Data Maintenance
  • support saving the converted data directly into Iceberg tables
  • run Data Maintenance functions:
    • INSERT
    • DELETE

fix: #4, #8
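
As a rough illustration of what the Iceberg-backed maintenance operations look like from PySpark (a minimal sketch, not the exact code in this PR; it assumes a session already configured with the Iceberg runtime and a Hive-backed spark_catalog):

from pyspark.sql import SparkSession

# Minimal sketch: these are the standard Iceberg-on-Spark session settings,
# not necessarily the exact configuration used by this PR.
spark = (SparkSession.builder
         .appName("nds-data-maintenance-sketch")
         .config("spark.sql.extensions",
                 "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
         .config("spark.sql.catalog.spark_catalog",
                 "org.apache.iceberg.spark.SparkSessionCatalog")
         .config("spark.sql.catalog.spark_catalog.type", "hive")
         .getOrCreate())

# INSERT: append the generated maintenance data into an existing Iceberg table.
spark.sql("INSERT INTO store_returns SELECT * FROM srv")

# DELETE: row-level deletes are what plain Parquet/ORC directories cannot do natively.
spark.sql("DELETE FROM catalog_returns WHERE cr_order_number = 12345")  # 12345 is an illustrative value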

wjxiz1992 added 2 commits May 26, 2022 18:02
Signed-off-by: Allen Xu <allxu@nvidia.com>
Signed-off-by: Allen Xu <allxu@nvidia.com>
@wjxiz1992 wjxiz1992 self-assigned this May 26, 2022
@wjxiz1992
Collaborator Author

wjxiz1992 commented May 26, 2022

When running the LF_SR function, we see

"Cannot write incompatible data to table 'spark_catalog.default.store_returns':\n- Cannot safely cast 'sr_ticket_number': string to int"

Cause:
in the "store_returns" table, we define (official TPCDS_TOOLKIT/tools/tpcds.sql, old version: https://github.com/gregrahn/tpcds-kit/blob/master/tools/tpcds.sql#L332):

 StructField("sr_ticket_number", IntegerType(), nullable=False),

but in the DM data "s_store_returns" definition (official TPCDS_TOOLKIT/tools/tpcds_source.sql old version: https://github.com/gregrahn/tpcds-kit/blob/master/tools/tpcds_source.sql#L356):

StructField("sret_ticket_number", CharType(20)),

Both definitions come from the TPC-DS Spec.
The insert SQL is:

CREATE view srv as
SELECT d_date_sk sr_returned_date_sk
 ,t_time_sk sr_return_time_sk
 ,i_item_sk sr_item_sk
 ,c_customer_sk sr_customer_sk
 ,c_current_cdemo_sk sr_cdemo_sk
 ,c_current_hdemo_sk sr_hdemo_sk
 ,c_current_addr_sk sr_addr_sk
 ,s_store_sk sr_store_sk
 ,r_reason_sk sr_reason_sk
 ,sret_ticket_number sr_ticket_number
 ,sret_return_qty sr_return_quantity
 ,sret_return_amt sr_return_amt
 ,sret_return_tax sr_return_tax
 ,sret_return_amt + sret_return_tax sr_return_amt_inc_tax
 ,sret_return_fee sr_fee
 ,sret_return_ship_cost sr_return_ship_cost
 ,sret_refunded_cash sr_refunded_cash
 ,sret_reversed_charge sr_reversed_charge
 ,sret_store_credit sr_store_credit
 ,sret_return_amt+sret_return_tax+sret_return_fee
 -sret_refunded_cash-sret_reversed_charge-sret_store_credit sr_net_loss
FROM s_store_returns 
LEFT OUTER JOIN date_dim 
 ON (cast(sret_return_date as date) = d_date)
LEFT OUTER JOIN time_dim 
 ON (( cast(substr(sret_return_time,1,2) AS integer)*3600
 +cast(substr(sret_return_time,4,2) AS integer)*60
 +cast(substr(sret_return_time,7,2) AS integer)) = t_time)
LEFT OUTER JOIN item ON (sret_item_id = i_item_id)
LEFT OUTER JOIN customer ON (sret_customer_id = c_customer_id)
LEFT OUTER JOIN store ON (sret_store_id = s_store_id)
LEFT OUTER JOIN reason ON (sret_reason_id = r_reason_id)
WHERE i_rec_end_date IS NULL
 AND s_rec_end_date IS NULL;
------------------------------------------------
insert into store_returns (select * from srv);

According to the Spec:

2.2.2.1 Each column employs one of the following datatypes: a) Identifier means that the column shall be able to hold any key value generated for that column.

2.2.2.3 The implementation chosen by the test sponsor for a particular datatype definition shall be applied consistently to all the instances of that datatype definition in the schema, except for identifier columns, whose datatype may be selected to satisfy database scaling requirements.

I can fix this by changing
StructField("sret_ticket_number", CharType(20)) to StructField("sret_ticket_number", IntegerType()).

@@ -21,5 +21,13 @@ export SPARK_CONF=("--master" "yarn"
 "--num-executors" "8"
 "--executor-memory" "40G"
 "--executor-cores" "12"
-"--conf" "spark.task.cpus=1")
+"--conf" "spark.task.cpus=1"
+"--packages" "org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.13.1"
Collaborator

nit: the first character seems not aligned.

@@ -21,5 +21,13 @@ export SPARK_CONF=("--master" "yarn"
 "--num-executors" "8"
 "--executor-memory" "40G"
 "--executor-cores" "12"
-"--conf" "spark.task.cpus=1")
+"--conf" "spark.task.cpus=1"
+"--packages" "org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.13.1"
Collaborator

Shall we consider the case of not using Iceberg?
In that case, the Iceberg parameters should be enabled by some option.
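
One possible shape for that (a hypothetical sketch; the flag name and choices are illustrative, not this repo's actual interface) is a format option that only pulls in the Iceberg package and configs when requested:

import argparse

# Hypothetical sketch: "--output-format" is an illustrative flag name, not the PR's CLI.
parser = argparse.ArgumentParser(description="NDS transcode (sketch)")
parser.add_argument("--output-format",
                    choices=["parquet", "orc", "iceberg"],
                    default="parquet",
                    help="target format; Iceberg settings are applied only when selected")
args = parser.parse_args()

extra_spark_args = []
if args.output_format == "iceberg":
    # Only add the Iceberg runtime package (coordinates from the template above) when needed.
    extra_spark_args += [
        "--packages", "org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.13.1",
        "--conf", "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    ]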

Collaborator Author

@wjxiz1992 wjxiz1992 May 27, 2022

@jlowe What's our plan/strategy for doing the WHOLE NDS test? Are we going to do it all based on Iceberg? For #8 I've made the transcode step save the data ONLY to Iceberg; should we keep our old way of saving it just to a folder? The old way may be friendlier to users who don't know Iceberg, but eventually, if they want to perform the whole NDS test including Data Maintenance, they will have to come back and do the Iceberg writing again... Any suggestions?

Member


To do the entire NDS suite one would need to use a format that supports the entire set of operations required by the entire suite. That would mean using Iceberg, Delta Lake, or some other format that allows incremental table update.

I've made the transcode step to save the data ONLY to Iceberg

That's not desired. We want to support transcoding to a bunch of different formats, because we're not always going to run the entire suite. We get a lot of useful information from running the significant portion of NDS that works on raw Parquet and ORC files, and we do not want to lose the ability to set up those benchmarks. The transcode needs to be flexible, ideally allowing output to every major format that we want to bench. For now that definitely includes raw Parquet and ORC along with Iceberg (and the ability to control settings for these formats such as compression codec, probably via separate configs spec'd either inline or sideband in the Spark instance to use).
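
A format-flexible write step along those lines might look roughly like this (an illustrative sketch, not this PR's implementation; df, table_name, output_prefix, output_format, and codec are assumed to come from the caller or CLI):

def write_table(df, table_name, output_prefix, output_format, codec="snappy"):
    # Sketch of a transcode writer that can target raw Parquet/ORC directories or an Iceberg table.
    if output_format == "parquet":
        df.write.mode("overwrite").option("compression", codec) \
          .parquet(f"{output_prefix}/{table_name}")
    elif output_format == "orc":
        df.write.mode("overwrite").option("compression", codec) \
          .orc(f"{output_prefix}/{table_name}")
    elif output_format == "iceberg":
        # writeTo/createOrReplace requires an Iceberg-enabled catalog on the Spark session.
        df.writeTo(f"spark_catalog.default.{table_name}") \
          .tableProperty("write.parquet.compression-codec", codec) \
          .createOrReplace()
    else:
        raise ValueError(f"unsupported output format: {output_format}")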

wjxiz1992 added 4 commits May 27, 2022 09:14
Signed-off-by: Allen Xu <allxu@nvidia.com>
Signed-off-by: Allen Xu <allxu@nvidia.com>
Signed-off-by: Allen Xu <allxu@nvidia.com>
Signed-off-by: Allen Xu <allxu@nvidia.com>
@wjxiz1992
Collaborator Author

wjxiz1992 commented May 31, 2022

For DELETE functions, e.g. DF_CS.sql:

delete from catalog_returns 
   where cr_order_number in 
      (select cs_order_number from 
             catalog_sales, date_dim 
                 where cs_sold_date_sk=d_date_sk and d_date between 'DATE1' and 'DATE2');

results in

 java.lang.ClassCastException: org.apache.spark.sql.catalyst.plans.logical.Project cannot be cast to org.apache.spark.sql.execution.SparkPlan
	at scala.collection.immutable.List.map(List.scala:293)
	at org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:75)
	at org.apache.spark.sql.execution.SparkPlanInfo$.$anonfun$fromSparkPlan$3(SparkPlanInfo.scala:75)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)

I can break the SQL into

# Run the subquery separately and collect the matching order numbers to the driver.
cs_order_number = spark.sql("""
          select cs_order_number from catalog_sales, date_dim where cs_sold_date_sk=d_date_sk and d_date between '2000-05-20' and '2000-05-21'
          """).collect()

# Pull the values out of the collected Row objects.
cs_order_number_lst = [x['cs_order_number'] for x in cs_order_number]

# Inline the collected values into the DELETE's IN list so no subquery is planned.
spark.sql(f"""
          delete from catalog_returns
          where cr_order_number in ({str(cs_order_number_lst)[1:-1]});
          """)

And this works.

wjxiz1992 added 2 commits June 2, 2022 16:01
Signed-off-by: Allen Xu <allxu@nvidia.com>
Signed-off-by: Allen Xu <allxu@nvidia.com>
@wjxiz1992 wjxiz1992 changed the title from "WIP: init support for Data Maintenance" to "Initial support for Data Maintenance" on Jun 2, 2022
Signed-off-by: Allen Xu <allxu@nvidia.com>
@jlowe
Member

jlowe commented Jun 8, 2022

java.lang.ClassCastException: org.apache.spark.sql.catalyst.plans.logical.Project cannot be cast to org.apache.spark.sql.execution.SparkPlan

This seems like potentially a bug in Spark 3.2, especially since the error is such a low-level class cast error. I was able to get the same query to plan without an error on Spark 3.1.2.

@wjxiz1992
Collaborator Author

One issue for our use case: we want to use Spark 3.2.1 as our NDS 2.0 benchmark environment due to performance considerations, especially for query77 (there's a huge performance drop in 3.1.2).
I'm going to file an issue in the Spark Jira.

@wjxiz1992
Collaborator Author

wjxiz1992 commented Jun 13, 2022

Filed a Spark issue: https://issues.apache.org/jira/browse/SPARK-39454

Update: it is said that this will be fixed in Spark 3.3.0 and Spark 3.2.2.

@wjxiz1992
Collaborator Author

wjxiz1992 commented Jun 14, 2022

I verified on Spark 3.2.2, and it works.

Signed-off-by: Allen Xu <allxu@nvidia.com>
@wjxiz1992 wjxiz1992 requested a review from jlowe June 14, 2022 09:26
Signed-off-by: Allen Xu <allxu@nvidia.com>
Signed-off-by: Allen Xu <allxu@nvidia.com>
Signed-off-by: Allen Xu <allxu@nvidia.com>
Member

@jlowe jlowe left a comment

This is close. Minor comments on documentation and waiting to hear back on copyright/license question.

wjxiz1992 and others added 2 commits June 16, 2022 23:05
Co-authored-by: Jason Lowe <jlowe@nvidia.com>
Co-authored-by: Jason Lowe <jlowe@nvidia.com>
Signed-off-by: Allen Xu <allxu@nvidia.com>
Signed-off-by: Allen Xu <allxu@nvidia.com>
Signed-off-by: Allen Xu <allxu@nvidia.com>
@wjxiz1992 wjxiz1992 requested a review from jlowe June 22, 2022 01:55
Signed-off-by: Allen Xu <allxu@nvidia.com>

Successfully merging this pull request may close these issues.

Support Data Maintenance Benchmark