module statsd;

import std.random;
import std.socket;

/**
 * The various types of metrics supported by StatsD.
 *
 * Each member's string value is the type tag written after the `|`
 * separator of an emitted metric line (e.g. `key:1|c`).
 */
static immutable enum Types {
    Counter = "c",
    Gauge = "g",
    Timing = "ms",
    Set = "s"
}

/**
 * A simple client for the [StatsD](https://github.com/etsy/statsd) protocol.
 *
 * Every metric is rendered as `<prefix><key>:<value>|<type>` and, when it
 * was sampled, suffixed with `|@<rate>`. The line is then written to the
 * socket handle given at construction. The result of that write is not
 * inspected, so a metric that cannot be written is silently dropped.
 */
struct StatsD {

    /**
     * Creates a client that writes metrics to `socket`.
     *
     * NOTE: Assumes `socket` is in connected state, and that it
     * remains so until this object is destroyed. If the socket is or
     * becomes disconnected, metrics will be dropped.
     *
     * Setting SOCK_NONBLOCK is recommended to prevent blocking
     * metrics-producing threads, and instead dropping metrics.
     *
     * Params:
     *   socket = Native socket handle that metric lines are written to.
     *   prefix = Text prepended verbatim to every key; no separator is
     *            inserted between the prefix and the key.
     *   rng    = Generator copied into the client. The copy is re-seeded
     *            with an unpredictable seed right away, so the state of
     *            the argument does not influence sampling. Call `seed`
     *            afterwards for a reproducible sequence.
     */
    this(socket_t socket, string prefix = "", ref Random rng = rndGen()) nothrow {
        this.socket = socket;
        this.prefix = prefix.idup;
        this.rng = rng;

        seed();
    }

    @disable this();

    unittest {
        import std.conv : emplace;

        // Default construction is disabled, so emplacing without
        // constructor arguments must not compile.
        StatsD stats = void;
        static assert(!__traits(compiles, emplace(&stats)));
    }

    /**
     * Equivalent to `count(key, 1, frequency)`.
     */
    void incr(scope immutable string key, float frequency = 1.0) nothrow {
        count(key, 1, frequency);
    }

    unittest {
        import std.string;

        auto pair = socketPair();
        scope(exit) foreach (s; pair) s.close();

        pair[1].blocking(false);

        auto stats = new StatsD(pair[0].handle, "myPrefix");

        auto buf = new ubyte[256];

        stats.incr("incr");

        pair[1].receive(buf);
        assert(fromStringz(cast(char*)buf.ptr) == "myPrefixincr:1|c");

        stats.incr("incr", float.max);
        buf[] = 0;
        pair[1].receive(buf);
        assert(fromStringz(cast(char*)buf.ptr) == "myPrefixincr:1|c");

        stats.seed(42);
        stats.incr("incr", 0.5);

        buf[] = 0;
        pair[1].receive(buf);
        assert(fromStringz(cast(char*)buf.ptr) == "myPrefixincr:1|c|@0.50");

        stats.incr("incr", 0.5);

        buf[] = 0;
        pair[1].receive(buf);
        assert(fromStringz(cast(char*)buf.ptr) == "");

        stats.incr("incr", 0.0);

        pair[1].receive(buf);
        assert(fromStringz(cast(char*)buf.ptr) == "");

        stats.incr("incr", float.nan);

        pair[1].receive(buf);
        assert(fromStringz(cast(char*)buf.ptr) == "");

        stats.incr("incr", float.infinity);

        pair[1].receive(buf);
        assert(fromStringz(cast(char*)buf.ptr) == "myPrefixincr:1|c");

        stats.incr("incr", -float.infinity);

        buf[] = 0;
        pair[1].receive(buf);
        assert(fromStringz(cast(char*)buf.ptr) == "");
    }

    /**
     * Equivalent to `count(key, -1, frequency)`.
     */
    void decr(scope immutable string key, float frequency = 1.0) nothrow {
        count(key, -1, frequency);
    }

