Pub端(服务器):
import zeromq; //准备上下文和PUB套接字 var context = zeromq.context() var publisher = context.zmq_socket_pub() publisher.bind("tcp://*:5556") //publisher.bind("ipc://weather.ipc") //初始化随机数生成器 math.randomize(time.tick()) while(1){ //获取数据,忽悠下老板 var zipcode = math.random(10000, 11000) var temperature = math.random(-80, 135) var relhumidity = math.random(10, 60) //向所有订阅者发送消息 publisher.sendMsg( string.format("%05d %d %d", zipcode, temperature, relhumidity) ) } publisher.close() context.term()
Sub端:
import zeromq; io.open() start = function(filter,tcp){ var context = zeromq.context() //用于和服务端通信的套接字 io.print("Collecting updates from weather server...") var subscriber = context.zmq_socket_sub() subscriber.connect( tcp : "tcp://localhost:5556" ) //设置订阅信息,默认为纽约,邮编10001 filter := {byte value[] = "10001 "} subscriber.setsockopt(6/*_ZMQ_SUBSCRIBE*/,filter) // var ret = subscriber.setsockopt(0x11/*_ZMQ_LINGER*/,{int value = 0}) io.print(ret); //处理100条更新信息 var updateNum = 100; var totalTemp = 0; for(i=1;updateNum){ var msg = zeromq.message() var ret = subscriber.recvMsg(msg,1/*_ZMQ_NOBLOCK*/); if(ret){ var str = ..raw.tostring( msg.getData(),1,msg.getSize()) msg.close(); var zipcode, temperature, relhumidity = string.match(str,"(\d*) ([\d-]*) (\d*)") io.print(i,temperature) totalTemp += temperature; } sleep(1000)//加点延迟 } io.print(string.format("Average temperature for zipcode '%s' was %dF, total = %d", filter.value, (totalTemp / updateNum), totalTemp)) subscriber.close(); context.term(); } start()
以下是一个监听AMI socket消息并广播的Pub端实例代码:
import win.ui; import zeromq; import thread.command; /*DSG{{*/ mainForm = ..win.form(cls="ChannelMon";text="ChannelMon";right=0;bottom=0;border="none";exmode="none";max=false;min=false;mode="popup";sysmenu=false;title=false) mainForm.add() /*}}*/ //准备上下文和PUB套接字 var context = zeromq.context() var publisher = context.zmq_socket_pub() publisher.bind("tcp://*:5556") var listener = thread.command(); listener.amierror = function(msg){ mainForm.msgbox("致命错误:"++msg) } listener.callin = function(obj){ msg = string.concat(obj.exten," ",obj.phone," ",obj.channel," ",obj.uniqid); //console.log(msg); publisher.sendMsg(msg); } //AMI桥接通道监控 listener.invoke( function(hwnd){ import thread.command; import wsock.tcp.client; //开启AMI连接 var tcp = wsock.tcp.client(); tcp.connect("192.168.1.2",5038); //传递句柄 thread.command.post("posthandle",tcp._handle) //登陆 tcp.write('Action:login\r\nUsername:user\r\nSecret:passwd\r\n\r\n') //等待回复 tcp.recv() //监听消息 var ok = true while(ok = true){ var pkg = tcp.recv() if(pkg!=null){ var msg = string.split(pkg,'<\r\n\r\n>') //按双回车分割消息单元 for k,v in msg { var line = string.split(v,'<\r\n>') //按回车分割消息行 if(table.range(line)>0){ var msgobj = {}; for m,n in line { var part = string.split(n,":") //按冒号分割key和value if(part[1]!=null && part[2]!=null){ msgobj[part[1]] = tostring(string.trimleft(part[2],space)) //msgobj[part[1]] = string.trimleft(part[2],space) } //if part } //for line //msgobj生成完毕 if(msgobj["Event"]="Bridge" and msgobj["Bridgestate"]="Link" and msgobj["Channel1"]!=null){ //通道桥接 if(string.find(msgobj["Channel1"],"DAHDI/.*")){ var obj = {}; obj.exten = msgobj["CallerID2"]; obj.phone = msgobj["CallerID1"]; obj.channel = msgobj["Channel1"]; obj.uniqid = msgobj["Uniqueid2"]; thread.command.post("callin",obj); } } } //if line }// for msg }else{ ok = false } //if pkg } //while tcp.close(); thread.command.post("amierror","到呼叫系统的AMI连接已断开!") return } ); mainForm.onClose = function(hwnd,message,wParam,lParam){ publisher.close() } win.loopMessage(); context.term()
65 queries in 2.054 seconds |