python 实现skywalking的trace模块过滤和报警

skywalking本身的报警功能用起来似乎不是特别好用,目前想实现对skywalking的trace中的错误接口进行过滤,并报警通知管理员和开发,所以自己用python对skywalking的数据做了二次清洗来实现。项目放在了自己的github(https://github.com/shygit-dev/skywalking-cli-python)上,有兴趣的同学可以做二次改造,共同学习。下面简单列出了代码内容:

sw-trace.py

#!/usr/bin/env python
# _*_ coding: utf-8 _*_
# Title:
# Author:shy
import requests
import time
import smtplib
from email.mime.text import MIMEText
import re
def interface_content_filter(trace_id):
    """
    Filter a trace by the content of its detailed (business-logic) error log.

    The detail log is fetched over HTTP from ``url`` with ``trace_id`` as a
    query parameter, then searched with every entry of the
    ``blackname_keyword_list`` file. Each non-blank line of that file is used
    as one regular expression.

    :param trace_id: trace id sent to the detail service as ``trace_id``.
    :return: 0 if any blacklist pattern matches the log, otherwise 1.
    """
    url = "http://172.16.53.232:50001/query"
    # A timeout keeps an unresponsive detail service from blocking the run
    # forever; without one requests waits indefinitely.
    response = requests.request(
        method="GET", url=url, params={"trace_id": trace_id}, timeout=30
    )
    detail_trace_id_log = response.text
    with open("blackname_keyword_list", "r", encoding="utf-8") as f:
        for line in f:
            keyword = line.strip()
            # A blank line would become the empty pattern, which matches any
            # text and would silently suppress every alert.
            if not keyword:
                continue
            if re.search(keyword, detail_trace_id_log) is not None:
                print("哥们匹配到日志黑名单关键字了:%s" % line)
                return 0
    print("提示:%s不在关键字黑名单中" % trace_id)
    return 1
def interface_filter(endpointName):
    """
    Check an endpoint name against the endpoint blacklist.

    The blacklist is the full text of the ``blackname_list`` file. An endpoint
    counts as blacklisted when its name, with surrounding whitespace removed,
    occurs anywhere in that text.

    The name is compared literally rather than being compiled as a regular
    expression, so characters such as ``.``, ``[``, ``+`` or ``*`` in an
    endpoint name can neither cause false hits nor raise ``re.error``.

    :param endpointName: endpoint name to look up.
    :return: 0 if the endpoint is blacklisted, otherwise 1.
    """
    with open("blackname_list", "r", encoding="utf-8") as f:
        bn_list = f.read()
    if endpointName.strip() not in bn_list:
        print("提示:接口不存在黑名单中")
        return 1
    print("提示:接口在黑名单中")
    return 0
def trace_erro_interface(start_time,end_time,sw_url,per_page_size,trace_detail_addr):
    """
    Build ``mail.html`` from SkyWalking traces that are in the ERROR state.

    Posts a ``queryBasicTraces`` GraphQL query to ``sw_url`` for the given
    time window, skips every trace rejected by ``interface_filter`` (endpoint
    blacklist) or ``interface_content_filter`` (log keyword blacklist), and
    writes the remaining traces as an HTML table to ``mail.html``.

    :param start_time: window start, e.g. "2021-12-07 1734".
    :param end_time: window end, same format as ``start_time``.
    :param sw_url: URL the GraphQL query is posted to.
    :param per_page_size: page size of the single page that is requested.
    :param trace_detail_addr: ``host:port`` used to build each row's detail link.
    :return: None
    """
    # Local imports: only this function needs them.
    from html import escape
    from urllib.parse import quote

    data = {
        "query": "query queryTraces($condition: TraceQueryCondition) {\n  data: queryBasicTraces(condition: $condition) {\n    traces {\n      key: segmentId\n      endpointNames\n      duration\n      start\n      isError\n      traceIds\n    }\n    total\n  }}",
        "variables": {
            "condition": {
                "queryDuration": {
                    "start": start_time,  # e.g. "2021-12-07 1734"
                    "end": end_time,
                    "step": "MINUTE"
                },
                "traceState": "ERROR",
                "paging": {
                    "pageNum": 1,
                    "pageSize": per_page_size,
                    "needTotal": "true"
                },
                "queryOrder": "BY_START_TIME"
            }
        }
    }
    # A timeout keeps an unresponsive server from blocking the run forever.
    response = requests.request(method="post", url=sw_url, json=data, timeout=30)

    # The original header had a broken "<tr style=" attribute and used
    # "font-size=15px" instead of "font-size:15px"; both are fixed here.
    header = '<head><meta charset="UTF-8"><title>Title</title><style>.t {border-right: 2px solid black;border-bottom: 2px solid black;}.t th,td {border-top: 2px solid black;border-left: 2px solid black;font-size: 10px;}</style></head><body><div style="color:red;font-size:15px;">最近15分钟统计:</div><table class="t" border="0" cellspacing="0" cellpadding="10px"><thead><tr><th style="width: 100px;">时间</th><th>持续时长</th><th>接口名称</th><th>追踪ID</th></tr></thead><tbody>'
    footer = '</tbody></table></body>'

    rows = []
    for i, trace in enumerate(response.json()["data"]["data"]["traces"], start=1):
        # "start" is a digit string: its first 10 characters are epoch
        # seconds, the remainder is the sub-second part.
        start_epoch = float("%s.%s" % (trace["start"][0:10], trace["start"][10:]))
        s_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_epoch))
        dur_time = trace["duration"]
        endpointName = trace["endpointNames"][0]
        trace_id = trace["traceIds"][0]
        print("时间:%s\n" % s_time,
              "持续时长:%s\n" % dur_time,
              "接口名称:%s\n" % endpointName,
              "跟踪ID:%s" % trace_id)
        print(i)
        # Endpoint blacklist.
        if interface_filter(endpointName) == 0:
            print("哥们进入黑名单了!",endpointName)
            continue
        # Log keyword blacklist.
        if interface_content_filter(trace_id) == 0:
            print("哥们进入关键字黑名单了!", trace_id)
            continue
        # Values from the API are escaped so that characters such as "<" or
        # "&" cannot break the table markup or the link.
        rows.append(
            '<tr><td>%s</td><td>%s</td><td>%s</td><td><a href="http://%s/query?trace_id=%s" rel="external nofollow" >%s</a></td></tr>'
            % (s_time, dur_time, escape(endpointName), trace_detail_addr,
               quote(trace_id, safe=""), escape(trace_id))
        )

    # Write the report once instead of reopening the file for every row.
    with open("mail.html", "w", encoding="utf-8") as f:
        f.write(header + "".join(rows) + footer)
def send_mail(receiver):
    """
    Send the report stored in ``mail.html`` as an HTML e-mail.

    Nothing is sent when the report contains no ``<td>`` cell, which means no
    trace row was written to it.

    :param receiver: recipient address, used for the "To" header and as the
        envelope recipient.
    :return: 0 when there is nothing to report, otherwise None.
    """
    server = "mail.test.com"
    sender = "sa@test.com"
    # NOTE(review): the password is hard-coded in source; consider loading it
    # from the environment or a config file that is not committed.
    sender_pwd = "1qaz@WSX"
    send_addr = "sa@test.com"
    with open("mail.html","r",encoding="utf-8") as f:
        content = f.read()
    if "<td>" not in content:
        print("无报错接口!",content)
        return 0
    print("邮件前",content)
    msg_mail = MIMEText(content,"html","utf-8")
    msg_mail["Subject"] = "Skywalking报错接口统计"
    msg_mail["From"] = sender
    msg_mail["To"] = receiver
    # SMTP_SSL connects as soon as it is constructed with a host, so a second
    # connect() call is not needed. The context manager closes the session
    # (QUIT) even if login or sending fails.
    with smtplib.SMTP_SSL(server, 465) as server_obj:
        server_obj.login(sender,sender_pwd)
        server_obj.sendmail(send_addr,receiver,msg_mail.as_string())
if __name__ == "__main__":
    # Query window: the 900 seconds (15 minutes) leading up to now, rendered
    # in local time as "YYYY-MM-DD HHMM".
    window_end = time.time()
    window_start = window_end - 900
    stamp_format = "%Y-%m-%d %H%M"
    start_time = time.strftime(stamp_format, time.localtime(window_start))
    end_time = time.strftime(stamp_format, time.localtime(window_end))
    for stamp in (start_time, end_time):
        print(stamp)

    # Address and port of the SkyWalking front-end service.
    sw_url = "http://172.16.53.232:9412/graphql"
    # Number of endpoint entries fetched in one request.
    per_page_size = 5000
    # host:port of the service that returns the detailed log of a trace id.
    trace_detail_addr = "127.0.0.1:5000"
    # Address that receives the alert mail.
    receiver = "shy@test.com"

    trace_erro_interface(start_time, end_time, sw_url, per_page_size, trace_detail_addr)
    send_mail(receiver)

