Tuesday, August 15, 2017

SparkSQL


SparkSQL lets people query their data in an SQL-like language while taking advantage of the speed of Spark, a fast, general engine for data processing that runs over Hadoop. I wanted to test this out on a dataset I found from Walmart with their stores’ weekly sales numbers. I put the CSV into our cluster’s HDFS (in /var/walmart), making it accessible to all Flux Hadoop users.


Spark has an SQLContext available by default as “sqlContext”. SQLContext is a class that allows a user to run SQL-like queries in Spark. To use it, call its sql method from the scala> prompt, and the string passed in parentheses will be interpreted as an SQL-style query.  First, I start up Spark in the normal way:


spark-shell --master yarn-client --queue <your_queue> --num-executors 35 --executor-cores 4 --executor-memory 5g
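To make the size of that request concrete, here is a small worked calculation in Python (not part of the Spark session). It only multiplies out the three flags from the command above. It assumes Spark's default of one CPU core per task, and it ignores the driver and any per-executor memory overhead that YARN adds.

```python
# Resources requested by the spark-shell flags above.
num_executors = 35            # --num-executors
cores_per_executor = 4        # --executor-cores
memory_per_executor_gb = 5    # --executor-memory 5g

# Assumption: one core per task (Spark's default), so each core is one task slot.
total_task_slots = num_executors * cores_per_executor
# Assumption: heap only; YARN's per-executor overhead and the driver are not counted.
total_executor_memory_gb = num_executors * memory_per_executor_gb

print(f"concurrent task slots: {total_task_slots}")
print(f"total executor memory: {total_executor_memory_gb} GB")
```

So this session asks the queue for 140 task slots and 175 GB of executor memory in total.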


Then, I use sqlContext to create a table that describes the Walmart data. The “CREATE EXTERNAL TABLE” call will create a table in the metastore in my Flux home directory.

sqlContext.sql("CREATE EXTERNAL TABLE sales(store INT, dept INT, date STRING, weekly_sales BIGINT, IsHoliday BOOLEAN) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '/var/walmart'")


Now that I have a table, I can run queries. I will start with a simple line count:

val count = sqlContext.sql("SELECT COUNT(*) FROM sales")


Note that no computations are actually done until you view the result of your query.  This means that all the above command has done so far is create a dataframe called “count”. A dataframe is essentially a data structure that contains “tabular” data. That is, it consists of rows and columns of data that can, for example, store the results of an SQL-style query. In order to have Spark perform the line count, I would run:

count.show()
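The "nothing runs until you ask for the result" behavior can be imitated in a few lines of plain Python. This is only an analogy, not Spark code: the class name LazyQuery, its runs counter, and the sample rows are all made up for illustration.

```python
class LazyQuery:
    """Toy stand-in for a dataframe: it remembers how to compute its rows
    but does no work until show() is called."""

    def __init__(self, compute):
        self._compute = compute   # function that produces the rows
        self.runs = 0             # how many times the work was actually done

    def show(self, n=20):
        """Run the deferred computation, print at most n rows, and return them."""
        self.runs += 1
        rows = self._compute()[:n]
        for row in rows:
            print(row)
        return rows


# Made-up sample lines standing in for the CSV; not real data.
lines = ["1,1,2010-02-05", "1,1,2010-02-12", "1,1,2010-02-19"]
count = LazyQuery(lambda: [len(lines)])

assert count.runs == 0   # defining the query did no work
count.show()             # prints 3; the count is computed only now
```

Unlike this toy, Spark also plans and distributes the work across executors, but the order of events is the same: define first, compute on demand.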


This does the computation and gives me the line count of the dataset (421571), and the dataframe “count” now contains the result of my query. Knowing that this simple test was successful, I can now move on to running a more complex query:

val holiday = sqlContext.sql("SELECT date, store, dept FROM sales WHERE weekly_sales = 0 AND IsHoliday = TRUE GROUP BY date, store, dept")


That command created the dataframe “holiday”.  The output of this query should show all the departments from every store that had 0 sales during a holiday.  Like before, Spark will compute once I ask for the result:

holiday.show()


Notice that by default Spark will populate "holiday" with the entire result, but only show the top 20 rows of output.  To view more output, you can specify the number of rows you want to see as a parameter of show(). For example, since "holiday" has 31 rows, you would run:

holiday.show(31)


When Spark populates a dataframe, it stores data in memory. Because of this, subsequent queries will run faster. However, if you quit your spark-shell, your data is gone. Therefore, it may be a good idea to save the result in your HDFS home directory. To save the “holiday” dataframe, run this command:

holiday.javaRDD.saveAsTextFile("sales-results")


On our cluster, this would save the output in the directory “/user/<uniqname>/sales-results”.


As mentioned before, the table “sales” was created in the metastore in my home directory. The metastore is external to Spark and my current Spark session, so the tables will be available even after a spark-shell exits. You may leave tables in your metastore if you would like to query them again in the future. However, if not, it’s a good idea to clean up your tables in case you want to use the same name in the future. I decided that I did not want the table anymore, so I ran:
sqlContext.sql("DROP TABLE sales")


This deleted the table. Then, I exited the shell (simply by typing “exit”). I wanted to view my whole output which was conveniently saved in my HDFS home directory. I was able to do so by running:
hdfs dfs -cat sales-results/*


Overall, I found that SparkSQL was a great way for me to quickly run SQL-style queries and easily save the results into my HDFS home directory.



Friday, September 23, 2016

Hive on Spark

I have been testing Hive on Spark. One of the main benefits of running Hive on Spark (rather than on MapReduce) is that queries can run much faster.

I wanted to run a pretty simple query on Hive using the Google Ngrams dataset. First, I ran it on MapReduce, Hive's default execution engine.

SELECT COUNT(*) FROM ngrams;

That query took 1 minute and 12 seconds to run. Next, I decided to try it with Spark, which was accomplished by simply typing:

set hive.execution.engine=spark;

I ran the same query again, but this time it took 3 minutes and 12 seconds, which was almost 3 times as long as MapReduce.

It turned out the default settings for Spark on the cluster were pretty conservative, so they needed to be modified to run optimally. The next thing that I wanted to do was make sure that I found an “ideal” tuning for our cluster. I referenced this blog post in order to do so. We needed more executors and cores per executor in order for more tasks to be executed at once. We also required a change in how much memory each executor was allocated in order to take advantage of the large number of resources our cluster has, while not completely overwhelming it. Tuning Spark was more of an art than a science, but after testing and tweaking, I found that for the Flux Hadoop cluster, the settings should be 35 executors with 4 cores each, with about 5g of memory per executor.

set spark.executor.instances=35;
set spark.executor.cores=4;
set spark.executor.memory=5g;


That same query then took only 46 seconds to run. Spark, with all of the settings tuned, was about 36% faster than MapReduce in this example.
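As a check on those figures, the arithmetic can be written out in Python. This sketch assumes that "faster" means the reduction in run time relative to MapReduce; that reading is my assumption, but it reproduces the 36% above.

```python
def to_seconds(minutes, seconds):
    """Convert a wall-clock duration to seconds."""
    return minutes * 60 + seconds


mapreduce = to_seconds(1, 12)       # SELECT COUNT(*) on MapReduce
spark_default = to_seconds(3, 12)   # Spark with the cluster's default settings
spark_tuned = to_seconds(0, 46)     # Spark with 35 executors, 4 cores, 5g each

# How many times longer untuned Spark took than MapReduce.
slowdown_untuned = spark_default / mapreduce
# Assumption: "X% faster" = percentage reduction in run time versus MapReduce.
reduction_tuned = (mapreduce - spark_tuned) / mapreduce * 100

print(f"untuned Spark took {slowdown_untuned:.2f}x as long as MapReduce")
print(f"tuned Spark cut the run time by {reduction_tuned:.1f}%")
```

This prints 2.67x for the untuned run and 36.1% for the tuned one.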

Some more examples of queries I ran and how Spark (with tuned settings) compared to MapReduce:


SELECT year, COUNT(ngram) FROM ngrams WHERE volumes = 1 GROUP BY year


MapReduce: 1:06
Spark: 0:55

Spark was about 17% faster.


We had another, larger data set to test this on, so the time differences for that are shown below. It seemed that the larger the data, the greater the difference between MR and Spark.


SELECT COUNT(*) FROM entries


MapReduce: 1:53
Spark: 1:06

Spark was about 42% faster.


SELECT owner FROM entries WHERE gr_name = 'psych' GROUP BY owner


MapReduce: 2:54
Spark: 1:01

Spark was about 65% faster.


Clearly, with the right tuning, Spark can be substantially faster than MapReduce on Hive for large datasets (between 17% and 65% in these tests). Finding the ideal tuning does take some tweaking and testing, but ultimately leads to faster jobs.

Tuesday, September 29, 2015

Flux 7 Node Final Configuration

For details please refer to: Next Flux Bulk Purchase

We have the final configuration for Flux 7. The nodes are:


  • 2 x E5-2680V3  (24 total cores)
  • 8 x 16GB DDR4 2133MHz (128GB)
  • 4TB 7200RPM Drive
  • EDR ConnectX 4 100Gbps Infiniband Adaptor

For price contact hpc-support@umich.edu

Desired quantities must be reported by 5pm Tuesday, October 13th.

Thursday, September 17, 2015

Globus endpoint sharing available to UM researchers

We have described in a number of blog posts some features and benefits of using the Globus File Transfer service.  Now that UM is a Globus Provider, you have a new feature available to you: sharing directories and files with your collaborators who are also Globus users.

There are two avenues of sharing possible for you now. The first is via standard server endpoints that have sharing enabled, and the second is via "Globus Connect Personal" client endpoints. Today I will describe sharing for standard server endpoints only. Sharing for Globus Connect Personal endpoints is a bit more complicated due to differences between OS versions of the client and will be described later.

To see if the endpoint you use has sharing enabled, navigate to the endpoint in "Manage Endpoints" within the Globus web interface.  Click on the Sharing tab; note that you may have to activate (log in to) a session on the endpoint first.  If sharing is enabled, you will be told so and will see an "Add Shared Endpoint" button in the panel.  Shared endpoints are essentially sub-endpoints that you create and to which you can give any other Globus user access.

Let's go ahead and make a shared endpoint from umich#flux by clicking on the button.  You are presented with a web form asking for the required information:

Host Path  ~/Test_share

You can either give a complete absolute path or use the Unix shorthand (~/) for your home directory, as I have done (make sure the shared directory exists first!).

New Endpoint Name   traeker#Test_share

Description   Tell others what this is about.


Clicking on the "Create and Manage permissions" button creates the shared endpoint and presents you with a new panel to manage permissions. It shows you the current access setting and clicking on the "Add Permission" button presents you with a number of options of how to share this endpoint with other Globus users.

Share With     check which one to use among  email, user, group, all users

Permissions  check one or both of read, write


A couple of things to keep in mind as you set these parameters:
  • Be careful about choosing all users as this will allow all users logged into Globus to access this share.
  • By default only read permission is set. If you allow write permission you could get files containing viruses and also get yourself into trouble with any disk usage quotas.


One easy way to manage permissions to a large group of people is to create a Globus group and populate it with users.  Be advised that the entire group will have the same permissions so if you need some users to have different permissions, you either create a different group or add each user to the share individually. Using groups comes in handy when you have multiple shared directories to similar sets of collaborators.
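The trade-off between group rules and per-user rules can be illustrated with a toy model in Python. This is not the Globus API: AccessRule, effective_permissions, and the user and group names are hypothetical. Treating a user's permissions as the union of all matching rules is an assumption of this sketch, not documented Globus behavior.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AccessRule:
    """One hypothetical 'Add Permission' entry on a shared endpoint."""
    kind: str                                     # "user", "group", or "all_users"
    name: str                                     # user or group name; unused for "all_users"
    permissions: frozenset = frozenset({"read"})  # read-only unless write is added


def effective_permissions(user, groups, rules):
    """Union of the permissions from every rule that applies to this user.
    (Assumption: rules are additive.)"""
    granted = set()
    for rule in rules:
        applies = (
            rule.kind == "all_users"
            or (rule.kind == "user" and rule.name == user)
            or (rule.kind == "group" and rule.name in groups)
        )
        if applies:
            granted |= rule.permissions
    return granted


# Hypothetical names: the whole group reads, one member is also added individually.
rules = [
    AccessRule("group", "lab-readers"),
    AccessRule("user", "alice", frozenset({"read", "write"})),
]

print(sorted(effective_permissions("alice", {"lab-readers"}, rules)))  # ['read', 'write']
print(sorted(effective_permissions("bob", {"lab-readers"}, rules)))    # ['read']
```

The group rule keeps the common case to a single entry, and only the exception needs an individual rule.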

Once a directory is shared with another Globus user, they can find that endpoint name via the "shared with me" filter at the top of the endpoint list panel.  With the name in hand, they can transfer files from/to that endpoint by typing in the name on the "Transfer Files" screen, just like any other endpoint they have access to.

You can go back to this shared endpoint at any time to add new access settings or edit existing ones.


Globus endpoint sharing is very powerful as it gives non-UM collaborators access to your research data without having to create a UM "Sponsored account" for them to access your systems.  This is very similar to other cloud file sharing services like Box and Dropbox.  The big difference is that Globus does not store the data, and thus quotas are managed by your system's policies.



Saturday, August 22, 2015

Flux High-Speed Data Transfer Service

Do you have a large data set on your own storage equipment that you would like to process on Flux? We can accommodate up to 40 gigabits per second of data transfer in and out of Flux via the campus Ethernet backbone. There is no additional cost to use this service, but you do need to contact us in order to set it up.

By default, network traffic between Flux compute nodes and other systems on campus takes place over standard one gigabit Ethernet connections. This is sufficient for modest amounts of traffic such as that generated by administrative tasks, monitoring, and home directory access.

Traffic between Flux and its high-speed /scratch filesystem runs over a separate 40 gigabit per second InfiniBand network within the datacenter, and data between Flux and off-campus systems on the Internet can be staged through our transfer server at up to 10 gigabits per second. This would seem to leave a gap though: what if you want direct high-speed connections between the Flux nodes and other systems on campus? We provide such connections using a Mellanox BX5020 InfiniBand/Ethernet gateway:

The Flux BX5020 Gateway

The gateway connects to the Flux InfiniBand network and to the campus Ethernet network and allows traffic to flow between the two networks. The InfiniBand network runs at 40 gigabits per second, and the gateway has four 10 gigabit links to the campus Ethernet network. This allows any Flux node to communicate with any system on campus at up to 40 gbit/s.

We have a customer that has multiple petabytes of data on their own storage equipment which they have been using Flux to process. We mount this customer's NFS servers on Flux and route the traffic through the gateway. The customer is currently running jobs on Flux against two of their 10-gigabit connected servers, and last weekend they reached a sustained data transfer rate into Flux of 14.3 gigabits per second.

Gateway traffic for the week of 8/11/2015 - 8/18/2015
Although we have pushed more than 14 gbit/s through the gateway during testing, this is a new record for production traffic through the system.
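To put these rates in perspective, here is a short worked calculation in Python of how long one petabyte would take to move at a sustained rate. It assumes a decimal petabyte (10^15 bytes) and ignores protocol overhead; the function name transfer_days is just for illustration.

```python
def transfer_days(size_bytes, rate_gbit_per_s):
    """Days needed to move size_bytes at a sustained rate given in gigabits per second."""
    seconds = size_bytes * 8 / (rate_gbit_per_s * 1e9)
    return seconds / 86400


PETABYTE = 10**15  # assumption: decimal petabyte, in bytes

rates = [
    ("default 1 gigabit Ethernet", 1.0),
    ("sustained production rate", 14.3),
    ("gateway at full capacity", 40.0),
]
for label, rate in rates:
    days = transfer_days(PETABYTE, rate)
    print(f"{label:27s} {rate:5.1f} gbit/s -> {days:5.1f} days per PB")
```

At 1 gbit/s a petabyte takes about 93 days; at 14.3 gbit/s it takes about 6.5 days, and at the full 40 gbit/s about 2.3 days.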

Our gateway is currently connected to the Ethernet network at 40 gigabits per second, but it can be readily expanded to 80 and possibly 120 gigabits per second as needed. Additionally, we plan to replace the existing gateway in the near future with newer equipment. The planned initial bandwidth for the new equipment is 160 gbit/s, and there is room for growth even beyond that.

No changes to your network configuration are needed to use the gateway; those changes take place on our end only. All you have to do is export your storage to our IP ranges. If you want to discuss or get set up for this service, please let us know! Our email address is hpc-support@umich.edu and we will be happy to answer any questions you have.

If you are interested in the technical details of how the gateway works, this presentation from Mellanox on the Ethernet over InfiniBand (EoIB) technology used by the system should prove informative. There is no need to know anything about EoIB in order to use the service; the link is provided strictly for the curious.

Next Flux Bulk Purchase and Flux Operating Environment

Update 9/29: Final quoting took longer than expected; see our post for details. Additions to the order must be placed by October 13th.

Update 8/28: The date for expressing interest was extended to Tuesday, September 8th.  After September 8th, a final pricing proposal will be sent to the vendors.

Flux will be purchasing new cores for the Standard and Large Memory Service.  Because we realize that not all funding sources allow for the purchasing of a service like Flux, we provide the Flux Operating Environment (FOE).

FOE is the Flux service minus the hardware, thus a grant that provides only hardware (capital) funds is able to add nodes to Flux, where ARC-TS provides login, storage, network, support, power, etc.

More importantly, grants submitted from LSA, COE, and the Medical School have no cost for placing grant nodes in FOE.  Thus the only cost to the researcher is the node itself, and the researcher is granted dedicated access to it.

Because Flux is going to be making a larger (4000 core) purchase, any faculty with such funds are invited to join in our purchase process.  If you are interested, email hpc-support@umich.edu by August 28th with your node requirements.

Flux 7 2 socket nodes:

  • 128GB RAM
  • 2 x E5-2680V3 CPU (24 total cores)
  • 3TB 7200RPM HDD or 1TB 7200RPM HDD
  • EDR Infiniband (100Gbit ConnectX-4)

Flux 7 4 socket nodes:

  • 1024 - 2048GB RAM
  • 4 x E5 class CPU (40-48 cores)
  • 3TB 7200RPM HDD
  • EDR Infiniband (100Gbit ConnectX-4)

Faculty purchasing their own nodes via FOE can modify the drive and memory types and quantities to match their needs and will likely still benefit from bulk purchasing power by purchasing with Flux.  Researchers who wish to purchase other specialty nodes (GPU, Xeon-PHI, FPGA, Hadoop/Spark, etc.) are still encouraged to contact us.

Sunday, August 2, 2015

XSEDE15 Updates

We recently returned from XSEDE15, where we represented Michigan and learned about the new resources and features coming online at XSEDE.  What follows are our notes; there will also be a live stream webinar on August 6th at 10:30am for one hour.  If you have questions, please attend:

Webinar: ARC-TS XSEDE[15] Faculty and Student Update
Location: http://univofmichigan.adobeconnect.com/flux/
Time: August 6th 10:30am-11:30am
Login:  (Select Guest, use uniqname)

Champions Program Update

Michigan currently participates in the Campus Champions program via the staff at ARC-TS.  There are two newer programs that faculty and students might take interest in:

Domain Champions

Domain Champions are XSEDE participants like Campus Champions but sorted by field.  These Champions are available nationally to help researchers in their fields even if they do not use XSEDE resources:

Domain                          Champion                 Institution
Data Analysis                   Rob Kooper               University of Illinois
Finance                         Mao Ye                   University of Illinois
Molecular Dynamics              Tom Cheatham             University of Utah
Genomics                        Brian Couger             Oklahoma State University
Digital Humanities              Virginia Kuhn            University of Southern California
Digital Humanities              Michael Simeone          Arizona State University
Chemistry and Material Science  Sudhakar Pamidighantam   Indiana University

Student Champion

The Student Champions program is a way for graduate students (preferred but not required) to get more plugged into supporting researchers in research computing.  Michigan does not currently have any student champions.  If you are interested contact ARC-TS at hpc-support@umich.edu.

New Clusters and Clouds

Many of the new XSEDE resources coming online or already available are adding virtualization capability. This ability is sometimes called cloud, but there can be subtle differences depending on which resources you are using.  If you have questions about using any of the XSEDE resources, contact ARC-TS at hpc-support@umich.edu.

NSF and XSEDE have recognized that data plays a much larger role than in the past.  Many of the resources have added persistent storage options (file space that isn't purged) as well as database hosting and other resources normally not found on HPC clusters.

Wrangler 

Wrangler is a new data-focused system and is in production.  Notable features are:
  • iRODS Service Available and persistent storage options
  • Can host long running reservations for databases and other services if needed.
  • 600TB of Flash storage directly attached. This storage can change its identity to provide different service types (GPFS, Object, HDFS, etc.).  Sustains over 4.5TB/minute terasort benchmark.

Comet

Comet is a very large traditional HPC system that recently entered production.  It provides over 2 petaflops of compute, mostly in the form of 47,000+ CPU cores.  Notable features are:
  • Hosts Virtual Clusters: customized cluster images for researchers who need to make modifications that are not possible in the traditional batch hosting environment.
  • 36 nodes with 2x Nvidia K80 GPUs (4 total GPU dies / node)
  • SSD in each node for fast local IO.
  • 4 nodes with 1.5TB of RAM

Bridges

Bridges is a large cluster that will support more interactive work, virtual machines, and database hosting along with traditional batch HPC processing.  Bridges is not yet in production.  Some notable features are:
  • Nodes with 128GB, 3TB, and 12TB of RAM
  • Reservations for long running database, web server and other services
  • Planned support for Docker containers

Jetstream

Jetstream is a cloud platform for science.  It is OpenStack based and will give researchers great control over their exact computing environment.  Jetstream is not yet in production.  Notable features are:
  • Libraries of VMs will be created and hosted in Atmosphere. Researchers will be able to contribute their own images or use other images already configured for their needs.
  • Split across two national sites geographically distant

Chameleon

Chameleon is an experimental environment for large-scale cloud research. Chameleon will allow researchers to deploy their images not only as virtual machines but also on bare metal.  Chameleon is now in production.  Some notable features are:
  • Geographically separated OpenStack private cloud
  • Not allocated by XSEDE but allocated in a similar way

CloudLab

CloudLab is a unique environment where researchers can deploy their own cloud to do research about clouds or on clouds.  It is in production.  Some notable features are:
  • Able to prototype entire cloud stacks under researcher control, or bare metal
  • Geographically distributed across three sites
  • Supports multiple network types (Ethernet, InfiniBand)
  • Supports multiple CPU types (Intel/X86, ARM64)

XSEDE 2.0

XSEDE was a 5-year proposal, and we are wrapping up year 4.  The XSEDE award doesn't actually provide any of the compute resources; these are funded by their own awards and are only allocated through the XSEDE process.  A new solicitation for another 5 years has been issued, and a response is currently under review by NSF.  The next generation of XSEDE aims to be even more inclusive and to focus more on data-intensive computing.