使用PySpark来分析Geolife项目(Microsoft Research Asia)在五年内由100+人收集的GPS轨迹数据集
数据:用户ID、纬度、经度、全部零、海拔高度、时间戳、日期、时间
1.计算每个人每天行驶的距离(以公里为单位)。对于每个用户输出他们旅行最多的(最早)天。此外,输出所有用户在所有日子里行驶的总距离。提示:使用lag和window函数。
2.计算每个人在一天内在任何两个数据点之间行驶的最快速度(以公里/小时为单位)。我们假设速度在任意两个时间点之间是恒定的,可以计算为距离/时间滞后。对于每个用户输出,他们旅行最快的(最早)天
from pyspark.sql import SparkSession from pyspark.sql import functions as F from pyspark.sql import Window # 创建 SparkSession spark = SparkSession.builder.appName("Geolife Analysis").getOrCreate() # 读取数据文件并创建 DataFrame data = spark.read.csv("path_to_your_data_file.csv", header=True) # 数据类型转换 data = data.withColumn("纬度", data["纬度"].cast("float")) \ .withColumn("经度", data["经度"].cast("float")) \ .withColumn("海拔高度", data["海拔高度"].cast("float")) \ .withColumn("时间戳", data["时间戳"].cast("timestamp")) \ .withColumn("日期", data["日期"].cast("date")) \ .withColumn("时间", data["时间"].cast("string")) # 计算每个数据点之间的距离 w = Window.partitionBy("用户ID").orderBy("时间戳") data = data.withColumn("lag_latitude", F.lag("纬度").over(w)) \ .withColumn("lag_longitude", F.lag("经度").over(w)) \ .withColumn("distance", F.when(F.isnull(data["lag_latitude"]), 0) .otherwise(F.acos(F.sin(F.radians(data["纬度"])) * F.sin(F.radians(data["lag_latitude"])) + F.cos(F.radians(data["纬度"])) * F.cos(F.radians(data["lag_latitude"])) * F.cos(F.radians(data["经度"]) - F.radians(data["lag_longitude"]))) * 6371)) # 计算每天行驶的总距离 daily_distance = data.groupBy("用户ID", "日期").agg(F.sum("distance").alias("daily_distance")) max_daily_distance = daily_distance.groupBy("用户ID").agg(F.max("daily_distance").alias("max_daily_distance")) result_1 = max_daily_distance.join(daily_distance, ["用户ID", "daily_distance"]).select("用户ID", "日期", "daily_distance") # 输出每个用户旅行最多的(最早)天 result_2 = max_daily_distance.join(daily_distance, ["用户ID", "max_daily_distance"]).select("用户ID", "日期", "max_daily_distance") # 计算每个人在一天内在任何两个数据点之间行驶的最快速度 data = data.withColumn("time_diff", F.unix_timestamp("时间戳") - F.unix_timestamp(F.lag("时间戳").over(w))) data = data.withColumn("speed_kmh", F.when(data["time_diff"] > 0, data["distance"] / (data["time_diff"] / 3600)).otherwise(None)) max_speed_per_day = data.groupBy("用户ID", "日期").agg(F.max("speed_kmh").alias("max_speed")) result_3 = max_speed_per_day.join(data, ["用户ID", "日期", "max_speed"]).select("用户ID", "日期", "max_speed") # 输出结果 result_1.show() result_2.show() result_3.show()
鄂ICP备2023011697号-1 | Powered By 91代做