luigi 学习
luigi 学习
1、mac 上安装luigi
pip install luigi
pip install boto3 (luigi依赖 boto3)
2、基本概念
class Streams(luigi.Task):""" Faked version right now, just generates bogus data.""" date = luigi.DateParameter()def run(self):""" Generates bogus data and writes it into the :py:meth:`~.Streams.output` target.""" with self.output().open('w') as output:for _ in range(1000):output.write('{} {} {}\n'.format(random.randint(0, 999),random.randint(0, 999),random.randint(0, 999)))def output(self):""" Returns the target output for this task.In this case, a successful execution of this task will create a file in the local file system.:return: the target output for this task.:rtype: object (:py:class:`luigi.target.Target`)""" return luigi.LocalTarget(self.date.strftime('data/streams_%Y_%m_%d_faked.tsv'))class AggregateArtists(luigi.Task):""" This task runs over the target data returned by :py:meth:`~/.Streams.output` andwrites the result into its :py:meth:`~.AggregateArtists.output` target (local file).""" date_interval = luigi.DateIntervalParameter()def output(self):""" Returns the target output for this task.In this case, a successful execution of this task will create a file on the local filesystem.:return: the target output for this task.:rtype: object (:py:class:`luigi.target.Target`)""" return luigi.LocalTarget("data/artist_streams_{}.tsv".format(self.date_interval))def requires(self):""" This task's dependencies:* :py:class:`~.Streams`:return: list of object (:py:class:`luigi.task.Task`)""" return [Streams(date) for date in self.date_interval]def run(self):artist_count = defaultdict(int)for t in self.input():with t.open('r') as in_file:for line in in_file:_, artist, track = line.strip().split()artist_count[artist] += 1with self.output().open('w') as out_file:for artist, count in six.iteritems(artist_count):out_file.write('{}\t{}\n'.format(artist, count))
run()是这个task要执行的内容
requires()是这个task所依赖的任务,这里依赖一系列的Stream
output()是这个task的输出
input()这个是所依赖的task产生的输出
二、使用central planner
先用
luigid --background --pidfile--logdir --state-path
打开liguid server
然后运行任务,比如:
luigi --module top_artists2 Top10Artists --date-interval 2012-06
注意,要去掉 --local-scheduler
然后可以用 localhost:8082来访问现在的任务
如果A -> B,A依赖B,那么B的output可以在A里面直接用input()来使用,如果B的output是若干文件的话,那么在A中的input()也是若干文件,可以用for循环来读取
posted on 2018-10-25 13:10 potatoknight 阅读( ...) 评论( ...) 编辑 收藏转载于:https://www.cnblogs.com/longjmp/p/9849175.html
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
