Welcome to this tutorial on building a deep learning model for malware classification.
As we all know, malware detection is one of the most challenging tasks for IT companies. Antivirus vendors make a huge effort to keep their solutions up to date in order to thwart new types of computer viruses. On the other hand, malware signatures are constantly modified in order to bypass antivirus software.
The good news is that a great deal of new malware versions are slight modifications of older ones. In other words, the binaries of new malware versions differ very slightly from those of versions that antiviruses already handle.
Therefore, with a well-optimized deep learning model, it is possible to detect and classify malware by comparing their binaries with those of already known malware. Deep learning models have the advantage of being able to accurately learn particular characteristics from the data passed to them.
The Procedure
Many papers on malware classification using deep learning (Read this Paper, for example) describe the following workflow:
- Obtain the malware binary file
- Convert the binary file into a grayscale image: this can be done using BinaryToImage
- Feed the resulting images to a Convolutional Neural Network for classification
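To visualize the second step, here is a minimal sketch of the byte-to-pixel conversion using NumPy and Pillow. The helper name and the 32-pixel width are illustrative choices of ours, not part of BinaryToImage:

```python
import numpy as np
from PIL import Image

def binary_to_grayscale(payload: bytes, width: int = 32) -> Image.Image:
    """Map each byte of a binary to one grayscale pixel (0-255),
    padding the last row with zeros so the array is rectangular."""
    arr = np.frombuffer(payload, dtype=np.uint8)
    height = -(-len(arr) // width)  # ceiling division
    padded = np.zeros(width * height, dtype=np.uint8)
    padded[:len(arr)] = arr
    return Image.fromarray(padded.reshape(height, width), mode="L")

# Usage (hypothetical file name):
# img = binary_to_grayscale(open("sample.exe", "rb").read())
```

The key idea is simply that byte value 0x00 becomes a black pixel and 0xFF a white one, so structurally similar binaries produce visually similar images.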
Implementation
For this tutorial, we have used the Malimg dataset.
The Malimg dataset contains 9,339 malware grayscale images from 25 malware families. Images are 32 x 32 pixels and each is labeled with a number between 0 and 24, corresponding to the malware class.
For example, the first class has a label of 0, the malware class is Worm, and the family is Allaple.L.
For deep learning, we have created a model by taking the first 8 layers of the ResNet101 convolutional neural network, to which we have appended a Flatten layer, followed by a fully connected neural network. The output layer is a Dense layer of 25 units (one for each class) with the softmax activation function.
Figure 1: Deep Learning Model
The Code
The following lines describe the code created for data preparation, model design, training and testing.
These lines were exported from a Google Colab notebook.
#Imports
import tensorflow as tf
from google.colab import drive
import os
from os import path
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from tensorflow.keras import Model
from tensorflow.keras.layers import Dense, Flatten,Input,Conv2D,MaxPooling2D,MaxPool2D
from tensorflow.keras.callbacks import EarlyStopping
If you are using a Google Colab notebook, mount your Google Drive to allow accessing your Drive files from the notebook.
drive.mount('/content/drive')
Before proceeding, create a folder MLDATASET in your Google Drive, download the malimg.npz and malimg_classes.csv files and put them in the MLDATASET folder.
malimg.npz contains the image data along with the label numbers. malimg_classes.csv is a CSV file where the first column is the class label and the remaining 2 columns are the malware class and class family.
#Load data into numpy array
malimg= '/content/drive/MyDrive/MLDATASET/malimg.npz'
classes_data_path = '/content/drive/MyDrive/MLDATASET/malimg_classes.csv'
classes = pd.read_csv(classes_data_path,sep=';')
data = np.load(malimg,allow_pickle=True)
files = data.files
imgs = [data[i] for i in files]
npArr = imgs[0]
# This method shuffles and splits the data into 60% for training, 10% for validation and 30% for testing
def generate_train_valid_test_data():
    np.random.shuffle(npArr)
    idx_train = npArr.shape[0]*60//100
    idx_valid = idx_train + npArr.shape[0]*10//100
    train_data = npArr[:idx_train]
    valid_data = npArr[idx_train:idx_valid]
    test_data = npArr[idx_valid:]
    return (train_data, valid_data, test_data)
#This method generates a readable class name from a label number (labels range from 0 to 24)
def get_class_name_from_label_number(label):
    cur_class = classes.loc[classes["label"]==label]
    c1 = cur_class["class"].values[0]
    c2 = cur_class["familly_name"].values[0]
    class_name = c1 + "/" + c2
    return class_name
#Plot some sample images
random_indx = np.random.randint(0, high=npArr.shape[0], size=4)
fig, ax = plt.subplots(1, 4, figsize=(10,3))
for i in range(4):
    idx = random_indx[i]
    image = npArr[idx][0]
    label1 = npArr[idx][1]
    class_name = get_class_name_from_label_number(int(label1))
    ax[i].imshow(image)
    ax[i].set_title("Class:\n "+str(class_name))
Figure 2:Malware Image samples
#Load the ResNet101 CNN and keep only the first 8 layers: this truncated model will constitute our CNN.
resnet_cnn_model = tf.keras.applications.ResNet101(include_top=False, weights=None, input_shape=(32, 32,1))
output = resnet_cnn_model.get_layer('conv2_block1_1_conv').output
resnet_cnn_model_new = Model(resnet_cnn_model.input,output)
resnet_cnn_model_new.summary()
Model: "model" _________________________________________________________________ Layer (type) Output Shape Param # ================================================================= input_1 (InputLayer) [(None, 32, 32, 1)] 0 _________________________________________________________________ conv1_pad (ZeroPadding2D) (None, 38, 38, 1) 0 _________________________________________________________________ conv1_conv (Conv2D) (None, 16, 16, 64) 3200 _________________________________________________________________ conv1_bn (BatchNormalization (None, 16, 16, 64) 256 _________________________________________________________________ conv1_relu (Activation) (None, 16, 16, 64) 0 _________________________________________________________________ pool1_pad (ZeroPadding2D) (None, 18, 18, 64) 0 _________________________________________________________________ pool1_pool (MaxPooling2D) (None, 8, 8, 64) 0 _________________________________________________________________ conv2_block1_1_conv (Conv2D) (None, 8, 8, 64) 4160 ================================================================= Total params: 7,616 Trainable params: 7,488 Non-trainable params: 128 _________________________________________________________________
#Apply a Flatten layer to the output of the previous CNN, followed by a fully connected neural network formed by 3 Dense layers
#Then compile the resulting model
input = resnet_cnn_model_new.input
resnet_new_outputs = resnet_cnn_model_new.output
h = Flatten()(resnet_new_outputs)
h = Dense(64, activation="relu")(h)
h = Dense(32, activation="relu")(h)
outputs = Dense(25, activation="softmax")(h)
model = Model(inputs=input, outputs=outputs)
model.compile(loss='sparse_categorical_crossentropy', metrics=["accuracy"])
model.summary()
Model: "model_1" _________________________________________________________________ Layer (type) Output Shape Param # ================================================================= input_1 (InputLayer) [(None, 32, 32, 1)] 0 _________________________________________________________________ conv1_pad (ZeroPadding2D) (None, 38, 38, 1) 0 _________________________________________________________________ conv1_conv (Conv2D) (None, 16, 16, 64) 3200 _________________________________________________________________ conv1_bn (BatchNormalization (None, 16, 16, 64) 256 _________________________________________________________________ conv1_relu (Activation) (None, 16, 16, 64) 0 _________________________________________________________________ pool1_pad (ZeroPadding2D) (None, 18, 18, 64) 0 _________________________________________________________________ pool1_pool (MaxPooling2D) (None, 8, 8, 64) 0 _________________________________________________________________ conv2_block1_1_conv (Conv2D) (None, 8, 8, 64) 4160 _________________________________________________________________ flatten_1 (Flatten) (None, 4096) 0 _________________________________________________________________ dense (Dense) (None, 64) 262208 _________________________________________________________________ dense_1 (Dense) (None, 32) 2080 _________________________________________________________________ dense_2 (Dense) (None, 25) 825 ================================================================= Total params: 272,729 Trainable params: 272,601 Non-trainable params: 128 _________________________________________________________________
#Prepare the train, validation and test datasets for model training
train, valid, test = generate_train_valid_test_data()
x_train = np.stack(train[:,0])
x_train = x_train[:,:,:,np.newaxis]
y_train = train[:,1].astype(int)
x_val = np.stack(valid[:,0])
x_val = x_val[:,:,:,np.newaxis]
y_val = valid[:,1].astype(int)
x_test = np.stack(test[:,0])
x_test = x_test[:,:,:,np.newaxis]
y_test = test[:,1].astype(int)
# Use early stopping as regularization
earlystop = EarlyStopping(monitor='val_accuracy',mode='max',patience=3, verbose=2)
#Train the model
history = model.fit(x_train,y_train,batch_size=100,epochs=100,validation_data=(x_val,y_val),callbacks=[earlystop])
Epoch 1/100 47/47 [==============================] - 5s 89ms/step - loss: 1.9561 - accuracy: 0.4892 - val_loss: 8.1584 - val_accuracy: 0.3333 Epoch 2/100 47/47 [==============================] - 4s 84ms/step - loss: 0.8638 - accuracy: 0.7340 - val_loss: 5.2910 - val_accuracy: 0.4009 Epoch 3/100 47/47 [==============================] - 4s 85ms/step - loss: 0.5796 - accuracy: 0.7925 - val_loss: 2.4729 - val_accuracy: 0.4920 Epoch 4/100 47/47 [==============================] - 4s 83ms/step - loss: 0.4347 - accuracy: 0.8306 - val_loss: 1.7647 - val_accuracy: 0.5723 Epoch 5/100 47/47 [==============================] - 4s 85ms/step - loss: 0.3276 - accuracy: 0.8728 - val_loss: 0.4186 - val_accuracy: 0.8328 Epoch 6/100 47/47 [==============================] - 4s 83ms/step - loss: 0.2572 - accuracy: 0.8931 - val_loss: 0.4189 - val_accuracy: 0.8328 Epoch 7/100 47/47 [==============================] - 4s 84ms/step - loss: 0.1670 - accuracy: 0.9353 - val_loss: 0.4783 - val_accuracy: 0.8264 Epoch 8/100 47/47 [==============================] - 4s 84ms/step - loss: 0.1126 - accuracy: 0.9602 - val_loss: 0.7770 - val_accuracy: 0.8199 Epoch 00008: early stopping
#Plot accuracy and loss curves for both training and validation data
acc = history.history['accuracy']
loss = history.history['loss']
val_acc = history.history['val_accuracy']
val_loss = history.history['val_loss']
fig, ax = plt.subplots(1,2,figsize=(10,5))
ax[0].plot(acc,label='accuracy')
ax[0].plot(val_acc,label='val_accuracy')
ax[0].set_title("Accuracy")
ax[0].legend()
ax[1].plot(loss, label='Loss')
ax[1].plot(val_loss,label='val_loss')
ax[1].set_title("Loss")
ax[1].legend()
Figure 3: Learning curves
#This method compares the predicted malware classes with the actual values;
#the result is displayed as a pandas table
def compare_predictions(pred, labels):
    pandas_cols = ["Predicted Value", "Actual Value"]
    data = []
    for i in range(pred.shape[0]):
        idxmax = pred[i]
        pred_class_name = get_class_name_from_label_number(idxmax)
        actual_class_name = get_class_name_from_label_number(labels[i])
        data.append([pred_class_name, actual_class_name])
    df = pd.DataFrame(data, columns=pandas_cols)
    display(df)
#Randomly take 50 samples from the test data and predict their classes
random_indx = np.random.randint(0, high=x_test.shape[0], size=50)
q =x_test[random_indx]
labels = y_test[random_indx]
pred = model.predict(q)
pred=pred.argmax(axis=1)
compare_predictions(pred,labels)
| Predicted Value | Actual Value |
---|---|---|
0 | Rogue Fakerean | Rogue Fakerean |
1 | Trojan C2Lop.gen!G | Trojan Downloader Obfuscator.AD |
2 | PWS Lolyda.AA 1 | PWS Lolyda.AA 1 |
3 | Worm Yuner.A | Worm Yuner.A |
4 | Worm Yuner.A | Dialer Adialer.C |
5 | Rogue Fakerean | Rogue Fakerean |
6 | Worm Yuner.A | Worm Yuner.A |
7 | PWS Lolyda.AA 1 | PWS Lolyda.AA 1 |
8 | Trojan Downloader Swizzor.gen!E | Trojan Downloader Swizzor.gen!E |
9 | PWS Lolyda.AA 1 | PWS Lolyda.AA 1 |
10 | PWS Lolyda.AA 1 | Worm Yuner.A |
11 | Rogue Fakerean | Rogue Fakerean |
12 | Worm:AutoIT Autorun.K | Worm:AutoIT Autorun.K |
13 | Worm Allaple.A | Worm Allaple.A |
14 | PWS Lolyda.AA 2 | PWS Lolyda.AA 2 |
15 | Worm Yuner.A | Worm Yuner.A |
16 | PWS Lolyda.AA 1 | PWS Lolyda.AA 1 |
17 | Trojan Downloader Swizzor.gen!E | Trojan Downloader Swizzor.gen!E |
18 | PWS Lolyda.AA 1 | PWS Lolyda.AA 1 |
19 | PWS Lolyda.AA 1 | PWS Lolyda.AA 1 |
20 | PWS Lolyda.AA 3 | PWS Lolyda.AA 3 |
21 | Worm Yuner.A | Trojan C2Lop.P |
22 | Worm Yuner.A | Trojan C2Lop.P |
23 | Worm:AutoIT Autorun.K | Worm:AutoIT Autorun.K |
24 | Worm:AutoIT Autorun.K | Worm:AutoIT Autorun.K |
25 | PWS Lolyda.AA 1 | Worm Yuner.A |
26 | PWS Lolyda.AA 1 | Worm Yuner.A |
27 | Worm Yuner.A | Worm Yuner.A |
28 | Worm Yuner.A | Worm Yuner.A |
29 | PWS Lolyda.AA 3 | PWS Lolyda.AA 3 |
30 | PWS Lolyda.AA 1 | Worm Yuner.A |
31 | Worm VB.AT | Worm VB.AT |
32 | PWS Lolyda.AA 1 | Worm Yuner.A |
33 | PWS Lolyda.AA 1 | PWS Lolyda.AA 1 |
34 | PWS Lolyda.AA 1 | Worm Yuner.A |
35 | PWS Lolyda.AA 1 | PWS Lolyda.AA 1 |
36 | PWS Lolyda.AA 1 | Worm Yuner.A |
37 | Worm VB.AT | Worm VB.AT |
38 | PWS Lolyda.AA 1 | PWS Lolyda.AA 1 |
39 | Trojan Skintrim.N | Trojan Skintrim.N |
40 | Trojan Downloader Wintrim.BX | Trojan Downloader Wintrim.BX |
41 | Worm Yuner.A | Trojan C2Lop.P |
42 | Trojan Skintrim.N | Trojan Skintrim.N |
43 | Backdoor Agent.FYI | Trojan Downloader Obfuscator.AD |
44 | PWS Lolyda.AA 1 | Worm Yuner.A |
45 | Trojan Skintrim.N | Trojan Skintrim.N |
46 | Trojan Skintrim.N | Trojan Skintrim.N |
47 | PWS Lolyda.AA 1 | PWS Lolyda.AA 1 |
48 | Worm Allaple.A | Worm Allaple.A |
49 | Worm Yuner.A | Worm Yuner.A |
#Feel free to display the confusion matrix for further analysis of model performance
#confusion = tf.math.confusion_matrix(labels=labels, predictions=pred, num_classes=25)
#print(confusion)
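The commented-out TensorFlow call aside, a confusion matrix can also be computed with plain NumPy. Here is a minimal sketch on toy labels; in the notebook you would pass the `labels` and `pred` arrays instead:

```python
import numpy as np

def confusion_matrix(labels, preds, num_classes):
    """cm[i, j] counts samples of true class i predicted as class j."""
    cm = np.zeros((num_classes, num_classes), dtype=int)
    for t, p in zip(labels, preds):
        cm[t, p] += 1
    return cm

# Toy illustration with 3 classes; the diagonal holds the correct predictions.
cm = confusion_matrix([0, 1, 2, 2], [0, 2, 2, 2], num_classes=3)
```

Per-class recall and precision then follow by normalizing rows and columns of `cm` respectively.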
Conclusion
In this tutorial, we have trained a simple deep learning model for malware classification. With the Malimg dataset, we have managed to obtain a model with 96% training accuracy.
Build your own digital document repository from your paper documents
Welcome to this article on building a digital document repository from your paper documents
Today, many organizations are adopting digital behaviors in their day-to-day processes. This might include providing a web interface to digitalize a process that used to be 100% in person, or including digital marketing in their strategy checklist. But transforming business processes for digital often leads to the need for an Electronic Document Management System (EDMS) to suitably store, track and search documents.
Depending on the context, a document can be a simple bill or a copy of a contract. In some other cases, documents may consist of images.
Whatever the case, adopting an EDMS is not a challenging task, as many vendors provide high-quality EDMS software ready to use.
However, integrating an EDMS also involves being able to digitize and store historical paper documents which used to be kept physically. Those documents need to be stored in a digital repository for them to be searchable.
In this article, we aim to provide simple "do it yourself" (DIY) guidelines to build your own digital document repository from your historical paper documents.
Our goal is to build a repository to efficiently store, index and search documents using tags and keywords.
1- Our solution
Figure 1 below describes the successive steps followed when moving a paper document into a digital document repository.
Figure 1: Adding paper documents to the repository
First, documents are scanned, then the digital output is processed to extract tags and keywords. Next, those tags and keywords are stored in a metadata repository and the digital document is stored in a document storage system. The metadata repository keeps the location of the digital document.
This way, when an authorized user searches for a document, the search terms entered by the user are analysed and a query is executed against the metadata repository.
The result of this query is an ordered list of documents, with the first of the list being the one that best matches the search query.
The solution presented here is suitable for documents whose content is mainly text, but it can easily be extended to support more complex documents like images.
2- Implementation
As we mentioned before, implementing the digital repository presented here is very straightforward. The most challenging task is to find the best tool or technology for each component.
For this article, we have designed the architecture displayed in Figure 2 below. Bear in mind that there is no unique way of implementing this solution, as someone else may come up with a better implementation.
Figure 2: An implementation of a digital document repository from paper documents
Digital documents from the scanner are processed using the Python library pytesseract, which uses Optical Character Recognition (OCR) algorithms to extract text from images. Then the output text file is fed into another Python script built around the Rapid Automatic Keyword Extraction (RAKE) library to generate metadata.
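RAKE itself is a short algorithm, so as a rough illustration of what the extraction script does, here is a minimal RAKE-style sketch in pure Python. The tiny stopword list is our own illustrative choice; a real script would use the RAKE library with a full stopword list:

```python
import re
from collections import defaultdict

STOPWORDS = {"a", "an", "and", "are", "as", "at", "be", "by", "for",
             "from", "in", "is", "it", "of", "on", "or", "the", "to", "with"}

def rake_keywords(text):
    """Tiny RAKE-style extractor: candidate phrases are runs of words
    between stopwords/punctuation; each word is scored by its
    degree/frequency ratio and phrases by the sum of word scores."""
    tokens = re.findall(r"[a-zA-Z']+|[.,;:!?]", text.lower())
    phrases, current = [], []
    for tok in tokens:
        if tok in STOPWORDS or not tok[0].isalpha():
            if current:
                phrases.append(current)
            current = []
        else:
            current.append(tok)
    if current:
        phrases.append(current)
    freq, degree = defaultdict(int), defaultdict(int)
    for phrase in phrases:
        for w in phrase:
            freq[w] += 1
            degree[w] += len(phrase)  # co-occurring words, incl. itself
    scores = {w: degree[w] / freq[w] for w in freq}
    ranked = sorted({" ".join(p): sum(scores[w] for w in p) for p in phrases}.items(),
                    key=lambda kv: kv[1], reverse=True)
    return ranked
```

Longer multi-word phrases naturally score higher, which is why RAKE tends to surface descriptive key phrases rather than single common words.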
The extracted metadata are then indexed in Elasticsearch with the following content:
metadataDoc = {
    "doc_name": "<the name of the document>",
    "content": "<all the key phrases extracted from the document using RAKE>",
    "tags": "<the list of the top 5 most frequent words from the text>"
}
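To make this record concrete, here is a small sketch that assembles it; the helper name, the index name and the commented client calls are our own illustrative choices, not taken from the actual scripts:

```python
import re
from collections import Counter

def build_metadata_doc(doc_name, key_phrases, raw_text):
    """Assemble the metadata record to index: the top-5 most
    frequent words of the raw text become the tags."""
    words = re.findall(r"[a-z]+", raw_text.lower())
    tags = [w for w, _ in Counter(words).most_common(5)]
    return {"doc_name": doc_name, "content": key_phrases, "tags": tags}

# Indexing the record (assumes a reachable Elasticsearch instance and the
# official Python client):
# from elasticsearch import Elasticsearch
# es = Elasticsearch("http://localhost:9200")
# es.index(index="documents", document=build_metadata_doc("bill.pdf", phrases, text))
```

A real deployment would also add the storage location field once the document has been uploaded.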
Later, the actual digital document is stored in Google Cloud Filestore and its metadata is updated with the document's location.
The choice of Google Cloud Filestore as the storage backend for the digital document is arbitrary. Other possible storage solutions include a local Network File System (NFS), an FTP server or any file storage solution available.
Finally, we use Kibana for searching and retrieving documents. A better implementation of the search interface could consist in building a web page with forms for searching and displaying digital documents.
For people with a technical background interested in testing the solution presented in this article, we have developed two Python scripts for image-to-text conversion and metadata extraction from text.
Both scripts are available on our GitHub: Image to Text Converter and Metadata Extractor. The whole Python virtual environment is also available here.
3- Conclusion
In this article, we presented a simple and easy-to-implement way of building your own digital document repository from your paper documents.
The solution leverages Elasticsearch for document searching. The advantage of Elasticsearch as a search engine is that it is scalable and provides a security layer for controlling access to documents.
Our intent wasn’t to build a cutting edge commercial product but to present simple DIY guidelines for building your own repository.
Real-world Document Management Systems are more complex and include many other components. For people interested in extending the solution presented here with more advanced features, feel free to do so and share the results.
Internet Penetration Rate Africa 2021
Source : Internet World Stats
Country | Population Estimation in 2021 | Internet Users 31-12-2020 | Internet Penetration Rate |
---|---|---|---|
Kenya | 54,985,698 | 46,870,422 | 85.2 % |
Libya | 6,958,532 | 5,857,000 | 84.2 % |
Nigeria | 211,400,708 | 154,301,195 | 73.0 % |
Mauritius | 1,273,433 | 919,000 | 72.2 % |
Morocco | 37,344,795 | 25,589,581 | 68.5 % |
Tunisia | 11,935,766 | 8,170,000 | 68.4 % |
Cabo Verde | 561,898 | 352,120 | 62.7 % |
Gabon | 2,278,825 | 1,367,641 | 60.0 % |
Eritrea | 3,601,467 | 248,199 | 6.9 % |
Mali | 20,855,735 | 12,480,176 | 59.8 % |
South Africa | 60,041,994 | 34,545,165 | 57.5 % |
Algeria | 44,616,624 | 25,428,159 | 57.0 % |
Eswatini | 1,172,362 | 665,245 | 56.7 % |
Senegal | 17,196,301 | 9,749,527 | 56.7 % |
Zimbabwe | 15,092,171 | 8,400,000 | 55.7 % |
Djibouti | 1,002,187 | 548,832 | 54.8 % |
Egypt | 104,258,327 | 54,741,493 | 52.5 % |
Zambia | 18,920,651 | 9,870,427 | 52.2 % |
Namibia | 2,587,344 | 1,347,418 | 52.1 % |
Botswana | 2,397,241 | 1,139,000 | 47.5 % |
Ghana | 31,732,129 | 14,767,818 | 46.5 % |
Cote d’Ivoire | 27,053,629 | 12,253,653 | 45.3 % |
Rwanda | 13,276,513 | 5,981,638 | 45.1 % |
Uganda | 47,123,531 | 18,502,166 | 39.3 % |
Tanzania | 61,498,437 | 23,142,960 | 37.6 % |
Lesotho | 2,159,079 | 682,990 | 31.6 % |
Benin | 12,451,040 | 3,801,758 | 30.5 % |
Sudan | 44,909,353 | 13,124,100 | 29.2 % |
Cameroon | 27,224,265 | 7,878,422 | 28.9 % |
Sao Tome & Principe | 223,368 | 63,864 | 28.6 % |
Angola | 33,933,610 | 8,980,670 | 26.5 % |
Equatorial Guinea | 1,449,896 | 362,891 | 25.0 % |
Comoros | 888,451 | 193,700 | 21.8% |
Burkina Faso | 21,497,096 | 4,594,265 | 21.4 % |
Mauritania | 4,775,119 | 969,519 | 20.3 % |
Mozambique | 32,163,047 | 6,523,613 | 20.3 % |
Gambia | 2,486,945 | 442,05 | 19.0 % |
Guinea | 13,497,244 | 2,551,672 | 18.9 % |
Ethiopia | 117,876,227 | 21,147,255 | 17.9 % |
Congo, Dem. Rep. | 92,377,993 | 16,355,917 | 17.7 % |
Congo | 5,657,013 | 833,2 | 14.7 % |
Liberia | 5,180,203 | 760,994 | 14.7 % |
Malawi | 19,647,684 | 2,717,243 | 13.8 % |
Niger | 25,130,817 | 3,363,848 | 13.4 % |
Chad | 16,914,985 | 2,237,932 | 13.2 % |
Burundi | 12,255,433 | 1,606,122 | 13.1 % |
Sierra Leone | 8,141,343 | 1,043,725 | 12.8 % |
Somalia | 16,359,504 | 2,089,900 | 12.8 % |
Guinea-Bissau | 2,015,494 | 250,000 | 12.4 % |
Togo | 8,478,250 | 1,011,837 | 11.9 % |
Central African Rep. | 4,919,981 | 557,085 | 11.3 % |
Madagascar | 28,427,328 | 2,864,000 | 10.1 % |
Migrating an On-Premise Hadoop Cluster to Google Cloud Platform
Welcome to this article on migrating an On-Premise Hadoop Cluster to Google Cloud Platform.
Many Companies are migrating their Big Data workloads from local infrastructure to the Cloud.
The reasons for switching from an on-premise deployment model to the Cloud are multiple and do not concern only Big Data solutions. This situation is also observed in software deployment, where Companies are more and more opting for the Cloud Platform as a Service (PaaS) deployment model.
In both cases, the indubitable reason for migrating to a Cloud environment is the attempt to reduce as much as possible the effort dedicated to infrastructure management, so the available team can focus on building the best solution or product for the Company.
The particular case of migrating a Hadoop Cluster to the Cloud usually involves moving both Data and Transformation pipelines to the Cloud.
The possible reasons for migrating an On-Premise Hadoop Cluster to the Cloud include:
– Reduce the cost of storing and querying Data
– Build an autoscaling architecture
– Leverage Serverless Services
– Leverage advanced security, governance and regulation compliance provided by the Cloud vendor.
Migrating to Google Cloud Platform
As we mentioned before, migrating a Hadoop Cluster to the Cloud usually involves moving both the Data and the transformation pipeline.
The transformation pipeline includes all consecutive algorithms applied to the Data, from when it is ingested in the Cluster to its final state, where it has been enriched, formatted and is ready to be served.
In the case of Hadoop Clusters, those pipelines generally consist of various Apache Spark jobs.
One of the crucial questions usually answered before starting the migration is whether the existing pipeline should be moved to the Cloud as-is, or whether the code has to be adapted, or even redesigned from scratch, in order to fit the Cloud environment.
It is obvious that the first option is the most feasible and requires less effort.
Google Cloud Platform supports both scenarios by integrating various open source software and technologies.
Deploying without redesigning Apache Spark workloads
In this scenario, Data and processing pipelines are moved to the Cloud with little modification.
Google Cloud Platform supports this process by providing Cloud Dataproc, a fully managed and highly scalable service for running Apache Spark. Cloud Dataproc also supports other open source software for Data processing, like Apache Flink.
All features associated with Cloud Dataproc can be found here.
It is possible to launch a Cloud Dataproc Cluster along with the HDFS for persisting Data , but this is not cost effective. Instead, Google recommends decoupling the Storage layer from the Processing layer for cost optimization. That way, Companies only create a Cloud Dataproc Cluster for a given process , and destroy the Cluster as soon as the process is completed.
Figure 1: Recommended process flow
Let’s consider the example of a Company that has been hosting a Cloudera Cluster on local servers and is now interested in migrating to Google Cloud Platform.
The Cluster stores both raw and processed Data in HDFS and uses Apache Impala for Data Warehousing.
We can map each layer of this On-premise solution with a corresponding layer on Google Cloud as described in Figure 2 below
Figure 2: Layers Mapping Cloudera On-Prem vs Google Cloud
It is worth mentioning that Google BigQuery is actually formed by a storage backend and a query service. So using BigQuery for serving Data consists in storing the processed and structured Data from Cloud Dataproc in BigQuery storage and querying it using the BigQuery query service.
In summary, Figure 3 below describes examples of end-to-end tools available on Google Cloud Platform for migrating an On-Premise Hadoop Cluster without rewriting the Spark code for Data transformation.
Figure 3: An example of End-2-End tools integration
The previous scenario described how to migrate an On-premise Hadoop Cluster to Google Cloud Platform without redesigning the Data transformation pipeline. It avoids wasting time and human resources on a task that was already completed.
But for Companies that can afford building a new Data pipeline from scratch, Google Cloud Platform also provides Cloud Dataflow for building fast and cost-effective Data pipelines.
Cloud Dataflow is built around Apache Beam, which is an open source, unified model for defining both batch and streaming Data parallel processing pipelines.
So building the Data pipelines with Google Cloud Dataflow has the advantage of supporting both batch and streaming Data processing.
Therefore, another option available when migrating an On-Premise Hadoop Cluster to Google Cloud Platform might consist in leveraging Cloud Dataflow for the processing layer.
The difference here is that the pipeline should be rewritten using the Apache Beam programming model (there is support for Java, Python and Go).
For more on Cloud DataFlow, please refer to the Official documentation.
This concludes the article. For more real-world use cases on migrating an On-Premise Hadoop Cluster to Google Cloud Platform, visit the Use Cases section of the Cloud Dataproc documentation.
DEPLOYING A BIG DATA SOLUTION ON PREMISE – A SIMPLE PRACTICAL GUIDE
Most Big Data solutions include a data ingestion layer, a data storage layer, a data transformation layer
and a data visualization or data consumption layer.
Figure 1 : Common layers in Big Data Architectures
Each of the previously mentioned layers can be built using a wide range of tools, software and technologies. For example, the ingestion layer can be based on a tool such as Apache Sqoop or Apache Nifi; it can even consist of a simple Shell or Python script.
The storage layer can be a cloud object storage, a Hadoop HDFS or any of the storage technologies available out there.
The point here is that whatever list of technologies or software is chosen for a Big Data architecture, it will be necessary to configure each of them according to the environment, and also to guarantee the quick delivery of a consistent version of the solution to a production environment, where the end users or analysts can use the resulting data reliably.
In order to guarantee the promotion of the solution between different environments, the architecture must clearly define a way to deploy the solution.
Some of the challenges encountered when deploying a big data solution include:
– A big data architecture usually requires various tools and technologies, each of them having their own configuration files. This involves being able to maintain each tool or technology selected for the architecture.
– The variety of potential data sources may cause requirements to change on a regular basis. Thus the architecture should be quickly updatable and redeployable. Besides, new requirements may involve modifying and redeploying the ingestion pipeline as well as changing or updating the storage layer.
– The volume of data to be stored and processed can increase very quickly. So the architecture should allow new updates to guarantee both horizontal and vertical scalability. In some cases this can even lead to replacing or adding tools to the initial architecture, and all those changes can require redeploying part of, or the complete, architecture.
– The transformation algorithms are constantly modified to take on new kinds of incoming data or to produce a different view of the data to the business layer.
Despite the fact that many cloud-based solutions like Azure Databricks provide a CI/CD process for deploying big data artifacts, many companies still prefer having their own deployment pipelines and, in most cases, they prefer deploying their big data solution on premise.
In this article, we present a simple guide for designing the deployment process of a Big Data solution and we propose a simple on-premise deployment implementation.
1- Designing the deployment process
A Big Data solution deployment process will hugely depend on the tools and components selected for the architecture. However, in almost every case, the following actions are recommendable when building a deployment process:
• Identify the deployable items
This requirement applies to each of the 4 layers of a big data architecture. A deployable item can be an environment-dependent configuration file, a jar file or a Python script.
For example, if the ingestion layer is built with Apache Nifi, which gets some data from a relational database and uploads it to a Hadoop HDFS, some deployable items might include:
Deployable Item | Description |
---|---|
/NIFI_HOME/conf/nifi.properties | This is the main configuration file of Apache Nifi |
/NIFI_HOME/conf/zookeeper.properties | This is useful if Nifi is operating in cluster mode |
/NIFI_HOME/conf/authorizers.xml | This file describes the authorization policies of Nifi: which groups or users have access to what? |
/NIFI_HOME/conf/login-identity-providers.xml | This file defines the authentication mechanism of Nifi: LDAP? local? … |
/NIFI_HOME/conf/flow.xml.gz | This is the resulting flow diagram, which can be moved between environments |
Table 1 : An example of deployable items for Apache Nifi
So whatever the chosen list of tools, an effort should be made to identify each deployable item. Some of them are deployed once at the beginning of the project; some others are environment independent and do not need to be redeployed.
• Identify environment variables
In the context of software deployment, each environment has its own information. This can be a list of hostnames, a path where a software should be installed, the path to a keytab file or any variable inherent to the corresponding environment. As a best practice, if each environment is formed by a cluster of nodes, the environment variables should be the same for every node of the cluster. For example, environment variables like $JAVA_HOME should hold the same value on every node of the cluster. This allows having a single list of environment variables for each environment.
On the other hand, those variables for each environment should be kept in a text or property file and stored somewhere they can be accessed by the deployment tool.
For that purpose, some people keep those files in a git repository and let the deployment automation tool read them before installing the solution.
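As a sketch of this idea, a minimal reader for such a per-environment property file could look like the following. The plain `key=value` format is an assumption on our part:

```python
def load_env_properties(path):
    """Parse a simple key=value property file into a dict,
    skipping blank lines and '#' comments."""
    env = {}
    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            key, _, value = line.partition("=")
            env[key.strip()] = value.strip()
    return env
```

The deployment tool would load the file matching the detected environment (dev, staging, production) and use the resulting dict when rendering configuration files.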
• Select a software packaging and publication tool
Depending on the type of the project, there is a multitude of tools for packaging and publishing a software project. These include: Apache Maven, Apache ANT, twine. Some people also build their own tool.
• Set up a version control repo
Today, almost every project uses GitLab or GitHub as a version control repo.
• Set up an artifact repository if required
Some options include Apache Nexus, JFrog Artifactory, CloudRepo and PyPI.
• Choose a deployment automation tool
Whether the project only requires deploying configuration files or also includes proprietary software, deploying a Big Data solution requires the use of a deployment
automation tool, as this kind of project generally runs on clusters with multiple machines.
The selected tool should be able to detect the environment, resolve each environment variable for the current environment, pull the deployable artifact
from the artifact repository, and run the installation script on every computer involved in the project. Some of the tools available include Jenkins,
Puppet, Ansible and Fabric.
• Very important advice
Never hardcode an environment variable in a configuration file or in source code. Put it as a variable and let the deployment process overwrite it with the actual
value during the installation process.
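As a sketch of this rule, the snippet below keeps `${...}` placeholders in a configuration template and lets the deployment step substitute the real per-environment values. The variable names are hypothetical:

```python
# Sketch of the "no hardcoding" rule: configuration files keep ${...}
# placeholders; the deployment process substitutes the actual values.
from string import Template

config_template = Template(
    "elasticsearch.conf.dir=${elasticsearch_conf_dir}\n"
    "oozie.server=${oozie_server}"
)
# Values resolved for the current environment (hypothetical)
env = {
    "elasticsearch_conf_dir": "/etc/elasticsearch",
    "oozie_server": "http://oozie_server_prod/",
}
rendered = config_template.substitute(env)
print(rendered)
```

The same template can then be rendered against the development, staging or production variable list without touching the file itself.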
2- A simple example of deploying a Big Data Solution on premise
The best way to illustrate how a big data solution could be deployed on premise might be to define a simple use case.
************************* A media company is interested in knowing the most engaging tweets from a list of public personas. They are constantly checking Twitter to see if one of the profiles they follow has tweeted something new, and to know how people reacted to those tweets. The company started with a list of a few public profiles who constantly generate noise with their tweets, but they are interested in keeping their Twitter list up to date by adding new people or removing those whose tweets start to engage less. For that purpose, they constantly search for the most-reacted-to tweets on Twitter, to identify whether someone not already on their list is making a lot of noise. On the other hand, by 3 pm every day, the executive team wants the list of profiles whose tweets have generated the most reactions, and what they have said. They are also interested in keeping the historical most prolific tweets, so that those tweets can be easily searched in the future.
To measure the performance of a tweet, information such as the number of replies, the number of retweets and the number of likes is used to compute the weight of the tweet.
Therefore, the company is interested in deploying a big data solution on premise. One of their critical requirements is that the architecture should be flexible and allow quick redeployment without compromising the overall system. *************************
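The use case does not give the exact weighting formula, so the sketch below assumes a simple weighted sum of replies, retweets and likes; the weight coefficients are hypothetical:

```python
# Hypothetical tweet weight: a weighted sum of reply, retweet and like
# counts. The coefficients are illustrative, not from the use case.
def tweet_weight(replies, retweets, likes,
                 w_reply=3.0, w_retweet=2.0, w_like=1.0):
    return w_reply * replies + w_retweet * retweets + w_like * likes

tweets = [("tweet_a", tweet_weight(10, 50, 200)),
          ("tweet_b", tweet_weight(2, 5, 1000))]
# Rank tweets by descending weight
ranking = sorted(tweets, key=lambda t: t[1], reverse=True)
print(ranking[0][0])  # tweet_b
```

In the real solution, a computation like this would sit inside the Spark transformation engine that classifies the day's most prolific tweets.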
The first step in handling this simple project should consist of designing the architecture of the solution. The steps involved in designing a big data architecture are beyond the scope of this article, so we consider that the architecture is already designed (see Figure 2 below), and we will focus on implementing the deployment process.
Figure 2 : The selected Architecture
Now that we know from Figure 2 the list of tools and technologies selected for the architecture, we can start designing the deployment process.
2.1 Identify the deployable items
Table 2 below contains a list of the possible deployable items:
Tool | Deployable Items | Description | Layer |
---|---|---|---|
Python, Tweepy | GetDataFromTwitter.py | Python script for requesting tweets from the Twitter API and saving them in HDFS | Ingestion |
Python, Spark | MoveDataFromHdfsToElasticsearch.py | A PySpark script for extracting key information from the raw data available in HDFS and saving it in Elasticsearch for historical search | Transformation |
Scala, Spark | TweetPerformanceMotor.jar | A Scala Spark engine for classifying the most prolific tweets of the day. Transforms raw data from HDFS, performs classification and publishes the result to Elasticsearch, allowing dashboards to be built in Kibana | Transformation |
Elasticsearch | elasticsearch.yml, jvm.options, log4j2.properties | Elasticsearch will be used to index both the everyday most interesting tweets and the historical tweets for future search | Storage |
Apache Spark | | Cloudera Manager will be used to configure both Spark and HDFS, so nothing will be deployed here | Transformation |
Hadoop HDFS | | Cloudera Manager will be used to configure both Spark and HDFS, so nothing will be deployed here | Storage |
Kibana | kibana.yml | Kibana will be used to build dashboards for viewing the list of the most interesting tweets, but also to search certain tweets in Elasticsearch | Visualization |
Oozie | job.properties, workflow.xml, coordinator.xml | Oozie will be used to schedule the execution of the ingestion script and each Spark transformation engine | Transformation |
 | TwitterProfiles.txt | This file will contain the list of Twitter profiles whose tweets will be gathered and ingested. This list will change almost every day or week, and each change to this file will require its redeployment | |
Table 2 : List of deployable items for the use case
Some of the items listed in the previous table will be deployed once; others are even environment independent, and those files do not need to be deployed.
2.2 Environment variables
The possible environment variables for this project might be the following:
environment_name=production
java_home=/usr/java/default
elasticsearch_conf_dir=/etc/elasticsearch
kibana_conf_dir=
ingest_script_dir=/ingest
transform_script_dir=/transform
oozie_server=http://oozie_server_prod/
oozie_port=11000
oozie_workflow_dir=/oozie/workflow
oozie_job_properties_dir=/oozie/job
....
2.3 Software packaging and publication tool
Apache Maven has been selected as the packaging and publication tool.
Maven has an Assembly Plugin which allows developers to design how the artifact should be assembled. Maven also allows the assembled artifact to be published to a local or remote artifact repository.
For this use case, the assembly descriptor file has been designed to generate a tar.gz artifact, which contains the following directory structure:
+---conf
|       TwitterProfiles.txt
|
+---elasticsearch
|       elasticsearch.yml
|       jvm.options
|       log4j.properties
|
+---ingest
|       GetDataFromTwitter.py
|
+---kibana
|       kibana.yml
|
+---oozie
|       job.properties
|       workflow.xml
|
\---transform
        MoveDataFromHdfsToElasticsearch.py
        TweetPerformanceMotor.jar
2.4 Version control repository
A GitHub repository has been created as the software version control repository.
2.5 Artifact repository
Apache Nexus has been selected to set up the artifact repository.
2.6 Deployment automation tool
A Jenkins pipeline has been designed to get the corresponding tar.gz artifact version from
Apache Nexus, decompress the tgz file, and run an installation script, while
replacing the environment variables with those available for the corresponding environment. In other words, the installation script needs to pull the right values
of the environment variables from the Git repository before running the actual installation process.
The complete deployment flow diagram for this use case can be found in Figure 3 below.
Figure 3 : The deployment flow diagram
The complete Maven source code for this simple use case is available
on my GitHub repository. Pay special attention to the files
Jenkinsfile, src/assembly/assembly.xml and pom.xml,
as these files are the most important for the publication and deployment process.
In our deployment diagram we have omitted important phases such as unit testing and SonarQube code evaluation. Those phases are indispensable for a real-world
use case.
3- Conclusion
The purpose of this article was to provide a simple guide to designing a deployment process for a big data solution. The guide provided here is suitable for projects that will be deployed on premise.
MIGRATING FROM A RELATIONAL DATABASE TO A Neo4j GRAPH DATABASE
Welcome to this article on how to migrate from a relational database to Neo4j.
Throughout this article, the following topics will be covered:
1 -What is Neo4j
Neo4j is a graph database.
This simple definition of Neo4j raises another important question: what is a graph database?
According to [1], a graph database management system is an online database management system with Create, Read, Update, and Delete (CRUD) methods that expose a graph data model. Graph databases are generally built for use with transactional (OLTP) systems.
This concept of a graph database is inspired by graph theory, which defines a graph as a collection of vertices (or nodes) connected by edges (or links). Figure 1 below shows an example of a graph in the context of graph theory.
Figure 1 : Example of graph in the context of graph theory
If we bring this definition of a graph into the field of database systems, we can simply say that a graph database is a system in which the data model is formed by nodes, corresponding to the actual data or entities, and edges or links, corresponding to the relationships between those entities.
Therefore, Neo4j is an implementation of a graph database management system. It provides a set of tools for building a graph data model and allows querying it.
Of course, one could ask the question: why use Neo4j or a graph database at all? Besides, isn't the title of this article how to migrate a relational database to Neo4j? In order to answer these questions, it is worth describing the raison d'être of graph databases and then clarifying the situations in which it might be advisable to migrate a relational database to a graph database.
2 – From relational to graph
Graph databases cannot be considered a replacement for traditional relational databases. In fact, many companies today still use relational databases to store their everyday operational data, and it works very well. However, as entities become more and more linked by some sort of relationship, a graph database can become the best candidate.
Highly connected data is very difficult to handle in relational databases, because relationships between entities have to be modeled as join tables. But at query time, joins are very expensive to compute and can become a bottleneck when querying a large number of them.
Moreover, a large number of join tables in a relational database makes it difficult to evolve the data system to meet new business requirements, due to its dependence on a rigid schema.
Therefore, in order to stay up to date in the current context where data is more and more connected, especially with the expansion of the web and social networks, many companies find themselves obliged to migrate their current data systems to new ones capable of handling a large volume of highly connected data. For this purpose, graph databases are game changers, since they have been designed to handle connected data so that data analysts can navigate through their complete dataset just by following the relationships between records. A graph database enables business stakeholders to easily and quickly answer questions like: which product might interest this customer? How are customer A's buying habits related to those of customer B? What product should I recommend to this customer based on product clicks and visited pages?
Those questions might seem very easy to answer, but when it comes to querying the results from a relational database, this can become a nightmare for data analysts.
In this article, we describe a simple process to migrate data from a relational database to a graph database under Neo4j, based on a practical use case.
3 -A simple scenario
Now that we know when migrating from a relational database to a graph database can be advisable, let's look at a simple migration scenario.
Imagine a data-as-a-service company which hosts historical data about football. Their clients are mainly betting companies, football magazines and all kinds of media companies. Up to now, they have been pouring their data into an Oracle database. Clients access the data by filling in a search form on a web interface. For the purpose of this tutorial, we consider that the database contains information about players, about teams, and about trophies (or leagues) won by teams. Four leagues have been chosen for this example: Champions League, Ligue 1, La Liga and Serie A.
Clients often ask simple questions like: which players have won the most trophies in their career? Or more complex ones, like: which players have won two different trophies together with different teams?
From the information given in the previous paragraph, it follows that the main entities are: Player, Team and Trophie (or League).
To derive the data model for this use case, let's consider the following assumptions:
– A player has played for one or several teams in his career.
– A team has been formed by one or several players in its history.
– A team has won zero or several leagues or trophies in its history.
– A title or trophy has been won by one or several teams (but only one team wins the trophy each year).
The logical model of our relational database is shown on Figure 2
Figure 2 : Logical model of the relational database
In the previous image, we have two join tables, used to model the many-to-many relationships that exist between the player – team entities and between the team – trophie entities.
This use case shows an example of highly connected entities, and an attempt to answer a complex question from a client will require running a query with multiple joins.
Imagine for example that a client wants to list all pairs of players who won two different trophies together with two different teams.
An SQL query for this case might look like:
SELECT p1.name, p2.name, t1.name, tr1.name, t2.name, tr1.year, tr2.year
FROM PLAYER p1
JOIN PLAYER_TEAM pt1 ON p1.id = pt1.playerid
JOIN PLAYER_TEAM pt2 ON pt2.teamid = pt1.teamid
JOIN PLAYER p2 ON p2.id = pt2.playerid
JOIN PLAYER_TEAM pt3 ON pt3.playerid = p1.id
JOIN PLAYER_TEAM pt4 ON pt4.playerid = p2.id
JOIN TEAM t1 ON (t1.id = pt1.teamid AND t1.id = pt2.teamid)
JOIN TEAM t2 ON (t2.id = pt3.teamid AND t2.id = pt4.teamid)
JOIN TEAM_TROPHIE t_t1 ON t1.id = t_t1.teamid
JOIN TEAM_TROPHIE t_t2 ON t2.id = t_t2.teamid
JOIN TEAM_TROPHIE t_t3 ON t1.id = t_t3.teamid
JOIN TEAM_TROPHIE t_t4 ON t2.id = t_t4.teamid
JOIN TROPHIE tr1 ON (tr1.id = t_t1.trophieid AND tr1.id = t_t2.trophieid)
JOIN TROPHIE tr2 ON (tr2.id = t_t3.trophieid AND tr2.id = t_t4.trophieid)
WHERE (p1.id <> p2.id AND t1.id <> t2.id AND tr1.id <> tr2.id)
Even if some SQL expert tries to optimize the previous query for performance, it will still end up having a lot of JOIN clauses.
As the volume of data in this database becomes larger, clients will start experiencing unacceptable lags between the time they submit a query and the time
they receive the results.
For that reason, the company has decided to migrate to a graph database, and they have chosen Neo4j.
The first step in the migration process should consist of converting the relational data model into a graph data model.
As mentioned in [4], the following conventions are to be considered when converting a relational data model into a Neo4j data model:
– each row in the relational database becomes a node in the graph
– each table name becomes a node label
– each column name becomes a property of the corresponding node (or relationship)
– each join table or foreign key becomes a relationship
From the previous assumptions, we can derive the graph data model drawn in Figure 3 below.
Figure 3 : Translated graph model
The model shown in Figure 3 is very easy to read. The join tables PLAYER_TEAM and TEAM_TROPHIE from the relational model have been converted to simple unidirectional relationships (PLAYED_WITH and HAS_WON). When creating concrete nodes and relationships in Neo4j, each node will have a label corresponding to the name of the table in the relational database. A node will also carry each property corresponding to the column names from the relational database. Each relationship will also carry a single property corresponding to the year the events PLAYED_WITH and HAS_WON actually occurred.
After drawing the graph model on the whiteboard, the next step should consist of exporting the data from the RDBMS to the graph system.
There are many ways to move data from a relational system to a graph. Most of them are described here: relational-to-graph-import.
The easiest approach consists of:
– exporting each table's data into a CSV file
– copying the exported CSV files into the import folder of your Neo4j
database instance
– executing the Neo4j LOAD CSV Cypher query to load the CSV files and create the nodes and relationships.
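As a minimal sketch of the export step (with hypothetical rows), the following Python snippet writes table data into a headed CSV whose column names match those read later by LOAD CSV WITH HEADERS:

```python
# Sketch: dumping table rows into a CSV with a header row, ready for
# Neo4j's LOAD CSV WITH HEADERS. The player rows here are hypothetical.
import csv
import io

players = [(0, "Player A"), (1, "Player B")]
buffer = io.StringIO()
writer = csv.writer(buffer)
writer.writerow(["id", "player_name"])  # columns read as csvLine.id, csvLine.player_name
writer.writerows(players)
print(buffer.getvalue())
```

In practice the same export is usually done directly from the database tool (as we do below with Oracle SQL Developer), but the principle is the same: one headed CSV file per table.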
We have used Neo4j Desktop to create a local Neo4j database instance.
From Oracle SQL Developer, we have exported each table's data into CSV.
The CSV data can be downloaded here:
– Data of Players Table : allplayers.csv
– Data of Teams Table : allteams.csv
– Data of Trophies Table : Trophies.csv
– Data of Player-Team join Table : players_id_teams.csv
– Data of Team-Trophie join Table : teams_id_trophies.csv
Those CSV files have been copied into the import directory of our local Neo4j Desktop instance.
After that, the following Cypher queries have been executed in the Neo4j Browser to create the nodes and relationships of our graph database:
LOAD CSV WITH HEADERS FROM "file:///allplayers.csv" AS csvLine
CREATE (p:Player {id: toInteger(csvLine.id), name: csvLine.player_name})

LOAD CSV WITH HEADERS FROM "file:///allteams.csv" AS csvLine
CREATE (t:Team {id: toInteger(csvLine.id), name: csvLine.team_name})

LOAD CSV WITH HEADERS FROM "file:///Trophies.csv" AS csvLine
CREATE (l:League {id: toInteger(csvLine.id), name: csvLine.trophie})

USING PERIODIC COMMIT 500
LOAD CSV WITH HEADERS FROM "file:///players_id_teams.csv" AS csvLine
MATCH (p:Player {id: toInteger(csvLine.player_id)}), (t:Team {id: toInteger(csvLine.team_id)})
CREATE (p)-[:PLAYED_WITH {season: csvLine.season}]->(t)

USING PERIODIC COMMIT 500
LOAD CSV WITH HEADERS FROM "file:///teams_id_trophies.csv" AS csvLine
MATCH (t:Team {id: toInteger(csvLine.team_id)}), (tr:League {id: toInteger(csvLine.trophie_id)})
CREATE (t)-[:HAS_WON {season: csvLine.season}]->(tr)
With all this done, our graph database contained all the nodes and relationships required to satisfy clients' expectations.
Now imagine, for example, that a client needs to know all the players and the team who won the Champions League in the 2017_2018 season.
The Cypher query to answer that question is pretty easy to write:
MATCH (p)-[r:PLAYED_WITH]->(t)-[r1:HAS_WON]->(l)
WHERE r.season = '2017_2018' AND r1.season = '2017_2018' AND l.id = 0
RETURN p, t, l
and the result in the Neo4j Desktop user interface is very easy to interpret, as shown in Figure 4 below.
Figure 4 : Example of Graph Cypher query result in Neo4j
The above graph result shows that Real Madrid won the Champions League in the 2017/18 season, and that 26 players were in the squad that same year. Each player is related to the team with a PLAYED_WITH relationship, and the relationship itself carries a season property.
This is a simple illustration of the power of graph databases: the queries are simple to write, the results are simple to read and understand, and complex queries run much faster than relational database queries, especially when they involve many JOIN clauses on very large data.
REFERENCES
[1]: I. Robinson, J. Webber, E. Eifrem (2013). Graph Databases.
[2]: https://en.wikipedia.org/wiki/Graph_theory
[3]: Batra S., Tyagi C. (2012). Comparative analysis of relational and graph databases. International Journal of Soft Computing and Engineering (IJSCE), 2(2), 509-512.
[4]: https://neo4j.com/developer/guide-importing-data-and-etl/
DESIGNING A DATA-LAKE FROM A-2-Z PART-3
Welcome to this third article on building a data lake from A to Z.
In the previous article, we described the proposed architecture designed to fulfill the client's requirements. We also listed the software used to simulate our data sources. In this article, the following topics will be discussed:
1- Ingesting historical sales data
2- Ingesting online competitors' prices
1- Ingesting historical sales data
For this article, we have prepared two zip files of historical sales.
These data are hosted on a simple Apache server.
The ingestion pipeline for these historical data will consist of a simple shell script that executes wget to download the files, unzips them and uploads the resulting CSV files to HDFS. Here is the code performing this task:
wget -P /home/oracle/uploads/ -r -np -l 1 -A zip http://www.leshx.com/blogs/data/
allZip=`ls /home/oracle/uploads/www.leshx.com/blogs/data/`
mkdir /home/oracle/uploads/www.leshx.com/blogs/data/entities/
for zipFile in $allZip; do
    unzip /home/oracle/uploads/www.leshx.com/blogs/data/$zipFile -d /home/oracle/uploads/www.leshx.com/blogs/data/entities/
done
hdfs dfs -copyFromLocal /home/oracle/uploads/www.leshx.com/blogs/data/entities/*.csv /data/raw/sales/
After running this script, the result can be observed in HDFS (see Figure 1 below).
Figure 1 : Sales Data
2- Ingesting online competitors' prices
One of the client's requirements was being able to constantly pull current price data from competitors, so that they can adjust their own prices and avoid losing clients. This requirement is taken into account in our proposed architecture. In order to simulate competitor prices, we have developed a simple REST API using the Play framework. The prices can be requested by making a GET request to the URL http://bigdatalite:11000/prices. The result is a JSON document containing the lists of prices of different items from various competitors.
To ingest those data into our data lake, we have prepared a shell script that performs a curl call to the API; the script has then been scheduled to run every hour using crontab. The response from the API call is saved as a JSON file in a folder on our edge node. We have configured a Flume agent to listen for new JSON files and upload them directly to HDFS.
The simple shell script that gets the prices is the following:
now=`date +'%Y%m%d_%H%M%S'`
curl http://bigdatalite:11000/prices > /home/oracle/uploads/prices/prices_"$now".json
As it appears in the previous code, we make a simple curl call to the API, which is running on port 11000 (in our case), and the result is saved locally in a timestamped file.
On Cloudera Manager (running on port 8888), we have configured a Flume agent which detects new files in the directory /home/oracle/uploads/prices and uploads
the newly created files to HDFS. Here is the Flume agent configuration we have used:
Agent1.sources = spooldirsource
Agent1.sinks = hdfs_sink
Agent1.channels = Mchannel

Agent1.sources.spooldirsource.type = spooldir
Agent1.sources.spooldirsource.spoolDir = /home/oracle/uploads/prices
Agent1.sources.spooldirsource.fileHeader = false

#defining sink
Agent1.sinks.hdfs_sink.type = hdfs
Agent1.sinks.hdfs_sink.hdfs.path = hdfs://bigdatalite.localdomain:8020/data/raw/prices

#defining channel
Agent1.channels.Mchannel.type = memory
Agent1.channels.Mchannel.capacity = 10000

#Binding channel between source and sink
Agent1.sources.spooldirsource.channels = Mchannel
Agent1.sinks.hdfs_sink.channel = Mchannel
We have also configured a crontab job to run every minute (for demonstration purposes); the job executes the previous shell script.
The crontab entry is the following:
* * * * * /home/oracle/ingest/ingest_prices.sh
To show that everything works fine, we can list the files available in our local prices directory; we get the following result:
-rwxrwxrwx. 1 oracle oinstall 143109 Jul 31 07:39 prices_20200731_073901.json.COMPLETED
-rwxrwxrwx. 1 oracle oinstall 143759 Jul 31 07:40 prices_20200731_074001.json.COMPLETED
-rwxrwxrwx. 1 oracle oinstall 143619 Jul 31 07:41 prices_20200731_074102.json.COMPLETED
-rwxrwxrwx. 1 oracle oinstall 143639 Jul 31 07:42 prices_20200731_074201.json.COMPLETED
-rwxrwxrwx. 1 oracle oinstall 143649 Jul 31 07:43 prices_20200731_074301.json.COMPLETED
All files marked with .COMPLETED have already been processed by Flume and uploaded to HDFS. If we browse the prices folder of our data lake (from Hue), we can see that Flume is constantly ingesting price data as soon as it is queried from the REST API (see Figure 2 below).
Figure 2 : Price data ingested with flume
3- Data in our data lake
To sum up all we have been doing until now: we have implemented an architecture allowing us to capture and maintain both bulk sales data and online competitor prices.
For the storage layer, we have selected a Cloudera distribution of Hadoop.
We have created a layer called "raw" in HDFS. This layer is the landing layer, where all raw data from the sources is poured. It contains subdirectories for each type
of source. In our case, we have created the following directory structure in HDFS:
/data/
/data/raw/
/data/raw/sales
/data/raw/prices
The /data directory should contain all the data. The /data/raw directory is our landing layer. This layer contains two subdirectories, sales and prices, holding the sales data and the competitor price data respectively. If we were to ingest a new type of data from a different source, another directory for that new source would have to be created as a subdirectory of /data/raw. This way, we have a repository for ingesting data from various sources and storing it for future transformation (a data lake).
4- Next steps
Now that our data lake is configured and some data has been ingested into it, we have already accomplished the "E" (extract) and the "L" (load) processes. The next step
here is to transform ("T") the data for value extraction. Generally, in companies, data scientists are responsible for writing the algorithms that transform the raw data available in the landing
layer, so that the resulting output can be analysed by data analysts or visualized in business intelligence tools. This transformation usually requires more data layers besides the raw layer.
In this article we have created two additional layers: silver and gold.
The silver layer is intended to hold pre-processed data. This data is usually the result of data validation algorithms, and is typically only lightly aggregated.
The gold layer is intended to hold business-oriented data for specific departments. It can be seen as a layer of data marts, where data coming from the
silver layer is further transformed and aggregated in order to serve the needs of a particular area of the business.
Several tools for data transformation are available out there. In the Hadoop ecosystem, Spark is commonly used to transform raw data and provide queryable data organized in Hive tables (and Impala tables if working
with Cloudera).
DESIGNING A DATA-LAKE FROM A-2-Z PART-2
Welcome to this second article on building a data lake from A to Z.
In the previous article, we defined the concept of a data lake, showed the key differences between data lakes and data warehouses, and presented a real-world use case which will be used for our designs.
For this article, the following topics will be covered:
1- Description of data source
2- The proposed architecture
3- Tools and Development environment
1- Description of data source
From the previous article, the use case requirements described two types of data sources:
– last month's sales data: zip files containing CSV files of each day's sales, stored on each server
– near-real-time price information from competitors: these data are acquired from a (fictitious) web REST API and stored as JSON files.
As also mentioned in the previous article, the store sells the following items: sweaters, tracksuits, shorts and socks.
Each line in a CSV file contains the following information:
sale_date;server_name;item_name;item_brand;item_color;item_size;item_price
where "item_name" has one of the following values: sweater, tracksuit, short or socks,
and the sale_date format is yyyyMMdd.
For example, a line should look like this:
20200218;server1;sweater;adidas;black;L;40€
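A minimal Python sketch of parsing such a line into named fields (the field list comes from the layout above):

```python
# Splitting one semicolon-delimited sales line into named fields.
fields = ["sale_date", "server_name", "item_name", "item_brand",
          "item_color", "item_size", "item_price"]
line = "20200218;server1;sweater;adidas;black;L;40€"
record = dict(zip(fields, line.split(";")))
print(record["item_name"], record["sale_date"])  # sweater 20200218
```

A transformation job in the silver layer could use the same mapping to validate and type each field before aggregation.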
Regarding price information from competitors, these data are captured on an hourly basis from a public REST API and ingested in JSON format.
Each GET request sends no parameters to the
web API, and the response is a JSON document containing a list of prices and the competitor name for every item. A response to a GET query might look like this:
[{"timestamp":"20200707_182611","name":"c00","itemb":{"name":"short","brand":"nike","color":"black","size":"M","price":34.33}},
{"timestamp":"20200707_182611","name":"c01","itemb":{"name":"short","brand":"nike","color":"black","size":"M","price":34.33}},
{"timestamp":"20200707_182611","name":"c02","itemb":{"name":"short","brand":"nike","color":"black","size":"M","price":34.33}},
{"timestamp":"20200707_182611","name":"c03","itemb":{"name":"short","brand":"nike","color":"black","size":"M","price":34.33}},
{"timestamp":"20200707_182611","name":"c04","itemb":{"name":"short","brand":"nike","color":"black","size":"M","price":34.33}},
{"timestamp":"20200707_182611","name":"c05","itemb":{"name":"short","brand":"nike","color":"black","size":"M","price":34.33}},
{"timestamp":"20200707_182611","name":"c06","itemb":{"name":"short","brand":"nike","color":"black","size":"M","price":34.33}},
{"timestamp":"20200707_182611","name":"c07","itemb":{"name":"short","brand":"nike","color":"black","size":"M","price":34.33}},
{"timestamp":"20200707_182611","name":"c08","itemb":{"name":"short","brand":"nike","color":"black","size":"M","price":34.33}}]
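A minimal Python sketch (using a shortened copy of the response above) of how an ingestion or transformation step might parse this JSON:

```python
# Parsing the competitor price response and pulling out each
# competitor's price for an item. The response is a shortened copy
# of the sample shown above.
import json

response = ('[{"timestamp":"20200707_182611","name":"c00",'
            '"itemb":{"name":"short","brand":"nike","color":"black",'
            '"size":"M","price":34.33}}]')
for entry in json.loads(response):
    item = entry["itemb"]
    print(entry["name"], item["name"], item["price"])  # c00 short 34.33
```

In the real pipeline, the JSON files land in /data/raw/prices and a Spark job would apply the same structure when moving them to the silver layer.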
2- The proposed architecture
To meet the client requirements, the proposed architecture should be able to handle both streaming and batch data ingestion. For that purpose, we have designed the architecture displayed in Figure 1 below.
Figure 1 : The Proposed Architecture
As observed in Figure 1, the architecture includes an edge node from which all ingestion scripts will be executed. Zipped sales files are gathered from the servers, then unzipped locally on the edge node and uploaded to HDFS, in the landing layer.
Online competitor prices are requested from the edge node using the "curl" command, and the results are saved in a local file on the edge node. A Flume agent constantly detects new data added to the result directory, and the captured result is uploaded to the landing layer of HDFS.
Thus, the "landing" layer of HDFS is the core of our data lake, as it is where all data from the various sources is stored as it comes from the source. Later it can be transformed and moved to the other layers, like the "silver" and "gold" layers, using tools like Spark, in order to provide cleaned data to data analysts.
3- Tools and Development environment
As mentioned in part 2 of this article, the data storage of our data lake will be based on HDFS. We will be using a Cloudera distribution of Hadoop to build a one-node Hadoop cluster.
Cloudera has now migrated to CDP (Cloudera Data Platform), and they provide a free trial CDP version for testing purposes. This free trial version can be downloaded here: CDP DATA CENTER TRIAL. But in this project, we have been using the Oracle Big Data Lite Virtual Machine, which is an Oracle appliance. It comes with a wide range of Oracle big data services. It also includes Cloudera Manager. The v4.11 can be downloaded here: Oracle Big Data Lite Virtual Machine V 4.11.
In order to simulate the competitor prices and the historical sales, we have developed two pieces of software. A simple REST API based on the Play framework and the Scala language has been developed
for continuously serving the competitor prices. The source code can be downloaded from the GitHub repo price_simulator_api.
We have already a zip containing the executable of this software and it is available for download here prices-simulator-api-1.0-SNAPSHOT.zip.
The required steps for its usage include downloading the zip file, unzip,
navigate to the conf folder, open the file application.conf and modify it as follows:
locate the line with play.http.secret.key = “xxxxx” and change the value of the secret to one of your choice.
Locate the allowed hosts filter and set it as follows:
hosts {
  # Allow requests to bigdatalite:1100 and localhost:11000.
  allowed = ["bigdatalite:1100", "localhost:11000"]
}
Then navigate to the bin folder and execute the startup script as follows:
/path_to/prices-simulator-api/bin/prices-simulator-api -Dhttp.port=port
replacing the parameters path_to and port with your own path and desired port number (the default port is 9000). Use the .bat script if you are on Windows.
Another piece of software has been developed to simulate the historical sales CSV files of a month. It is a simple Scala application which randomly chooses a month of the year 2020 and builds fictitious sales CSV files for each day of the selected month. The source code of this simple Scala app can be found in the GitHub repo price-simulator-csv. Some sample sales data generated by this app can be downloaded here.
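To give an idea of what the CSV simulator produces, here is a minimal shell sketch in the same spirit. The file naming, column layout and values are made up for the example; the real Scala app in price-simulator-csv is the reference:

```shell
#!/bin/sh
# Toy stand-in for the price-simulator-csv Scala app: pick a pseudo-random
# month of 2020 and write one fictitious sales CSV for day 1 of that month.
month=$(( $$ % 12 + 1 ))
day=$(printf '2020-%02d-01' "$month")
out="sales_${day}.csv"

# Hypothetical column layout, mirroring the item attributes of the store
echo "date,brand,category,color,size,price" > "$out"
for brand in adidas nike reebok; do
  echo "${day},${brand},shorts,blue,M,19.99" >> "$out"
done
echo "wrote $out"
```

The real application repeats this for every day of the chosen month; this sketch only writes a single day's file to show the shape of the data.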
Now that we have described the proposed architecture and the data sources, and listed the software used to simulate our data generation, we can start building our ingestion pipeline and pouring some data into our data lake. This will be developed in the next article.
DESIGNING A DATA-LAKE FROM A-2-Z PART-1
In this series of articles, the concept behind data lakes is described and a simple Proof of Concept (PoC) is implemented.
The following topics will be covered in this article:
1 -What is a data lake?
2 -Data lake vs Data warehouse
3 -A real world scenario
According to Wikipedia, a data lake is a system or repository of data stored in its natural/raw format, usually object blobs or files [1]. At its core, a data lake is a central location in which to store all your data, regardless of its source or format [2]. It refers to a massively scalable storage repository that holds a vast amount of raw data in its native format (“as is”) until it is needed, plus processing systems (engines) that can ingest data without compromising the data structure [3]. In other words, a data lake can be seen as a data storage system capable of storing huge volumes of data arriving quickly from various sources, for future analysis. Data ingested into a data lake can be structured or unstructured, and all data types are accepted; this makes data lakes a powerful alternative to the challenges presented by data integration in a traditional data warehouse [2].
Companies can benefit from data lakes both at the business level and at the technology level. Figure 1 below, from [4], displays some general benefits of data lakes. For more details about each individual benefit, see https://ovaledge.com/benefits-data-lake/.
Figure 1 : benefits of data lakes
In order to further clarify the concept of data lakes, it is interesting to compare them with the other traditional types of data repository generally encountered in companies. A famous central repository of integrated data, historically used in the industry, is the enterprise data warehouse (EDW). In the next section, we describe the main differences between EDWs and data lakes.
2 -Data lake vs Data warehouse
The differences between data lakes and enterprise data warehouses mainly relate to how data is ingested into the repository, how insight is derived from it, and who generally consults it. Data warehouses are used for analyzing archived structured data, while data lakes are used to store big data of all structures. A data warehouse requires applying a certain schema to raw data prior to ingestion. This concept, known as schema-on-write, makes it difficult to insert new data into an EDW, as a lot of effort is required to format the data before ingestion. The term “ETL” (Extract-Transform-Load) fits very well in the EDW world: data is first extracted from the source, then transformed by applying the required EDW schema, and finally loaded into the repository.
In contrast, data lakes allow ingesting data as it comes from the source; no schema needs to be applied, and data can come from various sources. This corresponds to the ELT pattern (Extract-Load-Transform): data extracted from the source is directly loaded into the repository for future transformation, using tools like Spark or machine learning algorithms. Leveraging the EDW for heavy data transformation can have unintended consequences, including greater costs and complexity, as well as processing bottlenecks and missed SLAs that cause business users to wait days, weeks, or even months for the reports they need [5]. Data lakes handle big data and complex data transformations very well, at very low cost.
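To make the ELT pattern concrete, here is a toy shell sketch in which plain local directories stand in for the lake's layers. The directory names mirror the landing/silver terminology used in this series; in a real lake these would be HDFS paths and the transform step would be a Spark job:

```shell
#!/bin/sh
# Extract-Load: raw data is copied into the landing layer exactly as it
# arrives from the source, with no schema applied and no cleaning done.
mkdir -p landing silver
printf 'id,price\n1, 19.99\n2,25.50 \n' > landing/prices_raw.csv

# Transform (later, on demand): clean the raw file and write the result
# to the silver layer, leaving the raw landing copy untouched.
tr -d ' ' < landing/prices_raw.csv > silver/prices_clean.csv
cat silver/prices_clean.csv
```

Note that the raw file survives in the landing layer, so the transformation can be rerun or redesigned at any time, which is precisely the flexibility the ELT pattern provides over schema-on-write ETL.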
Table 1 below, from data-lake-vs-data-warehouse [6], describes in more detail the main differences between data lakes and data warehouses.
Table 1 : data lake vs data warehouse [6]
Now that we have defined the concept of data lakes, let’s jump into more practical matters: a PoC based on a real-world example. Before doing so, let’s first describe the real-world use case that will guide our design steps.
3 -A real world scenario
MainSport is an online sportswear store. The store sells clothes from 3 different brands: Adidas, Nike and Reebok. The following clothing categories are available on the site: sweaters, tracksuits, shorts, and socks. Each item available in the store is identifiable by its color, type, size, brand and price. The store website runs on multiple servers. Each server keeps a zip file of CSVs containing the sales made on that server during the current month. At the end of each month, the owners are interested in bringing all the sales data into a central repository for deep analysis and pattern extraction. Also, throughout the current year, they want to be able to adjust the prices of their items based on the cost of the same items in competitor stores, to minimize customer loss.
To do so, they need a platform capable of:
1 – ingesting and storing all sales data. This data consists of thousands of CSV files, compressed and stored on various servers;
2 – continuously ingesting price information from competitors. This data is obtained in JSON format from a public REST API.
This way, data scientists will be able to write price and stock recommendation algorithms using historical sales data and current market prices from competitors.
This is the end of the first article. In the next part, we will jump into more technical aspects: we will start designing the architecture of our data lake to meet the requirements described by our client, MainSport.
References
1 – https://en.wikipedia.org/wiki/Data_lake
2 – Architecting Data Lakes, Ben Sharma
3 – Big Data, Fast Data and Data Lake Concepts, Natalia Miloslavskaya and Alexander Tolstoy
4 – https://ovaledge.com/benefits-data-lake/
5 – https://www.syncsort.com/en/glossary/enterprise-data-warehouse
6 – https://www.guru99.com/data-lake-vs-data-warehouse.html