OpenECHO
 All Classes Namespaces Files Functions Variables
EchoSocket.java
Go to the documentation of this file.
1 /*
2  * Copyright 2012 Sony Computer Science Laboratories, Inc. <info@kadecot.net>
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package com.sonycsl.echo;
17 
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MulticastSocket;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
37 
42 
43 
44 
45 public final class EchoSocket {
@SuppressWarnings("unused")
private static final String TAG = EchoSocket.class.getSimpleName();

/** Size in bytes of the buffer allocated for each received UDP datagram. */
public static int UDP_MAX_PACKET_SIZE = 65507;
/** TCP counterpart of {@link #UDP_MAX_PACKET_SIZE}; not read anywhere in this class. */
public static int TCP_MAX_PACKET_SIZE = 65507;

/** Destination address that routes a frame to the local node without touching a socket. */
public static final String SELF_ADDRESS = "127.0.0.1";
/** Multicast group joined by {@link #openSocket()}. */
public static final String MULTICAST_ADDRESS = "224.0.23.0";
/** Port used for the multicast socket, the TCP server socket and outgoing TCP connections. */
private static final int PORT = 3610;

/**
 * Frames addressed to the local node, waiting to be handled by the receiver loop.
 * The public send methods enqueue from whichever thread calls them while the
 * receiver thread dequeues, so the queue must tolerate concurrent access.
 */
private static final Queue<EchoFrame> mSelfFrameQueue = new ConcurrentLinkedQueue<EchoFrame>();
private static MulticastSocket sMulticastSocket;
private static InetAddress sMulticastAddress;
private static Thread sRecvThread;

// for TCP.
private static ServerSocket sServerSocket;
/** Pool that runs one {@code TCPSocketThread} reader task per connected TCP socket. */
private static ExecutorService sConnectedTCPSocketThreads;
/** Open TCP sockets keyed by remote host address; one peer may be connected several times. */
private static HashMap<String, ArrayList<Socket>> sTCPSockets;


/** Transaction id that the next call to {@code nextTID()} will return. */
private static short sNextTID = 0;

