commit
eb162a5afc
@ -0,0 +1,72 @@ |
||||
import base64 |
||||
|
||||
from PIL import Image, ImageDraw |
||||
import random |
||||
import io |
||||
import numpy as np |
||||
|
||||
|
||||
def generate_geometry_captcha(): |
||||
# 可选颜色和形状 |
||||
colors = {"red": (255, 0, 0), "blue": (0, 0, 255), "green": (0, 128, 0)} |
||||
shapes = ["cone", "cylinder", "cube"] |
||||
|
||||
# 随机生成 5 个图形 |
||||
objects = [] |
||||
for _ in range(5): |
||||
color_name = random.choice(list(colors.keys())) |
||||
shape = random.choice(shapes) |
||||
width = random.randint(30, 80) # 随机宽度 |
||||
height = random.randint(40, 100) # 随机高度 |
||||
x = random.randint(50, 350) |
||||
y = random.randint(50, 150) |
||||
objects.append({ |
||||
"color": color_name, |
||||
"shape": shape, |
||||
"width": width, |
||||
"height": height, |
||||
"x": x, |
||||
"y": y, |
||||
"bbox": (x, y, x + width, y + height) # 图形边界框 |
||||
}) |
||||
|
||||
# 创建图片 |
||||
img = Image.new("RGB", (400, 200), color=(240, 240, 240)) |
||||
draw = ImageDraw.Draw(img) |
||||
|
||||
# 绘制所有图形 |
||||
for obj in objects: |
||||
x, y, w, h = obj["x"], obj["y"], obj["width"], obj["height"] |
||||
if obj["shape"] == "cylinder": |
||||
draw.ellipse((x, y, x + w, y + h // 3), fill=colors[obj["color"]]) |
||||
draw.rectangle((x, y + h // 6, x + w, y + h), fill=colors[obj["color"]]) |
||||
draw.ellipse((x, y + h - h // 3, x + w, y + h), fill=colors[obj["color"]]) |
||||
elif obj["shape"] == "cone": |
||||
draw.polygon([(x, y + h), (x + w // 2, y), (x + w, y + h)], fill=colors[obj["color"]]) |
||||
elif obj["shape"] == "cube": |
||||
draw.rectangle((x, y, x + w, y + h), fill=colors[obj["color"]]) |
||||
|
||||
# 随机生成题目(例如:选择红色最小圆柱体) |
||||
target_color = "red" |
||||
target_shape = "cylinder" |
||||
|
||||
# 筛选符合条件的图形(红色圆柱体) |
||||
candidates = [obj for obj in objects if obj["color"] == target_color and obj["shape"] == target_shape] |
||||
|
||||
# 找出最小的一个(按面积排序) |
||||
if candidates: |
||||
target_obj = min(candidates, key=lambda obj: obj["width"] * obj["height"]) |
||||
correct_answer = target_obj["bbox"] # 正确答案的坐标范围 |
||||
else: |
||||
correct_answer = None # 若无符合条件的图形,重新生成 |
||||
|
||||
# 返回图片和答案 |
||||
img_bytes = io.BytesIO() |
||||
img.save(img_bytes, format="PNG") |
||||
return { |
||||
"image": base64.b64encode(img_bytes.getvalue()).decode("utf-8"), |
||||
"correct_answer": correct_answer, |
||||
"prompt": f"请点击颜色为 {target_color} 的最小 {target_shape}" |
||||
} |
||||
|
||||
print(generate_geometry_captcha()) |
@ -0,0 +1,68 @@ |
||||
import re |
||||
import uuid |
||||
import json |
||||
from fastapi import APIRouter, Query |
||||
from dbgpt_app.openapi.api_view_model import Result |
||||
from dbgpt._private.pydantic import BaseModel, Field,field_validator |
||||
from dbgpt_app.login.login_db import AuthLoginDao |
||||
from dbgpt.redis.redis import redis_client |
||||
router = APIRouter() |
||||
|
||||
|
||||
auth_login = AuthLoginDao() |
||||
|
||||
|
||||
|
||||
class AuthLogin(BaseModel): |
||||
username: str |
||||
password: str |
||||
verificationCode: str | None = None |
||||
|
||||
# 验证用户名 |
||||
@field_validator('username') |
||||
def validate_username(cls,v): |
||||
if not v: |
||||
raise ValueError('用户名不能为空') |
||||
#只允许英文、数据和特殊字符 !@#$%^&* |
||||
pattern = r'^[a-zA-Z0-9!@#$%^&*()]{5,20}$' |
||||
if not re.fullmatch(pattern,v): |
||||
raise ValueError('用户名必须为5-20个字符') |
||||
return v |
||||
# 验证密码 |
||||
@field_validator('password') |
||||
def validate_password(cls,v): |
||||
if not v: |
||||
raise ValueError('密码不能为空') |
||||
# 只允许英文、数字和特殊字符 |
||||
pattern = r'^[a-zA-Z0-9!@#$%^&*()]{5,100}$' |
||||
if not re.fullmatch(pattern, v): |
||||
raise ValueError('密码必须为5-20个字符,只能包含英文、数字或!@#$%^&*()') |
||||
return v |
||||
|
||||
|
||||
@router.post("/doLogin") |
||||
async def dologin( authLogin :AuthLogin): |
||||
|
||||
rt = auth_login.select_user(authLogin) |
||||
if rt is not None: |
||||
loginUser = { |
||||
"id": rt.id, |
||||
"username": rt.username, |
||||
"password": rt.password, |
||||
"nick_name": rt.nick_name, |
||||
"sex": rt.sex, |
||||
"phone": rt.phone, |
||||
"dept_id": rt.dept_id, |
||||
"status": rt.status, |
||||
"avatar": rt.avatar, |
||||
"email": rt.email, |
||||
} |
||||
print(json.dumps(loginUser)) |
||||
random_str = uuid.uuid4().hex |
||||
uuids_str = random_str[:24] |
||||
redis_client.set("oauth_access_token:"+ uuids_str,json.dumps(loginUser, ensure_ascii=False), ex=3600) |
||||
|
||||
return Result.succ(uuids_str) |
||||
else: |
||||
return Result.failed(code="301", msg=f"账号或密码错误,请重新输入") |
||||
|
@ -0,0 +1,19 @@ |
||||
|
||||
from dbgpt.storage.metadata import BaseDao |
||||
from dbgpt_serve.user.user import UserEntity |
||||
|
||||
userEntity = UserEntity |
||||
|
||||
class AuthLoginDao(BaseDao): |
||||
def select_user(self, user : UserEntity): |
||||
session = self.get_raw_session() |
||||
result = ( |
||||
session.query(userEntity) |
||||
.filter(userEntity.username == user.username ) |
||||
.filter(userEntity.password == user.password) |
||||
.filter(userEntity.status == '0') |
||||
.filter(userEntity.deleted == 0) |
||||
.first() |
||||
) |
||||
session.close() |
||||
return result |
@ -0,0 +1,13 @@ |
||||
from redis import Redis |
||||
from fastapi import FastAPI |
||||
|
||||
app = FastAPI() |
||||
|
||||
# 初始化 redis 连接(带密码) |
||||
redis_client = Redis( |
||||
host= "192.168.130.154", |
||||
port = 6379, |
||||
password = "cjy@123", |
||||
db=9, |
||||
decode_responses=True |
||||
) |
@ -0,0 +1,21 @@ |
||||
|
||||
from sqlalchemy import Column,Integer,String,Text |
||||
|
||||
from dbgpt.storage.metadata import Model |
||||
|
||||
|
||||
class UserEntity(Model): |
||||
__tablename__ = "sys_user" |
||||
id = Column(Integer,primary_key=True) |
||||
username = Column(String(255)) |
||||
password = Column(String(255)) |
||||
nick_name = Column(String(255)) |
||||
sex = Column(String(1)) |
||||
phone = Column(String(20)) |
||||
dept_id = Column(Integer) |
||||
status = Column(String(1)) |
||||
avatar = Column(String(500)) |
||||
email = Column(String(50)) |
||||
remark = Column(Text) |
||||
deleted = Column(Integer) |
||||
|
Loading…
Reference in new issue