    unittest {
        import std.string;

        auto pair = socketPair();
        scope(exit) foreach (s; pair) s.close();

        pair[1].blocking(false);

        auto stats = new StatsD(pair[0].handle, "myPrefix");

        auto buf = new ubyte[256];

        stats.decr("decr");

        pair[1].receive(buf);
        assert(fromStringz(cast(char*)buf.ptr) == "myPrefixdecr:-1|c");

        stats.seed(42);
        stats.decr("decr", 0.5);

        buf[] = 0;
        pair[1].receive(buf);
        assert(fromStringz(cast(char*)buf.ptr) == "myPrefixdecr:-1|c|@0.50");

        stats.decr("decr", 0.5);

        buf[] = 0;

        pair[1].receive(buf);
        assert(fromStringz(cast(char*)buf.ptr) == "");

        // Exercise decr (not incr) so the zero-frequency drop path of
        // this method is the one under test.
        stats.decr("decr", 0.0);
        pair[1].receive(buf);
        assert(fromStringz(cast(char*)buf.ptr) == "");

        stats.decr("decr", float.nan);

        pair[1].receive(buf);
        assert(fromStringz(cast(char*)buf.ptr) == "");

        stats.decr("decr", float.infinity);

        pair[1].receive(buf);
        assert(fromStringz(cast(char*)buf.ptr) == "myPrefixdecr:-1|c");

        stats.decr("decr", -float.infinity);

        buf[] = 0;
        pair[1].receive(buf);
        assert(fromStringz(cast(char*)buf.ptr) == "");
    }

    /**
     * Emits a simple counter metric. At each flush, a StatsD daemon
     * will send the current count to an APM, and reset the count to
     * 0.
     *
     * Params:
     *   key       = Metric name; the client's prefix is prepended to it.
     *   delta     = Signed amount to add to the counter.
     *   frequency = Sampling probability. It is clamped to [0, 1]; a
     *               value of 0 or NaN drops the metric, and a value
     *               close to 1 sends every metric unsampled.
     */
    void count(scope immutable string key, int delta, float frequency = 1.0) nothrow {
        send(key, delta, Types.Counter, frequency);
    }

    unittest {
        import std.string;

        auto pair = socketPair();
        scope(exit) foreach (s; pair) s.close();

        pair[1].blocking(false);

        auto stats = new StatsD(pair[0].handle, "myPrefix");

        auto buf = new ubyte[256];

        stats.count("count", 42);

        pair[1].receive(buf);
        assert(fromStringz(cast(char*)buf.ptr) == "myPrefixcount:42|c");

        stats.seed(42);
        stats.count("count", 42, 0.5);

        buf[] = 0;
        pair[1].receive(buf);
        assert(fromStringz(cast(char*)buf.ptr) == "myPrefixcount:42|c|@0.50");

        stats.count("count", 42, 0.5);

        buf[] = 0;
        pair[1].receive(buf);
        assert(fromStringz(cast(char*)buf.ptr) == "");

        stats.count("count", 42, 0.0);

        pair[1].receive(buf);
        assert(fromStringz(cast(char*)buf.ptr) == "");

        stats.count("count", 42, float.nan);

        pair[1].receive(buf);
        assert(fromStringz(cast(char*)buf.ptr) == "");

        stats.count("count", 42, float.infinity);

        pair[1].receive(buf);
        assert(fromStringz(cast(char*)buf.ptr) == "myPrefixcount:42|c");

        stats.count("count", 42, -float.infinity);

        buf[] = 0;
        pair[1].receive(buf);
        assert(fromStringz(cast(char*)buf.ptr) == "");
    }

    /**
     * Emits a gauge which maintains its value until it is next set.
     *
     * This implementation does not emit signed gauges (i.e. sending a
     * delta such as -10 or +4).
     *
     * Params:
     *   key       = Metric name; the client's prefix is prepended to it.
     *   value     = Absolute gauge value.
     *   frequency = Sampling probability, handled as in `count`.
     */
    void gauge(scope immutable string key, uint value, float frequency = 1.0) nothrow {
        send(key, value, Types.Gauge, frequency);
    }

