电商全域数据治理与 Multi-Agent 分析中台:基于 n8n + LLM 架构,实现多源电商数据 ETL、规范化数仓分层建模 (DWD/ADS) 及飞书端智能对话查询闭环。
本项目是一套面向电商业务的全域数据中台解决方案,覆盖 抖店、小红书、微信视频号、巨量千川、电商罗盘 五大平台。系统采用 "Python CLI 脚本 + n8n 工作流引擎" 的解耦架构:
- 底层业务逻辑:Python CLI 脚本实现 ETL 提取、SQL 建模、飞书同步
- 调度编排层:n8n 实现 Master-Sub 工作流调度,解决 Cron 绝对时间调度带来的资源竞争问题
- 数据分层:ODS(原始层)→ DWD(明细层)→ DWS/DIM(主题层)→ADS(应用层)标准数仓建模
核心能力:
- 多平台订单/售后/结算数据、投放数据、电商罗盘数据自动化采集(API + 爬虫双模式)
- 分钟级/日级自动化调度,支持增量/全量同步
- 飞书多维表双向同步(MySQL ↔ 飞书)
- 🎬 素材智能诊断(Whisper ASR + 千问大模型 + 极值截图 + 诊断报告)
- 🤖 多智能体协作架构(Supervisor Agent 路由分发 + 专业 Agent 协作 + Text-to-SQL + 自动可视化)
- 🏛️ 企业级数据治理(元数据管理 + 数据字典)
- 🏢 企业级Agent增强版(完整数据治理 + 权限控制 + RAG知识库 + 审计日志)
| 指标 | 数值 | 说明 |
|---|---|---|
| 日均数据采集量 | 100万+ 条 | 四平台订单+售后+结算+投放+罗盘榜单数据 |
| 单次全量采集耗时 | 1.5-2 小时 | 并行执行优化后 |
| AI Agent 响应时间 | 60-120 秒 | Text-to-SQL + 报告生成 |
| 飞书同步延迟 | < 30 秒 | 增量/全量同步 |
| 维度 | 传统 Cron 、 EVENT 调度 | 本项目 Master-Sub 架构 |
|---|---|---|
| 资源竞争 | ❌ 多任务同时抢占资源 | ✅ 串行熔断避免 OOM |
| 错误处理 | ❌ 手动排查 | ✅ 自动告警推送 |
| 可视化 | ❌ 黑盒执行 | ✅ n8n 可视化编排 |
| 扩展性 | ❌ 修改需重启 | ✅ 低代码拖拽扩展 |
| 任务依赖 | ❌ 无依赖性 | ✅ 任务调度编排 |
| 平台 | 数据类型 | 字段数量 | 特性 |
|---|---|---|---|
| 抖店 | 订单/售后/结算 | 300+/180+/50+ | 多线程、指数退避重试、自动Token刷新、增量采集、异常报警 |
| 小红书 | 订单/售后/结算 | 100+/60+/50+ | 多线程、指数退避重试、数据库Token管理、增量采集、异常报警 |
| 微信视频号 | 订单/售后/结算 | 100+/60+/80+ | 多线程、指数退避重试、自动Token管理、全量采集、异常报警 |
| 巨量千川 | 直播画面/视频/其他创意 | 60+/80+/9 | 多线程、指数退避重试、分钟级数据、数据库Token管理、多账户支持、增量更新、异常报警 |
- 内容榜:热门内容数据采集
- 达人榜:达人排行榜数据
- 商品榜:商品销售排行
- 搜索榜:搜索热词排行
- 店铺榜:店铺销售排行
| 能力 | 说明 |
|---|---|
| 视频自动下载 | 基于抖音视频 ID 自动下载视频文件 |
| Whisper ASR 语音识别 | 提取视频语音文案,生成带时间戳的文本 |
| 千问大模型分析 | 自动纠错、情感分析、卖点提炼 |
| 极值画面截图 | 自动识别视频高潮点(最高点击、最大上升等)并截图 |
| 诊断报告生成 | 输出 Excel 报告,包含截图、文案、分析结果 |
典型应用场景:
- 批量分析竞品视频,提取爆款文案和卖点
- 诊断视频内容,识别高转化片段
- 自动生成视频内容分析报告
- 自动化 SQL 建表
- 数据仓库分层设计(ODS/DWD/DIM/ADS)
- 支持增量更新
- MySQL ↔ 飞书多维表双向同步
- 主键去重、增量同步
- 字段映射、类型转换
| 能力 | 说明 |
|---|---|
| 自动化元数据采集 | 通过 information_schema 自动同步所有业务库表结构 |
| 企业级管理字段 | 支持数据负责人、安全等级、业务标签管理 |
| 幂等增量更新 | ON DUPLICATE KEY UPDATE 实现增量同步 |
| AI Agent 集成 | 为 Text-to-SQL Agent 提供完整的 DDL Schema 上下文 |
核心价值:元数据表为 AI Agent 提供结构化的 Schema 上下文,显著提升 Text-to-SQL 准确性。
| 能力 | 基础版 | 企业级增强版 |
|---|---|---|
| 多智能体协作 | ✅ Supervisor + 2个专业Agent | ✅ Supervisor + 7个专业Agent |
| 智能路由分发 | ✅ 2种路由策略 | ✅ 4种路由策略(+澄清/拒绝) |
| Text-to-SQL | ✅ 基础DDL Schema | ✅ 完整DDL + 分析规划 + 查询预算守卫 |
| 多轮对话记忆 | ✅ LangChain Memory Buffer | ✅ LangChain Memory Buffer |
| 智能图表生成 | ✅ QuickChart API | ✅ QuickChart API |
| 报告增强 | ✅ 结构化报告 | ✅ 结构化报告 + 结果有效性判断 |
| QA 质检 | ✅ 基础QA | ✅ 企业级QA + 审计日志 |
| 飞书交互卡片 | ✅ 富文本消息 | ✅ 富文本 + 企业级卡片模板 |
| 错误自动告警 | ✅ 异常告警 | ✅ 分级告警 + 空结果处理 |
| 入口治理 | ❌ | ✅ Webhook校验 + 敏感词拦截 |
| 权限控制 | ❌ | ✅ 管理员白名单 + 敏感字段拦截 |
| RAG知识库 | ❌ | ✅ 文档上传 + 向量检索 + Agent工具 |
| 治理维度 | 能力说明 |
|---|---|
| 🚀 入口治理 | Webhook合法性校验、统一traceId生成、敏感词拦截(drop/truncate/手机号导出等)、风控标签 |
| 🔐 权限治理 | 管理员OpenID白名单、敏感字段自动识别、导出权限分级控制、数据范围限制(最大行数/回溯天数) |
| 📋 路由治理 | 4种路由策略:sql_analysis(分析)、direct_reply(直答)、clarification(澄清)、reject(拒绝) |
| 📊 分析治理 | 分析规划Agent(目标/指标/维度/策略)、查询预算守卫(最大行数/时间窗口)、结果有效性判断、空结果友好处理 |
| 📚 RAG知识库 | 支持PDF/TXT/MD/DOCX/CSV/JSON上传、自动文档切块、内置向量存储、Agent工具调用检索 |
| 📤 输出治理 | 统一QA质检、图表/Excel智能分离、飞书卡片/文件双通道推送、审计日志封装 |
典型应用场景:
- "帮我分析本月抖店 GMV TOP10 达人"
- "对比上周和本周各平台订单量变化趋势"
- "查询最近7天千川投放各主播的 ROI 数据"
本项目实现了一套 多智能体协作架构(Multi-Agent Architecture),基于 n8n + LangChain + 千问大模型,通过 Supervisor Agent 路由分发 实现任务智能分流:
用户提问(飞书)→ Supervisor Agent(路由分发)→ 专业 Agent 协作处理 → 飞书推送结果
在基础版之上,我们实现了企业级增强版,增加了完整的数据治理、权限控制、RAG知识库等能力:
| 治理维度 | 基础版 | 企业级增强版 |
|---|---|---|
| 入口治理 | 简单消息解析 | Webhook合法性校验、统一traceId、风控标签、敏感词拦截 |
| 路由治理 | 2种路由 (sql_analysis/direct_reply) | 4种路由 (+ clarification/reject) |
| 权限治理 | 无 | 管理员白名单、敏感字段拦截、导出权限控制、数据范围限制 |
| 分析治理 | 直接执行 | 分析规划、查询预算守卫、结果有效性判断、空结果处理 |
| RAG知识库 | 无 | 文档上传、自动切块、向量检索、Agent工具调用 |
| 输出治理 | 基础QA | 统一QA、图表/Excel分离、飞书卡片/文件双通道、审计日志 |
flowchart TB
direction TB
%% 1. 飞书Webhook入口
subgraph Entry["🚀 企业级入口治理"]
direction TB
Webhook["Webhook触发"]
Governance["企业级入口治理
• 合法性校验
• 敏感词拦截
• 统一traceId"]
RiskCheck["请求合法性判断"]
RiskReject["非法请求响应"]
end
%% 2. 会话与权限治理
subgraph Session["🔐 会话与权限治理"]
direction TB
SessionGov["会话与风险治理
• 优先级识别
• 风险等级评估"]
Permission["权限与数据范围校验
• 管理员白名单
• 敏感字段拦截"]
PermCheck["权限判断"]
PermReject["权限不足回复Agent"]
end
%% 3. 路由分发
subgraph Router["🎯 Enterprise Supervisor Agent"]
direction TB
Supervisor["Supervisor Agent
• 任务拆解
• 风控判断
• 路由决策"]
RouteParse["解析路由结果"]
RouteSwitch["任务路由 (4分支)"]
end
%% 4. 四大分支
subgraph Branches["📋 四大处理分支"]
direction TB
subgraph SQLBranch["📊 SQL分析分支"]
AnalysisPlan["分析规划Agent"]
ParsePlan["解析分析规划"]
QueryGuard["查询预算守卫"]
Analyst["Analyst Agent
(Text-to-SQL)"]
MySQL["MySQL Tool"]
ResultCheck["分析结果评估"]
ResultValid["结果有效性判断"]
EmptyAgent["空结果回复Agent"]
Report["报告增强Agent"]
QA["QA Agent"]
end
subgraph DirectBranch["💬 直接回复分支"]
DirectAgent["Direct Reply Agent
• 知识库工具
• 指标口径查询"]
end
subgraph ClarifyBranch["❓ 澄清分支"]
ClarifyAgent["澄清问题Agent
• 生成澄清问题
• 提供示例"]
end
subgraph RejectBranch["🚫 拒绝分支"]
RejectAgent["拒绝回复Agent
• 说明拒绝原因
• 提供替代方案"]
end
end
%% 5. 输出治理
subgraph Output["📤 输出治理与推送"]
direction TB
Split["分离文本和图表"]
ChartCheck["图片生成判断"]
QuickChart["QuickChart生成"]
FeishuImg["飞书图片上传"]
ExcelGen["Excel生成"]
FeishuFile["飞书文件上传"]
CardPush["飞书卡片推送"]
Audit["审计日志封装"]
end
%% 6. RAG知识库
subgraph RAG["📚 RAG知识库模块"]
direction TB
Form["知识库上传表单"]
Normalize["标准化知识库文件"]
Loader["知识库文档加载器"]
Splitter["知识库切块器"]
VectorStore["写入内置向量库"]
Retrieve["知识库检索工具"]
end
%% 流程连线 - 入口治理
Webhook --> Governance
Governance --> RiskCheck
RiskCheck -->|非法| RiskReject
RiskCheck -->|合法| SessionGov
%% 会话与权限
SessionGov --> Permission
Permission --> PermCheck
PermCheck -->|无权限| PermReject
PermCheck -->|有权限| Supervisor
%% 路由分发
Supervisor --> RouteParse
RouteParse --> RouteSwitch
%% 四大分支
RouteSwitch -->|sql_analysis| SQLBranch
RouteSwitch -->|direct_reply| DirectBranch
RouteSwitch -->|clarification| ClarifyBranch
RouteSwitch -->|reject| RejectBranch
%% SQL分支内部
AnalysisPlan --> ParsePlan
ParsePlan --> QueryGuard
QueryGuard --> Analyst
Analyst --> MySQL
MySQL --> ResultCheck
ResultCheck --> ResultValid
ResultValid -->|无效| EmptyAgent
ResultValid -->|有效| Report
Report --> QA
%% 输出治理
QA --> Split
DirectAgent --> Split
ClarifyAgent --> Split
RejectAgent --> Split
EmptyAgent --> Split
PermReject --> Split
Split --> ChartCheck
ChartCheck -->|有图表| QuickChart
QuickChart --> FeishuImg
ChartCheck -->|有Excel| ExcelGen
ExcelGen --> FeishuFile
FeishuImg --> CardPush
FeishuFile --> CardPush
Split -->|纯文本| CardPush
CardPush --> Audit
%% RAG知识库
Form --> Normalize
Normalize --> Loader
Loader --> Splitter
Splitter --> VectorStore
VectorStore --> Retrieve
Retrieve -.->|工具调用| DirectAgent
Retrieve -.->|工具调用| Analyst
%% 样式
style Entry fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
style Session fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
style Router fill:#e8f5e9,stroke:#388e3c,stroke-width:2px
style Branches fill:#fff3e0,stroke:#f57c00,stroke-width:2px
style Output fill:#fce4ec,stroke:#c2185b,stroke-width:2px
style RAG fill:#e0f2f1,stroke:#00796b,stroke-width:2px
// Enterprise Supervisor Agent 路由规则
{
"route": "sql_analysis | direct_reply | clarification | reject",
"reason": "路由原因",
"needChart": true,
"needExcel": false,
"sqlFocus": "查询重点",
"riskLevel": "low | medium | high",
"requiredDataDomain": ["ads", "dwd"],
"missingInfo": ["缺失条件"],
"responseStyle": "standard | executive"
}
// 路由策略:
// 1. 数据查询/统计/趋势/对比/ROI/GMV -> sql_analysis
// 2. 信息不足无法安全推断 -> clarification
// 3. 寒暄/流程说明/FAQ -> direct_reply
// 4. 越权/敏感导出/高风险 -> reject| Agent | 基础版 | 企业级增强版 |
|---|---|---|
| Supervisor Agent | 简单路由 | 增加风控判断、风险等级、响应风格 |
| Analyst Agent | Text-to-SQL | 增加分析规划、查询预算守卫、DDL Schema |
| Direct Reply Agent | 简单回复 | 集成RAG知识库检索、指标口径查询 |
| 新增: 澄清Agent | ❌ | 信息不足时生成澄清问题 |
| 新增: 拒绝Agent | ❌ | 高风险请求礼貌拒绝 |
| 新增: 空结果Agent | ❌ | SQL失败/结果为空时友好回复 |
| 新增: 权限回复Agent | ❌ | 敏感字段越权时专业回复 |
flowchart TB
direction TB
%% 1. 飞书机器人
subgraph FeishuBot["飞书机器人 (Webhook 触发)"]
end
%% 2. 任务预处理
subgraph Preprocess["任务预处理 (Code Node)
解析飞书消息、提取用户文本、生成 TraceID"]
end
%% 3. 路由分发Agent
subgraph Supervisor["🎯 Supervisor Agent (路由分发)"]
direction LR
RouteInfo["职责:任务拆解与路由,不直接进行 SQL 查询
输出:{ route: 'sql_analysis' | 'direct_reply',
reason: '路由原因',
needChart: true,
sqlFocus: '查询重点' }"]
end
%% 4. 任务路由Switch
subgraph SwitchNode["任务路由 (Switch Node)
根据 route 字段分发到不同分支"]
end
%% 5. 数据分析分支
subgraph SQLBranch["📊 数据分析分支"]
direction TB
SQLAgent["SQL Agent (Text-to-SQL)"]
MySQL["MySQL Tool (执行查询)"]
ReportAgent["报告增强 Agent (打磨报告)"]
QAAgent["QA Agent (质量检查)"]
end
%% 6. 直接回复分支
subgraph DirectBranch["💬 直接回复分支"]
direction TB
DirectAgent["Direct Reply Agent"]
DirectTip["处理寒暄、解释流程
非数据问题、权限外"]
end
%% 7. 智能输出处理
subgraph Output["智能输出处理 (Code Node)"]
direction LR
Text["文本分析 (Markdown)"]
Chart["图表 JSON (QuickChart)"]
Excel["Excel 数据 (表格组件)"]
end
%% 8. 飞书消息卡片
subgraph Card["飞书消息卡片 (交互式推送)"]
direction LR
CardContent["📊 智能数据中台洞察
────────────────────
【分析结论】
本月抖店GMV为 xxx 万元,环比增长 xx%...
────────────────────
[图表图片]
────────────────────
[数据明细表格] (支持翻页)"]
end
%% 流程连线
FeishuBot -->|用户提问| Preprocess
Preprocess --> Supervisor
Supervisor --> SwitchNode
SwitchNode -->|route=sql_analysis| SQLBranch
SwitchNode -->|route=direct_reply| DirectBranch
SQLBranch --> Output
DirectBranch --> Output
Output --> Card
%% 数据分析分支内部流程
SQLAgent --> MySQL
MySQL --> ReportAgent
ReportAgent --> QAAgent
%% 样式(和你整套完全统一)
style FeishuBot fill:#f5f5f5,stroke:#333,stroke-width:2px
style Preprocess fill:#f5f5f5,stroke:#333,stroke-width:2px
style Supervisor fill:#e1f5fe,stroke:#0288d1,stroke-width:2px
style SwitchNode fill:#f3e5f5,stroke:#8e24aa,stroke-width:2px
style SQLBranch fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px
style DirectBranch fill:#fff3e0,stroke:#f57c00,stroke-width:2px
style Output fill:#f3e5f5,stroke:#8e24aa,stroke-width:2px
style Card fill:#fff3e0,stroke:#f57c00,stroke-width:2px
style RouteInfo fill:#fff,stroke:#555
style SQLAgent fill:#fff,stroke:#555
style MySQL fill:#fff,stroke:#555
style ReportAgent fill:#fff,stroke:#555
style QAAgent fill:#fff,stroke:#555
style DirectAgent fill:#fff,stroke:#555
style DirectTip fill:#fff,stroke:#555
style Text fill:#fff,stroke:#555
style Chart fill:#fff,stroke:#555
style Excel fill:#fff,stroke:#555
style CardContent fill:#fff,stroke:#555
| Agent | 职责 | 输入 | 输出 |
|---|---|---|---|
| 🎯 Supervisor Agent | 任务拆解与路由分发 | 用户问题 | { route, reason, needChart, sqlFocus } |
| 📊 SQL Agent | Text-to-SQL 生成与执行 | 用户问题 + 路由计划 | SQL 查询结果 + 分析初稿 |
| 📝 报告增强 Agent | 打磨分析报告 | 分析初稿 | 结构化报告(结论+洞察+建议+图表配置) |
| ✅ QA Agent | 统一输出质量 | 增强后报告 | 修复后的 Markdown + 图表 JSON + Excel 数据 |
| 💬 Direct Reply Agent | 处理非数据问题 | 用户问题 | 直接回复文本 |
// Supervisor Agent 路由规则
{
"route": "sql_analysis | direct_reply",
"reason": "路由原因",
"needChart": true,
"needExcel": false,
"sqlFocus": "查询重点"
}
// 路由策略:
// 1. 需要查数据库/统计/趋势/对比/ROI/GMV 等分析任务 -> sql_analysis
// 2. 寒暄、解释流程、非数据问题、权限外问题 -> direct_reply
// 3. 用户意图不清晰但明显偏数据分析 -> 优先 sql_analysis| 能力 | 说明 |
|---|---|
| 多智能体协作 | Supervisor Agent 路由分发 + 专业 Agent 协作处理 |
| 智能路由分发 | 自动判断任务类型,路由到数据分析分支或直接回复分支 |
| Text-to-SQL | 用户用自然语言提问,AI 自动生成 SQL 查询语句 |
| DDL Schema 注入 | 将数据库表结构注入 System Prompt,确保 SQL 准确性 |
| 报告增强 | 自动打磨分析报告,生成结构化洞察和建议 |
| QA 质检 | 统一输出质量,修复 Markdown 可读性,确保飞书兼容 |
| 自动可视化 | 智能判断是否需要生成图表,调用 QuickChart API |
| 飞书交互卡片 | 支持 Markdown、图片、表格组件的富文本推送 |
| 错误兜底 | 异常时自动推送告警消息到飞书群 |
| 数据库 | 表名 | 用途 |
|---|---|---|
dwd |
dwd_电商数据_宽表 |
抖店/微信/小红书全渠道订单+售后+结算明细 |
dwd |
dwd_千川素材h_中间表 |
千川投放素材维度的转化分析 |
ads |
ads_直播素材_m分摊表 |
每分钟每个主播的直播间明细(直播+投放+订单) |
用户提问:
"帮我分析一下本月抖店的GMV,按达人分组,取前10名"
AI Agent 自动处理流程:
- Supervisor Agent 分析意图,输出路由计划:
{
"route": "sql_analysis",
"reason": "需要查询数据库统计GMV",
"needChart": true,
"sqlFocus": "抖店本月GMV按达人分组TOP10"
}- 生成 SQL 查询语句:
SELECT
`达人名称`,
SUM(`买家实付`) as GMV
FROM dwd.dwd_电商数据_宽表
WHERE `平台` = '抖店'
AND `支付时间` >= '2024-03-01'
AND `支付时间` < '2024-04-01'
GROUP BY `达人名称`
ORDER BY GMV DESC
LIMIT 10- 报告增强 Agent 打磨分析报告:
## 📊 本月抖店 GMV TOP10 达人分析
### 一句话结论
本月抖店 GMV 集中度较高,TOP10 达人贡献了 xx% 的总 GMV...
### 核心指标总览
| 排名 | 达人名称 | GMV(万元) | 占比 |
|------|----------|-----------|------|
| 1 | xxx | 123.45 | 15.2% |
| ... | ... | ... | ... |
### 关键洞察
1. 头部达人效应明显...
2. 增长趋势分析...
3. 异常波动提示...
### 可执行建议
1. 优先级1:...
2. 优先级2:...-
QA Agent 质量检查,修复格式问题
-
推送飞书消息卡片(包含分析结论 + 图表 + 数据表格)
// n8n Supervisor Agent 节点配置
{
"type": "@n8n/n8n-nodes-langchain.agent",
"parameters": {
"promptType": "define",
"text": "={{ $json.userText }}",
"options": {
"systemMessage": "=# Role\n你是 Supervisor Agent,只负责任务拆解与路由,不直接进行 SQL 查询。\n\n# Output Rule\n你必须只输出一个 JSON 对象:\n{\n \"route\": \"sql_analysis 或 direct_reply\",\n \"reason\": \"一句话说明路由原因\",\n \"needChart\": true,\n \"sqlFocus\": \"查询重点\"\n}"
}
}
}
// n8n Switch 节点配置(任务路由)
{
"type": "n8n-nodes-base.switch",
"parameters": {
"rules": {
"values": [
{
"conditions": {
"conditions": [
{
"leftValue": "={{ $json.routePlan.route }}",
"rightValue": "sql_analysis",
"operator": { "type": "string", "operation": "equals" }
}
]
},
"outputKey": "数据分析分支"
},
{
"conditions": {
"conditions": [
{
"leftValue": "={{ $json.routePlan.route }}",
"rightValue": "direct_reply",
"operator": { "type": "string", "operation": "equals" }
}
]
},
"outputKey": "直接回复分支"
}
]
}
}
}
// MySQL Tool 节点配置
{
"type": "n8n-nodes-base.mySqlTool",
"parameters": {
"operation": "executeQuery",
"query": "{{ $fromAI('sql') }}"
}
}┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Error │────▶│ 飞书推送 │────▶│ 告警消息 │
│ Trigger │ │ 错误信息 │ │ 卡片 │
└─────────────┘ └─────────────┘ └─────────────┘
当工作流执行失败时,自动触发错误兜底节点,推送包含以下信息的告警:
- 工作流名称
- 报错节点
- 错误详情
flowchart TB
direction TB
%% 第一层:主调度器
subgraph MasterWorkflow["n8n Master Workflow<br/>(主调度器 - 解决资源竞争)"]
end
%% 第二层:API采集、爬虫采集
subgraph SubWF1["Sub-Workflow 1<br/>API 采集<br/>(并行执行)"]
end
subgraph SubWF2["Sub-Workflow 2<br/>爬虫采集<br/>(串行熔断)"]
end
%% 第三层:SQL 建模
subgraph SubWF3["Sub-Workflow 3<br/>SQL 建模<br/>(顺序执行)"]
end
%% 第四层:飞书同步
subgraph SubWF4["Sub-Workflow 4<br/>飞书同步<br/>(并行执行)"]
end
%% 正确层级连线
MasterWorkflow --> SubWF1
MasterWorkflow --> SubWF2
SubWF1 --> SubWF3
SubWF2 --> SubWF3
SubWF3 --> SubWF4
%% 统一配色(和你之前完全一致)
style MasterWorkflow fill:#f5f5f5,stroke:#333,stroke-width:2px
style SubWF1 fill:#e1f5fe,stroke:#0288d1,stroke-width:2px
style SubWF2 fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px
style SubWF3 fill:#f3e5f5,stroke:#8e24aa,stroke-width:2px
style SubWF4 fill:#fff3e0,stroke:#f57c00,stroke-width:2px
| 分层 | 数据库 | 说明 | 核心表 |
|---|---|---|---|
| ODS (Operational Data Store) | ods |
原始数据层,保持数据原貌 | ods_抖店_订单_全量、ods_微信_订单_全量、ods_红书_订单_全量 |
| DWD (Data Warehouse Detail) | dwd |
明细数据层,多源合并清洗 | dwd_电商数据_宽表(抖店+微信+小红书 UNION ALL) |
| DIM (Dimension) | dim |
维度数据层 | dim_达人维度、dim_商品维度 |
| ADS (Application Data Service) | ads |
应用数据层 | 业务指标聚合表 |
💡 设计亮点:DWD 层通过
UNION ALL实现多平台订单数据统一建模,字段标准化映射,支持跨平台分析。
n8n-Multi_Agent_Workflow/
├── API_data_collect/ # API 数据采集模块
│ ├── order_D/ # 抖店订单采集
│ │ ├── main.py # CLI 入口
│ │ └── doudian/ # 抖店 SDK 封装
│ ├── order_H/ # 小红书订单采集
│ │ ├── main.py
│ │ └── xiaohongshu/
│ ├── order_W/ # 微信视频号订单采集
│ │ ├── main.py
│ │ └── weixin/
│ └── qc_material/ # 巨量千川素材采集
│ ├── main.py
│ └── qianchuan/
│
├── web_crawler/ # 爬虫数据采集模块
│ ├── luopan_rank/ # 电商罗盘榜单爬虫
│ │ ├── rank_kol.py # 达人榜单
│ │ ├── rank_product.py # 商品榜单
│ │ ├── rank_content.py # 内容榜单
│ │ └── config.py # 行业类目配置
│ └── material_ASR/ # 🎬 素材智能诊断
│ ├── material_qianchuan.py # 千川素材分析(极值截图+AI分析)
│ ├── material_rank.py # 抖音视频批量分析(Whisper+千问)
│ └── run_single_video.py # 单条视频处理(n8n 调用入口)
│
├── chrome_port/ # 浏览器持久化配置
│ ├── port_9222/ # 接管浏览器9222端口
│ ├── ·········
│ └── port_9227/ # 接管9227端口
│
├── SQL_create_table/ # SQL 建表脚本
│ ├── order_total.sql # DWD 电商订单数据 DDL + 更新语句
│ ├── order_total.sql # DWD 千川投放数据 DDL + 更新语句
│ ├── ·········
│ ├── dim_order_kol.sql # 达人维度表 DDL + 更新语句
│ ├── dim_order_product.sql # 商品维度表 DDL + 更新语句
│ └── dim_Metadata&Dictionary.sql # 🌟 全域数据字典表
│
├── mysql_syn_feishu/ # 飞书同步模块
│ ├── run_sync.py # 统一同步入口
│ ├── sync_order_kol.py # 达人维度同步
│ ├── sync_order_product.py # 商品维度同步
│ └── feishu_sync/ # 飞书 SDK 封装
│
├── docs/
│ └── images/
│ ├── master_workflow.png # n8n 主工作流截图
│ ├── sub_workflow_qc.png # API 采集子工作流截图
│ ├── data_analysis_agent.png # 数据分析 Agent 截图
│ ├── agent使用示例.png # 数据分析 Agent 使用示例截图
│ └── n8n必要凭证. # n8n凭证管理截图
|
├── n8n_Workflow/ # n8n 工作流 JSON
│ ├── 数据中台 - 完整ETL工作流Agent.json # 主ETL工作流
│ ├── 数据分析Agent.json # 🤖 AI数据分析Agent(Text-to-SQL)基础版
│ ├── 数据分析Agent_增强版.json # 🏢 企业级增强版(推荐)
│ ├── [ETL-WeChat] 订单+售后+结算同步.json
│ └── [MySQL-Feishu] 通用Sync逻辑.json
│
├── n8n_setting/ # n8n Docker 部署配置
│ ├── docker-compose.yml
│ └── Dockerfile
│
├── README.md
├── LICENSE
├── .gitignore
├── .env.example
└── requirements.txt # 环境配置文件
# 数据库配置
export DB_HOST=<YOUR_DB_HOST>
export DB_PORT=3306
export DB_USER=<YOUR_DB_USER>
export DB_PASSWORD=<YOUR_DB_PASSWORD>
export DB_NAME=ods
# 平台 API 凭证
export DOUDIAN_APP_KEY=<YOUR_APP_KEY>
export DOUDIAN_APP_SECRET=<YOUR_APP_SECRET>
export XHS_APP_KEY=<YOUR_APP_KEY>
export XHS_APP_SECRET=<YOUR_APP_SECRET>
export XHS_ACCESS_TOKEN=<YOUR_ACCESS_TOKEN>
export WEIXIN_APP_ID=<YOUR_APP_ID>
export WEIXIN_APP_SECRET=<YOUR_APP_SECRET>
export QIANCHUAN_APP_ID=<YOUR_APP_ID>
export QIANCHUAN_APP_SECRET=<YOUR_APP_SECRET>
export QIANCHUAN_REFRESH_TOKEN=<YOUR_REFRESH_TOKEN># 默认采集昨天数据
python API_data_collect/order_D/main.py \
--db_host ${DB_HOST} \
--db_user ${DB_USER} \
--db_password ${DB_PASSWORD} \
--db_name ${DB_NAME}
# 指定时间范围采集
python API_data_collect/order_D/main.py \
--order_start "2024-01-01 00:00:00" \
--order_end "2024-01-31 23:59:59" \
--settle_start "2024-01-01 00:00:00" \
--db_host ${DB_HOST} \
--db_user ${DB_USER} \
--db_password ${DB_PASSWORD} \
--db_name ${DB_NAME}
# 跳过特定数据类型
python API_data_collect/order_D/main.py \
--skip_orders \
--skip_aftersales \
--db_host ${DB_HOST} \
--db_user ${DB_USER} \
--db_password ${DB_PASSWORD} \
--db_name ${DB_NAME}
# 调整并发线程数
python API_data_collect/order_D/main.py \
--max_workers 100 \
--db_host ${DB_HOST} \
--db_user ${DB_USER} \
--db_password ${DB_PASSWORD} \
--db_name ${DB_NAME}# 基础采集
python API_data_collect/order_H/main.py \
--db_host ${DB_HOST} \
--db_user ${DB_USER} \
--db_password ${DB_PASSWORD} \
--db_name ${DB_NAME}
# 指定时间范围
python API_data_collect/order_H/main.py \
--order_start "2024-01-01 00:00:00" \
--order_end "2024-01-31 23:59:59" \
--settle_start "2024-01-01 00:00:00" \
--db_host ${DB_HOST} \
--db_user ${DB_USER} \
--db_password ${DB_PASSWORD} \
--db_name ${DB_NAME}# 采集全部数据类型(订单+售后+结算)
python API_data_collect/order_W/main.py \
--data-type all \
--start-date "2024-01-01 00:00:00" \
--end-date "2024-01-31 23:59:59" \
--db_host ${DB_HOST} \
--db_user ${DB_USER} \
--db_password ${DB_PASSWORD} \
--db_name ${DB_NAME}
# 仅采集订单数据
python API_data_collect/order_W/main.py \
--data-type order \
--start-date "2024-01-01 00:00:00" \
--end-date "2024-01-31 23:59:59" \
--db_host ${DB_HOST} \
--db_user ${DB_USER} \
--db_password ${DB_PASSWORD} \
--db_name ${DB_NAME}# 采集全部素材类型(直播间画面+视频+其他创意)
python API_data_collect/qc_material/main.py \
--data-type all \
--start-date "2024-01-01 00:00:00" \
--end-date "2024-01-31 23:59:59" \
--accounts "advertiser_id:aweme_id:账户名称,advertiser_id2:aweme_id2:账户名称2" \
--db_host ${DB_HOST} \
--db_user ${DB_USER} \
--db_password ${DB_PASSWORD} \
--db_name ${DB_NAME}
# 仅采集直播间画面数据
python API_data_collect/qc_material/main.py \
--data-type live \
--accounts "123456:987654321:测试账户" \
--db_host ${DB_HOST} \
--db_user ${DB_USER} \
--db_password ${DB_PASSWORD} \
--db_name ${DB_NAME}# 初始化浏览器端口,持久化配置,提前登录或者准备cookie,drissionpage接管
/Applications/Google\ Chrome.app/Contents/MacOS/Google\ Chrome --remote-debugging-port=9222 --user-data-dir="/Users/$(whoami)/Library/Application Support/Google/Chrome"
# 达人榜单采集
python web_crawler/luopan_rank/rank_kol.py \
--start_time "2024-01-01" \
--end_time "2024-01-31" \
--cookie "<YOUR_COOKIE>" \
--db_url "mysql+pymysql://${DB_USER}:${DB_PASSWORD}@${DB_HOST}:3306/ods?charset=utf8mb4"# 增量同步所有表
python mysql_syn_feishu/run_sync.py --all
# 全量同步所有表
python mysql_syn_feishu/run_sync.py --all --full-sync
# 同步指定表
python mysql_syn_feishu/run_sync.py --tables kol,product,rank
# 仅同步达人维度表(全量)
python mysql_syn_feishu/run_sync.py --tables kol --full-sync# 单条视频处理(n8n 调用入口)
python web_crawler/material_ASR/run_single_video.py <视频ID> <达人昵称>
# 输出示例(JSON):
# {
# "error": null,
# "video_id": "7123456789012345678",
# "segments_text": "[00:01] 大家好...\n[00:05] 今天推荐...",
# "segments": [...]
# }
# 千川素材分析(极值截图 + AI 分析)
python web_crawler/material_ASR/material_qianchuan.py
# 抖音视频批量分析(Whisper + 千问)
python web_crawler/material_ASR/material_rank.py💡 素材诊断流程:
- 通过
run_single_video.py下载视频并提取语音文案- 使用 Whisper ASR 进行语音识别,生成带时间戳的文本
- 调用千问大模型进行文案纠错、情感分析、卖点提炼
- 自动识别视频高潮点并截图
- 生成 Excel 诊断报告
每个平台的数据采集模块都采用模块化设计:
抖店平台模块/
├── __init__.py # 模块导出
├── config.py # 配置和映射字典
├── utils.py # 工具函数
├── client.py # API客户端
├── token_service.py # Token管理
├── order_service.py # 订单服务
├── aftersale_service.py # 售后服务
└── settle_service.py # 结算服务
核心特性:
- ✅ 支持命令行参数传入
- ✅ 支持环境变量配置
- ✅ 多线程批量获取(可配置
--max_workers) - ✅ 自动 Token 刷新
- ✅ 数据库自动写入(支持
append/replace模式)
基于 DrissionPage 和 RPC 签名服务实现:
web_crawler/
├── luopan_rank/ # 罗盘榜单爬虫
├── material_ASR/ # 视频解析爬虫
├── JS_a_bogus/ # 签名算法(JS)
├── server.py # RPC签名服务
└── hook.py # 浏览器Hook
工作原理:
- 启动 Chrome 浏览器(远程调试模式)
- 运行 hook.py 注入签名算法
- 启动 server.py 提供 RPC 服务
- 爬虫脚本调用签名服务获取 a_bogus 参数
支持 MySQL 与飞书多维表的双向同步:
from feishu_sync import quick_sync_mysql_to_feishu
quick_sync_mysql_to_feishu(
feishu_app_id="cli_xxx",
feishu_app_secret="xxx",
feishu_app_token="xxx",
feishu_table_id="tblxxx",
primary_key_field="达人ID",
mysql_host="localhost",
mysql_user="root",
mysql_password="password",
mysql_database="test_db",
mysql_table="target_table",
sql_template="SELECT * FROM source_table",
update_time_field="update_time",
full_sync=False
)| 表名 | 说明 | 数据来源 | 分层 |
|---|---|---|---|
ods_抖店_订单_全量 |
抖店订单数据 | 抖店API | ODS |
ods_抖店_售后_全量 |
抖店售后数据 | 抖店API | ODS |
ods_抖店_结算_全量 |
抖店结算数据 | 抖店API | ODS |
ods_红书_订单_全量 |
小红书订单数据 | 小红书API | ODS |
ods_红书_售后_全量 |
小红书售后数据 | 小红书API | ODS |
ods_红书_结算_全量 |
小红书结算数据 | 小红书API | ODS |
ods_微信_订单_全量 |
微信订单数据 | 微信API | ODS |
ods_微信_售后_全量 |
微信售后数据 | 微信API | ODS |
ods_微信_结算_全量 |
微信结算数据 | 微信API | ODS |
ods_千川素材_直播画面_m |
千川直播分钟数据 | 千川API | ODS |
ods_千川素材_视频_m |
千川素材投放分钟数据 | 千川API | ODS |
ods_千川素材_其他_m |
千川其他创意分钟数据 | 千川API | ODS |
ods_直播数据minute_v2 |
店铺直播分钟数据 | 抖店网页 | ODS |
ods_主播排期表 |
店铺主播排期数据 | 飞书多维表 | ODS |
ods_达人榜单_day |
电商罗盘达人榜单天数据 | 电商罗盘网页 | ODS |
ods_店铺榜单_day |
电商罗盘店铺榜单天数据 | 电商罗盘网页 | ODS |
ods_搜索榜单_day |
电商罗盘搜索榜单天数据 | 电商罗盘网页 | ODS |
ods_商品榜单_day |
电商罗盘商品榜单天数据 | 电商罗盘网页 | ODS |
ods_内容榜_视频销量榜_day |
电商罗盘内容榜单销量天数据 | 电商罗盘网页 | ODS |
ods_内容榜_引流直播榜_day |
电商罗盘内容榜单引流天数据 | 电商罗盘网页 | ODS |
ods_内容榜_直播交易榜_day |
电商罗盘商品榜单交易天数据 | 电商罗盘网页 | ODS |
ods_达人信息_主表 |
精选联盟达人基础信息+评分+匹配度+内容+带货数据 | 精选联盟网页 | ODS |
ods_达人信息_粉丝画像 |
精选联盟达人粉丝画像 | 精选联盟网页 | ODS |
ods_达人信息_商品流水 |
精选联盟达人带货商品销售数据 | 精选联盟网页 | ODS |
ods_达人信息_直播流水 |
精选联盟达人带货直播交易数据 | 精选联盟网页 | ODS |
dwd_千川_素材m |
千川素材分钟拼接数据 | SQL建模 | DWD |
dwd_千川_素材h |
千川视频小时聚合数据关联千川财务数据 | SQL建模 | DWD |
dwd_抖店_宽表 |
抖店订单+售后+结算数据 | SQL建模 | DWD |
dwd_红书_宽表 |
小红书订单+售后+结算数据 | SQL建模 | DWD |
dwd_微信_宽表 |
微信小店订单+售后+结算数据 | SQL建模 | DWD |
dwd_电商数据_宽表 |
三个平台订单合并宽表 | SQL建模 | DWD |
dim_达人维度 |
订单达人维度表 | SQL建模 | DIM |
dim_商品维度 |
订单商品维度表 | SQL建模 | DIM |
dim_达人榜单 |
榜单达人维度表 | SQL建模 | DIM |
dim_sys_metadata_dict |
🌟 全域数据字典表 | 自动采集 | DIM |
ads_千川素材_直播画面_m |
店铺主播直播+投放+订单数据 | SQL建模 | ADS |
ads_达人专场_分时段效果监控 |
达人直播专场实时监控视图 | SQL建模 | ADS |
本项目实现了一套企业级元数据管理系统,通过 dim_sys_metadata_dict 表实现全域数据资产的自动化治理:
CREATE TABLE dim.dim_sys_metadata_dict (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`db_name` varchar(64) NOT NULL COMMENT '库名 (ODS/DWD/DIM/ADS)',
`table_name` varchar(128) NOT NULL COMMENT '表英文名',
`table_comment` varchar(256) DEFAULT NULL COMMENT '表中文注释/业务含义',
`column_name` varchar(128) NOT NULL COMMENT '字段英文名',
`column_type` varchar(64) DEFAULT NULL COMMENT '字段完整类型',
`column_comment` text COMMENT '字段业务口径/逻辑说明',
`is_pk` tinyint(1) DEFAULT '0' COMMENT '是否主键',
`is_nullable` varchar(10) DEFAULT 'YES' COMMENT '是否允许为空',
`column_order` int(11) DEFAULT '0' COMMENT '字段排序',
-- 🌟 企业级管理字段
`data_owner` varchar(64) DEFAULT 'DataTeam' COMMENT '数据负责人',
`security_level` varchar(20) DEFAULT 'L1' COMMENT '安全等级 (L1:公开, L2:内部, L3:绝密)',
`tag_list` varchar(255) DEFAULT NULL COMMENT '标签 (如: 财务, 流量, 核心)',
`create_time` datetime DEFAULT CURRENT_TIMESTAMP,
`update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `uk_db_table_col` (`db_name`,`table_name`,`column_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='[核心] 全域数据字典与元数据表';| 能力 | 说明 |
|---|---|
| 自动化元数据采集 | 通过 information_schema 自动同步所有业务库表结构 |
| 智能类型拼接 | 保留字段精度(如 decimal(18,2)、varchar(256)) |
| 企业级管理字段 | 支持数据负责人、安全等级、业务标签管理 |
| 幂等增量更新 | ON DUPLICATE KEY UPDATE 实现增量同步,保留手动维护的业务属性 |
| AI Agent 集成 | 为 Text-to-SQL Agent 提供完整的 DDL Schema 上下文 |
flowchart TB
direction TB
%% 总流程
subgraph MetaDataFlow["元数据自动采集流程"]
direction TB
%% 第一层:四层分层
subgraph Layers[" "]
direction LR
ODS["ODS<br/>原始层"]
DWD["DWD<br/>明细层"]
DIM["DIM<br/>维度层"]
ADS["ADS<br/>应用层"]
end
%% 第二层:系统元数据表
subgraph InfoSchema["information_schema"]
direction TB
COLUMNS["COLUMNS"]
TABLES["TABLES"]
end
%% 第三层:业务元数据字典表
subgraph MetaTable["dim_sys_metadata_dict (元数据表)"]
direction TB
Fields["""
• 表名/字段名/类型/注释
• data_owner (数据负责人)
• security_level (安全等级)
• tag_list (业务标签)
"""]
end
%% 第四层:输出应用
subgraph Output[" "]
direction LR
AIAgent["AI Agent DDL<br/>(Text-to-SQL)"]
GovPlatform["数据治理平台<br/>(资产目录/血缘)"]
end
end
%% 连线
ODS --> InfoSchema
DWD --> InfoSchema
DIM --> InfoSchema
ADS --> InfoSchema
InfoSchema --> MetaTable
MetaTable --> AIAgent
MetaTable --> GovPlatform
%% 配色(沿用你整套主题色)
style MetaDataFlow fill:#f9f9f9,stroke:#333,stroke-width:2px
style Layers fill:#e1f5fe,stroke:#0288d1,stroke-width:1.5px
style InfoSchema fill:#e8f5e8,stroke:#2e7d32,stroke-width:1.5px
style MetaTable fill:#fff3e0,stroke:#f57c00,stroke-width:1.5px
style Output fill:#f3e5f5,stroke:#8e24aa,stroke-width:1.5px
style ODS fill:#fff,stroke:#555
style DWD fill:#fff,stroke:#555
style DIM fill:#fff,stroke:#555
style ADS fill:#fff,stroke:#555
style COLUMNS fill:#fff,stroke:#555
style TABLES fill:#fff,stroke:#555
style Fields fill:#fff,stroke:#555
style AIAgent fill:#fff,stroke:#555
style GovPlatform fill:#fff,stroke:#555
元数据表为智能数据分析 Agent 提供了完整的 DDL Schema 上下文:
// n8n AI Agent System Prompt 注入
const systemMessage = `
你是一个资深电商数据中台专家,拥有以下数据表:
${metadataDictRows.map(row =>
`表: ${row.table_name} (${row.table_comment})
字段: ${row.column_name} - ${row.column_type} - ${row.column_comment}`
).join('\n')}
请根据用户问题生成准确的 SQL 查询语句。
`;💡 设计亮点:元数据表不仅支持数据治理,还为 AI Agent 提供了结构化的 Schema 上下文,显著提升 Text-to-SQL 的准确性。
本项目采用 1 个主工作流 + 4 个子工作流 的 Master-Sub 架构,解决传统 Cron 绝对时间调度带来的资源竞争问题:
flowchart TD
%% 主调度器容器
subgraph MasterWorkflow["Master Workflow (主调度器)"]
%% 触发方式
trigger["触发方式: 每日凌晨 02:00 或手动触发"]
%% 核心节点
Switch["Switch(模式)"]
Wait["Wait (等待)"]
SQL["SQL (建模)"]
Notify["Notify (通知)"]
%% 子流程节点
SubWF1["Sub-WF 1<br/>API采集 (并行)"]
SubWF2["Sub-WF 2<br/>爬虫采集 (串行)"]
SubWF4["Sub-WF 4<br/>飞书同步 (并行)"]
%% 核心流程(语法修正+逻辑正确)
trigger --> Switch
Switch --> SubWF1
Switch --> SubWF2
SubWF1 --> Wait
SubWF2 --> Wait
Wait --> SQL
SQL --> SubWF4
SubWF4 --> Notify
end
%% 样式美化(保留原有配色)
style MasterWorkflow fill:#f5f5f5,stroke:#333,stroke-width:2px
style Switch fill:#e1f5fe,stroke:#0288d1,stroke-width:1.5px
style Wait fill:#f3e5f5,stroke:#8e24aa,stroke-width:1.5px
style SQL fill:#e8f5e8,stroke:#2e7d32,stroke-width:1.5px
style Notify fill:#fff3e0,stroke:#f57c00,stroke-width:1.5px
style SubWF1 fill:#e1f5fe,stroke:#0288d1,stroke-width:1px
style SubWF2 fill:#e8f5e8,stroke:#2e7d32,stroke-width:1px
style SubWF4 fill:#fff3e0,stroke:#f57c00,stroke-width:1px
| 节点 | 命令 | 超时 | 执行策略 |
|---|---|---|---|
| 抖店订单采集 | python API_data_collect/order_D/main.py |
7200s | 并行 |
| 小红书订单采集 | python API_data_collect/order_H/main.py |
7200s | 并行 |
| 微信订单采集 | python API_data_collect/order_W/main.py |
7200s | 并行 |
| 千川素材采集 | python API_data_collect/qc_material/main.py |
3600s | 并行 |
| 节点 | 命令 | 超时 | 执行策略 |
|---|---|---|---|
| 达人榜单采集 | python web_crawler/luopan_rank/rank_kol.py |
10800s | 串行 |
| 商品榜单采集 | python web_crawler/luopan_rank/rank_product.py |
10800s | 串行 |
| 内容榜单采集 | python web_crawler/luopan_rank/rank_content.py |
10800s | 串行 |
⚠️ 串行熔断机制:爬虫节点采用串行执行,避免多浏览器实例同时运行导致 OOM。
# 批量执行 SQL 建表脚本
for f in SQL_create_table/*.sql; do
mysql -h ${MYSQL_HOST} -u ${MYSQL_USER} -p"${MYSQL_PASSWORD}" ${MYSQL_DB} < "$f"
done| 节点 | 命令 | 同步方向 | 执行策略 |
|---|---|---|---|
| 主播排期同步 | sync_live_author_schedule.py |
飞书→MySQL | 并行 |
| 达人维度同步 | sync_order_kol.py |
MySQL→飞书 | 并行 |
| 商品维度同步 | sync_order_product.py |
MySQL→飞书 | 并行 |
| 达人榜单同步 | sync_rank_kol.py |
MySQL→飞书 | 并行 |
{
"command": "cd /data && python3 API_data_collect/order_D/main.py --db_host ${DB_HOST} --db_user ${DB_USER} --db_password ${DB_PASSWORD} --db_name ${DB_NAME}",
"executeOnce": true,
"timeout": 7200,
"continueOnFail": false
}| 节点名称 | 节点类型 | 功能说明 | 关键参数 |
|---|---|---|---|
Manual Trigger |
n8n-nodes-base.manualTrigger |
手动触发入口 | - |
Schedule Trigger |
n8n-nodes-base.scheduleTrigger |
定时触发(每日02:00) | cron: "0 2 * * *" |
Switch |
n8n-nodes-base.switch |
模式选择(全量/增量) | conditions: mode |
Execute Workflow |
n8n-nodes-base.executeWorkflow |
调用子工作流 | workflowId: "sub-workflow-id" |
Wait |
n8n-nodes-base.wait |
等待所有采集完成 | amount: 1, unit: hours |
Error Trigger |
n8n-nodes-base.errorTrigger |
错误兜底触发器 | - |
| 节点名称 | 节点类型 | 执行脚本 | 超时 | 并发 |
|---|---|---|---|---|
抖店订单采集 |
n8n-nodes-base.executeCommand |
python API_data_collect/order_D/main.py |
7200s | 并行 |
小红书订单采集 |
n8n-nodes-base.executeCommand |
python API_data_collect/order_H/main.py |
7200s | 并行 |
微信订单采集 |
n8n-nodes-base.executeCommand |
python API_data_collect/order_W/main.py |
7200s | 并行 |
千川素材采集 |
n8n-nodes-base.executeCommand |
python API_data_collect/qc_material/main.py |
3600s | 并行 |
| 节点名称 | 节点类型 | 执行脚本 | 超时 | 执行策略 |
|---|---|---|---|---|
达人榜单采集 |
n8n-nodes-base.executeCommand |
python web_crawler/luopan_rank/rank_kol.py |
10800s | 串行 |
商品榜单采集 |
n8n-nodes-base.executeCommand |
python web_crawler/luopan_rank/rank_product.py |
10800s | 串行 |
内容榜单采集 |
n8n-nodes-base.executeCommand |
python web_crawler/luopan_rank/rank_content.py |
10800s | 串行 |
搜索榜单采集 |
n8n-nodes-base.executeCommand |
python web_crawler/luopan_rank/rank_serach.py |
10800s | 串行 |
店铺榜单采集 |
n8n-nodes-base.executeCommand |
python web_crawler/luopan_rank/rank_shop.py |
10800s | 串行 |
| 节点名称 | 节点类型 | 执行命令 | 超时 |
|---|---|---|---|
执行SQL更新 |
n8n-nodes-base.executeCommand |
for f in *.sql; do mysql ... < "$f"; done |
1800s |
| 节点名称 | 节点类型 | 执行脚本 | 同步方向 | 并发 |
|---|---|---|---|---|
主播排期同步 |
n8n-nodes-base.executeCommand |
python mysql_syn_feishu/sync_live_author_schedule.py |
飞书→MySQL | 并行 |
达人维度同步 |
n8n-nodes-base.executeCommand |
python mysql_syn_feishu/sync_order_kol.py |
MySQL→飞书 | 并行 |
商品维度同步 |
n8n-nodes-base.executeCommand |
python mysql_syn_feishu/sync_order_product.py |
MySQL→飞书 | 并行 |
达人榜单同步 |
n8n-nodes-base.executeCommand |
python mysql_syn_feishu/sync_rank_kol.py |
MySQL→飞书 | 并行 |
| 节点名称 | 节点类型 | 功能说明 | 关键配置 |
|---|---|---|---|
Webhook |
n8n-nodes-base.webhook |
飞书机器人入口 | httpMethod: POST, path: feishu-bot |
任务预处理 |
n8n-nodes-base.code |
解析飞书消息、提取文本 | 提取 userText、chatId、traceId |
Supervisor Agent |
@n8n/n8n-nodes-langchain.agent |
🎯 路由分发 Agent | 输出 { route, reason, needChart, sqlFocus } |
解析路由结果 |
n8n-nodes-base.code |
解析 Supervisor 输出 | 默认进入数据分析分支 |
任务路由 |
n8n-nodes-base.switch |
根据 route 分发任务 | sql_analysis / direct_reply |
SQL Agent |
@n8n/n8n-nodes-langchain.agent |
📊 Text-to-SQL Agent | DDL Schema 注入 + MySQL Tool |
MySQL Tool |
n8n-nodes-base.mySqlTool |
执行 SQL 查询 | query: {{ $fromAI('sql') }} |
报告增强 Agent |
@n8n/n8n-nodes-langchain.agent |
📝 打磨分析报告 | 输出结构化洞察 + 图表配置 |
QA Agent |
@n8n/n8n-nodes-langchain.agent |
✅ 质量检查 Agent | 修复 Markdown + 图表 JSON |
Direct Reply Agent |
@n8n/n8n-nodes-langchain.agent |
💬 直接回复 Agent | 处理寒暄、非数据问题 |
分离文本和图表 |
n8n-nodes-base.code |
解析 AI 输出 | 提取 Markdown/JSON/Excel |
图片生成判断 |
n8n-nodes-base.if |
判断是否需要生成图表 | hasChart == true |
获取飞书授权Token |
n8n-nodes-feishu-lite.feishuNode |
获取上传文件必须的Token | 准备飞书应用凭证 |
生成图片 |
n8n-nodes-base.httpRequest |
调用 QuickChart API | url: https://quickchart.io/chart |
上传图片 |
n8n-nodes-base.httpRequest |
上传图片到飞书 | url: https://open.feishu.cn/open-apis/im/v1/images |
飞书推送数据分析结果 |
n8n-nodes-feishu-lite.feishuNode |
推送消息卡片 | msg_type: interactive |
表格生成判断 |
n8n-nodes-base.if |
判断是否需要生成表格 | hasExcel == true |
生成表格 |
n8n-nodes-base.Convert-to-File |
转换Excel文件 | Operation == Convert to XLSX |
上传表格 |
n8n-nodes-base.httpRequest |
上传文件到飞书 | url: https://open.feishu.cn/open-apis/im/v1/files |
飞书推送表格生成结果 |
n8n-nodes-feishu-lite.feishuNode |
推送Excel文件 | msg_type: interactive |
错误兜底 |
n8n-nodes-base.errorTrigger |
错误触发器 | - |
飞书推送错误信息 |
n8n-nodes-feishu-lite.feishuNode |
推送告警卡片 | template: red |
| 时间 | 任务 | 执行方式 | 预计时长 |
|---|---|---|---|
| 02:00 | API 数据采集 | 并行 | 2 小时 |
| 03:00 | 爬虫数据采集 | 串行 | 3 小时 |
| 04:00 | SQL 建表 | 串行 | 30 分钟 |
| 04:30 | 飞书同步 | 并行 | 30 分钟 |
💡 调度优化建议:爬虫任务采用串行执行避免资源竞争,API 采集和飞书同步采用并行执行提升效率。
# API_data_collect/order_D/doudian/client.py
def request_with_retry(self, method, params, max_retries=3):
for attempt in range(max_retries):
try:
result = self.request(method, params)
if result:
return result
wait_time = (2 ** attempt) + 1 # 1s, 3s, 5s
time.sleep(wait_time)
except Exception as e:
if attempt == max_retries - 1:
raise e
time.sleep((2 ** attempt) + 1)
return None# web_crawler/luopan_rank/rank_kol.py
# 采用 DrissionPage + Chromium 无头模式
co = ChromiumOptions()
co.headless(True)
co.set_argument('--no-sandbox')
co.set_argument('--disable-gpu')
co.no_imgs(True) # 无图模式省流加速
# 串行执行每个榜单采集,避免多浏览器实例 OOM
for category in INDUSTRY_CATEGORY:
for target_date in date_range:
for rank_cat in RANK_CATEGORY:
# 单个榜单采集完成后释放资源
process_rank(category, target_date, rank_cat)# mysql_syn_feishu/feishu_sync/mysql_to_feishu.py
# 使用参数化查询,防止 SQL 注入
SYNC_SQL_TEMPLATE = """
SELECT * FROM {table}
WHERE update_time > %s
"""
cursor.execute(SYNC_SQL_TEMPLATE.format(table=self.table_name), (last_sync_time,))# 基于 primary_key_field 实现增量同步
# 已存在的记录自动更新,新记录自动插入
syncer = MySQLToFeishuSync(
feishu_config=FEISHU_CONFIG,
mysql_config=MYSQL_CONFIG,
primary_key_field="达人UID" # 主键去重
)
syncer.sync(full_sync=False) # 增量同步| 错误类型 | 处理策略 | 重试次数 |
|---|---|---|
| API 采集失败 | 指数退避重试 | 3 次 |
| 爬虫采集失败 | 记录日志,继续执行 | 2 次 |
| SQL 建表失败 | 记录失败 SQL,继续执行其他 | 3 次 |
| 飞书同步失败 | 重试后发送告警 | 2 次 |
| 分类 | 技术栈 | 用途 |
|---|---|---|
| 编程语言 | Python 3.9+ | 核心业务逻辑开发 |
| 工作流引擎 | n8n | 自动化调度编排 + AI Agent |
| AI/LLM | LangChain, Qwen (通义千问) | 智能数据分析、Text-to-SQL |
| 数据库 | MySQL 8.0 | 数仓分层存储 |
| 爬虫框架 | DrissionPage | 浏览器自动化 |
| 数据处理 | Pandas, Polars, SQLAlchemy | ETL 数据处理 |
| 大模型 | Whisper (ASR), Qwen | 素材智能诊断 |
| 可视化 | QuickChart API | 动态图表生成 |
| 协作平台 | 飞书多维表/机器人 | 数据可视化与协作 |
| 容器化 | Docker, docker-compose | 环境部署 |
| 组件 | 版本要求 | 说明 |
|---|---|---|
| Python | 3.9+ | 核心业务逻辑 |
| Docker | 20.10+ | n8n 容器化部署 |
| Docker Compose | 2.0+ | 容器编排 |
| MySQL | 8.0+ | 数仓存储 |
| Node.js | 20+ | 可选,用于签名服务 |
# 克隆项目
git clone https://github.com/liuchaoqi-7/DA_Multi_Agent_Workflow.git
cd n8n-Multi_Agent_Workflow
# 安装 Python 依赖
pip install -r requirements.txt# 复制环境变量模板
cp .env.example .env
# 编辑 .env 文件,填入真实凭证
vim .env.env 文件配置说明:
| 配置分类 | 必填项 | 说明 |
|---|---|---|
| 数据库 | DB_HOST, DB_USER, DB_PASSWORD, DB_NAME |
MySQL 连接配置 |
| 抖店 API | DOUDIAN_APP_KEY, DOUDIAN_APP_SECRET |
抖店开放平台凭证 |
| 小红书 API | XHS_APP_KEY, XHS_APP_SECRET, XHS_ACCESS_TOKEN |
小红书开放平台凭证 |
| 微信 API | WEIXIN_APP_ID, WEIXIN_APP_SECRET |
微信开放平台凭证 |
| 千川 API | QIANCHUAN_APP_ID, QIANCHUAN_APP_SECRET, QIANCHUAN_REFRESH_TOKEN |
巨量千川凭证 |
| 飞书 API | FEISHU_APP_ID, FEISHU_APP_SECRET, FEISHU_APP_TOKEN |
飞书开放平台凭证 |
| 大模型 API | DASHSCOPE_API_KEY |
通义千问 API Key |
| n8n | - | n8n 服务配置(默认即可) |
cd n8n_setting
docker-compose up -d访问 http://localhost:5678 进入 n8n 界面。
- 打开 n8n 界面
- 点击 "Import from File"
- 选择
n8n_Workflow/目录下的 JSON 文件:数据中台 - 完整ETL工作流Agent.json- 主 ETL 工作流数据分析Agent.json- 🤖 AI 数据分析 Agent[ETL-WeChat] 订单+售后+结算同步.json- 微信数据采集[MySQL-Feishu] 通用Sync逻辑.json- 飞书同步逻辑
- 配置环境变量和凭证
- 点击 "Execute Workflow" 测试运行
确保使用相对路径或基于脚本位置的路径:
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
cd "$SCRIPT_DIR"各平台 Token 有效期:
| 平台 | Token 有效期 | 处理方式 |
|---|---|---|
| 抖店 | 7 天 | 自动刷新 |
| 小红书 | 需手动刷新 | 手动更新 |
| 微信视频号 | 2 小时 | 自动刷新 |
| 巨量千川 | 2 小时 | 自动刷新 |
默认线程数已优化,如遇限流可降低线程数:
python main.py --max_workers 20检查项:
- ✅ 环境变量是否正确
- ✅ 网络连通性
- ✅ 数据库用户权限
- ✅ 防火墙配置
原因:多浏览器实例同时运行
解决方案:
- 采用串行执行策略
- 启用无头模式
co.headless(True) - 启用无图模式
co.no_imgs(True)
⚠️ 使用环境变量存储密钥,不要硬编码⚠️ 不要将密钥提交到版本控制⚠️ 定期更换 API 密钥⚠️ 使用.env文件并添加到.gitignore
- 🔒 限制数据库访问 IP
- 🔒 使用最小权限原则
- 🔒 启用 n8n 基本认证
- 🔒 定期审计访问日志
- 💾 定期备份数据库
- 💾 敏感数据加密存储
- 💾 日志脱敏处理
- 💾 数据传输使用 SSL/TLS
- ✨ 重构为模块化架构
- ✨ 支持 n8n 工作流编排
- ✨ 新增飞书同步模块
- ✨ 优化爬虫签名服务
- ✨ 新增素材智能诊断(Whisper + Qwen)
- ✨ 新增多智能体协作架构(Supervisor Agent 路由分发 + 专业 Agent 协作)
- ✨ 新增企业级数据治理(元数据管理 + 数据字典)
- 🎉 初始版本
- 🎉 单文件脚本
MIT License - 详见 LICENSE 文件
欢迎提交 Issue 和 Pull Request!
- Fork 本仓库
- 创建特性分支 (
git checkout -b feature/AmazingFeature) - 提交更改 (
git commit -m 'Add some AmazingFeature') - 推送到分支 (
git push origin feature/AmazingFeature) - 开启 Pull Request
刘朝齐 (Data Analyst / AI Agent Developer)
- 🛰️ Wechat: LCQ2312437922
- 📧 Email: lcq2312437922@gmail.com / 2312437922@qq.com
- 💬 专注领域:全域电商数据治理 / ETL 数据工程 / Multi-Agent 智能体架构落地
- 🎯 目前正在寻找 上海/杭州 地区的数据分析师/数据工程师机会,欢迎随时联系!
注意:本项目仅供学习交流使用,请勿用于商业用途。使用前请确保遵守各平台 API 使用规范。







