文章作者:jqpeng
原文链接: 一起读源码之zookeeper(1) – 启动分析

从本文开始,不定期分析一个开源项目源代码,起篇从大名鼎鼎的zookeeper开始。
为什么是zk,因为用到zk的场景实在太多了,大部分耳熟能详的分布式系统都有zookeeper的影子,比如hbase,storm,dubbo,kafka等等,另外前面提到的RPC框架原理与实现也用到了zookeeper。

目录

  • 1 环境准备
    • 1.1 导入代码
    • 1.2 设置配置文件
    • 1.3 调试配置
  • 2 启动分析
    • 2.1 QuorumPeerMain
    • 2.2 ZooKeeperServerMain
    • 2.3 ServerCnxnFactory
    • 2.4 ZooKeeperServer
    • 2.5 服务启动
      • 2.5.1 配置cnxnFactory
      • 2.5.2 启动cnxnFactory
        • socket处理线程
        • socket网络请求处理
        • 读取连接请求
        • 创建session
      • 2.5.3 zk服务器启动
        • SessionTracker
      • 2.5.4 ZooKeeperServer请求处理器链介绍
        • RequestProcessor
        • PrepRequestProcessor
        • SyncRequestProcessor
        • FinalRequestProcessor

1 环境准备

首先,下载zk的新版本,最新的稳定版是3.4.10,由于已下载3.4.9.先直接使用。

1.1 导入代码

IDEA直接打开zk目录:
enter description here

项目设置为jdk1.7
然后,将src/java下面的main和generated设置为源码目录,同时将lib目录添加为liabary。

1.2 设置配置文件

在conf目录,新建zoo.cfg,拷贝sample.cfg即可

enter description here

1.3 调试配置

查看bin/zkServer

set ZOOMAIN=org.apache.zookeeper.server.quorum.QuorumPeerMain
....
endlocal

调用的是org.apache.zookeeper.server.quorum.QuorumPeerMain,因此QuorumPeerMain,配置调试程序,arguments设置conf/zoo.cfg

enter description here

这样,就可以愉快的Debug代码了-😃

2 启动分析

2.1 QuorumPeerMain

QuorumPeerMain的main里,调用initializeAndRun

    protected void initializeAndRun(String[] args)
        throws ConfigException, IOException
    {
        QuorumPeerConfig config = new QuorumPeerConfig();
        if (args.length == 1) {
            config.parse(args[0]);
        }

        // Start and schedule the the purge task 清理任务
        DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
                .getDataDir(), config.getDataLogDir(), config
                .getSnapRetainCount(), config.getPurgeInterval());
        purgeMgr.start();

        // 集群模式
        if (args.length == 1 && config.servers.size() > 0) {
            runFromConfig(config);
        } else {
            LOG.warn("Either no config or no quorum defined in config, running "
                    + " in standalone mode");
            // there is only server in the quorum -- run as standalone
            // 单机模式
            ZooKeeperServerMain.main(args);
        }
    }

主要执行了:

  • 加载解析配置文件到QuorumPeerConfig
  • 执行清理任务
  • 判断是集群模式还是单机模式,我们的配置文件未配置server,所以是单机模式,执行 ZooKeeperServerMain.main

本文重点分析单机模式下的zk,集群模式暂时不解读

2.2 ZooKeeperServerMain

ZooKeeperServerMain.main调用initializeAndRun

 protected void initializeAndRun(String[] args)
        throws ConfigException, IOException
    {
        try {
            ManagedUtil.registerLog4jMBeans();
        } catch (JMException e) {
            LOG.warn("Unable to register log4j JMX control", e);
        }

        ServerConfig config = new ServerConfig();
        if (args.length == 1) {
            config.parse(args[0]);
        } else {
            config.parse(args);
        }

        runFromConfig(config);
    }```

读取配置,然后runFromConfig:

``` java
 public void runFromConfig(ServerConfig config) throws IOException {
        LOG.info("Starting server");
        FileTxnSnapLog txnLog = null;
        try {
            // Note that this thread isn't going to be doing anything else,
            // so rather than spawning another thread, we will just call
            // run() in this thread.
            // create a file logger url from the command line args
            final ZooKeeperServer zkServer = new ZooKeeperServer();
            // Registers shutdown handler which will be used to know the
            // server error or shutdown state changes.
            final CountDownLatch shutdownLatch = new CountDownLatch(1);
            zkServer.registerServerShutdownHandler(
                    new ZooKeeperServerShutdownHandler(shutdownLatch));

            // 快照
            txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(
                    config.dataDir));
            zkServer.setTxnLogFactory(txnLog);
            zkServer.setTickTime(config.tickTime);
            zkServer.setMinSessionTimeout(config.minSessionTimeout);
            zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
            // socket工厂
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(),
                    config.getMaxClientCnxns());
            cnxnFactory.startup(zkServer);

            // Watch status of ZooKeeper server. It will do a graceful shutdown
            // if the server is not running or hits an internal error.
            shutdownLatch.await();
            shutdown();

            cnxnFactory.join();
            if (zkServer.canShutdown()) {
                zkServer.shutdown();
            }
        } catch (InterruptedException e) {
            // warn, but generally this is ok
            LOG.warn("Server interrupted", e);
        } finally {
            if (txnLog != null) {
                txnLog.close();
            }
        }
    }

几件事情:

  • 创建zkServer,对ZooKeeperServer设置一些配置参数,如tickTime、minSessionTimeout、maxSessionTimeout
  • 创建CountDownLatch,注释里写了,用来watch zk的状态,当zk关闭或者出现内部错误的时候优雅的关闭服务
  • 根据配置参数dataLogDir和dataDir创建FileTxnSnapLog,用来存储zk数据和日志快照
  • 创建cnxnFactory,zk的 socket工厂,负责处理网络请求,zk里有netty和NIO两种实现
  • cnxnFactory.startup(zkServer),启动zk服务器

2.3 ServerCnxnFactory

cnxnFactory负责zk的网络请求,createFactory中,从系统配置中读取ZOOKEEPER_SERVER_CNXN_FACTORY,默认是没有这个配置的,因此默认是使用NIOServerCnxnFactory,基于java的NIO实现,

    static public ServerCnxnFactory createFactory() throws IOException {
        String serverCnxnFactoryName =
            System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
        if (serverCnxnFactoryName == null) {
            serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
        }
        try {
            return (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
                                                .newInstance();
        } catch (Exception e) {
            IOException ioe = new IOException("Couldn't instantiate "
                    + serverCnxnFactoryName);
            ioe.initCause(e);
            throw ioe;
        }
    }

当然,我们可以很容易发现:
enter description here

ServerCnxnFactory还有个NettyServerCnxnFactory实现,基于Netty实现NIO。ServerCnxnFactory里具体负责什么,后面再来看。

2.4 ZooKeeperServer

现在,主角登场,我们来看ZooKeeperServer内部有什么玄妙。
enter description here

ZooKeeperServer是单机模式使用的类,在集群模式下使用的是它的子类。
我们先来看ZooKeeperServer包含哪些内容:

    public static final int DEFAULT_TICK_TIME = 3000;
    protected int tickTime = DEFAULT_TICK_TIME;
    /** value of -1 indicates unset, use default */
    protected int minSessionTimeout = -1;
    /** value of -1 indicates unset, use default */
    protected int maxSessionTimeout = -1;
    protected SessionTracker sessionTracker; //创建和管理session
    private FileTxnSnapLog txnLogFactory = null; //文件快照
    private ZKDatabase zkDb; // ZooKeeper树形数据的模型
    private final AtomicLong hzxid = new AtomicLong(0); //原子增长Long,用于分配事务编号
    public final static Exception ok = new Exception("No prob");
    protected RequestProcessor firstProcessor; // ZooKeeperServer请求处理器链中的第一个处理器
    protected volatile State state = State.INITIAL;

    protected enum State {
        INITIAL, RUNNING, SHUTDOWN, ERROR;
    }

    /**
     * This is the secret that we use to generate passwords, for the moment it
     * is more of a sanity check.
     */
    static final private long superSecret = 0XB3415C00L;

    private final AtomicInteger requestsInProcess = new AtomicInteger(0);
    final List<ChangeRecord> outstandingChanges = new ArrayList<ChangeRecord>();
    // this data structure must be accessed under the outstandingChanges lock
    final HashMap<String, ChangeRecord> outstandingChangesForPath =
        new HashMap<String, ChangeRecord>();
    
    private ServerCnxnFactory serverCnxnFactory; //ServerSocket工厂,接受客户端的socket连接

    private final ServerStats serverStats; //server的运行状态统计
    private final ZooKeeperServerListener listener; // ZK运行状态监听
    private ZooKeeperServerShutdownHandler zkShutdownHandler;

2.5 服务启动

前面有点跑偏,继续回归启动过程:

            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(),
                    config.getMaxClientCnxns());
            cnxnFactory.startup(zkServer);

2.5.1 配置cnxnFactory

进入configure:

    @Override
    public void configure(InetSocketAddress addr, int maxcc) throws IOException {
        configureSaslLogin();

        // ZK网络请求主线程
        thread = new ZooKeeperThread(this, "NIOServerCxn.Factory:" + addr);
        thread.setDaemon(true);

        maxClientCnxns = maxcc;
        this.ss = ServerSocketChannel.open();
        ss.socket().setReuseAddress(true);
        LOG.info("binding to port " + addr);
        ss.socket().bind(addr);
        ss.configureBlocking(false);
        ss.register(selector, SelectionKey.OP_ACCEPT);
    }