/** Static utility class; not instantiable. */
private EchoSocket() {
}
73  private static class Receiver implements Runnable {
74  @Override
75  public void run() {
76  while(!Thread.currentThread().isInterrupted()) {
77  try {
78  receiveUDP();
79  receiveTCP();
80  } catch (IOException e) {
81  // TODO Auto-generated catch block
82  //e.printStackTrace();
83  }
84  receiveFrameFromSelfNode();
85  try {
86  Thread.sleep(10);
87  } catch (InterruptedException e) {
88  // TODO Auto-generated catch block
89  e.printStackTrace();
90  break;
91  }
92  }
93  }
94  }
95 
96  public static void openSocket() throws IOException {
97 
98  sMulticastAddress = InetAddress.getByName(MULTICAST_ADDRESS);
99  sMulticastSocket = new MulticastSocket(PORT);
100 
101  //stopReceiverThread();
102  //sExecutors = Executors.newSingleThreadExecutor();
103 
104 
105  //int ttl = 1;
106  //sMulticastSocket.setLoopbackMode(true);
107  //sMulticastSocket.setTimeToLive(ttl);
108  sMulticastSocket.setNetworkInterface(EchoUtils.getNetworkInterface());
109  //sMulticastSocket.setInterface(EchoUtils.getLocalIpAddress());
110  sMulticastSocket.joinGroup(sMulticastAddress);
111  sMulticastSocket.setLoopbackMode(true);
112  sMulticastSocket.setSoTimeout(10);
113 
114 
115  sTCPSockets = new HashMap<String, ArrayList<Socket>>();
116 
117  sServerSocket = new ServerSocket();
118  sServerSocket.setSoTimeout(10);
119  sServerSocket.setReuseAddress(true);
120  sServerSocket.bind(new InetSocketAddress(PORT));
121  sConnectedTCPSocketThreads = Executors.newCachedThreadPool();
122 
123  }
124 
125  public static void closeSocket() throws IOException {
126  if(sMulticastSocket != null){
127  MulticastSocket s = sMulticastSocket;
128  sMulticastSocket = null;
129  s.leaveGroup(sMulticastAddress);
130  s.close();
131  }
132  if(sServerSocket != null){
133  ServerSocket s = sServerSocket;
134  sServerSocket = null;
135  s.close();
136  }
137  if(sConnectedTCPSocketThreads != null){
138  sConnectedTCPSocketThreads.shutdownNow();
139  sConnectedTCPSocketThreads = null;
140  }
141  if(sTCPSockets != null){
142  for(Map.Entry<String,ArrayList<Socket>> entry : sTCPSockets.entrySet()){
143  for(Socket s : entry.getValue()){
144  s.close();
145  s = null;
146  }
147  }
148  sTCPSockets = null;
149  }
150  // if we have no socket,there is no need to receive.
151  //stopReceiverThread();
152  }
153 
154  private static void sendFrameToSelfNode(EchoFrame frame) {
155  mSelfFrameQueue.offer(frame);
156  }
157  private static void receiveFrameFromSelfNode() {
158  EchoFrame frame = mSelfFrameQueue.poll();
159  if(frame != null) {
160  onReceiveUDPFrame(frame);
161  }
162  }
163 
164  private static void onReceiveUDPFrame(EchoFrame frame) {
165  checkNewObjectInResponse(frame.copy());
166  Echo.getEventListener().receiveEvent(frame);
167 
168  switch(frame.getESV()) {
169  case EchoFrame.ESV_SETI_SNA:
170  case EchoFrame.ESV_SET_RES: case EchoFrame.ESV_SETC_SNA:
171  case EchoFrame.ESV_GET_RES: case EchoFrame.ESV_GET_SNA:
172  case EchoFrame.ESV_INF: case EchoFrame.ESV_INF_SNA:
173  case EchoFrame.ESV_INFC_RES:
174  // not request
175  onReceiveNotRequest(frame);
176  break;
177  case EchoFrame.ESV_INFC:
178  onReceiveNotRequest(frame);
179  case EchoFrame.ESV_SETI: case EchoFrame.ESV_SETC:
180  case EchoFrame.ESV_GET:
181  case EchoFrame.ESV_INF_REQ:
182  case EchoFrame.ESV_SET_GET:
183  // request
184  EchoNode selfNode = Echo.getSelfNode();
185  if(selfNode == null) {
186  return;
187  }
188  if(frame.getDstEchoInstanceCode() == 0) {
189  if(frame.getDstEchoClassCode() == NodeProfile.ECHO_CLASS_CODE) {
190  EchoObject deoj = selfNode.getNodeProfile();
191  onReceiveUDPRequestFrame(deoj, frame);
192  } else {
193  DeviceObject[] deojList = selfNode.getDevices(frame.getDstEchoClassCode());
194  for(DeviceObject deoj : deojList) {
195  onReceiveUDPRequestFrame(deoj, frame);
196  }
197  }
198  } else {
199  EchoObject deoj = selfNode.getInstance(frame.getDstEchoClassCode(), frame.getDstEchoInstanceCode());
200  if(deoj == null) {return;}
201  onReceiveUDPRequestFrame(deoj, frame);
202  }
203  break;
204  }
205  }
206 
207  private static void onReceiveUDPRequestFrame(EchoObject deoj, EchoFrame frame){
208  //checkNewObjectInResponse(frame.copy());
209  EchoFrame request = frame.copy();
210  request.setDstEchoInstanceCode(deoj.getInstanceCode());
211  EchoFrame response = deoj.onReceiveRequest(request);
212 
213  if(response.getESV() == EchoFrame.ESV_INF) {
214  response.setDstEchoAddress(MULTICAST_ADDRESS);
215  }
216  if(response.getESV() == EchoFrame.ESV_SET_NO_RES) {
217  return;
218  }
219  try {
220  sendUDPFrame(response);
221  } catch (IOException e) {
222  e.printStackTrace();
223  }
224  }
225 
226  private static void onReceiveNotRequest(EchoFrame frame) {
227  EchoNode node = Echo.getNode(frame.getSrcEchoAddress());
228  EchoObject seoj = node.getInstance(frame.getSrcEchoClassCode(),
229  frame.getSrcEchoInstanceCode());
230 
231  if(seoj == null) {return;}
232  seoj.setNode(node);
233 
234  // receiver
235  EchoObject.Receiver receiver = seoj.getReceiver();
236  if(receiver != null) {
237  receiver.onReceive(seoj, frame);
238  }
239  }
240 
241 
242  private static void onReceiveTCPFrame(EchoFrame frame, Socket socket) {
243  checkNewObjectInResponse(frame.copy());
244  Echo.getEventListener().receiveEvent(frame);
245  switch(frame.getESV()) {
246  case EchoFrame.ESV_SETI_SNA:
247  case EchoFrame.ESV_SET_RES: case EchoFrame.ESV_SETC_SNA:
248  case EchoFrame.ESV_GET_RES: case EchoFrame.ESV_GET_SNA:
249  case EchoFrame.ESV_INF: case EchoFrame.ESV_INF_SNA:
250  case EchoFrame.ESV_INFC_RES:
251  // not request
252  onReceiveNotRequest(frame);
253  break;
254  case EchoFrame.ESV_INFC:
255  onReceiveNotRequest(frame);
256  case EchoFrame.ESV_SETI: case EchoFrame.ESV_SETC:
257  case EchoFrame.ESV_GET:
258  case EchoFrame.ESV_INF_REQ:
259  case EchoFrame.ESV_SET_GET:
260  // request
261  EchoNode selfNode = Echo.getSelfNode();
262  if(selfNode == null) {
263  return;
264  }
265  if(frame.getDstEchoInstanceCode() == 0) {
266  if(frame.getDstEchoClassCode() == NodeProfile.ECHO_CLASS_CODE) {
267  EchoObject deoj = selfNode.getNodeProfile();
268  onReceiveTCPRequestFrame(deoj, frame, socket);
269  } else {
270  DeviceObject[] deojList = selfNode.getDevices(frame.getDstEchoClassCode());
271  for(DeviceObject deoj : deojList) {
272  onReceiveTCPRequestFrame(deoj, frame, socket);
273  }
274  }
275  } else {
276  EchoObject deoj = selfNode.getInstance(frame.getDstEchoClassCode(), frame.getDstEchoInstanceCode());
277  if(deoj == null) {return;}
278  onReceiveTCPRequestFrame(deoj, frame, socket);
279  }
280  break;
281  }
282  }
283 
284  private static void onReceiveTCPRequestFrame(EchoObject deoj, EchoFrame frame, Socket socket){
285  EchoFrame request = frame.copy();
286  request.setDstEchoInstanceCode(deoj.getInstanceCode());
287  EchoFrame response = deoj.onReceiveRequest(request);
288 
289  if(response.getESV() == EchoFrame.ESV_INF) {
290  response.setDstEchoAddress(MULTICAST_ADDRESS);
291  }
292  if(response.getESV() == EchoFrame.ESV_SET_NO_RES) {
293  return;
294  }
295  try {
296  sendTCPFrame(response, socket);
297  } catch (IOException e) {
298  e.printStackTrace();
299  }
300  }
301 
302 
303  private static void checkNewObjectInResponse(EchoFrame frame) {
304  EchoNode node = Echo.getNode(frame.getSrcEchoAddress());
305  boolean flagNewNode = false;
306  if(node == null) {
307  node = Echo.addOtherNode(frame.getSrcEchoAddress());
308  flagNewNode = true;
309  if(node == null) {return;}
310 
311  node.getNodeProfile().setNode(node);
312  }
313 
314  if(frame.getSrcEchoClassCode() == NodeProfile.ECHO_CLASS_CODE
315  && frame.getSrcEchoInstanceCode() == NodeProfile.INSTANCE_CODE_TRANSMISSION_ONLY) {
316  //node.get()->getNodeProfile().get()->setInstanceCode(NodeProfile::INSTANCE_CODE_TRANSMISSION_ONLY);
317  NodeProfile profile = node.getNodeProfile();
318  NodeProfile.Proxy proxy = (NodeProfile.Proxy)profile;
319  proxy.setInstanceCode(NodeProfile.INSTANCE_CODE_TRANSMISSION_ONLY);
320  }
321 
322  boolean flagNewDevice = false;
323  EchoObject seoj = node.getInstance(frame.getSrcEchoClassCode(), frame.getSrcEchoInstanceCode());
324  if(seoj == null) {
325  // generate
326  // device
327 
328  seoj = node.addOtherDevice(frame.getSrcEchoClassCode(), frame.getSrcEchoInstanceCode());
329  flagNewDevice = true;
330 
331  if(seoj != null) {seoj.setNode(node);}
332 
333  //seoj = node.get()->getInstnace(frame.getSrcEchoClassCode(), frame.getSrcEchoInstanceCode());
334  }
335  if(seoj == null) {
336  if(flagNewNode) {
337  //Echo.getEventListener().onNewNode(node);
338  node.onNew();
339  }
340  //Echo.getEventListener().onFoundNode(node);
341  node.onFound();
342  return;
343  }
344  if(seoj.getEchoClassCode() == NodeProfile.ECHO_CLASS_CODE
345  && (seoj.getInstanceCode() == NodeProfile.INSTANCE_CODE
346  || seoj.getInstanceCode() == NodeProfile.INSTANCE_CODE_TRANSMISSION_ONLY)
347  && (frame.getESV() == EchoFrame.ESV_GET_RES
348  || frame.getESV() == EchoFrame.ESV_GET_SNA
349  || frame.getESV() == EchoFrame.ESV_INF
350  || frame.getESV() == EchoFrame.ESV_INF_SNA
351  || frame.getESV() == EchoFrame.ESV_INFC)) {
352  // seoj is NodeProfile
353  List<EchoObject> foundDevices = new ArrayList<EchoObject>();
354  List<Boolean> flagNewDevices = new ArrayList<Boolean>();
355 
356  for(EchoProperty p : frame.getPropertyList()) {
357  if(p.epc != NodeProfile.EPC_INSTANCE_LIST_NOTIFICATION
358  && p.epc != NodeProfile.EPC_SELF_NODE_INSTANCE_LIST_S) {continue;}
359  if(p.pdc == 0) {continue;}
360  int deviceListSize = (int)p.edt[0];
361  if(deviceListSize > 84) {
362  deviceListSize = 84;
363  }
364  for(int d = 0, i = 1; d < deviceListSize; d++) {
365  if(i == p.pdc) break;
366  short echoClassCode = (short)(((p.edt[i]) & 0xFF) << 8);
367  i += 1;
368  if(i == p.pdc) break;
369  echoClassCode += p.edt[i] & 0xFF;
370  i += 1;
371  if(i == p.pdc) break;
372  byte echoInstanceCode = p.edt[i];
373  i += 1;
374  if(node.containsDevice(echoClassCode, echoInstanceCode)) {
375  flagNewDevices.add(false);
376  foundDevices.add(node.getInstance(echoClassCode, echoInstanceCode));
377  } else {
378  // new
379  flagNewDevices.add(true);
380  EchoObject eoj = node.addOtherDevice(echoClassCode, echoInstanceCode);
381  foundDevices.add(eoj);
382  if(eoj != null) {eoj.setNode(node);}
383  }
384  }
385  }
386 
387  if(flagNewNode) {
388  //Echo.getEventListener().onNewNode(node);
389  node.onNew();
390  }
391  //Echo.getEventListener().onFoundNode(node);
392  node.onFound();
393  if(flagNewDevice) {
394  //Echo.getEventListener().onNewEchoObject(seoj);
395  seoj.onNew();
396  }
397  //Echo.getEventListener().onFoundEchoObject(seoj);
398  seoj.onFound();
399  int foundDeviceListSize = foundDevices.size();
400  for(int i = 0; i < foundDeviceListSize; i++) {
401  if(flagNewDevices.get(i)) {
402  //Echo.getEventListener().onNewEchoObject(foundDevices.get(i));
403  foundDevices.get(i).onNew();
404  }
405  //Echo.getEventListener().onFoundEchoObject(foundDevices.get(i));
406  foundDevices.get(i).onFound();
407  }
408  } else {
409  // seoj is DeviceObject
410  if(flagNewNode) {
411  //Echo.getEventListener().onNewNode(node);
412  node.onNew();
413  }
414  //Echo.getEventListener().onFoundNode(node);
415  node.onFound();
416  if(flagNewDevice) {
417  //Echo.getEventListener().onNewEchoObject(seoj);
418  seoj.onNew();
419  }
420  //Echo.getEventListener().onFoundEchoObject(seoj);
421  seoj.onFound();
422  return;
423  }
424  }
425 
426  public static void sendUDPFrame(EchoFrame frame) throws IOException {
427  Echo.getEventListener().sendEvent(frame);
428 
429  if(frame.getDstEchoAddress().equals(SELF_ADDRESS)) {
430  sendFrameToSelfNode(frame.copy());
431  return;
432  }
433  byte[] data = frame.getFrameByteArray();
434 
435  InetAddress address = InetAddress.getByName(frame.getDstEchoAddress());
436  DatagramPacket packet = new DatagramPacket(data, data.length,
437  address, PORT);
438  sMulticastSocket.send(packet);
439  if(frame.getDstEchoAddress().equals(MULTICAST_ADDRESS)) {
440  EchoFrame f = frame.copy();
441  f.setDstEchoAddress(SELF_ADDRESS);
442  sendFrameToSelfNode(f);
443  }
444  }
445 
446  public static void sendTCPFrame(EchoFrame frame) throws IOException {
447  Echo.getEventListener().sendEvent(frame);
448  // will not occur?
449  if(frame.getDstEchoAddress().equals(SELF_ADDRESS)){
450  sendFrameToSelfNode(frame.copy());
451  return;
452  }
453  InetAddress address = InetAddress.getByName(frame.getDstEchoAddress());
454 
455  if(sTCPSockets.containsKey(frame.getDstEchoAddress())) {
456  ArrayList<Socket> list = sTCPSockets.get(frame.getDstEchoAddress());
457  // 既存のsocketを新しいものから試す.
458  for(int i = list.size() - 1; i >= 0; --i) {
459  Socket sock = list.get(i);
460  try {
461  //System.err.println("Reuse " + sock.getInetAddress() + " [" + i + "]");
462  sendTCPFrame(frame, sock);
463  return;
464  } catch(IOException e) {
465  closeTCPSocket(sock);
466  continue;
467  }
468  }
469  }
470 
471  // 既存のsocketが使えない場合
472  Socket sock = new Socket(address,PORT);
473  //System.err.println("Socket add" + sock.getInetAddress());
474 
475  sendTCPFrame(frame, sock);
476  if(sTCPSockets.containsKey(address.getHostAddress())) {
477  sTCPSockets.get(address.getHostAddress()).add(sock);
478  } else {
479  ArrayList<Socket> list = new ArrayList<Socket>();
480  list.add(sock);
481  sTCPSockets.put(address.getHostAddress(), list);
482  }
483  // at first,read. 要求電文に対する応答電文は同一のコネクションで送信するものとする。
484  sConnectedTCPSocketThreads.execute(new TCPSocketThread(sock));
485  }
486 
487  public static void sendTCPFrame(EchoFrame frame, Socket socket) throws IOException {
488  byte[] data = frame.getFrameByteArray();
489  DataOutputStream out = new DataOutputStream(socket.getOutputStream());
490  out.write(data);
491  }
492 
493  public static void receiveUDP() throws IOException {
494  DatagramPacket packet =
495  new DatagramPacket(
498  // closed?
499  if(sMulticastSocket == null){
500  //System.err.println("sMulticastSocket has been closed.");
501  return;
502  }
503  sMulticastSocket.receive(packet);
504  Enumeration<InetAddress> enumIpAddr = sMulticastSocket.getNetworkInterface().getInetAddresses();
505  while(enumIpAddr.hasMoreElements()) {
506  InetAddress inetAddress = enumIpAddr.nextElement();
507  if (inetAddress.equals(packet.getAddress())) {
508  return;
509  }
510  }
511  byte[] data = new byte[packet.getLength()];
512  System.arraycopy(packet.getData(), 0, data, 0, packet.getLength());
513 
514  if(data.length < EchoFrame.MIN_FRAME_SIZE) {
515  return;
516  }
517  InetAddress address = packet.getAddress();
518  String srcEchoAddress = address.getHostAddress();
519  EchoFrame frame = new EchoFrame(srcEchoAddress, data);
520  onReceiveUDPFrame(frame);
521  }
522 
523  public static void receiveTCP() throws IOException {
524  // has been closed?
525  if(sServerSocket == null){
526  //System.err.println("TCP server socket has been closed.");
527  return;
528  }
529  Socket sock = sServerSocket.accept();
530  String address = sock.getInetAddress().getHostAddress();
531  if(sTCPSockets.containsKey(address)) {
532  sTCPSockets.get(address).add(sock);
533  } else {
534  ArrayList<Socket> list = new ArrayList<Socket>();
535  list.add(sock);
536  sTCPSockets.put(address, list);
537  }
538  System.err.println("Socket add" + sock.getInetAddress() + "(income)");
539 
540  sConnectedTCPSocketThreads.execute(new TCPSocketThread(sock));
541  }
542 
543  public static void startReceiverThread() {
544  //stopReceiverThread();
545  //sExecutors.execute(new Receiver());
546  if(sRecvThread == null){
547  sRecvThread = new Thread(new Receiver());
548  sRecvThread.start();
549  }else{
550  //System.err.println("There is already receiver thread.");
551  }
552  }
553 
554  public static void stopReceiverThread() {
555  //sExecutors.shutdown();
556  //sExecutors.shutdownNow();
557  if(sRecvThread != null){
558  sRecvThread.interrupt();
559  sRecvThread = null;
560  }
561  }
562 
563  public static void resumeReceiverThread() {
564 
565  }
566 
567  public static void pauseReceiverThread() {
568 
569  }
570  public static synchronized short nextTID() {
571  short ret = sNextTID;
572  sNextTID += 1;
573  //Echo::getStorage().get()->setNextTID(sNextTID);
574  return ret;
575  }
576 
577  public static short getNextTIDNoIncrement() {
578  return sNextTID;
579  }
580 
581  public static void closeTCPSocket(Socket socket) {
582  ArrayList<Socket> list = sTCPSockets.get(socket.getInetAddress().getHostAddress());
583  list.remove(socket);
584  try {
585  socket.close();
586  } catch (IOException e) {
587  // TODO Auto-generated catch block
588  e.printStackTrace();
589  }
590  }
591 
592  private static class TCPSocketThread implements Runnable {
593  private Socket sock;
594  public TCPSocketThread(Socket s) {
595  sock = s;
596  }
597  // first state is recv.
598  @Override
599  public void run() {
600  // Thread.interrupt is called by executor.
601  try {
602  //DataOutputStream out = new DataOutputStream(sock.getOutputStream());
603  DataInputStream in = new DataInputStream(sock.getInputStream());
604  while (!Thread.interrupted() && sock.isConnected()) {
605  String address = sock.getInetAddress().getHostAddress();
606  try {
607  EchoFrame frame = EchoFrame.getEchoFrameFromStream(address, in);
608  if(frame != null) {
609  //System.out.println("TCP: " + frame);
610  onReceiveTCPFrame(frame, sock);
611  }
612  } catch (InterruptedException e) {
613  e.printStackTrace();
614  break;
615  }
616  }
617  } catch (IOException e) {
618  e.printStackTrace();
619  } finally{
620  closeTCPSocket(sock);
621  }
622  }
623  }
624 }
static final String SELF_ADDRESS
Definition: EchoSocket.java:52
static final byte ESV_SET_NO_RES
Definition: EchoFrame.java:59
static void sendTCPFrame(EchoFrame frame)
static void resumeReceiverThread()
static final byte ESV_INFC
Definition: EchoFrame.java:52
static void closeTCPSocket(Socket socket)
static void sendUDPFrame(EchoFrame frame)
static final int MIN_FRAME_SIZE
Definition: EchoFrame.java:39
static short getNextTIDNoIncrement()
static final byte ESV_INF
Definition: EchoFrame.java:51
static synchronized short nextTID()
static void stopReceiverThread()
static void startReceiverThread()
static final byte INSTANCE_CODE_TRANSMISSION_ONLY
static final String MULTICAST_ADDRESS
Definition: EchoSocket.java:53
static void pauseReceiverThread()
static void sendTCPFrame(EchoFrame frame, Socket socket)