Skip to content

Imperative Squall interface

Aleksandar Vitorovic edited this page Mar 27, 2016 · 6 revisions

Squall translates SQL queries to query plans. You can also write Squall plans by hand. Squall already contains several manually-specified query plans: TPC-H Q3, Q4, Q5, Q7, Q8, Q9 and Q10.

Imperative Squall interface: Local Mode

As usual, testing code in local mode is way better than in a cluster environment. Namely, in local mode all the errors go to the standard output/error, whereas in cluster mode one has to search for errors across many nodes. The same holds for presenting the final result.

Prerequisite: We assume that you already set up Squall such that it works with SQL queries in Local Mode.

We will demonstrate running Squall on manually-specified query plans on hyracks (in source code: squall-examples/squall-java-examples/src/ch/epfl/data/squall/examples/imperative/shj/HyracksPlan.java). We will use test/squall_plan_runner/confs/local/0_01G_hyracks.

You can run Squall in Local Mode:

    cd bin
    ./squall_local.sh PLAN_RUNNER ..test/squall_plan_runner/confs/local/0_01G_hyracks

Now we will talk about an output of a Squall/Storm run. In Local mode, all the output goes to console. First, Storm produces some output. You can safely ignore messages of the form Task 0_01G_hyracks-1-1333023576:1 timed out. This kind of messages always occurs when a task is started.

Then you will see something like:

    ... TopologyKiller: Received EOF message from: 2
    ... TopologyKiller: 1 remaining
    ... TopologyKiller: Received EOF message from: 3
    ... TopologyKiller: 0 remaining
    ... TopologyKiller: Received EOF from all spouts. Killing cluster...

This shows the information about Spouts that have finished processing their input. When all Spouts are done, Squall/Storm produces the final result. In this case, it is:

    All the tasks of the last component in total received 15000 tuples.
    FURNITURE = 3007 
    AUTOMOBILE = 2979 
    MACHINERY = 2536 
    BUILDING = 3706 
    HOUSEHOLD = 2772

Iteration refers to the number of tuples the last component (named CUSTOMER_ORDERS) received.

In test/squall_plan_runner/confs you can find more examples of config files. In order to run Squall in Local Mode with some other config file, you must set DIP_DATA_PATH for that config file such that it points to a database of the appropriate size on your machine and then run the following commands:

    cd bin
    ./squall_local.sh PLAN_RUNNER $CONFIG_FILE_PATH

where $CONFIG_FILE_PATH is an absolute or relative path to a config file.

The corresponding SQL for hyracks query is as follows:

    SELECT C_MKTSEGMENT, COUNT(O_ORDERKEY)
    FROM CUSTOMER join ORDERS on C_CUSTKEY = O_CUSTKEY
    GROUP BY C_MKTSEGMENT

Alt text

For the SQL query we showed above, the corresponding query plan is presented below (this is actually a part of squall-examples/squall-java-examples/src/ch/epfl/data/squall/examples/imperative/shj/HyracksPlan.java class):

Component customer = new DataSourceComponent("customer", conf)
                            .add(new ProjectOperator(0, 6));
Component orders = new DataSourceComponent("orders", conf)
                            .add(new ProjectOperator(1));
Component custOrders = new EquiJoinComponent(customer, 0, orders, 0)
                            .add(new AggregateCountOperator(conf).setGroupByColumns(1));

You need not to specify schema if you are writing query plans manually, because the system refer to a column using an absolute position (index) within a tuple.

Note that we do not have to perform any projections on the CUSTOMER_ORDERS join component. The output tuple does not contain CUSTKEY from the right parent, since it is already included (with the same value) from the left parent. A hash from a parent relation (CUSTOMER, ORDERS) refers to a position(s) in a tuple after projection is performed. The hash denotes the columns from a tuple that are join keys in the join component. You can find more query plan examples in the package plan_runner.query_plans. Their corresponding config files can be found in test/squall_plan_runner/confs.

Now we discuss parameters inside a config file. Once more, we will use 0.01G_hyracks_serial for illustration:

    DIP_DISTRIBUTED false
    DIP_QUERY_NAME hyracks
    DIP_TOPOLOGY_NAME_PREFIX username
    DIP_NUM_ACKERS 0

    DIP_DATA_PATH ../test/data/tpch/0.01G/
    DIP_RESULT_ROOT ../test/results/

    CUSTOMER_PAR 1
    ORDERS_PAR 1

    CUSTOMER_ORDERS_PAR 1

    #below are unlikely to change
    DIP_EXTENSION .tbl
    DIP_READ_SPLIT_DELIMITER \|
    DIP_GLOBAL_ADD_DELIMITER |
    DIP_GLOBAL_SPLIT_DELIMITER \|

    DIP_KILL_AT_THE_END true

    # Storage manager parameters
    # Storage directory for local runs
    STORAGE_LOCAL_DIR /tmp/ramdisk
    # Storage directory for cluster runs
    STORAGE_DIP_DIR /export/home/squalldata/storage 
    STORAGE_COLD_START true
    MEMORY_SIZE_MB 4096

DIP_DISTRIBUTED must be false to execute the query plan in Local mode. DIP_QUERY_NAME must be case-insensitive equal to a query plan from plan_runner.query_plans package, without the “Plan” suffix at the end. For example, here we have a query named “hyracks” which targets HyracksPlan class. Topology name is built by concatenation of DIP_TOPOLOGY_NAME_PREFIX and config file name. DIP_TOPOLOGY_NAME_PREFIX is there to distinguish different users, but it must be set only in Cluster Mode.

