`
kylinsoong
  • 浏览: 236325 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Cassandra Dev 3:Cassandra 应用之CassandraAppender

阅读更多

    本文的目的是展示一个测试结果:Cassandra应用的高效性,稳定性,可靠性。具体:应用Cassandra记录Application运行中的日志。

    首先说明在一个Application中,不管是它在运行过程,还是在它的开发过程中,记录日志是非常有必要的,特别是一些Service-Based Application或Distributed系统。通过日志,Application的维护人员和开发人员可以更好的完成他们的工作。Log4j是一个很好的日志记录开源工具包,它可以将日志记录到文件系统、从Console输出、发送到指定邮箱等等。本文就是先对Log4j做一扩展,写一个自己的CassandraAppender,使其具有将日志保存到Cassandra的功能。具体实现Log4j定义的Appender……,如下

1. 首先Log4j结构:

 Log4j包括三个主要组件:

          Logger:用来记录日志,可以对她指定记录级别(ALL,DEBUG,INFO,WARN,ERROR,FATA);

          Layout:设置日志记录样式;

          Appender:指定日志输出位置。

这里主要研究Appender,先看下图:

  

 

      如图所示为Appender主要UML类图,AppenderSkeleton为抽象类,它实现了 org.apache.log4j.Appenderorg.apache.log4j.spi.OptionHandler 接口,所有的 appender 都必须扩展org.apache.log4j.AppenderSkeleton 类,

当然我们这里的CassandraAppender就是通过继 AppenderSkeleton来实现的 。在抽象类AppenderSkeleton中定义了抽象方法:

abstract protected void append(LoggingEvent event);

 


 所有的子类都要事先此方法,此方法中参数LoggingEvent代表日志信息,可以通过它获取日志名字,LoggerInfo等,此类还定义一方法:

public void activateOptions() { }

 


 如果需要对某些参数做一处理,本文中CassandraAppender就是在此方法中初始化Thrift RPC连接;

2. Appender生命周期,如下图:



     appender 实例不存在。或许框架还没有配置好。

     框架实例化了一个新的 appender。这发生在配置器类分析配置脚本中的一个 appender 声明的时候。配置器类调用 Class.newInstanceYourCustomAppender.class) ,这等价于动态调用 new YourCustomAppender() 。框架这样做是为了避免被硬编码为任何特定的 appender 名称;框架是通用的,适用于任何 appender。

   框架判断 appender 是否需要 layout。如果该 appender 不需要 layout,配置器就不会尝试从配置脚本中加载 layout 信息。



 如图CassandraAppender处于就绪状态时,将系统日志发送到Cassandra服务器是它的唯一任务,具体我给出源代码:

   Log4j 配置器调用 setter 方法。在所有属性都已设置好之后,框架就会调用这个方法。我们可以在这里激活必须同时激活的属性。

   配置器调用 activateOptions() 方法。在所有属性都已设置好之后,框架就会调用这个方法。程序员可以在这里激活必须同时激活的属性。

   Appender 准备就绪。 此刻,框架可以调用 append() 方法来处理日志记录请求。这个方法由 AppenderSkeleton.doAppend() 方法调用。

   最后,关闭appender。 当框架即将要删除您的自定义 appender 实例时,它会调用您的 appender 的 close() 方法。 close() 是一个清理方法,意味着 您需要释放已分配的所有资源。它是一个必需的方法,并且不接受任何参数。它必须把 closed 字段设置为 true ,并在有人尝试使用关闭的 appender 时向框架发出警报。

 

3. 扩展自己的Appender

如下图所示为CassandraAppender工作原理:

package com.xxx.log4j;

import java.io.UnsupportedEncodingException;
import java.util.Date;
import java.util.UUID;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.TimedOutException;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;


public class CassandraAppender extends AppenderSkeleton {
	
	Logger logger = Logger.getLogger(CassandraAppender.class);
	
	private TTransport tr;
	
	private Cassandra.Client client;
	
	private String keyspace;
	
	private String columnFamily;
	
	private String thriftAddress;
	
	private int thriftPort;
	
	private String keyStrategy;
	
	public CassandraAppender() {
	}

	public void activateOptions() {
		tr = new TSocket(thriftAddress, thriftPort);
		TProtocol proto = new TBinaryProtocol(tr);
        client = new Cassandra.Client(proto);
        try {
			tr.open();
		} catch (TTransportException e) {
			e.printStackTrace();
		}
	}

	public String getKeyspace() {
		return keyspace;
	}

	public void setKeyspace(String keyspace) {
		this.keyspace = keyspace;
	}

	public String getColumnFamily() {
		return columnFamily;
	}

	public void setColumnFamily(String columnFamily) {
		this.columnFamily = columnFamily;
	}

	public String getThriftAddress() {
		return thriftAddress;
	}

	public void setThriftAddress(String thriftAddress) {
		this.thriftAddress = thriftAddress;
	}

	public int getThriftPort() {
		return thriftPort;
	}

	public void setThriftPort(int thriftPort) {
		this.thriftPort = thriftPort;
	}

	public String getKeyStrategy() {
		return keyStrategy;
	}

	public void setKeyStrategy(String keyStrategy) {
		this.keyStrategy = keyStrategy;
	}

	protected void append(LoggingEvent event) {
		if(client == null) {
			System.out.println("client == null");
		}
		try {
			client.insert(
							keyspace, 
							event.getLoggerName(), 
							getColumnPath(columnFamily,event), 
							getStoreValue(event), 
							new Date().getTime(),
							ConsistencyLevel.ONE
							);
		} catch (UnsupportedEncodingException e) {
			e.printStackTrace();
		} catch (InvalidRequestException e) {
			e.printStackTrace();
		} catch (UnavailableException e) {
			e.printStackTrace();
		} catch (TimedOutException e) {
			e.printStackTrace();
		} catch (TException e) {
			e.printStackTrace();
		}
			}

	private ColumnPath getColumnPath(String columnFamily, LoggingEvent event) throws UnsupportedEncodingException {
		ColumnPath path = new ColumnPath();
		path.setColumn_family(columnFamily);
		if(getKeyStrategy().equals("uuid")) {
			UUID uuid = UUID.randomUUID();   
	        String str = uuid.toString();   
	        path.setColumn(str.getBytes());
		} else {
			path.setColumn((event.getLocationInformation().getMethodName() + "-" +  event.getLocationInformation().getLineNumber() + new Date().getTime()).getBytes("UTF-8"));
		}
		return path;
	}
	
	private byte[] getStoreValue(LoggingEvent event) throws UnsupportedEncodingException {
		return (event.getMessage()+ "").getBytes("UTF-8");
	}
	
	public void close() {
		if(tr.isOpen()) {
			tr.close();
		}
	}

	public boolean requiresLayout() {
		return false;
	}

}

 

 4. 测试

首先在配置Log4j日志文件log4j.xml如下:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">

<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" >  

   <appender name="Cassandra" class="com.xxx.log4j.CassandraAppender">
     <errorHandler class="org.apache.log4j.helpers.OnlyOnceErrorHandler"/>
     <param name="keyspace" value="Twitter"/>
     <param name="ColumnFamily" value="Users"/>
     <param name="ThriftAddress" value="0.0.0.0"/>
     <param name="ThriftPort" value="9160"/>
     <param name="keyStrategy" value="uuid"/>
     <layout class="org.apache.log4j.PatternLayout">
       <param name="ConversionPattern" value="%d %-5p [%c{1}] %m%n"/>
     </layout>	    
   </appender>
 
   <root>
  	  <priority value="INFO" />
  	  <appender-ref ref="Cassandra"/>
   </root>
   
</log4j:configuration>

 

 然后运行自己测试类:

public class Log4jTestOne {
	
	static Logger logger = Logger.getLogger(Log4jTestOne.class);

	public static void main(String[] args) {
		Date start = new Date();
		for(int i = 0 ; i < 100000 ; i ++) {
			logger.info("info");
		}
		Date end = new Date();
		System.out.println("Completed spent time: " + (end.getTime() - start.getTime()));
	}

}

 

 接着到Cassandra查看结果,可以用Cassandra Thrift访问(我会就Cassandra Client有专门Blog)



 

 

 

 

 

 


 

  • 大小: 42.7 KB
  • 大小: 25 KB
  • 大小: 7.4 KB
0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics