1+ import os
2+ import time
3+ import socket
4+ import threading
5+ from datetime import datetime
6+ from apscheduler .schedulers .background import BackgroundScheduler
7+ import requests
8+ import psutil
9+
10+ sched = BackgroundScheduler ()
11+
12+ class Cube ():
13+ def __init__ (self , api_key , node = socket .gethostname (), batch_size = 60 ,
14+ dispatch_interval = 60 , tags = None ):
15+ self .api_key = api_key
16+ self .node = node
17+ self .batch_size = batch_size
18+ self .dispatch_interval = dispatch_interval
19+ self .tags = tags
20+ self .start_time = time .time ()
21+ self .uptime = 0
22+ self .cpu = 0.0
23+ self .memory = 0
24+ self .active_requests = 0
25+ self .requests = []
26+ self .lock = threading .Lock ()
27+
28+ def system ():
29+ p = psutil .Process (os .getpid ())
30+ self .uptime = int (time .time () - self .start_time )
31+ self .cpu = p .cpu_percent ()
32+ self .memory = p .memory_full_info ().rss
33+
34+ sched .add_job (self ._dispatch , 'interval' , seconds = dispatch_interval )
35+ sched .add_job (system , 'interval' , seconds = 1 )
36+ sched .start ()
37+
38+ def _dispatch (self ):
39+ if not self .requests :
40+ return
41+
42+ r = requests .post ('https://api.labstack.com/cube' , headers = {
43+ 'User-Agent' : 'labstack/cube' ,
44+ 'Authorization' : 'Bearer ' + self .api_key
45+ }, json = self .requests )
46+ if not 200 <= r .status_code < 300 :
47+ # TOTO: handler error
48+ print ('cube error' , r .json ())
49+
50+ # Reset requests
51+ self .requests .clear ()
52+
53+ def start (self , request ):
54+ with self .lock :
55+ self .active_requests += 1
56+
57+ request ['time' ] = int (datetime .now ().timestamp () * 1000000 )
58+ request ['active' ] = self .active_requests
59+ request ['node' ] = self .node
60+ request ['uptime' ] = self .uptime
61+ request ['cpu' ] = self .cpu
62+ request ['memory' ] = self .memory
63+ request ['tags' ] = self .tags
64+ self .requests .append (request )
65+
66+ return request
67+
68+ def stop (self , request ):
69+ with self .lock :
70+ self .active_requests -= 1
71+ request ['latency' ] = int (datetime .now ().timestamp () * 1000000 ) - request ['time' ]
72+
73+ # Dispatch batch
74+ if len (self .requests ) >= self .batch_size :
75+ threading .Thread (target = self ._dispatch ).start ()
76+
0 commit comments