The Twitch Statistics Pipeline, Part 2

This is part two in our three part series on our statistics pipeline. The first part described our pipeline. In this part we’ll go into detail concerning particular design decisions. The third part will cover the history of analytics at Twitch. Head over to our team page to get to know more about what we do.

Also, join the discussion on Hacker News!


When bringing together our Data Science team we established a clear goal:

Make our co-workers want to give us their data

A desirable “3rd party system” (i.e. non-mission critical and not built by the core engineering team) must be rock solid and fail predictably. Those that violate these criteria tend not to be used for long. With this in mind we created our “trifecta rubric”:
  • Ease of integration
  • Simplicity
  • Scale
Each of our key architectural decisions is motivated by at least two of these principles.

HTTP.GET(base64(jsonify(data)))

As detailed in the first post, our clients send base64-encoded JSON data using HTTP GETs. At the core of this decision was ease of integration; by choosing to stay Mixpanel compatible we were able to use our existing Mixpanel client libraries. This permitted our engineering team to continue to use the clients they trust and removed the need for us to build and maintain a new set of clients in various languages. The use of GETs is driven by the same-origin policy that browsers enforce. We have a far wider range of clients than just browsers, so supporting POSTs is on our short-term roadmap. As CORS becomes more widely supported we'll embrace POSTs in the browser too.
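To make the encoding concrete, here is a rough sketch of how a client might build such a request. The endpoint, query parameter and event fields are hypothetical, invented for illustration rather than taken from our actual API.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
	"net/url"
)

func main() {
	// A hypothetical event payload; field names are illustrative only.
	event := map[string]interface{}{
		"event": "minute-watched",
		"properties": map[string]interface{}{
			"channel": "example_channel",
			"player":  "flash",
		},
	}

	raw, err := json.Marshal(event)
	if err != nil {
		panic(err)
	}
	encoded := base64.StdEncoding.EncodeToString(raw)

	// Mixpanel-style tracking GET: the data rides in a query parameter.
	u := url.URL{
		Scheme:   "https",
		Host:     "stats.example.com", // hypothetical edge host
		Path:     "/track",
		RawQuery: url.Values{"data": {encoded}}.Encode(),
	}
	fmt.Println(u.String())
}
```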

Storage & Querying

Simplicity and scale form the basis of our choice of Redshift. We needed a system that could store many terabytes of data while permitting fast and simple fetching of that data. Any time you talk about huge volumes of data, Hadoop and Cassandra are natural contenders. Redshift was the standout option for us on account of being PostgreSQL based and, as such, having great client support along with the ability to handle arbitrary joins. When coupled with the COPY command for bulk inserts (we get 100k inserts a second, and haven't begun optimizing yet), Redshift is a tectonic-level play by Amazon; our buddies over at Pinterest and Airbnb agree and both have great write-ups on how they use it.

Core Pipeline Implementation

The decision for how to transform data from our clients into rows in Redshift was influenced by all three elements of our rubric. We knew we would need something fast, with good concurrency primitives and support for the libraries we'd want to use. We ended up with Clojure and Golang as our main candidates. Java and C ruled themselves out for not having simple concurrency primitives. Clojure, on the other hand, has great concurrency primitives and interoperates with Java very elegantly.

In the end Golang won out over Clojure due to institutional knowledge; we have a number of engineers at Twitch who are deeply into Golang. It is not unusual to stumble into a lunchtime conversation covering the pros and cons of the new precise garbage collector or the various patches to Golang we need due to page-size allocation changes introduced as part of Go 1.2. We reached this point by being an early adopter of Golang and running it under some incredibly high loads, oftentimes discovering aspects of the runtime that were interesting to the Golang core development team. Golang's approach to concurrency is beautiful, and the decision to build statically linked binaries makes deployment very easy. As a result almost all greenfield projects at Twitch are in Golang. I can honestly say, if you want to work with Go there is nowhere better than Twitch.
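As a flavour of why those primitives matter for this kind of pipeline, here is a minimal goroutine/channel fan-out sketch; it is illustrative only and not taken from our codebase.

```go
package main

import (
	"fmt"
	"sync"
)

// A sketch of the goroutine/channel style that made Go attractive for the
// pipeline; the event names and worker count are invented for illustration.
func main() {
	events := make(chan string)
	var wg sync.WaitGroup

	// Fan out to a fixed pool of workers.
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for ev := range events {
				// In a real pipeline stage this would decode and route the event.
				fmt.Printf("worker %d processed %s\n", id, ev)
			}
		}(i)
	}

	for _, ev := range []string{"minute-watched", "buffer-empty", "chat-sent"} {
		events <- ev
	}
	close(events)
	wg.Wait()
}
```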

Given we’re hosted on Amazon we could also have opted to use Data Pipeline or Kinesis. Kinesis didn’t exist when we first started work on this project, though with it being so highly Java orientated we may have still opted not to use it. One of the things we’re super excited about is open sourcing all of our work, as we firmly believe that making good product decisions shouldn’t be something that is bound by available technology. We’re currently sweeping through code replacing AWS credentials with IAM roles from the UserData API (we recently patched goamz to fix role timeout issues); when we’re done we’ll release everything! Expect more on this in a couple of weeks.

Hosting

Personally, I think one of the most significant technology decisions we've made is to host our pipeline on AWS. We have historically bought our own hardware and co-located it at various datacenters. This works wonders for managing the cost of a worldwide video CDN; our ops and video engineers are constantly finding ways to squeeze more out of our current set of machines while ensuring our next generation of machines can achieve the best QoS for our users. When it came to our data pipeline we really had no idea what we would be bound by; we initially opted for two beefy boxes in our SFO datacenter. The first weekend we took significant traffic we exhausted our available RAM and our pipeline ground to a halt. Faced with the choice between finding significant memory improvements and provisioning more capacity, we chose to move to the cloud, as it would remove the burden our iterating on the stack would put on our ops team. When it came to cloud partners we had a few options: Digital Ocean, Rackspace or AWS. Since we were already using S3 and Redshift, AWS made the most sense.

Conclusion

Our decisions to make things simple, scalable and easy to integrate have permitted us to build a very efficient pipeline. If you share our passion for making smart decisions and being a member of a high-leverage team then let us know, as we're currently looking for our 5th team member.

The Twitch Statistics Pipeline

This is part one of a three part series covering all aspects of our data analysis. This part describes our pipeline. The second part will go into detail concerning particular design decisions. The third part covers the history of analytics at Twitch. If at any point you think to yourself that you'd like to know more about what we do, you should check out our team page.

In the beginning we logged all of our data to Mixpanel. As our user base grew we logged an increasing number of events; this growth in data points vastly outstripped our user growth, to the point where we were sending Mixpanel billions of events per week. As our growth continued we needed to make better decisions based upon joining different events to gain really deep insight into our users' behaviour. Count-based metrics, such as those provided by Mixpanel or statsd, are insufficient for this, and given the ever-increasing cost of Mixpanel we brought together a team to work on storing our event data in an economical fashion while providing the tools to query it without those limitations.

    Today we have replaced the near-real-time nature of Mixpanel with our own bulk-ingest solution, which achieves our goal of <24 hour data latency. Currently our latency is ~5 hours, and we'll soon be working on a "fast layer" on top of our current solution to provide us with the ability to have dashboards displaying real-time stats.

    We formed the team at the tail end of 2013 and began work on the project in earnest at the start of 2014. The new pipeline, currently in its third revision, is hosted in AWS where we use Asgard to coordinate deploys and elastic scaling. The pipeline is composed of the following core components:

    1. The client sending us the stats
    2. An edge server
    3. A processor
    4. An ingest coordinator
    5. Storage

    The Client

    Naturally the pipeline starts with users on client applications. We have a wide variety of clients: Xbox One, iOS, our website, our Flash video player, various Android ports, etc. Since all of these platforms have mature and robust HTTP clients, we use HTTP to send events to our stats pipeline edge servers.

    These clients know when to emit events that are pertinent to our product as a whole. The most common stat the clients log is "minute-watched"; each of our video players emits this for each minute of video watched by a viewer, along with a bundle of JSON metadata describing the client's state. The most critical stat is "buffer-empty", which is sent when video stutters. When these events occur we know someone had a poor viewing experience - our video and networking teams base critical scaling decisions on them as part of our never-ending pursuit of great global video QoS.

    The Edge Server

    Our clients send base64-encoded JSON objects to our edge server using HTTP GETs. The edge server consists of an nginx instance and a logging daemon written in Golang. The nginx instance merely passes the HTTP request to the logging daemon, which in turn logs the HTTP request to a file. Every 100MB of logged data we upload the log to S3 and emit an SQS message with data about the file to a queue which the processor listens on.

    An example data packet from a client.
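The original post embedded an image of a real packet here. As a purely hypothetical stand-in (the field names, device id and edge host are invented), a packet might look like this before and after encoding:

```
{"event":"minute-watched","properties":{"device_id":"abc123","channel":"example_channel","player":"html5"}}

GET /track?data=eyJldmVudCI6Im1pbnV0ZS13YXRjaGVk... HTTP/1.1
Host: stats.example.com
```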

    These edge servers sit behind an ELB and are configured as an Auto Scale Group. We scale up and down based on the overall traffic level we see coming in to the cluster. Our primary concern is to ensure that we do not lose too much data in the event of a machine being taken down by something at AWS [1].

    The logging daemon, which we'll soon open source, can be configured to rotate log files on two criteria:

    1. creation time
    2. file size

    This permits us to minimize our risk of data loss even during low tide, though with 100MB rotations we never rotate on creation time. When a file is rotated it signals an in-process uploader to upload the file to an S3 bucket and publish a corresponding SQS message.
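Since the daemon itself isn't open source yet, here is only a sketch of that rotation rule; the age threshold, file naming and helper functions are stand-ins for the behaviour described above, not the real implementation.

```go
// Package edge sketches the edge daemon's rotation rule: roll the log on size
// or age, then upload and notify. Thresholds and helpers are hypothetical.
package edge

import (
	"fmt"
	"os"
	"time"
)

const (
	maxSize = 100 << 20        // rotate after 100MB, as in the post
	maxAge  = 10 * time.Minute // illustrative creation-time threshold
)

type rotatingLog struct {
	f       *os.File
	size    int64
	created time.Time
}

func open() (*rotatingLog, error) {
	f, err := os.Create(fmt.Sprintf("events-%d.log", time.Now().UnixNano()))
	if err != nil {
		return nil, err
	}
	return &rotatingLog{f: f, created: time.Now()}, nil
}

func (r *rotatingLog) Write(p []byte) error {
	n, err := r.f.Write(p)
	r.size += int64(n)
	if err != nil {
		return err
	}
	if r.size >= maxSize || time.Since(r.created) >= maxAge {
		return r.rotate()
	}
	return nil
}

func (r *rotatingLog) rotate() error {
	path := r.f.Name()
	if err := r.f.Close(); err != nil {
		return err
	}
	go func() {
		uploadToS3(path) // hypothetical: push the rotated file to the edge S3 bucket
		publishSQS(path) // hypothetical: emit the SQS message the processor listens for
	}()
	fresh, err := open()
	if err != nil {
		return err
	}
	*r = *fresh
	return nil
}

// Hypothetical stubs standing in for the real uploader and queue publisher.
func uploadToS3(path string) {}
func publishSQS(path string) {}
```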

    The Processor

    The processor is a daemon, written in Golang, which listens on the SQS queue that the edge layer publishes to.

    For each inbound file the processor unpacks the data and, assuming it is valid, extracts the events and writes them into a target file - one file for each type of event we have. The processor uses the same logging library that the edge server uses; however, it rotates on either 5 hours or 1GB of data. The processor will continue to write to the same output file until it is rotated out of the way, which once again triggers an S3 upload and an accompanying SQS message.

    The target file is a gzipped TSV [2] with a line per processed packet. Subsequent runs, in between rotations, write to the same file; gzip supports appending data this way (go ahead and try catting two gzip files into one and then zcatting them!). The ordering of the columns in the TSV is critical since it must match the ordering of the columns of the target table. The ordering is informed by a "schema" server we run, which is discussed in the Storage section of this post.
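That concatenation property is easy to demonstrate; a minimal sketch of appending a fresh gzip member on each run (the file name and row contents are hypothetical):

```go
// Appending a new gzip member to an existing file: a later zcat sees the
// concatenated members as one stream. File name and row are hypothetical.
package main

import (
	"compress/gzip"
	"log"
	"os"
)

func main() {
	f, err := os.OpenFile("minute_watched.tsv.gz", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	zw := gzip.NewWriter(f) // each run adds a new, independent gzip member
	defer zw.Close()

	if _, err := zw.Write([]byte("abc123\t2014-04-01T12:00:00Z\texample_channel\n")); err != nil {
		log.Fatal(err)
	}
}
```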

    An example conversion. Not a great one :)
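The image that sat here showed a packet being flattened into a TSV row. As a rough sketch of that step, with invented field names and a hard-coded column order standing in for what the schema server provides:

```go
// A sketch of turning a decoded event into a TSV line whose column order
// matches the target table. The fields and column list are invented; in the
// real pipeline the order comes from the schema server.
package main

import (
	"fmt"
	"strings"
)

func toTSV(props map[string]string, columns []string) string {
	row := make([]string, len(columns))
	for i, col := range columns {
		row[i] = props[col] // missing properties become empty cells (FILLRECORD-friendly)
	}
	return strings.Join(row, "\t")
}

func main() {
	event := map[string]string{
		"device_id": "abc123",
		"channel":   "example_channel",
		"time":      "2014-04-01T12:00:00Z",
	}
	columns := []string{"time", "device_id", "channel", "referrer"} // must match the table
	fmt.Println(toTSV(event, columns))
	// Prints: 2014-04-01T12:00:00Z  abc123  example_channel  <empty referrer cell>
}
```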

    The Ingest Coordinator

    As with the processing layer, this layer receives SQS messages and triggers imports into our storage layer. We use Redshift to store our data; within Redshift you have numerous ways to import data, and we use the COPY command since it can read from S3 and supports reading gzipped files. Our decision to store files as TSVs stems from wanting to be Redshift COPY compatible.

    The ingester itself is an area where we have room for significant improvement; notably, Redshift supports manifest files, which can dramatically increase the performance of imports. Currently it takes us around 3 hours a day to ingest 24 hours of data. As we scale up we'll need to improve this rate; the manifest file option permits each node to be responsible for a file, versus our current strategy of one file at a time per COPY command.
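For reference, a Redshift manifest is just a small JSON file listing the S3 objects a single COPY should load; the bucket and key names below are made up:

```json
{
  "entries": [
    {"url": "s3://spade-processed/minute_watched/part-0000.tsv.gz", "mandatory": true},
    {"url": "s3://spade-processed/minute_watched/part-0001.tsv.gz", "mandatory": true}
  ]
}
```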

    An example COPY
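The image here showed one of our COPY statements. A hypothetical equivalent, with placeholder table, bucket and credentials, looks roughly like this:

```sql
COPY minute_watched
FROM 's3://spade-processed/minute_watched/part-0000.tsv.gz'
CREDENTIALS 'aws_access_key_id=<key>;aws_secret_access_key=<secret>'
GZIP
DELIMITER '\t'
FILLRECORD;
```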

    Storage

    We have a four dw1.xlarge node Redshift cluster in which we have a table per event type. Each table has a column per property. Redshift permits you to specify a distribution key, which is used during imports to ensure data you intend to join on is stored on the same node - this minimizes leader node work and network traffic, thus massively improving join performance. In our case we distribute on a semi-permanent [3] client identifier and sort on timestamp.

    Our schema server coordinates the processor layer and the storage layer. Since TSVs have no header row, the order of the entries is critical and must match the target table. The processor layer caches a version of the current known schema for a few minutes at a time, and as it processes data it emits the TSV with data in the correct column order. Each time a new event is to be logged we use the schema server to issue the necessary CREATE TABLE commands in Redshift. If a new property is to be added to an event, the schema server issues an ALTER TABLE to add the corresponding column. Our ingester uses the FILLRECORD modifier to the COPY command, which prevents these ALTERs from breaking imports. This structure allows us to rapidly log new and richer data, which supports rapid decision making by our product teams. It is a short-term aim of ours to make the schema server really bulletproof so that we do not need to be part of this flow.
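As a sketch of what the schema server issues - the table, columns and types here are illustrative rather than our real schema:

```sql
-- Issued the first time a new event type is logged.
CREATE TABLE minute_watched (
    time      TIMESTAMP,
    device_id VARCHAR(64),
    channel   VARCHAR(255),
    referrer  VARCHAR(255)
)
DISTKEY (device_id)  -- co-locate rows that join on the client identifier
SORTKEY (time);      -- sort on timestamp

-- Issued when a new property appears on the event; FILLRECORD keeps older
-- files loadable after the column is added.
ALTER TABLE minute_watched ADD COLUMN country VARCHAR(2);
```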

    Conclusion

    This design allows us to reach ~100,000 inserts a second. Previous versions of the pipeline capped out far below that. We have some ambitious plans for the future, most notably building an ETL flow which permits us to create a richer data set by combining our current event data into something which tells a more holistic story of the people using our product. An easy-to-grok example of data we're interested in is which initial referrers turn into the most loyal users and how that loyalty is impacted by QoS; while it sounds simple, there is the very interesting task of aggregating loyalty per referrer while also computing initial referrer - do we take the entire data set and run it through a Map Reduce job? How do we do that? What intermediate data do we store to prevent this from being an agonizingly long query in the not too distant future? What happens if we load data from the past that may not have been loaded due to an error and that changes the initial referrer for a range of users?

    If you're interested in knowing more or have a strong idea around where we can bound our problems please feel free to contact us, details can be found on our team page. Additionally, feel free to comment/ask us questions on the Hacker News thread.


    [0] Ideally we'd like CLOUD_ENVIRONMENT to be 'integration' but Asgard makes that difficult; it effectively maps CLOUD_ENVIRONMENT to an AWS account - in this case meaning we'd need a different AWS account for our integration environment. That doesn't jibe so well with the AWS management console. Using CLOUD_DEV_PHASE felt like a reasonable compromise here.

    [1] We're currently only running in one AWS AZ; near-term work will move us up to running edges on the east and west coasts of the USA, as well as one in the EU and possibly Sao Paulo.

    [2] We're looking into alternatives for this. Redshift understands this format natively, but it is not convenient when running map reduce jobs since gzip cannot be split easily.

    [3] It is actually quite stunning how non-permanent permanent storage is. For example, some consoles do not allow applications to store data, so instead you must compute an identifier in a consistent way. Of course there is also the iOS UUID situation.

    Appendix: Asgard and Cloud Configuration

    At the heart of our operation is Asgard which, when fully configured, initializes user data on your EC2 instances as they come up. We use the excellent goamz library from CrowdMob to source this user data and make decisions at run time as to where data should flow. For example, our integration test servers have CLOUD_DEV_PHASE set, which we use to override CLOUD_ENVIRONMENT [0]; these values determine which SQS queues and S3 buckets we use at each stage of the pipeline.
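The post relies on goamz for this; purely as an illustration, the raw EC2 metadata endpoint can be read directly. The KEY=VALUE parsing below is an assumption about the format Asgard writes, not a confirmed detail.

```go
// A sketch of reading EC2 user data from the instance metadata service and
// picking an environment. The real code uses goamz; the parsing here assumes
// simple (optionally "export"-prefixed) KEY=VALUE lines, which may not match
// the actual format Asgard writes.
package main

import (
	"bufio"
	"fmt"
	"net/http"
	"strings"
)

func main() {
	resp, err := http.Get("http://169.254.169.254/latest/user-data")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	vars := map[string]string{}
	scanner := bufio.NewScanner(resp.Body)
	for scanner.Scan() {
		line := strings.TrimPrefix(strings.TrimSpace(scanner.Text()), "export ")
		if k, v, ok := strings.Cut(line, "="); ok {
			vars[k] = strings.Trim(v, `"`)
		}
	}

	// CLOUD_DEV_PHASE overrides CLOUD_ENVIRONMENT, as described above.
	env := vars["CLOUD_ENVIRONMENT"]
	if phase := vars["CLOUD_DEV_PHASE"]; phase != "" {
		env = phase
	}
	fmt.Println("using environment:", env)
}
```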

    Spade is our internal name for the pipeline. It flies in the face of my religion of naming things what they do!

    What I Learned When The Internet Read My Post

    Having a post of mine re-tweeted by a co-worker was a very interesting situation. First, it took place when emotions were running high. Second, it was incorrectly advertised as a technical post. Were it a technical post it would have included technical details about the surge; there would have been graphs showing previous highs versus what our systems were taking due to TPP. Rather, it was a post laying out thoughts about how and why our chat protocol consistently lends itself to being innovated upon. I learned a lot and I hope I built more empathy between us and our users.

    I did learn one crucial lesson: write defensively.

    Defensive writing, similar to defensive driving, is the process of trying to spot negative situations before they arise. In terms of writing that usually involves using simple words, short sentences as well as qualifying most (all?) statements to ensure there is no ambiguity in their meaning. This can easily result in a massive increase in the number of words needed to convey a point and, given how quickly misinformation can spread these days, makes good writing an art I have even more appreciation of.

    Writing defensively isn't something that comes naturally to me. I wonder how many people outside of the higher levels of academia actually get experience writing for an audience that wants to use what you write against you. An audience looking for holes. An audience looking for a partial sentence that can be taken out of context to mean something very different. I should have re-read my post from the point of view of a range of readers. First time through: product visionary. Second time through: technical fellow. Third time through: passionate, and frustrated, Twitch community participants. Each time through I should have edited it appropriately; once done, I should have re-read it again from each of those points of view.

    The fact that my lack of defensive writing caused such a firestorm doesn't sit well with me. My co-workers are rock stars and my loose statements being used against their superhuman abilities as developers really left its mark on me. I will ensure my writing does not provide ammo to those that wish to disparage their efforts, and help educate those of you that are interested in what we do along the way.


    My thanks go out to Spencer Nelson for playing editor on this piece.

    Chat Scalability Improvements

    My post yesterday was purely about the chat product as a concept. In it I made a loose comment about "resisting improving chat"; Izlsnizzt completely hit the nail on the head when he asked "are you ready for that to be taken out of context" :)

    While I was busy thinking about our stats system and musing about how great chat is (as a product), two of our engineers were continuing their work on scaling our chat system. For some context: this is a constant, ongoing process. That is to say, it is one of our core arcs of work; it is omnipresent, and we invest a good deal in monitoring, performance profiling, and development to make it a better system overall. When a phenomenon like TPP comes along and increases load on the system manyfold, it gives us a great opportunity to discover and fix new issues, as well as issues that only rear their heads under super high load.

    Yesterday we found our Redis servers were pegged at 100% CPU - we've ramped up more of them and we're ensuring our monitoring picks up this class of issue in the future. Additionally we found some configurations in our chat stack itself that were not optimal. These two things have had a massive impact:

    <3 those lovely smooth lines :) ... Also you guys seem to think it is helping too:

    Why does "Twitch Plays Pokemon" Work?

    EDIT 1: In v1 of this post some of my wording was misinterpreted to mean we don't care about scaling chat - we do! In fact late last night we released some improvements that you can read about in the follow up piece Chat Scalability Improvements.

    EDIT 2: Added a new third paragraph, bumped original third down to fourth with slight edits - notably removing that statement :) Also edited "Edit 1".

    I joined Justin.tv Inc three years ago as the first developer to work on our new gaming vertical, "Xarth"; 6 weeks of hard graft by Jacob and me, and a last-minute name change, and Twitch was born. I'm constantly humbled by how big we've become; as part of trying to deal with the shock I've spent a while thinking about why certain parts of our product work. "Twitch Plays Pokemon" is the perfect segue to discuss one of the core parts of our experience: "Live Chat".

    Chat has been a constant scaling PITA. We are pretty sure we're one of the largest IRC networks in the world (at the time of writing, a low tide, we have nearly 500k concurrent users). It is also one of the strongest reasons to use, and return to, Twitch. Our broadcasters are the reason we have a community, but they'd not be able to interact with their viewers were it not for chat; thusly I think of it as our community fulcrum. But why would something so simple be so successful? There are far more compelling features on sites that have far less success than our chat feature does. I believe the answer is fairly simple: text chat is a lowest common denominator technology. It is a platform that permits others to build on top of it. Similar to how Twitter succeeded due to its API and the clients that third parties developed for it, our chat succeeds because IRC is incredibly easy to integrate with and our custom bindings are few and far between - thus you get the same experience in our client as you do in your IRC client. We've had people build auto-moderation tools, poll tools, random chatter selection tools (for things like: "enter a pool to play with this pro if you subscribe to their channel"), and now massively multiplayer Pokemon! Who'd have thunk it, eh?

    TPP puts a very new type of stress on our system: massive numbers of inbound messages. Up until now the major scaling challenge has been delivering one message to many viewers of a channel. The decision making as to whether we should deliver a message is fairly heavy; we need to make decisions like "is this person a subscriber", "have they been banned", etc. With such a large increase in inbound message volume we're seeing parts of that pipeline struggle (caches, db access, etc).

    For use cases like TPP there are a number of potential solutions that we've posited in the past. For example, one such idea was "kappa aggregation" where, instead of delivering the billions of kappas we deliver, we deliver one message every N seconds containing the count of kappas. While many of the potential improvements would improve the overall scalability of the system, they would limit creativity by re-writing the current stream of bits into something with more structure. The correct thing to do, and the thing we're constantly doing, is to double down on the scalability of our chat system.

    If you'd like to help out think about joining our team :) Hit me up on twitter ( @ossareh ) if you've any questions about what working here is like.

    Permanent Objects

    I'm currently reading "Fight Club"; it is a brilliant book. It is a rare gem in that the film and the book, while different, are both absolutely astonishingly good. One of the core concepts the narrator covers is the sense of "permanent objects" - these are objects you own that result in you checking them off your list of things to buy in life. When you buy a sofa you think you'll "never buy a sofa again"; you may replace your sofa, you may upgrade your sofa, but at this point in your life you have a sofa and you're very unlikely to ever reach a point where you do not have a sofa. It is a permanent object. Today I sold a permanent object. I sold my car. For me a car is a symbol of independence and freedom. Those two aspects of it do not redeem it, though. A large part of my stress in the past few years has been related to my car: looking for accommodation with parking, it getting bumped into, general wear and tear and maintenance visits, etc. This is gone now. In Fight Club the narrator's apartment is destroyed by an explosion; all of his worldly goods are lost, and it is the point at which he gives in to his alter ego (there are other moments of caving to "Tyler Durden"). I have no "Tyler Durden"-esque issues. I think I'm going to be just fine.

    Missed a day!

    I missed my first day of writing yesterday. I realized when I went to bed, and I spent a while lying there thinking about the why. I've settled on the reason being down to me sliding ever more into the maker's schedule. My day tends to break out into procrastinating before lunch time, then when everyone is at lunch starting work and keeping going until 1800 or so, when I break to eat; by the time I'm back after dinner everyone has gone home, so I can easily get into a groove and work through the late evening. Writing tends to take place somewhere in there, while I'm waiting for a long-running process to finish (builds or, as with now, a map reduce test to run before unleashing the real thing). Yesterday I had an interview at 1400 and a sync-up at 1730 (arranged by the hiring manager and the recruiter), and as such my pre-lunch procrastination turned into a pre-1400 procrastination which bled into a 1500-1730 procrastination, which then resulted in me playing video games and then going home. As 50% of the development team on a highly important project we just cannot afford a loss of 20% productivity; I'm no longer interviewing for these non-my-team positions (and hopefully no longer have any reason not to write!)

    Slowing down

    Carl Honore's TED Talk entitled "In Praise of Slowness" is brilliant. I highly recommend watching it and taking note. In my case I immediately put the idea into practice: I went from saying "hi" in the corridors at work to stopping and engaging with people. This slowness costs me zero and keeps returning value. Alongside unsubscribing from the news a year ago, it is easily one of the best changes I've made in my life.

    Today it paid off hugely. Last week I called around trying to get information about my car's aftermarket warranty. I was in a rush: the location was awkward, the data I had to operate on was sketchy, and I didn't have any note-taking devices. Result: "D-". I tried again this morning, from my home, with full access to the internet and a scratch pad in front of me. While these remedies definitely made for a better experience, the actual value came out of my reduced stress, which let me take it slow with the people on the other end of the phone. I didn't let being on hold get to me, and when I got through to a rep (I had to call a few companies and piece together the patchwork behind car warranties) I took the time to ask them how their day was going, to shoot the shit a little, to tell them a little about how I was feeling (fighting off a flu-like thing), always with the aim of putting them at ease: I'm not here to yell at you for keeping me on hold, I'm not going to be curt with you, I'm going to treat you like a person that I need help from and that I respect as one of the few people who can actually help me. Not only did they answer my questions, but they offered to help me in ways that I wasn't offered last week.

    I had a great experience with my single serving friends! Try it out!

    Tools

    I had one of those rare successes recently. It occurred in two parts. The first was my co-worker using a build flow that I'd been working on for much of last week - it produces AMIs which we then deploy using Netflix's Asgard tool - and while I was very nervous, things worked out just fine. By the time he was using this I had started work on turning it into a tool which helps bootstrap the whole process; this took the form of an epic 18-hour day yesterday, resulting in a tool called 'molecule'. The second part of the success was the same co-worker integrating molecule fairly quickly into a project that we have, and deploying it into our AWS cloud. All within an hour or so.

    There is still much improvement that can take place in molecule, notably around the debug-ability of builds. It is too easy to get to one of the latter stages and find it fails for some reason. I'm starting to formulate some ideas as to how to rectify these issues. Using it more is helping my understanding of where it lacks.

    molecule itself is a build pipeline, starting with your computer and ending up as an AMI written into S3. First your computer boots up a Vagrant instance, which pulls in some requirements from our in-house git and then starts up packer; packer starts an EC2 instance, configures it and then writes an image of it to S3. The main win in breaking molecule out of its parent project is that now all of our builds can follow this pattern, and as we improve molecule all of our projects will benefit.

    I hope to open source it soon; currently there are some aspects of it that are tied to Twitch (for example setting up the SSH config file for our git repo) and I'm not entirely sure how to go about abstracting that just yet.

    I may be falling off the exercise bandwagon again

    I bounce between gagging for a run and wanting to stay very far away from my running shoes! I need to find a balance but it can be so hard sometimes. The main reasons I've established for tardy exercise habits are wanting to sleep an extra 15 minutes in the morning or being deep into something at work. In either case I need to sort it out. Morning time would be my preferred exercise time. Maybe on Monday? ;)