This is an example of using Akka Clustering to perform log distribution
between distributed servers.
I was itching to test the clustering mode in Akka and wanted to find
a "real world" example that wasn't too big either. That's when I
stumbled upon this technical assignment, and I thought it would be
a fun weekend project.
Check out the annotated code.
While the code works, I am still working on the written description
of what's going on.
You have a number of web servers (let's assume three) that receive
load balanced traffic for your web site. On each server, you log
each connection, including a cookie containing a userid. The logs you
collect are ordered by time.
Write a program that re-groups all the connection logs for one user on
the same server. You can place the grouped output on any one server or
on many servers, as long as all the log entries of one user are
together on the same physical server.
Example of a log file:
18.104.22.168 - - [15/Aug/2013:13:54:38 -0300] "GET /meme.jpg HTTP/1.1" 200 2148 "-" "userid=5352b590-05ac-11e3-9923-c3e7d8408f3a"
22.214.171.124 - - [15/Aug/2013:13:54:38 -0300] "GET /lolcats.jpg HTTP/1.1" 200 5143 "-" "userid=f85f124a-05cd-11e3-8a11-a8206608c529"
126.96.36.199 - - [15/Aug/2013:13:57:48 -0300] "GET /lolcats.jpg HTTP/1.1" 200 5143 "-" "userid=5352b590-05ac-11e3-9923-c3e7d8408f3a"
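Even if parsing is not the focus, every line has to yield the userid it will be grouped by. Here is a minimal plain-Scala sketch. It assumes the userid always appears as a `userid=<uuid>` field and the timestamp is the first bracketed field, as in the sample above. `LogLineParser` is a hypothetical name, not a class from the project:

```scala
import java.time.OffsetDateTime
import java.time.format.DateTimeFormatter
import java.util.Locale

// Hypothetical helper, not taken from the LogSplit sources: extracts the
// grouping key (userid) and the timestamp from one access-log line.
object LogLineParser {
  // Matches the `userid=...` cookie field at the end of the line.
  private val UserIdPattern = """userid=([0-9a-fA-F-]+)""".r
  // Matches the first bracketed field, e.g. [15/Aug/2013:13:54:38 -0300].
  private val TimePattern = """\[([^\]]+)\]""".r
  private val TimeFormat =
    DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)

  def userId(line: String): Option[String] =
    UserIdPattern.findFirstMatchIn(line).map(_.group(1))

  // None if there is no bracketed field; throws DateTimeParseException
  // if the bracketed field is not a valid date.
  def timestamp(line: String): Option[OffsetDateTime] =
    TimePattern.findFirstMatchIn(line).map(m => OffsetDateTime.parse(m.group(1), TimeFormat))
}
```

A real reader would probably want to skip or log malformed lines instead of letting the parse exception escape.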
Parsing wasn't the point of this exercise; the distribution of the files
is the interesting part. While this could be accomplished with Hadoop,
I wanted to play with a lighter setup.
The idea behind my approach is to use Akka clustering (in Scala) to
distribute the logs between the servers. The code is extensively
documented, but please let me know if you have questions.
Once you are done reading this, you might want to start exploring from
the main class: LogSplitApp.
Akka deals with the communication between servers, and once all nodes
are registered in the cluster, the actors talk to each other
transparently, without having to know whether an actor is on a remote
server or in the same JVM.
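The data flow is divided into two main steps: a data distribution step,
which moves all the log lines of each user to a single server, followed
by a sort step, which merges those lines into one file per user.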
The distribution of work assumes that the logs on each server are split
over multiple part files and that each part file is sorted by time.
This is a common pattern in log systems, and these assumptions make it
easy to distribute work between separate workers (by simply giving
different files to different workers). If we were dealing with one
giant file, we could always chunk it between actors using NIO channels.
(Note that whether the logs in each file are in ascending or descending
order is not very important, as long as they are sorted one way or the
other.)
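Handing whole files to workers can be as simple as a round-robin split. The sketch below shows that idea with `FileAssignment.assign` as a hypothetical helper. The project may assign files differently:

```scala
// Hypothetical sketch: spread a server's local part files over `numReaders`
// reader workers in round-robin order, so no two readers share a file.
object FileAssignment {
  def assign[A](files: Seq[A], numReaders: Int): Vector[Vector[A]] = {
    require(numReaders > 0, "need at least one reader")
    val indexed = files.toVector.zipWithIndex
    // Reader r gets every file whose position is congruent to r mod numReaders.
    Vector.tabulate(numReaders) { r =>
      indexed.collect { case (file, i) if i % numReaders == r => file }
    }
  }
}
```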
The data distribution step relies on two main types of actors: readers,
which read the local part files, and writers, which write the
redistributed log lines to disk.
The reader and writer do not need to know on which node the actors
they are talking to are located: they could be on the local node or on
a remote node.
Each reader is assigned one writer per node (including the local
node) and a set of local part files to read from.
Each writer can serve multiple readers and usually manages a fixed
set of parts/buckets.
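The README does not say how a userid is mapped to a node and a bucket. For every line of a user to reach the same writer, each reader only needs the same deterministic function. The sketch below assumes a stable hash for that function. `Routing` and `Target` are hypothetical names:

```scala
// Hypothetical routing scheme (the real project may partition differently):
// every node computes the same (node, bucket) pair for a given userid, so all
// lines of one user end up with the same writer on the same server.
object Routing {
  final case class Target(node: Int, bucket: Int)

  def route(userId: String, numNodes: Int, bucketsPerNode: Int): Target = {
    require(numNodes > 0 && bucketsPerNode > 0)
    // String.hashCode is specified by the JDK, so it agrees across JVMs;
    // floorMod keeps the slot non-negative for negative hash codes.
    val slot = Math.floorMod(userId.hashCode, numNodes * bucketsPerNode)
    Target(node = slot / bucketsPerNode, bucket = slot % bucketsPerNode)
  }
}
```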
The reader is mostly passive: it fills up a buffer and then waits
for the writers to pull log lines. Writers are either blocked writing
to the local disk or pulling work from their assigned readers.
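The pull model can be illustrated without actors. In this plain-Scala sketch, which is not the project's code, the reader keeps one bounded buffer per target node and stops reading when any buffer is full. Each call to `pull` stands in for a message from a writer. `PullReader`, `nodeOf` and `capacity` are hypothetical names:

```scala
import scala.collection.mutable

// Plain-Scala sketch of the pull model (no Akka, names are hypothetical):
// the reader fills a bounded per-node buffer and stops reading when one is
// full; writers pull batches, which frees room for the reader to continue.
final class PullReader(lines: Iterator[String], nodeOf: String => Int, numNodes: Int, capacity: Int) {
  private val buffers = Vector.fill(numNodes)(mutable.Queue.empty[String])

  /** Reads input until some buffer is full or the input is exhausted. */
  def fill(): Unit =
    while (lines.hasNext && buffers.forall(_.size < capacity)) {
      val line = lines.next()
      buffers(nodeOf(line)).enqueue(line)
    }

  /** Called on behalf of the writer for `node`: returns up to `max` lines. */
  def pull(node: Int, max: Int): Seq[String] = {
    val buffer = buffers(node)
    val batch = (1 to math.min(max, buffer.size)).map(_ => buffer.dequeue())
    fill() // room was freed, so keep reading
    batch
  }

  def exhausted: Boolean = !lines.hasNext && buffers.forall(_.isEmpty)
}
```

Stopping as soon as any buffer is full is what gives backpressure: a slow writer eventually pauses its reader instead of letting memory grow without bound.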
Because the original part files are already sorted, the lines of one
user sent over to a single server are kept in separate sorted part
files. At the end of the data distribution step, the output directory
therefore contains, for each userid, a set of part files per source
server (the first id) and per bucket from that server.
The sort step is responsible for merging the sorted partial files into
a single file for each user. The sorter actor doesn't need to know it's
running in a cluster; it is only responsible for distributing parallel
sorting tasks to local actors.
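Merging already sorted partial files is a classic k-way merge. Here is a minimal sketch using a min-heap over the heads of the inputs. The inputs are modeled as iterators (for example, over the lines of each part file, keyed by their timestamp), and `KWayMerge` is a hypothetical name, not the project's sorter:

```scala
import scala.collection.mutable

// Hypothetical sketch of the per-user merge: each input iterator is already
// sorted by `key`, and a min-heap repeatedly emits the smallest head.
object KWayMerge {
  def merge[A, K](sources: Seq[Iterator[A]])(key: A => K)(implicit ord: Ordering[K]): Iterator[A] = {
    // Heap entries are (current head, index of the source it came from).
    // PriorityQueue is a max-heap, so the ordering is reversed.
    val heap = mutable.PriorityQueue.empty[(A, Int)](Ordering.by[(A, Int), K](e => key(e._1)).reverse)
    val its = sources.toVector
    its.zipWithIndex.foreach { case (it, i) => if (it.hasNext) heap.enqueue((it.next(), i)) }

    new Iterator[A] {
      def hasNext: Boolean = heap.nonEmpty
      def next(): A = {
        val (head, i) = heap.dequeue()
        // Refill the heap from the source the emitted element came from.
        if (its(i).hasNext) heap.enqueue((its(i).next(), i))
        head
      }
    }
  }
}
```

Only one element per input is held in memory at a time. If the part files are sorted in descending order, the same merge works with a reversed ordering passed explicitly, e.g. `merge(sources)(key)(Ordering[Long].reverse)` for a `Long` key.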
This is an sbt/Scala project, so you will need Java, Scala and sbt.
You can build a package jar with:
sbt clean assembly
You can also run the following commands directly with sbt 'run-main...'
without building the assembly.
You can generate a sample of log files with the utils.LogGenerator.
> java -cp LogSplit-assembly-0.1-SNAPSHOT.jar net.pierreandrews.utils.LogGenerator --output /tmp/logsplittest/servers --linePerFile 10000 --numUsers 1000 --numFiles 50
This will generate three folders in /tmp/logsplittest/servers, each
with 50 files containing 10000 lines. The userids are selected from a
random pool of 1000 users.
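A rough sketch of what such a generator has to guarantee: time-ordered lines with userids drawn from a fixed pool. `SampleLogs` is hypothetical and is not the project's `utils.LogGenerator`. The IP prefix, request path, byte count and 5-second spacing are made-up constants, and the userids are random UUIDs rather than time-based ones:

```scala
import java.time.{OffsetDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter
import java.util.{Locale, UUID}
import scala.util.Random

// Hypothetical stand-in for utils.LogGenerator (not its actual code): builds
// `linesPerFile` time-ordered lines whose userids come from a fixed pool.
object SampleLogs {
  private val Format = DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)

  def generate(linesPerFile: Int, numUsers: Int, seed: Long): Vector[String] = {
    require(numUsers > 0)
    val rnd = new Random(seed)
    val users = Vector.fill(numUsers)(new UUID(rnd.nextLong(), rnd.nextLong()).toString)
    val start = OffsetDateTime.of(2013, 8, 15, 13, 0, 0, 0, ZoneOffset.ofHours(-3))
    // Timestamps only move forward, so the generated file is sorted by time.
    Vector.tabulate(linesPerFile) { i =>
      val time = start.plusSeconds(i * 5L)
      val user = users(rnd.nextInt(numUsers))
      val ip = s"10.0.0.${rnd.nextInt(256)}"
      ip + " - - [" + time.format(Format) + "] \"GET /lolcats.jpg HTTP/1.1\" 200 5143 \"-\" \"userid=" + user + "\""
    }
  }
}
```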
You can then either transfer each serverN folder to a separate machine,
or just run three separate JVMs pointing to these folders.
The app is currently configured to run with three separate JVMs on the
same machine (localhost) on ports 2550, 2551 and 2552. You can
start the three JVMs with:
> java -cp LogSplit-assembly-0.1-SNAPSHOT.jar net.pierreandrews.LogSplitApp --port 2550 --output /tmp/logsplittest/outputs/server0 --input /tmp/logsplittest/servers/server0 --serverID 0
> java -cp LogSplit-assembly-0.1-SNAPSHOT.jar net.pierreandrews.LogSplitApp --port 2551 --output /tmp/logsplittest/outputs/server1 --input /tmp/logsplittest/servers/server1 --serverID 1
> java -cp LogSplit-assembly-0.1-SNAPSHOT.jar net.pierreandrews.LogSplitApp --port 2552 --output /tmp/logsplittest/outputs/server2 --input /tmp/logsplittest/servers/server2 --serverID 2
If you are going to run each JVM on a separate machine, you need to
change the seeds, either with the --seeds argument, e.g. for the first node:
> java -cp LogSplit-assembly-0.1-SNAPSHOT.jar net.pierreandrews.LogSplitApp --port 2550 --output /tmp/logsplittest/outputs/server0 --input /tmp/logsplittest/servers/server0 --serverID 0 --seeds 192.168.1.21:2550,192.168.1.23:2551,192.168.1.1:2550
or by updating the application.conf settings. Given the limited time
and resources I had, I couldn't test this extensively on distributed
servers, but it should work transparently.
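Before such a value reaches the cluster configuration, it is worth validating. The sketch below is a hypothetical parser for the `host:port,host:port` format shown in the command above. It is not the code in LogsplitAppArgs. In Akka classic remoting, each pair would then typically become a seed-node address of the form `akka.tcp://<system>@host:port`, where `<system>` is the name of the application's ActorSystem:

```scala
// Hypothetical helper (not part of LogsplitAppArgs): parses a value such as
// "192.168.1.21:2550,192.168.1.23:2551" into host/port pairs.
object Seeds {
  final case class Seed(host: String, port: Int)

  // Accept only a non-empty host and a numeric port in the TCP range.
  private def valid(host: String, port: String): Boolean =
    host.nonEmpty && port.nonEmpty && port.length <= 5 && port.forall(_.isDigit) && port.toInt <= 65535

  def parse(arg: String): Either[String, List[Seed]] = {
    val parsed: List[Either[String, Seed]] =
      arg.split(',').toList.map(_.trim).filter(_.nonEmpty).map { entry =>
        entry.split(':') match {
          case Array(host, port) if valid(host, port) => Right(Seed(host, port.toInt))
          case _ => Left(s"invalid seed '$entry', expected host:port")
        }
      }
    // Report the first malformed entry, otherwise return all seeds in order.
    parsed.collectFirst { case Left(err) => err } match {
      case Some(err) => Left(err)
      case None      => Right(parsed.collect { case Right(seed) => seed })
    }
  }
}
```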
You can tune the number of workers, cache/buffer sizes, etc. by
changing the command line arguments. Please see LogsplitAppArgs for
documentation. The current settings work "OK" on my laptop, but are
The major assumption is that userids are evenly distributed between
servers; that is, no id appears a lot more often on one server than
on another. We assume a fair load balancing between servers that
produces such balanced logs.
This assumption is important because the system assigns users evenly
to the servers (all the logs of a given user go to one server). That
means that with three servers, around 2/3 of the logs of a node will be
transferred to the other nodes in the cluster. If users are evenly
distributed, this is as good a solution as any; however, if logs are
unbalanced to start with, the current data flow will create more data
transfers than
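The 2/3 figure is the general (n-1)/n share for n servers: under the balanced assumption, a node owns about 1/n of the users and ships the lines of all the others. Here is a tiny sketch of that estimate, with hypothetical names:

```scala
// Back-of-the-envelope model of the balanced-logs assumption: with users
// spread evenly over n nodes, each node keeps ~1/n of its lines and ships
// the remaining (n - 1)/n to the other nodes.
object TransferEstimate {
  def fractionShipped(numNodes: Int): Double = {
    require(numNodes > 0)
    (numNodes - 1).toDouble / numNodes
  }

  /** Approximate bytes a single node sends, given the size of its logs. */
  def bytesShipped(bytesPerNode: Long, numNodes: Int): Long = {
    require(numNodes > 0)
    bytesPerNode * (numNodes - 1) / numNodes
  }
}
```

For example, a node holding 300 MB of logs in a three-node cluster would send roughly 200 MB.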
If we had to deal with unbalanced logs, it would be better to first
split the logs per user on each server, then negotiate between the
nodes which one holds the most logs for each user, and send the data
from the smaller nodes to the biggest one.
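This negotiation can be modeled as a simple assignment over per-node line counts. Each user goes to the node that already holds most of its lines, so only the remainder crosses the network. This is a hypothetical plain-Scala model, not code from the project, and it ignores how the counts would be exchanged between nodes:

```scala
// Hypothetical sketch of the alternative for unbalanced logs: each node
// reports per-user line counts, every user is assigned to the node holding
// most of its lines, and only the other nodes' lines are shipped.
object Negotiation {
  /** counts(user)(node) = number of lines of `user` stored on `node`. */
  def owners(counts: Map[String, Vector[Long]]): Map[String, Int] =
    counts.map { case (user, perNode) =>
      require(perNode.nonEmpty, s"no counts for $user")
      user -> perNode.indexOf(perNode.max)
    }

  /** Lines that must cross the network for a given user -> node assignment. */
  def linesShipped(counts: Map[String, Vector[Long]], owner: Map[String, Int]): Long =
    counts.iterator.map { case (user, perNode) =>
      perNode.sum - perNode(owner(user))
    }.sum
}
```

For example, with counts `Map("alice" -> Vector(90L, 5L, 5L), "bob" -> Vector(1L, 1L, 98L))`, `owners` picks node 0 for alice and node 2 for bob, and `linesShipped` returns 12 for that assignment.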
Running this on a Hadoop or Spark cluster would definitely require a
lot less code scaffolding, even if it would require a larger server
architecture. Apache Samza (LinkedIn) also seemed like a good
solution, but it required setting up Kafka, ZooKeeper and YARN, which
was quite some overhead.