riko: a Python Stream Processing Engine
riko is a stream processing engine written in pure Python, modeled after Yahoo! Pipes and used to analyze and process streams of structured data. It provides both synchronous and asynchronous APIs, supports parallel processing of RSS feeds, and also offers a command-line interface.
Features:
- Reads csv/xml/json/html files.
- Creates text and data streams via modular pipes.
- Parses, processes, and extracts RSS/Atom feeds.
- Creates powerful mashup APIs and maps.
- Supports parallel processing.
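riko's feed modules handle fetching and parsing for you. As a rough, library-free illustration of what parsing an Atom feed involves, the sketch below (function and variable names are hypothetical, not riko's API) pulls entry titles out of an inline Atom document using only the standard library:

```python
# Hypothetical sketch: extract entry titles from an Atom feed with only
# the standard library, mirroring what riko's feed modules automate.
import xml.etree.ElementTree as ET

ATOM = """<?xml version="1.0" encoding="utf-8"?>
<feed xmlns="http://www.w3.org/2005/Atom">
  <title>Example Feed</title>
  <entry><title>First post</title></entry>
  <entry><title>Second post</title></entry>
</feed>"""

NS = {'atom': 'http://www.w3.org/2005/Atom'}

def entry_titles(xml_text):
    """Yield the title of each <entry> in an Atom document."""
    root = ET.fromstring(xml_text)
    for entry in root.findall('atom:entry', NS):
        yield entry.findtext('atom:title', namespaces=NS)

titles = list(entry_titles(ATOM))
```

In riko, the same work is done lazily over a stream of items, so feeds larger than memory can be processed one entry at a time.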
Example usage:
>>> ### Create a SyncPipe flow ###
>>> #
>>> # `SyncPipe` is a convenience class that creates chainable flows
>>> # and allows for parallel processing.
>>> from riko.collections.sync import SyncPipe
>>>
>>> ### Set the pipe configurations ###
>>> #
>>> # Notes:
>>> #   1. the `detag` option will strip all html tags from the result
>>> #   2. fetch the text contained inside the 'body' tag of the hackernews
>>> #      homepage
>>> #   3. replace newlines with spaces and assign the result to 'content'
>>> #   4. tokenize the resulting text using whitespace as the delimiter
>>> #   5. count the number of times each token appears
>>> #   6. obtain the raw stream
>>> #   7. extract the first word and its count
>>> #   8. extract the second word and its count
>>> #   9. extract the third word and its count
>>> url = 'https://news.ycombinator.com/'
>>> fetch_conf = {
...     'url': url, 'start': '<body>', 'end': '</body>', 'detag': True}  # 1
>>>
>>> replace_conf = {
...     'rule': [
...         {'find': '\r\n', 'replace': ' '},
...         {'find': '\n', 'replace': ' '}]}
>>>
>>> flow = (
...     SyncPipe('fetchpage', conf=fetch_conf)                # 2
...         .strreplace(conf=replace_conf, assign='content')  # 3
...         .stringtokenizer(conf={'delimiter': ' '}, emit=True)  # 4
...         .count(conf={'count_key': 'content'}))            # 5
>>>
>>> stream = flow.output  # 6
>>> next(stream)  # 7
{"'sad": 1}
>>> next(stream)  # 8
{'(': 28}
>>> next(stream)  # 9
{'(1999)': 1}
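For readers without riko installed, the flow's replace, tokenize, and count stages can be approximated with the standard library alone. The following is only a conceptual sketch over an inline string, not riko's implementation and not the fetched Hacker News page:

```python
# Conceptual equivalent of the SyncPipe flow's text stages, using only
# the standard library on an inline string.
from collections import Counter

content = "first line\r\nsecond line\nfirst line"

# replace newlines with spaces (the strreplace step)
content = content.replace('\r\n', ' ').replace('\n', ' ')

# tokenize using whitespace as the delimiter (the stringtokenizer step)
tokens = content.split(' ')

# count the number of times each token appears (the count step)
counts = Counter(tokens)
```

riko performs the same stages lazily: each pipe consumes and yields items one at a time, so `flow.output` is a generator rather than a fully materialized result.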