Commit 34057f67 authored by 井庆林's avatar 井庆林

采用线程同步方案

parent cc7f7ede
...@@ -66,11 +66,9 @@ NSString *const MockCityId = @"beijing"; ...@@ -66,11 +66,9 @@ NSString *const MockCityId = @"beijing";
// [Phobos track:@"page_view" attributes:dict]; // [Phobos track:@"page_view" attributes:dict];
// array = [GMCache fetchObjectAtDocumentPathWithkey:PhobosCacheKey]; // array = [GMCache fetchObjectAtDocumentPathWithkey:PhobosCacheKey];
for (int i = 0; i < 10; i++) { for (int i = 0; i < 200; i++) {
[Phobos track:[NSString stringWithFormat:@"test-%d", i] attributes:dict sendNow:YES]; [Phobos track:[NSString stringWithFormat:@"tt-%d", i] attributes:dict sendNow:YES];
dispatch_async(dispatch_get_global_queue(0, 0), ^{ [Phobos track:[NSString stringWithFormat:@"pv-%d", i] attributes:dict sendNow:YES];
[Phobos track:[NSString stringWithFormat:@"page_view-%d", i] attributes:dict];// sendNow:(i % 2) == 0];
});
} }
} }
......
...@@ -420,14 +420,13 @@ static NewPhobos *_sharedClient; ...@@ -420,14 +420,13 @@ static NewPhobos *_sharedClient;
* 处理发送数据 * 处理发送数据
*/ */
+ (void)disposeSendDataWithImmediately:(BOOL)immediately { + (void)disposeSendDataWithImmediately:(BOOL)immediately {
[PhobosDataManager fetchToBeSendDataEntitiesWithEntitiesBlock:^(NSArray<PhobosSendDataEntity *> *entities) { NSArray<PhobosSendDataEntity *> *entities = [PhobosDataManager fetchToBeSendDataEntities];
if (immediately || entities.count >= PhobosShardCount) { if (immediately || entities.count >= PhobosShardCount) {
[PhobosDataManager updateDataEntities:entities sendStatus:PhobosDataSendStatusSending]; [PhobosDataManager updateDataEntities:entities sendStatus:PhobosDataSendStatusSending];
[PhobosSendManager sendDataWithEntities:entities completion:^(NSArray<PhobosSendDataEntity *> * _Nonnull finishEntities, NSInteger code) { [PhobosSendManager sendDataWithEntities:entities completion:^(NSArray<PhobosSendDataEntity *> * _Nonnull finishEntities, NSInteger code) {
[PhobosDataManager updateDataEntities:finishEntities sendStatus:(code == 200 ? PhobosDataSendStatusFinish : PhobosDataSendStatusError)]; [PhobosDataManager updateDataEntities:finishEntities sendStatus:(code == 200 ? PhobosDataSendStatusFinish : PhobosDataSendStatusError)];
}]; }];
} }
}];
} }
...@@ -437,26 +436,12 @@ static NewPhobos *_sharedClient; ...@@ -437,26 +436,12 @@ static NewPhobos *_sharedClient;
/** 获取所有非立即发送埋点数量 */ /** 获取所有非立即发送埋点数量 */
+ (NSUInteger)fetchToBeSendPhobosDataCount { + (NSUInteger)fetchToBeSendPhobosDataCount {
dispatch_semaphore_t semaphore = dispatch_semaphore_create(0); return [PhobosDataManager fetchCountOfToBeSendEntities];
__block NSUInteger count = 0;
[PhobosDataManager fetchCountOfToBeSendEntitiesWithCountBlock:^(NSUInteger entitiesCount) {
count = entitiesCount;
dispatch_semaphore_signal(semaphore);
}];
dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER);
return count;
} }
/** 获取待发送埋点数据, 用同步来保障异步获取数据 */ /** 获取待发送埋点数据, 用同步来保障异步获取数据 */
+ (NSArray *)fetchToBeSendPhobosData { + (NSArray *)fetchToBeSendPhobosData {
dispatch_semaphore_t semaphore = dispatch_semaphore_create(0); return [PhobosDataManager fetchToBeSendDataEntities];
__block NSArray<PhobosSendDataEntity *> *entityArray;
[PhobosDataManager fetchToBeSendDataEntitiesWithEntitiesBlock:^(NSArray<PhobosSendDataEntity *> *entities) {
entityArray = entities;
dispatch_semaphore_signal(semaphore);
}];
dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER);
return entityArray;
} }
/** 清除待发送埋点数据缓存 */ /** 清除待发送埋点数据缓存 */
......
...@@ -21,22 +21,22 @@ typedef NS_ENUM(NSInteger, PhobosDataSendStatus) { ...@@ -21,22 +21,22 @@ typedef NS_ENUM(NSInteger, PhobosDataSendStatus) {
@interface PhobosDataManager : NSObject @interface PhobosDataManager : NSObject
/** 获取待发送数据,包含待发送数据和发送失败数据 */ /** 获取待发送数据,包含待发送数据和发送失败数据 */
+ (void)fetchToBeSendDataEntitiesWithEntitiesBlock:(void (^)(NSArray<PhobosSendDataEntity *> *entities))entitiesBlock; + (NSArray<PhobosSendDataEntity *> *)fetchToBeSendDataEntities;
/** /**
* 通过 searchFilter 获取数据 * 通过 searchFilter 获取数据
*/ */
+ (void)fetchDataEntitiesWithPredicate:(NSPredicate *)searchFilter entitiesBlock:(void (^)(NSArray<PhobosSendDataEntity *> *entities))entitiesBlock; + (NSArray<PhobosSendDataEntity *> *)fetchDataEntitiesWithPredicate:(NSPredicate *)searchFilter;
/** /**
* 获取待发送和发送失败的数据数量 * 获取待发送和发送失败的数据数量
*/ */
+ (void)fetchCountOfToBeSendEntitiesWithCountBlock:(nonnull void (^)(NSUInteger count))countBlock; + (NSUInteger)fetchCountOfToBeSendEntities;
/** /**
* 通过 searchFilter 获取数据数量 * 通过 searchFilter 获取数据数量
*/ */
+ (void)fetchCountOfEntitiesWithPredicate:(NSPredicate *)searchFilter countBlock:(nonnull void (^)(NSUInteger count))countBlock; + (NSUInteger)fetchCountOfEntities;
/** /**
* 插入埋点数据 * 插入埋点数据
......
...@@ -16,10 +16,13 @@ ...@@ -16,10 +16,13 @@
static NSManagedObjectContext *PhobosDefaultContext; static NSManagedObjectContext *PhobosDefaultContext;
static dispatch_queue_t PhobosDataSaveQueue; static dispatch_queue_t PhobosDataSaveQueue;
static dispatch_semaphore_t phobos_semaphore_t;
static dispatch_semaphore_t phobos_rd_semaphore_t;
+ (void)initialize { + (void)initialize {
static dispatch_once_t onceToken; static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{ dispatch_once(&onceToken, ^{
phobos_semaphore_t = dispatch_semaphore_create(1);
PhobosDataSaveQueue = dispatch_queue_create("dispatch_rw_queue_phobos", DISPATCH_QUEUE_CONCURRENT); PhobosDataSaveQueue = dispatch_queue_create("dispatch_rw_queue_phobos", DISPATCH_QUEUE_CONCURRENT);
// 创建 NSManagedObjectContext,供埋点库访问CoreData库使用 // 创建 NSManagedObjectContext,供埋点库访问CoreData库使用
...@@ -43,45 +46,36 @@ static dispatch_queue_t PhobosDataSaveQueue; ...@@ -43,45 +46,36 @@ static dispatch_queue_t PhobosDataSaveQueue;
/** 将上次没有获取到发送结果的数据的状态修改为发送失败,待下次重新发送 */ /** 将上次没有获取到发送结果的数据的状态修改为发送失败,待下次重新发送 */
NSPredicate *predicate = [NSPredicate predicateWithFormat:@"status = %d", PhobosDataSendStatusSending]; NSPredicate *predicate = [NSPredicate predicateWithFormat:@"status = %d", PhobosDataSendStatusSending];
[self fetchDataEntitiesWithPredicate:predicate entitiesBlock:^(NSArray<PhobosSendDataEntity *> * entities) { NSArray<PhobosSendDataEntity *> *entities = [self fetchDataEntitiesWithPredicate:predicate];
[self updateDataEntities:entities sendStatus:PhobosDataSendStatusError]; [self updateDataEntities:entities sendStatus:PhobosDataSendStatusError];
}];
/** 将发送成功的数据删除 */ /** 将发送成功的数据删除 */
NSPredicate *finishPredicate = [NSPredicate predicateWithFormat:@"status = %d", PhobosDataSendStatusFinish]; NSPredicate *finishPredicate = [NSPredicate predicateWithFormat:@"status = %d", PhobosDataSendStatusFinish];
// [PhobosSendDataEntity MR_deleteAllMatchingPredicate:finishPredicate]; // [PhobosSendDataEntity MR_deleteAllMatchingPredicate:finishPredicate];
[self fetchDataEntitiesWithPredicate:finishPredicate entitiesBlock:^(NSArray<PhobosSendDataEntity *> * finishEntities) { NSArray<PhobosSendDataEntity *> *finishEntities = [self fetchDataEntitiesWithPredicate:finishPredicate];
[self deleteDataEntities:finishEntities]; [self deleteDataEntities:finishEntities];
}];
}); });
} }
+ (void)fetchToBeSendDataEntitiesWithEntitiesBlock:(void (^)(NSArray<PhobosSendDataEntity *> *))entitiesBlock { + (NSArray<PhobosSendDataEntity *> *)fetchToBeSendDataEntities {
NSPredicate *predicate = [NSPredicate predicateWithFormat:@"status = %d or status = %d", PhobosDataSendStatusToBeSend, PhobosDataSendStatusError]; NSPredicate *predicate = [NSPredicate predicateWithFormat:@"status = %d or status = %d", PhobosDataSendStatusToBeSend, PhobosDataSendStatusError];
[self fetchDataEntitiesWithPredicate:predicate entitiesBlock:entitiesBlock]; return [self fetchDataEntitiesWithPredicate:predicate];
} }
+ (void)fetchDataEntitiesWithPredicate:(NSPredicate *)searchFilter entitiesBlock:(nonnull void (^)(NSArray<PhobosSendDataEntity *> *))entitiesBlock { + (NSArray<PhobosSendDataEntity *> *)fetchDataEntitiesWithPredicate:(NSPredicate *)searchFilter {
dispatch_sync(PhobosDataSaveQueue, ^{ dispatch_semaphore_wait(phobos_semaphore_t, DISPATCH_TIME_FOREVER);
NSArray<PhobosSendDataEntity *> *entities = [PhobosSendDataEntity MR_findAllWithPredicate:searchFilter inContext:PhobosDefaultContext]; NSArray<PhobosSendDataEntity *> *entities = [PhobosSendDataEntity MR_findAllWithPredicate:searchFilter inContext:PhobosDefaultContext];
if (entitiesBlock) { dispatch_semaphore_signal(phobos_semaphore_t);
entitiesBlock(entities); return entities;
}
});
} }
+ (void)fetchCountOfToBeSendEntitiesWithCountBlock:(nonnull void (^)(NSUInteger))countBlock { + (NSUInteger)fetchCountOfToBeSendEntities {
NSPredicate *predicate = [NSPredicate predicateWithFormat:@"status = %d or status = %d", PhobosDataSendStatusToBeSend, PhobosDataSendStatusError]; NSPredicate *predicate = [NSPredicate predicateWithFormat:@"status = %d or status = %d", PhobosDataSendStatusToBeSend, PhobosDataSendStatusError];
[self fetchCountOfEntitiesWithPredicate:predicate countBlock:countBlock]; return [self fetchCountOfEntitiesWithPredicate:predicate];
} }
+ (void)fetchCountOfEntitiesWithPredicate:(NSPredicate *)searchFilter countBlock:(nonnull void (^)(NSUInteger))countBlock { + (NSUInteger)fetchCountOfEntitiesWithPredicate:(NSPredicate *)searchFilter {
dispatch_sync(PhobosDataSaveQueue, ^{ return [[PhobosSendDataEntity MR_numberOfEntitiesWithPredicate:searchFilter inContext:PhobosDefaultContext] integerValue];
NSUInteger count = [[PhobosSendDataEntity MR_numberOfEntitiesWithPredicate:searchFilter inContext:PhobosDefaultContext] integerValue];
if (countBlock) {
countBlock(count);
}
});
} }
+ (void)insertData:(NSDictionary *)data sendAPI:(NSString *)sendAPI { + (void)insertData:(NSDictionary *)data sendAPI:(NSString *)sendAPI {
...@@ -92,14 +86,14 @@ static dispatch_queue_t PhobosDataSaveQueue; ...@@ -92,14 +86,14 @@ static dispatch_queue_t PhobosDataSaveQueue;
if (!sendAPI || [sendAPI isEqualToString:@""] || !data) { if (!sendAPI || [sendAPI isEqualToString:@""] || !data) {
return; return;
} }
dispatch_barrier_async(PhobosDataSaveQueue, ^{ dispatch_semaphore_wait(phobos_semaphore_t, DISPATCH_TIME_FOREVER);
PhobosSendDataEntity *entity = [PhobosSendDataEntity MR_createEntityInContext:PhobosDefaultContext]; PhobosSendDataEntity *entity = [PhobosSendDataEntity MR_createEntityInContext:PhobosDefaultContext];
entity.data = [data mj_JSONData]; entity.data = [data mj_JSONData];
entity.api = sendAPI; entity.api = sendAPI;
entity.status = PhobosDataSendStatusToBeSend; entity.status = PhobosDataSendStatusToBeSend;
entity.id = mach_absolute_time(); entity.id = mach_absolute_time();
dispatch_semaphore_signal(phobos_semaphore_t);
[self saveWithCompletion:completion]; [self saveWithCompletion:completion];
});
} }
+ (void)updateDataEntities:(NSArray<PhobosSendDataEntity *> *)entities sendStatus:(PhobosDataSendStatus)sendStatus { + (void)updateDataEntities:(NSArray<PhobosSendDataEntity *> *)entities sendStatus:(PhobosDataSendStatus)sendStatus {
...@@ -108,37 +102,31 @@ static dispatch_queue_t PhobosDataSaveQueue; ...@@ -108,37 +102,31 @@ static dispatch_queue_t PhobosDataSaveQueue;
+ (void)updateDataEntities:(NSArray<PhobosSendDataEntity *> *)entities sendStatus:(PhobosDataSendStatus)sendStatus completion:(MRSaveCompletionHandler)completion { + (void)updateDataEntities:(NSArray<PhobosSendDataEntity *> *)entities sendStatus:(PhobosDataSendStatus)sendStatus completion:(MRSaveCompletionHandler)completion {
if (entities.count > 0) { if (entities.count > 0) {
dispatch_barrier_async(PhobosDataSaveQueue, ^{ dispatch_semaphore_wait(phobos_semaphore_t, DISPATCH_TIME_FOREVER);
NSInteger count = entities.count;
[entities enumerateObjectsUsingBlock:^(PhobosSendDataEntity *obj, NSUInteger idx, BOOL * _Nonnull stop) { [entities enumerateObjectsUsingBlock:^(PhobosSendDataEntity *obj, NSUInteger idx, BOOL * _Nonnull stop) {
obj.status = sendStatus; obj.status = sendStatus;
if (idx == count - 1) {
[self saveWithCompletion:completion];
}
}]; }];
}); dispatch_semaphore_signal(phobos_semaphore_t);
[self saveWithCompletion:completion];
} }
} }
+ (void)deleteDataEntities:(NSArray<PhobosSendDataEntity *> *)entities { + (void)deleteDataEntities:(NSArray<PhobosSendDataEntity *> *)entities {
if (entities.count > 0) { if (entities.count > 0) {
dispatch_barrier_async(PhobosDataSaveQueue, ^{ dispatch_semaphore_wait(phobos_semaphore_t, DISPATCH_TIME_FOREVER);
NSInteger count = entities.count;
[entities enumerateObjectsUsingBlock:^(PhobosSendDataEntity *obj, NSUInteger idx, BOOL * _Nonnull stop) { [entities enumerateObjectsUsingBlock:^(PhobosSendDataEntity *obj, NSUInteger idx, BOOL * _Nonnull stop) {
[obj MR_deleteEntityInContext:PhobosDefaultContext]; [obj MR_deleteEntityInContext:PhobosDefaultContext];
if (idx == count - 1) {
[self saveWithCompletion:nil];
}
}]; }];
}); dispatch_semaphore_signal(phobos_semaphore_t);
[self saveWithCompletion:nil];
} }
} }
+ (void)deleteAllEntities { + (void)deleteAllEntities {
dispatch_barrier_async(PhobosDataSaveQueue, ^{ dispatch_semaphore_wait(phobos_semaphore_t, DISPATCH_TIME_FOREVER);
[PhobosSendDataEntity MR_truncateAllInContext:PhobosDefaultContext]; [PhobosSendDataEntity MR_truncateAllInContext:PhobosDefaultContext];
dispatch_semaphore_signal(phobos_semaphore_t);
[self saveWithCompletion:nil]; [self saveWithCompletion:nil];
});
} }
/** /**
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment