How to do distributed processing of Landsat data in Python

Using Cloud Dataflow and Google Cloud Public Datasets

Originally posted on Google Cloud Blog at https://cloud.google.com/blog/products/gcp/how-to-do-distributed-processing-of-landsat-data-in-python

One common data analysis task across the agricultural industry, as well as in academia and government (for drought studies, climate modeling, and so on), is to create a monthly vegetation index from Landsat images. Landsat imagery is now available as a public dataset on Google Cloud Platform (source of Landsat images: U.S. Geological Survey). One approach to creating such a monthly vegetation index is to write a data processing script that does the following:

  1. Find all the Landsat images that cover the location in question.
  2. Find the least-cloudy image for each month, making sure to correctly handle months when the study area needs multiple Landsat images for complete coverage.
  3. For each image, compute the normalized difference between bands 3 and 4 or bands 4 and 5, depending on the type of spacecraft; this is the vegetation index known as NDVI.
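
The NDVI computation in step 3 is a simple per-pixel formula: (NIR − Red) / (NIR + Red). As a minimal sketch (using NumPy arrays standing in for the red and near-infrared bands; the helper name and sample values are just for illustration):

```python
import numpy as np

def compute_ndvi(red, nir):
    """Normalized difference vegetation index: (NIR - Red) / (NIR + Red)."""
    red = red.astype(float)
    nir = nir.astype(float)
    denom = nir + red
    # guard against division by zero over water or no-data pixels
    return np.where(denom == 0, 0.0, (nir - red) / np.maximum(denom, 1e-9))

red = np.array([[50.0, 100.0], [0.0, 30.0]])
nir = np.array([[150.0, 100.0], [0.0, 90.0]])
print(compute_ndvi(red, nir))  # healthy vegetation approaches 1.0
```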

If you were the data analyst or engineer assigned to this task, you could code this workflow as a Python script, but parallelizing such a script to run on many machines in a fault-tolerant way is quite hard. Wouldn’t it be great if there were a framework that would distribute this script onto many machines without you having to manage any clusters or retries? That’s exactly what Google Cloud Dataflow (which has an open source API in the form of an incubating Apache project called Apache Beam) can do.

Processing Landsat data at scale using Google Cloud Dataflow

In the remainder of this post, we’ll explain how it’s done. Although the use case involved is fairly specific, the same principles will apply to many other workloads across different industries.

Main Script

scenes = (p
   | 'read_index' >> beam.Read(beam.io.TextFileSource(index_file))
   | 'filter_scenes' >> beam.FlatMap(lambda line: filterScenes(line, lat, lon))
   | 'least_cloudy' >> beam.CombinePerKey(clearest)
)

The results from beam.Read are piped to the filterScenes method, which finds scenes that cover the lat/lon in which we are interested. We will get multiple scenes because the spacecraft makes 2–3 passes over the area every month, so we next find the clearest image for each month.

The information about those scenes is written to an output file, and the vegetation index is computed for each scene:

# write out info about scene
scenes | beam.Map(lambda (yrmon, scene): scene.__dict__) \
       | 'scene_info' >> beam.io.textio.WriteToText(output_file)

# compute ndvi on scene
scenes | 'compute_ndvi' >> beam.Map(
   lambda (yrmon, scene): ndvi.computeNdvi(scene.BASE_URL, 'gs://cloud-training-demos/landsat/'))

The code above also illustrates how you can pipe the result of a transform (i.e. scenes) to two places in Google Cloud Dataflow. The end result, over Reunion Island (in the Indian Ocean), looks like this:

There are three methods being invoked from the pipeline above: filterScenes, clearest, and computeNdvi. Let’s look at these methods next.

Transforms

def filterScenes(line, lat, lon):
   scene = SceneInfo(line)
   if scene.contains(lat, lon):
      yrmon = '{0}-{1}'.format(scene.DATE_ACQUIRED.year, scene.DATE_ACQUIRED.month)
      yield (yrmon, scene)

The SceneInfo class parses a single line of the comma-separated-values file and stores the values. If the scene meets the spatial criterion, the method yields a 2-tuple of the year-month and the scene. Note that filterScenes is a generator method. (In general, Map in Python Dataflow corresponds to lambda functions with one return value per input, and FlatMap corresponds to generator functions that are not one-to-one.)
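
The Map/FlatMap distinction can be illustrated without Beam at all: a FlatMap callable is a generator that may yield zero or more elements per input, which is exactly the shape of filterScenes. In this stand-in sketch, the scene tuples and their fields are hypothetical, purely for illustration:

```python
# Stand-in for filterScenes: yields a (year-month, scene) pair only when
# the scene covers the point -- zero or one outputs per input element.
def filter_scenes(scene, lat, lon):
    s_lat, n_lat, w_lon, e_lon, yrmon = scene
    if s_lat <= lat <= n_lat and w_lon <= lon <= e_lon:
        yield (yrmon, scene)

scenes = [
    (-22.0, -20.0, 55.0, 56.0, '2015-06'),  # covers Reunion Island
    (10.0, 12.0, 55.0, 56.0, '2015-06'),    # does not
]
# FlatMap semantics: concatenate everything yielded across all inputs
matched = [out for s in scenes for out in filter_scenes(s, -21.1, 55.5)]
print(len(matched))  # 1
```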

The method clearest takes a list of scenes and finds the one with minimum cloud cover:

def clearest(scenes):
   if scenes:
      return min(scenes, key=lambda s: s.CLOUD_COVER)
   else:
      return None

computeNdvi consists of downloading the appropriate images, computing the normalized difference, and uploading the result to Google Cloud Storage. (See the GitHub repo for the full code.)
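
One detail computeNdvi has to handle is picking the right band pair per spacecraft (step 3 earlier: bands 3 and 4 versus 4 and 5). A hedged sketch of that dispatch, with a hypothetical helper name and spacecraft IDs matching the public dataset's SPACECRAFT_ID column:

```python
def ndvi_bands(spacecraft_id):
    """Red and near-infrared band numbers used for NDVI.

    Landsat 8 (OLI) moved the red/NIR bands to 4 and 5; earlier
    spacecraft (Landsat 4/5/7) use bands 3 and 4.
    """
    if spacecraft_id == 'LANDSAT_8':
        return (4, 5)   # (red, NIR)
    return (3, 4)

print(ndvi_bands('LANDSAT_8'))  # (4, 5)
print(ndvi_bands('LANDSAT_7'))  # (3, 4)
```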

Landsat over an area

In our workflow, therefore, we need to look for all Landsat images that cover any part of the island. Then, for each unique path and row of the spacecraft, we should find the clearest image during the month. These images can then be mosaicked to form the Landsat image over the island.
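
The per-key selection just described (find the clearest scene for each unique month/path/row) boils down to a min-by-cloud-cover per key. A plain-Python sketch over hypothetical (month-path-row key, cloud cover) records:

```python
# Hypothetical scene records: (month-path-row key, cloud cover percent)
records = [
    ('2015-06-159-84', 40.0),
    ('2015-06-159-84', 5.0),   # clearest for this path/row this month
    ('2015-06-160-84', 12.0),
    ('2015-06-160-84', 33.0),
]

# CombinePerKey with a min-by-cloud-cover combiner amounts to:
best_by_key = {}
for key, cloud in records:
    if key not in best_by_key or cloud < best_by_key[key]:
        best_by_key[key] = cloud

print(sorted(best_by_key.items()))
# [('2015-06-159-84', 5.0), ('2015-06-160-84', 12.0)]
```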

Fortunately, Python (and Cloud Dataflow) are sufficiently expressive to handle this task with ease. Here’s a more sophisticated version of the above script:

lat = -21.1; lon = 55.50  # center of Reunion Island
dlat = 0.4; dlon = 0.4
# Read the index file and find all scenes that cover this area
allscenes = (p
   | 'read_index' >> beam.Read(beam.io.TextFileSource(index_file))
   | 'to_scene' >> beam.Map(lambda line: SceneInfo(line))
   | 'by_area' >> beam.FlatMap(lambda scene: filterByArea(scene, lat+dlat, lon-dlon, lat-dlat, lon+dlon))
)
# for each month and spacecraft-coverage-pattern (given by the path and row), find clearest scene
scenes = (allscenes
   | 'cov_month' >> beam.Map(lambda scene: (scene.month_path_row(), scene))
   | 'least_cloudy' >> beam.CombinePerKey(clearest)
   | 'yrmon-scene' >> beam.Map(lambda (key, scene): (scene.yrmon(), scene))
)

The new functions in the above snippet deal with the intersection of bounding boxes and creating appropriate keys:

class SceneInfo:
   # as before ...
   def intersects(self, slat, wlon, nlat, elon):
      return (nlat > self.SOUTH_LAT) and (slat < self.NORTH_LAT) \
         and (elon > self.WEST_LON) and (wlon < self.EAST_LON)

   def month_path_row(self):
      return '{}-{}-{}'.format(self.yrmon(), self.WRS_PATH, self.WRS_ROW)

   def yrmon(self):
      return '{}-{:02d}'.format(self.DATE_ACQUIRED.year, self.DATE_ACQUIRED.month)

def filterByArea(scene, slat, wlon, nlat, elon):
   if scene.intersects(slat, wlon, nlat, elon):
      yield scene
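
The intersects method is the standard axis-aligned bounding-box overlap test: two boxes overlap unless one lies entirely north, south, east, or west of the other. A standalone version of the same predicate, with hypothetical coordinates roughly matching Reunion Island:

```python
def intersects(box_a, box_b):
    """Axis-aligned bounding-box overlap; boxes are (slat, wlon, nlat, elon)."""
    slat_a, wlon_a, nlat_a, elon_a = box_a
    slat_b, wlon_b, nlat_b, elon_b = box_b
    return (nlat_b > slat_a and slat_b < nlat_a and
            elon_b > wlon_a and wlon_b < elon_a)

reunion = (-21.5, 55.1, -20.7, 55.9)  # rough bounds of Reunion Island
scene   = (-22.0, 55.0, -20.0, 56.0)  # a scene footprint covering it
far     = (10.0, 20.0, 12.0, 22.0)    # a footprint nowhere near it
print(intersects(reunion, scene))  # True
print(intersects(reunion, far))    # False
```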

Parallel, distributed execution

One of the key benefits of Google Cloud Dataflow is that you aren't limited to running your pipeline locally on a single machine; instead, you can run it at scale on many machines to get the job completed quickly. To do that, change the pipeline runner from DirectPipelineRunner to DataflowPipelineRunner and supply your GCP project ID. The run_oncloud.sh script specifies the command-line parameters needed to offload the computation to Google Cloud:

./dfndvi.py \
   --project=$PROJECT \
   --runner=DataflowPipelineRunner \
   --staging_location=gs://$BUCKET/landsat/staging \
   --temp_location=gs://$BUCKET/landsat/staging \
   --index_file=gs://gcp-public-data-landsat/index.csv.gz \
   --max_num_workers=10 \
   --output_file=gs://$BUCKET/landsat/output/scenes.txt \
   --output_dir=gs://$BUCKET/landsat/output \
   --job_name=monthly-landsat \
   --save_main_session \
   --setup_file=./setup.py

Running the script in distributed fashion with Cloud Dataflow also brings orchestration and failure handling to the process. If one of the workers fails, for example, a replacement worker will take over its task. If some step is taking too long, more workers will automatically be provisioned. In addition, the GCP web console allows you to monitor the job as it executes, showing progress and the number of elements processed at each step:

Conclusion

Happy coding!