夜间无人自主作业系统

📋
项目概述
本项目基于Python构建一套完整的夜间无人自主作业系统,实现农机在夜间环境下的全自主作业。
系统集成多传感器融合定位、AI视觉避障、智能路径规划和作业管理功能,解决传统农业夜间作业依赖人工、效率低下的问题。
🌙
实际应用场景
场景描述
某大型现代化农场位于华北平原,主要种植冬小麦。
每年春季需要进行夜间灌溉和施肥作业,面临以下挑战:
传统作业模式:
-
人力成本占作业成本的40%
解决方案:
通过夜间无人自主作业系统,农场可在晚10点至早6点期间,由农机全自动完成灌溉施肥任务,无需人工值守。
作业流程
1.
19:00
生成作业报告,准备次日任务
😫
行业痛点分析
痛点类别
核心逻辑架构
┌─────────────────────────────────────────────────────────────────────┐
│
夜间无人自主作业系统
│
├─────────────────────────────────────────────────────────────────────┤
│
│
│
│
├─────────────────────────────────────────────────────────────────────┤
│
核心技术栈:
│
└─────────────────────────────────────────────────────────────────────┘
夜间定位+避障核心逻辑
┌─────────────────────────────────────────────────────────────────────┐
│
夜间定位与避障数据流
│
├─────────────────────────────────────────────────────────────────────┤
│
│
│
└───────────┴─────┬─────┴───────────┴───────────┘
│
│
│
└─────────────────────────────────────────────────────────────────────┘
📁
项目结构
night_autonomous_operation/
│
├──
README.md
test_obstacle_detection.py
│
├──
test_area_calculation.py
│
├──
scripts/
troubleshooting.md
💻
核心代码实现
1.
配置文件
"config/settings.py"
"""
全局配置文件
夜间无人自主作业系统
包含所有系统参数和传感器配置
"""
from
dataclasses
OperationMode(Enum):
"""作业模式"""
AUTO
=
ObstacleType(Enum):
"""障碍物类型"""
STATIC
=
静态障碍物(岩石、树桩)
DYNAMIC
=
动态障碍物(动物、人员)
BOUNDARY
=
边界障碍物(围栏、沟渠)
UNKNOWN
=
CropType(Enum):
"""作物类型"""
WHEAT
=
"rice"
@dataclass
class
CoordinateSystem:
"""坐标系配置"""
origin_lat:
float
原点纬度(北京)
origin_lon:
float
原点海拔(米)
utm_zone:
int
北半球
@dataclass
class
RtkGnssConfig:
"""RTK-GNSS配置"""
primary_port:
str
"/dev/ttyUSB0"
secondary_port:
str
"/dev/ttyUSB1"
baud_rate:
int
Hz
rtk_correction_source:
str
ntrip/cors/builtin
ntrip_server:
str
"rtk.example.com"
ntrip_port:
int
"RTCM32"
ntrip_username:
str
"user"
ntrip_password:
str
"pass"
horizontal_accuracy_threshold:
float
米
vertical_accuracy_threshold:
float
米
fixed_solution_required:
bool
True
@dataclass
class
ImuConfig:
"""IMU配置"""
model:
str
"/dev/ttyACM0"
baud_rate:
int
madgwick/mahony/kalman
@dataclass
class
LidarConfig:
"""LiDAR配置"""
model:
str
"RS-LiDAR-M1"
interface:
str
"e***rnet"
ip_address:
str
"192.168.1.200"
port:
int
True
@dataclass
class
InfraredCameraConfig:
"""红外相机配置"""
model:
str
"FLIR_A655sc"
width:
int
测量距离(米)
nuc_enabled:
bool
True
@dataclass
class
UwbConfig:
"""UWB配置"""
anchors:
Dict[str,
field(default_factory=lambda:
(0.0,
2.0),
"anchor_2":
(500.0,
2.0),
"anchor_3":
(250.0,
"vehicle_tag_01"
update_rate:
int
0.3
@dataclass
class
FieldConfig:
"""作业区域配置"""
boundary_file:
str
"config/field_boundaries.geojson"
working_width:
float
作业幅宽(米)
headland_width:
float
地头宽度(米)
row_spacing:
float
行间距(米)
coverage_overlap:
float
覆盖率重叠(5%)
border_margin:
float
边界余量(米)
@dataclass
class
OperationConfig:
"""作业配置"""
mode:
OperationMode
OperationMode.AUTO
crop_type:
CropType
CropType.WHEAT
operation_type:
str
planting/irrigation/fertilization/harvesting
target_speed:
float
目标速度(km/h)
max_speed:
float
最大速度(km/h)
min_speed:
float
最小速度(km/h)
turning_speed:
float
转弯速度(km/h)
safety_distance:
float
安全距离(米)
obstacle_stop_distance:
float
障碍物停止距离(米)
night_vision_gain:
float
自适应速度
wea***r_compensation:
bool
天气补偿
@dataclass
class
SystemConfig:
"""系统总配置"""
#
str
"night_auto_001"
version:
str
"1.0.0"
debug_mode:
bool
"INFO"
log_directory:
str
field(default_factory=CoordinateSystem)
rtk_gnss:
RtkGnssConfig
field(default_factory=RtkGnssConfig)
imu:
ImuConfig
field(default_factory=ImuConfig)
lidar:
LidarConfig
field(default_factory=LidarConfig)
infrared_camera:
InfraredCameraConfig
field(default_factory=InfraredCameraConfig)
uwb:
UwbConfig
field(default_factory=UwbConfig)
field:
FieldConfig
field(default_factory=FieldConfig)
operation:
OperationConfig
field(default_factory=OperationConfig)
#
str
"22:00"
planned_end_time:
str
"06:00"
max_operating_hours:
float
"output"
report_format:
str
json/pdf/csv
telemetry_save_interval:
int
秒
@classmethod
def
load_from_file(cls,
'SystemConfig':
"""从文件加载配置"""
path
=
str):
"""保存配置到文件"""
path
=
Path(filepath)
path.parent.mkdir(parents=True,
open(path,
f:
json.dump(self.__dict__,
indent=2,
任务加载器
"src/input_layer/mission_loader.py"
"""
任务加载器模块
负责加载和解析作业任务
支持多种任务格式(JSON、YAML、GeoJSON)
"""
import
asyncio
import
ParameterValidator
logger
=
logging.getLogger(__name__)
@dataclass
class
MissionTask:
"""作业任务"""
task_id:
str
task_name:
1-10,数字越大优先级越高
parameters:
Dict[str,
field(default_factory=dict)
status:
str
pending/running/paused/completed/failed/cancelled
progress:
float
field(default_factory=datetime.now)
updated_at:
datetime
field(default_factory=datetime.now)
@dataclass
class
MissionPlan:
"""作业计划"""
plan_id:
str
plan_name:
field(default_factory=list)
total_area:
float
总面积(亩)
estimated_duration:
float
预计时长(小时)
created_at:
datetime
field(default_factory=datetime.now)
status:
str
draft/approved/scheduled/executing/completed
class
MissionLoader:
"""
任务加载器
功能特性:
-
支持多种任务格式
-
计划优化
"""
def
__init__(self):
self.field_parser
=
FieldParser()
self.validator
=
ParameterValidator()
self._loaded_plans:
Dict[str,
MissionPlan:
"""
加载作业计划
Args:
source:
任务源(文件路径、URL或字典)
Returns:
MissionPlan:
加载的作业计划
"""
logger.info(f"开始加载作业计划:
解析任务源
if
self._load_from_source(source)
else:
raise
验证和解析计划
plan
self._parse_mission_plan(plan_data)
#
解析作业区域
await
self._resolve_field_boundaries(plan)
#
=
self._calculate_total_area(plan)
#
估算作业时长
plan.estimated_duration
=
self._estimate_duration(plan)
#
验证任务参数
await
self._validate_plan(plan)
self._loaded_plans[plan.plan_id]
=
plan
logger.info(f"作业计划加载完成:
{plan.plan_name},
{plan.total_area:.2f}亩")
return
plan
async
Dict:
"""从不同来源加载任务数据"""
path
=
yaml.safe_load(content)
elif
path.suffix.lower()
json.loads(content)
return
path.stem,
"field_boundaries":
geojson,
"tasks":
[]
}
else:
raise
{path.suffix}")
async
def
MissionPlan:
"""解析任务计划数据"""
plan_id
=
f"plan_{datetime.now().strftime('%Y%m%d_%H%M%S')}")
plan_name
=
MissionPlan(
plan_id=plan_id,
plan_name=plan_name
)
#
=
self._parse_task(task_data)
plan.tasks.append(task)
#
解析字段边界(如果在任务数据中)
if
data:
plan.field_boundaries
=
data["field_boundaries"]
return
plan
async
MissionTask:
"""解析单个任务"""
task_id
=
task_data.get("task_id",
f"task_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}")
#
=
self._parse_datetime(task_data.get("planned_start"))
planned_end
=
self._parse_datetime(task_data.get("planned_end"))
task
=
MissionTask(
task_id=task_id,
task_name=task_data.get("task_name",
f"任务{task_id}"),
operation_type=task_data.get("operation_type",
"fertilization"),
field_id=task_data.get("field_id",
"default_field"),
planned_start=planned_start,
planned_end=planned_end,
priority=task_data.get("priority",
5),
parameters=task_data.get("parameters",
task
def
_parse_datetime(
利用AI解决实际情问题,如果你觉得这个工具好用,欢迎关注长安牧笛!


