Storm Concept Learning Series: The storm-starter Project (Complete Edition) (Recommended by the Blogger)

This is the companion code to the book 《从零开始学Storm》 (Learning Storm from Scratch) by Zhao Bixia, published in 2014.

The storm-starter project contains a wide variety of examples of using Storm. The project is hosted on GitHub at: http://github.com/nathanmarz/storm-starter

The package structure of the storm-starter project: (figure not reproduced)

The topology structure of the storm-starter project: (figure not reproduced)

Importing by creating a new Maven project

The steps to import the storm-starter project "by creating a new Maven project" are as follows:

1. Create a new Maven project; the project name can be anything, e.g. storm-starter.
2. Copy all files in the src\jvm directory under the storm-starter project root into the Maven project's src/main/java directory.

BasicDRPCTopology.java under storm-starter-master\src\jvm\storm\starter:

```java
package storm.starter;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
import backtype.storm.drpc.LinearDRPCTopologyBuilder;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * This topology is a basic example of doing distributed RPC on top of Storm. It implements a function that appends a
 * "!" to any string you send the DRPC function.
 * <p/>
 * See https://github.com/nathanmarz/storm/wiki/Distributed-RPC for more information on doing distributed RPC on top of
 * Storm.
 */
public class BasicDRPCTopology {
  public static class ExclaimBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
      String input = tuple.getString(1);
      collector.emit(new Values(tuple.getValue(0), input + "!"));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("id", "result"));
    }
  }

  public static void main(String[] args) throws Exception {
    LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
    builder.addBolt(new ExclaimBolt(), 3);

    Config conf = new Config();

    if (args == null || args.length == 0) {
      LocalDRPC drpc = new LocalDRPC();
      LocalCluster cluster = new LocalCluster();

      cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));

      for (String word : new String[]{ "hello", "goodbye" }) {
        System.out.println("Result for \"" + word + "\": " + drpc.execute("exclamation", word));
      }

      cluster.shutdown();
      drpc.shutdown();
    }
    else {
      conf.setNumWorkers(3);
      StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology());
    }
  }
}
```
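In local mode the main method above exercises the function through LocalDRPC. On a real cluster (the else branch), callers go through a DRPC server instead. A minimal client sketch, assuming a hypothetical DRPC host named drpc-host and Storm's default DRPC port 3772:

```java
import backtype.storm.utils.DRPCClient;

// Hypothetical client, not part of storm-starter: "drpc-host" is a placeholder
// for one of the hosts configured under drpc.servers.
public class ExclamationClient {
  public static void main(String[] args) throws Exception {
    DRPCClient client = new DRPCClient("drpc-host", 3772);
    // Invokes the "exclamation" function declared by LinearDRPCTopologyBuilder.
    System.out.println(client.execute("exclamation", "hello")); // prints "hello!"
  }
}
```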
ExclamationTopology.java under storm-starter-master\src\jvm\storm\starter:

```java
package storm.starter;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import java.util.Map;

/**
 * This is a basic example of a Storm topology.
 */
public class ExclamationTopology {

  public static class ExclamationBolt extends BaseRichBolt {
    OutputCollector _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      _collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
      _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
      _collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
    }
  }

  public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("word", new TestWordSpout(), 10);
    builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
    builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");

    Config conf = new Config();
    conf.setDebug(true);

    if (args != null && args.length > 0) {
      conf.setNumWorkers(3);
      StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }
    else {
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("test", conf, builder.createTopology());
      Utils.sleep(10000);
      cluster.killTopology("test");
      cluster.shutdown();
    }
  }
}
```

ManualDRPC.java under storm-starter-master\src\jvm\storm\starter:

```java
package storm.starter;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.drpc.DRPCSpout;
import backtype.storm.drpc.ReturnResults;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class ManualDRPC {
  public static class ExclamationBolt extends BaseBasicBolt {

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("result", "return-info"));
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
      String arg = tuple.getString(0);
      Object retInfo = tuple.getValue(1);
      collector.emit(new Values(arg + "!!!", retInfo));
    }
  }

  public static void main(String[] args) {
    TopologyBuilder builder = new TopologyBuilder();
    LocalDRPC drpc = new LocalDRPC();

    DRPCSpout spout = new DRPCSpout("exclamation", drpc);
    builder.setSpout("drpc", spout);
    builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc");
    builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");

    LocalCluster cluster = new LocalCluster();
    Config conf = new Config();
    cluster.submitTopology("exclaim", conf, builder.createTopology());

    System.out.println(drpc.execute("exclamation", "aaa"));
    System.out.println(drpc.execute("exclamation", "bbb"));
  }
}
```

PrintSampleStream.java under storm-starter-master\src\jvm\storm\starter (the whole class ships commented out):

```java
/*
// to use this example, uncomment the twitter4j dependency information in the project.clj,
// uncomment storm.starter.spout.TwitterSampleSpout, and uncomment this class

package storm.starter;

import storm.starter.spout.TwitterSampleSpout;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
import storm.starter.bolt.PrinterBolt;

public class PrintSampleStream {
  public static void main(String[] args) {
    String username = args[0];
    String pwd = args[1];
    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("spout", new TwitterSampleSpout(username, pwd));
    builder.setBolt("print", new PrinterBolt())
        .shuffleGrouping("spout");

    Config conf = new Config();

    LocalCluster cluster = new LocalCluster();

    cluster.submitTopology("test", conf, builder.createTopology());

    Utils.sleep(10000);
    cluster.shutdown();
  }
}
*/
```
builder.setBolt("print", new PrinterBolt()) .shuffleGrouping("spout"); Config conf = new Config(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test", conf, builder.createTopology()); Utils.sleep(10000); cluster.shutdown(); } } */ storm-starter-master\src\jvm\storm\starter下的ReachTopology.java package storm.starter; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.LocalDRPC; import backtype.storm.StormSubmitter; import backtype.storm.coordination.BatchOutputCollector; import backtype.storm.drpc.LinearDRPCTopologyBuilder; import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.topology.base.BaseBatchBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import java.util.*; /** * This is a good example of doing complex Distributed RPC on top of Storm. This program creates a topology that can * compute the reach for any URL on Twitter in realtime by parallelizing the whole computation. * <p/> * Reach is the number of unique people exposed to a URL on Twitter. To compute reach, you have to get all the people * who tweeted the URL, get all the followers of all those people, unique that set of followers, and then count the * unique set. It's an intense computation that can involve thousands of database calls and tens of millions of follower * records. * <p/> * This Storm topology does every piece of that computation in parallel, turning what would be a computation that takes * minutes on a single machine into one that takes just a couple seconds. * <p/> * For the purposes of demonstration, this topology replaces the use of actual DBs with in-memory hashmaps. * <p/> * See https://github.com/nathanmarz/storm/wiki/Distributed-RPC for more information on Distributed RPC. 
*/ public class ReachTopology { public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() {{ put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan")); put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan")); put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john")); }}; public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() {{ put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai")); put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian")); put("tim", Arrays.asList("alex")); put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan")); put("adam", Arrays.asList("david", "carissa")); put("mike", Arrays.asList("john", "bob")); put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob")); }}; public static class GetTweeters extends BaseBasicBolt { @Override public void execute(Tuple tuple, BasicOutputCollector collector) { Object id = tuple.getValue(0); String url = tuple.getString(1); List<String> tweeters = TWEETERS_DB.get(url); if (tweeters != null) { for (String tweeter : tweeters) { collector.emit(new Values(id, tweeter)); } } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id", "tweeter")); } } public static class GetFollowers extends BaseBasicBolt { @Override public void execute(Tuple tuple, BasicOutputCollector collector) { Object id = tuple.getValue(0); String tweeter = tuple.getString(1); List<String> followers = FOLLOWERS_DB.get(tweeter); if (followers != null) { for (String follower : followers) { collector.emit(new Values(id, follower)); } } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id", "follower")); } } public static class PartialUniquer extends BaseBatchBolt { BatchOutputCollector _collector; Object _id; Set<String> _followers = new HashSet<String>(); @Override public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) { _collector = collector; _id = id; } @Override public void execute(Tuple tuple) { _followers.add(tuple.getString(1)); } @Override public void finishBatch() { _collector.emit(new Values(_id, _followers.size())); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id", "partial-count")); } } public static class CountAggregator extends BaseBatchBolt { BatchOutputCollector _collector; Object _id; int _count = 0; @Override public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) { _collector = collector; _id = id; } @Override public void execute(Tuple tuple) { _count += tuple.getInteger(1); } @Override public void finishBatch() { _collector.emit(new Values(_id, _count)); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id", "reach")); } } public static LinearDRPCTopologyBuilder construct() { LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach"); builder.addBolt(new GetTweeters(), 4); builder.addBolt(new GetFollowers(), 12).shuffleGrouping(); builder.addBolt(new PartialUniquer(), 6).fieldsGrouping(new Fields("id", "follower")); builder.addBolt(new CountAggregator(), 3).fieldsGrouping(new Fields("id")); return builder; } public static void main(String[] args) throws Exception { LinearDRPCTopologyBuilder builder = 
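The javadoc above defines reach operationally: collect the tweeters of a URL, collect their followers, de-duplicate, count. The standalone sketch below (not part of storm-starter) runs the same computation sequentially against the mocked databases, which makes the expected DRPC result concrete; for "foo.com/blog/1" it prints 16.

```java
package storm.starter;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of storm-starter: computes reach sequentially
// against ReachTopology's in-memory mock databases.
public class SequentialReach {
  public static int reach(String url) {
    Set<String> uniqueFollowers = new HashSet<String>();
    List<String> tweeters = ReachTopology.TWEETERS_DB.get(url);
    if (tweeters != null) {
      for (String tweeter : tweeters) {
        List<String> followers = ReachTopology.FOLLOWERS_DB.get(tweeter);
        if (followers != null) {
          uniqueFollowers.addAll(followers); // the "unique" step
        }
      }
    }
    return uniqueFollowers.size(); // the "count" step
  }

  public static void main(String[] args) {
    System.out.println(reach("foo.com/blog/1")); // 16
  }
}
```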
RollingTopWords.java under storm-starter-master\src\jvm\storm\starter:

```java
package storm.starter;

import backtype.storm.Config;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import storm.starter.bolt.IntermediateRankingsBolt;
import storm.starter.bolt.RollingCountBolt;
import storm.starter.bolt.TotalRankingsBolt;
import storm.starter.util.StormRunner;

/**
 * This topology does a continuous computation of the top N words that the topology has seen in terms of cardinality.
 * The top N computation is done in a completely scalable way, and a similar approach could be used to compute things
 * like trending topics or trending images on Twitter.
 */
public class RollingTopWords {

  private static final int DEFAULT_RUNTIME_IN_SECONDS = 60;
  private static final int TOP_N = 5;

  private final TopologyBuilder builder;
  private final String topologyName;
  private final Config topologyConfig;
  private final int runtimeInSeconds;

  public RollingTopWords() throws InterruptedException {
    builder = new TopologyBuilder();
    topologyName = "slidingWindowCounts";
    topologyConfig = createTopologyConfiguration();
    runtimeInSeconds = DEFAULT_RUNTIME_IN_SECONDS;

    wireTopology();
  }

  private static Config createTopologyConfiguration() {
    Config conf = new Config();
    conf.setDebug(true);
    return conf;
  }

  private void wireTopology() throws InterruptedException {
    String spoutId = "wordGenerator";
    String counterId = "counter";
    String intermediateRankerId = "intermediateRanker";
    String totalRankerId = "finalRanker";
    builder.setSpout(spoutId, new TestWordSpout(), 5);
    builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).fieldsGrouping(spoutId, new Fields("word"));
    builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4)
        .fieldsGrouping(counterId, new Fields("obj"));
    builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);
  }

  public void run() throws InterruptedException {
    StormRunner.runTopologyLocally(builder.createTopology(), topologyName, topologyConfig, runtimeInSeconds);
  }

  public static void main(String[] args) throws Exception {
    new RollingTopWords().run();
  }
}
```
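The run() method delegates to storm.starter.util.StormRunner, which is not listed in this article. A sketch consistent with how it is used here (submit to an in-process cluster, let the topology run for a fixed time, then kill it and shut down; the actual storm-starter version may differ in minor details):

```java
package storm.starter.util;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;

// Sketch of the StormRunner helper assumed by RollingTopWords.run().
public final class StormRunner {

  private static final int MILLIS_IN_SEC = 1000;

  private StormRunner() {
  }

  // Submits the topology to an in-process cluster, lets it run for the given
  // number of seconds, then kills it and shuts the cluster down.
  public static void runTopologyLocally(StormTopology topology, String topologyName, Config conf,
      int runtimeInSeconds) throws InterruptedException {
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology(topologyName, conf, topology);
    Thread.sleep((long) runtimeInSeconds * MILLIS_IN_SEC);
    cluster.killTopology(topologyName);
    cluster.shutdown();
  }
}
```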
builder.setBolt("join", new SingleJoinBolt(new Fields("gender", "age"))).fieldsGrouping("gender", new Fields("id")) .fieldsGrouping("age", new Fields("id")); Config conf = new Config(); conf.setDebug(true); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("join-example", conf, builder.createTopology()); for (int i = 0; i < 10; i++) { String gender; if (i % 2 == 0) { gender = "male"; } else { gender = "female"; } genderSpout.feed(new Values(i, gender)); } for (int i = 9; i >= 0; i--) { ageSpout.feed(new Values(i, i + 20)); } Utils.sleep(2000); cluster.shutdown(); } } storm-starter-master\src\jvm\storm\starter下的TransactionalGlobalCount.java package storm.starter; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.coordination.BatchOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.testing.MemoryTransactionalSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBatchBolt; import backtype.storm.topology.base.BaseTransactionalBolt; import backtype.storm.transactional.ICommitter; import backtype.storm.transactional.TransactionAttempt; import backtype.storm.transactional.TransactionalTopologyBuilder; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import java.math.BigInteger; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; /** * This is a basic example of a transactional topology. It keeps a count of the number of tuples seen so far in a * database. The source of data and the databases are mocked out as in memory maps for demonstration purposes. This * class is defined in depth on the wiki at https://github.com/nathanmarz/storm/wiki/Transactional-topologies */ public class TransactionalGlobalCount { public static final int PARTITION_TAKE_PER_BATCH = 3; public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {{ put(0, new ArrayList<List<Object>>() {{ add(new Values("cat")); add(new Values("dog")); add(new Values("chicken")); add(new Values("cat")); add(new Values("dog")); add(new Values("apple")); }}); put(1, new ArrayList<List<Object>>() {{ add(new Values("cat")); add(new Values("dog")); add(new Values("apple")); add(new Values("banana")); }}); put(2, new ArrayList<List<Object>>() {{ add(new Values("cat")); add(new Values("cat")); add(new Values("cat")); add(new Values("cat")); add(new Values("cat")); add(new Values("dog")); add(new Values("dog")); add(new Values("dog")); add(new Values("dog")); }}); }}; public static class Value { int count = 0; BigInteger txid; } public static Map<String, Value> DATABASE = new HashMap<String, Value>(); public static final String GLOBAL_COUNT_KEY = "GLOBAL-COUNT"; public static class BatchCount extends BaseBatchBolt { Object _id; BatchOutputCollector _collector; int _count = 0; @Override public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) { _collector = collector; _id = id; } @Override public void execute(Tuple tuple) { _count++; } @Override public void finishBatch() { _collector.emit(new Values(_id, _count)); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id", "count")); } } public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter { TransactionAttempt _attempt; BatchOutputCollector _collector; int _sum = 0; @Override public void prepare(Map 
conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) { _collector = collector; _attempt = attempt; } @Override public void execute(Tuple tuple) { _sum += tuple.getInteger(1); } @Override public void finishBatch() { Value val = DATABASE.get(GLOBAL_COUNT_KEY); Value newval; if (val == null || !val.txid.equals(_attempt.getTransactionId())) { newval = new Value(); newval.txid = _attempt.getTransactionId(); if (val == null) { newval.count = _sum; } else { newval.count = _sum + val.count; } DATABASE.put(GLOBAL_COUNT_KEY, newval); } else { newval = val; } _collector.emit(new Values(_attempt, newval.count)); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id", "sum")); } } public static void main(String[] args) throws Exception { MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH); TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("global-count", "spout", spout, 3); builder.setBolt("partial-count", new BatchCount(), 5).noneGrouping("spout"); builder.setBolt("sum", new UpdateGlobalCount()).globalGrouping("partial-count"); LocalCluster cluster = new LocalCluster(); Config config = new Config(); config.setDebug(true); config.setMaxSpoutPending(3); cluster.submitTopology("global-count-topology", config, builder.buildTopology()); Thread.sleep(3000); cluster.shutdown(); } } storm-starter-master\src\jvm\storm\starter下的TransactionalWords.java package storm.starter; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.coordination.BatchOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.testing.MemoryTransactionalSpout; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.topology.base.BaseTransactionalBolt; import backtype.storm.transactional.ICommitter; import backtype.storm.transactional.TransactionAttempt; import backtype.storm.transactional.TransactionalTopologyBuilder; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import java.math.BigInteger; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; /** * This class defines a more involved transactional topology then TransactionalGlobalCount. This topology processes a * stream of words and produces two outputs: * <p/> * 1. A count for each word (stored in a database) 2. The number of words for every bucket of 10 counts. So it stores in * the database how many words have appeared 0-9 times, how many have appeared 10-19 times, and so on. * <p/> * A batch of words can cause the bucket counts to decrement for some buckets and increment for others as words move * between buckets as their counts accumulate. 
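The txid comparison in UpdateGlobalCount.finishBatch is what makes the commit idempotent: a replayed batch carries the same transaction id, so its sum is not added twice. A hypothetical sanity check, not part of storm-starter, makes the expected end state explicit; the stored count must equal the total number of tuples in DATA, i.e. 6 + 4 + 9 = 19:

```java
package storm.starter;

import java.util.List;

// Hypothetical sanity check, not part of storm-starter: after the topology in
// TransactionalGlobalCount.main() has logically processed every batch exactly
// once, the mocked database should hold the total tuple count.
public class GlobalCountCheck {
  public static void main(String[] args) {
    int expected = 0;
    for (List<List<Object>> partition : TransactionalGlobalCount.DATA.values()) {
      expected += partition.size(); // 6 + 4 + 9 = 19
    }
    TransactionalGlobalCount.Value val =
        TransactionalGlobalCount.DATABASE.get(TransactionalGlobalCount.GLOBAL_COUNT_KEY);
    System.out.println("expected=" + expected + ", stored=" + (val == null ? "n/a" : val.count));
  }
}
```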
TransactionalWords.java under storm-starter-master\src\jvm\storm\starter:

```java
package storm.starter;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.testing.MemoryTransactionalSpout;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseTransactionalBolt;
import backtype.storm.transactional.ICommitter;
import backtype.storm.transactional.TransactionAttempt;
import backtype.storm.transactional.TransactionalTopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * This class defines a more involved transactional topology than TransactionalGlobalCount. This topology processes a
 * stream of words and produces two outputs:
 * <p/>
 * 1. A count for each word (stored in a database) 2. The number of words for every bucket of 10 counts. So it stores
 * in the database how many words have appeared 0-9 times, how many have appeared 10-19 times, and so on.
 * <p/>
 * A batch of words can cause the bucket counts to decrement for some buckets and increment for others as words move
 * between buckets as their counts accumulate.
 */
public class TransactionalWords {
  public static class CountValue {
    Integer prev_count = null;
    int count = 0;
    BigInteger txid = null;
  }

  public static class BucketValue {
    int count = 0;
    BigInteger txid;
  }

  public static final int BUCKET_SIZE = 10;

  public static Map<String, CountValue> COUNT_DATABASE = new HashMap<String, CountValue>();
  public static Map<Integer, BucketValue> BUCKET_DATABASE = new HashMap<Integer, BucketValue>();

  public static final int PARTITION_TAKE_PER_BATCH = 3;

  public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {{
    put(0, new ArrayList<List<Object>>() {{
      add(new Values("cat"));
      add(new Values("dog"));
      add(new Values("chicken"));
      add(new Values("cat"));
      add(new Values("dog"));
      add(new Values("apple"));
    }});
    put(1, new ArrayList<List<Object>>() {{
      add(new Values("cat"));
      add(new Values("dog"));
      add(new Values("apple"));
      add(new Values("banana"));
    }});
    put(2, new ArrayList<List<Object>>() {{
      add(new Values("cat"));
      add(new Values("cat"));
      add(new Values("cat"));
      add(new Values("cat"));
      add(new Values("cat"));
      add(new Values("dog"));
      add(new Values("dog"));
      add(new Values("dog"));
      add(new Values("dog"));
    }});
  }};

  public static class KeyedCountUpdater extends BaseTransactionalBolt implements ICommitter {
    Map<String, Integer> _counts = new HashMap<String, Integer>();
    BatchOutputCollector _collector;
    TransactionAttempt _id;

    int _count = 0;

    @Override
    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) {
      _collector = collector;
      _id = id;
    }

    @Override
    public void execute(Tuple tuple) {
      String key = tuple.getString(1);
      Integer curr = _counts.get(key);
      if (curr == null) curr = 0;
      _counts.put(key, curr + 1);
    }

    @Override
    public void finishBatch() {
      for (String key : _counts.keySet()) {
        CountValue val = COUNT_DATABASE.get(key);
        CountValue newVal;
        if (val == null || !val.txid.equals(_id)) {
          newVal = new CountValue();
          newVal.txid = _id.getTransactionId();
          if (val != null) {
            newVal.prev_count = val.count;
            newVal.count = val.count;
          }
          newVal.count = newVal.count + _counts.get(key);
          COUNT_DATABASE.put(key, newVal);
        }
        else {
          newVal = val;
        }
        _collector.emit(new Values(_id, key, newVal.count, newVal.prev_count));
      }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("id", "key", "count", "prev-count"));
    }
  }

  public static class Bucketize extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
      TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);
      int curr = tuple.getInteger(2);
      Integer prev = tuple.getInteger(3);

      int currBucket = curr / BUCKET_SIZE;
      Integer prevBucket = null;
      if (prev != null) {
        prevBucket = prev / BUCKET_SIZE;
      }

      if (prevBucket == null) {
        collector.emit(new Values(attempt, currBucket, 1));
      }
      else if (currBucket != prevBucket) {
        collector.emit(new Values(attempt, currBucket, 1));
        collector.emit(new Values(attempt, prevBucket, -1));
      }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("attempt", "bucket", "delta"));
    }
  }

  public static class BucketCountUpdater extends BaseTransactionalBolt {
    Map<Integer, Integer> _accum = new HashMap<Integer, Integer>();
    BatchOutputCollector _collector;
    TransactionAttempt _attempt;

    int _count = 0;

    @Override
    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector,
        TransactionAttempt attempt) {
      _collector = collector;
      _attempt = attempt;
    }

    @Override
    public void execute(Tuple tuple) {
      Integer bucket = tuple.getInteger(1);
      Integer delta = tuple.getInteger(2);
      Integer curr = _accum.get(bucket);
      if (curr == null) curr = 0;
      _accum.put(bucket, curr + delta);
    }

    @Override
    public void finishBatch() {
      for (Integer bucket : _accum.keySet()) {
        BucketValue currVal = BUCKET_DATABASE.get(bucket);
        BucketValue newVal;
        if (currVal == null || !currVal.txid.equals(_attempt.getTransactionId())) {
          newVal = new BucketValue();
          newVal.txid = _attempt.getTransactionId();
          newVal.count = _accum.get(bucket);
          if (currVal != null) newVal.count += currVal.count;
          BUCKET_DATABASE.put(bucket, newVal);
        }
        else {
          newVal = currVal;
        }
        _collector.emit(new Values(_attempt, bucket, newVal.count));
      }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("id", "bucket", "count"));
    }
  }

  public static void main(String[] args) throws Exception {
    MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
    TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("top-n-words", "spout", spout, 2);
    builder.setBolt("count", new KeyedCountUpdater(), 5).fieldsGrouping("spout", new Fields("word"));
    builder.setBolt("bucketize", new Bucketize()).noneGrouping("count");
    builder.setBolt("buckets", new BucketCountUpdater(), 5).fieldsGrouping("bucketize", new Fields("bucket"));

    LocalCluster cluster = new LocalCluster();

    Config config = new Config();
    config.setDebug(true);
    config.setMaxSpoutPending(3);

    cluster.submitTopology("top-n-topology", config, builder.buildTopology());

    Thread.sleep(3000);
    cluster.shutdown();
  }
}
```
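To make Bucketize's bucket movement concrete: with BUCKET_SIZE = 10, a word whose count rises from 9 to 12 moves from bucket 0 to bucket 1, so a +1 is emitted for bucket 1 and a -1 for bucket 0. The following standalone illustration (not storm-starter code) replays that arithmetic:

```java
// Hypothetical illustration of Bucketize's bucket math (not storm-starter code).
public class BucketMathDemo {
  static final int BUCKET_SIZE = 10;

  static void show(Integer prev, int curr) {
    int currBucket = curr / BUCKET_SIZE;
    Integer prevBucket = (prev == null) ? null : prev / BUCKET_SIZE;
    if (prevBucket == null) {
      System.out.println("count " + curr + ": bucket " + currBucket + " +1");
    } else if (currBucket != prevBucket) {
      System.out.println("count " + prev + " -> " + curr + ": bucket " + currBucket
          + " +1, bucket " + prevBucket + " -1");
    } else {
      System.out.println("count " + prev + " -> " + curr + ": no bucket change");
    }
  }

  public static void main(String[] args) {
    show(null, 3); // new word: bucket 0 +1
    show(9, 12);   // crosses a boundary: bucket 1 +1, bucket 0 -1
    show(12, 15);  // stays in bucket 1: nothing to emit
  }
}
```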
WordCountTopology.java under storm-starter-master\src\jvm\storm\starter:

```java
package storm.starter;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.ShellBolt;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import storm.starter.spout.RandomSentenceSpout;

import java.util.HashMap;
import java.util.Map;

/**
 * This topology demonstrates Storm's stream groupings and multilang capabilities.
 */
public class WordCountTopology {
  public static class SplitSentence extends ShellBolt implements IRichBolt {

    public SplitSentence() {
      super("python", "splitsentence.py");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
      return null;
    }
  }

  public static class WordCount extends BaseBasicBolt {
    Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
      String word = tuple.getString(0);
      Integer count = counts.get(word);
      if (count == null)
        count = 0;
      count++;
      counts.put(word, count);
      collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word", "count"));
    }
  }

  public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("spout", new RandomSentenceSpout(), 5);

    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

    Config conf = new Config();
    conf.setDebug(true);

    if (args != null && args.length > 0) {
      conf.setNumWorkers(3);
      StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }
    else {
      conf.setMaxTaskParallelism(3);

      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("word-count", conf, builder.createTopology());

      Thread.sleep(10000);

      cluster.shutdown();
    }
  }
}
```
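SplitSentence demonstrates multilang by delegating to the external Python script splitsentence.py via ShellBolt. If you want to run the topology without the multilang setup, a hypothetical pure-Java substitute for the splitter (not part of storm-starter) could look like this:

```java
package storm.starter;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// Hypothetical pure-Java replacement for the multilang SplitSentence bolt.
public class JavaSplitSentence extends BaseBasicBolt {
  @Override
  public void execute(Tuple tuple, BasicOutputCollector collector) {
    // Emit one tuple per whitespace-separated word, like splitsentence.py does.
    for (String word : tuple.getString(0).split(" ")) {
      collector.emit(new Values(word));
    }
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
  }
}
```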
RandomSentenceSpout.java under storm-starter-master\src\jvm\storm\starter\spout:

```java
package storm.starter.spout;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import java.util.Map;
import java.util.Random;

public class RandomSentenceSpout extends BaseRichSpout {
  SpoutOutputCollector _collector;
  Random _rand;

  @Override
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    _collector = collector;
    _rand = new Random();
  }

  @Override
  public void nextTuple() {
    Utils.sleep(100);
    String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
        "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
    String sentence = sentences[_rand.nextInt(sentences.length)];
    _collector.emit(new Values(sentence));
  }

  @Override
  public void ack(Object id) {
  }

  @Override
  public void fail(Object id) {
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
  }
}
```

TwitterSampleSpout.java under storm-starter-master\src\jvm\storm\starter\spout (the whole class ships commented out):

```java
/*
package storm.starter.spout;

import backtype.storm.Config;
import twitter4j.conf.ConfigurationBuilder;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;

public class TwitterSampleSpout extends BaseRichSpout {
  SpoutOutputCollector _collector;
  LinkedBlockingQueue<Status> queue = null;
  TwitterStream _twitterStream;
  String _username;
  String _pwd;

  public TwitterSampleSpout(String username, String pwd) {
    _username = username;
    _pwd = pwd;
  }

  @Override
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    queue = new LinkedBlockingQueue<Status>(1000);
    _collector = collector;
    StatusListener listener = new StatusListener() {
      @Override
      public void onStatus(Status status) {
        queue.offer(status);
      }

      @Override
      public void onDeletionNotice(StatusDeletionNotice sdn) {
      }

      @Override
      public void onTrackLimitationNotice(int i) {
      }

      @Override
      public void onScrubGeo(long l, long l1) {
      }

      @Override
      public void onException(Exception e) {
      }
    };
    TwitterStreamFactory fact =
        new TwitterStreamFactory(new ConfigurationBuilder().setUser(_username).setPassword(_pwd).build());
    _twitterStream = fact.getInstance();
    _twitterStream.addListener(listener);
    _twitterStream.sample();
  }

  @Override
  public void nextTuple() {
    Status ret = queue.poll();
    if (ret == null) {
      Utils.sleep(50);
    }
    else {
      _collector.emit(new Values(ret));
    }
  }

  @Override
  public void close() {
    _twitterStream.shutdown();
  }

  @Override
  public Map<String, Object> getComponentConfiguration() {
    Config ret = new Config();
    ret.setMaxTaskParallelism(1);
    return ret;
  }

  @Override
  public void ack(Object id) {
  }

  @Override
  public void fail(Object id) {
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("tweet"));
  }
}
*/
```

AbstractRankerBolt.java under storm-starter-master\src\jvm\storm\starter\bolt:

```java
package storm.starter.bolt;

import backtype.storm.Config;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.apache.log4j.Logger;
import storm.starter.tools.Rankings;
import storm.starter.util.TupleHelpers;

import java.util.HashMap;
import java.util.Map;

/**
 * This abstract bolt provides the basic behavior of bolts that rank objects according to their count.
 * <p/>
 * It uses a template method design pattern for {@link AbstractRankerBolt#execute(Tuple, BasicOutputCollector)} to
 * allow actual bolt implementations to specify how incoming tuples are processed, i.e. how the objects embedded within
 * those tuples are retrieved and counted.
 */
public abstract class AbstractRankerBolt extends BaseBasicBolt {

  private static final long serialVersionUID = 4931640198501530202L;
  private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = 2;
  private static final int DEFAULT_COUNT = 10;

  private final int emitFrequencyInSeconds;
  private final int count;
  private final Rankings rankings;

  public AbstractRankerBolt() {
    this(DEFAULT_COUNT, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
  }

  public AbstractRankerBolt(int topN) {
    this(topN, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
  }

  public AbstractRankerBolt(int topN, int emitFrequencyInSeconds) {
    if (topN < 1) {
      throw new IllegalArgumentException("topN must be >= 1 (you requested " + topN + ")");
    }
    if (emitFrequencyInSeconds < 1) {
      throw new IllegalArgumentException(
          "The emit frequency must be >= 1 seconds (you requested " + emitFrequencyInSeconds + " seconds)");
    }
    count = topN;
    this.emitFrequencyInSeconds = emitFrequencyInSeconds;
    rankings = new Rankings(count);
  }

  protected Rankings getRankings() {
    return rankings;
  }

  /**
   * This method functions as a template method (design pattern).
   */
  @Override
  public final void execute(Tuple tuple, BasicOutputCollector collector) {
    if (TupleHelpers.isTickTuple(tuple)) {
      getLogger().debug("Received tick tuple, triggering emit of current rankings");
      emitRankings(collector);
    }
    else {
      updateRankingsWithTuple(tuple);
    }
  }

  abstract void updateRankingsWithTuple(Tuple tuple);

  private void emitRankings(BasicOutputCollector collector) {
    collector.emit(new Values(rankings.copy()));
    getLogger().debug("Rankings: " + rankings);
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("rankings"));
  }

  @Override
  public Map<String, Object> getComponentConfiguration() {
    Map<String, Object> conf = new HashMap<String, Object>();
    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
    return conf;
  }

  abstract Logger getLogger();
}
```
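AbstractRankerBolt (and RollingCountBolt below) relies on storm.starter.util.TupleHelpers, which this article does not list. A version consistent with its use here, recognizing the tick tuples that Storm emits on the system stream when TOPOLOGY_TICK_TUPLE_FREQ_SECS is set, would look like:

```java
package storm.starter.util;

import backtype.storm.Constants;
import backtype.storm.tuple.Tuple;

// Sketch of the TupleHelpers utility assumed by the ranker and rolling-count bolts.
public final class TupleHelpers {

  private TupleHelpers() {
  }

  // Tick tuples are emitted by Storm itself on the system component's tick stream.
  public static boolean isTickTuple(Tuple tuple) {
    return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
        && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
  }
}
```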
IntermediateRankingsBolt.java under storm-starter-master\src\jvm\storm\starter\bolt:

```java
package storm.starter.bolt;

import backtype.storm.tuple.Tuple;
import org.apache.log4j.Logger;
import storm.starter.tools.Rankable;
import storm.starter.tools.RankableObjectWithFields;

/**
 * This bolt ranks incoming objects by their count.
 * <p/>
 * It assumes the input tuples to adhere to the following format: (object, object_count, additionalField1,
 * additionalField2, ..., additionalFieldN).
 */
public final class IntermediateRankingsBolt extends AbstractRankerBolt {

  private static final long serialVersionUID = -1369800530256637409L;
  private static final Logger LOG = Logger.getLogger(IntermediateRankingsBolt.class);

  public IntermediateRankingsBolt() {
    super();
  }

  public IntermediateRankingsBolt(int topN) {
    super(topN);
  }

  public IntermediateRankingsBolt(int topN, int emitFrequencyInSeconds) {
    super(topN, emitFrequencyInSeconds);
  }

  @Override
  void updateRankingsWithTuple(Tuple tuple) {
    Rankable rankable = RankableObjectWithFields.from(tuple);
    super.getRankings().updateWith(rankable);
  }

  @Override
  Logger getLogger() {
    return LOG;
  }
}
```

PrinterBolt.java under storm-starter-master\src\jvm\storm\starter\bolt:

```java
package storm.starter.bolt;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class PrinterBolt extends BaseBasicBolt {

  @Override
  public void execute(Tuple tuple, BasicOutputCollector collector) {
    System.out.println(tuple);
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer ofd) {
  }
}
```

RollingCountBolt.java under storm-starter-master\src\jvm\storm\starter\bolt:

```java
package storm.starter.bolt;

import backtype.storm.Config;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.apache.log4j.Logger;
import storm.starter.tools.NthLastModifiedTimeTracker;
import storm.starter.tools.SlidingWindowCounter;
import storm.starter.util.TupleHelpers;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

/**
 * This bolt performs rolling counts of incoming objects, i.e. sliding window based counting.
 * <p/>
 * The bolt is configured by two parameters, the length of the sliding window in seconds (which influences the output
 * data of the bolt, i.e. how it will count objects) and the emit frequency in seconds (which influences how often the
 * bolt will output the latest window counts). For instance, if the window length is set to an equivalent of five
 * minutes and the emit frequency to one minute, then the bolt will output the latest five-minute sliding window every
 * minute.
 * <p/>
 * The bolt emits a rolling count tuple per object, consisting of the object itself, its latest rolling count, and the
 * actual duration of the sliding window. The latter is included in case the expected sliding window length (as
 * configured by the user) is different from the actual length, e.g. due to high system load. Note that the actual
 * window length is tracked and calculated for the window, and not individually for each object within a window.
 * <p/>
 * Note: During the startup phase you will usually observe that the bolt warns you about the actual sliding window
 * length being smaller than the expected length. This behavior is expected and is caused by the way the sliding window
 * counts are initially "loaded up". You can safely ignore this warning during startup (e.g. you will see this warning
 * during the first ~ five minutes of startup time if the window length is set to five minutes).
 */
public class RollingCountBolt extends BaseRichBolt {

  private static final long serialVersionUID = 5537727428628598519L;
  private static final Logger LOG = Logger.getLogger(RollingCountBolt.class);

  private static final int NUM_WINDOW_CHUNKS = 5;
  private static final int DEFAULT_SLIDING_WINDOW_IN_SECONDS = NUM_WINDOW_CHUNKS * 60;
  private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = DEFAULT_SLIDING_WINDOW_IN_SECONDS / NUM_WINDOW_CHUNKS;
  private static final String WINDOW_LENGTH_WARNING_TEMPLATE =
      "Actual window length is %d seconds when it should be %d seconds"
          + " (you can safely ignore this warning during the startup phase)";

  private final SlidingWindowCounter<Object> counter;
  private final int windowLengthInSeconds;
  private final int emitFrequencyInSeconds;
  private OutputCollector collector;
  private NthLastModifiedTimeTracker lastModifiedTracker;

  public RollingCountBolt() {
    this(DEFAULT_SLIDING_WINDOW_IN_SECONDS, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
  }

  public RollingCountBolt(int windowLengthInSeconds, int emitFrequencyInSeconds) {
    this.windowLengthInSeconds = windowLengthInSeconds;
    this.emitFrequencyInSeconds = emitFrequencyInSeconds;
    counter = new SlidingWindowCounter<Object>(
        deriveNumWindowChunksFrom(this.windowLengthInSeconds, this.emitFrequencyInSeconds));
  }

  private int deriveNumWindowChunksFrom(int windowLengthInSeconds, int windowUpdateFrequencyInSeconds) {
    return windowLengthInSeconds / windowUpdateFrequencyInSeconds;
  }

  @SuppressWarnings("rawtypes")
  @Override
  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
    lastModifiedTracker = new NthLastModifiedTimeTracker(
        deriveNumWindowChunksFrom(this.windowLengthInSeconds, this.emitFrequencyInSeconds));
  }

  @Override
  public void execute(Tuple tuple) {
    if (TupleHelpers.isTickTuple(tuple)) {
      LOG.debug("Received tick tuple, triggering emit of current window counts");
      emitCurrentWindowCounts();
    }
    else {
      countObjAndAck(tuple);
    }
  }

  private void emitCurrentWindowCounts() {
    Map<Object, Long> counts = counter.getCountsThenAdvanceWindow();
    int actualWindowLengthInSeconds = lastModifiedTracker.secondsSinceOldestModification();
    lastModifiedTracker.markAsModified();
    if (actualWindowLengthInSeconds != windowLengthInSeconds) {
      LOG.warn(String.format(WINDOW_LENGTH_WARNING_TEMPLATE, actualWindowLengthInSeconds, windowLengthInSeconds));
    }
    emit(counts, actualWindowLengthInSeconds);
  }

  private void emit(Map<Object, Long> counts, int actualWindowLengthInSeconds) {
    for (Entry<Object, Long> entry : counts.entrySet()) {
      Object obj = entry.getKey();
      Long count = entry.getValue();
      collector.emit(new Values(obj, count, actualWindowLengthInSeconds));
    }
  }

  private void countObjAndAck(Tuple tuple) {
    Object obj = tuple.getValue(0);
    counter.incrementCount(obj);
    collector.ack(tuple);
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("obj", "count", "actualWindowLengthInSeconds"));
  }

  @Override
  public Map<String, Object> getComponentConfiguration() {
    Map<String, Object> conf = new HashMap<String, Object>();
    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
    return conf;
  }
}
```
SingleJoinBolt.java under storm-starter-master\src\jvm\storm\starter\bolt:

```java
package storm.starter.bolt;

import backtype.storm.Config;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.TimeCacheMap;

import java.util.*;

public class SingleJoinBolt extends BaseRichBolt {
  OutputCollector _collector;
  Fields _idFields;
  Fields _outFields;
  int _numSources;
  TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>> _pending;
  Map<String, GlobalStreamId> _fieldLocations;

  public SingleJoinBolt(Fields outFields) {
    _outFields = outFields;
  }

  @Override
  public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
    _fieldLocations = new HashMap<String, GlobalStreamId>();
    _collector = collector;
    int timeout = ((Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
    _pending = new TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>>(timeout, new ExpireCallback());
    _numSources = context.getThisSources().size();
    Set<String> idFields = null;
    for (GlobalStreamId source : context.getThisSources().keySet()) {
      Fields fields = context.getComponentOutputFields(source.get_componentId(), source.get_streamId());
      Set<String> setFields = new HashSet<String>(fields.toList());
      if (idFields == null)
        idFields = setFields;
      else
        idFields.retainAll(setFields);

      for (String outfield : _outFields) {
        for (String sourcefield : fields) {
          if (outfield.equals(sourcefield)) {
            _fieldLocations.put(outfield, source);
          }
        }
      }
    }
    _idFields = new Fields(new ArrayList<String>(idFields));

    if (_fieldLocations.size() != _outFields.size()) {
      throw new RuntimeException("Cannot find all outfields among sources");
    }
  }

  @Override
  public void execute(Tuple tuple) {
    List<Object> id = tuple.select(_idFields);
    GlobalStreamId streamId = new GlobalStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId());
    if (!_pending.containsKey(id)) {
      _pending.put(id, new HashMap<GlobalStreamId, Tuple>());
    }
    Map<GlobalStreamId, Tuple> parts = _pending.get(id);
    if (parts.containsKey(streamId))
      throw new RuntimeException("Received same side of single join twice");
    parts.put(streamId, tuple);
    if (parts.size() == _numSources) {
      _pending.remove(id);
      List<Object> joinResult = new ArrayList<Object>();
      for (String outField : _outFields) {
        GlobalStreamId loc = _fieldLocations.get(outField);
        joinResult.add(parts.get(loc).getValueByField(outField));
      }
      _collector.emit(new ArrayList<Tuple>(parts.values()), joinResult);

      for (Tuple part : parts.values()) {
        _collector.ack(part);
      }
    }
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(_outFields);
  }

  private class ExpireCallback implements TimeCacheMap.ExpiredCallback<List<Object>, Map<GlobalStreamId, Tuple>> {
    @Override
    public void expire(List<Object> id, Map<GlobalStreamId, Tuple> tuples) {
      for (Tuple tuple : tuples.values()) {
        _collector.fail(tuple);
      }
    }
  }
}
```

TotalRankingsBolt.java under storm-starter-master\src\jvm\storm\starter\bolt:

```java
package storm.starter.bolt;

import backtype.storm.tuple.Tuple;
import org.apache.log4j.Logger;
import storm.starter.tools.Rankings;

/**
 * This bolt merges incoming {@link Rankings}.
 * <p/>
 * It can be used to merge intermediate rankings generated by {@link IntermediateRankingsBolt} into a final,
 * consolidated ranking. To do so, configure this bolt with a globalGrouping on {@link IntermediateRankingsBolt}.
 */
public final class TotalRankingsBolt extends AbstractRankerBolt {

  private static final long serialVersionUID = -8447525895532302198L;
  private static final Logger LOG = Logger.getLogger(TotalRankingsBolt.class);

  public TotalRankingsBolt() {
    super();
  }

  public TotalRankingsBolt(int topN) {
    super(topN);
  }

  public TotalRankingsBolt(int topN, int emitFrequencyInSeconds) {
    super(topN, emitFrequencyInSeconds);
  }

  @Override
  void updateRankingsWithTuple(Tuple tuple) {
    Rankings rankingsToBeMerged = (Rankings) tuple.getValue(0);
    super.getRankings().updateWith(rankingsToBeMerged);
    super.getRankings().pruneZeroCounts();
  }

  @Override
  Logger getLogger() {
    return LOG;
  }
}
```

NthLastModifiedTimeTracker.java under storm-starter-master\src\jvm\storm\starter\tools:

```java
package storm.starter.tools;

import backtype.storm.utils.Time;
import org.apache.commons.collections.buffer.CircularFifoBuffer;

/**
 * This class tracks the time-since-last-modify of a "thing" in a rolling fashion.
 * <p/>
 * For example, create a 5-slot tracker to track the five most recent time-since-last-modify.
 * <p/>
 * You must manually "mark" that the "something" that you want to track -- in terms of modification times -- has just
 * been modified.
 */
public class NthLastModifiedTimeTracker {

  private static final int MILLIS_IN_SEC = 1000;

  private final CircularFifoBuffer lastModifiedTimesMillis;

  public NthLastModifiedTimeTracker(int numTimesToTrack) {
    if (numTimesToTrack < 1) {
      throw new IllegalArgumentException(
          "numTimesToTrack must be greater than zero (you requested " + numTimesToTrack + ")");
    }
    lastModifiedTimesMillis = new CircularFifoBuffer(numTimesToTrack);
    initLastModifiedTimesMillis();
  }

  private void initLastModifiedTimesMillis() {
    long nowCached = now();
    for (int i = 0; i < lastModifiedTimesMillis.maxSize(); i++) {
      lastModifiedTimesMillis.add(Long.valueOf(nowCached));
    }
  }

  private long now() {
    return Time.currentTimeMillis();
  }

  public int secondsSinceOldestModification() {
    long modifiedTimeMillis = ((Long) lastModifiedTimesMillis.get()).longValue();
    return (int) ((now() - modifiedTimeMillis) / MILLIS_IN_SEC);
  }

  public void markAsModified() {
    updateLastModifiedTime();
  }

  private void updateLastModifiedTime() {
    lastModifiedTimesMillis.add(now());
  }
}
```

Rankable.java under storm-starter-master\src\jvm\storm\starter\tools:

```java
package storm.starter.tools;

public interface Rankable extends Comparable<Rankable> {

  Object getObject();

  long getCount();

  /**
   * Note: We do not defensively copy the object wrapped by the Rankable. It is passed as is.
   *
   * @return a defensive copy
   */
  Rankable copy();
}
```

RankableObjectWithFields.java under storm-starter-master\src\jvm\storm\starter\tools:

```java
package storm.starter.tools;

import backtype.storm.tuple.Tuple;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;

import java.io.Serializable;
import java.util.List;

/**
 * This class wraps an object and its associated count, including any additional data fields.
 * <p/>
 * This class can be used, for instance, to track the number of occurrences of an object in a Storm topology.
 */
public class RankableObjectWithFields implements Rankable, Serializable {

  private static final long serialVersionUID = -9102878650001058090L;
  private static final String toStringSeparator = "|";

  private final Object obj;
  private final long count;
  private final ImmutableList<Object> fields;

  public RankableObjectWithFields(Object obj, long count, Object... otherFields) {
    if (obj == null) {
      throw new IllegalArgumentException("The object must not be null");
    }
    if (count < 0) {
      throw new IllegalArgumentException("The count must be >= 0");
    }
    this.obj = obj;
    this.count = count;
    fields = ImmutableList.copyOf(otherFields);
  }

  /**
   * Construct a new instance based on the provided {@link Tuple}.
   * <p/>
   * This method expects the object to be ranked in the first field (index 0) of the provided tuple, and the number of
   * occurrences of the object (its count) in the second field (index 1). Any further fields in the tuple will be
   * extracted and tracked, too. These fields can be accessed via {@link RankableObjectWithFields#getFields()}.
   *
   * @param tuple
   *
   * @return new instance based on the provided tuple
   */
  public static RankableObjectWithFields from(Tuple tuple) {
    List<Object> otherFields = Lists.newArrayList(tuple.getValues());
    Object obj = otherFields.remove(0);
    Long count = (Long) otherFields.remove(0);
    return new RankableObjectWithFields(obj, count, otherFields.toArray());
  }

  public Object getObject() {
    return obj;
  }

  public long getCount() {
    return count;
  }

  /**
   * @return an immutable list of any additional data fields of the object (may be empty but will never be null)
   */
  public List<Object> getFields() {
    return fields;
  }

  @Override
  public int compareTo(Rankable other) {
    long delta = this.getCount() - other.getCount();
    if (delta > 0) {
      return 1;
    }
    else if (delta < 0) {
      return -1;
    }
    else {
      return 0;
    }
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof RankableObjectWithFields)) {
      return false;
    }
    RankableObjectWithFields other = (RankableObjectWithFields) o;
    return obj.equals(other.obj) && count == other.count;
  }

  @Override
  public int hashCode() {
    int result = 17;
    int countHash = (int) (count ^ (count >>> 32));
    result = 31 * result + countHash;
    result = 31 * result + obj.hashCode();
    return result;
  }

  public String toString() {
    StringBuffer buf = new StringBuffer();
    buf.append("[");
    buf.append(obj);
    buf.append(toStringSeparator);
    buf.append(count);
    for (Object field : fields) {
      buf.append(toStringSeparator);
      buf.append(field);
    }
    buf.append("]");
    return buf.toString();
  }

  /**
   * Note: We do not defensively copy the wrapped object and any accompanying fields. We do guarantee, however,
   * to return a defensive (shallow) copy of the List object that is wrapping any accompanying fields.
   *
   * @return
   */
  @Override
  public Rankable copy() {
    List<Object> shallowCopyOfFields = ImmutableList.copyOf(getFields());
    return new RankableObjectWithFields(getObject(), getCount(), shallowCopyOfFields);
  }
}
```

Rankings.java under storm-starter-master\src\jvm\storm\starter\tools:

```java
package storm.starter.tools;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;

import java.io.Serializable;
import java.util.Collections;
import java.util.List;

public class Rankings implements Serializable {

  private static final long serialVersionUID = -1549827195410578903L;
  private static final int DEFAULT_COUNT = 10;

  private final int maxSize;
  private final List<Rankable> rankedItems = Lists.newArrayList();

  public Rankings() {
    this(DEFAULT_COUNT);
  }

  public Rankings(int topN) {
    if (topN < 1) {
      throw new IllegalArgumentException("topN must be >= 1");
    }
    maxSize = topN;
  }

  /**
   * Copy constructor.
   * @param other
   */
  public Rankings(Rankings other) {
    this(other.maxSize());
    updateWith(other);
  }

  /**
   * @return the maximum possible number (size) of ranked objects this instance can hold
   */
  public int maxSize() {
    return maxSize;
  }

  /**
   * @return the number (size) of ranked objects this instance is currently holding
   */
  public int size() {
    return rankedItems.size();
  }

  /**
   * The returned defensive copy is only "somewhat" defensive. We do, for instance, return a defensive copy of the
   * enclosing List instance, and we do try to defensively copy any contained Rankable objects, too. However, the
   * contract of {@link storm.starter.tools.Rankable#copy()} does not guarantee that any Object's embedded within
   * a Rankable will be defensively copied, too.
   *
   * @return a somewhat defensive copy of ranked items
   */
  public List<Rankable> getRankings() {
    List<Rankable> copy = Lists.newLinkedList();
    for (Rankable r : rankedItems) {
      copy.add(r.copy());
    }
    return ImmutableList.copyOf(copy);
  }

  public void updateWith(Rankings other) {
    for (Rankable r : other.getRankings()) {
      updateWith(r);
    }
  }

  public void updateWith(Rankable r) {
    synchronized (rankedItems) {
      addOrReplace(r);
      rerank();
      shrinkRankingsIfNeeded();
    }
  }

  private void addOrReplace(Rankable r) {
    Integer rank = findRankOf(r);
    if (rank != null) {
      rankedItems.set(rank, r);
    }
    else {
      rankedItems.add(r);
    }
  }

  private Integer findRankOf(Rankable r) {
    Object tag = r.getObject();
    for (int rank = 0; rank < rankedItems.size(); rank++) {
      Object cur = rankedItems.get(rank).getObject();
      if (cur.equals(tag)) {
        return rank;
      }
    }
    return null;
  }

  private void rerank() {
    Collections.sort(rankedItems);
    Collections.reverse(rankedItems);
  }

  private void shrinkRankingsIfNeeded() {
    if (rankedItems.size() > maxSize) {
      rankedItems.remove(maxSize);
    }
  }

  /**
   * Removes ranking entries that have a count of zero.
   */
  public void pruneZeroCounts() {
    int i = 0;
    while (i < rankedItems.size()) {
      if (rankedItems.get(i).getCount() == 0) {
        rankedItems.remove(i);
      }
      else {
        i++;
      }
    }
  }

  public String toString() {
    return rankedItems.toString();
  }

  /**
   * Creates a (defensive) copy of itself.
   */
  public Rankings copy() {
    return new Rankings(this);
  }
}
```
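A quick standalone illustration (not storm-starter code) of how Rankings keeps at most topN entries, replaces an existing object's entry in place, and orders entries by descending count, using the RankableObjectWithFields wrapper listed above:

```java
package storm.starter.tools;

// Hypothetical demo of the Rankings/Rankable classes above (not storm-starter code).
public class RankingsDemo {
  public static void main(String[] args) {
    Rankings rankings = new Rankings(2); // keep only the top 2

    rankings.updateWith(new RankableObjectWithFields("cat", 5));
    rankings.updateWith(new RankableObjectWithFields("dog", 3));
    rankings.updateWith(new RankableObjectWithFields("fox", 4)); // evicts "dog" (smallest count)
    rankings.updateWith(new RankableObjectWithFields("cat", 9)); // replaces the old "cat" entry

    System.out.println(rankings); // [[cat|9], [fox|4]]
  }
}
```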
SlidingWindowCounter.java under storm-starter-master\src\jvm\storm\starter\tools:

```java
package storm.starter.tools;

import java.io.Serializable;
import java.util.Map;

/**
 * This class counts objects in a sliding window fashion.
 * <p/>
 * It is designed 1) to give multiple "producer" threads write access to the counter, i.e. being able to increment
 * counts of objects, and 2) to give a single "consumer" thread (e.g. {@link PeriodicSlidingWindowCounter}) read access
 * to the counter. Whenever the consumer thread performs a read operation, this class will advance the head slot of the
 * sliding window counter. This means that the consumer thread indirectly controls where writes of the producer threads
 * will go to. Also, by itself this class will not advance the head slot.
 * <p/>
 * A note for analyzing data based on a sliding window count: During the initial <code>windowLengthInSlots</code>
 * iterations, this sliding window counter will always return object counts that are equal or greater than in the
 * previous iteration. This is the effect of the counter "loading up" at the very start of its existence. Conceptually,
 * this is the desired behavior.
 * <p/>
 * To give an example, using a counter with 5 slots which for the sake of this example represent 1 minute of time each:
 * <p/>
 * <pre>
 * {@code
 * Sliding window counts of an object X over time
 *
 * Minute (timeline):
 * 1    2    3    4    5    6    7    8
 *
 * Observed counts per minute:
 * 1    1    1    1    0    0    0    0
 *
 * Counts returned by counter:
 * 1    2    3    4    4    3    2    1
 * }
 * </pre>
 * <p/>
 * As you can see in this example, for the first <code>windowLengthInSlots</code> (here: the first five minutes) the
 * counter will always return counts equal or greater than in the previous iteration (1, 2, 3, 4, 4). This initial load
 * effect needs to be accounted for whenever you want to perform analyses such as trending topics; otherwise your
 * analysis algorithm might falsely identify the object to be trending as the counter seems to observe continuously
 * increasing counts. Also, note that during the initial load phase <em>every object</em> will exhibit increasing
 * counts.
 * <p/>
 * On a high-level, the counter exhibits the following behavior: If you asked the example counter after two minutes,
 * "how often did you count the object during the past five minutes?", then it should reply "I have counted it 2 times
 * in the past five minutes", implying that it can only account for the last two of those five minutes because the
 * counter was not running before that time.
 *
 * @param <T> The type of those objects we want to count.
 */
public final class SlidingWindowCounter<T> implements Serializable {

  private static final long serialVersionUID = -2645063988768785810L;

  private SlotBasedCounter<T> objCounter;
  private int headSlot;
  private int tailSlot;
  private int windowLengthInSlots;

  public SlidingWindowCounter(int windowLengthInSlots) {
    if (windowLengthInSlots < 2) {
      throw new IllegalArgumentException(
          "Window length in slots must be at least two (you requested " + windowLengthInSlots + ")");
    }
    this.windowLengthInSlots = windowLengthInSlots;
    this.objCounter = new SlotBasedCounter<T>(this.windowLengthInSlots);

    this.headSlot = 0;
    this.tailSlot = slotAfter(headSlot);
  }

  public void incrementCount(T obj) {
    objCounter.incrementCount(obj, headSlot);
  }

  /**
   * Return the current (total) counts of all tracked objects, then advance the window.
   * <p/>
   * Whenever this method is called, we consider the counts of the current sliding window to be available to and
   * successfully processed "upstream" (i.e. by the caller). Knowing this we will start counting any subsequent
   * objects within the next "chunk" of the sliding window.
   *
   * @return The current (total) counts of all tracked objects.
   */
  public Map<T, Long> getCountsThenAdvanceWindow() {
    Map<T, Long> counts = objCounter.getCounts();
    objCounter.wipeZeros();
    objCounter.wipeSlot(tailSlot);
    advanceHead();
    return counts;
  }

  private void advanceHead() {
    headSlot = tailSlot;
    tailSlot = slotAfter(tailSlot);
  }

  private int slotAfter(int slot) {
    return (slot + 1) % windowLengthInSlots;
  }
}
```
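The load-up behavior described in the javadoc table can be reproduced directly. The following demo (not storm-starter code) drives a 5-slot counter exactly as RollingCountBolt does on tick tuples and prints the counts 1, 2, 3, 4, 4, 3, 2, 1:

```java
package storm.starter.tools;

import java.util.Map;

// Hypothetical demo (not storm-starter code) reproducing the javadoc example above:
// a 5-slot window, with object "X" observed once per "minute" for the first 4 minutes.
public class SlidingWindowCounterDemo {
  public static void main(String[] args) {
    SlidingWindowCounter<String> counter = new SlidingWindowCounter<String>(5);
    int[] observedPerMinute = { 1, 1, 1, 1, 0, 0, 0, 0 };

    for (int minute = 0; minute < observedPerMinute.length; minute++) {
      for (int i = 0; i < observedPerMinute[minute]; i++) {
        counter.incrementCount("X");
      }
      // The "consumer" read advances the window, exactly as RollingCountBolt does on tick tuples.
      Map<String, Long> counts = counter.getCountsThenAdvanceWindow();
      System.out.println("minute " + (minute + 1) + ": " + counts.get("X"));
    }
    // Prints 1, 2, 3, 4, 4, 3, 2, 1 for minutes 1..8.
  }
}
```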
*/ public final class SlotBasedCounter<T> implements Serializable { private static final long serialVersionUID = 4858185737378394432L; private final Map<T, long[]> objToCounts = new HashMap<T, long[]>(); private final int numSlots; public SlotBasedCounter(int numSlots) { if (numSlots <= 0) { throw new IllegalArgumentException("Number of slots must be greater than zero (you requested " + numSlots + ")"); } this.numSlots = numSlots; } public void incrementCount(T obj, int slot) { long[] counts = objToCounts.get(obj); if (counts == null) { counts = new long[this.numSlots]; objToCounts.put(obj, counts); } counts[slot]++; } public long getCount(T obj, int slot) { long[] counts = objToCounts.get(obj); if (counts == null) { return 0; } else { return counts[slot]; } } public Map<T, Long> getCounts() { Map<T, Long> result = new HashMap<T, Long>(); for (T obj : objToCounts.keySet()) { result.put(obj, computeTotalCount(obj)); } return result; } private long computeTotalCount(T obj) { long[] curr = objToCounts.get(obj); long total = 0; for (long l : curr) { total += l; } return total; } /** * Reset the slot count of any tracked objects to zero for the given slot. * * @param slot */ public void wipeSlot(int slot) { for (T obj : objToCounts.keySet()) { resetSlotCountToZero(obj, slot); } } private void resetSlotCountToZero(T obj, int slot) { long[] counts = objToCounts.get(obj); counts[slot] = 0; } private boolean shouldBeRemovedFromCounter(T obj) { return computeTotalCount(obj) == 0; } /** * Remove any object from the counter whose total count is zero (to free up memory). */ public void wipeZeros() { Set<T> objToBeRemoved = new HashSet<T>(); for (T obj : objToCounts.keySet()) { if (shouldBeRemovedFromCounter(obj)) { objToBeRemoved.add(obj); } } for (T obj : objToBeRemoved) { objToCounts.remove(obj); } } } storm-starter-master\src\jvm\storm\starter\trident的TridentReach.java package storm.starter.trident; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.LocalDRPC; import backtype.storm.generated.StormTopology; import backtype.storm.task.IMetricsContext; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import storm.trident.TridentState; import storm.trident.TridentTopology; import storm.trident.operation.BaseFunction; import storm.trident.operation.CombinerAggregator; import storm.trident.operation.TridentCollector; import storm.trident.operation.builtin.MapGet; import storm.trident.operation.builtin.Sum; import storm.trident.state.ReadOnlyState; import storm.trident.state.State; import storm.trident.state.StateFactory; import storm.trident.state.map.ReadOnlyMapState; import storm.trident.tuple.TridentTuple; import java.util.*; public class TridentReach { public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() {{ put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan")); put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan")); put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john")); }}; public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() {{ put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai")); put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian")); put("tim", Arrays.asList("alex")); put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan")); put("adam", Arrays.asList("david", "carissa")); put("mike", 
Arrays.asList("john", "bob")); put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob")); }}; public static class StaticSingleKeyMapState extends ReadOnlyState implements ReadOnlyMapState<Object> { public static class Factory implements StateFactory { Map _map; public Factory(Map map) { _map = map; } @Override public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { return new StaticSingleKeyMapState(_map); } } Map _map; public StaticSingleKeyMapState(Map map) { _map = map; } @Override public List<Object> multiGet(List<List<Object>> keys) { List<Object> ret = new ArrayList(); for (List<Object> key : keys) { Object singleKey = key.get(0); ret.add(_map.get(singleKey)); } return ret; } } public static class One implements CombinerAggregator<Integer> { @Override public Integer init(TridentTuple tuple) { return 1; } @Override public Integer combine(Integer val1, Integer val2) { return 1; } @Override public Integer zero() { return 1; } } public static class ExpandList extends BaseFunction { @Override public void execute(TridentTuple tuple, TridentCollector collector) { List l = (List) tuple.getValue(0); if (l != null) { for (Object o : l) { collector.emit(new Values(o)); } } } } public static StormTopology buildTopology(LocalDRPC drpc) { TridentTopology topology = new TridentTopology(); TridentState urlToTweeters = topology.newStaticState(new StaticSingleKeyMapState.Factory(TWEETERS_DB)); TridentState tweetersToFollowers = topology.newStaticState(new StaticSingleKeyMapState.Factory(FOLLOWERS_DB)); topology.newDRPCStream("reach", drpc).stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields( "tweeters")).each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter")).shuffle().stateQuery( tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers")).each(new Fields("followers"), new ExpandList(), new Fields("follower")).groupBy(new Fields("follower")).aggregate(new One(), new Fields( "one")).aggregate(new Fields("one"), new Sum(), new Fields("reach")); return topology.build(); } public static void main(String[] args) throws Exception { LocalDRPC drpc = new LocalDRPC(); Config conf = new Config(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("reach", conf, buildTopology(drpc)); Thread.sleep(2000); System.out.println("REACH: " + drpc.execute("reach", "aaa")); System.out.println("REACH: " + drpc.execute("reach", "foo.com/blog/1")); System.out.println("REACH: " + drpc.execute("reach", "engineering.twitter.com/blog/5")); cluster.shutdown(); drpc.shutdown(); } } storm-starter-master\src\jvm\storm\starter\trident的TridentWordCount.java package storm.starter.trident; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.LocalDRPC; import backtype.storm.StormSubmitter; import backtype.storm.generated.StormTopology; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import storm.trident.TridentState; import storm.trident.TridentTopology; import storm.trident.operation.BaseFunction; import storm.trident.operation.TridentCollector; import storm.trident.operation.builtin.Count; import storm.trident.operation.builtin.FilterNull; import storm.trident.operation.builtin.MapGet; import storm.trident.operation.builtin.Sum; import storm.trident.testing.FixedBatchSpout; import storm.trident.testing.MemoryMapState; import storm.trident.tuple.TridentTuple; public class TridentWordCount { public static class Split extends BaseFunction { @Override public 
void execute(TridentTuple tuple, TridentCollector collector) { String sentence = tuple.getString(0); for (String word : sentence.split(" ")) { collector.emit(new Values(word)); } } } public static StormTopology buildTopology(LocalDRPC drpc) { FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"), new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"), new Values("how many apples can you eat"), new Values("to be or not to be the person")); spout.setCycle(true); TridentTopology topology = new TridentTopology(); TridentState wordCounts = topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"), new Split(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")).parallelismHint(16); topology.newDRPCStream("words", drpc).each(new Fields("args"), new Split(), new Fields("word")).groupBy(new Fields( "word")).stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")).each(new Fields("count"), new FilterNull()).aggregate(new Fields("count"), new Sum(), new Fields("sum")); return topology.build(); } public static void main(String[] args) throws Exception { Config conf = new Config(); conf.setMaxSpoutPending(20); if (args.length == 0) { LocalDRPC drpc = new LocalDRPC(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("wordCounter", conf, buildTopology(drpc)); for (int i = 0; i < 100; i++) { System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped")); Thread.sleep(1000); } } else { conf.setNumWorkers(3); StormSubmitter.submitTopology(args[0], conf, buildTopology(null)); } } } storm-starter-master\src\jvm\storm\starter\util的StormRunner.java package storm.starter.util; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.generated.StormTopology; public final class StormRunner { private static final int MILLIS_IN_SEC = 1000; private StormRunner() { } public static void runTopologyLocally(StormTopology topology, String topologyName, Config conf, int runtimeInSeconds) throws InterruptedException { LocalCluster cluster = new LocalCluster(); cluster.submitTopology(topologyName, conf, topology); Thread.sleep((long) runtimeInSeconds * MILLIS_IN_SEC); cluster.killTopology(topologyName); cluster.shutdown(); } } storm-starter-master\src\jvm\storm\starter\util的TupleHelpers.java package storm.starter.util; import backtype.storm.Constants; import backtype.storm.tuple.Tuple; public final class TupleHelpers { private TupleHelpers() { } public static boolean isTickTuple(Tuple tuple) { return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && tuple.getSourceStreamId().equals( Constants.SYSTEM_TICK_STREAM_ID); } } 本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/5989195.html,如需转载请自行联系原作者
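A quick way to see the SlidingWindowCounter semantics from the tools listing above is a small driver that replays the 8-minute example in its javadoc. This is a minimal sketch, assuming the storm.starter.tools classes above are on the classpath; the printed totals should follow the 1 2 3 4 4 3 2 1 sequence from the comment.

import java.util.Map;
import storm.starter.tools.SlidingWindowCounter;

public class SlidingWindowCounterDemo {
    public static void main(String[] args) {
        // 5 slots, one per "minute", exactly as in the javadoc example above
        SlidingWindowCounter<String> counter = new SlidingWindowCounter<String>(5);
        for (int minute = 1; minute <= 8; minute++) {
            if (minute <= 4) {
                counter.incrementCount("X"); // X is observed once per minute in minutes 1-4
            }
            // The single "consumer" read: returns current totals, then advances the head slot.
            Map<String, Long> counts = counter.getCountsThenAdvanceWindow();
            System.out.println("minute " + minute + ": " + counts);
        }
    }
}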


Hadoop HDFS Concepts Series: A First Look at HDFS Architecture and Principles (Part 4)

HDFS replica placement policy

How does the namenode decide which datanodes should store a block's replicas? The choice is a trade-off among reliability, write bandwidth, and read bandwidth. Hadoop has its own placement policy for datanode replicas, and over its history there have been two versions of it:

Replica policy before Hadoop 0.17
First replica: stored on a different node within the same rack.
Second replica: stored on another node in the same rack.
Third replica: stored on a node in a different rack.
Remaining replicas: placed randomly.

Replica policy from Hadoop 0.17 onward
First replica: stored on the same node as the client.
Second replica: stored on a node in a different rack.
Third replica: stored on another node in the rack of the second replica.
Remaining replicas: placed randomly.

Note: for example, a 10 MB data file is split into blocks on ingest, and each block is stored as 3 replicas.

Reprinted from the cnblogs blog 大数据躺过的坑, original link: http://www.cnblogs.com/zlslch/p/5080357.html. To repost, please contact the original author.
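The replication factor itself can be inspected and changed per file through the HDFS Java API. A minimal sketch, assuming a reachable cluster whose configuration is on the classpath; the path /tmp/demo.txt is hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReplicationDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(); // picks up core-site.xml / hdfs-site.xml
        FileSystem fs = FileSystem.get(conf);
        Path file = new Path("/tmp/demo.txt");            // hypothetical path
        short current = fs.getFileStatus(file).getReplication();
        System.out.println("current replication: " + current);
        fs.setReplication(file, (short) 3);               // request 3 replicas per block
        fs.close();
    }
}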


《从零开始学Swift》 Study Notes (Day 15): Mind the Conversions Between Numeric Types

In C, Objective-C, Java and other such languages there are two ways to convert between integer types: conversion from a smaller-range type to a larger-range one is automatic, while conversion from a larger-range type to a smaller one requires an explicit cast and may lose precision. Neither works implicitly in Swift; you must convert explicitly through initializer calls. Consider:

let historyScore: UInt8 = 90
let englishScore: UInt16 = 130
let totalScore = historyScore + englishScore // error

This produces a compile error, because historyScore is UInt8 while englishScore is UInt16, and the two cannot be mixed directly.

There are two ways to convert:

One is to convert the UInt8 historyScore to UInt16. Since this widens from a smaller range to a larger one, it is safe:

let totalScore = UInt16(historyScore) + englishScore // the correct way to convert

The other is to convert the UInt16 englishScore to UInt8. Since this narrows from a larger range to a smaller one, it is unsafe; if the value is too large, precision is lost:

let totalScore = historyScore + UInt8(englishScore) // correct here, because 130 fits in UInt8

In this example englishScore is 130, so the conversion succeeds. If you change it to 1300, the program still compiles, but it reports a runtime error on the console.

Conversion between integer and floating-point types

Conversions between integer and floating-point types work the same way as conversions between integer types:

let historyScore: Float = 90.6
let englishScore: UInt16 = 130
// let totalScore = historyScore + englishScore    // error
let totalScore1 = historyScore + Float(englishScore)  // correct; safe
let totalScore2 = UInt16(historyScore) + englishScore // correct; the fractional part is truncated

Reprinted from tony关东升's 51CTO blog, original link: http://blog.51cto.com/tonyguan/1746109. To repost, please contact the original author.
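For contrast with the Swift rules above, here is how the same situation looks in Java, where widening is implicit and narrowing takes an explicit cast that silently truncates instead of trapping at run time. A minimal sketch:

public class NumericConversion {
    public static void main(String[] args) {
        int historyScore = 90;
        long total = historyScore + 130L;   // int widens to long implicitly
        int english = 1300;
        byte narrowed = (byte) english;     // explicit narrowing: 1300 becomes 20 (high bits dropped)
        System.out.println(total + " " + narrowed); // prints: 220 20
    }
}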


WebSocket 学习笔记--IE,IOS,Android等设备的兼容性问题与代码实现

一、背景 公司最近准备将一套产品放到Andriod和IOS上面去,为了统一应用的开发方式,决定用各平台APP嵌套一个HTML5浏览器来实现,其中数据通信,准备使用WebSocket的方式。于是,我开始在各大浏览器上测试。 二、协议分析 2.1 WebSocket的请求包 首先把原来做Socket通信的程序拿出来,跟踪下浏览器在WebSocket应用请求服务端的时候发的数据包的内容: IE11: GET /chat HTTP/1.1 Origin: http://localhost Sec-WebSocket-Key: 98JFoEb6pMLFYhAQATn6hw== Connection: Upgrade Upgrade: Websocket Sec-WebSocket-Version: 13 User-Agent: Mozilla/5.0 (Windows NT 6.3; Trident/7.0; rv:11.0) like Gecko Host: 127.0.0.1:1333 Cache-Control: no-cache Cookie: s_pers=%20s_20s_nr%3D1390552565333-Repeat%7C1422088565333%3B FireFox 26.0: GET /chat HTTP/1.1 Host: 127.0.0.1:1333 User-Agent: Mozilla/5.0 (Windows NT 6.3; rv:26.0) Gecko/20100101 Firefox/26.0 Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8 Accept-Language: zh-cn,zh;q=0.8,en-us;q=0.5,en;q=0.3 Accept-Encoding: gzip, deflate Sec-WebSocket-Version: 13 Origin: http://localhost Sec-WebSocket-Key: kO4aF1Gpm1mBwOr6j30h0Q== Connection: keep-alive, Upgrade Pragma: no-cache Cache-Control: no-cache Upgrade: websocket Google Chrome 33.0.1707.0 : GET /chat HTTP/1.1 Upgrade: websocket Connection: Upgrade Host: 127.0.0.1:1333 Origin: http://192.168.137.1 Pragma: no-cache Cache-Control: no-cache Sec-WebSocket-Key: NvxdeWLLsLXkt5DirLJ1yA== Sec-WebSocket-Version: 13 Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits, x-webkit-deflate-frame User-Agent: Mozilla/5.0 (Windows NT 6.3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/33.0.1707.0 Safari/537.36 Apple Safari 5.1.7(7534.57.2)Windows 版本 GET /chat HTTP/1.1 Upgrade: WebSocket Connection: Upgrade Host: 127.0.0.1:1333 Origin: http://localhost Sec-WebSocket-Key1: 25 58 510 64 > = 7 Sec-WebSocket-Key2: 13q9% 974M${fdj6 2`Qn32 �����R�q 分析了下这几种不同的浏览器,除了 Safari,其它均使用了最新的 WebSocket版本 即 Sec-WebSocket-Version:13 但是 Safrai 使用的还是老式的 WebSocket 7.5-7.6版本。 有下面一些文章,对于WebSocket的版本进行了说明: WebSocket握手总结http://www.hoverlees.com/blog/?p=1413 基于Websocket草案10协议的升级及基于Netty的握手实现 http://blog.csdn.net/fenglibing/article/details/6852497 WebSocket握手协议http://blog.163.com/liwei1987821@126/blog/static/17266492820133190211333/ 2.2,IE 对WebSocket的支持问题 这里有必要单独拿出来说,IE10以后才支持HTML5,因此也要这个版本后的浏览器才支持WebSocket,所以默认的Win7下面的浏览器都没法支持。我测试用的是Win8.1的IE11,可以支持WebSocket,效果跟FireFox、Chrome一样,但有一个恼火的问题,IE的WebSocket它会自动向服务器端发起“心跳包”,而此时服务端使用SockeAsyncEventArgs 组件,会出问题,需要客户端多次发数据之后,才会取到正确的客户端请求的数据。 另外,.NET 4.5内置支持了WebSocket,但要求操作系统是Win8或者 Server2012以上的版本。所以如果生产环境的服务器没有这么新,那么WebSocketServer只有自己写了。 2.3,IOS系统上WebSoket问题 Apple 内置的浏览器就是 Safrai,那么IOS上面的浏览器 支持的 WebSocket 版本怎么样呢 ? 找了下同事的 iPhone 4s,IOS 7.0.1 的版本 ,经过测试 ,正常,跟其它浏览器一样,但不知道其它版本的IOS下面的浏览器支持得 怎么样。这就奇怪了,为何Windows 桌面版本的Safrai 不行呢 ? 
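Whatever the browser, the version-13 handshake in the captures above hinges on one computation: the server answers Sec-WebSocket-Key with Sec-WebSocket-Accept, defined by RFC 6455 as Base64(SHA-1(key + fixed GUID)). A minimal Java sketch; the sample key is taken from the IE11 capture above, and java.util.Base64 assumes Java 8 or later.

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Base64;

public class WebSocketAccept {
    public static String accept(String secWebSocketKey) throws Exception {
        String magic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; // fixed by the protocol
        MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
        byte[] hash = sha1.digest((secWebSocketKey + magic).getBytes(StandardCharsets.US_ASCII));
        return Base64.getEncoder().encodeToString(hash);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(accept("98JFoEb6pMLFYhAQATn6hw==")); // key from the IE11 capture
    }
}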
2.4,安卓上的WebSocket问题 很不幸,目前安卓最新的版本 ,内置的浏览器插件仍然不支持WebSocket,而下载的QQ浏览器等是可以支持的。但是安卓上面的 App默认使用的都是 Andriod内核的浏览器插件,因此它们没法支持WebSocket。但是,既然是系统上面运行的 APP了,为何不直接走Socket 通信方式呢?同事说是为了2个平台能够使用同一套Web应用,毕竟应用嵌套在一个浏览器里面对于开发维护还是最方便的。 所以,解决的路径还是想办法让安卓的默认浏览器插件能够支持WebSocket,查找了下资料,大概有这些资料: android怎么集成支持websocket的浏览器内核http://www.oschina.net/question/1049351_116337 在android的webview中实现websockethttp://xuepiaoqiyue.blog.51cto.com/4391594/1285791 但同事说,这些方法用过了,就是现在测试的效果,跟真正的WebSocket 兼容得不好,使用我的程序测试可以握手连接,但是解析内容上不成功。后来分析,是同事的程序对数据有特殊格式的要求,只要按照他的要求去分析,那么是可以解析得到正确的结果的。 三、WebSocket 服务端和客户端实现 最新的WebSocket 13 版本支持的服务端代码: SocketServer 对于WebSocket信息的处理: private void ProcessReceive(SocketAsyncEventArgs e) { // check if the remote host closed the connection AsyncUserToken token = (AsyncUserToken)e.UserToken; if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success) { //increment the count of the total bytes receive by the server Interlocked.Add(ref m_totalBytesRead, e.BytesTransferred); Console.WriteLine("The server has read a total of {0} bytes", m_totalBytesRead); //echo the data received back to the client //增加自定义处理 string received = System.Text.Encoding.UTF8.GetString(e.Buffer, e.Offset, e.BytesTransferred); Byte[] sendBuffer = null; //IE:Upgrade: Websocket //FireFox: Upgrade: websocket //Chrome: Upgrade: websocket if (received.IndexOf("Upgrade: websocket", StringComparison.OrdinalIgnoreCase) > 0) { //Web Socket 初次连接 token.Name = "WebSocket"; Console.WriteLine("Accept WebSocket."); sendBuffer=PackHandShakeData(GetSecKeyAccetp(received)); } else { if (token.Name == "WebSocket") { string clientMsg; if (e.Offset > 0) { byte[] buffer = new byte[e.BytesTransferred]; Array.Copy(e.Buffer, e.Offset, buffer, 0, buffer.Length); clientMsg = AnalyticData(buffer, buffer.Length); Console.WriteLine("--DEBUG:Web Socket Recive data offset:{0}--",e.Offset); } else { clientMsg = AnalyticData(e.Buffer, e.BytesTransferred); } Console.WriteLine("接受到客户端数据:" + clientMsg); if (!string.IsNullOrEmpty(clientMsg)) { //解析来的真正消息,执行服务器处理 //To Do // //发送数据 string sendMsg = "Hello," + clientMsg; Console.WriteLine("发送数据:“" + sendMsg + "” 至客户端...."); sendBuffer = PackData(sendMsg); } else { sendBuffer = PackData("心跳包"); } } else { Console.WriteLine("服务器接收到的数据:[{0}],将进行1000ms的处理。CurrentThread.ID:{1}", received, System.Threading.Thread.CurrentThread.ManagedThreadId); System.Threading.Thread.Sleep(1000); Console.WriteLine("线程{0} 任务处理结束,发送数据", System.Threading.Thread.CurrentThread.ManagedThreadId); // 格式化数据后发回客户端。 sendBuffer = Encoding.UTF8.GetBytes("Returning " + received); } } //设置传回客户端的缓冲区。 e.SetBuffer(sendBuffer, 0, sendBuffer.Length); //结束 bool willRaiseEvent = token.Socket.SendAsync(e); if (!willRaiseEvent) { ProcessSend(e); } } else { CloseClientSocket(e); } } 下面是一些相关的WebSocket 处理代码,包括握手、打包数据等: // <summary> /// 打包握手信息 /// <remarks> http://www.hoverlees.com/blog/?p=1413 Safari 早期版本不支持标准的version 13,握手不成功。 /// 据测试,最新的IOS 7.0 支持 /// </remarks> /// </summary> /// <param name="secKeyAccept">Sec-WebSocket-Accept</param> /// <returns>数据包</returns> private static byte[] PackHandShakeData(string secKeyAccept) { var responseBuilder = new StringBuilder(); responseBuilder.Append("HTTP/1.1 101 Switching Protocols" + Environment.NewLine); responseBuilder.Append("Upgrade: websocket" + Environment.NewLine); responseBuilder.Append("Connection: Upgrade" + Environment.NewLine); responseBuilder.Append("Sec-WebSocket-Accept: " + secKeyAccept + Environment.NewLine + Environment.NewLine); //如果把上一行换成下面两行,才是thewebsocketprotocol-17协议,但居然握手不成功,目前仍没弄明白! 
//responseBuilder.Append("Sec-WebSocket-Accept: " + secKeyAccept + Environment.NewLine); //responseBuilder.Append("Sec-WebSocket-Protocol: chat" + Environment.NewLine); return Encoding.UTF8.GetBytes(responseBuilder.ToString()); } /// <summary> /// 生成Sec-WebSocket-Accept /// </summary> /// <param name="handShakeText">客户端握手信息</param> /// <returns>Sec-WebSocket-Accept</returns> private static string GetSecKeyAccetp(string handShakeText) //byte[] handShakeBytes, int bytesLength { //string handShakeText = Encoding.UTF8.GetString(handShakeBytes, 0, bytesLength); string key = string.Empty; Regex r = new Regex(@"Sec\-WebSocket\-Key:(.*?)\r\n"); Match m = r.Match(handShakeText); if (m.Groups.Count != 0) { key = Regex.Replace(m.Value, @"Sec\-WebSocket\-Key:(.*?)\r\n", "$1").Trim(); } byte[] encryptionString = SHA1.Create().ComputeHash(Encoding.ASCII.GetBytes(key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11")); return Convert.ToBase64String(encryptionString); } /// <summary> /// 解析客户端数据包 /// </summary> /// <param name="recBytes">服务器接收的数据包</param> /// <param name="recByteLength">有效数据长度</param> /// <returns></returns> private static string AnalyticData(byte[] recBytes, int recByteLength) { if (recByteLength < 2) { return string.Empty; } bool fin = (recBytes[0] & 0x80) == 0x80; // 1bit,1表示最后一帧 if (!fin) { return string.Empty;// 超过一帧暂不处理 } bool mask_flag = (recBytes[1] & 0x80) == 0x80; // 是否包含掩码 if (!mask_flag) { return string.Empty;// 不包含掩码的暂不处理 } int payload_len = recBytes[1] & 0x7F; // 数据长度 byte[] masks = new byte[4]; byte[] payload_data; if (payload_len == 126) { Array.Copy(recBytes, 4, masks, 0, 4); payload_len = (UInt16)(recBytes[2] << 8 | recBytes[3]); payload_data = new byte[payload_len]; Array.Copy(recBytes, 8, payload_data, 0, payload_len); } else if (payload_len == 127) { Array.Copy(recBytes, 10, masks, 0, 4); byte[] uInt64Bytes = new byte[8]; for (int i = 0; i < 8; i++) { uInt64Bytes[i] = recBytes[9 - i]; } UInt64 len = BitConverter.ToUInt64(uInt64Bytes, 0); payload_data = new byte[len]; for (UInt64 i = 0; i < len; i++) { payload_data[i] = recBytes[i + 14]; } } else { Array.Copy(recBytes, 2, masks, 0, 4); payload_data = new byte[payload_len]; //修改,WebSocket如果自动触发了心跳,之后再发送数据,可能出错,增加下面的判断 if (recBytes.Length < 6 + payload_len) Array.Copy(recBytes, 6, payload_data, 0, recBytes.Length - 6); else Array.Copy(recBytes, 6, payload_data, 0, payload_len); } for (var i = 0; i < payload_len; i++) { payload_data[i] = (byte)(payload_data[i] ^ masks[i % 4]); } return Encoding.UTF8.GetString(payload_data); } /// <summary> /// 打包服务器数据 /// </summary> /// <param name="message">数据</param> /// <returns>数据包</returns> private static byte[] PackData(string message) { byte[] contentBytes = null; byte[] temp = Encoding.UTF8.GetBytes(message); if (temp.Length < 126) { contentBytes = new byte[temp.Length + 2]; contentBytes[0] = 0x81; contentBytes[1] = (byte)temp.Length; Array.Copy(temp, 0, contentBytes, 2, temp.Length); } else if (temp.Length < 0xFFFF) { contentBytes = new byte[temp.Length + 4]; contentBytes[0] = 0x81; contentBytes[1] = 126; contentBytes[2] = (byte)(temp.Length & 0xFF); contentBytes[3] = (byte)(temp.Length >> 8 & 0xFF); Array.Copy(temp, 0, contentBytes, 4, temp.Length); } else { // 暂不处理超长内容 } return contentBytes; } 测试配套的客户端 HTML代码: <html> <head> <meta charset="UTF-8"> <title>Web sockets test</title> <script type="text/javascript"> var ws; function ToggleConnectionClicked() { try { var ip = document.getElementById("txtIP").value; ws = new WebSocket("ws://" + ip + ":1333/chat"); //连接服务器 
ws://localhost:1818/chat ws.onopen = function(event){alert("已经与服务器建立了连接\r\n当前连接状态:"+this.readyState);}; ws.onmessage = function(event){alert("接收到服务器发送的数据:\r\n"+event.data);}; ws.onclose = function(event){alert("已经与服务器断开连接\r\n当前连接状态:"+this.readyState);}; ws.onerror = function(event){alert("WebSocket异常!");}; } catch (ex) { alert(ex.message); } }; function SendData() { try{ ws.send("张三."); }catch(ex){ alert(ex.message); } }; function seestate(){ alert(ws.readyState); } </script> </head> <body> Server IP:<input type="text" id="txtIP" value="127.0.0.1"/> <button id='ToggleConnection' type="button" onclick='ToggleConnectionClicked();'>连接服务器</button><br /><br /> <button id='ToggleConnection' type="button" onclick='SendData();'>发送我的名字:beston</button><br /><br /> <button id='ToggleConnection' type="button" onclick='seestate();'>查看状态</button><br /><br /> </body> </html> 但是上面的代码依然无法处理IE的“心跳”数据引起的问题。此时需要修改一下WebSocket对接受到数据的处理方式,如果客户端发送的是无效的数据,比如IE的心跳数据 ,那么直接过滤,不写入任何数据,将服务端的代码做下面的修改即可: if (sendBuffer != null) { //设置传回客户端的缓冲区。 e.SetBuffer(sendBuffer, 0, sendBuffer.Length); //结束 bool willRaiseEvent = token.Socket.SendAsync(e); if (!willRaiseEvent) { ProcessSend(e); } } else { ProcessSend(e); } 这样,就得到了最终正确的结果了。此问题困扰了我好几天。下面是运行结果图: 本文转自深蓝医生博客园博客,原文链接:http://www.cnblogs.com/bluedoctor/p/3534087.html,如需转载请自行联系原作者
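As an addendum, the unmasking logic of the AnalyticData method above translates to Java as follows, for the common short-frame case (FIN set, masked, payload shorter than 126 bytes). This is a sketch, not a full RFC 6455 parser.

import java.nio.charset.StandardCharsets;

public class FrameDecoder {
    public static String decodeShortTextFrame(byte[] frame) {
        boolean fin = (frame[0] & 0x80) != 0;
        boolean masked = (frame[1] & 0x80) != 0;
        int payloadLen = frame[1] & 0x7F;
        if (!fin || !masked || payloadLen >= 126) {
            return null; // multi-frame, unmasked, or extended-length frames not handled here
        }
        byte[] mask = new byte[4];
        System.arraycopy(frame, 2, mask, 0, 4);
        byte[] payload = new byte[payloadLen];
        for (int i = 0; i < payloadLen; i++) {
            payload[i] = (byte) (frame[6 + i] ^ mask[i % 4]); // XOR with the rotating mask key
        }
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // A hand-built masked frame carrying "Hi" with mask key 01 02 03 04.
        byte[] frame = {(byte) 0x81, (byte) 0x82, 1, 2, 3, 4, 0x48 ^ 1, 0x69 ^ 2};
        System.out.println(decodeShortTextFrame(frame)); // prints: Hi
    }
}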


[Android学习笔记三] Support v7提供交错式网格布局开发示例

本文主要介绍Android Support v7提供的RecycleView和交错式布局(通常成为瀑布流布局)的使用和事件监听处理。 1. 涉及到开源库有: Fresco: Facebook开源的不是一般强大的图片加载组件库 Bufferknife: Android View和事件绑定库,通过注解完成,在编译时APT处理注解文档。 2. 模块配置 Android Studio模块,build.gradle配置: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 applyplugin: 'com.android.application' android{ compileSdkVersion 21 buildToolsVersion '21.1.2' defaultConfig{ applicationId 'secondriver.sdk' minSdkVersion 16 targetSdkVersion 21 versionCode 1 versionName '1.0' } buildTypes{ release{ minifyEnabledfalse proguardFilesgetDefaultProguardFile( 'proguard-android.txt' ), 'proguard-rules.pro' } } packagingOptions{ exclude 'META-INF/services/javax.annotation.processing.Processor' } lintOptions{ disable 'InvalidPackage' } } dependencies{ compilefileTree(dir: 'libs' ,include:[ '*.jar' ]) compile 'com.android.support:appcompat-v7:21.0.3' compile 'com.android.support:recyclerview-v7:21.0.3@aar' compile 'com.android.support:cardview-v7:21.0.3@aar' compile 'com.facebook.fresco:fresco:0.8.0' compile 'com.jakewharton:butterknife:7.0.1' testCompile 'junit:junit:4.12' } 3. 布局文件 主布局文件:activity_recycleview.xml 包含一个RecycleView,将作为主屏幕组件。 1 2 3 4 5 6 7 8 9 10 11 12 13 <? xml version = "1.0" encoding = "utf-8" ?> < RelativeLayout android:id = "@+id/swipe_refresh_layout" xmlns:android = "http://schemas.android.com/apk/res/android" android:layout_width = "match_parent" android:layout_height = "wrap_content" > < android.support.v7.widget.RecyclerView android:id = "@+id/recycler_view" android:layout_width = "match_parent" android:layout_height = "wrap_content" android:layout_alignParentTop = "true" android:scrollbars = "vertical" /> </ RelativeLayout > RecycleView 通过设置不同的布局管理器对象来实现不同的布局显示,如: android.support.v7.widget.LinearLayoutManager 可以实现ListView的布局效果 android.support.v7.widget.GridLayoutManager 可以实现GridView的布局效果 android.support.v7.widget.StaggeredGridLayoutManager 可以实现交错式网格布局效果 次布局文件(RecycleView中显示的每一项视图的布局):item_recycelview.xml 通过com.android.support:CardView包提供的CardView帧式容器布局包含一个ImageView和TextView作为RecycleView的每一项视图的布局。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 <? 
xml version = "1.0" encoding = "utf-8" ?> < LinearLayout xmlns:android = "http://schemas.android.com/apk/res/android" xmlns:card_view = "http://schemas.android.com/apk/res-auto" xmlns:fresco = "http://schemas.android.com/tools" android:layout_width = "match_parent" android:layout_height = "match_parent" android:gravity = "center" > < android.support.v7.widget.CardView android:id = "@+id/card_view" android:layout_width = "wrap_content" android:layout_height = "wrap_content" android:layout_gravity = "center" card_view:cardCornerRadius = "4dp" card_view:cardUseCompatPadding = "true" > < RelativeLayout android:layout_width = "match_parent" android:layout_height = "wrap_content" android:orientation = "vertical" > < com.facebook.drawee.view.SimpleDraweeView android:id = "@+id/info_image" android:layout_width = "300dp" android:layout_height = "420dp" fresco:actualImageScaleType = "centerCrop" fresco:placeholderImage = "@mipmap/ic_launcher" fresco:roundAsCircle = "false" fresco:roundBottomLeft = "true" fresco:roundBottomRight = "true" fresco:roundTopLeft = "true" fresco:roundTopRight = "true" fresco:roundedCornerRadius = "1dp" fresco:roundingBorderWidth = "2dp" /> < TextView android:id = "@+id/info_text" android:layout_width = "200dp" android:layout_height = "80dp" android:textSize = "20sp" android:textColor = "@android:color/holo_blue_dark" android:textAppearance = "?android:attr/textAppearanceMedium" android:layout_alignLeft = "@id/info_image" android:layout_alignRight = "@id/info_image" android:layout_below = "@id/info_image" android:gravity = "left|center_vertical" /> </ RelativeLayout > </ android.support.v7.widget.CardView > </ LinearLayout > ImageView组件使用的是Fresco库提供的视图组件。 4. Activity实现和数据填充 4.1 RecycleView中的每一项视图的数据组成 文本信息+ 图片地址(url,file,resId) 本示例中采用Url网络图片 1 2 3 4 5 6 7 8 9 10 11 12 /** *View项的数据对象 */ static class Pair{ public Stringtext; public Stringurl; public Pair(Stringtext,Stringurl){ this .text=text; this .url=url; } } 4.2 RecycleView中国的每一项视图的数据填充,即适配器 自定义适配器需要实现RecycleView.Adapter类。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 static class RecycleViewAdapter extends RecyclerView.Adapter<RecyclerView.ViewHolder>{ /** *RecycleView的View项单击事件监听 */ public interface OnRecycleViewItemClickListener{ void onRecycleViewItemClick(Viewview, int position); } private ArrayList<Pair>items= new ArrayList<>(); private OnRecycleViewItemClickListenermOnRecycleViewItemClickListener; public RecycleViewAdapter(ArrayList<Pair>items){ this .items=items; } @Override public RecyclerView.ViewHolderonCreateViewHolder(ViewGroupparent, int viewType){ Viewview=LayoutInflater.from(parent.getContext()).inflate(R.layout.item_recyclerview,parent, false ); return new RecycleViewItemHolder(view,mOnRecycleViewItemClickListener); } @Override public void onBindViewHolder(RecyclerView.ViewHolderholder, int position){ Pairpair=items.get(position); ((RecycleViewItemHolder)holder).setContent(pair); } @Override public int getItemCount(){ return items.size(); } public void setOnRecycleViewItemClickListener(OnRecycleViewItemClickListeneronItemClickListener){ if ( null !=onItemClickListener){ this .mOnRecycleViewItemClickListener=onItemClickListener; } } } 主要实现onCreateViewHolder和onBindViewHolder方法。由于RecycleView并未提供为视图项添加监听事件,这里自定义个一个事件监听接口,在实例化适配器的时候可以选择设置事件监听。 4.3 创建ViewHolder需要通过自定义ViewHolder类继承RecycleView.ViewHolder。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 static class RecycleViewItemHolder extends 
RecyclerView.ViewHolder implements View.OnClickListener{ @Bind (R.id.info_text) public TextViewinfoTextView; @Bind (R.id.info_image) public SimpleDraweeViewdraweeView; private RecycleViewAdapter.OnRecycleViewItemClickListeneronItemClickListener; public RecycleViewItemHolder(ViewitemView,RecycleViewAdapter.OnRecycleViewItemClickListeneronItemClickListener){ super (itemView); ButterKnife.bind( this ,itemView); this .onItemClickListener=onItemClickListener; itemView.setOnClickListener( this ); } public void setContent(Pairpair){ infoTextView.setText(pair.text); draweeView.setImageURI(Uri.parse(pair.url)); } @Override public void onClick(Viewv){ if ( null !=onItemClickListener){ onItemClickListener.onRecycleViewItemClick(v,getPosition()); } } } 在构造方法中的传入OnRecycleViewItemClickListener对象,并且自定义ViewHolder实现OnClickListener接口,通过为itemView对象设置OnClickListener监听事件,在onClick方法中将点击事件的处理交由OnRecycleViewItemClickListener对象处理,从而达到为RecycleView中的itemView注册点击事件。 如果不采用这种方式添加事件监听,就不需要自定义监听接口和自定义ViewHolder实现事件监听接口以及事件处理。另外可以通过获得的itemView之后,通过组件I的获取组件来添加事件监听。还可以通过获得的自定义ViewHolder,来访问每个组件。 4.4 Activity的具体实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 package secondriver.sdk.activity; import android.app.Activity; import android.net.Uri; import android.os.Bundle; import android.support.v7.widget.DefaultItemAnimator; import android.support.v7.widget.RecyclerView; import android.support.v7.widget.StaggeredGridLayoutManager; import android.util.Log; import android.view.LayoutInflater; import android.view.View; import android.view.ViewGroup; import android.widget.TextView; import android.widget.Toast; import com.facebook.drawee.view.SimpleDraweeView; import java.util.ArrayList; import java.util.Arrays; import java.util.Random; import butterknife.Bind; import butterknife.ButterKnife; import secondriver.sdk.R; /** *Author:secondriver *Created:2015/11/18 */ public class RecycleViewActivity extends Activity{ private static final StringTAG=RecyclerView. 
class .getName(); private static final String[]RES_URL= new String[]{ "http://p1.wmpic.me/article/2015/11/16/1447644849_hySANEEF.jpg" , //减少篇幅,此处省去14个图片Url }; @Bind (R.id.recycler_view) public RecyclerViewmRecycleView; private final int PRE_SCREEN_NUMBER= 6 ; private final int SPAN_COUNT= 2 ; private int previousLastIndex= 0 ; private boolean isSlidingToLast= false ; private RecycleViewAdaptermAdapter; private ArrayList<Pair>mItem= new ArrayList<>(); //交错式网格布局管理对象,即通常称的瀑布流布局 private StaggeredGridLayoutManagermLayoutManager; @Override protected void onCreate(BundlesavedInstanceState){ super .onCreate(savedInstanceState); setContentView(R.layout.activity_recyclerview); ButterKnife.bind( this ); Fresco.initialize( this ); //重要,Fresco做一系列初始化工作 initRecycleView(); } private void initRecycleView(){ mLayoutManager= new StaggeredGridLayoutManager(SPAN_COUNT,StaggeredGridLayoutManager.VERTICAL); mRecycleView.setLayoutManager(mLayoutManager); mAdapter= new RecycleViewAdapter(mItem); loadData( false ); mRecycleView.setAdapter(mAdapter); mRecycleView.setItemAnimator( new DefaultItemAnimator()); //RecycleView的View项单击事件监听 mAdapter.setOnRecycleViewItemClickListener( new RecycleViewAdapter.OnRecycleViewItemClickListener(){ @Override public void onRecycleViewItemClick(Viewview, int position){ long id=mRecycleView.getChildItemId(view); Log.d(TAG, "View项的根视图:" +view.getClass().getName()+ ",position=" +position+ "ViewHolder_Id=" +id); //通过findViewById查找View项中的元素 SimpleDraweeViewdraweeView=(SimpleDraweeView)view.findViewById(R.id.info_image); if ( null !=draweeView){ draweeView.setImageURI(Uri.parse(RES_URL[ 0 ])); Toast.makeText(RecycleViewActivity. this , "通过findViewById查找View项中的元素" ,Toast.LENGTH_LONG).show(); } RecycleViewItemHolderrecycleViewItemHolder=(RecycleViewItemHolder)mRecycleView.findViewHolderForPosition(position); if ( null !=recycleViewItemHolder){ recycleViewItemHolder.infoTextView.setText( "通过ViewHolder找到View项中的元素" ); } } }); //下拉刷新,追加内容 mRecycleView.setOnScrollListener( new RecyclerView.OnScrollListener(){ @Override public void onScrollStateChanged(RecyclerViewrecyclerView, int newState){ super .onScrollStateChanged(recyclerView,newState); if (newState==RecyclerView.SCROLL_STATE_IDLE){ if (isPullToBottom()&&isSlidingToLast){ if (mItem.size()> 36 ){ //最大数据量 Toast.makeText(RecycleViewActivity. 
this , "没有数据了" ,Toast.LENGTH_LONG).show(); return ; } else { loadData( false ); Log.d(TAG, "notifyItemRangeInsertedstartPosition=" +previousLastIndex); mAdapter.notifyItemRangeInserted(previousLastIndex,PRE_SCREEN_NUMBER); } } } if (newState==RecyclerView.SCROLL_STATE_SETTLING){ Log.d(TAG, "settling" ); } } @Override public void onScrolled(RecyclerViewrecyclerView, int dx, int dy){ super .onScrolled(recyclerView,dx,dy); isSlidingToLast=dy> 0 ; //上拉,下滑 Log.d(TAG, "dx=" +dx+ "dy=" +dy+ "isSlidingToLast=" +isSlidingToLast); } } ); } private boolean isPullToBottom(){ int []lastIndexs=mLayoutManager.findLastCompletelyVisibleItemPositions( null ); Log.d(TAG, "lastitem=" +Arrays.toString(lastIndexs)+ ",haveitem=" +mAdapter.getItemCount()); int maxIndex=mAdapter.getItemCount()- 1 ; for ( int i:lastIndexs){ if (i==maxIndex){ return true ; } } return false ; } private void loadData( boolean isClear){ if (isClear){ mItem.clear(); } previousLastIndex=mItem.size(); Randomr= new Random(); for ( int index= 0 ;index<PRE_SCREEN_NUMBER&&index<RES_URL.length;index++){ mItem.add( new Pair( "Card" +(previousLastIndex+index),RES_URL[r.nextInt(RES_URL.length)])); } Log.d(TAG, "mItemcount=" +mItem.size()); } } 说明: RecycleView的ItemView的数据由Pair对象管理存储 Fresco库默认配置下的使用之前需要初始化调用Fresco.initialize(Context) 上述实现了2个事件监听,一个OnScrollerListener由RecycleView提供,实现下拉刷新;一个OnRecycleViewItemClickListener由自定义的RecycleViewAdpater提供,实现单击itemView来更换图片(SimpleDraweeView)和文字(TextView)。 Butterknife和Fresco库的具体使用参见其Github上的文档。 5. 效果图 比较遗憾ScreenRecord不支持模拟器和Android4.4一下的系统,只能附上截图。 本文转自 secondriver 51CTO博客,原文链接:http://blog.51cto.com/aiilive/1715139,如需转载请自行联系原作者


Hadoop Hive概念学习系列之hive里的扩展接口(CLI、Beeline、JDBC)(十六)

《Spark最佳实战 陈欢》写的这本书,关于此知识点,非常好,在94页。 hive里的扩展接口,主要包括CLI(控制命令行接口)、Beeline和JDBC等方式访问Hive。 CLI和Beeline都是交互式用户接口,并且功能相似,但是语法和实现不同。 JDBC是一种类似于编程访问关系型数据库的编程接口。 1、CLI 在UNIX shell环境下输入hive命令可以启用Hive CLI。在CLI下,所有的Hive语句都以分号结束。 在CLI下可以对一些属性做出设置,像是设置底层MapReduce任务中Reducer的实例数。这些信息都详细地记录在在线Hive语言手册中。 下面是一些专门针对Hive,并且对使用Hive CLI非常有帮助的属性: hive.cli.print.header:当设置为true时,查询返回结果的同时会打印列名。默认情况下设置为false。因此不会打印。 想要开启列名打印的功能需要输入以下指令。 hive > set hive.cli.print.header=true; hive.cli.print.current.db:当设置为true时,将打印当前数据库的名字。默认情况下设置为false。 可以通过输入以下指令修改属性: hive > set hive.cli.print.current.db=true; hive (default) > 2、Beeline Beeline可以作为标准命令行接口的替代者。它使用JDBC连接Hive,而且基于开源的SQLLine项目。 Beeline的工作方式和Hive CLI很像,但是使用Beeline需要与Hive建立显示的连接: $ beeline Beeline version 0.11.0 by Apache Hive beeline > !connect jdbc:hive:// nouser nopassword 本地模式中使用的JDBC的URL是jdbc:hive//。如果是集群中的配置,那么JDBC的URL通常是这样的形式:dbc:hive//<hostname>:<port>。 <hostname>是Hive服务器的主机名,<port>是预先配置的端口号(默认为10000)。 这样的情况下,我们可以使用Beeline执行任何Hive语句,与使用CLI一样。 3、JDBC Java客户端可以使用预先提供的JDBC驱动来连接Hive。连接步骤和其他兼容JDBC的数据库一样。首先载入驱动,然后建立连接。 JDBC驱动的类名是org.apache.hadoop.hive.jdbc.HiveDriver。 本地模式中使用的JDBC的URL是jdbc:hive://。 如果是集群中的配置,那么JDBC的URL通常是这样的形式:jdbc:hive//<hostname>:<port>。 <hostname>是Hive服务器的主机名,<port>是预先配置的端口号(默认为10000)。 给一个例子,展示使用JDBC连接本地模式的Hive,并提交查询请求: import java.sql.Connection; import java.sql.Driver; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.Statement; import org.apache.log4j.Level; import org.apache.log4j.LogManager; public class HiveJdbcClient{ private static String driverName="org.apache.hadoop.hive.jdbc.HiveDriver"; public static void main(String[] args)throws Exception{ LogManager.getRootLogger().setLevel(Level.ERROR); Class.forName(driverName); Connection con=DriverManager.getConnection( "jdbc:hive://","",""); Statement stmt=con.createStatement(); stmt.executeQuery(:drop table videos_ex); ResultSet res=stmt.executeQuery("CREATE EXTERNAL TABLE videos_ex" + "(producer string,title string,category string,year int)" + "ROW FROMAT DELTMTIED FIELDS TERMINATED BY \",\" LOCATION " + "/home/madhu/external/videos_ex/data"); //show tables String sql = "show tables"; System.out.println("Running:" +sql); res=stmt.executeQuery(sql); if(res.next()){ System.out.println(res.getString(1)); } //describe table sql="describe videos_ex"; System.out.println("Running:" +sql); res=stmt.executeQuery(sql); while(res.next()){ System.out.println(res.getString(1) + "\t" +res.getString(2)); } //select query sql="select * from videos_ex"; System.out.println("Running:" + sql); res=stmt.executeQuery(sql); ResultSetMetaData rsmd=res.getMetaData(); int ncols=rsmd.getColumnCount(); for(int i=0;i<ncols;i++){ System.out.print(rsmd.getColumnLabel(i+1)); System.out.print("\t"); } System.out.println(); while(res.next()){ for(int i=0;i<ncols;i++){ System.out.print(res.getString(i+1)); System.out.print("\t"); } System.out.println(); } //regular hive query sql ="select count(1) from videos_ex"; System.out.println.("Running:" +sql); res=stmt.executeQuery(sql); if(res.next()){ System.out.println("Number of rows:" + res.getString(1)); } } } 再次谈谈 Hive JDBC编程接口与程序设计 Hive支持标准的数据库查询接口JDBC,在JDBC中需要指定驱动字符串以及连接字符串,Hive使用的驱动器字符串为“org.apache.hadoop.hive.jdbc.HiveDriver”。 在Hive的软件包中已经加入了对应的JDBC的驱动程序,连接字符串标志了将要访问的Hive服务器。例如 jdbc://master:10000/default,在配置连接字符串后可以直接使用传统的JDBC编程技术去访问Hive所提供的功能。 当然这里,可以,手动。一般包括 commons-lang-*.*.jar commons-logging-*.*.*.jar commons-logging-api-*.*.*.jar hadoop-core-*.*.*-Intel.jar hive-exec-*.*.*-Intel.jar hive-jdbc*.*.*Intel.jar hive-metastore-*.*.*-Intel.jar 
libfb***-*.*.*.jar log4j-*.*.*.jar slf4j-api-*.*.*.jar slf4j-log4j*-*.*.*.jar 为了展示如何基于Hive JDBC进行具体的java编程,设有如下预存在文件中的样例数据: 1&data1_value 2&data2_value 3&data3_value ... 198&data198_value 199&data199_value 200&data200_value 所演示的示例程序将首先创建应Hive表,然后将存放在上述文件中的样例数据装入到这个Hive表中,并通过查询接口并显示出这些数据。 基于Hive JDBC的Java编程示例代码如下: import java.sql.Connection; import java.sql.DriverManager; import java.sql.Driver; import java.sql.SQLException; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.Statement; import org.apache.log4j.Level; import org.apache.log4j.LogManager; //该类用于将Hive作为数据库,使用JDBC连接Hive,实现对Hive进行增、删、查等操作。 public class classHiveJdbc{ private static String driverName="org.apache.hadoop.hive.jdbc.HiveDriver"; /** *实现连接Hive,并对Hive进行增、删、查等操作 */ public static void main(String[] args)throws SQLException{ LogManager.getRootLogger().setLevel(Level.ERROR); { try{ Class.forName(driverName); }catch (ClassNotFoundException e){ e.printStackTrace(); System.exit(1); } Connection con=DriverManager.getConnection( "jdbc:hive://192.168.81.182:100000/hivebase","",""); Statement stmt=con.createStatement(); String tableName="HiveTables"; //删除和创建数据表 stmt.executeQuery("DROP TABLE" + tableName); ResultSet res=stmt.executeQuery("CREATE TABLE " + tableName + "(key int,value string)" + "ROW FROMAT DELTMTIED FIELDS TERMINATED BY '&' + stored as textfile); //检查和显示数据表 String sql = "SHOW TABLES '" + tableName + "'"; System.out.println("Running:" +sql); res=stmt.executeQuery(sql); if(res.next()){ System.out.println(res.getString(1)); } //显示数据表字段描述信息 sql="describe" + tableName"; System.out.println("Running:" +sql); res=stmt.executeQuery(sql); while(res.next()){ System.out.println(res.getString(1) + "\t" +res.getString(2)); } //将文件数据装载到Hive表中 String filepath="/Test/data.txt"; sql="load data local inpath '" + filepath + "' into table " + tableName; System.out.println("Running:" + sql); res=stmt.executeQuery(sql); //字段查询 sql="select * from" + tableName; System.out.println("Running:" + sql); res=stmt.executeQuery(sql); while(res.next()){ System.out.print(String.valueOf(res.getInt(1)) + "\t" + res.getString(2)); System.out.print("\t"); } //统计查询 sql ="select count(1) from tableName"; System.out.println.("Running:" +sql); res=stmt.executeQuery(sql); while(res.next()){ System.out.println(res.getString(1)); } }//main函数结束 }//HiveJdbc类结束 以下对程序中的重要部分进行说明。 private static String driverName="org.apache.hadoop.hive.jdbc.HiveDriver"; 为驱动字符串。 Class.forName(driverName); 为完成加载数据库驱动,它的主要功能为加载指定的class文件到java虚拟机的内存。 Connection con=DriverManager.getConnection( "jdbc:hive://192.168.81.182:100000/hivebase","",""); 为连接字符串,这里需要制定服务器IP以及所用到的数据库。由于Hive不需要用户名和密码,所以第2个参数和第3个参数为空。 加载好驱动,配置好连接数据库字符串以后,便可以编写语句对Hive进行相应的操作。 如果操作的数据表已经存在,可以先将该表删掉,如stmt.executeQuery("DROP TABLE" + tableName); 删除表后,27行再创建表。 ResultSet res=stmt.executeQuery("CREATE TABLE " + tableName + "(key int,value string)" + "ROW FROMAT DELTMTIED FIELDS TERMINATED BY '&' + stored as textfile); 在使用JDBC对Hive进行表的操作时所用到的语句与命令行的语句完全相同,只需要在程序中拼接出相应的语句即可。 创建表后,查看数据库是否有该表,将查询回来的结果输出到控制台。 String sql = "SHOW TABLES '" + tableName + "'"; System.out.println("Running:" +sql); res=stmt.executeQuery(sql); if(res.next()){ System.out.println(res.getString(1)); } 对表结构的查询、向表加载数据、查询数据以及统计等操作均可以通过与Hive命令相同的方式进行。 显示该表的字段结构信息,共有Key和value两个字段。 sql="describe" + tableName"; System.out.println("Running:" +sql); res=stmt.executeQuery(sql); while(res.next()){ System.out.println(res.getString(1) + "\t" +res.getString(2)); } 将前述预存在一个文件中的数据装载到数据表中。 String filepath="/Test/data.txt"; sql="load data local 
inpath '" + filepath + "' into table " + tableName; System.out.println("Running:" + sql); res=stmt.executeQuery(sql); 执行常规的字段数据查询,并打印输出查询结果 sql="select * from" + tableName; System.out.println("Running:" + sql); res=stmt.executeQuery(sql); while(res.next()){ System.out.print(String.valueOf(res.getInt(1)) + "\t" + res.getString(2)); System.out.print("\t"); } 执行一个统计查询,统计数据记录的行数并打印输出统计结果 sql ="select count(1) from tableName"; System.out.println.("Running:" +sql); res=stmt.executeQuery(sql); while(res.next()){ System.out.println(res.getString(1)); } 最后,执行,得到,以下为程序执行后控制台输出的日志: 1 data1_value 2 data2_value 3 data3_value 4 data4_value 5 data5_value ... 198 data198_value 199 data199_value 200 data200_value Running:select count(1) from HiveTables 200 本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/6105571.html,如需转载请自行联系原作者
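The JDBC listings above carry a few transcription errors ("ROW FROMAT DELTMTIED", a stray "System.out.println.", DDL issued through executeQuery without a quoted string). Below is a cleaned-up minimal sketch of the same flow, assuming the old HiveServer1 driver and URL used in the article:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveJdbcMinimal {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hadoop.hive.jdbc.HiveDriver");
        Connection con = DriverManager.getConnection("jdbc:hive://", "", "");
        Statement stmt = con.createStatement();
        stmt.execute("DROP TABLE IF EXISTS videos_ex");
        stmt.execute("CREATE EXTERNAL TABLE videos_ex "
                + "(producer STRING, title STRING, category STRING, year INT) "
                + "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' "
                + "LOCATION '/home/madhu/external/videos_ex/data'");
        ResultSet res = stmt.executeQuery("SELECT COUNT(1) FROM videos_ex");
        if (res.next()) {
            System.out.println("rows: " + res.getString(1));
        }
        con.close();
    }
}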


Hadoop Concepts Series: Why Do Hadoop and Spark Both Work with Key-Value Pairs? (Part 40)

Few of us ever ask ourselves this question: we simply process data as key/value pairs and emit results in the same form, without explaining why it has to be key/value. This applies to the key/value pairs in Hadoop MapReduce as well as to map operations on Spark RDDs. So why?

1. What a key/value pair means

First, let us clarify what we mean by a key/value pair by pointing at an analogous concept in the Java standard library. The java.util.Map interface is the parent of everyday classes such as HashMap, and even (through backward refactoring of the code base) of the original Hashtable. For any Java Map object, its content is a set of mappings from keys of a given type to associated values, where keys and values may have different data types. For example, a HashMap object can hold mappings from a person's name (String) to a birthday (Date).

Data in Hadoop consists of keys associated with values, stored in a way that lets a data set be grouped and re-sorted by key. When working with key/value data you will ask questions such as:
1. Does a given key necessarily have a mapped value in the data set?
2. What are the values associated with a given key?
3. What is the complete set of keys?

Recall the familiar word count. Its output is plainly a collection of key/value relationships: for each word (the key) there is the number of times it appeared (the value). Some important characteristics of key/value data then become clear:
1. Keys must be unique, while values need not be.
2. Every value must be associated with a key, but a key may have no value (though that case does not arise in this particular example).
3. Defining keys precisely matters a great deal: it decides, for instance, whether counting is case-sensitive, which yields different results.

Note that "keys are unique" must be read carefully; it does not mean a key appears only once. In our data set the same key can occur many times, and as we have seen, the MapReduce model includes a step that gathers all the data associated with a particular key. Key uniqueness guarantees that when we collect the values for a given key, the result maps that key to every one of its values, omitting none.

2. Why key/value data

Key/value data, as the foundation of MapReduce operations, enables a powerful programming model; this is how MapReduce achieved such surprisingly broad adoption, confirmed by the use of Hadoop and MapReduce across many different industries and problem domains. Much data either is key/value shaped already or can be represented that way. This simple data model applies very widely, and programs defined in this form can run on both the Hadoop and Spark frameworks.

Of course, the data model by itself is not the only thing that makes Hadoop so powerful; its real strength lies in how it applies parallel processing and divide-and-conquer. We can store and process data on a large number of hosts, and use a framework that splits a larger task into smaller ones and then merges all the parallel results into a final conclusion.

But such a framework must offer a way to describe a problem so that users can express what they need even without understanding how the framework works internally. We only describe the transformations the data requires, and the framework does the rest. MapReduce provides exactly this abstraction through its key/value interface: the programmer specifies the desired transformations, and Hadoop carries out the complex data processing over data sets of any size.

Some practical examples make key/value pairs more concrete:
- An address book associates a name (key) with contact details (value);
- A bank account uses an account number (key) to reference the account records (value);
- A book's index associates a keyword (key) with its page numbers (value);
- In a computer file system, a file name (key) gives access to data of all kinds, such as text, images, and audio (value).

I deliberately chose wide-ranging examples here to show that key/value data is not a constrained model reserved for high-end data mining; it is an utterly ordinary pattern found all around us.

Reprinted from the cnblogs blog 大数据躺过的坑, original link: http://www.cnblogs.com/zlslch/p/6092436.html. To repost, please contact the original author.
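The word-count relationship the article keeps returning to is easy to state in plain Java: the word is the unique key, and its occurrence count is the value. A minimal sketch:

import java.util.HashMap;
import java.util.Map;

public class WordCount {
    public static void main(String[] args) {
        String text = "to be or not to be";
        Map<String, Integer> counts = new HashMap<String, Integer>();
        for (String word : text.split(" ")) {
            Integer n = counts.get(word);
            counts.put(word, n == null ? 1 : n + 1); // key stays unique; value is updated in place
        }
        System.out.println(counts); // to=2, be=2, or=1, not=1 (iteration order unspecified)
    }
}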


Hadoop HDFS Concepts Series: A First Look at HDFS Architecture and Principles (Part 2)

How does HDFS read a file?

The HDFS file-read path consists of the following steps:

1. The client calls the open method of a FileSystem object, which in practice is a DistributedFileSystem instance.
2. DistributedFileSystem obtains the locations of the file's first batch of blocks from the namenode via RPC (remote procedure call). For each block, multiple locations are returned according to the replication count, sorted by the Hadoop network topology so that datanodes closer to the client come first.
3. The first two steps yield an FSDataInputStream object, which wraps a DFSInputStream; DFSInputStream conveniently manages the datanode and namenode data streams. The client calls read, and DFSInputStream finds the datanode closest to the client and connects to it.
4. Data streams steadily from the datanode to the client.
5. When the first block has been read completely, the connection to its datanode is closed and the next block is read. These operations are transparent to the client, which simply sees one continuous stream.
6. When the first batch of blocks is exhausted, DFSInputStream asks the namenode for the locations of the next batch and continues reading; when every block has been read, all the streams are closed.

In more detail: the client begins by calling open() on the FileSystem object (in HDFS, a DistributedFileSystem instance). DistributedFileSystem calls the NameNode over RPC to determine where the requested blocks live. Note that the NameNode returns only the first few blocks of the file, not all of them; each returned block carries the addresses of the DataNodes holding it, sorted by their distance from the client according to the Hadoop cluster topology. If the client is itself a DataNode holding the data, it reads the file locally.

DistributedFileSystem then returns to the client an FSDataInputStream, an input stream that supports seeking, for reading the data. FSDataInputStream wraps a DFSInputStream object, which manages the I/O between DataNodes and the NameNode.

Once these steps complete, the client calls read() on the stream. DFSInputStream, which holds the DataNode addresses of the file's opening blocks, first connects to the nearest DataNode containing the first block. read() is then called repeatedly on the stream until that block is fully read; DFSInputStream closes the connection and looks up the closest DataNode storing the next block. All of this is transparent to the client, which reads the blocks in the order the streams are opened while DFSInputStream also asks the NameNode for the locations of the next group of blocks as needed. When the client has finished reading, it calls close() on the FSDataInputStream.

HDFS also allows for node failures during reads. Currently it handles them as follows: if the client loses its connection to a DataNode while reading, it tries the next-closest DataNode storing that block, and it records the failed node so it will not attempt to connect to or read from it again. The client also verifies checksums on the data received from DataNodes; if it finds a corrupt block, it tries to read the block from another DataNode and reports the corruption to the NameNode, which updates its file metadata.

A key design point here is that the client obtains the most suitable DataNode addresses from the NameNode and then connects to the DataNodes directly to read the data. This design lets HDFS scale to large numbers of parallel clients, because the data traffic is spread across all the DataNodes. It also relieves the NameNode, which only has to serve block-location requests rather than the data itself, so it does not become a bottleneck as the number of clients grows.

Reprinted from the cnblogs blog 大数据躺过的坑, original link: http://www.cnblogs.com/zlslch/p/5080296.html. To repost, please contact the original author.
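Driven from client code, the read path above starts at FileSystem.open(), which returns the FSDataInputStream described in step 3. A minimal sketch; the path is hypothetical and the cluster address comes from the configuration on the classpath:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HdfsRead {
    public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.get(new Configuration());
        FSDataInputStream in = fs.open(new Path("/tmp/demo.txt")); // hypothetical path
        try {
            IOUtils.copyBytes(in, System.out, 4096, false); // stream the blocks to stdout
        } finally {
            IOUtils.closeStream(in); // the close() step described above
        }
    }
}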


【ASP.NET开发】ASP.NET(MVC)三层架构知识的学习总结

至从使用过一次MVC进行团队开发后,体会到了面向对象开发软件的便利。使用MVC的时候,各个层之间的低耦合使得他们之间的联系非常的第,也就降低了模块之间的依赖程度。 首先介绍一下MVC的意义,和各个层面之间的用途和功能。 1)实体层。主要用来声明在视图层和业务逻辑层之间传递数据的载体。通常代表来源与项目数据库中一个或者多个表组成的一条有意义的记录。 2)业务逻辑层。根据业务逻辑向视图层提供数据,这个项目中的类有权根据业务逻辑来决定是否调用数据库访问层的方法 3)数据库访问层。项业务逻辑层提供访问数据的方法。 4)视图层。主要是以网站开发为例。用于提供显示,添加,编辑和删除数据。 我在VS中创建的项目的结构图: NetMVC就是所谓的视图层。Entity是实体层,用来声明开发过程中被要使用的所有变量。DAL是数据库访问层,主要完成所有操作对数据库的访问。BLL是业务逻辑层,处理视图层传来的业务逻辑,然后传递给数据库访问层进行处理。 本例主要演示的是登陆页面的实现,Entity层的代码如下 usingSystem; usingSystem.Collections.Generic; usingSystem.Text; namespaceEntity { publicclassUserInfo { privateintUserId; ///<summary> ///用户比编号 ///</summary> publicintUserId1 { get{returnUserId;} set{UserId=value;} } privatestringusername; ///<summary> ///登录账户名称 ///</summary> publicstringUsername { get{returnusername;} set{username=value;} } privatestringpassword; ///<summary> ///登录密码 ///</summary> publicstringPassword { get{returnpassword;} set{password=value;} } privateintloginCount; ///<summary> ///登陆次数 ///</summary> publicintLoginCount { get{returnloginCount;} set{loginCount=value;} } privateDateTimeregDate; ///<summary> ///注册时间 ///</summary> publicDateTimeRegDate { get{returnregDate;} set{regDate=value;} } privateDateTimelastLoginDate; ///<summary> ///最后登录时间 ///</summary> publicDateTimeLastLoginDate { get{returnlastLoginDate;} set{lastLoginDate=value;} } privateboolisForbidden; privatestringpasswordQuestion; ///<summary> ///找回密码提示问题 ///</summary> publicstringPasswordQuestion { get{returnpasswordQuestion;} set{passwordQuestion=value;} } privatestringpasswordAnswer; ///<summary> ///找回密码答案 ///</summary> publicstringPasswordAnswer { get{returnpasswordAnswer;} set{passwordAnswer=value;} } } } 完成对实体类的创建,接着就要完成对数据库访问层的创建(其中需要用到上篇问章所写的SqlHelper数据库访问通用类http://yisuowushinian.blog.51cto.com/4241271/999324),代码如下, usingSystem; usingSystem.Collections.Generic; usingSystem.Text; usingSystem.Data; usingSystem.Data.SqlClient; usingSystem.Configuration; usingEntity; namespaceDAL { publicclassUserDal { privatestringconnectionString=ConfigurationManager.ConnectionStrings["connectionString"].ConnectionString; ///<summary> ///添加用户 ///</summary> ///<paramname="info">用户的实体类</param> ///<returns></returns> publicboolAddUser(UserInfoinfo) { stringsql="insertintoUsers(Username,password)values('@Username','@password')"; SqlParameter[]parameters=newSqlParameter[4]; parameters[0]=newSqlParameter("@Username",SqlDbType.NVarChar,30); parameters[0].Value=info.Username; parameters[1]=newSqlParameter("password",SqlDbType.VarChar,50); parameters[1].Value=info.Password; returnnewSqlDbHelper(connectionString).ExecuteNonQuery(sql)>0; } ///<summary> ///删除用户 ///</summary> ///<paramname="UserId">用户编号</param> ///<returns></returns> publicboolDeleteUser(intUserId) { stringsql="deletefromuserswhereUserId="+UserId; returnnewSqlDbHelper(connectionString).ExecuteNonQuery(sql)>0; } ///<summary> ///更新用户 ///</summary> ///<paramname="info">用户的实体类</param> ///<returns></returns> publicboolUpDateUser(UserInfoinfo) { stringsql="updateuserssetpassword=@password,loginCount=@loginCountwhereuserid=@userid"; SqlParameter[]parameters=newSqlParameter[7]; parameters[0]=newSqlParameter("@password",SqlDbType.VarChar,30); parameters[0].Value=info.Password; parameters[1]=newSqlParameter("@loginCount",SqlDbType.Int,4); parameters[1].Value=info.LoginCount; returnnewSqlDbHelper(connectionString).ExecuteNonQuery(sql)>0; } ///<summary> ///根据用户名或者用户编号查询用户 ///</summary> ///<paramname="userId">用户编号</param> ///<returns></returns> publicDataTableGetUser(intuserId) { stringsql="select*fromuserswhereuserId=@UserId"; 
SqlParameter[]parameters=newSqlParameter[1]; parameters[0]=newSqlParameter("@UserId",SqlDbType.Int,4); parameters[0].Value=userId; returnnewSqlDbHelper(connectionString).ExecuteDataTable(sql,CommandType.Text,parameters); } ///<summary> ///根据用户名查询用户信息 ///</summary> ///<paramname="Username">用户名</param> ///<returns></returns> publicDataTableGetUser(stringUsername) { stringsql="select*fromuserswhereusername=@username"; SqlParameter[]parameters=newSqlParameter[1]; parameters[1]=newSqlParameter("@username",SqlDbType.NVarChar,30); parameters[1].Value=Username; returnnewSqlDbHelper(connectionString).ExecuteDataTable(sql,CommandType.Text,parameters); } ///<summary> ///按照用户编号升序查询从指定位置开始的指定条数记录 ///</summary> ///<paramname="startIndex">查询的起始位置</param> ///<paramname="size">返回的最大记录条数</param> ///<returns></returns> publicDataTableGetUserList(intstartIndex,intsize) { stringsql="selecttop"+size+"*fromuserswhereUserIdnotin(selecttop"+startIndex+"UserIdfromUsersorderbyUserIdasc)orderbyUserIdasc"; returnnewSqlDbHelper(connectionString).ExecuteDataTable(sql); } ///<summary> ///查询用户总数 ///</summary> ///<returns></returns> publicintGetUserCount() { stringsql="selectcount(1)fromUsers"; returnint.Parse(newSqlDbHelper(connectionString).ExecuteScalar(sql).ToString()); } } } 然后创建业务逻辑层,代码如下: usingSystem; usingSystem.Collections.Generic; usingSystem.Text; usingDAL; usingEntity; usingSystem.Data; usingSystem.Data.SqlClient; usingSystem.Security.Cryptography; usingSystem.Security; namespaceBLL { publicclassUserBLL { privatestaticstringMD5Hash(stringpassword) { MD5md5=MD5.Create();//创建Md5算法实例 //将原始字符串转换成UTF-8编码的字节数组 byte[]sourceBytes=System.Text.Encoding.UTF8.GetBytes(password); //计算字节数组的哈希值 byte[]resultBytes=md5.ComputeHash(sourceBytes); StringBuilderbuffer=newStringBuilder(resultBytes.Length); //将计算得到的哈希值的字节数组中的每一个字节转换成十六进制形式 foreach(bytebinresultBytes) { buffer.Append(b.ToString("X")); } returnbuffer.ToString(); } ///<summary> ///添加用户 ///</summary> ///<paramname="info">用户的实体类</param> ///<returns></returns> publicstaticboolAddUser(UserInfoinfo) { UserDaldal=newUserDal(); DataTabledata=dal.GetUser(info.Username); if(data.Rows.Count>0) { returnfalse; } else { info.Password=MD5Hash(info.Password); returnnewUserDal().AddUser(info); } } ///<summary> ///删除用户 ///</summary> ///<paramname="userId">用户编号</param> ///<returns></returns> publicstaticboolDeleteUser(intuserId) { returnnewUserDal().DeleteUser(userId); } ///<summary> ///更新用户 ///</summary> ///<paramname="info">用户的实体类</param> ///<paramname="changePassword">是否需要对用户密码进行加密</param> ///<returns></returns> publicstaticboolUpdateUser(UserInfoinfo,boolchangePassword) { //如果更改密码就需要对新密码进行加密 //如果没有更改密码则不能对密码重复加密,否则会对用户造成无法登陆 if(changePassword) { info.Password=MD5Hash(info.Password); } returnnewUserDal().UpDateUser(info); } ///<summary> ///根据用户编号查询用户 ///</summary> ///<paramname="UserId">用户的编号</param> ///<returns></returns> publicstaticDataTableGetUser(intUserId) { returnnewUserDal().GetUser(UserId); } ///<summary> ///根据用户名查询用户信息 ///</summary> ///<paramname="userName">用户名</param> ///<returns></returns> publicstaticDataTableGetUser(stringuserName) { returnnewUserDal().GetUser(userName); } ///<summary> ///按照用户编号升序查询从指定位置开始的指定条数的记录 ///</summary> ///<paramname="startIndex">查询的起始位置</param> ///<paramname="size">返回的最大记录条数</param> ///<returns></returns> publicstaticDataTableGetUserList(intstartIndex,intsize) { returnnewUserDal().GetUserList(startIndex,size); } ///<summary> ///查询用户总数 ///</summary> ///<returns></returns> publicstaticintGetUserCount() { returnnewUserDal().GetUserCount(); } 
///<summary> ///根据用户名或者用户编号从数据库查询对应记录的实体,如果不存在则返回null ///</summary> ///<paramname="userId">用户编号</param> ///<returns></returns> publicstaticUserInfoGetUserEntity(intuserId) { returnChangeToEntity(newUserDal().GetUser(userId)); } ///<summary> ///根据用户名或者用户编号从数据库查询对应记录的实体,如果不存在则返回null ///</summary> ///<paramname="userName">用户名</param> ///<returns></returns> publicstaticUserInfoGetUserEntity(stringuserName) { returnChangeToEntity(newUserDal().GetUser(userName)); } ///<summary> ///将包含Users表记录的DataTables中的第一条记录转换成Userinfo实体 ///</summary> ///<paramname="data"></param> ///<returns></returns> privatestaticUserInfoChangeToEntity(DataTabledata) { UserInfoinfo=null; //如果data不为空并且包含的记录条数大于0、 if(data!=null&&data.Rows.Count>0) { DataRowrow=data.Rows[0]; info=newUserInfo(); } returninfo; } publicstaticboolLogin(stringuserName,stringpassword) { boolexits=false; UserInfoinfo=GetUserEntity(userName); if(info!=null&&MD5Hash(password)==info.Password) { exits=true; info.LoginCount=info.LoginCount+1;//将用户登陆的次数加1 info.LastLoginDate=DateTime.Now;//将用户最后的登录时间设置为现在 UpdateUser(info,false);//更新用户的登陆次数和最后的登录时间 } returnexits; } } } 这个就完成了对数据库访问层,实体层,业务逻辑层的创建工作。因为在创建过程中被,有很多的重复的工作,我为了加快速度,就省略了一些的代码。 在登陆页面使用时,只需要引入业务逻辑层的方法,然后根据其中的方法,传入相应的参数,就可以完成所有的操作。 本文转自yisuowushinian 51CTO博客,原文链接:http://blog.51cto.com/yisuowushinian/1008416,如需转载请自行联系原作者
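For readers coming from Java, the same Entity/DAL/BLL split looks like the sketch below; the class and method names are illustrative, not from the article. One aside: the article's MD5Hash uses b.ToString("X"), which drops the leading zero of any byte below 0x10; the fixed-width form is "X2" in C# or %02X in Java (harmless here, since hashing and comparison go through the same function, but worth knowing).

public class LayeringSketch {
    static class UserInfo {                // entity layer: a pure data carrier
        String username;
        String passwordHash;
    }

    static class UserDal {                 // data-access layer: talks to storage only
        UserInfo findByName(String name) {
            return null; // a parameterized SELECT would run here
        }
    }

    static class UserBll {                 // business layer: rules such as hashing live here
        private final UserDal dal = new UserDal();

        boolean login(String name, String password) throws Exception {
            UserInfo u = dal.findByName(name);
            return u != null && md5Hex(password).equals(u.passwordHash);
        }

        private static String md5Hex(String s) throws Exception {
            java.security.MessageDigest md = java.security.MessageDigest.getInstance("MD5");
            StringBuilder sb = new StringBuilder();
            for (byte b : md.digest(s.getBytes("UTF-8"))) {
                sb.append(String.format("%02X", b)); // fixed-width hex, unlike ToString("X")
            }
            return sb.toString();
        }
    }
}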


MongoDB Study Notes (3): Permissions && Export/Import/Backup/Restore && fsync and Locking

Permissions

Binding the MongoDB service to an internal IP

Start the server with the --bind_ip 192.168.1.1 option so that it only listens on the given IP:

mongod --bind_ip 192.168.1.1

Connections must then target that IP, or they will fail:

mongo 192.168.1.1

Users

MongoDB ships with an empty admin database; users stored in admin.system.users carry broader privileges than users defined in other databases. As long as no user has been added to admin.system.users, a client can connect without any authentication and perform any operation on the database, even if MongoDB was started with the --auth option.

Creating a system root user: use the addUser() function to add a root user.

Creating a user with limited privileges: addUser() can also add a read-only user to the test database; making it read-only just requires passing true as the third argument of addUser().

Running the statements in a file

The content of text.js is:

var count = db.yyd.count();
printjson('count of yyd is : ' + count);

Viewing active operations

db.currentOp();

Killing an operation

db.killOp(opid);

serverStatus

Fetches statistics about the running MongoDB server:

db.runCommand({"serverStatus": 1});

mongostat

A convenient way to watch the serverStatus output over time.

Export / Import / Backup / Restore

fsync and locking

The fsync command forces the server to flush all buffers to disk. It can optionally take a lock that blocks further writes to the database until the lock is released:

db.runCommand({"fsync": 1, "lock": 1});

Once locked, you can take a backup without shutting the server down and without sacrificing the point-in-time nature of the backup; write operations are simply blocked for the duration.

Reprinted from the cnblogs blog 我爱物联网, original link: http://www.cnblogs.com/yydcdut/p/3558446.html. To repost, please contact the original author.
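The fsync/lock pair above can also be issued from Java. A sketch assuming the modern mongodb-driver-sync artifact, which postdates this article; on old servers the unlock step was db.fsyncUnlock() in the shell rather than the fsyncUnlock command shown here:

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoDatabase;
import org.bson.Document;

public class FsyncLockDemo {
    public static void main(String[] args) {
        try (MongoClient client = MongoClients.create("mongodb://192.168.1.1:27017")) {
            MongoDatabase admin = client.getDatabase("admin");
            // Flush buffers to disk and block further writes.
            admin.runCommand(new Document("fsync", 1).append("lock", true));
            // ... take the backup here ...
            admin.runCommand(new Document("fsyncUnlock", 1)); // release the lock
        }
    }
}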

用户注册