import json import sqlite3 import time from datetime import datetime from rich.progress import track def upload_heartbeat(heartbeat): conn = sqlite3.connect("./data/wakapi.db") cursor = conn.cursor() cursor.execute( "INSERT INTO heartbeats (user_id, entity, type, category, project, branch, language, is_write, editor, operating_system, machine, user_agent, time, hash, origin, origin_id, created_at, lines, line_no, cursor_pos, line_deletions, line_additions, project_root_count) VALUES (:user_id, :entity, :type, :category, :project, :branch, :language, :is_write, :editor, :operating_system, :machine, :user_agent, :time, :hash, :origin, :origin_id, :created_at, :lines, :line_no, :cursor_pos, :line_deletions, :line_additions, :project_root_count)", heartbeat, ) conn.commit() conn.close() def convert_utc(timestamp): dt = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%SZ") return dt.strftime("%Y-%m-%d %H:%M:%S.%f")[:-4] + "+00:00" def convert_unix_time(unix_stamp): dt = datetime.fromtimestamp(unix_stamp) formatted_date = dt.strftime("%Y-%m-%d %H:%M:%S%z") return f"{formatted_date}+00:00" def map_json_heartbeat(heartbeat): mapped = { "user_id": "thomasabishop", "entity": heartbeat.get("entity", "no entity"), "type": heartbeat.get("type", None), "category": heartbeat.get("category", None), "project": heartbeat.get("project", None), "branch": heartbeat.get("branch", "not-available"), "language": heartbeat.get("language", None), "is_write": heartbeat.get("is_write", None), "editor": heartbeat.get("editor", "not-known"), "operating_system": heartbeat.get("operating_system", "not-known"), "machine": heartbeat.get("machine", "not-known"), "user_agent": heartbeat.get("user_agent", None), "time": convert_unix_time(heartbeat.get("time", 0)), "hash": heartbeat.get("hash", None), "origin": heartbeat.get("origin", None), "origin_id": heartbeat.get("origin_id", None), "created_at": convert_utc(heartbeat.get("created_at", 0)), "lines": heartbeat.get("lines", 0), "line_no": heartbeat.get("line_no", 0), "cursor_pos": heartbeat.get("cursor_pos", 0), "line_deletions": heartbeat.get("line_deletions", 0), "line_additions": heartbeat.get("line_additions", 0), "project_root_count": heartbeat.get("project_root_count", 0), } return mapped deadletter = [] print("----") print("INFO Collating heartbeats data...") f = open("heartbeats.json") data = json.load(f) days = data["days"] total_days = len(days) print(f"INFO Total days = {total_days}") print("----") for i in range(0, total_days): day_count = i + 1 print("----") print(f"INFO Processing day {day_count} ") hbeats = days[i]["heartbeats"] hbeats_count = len(days[i]["heartbeats"]) print(f"INFO Day {day_count} has {hbeats_count} heartbeats") if hbeats_count == 0: print("INFO Nothing to upload") else: for j in track( range(0, hbeats_count), description=f"INFO Uploading Day {day_count} heartbeats...", ): try: mapped_hb = map_json_heartbeat(hbeats[j]) upload_heartbeat(mapped_hb) except sqlite3.Error as e: print( "ERROR Heartbeat could not be uploaded. Sending to dead letter..." ) deadletter.append( {"sqlite_error_code": e.sqlite_errorname, "heartbeat": hbeats[j]} ) continue print("----") time.sleep(0.25) if len(deadletter): print("----") print("INFO Some heartbeats could not be uploaded") for dl in deadletter: print(json.dumps(dl))