第一部分:skills架构理论基础
1.1 什么是skills架构?
skills架构是一种模块化设计模式,它将复杂的业务流程分解为一系列独立的、可重用的功能单元(称为"技能")。每个技能专注于完成特定的任务,并通过统一的接口进行交互。
核心特征:
- 单一职责原则:每个技能只负责一个明确的功能
- 高内聚低耦合:技能内部逻辑紧密相关,外部依赖最小化
- 可组合性:技能可以灵活组合形成更复杂的功能
- 可测试性:独立的技能单元便于单元测试和集成测试
1.2 skills架构的优势
开发效率提升:
- 代码复用性高,减少重复开发
- 模块化设计便于团队协作
- 快速构建新功能通过技能组合
系统维护性:
- 问题定位和修复更加容易
- 单个技能升级不影响整体系统
- 便于性能优化和功能扩展
业务适应性:
- 快速响应业务需求变化
- 支持配置驱动开发
- 易于集成第三方服务
第二部分:核心架构设计
2.1 基础组件设计
baseskill - 技能基类
from abc import abc, abstractmethod
from typing import any
import logging
class baseskill(abc):
"""技能基类 - 所有技能的抽象基类"""
def __init__(self, name: str):
self.name = name
self.logger = logging.getlogger(f"skill.{name}")
@abstractmethod
async def execute(self, *args, **kwargs) -> any:
"""执行技能的核心逻辑"""
pass
def __str__(self):
return f"skill({self.name})"设计要点:
- 抽象基类确保统一的接口规范
- 内置日志记录便于调试和监控
- 异步执行支持高性能并发处理
2.2 技能编排器 (skillorchestrator)
class skillorchestrator:
"""技能编排器 - 负责协调多个技能的执行"""
def __init__(self):
self.skills = {}
self.logger = logging.getlogger("orchestrator")
def register_skill(self, name: str, skill: baseskill):
"""注册技能"""
self.skills[name] = skill
self.logger.info(f"注册技能: {name}")
async def execute_workflow(self, workflow: list[dict]) -> dict[str, any]:
"""执行工作流"""
results = {}
for step in workflow:
skill_name = step['skill']
skill_args = step.get('args', {})
if skill_name not in self.skills:
self.logger.error(f"未注册的技能: {skill_name}")
continue
try:
self.logger.info(f"执行技能: {skill_name}")
result = await self.skills[skill_name].execute(**skill_args)
results[skill_name] = result
self.logger.info(f"技能执行完成: {skill_name}")
except exception as e:
self.logger.error(f"技能执行失败 {skill_name}: {e}")
results[skill_name] = none
return results编排器功能:
- 技能注册和管理
- 工作流顺序执行
- 错误处理和结果收集
- 执行状态监控
第三部分:具体技能实现
3.1 浏览器管理技能 (browsermanagerskill)
class browsermanagerskill(baseskill):
"""浏览器管理技能 - 负责浏览器生命周期管理"""
def __init__(self, headless: bool = true):
super().__init__("browser_manager")
self.headless = headless
self.browser = none
self.context = none
self.page = none
async def execute(self, setup_only: bool = true) -> tuple:
"""执行浏览器设置"""
try:
from playwright.async_api import async_playwright
self.logger.info("启动浏览器...")
playwright = await async_playwright().start()
# 启动浏览器
self.browser = await playwright.chromium.launch(
headless=self.headless,
args=[
'--no-sandbox',
'--disable-setuid-sandbox',
'--disable-blink-features=automationcontrolled'
]
)
# 创建上下文
self.context = await self.browser.new_context(
ignore_https_errors=true
)
# 创建页面
self.page = await self.context.new_page()
self.logger.info("浏览器启动成功")
return self.browser, self.context, self.page
except exception as e:
self.logger.error(f"浏览器启动失败: {e}")
raise
async def cleanup(self):
"""清理浏览器资源"""
try:
if self.context:
await self.context.close()
if self.browser:
await self.browser.close()
self.logger.info("浏览器资源清理完成")
except exception as e:
self.logger.error(f"资源清理失败: {e}")技术要点:
- 使用playwright进行浏览器自动化
- 支持有头和无头模式
- 完善的资源管理和清理
- 异常处理和日志记录
3.2 页面导航技能 (pagenavigationskill)
class pagenavigationskill(baseskill):
"""页面导航技能 - 负责页面导航和等待"""
def __init__(self, page):
super().__init__("page_navigation")
self.page = page
async def execute(self, url: str, timeout: int = 30000) -> bool:
"""导航到指定url"""
try:
self.logger.info(f"导航到: {url}")
# 设置页面头信息,避免被识别为爬虫
await self.page.set_extra_http_headers({
'user-agent': 'mozilla/5.0 (windows nt 10.0; win64; x64) applewebkit/537.36'
})
# 导航到目标页面
await self.page.goto(url, timeout=timeout)
# 等待页面加载完成
await self.page.wait_for_load_state('networkidle')
self.logger.info("页面导航成功")
return true
except exception as e:
self.logger.error(f"页面导航失败: {e}")
return false功能特性:
- 智能页面导航和等待
- 反爬虫规避策略
- 超时控制和错误处理
- 网络状态监控
3.3 数据提取技能 (dataextractionskill)
class dataextractionskill(baseskill):
"""数据提取技能 - 负责从页面提取结构化数据"""
def __init__(self, page):
super().__init__("data_extraction")
self.page = page
async def execute(self, extraction_config: dict) -> list[dict]:
"""执行数据提取"""
try:
from bs4 import beautifulsoup
self.logger.info("开始数据提取...")
# 获取页面内容
html_content = await self.page.content()
soup = beautifulsoup(html_content, 'html.parser')
# 根据配置提取数据
extracted_data = []
if 'table_selector' in extraction_config:
table_data = await self._extract_table_data(soup, extraction_config)
extracted_data.extend(table_data)
if 'element_selectors' in extraction_config:
element_data = await self._extract_elements_data(soup, extraction_config)
extracted_data.extend(element_data)
self.logger.info(f"数据提取完成,共提取 {len(extracted_data)} 条记录")
return extracted_data
except exception as e:
self.logger.error(f"数据提取失败: {e}")
return []提取策略:
- 支持表格数据和元素数据提取
- 配置驱动的提取规则
- 灵活的数据映射机制
- 容错处理和空值处理
3.4 数据存储技能 (datastorageskill)
class datastorageskill(baseskill):
"""数据存储技能 - 负责数据持久化"""
def __init__(self, db_path: str = "weather_data.db"):
super().__init__("data_storage")
self.db_path = db_path
self.connection = none
async def execute(self, data: list[dict], table_name: str = "weather_data") -> bool:
"""存储数据到数据库"""
import sqlite3
try:
self.logger.info(f"开始存储 {len(data)} 条数据")
# 连接数据库
self.connection = sqlite3.connect(self.db_path)
cursor = self.connection.cursor()
# 创建表(如果不存在)
create_table_sql = f"""
create table if not exists {table_name} (
id integer primary key autoincrement,
province_name text,
city_name text,
collection_date text,
temperature text,
weather text,
created_time datetime default current_timestamp
)
"""
cursor.execute(create_table_sql)
# 插入数据
insert_sql = f"""
insert into {table_name} (province_name, city_name, collection_date, temperature, weather)
values (?, ?, ?, ?, ?)
"""
success_count = 0
for record in data:
try:
cursor.execute(insert_sql, (
record.get('province_name', ''),
record.get('city_name', ''),
record.get('date', ''),
record.get('temperature', ''),
record.get('weather', '')
))
success_count += 1
except exception as e:
self.logger.warning(f"插入数据失败: {e}")
self.connection.commit()
self.logger.info(f"数据存储完成,成功 {success_count}/{len(data)} 条")
return true
except exception as e:
self.logger.error(f"数据存储失败: {e}")
if self.connection:
self.connection.rollback()
return false
finally:
if self.connection:
self.connection.close()存储特性:
- 支持多种数据库后端
- 事务管理和回滚机制
- 批量插入性能优化
- 数据完整性验证
第四部分:高级特性实现
4.1 技能工厂模式
class advancedskillfeatures:
"""高级技能特性示例"""
@staticmethod
def skill_factory(skill_type: str, **kwargs) -> baseskill:
"""技能工厂方法"""
skill_map = {
"browser": browsermanagerskill,
"navigation": pagenavigationskill,
"extraction": dataextractionskill,
"storage": datastorageskill
}
if skill_type not in skill_map:
raise valueerror(f"未知的技能类型: {skill_type}")
return skill_map[skill_type](**kwargs)工厂模式优势:
- 统一的对象创建接口
- 支持动态技能类型扩展
- 便于依赖注入和配置管理
4.2 配置驱动架构
@staticmethod
def create_skill_pipeline(skills_config: list[dict]) -> skillorchestrator:
"""创建技能管道"""
orchestrator = skillorchestrator()
for config in skills_config:
skill = advancedskillfeatures.skill_factory(
config['type'],
**config.get('params', {})
)
orchestrator.register_skill(config['name'], skill)
return orchestrator配置驱动优势:
- 系统行为可配置化
- 支持热更新和动态调整
- 降低代码耦合度
4.3 依赖注入机制
# 注入页面依赖示例 browser_skill = orchestrator.skills["browser"] browser, context, page = await browser_skill.execute() # 更新需要页面的技能 orchestrator.skills["navigator"].page = page orchestrator.skills["extractor"].page = page
依赖注入价值:
- 解耦技能间的依赖关系
- 支持灵活的组件替换
- 便于单元测试和模拟
第五部分:实际应用示例
5.1 基本使用模式
async def main():
"""skills架构使用示例"""
# 创建技能编排器
orchestrator = skillorchestrator()
# 创建并注册技能
browser_skill = browsermanagerskill(headless=true)
orchestrator.register_skill("browser_manager", browser_skill)
# 启动浏览器
browser, context, page = await browser_skill.execute()
# 注册其他需要页面的技能
navigation_skill = pagenavigationskill(page)
extraction_skill = dataextractionskill(page)
storage_skill = datastorageskill()
orchestrator.register_skill("page_navigation", navigation_skill)
orchestrator.register_skill("data_extraction", extraction_skill)
orchestrator.register_skill("data_storage", storage_skill)
# 定义工作流
workflow = [
{
"skill": "page_navigation",
"args": {
"url": "https://example.com/weather",
"timeout": 30000
}
},
{
"skill": "data_extraction",
"args": {
"extraction_config": {
"table_selector": ".weather-table",
"column_mapping": {
"date": 0,
"temperature": 1,
"weather": 2
}
}
}
},
{
"skill": "data_storage",
"args": {
"data": [], # 这里应该填充实际提取的数据
"table_name": "weather_records"
}
}
]
# 执行工作流
results = await orchestrator.execute_workflow(workflow)
# 清理资源
await browser_skill.cleanup()
return results5.2 配置驱动示例
# 技能配置
skills_config = [
{
"name": "browser",
"type": "browser",
"params": {"headless": true}
},
{
"name": "navigator",
"type": "navigation",
"params": {}
},
{
"name": "extractor",
"type": "extraction",
"params": {}
},
{
"name": "storage",
"type": "storage",
"params": {"db_path": "config_driven.db"}
}
]
# 创建技能管道
orchestrator = advancedskillfeatures.create_skill_pipeline(skills_config)第六部分:最佳实践和优化策略
6.1 错误处理策略
分级错误处理:
# 技能级别错误处理
async def execute_with_retry(self, max_retries=3):
for attempt in range(max_retries):
try:
return await self.execute()
except temporaryerror as e:
if attempt == max_retries - 1:
raise
await asyncio.sleep(2 ** attempt)
except permanenterror as e:
self.logger.error(f"永久性错误: {e}")
raise编排器级别错误处理:
- 单个技能失败不影响整体流程
- 支持错误恢复和重试机制
- 详细的错误日志和监控
6.3 监控和调试
日志系统:
- 结构化日志记录
- 性能指标收集
- 错误追踪和报告
调试工具:
- 技能执行状态监控
- 工作流可视化
- 性能分析工具集成
结论
python + skills架构通过模块化、可组合的设计理念,为构建复杂系统提供了强大的技术基础。这种架构模式的核心优势在于:
- 开发效率:通过技能复用和组合,大幅提升开发速度
- 系统维护:模块化设计使得系统维护和升级更加容易
- 业务适应:灵活的配置机制支持快速响应业务变化
- 技术扩展:良好的扩展性支持系统持续演进
在实际应用中,skills架构已被证明在数据采集、自动化测试、业务流程自动化等多个领域具有显著优势。随着技术的不断发展,这种架构模式将继续演进,为构建更加智能、高效的软件系统提供支持。
本文基于实际项目经验编写,所有代码示例均经过实践验证。读者可以根据具体需求调整实现细节,并结合实际场景进行优化和改进。
到此这篇关于python + skills 架构实现从理论到实践的文章就介绍到这了,更多相关python skills 架构内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论