几件事情:

  • configureSaslLogin,具体不细看,应该是处理鉴权
  • 初始化ZooKeeperThread,这个ZooKeeperThread的作用是负责处理未处理异常:
public class ZooKeeperThread extends Thread {

    private static final Logger LOG = LoggerFactory
            .getLogger(ZooKeeperThread.class);

    private UncaughtExceptionHandler uncaughtExceptionalHandler = new UncaughtExceptionHandler() {

        @Override
        public void uncaughtException(Thread t, Throwable e) {
            handleException(t.getName(), e);
        }
    };

    public ZooKeeperThread(Runnable thread, String threadName) {
        super(thread, threadName);
        setUncaughtExceptionHandler(uncaughtExceptionalHandler);
    }

    protected void handleException(String thName, Throwable e) {
        LOG.warn("Exception occured from thread {}", thName, e);
    }
}
  • 启动ServerSocketChannel,并绑定配置的addr,并且注册selector(可以搜索NIO了解细节)

2.5.2 启动cnxnFactory

继续分析,进入cnxnFactory.startup(zkServer)

    @Override
    public void startup(ZooKeeperServer zks) throws IOException,
            InterruptedException {
        start();
        setZooKeeperServer(zks);
        zks.startdata();
        zks.startup();
    }

首先,start,判断线程状态,如果未启动则启动线程,注意只会启动一次。

    @Override
    public void start() {
        // ensure thread is started once and only once
        if (thread.getState() == Thread.State.NEW) {
            thread.start();
        }
    }
socket处理线程

启动后,就会执行cnxnFactory.run

    public void run() {
        while (!ss.socket().isClosed()) {
            try {
                selector.select(1000);
                Set<SelectionKey> selected;
                synchronized (this) {
                    selected = selector.selectedKeys();
                }
                ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
                        selected);
                Collections.shuffle(selectedList);
                for (SelectionKey k : selectedList) {
                    if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                        SocketChannel sc = ((ServerSocketChannel) k
                                .channel()).accept();
                        InetAddress ia = sc.socket().getInetAddress();
                        int cnxncount = getClientCnxnCount(ia);
                        if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
                            LOG.warn("Too many connections from " + ia
                                     + " - max is " + maxClientCnxns );
                            sc.close();
                        } else {
                            LOG.info("Accepted socket connection from "
                                     + sc.socket().getRemoteSocketAddress());
                            sc.configureBlocking(false);
                            SelectionKey sk = sc.register(selector,
                                    SelectionKey.OP_READ);
                            NIOServerCnxn cnxn = createConnection(sc, sk);
                            sk.attach(cnxn);
                            addCnxn(cnxn);
                        }
                    } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                        NIOServerCnxn c = (NIOServerCnxn) k.attachment();
                        c.doIO(k);
                    } else {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Unexpected ops in select "
                                      + k.readyOps());
                        }
                    }
                }
                selected.clear();
            } catch (RuntimeException e) {
                LOG.warn("Ignoring unexpected runtime exception", e);
            } catch (Exception e) {
                LOG.warn("Ignoring exception", e);
            }
        }
        closeAll();
        LOG.info("NIOServerCnxn factory exited run method");
    }

这里相当于一个独立线程来处理网络连接,通过selector.select(1000)来获取网络请求,一旦有连接就绪,则开始处理:

  • 首先打乱 Collections.shuffle(selectedList);
  • for循环处理
    • 如果SelectionKey.OP_ACCEPT,代表一个新连接请求,创建SocketChannel,创建NIOServerCnxn,然后addCnxn
    • 如果可读写,则 NIOServerCnxn.doIO(k),执行IO操作
socket网络请求处理

这里简单分析下doIO,摘录部分代码:

void doIO(SelectionKey k) throws InterruptedException {
        try {
            if (isSocketOpen() == false) {
                LOG.warn("trying to do i/o on a null socket for session:0x"
                         + Long.toHexString(sessionId));

                return;
            }
            if (k.isReadable()) {
                // 读取4个字节
                int rc = sock.read(incomingBuffer);
                if (rc < 0) {
                    throw new EndOfStreamException(
                            "Unable to read additional data from client sessionid 0x"
                            + Long.toHexString(sessionId)
                            + ", likely client has closed socket");
                }
                // 读满了
                if (incomingBuffer.remaining() == 0) {
                    boolean isPayload;
                    if (incomingBuffer == lenBuffer) { // start of next request
                        incomingBuffer.flip(); // 复位
                        isPayload = readLength(k); // 读取载荷长度
                        incomingBuffer.clear();
                    } else {
                        // continuation
                        isPayload = true;
                    }
                    if (isPayload) { // not the case for 4letterword
                        readPayload();
                    }
                    else {
                        // four letter words take care
                        // need not do anything else
                        return;
                    }
                }
            }

读取4个字节,获取到数据长度,然后读取载荷,也就是请求

    private void readPayload() throws IOException, InterruptedException {
        if (incomingBuffer.remaining() != 0) { // have we read length bytes?
            int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
            if (rc < 0) {
                throw new EndOfStreamException(
                        "Unable to read additional data from client sessionid 0x"
                        + Long.toHexString(sessionId)
                        + ", likely client has closed socket");
            }
        }

        if (incomingBuffer.remaining() == 0) { // have we read length bytes?
            packetReceived();
            incomingBuffer.flip(); // 复位
            if (!initialized) {
                readConnectRequest(); // 读取连接请求
            } else {
                readRequest();
            }
            lenBuffer.clear();
            incomingBuffer = lenBuffer;
        }
    }

先是读取数据,然后再读取请求,这里关注readConnectRequest

读取连接请求
    private void readConnectRequest() throws IOException, InterruptedException {
        if (zkServer == null) {
            throw new IOException("ZooKeeperServer not running");
        }
        zkServer.processConnectRequest(this, incomingBuffer);
        initialized = true;
    }

继续,下面是处理连接请求:

     public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
        BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
        ConnectRequest connReq = new ConnectRequest();
        connReq.deserialize(bia, "connect"); // 反序列化请求
        ....
        // 客户端设置的超时时间
        int sessionTimeout = connReq.getTimeOut();
        byte passwd[] = connReq.getPasswd();
        int minSessionTimeout = getMinSessionTimeout();
        if (sessionTimeout < minSessionTimeout) {
            sessionTimeout = minSessionTimeout;
        }
        // 服务端设置的最大超时时间
        int maxSessionTimeout = getMaxSessionTimeout();
        if (sessionTimeout > maxSessionTimeout) {
            sessionTimeout = maxSessionTimeout;
        }
        cnxn.setSessionTimeout(sessionTimeout);
        // We don't want to receive any packets until we are sure that the
        // session is setup
        cnxn.disableRecv();
        // 请求是否带上sessionid
        long sessionId = connReq.getSessionId();
        if (sessionId != 0) {
            // 请求带了sessionid
            long clientSessionId = connReq.getSessionId();
            LOG.info("Client attempting to renew session 0x"
                    + Long.toHexString(clientSessionId)
                    + " at " + cnxn.getRemoteSocketAddress());
            // 关闭请求
            serverCnxnFactory.closeSession(sessionId);
            cnxn.setSessionId(sessionId);
            // 重新打开请求
            reopenSession(cnxn, sessionId, passwd, sessionTimeout);
        } else {
            LOG.info("Client attempting to establish new session at "
                    + cnxn.getRemoteSocketAddress());
            // 创建新sesssion
            createSession(cnxn, passwd, sessionTimeout);
        }
    }

以上完成:

  • 将读取出来的incomingBuffer反序列化为ConnectRequest对象
  • 然后设置超时时间,ServerCnxn接收到该申请后,根据客户端传递过来的sessionTimeout时间以及ZooKeeperServer本身的minSessionTimeout、maxSessionTimeout参数,确定最终的sessionTimeout时间
  • 判断客户端的请求是否已经含有sessionId
    • 如果含有,则执行sessionId的是否过期、密码是否正确等检查
    • 如果没有sessionId,则创建一个session
创建session

所以,我们需要再看一下如何创建session:

    long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
        long sessionId = sessionTracker.createSession(timeout);
        Random r = new Random(sessionId ^ superSecret);
        r.nextBytes(passwd);
        ByteBuffer to = ByteBuffer.allocate(4);
        to.putInt(timeout);
        cnxn.setSessionId(sessionId);
        submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null);
        return sessionId;
    }
  • 使用sessionTracker生成一个sessionId
  • submitRequest构建一个Request请求,请求的类型为OpCode.createSession
    private void submitRequest(ServerCnxn cnxn, long sessionId, int type,
            int xid, ByteBuffer bb, List<Id> authInfo) {
        Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo);
        submitRequest(si);
    }
    
    public void submitRequest(Request si) {
        if (firstProcessor == null) {
            synchronized (this) {
                try {
                    // Since all requests are passed to the request
                    // processor it should wait for setting up the request
                    // processor chain. The state will be updated to RUNNING
                    // after the setup.
                    while (state == State.INITIAL) {
                        wait(1000);
                    }
                } catch (InterruptedException e) {
                    LOG.warn("Unexpected interruption", e);
                }
                if (firstProcessor == null || state != State.RUNNING) {
                    throw new RuntimeException("Not started");
                }
            }
        }
        try {
            touch(si.cnxn);
            boolean validpacket = Request.isValid(si.type);
            if (validpacket) {
                firstProcessor.processRequest(si);
                if (si.cnxn != null) {
                    incInProcess();
                }
            } else {
                LOG.warn("Received packet at server of unknown type " + si.type);
                new UnimplementedRequestProcessor().processRequest(si);
            }
        } catch (MissingSessionException e) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Dropping request: " + e.getMessage());
            }
        } catch (RequestProcessorException e) {
            LOG.error("Unable to process request:" + e.getMessage(), e);
        }
    }

上面的代码:

  • 创建一个Request
  • 等待firstProcessor创建完成,然后调用firstProcessor.processRequest

firstProcessor是什么东东,下面再揭晓

2.5.3 zk服务器启动

再次回到startup, setZooKeeperServer(zks),代码很简单

 final public void setZooKeeperServer(ZooKeeperServer zk) {
        this.zkServer = zk;
        if (zk != null) {
            zk.setServerCnxnFactory(this);
        }
    }

然后是zk服务器的startdata:

    public void startdata() 
    throws IOException, InterruptedException {
        //check to see if zkDb is not null
        if (zkDb == null) {
            zkDb = new ZKDatabase(this.txnLogFactory);
        }  
        if (!zkDb.isInitialized()) {
            loadData();
        }
    }

初始化ZKDatabase,从txnLogFactory里读取快照数据。

最后是zk服务器的startup:

    public synchronized void startup() {
        if (sessionTracker == null) {
            createSessionTracker();
        }
        startSessionTracker();
        setupRequestProcessors();

        registerJMX();

        setState(State.RUNNING);
        notifyAll();
    }

几件事情:

  • createSessionTracker创建sessionTracker
  • startSessionTracker启动SessionTracker
  • setupRequestProcessors 创建请求处理器链
  • registerJMX 注册JMX
  • setState(State.RUNNING) 设置状态为运行中
SessionTracker

看SessionTracker的注释:

This is the basic interface that ZooKeeperServer uses to track sessions.
负责追踪Session的

在zk里的实现是SessionTrackerImpl:

    protected void createSessionTracker() {
        sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(),
                tickTime, 1, getZooKeeperServerListener());
    }
    
    protected void startSessionTracker() {
        ((SessionTrackerImpl)sessionTracker).start();
    }

SessionTrackerImpl后面再详细分析。

2.5.4 ZooKeeperServer请求处理器链介绍

这里是zk的核心部分之一,zk接收到的请求最终在这里进行处理。

 protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        RequestProcessor syncProcessor = new SyncRequestProcessor(this,
                finalProcessor);
        ((SyncRequestProcessor)syncProcessor).start();
        firstProcessor = new PrepRequestProcessor(this, syncProcessor);
        ((PrepRequestProcessor)firstProcessor).start();
    }

请求处理链介绍

  • 首先是PrepRequestProcessor
  • 然后是SyncRequestProcessor
  • 最后是finalProcessor

下面依次解读:

RequestProcessor

RequestProcessors are chained together to process transactions.
RequestProcessors都是链在一起的事务处理链

public interface RequestProcessor {
    @SuppressWarnings("serial")
    public static class RequestProcessorException extends Exception {
        public RequestProcessorException(String msg, Throwable t) {
            super(msg, t);
        }
    }

    void processRequest(Request request) throws RequestProcessorException;

    void shutdown();
}

包含下面这些实现:
enter description here
我们重点来看下面几个:

PrepRequestProcessor

为什么成为请求处理链,看下PrepRequestProcessor代码就知道了:

    RequestProcessor nextProcessor;

    ZooKeeperServer zks;

    public PrepRequestProcessor(ZooKeeperServer zks,
            RequestProcessor nextProcessor) {
        super("ProcessThread(sid:" + zks.getServerId() + " cport:"
                + zks.getClientPort() + "):", zks.getZooKeeperServerListener());
        this.nextProcessor = nextProcessor;
        this.zks = zks;
    }protected void pRequest(Request request) throws RequestProcessorException {
        ……
        nextProcessor.processRequest(request);
    }

构造函数里包含nextProcessor,在pRequest完成后,执行nextProcessor.processRequest,相当于链式执行。

接着分析,再来看类的定义:

public class PrepRequestProcessor extends ZooKeeperCriticalThread implements
            RequestProcessor {

        LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();

        RequestProcessor nextProcessor;    
}

几个要点

  • 继承自ZooKeeperCriticalThread,是一个Thread
  • 重要属性submittedRequests 是一个LinkedBlockingQueue,LinkedBlockingQueue实现是线程安全的,实现了先进先出特性,是作为生产者消费者的首选。

PrepRequestProcessor作为处理链的源头,对外提供processRequest方法收集请求,由于是单线程,所以需要将请求放入submittedRequests队列。

    public void processRequest(Request request) {
        // request.addRQRec(">prep="+zks.outstandingChanges.size());
        submittedRequests.add(request);
    }

放入队列后,PrepRequestProcessor本身就是一个Thread,所以start后执行run,在run方法中又会将用户提交的请求取出来进行处理:

    public void run() {
            while (true) {
                // 取出一个请求
                Request request = submittedRequests.take();
                if (Request.requestOfDeath == request) {
                    break;
                }
                // 处理请求
                pRequest(request);
            }
        }

再来看pRequest:
enter description here

根据request的type,构造对应的请求,对于增删改等影响数据状态的操作都被认为是事务(txn:transaction) ,需要创建出事务请求头(hdr),调用pRequest2Txn,其他操作则不属于事务操作,需要验证下sessionId是否合法。

 //create/close session don't require request record
            case OpCode.createSession:
            case OpCode.closeSession:
                pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
                break;
 
            //All the rest don't need to create a Txn - just verify session
            case OpCode.sync:
            case OpCode.exists:
            case OpCode.getData:
            case OpCode.getACL:
            case OpCode.getChildren:
            case OpCode.getChildren2:
            case OpCode.ping:
            case OpCode.setWatches:
                zks.sessionTracker.checkSession(request.sessionId,
                        request.getOwner());
                break;

来看pRequest2Txn,以create为例

  pRequest2Txn(request.type, zks.getNextZxid(), request, createRequest, true);
 
   protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize)
        throws KeeperException, IOException, RequestProcessorException
    {
        request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
                                    zks.getTime(), type);

        switch (type) {
            case OpCode.create:                
                zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
                CreateRequest createRequest = (CreateRequest)record;   
                if(deserialize)
                    ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
                String path = createRequest.getPath();
                int lastSlash = path.lastIndexOf('/');
                if (lastSlash == -1 || path.indexOf('\0') != -1 || failCreate) {
                    LOG.info("Invalid path " + path + " with session 0x" +
                            Long.toHexString(request.sessionId));
                    throw new KeeperException.BadArgumentsException(path);
                }
                List<ACL> listACL = removeDuplicates(createRequest.getAcl());
                if (!fixupACL(request.authInfo, listACL)) {
                    throw new KeeperException.InvalidACLException(path);
                }
                String parentPath = path.substring(0, lastSlash);
                ChangeRecord parentRecord = getRecordForPath(parentPath);

                checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE,
                        request.authInfo);
                int parentCVersion = parentRecord.stat.getCversion();
                CreateMode createMode =
                    CreateMode.fromFlag(createRequest.getFlags());
                if (createMode.isSequential()) {
                    path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
                }
                validatePath(path, request.sessionId);
                try {
                    if (getRecordForPath(path) != null) {
                        throw new KeeperException.NodeExistsException(path);
                    }
                } catch (KeeperException.NoNodeException e) {
                    // ignore this one
                }
                boolean ephemeralParent = parentRecord.stat.getEphemeralOwner() != 0;
                if (ephemeralParent) {
                    throw new KeeperException.NoChildrenForEphemeralsException(path);
                }
                int newCversion = parentRecord.stat.getCversion()+1;
                request.txn = new CreateTxn(path, createRequest.getData(),
                        listACL,
                        createMode.isEphemeral(), newCversion);
                StatPersisted s = new StatPersisted();
                if (createMode.isEphemeral()) {
                    s.setEphemeralOwner(request.sessionId);
                }
                parentRecord = parentRecord.duplicate(request.hdr.getZxid());
                parentRecord.childCount++;
                parentRecord.stat.setCversion(newCversion);
                addChangeRecord(parentRecord);
                addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path, s,
                        0, listACL));
                break;
  • 首先是 zks.getNextZxid()创建一个事务id,AtomicLong hzxid是自增长id,初始化为0,每次加一
  • 在pRequest2Txn内部,先给request创建一个TxnHeader,这个header包含事务id
  • 然后判断请求类型
  • zks.sessionTracker.checkSession(request.sessionId, request.getOwner()) 检查session
  • 反序列化为CreateRequest
SyncRequestProcessor
FinalRequestProcessor

未完待续


作者:Jadepeng
出处:jqpeng的技术记事本–http://www.cnblogs.com/xiaoqi
您的支持是对博主最大的鼓励,感谢您的认真阅读。
本文版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

文章作者:jqpeng
原文链接: 从编辑距离、BK树到文本纠错

搜索引擎里有一个很重要的话题,就是文本纠错,主要有两种做法,一是从词典纠错,一是分析用户搜索日志,今天我们探讨使用基于词典的方式纠错,核心思想就是基于编辑距离,使用BK树。下面我们来逐一探讨:

编辑距离

1965年,俄国科学家Vladimir
Levenshtein给字符串相似度做出了一个明确的定义叫做Levenshtein距离,我们通常叫它“编辑距离”。

字符串A到B的编辑距离是指,只用插入、删除和替换三种操作,最少需要多少步可以把A变成B。例如,从FAME到GATE需要两步(两次替换),从GAME到ACM则需要三步(删除G和E再添加C)。Levenshtein给出了编辑距离的一般求法,就是大家都非常熟悉的经典动态规划问题。

 class LevenshteinDistanceFunction {

        private final boolean isCaseSensitive;

        public LevenshteinDistanceFunction(boolean isCaseSensitive) {
            this.isCaseSensitive = isCaseSensitive;
        }

        public int distance(CharSequence left, CharSequence right) {
            int leftLength = left.length(), rightLength = right.length();

            // special cases.
            if (leftLength == 0)
                return rightLength;
            if (rightLength == 0)
                return leftLength;

            // Use the iterative matrix method.
            int[] currentRow = new int[rightLength + 1];
            int[] nextRow    = new int[rightLength + 1];

            // Fill first row with all edit counts.
            for (int i = 0; i <= rightLength; i++)
                currentRow[i] = i;

            for (int i = 1; i <= leftLength; i++) {
                nextRow[0] = i;

                for(int j = 1; j <= rightLength; j++) {
                    int subDistance = currentRow[j - 1]; // Distance without insertions or deletions.
                    if (!charEquals(left.charAt(i - 1), right.charAt(j - 1), isCaseSensitive))
                            subDistance++; // Add one edit if letters are different.
                    nextRow[j] = Math.min(Math.min(nextRow[j - 1], currentRow[j]) + 1, subDistance);
                }

                // Swap rows, use last row for next row.
                int[] t = currentRow;
                currentRow = nextRow;
                nextRow = t;
            }

            return currentRow[rightLength];
        }

    }

BK树

编辑距离的经典应用就是用于拼写检错,如果用户输入的词语不在词典中,自动从词典中找出编辑距离小于某个数n的单词,让用户选择正确的那一个,n通常取到2或者3。

这个问题的难点在于,怎样才能快速在字典里找出最相近的单词?可以像 使用贝叶斯做英文拼写检查(c#) 里是那样,通过单词自动修改一个单词,检查是否在词典里,这样有暴力破解的嫌疑,是否有更优雅的方案呢?

1973年,Burkhard和Keller提出的BK树有效地解决了这个问题。BK树的核心思想是:

令d(x,y)表示字符串x到y的Levenshtein距离,那么显然:
d(x,y) = 0 当且仅当 x=y (Levenshtein距离为0 <==> 字符串相等)
d(x,y) = d(y,x) (从x变到y的最少步数就是从y变到x的最少步数)
d(x,y) + d(y,z) >= d(x,z) (从x变到z所需的步数不会超过x先变成y再变成z的步数)

最后这一个性质叫做三角形不等式。就好像一个三角形一样,两边之和必然大于第三边。

BK建树

首先我们随便找一个单词作为根(比如GAME)。以后插入一个单词时首先计算单词与根的Levenshtein距离:如果这个距离值是该节点处头一次出现,建立一个新的儿子节点;否则沿着对应的边递归下去。例如,我们插入单词FAME,它与GAME的距离为1,于是新建一个儿子,连一条标号为1的边;下一次插入GAIN,算得它与GAME的距离为2,于是放在编号为2的边下。再下次我们插入GATE,它与GAME距离为1,于是沿着那条编号为1的边下去,递归地插入到FAME所在子树;GATE与FAME的距离为2,于是把GATE放在FAME节点下,边的编号为2。

enter description here

BK查询

如果我们需要返回与错误单词距离不超过n的单词,这个错误单词与树根所对应的单词距离为d,那么接下来我们只需要递归地考虑编号在d-n到d+n范围内的边所连接的子树。由于n通常很小,因此每次与某个节点进行比较时都可以排除很多子树

可以通过下图(来自 超酷算法(1):BK树 (及个人理解))理解:

enter description here

BK 实现

知道了原理实现就简单了,这里从github找一段代码

建树:

public boolean add(T t) {
        if (t == null)
            throw new NullPointerException();

        if (rootNode == null) {
            rootNode = new Node<>(t);
            length = 1;
            modCount++; // Modified tree by adding root.
            return true;
        }

        Node<T> parentNode = rootNode;
        Integer distance;
        while ((distance = distanceFunction.distance(parentNode.item, t)) != 0
                || !t.equals(parentNode.item)) {
            Node<T> childNode = parentNode.children.get(distance);
            if (childNode == null) {
                parentNode.children.put(distance, new Node<>(t));
                length++;
                modCount++; // Modified tree by adding a child.
                return true;
            }
            parentNode = childNode;
        }

        return false;
    }

查找:

 public List<SearchResult<T>> search(T t, int radius) {
        if (t == null)
            return Collections.emptyList();
        ArrayList<SearchResult<T>> searchResults = new ArrayList<>();
        ArrayDeque<Node<T>> nextNodes = new ArrayDeque<>();
        if (rootNode != null)
            nextNodes.add(rootNode);

        while(!nextNodes.isEmpty()) {
            Node<T> nextNode = nextNodes.poll();
            int distance = distanceFunction.distance(nextNode.item, t);
            if (distance <= radius)
                searchResults.add(new SearchResult<>(distance, nextNode.item));
            int lowBound = Math.max(0, distance - radius), highBound = distance + radius;
            for (Integer i = lowBound; i <= highBound; i++) {
                if (nextNode.children.containsKey(i))
                    nextNodes.add(nextNode.children.get(i));
            }
        }

        searchResults.trimToSize();
        Collections.sort(searchResults);
        return Collections.unmodifiableList(searchResults);
    }

使用BK树做文本纠错

准备词典,18万的影视名称:
enter description here

测试代码:

  static void outputSearchResult( List<SearchResult<CharSequence>> results){
        for(SearchResult<CharSequence> item : results){
            System.out.println(item.item);
        }
    }

    static void test(BKTree<CharSequence> tree,String word){
        System.out.println(word+"的最相近结果:");
        outputSearchResult(tree.search(word,Math.max(1,word.length()/4)));
    }

    public static void main(String[] args) {

        BKTree<CharSequence> tree = new BKTree(DistanceFunctions.levenshteinDistance());
        List<String> testStrings = FileUtil.readLine("./src/main/resources/act/name.txt");
        System.out.println("词典条数:"+testStrings.size());
        long startTime = System.currentTimeMillis();
        for(String testStr: testStrings){
            tree.add(testStr.replace(".",""));
        }
        System.out.println("建树耗时:"+(System.currentTimeMillis()-startTime)+"ms");
        startTime = System.currentTimeMillis();
        String[] testWords = new String[]{
                "湄公河凶案",
                "葫芦丝兄弟",
                "少林足球"
        };

        for (String testWord: testWords){
            test(tree,testWord);
        }
        System.out.println("测试耗时:"+(System.currentTimeMillis()-startTime)+"ms");
    }

结果:

词典条数:18513
建树耗时:421ms
湄公河凶案的最相近结果:
湄公河大案
葫芦丝兄弟的最相近结果:
葫芦兄弟
少林足球的最相近结果:
少林足球
笑林足球
测试耗时:20ms

参考:
http://blog.csdn.net/tradymeky/article/details/40581547
https://github.com/sk-scd91/BKTree
https://www.cnblogs.com/data2value/p/5707973.html


作者:Jadepeng
出处:jqpeng的技术记事本–http://www.cnblogs.com/xiaoqi
您的支持是对博主最大的鼓励,感谢您的认真阅读。
本文版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

文章作者:jqpeng
原文链接: 从Trie树到双数组Trie树

Trie树

原理

又称单词查找树,Trie树,是一种树形结构,是一种哈希树的变种。它的优点是:利用字符串的公共前缀来减少查询时间,最大限度地减少无谓的字符串比较,能在常数时间O(len)内实现插入和查询操作,是一种以空间换取时间的数据结构,广泛用于词频统计和输入统计领域。

来看看Trie树长什么样,我们从百度找一张图片:

enter description here

字典树在查找时,先看第一个字是否在字典树里,如果在继续往下,如果不在,则字典里不存在,因此,对于一个长度为len的字符串,可以在O(len)时间内完成查询。

实现trie树

怎么实现trie树呢,trie树的关键是一个节点要在O(1)时间跳转到下一级节点,因此链表方式不可取,最好用数组来存储下一级节点。问题就来了,如果是纯英文字母,长度26的数组就可以搞定,N个节点的数,就需要N个长度为26的数组。但是,如果包含中文等字符呢,就需要N个65535的数组,特别占用存储空间。当然,可以考虑使用map来存储下级节点。

定义一个Node,包含节点的Character word,以及下级节点nexts和节点可能附件的值values:

public static class Node<T> {
        Character word;

        List<T> values;

        Map<Character, Node> nexts = new HashMap<>(24);

        public Node() {
        }

        public Node(Character word) {
            this.word = word;
        }

        public Character getWord() {
            return word;
        }

        public void setWord(Character word) {
            this.word = word;
        }

        public void addValue(T value){
            if(values == null){
                values = new ArrayList<>();
            }
            values.add(value);
        }

        public List<T> getValues() {
            return values;
        }

        public Map<Character, Node> getNexts() {
            return nexts;
        }

        /**
         * @param node
         */
        public void addNext(Node node) {
            this.nexts.put(node.getWord(), node);
        }

        public Node getNext(Character word) {
            return this.nexts.get(word);
        }
    }

来看如何构建字典树,首先定义一棵树,包含根节点即可

    public static class Trie<T> {
        Node<T> rootNode;

        public Trie() {
            this.rootNode = new Node<T>();
        }

        public Node<T> getRootNode() {
            return rootNode;
        }

    }

构建树,拆分成单字,然后逐级构建树。

 public static class TrieBuilder {
        public static  Trie<String> buildTrie(String... values){
            Trie<String> trie = new Trie<String>();
            for(String sentence : values){
                // 根节点
                Node<String> currentNode = trie.getRootNode();
                for (int i = 0; i < sentence.length(); i++) {
                    Character character = sentence.charAt(i);
                    // 寻找首个节点
                    Node<String> node = currentNode.getNext(character);
                    if(node == null){
                        // 不存在,创建节点
                        node = new Node<String>(character);
                        currentNode.addNext(node);
                    }
                    currentNode = node;
                }

                // 添加数据
                currentNode.addValue(sentence);
            }

            return trie;
        }

Trie树应用

比如判断一个词是否在字典树里,非常简单,逐级匹配,末了判断最后的节点是否包含数据:

   public boolean isContains(String word) {
            if (word == null || word.length() == 0) {
                return false;
            }
            Node<T> currentState = rootNode;
            for (int i = 0; i < word.length(); i++) {
                currentState = currentState.getNext(word.charAt(i));
                if (currentState == null) {
                    return false;
                }
            }
            return currentState.getValues()!=null;
        }

测试代码:

        public static void main(String[] args) {

            Trie trie = TrieBuilder.buildTrie("刘德华","刘三姐","刘德刚","江姐");
            System.out.println(trie.isContains("刘德华"));
            System.out.println(trie.isContains("刘德"));
            System.out.println(trie.isContains("刘大大"));
        }

结果:

true
false
false

双数组Trie树

在Trie数实现过程中,我们发现了每个节点均需要 一个数组来存储next节点,非常占用存储空间,空间复杂度大,双数组Trie树正是解决这个问题的。双数组Trie树(DoubleArrayTrie)是一种空间复杂度低的Trie树,应用于字符区间大的语言(如中文、日文等)分词领域。

原理

双数组的原理是,将原来需要多个数组才能表示的Trie树,使用两个数据就可以存储下来,可以极大的减小空间复杂度。具体来说:

使用两个数组base和check来维护Trie树,base负责记录状态,check负责检查各个字符串是否是从同一个状态转移而来,当check[i]为负值时,表示此状态为字符串的结束。

上面的有点抽象,举个例子,假定两个单词ta,tb,base和check的值会满足下面的条件:
base[t] + a.code = base[ta]
base[t] + b.code = base[tb]
check[ta] = check[tb]

在每个节点插入的过程中会修改这两个数组,具体说来:

1、初始化root节点base[0] = 1; check[0] = 0;

2、对于每一群兄弟节点,寻找一个begin值使得check[begin + a1…an] == 0,也就是找到了n个空闲空间,a1…an是siblings中的n个节点对应的code。

3、然后将这群兄弟节点的check设为check[begin + a1…an] = begin

4、接着对每个兄弟节点,如果它没有孩子,令其base为负值;否则为该节点的子节点的插入位置(也就是begin值),同时插入子节点(迭代跳转到步骤2)。

码表:
   胶    名    动    知    下    成    举    一    能    天    万    
33014 21517 21160 30693 19979 25104 20030 19968 33021 22825 19975 

DoubleArrayTrie{
char =      ×    一    万     ×    举     ×    动     ×     下    名    ×    知      ×     ×    能    一    天    成    胶
i    =      0 19970 19977 20032 20033 21162 21164 21519 21520 21522 30695 30699 33023 33024 33028 40001 44345 45137 66038
base =      1     2     6    -1 20032    -2 21162    -3     5 21519    -4 30695    -5    -6 33023     3  1540     4 33024
check=      0     1     1 20032     2 21162     3 21519  1540     4 30695     5 33023 33024     6 20032 21519 20032 33023
size=66039, allocSize=2097152, key=[一举, 一举一动, 一举成名, 一举成名天下知, 万能, 万能胶], keySize=6, progress=6, nextCheckPos=33024, error_=0}

首层:一[19968],万[ 19975]
base[一] = base[0]+19968-19968 = 1
base[万] = base[0]+19975-19968 =

实现

参考 双数组Trie树(DoubleArrayTrie)Java实现
开源项目:https://github.com/komiya-atsushi/darts-java

双数组Trie+AC自动机

参见:http://www.hankcs.com/program/algorithm/aho-corasick-double-array-trie.html

结合了AC自动机+双数组Trie树:
AC自动机能高速完成多模式匹配,然而具体实现聪明与否决定最终性能高低。大部分实现都是一个Map<Character, State>了事,无论是TreeMap的对数复杂度,还是HashMap的巨额空间复杂度与哈希函数的性能消耗,都会降低整体性能。

双数组Trie树能高速O(n)完成单串匹配,并且内存消耗可控,然而软肋在于多模式匹配,如果要匹配多个模式串,必须先实现前缀查询,然后频繁截取文本后缀才可多匹配,这样一份文本要回退扫描多遍,性能极低。

如果能用双数组Trie树表达AC自动机,就能集合两者的优点,得到一种近乎完美的数据结构。在我的Java实现中,我称其为AhoCorasickDoubleArrayTrie,支持泛型和持久化,自己非常喜爱。


作者:Jadepeng
出处:jqpeng的技术记事本–http://www.cnblogs.com/xiaoqi
您的支持是对博主最大的鼓励,感谢您的认真阅读。
本文版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

文章作者:jqpeng
原文链接: 使用websocket-sharp来创建c#版本的websocket服务

当前有一个需求,需要网页端调用扫描仪,javascript不具备调用能力,因此需要在机器上提供一个ws服务给前端网页调用扫描仪。而扫描仪有一个c#版本的API,因此需要寻找一个c#的websocket库。

java里有大名鼎鼎的netty,通过搜索,c#可以选择websocket-sharp来实现websocket Server。

使用websocket-sharp创建websocket server###

using System;
using WebSocketSharp;
using WebSocketSharp.Server;

namespace Example
{
  public class Laputa : WebSocketBehavior
  {
    protected override void OnMessage (MessageEventArgs e)
    {
      var msg = e.Data == "BALUS"
                ? "I've been balused already..."
                : "I'm not available now.";

      Send (msg);
    }
  }

  public class Program
  {
    public static void Main (string[] args)
    {
      var wssv = new WebSocketServer ("ws://dragonsnest.far");
      wssv.AddWebSocketService<Laputa> ("/Laputa");
      wssv.Start ();
      Console.ReadKey (true);
      wssv.Stop ();
    }
  }
}

Step 1

Required namespace.

using WebSocketSharp.Server;

The WebSocketBehavior and WebSocketServer 两个类需要引用 WebSocketSharp.Server namespace.

Step 2

编写处理类,需要继承 WebSocketBehavior class.

例如,如果你要创建一个echo Service,

using System;
using WebSocketSharp;
using WebSocketSharp.Server;

public class Echo : WebSocketBehavior
{
  protected override void OnMessage (MessageEventArgs e)
  {
    Send (e.Data);
  }
}

再提供一个 chat service,

using System;
using WebSocketSharp;
using WebSocketSharp.Server;

public class Chat : WebSocketBehavior
{
  private string _suffix;

  public Chat ()
    : this (null)
  {
  }

  public Chat (string suffix)
  {
    _suffix = suffix ?? String.Empty;
  }

  protected override void OnMessage (MessageEventArgs e)
  {
    Sessions.Broadcast (e.Data + _suffix);
  }
}

可以通过继承WebSocketBehavior类来自定义Service.

通过重载 WebSocketBehavior.OnMessage (MessageEventArgs) 方法, 来处理消息

同时你也可以重载 WebSocketBehavior.OnOpen (), WebSocketBehavior.OnError (ErrorEventArgs), 和 WebSocketBehavior.OnClose (CloseEventArgs) 方法,来处理websocket连接事件。

通过WebSocketBehavior.Send 方法来给客户端发送消息。

If you would like to get the sessions in the service, you should access the WebSocketBehavior.Sessions property (returns a WebSocketSharp.Server.WebSocketSessionManager).

The WebSocketBehavior.Sessions.Broadcast method can send data to every client in the service.

Step 3

创建 WebSocketServer 对象.

var wssv = new WebSocketServer (4649);
wssv.AddWebSocketService<Echo> ("/Echo");
wssv.AddWebSocketService<Chat> ("/Chat");
wssv.AddWebSocketService<Chat> ("/ChatWithNyan", () => new Chat (" Nyan!"));

Step 4

启动 WebSocket server.

wssv.Start ();

Step 5

停止 WebSocket server.

wssv.Stop (code, reason);

测试Demo

目的:对外提供一个websocket服务,让网页端的js可以调用扫描仪

服务端代码

 class Program
    {
        static void Main(string[] args)
        {
            var wssv = new WebSocketServer(10086);
            wssv.AddWebSocketService<ScannerHandler>("/scan");
            wssv.Start();
            if (wssv.IsListening)
            {
                Console.WriteLine("Listening on port {0}, and providing WebSocket services:", wssv.Port);
                foreach (var path in wssv.WebSocketServices.Paths)
                    Console.WriteLine("- {0}", path);
            }

            Console.WriteLine("\nPress Enter key to stop the server...");
            Console.ReadLine();

            wssv.Stop();
        }
    }

    public class ScannerHandler : WebSocketBehavior
    {
        protected override void OnMessage(MessageEventArgs e)
        {
            if(e.Data == "scan")
            {
                ScanResult result = ScanerHelper.Scan("D:\\test.jpg");
                if (result.Success)
                {
                    Console.WriteLine("scan success");
                    Send("scan success");
                }
                else
                {
                    Send("scan eror");
                }
            }
           
        }
    }

前端代码

javascript代码

     var ws;
    function initWS() {
        ws = new WebSocket("ws://127.0.0.1:10086/scan");
        ws.onopen = function () {
            console.log("Openened connection to websocket");

        };
        ws.onclose = function () {
            console.log("Close connection to websocket");
            // 断线重连
            initWS();
        }

        ws.onmessage = function (e) {
            alert(e.data)
        }
    }
    initWS();
    function scan() {
        ws && ws.send('scan');
    }

html代码

<button onclick="scan()">扫描</button>
  • initWS创建连接,支持断线重连
  • 可以调用scan函数,发送scan指令

文章作者:jqpeng
原文链接: IDEA+PHP+XDebug调试配置

XDebug调试配置

临时需要调试服务器上的PHP web程序,因此安装xdebug,下面简单记录

安装xdebug

下载最新并解压

wget https://xdebug.org/files/xdebug-2.5.4.tgz
tar zxvf xdebug-2.5.4.tgz 
cd xdebug-2.5.4/

编译

按照README里的步骤安装

./configure --enable-xdebug
···

报错
>checking Check for supported PHP versions... configure: error: not supported. Need a PHP version >= 5.5.0 and < 7.2.0 (found 5.3.10-1ubuntu3.21)


原来服务器上的php版本比较低:
>PHP 5.3.10-1ubuntu3.26 with Suhosin-Patch (cli) (built: Feb 13 2017 20:37:53) 
Copyright (c) 1997-2012 The PHP Group
Zend Engine v2.3.0, Copyright (c) 1998-2012 Zend Technologies

最稳妥起见,下载老版本的xdebug,下载2.2.2版本

``` bash
wget https://xdebug.org/files/xdebug-2.2.2.tgz
tar zxvf xdebug-2.2.2.tgz 
cd xdebug-2.2.2/
./configure --enable-xdebug
make

make完成后,modules下面就有了编译好的xdebug.so:

root@nginx01:/opt/research/xdebug-2.2.2# ll modules/
total 808
drwxr-xr-x 2 root root   4096 Jun 19 14:17 ./
drwxr-xr-x 9 root root   4096 Jun 19 13:10 ../
-rw-r--r-- 1 root root    939 Jun 19 13:09 xdebug.la
-rwxr-xr-x 1 root root 814809 Jun 19 13:09 xdebug.so*

配置

修改php.ini,服务器使用的php5-fpm,配置文件在/etc/php5/fpm/php.ini

修改,增加xdebug配置信息

zend_extension="/opt/research/xdebug-2.2.2/modules/xdebug.so"
xdebug.remote_enable = On
xdebug.remote_handler = dbgp
xdebug.remote_port = 9001 #端口9001
xdebug.remote_connect_back = 1 
#xdebug.remote_host= 192.168.xxx.xxx
xdebug.idekey = PHPSTORM
xdebug.remote_log = /opt/research/xdebug-2.2.2/xdebug.log

IDEA 配置

配置xdebug端口为9001

在设置里搜索XDEBUG,配置端口9001
enter description here

调试配置

在RUN-Edit Configuratins里,新增PHP Web Application
enter description here

Server新增服务器地址,Debugger设置为Xdebug,将服务器上的绝对地址,映射到本地

XDEBUG配置

然后就可以启动调试了

文章作者:jqpeng
原文链接: HTML5录音控件

最近的项目又需要用到录音,年前有过调研,再次翻出来使用,这里做一个记录。

HTML5提供了录音支持,因此可以方便使用HTML5来录音,来实现录音、语音识别等功能,语音开发必备。但是ES标准提供的API并不人性化,不方便使用,并且不提供保存为wav的功能,开发起来费劲啊!!

github寻找轮子,发现Recorder.js,基本上可以满足需求了,良好的封装,支持导出wav,但是存在:

  • wav采样率不可调整
  • recorder创建麻烦,需要自己初始化getUserMedia
  • 无实时数据回调,不方便绘制波形
  • 。。。

改造轮子

创建recorder工具方法

提供创建recorder工具函数,封装audio接口:

static createRecorder(callback,config){
        window.AudioContext = window.AudioContext || window.webkitAudioContext;
        window.URL = window.URL || window.webkitURL;
        navigator.getUserMedia = navigator.getUserMedia || navigator.webkitGetUserMedia || navigator.mozGetUserMedia || navigator.msGetUserMedia;
        
        if (navigator.getUserMedia) {
            navigator.getUserMedia(
                { audio: true } //只启用音频
                , function (stream) {
                    var audio_context = new AudioContext;
                    var input = audio_context.createMediaStreamSource(stream);
                    var rec = new Recorder(input, config);
                    callback(rec);
                }
                , function (error) {
                    switch (error.code || error.name) {
                        case 'PERMISSION_DENIED':
                        case 'PermissionDeniedError':
                            throwError('用户拒绝提供信息。');
                            break;
                        case 'NOT_SUPPORTED_ERROR':
                        case 'NotSupportedError':
                            throwError('浏览器不支持硬件设备。');
                            break;
                        case 'MANDATORY_UNSATISFIED_ERROR':
                        case 'MandatoryUnsatisfiedError':
                            throwError('无法发现指定的硬件设备。');
                            break;
                        default:
                            throwError('无法打开麦克风。异常信息:' + (error.code || error.name));
                            break;
                    }
                });
        } else {
            throwError('当前浏览器不支持录音功能。'); return;
        }
    }

采样率

H5录制的默认是44k的,文件大,不方便传输,因此需要进行重新采样,一般采用插值取点方法:

以下代码主要来自stackoverflow:

             /**
             * 转换采样率
             * @param data
             * @param newSampleRate 目标采样率
             * @param oldSampleRate 原始数据采样率
             * @returns {any[]|Array}
             */
            function interpolateArray(data, newSampleRate, oldSampleRate) {
                var fitCount = Math.round(data.length * (newSampleRate / oldSampleRate));
                var newData = new Array();
                var springFactor = new Number((data.length - 1) / (fitCount - 1));
                newData[0] = data[0]; // for new allocation
                for (var i = 1; i < fitCount - 1; i++) {
                    var tmp = i * springFactor;
                    var before = new Number(Math.floor(tmp)).toFixed();
                    var after = new Number(Math.ceil(tmp)).toFixed();
                    var atPoint = tmp - before;
                    newData[i] = this.linearInterpolate(data[before], data[after], atPoint);
                }
                newData[fitCount - 1] = data[data.length - 1]; // for new allocation
                return newData;
            }

            function linearInterpolate(before, after, atPoint) {
                return before + (after - before) * atPoint;
            }

修改导出wav函数exportWAV,增加采样率选项:

            /**
             * 导出wav
             * @param type
             * @param desiredSamplingRate 期望的采样率
             */
            function exportWAV(type,desiredSamplingRate) {
                // 默认为16k
                desiredSamplingRate = desiredSamplingRate || 16000;
                var buffers = [];
                for (var channel = 0; channel < numChannels; channel++) {
                    var buffer = mergeBuffers(recBuffers[channel], recLength);
                    // 需要转换采样率
                    if (desiredSamplingRate!=sampleRate) {
                        // 插值去点
                        buffer = interpolateArray(buffer, desiredSamplingRate, sampleRate);
                    }
                    buffers.push(buffer);
                }
                var interleaved = numChannels === 2 ? interleave(buffers[0], buffers[1]) : buffers[0];
                var dataview = encodeWAV(interleaved,desiredSamplingRate);
                var audioBlob = new Blob([dataview], { type: type });
                self.postMessage({ command: 'exportWAV', data: audioBlob });
            }

实时录音数据回调

为了方便绘制音量、波形图,需要获取到实时数据:

config新增一个回调函数onaudioprocess:

  config = {
        bufferLen: 4096,
        numChannels: 1, // 默认单声道
        mimeType: 'audio/wav',
        onaudioprocess:null
    };

修改录音数据处理函数:

        this.node.onaudioprocess = (e) => {
            if (!this.recording) return;
            var buffer = [];

            for (var channel = 0; channel < this.config.numChannels; channel++) {
                buffer.push(e.inputBuffer.getChannelData(channel));
            }

            // 发送给worker
            this.worker.postMessage({
                command: 'record',
                buffer: buffer
            });

            // 数据回调
            if(this.config.onaudioprocess){
                this.config.onaudioprocess(buffer[0]);
            }
        };

这样,在创建recorder时,配置onaudioprocess就可以获取到实时数据了

实时数据编码

编码计算耗时,需要放到worker执行:

接口函数新增encode,发送消息给worker,让worker执行:

    encode(cb,buffer,sampleRate) {
        cb = cb || this.config.callback;
        if (!cb) throw new Error('Callback not set');
        this.callbacks.encode.push(cb);
        this.worker.postMessage({ command: 'encode',buffer:buffer,sampleRate:sampleRate});
    }

worker里新增encode函数,处理encode请求,完成后执行回调

 self.onmessage = function (e) {
                switch (e.data.command) {

                    case 'encode':
                        encode(e.data.buffer,e.data.sampleRate);
                        break;

                }
            };        
    encode(cb,buffer,sampleRate) {
        cb = cb || this.config.callback;
        if (!cb) throw new Error('Callback not set');
        this.callbacks.encode.push(cb);
        this.worker.postMessage({ command: 'encode',buffer:buffer,sampleRate:sampleRate});
    }

wav上传

增加一个上传函数:

     exportWAVAndUpload(url, callback) {
        var _url = url;
        exportWAV(function(blob){
            var fd = new FormData();
            fd.append("audioData", blob);
            var xhr = new XMLHttpRequest();
            if (callback) {
                xhr.upload.addEventListener("progress", function (e) {
                    callback('uploading', e);
                }, false);
                xhr.addEventListener("load", function (e) {
                    callback('ok', e);
                }, false);
                xhr.addEventListener("error", function (e) {
                    callback('error', e);
                }, false);
                xhr.addEventListener("abort", function (e) {
                    callback('cancel', e);
                }, false);
            }
            xhr.open("POST", url);
            xhr.send(fd);
        })     
    }

完整代码

=点击下载

发现新轮子

今天再次看这个项目,发现这个项目已经不维护了,

Note: This repository is not being actively maintained due to lack of time and interest. If you maintain or know of a good fork, please let me know so I can direct future visitors to it. In the meantime, if this library isn’t working, you can find a list of popular forks here: http://forked.yannick.io/mattdiamond/recorderjs.

作者推荐https://github.com/chris-rudmin/Recorderjs,提供更多的功能:

  • bitRate (optional) Specifies the target bitrate in bits/sec. The encoder selects an application-specific default when this is not specified.
  • bufferLength - (optional) The length of the buffer that the internal JavaScriptNode uses to capture the audio. Can be tweaked if experiencing performance issues. Defaults to 4096.
  • encoderApplication - (optional) Specifies the encoder application. Supported values are 2048 - Voice, 2049 - Full Band Audio, 2051 - Restricted Low Delay. Defaults to 2049.
  • encoderComplexity - (optional) Value between 0 and 10 which determines latency and processing for resampling. 0 is fastest with lowest complexity. 10 is slowest with highest complexity. The encoder selects a default when this is not specified.
  • encoderFrameSize (optional) Specifies the frame size in ms used for encoding. Defaults to 20.
  • encoderPath - (optional) Path to encoderWorker.min.js worker script. Defaults to encoderWorker.min.js
  • encoderSampleRate - (optional) Specifies the sample rate to encode at. Defaults to 48000. Supported values are 8000, 12000, 16000, 24000 or 48000.
  • leaveStreamOpen - (optional) Keep the stream around when trying to stop recording, so you can re-start without re-initStream. Defaults to false.
  • maxBuffersPerPage - (optional) Specifies the maximum number of buffers to use before generating an Ogg page. This can be used to lower the streaming latency. The lower the value the more overhead the ogg stream will incur. Defaults to 40.
  • monitorGain - (optional) Sets the gain of the monitoring output. Gain is an a-weighted value between 0 and 1. Defaults to 0
  • numberOfChannels - (optional) The number of channels to record. 1 = mono, 2 = stereo. Defaults to 1. Maximum 2 channels are supported.
  • originalSampleRateOverride - (optional) Override the ogg opus ‘input sample rate’ field. Google Speech API requires this field to be 16000.
  • resampleQuality - (optional) Value between 0 and 10 which determines latency and processing for resampling. 0 is fastest with lowest quality. 10 is slowest with highest quality. Defaults to 3.
  • streamPages - (optional) dataAvailable event will fire after each encoded page. Defaults to false.

推荐使用


作者:Jadepeng
出处:jqpeng的技术记事本–http://www.cnblogs.com/xiaoqi
您的支持是对博主最大的鼓励,感谢您的认真阅读。
本文版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

文章作者:jqpeng
原文链接: Spring Boot配置文件放在jar外部

Spring Boot程序默认从application.properties或者application.yaml读取配置,如何将配置信息外置,方便配置呢?

查询官网,可以得到下面的几种方案:

通过命令行指定

SpringApplication会默认将命令行选项参数转换为配置信息
例如,启动时命令参数指定:

java -jar myproject.jar --server.port = 9000

从命令行指定配置项的优先级最高,不过你可以通过setAddCommandLineProperties来禁用

SpringApplication.setAddCommandLineProperties(false).

外置配置文件

Spring程序会按优先级从下面这些路径来加载application.properties配置文件

  • 当前目录下的/config目录
  • 当前目录
  • classpath里的/config目录
  • classpath 跟目录

因此,要外置配置文件就很简单了,在jar所在目录新建config文件夹,然后放入配置文件,或者直接放在配置文件在jar目录

自定义配置文件

如果你不想使用application.properties作为配置文件,怎么办?完全没问题

java -jar myproject.jar --spring.config.location=classpath:/default.properties,classpath:/override.properties

或者

java -jar -Dspring.config.location=D:\config\config.properties springbootrestdemo-0.0.1-SNAPSHOT.jar 

当然,还能在代码里指定

@SpringBootApplication
@PropertySource(value={"file:config.properties"})
public class SpringbootrestdemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringbootrestdemoApplication.class, args);
    }
}

按Profile不同环境读取不同配置

不同环境的配置设置一个配置文件,例如:

  • dev环境下的配置配置在application-dev.properties中;
  • prod环境下的配置配置在application-prod.properties中。

在application.properties中指定使用哪一个文件

spring.profiles.active = dev

当然,你也可以在运行的时候手动指定:

java -jar myproject.jar --spring.profiles.active = prod

参考:
1 参见Externalized Configuration


作者:Jadepeng
出处:jqpeng的技术记事本–http://www.cnblogs.com/xiaoqi
您的支持是对博主最大的鼓励,感谢您的认真阅读。
本文版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

文章作者:jqpeng
原文链接: Netty断线重连

Netty断线重连

最近使用Netty开发一个中转服务,需要一直保持与Server端的连接,网络中断后需要可以自动重连,查询官网资料,实现方案很简单,核心思想是在channelUnregistered钩子函数里执行重连。

创建连接

需要把configureBootstrap重构为一个函数,方便后续复用

  1. EventLoopGroup group = new NioEventLoopGroup(); 

  1. private volatile Bootstrap bootstrap; 


  1. public void init(String host, int port) throws RobotException { 

  1. this.serverIp = host; 

  1. this.serverPort = port; 

  1. try { 

  1. // 创建并初始化 Netty 客户端 Bootstrap 对象 

  1. bootstrap = configureBootstrap(new Bootstrap(),group); 

  1. bootstrap.option(ChannelOption.TCP_NODELAY, true); 

  1. doConnect(bootstrap); 


  1. catch(Exception ex){ 

  1. ex.printStackTrace(); 

  1. throw new RobotException(“connect remote control server error!”,ex.getCause()); 




  1. Bootstrap configureBootstrap(Bootstrap b, EventLoopGroup g) { 

  1. b.group(g).channel(NioSocketChannel.class) 

  1. .remoteAddress(serverIp, serverPort) 

  1. .handler(new ChannelInitializer<SocketChannel>() { 

  1. @Override 

  1. public void initChannel(SocketChannel channel) throws Exception { 

  1. ChannelPipeline pipeline = channel.pipeline(); 

  1. // 编解码器 

  1. pipeline.addLast(protoCodec); 

  1. // 请求处理 

  1. pipeline.addLast(RobotClient.this); 


  1. }); 


  1. return b; 



  1. void doConnect(Bootstrap b) { 

  1. try { 


  1. ChannelFuture future = b.connect(); 

  1. future.addListener(new ChannelFutureListener() { 

  1. @Override 

  1. public void operationComplete(ChannelFuture future) throws Exception { 

  1. if (future.isSuccess()) { 

  1. System.out.println(“Started Tcp Client: “ + serverIp); 

  1. } else { 

  1. System.out.println(“Started Tcp Client Failed: “); 


  1. if (future.cause() != null) { 

  1. future.cause().printStackTrace(); 




  1. }); 

  1. } catch (Exception e) { 

  1. e.printStackTrace(); 



断线重连

来看断线重连的关键代码:

  1. @ChannelHandler.Sharable 

  1. public class RobotClient extends SimpleChannelInboundHandler<RobotProto> { 

  1. @Override 

  1. public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { 

  1. // 状态重置 

  1. isConnected = false; 

  1. this.serverStatus = -1; 


  1. final EventLoop loop = ctx.channel().eventLoop(); 

  1. loop.schedule(new Runnable() { 

  1. @Override 

  1. public void run() { 

  1. doConnect(configureBootstrap(new Bootstrap(), loop)); 


  1. }, 1, TimeUnit.SECONDS); 



需要注意,Client类需要添加@ChannelHandler.Sharable注解,否则重连时会报错


作者:Jadepeng
出处:jqpeng的技术记事本–http://www.cnblogs.com/xiaoqi
您的支持是对博主最大的鼓励,感谢您的认真阅读。
本文版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

文章作者:jqpeng
原文链接: 高效沟通技巧

高效沟通是工作中的必备技巧,本文为学习总结。

什么是沟通?

  • 沟通是达成共识,理解一致
  • 沟通 VS. 表达 : 沟通是双向交流,而表达是单向
  • 沟通关键在于,需要抓住时机及时沟通到位,并不是讲过了就通了

沟通技巧

沟通障碍在于认知偏差

每个人的背景、价值观并不尽相同,对同一个事物的认识会产生较多的偏差,因此这个认知偏差会导致沟通出现障碍。

克服认知偏差小技巧:

  • 刚开始不要说定性的结论
  • 及时沟通到位,一次不行,多次沟通
  • 要有为能做好事情找方法,而不是为不能做找理由!

认知偏差通常会由这些原因导致:

  • 听不到位
  • 缺乏参与
  • 无效表达

聆听技巧

关注下听的繁体字:

  • 十目
  • 一心
  • 一耳
  • 一王

聆听小秘诀

  • 理解确认
    • 用自己的语言复述对方所表达的意思
    • 在复述后通过澄清与对方确认理解,确认对话说话的目的
  • 回应情绪
    • 同理心
    • 表示理解
    • 让对方释放情绪
    • 不要一上来就将应该怎么做

沟通重在通,是双向的,因此需要提升参与度

提升参与

沟通三技巧

  • 收集信息、了解真相
  • 澄清疑虑、核对想法
  • 拓展思维,鼓励参与

用开放式问题,激发思考

  • WHAT
    • 你的建议是什么
    • 有什么困难挑战障碍
  • WHEN
    • 你认为什么时候可以完成
  • WHY
    • 这样的理由是?
    • 你认为可能是什么原因
  • WHO
    • 谁可以帮忙
    • 可以从谁那里获取帮助
  • HOW
    • 你打算如何开始
    • 要怎么做才可以
  • WHERE

提问技巧

  • 一次一个问题
  • 保持开放
  • 由广泛到具体
  • 尊重隐私
  • 不带威胁

沟通互动流程

但凡沟通,可以按这个秘籍(套路)来做:

  1. 定方向
    • 确定沟通的目的——今天主要是。。。
    • 然后再将讲重要性 —— 有什么重要性
  2. 理情况
    • 摆事实、看数据
    • 问题、疑虑
  3. 想方案
    • 方向
    • 资源
    • 支持
  4. 明做法
    • 行动计划
    • 追踪、应变
  5. 做总结
    • 要点
    • 信心

作者:Jadepeng
出处:jqpeng的技术记事本–http://www.cnblogs.com/xiaoqi
您的支持是对博主最大的鼓励,感谢您的认真阅读。
本文版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

文章作者:jqpeng
原文链接: Latex 公式在线可视化编辑器

寻觅

最近的一个demo需要用到Latex公式在线编辑器,从搜索引擎一般会得到类似http://latex.codecogs.com/eqneditor/editor.php的结果,这个编辑器的问题在于使用成本高,并且界面不美观。
codecogs

继续探寻,发现了wiris Editor
wiris Editor

支持mathml和latex:
wiris Editor

那么就它了!

选型

首先,我们不会直接使用这个编辑器,只是在编辑公式的时候才使用,所以要选择合适的版本。
wiris Editor
以前用过CKEditor,所以就这它了!选用java版本
我们的数据已经是latex的,在wiris 编辑器显示需要注意latex需要用两个$$包括起来
例如:

The history of $$\sqrt(2)$$.

但是CK版本的wiris对latex的支持是非可视化支持,在编辑器里输入latex还是显示为latex:
enter description here

将焦点移动到$$内部,再点击按钮出现wiris的公式编辑器:

enter description here
这种设计适合对latex熟悉的人员,可以裸写latex,同时对不熟悉的人来说,可以使用公式编辑器。但是,这样不直观啊!你让不会latex的看到的就一堆符号!

适配

简单试用可以发现,如果直接使用公式编辑器插入公式,是直观显示的:
enter description here

可以看到保存的时候,mathml是:

<math class="wrs_chemistry" xmlns="http://www.w3.org/1998/Math/MathML"><msqrt>    <mn>2</mn></msqrt>
</math>

那么在latex输入情况下呢:

<math xmlns="http://www.w3.org/1998/Math/MathML"><semantics>    <mrow>        <msqrt><mo>(</mo></msqrt><mn>2</mn><mo>)</mo>    </mrow>    <annotation encoding="LaTeX">\sqrt(2)</annotation></semantics>
</math>

原来问题在这里,正是mathML的区别导致处理的区别。也就是说一开始就生成不带LaTeX的mathML,然后再放入编辑器。简单查看代码,可以知道先调用wrs_endParse,再wrs_initParse就可以了。

CKEDITOR.on("instanceReady", function(event){    CKEDITOR.instances.example.focus();    var mathxml = wrs_endParse("已知向量$$\\vec{a}=(\\sqrt{3},2)$$,$$\\vec{b}=(0,-2)$$,向量$$\\vec{c}=(k,\\sqrt{2})$$.$$\\vec{a}-1\\vec{b}$$与$$\\vec{d}$$共线,$$k=$$__.");    CKEDITOR.instances.example.setData(wrs_initParse(mathxml));    // 等待完成    window.setTimeout(updateFunction,0);});

Latex

直观显示没问题了,但是mathml如何再转换成Latex呢?core.js里的wrs_parseMathmlToLatex函数是直接从mathml里将。。。里的内容提取出来:

function wrs_parseMathmlToLatex(content, characters){
    ....
    var openTarget = characters.tagOpener + 'annotation encoding=' + characters.doubleQuote + 'LaTeX' + characters.doubleQuote + characters.tagCloser;
 
        mathml = content.substring(start, end);

        startAnnotation = mathml.indexOf(openTarget);    // 包含 encoding=latex,保留latex
        if (startAnnotation != -1){
            startAnnotation += openTarget.length;
            closeAnnotation = mathml.indexOf(closeTarget);
            var latex = mathml.substring(startAnnotation, closeAnnotation);
            if (characters == _wrs_safeXmlCharacters) {
                latex = wrs_mathmlDecode(latex);
            }
            output += '$$' + latex + '$$';
            // Populate latex into cache.
            wrs_populateLatexCache(latex, mathml);
        }else{
            output += mathml;
        }
   ......
}

但是现在的mathml不包含这个信息,如何处理?查看官方文档,发现有一个mathml2latex的服务,查看官方给的java demo里servlet并不包含这个服务,但是jar包里存在代码,于是自己封装一个servlet即可:

public class ServiceServlet extends com.wiris.plugin.dispatchers.MainServlet {

    @Override
    public void doGet(javax.servlet.http.HttpServletRequest request, javax.servlet.http.HttpServletResponse response)
            throws ServletException, IOException {
        PluginBuilder pb = newPluginBuilder(request);
        String origin = request.getHeader("origin");
        HttpResponse res = new HttpResponse(response);
        pb.addCorsHeaders(res, origin);
        String pathInfo = request.getServletPath();
        if (pathInfo.equals("/mathml2latex")) {
            response.setContentType("text/plain; charset=utf-8");
            ParamsProvider provider = pb.getCustomParamsProvider();
            String mml = provider.getParameter("mml", (String)null);
            String r = pb.newTextService().mathml2latex(mml);
            PrintWriter out = response.getWriter();
            out.print(r);
            out.close();
        }

js里,调用这个服务:

var _wrs_mathmlCache = {};
function wrs_getLatexFromMathML(mml) {
    if (_wrs_mathmlCache.hasOwnProperty(mml)) {
        return _wrs_mathmlCache[mml];
    }
    var data = {
        'service': 'mathml2latex',
        'mml': mml
    };

    var latex = wrs_getContent(_wrs_conf_servicePath, data);
    // Populate LatexCache.
    if (!_wrs_mathmlCache.hasOwnProperty(mml)) {
        _wrs_mathmlCache[mml] = latex;
    }
    return latex.split("\r").join('').split("\n").join(' ');
}

wrs_getLatexFromMathML只能将一个mathml转换为latex,对于编辑器里的内容来说,需要将mathML抽取出来逐一转换:

function wrs_parseRawMathmlToLatex(content, characters){
    var output = '';
    var mathTagBegin = characters.tagOpener + 'math';
    var mathTagEnd = characters.tagOpener + '/math' + characters.tagCloser;
    var start = content.indexOf(mathTagBegin);
    var end = 0;
    var mathml, startAnnotation, closeAnnotation;

    while (start != -1) {
        output += content.substring(end, start);
        end = content.indexOf(mathTagEnd, start);

        if (end == -1) {
            end = content.length - 1;
        }
        else {
            end += mathTagEnd.length;
        }

        mathml = content.substring(start, end);

        output += wrs_getLatexFromMathML(mathml);

        start = content.indexOf(mathTagBegin, end);
    }

    output += content.substring(end, content.length);
    return output;
}
function wrs_getLatex(code) {
    return wrs_parseRawMathmlToLatex(code, _wrs_xmlCharacters);
}

末了,为了方便获取,可以将latex放到_current_latex变量里:

    // 获取数据editor.on('getData', function (e) {    e.data.dataValue = wrs_endParse(e.data.dataValue || "");    _current_latex = wrs_getLatex(e.data.dataValue || "");});

再简单修改下网页,显示latex:
enter description here

收官!