2017年1月

转拼音多音字的处理

背景

汉字转拼音五笔 最开始的时候选择了粗暴简单的方法,就是在遇到多音字的时候,直接取第一个读音;但是后来同事使用的时候发现多音字的转换效果太差了,于是进行了改造;刚开始的时候使用的php-jieba,但是php在每次request的时候都需要去加载jieba的词库,极其低效;所以选择了使用python来实现逻辑,php通过thrift来调用python的服务

处理流程

piyin.png

Java RPC中的反射 (Server)

每次根据方法名来反射获取Method的成本太大,所以在bean初始化的时候,就将该服务下interface的方法都放到HashMap里面

用来测试的interface

public interface TestApi {

    List<Integer> listIds();
    long convertId(long id);

}

扫描interface的方法

    private <T> Map<String, Method> initMethodMap(Class<T> clz) {
        Map<String, Method> methodMap = new HashMap<>();
        Method[] methods = clz.getMethods();

        for (Method method : methods) {
            String methodDesc = ReflectUtil.getMethodDesc(method);
            methodMap.put(methodDesc, method);
        }
        return methodMap;
    }

参数desc获取

class ReflectUtil {

    private static final String PARAM_SPLIT = ",";
    private static final String EMPTY_PARAM = "void";

    public static String getMethodParamDesc(Method method) {
        if (method.getParameterTypes() == null || method.getParameterTypes().length == 0) {
            return EMPTY_PARAM;
        }

        StringBuilder builder = new StringBuilder();

        Class<?>[] clzs = method.getParameterTypes();

        for (Class<?> clz : clzs) {
            String className = getName(clz);
            builder.append(className).append(PARAM_SPLIT);
        }

        return builder.substring(0, builder.length() - 1);
    }

    private static String getName(Class<?> clz) {
        if (!clz.isArray()) {
            return clz.getName();
        }

        StringBuilder sb = new StringBuilder();
        sb.append(clz.getName());
        while (clz.isArray()) {
            sb.append("[]");
            clz = clz.getComponentType();
        }

        return sb.toString();
    }

}

注:代码摘自 weibo 的 rpc 框架 motan; 并有部分修改

Java RPC中的代理 (Client)

调用图

rpc.png

Client实现

远程接口定义

public interface XxxApi {
    boolean remoteMethod();
}

Proxy工厂

public final class ProxyFactory {

    @SuppressWarnings("unchecked")
    public <T> T getProxy(Class<T> clz, InvocationHandler invocationHandler) {
        return (T) Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[] {clz}, invocationHandler);
    }

}

函数返回值基本类型的默认值

public final class PrimitiveDefault {
    private static boolean defaultBoolean;
    private static char defaultChar;
    private static byte defaultByte;
    private static short defaultShort;
    private static int defaultInt;
    private static long defaultLong;
    private static float defaultFloat;
    private static double defaultDouble;
    private static Map<Class<?>, Object> primitiveValues = new HashMap<Class<?>, Object>();

    static {
        primitiveValues.put(boolean.class, defaultBoolean);
        primitiveValues.put(char.class, defaultChar);
        primitiveValues.put(byte.class, defaultByte);
        primitiveValues.put(short.class, defaultShort);
        primitiveValues.put(int.class, defaultInt);
        primitiveValues.put(long.class, defaultLong);
        primitiveValues.put(float.class, defaultFloat);
        primitiveValues.put(double.class, defaultDouble);
    }

    public static Object getDefaultReturnValue(Class<?> returnType) {
        return primitiveValues.get(returnType);
    }
}

Client调用

public final class Runner {

    private static final Logger logger = LoggerFactory.getLogger(Runner.class);

    public static void main(String[] ignore) {
        ProxyFactory proxyFactory = new ProxyFactory();

        XxxApi xxxApi = proxyFactory.getProxy(XxxApi.class, (proxy, method, args) -> {
            // 判断method是否定义过 todo
            logger.info("{} {}", method, args);
            // 产生1个默认值
            Class<?> returnType = method.getReturnType();
            if (returnType != null && returnType.isPrimitive()) {
                return PrimitiveDefault.getDefaultReturnValue(returnType);
            }
            return null;
        });

        xxxApi.remoteMethod();
    }

}

消费生产模式扫表

背景

有一批数据需要导入到ElasticSearch中,但是写ElasticSearch的速度比较慢,需要采用多线程的方式,但是在每个线程中都扫表,会产生重复的数据段,所以采用生产消费的模型来解决该问题 (为什么不直接选择线程池?线程池提交是异步的,一般table中的数据量都比较大,很容易塞爆内存)

流程图

produce-consume.png

  1. 由生产者进行扫表,每次取出一批的数据(如:500条)
  2. 将500条数据放入java的Queue中
  3. 多个生产者来消费这个Queue
  4. 当生产者结束扫表,或者外部中断扫表的时候,中断消费者

中断消费者的方式,往Queue中扔入一个毒药对象,当消费者获取到毒药对象时,停止消费,并将毒药对象塞回Queue,用于停止其他消费者

功能点

  1. 开始扫表
  2. 暂停扫表
  3. 结束扫表
  4. 数据扫表状态
  5. 恢复扫表(支持指定offset)

实现

Producer

public class Producer implements Runnable {

    private final SynchronousQueue<List<Long>> queue;

    private volatile boolean running = true;

