本节我们以查询为例,看下GeoSpark如何利用分布式来实现高效查询。首先,对于Spark来说,想要利用Spark,必须要将自己的类型转为RDD,我们就先看下Geospark是如何读取GeoJson,并将Geometry
转为RDD的。
1 | public class SpatialRDD<T extends Geometry> |
Geospark自定义了一个RDD,SpatialRDD
,他是一个泛型类,并且泛型要求是Geometry
的子类,对于Geometry
来说,他的子类有Point
、Line
、Polygon
等,这个大家可以去看JTS库http://www.tsusiatsoftware.net/jts/main.html。然后我这里列举了SpatialRDD
一个重要的成员,对于rawSpatialRDD
来说,他里面存储的就是我们的需要分析的Geometry
。
GeoSpark提供了PointRDD
,PolygonRDD
等,他们都继承自SpatialRDD
,我们以PointRDD
为例,分析一下GeoSpark是如何将geojson转为RDD的。
1 | public PointRDD(JavaSparkContext sparkContext, String InputLocation, Integer Offset, FileDataSplitter splitter, |
这是PointRDD常用的一个构造函数,其中第4行JavaRDD rawTextRDD = partitions != null ? sparkContext.textFile(InputLocation, partitions) : sparkContext.textFile(InputLocation);
则是利用Spark的原生方法将geojson首先转为一个RDD,他的类型可以理解为是String,第7行if (sourceEpsgCRSCode != null && targetEpsgCode != null) { this.CRSTransform(sourceEpsgCRSCode, targetEpsgCode);}
则是做了一个坐标转换,关键是第5行this.setRawSpatialRDD(rawTextRDD.mapPartitions(new PointFormatMapper(Offset, splitter, carryInputData)));
,
在第5行中,Geospark首先调用了mapPartitions
方法来将rawTextRDD中的每一行转为Geometry,其中pointFormatMapper
中有一个方法
1 | public Iterator<T> call(Iterator<String> stringIterator) |
他是一个重载,函数参数stringIterator
是每个分区的所有string,Geospark遍历这个集合,在每一行调用了一个addGeometry
方法,将String转为Geometry,这个方法就不细讲,主要是解析GeoJson,感兴趣的可以去看GeoSpark源码。
这样构造完成后,就将GeoJson转为了一个RDD,此时我们还没有构建空间索引,但是对于大数据量的空间数据我们已经可以利用Spark的RDD进行并行计算了。
1 | public static <U extends Geometry, T extends Geometry> JavaRDD<T> SpatialRangeQuery(SpatialRDD<T> spatialRDD, U originalQueryGeometry, boolean considerBoundaryIntersection, boolean useIndex) |
这里我们看第16行return spatialRDD.getRawSpatialRDD().filter(new RangeFilter(queryGeometry, considerBoundaryIntersection, true));
在第9行if (useIndex == true)
判断不用索引时,就会跳到第16行,本质上还是用了RDD来利用自定义函数进行判断,如果是真,就过滤出来,我们看RangeFilter
这个类。
1 | public class RangeFilter<U extends Geometry, T extends Geometry> |
注意到call
这个方法,里面又调用了match
方法,它在父类JudgementBase
定义有:
1 | public boolean match(Geometry spatialObject, Geometry queryWindow) |
这里面,我们可以看到第4行和第7行均是利用了JTS来判断的,到这里,就一目了然了,实际上还是我们提供了match这个方法,利用Spark来计算。
本文中,我们并没有涉及到索引,GeoSpark也将JTS的索引进行了封装,原理和上面讲的是一样的,我们下一篇文章中在进行分析。