Gregory Trubetskoy

Notes to self.

Tgres 0.10.0b - Time Series With Go and PostgreSQL


After nearly two years of hacking, I am tagging this version of Tgres as beta. It is functional and stable enough for people to try out and not feel like they are wasting their time. There is still a lot that could and should be improved, but at this point the most important thing is to get more people to check it out.

What is Tgres?

Tgres is a Go program which can receive time series data via the Graphite or Statsd protocols or an HTTP pixel, store it in PostgreSQL, and provide Graphite-like access to the data in a way that is compatible with tools such as Grafana. You could think of it as a drop-in Graphite/Statsd replacement, though I’d rather avoid direct comparisons, because the key feature of Tgres is that the data is stored in PostgreSQL.

Why PostgreSQL?

The “grand vision” for Tgres begins with the database. Relational databases have the most man-decades of any storage type invested into them, and PostgreSQL is probably the most advanced implementation presently in existence.

If you search for “relational databases and time series” (or some variation thereupon), you will come across the whole gamut of opinions (if not convictions), varying so widely that it is downright discouraging. This is because time series storage, while simple at first glance, is actually fraught with subtleties and ambiguities that can drive even the most patient of us up the wall.

Avoid Solving the Storage Problem.

Someone once said that “anything is possible when you don’t know what you’re talking about”, and nowhere is this more evident than in data storage. File systems and relational databases trace their origins back to the late 1960s, and over half a century later I doubt any field expert would say that “the storage problem is solved”. It seems almost foolish, then, to suppose that by throwing together a key-value store and a consensus algorithm or some such it is possible to come up with something better. Instead of re-inventing storage, why not focus on how to structure the data in a way that is compatible with a storage implementation that we know works and scales reliably?

As part of the Tgres project, I thought it’d be interesting to get to the bottom of this. If not bottom, then at least deeper than most people dare to dive. I am not a mathematician or a statistician, nor am I a data scientist, whatever that means, but I think I understand enough about the various subjects involved, including programming, that I can come up with something more than just another off-the-cuff opinion.

And so now I think I can conclude definitively that time series data can be stored in a relational database very efficiently - in PostgreSQL in particular, thanks to its support for arrays. I described the general approach in a series of blog posts starting with this one; Tgres uses the technique described in the last one. In my performance tests the Tgres/Postgres combination was so efficient that it possibly outperformed its time-series siblings.

The good news is that as a user you don’t need to think about the complexities of the data layout - Tgres takes care of it. Still, I very much wish people would take more time to think about how to organize data in a tried and true solution like PostgreSQL before jumping ship into the murky waters of the “noSQL” ocean, lured by alternative storage sirens, big on promise but shy on delivery, only to drown where no one could come to the rescue.

How else is Tgres different?

Tgres is a single program, a single binary which does everything (one of my favorite things about Go). It supports the Graphite and Statsd protocols without having to run separate processes, and there are no dependencies of any kind other than a PostgreSQL database. No need for Python, Node or a JVM - just the binary, the config file and access to a database.

And since the data is stored in Postgres, virtually all of the features of Postgres are available: from being able to query the data using real SQL with all the latest features, to replication, security, performance, back-ups and whatever else Postgres offers.

Another benefit of the data being in a database is that it is accessible to application frameworks in Python, Ruby or whatever other language as just another database table. For example, in Rails it might be as trivial as class Tv < ActiveRecord::Base; end, et voilà, you have the data points as a model.

It should also be mentioned that Tgres requires no PostgreSQL extensions. Optimizing by implementing a custom extension which circumvents PostgreSQL’s natural way of handling data would mean solving the storage problem all over again. PostgreSQL storage is not broken to begin with; no customization is necessary to handle time series.

In addition to being a standalone program, the Tgres packages aim to be useful on their own as part of any other Go program. For example, it is very easy to equip a Go application with Graphite capabilities by providing it access to a database and using the provided HTTP handler. This also means that you can use a separate Tgres instance dedicated to querying data (perhaps from a downstream Postgres slave).

Some Internals Overview

Internally, Tgres series identification is tag-based. The series are identified by a JSONB field which is a set of key/value pairs indexed using a GIN index. In Go, the JSONB field becomes a serde.Ident. Since the “outside” interface Tgres is presently mimicking is Graphite, which uses dot-separated series identifiers, all idents are made of just one tag “name”, but this will change as we expand the DSL.
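
To make this concrete, here is roughly what the tag-based identification looks like at the SQL level (a simplified sketch - the actual ds table has more columns, and the index name here is made up):

-- A data source (series) is identified by a set of key/value tags stored as JSONB.
CREATE TABLE ds (
  id    SERIAL NOT NULL PRIMARY KEY,
  ident JSONB NOT NULL DEFAULT '{}');

-- A GIN index makes containment (@>) lookups on the tags fast.
CREATE INDEX ds_ident_idx ON ds USING GIN (ident);

-- A Graphite-style dot-separated name is simply an ident with a single "name" tag:
SELECT id FROM ds WHERE ident @> '{"name": "foo.bar"}';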

Tgres stores data in evenly-spaced series. The conversion from the data as it comes in to its evenly-spaced form happens on-the-fly, using a weighted mean method, and the resulting stored rate is actually correct. This is similar to how RRDTool does it, but different from many other tools, which simply discard all points except the last one in the same series slot, as I explained in this post.

Tgres maintains a (configurable) number of Round-Robin Archives (RRAs) of varying length and resolution for each series, an approach similar to RRDTool and Graphite Whisper. The conversion to evenly-spaced series happens in the rrd package.

Tgres does not store the original (unevenly spaced) data points. The rationale behind this is that for analytical value you inevitably have to convert an uneven series to a regular one anyway. Storing the original data points is not really a time-series problem; the main challenge there is the ability to keep up with a massive influx of data, and this is what Hadoop, Cassandra, S3, BigQuery, etc. are excellent at.

While the Tgres code implements most of the Graphite functions, complete compatibility with the Graphite DSL is not a goal, and some functions will probably be left unimplemented. In my opinion the Graphite DSL has a number of shortcomings by design. For example, the series names are not strings but are syntactically identifiers, i.e. there is no difference between scale(foo.bar, 10) and scale("foo.bar", 10), which is problematic in more than one way. The dot-names are ingrained into the DSL, and lots of functions take arguments denoting position within the dot-names, which seems unnecessary. For example, there is averageSeriesWithWildcards and sumSeriesWithWildcards, while it would be cleaner to have some kind of a wildcard() function which could be passed into average() or sum(). Another example is that Graphite does not support chaining (but Tgres already does), e.g. scale(average("foo.*"), 10) might be better as average("foo.*").scale(10). There are many more similar small grievances I have with the DSL, and in the end I think the DSL ought to be revamped to be more like a real language (or perhaps just be a language, e.g. Go itself); exactly how hasn’t been crystallized just yet.

Tgres also aims to be a useful time-series processing Golang package (or a set of packages). This means that in Go the code also needs to be clean and readable, and that there ought to be a conceptual correspondence between the DSL and how one might do the same thing at the lower level in Go. Again, the vision here is still blurry, and more thinking is required.

For Statsd functionality, the network protocol is supported by the tgres/statsd package while the aggregation is done by the tgres/aggregator. In addition, there is also support for “paced metrics” which let you aggregate data before it is passed on to the Tgres receiver and becomes a data point, which is useful in situations where you have some kind of an iteration that would otherwise generate millions of measurements per second.

The finest resolution for Tgres is a millisecond. Nanoseconds seem too small to be practical, though it shouldn’t be too hard to change, as internally Tgres uses native Go types for time and duration - the milliseconds only appear as integers in the database.

When data points are received via the network, the job of parsing the network traffic is done by the code in the tgres/daemon package, with some help from tgres/http and tgres/statsd, as well as potentially others (e.g. Python pickle decoding).

Once received and correctly parsed, data points are passed on to the tgres/receiver. The receiver’s job is to determine whether the series ident is already known (by checking the cache), or whether it needs to be loaded from the database or created. Once the appropriate series is found, the receiver updates the in-memory cache of the RRAs for the series (which is what makes the data points evenly spaced) and periodically flushes data points to the database. The receiver also controls the aggregator of statsd metrics.

The database interface code is in the tgres/serde package which supports PostgreSQL or an in-memory database (useful in situations where persistence is not required or during testing).

When Tgres is queried for data, it loads it from the database into a variety of implementations of the Series interface in the tgres/series package, as directed by tgres/dsl, which is responsible for figuring out what the query is asking for.

In addition to all of the above, Tgres supports clustering, though this is highly experimental at this point. The idea is that a cluster of Tgres instances (all backed by the same database, at least for now) would split the series amongst themselves and forward data points to the node which is responsible for a particular series. The nodes are placed behind a load-balancer of some kind, and with this set up nodes can go in and out of the cluster without any overall downtime for maximum availability. The clustering logic lives in tgres/cluster.

This is an overly simplistic overview which hopefully conveys that there are a lot of pieces to Tgres.

Future

In addition to a new/better DSL, there are lots of interesting ideas, and if you have any please chime in on Github.

One thing that is missing in the telemetry world is encryption, authentication and access control so that tools like Tgres could be used to store health data securely.

A useful feature might be interoperability with big data tools to store the original data points, perhaps providing a means of pulling them out of BigQuery or whatever and replaying them into series - this way we could change the resolution to anything at will.

Or little details like a series alias - so that a series could be renamed. The way this would work is you rename a series while keeping its old ident as an alias, then take your time to make sure all the agents send data under the new name, at which point the alias can go away.

Lots can also be done on the scalability front with improved clustering, sharding, etc.

We Could Use Your Help

Last but not least, this is an Open Source project. It works best when people who share the vision also contribute to the project, and this is where you come in. If you’re interested in learning more about time series and databases, please check it out and feel free to contribute in any way you can!

Tgres Load Testing Follow Up


To follow up on the previous post, after a bunch of tweaking, here is Tgres (commit) receiving over 150,000 data points per second across 500,000 time series without any signs of the queue size or any other resource blowing up.

This is both Tgres and Postgres running on the same i2.2xlarge EC2 instance (8 cores, 64GB, SSD).

At this point I think there’s been enough load testing and optimization, and I am going to get back to crossing the t’s and dotting the i’s so that we can release the first version of Tgres.

PostgreSQL vs Whisper, Which Is Faster?


Note: there is an update to this post.

TL;DR

On an 8 CPU / 16 GB EC2 instance, Tgres can process 150,000 data points per second across 300,000 series (Postgres running on the same machine). With some tweaks we were able to get the number of series to half a million, flushing ~60K data points per second.

Now the long version…

If you were to ask me whether Tgres could outperform Graphite, just a couple of months ago my answer would have been “No”. Tgres uses Postgres to store time series data, while Graphite stores data by writing to files directly; the overhead of the relational database just seemed too great.

Well, I think I’ve managed to prove myself wrong. After re-working Tgres to use the write-optimized layout, I’ve run some tests on AWS yielding unexpectedly promising results.

As a benchmark I targeted the excellent blog post by Jason Dixon describing his AWS Graphite test. My goal was to get to at least half the level of performance described therein. But it appears the combination of Go, Postgres and some clever data structuring has been able to beat it - not without breaking a little sweat, but it has.

My test was conducted on a c4.2xlarge instance, which has 8 cores and 16 GB of RAM, using 100GB of EBS (which, if I understood it correctly, comes with 300 IOPS - please comment if I’m wrong). The “c4” instances are supposed to have some of the fastest CPUs AWS has to offer, but compare this with the instance used in the Graphite test, an i2.4xlarge (16 CPU / 122GB): mine had half the CPU cores and nearly one tenth of the RAM.

Before I go any further, here is the obligatory screenshot, then my observations and lessons learned in the process, as well as a screenshot depicting even better performance.

The Tgres version running was this one, with the config detailed at the bottom of the post.

Postgres was whatever yum install postgresql95-server brings your way, with the data directory moved to the EBS volume formatted using ext4 (not that I think it matters). The Postgres config was modified to allow a 100ms commit delay and to make autovacuum extra aggressive. I did not increase any memory buffers and left everything else as is. Specifically, these were the changes:

autovacuum_work_mem = -1
synchronous_commit = off
commit_delay = 100000
autovacuum_max_workers = 10
autovacuum_naptime = 1s
autovacuum_vacuum_threshold = 2000
autovacuum_vacuum_scale_factor = 0.0
autovacuum_vacuum_cost_delay = 0

The data points for the test were generated by a goroutine in the Tgres process itself. In the past I’ve found that blasting a server with this many UDP packets can be tricky and hardware/network intensive. It’s also hard to tell when/if they get dropped and why, etc. Since Go is not known for having problems in its network stack, I was not too worried about it; I just wanted a reliable and configurable source of incoming packets, and in the Go world writing a simple goroutine seemed like the right answer.

Somewhat Random Notes and Making Tgres Even Faster

Determining failure

Determining when we are “at capacity” is tricky. I’ve mostly looked at two factors (aside from the obvious - running out of memory/disk, becoming unresponsive, etc): receiver queue size and Postgres table bloat.

Queue size

Tgres uses “elastic channels” (so eloquently described here by Nick Patavalis) for incoming data points and for loading series from Postgres. These are channel-like structures that can grow to arbitrary length, limited only by the memory available. This is done so as to be able to take maximum advantage of the hardware at hand. If any of those queues starts growing out of control, we are failing. You can see in the picture that at about 140K data points per second the receiver queue started growing, though it did stay steady at this size and never spun out of control (the actual test was left running overnight at this rate just to make sure).

PG Table Bloat

Table bloat is a phenomenon affecting Postgres in write-intensive situations because of its adherence to MVCC. It basically means that pages on disk are being updated faster than the autovacuum process can keep up with, and the table starts growing out of control.

To monitor for table bloat, I used a simple formula which determined the approximate size of the table based on the row count (our data is all floats, which makes it very predictable) and compared it with the actual size. If the actual size exceeded the estimated size, that excess is considered bloat. Bloat is reported in the “TS Table Size” chart. A little bloat is fine, and you can see that it stayed at a fairly low percentage throughout the test.
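
For the curious, the check was roughly along these lines (a sketch of the idea, not the exact formula I used - the per-row overhead of 100 bytes is a rough assumption, and 200 floats per row corresponds to the default segment width discussed below):

-- Compare the "ideal" size of the ts table, estimated from its row count,
-- with the size Postgres actually reports; anything above the estimate is bloat.
SELECT pg_relation_size('ts') AS actual_bytes,
       count(*) * (200 * 8 + 100) AS estimated_bytes,
       round(pg_relation_size('ts') / NULLIF(count(*) * (200 * 8 + 100), 0)::NUMERIC, 2) AS ratio
  FROM ts;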

In the end, though more research is warranted, it may just turn out that contrary to every expectation PostgreSQL was not the limiting factor here. The postmaster processes stayed below 170MB RSS, which is absolutely remarkable, and Grafana refreshes were very quick even at peak loads.

Memory consumption

Tgres has a slight limitation in that creating a series is expensive. It needs to check with Postgres, and for reasons I don’t want to bore you with it’s always a SELECT, optionally followed by an “UPSERT”. This takes time, and during the ramp-up period, when the number of series is growing fast and lots of them need to be created, the Go runtime ends up consuming a lot of memory. You can see that the screenshot reports 4.69GB. If I restarted Tgres (which causes all existing DS names to be pre-cached), its memory footprint stayed at about 1.7GB. More work needs to be done to figure out what accounts for the difference.

Data Point Rate and Number of Series

The rate of data points that need to be saved to disk is a function of the number of series and the resolution of the RRAs. To illustrate, if I have one series at 1 point per second, even if I blast a million data points per second, still only 1 data point per second needs to be saved.

There is an important difference between Graphite and Tgres in that Tgres actually adjusts the final value of a slot by taking every incoming data point into account using a weighted mean, while Graphite just ignores all points but the last. For example, if a 10-second slot receives the value 4 for six seconds and then 10 for four seconds, Tgres stores (4*6 + 10*4)/10 = 6.4, whereas last-point-wins would store 10. So Tgres does a bit more work, which adds up quickly at 6-figure rates per second.

The Graphite test, if I read the chart correctly, was able to process ~70K data points per second across 300K series. My test had 300K series with data points coming in at over 150K/s. But just out of curiosity, I tried to push it to its limit.

At 400K series, you can see clear signs of deterioration. You can see how the vcache isn’t flushed fast enough, leaving gaps at the end of series. If we stop the data blast, it does eventually catch up, so long as there is memory for the cache.

If you don’t catch this condition in time, Tgres will die with:

fatal error: runtime: out of memory

runtime stack:
runtime.throw(0xa33e5a, 0x16)
        /home/grisha/.gvm/gos/go1.8/src/runtime/panic.go:596 +0x95
...

Segment Width

There is still one easy performance card we can play here. Segment width is how many data points are stored in one row; it is also the limit on how many points we can transfer in a single SQL operation. Segment width by default is 200, because a width higher than that causes rows to exceed a page and trigger TOAST. TOAST can be good or bad: it means data is stored in a separate table (not so good), but it also means it’s compressed, which may be an I/O win.
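
Incidentally, it is easy to check whether rows are in fact being TOASTed with a quick look at the catalog (just a diagnostic query, not something Tgres does):

-- Does the ts table have a TOAST relation, and roughly how big is the out-of-line storage?
SELECT reltoastrelid <> 0 AS has_toast,
       pg_size_pretty(pg_table_size('ts') - pg_relation_size('ts')) AS approx_toast_size
  FROM pg_class
 WHERE relname = 'ts';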

So what would happen if we set the segment width to 1000?

The picture changes significantly (see below). I was able to get the number of series to 500K - note the whopping 52,602 data points being written to the database per second! You can see we’re pushing it to the limit because the receiver queue is beginning to grow. I really wanted to get the rate up to 150K/sec, but it just didn’t want to go there.

And what would happen if we set the segment width to 4096?

Interestingly, the memory footprint is a tad larger while the vcache is leaner; the number of data points flushed per second is about the same, though in fewer SQL statements; and the overall picture is about the same, with the incoming queue still skyrocketing at just about 100K/sec over 500K series.

Conclusion

There are plenty of places in the Tgres code that could still be optimized.

One issue that would be worth looking into is exposing Tgres to the firehose on an empty database. The current code runs out of memory in under a minute when suddenly exposed to 300K new series at 150K/s. Probably the simplest solution to this would be to somehow detect that we’re unable to keep up and start dropping data points. Eventually, when all the series are created and cached, performance should even out after the initial spike and all should be well.

In any event, it’s nice to be able to do something like this and know that it is performant as well:

tgres=> select t, r from ds
 join tv  on tv.ds_id = ds.id
where ident @> '{"name":"tgres.0_0_0_0.runtime.load.five"}'
  and tv.step_ms = 10000
order by t desc
limit 5;
           t            |       r
------------------------+----------------
 2017-02-23 22:31:50+00 | 1.256833462648
 2017-02-23 22:26:30+00 | 1.305209492142
 2017-02-23 22:24:10+00 | 1.554056287975
 2017-02-23 22:24:00+00 | 1.453365774931
 2017-02-23 22:23:50+00 | 1.380504724386
(5 rows)

Reference

For completeness’ sake, the instance was created using a Terraform config approximately like this:

variable "aws_region" { default = "us-east-1" }
variable "aws_zone" { default = "us-east-1a" }
variable "key_name" { default = "REDACTED"

provider "aws" {
  region = "${var.aws_region}"
}

resource "aws_ebs_volume" "ebs_volume" {
  availability_zone = "${var.aws_zone}"
  size = 100
}

resource "aws_volume_attachment" "ebs_att" {
  device_name = "/dev/sdh"
  volume_id = "${aws_ebs_volume.ebs_volume.id}"
  instance_id = "${aws_instance.tgres-test-tmp.id}"
}

resource "aws_instance" "tgres-test-tmp" {
  ami = "ami-0b33d91d"
  instance_type = "c4.2xlarge"
  subnet_id = "REDACTED"
  vpc_security_group_ids = [
    "REDACTED"
  ]
  associate_public_ip_address = true
  key_name = "${var.key_name}"
}

And then the following commands were used to prime everything:

sudo mke2fs /dev/sdh
sudo mkdir /ebs
sudo mount /dev/sdh /ebs

sudo yum install -y postgresql95-server
sudo service postgresql95 initdb
sudo mkdir /ebs/pg
sudo mv /var/lib/pgsql95/data /ebs/pg/data
sudo ln -s /ebs/pg/data /var/lib/pgsql95/data

sudo vi /var/lib/pgsql95/data/postgresql.conf
# BEGIN postgres config - paste this somewhere in the file
autovacuum_work_mem = -1
synchronous_commit = off
commit_delay = 100000
autovacuum_max_workers = 10
autovacuum_naptime = 1s
autovacuum_vacuum_threshold = 2000
autovacuum_vacuum_scale_factor = 0.0
autovacuum_vacuum_cost_delay = 0
# END postgres config

sudo service postgresql95 restart

# create PG database

sudo su - postgres
createuser -s ec2-user   # note -s is superuser - not necessary for tgres but just in case
createdb tgres
exit

# Tgres (requires Go - I used 1.8)
# (or you can just scp it from some machine where you already have go environment)
mkdir golang
export GOPATH=~/golang/
go get github.com/tgres/tgres
cd /home/ec2-user/golang/src/github.com/tgres/tgres
go build
cp etc/tgres.conf.sample etc/tgres.conf

The tgres.conf file looked like this:

min-step                = "10s"

pid-file =                 "tgres.pid"
log-file =                 "log/tgres.log"
log-cycle-interval =       "24h"

max-flushes-per-second      = 1000000 # NB - Deprecated setting
workers                     = 4       # NB - Deprecated setting

http-listen-spec            = "0.0.0.0:8888"
graphite-line-listen-spec   = "0.0.0.0:2003"
graphite-text-listen-spec   = "0.0.0.0:2003"
graphite-udp-listen-spec    = "0.0.0.0:2003"
graphite-pickle-listen-spec = "0.0.0.0:2004"

statsd-text-listen-spec     = "0.0.0.0:8125"
statsd-udp-listen-spec      = "0.0.0.0:8125"
stat-flush-interval         = "10s"
stats-name-prefix           = "stats"

db-connect-string = "host=/tmp dbname=tgres sslmode=disable"

[[ds]]
regexp = ".*"
step = "10s"
#heartbeat = "2h"
rras = ["10s:6h", "1m:7d", "1h:1y"]

Tgres was run with the following command; the TGRES_BLASTER variable starts the blaster goroutine.

TGRES_BIND=0.0.0.0 TGRES_BLASTER=1 ./tgres

Once you have Tgres with the blaster running, you can control it via HTTP, e.g. the following would set it to 50K/s data points across 100K series. Setting rate to 0 pauses it.

curl -v "http://127.0.0.1:8888/blaster/set?rate=50000&n=100000"

Storing Time Series in PostgreSQL - Optimize for Write


Continuing on the previous write up on how time series data can be stored in Postgres efficiently, here is another approach, this time providing for extreme write performance.

The “horizontal” data structure in the last article requires an SQL statement for every data point update. If you cache data points long enough, you might be able to collect a bunch for a series and write them out at once for a slight performance advantage. But there is no way to update multiple series with a single statement; it’s always at least one update per series. With a large number of series, this can become a performance bottleneck. Can we do better?

One observation we can make about incoming time series data is that commonly the data points are roughly from the same time period, the current time, give or take. If we’re storing data at regularly-spaced intervals, then it is extremely likely that many if not all of the most current data points from various time series are going to belong to the exact same time slot. Considering this observation, what if we organized data points in rows of arrays, only now we would have a row per timestamp while the position within the array would determine the series?

Let’s create the tables:

CREATE TABLE rra_bundle (
  id SERIAL NOT NULL PRIMARY KEY,
  step_ms INT NOT NULL,
  steps_per_row INT NOT NULL,
  size INT NOT NULL,
  latest TIMESTAMPTZ DEFAULT NULL);

CREATE TABLE rra (
  id SERIAL NOT NULL PRIMARY KEY,
  ds_id INT NOT NULL,
  rra_bundle_id INT NOT NULL,
  pos INT NOT NULL);

CREATE TABLE ts (
  rra_bundle_id INT NOT NULL,
  i INT NOT NULL,
  dp DOUBLE PRECISION[] NOT NULL DEFAULT '{}');

Notice how the step and size now become properties of the bundle rather than of the rra, which now refers to a bundle. In the ts table, i is the index in the round-robin archive (which in the previous “horizontal” layout would be the array index).

The data we used before was a bunch of temperatures; let’s add two more series, one where the temperature is 1 degree higher, and one where it’s 1 degree lower. (Not that it really matters.)

INSERT INTO rra_bundle VALUES (1, 60000, 1440, 28, '2008-04-02 00:00:00-00');

INSERT INTO rra VALUES (1, 1, 1, 1);
INSERT INTO rra VALUES (2, 2, 1, 2);
INSERT INTO rra VALUES (3, 3, 1, 3);

INSERT INTO ts VALUES (1, 0, '{64,65,63}');
INSERT INTO ts VALUES (1, 1, '{67,68,66}');
INSERT INTO ts VALUES (1, 2, '{70,71,69}');
INSERT INTO ts VALUES (1, 3, '{71,72,70}');
INSERT INTO ts VALUES (1, 4, '{72,73,71}');
INSERT INTO ts VALUES (1, 5, '{69,70,68}');
INSERT INTO ts VALUES (1, 6, '{67,68,66}');
INSERT INTO ts VALUES (1, 7, '{65,66,64}');
INSERT INTO ts VALUES (1, 8, '{60,61,59}');
INSERT INTO ts VALUES (1, 9, '{58,59,57}');
INSERT INTO ts VALUES (1, 10, '{59,60,58}');
INSERT INTO ts VALUES (1, 11, '{62,63,61}');
INSERT INTO ts VALUES (1, 12, '{68,69,67}');
INSERT INTO ts VALUES (1, 13, '{70,71,69}');
INSERT INTO ts VALUES (1, 14, '{71,72,70}');
INSERT INTO ts VALUES (1, 15, '{72,73,71}');
INSERT INTO ts VALUES (1, 16, '{77,78,76}');
INSERT INTO ts VALUES (1, 17, '{70,71,69}');
INSERT INTO ts VALUES (1, 18, '{71,72,70}');
INSERT INTO ts VALUES (1, 19, '{73,74,72}');
INSERT INTO ts VALUES (1, 20, '{75,76,74}');
INSERT INTO ts VALUES (1, 21, '{79,80,78}');
INSERT INTO ts VALUES (1, 22, '{82,83,81}');
INSERT INTO ts VALUES (1, 23, '{90,91,89}');
INSERT INTO ts VALUES (1, 24, '{69,70,68}');
INSERT INTO ts VALUES (1, 25, '{75,76,74}');
INSERT INTO ts VALUES (1, 26, '{80,81,79}');
INSERT INTO ts VALUES (1, 27, '{81,82,80}');

Notice that every INSERT adds data for all three of our series in a single database operation!
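
The steady-state write path is just as compact: once a slot row exists, updating the current slot for every series in the bundle is a single UPDATE of a single row, plus bumping latest. Something along these lines (a sketch of the idea - the actual SQL Tgres generates may differ):

-- Write the current values for all three series at once and advance "latest".
UPDATE ts SET dp = '{81,82,80}' WHERE rra_bundle_id = 1 AND i = 27;
UPDATE rra_bundle SET latest = '2008-04-02 00:00:00-00' WHERE id = 1;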

Finally, let us create the view. (How it works is described in detail in the previous article)

CREATE VIEW tv AS
  SELECT rra.id as rra_id,
     rra_bundle.latest - INTERVAL '1 MILLISECOND' * rra_bundle.step_ms * rra_bundle.steps_per_row *
       MOD(rra_bundle.size + MOD(EXTRACT(EPOCH FROM rra_bundle.latest)::BIGINT*1000/(rra_bundle.step_ms * rra_bundle.steps_per_row),
       rra_bundle.size) - i, rra_bundle.size) AS t,
     dp[pos] AS r
  FROM rra AS rra
  JOIN rra_bundle AS rra_bundle ON rra_bundle.id = rra.rra_bundle_id
  JOIN ts AS ts ON ts.rra_bundle_id = rra_bundle.id;

And now let’s verify that it works:

=> select * from tv where rra_id = 1 order by t;
 rra_id |           t            | r
--------+------------------------+----
      1 | 2008-03-06 00:00:00-00 | 64
      1 | 2008-03-07 00:00:00-00 | 67
      1 | 2008-03-08 00:00:00-00 | 70
...

This approach makes writes blazingly fast, though it does have its drawbacks. For example, there is no way to read just a single series - even though the view selects a single array element, under the hood Postgres reads the whole row. Given that time series data is write-intensive and relatively rarely read, this may not be a bad compromise.

Simple Tgres Part II - a High Rate Counter


Continuing on the previous post on simple use of Tgres components, let’s try to count something that goes by really fast.

This time let’s start out with creating a memory-based SerDe. This means that all our data is in memory and there is no database backing our series.

package main

import (
    "fmt"
    "net/http"
    "time"

    "github.com/tgres/tgres/dsl"
    h "github.com/tgres/tgres/http"
    "github.com/tgres/tgres/receiver"
    "github.com/tgres/tgres/rrd"
    "github.com/tgres/tgres/serde"
)

func main() {

    step := 1 * time.Second // 1 second resolution
    span := 600 * step      // spanning 10 minutes

    // In-memory SerDe
    ms := serde.NewMemSerDe()

    // Create a receiver of our data points backed by the above
    // memory SerDe
    rcvr := receiver.New(ms, &receiver.SimpleDSFinder{&rrd.DSSpec{
        Step: step,
        RRAs: []rrd.RRASpec{
            rrd.RRASpec{Function: rrd.WMEAN,
                Step: step,
                Span: span,
            },
        }}})
    rcvr.Start()

Now let’s create a goroutine which creates data points as fast as it can. The difference from the previous blog post is that we are using QueueGauge(), which is a paced metric, meaning that it flushes to the time series only periodically (once per second by default) so as to not overwhelm the I/O and/or the network (even though in this case it doesn’t really matter, since we’re using a memory-based SerDe anyway).

    start := time.Now()
    end := start.Add(span)

    go func() {
        n := 0
        for t := time.Now(); t.Before(end); t = time.Now() {
            rcvr.QueueGauge(serde.Ident{"name":"foo.bar"}, float64(n)/(t.Sub(start)).Seconds())
            n++
        }
    }()

And finally, as before, we need to hook up a couple of http handlers:

    db := dsl.NewNamedDSFetcher(ms.Fetcher())

    http.HandleFunc("/metrics/find", h.GraphiteMetricsFindHandler(db))
    http.HandleFunc("/render", h.GraphiteRenderHandler(db))

    listenSpec := ":8088"
    fmt.Printf("Waiting for requests on %s\n", listenSpec)
    http.ListenAndServe(listenSpec, nil)

} // end of main()

Now if we run the above code with something like go run simpletgres.go, we’ll notice that unlike with the previous example, the web server starts right away, and the data points are being written while the server is running. If we aim Grafana at it, we should be able to see the chart update in real time.

After a couple of minutes, mine looks like this:

So my macbook can crank these out at about 2.5 million per second.

In my experience instrumenting my apps with simple counters like this and having them available directly from the app without having to send them to a separate statsd server somewhere has been extremely useful in helping understand performance and other issues.

Why Is There No Formal Definition of Time Series?


If you’re reading this, chances are you may have searched for a definition of “Time Series”. And, like me, you were probably disappointed by what you found.

The most popular “definition” I come across amongst our fellow programmer folk is that it’s “data points with timestamps”. Or something like that. And you can make charts from it. And that’s about it, alas.

The word time suggests that it has something to do with time. At first this seems reasonable - I bite. The word series is a little more peculiar. A mathematician would argue that a series is a sum of a sequence. Most people, though, think “series” and “sequence” are the same thing, and that’s fine. But it’s a clue that time series is not a scientific term, because otherwise it would most likely have been called a time sequence.

Let’s get back to the time aspect of it. Why do data points need timestamps? Or do they? Isn’t it the time interval between points that is most essential, rather than the absolute time? And if the data points are spaced equally (which conforms to the most common definition of time series), then what purpose would any time-related information attached to a data point serve?

To understand this better, picture a time chart. Of anything - temperature, price of bitcoin over a week, whatever. Now think - does the absolute time of every point provide any useful information to you? Does the essential meaning of the chart change depending on whether it shows the price of bitcoin in the year 2016 or 2098 or 10923?

Doesn’t it seem like “time” in “time series” is a bit of a red herring?

Here is another example. Let’s say I decide to travel from San Francisco to New York, taking measurements of elevation above sea level every mile. I then plot that sequence on a chart where the x-axis is distance traveled and the y-axis is elevation. You would agree that this chart is not a “time series” by any stretch, right? But then if I renamed the x-axis to “time traveled” (let’s assume I moved at constant speed), the chart wouldn’t change at all, but now it’s okay to call it “time series”?

So it’s no surprise that there is no formal definition of “time series”. In the end a “time series” is just a sequence. There are no timestamps required, and there is nothing at all special about a dimension being time as opposed to any other unit, which is why there is no mathematical definition of “time series”. Time series is a colloquial term, the etymological origins of which are not known to me, but it’s not a thing from a scientific perspective, I’m afraid.

Next time you hear “time series” just substitute it with “sequence” and see how much sense that makes. For example a “time series database” is a “sequence database”, i.e. database optimized for sequences. Aren’t all relational databases optimized for sequences?

Something to think about over the holidays…

Edit: Someone brought up the subject of unevenly-spaced time series. All series are evenly spaced given proper resolution. An unevenly-spaced time series with timestamps accurate to 1 millisecond is a sparse evenly-spaced series with a 1 millisecond resolution.

Simple Time Series App With Tgres


Did you know that you can use Tgres components in your code without PostgreSQL and, in just a dozen lines of code, instrument your program with a time series? This example shows a complete server emulating the Graphite API which you can use with Grafana (or any other tool).

In this example we will be using three Tgres packages like so (in addition to a few standard ones, I’m skipping them here for brevity - complete source code gist):

import (
    "github.com/tgres/tgres/dsl"
    h "github.com/tgres/tgres/http"
    "github.com/tgres/tgres/rrd"
)

First we need a Data Source. This will create a Data Source containing one Round Robin Archive with a 10 second resolution spanning 1000 seconds.

step := 10 * time.Second
span := 100 * step

ds := rrd.NewDataSource(rrd.DSSpec{
    Step: 1 * time.Second,
    RRAs: []rrd.RRASpec{
        rrd.RRASpec{Step: step, Span: span},
    },
})

Let’s shove a bunch of data points into it. To make it look extra nice, we can make these points look like a sinusoid with this little function:

func sinTime(t time.Time, span time.Duration) float64 {
    x := 2 * math.Pi / span.Seconds() * float64(t.Unix()%(span.Nanoseconds()/1e9))
    return math.Sin(x)
}

And now for the actual population of the series:

start := time.Now().Add(-span)

for i := 0; i < int(span/step); i++ {
    t := start.Add(time.Duration(i) * step)
    ds.ProcessDataPoint(sinTime(t, span), t)
}

We will also need to create a NamedDSFetcher, the structure which knows how to search dot-separated series names a la Graphite.

db := dsl.NewNamedDSFetcherMap(map[string]rrd.DataSourcer{"foo.bar": ds})

Finally, we need to create two http handlers which will mimic a Graphite server and start listening for requests:

http.HandleFunc("/metrics/find", h.GraphiteMetricsFindHandler(db))
http.HandleFunc("/render", h.GraphiteRenderHandler(db))

listenSpec := ":8088"
fmt.Printf("Waiting for requests on %s\n", listenSpec)
http.ListenAndServe(listenSpec, nil)

Now if you point Grafana at it, it will happily think it’s Graphite and should show you a chart like this:

Note that you can use all kinds of Graphite functions at this point - it all “just works”.

Enjoy!

Storing Time Series in PostgreSQL (Continued)


Edit: there is now a part iii in this series of articles.

I have previously written how time series can be stored in PostgreSQL efficiently using arrays.

As a continuation of that article, I shall attempt to describe in detail the inner workings of an SQL view that Tgres uses to make an array of numbers appear as a regular table (link to code).

In short, I will explain how incomprehensible data like this:

=> select * from ts;
 rra_id | n |           dp
--------+---+------------------------
      1 | 0 | {64,67,70,71,72,69,67}
      1 | 1 | {65,60,58,59,62,68,70}
      1 | 2 | {71,72,77,70,71,73,75}
      1 | 3 | {79,82,90,69,75,80,81}

… can be transformed in an SQL view to appear as so:

=> select * from tv order by t;
 rra_id |           t            | r
--------+------------------------+----
      1 | 2008-03-06 00:00:00+00 | 64
      1 | 2008-03-07 00:00:00+00 | 67
      1 | 2008-03-08 00:00:00+00 | 70
      1 | 2008-03-09 00:00:00+00 | 71
...

This write up will make a lot more sense if you read the previous post first. To recap, Tgres stores series in an array broken up over multiple table rows each containing an array representing a segment of the series. The series array is a round-robin structure, which means that it occupies a fixed amount of space and we do not need to worry about expiring data points: the round-robin nature of the array takes care of it by overwriting old data with new on assignment.

An additional benefit of such a fixed interval round-robin structure is that we do not need to store timestamps for every data point. If we know the timestamp of the latest entry along with the series step and size, we can extrapolate the timestamp of any point in the series.

Tgres creates an SQL view which takes care of this extrapolation and makes this data easy to query. Tgres actually uses this view as its only source of time series information when reading from the database thus delegating all the processing to the database server, where it is close to the data and most efficient.

If you would like to follow along on the Postgres command line, feel free to create and populate the tables with the following SQL, which is nearly identical to the schema used by Tgres:

CREATE TABLE rra (
  id SERIAL NOT NULL PRIMARY KEY,
  step_s INT NOT NULL,
  steps_per_row INT NOT NULL,
  size INT NOT NULL,
  width INT NOT NULL,
  latest TIMESTAMPTZ DEFAULT NULL);

CREATE TABLE ts (
  rra_id INT NOT NULL,
  n INT NOT NULL,
  dp DOUBLE PRECISION[] NOT NULL DEFAULT '{}');

INSERT INTO rra VALUES (1, 60, 1440, 28, 7, '2008-04-02 00:00:00-00');

INSERT INTO ts VALUES (1, 0, '{64,67,70,71,72,69,67}');
INSERT INTO ts VALUES (1, 1, '{65,60,58,59,62,68,70}');
INSERT INTO ts VALUES (1, 2, '{71,72,77,70,71,73,75}');
INSERT INTO ts VALUES (1, 3, '{79,82,90,69,75,80,81}');

And finally create the view:

CREATE VIEW tv AS
  SELECT rra.id rra_id,
         latest - INTERVAL '1 SECOND' * rra.step_s * rra.steps_per_row *
           MOD(rra.size + MOD(EXTRACT(EPOCH FROM rra.latest)::BIGINT/(rra.step_s * rra.steps_per_row), size) + 1
           - (generate_subscripts(dp,1) + n * width), rra.size) AS t,
         UNNEST(dp) AS r
    FROM rra
   INNER JOIN ts ts ON ts.rra_id = rra.id;

Now give it a whirl with a SELECT * FROM tv ORDER BY t. Impressive? So how does it work?

First let’s go over the columns of the rra table.

  • step_s: the minimal unit of time expressed in seconds (60 or 1 minute in the above data).
  • steps_per_row: the number of the step_s intervals in one slot of our time series. In our example it is 1440, which is the number of minutes in a day, thus making our time series resolution one day.
  • size: number of slots in the series. Ours is 28, i.e. four weeks.
  • width: size of a segment which will be stored in a single row, which in our case is 7 (one week).
  • latest: the timestamp of the last data point in the series.

Next, let’s look at the UNNEST keyword in the SQL of the view. UNNEST takes an array and turns it into rows, e.g.:

=> SELECT UNNEST(dp) AS r FROM ts;
 r
----
 64
 67
...

UNNEST works in conjunction with the generate_subscripts PostgreSQL function which generates index values:

=> SELECT generate_subscripts(dp,1) AS i, UNNEST(dp) AS r FROM ts;
 i | r
---+----
 1 | 64
 2 | 67
...

Let us now zoom in on the very long expression in the view; here it is again:

latest - INTERVAL '1 SECOND' * rra.step_s * rra.steps_per_row *
  MOD(rra.size + MOD(EXTRACT(EPOCH FROM rra.latest)::BIGINT/(rra.step_s * rra.steps_per_row), size) + 1
  - (generate_subscripts(dp,1) + n * width), rra.size) AS t

A perhaps not immediately apparent trick to how all this works is that all our series are aligned on the beginning of the epoch. This means that at UNIX time 0, any series’ slot index is 0. From then on it increments sequentially until the series size is reached, at which point it wraps around to 0 (thus “round-robin”). Armed with this information we can calculate the index for any point in time.

The formula for calculating the index i for a given time t is:

i = t/step % size.

We need time to be expressed as UNIX time, which is done with EXTRACT(EPOCH FROM rra.latest)::BIGINT. Now you should recognize the above formula in the more verbose expression

MOD(EXTRACT(EPOCH FROM rra.latest)::BIGINT/(rra.step_s * rra.steps_per_row), size)

where rra.step_s * rra.steps_per_row is the length of one slot of our series in seconds (86400, i.e. one day, in our example).
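
For our sample data (step_s = 60 and steps_per_row = 1440, and latest of '2008-04-02 00:00:00-00') this evaluates to slot 27:

=> SELECT MOD(EXTRACT(EPOCH FROM TIMESTAMPTZ '2008-04-02 00:00:00-00')::BIGINT / (60 * 1440), 28);
 mod
-----
  27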

Next, we need to compute the distance between the current slot and the last slot (for which we know the timestamp). I.e. if the last slot is i and the slot we need the timestamp for is j, the distance between them is i-j, but with a caveat: it is possible for j to be greater than i if the series wraps around, in which case the distance is the sum of the distance from j to the end of the series and the distance from the beginning to i. If you ponder over it with a pencil and paper long enough, you will arrive at the following formula for distance between two slots i and j in a wrap-around array:

distance = (size + i - j) % size

Another thing to consider is that we’re splitting our series across multiple rows, thus the actual index of any point is the subscript into the current segment plus the index of the segment itself (the n column) multiplied by the width of the segment: generate_subscripts(dp,1) + n * width.

Which pieced together in SQL now looks like this:

MOD(rra.size + MOD(EXTRACT(EPOCH FROM rra.latest)::BIGINT/(rra.step_s * rra.steps_per_row), size) + 1
  - (generate_subscripts(dp,1) + n * width), rra.size)

Astute readers should notice an unexplained + 1. This is because PostgreSQL arrays are 1-based.
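
To convince ourselves that all the pieces fit together, we can evaluate the whole expression by hand for the very first data point in our sample data (row n = 0, array subscript 1, value 64):

=> SELECT TIMESTAMPTZ '2008-04-02 00:00:00-00' - INTERVAL '1 SECOND' * 60 * 1440 *
     MOD(28 + MOD(EXTRACT(EPOCH FROM TIMESTAMPTZ '2008-04-02 00:00:00-00')::BIGINT / (60 * 1440), 28) + 1
         - (1 + 0 * 7), 28);
        ?column?
------------------------
 2008-03-06 00:00:00+00

which is exactly the first row of the tv view.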

Now we need to convert the distance expressed in array slots into a time interval, which we do by multiplying it by INTERVAL '1 SECOND' * rra.step_s * rra.steps_per_row.

And finally, we need to subtract the above time interval from the latest timestamp, which yields (ta-da!) the timestamp of the current slot:

latest - INTERVAL '1 SECOND' * rra.step_s * rra.steps_per_row *
  MOD(rra.size + MOD(EXTRACT(EPOCH FROM rra.latest)::BIGINT/(rra.step_s * rra.steps_per_row), size) + 1
  - (generate_subscripts(dp,1) + n * width), rra.size) AS t

That’s it! And even though this may look complicated, from the computational viewpoint it is very efficient, and PostgreSQL can handle it easily.

As an exercise, try setting latest to various timestamps and observe how it affects the output of the view and see if you can explain how and why it happens.

Parsing Table Names From SQL


Sometimes it is useful to extract table names from an SQL statement, for example if you are trying to figure out dependencies for your Hive or BigQuery (or whatever) tables.

It is actually a lot simpler than it seems and you don’t need to write your own SQL parser or find one out there. In SQL, table names always follow the FROM and JOIN keywords. So all you have to do is split the statement into tokens, then scan the list for any mention of FROM or JOIN and grab the next token.

Here is a very simplistic Python function that does this using regular expressions:

import re

def tables_in_query(sql_str):

    # remove the /* */ comments
    q = re.sub(r"/\*[^*]*\*+(?:[^*/][^*]*\*+)*/", "", sql_str)

    # remove whole line -- and # comments
    lines = [line for line in q.splitlines() if not re.match(r"^\s*(--|#)", line)]

    # remove trailing -- and # comments
    q = " ".join([re.split("--|#", line)[0] for line in lines])

    # split on blanks, parens and semicolons
    tokens = re.split(r"[\s)(;]+", q)

    # scan the tokens. if we see a FROM or JOIN, we set the get_next
    # flag, and grab the next one (unless it's SELECT).

    result = set()
    get_next = False
    for tok in tokens:
        if get_next:
            if tok.lower() not in ["", "select"]:
                result.add(tok)
            get_next = False
        get_next = tok.lower() in ["from", "join"]

    return result

This is obviously not perfect, for example in BigQuery there is a possibility that what follows SELECT is a UDF name, but I’ll leave working around that as an exercise for the reader.

Load Testing Tgres


Edit: There is an update to this story.

So I finally got around to some load testing of Tgres. Load testing is mysterious: it never goes the way you think it would, and what you learn is completely unexpected.

Given that I presently don’t have any spare big iron at my disposal and my “servers” are my macbook and an old thinkpad, all I was really after was making sure that Tgres is “good enough”, whatever that means. And I think it is.

I was hoping to gather some concrete numbers and maybe even make a chart or two, but in the end it all turned out to be so tedious and time consuming, running the tests with various settings for hours on end, that I just gave up for now - after all, “premature optimization is the root of all evil”.

I also wanted to see how it stacks up against Graphite carbon-cache.py. As in, is it on par, or much better, or much worse? My expectation was that Graphite would outperform it, because what it does is so much simpler (and I was right). The first thing I tried to do was overwhelm Graphite. I never succeeded in that - I probably could have tried harder, but I quickly learned that I didn’t know what symptoms I was looking for. I wrote a Go program that blasted UDP data points at 10K/sec across 10K different series, and taking it to over 20K/sec saturated my network before Graphite showed any signs of deterioration. There was also no reliable way for me to audit the data points - maybe some of them got lost, but at 600K+ per minute, I don’t know of any practical way of checking. Not without a lot of work, at least.

With Tgres things were much easier. The weakest link is, not surprisingly, PostgreSQL. What I learned was that there are two kinds of deterioration when it comes to PostgreSQL, though. The first one is outright failure, which manifests in database requests getting progressively slower until Tgres gets stuck with all its channels full.

You can make PostgreSQL significantly faster with a few simple tricks. For example, the following settings can make a big difference:

synchronous_commit = off
commit_delay = 100000

This post isn’t about PostgreSQL, so I’m not going to get into the details of what this does; there is plenty of documentation and there are plenty of blog posts on the subject. If you plan on hosting a busy Tgres setup, you should probably use the above settings.

The second way PostgreSQL deteriorates is not immediately apparent - it is the infamous table bloat. Getting autovacuum to keep up with the ts table (which stores all the time series) is tricky, and once you’ve run out of options to tweak, this is probably it - the maximum load the database can handle, even if it may seem relatively calm.

Autovacuum has a lot of knobs, but ultimately they all exist to take advantage of the variability of load in a database, i.e. you can let it get behind during the day and let it catch up at night when the database is not as busy. That doesn’t really work with time series, which are not variable by nature - if you’re receiving 5 thousand data points per second at noon, you can expect the same rate at 4am. The settings that worked best for me were:

autovacuum_max_workers = 10
autovacuum_naptime = 1s
autovacuum_vacuum_threshold = 2000
autovacuum_vacuum_scale_factor = 0.0
autovacuum_vacuum_cost_delay = 0 # disable cost based

To the best of my understanding, the above settings disable cost-based autovacuum (meaning it doesn’t pause periodically to yield resources to normal db tasks), make autovacuum kick in after 2K updates (which happens in no time), and have it sleep 1s in between runs, which means it’s running pretty much continuously.

I was able to sustain a load of ~6K data points per second across 6K series - anything higher caused autovacuum on my “database server” (which is a 2010 i7 Thinkpad) to get behind.

I also did some testing of how TOAST affects performance. There is no setting for turning TOAST on or off, but it can easily be done in Tgres by changing the number of data points per row. The default is 768, which is about 75% of a page. If you, for example, double it, then each row becomes larger than a page and TOAST kicks in. TOAST is compressed, which is an advantage, but it is a separate table, which is a disadvantage. In the end it seemed like the database deteriorated quicker with TOAST, but the results were rather inconclusive.

In the end the key factor, or the weakest link, was the rate of queries per second. I have now added a special rate-limiting feature to Tgres (max-flushes-per-second) which trumps all other settings and will keep your database happy, at the expense of Tgres possibly caching a few more points in memory than expected.

I will probably get back to some more load testing in a while, but for now this is it.