1 module statsd;
2 
3 import std.random;
4 import std.socket;
5 
6 /**
7  * The various types of metrics supported by StatsD.
8  */
9 static immutable enum Types {
10     Counter = "c",
11     Gauge = "g",
12     Timing = "ms",
13     Set = "s"
14 }
15 
16 /**
17  * A simple client for the [StatsD](https://github.com/etsy/statsd) protocol.
18  */
19 struct StatsD {
20 
21     /**
22      * NOTE: Assumes `socket` is in connected state, and that it
23      * remain so until this object is destroyed. If the socket is or
24      * becomes disconnected, metrics will be dropped.
25      *
26      * Setting SOCK_NONBLOCK is recommended to prevent blocking
27      * metrics-producing threads, and instead dropping metrics.
28      */
29     this(socket_t socket, string prefix = "", ref Random rng = rndGen()) nothrow {
30         this.socket = socket;
31         this.prefix = prefix.idup;
32         this.rng = rng;
33 
34         seed();
35     }
36 
37     @disable this();
38 
39     unittest {
40         import std.conv : emplace;
41 
42         StatsD stats = void;
43         static assert(!__traits(compiles, emplace(&stats)));
44     }
45 
46     /**
47      * Equivalent to `count(key, 1, frequency)`.
48      */
49     void incr(scope immutable string key, float frequency = 1.0) nothrow {
50         count(key, 1, frequency);
51     }
52 
53     unittest {
54         import std.string;
55 
56         auto pair = socketPair();
57         scope(exit) foreach (s; pair) s.close();
58 
59         pair[1].blocking(false);
60 
61         auto stats = new StatsD(pair[0].handle, "myPrefix");
62 
63         auto buf = new ubyte[256];
64 
65         stats.incr("incr");
66 
67         pair[1].receive(buf);
68         assert(fromStringz(cast(char*)buf.ptr) == "myPrefixincr:1|c");
69 
70         stats.incr("incr", float.max);
71         buf[] = 0;
72         pair[1].receive(buf);
73         assert(fromStringz(cast(char*)buf.ptr) == "myPrefixincr:1|c");
74 
75         stats.seed(42);
76         stats.incr("incr", 0.5);
77 
78         buf[] = 0;
79         pair[1].receive(buf);
80         assert(fromStringz(cast(char*)buf.ptr) == "myPrefixincr:1|c|@0.50");
81 
82         stats.incr("incr", 0.5);
83 
84         buf[] = 0;
85         pair[1].receive(buf);
86         assert(fromStringz(cast(char*)buf.ptr) == "");
87 
88         stats.incr("incr", 0.0);
89 
90         pair[1].receive(buf);
91         assert(fromStringz(cast(char*)buf.ptr) == "");
92 
93         stats.incr("incr", float.nan);
94 
95         pair[1].receive(buf);
96         assert(fromStringz(cast(char*)buf.ptr) == "");
97 
98         stats.incr("incr", float.infinity);
99 
100         pair[1].receive(buf);
101         assert(fromStringz(cast(char*)buf.ptr) == "myPrefixincr:1|c");
102 
103         stats.incr("incr", -float.infinity);
104 
105         buf[] = 0;
106         pair[1].receive(buf);
107         assert(fromStringz(cast(char*)buf.ptr) == "");
108     }
109 
110     /**
111      * Equivalent to `count(key, -1, frequency)`.
112      */
113     void decr(scope immutable string key, float frequency = 1.0) nothrow {
114         count(key, -1, frequency);
115     }
116 
117     unittest {
118         import std.string;
119 
120         auto pair = socketPair();
121         scope(exit) foreach (s; pair) s.close();
122 
123         pair[1].blocking(false);
124 
125         auto stats = new StatsD(pair[0].handle, "myPrefix");
126 
127         auto buf = new ubyte[256];
128 
129         stats.decr("decr");
130 
131         pair[1].receive(buf);
132         assert(fromStringz(cast(char*)buf.ptr) == "myPrefixdecr:-1|c");
133 
134         stats.seed(42);
135         stats.decr("decr", 0.5);
136 
137         buf[] = 0;
138         pair[1].receive(buf);
139         assert(fromStringz(cast(char*)buf.ptr) == "myPrefixdecr:-1|c|@0.50");
140 
141         stats.decr("decr", 0.5);
142 
143         buf[] = 0;
144 
145         pair[1].receive(buf);
146         assert(fromStringz(cast(char*)buf.ptr) == "");
147 
148         stats.incr("decr", 0.0);
149         pair[1].receive(buf);
150         assert(fromStringz(cast(char*)buf.ptr) == "");
151 
152         stats.decr("decr", float.nan);
153 
154         pair[1].receive(buf);
155         assert(fromStringz(cast(char*)buf.ptr) == "");
156 
157         stats.decr("decr", float.infinity);
158 
159         pair[1].receive(buf);
160         assert(fromStringz(cast(char*)buf.ptr) == "myPrefixdecr:-1|c");
161 
162         stats.decr("decr", -float.infinity);
163 
164         buf[] = 0;
165         pair[1].receive(buf);
166         assert(fromStringz(cast(char*)buf.ptr) == "");
167     }
168 
169     /**
170      * Emits a simple counter metric. At each flush, a StatsD daemon
171      * will send the current count to an APM, and reset the count to
172      * 0.
173      */
174     void count(scope immutable string key, int delta, float frequency = 1.0) nothrow {
175         send(key, delta, Types.Counter, frequency);
176     }
177 
178     unittest {
179         import std.string;
180 
181         auto pair = socketPair();
182         scope(exit) foreach (s; pair) s.close();
183 
184         pair[1].blocking(false);
185 
186         auto stats = new StatsD(pair[0].handle, "myPrefix");
187 
188         auto buf = new ubyte[256];
189 
190         stats.count("count", 42);
191 
192         pair[1].receive(buf);
193         assert(fromStringz(cast(char*)buf.ptr) == "myPrefixcount:42|c");
194 
195         stats.seed(42);
196         stats.count("count", 42, 0.5);
197 
198         buf[] = 0;
199         pair[1].receive(buf);
200         assert(fromStringz(cast(char*)buf.ptr) == "myPrefixcount:42|c|@0.50");
201 
202         stats.count("count", 42, 0.5);
203 
204         buf[] = 0;
205         pair[1].receive(buf);
206         assert(fromStringz(cast(char*)buf.ptr) == "");
207 
208         stats.count("count", 42, 0.0);
209 
210         pair[1].receive(buf);
211         assert(fromStringz(cast(char*)buf.ptr) == "");
212 
213         stats.count("count", 42, float.nan);
214 
215         pair[1].receive(buf);
216         assert(fromStringz(cast(char*)buf.ptr) == "");
217 
218         stats.count("count", 42, float.infinity);
219 
220         pair[1].receive(buf);
221         assert(fromStringz(cast(char*)buf.ptr) == "myPrefixcount:42|c");
222 
223         stats.count("count", 42, -float.infinity);
224 
225         buf[] = 0;
226         pair[1].receive(buf);
227         assert(fromStringz(cast(char*)buf.ptr) == "");
228     }
229 
230     /**
231      * Emits a gauge which maintains its value until it is next set.
232      *
233      * This implementation does not emit signed gauges (i.e. sending a
234      * delta such as -10 or +4).
235      */
236     void gauge(scope immutable string key, uint value, float frequency = 1.0) nothrow {
237         send(key, value, Types.Gauge, frequency);
238     }
239 
240     unittest {
241         import std.string;
242 
243         auto pair = socketPair();
244         scope(exit) foreach (s; pair) s.close();
245 
246         pair[1].blocking(false);
247 
248         auto stats = new StatsD(pair[0].handle, "myPrefix");
249 
250         auto buf = new ubyte[256];
251 
252         stats.gauge("gauge", 128);
253 
254         pair[1].receive(buf);
255         assert(fromStringz(cast(char*)buf.ptr) == "myPrefixgauge:128|g");
256 
257         stats.seed(42);
258         stats.gauge("gauge", 128, 0.5);
259 
260         buf[] = 0;
261         pair[1].receive(buf);
262         assert(fromStringz(cast(char*)buf.ptr) == "myPrefixgauge:128|g|@0.50");
263 
264         stats.gauge("gauge", 128, 0.5);
265 
266         buf[] = 0;
267         pair[1].receive(buf);
268         assert(fromStringz(cast(char*)buf.ptr) == "");
269 
270         stats.gauge("gauge", 128, 0.0);
271 
272         pair[1].receive(buf);
273         assert(fromStringz(cast(char*)buf.ptr) == "");
274 
275         stats.gauge("gauge", 128, float.nan);
276 
277         pair[1].receive(buf);
278         assert(fromStringz(cast(char*)buf.ptr) == "");
279 
280         stats.gauge("gauge", 128, float.infinity);
281 
282         pair[1].receive(buf);
283         assert(fromStringz(cast(char*)buf.ptr) == "myPrefixgauge:128|g");
284 
285         stats.gauge("gauge", 128, -float.infinity);
286 
287         buf[] = 0;
288         pair[1].receive(buf);
289         assert(fromStringz(cast(char*)buf.ptr) == "");
290     }
291 
292     /**
293      * Emits a timing metric in milliseconds. A StatsD daemon will
294      * produce a histogram of these timings with a rollup duration
295      * equal to its flush interval.
296      *
297      * It will also maintain a counter with a sample rate equal to the
298      * given frequency.
299      */
300     void timing(scope immutable string key, uint ms, float frequency = 1.0) nothrow {
301         send(key, ms, Types.Timing, frequency);
302     }
303 
304     unittest {
305         import std.string;
306 
307         auto pair = socketPair();
308         scope(exit) foreach (s; pair) s.close();
309 
310         pair[1].blocking(false);
311 
312         auto stats = new StatsD(pair[0].handle, "myPrefix");
313 
314         auto buf = new ubyte[256];
315 
316         stats.timing("timing", 2);
317 
318         pair[1].receive(buf);
319         assert(fromStringz(cast(char*)buf.ptr) == "myPrefixtiming:2|ms");
320 
321         stats.seed(42);
322         stats.timing("timing", 2, 0.5);
323 
324         buf[] = 0;
325         pair[1].receive(buf);
326         assert(fromStringz(cast(char*)buf.ptr) == "myPrefixtiming:2|ms|@0.50");
327 
328         stats.timing("timing", 2, 0.5);
329 
330         buf[] = 0;
331         pair[1].receive(buf);
332         assert(fromStringz(cast(char*)buf.ptr) == "");
333 
334         stats.timing("timing", 2, 0.0);
335 
336         pair[1].receive(buf);
337         assert(fromStringz(cast(char*)buf.ptr) == "");
338 
339         stats.timing("timing", 2, float.nan);
340 
341         pair[1].receive(buf);
342         assert(fromStringz(cast(char*)buf.ptr) == "");
343 
344         stats.timing("timing", 2, float.infinity);
345 
346         pair[1].receive(buf);
347         assert(fromStringz(cast(char*)buf.ptr) == "myPrefixtiming:2|ms");
348 
349         stats.timing("timing", 2, -float.infinity);
350 
351         buf[] = 0;
352         pair[1].receive(buf);
353         assert(fromStringz(cast(char*)buf.ptr) == "");
354     }
355 
356     /**
357      * Emits a set metric. A StatsD daemon will count unique
358      * occurrences of each value between flushes.
359      */
360     void set(scope immutable string key, uint value) nothrow {
361         send(key, value, Types.Set, 1.0);
362     }
363 
364     unittest {
365         import std.string;
366 
367         auto pair = socketPair();
368         scope(exit) foreach (s; pair) s.close();
369 
370         pair[1].blocking(false);
371 
372         auto stats = new StatsD(pair[0].handle, "myPrefix");
373 
374         auto buf = new ubyte[256];
375 
376         stats.set("set", uint.max);
377 
378         pair[1].receive(buf);
379         assert(fromStringz(cast(char*)buf.ptr) == "myPrefixset:4294967295|s");
380     }
381 
382     /**
383      * (Re-)seeds the random number generator used to sample metrics.
384      */
385     void seed(uint seed = unpredictableSeed()) pure nothrow @nogc @safe {
386         rng.seed(seed);
387     }
388 
389 private:
390     immutable socket_t socket;
391     immutable string prefix;
392 
393     Random rng;
394 
395     void send(scope immutable string key, long value, Types type, float frequency) nothrow {
396         import std.algorithm.comparison : clamp;
397         import std.exception : assumeWontThrow; // Pinky promise.
398 
399         version (DigitalMars) {
400             import std.math : isClose;
401             import std.math.traits : isNaN;
402         } else version (LDC) {
403             import std.math : approxEqual, isNaN;
404             alias isClose = approxEqual;
405         }
406 
407         import std.outbuffer;
408 
409         const float freq = clamp(frequency, 0.0, 1.0);
410 
411         // Drop everything.
412         if (isClose(0.0, freq) || isNaN(freq)) {
413             return;
414         }
415 
416         // Sample if frequency is materially below 1.0.
417         const bool closeToOne = isClose(1.0, freq);
418         if (!closeToOne && freq < uniform01(rng)) {
419             return;
420         }
421 
422         scope OutBuffer buf = new OutBuffer();
423         buf.reserve(256);
424 
425         if (closeToOne) {
426             assumeWontThrow(buf.writef("%s%s:%d|%r", prefix, key, value, type));
427         } else {
428             assumeWontThrow(buf.writef("%s%s:%d|%r|@%.2f", prefix, key, value, type, frequency));
429         }
430 
431         scope ubyte[] bytes = buf.toBytes();
432 
433         version (Posix) {
434             import core.sys.posix.unistd : write;
435 
436             write(socket, bytes.ptr, bytes.length);
437         }
438     }
439 }