首页 » 技术分享 » SODBASE CEP学习进阶篇(六):实现反压和流限速

SODBASE CEP学习进阶篇(六):实现反压和流限速

 

前面文章介绍过流数据输入速率要和处理能力相匹配,短时数据爆发由内部缓冲队列来缓冲。如果确实存在某个时间点持续数据爆发,可以考虑采取反压限流的方法。

1. 示例操作步骤

(1)下载SODBASE Studio2.0.22(sp1)以上版本,解压,打开configuration/global.properties,将引擎的缓冲队列长度设的较小,作以下配置

maxqueuelength=100
warnqueuelength=20

(2)下载示例EPL模型backpress01.sod、backpress02.sod

backpress01功能:CPU满负荷不停的产生模拟数据,打印到屏幕。因为严格地讲,屏幕打印的速度也是跟不上CPU满负荷产生数据速度的。因此容易达到队列报警值20。

backpress02功能:将system.sys_warn的报警事件反接到backpress01的控制Socket端口上。

(3)打开SODBASE Studio,导入backpress01.sod、backpress02.sod

先测试运行backpress02,再将backpress01也测试运行起来。

(4)结果输出

可以看到如下的输出结果,并且发现隔一段时间,数据就慢下来,这就是反压限速的原因。

2. 工作原理

2.1可控制输入适配器

上面示例能够实现反压限速,是因为编写backpress01的输入适配器时,继承了SODBASE CEP一类可控制输入适配器。这类可控制输入适配器,通过Socket监听控制事件,并在接收到控制事件后调用回调函数callback(PrimitiveEvent e),在回调函数中可以让输入流sleep一段时间。

读者要实现同样的功能,只需集成实现com.sodbase.inputadaptor.controllable.ControllableInputAdaptorI类。前3个参数默认为数据流名、控制监听端口、控制监听端口超时时间(ms)。读者要添加参数,可以从第4个参数开始添加。示例中的输入适配器代码如下

public class ControllableTestInputAdaptor extends ControllableInputAdaptorI
{
	private boolean running=true;
	private long suspendtime=0;
	@Override
	public void setUp()
	{
		//必须调用super.setUp()启动控制监听端口
		super.setUp();
	}
	
	@Override
	public void callback(PrimitiveEvent primitiveEvent)
	{
		if(primitiveEvent.getAttributeValueType("cause").getValue().equals(Constants.causecode1))
			suspendtime=5000;
	}

	@Override
	public boolean isRunning()
	{
		return running;
	}
	
	@Override
	public void stopInputStream()
	{
		//必须调用super.stopInputStream()关闭控制监听端口
		super.stopInputStream();
		this.running=false;
	}
	public void run()
	{
		int count = 1;
		while (running)
		{
			try
			{
				if(suspendtime>0)
				{
					Thread.sleep(suspendtime);
					suspendtime=0;
				}
			}
			catch (InterruptedException e)
			{
				e.printStackTrace();
			}
			PrimitiveEvent primitiveEvent = new PrimitiveEvent();
			primitiveEvent.getAttributeMap().put("id", new ValueType(String.valueOf(count),"string"));
			count++;
			Date d = new Date();
			long time = d.getTime();
			primitiveEvent.setStart_ts(time);
			primitiveEvent.setEnd_ts(time);
			this.putEventToStream(primitiveEvent);
		}
	}

}

2.2 系统报警事件

SODBASE CEP允许通过warnqueuelength配置缓冲队列长度的报警长度,可针对报警采取一些运维管理措施。如果不配置,默认为最大缓冲队列长度maxqueuelength的80%。

system.sys_warn和system.sys_error是系统内置报警流,通常会含3个字段

(1)cause:报警和报错的原因

(2)queryname:引起报警和报错的EPL语句名称

(3)message:消息提示

如cause=’warnqueuelengthexceeded’时,即超过了缓冲队列的报警阈值。

2.3 注意事项

(1)多用户环境下,报警流queryname字段会加前缀"用户名."。

(2)设计系统时,不要过度依赖反压限速功能,因为反压限速会增加输入端负载,也会给系统带来新的问题。正确方式是在系统架构初期,采用模拟数据和最大输入速率配置好缓冲区大小,设计好处理方式并留出余量,让处理能力和输入速率相匹配。

SODBASE CEP用于轻松、高效实施数据监测、监控类、实时交易类项目微笑EPL语法见SODSQL写法与示例。图形化建模请使用SODBASE Studio嵌入式方式编程参见运行第一个EPL例子。与Storm集成参见EPL与Storm集成缓存扩展参见与分布式缓存集成

转载自原文链接, 如需删除请联系管理员。

原文链接:SODBASE CEP学习进阶篇(六):实现反压和流限速,转载请注明来源!

0