Tuesday, January 16, 2018

Paper summary. A Berkeley view of systems challenges for AI

This position paper from Berkeley identifies an agenda for systems research in AI for the next 10 years. The paper also serves to publicize/showcase their research and to steer interest towards these directions, which is, after all, why one writes position papers.

The paper motivates the systems agenda by discussing how systems research/development played a crucial role in fueling AI’s recent success. It says that the remarkable progress in AI has been made possible by a "perfect storm" emerging over the past two decades, bringing together: (1) massive amounts of data, (2) scalable computer and software systems, and (3) the broad accessibility of these technologies.

The rest of the paper talks about the trends in AI and how those map to their systems research agenda for AI.

Trends and challenges

The paper identifies 4 basic trends in the AI area:
  • Mission-critical AI: Design AI systems that learn continually by interacting with a dynamic environment in a timely, robust, and secure manner.
  • Personalized AI: Design AI systems that enable personalized applications and services while respecting users’ privacy and security.
  • AI across organizations: Design AI systems that can train on datasets owned by different organizations without compromising their confidentiality. (I think the presentation could have been simplified by combining this with Personalized AI.)
  • AI demands outpacing Moore's Law: Develop domain-specific architectures and distributed software systems to address the performance needs of future AI applications in the post-Moore's Law era.
To enable progress on these fronts, the paper then identifies 9 research topics, across 3 main areas: Acting in dynamic environments, Secure AI, and AI specific architectures.

Acting in dynamic environments

R1: Continual learning

Despite Reinforcement Learning (RL)'s successes (Atari games, AlphaGo in Go), RL has not seen wide-scale real-world application. The paper argues that coupling advances in RL algorithms with innovations in systems design will drive new RL applications.

Research: (1) Build systems for RL that fully exploit parallelism, while allowing dynamic task graphs, providing millisecond-level latencies, and running on heterogeneous hardware under stringent deadlines. (2) Build systems that can faithfully simulate the real-world environment, as the environment changes continually and unexpectedly, and run faster than real time.

Of course, the second part here refers to research described in "Real-Time Machine Learning: The Missing Pieces", which I summarized here: https://muratbuffalo.blogspot.com/2017/12/paper-summary-real-time-machine.html. Simulated Reality (SR) focuses on continually simulating the physical world with which the agent is interacting. Trying to simulate multiple possible futures of a physical environment in high fidelity within a couple of milliseconds is a very ambitious goal. But research here can also help other fields, so this is interesting.

R2: Robust decisions

The challenges here are: (1) robust learning in the presence of noisy and adversarial feedback, and (2) robust decision-making in the presence of unforeseen and adversarial inputs.

Research: (1) Build fine-grained provenance support into AI systems to connect outcome changes (e.g., reward or state) to the data sources that caused these changes, and automatically learn causal, source-specific noise models. (2) Design API and language support for developing systems that maintain confidence intervals for decision-making, and in particular can flag unforeseen inputs.

R3: Explainable decisions

Here we are in the domain of causal inference, a field "which will be essential in many future AI applications, and one which has natural connections to diagnostics and provenance ideas in databases."

Research: Build AI systems that can support interactive diagnostic analysis, that faithfully replay past executions, and that can help to determine the features of the input that are responsible for a particular decision, possibly by replaying the decision task against past perturbed inputs. More generally, provide systems support for causal inference.

Secure AI

R4: Secure enclaves

A secure enclave is a secure execution environment that protects the application running within it from malicious code running outside.

Research: Build AI systems that leverage secure enclaves to ensure data confidentiality, user privacy and decision integrity, possibly by splitting the AI system’s code between a minimal code base running within the enclave, and code running outside the enclave. Ensure the code inside the enclave does not leak information, or compromise decision integrity.

R5: Adversarial learning

The adaptive nature of ML algorithms opens the learning systems to new categories of attacks: evasion attacks and data poisoning attacks.

Research: Build AI systems that are robust against adversarial inputs both during training and prediction (e.g., decision making), possibly by designing new machine learning models and network architectures, leveraging provenance to track down fraudulent data sources, and replaying to redo decisions after eliminating the fraudulent sources.

R6: Shared learning on confidential data

The paper observes that, despite the large volume of theoretical research, there are few practical differential privacy systems in use today, and proposes to simplify differential privacy use for real-world applications.
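
As context for why "few practical systems" is surprising, the core mechanism itself is simple to state. Here is a minimal Python sketch of the textbook Laplace mechanism for a counting query (my own illustration; real systems additionally need privacy-budget accounting, composition across queries, and much more):

# Minimal sketch of the Laplace mechanism, the textbook building block of
# differential privacy: answer a counting query with noise scaled to
# sensitivity/epsilon. Illustrative only.
import numpy as np

def dp_count(records, predicate, epsilon, rng):
    true_count = sum(1 for r in records if predicate(r))
    sensitivity = 1.0   # adding/removing one record changes a count by at most 1
    noise = rng.laplace(loc=0.0, scale=sensitivity / epsilon)
    return true_count + noise

rng = np.random.default_rng(0)
ages = [23, 35, 41, 29, 52, 37]
print(dp_count(ages, lambda a: a > 30, epsilon=0.5, rng=rng))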

Research: Build AI systems that (1) can learn across multiple data sources without leaking information from a data source during training or serving, and (2) provide incentives to potentially competing organizations to share their data or models.

AI specific architectures

R7: Domain specific hardware

The paper argues that "the one path left to continue the improvements in performance-energy-cost of processors is developing domain-specific processors." It mentions the Berkeley Firebox project, which proposes a multi-rack supercomputer that connects thousands of processor chips with thousands of DRAM chips and nonvolatile storage chips using fiber optics to provide low latency and high bandwidth over long physical distances.

Research: (1) Design domain-specific hardware architectures to improve the performance and reduce power consumption of AI applications by orders of magnitude, or enhance the security of these applications. (2) Design AI software systems to take advantage of these domain-specific architectures, resource disaggregation architectures, and future non-volatile storage technologies.

R8: Composable AI systems

The paper says modularity and composition will be key to increasing development speed and adoption of AI. The paper cites the Clipper project.

Research: Design AI systems and APIs that allow the composition of models and actions in a modular and flexible manner, and develop rich libraries of models and options using these APIs to dramatically simplify the development of AI applications.

R9: Cloud-edge systems

The paper mentions the need to repurpose code to multiple heterogeneous platforms via re-targetable software design and compiler technology. It says "To address the wide heterogeneity of edge devices and the relative difficulty of upgrading the applications running on these devices, we need new software stacks that abstract away the heterogeneity of devices by exposing the hardware capabilities to the application through common APIs."

Research: Design cloud-edge AI systems that (1) leverage the edge to reduce latency, improve safety and security, and implement intelligent data retention techniques, and (2) leverage the cloud to share data and models across edge devices, train sophisticated computation-intensive models, and take high quality decisions.

MAD questions

(The questions that led to these explanations are left as an exercise to the reader.)

1) In 2009, there was a similar position paper from Berkeley called "Above the Clouds: A Berkeley View of Cloud Computing". That paper did a very good job of summarizing, framing, and selling the cloud computing idea to academia. But it looks like the research agenda/directions from that report didn't fare very well after 8 years---which is totally expected. Plans are useless but planning is indispensable. The areas of interest change after some time, and the research adapts. It is impossible to tightly plan and manage exploratory research in CS (maybe this is different in biology and the physical sciences).

Looking at that paper's top-10 list of obstacles/opportunities, I think it is a YES for items 4, 5, and 6, and partial for the rest, with very little progress on items 2 and 9. And while that list did not include them, the following developments have since reshaped the cloud computing landscape:

  • dominance of machine learning workloads in the cloud
  • the rise of NewSQL systems, the trend for more consistent distributed databases, and the importance of coordination/Paxos/ZooKeeper in the cloud 
  • the development of online in-memory dataflow and stream processing systems, such as Spark, which came out of Berkeley
  • the race towards finer-granularity virtualization via containers and functions as a service
  • the prominence of SLAs (mentioned only once in the paper)

So even though the AI-systems agenda from Berkeley makes a lot of sense, it will be instructive to watch how these pan out and what unexpected big AI-systems areas will open up in the coming years.

2) Stanford also released a similar position paper earlier this year, although theirs had a more limited scope/question: developing a [re]usable infrastructure for ML. Stanford's DAWN project aims to target end-to-end ML workflows, empower domain experts, and optimize end-to-end. A figure in the DAWN paper summarizes their vision for the reusable ML stack.

Of course, again, this inevitably reflects the strengths and biases of the Stanford team; they are more on the database, data science, and production side of things. It looks like this has some commonalities with the AI-specific architectures section of the Berkeley report, but different approaches are proposed for the same questions.

3) For R2: Robust decisions, it seems like formal methods, modeling, and invariant-based reasoning can be useful, especially when concurrency control becomes an issue in distributed ML deployments.

Sunday, January 14, 2018

The Lambda and the Kappa Architectures

This article, by Jimmy Lin, looks at the Lambda and Kappa architectures, and through them considers a larger question: Can one size fit all?

The answer, it concludes, is that it depends on what year you ask! The pendulum swings between the apex of one tool to rule them all, and the other apex of multiple tools for maximum efficiency. Each apex has its drawbacks: one tool leaves efficiency on the table, while multiple tools spawn integration problems.

In the RDBMS world, we already saw this play out. One size of RDBMS fit all, until it couldn't anymore. Stonebraker declared "one size does not fit all", and we saw a split into dedicated OLTP and OLAP databases connected by extract-transform-load (ETL) pipelines. But in the last couple of years we have been seeing a lot of one-size-fits-all "Hybrid Transactional/Analytical Processing (HTAP)" solutions being introduced again.

Lambda and Kappa

OK, back to telling the story from the Lambda and Kappa architectures perspective. What are the Lambda and Kappa architectures anyway?

Lambda, from Nathan Marz, is the multitool solution. There is a batch computing layer, and on top of it there is a fast speed/serving layer. The batch layer provides the "stale" truth; in contrast, the realtime results are fast, but approximate and transient. In Twitter's case, the batch layer was the MapReduce framework, and Storm was the speed layer on top. This enabled fast responses at the serving layer, but introduced integration hell. Lambda means everything must be written twice: once for the batch platform and again for the real-time platform. The two platforms need to be maintained in parallel indefinitely and kept in sync with respect to how each interacts with other components and integrates features.
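
To make the "written twice, merged at query time" pain concrete, here is a toy Python sketch of a Lambda-style read that combines a precomputed batch view with the speed layer's realtime view (names and numbers are made up):

# Sketch of Lambda-style serving: the same query logic has to be maintained
# against both the batch view and the realtime (speed-layer) view, and the
# two results are merged at query time.
from collections import Counter

# Batch view: counts precomputed up to the last batch run (e.g., nightly MapReduce).
batch_view = Counter({"#kappa": 120, "#lambda": 95})

# Realtime view: approximate counts for events that arrived since the batch run.
realtime_view = Counter({"#kappa": 7, "#lambda": 3, "#beam": 1})

def hashtag_count(tag):
    # Merge stale-but-complete batch results with fresh-but-partial realtime results.
    return batch_view.get(tag, 0) + realtime_view.get(tag, 0)

for tag in ("#kappa", "#lambda", "#beam"):
    print(tag, hashtag_count(tag))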

Kappa, from Jay Kreps, is the "one tool fits all" solution. The Kafka log streaming platform considers everything as a stream. Batch processing is simply streaming through historic data. A table is merely the cache of the latest value for each key in the log, and the log is a record of each update to the table. Kafka Streams adds the table abstraction as a first-class citizen, implemented as compacted topics. (This is of course already familiar/known to database people as incremental view maintenance.)
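
The log/table duality is easy to see in code. In this toy Python sketch (my own, not Kafka's API), the table is just a fold over the log that keeps the latest value per key, which is what a compacted topic materializes:

# Sketch of the log/table duality: a table is the fold of the log,
# keeping the latest value per key; the log is the change history of the table.
log = [
    ("user1", "Buffalo"),
    ("user2", "Austin"),
    ("user1", "Seattle"),   # later update for user1 supersedes the earlier one
]

def materialize(log_records):
    # Replay the log to build the table: latest value wins per key.
    table = {}
    for key, value in log_records:
        table[key] = value
    return table

print(materialize(log))   # {'user1': 'Seattle', 'user2': 'Austin'}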

Kappa gives you a "one tool fits all" solution, but the drawback is that it can't be as efficient as a batch solution, because it is general and needs to prioritize low-latency response to individual events over high-throughput processing of batches of events.

What about Spark and Apache Beam?

Spark considers everything as batch; online stream processing is then handled as microbatch processing. So Spark is still a one-tool solution. I had written earlier about Mike Franklin's talk, which compared Spark and the Kappa architecture.

Apache Beam provides abstractions/APIs for big data processing. It is the open-sourced programming model of Google's Dataflow framework, whose underpinnings are explained in the MillWheel and Dataflow Model papers. It differentiates between event time and processing time, and uses a watermark to capture the relation between the two. Using the watermark, it provides information about the completeness of observed data with respect to event times, e.g., 99% complete up to the 5-minute mark. Late-arriving messages trigger a makeup procedure to amend previous results. This is of course close to the Kappa solution, because it treats everything, even batch, as a stream.
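
Here is a toy Python sketch (not the Beam API) of that event-time machinery: a watermark decides when a window's on-time result fires, and records that arrive afterwards are treated as late data that amend the earlier result:

# Toy sketch of event-time windows with a watermark. Each record carries its
# event time; the watermark estimates how far event time has progressed.
# A window fires once the watermark passes its end; later records are late data.
from collections import defaultdict

WINDOW = 60                       # 1-minute event-time windows, in seconds
counts = defaultdict(int)         # window start -> running count
fired = set()                     # windows whose on-time result was already emitted

def on_record(event_time, watermark):
    start = (event_time // WINDOW) * WINDOW
    counts[start] += 1
    if start in fired:
        print(f"late data: window {start} amended to count={counts[start]}")
    for w in sorted(counts):      # fire every window the watermark has passed
        if w + WINDOW <= watermark and w not in fired:
            fired.add(w)
            print(f"window {w}: count={counts[w]} (on time)")

# (event_time, watermark) pairs; the record with event time 30 is late for window 0.
for et, wm in [(5, 10), (62, 70), (30, 125), (130, 200)]:
    on_record(et, wm)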

I would say Naiad, TensorFlow, timely dataflow, differential dataflow are "one tool fits all" solutions, using similar dataflow concepts as in Apache Beam.

Can you have your cake and eat it too? and other MAD questions.

Here are the important claims in the paper:

  • Right now, integration is a bigger pain point, so the pendulum is now on the one-tool solution side.
  • Later, when efficiency becomes a bigger pain point, the pendulum will swing back to the multi-tool solution, again.
  • The pendulum will keep swinging back and forth because there cannot be a best of both worlds solution.

1) The paper emphasizes that there is no free lunch. But why not?
I think the argument is that a one-tool solution cannot be as efficient as a batch solution, because it needs to prioritize low-latency response to individual events rather than high-throughput processing of batches of events.

Why can't the one tool solution be more refined and made more efficient? Why can't we have finely-tunable/configurable tools? Not the crude hammer, but the nanotechnology transformable tool such as the ones in the Diamond Age book by Neal Stephenson?

If we had highly parallel I/O and computation flow, would that help achieve a best of both worlds solution?

2) The paper mentions using API abstractions as a compromise solution, but quickly cautions that this will also not be able to achieve best of both worlds, because abstractions leak.

Summingbird at Twitter is an example of an API-based solution: reduced expressiveness (DAG computations) is traded off for simplicity (no need to maintain separate batch and realtime implementations). Summingbird is a domain-specific language (DSL) that allows queries to be automatically translated into MapReduce jobs and Storm topologies.

3) Are there analogous pendulums for other problems?
The other day, I posted a summary of Google's TFX (TensorFlow Extended) platform. It takes a one-tool-fits-all approach, like most ML approaches today. I think the reason is that integration and ease of development are the biggest pain points these days. Efficiency for training is addressed by having parallel training in the backend, and training is already accepted to be a batch solution. When the integration/development problems are alleviated, and we start seeing very low-latency training demand for machine learning workloads, we may expect the pendulum to swing to multitool/specialization solutions in this space.

Another example of the pendulum thinking is in the decentralized versus centralized coordination problem. My take on this is that centralized coordination is simple and efficient, so it has a strong attraction. You go with a decentralized coordination solution only if you have a big pain point with the centralized solution, such as geographic separation induced latency. But even then hierarchical solutions or federations can get you close to best of both worlds.

The presentation of the paper

I respect Jimmy Lin's take on the subject because he was in the trenches during his time at Twitter, and he is also an academic who can evaluate the intrinsic strength of the ideas, abstracted away from the technologies. And I really enjoyed reading the paper in this format. This is a "big data bite" article, so it is written in a relaxed format, and it manages to teach a lot in 6 pages.

However, I was worried when I read the first two paragraphs, as they gave some bad signals. The first paragraph referred to the one-tool solution as a "hammer", which connotes crude and rough. The next paragraph said: "My high level message is simple: there is no free lunch." That is a very safe position; it may even be vacuous. And I was concerned that Jimmy Lin was refraining from taking any positions. Well, it turns out this was indeed his final take all things considered, but he did take some strong positions in the article. His first rant (yes, really, he has a sidebar called "Rant"), about the Lambda architecture, has some strong words.

Friday, January 12, 2018

Paper summary. TFX: A TensorFlow-Based Production-Scale Machine Learning Platform

This paper from Google appeared at the KDD 2017 Applied Data Science track. The paper discusses Google's quality assurance extensions to their machine learning (ML) platform, called TensorFlow Extended (TFX). (Google is not very creative with names; they should take a cue from Facebook.)

TFX supports continuous training and serving pipelines and integrates best practices to achieve production-level reliability and scalability. You can argue that the paper does not have a deep research component or a novel insight/idea. But you can argue the same thing for The Checklist Manifesto by Atul Gawande, which nevertheless does not detract from its effectiveness, usefulness, and impact.

On the other hand, the paper could definitely have been written much more succinctly. In fact, I found this blog post by Martin Zinkevich, the last author of the paper, much easier to follow than the paper. (Are we pushed to make papers artificially obfuscated to be publication-worthy?) This blog post on serving skew, a major topic discussed in the TFX paper, was both very succinct and accessible.

While we are on the topic of related work, the NIPS 2016 paper, "What is your ML score? A rubric for ML production systems", from a subset of the authors of the TFX paper, is also related.   A big motivation for this paper is another previous Google paper, titled: "Hidden technical debt in machine learning systems".

The paper focuses its presentation on the following components of the TFX.

Data analysis, transformation, and validation

Data analysis: This component gathers statistics over feature values: for continuous features, the statistics include quantiles, equi-width histograms, the mean and standard deviation. For discrete features they include the top-K values by frequency.
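
As a rough illustration of the kind of statistics meant here (my own toy code, not TFX):

# Toy sketch of per-feature statistics: quantiles, equi-width histogram,
# mean/std for continuous features; top-K values by frequency for discrete ones.
import numpy as np
from collections import Counter

continuous = np.array([0.3, 1.2, 0.7, 5.4, 2.2, 0.9])
discrete = ["cat", "dog", "cat", "bird", "cat", "dog"]

stats = {
    "quantiles": np.percentile(continuous, [25, 50, 75]).tolist(),
    "histogram": np.histogram(continuous, bins=4),     # equi-width histogram
    "mean": float(continuous.mean()),
    "std": float(continuous.std()),
    "top_k": Counter(discrete).most_common(2),         # top-K values by frequency
}
print(stats)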

Data Transformation: This component implements a suite of data transformations to allow "feature wrangling" for model training and serving. The paper says: "Representing features in ID space often saves memory and computation time as well. Since there can be a large number (∼1–100B) of unique values per sparse feature, it is a common practice to assign unique IDs only to the most “relevant” values. The less relevant values are either dropped (i.e., no IDs assigned) or are assigned IDs from a fixed set of IDs."
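
A toy sketch of that ID-assignment scheme (again my own illustration, not TFX code): assign dense IDs to the K most frequent values of a sparse feature, and map everything else into a small fixed set of out-of-vocabulary IDs:

# Assign IDs only to the most "relevant" (frequent) values of a sparse feature;
# less relevant values share a small set of out-of-vocabulary (OOV) ids.
from collections import Counter

K = 3
NUM_OOV_IDS = 2

values = ["nyc", "sf", "nyc", "austin", "nyc", "sf", "boston", "sf", "austin", "la"]
vocab = [v for v, _ in Counter(values).most_common(K)]
vocab_ids = {v: i for i, v in enumerate(vocab)}          # ids 0..K-1

def to_id(value):
    if value in vocab_ids:
        return vocab_ids[value]
    return K + hash(value) % NUM_OOV_IDS                 # OOV values share ids K..K+1

print([(v, to_id(v)) for v in ["nyc", "boston", "la"]])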

Data validation: To perform validation, the component relies on a schema that provides a versioned, succinct description of the expected properties of the data.

The other day, I wrote about modeling use cases, which included data modeling. That kind of TLA+/PlusCal modeling may have applications here to design and enforce a rich/sophisticated schema, with high-level specifications of some of the main operations on the data.

Model training

This section talks about warm-starting, which is inspired by transfer learning. The idea is to first train a base network on some base dataset, then use the ‘general’ parameters from the base network to initialize the target network, and finally train the target network on the target dataset. This cuts down the training time significantly. When applying this to continuous training, TFX helps you identify a few general features of the network being trained (e.g., embeddings of sparse features). When training a new version of the network, TFX initializes (or warm-starts) the parameters corresponding to these features from the previously trained version of the network and fine tune them with the rest of the network.
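
Schematically, warm-starting amounts to something like the following sketch (plain numpy dictionaries standing in for model parameters; this is an illustration of the idea, not TFX's mechanism):

# Warm-starting sketch: initialize the "general" parameters of the new model
# version from the previously trained version, and the rest from scratch,
# then fine-tune everything on the new data.
import numpy as np

def init_params(shapes, rng):
    return {name: rng.normal(size=shape) for name, shape in shapes.items()}

shapes = {"sparse_embedding": (1000, 16), "hidden": (16, 32), "output": (32, 1)}

previous_version = init_params(shapes, np.random.default_rng(0))   # stands in for the trained model
general_features = {"sparse_embedding"}                            # parameters worth carrying over

new_version = init_params(shapes, np.random.default_rng(1))
for name in general_features:
    new_version[name] = previous_version[name].copy()              # warm-start these parameters
# ...then fine-tune new_version on the new data as usual.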

I first wondered whether it would be beneficial to check when warm-starting would be applicable/beneficial. But then I realized: why bother? ML is empirical and practical; try it and see if warm-starting helps, and if not, don't use it. On the other hand, if the design space becomes very large, this kind of applicability check can help save time and guide the development process.

This section also talks about FeatureColumns which help users focus on which features to use in their machine learning model. These provide a declarative way of defining the input layer of a model.
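
For example, declaring an input layer with feature columns looks roughly like this (TensorFlow 1.x-era feature_column API; treat the exact calls as approximate):

# Feature columns declare *which* features the model uses and how they are
# represented; an Estimator builds the input layer from them.
import tensorflow as tf

age = tf.feature_column.numeric_column("age")
city = tf.feature_column.categorical_column_with_vocabulary_list(
    "city", ["nyc", "sf", "austin"])
city_embedding = tf.feature_column.embedding_column(city, dimension=8)

model = tf.estimator.DNNClassifier(
    feature_columns=[age, city_embedding],
    hidden_units=[32, 16])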

Model evaluation and validation

A good model meets a desired prediction quality, and is safe to serve.

It turns out the "safe to serve" part is not trivial at all: "The model should not crash or cause errors in the serving system when being loaded, or when sent bad or unexpected inputs, and the model shouldn’t use too many resources (such as CPU or RAM). One specific problem we have encountered is when the model is trained using a newer version of a machine learning library than is used at serving time, resulting in a model representation that cannot be used by the serving system."

Model serving

This component aims to scale serving to varied traffic patterns. They identified interference between the request-processing and model-load-processing flows of the system, which caused latency peaks during the interval when the system was loading a new model or a new version of an existing model. To solve this, they provide a separate dedicated threadpool for model-loading operations, which reduces the peak latencies by an order of magnitude.
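
The isolation idea can be illustrated with a toy sketch (not TFX code): give model loading its own small thread pool so it cannot steal threads from, and add latency to, the request-serving path:

# Dedicated thread pools: loading a new model version runs on its own pool,
# so request-serving threads are never consumed by model loading.
from concurrent.futures import ThreadPoolExecutor
import time

serving_pool = ThreadPoolExecutor(max_workers=8, thread_name_prefix="serve")
loading_pool = ThreadPoolExecutor(max_workers=1, thread_name_prefix="load")

def handle_request(request_id):
    time.sleep(0.001)                      # pretend to run inference
    return f"response to {request_id}"

def load_model_version(version):
    time.sleep(0.5)                        # pretend to deserialize a large model
    return f"model v{version} loaded"

load_future = loading_pool.submit(load_model_version, 2)
responses = [serving_pool.submit(handle_request, i) for i in range(100)]

print([f.result() for f in responses][:3])
print(load_future.result())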

This section first says it is important to use a common data format for standardization, but then backtracks on that: "Non neural network (e.g., linear) models are often more data intensive than CPU intensive. For such models, data input, output, and preprocessing tend to be the bottleneck. Using a generic protocol buffer parser proved to be inefficient. To resolve this, a specialized protocol buffer parser was built based on profiles of various real data distributions in multiple parsing configurations. Lazy parsing was employed, including skipping complete parts of the input protocol buffer that the configuration specified as unnecessary. The application of the specialized protocol buffer parser resulted in a speedup of 2-5 times on benchmarked datasets."

In NIPS 2017, Google had a more detailed paper on the TensorFlow serving layer.

Case Study: Google Play

One of the first deployments of TFX is the recommender system for the Google Play mobile app store, whose goal is to recommend relevant Android apps to Play users. Talk about scale: Google Play has over one billion active users and over one million apps.

This part was very interesting and is a testament to the usefulness of TFX:
"The data validation and analysis component helped in discovering a harmful training-serving feature skew. By comparing the statistics of serving logs and training data on the same day, Google Play discovered a few features that were always missing from the logs, but always present in training. The results of an online A/B experiment showed that removing this skew improved the app install rate on the main landing page of the app store by 2%."

MAD questions

1) The paper provides best practices for validating the sanity of ML pipelines, in order to avoid the Garbage In Garbage Out (GIGO) syndrome. How many of these best practices are likely to change over the years? I can already see a paper coming in the next couple of years, titled: "One size does not fit all for machine learning".

In fact, this thought sent me down a rabbit hole, where I read about Apache Beam, Google Dataflow, and then the Lambda versus Kappa architecture. Very interesting work, which I will summarize soon.

2) Why do research papers not have a MAD questions section?
(I am not picking on this paper.) I guess research papers have to claim authority and provide a sense that everything is under control. Pointing out unclosed loops and open-ended questions may give a bad impression of the paper. The future work sections often come as one paragraph at the end of the paper, and play it safe. I don't think it should be that way, though. More relaxed venues, such as HotX conferences and workshops, can provide a home for papers that raise questions.

Wednesday, January 10, 2018

Why you should use modeling [with TLA+/PlusCal]

I recently gave a two day seminar on "debugging your designs with TLA+/PlusCal" at Dell. So I wanted to write some of the motivation for modeling and debugging your models while this is still fresh in my mind.

You need modeling


No, not that kind of modeling! Actually the naming clash is not accidental after all: fashion designers need models to test/showcase their designs.

You need modeling because:

  • Failing to plan is planning to fail 
  • Everything is a distributed system
  • The corner cases ... they are so many
  • Do it for the development process
  • Being smart does not scale

Failing to plan is planning to fail

This is from the paper, "Use of formal methods at Amazon Web Services, 2014".
"Before launching any complex service, we need to reach extremely high confidence that the core of the system is correct. We have found that the standard verification techniques in industry (deep design reviews, code reviews, static code analysis, stress testing, fault-injection testing, etc.) are necessary but not sufficient.
Human fallibility means that some of the more subtle, dangerous bugs turn out to be errors in design; the code faithfully implements the intended design, but the design fails to correctly handle a particular ‘rare’ scenario. We have found that testing the code is inadequate as a method to find subtle errors in design."

Modeling shows you how sloppy your "design" is. You think you got the design right, but for a complex service worth its salt you almost always get it wrong (more on this below). You won't find what you got wrong unless you model your design and validate it. And you want to find that out early on, without the sunk cost of faithfully implementing your flawed design. Otherwise, even after Jepsen shows you that you screwed up, you are already too invested in this flawed design, so you try to patch it, and you end up with a slow and bloated system.


Everything is a distributed system

There's just no getting around it: You're building a distributed system.

In this process, you are very likely to make an assumption that will bite you back, such as one hop is faster than two hops, zero hops is faster than one hop, and the network is reliable. What you assumed was an atomic block of execution will be violated because another process will execute concurrently and change the system state in a way you didn't anticipate. And don't even get me started on faults, they are in a league of their own, they will collude with your program actions to screw you up.

The corner cases, they are so many

In the 2004, "Consensus on Transaction Commit" paper, Lamport and Gray mentioned that they could not find a correct three-phase commit protocol in database textbooks/papers because each one fails to account for a corner case.
"Three-Phase Commit protocols ... have been proposed, and a few have been implemented [3, 4, 19]. They have usually attempted to “fix” the Two-Phase Commit protocol by choosing another TM if the first TM fails. However, we know of none that provides a complete algorithm proven to satisfy a clearly stated correctness condition. For example, the discussion of non-blocking commit in the classic text of Bernstein, Hadzilacos, and Goodman [3] fails to explain what a process should do if it receives messages from two different processes, both claiming to be the current TM. Guaranteeing that this situation cannot arise is a problem that is as difficult as implementing a transaction commit protocol."


Do it for the development process

Modeling is good for achieving clarity of thinking and communication. Lamport used TLA+ without a model checker from 1990s to 2010. Even without the model checker, he still found value in modeling. It made him nail down the specifications and communicate them with others precisely. When you write things down precisely, it enables your brain to move on and do more with it. Clarity begets more clarity. Focus begets more focus.

Once you abstract away the clutter, come up with a precise model in Pluscal, and validate it with exhaustive model-checking, you can focus on the essence of the problem, and see alternative ways to implement it. And through this development process where you refine/implement the design, the PlusCal model will help a lot for communicating the design with other engineers, and check which implementations would work for each subsystem.



Being smart does not scale; exhaustive model checking comes to the rescue

After you get the design down in pseudocode (without TLA+ or PlusCal), couldn't invariant-based design work? Can't you just check each action in your pseudocode to see if it preserves your safety/invariant conditions, and be done with it? There is no need to model with TLA+/PlusCal and model-check, right?

Sigh. Did you read the above carefully? Everything is a distributed system, and there are many corner cases. Sloppy pseudocode is not going to cut it. And don't trust your deductive abilities for proving that each action preserves the safety conditions you identify. That works for simple toy examples, but for complicated examples you need to do a lot of extra mental inferencing, linking of concepts, and creative leaps, which is very error-prone.

Consider the hygienic philosophers example I discussed earlier. Your invariant will talk about being in the critical section, but the actions talk about ... forks ... on edges ... over a dynamic priority graph. Doing that mental mapping is very hard. Instead, the TLA+/PlusCal model checker has you covered: it exhaustively checks, via breadth-first traversal, all possible permutations of action scheduling, and shows you if there is any possible execution (including the fault actions you model) that can violate your invariants.

This is why I so happily adopted TLA+/PlusCal for my distributed systems class.  Even for sophisticated algorithms, I can refer the students to the TLA+/PlusCal model to practice and play with the algorithm, so they can internalize what is going on.

Conclusion

This already got long. So in a later post, I will write more about the modeling/abstracting process, the mathematical/invariant-based thinking, and about some shortcomings of modeling.

Drop me a message if you are interested in having me over for a training/talk!

MAD questions

This section is here because of my New Year's commitment.

1) In addition to protocol/system modeling, workflow modeling, business logic modeling, TLA+/PlusCal has also been used for data modeling. Are there any other uses? If you have interesting use cases, please let me know as I am curious. 

2) Actually, I am aware of another use case for PlusCal model-checking, but it seems to be mostly for toy examples so far. You define actors/operators that can act in any order, and you challenge the model checker by claiming that concurrent operation of those actors/operators can never satisfy a condition that you would like to happen. And the model checker, being the jerk it is, responds with a trace showing that it is possible, and you adopt that trace as the solution. The die hard puzzle is an example of this. This approach could be useful for scheduling, maybe even cluster job scheduling under constraints, anticipating some failures and still hitting the deadline. But I am not aware of any real-world use of this. Is this used in practice?

3) Is there a bad time for modeling? When should you not model?
Sometimes you may need to go bottom-up to figure out the domain/problems first. After you have an idea of the domain, then you can start to model and go top-down. I think it would not make sense to be opinionated and make modeling calls before you are informed about the domain and its issues. I think modeling is just thinking made more rigorous, and you should get the ball rolling on the thinking/understanding part a bit before attempting to model.

Monday, January 8, 2018

Salute to Prof. Mohamed Gouda: Elegance in computing

A couple months ago, I attended a special half-day workshop organized honoring Prof. Mohamed Gouda's contributions to computer science, and particularly the self-stabilizing systems community.

Mohamed is the Mike A. Myers Centennial Professor at the University of Texas at Austin. He has been at Austin since 1980, for almost 40 years. His research contributions to distributed systems have been phenomenal (borrowing a word Mohamed likes to use for things that excite him). I am proud that Mohamed is my academic grandfather; he was the PhD advisor of my PhD advisor, Prof. Anish Arora. I wrote about "how to find your advisor" in my previous post; I hope the elegance/rigor of Mohamed and Anish rubbed off on me a bit.

At the workshop, there were about 10 talks, technical in nature, but at the end of their talks, the speakers each mentioned how their research and career had been enriched by Mohamed's contributions and help.

I talked about my technical report on "Does the cloud need stabilizing?", and at the end I took some time to mention what I learned from Mohamed, and how I benefited being around him, albeit for a limited time.

The most significant things I learned from Mohamed is not from what he taught on the board or at the conferences, but from how he acted. Actions speak louder than words.

Always be passionate about research

When I was a PhD student, I had time to spend with Mohamed at self-stabilizing systems workshops and later as part of our work on the DARPA NEST wireless sensor networks project.

What impressed me most was Mohamed's passion for research. At that time he had been involved with research for 30+ years already, but he was as fresh and enthusiastic about research problems as a PhD student. When someone mentioned a research problem, you could see his ears prick up and his eyes literally sparkle. And this was often after a long, tiring day of research talks/meetings.

There have been many days in my academic career when I was feeling tired and down, and I motivated myself by remembering Mohamed's passion for research, and asking myself what excuse I have to be bored/unexcited/unmotivated. (I would be remiss if I didn't add my advisor Anish Arora and postdoc supervisor Nancy Lynch to this mention. I try to imitate their genuine excitement about research.)

Never turn down collaboration opportunities

Mohamed has always been very positive and full of energy and humor. He has been graceful and gentle with everyone.

He jokes that he never says no to any collaboration opportunity, ever since he regretted saying no to collaborating on the then newly starting field of "ad hoc networks" (I am most likely misremembering the name/anecdote, but was that with Charles Perkins?), on the grounds that "ad hoc networks" did not sound like a respectable name for a research area :-)

He helped build the self-stabilizing systems community, as many speakers credited him at the workshop. If you help people, you help yourself as well.

Clarify, Simplify, and Generalize

On the technical side of things, I learned from Mohamed the art of abstraction. He really has a knack for simplifying things to their essentials, contributing there, and generalizing from there. Mohamed is famous for saying that he does not trust any distributed algorithm that has more than 3 actions. You can see this elegance/simplicity at work in his classical papers. He was colleagues with Dijkstra, a grandmaster of the art of abstraction, for many years.

In a research talk, you could find Mohamed listening carefully, asking a couple of trivial-seeming definition questions, and towards the end of the talk making a very insightful comment/observation about the work. After knowing how Mohamed works, I can see that he is simplifying and mapping the work onto his mental framework, then generalizing and contributing from there.

Mohamed's book on "elements of network protocol design" taught me about networks, after the networking class I took made me hate networking. (I wrote about it here earlier.) The book also shows how to abstract away details, simplify things to their essence, and focus on that essence.

MAD questions

(This section is here because of my new year's commitment.)

1) Mohamed has always been an exceptionally good storyteller as well. Several successful researchers I know are great storytellers. I wonder if this is an important feature for successful researchers. Could this be correlated with the researchers' ability to frame/present their work better?

2) Shouldn't we create more opportunities for our graduate students to spend quality time (or hanging around time -- or maybe they are the same) with such energizing and inspiring researchers? Conferences these days look far away from fulfilling that goal, maybe very small workshops or summer schools are better places for that. What are other ways to cultivate that interaction? Or to make this scale, how can we capture the thought-processes of these people better? The research papers are finished products and fail miserably at capturing the thought processes of the researchers.

3) I called it the "art of abstraction" because I don't think there are well-defined rules for abstraction. What is the right level of abstraction to treat things at? Should you abstract things to the point where most people accuse you of going too far?

In a quote attributed to Einstein, he says: "Everything should be made as simple as possible, but not simpler." Could this pass as a helpful heuristic? What is the limit of possible?

Saturday, January 6, 2018

How to find your advisor

I had tweeted this earlier about "Rocking your PhD":



It is that simple. This is actually a concise version of longer advice I provided earlier.

Since I haven't talked about it before, I would like to now write some suggestions on finding an advisor.

How to find your advisor

Ask around and get advice from senior PhD students in the department about faculty as potential advisors.

In the first semester of your graduate studies, take 3-4 classes you are interested in. This provides a good opportunity to meet and impress your prospective advisor. If there is a class project, go overboard and exceed expectations. Try to improve on an algorithm mentioned in the class, and discuss this with the prospective advisor.

Before you commit to an advisor, make sure it is a good match. As you take a class from your prospective advisor, see if you can work together during the semester on a small entry-level problem/project. After you commit to working with an advisor, it is messy/costly to switch advisors, since there is a sunk cost of investment from both parties.

You want to find a hardworking advisor, so you can work hard as well. Assistant professors have a lot of incentive to work hard to get tenure. On the other hand, many associate and full professors also work hard, and may provide additional benefits: more experience and an established research group where you can get mentoring from senior students. In any case, you should check the track record of the faculty, with emphasis on recent publications.

Try to find an advisor from whom you can learn a good toolkit, e.g., formal methods, graph theory, probability theory, or machine learning theory. Application domains come and go following trend cycles; toolkits, on the other hand, are fundamental and can be applied to application domains as needed. To give a personal example, after learning formal methods and reasoning about distributed algorithms as my toolkit, I was able to apply them to the wireless sensor networks domain and later to the cloud computing and distributed databases domains, and hopefully soon to the distributed machine learning domain.

To balance the previous advice: make sure the advisor is also doing work on the practical side of things. Faculty jobs are very scarce, and after your PhD you should have the option of joining industrial labs/positions.

MAD questions

Is a good advisor enough?
I think it is necessary but not sufficient.

Can you succeed with a bad advisor?
First, define "succeed". If it means a good faculty job or a prominent industrial research position, I don't know of examples of this. It may be possible, if there is good support from other graduate students/faculty.

Are there bad advisors?
There are definitely advisors that don't care. Or that are too busy and don't help enough.

Is PhD largely an apprenticeship?
Yes, I think so. You learn, by diffusion, your advisor's taste in finding problems and devising solutions. In CS there are distinct categories of theory, systems, metrics, engineering, and algorithms people. I think the advisor imprints on the student not just the research category but also a perspective on things, such as openness to new ideas, having a home conference versus being more promiscuous, etc.

Is there a personality match thing?
Even though this is a professional relationship, personality clashes between the advisor and student may impede progress, or sometimes even lead to a blow up. A student that is very stubborn/uncoachable is not good, but the student shouldn't just follow or wait for instructions either. I like the student to push back and defend what he/she thinks is true, but also to be open-minded/receptive to alternative opinions/perspectives.

I knew of a faculty member who asked students to take a Myers-Briggs test (the one where we all get the INTJ category). He was a very smart professor, so he probably found utility in that. There is also the big-five test. Personality styles may be useful to gauge how a student will fare with different advising styles: the socratic method, the sink-or-swim method, the trainer method, the coaching method, and the manager method. As for me, I don't care to know the personality types of my PhD students; I just want to see if we can work well together, discuss well, be productive, and grow together.

Related links

I enjoy yakking about academic advice.

Thursday, January 4, 2018

Logical clocks and Vector clocks modeling in TLA+/PlusCal

In a distributed system, there is no shared state, counter, or any other kind of global clock.  So we can not implicitly associate an event with its time, because one node may have a different clock than another. Time synchronization is not easy to achieve, and failures complicate things.

It turns out we care about time because of its utility in ordering of the events. Using this observation, in 1978, Leslie Lamport offered a time-free definition for "happens before": Event A happens before event B (denoted as A hb B) if and only if A can causally affect B.

In the context of distributed systems, A hb B iff
1. A and B are on the same node and A is earlier in computation than B
2. A is the send of a message and B is the receive event for that message
3. There is some third event C such that A hb C and C hb B.

This also suggests the definition of the "concurrent" relation. Events A and B are concurrent iff $\neg( A ~hb~ B) \land \neg( B ~hb~ A)$.

To capture the hb relation, logical clocks and vector clocks are proposed. I modeled these in PlusCal, and I wanted to share that in this post.

Logical clocks

A logical clock is a way of assigning a number to an event. Assume each node $j$ has a logical clock, $lc.j$, that assigns a number to any event on that node. $lc.j$ can simply be a counter.

We want the following property from $lc$: $A ~hb~ B \Rightarrow lc.A < lc.B$
Note that the converse might not hold.

The algorithm is simple. For a send event at $j$: The clock is incremented and this updated value is the timestamp of the new event and of the message being sent.

For a receive event at $j$: The clock is updated by taking the maximum of the current clock at $j$ and the timestamp of the message (time of the corresponding send event). This maximum must be incremented to ensure that the new clock value is strictly larger than both previous events.
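
As a small Python illustration of these two rules (my own sketch, separate from the PlusCal model below):

# Logical clock update rules: increment on a local/send event;
# take max with the message timestamp and then increment on a receive event.
class LamportClock:
    def __init__(self):
        self.lc = 0

    def local_or_send_event(self):
        # The incremented value timestamps the new event and any message sent.
        self.lc += 1
        return self.lc

    def receive_event(self, msg_timestamp):
        # Strictly larger than both the previous local event and the send event.
        self.lc = max(self.lc, msg_timestamp) + 1
        return self.lc

j, k = LamportClock(), LamportClock()
t = j.local_or_send_event()       # j sends a message timestamped t=1
print(k.receive_event(t))         # k receives it: max(0, 1) + 1 = 2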

Here is the PlusCal modeling of the algorithm.


I use STOP to limit the scope of model checking; often the interesting behavior presents itself within a limited time, and we don't need to go to logical clock values in the 100s. Another way to do this would be to go to the model checker's advanced options tab, use depth-first execution, and limit the depth.

I don't have interesting invariants to check for logical clocks, because I don't know of a good way to create and maintain events with timestamps while also capturing the events' causality relationships and cross-checking those against the event timestamps. Maybe if I had created a time graph and run the LC model over that time graph, it might work.

Instead, I use a bait invariant to get a trace that violates the invariant, so I can observe that the model indeed operates as I intended it to. BaitInv == (\A k \in Procs: lc[k] < STOP /\ msg[k]<STOP) \* violated!

Vector clocks

With logical clocks, we have $A ~hb~ B \Rightarrow lc.A < lc.B$, and we can not determine whether $A ~hb~ B$ simply by examining their timestamps. For example, for $lc.A_j=3$ and $lc.B_k=9$, what can we say about the relationship between these two events?  We know that $\neg (B ~hb~ A)$ (by the contrapositive of the property above). But do we know that $A ~hb~ B$?

The answer is no. A and B may be events on different processes, and B's timestamp might have reached 9 without any message chain (causality link) from A. Or not: the process on which B occurs might indeed have received a message from A or from a subsequent event, and that might have driven up the lc at that process. The problem with logical clocks is that these chains are not captured, because there is only one scalar to keep the counter/clock.

In order to keep tabs on which values were last learned from which processes, we need to extend the scalar counter to a vector of size N, the number of processes.

The algorithm is then similar to the logical clocks algorithm, except that it maintains a vector of "logical clocks".

Send event at $j$: The clock entry of $j$ in the vector clock at $j$ is incremented and the resulting vector is assigned as the timestamp of the event and message being sent.

Receive event at $j$: The vector clock is updated by taking the pair-wise maximum of the current vector and the timestamp of the received message. Then clock entry for $j$ in the vector clock must be incremented to ensure that the new clock value is strictly larger than both previous events.
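
Again, as a small Python illustration of the two rules (my own sketch, separate from the PlusCal model below):

# Vector clock update rules for N processes, with ids 0..N-1.
N = 3

class VectorClock:
    def __init__(self, pid):
        self.pid = pid
        self.vc = [0] * N

    def send_event(self):
        # Increment own entry; the resulting vector timestamps the message.
        self.vc[self.pid] += 1
        return list(self.vc)

    def receive_event(self, msg_vc):
        # Pairwise max with the message timestamp, then increment own entry.
        self.vc = [max(a, b) for a, b in zip(self.vc, msg_vc)]
        self.vc[self.pid] += 1
        return list(self.vc)

p0, p1 = VectorClock(0), VectorClock(1)
m = p0.send_event()               # p0: [1, 0, 0]
print(p1.receive_event(m))        # p1: pairwise max then increment -> [1, 1, 0]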

Here is the PlusCal model of vector clocks algorithm.


The PairMax function is neat for defining the pairwise maximum of two vectors of size N. Otherwise, you will notice the model is similar to that of LC.

Again, I am unable to check with respect to events. But here I use the invariant below to check that the VC structural property holds. It says that, at any point in the execution, process $k$'s knowledge of its own clock, vc[k][k], is greater than or equal to process $l$'s knowledge of $k$'s clock, vc[l][k]. This is because $l$ can only learn of $k$'s clock from a communication chain originating at $k$, and cannot advance $k$'s clock itself.
VCOK == (\A k,l \in Procs: vc[k][k] >= vc[l][k])

MAD questions

(This section is here due to my new year's resolution.)

Of course, there are also matrix clocks. While vector clocks maintain what a process knows of the clocks of other processes, matrix clocks also maintain what a process knows of what other processes know of other processes' clocks. "A matrix clock maintains a vector of the vector clocks for each communicating host. Every time a message is exchanged, the sending host sends not only what it knows about the global state of time, but also the state of time that it received from other hosts. This allows establishing a lower bound on what other hosts know, and is useful in applications such as checkpointing and garbage collection." (Frankly, I am not aware of matrix clocks being used in practice.)

There are also version vectors, an application of the vector clocks idea to distributed replica maintenance: instead of N being the number of clients/processes, N becomes the number of replicas, which is a smaller number, 3-5. Version vectors enable causality tracking among data replicas and are a basic mechanism for optimistic replication.

I wonder if we could have version matrices that extend version vectors, analogous to how matrix clocks extend vector clocks. Would that have any utility for distributed replicas?

Related links

Earlier I had provided a PlusCal model of hybrid logical clocks that augment logical clocks with physical clocks.

Here is some previous discussion/context about why I started assigning TLA+/PlusCal modeling projects in distributed systems classes.

There is a vibrant Google Groups forum for TLA+ : https://groups.google.com/forum/#!forum/tlaplus
By clicking on the "tla" label at the end of this post, you can reach all my posts about TLA+.

Tuesday, January 2, 2018

Mad questions

I am a very curious person by nature. Every child starts asking lots of questions around age 3-4, but according to my mom, I took that to another level, constantly asking "but why?" and driving her crazy. On the other hand, I believe I owe my curiosity to my mom. She was an elementary school teacher (a damn good one), and was instrumental in my development. She was (and still is) a very curious person, and she taught me how to ask more and better questions. For example, while traveling, she would notice different plants and ask me why the landscape was different there, and we would make guesses.

The Turkish education system was not big on asking questions (these days it is waaaay waaaaay worse). Since the lazy path is to memorize and regurgitate answers, that is what it demanded from the students. But I think my questioning skills mostly survived. Among my friends, I was famous for replying to questions with questions of my own, and if not, my answer was often "I don't know", because the question led me to ask more questions internally and I really didn't know what I thought about it yet.

For my PhD studies, I got a boost from my advisor Prof. Anish Arora, because as any great researcher he knew how to ask good questions and I learned from his example. Questions are the fuel needed for doing research. You can argue that the hard part of research is to figure out the right questions to ask!

Questions are a good way to test your sanity and mindfulness as well. Questioning helps you think meta and jump out of the system. Therefore I try to incorporate questioning in my studying as I wrote earlier: "For studying, writing, or researching, my process is also to take on the task in units of 30 minutes. I use the pomodoro technique and customized it over time a lot. I am right now on my 3rd version of pomodoro process. I will write about my recent pomodoro setup later on. It involves detaching and going meta for 4 minutes in the beginning and end of pomodoro. In these slots, I step back and force myself to think meta: Is this the right approach? What else I can try? Am I making good progress? Can this be made transformative?"

Unfortunately, I feel like I got into a routine with that "trying to ask questions", and I think the questions often come out shallow/predictable. Those are still useful---at least I am double-checking my sanity/approach---but not really transformative. But yeah, I cannot be sure whether I have gone compliant/docile recently, can I? Maybe I am losing my edge because I have been in the system long enough. Maybe I am starting to see myself as experienced now, and it is affecting me.

So my New Year's resolution is to ask more/better/crazier questions.

To ensure that, I will try to install a system. One part of that system is to ask at least a couple questions in each blog post. Another part is to add a new tag to my blog called "mad questions".

Mad questions go beyond normal/expected questions; they are out-of-left-field questions. Asking mad questions will require a lot of effort and will drive me out of my comfort zone. I will try to give this my best shot, and I think the blogging format is a good medium to try this in.

The questions for this post

1. Where does the question mark come from? It turns out there is no definite answer. I like its shape, though. It is like a hook.

2. Wikipedia says that some experts say animals can't ask questions:
"Enculturated apes Kanzi, Washoe, Sarah and a few others who underwent extensive language training programs (with the use of gestures and other visual forms of communications) successfully learned to answer quite complex questions and requests (including question words "who" what", "where"), although so far they failed to learn how to ask questions themselves. For example, David and Anne Premack wrote: "Though she [Sarah] understood the question, she did not herself ask any questions — unlike the child who asks interminable questions, such as What that? Who making noise? When Daddy come home? Me go Granny's house? Where puppy? Sarah never delayed the departure of her trainer after her lessons by asking where the trainer was going, when she was returning, or anything else".[10] The ability to ask questions is often assessed in relation to comprehension of syntactic structures. It is widely accepted, that the first questions are asked by humans during their early infancy, at the pre-syntactic, one word stage of language development, with the use of question intonation.[11]"
That is interesting, but also not very believable to me. It is known that dogs, monkeys, even cats can be surprised by illusion tricks. So, why is being surprised not acceptable as a question?
Maybe we don't understand the apes' questioning. If an ape phrases a question as surprise/frustration, or by stating its desire to confirm/learn something, shouldn't that maybe count as a question? What is the criterion used here? Taking this further, could hypothetical aliens say humans are unable to ask "questions" and therefore not intelligent?

3. What makes a question a "good" question? Can you stumble upon an insightful question by chance? What are some heuristics that help with asking good questions? Does a good question involve a perspective-change? (In science, it often seems to be so. When it is very hard to directly confront the issue, the skilled researcher looks at the problem sideways, formulates the question with a new perspective, and then finding a solution becomes feasible.)

4. When I ask a lot of questions in a blog post, how does that make you feel? Charged? Tired? Irritated?

Thursday, December 28, 2017

Paper summary. Real-Time Machine Learning: The Missing Pieces

This paper, dated March 11, 2017 on arXiv, is from UC Berkeley. Here is Prof. Michael Jordan's Strata Hadoop conference talk on this.

The paper first motivates the need for real-time machine learning. For this it mentions in-situ reinforcement learning (RL) that closes the loop by taking actions that affect the sensed environment. (The second paragraph mentions that such RL can be trained more feasibly by using simulated/virtual environments: first trying multiple actions [potentially in parallel] to see their effect in simulation before interacting with the real world. Again, this requires real-time performance, as the simulation should run faster than real-time interaction.)


Based on this application scenario, here are the desired requirements for the ML platform.
R1: low latency
R2: high throughput
R3: dynamic task creation (RL primitives such as Monte Carlo tree search may generate new tasks during execution)
R4: heterogeneous tasks (tasks would have widely different execution times and resource requirements)
R5: arbitrary dataflow dependencies (BSP doesn't cut it)
R6: transparent fault-tolerance
R7: debuggability and profiling

The platform architecture

The paper does not give a name to the platform, but the talk calls it Ray. Ray allows arbitrary functions to be specified as remotely executable tasks, with dataflow dependencies between them. Ray uses imperative programming and does not support symbolic computation graphs, as far as I can see. The talk mentions that programming is done in Python. So, at this point Ray is more like a set of Python libraries paired with a Redis database for keeping control state, and with Spark RDD support for maintaining the object store as shared memory.


Two principal architectural features are a centralized control plane and a hybrid scheduler. The centralized control state is held by Redis, a replicated key-value store. It looks like the control state does not have any control logic in it; it is just passive storage. (By that measure, TensorFlow's variables also qualify as control state.) The hybrid scheduler idea aims to provide low latency. Workers submit tasks to their local schedulers, which decide either to assign the tasks to other workers on the same physical node or to "spill over" the tasks to a global scheduler. Global schedulers can then assign tasks to local schedulers based on global information about resource availability. Neither the logically centralized control state nor the two-level hierarchical scheduling is a new, innovative concept.
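
To make the two-level scheduling idea concrete, here is a minimal Python sketch of a local scheduler that either dispatches a task on its own node or spills it over to the global scheduler. All class and method names here are assumptions for illustration, not Ray's actual API.

import collections

class LocalScheduler:
    """Hypothetical sketch of the hybrid scheduling idea (not Ray's actual API):
    keep tasks on the local node when possible, spill over to the global scheduler
    when the node is saturated."""

    def __init__(self, local_workers, global_scheduler, max_backlog=8):
        self.local_workers = local_workers        # workers on this physical node
        self.global_scheduler = global_scheduler  # has a cluster-wide resource view
        self.max_backlog = max_backlog
        self.backlog = collections.deque()

    def submit(self, task):
        idle = [w for w in self.local_workers if w.is_idle()]
        if idle:
            idle[0].run(task)                     # fast path: low-latency local dispatch
        elif len(self.backlog) < self.max_backlog:
            self.backlog.append(task)             # queue briefly on this node
        else:
            self.global_scheduler.submit(task)    # spill over for global placement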

Task creation is left entirely to the application developer. Any task can create new tasks without blocking on their completion, but this creates a dependency from the caller to the callee. Moreover, Ray uses the dataflow execution model, in which tasks become available for execution if and only if their dependencies have finished executing. The combination of this unrestrained task creation with hybrid scheduling gives the developer plenty of rope to hang themselves.

Tasks are called with a split-phase asynchronous execution model. When you call a task, it immediately returns a "future", which just denotes an acknowledgement; the task later calls you back with the result when its computation is completed. The caller may potentially call "get" on the future to block until the callee finishes execution. Ray also has a "wait" primitive to time out from waiting on straggler tasks. Again, it is the developer's responsibility to figure out how to use these correctly.
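
Here is a minimal sketch roughly in the style of the Ray Python API the talk demonstrates; the exact function bodies and the units of the wait timeout are assumptions and may differ across versions.

import ray

ray.init()

@ray.remote
def rollout(policy, seed):
    # ... run one simulated rollout; placeholder body for illustration ...
    return seed

# Calling .remote() returns a future immediately; the work runs asynchronously.
futures = [rollout.remote("policy-v1", s) for s in range(8)]

# Take whichever 6 rollouts finish first and give up on stragglers after a timeout,
# instead of blocking on the slowest task.
ready, not_ready = ray.wait(futures, num_returns=6, timeout=100)

results = ray.get(ready)  # "get" blocks until the selected futures are resolved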

Conclusions 

I think the platform is weak on "ease of use". Ray is so minimal that it is unclear whether we couldn't have gotten the same level of support from a systems programming language with concurrency primitives and thread safety, such as Rust. Rust's ownership model and message-passing concurrency make it very suitable for building dataflow execution applications, as has been demonstrated by the recent rewrite of Naiad (timely dataflow) in Rust.

While Ray aims at real-time machine learning, it doesn't have a way to shed load. To provide load-shedding support, it is possible to adopt the SEDA architecture, so that the system does not grind to a halt when it is presented with too many tasks at once.
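
As a rough illustration of what a SEDA-style stage buys you (a sketch under my own assumptions, not anything from the Ray paper), a bounded event queue lets the system reject excess work explicitly instead of degrading for everyone:

import queue

class Stage:
    """Minimal sketch of a SEDA-style stage: a bounded event queue plus a handler.
    When the queue is full, new tasks are shed instead of piling up unboundedly."""

    def __init__(self, handler, capacity=1000):
        self.handler = handler
        self.inbox = queue.Queue(maxsize=capacity)

    def submit(self, task):
        try:
            self.inbox.put_nowait(task)
            return True
        except queue.Full:
            return False      # shed load: the caller can retry later, degrade, or drop

    def run_once(self):
        self.handler(self.inbox.get())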

Tuesday, December 26, 2017

TensorFlow-Serving: Flexible, High-Performance ML Serving

This paper by Google appeared at NIPS 2017. The paper presents a system/framework to serve machine learning (ML) models.

The paper gives a nice motivation for why there is a need to productize model serving with a reusable, flexible, and extensible framework. ML serving infrastructure used to consist mostly of ad hoc, non-reusable solutions, e.g., "just put the models in a BigTable, and write a simple server that loads from there and handles RPC requests to the models."
However, those solutions quickly get complicated and intractable as they add support for:
+ model versioning (for model updates with a rollback option)
+ multiple models (for experimentation via A/B testing)
+ ways to prevent latency spikes for other models or versions concurrently serving, and
+ asynchronous batch scheduling with cross-model interleaving (for using GPUs and TPUs).

This work reminded me of the Facebook Configerator. It solves the configuration management/deployment problem but for ML models.
"What is even more surprising than daily Facebook code deployment is this: Facebook's various configurations are changed even more frequently, currently thousands of times a day. And hold fast: every single engineer can make live configuration changes! ... Discipline sets you free. By being disciplined about the deployment process, by having built the configerator, Facebook lowers the risks for deployments and can give freedom to its developers to deploy frequently."
The paper is also related to the "TFX: A TensorFlow-Based Production-Scale Machine Learning Platform" paper. While that one focused on the data management and model training side of things, this one focuses more on model serving.

The model serving framework

The TensorFlow-Serving framework the paper presents can be used in any of these ways:
(1) a C++ library consisting of APIs and modules from which to construct an ML server,
(2) an assemblage of the library modules into a canonical server binary, and
(3) a hosted service.

The framework can serve TensorFlow models as well as other types of ML models. The libraries, and an instance of the binary, are provided as open-source.


The TensorFlow Serving library

The library has two parts: (1) lifecycle management modules that decide which models to load into memory, sequence the loading/unloading of specific versions, and offer reference-counted access to them; (2) modules that service RPC requests to carry out inference using loaded models, with optional cross-request batching.
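
To give a feel for what optional cross-request batching amounts to, here is a hypothetical Python sketch (illustration only, not TensorFlow-Serving's actual API): buffer individual requests briefly, then run the model once on the whole batch so the GPU/TPU stays busy.

import threading
import time

class Batcher:
    """Hypothetical sketch of cross-request batching (not TensorFlow-Serving's API)."""

    def __init__(self, model_fn, max_batch=32, max_delay=0.005):
        self.model_fn = model_fn          # callable: list of inputs -> list of outputs
        self.max_batch = max_batch
        self.max_delay = max_delay        # seconds to wait while a batch fills up
        self.lock = threading.Lock()
        self.pending = []                 # (input, event, result_slot) triples
        threading.Thread(target=self._flush_loop, daemon=True).start()

    def infer(self, x):
        event, slot = threading.Event(), {}
        with self.lock:
            self.pending.append((x, event, slot))
        event.wait()                      # woken up once the batched call completes
        return slot["y"]

    def _flush_loop(self):
        while True:
            time.sleep(self.max_delay)
            with self.lock:
                batch, self.pending = self.pending[:self.max_batch], self.pending[self.max_batch:]
            if not batch:
                continue
            outputs = self.model_fn([x for x, _, _ in batch])   # one batched model call
            for (_, event, slot), y in zip(batch, outputs):
                slot["y"] = y
                event.set()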

In the model lifecycle management, the paper mentions the "canary and rollback" use case. By default the Source aspires the latest (largest-numbered) version of each /servable/, but you can override it and go with a canary/rollback policy. Here, after the newest version is deployed, the system continues to send all prediction request traffic to the (now) second-newest version, while also teeing a sample of the traffic to the newest version to enable a comparison of their predictions. Once there is enough confidence in the newest version, the user would then switch to aspiring only that version and unload the second-newest one. If a flaw is detected in the current primary serving version (one that was not caught via the canary), the user can request a rollback to aspiring a specific older version.
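
A tiny sketch of the traffic-teeing part of this pattern (hypothetical names, not the paper's API): all requests are answered by the primary version, while a small sample is also sent to the canary so the two sets of predictions can be compared.

import random

def serve(request, primary_model, canary_model=None, tee_fraction=0.01, log_fn=print):
    """Hypothetical sketch of the canarying pattern described above (illustration only)."""
    primary_out = primary_model(request)
    if canary_model is not None and random.random() < tee_fraction:
        canary_out = canary_model(request)
        log_fn({"request": request, "primary": primary_out, "canary": canary_out})
    return primary_out    # clients always see the primary's answer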

While optimizing for fast inference is mentioned as a theme, the paper does not elaborate on it. It just says that the framework can support TPUs/GPUs by performing inter-request batching similar to that in https://arxiv.org/pdf/1705.07860.pdf.

While the paper mentions that the framework can serve even lookup tables that encode feature transformations, there is no explanation whether any special optimization is (or can be) employed for improving the performance for serving different types of models. For example, for the nonparametric and sparse models that are popularly used in recommendation systems, would the framework provide optimizations for faster lookup/inference?

The TensorFlow serving hosted service

With the hosted service, Google aims to capture the users who have money and want a no-fuss solution. To use the hosted service, the user just uploads her model to it and it gets served. It looks like the hosted service is also integrated with a datacenter resource manager and scheduler. The hosted service also offers features such as validating model quality before serving a new version, or logging inferences to catch training/serving skew bugs.

In the figure above, the synchronizer is the master that commands and monitors the workers serving the jobs. The router component is interesting; the paper mentions it uses hedged backup requests to mitigate latency spikes. This means the request is sent to multiple job servers and the earliest response is used, which is effective protection against stragglers.
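
Here is a small Python sketch of the hedged-request idea itself (the replica callables standing in for RPC stubs are my assumption, not the paper's interface):

from concurrent.futures import ThreadPoolExecutor, FIRST_COMPLETED, wait

def hedged_request(replicas, request, timeout=1.0):
    """Sketch of hedged (backup) requests: send the same request to several equivalent
    servers and use whichever response arrives first."""
    pool = ThreadPoolExecutor(max_workers=len(replicas))
    futures = [pool.submit(replica, request) for replica in replicas]
    done, _ = wait(futures, timeout=timeout, return_when=FIRST_COMPLETED)
    pool.shutdown(wait=False)                 # don't hang around for the stragglers
    if not done:
        raise TimeoutError("no replica answered in time")
    return next(iter(done)).result()          # the earliest response wins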

Sunday, December 24, 2017

WPaxos: a wide area network Paxos protocol (Part 1)

Paxos is great for solving the fault-tolerant coordination problem in a datacenter. But for cross-datacenter coordination (which is needed for distributed databases, filesystems, configuration management, etc.), it hits the WAN latency barrier. The multi-decree Paxos (Multi-Paxos) algorithm, implemented in variants like Raft and Zab, relies on electing a distinguished leader to serialize updates and hence cannot deal with write-intensive scenarios across wide area networks (WANs).

An alternative is leaderless Paxos algorithms. Generalized Paxos and EPaxos employ opportunistic leaders for non-interfering commands and are able to reduce 3 message delays to 2, and to allow concurrent leaders to commit. But the fast agreement incurs the cost of a much larger quorum, called the fast quorum (3/4ths of the nodes), and hits the WAN latency barrier as well.

Another alternative (as employed in Google Spanner) is to use multiple Paxos groups with partitioned data/object space. In order to provide flexibility/versatility, this partitioned approach employs an external service to manage configuration and move data/objects across groups.

Recently we introduced another alternative with our WPaxos work. WPaxos is a simple and pure Paxos protocol that provides flexible, performant, and fault-tolerant coordination over the WAN. WPaxos is lean and obviates the need for another service/protocol for data/object relocation. Since WPaxos provides the same Paxos safety guarantees in the face of concurrency, asynchrony, and faults, its performance can be tuned orthogonally and aggressively by adjusting a couple of parameters. In this post, jointly written with Ailidani Ailijiang and Aleksey Charapko, we give an overview of WPaxos. For details and performance evaluation, we refer you to our paper.

Flexible flexible quorums

WPaxos leverages the flexible quorums idea, which weakens the "all quorums should intersect" assertion in Paxos to "quorums from different phases should intersect". That means it is possible to use any phase-1 quorum (Q1) that intersects with any phase-2 quorum (Q2), instead of using majority quorums. A clever instantiation of this is the grid quorum system. Let Q1 be a row in the grid, and Q2 be a column. Since any row and any column always intersect at one node, any $q1 \in Q1$ is guaranteed to intersect with any $q2 \in Q2$.
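
To see the row/column intersection property concretely, here is a small Python check over a 3x4 grid (an illustration, not part of the WPaxos artifacts):

from itertools import product

rows, cols = 3, 4
Q1s = [{(r, c) for c in range(cols)} for r in range(rows)]  # phase-1 quorums: rows
Q2s = [{(r, c) for r in range(rows)} for c in range(cols)]  # phase-2 quorums: columns

# Every row meets every column in exactly one node, which is all that is needed
# once only cross-phase intersection is required (flexible quorums).
assert all(len(q1 & q2) == 1 for q1, q2 in product(Q1s, Q2s))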

Our insight is that if we deploy each column in one of the geo-distributed regions/zones, we can achieve a really fast Paxos phase-2, since the Q2 quorum is within the same LAN. Note that, when failures and leader changes are rare, phase-2 (where the leader tells the acceptors to decide values) occurs much more frequently than phase-1 (where a new leader is elected). So it makes sense to improve performance by reducing the Q2 latency at the expense of making the infrequently used Q1 slower.


Before going further with the protocol description, let's elaborate on why the grid-shaped deployment is safe to use in Paxos. With majority quorums, any 2 phase-1 quorums intersect, which means 2 nodes cannot be elected as leaders for the same ballot. However, this is not necessary! In the grid quorum, node i can be elected as leader by the first row for some ballot b, while node j is elected by the second row for a ballot b'>b. In order to make a decision, node i has to start phase-2 and satisfy a phase-2 (column) quorum. Since any Q2 always includes one node from the second row, that node rejects node i's phase-2 message, preventing conflicting decisions from being made by i and j.

Unfortunately, this grid quorum deployment cannot tolerate a single zone failure. The WPaxos default quorum derives from the grid quorum layout, and picks f+1 (majority) nodes in a zone of 2f+1 nodes to tolerate f node failures per zone. In addition, to tolerate F zone failures among Z zones, Q1 is selected from Z-F zones and Q2 from F+1 zones. For example, in the following figure of 12 nodes, while a majority quorum may tolerate 5 failures, the WPaxos default quorum can tolerate one row plus one column, a total of 6 failures.

Here is a TLA+ specification of the Q1 and Q2 quorum systems used in WPaxos. Q1 and Q2 do not need to be rigid rows and columns; the first set of responding nodes that satisfies the Q1 or Q2 definition suffices.
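
As a rough Python analogue of that specification (a sketch for small parameters, not the TLA+ itself), the following enumerates the WPaxos default quorums for Z=3 zones of 2f+1=3 nodes each and brute-force checks the cross-phase intersection:

from itertools import combinations, product

Z, F, f = 3, 1, 1    # 3 zones of 2f+1 = 3 nodes each; tolerate F=1 zone and f=1 node failures
nodes = {z: [(z, i) for i in range(2 * f + 1)] for z in range(Z)}

def quorums(num_zones):
    """All node sets that take f+1 nodes in each of num_zones distinct zones."""
    result = []
    for zones in combinations(range(Z), num_zones):
        per_zone = [list(combinations(nodes[z], f + 1)) for z in zones]
        for choice in product(*per_zone):
            result.append(set().union(*map(set, choice)))
    return result

Q1s = quorums(Z - F)   # phase-1 quorums span Z-F zones
Q2s = quorums(F + 1)   # phase-2 quorums span F+1 zones

# (Z-F) + (F+1) = Z+1 > Z, so any Q1 and Q2 share at least one zone, and two
# (f+1)-out-of-(2f+1) majorities within that zone must overlap in a node.
assert all(q1 & q2 for q1 in Q1s for q2 in Q2s)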

The multi-leader federation

WPaxos uses multiple leader nodes to concurrently serve a set of objects in the system. The nodes may steal leadership of objects from each other using phase-1 of Paxos executed over a Q1 quorum. Then the node commits the updates to those objects over its corresponding Q2 quorums, and can execute phase-2 multiple times until another node steals those objects.

To mitigate the dueling leaders problem, where two nodes constantly propose a higher ballot number than the opponent, each object gets its own commit log with separate ballot and slot numbers. This also means that WPaxos provides per-object linearizability.

Extensions

Since the basic WPaxos is a simple and pure flavor of Paxos, it inherits Paxos's safety guarantees. And since the basic WPaxos is very flexible and offers knobs for tunability, we can easily extend the protocol to improve its performance.

The locality adaptive object stealing optimization moderates the trigger-happy object stealing in WPaxos based on a migration policy. The intuition behind the policy is to move objects to a zone where the clients will benefit the most, since moving objects frequently is expensive. By using an invited-stealing approach, the permission to steal is handed to the zone that has the most requests for the objects in some period of time.

The replication set optimization allows a configurable replication factor, where a subset of the Q2 quorum is selected to receive phase-2 messages instead of broadcasting to the entire system. The size of the replication set ranges from F+1 zones up to the total number of zones Z. This provides a trade-off between communication overhead and more predictable latency, since the replication zone may not always be the fastest to reply.

Transactions can be implemented on top of WPaxos entirely within the protocol, avoiding the need to integrate an extra two-phase-commit service. The node that initiates a transaction first steals all the objects needed for that transaction via multiple Q1 accesses. This is done in increasing order of object IDs to avoid deadlock and livelock. Then the node commits the transaction via phase-2 in the separate object logs, collating/serializing the logs together by comparing the slot numbers of the objects common to the transactions. (We have not implemented transactions in WPaxos yet!)
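
A tiny sketch of the ordering rule, with node.steal and node.phase2 as hypothetical stand-ins for phase-1 over a Q1 quorum and phase-2 over a Q2 quorum (this is an illustration, not our implementation):

def execute_transaction(node, writes):
    """Sketch of the scheme described above; not part of the WPaxos codebase."""
    # 1. Steal every object the transaction touches, in increasing object-ID order,
    #    so two concurrent transactions cannot deadlock or livelock each other.
    for oid in sorted(writes):
        node.steal(oid)
    # 2. Commit the writes into each object's own log via phase-2; the slot numbers of
    #    the shared objects later allow the per-object logs to be collated/serialized.
    return {oid: node.phase2(oid, writes[oid]) for oid in sorted(writes)}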

Dynamic reconfiguration is achieved in two steps, similar to Raft. Let the current configuration be C = <Q1, Q2> and the new configuration be C' = <Q1', Q2'>. First, the union of the old and new configurations, C+C', is proposed and committed by the combined quorums. Then the leader may propose the new configuration C' and activate it after it commits in Q2'. WPaxos further reduces the two steps to one in special cases where the reconfiguration operation is limited to adding or removing one row or column at a time.

Give this a Go

We model checked our WPaxos specification in TLA+/PlusCal to verify its consistency properties.

We also implemented WPaxos in Go to evaluate its performance. Please give it a whirl, and let us know what you think.

Friday, December 22, 2017

Retroscope: Retrospective cut-monitoring of distributed systems (part 3)

This post continues the discussion on monitoring distributed systems with Retroscope. Here we focus on the cut monitoring approach Retroscope uses. (This post is jointly written with Aleksey Charapko and Ailidani Ailijiang.)

Retroscope is a monitoring system for exploring the global/nonlocal state history of a distributed system. It differs from other monitoring tools in the way it inspects the system state. While request tracers inspect the system by following the trace of a request (i.e., request r in the figure), Retroscope performs cut monitoring and examines the system at consistent global cuts, observing the state across many machines and requests. It moves along the system history and scans a progression of states one cut at a time, checking cut Ts1, then Ts2, and so on.

Retroscope’s cut monitoring approach is complementary to the request tracing solutions, and brings a number of advantages. First, by exposing the nonlocal state, Retroscope enables users to examine nonlocal properties of distributed applications. Using Retroscope you can inspect state distributed across many machines and can reason about the execution of a complex distributed application through invariant checking. Furthermore, by sifting through many past nonlocal states, you can perform root-cause analysis and use the across-node context to diagnose race conditions, nonlocal state inconsistencies, and nonlocal invariant violations.

To illustrate some of these benefits, we use Retroscope and the Retroscope Query Language (RQL) to study the data staleness of replica nodes in a ZooKeeper cluster. Staleness is a non-local property that cannot be easily observed by other monitoring techniques. To our surprise, we found that even a normally operating cluster can have a large staleness. In one of our observations in AWS EC2, some ZooKeeper replicas were lagging by as much as 22 versions behind the rest of the cluster as we discuss at the end of this post.

Feasibility of Cut Monitoring

Ok, if cut monitoring is so useful, why was it not done before? The answer is that cut monitoring was not very feasible. A standard way to do cut monitoring is with vector clocks (VC), but VCs do not scale well to large systems due to their O(N) space complexity. Moreover, using VCs results in identifying an excessive number of concurrent cuts at a given point, many of which are false positives that do not occur in the actual system execution.

Retroscope employs hybrid logical clocks (HLC) and a scalable stream processing architecture to provide a feasible end-to-end solution for cut monitoring. The NTP-synchronized physical clock component of HLC shrinks the number of consistent cuts at a given point to only 1. (It may be argued that this reduces the theoretical coverage compared to VC, but this is a good tradeoff to take to improve performance and avoid the false positives resulting from VC.) Using HLC also allows us to construct consistent cuts without the need to coordinate across nodes. Finally, the HLC size is constant, which reduces the communication overheads. We talked about these advantages in Part 1.
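
Here is an illustrative Python sketch of why HLC-based cuts need no coordination (the event format and names are my assumptions, not Retroscope's API): because HLC respects causality, collecting each node's latest state with timestamp at most t can never split a causal dependency, so the result is a consistent cut.

from collections import namedtuple

Event = namedtuple("Event", ["node", "hlc", "state"])   # hlc = (physical, logical)

def cut_at(events, t):
    """Illustrative sketch (not Retroscope's API): the consistent cut at HLC time t is
    each node's most recent state with timestamp <= t."""
    cut = {}
    for e in sorted(events, key=lambda e: e.hlc):
        if e.hlc <= t:
            cut[e.node] = e.state    # keep overwriting: the last state before t wins
    return cut

# Example: three replicas reporting their version counter r1 at various HLC times.
log = [Event("n1", (10, 0), 5), Event("n2", (11, 0), 5),
       Event("n3", (11, 2), 4), Event("n1", (14, 0), 6)]
snapshot = cut_at(log, (12, 0))                               # {'n1': 5, 'n2': 5, 'n3': 4}
staleness = max(snapshot.values()) - min(snapshot.values())   # the Max(r1) - Min(r1) predicate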

To achieve a scalable implementation of Retroscope, we leveraged Apache Ignite for stream processing, computation, and storage. We arranged the log ingestion in a way to minimize data movement and to improve data locality and achieve maximal parallelism when searching. We had covered these issues in Part 2. 

In our prototype, Retroscope deployed on one quad-core server was processing over 150,000 consistent cuts per second. Horizontal scalability is one of the strongholds of Retroscope's architecture. Adding more compute power allows Retroscope to redistribute the tasks evenly across all available servers and achieve a nearly perfect speedup (93% going from 4 to 8 servers).


Ok, now back to the ZooKeeper case study to show the advantages of the cut monitoring approach.

The ZooKeeper Case Study 

Users interact with Retroscope via the declarative Retroscope Query Language (RQL). The users only need to specify the nonlocal predicates to search for, and leave the rest for the system to figure out.

To illustrate Retroscope and RQL, we considered replica staleness monitoring in Apache ZooKeeper. In ZooKeeper, a client can read data from any single replica, and if the replica is not fully up-to-date, the client will read stale data. Staleness is a nonlocal property, because it is defined by considering the states of the other replicas at that same point in time. Using a simple RQL query, we can find the cuts that violate the normal (less than 2 versions) staleness behavior of a cluster:
SELECT r1 FROM zklog
WHEN Max(r1) - Min(r1) > 1;
In this query, r1 is the version of a node's state. The system retrospectively looks at past application states and searches for the ones that satisfy this staleness predicate.

We observed many cuts having the staleness problem, with a few larger spikes (up to 22 versions stale!) that captured our attention. To investigate the causes of these excessive staleness cases, we need to inspect the message exchange in the system at those points. Here is the query we use for that:
SELECT r1, sentCount, recvCount, diff, staleness
FROM zklog
COMPUTE
GLOBAL diff
AND GLOBAL staleness
AND (staleness := Max(r1) - Min(r1))
AND (diff:= NodeSum(sentCount) - NodeSum(recvCount))
AT TIME t1 TO t2

In this query we included another nonlocal property: the number of messages in transit between nodes. The query scans through the past cuts around the time of the observed staleness we identified earlier. This allows us to visualize both the staleness and the number of messages in transit between nodes in the cluster. We see that the staleness spikes at the same time as the number of "in-flight" messages increases.

The number of messages "stuck" in the network still tells us only a little about the communication patterns in the cluster. To gain more insight into the message exchanges, we look at the in-flight messages more rigorously and examine the sets of sent and received messages at each node with this query:
SELECT sentM, recvM, inFlight, r1, staleness
FROM zklog
COMPUTE
GLOBAL staleness
AND (staleness := Max(r1) - Min(r1))
AND GLOBAL inFlight
AND (inFlight := Flatten(sentM) \ Flatten(recvM))
AT TIME x TO y

We run this query with a custom query processor that visualizes the results as a "heat map" of message exchange. Here is an example of how messages were flowing in the system right before and at the peak of the staleness event. A deeper blue color represents a greater number of messages in the network between nodes. We see more messages in flight in both directions between node #3 (the leader) and node #4, suggesting that the staleness is caused by messages being stuck in transit between these nodes for longer than usual. This indicates the possibility of a momentary millibottleneck in the network between node #3 and node #4.

Our Retroscope implementation is available as an open-source project on GitHub. We invite you to use the tool and drop us a note about your use cases.