Grisha Trubetskoy

Notes to self.

Tgres Load Testing Follow Up


To follow up on the previous post, after a bunch of tweaking, here is Tgres (commit) receiving over 150,000 data points per second across 500,000 time series without any signs of the queue size or any other resource blowing up.

This is both Tgres and Postgres running on the same i2.2xlarge EC2 instance (8 cores, 64GB, SSD).

At this point I think there’s been enough load testing and optimization, and I am going to get back to crossing the t’s and dotting the i’s so that we can release the first version of Tgres.

PostgreSQL vs Whisper, Which Is Faster?


Note: there is an update to this post.

TL;DR

On an 8 CPU / 16 GB EC2 instance, Tgres can process 150,000 data points per second across 300,000 series (with Postgres running on the same machine). With some tweaks we were able to get the number of series to half a million, flushing ~60K data points per second.

Now the long version…

If you were to ask me whether Tgres could outperform Graphite, just a couple of months ago my answer would have been “No”. Tgres uses Postgres to store time series data, while Graphite stores data by writing to files directly, and the overhead of the relational database just seemed too great.

Well, I think I’ve managed to prove myself wrong. After re-working Tgres to use the write-optimized layout, I’ve run some tests on AWS yielding unexpectedly promising results.

As a benchmark I targeted the excellent blog post by Jason Dixon describing his AWS Graphite test. My goal was to get to at least half the level of performance described therein. But it appears the combination of Go, Postgres and some clever data structuring has been able to beat it, not without breaking a little sweat, but it has.

My test was conducted on a c4.2xlarge instance, which has 8 cores and 16 GB, using 100GB EBS (which, if I understood it correctly, comes with 300 IOPS, please comment if I’m wrong). The “c4” instances are supposed to have some of the fastest CPUs AWS has to offer, but compared with the instance used in the Graphite test, an i2.4xlarge (16 CPU / 122GB), it had half the CPU cores and nearly one tenth of the RAM.

Before I go any further, here is the obligatory screenshot, then my observations and lessons learned in the process, as well as a screenshot depicting even better performance.

The Tgres version running was this one, with the config detailed at the bottom of the post.

Postgres was whatever yum install postgresql95-server brings your way, with the data directory moved to the EBS volume formatted using ext4 (not that I think it matters). The Postgres config was modified to allow a 100ms commit delay and to make autovacuum extra aggressive. I did not increase any memory buffers and left everything else as is. Specifically, these were the changes:

autovacuum_work_mem = -1
synchronous_commit = off
commit_delay = 100000
autovacuum_max_workers = 10
autovacuum_naptime = 1s
autovacuum_vacuum_threshold = 2000
autovacuum_vacuum_scale_factor = 0.0
autovacuum_vacuum_cost_delay = 0

The data points for the test were generated by a goroutine in the Tgres process itself. In the past I’ve found that blasting a server with this many UDP packets can be tricky and hardware/network intensive. It’s also hard to tell when/if they get dropped and why, etc. Since Go is not known for having problems in its network stack, I was not too worried about skipping the network. I just wanted a reliable and configurable source of incoming packets, and in the Go world writing a simple goroutine seemed like the right answer.

Somewhat Random Notes and Making Tgres Even Faster

Determining failure

Determining when we are “at capacity” is tricky. I’ve mostly looked at two factors (aside from the obvious - running out of memory/disk, becoming unresponsive, etc): receiver queue size and Postgres table bloat.

Queue size

Tgres uses “elastic channels” (so eloquently described here by Nick Patavalis) for incoming data points and to load series from Postgres. These are channel-like structures that can grow to arbitrary length only limited by the memory available. This is done so as to be able to take maximum advantage of the hardware at hand. If any of those queues starts growing out of control, we are failing. You can see in the picture that at about 140K data points per second the receiver queue started growing, though it did stay steady at this size and never spun out of control (the actual test was left overnight at this rate just to make sure).
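To make the idea concrete, here is a minimal sketch of an elastic channel in Go. It is not the Tgres implementation (nor the one from the linked article); the name elastic and the float64 element type are assumptions made for illustration. A goroutine sits between an input and an output channel and keeps the backlog in a slice that grows as needed.

```go
package main

import "fmt"

// elastic returns a channel that delivers everything sent on in, in order,
// buffering any backlog in a slice limited only by available memory.
// (Hypothetical helper, not part of Tgres.)
func elastic(in <-chan float64) <-chan float64 {
	out := make(chan float64)
	go func() {
		defer close(out)
		var queue []float64
		for in != nil || len(queue) > 0 {
			// A nil channel is never ready, so the send case is only
			// enabled while something is queued.
			var send chan float64
			var head float64
			if len(queue) > 0 {
				send, head = out, queue[0]
			}
			select {
			case v, ok := <-in:
				if !ok {
					in = nil // input closed: drain the queue, then exit
					continue
				}
				queue = append(queue, v)
			case send <- head:
				queue = queue[1:]
			}
		}
	}()
	return out
}

func main() {
	in := make(chan float64)
	out := elastic(in)
	for i := 0; i < 1000; i++ {
		in <- float64(i) // accepted even though nobody is reading yet
	}
	close(in)
	n := 0
	for range out {
		n++
	}
	fmt.Println("received", n)
}
```

The length of the internal slice is the "queue size" being watched in the charts: if it keeps growing, the consumer is not keeping up.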

PG Table Bloat

Table bloat is a phenomenon affecting Postgres in write-intensive situations because of its use of MVCC (multiversion concurrency control). It basically means that pages on disk are being updated faster than the autovacuum process can keep up with them and the table starts growing out of control.

To monitor for table bloat, I used a simple formula which determined the approximate size of the table based on the row count (our data is all floats, which makes it very predictable) and compared it with the actual size. If the actual size exceeded the estimated size, that’s considered bloat. Bloat is reported in the “TS Table Size” chart. A little bloat is fine, and you can see that it stayed at a fairly low percentage throughout the test.
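The exact formula is not given here, so the following Go sketch only illustrates the kind of estimate described above. The per-row overhead constants are rough assumptions, and the function names are hypothetical; the actual size would come from something like pg_relation_size() on the table.

```go
package main

import "fmt"

// Rough per-row costs in bytes. These are assumptions for illustration,
// not the constants used by Tgres.
const (
	tupleOverhead = 28 // tuple header plus line pointer
	arrayOverhead = 24 // header of a one-dimensional array
	floatSize     = 8  // DOUBLE PRECISION
)

// estimatedBytes approximates the heap size of `rows` rows, each holding
// `width` floats, ignoring other columns, page headers and fill factor.
func estimatedBytes(rows, width int64) int64 {
	return rows * (tupleOverhead + arrayOverhead + width*floatSize)
}

// bloatPercent reports by how much actual exceeds estimated, in percent.
// It returns 0 when the table is no larger than the estimate.
func bloatPercent(actual, estimated int64) float64 {
	if estimated <= 0 || actual <= estimated {
		return 0
	}
	return 100 * float64(actual-estimated) / float64(estimated)
}

func main() {
	est := estimatedBytes(1000000, 200)
	actual := est + est/20 // pretend the table is 5% larger than expected
	fmt.Printf("estimated %d bytes, bloat %.1f%%\n", est, bloatPercent(actual, est))
}
```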

In the end, though more research is warranted, it may just turn out that contrary to every expectation PostgreSQL was not the limiting factor here. The postmaster processes stayed below 170MB RSS, which is absolutely remarkable, and Grafana refreshes were very quick even at peak loads.

Memory consumption

Tgres has a slight limitation in that creating a series is expensive. It needs to check with Postgres and, for reasons I don’t want to bore you with, it’s always a SELECT, optionally followed by an “UPSERT”. This takes time, and during the ramp-up period when the number of series is growing fast and lots of them need to be created, the Go runtime ends up consuming a lot of memory. You can see that the screenshot reports 4.69GB. When I restarted Tgres (which causes all existing DS names to be pre-cached), its memory footprint stayed at about 1.7GB. More work needs to be done to figure out what accounts for the difference.

Data Point Rate and Number of Series

The rate of data points that need to be saved to disk is a function of the number of series and the resolution of the RRAs. To illustrate, if I have one series at 1 point per second, even if I blast a million data points per second, still only 1 data point per second needs to be saved.
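As a back-of-the-envelope sketch of that relationship (an assumption-laden estimate, not a Tgres function): if every series receives at least one point per step, each RRA contributes one write per series per step, so the steady-state write rate is the number of series divided by the step, summed over the RRAs.

```go
package main

import "fmt"

// flushRate returns the steady-state number of data points per second that
// must be written for nSeries series, each having one RRA per given step
// (steps are in seconds). Hypothetical helper for illustration.
func flushRate(nSeries int, stepSeconds ...float64) float64 {
	rate := 0.0
	for _, step := range stepSeconds {
		rate += float64(nSeries) / step
	}
	return rate
}

func main() {
	// RRA resolutions from the tgres.conf in this post: 10s, 1m, 1h.
	fmt.Printf("%.0f points/s\n", flushRate(500000, 10, 60, 3600))
}
```

For 500K series this comes to roughly 58K points per second, which is in the same ballpark as the ~60K quoted in the TL;DR.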

There is an important difference between Graphite and Tgres in that Tgres actually adjusts the final value by considering every data point value using a weighted mean, while Graphite just ignores all points but the last. So Tgres does a bit more work, which adds up quickly at 6-figure rates per second.
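The difference can be illustrated with a small Go sketch. It assumes the weights are the durations each value covers within a slot, which is one natural reading of "weighted mean" but is not taken from the Tgres source; the wmean type is hypothetical.

```go
package main

import "fmt"

// wmean accumulates a time-weighted mean for one slot.
type wmean struct {
	sum      float64 // sum of value * seconds
	duration float64 // total seconds covered so far
}

// add records that value held for the given number of seconds.
func (w *wmean) add(value, seconds float64) {
	w.sum += value * seconds
	w.duration += seconds
}

// mean returns the weighted mean, or 0 if nothing was added.
func (w *wmean) mean() float64 {
	if w.duration == 0 {
		return 0
	}
	return w.sum / w.duration
}

func main() {
	var w wmean
	last := 0.0
	// Two points land in the same slot: 1 covering 2s, then 4 covering 1s.
	for _, p := range []struct{ v, secs float64 }{{1, 2}, {4, 1}} {
		w.add(p.v, p.secs)
		last = p.v // "last value wins" keeps only this
	}
	fmt.Println("weighted mean:", w.mean(), "last value:", last)
}
```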

The Graphite test, if I read the chart correctly, was able to process ~70K data points per second across 300K series. My test had 300K series and data points were coming in at over 150K/s. But just out of curiosity, I tried to push it to its limit.

At 400K series, you can see clear signs of deterioration. You can see how the vcache isn’t flushed fast enough, leaving gaps at the end of series. If we stop the data blast, it does eventually catch up, so long as there is memory for the cache.

If you don’t catch this condition in time, Tgres will die with:

fatal error: runtime: out of memory

runtime stack:
runtime.throw(0xa33e5a, 0x16)
        /home/grisha/.gvm/gos/go1.8/src/runtime/panic.go:596 +0x95
...

Segment Width

There is still one easy performance card we can play here. Segment width is how many data points are stored in one row; it is also the limit on how many points we can transfer in a single SQL operation. Segment width by default is 200, because a width higher than that causes rows to exceed a page and trigger TOAST. TOAST can be good or bad: it means data is stored in a separate table (not so good), but it also means it’s compressed, which may be an I/O win.

So what would happen if we set the segment width to 1000?

The picture changes significantly (see below). I was able to get the number of series to 500K. Note the whopping 52,602 data points being written to the database per second! You can see we’re pushing it to the limit because the receiver queue is beginning to grow. I really wanted to get the rate up to 150K/sec, but it just didn’t want to go there.

And what would happen if we set the segment width to 4096?

Interestingly, the memory footprint is a tad larger while the vcache is leaner. The number of data points flushed per second is about the same, though in fewer SQL statements. The overall picture is about the same, and the incoming queue still skyrockets at just about 100K/sec over 500K series.

Conclusion

There are plenty of places in the Tgres code that could still be optimized.

One issue that would be worth looking into is exposing Tgres to the firehose on an empty database. The current code runs out of memory in under a minute when suddenly exposed to 300K new series at 150K/s. Probably the simplest solution to this would be to somehow detect that we’re unable to keep up and start dropping data points. Eventually, when all the series are created and cached, performance should even out after the initial spike and all should be well.
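One simple way to drop data points under pressure, sketched here in Go, is a bounded queue with a non-blocking send. This is only an illustration of the idea; the function offer and the queue capacity are hypothetical, and Tgres does not necessarily do this.

```go
package main

import "fmt"

// offer tries to enqueue v without blocking and reports whether it was
// accepted. When the queue is full the point is dropped.
func offer(queue chan<- float64, v float64) bool {
	select {
	case queue <- v:
		return true
	default:
		return false
	}
}

func main() {
	queue := make(chan float64, 2) // deliberately tiny for the demo
	dropped := 0
	for i := 0; i < 5; i++ {
		if !offer(queue, float64(i)) {
			dropped++
		}
	}
	fmt.Println("queued", len(queue), "dropped", dropped)
}
```

Counting the drops, as the demo does, would also give a direct signal that the server is over capacity.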

In any event, it’s nice to be able to do something like this and know that it is performant as well:

tgres=> select t, r from ds
 join tv  on tv.ds_id = ds.id
where ident @> '{"name":"tgres.0_0_0_0.runtime.load.five"}'
  and tv.step_ms = 10000
order by t desc
limit 5;
           t            |       r
------------------------+----------------
 2017-02-23 22:31:50+00 | 1.256833462648
 2017-02-23 22:26:30+00 | 1.305209492142
 2017-02-23 22:24:10+00 | 1.554056287975
 2017-02-23 22:24:00+00 | 1.453365774931
 2017-02-23 22:23:50+00 | 1.380504724386
(5 rows)

Reference

For completeness’ sake, the instance was created using Terraform config approximately like this:

variable "aws_region" { default = "us-east-1" }
variable "aws_zone" { default = "us-east-1a" }
variable "key_name" { default = "REDACTED" }

provider "aws" {
  region = "${var.aws_region}"
}

resource "aws_ebs_volume" "ebs_volume" {
  availability_zone = "${var.aws_zone}"
  size = 100
}

resource "aws_volume_attachment" "ebs_att" {
  device_name = "/dev/sdh"
  volume_id = "${aws_ebs_volume.ebs_volume.id}"
  instance_id = "${aws_instance.tgres-test-tmp.id}"
}

resource "aws_instance" "tgres-test-tmp" {
  ami = "ami-0b33d91d"
  instance_type = "c4.2xlarge"
  subnet_id = "REDACTED"
  vpc_security_group_ids = [
    "REDACTED"
  ]
  associate_public_ip_address = true
  key_name = "${var.key_name}"
}

And then the following commands were used to prime everything:

sudo mke2fs /dev/sdh
sudo mkdir /ebs
sudo mount /dev/sdh /ebs

sudo yum install -y postgresql95-server
sudo service postgresql95 initdb
sudo mkdir /ebs/pg
sudo mv /var/lib/pgsql95/data /ebs/pg/data
sudo ln -s /ebs/pg/data /var/lib/pgsql95/data

sudo vi /var/lib/pgsql95/data/postgresql.conf
# BEGIN postgres config - paste this somewhere in the file
autovacuum_work_mem = -1
synchronous_commit = off
commit_delay = 100000
autovacuum_max_workers = 10
autovacuum_naptime = 1s
autovacuum_vacuum_threshold = 2000
autovacuum_vacuum_scale_factor = 0.0
autovacuum_vacuum_cost_delay = 0
# END postgres config

sudo service postgresql95 restart

# create PG database

sudo su - postgres
createuser -s ec2-user   # note -s is superuser - not necessary for tgres but just in case
createdb tgres
exit

# Tgres (requires Go - I used 1.8)
# (or you can just scp it from some machine where you already have go environment)
mkdir golang
export GOPATH=~/golang/
go get github.com/tgres/tgres
cd /home/ec2-user/golang/src/github.com/tgres/tgres
go build
cp etc/tgres.conf.sample etc/tgres.conf

The tgres.conf file looked like this:

min-step                = "10s"

pid-file =                 "tgres.pid"
log-file =                 "log/tgres.log"
log-cycle-interval =       "24h"

max-flushes-per-second      = 1000000 # NB - Deprecated setting
workers                     = 4       # NB - Deprecated setting

http-listen-spec            = "0.0.0.0:8888"
graphite-line-listen-spec   = "0.0.0.0:2003"
graphite-text-listen-spec   = "0.0.0.0:2003"
graphite-udp-listen-spec    = "0.0.0.0:2003"
graphite-pickle-listen-spec = "0.0.0.0:2004"

statsd-text-listen-spec     = "0.0.0.0:8125"
statsd-udp-listen-spec      = "0.0.0.0:8125"
stat-flush-interval         = "10s"
stats-name-prefix           = "stats"

db-connect-string = "host=/tmp dbname=tgres sslmode=disable"

[[ds]]
regexp = ".*"
step = "10s"
#heartbeat = "2h"
rras = ["10s:6h", "1m:7d", "1h:1y"]

Tgres was running with the following. The TGRES_BLASTER starts the blaster goroutine.

TGRES_BIND=0.0.0.0 TGRES_BLASTER=1 ./tgres

Once you have Tgres with the blaster running, you can control it via HTTP, e.g. the following would set it to 50K/s data points across 100K series. Setting rate to 0 pauses it.

curl -v "http://127.0.0.1:8888/blaster/set?rate=50000&n=100000"

Storing Time Series in PostgreSQL - Optimize for Write


Continuing on the previous write up on how time series data can be stored in Postgres efficiently, here is another approach, this time providing for extreme write performance.

The “horizontal” data structure in the last article requires an SQL statement for every data point update. If you cache data points long enough, you might be able to collect a bunch for a series and write them out at once for a slight performance advantage. But there is no way to update multiple series with a single statement, it’s always at least one update per series. With a large number of series, this can become a performance bottleneck. Can we do better?

One observation we can make about incoming time series data is that commonly the data points are roughly from the same time period, the current time, give or take. If we’re storing data at regularly-spaced intervals, then it is extremely likely that many if not all of the most current data points from various time series are going to belong to the exact same time slot. Considering this observation, what if we organized data points in rows of arrays, only now we would have a row per timestamp while the position within the array would determine the series?

Let’s create the tables:

CREATE TABLE rra_bundle (
  id SERIAL NOT NULL PRIMARY KEY,
  step_ms INT NOT NULL,
  steps_per_row INT NOT NULL,
  size INT NOT NULL,
  latest TIMESTAMPTZ DEFAULT NULL);

CREATE TABLE rra (
  id SERIAL NOT NULL PRIMARY KEY,
  ds_id INT NOT NULL,
  rra_bundle_id INT NOT NULL,
  pos INT NOT NULL);

CREATE TABLE ts (
  rra_bundle_id INT NOT NULL,
  i INT NOT NULL,
  dp DOUBLE PRECISION[] NOT NULL DEFAULT '{}');

Notice how the step and size now become properties of the bundle rather than the rra which now refers to a bundle. In the ts table, i is the index in the round-robin archive (which in the previous “horizontal” layout would be the array index).

The data we used before was a bunch of temperatures. Let’s add two more series, one where the temperature is 1 degree higher, and one where it’s 1 degree lower. (Not that it really matters.)

INSERT INTO rra_bundle VALUES (1, 60000, 1440, 28, '2008-04-02 00:00:00-00');

INSERT INTO rra VALUES (1, 1, 1, 1);
INSERT INTO rra VALUES (2, 2, 1, 2);
INSERT INTO rra VALUES (3, 3, 1, 3);

INSERT INTO ts VALUES (1, 0, '{64,65,63}');
INSERT INTO ts VALUES (1, 1, '{67,68,66}');
INSERT INTO ts VALUES (1, 2, '{70,71,69}');
INSERT INTO ts VALUES (1, 3, '{71,72,70}');
INSERT INTO ts VALUES (1, 4, '{72,73,71}');
INSERT INTO ts VALUES (1, 5, '{69,70,68}');
INSERT INTO ts VALUES (1, 6, '{67,68,66}');
INSERT INTO ts VALUES (1, 7, '{65,66,64}');
INSERT INTO ts VALUES (1, 8, '{60,61,59}');
INSERT INTO ts VALUES (1, 9, '{58,59,57}');
INSERT INTO ts VALUES (1, 10, '{59,60,58}');
INSERT INTO ts VALUES (1, 11, '{62,63,61}');
INSERT INTO ts VALUES (1, 12, '{68,69,67}');
INSERT INTO ts VALUES (1, 13, '{70,71,69}');
INSERT INTO ts VALUES (1, 14, '{71,72,70}');
INSERT INTO ts VALUES (1, 15, '{72,73,71}');
INSERT INTO ts VALUES (1, 16, '{77,78,76}');
INSERT INTO ts VALUES (1, 17, '{70,71,69}');
INSERT INTO ts VALUES (1, 18, '{71,72,70}');
INSERT INTO ts VALUES (1, 19, '{73,74,72}');
INSERT INTO ts VALUES (1, 20, '{75,76,74}');
INSERT INTO ts VALUES (1, 21, '{79,80,78}');
INSERT INTO ts VALUES (1, 22, '{82,83,81}');
INSERT INTO ts VALUES (1, 23, '{90,91,89}');
INSERT INTO ts VALUES (1, 24, '{69,70,68}');
INSERT INTO ts VALUES (1, 25, '{75,76,74}');
INSERT INTO ts VALUES (1, 26, '{80,81,79}');
INSERT INTO ts VALUES (1, 27, '{81,82,80}');

Notice that every INSERT adds data for all three of our series in a single database operation!
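From the application side, writing one time slot for a whole bundle amounts to laying the latest values out by position and sending them as one array. The Go sketch below builds the same statements as above as strings, purely for illustration; the helper names are hypothetical, and real code would pass the array as a query parameter through a database driver rather than formatting SQL by hand.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// arrayLiteral renders values as a Postgres array literal such as {64,65,63}.
// The element at index p-1 belongs to the series whose rra.pos is p.
func arrayLiteral(values []float64) string {
	parts := make([]string, len(values))
	for i, v := range values {
		parts[i] = strconv.FormatFloat(v, 'g', -1, 64)
	}
	return "{" + strings.Join(parts, ",") + "}"
}

// insertRow builds the statement for one time slot of one bundle.
func insertRow(bundleID, slot int, values []float64) string {
	return fmt.Sprintf("INSERT INTO ts VALUES (%d, %d, '%s');",
		bundleID, slot, arrayLiteral(values))
}

func main() {
	// One statement carries the current value of all three series.
	fmt.Println(insertRow(1, 0, []float64{64, 65, 63}))
}
```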

Finally, let us create the view. (How it works is described in detail in the previous article)

CREATE VIEW tv AS
  SELECT rra.id as rra_id,
     rra_bundle.latest - INTERVAL '1 MILLISECOND' * rra_bundle.step_ms * rra_bundle.steps_per_row *
       MOD(rra_bundle.size + MOD(EXTRACT(EPOCH FROM rra_bundle.latest)::BIGINT*1000/(rra_bundle.step_ms * rra_bundle.steps_per_row),
       rra_bundle.size) - i, rra_bundle.size) AS t,
     dp[pos] AS r
  FROM rra AS rra
  JOIN rra_bundle AS rra_bundle ON rra_bundle.id = rra.rra_bundle_id
  JOIN ts AS ts ON ts.rra_bundle_id = rra_bundle.id;

And now let’s verify that it works:

=> select * from tv where rra_id = 1 order by t;
 rra_id |           t            | r
--------+------------------------+----
      1 | 2008-03-06 00:00:00-00 | 64
      1 | 2008-03-07 00:00:00-00 | 67
      1 | 2008-03-08 00:00:00-00 | 70
...

This approach makes writes blazingly fast though it does have its drawbacks. For example there is no way to read a single series - even though the view selects a single array element, under the hood Postgres reads the whole row. Given that time series is more write intensive and rarely read, this may not be a bad compromise.

Simple Tgres Part II - a High Rate Counter


Continuing on the previous post on simple use of Tgres components, let’s try to count something that goes by really fast.

This time let’s start out with creating a memory-based SerDe. This means that all our data is in memory and there is no database backing our series.

package main

import (
    "fmt"
    "net/http"
    "time"

    "github.com/tgres/tgres/dsl"
    h "github.com/tgres/tgres/http"
    "github.com/tgres/tgres/receiver"
    "github.com/tgres/tgres/rrd"
    "github.com/tgres/tgres/serde"
)

func main() {

    step := 1 * time.Second // 1 second resolution
    span := 600 * step      // spanning 10 minutes

    // In-memory SerDe
    ms := serde.NewMemSerDe()

    // Create a receiver of our data points backed by the above
    // memory SerDe
    rcvr := receiver.New(ms, &receiver.SimpleDSFinder{&rrd.DSSpec{
        Step: step,
        RRAs: []rrd.RRASpec{
            rrd.RRASpec{Function: rrd.WMEAN,
                Step: step,
                Span: span,
            },
        }}})
    rcvr.Start()

Now let’s create a goroutine which creates data points as fast as it can. The difference from the previous blog post is that we are using QueueGauge(), which is a paced metric, meaning that it flushes to the time series only periodically (once per second by default) so as not to overwhelm the I/O and/or network (even though in this case it doesn’t really matter, since we’re using a memory-based SerDe anyway).
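To illustrate what "paced" means, here is a minimal Go sketch of a gauge that absorbs updates in memory and only passes the most recent value along when flushed. It is not how QueueGauge() is implemented; the pacedGauge type and its methods are hypothetical, and in a real program flush would be driven by a ticker once per second.

```go
package main

import (
	"fmt"
	"sync"
)

// pacedGauge remembers only the most recent value between flushes.
type pacedGauge struct {
	mu    sync.Mutex
	last  float64
	dirty bool
}

// set records a new value; it is cheap and never touches the time series.
func (g *pacedGauge) set(v float64) {
	g.mu.Lock()
	g.last, g.dirty = v, true
	g.mu.Unlock()
}

// flush passes the latest value to write, if anything changed since the
// previous flush.
func (g *pacedGauge) flush(write func(float64)) {
	g.mu.Lock()
	v, dirty := g.last, g.dirty
	g.dirty = false
	g.mu.Unlock()
	if dirty {
		write(v)
	}
}

func main() {
	var g pacedGauge
	writes := 0
	for i := 0; i < 1000000; i++ {
		g.set(float64(i)) // a million updates...
	}
	g.flush(func(v float64) { writes++ }) // ...but a single write
	fmt.Println("writes:", writes)
}
```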

    start := time.Now()
    end := start.Add(span)

    go func() {
        n := 0
        for t := time.Now(); t.Before(end); t = time.Now() {
            rcvr.QueueGauge(serde.Ident{"name":"foo.bar"}, float64(n)/(t.Sub(start)).Seconds())
            n++
        }
    }()

And finally, as before, we need to hook up a couple of http handlers:

    db := dsl.NewNamedDSFetcher(ms.Fetcher())

    http.HandleFunc("/metrics/find", h.GraphiteMetricsFindHandler(db))
    http.HandleFunc("/render", h.GraphiteRenderHandler(db))

    listenSpec := ":8088"
    fmt.Printf("Waiting for requests on %s\n", listenSpec)
    http.ListenAndServe(listenSpec, nil)

} // end of main()

Now if we run the above code with something like go run simpletgres.go, we’ll notice that unlike with the previous example, the web server starts right away, and the data points are being written while the server is running. If we aim Grafana at it, we should be able to see the chart update in real time.

After a couple of minutes, mine looks like this:

So my macbook can crank these out at about 2.5 million per second.

In my experience instrumenting my apps with simple counters like this and having them available directly from the app without having to send them to a separate statsd server somewhere has been extremely useful in helping understand performance and other issues.

Why Is There No Formal Definition of Time Series?


If you’re reading this, chances are you may have searched for a definition of “Time Series”. And, like me, you were probably disappointed by what you’ve found.

The most popular “definition” I come across amongst our fellow programmer folk is that it’s “data points with timestamps”. Or something like that. And you can make charts from it. And that’s about it, alas.

The word time suggests that it has something to do with time. At first it seems reasonable, I bite. The word series is a little more peculiar. A mathematician would argue that a series is a sum of a sequence. Most people though think “series” and “sequence” are the same thing, and that’s fine. But it’s a clue that time series is not a scientific term, because it would most likely have been called time sequence.

Let’s get back to the time aspect of it. Why do data points need timestamps? Or do they? Isn’t it the time interval between points that is most essential, rather than the absolute time? And if the data points are spaced equally (which conforms to the most common definition of time series), then what purpose would any time-related information attached to a data point serve?

To understand this better, picture a time chart. Of anything - temperature, price of bitcoin over a week, whatever. Now think - does the absolute time of every point provide any useful information to you? Does the essential meaning of the chart change depending on whether it shows the price of bitcoin in the year 2016 or 2098 or 10923?

Doesn’t it seem like “time” in “time series” is a bit of a red herring?

Here is another example. Let’s say I decide to travel from San Francisco to New York taking measurements of elevation above sea level at every mile. I then plot that sequence on a chart where the x-axis is distance traveled and the y-axis is elevation. You would agree that this chart is not a “time series” by any stretch, right? But then if I renamed the x-axis to “time traveled” (let’s assume I moved at constant speed), the chart wouldn’t change at all, but now it’s okay to call it “time series”?

So it’s no surprise that there is no formal definition of “time series”. In the end a “time series” is just a sequence. There are no timestamps required and there is nothing at all special regarding a dimension being time as opposed to any other unit, which is why there is no mathematical definition of “time series”. Time series is a colloquial term, the etymological origins of which are not known to me, but it’s not a thing from a scientific perspective, I’m afraid.

Next time you hear “time series” just substitute it with “sequence” and see how much sense that makes. For example a “time series database” is a “sequence database”, i.e. a database optimized for sequences. Aren’t all relational databases optimized for sequences?

Something to think about over the holidays…

Edit: Someone brought up the subject of unevenly-spaced time series. All series are evenly spaced given proper resolution. An unevenly-spaced time series with timestamps accurate to 1 millisecond is a sparse evenly-spaced series with a 1 millisecond resolution.

Simple Time Series App With Tgres


Did you know you can use Tgres components in your code without PostgreSQL, and in just a dozen lines of code instrument your program with a time series? This example shows a complete server emulating the Graphite API which you can use with Grafana (or any other tool).

In this example we will be using three Tgres packages like so (in addition to a few standard ones, I’m skipping them here for brevity - complete source code gist):

import (
    "github.com/tgres/tgres/dsl"
    h "github.com/tgres/tgres/http"
    "github.com/tgres/tgres/rrd"
)

First we need a Data Source. This will create a Data Source containing one Round Robin Archive with a 10 second resolution spanning 1000 seconds.

step := 10 * time.Second
span := 100 * step

ds := rrd.NewDataSource(rrd.DSSpec{
    Step: 1 * time.Second,
    RRAs: []rrd.RRASpec{
        rrd.RRASpec{Step: step, Span: span},
    },
})

Let’s shove a bunch of data points into it. To make it look extra nice, we can make these points look like a sinusoid with this little function:

func sinTime(t time.Time, span time.Duration) float64 {
    x := 2 * math.Pi / span.Seconds() * float64(t.Unix()%(span.Nanoseconds()/1e9))
    return math.Sin(x)
}

And now for the actual population of the series:

start := time.Now().Add(-span)

for i := 0; i < int(span/step); i++ {
    t := start.Add(time.Duration(i) * step)
    ds.ProcessDataPoint(sinTime(t, span), t)
}

We will also need to create a NamedDSFetcher, the structure which knows how to search dot-separated series names a la Graphite.

db := dsl.NewNamedDSFetcherMap(map[string]rrd.DataSourcer{"foo.bar": ds})

Finally, we need to create two http handlers which will mimic a Graphite server and start listening for requests:

http.HandleFunc("/metrics/find", h.GraphiteMetricsFindHandler(db))
http.HandleFunc("/render", h.GraphiteRenderHandler(db))

listenSpec := ":8088"
fmt.Printf("Waiting for requests on %s\n", listenSpec)
http.ListenAndServe(listenSpec, nil)

Now if you point Grafana at it, it will happily think it’s Graphite and should show you a chart like this:

Note that you can use all kinds of Graphite functions at this point - it all “just works”.

Enjoy!

Storing Time Series in PostgreSQL (Continued)


Edit: there is now a part iii in this series of articles.

I have previously written how time series can be stored in PostgreSQL efficiently using arrays.

As a continuation of that article, I shall attempt to describe in detail the inner workings of an SQL view that Tgres uses to make an array of numbers appear as a regular table (link to code).

In short, I will explain how incomprehensible data like this:

=> select * from ts;
 rra_id | n |           dp
--------+---+------------------------
      1 | 0 | {64,67,70,71,72,69,67}
      1 | 1 | {65,60,58,59,62,68,70}
      1 | 2 | {71,72,77,70,71,73,75}
      1 | 3 | {79,82,90,69,75,80,81}

… can be transformed in an SQL view to appear as so:

=> select * from tv order by t;
 rra_id |           t            | r
--------+------------------------+----
      1 | 2008-03-06 00:00:00+00 | 64
      1 | 2008-03-07 00:00:00+00 | 67
      1 | 2008-03-08 00:00:00+00 | 70
      1 | 2008-03-09 00:00:00+00 | 71
...

This write up will make a lot more sense if you read the previous post first. To recap, Tgres stores series in an array broken up over multiple table rows each containing an array representing a segment of the series. The series array is a round-robin structure, which means that it occupies a fixed amount of space and we do not need to worry about expiring data points: the round-robin nature of the array takes care of it by overwriting old data with new on assignment.

An additional benefit of such a fixed interval round-robin structure is that we do not need to store timestamps for every data point. If we know the timestamp of the latest entry along with the series step and size, we can extrapolate the timestamp of any point in the series.

Tgres creates an SQL view which takes care of this extrapolation and makes this data easy to query. Tgres actually uses this view as its only source of time series information when reading from the database thus delegating all the processing to the database server, where it is close to the data and most efficient.

If you would like to follow along on the Postgres command line, feel free to create and populate the tables with the following SQL, which is nearly identical to the schema used by Tgres:

CREATE TABLE rra (
  id SERIAL NOT NULL PRIMARY KEY,
  step_s INT NOT NULL,
  steps_per_row INT NOT NULL,
  size INT NOT NULL,
  width INT NOT NULL,
  latest TIMESTAMPTZ DEFAULT NULL);

CREATE TABLE ts (
  rra_id INT NOT NULL,
  n INT NOT NULL,
  dp DOUBLE PRECISION[] NOT NULL DEFAULT '{}');

INSERT INTO rra VALUES (1, 60, 1440, 28, 7, '2008-04-02 00:00:00-00');

INSERT INTO ts VALUES (1, 0, '{64,67,70,71,72,69,67}');
INSERT INTO ts VALUES (1, 1, '{65,60,58,59,62,68,70}');
INSERT INTO ts VALUES (1, 2, '{71,72,77,70,71,73,75}');
INSERT INTO ts VALUES (1, 3, '{79,82,90,69,75,80,81}');

And finally create the view:

CREATE VIEW tv AS
  SELECT rra.id rra_id,
         latest - INTERVAL '1 SECOND' * rra.step_s * rra.steps_per_row *
           MOD(rra.size + MOD(EXTRACT(EPOCH FROM rra.latest)::BIGINT/(rra.step_s * rra.steps_per_row), size) + 1
           - (generate_subscripts(dp,1) + n * width), rra.size) AS t,
         UNNEST(dp) AS r
    FROM rra
   INNER JOIN ts ts ON ts.rra_id = rra.id;

Now give it a whirl with a SELECT * FROM tv ORDER BY t. Impressive? So how does it work?

First let’s go over the columns of the rra table.

  • step_s: the minimal unit of time expressed in seconds (60 or 1 minute in the above data).
  • steps_per_row: the number of the step_s intervals in one slot of our time series. In our example it is 1440, which is the number of minutes in a day, thus making our time series resolution one day.
  • size: number of slots in the series. Ours is 28, i.e. four weeks.
  • width: size of a segment which will be stored in a single row, which in our case is 7 (one week).
  • latest: the timestamp of the last data point in the series.

Next, let’s look at the UNNEST keyword in the SQL of the view. UNNEST takes an array and turns it into rows, e.g.:

=> SELECT UNNEST(dp) AS r FROM ts;
 r
----
 64
 67
...

UNNEST works in conjunction with the generate_subscripts PostgreSQL function which generates index values:

=> SELECT generate_subscripts(dp,1) AS i, UNNEST(dp) AS r FROM ts;
 i | r
---+----
 1 | 64
 2 | 67
...

Let us now zoom in on the very long expression in the view, here it is again:

latest - INTERVAL '1 SECOND' * rra.step_s * rra.steps_per_row *
  MOD(rra.size + MOD(EXTRACT(EPOCH FROM rra.latest)::BIGINT/(rra.step_s * rra.steps_per_row), size) + 1
  - (generate_subscripts(dp,1) + n * width), rra.size) AS t

A perhaps not immediately apparent trick to how all this works is that all our series are aligned on the beginning of the epoch. This means that at UNIX time 0, any series’ slot index is 0. From then on it increments sequentially until the series size is reached, at which point it wraps around to 0 (thus “round-robin”). Armed with this information we can calculate the index for any point in time.

The formula for calculating the index i for a given time t is:

i = t/step % size.

We need time to be expressed as a UNIX time which is done with EXTRACT(EPOCH FROM rra.latest)::BIGINT. Now you should recognize the above formula in the more verbose expression

MOD(EXTRACT(EPOCH FROM rra.latest)::BIGINT/(rra.step_s * rra.steps_per_row), size)

where rra.step_s * rra.steps_per_row is the size of one slot of our series in seconds.

Next, we need to compute the distance between the current slot and the last slot (for which we know the timestamp). I.e. if the last slot is i and the slot we need the timestamp for is j, the distance between them is i-j, but with a caveat: it is possible for j to be greater than i if the series wraps around, in which case the distance is the sum of the distance from j to the end of the series and the distance from the beginning to i. If you ponder over it with a pencil and paper long enough, you will arrive at the following formula for distance between two slots i and j in a wrap-around array:

distance = (size + i - j) % size
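To make the two formulas concrete, here is a minimal Python sketch. The function names slot_index and slot_distance are made up for this illustration; i is the last slot and j is the slot we want the timestamp for, as in the paragraph above.

```python
def slot_index(t, step, size):
    """Index of the slot that UNIX time t falls into (i = t/step % size)."""
    return (t // step) % size


def slot_distance(i, j, size):
    """How many slots j lies behind i in a wrap-around array."""
    return (size + i - j) % size


DAY = 60 * 1440  # one slot in seconds: step_s * steps_per_row

print(slot_index(0, DAY, 28))         # 0: series are aligned on the epoch
print(slot_index(29 * DAY, DAY, 28))  # 1: wrapped around once
print(slot_distance(5, 3, 28))        # 2: plain i - j
print(slot_distance(1, 27, 28))       # 2: j is "ahead" of i, so we wrap
```

Adding size before taking the remainder is what keeps the result non-negative when j is greater than i.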

Another thing to consider is that we’re splitting our series across multiple rows, thus the actual index of any point is the subscript into the current segment plus the index of the segment itself (the n column) multiplied by the width of the segment: generate_subscripts(dp,1) + n * width.

Which pieced together in SQL now looks like this:

MOD(rra.size + MOD(EXTRACT(EPOCH FROM rra.latest)::BIGINT/(rra.step_s * rra.steps_per_row), size) + 1
  - (generate_subscripts(dp,1) + n * width), rra.size)

Astute readers should notice an unexplained + 1. This is because PostgreSQL arrays are 1-based.

Now we need to convert the distance expressed in array slots into a time interval, which we do by multiplying it by INTERVAL '1 SECOND' * rra.step_s * rra.steps_per_row.

And finally, we need to subtract the above time interval from the latest stamp which yields (ta-da!) the timestamp of the current slot:

latest - INTERVAL '1 SECOND' * rra.step_s * rra.steps_per_row *
  MOD(rra.size + MOD(EXTRACT(EPOCH FROM rra.latest)::BIGINT/(rra.step_s * rra.steps_per_row), size) + 1
  - (generate_subscripts(dp,1) + n * width), rra.size) AS t

That’s it! And even though this may look complicated, from the computational view point it is very efficient, and PostgreSQL can handle it easily.
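If you would like to check the arithmetic outside the database, the whole expression can be mirrored in a few lines of Python. This is only a sketch: slot_timestamp is a hypothetical helper (not part of Tgres), it assumes latest is a timezone-aware datetime with whole seconds, and the latest value below is made up so that the last slot index comes out as 3.

```python
from datetime import datetime, timedelta, timezone


def slot_timestamp(latest, step_s, steps_per_row, size, width, n, subscript):
    """Timestamp of the slot at 1-based `subscript` in segment `n`."""
    slot_s = step_s * steps_per_row                       # one slot in seconds
    latest_idx = (int(latest.timestamp()) // slot_s) % size
    # + 1 because the subscript is 1-based, like PostgreSQL arrays
    distance = (size + latest_idx + 1 - (subscript + n * width)) % size
    return latest - timedelta(seconds=slot_s * distance)


# 14003 days after the epoch: 14003 % 28 == 3, so the last slot index is 3
latest = datetime.fromtimestamp(14003 * 86400, tz=timezone.utc)

for n, sub in [(0, 4), (0, 3), (0, 5), (1, 1)]:
    t = slot_timestamp(latest, 60, 1440, 28, 7, n, sub)
    print(n, sub, (latest - t).days)   # days behind latest: 0, 1, 27, 24
```

The slot right after the latest one (segment 0, subscript 5) is the oldest in the series, 27 days back, which is exactly the round-robin behaviour described above.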

As an exercise, try setting latest to various timestamps and observe how it affects the output of the view and see if you can explain how and why it happens.

Parsing Table Names From SQL


Sometimes it is useful to extract table names from an SQL statement, for example if you are trying to figure out dependencies for your Hive or BigQuery (or whatever) tables.

It is actually a lot simpler than it seems and you don’t need to write your own SQL parser or find one out there. In SQL, table names always follow the FROM and JOIN keywords. So all you have to do is split the statement into tokens, scan the list for any mention of FROM or JOIN, and grab the next token.

Here is a very simplistic Python function that does this using regular expressions:

import re

def tables_in_query(sql_str):

    # remove the /* */ comments
    q = re.sub(r"/\*[^*]*\*+(?:[^*/][^*]*\*+)*/", "", sql_str)

    # remove whole line -- and # comments
    lines = [line for line in q.splitlines() if not re.match(r"^\s*(--|#)", line)]

    # remove trailing -- and # comments
    q = " ".join([re.split("--|#", line)[0] for line in lines])

    # split on blanks, parens and semicolons
    tokens = re.split(r"[\s)(;]+", q)

    # scan the tokens. if we see a FROM or JOIN, we set the get_next
    # flag, and grab the next one (unless it's SELECT).

    result = set()
    get_next = False
    for tok in tokens:
        if get_next:
            if tok.lower() not in ["", "select"]:
                result.add(tok)
            get_next = False
        get_next = tok.lower() in ["from", "join"]

    return result
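A quick way to sanity-check the function is to feed it a small query with comments and a subquery. The block below repeats the function in condensed form so that it runs on its own; the sample query and its table names are made up.

```python
import re


def tables_in_query(sql_str):
    # strip /* */ comments, then whole-line and trailing -- and # comments
    q = re.sub(r"/\*[^*]*\*+(?:[^*/][^*]*\*+)*/", "", sql_str)
    lines = [ln for ln in q.splitlines() if not re.match(r"^\s*(--|#)", ln)]
    q = " ".join(re.split("--|#", ln)[0] for ln in lines)

    # grab the token after FROM or JOIN, unless it is SELECT (a subquery)
    result, get_next = set(), False
    for tok in re.split(r"[\s)(;]+", q):
        if get_next and tok.lower() not in ("", "select"):
            result.add(tok)
        get_next = tok.lower() in ("from", "join")
    return result


sample = """
SELECT o.id, c.name       -- trailing comment
  FROM orders o
  /* a block comment mentioning FROM ghosts */
  JOIN (SELECT * FROM customers) c ON o.cid = c.id;
# a whole-line comment: FROM nowhere
"""

print(sorted(tables_in_query(sample)))  # prints ['customers', 'orders']
```

Note that the tables mentioned only inside comments are ignored, and the subquery after JOIN is skipped in favor of the table it selects from.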

This is obviously not perfect, for example in BigQuery there is a possibility that what follows SELECT is a UDF name, but I’ll leave working around that as an exercise for the reader.

Load Testing Tgres


Edit: There is an update to this story.

So I finally got around to some load testing of Tgres. Load testing is mysterious, it never goes the way you think it would, and what you learn is completely unexpected.

Given that I presently don’t have any spare big iron at my disposal and my “servers” are my macbook and an old thinkpad, all I really was after is making sure that Tgres is “good enough” whatever that means. And I think it is.

I was hoping to gather some concrete numbers and maybe even make a chart or two, but in the end it all turned out to be so tedious and time-consuming, running the tests with various settings for hours on end, that I just gave up for now - after all, “premature optimization is the root of all evil”.

I also wanted to see how it stacks up against Graphite carbon-cache.py. As in, is it on par, or much better or much worse. My expectation was that Graphite could outperform it, because what it does is so much simpler (and I was right). The first thing I tried to do was overwhelm Graphite. I never succeeded in that - I probably could have tried harder, but I quickly learned that I don’t know what symptoms I’m looking for. I wrote a Go program that blasted UDP data points at 10K/sec across 10K different series, and taking it to over 20K/sec saturated my network before Graphite showed any signs of deterioration. There was also no reliable way for me to audit the data points - maybe some of them got lost, but at 600K+ per minute, I don’t know of any practical way of doing it. Not without a lot of work, at least.
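For illustration only, here is a rough Python sketch of a paced UDP sender along those lines. It is not the Go program mentioned above, and everything in it is an assumption: the function names, the loadtest prefix, and that the receiver accepts Graphite’s plaintext format (name, value, timestamp) over UDP.

```python
import socket
import time


def graphite_line(name, value, ts):
    """Format one data point as 'name value timestamp\\n'."""
    return f"{name} {value} {int(ts)}\n".encode("ascii")


def blast(host, port, n_series, rate, duration_s, prefix="loadtest"):
    """Send about `rate` points/sec, round-robin across n_series series."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sent = 0
    start = time.monotonic()
    try:
        while True:
            elapsed = time.monotonic() - start
            if elapsed >= duration_s:
                break
            due = int(elapsed * rate)  # points that should be out by now
            while sent < due:
                name = f"{prefix}.series{sent % n_series}"
                sock.sendto(graphite_line(name, sent, time.time()), (host, port))
                sent += 1
            time.sleep(0.001)
    finally:
        sock.close()
    return sent


# Example (hypothetical host and port):
# blast("127.0.0.1", 2003, n_series=10_000, rate=10_000, duration_s=60)
```

UDP gives the sender no feedback about dropped packets, which is part of why auditing the received data points is hard.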

With Tgres things were much easier. The weakest link is, not surprisingly, PostgreSQL. What I learned was that there are two kinds of deterioration when it comes to PostgreSQL though. The first one is outright, and that one manifests in database requests getting progressively slower until Tgres gets stuck with all its channels full.

You can make PostgreSQL significantly faster with a few simple tricks, for example with the following settings:

synchronous_commit = off
commit_delay = 100000

This post isn’t about PostgreSQL, and so I’m not going to get into the details of what this does, there is plenty of documentation and blog posts on the subject. If you plan on hosting a busy Tgres set up, you should probably have the above settings.

The second way PostgreSQL deteriorates is not immediately apparent - it is the infamous table bloat. Getting autovacuum to keep up with the ts table (which stores all the time series) is tricky, and once you’ve run out of options to tweak, this is probably it - the maximum load the database can handle, even if it may seem relatively calm.

Autovacuum has a lot of knobs, but ultimately they all exist to take advantage of the variability of load in a database, i.e. you can let it get behind during the day and catch up at night when the database is not as busy. It doesn’t really work with time series, which are not variable by nature - if you’re receiving 5 thousand data points per second at noon, you can expect the same rate at 4am. I think the settings that worked best for me were:

autovacuum_max_workers = 10
autovacuum_naptime = 1s
autovacuum_vacuum_threshold = 2000
autovacuum_vacuum_scale_factor = 0.0
autovacuum_vacuum_cost_delay = 0 # disable cost based

To the best of my understanding, the above settings disable cost-based autovacuum (meaning it doesn’t pause periodically to yield resources to the normal db tasks), make autovacuum kick in after 2K updates (which happens in no time), and make it sleep 1s in between runs, which means it’s running pretty much continuously.
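The “2K updates” figure follows from the rule in the PostgreSQL documentation: a table is vacuumed once its dead tuples exceed the threshold plus the scale factor times the number of rows. A small Python sketch of that arithmetic (the function name and the one-million-row table are made up for the example; 50 and 0.2 are the stock PostgreSQL defaults):

```python
def vacuum_trigger(reltuples, threshold, scale_factor):
    """Dead-tuple count above which autovacuum will vacuum the table."""
    return threshold + scale_factor * reltuples


rows = 1_000_000  # hypothetical table size

# PostgreSQL defaults: the trigger grows with the table
print(vacuum_trigger(rows, threshold=50, scale_factor=0.2))

# The settings above: a fixed 2000 dead tuples, whatever the table size
print(vacuum_trigger(rows, threshold=2000, scale_factor=0.0))
```

With the scale factor at zero the trigger no longer depends on table size, so a large ts table gets vacuumed just as eagerly as a small one.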

I was able to sustain a load of ~6K datapoints per second across 6K series - anything higher caused my “database server” (which is a 2010 i7 Thinkpad) autovacuum to get behind.

I also did some testing of how TOAST affects performance. There is no setting for turning TOAST on or off, but it can easily be done in Tgres by changing the number of data points per row. The default is 768 which is about 75% of a page. If you for example double it, then each row becomes larger than a page and TOAST kicks in. TOAST is compressed, which is an advantage, but it is a separate table, which is a disadvantage. In the end it seemed like the database deteriorated quicker with TOAST, but it was rather inconclusive.

In the end the key factor, or the weakest link, was the rate of queries per second. I now added a special rate-limiting setting to Tgres (max-flushes-per-second) which trumps all other settings and will keep your database happy at the expense of Tgres possibly caching a little more points in memory than expected.
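How Tgres implements this is not covered here, but the general idea of a flushes-per-second cap can be sketched in a few lines of Python. This is a hypothetical minimum-interval limiter, with made-up names, not the Tgres code:

```python
class FlushLimiter:
    """Allow at most max_per_second flushes, spaced evenly."""

    def __init__(self, max_per_second):
        self.min_interval = 1.0 / max_per_second
        self.next_allowed = 0.0

    def allow(self, now):
        """Return True if a flush may run at time `now` (in seconds)."""
        if now >= self.next_allowed:
            self.next_allowed = now + self.min_interval
            return True
        return False  # caller keeps the points cached and retries later


limiter = FlushLimiter(max_per_second=4)

# Flush attempts at these times; only those at least 0.25s apart get through.
decisions = [limiter.allow(t) for t in (0.0, 0.1, 0.25, 0.3, 0.5)]
print(decisions)  # prints [True, False, True, False, True]
```

Every rejected flush means the data points stay in memory a little longer, which is the trade-off described above.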

I will probably get back to some more load testing in a while, but for now this is it.

Golang Receiver vs Function Argument


What is the difference between a Go receiver (as in “method receiver”) and a function argument? Consider these two bits of code:

func (d *duck) quack() { // receiver
     // do something
}

versus

func quack(d *duck) { // function argument
    // do something
}

The “do something” part above would work exactly the same regardless of how you declare the function. Which raises the question, which should you use?

In the object-oriented world we were used to objects doing things, and in that context d.quack() may seem more intuitive or familiar than quack(d) because it “reads better”. After all, one could argue that the former is a duck quacking, but the latter reads like you’re quacking a duck, and what does that even mean? I have learned that you should not think this way in the Go universe, and here is why.

First, what is the essential difference? It is that a method can be called through an interface, in which case the function to be called is determined dynamically at the time of the call. If you are not using interfaces, then this doesn’t matter whatsoever and the only benefit you are getting from using a method is syntactic sweetness.

But what if you need to write a test where you want to stub out quack()? If your code looks like this, then it is not possible, because methods are attached to their types inflexibly, you cannot change them, and there is no such thing as a “method variable”:

type duck struct{}

func (d *duck) quack() {
     // do something
}

// the function we are testing:
func foo(d *duck) {
    d.quack() // cannot be stubbed
}

However, if you used a function argument, it would be easy:

type duck struct{}

var quack = func(d *duck) {
     // do something
}

// the function we are testing:
func foo(d *duck) {
    quack(d)
}

Now you can assign another function to quack at test time, e.g. quack = func(d *duck) { /* do something else */ } and all is well.

Alternatively, you can use an interface:

type quacker interface {
    quack()
}

type duck struct{}

func (d *duck) quack() { // satisfies quacker
     // do something
}

// the function we are testing:
func foo(d quacker) {
    d.quack()
}

Here, if we need to test foo() we can provide a different quacker.

Bottom line is that it only makes sense to use a receiver if this function is part of an interface implementation, OR if you never ever need to augment (stub) that function for testing or some other reason. As a practical matter, it seems like (contrary to how it’s done in the OO world) it is better to always start out with quack(d) rather than d.quack().