1: <?php
2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: 14: 15: 16: 17: 18:
19: namespace Fluent\Logger;
20:
21: 22: 23: 24: 25:
26: class FluentLogger extends BaseLogger
27: {
28: const CONNECTION_TIMEOUT = 3;
29: const SOCKET_TIMEOUT = 3;
30: const MAX_WRITE_RETRY = 10;
31:
32:
33: const USLEEP_WAIT = 1000;
34:
35:
36: const DEFAULT_LISTEN_PORT = 24224;
37: const DEFAULT_ADDRESS = "127.0.0.1";
38:
39:
40: protected $host;
41:
42:
43: protected $port;
44:
45:
46: protected $transport;
47:
48: protected $socket;
49:
50:
51: protected $packer;
52:
53: protected $options = array(
54: "socket_timeout" => self::SOCKET_TIMEOUT,
55: "connection_timeout" => self::CONNECTION_TIMEOUT,
56: "usleep_wait" => self::USLEEP_WAIT,
57: );
58:
59: protected static $supported_transports = array(
60: "tcp","unix",
61: );
62:
63: protected static $acceptable_options = array(
64: "socket_timeout", "connection_timeout","usleep_wait",
65: );
66:
67: protected static $instances = array();
68:
69: 70: 71: 72: 73: 74: 75: 76: 77:
78: public function __construct($host = FluentLogger::DEFAULT_ADDRESS, $port = FluentLogger::DEFAULT_LISTEN_PORT, array $options = array())
79: {
80:
81: $this->host = $host;
82: $this->port = $port;
83:
84:
85: $this->transport = self::getTransportUri($host, $port);
86:
87: $this->packer = new JsonPacker();
88:
89: $this->mergeOptions($options);
90: }
91:
92: 93: 94: 95: 96: 97: 98: 99: 100:
101: public static function getTransportUri($host, $port)
102: {
103: if (($pos = strpos($host,"://")) !== false) {
104: $transport = substr($host,0,$pos);
105: $host = substr($host, $pos + 3);
106:
107: if (!in_array($transport, self::$supported_transports)) {
108: throw new \Exception("transport `{$transport}` does not support");
109: }
110:
111:
112: if ($transport == "unix") {
113:
114: $result = "unix://" . $host;
115: } else {
116: if (strpos($host,"::") !== false) {
117:
118: $host = sprintf("[%s]",trim($host,"[]"));
119: }
120:
121: $result = sprintf("%s://%s:%d",$transport, $host, $port);
122: }
123:
124: } else {
125: if (strpos($host,"::") !== false) {
126:
127: $host = sprintf("[%s]",trim($host,"[]"));
128: }
129:
130: $result = sprintf("tcp://%s:%d",$host, $port);
131: }
132:
133: return $result;
134: }
135:
136: 137: 138: 139: 140: 141:
142: public function setPacker(PackerInterface $packer)
143: {
144: return $this->packer = $packer;
145: }
146:
147: 148: 149: 150: 151:
152: public function getPacker()
153: {
154: return $this->packer;
155: }
156:
157: 158: 159: 160: 161: 162:
163: public function mergeOptions(array $options)
164: {
165: foreach ($options as $key => $value) {
166: if (!in_array($key, self::$acceptable_options)) {
167: throw new \Exception("option {$key} does not support");
168: }
169:
170: $this->options[$key] = $value;
171: }
172: }
173:
174: 175: 176: 177: 178: 179:
180: public function setOptions(array $options)
181: {
182: $this->options = array();
183:
184: foreach ($options as $key => $value) {
185: if (!in_array($key, self::$acceptable_options)) {
186: throw new \Exception("option {$key} does not support");
187: }
188:
189: $this->options[$key] = $value;
190: }
191: }
192:
193: 194: 195: 196: 197: 198: 199: 200:
201: public static function open($host = FluentLogger::DEFAULT_ADDRESS, $port = FluentLogger::DEFAULT_LISTEN_PORT, array $options = array())
202: {
203: $key = sprintf("%s:%s:%s",$host, $port, md5(join(",",$options)));
204:
205: if (!isset(self::$instances[$key])) {
206: $logger = new self($host,$port, $options);
207: self::$instances[$key] = $logger;
208: }
209:
210: return self::$instances[$key];
211: }
212:
213: 214: 215: 216: 217:
218: public static function clearIntances()
219: {
220: foreach (self::$instances as $object) {
221: unset($object);
222: }
223: self::$instances = array();
224: }
225:
226: 227: 228: 229: 230:
231: protected function connect()
232: {
233:
234:
235: $socket = @stream_socket_client($this->transport,$errno,$errstr,
236: $this->options["connection_timeout"], \STREAM_CLIENT_CONNECT | \STREAM_CLIENT_PERSISTENT);
237:
238: if (!$socket) {
239: $errors = error_get_last();
240: throw new \Exception($errors['message']);
241: }
242:
243:
244: stream_set_timeout($socket,$this->options["socket_timeout"]);
245: $this->socket = $socket;
246: }
247:
248: 249: 250: 251: 252:
253: protected function reconnect()
254: {
255: if (!is_resource($this->socket)) {
256: $this->connect();
257: }
258: }
259:
260: 261: 262: 263: 264: 265: 266: 267: 268:
269: public function post($tag, array $data)
270: {
271: $entity = new Entity($tag, $data);
272: return $this->postImpl($entity);
273: }
274:
275: 276: 277: 278: 279: 280:
281: public function post2(Entity $entity)
282: {
283: return $this->postImpl($entity);
284: }
285:
286: protected function postImpl(Entity $entity)
287: {
288: $buffer = $packed = $this->packer->pack($entity);
289: $length = strlen($packed);
290: $retry = $written = 0;
291:
292: try {
293: $this->reconnect();
294: } catch (\Exception $e) {
295: $this->processError($entity, $e->getMessage());
296: return false;
297: }
298:
299: try {
300:
301: while ($written < $length) {
302: $nwrite = $this->write($buffer);
303:
304: if ($nwrite === false) {
305:
306:
307: throw new \Exception("could not write message");
308: } else if ($nwrite === "") {
309:
310:
311: throw new \Exception("connection aborted");
312: } else if ($nwrite === 0) {
313: if ($retry > self::MAX_WRITE_RETRY) {
314: throw new \Exception("failed fwrite retry: max retry count");
315: }
316:
317: $retry++;
318: usleep($this->options["usleep_wait"]);
319: continue;
320: }
321:
322: $written += $nwrite;
323: $buffer = substr($packed,$written);
324: }
325: } catch (\Exception $e) {
326: $this->processError($entity, $e->getMessage());
327: return false;
328: }
329:
330: return true;
331: }
332:
333: 334: 335: 336: 337: 338:
339: protected function write($buffer)
340: {
341: return fwrite($this->socket, $buffer);
342: }
343:
344: public function __destruct()
345: {
346:
347: }
348: }
349: