ServerConnectionHandler.java
/*
* Copyright © 2011 Nokia Corporation. All rights reserved.
* Nokia and Nokia Connecting People are registered trademarks of Nokia Corporation.
* Oracle and Java are trademarks or registered trademarks of Oracle and/or its
* affiliates. Other product and company names mentioned herein may be trademarks
* or trade names of their respective owners.
* See LICENSE.TXT for license information.
*/
package com.nokia.example.btsppecho.server;
import java.io.InputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Hashtable;
import java.util.Enumeration;
import javax.bluetooth.ServiceRecord;
import javax.microedition.io.Connector;
import javax.microedition.io.StreamConnection;
import com.nokia.example.btsppecho.LogScreen;
public class ServerConnectionHandler
implements Runnable {
private final static byte ZERO = (byte) '0';
private final static int LENGTH_MAX_DIGITS = 5;
// this is an arbitrarily chosen value:
private final static int MAX_MESSAGE_LENGTH =
65536 - LENGTH_MAX_DIGITS;
private final ServiceRecord serviceRecord;
private final int requiredSecurity;
private final ServerConnectionHandlerListener listener;
private final Hashtable sendMessages = new Hashtable();
private StreamConnection connection;
private OutputStream out;
private InputStream in;
private volatile boolean aborting;
private Writer writer;
public ServerConnectionHandler(
ServerConnectionHandlerListener listener,
ServiceRecord serviceRecord,
int requiredSecurity) {
this.listener = listener;
this.serviceRecord = serviceRecord;
this.requiredSecurity = requiredSecurity;
aborting = false;
connection = null;
out = null;
in = null;
listener = null;
// the caller must call method 'start'
// to start the reader and writer
}
public ServiceRecord getServiceRecord() {
return serviceRecord;
}
public synchronized void start() {
Thread thread = new Thread(this);
thread.start();
}
public void close() {
if (!aborting) {
synchronized (this) {
aborting = true;
}
synchronized (sendMessages) {
sendMessages.notify();
}
if (out != null) {
try {
out.close();
synchronized (this) {
out = null;
}
} catch (IOException e) {
// there is nothing we can do: ignore it
}
}
if (in != null) {
try {
in.close();
synchronized (this) {
in = null;
}
} catch (IOException e) {
// there is nothing we can do: ignore it
}
}
if (connection != null) {
try {
connection.close();
synchronized (this) {
connection = null;
}
} catch (IOException e) {
// there is nothing we can do: ignore it
}
}
}
}
public void queueMessageForSending(Integer id, byte[] data) {
if (data.length > MAX_MESSAGE_LENGTH) {
throw new IllegalArgumentException(
"Message too long: limit is "
+ MAX_MESSAGE_LENGTH + " bytes");
}
synchronized (sendMessages) {
sendMessages.put(id, data);
sendMessages.notify();
}
}
public void run() {
// the reader
// 1. open the connection and streams, start the writer
String url = null;
try {
// 'must be master': false
url = serviceRecord.getConnectionURL(
requiredSecurity,
false);
connection = (StreamConnection) Connector.open(url);
in = connection.openInputStream();
out = connection.openOutputStream();
LogScreen.log("Opened connection & streams to: '"
+ url + "'\n");
// start the writer
this.writer = new Writer(this);
Thread writeThread = new Thread(writer);
writeThread.start();
LogScreen.log("Started a reader & writer for: '"
+ url + "'\n");
// open succeeded, inform listener
listener.handleOpen(this);
} catch (IOException e) {
// open failed, close any connections/streams, and
// inform listener that the open failed
LogScreen.log("Failed to open "
+ "connection or streams for '"
+ url + "' , Error: "
+ e.getMessage());
close();
listener.handleOpenError(
this,
"IOException: '" + e.getMessage() + "'");
return;
} catch (SecurityException e) {
// open failed, close any connections/streams, and
// inform listener that the open failed
LogScreen.log("Failed to open "
+ "connection or streams for '"
+ url + "' , Error: "
+ e.getMessage());
close();
listener.handleOpenError(
this,
"SecurityException: '" + e.getMessage() + "'");
return;
}
// 2. wait to receive and read messages
while (!aborting) {
int length = 0;
try {
byte[] lengthBuf = new byte[LENGTH_MAX_DIGITS];
readFully(in, lengthBuf);
length = readLength(lengthBuf);
byte[] temp = new byte[length];
readFully(in, temp);
listener.handleReceivedMessage(this, temp);
} catch (IOException e) {
close();
if (length == 0) {
listener.handleClose(this);
} else {
// we were in the middle of reading...
listener.handleErrorClose(this, e.getMessage());
}
}
}
}
private static void readFully(InputStream in, byte[] buffer)
throws IOException {
int bytesRead = 0;
while (bytesRead < buffer.length) {
int count = in.read(buffer,
bytesRead,
buffer.length - bytesRead);
if (count == -1) {
throw new IOException("Input stream closed");
}
bytesRead += count;
}
}
private static int readLength(byte[] buffer) {
int value = 0;
for (int i = 0; i < LENGTH_MAX_DIGITS; ++i) {
value *= 10;
value += buffer[i] - ZERO;
}
return value;
}
private void sendMessage(OutputStream out, byte[] data)
throws IOException {
if (data.length > MAX_MESSAGE_LENGTH) {
throw new IllegalArgumentException(
"Message too long: limit is: "
+ MAX_MESSAGE_LENGTH + " bytes");
}
byte[] buf = new byte[LENGTH_MAX_DIGITS + data.length];
writeLength(data.length, buf);
System.arraycopy(data,
0,
buf,
LENGTH_MAX_DIGITS,
data.length);
out.write(buf);
out.flush();
}
private static void writeLength(int value, byte[] buffer) {
for (int i = LENGTH_MAX_DIGITS - 1; i >= 0; --i) {
buffer[i] = (byte) (ZERO + value % 10);
value = value / 10;
}
}
private class Writer
implements Runnable {
private final ServerConnectionHandler handler;
Writer(ServerConnectionHandler handler) {
this.handler = handler;
}
public void run() {
while (!aborting) {
synchronized (sendMessages) {
Enumeration e = sendMessages.keys();
if (e.hasMoreElements()) {
// send any pending messages
Integer id = (Integer) e.nextElement();
byte[] sendData =
(byte[]) sendMessages.get(id);
try {
sendMessage(out, sendData);
// remove sent message from queue
sendMessages.remove(id);
// inform listener that it was sent
listener.handleQueuedMessageWasSent(
handler,
id);
} catch (IOException ex) {
close(); // stop the networking thread
// inform that we got an error close
listener.handleErrorClose(handler,
ex.getMessage());
}
}
if (sendMessages.isEmpty()) {
try {
sendMessages.wait();
} catch (InterruptedException ex) {
// this can't happen in MIDP: ignore it
}
}
}
}
}
}
}