    unittest {
        import std.string;

        auto pair = socketPair();
        scope(exit) foreach (s; pair) s.close();

        pair[1].blocking(false);

        auto stats = new StatsD(pair[0].handle, "myPrefix");

        auto buf = new ubyte[256];

        stats.gauge("gauge", 128);

        pair[1].receive(buf);
        assert(fromStringz(cast(char*)buf.ptr) == "myPrefixgauge:128|g");

        stats.seed(42);
        stats.gauge("gauge", 128, 0.5);

        buf[] = 0;
        pair[1].receive(buf);
        assert(fromStringz(cast(char*)buf.ptr) == "myPrefixgauge:128|g|@0.50");

        stats.gauge("gauge", 128, 0.5);

        buf[] = 0;
        pair[1].receive(buf);
        assert(fromStringz(cast(char*)buf.ptr) == "");

        stats.gauge("gauge", 128, 0.0);

        pair[1].receive(buf);
        assert(fromStringz(cast(char*)buf.ptr) == "");

        stats.gauge("gauge", 128, float.nan);

        pair[1].receive(buf);
        assert(fromStringz(cast(char*)buf.ptr) == "");

        stats.gauge("gauge", 128, float.infinity);

        pair[1].receive(buf);
        assert(fromStringz(cast(char*)buf.ptr) == "myPrefixgauge:128|g");

        stats.gauge("gauge", 128, -float.infinity);

        buf[] = 0;
        pair[1].receive(buf);
        assert(fromStringz(cast(char*)buf.ptr) == "");
    }

    /**
     * Emits a timing metric in milliseconds. A StatsD daemon will
     * produce a histogram of these timings with a rollup duration
     * equal to its flush interval.
     *
     * It will also maintain a counter with a sample rate equal to the
     * given frequency.
     *
     * Params:
     *   key       = Metric name; the client's prefix is prepended to it.
     *   ms        = Duration in milliseconds.
     *   frequency = Sampling probability, handled as in `count`.
     */
    void timing(scope immutable string key, uint ms, float frequency = 1.0) nothrow {
        send(key, ms, Types.Timing, frequency);
    }

    unittest {
        import std.string;

        auto pair = socketPair();
        scope(exit) foreach (s; pair) s.close();

        pair[1].blocking(false);

        auto stats = new StatsD(pair[0].handle, "myPrefix");

        auto buf = new ubyte[256];

        stats.timing("timing", 2);

        pair[1].receive(buf);
        assert(fromStringz(cast(char*)buf.ptr) == "myPrefixtiming:2|ms");

        stats.seed(42);
        stats.timing("timing", 2, 0.5);

        buf[] = 0;
        pair[1].receive(buf);
        assert(fromStringz(cast(char*)buf.ptr) == "myPrefixtiming:2|ms|@0.50");

        stats.timing("timing", 2, 0.5);

        buf[] = 0;
        pair[1].receive(buf);
        assert(fromStringz(cast(char*)buf.ptr) == "");

        stats.timing("timing", 2, 0.0);

        pair[1].receive(buf);
        assert(fromStringz(cast(char*)buf.ptr) == "");

        stats.timing("timing", 2, float.nan);

        pair[1].receive(buf);
        assert(fromStringz(cast(char*)buf.ptr) == "");

        stats.timing("timing", 2, float.infinity);

        pair[1].receive(buf);
        assert(fromStringz(cast(char*)buf.ptr) == "myPrefixtiming:2|ms");

        stats.timing("timing", 2, -float.infinity);

        buf[] = 0;
        pair[1].receive(buf);
        assert(fromStringz(cast(char*)buf.ptr) == "");
    }

    /**
     * Emits a set metric. A StatsD daemon will count unique
     * occurrences of each value between flushes.
     *
     * Set metrics are never sampled: they are always sent with a
     * frequency of 1.0.
     *
     * Params:
     *   key   = Metric name; the client's prefix is prepended to it.
     *   value = Member to record in the set.
     */
    void set(scope immutable string key, uint value) nothrow {
        send(key, value, Types.Set, 1.0);
    }