    public Producer(SynchronousQueue<List<Long>> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        long lastId = 0L;
        int batchSize = 500;
        while (running) {
            // select * from my_table where id > ${lastId} order by id asc limit ${batchSize};
            List<Long> ids = new ArrayList<>(); // 自行实现上面的查询
            if (CollectionUtils.isEmpty(ids)) {
                putQueueQuite(Context.poison);
                break;
            }
            putQueueQuite(ids);
            lastId = Collections.max(ids);
            if (ids.size() < batchSize) {
                putQueueQuite(Context.poison);
                break;
            }
        }
        // throw poison
    }

    private void putQueueQuite(List<Long> pill) {
        try {
            queue.put(pill);
        } catch (InterruptedException e) {
            // ignore
        }
    }

}

Consumer

public class Consumer implements Runnable {

    private final SynchronousQueue<List<Long>> queue;

    public Consumer(SynchronousQueue<List<Long>> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                List<Long> ids = queue.take();
                if (ids == Context.poison) {
                    queue.put(Context.poison);
                    break;
                }
                // do something
            } catch (InterruptedException e) {
                // ignore
            }
        }
    }
}

Context

public class Context {

    public static final List<Long> poison = new ArrayList<>();

}

Runner

public class Runner {
    public static void main(String[] args) {
        int maxThreads = 10;
        int consumerThreads = 3;
        ExecutorService executorService = Executors.newFixedThreadPool(maxThreads);
        SynchronousQueue<List<Long>> queue = new SynchronousQueue<>();
        executorService.submit(new Producer(queue));
        for (int i = 0; i < consumerThreads; i++) {
            executorService.submit(new Consumer(queue));
        }
    }
}

功能点的控制,自己实现就好了

订单号的生成规则

背景

防止订单Id号泄露每日流水,暴露商业机密;需要对订单Id号进行相应的处理,但是订单号的生成又需要满足以下条件

  1. 唯一性
  2. 语义性
  3. 考虑分库分表的情况能快速路由到相应的表
  4. 长度

大厂的生成策略

#平台rule来源
1大众点评时间戳+用户标识码+随机数大众点评订单系统分库分表实践
2美团团购单表自增Id * 100 + 买家Id后2位美团团购订单系统优化记
3淘宝发号器Id + 买家Id后4位淘宝在线交易数据演变

其他策略: 生产乱序码和真实的orderId关联

发号器

MTDDL——美团点评分布式数据访问层中间件

Leaf整体架构.png

id和code的转换(Base62)

id.png

版本加密解密的实现

package lib

import (
    "fmt"
    "strconv"
)

func Id2Code(id int, version byte) string {
    var code string = ""
    if version == '1' {
        number := id
        for {
            remain := number % 10000000
            str := Base62Encode(remain)
            code = str + code
            number = number / 10000000
            if number == 0 {
                break
            }
        }
        code = string(version) + code
    }
    return code
}

func Code2Id(code string) int {
    version := code[0]
    code = code[1:]
    if version == '1' {
        var buffer string = ""
        for i := len(code); i > 0; i -= 4 {
            start := i - 4
            if start < 0 {
                start = 0
            }
            seg := code[start:i]
            segId := Base62Decode(seg)
            // 大于7位非法
            if segId >= 100000000 {
                return 0
            }
            buffer += fmt.Sprintf("%07d", Base62Decode(seg))
        }
        result, _ := strconv.Atoi(buffer)
        return result
    }
    return 0
}

Base62加密解密的实现

package lib

import (
    "math"
    "bytes"
)

const dictLength = 62

var dict []byte = []byte{'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z'}

func Base62Encode(id int) string {
    result := make([]byte, 0)
    number := id
    for number > 0 {
        round := number / dictLength
        remain := number % dictLength
        result = append([]byte{dict[remain]}, result...)
        number = round
    }
    return string(result)
}

func Base62Decode(code string) int {
    var result int = 0
    codeLength := len(code)
    for i, c := range []byte(code) {
        result += bytes.IndexByte(dict, c) * int(math.Pow(dictLength, float64(codeLength - 1 - i)))
    }
    return result
}

建一个socks5代理集群

背景

由于在线工具中部分工具有翻墙的需求,而又没有找到一个稳定的翻墙方案(支持集群),在此背景下产生了这个项目。

架构图

arch.png

功能点

  1. 多账户支持, socks5密码验证
  2. 可用性,集群保证
  3. 流量计算,阈值限制
  4. Custom DNS, 可进行域名白名单的控制
  5. 安全,防止暴力破解socks5密码

表设计

CREATE TABLE `pre_accounts` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `accountname` varchar(50) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '',
  `password` varchar(64) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '',
  `used` int(10) unsigned NOT NULL DEFAULT '0',
  `max` int(10) unsigned NOT NULL DEFAULT '0',
  `created_at` datetime NOT NULL,
  `updated_at` datetime NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `idx_accountname` (`accountname`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

Socks5的基础知识

这里就不重复写了,链接到 一个简单的Golang实现的Socks5 Proxy

功能点的实现

多账户支持, socks5密码验证

实现 github.com/armon/go-socks5/credentials.go CredentialStore 接口,进行校验;注意:每次proxy都会进行验证,如果请求量比较大,自行选择缓存方式

可用性,集群保证

每台socks5 server心跳上报到etcd,通过阈值,判断socks5的存活

流量计算,阈值限制

github.com/armon/go-socks5/request.go:358 修改这个函数的调用函数

安全,防止暴力破解socks5密码

使用 ratelimit 来保证,如果错误次数过多,则直接将IP加入黑名单;原理可以看之前写过的一篇文章 使用 redis 做限流