sw-trace-id.py

#!/usr/bin/env python
# _*_ coding: utf-8 _*_
# Title:
# Author:shy
import requests
import time
from flask import Flask,request
app = Flask(__name__)
@app.route("/query",methods=["get"])
def trace_id_query():
    """
    Return the detailed error log of one trace as an HTML fragment.

    Reads the ``trace_id`` query parameter, posts a ``queryTrace`` GraphQL
    query for it, and renders every span whose ``isError`` is true: service,
    start time, endpoint, peer and tags, followed by the ``message`` and
    ``stack`` values of the span's logs.

    The page is assembled in memory, so concurrent requests cannot mix or
    truncate each other's output. The finished page is still written to the
    ``detail_log`` file, as before.

    :return: the HTML text, or a 400 response when ``trace_id`` is missing.
    """
    trace_id = request.args.get("trace_id")
    if not trace_id:
        # The query declares $traceId as a required ID; fail with a clear
        # message instead of forwarding an empty value.
        return "missing required query parameter: trace_id", 400
    url="http://172.16.53.232:9412/graphql"
    data = {
        "query": "query queryTrace($traceId: ID!) {\n  trace: queryTrace(traceId: $traceId) {\n    spans {\n      traceId\n      segmentId\n      spanId\n      parentSpanId\n      refs {\n        traceId\n        parentSegmentId\n        parentSpanId\n        type\n      }\n      serviceCode\n      serviceInstanceName\n      startTime\n      endTime\n      endpointName\n      type\n      peer\n      component\n      isError\n      layer\n      tags {\n        key\n        value\n      }\n      logs {\n        time\n        data {\n          key\n          value\n        }\n      }\n    }\n  }\n  }",
        "variables": {
            "traceId": trace_id
        }
    }
    # A timeout keeps an unresponsive server from blocking the worker forever.
    result = requests.request(method="post",url=url,json=data,timeout=30)

    parts = ["<div style='color: red;font-size: 30px;'>生产Skywalking报错接口跟踪日志日志:<br /></div>"]
    for span in result.json()["data"]["trace"]["spans"]:
        if not span["isError"]:
            continue
        print("服务名称:%s\n" % span["serviceCode"],
              "开始时间:%s\n" % span["startTime"],
              "接口名称:%s\n" % span["endpointName"],
              "peer名称:%s\n" % span["peer"],
              "tags名称:%s\n" % span["tags"],
              "详细日志:%s" % span["logs"])
        parts.append("服务名称:%s<br />开始时间:%s<br />接口名称:%s<br />peer名称:%s<br />tags名称:%s" % (span["serviceCode"],span["startTime"],span["endpointName"],span["peer"],span["tags"]))
        parts.append("<br />********详细日志**********<br />")
        for logs in span["logs"]:
            for log in logs["data"]:
                # Only the message and the stack trace are rendered.
                if log["key"] in ("message", "stack"):
                    print(log["value"])
                    parts.append(log["value"])
        parts.append("<div style='color: red;font-size: 20px;'><br />========下一个接口信息=========<br /></div>")

    page = "".join(parts)
    # Kept as an on-disk copy of the last rendered page; the response itself
    # no longer depends on this file.
    with open("detail_log", "w", encoding="utf-8") as f:
        f.write(page)
    return page
if __name__ == "__main__":
  # Script entry point: start the Flask application with its default settings.
  # Example of a trace id accepted by the /query route:
  # trace_id = "14447ae7199c40a2b9862411daba180b.2142.16388920322367785"
  app.run()

关于python实现skywalking的trace模块过滤和报警的文章就介绍至此,更多相关python trace模块过滤和报警内容请搜索编程宝库以前的文章,希望以后支持编程宝库

本文提供Python上传minio以及阿里oss文件工具。环境依赖安装minio以及oss2依赖pip install minio -i https://pypi.douban.com/simp ...