云计算、AI、云原生、大数据等一站式技术学习平台

网站首页 > 教程文章 正文

Dify工具使用全场景:彩票信息保存入库 (应用篇·第2期)

jxf315 2025-04-11 00:02:59 教程文章 13 ℃

上一篇Dify工具使用全场景:彩票自动兑奖管理系统 (应用篇·第1期)

上一篇通过上传彩票调用大模型,可以按规则生成包含彩票信息的json串,如下信息。

{
    "point": "save",
    "params": {
        "销售站点": "测试站点",
        "期号": "2024001",
        "玩法": "双色球",
        "投注时间": "2024-01-01 10:00:00",
        "开奖时间": "2024-01-01 21:00:00",
        "验票码": "ABC123",
        "金额": "10元",
        "公益贡献": "1元",
        "微信订阅号": "lottery_test",
        "红球和蓝球对": [
            {
                "红球": [1, 2, 3, 4, 5, 6],
                "蓝球": 7
            }
        ]
    }
}

接下来,要把json信息入库保存,因为要把彩票信息管理起来,了解自己平时写的号码。

设计表结构存储

# 数据库表结构
"""
CREATE TABLE lottery_records (
    id INT AUTO_INCREMENT PRIMARY KEY,
    sales_site VARCHAR(255),
    issue_number VARCHAR(50),
    lottery_type VARCHAR(50),
    betting_time DATETIME,
    draw_time DATETIME,
    verification_code VARCHAR(100),
    amount DECIMAL(10,2),
    charity_contribution DECIMAL(10,2),
    wechat_subscription VARCHAR(100)
);

CREATE TABLE lottery_numbers (
    id INT AUTO_INCREMENT PRIMARY KEY,
    lottery_record_id INT,
    red_balls VARCHAR(100),
    blue_ball VARCHAR(10),
    FOREIGN KEY (lottery_record_id) REFERENCES lottery_records(id)
);
"""

WEB服务提供

这里需要写一段代码,用于实现web服务。

核心代码如下:

from pydantic import BaseModel

class LotteryData(BaseModel):
    point: str
    params: dict

from fastapi import APIRouter, Request, Header, HTTPException
import json
import logging
import os
import time

router = APIRouter()

import mysql.connector
from mysql.connector import Error

# MySQL数据库配置
db_config = {
    'host': '***.***.***.****',
    'user': 'root',
    'password': '3333',
    'database': 'dbname'
}

# 配置日志
log_dir = 'logs'
os.makedirs(log_dir, exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(os.path.join(log_dir, 'lottery.log')),
        logging.StreamHandler()
    ]
)

@router.post('/save')
async def save_lottery(data: LotteryData, authorization: str = Header(None)):
    start_time = time.time()
    logging.info(f"收到保存请求: {data.point}, 参数: {data.params}")
    
    # 验证API密钥
    expected_api_key = "123456"  # 设置您的API密钥
    if not authorization:
        logging.warning("请求缺少authorization header")
        raise HTTPException(status_code=401, detail="No authorization header")
    
    auth_scheme, _, api_key = authorization.partition(' ')
    if auth_scheme.lower() != "bearer" or api_key != expected_api_key:
        logging.warning(f"无效的认证信息: {auth_scheme}")
        raise HTTPException(status_code=401, detail="Unauthorized")
    
    # 从params中获取实际的彩票数据
    content = data.params
    result = json_to_sql(content)
    
    end_time = time.time()
    logging.info(f"请求处理完成,耗时: {end_time - start_time:.2f}秒,结果: {result}")
    return result

def json_to_sql(data):
    start_time = time.time()
    logging.info(f"开始处理彩票数据: {data}")
    try:
        # 连接到MySQL数据库
        conn = mysql.connector.connect(**db_config)
        cursor = conn.cursor()
        logging.info(f"数据库连接成功,耗时: {time.time() - start_time:.2f}秒")

        # 主表插入语句
        main_sql = """
        INSERT INTO lottery_records (
            sales_site, issue_number, lottery_type, betting_time, draw_time,
            verification_code, amount, charity_contribution, wechat_subscription
        ) VALUES (
            %(sales_site)s, %(issue_number)s, %(lottery_type)s, %(betting_time)s,
            %(draw_time)s, %(verification_code)s, %(amount)s, %(charity_contribution)s,
            %(wechat_subscription)s
        )
        """
        
        # 准备主表数据
        main_data = {
            'sales_site': data['销售站点'],
            'issue_number': data['期号'],
            'lottery_type': data['玩法'],
            'betting_time': data['投注时间'],
            'draw_time': data['开奖时间'],
            'verification_code': data['验票码'],
            'amount': float(data['金额'].replace('元', '')),
            'charity_contribution': float(data['公益贡献'].replace('元', '')),
            'wechat_subscription': data['微信订阅号']
        }
        
        # 执行主表插入
        logging.info(f"准备执行主表插入,SQL: {main_sql}, 参数: {main_data}")
        cursor.execute(main_sql, main_data)
        lottery_record_id = cursor.lastrowid
        logging.info(f"主表数据插入成功,ID: {lottery_record_id}, 耗时: {time.time() - start_time:.2f}秒")

        # 号码表插入语句
        numbers_sql = """
        INSERT INTO lottery_numbers (lottery_record_id, red_balls, blue_ball)
        VALUES (%(lottery_record_id)s, %(red_balls)s, %(blue_ball)s)
        """

        # 插入所有号码记录
        for ball_pair in data['红球和蓝球对']:
            numbers_data = {
                'lottery_record_id': lottery_record_id,
                'red_balls': ','.join(map(str, ball_pair['红球'])),
                'blue_ball': ball_pair['蓝球']
            }
            logging.info(f"准备执行号码表插入,SQL: {numbers_sql}, 参数: {numbers_data}")
            cursor.execute(numbers_sql, numbers_data)
            logging.info(f"号码数据插入成功: 红球 {numbers_data['red_balls']}, 蓝球 {numbers_data['blue_ball']}")

        # 提交事务
        conn.commit()
        end_time = time.time()
        logging.info(f"事务提交成功,总耗时: {end_time - start_time:.2f}秒")
        return {"status": "success", "message": "数据已成功保存", "execution_time": f"{end_time - start_time:.2f}秒"}

    except Error as e:
        error_time = time.time()
        if conn.is_connected():
            conn.rollback()
            logging.error(f"数据库操作失败,执行回滚: {str(e)},耗时: {error_time - start_time:.2f}秒")
        return {"status": "error", "message": str(e), "execution_time": f"{error_time - start_time:.2f}秒"}

    finally:
        if 'cursor' in locals():
            cursor.close()
            logging.info("数据库游标已关闭")
        if 'conn' in locals() and conn.is_connected():
            conn.close()
            logging.info("数据库连接已关闭")

启动类

from fastapi import FastAPI
from word import router as word_router
from receive import router as receive_router
from JsonToSave import router as json_router
import os
import uvicorn

app = FastAPI()

# 注册子路由
app.include_router(json_router, prefix="/json")

if __name__ == '__main__':
    os.makedirs('temp', exist_ok=True)
    os.makedirs('output', exist_ok=True)
    uvicorn.run(app, host='0.0.0.0', port=5000)

这样启动服务后,后端就提供了一个http://IP:5000/json/save 的请求,如下图在dify中的配置。

其中,需要注意的是配置要写在body里,同时把上下文传递进去才行。

最后把这个工作流发布成工具:

成功效果:

在数据库查看的结果,发现已经成功入库。

Tags:

最近发表
标签列表