下面例子都使用pipenv虚拟环境
基本以DRF这个二次封装的Django框架来实现功能
安装与初始化
pip3 install django -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com
官网是djangoproject.com
如果是用pipenv安装
pipenv install --pypi-mirror <mirror_url> <package_name>
在放源代码的目录下运行,创建项目的配置管理程序
django-admin startproject mysite
默认创建的一些文件都是项目的配置,manage不用动,控制程序用的,urls路由,settings配置项目诸如数据库连接之类的配置,asgi\wgsi在服务器上部署会用到
如果配置没问题,可以来创建你的后端程序目录,注意新建的app是在源代码目录下,和上面的配置管理程序同级
python manage.py startapp demo
创建完后的目录里面,views是目录,models是数据库模型,但是最好还是写sql生成表,test是测试,apps是配置
如果你的要用需要把程序拆分成多个app,可以用上面这条命令分别创建app对应的目录
可以使用这条命令来运行你的程序
python manage.py runserver
当你创建好了你项目的程序目录、数据库等,需要配置django的配置项来使其生效
编辑setting.py
没提到的配置项可以参考
https://blog.csdn.net/zhouruifu2015/article/details/129646086
BASE_DIR = Path(__file__).resolve().parent.parent
sys.path.insert(0,BASE_DIR)//代码目录
sys.path.insert(0,os.path.join(BASE_DIR,"demo")) //app目录
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'demo', //有几个app就注册几个
]
STATICFILES_DIRS = [os.path.join(BASE_DIR,'static')] //放置静态文件
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.mysql',
'NAME': 'webdb',
'USER': 'root',
'PASSWORD': '123456',
'HOST': '127.0.0.1' ,
'PORT': '3306',
}
}
安装mysql依赖
pipenv install --pypi-mirror http://mirrors.aliyun.com/pypi/simple/ mysqlclient
建个库试一下
use webdb;
create table USER (
id int not null auto_increment comment '用户id',
name varchar(255) collate utf8mb4_general_ci not null comment '用户名',
birthday datetime default null,
mobile varchar(255) collate utf8mb4_general_ci default null,
email varchar(255) collate utf8mb4_general_ci not null,
password varchar(255) collate utf8mb4_general_ci not null,
create_time datetime default current_timestamp,
primary key (id)
)ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
python manage.py runserver
你也可以通过写model类来建表,写完后先运行python manage.py makemigrations app名,然后再运行python manage.py migrate,就会给你创建出表,但不灵活,还是推荐写sql
反过来也可以把已有的表直接导成model类,python manage.py inspectdb --database default(你配置里面配的数据库) xxx(你要搞的表名) > backend/appname/models.py
如果要导出多个表可以直接写多个,空格隔开,如果你非得写成多个语句导入,记得用>>而不是>
示例:python .\mysite\manage.py inspectdb default USER > .\mysite\demo\models.py
注意这个很容易出现编码问题,导完看看是不是utf-8的
安装个跨域用的pipenv install --pypi-mirror http://mirrors.aliyun.com/pypi/simple/ django-cors-headers
INSTALLED_APPS = [
... ,
"corsheaders" ,
... ,
]
MIDDLEWARE = [
#'django.middleware.csrf.CsrfViewMiddleware',
"corsheaders.middleware.CorsMiddleware" ,
... ,
]
# CORS_ORIGIN_ALLOW_ALL为True, 指定所有域名(ip)都可以访问后端接口, 默认为False
CORS_ORIGIN_ALLOW_ALL = True
#允许cookie
CORS_ALLOW_CREDENTIALS = True
CORS_ALLOW_HEADERS = [
"accept" ,
"accept-encoding" ,
"authorization" ,
"content-type" ,
"dnt" ,
"origin" ,
"user-agent" ,
"x-csrftoken" ,
"x-requested-with" ,
]
题外话:windows上面启动mysql的方法
SERVICES.MSC找到对应的服务启动
如果运行的时候出现ValueError: source code string cannot contain null bytes这个错误,把你的文件看一下,看看是不是有文件编码方式变成了utf-16,改回utf-8就行了
基础
类视图与model
建一个宝宝程序
新建一个menu目录,在views里面编写
from django.shortcuts import render
from django.views import View
from django.http import HttpResponse
# Create your views here.
class GoodsMainMenu(View):
def get(self,request):
print("get!")
return HttpResponse("get it!")
def post(self,request):
print("post coming~")
return HttpResponse("post it!")
配置settingsINSTALLED_APPS
配置urls
from django.contrib import admin
from django.urls import path
from menu.views import GoodsMainMenu
urlpatterns = [
path('admin/', admin.site.urls),
path('main_menu',GoodsMainMenu.as_view())
]
让我们用前面教的方法,从数据库中把一张表导成model
from django.db import models
class User(models.Model):
name = models.CharField(max_length=255, db_comment='用户名')
birthday = models.DateTimeField(blank=True, null=True)
mobile = models.CharField(max_length=255, blank=True, null=True)
email = models.CharField(max_length=255)
password = models.CharField(max_length=255)
create_time = models.DateTimeField(blank=True, null=True, auto_now=True)
class Meta:
managed = False
db_table = 'user'
def to_dict(self): //写个数据处理函数
return {
'name': self.name,
'email': self.email
}
然后让我们用从数据库里面取到的数据作为返回的内容,当然,得先进行序列化处理
from django.core import serializers
from django.shortcuts import render
from django.views import View
from django.http import HttpResponse
from menu.models import User
from django.http import JsonResponse
import json
# Create your views here.
class GoodsMainMenu(View):
def get(self,request):
users = User.objects.all()
print("get!")
#序列化,这里不能用json.dumps
main_menu = serializers.serialize('json', users) #获取所有数据
#取部分数据,需要在models中预先返回
data = [user.to_dict() for user in users]
result = {}
result["status"] = '1000'
result["data"] = data
#jsonresponse会自动返回json格式的结果
return JsonResponse(result)
# return HttpResponse(json.dumps(result))
# return HttpResponse(main_menu)
def post(self,request):
print("post coming~")
return HttpResponse("post it!")
带参数查询,附只获取部分列的方法
class ProjectView(View):
def get(self,request):
param_id = request.GET['manager_id'] #注意这里是[]
project = Project.objects.filter(manager_id=param_id).values('project_id', 'project_name', 'create_time')
# project_list = serializers.serialize('json',project) #转化为列表
project_list = list(project)
result = {
'status': 900,
'data': project_list,
}
#return HttpResponse(project_list)
return JsonResponse(result)
上面的表结构
class Project(models.Model):
project_id = models.AutoField(primary_key=True, db_comment='项目id')
manager_id = models.IntegerField(db_comment='经理id来自user的id')
project_name = models.CharField(max_length=255)
create_time = models.DateTimeField(blank=True, null=True)
class Meta:
managed = False
db_table = 'project'
让我们来抽象一点,把返回结果抽象到一个模块里面
class ViewResponse():
@staticmethod
def success(data):
result = {
'status': 1000,
'data': data
}
return(result)
然后改造前面那两个api
class GoodsMainMenu(View):
def get(self,request):
users = User.objects.all()
#序列化,这里不能用json.dumps
main_menu = serializers.serialize('json', users) #获取所有数据
#取部分数据,需要在models中预先返回
data = [user.to_dict() for user in users]
#jsonresponse会自动返回json格式的结果
return JsonResponse(response.ViewResponse.success(data))
class ProjectView(View):
def get(self,request):
param_id = request.GET['manager_id']
project = Project.objects.filter(manager_id=param_id).values('project_id', 'project_name', 'create_time')
# project_list = serializers.serialize('json',project) #转化为列表
project_list = list(project)
return JsonResponse(response.ViewResponse.success(project_list))
合并url
假设我们现在已经写了一堆功能,有一堆app,自然也会有一堆路由
把所有的路由都放在总的urls.py里面总归看起来会很乱,为了方便管理,我们可以把app的路由放在自己的目录下面
例:menu和demo两个app(需要注册
#menu urls.py
from django.urls import path, re_path
from menu.views import GoodsMainMenu,ProjectView, UserresigterView, UsersearchView, UsergenericAPIvie, UsergenericmixAPIView
urlpatterns = [
path('main_menu',GoodsMainMenu.as_view()),
path('project',ProjectView.as_view()),
path('register',UsergenericmixAPIView.as_view()),
re_path("register/(?P<pk>.*)", UsergenericmixAPIView.as_view()),
path("search",UsersearchView.as_view())
]
#demo urls.py
from django.urls import path, re_path
from demo.views import UserApiview
urlpatterns = [
path('usertype/<int:type>/<int:page>',UserApiview.as_view()),
]
总,访问的时候把总入口跟具体的路由拼接访问就行了
from django.contrib import admin
from django.urls import path, include
urlpatterns = [
path('admin/', admin.site.urls),
path('menu/',include('menu.urls')),
path('demo/',include('demo.urls'))
]
#例:http://127.0.0.1:8000/menu/register/1
跨站
pip install django-cors-headers
在你的Django项目的settings.py文件中,做如下配置:
INSTALLED_APPS = [
...
'corsheaders',
...
]
MIDDLEWARE = [
...
'corsheaders.middleware.CorsMiddleware',
'django.middleware.common.CommonMiddleware',
...
]
# 允许所有源
CORS_ORIGIN_ALLOW_ALL = True
# 或者,只允许特定的源
CORS_ORIGIN_WHITELIST = [
'http://localhost:5173',
]
修改时区
默认存储时间为utc时间,修改时区方法:
修改settings.py
TIME_ZONE = 'Asia/Shanghai'
#如果只改上面没生效,把下面也改了
USE_TZ = False
中间件
django可以在settings里面配置中间件MIDDLEWARE
当请求从外部传入时,先按顺序从上到下走一遍中间件里面的处理方式,然后才到视图,视图返回结果后,同样自底向上的再走一遍处理,最后才返回给接收端。利用这种特性可以配置一些玩意在里面,比如统一的response处理器
例:
class CustomResponseMiddleware(MiddlewareMixin):
def process_response(self, request, response):
if isinstance(response, Response):
result = {}
print(response.data)
data = response.data.get('data')
pagination = response.data.get('pagination', None)
status_code = response.status_code
if status_code >= 400:
result['responsecode'] = 1001
result['data'] = data
elif status_code == 200 or status_code == 201:
result['responsecode'] = 1000
result['data'] = data
if pagination:
result['pagination'] = pagination['pagination']
else:
return response
return JsonResponse(result)
return response
视图只要正常用Response来返回结果就可以了,这个处理器会自动处理成json
另外就是DRF框架里面也有个配置在settings里的配置,比如配置个分页处理器
REST_FRAMEWORK = {
'DEFAULT_PAGINATION_CLASS': 'utils.custompagination.CustomPagination',
}
from rest_framework.pagination import PageNumberPagination
class CustomPagination(PageNumberPagination):
page_size = 10
page_size_query_param = 'page_size'
max_page_size = 50
def get_page_size(self, request):
page_size = super().get_page_size(request)
return max(page_size, 10)
def get_paginated_response(self, data):
return {
'pagination': {
'total_items': self.page.paginator.count,
'page_size': self.get_page_size(self.request),
'page': int(self.request.query_params.get('page', 1)),
'total_pages': self.page.paginator.num_pages,
}
}
这东西的处理步骤在中间件跟视图的中间,也就是请求流入处理过程是中间件--DRF--视图,返回亦然,另外亲测这里重写的get_paginated_response方法貌似不会被自动触发,需要手动调用,get_page_size和上面配置的最大size那些倒是可以生效,怪耶
一个视图被上面两玩意处理后,前端调用获得的数据结构大概长这样
data: (10) [{…}, {…}, {…}, {…}, {…}, {…}, {…}, {…}, {…}, {…}]
pagination: {total_items: 105, page_size: 10, page: 1, total_pages: 11}
responsecode: 1000
django rest framework
https://www.django-rest-framework.org/
一个用于开发restful框架的封装,看了下还支持oauth2,不错
pipenv install --pypi-mirror http://mirrors.aliyun.com/pypi/simple/ djangorestframework
然后
INSTALLED_APPS = [
...
'rest_framework',
]
获取参数方法
get:
request.query_params.get(xxx)
post:
request.data.get(xxx)
视图
类视图APIview
简单来讲,从数据库到api的流程是这样的
- 建库建表
- 建model
- 建与之对应的序列化器
- 建立视图,在视图里面调用model进行查询(从api调用中获得参数),用序列化器对查询结果进行处理(反序列化save或者序列化返回给前端)
- url暴露api
https://blog.csdn.net/weixin_43865008/article/details/128851985
api类视图,在查询功能的同时实现了翻页功能
from rest_framework.views import APIView
from menu.models import User
from django.http import JsonResponse
from utils import response
#main_menu/type/1
class UserApiview(APIView):
def get(self,request,type,page):
current_page = (page - 1)*2
end_page = page*2
userdata = User.objects.filter(type=type).values('name', 'email', 'create_time')[current_page:end_page]
return JsonResponse(response.ViewResponse.success(list(userdata)))
#path('usertype/<int:type>/<int:page>',UserApiview.as_view())
查询经理名下的项目,post更新或者新建项目
注意这里涉及到时区转换,需要先在settings里面配置TIME_ZONE = 'Asia/Shanghai',django默认存放的为UTC时间,如果想要根据你配置的时区来转换显示的数据,需要用timezone.localtime来处理一下
class ProjectView(APIView):
def get(self,request):
param_id = request.GET['manager_id']
projects = Project.objects.filter(manager_id=param_id).values('project_id', 'project_name', 'create_time')
# project_list = serializers.serialize('json',project) #转化为列表
project_list = []
for project in projects:
project_list.append({
'project_id': project["project_id"],
'project_name': project["project_name"],
'create_time': timezone.localtime(project["create_time"]).isoformat()
})
return JsonResponse(response.ViewResponse.success(project_list))
def post(self, request, *args, **kwargs):
data = request.data
manager_id = data.get('manager_id') #这个是APIView取数据的写法,django原生没有data
project_name = data.get('project_name')
if not manager_id or not isinstance(manager_id, int):
return JsonResponse({'message': 'manager_id必须是数字'})
# 对project_name进行数据校验
if not project_name or not isinstance(project_name, str):
return JsonResponse({'message': 'project_name必须是字符串'})
project, created = Project.objects.update_or_create(
manager_id=manager_id,
project_name=project_name,
defaults={'create_time': timezone.now()}
)
if created:
return JsonResponse({'message': '新项目创建成功'})
else:
return JsonResponse({'message': '项目已存在,更新成功'})
使用原生sql,实现关键字查询、多种排序,连表查询,翻页
class GoodsSearchAPIView(APIView):
def get(self,request,keyword,page,order_by):
"""
select r.comment_count,g.image,g.name,g.p_price,g.shop_name,g.sku_id from goods g
left join
(
select count(c.sku_id) as comment_count,c.sku_id from comment c group by c.sku_id
) r
on g.sku_id=r.sku_id
where g.name like "%手机%"
order by r.comment_count desc limit 15,15
"""
order_dict = {
1:"r.comment_count",
2:"g.p_price"
}
limit_page = (page-1)*15
# 执行原生sql
from django.db import connection
from django.conf import settings
sql = """
select r.comment_count,concat('{}',g.image) as image,g.name,g.p_price,g.shop_name,g.sku_id from goods g
left join
(
select count(c.sku_id) as comment_count,c.sku_id from comment c group by c.sku_id
) r
on g.sku_id=r.sku_id
where g.name like "%{}%"
order by {} desc limit {},15
""".format(settings.IMAGE_URL,keyword,order_dict[order_by],limit_page)
cursor = connection.cursor()
cursor.execute(sql)
res = self.dict_fetchall(cursor)
final_list = []
for i in res:
res_json = json.dumps(i,cls=DecimalEncoder,ensure_ascii=False)
final_list.append(res_json)
return ResponseMessage.GoodsResponse.success(final_list)
def dict_fetchall(self, cursor):
desc = cursor.description #返回元组,包含了查询结果的字段信息,[0]是字段名,如[('name', ...), ('age', ...)]
return [dict(zip([col[0] for col in desc], row)) for row in cursor.fetchall()]
#fetchall返回查询的所有结果,元组,元组中的元素顺序和字段的顺序是对应的。例如,[('Tom', 20), ('Jerry', 22)]
model与序列化器
经过上面的例子,我们已经知道django里面数据库的表是以一种叫做model的object存在的,而数据一般以json格式转换,那么从获取数据到导入数据,需要进行反序列化处理
https://www.django-rest-framework.org/api-guide/serializers/
写个user的序列化器,他可以把json数据和user model之间进行转换
from menu.models import User
from rest_framework import serializers
from rest_framework.validators import UniqueValidator
class UserSerializer(serializers.ModelSerializer):
email = serializers.EmailField( #EmailField 字段类型可以验证数据是否为有效的电子邮件地址
required=True,allow_blank=False,
validators=[UniqueValidator(queryset=User.objects.all(),message="用户已存在")]#检查值是否唯一
)
class Meta:
model = User #对标的model
fields = "__all__" #序列化所有的字段
这是视图类
class UserresigterView(APIView):
def post(self,request):
user_data_serializer = UserSerializer(data=request.data)
if user_data_serializer.is_valid():
user_data = User.objects.create(**user_data_serializer.validated_data)
user_front = UserSerializer(user_data)#这里因为要返回给前端看,所以得序列化一下
#这里用前面的数据注册了一个user后,返回的完整的user数据作格式化
return JsonResponse(response.ViewResponse.success(user_front.data))
else:
return JsonResponse(user_data_serializer.errors, status=400)
# post
# {
# "name": "uuko",
# "email": "[email protected]",
# "password": "123456",
# "type": 1
# }
# return
# {
# "status": 1000,
# "data": {
# "id": 11,
# "email": "[email protected]",
# "name": "uuko",
# "birthday": null,
# "mobile": null,
# "password": "123456",
# "create_time": null, #这里没自动生成是因为user moderl没加auto_now=True
# 也可以试试直接把model里面时间的语句删掉,避免覆盖数据库的自动生成语句
# "type": 1
# }
# }
当然明文存储密码是不安全的,我们可以加个md5,这里使用save(),他可以调用我们二次封装的create函数
class UserSerializer(serializers.ModelSerializer):
email = serializers.EmailField(
required=True,allow_blank=False,
validators=[UniqueValidator(queryset=User.objects.all(),message="用户已存在")]
)
def create(self,validated_data): #第二个参数就代表外面传进来的数据
validated_data["password"] = md5(validated_data["password"].encode()).hexdigest()
result = User.objects.create(**validated_data)
return result
class Meta:
model = User
fields = "__all__"
class UserresigterView(APIView):
def post(self,request):
user_data_serializer = UserSerializer(data=request.data)
if user_data_serializer.is_valid():
user_data = user_data_serializer.save()
user_front = UserSerializer(user_data)
return JsonResponse(response.ViewResponse.success(user_front.data))
else:
return JsonResponse(user_data_serializer.errors, status=400)
当然还可以再进一步,禁止password被序列化,也就是这东西不会出现在api的响应里面
class UserSerializer(serializers.ModelSerializer):
···
password = serializers.CharField(write_only=True)
依葫芦画瓢再写个查询用户的功能
class UsersearchView(APIView):
def get(self,request):
email = request.GET['email']
try:
user_data = User.objects.get(email=email)
user_front = UserSerializer(user_data)
return JsonResponse(response.ViewResponse.success(user_front.data))
except Exception as e:
print(e)
return JsonResponse({"msg": "failed to get user info"})
GenericAPIView
经过上面的例子,我们可以看出,视图类里面有很多重复代码,尤其是model、序列化器这一块,使用GenericAPIView可以帮助我们统一配置这些功能,当然还有其他能力,比如配置分页器
from rest_framework.generics import GenericAPIView
让我们来重新封装上面注册用户信息的api
class UsergenericAPIvie(GenericAPIView):
queryset = User.objects #封装了对象查询
serializer_class = UserSerializer #封装了序列化器
def post(self, request):
result = self.get_serializer(data = request.data) #调用序列化器
try:
result.is_valid()
result.save()
return JsonResponse(response.ViewResponse.success(result.data))
except Exception as e:
return JsonResponse(result.errors, status=400)
让我们再丰富下查询功能,如果不带参则查询所有,带参则查询对应的参数,而且可以支持不同的/复数的参数
class UsergenericAPIvie(GenericAPIView):
#queryset = User.objects
serializer_class = UserSerializer
def get_queryset(self):
queryset = User.objects.all()
email = self.request.query_params.get('email', None)
type = self.request.query_params.get('type', None)
if email is not None:
queryset = queryset.filter(email=email)
#return User.objects.filter(email=email)
if type is not None:
queryset = queryset.filter(type=type)
return queryset
def post(self, request):
result = self.get_serializer(data = request.data)
try:
result.is_valid()
result.save()
return JsonResponse(response.ViewResponse.success(result.data))
except Exception as e:
return JsonResponse(result.errors, status=400)
def get(self, request):
return JsonResponse(self.get_serializer(instance = self.get_queryset(),many = True).data,safe=False)
#当 safe=True 时,JsonResponse 要求传递给它的数据必须是一个字典,这里可能是一个包含多个字典的列表
Mixin
mixin是个扩展类,最大的特点是可以和别的类一起使用,快速实现某些功能
from rest_framework.mixins import CreateModelMixin
快速调用create,无需再手动调is_valid,save
等于post
还是重写上面注册功能
class UsergenericmixAPIView(GenericAPIView,CreateModelMixin):
serializer_class = UserSerializer
queryset = User.objects.all()
def post(self, request):
return self.create(request)
此外还有
ListModelMixin=get全部
RetrieveModelMixin=get具体对象,默认用pk主键,等于lookup_field
UpdateModelMixin=put更新
DestroyModelMixin=delete销毁
用主键查询对应信息
class UsergenericmixAPIView(GenericAPIView, CreateModelMixin, RetrieveModelMixin):
serializer_class = UserSerializer
queryset = User.objects.all()
def post(self, request):
return self.create(request)
def get(self, request, pk):
return self.retrieve(request, pk)
#re_path("register/(?P<pk>.*)", UsergenericmixAPIView.as_view()),
viewsets视图集
上面我们对于相同动作带不带参的处理方式是另外写一个get_queryset来处理参数,但其实也可以使用viewsets来处理不同情况下调用的函数
ViewSetMixin
from rest_framework.viewsets import ViewSetMixin
class UserMixApiView(ViewSetMixin,GenericAPIView,RetrieveModelMixin,ListModelMixin,UpdateModelMixin,DestroyModelMixin):
//注意这里因为py继承优先级的问题,必须让ViewSetMixin在前面
serializer_class = UserSerializer
queryset = User.objects.all()
def single(self,request,pk):
return self.retrieve(request, pk)
def alldata(self,request):
return self.list(request)
def save(self,request):
return self.create(request)
def update_user(self,request,pk):
return self.update(request,pk)
def delete(self,request):
return self.destroy(request)
path("sapi",UserMixApiView.as_view({
'get': 'alldata',
'post': 'save'
})),//没参来这
re_path('sapi/(?P<pk>.*)',UserMixApiView.as_view({
'get': 'single',
'post': 'update'
}))//有参来这
当然,这样写就要写一大堆的继承,看起来很麻烦,通过查看源码,我们发现这一堆玩意都可以用一个东西替代
ModelViewSet
然后就只要继承一个玩意就行了
class UserMixApiView(ModelViewSet):
#从源码可以看出这玩意包罗万象
# class ModelViewSet(mixins.CreateModelMixin,
# mixins.RetrieveModelMixin,
# mixins.UpdateModelMixin,
# mixins.DestroyModelMixin,
# mixins.ListModelMixin,
# GenericViewSet):
model
数据库表在django中的模型叫做Model Class,继承自django.db.models.Model
这个类定义了一些对数据的操作方法
objects是每个 Django 模型的默认管理器,它提供了对数据库的查询操作。可以通过模型类的 objects 属性来进行数据查询
YourModel.objects.all() 返回一个包含所有 YourModel 对象的 QuerySet
YourModel.objects.get(field1='some_value')返回满足查询条件的单个对象,不存在或者多结果会引起错误
try:
obj = YourModel.objects.get(field1='some_value')
except YourModel.DoesNotExist:
# 处理查询结果不存在的情况
except YourModel.MultipleObjectsReturned:
# 处理查询结果多于一个对象的情况
YourModel.objects.filter(field1='some_value')返回一个满足筛选条件的queryset
YourModel.objects.exclude(field1='some_value') 返回一个包含不符合条件的对象的 QuerySet。
save(): save() 方法用于保存或更新对象到数据库。如果对象是新创建的,则插入新记录;如果对象已存在(根据主键判断),则更新相应的记录。
delete(): delete() 方法用于删除对象。obj.delete() 将会从数据库中删除与 obj 对象对应的记录。
count(): count() 方法返回 QuerySet 中对象的数量。
queryset是一个用于表示数据库查询的对象。它允许你执行数据库查询,并得到符合条件的一组数据。如果你相对查到的数据进行序列化并返回
from django.http import JsonResponse
from .models import YourModel
from .serializers import YourModelSerializer
def get_data(request, field_value):
# 根据字段的值获取数据
queryset = YourModel.objects.filter(your_field=field_value)
# 使用序列化器将数据序列化为 JSON 格式,这里有多组数据,需要 many=True
serializer = YourModelSerializer(queryset, many=True)
serialized_data = serializer.data
# 返回 JSON 响应
return JsonResponse({'data': serialized_data})
翻页器
原版
https://docs.djangoproject.com/en/5.0/topics/pagination/
drf的翻页器
https://www.django-rest-framework.org/api-guide/pagination/
如果需要全局配置
REST_FRAMEWORK = {
# 全局分页
'DEFAULT_PAGINATION_CLASS': 'xxxxx',
}
示例:带翻页和页码选择的翻页器
from rest_framework.pagination import PageNumberPagination
class CustomPagination(PageNumberPagination):
page_size = 10
page_size_query_param = 'page_size'
max_page_size = 50
def get_page_size(self, request):
page_size = super().get_page_size(request)
return max(page_size, 10)
class ProjectView(GenericAPIView, ListModelMixin, CreateModelMixin):
queryset = Project.objects.all()
serializer_class = ProjectSerializer
pagination_class = CustomPagination
def get(self, request, *args, **kwargs):
return self.list(request, *args, **kwargs)
def post(self, request, *args, **kwargs):
return self.create(request, *args, **kwargs)
#GET /your-api-endpoint/?page=2&page_size=20
数据
#http://127.0.0.1:8000/project/projects?page=2&page_size=20
{
"count": 115,
"next": "http://127.0.0.1:8000/project/projects?page=3&page_size=20",
"previous": "http://127.0.0.1:8000/project/projects?page_size=20",
"results": [
{...}
]
}
JWT
cookie分为会话性和存储性,会话性窗口关闭就会失效,存储性则会落盘,时效较长
session保存在服务端,至于具体是内存redis还是数据库看设计
token:令牌,用户访问api时会附带上使用,由服务端生成,客户端保存
用户在网页登录,服务端验证账户密码等信息无误后,生成一个token返回,用户接收token,并使用这个token来重新发起请求,服务端接收到token,解密后校验一些用户的数据,无误后把数据返回给用户,如果不通过则返回错误信息
https://jwt.io/
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.cThIIoDvwdueQB468K5xDc5633seEFoqwxjF_xSJyQQ
{
"alg": "HS256",
"typ": "JWT"
}
{
"sub": "1234567890",
"name": "John Doe",
"iat": 1516239022
}
一个jwt外观看起来由.连接的三段加密字符组成
第一段字符是header,包含了加密算法的信息,使用base64处理
第二段是payload,包含存储着的用户的一些信息,使用base64处理
第三段是经过加盐和base64处理的验证信息,将第一段和第二段做以上加密处理后得到的base64字符串和本段内容进行对比,如果一致则通过校验
pip install pyjwt
在项目的setting里面有SECRET_KEY这个字段,就代表项目的盐(你可以修改这个字段)
import jwt
import datetime
SECRET_KEY = 'uoaoiscnzxo&(^(*h9hnhuhga6^E%))'
def create_token():
header = {
"alg": "HS256",
"typ": "JWT"
}
payload = {
'user_id': 1,
'user_name': 'zhangsan',
'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=1)#过期时间,别改exp这个字段名
}
result = jwt.encode(headers=header,payload=payload,key=SECRET_KEY,algorithm='HS256')
return result
def token_decode(token):
try:
return jwt.decode(token,SECRET_KEY,algorithms=["HS256"])
except jwt.exceptions.ExpiredSignatureError:
return("Failed to decode jwt token:expired")
token = create_token()
print(token)
print(token_decode("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoxLCJ1c2VyX25hbWUiOiJ6aGFuZ3NhbiIsImV4cCI6MTcwMDYzNTc3N30.mlJQ3vVN_gRa-_jFHP-UhLVa6b1PTF6qYZooo3HAoCc"))
通过url方式进行token的验证
import jwt
import datetime
from rest_framework.authentication import BaseAuthentication
from django.http import JsonResponse
from rest_framework.exceptions import AuthenticationFailed
SECRET_KEY = 'uoaoiscnzxo&(^(*h9hnhuhga6^E%))'
#创建token
def create_token(payload,minutes):
header = {
"alg": "HS256",
"typ": "JWT"
}
payload['exp'] = datetime.datetime.utcnow() + datetime.timedelta(minutes=minutes)
result = jwt.encode(headers=header,payload=payload,key=SECRET_KEY,algorithm='HS256')
return result
#解密token
def token_decode(token):
result = {'status': False, 'content': '', 'error': None}
try:
result['content'] = jwt.decode(token,SECRET_KEY,algorithms=["HS256"])
result['status'] = True
except jwt.exceptions.ExpiredSignatureError:
result['error'] = "Failed to decode jwt token:expired"
except:
result['error'] = "Failed to decode jwt token"
return(result)
#token验证器,直接自定义验证规则
# class tokenauth(BaseAuthentication):
# def authenticate(self,request):
# token = request.GET.get('token')
# if token:
# result_token = token_decode(token)
# if result_token['status'] != True:
# raise AuthenticationFailed({"code": "A00002", "data": None, "msg": "token已失效", "success": False})
# else:
# return(result_token,token)
# else:
# raise AuthenticationFailed({"code": "A00003", "data": None, "msg": "缺少token", "success": False})
# raise AuthenticationFailed错误会被django捕获变成40x,不会导致程序中断
#token验证,不对token内容做处理
class tokenauth(BaseAuthentication):
def authenticate(self,request):
token = request.GET.get('token')
result_token = token_decode(token)
return(result_token,token) #必须返回两个
from hashlib import md5
from rest_framework.views import APIView
from demo.serializers import UserSerializer
from menu.models import User
from django.http import JsonResponse
from utils import response,mysite_jwt
from rest_framework.generics import GenericAPIView
from rest_framework.mixins import ListModelMixin
#main_menu/type/1
# class UserApiview(APIView):
# def get(self,request,type,page):
# current_page = (page - 1)*2
# end_page = page*2
# userdata = User.objects.filter(type=type).values('name', 'email', 'create_time')[current_page:end_page]
# return JsonResponse(response.ViewResponse.success(list(userdata)))
class Userloginview(GenericAPIView):
serializer_class = UserSerializer
def post(self,request):
request_data = request.data
email = request_data['email']
user_data = User.objects.get(email = email)
if user_data:
user = self.get_serializer(instance = user_data)
request_pw = md5(request_data['password'].encode()).hexdigest()
user_pw = user.data['password']
if request_pw != user_pw:
return JsonResponse({'error':'用户名或密码错误'})
else:
payload = {
'User_name': user.data['name'],
'User_type': user.data['type']
}
return JsonResponse({'token':mysite_jwt.create_token(payload=payload,minutes=1)})
else:
return JsonResponse({'error':"用户不存在"})
class Userlist(GenericAPIView, ListModelMixin):
serializer_class = UserSerializer
queryset = User.objects.all()
authentication_classes = [mysite_jwt.tokenauth]#单个配置
def get(self,request):
print(request.user) #固定用法,返回token验证函数的result_token部分
print(request.auth) #固定用法,返回token验证函数的token部分
if not request.user['status']: #自行定义对token有效性的处理,比直接在验证函数写灵活
return JsonResponse(request.user,safe=False)
return self.list(request)
#使用示例
# http://127.0.0.1:8000/demo/alluser?token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJVc2VyX25hbWUiOiJvcmluIiwiVXNlcl90eXBlIjoxLCJleHAiOjE3MDEwNjQyODN9.y4sS4xksOjcwcS8ogE4M-QCcnK8gS2armjTEnMjahNk
通过header的方式传token
把token放在http header里面
class tokenauthheader(BaseAuthentication):
def authenticate(self,request):
token = request.META.get('HTTP_TOKEN')#header字段会被处理成大写加HTTP_,如token变成HTTP_TOKEN
print(token)
result_token = token_decode(token)
return(result_token,token)
全局配置token解析器
在settings中配置
REST_FRAMEWORK = {"DEFAULT_AUTHENTICATION_CLASSES":['utils.mysite_jwt.tokenauthheader']}
#实际路径根据自己项目来
示例
查询/创建/更新api
get:查询所有,带翻页器
post:自动分辨创建还是更新,时间字段自动创建
model
class ResServer(models.Model):
res_server_type = models.CharField(max_length=8, db_comment='类型')
res_server_platform = models.CharField(max_length=20, db_comment='平台')
res_server_code = models.CharField(max_length=100, blank=True, null=True, db_comment='资源在平台上的编号')
res_server_public_ip = models.CharField(max_length=45, db_comment='公网ip')
res_server_internal_ip = models.CharField(max_length=45, db_comment='内网ip')
res_server_env = models.CharField(max_length=4, blank=True, null=True, db_comment='环境')
res_server_project = models.CharField(max_length=50, db_comment='所属项目')
res_server_create_time = models.DateTimeField(blank=True, null=True)
res_server_update_time = models.DateTimeField(blank=True, null=True)
class Meta:
managed = False
db_table = 'res_server'
序列化器
class ResServerSerializer(serializers.ModelSerializer):
def create(self, validated_data):
validated_data['res_server_create_time'] = timezone.now()
validated_data['res_server_update_time'] = timezone.now()
return super().create(validated_data)
def update(self, instance, validated_data):
instance.res_server_type = validated_data.get('res_server_type', instance.res_server_type)
instance.res_server_platform = validated_data.get('res_server_platform', instance.res_server_platform)
instance.res_server_code = validated_data.get('res_server_code', instance.res_server_code)
instance.res_server_public_ip = validated_data.get('res_server_public_ip', instance.res_server_public_ip)
instance.res_server_internal_ip = validated_data.get('res_server_internal_ip', instance.res_server_internal_ip)
instance.res_server_env = validated_data.get('res_server_env', instance.res_server_env)
instance.res_server_project = validated_data.get('res_server_project', instance.res_server_project)
instance.res_server_update_time = timezone.now()
instance.save()
return instance
class Meta:
model = ResServer
fields = '__all__'
read_only_fields = ['res_server_create_time', 'res_server_update_time']
视图
class CustomPagination(PageNumberPagination):
page_size = 10
page_size_query_param = 'page_size'
max_page_size = 50
def get_page_size(self, request):
page_size = super().get_page_size(request)
return max(page_size, 10)
class ResServerView(GenericAPIView):
queryset = ResServer.objects.all()
serializer_class = ResServerSerializer
pagination_class = CustomPagination
def get(self, request):
resservers = self.paginate_queryset(self.get_queryset())
serializer = self.serializer_class(instance=resservers, many=True)
return ProjectInfoResponse.success(serializer.data)
def post(self, request):
data = request.data
res_server_id = data.get('id')
try:
if res_server_id and ResServer.objects.filter(id=res_server_id).exists():
instance = ResServer.objects.get(id=res_server_id)
serializer = self.get_serializer(instance, data=data)
else:
serializer = self.serializer_class(data=data)
if serializer.is_valid():
serializer.save()
return ProjectInfoResponse.success(serializer.data)
else:
return ProjectInfoResponse.failed(serializer.errors)
except ResServer.DoesNotExist:
return ProjectInfoResponse.failed("Instance does not exist.")
except Exception as e:
return ProjectInfoResponse.failed(str(e))
复杂查询
部分匹配,非原生sql
封装翻页器返回
from django.db.models import Q
from rest_framework.response import Response
class CustomPagination(PageNumberPagination):
page_size = 10
page_size_query_param = 'page_size'
max_page_size = 50
def get_page_size(self, request):
page_size = super().get_page_size(request)
return max(page_size, 10)
def get_paginated_response(self, data):
return Response({
'pagination': {
'total_items': self.page.paginator.count,
'page_size': self.get_page_size(self.request),
'page': int(self.request.query_params.get('page', 1)),
'total_pages': self.page.paginator.num_pages,
},
'data': data,
})
class ProjectView(GenericAPIView):
queryset = Project.objects.all()
serializer_class = ProjectSerializer
pagination_class = CustomPagination
def get(self, request):
project_name = request.query_params.get('project_name', None)
project_owner_id = request.query_params.get('project_owner_id', None)
query_filter = Q()
if project_name:
query_filter |= Q(project_name__icontains=project_name)
if project_owner_id:
query_filter |= Q(project_owner_id__icontains=project_owner_id)
projects = self.paginate_queryset(self.queryset.filter(query_filter))
serializer = self.get_serializer(projects, many=True)
pagination = self.get_paginated_response(serializer.data)
return ProjectInfoResponse.success(serializer.data,pagination.data)
from django.http import JsonResponse
class ProjectInfoResponse:
@staticmethod
def success(data, pagination=None):
result = {'responsecode': 1000, 'data': data}
if pagination:
result['pagination'] = pagination['pagination']
return JsonResponse(result)