Gregory Trubetskoy

Notes to self.

Checking Out Cloudera Impala


I decided to check out Impala last week, and here are some notes on how that went.

First thoughts

I was very impressed with how easy it was to install, even considering our unusual setup (see below). In my simple ad-hoc tests Impala performed orders of magnitude faster than Hive. So far it seems solid down to the little details, like the shell prompt with a fully functional libreadline and column headers nicely formatted.

Installing

The first problem I encountered was that we use Cloudera tarballs in our setup, but Impala is only available as a package (RPM in our case). I tried compiling it from source, but it’s not a trivial compile - it requires LLVM (which is way cool, BTW) and has a bunch of dependencies, and it didn’t work out-of-the-box for me, so I decided to take an alternative route (I will definitely get it compiled some weekend soon).

Retrieving the contents of an RPM is trivial (because it’s really a cpio archive), and then I’d just have to “make it work”.

$ curl -O http://archive.cloudera.com/impala/redhat/6/x86_64/impala/1.0/RPMS/x86_64/impala-server-1.0-1.p0.819.el6.x86_64.rpm
$ mkdir impala
$ cd impala
$ rpm2cpio ../impala-server-1.0-1.p0.819.el6.x86_64.rpm | cpio -idmv

I noticed that usr/bin/impalad is a shell script, and it appears to rely on a few environment vars for configuration, so I created a shell script that sets them which looks approximately like this:

export JAVA_HOME=/usr/java/default
export IMPALA_LOG_DIR= # your log dir
export IMPALA_STATE_STORE_PORT=24000
export IMPALA_STATE_STORE_HOST= # probably the namenode host or whatever
export IMPALA_BACKEND_PORT=22000

export IMPALA_HOME= # full path to usr/lib/impala from the RPM, e.g. /home/grisha/impala/usr/lib/impala
export IMPALA_CONF_DIR= # config dir, e.g. /home/grisha/impala/etc/impala
export IMPALA_BIN=${IMPALA_HOME}/sbin-retail
export LIBHDFS_OPTS=-Djava.library.path=${IMPALA_HOME}/lib
export MYSQL_CONNECTOR_JAR= # full path to the mysql-connector jar

export HIVE_HOME= # your hive home - note: every Impala node needs it, just the config, not the whole Hive install
export HIVE_CONF_DIR= # this seems redundant, my guess is HIVE_HOME is enough, but whatever
export HADOOP_CONF_DIR= # path to the hadoop config, the dir that has hdfs-site.xml, etc.

export IMPALA_STATE_STORE_ARGS=" -log_dir=${IMPALA_LOG_DIR} -state_store_port=${IMPALA_STATE_STORE_PORT}"
export IMPALA_SERVER_ARGS=" \
    -log_dir=${IMPALA_LOG_DIR} \
    -state_store_port=${IMPALA_STATE_STORE_PORT} \
    -use_statestore \
    -state_store_host=${IMPALA_STATE_STORE_HOST} \
    -be_port=${IMPALA_BACKEND_PORT}"

With the above environment vars set, starting Impala should amount to the following (you probably want to run those in separate windows, also note that the state store needs to be started first):

$ ./usr/bin/statestored ${IMPALA_STATE_STORE_ARGS} # do this on IMPALA_STATE_STORE_HOST only
$ ./usr/bin/impalad ${IMPALA_SERVER_ARGS} # do this on every node

The only other problem I encountered was that Impala needed short-circuit access enabled, so I had to add the following to hdfs-site.xml:

 <property>
   <name>dfs.client.read.shortcircuit</name>
   <value>true</value>
 </property>
 <property>
   <name>dfs.domain.socket.path</name>
<!-- adjust this to your set up: -->
   <value>/var/run/dfs_domain_socket_PORT.sock</value>
 </property>
 <property>
   <name>dfs.client.file-block-storage-locations.timeout</name>
   <value>3000</value>
 </property>
 <property>
<!-- adjust this too: -->
   <name>dfs.block.local-path-access.user</name>
   <value><!-- user name --></value>
 </property>

Once the above works, we need impala-shell to test it. Again, I pulled it out of the RPM:

$ curl -O http://archive.cloudera.com/impala/redhat/6/x86_64/impala/1.0/RPMS/x86_64/impala-shell-1.0-1.p0.819.el6.x86_64.rpm
$ mkdir shell ; cd shell
$ rpm2cpio ../impala-shell-1.0-1.p0.819.el6.x86_64.rpm | cpio -idmv

I was then able to start the shell and connect. You can connect to any Impala node (read the docs):

$ ./usr/bin/impala-shell
[localhost:21000] > connect some_node;
Connected to some_node:21000
Server version: impalad version 1.0 RELEASE (build d1bf0d1dac339af3692ffa17a5e3fdae0aed751f)
[some_node:21000] > select count(*) from your_favorite_table;
Query: select count(*) from your_favorite_table
Query finished, fetching results ...
+-----------+
| count(*)  |
+-----------+
| 302052158 |
+-----------+
Returned 1 row(s) in 2.35s

Ta-da! The above query takes a good few minutes in Hive, BTW.

Other Notes

  • Impala does not support custom SerDes, so it won’t work if you’re relying on JSON. It does support Avro.
  • There is no support for UDFs, so our HiveSwarm is of no use.
  • INSERT OVERWRITE works, which is good.
  • LZO support works too.
  • Security Warning: Everything Impala does will appear in HDFS as the user under which Impala is running. Be careful with this if you’re relying on HDFS permissions to prevent an accidental “INSERT OVERWRITE”, as you might inadvertently give your users superuser privs on HDFS via Hue, for example. (Oh, did I mention Hue completely supports Impala too?) From what I can tell there is no way to set a username, which is a bit of a show-stopper for us, actually.

Relational Database on Top of Key-value Store Explained (or Why B-trees Are Cool)


This post attempts to explain how a relational database can be implemented atop a key/value store, a subject that I’ve long found rather mysterious.

Every once in a while I would come across a mention that a relational database can be implemented using a key/value store (aka dictionary, hash table or hash map - for brevity I’ll be using map from here on).

Whenever I thought about it, it just didn’t make sense. A relational database needs to store rows in order, and that’s one feature that maps do not provide. Imagine we have a table keyed by employee id stored in a map and we need to traverse it by id in ascending order. A hypothetical keys() method would return us a list of ids ordered randomly. It’s impossible to iterate over a hash map in order. So how would a relational database work then?

It took a while for me to realize the root of my misunderstanding. I was naively trying to picture how tables, rows and values can be represented as key/value pairs, and that was the wrong path to take. I was focusing on the wrong layer of abstraction.

As it turns out, the key [NPI] to this problem is the clever data structure commonly used to store data in a relational DB known as the B-Tree (or a variation thereof, the B+Tree). Okay, B-trees are nothing new and I’m sure we’ve all heard of them. In fact, B-trees were designed in the 1970s as a generalization of the Binary Search Tree that was more suited for block storage.

But there is something about B-trees that I did not know, and which now that I do know, seems absolutely essential as well as simply brilliant. In his 1979 paper “The Ubiquitous B-Tree” Douglas Comer writes:

The availability of demand paging hardware suggests an interesting implementation of B-trees. Through careful allocation, each node of the B-tree can be mapped into one page of the virtual address space. Then the user treats the B-tree as if it were in memory. Accesses to nodes (pages) which are not in memory cause the system to “page-in” the node from secondary storage.

The above paragraph implies that the B-Tree and all its data can be stored in pages. In fact, if you look at the file format of a SQLite 3 database (who says source code comments are bad?) you’ll see it states quite plainly that the file is divided into pages. (You will also see a fantastic description of exactly how a B+tree works, but that’s way outside the scope of this post.)

The important point is that the entire file consists of pages and nothing else. Inside those pages lives the B-tree structure, as well as the data. Each table is a B-tree, and to access it we need to know the starting page number, which in turn is stored in the sqlite_master table whose root page is always the first page of the file. The root page of a table is the head of the B-tree structure, and it may refer to other pages, which in turn may be additional nodes of the tree or pure data.

All pages are of the same size and are numbered sequentially, so we can easily retrieve any page by its number: since SQLite numbers pages starting at 1, a page’s offset into the file is simply (page number - 1) multiplied by the page size. (By default a SQLite3 page is 1K and will hold 4 keys, i.e. the order of the tree is 4).
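
To make the page arithmetic concrete, here is a minimal C sketch (not how SQLite itself exposes any of this) that reads the page size from the 100-byte file header and then fetches a page by number; the test.db path and the page number are made-up values for illustration:

#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>

int main(void) {
    const char *path = "test.db"; /* hypothetical database file */
    uint32_t pgno = 2;            /* the page we want to fetch  */

    FILE *f = fopen(path, "rb");
    if (!f) { perror("fopen"); return 1; }

    /* The page size is stored as a 2-byte big-endian value at offset 16
       of the 100-byte database header, which occupies the start of page 1. */
    unsigned char hdr[100];
    if (fread(hdr, 1, sizeof(hdr), f) != sizeof(hdr)) { fclose(f); return 1; }
    uint32_t page_size = (hdr[16] << 8) | hdr[17];

    /* Pages are numbered from 1, so page N starts at (N - 1) * page_size. */
    long offset = (long)(pgno - 1) * page_size;

    unsigned char *page = malloc(page_size);
    fseek(f, offset, SEEK_SET);
    fread(page, 1, page_size, f);

    /* For a b-tree page, the first byte is the page type:
       e.g. 0x0d is a leaf table page, 0x05 an interior table page. */
    printf("page %u: %u bytes at offset %ld, type 0x%02x\n",
           (unsigned)pgno, (unsigned)page_size, offset, (unsigned)page[0]);

    free(page);
    fclose(f);
    return 0;
}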

And bingo, there is our key/value pair: the page number is the key, and the page itself is the value! All you need to do is stick those pages into your favorite key/value store keyed by page number and you’ve got a relational database atop a key/value store. It’s that simple.

P.S. An astute reader may point out that there is such a thing as a sorted map. But a sorted map is very different from a “pure” hash map. The miracle of hashing is that not only does it let you find elements in O(1) time, but more importantly that it is very suitable for distributed processing, where the map may be spread across multiple servers. And if you start thinking about how a sorted map might be implemented in a distributed fashion, you will ultimately loop back to B-trees, because that’s typically how it’s actually done.

MapJoin: A Simple Way to Speed Up Your Hive Queries


Mapjoin is a little-known feature of Hive. It allows a table to be loaded into memory so that a (very fast) join can be performed entirely within a mapper, without having to use a reduce step. If your queries frequently rely on small table joins (e.g. cities or countries, etc.) you might see a very substantial speed-up from using mapjoins.

There are two ways to enable it. First is by using a hint, which looks like /*+ MAPJOIN(aliasname), MAPJOIN(anothertable) */. This C-style comment should be placed immediately following the SELECT. It directs Hive to load aliasname (which is a table or alias of the query) into memory.

SELECT /*+ MAPJOIN(c) */ * FROM orders o JOIN cities c ON (o.city_id = c.id);

Another (better, in my opinion) way to turn on mapjoins is to let Hive do it automatically. Simply set hive.auto.convert.join to true in your config, and Hive will automatically use mapjoins for any tables smaller than hive.mapjoin.smalltable.filesize (default is 25MB).

Mapjoins have a limitation in that the same table or alias cannot be used to join on different columns in the same query. (This makes sense because presumably Hive uses a HashMap keyed on the column(s) used in the join, and such a HashMap would be of no use for a join on different keys).

The workaround is very simple - do not use the same aliases in your query.

I also found that when the Hive documentation states that such queries are “not supported” they mean that the query will fail in unexpected ways, sometimes with a Java traceback.

Linus on Understanding Pointers


A while back Linus Torvalds answered questions on Slashdot.

One of the answers was on the subject of understanding of pointers:

At the opposite end of the spectrum, I actually wish more people understood the really core low-level kind of coding. Not big, complex stuff like the lockless name lookup, but simply good use of pointers-to-pointers etc. For example, I’ve seen too many people who delete a singly-linked list entry by keeping track of the “prev” entry, and then to delete the entry, doing something like

if (prev)
prev->next = entry->next;
else
list_head = entry->next;

and whenever I see code like that, I just go “This person doesn’t understand pointers”. And it’s sadly quite common.

People who understand pointers just use a “pointer to the entry pointer”, and initialize that with the address of the list_head. And then as they traverse the list, they can remove the entry without using any conditionals, by just doing a “*pp = entry->next”

There were a few comments, but none explained what he really meant. So here it is.

Imagine you have a linked list defined as:

typedef struct list_entry {
    int val;
    struct list_entry *next;
} list_entry;

You need to iterate over it from beginning to end and remove a specific element whose value equals the value of to_remove. The more obvious way to do this would be:

list_entry *entry = head; /* assuming head exists and is the first entry of the list */
list_entry *prev = NULL;

while (entry) {
    if (entry->val == to_remove)     /* this is the one to remove */
        if (prev)
           prev->next = entry->next; /* remove the entry */
        else
            head = entry->next;      /* special case - first entry */

    /* move on to the next entry */
    prev = entry;
    entry = entry->next;
}

What we are doing above is iterating over the list until entry is NULL, which means we’ve reached the end of the list. When we come across an entry we want removed, we point the previous entry’s next at the current entry’s next, thus unlinking the current element from the list.

There is a special case above - at the beginning of the iteration there is no previous entry (prev is NULL), and so to remove the first entry in the list you have to modify head itself.

What Linus was saying is that the above code could be simplified by making the previous element a pointer to a pointer rather than just a pointer. The code then looks like this:

list_entry **pp = &head; /* pointer to a pointer */
list_entry *entry = head;

while (entry) {
    if (entry->val == to_remove)
        *pp = entry->next;

    pp = &entry->next;
    entry = entry->next;
}

The above code is very similar to the previous variant, but notice how we no longer need to watch for the special case of the first element of the list, since pp is not NULL at the beginning. Simple and clever.

Also, someone in that thread commented that the reason this is better is because *pp = entry->next is atomic. It is most certainly NOT atomic. The above expression contains two dereference operators (* and ->) and one assignment, and none of those three things is atomic. This is a common misconception, but alas pretty much nothing in C should ever be assumed to be atomic (including the ++ and -- operators)!
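
To spell that out, here is a small sketch (with a made-up two-element list) showing that the single statement *pp = entry->next; is really a load followed by a store - two distinct memory operations with nothing making the pair atomic:

#include <stdio.h>

typedef struct list_entry {
    int val;
    struct list_entry *next;
} list_entry;

int main(void) {
    list_entry b = { 2, NULL };
    list_entry a = { 1, &b };
    list_entry *head = &a;

    list_entry **pp = &head;
    list_entry *entry = head;

    /* "*pp = entry->next;" decomposes into (at least) two steps: */
    list_entry *tmp = entry->next; /* 1: dereference entry, load its next */
    *pp = tmp;                     /* 2: dereference pp, store the value  */

    /* Another thread could interleave between the two steps; only C11
       atomics or locking would make this safe to do concurrently. */
    printf("head now points to val=%d\n", head->val); /* prints 2 */
    return 0;
}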

Storm Notes


Some random thoughts on having tinkered with Storm over the past few weeks.

It took me some time to understand what Storm is, and I am still not sure I have found a perfect use for it. (This is not a criticism of Storm; the point is that the concepts it introduces are new, somewhat difficult, and will need some time to sink in.) The best way to get the basic understanding of Storm concepts is to watch Nathan Marz’s excellent presentation.

In simple terms, Storm is a tool that lets you run code in parallel across a cluster of servers. It differs from Map/Reduce in that the actual algorithm is entirely up to you; in essence, all that Storm provides is the framework that supervises all the moving pieces of your application (known as a topology) and provides a uniform way of creating, testing locally, submitting to a cluster, logging and monitoring, as well as primitives for sending data between components, such as grouping data by hashing, etc.

Storm is mainly meant for stream processing. A stream could be anything, some of the most obvious examples may be your web logs, tables containing user actions such as clicks, transactions, purchases, trades, etc. If the data is coming in at a rate where it’s challenging to process it on one server, Storm provides a way to scale it across a cluster of servers and can handle ridiculous amounts of incoming data. The result is a real-time view of summary data that is always up to date.

Storm is written in Java and Clojure, which makes the JVM the common denominator, so any JVM language should work as “native”. Storm also provides a primitive for using pipes to a process which means that you can write a component in anything - from a Bash script to C, all it needs to do is read stdin and write stdout.

For those who would prefer to try it out using a higher-level language, there is an excellent project called Redstorm which lets you write your topology in JRuby. While a Redstorm topology may not be as fast as something written in pure Java, the reduced development time is well worth any trade offs, and you always have the option of perfecting it later by porting your code to something JVM-native when your understanding of how it ought to work is solidified in your mind.

If you’re going to go the Redstorm route, a couple of gotchas that I came across were:

  • Storm 0.8.2 and JRuby 1.7.2 disagree on the version of the YAML parsing jar (snakeyaml). I don’t know what the solution is if you absolutely must parse YAML, other than downgrading to JRuby 1.6.8; otherwise you can just use something other than YAML: JSON or just plain eval().

  • If you’re going to use ActiveRecord (which does work fine), watch out for how to properly use it in a multi-threaded environment. You might need to wrap some code in synchronize (see Concurrency in JRuby). You will also need to make sure your ActiveRecord connections are not shared by concurrent threads by using connection_pool.with_connection.

On Prioritization - Important vs Urgent


Every item on a TODO list can be classified as urgent, important or neither. We should act on important items first, urgent second and ignore the rest.

Sometimes an item lands on our TODO list described as extremely urgent without any explanation of importance. In this case the important item (and thus to be done first) becomes determining the importance of the extremely urgent item in question, even if it means delaying it.

The reason I so strongly believe that understanding the importance of everything we do is essential is quite simple: understanding the importance implies understanding of the ultimate objective. And conversely, not understanding the importance implies not understanding the objective.

And if after some discussion and thinking one still cannot assess the importance of a task, and nobody can explain it, then it is simply not important, however urgent it may seem.

There are exceptions, however. Sometimes importance can be difficult to verbalize. We should always be attuned to that. In my personal experience, reiterated urgency in response to “why it is important” is a bad sign, whereas an emotional reaction of the “you just don’t get it, how can I explain this to you?” variety is a very good sign.

And sometimes, when you trust that the person requesting something of you truly believes it is both important and urgent but is unable to verbalize the importance sufficiently well, you may have to take their word for it and just do it, without fully understanding the importance.

It is also important to remember to always take the time to explain the importance of the things we request of others. We naturally enjoy working on important things and resent “urgent” and unimportant tasks.

On Keeping Lots of Integers in Memory


Once upon a time (over a year ago) I found myself needing to store large numbers of integers in memory. The goal was to store a graph of all our purchasers and items purchased, so that we could quickly identify like-minded purchasers based on common purchases and make real-time recommendations of the form “people like you also bought”. This approach is commonly known as collaborative filtering, and exactly how we did it would be a subject of some future post (perhaps).

At the time, I was looking at tens of millions of purchases by tens of millions of people of hundreds of thousands of items. The only information I needed to store was the ids of people and items, which were just integers. While this seemed like a lot of data, I believed it was entirely feasible to store it all in memory.

I didn’t have time to write my own implementation for storing this graph, so I looked at a bunch of tools out there, asked around, and the only one that seemed to fit the bill exactly in the end was Redis. Yes, there are a few projects out there that tout graph storage as their specialty, but none of them could scale anywhere close to the level I needed. And in the end the term “graph database” turned out to be a red herring of sorts. Any language such as Python, Ruby or Java provides the basic data structures quite sufficient for storing a graph as an adjacency list out-of-the-box. You can store a graph in any key-value store, or even in your favorite RDBMS. (To this day I’m not convinced there is any good use case for the so-called graph databases out there.)

There were a few things that set Redis apart:

First, it keeps everything in RAM, which meant that updating this dataset would be very fast, fast enough to keep it up-to-date in real time.

The second great thing about Redis is Sorted Sets. This data structure and the operations it supports fit what we needed to do precisely. (Again, sorry for sparing you the details, but roughly, you need to store a Set of item ids for every person as well as a Set of person ids for every item, and “people like you” then becomes the union of all the Sets of items that are directly linked to “you”.)

Thirdly, Redis supports replication, which meant that if the most CPU-intensive task of computing the actual recommendations (which requires union-ing of a large number of large Sorted Sets) becomes a bottleneck, we could address this by running it on slaves, and we could easily scale the system by simply adding more slaves.

Last (but hardly least) is Redis’ ability to persist and quickly load the in-memory database. You begin to appreciate the immense value of this once you start populating Redis by pulling historical data from your RDBMS and realize that it could take many hours or even days.

Everything was going great with my plan but soon I ran into a problem. Not even a quarter of the way through the initial load process, I noticed Redis reporting 20+ GB being used, which meant that the particular machine I was testing this on wouldn’t have enough RAM. That was a bummer. Especially because it began to look like the whole architecture would require more memory than would be financially sensible for this project (yes, you could get a machine with 1TB of memory, but it was and still is prohibitively expensive).

My hunch (supported by some quick back-of-the-napkin calculations) was that this was a software problem, not a hardware one.

The first obvious inefficiency of storing integers on a 64-bit system is how much space an integer takes up. 64 bits (or 8 bytes) is enough to store a number as large as 9,223,372,036,854,775,807. Yet this number takes up exactly as much memory as 17 or 1234 (pick your favorite small number). In fact, the range of integers I was dealing with was well under 1 billion, and 32 bits would more than suffice.

Add to this that on a 64-bit system every pointer is also (you guessed it) 64 bits. So if you’re storing a (singly) linked list of integers, you end up with 8 bytes for the integer and 8 bytes for the “next” pointer, or 16 bytes per integer. And if your data structure is even more complex, such as a Redis Sorted Set, which is actually implemented as two structures updated simultaneously (a Skip List and a Hash), well, then you begin to see that our integers may end up taking as much memory as, if not less than, the pointers pointing to them.
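
A quick check of that arithmetic (on a typical 64-bit LP64 system; the exact numbers are platform-dependent, and real allocators add their own bookkeeping on top):

#include <stdio.h>

/* A singly-linked list node holding one integer, as described above. */
struct node {
    long val;          /* 8 bytes on LP64, even for a value like 17 */
    struct node *next; /* 8 bytes of pointer                        */
};

int main(void) {
    printf("sizeof(long)        = %zu\n", sizeof(long));        /* 8  */
    printf("sizeof(void *)      = %zu\n", sizeof(void *));      /* 8  */
    printf("sizeof(struct node) = %zu\n", sizeof(struct node)); /* 16 */
    return 0;
}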

One simple way to reduce the memory bloat was to compile Redis in 32-bit mode. Redis makes it super easy with “make 32bit”. Because of the smaller pointer size the 32-bit mode uses much less memory, but of course the caveat is that the total address space is limited to 32 bits or about 4GB. While this did reduce the footprint by a great deal, it wasn’t sufficient for my data set, which still looked to be more than 4GB in size.

Then I came across this page on memory optimization. Little did I know that Redis already provided a very compact way of storing integers. For small lists, sets or hashes, Redis uses a special structure it calls a ziplist that can store variable-length integers and strings. The advantage is that it is very compact, but the flipside is that such lists can only be processed sequentially. (This is because you can’t access the n-th element directly: the sizes of elements vary, so you must scan from the beginning.) But it turns out that sequential processing is actually more efficient for small lists than a more complex algorithm (hashing or whatever), because it requires no indirection and can be accomplished with simple pointer math.

Redis’ zset-max-ziplist-entries config setting sets a threshold: any Sorted Set that has fewer elements than the setting is stored as a ziplist, and as soon as the number of elements exceeds the setting it is converted to the full-fledged Sorted Set data structure.

What was interesting is that in my tests bumping up the value from the default of 128 to as high as 10000 didn’t seem to have any noticeable performance impact, while reducing the memory usage by an order of magnitude. My best guess is that even at 10K elements the list is small enough to be processed entirely in the CPU cache.

The effect of tweaking this setting seemed like pure magic, so I just had to dig deeper and figure out exactly how it works. You can see the description of the format in the comments for this file in Redis source: src/ziplist.c.

The technique is very simple - the first 4 bits are used to identify the size of the integer. The relevant comment text:

* |11000000| - 1 byte
*     Integer encoded as int16_t (2 bytes).
* |11010000| - 1 byte
*     Integer encoded as int32_t (4 bytes).
* |11100000| - 1 byte
*     Integer encoded as int64_t (8 bytes).
* |11110000| - 1 byte
*     Integer encoded as 24 bit signed (3 bytes).
* |11111110| - 1 byte
*     Integer encoded as 8 bit signed (1 byte).
* |1111xxxx| - (with xxxx between 0000 and 1101) immediate 4 bit integer.
*     Unsigned integer from 0 to 12. The encoded value is actually from
*     1 to 13 because 0000 and 1111 can not be used, so 1 should be
*     subtracted from the encoded 4 bit value to obtain the right value.

Actually, back when I looked at it, there was no 24-bit integer encoding, which led me to submitting a patch, which was gladly accepted (and corrected for two’s complement support) by antirez.
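
As a rough illustration (this is not the actual Redis code), picking the smallest of the encodings listed above for a given value looks something like this:

#include <stdio.h>
#include <stdint.h>

/* Returns the number of payload bytes that follow the 1-byte header for
   the smallest ziplist integer encoding that can hold v (0 means the value
   fits into the header itself as a 4-bit immediate). */
static int ziplist_int_payload_bytes(int64_t v) {
    if (v >= 0 && v <= 12)
        return 0; /* |1111xxxx| immediate 0..12        */
    else if (v >= INT8_MIN && v <= INT8_MAX)
        return 1; /* |11111110| followed by int8_t     */
    else if (v >= INT16_MIN && v <= INT16_MAX)
        return 2; /* |11000000| followed by int16_t    */
    else if (v >= -(1LL << 23) && v <= (1LL << 23) - 1)
        return 3; /* |11110000| followed by 24-bit int */
    else if (v >= INT32_MIN && v <= INT32_MAX)
        return 4; /* |11010000| followed by int32_t    */
    else
        return 8; /* |11100000| followed by int64_t    */
}

int main(void) {
    int64_t samples[] = { 7, 100, 30000, 5000000, 2000000000, 3000000000LL };
    for (int i = 0; i < 6; i++)
        printf("%lld -> 1 header byte + %d payload byte(s)\n",
               (long long)samples[i], ziplist_int_payload_bytes(samples[i]));
    return 0;
}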

Since that time I’ve been noticing different takes on variable-length integer storage in other projects.

For example Bitcoin uses variable-length integers to minimize the total size of the block chain. The bitcoin algo is as follows:

 * Examine the first byte
 *  - If that first byte is less than 253,
 *    use the byte literally
 *  - If that first byte is 253, read the next two bytes
 *    as a little endian 16-bit number (total bytes read = 3)
 *  - If that first byte is 254, read the next four bytes
 *    as a little endian 32-bit number (total bytes read = 5)
 *  - If that first byte is 255, read the next eight bytes
 *    as a little endian 64-bit number (total bytes read = 9)
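
A sketch of a decoder following the rules above (hand-rolled, not taken from the Bitcoin source):

#include <stdio.h>
#include <stdint.h>
#include <stddef.h>

/* Decode a Bitcoin-style variable-length integer from buf; store the value
   in *out and return the number of bytes consumed. Assumes buf holds a
   complete, well-formed encoding. */
static size_t read_varint(const unsigned char *buf, uint64_t *out) {
    unsigned char first = buf[0];
    if (first < 253) {            /* the byte is the value itself      */
        *out = first;
        return 1;
    } else if (first == 253) {    /* next 2 bytes, little-endian       */
        *out = (uint64_t)buf[1] | ((uint64_t)buf[2] << 8);
        return 3;
    } else if (first == 254) {    /* next 4 bytes, little-endian       */
        *out = (uint64_t)buf[1] | ((uint64_t)buf[2] << 8) |
               ((uint64_t)buf[3] << 16) | ((uint64_t)buf[4] << 24);
        return 5;
    } else {                      /* 255: next 8 bytes, little-endian  */
        *out = 0;
        for (int i = 0; i < 8; i++)
            *out |= (uint64_t)buf[1 + i] << (8 * i);
        return 9;
    }
}

int main(void) {
    const unsigned char example[] = { 0xfd, 0xe8, 0x03 }; /* 253, then 1000 */
    uint64_t v;
    size_t n = read_varint(example, &v);
    printf("decoded %llu from %zu bytes\n", (unsigned long long)v, n);
    return 0;
}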

SQLite3 uses its own variable-length integer format, possibly cleverer than the two above:

** Cell content makes use of variable length integers.  A variable
** length integer is 1 to 9 bytes where the lower 7 bits of each
** byte are used.  The integer consists of all bytes that have bit 8 set and
** the first byte with bit 8 clear.  The most significant byte of the integer
** appears first.  A variable-length integer may not be more than 9 bytes long.
** As a special case, all 8 bits of the 9th byte are used as data.  This
** allows a 64-bit integer to be encoded in 9 bytes.
**
**    0x00                      becomes  0x00000000
**    0x7f                      becomes  0x0000007f
**    0x81 0x00                 becomes  0x00000080
**    0x82 0x00                 becomes  0x00000100
**    0x80 0x7f                 becomes  0x0000007f
**    0x8a 0x91 0xd1 0xac 0x78  becomes  0x12345678
**    0x81 0x81 0x81 0x81 0x01  becomes  0x10204081
**
** Variable length integers are used for rowids and to hold the number of
** bytes of key and data in a btree cell.
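
And here is a sketch of decoding that format (again hand-rolled, not SQLite’s own code), using one of the examples from the comment as a sanity check:

#include <stdio.h>
#include <stdint.h>

/* Decode a SQLite-style varint: big-endian, 7 bits per byte while the high
   bit is set; if we get to a 9th byte, all 8 of its bits are used.
   Returns the number of bytes consumed, result in *out. */
static int varint_decode(const unsigned char *p, uint64_t *out) {
    uint64_t v = 0;
    for (int i = 0; i < 8; i++) {
        v = (v << 7) | (p[i] & 0x7f);
        if ((p[i] & 0x80) == 0) { /* high bit clear: this was the last byte */
            *out = v;
            return i + 1;
        }
    }
    v = (v << 8) | p[8]; /* 9th byte contributes all 8 bits */
    *out = v;
    return 9;
}

int main(void) {
    /* 0x81 0x81 0x81 0x81 0x01 becomes 0x10204081 (see the comment above) */
    const unsigned char example[] = { 0x81, 0x81, 0x81, 0x81, 0x01 };
    uint64_t v;
    int n = varint_decode(example, &v);
    printf("decoded 0x%llx from %d bytes\n", (unsigned long long)v, n);
    return 0;
}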

There are also other more sophisticated techniques of storing lists of integers such as Elias encoding and Golomb coding.