DIP_NUM_ACKERS represents the number of nodes used for ensuring that each tuple is fully propagated, so the final result and the full execution time can be acquired. If a positive value is set, we ack each and every tuple. If you set this parameter to 0, each Spout sends a special message as the last tuple. For more information about implications of this parameter, please consult Squall query plans vs Storm topologies, section To ack or not to ack?. DIP_DATA_PATH points to a location of your database.

The parallelism of a component is denoted through COMPONENT_NAME_PAR. Note the convention for naming an EquiJoinComponent, it should have the following form: LEFTPARENT_RIGHTPARENT_PAR. Due to the constrained main memory, you cannot run arbitrary large database with small component parallelism. For information on detecting this behavior, please consult Squall query plans vs Storm topologies, section How to know we run out of memory?.

Now we explain the parameters you most likely would not need to change; DIP_EXTENSION refers to file extension in your database. In our case, the names of the database files were customer.tbl, orders.tbl, etc. DIP_READ_SPLIT_DELIMITER is a regular expression used for delimiting columns of a tuple in a database file. DIP_GLOBAL_ADD_DELIMITER and DIP_GLOBAL_SPLIT_DELIMITER are used in Squall internally for serializing and deserializing tuples between different components. DIP_KILL_AT_THE_END assures your topology is killed after the final result is written to a file. If you set this to false, your topology will execute forever, consuming resources that could be used by other topologies executing at the same time.

Imperative Squall interface: Cluster Mode

Other than for reading logs, you do not need to ssh to any of the cluster ma- chines.

Prerequisite: We assume that you already set up Squall such that it works with SQL queries in Cluster Mode.

We will illustrate the procedure with test/squall_plan_runner/confs/cluster/1G_hyracks config file. You have to set DIP_DATA_PATH such that it points to a 1-scalling factor TPC-H database on the cluster.

You can run Squall with the following commands:

    cd bin
    ./squall_cluster.sh PLAN_RUNNER ../test/squall_plan_runner/confs/cluster/1G_hyracks

You can run Squall in Cluster Mode with some other config file:

    cd bin
    ./squall_cluster.sh PLAN_RUNNER $CONFIG_FILE_PATH

where CONFIG_FILE_PATH is a path to a config file. You can also write config files from scratch. Keep in mind that for any config file you are going to use which use different database size, you have to specify DIP_DATA_PATH properly.

The config file test/squall_plan_runner/confs/cluster/1G_hyracks) is presented here:

    DIP_DISTRIBUTED true
    DIP_QUERY_NAME hyracks
    DIP_TOPOLOGY_NAME_PREFIX username

    # the following two are optional, by default they use topology.workers and topology.ackers from storm.yaml
    #DIP_NUM_WORKERS 176
    #DIP_NUM_ACKERS 0

    DIP_DATA_PATH /export/home/squalldata/tpchdb/1G/

    CUSTOMER_PAR 8
    ORDERS_PAR 8

    CUSTOMER_ORDERS_PAR 8

    #below are unlikely to change
    DIP_EXTENSION .tbl
    DIP_READ_SPLIT_DELIMITER \|
    DIP_GLOBAL_ADD_DELIMITER |
    DIP_GLOBAL_SPLIT_DELIMITER \|

    DIP_KILL_AT_THE_END true

    # Storage manager parameters
    # Storage directory for local runs
    STORAGE_LOCAL_DIR /tmp/ramdisk
    # Storage directory for cluster runs
    STORAGE_DIP_DIR /export/home/squalldata/storage 
    STORAGE_COLD_START true
    MEMORY_SIZE_MB 4096

Now we explain the parameters which differs from their counterparts in Local Mode. DIP_DISTRIBUTED is set to true. The DIP_NUM_PARALLELISM reflects the maximum number of physical nodes you want to use. You cannot use more than what the cluster offers. If your topology requires more nodes than are available through this parameter, some of the tasks will be collocated in a single node. More information about that can be found in Squall query plans vs Storm topologies, section Assigning Storm components to Nodes.

Due to the constrained main memory, you cannot run arbitrary large database with small component parallelism. For information on detecting this behavior, please consult Squall query plans vs Storm topologies, section How to know we run out of memory?.

In Cluster Mode, DIP_NUM_ACKERS is commented out, because the default value (0) is set in storm.yaml. For more information about implications of this parameter, please consult Squall query plans vs Storm topologies, section To ack or not to ack?. DIP_NUM_WORKERS represents number of worker processes across the cluster. We didn't say number of worker nodes, but processes, because there might be multiple Storm processes on each node. It directly corresponds to topology.workers from storm.yaml, and it takes its default value from there.

When you run Squall in Cluster Mode, it will return as soon as the topology is submitted to the cluster. You can monitor the execution of your topology at http://STORM_UI_SERVER:8080. Here you can find various information such as information about active topologies and the number of tuples sent between Spouts and Bolts. Unfortunately, you can monitor your topology only in Cluster Mode.

Your topology will be killed after the final result is produced. You can also kill it explicitly:

    storm kill myTopologyName

Now we are using 1GB database, so the correct result is:

    The result for topology `username_1G_hyracks`
    Component COUNTAGG:
    Iteration 1500000:
    FURNITURE = 299461
    BUILDING = 303959
    MACHINERY = 298980
    HOUSEHOLD = 300147
    AUTOMOBILE = 297453