Luigi: Defining dynamic requirements (on output files)
In my last blog post I showed how to convert a JSON document containing meetup groups into a CSV file using Luigi, the Python library for building data pipelines. As well as creating that CSV file I wanted to go back to the meetup.com API and download all the members of those groups.
This was a rough flow of what I wanted to do:
- Take the JSON document containing all groups
- Parse that document and for each group:
  - Call the /members endpoint
  - Save each response as a JSON file
- Iterate over all those JSON files and create a members CSV file
In the previous post we created the GroupsToJSON task which calls the /groups endpoint on the meetup API and creates the file /tmp/groups.json.
Our new task has that as its initial requirement:
    class MembersToCSV(luigi.Task):
        key = luigi.Parameter()
        lat = luigi.Parameter()
        lon = luigi.Parameter()

        def requires(self):
            yield GroupsToJSON(self.key, self.lat, self.lon)
But we also want to create a requirement on a task that will make those calls to the /members endpoint and store the result in a JSON file.
One of the patterns that Luigi imposes on us is that each task should only create one file, so we actually have a requirement on a collection of tasks rather than just one. It took me a little while to get my head around that!
We don’t know the parameters of those tasks when the dependency graph is first built – we can only calculate them by parsing the JSON file produced by GroupsToJSON.
In Luigi terminology what we want to create is a dynamic requirement. A dynamic requirement is defined inside the run method of a task and can rely on the output of any tasks specified in the requires method, which is exactly what we need.
This code does the delegating part of the job:
    class MembersToCSV(luigi.Task):
        key = luigi.Parameter()
        lat = luigi.Parameter()
        lon = luigi.Parameter()

        def run(self):
            outputs = []
            for input in self.input():
                with input.open('r') as group_file:
                    groups_json = json.load(group_file)
                    groups = [str(group['id']) for group in groups_json]

                for group_id in groups:
                    members = MembersToJSON(group_id, self.key)
                    outputs.append(members.output().path)
                    yield members

        def requires(self):
            yield GroupsToJSON(self.key, self.lat, self.lon)
Inside our run method we iterate over the output of GroupsToJSON (which is our input) and yield a MembersToJSON task for each group. We also collect the output path of each of those tasks in the list outputs, which we’ll use later.
MembersToJSON looks like this:
    class MembersToJSON(luigi.Task):
        group_id = luigi.IntParameter()
        key = luigi.Parameter()

        def run(self):
            results = []
            uri = "https://api.meetup.com/2/members?&group_id={0}&key={1}".format(self.group_id, self.key)
            while True:
                if uri is None:
                    break
                r = requests.get(uri)
                response = r.json()
                for result in response["results"]:
                    results.append(result)
                uri = response["meta"]["next"] if response["meta"]["next"] else None

            with self.output().open("w") as output:
                json.dump(results, output)

        def output(self):
            return luigi.LocalTarget("/tmp/members/{0}.json".format(self.group_id))
This task generates one file per group containing a list of all the members of that group.
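The paging loop in run can be tried out without hitting the API. The sketch below pulls the loop into a helper and feeds it fake pages from a dictionary. The names fetch_all, get_page and PAGES are hypothetical, and the page contents are invented. The only thing carried over from the task above is the response shape: a "results" list plus a "meta" block whose "next" value is empty on the last page.

```python
def fetch_all(first_uri, get_page):
    """Collect the results from every page, following meta.next until it is empty."""
    results = []
    uri = first_uri
    while uri:
        response = get_page(uri)
        results.extend(response["results"])
        # An empty or missing "next" value ends the loop.
        uri = response["meta"].get("next") or None
    return results


# Invented stand-in for the API: maps a URI to the page it would return.
PAGES = {
    "page-1": {"results": [{"id": 1}, {"id": 2}], "meta": {"next": "page-2"}},
    "page-2": {"results": [{"id": 3}], "meta": {"next": ""}},
}

members = fetch_all("page-1", PAGES.__getitem__)
print([member["id"] for member in members])
```

Inside the real task, get_page would be something like lambda uri: requests.get(uri).json().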
We can now go back to MembersToCSV and convert those JSON files into a single CSV file:
    class MembersToCSV(luigi.Task):
        out_path = "/tmp/members.csv"
        key = luigi.Parameter()
        lat = luigi.Parameter()
        lon = luigi.Parameter()

        def run(self):
            outputs = []
            for input in self.input():
                with input.open('r') as group_file:
                    groups_json = json.load(group_file)
                    groups = [str(group['id']) for group in groups_json]

                for group_id in groups:
                    members = MembersToJSON(group_id, self.key)
                    outputs.append(members.output().path)
                    yield members

            with self.output().open("w") as output:
                writer = csv.writer(output, delimiter=",")
                writer.writerow(["id", "name", "joined", "topics", "groupId"])

                for path in outputs:
                    group_id = path.split("/")[-1].replace(".json", "")
                    with open(path) as json_data:
                        d = json.load(json_data)
                        for member in d:
                            topic_ids = ";".join([str(topic["id"]) for topic in member["topics"]])
                            if "name" in member:
                                writer.writerow([member["id"], member["name"], member["joined"], topic_ids, group_id])

        def output(self):
            return luigi.LocalTarget(self.out_path)

        def requires(self):
            yield GroupsToJSON(self.key, self.lat, self.lon)
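The row-building part of that run method can be checked on its own with a couple of made-up members. In this sketch member_rows and sample_members are hypothetical names and the member data is invented. It writes to an in-memory buffer rather than a Luigi target, but applies the same rules as the task: topic ids are joined with ";" and members without a name are skipped.

```python
import csv
import io


def member_rows(members, group_id):
    """Yield one CSV row per member, skipping members that have no name."""
    for member in members:
        if "name" not in member:
            continue
        topic_ids = ";".join(str(topic["id"]) for topic in member["topics"])
        yield [member["id"], member["name"], member["joined"], topic_ids, group_id]


# Invented sample data in the shape the task expects.
sample_members = [
    {"id": 1, "name": "Alice", "joined": 1400000000000, "topics": [{"id": 10}, {"id": 20}]},
    {"id": 2, "joined": 1400000001000, "topics": []},  # no name, so skipped
]

buffer = io.StringIO()
writer = csv.writer(buffer, delimiter=",")
writer.writerow(["id", "name", "joined", "topics", "groupId"])
writer.writerows(member_rows(sample_members, "123"))

csv_text = buffer.getvalue()
print(csv_text)
```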
We then just need to add our new task, MembersToCSV, as a requirement of the wrapper task.
And we’re ready to roll:
$ PYTHONPATH="." luigi --module blog --local-scheduler Meetup --workers 3
We’ve set the number of workers to 3 because the calls to the /members endpoint can be executed in parallel and there are roughly 600 of them to make.
All the code from both blog posts is available as a gist if you want to play around with it.
Any questions/advice let me know in the comments or I’m @markhneedham on twitter.
Reference: Luigi: Defining dynamic requirements (on output files), from our WCG partner Mark Needham at the Mark Needham Blog.