无人机全自动巡田分析系统

项目概述
实际应用场景描述
随着智慧农业的发展,传统人工巡田方式面临诸多挑战。
某大型农场拥有5000亩小麦种植区,传统巡田需要4名农技人员花费3天时间才能完成全面检查,不仅效率低下,还容易遗漏病虫害早期征兆。
本系统通过无人机自动采集航拍图像,结合计算机视觉和深度学习技术,实现农田健康状况的智能分析与可视化。
引入痛点
1.
效率低下:人工巡田耗时耗力,无法及时发现作物异常
2.
主观性强:依赖个人经验判断,缺乏客观标准
3.
覆盖不全:地形复杂区域难以到达,存在监测盲区
4.
数据分散:影像资料难以整合分析,无法形成全局视图
5.
预警滞后:病虫害发现时已扩散,错过最佳防治期
核心逻辑讲解
┌─────────────────────────────────────────────────────────────────┐
│
无人机全自动巡田分析系统
│
├─────────────────────────────────────────────────────────────────┤
│
+
│
└─────────────────────────────────────────────────────────────────┘
核心算法流程:
1.
图像预处理:使用OpenCV进行去畸变、白平衡、对比度增强
2.
图像拼接:基于特征点的全景图构建,采用RANSAC剔除误匹配
3.
植被指数计算:NDVI
长势分析:结合颜色空间转换和纹理特征,使用K-means聚类
5.
健康评估:多维度评分模型,综合NDVI、叶面积指数、病害指数
项目结构
drone_field_analyzer/
├──
README.md
预训练模型到路径
sys.path.insert(0,
str(Path(__file__).parent.parent))
from
src.utils.logger
src.preprocessing.image_denoiser
import
src.preprocessing.geometric_corrector
import
src.preprocessing.radiometric_calibrator
import
RadiometricCalibrator
from
import
src.visualization.health_map_generator
import
ProcessingStage(Enum):
"""处理阶段枚举"""
PREPROCESSING
=
"preprocessing"
STITCHING
=
"stitching"
ANALYSIS
=
"analysis"
VISUALIZATION
=
"visualization"
COMPLETED
=
"completed"
@dataclass
class
ProcessingConfig:
"""处理配置数据类"""
input_dir:
str
enable_preprocessing:
bool
True
enable_visualization:
bool
direct
ndvi_threshold_low:
float
"INFO"
@classmethod
def
from_yaml(cls,
'ProcessingConfig':
"""从YAML文件加载配置"""
with
open(config_path,
cls(**config_dict.get('processing',
{}))
@classmethod
def
default(cls)
'ProcessingConfig':
"""返回默认配置"""
return
cls(
input_dir="./data/input",
output_dir="./data/output",
enable_preprocessing=True,
enable_stitching=True,
enable_analysis=True,
enable_visualization=True
)
@dataclass
class
ProcessingResult:
"""处理结果数据类"""
stage:
bool
message:
field(default_factory=dict)
metrics:
Dict
field(default_factory=dict)
timestamp:
datetime
field(default_factory=datetime.now)
class
DroneFieldAnalyzer:
"""
无人机全自动巡田分析系统核心类
该类实现了从原始航拍图像到农田健康地图的完整处理流水线,
集成了图像预处理、拼接、长势分析和可视化功能。
Attributes:
config:
处理配置对象
logger:
各阶段处理结果记录
"""
def
__init__(self,
None):
"""
初始化分析系统
Args:
config:
处理配置,如果为None则使用默认配置
"""
self.config
=
ProcessingConfig.default()
self.logger
=
setup_logger(
name="DroneFieldAnalyzer",
level=self.config.log_level
)
self.file_handler
=
FileHandler(self.config.output_dir)
self.current_stage
=
ProcessingStage.PREPROCESSING
self.results:
ProcessingResult]
初始化各处理模块
self._initialize_modules()
self.logger.info("无人机全自动巡田分析系统初始化完成")
def
->
None:
"""初始化所有处理模块"""
self.logger.info("正在初始化处理模块...")
#
=
self.config.enable_preprocessing
else
self.config.enable_preprocessing
else
None
self.radiometric_calibrator
=
self.config.enable_preprocessing
else
拼接模块
self.panorama_builder
=
PanoramaBuilder(
method=self.config.stitch_method
)
else
分析模块
self.ndvi_calculator
=
NDVICalculator(
threshold_low=self.config.ndvi_threshold_low,
threshold_high=self.config.ndvi_threshold_high
)
else
None
self.growth_assessor
=
None
self.disease_detector
=
可视化模块
self.health_map_generator
=
self.config.enable_visualization
else
None
self.logger.info("所有处理模块初始化完成")
def
process(self,
ProcessingResult:
"""
执行完整的处理流水线
Args:
progress_callback:
progress,
message)
Returns:
ProcessingResult:
最终处理结果
"""
try:
self.logger.info("="
60)
self.logger.info("开始执行无人机全自动巡田分析流水线")
self.logger.info("="
60)
#
self.config.enable_preprocessing:
result
=
self._run_preprocessing(progress_callback)
if
not
self.config.enable_stitching:
result
=
self._run_stitching(progress_callback)
if
not
self.config.enable_analysis:
result
=
self._run_analysis(progress_callback)
if
not
self.config.enable_visualization:
result
=
self._run_visualization(progress_callback)
if
not
self._generate_final_report()
self.logger.info("="
60)
self.logger.info("无人机全自动巡田分析流水线执行完成!")
self.logger.info("="
60)
return
e:
self.logger.error(f"处理流水线执行失败:
{str(e)}",
exc_info=True)
return
ProcessingResult(
stage=self.current_stage,
success=False,
message=f"处理失败:
{str(e)}"
)
def
->
ProcessingResult:
"""执行图像预处理阶段"""
self.current_stage
=
ProcessingStage.PREPROCESSING
self.logger.info("[阶段1]
开始图像预处理...")
try:
input_images
=
self.file_handler.load_input_images(self.config.input_dir)
processed_images
=
enumerate(input_images):
if
progress_callback:
progress
=
100
progress_callback(self.current_stage,
{idx+1}/{total_images}")
self.logger.debug(f"处理图像:
读取图像
image
self.file_handler.read_image(image_path)
#
图像去噪
if
method='bilateral')
#
几何校正
if
self.geo_corrector:
image
=
self.geo_corrector.correct_distortion(image)
#
辐射定标
if
self.radiometric_calibrator:
image
=
self.radiometric_calibrator.calibrate(image)
processed_images.append(image)
#
=
self.file_handler.save_processed_image(
image,
Path(image_path).stem
+
"_processed"
)
self.logger.debug(f"已保存预处理图像:
{output_path}")
result
=
ProcessingResult(
stage=ProcessingStage.PREPROCESSING,
success=True,
message=f"成功预处理
张图像",
data={'processed_images':
processed_images,
len(processed_images)},
metrics={
'input_count':
total_images,
'output_count':
len(processed_images),
'avg_processing_time':
0.0
可添加实际计时
}
)
self.results[self.current_stage]
=
result
self.logger.info(f"[阶段1]
图像预处理完成:
{result.message}")
return
result
except
{str(e)}"
self.logger.error(error_msg,
exc_info=True)
return
ProcessingResult(
stage=ProcessingStage.PREPROCESSING,
success=False,
message=error_msg
)
def
_run_stitching(self,
ProcessingResult:
"""执行图像拼接阶段"""
self.current_stage
=
ProcessingStage.STITCHING
self.logger.info("[阶段2]
开始图像拼接...")
try:
#
self.results.get(ProcessingStage.PREPROCESSING):
images
=
self.results[ProcessingStage.PREPROCESSING].data['processed_images']
else:
images
=
self.file_handler.load_input_images(
self.config.input_dir,
pattern="*_processed.*"
)
if
len(images)
ProcessingResult(
stage=ProcessingStage.STITCHING,
success=False,
message="图像数量不足,至少需要2张图像进行拼接"
)
#
执行拼接
panorama,
self.panorama_builder.build_panorama(
images,
progress_callback=lambda
(
progress_callback(self.current_stage,
m)
if
self.file_handler.save_panorama(panorama,
"field_panorama")
result
=
ProcessingResult(
stage=ProcessingStage.STITCHING,
success=True,
message=f"成功拼接
{len(images)}
张图像,生成全景图",
data={
'panorama':
panorama,
'homographies':
homographies,
'panorama_path':
output_path
},
metrics={
'input_count':
len(images),
'panorama_size':
panorama.shape[:2],
'overlap_regions':
len(homographies)
}
)
self.results[self.current_stage]
=
result
self.logger.info(f"[阶段2]
图像拼接完成:
{result.message}")
return
result
except
{str(e)}"
self.logger.error(error_msg,
exc_info=True)
return
ProcessingResult(
stage=ProcessingStage.STITCHING,
success=False,
message=error_msg
)
def
_run_analysis(self,
ProcessingResult:
"""执行长势分析阶段"""
self.current_stage
=
ProcessingStage.ANALYSIS
self.logger.info("[阶段3]
开始长势分析...")
try:
#
self.results.get(ProcessingStage.STITCHING):
panorama
=
self.results[ProcessingStage.STITCHING].data['panorama']
else:
panorama
=
self.file_handler.load_panorama("field_panorama")
analysis_results
=
progress_callback:
progress_callback(self.current_stage,
20,
"计算NDVI植被指数...")
ndvi_map,
vegetation_mask
self.ndvi_calculator.calculate(panorama)
analysis_results['ndvi']
=
ndvi_map,
'mask':
vegetation_mask,
'statistics':
self.ndvi_calculator.get_statistics(ndvi_map)
}
#
保存NDVI图
self.file_handler.save_ndvi_map(ndvi_map,
"ndvi_map")
self.logger.info("NDVI计算完成")
#
长势评估
if
progress_callback:
progress_callback(self.current_stage,
50,
"评估作物长势...")
growth_map,
growth_zones
self.growth_assessor.assess(
panorama,
ndvi_map
'ndvi'
None
)
analysis_results['growth']
=
growth_map,
'zones':
growth_zones,
'statistics':
self.growth_assessor.get_statistics(growth_map)
}
#
保存长势图
self.file_handler.save_growth_map(growth_map,
"growth_map")
self.logger.info("长势评估完成")
#
病害检测
if
self.disease_detector:
if
progress_callback:
progress_callback(self.current_stage,
80,
"检测病虫害...")
disease_map,
disease_instances
self.disease_detector.detect(
panorama,
ndvi_map
'ndvi'
None
)
analysis_results['disease']
=
disease_map,
'instances':
disease_instances,
'statistics':
self.disease_detector.get_statistics(disease_instances)
}
#
保存病害检测结果
self.file_handler.save_disease_map(disease_map,
"disease_map")
self.logger.info(f"病害检测完成:
=
ProcessingResult(
stage=ProcessingStage.ANALYSIS,
success=True,
message="长势分析完成",
data=analysis_results,
metrics=self._calculate_analysis_metrics(analysis_results)
)
self.results[self.current_stage]
=
result
self.logger.info(f"[阶段3]
长势分析完成:
{result.message}")
return
result
except
{str(e)}"
self.logger.error(error_msg,
exc_info=True)
return
ProcessingResult(
stage=ProcessingStage.ANALYSIS,
success=False,
message=error_msg
)
def
->
ProcessingResult:
"""执行可视化输出阶段"""
self.current_stage
=
ProcessingStage.VISUALIZATION
self.logger.info("[阶段4]
开始生成健康地图...")
try:
#
=
self.results.get(ProcessingStage.ANALYSIS)
if
not
analysis_result.success:
return
ProcessingResult(
stage=ProcessingStage.VISUALIZATION,
success=False,
message="无法进行可视化:
缺少有效的分析结果"
)
analysis_data
=
analysis_result.data
panorama
=
self.results[ProcessingStage.STITCHING].data['panorama']
if
progress_callback:
progress_callback(self.current_stage,
30,
self.health_map_generator.generate(
panorama,
analysis_data.get('ndvi',
{}).get('map'),
analysis_data.get('growth',
{}).get('map'),
analysis_data.get('disease',
{}).get('map')
)
#
=
self.file_handler.save_health_map(health_map,
progress_callback:
progress_callback(self.current_stage,
70,
self.health_map_generator.generate_report(
analysis_data,
health_map
)
report_path
=
self.file_handler.save_report(report,
"analysis_report")
#
生成预警信息
alerts
self.health_map_generator.generate_alerts(analysis_data)
alerts_path
=
self.file_handler.save_alerts(alerts,
=
ProcessingResult(
stage=ProcessingStage.VISUALIZATION,
success=True,
利用AI解决实际问题,如果你觉得这个工具好用,欢迎关注长安牧笛!


