Luigi: An ExternalProgramTask example – Converting JSON to CSV
I’ve been playing around with Luigi, a Python library for building pipelines of batch jobs. I struggled to find an example of an ExternalProgramTask, so this post is my attempt at filling that void.
I’m building a little data pipeline to get data from the meetup.com API and put it into CSV files that can be loaded into Neo4j using the LOAD CSV command.
The first task I created calls the /groups endpoint and saves the result into a JSON file:
import json
from collections import Counter

import luigi
import requests


class GroupsToJSON(luigi.Task):
    key = luigi.Parameter()
    lat = luigi.Parameter()
    lon = luigi.Parameter()

    def run(self):
        seed_topic = "nosql"
        uri = "https://api.meetup.com/2/groups?topic={0}&lat={1}&lon={2}&key={3}".format(seed_topic, self.lat, self.lon, self.key)
        r = requests.get(uri)

        # Count how often each topic appears across the seed topic's groups
        all_topics = [topic["urlkey"] for result in r.json()["results"] for topic in result["topics"]]
        c = Counter(all_topics)
        topics = [entry[0] for entry in c.most_common(10)]

        # Fetch the groups for the 10 most common topics, de-duplicating by group id
        groups = {}
        for topic in topics:
            uri = "https://api.meetup.com/2/groups?topic={0}&lat={1}&lon={2}&key={3}".format(topic, self.lat, self.lon, self.key)
            r = requests.get(uri)
            for group in r.json()["results"]:
                groups[group["id"]] = group

        with self.output().open('w') as groups_file:
            json.dump(list(groups.values()), groups_file, indent=4, sort_keys=True)

    def output(self):
        return luigi.LocalTarget("/tmp/groups.json")
We define a few parameters at the top of the class which will be passed in when this task is executed. The most interesting lines of the run function are the last couple where we write the JSON to a file. self.output() refers to the target defined in the output function which in this case is /tmp/groups.json.
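To see what the topic-counting part of run does without calling the API, here is a small sketch. The sample_response dictionary is made-up data that only mimics the shape the task reads ("results", "topics", "urlkey"), and top_topics is a hypothetical helper name, not something from Luigi or the Meetup API.

```python
from collections import Counter

# Made-up stand-in for r.json(); only the keys used by the task are included
sample_response = {
    "results": [
        {"id": 1, "topics": [{"urlkey": "nosql"}, {"urlkey": "mongodb"}]},
        {"id": 2, "topics": [{"urlkey": "nosql"}, {"urlkey": "big-data"}]},
        {"id": 3, "topics": [{"urlkey": "nosql"}, {"urlkey": "mongodb"}]},
    ]
}


def top_topics(response, n):
    """Return the n most frequent topic urlkeys across all groups in response."""
    all_topics = [
        topic["urlkey"]
        for result in response["results"]
        for topic in result["topics"]
    ]
    return [urlkey for urlkey, _count in Counter(all_topics).most_common(n)]


print(top_topics(sample_response, 2))  # ['nosql', 'mongodb']
```

In the real task the same idea is applied with n set to 10, and each of those topics is then used for a further /groups request.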
Now we need to create a task to convert that JSON file into CSV format. The jq command-line tool does this well, so we’ll use that. The following task, together with a small shell script, handles the conversion:
from luigi.contrib.external_program import ExternalProgramTask


class GroupsToCSV(ExternalProgramTask):
    file_path = "/tmp/groups.csv"
    key = luigi.Parameter()
    lat = luigi.Parameter()
    lon = luigi.Parameter()

    def program_args(self):
        # requires() yields its dependency, so input() is a list of targets
        return ["./groups.sh", self.input()[0].path, self.output().path]

    def output(self):
        return luigi.LocalTarget(self.file_path)

    def requires(self):
        yield GroupsToJSON(self.key, self.lat, self.lon)
groups.sh
#!/bin/bash

in=${1}
out=${2}

# Write the header row, then append one CSV row per group
echo "id,name,urlname,link,rating,created,description,organiserName,organiserMemberId" > "${out}"
jq -r '.[] | [.id, .name, .urlname, .link, .rating, .created, .description, .organizer.name, .organizer.member_id] | @csv' "${in}" >> "${out}"
I wanted to call jq directly from the Python code but I couldn’t figure out how to do it so putting that code in a shell script is my workaround.
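One possible way to skip the shell script, sketched here on the assumption that jq is on the PATH, is to run jq through subprocess and point its standard output at the target file, which is the job the redirection in groups.sh does. The names run_to_file and groups_to_csv are made up for illustration; the header and the jq filter are copied from groups.sh.

```python
import subprocess

HEADER = "id,name,urlname,link,rating,created,description,organiserName,organiserMemberId"
JQ_FILTER = (
    ".[] | [.id, .name, .urlname, .link, .rating, .created, .description, "
    ".organizer.name, .organizer.member_id] | @csv"
)


def run_to_file(args, out_path, header=None):
    """Run args (a list, so no shell is involved) and send its stdout to out_path."""
    with open(out_path, "w") as out_file:
        if header is not None:
            out_file.write(header + "\n")
            # Flush so the header reaches the file before the child process writes
            out_file.flush()
        # check=True raises CalledProcessError if the program exits non-zero
        subprocess.run(args, stdout=out_file, check=True)


def groups_to_csv(json_path, csv_path):
    """Hypothetical replacement for groups.sh; assumes jq is installed."""
    run_to_file(["jq", "-r", JQ_FILTER, json_path], csv_path, header=HEADER)
```

Something like groups_to_csv("/tmp/groups.json", "/tmp/groups.csv") could then be called from the run method of a plain luigi.Task. I haven't wired it into the pipeline in this post, so treat it as a starting point rather than a tested replacement.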
The last piece of the puzzle is a wrapper task that launches the others:
import os


class Meetup(luigi.WrapperTask):
    def run(self):
        print("Running Meetup")

    def requires(self):
        key = os.environ['MEETUP_API_KEY']
        lat = os.getenv('LAT', "51.5072")
        lon = os.getenv('LON', "0.1275")
        yield GroupsToCSV(key, lat, lon)
Now we’re ready to run the tasks:
$ PYTHONPATH="." luigi --module blog --local-scheduler Meetup
DEBUG: Checking if Meetup() is complete
DEBUG: Checking if GroupsToCSV(key=xxx, lat=51.5072, lon=0.1275) is complete
INFO: Informed scheduler that task Meetup__99914b932b has status PENDING
DEBUG: Checking if GroupsToJSON(key=xxx, lat=51.5072, lon=0.1275) is complete
INFO: Informed scheduler that task GroupsToCSV_xxx_51_5072_0_1275_e07372cebf has status PENDING
INFO: Informed scheduler that task GroupsToJSON_xxx_51_5072_0_1275_e07372cebf has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 3
INFO: [pid 4452] Worker Worker(salt=970508581, workers=1, host=Marks-MBP-4, username=markneedham, pid=4452) running GroupsToJSON(key=xxx, lat=51.5072, lon=0.1275)
INFO: [pid 4452] Worker Worker(salt=970508581, workers=1, host=Marks-MBP-4, username=markneedham, pid=4452) done GroupsToJSON(key=xxx, lat=51.5072, lon=0.1275)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task GroupsToJSON_xxx_51_5072_0_1275_e07372cebf has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 4452] Worker Worker(salt=970508581, workers=1, host=Marks-MBP-4, username=markneedham, pid=4452) running GroupsToCSV(key=xxx, lat=51.5072, lon=0.1275)
INFO: Running command: ./groups.sh /tmp/groups.json /tmp/groups.csv
INFO: [pid 4452] Worker Worker(salt=970508581, workers=1, host=Marks-MBP-4, username=markneedham, pid=4452) done GroupsToCSV(key=xxx, lat=51.5072, lon=0.1275)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task GroupsToCSV_xxx_51_5072_0_1275_e07372cebf has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 4452] Worker Worker(salt=970508581, workers=1, host=Marks-MBP-4, username=markneedham, pid=4452) running Meetup()
Running Meetup
INFO: [pid 4452] Worker Worker(salt=970508581, workers=1, host=Marks-MBP-4, username=markneedham, pid=4452) done Meetup()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task Meetup__99914b932b has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=970508581, workers=1, host=Marks-MBP-4, username=markneedham, pid=4452) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 3 tasks of which:
* 3 ran successfully:
    - 1 GroupsToCSV(key=xxx, lat=51.5072, lon=0.1275)
    - 1 GroupsToJSON(key=xxx, lat=51.5072, lon=0.1275)
    - 1 Meetup()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====
Looks good! Let’s quickly look at our CSV file:
$ head -n10 /tmp/groups.csv
id,name,urlname,link,rating,created,description,organiserName,organiserMemberId
1114381,"London NoSQL, MySQL, Open Source Community","london-nosql-mysql","https://www.meetup.com/london-nosql-mysql/",4.28,1208505614000,"<p>Meet others in London interested in NoSQL, MySQL, and Open Source Databases.</p>","Sinead Lawless",185675230
1561841,"Enterprise Search London Meetup","es-london","https://www.meetup.com/es-london/",4.66,1259157419000,"<p>Enterprise Search London is a meetup for anyone interested in building search and discovery experiences — from intranet search and site search, to advanced discovery applications and beyond.</p> <p>Disclaimer: This meetup is NOT about SEO or search engine marketing.</p> <p><strong>What people are saying:</strong></p> <ul> <li><span>""Join this meetup if you have a passion for enterprise search and user experience that you would like to share with other able-minded practitioners."" — Vegard Sandvold</span></li> <li><span>""Full marks for vision and execution. Looking forward to the next Meetup."" — Martin White</span></li> <li><span>“Consistently excellent” — Helen Lippell</span></li> </ul>
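Because the description column holds HTML with embedded quotes, head only gives a rough impression of the file. A slightly stricter check, sketched below with Python's csv module, parses every record and confirms that each one has the nine columns from the header. The function name check_csv is made up for this example.

```python
import csv


def check_csv(path, expected_columns=9):
    """Return the number of data rows; raise ValueError if any row has the wrong width."""
    with open(path, newline="") as csv_file:
        reader = csv.reader(csv_file)
        header = next(reader)
        if len(header) != expected_columns:
            raise ValueError("header has {0} columns".format(len(header)))
        count = 0
        for row in reader:
            if len(row) != expected_columns:
                # line_num is the physical line the reader has reached in the file
                raise ValueError("bad row ending at line {0}".format(reader.line_num))
            count += 1
    return count
```

Calling check_csv("/tmp/groups.csv") should return the number of groups if the file is well formed. Quoted fields that span several lines are counted as a single record, which head cannot tell us.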
Sweet! And what if we run it again?
$ PYTHONPATH="." luigi --module blog --local-scheduler Meetup
DEBUG: Checking if Meetup() is complete
INFO: Informed scheduler that task Meetup__99914b932b has status DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=172768377, workers=1, host=Marks-MBP-4, username=markneedham, pid=4531) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 present dependencies were encountered:
    - 1 Meetup()

Did not run any tasks
This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====
As expected, nothing happens, since our dependencies are already satisfied. We now have our first Luigi pipeline up and running.
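The reason nothing runs is that, by default, Luigi treats a task as complete when its output target already exists, and a WrapperTask as complete when all of its requirements are. The sketch below is a deliberately simplified model of that check using plain files. It is not Luigi's implementation, and run_if_missing and write_placeholder are made-up names.

```python
import os
import tempfile


def run_if_missing(path, produce):
    """Call produce(path) only if path does not exist yet; report whether it ran."""
    if os.path.exists(path):
        return False
    produce(path)
    return True


def write_placeholder(path):
    # Stand-in for real work such as calling an API or running jq
    with open(path, "w") as out_file:
        out_file.write("done\n")


workdir = tempfile.mkdtemp()
target = os.path.join(workdir, "groups.csv")

first_run = run_if_missing(target, write_placeholder)   # file is missing, so the work runs
second_run = run_if_missing(target, write_placeholder)  # file exists, so the work is skipped
os.remove(target)
third_run = run_if_missing(target, write_placeholder)   # file removed, so the work runs again
print(first_run, second_run, third_run)  # True False True
```

Applied to the pipeline, deleting /tmp/groups.csv should make GroupsToCSV run again while GroupsToJSON is still skipped. Since both output paths are fixed and do not include the parameters, changing LAT or LON on its own would not trigger a re-run either.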
Reference: Luigi: An ExternalProgramTask example – Converting JSON to CSV from our WCG partner Mark Needham at the Mark Needham Blog.