I decided to check out Impala last week, and here are some notes on how that went.
First thoughts
I was very impressed with how easy it was to install, even considering
our unusual setup (see below). In my simple ad-hoc tests Impala
performed orders of magnitude faster than Hive. So far it seems solid
down to the little details, like the shell prompt with a fully
functional libreadline and nicely formatted column headers.
Installing
The first problem I encountered was that we use Cloudera
tarballs
in our setup, but Impala is only available as a package (RPM in our
case). I tried compiling it from
source, but it’s not a trivial
compile - it requires LLVM (which is way cool,
BTW) and has a bunch of dependencies. It didn’t work out of the box
for me, so I decided to take an alternative route (I will definitely get it compiled some weekend soon).
Retrieving the contents of an RPM is trivial (because it’s really a cpio
archive), and then I’d just have to “make it work”.
I noticed that usr/bin/impalad is a shell script, and it appears to
rely on a few environment vars for configuration, so I created a shell
script that sets them which looks approximately like this:
export JAVA_HOME=/usr/java/default
export IMPALA_LOG_DIR=            # your log dir
export IMPALA_STATE_STORE_PORT=24000
export IMPALA_STATE_STORE_HOST=   # probably namenode host or whatever
export IMPALA_BACKEND_PORT=22000
export IMPALA_HOME=               # full path to usr/lib/impala from the RPM, e.g. /home/grisha/impala/usr/lib/impala
export IMPALA_CONF_DIR=           # config dir, e.g. /home/grisha/impala/etc/impala
export IMPALA_BIN=${IMPALA_HOME}/sbin-retail
export LIBHDFS_OPTS=-Djava.library.path=${IMPALA_HOME}/lib
export MYSQL_CONNECTOR_JAR=       # full path to a mysql-connector jar
export HIVE_HOME=                 # your hive home - note: every impala node needs it, just the config, not the whole Hive install
export HIVE_CONF_DIR=             # this seems redundant, my guess is HIVE_HOME is enough, but whatever
export HADOOP_CONF_DIR=           # path to the hadoop config, the dir that has hdfs-site.xml, etc.
export IMPALA_STATE_STORE_ARGS=" -log_dir=${IMPALA_LOG_DIR} -state_store_port=${IMPALA_STATE_STORE_PORT}"
export IMPALA_SERVER_ARGS=" \
    -log_dir=${IMPALA_LOG_DIR} \
    -state_store_port=${IMPALA_STATE_STORE_PORT} \
    -use_statestore \
    -state_store_host=${IMPALA_STATE_STORE_HOST} \
    -be_port=${IMPALA_BACKEND_PORT}"
With the above environment vars set, starting Impala should amount to
the following (you probably want to run these in separate windows; also note that
the state store needs to be started first):
$ ./usr/bin/statestored ${IMPALA_STATE_STORE_ARGS}   # do this on IMPALA_STATE_STORE_HOST only
$ ./usr/bin/impalad ${IMPALA_SERVER_ARGS}            # do this on every node
The only other problem I encountered was that Impala needs
short-circuit access enabled, so I had to add the following to hdfs-site.xml:
<property>
  <name>dfs.client.read.shortcircuit</name>
  <value>true</value>
</property>
<property>
  <name>dfs.domain.socket.path</name>
  <!-- adjust this to your set up: -->
  <value>/var/run/dfs_domain_socket_PORT.sock</value>
</property>
<property>
  <name>dfs.client.file-block-storage-locations.timeout</name>
  <value>3000</value>
</property>
<property>
  <!-- adjust this too: -->
  <name>dfs.block.local-path-access.user</name>
  <value><!-- user name --></value>
</property>
Once the above works, we need impala-shell to test it. Again, I pulled it out of the RPM
and was then able to start the shell and connect. You can connect to any Impala node (read the docs):
$ ./usr/bin/impala-shell
[localhost:21000] > connect some_node;
Connected to some_node:21000
Server version: impalad version 1.0 RELEASE (build d1bf0d1dac339af3692ffa17a5e3fdae0aed751f)
[some_node:21000] > select count(*) from your_favorite_table;
Query: select count(*) from your_favorite_table
Query finished, fetching results ...
+-----------+
| count(*) |
+-----------+
| 302052158 |
+-----------+
Returned 1 row(s) in 2.35s
Ta-da! The above query takes a good few minutes in Hive, BTW.
Other Notes
Impala does not support custom SerDes, so it won’t work if you’re relying on JSON. It does support Avro.
There is no support for UDFs, so our HiveSwarm is of no use.
INSERT OVERWRITE works, which is good.
LZO support works too.
Security Warning: Everything Impala does will appear in HDFS as
the user under which Impala is running. Be careful with this if
you’re relying on HDFS permissions to prevent an accidental “INSERT
OVERWRITE”, as you might inadvertently give your users superuser
privs on HDFS via Hue, for example. (Oh, did I mention that Hue completely
supports Impala too?) From what I can tell there is no way to set a
username; this is a bit of a show-stopper for us, actually.
This post attempts to explain how a relational database can be
implemented atop a key/value store, a subject that I’ve long found
rather mysterious.
Every once in a while I would come across a mention that a relational
database can be implemented using a key/value store (aka dictionary,
hash table or hash map - for brevity I’ll be using map from here on).
Whenever I thought about it, it just didn’t make sense. A relational
database needs to store rows in order, and that’s one feature that
maps do not provide. Imagine we have a table keyed by employee id
stored in a map and we need to traverse it by id in ascending order. A
hypothetical keys() method would return us a list of ids ordered
randomly. It’s impossible to iterate over a hash map in
order. So how would a relational database work then?
It took a while for me to realize the root of my misunderstanding. I
was naively trying to picture how tables, rows and values could be
represented as key/value pairs, and that was the wrong path to take. I
was focusing on the wrong layer of abstraction.
As it turns out, the key (no pun intended) to this problem is the clever data
structure commonly used to store data in a relational DB: the
B-Tree (or a variation thereof, the B+Tree).
Okay, B-trees are nothing new and I’m sure we’ve all heard of them. In fact, B-trees were
designed in the 1970s as a generalization of the
Binary Search Tree that is
better suited to block storage.
But there is something about B-trees that I did not know, and which,
now that I do, seems absolutely essential as well as simply brilliant. In his 1979 paper “The
Ubiquitous B-Tree”, Douglas Comer writes:
The availability of demand paging hardware suggests an
interesting implementation of B-trees. Through careful allocation,
each node of the B-tree can be mapped into one page of the virtual
address space. Then the user treats the B-tree as if it were in
memory. Accesses to nodes (pages) which are not in memory cause the
system to “page-in” the node from secondary storage.
The above paragraph implies that the B-Tree and all its data can be
stored in pages. In fact, if you look at the file
format of a SQLite 3 database
(who says source code comments are bad?) you’ll see it states quite plainly that the file
is divided into pages. (You will also see a fantastic description of exactly
how a B+tree works, but that’s way outside the scope of this post.)
The important point is that the entire file consists of pages and
nothing else. Inside those pages live the B-tree structure, as well as
the data. Each table is a B-tree and to access it we need to know the
starting page number, which in turn is stored in the sqlite_master
table whose root page is always the first page of the file. The root
page of a table is the head of the B-tree structure, and it may refer
to other pages, which in turn may be additional nodes of the tree or
pure data.
All pages are of the same size and are numbered
sequentially, so we can easily retrieve any page by its number:
its offset into the file is simply the page number (less one, since
SQLite numbers pages starting at 1) multiplied by the page size.
(By default a SQLite3 page is 1K and will hold 4 keys,
i.e. the order of the tree is 4.)
And bingo, there is our key/value pair: the page number is the key,
and the page itself is the value! All you need to do is stick those
pages into your favorite key/value store keyed by page number and
you’ve got a relational database atop a key/value store. It’s that
simple.
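
To make this concrete, here is a toy sketch in Python (the my.db filename and the 1K page size are assumptions for illustration) that slices a SQLite file into fixed-size pages and stores them in a plain dict, the simplest key/value store there is:

# Toy illustration: a SQLite database file is nothing but fixed-size pages,
# so "page number -> page bytes" is a ready-made key/value mapping.

PAGE_SIZE = 1024  # must match the page size the database was created with

def load_pages(path):
    """Slice the file into pages keyed by page number (SQLite numbers pages from 1)."""
    pages = {}
    with open(path, "rb") as f:
        pgno = 1
        while True:
            page = f.read(PAGE_SIZE)
            if not page:
                break
            pages[pgno] = page
            pgno += 1
    return pages

def get_page(pages, pgno):
    # fetching a page is a plain key/value lookup - that is the whole point
    return pages[pgno]

pages = load_pages("my.db")   # hypothetical database file
root = get_page(pages, 1)     # page 1 holds the file header and the sqlite_master root
print(len(pages), "pages; first bytes:", root[:16])

Swap the dict for your favorite key/value store (local or distributed) and nothing about the B-tree logic sitting on top of the pages has to change.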
P.S. An astute reader may point out that there is such a thing as a
sorted map. But a sorted map is very different from a “pure” hash
map. The miracle of hashing is that not only does it let you find
elements in O(1) time, but more importantly that it is very suitable
for distributed processing, where the map may be spread across
multiple servers. And if you start thinking about how a sorted map
might be implemented in a distributed fashion, you will ultimately
loop back to B-trees, because that’s typically how it’s actually done.
Mapjoin is a little-known feature of Hive. It allows a table to be
loaded into memory so that a (very fast) join can be performed
entirely within a mapper, without having to use a Map/Reduce step. If
your queries frequently rely on small table joins (e.g. cities or
countries, etc.) you might see a very substantial speed-up from using
mapjoins.
There are two ways to enable it. The first is by using a hint, which looks
like /*+ MAPJOIN(aliasname), MAPJOIN(anothertable) */. This C-style comment
should be placed immediately following the SELECT. It directs Hive
to load aliasname (which is a table name or alias used in the query) into
memory.
Another (better, in my opinion) way to turn on mapjoins is to let Hive
do it automatically. Simply set hive.auto.convert.join to true in
your config, and Hive will automatically use mapjoins for any tables
smaller than hive.mapjoin.smalltable.filesize (default is 25MB).
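
To make the hint placement concrete, here is a sketch that runs a hinted query through the hive CLI from Python (the table and column names are made up for illustration, and hive -e simply executes the quoted statement):

import subprocess

# Hypothetical tables: a large "orders" table joined against a small "countries" table.
# The MAPJOIN hint tells Hive to load the small table (alias c) into memory;
# alternatively, "SET hive.auto.convert.join=true;" lets Hive decide on its own.
query = """
SELECT /*+ MAPJOIN(c) */ o.order_id, c.country_name
FROM orders o JOIN countries c ON (o.country_id = c.id);
"""

# Run it through the Hive CLI (assumes the hive binary is on the PATH).
subprocess.run(["hive", "-e", query], check=True)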
Mapjoins have a limitation in that the same table or alias cannot be
used to join on different columns in the same query. (This makes sense
because presumably Hive uses a HashMap keyed on the column(s) used in
the join, and such a HashMap would be of no use for a join on
different keys).
The workaround is very simple - just use a different alias for each join in your
query.
I also found that when the Hive documentation states that such queries
are “not supported”,
it means that the query will fail in unexpected
ways, sometimes with a Java traceback.
One of the answers in Linus Torvalds’ recent Slashdot Q&A was on the subject of understanding pointers:
At the opposite end of the spectrum, I actually wish more people
understood the really core low-level kind of coding. Not big, complex
stuff like the lockless name lookup, but simply good use of
pointers-to-pointers etc. For example, I’ve seen too many people who
delete a singly-linked list entry by keeping track of the “prev”
entry, and then to delete the entry, doing something like
if (prev)
prev->next = entry->next;
else
list_head = entry->next;
and whenever I see code like that, I just go “This person doesn’t
understand pointers”. And it’s sadly quite common.
People who understand pointers just use a “pointer to the entry
pointer”, and initialize that with the address of the list_head. And
then as they traverse the list, they can remove the entry without
using any conditionals, by just doing a “*pp = entry->next”
There were a few comments, but none explained what he really
meant. So here it is.
Say we have a singly-linked list. You need to iterate over it from beginning to end and remove a
specific element whose value equals the value of to_remove. The more
obvious way to do this would be:
1   list_entry *entry = head;  /* assuming head exists and is the first entry of the list */
2   list_entry *prev = NULL;
3
4   while (entry) {
5       if (entry->val == to_remove)      /* this is the one to remove */
6           if (prev)
7               prev->next = entry->next; /* remove the entry */
8           else
9               head = entry->next;       /* special case - first entry */
10
11      /* move on to the next entry */
12      prev = entry;
13      entry = entry->next;
14  }
What we are doing above is iterating over the list until entry is
NULL, which means we’ve reached the end of the list (line 4). When we
come across an entry we want removed (line 5), we assign the current
entry’s next pointer to the previous entry’s next, thus eliminating the
current element (line 7).
There is a special case above - at the beginning of the iteration
there is no previous entry (prev is NULL), and so to remove the
first entry in the list you have to modify head itself (line 9).
What Linus was saying is that the above code could be simplified by
making the previous element a pointer to a pointer rather than just a
pointer. The code then looks like this:
list_entry **pp = &head;  /* pointer to a pointer */
list_entry *entry = head;

while (entry) {
    if (entry->val == to_remove)
        *pp = entry->next;
    pp = &entry->next;
    entry = entry->next;
}
The above code is very similar to the previous variant, but notice how
we no longer need to watch for the special case of the first element
of the list: pp starts out pointing at head itself, so updating *pp
handles the first entry just like any other. Simple and clever.
Also, someone in that thread commented that the reason this is better
is because *pp = entry->next is atomic. It is most certainly NOT
atomic. The above expression contains two dereference operators (*
and ->) and one assignment, and none of those three things is
atomic. This is a common misconception, but alas pretty much nothing
in C should ever be assumed to be atomic (including the ++ and --
operators)!
Some random thoughts on having tinkered with
Storm over the past few weeks.
It took me some time to understand what Storm is, and I am still not
sure I have found a perfect use for it. (This is not
a criticism of Storm; the point is that the concepts it introduces are
new, somewhat difficult, and will need some time to sink in.) The best way to get a basic
understanding of Storm concepts is to watch Nathan Marz’s excellent presentation.
In simple terms, Storm is a tool that lets you run code in parallel
across a cluster of servers. It differs from Map/Reduce in that the
actual algorithm is entirely up to you, and in essence all that Storm
provides is the framework that supervises all the moving pieces of your
application (known as a topology) and provides a uniform way of
creating, testing locally, submitting to a cluster, logging,
monitoring, as well as primitives for sending data between components
such as grouping data by using hashing, etc.
Storm is mainly meant for stream processing. A stream could be
anything, some of the most obvious examples may be your web logs,
tables containing user actions such as clicks, transactions,
purchases, trades, etc. If the data is coming in at a rate where it’s
challenging to process it on one server, Storm provides a way to scale
it across a cluster of servers and can handle ridiculous amounts of
incoming data. The result is a real-time view of summary data that is
always up to date.
Storm is written in Java and Clojure, which makes the JVM the common
denominator, so any JVM language should work as “native”. Storm also provides a
primitive for using pipes to a process, which means that you can write
a component in anything - from a Bash script to C; all it needs to do
is read stdin and write stdout.
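
For instance, the split-sentence bolt from the Storm tutorial is just a short Python script that talks to its worker over stdin/stdout through the storm.py helper module that ships with Storm - roughly like this (quoted from memory, so treat the exact API as approximate):

import storm  # the multilang helper module that ships with Storm

class SplitSentenceBolt(storm.BasicBolt):
    def process(self, tup):
        # each incoming tuple carries a sentence; emit one tuple per word
        for word in tup.values[0].split(" "):
            storm.emit([word])

SplitSentenceBolt().run()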
For those who would prefer to try it out using a higher-level
language, there is an excellent project called
Redstorm which lets you
write your topology in JRuby. While a Redstorm topology may not be as
fast as something written in pure Java, the reduced development
time is well worth any trade offs, and you always have the option of
perfecting it later by porting your code to something JVM-native when
your understanding of how it ought to work is solidified in your mind.
If you’re going to go the Redstorm route, a couple of gotchas that I
came across were:
Storm 0.8.2 and JRuby 1.7.2 disagree on the version of the YAML parsing
jar (snakeyaml). I don’t know of a solution if you absolutely must parse
YAML, other than downgrading to JRuby 1.6.8; otherwise you can just
use something other than YAML: JSON or plain eval().
If you’re going to use ActiveRecord (which does work fine), watch
out for how to properly use it in a multi-threaded environment. You
might need to wrap some code in synchronize (see Concurrency in JRuby).
You will also need to make sure your ActiveRecord connections are not
shared by concurrent threads, by using
connection_pool.with_connection.
Every item on a TODO list can be classified as urgent, important
or neither. We should act on important items first, urgent second and
ignore the rest.
Sometimes an item lands on our TODO list described as extremely urgent
without any explanation of importance. In this case the important item
(and thus to be done first) becomes determining the importance of the
extremely urgent item in question, even if it means delaying it.
The reason I so strongly believe that understanding the importance of
everything we do is essential is quite simple: understanding
the importance implies understanding the ultimate objective. And
inversely, not understanding the importance implies not understanding
the objective.
And if after some discussion and thinking one still cannot assess the
importance of a task, and nobody can explain it, then it
is simply not important, however urgent it may seem.
There are exceptions, however. Sometimes importance can be difficult
to verbalize. We should always be attuned to that. In my personal
experience, reiterated urgency in response to “why is it important?” is
a bad sign, whereas an emotional reaction of the “you just don’t get
it, how can I explain this to you?” variety is a very good sign.
And sometimes, when you trust that the person requesting something of
you truly believes it is both important and urgent but is unable to
verbalize the importance sufficiently well, you may have to take their
word for it and just do it, without fully understanding the
importance.
It is also important to remember to always take the time to explain
importance of things we request of others. We naturally enjoy working
on important things and resent “urgent” and unimportant tasks.
Once upon a time (over a year ago) I found myself needing to store large numbers of
integers in memory. The goal was to store a graph of all our
purchasers and items purchased, so that we could quickly identify
like-minded purchasers based on common purchases and make real-time
recommendations of the form “people like you also bought”. This
approach is commonly known as collaborative filtering,
and exactly how we did it would be a subject of some future post
(perhaps).
At the time, I was looking at tens of millions of purchases by tens of
millions of people of hundreds of thousands of items. The only
information I needed to store was the ids of people and items, which
were just integers. While this seemed like a lot of data, I
believed it was entirely feasible to store them all in memory.
I didn’t have time to write my own implementation for storing this
graph, so I looked at a bunch of tools out there, asked around, and
the only one that seemed to fit the bill exactly in the end was
Redis. Yes, there are a few projects out there
that tout graph storage as their specialty, but none of them could
scale anywhere close to the level I needed. And in the end the term
“graph database” turned out to be a red herring of sorts. Any language
such as Python, Ruby or Java provides the basic data structures
quite sufficient for storing a graph as an adjacency list
out-of-the-box. You can store a graph in any key-value store, or even
in your favorite RDBMS. (To this day I’m not convinced there is any
good use case for the so-called graph databases out there.)
There were a few things that set Redis apart:
First, it keeps everything in RAM, which meant that updating this
dataset would be very fast, fast enough to keep it up-to-date in real
time.
The second great thing about Redis is Sorted Sets. This data structure
and the operations it supports fit what we needed to do
precisely. (Again, sorry for sparing you the details, but roughly: you
need to store a Set of item ids for every person as well as a Set of
person ids for every item, and “people like you” then becomes the
union of the Sets belonging to all the items directly linked to “you”.)
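
To make the set logic concrete, here is a toy in-memory sketch in Python (plain dicts and sets instead of Redis, with made-up ids):

from collections import defaultdict

# adjacency sets: person -> items bought, item -> people who bought it
items_by_person = defaultdict(set)
people_by_item = defaultdict(set)

def record_purchase(person_id, item_id):
    items_by_person[person_id].add(item_id)
    people_by_item[item_id].add(person_id)

def people_like(person_id):
    """Union of the person-Sets of every item this person bought."""
    like_minded = set()
    for item_id in items_by_person[person_id]:
        like_minded |= people_by_item[item_id]
    like_minded.discard(person_id)  # you are not "people like you"
    return like_minded

def also_bought(person_id):
    """Items bought by like-minded people that this person does not have yet."""
    suggestions = set()
    for other in people_like(person_id):
        suggestions |= items_by_person[other]
    return suggestions - items_by_person[person_id]

record_purchase(1, 100)
record_purchase(1, 101)
record_purchase(2, 100)
record_purchase(2, 102)
print(people_like(1))   # {2}
print(also_bought(1))   # {102}

In Redis the per-person and per-item Sets would be Sorted Sets and the unions would happen server-side (ZUNIONSTORE), but the shape of the data is the same.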
Thirdly, Redis supports replication, which meant that if the most
CPU-intensive task of computing the actual recommendations (which
requires union-ing a large number of large Sorted Sets) became a
bottleneck, we could address it by running it on slaves, and
we could easily scale the system by simply adding more slaves.
Last (but hardly least) is Redis’ ability to persist and quickly load
the in-memory database. You begin to appreciate the immense value of
this once you start populating Redis by pulling historical data from
your RDBMS and realize that it could take many hours or even days.
Everything was going great with my plan but soon I ran into a problem.
Not even a quarter of the way through the initial load process, I
noticed Redis reporting 20+ GB being used, which meant that the
particular machine I was testing this on wouldn’t have enough
RAM. That was a bummer. Especially because it began to look like the
whole architecture would require more memory than would be financially
sensible for this project (yes, you could get a machine with 1TB of
memory, but it was and still is prohibitively expensive).
My hunch (supported by some quick back-of-the-napkin calculations) was
that this was a software problem, not a hardware one.
The first obvious inefficiency of storing integers on a 64-bit
system is how much space an integer takes up. 64 bits (or 8 bytes)
is enough to store a number as large as 9,223,372,036,854,775,807. Yet
this number takes up exactly as much memory as 17 or 1234 (pick your
favorite small number). In fact, the range of integers I was dealing
with was well under 1 billion and 32 bits would more than suffice.
Add to this that on a 64-bit system every pointer is also (you guessed
it) - 64 bits. So if you’re storing a (singly) linked list of
integers, you end up with 8 bytes for the integer and 8 bytes for the
“next” pointer, or 16 bytes per integer. And if your data structure
is even more complex, such as a Redis Sorted Set, which is actually
implemented as two structures updated simultaneously (a Skip List and a
Hash), well, then you begin to see that our integers may end up taking
up as much if not less memory than the pointers pointing to them.
One simple way to reduce the memory bloat was to compile Redis in
32-bit mode. Redis makes it super easy with “make 32bit”. Because of
the smaller pointer size the 32-bit mode uses much less memory, but of
course the caveat is that the total address space is limited to 32
bits or about 4GB. While this did reduce the footprint by a great
deal, it wasn’t sufficient for my data set, which still looked to be
more than 4GB in size.
Then I came across this page on memory optimization. Little
did I know Redis already provided a very compact way of storing
integers. For small lists, sets or hashes, Redis uses a special
structure it calls ziplist that can store variable-length
integers and strings. The advantage is that it is very compact, but
the flipside is that such lists can only be processed
sequentially. (This is because you can’t access the n-th element
directly - element sizes vary, so you must scan from the
beginning.) But it turns out that for small lists sequential processing is actually
more efficient than a more complex
algorithm (hashing or whatever), because it requires no
indirection and can be accomplished with simple pointer math.
Redis’ zset-max-ziplist-entries config setting sets a threshold - any
Sorted Set that has fewer elements than the setting is stored as a
ziplist, and as soon as it grows beyond that number
it is converted to the full-fledged Sorted Set data
structure.
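
You can actually watch the conversion happen. A quick sketch using the redis-py client (the key name is arbitrary, and this assumes a local Redis with the default threshold of 128):

import redis  # assumes the redis-py client and a Redis server on localhost

r = redis.Redis()
r.delete("ztest")

# stay under the default zset-max-ziplist-entries threshold of 128
for i in range(100):
    r.execute_command("ZADD", "ztest", i, "member:%d" % i)
print(r.execute_command("OBJECT", "ENCODING", "ztest"))  # b'ziplist' (b'listpack' on newer Redis)

# push past the threshold and the encoding flips to the full data structure
for i in range(100, 200):
    r.execute_command("ZADD", "ztest", i, "member:%d" % i)
print(r.execute_command("OBJECT", "ENCODING", "ztest"))  # b'skiplist'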
What was interesting is that in my tests bumping the value up from the
default of 128 to as high as 10000 didn’t seem to have any noticeable
performance impact, while reducing the memory usage by an order of
magnitude. My best guess is that even at 10K elements this list is
small enough to be processed entirely in the CPU cache.
The effect of tweaking this setting seemed like pure magic, so I just
had to dig deeper and figure out exactly how it works. You can see the
description of the format in the comments for this file in Redis
source: src/ziplist.c.
The technique is very simple - the first 4 bits of an entry are used to identify
the size of the integer that follows; the exact layout is spelled out in the ziplist.c comments.
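
To illustrate the general trick (this is not Redis’s actual byte layout, just the same idea in miniature - a small header records how wide the integer is, so small numbers take a byte or two instead of a full eight):

import struct

# Toy variable-length integer encoding - not Redis's ziplist format,
# just the same idea: a one-byte header says how wide the integer that follows is.
SIZES = [(0x01, "b"), (0x02, "h"), (0x04, "i"), (0x08, "q")]  # 1, 2, 4, 8 bytes, signed

def encode(n):
    for width, fmt in SIZES:
        try:
            return bytes([width]) + struct.pack("<" + fmt, n)
        except struct.error:
            continue  # doesn't fit, try the next size up
    raise ValueError("integer too large")

def decode(buf):
    width = buf[0]
    fmt = dict(SIZES)[width]
    return struct.unpack("<" + fmt, buf[1:1 + width])[0]

print(len(encode(17)))          # 2 bytes instead of 8
print(len(encode(10**12)))      # 9 bytes for a genuinely big number
print(decode(encode(-12345)))   # -12345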
Actually, back when I looked at it, there was no 24-bit integer
encoding, which led me to submit a patch, which
was gladly accepted (and corrected for two’s complement support) by antirez.
Since that time I’ve been noticing different takes on variable-length
integer storage in other projects.
For example, Bitcoin uses variable-length integers to minimize the total size of the block
chain. The Bitcoin encoding (the “CompactSize” integer used in its serialization format) works roughly
as follows: values below 0xFD are stored in a single byte, while larger values are prefixed with a
0xFD, 0xFE or 0xFF marker byte followed by the value as a 2, 4 or 8 byte little-endian integer, respectively.
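
A minimal sketch of that encoding in Python:

import struct

def encode_compact_size(n):
    """Bitcoin-style variable-length unsigned integer ("CompactSize")."""
    if n < 0xfd:
        return struct.pack("<B", n)            # 1 byte
    elif n <= 0xffff:
        return b"\xfd" + struct.pack("<H", n)  # 3 bytes
    elif n <= 0xffffffff:
        return b"\xfe" + struct.pack("<I", n)  # 5 bytes
    else:
        return b"\xff" + struct.pack("<Q", n)  # 9 bytes

print(encode_compact_size(10).hex())        # 0a
print(encode_compact_size(1000).hex())      # fde803
print(encode_compact_size(100000).hex())    # fea0860100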