Pub端(服务器):
import zeromq;
//准备上下文和PUB套接字
var context = zeromq.context()
var publisher = context.zmq_socket_pub()
publisher.bind("tcp://*:5556")
//publisher.bind("ipc://weather.ipc")
//初始化随机数生成器
math.randomize(time.tick())
while(1){
//获取数据,忽悠下老板
var zipcode = math.random(10000, 11000)
var temperature = math.random(-80, 135)
var relhumidity = math.random(10, 60)
//向所有订阅者发送消息
publisher.sendMsg( string.format("%05d %d %d", zipcode, temperature, relhumidity) )
}
publisher.close()
context.term()
Sub端:
import zeromq;
io.open()
start = function(filter,tcp){
var context = zeromq.context()
//用于和服务端通信的套接字
io.print("Collecting updates from weather server...")
var subscriber = context.zmq_socket_sub()
subscriber.connect( tcp : "tcp://localhost:5556" )
//设置订阅信息,默认为纽约,邮编10001
filter := {byte value[] = "10001 "}
subscriber.setsockopt(6/*_ZMQ_SUBSCRIBE*/,filter)
// var ret = subscriber.setsockopt(0x11/*_ZMQ_LINGER*/,{int value = 0})
io.print(ret);
//处理100条更新信息
var updateNum = 100;
var totalTemp = 0;
for(i=1;updateNum){
var msg = zeromq.message()
var ret = subscriber.recvMsg(msg,1/*_ZMQ_NOBLOCK*/);
if(ret){
var str = ..raw.tostring( msg.getData(),1,msg.getSize())
msg.close();
var zipcode, temperature, relhumidity = string.match(str,"(\d*) ([\d-]*) (\d*)")
io.print(i,temperature)
totalTemp += temperature;
}
sleep(1000)//加点延迟
}
io.print(string.format("Average temperature for zipcode '%s' was %dF, total = %d",
filter.value, (totalTemp / updateNum), totalTemp))
subscriber.close();
context.term();
}
start()
以下是一个监听AMI socket消息并广播的Pub端实例代码:
import win.ui;
import zeromq;
import thread.command;
/*DSG{{*/
mainForm = ..win.form(cls="ChannelMon";text="ChannelMon";right=0;bottom=0;border="none";exmode="none";max=false;min=false;mode="popup";sysmenu=false;title=false)
mainForm.add()
/*}}*/
//准备上下文和PUB套接字
var context = zeromq.context()
var publisher = context.zmq_socket_pub()
publisher.bind("tcp://*:5556")
var listener = thread.command();
listener.amierror = function(msg){
mainForm.msgbox("致命错误:"++msg)
}
listener.callin = function(obj){
msg = string.concat(obj.exten," ",obj.phone," ",obj.channel," ",obj.uniqid);
//console.log(msg);
publisher.sendMsg(msg);
}
//AMI桥接通道监控
listener.invoke(
function(hwnd){
import thread.command;
import wsock.tcp.client;
//开启AMI连接
var tcp = wsock.tcp.client();
tcp.connect("192.168.1.2",5038);
//传递句柄
thread.command.post("posthandle",tcp._handle)
//登陆
tcp.write('Action:login\r\nUsername:user\r\nSecret:passwd\r\n\r\n')
//等待回复
tcp.recv()
//监听消息
var ok = true
while(ok = true){
var pkg = tcp.recv()
if(pkg!=null){
var msg = string.split(pkg,'<\r\n\r\n>') //按双回车分割消息单元
for k,v in msg {
var line = string.split(v,'<\r\n>') //按回车分割消息行
if(table.range(line)>0){
var msgobj = {};
for m,n in line {
var part = string.split(n,":") //按冒号分割key和value
if(part[1]!=null && part[2]!=null){
msgobj[part[1]] = tostring(string.trimleft(part[2],space))
//msgobj[part[1]] = string.trimleft(part[2],space)
} //if part
} //for line
//msgobj生成完毕
if(msgobj["Event"]="Bridge" and msgobj["Bridgestate"]="Link" and msgobj["Channel1"]!=null){ //通道桥接
if(string.find(msgobj["Channel1"],"DAHDI/.*")){
var obj = {};
obj.exten = msgobj["CallerID2"];
obj.phone = msgobj["CallerID1"];
obj.channel = msgobj["Channel1"];
obj.uniqid = msgobj["Uniqueid2"];
thread.command.post("callin",obj);
}
}
} //if line
}// for msg
}else{
ok = false
} //if pkg
} //while
tcp.close();
thread.command.post("amierror","到呼叫系统的AMI连接已断开!")
return
}
);
mainForm.onClose = function(hwnd,message,wParam,lParam){
publisher.close()
}
win.loopMessage();
context.term()
63 queries in 1.955 seconds |