Fig流水线式开发框架
Fig(无花果)是一个根据多任务流水线模型开发的运行框架,框架多任务并发使用的java的线程池进行控制,使用队列实现任务间的数据传递
预览版 仓库 地址:https://oss.sonatype.org/content/repositories/snapshots
maven 依赖 :
<dependency>
<groupId>com.github.taomus.fig</groupId>
<artifactId>fig-core</artifactId>
<version>0.1.0-SNAPSHOT</version>
<type>module</type>
</dependency>
<dependency>
<groupId>com.github.taomus.fig</groupId>
<artifactId>fig-spring-plugin</artifactId>
<version>0.1.0-SNAPSHOT</version>
<type>module</type>
</dependency>
Xtend 代码实例:
package com.github.test1.stock
import com.github.taomus.fig.core.engine.Data
import com.github.taomus.fig.core.engine.Fig
import com.github.taomus.fig.core.engine.FigEngine
import com.github.taomus.fig.core.engine.Task
import com.github.test.stock.entity.StockData
import java.nio.charset.Charset
import java.util.Arrays
import java.util.Vector
import joinery.DataFrame
import org.apache.commons.lang3.SerializationUtils
import org.jsoup.Connection
import org.jsoup.Jsoup
import org.slf4j.LoggerFactory
import org.springframework.util.StreamUtils
class CollectStock1 extends Fig {
val static LOG = LoggerFactory.getLogger(CollectStock1)
def static void main(String[] args){
FigEngine.instance.addModule(CollectStock1)
FigEngine.instance.run()
FigEngine.instance.taskQueue.put(Data.create("start",null))
}
def String getUrl(String code,String date) {
if (code.startsWith("0")) {
return ''' http://quotes.money.163.com/service/chddata.html?code=1«code»&start=«date»0101&end=«date»1231
'''
} else {
return ''' http://quotes.money.163.com/service/chddata.html?code=0«code»&start=«date»0101&end=«date»1231
'''
}
}
@Task("start")
def Data[] getStockCodes(Data value){
return #[
Data.create("buildData",new StockData("平安银行","000001")),
Data.create("buildData",new StockData("万科A","000002"))
]
}
@Task("buildData")
def Data[] collect(Data value){
var stockinfo = value.getData as StockData
var results = new Vector<Data>();
for(date:2000..2020){
var info = SerializationUtils.<StockData>clone(stockinfo)
info.date = String.valueOf(date)
results.add(Data.create("stock_history",info))
}
return results
}
@Task("stock_history")
def Data startHistory(Data value) {
var d = value.getData() as StockData
var String url = getUrl(d.code,d.date)
LOG.info(url)
var Connection connection = Jsoup.connect(url);
var Connection.Response response = connection.method(Connection.Method.GET).ignoreContentType(true).timeout(10 *
1000).execute();
var a = StreamUtils.copyToString(response.bodyStream, Charset.forName("UTF-8"))
LOG.info(a)
var data = a.split("\n")
if(data.size == 1){
return null
}
var DataFrame<Object> df = new DataFrame("日期", "股票代码", "名称", "收盘价", "最高价", "最低价", "开盘价", "前收盘", "涨跌额", "涨跌幅",
"换手率", "成交量", "成交金额", "总市值", "流通市值", "成交笔数");
for(index : 1..data.length-1){
df.append(Arrays.asList(data.get(index).split(',')))
}
var results = new Vector();
var indexs = df.index();
for (Object index : indexs) {
var sd = new StockData()
sd.date = df.get(index,"日期") as String
sd.code = df.get(index,"股票代码") as String
sd.code = sd.code.substring(1)
sd.name = d.name
sd.opening = Float.valueOf(df.get(index,"开盘价") as String)
sd.ending = Float.valueOf(df.get(index,"收盘价") as String)
sd.low = Float.valueOf(df.get(index,"最低价") as String)
sd.hig = Float.valueOf(df.get(index,"最高价") as String)
results.add(sd)
}
var resdata = Data.create("calc",results.reverse);
resdata.priority = 2
return resdata
}
@Task("calc")
def Data calc(Data value) {
return null;
}
}
评论