    unittest {
        import std.string;

        auto pair = socketPair();
        scope(exit) foreach (s; pair) s.close();

        pair[1].blocking(false);

        auto stats = new StatsD(pair[0].handle, "myPrefix");

        auto buf = new ubyte[256];

        stats.set("set", uint.max);

        pair[1].receive(buf);
        assert(fromStringz(cast(char*)buf.ptr) == "myPrefixset:4294967295|s");
    }

    /**
     * (Re-)seeds the random number generator used to sample metrics.
     *
     * Params:
     *   seed = New seed; defaults to an unpredictable one. Passing a
     *          fixed value makes the sampling decisions reproducible,
     *          which the unit tests rely on.
     */
    void seed(uint seed = unpredictableSeed()) pure nothrow @nogc @safe {
        rng.seed(seed);
    }

private:
    // Native handle every metric line is written to.
    immutable socket_t socket;
    // Copied at construction and prepended to every key.
    immutable string prefix;

    // Private generator used only for the sampling decision in `send`.
    Random rng;

    /**
     * Formats one metric line and writes it to the socket, applying
     * client-side sampling.
     *
     * The frequency is clamped to [0, 1]. A clamped value of 0 or NaN
     * drops the metric. A value close to 1 sends it without a sample
     * rate suffix. Anything in between sends it with probability
     * `frequency` and appends `|@<rate>`.
     *
     * The rate is rendered with two decimals (`%.2f`), so frequencies
     * below 0.005 are reported to the daemon as `@0.00`.
     * NOTE(review): consider widening the precision if such low rates
     * are used; left as is to keep the wire format stable.
     *
     * Params:
     *   key       = Metric name, appended to the prefix.
     *   value     = Metric value, rendered as a decimal integer.
     *   type      = Metric type tag.
     *   frequency = Requested sampling probability.
     */
    void send(scope immutable string key, long value, Types type, float frequency) nothrow {
        import std.algorithm.comparison : clamp;
        import std.exception : assumeWontThrow; // Pinky promise.

        version (DigitalMars) {
            import std.math : isClose;
            import std.math.traits : isNaN;
        } else version (LDC) {
            // NOTE(review): approxEqual and isClose presumably use different
            // default tolerances, so borderline frequencies may be classified
            // differently on LDC than on DMD -- confirm before unifying.
            import std.math : approxEqual, isNaN;
            alias isClose = approxEqual;
        } else {
            // Any other compiler (e.g. GDC): without this branch neither
            // symbol would be in scope and the module would not compile.
            import std.math : isClose, isNaN;
        }

        import std.outbuffer;

        const float freq = clamp(frequency, 0.0, 1.0);

        // Drop everything.
        if (isClose(0.0, freq) || isNaN(freq)) {
            return;
        }

        // Sample if frequency is materially below 1.0.
        const bool closeToOne = isClose(1.0, freq);
        if (!closeToOne && freq < uniform01(rng)) {
            return;
        }

        scope OutBuffer buf = new OutBuffer();
        buf.reserve(256);

        // `%r` is used for the type so that the enum's string value
        // (e.g. "c") is emitted, which is what the unit tests expect.
        if (closeToOne) {
            assumeWontThrow(buf.writef("%s%s:%d|%r", prefix, key, value, type));
        } else {
            // On this branch `freq` equals the caller's `frequency`:
            // out-of-range and NaN inputs have already returned or been
            // routed to the unsampled branch above.
            assumeWontThrow(buf.writef("%s%s:%d|%r|@%.2f", prefix, key, value, type, freq));
        }

        scope ubyte[] bytes = buf.toBytes();

        version (Posix) {
            import core.sys.posix.unistd : write;

            // Best effort by design: the result is not checked, so a
            // failed or short write just loses the metric.
            write(socket, bytes.ptr, bytes.length);
        }
        // On non-Posix targets nothing is written.
    }
}