网站首页 > 教程文章 正文
上一篇:Dify工具使用全场景:彩票自动兑奖管理系统 (应用篇·第1期)
上一篇通过上传彩票调用大模型,可以按规则生成包含彩票信息的json串,如下信息。
{
"point": "save",
"params": {
"销售站点": "测试站点",
"期号": "2024001",
"玩法": "双色球",
"投注时间": "2024-01-01 10:00:00",
"开奖时间": "2024-01-01 21:00:00",
"验票码": "ABC123",
"金额": "10元",
"公益贡献": "1元",
"微信订阅号": "lottery_test",
"红球和蓝球对": [
{
"红球": [1, 2, 3, 4, 5, 6],
"蓝球": 7
}
]
}
}
接下来,要把json信息入库保存,因为要把彩票信息管理起来,了解自己平时写的号码。
设计表结构存储
# 数据库表结构
"""
CREATE TABLE lottery_records (
id INT AUTO_INCREMENT PRIMARY KEY,
sales_site VARCHAR(255),
issue_number VARCHAR(50),
lottery_type VARCHAR(50),
betting_time DATETIME,
draw_time DATETIME,
verification_code VARCHAR(100),
amount DECIMAL(10,2),
charity_contribution DECIMAL(10,2),
wechat_subscription VARCHAR(100)
);
CREATE TABLE lottery_numbers (
id INT AUTO_INCREMENT PRIMARY KEY,
lottery_record_id INT,
red_balls VARCHAR(100),
blue_ball VARCHAR(10),
FOREIGN KEY (lottery_record_id) REFERENCES lottery_records(id)
);
"""
WEB服务提供
这里需要写一段代码,用于实现web服务。
核心代码如下:
from pydantic import BaseModel
class LotteryData(BaseModel):
point: str
params: dict
from fastapi import APIRouter, Request, Header, HTTPException
import json
import logging
import os
import time
router = APIRouter()
import mysql.connector
from mysql.connector import Error
# MySQL数据库配置
db_config = {
'host': '***.***.***.****',
'user': 'root',
'password': '3333',
'database': 'dbname'
}
# 配置日志
log_dir = 'logs'
os.makedirs(log_dir, exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(os.path.join(log_dir, 'lottery.log')),
logging.StreamHandler()
]
)
@router.post('/save')
async def save_lottery(data: LotteryData, authorization: str = Header(None)):
start_time = time.time()
logging.info(f"收到保存请求: {data.point}, 参数: {data.params}")
# 验证API密钥
expected_api_key = "123456" # 设置您的API密钥
if not authorization:
logging.warning("请求缺少authorization header")
raise HTTPException(status_code=401, detail="No authorization header")
auth_scheme, _, api_key = authorization.partition(' ')
if auth_scheme.lower() != "bearer" or api_key != expected_api_key:
logging.warning(f"无效的认证信息: {auth_scheme}")
raise HTTPException(status_code=401, detail="Unauthorized")
# 从params中获取实际的彩票数据
content = data.params
result = json_to_sql(content)
end_time = time.time()
logging.info(f"请求处理完成,耗时: {end_time - start_time:.2f}秒,结果: {result}")
return result
def json_to_sql(data):
start_time = time.time()
logging.info(f"开始处理彩票数据: {data}")
try:
# 连接到MySQL数据库
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()
logging.info(f"数据库连接成功,耗时: {time.time() - start_time:.2f}秒")
# 主表插入语句
main_sql = """
INSERT INTO lottery_records (
sales_site, issue_number, lottery_type, betting_time, draw_time,
verification_code, amount, charity_contribution, wechat_subscription
) VALUES (
%(sales_site)s, %(issue_number)s, %(lottery_type)s, %(betting_time)s,
%(draw_time)s, %(verification_code)s, %(amount)s, %(charity_contribution)s,
%(wechat_subscription)s
)
"""
# 准备主表数据
main_data = {
'sales_site': data['销售站点'],
'issue_number': data['期号'],
'lottery_type': data['玩法'],
'betting_time': data['投注时间'],
'draw_time': data['开奖时间'],
'verification_code': data['验票码'],
'amount': float(data['金额'].replace('元', '')),
'charity_contribution': float(data['公益贡献'].replace('元', '')),
'wechat_subscription': data['微信订阅号']
}
# 执行主表插入
logging.info(f"准备执行主表插入,SQL: {main_sql}, 参数: {main_data}")
cursor.execute(main_sql, main_data)
lottery_record_id = cursor.lastrowid
logging.info(f"主表数据插入成功,ID: {lottery_record_id}, 耗时: {time.time() - start_time:.2f}秒")
# 号码表插入语句
numbers_sql = """
INSERT INTO lottery_numbers (lottery_record_id, red_balls, blue_ball)
VALUES (%(lottery_record_id)s, %(red_balls)s, %(blue_ball)s)
"""
# 插入所有号码记录
for ball_pair in data['红球和蓝球对']:
numbers_data = {
'lottery_record_id': lottery_record_id,
'red_balls': ','.join(map(str, ball_pair['红球'])),
'blue_ball': ball_pair['蓝球']
}
logging.info(f"准备执行号码表插入,SQL: {numbers_sql}, 参数: {numbers_data}")
cursor.execute(numbers_sql, numbers_data)
logging.info(f"号码数据插入成功: 红球 {numbers_data['red_balls']}, 蓝球 {numbers_data['blue_ball']}")
# 提交事务
conn.commit()
end_time = time.time()
logging.info(f"事务提交成功,总耗时: {end_time - start_time:.2f}秒")
return {"status": "success", "message": "数据已成功保存", "execution_time": f"{end_time - start_time:.2f}秒"}
except Error as e:
error_time = time.time()
if conn.is_connected():
conn.rollback()
logging.error(f"数据库操作失败,执行回滚: {str(e)},耗时: {error_time - start_time:.2f}秒")
return {"status": "error", "message": str(e), "execution_time": f"{error_time - start_time:.2f}秒"}
finally:
if 'cursor' in locals():
cursor.close()
logging.info("数据库游标已关闭")
if 'conn' in locals() and conn.is_connected():
conn.close()
logging.info("数据库连接已关闭")
启动类
from fastapi import FastAPI
from word import router as word_router
from receive import router as receive_router
from JsonToSave import router as json_router
import os
import uvicorn
app = FastAPI()
# 注册子路由
app.include_router(json_router, prefix="/json")
if __name__ == '__main__':
os.makedirs('temp', exist_ok=True)
os.makedirs('output', exist_ok=True)
uvicorn.run(app, host='0.0.0.0', port=5000)
这样启动服务后,后端就提供了一个http://IP:5000/json/save 的请求,如下图在dify中的配置。
其中,需要注意的是配置要写在body里,同时把上下文传递进去才行。
最后把这个工作流发布成工具:
成功效果:
在数据库查看的结果,发现已经成功入库。
猜你喜欢
- 2025-04-11 手把手教你开发智能备份神器,小白也能30分钟搞定!
- 2025-04-11 画像笔记23- 作业流程调度(2)(作业流程图怎么画)
- 2025-04-11 Python接口自动化之常见用例读取方法介绍
- 2025-04-11 AI办公自动化:批量合并多个Excel表格的数据并汇总
- 2025-04-11 pc端微信用户图片DAT格式解码为图片
- 2025-04-11 Python自动化脚本应用与示例(pythonui自动化脚本)
- 2025-04-11 21-02-Python-文件操作下(python文件操作方法)
- 2025-04-11 Python定时任务,三步实现自动化(python定时任务,三步实现自动化效果)
- 2025-04-11 自动下载邮箱里未阅读的发票,并用邮件标题里指定的几个字命名
- 2025-04-11 照片整理很繁琐?3个python脚本帮你快速整理照片
- 最近发表
- 标签列表
-
- location.href (44)
- document.ready (36)
- git checkout -b (34)
- 跃点数 (35)
- 阿里云镜像地址 (33)
- qt qmessagebox (36)
- md5 sha1 (32)
- mybatis plus page (35)
- semaphore 使用详解 (32)
- update from 语句 (32)
- vue @scroll (38)
- 堆栈区别 (33)
- 在线子域名爆破 (32)
- 什么是容器 (33)
- sha1 md5 (33)
- navicat导出数据 (34)
- 阿里云acp考试 (33)
- 阿里云 nacos (34)
- redhat官网下载镜像 (36)
- srs服务器 (33)
- pico开发者 (33)
- https的端口号 (34)
- vscode更改主题 (35)
- 阿里云资源池 (34)
- os.path.join (33)