Building Africa’s Talking Data Infrastructure

Building a data infrastructure on Spark, HDFS and Mesos

Ian Juma
Africa's Talking

--


The deluge of data — Africa

Introduction

Our data pipeline consists of Spark, Mesos, Kafka and HDFS.

This blog post highlights our motivation for using Mesos and Spark and the challenges we faced along the way. We also share some insights into deploying Spark jobs on a Mesos cluster. Last but not least, we outline our plans for future work on the data platform.

mesos.apache.org

Why Mesos?

Our main motivations for using Mesos were to make use of containers and increase our utilisation rates.

We plan to containerize every service running on the data platform. Mesos plays well with container technologies and allows us to use newer technologies such as Docker and the Mesos Universal Containerizer to greatly simplify application deployments. Developers can write, package and ship code without having to interact with a server directly.

Mesos also allows us to experiment with “cool” features such as resource over-subscription to maximise utilisation of the underlying hardware. With a well-written framework we can use this feature to creatively run more jobs on the cluster: batch jobs can run on over-subscribed resources and be killed when high-priority jobs need to run. This presents a new and interesting way to start more tasks without necessarily having to increase the compute resources available. We would of course require a way to snapshot a job before it is killed.

Challenges we faced setting up a Mesos cluster

We had to perform a significant number of manual setup tasks that made this process especially painful. This included the generation of certificates and editing of configuration settings. We believe Puppet can be used to automate a large part of these manual tasks.

Debugging communication issues across the cluster (using tcpdump and ssldump) also proved to be quite involved. A large number of the issues that arose when using SSL certificates could be alleviated by using an internal DNS server, such as BIND.

Other challenges we faced related to the logging behaviour of Mesos. For example, the only way to change the verbosity level at runtime is by toggling an API endpoint; beyond that, you can only set flags to switch between the INFO, WARNING and ERROR levels. This adds to the difficulty of diagnosing issues in such a distributed system.

Why Spark?

We picked Spark because of the volume of data we intend to process and its speed: Spark's in-memory processing can greatly improve performance over Hadoop MapReduce. Spark also proved to be a great fit for MapReduce-style jobs because HDFS is the only Hadoop component we consume, leaving Spark to handle the processing.

Spark also provides comprehensive machine learning libraries, allowing us to build models and extend the functionality of our data pipeline. Spark Streaming additionally gives us easy-to-use connectors to existing pipeline tools such as Kafka. The Kafka-Spark streaming connector allows us to extend and build services faster, since there is a ready integration for these two critical components of our pipeline.
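As a rough illustration of how little glue code this integration needs, here is a minimal sketch of a direct stream from Kafka. It assumes the spark-streaming-kafka-0-10 connector; the broker address, topic name and group ID are placeholders rather than our actual configuration:

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object KafkaStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka-stream-sketch")
    val ssc  = new StreamingContext(conf, Seconds(10))

    // Placeholder broker and consumer settings
    val kafkaParams = Map[String, Object](
      "bootstrap.servers"  -> "kafka:9092",
      "key.deserializer"   -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id"           -> "spark-sketch",
      "auto.offset.reset"  -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // Read a hypothetical "events" topic and count records per 10-second batch
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq("events"), kafkaParams))

    stream.map(_.value).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```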

Yet another great reason for us to choose Spark was its support for Python and Scala out of the box. This allows us to quickly build prototypes in Python and move to production with Scala.

Overview of Spark running on a Mesos cluster

The Spark framework (for machine learning jobs, in our case) registers with Mesos and launches a driver to manage a specific job. The driver in turn breaks the job into tasks that it assigns to executors. These tasks are launched on registered slaves that meet particular criteria (such as GPU and CPU attributes). All the executors return the results of their computation to the driver.

The Spark driver is defined in code as a Spark context. The context specifies operational parameters such as the amount of memory allocated to the driver as well as to the workers (executors). The driver's resource requirements are largely determined by the amount of memory required by the job at hand.
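As a minimal sketch (with placeholder values rather than our production settings), the context and its resource parameters look roughly like this:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Placeholder master URL and resource sizes, for illustration only
val conf = new SparkConf()
  .setAppName("example-ml-job")
  .setMaster("mesos://zk://zk1:2181,zk2:2181,zk3:2181/mesos")
  .set("spark.executor.memory", "4g") // memory per executor (worker)
  .set("spark.cores.max", "8")        // cap on cores taken from Mesos offers

val sc = new SparkContext(conf)

// Note: in client mode the driver JVM is already running by the time this code
// executes, so driver memory is usually passed to spark-submit via
// --driver-memory rather than set here.
```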

Spark on Mesos Deployment Modes

Spark provides two methods for deployment on a Mesos cluster — cluster mode and client mode.

Cluster mode is especially useful as it allows jobs to be launched from a remote machine that does not have a publicly accessible IP.

In cluster mode, a Spark dispatcher must first be registered as a framework with the Mesos cluster. Code running on a remote machine can then be used to launch a Spark context which then communicates with the Mesos master via the dispatcher. A request is then made to the Mesos master for resources necessary to launch a driver. (This means that the driver can run on any machine in the Mesos cluster.)
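For reference, registering the dispatcher is a one-line affair. The sketch below uses placeholder ZooKeeper hosts and assumes the dispatcher defaults for the submission port (7077) and web UI port (8081):

```bash
# Start the MesosClusterDispatcher and register it as a framework with Mesos
./sbin/start-mesos-dispatcher.sh \
  --master mesos://zk://zk1:2181,zk2:2181,zk3:2181/mesos

# spark-submit jobs are then pointed at the dispatcher,
# e.g. --master mesos://<dispatcher-host>:7077
```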

While the spark-mesos-dispatcher does not provide authentication, it is safe to use as its only role is to place the driver on an appropriate slave in the cluster. Drivers still have to be authenticated before they can start a task; the driver is launched as a separate framework and, once registered, can spin off tasks.

These tasks can also run on any machine within the cluster (provided that no constraints are given). All dependencies and data required by the Spark job must be available to/on all slaves in the Mesos cluster. The driver can be monitored from the master and restarted if necessary.

Workers return their results to the driver when the task is complete. These results are available from the Mesos UI but can also be written to a file or database.

In client mode, on the other hand, there is no need for a dispatcher. The code is run and the context launched, which in turn starts the driver directly on the (remote) machine running the code. After successful authentication with the Mesos master, the driver requests the resources needed to launch executors. This is similar to the driver's operation in cluster mode.

The driver also launches a Netty HTTP server that distributes all data and dependencies (for the Spark job) to the workers in the cluster. This is a great convenience as one does not have to worry about distributing Spark job dependencies or data to every slave within the cluster.

When workers complete the computation, they return results to the driver. These results are readily available from the machine on which the driver was launched.

In both client and cluster modes, the driver exits once all tasks are complete and the context ceases to exist. It is important to note that if an application exits due to an exception, the driver will also exit and the Spark context may cease to exist, depending on how the context handles exceptions.

Common issues with Spark and deploy modes

Cluster mode

We encountered a significant number of challenges running Spark in cluster mode, not least of which was the scarcity of documentation on how this mode works and how to set it up.

There was no clear explanation about the role of the dispatcher or even the exact way to run a job in cluster mode. We spent hours, if not days, trying to run a Spark job in cluster mode, receiving an ambiguous error, only to discover that the issue was with the order in which we were passing parameters to the spark-submit command.
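For anyone hitting the same wall: the gotcha is that spark-submit treats everything after the application jar as arguments to the application itself, so all flags must come before it. A working cluster-mode invocation looks roughly like the sketch below (the class name, hosts and paths are placeholders):

```bash
spark-submit \
  --class com.example.ExampleJob \
  --master mesos://dispatcher-host:7077 \
  --deploy-mode cluster \
  --driver-memory 2g \
  --executor-memory 4g \
  hdfs://namenode:9000/jars/example-job.jar \
  arg1 arg2   # anything after the jar goes to the application, not to Spark
```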

As the Spark driver can be launched on any slave in the cluster, the Spark binary must be pre-downloaded on each and every machine within the Mesos cluster.

Once the driver was running, we also struggled to find information that would give insight into its status. We eventually discovered that the dispatcher, when registered with the Mesos cluster, launches a web UI. This provided us with some information on the status of drivers in the Mesos cluster, their IDs and the slaves on which they were running.

Further, the documentation only briefly states that the results of Spark jobs (run in cluster mode) are available from the Mesos UI. It took quite a while to figure out where driver information could be found in the Mesos UI and then track down the results. The documentation does not clearly state whether results streamed back to the driver are available from the Spark context. Further, there are no guidelines or recommendations on how best to store or access results from the driver in cluster mode.

Yet another significant challenge was the need to make any dependencies or data required by the Spark jobs available to all slaves in the cluster. This means that all dependencies and data must be copied to a location accessible by every slave. This is not too bad if you have an S3 bucket or an HDFS setup, but it quickly becomes a lot more complicated when you have to place dependencies or data on each and every slave.
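Where HDFS (or S3) is available, the pain can be reduced by keeping both the Spark distribution and the job's dependencies on shared storage. The sketch below uses spark.executor.uri, which tells Mesos slaves where to fetch the Spark binary package so it does not have to be pre-installed everywhere; all paths and versions are placeholders:

```bash
spark-submit \
  --master mesos://dispatcher-host:7077 \
  --deploy-mode cluster \
  --conf spark.executor.uri=hdfs://namenode:9000/dist/spark-2.1.0-bin-hadoop2.7.tgz \
  --jars hdfs://namenode:9000/jars/job-deps.jar \
  --class com.example.ExampleJob \
  hdfs://namenode:9000/jars/example-job.jar
```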

Client mode

We found that client mode was much better documented and easier to get started with. Unfortunately, it required that we either run our code from a machine within the cluster or open up our Mesos master for direct communication over the public Internet.

The first option is highly undesirable as it requires anyone who wants to run a Spark job to have access to the Mesos cluster backend. The second option, besides introducing security risks, further requires that the remote machine on which the driver is launched have a publicly reachable IP address.

Another major problem with this approach is that the Spark binary as well as Mesos libraries must be available on the machine on which the driver is launched. This introduces a lengthy setup process before one can launch a job from a remote machine.

Best of both worlds with Docker

Faced with the two deployment modes above, we felt dissatisfied. We were determined to do more, so we decided to use Docker to get the best of both worlds.

We started off by creating a Docker registry within our cluster. We then built a Docker image that had the necessary Spark binary and Mesos libraries. Once the image was built, we pushed it to the registry and then used Marathon to launch the Docker container within the Mesos cluster to run a Spark job in client mode.
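As a sketch of what such an image looks like (the versions, mirrors and paths here are illustrative, not our actual build), it essentially layers the Spark distribution and the Mesos native library on top of a JRE:

```dockerfile
FROM openjdk:8-jre

# Mesos native library (libmesos); assumes the Mesosphere apt repository has
# already been added in a base layer
RUN apt-get update && \
    apt-get install -y --no-install-recommends curl mesos && \
    rm -rf /var/lib/apt/lists/*

# Spark binary distribution
RUN curl -fsSL https://archive.apache.org/dist/spark/spark-2.1.0/spark-2.1.0-bin-hadoop2.7.tgz \
      | tar -xz -C /opt && \
    ln -s /opt/spark-2.1.0-bin-hadoop2.7 /opt/spark

ENV SPARK_HOME=/opt/spark
ENV MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos.so

# The job code and its spark-submit invocation (client mode) are layered on
# top of this base image before pushing to the internal registry.
```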

We preferred client mode, in this case, as it had fewer complications when deploying Spark jobs on a Mesos cluster. Most importantly, we were able to run Spark jobs in client mode without compromising on security. This approach also makes it much, much easier for someone to write and deploy Spark jobs directly from their PC. The use of Marathon to launch the Docker container within the Mesos cluster greatly helps us better utilise resources within the cluster when launching the driver.

However, while Docker addresses a large number of the challenges we faced with deploying Spark jobs in a Mesos cluster, there is still a long way to go. The deployment process can still be refined further; a lot more can be done to simplify and secure the end-to-end process.

Other challenges we faced running Spark on Mesos

Debugging issues with Spark using Mesos logs proved to be a great challenge. The logs provided by Mesos were difficult to locate, as each executor is assigned a random ID. The Mesos UI, however, vastly improved things by collecting all the executor logs belonging to one Spark job in a single place. This helped us trace and debug a significant number of issues. The major disadvantage of this approach was that it required us to open up the Mesos UI to the public Internet.

To get around this challenge, we decided to set up a centralised logging system to help in collecting disparate executor logs. These are then fed into Graylog and indexed by Elasticsearch. We hope that this will help us discover issues much faster and provide us with better insight into the operation of our Spark cluster.

The biggest pain by far was the endless poking at firewall rules within the cluster to open up communication links between executors and drivers. This was further exacerbated in cluster mode, where the driver could be launched on any slave within the cluster.

A brief security overview

Security is a major concern in any IT infrastructure deployment. Unfortunately, we found that there is not enough clear documentation on how to secure a Mesos cluster. Consequently, this was by far the most tiresome and involved task when setting up the Mesos cluster. There are two issues to consider when securing the cluster: authentication of slaves and frameworks, and encryption of any data in flight within the cluster.

Kerberos provides a great way to centrally manage authentication for the various components in the cluster — Mesos slaves/masters and Spark jobs (frameworks). The initial setup can however be quite daunting for those new to Kerberos.

All communication within the cluster should always be over a private network. With this in place, we initially thought firewall rules would be sufficient to manage/secure inter-cluster communication. We soon discovered this was inadequate and we therefore additionally set up SSL encryption for inter-cluster communication. We faced a considerable number of challenges getting this to work especially because we were using private IP addresses. As mentioned previously, a lot of these challenges are easily resolved by setting up a private DNS server to reliably resolve hostnames to the private IP addresses.
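For completeness, here is a rough sketch of the kind of configuration involved, assuming a Mesos build with SSL support; the exact environment variable names can differ between Mesos versions, and the paths below are placeholders:

```bash
# Environment for each Mesos master and agent process (libprocess SSL settings)
export LIBPROCESS_SSL_ENABLED=true
export LIBPROCESS_SSL_KEY_FILE=/etc/mesos/ssl/node.key
export LIBPROCESS_SSL_CERT_FILE=/etc/mesos/ssl/node.crt
export LIBPROCESS_SSL_CA_FILE=/etc/mesos/ssl/ca.crt
export LIBPROCESS_SSL_VERIFY_CERT=true
```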

Future work to improve our data platform

Moving forward, we plan to set up an API to allow our internal developer teams to write jobs, package them as Docker containers and submit them through a repository. Alternatively, we could set up a git hook to monitor code changes, build containers on the fly and re-submit these jobs on request. The main goal is to simplify the deployment of data jobs on our platform by offering easy-to-consume APIs.

Docker also greatly simplifies application/service discovery. Using mesos-dns or etcd, services can communicate with each other using names rather than IPs. We plan to take advantage of this in future.

Additionally, we hope to improve security within our cluster by implementing network isolation. Tools such as Calico make it possible to get full network isolation of services as well as fine-grained network policy control in the cluster. Fine-grained network policies would ensure that each task is only able to communicate with tasks within the same overlay network, improving security.

With network isolation in place, there will be complete isolation — process, disk and network — for each job. Each job can then run within its own unique environment while sharing resources with other jobs on the same machine. Further isolation could also be implemented to ensure that staging tasks are run separate from production tasks. Staging tasks could be differentiated with a staging-* tag before the task ID.

Conclusion

Mesos and Spark are great tools for building a data platform.

Mesos allows us to make better use of resources. However, there remain important questions around resource over-subscription, such as how to handle ‘over-subscribed’ tasks, how to snapshot a job before killing it, and how to define job priorities in a multi-tenant cluster.

Spark on the other hand, allows us to process large volumes of data at a good speed. Technologies such as Docker greatly contribute to the usability of these two tools and allow for the development of additional features.

We found that observability poses a huge challenge for such a project; proper monitoring and logging are essential for debugging and identifying causes of failure. Properly securing such a data platform is no trivial task either.

A lot of extensions/features need to be added on to Mesos/Spark to have a fully functional data platform but we believe there are answers out there!

References

https://spark.apache.org/docs/latest
