How to do distributed processing of Landsat data in Python
Using Cloud Dataflow and Google Cloud Public Datasets
Originally posted on Google Cloud Blog at https://cloud.google.com/blog/products/gcp/how-to-do-distributed-processing-of-landsat-data-in-python
One common data analysis task across the agricultural industry, as well as in academia and government (for drought studies, climate modeling, and so on), is to create a monthly vegetation index from Landsat images, which are now available as a public dataset on Google Cloud Platform (source of Landsat images: U.S. Geological Survey). One approach to creating such a monthly vegetation index is to write a data processing script that does the following:
- Find all the Landsat images that cover the location in question.
- Find the least-cloudy image for each month, making sure to correctly handle months when the study area needs multiple Landsat images for complete coverage.
- For each image, compute the normalized difference of the red and near-infrared bands (bands 3 and 4 on Landsat 7 and earlier, or bands 4 and 5 on Landsat 8, depending on the spacecraft): this is the vegetation index known as NDVI. A minimal sketch of the arithmetic follows this list.
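To make that last step concrete, here is a small NumPy sketch of the NDVI arithmetic itself. The band values below are made up for illustration; the actual pipeline code appears later in the post.

import numpy as np

# Hypothetical reflectance arrays standing in for the red and near-infrared bands.
red = np.array([[0.10, 0.12], [0.30, 0.25]])
nir = np.array([[0.45, 0.50], [0.35, 0.28]])

# NDVI = (NIR - red) / (NIR + red); the maximum() guards against division by zero.
ndvi = (nir - red) / np.maximum(nir + red, 1e-6)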
If you were the data analyst or engineer assigned to this task, you could code this workflow as a Python script, but parallelizing such a script to run on many machines in a fault-tolerant way is quite hard. Wouldn’t it be great if there were a framework that would distribute this script onto many machines without you having to manage any clusters or retries? That’s exactly what Google Cloud Dataflow (which has an open source API in the form of an incubating Apache project called Apache Beam) can do.
Processing Landsat data at scale using Google Cloud Dataflow
Cloud Dataflow provides a fully-managed, autoscaling, serverless execution environment for data pipelines written in Apache Beam. While it excels as a unified batch and stream-processing framework and as an Apache Pig/Apache Spark replacement for doing data processing jobs without having to spin up (and manage) Apache Hadoop clusters, you can also use Cloud Dataflow to parallelize your scripting code across many machines, helping to bring speed, reliability, and scalability to the process.
In the remainder of this post, we’ll explain how it’s done. Although the use case involved is fairly specific, the same principles will apply to many other workloads across different industries.
Main Script
The Python code is quite straightforward. The Landsat dataset on GCP includes an index file that is read line-by-line using beam.Read on a TextFileSource:
scenes = (p
   | 'read_index' >> beam.Read(beam.io.TextFileSource(index_file))
   | 'filter_scenes' >> beam.FlatMap(lambda line: filterScenes(line, lat, lon))
   | 'least_cloudy' >> beam.CombinePerKey(clearest)
)
The results from beam.Read are piped to the filterScenes method, which finds scenes that cover the lat/lon in which we are interested. We will get multiple scenes because the spacecraft makes 2–3 passes over the area every month, so we next find the clearest image for each month.
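If CombinePerKey is unfamiliar: it groups the elements of a keyed collection and reduces each group with the supplied function. Here is a standalone toy example, unrelated to the Landsat index, where the numbers are invented cloud-cover percentages:

import apache_beam as beam

p = beam.Pipeline()
(p
   | 'make' >> beam.Create([('2015-06', 20.0), ('2015-06', 5.0), ('2015-07', 40.0)])
   | 'least_cloudy' >> beam.CombinePerKey(min)   # smallest value for each key
   | 'save' >> beam.io.textio.WriteToText('/tmp/least_cloudy'))
p.run()
# after the run completes, the output shard under /tmp/least_cloudy* contains
# ('2015-06', 5.0) and ('2015-07', 40.0)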
The information about those scenes is written to an output file, and the vegetation index is computed for each scene:
# write out info about scene
scenes | beam.Map(lambda (yrmon, scene): scene.__dict__) | 'scene_info' >> beam.io.textio.WriteToText(output_file)
# compute ndvi on scene
scenes | 'compute_ndvi' >> beam.Map(lambda (yrmon, scene): ndvi.computeNdvi(scene.BASE_URL, 'gs://cloud-training-demos/landsat/'))
The code above also illustrates how you can pipe the result of a transform (i.e. scenes) to two places in Google Cloud Dataflow. The end result, over Reunion Island (in the Indian Ocean), looks like this:
There are three methods being invoked from the pipeline above: filterScenes, clearest, and computeNdvi. Let’s look at these methods next.
Transforms
The filterScenes method verifies that the scene contains the (lat, lon) in question.
def filterScenes(line, lat, lon):
   scene = SceneInfo(line)
   if scene.contains(lat, lon):
      yrmon = '{0}-{1}'.format(scene.DATE_ACQUIRED.year, scene.DATE_ACQUIRED.month)
      yield (yrmon, scene)
The SceneInfo class parses a single line of the comma-separated-values file and stores the values. If the scene meets the spatial criterion, the method returns a 2-tuple of the year-month and the scene. Note that filterScenes is a generator method. (In general, Map in Python Dataflow corresponds to lambda functions with one return value per input, and FlatMap corresponds to generator functions that are not one-to-one.)
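To make that Map/FlatMap distinction concrete, here is a tiny self-contained example; the numbers have nothing to do with Landsat:

import apache_beam as beam

p = beam.Pipeline()
nums = p | 'make' >> beam.Create([1, 2, 3, 4])

# Map: exactly one output element per input element.
squares = nums | 'square' >> beam.Map(lambda x: x * x)

# FlatMap: zero or more output elements per input, typically from a generator.
def odds_only(x):
   if x % 2 == 1:
      yield x

odds = nums | 'odds' >> beam.FlatMap(odds_only)
p.run()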
The method clearest takes a list of scenes and finds the one with minimum cloud cover:
def clearest(scenes):
   if scenes:
      return min(scenes, key=lambda s: s.CLOUD_COVER)
   else:
      return None
The computeNdvi method downloads the appropriate images, computes the normalized difference, and uploads the result to Google Cloud Storage. (See the GitHub repo for the full code.)
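The repo’s computeNdvi is more involved, but a rough sketch of those three steps, assuming GDAL and the gsutil command-line tool are installed and using assumed file naming, looks something like this (not the actual implementation):

import subprocess
import numpy as np
from osgeo import gdal

def compute_ndvi_sketch(base_url, output_dir):
   scene_id = base_url.rstrip('/').split('/')[-1]   # last path component of BASE_URL
   # 1. Download the red and near-infrared bands (B4/B5 on Landsat 8;
   #    B3/B4 on earlier spacecraft). The file naming here is an assumption.
   red_file = '{}_B4.TIF'.format(scene_id)
   nir_file = '{}_B5.TIF'.format(scene_id)
   for fname in (red_file, nir_file):
      subprocess.check_call(['gsutil', 'cp', '{}/{}'.format(base_url, fname), '.'])

   red_ds = gdal.Open(red_file)
   nir_ds = gdal.Open(nir_file)
   red = red_ds.GetRasterBand(1).ReadAsArray().astype(float)
   nir = nir_ds.GetRasterBand(1).ReadAsArray().astype(float)

   # 2. Compute the normalized difference (guard against division by zero).
   ndvi = (nir - red) / np.maximum(nir + red, 1.0)

   # 3. Write a GeoTIFF that keeps the source georeferencing, then upload it.
   out = gdal.GetDriverByName('GTiff').Create('ndvi.TIF', red_ds.RasterXSize,
                                              red_ds.RasterYSize, 1, gdal.GDT_Float32)
   out.SetGeoTransform(red_ds.GetGeoTransform())
   out.SetProjection(red_ds.GetProjection())
   out.GetRasterBand(1).WriteArray(ndvi)
   out.FlushCache()
   subprocess.check_call(['gsutil', 'cp', 'ndvi.TIF', output_dir])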
Landsat over an area
If we are interested in a single point, it is sufficient to simply look for Landsat images that cover that one point (assuming that we are careful to disregard areas within the bounding box that were not actually scanned by the spacecraft). Here, though, we are interested in coverage of an area. This makes the problem a little more difficult since individual Landsat scenes may cover only part of the island:
In our workflow, therefore, we need to look for all Landsat images that cover any part of the island. Then, for each unique path and row of the spacecraft, we find the clearest image during the month. These images can then be mosaicked to form the Landsat image over the island.
Fortunately, Python and Cloud Dataflow are expressive enough to handle this task with ease. Here’s a more sophisticated version of the above script:
lat = -21.1; lon = 55.50   # center of Reunion Island
dlat = 0.4; dlon = 0.4

# Read the index file and find all scenes that cover this area
allscenes = (p
   | 'read_index' >> beam.Read(beam.io.TextFileSource(index_file))
   | 'to_scene' >> beam.Map(lambda line: SceneInfo(line))
   | 'by_area' >> beam.FlatMap(lambda scene: filterByArea(scene, lat+dlat, lon-dlon, lat-dlat, lon+dlon))
)

# for each month and spacecraft-coverage-pattern (given by the path and row), find the clearest scene
scenes = (allscenes
   | 'cov_month' >> beam.Map(lambda scene: (scene.month_path_row(), scene))
   | 'least_cloudy' >> beam.CombinePerKey(clearest)
   | 'yrmon-scene' >> beam.Map(lambda (key, scene): (scene.yrmon(), scene))
)
The new functions in the above snippet deal with intersecting bounding boxes and building the appropriate keys:
class SceneInfo:
   # as before …

   def intersects(self, slat, wlon, nlat, elon):
      return (nlat > self.SOUTH_LAT) and (slat < self.NORTH_LAT) \
         and (elon > self.WEST_LON) and (wlon < self.EAST_LON)

   def month_path_row(self):
      return '{}-{}-{}'.format(self.yrmon(), self.WRS_PATH, self.WRS_ROW)

   def yrmon(self):
      return '{}-{:02d}'.format(self.DATE_ACQUIRED.year, self.DATE_ACQUIRED.month)

def filterByArea(scene, slat, wlon, nlat, elon):
   if scene.intersects(slat, wlon, nlat, elon):
      yield scene
Parallel, distributed execution
You can run the above code locally as long as you have the Python packages for GDAL (needed to read the Landsat images) and Google Cloud Dataflow installed; simply run the dfndvi.py script. Alternatively, spin up a Google Compute Engine instance, install those Python packages using install_packages.sh, and run dfndvi.py. Doing that for 2015 and visualizing the resulting images as a movie shows changes in vegetation across the year:
One of the key benefits of Google Cloud Dataflow is that you are not limited to running locally or on a single machine; you can run the same pipeline at scale on many machines to get the job completed quickly. To do that, change the pipeline runner from DirectPipelineRunner to DataflowPipelineRunner and supply your GCP project id. The run_oncloud.sh script specifies the necessary command-line parameters to hand the computation off to Google Cloud:
./dfndvi.py \
   --project=$PROJECT \
   --runner=DataflowPipelineRunner \
   --staging_location=gs://$BUCKET/landsat/staging \
   --temp_location=gs://$BUCKET/landsat/staging \
   --index_file=gs://gcp-public-data-landsat/index.csv.gz \
   --max_num_workers=10 \
   --output_file=gs://$BUCKET/landsat/output/scenes.txt \
   --output_dir=gs://$BUCKET/landsat/output \
   --job_name=monthly-landsat \
   --save_main_session \
   --setup_file=./setup.py
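Inside dfndvi.py, these flags just need to reach the pipeline. A minimal sketch of that wiring is shown below; the real script does its own argument parsing, so see the GitHub repo for the exact code.

import sys
import apache_beam as beam

# Forward the command-line flags (runner, project, staging location, and so on)
# to Beam; the runner picks out the options it recognizes.
p = beam.Pipeline(argv=sys.argv)
# ... build the transforms shown earlier ...
p.run()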
Running the script in distributed fashion with Cloud Dataflow also brings orchestration and failure handling to the process. If one of the workers fails, for example, a replacement worker will take over its task. If some step is taking too long, more workers will automatically be provisioned. In addition, the GCP web console allows you to monitor the task as it executes, showing progress and number of elements processed at each step:
Conclusion
In this post, you learned how to process the public Landsat dataset on Google Cloud Storage in a distributed manner; the same approach can be applied to many other use cases. The key ingredient here was Google Cloud Dataflow, which lets you distribute your Python scripts over lots of machines without having to configure or manage servers.
Happy coding!