Implementing a Real-Time Analytics Engine Using Big Data Principles
One of our clients wanted to build an in-transit entertainment system. Commuters would connect over WiFi to a “media box” installed in a bus and stream audio-visual content on their mobile devices when travelling. Think of it like Netflix in transit.
Due to an overwhelming response to the pilot run in one city, their backend analytics system started having scale issues. At peak usage, the bus systems were transmitting around 50 million data points a day back to the servers. The analytics system could not handle the volume and velocity of this data, and the analytics team saw response times increase to as much as 48 hours.
We helped them solve this problem and scale their analytics system by building data pipelines based on Apache Spark and other tools in the big data ecosystem. Read on to find out how.
The existing architecture, shown below, had events flowing from the media boxes to an API endpoint and eventually into a database. All data was stored in a traditional RDBMS, and analytical queries ran against this raw data store through a data visualisation tool. Query times kept increasing as the volume of data grew, which strained the technology and hindered business decision making.
The current system was built on OLTP concepts, but the velocity and volume of the data in question were far too high for it to keep up.
The challenge was to build an OLAP system that would allow them to ‘slice and dice’ their data within a permissible time: any ad-hoc query had to return within 5 minutes.
They also wanted the data to be consumed by different users in different formats: customised dashboards for the analytics team, a separate dashboard for the marketing team, and another for the advertisers.
A very straightforward approach would have been a read-replica setup, but that would only have mitigated the single point of failure, not solved the performance issues. Another option was vertical scaling, but traditional MySQL cannot parallelise a query across computing resources (though PerconaDB has introduced such a feature lately), and at this volume, churning through 50 GB of data a day, it would not have helped. Given the Volume and Variety involved, we had to look towards the Big Data ecosystem. Apache Spark qualified for computation, with support for both batch and stream data processing.
We started building a Data Warehouse with multiple data marts, each of them customised for one user type. A Data Warehouse allows a variety of data to be stored and is built with retrievability (fast queries) in mind. The solution also required ad-hoc queries on exploratory data to complete in under 5 minutes. A few reports and graphs on a dashboard were created to simplify business decision making.
The data stored in a Warehouse needs to be pristine, and this is where the first problem came up. The data in the old system was not clean enough to be stored directly in the Data Warehouse. Ingestion therefore required a lot of data transformation, cleansing and massaging; for example, timestamps had to be converted from a string type to a DateTime type. Some other issues had to be taken care of before we could successfully build the Warehouse:
- Converting Snowflake to Star Schema - The existing system used a Snowflake schema with multiple normalised Dimension tables. The name ‘Snowflake’ comes from the fact that an entity-relationship diagram with a central Fact Table and fully normalised Dimension Tables branching around it resembles a snowflake. Querying a Snowflake schema always requires joins, and as the joins get more complex, the queries become more expensive. We denormalised and converted the schema into a Star representation. Denormalising adds redundancy to the tables but reduces the number of tables required to represent the data, and it also aided the construction of OLAP cubes later on.
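To make the conversion concrete, here is a minimal sketch using SQLite as a stand-in store; the table and column names (dim_bus, dim_city, fact_play) are invented for illustration, not the client's actual schema. A normalised bus→city dimension chain is collapsed into one wide dimension so fact queries need a single join instead of two.

```python
import sqlite3

# Toy snowflake schema: the "city" dimension is normalised out of "bus".
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE dim_city (city_id INTEGER PRIMARY KEY, city_name TEXT);
    CREATE TABLE dim_bus  (bus_id INTEGER PRIMARY KEY, route TEXT,
                           city_id INTEGER REFERENCES dim_city(city_id));
    CREATE TABLE fact_play (play_id INTEGER PRIMARY KEY,
                            bus_id INTEGER, duration_s INTEGER);

    INSERT INTO dim_city VALUES (1, 'Pune'), (2, 'Mumbai');
    INSERT INTO dim_bus  VALUES (10, 'R1', 1), (11, 'R2', 2);
    INSERT INTO fact_play VALUES (100, 10, 240), (101, 11, 90);

    -- Star schema: collapse the bus -> city chain into one wide,
    -- denormalised dimension so fact queries need a single join.
    CREATE TABLE dim_bus_star AS
        SELECT b.bus_id, b.route, c.city_name
        FROM dim_bus b JOIN dim_city c ON b.city_id = c.city_id;
""")

# Query against the star schema: one join instead of two.
rows = conn.execute("""
    SELECT d.city_name, SUM(f.duration_s)
    FROM fact_play f JOIN dim_bus_star d ON f.bus_id = d.bus_id
    GROUP BY d.city_name ORDER BY d.city_name
""").fetchall()
print(rows)  # [('Mumbai', 90), ('Pune', 240)]
```

The redundancy (route and city repeated per bus) is the price paid for the flatter join path.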
- Tackling Slowly Changing Dimensions - Slowly changing dimensions change rather unpredictably; for example, the geographical location of a customer. She might have been an avid customer in city A but recently moved to city B; overwriting her location with city B would exclude her from any historical analysis of customers in city A. Although the entertainment boxes were unique to each bus, both the boxes and the buses often faced technical trouble, and in certain cases the boxes were replaced. Such data was getting overwritten, rendering certain historical analyses incomplete. We used the Type 2 SCD management methodology, which involves creating a new record for every change and adding a surrogate key (e.g. a version number). Adding ‘effective date’ columns, Created_On and Updated_On, allowed us to preserve unlimited history for every key.
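A minimal sketch of the Type 2 approach in plain Python; the field names (box_id, version, created_on, updated_on) and the box/bus values are illustrative assumptions, not the production schema. Each change closes the current record and appends a new versioned one, so history is never overwritten.

```python
from datetime import datetime

def apply_scd2(dimension, box_id, new_attrs, now=None):
    """Expire the current row for box_id and append a new version."""
    now = now or datetime.utcnow()
    current = [r for r in dimension
               if r["box_id"] == box_id and r["updated_on"] is None]
    version = 1
    if current:
        current[0]["updated_on"] = now      # close out the old record
        version = current[0]["version"] + 1
    dimension.append({"box_id": box_id, "version": version,
                      "created_on": now, "updated_on": None, **new_attrs})

dim_box = []
apply_scd2(dim_box, "box-42", {"bus": "KA-01"})
apply_scd2(dim_box, "box-42", {"bus": "KA-07"})   # box moved to another bus
history = [(r["version"], r["bus"]) for r in dim_box]
print(history)  # [(1, 'KA-01'), (2, 'KA-07')]
```

A query over the open records (updated_on is NULL) sees only the current state, while historical analyses can filter on the effective-date range.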
- Deduplication - Certain applications in the current system were causing logical duplication of data, which was a cause for concern and had to be tackled during ingestion. Our solution was to build a Bloom Filter: a space-efficient probabilistic data structure used to test whether an element is a member of a set.
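A toy Bloom filter in plain Python illustrates the idea; the bit-array size, hash count and event-id format are illustrative, not the production parameters. The structure can report false positives (an event wrongly flagged as seen) but never false negatives, which is the right trade-off for dedup: a tiny fraction of events may be dropped, but duplicates are reliably caught.

```python
import hashlib

class BloomFilter:
    def __init__(self, size_bits=1 << 20, num_hashes=5):
        self.size = size_bits
        self.k = num_hashes
        self.bits = bytearray(size_bits // 8)

    def _positions(self, key):
        # Derive k independent bit positions from salted SHA-256 hashes.
        for i in range(self.k):
            h = hashlib.sha256(f"{i}:{key}".encode()).digest()
            yield int.from_bytes(h[:8], "big") % self.size

    def add(self, key):
        for p in self._positions(key):
            self.bits[p // 8] |= 1 << (p % 8)

    def might_contain(self, key):
        return all(self.bits[p // 8] & (1 << (p % 8))
                   for p in self._positions(key))

bf = BloomFilter()
event_id = "box-42:2017-03-01T10:15:00:play"   # hypothetical event key
if not bf.might_contain(event_id):   # definitely new, so ingest it
    bf.add(event_id)
print(bf.might_contain(event_id))  # True
```

The false-positive rate is tuned by sizing the bit array and hash count against the expected number of keys.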
- Type Conversion - Certain variable types were being converted on the fly by the data visualisation tool, adding a time and effort overhead. Timestamps were being saved as a String type; these were converted to a DateTime data type during ingestion. Geo-coordinates, also stored as strings, were converted to the Double data type.
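These ingestion-time casts can be sketched in plain Python as follows; the field names and timestamp format are illustrative assumptions, not the actual event schema.

```python
from datetime import datetime

def clean_event(raw):
    """Cast string-typed raw fields into properly typed warehouse columns."""
    return {
        "ts": datetime.strptime(raw["ts"], "%Y-%m-%d %H:%M:%S"),  # str -> DateTime
        "lat": float(raw["lat"]),   # str -> Double
        "lon": float(raw["lon"]),
    }

event = clean_event({"ts": "2017-03-01 10:15:00",
                     "lat": "18.5204", "lon": "73.8567"})
print(type(event["ts"]).__name__, event["lat"])  # datetime 18.5204
```

Doing the cast once at ingestion means every downstream consumer gets typed columns for free, instead of each dashboard repeating the conversion.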
Computation - We used Apache Spark as the distributed computing framework. It offers a 10-100x speedup compared to Hadoop’s MapReduce, along with standard SQL support and connectors for diverse data sources, making it easier to query and consume data.
Storage - The solution required a distributed storage system with High Availability and Resiliency. HDFS, the Hadoop Distributed File System, is a tried and tested storage system that meets both requirements, making it a good fit. It also accommodates a variety of data (structured, semi-structured and unstructured).
Storage Format - Parquet was chosen as the file storage format: its columnar layout and self-describing, evolvable schema made it the preferred choice.
Storing Cubes - OLAP Cubes are the materialised views built for the Data Warehouse. Stored in MySQL, these multidimensional aggregates are much more efficient and are the preferred form for advanced business analytics. Reports are generated on the ingested data at short intervals, and to avoid discrepancies we had to remove the previous data at every interval; MySQL allows us to delete the old data and recompute the OLAP cube each time.
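The delete-and-recompute refresh can be sketched as follows, using SQLite as a stand-in for MySQL and invented table names; the key property is that the refresh is idempotent, so rerunning an interval never duplicates aggregate rows.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE fact_play (day TEXT, city TEXT, plays INTEGER);
    CREATE TABLE cube_daily_plays (day TEXT, city TEXT, total_plays INTEGER);
    INSERT INTO fact_play VALUES
        ('2017-03-01', 'Pune', 3), ('2017-03-01', 'Pune', 2),
        ('2017-03-01', 'Mumbai', 4);
""")

def refresh_cube(conn, day):
    # Remove the previous aggregate for this interval to avoid discrepancy,
    # then recompute it from the freshly ingested facts.
    conn.execute("DELETE FROM cube_daily_plays WHERE day = ?", (day,))
    conn.execute("""
        INSERT INTO cube_daily_plays
        SELECT day, city, SUM(plays) FROM fact_play
        WHERE day = ? GROUP BY day, city
    """, (day,))

refresh_cube(conn, "2017-03-01")
refresh_cube(conn, "2017-03-01")   # idempotent: no duplicate rows
rows = conn.execute(
    "SELECT city, total_plays FROM cube_daily_plays ORDER BY city").fetchall()
print(rows)  # [('Mumbai', 4), ('Pune', 5)]
```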
Exploratory Data - Using the Cubes, we built an exploratory table that gave the client the ability to explore raw data with discrete and rolling date limitations. We provided different views based on rolling date partitions: the views partitioned to the last 30 and 90 days were termed the Hot and Warm views respectively, while the Cold view held all the data.
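The rolling boundaries can be sketched as a simple classification of an event's age; this is a simplification (in the system described, the Warm and Cold views also include the more recent data), and the dates are invented for illustration.

```python
from datetime import date

def tightest_view(event_day, today):
    """Return the smallest rolling view that contains this event."""
    age = (today - event_day).days
    if age <= 30:
        return "hot"    # last 30 days (also visible in warm and cold)
    if age <= 90:
        return "warm"   # last 90 days (also visible in cold)
    return "cold"       # full history

today = date(2017, 3, 31)
print(tightest_view(date(2017, 3, 20), today))  # hot
print(tightest_view(date(2017, 1, 15), today))  # warm
print(tightest_view(date(2016, 6, 1), today))   # cold
```

Keeping the hot partition small is what makes the common "last 30 days" exploratory queries fast.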
Query Engine - Hive was the query engine used to parse SQL queries coming from data visualisation tools such as Tableau.
- Number of records ingested into the Data Warehouse per day: 50 million
- Time taken for a simple query-response for unique visitors (30 days of data): reduced from 3 hours to 2 minutes
- Earlier, the reports available to the data analysts were two days old and had to be prepared beforehand. With the new solution, the delay was reduced to one hour and reports could be generated ad hoc, cutting the effort by at least 50% and enabling a faster, data-driven approach to product decisions.