99网
您的当前位置:首页luigi??1

luigi??1

来源:99网
luigi学习1

⼀、luigi介绍

luigi是基于python语⾔的,可帮助建⽴复杂流式批处理任务管理系统。这些批处理作业典型的有hadoop job,数据库数据的导⼊与导出,或者是机器学习算法等等。

⽬前已经有⼀些抽象层次较低的数据处理⼯具,⽐如hive,pig,cascading等。luigi并不是要取代他们,⽽是帮助你管理这些作业,luigi的task可以是⼀个hive查询,java写的hadoop作业,⼀个scala写的spark作业或⼀个python程序等。luigi提供了互相依赖的⼤量作业的⼯作流程管理,所以程序员可以把他们的精⼒放到作业本⾝。

⽬前有⼀些相似的项⽬⽐如Oozie和Azkaban。⼀个重要的区别是luigi并不仅仅为hadoop作业,它可以很⽅便的扩展其他类型的任务。⼆、luigi官⽹的hello world例⼦2.1top Artists例⼦的⽬的

这个例⼦的⽬的想要集合⼀些⽣产数据的流,然后找到前10个artists,并把最终的结果保存到数据库2.2Aggregate Artist Streams

class AggregateArtists(luigi.Task):

date_interval = luigi.DateIntervalParameter()

def output(self):

return luigi.LocalTarget(\"data/artist_streams_%s.tsv\" % self.date_interval) def requires(self):

return [Streams(date) for date in self.date_interval] def run(self):

artist_count = defaultdict(int)

for input in self.input():

with input.open('r') as in_file: for line in in_file:

timestamp, artist, track = line.strip().split() artist_count[artist] += 1

with self.output().open('w') as out_file:

for artist, count in artist_count.iteritems(): print >> out_file, artist, count

对于这个类的解释:

requires⽅法:这个⽅法指定了本task需要的依赖,在这个例⼦中,AggregateArttists依赖⼀个Stream作业,Stream作业需要⼀个⽇期作为参数。参数:每⼀个作业都可以定义⼀个或者多个参数,这些参数需要定义在类级别。⽐如上⾯这个类就有⼀个参数date_intervaloutput⽅法:定义了作业结果的保存地。

run⽅法:对于普通的task,你需要实现run⽅法。在run⽅法中可以是任何东西,可以创建⼦进程,进⾏长时间的算术运算等等。对于⼀些task的⼦类,你就不需要实现run⽅法了,⽐如JobTask要求你实现mapper和reducer⽅法。

LocalTarget:这是⼀个内置的类,可以帮助你很容易的读取或者写本地磁盘。并且保证对磁盘的操作是原⼦性的。2.3Streams

class Streams(luigi.Task):

date = luigi.DateParameter()

def run(self):

with self.output().open('w') as output: for _ in range(1000):

output.write('{} {} {}\\n'.format( random.randint(0, 999), random.randint(0, 999), random.randint(0, 999)))

def output(self):

return luigi.LocalTarget(self.date.strftime('data/streams_%Y_%m_%d_faked.tsv'))

这个类没有依赖,最终产⽣的效果是在本地⽂件系统上产⽣⼀个结果⽂件。2.4在本地执⾏

PYTHONPATH='' luigi --module top_artists AggregateArtists --local-scheduler --date-interval 2012-06

执⾏完成之后,在当前⽬录下产⽣了⼀个data⽬录,data⽬录下的内容如下:

(my_python_env)[root@hadoop26 data]# ls

artist_streams_2012-06.tsv streams_2012_06_06_faked.tsv streams_2012_06_12_faked.tsv streams_2012_06_18_faked.tsv streams_2012_06_24_faked.tsv streams_2012_06_30_faked.tsvstreams_2012_06_01_faked.tsv streams_2012_06_07_faked.tsv streams_2012_06_13_faked.tsv streams_2012_06_19_faked.tsv streams_2012_06_25_faked.tsvstreams_2012_06_02_faked.tsv streams_2012_06_08_faked.tsv streams_2012_06_14_faked.tsv streams_2012_06_20_faked.tsv streams_2012_06_26_faked.tsvstreams_2012_06_03_faked.tsv streams_2012_06_09_faked.tsv streams_2012_06_15_faked.tsv streams_2012_06_21_faked.tsv streams_2012_06_27_faked.tsvstreams_2012_06_04_faked.tsv streams_2012_06_10_faked.tsv streams_2012_06_16_faked.tsv streams_2012_06_22_faked.tsv streams_2012_06_28_faked.tsvstreams_2012_06_05_faked.tsv streams_2012_06_11_faked.tsv streams_2012_06_17_faked.tsv streams_2012_06_23_faked.tsv streams_2012_06_29_faked.tsv

streams_*:就是stream作业⽣成的。

artist_*:是AggregateArtists⽣成的,就⼀个⽂件⽽已2.5扩展

再次运⾏上⾯的执⾏命令发现并没有执⾏任何操作,因为所有任务的output已经存在。这意味着luigi的task都是幂等的,也就是说不管执⾏多少次,作业的输出应该是不变的。

--local-scheduler告诉luigi不要去连接scheduler server。这是不推荐的运⾏⽅式,这种⽅式也就⽤在测试阶段。

因篇幅问题不能全部显示,请点此查看更多更全内容