前面文章介绍过流数据输入速率要和处理能力相匹配,短时数据爆发由内部缓冲队列来缓冲。如果确实存在某个时间点持续数据爆发,可以考虑采取反压限流的方法。
1. 示例操作步骤
(1)下载SODBASE Studio2.0.22(sp1)以上版本,解压,打开configuration/global.properties,将引擎的缓冲队列长度设的较小,作以下配置
maxqueuelength=100
warnqueuelength=20
(2)下载示例EPL模型backpress01.sod、backpress02.sod
backpress01功能:CPU满负荷不停的产生模拟数据,打印到屏幕。因为严格地讲,屏幕打印的速度也是跟不上CPU满负荷产生数据速度的。因此容易达到队列报警值20。
backpress02功能:将system.sys_warn的报警事件反接到backpress01的控制Socket端口上。
(3)打开SODBASE Studio,导入backpress01.sod、backpress02.sod
先测试运行backpress02,再将backpress01也测试运行起来。
(4)结果输出
可以看到如下的输出结果,并且发现隔一段时间,数据就慢下来,这就是反压限速的原因。
2. 工作原理
2.1可控制输入适配器
上面示例能够实现反压限速,是因为编写backpress01的输入适配器时,继承了SODBASE CEP一类可控制输入适配器。这类可控制输入适配器,通过Socket监听控制事件,并在接收到控制事件后调用回调函数callback(PrimitiveEvent e),在回调函数中可以让输入流sleep一段时间。
读者要实现同样的功能,只需集成实现com.sodbase.inputadaptor.controllable.ControllableInputAdaptorI类。前3个参数默认为数据流名、控制监听端口、控制监听端口超时时间(ms)。读者要添加参数,可以从第4个参数开始添加。示例中的输入适配器代码如下
public class ControllableTestInputAdaptor extends ControllableInputAdaptorI
{
private boolean running=true;
private long suspendtime=0;
@Override
public void setUp()
{
//必须调用super.setUp()启动控制监听端口
super.setUp();
}
@Override
public void callback(PrimitiveEvent primitiveEvent)
{
if(primitiveEvent.getAttributeValueType("cause").getValue().equals(Constants.causecode1))
suspendtime=5000;
}
@Override
public boolean isRunning()
{
return running;
}
@Override
public void stopInputStream()
{
//必须调用super.stopInputStream()关闭控制监听端口
super.stopInputStream();
this.running=false;
}
public void run()
{
int count = 1;
while (running)
{
try
{
if(suspendtime>0)
{
Thread.sleep(suspendtime);
suspendtime=0;
}
}
catch (InterruptedException e)
{
e.printStackTrace();
}
PrimitiveEvent primitiveEvent = new PrimitiveEvent();
primitiveEvent.getAttributeMap().put("id", new ValueType(String.valueOf(count),"string"));
count++;
Date d = new Date();
long time = d.getTime();
primitiveEvent.setStart_ts(time);
primitiveEvent.setEnd_ts(time);
this.putEventToStream(primitiveEvent);
}
}
}
2.2 系统报警事件
SODBASE CEP允许通过warnqueuelength配置缓冲队列长度的报警长度,可针对报警采取一些运维管理措施。如果不配置,默认为最大缓冲队列长度maxqueuelength的80%。
system.sys_warn和system.sys_error是系统内置报警流,通常会含3个字段
(1)cause:报警和报错的原因
(2)queryname:引起报警和报错的EPL语句名称
(3)message:消息提示
如cause=’warnqueuelengthexceeded’时,即超过了缓冲队列的报警阈值。
2.3 注意事项
(1)多用户环境下,报警流queryname字段会加前缀"用户名."。
(2)设计系统时,不要过度依赖反压限速功能,因为反压限速会增加输入端负载,也会给系统带来新的问题。正确方式是在系统架构初期,采用模拟数据和最大输入速率配置好缓冲区大小,设计好处理方式并留出余量,让处理能力和输入速率相匹配。
SODBASE CEP用于轻松、高效实施数据监测、监控类、实时交易类项目。EPL语法见SODSQL写法与示例。图形化建模请使用SODBASE Studio。嵌入式方式编程参见运行第一个EPL例子。与Storm集成参见EPL与Storm集成。缓存扩展参见与分布式缓存集成。
转载自原文链接, 如需删除请联系管理员。
原文链接:SODBASE CEP学习进阶篇(六):实现反压和流限速,转载请注明来源!