When working with Big Data, we often need to aggregate data in real time, whether it comes from a specific service such as a social network (Twitter, Facebook…) or from more diverse sources, like a weather station. Spark Streaming is a good way to process these large amounts of information, since it gives us all the data in real time, but it has one drawback: you have to program everything yourself.

How to process and aggregate data

To avoid this, we can use Stratio Sparta to process and aggregate data. It helps us by reducing raw information into something useful once it's processed. Later we'll persist the result in a MongoDB cluster and serve the stream through WebSockets with Node.js.

Stratio Sparta is a flexible, easy-to-use tool that lets us transform raw information with Morphlines through its web interface, and aggregate data across different dimensions and time ranges.

First Steps with Stratio Sparta

First of all, we must install Stratio Sparta as indicated in Stratio's documentation. For this example we've used Stratio Manager and installed our three-machine cluster with MongoDB and Stratio Sparta. With it we can also monitor the resources of our different services.

After we install it, we can execute it and it will expose its service on port 9090, where we can start to set up the different parameters:

  • Input: It’s the origin of the data. It can come from Flume, Kafka, RabbitMQ, a socket, a WebSocket or directly from Twitter.
  • Output: We can persist or display the information in different destinations such as MongoDB, Elasticsearch, Cassandra, Parquet, Redis, a CSV file or the screen.
  • Policies: It’s the most complex part of the set-up. We can do the following:
    • Configure the input.
    • Configure the outputs.
    • Make transformations with Morphlines by type of data and date.
    • Make aggregations across different dimensions and time granularities. We can also apply common grouping functions such as count, sum, max, min and others.

In this example we’ll take data from the meetup.com API, which provides a WebSocket with the information being created or updated in the service. Then we’ll aggregate it by country and do an hourly count.

Afterwards we’ll add three MongoDB nodes as an output and set the name of the collection that it will generate.

Of all the raw information provided by the WebSocket, we’ll take only three of the attributes and give them the proper format. We’ll set the output of the transformation to country, response and modified. The last one makes it easy to return updates from Node.js.

{
  "morphline": {
    "id": "morphline1",
    "importCommands": [
      "org.kitesdk.**"
    ],
    "commands": [
      {
        "readJson": {}
      },
      {
        "extractJsonPaths": {
          "paths": {
            "response": "/response",
            "country": "/group/group_country",
            "modified": "/mtime"
          }
        }
      },
      {
        "removeFields": {
          "blacklist": [
            "literal:_attachment_body"
          ]
        }
      }
    ]
  }
}
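
To make the transformation concrete, here is a plain JavaScript sketch of what the Morphline above does to an incoming event. The sample event and its values are hypothetical, and `simplify` is our own name; only the three extracted paths (`/response`, `/group/group_country`, `/mtime`) come from the configuration:

```javascript
// Hypothetical raw event from the meetup.com stream; only the fields
// referenced by the extractJsonPaths paths above are shown.
var rawEvent = {
  response: "yes",
  group: { group_country: "es" },
  mtime: 1454514182000
};

// Mirror of the extractJsonPaths command: keep response, country and modified.
function simplify(event) {
  return {
    response: event.response,
    country: event.group.group_country,
    modified: event.mtime
  };
}

console.log(simplify(rawEvent));
```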

After we establish what the structure of the processed information is going to be like, we’ll set up an aggregation in the following way:

  • Time dimension: the name of the attribute that will store the start of each aggregation period (days, hours…).
  • Granularity: the time interval in which the information is aggregated. In this case it’s hourly.
  • Dimensions: We’ll group by country, though it’s also possible to aggregate by country and response.
  • Operators: We’ll apply several operations: the first is the event count (count); the second stores the last value of the modified entry, which was taken from the mtime attribute of the WebSocket (with lastValue).

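As an illustration of what the hourly granularity does, each event's timestamp is truncated to the start of its hour, and all events in the same bucket are aggregated together. A minimal sketch of that bucketing (the function name is our own, not part of Sparta):

```javascript
// Compute the start of the hourly bucket an event timestamp falls into --
// this is the value an hourly time dimension stores for each group.
function hourBucket(ts) {
  var d = new Date(ts);
  d.setUTCMinutes(0, 0, 0); // drop minutes, seconds and milliseconds
  return d.getTime();
}

// Events at 17:05 and 17:55 UTC land in the same 17:00 bucket,
// so they are counted together by the hourly aggregation.
console.log(hourBucket(Date.UTC(2016, 1, 3, 17, 5)) ===
            hourBucket(Date.UTC(2016, 1, 3, 17, 55)));
```
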
When all these actions are done, we only have to establish an output (in this case MongoDB) and execute the policy.

With this information the aggregations should start in the following way:

We’ll get the result in real time in our MongoDB cluster, and we can query it from any other technology.

WebSocket Server with Node.js

To set up our WebSocket server connected to MongoDB, we need to have both modules (the MongoDB driver and a WebSocket module) installed beforehand. Then we can read the database records that will be sent over the WebSocket:

var MongoClient = require('mongodb').MongoClient;

MongoClient.connect('mongodb://localhost:27017/sparta', function(err, db) {
  if (err) throw err;

  db
    .collection('id_country_hour')
    .find()
    .toArray(function(err, docs) {
      if (err) throw err;
      console.log(docs);
      db.close();
    });
});

When we execute the script with Node.js, it will print the MongoDB content to the console.

After we check the access to the information, we’ll create the WebSocket server in order to transmit the changes produced by Stratio Sparta in real-time.

To serve all the streaming data we must do the following:

  • First, when a new client connects we must store the connection generated by the WebSocket API so we can broadcast to it later. We also have to remove the connection when the client disconnects.
  • It’s also important to send all the records existing to date.
  • Every so often we send the changes that have taken place in MongoDB, using the modified attribute to find them.
  • For the example, we’ll filter the countries to reduce the amount of data that will be sent.
  • We’ll also filter the attributes that get sent, since we’ll only be using country, hour and count.
The example is brief and you can consult it here.
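
The steps above can be sketched with a few small helpers. This is a hedged sketch, not the linked example: the function names are our own, and each connection is only assumed to expose a send() method, as objects from the ws module do:

```javascript
// Build the MongoDB query for documents changed since the last poll,
// restricted to a whitelist of countries.
function changesQuery(lastPoll, countries) {
  return {
    modified: { $gt: lastPoll },   // only records updated after the last poll
    country: { $in: countries }    // only the countries we want to stream
  };
}

// Keep only the attributes the client needs: country, hour and count.
function project(doc) {
  return { country: doc.country, hour: doc.hour, count: doc.count };
}

// Broadcast the filtered documents to every stored connection.
function broadcast(connections, docs) {
  var payload = JSON.stringify(docs.map(project));
  connections.forEach(function(conn) { conn.send(payload); });
}
```

On each poll interval the server would run changesQuery against the collection, then hand the results to broadcast; storing and removing connections happens in the WebSocket server's connection and close handlers.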

WebSocket Client with JavaScript

Once we have all the data in our front end, we can show it in a table, a chart or the console. In this case we’ll show the information in a table with console.table to simplify the visualization.

var ws = new WebSocket('ws://127.0.0.1:8008/');
var data = {};

ws.onmessage = function(response) {
  var responseData = JSON.parse(response.data);

  responseData
    .map(function(row) {
      // Format the bucket timestamp as "YYYY-MM-DD HH:MM".
      row.hour = new Date(row.hour).toISOString().slice(0, 16).replace('T', ' ');
      return row;
    })
    .forEach(function(row) {
      if (!data[row.hour]) {
        data[row.hour] = {};
      }
      // One cell per hour and country, holding the event count.
      data[row.hour][row.country] = row.count;
    });

  console.clear();
  console.log('Last update:', new Date().toLocaleString());
  console.table(data);
};

The result is simple but functional:

We can also build a more aesthetic visualization with charts. A quick way to create them is with Chart.js. We simply have to format the data to fit the Chart.js scheme and we’ll get the following result.
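
As a sketch of that formatting step, the accumulated data object from the previous snippet (hour → country → count) can be reshaped into the labels/datasets structure that Chart.js line and bar charts expect. The function name is ours, and real Chart.js datasets also take colors and other styling options we omit here:

```javascript
// Reshape { hour: { country: count } } into Chart.js' { labels, datasets }.
function toChartData(data) {
  var hours = Object.keys(data).sort();

  // Collect every country seen in any hour.
  var countries = {};
  hours.forEach(function(hour) {
    Object.keys(data[hour]).forEach(function(c) { countries[c] = true; });
  });

  return {
    labels: hours,
    datasets: Object.keys(countries).map(function(country) {
      return {
        label: country,
        // Missing buckets become 0 so every dataset has one point per hour.
        data: hours.map(function(hour) { return data[hour][country] || 0; })
      };
    })
  };
}
```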

You can find this simple data visualization example here.

Handling information from JavaScript is really simple, and there are many data visualization libraries. This makes it very easy to consume and visualize the data generated by Stratio Sparta’s aggregations and build statistical reports that would otherwise be very costly to produce without a tool that processes data in real time.

Author Pedro Gutiérrez

Pedro Gutierrez works as a front-end architect, focused on data visualisation and interface modularity. He is still alive after a thousand battles vs AngularJS.

