ZeroMQ pub和sub模式示例 广播和订阅 AAuto | Thinkai's Blog

Thinkai's Blog

Autohoutkey|Python|php|aardio|VOIP|IT 爱好者

ZeroMQ pub和sub模式示例 广播和订阅 AAuto 1553

作者为 发表

AAuto快手

Pub端(服务器):

import zeromq;

//准备上下文和PUB套接字
var context = zeromq.context()
var publisher = context.zmq_socket_pub()
publisher.bind("tcp://*:5556")
//publisher.bind("ipc://weather.ipc")

//初始化随机数生成器
math.randomize(time.tick())
while(1){
    //获取数据,忽悠下老板
    var zipcode = math.random(10000, 11000)
    var temperature = math.random(-80, 135)
    var relhumidity = math.random(10, 60)
    
    //向所有订阅者发送消息
    publisher.sendMsg( string.format("%05d %d %d", zipcode, temperature, relhumidity) )
}

publisher.close()
context.term()

Sub端:

import zeromq;

io.open()
start = function(filter,tcp){
        var context = zeromq.context()
        
        //用于和服务端通信的套接字
        io.print("Collecting updates from weather server...")
        var subscriber = context.zmq_socket_sub()
        subscriber.connect( tcp : "tcp://localhost:5556" )
        
        //设置订阅信息,默认为纽约,邮编10001
        filter := {byte value[] = "10001 "}
        subscriber.setsockopt(6/*_ZMQ_SUBSCRIBE*/,filter)
//        var ret = subscriber.setsockopt(0x11/*_ZMQ_LINGER*/,{int value = 0})
        io.print(ret);
        
        //处理100条更新信息
        var updateNum = 100;
        var totalTemp = 0;
        for(i=1;updateNum){
                var msg = zeromq.message()
                var ret = subscriber.recvMsg(msg,1/*_ZMQ_NOBLOCK*/);
                if(ret){
                        var str = ..raw.tostring( msg.getData(),1,msg.getSize())
                        msg.close();
                        var zipcode, temperature, relhumidity = string.match(str,"(\d*) ([\d-]*) (\d*)")
                        io.print(i,temperature)
                        totalTemp += temperature;
                }
                sleep(1000)//加点延迟
        }
        io.print(string.format("Average temperature for zipcode '%s' was %dF, total = %d",
            filter.value, (totalTemp / updateNum), totalTemp))
        
        subscriber.close();
        context.term();
}

start()

以下是一个监听AMI socket消息并广播的Pub端实例代码:

import win.ui;
import zeromq;
import thread.command;
/*DSG{{*/
mainForm = ..win.form(cls="ChannelMon";text="ChannelMon";right=0;bottom=0;border="none";exmode="none";max=false;min=false;mode="popup";sysmenu=false;title=false)
mainForm.add()
/*}}*/



//准备上下文和PUB套接字
var context = zeromq.context()
var publisher = context.zmq_socket_pub()
publisher.bind("tcp://*:5556")
var listener = thread.command();
listener.amierror = function(msg){
    mainForm.msgbox("致命错误:"++msg)
}
listener.callin = function(obj){
msg = string.concat(obj.exten," ",obj.phone," ",obj.channel," ",obj.uniqid);
//console.log(msg);
publisher.sendMsg(msg);
}

//AMI桥接通道监控
listener.invoke(

    function(hwnd){ 
        import thread.command;
        import wsock.tcp.client;
		//开启AMI连接
		var tcp = wsock.tcp.client();
		tcp.connect("192.168.1.2",5038);
		//传递句柄
		thread.command.post("posthandle",tcp._handle)
		
		//登陆
		tcp.write('Action:login\r\nUsername:user\r\nSecret:passwd\r\n\r\n')
		//等待回复
		tcp.recv()
		
		//监听消息
		var ok = true
		while(ok = true){
			var pkg = tcp.recv()
			if(pkg!=null){
				var msg = string.split(pkg,'<\r\n\r\n>') //按双回车分割消息单元
				for k,v in msg {
					var line = string.split(v,'<\r\n>')  //按回车分割消息行
					if(table.range(line)>0){
						var msgobj = {};
						for m,n in line {
							var part = string.split(n,":")  //按冒号分割key和value
							if(part[1]!=null && part[2]!=null){ 
								msgobj[part[1]] = tostring(string.trimleft(part[2],space))
								//msgobj[part[1]] = string.trimleft(part[2],space)	
							} //if part
						} //for line
						//msgobj生成完毕
						
						if(msgobj["Event"]="Bridge" and msgobj["Bridgestate"]="Link" and msgobj["Channel1"]!=null){ //通道桥接
							if(string.find(msgobj["Channel1"],"DAHDI/.*")){
								var obj = {};
								obj.exten = msgobj["CallerID2"];
								obj.phone = msgobj["CallerID1"];
								obj.channel = msgobj["Channel1"];
								obj.uniqid = msgobj["Uniqueid2"];
								thread.command.post("callin",obj);
							}
						}
					} //if line
				}// for msg
			}else{
				ok = false
			} //if pkg
		} //while
		
		tcp.close();
		thread.command.post("amierror","到呼叫系统的AMI连接已断开!")
		return
    }
);

mainForm.onClose = function(hwnd,message,wParam,lParam){
	publisher.close()
}

win.loopMessage(); 
context.term()



来了就留个评论吧! 没有评论




友情链接:Autohotkey中文论坛Autohotkey中文帮助Autohotkey官网我的B站直播间如若生涯一场梦博客联系作者免GooglePlay APK下载

 主题设计 • skyfrit.com  Thinkai's Blog | 保留所有权利

63 queries in 1.891 seconds |