
Apache Avro

Short version

  • Avro is better than JSON for storing table data
  • Avro supports schema resolution so that the schema can evolve over time
  • Hive supports Avro and schema resolution nicely
  • Impala (1.0) can read Avro tables, but does not support schema resolution
  • Mixing compression codecs in the same table works in both Hive and Impala

The TL;DR version

Introduction

If you’re logging data into Hadoop to be analyzed, chances are you’re using JSON. JSON is great because it’s easy to generate in most any language, it’s human-readable, it’s universally supported and infinitely flexible.

It is also space-inefficient and prone to errors, and the standard has a few ambiguities, all of which eventually catch up with you. It only takes one bad record to spoil a potentially massive amount of data, and finding the bad record and figuring out the root cause of the problem is usually difficult and often impossible.

So you might be considering a slightly more rigid and space-efficient format, and in the Hadoop world that format is Apache Avro. Avro is especially compelling because it is supported by Impala, while JSON isn’t (not yet, at least).

Named after a British aircraft maker, Avro is a schema-enforced format for serializing arbitrary data. It is in the same category as Thrift, only Thrift seems to have found its niche in RPC, whereas Avro appears more compelling as an on-disk format (even though both were designed for both storage and RPC). Thrift is more insistent that you use its code generator, whereas Avro does it the old-school way, by providing libraries you can use from your code. (It does code generation as well, if that’s your thing; I prefer to hand-write all my code.)

I don’t actually want to focus on the details of what Avro is, as there is plenty of information on that elsewhere. I want to share my findings on Avro’s suitability as an alternative to JSON (and the JSON SerDe) for use with Hive.

Schema (and its Resolution)

Every Avro file contains a header with the schema describing (in JSON!) the contents of the file’s records. This is very nice, because the file contains all the knowledge necessary to be able to read it.
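
For example, the embedded schema can be pulled back out of a file with the same Ruby avro gem used below. This is just a sketch; it assumes the gem exposes the parsed writer’s schema via the DatumReader, and the file name refers to the file we’ll generate in a moment:

require 'rubygems'
require 'avro'

# Open an Avro container file and print the schema stored in its header.
file   = File.open('test.avro', 'rb')
reader = Avro::DataFile::Reader.new(file, Avro::IO::DatumReader.new)
puts reader.datum_reader.writers_schema  # parsed from the file header, as JSON
reader.close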

Avro was designed with the understanding that the schema may change over time (e.g. columns added or changed), and that software designed for a newer schema may need to read older schema files. To support this it provides something called Schema Resolution.

Imagine you’ve been storing people’s names in a file. Then later on you decide to add “age” as another attribute. Now you’ve got two schemas, one with “age” and one without. In the JSON world you’d have to adjust your program to read the old files, with some kind of an if/then statement to make sure the program knows what to do when “age” is missing. In Avro, the new schema can specify a default for the age (e.g. 0), and whatever Avro lib you’re using should be able to convert a record of the old schema to the new schema automatically, without any code modifications. This is called schema resolution.
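
Here’s a minimal sketch of what that resolution looks like with the Ruby avro gem (the same library used in the examples below). The file name people.avro and the default of 0 are made up for illustration; the writer’s schema comes from the file itself, and the reader’s schema is the new one with “age”:

require 'rubygems'
require 'avro'

# The "new" schema: the record now has an "age" field with a default of 0.
new_schema = Avro::Schema.parse('{"name":"person", "type":"record", ' +
                                ' "fields": [' +
                                '   {"name":"full_name", "type":"string"},' +
                                '   {"name":"age", "type":"int", "default":0}]}')

# Read a (hypothetical) file written with the old, age-less schema.
# The writer's schema is taken from the file header; the reader's schema is ours.
datum_reader = Avro::IO::DatumReader.new(nil, new_schema)
file_reader  = Avro::DataFile::Reader.new(File.open('people.avro', 'rb'), datum_reader)
file_reader.each { |rec| puts rec.inspect }  # e.g. {"full_name"=>"John Doe", "age"=>0}
file_reader.close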

Avro support in Hive

First we need an Avro file. Our schema is just one string column named “full_name”. Here’s a quick Ruby program to generate a file of 3 random records (you’ll need to gem install avro):

require 'rubygems'
require 'avro'

# A record schema with a single string field, "full_name"
schema = Avro::Schema.parse('{"name":"test_record", ' +
                            ' "type":"record", ' +
                            ' "fields": [' +
                            '   {"name":"full_name",  "type":"string"}]}')

# Write 3 records with random names to test.avro
writer = Avro::IO::DatumWriter.new(schema)
file = File.open('test.avro', 'wb')
dw = Avro::DataFile::Writer.new(file, writer, schema)
3.times do
  dw << {'full_name'=>"X#{rand(10000)} Y#{rand(10000)}"}
end
dw.flush
dw.close

Then we need to create a Hive table and load our file into it:

CREATE TABLE test_avro
 ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
 STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
 OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
 TBLPROPERTIES (
    'avro.schema.literal'='{"name":"test_record",
                            "type":"record",
                            "fields": [
                               {"name":"full_name", "type":"string"}]}');

LOAD DATA LOCAL INPATH 'test.avro' OVERWRITE INTO TABLE test_avro;

Note that the table definition needs its own schema, even though our file already contains one. This is not a mistake: this is the schema Hive will expect. If a file it reads has a different schema, Hive will attempt to convert the records using Avro schema resolution. Also noteworthy is that this table defines no columns; the entire definition lives in the avro.schema.literal property.

Let’s make sure this is working:

hive> select * from test_avro;
OK
X1800 Y9002
X3859 Y8971
X6935 Y5523

Now, since we also happen to have Impala running, let’s see if it’s able to read this file:

> select * from test_avro;
Query: select * from test_avro
Query finished, fetching results ...
+-------------+
| full_name   |
+-------------+
| X1800 Y9002 |
| X3859 Y8971 |
| X6935 Y5523 |
+-------------+

So far so good! Now let’s create a second Avro file with one additional column, “age”, using the following Ruby:

# Same record schema as before, plus an integer "age" field
# (assumes 'rubygems' and 'avro' are already required, as in the previous script)
schema = Avro::Schema.parse('{"name":"test_record", ' +
                            ' "type":"record", ' +
                            ' "fields": [' +
                            '   {"name":"full_name",  "type":"string"},' +
                            '   {"name":"age",        "type":"int"}]}')

writer = Avro::IO::DatumWriter.new(schema)
file = File.open('test2.avro', 'wb')
dw = Avro::DataFile::Writer.new(file, writer, schema)
3.times do
  dw << {'full_name'=>"X#{rand(10000)} Y#{rand(10000)}", 'age'=>rand(100)}
end
dw.flush
dw.close

Let’s load this into Hive and see if it still works. (No OVERWRITE keyword this time; we’re appending a second file to our table.)

hive> LOAD DATA LOCAL INPATH 'test2.avro' INTO TABLE test_avro;
OK
hive> select * from test_avro;
OK
X1800 Y9002
X3859 Y8971
X6935 Y5523
X4720 Y1361
X4605 Y3067
X7007 Y7852

This is working exactly as expected. Hive shows the 3 original records as before, and the 3 new ones were converted to Hive’s version of the schema, in which the “age” column does not exist.
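
Incidentally, this projection can be reproduced outside of Hive with the Ruby gem, just as a sanity check. A quick sketch: read test2.avro with the original age-less schema as the reader’s schema, and the extra field is simply skipped.

# Reader's schema is the original one, without "age"; the extra field is dropped.
old_schema = Avro::Schema.parse('{"name":"test_record", "type":"record", ' +
                                ' "fields": [' +
                                '   {"name":"full_name", "type":"string"}]}')

datum_reader = Avro::IO::DatumReader.new(nil, old_schema)
file_reader  = Avro::DataFile::Reader.new(File.open('test2.avro', 'rb'), datum_reader)
file_reader.each { |rec| puts rec.inspect }  # e.g. {"full_name"=>"X4720 Y1361"}
file_reader.close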

Let’s see what Impala thinks of this:

> select * from test_avro;
Query: select * from test_avro
Query finished, fetching results ...
+-------------+
| full_name   |
+-------------+
| X1800 Y9002 |
| X3859 Y8971 |
| X6935 Y5523 |
+-------------+

Alas, we’re only getting the 3 original rows. Bummer! What’s worrisome is that we got no indication that the 3 other rows were swallowed because Impala didn’t do schema resolution. (I’ve posted about this on the Impala users list and am awaiting a response.)

Now let’s alter the table schema so that “age” is part of it. (This is not your typical ALTER TABLE; we’re just changing avro.schema.literal.)

hive> ALTER TABLE test_avro SET TBLPROPERTIES (
    'avro.schema.literal'='{"name":"test_record",
                            "type":"record",
                            "fields": [
                              {"name":"full_name", "type":"string"},
                              {"name":"age",  "type":"int"}]}');
OK
hive> select * from test_avro;
OK
Failed with exception java.io.IOException:org.apache.avro.AvroTypeException: Found test_record, expecting test_record

This error should not be surprising. Hive is trying to provide a value for age for those records where it did not exist, but we neglected to specify a default. (The error message is a little cryptic, though). So let’s try again, this time with a default:

hive> ALTER TABLE test_avro SET TBLPROPERTIES (
    'avro.schema.literal'='{"name":"test_record",
                            "type":"record",
                            "fields": [
                              {"name":"full_name", "type":"string"},
                              {"name":"age",  "type":"int", "default":999}]}');
OK
hive> select * from test_avro;
OK
X1800 Y9002     999
X3859 Y8971     999
X6935 Y5523     999
X4720 Y1361     10
X4605 Y3067     34
X7007 Y7852     17

Woo-hoo!

Other notes

  • There is a nice Avro-C library, but it currently does not support defaults (version 1.7.4).
  • Some benchmarks
  • Avro’s integer encoding is the same as Lucene’s, with ZigZag encoding on top of it. (It’s just like SQLite’s, only little-endian.) Curiously, it is used for all integers, including array indexes internal to Avro, which can never be negative and for which ZigZag is therefore of no use; this is probably to keep all integer operations consistent. (See the sketch after this list.)
  • A curious post on how Lucene/Avro variable-integer format is CPU-unfriendly
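
To make the encoding note above concrete, here is a rough Ruby sketch of the scheme: ZigZag folds negative numbers into small positive ones, which are then written as a little-endian base-128 varint. This is my own illustration, not code from the Avro library:

# ZigZag: 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, 2 -> 4, ...
# (assuming a 64-bit "long"; Ruby integers are arbitrary precision)
def zigzag(n)
  (n << 1) ^ (n >> 63)
end

# Little-endian base-128 varint: 7 bits per byte, high bit means "more bytes follow".
def varint_bytes(u)
  bytes = []
  loop do
    b = u & 0x7f
    u >>= 7
    if u.zero?
      bytes << b
      break
    end
    bytes << (b | 0x80)
  end
  bytes
end

p varint_bytes(zigzag(1))    # => [2]
p varint_bytes(zigzag(-64))  # => [127]    - still a single byte
p varint_bytes(zigzag(64))   # => [128, 1] - two bytes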
