Commit 1845f024 authored by 王英豪's avatar 王英豪

合并下

parents c3713726 099f1574
...@@ -22,6 +22,9 @@ from automated_main.view.web_socket.socket import ChatConsumer ...@@ -22,6 +22,9 @@ from automated_main.view.web_socket.socket import ChatConsumer
urlpatterns = [ urlpatterns = [
# path('admin/', admin.site.urls), # path('admin/', admin.site.urls),
# 测试人员进度-人员分配接口
path('', include("automated_main.url.testing_progress_url.testing_progress_url")),
# 用户接口 # 用户接口
path('', include("automated_main.url.user_url.user_url")), path('', include("automated_main.url.user_url.user_url")),
......
# Generated by Django 3.1.3 on 2022-10-24 20:37
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('automated_main', '0003_apimanagement_yapi_id'),
]
operations = [
migrations.CreateModel(
name='TestingProgress',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('testing_progress_title', models.CharField(max_length=50, verbose_name='测试进度标题')),
('start_date', models.TextField(default='', max_length=200, null=True, verbose_name='开始时间')),
('end_date', models.TextField(default='', max_length=200, null=True, verbose_name='开始时间')),
('duration', models.TextField(default='', max_length=200, null=True, verbose_name='测试内容-标题')),
('group', models.IntegerField(default=1, verbose_name='是否分组')),
('parent', models.IntegerField(default=0, max_length=50, null=True, verbose_name='父id')),
('type', models.CharField(max_length=50, verbose_name='图表颜色')),
('description', models.TextField(default='', max_length=20000, null=True, verbose_name='描述')),
('update_time', models.DateTimeField(auto_now_add=True)),
('create_time', models.DateTimeField(auto_now_add=True, verbose_name='创建时间')),
],
),
]
# Generated by Django 3.1.3 on 2022-10-24 20:44
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('automated_main', '0004_testingprogress'),
]
operations = [
migrations.AlterField(
model_name='testingprogress',
name='parent',
field=models.IntegerField(default=0, null=True, verbose_name='父id'),
),
]
...@@ -10,10 +10,37 @@ from automated_main.models.ui_automation.ui_element_operation import UIElementsO ...@@ -10,10 +10,37 @@ from automated_main.models.ui_automation.ui_element_operation import UIElementsO
from automated_main.models.ui_automation.ui_page_element import UIPageElement from automated_main.models.ui_automation.ui_page_element import UIPageElement
from automated_main.models.ui_automation.ui_test_case import UITestCase, UITestCaseAssociated from automated_main.models.ui_automation.ui_test_case import UITestCase, UITestCaseAssociated
from automated_main.models.ui_automation.ui_test_task import UITestTask, UITestResult from automated_main.models.ui_automation.ui_test_task import UITestTask, UITestResult
<<<<<<< HEAD
from automated_main.models.performance_test.performance_project import PerformanceProject from automated_main.models.performance_test.performance_project import PerformanceProject
from automated_main.models.performance_test.performance_script import PerformanceScript from automated_main.models.performance_test.performance_script import PerformanceScript
from automated_main.models.performance_test.performance_report import PerformanceReport from automated_main.models.performance_test.performance_report import PerformanceReport
=======
"""
8.19 弃用
"""
# from automated_main.models.api_automation.api_project import APIProject
# from automated_main.models.api_automation.api_module import APIModule
from automated_main.models.api_automation.api_environment import APIEnvironment
"""
8.19 弃用
"""
# from automated_main.models.api_automation.api_test_case import ApiTestCase
# from automated_main.models.api_automation.api_business_test import ApiBusinessTest, ApiBusinessTestAssociated
# from automated_main.models.api_automation.api_test_task import APITestTask, APITestResultAssociated, APITestResult
from automated_main.models.performance_test.performance_project import PerformanceProject
from automated_main.models.performance_test.performance_script import PerformanceScript
from automated_main.models.performance_test.performance_report import PerformanceReport
from automated_main.models.api_automation.api_database import APIDatabase
from automated_main.models.api_automation.api_management import ApiManagement
from automated_main.models.api_automation.api_interfaces_case import ApiInterfacesCase
from automated_main.models.api_automation.api_scenarios_case import ApiScenariosCaseAssociated, \
ApiScenariosCaseAssociated
from automated_main.models.api_automation.api_test_plan import ApiTestPlan
from automated_main.models.testing_progress.testing_progress import TestingProgress
>>>>>>> test
from django.db import models
class TestingProgress(models.Model):
"""
测试进度
"""
testing_progress_title = models.CharField("测试进度标题", max_length=50, null=False)
start_date = models.TextField("开始时间", default="", max_length=200, null=True)
end_date = models.TextField("开始时间", default="", max_length=200, null=True)
duration = models.TextField("测试内容-标题", default="", max_length=200, null=True)
group = models.IntegerField("是否分组", default=1) # 1 是 0 不是
parent = models.IntegerField("父id", null=True, default=0)
type = models.CharField("图表颜色", max_length=50, null=False)
description = models.TextField("描述", default="", max_length=20000, null=True)
update_time = models.DateTimeField(auto_now_add=True)
create_time = models.DateTimeField("创建时间", auto_now_add=True)
def __str__(self):
return self.testing_progress_title
# -*- coding: utf-8 -*-
# @Time : 2022/10/24 21:06
# @Author : wyh
# @File : __init__.py.py
# @desc:
# -*- coding: utf-8 -*-
# @Time : 2022/10/24 21:06
# @Author : wyh
# @File : testing_progress_url.py
# @desc:
from django.urls import path
from automated_main.view.testing_progress.PersonnelAllocation.personnel_allocation_view import PersonnelAllocationView
urlpatterns = [
path("api/backend/personnel_allocation/", PersonnelAllocationView.as_view()),
path("api/backend/personnel_allocation_list/", PersonnelAllocationView.as_view()),
]
\ No newline at end of file
# -*- coding: utf-8 -*-
# @Time : 2021/4/14 18:59
# @Author : wangyinghao
# @FileName: api_business_test_view.py
# @Software: PyCharm
from django.views.generic import View
import json
import ast
import re
from automated_main.utils.http_format import response_success, response_failed
from automated_main.exception.my_exception import MyException
from automated_main.models.api_automation.api_project import APIProject
from automated_main.models.api_automation.api_module import APIModule
from automated_main.models.api_automation.api_environment import APIEnvironment
from automated_main.models.api_automation.api_business_test import ApiBusinessTest, ApiBusinessTestAssociated
from automated_main.models.api_automation.api_database import APIDatabase
from automated_main.utils.handle_db import HandleDB
from automated_main.form.api_business_test import ApiBusinessTestForm
from automated_main.utils.api_utils import RegularMatch, InterfaceRequest, ExtractParameters
from automated_main.models.api_automation.api_test_case import ApiTestCase, ApiParameterExtraction, ApiParameterData
import logging
logger = logging.getLogger('django')
class GetApiBusinessTestSelectData(View):
def get(self, request, api_project_id, *args, **kwargs):
"""
获取api项目---模块---测试用例
:param request:
:param api_project_id: Api项目id
:param args:
:param kwargs:
:return:
"""
api_projects = APIProject.objects.filter(id=api_project_id)
data_list = []
for api_project in api_projects:
project_dict = {
"project_id": api_project.id,
"api_project_name": api_project.api_project_name
}
api_modules = APIModule.objects.filter(api_project_id=api_project.id)
api_module_list = []
for api_module in api_modules:
api_test_cases = ApiTestCase.objects.filter(api_module_id=api_module.id)
api_test_case_list = []
for api_test_case in api_test_cases:
api_test_case_list.append({
"api_test_case_id": api_test_case.id,
"api_test_case_name": api_test_case.api_test_case_name,
})
api_module_list.append({
"api_module_id": api_module.id,
"api_module_name": api_module.api_module_name,
"api_test_case_list": api_test_case_list,
})
project_dict["module_list"] = api_module_list
data_list.append(project_dict)
return response_success(data_list)
class ApiBusinessTestView(View):
def delete(self, request, api_business_test_id, *args, **kwargs):
"""
代表删除API业务测试
:param request:
:param api_business_test_id: 业务测试id
:param args:
:param kwargs:
:return:
"""
ApiBusinessTest.objects.delete()
return response_success("删除API业务测试成功")
def get(self, request, api_business_test_id, *args, **kwargs):
"""
获取单独API业务测试
:param request:
:param api_business_test_id: 业务测试ID
:param args:
:param kwargs:
:return:
"""
api_business_test = ApiBusinessTest.objects.get(id=api_business_test_id)
api_business_test_associated = ApiBusinessTestAssociated.objects.filter(bid_id=api_business_test_id)
if api_business_test is None:
return response_success()
else:
api_business_test_data_list = []
for api_business_test_associateds in api_business_test_associated:
api_business_test_dict = {
"api_module_id": api_business_test_associateds.api_module_id,
"api_test_case_id": api_business_test_associateds.api_test_case_id,
"steps": int(api_business_test_associateds.case_steps),
}
api_business_test_data_list.append(api_business_test_dict)
api_business_test_data_list_sorting = sorted(api_business_test_data_list, key=lambda x: x['steps'])
return response_success({"api_business_test_name": api_business_test.api_business_test_name,
"api_project_id": api_business_test.api_project_id,
"api_business_test_describe": api_business_test.api_business_test_describe,
"api_business_test_data": api_business_test_data_list_sorting})
def post(self, request, api_business_test_id, *args, **kwargs):
"""
编辑API业务测试
:param request:
:param api_business_test_id:
:param args:
:param kwargs:
:return:
"""
api_business_test = ApiBusinessTest.objects.get(id=api_business_test_id)
if api_business_test is None:
return response_success()
body = request.body
if not body:
return response_success()
data = json.loads(body)
form = ApiBusinessTestForm(data)
api_business_test.api_business_test_name = data['api_business_test_name']
api_business_test.api_project.id = data['api_project_id']
api_business_test.id = data['api_business_test_id']
api_business_test.api_business_test_describe = data['api_business_test_describe']
api_business_test.save()
ApiBusinessTestAssociated.objects.delete()
if form.is_valid():
for i in data["api_business_test_data"]:
api_module_id = (i['api_module_id'])
api_test_case_id = (i['api_test_case_id'])
case_steps = (i['steps'])
ApiBusinessTestAssociated.objects.create(bid_id=api_business_test_id, api_module_id=api_module_id,
api_test_case_id=api_test_case_id, case_steps=case_steps)
return response_success("编辑API业务测试")
else:
raise MyException()
def put(self, request, *args, **kwargs):
"""
创建API业务测试
:param request:
:param args:
:param kwargs:
:return:
"""
body = request.body
if not body:
return response_success()
data = json.loads(body)
form = ApiBusinessTestForm(data)
if form.is_valid():
api_business_test = ApiBusinessTest.objects.create(**form.cleaned_data)
api_business_test_id = api_business_test.id
for i in data["api_business_test_data"]:
api_module_id = (i['api_module_id'])
api_test_case_id = (i['api_test_case_id'])
case_steps = (i['steps'])
ApiBusinessTestAssociated.objects.create(bid_id=api_business_test_id, api_module_id=api_module_id,
api_test_case_id=api_test_case_id, case_steps=case_steps)
return response_success("创建API业务测试用例成功")
else:
raise MyException()
class ApiBusinessTestDebugView(View):
def post(self, request, *args, **kwargs):
"""
API业务测试DEBUG
:param request:
:param args:
:param kwargs:
:return:
"""
body = request.body
if not body:
return response_success()
data = json.loads(body)
for case in data['api_business_test_data']:
test_case = ApiTestCase.objects.get(id=case['api_test_case_id'])
api_environment = APIEnvironment.objects.get(id=data['environment'])
# 请求地址
api_url = api_environment.api_title + test_case.api_url
api_url = RegularMatch(api_url, data['environment'])
if api_url[0] == "该变量数据库未找到":
return response_failed("4000", str(api_url[0] + api_url[1]))
api_headers = RegularMatch(test_case.api_headers, data['environment'])
if api_headers[0] == "该变量数据库未找到":
return response_failed("4000", str(api_headers[0] + api_headers[1]))
json_header = api_headers.replace("\'", "\"")
if test_case.api_headers == '':
header = test_case.api_headers
else:
try:
header = json.loads(json_header)
except json.decoder.JSONDecodeError:
return response_failed('30000', "header类型错误,用例名称: " + str(test_case.api_test_case_name))
logger.info(json_header)
api_parameter_body = RegularMatch(test_case.api_parameter_body, data['environment'])
if api_parameter_body[0] == "该变量数据库未找到":
return response_failed('40000', "该变量数据库未找到,用例名称: " + str(test_case.api_test_case_name) + "\n" + "变量值:" +
str(api_parameter_body[0] + api_parameter_body[1]))
json_par = api_parameter_body.replace("\'", "\"")
if api_parameter_body == '':
payload = api_parameter_body
logger.info(payload)
else:
try:
payload = json.loads(json_par)
logger.info(payload)
except json.decoder.JSONDecodeError:
return response_failed('30000', "参数类型错误,用例名称: " + str(test_case.api_test_case_name))
# Params参数
parameter_data = ApiParameterData.objects.filter(api_test_case_id=case['api_test_case_id'])
api_parameter_dict = {}
for api_parameters in parameter_data:
if api_parameters.api_must_parameter == "true":
api_parameter_dict.update(
{api_parameters.api_parameter_name: api_parameters.api_parameter_value})
else:
pass
if api_parameter_dict == {}:
api_parameter_data = {}
else:
api_parameter_data = RegularMatch(str(api_parameter_dict), data['environment'])
if api_parameter_data == {}:
data_payload = api_parameter_data
logger.info(data_payload)
else:
try:
data_payload = ast.literal_eval(api_parameter_data)
except json.decoder.JSONDecodeError:
return response_failed("30000", "Parameter参数类型错误" + api_parameter_data)
r = InterfaceRequest(str(test_case.api_method), str(test_case.api_parameter_types),
api_url, header, payload, data_payload)
if r == "请求超时":
return response_failed('30000', "接口请求超时:" + str(test_case.api_test_case_name))
try:
response_message = r.json()
except json.decoder.JSONDecodeError:
return response_failed('30000', "接口返回结果类型错误非JSON格式:" + str(test_case.api_test_case_name))
# 断言
if test_case.api_assert_type == '':
result = "断言内容为空"
logger.error(result)
elif test_case.api_assert_type == 1:
try:
r_text = re.sub(r'(\\u[\s\S]{4})', lambda x: x.group(1).encode("utf-8").decode("unicode-escape"),
r.text)
r_text_test = "".join(r_text.split())
r_text_data = "".join(test_case.api_assert_text.split())
assert r_text_data in r_text_test
except Exception as e:
result = "断言失败: 响应内容中未包含 断言内容,断言内容: %s ,响应内容:%s" % (r_text_data, r_text_test)
logger.error(result + str(e))
return response_failed('30000', "断言失败,用例名称: " + str(test_case.api_test_case_name) + "\n" +
"断言结果:" + result)
elif test_case.api_assert_type == 2:
try:
r_text = re.sub(r'(\\u[\s\S]{4})', lambda x: x.group(1).encode("utf-8").decode("unicode-escape"),
r.text)
r_text_test = "".join(r_text.split())
r_text_data = "".join(test_case.api_assert_text.split())
assert r_text_data == r_text_test
except Exception as e:
result = "断言失败: 断言内容 %s 与响应结果不匹配,%s" % (r_text_data, r_text_test)
logger.error(result, e)
return response_failed('30000', "断言失败,用例名称: " + str(test_case.api_test_case_name) + "\n" +
"断言结果:" + result)
elif test_case.api_assert_type == 3:
database = APIDatabase.objects.get(id=test_case.dataBase_id)
db = HandleDB(database.api_host, int(database.api_port), database.user, database.password,
database.database)
sql = data['database_sql']
count = db.get_count(sql)
db.close()
if str(count) == str(test_case.api_assert_text):
result = '断言成功'
logger.error(result)
else:
result = '断言失败,查询的数据是: %s' % str(count)
logger.error(result)
return response_failed('30000', "断言失败,用例名称: " + str(test_case.api_test_case_name) + "\n" +
"断言结果:" + result)
elif test_case.api_assert_type == 4:
try:
assert test_case.api_assert_text == str(r.status_code)
result = "断言成功"
except Exception as e:
result = "断言失败: code状态码 %s 与响应状态码不匹配,%s" % (test_case.api_assert_text, str(r.status_code))
logger.error(result, e)
return response_failed('30000', "断言失败,用例名称: " + str(test_case.api_test_case_name) + "\n" +
"断言结果:" + result)
parameter_extraction = ApiParameterExtraction.objects.filter(api_test_case_id=case['api_test_case_id'])
if parameter_extraction == "" or None:
pass
else:
# 提取变量
try:
api_result = json.loads(r.text)
except Exception as e:
logger.info('json转换失败(变量):' + str(e))
pass
for parameter_extractions in parameter_extraction:
v_text = parameter_extractions.api_value_variable.split(".")
try:
api_results = ExtractParameters(v_text, api_result)
api_parameter_extraction = ApiParameterExtraction.objects.get(
api_key_variable=str(parameter_extractions.api_key_variable))
api_parameter_extraction.api_variable_results = str(api_results)
api_parameter_extraction.save()
except Exception as e:
logger.info('参数提取失败' + str(e))
return response_failed('30000', "参数提取失败,用例名称:" +
str(test_case.api_test_case_name) + '\n' + "异常信息:" + str(e))
return response_success("调试成功")
# -*- coding: utf-8 -*-
# @Time : 2021/10/21 17:24
# @Author : wangyinghao
# @File : api_database_view.py
# @Software: PyCharm
from django.views.generic import View
import json
from django.forms import model_to_dict
from automated_main.utils.http_format import response_success
from automated_main.exception.my_exception import MyException
from automated_main.models.api_automation.api_database import APIDatabase
from automated_main.form.api_database import ApiDatabaseForm
from automated_main.utils.handle_db import HandleDB
class ApiDatabaseView(View):
def get(self, request, api_database_id, *args, **kwargs):
"""
代表获取单个API数据库
:param request:
:param api_database_id:数据库的id
:param args:
:param kwargs:
:return:
"""
api_database = APIDatabase.objects.filter(id=api_database_id).first()
if api_database is None:
return response_success()
else:
return response_success(model_to_dict(api_database))
def post(self, request, api_database_id, *args, **kwargs):
"""
代表更改API数据库
:param request:
:param api_database_id:
:param args:
:param kwargs:
:return:
"""
api_database = APIDatabase.objects.filter(id=api_database_id).first()
if api_database is None:
return response_success()
body = request.body
if not body:
return response_success()
data = json.loads(body)
form = ApiDatabaseForm(data)
if form.is_valid():
APIDatabase.objects.filter(id=api_database_id).update(**form.cleaned_data)
return response_success("编辑成功")
else:
raise MyException()
def delete(self, request, api_database_id, *args, **kwargs):
"""
代表删除单独API数据库
:param request:
:param api_database_id: API数据库ID
:param args:
:param kwargs:
:return:
"""
APIDatabase.objects.delete()
return response_success("删除成功")
def put(self, request, *args, **kwargs):
"""
代表创建API数据库
:param request:
:param args:
:param kwargs:
:return:
"""
body = request.body
if not body:
return response_success()
data = json.loads(body)
form = ApiDatabaseForm(data)
if form.is_valid():
APIDatabase.objects.create(**form.cleaned_data)
return response_success("创建成功")
else:
raise MyException(message="创建失败")
class DatabaseProcessingView(View):
def post(self, request, *args, **kwargs):
"""
连接数据库
:param request:
:param args:
:param kwargs:
:return:
"""
body = request.body
if not body:
return response_success()
data = json.loads(body)
db = HandleDB(data['api_host'], int(data['api_port']), data['user'], data['password'], data['database'])
sql = 'SELECT VERSION()'
try:
db.execution_results(sql)
db.close()
return response_success("调试成功")
except Exception as e:
print(e)
pass
return MyException(40000, message="调试失败")
# -*- coding: utf-8 -*-
# @Time : 2021/3/25 15:28
# @Author : wangyinghao
# @FileName: api_environment_view.py
# @Software: PyCharm
from django.views.generic import View
import json
from django.forms import model_to_dict
from automated_main.utils.http_format import response_success
from automated_main.exception.my_exception import MyException
from automated_main.models.api_automation.api_environment import APIEnvironment
from automated_main.models.api_automation.api_environment import APIGlobalVariable
from automated_main.form.api_environment import ApiEnvironmentForm
class ApiEnvironmentView(View):
def get(self, request, api_environment_id, *args, **kwargs):
"""
代表获取单个API环境
:param request:
:param api_environment_id:
:param args:
:param kwargs:
:return:
"""
api_environment = APIEnvironment.objects.filter(id=api_environment_id).first()
api_environment_data = model_to_dict(api_environment)
api_global_variable = APIGlobalVariable.objects.filter(api_environment_id=api_environment_id)
global_variable_list = []
if api_global_variable.count() == 0:
global_variable_dict = {
"api_global_variable_name": "",
"api_global_variable_value": "",
"api_global_variable_describe": ""
}
global_variable_list.append(global_variable_dict)
api_environment_data["global_variable"] = global_variable_list
else:
for global_variable in api_global_variable:
global_variable_dict = {
"api_global_variable_name": global_variable.api_global_variable_name,
"api_global_variable_value": global_variable.api_global_variable_value,
"api_global_variable_describe": global_variable.api_global_variable_describe
}
global_variable_list.append(global_variable_dict)
api_environment_data["global_variable"] = global_variable_list
if api_environment is None:
return response_success()
else:
return response_success(api_environment_data)
def post(self, request, api_environment_id, *args, **kwargs):
"""
代表更改API环境
:param request:
:param api_environment_id:
:param args:
:param kwargs:
:return:
"""
api_environment = APIEnvironment.objects.filter(id=api_environment_id).first()
if api_environment is None:
return response_success()
body = request.body
if not body:
return response_success()
data = json.loads(body)
form = ApiEnvironmentForm(data)
if form.is_valid():
APIEnvironment.objects.filter(id=api_environment_id).update(**form.cleaned_data)
if APIGlobalVariable.objects.filter(api_environment_id=api_environment_id).count() >= 0:
APIGlobalVariable.objects.delete()
for global_variable in data['global_variable_data']:
APIGlobalVariable.objects.create(
api_environment_id=api_environment_id,
api_global_variable_name=global_variable['api_global_variable_name'],
api_global_variable_value=global_variable['api_global_variable_value'],
api_global_variable_describe=global_variable['api_global_variable_describe'],
)
return response_success("编辑API环境成功")
else:
raise MyException()
def delete(self, request, api_environment_id, *args, **kwargs):
"""
代表删除单独API环境
:param request:
:param api_environment_id: API环境ID
:param args:
:param kwargs:
:return:
"""
APIEnvironment.objects.delete()
return response_success("删除API环境成功")
def put(self, request, *args, **kwargs):
"""
代表创建API环境
:param request:
:param args:
:param kwargs:
:return:
"""
body = request.body
if not body:
return response_success()
data = json.loads(body)
form = ApiEnvironmentForm(data)
if form.is_valid():
api_environment = APIEnvironment.objects.create(**form.cleaned_data)
api_environment.save()
if APIGlobalVariable.objects.filter(api_environment_id=api_environment.id).count() >= 0:
APIGlobalVariable.objects.delete()
for global_variable in data['global_variable_data']:
APIGlobalVariable.objects.create(
api_environment_id=api_environment.id,
api_global_variable_name=global_variable['api_global_variable_name'],
api_global_variable_value=global_variable['api_global_variable_value'],
api_global_variable_describe=global_variable['api_global_variable_describe'],
)
return response_success("创建成功")
else:
raise MyException(message="创建失败")
# -*- coding: utf-8 -*-
# @Time : 2022/7/15 17:01
# @Author : wangyinghao
# @Site :
# @File : api_interfaces_case_view.py
# @Software: PyCharm
from datetime import datetime
from django.views.generic import View
import json
import re
from django.forms import model_to_dict
from automated_main.utils.http_format import response_success, response_failed
from automated_main.exception.my_exception import MyException
from automated_main.models.api_automation.api_interfaces_case import ApiInterfacesCase, ApiInterfacesParameterData
from automated_main.form.api_interfaces_case import ApiInterfacesCaseForm
from automated_main.utils.api_utils import RegularMatch, InterfaceRequest, ExtractParameters
from automated_main.models.api_automation.api_environment import APIEnvironment
from automated_main.models.api_automation.api_database import APIDatabase
from automated_main.utils.handle_db import HandleDB
from automated_main.models.api_automation.api_interfaces_case import ApiInterfacesParameterExtraction
import logging
logger = logging.getLogger('django')
class ApiInterfacesCaseView(View):
def put(self, request, *args, **kwargs):
"""
创建接口测试用例
:param request:
:param args:
:param kwargs:
:return:
"""
body = request.body
if not body:
return response_success()
data = json.loads(body)
form = ApiInterfacesCaseForm(data)
if form.is_valid():
api_interfaces_case = ApiInterfacesCase.objects.create(**form.cleaned_data)
api_test_case_id = api_interfaces_case.id
for api_parameter_extraction in data['api_parameter_extraction']:
if api_parameter_extraction['api_key_variable'] == "":
pass
elif api_parameter_extraction['api_key_variable'] != "":
if ApiInterfacesParameterExtraction.objects.filter(
api_key_variable=api_parameter_extraction['api_key_variable']).count() > 0:
api_test_case_id.delete()
return response_failed("30000", "变量名称重复,请重新填写")
else:
ApiInterfacesParameterExtraction.objects.create(api_test_case_id=api_test_case_id,
api_variable_results=api_parameter_extraction[
'api_variable_results'],
api_value_variable=api_parameter_extraction[
'api_value_variable'],
api_key_variable=api_parameter_extraction[
'api_key_variable'])
for api_parameter in data['api_parameter']:
ApiInterfacesParameterData.objects.create(api_test_case_id=api_test_case_id,
api_parameter_name=api_parameter['api_parameter_name'],
api_must_parameter=api_parameter['api_must_parameter'],
api_parameter_value=api_parameter['api_parameter_value'],
api_field_describe=api_parameter['api_field_describe'])
return response_success("创建API测试用例成功")
else:
raise MyException()
def get(self, request, api_interfaces_case_id, *args, **kwargs):
"""
获取单独接口测试用例
:param request:
:param args:
:param api_interfaces_case_id:
:param kwargs:
:return:
"""
api_test_case = ApiInterfacesCase.objects.filter(id=api_interfaces_case_id).first()
api_parameter_extractions = ApiInterfacesParameterExtraction.objects.filter(
api_test_case_id=api_interfaces_case_id)
api_parameter_extraction_list = []
api_parameter_data_list = []
api_parameter_data = ApiInterfacesParameterData.objects.filter(api_test_case_id=api_interfaces_case_id)
if api_parameter_data.count() > 0:
for api_parameter in api_parameter_data:
api_parameter_dict = {
"api_must_parameter": api_parameter.api_must_parameter,
"api_parameter_name": api_parameter.api_parameter_name,
"api_parameter_value": api_parameter.api_parameter_value,
"api_field_describe": api_parameter.api_field_describe
}
api_parameter_data_list.append(api_parameter_dict)
if api_parameter_extractions.count() > 0:
for api_parameter_extraction in api_parameter_extractions:
api_parameter_extraction_dict = {
"api_variable_results": api_parameter_extraction.api_variable_results,
"api_value_variable": api_parameter_extraction.api_value_variable,
"api_key_variable": api_parameter_extraction.api_key_variable
}
api_parameter_extraction_list.append(api_parameter_extraction_dict)
else:
api_parameter_extraction_dict = {
"api_variable_results": "",
"api_value_variable": "",
"api_key_variable": ""
}
api_parameter_extraction_list.append(api_parameter_extraction_dict)
if api_test_case is None:
return response_success()
else:
api_test_case_dict = model_to_dict(api_test_case)
api_test_case_dict['api_parameter_extraction'] = api_parameter_extraction_list
api_test_case_dict['api_parameter'] = api_parameter_data_list
return response_success(api_test_case_dict)
def post(self, request, api_interfaces_case_id, *args, **kwargs):
"""
编辑API测试用例
:param request:
:param args:
:param api_interfaces_case_id:
:param kwargs:
:return:
"""
api_test_case = ApiInterfacesCase.objects.filter(id=api_interfaces_case_id).first()
if api_test_case is None:
return response_success()
body = request.body
if not body:
return response_success()
data = json.loads(body)
form = ApiInterfacesCaseForm(data)
if form.is_valid():
if ApiInterfacesParameterExtraction.objects.filter(api_test_case_id=api_interfaces_case_id).count() > 0:
ApiInterfacesParameterExtraction.objects.delete()
for api_parameter_extraction in data['api_parameter_extraction']:
if api_parameter_extraction['api_key_variable'] == "":
pass
elif ApiInterfacesParameterExtraction.objects.filter(
api_key_variable=api_parameter_extraction['api_key_variable']).count() > 0:
return response_failed("30000", "变量名称重复,请重新填写")
else:
ApiInterfacesParameterExtraction.objects.create(api_test_case_id=api_interfaces_case_id,
api_variable_results=api_parameter_extraction[
'api_variable_results'],
api_value_variable=api_parameter_extraction[
'api_value_variable'],
api_key_variable=api_parameter_extraction[
'api_key_variable'])
else:
logger.info("自定义变量表中 无 该 case_id")
for api_parameter_extraction in data['api_parameter_extraction']:
if api_parameter_extraction['api_key_variable'] == "":
logger.info("api_key_variable是空")
pass
elif api_parameter_extraction['api_key_variable'] != "":
if ApiInterfacesParameterExtraction.objects.filter(
api_key_variable=api_parameter_extraction['api_key_variable']).count() > 0:
return response_failed("30000", "变量名称重复,请重新填写")
else:
ApiInterfacesParameterExtraction.objects.create(api_test_case_id=api_interfaces_case_id,
api_variable_results=
api_parameter_extraction[
'api_variable_results'],
api_value_variable=api_parameter_extraction[
'api_value_variable'],
api_key_variable=api_parameter_extraction[
'api_key_variable'])
if ApiInterfacesParameterData.objects.filter(api_test_case_id=api_interfaces_case_id).count() > 0:
ApiInterfacesParameterData.objects.delete()
for api_parameter in data['api_parameter']:
ApiInterfacesParameterData.objects.create(api_test_case_id=api_interfaces_case_id,
api_parameter_name=api_parameter['api_parameter_name'],
api_must_parameter=api_parameter['api_must_parameter'],
api_parameter_value=api_parameter['api_parameter_value'],
api_field_describe=api_parameter['api_field_describe'])
else:
for api_parameter in data['api_parameter']:
ApiInterfacesParameterData.objects.create(api_test_case_id=api_interfaces_case_id,
api_parameter_name=api_parameter['api_parameter_name'],
api_must_parameter=api_parameter['api_must_parameter'],
api_parameter_value=api_parameter['api_parameter_value'],
api_field_describe=api_parameter['api_field_describe'])
ApiInterfacesCase.objects.filter(id=api_interfaces_case_id).update(update_time=datetime.now())
ApiInterfacesCase.objects.filter(id=api_interfaces_case_id).update(**form.cleaned_data)
return response_success("修改API测试用例成功")
else:
raise MyException()
def delete(self, request, api_interfaces_case_id, *args, **kwargs):
"""
删除接口测试用例
:param request:
:param api_interfaces_case_id:
:param args:
:param kwargs:
:return:
"""
ApiInterfacesCase.objects.delete()
return response_success("删除API测试用例成功")
class ApiInterfacesCaseDeBugView(View):
def post(self, request, *args, **kwargs):
"""
API测试用例调试
:param request:
:param args:
:param kwargs:
:return:
"""
data_payload = ''
api_result = ''
body = request.body
if not body:
return response_success()
data = json.loads(body)
# noinspection PyBroadException
try:
api_environment = APIEnvironment.objects.get(id=data['api_environment_id'])
except Exception:
return response_failed("30000", "请选择域名")
# 请求地址
api_url = api_environment.api_title + data['api_url']
api_url = RegularMatch(api_url, data['api_environment_id'], regular="new")
logger.info("************************")
logger.info("请求地址: %s", api_url)
logger.info("************************")
if api_url[0] == "该变量数据库未找到":
return response_failed("4000", str(api_url[0] + api_url[1]))
api_headers = RegularMatch(data['api_headers'], data['api_environment_id'], regular="new")
if api_headers[0] == "该变量数据库未找到":
return response_failed("4000", str(api_headers[0] + api_headers[1]))
json_header = api_headers.replace("\'", "\"")
if data['api_headers'] == '':
header = data['api_headers']
else:
try:
header = json.loads(json_header)
except json.decoder.JSONDecodeError:
return response_failed("30000", "header类型错误" + json_header)
logger.info(json_header)
# Params参数
api_parameter = {}
for api_parameters in data['api_parameter']:
if api_parameters['api_must_parameter'] == "true":
api_parameter.update({api_parameters['api_parameter_name']: api_parameters['api_parameter_value']})
else:
pass
api_parameter_data = RegularMatch(str(api_parameter), data['api_environment_id'], regular="new")
if api_parameter_data[0] == "该变量数据库未找到":
return response_failed("4000", str(api_parameter_data[0] + api_parameter_data[1]))
data_parameter = api_parameter_data.replace("\'", "\"")
if data_parameter == '':
data_payload = api_parameter_data
logger.info(data_payload)
else:
try:
data_payload = json.loads(data_parameter)
logger.info(data_payload)
except json.decoder.JSONDecodeError:
return response_failed("30000", "参数类型错误" + data_payload)
api_parameter_body = RegularMatch(data['api_parameter_body'], data['api_environment_id'], regular="new")
if api_parameter_body[0] == "该变量数据库未找到":
return response_failed("4000", str(api_parameter_body[0] + api_parameter_body[1]))
json_par = api_parameter_body.replace("\'", "\"")
if api_parameter_body == '':
payload = api_parameter_body
logger.info(payload)
else:
try:
payload = json.loads(json_par)
logger.info(payload)
except json.decoder.JSONDecodeError:
return response_failed("30000", "参数类型错误" + json_par)
r = InterfaceRequest(str(data['api_method']), str(data['api_parameter_types']), api_url, header, payload,
data_payload)
logger.info("测试用例名称: %s", data['api_interfaces_case_name'])
logger.info("请求地址: %s", r.url)
logger.info("请求json数据: %s", payload)
logger.info("请求parameter数据: %s", data_payload)
logger.info("请求头: %s", r.headers)
logger.info("请求响应: %s", r.text)
if r == "请求超时":
return response_failed("30000", "接口超时", {'api_url': api_url,
"api_method": int(data['api_method']),
'api_header': header,
'api_body': payload,
'api_code': "请求超时",
'api_assert': "请求超时,断言失败",
'response_message': {"results": "请求超时"},
'response_time': 0,
'api_parameter_data': api_parameter})
# r.encoding = r.apparent_encoding
try:
response_message = r.json()
except json.decoder.JSONDecodeError:
return response_failed("30000", "接口返回结果类型错误非JSON格式", {'api_url': api_url,
"api_method": int(data['api_method']),
'api_header': header,
'api_body': payload,
'api_code': str(r.status_code),
'response_message': r.text,
'response_time': str(r.elapsed.total_seconds()),
'api_parameter_data': api_parameter})
# 断言
if data['api_assert_type'] == '':
result = "断言内容为空"
elif data['api_assert_type'] == 1:
try:
r_text = re.sub(r'(\\u[\s\S]{4})', lambda x: x.group(1).encode("utf-8").decode("unicode-escape"), r.text)
r_text_test = "".join(r_text.split())
r_text_data = "".join(data['api_assert_text'].split())
assert r_text_data in r_text_test
result = "断言成功"
except Exception as e:
result = "断言失败: 响应内容中未包含 断言内容,断言内容: %s ,响应内容:%s" % (
data['api_assert_text'], r_text_test)
logger.error(result + str(e))
pass
elif data['api_assert_type'] == 2:
try:
r_text = re.sub(r'(\\u[\s\S]{4})', lambda x: x.group(1).encode("utf-8").decode("unicode-escape"), r.text)
r_text_test = "".join(r_text.split())
r_text_data = "".join(data['api_assert_text'].split())
assert r_text_data == r_text_test
result = "断言成功"
except Exception as e:
result = "断言失败: 断言内容 %s 与响应结果不匹配,%s" % (
data['api_assert_text'], r_text_test)
logger.error(result + str(e))
pass
elif data['api_assert_type'] == 3:
database = APIDatabase.objects.get(id=data['dataBase'])
db = HandleDB(database.api_host, int(database.api_port), database.user, database.password,
database.database)
sql = data['database_sql']
count = db.get_count(sql)
db.close()
if str(count) == str(data['api_assert_text']):
result = '断言成功'
else:
result = '断言失败,查询的数据是: %s' % str(count)
elif data['api_assert_type'] == 4:
try:
assert data['api_assert_text'] == str(r.status_code)
result = "断言成功"
except Exception as e:
result = "断言失败: code状态码 %s 与响应状态码不匹配,%s" % (data['api_assert_text'], str(r.status_code))
logger.error(result + str(e))
pass
api_variable_results = []
for api_parameter_extraction in data['api_parameter_extraction']:
if api_parameter_extraction['api_value_variable'] == "":
api_parameter_extraction_dict = {
"api_variable_results": "无参数提取",
"api_value_variable": api_parameter_extraction['api_value_variable'],
"api_key_variable": api_parameter_extraction['api_key_variable'],
}
api_variable_results.append(api_parameter_extraction_dict)
elif ApiInterfacesParameterExtraction.objects.filter(
api_key_variable=api_parameter_extraction['api_key_variable']).count() >= 0:
# 提取变量
try:
api_result = json.loads(r.text)
except Exception as e:
logger.info('json转换失败(变量):' + str(e))
pass
v_text = api_parameter_extraction['api_value_variable'].split(".")
logger.info('这是提取' + str(v_text))
try:
api_result = ExtractParameters(v_text, api_result)
api_parameter_extraction_dict = {
"api_value_variable": api_parameter_extraction['api_value_variable'],
"api_key_variable": api_parameter_extraction['api_key_variable'],
"api_variable_results": str(api_result)
}
api_variable_results.append(api_parameter_extraction_dict)
except Exception as e:
logger.info('参数提取异常' + str(e))
api_parameter_extraction_dict = {
"api_variable_results": api_parameter_extraction['api_key_variable'],
"api_value_variable": api_parameter_extraction['api_value_variable'],
"api_key_variable": api_parameter_extraction['api_key_variable'],
}
api_variable_results.append(api_parameter_extraction_dict)
logger.info(api_variable_results)
return response_success({'api_url': api_url,
'api_header': header,
'api_body': payload,
'api_code': str(r.status_code),
'api_assert': str(result),
"api_method": int(data['api_method']),
'response_message': response_message,
'response_time': str(r.elapsed.total_seconds()),
'api_parameter_extraction': api_variable_results,
'api_parameter_data': api_parameter})
\ No newline at end of file
# -*- coding: utf-8 -*-
# @Time : 2022/5/24 21:30
# @Author : wangyinghao
# @Site :
# @File : api_management_detail_view.py
# @Software: PyCharm
from django.views.generic import View
import json
from django.forms import model_to_dict
from automated_main.utils.http_format import response_success, response_failed
from automated_main.exception.my_exception import MyException
from automated_main.form.api_management import ApiManagementForm
from automated_main.models.api_automation.api_management import ApiManagement
from datetime import datetime
class ApiManagementView(View):
def get(self, request, api_management_id, *args, **kwargs):
"""
代表获取单个服务
:param request:
:param api_management_id:
:param args:
:param kwargs:
:return:
"""
try:
api_management = ApiManagement.objects.get(id=api_management_id)
except ApiManagement.DoesNotExist:
return response_failed()
else:
return response_success(model_to_dict(api_management))
def post(self, request, api_management_id, *args, **kwargs):
"""
更新单个服务
:param request:
:param api_management_id:
:param args:
:param kwargs:
:return:
"""
body = request.body
params = json.loads(body)
form = ApiManagementForm(params)
result = form.is_valid()
if result:
api_management_data = ApiManagement.objects.get(id=api_management_id)
api_management_data.name = params['name']
api_management_data.description = params['description']
api_management_data.parent = params['parent']
api_management_data.update_time = datetime.now()
api_management_data.save()
else:
print(form.errors.as_json())
raise MyException()
return response_success()
def delete(self, request, api_management_id, *args, **kwargs):
ApiManagement.objects.delete()
ApiManagement.objects.delete()
return response_success()
# -*- coding: utf-8 -*-
# @Time : 2021/3/3 11:02
# @Author : wangyinghao
# @FileName: api_module_view.py
# @Software: PyCharm
from django.views.generic import View
import json
from django.forms import model_to_dict
from automated_main.utils.http_format import response_success
from automated_main.exception.my_exception import MyException
from automated_main.models.api_automation.api_module import APIModule
from django.core.paginator import Paginator
import arrow
from automated_main.form.api_module import ApiModuleForm
class ApiModuleView(View):
def get(self, request, api_module_id, *args, **kwargs):
"""
代表获取单个API模块
:param request:
:param api_module_id: API模块ID
:param args:
:param kwargs:
:return:
"""
api_module = APIModule.objects.filter(id=api_module_id).first()
if api_module is None:
return response_success()
else:
return response_success(model_to_dict(api_module))
def post(self, request, api_module_id, *args, **kwargs):
"""
代表更改页面
:param request:
:param api_module_id: API模块ID
:param args:
:param kwargs:
:return:
"""
api_module = APIModule.objects.filter(id=api_module_id).first()
if api_module is None:
return response_success()
body = request.body
if not body:
return response_success()
data = json.loads(body)
form = ApiModuleForm(data)
if form.is_valid():
APIModule.objects.filter(id=api_module_id).update(**form.cleaned_data)
return response_success("编辑API模块成功")
else:
raise MyException()
def delete(self, request, api_module_id, *args, **kwargs):
"""
代表删除单独模块
:param request:
:param api_module_id: API模块ID
:param args:
:param kwargs:
:return:
"""
APIModule.objects.delete()
return response_success("删除模块成功")
def put(self, request, *args, **kwargs):
"""
代表创建模块
:param request:
:param args:
:param kwargs:
:return:
"""
body = request.body
if not body:
return response_success()
data = json.loads(body)
form = ApiModuleForm(data)
if form.is_valid():
APIModule.objects.create(**form.cleaned_data)
return response_success("创建成功")
else:
raise MyException(message="创建失败")
class ApiProjectModuleView(View):
def get(self, request, api_project_id, size_page, page, *args, **kwargs):
"""
获取 单个API项目中包含得所有模块
:param page: 页数
:param size_page: 展示条数
:param request:
:param api_project_id: api项目id
:param args:
:param kwargs:
:return:
"""
api_module = APIModule.objects.filter(api_project_id=api_project_id)
api_module_list = []
for api_modules in api_module:
api_module_dict = {
"id": api_modules.id,
"api_project_name": api_modules.api_project.api_project_name,
"api_module_name": api_modules.api_module_name,
"api_module_describe": api_modules.api_module_describe,
"updata_time": arrow.get(str(api_modules.updata_time)).format('YYYY-MM-DD HH:mm:ss'),
"create_time": arrow.get(str(api_modules.create_time)).format('YYYY-MM-DD HH:mm:ss'),
}
api_module_list.append(api_module_dict)
p = Paginator(api_module_list, size_page)
if int(page) > int(p.num_pages):
page1 = p.page(p.num_pages)
current_page = page1.object_list
else:
page1 = p.page(page)
current_page = page1.object_list
total_module = len(api_module_list)
if api_module is None:
return response_success()
else:
return response_success({'status': 200, 'data': current_page, 'total_module': total_module})
# -*- coding: utf-8 -*-
# @Time : 2022/7/20 10:30
# @Author : wangyinghao
# @Site :
# @File : api_scenarios_case_view.py
# @Software: PyCharm
from django.views.generic import View
from automated_main.utils.http_format import response_success, response_failed
from automated_main.exception.my_exception import MyException
from automated_main.models.api_automation.api_database import APIDatabase
from automated_main.models.api_automation.api_environment import APIEnvironment
from automated_main.utils.handle_db import HandleDB
from automated_main.utils.api_utils import RegularMatch, InterfaceRequest, ExtractParameters
from automated_main.models.api_automation.api_scenarios_case import ApiScenariosCaseAssociated, ApiScenariosCase
from automated_main.models.api_automation.api_interfaces_case import ApiInterfacesCase, \
ApiInterfacesParameterExtraction, ApiInterfacesParameterData
from automated_main.form.api_scenarios_case import ApiScenariosCaseForm
from automated_main.models.api_automation.api_management import ApiManagement
from datetime import datetime
import arrow
import ast
import json
import re
import logging
logger = logging.getLogger('django')
class ApiScenariosCaseView(View):
def get(self, request, api_management_id, *args, **kwargs):
"""
获取场景用例列表
:param request:
:param args:
:param api_management_id:
:param kwargs:
:return:
"""
api_scenarios_case_data = ApiScenariosCase.objects.filter(api_management_id=api_management_id)
api_scenarios_case_data_list = []
for api_scenarios_case in api_scenarios_case_data:
api_scenarios_case_data_dict = {
"id": api_scenarios_case.id,
"api_scenarios_case_name": api_scenarios_case.api_scenarios_case_name,
"api_scenarios_case_describe": api_scenarios_case.api_scenarios_case_describe,
"update_time": arrow.get(str(api_scenarios_case.update_time)).format('YYYY-MM-DD HH:mm:ss'),
"create_time": arrow.get(str(api_scenarios_case.create_time)).format('YYYY-MM-DD HH:mm:ss')
}
api_scenarios_case_data_list.append(api_scenarios_case_data_dict)
return response_success(api_scenarios_case_data_list)
def delete(self, request, scenarios_case_id, *args, **kwargs):
"""
删除场景测试用例
:param request:
:param args:
:param scenarios_case_id:
:param kwargs:
:return:
"""
ApiScenariosCase.objects.delete()
return response_success("删除API测试用例成功")
def put(self, request, *args, **kwargs):
"""
创建API场景测试
:param request:
:param args:
:param kwargs:
:return:
"""
body = request.body
if not body:
return response_success()
data = json.loads(body)
form = ApiScenariosCaseForm(data)
if form.is_valid():
api_scenarios_case = ApiScenariosCase.objects.create(**form.cleaned_data)
api_scenarios_case_id = api_scenarios_case.id
for i in data["api_scenarios_case_data"]:
api_interfaces_case_id = (i['api_interfaces_case_id'])
case_steps = (i['case_steps'])
ApiScenariosCaseAssociated.objects.create(sid_id=api_scenarios_case_id,
interfaces_case_id=api_interfaces_case_id,
case_steps=case_steps
)
return response_success("创建API业务测试用例成功")
else:
raise MyException()
def post(self, request, scenarios_case_id, *args, **kwargs):
"""
编辑API场景测试
:param request:
:param args:
:param scenarios_case_id:
:param kwargs:
:return:
"""
api_scenarios_case = ApiScenariosCase.objects.get(id=scenarios_case_id)
if api_scenarios_case is None:
return response_success()
body = request.body
if not body:
return response_success()
data = json.loads(body)
form = ApiScenariosCaseForm(data)
api_scenarios_case.api_scenarios_case_name = data['api_scenarios_case_name']
api_scenarios_case.api_scenarios_case_describe = data['api_scenarios_case_describe']
api_scenarios_case.api_management_id = data['api_management_id']
api_scenarios_case.update_time = datetime.now()
api_scenarios_case.save()
ApiScenariosCaseAssociated.objects.delete()
if form.is_valid():
for i in data["api_scenarios_case_data"]:
api_interfaces_case_id = (i['api_interfaces_case_id'])
case_steps = (i['case_steps'])
ApiScenariosCaseAssociated.objects.create(sid_id=api_scenarios_case.id,
interfaces_case_id=api_interfaces_case_id,
case_steps=case_steps)
return response_success("编辑API场景测试用例成功")
else:
raise MyException()
class ApiScenariosCaseGetView(View):
def get(self, request, scenarios_case_id, *args, **kwargs):
"""
获取场景用例-组合
:param request:
:param args:
:param scenarios_case_id:
:param kwargs:
:return:
"""
api_scenarios_case_associated_data = ApiScenariosCaseAssociated.objects.filter(sid_id=scenarios_case_id)
api_scenarios_case_associated_data_list = []
for api_scenarios_case_associated in api_scenarios_case_associated_data:
api_case = ApiInterfacesCase.objects.get(id=api_scenarios_case_associated.interfaces_case_id)
interfaces_name = ApiManagement.objects.get(id=api_case.api_management_id)
api_module_name = ApiManagement.objects.get(id=interfaces_name.parent)
api_service_name = ApiManagement.objects.get(id=api_module_name.parent)
api_scenarios_case_associated_dict = {
"case_steps": api_scenarios_case_associated.case_steps,
"api_scenarios_case_name": api_scenarios_case_associated.interfaces_case.api_interfaces_case_name,
"api_interfaces_case_id": api_scenarios_case_associated.interfaces_case.id,
"interfaces_name": interfaces_name.name,
"api_module_name": api_module_name.name,
"api_service_name": api_service_name.name,
}
api_scenarios_case_associated_data_list.append(api_scenarios_case_associated_dict)
return response_success(api_scenarios_case_associated_data_list)
class ApiScenariosCaseDebugView(View):
def post(self, request, *args, **kwargs):
"""
API场景测试DEBUG
:param request:
:param args:
:param kwargs:
:return:
"""
body = request.body
if not body:
return response_success()
data = json.loads(body)
for case in data['api_scenarios_case_data']:
interfaces_case = ApiInterfacesCase.objects.get(id=case['api_interfaces_case_id'])
api_environment = APIEnvironment.objects.get(id=data['environment'])
# 请求地址
api_url = api_environment.api_title + interfaces_case.api_url
api_url = RegularMatch(api_url, data['environment'], regular="new")
if api_url[0] == "该变量数据库未找到":
return response_failed("4000", str(api_url[0] + api_url[1]))
api_headers = RegularMatch(interfaces_case.api_headers, data['environment'], regular="new")
if api_headers[0] == "该变量数据库未找到":
return response_failed("4000", str(api_headers[0] + api_headers[1]))
json_header = api_headers.replace("\'", "\"")
if interfaces_case.api_headers == '':
header = interfaces_case.api_headers
else:
try:
header = json.loads(json_header)
except json.decoder.JSONDecodeError:
return response_failed('30000', "header类型错误,用例名称: " + str(interfaces_case.api_interfaces_case_name))
logger.info(json_header)
api_parameter_body = RegularMatch(interfaces_case.api_parameter_body, data['environment'], regular="new")
if api_parameter_body[0] == "该变量数据库未找到":
return response_failed('40000', "该变量数据库未找到,用例名称: " + str(
interfaces_case.api_interfaces_case_name) + "\n" + "变量值:" +
str(api_parameter_body[0] + api_parameter_body[1]))
json_par = api_parameter_body.replace("\'", "\"")
if api_parameter_body == '':
payload = api_parameter_body
logger.info(payload)
else:
try:
payload = json.loads(json_par)
logger.info(payload)
except json.decoder.JSONDecodeError:
return response_failed('30000', "参数类型错误,用例名称: " + str(interfaces_case.api_interfaces_case_name))
parameter_data = ApiInterfacesParameterData.objects.filter(api_test_case_id=case['api_interfaces_case_id'])
api_parameter_dict = {}
for api_parameters in parameter_data:
if api_parameters.api_must_parameter == "true":
api_parameter_dict.update(
{api_parameters.api_parameter_name: api_parameters.api_parameter_value})
else:
pass
if api_parameter_dict == {}:
api_parameter_data = {}
else:
api_parameter_data = RegularMatch(str(api_parameter_dict), data['environment'], regular="new")
if api_parameter_data == {}:
data_payload = api_parameter_data
logger.info(data_payload)
else:
try:
data_payload = ast.literal_eval(api_parameter_data)
except json.decoder.JSONDecodeError:
return response_failed("30000", "Parameter参数类型错误" + api_parameter_data)
r = InterfaceRequest(str(interfaces_case.api_method), str(interfaces_case.api_parameter_types),
api_url, header, payload, data_payload)
logger.info("请求地址: %s", r.url)
logger.info("请求json数据: %s", payload)
logger.info("请求parameter数据: %s", data_payload)
logger.info("请求头: %s", r.headers)
logger.info("请求响应: %s", r.text)
if r == "请求超时":
return response_failed('30000', "接口请求超时:" + str(interfaces_case.api_interfaces_case_name))
try:
response_message = r.json()
except json.decoder.JSONDecodeError:
return response_failed('30000', "接口返回结果类型错误非JSON格式:" + str(
interfaces_case.api_interfaces_case_name) + "\n" + str(r.text))
# 断言
if interfaces_case.api_assert_type == '':
result = "断言内容为空"
logger.error(result)
elif interfaces_case.api_assert_type == 1:
try:
r_text = re.sub(r'(\\u[\s\S]{4})', lambda x: x.group(1).encode("utf-8").decode("unicode-escape"),
r.text)
r_text_test = "".join(r_text.split())
r_text_data = "".join(interfaces_case.api_assert_text.split())
assert r_text_data in r_text_test
except Exception as e:
result = "断言失败: 响应内容中未包含 断言内容,断言内容: %s ,响应内容:%s" % (
interfaces_case.api_assert_text, r_text)
logger.error(result + str(e))
return response_failed('30000',
"断言失败,用例名称: " + str(interfaces_case.api_interfaces_case_name) + "\n" +
"断言结果:" + result)
elif interfaces_case.api_assert_type == 2:
try:
r_text = re.sub(r'(\\u[\s\S]{4})', lambda x: x.group(1).encode("utf-8").decode("unicode-escape"),
r.text)
r_text_test = "".join(r_text.split())
r_text_data = "".join(interfaces_case.api_assert_text.split())
assert r_text_data == r_text_test
except Exception as e:
result = "断言失败: 断言内容 %s 与响应结果不匹配,%s" % (
interfaces_case.api_assert_text, r_text)
logger.error(result, e)
return response_failed('30000',
"断言失败,用例名称: " + str(interfaces_case.api_interfaces_case_name) + "\n" +
"断言结果:" + result)
elif interfaces_case.api_assert_type == 3:
database = APIDatabase.objects.get(id=interfaces_case.dataBase_id)
db = HandleDB(database.api_host, int(database.api_port), database.user, database.password,
database.database)
sql = data['database_sql']
count = db.get_count(sql)
db.close()
if str(count) == str(interfaces_case.api_assert_text):
result = '断言成功'
logger.error(result)
else:
result = '断言失败,查询的数据是: %s' % str(count)
logger.error(result)
return response_failed('30000',
"断言失败,用例名称: " + str(interfaces_case.api_interfaces_case_name) + "\n" +
"断言结果:" + result)
elif interfaces_case.api_assert_type == 4:
try:
assert interfaces_case.api_assert_text == str(r.status_code)
result = "断言成功"
except Exception as e:
result = "断言失败: code状态码 %s 与响应状态码不匹配,%s" % (interfaces_case.api_assert_text, str(r.status_code))
logger.error(result + str(e))
return response_failed('30000',
"断言失败,用例名称: " + str(interfaces_case.api_interfaces_case_name) + "\n" +
"断言结果:" + result)
parameter_extraction = ApiInterfacesParameterExtraction.objects.filter(
api_test_case_id=case['api_interfaces_case_id'])
if parameter_extraction == "" or None:
pass
else:
# 提取变量
try:
api_result = json.loads(r.text)
except Exception as e:
logger.info('json转换失败(变量):' + str(e))
pass
for parameter_extractions in parameter_extraction:
v_text = parameter_extractions.api_value_variable.split(".")
try:
api_results = ExtractParameters(v_text, api_result)
api_parameter_extraction = ApiInterfacesParameterExtraction.objects.get(
api_key_variable=str(parameter_extractions.api_key_variable))
api_parameter_extraction.api_variable_results = str(api_results)
api_parameter_extraction.save()
except Exception as e:
logger.info('参数提取失败' + str(e))
return response_failed('30000', "参数提取失败,用例名称:" +
str(interfaces_case.api_interfaces_case_name) + '\n' + "异常信息:" + str(e))
return response_success("调试成功")
# -*- coding: utf-8 -*-
# @Time : 2021/4/1 10:50
# @Author : wangyinghao
# @FileName: api_test_case_view.py
# @Software: PyCharm
from django.views.generic import View
import json
import re
from django.forms import model_to_dict
from automated_main.utils.http_format import response_success, response_failed
from automated_main.exception.my_exception import MyException
from automated_main.models.api_automation.api_test_case import ApiTestCase, ApiParameterExtraction, ApiParameterData
from automated_main.form.api_test_case import ApiTestCaseForm
from automated_main.models.api_automation.api_environment import APIEnvironment
from automated_main.models.api_automation.api_project import APIProject
from automated_main.models.api_automation.api_module import APIModule
from automated_main.models.api_automation.api_database import APIDatabase
from automated_main.utils.handle_db import HandleDB
from automated_main.utils.api_utils import RegularMatch, InterfaceRequest, ExtractParameters
from AutomatedTestPlatform import settings
from automated_main.utils.api_test_case_excel import HandleExcel
from django.http import FileResponse
import time
import random
import os
import logging
logger = logging.getLogger('django')
class ApiTestCaseView(View):
def delete(self, request, api_test_case_id, *args, **kwargs):
"""
代表删除API测试用例
:param request:
:param api_test_case_id:
:param args:
:param kwargs:
:return:
"""
ApiTestCase.objects.delete()
return response_success("删除API测试用例成功")
def put(self, request, *args, **kwargs):
"""
创建API测试用例
:param request:
:param args:
:param kwargs:
:return:
"""
body = request.body
if not body:
return response_success()
data = json.loads(body)
form = ApiTestCaseForm(data)
if form.is_valid():
api_test_case = ApiTestCase.objects
api_test_case_id = api_test_case.id
for api_parameter_extraction in data['api_parameter_extraction']:
if api_parameter_extraction['api_key_variable'] == "":
pass
elif api_parameter_extraction['api_key_variable'] != "":
if ApiParameterExtraction.objects.filter(
api_key_variable=api_parameter_extraction['api_key_variable']).count() > 0:
api_test_case.delete()
return response_failed("30000", "变量名称重复,请重新填写")
else:
ApiParameterExtraction.objects.create(api_test_case_id=api_test_case_id,
api_variable_results=api_parameter_extraction[
'api_variable_results'],
api_value_variable=api_parameter_extraction[
'api_value_variable'],
api_key_variable=api_parameter_extraction[
'api_key_variable'])
for api_parameter in data['api_parameter']:
ApiParameterData.objects.create(api_test_case_id=api_test_case_id,
api_parameter_name=api_parameter['api_parameter_name'],
api_must_parameter=api_parameter['api_must_parameter'],
api_parameter_value=api_parameter['api_parameter_value'],
api_field_describe=api_parameter['api_field_describe'])
return response_success("创建API测试用例成功")
else:
raise MyException()
def post(self, request, api_test_case_id, *args, **kwargs):
"""
编辑API测试用例
:param request:
:param args:
:param api_test_case_id:
:param kwargs:
:return:
"""
api_test_case = ApiTestCase.objects.filter(id=api_test_case_id).first()
if api_test_case is None:
return response_success()
body = request.body
if not body:
return response_success()
data = json.loads(body)
form = ApiTestCaseForm(data)
if form.is_valid():
if ApiParameterExtraction.objects.filter(api_test_case_id=api_test_case_id).count() > 0:
ApiParameterExtraction.objects.delete()
for api_parameter_extraction in data['api_parameter_extraction']:
if api_parameter_extraction['api_key_variable'] == "":
pass
elif ApiParameterExtraction.objects.filter(
api_key_variable=api_parameter_extraction['api_key_variable']).count() > 0:
return response_failed("30000", "变量名称重复,请重新填写")
else:
ApiParameterExtraction.objects.create(api_test_case_id=api_test_case_id,
api_variable_results=api_parameter_extraction[
'api_variable_results'],
api_value_variable=api_parameter_extraction[
'api_value_variable'],
api_key_variable=api_parameter_extraction[
'api_key_variable'])
else:
logger.info("自定义变量表中 无 该 case_id")
for api_parameter_extraction in data['api_parameter_extraction']:
if api_parameter_extraction['api_key_variable'] == "":
logger.info("api_key_variable是空")
pass
elif api_parameter_extraction['api_key_variable'] != "":
if ApiParameterExtraction.objects.filter(
api_key_variable=api_parameter_extraction['api_key_variable']).count() > 0:
return response_failed("30000", "变量名称重复,请重新填写")
else:
ApiParameterExtraction.objects.create(api_test_case_id=api_test_case_id,
api_variable_results=api_parameter_extraction[
'api_variable_results'],
api_value_variable=api_parameter_extraction[
'api_value_variable'],
api_key_variable=api_parameter_extraction[
'api_key_variable'])
if ApiParameterData.objects.filter(api_test_case_id=api_test_case_id).count() > 0:
ApiParameterData.objects.delete()
for api_parameter in data['api_parameter']:
ApiParameterData.objects.create(api_test_case_id=api_test_case_id,
api_parameter_name=api_parameter['api_parameter_name'],
api_must_parameter=api_parameter['api_must_parameter'],
api_parameter_value=api_parameter['api_parameter_value'],
api_field_describe=api_parameter['api_field_describe'])
else:
for api_parameter in data['api_parameter']:
ApiParameterData.objects.create(api_test_case_id=api_test_case_id,
api_parameter_name=api_parameter['api_parameter_name'],
api_must_parameter=api_parameter['api_must_parameter'],
api_parameter_value=api_parameter['api_parameter_value'],
api_field_describe=api_parameter['api_field_describe'])
ApiTestCase.objects.filter(id=api_test_case_id).update(**form.cleaned_data)
return response_success("修改API测试用例成功")
else:
raise MyException()
def get(self, request, api_test_case_id, *args, **kwargs):
"""
获取单独API测试用例
:param request:
:param args:
:param api_test_case_id:
:param kwargs:
:return:
"""
api_test_case = ApiTestCase.objects.filter(id=api_test_case_id).first()
api_parameter_extractions = ApiParameterExtraction.objects.filter(api_test_case_id=api_test_case_id)
api_parameter_extraction_list = []
api_parameter_data_list = []
api_parameter_data = ApiParameterData.objects.filter(api_test_case_id=api_test_case_id)
if api_parameter_data.count() > 0:
for api_parameter in api_parameter_data:
api_parameter_dict = {
"api_must_parameter": api_parameter.api_must_parameter,
"api_parameter_name": api_parameter.api_parameter_name,
"api_parameter_value": api_parameter.api_parameter_value,
"api_field_describe": api_parameter.api_field_describe
}
api_parameter_data_list.append(api_parameter_dict)
if api_parameter_extractions.count() > 0:
for api_parameter_extraction in api_parameter_extractions:
api_parameter_extraction_dict = {
"api_variable_results": api_parameter_extraction.api_variable_results,
"api_value_variable": api_parameter_extraction.api_value_variable,
"api_key_variable": api_parameter_extraction.api_key_variable
}
api_parameter_extraction_list.append(api_parameter_extraction_dict)
else:
api_parameter_extraction_dict = {
"api_variable_results": "",
"api_value_variable": "",
"api_key_variable": ""
}
api_parameter_extraction_list.append(api_parameter_extraction_dict)
if api_test_case is None:
return response_success()
else:
api_test_case_dict = model_to_dict(api_test_case)
api_test_case_dict['api_parameter_extraction'] = api_parameter_extraction_list
api_test_case_dict['api_parameter'] = api_parameter_data_list
return response_success(api_test_case_dict)
class ApiTestCaseDeBugView(View):
def post(self, request, *args, **kwargs):
"""
API测试用例调试
:param request:
:param args:
:param kwargs:
:return:
"""
data_payload = ''
api_result = ''
body = request.body
if not body:
return response_success()
data = json.loads(body)
# noinspection PyBroadException
try:
api_environment = APIEnvironment.objects.get(id=data['api_environment_id'])
except Exception:
return response_failed("30000", "请选择域名")
# 请求地址
api_url = api_environment.api_title + data['api_url']
api_url = RegularMatch(api_url, data['api_environment_id'])
if api_url[0] == "该变量数据库未找到":
return response_failed("4000", str(api_url[0] + api_url[1]))
api_headers = RegularMatch(data['api_headers'], data['api_environment_id'])
if api_headers[0] == "该变量数据库未找到":
return response_failed("4000", str(api_headers[0] + api_headers[1]))
json_header = api_headers.replace("\'", "\"")
if data['api_headers'] == '':
header = data['api_headers']
else:
try:
header = json.loads(json_header)
except json.decoder.JSONDecodeError:
return response_failed("30000", "header类型错误" + json_header)
logger.info(json_header)
# Params参数
api_parameter = {}
for api_parameters in data['api_parameter']:
if api_parameters['api_must_parameter'] == "true":
api_parameter.update({api_parameters['api_parameter_name']: api_parameters['api_parameter_value']})
else:
pass
api_parameter_data = RegularMatch(str(api_parameter), data['api_environment_id'])
if api_parameter_data[0] == "该变量数据库未找到":
return response_failed("4000", str(api_parameter_data[0] + api_parameter_data[1]))
data_parameter = api_parameter_data.replace("\'", "\"")
if data_parameter == '':
data_payload = api_parameter_data
logger.info(data_payload)
else:
try:
data_payload = json.loads(data_parameter)
logger.info(data_payload)
except json.decoder.JSONDecodeError:
return response_failed("30000", "参数类型错误" + data_payload)
api_parameter_body = RegularMatch(data['api_parameter_body'], data['api_environment_id'])
if api_parameter_body[0] == "该变量数据库未找到":
return response_failed("4000", str(api_parameter_body[0] + api_parameter_body[1]))
json_par = api_parameter_body.replace("\'", "\"")
if api_parameter_body == '':
payload = api_parameter_body
logger.info(payload)
else:
try:
payload = json.loads(json_par)
logger.info(payload)
except json.decoder.JSONDecodeError:
return response_failed("30000", "参数类型错误" + json_par)
r = InterfaceRequest(str(data['api_method']), str(data['api_parameter_types']), api_url, header, payload,
data_payload)
logger.info("测试用例名称: %s", data['api_test_case_name'])
logger.info("请求地址: %s", r.url)
logger.info("请求json数据: %s", payload)
logger.info("请求parameter数据: %s", data_payload)
logger.info("请求头: %s", r.headers)
logger.info("请求响应: %s", r.text)
logger.info(r.text)
if r == "请求超时":
return response_failed("30000", "接口超时", {'api_url': api_url,
"api_method": int(data['api_method']),
'api_header': header,
'api_body': payload,
'api_code': "请求超时",
'api_assert': "请求超时,断言失败",
'response_message': {"results": "请求超时"},
'response_time': 0,
'api_parameter_data': api_parameter})
# r.encoding = r.apparent_encoding
try:
response_message = r.json()
except json.decoder.JSONDecodeError:
return response_failed("30000", "接口返回结果类型错误非JSON格式", {'api_url': api_url,
"api_method": int(data['api_method']),
'api_header': header,
'api_body': payload,
'api_code': str(r.status_code),
'response_message': r.text,
'response_time': str(r.elapsed.total_seconds()),
'api_parameter_data': api_parameter})
# 断言
if data['api_assert_type'] == '':
result = "断言内容为空"
elif data['api_assert_type'] == 1:
try:
r_text = re.sub(r'(\\u[\s\S]{4})', lambda x: x.group(1).encode("utf-8").decode("unicode-escape"), r.text)
r_text_test = "".join(r_text.split())
r_text_data = "".join(data['api_assert_text'].split())
assert r_text_data in r_text_test
result = "断言成功"
except Exception as e:
result = "断言失败: 响应内容中未包含 断言内容,断言内容: %s ,响应内容:%s" % (
data['api_assert_text'], r_text_test)
logger.error(result + str(e))
pass
elif data['api_assert_type'] == 2:
try:
r_text = re.sub(r'(\\u[\s\S]{4})', lambda x: x.group(1).encode("utf-8").decode("unicode-escape"), r.text)
r_text_test = "".join(r_text.split())
r_text_data = "".join(data['api_assert_text'].split())
assert r_text_data == r_text_test
result = "断言成功"
except Exception as e:
result = "断言失败: 断言内容 %s 与响应结果不匹配,%s" % (
data['api_assert_text'], r_text_test)
logger.error(result + str(e))
pass
elif data['api_assert_type'] == 3:
database = APIDatabase.objects.get(id=data['dataBase'])
db = HandleDB(database.api_host, int(database.api_port), database.user, database.password,
database.database)
sql = data['database_sql']
count = db.get_count(sql)
db.close()
if str(count) == str(data['api_assert_text']):
result = '断言成功'
else:
result = '断言失败,查询的数据是: %s' % str(count)
elif data['api_assert_type'] == 4:
try:
assert data['api_assert_text'] == str(r.status_code)
result = "断言成功"
except Exception as e:
result = "断言失败: code状态码 %s 与响应状态码不匹配,%s" % (data['api_assert_text'], str(r.status_code))
logger.error(result + str(e))
pass
api_variable_results = []
for api_parameter_extraction in data['api_parameter_extraction']:
if api_parameter_extraction['api_value_variable'] == "":
api_parameter_extraction_dict = {
"api_variable_results": "无参数提取",
"api_value_variable": api_parameter_extraction['api_value_variable'],
"api_key_variable": api_parameter_extraction['api_key_variable'],
}
api_variable_results.append(api_parameter_extraction_dict)
elif ApiParameterExtraction.objects.filter(
api_key_variable=api_parameter_extraction['api_key_variable']).count() >= 0:
# 提取变量
try:
api_result = json.loads(r.text)
except Exception as e:
logger.info('json转换失败(变量):' + str(e))
pass
v_text = api_parameter_extraction['api_value_variable'].split(".")
logger.info('这是提取' + str(v_text))
try:
api_result = ExtractParameters(v_text, api_result)
api_parameter_extraction_dict = {
"api_value_variable": api_parameter_extraction['api_value_variable'],
"api_key_variable": api_parameter_extraction['api_key_variable'],
"api_variable_results": str(api_result)
}
api_variable_results.append(api_parameter_extraction_dict)
except Exception as e:
logger.info('参数提取异常' + str(e))
api_parameter_extraction_dict = {
"api_variable_results": api_parameter_extraction['api_key_variable'],
"api_value_variable": api_parameter_extraction['api_value_variable'],
"api_key_variable": api_parameter_extraction['api_key_variable'],
}
api_variable_results.append(api_parameter_extraction_dict)
logger.info(api_variable_results)
return response_success({'api_url': api_url,
'api_header': header,
'api_body': payload,
'api_code': str(r.status_code),
'api_assert': str(result),
"api_method": int(data['api_method']),
'response_message': response_message,
'response_time': str(r.elapsed.total_seconds()),
'api_parameter_extraction': api_variable_results,
'api_parameter_data': api_parameter})
class UploadTestCases(View):
def post(self, request, *args, **kwargs):
"""
上传API测试用例-excel
"""
file_obj = request.FILES.get("file")
name = file_obj.name
fn = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
fn = fn + '-%d' % random.randint(0, 100)
path = os.path.join(settings.API_TEST_CASE_ROOT, fn + '-' + name)
with open(path, "wb") as f_write:
for line in file_obj:
f_write.write(line)
return response_success({"file": 'http://' + request.get_host() + "/api/api_test_case_excel/" + fn + '-' + name,
"result": "OK",
"fileName": name,
"path": path})
def put(self, request, *args, **kwargs):
"""
解析excel并创建测试用例
:api_excel_name: 接口excel名称
:param request:
:param args:
:param kwargs:
:return:
"""
upload_api_test_case_total = ""
api_assert_type = ""
api_parameter_types = ""
api_method = ""
body = request.body
if not body:
return response_success()
data = json.loads(body)
dir_test_case = settings.API_TEST_CASE_ROOT
dir_list = os.listdir(dir_test_case)
upload_results_list = []
upload_api_test_case_id = []
for api_test_case_excel in dir_list:
if api_test_case_excel == str(data['upload_case_address_name']):
api_excel = HandleExcel(dir_test_case + api_test_case_excel, 'Sheet1')
api_cases = api_excel.read_all_datas()
upload_api_test_case_total = len(api_cases)
for i in range(len(api_cases)):
time.sleep(0.5)
# 获取项目id
try:
api_project_id = APIProject.objects.filter(
api_project_name=str(api_cases[i]['项目名称']).replace("\n", ""))
except Exception as e:
return response_failed(3001, message="excel格式错误,请重新上传")
# 获取模块id
api_module_id = APIModule.objects.filter(api_project_id=api_project_id[0].id,
api_module_name=str(api_cases[i]['模块']).replace("\n", ""))
if ApiTestCase.objects.filter(api_test_case_name=str(api_cases[i]['测试用例名称']).replace("\n", ""),
api_module_id=api_module_id[0].id).count() > 0:
upload_results_dict = {
"test_case_name": str(api_cases[i]['测试用例名称']).replace("\n", ""),
"api_module_name": str(api_cases[i]['模块']).replace("\n", ""),
"api_url": str(api_cases[i]['请求地址']).replace("\n", ""),
"api_project_name": str(api_cases[i]['项目名称']).replace("\n", ""),
"error_reason": "用例名称重复,请重新上传"
}
upload_results_list.append(upload_results_dict)
for test_case_id in upload_api_test_case_id:
ApiTestCase.objects.delete()
return response_failed(3000, {'status': 200, 'upload_results_list': upload_results_list,
'upload_api_test_case_total': upload_api_test_case_total,
'success_upload': '0', 'error_upload': len(upload_results_list),
"error_reason": "用例名称重复,请重新上传"})
# 获取域名id
api_environment_id = APIEnvironment.objects.filter(
api_title=(api_cases[i]['域名']).replace("\n", ""))
# 获取请求方式
if str(api_cases[i]['请求方式']).replace("\n", "").upper() == 'GET':
api_method = 1
elif str(api_cases[i]['请求方式']).replace("\n", "").upper() == 'POST':
api_method = 2
elif str(api_cases[i]['请求方式']).replace("\n", "").upper() == 'PUT':
api_method = 3
elif str(api_cases[i]['请求方式']).replace("\n", "").upper() == 'DELETE':
api_method = 4
# 获取body类型
# 1:form-data 2: json 3:x-www-form-urlencoded
if str(api_cases[i]['body类型']).replace("\n", "").upper() == 'FORM-DATA':
api_parameter_types = 1
elif str(api_cases[i]['body类型']).replace("\n", "").upper() == 'JSON':
api_parameter_types = 2
elif str(api_cases[i]['body类型']).replace("\n", "").upper() == 'X-WWW-FORM-URLENCODED':
api_parameter_types = 3
# 1:包含contains 2: 匹配MATHCHES 3:数据库校验SQL 4:CODE
if str(api_cases[i]['断言类型']).replace("\n", "").upper() == 'CONTAINS':
api_assert_type = 1
elif str(api_cases[i]['断言类型']).replace("\n", "").upper() == 'MATHCHES':
api_assert_type = 2
elif str(api_cases[i]['断言类型']).replace("\n", "").upper() == 'SQL':
api_assert_type = 3
elif str(api_cases[i]['断言类型']).replace("\n", "").upper() == 'CODE':
api_assert_type = 4
# 获取数据库
api_data_base_id = APIDatabase.objects.filter(
api_database_title=str(api_cases[i]['数据库环境']).replace("\n", ""))
if str(api_cases[i]['数据库环境']).replace("\n", "") == "None":
data_base_id = ""
else:
data_base_id = api_data_base_id[0].id
if str(api_cases[i]['数据库语句']).replace("\n", "") == "None":
database_sql = ""
else:
database_sql = str(api_cases[i]['数据库语句']).replace("\n", "")
# 获取body
if str(api_cases[i]['body']).replace("\n", "") == 'None':
api_parameter_body = {}
else:
api_parameter_body = str(api_cases[i]['body']).replace("\n", "")
# 获取请求头
if str(api_cases[i]['请求头']).replace("\n", "") == 'None':
header = {}
else:
header = str(api_cases[i]['请求头']).replace("\n", "")
# 获取断言内容
if str(api_cases[i]['断言内容']).replace("\n", "") == "None":
api_assert_text = ""
else:
api_assert_text = str(api_cases[i]['断言内容']).replace("\n", "")
# 创建测试用例
api_test_case = ApiTestCase.objects.create(
api_test_case_name=str(api_cases[i]['测试用例名称']).replace("\n", ""),
api_module_id=api_module_id[0].id,
api_documentation=api_cases[i]['接口文档地址'],
api_environment_id=api_environment_id[0].id,
api_method=api_method,
api_url=str(api_cases[i]['请求地址']).replace("\n", ""),
api_parameter_types=api_parameter_types,
api_headers=header,
api_parameter_body=api_parameter_body,
api_assert_type=api_assert_type,
dataBase_id=data_base_id,
database_sql=database_sql,
api_assert_text=api_assert_text
)
# 获取变量名称
variable_name = str(api_cases[i]['变量名称']).split(',')
# 获取变量提取表达式
api_value_variable = str(api_cases[i]['提取表达式']).split(',')
variable_name_list = []
for value in variable_name:
if ApiParameterExtraction.objects.filter(api_key_variable=value).count() > 0:
variable_name_dict = {
"api_key_variable": "变量名称重复" + str(value)
}
variable_name_list.append(variable_name_dict)
upload_results_dict = {
"test_case_name": str(api_cases[i]['测试用例名称']).replace("\n", ""),
"api_module_name": str(api_cases[i]['模块']).replace("\n", ""),
"api_url": str(api_cases[i]['请求地址']).replace("\n", ""),
"api_project_name": str(api_cases[i]['项目名称']).replace("\n", ""),
"error_reason": "参数变量重复,请重新上传"
}
upload_results_list.append(upload_results_dict)
for test_case_id in upload_api_test_case_id:
ApiTestCase.objects.delete()
return response_failed(3000, {'status': 200, 'upload_results_list': upload_results_list,
'upload_api_test_case_total': upload_api_test_case_total,
'success_upload': '0',
'error_upload': len(upload_results_list),
"error_reason": "参数变量重复,请重新上传"})
elif value == "None":
variable_name_list = []
else:
variable_name_dict = {
"api_key_variable": str(value)
}
variable_name_list.append(variable_name_dict)
for v in range(len(variable_name_list)):
if len(variable_name_list) == 0:
pass
else:
variable_name_list[v]['api_value_variable'] = api_value_variable[v]
ApiParameterExtraction.objects.create(
api_key_variable=variable_name_list[v]['api_key_variable'],
api_value_variable=api_value_variable[v], api_test_case_id=api_test_case.id)
# data参数 创建
if str(api_cases[i]['Params']).replace("\n", "") == "None":
ApiParameterData.objects.create(api_must_parameter="false",
api_test_case_id=api_test_case.id)
else:
params = json.loads(str(api_cases[i]['Params']).replace("\n", ""))
for key in params:
ApiParameterData.objects.create(api_parameter_name=key, api_must_parameter="true",
api_parameter_value=params[key],
api_test_case_id=api_test_case.id)
upload_api_test_case_id.append({'api_test_case_id': api_test_case.id})
success_upload = upload_api_test_case_total - len(upload_results_list)
return response_success({'status': 200, 'upload_results_list': upload_results_list,
'upload_api_test_case_total': upload_api_test_case_total,
'success_upload': success_upload, 'error_upload': len(upload_results_list)})
class DownloadApiCaseTemplate(View):
def get(self, request, *args, **kwargs):
"""
下载API用例模板
:param request:
:param args:
:param kwargs:
:return:
"""
logger.info("API测试用例地址:" + settings.API_TEMPLATE)
file = open(settings.API_TEMPLATE, 'rb')
response = FileResponse(file)
response['Content-Type'] = 'application/octet-stream'
response['Content-Disposition'] = 'attachment;' + ' filename=' + "ApiTestCaseTemplate.xlsx"
return response
# -*- coding: utf-8 -*-
# @Time : 2022/7/21 14:46
# @Author : wangyinghao
# @Site :
# @File : api_test_plan_view.py
# @Software: PyCharm
from django.views.generic import View
import json
from datetime import datetime
from django.forms import model_to_dict
from automated_main.utils.http_format import response_success, response_failed
from automated_main.exception.my_exception import MyException
from automated_main.models.api_automation.api_test_plan import ApiTestPlan, APITestPlanResult, APITestPlanResultAssociated
from automated_main.models.api_automation.api_scenarios_case import ApiScenariosCase
from automated_main.form.api_test_plan import ApiTestPlanForm
from automated_main.models.api_automation.api_management import ApiManagement
from automated_main.view.api_automation.api_test_plan.extend.test_plan_thread import TaskThread
from django.core.paginator import Paginator
import arrow
import logging
logger = logging.getLogger('django')
class APITestPlanSuspended(View):
def post(self, request, *args, **kwargs):
"""
暂停接口计划
:param request:
:param args:
:param kwargs:
:return:
"""
body = request.body
if not body:
return response_success()
data = json.loads(body)
api_task = ApiTestPlan.objects.get(id=data["api_test_plan_id"])
api_task.status = 2
api_task.save()
return response_success("暂停API测试计划成功")
class ApiTestPlanView(View):
def get(self, request, api_test_plan_id, *args, **kwargs):
"""
获取单个API测试计划
:param request:
:param api_test_plan_id: API测试计划ID
:param args:
:param kwargs:
:return:
"""
api_test_plan = ApiTestPlan.objects.get(id=api_test_plan_id)
return response_success(model_to_dict(api_test_plan))
def delete(self, request, api_test_plan_id, *args, **kwargs):
"""
删除API测试计划
:param request:
:param api_test_plan_id:任务ID
:param args:
:param kwargs:
:return:
"""
ApiTestPlan.objects.delete()
return response_success("删除API测试计划成功")
def put(self, request, *args, **kwargs):
"""
创建API测试任务
:param request:
:param args:
:param kwargs:
:return:
"""
body = request.body
if not body:
return response_success()
data = json.loads(body)
form = ApiTestPlanForm(data)
if form.is_valid():
if data['api_test_plan_id'] == 0:
ApiTestPlan.objects.create(**form.cleaned_data)
return response_success("创建API测试任务成功")
else:
raise MyException()
def post(self, request, api_test_plan_id, *args, **kwargs):
"""
更改API测试计划
:param request:
:param api_test_plan_id: API测试任务ID
:param args:
:param kwargs:
:return:
"""
body = request.body
if not body:
return response_success()
data = json.loads(body)
form = ApiTestPlanForm(data)
if form.is_valid():
api_test_plan = ApiTestPlan.objects.get(id=api_test_plan_id)
api_test_plan.api_test_plan_name = data['api_test_plan_name']
api_test_plan.api_test_plan_describe = data['api_test_plan_describe']
api_test_plan.cases = data['cases']
api_test_plan.api_management_id = data['api_management_id']
api_test_plan.api_send_email = data['api_send_email']
api_test_plan.api_send_enterprise_wechat = data['api_send_enterprise_wechat']
api_test_plan.api_environment_id = data['api_environment_id']
api_test_plan.database_id = data['database_id']
api_test_plan.time_interval_seconds = data['time_interval_seconds']
api_test_plan.time_interval_hours = data['time_interval_hours']
api_test_plan.time_interval_minutes = data['time_interval_minutes']
api_test_plan.time_interval_day = data['time_interval_day']
api_test_plan.end_time = data['end_time']
api_test_plan.starting_time = data['starting_time']
api_test_plan.timing_task_status = data['timing_task_status']
api_test_plan.update_time = datetime.now()
api_test_plan.save()
return response_success("编辑API测试计划成功")
else:
raise MyException()
class GetScenariosCaseTree(View):
def get(self, request, api_management_id, *args, **kwargs):
"""
获取Api测试用例树形结构
:param request:
:param api_management_id:
:param args:
:param kwargs:
:return:
"""
api_management_data = ApiManagement.objects.filter(id=api_management_id)
data_list = []
for api_management in api_management_data:
api_management_dict = {
"api_project_name": api_management.name,
"isParent": True
}
api_scenarios_case_list = []
api_scenarios_case_data = ApiScenariosCase.objects.filter(api_management_id=api_management_id)
for api_scenarios_case in api_scenarios_case_data:
api_case_dict = {
"api_scenarios_case_name": api_scenarios_case.api_scenarios_case_name,
"isParent": False,
"api_scenarios_case_id": api_scenarios_case.id
}
api_scenarios_case_list.append(api_case_dict)
api_management_dict["children"] = api_scenarios_case_list
data_list.append(api_management_dict)
return response_success(data_list)
class PerformApiPlan(View):
def post(self, request, api_test_plan_id, *args, **kwargs):
"""
执行当前API测试计划
:param request:
:param api_test_plan_id:
:param args:
:param kwargs:
:return:
"""
if api_test_plan_id == "":
return response_failed({"status": 10200, "message": "api_test_plan_id is null"})
# 1.在执行线程之前,判断当前任务是否在执行
api_tasks = ApiTestPlan.objects.get(id=api_test_plan_id)
if api_tasks.status == 1:
return response_failed({"status": 10200, "message": "当前该测试任务正在执行!"})
else:
# 2. 修改任务的状态为:1-执行中
api_tasks = ApiTestPlan.objects.get(id=api_test_plan_id)
api_tasks.status = 1
api_tasks.save()
# 通过多线程运行测试任务
TaskThread(api_test_plan_id).run()
return response_success({"status": 10200, "message": "任务开始执行!"})
def get(self, request, *args, **kwargs):
"""
执行所有API测试计划
:param request:
:param args:
:param kwargs:
:return:
"""
api_test_plan = ApiTestPlan.objects.filter(timing_task_status="true")
for plan in api_test_plan:
# 1.在执行线程之前,判断当前有没有任务在执行
api_tasks = ApiTestPlan.objects.get(id=plan.id)
if api_tasks.status == 1:
logger.info("该任务正在执行中:" + plan.api_test_task_name)
continue
else:
# 2. 修改任务的状态为:1-执行中
api_plan = ApiTestPlan.objects.get(id=plan.id)
api_plan.status = 1
api_plan.save()
# 通过多线程运行测试任务
TaskThread(plan.id).run()
return response_success({"status": 10200, "message": "测试计划开始执行!"})
class CheckApiPlanResultList(View):
def get(self, request, api_test_plan_id, size_page, page, *args, **kwargs):
"""
查看API测试计划报告列表
:param page:
:param size_page:
:param request:
:param api_test_plan_id:
:param args:
:param kwargs:
:return:
"""
if api_test_plan_id == "":
return response_failed({"status": 10102, "message": "api_test_task_id不能为空"})
result_data = APITestPlanResult.objects.filter(api_test_plan_id=api_test_plan_id).order_by('-create_time')
data = []
for i in result_data:
result = {
"id": i.id,
"api_test_result_name": i.api_test_result_name,
"create_time": i.create_time,
"api_error_total_number": i.api_error_total_number,
"api_successful_total_number": i.api_successful_total_number,
"api_total_number": i.api_total_number
}
data.append(result)
api_result_total = len(data)
p = Paginator(data, size_page)
if int(page) > int(p.num_pages):
page1 = p.page(p.num_pages)
current_page = page1.object_list
else:
page1 = p.page(page)
current_page = page1.object_list
return response_success({'status': 10102, 'data': current_page, 'api_result_total': api_result_total})
class CheckApiResult(View):
def get(self, request, api_test_result_id, size_page, page, *args, **kwargs):
"""
查看任务--测试报告列表--测试结果列表
:param size_page: 展示条数
:param page: 页数
:param request:
:param api_test_result_id:
:param args:
:param kwargs:
:return:
"""
if api_test_result_id == "":
return response_failed({"status": 10102, "message": "api_test_task_id不能为空"})
r = APITestPlanResultAssociated.objects.filter(api_result_id=api_test_result_id)
data = []
for i in r:
result = {
"id": i.id,
"api_test_case_name": i.api_test_case_name,
"api_task_name": i.api_test_plan.api_test_plan_name,
"api_business_test_name": i.api_business_test_name,
"api_error": i.api_error,
"api_successful": i.api_successful,
"abnormal": i.abnormal,
"json_extract_variable_conversion": i.json_extract_variable_conversion,
"api_assertion_results": i.api_assertion_results,
"api_request_results": i.api_request_results,
"api_result_id": i.api_result_id,
"api_task_id": i.api_test_plan_id,
"create_time": arrow.get(str(i.create_time)).format('YYYY-MM-DD HH:mm:ss'),
}
data.append(result)
p = Paginator(data, size_page)
if int(page) > int(p.num_pages):
page1 = p.page(p.num_pages)
current_page = page1.object_list
else:
page1 = p.page(page)
current_page = page1.object_list
api_result = APITestPlanResult.objects.get(id=api_test_result_id)
case_result_total = [int(api_result.api_successful_total_number), int(api_result.api_error_total_number)]
return response_success({'status': 10102, 'data': current_page, "case_result_total": case_result_total})
def post(self, request, api_test_case_result_id, *args, **kwargs):
"""
查看任务--测试报告列表--测试结果列表--单独测试用例报告
:param request:
:param api_test_case_result_id:API测试结果关联表的ID
:param args:
:param kwargs:
:return:
"""
if api_test_case_result_id == "":
return response_failed({"status": 10102, "message": "api_test_case_result_id不能为空"})
r = APITestPlanResultAssociated.objects.filter(id=api_test_case_result_id)
data = []
for i in r:
result = {
"id": i.id,
"api_test_case_name": i.api_test_case_name,
"api_task_name": i.api_test_plan.api_test_plan_name,
"api_business_test_name": i.api_business_test_name,
"api_error": i.api_error,
"api_successful": i.api_successful,
"abnormal": i.abnormal,
"json_extract_variable_conversion": i.json_extract_variable_conversion,
"api_assertion_results": i.api_assertion_results,
"api_request_results": i.api_request_results,
"api_result_id": i.api_result_id,
"api_task_id": i.api_test_plan_id,
"create_time": i.create_time,
"api_variable_results": i.api_variable_results,
"api_header": i.api_header,
"api_url": i.api_url,
"api_body": i.api_body
}
data.append(result)
return response_success({'status': 10102, 'data': data})
def delete(self, request, api_test_result_id, *args, **kwargs):
"""
查看任务--测试报告列表--删除测试报告
:param request:
:param api_test_result_id:API测试结果关联表的ID
:param args:
:param kwargs:
:return:
"""
if api_test_result_id == "":
return response_failed({"status": 10102, "message": "api_test_result_id不能为空"})
APITestPlanResult.objects.delete()
return response_success("删除测试报告成功")
class CheckApiResultErrorList(View):
def get(self, request, api_test_result_id, size_page, page, *args, **kwargs):
"""
查看任务--测试报告列表--测试结果列表
:param size_page: 展示条数
:param page: 页数
:param request:
:param api_test_result_id:
:param args:
:param kwargs:
:return:
"""
if api_test_result_id == "":
return response_failed({"status": 10102, "message": "api_test_task_id不能为空"})
r = APITestPlanResultAssociated.objects.filter(api_result_id=api_test_result_id, api_error=1)
data = []
for i in r:
result = {
"id": i.id,
"api_test_case_name": i.api_test_case_name,
"api_task_name": i.api_test_plan.api_test_plan_name,
"api_business_test_name": i.api_business_test_name,
"api_error": i.api_error,
"api_successful": i.api_successful,
"abnormal": i.abnormal,
"json_extract_variable_conversion": i.json_extract_variable_conversion,
"api_assertion_results": i.api_assertion_results,
"api_request_results": i.api_request_results,
"api_result_id": i.api_result_id,
"api_task_id": i.api_test_plan_id,
"create_time": arrow.get(str(i.create_time)).format('YYYY-MM-DD HH:mm:ss')
}
data.append(result)
p = Paginator(data, size_page)
if int(page) > int(p.num_pages):
page1 = p.page(p.num_pages)
current_page = page1.object_list
else:
page1 = p.page(page)
current_page = page1.object_list
api_result = APITestPlanResult.objects.get(id=api_test_result_id)
case_result_total = [int(api_result.api_successful_total_number), int(api_result.api_error_total_number)]
return response_success({'status': 10102, 'data': current_page, "case_result_total": case_result_total})
\ No newline at end of file
# -*- coding: utf-8 -*-
# @Time : 2021/11/25 19:56
# @Author : wangyinghao
# @Site :
# @File : run_test_plan_task.py
# @Software: PyCharm
import django
import json
import os
import re
import sys
from os.path import dirname, abspath
BASE_DIR = dirname(dirname(dirname(abspath(__file__))))
BASE_PATH = BASE_DIR.replace("\\", "/")
sys.path.append(BASE_PATH)
curPath = os.path.abspath(os.path.dirname(__file__))
rootPath = os.path.split(curPath)[0]
sys.path.append(rootPath)
sys.path.extend(['/home/AutomatedTestPlatform'])
# project_name 项目名称
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "AutomatedTestPlatform.settings")
django.setup()
from automated_main.models.api_automation.api_interfaces_case import ApiInterfacesParameterExtraction
from automated_main.models.api_automation.api_test_plan import APITestPlanResultAssociated
from automated_main.utils.handle_db import HandleDB
from automated_main.models.api_automation.api_database import APIDatabase
from automated_main.utils.api_utils import RegularMatch
from automated_main.utils.api_utils import InterfaceRequest
import logging
import ast
logger = logging.getLogger('django')
logger.info("运行测试文件:" + BASE_PATH)
# 定义扩展的目录
EXTEND_DIR = BASE_PATH + "/api_test_plan/extend/"
logger.info("地址信息" + EXTEND_DIR)
def new_test_run_cases(case_id):
for case in case_id:
api_test_case_id = (case['api_test_case_id'])
api_test_case_name = (case['api_test_case_name'])
api_url = (case['api_url'])
api_method = (case['api_method'])
api_parameter_types = (case['api_parameter_types'])
api_headers = (case['api_headers'])
api_parameter_body = (case['api_parameter_body'])
api_assert_type = (case['api_assert_type'])
api_parameter_extraction = (case['api_parameter_extraction'])
parameter_data = (case['parameter_data'])
api_assert_text = (case['api_assert_text'])
api_result_id = (case['api_result_id'])
api_task_id = (case['api_task_id'])
database_id = (case['dataBase_id'])
database_sql = (case['database_sql'])
api_business_test_name = (case['api_scenarios_case_name'])
api_environment_id = (case['api_environment_id'])
api_assertion_results = ""
api_error = ""
api_successful = ""
api_variable_results = ""
abnormal = ""
json_loads = ""
try:
api_url = RegularMatch(api_url, api_environment_id, "1", regular="new")
# if "${" in api_url and "}" in api_url:
# key = re.findall(r"\${(.+?)}", api_url)
# for a in range(len(key)):
# key1 = "${" + key[a] + "}"
#
# api_global_variable = APIGlobalVariable.objects.filter(api_global_variable_name=key1,
# api_environment_id=api_environment_id)
# if api_global_variable.count() > 0:
# variable = api_global_variable[0].api_global_variable_value
# api_url = api_url.replace(key1, variable)
# else:
# value_variable = ApiParameterExtraction.objects.filter(api_key_variable=key1)
# variable = value_variable[0].api_variable_results
# api_url = api_url.replace(key1, variable)
if api_headers == "{}" or "":
header_dict = {}
else:
api_headers = RegularMatch(api_headers, api_environment_id, "1", regular="new")
# if "${" in api_headers and "}" in api_headers:
# key = re.findall(r"\${(.+?)}", api_headers)
# for b in range(len(key)):
# key1 = "${" + key[b] + "}"
#
# api_global_variable = APIGlobalVariable.objects.filter(api_global_variable_name=key1,
# api_environment_id=api_environment_id)
# if api_global_variable.count() > 0:
# variable = api_global_variable[0].api_global_variable_value
# api_headers = api_headers.replace(key1, variable)
# else:
# value_variable = ApiParameterExtraction.objects.filter(api_key_variable=key1)
# variable = value_variable[0].api_variable_results
# api_headers = api_headers.replace(key1, variable)
header = api_headers.replace("\'", "\"")
header_dict = json.loads(header)
if api_parameter_body == "{}" or "":
api_parameter_body_dict = {}
else:
api_parameter_body = RegularMatch(api_parameter_body, api_environment_id, "1", regular="new")
# if "${" in api_parameter_body and "}" in api_parameter_body:
# key = re.findall(r"\${(.+?)}", api_parameter_body)
# for b in range(len(key)):
# key1 = "${" + key[b] + "}"
#
# api_global_variable = APIGlobalVariable.objects.filter(api_global_variable_name=key1,
# api_environment_id=api_environment_id)
# if api_global_variable.count() > 0:
# variable = api_global_variable[0].api_global_variable_value
# api_parameter_body = api_parameter_body.replace(key1, variable)
# else:
#
# value_variable = ApiParameterExtraction.objects.filter(api_key_variable=key1)
# variable = value_variable[0].api_variable_results
# api_parameter_body = api_parameter_body.replace(key1, variable)
api_parameter = api_parameter_body.replace("\'", "\"")
api_parameter_body_dict = json.loads(api_parameter)
if parameter_data == "{}" or "":
data_payload = {}
else:
parameter_data_task = RegularMatch(str(parameter_data), api_environment_id, "1", regular="new")
data_payload = ast.literal_eval(parameter_data_task)
# if "${" in parameter_data and "}" in parameter_data:
# key = re.findall(r"\${(.+?)}", parameter_data)
# for b in range(len(key)):
# key1 = "${" + key[b] + "}"
#
# api_global_variable = APIGlobalVariable.objects.filter(api_global_variable_name=key1,
# api_environment_id=api_environment_id)
# if api_global_variable.count() > 0:
# variable = api_global_variable[0].api_global_variable_value
# parameter_data = parameter_data.replace(key1, variable)
# else:
#
# value_variable = ApiParameterExtraction.objects.filter(api_key_variable=key1)
# variable = value_variable[0].api_variable_results
# parameter_data = parameter_data.replace(key1, variable)
r = InterfaceRequest(str(api_method), str(api_parameter_types), api_url, header_dict,
api_parameter_body_dict, data_payload)
logger.info("测试用例名称: %s", api_test_case_name)
logger.info("请求地址: %s", r.url)
logger.info("请求json数据: %s", api_parameter_body_dict)
logger.info("请求parameter数据: %s", data_payload)
logger.info("请求头: %s", r.headers)
logger.info("请求响应: %s", r.text)
if r == "请求超时":
api_error = 1
api_successful = 0
APITestPlanResultAssociated.objects.create(api_test_case_name=api_test_case_name, api_error=api_error,
api_successful=api_successful, abnormal=abnormal,
json_extract_variable_conversion=json_loads,
api_assertion_results={"results": "请求超时,断言失败"},
api_variable_results={"results": "请求超时无法提取变量"},
api_request_results={"results": "请求超时"},
api_result_id=api_result_id,
api_test_plan_id=api_task_id,
api_business_test_name=api_business_test_name,
api_url=api_url,
api_header=header_dict,
api_body=api_parameter_body_dict,
)
else:
# r.encoding = r.apparent_encoding
if api_assert_type == '':
api_error = 0
api_successful = 1
api_assertion_results = "断言成功"
elif api_assert_type == 1:
r_text = re.sub(r'(\\u[\s\S]{4})', lambda x: x.group(1).encode("utf-8").decode("unicode-escape"),
r.text)
r_text_test = "".join(r_text.split())
r_text_data = "".join(api_assert_text.split())
try:
assert r_text_data in r_text_test
api_error = 0
api_successful = 1
api_assertion_results = "断言成功"
except Exception as e:
logger.error(
"断言失败,用例名称是:" + api_test_case_name + "断言内容 %s 与响应结果不匹配,%s" % (
api_assert_text, r_text) + str(e))
api_assertion_results = "断言失败: 断言内容 %s 与响应结果不匹配,%s" % (
api_assert_text, r_text)
api_error = 1
api_successful = 0
pass
elif api_assert_type == 2:
r_text = re.sub(r'(\\u[\s\S]{4})', lambda x: x.group(1).encode("utf-8").decode("unicode-escape"),
r.text)
r_text_test = "".join(r_text.split())
r_text_data = "".join(api_assert_text.split())
try:
assert r_text_data == r_text_test
api_error = 0
api_successful = 1
api_assertion_results = "断言成功"
except Exception as e:
logger.error(
"断言失败,用例名称是:" + api_test_case_name + "断言内容 %s 与响应结果不匹配,%s" % (
api_assert_text, r_text) + str(e))
api_assertion_results = "断言失败: 断言内容 %s 与响应结果不匹配,%s" % (
api_assert_text, r_text)
api_error = 1
api_successful = 0
pass
elif api_assert_type == 3:
database = APIDatabase.objects.get(id=database_id)
db = HandleDB(database.api_host, int(database.api_port), database.user, database.password,
database.database)
count = db.get_count(database_sql)
db.close()
if str(count) == str(api_assert_text):
api_error = 0
api_successful = 1
api_assertion_results = "断言成功"
else:
logger.error("断言失败,用例名称是:" + api_test_case_name + '断言失败,查询的数据是: %s' % str(count))
api_assertion_results = '断言失败,查询的数据是: %s' % str(count)
api_error = 1
api_successful = 0
pass
elif api_assert_type == 4:
try:
assert str(api_assert_text) == str(r.status_code)
api_error = 0
api_successful = 1
api_assertion_results = "断言成功"
except Exception as e:
logger.error("断言失败,用例名称是:" + api_test_case_name + "断言失败: code状态码 %s 与响应状态码不匹配,%s" % (
api_assert_text, str(r.status_code)) + str(e))
api_assertion_results = "断言失败: code状态码 %s 与响应状态码不匹配,%s" % (api_assert_text, str(r.status_code))
api_error = 1
api_successful = 0
pass
if ApiInterfacesParameterExtraction.objects.filter(api_test_case_id=api_test_case_id).count() > 0:
ApiInterfacesParameterExtraction.objects.delete()
for api_parameter_extractions in api_parameter_extraction:
if api_parameter_extractions['api_value_variable'] == "":
pass
else:
v_text = api_parameter_extractions['api_value_variable'].split(".")
# 提取变量
try:
api_result = json.loads(r.text)
except Exception as e:
json_loads = "提取变量json.loads(r.text) 失败:" + str(e)
logger.error("提取变量json.loads(r.text) 失败:" + str(e))
pass
try:
for a in v_text:
if "[" in a and "]" in a:
variable_1 = a.split('[')[0]
variable_2 = a.split('[')[1].split(']')[0]
if variable_1 == "":
api_result = api_result[0]
else:
api_result = api_result[variable_1][int(variable_2)]
else:
if a == "0":
api_result = api_result[int(a)]
else:
api_result = api_result[a]
api_variable_results = str(api_result)
except Exception as e:
api_variable_results = api_parameter_extractions['api_key_variable']
pass
logger.info("自定义变量表中有该case_id")
ApiInterfacesParameterExtraction.objects.create(api_test_case_id=api_test_case_id,
api_variable_results=api_variable_results,
api_value_variable=
api_parameter_extractions[
'api_value_variable'],
api_key_variable=api_parameter_extractions[
'api_key_variable'])
else:
logger.info("自定义变量表中 无 该 case_id")
for api_parameter_extractions in api_parameter_extraction:
if api_parameter_extractions['api_key_variable'] == "":
pass
else:
v_text = api_parameter_extractions['api_value_variable'].split(".")
# 提取变量
try:
api_result = json.loads(r.text)
except Exception as e:
json_loads = "提取变量json.loads(r.text) 失败:" + str(e)
logger.error("提取变量json.loads(r.text) 失败:" + str(e))
pass
try:
for a in v_text:
if "[" in a and "]" in a:
variable_1 = a.split('[')[0]
variable_2 = a.split('[')[1].split(']')[0]
api_result = api_result[variable_1][int(variable_2)]
else:
api_result = api_result[a]
api_variable_results = str(api_result)
except Exception as e:
logger.info(str(api_test_case_name) + " 参数提取失败" + api_parameter_extractions[
'api_key_variable'] + str(e))
api_variable_results = api_parameter_extractions['api_key_variable']
logger.error(api_variable_results)
pass
if ApiInterfacesParameterExtraction.objects.filter(
api_key_variable=api_parameter_extractions['api_key_variable']).count() > 0:
api_parameter_extraction_list = (
str(api_parameter_extractions['api_key_variable']) + "变量名称重复,请重新填写")
else:
ApiInterfacesParameterExtraction.objects.create(api_test_case_id=api_test_case_id,
api_variable_results=
api_parameter_extractions[
'api_variable_results'],
api_value_variable=api_variable_results,
api_key_variable=
api_parameter_extractions[
'api_key_variable'])
api_parameter_extraction = ApiInterfacesParameterExtraction.objects.filter(
api_test_case_id=api_test_case_id)
api_parameter_extraction_list = []
if api_parameter_extraction.count() > 0:
for api_parameter_extractions in api_parameter_extraction:
api_parameter_extraction_dict = {
"api_key_variable": api_parameter_extractions.api_key_variable,
"api_variable_results": api_parameter_extractions.api_variable_results
}
api_parameter_extraction_list.append(api_parameter_extraction_dict)
APITestPlanResultAssociated.objects.create(api_test_case_name=api_test_case_name, api_error=api_error,
api_successful=api_successful, abnormal=abnormal,
json_extract_variable_conversion=json_loads,
api_assertion_results=api_assertion_results,
api_variable_results=str(api_parameter_extraction_list),
api_request_results=r.json(),
api_result_id=api_result_id,
api_test_plan_id=api_task_id,
api_business_test_name=api_business_test_name,
api_url=r.url,
api_body=api_parameter_body_dict,
api_header=header_dict
)
except Exception as e:
abnormal = str(e)
if api_assertion_results == "断言成功":
api_error = 0
api_successful = 1
APITestPlanResultAssociated.objects.create(api_test_case_name=api_test_case_name,
api_error=api_error,
api_successful=api_successful, abnormal=abnormal,
json_extract_variable_conversion=json_loads,
api_assertion_results=api_assertion_results,
api_variable_results=str(api_parameter_extraction_list),
api_request_results=r.text.encode().decode("unicode_escape"),
api_result_id=api_result_id,
api_test_plan_id=api_task_id,
api_business_test_name=api_business_test_name,
api_url=r.url,
api_header=header_dict,
api_body=api_parameter_body_dict,
)
else:
api_error = 1
api_successful = 0
APITestPlanResultAssociated.objects.create(api_test_case_name=api_test_case_name, api_error=api_error,
api_successful=api_successful, abnormal=abnormal,
json_extract_variable_conversion=json_loads,
api_assertion_results=api_assertion_results,
api_variable_results=str(api_parameter_extraction_list),
api_request_results=r.text.encode().decode("unicode_escape"),
api_result_id=api_result_id,
api_test_plan_id=api_task_id,
api_business_test_name=api_business_test_name,
api_url=api_url,
api_header=header_dict,
api_body=api_parameter_body_dict,
)
continue
# -*- coding: utf-8 -*-
# @Time : 2021/11/25 19:56
# @Author : wangyinghao
# @Site :
# @File : new_run_task.py
# @Software: PyCharm
import django
import json
import os
import re
import sys
from os.path import dirname, abspath
BASE_DIR = dirname(dirname(dirname(abspath(__file__))))
BASE_PATH = BASE_DIR.replace("\\", "/")
sys.path.append(BASE_PATH)
curPath = os.path.abspath(os.path.dirname(__file__))
rootPath = os.path.split(curPath)[0]
sys.path.append(rootPath)
sys.path.extend(['/home/AutomatedTestPlatform'])
# project_name 项目名称
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "AutomatedTestPlatform.settings")
django.setup()
from automated_main.models.api_automation.api_test_case import ApiParameterExtraction
from automated_main.models.api_automation.api_test_task import APITestResultAssociated
from automated_main.utils.handle_db import HandleDB
from automated_main.models.api_automation.api_database import APIDatabase
from automated_main.utils.api_utils import RegularMatch
from automated_main.utils.api_utils import InterfaceRequest
import logging
import ast
logger = logging.getLogger('django')
logger.info("运行测试文件:" + BASE_PATH)
# 定义扩展的目录
EXTEND_DIR = BASE_PATH + "/api_test_task/extend/"
logger.info("地址信息" + EXTEND_DIR)
def new_test_run_cases(case_id):
for case in case_id:
api_test_case_id = (case['api_test_case_id'])
api_test_case_name = (case['api_test_case_name'])
api_url = (case['api_url'])
api_method = (case['api_method'])
api_parameter_types = (case['api_parameter_types'])
api_headers = (case['api_headers'])
api_parameter_body = (case['api_parameter_body'])
api_assert_type = (case['api_assert_type'])
api_parameter_extraction = (case['api_parameter_extraction'])
parameter_data = (case['parameter_data'])
api_assert_text = (case['api_assert_text'])
api_result_id = (case['api_result_id'])
api_task_id = (case['api_task_id'])
database_id = (case['dataBase_id'])
database_sql = (case['database_sql'])
api_business_test_name = (case['api_business_test_name'])
api_environment_id = (case['api_environment_id'])
api_assertion_results = ""
api_error = ""
api_successful = ""
api_variable_results = ""
abnormal = ""
json_loads = ""
try:
api_url = RegularMatch(api_url, api_environment_id, "1")
if api_headers == "{}" or "":
header_dict = {}
else:
api_headers = RegularMatch(api_headers, api_environment_id, "1")
header = api_headers.replace("\'", "\"")
header_dict = json.loads(header)
if api_parameter_body == "{}" or "":
api_parameter_body_dict = {}
else:
api_parameter_body = RegularMatch(api_parameter_body, api_environment_id, "1")
api_parameter = api_parameter_body.replace("\'", "\"")
api_parameter_body_dict = json.loads(api_parameter)
if parameter_data == "{}" or "":
data_payload = {}
else:
parameter_data_task = RegularMatch(str(parameter_data), api_environment_id, "1")
data_payload = ast.literal_eval(parameter_data_task)
r = InterfaceRequest(str(api_method), str(api_parameter_types), api_url, header_dict,
api_parameter_body_dict, data_payload)
if r == "请求超时":
api_error = 1
api_successful = 0
APITestResultAssociated.objects.create(api_test_case_name=api_test_case_name, api_error=api_error,
api_successful=api_successful, abnormal=abnormal,
json_extract_variable_conversion=json_loads,
api_assertion_results={"results": "请求超时,断言失败"},
api_variable_results={"results": "请求超时无法提取变量"},
api_request_results={"results": "请求超时"},
api_result_id=api_result_id,
api_task_id=api_task_id,
api_business_test_name=api_business_test_name,
api_url=api_url,
api_header=header_dict,
api_body=api_parameter_body_dict,
)
else:
if api_assert_type == '':
api_error = 0
api_successful = 1
api_assertion_results = "断言成功"
elif api_assert_type == 1:
try:
r_text = re.sub(r'(\\u[\s\S]{4})',
lambda x: x.group(1).encode("utf-8").decode("unicode-escape"), r.text)
r_text_test = "".join(r_text.split())
r_text_data = "".join(api_assert_text.split())
assert r_text_data in r_text_test
api_error = 0
api_successful = 1
api_assertion_results = "断言成功"
except Exception as e:
logger.error(
"断言失败,用例名称是:" + api_test_case_name + "断言内容 %s 与响应结果不匹配,%s" % (
api_assert_text, r_text) + str(e))
api_assertion_results = "断言失败: 断言内容 %s 与响应结果不匹配,%s" % (api_assert_text, r_text)
api_error = 1
api_successful = 0
pass
elif api_assert_type == 2:
r_text = re.sub(r'(\\u[\s\S]{4})', lambda x: x.group(1).encode("utf-8").decode("unicode-escape"),
r.text)
r_text_test = "".join(r_text.split())
r_text_data = "".join(api_assert_text.split())
try:
assert r_text_data == r_text_test
api_error = 0
api_successful = 1
api_assertion_results = "断言成功"
except Exception as e:
logger.error(
"断言失败,用例名称是:" + api_test_case_name + "断言内容 %s 与响应结果不匹配,%s" % (
api_assert_text, r_text) + str(e))
api_assertion_results = "断言失败: 断言内容 %s 与响应结果不匹配,%s" % (api_assert_text, r_text)
api_error = 1
api_successful = 0
pass
elif api_assert_type == 3:
database = APIDatabase.objects.get(id=database_id)
db = HandleDB(database.api_host, int(database.api_port), database.user, database.password,
database.database)
count = db.get_count(database_sql)
db.close()
if str(count) == str(api_assert_text):
api_error = 0
api_successful = 1
api_assertion_results = "断言成功"
else:
logger.error("断言失败,用例名称是:" + api_test_case_name + '断言失败,查询的数据是: %s' % str(count))
api_assertion_results = '断言失败,查询的数据是: %s' % str(count)
api_error = 1
api_successful = 0
pass
elif api_assert_type == 4:
try:
assert str(api_assert_text) == str(r.status_code)
api_error = 0
api_successful = 1
api_assertion_results = "断言成功"
except Exception as e:
logger.error("断言失败,用例名称是:" + api_test_case_name + "断言失败: code状态码 %s 与响应状态码不匹配,%s" % (
api_assert_text, str(r.status_code)) + str(e))
api_assertion_results = "断言失败: code状态码 %s 与响应状态码不匹配,%s" % (api_assert_text, str(r.status_code))
api_error = 1
api_successful = 0
pass
try:
response_message = r.json()
except json.decoder.JSONDecodeError:
api_error = 1
api_successful = 0
APITestResultAssociated.objects.create(api_test_case_name=api_test_case_name, api_error=api_error,
api_successful=api_successful, abnormal=abnormal,
json_extract_variable_conversion=json_loads,
api_assertion_results=api_assertion_results,
api_variable_results="",
api_request_results={"result": "接口返回结果类型错误非JSON格式"},
api_result_id=api_result_id,
api_task_id=api_task_id,
api_business_test_name=api_business_test_name,
api_url=api_url,
api_header=header_dict,
api_body=api_parameter_body_dict,
)
continue
if ApiParameterExtraction.objects.filter(api_test_case_id=api_test_case_id).count() > 0:
ApiParameterExtraction.objects.delete()
for api_parameter_extractions in api_parameter_extraction:
if api_parameter_extractions['api_value_variable'] == "":
pass
else:
v_text = api_parameter_extractions['api_value_variable'].split(".")
# 提取变量
try:
api_result = json.loads(r.text)
except Exception as e:
json_loads = "提取变量json.loads(r.text) 失败:" + str(e)
logger.error("提取变量json.loads(r.text) 失败:" + str(e))
pass
try:
for a in v_text:
if "[" in a and "]" in a:
variable_1 = a.split('[')[0]
variable_2 = a.split('[')[1].split(']')[0]
if variable_1 == "":
api_result = api_result[0]
else:
api_result = api_result[variable_1][int(variable_2)]
else:
if a == "0":
api_result = api_result[int(a)]
else:
api_result = api_result[a]
api_variable_results = str(api_result)
except Exception as e:
api_variable_results = api_parameter_extractions['api_key_variable']
pass
ApiParameterExtraction.objects.create(api_test_case_id=api_test_case_id,
api_variable_results=api_variable_results,
api_value_variable=api_parameter_extractions[
'api_value_variable'],
api_key_variable=api_parameter_extractions[
'api_key_variable'])
else:
for api_parameter_extractions in api_parameter_extraction:
if api_parameter_extractions['api_key_variable'] == "":
pass
else:
v_text = api_parameter_extractions['api_value_variable'].split(".")
# 提取变量
try:
api_result = json.loads(r.text)
except Exception as e:
json_loads = "提取变量json.loads(r.text) 失败:" + str(e)
logger.error("提取变量json.loads(r.text) 失败:" + str(e))
pass
try:
for a in v_text:
if "[" in a and "]" in a:
variable_1 = a.split('[')[0]
variable_2 = a.split('[')[1].split(']')[0]
api_result = api_result[variable_1][int(variable_2)]
else:
api_result = api_result[a]
api_variable_results = str(api_result)
except Exception as e:
logger.info(str(api_test_case_name) + " 参数提取失败" + api_parameter_extractions[
'api_key_variable'] + str(e))
api_variable_results = api_parameter_extractions['api_key_variable']
logger.error(api_variable_results)
pass
if ApiParameterExtraction.objects.filter(
api_key_variable=api_parameter_extractions['api_key_variable']).count() > 0:
api_parameter_extraction_list = (
str(api_parameter_extractions['api_key_variable']) + "变量名称重复,请重新填写")
else:
ApiParameterExtraction.objects.create(api_test_case_id=api_test_case_id,
api_variable_results=api_parameter_extractions[
'api_variable_results'],
api_value_variable=api_variable_results,
api_key_variable=api_parameter_extractions[
'api_key_variable'])
api_parameter_extraction = ApiParameterExtraction.objects.filter(api_test_case_id=api_test_case_id)
api_parameter_extraction_list = []
if api_parameter_extraction.count() > 0:
for api_parameter_extractions in api_parameter_extraction:
api_parameter_extraction_dict = {
"api_key_variable": api_parameter_extractions.api_key_variable,
"api_variable_results": api_parameter_extractions.api_variable_results
}
api_parameter_extraction_list.append(api_parameter_extraction_dict)
APITestResultAssociated.objects.create(api_test_case_name=api_test_case_name, api_error=api_error,
api_successful=api_successful, abnormal=abnormal,
json_extract_variable_conversion=json_loads,
api_assertion_results=api_assertion_results,
api_variable_results=str(api_parameter_extraction_list),
api_request_results=r.json(),
api_result_id=api_result_id,
api_task_id=api_task_id,
api_business_test_name=api_business_test_name,
api_url=r.url,
api_body=api_parameter_body_dict,
api_header=header_dict
)
except Exception as e:
abnormal = str(e)
if api_assertion_results == "断言成功":
api_error = 0
api_successful = 1
APITestResultAssociated.objects.create(api_test_case_name=api_test_case_name, api_error=api_error,
api_successful=api_successful, abnormal=abnormal,
json_extract_variable_conversion=json_loads,
api_assertion_results=api_assertion_results,
api_variable_results=str(api_parameter_extraction_list),
api_request_results=r.text.encode().decode("unicode_escape"),
api_result_id=api_result_id,
api_task_id=api_task_id,
api_business_test_name=api_business_test_name,
api_url=r.url,
api_header=header_dict,
api_body=api_parameter_body_dict,
)
else:
try:
result = r.json()
except Exception as e:
result = r.text
pass
api_error = 1
api_successful = 0
APITestResultAssociated.objects.create(api_test_case_name=api_test_case_name, api_error=api_error,
api_successful=api_successful, abnormal=abnormal,
json_extract_variable_conversion=json_loads,
api_assertion_results=api_assertion_results,
api_variable_results=str(api_parameter_extraction_list),
api_request_results=result,
api_result_id=api_result_id,
api_task_id=api_task_id,
api_business_test_name=api_business_test_name,
api_url=api_url,
api_header=header_dict,
api_body=api_parameter_body_dict,
)
continue
...@@ -12,6 +12,7 @@ from automated_main.exception.my_exception import MyException ...@@ -12,6 +12,7 @@ from automated_main.exception.my_exception import MyException
from automated_main.models.performance_test.performance_project import PerformanceProject from automated_main.models.performance_test.performance_project import PerformanceProject
from automated_main.form.performance_project import PerformanceProjectForm from automated_main.form.performance_project import PerformanceProjectForm
import logging import logging
logger = logging.getLogger('django') logger = logging.getLogger('django')
...@@ -70,7 +71,7 @@ class PerformanceProjectView(View): ...@@ -70,7 +71,7 @@ class PerformanceProjectView(View):
:return: :return:
""" """
PerformanceProject.objects.filter(id=performance_project_id).delete() PerformanceProject.objects.delete()
return response_success() return response_success()
def put(self, request, *args, **kwargs): def put(self, request, *args, **kwargs):
......
...@@ -26,13 +26,9 @@ from django.conf import settings ...@@ -26,13 +26,9 @@ from django.conf import settings
from django.core.paginator import Paginator from django.core.paginator import Paginator
from automated_main.models.performance_test.performance_report import PerformanceReport, PerformanceReportError from automated_main.models.performance_test.performance_report import PerformanceReport, PerformanceReportError
from pathlib import Path from pathlib import Path
from django.db.models import Q
from automated_main.utils.jmeter_perform import create_para_jmx
import logging import logging
import csv import csv
import pandas as pd import pandas as pd
import numpy as np
from functools import reduce
logger = logging.getLogger('django') logger = logging.getLogger('django')
...@@ -133,7 +129,7 @@ class PerformanceScriptView(View): ...@@ -133,7 +129,7 @@ class PerformanceScriptView(View):
:return: :return:
""" """
PerformanceScript.objects.filter(id=performance_script_id).delete() PerformanceScript.objects.delete()
return response_success("删除单独性能脚本") return response_success("删除单独性能脚本")
def put(self, request, *args, **kwargs): def put(self, request, *args, **kwargs):
...@@ -437,7 +433,7 @@ class PerformanceScriptReport(View): ...@@ -437,7 +433,7 @@ class PerformanceScriptReport(View):
if report_id == "": if report_id == "":
return response_failed({"status": 10102, "message": "report_id不能为空"}) return response_failed({"status": 10102, "message": "report_id不能为空"})
PerformanceReport.objects.filter(id=report_id).delete() PerformanceReport.objects.delete()
dir = settings.JMETER_REPORT dir = settings.JMETER_REPORT
dir_list = os.listdir(dir) dir_list = os.listdir(dir)
for report in dir_list: for report in dir_list:
......
# -*- coding: utf-8 -*-
# @Time : 2022/10/24 20:49
# @Author : wyh
# @File : __init__.py.py
# @desc:
# -*- coding: utf-8 -*-
# @Time : 2022/10/24 20:50
# @Author : wyh
# @File : personnel_allocation_view.py
# @desc:
from django.views.generic import View
from automated_main.utils.http_format import response_success, response_failed
from automated_main.exception.my_exception import MyException
from automated_main.models.testing_progress.testing_progress import TestingProgress
import json
import arrow
class PersonnelAllocationView(View):
def get(self, request, *args, **kwargs):
"""
获取任务
"""
testing_progress_data = TestingProgress.objects.all()
testing_progress_list = []
for testing_progress in testing_progress_data:
if testing_progress.group == "1":
render = "split"
else:
render = ""
testing_progress_dict = {
"text": testing_progress.testing_progress_title,
"start_date": testing_progress.start_date,
"id": testing_progress.id,
"duration": testing_progress.duration,
"parent": testing_progress.parent,
"type": testing_progress.type,
"end_date": testing_progress.end_date,
"render": render,
"description": testing_progress.description
}
testing_progress_list.append(testing_progress_dict)
return response_success(testing_progress_list)
def put(self, request, *args, **kwargs):
"""
新增测试任务
"""
body = request.body
if not body:
return response_success()
data = json.loads(body)
TestingProgress.objects.create(testing_progress_title=data['text'],
start_date=arrow.get(str(data['start_date'])).format('YYYY-MM-DD'),
end_date=arrow.get(str(data['end_date'])).format('YYYY-MM-DD'),
duration=data['duration'],
group=data['group'],
parent=data['parent'],
type=data['color'],
description=data['description'])
return response_success("创建成功")
def post(self, request, *args, **kwargs):
return response_success()
def delete(self, *args, **kwargs):
return response_success()
...@@ -68,7 +68,7 @@ class UIElementOperationView(View): ...@@ -68,7 +68,7 @@ class UIElementOperationView(View):
:return: :return:
""" """
UIElementsOperation.objects.filter(id=element_operation_id).delete() UIElementsOperation.objects.delete()
return response_success("删除元素操作成功") return response_success("删除元素操作成功")
def put(self, request, *args, **kwargs): def put(self, request, *args, **kwargs):
......
...@@ -67,7 +67,7 @@ class UiPageView(View): ...@@ -67,7 +67,7 @@ class UiPageView(View):
:return: :return:
""" """
UIPage.objects.filter(id=ui_page_id).delete() UIPage.objects.delete()
return response_success("删除页面成功") return response_success("删除页面成功")
def put(self, request, *args, **kwargs): def put(self, request, *args, **kwargs):
......
...@@ -99,7 +99,7 @@ class UIPageElementView(View): ...@@ -99,7 +99,7 @@ class UIPageElementView(View):
delete_ui_page_elemnt_id = list(set(page_element_all_id_list).difference(set(edit_page_element_id_list))) delete_ui_page_elemnt_id = list(set(page_element_all_id_list).difference(set(edit_page_element_id_list)))
for delete_elemnt_id in delete_ui_page_elemnt_id: for delete_elemnt_id in delete_ui_page_elemnt_id:
UIPageElement.objects.filter(id=delete_elemnt_id).delete() UIPageElement.objects.delete()
return response_success("编辑页面元素成功") return response_success("编辑页面元素成功")
else: else:
raise MyException() raise MyException()
...@@ -114,7 +114,7 @@ class UIPageElementView(View): ...@@ -114,7 +114,7 @@ class UIPageElementView(View):
:return: :return:
""" """
UIPageElement.objects.filter(id=ui_page_element_id).delete() UIPageElement.objects.delete()
return response_success("删除页面元素成功") return response_success("删除页面元素成功")
def put(self, request, *args, **kwargs): def put(self, request, *args, **kwargs):
......
...@@ -66,7 +66,7 @@ class UIPositioningView(View): ...@@ -66,7 +66,7 @@ class UIPositioningView(View):
:return: :return:
""" """
UIPositioning.objects.filter(id=element_positioning_id).delete() UIPositioning.objects.delete()
return response_success("删除元素定位成功") return response_success("删除元素定位成功")
def put(self, request, *args, **kwargs): def put(self, request, *args, **kwargs):
......
...@@ -67,7 +67,7 @@ class UiProjectView(View): ...@@ -67,7 +67,7 @@ class UiProjectView(View):
:return: :return:
""" """
UIProject.objects.filter(id=ui_project_id).delete() UIProject.objects.delete()
return response_success() return response_success()
def put(self, request, *args, **kwargs): def put(self, request, *args, **kwargs):
......
...@@ -97,7 +97,7 @@ class UITestCaseView(View): ...@@ -97,7 +97,7 @@ class UITestCaseView(View):
ui_test_case.ui_project_id = data['ui_project_id'] ui_test_case.ui_project_id = data['ui_project_id']
ui_test_case.id = data['ui_test_case_id'] ui_test_case.id = data['ui_test_case_id']
ui_test_case.save() ui_test_case.save()
UITestCaseAssociated.objects.filter(cid_id=ui_test_case_id).delete() UITestCaseAssociated.objects.delete()
if form.is_valid(): if form.is_valid():
for i in data["ui_test_case_data"]: for i in data["ui_test_case_data"]:
...@@ -127,7 +127,7 @@ class UITestCaseView(View): ...@@ -127,7 +127,7 @@ class UITestCaseView(View):
:return: :return:
""" """
UITestCase.objects.get(id=ui_test_case_id).delete() UITestCase.objects.delete()
return response_success("删除UI测试用例成功") return response_success("删除UI测试用例成功")
def put(self, request, *args, **kwargs): def put(self, request, *args, **kwargs):
...@@ -146,7 +146,7 @@ class UITestCaseView(View): ...@@ -146,7 +146,7 @@ class UITestCaseView(View):
form = UiTestCaseForm(data) form = UiTestCaseForm(data)
if form.is_valid(): if form.is_valid():
ui_test_case = UITestCase.objects.create(**form.cleaned_data) ui_test_case = UITestCase.objects
ui_test_case_id = ui_test_case.id ui_test_case_id = ui_test_case.id
print(data["ui_test_case_data"]) print(data["ui_test_case_data"])
......
...@@ -45,7 +45,7 @@ class UITestTaskView(View): ...@@ -45,7 +45,7 @@ class UITestTaskView(View):
:param kwargs: :param kwargs:
:return: :return:
""" """
UITestTask.objects.get(id=ui_test_task_id).delete() UITestTask.objects.delete()
return response_success("删除UI测试任务成功") return response_success("删除UI测试任务成功")
...@@ -257,7 +257,7 @@ class CheckResultList(View): ...@@ -257,7 +257,7 @@ class CheckResultList(View):
if ui_test_result_id == "": if ui_test_result_id == "":
return response_failed({"status": 10102, "message": "ui_test_result_id不能为空"}) return response_failed({"status": 10102, "message": "ui_test_result_id不能为空"})
UITestResult.objects.get(ui_task_id=ui_test_task_id, id=ui_test_result_id).delete() UITestResult.objects.delete()
return response_success("删除UI测试任务成功") return response_success("删除UI测试任务成功")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment